actr_runtime/inbound/
media_frame_registry.rs

1//! MediaFrameRegistry - Fast path media frame registry (WebRTC native)
2
3use actr_protocol::{ActorResult, ActrId};
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7
8// Use MediaSample from framework (dependency inversion)
9use actr_framework::MediaSample;
10
11/// MediaTrack callback type
12///
13/// # Design Rationale
14/// - Uses WebRTC native types (no protobuf overhead)
15/// - Receives MediaSample directly from RTCTrackRemote
16/// - Sender ActrId provided for source identification
17/// - Fast path: direct callback, no queue
18pub type MediaTrackCallback =
19    Arc<dyn Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync>;
20
21/// MediaFrameRegistry - WebRTC native media track callback manager
22///
23/// # Architecture
24///
25/// MediaFrameRegistry works with WebRTC PeerConnection to receive native media:
26///
27/// ```text
28/// WebRTC PeerConnection
29///   └─ RTCTrackRemote (native RTP channel)
30///       └─ on_track callback
31///           └─ MediaFrameRegistry::dispatch()
32///               └─ User callback (MediaSample, sender_id)
33/// ```
34///
35/// # Key Points
36/// - **No protobuf**: Uses WebRTC native sample data
37/// - **No DataChannel**: Media goes through RTCTrackRemote (RTP)
38/// - **Zero serialization**: Direct sample bytes from RTP packets
39/// - **Low latency**: ~1-2ms from network to callback
40///
41/// # Typical Use Cases
42/// - Real-time audio/video calls
43/// - Screen sharing
44/// - Audio/video recording
45/// - Media transcoding
46pub struct MediaFrameRegistry {
47    /// Concurrent mapping of track_id → callback function
48    callbacks: DashMap<String, MediaTrackCallback>,
49}
50
51impl Default for MediaFrameRegistry {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl MediaFrameRegistry {
58    pub fn new() -> Self {
59        Self {
60            callbacks: DashMap::new(),
61        }
62    }
63
64    /// Register media track callback
65    ///
66    /// # Arguments
67    /// - `track_id`: Track identifier (must be globally unique)
68    /// - `callback`: Media sample handler callback
69    ///
70    /// # Example
71    ///
72    /// ```rust,ignore
73    /// registry.register("video-track-1", Arc::new(|sample, sender| {
74    ///     Box::pin(async move {
75    ///         println!("Received {} bytes from {:?}", sample.data.len(), sender);
76    ///         // Decode and render video frame...
77    ///         Ok(())
78    ///     })
79    /// }));
80    /// ```
81    pub fn register(&self, track_id: String, callback: MediaTrackCallback) {
82        self.callbacks.insert(track_id.clone(), callback);
83        tracing::info!("🎬 Registered MediaTrack: {}", track_id);
84    }
85
86    /// Unregister media track callback
87    ///
88    /// # Arguments
89    /// - `track_id`: Track identifier to unregister
90    pub fn unregister(&self, track_id: &str) {
91        self.callbacks.remove(track_id);
92        tracing::info!("🚫 Unregistered MediaTrack: {}", track_id);
93    }
94
95    /// Dispatch media sample to callback (concurrent execution)
96    ///
97    /// Called by WebRTC on_track handler when a media sample arrives.
98    ///
99    /// # Arguments
100    /// - `track_id`: Track identifier
101    /// - `sample`: Media sample from RTCTrackRemote
102    /// - `sender_id`: Sender ActrId
103    ///
104    /// # Performance
105    /// - Direct callback invocation, no queueing overhead
106    /// - Latency: ~1-2μs dispatch time (excluding callback execution)
107    /// - Concurrent execution, doesn't block other tracks
108    pub async fn dispatch(&self, track_id: &str, sample: MediaSample, sender_id: ActrId) {
109        let start = std::time::Instant::now();
110
111        if let Some(callback) = self.callbacks.get(track_id) {
112            let callback = callback.clone();
113            tokio::spawn(async move {
114                if let Err(e) = callback(sample, sender_id).await {
115                    tracing::error!("❌ MediaTrack callback error: {:?}", e);
116                }
117            });
118
119            tracing::debug!("🎬 Dispatched media sample in {:?}", start.elapsed());
120        } else {
121            tracing::warn!("⚠️ No callback registered for track: {}", track_id);
122        }
123    }
124
125    /// Get active track count
126    pub fn active_tracks(&self) -> usize {
127        self.callbacks.len()
128    }
129}