use serde::{Deserialize, Serialize};
/// Settings that determine how keys are laid out across shards.
///
/// Every key produced by this type's methods starts with `prefix`, followed
/// by a `shard-NNNN` component (shard id zero-padded to at least four digits).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardedS3Config {
/// Bucket name.
///
/// NOTE(review): no code in this file reads this field; presumably it is the
/// S3 bucket the generated keys live in — confirm against callers.
pub bucket: String,
/// Leading component of every generated key. It is joined to the rest of the
/// key with a single `/` and is used verbatim (no trimming or normalization).
pub prefix: String,
/// Number of shards. The enumeration helpers iterate shard ids
/// `0..total_shards`, so a value of `0` yields empty lists.
pub total_shards: u32,
}
impl ShardedS3Config {
    /// Returns the key prefix under which everything for `shard_id` is stored.
    ///
    /// The result has the form `<prefix>/shard-NNNN`, with the shard id
    /// zero-padded to at least four digits and no trailing slash.
    pub fn shard_prefix(&self, shard_id: u32) -> String {
        let root = &self.prefix;
        format!("{root}/shard-{shard_id:04}")
    }

    /// Returns the key of a named partition inside a collection of one shard:
    /// `<prefix>/shard-NNNN/<collection>/<partition_name>`.
    pub fn partition_key(&self, shard_id: u32, collection: &str, partition_name: &str) -> String {
        let shard_root = self.shard_prefix(shard_id);
        format!("{shard_root}/{collection}/{partition_name}")
    }

    /// Returns the key of a packed partition covering `min_ts..max_ts`:
    /// `<prefix>/shard-NNNN/<collection>/ts-<min_ts>_<max_ts>.ndpk`.
    ///
    /// The timestamps are rendered in plain decimal, so a negative value
    /// keeps its leading `-`.
    pub fn packed_partition_key(
        &self,
        shard_id: u32,
        collection: &str,
        min_ts: i64,
        max_ts: i64,
    ) -> String {
        // A packed partition is an ordinary partition key whose final
        // component encodes the timestamp range and the `.ndpk` extension.
        let file_name = format!("ts-{min_ts}_{max_ts}.ndpk");
        self.partition_key(shard_id, collection, &file_name)
    }

    /// Returns the shard prefix of every shard id in `0..total_shards`,
    /// in ascending shard order.
    pub fn all_shard_prefixes(&self) -> Vec<String> {
        let shard_ids = 0..self.total_shards;
        shard_ids
            .map(|shard_id| self.shard_prefix(shard_id))
            .collect()
    }

