1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//! Channel module – lightweight in-process message pipes.
//!
//! Provides abstractions for enqueuing and handing off `Exchange` instances between processing
//! stages. Channels do not transform messages; they only move them. Transformation, routing,
//! and filtering happen before (on the way in) or after (on receipt) via other crate components.
//!
//! # Implementations
//! * [`DirectChannel`] – immediate fan‑out to registered subscribers (no buffering).
//! * [`QueueChannel`] – FIFO buffered channel supporting dequeue & correlation lookup.
//!
//! # Extension Traits
//! * [`SubscribableChannel`] – register subscriber callbacks (DirectChannel).
//! * [`PollableChannel`] – dequeue / blocking receive operations (QueueChannel).
//! * [`CorrelationSupport`] – correlation id helpers (QueueChannel only).
//!
//! # Construction
//! Channels expose explicit and random id constructors:
//! * `DirectChannel::with_id("id")`, `DirectChannel::with_random_id()`
//! * `QueueChannel::with_id("id")`, `QueueChannel::with_random_id()`
//! Or use helpers: [`new_queue()`], [`new_queue_with_id("id")`].
//!
//! # Examples
//!
//! ## DirectChannel
//! ```rust
//! use allora_core::{channel::DirectChannel, Channel, Exchange, Message};
//! let dc = DirectChannel::with_random_id();
//! dc.subscribe(|ex| { assert_eq!(ex.in_msg.body_text(), Some("ping")); Ok(()) });
//! let rt = tokio::runtime::Runtime::new().unwrap();
//! rt.block_on(async { dc.send(Exchange::new(Message::from_text("ping"))).await.unwrap(); });
//! ```
//!
//! ## QueueChannel
//! ```rust
//! use allora_core::{channel::{QueueChannel, PollableChannel, Channel}, Exchange, Message};
//! let ch = QueueChannel::with_id("demo");
//! let rt = tokio::runtime::Runtime::new().unwrap();
//! rt.block_on(async {
//! ch.send(Exchange::new(Message::from_text("ping"))).await.unwrap();
//! let ex = ch.try_receive().await.unwrap();
//! assert_eq!(ex.in_msg.body_text(), Some("ping"));
//! });
//! ```
//!
//! ## Correlation
//! ```rust
//! use allora_core::{channel::{QueueChannel, CorrelationSupport}, Exchange, Message};
//! let ch = QueueChannel::with_random_id();
//! let rt = tokio::runtime::Runtime::new().unwrap();
//! rt.block_on(async {
//! let cid = ch.send_with_correlation(Exchange::new(Message::from_text("req"))).await.unwrap();
//! let ex_opt = ch.receive_by_correlation(&cid).await;
//! let ex = ex_opt.unwrap();
//! assert_eq!(ex.in_msg.body_text(), Some("req"));
//! });
//! ```
//!
//! ## Notes
//! * DirectChannel does not implement `PollableChannel` or `CorrelationSupport`.
//! * QueueChannel guarantees FIFO order for normal dequeue operations.
//! * Correlation lookup removes the matched exchange from the internal queue.
// ============================================================================
// Module declarations
// ============================================================================
// internal logging utilities
// ============================================================================
// Public exports
// ============================================================================
pub use DirectChannel;
pub use QueueChannel;
// ============================================================================
// Imports
// ============================================================================
use crate::;
use Any;
use Debug;
use Arc;
// ============================================================================
// Core Traits
// ============================================================================
/// Core channel trait providing send capabilities and metadata.
///
/// Implementors provide either sync or async send depending on the `async` feature.
// ============================================================================
// Extension Traits
// ============================================================================
/// Register-and-fanout extension trait (DirectChannel).