1use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use std::fmt;
9use thiserror::Error;
10
/// Opaque identifier used to recognise repeated executions of the same step.
///
/// Keys produced by [`IdempotencyKey::generate`] or accepted by
/// [`IdempotencyKey::from_string`] consist of the prefix `idem_` followed by
/// 16 hexadecimal characters (21 bytes in total).
///
/// NOTE(review): the derived `Deserialize` wraps whatever string it is given
/// and does not run the checks performed by `from_string`, so a deserialized
/// key is not guaranteed to have that shape.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct IdempotencyKey(String);
17
18impl IdempotencyKey {
19 pub fn generate(run_id: &str, episode: &str, step_idx: u32) -> Self {
24 let mut hasher = Sha256::new();
25 hasher.update(run_id.as_bytes());
26 hasher.update(b"::"); hasher.update(episode.as_bytes());
28 hasher.update(b"::"); hasher.update(step_idx.to_string().as_bytes());
30
31 let hash = format!("{:x}", hasher.finalize());
32 Self(format!("idem_{}", &hash[0..16])) }
34
35 pub fn from_intent_metadata(
37 intent_id: &str,
38 actor: &str,
39 episode: &str,
40 step_idx: Option<u32>,
41 ) -> Self {
42 let step = step_idx.unwrap_or(0);
43 let run_id = format!("{}:{}", intent_id, actor);
44 Self::generate(&run_id, episode, step)
45 }
46
47 pub fn as_str(&self) -> &str {
49 &self.0
50 }
51
52 pub fn into_string(self) -> String {
54 self.0
55 }
56
57 pub fn from_string(key: String) -> Result<Self, IdempotencyError> {
59 if !key.starts_with("idem_") {
60 return Err(IdempotencyError::InvalidFormat(key));
61 }
62
63 if key.len() != 21 {
64 return Err(IdempotencyError::InvalidFormat(key));
66 }
67
68 let hex_part = &key[5..];
70 if !hex_part.chars().all(|c| c.is_ascii_hexdigit()) {
71 return Err(IdempotencyError::InvalidFormat(key));
72 }
73
74 Ok(Self(key))
75 }
76
77 pub fn context_hint(&self) -> String {
80 format!("key:{}", &self.0[5..])
81 }
82}
83
84impl fmt::Display for IdempotencyKey {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 write!(f, "{}", self.0)
87 }
88}
89
90impl From<IdempotencyKey> for String {
91 fn from(key: IdempotencyKey) -> Self {
92 key.0
93 }
94}
95
/// Errors reported by idempotency key handling and by idempotency stores.
#[derive(Debug, Error)]
pub enum IdempotencyError {
    /// The string is not a well-formed key; the payload is the rejected input.
    /// Returned by `IdempotencyKey::from_string`.
    #[error("Invalid idempotency key format: {0}")]
    InvalidFormat(String),

    /// An execution for `key` was already recorded.
    /// NOTE(review): not constructed by any code visible in this file.
    #[error("Duplicate execution detected for key: {key}")]
    DuplicateExecution { key: String },

    /// No record exists for `key`.
    /// NOTE(review): not constructed by any code visible in this file; the
    /// in-memory store reports a missing key as `Ok(None)` instead.
    #[error("Idempotency key not found: {key}")]
    KeyNotFound { key: String },

