pub struct Context<'a> { /* private fields */ }
Per-delivery context, threaded through middleware and into the handler.
Carries the channel name, a working copy of the message
headers (middleware may enrich them for the handler; the broker message
itself is untouched), shared application state, a per-delivery
Extensions type-map (get / insert), and access to named
publishers for publishing from inside a handler. The headers copy is made
lazily on the first headers_mut call. Outgoing messages do not inherit
the working copy: replies and manual publishes start from fresh headers, shaped by the publish pipeline.
Implementations§
impl<'a> Context<'a>
pub fn publisher(&self, name: &str) -> Option<ScopedPublisher<'_>>
Resolves a named publisher (registered with
RustStream::publisher) so the handler can publish through it.
Sends through it run the scope’s publish middleware (envelope, metrics), the same chain as a
macro reply, so a manual publish does not bypass the pipeline. Returns None if no
publisher is registered under name.
pub fn headers_mut(&mut self) -> &mut Headers
Returns a mutable reference to the working copy of the message headers. The first call clones the message headers; later calls return the same copy.
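The clone-on-first-mutation behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `LazyHeaders`, its `headers` and `has_copied` methods, and the `HashMap`-based `Headers` alias are all hypothetical stand-ins.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's `Headers` type.
type Headers = HashMap<String, String>;

/// Hypothetical per-delivery view: borrows the broker headers and clones
/// them only when a caller first asks for mutable access.
struct LazyHeaders<'a> {
    original: &'a Headers,
    working: Option<Headers>,
}

impl<'a> LazyHeaders<'a> {
    fn new(original: &'a Headers) -> Self {
        Self { original, working: None }
    }

    /// Read access: the working copy if one exists, otherwise the original.
    fn headers(&self) -> &Headers {
        self.working.as_ref().unwrap_or(self.original)
    }

    /// Mutable access: clones on the first call, reuses the copy afterwards.
    fn headers_mut(&mut self) -> &mut Headers {
        let original = self.original;
        self.working.get_or_insert_with(|| original.clone())
    }

    /// True once the working copy has been created.
    fn has_copied(&self) -> bool {
        self.working.is_some()
    }
}
```

In this sketch, a delivery whose middleware never mutates the headers pays no cloning cost, and the borrowed original is never modified.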
pub fn state(&self) -> &State
Returns the shared application State, the type-map set once at build with
RustStream::insert_state. Read a value from it with
State::get.
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(prefix) = ctx.state().get::<String>() {
        let _ = prefix;
    }
    HandlerResult::Ack
}
pub fn get<T: Any + Send + Sync>(&self) -> Option<&T>
Returns the per-delivery Extensions value of type T, if any.
This reads the per-delivery type-map (broker-contributed metadata, middleware-set values),
not the app state. For shared app state use ctx.state().get::<T>().
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(span_id) = ctx.get::<u64>() {
        let _ = span_id;
    }
    HandlerResult::Ack
}
pub fn insert<T: Any + Send + Sync>(&mut self, value: T)
Inserts a per-delivery Extensions value, replacing any previous value of the same type.
Middleware uses this to hand typed data to downstream handlers (an authenticated user, a correlation id) without serializing it into the headers.
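A type-map of this kind is commonly built on `TypeId` and `Box<dyn Any>`. The sketch below is a minimal standalone version showing the replace-on-insert and lookup-by-type behaviour. `TypeMap`, `CorrelationId`, and `UserId` are hypothetical names introduced for illustration, and the real Extensions type may be implemented differently.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical stand-in for a per-delivery `Extensions` type-map.
#[derive(Default)]
struct TypeMap {
    slots: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl TypeMap {
    /// Stores `value` under its type, replacing any previous value of that type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns the stored value of type `T`, if any.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

/// Newtypes keep two values with the same underlying type from colliding.
#[derive(Debug, PartialEq)]
struct CorrelationId(u64);

#[derive(Debug, PartialEq)]
struct UserId(u64);
```

Because the key is the type itself, inserting a bare `u64` from two different middlewares would overwrite one with the other. Wrapping each value in its own newtype, as above, avoids that.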
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    ctx.insert(123u64);
    assert_eq!(ctx.get::<u64>(), Some(&123));
    HandlerResult::Ack
}
pub fn after(&mut self, outcome: HandlerResult) -> After<'_, 'a>
Begins registering a post-settle hook gated on outcome.
The returned builder’s then registers a future that the dispatcher runs
once the message has been settled, but only if the actual settlement matches outcome by
kind. The four kinds are distinct: HandlerResult::Ack, HandlerResult::drop (nack
without requeue), HandlerResult::retry (nack with requeue), and
HandlerResult::retry_after (which matches regardless of the delay). So a hook gated on
drop() does not fire on a retry() settlement, and vice versa. Multiple hooks accumulate
and every matching one runs.
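Matching "by kind" can be modelled with `std::mem::discriminant`, which compares enum variants while ignoring their payloads. The sketch below assumes a hypothetical `Outcome` enum standing in for the four settlement kinds. It is not the crate's `HandlerResult`, and the dispatcher may compare outcomes differently.

```rust
use std::mem::discriminant;
use std::time::Duration;

/// Hypothetical model of the four settlement kinds (not the crate's `HandlerResult`).
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Ack,
    Drop,
    Retry,
    RetryAfter(Duration),
}

/// Compares by kind only, so `RetryAfter` matches whatever the delay.
fn same_kind(gate: &Outcome, actual: &Outcome) -> bool {
    discriminant(gate) == discriminant(actual)
}

/// Returns the label of every registered hook whose gate matches `actual`.
fn matching_hooks<'h>(hooks: &[(Outcome, &'h str)], actual: &Outcome) -> Vec<&'h str> {
    hooks
        .iter()
        .filter(|(gate, _)| same_kind(gate, actual))
        .map(|(_, label)| *label)
        .collect()
}
```

In this model, two hooks gated on `Ack` are both selected on an ack, a hook gated on `RetryAfter` with a one-second delay is selected by a settlement with a thirty-second delay, and a hook gated on `Drop` is not selected by a `Retry`.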
The hook is scoped to the whole delivery. On the batch path there is one Context per batch,
so hooks are scoped to the entire batch. Because a batch has per-element outcomes, an
outcome gate is ill-defined there: hooks registered through after are not honoured, and only
after_settle hooks (which run regardless of outcome) fire, once the whole batch has settled (see that method).
§Cancel safety
Post-settle hooks are at-most-once: the message is already settled before any hook runs, so
a hook that panics, or that is lost when the process crashes, never causes a redelivery and
never blocks the next delivery. A graceful shutdown drains in-flight hooks (bounded by the
app’s shutdown_timeout); an aborted shutdown may
drop them.
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, Handler, HandlerResult};

fn use_after<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after(HandlerResult::Ack)
            .then(async move { /* runs only after this message is acked */ });
        async { HandlerResult::Ack }
    };
}
pub fn after_ack(&mut self, fut: impl Future<Output = ()> + Send + 'static)
Registers a post-settle hook that runs only after the message is acked.
Sugar for self.after(HandlerResult::Ack).then(fut); see after for the
gating and cancel-safety semantics.
§Cancel safety
At-most-once, as for after: the ack has already happened, so a lost hook
never redelivers.
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

fn use_after_ack<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after_ack(async move { /* fire-and-forget once acked */ });
        async { HandlerResult::Ack }
    };
}
pub fn after_settle(&mut self, fut: impl Future<Output = ()> + Send + 'static)
Registers a post-settle hook that runs after the message settles, whatever the outcome.
Unlike after this has no outcome gate, so it fires on Ack, Drop,
Retry, and RetryAfter alike. It is the only post-settle form honoured on the batch path, where the
per-element outcomes make an outcome gate ill-defined; there it runs once after the whole
batch has been settled.
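The difference between gated and ungated hooks on the single-message and batch paths can be sketched as a small selection function. Everything here is a hypothetical model built from the description above (`Kind`, `Settled`, `Hook`, and `hooks_to_run` are illustrative names, and delays are omitted); the real dispatcher is not shown in this documentation.

```rust
/// Hypothetical outcome kinds; delays are omitted for brevity.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Ack,
    Drop,
    Retry,
}

/// What was settled: one message, or a batch with per-element outcomes.
enum Settled {
    Single(Kind),
    Batch(Vec<Kind>),
}

/// A registered hook: gated on one kind, or ungated (the `after_settle` form).
enum Hook {
    Gated(Kind, &'static str),
    Always(&'static str),
}

/// Selects the hooks that fire for a given settlement in this model.
fn hooks_to_run(hooks: &[Hook], settled: &Settled) -> Vec<&'static str> {
    hooks
        .iter()
        .filter_map(|hook| match (hook, settled) {
            // Ungated hooks fire on every settlement, single or batch.
            (Hook::Always(label), _) => Some(*label),
            // Gated hooks fire only for a single message of the same kind.
            (Hook::Gated(gate, label), Settled::Single(actual)) if gate == actual => Some(*label),
            // Gated hooks on a batch, or with a non-matching kind, are skipped.
            _ => None,
        })
        .collect()
}
```

Under these assumptions a batch settlement selects only the ungated hook, whatever mix of per-element outcomes the batch contains.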
§Cancel safety
At-most-once, as for after.
§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

fn use_after_settle<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after_settle(async move { /* runs once the message is settled, any outcome */ });
        async { HandlerResult::retry() }
    };
}