use crate::{FrameFormat, FrameReaderConfig, TapPacket, TapReader};
use arrayvec::ArrayVec;
use rtrb::Consumer;
use std::sync::Arc;
use std::time::Duration;
/// Asynchronous reader that drains `TapPacket`s from a tap's `rtrb` consumer and
/// hands the contained frames to a callback in batches.
///
/// `C` is the capacity of each frame's `ArrayVec`; format packets announcing zero
/// channels or more than `C` channels are ignored by the reader.
pub struct AsyncFrameReader<const C: usize = 2> {
/// Callback invoked by the read loop to look up the tap that should currently be
/// read; `None` means no tap is available right now.
tap_fn: Box<dyn Fn() -> Option<Arc<TapReader<C>>> + Send + Sync>,
/// Batch sizing and sleep settings.
config: FrameReaderConfig,
/// Consumer taken out of the active tap's `consumer` slot; `None` while detached.
active_consumer: Option<Consumer<TapPacket<C>>>,
/// Tap the active consumer was taken from; compared by pointer to detect switches.
active_tap: Option<Arc<TapReader<C>>>,
/// Channel count from the last accepted format packet (0 until one is accepted).
ch: usize,
/// Sample rate in Hz from the last accepted format packet (0 until one is accepted).
sr: u32,
/// Whether a valid format packet has been accepted since the last attach.
has_format: bool,
/// Frames accumulated for the batch currently being filled.
batch_buf: Vec<ArrayVec<f32, C>>,
/// Number of frames at which `batch_buf` is emitted; always at least 1 once computed.
batch_len_frames: usize,
}
impl<const C: usize> AsyncFrameReader<C> {
/// Creates a reader configured with `FrameReaderConfig::default()`.
///
/// `tap_fn` is stored and called later to look up the tap to read from. This simply
/// forwards to `new_with_config`, so that constructor's assertions apply here too.
pub fn new<G>(tap_fn: G) -> Self
where
    G: Fn() -> Option<Arc<TapReader<C>>> + Send + Sync + 'static,
{
    Self::new_with_config(FrameReaderConfig::default(), tap_fn)
}
/// Creates a reader with an explicit `FrameReaderConfig`.
///
/// The reader starts detached: no consumer, no tap, no known format, and an empty
/// batch buffer. `tap_fn` is boxed and stored for later lookups.
///
/// # Panics
///
/// Panics if `C` is zero, or if `config` sets neither `frames_per_batch` nor
/// `time_per_batch`.
pub fn new_with_config<G>(config: FrameReaderConfig, tap_fn: G) -> Self
where
    G: Fn() -> Option<Arc<TapReader<C>>> + Send + Sync + 'static,
{
    assert!(C > 0, "AsyncFrameReader requires C > 0");

    // A batch size can only be derived if at least one of the two targets is given.
    let has_batch_target =
        config.frames_per_batch.is_some() || config.time_per_batch.is_some();
    assert!(
        has_batch_target,
        "FrameReaderConfig requires at least one batch target: frames_per_batch or time_per_batch"
    );

    let tap_fn: Box<dyn Fn() -> Option<Arc<TapReader<C>>> + Send + Sync> = Box::new(tap_fn);
    Self {
        tap_fn,
        config,
        active_consumer: None,
        active_tap: None,
        ch: 0,
        sr: 0,
        has_format: false,
        batch_buf: Vec::new(),
        batch_len_frames: 0,
    }
}
/// Recomputes `batch_len_frames` from the config and the current sample rate, then
/// empties `batch_buf` and reserves room for one full batch.
///
/// An explicit `frames_per_batch` wins. Otherwise the size is `time_per_batch`
/// converted to frames at `sr`, rounded to the nearest frame; while the sample rate
/// is still unknown (`sr == 0`) a size of 1 is used. The result is never below 1.
///
/// # Panics
///
/// Panics if neither `frames_per_batch` nor `time_per_batch` is set and `sr != 0`.
#[inline]
fn recompute_batch_size(&mut self) {
    let target = match self.config.frames_per_batch {
        Some(frames) => frames as usize,
        None if self.sr == 0 => 1,
        None => {
            let batch_duration = self.config.time_per_batch.expect(
                "FrameReaderConfig must set time_per_batch when frames_per_batch is not set",
            );
            // frames = sr * seconds, computed in integer nanoseconds; adding half a
            // second's worth of nanoseconds rounds to the nearest frame.
            let frame_nanos = self.sr as u128 * batch_duration.as_nanos();
            ((frame_nanos + 500_000_000) / 1_000_000_000) as usize
        }
    };
    self.batch_len_frames = target.max(1);

    // Any partially filled batch is discarded here; callers flush first if needed.
    self.batch_buf.clear();
    self.batch_buf.reserve(self.batch_len_frames);
}
fn try_attach_or_switch(&mut self, tap: Arc<TapReader<C>>) -> bool {
let tap_changed = match &self.active_tap {
Some(active) => !Arc::ptr_eq(active, &tap),
None => true,
};
if self.active_consumer.is_none() || tap_changed {
if let Ok(mut slot) = tap.consumer.lock()
&& let Some(cons) = slot.take()
{
self.active_consumer = Some(cons);
self.active_tap = Some(Arc::clone(&tap));
self.ch = 0;
self.sr = 0;
self.has_format = false;
self.recompute_batch_size();
#[cfg(feature = "log")]
log::debug!("AsyncFrameReader attached tap (awaiting first Format packet)");
return true;
}
self.active_consumer = None;
self.active_tap = None;
}
false
}
/// Passes the buffered frames to `on_batch` together with the current channel
/// count and sample rate, then empties the buffer.
///
/// Does nothing when the buffer is empty, so the callback never sees an empty slice.
#[inline]
fn emit_batch<F>(&mut self, on_batch: &mut F)
where
    F: FnMut(&[ArrayVec<f32, C>], usize /*channels*/, u32 /*sr*/),
{
    if !self.batch_buf.is_empty() {
        on_batch(self.batch_buf.as_slice(), self.ch, self.sr);
        self.batch_buf.clear();
    }
}
/// Applies a format packet.
///
/// Packets announcing zero channels or more than `C` channels are ignored, as are
/// packets that repeat the current format. On a real change, frames buffered under
/// the previous format are emitted first (as a possibly short batch); then the new
/// channel count and sample rate are stored and the batch size is recomputed.
#[inline]
fn handle_format<F>(&mut self, fmt: FrameFormat, on_batch: &mut F)
where
    F: FnMut(&[ArrayVec<f32, C>], usize /*channels*/, u32 /*sr*/),
{
    let channels = fmt.channels as usize;
    let sample_rate = fmt.sample_rate_hz;

    if !(1..=C).contains(&channels) {
        #[cfg(feature = "log")]
        log::debug!(
            "AsyncFrameReader ignoring invalid format packet ({} ch @ {} Hz)",
            fmt.channels,
            fmt.sample_rate_hz
        );
        return;
    }

    let unchanged = self.has_format && self.ch == channels && self.sr == sample_rate;
    if unchanged {
        return;
    }

    // Only frames collected under a known format are worth delivering; `emit_batch`
    // itself skips an empty buffer.
    if self.has_format {
        self.emit_batch(on_batch);
    }

    self.ch = channels;
    self.sr = sample_rate;
    self.has_format = true;
    self.recompute_batch_size();
}
/// Appends one frame to the current batch and emits the batch once it is full.
///
/// Frames are dropped while no format is known, and also when their length does not
/// match the current channel count.
#[inline]
fn handle_frame<F>(&mut self, frame: &ArrayVec<f32, C>, on_batch: &mut F)
where
    F: FnMut(&[ArrayVec<f32, C>], usize /*channels*/, u32 /*sr*/),
{
    if !self.has_format {
        return;
    }

    let got = frame.len();
    if got != self.ch {
        #[cfg(feature = "log")]
        log::debug!(
            "AsyncFrameReader dropping frame with len {} (expected {})",
            got,
            self.ch
        );
        return;
    }

    self.batch_buf.push(frame.clone());

    let batch_full = self.batch_buf.len() == self.batch_len_frames;
    if batch_full {
        self.emit_batch(on_batch);
    }
}
/// Estimates how long to sleep until the current batch could be complete.
///
/// Returns `None` when there is no active consumer, the format is unknown
/// (`sr == 0` or `ch == 0`), or no frames are missing. Otherwise the time the
/// missing frames take at the current sample rate is scaled by `sleep_bias`, raised
/// to at least `min_sleep`, and finally capped at `max_sleep`.
#[inline]
fn sleep_for_missing(&self) -> Option<Duration> {
    let format_known = self.sr != 0 && self.ch != 0;
    if self.active_consumer.is_none() || !format_known {
        return None;
    }

    // `checked_sub` covers an over-full buffer, the filter covers an exactly full one.
    let missing_frames = self
        .batch_len_frames
        .checked_sub(self.batch_buf.len())
        .filter(|&count| count > 0)?;

    let seconds = missing_frames as f64 / self.sr as f64;
    let nanos = seconds * 1_000_000_000.0 * (self.config.sleep_bias as f64);

    // Lower bound first, upper bound second: `max_sleep` wins if the two conflict.
    let wait = Duration::from_nanos(nanos as u64)
        .max(self.config.min_sleep)
        .min(self.config.max_sleep);
    Some(wait)
}
/// Runs the read loop forever, delivering frames to `on_batch` in batches.
///
/// `on_batch` receives the batched frames, the channel count, and the sample rate
/// in Hz. Batches normally hold `batch_len_frames` frames, but a shorter batch is
/// delivered when the format changes or the tap is switched, so that frames
/// already read are not lost.
///
/// Each iteration:
/// 1. attaches to the tap returned by `tap_fn` if no consumer is active, sleeping
///    `no_tap_sleep` when there is no tap or its consumer cannot be taken;
/// 2. drains the packets currently available from the consumer;
/// 3. asks `tap_fn` again and, if it now returns a different tap, flushes the
///    partial batch and switches to it;
/// 4. sleeps for the estimated time until the batch could be complete when nothing
///    was read and a batch is in progress, and otherwise just yields.
///
/// This future never completes; drop or abort it to stop reading.
pub async fn run<F>(&mut self, mut on_batch: F) -> !
where
    F: FnMut(&[ArrayVec<f32, C>], usize /*channels*/, u32 /*sr*/) + Send + 'static,
{
    loop {
        if self.active_consumer.is_none() {
            let Some(tap) = (self.tap_fn)() else {
                tokio::time::sleep(self.config.no_tap_sleep).await;
                continue;
            };
            if !self.try_attach_or_switch(tap) {
                tokio::time::sleep(self.config.no_tap_sleep).await;
                continue;
            }
        }

        let mut made_progress = false;
        // The consumer is moved out for the duration of the drain so the packet
        // handlers can borrow `self` mutably, and is put back afterwards.
        if let Some(mut cons) = self.active_consumer.take() {
            loop {
                let avail = cons.slots();
                if avail == 0 {
                    break;
                }
                // Read roughly what the current batch still needs, plus one slot.
                // NOTE(review): the extra slot presumably leaves room for a Format
                // packet interleaved with the frames — confirm.
                let missing_frames =
                    self.batch_len_frames.saturating_sub(self.batch_buf.len());
                let target_packets = missing_frames.max(1).saturating_add(1);
                let want = avail.min(target_packets);
                let Ok(chunk) = cons.read_chunk(want) else {
                    break;
                };
                let (a, b) = chunk.as_slices();
                let total = a.len() + b.len();
                if total == 0 {
                    break;
                }
                for packet in a.iter().chain(b.iter()) {
                    match packet {
                        TapPacket::Format(fmt) => self.handle_format(*fmt, &mut on_batch),
                        TapPacket::Frame(frame) => self.handle_frame(frame, &mut on_batch),
                    }
                }
                // `total > 0` here, so at least one packet was processed.
                made_progress = true;
                chunk.commit(total);
            }
            self.active_consumer = Some(cons);
        }

        if let Some(tap) = (self.tap_fn)() {
            let tap_changed = !self
                .active_tap
                .as_ref()
                .is_some_and(|active| Arc::ptr_eq(active, &tap));
            if tap_changed {
                #[cfg(feature = "log")]
                log::trace!("AsyncFrameReader switching tap / tracks");
                // Deliver the frames already read from the outgoing tap (with its
                // channel count and sample rate) instead of discarding them; this
                // mirrors the flush `handle_format` performs on a format change.
                self.emit_batch(&mut on_batch);
                self.has_format = false;
                let _ = self.try_attach_or_switch(tap);
                continue;
            }
        }

        if !self.batch_buf.is_empty()
            && !made_progress
            && let Some(d) = self.sleep_for_missing()
        {
            tokio::time::sleep(d).await;
            continue;
        }

        // NOTE(review): with an attached tap, an empty batch buffer and no new
        // packets, the loop only yields and polls again immediately — confirm that
        // this latency/CPU trade-off is intended for idle taps.
        tokio::task::yield_now().await;
    }
}
}