Module channel

Channel module – lightweight in-process message pipes.

Provides abstractions for enqueuing and handing off Exchange instances between processing stages. Channels do not transform messages; they only move them. Transformation, routing, and filtering happen before (on the way in) or after (on receipt) via other crate components.

§Implementations

  • DirectChannel – immediate fan‑out to registered subscribers (no buffering).
  • QueueChannel – FIFO buffered channel supporting dequeue & correlation lookup.
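The difference between the two delivery models can be sketched with the standard library alone. This is an illustration, not the crate's implementation: `DirectSketch` and `QueueSketch` are hypothetical stand-ins, they are synchronous, and they carry a plain generic payload rather than an Exchange.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a direct channel: every send is handed to
/// each registered subscriber immediately, and nothing is stored.
struct DirectSketch<T> {
    subscribers: Vec<Box<dyn Fn(&T)>>,
}

impl<T> DirectSketch<T> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Register a callback that will see every message sent afterwards.
    fn subscribe(&mut self, f: impl Fn(&T) + 'static) {
        self.subscribers.push(Box::new(f));
    }

    /// Fan the message out and return how many subscribers saw it.
    fn send(&self, msg: T) -> usize {
        for s in &self.subscribers {
            s(&msg);
        }
        self.subscribers.len()
    }
}

/// Hypothetical stand-in for a queue channel: sends are buffered until a
/// consumer dequeues them, oldest first.
struct QueueSketch<T> {
    buf: VecDeque<T>,
}

impl<T> QueueSketch<T> {
    fn new() -> Self {
        Self { buf: VecDeque::new() }
    }

    /// Append the message to the back of the buffer.
    fn send(&mut self, msg: T) {
        self.buf.push_back(msg);
    }

    /// Take the oldest buffered message, if any.
    fn try_receive(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}
```

In this sketch a message sent to the direct variant with no subscribers is simply dropped, while the queue variant holds it until someone asks for it. Whether DirectChannel treats a send with no subscribers as an error is not stated here.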

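§Extension Traits

  • SubscribableChannel – register-and-fanout extension (DirectChannel).
  • PollableChannel – dequeue extension (QueueChannel).
  • CorrelationSupport – correlation lookup extension (QueueChannel only).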

§Construction

Channels expose explicit and random id constructors:

  • DirectChannel::with_id("id"), DirectChannel::with_random_id()
  • QueueChannel::with_id("id"), QueueChannel::with_random_id()

Or use the helpers new_queue() and new_queue_with_id("id").

§Examples

§DirectChannel

use allora_core::{channel::DirectChannel, Channel, Exchange, Message};
let dc = DirectChannel::with_random_id();
dc.subscribe(|ex| { assert_eq!(ex.in_msg.body_text(), Some("ping")); Ok(()) });
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { dc.send(Exchange::new(Message::from_text("ping"))).await.unwrap(); });

§QueueChannel

use allora_core::{channel::{QueueChannel, PollableChannel, Channel}, Exchange, Message};
let ch = QueueChannel::with_id("demo");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
    ch.send(Exchange::new(Message::from_text("ping"))).await.unwrap();
    let ex = ch.try_receive().await.unwrap();
    assert_eq!(ex.in_msg.body_text(), Some("ping"));
});

§Correlation

use allora_core::{channel::{QueueChannel, CorrelationSupport}, Exchange, Message};
let ch = QueueChannel::with_random_id();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
    let cid = ch.send_with_correlation(Exchange::new(Message::from_text("req"))).await.unwrap();
    let ex_opt = ch.receive_by_correlation(&cid).await;
    let ex = ex_opt.unwrap();
    assert_eq!(ex.in_msg.body_text(), Some("req"));
});

§Notes

  • DirectChannel does not implement PollableChannel or CorrelationSupport.
  • QueueChannel guarantees FIFO order for normal dequeue operations.
  • Correlation lookup removes the matched exchange from the internal queue.
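How FIFO dequeue and correlation lookup interact can be shown with a small standard-library model. It is a sketch under stated assumptions, not QueueChannel itself: `CorrelatedQueue` is hypothetical, it is synchronous, it uses a counter for correlation ids, and it stores plain strings.

```rust
use std::collections::VecDeque;

/// Hypothetical buffered channel: each entry carries a correlation id.
struct CorrelatedQueue {
    next_id: u64,
    buf: VecDeque<(u64, String)>,
}

impl CorrelatedQueue {
    fn new() -> Self {
        Self { next_id: 0, buf: VecDeque::new() }
    }

    /// Enqueue a message and return the correlation id assigned to it.
    fn send_with_correlation(&mut self, body: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.buf.push_back((id, body.to_string()));
        id
    }

    /// Normal dequeue: oldest entry first.
    fn try_receive(&mut self) -> Option<String> {
        self.buf.pop_front().map(|(_, body)| body)
    }

    /// Find the entry with this id, remove it from the queue, and return it.
    fn receive_by_correlation(&mut self, id: u64) -> Option<String> {
        let pos = self.buf.iter().position(|(cid, _)| *cid == id)?;
        self.buf.remove(pos).map(|(_, body)| body)
    }
}
```

In this model, sending "a", "b", "c" and then looking up the id of "b" leaves "a" and "c" in the queue in their original order, and a second lookup with the same id returns nothing because the entry is already gone.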

Structs§

DirectChannel
Direct, synchronous handoff channel.
QueueChannel
FIFO buffered channel supporting dequeue and correlation lookup.

Traits§

Channel
Core channel trait providing send capabilities and metadata.
CorrelationSupport
Correlation lookup extension (QueueChannel only).
PollableChannel
Dequeue extension (QueueChannel).
SubscribableChannel
Register-and-fanout extension trait (DirectChannel).

Functions§

new_queue
Create a new queue channel with auto-generated id (queue:<uuid>).
new_queue_with_id
Create a new queue channel with explicit id.

Type Aliases§

ChannelRef
Type alias for a trait object reference to any Channel implementation.
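A trait-object alias of this kind lets code hold different channel types behind one type. The sketch below only illustrates the pattern. `ChannelLike`, `ChannelRefSketch`, `DirectLike`, and `QueueLike` are hypothetical, and the use of `Arc` and the `Send + Sync` bounds are assumptions. The actual definition of ChannelRef may differ.

```rust
use std::sync::Arc;

/// Hypothetical minimal trait; the real `Channel` trait has its own methods.
trait ChannelLike {
    fn id(&self) -> &str;
}

/// Assumed shape of the alias: a shared pointer to a trait object.
type ChannelRefSketch = Arc<dyn ChannelLike + Send + Sync>;

struct DirectLike {
    id: String,
}

struct QueueLike {
    id: String,
}

impl ChannelLike for DirectLike {
    fn id(&self) -> &str {
        &self.id
    }
}

impl ChannelLike for QueueLike {
    fn id(&self) -> &str {
        &self.id
    }
}

/// Collect the ids of a mixed list of channels through the alias.
fn channel_ids(channels: &[ChannelRefSketch]) -> Vec<String> {
    channels.iter().map(|c| c.id().to_string()).collect()
}
```

Callers that only need the shared behaviour can accept the alias and stay independent of the concrete channel type.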