pipedream-rs
A typed, heterogeneous event relay library for Rust with observable delivery, explicit lifecycle management, and error propagation.
Features
- Observable delivery - high throughput with observable drops via `Dropped` events
- Channel-based ownership - explicit lifecycle with `RelaySender` and `RelayReceiver`
- Single relay carries messages of any type
- Type-based filtering and transformation
- Typed subscriptions that receive only matching types
- Pipeline composition via chaining
- Explicit completion tracking - `send().await` waits for tracked handlers
- Error propagation from handlers back to senders
- Panic catching in handlers with error reporting
What Pipedream IS and IS NOT
Pipedream is designed for observable in-process event streaming.
✅ What Pipedream IS
- Observable event relay with fast broadcast delivery
- Type-safe transformations with compile-time checking
- Explicit completion tracking for async handler coordination
- In-process, high-throughput event streaming
- Panic-resilient with error propagation
Best for: logging, metrics, monitoring, in-process pub/sub, application event streaming
❌ What Pipedream IS NOT
- ❌ A guaranteed delivery queue - Use RabbitMQ, SQS, or disk-backed queues
- ❌ A replacement for Kafka / NATS - Use those for distributed systems
- ❌ A transactional system - No rollback, no exactly-once semantics
- ❌ A state machine runtime - See statecharts libraries
- ❌ A distributed system - Single-process only
- ❌ An actor framework - No supervision trees or location transparency
The boundary: If you need durability, distribution, or guaranteed delivery, pipedream is the wrong tool.
Installation
[dependencies]
pipedream = "0.1"
tokio = { version = "1", features = ["full"] }
Quick Start
use pipedream::channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = channel();
    // Tracked handler: send().await waits for it to complete
    rx.sink(|msg: String| async move {
        println!("received: {msg}");
    });
    tx.send("hello".to_string()).await?;
    Ok(())
}
API
Creating a Relay
// Create a relay channel with default buffer size (65536)
let (tx, rx) = channel();
// Custom channel size (affects drop threshold)
let (tx, rx) = channel_with_size(8192);
// WeakSender - send without keeping channel alive
let weak_tx = tx.weak();
weak_tx.send("ping".to_string()).await?;
// Check tracked handler count (for debugging)
println!("handlers: {}", tx.handler_count());
// Check if closed
if tx.is_closed() { /* ... */ }
Sending & Receiving
// Create channel
let (tx, rx) = channel();
// Type-filtered subscription (untracked - send() doesn't wait for these)
let mut sub = rx.subscribe::<String>();
// Send and wait for tracked handlers to complete
// Returns Ok if message was sent (drops observable via Dropped events)
tx.send("hello".to_string()).await?;
// Closing: drop RelaySender to close the channel
drop(tx);
// Drain the subscription; it yields None once the relay is closed
while let Some(msg) = sub.recv().await {
    println!("{msg}");
}
assert!(sub.recv().await.is_none());
Filtering
let (tx, rx) = channel();
// Filter by type and predicate
let large = rx.filter(|n: &u64| *n > 100);
// Filter by type only (method name illustrative)
let strings = rx.filter_type::<String>();
// Exclude a type (method name illustrative)
let no_errors = rx.exclude::<RelayError>();
// Split by type (method name illustrative)
let (string_relay, rest) = rx.split::<String>();
Transformation
let (tx, rx) = channel();
// Map values of one type to another
let lengths = rx.map(|s: String| s.len());
// Filter and map in one operation
let numbers = rx.filter_map(|s: String| s.parse::<i64>().ok());
// Chain transformations
let processed = rx
    .filter(|n: &i64| *n > 0)
    .map(|n: i64| n * 2);
Batching
let (tx, rx) = channel();
// Collect messages into batches of N
let batched = rx.batch(100);
// Process batches
batched.sink(|batch: Vec<String>| async move {
    println!("processing {} messages", batch.len());
});
When the relay closes, any remaining buffered messages are emitted as a final (possibly smaller) batch.
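That flush-on-close rule can be sketched with plain std Rust, independent of the relay API (`batch_all` is a hypothetical helper for illustration, not part of pipedream):

```rust
/// Collect items into batches of `size`; any remainder is emitted as a
/// final, possibly smaller, batch - mirroring the close behavior above.
fn batch_all<T>(items: Vec<T>, size: usize) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::with_capacity(size);
    for item in items {
        current.push(item);
        if current.len() == size {
            batches.push(std::mem::take(&mut current));
        }
    }
    // On close, flush the remaining partial batch
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    let batches = batch_all((1..=5).collect::<Vec<i32>>(), 2);
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
}
```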
Handlers with Error Propagation
Handlers can return () or Result<(), E>. Errors propagate back to send().await:
let (tx, rx) = channel();
// Handler that can fail
rx.sink(|msg: String| async move {
    if msg.is_empty() {
        return Err("empty message".to_string());
    }
    Ok(())
});
// Handler without errors (returns ())
rx.sink(|n: u64| async move {
    println!("{n}");
});
// Errors propagate to sender
match tx.send(String::new()).await {
    Ok(()) => println!("all tracked handlers completed"),
    Err(e) => eprintln!("a handler failed: {e:?}"),
}
Observing with tap()
let (tx, rx) = channel();
// Tap - observe without consuming (returns receiver for chaining)
rx.tap(|msg: String| async move { println!("observed: {msg}") })
    .sink(|msg: String| async move { println!("consumed: {msg}") });
Subscribing to Errors
Errors from handlers are also sent through the relay:
use pipedream::RelayError;
let (tx, rx) = channel();
// Subscribe to errors globally
let mut errors = rx.subscribe::<RelayError>();
tokio::spawn(async move {
    while let Some(err) = errors.recv().await {
        eprintln!("handler error: {err:?}");
    }
});
Spawning Tasks with within()
For custom async processing with panic catching:
let (tx, rx) = channel();
let mut sub = rx.subscribe::<String>();
rx.within(async move {
    while let Some(msg) = sub.recv().await {
        // custom async processing; panics here are caught
        println!("{msg}");
    }
});
Note: within() does not participate in completion tracking. Use sink() or tap() if you need send().await to wait for your handler.
Error Handling
Error Flow
Errors in handlers flow two ways:
- To sender: `send().await` returns `Err(SendError::Downstream(...))`
- Through relay: Subscribe to `RelayError` for monitoring
use pipedream::{RelayError, SendError};
let (tx, rx) = channel();
// Monitor all errors
let mut error_sub = rx.subscribe::<RelayError>();
tokio::spawn(async move {
    while let Some(err) = error_sub.recv().await {
        eprintln!("relay error: {err:?}");
    }
});
// Handler that may fail
rx.sink(|_msg: String| async move {
    Err::<(), String>("processing failed".to_string())
});
// Caller gets error
if let Err(e) = tx.send("hello".to_string()).await {
    eprintln!("send failed: {e:?}");
}
Panic Catching
Panics in sink(), tap(), and within() are caught and converted to RelayError:
let (tx, rx) = channel();
rx.sink(|_msg: String| async move {
    panic!("handler bug");
});
// Panic becomes RelayError with source="sink"
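The underlying mechanism can be illustrated with std's `catch_unwind`; this is a self-contained analogy for how a panic becomes an error value, not pipedream's actual implementation:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Run a handler, converting a panic into an Err - the way the relay
/// converts handler panics into RelayError values.
fn run_caught(handler: impl FnOnce()) -> Result<(), String> {
    catch_unwind(AssertUnwindSafe(handler)).map_err(|payload| {
        // A panic payload is Box<dyn Any>; recover the &str message if present
        payload
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .unwrap_or_else(|| "handler panicked".to_string())
    })
}

fn main() {
    assert!(run_caught(|| {}).is_ok());
    let err = run_caught(|| panic!("handler bug")).unwrap_err();
    assert_eq!(err, "handler bug");
}
```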
Execution Semantics
Observable Delivery
pipedream provides observable delivery semantics:
- High throughput - messages delivered via `try_send` (non-blocking)
- Observable drops - if a subscriber buffer fills, drops are observable via `Dropped` events
- No backpressure - senders never block waiting for slow consumers
- `send()` is async and authoritative - if it returns `Ok`, the message was sent
- Per-subscriber MPSC channels with configurable buffer size (default: 65536)
- Delivery happens inside `send()`, not in background tasks
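The drop-instead-of-block behavior can be demonstrated with a bounded std channel and `try_send`; this is an analogy for the semantics, not the relay's internals:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Deliver values into a bounded buffer, counting overflow as drops
/// instead of blocking the sender - the relay's delivery semantics.
fn deliver(values: &[u32], capacity: usize) -> (Vec<u32>, u32) {
    // A bounded per-subscriber buffer, like the relay's MPSC channels
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for &v in values {
        // Non-blocking delivery: a full buffer drops the message
        if let Err(TrySendError::Full(_)) = tx.try_send(v) {
            dropped += 1; // observable drop, analogous to a Dropped event
        }
    }
    drop(tx);
    (rx.iter().collect(), dropped)
}

fn main() {
    // Capacity 2: the first two fit, the remaining three are dropped
    assert_eq!(deliver(&[0, 1, 2, 3, 4], 2), (vec![0, 1], 3));
}
```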
Monitoring: Subscribe to Dropped events to observe message drops:
let mut dropped = rx.subscribe::<Dropped>();
tokio::spawn(async move {
    while let Some(d) = dropped.recv().await {
        eprintln!("message dropped: {d:?}");
    }
});
Buffer Tuning: Use channel_with_size() to tune buffer size for your workload:
// Smaller buffer for low-latency, high-drop tolerance
let (tx, rx) = channel_with_size(1024);
// Larger buffer for high-throughput, low-drop scenarios
let (tx, rx) = channel_with_size(262_144);
Message Lifecycle
tx.send(msg) called
│
▼
┌─────────────────────────────────────┐
│ 1. Check if relay is closed │
│ 2. Await ready signals │
│ 3. Snapshot handler_count (N) │
│ 4. Create tracker expecting N │
│ 5. Deliver to all subscribers │ ← try_send to each subscriber
│ 6. Wait for tracked handlers │
└─────────────────────────────────────┘
│
▼ (sent via try_send, drops observable)
│
┌───┴───┬───────┬───────┐
▼ ▼ ▼ ▼
Handler Handler Handler Subscription
(sink) (tap) (sink) (untracked)
│ │ │
▼ ▼ ▼
complete complete complete
│ │ │
└───────┴───────┘
│
▼
tracker.is_complete()
│
▼
send().await returns
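Steps 3-6 above amount to a counter snapshot plus a wait. A minimal synchronous sketch (pipedream's real tracker is async; this only illustrates the snapshot-then-wait shape):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Snapshot N handlers at send time, then wait until all N report completion.
struct Tracker {
    remaining: Mutex<usize>, // completions still outstanding
    cv: Condvar,
}

impl Tracker {
    fn new(expected: usize) -> Arc<Self> {
        Arc::new(Tracker { remaining: Mutex::new(expected), cv: Condvar::new() })
    }
    fn complete(&self) {
        let mut left = self.remaining.lock().unwrap();
        *left -= 1;
        if *left == 0 {
            self.cv.notify_all();
        }
    }
    fn is_complete(&self) -> bool {
        *self.remaining.lock().unwrap() == 0
    }
    fn wait(&self) {
        let mut left = self.remaining.lock().unwrap();
        while *left > 0 {
            left = self.cv.wait(left).unwrap();
        }
    }
}

fn main() {
    let handler_count = 3; // snapshot taken at send time
    let tracker = Tracker::new(handler_count);
    for _ in 0..handler_count {
        let t = Arc::clone(&tracker);
        thread::spawn(move || t.complete()); // each handler reports completion
    }
    tracker.wait(); // "send().await" returns only after all N complete
    assert!(tracker.is_complete());
}
```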
Completion Tracking
What's tracked (send().await waits for these):
- `sink()` - terminal handler
- `tap()` - pass-through observer
What's NOT tracked (fire-and-forget):
- `subscribe()` - raw subscription, for monitoring/logging
- `within()` - spawner for custom async processing
- Child relay handlers - isolated tracking boundary
Tracking Boundaries
Transformations (filter, map, filter_map, batch) create tracking boundaries:
let (tx, rx) = channel();
let filtered = rx.filter(|n: &u64| *n > 10);
filtered.sink(|n: u64| async move {
    println!("{n}");
});
// send() on parent does NOT wait for filtered sink
// Each relay has its own independent handler_count
tx.send(42u64).await?; // Returns after parent's handlers complete
Parent and child relays have independent tracking. Errors from child handlers don't propagate to parent.
Handler Count Semantics
Handler count is snapshotted at send time:
let (tx, rx) = channel();
// At this point, handler_count = 0
rx.sink(|msg: String| async move {
    println!("{msg}");
});
// Now handler_count = 1
// send() snapshots expected = 1, waits for 1 completion
tx.send("hello".to_string()).await?;
Concurrent registration behavior:
- Handlers registering during `send()` may or may not be counted
- A late-registered handler may receive the message but won't be waited for
- This is intentional "best effort" - not strict membership
Guarantee: If all handlers are registered before send(), all will be tracked.
Type Filtering Completion
Handlers complete immediately for messages of non-matching types:
let (tx, rx) = channel();
rx.sink(|s: String| async move { println!("{s}") });  // Handler 1
rx.sink(|n: u64| async move { println!("{n}") });     // Handler 2
// expected = 2
// Handler 1 sees String, processes, completes
// Handler 2 sees String (wrong type), completes immediately
tx.send("hello".to_string()).await?; // Returns after both complete
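The wrong-type-completes-immediately rule can be mimicked with `std::any` downcasting (`handle_typed` is illustrative, not the relay's actual dispatch code):

```rust
use std::any::Any;

/// A handler typed on T runs its body only when the downcast succeeds;
/// for any other message type it completes immediately without running.
fn handle_typed<T: 'static>(msg: &dyn Any, body: impl FnOnce(&T)) -> bool {
    match msg.downcast_ref::<T>() {
        Some(value) => {
            body(value); // matching type: process, then complete
            true
        }
        None => false, // wrong type: complete immediately
    }
}

fn main() {
    let msg: Box<dyn Any> = Box::new("hello".to_string());
    let mut seen = Vec::new();
    // Handler 1 (String) processes; Handler 2 (u64) completes without running
    let ran1 = handle_typed::<String>(msg.as_ref(), |s| seen.push(s.clone()));
    let ran2 = handle_typed::<u64>(msg.as_ref(), |_| unreachable!());
    assert!(ran1 && !ran2);
    assert_eq!(seen, vec!["hello".to_string()]);
}
```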
Error Flow
Handler error
│
├──► tracker.fail(error) ──► send().await returns Err
│
└──► stream.send_raw(error) ──► subscribe::<RelayError>()
Both paths fire. Errors are observable via subscription AND returned to sender.
Lifecycle
Relays follow automatic cascade close:
1. RelaySender dropped/closed
2. Subscriber channels close
3. Child forwarding tasks detect closure
4. Child relays are closed
5. All subscriptions receive None
Creating a transformation on a closed relay returns a closed child immediately.
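The drain-then-None behavior has a direct std analogue: dropping an mpsc sender closes the channel, but buffered messages still drain first. A sketch of the semantics (not pipedream's cascade machinery):

```rust
use std::sync::mpsc::channel;

/// Returns the drained buffered message and whether closure was then observed.
fn drain_after_close() -> (Option<String>, bool) {
    let (tx, rx) = channel::<String>();
    tx.send("last message".to_string()).unwrap();

    // Step 1 above: sender dropped/closed
    drop(tx);

    // Buffered messages still drain...
    let drained = rx.recv().ok();
    // ...then every subsequent receive observes closure - the mpsc
    // analogue of a subscription receiving None.
    let closed = rx.recv().is_err();
    (drained, closed)
}

fn main() {
    assert_eq!(drain_after_close(), (Some("last message".to_string()), true));
}
```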
Buffer Sizing and Drop Monitoring
pipedream uses per-subscriber MPSC channels with configurable buffer size:
- When a subscriber's buffer is full, messages are dropped (not queued)
- Drops are observable via `Dropped` events
- No backpressure - senders never block
- Use `channel_with_size()` to tune buffer size for your workload
- Larger buffers reduce drops but use more memory
// Higher buffer for bursty workloads (default is 65536)
let (tx, rx) = channel_with_size(262_144);
// Lower buffer when drops are acceptable
let (tx, rx) = channel_with_size(1024);
// Monitor drops
let mut dropped = rx.subscribe::<Dropped>();
tokio::spawn(async move {
    while let Some(d) = dropped.recv().await {
        eprintln!("dropped: {d:?}");
    }
});
Example: Processing Pipeline with Error Handling
let (tx, rx) = channel();
// Monitor errors
let mut errors = rx.subscribe::<RelayError>();
rx.within(async move {
    while let Some(err) = errors.recv().await {
        eprintln!("pipeline error: {err:?}");
    }
});
// Stage 1: Parse (can fail)
let parsed = rx.filter_map(|raw: String| raw.parse::<u64>().ok());
// Stage 2: Validate
let valid = parsed.filter(|n: &u64| *n < 1_000);
// Stage 3: Consume
valid.sink(|n: u64| async move {
    println!("processed: {n}");
});
Detailed Semantics
For a complete specification of completion tracking, error propagation, and invariants, see SEMANTICS.md.
License
MIT