use chrono::{Duration, Utc};
use indexmap::IndexMap;
use meerkat_core::lifecycle::InputId;
use crate::identifiers::IdempotencyKey;
use crate::input::InputDurability;
use crate::input_state::InputState;
const TERMINAL_DEDUP_RETENTION_LIMIT: usize = 4096;
const TERMINAL_DEDUP_RETENTION_TTL_HOURS: i64 = 24;
#[derive(Debug, Default, Clone)]
pub struct InputLedger {
states: IndexMap<InputId, InputState>,
idempotency_index: IndexMap<IdempotencyKey, InputId>,
}
impl InputLedger {
pub fn new() -> Self {
Self::default()
}
pub fn accept(&mut self, state: InputState) {
self.states.insert(state.input_id.clone(), state);
}
pub fn accept_with_idempotency(
&mut self,
state: InputState,
key: IdempotencyKey,
) -> Option<InputId> {
self.prune_terminal_dedup(Utc::now());
if let Some(existing_id) = self.idempotency_index.get(&key) {
return Some(existing_id.clone());
}
let input_id = state.input_id.clone();
self.idempotency_index.insert(key, input_id);
self.states.insert(state.input_id.clone(), state);
None
}
pub fn accept_with_active_idempotency(
&mut self,
state: InputState,
key: IdempotencyKey,
) -> Option<InputId> {
self.prune_terminal_dedup(Utc::now());
if let Some(existing_id) = self.idempotency_index.get(&key).cloned() {
if self
.states
.get(&existing_id)
.is_some_and(|state| !state.is_terminal())
{
return Some(existing_id);
}
self.idempotency_index.shift_remove(&key);
}
let input_id = state.input_id.clone();
self.idempotency_index.insert(key, input_id);
self.states.insert(state.input_id.clone(), state);
None
}
pub fn recover(&mut self, state: InputState) -> bool {
if state.durability == Some(InputDurability::Ephemeral) {
return false;
}
if let Some(ref key) = state.idempotency_key {
self.idempotency_index
.insert(key.clone(), state.input_id.clone());
}
self.states.insert(state.input_id.clone(), state);
true
}
pub fn get(&self, input_id: &InputId) -> Option<&InputState> {
self.states.get(input_id)
}
pub fn input_id_for_idempotency_key(&self, key: &IdempotencyKey) -> Option<InputId> {
self.idempotency_index.get(key).cloned()
}
pub fn remove(&mut self, input_id: &InputId) -> Option<InputState> {
let removed = self.states.shift_remove(input_id)?;
if let Some(key) = &removed.idempotency_key {
self.idempotency_index.shift_remove(key);
}
Some(removed)
}
pub fn get_mut(&mut self, input_id: &InputId) -> Option<&mut InputState> {
self.states.get_mut(input_id)
}
pub fn iter_non_terminal(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
self.states.iter().filter(|(_, state)| !state.is_terminal())
}
pub fn iter(&self) -> impl Iterator<Item = (&InputId, &InputState)> {
self.states.iter()
}
pub fn len(&self) -> usize {
self.states.len()
}
pub fn is_empty(&self) -> bool {
self.states.is_empty()
}
pub fn active_count(&self) -> usize {
self.states.values().filter(|s| !s.is_terminal()).count()
}
pub fn active_input_ids(&self) -> Vec<InputId> {
self.iter_non_terminal().map(|(id, _)| id.clone()).collect()
}
fn prune_terminal_dedup(&mut self, now: chrono::DateTime<Utc>) {
self.prune_terminal_dedup_with_limit(now, TERMINAL_DEDUP_RETENTION_LIMIT);
}
fn prune_terminal_dedup_with_limit(
&mut self,
now: chrono::DateTime<Utc>,
retention_limit: usize,
) {
let cutoff = now - Duration::hours(TERMINAL_DEDUP_RETENTION_TTL_HOURS);
let expired_keys = self
.idempotency_index
.iter()
.filter_map(|(key, input_id)| {
let state = self.states.get(input_id)?;
(state.is_terminal() && state.updated_at() < cutoff).then(|| key.clone())
})
.collect::<Vec<_>>();
for key in expired_keys {
self.idempotency_index.shift_remove(&key);
}
let terminal_keys = self
.idempotency_index
.iter()
.filter(|(_, input_id)| {
self.states
.get(*input_id)
.is_some_and(InputState::is_terminal)
})
.map(|(key, _)| key.clone())
.collect::<Vec<_>>();
let overflow = terminal_keys.len().saturating_sub(retention_limit);
for key in terminal_keys.into_iter().take(overflow) {
self.idempotency_index.shift_remove(&key);
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
#[test]
fn accept_and_retrieve() {
let mut ledger = InputLedger::new();
let id = InputId::new();
let state = InputState::new_accepted(id.clone());
ledger.accept(state);
assert_eq!(ledger.len(), 1);
assert!(!ledger.is_empty());
let retrieved = ledger.get(&id).unwrap();
assert_eq!(retrieved.input_id, id);
}
#[test]
fn dedup_by_idempotency_key() {
let mut ledger = InputLedger::new();
let key = IdempotencyKey::new("req-123");
let id1 = InputId::new();
let state1 = InputState::new_accepted(id1.clone());
let result = ledger.accept_with_idempotency(state1, key.clone());
assert!(result.is_none());
let id2 = InputId::new();
let state2 = InputState::new_accepted(id2);
let result = ledger.accept_with_idempotency(state2, key);
assert!(result.is_some()); assert_eq!(result.unwrap(), id1);
assert_eq!(ledger.len(), 1); }
#[test]
fn active_idempotency_allows_retry_after_terminal_attempt() {
let mut ledger = InputLedger::new();
let key = IdempotencyKey::new("active-only");
let id1 = InputId::new();
let state1 = InputState::new_accepted(id1.clone());
assert!(
ledger
.accept_with_active_idempotency(state1, key.clone())
.is_none()
);
let id2 = InputId::new();
let state2 = InputState::new_accepted(id2.clone());
assert_eq!(
ledger.accept_with_active_idempotency(state2, key.clone()),
Some(id1.clone())
);
ledger
.get_mut(&id1)
.expect("first input state")
.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Consumed);
let id3 = InputId::new();
let state3 = InputState::new_accepted(id3.clone());
assert!(
ledger
.accept_with_active_idempotency(state3, key.clone())
.is_none(),
"terminal active-only keys must not block later retries"
);
assert_eq!(ledger.input_id_for_idempotency_key(&key), Some(id3));
}
#[test]
fn iter_non_terminal() {
let mut ledger = InputLedger::new();
let id1 = InputId::new();
let state1 = InputState::new_accepted(id1.clone());
ledger.accept(state1);
let id2 = InputId::new();
let mut state2 = InputState::new_accepted(id2);
state2.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Consumed);
ledger.accept(state2);
let active: Vec<_> = ledger.iter_non_terminal().collect();
assert_eq!(active.len(), 1);
assert_eq!(active[0].0, &id1);
}
#[test]
fn active_count() {
let mut ledger = InputLedger::new();
ledger.accept(InputState::new_accepted(InputId::new()));
ledger.accept(InputState::new_accepted(InputId::new()));
let id3 = InputId::new();
let mut state3 = InputState::new_accepted(id3);
state3.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Superseded {
superseded_by: InputId::new(),
});
ledger.accept(state3);
assert_eq!(ledger.len(), 3);
assert_eq!(ledger.active_count(), 2);
}
#[test]
fn active_input_ids() {
let mut ledger = InputLedger::new();
let id1 = InputId::new();
let id2 = InputId::new();
ledger.accept(InputState::new_accepted(id1.clone()));
ledger.accept(InputState::new_accepted(id2.clone()));
let ids = ledger.active_input_ids();
assert_eq!(ids.len(), 2);
assert!(ids.contains(&id1));
assert!(ids.contains(&id2));
}
#[test]
fn recover_rebuilds_idempotency_index() {
let mut ledger = InputLedger::new();
let key = IdempotencyKey::new("req-123");
let id1 = InputId::new();
let mut state = InputState::new_accepted(id1.clone());
state.idempotency_key = Some(key.clone());
state.durability = Some(InputDurability::Durable);
assert!(ledger.recover(state));
let id2 = InputId::new();
let state2 = InputState::new_accepted(id2);
let result = ledger.accept_with_idempotency(state2, key);
assert_eq!(result, Some(id1), "Dedup should find the recovered input");
assert_eq!(ledger.len(), 1, "No duplicate entry should be created");
}
#[test]
fn terminal_dedup_retention_evicts_oldest_key_after_limit() {
let mut ledger = InputLedger::new();
let oldest_key = IdempotencyKey::new("oldest-terminal-key");
let retained_keys = [
IdempotencyKey::new("terminal-key-1"),
IdempotencyKey::new("terminal-key-2"),
IdempotencyKey::new("terminal-key-3"),
];
for key in std::iter::once(oldest_key.clone()).chain(retained_keys.iter().cloned()) {
let mut state = InputState::new_accepted(InputId::new());
state.terminal_outcome = Some(crate::input_state::InputTerminalOutcome::Consumed);
let input_id = state.input_id.clone();
ledger.states.insert(input_id.clone(), state);
ledger.idempotency_index.insert(key, input_id);
}
ledger.prune_terminal_dedup_with_limit(Utc::now(), retained_keys.len());
assert!(
ledger.input_id_for_idempotency_key(&oldest_key).is_none(),
"the oldest terminal dedup key should be evicted once the retention cap is exceeded"
);
for key in retained_keys {
assert!(
ledger.input_id_for_idempotency_key(&key).is_some(),
"newer terminal dedup key should be retained"
);
}
}
#[test]
fn recover_filters_ephemeral() {
let mut ledger = InputLedger::new();
let mut state = InputState::new_accepted(InputId::new());
state.durability = Some(InputDurability::Ephemeral);
assert!(
!ledger.recover(state),
"Ephemeral inputs should be filtered"
);
assert!(ledger.is_empty());
let mut state = InputState::new_accepted(InputId::new());
state.durability = Some(InputDurability::Durable);
assert!(ledger.recover(state), "Durable inputs should be kept");
assert_eq!(ledger.len(), 1);
}
}