pub struct EmergentHandler { /* private fields */ }
A Handler primitive that subscribes to and publishes messages.
Handlers are the transformation layer in a workflow. They receive messages from Sources or other Handlers, process them, and emit new messages.
§Example
use emergent_client::{EmergentHandler, EmergentMessage};
use futures::StreamExt;

// `subscribe` takes `&mut self`, so the handler binding must be mutable.
let mut handler = EmergentHandler::connect("my_filter").await?;
let mut stream = handler.subscribe("timer.tick").await?;

while let Some(msg) = stream.next().await {
    // Link the output to the message that triggered it.
    let output = EmergentMessage::new("timer.filtered")
        .with_causation_id(msg.id());
    handler.publish(output).await?;
}
Implementations
impl EmergentHandler
pub async fn connect_to(name: &str, socket_path: &Path) -> Result<Self>
Connect to the Emergent engine at a specific socket path.
This is useful for testing or when connecting to a non-default socket.
§Errors
Returns an error if the connection fails.
pub async fn subscribe(
    &mut self,
    types: impl IntoSubscription,
) -> Result<MessageStream>
Subscribe to message types and return a stream of incoming messages.
The SDK handles system.shutdown messages automatically: when the engine
signals shutdown for handlers, the stream closes gracefully.
§Examples
// Single topic
let stream = handler.subscribe("timer.tick").await?;
// Multiple topics with array
let stream = handler.subscribe(["timer.tick", "timer.filtered"]).await?;
// From a Vec
let topics = vec!["timer.tick".to_string()];
let stream = handler.subscribe(topics).await?;
§Errors
Returns an error if the subscription fails.
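The three call shapes above (a single topic, an array, a Vec of Strings) suggest a conversion trait that normalizes its input into a list of topic names. The following is a minimal sketch of that idea using only the standard library. The trait `IntoTopics`, its method `into_topics`, and the function `normalize` are hypothetical names chosen for illustration; the actual definition of IntoSubscription is not shown on this page and may differ.

```rust
// Hypothetical stand-in for the SDK's `IntoSubscription`. The real trait's
// methods are not shown on this page, so this shape is an assumption.
pub trait IntoTopics {
    fn into_topics(self) -> Vec<String>;
}

// A single topic such as "timer.tick".
impl IntoTopics for &str {
    fn into_topics(self) -> Vec<String> {
        vec![self.to_string()]
    }
}

// A fixed-size array such as ["timer.tick", "timer.filtered"].
impl<const N: usize> IntoTopics for [&str; N] {
    fn into_topics(self) -> Vec<String> {
        self.iter().map(|t| t.to_string()).collect()
    }
}

// An owned list of topic names.
impl IntoTopics for Vec<String> {
    fn into_topics(self) -> Vec<String> {
        self
    }
}

// A function generic over the trait accepts all three call shapes.
pub fn normalize(types: impl IntoTopics) -> Vec<String> {
    types.into_topics()
}
```

With this sketch, `normalize("timer.tick")`, `normalize(["timer.tick", "timer.filtered"])`, and `normalize(vec!["timer.tick".to_string()])` all compile and yield a `Vec<String>`.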
pub async fn messages(
    name: impl Into<String>,
    _types: impl IntoSubscription,
) -> Result<(Self, MessageStream)>
Convenience method that connects, gets configured subscriptions, and returns (handler, stream) for the common one-liner pattern.
§Errors
Returns an error if connection or subscription fails.
pub async fn publish(&self, message: EmergentMessage) -> Result<()>
Publish a message to the engine (fire-and-forget).
§Errors
Returns an error if the message cannot be sent.
pub async fn publish_ack(&self, message: EmergentMessage) -> Result<()>
Publish a message with broker acknowledgment (backpressure).
Unlike publish, this waits for the engine’s message broker to confirm
it has processed and forwarded the message before returning. This provides
natural backpressure: the caller cannot outpace the broker.
Used internally by publish_all and publish_stream.
§Errors
Returns an error if the broker rejects the message or times out.
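To illustrate how waiting for an acknowledgment produces backpressure, here is a small model built on standard-library channels and threads. It is not the SDK's implementation: the toy broker, the `Request` type, and the blocking `publish_ack` function below are assumptions made for this sketch, and the real method is async and talks to the engine over a socket.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Each request carries the payload and a one-shot channel for the acknowledgment.
type Request = (String, Sender<()>);

// Toy broker: forwards each message into `delivered`, then acknowledges it.
fn spawn_broker(requests: Receiver<Request>, delivered: Sender<String>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for (msg, ack) in requests {
            let _ = delivered.send(msg);
            let _ = ack.send(());
        }
    })
}

// Blocking publish: returns only after the broker has acknowledged the message,
// so the caller can never have more than one message in flight.
fn publish_ack(broker: &Sender<Request>, msg: &str) -> Result<(), String> {
    let (ack_tx, ack_rx) = channel();
    broker
        .send((msg.to_string(), ack_tx))
        .map_err(|_| "broker is gone".to_string())?;
    ack_rx.recv().map_err(|_| "broker dropped the ack".to_string())
}

// Publishes `n` messages and returns what the broker delivered, in order.
fn demo(n: usize) -> Vec<String> {
    let (req_tx, req_rx) = channel();
    let (out_tx, out_rx) = channel();
    let broker = spawn_broker(req_rx, out_tx);
    for i in 0..n {
        publish_ack(&req_tx, &format!("msg-{}", i)).expect("publish failed");
    }
    drop(req_tx); // closing the request channel lets the broker thread exit
    broker.join().expect("broker panicked");
    out_rx.iter().collect()
}
```

Because each call blocks until the acknowledgment arrives, the publisher's rate is bounded by the broker's processing rate, which is the property the description above calls natural backpressure.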
pub async fn publish_all(
    &self,
    messages: impl IntoIterator<Item = EmergentMessage>,
) -> Result<usize>
Publish all messages from an iterator.
Sends each message individually with broker acknowledgment so subscribers begin consuming immediately. Stops on the first error.
Returns the number of messages successfully published.
§Errors
Returns the first publish error encountered.
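The stop-on-first-error behavior described above can be sketched as a plain loop. This is a synchronous, generic model, not the SDK's code: the free function `publish_all` and its `publish_one` callback are hypothetical stand-ins for the async method and its per-message acknowledged publish.

```rust
// Publishes each item via `publish_one`, stopping at the first failure.
// On success, returns how many items were published.
fn publish_all<T, E>(
    items: impl IntoIterator<Item = T>,
    mut publish_one: impl FnMut(T) -> Result<(), E>,
) -> Result<usize, E> {
    let mut count = 0;
    for item in items {
        publish_one(item)?; // first error aborts; later items are never sent
        count += 1;
    }
    Ok(count)
}
```

Note that in this sketch the count is only available on success: when a publish fails, the caller receives the error and not the number of messages already sent. Callers that need the partial count would have to track it themselves, for example inside the callback.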
pub async fn publish_stream<S>(&self, stream: S) -> Result<usize>
Publish messages from an async stream.
Consumes the stream, publishing each message individually with broker acknowledgment so subscribers begin consuming immediately. Stops on the first publish error or when the stream ends.
Returns the number of messages successfully published.
§Errors
Returns the first publish error encountered.
pub async fn stream_offer(
    &self,
    message_type: &str,
    items: impl IntoIterator<Item = Value>,
    pull_stream: &mut MessageStream,
    timeout: Duration,
) -> Result<usize>
Offer items as a pull-based stream with consumer-driven backpressure.
Publishes stream.ready, then serves items one at a time as the
consumer sends stream.pull requests. Publishes stream.end
when exhausted.
§Errors
Returns an error if the stream times out or the pull stream closes.
pub async fn stream_consume(
    &self,
    message_type: &str,
    source_stream: &mut MessageStream,
    timeout: Duration,
    on_item: impl FnMut(EmergentMessage),
) -> Result<usize>
Consume a pull-based stream, yielding items via callback.
Waits for stream.ready, then sends stream.pull requests
automatically after each item is consumed. Stops on stream.end.
§Errors
Returns an error if the stream times out or the source stream closes.
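The ready / pull / end handshake shared by stream_offer and stream_consume can be modelled with two standard-library channels. This is a sketch under assumptions, not the SDK's wire protocol: the `Frame` and `Pull` types and the `offer`, `consume`, and `run` functions are hypothetical, and the sketch assumes the producer sends the end frame in response to the pull that finds no items left, which the page does not specify.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;

// Producer-to-consumer frames, loosely modelled on stream.ready, an item, and stream.end.
enum Frame {
    Ready,
    Item(u32),
    End,
}

// Consumer-to-producer request, modelled on stream.pull.
struct Pull;

// Announces the stream, then hands out one item per pull request.
fn offer(items: Vec<u32>, frames: Sender<Frame>, pulls: Receiver<Pull>, timeout: Duration) -> Result<usize, String> {
    frames.send(Frame::Ready).map_err(|_| "consumer closed".to_string())?;
    let mut served = 0;
    let mut items = items.into_iter();
    loop {
        // Block until the consumer asks for more, or give up after `timeout`.
        pulls.recv_timeout(timeout).map_err(|e| format!("pull failed: {}", e))?;
        match items.next() {
            Some(item) => {
                frames.send(Frame::Item(item)).map_err(|_| "consumer closed".to_string())?;
                served += 1;
            }
            None => {
                frames.send(Frame::End).map_err(|_| "consumer closed".to_string())?;
                return Ok(served);
            }
        }
    }
}

// Waits for Ready, then pulls until End, passing each item to `on_item`.
fn consume(frames: Receiver<Frame>, pulls: Sender<Pull>, timeout: Duration, mut on_item: impl FnMut(u32)) -> Result<usize, String> {
    match frames.recv_timeout(timeout) {
        Ok(Frame::Ready) => {}
        Ok(_) => return Err("expected ready frame".to_string()),
        Err(e) => return Err(format!("no ready frame: {}", e)),
    }
    let mut consumed = 0;
    loop {
        pulls.send(Pull).map_err(|_| "producer closed".to_string())?;
        match frames.recv_timeout(timeout).map_err(|e| format!("no frame: {}", e))? {
            Frame::Item(item) => {
                on_item(item);
                consumed += 1;
            }
            Frame::End => return Ok(consumed),
            Frame::Ready => return Err("unexpected ready frame".to_string()),
        }
    }
}

// Runs producer and consumer on separate threads and returns what was consumed.
fn run(items: Vec<u32>) -> (usize, Vec<u32>) {
    let (frame_tx, frame_rx) = channel();
    let (pull_tx, pull_rx) = channel();
    let timeout = Duration::from_secs(1);
    let producer = thread::spawn(move || offer(items, frame_tx, pull_rx, timeout));
    let mut seen = Vec::new();
    let consumed = consume(frame_rx, pull_tx, timeout, |item| seen.push(item)).expect("consume failed");
    let served = producer.join().expect("producer panicked").expect("offer failed");
    assert_eq!(served, consumed);
    (consumed, seen)
}
```

The producer never sends an item it was not asked for, so a slow consumer simply pulls less often. That is the consumer-driven backpressure described above, in contrast to the broker-driven acknowledgment used by publish_ack.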
pub async fn discover(&self) -> Result<DiscoveryInfo>
Discover available message types and primitives.
§Errors
Returns an error if the discovery request fails.
pub async fn get_my_subscriptions(&self) -> Result<Vec<String>>
Get the configured subscription types for this primitive.
§Errors
Returns an error if the request fails.
pub fn subscribed_types(&self) -> &[String]
Get currently subscribed message types.