emergent-client
Rust SDK for building event-driven workflows on the Emergent engine. Connect to a running engine over Unix IPC and publish or consume messages through three async primitives: Source, Handler, and Sink.
use *;
async
Install
Add the crate to your project:
cargo add emergent-client
Then import the prelude for the most common types:
use emergent_client::prelude::*;
Or import individual types as needed:
use emergent_client::{EmergentHandler, EmergentMessage, EmergentSink, EmergentSource};
Three Primitives
Every Emergent workflow is composed of Sources, Handlers, and Sinks. Each primitive has a single, well-defined role:
| Primitive | Subscribe | Publish | Role |
|---|---|---|---|
| Source | -- | Yes | Ingress -- bring data into the system |
| Handler | Yes | Yes | Processing -- transform, enrich, or route |
| Sink | Yes | -- | Egress -- persist, display, or forward data |
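The table above can be restated as a small capability model. This is only an illustrative sketch: `PrimitiveKind` and its methods are hypothetical names and are not part of the SDK, which instead exposes separate EmergentSource, EmergentHandler, and EmergentSink types.

```rust
/// Hypothetical local model of the three primitive kinds (not an SDK type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrimitiveKind {
    Source,
    Handler,
    Sink,
}

impl PrimitiveKind {
    /// Handlers and Sinks consume messages; Sources do not.
    pub fn can_subscribe(self) -> bool {
        matches!(self, PrimitiveKind::Handler | PrimitiveKind::Sink)
    }

    /// Sources and Handlers emit messages; Sinks do not.
    pub fn can_publish(self) -> bool {
        matches!(self, PrimitiveKind::Source | PrimitiveKind::Handler)
    }
}
```

Only the Handler has both capabilities, which is why it is the natural place for transformation and routing logic.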
Quick Start
Sink -- consume messages
A Sink subscribes to message types and processes each one as it arrives.
EmergentSink::messages is a convenience method that connects, subscribes, and
yields messages in a single call:
use emergent_client::prelude::*;
let mut stream = EmergentSink::messages("my_sink", ["timer.tick"]).await?;
while let Some(msg) = stream.next().await {
    // process msg
}
For explicit lifecycle control, connect and subscribe separately:
let sink = EmergentSink::connect("my_sink").await?;
let mut stream = sink.subscribe(["timer.tick"]).await?;
while let Some(msg) = stream.next().await {
    // process msg
}
Source -- publish messages
A Source publishes messages into the engine. It cannot subscribe:
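use emergent_client::{EmergentMessage, EmergentSource};
use serde_json::json;
let source = EmergentSource::connect("my_source").await?;
source.publish(EmergentMessage::new("timer.tick").with_payload(json!({"count": 1}))).await?;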
Handler -- subscribe and publish
A Handler subscribes to incoming messages and publishes new ones. Use
with_causation_from_message to link output messages to the input that
triggered them:
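use emergent_client::prelude::*;
use serde_json::json;
let handler = EmergentHandler::connect("my_handler").await?;
let mut stream = handler.subscribe(["timer.tick"]).await?;
while let Some(msg) = stream.next().await {
    // Build the output message, link it to msg with with_causation_from_message,
    // and send it with handler.publish.
}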
Publishing Messages
Every primitive that can publish supports two calling styles. Both produce the same result:
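use emergent_client::{EmergentMessage, create_message};
use serde_json::json;
// Builder pattern with fluent API
source.publish(EmergentMessage::new("timer.tick").with_payload(json!({"count": 1}))).await?;
// Factory function (matches Python and TypeScript SDKs)
source.publish(create_message("timer.tick").with_payload(json!({"count": 1}))).await?;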
Streaming Publish
Publish a collection or async stream of messages. Each message is sent individually so subscribers begin consuming immediately. Both methods return the count of successfully published messages and stop on the first error.
// From a Vec or any IntoIterator
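let messages: Vec<EmergentMessage> = records
    .iter()
    .map(|record| create_message("record.created").with_payload(json!(record)))
    .collect();
let count = source.publish_all(messages).await?;
// From an async stream (e.g., tokio channel)
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(16);
// ... spawn producer that sends messages into tx ...
let count = source.publish_stream(ReceiverStream::new(rx)).await?;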
Both publish_all and publish_stream are available on EmergentSource and
EmergentHandler.
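The count-and-stop behaviour described above can be sketched without the SDK. The function below is a hypothetical, synchronous stand-in: `publish_all_sketch` and its `send` callback are illustrative names, and returning the count together with the first error is an assumption made for the example, not the SDK's actual return type.

```rust
/// Hypothetical synchronous stand-in for `publish_all`: sends items one at a
/// time, counts the successes, and stops at the first failure.
pub fn publish_all_sketch<T, E>(
    items: impl IntoIterator<Item = T>,
    mut send: impl FnMut(T) -> Result<(), E>,
) -> (usize, Option<E>) {
    let mut sent = 0;
    for item in items {
        match send(item) {
            // Each item is sent individually, so earlier items are already
            // delivered by the time a later one fails.
            Ok(()) => sent += 1,
            // Stop immediately; items after the failing one are never sent.
            Err(e) => return (sent, Some(e)),
        }
    }
    (sent, None)
}
```

Because sending is per item, a failure partway through leaves the earlier messages published; callers that need all-or-nothing behaviour would have to handle that themselves.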
Building Messages
EmergentMessage::new and create_message return a builder with fluent
methods for constructing messages:
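use emergent_client::{EmergentMessage, create_message};
use serde_json::json;
let msg = create_message("timer.tick")
    .with_payload(json!({"count": 1}))
    .with_metadata(json!({"trace": "example"}));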
Link messages into traceable chains with with_causation_from_message and
with_correlation_id:
use emergent_client::types::CorrelationId;
let reply = new
.with_causation_from_message
.with_correlation_id
.with_payload;
The builder sets id (TypeID format msg_<uuidv7>) and timestamp_ms
automatically.
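To make the causation and correlation links concrete, here is a simplified model of what the chain records. `Envelope`, `caused_by`, and `correlated_with` are hypothetical names using plain strings; the SDK's builder uses its own ID types and generates `id` itself, so treat this as a sketch of the data relationship only.

```rust
/// Hypothetical, simplified envelope (plain strings instead of SDK ID types).
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub id: String,
    pub message_type: String,
    pub correlation_id: Option<String>,
    pub causation_id: Option<String>,
}

impl Envelope {
    /// The caller supplies the id here; the real builder sets it automatically.
    pub fn new(id: &str, message_type: &str) -> Self {
        Envelope {
            id: id.to_string(),
            message_type: message_type.to_string(),
            correlation_id: None,
            causation_id: None,
        }
    }

    /// Records `parent` as the message that triggered this one.
    pub fn caused_by(mut self, parent: &Envelope) -> Self {
        self.causation_id = Some(parent.id.clone());
        self
    }

    /// Tags the message with an id shared by all related messages.
    pub fn correlated_with(mut self, correlation_id: &str) -> Self {
        self.correlation_id = Some(correlation_id.to_string());
        self
    }
}
```

Following `causation_id` from message to message reconstructs the chain of triggers, while `correlation_id` groups every message that belongs to the same logical flow.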
Subscribing to Messages
subscribe accepts any type that implements IntoSubscription -- a single
&str, an array, a slice, or a Vec:
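// Single topic
let stream = sink.subscribe("timer.tick").await?;
// Array of topics
let stream = sink.subscribe(["timer.tick", "timer.filtered"]).await?;
// Slice of topics
let topics: &[&str] = &["timer.tick", "timer.filtered"];
let stream = sink.subscribe(topics).await?;
// From a Vec
let topics = vec!["timer.tick", "timer.filtered"];
let stream = sink.subscribe(topics).await?;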
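One way a conversion trait can accept all four input shapes is sketched below. `IntoTopics` and `normalize` are hypothetical names chosen to avoid implying this is how the SDK defines IntoSubscription; the real trait may have different methods and output types.

```rust
/// Hypothetical conversion trait, standing in for the SDK's IntoSubscription.
pub trait IntoTopics {
    fn into_topics(self) -> Vec<String>;
}

/// A single topic.
impl IntoTopics for &str {
    fn into_topics(self) -> Vec<String> {
        vec![self.to_string()]
    }
}

/// A fixed-size array of topics.
impl<const N: usize> IntoTopics for [&str; N] {
    fn into_topics(self) -> Vec<String> {
        self.iter().map(|t| t.to_string()).collect()
    }
}

/// A borrowed slice of topics.
impl IntoTopics for &[&str] {
    fn into_topics(self) -> Vec<String> {
        self.iter().map(|t| t.to_string()).collect()
    }
}

/// An owned Vec of topics.
impl IntoTopics for Vec<&str> {
    fn into_topics(self) -> Vec<String> {
        self.into_iter().map(|t| t.to_string()).collect()
    }
}

/// Accepts any supported shape and returns one owned topic list.
pub fn normalize(topics: impl IntoTopics) -> Vec<String> {
    topics.into_topics()
}
```

Accepting `impl IntoTopics` keeps the call site free of manual conversions, at the cost of one trait implementation per supported input type.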
Iterate over the returned MessageStream with while let:
while let Some(msg) = stream.next().await {
    // process msg
}
MessageStream implements futures::Stream, so you can use StreamExt
combinators (re-exported in the prelude):
use emergent_client::prelude::*;
stream
.filter
.for_each
.await;
Typed payloads with serde
payload_as deserializes the JSON payload into any type that implements
serde::DeserializeOwned:
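use serde::Deserialize;
while let Some(msg) = stream.next().await {
    // Call msg.payload_as::<T>() here, where T is your own type that derives Deserialize.
}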
Resource Cleanup
Call disconnect() to cleanly close the connection. The SDK sends an
unsubscribe-all message so the server sees a normal EOF rather than a
connection reset:
let source = EmergentSource::connect("my_source").await?;
// ... use source ...
source.disconnect().await?;
The SDK subscribes to system.shutdown internally. When the Emergent engine
signals a graceful shutdown, active message streams close automatically.
Helper Functions
run_source, run_handler, and run_sink eliminate connection, signal
handling, and shutdown boilerplate. Each helper connects, sets up SIGTERM
handlers, runs your async closure, and disconnects on completion.
Import them from the helpers module:
use emergent_client::helpers::{run_handler, run_sink, run_source};
Source -- custom event loop with shutdown signal
use emergent_client::helpers::run_source;
use emergent_client::EmergentMessage;
use serde_json::json;
use std::time::Duration;
run_source.await?;
Handler -- called once per message
use emergent_client::helpers::run_handler;
use emergent_client::EmergentMessage;
use serde_json::json;
run_handler.await?;
Sink -- called once per message
use emergent_client::helpers::run_sink;
run_sink.await?;
The name argument is optional. When set to None, the helper reads from the
EMERGENT_NAME environment variable, falling back to a default.
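The lookup order for the name can be sketched as a pure function. `resolve_name` is a hypothetical helper, and the default value is passed in by the caller because the actual default used by the SDK is not stated here.

```rust
/// Hypothetical sketch of the name lookup order: an explicit argument wins,
/// then the value of EMERGENT_NAME, then a caller-supplied default.
pub fn resolve_name(explicit: Option<&str>, env_value: Option<&str>, default: &str) -> String {
    explicit.or(env_value).unwrap_or(default).to_string()
}
```

In a real program the second argument could come from `std::env::var("EMERGENT_NAME").ok()`, borrowed with `as_deref()`. Taking it as a parameter keeps the function easy to test.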
Error Handling
All SDK operations return emergent_client::Result<T>, which uses
ClientError as the error type. Match on specific variants for precise
control:
use ;
match connect.await
Error Variants
| Variant | Description |
|---|---|
| ConnectionFailed | Engine connection failed |
| SocketNotFound | Engine socket does not exist at expected path |
| Timeout | Operation timed out |
| ProtocolError | Unexpected message from engine |
| SubscriptionFailed | Subscription request rejected |
| PublishFailed | Publish request failed |
| DiscoveryFailed | Discovery request failed |
| SerializationError | Message serialization/deserialization error |
| IoError | Underlying I/O error |
| IpcError | Low-level IPC protocol error |
| EngineError | Engine returned an application-level error |
Helper functions use a separate HelperError type with variants for
connection, subscription, signal setup, and user-function errors.
Message Shape
Every message flowing through Emergent follows the same envelope:
| Field | Type | Description |
|---|---|---|
| id | MessageId | Unique TypeID (msg_<uuidv7>) |
| message_type | MessageType | Routing key (e.g., "timer.tick") |
| source | PrimitiveName | Name of the publishing primitive |
| correlation_id | Option<CorrelationId> | Links related messages |
| causation_id | Option<CausationId> | ID of the triggering message |
| timestamp_ms | Timestamp | Creation time (Unix ms) |
| payload | serde_json::Value | User-defined data |
| metadata | Option<serde_json::Value> | Optional tracing/debug data |
All identifier types (MessageId, CorrelationId, CausationId) use the
TypeID format and are available from emergent_client::types.
Use msg.payload_as::<T>() to deserialize the payload into any
serde::DeserializeOwned type.
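For logging or debugging it can help to split an identifier into its prefix and suffix. The sketch below assumes only the shape shown above (a prefix, an underscore, then the encoded UUID) and does not validate the suffix encoding; `split_type_id` and `is_message_id` are hypothetical helpers, not SDK functions.

```rust
/// Splits "prefix_suffix" at the last underscore.
/// Returns None if either part is missing.
pub fn split_type_id(id: &str) -> Option<(&str, &str)> {
    let (prefix, suffix) = id.rsplit_once('_')?;
    if prefix.is_empty() || suffix.is_empty() {
        return None;
    }
    Some((prefix, suffix))
}

/// True when the identifier carries the "msg" prefix used for message IDs.
/// The suffix is not checked for valid UUIDv7 content.
pub fn is_message_id(id: &str) -> bool {
    matches!(split_type_id(id), Some(("msg", _)))
}
```

Prefer the typed identifiers from emergent_client::types wherever they are available; string inspection like this is a fallback for raw values.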
System Events
The Emergent engine broadcasts lifecycle events that your primitives can subscribe to:
| Event Pattern | Payload Type | Fired When |
|---|---|---|
| system.started.<name> | SystemEventPayload | Primitive started |
| system.stopped.<name> | SystemEventPayload | Primitive stopped |
| system.error.<name> | SystemEventPayload | Primitive failed |
| system.shutdown | SystemShutdownPayload | Engine shutting down |
Use the typed payload structs for safe access:
use ;
if msg.message_type.as_str.starts_with
if msg.message_type.as_str.starts_with
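The event patterns in the table can be classified by matching on the routing key alone. This sketch works on plain strings and does not touch the SDK payload types; `SystemEvent` and `classify` are hypothetical names introduced for illustration.

```rust
/// Hypothetical classification of a routing key against the system event patterns.
#[derive(Debug, PartialEq, Eq)]
pub enum SystemEvent<'a> {
    Started(&'a str),
    Stopped(&'a str),
    Error(&'a str),
    Shutdown,
}

/// Returns the matching system event, or None for any other message type.
pub fn classify(message_type: &str) -> Option<SystemEvent<'_>> {
    if message_type == "system.shutdown" {
        return Some(SystemEvent::Shutdown);
    }
    // Extracts the <name> segment after a prefix, rejecting an empty name.
    let name_after = |prefix: &str| {
        message_type
            .strip_prefix(prefix)
            .filter(|name| !name.is_empty())
    };
    if let Some(name) = name_after("system.started.") {
        return Some(SystemEvent::Started(name));
    }
    if let Some(name) = name_after("system.stopped.") {
        return Some(SystemEvent::Stopped(name));
    }
    if let Some(name) = name_after("system.error.") {
        return Some(SystemEvent::Error(name));
    }
    None
}
```

Once the kind of event is known, the payload can be deserialized into the matching typed struct.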
Requirements
- Rust 2024 edition (1.85+)
- Tokio async runtime
- A running Emergent engine with the EMERGENT_SOCKET environment variable set
- Unix platform (Linux or macOS) -- the SDK communicates over Unix domain sockets
License
MIT OR Apache-2.0