Skip to main content

Crate zlink_tokio

Crate zlink_tokio 

Source
Expand description

crates.io API Documentation Build Status

Project logo

zlink

A Rust implementation of the Varlink IPC protocol. zlink provides a safe, async API for building Varlink services and clients.

§Overview

Varlink is a simple, JSON-based IPC protocol that enables communication between system services and applications. zlink makes it easy to implement Varlink services in Rust with:

  • Async-first design: Built on async/await for efficient concurrent operations.
  • Type safety: Leverage Rust’s type system with derive macros and code generation.
  • Multiple transports: Unix domain sockets and (upcoming) USB support.
  • Code generation: Generate Rust code from Varlink IDL files.

§Project Structure

The zlink project consists of several subcrates:

  • zlink: The main unified API crate that re-exports functionality based on enabled features. This is the only crate you will want to use directly in your application and services.
  • zlink-core: Core no-std foundation providing essential Varlink types and traits.
  • zlink-macros: Contains the attribute and derive macros.
  • zlink-tokio: Tokio-based transport implementations and runtime integration.
  • zlink-codegen: Code generation tool for creating Rust bindings from Varlink IDL files.

§Examples

§Example: Calculator Service and Client

Here’s a complete example showing both service and client implementations using zlink’s attribute macros:

use serde::{Deserialize, Serialize};
use tokio::{select, sync::oneshot, fs::remove_file};
use zlink::{introspect, proxy, service, unix, ReplyError, Server};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a channel to signal when server is ready.
    let (ready_tx, ready_rx) = oneshot::channel();

    // Run server and client concurrently.
    select! {
        res = run_server(ready_tx) => res?,
        res = run_client(ready_rx) => res?,
    }

    Ok(())
}

async fn run_client(ready_rx: oneshot::Receiver<()>) -> Result<(), Box<dyn std::error::Error>> {
    // Wait for server to be ready.
    ready_rx.await.map_err(|_| "Server failed to start")?;

    // Connect to the calculator service.
    let mut conn = unix::connect(SOCKET_PATH).await?;

    // Use the proxy-generated methods.
    let result = conn.add(5.0, 3.0).await?.unwrap();
    assert_eq!(result.result, 8.0);

    let result = conn.multiply(4.0, 7.0).await?.unwrap();
    assert_eq!(result.result, 28.0);

    // Handle errors properly.
    let Err(CalculatorError::DivisionByZero { message }) = conn.divide(10.0, 0.0).await? else {
        panic!("Expected DivisionByZero error");
    };
    assert_eq!(message, "Cannot divide by zero");

    // Test invalid input error with large dividend.
    let Err(CalculatorError::InvalidInput { field, reason }) =
        conn.divide(2000000.0, 2.0).await?
    else {
        panic!("Expected InvalidInput error");
    };
    println!("Field: {field}, Reason: {reason}");

    let stats = conn.get_stats().await?.unwrap();
    assert_eq!(stats.count, 2);
    println!("Stats: {stats:?}");

    Ok(())
}

// The client proxy.
#[proxy("org.example.Calculator")]
trait CalculatorProxy {
    async fn add(
        &mut self,
        a: f64,
        b: f64,
    ) -> zlink::Result<Result<CalculationResult, CalculatorError<'_>>>;
    async fn multiply(
        &mut self,
        x: f64,
        y: f64,
    ) -> zlink::Result<Result<CalculationResult, CalculatorError<'_>>>;
    async fn divide(
        &mut self,
        dividend: f64,
        divisor: f64,
    ) -> zlink::Result<Result<CalculationResult, CalculatorError<'_>>>;
    async fn get_stats(
        &mut self,
    ) -> zlink::Result<Result<Statistics<'_>, CalculatorError<'_>>>;
}

// Types shared between client and server.
#[derive(Debug, Serialize, Deserialize)]
struct CalculationResult {
    result: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Statistics<'a> {
    count: u64,
    #[serde(borrow)]
    operations: Vec<&'a str>,
}

#[derive(Debug, PartialEq, ReplyError, introspect::ReplyError)]
#[zlink(interface = "org.example.Calculator")]
enum CalculatorError<'a> {
    DivisionByZero {
        message: &'a str,
    },
    InvalidInput {
        field: &'a str,
        reason: &'a str,
    },
}

