Skip to main content

turul_mcp_task_storage/
in_memory.rs

1//! In-memory task storage backend.
2//!
3//! Suitable for development, testing, and single-instance deployments.
4//! Tasks are stored in a `HashMap` behind an `RwLock`.
5
6use crate::error::TaskStorageError;
7use crate::state_machine;
8use crate::traits::{TaskListPage, TaskOutcome, TaskRecord, TaskStorage};
9use async_trait::async_trait;
10use chrono::Utc;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use turul_mcp_protocol::TaskStatus;
15use uuid::Uuid;
16
/// Tunable limits for the in-memory task storage backend.
#[derive(Clone, Debug)]
pub struct InMemoryTaskConfig {
    /// Upper bound on the number of stored tasks; `0` disables the bound.
    pub max_tasks: usize,
    /// Page size used by list operations when the caller supplies no limit.
    pub default_page_size: u32,
}

impl Default for InMemoryTaskConfig {
    /// Defaults to at most 10 000 stored tasks and 50 tasks per page.
    fn default() -> Self {
        InMemoryTaskConfig {
            default_page_size: 50,
            max_tasks: 10_000,
        }
    }
}
34
35/// In-memory task storage backend.
36///
37/// Uses `Arc<RwLock<HashMap>>` for concurrent access.
38/// Task IDs are UUID v7 for temporal ordering.
39#[derive(Clone)]
40pub struct InMemoryTaskStorage {
41    tasks: Arc<RwLock<HashMap<String, TaskRecord>>>,
42    config: InMemoryTaskConfig,
43}
44
45impl InMemoryTaskStorage {
46    /// Create a new in-memory task storage with default configuration.
47    pub fn new() -> Self {
48        Self {
49            tasks: Arc::new(RwLock::new(HashMap::new())),
50            config: InMemoryTaskConfig::default(),
51        }
52    }
53
54    /// Create a new in-memory task storage with custom configuration.
55    pub fn with_config(config: InMemoryTaskConfig) -> Self {
56        Self {
57            tasks: Arc::new(RwLock::new(HashMap::new())),
58            config,
59        }
60    }
61
62    /// Generate a new task ID using UUID v7 (temporal ordering).
63    pub fn generate_task_id() -> String {
64        Uuid::now_v7().as_simple().to_string()
65    }
66
67    fn now_iso8601() -> String {
68        Utc::now().to_rfc3339()
69    }
70}
71
72impl Default for InMemoryTaskStorage {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78#[async_trait]
79impl TaskStorage for InMemoryTaskStorage {
80    fn backend_name(&self) -> &'static str {
81        "in-memory"
82    }
83
84    async fn create_task(&self, mut task: TaskRecord) -> Result<TaskRecord, TaskStorageError> {
85        let mut tasks = self.tasks.write().await;
86
87        if self.config.max_tasks > 0 && tasks.len() >= self.config.max_tasks {
88            return Err(TaskStorageError::MaxTasksReached(self.config.max_tasks));
89        }
90
91        // Ensure timestamps are set
92        if task.created_at.is_empty() {
93            task.created_at = Self::now_iso8601();
94        }
95        if task.last_updated_at.is_empty() {
96            task.last_updated_at = task.created_at.clone();
97        }
98
99        tasks.insert(task.task_id.clone(), task.clone());
100        Ok(task)
101    }
102
103    async fn get_task(&self, task_id: &str) -> Result<Option<TaskRecord>, TaskStorageError> {
104        let tasks = self.tasks.read().await;
105        Ok(tasks.get(task_id).cloned())
106    }
107
108    async fn update_task(&self, task: TaskRecord) -> Result<(), TaskStorageError> {
109        let mut tasks = self.tasks.write().await;
110        if !tasks.contains_key(&task.task_id) {
111            return Err(TaskStorageError::TaskNotFound(task.task_id.clone()));
112        }
113        tasks.insert(task.task_id.clone(), task);
114        Ok(())
115    }
116
117    async fn delete_task(&self, task_id: &str) -> Result<bool, TaskStorageError> {
118        let mut tasks = self.tasks.write().await;
119        Ok(tasks.remove(task_id).is_some())
120    }
121
122    async fn list_tasks(
123        &self,
124        cursor: Option<&str>,
125        limit: Option<u32>,
126    ) -> Result<TaskListPage, TaskStorageError> {
127        let tasks = self.tasks.read().await;
128        let limit = limit.unwrap_or(self.config.default_page_size) as usize;
129
130        // Sort by (created_at, task_id) for deterministic ordering
131        let mut sorted: Vec<&TaskRecord> = tasks.values().collect();
132        sorted.sort_by(|a, b| {
133            a.created_at
134                .cmp(&b.created_at)
135                .then_with(|| a.task_id.cmp(&b.task_id))
136        });
137
138        // Apply cursor — find the cursor task_id, start after it.
139        // If cursor doesn't exist, start from the beginning (graceful degradation).
140        let start = if let Some(cursor_id) = cursor {
141            sorted
142                .iter()
143                .position(|t| t.task_id == cursor_id)
144                .map(|pos| pos + 1)
145                .unwrap_or(0)
146        } else {
147            0
148        };
149
150        let page: Vec<TaskRecord> = sorted
151            .iter()
152            .skip(start)
153            .take(limit)
154            .map(|t| (*t).clone())
155            .collect();
156
157        let next_cursor = if start + limit < sorted.len() {
158            page.last().map(|t| t.task_id.clone())
159        } else {
160            None
161        };
162
163        Ok(TaskListPage {
164            tasks: page,
165            next_cursor,
166        })
167    }
168
169    async fn update_task_status(
170        &self,
171        task_id: &str,
172        new_status: TaskStatus,
173        status_message: Option<String>,
174    ) -> Result<TaskRecord, TaskStorageError> {
175        let mut tasks = self.tasks.write().await;
176
177        let task = tasks
178            .get_mut(task_id)
179            .ok_or_else(|| TaskStorageError::TaskNotFound(task_id.to_string()))?;
180
181        // Validate state machine transition
182        state_machine::validate_transition(task.status, new_status)?;
183
184        task.status = new_status;
185        task.status_message = status_message;
186        task.last_updated_at = Self::now_iso8601();
187
188        Ok(task.clone())
189    }
190
191    async fn store_task_result(
192        &self,
193        task_id: &str,
194        result: TaskOutcome,
195    ) -> Result<(), TaskStorageError> {
196        let mut tasks = self.tasks.write().await;
197
198        let task = tasks
199            .get_mut(task_id)
200            .ok_or_else(|| TaskStorageError::TaskNotFound(task_id.to_string()))?;
201
202        task.result = Some(result);
203        task.last_updated_at = Self::now_iso8601();
204
205        Ok(())
206    }
207
208    async fn get_task_result(
209        &self,
210        task_id: &str,
211    ) -> Result<Option<TaskOutcome>, TaskStorageError> {
212        let tasks = self.tasks.read().await;
213
214        let task = tasks
215            .get(task_id)
216            .ok_or_else(|| TaskStorageError::TaskNotFound(task_id.to_string()))?;
217
218        Ok(task.result.clone())
219    }
220
221    async fn expire_tasks(&self) -> Result<Vec<String>, TaskStorageError> {
222        let mut tasks = self.tasks.write().await;
223        let now = Utc::now();
224        let mut expired = Vec::new();
225
226        // Collect IDs of expired tasks
227        let to_expire: Vec<String> = tasks
228            .values()
229            .filter(|t| {
230                if let Some(ttl) = t.ttl
231                    && let Ok(created) = chrono::DateTime::parse_from_rfc3339(&t.created_at)
232                {
233                    let expiry = created.with_timezone(&Utc) + chrono::Duration::milliseconds(ttl);
234                    return now > expiry;
235                }
236                false
237            })
238            .map(|t| t.task_id.clone())
239            .collect();
240
241        for id in to_expire {
242            tasks.remove(&id);
243            expired.push(id);
244        }
245
246        Ok(expired)
247    }
248
249    async fn task_count(&self) -> Result<usize, TaskStorageError> {
250        let tasks = self.tasks.read().await;
251        Ok(tasks.len())
252    }
253
254    async fn maintenance(&self) -> Result<(), TaskStorageError> {
255        self.expire_tasks().await?;
256        Ok(())
257    }
258
259    async fn list_tasks_for_session(
260        &self,
261        session_id: &str,
262        cursor: Option<&str>,
263        limit: Option<u32>,
264    ) -> Result<TaskListPage, TaskStorageError> {
265        let tasks = self.tasks.read().await;
266        let limit = limit.unwrap_or(self.config.default_page_size) as usize;
267
268        // Filter by session_id, sort by (created_at, task_id) for deterministic ordering
269        let mut sorted: Vec<&TaskRecord> = tasks
270            .values()
271            .filter(|t| t.session_id.as_deref() == Some(session_id))
272            .collect();
273        sorted.sort_by(|a, b| {
274            a.created_at
275                .cmp(&b.created_at)
276                .then_with(|| a.task_id.cmp(&b.task_id))
277        });
278
279        let start = if let Some(cursor_id) = cursor {
280            sorted
281                .iter()
282                .position(|t| t.task_id == cursor_id)
283                .map(|pos| pos + 1)
284                .unwrap_or(0)
285        } else {
286            0
287        };
288
289        let page: Vec<TaskRecord> = sorted
290            .iter()
291            .skip(start)
292            .take(limit)
293            .map(|t| (*t).clone())
294            .collect();
295
296        let next_cursor = if start + limit < sorted.len() {
297            page.last().map(|t| t.task_id.clone())
298        } else {
299            None
300        };
301
302        Ok(TaskListPage {
303            tasks: page,
304            next_cursor,
305        })
306    }
307
308    async fn recover_stuck_tasks(&self, max_age_ms: u64) -> Result<Vec<String>, TaskStorageError> {
309        let mut tasks = self.tasks.write().await;
310        let now = Utc::now();
311        let mut recovered = Vec::new();
312
313        for task in tasks.values_mut() {
314            if state_machine::is_terminal(task.status) {
315                continue;
316            }
317
318            if let Ok(created) = chrono::DateTime::parse_from_rfc3339(&task.last_updated_at) {
319                let age_ms = (now - created.with_timezone(&Utc)).num_milliseconds();
320                if age_ms > max_age_ms as i64 {
321                    task.status = TaskStatus::Failed;
322                    task.status_message = Some("Server restarted — task interrupted".to_string());
323                    task.last_updated_at = Self::now_iso8601();
324                    recovered.push(task.task_id.clone());
325                }
326            }
327        }
328
329        Ok(recovered)
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336    use serde_json::json;
337
338    fn make_task(task_id: &str, session_id: Option<&str>) -> TaskRecord {
339        TaskRecord {
340            task_id: task_id.to_string(),
341            session_id: session_id.map(|s| s.to_string()),
342            status: TaskStatus::Working,
343            status_message: None,
344            created_at: Utc::now().to_rfc3339(),
345            last_updated_at: Utc::now().to_rfc3339(),
346            ttl: None,
347            poll_interval: None,
348            original_method: "tools/call".to_string(),
349            original_params: None,
350            result: None,
351            meta: None,
352        }
353    }
354
355    fn make_task_with_time(task_id: &str, created_at: &str) -> TaskRecord {
356        TaskRecord {
357            task_id: task_id.to_string(),
358            session_id: None,
359            status: TaskStatus::Working,
360            status_message: None,
361            created_at: created_at.to_string(),
362            last_updated_at: created_at.to_string(),
363            ttl: None,
364            poll_interval: None,
365            original_method: "tools/call".to_string(),
366            original_params: None,
367            result: None,
368            meta: None,
369        }
370    }
371
372    #[tokio::test]
373    async fn test_create_and_get_task() {
374        let storage = InMemoryTaskStorage::new();
375        let task = make_task("task-1", None);
376
377        let created = storage.create_task(task).await.unwrap();
378        assert_eq!(created.task_id, "task-1");
379        assert_eq!(created.status, TaskStatus::Working);
380
381        let fetched = storage.get_task("task-1").await.unwrap();
382        assert!(fetched.is_some());
383        assert_eq!(fetched.unwrap().task_id, "task-1");
384    }
385
386    #[tokio::test]
387    async fn test_get_nonexistent_task() {
388        let storage = InMemoryTaskStorage::new();
389        let result = storage.get_task("nonexistent").await.unwrap();
390        assert!(result.is_none());
391    }
392
393    #[tokio::test]
394    async fn test_task_lifecycle() {
395        let storage = InMemoryTaskStorage::new();
396        let task = make_task("task-life", None);
397        storage.create_task(task).await.unwrap();
398
399        // Working -> Completed
400        let updated = storage
401            .update_task_status("task-life", TaskStatus::Completed, Some("Done".to_string()))
402            .await
403            .unwrap();
404        assert_eq!(updated.status, TaskStatus::Completed);
405        assert_eq!(updated.status_message, Some("Done".to_string()));
406
407        // Verify stored
408        let fetched = storage.get_task("task-life").await.unwrap().unwrap();
409        assert_eq!(fetched.status, TaskStatus::Completed);
410    }
411
412    #[tokio::test]
413    async fn test_task_cancellation() {
414        let storage = InMemoryTaskStorage::new();
415        let task = make_task("task-cancel", None);
416        storage.create_task(task).await.unwrap();
417
418        let updated = storage
419            .update_task_status(
420                "task-cancel",
421                TaskStatus::Cancelled,
422                Some("User cancelled".to_string()),
423            )
424            .await
425            .unwrap();
426        assert_eq!(updated.status, TaskStatus::Cancelled);
427    }
428
429    #[tokio::test]
430    async fn test_invalid_state_transition() {
431        let storage = InMemoryTaskStorage::new();
432        let task = make_task("task-invalid", None);
433        storage.create_task(task).await.unwrap();
434
435        // Complete the task
436        storage
437            .update_task_status("task-invalid", TaskStatus::Completed, None)
438            .await
439            .unwrap();
440
441        // Completed -> Working should fail
442        let result = storage
443            .update_task_status("task-invalid", TaskStatus::Working, None)
444            .await;
445        assert!(result.is_err());
446        match result.unwrap_err() {
447            TaskStorageError::TerminalState(s) => assert_eq!(s, TaskStatus::Completed),
448            other => panic!("Expected TerminalState, got: {:?}", other),
449        }
450    }
451
452    #[tokio::test]
453    async fn test_result_storage() {
454        let storage = InMemoryTaskStorage::new();
455        let task = make_task("task-result", None);
456        storage.create_task(task).await.unwrap();
457
458        let outcome = TaskOutcome::Success(json!({"content": [{"type": "text", "text": "done"}]}));
459        storage
460            .store_task_result("task-result", outcome)
461            .await
462            .unwrap();
463
464        let result = storage.get_task_result("task-result").await.unwrap();
465        assert!(result.is_some());
466        match result.unwrap() {
467            TaskOutcome::Success(v) => {
468                assert_eq!(v["content"][0]["text"], "done");
469            }
470            _ => panic!("Expected Success"),
471        }
472    }
473
474    #[tokio::test]
475    async fn test_error_result_storage() {
476        let storage = InMemoryTaskStorage::new();
477        let task = make_task("task-err", None);
478        storage.create_task(task).await.unwrap();
479
480        let outcome = TaskOutcome::Error {
481            code: -32010,
482            message: "Tool failed".to_string(),
483            data: Some(json!({"detail": "oops"})),
484        };
485        storage
486            .store_task_result("task-err", outcome)
487            .await
488            .unwrap();
489
490        let result = storage.get_task_result("task-err").await.unwrap().unwrap();
491        match result {
492            TaskOutcome::Error {
493                code,
494                message,
495                data,
496            } => {
497                assert_eq!(code, -32010);
498                assert_eq!(message, "Tool failed");
499                assert_eq!(data.unwrap()["detail"], "oops");
500            }
501            _ => panic!("Expected Error"),
502        }
503    }
504
505    #[tokio::test]
506    async fn test_ttl_expiry() {
507        let storage = InMemoryTaskStorage::new();
508
509        // Create task with very short TTL and old timestamp
510        let mut task = make_task("task-expire", None);
511        task.ttl = Some(1); // 1ms TTL
512        task.created_at = "2020-01-01T00:00:00Z".to_string();
513        storage.create_task(task).await.unwrap();
514
515        // Also create a task without TTL
516        let task2 = make_task("task-keep", None);
517        storage.create_task(task2).await.unwrap();
518
519        let expired = storage.expire_tasks().await.unwrap();
520        assert_eq!(expired.len(), 1);
521        assert_eq!(expired[0], "task-expire");
522
523        // Verify expired task is gone
524        assert!(storage.get_task("task-expire").await.unwrap().is_none());
525        // Verify other task still exists
526        assert!(storage.get_task("task-keep").await.unwrap().is_some());
527    }
528
529    #[tokio::test]
530    async fn test_pagination() {
531        let storage = InMemoryTaskStorage::new();
532
533        // Create tasks with sequential timestamps for consistent ordering
534        for i in 0..5 {
535            let task =
536                make_task_with_time(&format!("task-{}", i), &format!("2025-01-01T00:00:0{}Z", i));
537            storage.create_task(task).await.unwrap();
538        }
539
540        // Page 1: limit 2
541        let page1 = storage.list_tasks(None, Some(2)).await.unwrap();
542        assert_eq!(page1.tasks.len(), 2);
543        assert_eq!(page1.tasks[0].task_id, "task-0");
544        assert_eq!(page1.tasks[1].task_id, "task-1");
545        assert!(page1.next_cursor.is_some());
546
547        // Page 2: using cursor from page 1
548        let page2 = storage
549            .list_tasks(page1.next_cursor.as_deref(), Some(2))
550            .await
551            .unwrap();
552        assert_eq!(page2.tasks.len(), 2);
553        assert_eq!(page2.tasks[0].task_id, "task-2");
554        assert_eq!(page2.tasks[1].task_id, "task-3");
555
556        // Page 3: last page
557        let page3 = storage
558            .list_tasks(page2.next_cursor.as_deref(), Some(2))
559            .await
560            .unwrap();
561        assert_eq!(page3.tasks.len(), 1);
562        assert_eq!(page3.tasks[0].task_id, "task-4");
563        assert!(page3.next_cursor.is_none());
564    }
565
566    #[tokio::test]
567    async fn test_session_binding() {
568        let storage = InMemoryTaskStorage::new();
569
570        storage
571            .create_task(make_task("task-a", Some("session-1")))
572            .await
573            .unwrap();
574        storage
575            .create_task(make_task("task-b", Some("session-1")))
576            .await
577            .unwrap();
578        storage
579            .create_task(make_task("task-c", Some("session-2")))
580            .await
581            .unwrap();
582
583        let session1_tasks = storage
584            .list_tasks_for_session("session-1", None, None)
585            .await
586            .unwrap();
587        assert_eq!(session1_tasks.tasks.len(), 2);
588
589        let session2_tasks = storage
590            .list_tasks_for_session("session-2", None, None)
591            .await
592            .unwrap();
593        assert_eq!(session2_tasks.tasks.len(), 1);
594        assert_eq!(session2_tasks.tasks[0].task_id, "task-c");
595
596        let empty = storage
597            .list_tasks_for_session("session-3", None, None)
598            .await
599            .unwrap();
600        assert_eq!(empty.tasks.len(), 0);
601    }
602
603    #[tokio::test]
604    async fn test_delete_task() {
605        let storage = InMemoryTaskStorage::new();
606        storage
607            .create_task(make_task("task-del", None))
608            .await
609            .unwrap();
610
611        assert!(storage.delete_task("task-del").await.unwrap());
612        assert!(!storage.delete_task("task-del").await.unwrap()); // Already deleted
613        assert!(storage.get_task("task-del").await.unwrap().is_none());
614    }
615
616    #[tokio::test]
617    async fn test_task_count() {
618        let storage = InMemoryTaskStorage::new();
619        assert_eq!(storage.task_count().await.unwrap(), 0);
620
621        storage
622            .create_task(make_task("task-1", None))
623            .await
624            .unwrap();
625        assert_eq!(storage.task_count().await.unwrap(), 1);
626
627        storage
628            .create_task(make_task("task-2", None))
629            .await
630            .unwrap();
631        assert_eq!(storage.task_count().await.unwrap(), 2);
632
633        storage.delete_task("task-1").await.unwrap();
634        assert_eq!(storage.task_count().await.unwrap(), 1);
635    }
636
637    #[tokio::test]
638    async fn test_max_tasks_limit() {
639        let config = InMemoryTaskConfig {
640            max_tasks: 2,
641            ..Default::default()
642        };
643        let storage = InMemoryTaskStorage::with_config(config);
644
645        storage
646            .create_task(make_task("task-1", None))
647            .await
648            .unwrap();
649        storage
650            .create_task(make_task("task-2", None))
651            .await
652            .unwrap();
653
654        let result = storage.create_task(make_task("task-3", None)).await;
655        assert!(result.is_err());
656        match result.unwrap_err() {
657            TaskStorageError::MaxTasksReached(n) => assert_eq!(n, 2),
658            other => panic!("Expected MaxTasksReached, got: {:?}", other),
659        }
660    }
661
662    #[tokio::test]
663    async fn test_recover_stuck_tasks() {
664        let storage = InMemoryTaskStorage::new();
665
666        // Create a "stuck" working task with old timestamp
667        let mut stuck = make_task("task-stuck", None);
668        stuck.last_updated_at = "2020-01-01T00:00:00Z".to_string();
669        storage.create_task(stuck).await.unwrap();
670
671        // Create a recent working task that should NOT be recovered
672        let recent = make_task("task-recent", None);
673        storage.create_task(recent).await.unwrap();
674
675        // Create a completed task that should NOT be touched
676        let mut completed = make_task("task-done", None);
677        completed.status = TaskStatus::Completed;
678        completed.last_updated_at = "2020-01-01T00:00:00Z".to_string();
679        storage.create_task(completed).await.unwrap();
680
681        // Recover with 5 minute threshold
682        let recovered = storage.recover_stuck_tasks(300_000).await.unwrap();
683        assert_eq!(recovered.len(), 1);
684        assert_eq!(recovered[0], "task-stuck");
685
686        // Verify stuck task is now Failed
687        let task = storage.get_task("task-stuck").await.unwrap().unwrap();
688        assert_eq!(task.status, TaskStatus::Failed);
689        assert_eq!(
690            task.status_message,
691            Some("Server restarted — task interrupted".to_string())
692        );
693
694        // Verify recent task is still Working
695        let recent = storage.get_task("task-recent").await.unwrap().unwrap();
696        assert_eq!(recent.status, TaskStatus::Working);
697
698        // Verify completed task is untouched
699        let done = storage.get_task("task-done").await.unwrap().unwrap();
700        assert_eq!(done.status, TaskStatus::Completed);
701    }
702
703    #[tokio::test]
704    async fn test_to_protocol_task() {
705        let record = TaskRecord {
706            task_id: "task-proto".to_string(),
707            session_id: Some("sess-1".to_string()),
708            status: TaskStatus::Working,
709            status_message: Some("Processing".to_string()),
710            created_at: "2025-01-01T00:00:00Z".to_string(),
711            last_updated_at: "2025-01-01T00:00:01Z".to_string(),
712            ttl: Some(60000),
713            poll_interval: Some(5000),
714            original_method: "tools/call".to_string(),
715            original_params: None,
716            result: None,
717            meta: None,
718        };
719
720        let task = record.to_protocol_task();
721        assert_eq!(task.task_id, "task-proto");
722        assert_eq!(task.status, TaskStatus::Working);
723        assert_eq!(task.status_message, Some("Processing".to_string()));
724        assert_eq!(task.ttl, Some(60000));
725        assert_eq!(task.poll_interval, Some(5000));
726    }
727
728    #[tokio::test]
729    async fn test_task_outcome_serialization() {
730        let success = TaskOutcome::Success(json!({"content": []}));
731        let json = serde_json::to_string(&success).unwrap();
732        let parsed: TaskOutcome = serde_json::from_str(&json).unwrap();
733        match parsed {
734            TaskOutcome::Success(v) => assert!(v["content"].is_array()),
735            _ => panic!("Expected Success"),
736        }
737
738        let error = TaskOutcome::Error {
739            code: -32603,
740            message: "Internal error".to_string(),
741            data: None,
742        };
743        let json = serde_json::to_string(&error).unwrap();
744        let parsed: TaskOutcome = serde_json::from_str(&json).unwrap();
745        match parsed {
746            TaskOutcome::Error { code, message, .. } => {
747                assert_eq!(code, -32603);
748                assert_eq!(message, "Internal error");
749            }
750            _ => panic!("Expected Error"),
751        }
752    }
753
754    #[tokio::test]
755    async fn test_update_nonexistent_task() {
756        let storage = InMemoryTaskStorage::new();
757        let result = storage
758            .update_task_status("nonexistent", TaskStatus::Completed, None)
759            .await;
760        assert!(result.is_err());
761        match result.unwrap_err() {
762            TaskStorageError::TaskNotFound(id) => assert_eq!(id, "nonexistent"),
763            other => panic!("Expected TaskNotFound, got: {:?}", other),
764        }
765    }
766
767    #[tokio::test]
768    async fn test_input_required_transition() {
769        let storage = InMemoryTaskStorage::new();
770        storage
771            .create_task(make_task("task-ir", None))
772            .await
773            .unwrap();
774
775        // Working -> InputRequired
776        storage
777            .update_task_status(
778                "task-ir",
779                TaskStatus::InputRequired,
780                Some("Need user input".to_string()),
781            )
782            .await
783            .unwrap();
784
785        // InputRequired -> Working (resume)
786        storage
787            .update_task_status("task-ir", TaskStatus::Working, Some("Resuming".to_string()))
788            .await
789            .unwrap();
790
791        // Working -> Completed
792        storage
793            .update_task_status("task-ir", TaskStatus::Completed, None)
794            .await
795            .unwrap();
796    }
797
798    #[tokio::test]
799    async fn test_generate_task_id() {
800        let id1 = InMemoryTaskStorage::generate_task_id();
801        let id2 = InMemoryTaskStorage::generate_task_id();
802        assert_ne!(id1, id2);
803        // UUID v7 should be parseable
804        assert!(uuid::Uuid::parse_str(&id1).is_ok());
805    }
806
807    // === Parity tests ===
808
809    #[tokio::test]
810    async fn parity_create_and_retrieve() {
811        let storage = InMemoryTaskStorage::new();
812        crate::parity_tests::test_create_and_retrieve(&storage).await;
813    }
814
815    #[tokio::test]
816    async fn parity_state_machine_enforcement() {
817        let storage = InMemoryTaskStorage::new();
818        crate::parity_tests::test_state_machine_enforcement(&storage).await;
819    }
820
821    #[tokio::test]
822    async fn parity_terminal_state_rejection() {
823        let storage = InMemoryTaskStorage::new();
824        crate::parity_tests::test_terminal_state_rejection(&storage).await;
825    }
826
827    #[tokio::test]
828    async fn parity_cursor_determinism() {
829        let storage = InMemoryTaskStorage::new();
830        crate::parity_tests::test_cursor_determinism(&storage).await;
831    }
832
833    #[tokio::test]
834    async fn parity_session_scoping() {
835        let storage = InMemoryTaskStorage::new();
836        crate::parity_tests::test_session_scoping(&storage).await;
837    }
838
839    #[tokio::test]
840    async fn parity_ttl_expiry() {
841        let storage = InMemoryTaskStorage::new();
842        crate::parity_tests::test_ttl_expiry(&storage).await;
843    }
844
845    #[tokio::test]
846    async fn parity_task_result_round_trip() {
847        let storage = InMemoryTaskStorage::new();
848        crate::parity_tests::test_task_result_round_trip(&storage).await;
849    }
850
851    #[tokio::test]
852    async fn parity_recover_stuck_tasks() {
853        let storage = InMemoryTaskStorage::new();
854        crate::parity_tests::test_recover_stuck_tasks(&storage).await;
855    }
856
857    #[tokio::test]
858    async fn parity_max_tasks_limit() {
859        let storage = InMemoryTaskStorage::with_config(InMemoryTaskConfig {
860            max_tasks: 5,
861            ..Default::default()
862        });
863        crate::parity_tests::test_max_tasks_limit(&storage, 5).await;
864    }
865
866    #[tokio::test]
867    async fn parity_error_mapping() {
868        let storage = InMemoryTaskStorage::new();
869        crate::parity_tests::test_error_mapping_parity(&storage).await;
870    }
871
872    #[tokio::test]
873    async fn parity_concurrent_status_updates() {
874        let storage = std::sync::Arc::new(InMemoryTaskStorage::new());
875        crate::parity_tests::test_concurrent_status_updates(storage).await;
876    }
877}