use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::{broadcast, oneshot};
use crate::directory_config::store::{TableCache, TimestampCache, load_all_tables};
use crate::directory_config::types::{ChangeEvent, ChangeOperation};
/// Background task that periodically reloads all tables from `directory` and
/// broadcasts a [`ChangeEvent`] for every table whose timestamp changed.
///
/// On each tick the loop:
/// 1. snapshots the current contents of `timestamps`,
/// 2. calls [`load_all_tables`] with `directory`, `cache` and `timestamps`,
/// 3. compares the timestamps after the reload against the snapshot and sends
///    - [`ChangeOperation::Refreshed`] for every table that is new or whose
///      timestamp differs from the snapshot, and
///    - [`ChangeOperation::Deleted`] for every table that was in the snapshot
///      but is no longer present.
///
/// If the reload returns an error, a warning is logged and no events are sent
/// for that tick; the loop keeps running and tries again on the next tick.
///
/// The loop exits once `shutdown_rx` resolves, which happens both when a
/// shutdown value is sent and when the sending half is dropped.
///
/// # Panics
///
/// `tokio::time::interval` panics if `interval` is zero.
pub(crate) async fn refresh_loop(
    cache: TableCache,
    timestamps: TimestampCache,
    directory: PathBuf,
    interval: Duration,
    change_tx: broadcast::Sender<ChangeEvent>,
    mut shutdown_rx: oneshot::Receiver<()>,
) {
    let mut ticker = tokio::time::interval(interval);
    // The first tick of a tokio interval completes immediately; consume it so
    // the first refresh runs one full `interval` after the loop starts.
    // (Presumably the caller has already done the initial load -- confirm.)
    ticker.tick().await;

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                // Snapshot the timestamps before reloading. The read guard is a
                // temporary that is released at the end of this statement, so
                // it is not held while `load_all_tables` runs.
                let old_timestamps: HashMap<String, std::time::SystemTime> =
                    timestamps.read().await.clone();

                if let Err(e) = load_all_tables(&directory, &cache, &timestamps).await {
                    tracing::warn!(error = %e, "Background refresh failed");
                    // NOTE(review): if `load_all_tables` can update some
                    // timestamps before failing, those changes will already be
                    // in the next tick's snapshot and will never be announced.
                    // Verify against its implementation.
                    continue;
                }

                let new_timestamps = timestamps.read().await.clone();

                // Tables that are new or whose timestamp differs from the snapshot.
                for (table, new_ts) in &new_timestamps {
                    let changed = match old_timestamps.get(table) {
                        Some(old_ts) => new_ts != old_ts,
                        None => true,
                    };
                    if changed {
                        tracing::debug!(table = %table, "Table refreshed from disk");
                        // `send` only fails when there are no active receivers,
                        // which is fine for a best-effort notification.
                        let _ = change_tx.send(ChangeEvent {
                            table: table.clone(),
                            operation: ChangeOperation::Refreshed,
                        });
                    }
                }

                // Tables that were present before the reload but are gone now.
                for table in old_timestamps.keys() {
                    if !new_timestamps.contains_key(table) {
                        tracing::debug!(table = %table, "Table removed from disk");
                        let _ = change_tx.send(ChangeEvent {
                            table: table.clone(),
                            operation: ChangeOperation::Deleted,
                        });
                    }
                }
            }
            // Resolves on an explicit shutdown signal and also when the sender
            // is dropped; either way the loop stops.
            _ = &mut shutdown_rx => {
                tracing::debug!("Refresh loop shutting down");
                break;
            }
        }
    }
}