actr_runtime/inbound/media_frame_registry.rs
1//! MediaFrameRegistry - Fast path media frame registry (WebRTC native)
2
3use actr_protocol::{ActorResult, ActrId};
4use dashmap::DashMap;
5use futures_util::future::BoxFuture;
6use std::sync::Arc;
7
8// Use MediaSample from framework (dependency inversion)
9use actr_framework::MediaSample;
10
11/// MediaTrack callback type
12///
13/// # Design Rationale
14/// - Uses WebRTC native types (no protobuf overhead)
15/// - Receives MediaSample directly from RTCTrackRemote
16/// - Sender ActrId provided for source identification
17/// - Fast path: direct callback, no queue
18pub type MediaTrackCallback =
19 Arc<dyn Fn(MediaSample, ActrId) -> BoxFuture<'static, ActorResult<()>> + Send + Sync>;
20
21/// MediaFrameRegistry - WebRTC native media track callback manager
22///
23/// # Architecture
24///
25/// MediaFrameRegistry works with WebRTC PeerConnection to receive native media:
26///
27/// ```text
28/// WebRTC PeerConnection
29/// └─ RTCTrackRemote (native RTP channel)
30/// └─ on_track callback
31/// └─ MediaFrameRegistry::dispatch()
32/// └─ User callback (MediaSample, sender_id)
33/// ```
34///
35/// # Key Points
36/// - **No protobuf**: Uses WebRTC native sample data
37/// - **No DataChannel**: Media goes through RTCTrackRemote (RTP)
38/// - **Zero serialization**: Direct sample bytes from RTP packets
39/// - **Low latency**: ~1-2ms from network to callback
40///
41/// # Typical Use Cases
42/// - Real-time audio/video calls
43/// - Screen sharing
44/// - Audio/video recording
45/// - Media transcoding
46pub struct MediaFrameRegistry {
47 /// Concurrent mapping of track_id → callback function
48 callbacks: DashMap<String, MediaTrackCallback>,
49}
50
51impl Default for MediaFrameRegistry {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl MediaFrameRegistry {
58 pub fn new() -> Self {
59 Self {
60 callbacks: DashMap::new(),
61 }
62 }
63
64 /// Register media track callback
65 ///
66 /// # Arguments
67 /// - `track_id`: Track identifier (must be globally unique)
68 /// - `callback`: Media sample handler callback
69 ///
70 /// # Example
71 ///
72 /// ```rust,ignore
73 /// registry.register("video-track-1", Arc::new(|sample, sender| {
74 /// Box::pin(async move {
75 /// println!("Received {} bytes from {:?}", sample.data.len(), sender);
76 /// // Decode and render video frame...
77 /// Ok(())
78 /// })
79 /// }));
80 /// ```
81 pub fn register(&self, track_id: String, callback: MediaTrackCallback) {
82 self.callbacks.insert(track_id.clone(), callback);
83 tracing::info!("🎬 Registered MediaTrack: {}", track_id);
84 }
85
86 /// Unregister media track callback
87 ///
88 /// # Arguments
89 /// - `track_id`: Track identifier to unregister
90 pub fn unregister(&self, track_id: &str) {
91 self.callbacks.remove(track_id);
92 tracing::info!("🚫 Unregistered MediaTrack: {}", track_id);
93 }
94
95 /// Dispatch media sample to callback (concurrent execution)
96 ///
97 /// Called by WebRTC on_track handler when a media sample arrives.
98 ///
99 /// # Arguments
100 /// - `track_id`: Track identifier
101 /// - `sample`: Media sample from RTCTrackRemote
102 /// - `sender_id`: Sender ActrId
103 ///
104 /// # Performance
105 /// - Direct callback invocation, no queueing overhead
106 /// - Latency: ~1-2μs dispatch time (excluding callback execution)
107 /// - Concurrent execution, doesn't block other tracks
108 pub async fn dispatch(&self, track_id: &str, sample: MediaSample, sender_id: ActrId) {
109 let start = std::time::Instant::now();
110
111 if let Some(callback) = self.callbacks.get(track_id) {
112 let callback = callback.clone();
113 tokio::spawn(async move {
114 if let Err(e) = callback(sample, sender_id).await {
115 tracing::error!("❌ MediaTrack callback error: {:?}", e);
116 }
117 });
118
119 tracing::debug!("🎬 Dispatched media sample in {:?}", start.elapsed());
120 } else {
121 tracing::warn!("⚠️ No callback registered for track: {}", track_id);
122 }
123 }
124
125 /// Get active track count
126 pub fn active_tracks(&self) -> usize {
127 self.callbacks.len()
128 }
129}