async fn run_server(ready_tx: oneshot::Sender<()>) -> Result<(), Box<dyn std::error::Error>> {
    let _ = remove_file(SOCKET_PATH).await;

    // Setup and run the server.
    let listener = unix::bind(SOCKET_PATH)?;
    let server = Server::new(listener, Calculator::new());

    // Signal that server is ready.
    let _ = ready_tx.send(());

    server.run().await.map_err(|e| e.into())
}

// The calculator service.
struct Calculator {
    operations: Vec<String>,
}

impl Calculator {
    fn new() -> Self {
        Self { operations: Vec::new() }
    }
}

#[service(interface = "org.example.Calculator")]
impl Calculator {
    async fn add(&mut self, a: f64, b: f64) -> CalculationResult {
        self.operations.push(format!("add({a}, {b})"));
        CalculationResult { result: a + b }
    }

    async fn multiply(&mut self, x: f64, y: f64) -> CalculationResult {
        self.operations.push(format!("multiply({x}, {y})"));
        CalculationResult { result: x * y }
    }

    async fn divide(
        &mut self,
        dividend: f64,
        divisor: f64,
    ) -> Result<CalculationResult, CalculatorError<'_>> {
        if divisor == 0.0 {
            Err(CalculatorError::DivisionByZero {
                message: "Cannot divide by zero",
            })
        } else if dividend < -1000000.0 || dividend > 1000000.0 {
            Err(CalculatorError::InvalidInput {
                field: "dividend",
                reason: "must be within range",
            })
        } else {
            self.operations
                .push(format!("divide({dividend}, {divisor})"));
            Ok(CalculationResult {
                result: dividend / divisor,
            })
        }
    }

    async fn get_stats(&self) -> Statistics<'_> {
        let ops: Vec<&str> = self.operations.iter().map(|s| s.as_str()).collect();
        Statistics {
            count: self.operations.len() as u64,
            operations: ops,
        }
    }
}

const SOCKET_PATH: &str = "/tmp/calculator_example.varlink";

Note: Typically you would want to spawn the server in a separate task but that’s not what we did in the example above. Please refer to Server::run docs for the reason.

§Code Generation from IDL

zlink-codegen can generate Rust code from Varlink interface description files:

# Install the code generator
cargo install zlink-codegen

# Let's create a file containing Varlink IDL
cat <<EOF > calculator.varlink
# Calculator service interface
interface org.example.Calculator

type CalculationResult (
    result: float
)

type DivisionByZeroError (
    message: string
)

method Add(a: float, b: float) -> (result: float)
method Multiply(x: float, y: float) -> (result: float)
method Divide(dividend: float, divisor: float) -> (result: float)
error DivisionByZero(message: string)
EOF

# Generate Rust code from the IDL
zlink-codegen calculator.varlink > src/calculator_gen.rs

The generated code includes type definitions and proxy traits ready to use in your application.

§Pipelining

zlink supports method call pipelining for improved throughput and reduced latency. The proxy macro adds variants for each method named chain_<method_name> and a trait named <TraitName>Chain that allow you to batch multiple requests and send them out at once without waiting for individual responses.

Note: Chain methods are only generated for proxy methods that use owned types (DeserializeOwned) in their return type. Methods with borrowed types (non-static lifetimes) don’t get chain variants since the internal buffer may be reused between stream iterations. Input arguments can still use borrowed types.

use futures_util::{StreamExt, pin_mut};
use serde::{Deserialize, Serialize};
use zlink::{proxy, unix, ReplyError};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a batch processing service
    let mut conn = unix::connect("/tmp/batch_processor.varlink").await?;

    // Send multiple pipelined requests without waiting for responses.
    // Note: chain methods are only generated for proxy methods with owned return types.
    let replies = conn
        .chain_process(1, "first")?
        .process(2, "second")?
        .process(3, "third")?
        .batch_process(vec![
            ProcessRequest { id: 4, data: "batch1" },
            ProcessRequest { id: 5, data: "batch2" },
        ])?
        .send::<ProcessReply, ProcessError>()
        .await?;

    // Collect all responses
    pin_mut!(replies);
    let mut results = Vec::new();
    while let Some(reply) = replies.next().await {
        let (reply, _fds) = reply?;
        if let Ok(response) = reply {
            match response.into_parameters() {
                Some(ProcessReply::Result(result)) => {
                    results.push(result);
                }
                Some(ProcessReply::BatchResult(batch)) => {
                    results.extend(batch.results);
                }
                None => {}
            }
        }
    }

    // Process results
    for result in results {
        println!("Processed item {}: {}", result.id, result.processed);
    }

    Ok(())
}

