use super::{BatchDownloadResult, SymbolClients, SymbolFetcher};
use crate::errors::TekHsiError;
use crate::helpers::Backoff;
use crate::tekscope::new_connect_client;
use crate::tekscope::{ConnectRequest, ConnectStatus};
use smol_str::SmolStr;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
pub(super) async fn download_loop(
channel: tonic::transport::Channel,
client_name: SmolStr,
config: super::AcquisitionConfig,
batch_sender: kanal::AsyncSender<BatchDownloadResult>,
recycle_rx: kanal::AsyncReceiver<BatchDownloadResult>,
shutdown: CancellationToken,
mut symbol_updates: tokio::sync::watch::Receiver<Vec<String>>,
) -> Result<(), TekHsiError> {
let mut wait_backoff = Backoff::new(5, 100, 14);
let mut client = new_connect_client(channel.clone(), &client_name);
let mut symbol_clients = Vec::new();
let mut symbol_fetcher = SymbolFetcher::new(0);
rebuild_symbol_state(
&config,
&channel,
&config.active_symbols,
&mut symbol_clients,
&mut symbol_fetcher,
);
loop {
if shutdown.is_cancelled() {
break;
}
if symbol_updates.has_changed().unwrap_or(false) {
let active_symbols = symbol_updates.borrow_and_update();
rebuild_symbol_state(
&config,
&channel,
&active_symbols,
&mut symbol_clients,
&mut symbol_fetcher,
);
}
debug_assert!(
!symbol_clients.is_empty(),
"Active symbol list should not be empty"
);
while let Ok(Some(batch)) = recycle_rx.try_recv() {
symbol_fetcher.recycle(batch);
}
let wait_start = Instant::now();
let wait_reply = tokio::select! {
_ = shutdown.cancelled() => {
break;
}
result = client.wait_for_data_access(ConnectRequest {
name: client_name.to_string(),
}) => {
result?.into_inner()
}
};
if wait_reply.status() != ConnectStatus::ConnectstatusSuccess {
warn!("wait_for_data_access failed: {:?}", wait_reply.status());
if !wait_backoff.sleep_next().await {
warn!("wait_for_data_access exceeded retry limit");
break;
}
continue;
}
let wait_time = wait_start.elapsed();
wait_backoff.reset();
let download_start = Instant::now();
let result = symbol_fetcher
.fetch_symbols(&symbol_clients, config.download_chunk_size)
.await;
let download_time = download_start.elapsed();
match result {
Ok(mut batch) => {
batch.wait_time = wait_time;
batch.download_time = download_time;
if batch_sender.send(batch).await.is_err() {
break;
}
}
Err(err) => {
warn!("acquisition read failed: {}", err);
}
}
let finished_reply = client
.finished_with_data_access(ConnectRequest {
name: client_name.to_string(),
})
.await?
.into_inner();
if finished_reply.status() != ConnectStatus::ConnectstatusSuccess {
warn!(
"finished_with_data_access failed: {:?}",
finished_reply.status()
);
break;
}
}
drop(batch_sender);
debug!("download loop exited");
Ok(())
}
fn rebuild_symbol_state(
config: &super::AcquisitionConfig,
channel: &tonic::transport::Channel,
active_symbols: &[String],
symbol_clients: &mut Vec<SymbolClients>,
symbol_fetcher: &mut SymbolFetcher,
) {
symbol_clients.clear();
symbol_clients.extend(active_symbols.iter().map(|symbol| {
let name: SmolStr = symbol.as_str().into();
SymbolClients::new(name, config.client_name.clone(), channel.clone())
}));
*symbol_fetcher = SymbolFetcher::new((config.decode_batch_buffer + 1) * symbol_clients.len());
symbol_clients.iter().for_each(|client| {
symbol_fetcher.preallocate_pool(&client.symbol, config.decode_batch_buffer + 1);
});
}