meerkat_runtime/
input_ledger.rs1use chrono::{Duration, Utc};
6use indexmap::IndexMap;
7use meerkat_core::lifecycle::InputId;
8
9use crate::identifiers::IdempotencyKey;
10use crate::input::InputDurability;
11use crate::input_state::InputState;
12
13const TERMINAL_DEDUP_RETENTION_LIMIT: usize = 4096;
14const TERMINAL_DEDUP_RETENTION_TTL_HOURS: i64 = 24;
15
16#[derive(Debug, Default, Clone)]
18pub struct InputLedger {
19 states: IndexMap<InputId, InputState>,
21 idempotency_index: IndexMap<IdempotencyKey, InputId>,
23}
24
25impl InputLedger {
26 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn accept(&mut self, state: InputState) {
33 self.states.insert(state.input_id.clone(), state);
34 }
35
36 pub fn accept_with_idempotency(
39 &mut self,
40 state: InputState,
41 key: IdempotencyKey,
42 ) -> Option<InputId> {
43 self.prune_terminal_dedup(Utc::now());
44 if let Some(existing_id) = self.idempotency_index.get(&key) {
45 return Some(existing_id.clone());
46 }
47 let input_id = state.input_id.clone();
48 self.idempotency_index.insert(key, input_id);
49 self.states.insert(state.input_id.clone(), state);
50 None
51 }
52
53 pub fn recover(&mut self, state: InputState) -> bool {
59 if state.durability == Some(InputDurability::Ephemeral) {
61 return false;
62 }
63
64 if let Some(ref key) = state.idempotency_key {
66 self.idempotency_index
67 .insert(key.clone(), state.input_id.clone());
68 }
69
70 self.states.insert(state.input_id.clone(), state);
71 true
72 }
73
74 pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
76 self.states.get(input_id)
77 }
78
79 pub fn input_id_for_idempotency_key(&self, key: &IdempotencyKey) -> Option<InputId> {
81 self.idempotency_index.get(key).cloned()
82 }
83
84 pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
86 let removed = self.states.shift_remove(input_id)?;
87 if let Some(key) = &removed.idempotency_key {
88 self.idempotency_index.shift_remove(key);
89 }
90 Some(removed)
91 }
92
93 pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
95 self.states.get_mut(input_id)
96 }
97
98 pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
100 self.states.iter().filter(|(_, state)| !state.is_terminal())
101 }
102
103 pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
107 self.states.iter()
108 }
109
110 pub fn len(&self) -> usize {
112 self.states.len()
113 }
114
115 pub fn is_empty(&self) -> bool {
117 self.states.is_empty()
118 }
119
120 pub fn active_count(&self) -> usize {
122 self.states.values().filter(|s| !s.is_terminal()).count()
123 }
124
125 pub fn active_input_ids(&self) -> Vec<InputId> {
127 self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
128 }
129
130 fn prune_terminal_dedup(&mut self, now: chrono::DateTime<Utc>) {
131 self.prune_terminal_dedup_with_limit(now, TERMINAL_DEDUP_RETENTION_LIMIT);
132 }
133
134 fn prune_terminal_dedup_with_limit(
135 &mut self,
136 now: chrono::DateTime<Utc>,
137 retention_limit: usize,
138 ) {
139 let cutoff = now - Duration::hours(TERMINAL_DEDUP_RETENTION_TTL_HOURS);
140 let expired_keys = self
141 .idempotency_index
142 .iter()
143 .filter_map(|(key, input_id)| {
144 let state = self.states.get(input_id)?;
145 (state.is_terminal() && state.updated_at() < cutoff).then(|| key.clone())
146 })
147 .collect::<Vec<_>>();
148 for key in expired_keys {
149 self.idempotency_index.shift_remove(&key);
150 }
151
152 let terminal_keys = self
153 .idempotency_index
154 .iter()
155 .filter(|(_, input_id)| {
156 self.states
157 .get(*input_id)
158 .is_some_and(InputState::is_terminal)
159 })
160 .map(|(key, _)| key.clone())
161 .collect::<Vec<_>>();
162 let overflow = terminal_keys.len().saturating_sub(retention_limit);
163 for key in terminal_keys.into_iter().take(overflow) {
164 self.idempotency_index.shift_remove(&key);
165 }
166 }
167}
168
169#[cfg(test)]
170#[allow(clippy::unwrap_used)]
171mod tests {
172 use super::*;
173
174 #[test]
175 fn accept_and_retrieve() {
176 let mut ledger = InputLedger::new();
177 let id = InputId::new();
178 let state = InputState::new_accepted(id.clone());
179 ledger.accept(state);
180
181 assert_eq!(ledger.len(), 1);
182 assert!(!ledger.is_empty());
183 let retrieved = ledger.get(&id).unwrap();
184 assert_eq!(retrieved.input_id, id);
185 }
186
187 #[test]
188 fn dedup_by_idempotency_key() {
189 let mut ledger = InputLedger::new();
190 let key = IdempotencyKey::new("req-123");
191
192 let id1 = InputId::new();
193 let state1 = InputState::new_accepted(id1.clone());
194 let result = ledger.accept_with_idempotency(state1, key.clone());
195 assert!(result.is_none()); let id2 = InputId::new();
198 let state2 = InputState::new_accepted(id2);
199 let result = ledger.accept_with_idempotency(state2, key);
200 assert!(result.is_some()); assert_eq!(result.unwrap(), id1);
202 assert_eq!(ledger.len(), 1); }
204
205 #[test]
206 fn iter_non_terminal() {
207 let mut ledger = InputLedger::new();
208
209 let id1 = InputId::new();
210 let state1 = InputState::new_accepted(id1.clone());
211 ledger.accept(state1);
212
213 let id2 = InputId::new();
214 let mut state2 = InputState::new_accepted(id2);
215 state2.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Consumed);
216 ledger.accept(state2);
217
218 let active: Vec<_> = ledger.iter_non_terminal().collect();
219 assert_eq!(active.len(), 1);
220 assert_eq!(active[0].0, &id1);
221 }
222
223 #[test]
224 fn active_count() {
225 let mut ledger = InputLedger::new();
226
227 ledger.accept(InputState::new_accepted(InputId::new()));
228 ledger.accept(InputState::new_accepted(InputId::new()));
229
230 let id3 = InputId::new();
231 let mut state3 = InputState::new_accepted(id3);
232 state3.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Superseded {
233 superseded_by: InputId::new(),
234 });
235 ledger.accept(state3);
236
237 assert_eq!(ledger.len(), 3);
238 assert_eq!(ledger.active_count(), 2);
239 }
240
241 #[test]
242 fn active_input_ids() {
243 let mut ledger = InputLedger::new();
244 let id1 = InputId::new();
245 let id2 = InputId::new();
246 ledger.accept(InputState::new_accepted(id1.clone()));
247 ledger.accept(InputState::new_accepted(id2.clone()));
248
249 let ids = ledger.active_input_ids();
250 assert_eq!(ids.len(), 2);
251 assert!(ids.contains(&id1));
252 assert!(ids.contains(&id2));
253 }
254
255 #[test]
256 fn recover_rebuilds_idempotency_index() {
257 let mut ledger = InputLedger::new();
258 let key = IdempotencyKey::new("req-123");
259
260 let id1 = InputId::new();
262 let mut state = InputState::new_accepted(id1.clone());
263 state.idempotency_key = Some(key.clone());
264 state.durability = Some(InputDurability::Durable);
265 assert!(ledger.recover(state));
266
267 let id2 = InputId::new();
269 let state2 = InputState::new_accepted(id2);
270 let result = ledger.accept_with_idempotency(state2, key);
271 assert_eq!(result, Some(id1), "Dedup should find the recovered input");
272 assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
273 }
274
275 #[test]
276 fn terminal_dedup_retention_evicts_oldest_key_after_limit() {
277 let mut ledger = InputLedger::new();
278 let oldest_key = IdempotencyKey::new("oldest-terminal-key");
279 let retained_keys = [
280 IdempotencyKey::new("terminal-key-1"),
281 IdempotencyKey::new("terminal-key-2"),
282 IdempotencyKey::new("terminal-key-3"),
283 ];
284
285 for key in std::iter::once(oldest_key.clone()).chain(retained_keys.iter().cloned()) {
286 let mut state = InputState::new_accepted(InputId::new());
287 state.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Consumed);
291 let input_id = state.input_id.clone();
292 ledger.states.insert(input_id.clone(), state);
293 ledger.idempotency_index.insert(key, input_id);
294 }
295
296 ledger.prune_terminal_dedup_with_limit(Utc::now(), retained_keys.len());
297 assert!(
298 ledger.input_id_for_idempotency_key(&oldest_key).is_none(),
299 "the oldest terminal dedup key should be evicted once the retention cap is exceeded"
300 );
301 for key in retained_keys {
302 assert!(
303 ledger.input_id_for_idempotency_key(&key).is_some(),
304 "newer terminal dedup key should be retained"
305 );
306 }
307 }
308
309 #[test]
310 fn recover_filters_ephemeral() {
311 let mut ledger = InputLedger::new();
312
313 let mut state = InputState::new_accepted(InputId::new());
315 state.durability = Some(InputDurability::Ephemeral);
316 assert!(
317 !ledger.recover(state),
318 "Ephemeral inputs should be filtered"
319 );
320 assert!(ledger.is_empty());
321
322 let mut state = InputState::new_accepted(InputId::new());
324 state.durability = Some(InputDurability::Durable);
325 assert!(ledger.recover(state), "Durable inputs should be kept");
326 assert_eq!(ledger.len(), 1);
327 }
328}