Skip to main content

meerkat_runtime/
input_ledger.rs

1//! InputLedger — in-memory ledger of InputState entries.
2//!
3//! IndexMap<InputId, InputState> with dedup by idempotency_key.
4
5use indexmap::IndexMap;
6use meerkat_core::lifecycle::InputId;
7
8use crate::identifiers::IdempotencyKey;
9use crate::input::InputDurability;
10use crate::input_state::InputState;
11
12/// In-memory ledger tracking InputState for all inputs.
13#[derive(Debug, Default, Clone)]
14pub struct InputLedger {
15    /// InputId → InputState (insertion order preserved).
16    states: IndexMap<InputId, InputState>,
17    /// IdempotencyKey → InputId for dedup lookup.
18    idempotency_index: IndexMap<IdempotencyKey, InputId>,
19}
20
21impl InputLedger {
22    /// Create a new empty ledger.
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    /// Accept a new InputState into the ledger.
28    pub fn accept(&mut self, state: InputState) {
29        self.states.insert(state.input_id.clone(), state);
30    }
31
32    /// Accept with an idempotency key for dedup.
33    /// Returns `Some(existing_id)` if the key already exists (dedup hit).
34    pub fn accept_with_idempotency(
35        &mut self,
36        state: InputState,
37        key: IdempotencyKey,
38    ) -> Option<InputId> {
39        if let Some(existing_id) = self.idempotency_index.get(&key) {
40            return Some(existing_id.clone());
41        }
42        let input_id = state.input_id.clone();
43        self.idempotency_index.insert(key, input_id);
44        self.states.insert(state.input_id.clone(), state);
45        None
46    }
47
48    /// Recover a durable InputState from persistent storage.
49    ///
50    /// Unlike `accept()`, this also rebuilds the idempotency index
51    /// and filters out Ephemeral inputs (which should not survive restart).
52    /// Returns `true` if the state was inserted, `false` if filtered.
53    pub fn recover(&mut self, state: InputState) -> bool {
54        // Ephemeral inputs should not survive restarts
55        if state.durability == Some(InputDurability::Ephemeral) {
56            return false;
57        }
58
59        // Rebuild idempotency index so dedup works after restart
60        if let Some(ref key) = state.idempotency_key {
61            self.idempotency_index
62                .insert(key.clone(), state.input_id.clone());
63        }
64
65        self.states.insert(state.input_id.clone(), state);
66        true
67    }
68
69    /// Get the state of a specific input.
70    pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
71        self.states.get(input_id)
72    }
73
74    /// Remove an input from the ledger and dedup index.
75    pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
76        let removed = self.states.shift_remove(input_id)?;
77        if let Some(key) = &removed.idempotency_key {
78            self.idempotency_index.shift_remove(key);
79        }
80        Some(removed)
81    }
82
83    /// Get mutable reference to the state of a specific input.
84    pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
85        self.states.get_mut(input_id)
86    }
87
88    /// Iterate over all non-terminal input states.
89    pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
90        self.states.iter().filter(|(_, s)| !s.is_terminal())
91    }
92
93    /// Iterate over all input states.
94    pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
95        self.states.iter()
96    }
97
98    /// Number of entries in the ledger.
99    pub fn len(&self) -> usize {
100        self.states.len()
101    }
102
103    /// Check if the ledger is empty.
104    pub fn is_empty(&self) -> bool {
105        self.states.is_empty()
106    }
107
108    /// Number of non-terminal entries.
109    pub fn active_count(&self) -> usize {
110        self.states.values().filter(|s| !s.is_terminal()).count()
111    }
112
113    /// Get all active (non-terminal) input IDs.
114    pub fn active_input_ids(&self) -> Vec<InputId> {
115        self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
116    }
117}
118
/// Unit tests for `InputLedger`: acceptance, dedup, recovery, removal and the
/// active (non-terminal) views.
#[cfg(test)]
// Tests unwrap freely: a panic is the desired failure mode here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input_machine::InputStateMachine;
    use crate::input_state::InputLifecycleState;

    #[test]
    fn accept_and_retrieve() {
        let mut ledger = InputLedger::new();
        let id = InputId::new();
        let state = InputState::new_accepted(id.clone());
        ledger.accept(state);

        assert_eq!(ledger.len(), 1);
        assert!(!ledger.is_empty());
        let retrieved = ledger.get(&id).unwrap();
        assert_eq!(retrieved.input_id, id);
    }

    #[test]
    fn dedup_by_idempotency_key() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-123");

        let id1 = InputId::new();
        let state1 = InputState::new_accepted(id1.clone());
        let result = ledger.accept_with_idempotency(state1, key.clone());
        assert!(result.is_none()); // First time — accepted

        let id2 = InputId::new();
        let state2 = InputState::new_accepted(id2);
        let result = ledger.accept_with_idempotency(state2, key);
        assert!(result.is_some()); // Duplicate — returns existing ID
        assert_eq!(result.unwrap(), id1);
        assert_eq!(ledger.len(), 1); // Only one entry
    }

    #[test]
    fn iter_non_terminal() {
        let mut ledger = InputLedger::new();

        let id1 = InputId::new();
        let state1 = InputState::new_accepted(id1.clone());
        ledger.accept(state1);

        // Second input is driven to a terminal state before being stored.
        let id2 = InputId::new();
        let mut state2 = InputState::new_accepted(id2);
        InputStateMachine::transition(&mut state2, InputLifecycleState::Consumed, None).unwrap();
        ledger.accept(state2);

        let active: Vec<_> = ledger.iter_non_terminal().collect();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].0, &id1);
    }

    #[test]
    fn active_count() {
        let mut ledger = InputLedger::new();

        ledger.accept(InputState::new_accepted(InputId::new()));
        ledger.accept(InputState::new_accepted(InputId::new()));

        let id3 = InputId::new();
        let mut state3 = InputState::new_accepted(id3);
        InputStateMachine::transition(&mut state3, InputLifecycleState::Superseded, None).unwrap();
        ledger.accept(state3);

        assert_eq!(ledger.len(), 3);
        assert_eq!(ledger.active_count(), 2);
    }

    #[test]
    fn active_input_ids() {
        let mut ledger = InputLedger::new();
        let id1 = InputId::new();
        let id2 = InputId::new();
        ledger.accept(InputState::new_accepted(id1.clone()));
        ledger.accept(InputState::new_accepted(id2.clone()));

        let ids = ledger.active_input_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
    }

    #[test]
    fn get_mut_allows_in_place_transition() {
        let mut ledger = InputLedger::new();
        let id = InputId::new();
        ledger.accept(InputState::new_accepted(id.clone()));
        assert_eq!(ledger.active_count(), 1);

        // Mutating through the ledger must be visible to the active views.
        let state = ledger.get_mut(&id).unwrap();
        InputStateMachine::transition(state, InputLifecycleState::Consumed, None).unwrap();

        assert_eq!(ledger.len(), 1);
        assert_eq!(ledger.active_count(), 0);
        assert!(ledger.active_input_ids().is_empty());
        assert!(ledger.get_mut(&InputId::new()).is_none());
    }

    #[test]
    fn remove_clears_entry_and_dedup_index() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-123");

        let id1 = InputId::new();
        let mut state = InputState::new_accepted(id1.clone());
        state.idempotency_key = Some(key.clone());
        assert!(ledger.accept_with_idempotency(state, key.clone()).is_none());

        let removed = ledger.remove(&id1).unwrap();
        assert_eq!(removed.input_id, id1);
        assert!(ledger.is_empty());
        assert!(ledger.get(&id1).is_none());
        assert!(
            ledger.remove(&id1).is_none(),
            "Removing an unknown input should be a no-op"
        );

        // The key must be reusable once its owner is gone.
        let id2 = InputId::new();
        let result = ledger.accept_with_idempotency(InputState::new_accepted(id2.clone()), key);
        assert!(result.is_none(), "Key should be free after removal");
        assert!(ledger.get(&id2).is_some());
        assert_eq!(ledger.len(), 1);
    }

    #[test]
    fn recover_rebuilds_idempotency_index() {
        let mut ledger = InputLedger::new();
        let key = IdempotencyKey::new("req-123");

        // Simulate recovery: inject state with an idempotency key
        let id1 = InputId::new();
        let mut state = InputState::new_accepted(id1.clone());
        state.idempotency_key = Some(key.clone());
        state.durability = Some(InputDurability::Durable);
        assert!(ledger.recover(state));

        // Now try to accept a new input with the same key → should be dedup'd
        let id2 = InputId::new();
        let state2 = InputState::new_accepted(id2);
        let result = ledger.accept_with_idempotency(state2, key);
        assert_eq!(result, Some(id1), "Dedup should find the recovered input");
        assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
    }

    #[test]
    fn recover_filters_ephemeral() {
        let mut ledger = InputLedger::new();

        // Ephemeral input should be filtered out during recovery
        let mut state = InputState::new_accepted(InputId::new());
        state.durability = Some(InputDurability::Ephemeral);
        assert!(
            !ledger.recover(state),
            "Ephemeral inputs should be filtered"
        );
        assert!(ledger.is_empty());

        // Durable input should be kept
        let mut state = InputState::new_accepted(InputId::new());
        state.durability = Some(InputDurability::Durable);
        assert!(ledger.recover(state), "Durable inputs should be kept");
        assert_eq!(ledger.len(), 1);
    }
}