use crate::{define::*, packet::*, relay::SourceRelay};
use anyhow::{Result, bail};
use parking_lot::RwLock;
use std::{
self, cmp,
collections::HashMap,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use tokio::{
sync::{
broadcast,
mpsc::{UnboundedSender, unbounded_channel},
},
time::{self},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use crate::source_ele::SourceElement;
pub struct Source {
sources: Arc<RwLock<HashMap<SchemeKey, Arc<SourceElement>>>>,
config: SSConfig,
is_idles: Arc<RwLock<Vec<Arc<AtomicBool>>>>,
data_tx: UnboundedSender<PacketEs>,
data_bus_tx: broadcast::Sender<PacketEs>,
cancel_token: CancellationToken,
}
impl Source {
const ACCESS_IDLE_CNT: usize = 1;
pub async fn new(event_tx: SourceEventSender, source_id: &str, config: &SSConfig) -> Result<Self> {
let is_idles: Arc<RwLock<Vec<Arc<AtomicBool>>>> = Arc::new(RwLock::new(Vec::new()));
let cancel_token = CancellationToken::new();
tokio::spawn({
let event_tx = event_tx.clone();
let source_id = source_id.to_owned();
let config = config.clone();
let is_idles = is_idles.clone();
let scan_time = cmp::min(
config.source_idleTime.into_inner(),
config.source_protectTime.into_inner(),
) / 10;
let mut interval = tokio::time::interval(scan_time);
let task = async move {
loop {
interval.tick().await;
if is_idles.read().is_empty() {
continue;
}
let all_idle = is_idles.read().iter().all(|e| e.load(Ordering::SeqCst));
if all_idle {
let _ = event_tx.send(SourceEvent::Idles(source_id.to_owned()));
break;
}
}
};
let cancel_token = cancel_token.child_token();
async move { cancel_token.run_until_cancelled(task).await }
});
let (data_tx, mut data_rx) = unbounded_channel::<PacketEs>();
tokio::spawn(
{
let data_tx = data_tx.downgrade();
let event_tx = event_tx.clone();
let source_id = source_id.to_owned();
let timeout_idle = config.source_idleTime.into_inner();
let protect_time = config.source_protectTime.into_inner();
let scan_time = cmp::min(timeout_idle, protect_time) / 5;
let mut interval = time::interval(scan_time);
let mut start_time = time::Instant::now();
let mut have_access = false;
let cancel_token = cancel_token.child_token();
let task = async move {
loop {
interval.tick().await;
let cnt = data_tx.strong_count();
let have_idle = if cnt == Source::ACCESS_IDLE_CNT {
if have_access {
start_time.elapsed() >= timeout_idle
} else {
start_time.elapsed() >= protect_time
}
} else {
have_access = true;
start_time = time::Instant::now();
false
};
if have_idle {
let _ = event_tx.send(SourceEvent::PushIdle {
source_id,
hint: format!("Source input-data subscribe idle, have_access: {}", have_access),
});
return;
}
}
};
async move { cancel_token.run_until_cancelled(task).await }
}
.instrument(tracing::info_span!("IdleTask", source_id = source_id)),
);
let source_jitter_max = *config.source_jitter_max;
let (data_bus_tx, _) = broadcast::channel(source_jitter_max);
tokio::spawn(
{
let source_enable_audio = config.source_enable_audio;
let data_bus_tx = data_bus_tx.clone();
let once = std::sync::Once::new();
let task = async move {
loop {
while let Some(data) = data_rx.recv().await {
if !source_enable_audio && data.is_audio_packet() {
once.call_once(|| {
tracing::warn!("Got audio, but audio was disabled, see `source.enable_audio`");
});
continue;
}
while data_bus_tx.len() == source_jitter_max {
tracing::warn!("Source data bus maybe full, source_jitter_max: {}, see source.jitterMax for more detail", source_jitter_max);
tokio::time::sleep(Duration::from_millis(5)).await;
}
if let Err(broadcast::error::SendError(drop_data)) = data_bus_tx.send(data) {
tracing::info!(
"No scheme subscribers for source, data drop, size: {}",
drop_data.payload().len()
);
}
}
}
};
let cancel_token = cancel_token.child_token();
async move { cancel_token.run_until_cancelled(task).await }
}
.instrument(tracing::error_span!("DataInput", source_id = source_id)),
);
Ok(Self {
sources: Arc::new(RwLock::new(HashMap::new())),
config: config.clone(),
is_idles,
data_tx,
data_bus_tx,
cancel_token,
})
}
pub async fn add_scheme(&self, source_relay: &SourceRelay) -> Result<()> {
if self.sources.read().contains_key(&source_relay.scheme_key()) {
return Ok(());
}
let is_idle = Arc::new(AtomicBool::new(false));
let data_bus_rx = self.data_bus_tx.subscribe();
let source = SourceElement::new(source_relay, is_idle.clone(), &self.config, data_bus_rx).await?;
self.is_idles.write().push(is_idle);
self.sources.write().insert(source_relay.scheme_key(), Arc::new(source));
Ok(())
}
pub async fn fetch_resource(
&self,
scheme_key: &SchemeKey,
res: SourceResourceFileRet,
resource_name: &str,
) -> Result<()> {
let Some(source) = self.sources.read().get(scheme_key).cloned() else {
bail!("scheme no found, scheme {:?}", scheme_key);
};
source.fetch_resource(res, resource_name).await
}
pub async fn subscribe(&self, scheme_key: &SchemeKey) -> Result<SourceDataReceiver> {
let Some(source) = self.sources.read().get(scheme_key).cloned() else {
bail!("scheme no found, scheme_key {:?}", scheme_key);
};
let result = source.subscribe().await?;
Ok(result)
}
pub fn subscribe_input(&self) -> Result<IndexPacketSender> {
Ok(IndexPacketSender {
input_data: self.data_tx.clone(),
})
}
pub fn have_relay(&self) -> bool {
self.sources.read().iter().any(|(_, s)| s.have_relay())
}
}
impl Drop for Source {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}