// Source: lingxia_media/video.rs

use lingxia_platform::PlatformError;
use lingxia_platform::traits::stream_decoder::{
    AudioFrame, AudioStreamConfig, VideoFrame, VideoStreamConfig, VideoStreamDecoderHandle,
};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock, PoisonError};
10
/// Error type returned by stream providers, sessions, and frame sinks.
///
/// It carries only a human-readable message; the `Display` output is that
/// message verbatim.
#[derive(Debug, Clone)]
pub struct StreamError {
    message: String,
}

impl StreamError {
    /// Creates an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }
}

impl fmt::Display for StreamError {
    /// Writes the stored message without any decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for StreamError {}
31
32impl From<PlatformError> for StreamError {
33    fn from(err: PlatformError) -> Self {
34        StreamError::new(err.to_string())
35    }
36}
37
38pub trait StreamSession: Send + Sync {
39    fn stop(&self) -> Result<(), StreamError>;
40    fn pause(&self) -> Result<(), StreamError>;
41    fn resume(&self) -> Result<(), StreamError>;
42    fn seek(&self, position: f64) -> Result<(), StreamError>;
43}
44
45pub trait StreamProvider: Send + Sync {
46    fn name(&self) -> &'static str;
47    fn start(&self, params: Value, sink: FrameSink) -> Result<Box<dyn StreamSession>, StreamError>;
48
49    fn should_force_hard_switch(&self, _prev_params: Option<&Value>, _next_params: &Value) -> bool {
50        false
51    }
52}
53
54#[derive(Clone)]
55pub struct FrameSink {
56    decoder: Arc<dyn VideoStreamDecoderHandle>,
57    epoch_token: Option<Arc<AtomicU64>>,
58    epoch: u64,
59    stale_logged: Arc<std::sync::atomic::AtomicBool>,
60    component_id: Option<String>,
61    duration_reporter: Option<Arc<dyn Fn(u64) + Send + Sync>>,
62    ended_reporter: Option<Arc<dyn Fn() + Send + Sync>>,
63}
64
65impl FrameSink {
66    pub fn new(decoder: Box<dyn VideoStreamDecoderHandle>) -> Self {
67        Self {
68            decoder: decoder.into(),
69            epoch_token: None,
70            epoch: 0,
71            stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
72            component_id: None,
73            duration_reporter: None,
74            ended_reporter: None,
75        }
76    }
77
78    pub fn from_arc(decoder: Arc<dyn VideoStreamDecoderHandle>) -> Self {
79        Self {
80            decoder,
81            epoch_token: None,
82            epoch: 0,
83            stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
84            component_id: None,
85            duration_reporter: None,
86            ended_reporter: None,
87        }
88    }
89
90    pub fn from_arc_with_epoch(
91        decoder: Arc<dyn VideoStreamDecoderHandle>,
92        epoch_token: Arc<AtomicU64>,
93        epoch: u64,
94    ) -> Self {
95        Self {
96            decoder,
97            epoch_token: Some(epoch_token),
98            epoch,
99            stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
100            component_id: None,
101            duration_reporter: None,
102            ended_reporter: None,
103        }
104    }
105
106    pub fn with_component_id(mut self, component_id: impl Into<String>) -> Self {
107        self.component_id = Some(component_id.into());
108        self
109    }
110
111    pub fn with_duration_reporter<F>(mut self, reporter: F) -> Self
112    where
113        F: Fn(u64) + Send + Sync + 'static,
114    {
115        self.duration_reporter = Some(Arc::new(reporter));
116        self
117    }
118
119    pub fn with_ended_reporter<F>(mut self, reporter: F) -> Self
120    where
121        F: Fn() + Send + Sync + 'static,
122    {
123        self.ended_reporter = Some(Arc::new(reporter));
124        self
125    }
126
127    pub fn component_id(&self) -> Option<&str> {
128        self.component_id.as_deref()
129    }
130
131    fn is_current(&self, op: &'static str) -> bool {
132        let Some(token) = &self.epoch_token else {
133            return true;
134        };
135        let current = token.load(Ordering::Relaxed);
136        if current == self.epoch {
137            return true;
138        }
139        if self
140            .stale_logged
141            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
142            .is_ok()
143        {
144            let component_id = self.component_id.as_deref().unwrap_or("-");
145            log::warn!(
146                "FrameSink dropped stale {}: component_id={} sink_epoch={} current_epoch={}",
147                op,
148                component_id,
149                self.epoch,
150                current
151            );
152        }
153        false
154    }
155
156    pub fn configure_video(&self, config: VideoStreamConfig) -> Result<(), StreamError> {
157        if !self.is_current("configure_video") {
158            return Ok(());
159        }
160        self.decoder
161            .configure_video(config)
162            .map_err(StreamError::from)
163    }
164
165    pub fn configure_audio(&self, config: AudioStreamConfig) -> Result<(), StreamError> {
166        if !self.is_current("configure_audio") {
167            return Ok(());
168        }
169        self.decoder
170            .configure_audio(config)
171            .map_err(StreamError::from)
172    }
173
174    pub fn push_video(&self, frame: VideoFrame) -> Result<(), StreamError> {
175        if !self.is_current("push_video") {
176            return Ok(());
177        }
178        self.decoder.push_video(frame).map_err(StreamError::from)
179    }
180
181    pub fn push_audio(&self, frame: AudioFrame) -> Result<(), StreamError> {
182        if !self.is_current("push_audio") {
183            return Ok(());
184        }
185        self.decoder.push_audio(frame).map_err(StreamError::from)
186    }
187
188    pub fn stop(&self) -> Result<(), StreamError> {
189        if !self.is_current("stop") {
190            return Ok(());
191        }
192        self.decoder.stop().map_err(StreamError::from)
193    }
194
195    pub fn report_duration_ms(&self, duration_ms: u64) {
196        if duration_ms == 0 {
197            return;
198        }
199        if !self.is_current("report_duration_ms") {
200            return;
201        }
202        if let Some(reporter) = &self.duration_reporter {
203            reporter(duration_ms);
204        }
205    }
206
207    pub fn report_ended(&self) {
208        if !self.is_current("report_ended") {
209            return;
210        }
211        if let Some(reporter) = &self.ended_reporter {
212            reporter();
213        }
214    }
215}
216
217type ProviderRegistry = HashMap<String, Arc<dyn StreamProvider>>;
218
219static STREAM_PROVIDERS: OnceLock<Mutex<ProviderRegistry>> = OnceLock::new();
220
221pub fn register_stream_provider(provider: Box<dyn StreamProvider>) {
222    let registry = STREAM_PROVIDERS.get_or_init(|| Mutex::new(HashMap::new()));
223    let mut providers = registry
224        .lock()
225        .expect("Stream provider registry mutex is poisoned");
226    providers.insert(provider.name().to_string(), provider.into());
227}
228
229pub fn get_stream_provider(name: &str) -> Option<Arc<dyn StreamProvider>> {
230    STREAM_PROVIDERS
231        .get()
232        .and_then(|registry| registry.lock().ok())
233        .and_then(|providers| providers.get(name).cloned())
234}
235
/// Seek handler: receives the position passed to `seek_stream_session` and
/// returns a `bool` that is handed back to the caller unchanged.
type SeekCallback = Arc<dyn Fn(f64) -> bool + Send + Sync>;

/// Seek handlers keyed by component id.
type SeekCallbackRegistry = HashMap<String, SeekCallback>;

/// Process-wide seek-callback registry, created lazily on first access.
static STREAM_SEEK_CALLBACKS: OnceLock<Mutex<SeekCallbackRegistry>> = OnceLock::new();

/// Returns the global seek-callback registry, initializing it if necessary.
fn seek_callback_registry() -> &'static Mutex<SeekCallbackRegistry> {
    STREAM_SEEK_CALLBACKS.get_or_init(|| Mutex::new(HashMap::new()))
}
244
245pub fn register_stream_seek_callback<F>(component_id: &str, callback: F)
246where
247    F: Fn(f64) -> bool + Send + Sync + 'static,
248{
249    if let Ok(mut registry) = seek_callback_registry().lock() {
250        log::info!(
251            "[StreamSource] register_stream_seek_callback: component_id={}",
252            component_id
253        );
254        registry.insert(component_id.to_string(), Arc::new(callback));
255    }
256}
257
258pub fn unregister_stream_seek_callback(component_id: &str) {
259    if let Ok(mut registry) = seek_callback_registry().lock() {
260        registry.remove(component_id);
261    }
262}
263
264pub fn seek_stream_session(component_id: &str, position_seconds: f64) -> bool {
265    log::info!(
266        "[StreamSource] seek_stream_session: component_id={} position_seconds={}",
267        component_id,
268        position_seconds
269    );
270
271    let callback = {
272        let Ok(registry) = seek_callback_registry().lock() else {
273            log::warn!(
274                "[StreamSource] seek_stream_session: failed to lock registry for component_id={}",
275                component_id
276            );
277            return false;
278        };
279        let cb = registry.get(component_id).cloned();
280        if cb.is_none() {
281            log::warn!(
282                "[StreamSource] seek_stream_session: no callback found for component_id={}, registered_ids={:?}",
283                component_id,
284                registry.keys().collect::<Vec<_>>()
285            );
286        }
287        cb
288    };
289
290    if let Some(cb) = callback {
291        log::info!(
292            "[StreamSource] seek_stream_session: invoking callback for component_id={}",
293            component_id
294        );
295        return cb(position_seconds);
296    }
297    false
298}