use super::*;
/// Returns the period between time-based WAL flushes.
///
/// The value is read from the `ZCCACHE_WAL_FLUSH_MS` environment variable and
/// interpreted as a whole number of milliseconds (`u64`). When the variable is
/// unset, not valid Unicode, or does not parse as a `u64`, 5000 ms is used.
/// The result is never shorter than 1 ms, so a configured `0` cannot yield a
/// zero-length duration.
pub(super) fn wal_flush_interval() -> std::time::Duration {
    const DEFAULT_FLUSH_MS: u64 = 5000;

    let millis = match std::env::var("ZCCACHE_WAL_FLUSH_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_FLUSH_MS),
        Err(_) => DEFAULT_FLUSH_MS,
    };

    // Clamp to a strictly positive duration.
    std::time::Duration::from_millis(std::cmp::max(millis, 1))
}
/// Returns the number of pending WAL entries that triggers a size-based flush.
///
/// The value is read from the `ZCCACHE_WAL_MAX_PENDING` environment variable
/// and parsed as a `usize`. When the variable is unset, not valid Unicode, or
/// does not parse, 2048 is used. The result is always at least 1.
pub(super) fn wal_max_pending() -> usize {
    const DEFAULT_MAX_PENDING: usize = 2048;

    let configured = std::env::var("ZCCACHE_WAL_MAX_PENDING")
        .map_or(DEFAULT_MAX_PENDING, |raw| {
            raw.parse::<usize>().unwrap_or(DEFAULT_MAX_PENDING)
        });

    // A threshold of zero is raised to one.
    if configured == 0 {
        1
    } else {
        configured
    }
}
/// Background task that batches index updates and persists them in groups.
///
/// Entries received on `rx` are collected in an in-memory map keyed by the
/// `String` half of each message, so a later value for the same key replaces
/// the earlier one. The map is handed to [`flush_wal_to_disk`] when any of the
/// following happens:
///
/// * the map holds at least `wal_max_pending()` entries after a receive,
/// * the `wal_flush_interval()` ticker fires while the map is non-empty,
/// * `rx.recv()` yields `None` (channel closed): final flush, then return,
/// * `shutdown` is notified: whatever is still queued in `rx` is drained,
///   a final flush is performed, then the task returns.
pub(super) async fn run_index_writer(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<(String, ArtifactIndex)>,
    store: Arc<ArtifactStore>,
    shutdown: Arc<Notify>,
) {
    use std::collections::HashMap;

    // Bounds only the map's *initial* allocation. `max_pending` comes from the
    // environment and may be arbitrarily large; pre-allocating that many slots
    // could panic or exhaust memory. The flush threshold itself is unchanged
    // and the map still grows on demand.
    const MAX_PREALLOCATED_ENTRIES: usize = 4096;

    // Create the shutdown future once, before the first await, and poll that
    // same future on every loop iteration. Building a fresh `notified()` future
    // per `select!` pass would miss a `notify_waiters()` call that arrives
    // while this task is busy inside `flush_wal_to_disk`, because such a call
    // only wakes `Notified` futures that already exist.
    let shutdown_signal = shutdown.notified();
    tokio::pin!(shutdown_signal);

    let flush_interval = wal_flush_interval();
    let max_pending = wal_max_pending();
    let mut wal: HashMap<String, ArtifactIndex> =
        HashMap::with_capacity(max_pending.min(MAX_PREALLOCATED_ENTRIES));

    let mut ticker = tokio::time::interval(flush_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // Consume the interval's first tick up front so the first time-based flush
    // happens one full period from now rather than right away.
    let _ = ticker.tick().await;

    loop {
        tokio::select! {
            msg = rx.recv() => {
                match msg {
                    Some((k, v)) => {
                        wal.insert(k, v);
                        // Opportunistically pull everything already queued so
                        // a burst of updates is coalesced into one batch.
                        while let Ok((k, v)) = rx.try_recv() {
                            wal.insert(k, v);
                        }
                        if wal.len() >= max_pending {
                            flush_wal_to_disk(&store, &mut wal).await;
                        }
                    }
                    None => {
                        // Channel closed: persist what is left and stop.
                        flush_wal_to_disk(&store, &mut wal).await;
                        return;
                    }
                }
            }
            _ = ticker.tick() => {
                if !wal.is_empty() {
                    flush_wal_to_disk(&store, &mut wal).await;
                }
            }
            _ = &mut shutdown_signal => {
                // Pick up anything still sitting in the channel before the
                // final flush.
                while let Ok((k, v)) = rx.try_recv() {
                    wal.insert(k, v);
                }
                tracing::info!(
                    pending = wal.len(),
                    "index-writer shutdown signal received, draining and flushing"
                );
                flush_wal_to_disk(&store, &mut wal).await;
                return;
            }
        }
    }
}
/// Moves every pending entry out of `wal` into `store` and flushes the store.
///
/// Does nothing when `wal` is empty. Otherwise all entries are drained from
/// `wal` (leaving it empty), passed to `store.insert_many`, and then
/// `store.flush()` is executed via `tokio::task::spawn_blocking`.
///
/// The outcome is only logged: success at `info`, a flush error or a join
/// error at `warn`. Nothing is returned to the caller and drained entries are
/// not put back into `wal` on failure.
pub(super) async fn flush_wal_to_disk(
    store: &Arc<ArtifactStore>,
    wal: &mut std::collections::HashMap<String, ArtifactIndex>,
) {
    let count = wal.len();
    if count == 0 {
        return;
    }

    let batch: Vec<(String, ArtifactIndex)> = wal.drain().collect();
    store.insert_many(batch);

    // The blocking closure needs an owned handle to the store.
    let flusher = Arc::clone(store);
    match tokio::task::spawn_blocking(move || flusher.flush()).await {
        Ok(Ok(())) => tracing::info!(committed = count, "WAL flushed to disk"),
        Ok(Err(e)) => tracing::warn!(count, "WAL flush to disk failed: {e}"),
        Err(e) => tracing::warn!(count, "WAL flush task join error: {e}"),
    }
}