Expand description
Channel module – lightweight in-process message pipes.
Provides abstractions for enqueuing and handing off Exchange instances between processing
stages. Channels do not transform messages; they only move them. Transformation, routing,
and filtering happen before (on the way in) or after (on receipt) via other crate components.
§Implementations
DirectChannel– immediate fan‑out to registered subscribers (no buffering).QueueChannel– FIFO buffered channel supporting dequeue & correlation lookup.
§Extension Traits
SubscribableChannel– register subscriber callbacks (DirectChannel).PollableChannel– dequeue / blocking receive operations (QueueChannel).CorrelationSupport– correlation id helpers (QueueChannel only).
§Construction
Channels expose explicit and random id constructors:
DirectChannel::with_id("id"),DirectChannel::with_random_id()QueueChannel::with_id("id"),QueueChannel::with_random_id()Or use helpers:new_queue(), [new_queue_with_id("id")].
§Examples
§DirectChannel
use allora_core::{channel::DirectChannel, Channel, Exchange, Message};
let dc = DirectChannel::with_random_id();
dc.subscribe(|ex| { assert_eq!(ex.in_msg.body_text(), Some("ping")); Ok(()) });
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { dc.send(Exchange::new(Message::from_text("ping"))).await.unwrap(); });§QueueChannel
use allora_core::{channel::{QueueChannel, PollableChannel, Channel}, Exchange, Message};
let ch = QueueChannel::with_id("demo");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
ch.send(Exchange::new(Message::from_text("ping"))).await.unwrap();
let ex = ch.try_receive().await.unwrap();
assert_eq!(ex.in_msg.body_text(), Some("ping"));
});§Correlation
use allora_core::{channel::{QueueChannel, CorrelationSupport}, Exchange, Message};
let ch = QueueChannel::with_random_id();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let cid = ch.send_with_correlation(Exchange::new(Message::from_text("req"))).await.unwrap();
let ex_opt = ch.receive_by_correlation(&cid).await;
let ex = ex_opt.unwrap();
assert_eq!(ex.in_msg.body_text(), Some("req"));
});§Notes
- DirectChannel does not implement
PollableChannelorCorrelationSupport. - QueueChannel guarantees FIFO order for normal dequeue operations.
- Correlation lookup removes the matched exchange from the internal queue.
Structs§
- Direct
Channel - Direct, synchronous handoff channel.
- Queue
Channel
Traits§
- Channel
- Core channel trait providing send capabilities and metadata.
- Correlation
Support - Correlation lookup extension (QueueChannel only).
- Pollable
Channel - Dequeue extension (QueueChannel).
- Subscribable
Channel - Register-and-fanout extension trait (DirectChannel).
Functions§
- new_
queue - Create a new queue channel with auto-generated id (
queue:<uuid>). - new_
queue_ with_ id - Create a new queue channel with explicit id.
Type Aliases§
- Channel
Ref - Type alias for a trait object reference to any Channel implementation.