use std::path::Path;
use object_store::aws::AmazonS3Builder;
use object_store::{ObjectStore, ObjectStoreExt};
use super::columnar_segment::{ColumnarSegmentReader, SegmentError};
use super::partition_registry::PartitionRegistry;
use nodedb_types::timeseries::PartitionState;
/// Configuration for archiving sealed time-series partitions to an
/// S3-compatible object store (AWS S3 or MinIO-style endpoints).
#[derive(Debug, Clone)]
pub struct S3ArchiveConfig {
/// Target bucket name.
pub bucket: String,
/// Key prefix prepended to every uploaded object.
/// NOTE(review): key construction in `archive_partition` concatenates this
/// directly, so it is expected to end with '/' — confirm against callers.
pub prefix: String,
/// Custom endpoint URL (e.g. a local MinIO); `None` means the default AWS endpoint.
pub endpoint: Option<String>,
/// AWS region passed to the client builder.
pub region: String,
/// Static access key; when `None` the builder's default credential chain applies.
pub access_key_id: Option<String>,
/// Static secret key; when `None` the builder's default credential chain applies.
pub secret_access_key: Option<String>,
/// Parquet compression codec to record for archived data.
pub compression: ParquetCompression,
}
/// Parquet compression codec selection.
/// NOTE(review): not consumed anywhere in this file — presumably read by the
/// parquet writer elsewhere; verify before relying on it.
#[derive(Debug, Clone, Copy)]
pub enum ParquetCompression {
/// Zstandard (the config default).
Zstd,
/// LZ4.
Lz4,
/// Snappy.
Snappy,
/// No compression.
None,
}
impl Default for S3ArchiveConfig {
fn default() -> Self {
Self {
bucket: "nodedb-archive".into(),
prefix: "timeseries/".into(),
endpoint: None,
region: "us-east-1".into(),
access_key_id: None,
secret_access_key: None,
compression: ParquetCompression::Zstd,
}
}
}
/// Construct an S3 client from `config`.
///
/// When no explicit credentials are set, the builder's default credential
/// chain (environment, instance metadata, ...) applies. Plain-HTTP transport
/// is only permitted when a custom endpoint is configured (e.g. a local
/// MinIO); the default AWS endpoint stays HTTPS-only. The original code
/// called `with_allow_http(true)` unconditionally, which weakened transport
/// security even for real AWS traffic.
///
/// # Errors
/// Returns [`SegmentError::Io`] when the builder rejects the configuration.
pub fn build_object_store(
    config: &S3ArchiveConfig,
) -> Result<object_store::aws::AmazonS3, SegmentError> {
    let mut builder = AmazonS3Builder::new()
        .with_bucket_name(&config.bucket)
        .with_region(&config.region);
    if let Some(ref endpoint) = config.endpoint {
        builder = builder
            .with_endpoint(endpoint)
            // Most S3-compatible stores require path-style addressing.
            .with_virtual_hosted_style_request(false)
            // Only custom endpoints may speak plain HTTP.
            .with_allow_http(true);
    }
    if let Some(ref key) = config.access_key_id {
        builder = builder.with_access_key_id(key);
    }
    if let Some(ref secret) = config.secret_access_key {
        builder = builder.with_secret_access_key(secret);
    }
    builder
        .build()
        .map_err(|e| SegmentError::Io(format!("build S3 client: {e}")))
}
/// Upload every regular file in a sealed/merged partition directory to S3,
/// then write an `_archived` JSON marker object describing the partition.
///
/// The marker is written last so that its presence implies all data objects
/// were uploaded. Returns the marker's object key on success.
///
/// # Errors
/// Returns [`SegmentError::Io`] when the partition is not in an archivable
/// state, on any local filesystem error, on marker serialization failure, or
/// on any S3 put failure.
pub async fn archive_partition(
    store: &impl ObjectStore,
    config: &S3ArchiveConfig,
    collection: &str,
    partition_dir: &Path,
    partition_dir_name: &str,
) -> Result<String, SegmentError> {
    let meta = ColumnarSegmentReader::read_meta(partition_dir)?;
    // Only immutable partitions may be shipped off-box.
    if meta.state != PartitionState::Sealed && meta.state != PartitionState::Merged {
        return Err(SegmentError::Io(format!(
            "cannot archive partition in state {:?}",
            meta.state
        )));
    }
    // Normalize the configured prefix: a missing trailing '/' would otherwise
    // fuse the prefix with the collection name in every object key.
    let prefix = if config.prefix.is_empty() || config.prefix.ends_with('/') {
        config.prefix.clone()
    } else {
        format!("{}/", config.prefix)
    };
    let key_prefix = format!("{prefix}{collection}/{partition_dir_name}/");
    let entries =
        std::fs::read_dir(partition_dir).map_err(|e| SegmentError::Io(format!("read dir: {e}")))?;
    for entry in entries {
        let entry = entry.map_err(|e| SegmentError::Io(format!("dir entry: {e}")))?;
        let file_path = entry.path();
        // Skip anything that is not a regular file (subdirectories, etc.).
        if !file_path.is_file() {
            continue;
        }
        let file_name = entry.file_name();
        let file_name_str = file_name.to_string_lossy();
        let data = std::fs::read(&file_path)
            .map_err(|e| SegmentError::Io(format!("read {}: {e}", file_path.display())))?;
        let object_key = format!("{key_prefix}{file_name_str}");
        let location = object_store::path::Path::from(object_key.clone());
        store
            .put(&location, object_store::PutPayload::from(data))
            .await
            .map_err(|e| SegmentError::Io(format!("S3 put {object_key}: {e}")))?;
    }
    let marker_key = format!("{key_prefix}_archived");
    let marker_location = object_store::path::Path::from(marker_key.clone());
    let marker_data = serde_json::to_vec(&serde_json::json!({
        "collection": collection,
        "partition": partition_dir_name,
        "min_ts": meta.min_ts,
        "max_ts": meta.max_ts,
        "row_count": meta.row_count,
        "archived_at_ms": std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0),
    }))
    // Previously swallowed with `unwrap_or_default()`, which would have
    // uploaded an empty marker on serialization failure; propagate instead.
    .map_err(|e| SegmentError::Io(format!("serialize archive marker: {e}")))?;
    store
        .put(
            &marker_location,
            object_store::PutPayload::from(marker_data),
        )
        .await
        .map_err(|e| SegmentError::Io(format!("S3 put marker: {e}")))?;
    Ok(marker_key)
}
/// Scan the registry for sealed/merged partitions whose `max_ts` is older
/// than `archive_after_ms` and archive each to S3, flipping its state to
/// `Archived` on success.
///
/// `archive_after_ms == 0` disables archival entirely. Per-partition
/// failures are logged and skipped so one bad partition cannot stall the
/// cycle. The partition manifest is persisted once at the end, only when at
/// least one partition changed state. Returns the count of partitions
/// successfully archived.
pub async fn run_archive_cycle(
    store: &impl ObjectStore,
    config: &S3ArchiveConfig,
    registry: &mut PartitionRegistry,
    collection: &str,
    base_dir: &Path,
    archive_after_ms: u64,
    now_ms: i64,
) -> Result<usize, SegmentError> {
    if archive_after_ms == 0 {
        return Ok(0);
    }
    // `archive_after_ms as i64` would wrap negative for values > i64::MAX,
    // pushing the cutoff into the future and archiving *everything*. Clamp
    // the age and saturate the subtraction instead.
    let age_ms = i64::try_from(archive_after_ms).unwrap_or(i64::MAX);
    let cutoff = now_ms.saturating_sub(age_ms);
    let mut archived = 0;
    // Snapshot candidates first: we need `&mut registry` inside the loop to
    // flip states, so we cannot hold an iterator borrow across it.
    let candidates: Vec<(i64, String)> = registry
        .iter()
        .filter(|(_, e)| {
            (e.meta.state == PartitionState::Sealed || e.meta.state == PartitionState::Merged)
                && e.meta.max_ts < cutoff
        })
        .map(|(&start, e)| (start, e.dir_name.clone()))
        .collect();
    for (start_ts, dir_name) in candidates {
        let partition_dir = base_dir.join(&dir_name);
        // The directory may have been merged/removed since the snapshot.
        if !partition_dir.exists() {
            continue;
        }
        match archive_partition(store, config, collection, &partition_dir, &dir_name).await {
            Ok(key) => {
                tracing::info!(
                    collection,
                    partition = dir_name,
                    s3_key = key,
                    "partition archived to S3"
                );
                if let Some(entry) = registry.get_mut(start_ts) {
                    entry.meta.state = PartitionState::Archived;
                }
                archived += 1;
            }
            Err(e) => {
                // Best-effort: log and continue with the next candidate.
                tracing::warn!(
                    collection,
                    partition = dir_name,
                    error = %e,
                    "failed to archive partition"
                );
            }
        }
    }
    if archived > 0 {
        let manifest_path = base_dir.join("partition_manifest.json");
        if let Err(e) = registry.persist(&manifest_path) {
            tracing::warn!(error = %e, "failed to persist manifest after archival");
        }
    }
    Ok(archived)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults must point at the canonical archive bucket with no endpoint override.
    #[test]
    fn default_config() {
        let cfg = S3ArchiveConfig::default();
        assert_eq!(cfg.bucket, "nodedb-archive");
        assert!(cfg.endpoint.is_none());
    }

    /// A MinIO-style config (explicit endpoint + static credentials) must
    /// produce a usable client.
    #[test]
    fn build_store_with_custom_endpoint() {
        let mut cfg = S3ArchiveConfig::default();
        cfg.endpoint = Some("http://localhost:9000".into());
        cfg.access_key_id = Some("minioadmin".into());
        cfg.secret_access_key = Some("minioadmin".into());
        let result = build_object_store(&cfg);
        assert!(
            result.is_ok(),
            "should build with custom endpoint: {result:?}"
        );
    }
}