zksync_concurrency 0.13.0

Structured concurrency framework
Documentation

zksync_concurrency

The zksync_concurrency crate provides a structured concurrency framework inspired by Golang's context and errgroup, tailored for Rust. It aims to simplify the management of concurrent tasks, their cancellation, and error propagation in asynchronous and synchronous code.

Core Concepts

1. ctx::Ctx - Context

The ctx::Ctx (Context) is a fundamental concept in this crate. It acts as a carrier for cancellation signals, deadlines, and other request-scoped values.

  • Cancellation: A Ctx can be canceled, signaling all operations associated with it to terminate gracefully as soon as possible.
  • Deadlines & Timeouts: A Ctx can have a deadline, after which it is automatically canceled. You can create child contexts with shorter timeouts.
  • Propagation: Contexts are typically passed down the call stack. Functions that perform potentially long-running operations or I/O should accept a &ctx::Ctx argument.
  • Clock Abstraction: Ctx provides access to a Clock (ctx.now(), ctx.now_utc(), ctx.sleep()), which can be a RealClock, or for testing ManualClock and AffineClock.
  • Random Number Generator: Ctx also provides access to a Rng (ctx.rng()), which can be a OS-level RNG for production or a deterministic RNG for testing.

Usage:

use zksync_concurrency::{ctx, time};

async fn some_long_operation(ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
    // Perform some work
    // ...

    // Use ctx.wait to await a future while also listening for cancellation
    let result = ctx.wait(async {
        // some sub-operation
        // ...
    }).await?;
    println!("Sub-operation result: {}", result);

    // Or, periodically check if the context is active
    if !ctx.is_active() {
        println!("Context was canceled, cleaning up.");
        return Err(ctx::Canceled);
    }

    // Sleep using the context's clock
    ctx.sleep(time::Duration::milliseconds(100)).await?;

    Ok(())
}

async fn my_task(parent_ctx: &ctx::Ctx) {
    // Create a child context with a timeout
    let ctx_with_timeout = &parent_ctx.with_timeout(time::Duration::seconds(5));

    match some_long_operation(ctx_with_timeout).await {
        Ok(()) => println!("Operation completed successfully."),
        Err(ctx::Canceled) => println!("Operation was canceled."),
    }
}

#[tokio::main]
async fn main() {
    let root_ctx = ctx::root(); // In tests, we use ctx::test_root() instead.
    my_task(&root_ctx).await;
}

2. scope - Structured Concurrency

