nodedb_cluster/distributed_timeseries/
retention.rs1use std::collections::HashMap;
13
14use serde::{Deserialize, Serialize};
15
16#[derive(
18 Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
19)]
20#[msgpack(map)]
21pub struct RetentionCommand {
22 pub collection: String,
24 pub drop_before_ts: i64,
26 pub command_id: u64,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ShardRetentionResult {
33 pub shard_id: u32,
34 pub partitions_dropped: usize,
35 pub bytes_reclaimed: u64,
36 pub success: bool,
37 pub error: Option<String>,
38}
39
/// Keeps per-collection retention periods and hands out retention commands
/// with monotonically increasing identifiers.
pub struct CoordinatedRetention {
    /// Collection name -> retention period (`period_ms`); `0` entries are
    /// skipped when commands are generated.
    retention_periods: HashMap<String, u64>,
    /// Collection name -> identifier of the most recent command generated for it.
    last_command_ids: HashMap<String, u64>,
    /// Identifier that the next generated command will receive.
    next_command_id: u64,
}
49
50impl CoordinatedRetention {
51 pub fn new() -> Self {
52 Self {
53 retention_periods: HashMap::new(),
54 last_command_ids: HashMap::new(),
55 next_command_id: 1,
56 }
57 }
58
59 pub fn set_retention(&mut self, collection: &str, period_ms: u64) {
61 self.retention_periods
62 .insert(collection.to_string(), period_ms);
63 }
64
65 pub fn generate_commands(&mut self, now_ms: i64) -> Vec<RetentionCommand> {
70 let mut commands = Vec::new();
71
72 for (collection, &period_ms) in &self.retention_periods {
73 if period_ms == 0 {
74 continue;
75 }
76 let drop_before = now_ms - period_ms as i64;
77 let command_id = self.next_command_id;
78 self.next_command_id += 1;
79
80 commands.push(RetentionCommand {
81 collection: collection.clone(),
82 drop_before_ts: drop_before,
83 command_id,
84 });
85 self.last_command_ids.insert(collection.clone(), command_id);
86 }
87
88 commands
89 }
90
91 pub fn verify_results(
93 command: &RetentionCommand,
94 results: &[ShardRetentionResult],
95 ) -> RetentionVerification {
96 let total_shards = results.len();
97 let successful = results.iter().filter(|r| r.success).count();
98 let total_dropped: usize = results.iter().map(|r| r.partitions_dropped).sum();
99 let total_reclaimed: u64 = results.iter().map(|r| r.bytes_reclaimed).sum();
100 let failures: Vec<String> = results
101 .iter()
102 .filter(|r| !r.success)
103 .filter_map(|r| {
104 r.error
105 .as_ref()
106 .map(|e| format!("shard {}: {e}", r.shard_id))
107 })
108 .collect();
109
110 RetentionVerification {
111 collection: command.collection.clone(),
112 command_id: command.command_id,
113 total_shards,
114 successful_shards: successful,
115 total_partitions_dropped: total_dropped,
116 total_bytes_reclaimed: total_reclaimed,
117 failures,
118 }
119 }
120}
121
122impl Default for CoordinatedRetention {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
/// Cluster-wide summary of one retention command, as assembled by
/// [`CoordinatedRetention::verify_results`].
#[derive(Debug)]
pub struct RetentionVerification {
    /// Collection the verified command applied to.
    pub collection: String,
    /// Identifier of the verified command.
    pub command_id: u64,
    /// Number of shard results that were aggregated.
    pub total_shards: usize,
    /// Number of those results flagged as successful.
    pub successful_shards: usize,
    /// Sum of `partitions_dropped` over all aggregated results.
    pub total_partitions_dropped: usize,
    /// Sum of `bytes_reclaimed` over all aggregated results.
    pub total_bytes_reclaimed: u64,
    /// Human-readable descriptions of shard failures.
    pub failures: Vec<String>,
}
139
140impl RetentionVerification {
141 pub fn all_succeeded(&self) -> bool {
142 self.failures.is_empty() && self.successful_shards == self.total_shards
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// Milliseconds in one day; keeps the timestamps below readable.
    const DAY_MS: u64 = 86_400_000;

    /// Command fixture shared by the verification tests.
    fn sample_command() -> RetentionCommand {
        RetentionCommand {
            collection: "m".into(),
            drop_before_ts: 1000,
            command_id: 1,
        }
    }

    /// Builds a shard result; passing `None` as `error` marks it successful.
    fn shard_result(
        shard_id: u32,
        partitions_dropped: usize,
        bytes_reclaimed: u64,
        error: Option<&str>,
    ) -> ShardRetentionResult {
        ShardRetentionResult {
            shard_id,
            partitions_dropped,
            bytes_reclaimed,
            success: error.is_none(),
            error: error.map(String::from),
        }
    }

    #[test]
    fn generate_commands() {
        let mut retention = CoordinatedRetention::new();
        retention.set_retention("metrics", 7 * DAY_MS);
        retention.set_retention("logs", 3 * DAY_MS);

        let now_ms = (10 * DAY_MS) as i64;
        let generated = retention.generate_commands(now_ms);
        assert_eq!(generated.len(), 2);

        // Look commands up by name: the output order is not fixed.
        let cutoff_for = |name: &str| {
            generated
                .iter()
                .find(|c| c.collection == name)
                .unwrap()
                .drop_before_ts
        };
        assert_eq!(cutoff_for("metrics"), (3 * DAY_MS) as i64);
        assert_eq!(cutoff_for("logs"), (7 * DAY_MS) as i64);
    }

    #[test]
    fn verify_all_success() {
        let shard_reports = vec![
            shard_result(0, 5, 1000, None),
            shard_result(1, 3, 800, None),
        ];

        let summary = CoordinatedRetention::verify_results(&sample_command(), &shard_reports);
        assert!(summary.all_succeeded());
        assert_eq!(summary.total_partitions_dropped, 8);
        assert_eq!(summary.total_bytes_reclaimed, 1800);
    }

    #[test]
    fn verify_partial_failure() {
        let shard_reports = vec![
            shard_result(0, 5, 1000, None),
            shard_result(1, 0, 0, Some("disk full")),
        ];

        let summary = CoordinatedRetention::verify_results(&sample_command(), &shard_reports);
        assert!(!summary.all_succeeded());
        assert_eq!(summary.failures.len(), 1);
    }

    #[test]
    fn infinite_retention_skipped() {
        // A zero period means "never drop", so no command may be generated.
        let mut retention = CoordinatedRetention::new();
        retention.set_retention("metrics", 0);

        let generated = retention.generate_commands(1_000_000);
        assert!(generated.is_empty());
    }
}