use crate::{
define::{EsDataSubscriber, IndexDataSender, IndexPacket, SSConfig},
packet::PacketEs,
utils::spawn_idle,
};
use anyhow::{Context, Result};
use std::{
path::PathBuf,
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use tempfile::Builder;
use tempfile::NamedTempFile;
use tokio::{
fs::File,
io::{AsyncSeekExt, AsyncWriteExt},
};
use tokio::{sync::mpsc::UnboundedSender, time::timeout};
use tokio::{
sync::{broadcast, mpsc::channel},
task::JoinHandle,
};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;
pub struct BlobCache {
sub: IndexDataSender,
data_tx: UnboundedSender<PacketEs>,
timeout_data: Duration,
th_idle: JoinHandle<()>,
file: NamedTempFile,
}
impl BlobCache {
pub async fn new(config: &SSConfig, is_idle: Arc<AtomicBool>) -> Result<Self> {
let timeout_data = config.source_data_IdleTime.into_inner();
let timeout_idle = config.source_idleTime.into_inner();
let protect_time = config.source_protectTime.into_inner();
let (sub, _) = broadcast::channel::<IndexPacket>(1);
let th_idle = spawn_idle(sub.clone(), timeout_idle, protect_time, is_idle);
let (data_tx, mut data_rx) = tokio::sync::mpsc::unbounded_channel::<PacketEs>();
let file = Builder::new().prefix("h5ssblob_").rand_bytes(6).tempfile()?;
let path = file.path().to_path_buf();
let mut f = File::create(&path).await.with_context(|| "create temp file fail")?;
tracing::debug!("Blob file create success, path: {:?}", path);
tokio::spawn(async move {
while let Ok(Some(packet)) = timeout(timeout_data, data_rx.recv()).await {
let _ = f.write_all(&packet.payload()).await;
}
tracing::debug!("BlobCache data task exit");
});
Ok(Self {
sub,
timeout_data,
th_idle,
data_tx,
file,
})
}
pub async fn input_packet(&mut self, pkt: PacketEs) -> Result<()> {
self.data_tx.send(pkt).with_context(|| "write to buffer fail")?;
Ok(())
}
pub async fn source_readied(&self) -> tokio::sync::oneshot::Receiver<PathBuf> {
let data_tx = self.data_tx.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let file_path = self.file.path().into();
tokio::spawn(async move {
data_tx.closed().await;
let _ = tx.send(file_path);
});
rx
}
pub fn subscribe_output(&self) -> EsDataSubscriber {
let live_time = self.sub.subscribe();
let (tx, rx) = channel::<PacketEs>(1);
let path = self.file.path().to_path_buf();
let mut position: u64 = 0;
let mut last_size: u64 = 0;
let mut last_change = tokio::time::Instant::now();
let poll_interval = Duration::from_millis(200);
let idle_timeout = self.timeout_data;
tokio::spawn(async move {
loop {
let Ok(mut f) = File::open(&path).await else {
tracing::error!("open file fail");
break;
};
if position > 0 {
if f.seek(std::io::SeekFrom::Start(position)).await.is_err() {
tracing::warn!("BlobCache subscribe: seek failed");
break;
}
}
let mut stream = ReaderStream::new(f);
while let Some(Ok(chunk)) = stream.next().await {
position += chunk.len() as u64;
let data = PacketEs::new_com(chunk);
if tx.send(data).await.is_err() {
return;
}
}
if position != last_size {
last_size = position;
last_change = tokio::time::Instant::now();
} else if last_change.elapsed() >= idle_timeout {
tracing::debug!("BlobCache subscribe: no new data for {:?}, exiting", idle_timeout);
return;
}
tokio::time::sleep(poll_interval).await;
}
drop(live_time);
});
rx
}
}
impl Drop for BlobCache {
fn drop(&mut self) {
self.th_idle.abort();
}
}