// Proxy trait with owned return types - chain methods are generated.
#[proxy("org.example.BatchProcessor")]
trait BatchProcessorProxy {
    async fn process(
        &mut self,
        id: u32,
        data: &str,
    ) -> zlink::Result<Result<ProcessReply, ProcessError>>;

    async fn batch_process(
        &mut self,
        requests: Vec<ProcessRequest<'_>>,
    ) -> zlink::Result<Result<ProcessReply, ProcessError>>;
}

// Input types can use borrowed data (they implement Serialize, not DeserializeOwned).
#[derive(Debug, Serialize)]
struct ProcessRequest<'a> {
    id: u32,
    data: &'a str,
}

// Owned reply types - required for chain API (DeserializeOwned).
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum ProcessReply {
    Result(ProcessResult),
    BatchResult(BatchResult),
}

#[derive(Debug, Deserialize)]
struct ProcessResult {
    id: u32,
    processed: String,
}

#[derive(Debug, Deserialize)]
struct BatchResult {
    results: Vec<ProcessResult>,
}

#[derive(Debug, ReplyError)]
#[zlink(interface = "org.example.BatchProcessor")]
enum ProcessError {
    InvalidRequest { message: String },
}

§Examples

The repository includes a few examples:

Run examples with:

cargo run --example resolved -- example.com systemd.io
cargo run \
  --example varlink-inspect \
  --features idl-parse,introspection -- \
  /run/systemd/resolve/io.systemd.Resolve

§Features

§Main Features

  • tokio (default): Enable tokio runtime integration.
  • smol: Enable smol runtime integration.
  • server (default): Enable server-related functionality (Server, Listener, Service).
  • service (default): Enable the #[service] macro. Implies server and introspection.
  • proxy (default): Enable the #[proxy] macro for type-safe client code.
  • tracing (default): Enable tracing-based logging.
  • defmt: Enable defmt-based logging. If both tracing and defmt is enabled, tracing is used.

§IDL and Introspection

  • idl: Support for IDL type representations.
  • introspection: Enable runtime introspection of service interfaces.
  • idl-parse: Parse Varlink IDL files at runtime.

§Getting Help and/or Contributing

If you need help in using these crates, are looking for ways to contribute, or just want to hang out with the cool kids, please come chat with us in the #zlink:matrix.org Matrix room. If something doesn’t seem right, please file an issue.

We welcome contributions! Please see our Contributing Guide for details.

§License

This project is licensed under the MIT License.

Modules§

connection
Contains connection related API.
idl
Interface Definition Language (IDL) support for Varlink.
introspect
Type introspection support for Varlink.
notified
Convenience API for maintaining state, that notifies on changes.
pin_project_lite
reply
Method reply API.
service
Service-related API.
unix
Provides transport over Unix Domain Sockets.
varlink_service
Types for the org.varlink.service interface.

Macros§

impl_collection_type
Implements the Type trait for a generic collection type.
impl_map_type
Implements the Type trait for map types with string keys.
impl_transparent_wrapper
Implements the Type trait for transparent wrapper types.
impl_type
Implements the Type trait for multiple types mapping to the same IDL type.

Structs§

Call
A method call.
Connection
A connection.
Reply
A successful method call reply.
Server
A server.

Enums§

Error
The Error type for the zlink crate.

Traits§

Listener
A listener is a server that listens for incoming connections.
Service
Service trait for handling method calls.

Type Aliases§

Result
The Result type for the zlink crate.

Attribute Macros§

proxy
Creates a client-side proxy for calling Varlink methods on a connection.
service
Transforms an impl block into a Service trait implementation.

Derive Macros§

ReplyError
Implements serde::{Serialize, Deserialize} for service error enums.