actr_runtime/inbound/data_stream_registry.rs
1//! DataStreamRegistry - Fast path data stream registry
2
3use actr_protocol::{ActorResult, ActrId, DataStream};
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7
8/// Stream chunk callback type
9///
10/// # Design Rationale
11/// Fast path is stream-based push, not RPC, so it doesn't need full Context:
12/// - Only passes sender ActrId (to know where data comes from)
13/// - Doesn't pass Context (avoids confusing RPC and Stream semantics)
14/// - If reverse signaling needed, user should send via OutboundGate
15pub type DataStreamCallback =
16 Arc<dyn Fn(DataStream, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync>;
17
18/// DataStreamRegistry - Stream chunk callback manager
19///
20/// # Responsibilities
21/// - Receive DataStream from LatencyFirst Lane (stream-format data packets)
22/// - Maintain stream_id â callback mapping
23/// - Concurrently invoke user-registered data stream callbacks
24///
25/// # Typical Use Cases
26/// - Streaming RPC (server-side push streams)
27/// - Real-time collaborative editing (multi-user editing sync)
28/// - Game state streams (position updates, event streams)
29/// - Log streams, sensor data streams, metrics streams
30pub struct DataStreamRegistry {
31 /// Concurrent mapping of stream_id â callback function
32 callbacks: DashMap<String, DataStreamCallback>,
33}
34
35impl Default for DataStreamRegistry {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl DataStreamRegistry {
42 pub fn new() -> Self {
43 Self {
44 callbacks: DashMap::new(),
45 }
46 }
47
48 /// Register stream callback
49 ///
50 /// # Arguments
51 /// - `stream_id`: stream identifier (must be globally unique)
52 /// - `callback`: data stream handler callback
53 pub fn register(&self, stream_id: String, callback: DataStreamCallback) {
54 self.callbacks.insert(stream_id.clone(), callback);
55 tracing::info!("ðĄ Registered data stream handler: {}", stream_id);
56 }
57
58 /// Unregister stream callback
59 ///
60 /// # Arguments
61 /// - `stream_id`: stream identifier to unregister
62 pub fn unregister(&self, stream_id: &str) {
63 self.callbacks.remove(stream_id);
64 tracing::info!("ðŦ Unregistered data stream handler: {}", stream_id);
65 }
66
67 /// Dispatch data stream to callback (concurrent execution)
68 ///
69 /// # Arguments
70 /// - `chunk`: data stream
71 /// - `sender_id`: sender ActrId
72 ///
73 /// # Performance
74 /// - Direct callback invocation, no queueing overhead
75 /// - Latency: ~10Ξs
76 /// - Concurrent execution, doesn't block other streams
77 pub async fn dispatch(&self, chunk: DataStream, sender_id: ActrId) {
78 let start = std::time::Instant::now();
79
80 if let Some(callback) = self.callbacks.get(&chunk.stream_id) {
81 let callback = callback.clone();
82 tokio::spawn(async move {
83 if let Err(e) = callback(chunk, sender_id).await {
84 tracing::error!("â Stream chunk callback error: {:?}", e);
85 }
86 });
87
88 tracing::debug!("ð Dispatched data stream in {:?}", start.elapsed());
89 } else {
90 tracing::warn!("â ïļ No callback registered for stream: {}", chunk.stream_id);
91 }
92 }
93
94 /// Get active stream count
95 pub fn active_streams(&self) -> usize {
96 self.callbacks.len()
97 }
98}