allora_core/channel/
direct.rs

1//! DirectChannel: immediate handoff (no internal buffering).
2//!
3//! Features:
4//! * Zero-copy handoff (only `Exchange` clone per subscriber for isolation)
5//! * Ordered subscriber invocation (registration order)
6//! * Short-circuit on first error (remaining subscribers skipped)
7//! * Sync and async send variants via feature flag
8//! * Minimal API surface (construction + subscription + send)
9//!
10//! Not Provided:
11//! * Internal queueing, polling APIs (use `QueueChannel` for that)
12//! * Built-in correlation management (headers preserved but not generated)
13
14use crate::channel::SubscribableChannel;
15use crate::{error::Result, Channel, Exchange};
16use std::any::Any;
17use std::fmt::Debug;
18use std::sync::Mutex;
19use tracing::trace;
20
21/// Direct, synchronous handoff channel.
22///
23/// No internal buffering: send immediately dispatches the exchange to all
24/// subscribers in registration order. Each subscriber receives a cloned
25/// Exchange.
26pub struct DirectChannel {
27    // Stable identifier for this channel instance.
28    id: String,
29    // Registered subscriber callbacks invoked on each send.
30    subscribers: Mutex<Vec<Box<dyn Fn(Exchange) -> Result<()> + Send + Sync>>>,
31}
32
33impl Debug for DirectChannel {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        let count = self.subscribers.lock().map(|v| v.len()).unwrap_or(0);
36        f.debug_struct("DirectChannel")
37            .field("id", &self.id)
38            .field("subscriber_count", &count)
39            .finish()
40    }
41}
42
43impl DirectChannel {
44    // ========================================================================
45    // Constructors
46    // ========================================================================
47    /// Create a new direct channel with an auto-generated UUID-based id (`direct:<uuid>`).
48    pub fn with_random_id() -> Self {
49        Self {
50            id: format!("direct:{}", uuid::Uuid::new_v4()),
51            subscribers: Mutex::new(Vec::new()),
52        }
53    }
54    /// Create a new direct channel with an explicit id.
55    pub fn with_id<S: Into<String>>(id: S) -> Self {
56        Self {
57            id: id.into(),
58            subscribers: Mutex::new(Vec::new()),
59        }
60    }
61    /// Convenience: construct with optional id and an initial iterator of subscribers.
62    /// Each provided closure is subscribed in iteration order.
63    pub fn with_subscribers<I, F>(id: Option<&str>, subs: I) -> Self
64    where
65        I: IntoIterator<Item = F>,
66        F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
67    {
68        let ch = match id {
69            Some(i) => DirectChannel::with_id(i),
70            None => DirectChannel::with_random_id(),
71        };
72        for f in subs {
73            ch.subscribe(f);
74        }
75        ch
76    }
77    // ========================================================================
78    // Public accessors
79    // ========================================================================
80    /// Returns the channel identifier.
81    pub fn id(&self) -> &str {
82        &self.id
83    }
84    // ========================================================================
85    // Internal helpers
86    // ========================================================================
87    /// Dispatch an exchange to all subscribers. Clones the exchange once per subscriber.
88    /// Short-circuits on first error.
89    fn dispatch(&self, exchange: Exchange) -> Result<()> {
90        let subs = self.subscribers.lock().unwrap();
91        trace!(target: "allora::channel", channel_id = %self.id, subscribers = subs.len(), in_body = ?exchange.in_msg.body_text(), "direct dispatch start");
92        for (_idx, sub) in subs.iter().enumerate() {
93            let cloned = exchange.clone();
94            trace!(target: "allora::channel", channel_id = %self.id, subscriber_index = _idx, in_body = ?cloned.in_msg.body_text(), "direct dispatch to subscriber");
95            sub(cloned)?;
96        }
97        Ok(())
98    }
99    // ========================================================================
100    // Subscription management
101    // ========================================================================
102    /// Register a subscriber. Returns total subscriber count after insertion.
103    pub fn subscribe<F>(&self, f: F) -> usize
104    where
105        F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
106    {
107        let mut subs = self.subscribers.lock().unwrap();
108        subs.push(Box::new(f));
109        subs.len()
110    }
111}
112
113#[async_trait::async_trait]
114impl Channel for DirectChannel {
115    // --------------------------------------------------------------------
116    // Identity & metadata
117    // --------------------------------------------------------------------
118    fn id(&self) -> &str {
119        &self.id
120    }
121    // --------------------------------------------------------------------
122    // Send operations
123    // --------------------------------------------------------------------
124    async fn send(&self, exchange: Exchange) -> Result<()> {
125        self.dispatch(exchange)
126    }
127    fn kind(&self) -> &'static str {
128        "direct"
129    }
130    fn as_any(&self) -> &dyn Any {
131        self
132    }
133}
134
135impl SubscribableChannel for DirectChannel {
136    fn subscribe<F>(&self, f: F) -> usize
137    where
138        F: Fn(Exchange) -> Result<()> + Send + Sync + 'static,
139    {
140        self.subscribe(f) // delegate to inherent method
141    }
142}