Skip to main content

tramli_plugins/idempotency/
mod.rs

1use std::any::TypeId;
2use std::collections::HashSet;
3use std::sync::Mutex;
4use tramli::{CloneAny, FlowEngine, FlowState};
5use crate::resume::{RichResumeExecutor, RichResumeResult, RichResumeStatus};
6
7/// Idempotency registry trait.
8pub trait IdempotencyRegistry: Send + Sync {
9    fn mark_if_first_seen(&self, flow_id: &str, command_id: &str) -> bool;
10}
11
12/// In-memory idempotency registry.
13pub struct InMemoryIdempotencyRegistry {
14    seen: Mutex<HashSet<String>>,
15}
16
17impl InMemoryIdempotencyRegistry {
18    pub fn new() -> Self {
19        Self { seen: Mutex::new(HashSet::new()) }
20    }
21}
22
23impl Default for InMemoryIdempotencyRegistry {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl IdempotencyRegistry for InMemoryIdempotencyRegistry {
30    fn mark_if_first_seen(&self, flow_id: &str, command_id: &str) -> bool {
31        let key = format!("{}::{}", flow_id, command_id);
32        self.seen.lock().unwrap().insert(key)
33    }
34}
35
36/// Command envelope — wraps external data with a unique command ID.
37pub struct CommandEnvelope {
38    pub command_id: String,
39    pub external_data: Vec<(TypeId, Box<dyn CloneAny>)>,
40}
41
42/// Idempotent rich resume executor — duplicate suppression.
43pub struct IdempotentRichResumeExecutor;
44
45impl IdempotentRichResumeExecutor {
46    pub fn resume<S: FlowState>(
47        engine: &mut FlowEngine<S>,
48        registry: &dyn IdempotencyRegistry,
49        flow_id: &str,
50        envelope: CommandEnvelope,
51        previous_state: S,
52    ) -> RichResumeResult {
53        if !registry.mark_if_first_seen(flow_id, &envelope.command_id) {
54            return RichResumeResult {
55                status: RichResumeStatus::AlreadyComplete,
56                error: Some(tramli::FlowError::new(
57                    "DUPLICATE_COMMAND",
58                    format!("duplicate commandId {}", envelope.command_id),
59                )),
60            };
61        }
62        RichResumeExecutor::resume(engine, flow_id, envelope.external_data, previous_state)
63    }
64}