meerkat_runtime/
input_ledger.rs1use indexmap::IndexMap;
6use meerkat_core::lifecycle::InputId;
7
8use crate::identifiers::IdempotencyKey;
9use crate::input::InputDurability;
10use crate::input_state::InputState;
11
12#[derive(Debug, Default, Clone)]
14pub struct InputLedger {
15 states: IndexMap<InputId, InputState>,
17 idempotency_index: IndexMap<IdempotencyKey, InputId>,
19}
20
21impl InputLedger {
22 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn accept(&mut self, state: InputState) {
29 self.states.insert(state.input_id.clone(), state);
30 }
31
32 pub fn accept_with_idempotency(
35 &mut self,
36 state: InputState,
37 key: IdempotencyKey,
38 ) -> Option<InputId> {
39 if let Some(existing_id) = self.idempotency_index.get(&key) {
40 return Some(existing_id.clone());
41 }
42 let input_id = state.input_id.clone();
43 self.idempotency_index.insert(key, input_id);
44 self.states.insert(state.input_id.clone(), state);
45 None
46 }
47
48 pub fn recover(&mut self, state: InputState) -> bool {
54 if state.durability == Some(InputDurability::Ephemeral) {
56 return false;
57 }
58
59 if let Some(ref key) = state.idempotency_key {
61 self.idempotency_index
62 .insert(key.clone(), state.input_id.clone());
63 }
64
65 self.states.insert(state.input_id.clone(), state);
66 true
67 }
68
69 pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
71 self.states.get(input_id)
72 }
73
74 pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
76 let removed = self.states.shift_remove(input_id)?;
77 if let Some(key) = &removed.idempotency_key {
78 self.idempotency_index.shift_remove(key);
79 }
80 Some(removed)
81 }
82
83 pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
85 self.states.get_mut(input_id)
86 }
87
88 pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
90 self.states.iter().filter(|(_, s)| !s.is_terminal())
91 }
92
93 pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
95 self.states.iter()
96 }
97
98 pub fn len(&self) -> usize {
100 self.states.len()
101 }
102
103 pub fn is_empty(&self) -> bool {
105 self.states.is_empty()
106 }
107
108 pub fn active_count(&self) -> usize {
110 self.states.values().filter(|s| !s.is_terminal()).count()
111 }
112
113 pub fn active_input_ids(&self) -> Vec<InputId> {
115 self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
116 }
117}
118
119#[cfg(test)]
120#[allow(clippy::unwrap_used)]
121mod tests {
122 use super::*;
123 use crate::input_machine::InputStateMachine;
124 use crate::input_state::InputLifecycleState;
125
126 #[test]
127 fn accept_and_retrieve() {
128 let mut ledger = InputLedger::new();
129 let id = InputId::new();
130 let state = InputState::new_accepted(id.clone());
131 ledger.accept(state);
132
133 assert_eq!(ledger.len(), 1);
134 assert!(!ledger.is_empty());
135 let retrieved = ledger.get(&id).unwrap();
136 assert_eq!(retrieved.input_id, id);
137 }
138
139 #[test]
140 fn dedup_by_idempotency_key() {
141 let mut ledger = InputLedger::new();
142 let key = IdempotencyKey::new("req-123");
143
144 let id1 = InputId::new();
145 let state1 = InputState::new_accepted(id1.clone());
146 let result = ledger.accept_with_idempotency(state1, key.clone());
147 assert!(result.is_none()); let id2 = InputId::new();
150 let state2 = InputState::new_accepted(id2);
151 let result = ledger.accept_with_idempotency(state2, key);
152 assert!(result.is_some()); assert_eq!(result.unwrap(), id1);
154 assert_eq!(ledger.len(), 1); }
156
157 #[test]
158 fn iter_non_terminal() {
159 let mut ledger = InputLedger::new();
160
161 let id1 = InputId::new();
162 let state1 = InputState::new_accepted(id1.clone());
163 ledger.accept(state1);
164
165 let id2 = InputId::new();
166 let mut state2 = InputState::new_accepted(id2);
167 InputStateMachine::transition(&mut state2, InputLifecycleState::Consumed, None).unwrap();
168 ledger.accept(state2);
169
170 let active: Vec<_> = ledger.iter_non_terminal().collect();
171 assert_eq!(active.len(), 1);
172 assert_eq!(active[0].0, &id1);
173 }
174
175 #[test]
176 fn active_count() {
177 let mut ledger = InputLedger::new();
178
179 ledger.accept(InputState::new_accepted(InputId::new()));
180 ledger.accept(InputState::new_accepted(InputId::new()));
181
182 let id3 = InputId::new();
183 let mut state3 = InputState::new_accepted(id3);
184 InputStateMachine::transition(&mut state3, InputLifecycleState::Superseded, None).unwrap();
185 ledger.accept(state3);
186
187 assert_eq!(ledger.len(), 3);
188 assert_eq!(ledger.active_count(), 2);
189 }
190
191 #[test]
192 fn active_input_ids() {
193 let mut ledger = InputLedger::new();
194 let id1 = InputId::new();
195 let id2 = InputId::new();
196 ledger.accept(InputState::new_accepted(id1.clone()));
197 ledger.accept(InputState::new_accepted(id2.clone()));
198
199 let ids = ledger.active_input_ids();
200 assert_eq!(ids.len(), 2);
201 assert!(ids.contains(&id1));
202 assert!(ids.contains(&id2));
203 }
204
205 #[test]
206 fn recover_rebuilds_idempotency_index() {
207 let mut ledger = InputLedger::new();
208 let key = IdempotencyKey::new("req-123");
209
210 let id1 = InputId::new();
212 let mut state = InputState::new_accepted(id1.clone());
213 state.idempotency_key = Some(key.clone());
214 state.durability = Some(InputDurability::Durable);
215 assert!(ledger.recover(state));
216
217 let id2 = InputId::new();
219 let state2 = InputState::new_accepted(id2);
220 let result = ledger.accept_with_idempotency(state2, key);
221 assert_eq!(result, Some(id1), "Dedup should find the recovered input");
222 assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
223 }
224
225 #[test]
226 fn recover_filters_ephemeral() {
227 let mut ledger = InputLedger::new();
228
229 let mut state = InputState::new_accepted(InputId::new());
231 state.durability = Some(InputDurability::Ephemeral);
232 assert!(
233 !ledger.recover(state),
234 "Ephemeral inputs should be filtered"
235 );
236 assert!(ledger.is_empty());
237
238 let mut state = InputState::new_accepted(InputId::new());
240 state.durability = Some(InputDurability::Durable);
241 assert!(ledger.recover(state), "Durable inputs should be kept");
242 assert_eq!(ledger.len(), 1);
243 }
244}