    /// Returns, for every shard id in `0..total_shards`, the prefix of
    /// `collection` within that shard: `<prefix>/shard-NNNN/<collection>/`.
    ///
    /// Unlike [`Self::all_shard_prefixes`], each entry ends with a `/`.
    pub fn collection_prefixes(&self, collection: &str) -> Vec<String> {
        (0..self.total_shards)
            .map(|shard_id| {
                let mut key = self.shard_prefix(shard_id);
                key.push('/');
                key.push_str(collection);
                key.push('/');
                key
            })
            .collect()
    }
}
/// Serializable record describing a single archived partition.
///
/// NOTE(review): nothing in this file constructs or reads this type, so the
/// field descriptions below are inferred from names and types only — confirm
/// them against the code that produces and consumes these records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivedPartition {
/// Presumably the full object key the partition was stored under.
pub s3_key: String,
/// Presumably the id of the shard the partition belongs to.
pub shard_id: u32,
/// Presumably the name of the collection the partition belongs to.
pub collection: String,
/// Presumably the lower timestamp bound of the partition (unit not shown here).
pub min_ts: i64,
/// Presumably the upper timestamp bound of the partition (unit not shown here).
pub max_ts: i64,
/// Presumably the size of the archived object in bytes.
pub size_bytes: u64,
/// Presumably the archive time in milliseconds; epoch/time zone not shown
/// here — TODO confirm.
pub archived_at_ms: i64,
}
/// One planned archive operation: a source partition paired with the key it
/// should be stored under.
///
/// `ShardedS3Config::plan_archive` builds one of these per input partition.
#[derive(Debug, Clone)]
pub struct ArchivePlan {
/// Source location, copied verbatim from the input tuple's first element
/// (presumably a local directory path — confirm against callers).
pub source_dir: String,
/// Destination key; `plan_archive` fills it with the packed partition key
/// for this shard, collection and timestamp range.
pub target_key: String,
/// Shard id the plan was created for.
pub shard_id: u32,
/// Collection name the plan was created for.
pub collection: String,
/// Lower timestamp bound, copied from the input tuple's second element.
pub min_ts: i64,
/// Upper timestamp bound, copied from the input tuple's third element.
pub max_ts: i64,
}
impl ShardedS3Config {
    /// Builds one [`ArchivePlan`] per entry of `partitions`, in input order.
    ///
    /// Each entry is a `(source, min_ts, max_ts)` tuple. The source string is
    /// copied into the plan unchanged, and the target key is the packed
    /// partition key for `shard_id`, `collection` and that timestamp range.
    /// An empty slice yields an empty vector.
    pub fn plan_archive(
        &self,
        shard_id: u32,
        collection: &str,
        partitions: &[(String, i64, i64)],
    ) -> Vec<ArchivePlan> {
        let mut plans = Vec::with_capacity(partitions.len());
        for (source_dir, min_ts, max_ts) in partitions {
            let (min_ts, max_ts) = (*min_ts, *max_ts);
            plans.push(ArchivePlan {
                source_dir: source_dir.clone(),
                target_key: self.packed_partition_key(shard_id, collection, min_ts, max_ts),
                shard_id,
                collection: collection.to_owned(),
                min_ts,
                max_ts,
            });
        }
        plans
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the ten-shard configuration shared by every test in this module.
    fn test_config() -> ShardedS3Config {
        ShardedS3Config {
            bucket: String::from("my-bucket"),
            prefix: String::from("nodedb/v1/cluster-abc"),
            total_shards: 10,
        }
    }

    /// A shard prefix is the configured prefix plus a zero-padded shard id.
    #[test]
    fn shard_prefix() {
        let actual = test_config().shard_prefix(3);
        assert_eq!(actual, "nodedb/v1/cluster-abc/shard-0003");
    }

    /// A partition key nests collection and partition name under the shard.
    #[test]
    fn partition_key() {
        let expected = "nodedb/v1/cluster-abc/shard-0005/metrics/ts-1000_2000";
        let actual = test_config().partition_key(5, "metrics", "ts-1000_2000");
        assert_eq!(actual, expected);
    }

    /// A packed key encodes the timestamp range and the `.ndpk` extension.
    #[test]
    fn packed_key() {
        let expected = "nodedb/v1/cluster-abc/shard-0007/metrics/ts-1000_2000.ndpk";
        let actual = test_config().packed_partition_key(7, "metrics", 1000, 2000);
        assert_eq!(actual, expected);
    }

    /// One prefix per shard, ordered from shard 0 to the last shard.
    #[test]
    fn all_shard_prefixes() {
        let shard_roots = test_config().all_shard_prefixes();
        assert_eq!(shard_roots.len(), 10);
        let (first, last) = (&shard_roots[0], &shard_roots[9]);
        assert!(first.ends_with("shard-0000"));
        assert!(last.ends_with("shard-0009"));
    }

    /// One collection prefix per shard, each ending in `<collection>/`.
    #[test]
    fn collection_prefixes() {
        let collection_roots = test_config().collection_prefixes("metrics");
        assert_eq!(collection_roots.len(), 10);
        let first = &collection_roots[0];
        assert!(first.contains("shard-0000/metrics/"));
    }

    /// Planning yields one entry per input partition, targeting the given shard.
    #[test]
    fn archive_plan() {
        let inputs: Vec<(String, i64, i64)> = vec![
            (String::from("/data/ts-1000_2000"), 1000, 2000),
            (String::from("/data/ts-3000_4000"), 3000, 4000),
        ];
        let plans = test_config().plan_archive(2, "metrics", &inputs);
        assert_eq!(plans.len(), 2);
        let first = &plans[0];
        assert_eq!(first.shard_id, 2);
        assert!(first.target_key.contains("shard-0002"));
    }
}