Structured concurrency ensures that concurrent tasks are managed within a well-defined scope. If a scope exits, all tasks spawned within it are guaranteed to have completed or been canceled.

  • scope::run! (async) and scope::run_blocking! (sync): These macros create a new scope. The scope manages a group of tasks.
    • scope::run!(ctx, async_closure): Use this for asynchronous scopes. The provided closure must be async. The run! macro itself returns a future that you .await. Inside, you'll typically use s.spawn() for async tasks and s.spawn_blocking() for tasks that need to run on a separate thread due to blocking operations.
    • scope::run_blocking!(ctx, sync_closure): Use this for synchronous scopes. The provided closure is a regular (non-async) closure. The run_blocking! macro is a blocking call. Inside, you'll typically use s.spawn_blocking() for tasks, though s.spawn() for async sub-tasks is also possible. The choice depends on whether the function creating the scope is async or synchronous.
  • Task Spawning within a Scope:
    • Main Tasks: s.spawn(async_fn) or s.spawn_blocking(sync_fn). The scope waits for all main tasks to complete. If any main task returns an error or is canceled, the scope is canceled.
    • Background Tasks: s.spawn_bg(async_fn) or s.spawn_bg_blocking(sync_fn). The scope's primary work is considered complete once all main tasks finish. At this point (or if any task errors, or if the scope is explicitly canceled), the scope's context is canceled. Background tasks are also subject to this cancellation and must terminate gracefully. The scope::run! or scope::run_blocking! call will only return after all tasks, including background tasks, have fully terminated. They are useful for auxiliary operations like logging, monitoring, or handling side effects that don't define the main completion criteria of the scope.
  • Cancellation:
    • Automatic: If any task (main or background) within the scope returns an Err, the scope's context is immediately canceled. All other tasks within that scope should then terminate gracefully.
    • Explicit: You can explicitly cancel a scope's context using s.cancel().
  • Error Handling:
    • The scope::run! or scope::run_blocking! macro returns a Result.
    • If all tasks succeed, it returns the Ok result of the root task provided to the macro.
    • If any task fails (returns an Err), the macro returns the error of the first task that failed. Subsequent errors from other tasks are ignored.
    • If a task panics, the panic is propagated after all other tasks in the scope have completed (or been canceled and terminated).
  • JoinHandle: When you spawn a task, you get a JoinHandle. You can use handle.join(ctx).await to get the Result of that specific task. If the task itself errored, joining it will result in Err(ctx::Canceled) because the scope (and thus the task's specific context for error reporting) would have been canceled.

Usage:

use zksync_concurrency::{ctx, scope, time};
use std::sync::atomic::{AtomicUsize, Ordering};

async fn application_logic(ctx: &ctx::Ctx) -> Result<String, anyhow::Error> {
    // scope::run! takes the parent context and a closure.
    // The closure receives the scope's context and a reference to the scope itself.
    let result_from_scope: Result<String, anyhow::Error> = scope::run!(ctx, |ctx, s| async {
        // Spawn a main async task
        s.spawn(async {
            ctx.sleep(time::Duration::milliseconds(50)).await?;
            println!("Main async task 1 completed.");
            // Tasks should return Result<T, E> where E is the scope's error type
            Ok(())
        });

        // Spawn another main blocking task
        let handle = s.spawn_blocking(|| {
            // Simulate work
            std::thread::sleep(std::time::Duration::from_millis(100));
            println!("Main blocking task completed.");

            // Example of returning an error:
            // return Err(anyhow::anyhow!("Something went wrong in blocking task"));
            Ok(100)
        });

        // Spawn a background task
        s.spawn_bg(async {
            for i in 0..5 {
                if !ctx.is_active() {
                    println!("Background task: scope canceled, exiting.");
                    break;
                }
                println!("Background task alive: {}", i);
                ctx.sleep(time::Duration::milliseconds(30)).await?;
            }
            Ok(())
        });

        // Optionally, join a specific task
        match handle.join(ctx).await {
            Ok(val) => println!("Blocking task successfully returned: {}", val),
            Err(ctx::Canceled) => println!("Blocking task was canceled or scope failed."),
        }

        // The root task of the scope. Its Ok value is returned if all tasks succeed.
        // If we want the scope to fail, this root task can return an error,
        // or any spawned task can return an error.
        Ok("All main tasks launched".to_string())
    }).await;

    println!("Scope finished.");
    result_from_scope
}

Error Types

  • ctx::Canceled: A distinct error type indicating that an operation was specifically canceled because its context was terminated. This is crucial for distinguishing cancellations from other failures.

  • ctx::Error: This is an enum that represents the outcome of a cancellable operation. It can be one of two variants:

    • Error::Canceled(ctx::Canceled): Indicates the operation was canceled via its context.
    • Error::Internal(anyhow::Error): Indicates the operation failed for a reason other than cancellation. The underlying cause is captured as an anyhow::Error, allowing for rich error reporting from various sources. ctx::Error is the standard error type for many functions and methods within the zksync_concurrency crate.
  • ctx::Result<T>: A convenient type alias for std::result::Result<T, ctx::Error>.

  • error::Wrap: A trait similar to anyhow::Context for adding context to your custom error types that might embed anyhow::Error, while correctly handling ctx::Canceled.

use zksync_concurrency::{ctx, error::Wrap, time};

async fn process_data_with_proper_context(ctx: &ctx::Ctx, data_input: &str) -> ctx::Result<String> {
    if !ctx.is_active() {
        // This becomes ctx::Error::Canceled.
        // The .wrap() here will preserve it as Canceled.
        return Err(ctx::Canceled.into()).wrap(|| format!("Ctx canceled before processing '{}'", data_input));
    }

    if data_input == "trigger_fail" {
        // This becomes ctx::Error::Internal wrapping an anyhow::Error.
        let internal_failure = anyhow::anyhow!("Specific failure for '{}'", data_input);
        return Err(ctx::Error::Internal(internal_failure))
            .wrap(|| format!("High-level context for processing '{}'", data_input));
    }
      
    // Simulating some cancellable work
    ctx.wait(async { tokio::time::sleep(time::Duration::milliseconds(10)).await }).await?;

    ctx::Ok(format!("Successfully processed '{}'", data_input))
}

Pitfalls: ctx::Canceled vs. anyhow::Error

It is critical to handle ctx::Canceled correctly to maintain its distinct semantic meaning. Improperly mixing ctx::Error::Canceled with general anyhow::Error propagation or the anyhow::Context trait can obscure the specific reason for termination (i.e., cancellation).

  1. Using anyhow::Context::context() on ctx::Error (or ctx::Result):

    • If you have a ctx::Result<T> (which is Result<T, ctx::Error>) and you call .context("some context") on it, a ctx::Error::Canceled variant will be converted into an anyhow::Error.
    • The resulting anyhow::Error will contain the new context string prepended to the string representation of the original Canceled error (e.g., "some context: canceled").
    • Crucially, the distinct ctx::Error::Canceled variant is lost. The error is now a generic anyhow::Error, and you can no longer specifically match on ctx::Error::Canceled.
    • Recommendation: Always use your_ctx_result.wrap("...") (from zksync_concurrency::error::Wrap) when you want to add context to a ctx::Result while preserving the ctx::Error::Canceled variant if it occurs. If the function must return anyhow::Result, be aware of the conversion (see next point).
    use zksync_concurrency::{ctx, error::Wrap as _};
    use anyhow::Context as _;
    
    let canceled_result: ctx::Result<()> = Err(ctx::Error::Canceled(ctx::Canceled));
    
    // INCORRECT - converts Canceled to generic anyhow::Error:
    let wrong = canceled_result.clone().context("Added context");
    // `wrong` is now Result<(), anyhow::Error>, cannot match on ctx::Error::Canceled
    
    // CORRECT - preserves Canceled type:
    let correct = canceled_result.wrap("Added context");
    match correct {
        Err(ctx::Error::Canceled(_)) => { /* Handle cancellation specifically */ }
        Err(ctx::Error::Internal(_)) => { /* Handle other errors */ }
        Ok(_) => {}
    }
    
  2. Propagating ctx::Error::Canceled via ? into a function returning Result<_, anyhow::Error>:

    • If a function func_A() returns ctx::Result<T> and this result is Err(ctx::Error::Canceled), and you use func_A()? inside another function func_B() that is declared to return Result<U, anyhow::Error>. The ? operator will perform an implicit conversion from ctx::Error to anyhow::Error (because ctx::Error implements Into<anyhow::Error>). The ctx::Error::Canceled will be converted into a generic anyhow::Error. The error message of this anyhow::Error will typically be just "canceled".
    • The specific ctx::Canceled type information is lost at the level of func_B()'s return type. func_B() now only knows it got an anyhow::Error, not specifically a cancellation.
    • Recommendation: The primary and most robust approach is to explicitly handle the ctx::Result returned by func_A() before it's propagated further using ?, especially if func_B() needs to return a generic Result<_, anyhow::Error>. This typically involves using a match statement on the result of func_A():
      • If Err(ctx::Error::Canceled) is received, func_B() can perform cancellation-specific cleanup, log the cancellation, or transform it into an appropriate anyhow::Error if cancellation at this specific point in func_B's logic is unexpected and should be reported as a distinct failure (while still acknowledging its origin if possible).
      • If Err(ctx::Error::Internal) is received, it can be propagated (e.g., using ? if func_B returns anyhow::Error) or wrapped as needed.
      • If Ok is received, proceed as normal. Allowing the ? operator to implicitly convert ctx::Error::Canceled to a generic anyhow::Error (when func_B returns Result<_, anyhow::Error>) should be a conscious decision. It is only appropriate if func_B can genuinely treat a context cancellation originating from func_A identically to any other internal error, and the loss of the specific Canceled type information is acceptable for all callers of func_B. If precise cancellation information must be propagated out of func_B, then func_B itself should consider returning ctx::Result or a custom error enum that can distinctly represent cancellation (as shown in Scenario 2 of the conceptual example below).
    use zksync_concurrency::{ctx, time};
    
    fn operation_that_might_be_canceled(ctx: &ctx::Ctx) -> ctx::Result<String> {
        ctx.sleep(time::Duration::milliseconds(1)).await?; // This can return Err(ctx::Error::Canceled)
    
        // Or, explicitly:
        if !ctx.is_active() {
            return Err(ctx::Canceled); // Becomes ctx::Error::Canceled
        }
    
        ctx::Ok("Operation Succeeded".to_string())
    }
    
    // This function loses the specific Canceled type if operation is canceled.
    fn calling_func_that_returns_anyhow(ctx: &ctx::Ctx) -> Result<String, anyhow::Error> {
        let result = operation_that_might_be_canceled(ctx)?; // Converts ctx::Error::Canceled to anyhow::Error
        Ok(format!("Got: {}", result))
    }
    
    // This function preserves the Canceled type.
    fn calling_func_that_returns_ctx_result(ctx: &ctx::Ctx) -> ctx::Result<String> {
        let result_string = operation_that_might_be_canceled(ctx)?;
        ctx::Ok(format!("Got: {}", result_string))
    }
    
    // Example of explicit handling when returning anyhow::Error:
    async fn handler_with_explicit_handling(ctx: &ctx::Ctx) -> Result<String, anyhow::Error> {
        match operation_that_might_be_canceled(ctx) {
            Ok(result) => Ok(format!("Processed: {}", result)),
            Err(ctx::Error::Canceled(_)) => {
                // Handle cancellation specifically
                Err(anyhow::anyhow!("Operation was canceled"))
            }
            Err(ctx::Error::Internal(err)) => {
                // Propagate other errors
                Err(err.context("Operation failed"))
            }
        }
    }
    
  3. Moving Copy types into tasks:

    • When spawning tasks, Copy types are prevented from being moved into the task async block. Use ctx::NoCopy to wrap Copy types when you want to move them inside tasks, then unwrap them inside the task.
    use zksync_concurrency::{ctx, scope};
    
    let epoch = 42u64; // A Copy type
    
    // Without NoCopy - this won't compile
    scope::run!(ctx, |ctx, s| async {
        s.spawn(async {
            println!("Task processing epoch: {}", epoch);
            Ok(())
        });
        
        Ok(())
    }).await.unwrap();
    
    // With NoCopy - epoch is moved, ensuring transfer of ownership
    let nc_epoch = ctx::NoCopy(epoch);
    scope::run!(ctx, |ctx, s| async {
        s.spawn(async {
            let epoch = nc_epoch.into(); // epoch is moved
            println!("Task processing epoch: {}", epoch);
            Ok(())
        });
        // nc_epoch is no longer accessible here - it was moved
        Ok(())
    }).await.unwrap();
    

Utilities

The zksync_concurrency crate also provides several context-aware utilities:

  • ctx::channel: Bounded and unbounded MPSC (multi-producer, single-consumer) channels. send() and recv() operations are context-aware and will return Err(ctx::Canceled) if the context is canceled during the operation. Channel disconnection is generally not directly observable; tasks should rely on context cancellation for termination.
  • oneshot: Context-aware one-shot channels, for sending a single value between two tasks.
  • limiter: A rate limiter that supports delayed permit consumption. acquire() is context-aware.
  • time Module & ctx::Clock:
    • time::Duration, time::Instant, time::Utc, time::Deadline.
    • ctx::Clock (accessible via ctx.clock(), ctx.now(), ctx.now_utc(), ctx.sleep()) provides an abstraction over time sources. This is particularly useful for testing, where a ManualClock can be used to control time progression deterministically.
  • io Module: Context-aware asynchronous I/O operations (e.g., read, read_exact, write_all) built on top of tokio::io. These operations will honor context cancellation.
  • net Module: Context-aware network utilities, such as Host::resolve() for DNS resolution and tcp::connect/tcp::accept for TCP operations, all respecting context cancellation.
  • sync Module: Context-aware synchronization primitives like Mutex (sync::lock), Semaphore (sync::acquire), and Notify (sync::notified). It also includes a prunable_mpsc channel where messages can be filtered or pruned based on predicates.
  • signal::Once: A simple signal that can be sent once and awaited by multiple tasks, also context-aware.

Best Practices

  1. Pass &ctx::Ctx Everywhere: Any function that might block, perform I/O, or is part of a larger cancellable operation should accept a &ctx::Ctx as its first argument.
  2. Favor ctx.wait(): When awaiting a future that is not itself context-aware, wrap it with ctx.wait(your_future).await? to make it responsive to cancellation.
  3. Check ctx.is_active() in Loops: In long-running computations or loops, periodically check ctx.is_active() and return Err(ctx::Canceled) or clean up and exit if it's false.
  4. Use scope for Related Tasks: Group related concurrent tasks within a scope to ensure proper lifecycle management and error propagation.
  5. Return Result<T, YourError> from Tasks: Tasks spawned in a scope should return a Result. The error type YourError should be consistent across the scope (often anyhow::Error or ctx::Error).
  6. Keep Tasks Focused: Tasks should ideally represent a single logical unit of work.
  7. Resource Cleanup: When a task detects cancellation, it should clean up any resources it holds before returning.