allora_core/channel/direct.rs
1//! DirectChannel: immediate handoff (no internal buffering).
2//!
3//! Features:
4//! * Zero-copy handoff (only `Exchange` clone per subscriber for isolation)
5//! * Ordered subscriber invocation (registration order)
6//! * Short-circuit on first error (remaining subscribers skipped)
7//! * Sync and async send variants via feature flag
8//! * Minimal API surface (construction + subscription + send)
9//!
10//! Not Provided:
11//! * Internal queueing, polling APIs (use `QueueChannel` for that)
12//! * Built-in correlation management (headers preserved but not generated)
13
14use crate::channel::SubscribableChannel;
15use crate::{error::Result, Channel, Exchange};
16use std::any::Any;
17use std::fmt::Debug;
18use std::sync::Mutex;
19use tracing::trace;
20
21/// Direct, synchronous handoff channel.
22///
23/// No internal buffering: send immediately dispatches the exchange to all
24/// subscribers in registration order. Each subscriber receives a cloned
25/// Exchange.
26pub struct DirectChannel {
27 // Stable identifier for this channel instance.
28 id: String,
29 // Registered subscriber callbacks invoked on each send.
30 subscribers: Mutex<Vec<Box<dyn Fn(Exchange) -> Result<()> + Send + Sync>>>,
31}
32
33impl Debug for DirectChannel {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 let count = self.subscribers.lock().map(|v| v.len()).unwrap_or(0);
36 f.debug_struct("DirectChannel")
37 .field("id", &self.id)
38 .field("subscriber_count", &count)
39 .finish()
40 }
41}
42
43impl DirectChannel {
44 // ========================================================================
45 // Constructors
46 // ========================================================================
47 /// Create a new direct channel with an auto-generated UUID-based id (`direct:<uuid>`).
48 pub fn with_random_id() -> Self {
49 Self {
50 id: format!("direct:{}", uuid::Uuid::new_v4()),
51 subscribers: Mutex::new(Vec::new()),
52 }
53 }
54 /// Create a new direct channel with an explicit id.
55 pub fn with_id<S: Into<String>>(id: S) -> Self {
56 Self {
57 id: id.into(),
58 subscribers: Mutex::new(Vec::new()),
59 }
60 }
61 /// Convenience: construct with optional id and an initial iterator of subscribers.
62 /// Each provided closure is subscribed in iteration order.
63 pub fn with_subscribers<I, F>(id: Option<&str>, subs: I) -> Self
64 where
65 I: IntoIterator<Item = F>,
66 F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
67 {
68 let ch = match id {
69 Some(i) => DirectChannel::with_id(i),
70 None => DirectChannel::with_random_id(),
71 };
72 for f in subs {
73 ch.subscribe(f);
74 }
75 ch
76 }
77 // ========================================================================
78 // Public accessors
79 // ========================================================================
80 /// Returns the channel identifier.
81 pub fn id(&self) -> &str {
82 &self.id
83 }
84 // ========================================================================
85 // Internal helpers
86 // ========================================================================
87 /// Dispatch an exchange to all subscribers. Clones the exchange once per subscriber.
88 /// Short-circuits on first error.
89 fn dispatch(&self, exchange: Exchange) -> Result<()> {
90 let subs = self.subscribers.lock().unwrap();
91 trace!(target: "allora::channel", channel_id = %self.id, subscribers = subs.len(), in_body = ?exchange.in_msg.body_text(), "direct dispatch start");
92 for (_idx, sub) in subs.iter().enumerate() {
93 let cloned = exchange.clone();
94 trace!(target: "allora::channel", channel_id = %self.id, subscriber_index = _idx, in_body = ?cloned.in_msg.body_text(), "direct dispatch to subscriber");
95 sub(cloned)?;
96 }
97 Ok(())
98 }
99 // ========================================================================
100 // Subscription management
101 // ========================================================================
102 /// Register a subscriber. Returns total subscriber count after insertion.
103 pub fn subscribe<F>(&self, f: F) -> usize
104 where
105 F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
106 {
107 let mut subs = self.subscribers.lock().unwrap();
108 subs.push(Box::new(f));
109 subs.len()
110 }
111}
112
113#[async_trait::async_trait]
114impl Channel for DirectChannel {
115 // --------------------------------------------------------------------
116 // Identity & metadata
117 // --------------------------------------------------------------------
118 fn id(&self) -> &str {
119 &self.id
120 }
121 // --------------------------------------------------------------------
122 // Send operations
123 // --------------------------------------------------------------------
124 async fn send(&self, exchange: Exchange) -> Result<()> {
125 self.dispatch(exchange)
126 }
127 fn kind(&self) -> &'static str {
128 "direct"
129 }
130 fn as_any(&self) -> &dyn Any {
131 self
132 }
133}
134
135impl SubscribableChannel for DirectChannel {
136 fn subscribe<F>(&self, f: F) -> usize
137 where
138 F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
139 {
140 self.subscribe(f) // delegate to inherent method
141 }
142}