Skip to main content

meerkat_runtime/
input_ledger.rs

//! InputLedger — in-memory ledger of InputState entries.
//!
//! States live in an `IndexMap<InputId, InputState>`; a second map from
//! idempotency key to `InputId` provides dedup of repeated submissions.
4
5use chrono::{Duration, Utc};
6use indexmap::IndexMap;
7use meerkat_core::lifecycle::InputId;
8
9use crate::identifiers::IdempotencyKey;
10use crate::input::InputDurability;
11use crate::input_state::InputState;
12
/// Upper bound on how many idempotency keys belonging to terminal inputs are
/// kept in the dedup index; pruning evicts the earliest-inserted ones beyond it.
const TERMINAL_DEDUP_RETENTION_LIMIT: usize = 4_096;
/// Age, in hours since a terminal input's last update, after which pruning
/// drops its idempotency key from the dedup index.
const TERMINAL_DEDUP_RETENTION_TTL_HOURS: i64 = 24;
15
16/// In-memory ledger tracking InputState for all inputs.
17#[derive(Debug, Default, Clone)]
18pub struct InputLedger {
19    /// InputId → InputState (insertion order preserved).
20    states: IndexMap<InputId, InputState>,
21    /// IdempotencyKey → InputId for dedup lookup.
22    idempotency_index: IndexMap<IdempotencyKey, InputId>,
23}
24
25impl InputLedger {
26    /// Create a new empty ledger.
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Accept a new InputState into the ledger.
32    pub fn accept(&mut self, state: InputState) {
33        self.states.insert(state.input_id.clone(), state);
34    }
35
36    /// Accept with an idempotency key for dedup.
37    /// Returns `Some(existing_id)` if the key already exists (dedup hit).
38    pub fn accept_with_idempotency(
39        &mut self,
40        state: InputState,
41        key: IdempotencyKey,
42    ) -> Option<InputId> {
43        self.prune_terminal_dedup(Utc::now());
44        if let Some(existing_id) = self.idempotency_index.get(&key) {
45            return Some(existing_id.clone());
46        }
47        let input_id = state.input_id.clone();
48        self.idempotency_index.insert(key, input_id);
49        self.states.insert(state.input_id.clone(), state);
50        None
51    }
52
53    /// Recover a durable InputState from persistent storage.
54    ///
55    /// Unlike `accept()`, this also rebuilds the idempotency index
56    /// and filters out Ephemeral inputs (which should not survive restart).
57    /// Returns `true` if the state was inserted, `false` if filtered.
58    pub fn recover(&mut self, state: InputState) -> bool {
59        // Ephemeral inputs should not survive restarts
60        if state.durability == Some(InputDurability::Ephemeral) {
61            return false;
62        }
63
64        // Rebuild idempotency index so dedup works after restart
65        if let Some(ref key) = state.idempotency_key {
66            self.idempotency_index
67                .insert(key.clone(), state.input_id.clone());
68        }
69
70        self.states.insert(state.input_id.clone(), state);
71        true
72    }
73
74    /// Get the state of a specific input.
75    pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
76        self.states.get(input_id)
77    }
78
79    /// Look up the canonical input ID for an idempotency key.
80    pub fn input_id_for_idempotency_key(&self, key: &IdempotencyKey) -> Option<InputId> {
81        self.idempotency_index.get(key).cloned()
82    }
83
84    /// Remove an input from the ledger and dedup index.
85    pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
86        let removed = self.states.shift_remove(input_id)?;
87        if let Some(key) = &removed.idempotency_key {
88            self.idempotency_index.shift_remove(key);
89        }
90        Some(removed)
91    }
92
93    /// Get mutable reference to the state of a specific input.
94    pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
95        self.states.get_mut(input_id)
96    }
97
98    /// Iterate over all non-terminal input states.
99    pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
100        self.states.iter().filter(|(_, state)| !state.is_terminal())
101    }
102
103    /// Iterate over all input states. "Active" (non-terminal) filtering must
104    /// happen at the driver level, which has DSL access; the ledger by itself
105    /// carries only shell metadata.
106    pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
107        self.states.iter()
108    }
109
110    /// Number of entries in the ledger.
111    pub fn len(&self) -> usize {
112        self.states.len()
113    }
114
115    /// Check if the ledger is empty.
116    pub fn is_empty(&self) -> bool {
117        self.states.is_empty()
118    }
119
120    /// Number of non-terminal entries.
121    pub fn active_count(&self) -> usize {
122        self.states.values().filter(|s| !s.is_terminal()).count()
123    }
124
125    /// Get all active (non-terminal) input IDs.
126    pub fn active_input_ids(&self) -> Vec<InputId> {
127        self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
128    }
129
130    fn prune_terminal_dedup(&mut self, now: chrono::DateTime<Utc>) {
131        self.prune_terminal_dedup_with_limit(now, TERMINAL_DEDUP_RETENTION_LIMIT);
132    }
133
134    fn prune_terminal_dedup_with_limit(
135        &mut self,
136        now: chrono::DateTime<Utc>,
137        retention_limit: usize,
138    ) {
139        let cutoff = now - Duration::hours(TERMINAL_DEDUP_RETENTION_TTL_HOURS);
140        let expired_keys = self
141            .idempotency_index
142            .iter()
143            .filter_map(|(key, input_id)| {
144                let state = self.states.get(input_id)?;
145                (state.is_terminal() && state.updated_at() < cutoff).then(|| key.clone())
146            })
147            .collect::<Vec<_>>();
148        for key in expired_keys {
149            self.idempotency_index.shift_remove(&key);
150        }
151
152        let terminal_keys = self
153            .idempotency_index
154            .iter()
155            .filter(|(_, input_id)| {
156                self.states
157                    .get(*input_id)
158                    .is_some_and(InputState::is_terminal)
159            })
160            .map(|(key, _)| key.clone())
161            .collect::<Vec<_>>();
162        let overflow = terminal_keys.len().saturating_sub(retention_limit);
163        for key in terminal_keys.into_iter().take(overflow) {
164            self.idempotency_index.shift_remove(&key);
165        }
166    }
167}
168
#[cfg(test)]
// Tests unwrap freely: a panic is the desired failure mode here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input_state::InputTerminalOutcome;

    /// A plain `accept` makes the state retrievable by its id.
    #[test]
    fn accept_and_retrieve() {
        let mut ledger = InputLedger::new();
        let id = InputId::new();
        ledger.accept(InputState::new_accepted(id.clone()));

        assert_eq!(ledger.len(), 1);
        assert!(!ledger.is_empty());
        let retrieved = ledger.get(&id).unwrap();
        assert_eq!(retrieved.input_id, id);
    }

    /// A second submission under the same key is rejected with the first id.
    #[test]
    fn dedup_by_idempotency_key() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-123");

        let id1 = InputId::new();
        let first = ledger.accept_with_idempotency(InputState::new_accepted(id1.clone()), key.clone());
        assert!(first.is_none()); // First time — accepted

        let second = ledger.accept_with_idempotency(InputState::new_accepted(InputId::new()), key);
        assert!(second.is_some()); // Duplicate — returns existing ID
        assert_eq!(second.unwrap(), id1);
        assert_eq!(ledger.len(), 1); // Only one entry
    }

    /// Terminal entries are skipped by `iter_non_terminal`.
    #[test]
    fn iter_non_terminal() {
        let mut ledger = InputLedger::new();

        let id1 = InputId::new();
        ledger.accept(InputState::new_accepted(id1.clone()));

        let mut state2 = InputState::new_accepted(InputId::new());
        state2.terminal_outcome = Some(InputTerminalOutcome::Consumed);
        ledger.accept(state2);

        let active: Vec<_> = ledger.iter_non_terminal().collect();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].0, &id1);
    }

    /// `active_count` ignores terminal entries while `len` counts them.
    #[test]
    fn active_count() {
        let mut ledger = InputLedger::new();

        ledger.accept(InputState::new_accepted(InputId::new()));
        ledger.accept(InputState::new_accepted(InputId::new()));

        let mut state3 = InputState::new_accepted(InputId::new());
        state3.terminal_outcome = Some(InputTerminalOutcome::Superseded {
            superseded_by: InputId::new(),
        });
        ledger.accept(state3);

        assert_eq!(ledger.len(), 3);
        assert_eq!(ledger.active_count(), 2);
    }

    /// Every non-terminal id is reported by `active_input_ids`.
    #[test]
    fn active_input_ids() {
        let mut ledger = InputLedger::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        ledger.accept(InputState::new_accepted(id1.clone()));
        ledger.accept(InputState::new_accepted(id2.clone()));

        let ids = ledger.active_input_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
    }

    /// Recovery re-registers the state's idempotency key for dedup.
    #[test]
    fn recover_rebuilds_idempotency_index() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-123");

        // Simulate recovery: inject state with an idempotency key
        let id1 = InputId::new();
        let mut state = InputState::new_accepted(id1.clone());
        state.idempotency_key = Some(key.clone());
        state.durability = Some(InputDurability::Durable);
        assert!(ledger.recover(state));

        // Now try to accept a new input with the same key → should be dedup'd
        let state2 = InputState::new_accepted(InputId::new());
        let result = ledger.accept_with_idempotency(state2, key);
        assert_eq!(result, Some(id1), "Dedup should find the recovered input");
        assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
    }

    /// Once the cap is exceeded, the earliest-inserted terminal key goes first.
    #[test]
    fn terminal_dedup_retention_evicts_oldest_key_after_limit() {
        let mut ledger = InputLedger::new();
        let oldest_key = IdempotencyKey::new("oldest-terminal-key");
        let retained_keys = [
            IdempotencyKey::new("terminal-key-1"),
            IdempotencyKey::new("terminal-key-2"),
            IdempotencyKey::new("terminal-key-3"),
        ];

        for key in std::iter::once(oldest_key.clone()).chain(retained_keys.iter().cloned()) {
            let mut state = InputState::new_accepted(InputId::new());
            // InputLifecycle was absorbed into MeerkatMachine DSL; tests
            // construct terminal state directly instead of simulating the
            // authority apply that used to live in a standalone machine.
            state.terminal_outcome = Some(InputTerminalOutcome::Consumed);
            let input_id = state.input_id.clone();
            ledger.states.insert(input_id.clone(), state);
            ledger.idempotency_index.insert(key, input_id);
        }

        ledger.prune_terminal_dedup_with_limit(Utc::now(), retained_keys.len());
        assert!(
            ledger.input_id_for_idempotency_key(&oldest_key).is_none(),
            "the oldest terminal dedup key should be evicted once the retention cap is exceeded"
        );
        for key in retained_keys {
            assert!(
                ledger.input_id_for_idempotency_key(&key).is_some(),
                "newer terminal dedup key should be retained"
            );
        }
    }

    /// Ephemeral states are rejected by `recover`; durable ones are kept.
    #[test]
    fn recover_filters_ephemeral() {
        let mut ledger = InputLedger::new();

        // Ephemeral input should be filtered out during recovery
        let mut state = InputState::new_accepted(InputId::new());
        state.durability = Some(InputDurability::Ephemeral);
        assert!(
            !ledger.recover(state),
            "Ephemeral inputs should be filtered"
        );
        assert!(ledger.is_empty());

        // Durable input should be kept
        let mut state = InputState::new_accepted(InputId::new());
        state.durability = Some(InputDurability::Durable);
        assert!(ledger.recover(state), "Durable inputs should be kept");
        assert_eq!(ledger.len(), 1);
    }

    /// Removing an input drops both its state and the key it registered.
    #[test]
    fn remove_drops_state_and_idempotency_key() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-remove");

        let id = InputId::new();
        let mut state = InputState::new_accepted(id.clone());
        state.idempotency_key = Some(key.clone());
        assert!(ledger.accept_with_idempotency(state, key.clone()).is_none());

        let removed = ledger.remove(&id);
        assert!(removed.is_some(), "known input should be returned");
        assert!(ledger.is_empty());
        assert!(
            ledger.input_id_for_idempotency_key(&key).is_none(),
            "the removed input's dedup key should be released"
        );
    }

    /// Removing an id that was never accepted is a no-op.
    #[test]
    fn remove_unknown_id_returns_none() {
        let mut ledger = InputLedger::new();
        ledger.accept(InputState::new_accepted(InputId::new()));

        assert!(ledger.remove(&InputId::new()).is_none());
        assert_eq!(ledger.len(), 1);
    }

    /// Mutations made through `get_mut` are visible to the active filters.
    #[test]
    fn get_mut_updates_are_observed() {
        let mut ledger = InputLedger::new();
        let id = InputId::new();
        ledger.accept(InputState::new_accepted(id.clone()));
        assert_eq!(ledger.active_count(), 1);

        ledger.get_mut(&id).unwrap().terminal_outcome = Some(InputTerminalOutcome::Consumed);
        assert_eq!(ledger.active_count(), 0);
        assert_eq!(ledger.len(), 1);
    }
}