use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::{debug, info, warn};
use crate::config::server::ColdStorageSettings;
use crate::control::state::SharedState;
/// Spawns the background task that periodically tiers old segment files to cold storage.
///
/// The task wakes every `settings.tier_check_interval_secs` seconds (or earlier, whenever
/// the value in `shutdown_rx` changes) and runs one tiering pass over `<data_dir>/segments`
/// with `settings.tier_after_secs` as the minimum age and `settings.prefix` as the
/// object-key prefix.
///
/// The task ends when any of the following holds after a wake-up:
/// * the value in `shutdown_rx` is `true`,
/// * the sending half of `shutdown_rx` has been dropped,
/// * `shared.cold_storage` is `None`.
///
/// Returns the [`tokio::task::JoinHandle`] of the spawned task.
///
/// # Panics
///
/// Panics if called outside of a Tokio runtime, because it calls [`tokio::spawn`].
pub fn spawn_cold_tier_task(
    shared: Arc<SharedState>,
    settings: ColdStorageSettings,
    data_dir: PathBuf,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
    let check_interval = Duration::from_secs(settings.tier_check_interval_secs);
    let tier_after = Duration::from_secs(settings.tier_after_secs);
    let segments_dir = data_dir.join("segments");
    tokio::spawn(async move {
        info!(
            check_interval_secs = settings.tier_check_interval_secs,
            tier_after_secs = settings.tier_after_secs,
            segments_dir = %segments_dir.display(),
            "cold tier task started"
        );
        loop {
            // `changed()` resolves with `Err` as soon as the sender is dropped, and keeps
            // doing so on every call. Without handling that case the select below would
            // never sleep again and the loop would spin through tier cycles.
            let sender_gone = tokio::select! {
                _ = tokio::time::sleep(check_interval) => false,
                changed = shutdown_rx.changed() => changed.is_err(),
            };
            // Check the flag on every wake-up (not only on a change notification) so a
            // shutdown request is honoured no matter which select arm fired.
            if *shutdown_rx.borrow() {
                info!("cold tier task stopping on shutdown signal");
                return;
            }
            if sender_gone {
                warn!("cold tier: shutdown channel closed, stopping task");
                return;
            }
            let Some(cold) = shared.cold_storage.as_ref().map(Arc::clone) else {
                warn!("cold tier: cold_storage is None, stopping task");
                return;
            };
            run_tier_cycle_at(&cold, &segments_dir, tier_after, &settings.prefix).await;
        }
    })
}
/// Runs a single tiering pass over `segments_dir`.
///
/// For every path returned by `read_dir_sync(segments_dir)`:
/// 1. entries for which `file_age` yields `None` are skipped,
/// 2. entries younger than `tier_after` are skipped,
/// 3. entries whose file name is not valid UTF-8 are skipped with a warning,
/// 4. the remaining files are uploaded to `{prefix}segments/{file_name}` and, only after
///    a successful upload, removed from the local disk.
///
/// The function never returns an error: a failure to list the directory ends the pass
/// with a warning, and per-segment upload/removal failures are logged and counted. A
/// summary is logged at the end of the pass.
///
/// Note that `prefix` is concatenated verbatim, so any separator between the prefix and
/// `segments/` must be part of `prefix` itself.
pub(crate) async fn run_tier_cycle_at(
    cold: &crate::storage::cold::ColdStorage,
    segments_dir: &std::path::Path,
    tier_after: Duration,
    prefix: &str,
) {
    // One timestamp for the whole pass so every file is judged against the same "now".
    let now = SystemTime::now();
    let entries = match read_dir_sync(segments_dir).await {
        Ok(entries) => entries,
        Err(e) => {
            warn!(
                error = %e,
                dir = %segments_dir.display(),
                "cold tier: failed to read segments directory"
            );
            return;
        }
    };
    let mut tiered: u64 = 0;
    let mut errors: u64 = 0;
    for entry_path in entries {
        let Some(age) = file_age(&entry_path, now) else {
            debug!(path = %entry_path.display(), "cold tier: skipping entry (mtime unavailable)");
            continue;
        };
        if age < tier_after {
            debug!(
                path = %entry_path.display(),
                age_secs = age.as_secs(),
                "cold tier: segment too recent, skipping"
            );
            continue;
        }
        // Owned copy of the name: `entry_path` is moved into the removal closure below
        // while the name is still needed for logging.
        let Some(segment_name) = entry_path
            .file_name()
            .and_then(|name| name.to_str())
            .map(str::to_owned)
        else {
            warn!(path = %entry_path.display(), "cold tier: invalid segment filename, skipping");
            continue;
        };
        let object_path = format!("{prefix}segments/{segment_name}");
        if let Err(e) = upload_raw_segment(cold, &entry_path, &object_path).await {
            warn!(segment = %segment_name, error = %e, "cold tier: upload failed");
            errors += 1;
            continue;
        }
        info!(
            segment = %segment_name,
            object_path = %object_path,
            age_secs = age.as_secs(),
            "cold tier: segment uploaded, removing local copy"
        );
        // The local file is deleted only after the upload has succeeded. The path is
        // moved (not cloned) into the blocking closure since it is not used afterwards.
        let remove_result =
            tokio::task::spawn_blocking(move || std::fs::remove_file(&entry_path))
                .await
                .map_err(|e| format!("spawn_blocking join: {e}"))
                .and_then(|r| r.map_err(|e| e.to_string()));
        match remove_result {
            Ok(()) => tiered += 1,
            Err(e) => {
                warn!(
                    segment = %segment_name,
                    error = %e,
                    "cold tier: failed to remove local segment after upload"
                );
                errors += 1;
            }
        }
    }
    if tiered > 0 || errors > 0 {
        info!(tiered, errors, "cold tier cycle complete");
    } else {
        debug!("cold tier cycle: no eligible segments");
    }
}
async fn upload_raw_segment(
cold: &crate::storage::cold::ColdStorage,
local_path: &std::path::Path,
object_path: &str,
) -> crate::Result<()> {
let path_buf = local_path.to_path_buf();
let data = tokio::task::spawn_blocking(move || std::fs::read(&path_buf))
.await
.map_err(|e| crate::Error::Internal {
detail: format!("spawn_blocking join: {e}"),
})?
.map_err(|e| crate::Error::Internal {
detail: format!("read segment file: {e}"),
})?;
let store = cold.object_store();
let opath = object_store::path::Path::from(object_path.to_owned());
store
.put_opts(
&opath,
object_store::PutPayload::from(data),
object_store::PutOptions::default(),
)
.await
.map_err(|e| crate::Error::Internal {
detail: format!("object store put: {e}"),
})?;
Ok(())
}
/// Collects the paths of the regular files directly inside `dir`.
///
/// The directory is walked with blocking `std::fs` calls inside
/// `tokio::task::spawn_blocking`. Only entries whose metadata reports `is_file()` are
/// returned; there is no recursion.
///
/// # Errors
///
/// Returns the first I/O error hit while opening the directory, reading an entry, or
/// fetching an entry's metadata. A failure to join the blocking task is reported as an
/// `std::io::Error` whose message starts with `spawn_blocking join:`.
async fn read_dir_sync(dir: &std::path::Path) -> std::io::Result<Vec<PathBuf>> {
    let root = dir.to_path_buf();
    let scan = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<PathBuf>> {
        std::fs::read_dir(&root)?
            .filter_map(|item| {
                // `Ok(Some(path))` for files, `Ok(None)` for anything else; errors stay
                // in the stream so that `collect` stops at the first one.
                item.and_then(|entry| Ok(entry.metadata()?.is_file().then(|| entry.path())))
                    .transpose()
            })
            .collect()
    });
    match scan.await {
        Ok(listing) => listing,
        Err(join_err) => Err(std::io::Error::other(format!(
            "spawn_blocking join: {join_err}"
        ))),
    }
}
/// Returns how long before `now` the file at `path` was last modified.
///
/// Yields `None` when the metadata or the modification time cannot be read, or when the
/// modification time is later than `now`.
fn file_age(path: &std::path::Path, now: SystemTime) -> Option<Duration> {
    std::fs::metadata(path)
        .and_then(|meta| meta.modified())
        .ok()
        .and_then(|mtime| now.duration_since(mtime).ok())
}