Struct EmergentHandler 

pub struct EmergentHandler { /* private fields */ }

A Handler primitive that subscribes to and publishes messages.

Handlers are the transformation layer in a workflow. They receive messages from Sources or other Handlers, process them, and emit new messages.

§Example

use emergent_client::{EmergentHandler, EmergentMessage};
use futures::StreamExt;

// subscribe takes &mut self, so the handler binding must be mutable.
let mut handler = EmergentHandler::connect("my_filter").await?;
let mut stream = handler.subscribe("timer.tick").await?;

while let Some(msg) = stream.next().await {
    let output = EmergentMessage::new("timer.filtered")
        .with_causation_id(msg.id());
    handler.publish(output).await?;
}

Implementations§

impl EmergentHandler

pub async fn connect(name: &str) -> Result<Self>

Connect to the Emergent engine as a Handler.

§Errors

Returns an error if the connection fails.

pub async fn connect_to(name: &str, socket_path: &Path) -> Result<Self>

Connect to the Emergent engine at a specific socket path.

This is useful for testing or when connecting to a non-default socket.

§Errors

Returns an error if the connection fails.

pub async fn subscribe(&mut self, types: impl IntoSubscription) -> Result<MessageStream>

Subscribe to message types and return a stream of incoming messages.

The SDK handles system.shutdown messages automatically: when the engine signals shutdown for handlers, the stream closes gracefully.

§Examples

// Single topic
let stream = handler.subscribe("timer.tick").await?;

// Multiple topics with array
let stream = handler.subscribe(["timer.tick", "timer.filtered"]).await?;

// From a Vec
let topics = vec!["timer.tick".to_string()];
let stream = handler.subscribe(topics).await?;

§Errors

Returns an error if the subscription fails.
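
The examples above pass a single string, an array, and a Vec<String> to the same parameter. This page does not show how IntoSubscription is defined, so the following is only a sketch of how a conversion trait could accept those three shapes. IntoTopics and topics_of are hypothetical names used for illustration, not part of emergent_client.

```rust
/// Hypothetical stand-in for a conversion trait such as `IntoSubscription`.
/// The real trait's methods are not documented on this page.
trait IntoTopics {
    fn into_topics(self) -> Vec<String>;
}

// A single topic name.
impl IntoTopics for &str {
    fn into_topics(self) -> Vec<String> {
        vec![self.to_string()]
    }
}

// A fixed-size array of topic names.
impl<const N: usize> IntoTopics for [&str; N] {
    fn into_topics(self) -> Vec<String> {
        self.iter().map(|s| s.to_string()).collect()
    }
}

// An owned list of topic names is passed through unchanged.
impl IntoTopics for Vec<String> {
    fn into_topics(self) -> Vec<String> {
        self
    }
}

/// Accepts any supported shape, in the same style as
/// `subscribe(types: impl IntoSubscription)`.
fn topics_of(types: impl IntoTopics) -> Vec<String> {
    types.into_topics()
}

fn main() {
    println!("{:?}", topics_of("timer.tick"));
    println!("{:?}", topics_of(["timer.tick", "timer.filtered"]));
    println!("{:?}", topics_of(vec!["timer.tick".to_string()]));
}
```

Normalising every input to one owned list keeps the subscribing code itself free of per-type branches.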

pub async fn messages(name: impl Into<String>, _types: impl IntoSubscription) -> Result<(Self, MessageStream)>

Convenience constructor that connects, fetches the subscriptions configured for this handler, and returns (handler, stream) in a single call.

§Errors

Returns an error if connection or subscription fails.

pub async fn publish(&self, message: EmergentMessage) -> Result<()>

Publish a message to the engine (fire-and-forget).

§Errors

Returns an error if the message cannot be sent.

pub async fn publish_ack(&self, message: EmergentMessage) -> Result<()>

Publish a message with broker acknowledgment (backpressure).

Unlike publish, this waits for the engine’s message broker to confirm it has processed and forwarded the message before returning. This provides natural backpressure: the caller cannot outpace the broker.

Used internally by publish_all and publish_stream.

§Errors

Returns an error if the broker rejects the message or times out.
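
The difference between fire-and-forget and acknowledged publishing can be modelled with standard-library channels. The sketch below is not the crate's implementation: Envelope, spawn_broker, and the free functions publish and publish_ack are hypothetical stand-ins, and a thread plays the role of the engine's broker. It only illustrates why waiting for an acknowledgment prevents the caller from running ahead.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;
use std::time::Duration;

/// A payload plus an optional reply channel for the broker's acknowledgment.
struct Envelope {
    payload: String,
    ack: Option<Sender<()>>,
}

/// Fire-and-forget: returns as soon as the message is queued.
fn publish(broker: &Sender<Envelope>, payload: &str) -> Result<(), String> {
    broker
        .send(Envelope { payload: payload.to_string(), ack: None })
        .map_err(|e| e.to_string())
}

/// Acknowledged: blocks until the broker confirms or the timeout elapses.
fn publish_ack(broker: &Sender<Envelope>, payload: &str, timeout: Duration) -> Result<(), String> {
    let (ack_tx, ack_rx) = mpsc::channel();
    broker
        .send(Envelope { payload: payload.to_string(), ack: Some(ack_tx) })
        .map_err(|e| e.to_string())?;
    ack_rx.recv_timeout(timeout).map_err(|e| e.to_string())
}

/// Hypothetical broker: forwards each payload, then acknowledges if asked to.
fn spawn_broker() -> (Sender<Envelope>, thread::JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel::<Envelope>();
    let handle = thread::spawn(move || {
        let mut forwarded = Vec::new();
        for envelope in rx {
            forwarded.push(envelope.payload);
            if let Some(ack) = envelope.ack {
                // The sender may have given up waiting; ignore that case.
                let _ = ack.send(());
            }
        }
        forwarded
    });
    (tx, handle)
}

fn main() {
    let (broker, handle) = spawn_broker();
    publish(&broker, "fast").unwrap();
    publish_ack(&broker, "confirmed", Duration::from_secs(1)).unwrap();
    drop(broker); // closing the channel lets the broker thread finish
    println!("{:?}", handle.join().unwrap()); // ["fast", "confirmed"]
}
```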

pub async fn publish_all(&self, messages: impl IntoIterator<Item = EmergentMessage>) -> Result<usize>

Publish all messages from an iterator.

Sends each message individually with broker acknowledgment so subscribers begin consuming immediately. Stops on the first error.

Returns the number of messages successfully published.

§Errors

Returns the first publish error encountered.
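
The stop-on-first-error contract described above can be sketched synchronously. This is an illustration under assumptions, not the crate's code: publish_all_sketch is a hypothetical name, and a closure stands in for publish_ack.

```rust
/// Publishes items one at a time through `publish_ack`, stopping at the
/// first error. On success, returns how many items were published.
fn publish_all_sketch<T, E>(
    messages: impl IntoIterator<Item = T>,
    mut publish_ack: impl FnMut(T) -> Result<(), E>,
) -> Result<usize, E> {
    let mut published = 0;
    for message in messages {
        // `?` returns the first error and leaves later messages unsent.
        publish_ack(message)?;
        published += 1;
    }
    Ok(published)
}

fn main() {
    // Every message is acknowledged.
    let all_ok: Result<usize, String> = publish_all_sketch(vec!["a", "b", "c"], |_| Ok(()));
    println!("{all_ok:?}"); // Ok(3)

    // A hypothetical broker that rejects "b": "a" is delivered, "c" is never tried.
    let mut delivered = Vec::new();
    let failed = publish_all_sketch(vec!["a", "b", "c"], |m| {
        if m == "b" {
            Err(format!("rejected {m}"))
        } else {
            delivered.push(m);
            Ok(())
        }
    });
    println!("{failed:?}, delivered = {delivered:?}"); // Err("rejected b"), delivered = ["a"]
}
```

Note that with a Result<usize> return type the count is only available on success; in this sketch a caller that needs to know how far a failed batch got has to track it separately, as delivered does here.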

pub async fn publish_stream<S>(&self, stream: S) -> Result<usize>
where S: Stream<Item = EmergentMessage> + Unpin,

Publish messages from an async stream.

Consumes the stream, publishing each message individually with broker acknowledgment so subscribers begin consuming immediately. Stops on the first publish error or when the stream ends.

Returns the number of messages successfully published.

§Errors

Returns the first publish error encountered.

pub async fn stream_offer(&self, message_type: &str, items: impl IntoIterator<Item = Value>, pull_stream: &mut MessageStream, timeout: Duration) -> Result<usize>

Offer items as a pull-based stream with consumer-driven backpressure.

Publishes stream.ready, then serves items one at a time as the consumer sends stream.pull requests. Publishes stream.end when exhausted.

§Errors

Returns an error if the stream times out or the pull stream closes.

pub async fn stream_consume(&self, message_type: &str, source_stream: &mut MessageStream, timeout: Duration, on_item: impl FnMut(EmergentMessage)) -> Result<usize>

Consume a pull-based stream, yielding items via callback.

Waits for stream.ready, then sends stream.pull requests automatically after each item is consumed. Stops on stream.end.

§Errors

Returns an error if the stream times out or the source stream closes.
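
The ready / pull / item / end handshake shared by stream_offer and stream_consume can be modelled with two standard-library channels. This is a sketch under assumptions rather than the SDK's implementation: the Offer and Pull types, the offer, consume, and run_demo functions, and the choice to send the end marker in response to the final pull are all hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Producer-to-consumer messages (stand-ins for stream.ready, an item, stream.end).
#[derive(Debug)]
enum Offer {
    Ready,
    Item(u32),
    End,
}

/// Consumer-to-producer request (stand-in for stream.pull).
struct Pull;

/// Producer side: announce readiness, then serve one item per pull.
fn offer(
    items: impl IntoIterator<Item = u32>,
    to_consumer: &Sender<Offer>,
    pulls: &Receiver<Pull>,
    timeout: Duration,
) -> Result<usize, String> {
    to_consumer.send(Offer::Ready).map_err(|e| e.to_string())?;
    let mut items = items.into_iter();
    let mut served = 0;
    loop {
        // Nothing is sent until the consumer asks for it.
        pulls.recv_timeout(timeout).map_err(|e| e.to_string())?;
        match items.next() {
            Some(item) => {
                to_consumer.send(Offer::Item(item)).map_err(|e| e.to_string())?;
                served += 1;
            }
            None => {
                to_consumer.send(Offer::End).map_err(|e| e.to_string())?;
                return Ok(served);
            }
        }
    }
}

/// Consumer side: wait for Ready, then pull again after each item is handled.
fn consume(
    from_producer: &Receiver<Offer>,
    pulls: &Sender<Pull>,
    timeout: Duration,
    mut on_item: impl FnMut(u32),
) -> Result<usize, String> {
    match from_producer.recv_timeout(timeout).map_err(|e| e.to_string())? {
        Offer::Ready => {}
        other => return Err(format!("expected Ready, got {other:?}")),
    }
    let mut consumed = 0;
    loop {
        pulls.send(Pull).map_err(|e| e.to_string())?;
        match from_producer.recv_timeout(timeout).map_err(|e| e.to_string())? {
            Offer::Item(item) => {
                on_item(item);
                consumed += 1;
            }
            Offer::End => return Ok(consumed),
            Offer::Ready => return Err("unexpected second Ready".to_string()),
        }
    }
}

/// Runs the producer on its own thread and the consumer on the current one.
fn run_demo(items: Vec<u32>) -> (usize, Vec<u32>) {
    let (offer_tx, offer_rx) = mpsc::channel();
    let (pull_tx, pull_rx) = mpsc::channel();
    let timeout = Duration::from_secs(1);
    let producer = thread::spawn(move || offer(items, &offer_tx, &pull_rx, timeout));
    let mut seen = Vec::new();
    let consumed = consume(&offer_rx, &pull_tx, timeout, |item| seen.push(item)).unwrap();
    let served = producer.join().unwrap().unwrap();
    assert_eq!(served, consumed);
    (served, seen)
}

fn main() {
    let (served, seen) = run_demo(vec![10, 20, 30]);
    println!("served {served} items: {seen:?}"); // served 3 items: [10, 20, 30]
}
```

Because the producer blocks on each pull, a slow consumer never accumulates a backlog; either side gives up with an error once the timeout passes without a message.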

pub async fn discover(&self) -> Result<DiscoveryInfo>

Discover available message types and primitives.

§Errors

Returns an error if the discovery request fails.

pub async fn get_my_subscriptions(&self) -> Result<Vec<String>>

Get the configured subscription types for this primitive.

§Errors

Returns an error if the request fails.

pub fn name(&self) -> &str

Get the name of this handler.

pub fn subscribed_types(&self) -> &[String]

Get currently subscribed message types.

pub async fn disconnect(&self) -> Result<()>

Gracefully disconnect from the engine.

§Errors

Returns an error if the disconnection fails.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.