use futures::{Stream, StreamExt};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::driver::Driver;
use crate::provider::Provider;
use crate::providers::replay::ReplayProvider;
use crate::stream::ThrottleExt;
use crate::types::{FramePacket, UpdateRate};
use crate::{FrameAdapter, Result, SessionInfo, VariableSchema};
pub struct ReplayConnection {
frames: watch::Receiver<Option<Arc<FramePacket>>>,
sessions: watch::Receiver<Option<Arc<SessionInfo>>>,
schema: Arc<VariableSchema>,
source_hz: f64,
cancel: CancellationToken,
}
impl ReplayConnection {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
info!("Opening IBT file: {}", path.display());
let provider = ReplayProvider::new(path)?;
let schema = provider.schema();
let source_hz = provider.tick_rate();
let channels = Driver::spawn(provider);
let mut frame_rx = channels.frames.clone();
let timeout = std::time::Duration::from_secs(5);
let wait_result = tokio::time::timeout(timeout, async {
loop {
frame_rx.changed().await.ok();
if frame_rx.borrow().is_some() {
break;
}
}
})
.await;
if wait_result.is_err() {
warn!("Timeout waiting for first frame from replay file");
}
info!("Replay connection opened ({}Hz)", source_hz);
Ok(Self {
frames: channels.frames,
sessions: channels.sessions,
schema,
source_hz,
cancel: channels.cancel,
})
}
pub fn subscribe<T>(&self, rate: UpdateRate) -> impl Stream<Item = T> + 'static
where
T: FrameAdapter + Send + 'static,
{
let validation = T::validate_schema(&self.schema).expect("Schema validation failed");
let frames = WatchStream::new(self.frames.clone()).filter_map(|opt| async move { opt });
let effective_rate = rate.normalize(self.source_hz);
match effective_rate {
UpdateRate::Native => {
frames.map(move |packet| T::adapt(&packet, &validation)).boxed()
}
UpdateRate::Max(hz) => {
let interval = Duration::from_secs_f64(1.0 / hz as f64);
frames.throttle(interval).map(move |packet| T::adapt(&packet, &validation)).boxed()
}
}
}
pub fn session_updates(&self) -> impl Stream<Item = Arc<SessionInfo>> + 'static {
WatchStream::new(self.sessions.clone()).filter_map(|opt| async move { opt })
}
pub fn current_session(&self) -> Option<Arc<SessionInfo>> {
self.sessions.borrow().clone()
}
pub fn source_hz(&self) -> f64 {
self.source_hz
}
pub fn schema(&self) -> &VariableSchema {
&self.schema
}
}
impl Drop for ReplayConnection {
fn drop(&mut self) {
debug!("Dropping replay connection");
self.cancel.cancel();
}
}