1use lingxia_platform::PlatformError;
2use lingxia_platform::traits::stream_decoder::{
3 AudioFrame, AudioStreamConfig, VideoFrame, VideoStreamConfig, VideoStreamDecoderHandle,
4};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::fmt;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex, OnceLock};
10
11#[derive(Debug, Clone)]
12pub struct StreamError {
13 message: String,
14}
15
16impl StreamError {
17 pub fn new(message: impl Into<String>) -> Self {
18 Self {
19 message: message.into(),
20 }
21 }
22}
23
24impl fmt::Display for StreamError {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 write!(f, "{}", self.message)
27 }
28}
29
30impl std::error::Error for StreamError {}
31
32impl From<PlatformError> for StreamError {
33 fn from(err: PlatformError) -> Self {
34 StreamError::new(err.to_string())
35 }
36}
37
38pub trait StreamSession: Send + Sync {
39 fn stop(&self) -> Result<(), StreamError>;
40 fn pause(&self) -> Result<(), StreamError>;
41 fn resume(&self) -> Result<(), StreamError>;
42 fn seek(&self, position: f64) -> Result<(), StreamError>;
43}
44
45pub trait StreamProvider: Send + Sync {
46 fn name(&self) -> &'static str;
47 fn start(&self, params: Value, sink: FrameSink) -> Result<Box<dyn StreamSession>, StreamError>;
48
49 fn should_force_hard_switch(&self, _prev_params: Option<&Value>, _next_params: &Value) -> bool {
50 false
51 }
52}
53
54#[derive(Clone)]
55pub struct FrameSink {
56 decoder: Arc<dyn VideoStreamDecoderHandle>,
57 epoch_token: Option<Arc<AtomicU64>>,
58 epoch: u64,
59 stale_logged: Arc<std::sync::atomic::AtomicBool>,
60 component_id: Option<String>,
61 duration_reporter: Option<Arc<dyn Fn(u64) + Send + Sync>>,
62 ended_reporter: Option<Arc<dyn Fn() + Send + Sync>>,
63}
64
65impl FrameSink {
66 pub fn new(decoder: Box<dyn VideoStreamDecoderHandle>) -> Self {
67 Self {
68 decoder: decoder.into(),
69 epoch_token: None,
70 epoch: 0,
71 stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
72 component_id: None,
73 duration_reporter: None,
74 ended_reporter: None,
75 }
76 }
77
78 pub fn from_arc(decoder: Arc<dyn VideoStreamDecoderHandle>) -> Self {
79 Self {
80 decoder,
81 epoch_token: None,
82 epoch: 0,
83 stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
84 component_id: None,
85 duration_reporter: None,
86 ended_reporter: None,
87 }
88 }
89
90 pub fn from_arc_with_epoch(
91 decoder: Arc<dyn VideoStreamDecoderHandle>,
92 epoch_token: Arc<AtomicU64>,
93 epoch: u64,
94 ) -> Self {
95 Self {
96 decoder,
97 epoch_token: Some(epoch_token),
98 epoch,
99 stale_logged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
100 component_id: None,
101 duration_reporter: None,
102 ended_reporter: None,
103 }
104 }
105
106 pub fn with_component_id(mut self, component_id: impl Into<String>) -> Self {
107 self.component_id = Some(component_id.into());
108 self
109 }
110
111 pub fn with_duration_reporter<F>(mut self, reporter: F) -> Self
112 where
113 F: Fn(u64) + Send + Sync + 'static,
114 {
115 self.duration_reporter = Some(Arc::new(reporter));
116 self
117 }
118
119 pub fn with_ended_reporter<F>(mut self, reporter: F) -> Self
120 where
121 F: Fn() + Send + Sync + 'static,
122 {
123 self.ended_reporter = Some(Arc::new(reporter));
124 self
125 }
126
127 pub fn component_id(&self) -> Option<&str> {
128 self.component_id.as_deref()
129 }
130
131 fn is_current(&self, op: &'static str) -> bool {
132 let Some(token) = &self.epoch_token else {
133 return true;
134 };
135 let current = token.load(Ordering::Relaxed);
136 if current == self.epoch {
137 return true;
138 }
139 if self
140 .stale_logged
141 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
142 .is_ok()
143 {
144 let component_id = self.component_id.as_deref().unwrap_or("-");
145 log::warn!(
146 "FrameSink dropped stale {}: component_id={} sink_epoch={} current_epoch={}",
147 op,
148 component_id,
149 self.epoch,
150 current
151 );
152 }
153 false
154 }
155
156 pub fn configure_video(&self, config: VideoStreamConfig) -> Result<(), StreamError> {
157 if !self.is_current("configure_video") {
158 return Ok(());
159 }
160 self.decoder
161 .configure_video(config)
162 .map_err(StreamError::from)
163 }
164
165 pub fn configure_audio(&self, config: AudioStreamConfig) -> Result<(), StreamError> {
166 if !self.is_current("configure_audio") {
167 return Ok(());
168 }
169 self.decoder
170 .configure_audio(config)
171 .map_err(StreamError::from)
172 }
173
174 pub fn push_video(&self, frame: VideoFrame) -> Result<(), StreamError> {
175 if !self.is_current("push_video") {
176 return Ok(());
177 }
178 self.decoder.push_video(frame).map_err(StreamError::from)
179 }
180
181 pub fn push_audio(&self, frame: AudioFrame) -> Result<(), StreamError> {
182 if !self.is_current("push_audio") {
183 return Ok(());
184 }
185 self.decoder.push_audio(frame).map_err(StreamError::from)
186 }
187
188 pub fn stop(&self) -> Result<(), StreamError> {
189 if !self.is_current("stop") {
190 return Ok(());
191 }
192 self.decoder.stop().map_err(StreamError::from)
193 }
194
195 pub fn report_duration_ms(&self, duration_ms: u64) {
196 if duration_ms == 0 {
197 return;
198 }
199 if !self.is_current("report_duration_ms") {
200 return;
201 }
202 if let Some(reporter) = &self.duration_reporter {
203 reporter(duration_ms);
204 }
205 }
206
207 pub fn report_ended(&self) {
208 if !self.is_current("report_ended") {
209 return;
210 }
211 if let Some(reporter) = &self.ended_reporter {
212 reporter();
213 }
214 }
215}
216
217type ProviderRegistry = HashMap<String, Arc<dyn StreamProvider>>;
218
219static STREAM_PROVIDERS: OnceLock<Mutex<ProviderRegistry>> = OnceLock::new();
220
221pub fn register_stream_provider(provider: Box<dyn StreamProvider>) {
222 let registry = STREAM_PROVIDERS.get_or_init(|| Mutex::new(HashMap::new()));
223 let mut providers = registry
224 .lock()
225 .expect("Stream provider registry mutex is poisoned");
226 providers.insert(provider.name().to_string(), provider.into());
227}
228
229pub fn get_stream_provider(name: &str) -> Option<Arc<dyn StreamProvider>> {
230 STREAM_PROVIDERS
231 .get()
232 .and_then(|registry| registry.lock().ok())
233 .and_then(|providers| providers.get(name).cloned())
234}
235
236type SeekCallback = Arc<dyn Fn(f64) -> bool + Send + Sync>;
237type SeekCallbackRegistry = HashMap<String, SeekCallback>;
238
239static STREAM_SEEK_CALLBACKS: OnceLock<Mutex<SeekCallbackRegistry>> = OnceLock::new();
240
241fn seek_callback_registry() -> &'static Mutex<SeekCallbackRegistry> {
242 STREAM_SEEK_CALLBACKS.get_or_init(|| Mutex::new(HashMap::new()))
243}
244
245pub fn register_stream_seek_callback<F>(component_id: &str, callback: F)
246where
247 F: Fn(f64) -> bool + Send + Sync + 'static,
248{
249 if let Ok(mut registry) = seek_callback_registry().lock() {
250 log::info!(
251 "[StreamSource] register_stream_seek_callback: component_id={}",
252 component_id
253 );
254 registry.insert(component_id.to_string(), Arc::new(callback));
255 }
256}
257
258pub fn unregister_stream_seek_callback(component_id: &str) {
259 if let Ok(mut registry) = seek_callback_registry().lock() {
260 registry.remove(component_id);
261 }
262}
263
264pub fn seek_stream_session(component_id: &str, position_seconds: f64) -> bool {
265 log::info!(
266 "[StreamSource] seek_stream_session: component_id={} position_seconds={}",
267 component_id,
268 position_seconds
269 );
270
271 let callback = {
272 let Ok(registry) = seek_callback_registry().lock() else {
273 log::warn!(
274 "[StreamSource] seek_stream_session: failed to lock registry for component_id={}",
275 component_id
276 );
277 return false;
278 };
279 let cb = registry.get(component_id).cloned();
280 if cb.is_none() {
281 log::warn!(
282 "[StreamSource] seek_stream_session: no callback found for component_id={}, registered_ids={:?}",
283 component_id,
284 registry.keys().collect::<Vec<_>>()
285 );
286 }
287 cb
288 };
289
290 if let Some(cb) = callback {
291 log::info!(
292 "[StreamSource] seek_stream_session: invoking callback for component_id={}",
293 component_id
294 );
295 return cb(position_seconds);
296 }
297 false
298}