use crate::service::server::SearchAppState;
/// Returns the per-index time budget used when flushing indexes at shutdown.
///
/// The value is read from the `TRUSTY_SHUTDOWN_FLUSH_TIMEOUT_SECS` environment
/// variable, interpreted as a whole number of seconds after trimming
/// surrounding whitespace. The 10-second default is used when the variable is
/// unset, is not valid Unicode, does not parse as a `u64`, or is `0`.
pub fn shutdown_flush_timeout_secs() -> std::time::Duration {
    const ENV_KEY: &str = "TRUSTY_SHUTDOWN_FLUSH_TIMEOUT_SECS";
    const FALLBACK_SECS: u64 = 10;

    // Any failure to read the variable is treated the same as "not configured".
    let configured = match std::env::var(ENV_KEY) {
        Ok(raw) => raw.trim().parse::<u64>().ok(),
        Err(_) => None,
    };

    match configured {
        // Zero is rejected so the flush is never given an empty budget.
        Some(secs) if secs > 0 => std::time::Duration::from_secs(secs),
        _ => std::time::Duration::from_secs(FALLBACK_SECS),
    }
}
/// Best-effort flush of every registered index to disk before the process exits.
///
/// For each index id returned by `state.registry.list()` this writes the chunk
/// corpus and then the vector store (HNSW), one index at a time. Nothing is
/// returned and nothing is propagated: every failure is logged and the loop
/// moves on to the next index.
///
/// An index is skipped (with a warning) when:
/// - it is no longer present in the registry,
/// - its root is judged to be on an external volume (see issue #874 in the log
///   message), or
/// - one of its on-disk paths cannot be resolved.
///
/// Each index gets its own time budget from [`shutdown_flush_timeout_secs`];
/// the budget covers acquiring the indexer read lock as well as both writes.
/// When it elapses the flush future is dropped and a warning is logged.
pub async fn flush_all_indexes_on_shutdown(state: &SearchAppState) {
    let index_ids = state.registry.list();
    if index_ids.is_empty() {
        return;
    }
    tracing::info!(
        "shutdown: flushing {} index snapshot(s) before exit",
        index_ids.len()
    );

    // The budget is read once and applied to every index individually.
    let per_index_budget = shutdown_flush_timeout_secs();
    let budget_secs = per_index_budget.as_secs();

    for index_id in index_ids {
        let handle = match state.registry.get(&index_id) {
            Some(found) => found,
            None => continue,
        };
        let root = &handle.root_path;

        if crate::service::warm_boot::scan::is_likely_external_volume(root) {
            tracing::warn!(
                "shutdown: skipping flush for '{}' — root {} is on an external volume \
                 (TCC/I/O stall risk under launchd, issue #874). \
                 On-disk state is from the last incremental persist.",
                index_id.0,
                root.display(),
            );
            continue;
        }

        // Resolve both target files up front; colocated indexes keep them under
        // the index root, all others use the shared persistence locations.
        let (chunks_file, hnsw_file) =
            if crate::service::colocated_storage::has_colocated_storage(root) {
                let chunks = root.join(".trusty-search").join("chunks.json");
                match crate::service::colocated_storage::colocated_hnsw_path(root) {
                    Ok(hnsw) => (chunks, hnsw),
                    Err(e) => {
                        tracing::warn!(
                            "shutdown: colocated hnsw path unresolvable for '{}': {e}",
                            index_id.0
                        );
                        continue;
                    }
                }
            } else {
                let chunks = match crate::service::persistence::chunks_path(&index_id.0) {
                    Ok(resolved) => resolved,
                    Err(e) => {
                        tracing::warn!(
                            "shutdown: chunks path unresolvable for '{}': {e}",
                            index_id.0
                        );
                        continue;
                    }
                };
                match crate::service::persistence::hnsw_path(&index_id.0) {
                    Ok(hnsw) => (chunks, hnsw),
                    Err(e) => {
                        tracing::warn!(
                            "shutdown: hnsw path unresolvable for '{}': {e}",
                            index_id.0
                        );
                        continue;
                    }
                }
            };

        // The future owns everything it needs so it can simply be dropped if
        // the budget runs out.
        let indexer_lock = handle.indexer.clone();
        let log_name = index_id.0.clone();
        let flush_work = async move {
            let indexer = indexer_lock.read().await;

            // A corpus failure does not prevent the vector store attempt.
            match indexer.flush_corpus_to_disk(&chunks_file).await {
                Ok(_) => {}
                Err(e) => tracing::warn!(
                    "shutdown: failed to flush chunk corpus for '{}': {e}",
                    log_name
                ),
            }

            match indexer.save_vector_store(&hnsw_file).await {
                Ok(true) => tracing::debug!("shutdown: saved HNSW for '{}'", log_name),
                Ok(false) => {}
                Err(e) => tracing::warn!(
                    "shutdown: failed to save HNSW for '{}': {e}",
                    log_name
                ),
            }
        };

        if tokio::time::timeout(per_index_budget, flush_work)
            .await
            .is_err()
        {
            tracing::warn!(
                "shutdown: flush TIMED OUT for '{}' after {}s — skipping \
                 (on-disk state from last incremental persist, issue #874). \
                 Increase TRUSTY_SHUTDOWN_FLUSH_TIMEOUT_SECS to allow more time.",
                index_id.0,
                budget_secs,
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// Environment variable consulted by `shutdown_flush_timeout_secs`.
    const TIMEOUT_ENV: &str = "TRUSTY_SHUTDOWN_FLUSH_TIMEOUT_SECS";

    /// Scoped control over `TIMEOUT_ENV`.
    ///
    /// The value present when the guard is created is put back (or the
    /// variable is removed if it was absent) when the guard is dropped, so a
    /// failing assertion cannot leak a test value into later tests.
    struct TimeoutEnvGuard {
        previous: Option<std::ffi::OsString>,
    }

    impl TimeoutEnvGuard {
        /// Remembers the current value of `TIMEOUT_ENV` for later restoration.
        fn capture() -> Self {
            Self {
                previous: std::env::var_os(TIMEOUT_ENV),
            }
        }

        /// Sets `TIMEOUT_ENV` to `value`.
        fn set(&self, value: &str) {
            // SAFETY: every test in this module is `#[serial]`, so these
            // mutations do not race with each other. Assumes no non-serial
            // test touches the environment concurrently — TODO confirm.
            unsafe { std::env::set_var(TIMEOUT_ENV, value) };
        }

        /// Removes `TIMEOUT_ENV` from the environment.
        fn clear(&self) {
            // SAFETY: see `set`.
            unsafe { std::env::remove_var(TIMEOUT_ENV) };
        }
    }

    impl Drop for TimeoutEnvGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                // SAFETY: see `TimeoutEnvGuard::set`.
                Some(original) => unsafe { std::env::set_var(TIMEOUT_ENV, original) },
                // SAFETY: see `TimeoutEnvGuard::set`.
                None => unsafe { std::env::remove_var(TIMEOUT_ENV) },
            }
        }
    }

    #[test]
    #[serial]
    fn shutdown_flush_timeout_parses_env_var() {
        let env = TimeoutEnvGuard::capture();

        env.set("5");
        assert_eq!(
            shutdown_flush_timeout_secs(),
            std::time::Duration::from_secs(5),
            "must parse 5 from env var"
        );

        env.clear();
        assert_eq!(
            shutdown_flush_timeout_secs(),
            std::time::Duration::from_secs(10),
            "must fall back to 10s default when env var absent"
        );
    }

    #[test]
    #[serial]
    fn shutdown_flush_timeout_ignores_invalid_values() {
        let env = TimeoutEnvGuard::capture();

        // Zero, empty, non-numeric, negative and fractional values all fall
        // back to the default rather than producing an unusable timeout.
        for raw in ["0", "", "abc", "-3", "1.5"] {
            env.set(raw);
            assert_eq!(
                shutdown_flush_timeout_secs(),
                std::time::Duration::from_secs(10),
                "value {raw:?} must fall back to the 10s default"
            );
        }

        // Surrounding whitespace is trimmed before parsing.
        env.set(" 7 ");
        assert_eq!(
            shutdown_flush_timeout_secs(),
            std::time::Duration::from_secs(7),
            "must trim whitespace around the configured value"
        );
    }

    #[tokio::test]
    #[serial]
    async fn shutdown_flush_empty_registry_returns_immediately() {
        let env = TimeoutEnvGuard::capture();
        // The timeout value is not expected to matter here: an empty registry
        // returns before it is read.
        env.set("1");

        let state = crate::service::server::SearchAppState::new(
            crate::core::registry::IndexRegistry::new(),
        );
        let start = std::time::Instant::now();
        flush_all_indexes_on_shutdown(&state).await;
        let elapsed = start.elapsed();

        assert!(
            elapsed < std::time::Duration::from_secs(2),
            "flush on empty registry must complete immediately; elapsed: {:?}",
            elapsed
        );
    }
}