#[cfg(feature = "s3-archival")]
use std::sync::Arc;
#[cfg(feature = "s3-archival")]
use anyhow::Result;
#[cfg(feature = "s3-archival")]
use tokio::time::{interval, Duration};
#[cfg(feature = "s3-archival")]
use tracing::{debug, error, info, warn};
#[cfg(feature = "s3-archival")]
use crate::store::WorkflowStore;
#[cfg(feature = "s3-archival")]
pub struct ArchivalConfig {
pub bucket: String,
pub prefix: String,
pub retention_secs: f64,
pub poll_secs: u64,
pub batch_size: i64,
}
#[cfg(feature = "s3-archival")]
impl ArchivalConfig {
pub fn from_env() -> Option<Self> {
let bucket = std::env::var("ASSAY_ARCHIVE_S3_BUCKET").ok()?;
let prefix =
std::env::var("ASSAY_ARCHIVE_S3_PREFIX").unwrap_or_else(|_| "assay/".to_string());
let retention_days = std::env::var("ASSAY_ARCHIVE_RETENTION_DAYS")
.ok()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(30.0);
let poll_secs = std::env::var("ASSAY_ARCHIVE_POLL_SECS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(3600);
let batch_size = std::env::var("ASSAY_ARCHIVE_BATCH_SIZE")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(50);
Some(Self {
bucket,
prefix,
retention_secs: retention_days * 86_400.0,
poll_secs,
batch_size,
})
}
}
#[cfg(feature = "s3-archival")]
pub async fn run_archival<S: WorkflowStore>(store: Arc<S>, cfg: ArchivalConfig) {
let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let client = aws_sdk_s3::Client::new(&aws_config);
info!(
"Archival started (bucket={}, prefix={}, retention_days={:.1}, poll_secs={}, batch_size={})",
cfg.bucket,
cfg.prefix,
cfg.retention_secs / 86_400.0,
cfg.poll_secs,
cfg.batch_size
);
let mut tick = interval(Duration::from_secs(cfg.poll_secs));
loop {
tick.tick().await;
if let Err(e) = archive_batch(&*store, &client, &cfg).await {
error!("Archival tick failed: {e}");
}
}
}
#[cfg(feature = "s3-archival")]
async fn archive_batch<S: WorkflowStore>(
store: &S,
client: &aws_sdk_s3::Client,
cfg: &ArchivalConfig,
) -> Result<()> {
let now = crate::timestamp_now();
let cutoff = now - cfg.retention_secs;
let candidates = store
.list_archivable_workflows(cutoff, cfg.batch_size)
.await?;
if candidates.is_empty() {
debug!("Archival: no workflows eligible (cutoff={cutoff})");
return Ok(());
}
for wf in candidates {
match archive_one(store, client, cfg, &wf).await {
Ok(uri) => info!("Archived workflow {} → {}", wf.id, uri),
Err(e) => {
warn!("Archival failed for workflow {}: {}", wf.id, e);
}
}
}
Ok(())
}
#[cfg(feature = "s3-archival")]
async fn archive_one<S: WorkflowStore>(
store: &S,
client: &aws_sdk_s3::Client,
cfg: &ArchivalConfig,
wf: &crate::types::WorkflowRecord,
) -> Result<String> {
let events = store.list_events(&wf.id).await?;
let bundle = serde_json::json!({
"format_version": 1,
"workflow": wf,
"events": events,
});
let body = serde_json::to_vec(&bundle)?;
let key = format!(
"{}{}/{}.json",
cfg.prefix.trim_end_matches('/').to_string() + "/",
wf.namespace,
wf.id
);
client
.put_object()
.bucket(&cfg.bucket)
.key(&key)
.body(aws_sdk_s3::primitives::ByteStream::from(body))
.content_type("application/json")
.send()
.await
.map_err(|e| anyhow::anyhow!("s3 put_object failed: {e}"))?;
let uri = format!("s3://{}/{}", cfg.bucket, key);
store
.mark_archived_and_purge(&wf.id, &uri, crate::timestamp_now())
.await?;
Ok(uri)
}