Skip to main content

Context

Struct Context 

Source
pub struct Context<'a> { /* private fields */ }
Expand description

Per-delivery context, threaded through middleware and into the handler.

Carries the channel (name), a working copy of the message headers (middleware may enrich them for the handler; the broker message itself is untouched), shared application state, a per-delivery Extensions type-map (get / insert), and access to named publishers for publishing from inside a handler. The headers copy is made lazily on the first headers_mut call. Outgoing messages do not inherit it: replies and manual publishes start from fresh headers, shaped by the publish pipeline.

Implementations§

Source§

impl<'a> Context<'a>

Source

pub fn name(&self) -> &str

The channel (name / subject) the message arrived on.

Source

pub fn publisher(&self, name: &str) -> Option<ScopedPublisher<'_>>

Resolves a named publisher (registered with RustStream::publisher) to publish from this handler.

Sends through it run the scope’s publish middleware (envelope, metrics) - the same chain as a macro reply - so a manual publish is not a hole in the pipeline. Returns None if no publisher is registered under name.

Source

pub fn headers(&self) -> &Headers

The working copy of the message headers.

Source

pub fn headers_mut(&mut self) -> &mut Headers

The working copy of the message headers, mutably. The first call clones the message headers; later calls return the same copy.

Source

pub fn state(&self) -> &State

Returns the shared application State, the type-map set once at build with RustStream::insert_state. Read a value from it with State::get.

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(prefix) = ctx.state().get::<String>() {
        let _ = prefix;
    }
    HandlerResult::Ack
}
Source

pub fn get<T: Any + Send + Sync>(&self) -> Option<&T>

Returns the per-delivery Extensions value of type T, if any.

This reads the per-delivery type-map (broker-contributed metadata, middleware-set values), not the app state. For shared app state use ctx.state().get::<T>().

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    if let Some(span_id) = ctx.get::<u64>() {
        let _ = span_id;
    }
    HandlerResult::Ack
}
Source

pub fn insert<T: Any + Send + Sync>(&mut self, value: T)

Inserts a per-delivery Extensions value, replacing any previous value of the same type.

Middleware uses this to hand typed data to downstream handlers (an authenticated user, a correlation id) without serializing it into the headers.

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

async fn handle<M: IncomingMessage>(_msg: &M, ctx: &mut Context<'_>) -> HandlerResult {
    ctx.insert(123u64);
    assert_eq!(ctx.get::<u64>(), Some(&123));
    HandlerResult::Ack
}
Source

pub fn after(&mut self, outcome: HandlerResult) -> After<'_, 'a>

Begins registering a post-settle hook gated on outcome.

The returned builder’s then registers a future that the dispatcher runs once the message has been settled, but only if the actual settlement matches outcome by kind. The four kinds are distinct: HandlerResult::Ack, HandlerResult::drop (nack without requeue), HandlerResult::retry (nack with requeue), and HandlerResult::retry_after (which matches regardless of the delay). So a hook gated on drop() does not fire on a retry() settlement, and vice versa. Multiple hooks accumulate and every matching one runs.

The hook is scoped to the whole delivery. On the batch path a Context is one per batch, so a hook registered here runs after the entire batch settles; because a batch has per-element outcomes, the outcome gate is ignored there and only after_settle hooks (which run regardless) fire (see that method).

§Cancel safety

Post-settle hooks are at-most-once: the message is already settled before any hook runs, so a hook that panics, or that is lost when the process crashes, never causes a redelivery and never blocks the next delivery. A graceful shutdown drains in-flight hooks (bounded by the app’s shutdown_timeout); an aborted shutdown may drop them.

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, Handler, HandlerResult};

fn use_after<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after(HandlerResult::Ack)
            .then(async move { /* runs only after this message is acked */ });
        async { HandlerResult::Ack }
    };
}
Source

pub fn after_ack(&mut self, fut: impl Future<Output = ()> + Send + 'static)

Registers a post-settle hook that runs only after the message is acked.

Sugar for self.after(HandlerResult::Ack).then(fut); see after for the gating and cancel-safety semantics.

§Cancel safety

At-most-once, as for after: the ack has already happened, so a lost hook never redelivers.

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

fn use_after_ack<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after_ack(async move { /* fire-and-forget once acked */ });
        async { HandlerResult::Ack }
    };
}
Source

pub fn after_settle(&mut self, fut: impl Future<Output = ()> + Send + 'static)

Registers a post-settle hook that runs after the message settles, whatever the outcome.

Unlike after this has no outcome gate, so it fires on Ack, Drop, Retry, and RetryAfter alike. It is the only post-settle form honoured on the batch path, where the per-element outcomes make an outcome gate ill-defined; there it runs once after the whole batch has been settled.

§Cancel safety

At-most-once, as for after.

§Examples
use ruststream::IncomingMessage;
use ruststream::runtime::{Context, HandlerResult};

fn use_after_settle<M: IncomingMessage + 'static>() {
    let _handler = |_msg: &M, ctx: &mut Context| {
        ctx.after_settle(async move { /* runs once the message is settled, any outcome */ });
        async { HandlerResult::retry() }
    };
}

Trait Implementations§

Source§

impl Debug for Context<'_>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<'a> !RefUnwindSafe for Context<'a>

§

impl<'a> !Sync for Context<'a>

§

impl<'a> !UnwindSafe for Context<'a>

§

impl<'a> Freeze for Context<'a>

§

impl<'a> Send for Context<'a>

§

impl<'a> Unpin for Context<'a>

§

impl<'a> UnsafeUnpin for Context<'a>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more