tramli_plugins/idempotency/
mod.rs1use std::any::TypeId;
2use std::collections::HashSet;
3use std::sync::Mutex;
4use tramli::{CloneAny, FlowEngine, FlowState};
5use crate::resume::{RichResumeExecutor, RichResumeResult, RichResumeStatus};
6
/// Records which commands have already been delivered to which flows.
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait IdempotencyRegistry
where
    Self: Send + Sync,
{
    /// Records the `(flow_id, command_id)` pair and reports whether it is new.
    ///
    /// Returns `true` the first time a pair is presented and `false` on every
    /// later call with the same pair; callers treat `false` as "duplicate".
    fn mark_if_first_seen(&self, flow_id: &str, command_id: &str) -> bool;
}
11
/// Process-local idempotency registry backed by a mutex-guarded hash set.
///
/// Entries live only as long as this value does; nothing is persisted.
#[derive(Debug)]
pub struct InMemoryIdempotencyRegistry {
    /// One string key per recorded flow/command pair.
    seen: Mutex<HashSet<String>>,
}
16
17impl InMemoryIdempotencyRegistry {
18 pub fn new() -> Self {
19 Self { seen: Mutex::new(HashSet::new()) }
20 }
21}
22
23impl Default for InMemoryIdempotencyRegistry {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl IdempotencyRegistry for InMemoryIdempotencyRegistry {
30 fn mark_if_first_seen(&self, flow_id: &str, command_id: &str) -> bool {
31 let key = format!("{}::{}", flow_id, command_id);
32 self.seen.lock().unwrap().insert(key)
33 }
34}
35
36pub struct CommandEnvelope {
38 pub command_id: String,
39 pub external_data: Vec<(TypeId, Box<dyn CloneAny>)>,
40}
41
/// Stateless namespace for the de-duplicating variant of
/// `RichResumeExecutor::resume`.
pub struct IdempotentRichResumeExecutor;
44
45impl IdempotentRichResumeExecutor {
46 pub fn resume<S: FlowState>(
47 engine: &mut FlowEngine<S>,
48 registry: &dyn IdempotencyRegistry,
49 flow_id: &str,
50 envelope: CommandEnvelope,
51 previous_state: S,
52 ) -> RichResumeResult {
53 if !registry.mark_if_first_seen(flow_id, &envelope.command_id) {
54 return RichResumeResult {
55 status: RichResumeStatus::AlreadyComplete,
56 error: Some(tramli::FlowError::new(
57 "DUPLICATE_COMMAND",
58 format!("duplicate commandId {}", envelope.command_id),
59 )),
60 };
61 }
62 RichResumeExecutor::resume(engine, flow_id, envelope.external_data, previous_state)
63 }
64}