use smol_str::SmolStr;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::data::Acquisition;
pub(crate) struct AcquisitionConfig {
pub(crate) client_name: SmolStr,
pub(crate) active_symbols: Vec<String>,
pub(crate) download_chunk_size: u32,
pub(crate) decode_batch_buffer: usize,
}
pub(crate) struct AcquisitionRunner {
channel: tonic::transport::Channel,
config: AcquisitionConfig,
sender: broadcast::Sender<Acquisition>,
shutdown: CancellationToken,
symbol_updates: watch::Receiver<Vec<String>>,
}
impl AcquisitionRunner {
pub(crate) fn new(
channel: tonic::transport::Channel,
config: AcquisitionConfig,
sender: broadcast::Sender<Acquisition>,
shutdown: CancellationToken,
symbol_updates: watch::Receiver<Vec<String>>,
) -> Self {
debug_assert!(
config.download_chunk_size != 0,
"Download chunk size must be greater than 0"
);
Self {
channel,
config,
sender,
shutdown,
symbol_updates,
}
}
pub(crate) fn spawn(self) -> JoinHandle<()> {
tokio::spawn(async move {
self.run().await;
})
}
async fn run(self) {
while self.sender.receiver_count() == 0 {
if self.shutdown.is_cancelled() {
debug!("acquisition loop cancelled before start");
return;
}
sleep(Duration::from_millis(1)).await;
}
let (batch_sender, batch_receiver) =
kanal::bounded_async::<super::BatchDownloadResult>(self.config.decode_batch_buffer);
let (recycle_tx, recycle_rx) =
kanal::bounded_async::<super::BatchDownloadResult>(self.config.decode_batch_buffer);
let download_handle = tokio::spawn(super::download_loop::download_loop(
self.channel,
self.config.client_name.clone(),
self.config,
batch_sender,
recycle_rx,
self.shutdown.clone(),
self.symbol_updates,
));
let decode_handle = tokio::spawn(super::decode_loop::decode_loop(
batch_receiver,
recycle_tx,
self.sender.clone(),
self.shutdown,
));
let _ = decode_handle.await;
let _ = download_handle.await;
debug!("acquisition loop exited");
}
}