use crate::{
define::IndexDataSender,
packet::{EsInfo, Video},
};
use anyhow::Result;
use cyberex::xtime::datatime::XTime;
use std::{
path::Path,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
};
use tokio::task::JoinHandle;
/// Outcome of a timeout check performed by [`DataInstant::elapsed`].
///
/// `Debug` is derived so the value can be logged and used in `assert_eq!`;
/// `Clone`/`Copy` are derived because the enum carries no data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElapsedState {
    /// The configured timeout has been reached or exceeded.
    Timeout,
    /// The configured timeout has not been reached yet.
    NoTimeout,
}
/// Reference point that `DataInstant` measures elapsed time from.
///
/// The variant is chosen once, by the first sample handed to
/// `DataInstant::elapsed`, and is kept afterwards.
enum InsTime {
/// Wall-clock style measurement: an `Instant::now()` reading taken when the
/// first sample carried no usable (non-zero video) timestamp.
Instant(Instant),
/// Content-based measurement: the timestamp of the first video sample,
/// converted with `XTime::from_timestamp_ms_nocheck`.
ContentInstant(XTime),
}
/// Timeout tracker whose reference point is latched lazily by the first call
/// to `elapsed`.
pub struct DataInstant {
// `None` until `elapsed` is called for the first time; afterwards holds the
// reference point all later checks are measured against.
from: Option<InsTime>,
// Span after which `elapsed` reports `ElapsedState::Timeout`.
timeout: Duration,
}
impl DataInstant {
    /// Creates a tracker with the given `timeout` and no reference point yet.
    ///
    /// The reference point is latched by the first call to [`Self::elapsed`].
    pub fn new(timeout: Duration) -> Self {
        Self { from: None, timeout }
    }

    /// Returns the timeout this tracker was created with.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Reports whether the timeout has been reached, as seen from `es_data`.
    ///
    /// The first call decides how time is measured for the rest of this
    /// tracker's life:
    ///
    /// * if `es_data` is a video sample with a non-zero `time_stamp`, that
    ///   timestamp becomes the reference and later checks compare content
    ///   timestamps (the value is handed to
    ///   `XTime::from_timestamp_ms_nocheck`, so it is treated as milliseconds);
    /// * otherwise `Instant::now()` becomes the reference and later checks use
    ///   the monotonic clock.
    ///
    /// In content-timestamp mode, samples without a usable timestamp always
    /// yield [`ElapsedState::NoTimeout`].
    pub fn elapsed(&mut self, es_data: &EsInfo) -> ElapsedState {
        // Only video samples with a non-zero timestamp are usable as a clock.
        let time_stamp: Option<u64> = match es_data {
            EsInfo::Video(Video { time_stamp, .. }) if *time_stamp > 0 => Some(*time_stamp),
            _ => None,
        };

        // Latch the reference point on first use; later calls reuse it.
        let from = self.from.get_or_insert_with(move || match time_stamp {
            Some(time_stamp) => {
                tracing::debug!("DataInstant use ContentInstant, probe by: {:?}", es_data);
                InsTime::ContentInstant(XTime::from_timestamp_ms_nocheck(time_stamp))
            },
            None => {
                tracing::debug!("DataInstant use Instant, probe by: {:?}", es_data);
                InsTime::Instant(Instant::now())
            },
        });

        let timed_out = match from {
            InsTime::Instant(from) => from.elapsed() >= self.timeout,
            InsTime::ContentInstant(from) => match time_stamp {
                Some(time_stamp) => {
                    // A timestamp older than the reference (reordered or reset
                    // stream) must not underflow: a plain `-` would panic in
                    // debug builds and wrap to a huge value in release builds,
                    // producing a spurious timeout. Treat it as zero elapsed.
                    let elapsed_ms = time_stamp.saturating_sub(from.as_timestamp_ms());
                    // Compare in u128 so a very large timeout is not truncated.
                    u128::from(elapsed_ms) >= self.timeout.as_millis()
                },
                None => false,
            },
        };

        if timed_out {
            ElapsedState::Timeout
        } else {
            ElapsedState::NoTimeout
        }
    }
}
/// Spawns a background task that watches `sub` for receivers and flags idleness.
///
/// The task polls `sub.receiver_count()` periodically (one fifth of the smaller
/// of `timeout_idle` and `protect_time`, but never less than 1 ms) and writes
/// the result of every check into `is_idle`:
///
/// * while at least one receiver exists, the idle clock is reset and `false`
///   is stored;
/// * with no receivers, the task reports idle once `timeout_idle` has passed
///   since receivers were last observed, or — if no receiver was ever
///   observed — once `protect_time` has passed since the task started.
///
/// The task stores `true` and finishes as soon as idleness is detected.
///
/// # Panics
///
/// Panics if called outside a Tokio runtime (both `tokio::spawn` and
/// `tokio::time::interval` require one).
pub fn spawn_idle(
    sub: IndexDataSender,
    timeout_idle: Duration,
    protect_time: Duration,
    is_idle: Arc<AtomicBool>,
) -> JoinHandle<()> {
    // `tokio::time::interval` panics on a zero period, which the division
    // below produces for zero (or sub-5ns) durations. Clamp to 1 ms, which is
    // also the resolution of Tokio's timer.
    const MIN_SCAN_PERIOD: Duration = Duration::from_millis(1);
    let scan_time = (std::cmp::min(timeout_idle, protect_time) / 5).max(MIN_SCAN_PERIOD);

    tokio::spawn({
        let mut interval = tokio::time::interval(scan_time);
        async move {
            // Moment of task start, or of the last poll that saw a receiver.
            let mut start_time = Instant::now();
            // Becomes true once any receiver has been observed.
            let mut have_relay = false;
            loop {
                interval.tick().await;
                let have_idle = if sub.receiver_count() == 0 {
                    // Before the first receiver shows up the (startup)
                    // protection window applies instead of the idle timeout.
                    let limit = if have_relay { timeout_idle } else { protect_time };
                    start_time.elapsed() >= limit
                } else {
                    have_relay = true;
                    start_time = Instant::now();
                    false
                };
                is_idle.store(have_idle, Ordering::SeqCst);
                if have_idle {
                    return;
                }
            }
        }
    })
}
/// Waits until `file_path` exists, polling every 10 ms.
///
/// Any outcome of `tokio::fs::try_exists` other than `Ok(true)` — including an
/// I/O error — is treated as "not there yet" and polling continues. As a
/// result this function has no upper bound on how long it waits and, as
/// written, only ever resolves to `Ok(())`.
pub(crate) async fn wait_file_ready(file_path: impl AsRef<Path>) -> Result<()> {
    let target = file_path.as_ref();
    loop {
        if let Ok(true) = tokio::fs::try_exists(target).await {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}