    /// The backing store failed; the payload is a free-form description.
    #[error("Idempotency store error: {0}")]
    StoreError(String),
}
111
/// A recorded execution outcome together with its bookkeeping metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotentResult {
    /// Key under which the outcome is stored.
    pub idem_key: IdempotencyKey,
    /// Identifier of the intent that produced the outcome.
    pub intent_id: String,
    /// The outcome itself.
    pub result: IdempotentExecutionResult,
    /// Creation time of the record (UTC); set by the constructors.
    pub stored_at: chrono::DateTime<chrono::Utc>,
    /// Instant (UTC) after which `is_expired` reports the record as expired.
    pub expires_at: chrono::DateTime<chrono::Utc>,
    /// Counter bumped by `increment_access`; starts at 0 in the constructors.
    pub access_count: u32,
}
128
/// Outcome of an execution as kept in an [`IdempotentResult`].
///
/// Serialized in serde's adjacently tagged form: the variant name goes under
/// the `status` field and the variant payload, if any, under `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status", content = "data")]
pub enum IdempotentExecutionResult {
    /// The operation completed; carries its JSON output.
    Success(serde_json::Value),
    /// The operation failed with an error code and a message.
    Error { code: String, message: String },
    /// NOTE(review): presumably the operation timed out; never constructed in
    /// the code visible here.
    Timeout,
    /// NOTE(review): presumably the operation was terminated; never
    /// constructed in the code visible here.
    Killed,
}
138
139impl IdempotentResult {
140 pub fn success(
142 idem_key: IdempotencyKey,
143 intent_id: String,
144 output: serde_json::Value,
145 ttl_hours: u32,
146 ) -> Self {
147 let now = chrono::Utc::now();
148 Self {
149 idem_key,
150 intent_id,
151 result: IdempotentExecutionResult::Success(output),
152 stored_at: now,
153 expires_at: now + chrono::Duration::hours(ttl_hours as i64),
154 access_count: 0,
155 }
156 }
157
158 pub fn error(
160 idem_key: IdempotencyKey,
161 intent_id: String,
162 error_code: String,
163 error_message: String,
164 ttl_hours: u32,
165 ) -> Self {
166 let now = chrono::Utc::now();
167 Self {
168 idem_key,
169 intent_id,
170 result: IdempotentExecutionResult::Error {
171 code: error_code,
172 message: error_message,
173 },
174 stored_at: now,
175 expires_at: now + chrono::Duration::hours(ttl_hours as i64),
176 access_count: 0,
177 }
178 }
179
180 pub fn is_expired(&self) -> bool {
182 chrono::Utc::now() > self.expires_at
183 }
184
185 pub fn increment_access(&mut self) {
187 self.access_count += 1;
188 }
189}
190
/// Asynchronous storage interface for [`IdempotentResult`] records.
///
/// Implementors must be `Send + Sync`. The behaviour notes on the methods
/// describe what `InMemoryIdempotencyStore` in this file does; other
/// implementations should be checked against them.
#[async_trait::async_trait]
pub trait IdempotencyStore: Send + Sync {
    /// Stores `result` under its `idem_key`, replacing any existing record.
    ///
    /// The in-memory store returns `Ok(true)` when no record was present for
    /// the key before the call and `Ok(false)` when one was replaced.
    async fn upsert_result(&self, result: IdempotentResult) -> Result<bool, IdempotencyError>;

    /// Looks up the record stored for `key`.
    ///
    /// The in-memory store returns `Ok(None)` for unknown or expired keys
    /// (removing the expired record) and increments the record's access
    /// count on every successful lookup.
    async fn get_result(
        &self,
        key: &IdempotencyKey,
    ) -> Result<Option<IdempotentResult>, IdempotencyError>;

    /// Removes expired records; the in-memory store returns how many were
    /// removed.
    async fn cleanup_expired(&self) -> Result<u64, IdempotencyError>;

