meerkat_runtime/
input_ledger.rs1use indexmap::IndexMap;
6use meerkat_core::lifecycle::InputId;
7
8use crate::identifiers::IdempotencyKey;
9use crate::input::InputDurability;
10use crate::input_state::InputState;
11
12#[derive(Debug, Default, Clone)]
14pub struct InputLedger {
15 states: IndexMap<InputId, InputState>,
17 idempotency_index: IndexMap<IdempotencyKey, InputId>,
19}
20
21impl InputLedger {
22 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn accept(&mut self, state: InputState) {
29 self.states.insert(state.input_id.clone(), state);
30 }
31
32 pub fn accept_with_idempotency(
35 &mut self,
36 state: InputState,
37 key: IdempotencyKey,
38 ) -> Option<InputId> {
39 if let Some(existing_id) = self.idempotency_index.get(&key) {
40 return Some(existing_id.clone());
41 }
42 let input_id = state.input_id.clone();
43 self.idempotency_index.insert(key, input_id);
44 self.states.insert(state.input_id.clone(), state);
45 None
46 }
47
48 pub fn recover(&mut self, state: InputState) -> bool {
54 if state.durability == Some(InputDurability::Ephemeral) {
56 return false;
57 }
58
59 if let Some(ref key) = state.idempotency_key {
61 self.idempotency_index
62 .insert(key.clone(), state.input_id.clone());
63 }
64
65 self.states.insert(state.input_id.clone(), state);
66 true
67 }
68
69 pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
71 self.states.get(input_id)
72 }
73
74 pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
76 let removed = self.states.shift_remove(input_id)?;
77 if let Some(key) = &removed.idempotency_key {
78 self.idempotency_index.shift_remove(key);
79 }
80 Some(removed)
81 }
82
83 pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
85 self.states.get_mut(input_id)
86 }
87
88 pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
90 self.states.iter().filter(|(_, s)| !s.is_terminal())
91 }
92
93 pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
95 self.states.iter()
96 }
97
98 pub fn len(&self) -> usize {
100 self.states.len()
101 }
102
103 pub fn is_empty(&self) -> bool {
105 self.states.is_empty()
106 }
107
108 pub fn active_count(&self) -> usize {
110 self.states.values().filter(|s| !s.is_terminal()).count()
111 }
112
113 pub fn active_input_ids(&self) -> Vec<InputId> {
115 self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
116 }
117}
118
119#[cfg(test)]
120#[allow(clippy::unwrap_used)]
121mod tests {
122 use super::*;
123 use crate::input_lifecycle_authority::InputLifecycleInput;
124
125 #[test]
126 fn accept_and_retrieve() {
127 let mut ledger = InputLedger::new();
128 let id = InputId::new();
129 let state = InputState::new_accepted(id.clone());
130 ledger.accept(state);
131
132 assert_eq!(ledger.len(), 1);
133 assert!(!ledger.is_empty());
134 let retrieved = ledger.get(&id).unwrap();
135 assert_eq!(retrieved.input_id, id);
136 }
137
138 #[test]
139 fn dedup_by_idempotency_key() {
140 let mut ledger = InputLedger::new();
141 let key = IdempotencyKey::new("req-123");
142
143 let id1 = InputId::new();
144 let state1 = InputState::new_accepted(id1.clone());
145 let result = ledger.accept_with_idempotency(state1, key.clone());
146 assert!(result.is_none()); let id2 = InputId::new();
149 let state2 = InputState::new_accepted(id2);
150 let result = ledger.accept_with_idempotency(state2, key);
151 assert!(result.is_some()); assert_eq!(result.unwrap(), id1);
153 assert_eq!(ledger.len(), 1); }
155
156 #[test]
157 fn iter_non_terminal() {
158 let mut ledger = InputLedger::new();
159
160 let id1 = InputId::new();
161 let state1 = InputState::new_accepted(id1.clone());
162 ledger.accept(state1);
163
164 let id2 = InputId::new();
165 let mut state2 = InputState::new_accepted(id2);
166 state2.apply(InputLifecycleInput::ConsumeOnAccept).unwrap();
167 ledger.accept(state2);
168
169 let active: Vec<_> = ledger.iter_non_terminal().collect();
170 assert_eq!(active.len(), 1);
171 assert_eq!(active[0].0, &id1);
172 }
173
174 #[test]
175 fn active_count() {
176 let mut ledger = InputLedger::new();
177
178 ledger.accept(InputState::new_accepted(InputId::new()));
179 ledger.accept(InputState::new_accepted(InputId::new()));
180
181 let id3 = InputId::new();
182 let mut state3 = InputState::new_accepted(id3);
183 state3.apply(InputLifecycleInput::QueueAccepted).unwrap();
184 state3.apply(InputLifecycleInput::Supersede).unwrap();
185 ledger.accept(state3);
186
187 assert_eq!(ledger.len(), 3);
188 assert_eq!(ledger.active_count(), 2);
189 }
190
191 #[test]
192 fn active_input_ids() {
193 let mut ledger = InputLedger::new();
194 let id1 = InputId::new();
195 let id2 = InputId::new();
196 ledger.accept(InputState::new_accepted(id1.clone()));
197 ledger.accept(InputState::new_accepted(id2.clone()));
198
199 let ids = ledger.active_input_ids();
200 assert_eq!(ids.len(), 2);
201 assert!(ids.contains(&id1));
202 assert!(ids.contains(&id2));
203 }
204
205 #[test]
206 fn recover_rebuilds_idempotency_index() {
207 let mut ledger = InputLedger::new();
208 let key = IdempotencyKey::new("req-123");
209
210 let id1 = InputId::new();
212 let mut state = InputState::new_accepted(id1.clone());
213 state.idempotency_key = Some(key.clone());
214 state.durability = Some(InputDurability::Durable);
215 assert!(ledger.recover(state));
216
217 let id2 = InputId::new();
219 let state2 = InputState::new_accepted(id2);
220 let result = ledger.accept_with_idempotency(state2, key);
221 assert_eq!(result, Some(id1), "Dedup should find the recovered input");
222 assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
223 }
224
225 #[test]
226 fn recover_filters_ephemeral() {
227 let mut ledger = InputLedger::new();
228
229 let mut state = InputState::new_accepted(InputId::new());
231 state.durability = Some(InputDurability::Ephemeral);
232 assert!(
233 !ledger.recover(state),
234 "Ephemeral inputs should be filtered"
235 );
236 assert!(ledger.is_empty());
237
238 let mut state = InputState::new_accepted(InputId::new());
240 state.durability = Some(InputDurability::Durable);
241 assert!(ledger.recover(state), "Durable inputs should be kept");
242 assert_eq!(ledger.len(), 1);
243 }
244}