emergent-client
Rust SDK for building event-driven workflows on the Emergent engine. Connect to a running engine over Unix IPC and publish or consume messages through three async primitives: Source, Handler, and Sink.
use emergent_client::prelude::*;
async
Install
Add the crate to your project:
Then import the prelude for the most common types:
use emergent_client::prelude::*;
Or import individual types as needed:
use ;
Three Primitives
Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each primitive has a single, well-defined role:
| Primitive | Subscribe | Publish | Role |
|---|---|---|---|
| Source | -- | Yes | Ingress -- bring data into the system |
| Handler | Yes | Yes | Processing -- transform, enrich, or route |
| Sink | Yes | -- | Egress -- persist, display, or forward data |
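The capability split in the table can be modelled in a few lines. This is an illustrative sketch only: `PrimitiveKind` and its methods are hypothetical names and are not part of the SDK, which expresses the same split through its separate Source, Handler, and Sink types.

```rust
/// Hypothetical stand-in for the three primitive kinds; not an SDK type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrimitiveKind {
    Source,
    Handler,
    Sink,
}

impl PrimitiveKind {
    /// Handlers and Sinks consume messages; Sources do not.
    pub fn can_subscribe(self) -> bool {
        matches!(self, PrimitiveKind::Handler | PrimitiveKind::Sink)
    }

    /// Sources and Handlers emit messages; Sinks do not.
    pub fn can_publish(self) -> bool {
        matches!(self, PrimitiveKind::Source | PrimitiveKind::Handler)
    }

    /// Role label taken from the table above.
    pub fn role(self) -> &'static str {
        match self {
            PrimitiveKind::Source => "ingress",
            PrimitiveKind::Handler => "processing",
            PrimitiveKind::Sink => "egress",
        }
    }
}
```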
Quick Start
Sink -- consume messages
A Sink subscribes to message types and processes each one as it arrives.
EmergentSink::messages is a convenience method that connects, subscribes, and
yields messages in a single call:
use emergent_client::prelude::*;
let mut stream = messages.await?;
while let Some(msg) = stream.next().await { /* ... */ }
For explicit lifecycle control, connect and subscribe separately:
let sink = connect.await?;
let mut stream = sink.subscribe.await?;
while let Some(msg) = stream.next().await { /* ... */ }
Source -- publish messages
A Source publishes messages into the engine. It cannot subscribe:
use ;
use serde_json::json;
let source = connect.await?;
source.publish.await?;
Handler -- subscribe and publish
A Handler subscribes to incoming messages and publishes new ones. Use
with_causation_from_message to link output messages to the input that
triggered them:
use emergent_client::prelude::*;
use serde_json::json;
let handler = connect.await?;
let mut stream = handler.subscribe.await?;
while let Some(msg) = stream.next().await { /* ... */ }
Publishing Messages
Every primitive that can publish supports two calling styles. Both produce the same result:
use ;
use serde_json::json;
// Builder pattern with fluent API
source.publish.await?;
// Factory function (matches Python and TypeScript SDKs)
source.publish.await?;
Building Messages
EmergentMessage::new and create_message return a builder with fluent
methods for constructing messages:
use ;
use serde_json::json;
let msg = create_message
.with_payload
.with_metadata;
Link messages into traceable chains with with_causation_from_message and
with_correlation_id:
use emergent_client::types::CorrelationId;
let reply = new
.with_causation_from_message
.with_correlation_id
.with_payload;
The builder sets id (TypeID format msg_<uuidv7>) and timestamp_ms
automatically.
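To illustrate what a causation link buys you, here is a minimal self-contained sketch. It assumes only what the Message Shape table states, namely that causation_id holds the ID of the triggering message. `SketchMessage` and `causation_chain` are hypothetical and use plain strings instead of the SDK's identifier types. The caller supplies the id here, whereas the real builder generates it.

```rust
/// Hypothetical, simplified envelope with only the fields needed for linking.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchMessage {
    pub id: String,
    pub message_type: String,
    pub correlation_id: Option<String>,
    pub causation_id: Option<String>,
}

impl SketchMessage {
    /// The id is passed in for the sketch; it is not generated.
    pub fn new(id: &str, message_type: &str) -> Self {
        Self {
            id: id.to_owned(),
            message_type: message_type.to_owned(),
            correlation_id: None,
            causation_id: None,
        }
    }

    /// Records the parent's id as this message's causation id.
    pub fn with_causation_from_message(mut self, parent: &SketchMessage) -> Self {
        self.causation_id = Some(parent.id.clone());
        self
    }

    /// Attaches a correlation id shared by related messages.
    pub fn with_correlation_id(mut self, correlation_id: &str) -> Self {
        self.correlation_id = Some(correlation_id.to_owned());
        self
    }
}

/// Follows causation links from `start` towards the root and returns the
/// visited ids, newest first. Stops at a missing parent or after visiting
/// more messages than exist, which guards against cyclic links.
pub fn causation_chain<'a>(start: &'a SketchMessage, all: &'a [SketchMessage]) -> Vec<&'a str> {
    let mut chain = vec![start.id.as_str()];
    let mut current = start;
    while let Some(parent_id) = current.causation_id.as_deref() {
        if chain.len() > all.len() {
            break;
        }
        match all.iter().find(|m| m.id == parent_id) {
            Some(parent) => {
                chain.push(parent.id.as_str());
                current = parent;
            }
            None => break,
        }
    }
    chain
}
```

Walking the links this way is how a trace viewer or debugger could reconstruct which input produced a given output.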
Subscribing to Messages
subscribe accepts any type that implements IntoSubscription -- a single
&str, an array, a slice, or a Vec:
// Single topic
let stream = sink.subscribe.await?;
// Array of topics
let stream = sink.subscribe.await?;
// Slice of topics
let stream = sink.subscribe.await?;
// From a Vec
let topics = vec!;
let stream = sink.subscribe.await?;
Iterate over the returned MessageStream with while let:
while let Some(msg) = stream.next().await { /* ... */ }
MessageStream implements futures::Stream, so you can use StreamExt
combinators (re-exported in the prelude):
use emergent_client::prelude::*;
stream
.filter
.for_each
.await;
Typed payloads with serde
payload_as deserializes the JSON payload into any type that implements
serde::DeserializeOwned:
use serde::Deserialize;
while let Some(msg) = stream.next().await { /* ... */ }
Resource Cleanup
Call disconnect() to cleanly close the connection. The SDK sends an
unsubscribe-all message so the server sees a normal EOF rather than a
connection reset:
let source = connect.await?;
// ... use source ...
source.disconnect().await?;
The SDK subscribes to system.shutdown internally. When the Emergent engine
signals a graceful shutdown, active message streams close automatically.
Helper Functions
run_source, run_handler, and run_sink eliminate connection, signal
handling, and shutdown boilerplate. Each helper connects, sets up SIGTERM
handlers, runs your async closure, and disconnects on completion.
Import them from the helpers module:
use emergent_client::helpers::{run_source, run_handler, run_sink};
Source -- custom event loop with shutdown signal
use emergent_client::helpers::run_source;
use emergent_client::EmergentMessage;
use serde_json::json;
use std::time::Duration;
run_source.await?;
Handler -- called once per message
use emergent_client::helpers::run_handler;
use emergent_client::EmergentMessage;
use serde_json::json;
run_handler.await?;
Sink -- called once per message
use emergent_client::helpers::run_sink;
run_sink.await?;
The name argument is optional. When set to None, the helper reads from the
EMERGENT_NAME environment variable, falling back to a default.
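The lookup order described above (explicit argument, then EMERGENT_NAME, then a default) could be sketched roughly as follows. The function names and the `DEFAULT_NAME` value are assumptions made for illustration; the document does not say what default the helpers actually use.

```rust
use std::env;

/// Hypothetical fallback value; the SDK's real default is not documented here.
pub const DEFAULT_NAME: &str = "unnamed-primitive";

/// Pure resolution logic: an explicit name wins, then the environment value,
/// then the default. Taking the environment value as a parameter keeps the
/// function easy to test.
pub fn resolve_name_with(explicit: Option<&str>, env_value: Option<String>) -> String {
    explicit
        .map(str::to_owned)
        .or(env_value)
        .unwrap_or_else(|| DEFAULT_NAME.to_owned())
}

/// Convenience wrapper that reads EMERGENT_NAME from the process environment.
pub fn resolve_name(explicit: Option<&str>) -> String {
    resolve_name_with(explicit, env::var("EMERGENT_NAME").ok())
}
```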
Error Handling
All SDK operations return emergent_client::Result<T>, which uses
ClientError as the error type. Match on specific variants for precise
control:
use ;
match connect.await
Error Variants
| Variant | Description |
|---|---|
| ConnectionFailed | Engine connection failed |
| SocketNotFound | Engine socket does not exist at expected path |
| Timeout | Operation timed out |
| ProtocolError | Unexpected message from engine |
| SubscriptionFailed | Subscription request rejected |
| PublishFailed | Publish request failed |
| DiscoveryFailed | Discovery request failed |
| SerializationError | Message serialization/deserialization error |
| IoError | Underlying I/O error |
| IpcError | Low-level IPC protocol error |
| EngineError | Engine returned an application-level error |
Helper functions use a separate HelperError type with variants for
connection, subscription, signal setup, and user-function errors.
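As a rough illustration of matching on ClientError variants, the sketch below uses a local stand-in enum so that it compiles on its own. Only the variant names come from the Error Variants table. The `String` payloads, the `SketchClientError` type, and the retry policy in `is_worth_retrying` are assumptions and do not describe SDK behavior.

```rust
use std::fmt;

/// Local stand-in mirroring a few variant names from the Error Variants table.
/// The data carried by the real variants is not shown in this document, so
/// the `String` fields here are assumptions.
#[derive(Debug)]
pub enum SketchClientError {
    ConnectionFailed(String),
    SocketNotFound(String),
    Timeout,
    PublishFailed(String),
}

impl fmt::Display for SketchClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConnectionFailed(reason) => write!(f, "engine connection failed: {reason}"),
            Self::SocketNotFound(path) => write!(f, "engine socket not found at {path}"),
            Self::Timeout => write!(f, "operation timed out"),
            Self::PublishFailed(reason) => write!(f, "publish request failed: {reason}"),
        }
    }
}

impl std::error::Error for SketchClientError {}

/// Illustrative policy only: treat transient-looking failures as retryable
/// and configuration-looking failures as fatal.
pub fn is_worth_retrying(err: &SketchClientError) -> bool {
    match err {
        SketchClientError::ConnectionFailed(_) | SketchClientError::Timeout => true,
        SketchClientError::SocketNotFound(_) | SketchClientError::PublishFailed(_) => false,
    }
}
```

Matching exhaustively, without a wildcard arm, means the compiler flags any variant the policy has not considered.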
Message Shape
Every message flowing through Emergent follows the same envelope:
| Field | Type | Description |
|---|---|---|
| id | MessageId | Unique TypeID (msg_<uuidv7>) |
| message_type | MessageType | Routing key (e.g., "timer.tick") |
| source | PrimitiveName | Name of the publishing primitive |
| correlation_id | Option<CorrelationId> | Links related messages |
| causation_id | Option<CausationId> | ID of the triggering message |
| timestamp_ms | Timestamp | Creation time (Unix ms) |
| payload | serde_json::Value | User-defined data |
| metadata | Option<serde_json::Value> | Optional tracing/debug data |
All identifier types (MessageId, CorrelationId, CausationId) use the
TypeID format and are available from emergent_client::types.
Use msg.payload_as::<T>() to deserialize the payload into any
serde::DeserializeOwned type.
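For readers unfamiliar with TypeID, the sketch below checks whether a string has the shape of a message id: the msg prefix, an underscore, and a 26-character lowercase base32 suffix whose first character is at most 7, as laid out in the public TypeID specification. `looks_like_message_id` is a hypothetical helper. Whether the SDK exposes its own validation is not stated here, and this check does not verify that the suffix decodes to a UUIDv7.

```rust
/// Base32 alphabet used by TypeID suffixes (no i, l, o, or u).
const TYPEID_ALPHABET: &str = "0123456789abcdefghjkmnpqrstvwxyz";

/// Shape check only: prefix "msg", separator '_', 26-character suffix drawn
/// from the alphabet, first suffix character in '0'..='7' so the value fits
/// in 128 bits.
pub fn looks_like_message_id(id: &str) -> bool {
    let Some(suffix) = id.strip_prefix("msg_") else {
        return false;
    };
    suffix.len() == 26
        && suffix.starts_with(|c: char| ('0'..='7').contains(&c))
        && suffix.chars().all(|c| TYPEID_ALPHABET.contains(c))
}
```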
System Events
The Emergent engine broadcasts lifecycle events that your primitives can subscribe to:
| Event Pattern | Payload Type | Fired When |
|---|---|---|
| system.started.<name> | SystemEventPayload | Primitive started |
| system.stopped.<name> | SystemEventPayload | Primitive stopped |
| system.error.<name> | SystemEventPayload | Primitive failed |
| system.shutdown | SystemShutdownPayload | Engine shutting down |
Use the typed payload structs for safe access:
use ;
if msg.message_type.as_str.starts_with
if msg.message_type.as_str.starts_with
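The event patterns in the table lend themselves to a small classifier over the message type string. The following is a sketch under the assumption that event names follow the four listed patterns exactly. `SystemEvent` and `classify_system_event` are hypothetical names, and the sketch does not touch the SDK's typed payload structs.

```rust
/// Hypothetical classification of the four documented system event patterns.
#[derive(Debug, PartialEq, Eq)]
pub enum SystemEvent<'a> {
    Started(&'a str),
    Stopped(&'a str),
    Error(&'a str),
    Shutdown,
}

/// Returns the non-empty primitive name that follows `prefix`, if any.
fn named<'a>(message_type: &'a str, prefix: &str) -> Option<&'a str> {
    message_type
        .strip_prefix(prefix)
        .filter(|name| !name.is_empty())
}

/// Maps a routing key such as "system.started.ticker" to a `SystemEvent`.
/// Anything that matches none of the patterns yields `None`.
pub fn classify_system_event(message_type: &str) -> Option<SystemEvent<'_>> {
    if message_type == "system.shutdown" {
        return Some(SystemEvent::Shutdown);
    }
    if let Some(name) = named(message_type, "system.started.") {
        return Some(SystemEvent::Started(name));
    }
    if let Some(name) = named(message_type, "system.stopped.") {
        return Some(SystemEvent::Stopped(name));
    }
    if let Some(name) = named(message_type, "system.error.") {
        return Some(SystemEvent::Error(name));
    }
    None
}
```

Classifying once and then matching on the enum avoids scattering `starts_with` checks through handler code.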
Requirements
- Rust 2024 edition (1.85+)
- Tokio async runtime
- A running Emergent engine with the EMERGENT_SOCKET environment variable set
- Unix platform (Linux or macOS) -- the SDK communicates over Unix domain sockets
License
MIT OR Apache-2.0