allora_core/channel/
mod.rs

1//! Channel module – lightweight in-process message pipes.
2//!
3//! Provides abstractions for enqueuing and handing off `Exchange` instances between processing
4//! stages. Channels do not transform messages; they only move them. Transformation, routing,
5//! and filtering happen before (on the way in) or after (on receipt) via other crate components.
6//!
7//! # Implementations
8//! * [`DirectChannel`] – immediate fan‑out to registered subscribers (no buffering).
9//! * [`QueueChannel`] – FIFO buffered channel supporting dequeue & correlation lookup.
10//!
11//! # Extension Traits
12//! * [`SubscribableChannel`] – register subscriber callbacks (DirectChannel).
13//! * [`PollableChannel`] – dequeue / blocking receive operations (QueueChannel).
14//! * [`CorrelationSupport`] – correlation id helpers (QueueChannel only).
15//!
16//! # Construction
17//! Channels expose explicit and random id constructors:
18//! * `DirectChannel::with_id("id")`, `DirectChannel::with_random_id()`
19//! * `QueueChannel::with_id("id")`, `QueueChannel::with_random_id()`
20//! Or use helpers: [`new_queue()`], [`new_queue_with_id("id")`].
21//!
22//! # Examples
23//!
24//! ## DirectChannel
25//! ```rust
26//! use allora_core::{channel::DirectChannel, Channel, Exchange, Message};
27//! let dc = DirectChannel::with_random_id();
28//! dc.subscribe(|ex| { assert_eq!(ex.in_msg.body_text(), Some("ping")); Ok(()) });
29//! let rt = tokio::runtime::Runtime::new().unwrap();
30//! rt.block_on(async { dc.send(Exchange::new(Message::from_text("ping"))).await.unwrap(); });
31//! ```
32//!
33//! ## QueueChannel
34//! ```rust
35//! use allora_core::{channel::{QueueChannel, PollableChannel, Channel}, Exchange, Message};
36//! let ch = QueueChannel::with_id("demo");
37//! let rt = tokio::runtime::Runtime::new().unwrap();
38//! rt.block_on(async {
39//!     ch.send(Exchange::new(Message::from_text("ping"))).await.unwrap();
40//!     let ex = ch.try_receive().await.unwrap();
41//!     assert_eq!(ex.in_msg.body_text(), Some("ping"));
42//! });
43//! ```
44//!
45//! ## Correlation
46//! ```rust
47//! use allora_core::{channel::{QueueChannel, CorrelationSupport}, Exchange, Message};
48//! let ch = QueueChannel::with_random_id();
49//! let rt = tokio::runtime::Runtime::new().unwrap();
50//! rt.block_on(async {
51//!     let cid = ch.send_with_correlation(Exchange::new(Message::from_text("req"))).await.unwrap();
52//!     let ex_opt = ch.receive_by_correlation(&cid).await;
53//!     let ex = ex_opt.unwrap();
54//!     assert_eq!(ex.in_msg.body_text(), Some("req"));
55//! });
56//! ```
57//!
58//! ## Notes
59//! * DirectChannel does not implement `PollableChannel` or `CorrelationSupport`.
60//! * QueueChannel guarantees FIFO order for normal dequeue operations.
61//! * Correlation lookup removes the matched exchange from the internal queue.
62
63// ============================================================================
64// Module declarations
65// ============================================================================
66mod direct;
67mod log; // internal logging utilities
68mod queue;
69
70// ============================================================================
71// Public exports
72// ============================================================================
73pub use direct::DirectChannel;
74pub use queue::QueueChannel;
75
76// ============================================================================
77// Imports
78// ============================================================================
79use crate::{error::Result, Exchange};
80use std::any::Any;
81use std::fmt::Debug;
82use std::sync::Arc;
83
84// ============================================================================
85// Core Traits
86// ============================================================================
87
88/// Core channel trait providing send capabilities and metadata.
89///
90/// Implementors provide either sync or async send depending on the `async` feature.
91#[async_trait::async_trait]
92pub trait Channel: Send + Sync + Debug {
93    fn id(&self) -> &str;
94    async fn send(&self, exchange: Exchange) -> Result<()>;
95    fn kind(&self) -> &'static str {
96        "unknown"
97    }
98    fn as_any(&self) -> &dyn Any;
99}
100
101// ============================================================================
102// Extension Traits
103// ============================================================================
104
105/// Register-and-fanout extension trait (DirectChannel).
106pub trait SubscribableChannel: Channel {
107    fn subscribe<F>(&self, f: F) -> usize
108    where
109        F: Fn(Exchange) -> Result<()> + Send + Sync + 'static;
110}
111/// Dequeue extension (QueueChannel).
112#[async_trait::async_trait]
113pub trait PollableChannel: Channel {
114    async fn try_receive(&self) -> Option<Exchange>;
115}
116/// Correlation lookup extension (QueueChannel only).
117#[async_trait::async_trait]
118pub trait CorrelationSupport: Channel {
119    async fn send_with_correlation(&self, exchange: Exchange) -> Result<String>;
120    async fn receive_by_correlation(&self, corr_id: &str) -> Option<Exchange>;
121}
122
123// ============================================================================
124// Type aliases
125// ============================================================================
126
127/// Type alias for a trait object reference to any Channel implementation.
128pub type ChannelRef = Arc<dyn Channel>;
129
130// ============================================================================
131// Convenience constructors
132// ============================================================================
133/// Create a new queue channel with auto-generated id (`queue:<uuid>`).
134pub fn new_queue() -> ChannelRef {
135    Arc::new(QueueChannel::with_random_id())
136}
137/// Create a new queue channel with explicit id.
138pub fn new_queue_with_id(id: impl Into<String>) -> ChannelRef {
139    Arc::new(QueueChannel::with_id(id))
140}