Skip to main content

smith_protocol/
idempotency.rs

1//! Idempotency Key System for Smith Platform
2//!
3//! This module provides idempotency key generation and management to prevent
4//! duplicate intent execution under network retries and failures.
5
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use std::fmt;
9use thiserror::Error;
10
11/// Idempotency key for preventing duplicate executions
12///
13/// Format: `idem_key = hash(run_id, episode, step_idx)`
14/// This ensures network retries never cause double-execution.
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
16pub struct IdempotencyKey(String);
17
18impl IdempotencyKey {
19    /// Generate idempotency key from execution context
20    ///
21    /// The key is generated as: `SHA256(run_id || episode || step_idx)`
22    /// This creates a deterministic, unique key for each execution step.
23    pub fn generate(run_id: &str, episode: &str, step_idx: u32) -> Self {
24        let mut hasher = Sha256::new();
25        hasher.update(run_id.as_bytes());
26        hasher.update(b"::"); // separator
27        hasher.update(episode.as_bytes());
28        hasher.update(b"::"); // separator
29        hasher.update(step_idx.to_string().as_bytes());
30
31        let hash = format!("{:x}", hasher.finalize());
32        Self(format!("idem_{}", &hash[0..16])) // Use first 16 chars for readability
33    }
34
35    /// Generate from intent metadata
36    pub fn from_intent_metadata(
37        intent_id: &str,
38        actor: &str,
39        episode: &str,
40        step_idx: Option<u32>,
41    ) -> Self {
42        let step = step_idx.unwrap_or(0);
43        let run_id = format!("{}:{}", intent_id, actor);
44        Self::generate(&run_id, episode, step)
45    }
46
47    /// Get the raw key string
48    pub fn as_str(&self) -> &str {
49        &self.0
50    }
51
52    /// Get the key as an owned string
53    pub fn into_string(self) -> String {
54        self.0
55    }
56
57    /// Parse from string representation
58    pub fn from_string(key: String) -> Result<Self, IdempotencyError> {
59        if !key.starts_with("idem_") {
60            return Err(IdempotencyError::InvalidFormat(key));
61        }
62
63        if key.len() != 21 {
64            // "idem_" + 16 hex chars
65            return Err(IdempotencyError::InvalidFormat(key));
66        }
67
68        // Validate hex characters
69        let hex_part = &key[5..];
70        if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
71            return Err(IdempotencyError::InvalidFormat(key));
72        }
73
74        Ok(Self(key))
75    }
76
77    /// Extract components from context (reverse operation)
78    /// Note: This is not cryptographically reversible, used only for debugging
79    pub fn context_hint(&self) -> String {
80        format!("key:{}", &self.0[5..])
81    }
82}
83
84impl fmt::Display for IdempotencyKey {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        write!(f, "{}", self.0)
87    }
88}
89
90impl From<IdempotencyKey> for String {
91    fn from(key: IdempotencyKey) -> Self {
92        key.0
93    }
94}
95
96/// Idempotency-related errors
97#[derive(Debug, Error)]
98pub enum IdempotencyError {
99    #[error("Invalid idempotency key format: {0}")]
100    InvalidFormat(String),
101
102    #[error("Duplicate execution detected for key: {key}")]
103    DuplicateExecution { key: String },
104
105    #[error("Idempotency key not found: {key}")]
106    KeyNotFound { key: String },
107
108    #[error("Idempotency store error: {0}")]
109    StoreError(String),
110}
111
112/// Result stored with idempotency key to prevent duplicate execution
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct IdempotentResult {
115    /// The idempotency key this result is associated with
116    pub idem_key: IdempotencyKey,
117    /// Intent ID that was executed
118    pub intent_id: String,
119    /// Execution result (success/failure)
120    pub result: IdempotentExecutionResult,
121    /// When this result was first stored
122    pub stored_at: chrono::DateTime<chrono::Utc>,
123    /// When this result expires (for cleanup)
124    pub expires_at: chrono::DateTime<chrono::Utc>,
125    /// Number of times this key was accessed
126    pub access_count: u32,
127}
128
129/// Execution result that can be stored for idempotency
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(tag = "status", content = "data")]
132pub enum IdempotentExecutionResult {
133    Success(serde_json::Value),
134    Error { code: String, message: String },
135    Timeout,
136    Killed,
137}
138
139impl IdempotentResult {
140    /// Create a new successful result
141    pub fn success(
142        idem_key: IdempotencyKey,
143        intent_id: String,
144        output: serde_json::Value,
145        ttl_hours: u32,
146    ) -> Self {
147        let now = chrono::Utc::now();
148        Self {
149            idem_key,
150            intent_id,
151            result: IdempotentExecutionResult::Success(output),
152            stored_at: now,
153            expires_at: now + chrono::Duration::hours(ttl_hours as i64),
154            access_count: 0,
155        }
156    }
157
158    /// Create a new error result
159    pub fn error(
160        idem_key: IdempotencyKey,
161        intent_id: String,
162        error_code: String,
163        error_message: String,
164        ttl_hours: u32,
165    ) -> Self {
166        let now = chrono::Utc::now();
167        Self {
168            idem_key,
169            intent_id,
170            result: IdempotentExecutionResult::Error {
171                code: error_code,
172                message: error_message,
173            },
174            stored_at: now,
175            expires_at: now + chrono::Duration::hours(ttl_hours as i64),
176            access_count: 0,
177        }
178    }
179
180    /// Check if this result has expired
181    pub fn is_expired(&self) -> bool {
182        chrono::Utc::now() > self.expires_at
183    }
184
185    /// Increment access count
186    pub fn increment_access(&mut self) {
187        self.access_count += 1;
188    }
189}
190
191/// Trait for idempotency stores (in-memory, Redis, database, etc.)
192#[async_trait::async_trait]
193pub trait IdempotencyStore: Send + Sync {
194    /// Upsert a result by idempotency key
195    /// Returns true if this is a new key, false if it already existed
196    async fn upsert_result(&self, result: IdempotentResult) -> Result<bool, IdempotencyError>;
197
198    /// Get a result by idempotency key
199    async fn get_result(
200        &self,
201        key: &IdempotencyKey,
202    ) -> Result<Option<IdempotentResult>, IdempotencyError>;
203
204    /// Delete expired results (cleanup)
205    async fn cleanup_expired(&self) -> Result<u64, IdempotencyError>;
206
207    /// Get statistics about the store
208    async fn stats(&self) -> Result<IdempotencyStats, IdempotencyError>;
209}
210
211/// Statistics about idempotency store usage
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct IdempotencyStats {
214    pub total_keys: u64,
215    pub expired_keys: u64,
216    pub hit_rate: f64,
217    pub average_ttl_hours: f64,
218}
219
220/// In-memory idempotency store for development and testing
221#[derive(Debug)]
222pub struct InMemoryIdempotencyStore {
223    results: std::sync::Arc<
224        tokio::sync::RwLock<std::collections::HashMap<IdempotencyKey, IdempotentResult>>,
225    >,
226}
227
228impl InMemoryIdempotencyStore {
229    pub fn new() -> Self {
230        Self {
231            results: std::sync::Arc::new(
232                tokio::sync::RwLock::new(std::collections::HashMap::new()),
233            ),
234        }
235    }
236}
237
238impl Default for InMemoryIdempotencyStore {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
244#[async_trait::async_trait]
245impl IdempotencyStore for InMemoryIdempotencyStore {
246    async fn upsert_result(&self, result: IdempotentResult) -> Result<bool, IdempotencyError> {
247        let mut store = self.results.write().await;
248        let is_new = !store.contains_key(&result.idem_key);
249        store.insert(result.idem_key.clone(), result);
250        Ok(is_new)
251    }
252
253    async fn get_result(
254        &self,
255        key: &IdempotencyKey,
256    ) -> Result<Option<IdempotentResult>, IdempotencyError> {
257        let mut store = self.results.write().await;
258        if let Some(mut result) = store.get(key).cloned() {
259            if result.is_expired() {
260                store.remove(key);
261                return Ok(None);
262            }
263            result.increment_access();
264            store.insert(key.clone(), result.clone());
265            Ok(Some(result))
266        } else {
267            Ok(None)
268        }
269    }
270
271    async fn cleanup_expired(&self) -> Result<u64, IdempotencyError> {
272        let mut store = self.results.write().await;
273        let mut expired_keys = Vec::new();
274
275        for (key, result) in store.iter() {
276            if result.is_expired() {
277                expired_keys.push(key.clone());
278            }
279        }
280
281        let count = expired_keys.len() as u64;
282        for key in expired_keys {
283            store.remove(&key);
284        }
285
286        Ok(count)
287    }
288
289    async fn stats(&self) -> Result<IdempotencyStats, IdempotencyError> {
290        let store = self.results.read().await;
291        let total_keys = store.len() as u64;
292
293        let expired_keys = store.values().filter(|result| result.is_expired()).count() as u64;
294
295        let total_accesses: u64 = store
296            .values()
297            .map(|result| result.access_count as u64)
298            .sum();
299
300        let hit_rate = if total_keys > 0 {
301            total_accesses as f64 / total_keys as f64
302        } else {
303            0.0
304        };
305
306        let average_ttl = if total_keys > 0 {
307            let total_ttl: i64 = store
308                .values()
309                .map(|result| (result.expires_at - result.stored_at).num_hours())
310                .sum();
311            total_ttl as f64 / total_keys as f64
312        } else {
313            0.0
314        };
315
316        Ok(IdempotencyStats {
317            total_keys,
318            expired_keys,
319            hit_rate,
320            average_ttl_hours: average_ttl,
321        })
322    }
323}
324
325/// Helper functions for common idempotency patterns
326pub mod helpers {
327    use super::*;
328
329    /// Check if an execution should proceed or return cached result
330    pub async fn check_idempotency(
331        store: &InMemoryIdempotencyStore,
332        idem_key: &IdempotencyKey,
333    ) -> Result<Option<IdempotentExecutionResult>, IdempotencyError> {
334        if let Some(cached_result) = store.get_result(idem_key).await? {
335            return Ok(Some(cached_result.result));
336        }
337        Ok(None)
338    }
339
340    /// Execute with idempotency protection
341    pub async fn execute_idempotent<T, F, Fut>(
342        store: &InMemoryIdempotencyStore,
343        idem_key: IdempotencyKey,
344        intent_id: String,
345        ttl_hours: u32,
346        operation: F,
347    ) -> Result<IdempotentExecutionResult, IdempotencyError>
348    where
349        F: FnOnce() -> Fut,
350        Fut: std::future::Future<Output = Result<serde_json::Value, (String, String)>>,
351    {
352        // Check for existing result
353        if let Some(existing) = store.get_result(&idem_key).await? {
354            return Ok(existing.result);
355        }
356
357        // Execute operation
358        let result = match operation().await {
359            Ok(output) => IdempotentResult::success(idem_key.clone(), intent_id, output, ttl_hours),
360            Err((code, message)) => {
361                IdempotentResult::error(idem_key.clone(), intent_id, code, message, ttl_hours)
362            }
363        };
364
365        // Store result
366        store.upsert_result(result.clone()).await?;
367
368        Ok(result.result)
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_idempotency_key_generation() {
        let key1 = IdempotencyKey::generate("run123", "episode1", 5);
        let key2 = IdempotencyKey::generate("run123", "episode1", 5);
        let key3 = IdempotencyKey::generate("run123", "episode1", 6);

        // Same inputs should generate same key
        assert_eq!(key1, key2);

        // Different inputs should generate different keys
        assert_ne!(key1, key3);

        // Keys should have correct format
        assert!(key1.as_str().starts_with("idem_"));
        assert_eq!(key1.as_str().len(), 21);
    }

    #[test]
    fn test_idempotency_key_from_intent() {
        let key =
            IdempotencyKey::from_intent_metadata("intent123", "actor456", "episode789", Some(10));

        assert!(key.as_str().starts_with("idem_"));
        assert_eq!(key.as_str().len(), 21);
    }

    #[test]
    fn test_idempotency_key_parsing() {
        let original_key = IdempotencyKey::generate("test", "episode", 1);
        let key_string = original_key.to_string();

        let parsed_key = IdempotencyKey::from_string(key_string).unwrap();
        assert_eq!(original_key, parsed_key);

        // Test invalid formats
        assert!(IdempotencyKey::from_string("invalid".to_string()).is_err());
        assert!(IdempotencyKey::from_string("idem_".to_string()).is_err());
        assert!(IdempotencyKey::from_string("idem_gggg".to_string()).is_err()); // non-hex
        // One hex digit too many
        assert!(IdempotencyKey::from_string("idem_0123456789abcdef0".to_string()).is_err());
    }

    #[test]
    fn test_context_hint_strips_prefix() {
        let key = IdempotencyKey::generate("test", "episode", 1);
        let digest = key.as_str().strip_prefix("idem_").unwrap();
        assert_eq!(key.context_hint(), format!("key:{}", digest));
    }

    #[tokio::test]
    async fn test_in_memory_idempotency_store() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);
        let result = IdempotentResult::success(
            key.clone(),
            "intent123".to_string(),
            json!({"status": "ok"}),
            24,
        );

        // Test upsert
        let is_new = store.upsert_result(result.clone()).await.unwrap();
        assert!(is_new);

        // Test get
        let retrieved = store.get_result(&key).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.intent_id, "intent123");
        assert_eq!(retrieved.access_count, 1); // Should increment on access

        // Test duplicate upsert
        let is_new = store.upsert_result(result).await.unwrap();
        assert!(!is_new);

        // Test stats
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert_eq!(stats.expired_keys, 0);
    }

    #[test]
    fn test_idempotent_result_expiration() {
        let key = IdempotencyKey::generate("test", "episode", 1);
        let mut result = IdempotentResult::success(
            key,
            "intent123".to_string(),
            json!({"status": "ok"}),
            0, // 0 hour TTL: expires_at == stored_at, which is not yet "expired" (strict `>`)
        );

        // Backdate the expiration so the check is deterministic
        result.expires_at = chrono::Utc::now() - chrono::Duration::hours(1);

        assert!(result.is_expired());
    }

    #[tokio::test]
    async fn test_get_result_evicts_expired_entry() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("expired", "episode", 1);
        let mut result = IdempotentResult::success(
            key.clone(),
            "intent_expired".to_string(),
            json!({"status": "stale"}),
            24,
        );
        result.expires_at = chrono::Utc::now() - chrono::Duration::hours(1);
        store.upsert_result(result).await.unwrap();

        // An expired entry is reported as missing and removed by the lookup itself
        assert!(store.get_result(&key).await.unwrap().is_none());
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 0);
    }

    #[tokio::test]
    async fn test_cleanup_expired() {
        let store = InMemoryIdempotencyStore::new();
        let key1 = IdempotencyKey::generate("test1", "episode", 1);
        let key2 = IdempotencyKey::generate("test2", "episode", 1);

        // Create one expired and one valid result
        let mut expired_result = IdempotentResult::success(
            key1,
            "intent1".to_string(),
            json!({"status": "expired"}),
            24,
        );
        expired_result.expires_at = chrono::Utc::now() - chrono::Duration::hours(1);

        let valid_result =
            IdempotentResult::success(key2, "intent2".to_string(), json!({"status": "valid"}), 24);

        store.upsert_result(expired_result).await.unwrap();
        store.upsert_result(valid_result).await.unwrap();

        // Cleanup should remove 1 expired result
        let cleaned = store.cleanup_expired().await.unwrap();
        assert_eq!(cleaned, 1);

        // Only valid result should remain
        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
    }

    #[tokio::test]
    async fn test_idempotency_helpers() {
        use helpers::*;

        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);

        // First check should return None (no cached result)
        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_none());

        // Execute idempotent operation
        let result = execute_idempotent::<String, _, _>(
            &store,
            key.clone(),
            "intent123".to_string(),
            24,
            || async { Ok(json!({"computed": "value"})) },
        )
        .await
        .unwrap();

        match result {
            IdempotentExecutionResult::Success(value) => {
                assert_eq!(value, json!({"computed": "value"}));
            }
            _ => panic!("Expected success result"),
        }

        // Second check should return cached result
        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_some());
    }

    #[test]
    fn test_idempotent_execution_result_serialization() {
        let success_result = IdempotentExecutionResult::Success(json!({"key": "value"}));
        let error_result = IdempotentExecutionResult::Error {
            code: "TEST_ERROR".to_string(),
            message: "Test error message".to_string(),
        };

        // Test serialization roundtrip
        let success_json = serde_json::to_string(&success_result).unwrap();
        let success_deserialized: IdempotentExecutionResult =
            serde_json::from_str(&success_json).unwrap();

        match success_deserialized {
            IdempotentExecutionResult::Success(value) => {
                assert_eq!(value, json!({"key": "value"}));
            }
            _ => panic!("Expected success result"),
        }

        let error_json = serde_json::to_string(&error_result).unwrap();
        let error_deserialized: IdempotentExecutionResult =
            serde_json::from_str(&error_json).unwrap();

        match error_deserialized {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "TEST_ERROR");
                assert_eq!(message, "Test error message");
            }
            _ => panic!("Expected error result"),
        }
    }

    #[tokio::test]
    async fn test_idempotency_stats_empty_store() {
        let store = InMemoryIdempotencyStore::new();
        let stats = store.stats().await.unwrap();

        assert_eq!(stats.total_keys, 0);
        assert_eq!(stats.expired_keys, 0);
        assert_eq!(stats.hit_rate, 0.0);
        assert_eq!(stats.average_ttl_hours, 0.0);
    }

    #[tokio::test]
    async fn test_idempotency_stats_with_data() {
        let store = InMemoryIdempotencyStore::new();
        let key1 = IdempotencyKey::generate("test1", "episode", 1);
        let key2 = IdempotencyKey::generate("test2", "episode", 1);

        let mut result1 =
            IdempotentResult::success(key1, "intent1".to_string(), json!({"status": "ok"}), 24);
        result1.access_count = 5; // Simulate multiple accesses

        let result2 =
            IdempotentResult::success(key2, "intent2".to_string(), json!({"status": "ok"}), 12);

        store.upsert_result(result1).await.unwrap();
        store.upsert_result(result2).await.unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 2);
        assert_eq!(stats.expired_keys, 0);
        assert_eq!(stats.hit_rate, 2.5); // (5 + 0) / 2 (access_count for result2 defaults to 0)
        assert_eq!(stats.average_ttl_hours, 18.0); // (24 + 12) / 2
    }

    #[tokio::test]
    async fn test_execute_idempotent_error_case() {
        use helpers::*;

        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test_error", "episode", 1);

        // Execute operation that returns error
        let result = execute_idempotent::<String, _, _>(
            &store,
            key.clone(),
            "intent123".to_string(),
            24,
            || async { Err(("ERROR_CODE".to_string(), "Error message".to_string())) },
        )
        .await
        .unwrap();

        match result {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "ERROR_CODE");
                assert_eq!(message, "Error message");
            }
            _ => panic!("Expected error result"),
        }

        // Should return same cached error result on second execution
        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_some());
        match cached.unwrap() {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "ERROR_CODE");
                assert_eq!(message, "Error message");
            }
            _ => panic!("Expected cached error result"),
        }
    }

    #[test]
    fn test_idempotent_result_debug_format() {
        let key = IdempotencyKey::generate("test", "episode", 1);
        let result =
            IdempotentResult::success(key, "intent123".to_string(), json!({"status": "ok"}), 24);

        let debug_str = format!("{:?}", result);
        assert!(debug_str.contains("IdempotentResult"));
        assert!(debug_str.contains("intent123"));
    }

    #[test]
    fn test_idempotency_error_types() {
        let store_error = IdempotencyError::StoreError("Storage failed".to_string());
        let invalid_format_error = IdempotencyError::InvalidFormat("invalid-key".to_string());
        let duplicate_error = IdempotencyError::DuplicateExecution {
            key: "test-key".to_string(),
        };
        let not_found_error = IdempotencyError::KeyNotFound {
            key: "missing-key".to_string(),
        };

        // Test Display trait
        assert!(format!("{}", store_error).contains("Idempotency store error"));
        assert!(format!("{}", invalid_format_error).contains("Invalid idempotency key format"));
        assert!(format!("{}", duplicate_error).contains("Duplicate execution detected"));
        assert!(format!("{}", not_found_error).contains("Idempotency key not found"));

        // Test Debug trait
        let debug_str = format!("{:?}", store_error);
        assert!(debug_str.contains("StoreError"));
    }

    #[tokio::test]
    async fn test_upsert_result_returns_correct_bool() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);

        let result = IdempotentResult::success(
            key.clone(),
            "intent123".to_string(),
            json!({"status": "ok"}),
            24,
        );

        // First insert should return true (new key)
        let is_new = store.upsert_result(result.clone()).await.unwrap();
        assert!(is_new);

        // Second insert should return false (existing key)
        let is_new = store.upsert_result(result).await.unwrap();
        assert!(!is_new);
    }

    #[tokio::test]
    async fn test_get_result_nonexistent_key() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("nonexistent", "episode", 1);

        let result = store.get_result(&key).await.unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_idempotency_key_parsing_invalid() {
        // Test parsing invalid key format
        let invalid_key = "invalid-format".to_string();
        let result = IdempotencyKey::from_string(invalid_key);
        assert!(result.is_err());
    }

    #[test]
    fn test_idempotency_stats_serialization() {
        let stats = IdempotencyStats {
            total_keys: 10,
            expired_keys: 2,
            hit_rate: 1.5,
            average_ttl_hours: 24.0,
        };

        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: IdempotencyStats = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.total_keys, 10);
        assert_eq!(deserialized.expired_keys, 2);
        assert_eq!(deserialized.hit_rate, 1.5);
        assert_eq!(deserialized.average_ttl_hours, 24.0);
    }
}