actr_runtime/inbound/
data_stream_registry.rs

1//! DataStreamRegistry - Fast path data stream registry
2
3use actr_protocol::{ActorResult, ActrId, DataStream};
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7
8/// Stream chunk callback type
9///
10/// # Design Rationale
11/// Fast path is stream-based push, not RPC, so it doesn't need full Context:
12/// - Only passes sender ActrId (to know where data comes from)
13/// - Doesn't pass Context (avoids confusing RPC and Stream semantics)
14/// - If reverse signaling needed, user should send via OutboundGate
15pub type DataStreamCallback =
16    Arc<dyn Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync>;
17
18/// DataStreamRegistry - Stream chunk callback manager
19///
20/// # Responsibilities
21/// - Receive DataStream from LatencyFirst Lane (stream-format data packets)
22/// - Maintain stream_id → callback mapping
23/// - Concurrently invoke user-registered data stream callbacks
24///
25/// # Typical Use Cases
26/// - Streaming RPC (server-side push streams)
27/// - Real-time collaborative editing (multi-user editing sync)
28/// - Game state streams (position updates, event streams)
29/// - Log streams, sensor data streams, metrics streams
30pub struct DataStreamRegistry {
31    /// Concurrent mapping of stream_id → callback function
32    callbacks: DashMap<String, DataStreamCallback>,
33}
34
35impl Default for DataStreamRegistry {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl DataStreamRegistry {
42    pub fn new() -> Self {
43        Self {
44            callbacks: DashMap::new(),
45        }
46    }
47
48    /// Register stream callback
49    ///
50    /// # Arguments
51    /// - `stream_id`: stream identifier (must be globally unique)
52    /// - `callback`: data stream handler callback
53    pub fn register(&self, stream_id: String, callback: DataStreamCallback) {
54        self.callbacks.insert(stream_id.clone(), callback);
55        tracing::info!("ðŸ“Ą Registered data stream handler: {}", stream_id);
56    }
57
58    /// Unregister stream callback
59    ///
60    /// # Arguments
61    /// - `stream_id`: stream identifier to unregister
62    pub fn unregister(&self, stream_id: &str) {
63        self.callbacks.remove(stream_id);
64        tracing::info!("ðŸšŦ Unregistered data stream handler: {}", stream_id);
65    }
66
67    /// Dispatch data stream to callback (concurrent execution)
68    ///
69    /// # Arguments
70    /// - `chunk`: data stream
71    /// - `sender_id`: sender ActrId
72    ///
73    /// # Performance
74    /// - Direct callback invocation, no queueing overhead
75    /// - Latency: ~10Ξs
76    /// - Concurrent execution, doesn't block other streams
77    pub async fn dispatch(&self, chunk: DataStream, sender_id: ActrId) {
78        let start = std::time::Instant::now();
79
80        if let Some(callback) = self.callbacks.get(&chunk.stream_id) {
81            let callback = callback.clone();
82            tokio::spawn(async move {
83                if let Err(e) = callback(chunk, sender_id).await {
84                    tracing::error!("❌ Stream chunk callback error: {:?}", e);
85                }
86            });
87
88            tracing::debug!("🚀 Dispatched data stream in {:?}", start.elapsed());
89        } else {
90            tracing::warn!("⚠ïļ No callback registered for stream: {}", chunk.stream_id);
91        }
92    }
93
94    /// Get active stream count
95    pub fn active_streams(&self) -> usize {
96        self.callbacks.len()
97    }
98}