use super::context::VideoContextSharedState;
use crate::i18n::{js_error_from_platform_error, js_internal_error, js_resource_not_found_error};
use lingxia_media::{FrameSink, get_stream_provider};
use lingxia_platform::traits::stream_decoder::{
VideoStreamDecoderHandle, VideoStreamDecoderManager,
};
use lingxia_platform::traits::video_player::{VideoPlayerCommand, VideoPlayerManager};
use log::{info, warn};
use rong::RongJSError;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
fn ensure_platform_stream_play(shared: &Arc<VideoContextSharedState>, component_id: &str) {
if shared.stream_paused.load(Ordering::Relaxed) {
return;
}
if !shared.play_requested.load(Ordering::Relaxed)
&& !shared.platform_playing.load(Ordering::Relaxed)
{
return;
}
let Ok(handle) = shared.runtime.bind_player(component_id) else {
return;
};
if let Err(err) = handle.dispatch(VideoPlayerCommand::Play) {
warn!(
"stream ensure play failed component_id={} err={}",
component_id, err
);
}
}
fn ensure_stream_decoder_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
) -> Result<Arc<dyn VideoStreamDecoderHandle>, RongJSError> {
let mut guard = shared
.stream_decoder
.lock()
.map_err(|_| js_internal_error("Stream decoder lock poisoned"))?;
if let Some(decoder) = guard.as_ref() {
return Ok(decoder.clone());
}
let decoder = shared
.runtime
.create_stream_decoder(component_id)
.map_err(|e| js_error_from_platform_error(&e))?;
let decoder: Arc<dyn VideoStreamDecoderHandle> = decoder.into();
*guard = Some(decoder.clone());
Ok(decoder)
}
pub(super) fn reset_decoder_soft_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
) -> Result<(), RongJSError> {
let decoder = ensure_stream_decoder_shared(shared, component_id)?;
decoder
.reset_stream(false)
.map_err(|e| js_error_from_platform_error(&e))
}
pub(super) fn reset_decoder_soft_if_present_shared(
shared: &Arc<VideoContextSharedState>,
_component_id: &str,
) -> Result<(), RongJSError> {
let decoder = shared
.stream_decoder
.lock()
.map_err(|_| js_internal_error("Stream decoder lock poisoned"))?
.clone();
let Some(decoder) = decoder else {
return Ok(());
};
decoder
.reset_stream(false)
.map_err(|e| js_error_from_platform_error(&e))
}
pub(super) fn stop_stream_session_async_shared(shared: &Arc<VideoContextSharedState>) {
let session = {
let mut guard = match shared.stream_session.lock() {
Ok(guard) => guard,
Err(_) => return,
};
guard.take()
};
if let Some(session) = session {
thread::spawn(move || {
if let Err(err) = session.stop() {
warn!("Failed to stop stream session: {}", err);
}
});
}
}
pub(super) fn resume_stream_session_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
) -> Result<(), RongJSError> {
if shared
.stream_starting
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
info!(
"resume_stream_session already in flight component_id={}",
component_id
);
return Ok(());
}
struct Reset<'a>(&'a AtomicBool);
impl Drop for Reset<'_> {
fn drop(&mut self) {
self.0.store(false, Ordering::Release);
}
}
let _reset = Reset(&shared.stream_starting);
let source = {
let guard = shared
.last_stream_source
.lock()
.map_err(|_| js_internal_error("Stream source lock poisoned"))?;
guard.clone()
};
let Some(source) = source else {
return Ok(());
};
let provider = get_stream_provider(&source.provider).ok_or_else(|| {
js_resource_not_found_error(format!("Stream provider not found: {}", source.provider))
})?;
let epoch = shared.stream_epoch.fetch_add(1, Ordering::Relaxed) + 1;
let decoder = ensure_stream_decoder_shared(shared, component_id)?;
let shared_for_duration = shared.clone();
let component_id_for_duration = component_id.to_string();
let expected_epoch_for_duration = epoch;
let last_duration_ms = Arc::new(AtomicU64::new(0));
let last_duration_ms_for_reporter = last_duration_ms.clone();
let shared_for_ended = shared.clone();
let component_id_for_ended = component_id.to_string();
let expected_epoch_for_ended = epoch;
let ended_reported = Arc::new(AtomicBool::new(false));
let ended_reported_for_reporter = ended_reported.clone();
let sink = FrameSink::from_arc_with_epoch(decoder, shared.stream_epoch.clone(), epoch)
.with_component_id(component_id.to_string())
.with_duration_reporter(move |duration_ms| {
if shared_for_duration.stream_live.load(Ordering::Relaxed) {
return;
}
if shared_for_duration
.stream_duration_override_ms
.load(Ordering::Relaxed)
> 0
{
return;
}
if shared_for_duration.stream_epoch.load(Ordering::Relaxed)
!= expected_epoch_for_duration
{
return;
}
if last_duration_ms_for_reporter.load(Ordering::Relaxed) == duration_ms {
return;
}
let Ok(handle) = shared_for_duration
.runtime
.bind_player(&component_id_for_duration)
else {
return;
};
if handle
.dispatch(VideoPlayerCommand::SetDuration {
duration: (duration_ms as f64) / 1000.0,
})
.is_ok()
{
info!(
"stream duration reported component_id={} duration_ms={}",
component_id_for_duration, duration_ms
);
last_duration_ms_for_reporter.store(duration_ms, Ordering::Relaxed);
}
})
.with_ended_reporter(move || {
if shared_for_ended.stream_live.load(Ordering::Relaxed) {
return;
}
if shared_for_ended.stream_epoch.load(Ordering::Relaxed) != expected_epoch_for_ended {
return;
}
if ended_reported_for_reporter.swap(true, Ordering::Relaxed) {
return;
}
let Ok(handle) = shared_for_ended
.runtime
.bind_player(&component_id_for_ended)
else {
return;
};
if handle.dispatch(VideoPlayerCommand::NotifyEnded).is_ok() {
info!(
"stream ended reported component_id={}",
component_id_for_ended
);
}
});
let session = provider
.start(source.params, sink)
.map_err(|err| js_internal_error(format!("Stream provider start failed: {}", err)))?;
let mut guard = shared
.stream_session
.lock()
.map_err(|_| js_internal_error("Stream session lock poisoned"))?;
*guard = Some(session);
ensure_platform_stream_play(shared, component_id);
Ok(())
}
pub(super) fn seek_stream_session_async_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
position: f64,
) {
let _ = seek_stream_session_sync_shared(shared, component_id, position);
}
pub(super) fn seek_stream_session_sync_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
position: f64,
) -> bool {
if !position.is_finite() || position < 0.0 {
warn!(
"stream session seek rejected: invalid position={} component_id={}",
position, component_id
);
return false;
}
let seek_result = {
let guard = match shared.stream_session.lock() {
Ok(guard) => guard,
Err(_) => return false,
};
match guard.as_ref() {
Some(session) => session.seek(position),
None => return false,
}
};
if let Err(err) = seek_result {
warn!(
"stream session seek failed component_id={} position={} err={}",
component_id, position, err
);
return false;
}
shared
.last_stream_position_ms
.store((position * 1000.0).round() as u64, Ordering::Relaxed);
if let Ok(decoder_lock) = shared.stream_decoder.lock() {
if let Some(decoder) = decoder_lock.as_ref() {
let _ = decoder.flush();
}
}
#[cfg(target_env = "ohos")]
{
let position_ms = (position * 1000.0) as i64;
lingxia_platform::harmony::video_player::notify_video_player_event(
component_id,
"seekDone",
Some(&position_ms.to_string()),
);
}
true
}
pub(super) fn pause_or_resume_stream_session_shared(
shared: &Arc<VideoContextSharedState>,
resume: bool,
component_id: &str,
) {
let mut guard = match shared.stream_session.lock() {
Ok(guard) => guard,
Err(_) => return,
};
if let Some(session) = guard.as_mut() {
let result = if resume {
session.resume()
} else {
session.pause()
};
if let Err(err) = result {
warn!(
"{} stream session failed component_id={} err={}",
if resume { "resume" } else { "pause" },
component_id,
err
);
} else if resume {
}
}
}
pub(super) fn resume_or_seek_stream_session_shared(
shared: &Arc<VideoContextSharedState>,
component_id: &str,
) {
let position_ms = shared.last_stream_position_ms.swap(0, Ordering::Relaxed);
if position_ms > 0 {
let position = (position_ms as f64) / 1000.0;
seek_stream_session_async_shared(shared, component_id, position);
return;
}
pause_or_resume_stream_session_shared(shared, true, component_id);
}