    /// Returns aggregate statistics about the stored records.
    async fn stats(&self) -> Result<IdempotencyStats, IdempotencyError>;
}
210
/// Aggregate figures reported by `IdempotencyStore::stats`.
///
/// Field descriptions follow the computation in the in-memory store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotencyStats {
    /// Number of records currently held, expired ones included.
    pub total_keys: u64,
    /// Number of held records whose expiry time has passed.
    pub expired_keys: u64,
    /// Sum of all access counts divided by `total_keys`, i.e. the average
    /// number of accesses per key (not a ratio bounded by 1); 0.0 when empty.
    pub hit_rate: f64,
    /// Mean of each record's TTL in whole hours (`expires_at - stored_at`,
    /// truncated per record); 0.0 when empty.
    pub average_ttl_hours: f64,
}
219
/// `IdempotencyStore` implementation that keeps all records in process memory.
///
/// Records live in a `HashMap` keyed by `IdempotencyKey`, guarded by a tokio
/// `RwLock` inside an `Arc`. Nothing is persisted.
#[derive(Debug)]
pub struct InMemoryIdempotencyStore {
    // Map from key to stored record; all access goes through the async lock.
    results: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<IdempotencyKey, IdempotentResult>>,
    >,
}
227
228impl InMemoryIdempotencyStore {
229 pub fn new() -> Self {
230 Self {
231 results: std::sync::Arc::new(
232 tokio::sync::RwLock::new(std::collections::HashMap::new()),
233 ),
234 }
235 }
236}
237
238impl Default for InMemoryIdempotencyStore {
239 fn default() -> Self {
240 Self::new()
241 }
242}
243
244#[async_trait::async_trait]
245impl IdempotencyStore for InMemoryIdempotencyStore {
246 async fn upsert_result(&self, result: IdempotentResult) -> Result<bool, IdempotencyError> {
247 let mut store = self.results.write().await;
248 let is_new = !store.contains_key(&result.idem_key);
249 store.insert(result.idem_key.clone(), result);
250 Ok(is_new)
251 }
252
253 async fn get_result(
254 &self,
255 key: &IdempotencyKey,
256 ) -> Result<Option<IdempotentResult>, IdempotencyError> {
257 let mut store = self.results.write().await;
258 if let Some(mut result) = store.get(key).cloned() {
259 if result.is_expired() {
260 store.remove(key);
261 return Ok(None);
262 }
263 result.increment_access();
264 store.insert(key.clone(), result.clone());
265 Ok(Some(result))
266 } else {
267 Ok(None)
268 }
269 }
270
271 async fn cleanup_expired(&self) -> Result<u64, IdempotencyError> {
272 let mut store = self.results.write().await;
273 let mut expired_keys = Vec::new();
274
275 for (key, result) in store.iter() {
276 if result.is_expired() {
277 expired_keys.push(key.clone());
278 }
279 }
280
281 let count = expired_keys.len() as u64;
282 for key in expired_keys {
283 store.remove(&key);
284 }
285
286 Ok(count)
287 }
288
289 async fn stats(&self) -> Result<IdempotencyStats, IdempotencyError> {
290 let store = self.results.read().await;
291 let total_keys = store.len() as u64;
292
293 let expired_keys = store.values().filter(|result| result.is_expired()).count() as u64;
294
295 let total_accesses: u64 = store
296 .values()
297 .map(|result| result.access_count as u64)
298 .sum();
299
300 let hit_rate = if total_keys > 0 {
301 total_accesses as f64 / total_keys as f64
302 } else {
303 0.0
304 };
305
306 let average_ttl = if total_keys > 0 {
307 let total_ttl: i64 = store
308 .values()
309 .map(|result| (result.expires_at - result.stored_at).num_hours())
310 .sum();
311 total_ttl as f64 / total_keys as f64
312 } else {
313 0.0
314 };
315
316 Ok(IdempotencyStats {
317 total_keys,
318 expired_keys,
319 hit_rate,
320 average_ttl_hours: average_ttl,
321 })
322 }
323}
324
325pub mod helpers {
327 use super::*;
328
329 pub async fn check_idempotency(
331 store: &InMemoryIdempotencyStore,
332 idem_key: &IdempotencyKey,
333 ) -> Result<Option<IdempotentExecutionResult>, IdempotencyError> {
334 if let Some(cached_result) = store.get_result(idem_key).await? {
335 return Ok(Some(cached_result.result));
336 }
337 Ok(None)
338 }
339
340 pub async fn execute_idempotent<T, F, Fut>(
342 store: &InMemoryIdempotencyStore,
343 idem_key: IdempotencyKey,
344 intent_id: String,
345 ttl_hours: u32,
346 operation: F,
347 ) -> Result<IdempotentExecutionResult, IdempotencyError>
348 where
349 F: FnOnce() -> Fut,
350 Fut: std::future::Future<Output = Result<serde_json::Value, (String, String)>>,
351 {
352 if let Some(existing) = store.get_result(&idem_key).await? {
354 return Ok(existing.result);
355 }
356
357 let result = match operation().await {
359 Ok(output) => IdempotentResult::success(idem_key.clone(), intent_id, output, ttl_hours),
360 Err((code, message)) => {
361 IdempotentResult::error(idem_key.clone(), intent_id, code, message, ttl_hours)
362 }
363 };
364
365 store.upsert_result(result.clone()).await?;
367
368 Ok(result.result)
369 }
370}
371
// Unit tests for key generation/parsing, result records, the in-memory store
// and the helper functions.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Same inputs give the same key, a different step gives a different key,
    // and the key has the `idem_` + 16 hex shape (21 bytes).
    #[test]
    fn test_idempotency_key_generation() {
        let key1 = IdempotencyKey::generate("run123", "episode1", 5);
        let key2 = IdempotencyKey::generate("run123", "episode1", 5);
        let key3 = IdempotencyKey::generate("run123", "episode1", 6);

        assert_eq!(key1, key2);

        assert_ne!(key1, key3);

        assert!(key1.as_str().starts_with("idem_"));
        assert_eq!(key1.as_str().len(), 21);
    }

    // Keys built from intent metadata have the same shape as generated keys.
    #[test]
    fn test_idempotency_key_from_intent() {
        let key =
            IdempotencyKey::from_intent_metadata("intent123", "actor456", "episode789", Some(10));

        assert!(key.as_str().starts_with("idem_"));
        assert_eq!(key.as_str().len(), 21);
    }

    // A generated key round-trips through its string form; strings with a
    // missing prefix, a missing hex part or non-hex characters are rejected.
    #[test]
    fn test_idempotency_key_parsing() {
        let original_key = IdempotencyKey::generate("test", "episode", 1);
        let key_string = original_key.to_string();

        let parsed_key = IdempotencyKey::from_string(key_string).unwrap();
        assert_eq!(original_key, parsed_key);

        assert!(IdempotencyKey::from_string("invalid".to_string()).is_err());
        assert!(IdempotencyKey::from_string("idem_".to_string()).is_err());
        assert!(IdempotencyKey::from_string("idem_gggg".to_string()).is_err());
    }

    // Insert, read back (which bumps the access count to 1), re-insert (not
    // new) and check the resulting stats.
    #[tokio::test]
    async fn test_in_memory_idempotency_store() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);
        let result = IdempotentResult::success(
            key.clone(),
            "intent123".to_string(),
            json!({"status": "ok"}),
            24,
        );

        let is_new = store.upsert_result(result.clone()).await.unwrap();
        assert!(is_new);

        let retrieved = store.get_result(&key).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.intent_id, "intent123");
        assert_eq!(retrieved.access_count, 1);
        let is_new = store.upsert_result(result).await.unwrap();
        assert!(!is_new);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
        assert_eq!(stats.expired_keys, 0);
    }

    // A record whose expiry lies one hour in the past reports as expired.
    #[tokio::test]
    async fn test_idempotent_result_expiration() {
        let key = IdempotencyKey::generate("test", "episode", 1);
        let mut result = IdempotentResult::success(
            key,
            "intent123".to_string(),
            json!({"status": "ok"}),
            0,
        );

        // Force the expiry into the past instead of waiting for time to pass.
        result.expires_at = chrono::Utc::now() - chrono::Duration::hours(1);

        assert!(result.is_expired());
    }

    // Cleanup removes the one expired record and leaves the valid one.
    #[tokio::test]
    async fn test_cleanup_expired() {
        let store = InMemoryIdempotencyStore::new();
        let key1 = IdempotencyKey::generate("test1", "episode", 1);
        let key2 = IdempotencyKey::generate("test2", "episode", 1);

        let mut expired_result = IdempotentResult::success(
            key1,
            "intent1".to_string(),
            json!({"status": "expired"}),
            24,
        );
        expired_result.expires_at = chrono::Utc::now() - chrono::Duration::hours(1);

        let valid_result =
            IdempotentResult::success(key2, "intent2".to_string(), json!({"status": "valid"}), 24);

        store.upsert_result(expired_result).await.unwrap();
        store.upsert_result(valid_result).await.unwrap();

        let cleaned = store.cleanup_expired().await.unwrap();
        assert_eq!(cleaned, 1);

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 1);
    }

    // Nothing is cached at first; after `execute_idempotent` runs the
    // operation, its success value is returned and a cached entry exists.
    #[tokio::test]
    async fn test_idempotency_helpers() {
        use helpers::*;

        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);

        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_none());

        let result = execute_idempotent::<String, _, _>(
            &store,
            key.clone(),
            "intent123".to_string(),
            24,
            || async { Ok(json!({"computed": "value"})) },
        )
        .await
        .unwrap();

        match result {
            IdempotentExecutionResult::Success(value) => {
                assert_eq!(value, json!({"computed": "value"}));
            }
            _ => panic!("Expected success result"),
        }

        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_some());
    }

    // Success and Error variants survive a JSON round trip.
    #[test]
    fn test_idempotent_execution_result_serialization() {
        let success_result = IdempotentExecutionResult::Success(json!({"key": "value"}));
        let error_result = IdempotentExecutionResult::Error {
            code: "TEST_ERROR".to_string(),
            message: "Test error message".to_string(),
        };

        let success_json = serde_json::to_string(&success_result).unwrap();
        let success_deserialized: IdempotentExecutionResult =
            serde_json::from_str(&success_json).unwrap();

        match success_deserialized {
            IdempotentExecutionResult::Success(value) => {
                assert_eq!(value, json!({"key": "value"}));
            }
            _ => panic!("Expected success result"),
        }

        let error_json = serde_json::to_string(&error_result).unwrap();
        let error_deserialized: IdempotentExecutionResult =
            serde_json::from_str(&error_json).unwrap();

        match error_deserialized {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "TEST_ERROR");
                assert_eq!(message, "Test error message");
            }
            _ => panic!("Expected error result"),
        }
    }

    // An empty store reports zero for every statistic.
    #[tokio::test]
    async fn test_idempotency_stats_empty_store() {
        let store = InMemoryIdempotencyStore::new();
        let stats = store.stats().await.unwrap();

        assert_eq!(stats.total_keys, 0);
        assert_eq!(stats.expired_keys, 0);
        assert_eq!(stats.hit_rate, 0.0);
        assert_eq!(stats.average_ttl_hours, 0.0);
    }

    // Two records with access counts 5 and 0 and TTLs of 24 h and 12 h:
    // hit_rate = (5 + 0) / 2 = 2.5, average TTL = (24 + 12) / 2 = 18.
    #[tokio::test]
    async fn test_idempotency_stats_with_data() {
        let store = InMemoryIdempotencyStore::new();
        let key1 = IdempotencyKey::generate("test1", "episode", 1);
        let key2 = IdempotencyKey::generate("test2", "episode", 1);

        let mut result1 =
            IdempotentResult::success(key1, "intent1".to_string(), json!({"status": "ok"}), 24);
        result1.access_count = 5;
        let result2 =
            IdempotentResult::success(key2, "intent2".to_string(), json!({"status": "ok"}), 12);

        store.upsert_result(result1).await.unwrap();
        store.upsert_result(result2).await.unwrap();

        let stats = store.stats().await.unwrap();
        assert_eq!(stats.total_keys, 2);
        assert_eq!(stats.expired_keys, 0);
        assert_eq!(stats.hit_rate, 2.5);
        assert_eq!(stats.average_ttl_hours, 18.0);
    }

    // A failing operation is returned as an Error outcome (inside `Ok`) and
    // the same outcome is cached.
    #[tokio::test]
    async fn test_execute_idempotent_error_case() {
        use helpers::*;

        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test_error", "episode", 1);

        let result = execute_idempotent::<String, _, _>(
            &store,
            key.clone(),
            "intent123".to_string(),
            24,
            || async { Err(("ERROR_CODE".to_string(), "Error message".to_string())) },
        )
        .await
        .unwrap();

        match result {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "ERROR_CODE");
                assert_eq!(message, "Error message");
            }
            _ => panic!("Expected error result"),
        }

        let cached = check_idempotency(&store, &key).await.unwrap();
        assert!(cached.is_some());
        match cached.unwrap() {
            IdempotentExecutionResult::Error { code, message } => {
                assert_eq!(code, "ERROR_CODE");
                assert_eq!(message, "Error message");
            }
            _ => panic!("Expected cached error result"),
        }
    }

    // The derived Debug output names the type and includes the intent id.
    #[test]
    fn test_idempotent_result_debug_format() {
        let key = IdempotencyKey::generate("test", "episode", 1);
        let result =
            IdempotentResult::success(key, "intent123".to_string(), json!({"status": "ok"}), 24);

        let debug_str = format!("{:?}", result);
        assert!(debug_str.contains("IdempotentResult"));
        assert!(debug_str.contains("intent123"));
    }

    // Each error variant renders its expected Display message; Debug output
    // contains the variant name.
    #[test]
    fn test_idempotency_error_types() {
        let store_error = IdempotencyError::StoreError("Storage failed".to_string());
        let invalid_format_error = IdempotencyError::InvalidFormat("invalid-key".to_string());
        let duplicate_error = IdempotencyError::DuplicateExecution {
            key: "test-key".to_string(),
        };
        let not_found_error = IdempotencyError::KeyNotFound {
            key: "missing-key".to_string(),
        };

        assert!(format!("{}", store_error).contains("Idempotency store error"));
        assert!(format!("{}", invalid_format_error).contains("Invalid idempotency key format"));
        assert!(format!("{}", duplicate_error).contains("Duplicate execution detected"));
        assert!(format!("{}", not_found_error).contains("Idempotency key not found"));

        let debug_str = format!("{:?}", store_error);
        assert!(debug_str.contains("StoreError"));
    }

    // The first upsert for a key reports "new"; the second does not.
    #[tokio::test]
    async fn test_upsert_result_returns_correct_bool() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("test", "episode", 1);

        let result = IdempotentResult::success(
            key.clone(),
            "intent123".to_string(),
            json!({"status": "ok"}),
            24,
        );

        let is_new = store.upsert_result(result.clone()).await.unwrap();
        assert!(is_new);

        let is_new = store.upsert_result(result).await.unwrap();
        assert!(!is_new);
    }

    // Looking up a key that was never stored yields `None`, not an error.
    #[tokio::test]
    async fn test_get_result_nonexistent_key() {
        let store = InMemoryIdempotencyStore::new();
        let key = IdempotencyKey::generate("nonexistent", "episode", 1);

        let result = store.get_result(&key).await.unwrap();
        assert!(result.is_none());
    }

    // A string without the `idem_` prefix is rejected.
    #[test]
    fn test_idempotency_key_parsing_invalid() {
        let invalid_key = "invalid-format".to_string();
        let result = IdempotencyKey::from_string(invalid_key);
        assert!(result.is_err());
    }

    // Stats survive a JSON round trip with all fields intact.
    #[test]
    fn test_idempotency_stats_serialization() {
        let stats = IdempotencyStats {
            total_keys: 10,
            expired_keys: 2,
            hit_rate: 1.5,
            average_ttl_hours: 24.0,
        };

        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: IdempotencyStats = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.total_keys, 10);
        assert_eq!(deserialized.expired_keys, 2);
        assert_eq!(deserialized.hit_rate, 1.5);
        assert_eq!(deserialized.average_ttl_hours, 24.0);
    }
}