zksync_concurrency
The `zksync_concurrency` crate provides a structured concurrency framework for Rust, inspired by Go's `context` package and `errgroup`. It aims to simplify the management of concurrent tasks, their cancellation, and error propagation in both asynchronous and synchronous code.
Core Concepts
1. `ctx::Ctx` - Context
The `ctx::Ctx` (Context) is a fundamental concept in this crate. It acts as a carrier for cancellation signals, deadlines, and other request-scoped values.
- Cancellation: A `Ctx` can be canceled, signaling all operations associated with it to terminate gracefully as soon as possible.
- Deadlines & Timeouts: A `Ctx` can have a deadline, after which it is automatically canceled. You can create child contexts with shorter timeouts.
- Propagation: Contexts are typically passed down the call stack. Functions that perform potentially long-running operations or I/O should accept a `&ctx::Ctx` argument.
- Clock Abstraction: `Ctx` provides access to a `Clock` (`ctx.now()`, `ctx.now_utc()`, `ctx.sleep()`), which can be a `RealClock` or, for testing, a `ManualClock` or `AffineClock`.
- Random Number Generator: `Ctx` also provides access to an `Rng` (`ctx.rng()`), which can be an OS-level RNG in production or a deterministic RNG in tests.
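The cancellation and deadline rules above can be modeled with the standard library alone. In this sketch, `ToyCtx`, its methods, and `Canceled` are hypothetical stand-ins, not this crate's API (the real `Ctx` integrates with async code and also carries the clock and RNG). It shows one thing: a child context has its own cancel flag and deadline, and it also stops being active as soon as its parent does.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `ctx::Canceled`.
#[derive(Debug, PartialEq)]
pub struct Canceled;

/// Hypothetical, std-only model of a context: its own cancel flag, an optional
/// deadline, and a link to the parent so that parent cancellation propagates down.
#[derive(Clone)]
pub struct ToyCtx {
    canceled: Arc<AtomicBool>,
    deadline: Option<Instant>,
    parent: Option<Arc<ToyCtx>>,
}

impl ToyCtx {
    pub fn root() -> Self {
        ToyCtx { canceled: Arc::new(AtomicBool::new(false)), deadline: None, parent: None }
    }

    /// Child context that ends when the parent ends, or after `timeout`.
    pub fn with_timeout(&self, timeout: Duration) -> Self {
        ToyCtx {
            canceled: Arc::new(AtomicBool::new(false)),
            deadline: Some(Instant::now() + timeout),
            // The clone shares the parent's cancel flag, so parent cancellation is visible.
            parent: Some(Arc::new(self.clone())),
        }
    }

    /// Cancels this context and, through `is_active`, all of its descendants.
    pub fn cancel(&self) {
        self.canceled.store(true, Ordering::SeqCst);
    }

    /// Active until canceled, past its own deadline, or its parent becomes inactive.
    pub fn is_active(&self) -> bool {
        !self.canceled.load(Ordering::SeqCst)
            && self.deadline.map_or(true, |d| Instant::now() < d)
            && self.parent.as_ref().map_or(true, |p| p.is_active())
    }

    /// Sleeps for `d`, returning `Err(Canceled)` early if the context ends first.
    pub fn sleep(&self, d: Duration) -> Result<(), Canceled> {
        let end = Instant::now() + d;
        while Instant::now() < end {
            if !self.is_active() {
                return Err(Canceled);
            }
            thread::sleep(Duration::from_millis(1));
        }
        if self.is_active() { Ok(()) } else { Err(Canceled) }
    }
}
```

Polling every millisecond keeps the sketch short; a production design would more likely wake sleepers through a notification.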
2. `scope` - Structured Concurrency
Structured concurrency ensures that concurrent tasks are managed within a well-defined scope: when a scope exits, every task spawned within it is guaranteed to have terminated, either by completing or by being canceled and winding down.
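Rust's standard library gives the same structural guarantee for OS threads through `std::thread::scope`, which makes it a handy mental model. It is only an analogy (not part of this crate, and without its cancellation or error propagation): the call cannot return until every thread spawned inside it has finished, so those threads may safely borrow local data. `parallel_sum` is a hypothetical example function.

```rust
use std::thread;

/// Sums `data` in parallel chunks. `thread::scope` joins every spawned thread
/// before returning, so the threads may borrow `data` without `Arc` or `'static`.
pub fn parallel_sum(data: &[u64], chunks: usize) -> u64 {
    assert!(chunks > 0, "chunks must be positive");
    let chunk_len = ((data.len() + chunks - 1) / chunks).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        // Joined explicitly here; any thread left unjoined would still be
        // joined automatically when the scope ends.
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```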
- `scope::run!` (async) and `scope::run_blocking!` (sync): These macros create a new scope that manages a group of tasks. Which one to use depends on whether the function creating the scope is `async` or synchronous.
  - `scope::run!(ctx, async_closure)`: Use this for asynchronous scopes. The provided closure must be `async`. The `run!` macro itself returns a future that you `.await`. Inside, you'll typically use `s.spawn()` for async tasks and `s.spawn_blocking()` for tasks that need to run on a separate thread due to blocking operations.
  - `scope::run_blocking!(ctx, sync_closure)`: Use this for synchronous scopes. The provided closure is a regular (non-`async`) closure, and the `run_blocking!` macro is a blocking call. Inside, you'll typically use `s.spawn_blocking()` for tasks, though `s.spawn()` for async sub-tasks is also possible.
- Task Spawning within a Scope:
  - Main Tasks: `s.spawn(async_fn)` or `s.spawn_blocking(sync_fn)`. The scope waits for all main tasks to complete. If any main task returns an error or is canceled, the scope is canceled.
  - Background Tasks: `s.spawn_bg(async_fn)` or `s.spawn_bg_blocking(sync_fn)`. The scope's primary work is considered complete once all main tasks finish. At that point (or if any task errors, or if the scope is explicitly canceled), the scope's context is canceled. Background tasks are also subject to this cancellation and must terminate gracefully. The `scope::run!` or `scope::run_blocking!` call returns only after all tasks, including background tasks, have fully terminated. Background tasks are useful for auxiliary operations like logging, monitoring, or handling side effects that don't define the main completion criteria of the scope.
- Cancellation:
  - Automatic: If any task (main or background) within the scope returns an `Err`, the scope's context is immediately canceled. All other tasks within that scope should then terminate gracefully.
  - Explicit: You can explicitly cancel a scope's context using `s.cancel()`.
- Error Handling:
  - The `scope::run!` or `scope::run_blocking!` macro returns a `Result`.
  - If all tasks succeed, it returns the `Ok` result of the root task provided to the macro.
  - If any task fails (returns an `Err`), the macro returns the error of the first task that failed. Subsequent errors from other tasks are ignored.
  - If a task panics, the panic is propagated after all other tasks in the scope have completed (or been canceled and terminated).
- The `JoinHandle`: When you spawn a task, you get a `JoinHandle`. You can use `handle.join(ctx).await` to get the result of that specific task. If the task itself returned an error, joining it yields `Err(ctx::Canceled)` rather than that error: the error is reported to the scope, which is then canceled, so the joiner only observes the cancellation.
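These lifecycle rules can be modeled with `std::thread::scope` plus a shared cancel flag. `Scope`, `Task`, and `run_scope` below are hypothetical, std-only names for this sketch; the real `scope::run!` is async, hands each task a `Ctx`, returns the root task's value, and supports join handles, none of which is modeled here. What the sketch does show: the first error is kept and cancels the scope, background tasks are told to stop once the main tasks finish, and the call returns only after every thread has exited. Cancellation is cooperative, so tasks must poll `is_active()`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

/// Hypothetical shared state of a toy scope: a cancel flag plus the first error.
pub struct Scope<E> {
    canceled: AtomicBool,
    first_err: Mutex<Option<E>>,
}

/// A task receives the scope so it can poll for cancellation.
pub type Task<E> = Box<dyn FnOnce(&Scope<E>) -> Result<(), E> + Send>;

impl<E> Scope<E> {
    pub fn is_active(&self) -> bool {
        !self.canceled.load(Ordering::SeqCst)
    }

    pub fn cancel(&self) {
        self.canceled.store(true, Ordering::SeqCst);
    }

    /// Keeps only the first error and cancels the scope as soon as any task fails.
    fn report(&self, res: Result<(), E>) {
        if let Err(e) = res {
            let mut slot = self.first_err.lock().unwrap();
            if slot.is_none() {
                *slot = Some(e);
            }
            drop(slot);
            self.cancel();
        }
    }
}

/// Runs main and background tasks on scoped threads. The scope is canceled when
/// any task fails or once all main tasks are done; it returns only after every
/// task, background ones included, has returned.
pub fn run_scope<E: Send + 'static>(main: Vec<Task<E>>, bg: Vec<Task<E>>) -> Result<(), E> {
    let state = Scope { canceled: AtomicBool::new(false), first_err: Mutex::new(None) };
    thread::scope(|s| {
        let st = &state;
        for task in bg {
            s.spawn(move || st.report(task(st)));
        }
        let mains: Vec<_> = main
            .into_iter()
            .map(|task| s.spawn(move || st.report(task(st))))
            .collect();
        for handle in mains {
            handle.join().unwrap(); // re-raises a task panic
        }
        st.cancel(); // main work is done: tell background tasks to stop
    }); // `thread::scope` blocks here until the background threads exit too
    match state.first_err.into_inner().unwrap() {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
```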
Error Types
- `ctx::Canceled`: A distinct error type indicating that an operation was canceled because its context was terminated. This is crucial for distinguishing cancellations from other failures.
- `ctx::Error`: An enum representing the error outcome of a cancellable operation. It has two variants:
  - `Error::Canceled(ctx::Canceled)`: The operation was canceled via its context.
  - `Error::Internal(anyhow::Error)`: The operation failed for a reason other than cancellation. The underlying cause is captured as an `anyhow::Error`, allowing rich error reporting from various sources.

  `ctx::Error` is the standard error type for many functions and methods within the `zksync_concurrency` crate.
- `ctx::Result<T>`: A convenient type alias for `std::result::Result<T, ctx::Error>`.
- `error::Wrap`: A trait similar to `anyhow::Context` for adding context to your custom error types that might embed `anyhow::Error`, while correctly handling `ctx::Canceled`.
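A std-only sketch of the idea behind `ctx::Error` and `error::Wrap`: the enum keeps cancellation as its own variant, and wrapping adds context only to internal errors, so callers can still match on `Canceled`. `CtxError`, `Canceled`, `Wrap`, `step`, and the `String` payload are hypothetical simplifications for illustration (the real `Internal` variant holds an `anyhow::Error`, and the real trait may differ in detail).

```rust
use std::fmt;

/// Hypothetical stand-in for `ctx::Canceled`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

/// Hypothetical, simplified `ctx::Error` with a `String` instead of `anyhow::Error`.
#[derive(Debug, PartialEq, Eq)]
pub enum CtxError {
    Canceled(Canceled),
    Internal(String),
}

impl From<Canceled> for CtxError {
    fn from(c: Canceled) -> Self {
        CtxError::Canceled(c)
    }
}

/// Hypothetical `Wrap`-like trait: adds context without touching cancellation.
pub trait Wrap: Sized {
    fn wrap<C: fmt::Display>(self, c: C) -> Self;
}

impl Wrap for CtxError {
    fn wrap<C: fmt::Display>(self, c: C) -> Self {
        match self {
            // Cancellation stays a distinct, matchable variant.
            CtxError::Canceled(canceled) => CtxError::Canceled(canceled),
            // Internal errors get the context prepended, anyhow-style.
            CtxError::Internal(msg) => CtxError::Internal(format!("{c}: {msg}")),
        }
    }
}

impl<T> Wrap for Result<T, CtxError> {
    fn wrap<C: fmt::Display>(self, c: C) -> Self {
        self.map_err(|e| e.wrap(c))
    }
}

/// A hypothetical cancellable step that fails with `Canceled` when inactive.
pub fn step(active: bool) -> Result<u32, CtxError> {
    if !active {
        return Err(Canceled.into());
    }
    Ok(7)
}
```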
Pitfalls: `ctx::Canceled` vs. `anyhow::Error`
It is critical to handle `ctx::Canceled` correctly to maintain its distinct semantic meaning. Improperly mixing `ctx::Error::Canceled` with general `anyhow::Error` propagation or the `anyhow::Context` trait can obscure the specific reason for termination (i.e., cancellation).
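The failure mode is easy to reproduce without the crate. In this std-only sketch, a hypothetical `CtxError` stands in for `ctx::Error`, and a plain `String` stands in for a generic error type such as `anyhow::Error`. Once `?` converts the error, the caller can only inspect a message; matching before converting keeps cancellation visible. `func_a`, `func_b_lossy`, and `func_b_explicit` are illustrative names.

```rust
use std::fmt;

/// Hypothetical stand-in for `ctx::Error`.
#[derive(Debug, PartialEq)]
pub enum CtxError {
    Canceled,
    Internal(String),
}

impl fmt::Display for CtxError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CtxError::Canceled => write!(f, "canceled"),
            CtxError::Internal(msg) => write!(f, "{msg}"),
        }
    }
}

/// Plays the role of the conversion into a generic error: only the text survives.
impl From<CtxError> for String {
    fn from(e: CtxError) -> String {
        e.to_string()
    }
}

/// Hypothetical `func_A`: fails with cancellation when `active` is false.
pub fn func_a(active: bool) -> Result<u32, CtxError> {
    if active { Ok(1) } else { Err(CtxError::Canceled) }
}

/// Lossy: `?` converts `CtxError` into `String`, so cancellation is just text.
pub fn func_b_lossy(active: bool) -> Result<u32, String> {
    Ok(func_a(active)? + 1)
}

/// Explicit: decide what cancellation means here before converting.
pub fn func_b_explicit(active: bool) -> Result<Option<u32>, String> {
    match func_a(active) {
        Ok(v) => Ok(Some(v + 1)),
        // One possible policy: treat cancellation as "no result", not a failure.
        Err(CtxError::Canceled) => Ok(None),
        Err(e @ CtxError::Internal(_)) => Err(e.into()),
    }
}
```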
- Using `anyhow::Context::context()` on `ctx::Error` (or `ctx::Result`):
  - If you have a `ctx::Result<T>` (which is `Result<T, ctx::Error>`) and you call `.context("some context")` on it, a `ctx::Error::Canceled` variant will be converted into an `anyhow::Error`.
  - The resulting `anyhow::Error` carries the new context on top of the original `Canceled` error; formatted with `{:#}`, it reads "some context: canceled".
  - Crucially, the distinct `ctx::Error::Canceled` variant is lost. The error is now a generic `anyhow::Error`, and you can no longer specifically match on `ctx::Error::Canceled`.
  - Recommendation: Always use `your_ctx_result.wrap("...")` (from `zksync_concurrency::error::Wrap`) when you want to add context to a `ctx::Result` while preserving the `ctx::Error::Canceled` variant if it occurs. If the function must return `anyhow::Result`, be aware of the conversion (see next point).

```rust
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _};

fn canceled_result() -> ctx::Result<()> {
    Err(ctx::Error::Canceled(ctx::Canceled))
}

fn example() {
    // INCORRECT - converts Canceled to a generic anyhow::Error:
    let wrong: anyhow::Result<()> = canceled_result().context("some context");
    // `wrong` is now Result<(), anyhow::Error>; you cannot match on ctx::Error::Canceled.

    // CORRECT - preserves the Canceled variant:
    let correct: ctx::Result<()> = canceled_result().wrap("some context");
    match correct {
        Err(ctx::Error::Canceled(_)) => { /* handle cancellation */ }
        Err(ctx::Error::Internal(_err)) => { /* handle other failures */ }
        Ok(()) => {}
    }
}
```
- Propagating `ctx::Error::Canceled` via `?` into a function returning `Result<_, anyhow::Error>`:
  - Suppose a function `func_A()` returns `ctx::Result<T>`, the result is `Err(ctx::Error::Canceled)`, and you use `func_A()?` inside another function `func_B()` that is declared to return `Result<U, anyhow::Error>`. The `?` operator performs an implicit conversion from `ctx::Error` to `anyhow::Error` (because `ctx::Error` implements `Into<anyhow::Error>`), so the `ctx::Error::Canceled` becomes a generic `anyhow::Error` whose message is typically just "canceled".
  - The specific `ctx::Canceled` type information is lost at the level of `func_B()`'s return type: `func_B()` now only knows it got an `anyhow::Error`, not specifically a cancellation.
  - Recommendation: The primary and most robust approach is to handle the `ctx::Result` returned by `func_A()` explicitly before propagating it with `?`, especially if `func_B()` needs to return a generic `Result<_, anyhow::Error>`. This typically involves a `match` on the result of `func_A()`:
    - If `Err(ctx::Error::Canceled)` is received, `func_B()` can perform cancellation-specific cleanup or log the cancellation. If cancellation at this specific point in `func_B`'s logic is unexpected and should be reported as a distinct failure, it can instead transform it into an appropriate `anyhow::Error` (while still acknowledging its origin if possible).
    - If `Err(ctx::Error::Internal)` is received, it can be propagated (e.g., using `?` if `func_B` returns `anyhow::Error`) or wrapped as needed.
    - If `Ok` is received, proceed as normal.

    Allowing the `?` operator to implicitly convert `ctx::Error::Canceled` to a generic `anyhow::Error` (when `func_B` returns `Result<_, anyhow::Error>`) should be a conscious decision. It is only appropriate if `func_B` can genuinely treat a context cancellation originating from `func_A` identically to any other internal error, and the loss of the specific `Canceled` type information is acceptable for all callers of `func_B`. If precise cancellation information must be propagated out of `func_B`, then `func_B` itself should consider returning `ctx::Result` or a custom error enum that can distinctly represent cancellation (as shown in Scenario 2 of the conceptual example below).
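```rust
use zksync_concurrency::{ctx, error::Wrap as _};

// Hypothetical stand-in for `func_A()`; a real one would await ctx-aware operations.
async fn func_a(_ctx: &ctx::Ctx) -> ctx::Result<u64> { Ok(42) }

// Scenario 1: This function loses the specific Canceled type if the operation is canceled.
async fn lossy(ctx: &ctx::Ctx) -> anyhow::Result<u64> { Ok(func_a(ctx).await?) }

// Scenario 2: This function preserves the Canceled type.
async fn preserving(ctx: &ctx::Ctx) -> ctx::Result<u64> { func_a(ctx).await.wrap("preserving") }

// Scenario 3: Example of explicit handling when returning anyhow::Error:
async fn explicit(ctx: &ctx::Ctx) -> anyhow::Result<u64> {
    match func_a(ctx).await {
        Ok(v) => Ok(v),
        Err(ctx::Error::Canceled(_)) => anyhow::bail!("explicit: canceled"), // cleanup/logging here
        Err(ctx::Error::Internal(err)) => Err(err.context("explicit")),
    }
}
```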
- Moving `Copy` types into tasks:
  - When spawning tasks, `Copy` types are prevented from being moved into the task's async block. Use `ctx::NoCopy` to wrap `Copy` types when you want to move them into tasks, then unwrap them inside the task.
```rust
use /* ... */;

let epoch = 42u64; // A Copy type

// Without NoCopy - this won't compile
run!(/* ... */).await.unwrap();

// With NoCopy - epoch is moved, ensuring transfer of ownership
let nc_epoch = NoCopy(epoch);
run!(/* ... */).await.unwrap();
```
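One plausible explanation is Rust's closure capture rules: a non-`move` closure or async block that uses a `Copy` value captures it by reference, whereas moving out of a non-`Copy` wrapper forces capture by value. The std-only sketch below shows that mechanism with a hypothetical `NoCopy` wrapper and `std::thread::spawn` (which requires a `'static` closure); it illustrates the capture behavior, not the crate's `ctx::NoCopy` itself, and `spawn_with_epoch` and `into_inner` are illustrative names.

```rust
use std::thread;

/// Hypothetical non-`Copy` wrapper in the spirit of `ctx::NoCopy`.
pub struct NoCopy<T>(pub T);

impl<T> NoCopy<T> {
    /// Unwraps the value inside the task.
    pub fn into_inner(self) -> T {
        self.0
    }
}

/// Spawns a thread that takes ownership of `epoch` without writing `move`.
pub fn spawn_with_epoch(epoch: u64) -> thread::JoinHandle<u64> {
    // `thread::spawn(|| epoch + 1)` would not compile: the non-`move` closure
    // would borrow the local `epoch`, which does not live long enough.
    let nc_epoch = NoCopy(epoch);
    thread::spawn(|| {
        // Moving out of the wrapper forces the closure to capture it by value.
        let epoch = nc_epoch.into_inner();
        epoch + 1
    })
}
```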
Utilities
The `zksync_concurrency` crate also provides several context-aware utilities:
- `ctx::channel`: Bounded and unbounded MPSC (multi-producer, single-consumer) channels. `send()` and `recv()` operations are context-aware and will return `Err(ctx::Canceled)` if the context is canceled during the operation. Channel disconnection is generally not directly observable; tasks should rely on context cancellation for termination.
- `oneshot`: Context-aware one-shot channels for sending a single value between two tasks.
- `limiter`: A rate limiter that supports delayed permit consumption. `acquire()` is context-aware.
- `time` Module & `ctx::Clock`: `time::Duration`, `time::Instant`, `time::Utc`, `time::Deadline`. `ctx::Clock` (accessible via `ctx.clock()`, `ctx.now()`, `ctx.now_utc()`, `ctx.sleep()`) provides an abstraction over time sources. This is particularly useful for testing, where a `ManualClock` can be used to control time progression deterministically.
- `io` Module: Context-aware asynchronous I/O operations (e.g., `read`, `read_exact`, `write_all`) built on top of `tokio::io`. These operations honor context cancellation.
- `net` Module: Context-aware network utilities, such as `Host::resolve()` for DNS resolution and `tcp::connect`/`tcp::accept` for TCP operations, all respecting context cancellation.
- `sync` Module: Context-aware synchronization primitives like `Mutex` (`sync::lock`), `Semaphore` (`sync::acquire`), and `Notify` (`sync::notified`). It also includes a `prunable_mpsc` channel where messages can be filtered or pruned based on predicates.
- `signal::Once`: A simple signal that can be sent once and awaited by multiple tasks, also context-aware.
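As a rough illustration of what "context-aware" means for a channel receive, here is a std-only sketch built on `std::sync::mpsc`. `recv_or_canceled` and the `AtomicBool` cancel flag are hypothetical stand-ins for `ctx::channel` and `Ctx`; the sketch polls with `recv_timeout`, whereas a real implementation would more likely race the receive against a cancellation notification. Mirroring the note on `ctx::channel` above, disconnection is not reported: the receiver keeps waiting until it is canceled.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `ctx::Canceled`.
#[derive(Debug, PartialEq)]
pub struct Canceled;

/// Waits for the next message, giving up with `Err(Canceled)` once `canceled` is set.
pub fn recv_or_canceled<T>(rx: &Receiver<T>, canceled: &AtomicBool) -> Result<T, Canceled> {
    loop {
        if canceled.load(Ordering::SeqCst) {
            return Err(Canceled);
        }
        match rx.recv_timeout(Duration::from_millis(1)) {
            Ok(v) => return Ok(v),
            Err(RecvTimeoutError::Timeout) => {}
            // Disconnection is deliberately not surfaced; only cancellation ends the wait.
            Err(RecvTimeoutError::Disconnected) => thread::sleep(Duration::from_millis(1)),
        }
    }
}
```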
Best Practices
- Pass `&ctx::Ctx` Everywhere: Any function that might block, perform I/O, or is part of a larger cancellable operation should accept a `&ctx::Ctx` as its first argument.
- Favor `ctx.wait()`: When `await`ing a future that is not itself context-aware, wrap it with `ctx.wait(your_future).await?` to make it responsive to cancellation.
- Check `ctx.is_active()` in Loops: In long-running computations or loops, periodically check `ctx.is_active()`; if it returns `false`, clean up and return `Err(ctx::Canceled)`.
- Use `scope` for Related Tasks: Group related concurrent tasks within a `scope` to ensure proper lifecycle management and error propagation.
- Return `Result<T, YourError>` from Tasks: Tasks spawned in a scope should return a `Result`. The error type `YourError` should be consistent across the scope (often `anyhow::Error` or `ctx::Error`).
- Keep Tasks Focused: Tasks should ideally represent a single logical unit of work.
- Resource Cleanup: When a task detects cancellation, it should clean up any resources it holds before returning.
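The "check `ctx.is_active()` in loops" practice can be sketched with std only. Here an `AtomicBool` plays the role of the context (a hypothetical simplification), and `long_sum`, `Canceled`, and `CHECK_EVERY` are illustrative names. The loop checks the flag every `CHECK_EVERY` iterations so the check stays cheap, and it returns a cancellation error instead of a partial result.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for `ctx::Canceled`.
#[derive(Debug, PartialEq)]
pub struct Canceled;

/// How often the loop polls for cancellation (an arbitrary choice for this sketch).
const CHECK_EVERY: u64 = 1024;

/// Sums `0..n`, bailing out promptly if `canceled` becomes true.
pub fn long_sum(n: u64, canceled: &AtomicBool) -> Result<u64, Canceled> {
    let mut acc: u64 = 0;
    for i in 0..n {
        if i % CHECK_EVERY == 0 && canceled.load(Ordering::Relaxed) {
            // Release any resources held by the loop here, then report cancellation.
            return Err(Canceled);
        }
        acc = acc.wrapping_add(i);
    }
    Ok(acc)
}
```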