Skip to main content

nodedb_cluster/distributed_timeseries/
retention.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Coordinated retention across shards.
4//!
5//! When retention drops partitions, all shards must drop the same time range
6//! simultaneously. The coordinator broadcasts a retention command with
7//! `(collection, drop_before_ts)`. Each shard applies locally.
8//!
9//! This prevents inconsistency: shard A has data from Jan 1, shard B already
10//! dropped it → queries for that time range return partial results.
11
12use std::collections::HashMap;
13
14use serde::{Deserialize, Serialize};
15
16/// A coordinated retention command broadcast to all shards.
17#[derive(
18    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
19)]
20#[msgpack(map)]
21pub struct RetentionCommand {
22    /// Collection to apply retention to.
23    pub collection: String,
24    /// Drop all partitions with max_ts < this timestamp.
25    pub drop_before_ts: i64,
26    /// Unique command ID for idempotency.
27    pub command_id: u64,
28}
29
30/// Result from a single shard's retention execution.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ShardRetentionResult {
33    pub shard_id: u32,
34    pub partitions_dropped: usize,
35    pub bytes_reclaimed: u64,
36    pub success: bool,
37    pub error: Option<String>,
38}
39
40/// Coordinator for cross-shard retention.
41pub struct CoordinatedRetention {
42    /// Retention period per collection (ms). 0 = infinite.
43    retention_periods: HashMap<String, u64>,
44    /// Last executed command ID per collection for idempotency.
45    last_command_ids: HashMap<String, u64>,
46    /// Next command ID.
47    next_command_id: u64,
48}
49
50impl CoordinatedRetention {
51    pub fn new() -> Self {
52        Self {
53            retention_periods: HashMap::new(),
54            last_command_ids: HashMap::new(),
55            next_command_id: 1,
56        }
57    }
58
59    /// Set retention period for a collection.
60    pub fn set_retention(&mut self, collection: &str, period_ms: u64) {
61        self.retention_periods
62            .insert(collection.to_string(), period_ms);
63    }
64
65    /// Generate retention commands for all collections that need trimming.
66    ///
67    /// `now_ms` is the current timestamp. Returns commands to broadcast
68    /// to all shards.
69    pub fn generate_commands(&mut self, now_ms: i64) -> Vec<RetentionCommand> {
70        let mut commands = Vec::new();
71
72        for (collection, &period_ms) in &self.retention_periods {
73            if period_ms == 0 {
74                continue;
75            }
76            let drop_before = now_ms - period_ms as i64;
77            let command_id = self.next_command_id;
78            self.next_command_id += 1;
79
80            commands.push(RetentionCommand {
81                collection: collection.clone(),
82                drop_before_ts: drop_before,
83                command_id,
84            });
85            self.last_command_ids.insert(collection.clone(), command_id);
86        }
87
88        commands
89    }
90
91    /// Verify that all shards successfully applied a retention command.
92    pub fn verify_results(
93        command: &RetentionCommand,
94        results: &[ShardRetentionResult],
95    ) -> RetentionVerification {
96        let total_shards = results.len();
97        let successful = results.iter().filter(|r| r.success).count();
98        let total_dropped: usize = results.iter().map(|r| r.partitions_dropped).sum();
99        let total_reclaimed: u64 = results.iter().map(|r| r.bytes_reclaimed).sum();
100        let failures: Vec<String> = results
101            .iter()
102            .filter(|r| !r.success)
103            .filter_map(|r| {
104                r.error
105                    .as_ref()
106                    .map(|e| format!("shard {}: {e}", r.shard_id))
107            })
108            .collect();
109
110        RetentionVerification {
111            collection: command.collection.clone(),
112            command_id: command.command_id,
113            total_shards,
114            successful_shards: successful,
115            total_partitions_dropped: total_dropped,
116            total_bytes_reclaimed: total_reclaimed,
117            failures,
118        }
119    }
120}
121
122impl Default for CoordinatedRetention {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128/// Result of verifying a coordinated retention across all shards.
129#[derive(Debug)]
130pub struct RetentionVerification {
131    pub collection: String,
132    pub command_id: u64,
133    pub total_shards: usize,
134    pub successful_shards: usize,
135    pub total_partitions_dropped: usize,
136    pub total_bytes_reclaimed: u64,
137    pub failures: Vec<String>,
138}
139
140impl RetentionVerification {
141    pub fn all_succeeded(&self) -> bool {
142        self.failures.is_empty() && self.successful_shards == self.total_shards
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149
150    #[test]
151    fn generate_commands() {
152        let mut cr = CoordinatedRetention::new();
153        cr.set_retention("metrics", 7 * 86_400_000); // 7 days
154        cr.set_retention("logs", 3 * 86_400_000); // 3 days
155
156        let now = 10 * 86_400_000i64; // day 10
157        let commands = cr.generate_commands(now);
158        assert_eq!(commands.len(), 2);
159
160        let metrics_cmd = commands.iter().find(|c| c.collection == "metrics").unwrap();
161        assert_eq!(metrics_cmd.drop_before_ts, 3 * 86_400_000); // day 3
162
163        let logs_cmd = commands.iter().find(|c| c.collection == "logs").unwrap();
164        assert_eq!(logs_cmd.drop_before_ts, 7 * 86_400_000); // day 7
165    }
166
167    #[test]
168    fn verify_all_success() {
169        let cmd = RetentionCommand {
170            collection: "m".into(),
171            drop_before_ts: 1000,
172            command_id: 1,
173        };
174        let results = vec![
175            ShardRetentionResult {
176                shard_id: 0,
177                partitions_dropped: 5,
178                bytes_reclaimed: 1000,
179                success: true,
180                error: None,
181            },
182            ShardRetentionResult {
183                shard_id: 1,
184                partitions_dropped: 3,
185                bytes_reclaimed: 800,
186                success: true,
187                error: None,
188            },
189        ];
190        let v = CoordinatedRetention::verify_results(&cmd, &results);
191        assert!(v.all_succeeded());
192        assert_eq!(v.total_partitions_dropped, 8);
193        assert_eq!(v.total_bytes_reclaimed, 1800);
194    }
195
196    #[test]
197    fn verify_partial_failure() {
198        let cmd = RetentionCommand {
199            collection: "m".into(),
200            drop_before_ts: 1000,
201            command_id: 1,
202        };
203        let results = vec![
204            ShardRetentionResult {
205                shard_id: 0,
206                partitions_dropped: 5,
207                bytes_reclaimed: 1000,
208                success: true,
209                error: None,
210            },
211            ShardRetentionResult {
212                shard_id: 1,
213                partitions_dropped: 0,
214                bytes_reclaimed: 0,
215                success: false,
216                error: Some("disk full".into()),
217            },
218        ];
219        let v = CoordinatedRetention::verify_results(&cmd, &results);
220        assert!(!v.all_succeeded());
221        assert_eq!(v.failures.len(), 1);
222    }
223
224    #[test]
225    fn infinite_retention_skipped() {
226        let mut cr = CoordinatedRetention::new();
227        cr.set_retention("metrics", 0); // infinite
228        let commands = cr.generate_commands(1_000_000);
229        assert!(commands.is_empty());
230    }
231}