Skip to main content

data_connector/
memory.rs

1//! In-memory storage implementations
2//!
3//! Used for development and testing - no persistence.
4//!
5//! Structure:
6//! 1. MemoryConversationStorage
7//! 2. MemoryConversationItemStorage
8//! 3. MemoryResponseStorage
9
10use std::{
11    collections::{BTreeMap, HashMap},
12    sync::Arc,
13};
14
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use parking_lot::RwLock;
18
19use super::core::*;
20
21// ============================================================================
22// PART 1: MemoryConversationStorage
23// ============================================================================
24
25/// In-memory conversation storage used for development and tests
26#[derive(Default, Clone)]
27pub struct MemoryConversationStorage {
28    inner: Arc<RwLock<HashMap<ConversationId, Conversation>>>,
29}
30
31impl MemoryConversationStorage {
32    pub fn new() -> Self {
33        Self {
34            inner: Arc::new(RwLock::new(HashMap::new())),
35        }
36    }
37}
38
39#[async_trait]
40impl ConversationStorage for MemoryConversationStorage {
41    async fn create_conversation(
42        &self,
43        input: NewConversation,
44    ) -> ConversationResult<Conversation> {
45        let conversation = Conversation::new(input);
46        self.inner
47            .write()
48            .insert(conversation.id.clone(), conversation.clone());
49        Ok(conversation)
50    }
51
52    async fn get_conversation(
53        &self,
54        id: &ConversationId,
55    ) -> ConversationResult<Option<Conversation>> {
56        Ok(self.inner.read().get(id).cloned())
57    }
58
59    async fn update_conversation(
60        &self,
61        id: &ConversationId,
62        metadata: Option<ConversationMetadata>,
63    ) -> ConversationResult<Option<Conversation>> {
64        let mut store = self.inner.write();
65        if let Some(entry) = store.get_mut(id) {
66            entry.metadata = metadata;
67            return Ok(Some(entry.clone()));
68        }
69
70        Ok(None)
71    }
72
73    async fn delete_conversation(&self, id: &ConversationId) -> ConversationResult<bool> {
74        let removed = self.inner.write().remove(id).is_some();
75        Ok(removed)
76    }
77}
78
79// ============================================================================
80// PART 2: MemoryConversationItemStorage
81// ============================================================================
82
83/// Internal store for conversation items, protected by a single lock to prevent
84/// lock ordering inversions between the three maps.
85#[derive(Default)]
86struct ConversationItemInner {
87    /// All items indexed by ID
88    items: HashMap<ConversationItemId, ConversationItem>,
89    /// Per-conversation sorted links: (timestamp, item_id_str) -> ConversationItemId
90    links: HashMap<ConversationId, BTreeMap<(i64, String), ConversationItemId>>,
91    /// Per-conversation reverse index: item_id_str -> (timestamp, item_id_str)
92    rev_index: HashMap<ConversationId, HashMap<String, (i64, String)>>,
93}
94
95#[derive(Default, Clone)]
96pub struct MemoryConversationItemStorage {
97    inner: Arc<RwLock<ConversationItemInner>>,
98}
99
100impl MemoryConversationItemStorage {
101    pub fn new() -> Self {
102        Self::default()
103    }
104}
105
106#[async_trait]
107impl ConversationItemStorage for MemoryConversationItemStorage {
108    async fn create_item(
109        &self,
110        new_item: NewConversationItem,
111    ) -> ConversationItemResult<ConversationItem> {
112        let id = new_item
113            .id
114            .clone()
115            .unwrap_or_else(|| make_item_id(&new_item.item_type));
116        let created_at = Utc::now();
117        let item = ConversationItem {
118            id: id.clone(),
119            response_id: new_item.response_id,
120            item_type: new_item.item_type,
121            role: new_item.role,
122            content: new_item.content,
123            status: new_item.status,
124            created_at,
125        };
126        self.inner.write().items.insert(id, item.clone());
127        Ok(item)
128    }
129
130    async fn link_item(
131        &self,
132        conversation_id: &ConversationId,
133        item_id: &ConversationItemId,
134        added_at: DateTime<Utc>,
135    ) -> ConversationItemResult<()> {
136        let mut store = self.inner.write();
137        store
138            .links
139            .entry(conversation_id.clone())
140            .or_default()
141            .insert((added_at.timestamp(), item_id.0.clone()), item_id.clone());
142        store
143            .rev_index
144            .entry(conversation_id.clone())
145            .or_default()
146            .insert(item_id.0.clone(), (added_at.timestamp(), item_id.0.clone()));
147        Ok(())
148    }
149
150    async fn link_items(
151        &self,
152        conversation_id: &ConversationId,
153        items: &[(ConversationItemId, DateTime<Utc>)],
154    ) -> ConversationItemResult<()> {
155        let mut store = self.inner.write();
156        let links = store.links.entry(conversation_id.clone()).or_default();
157        for (item_id, added_at) in items {
158            links.insert((added_at.timestamp(), item_id.0.clone()), item_id.clone());
159        }
160        let rev = store.rev_index.entry(conversation_id.clone()).or_default();
161        for (item_id, added_at) in items {
162            rev.insert(item_id.0.clone(), (added_at.timestamp(), item_id.0.clone()));
163        }
164        Ok(())
165    }
166
167    async fn list_items(
168        &self,
169        conversation_id: &ConversationId,
170        params: ListParams,
171    ) -> ConversationItemResult<Vec<ConversationItem>> {
172        let store = self.inner.read();
173        let map = match store.links.get(conversation_id) {
174            Some(m) => m,
175            None => return Ok(Vec::new()),
176        };
177
178        let after_key: Option<(i64, String)> = if let Some(after_id) = &params.after {
179            store
180                .rev_index
181                .get(conversation_id)
182                .and_then(|idx| idx.get(after_id).cloned())
183        } else {
184            None
185        };
186
187        let take = params.limit;
188        let mut results: Vec<ConversationItem> = Vec::new();
189
190        use std::ops::Bound::{Excluded, Unbounded};
191
192        let mut push_item = |key: &ConversationItemId| -> bool {
193            if let Some(it) = store.items.get(key) {
194                results.push(it.clone());
195                if results.len() == take {
196                    return true;
197                }
198            }
199            false
200        };
201
202        match (params.order, after_key) {
203            (SortOrder::Desc, Some(k)) => {
204                for ((_ts, _id), item_key) in map.range(..k).rev() {
205                    if push_item(item_key) {
206                        break;
207                    }
208                }
209            }
210            (SortOrder::Desc, None) => {
211                for ((_ts, _id), item_key) in map.iter().rev() {
212                    if push_item(item_key) {
213                        break;
214                    }
215                }
216            }
217            (SortOrder::Asc, Some(k)) => {
218                for ((_ts, _id), item_key) in map.range((Excluded(k), Unbounded)) {
219                    if push_item(item_key) {
220                        break;
221                    }
222                }
223            }
224            (SortOrder::Asc, None) => {
225                for ((_ts, _id), item_key) in map {
226                    if push_item(item_key) {
227                        break;
228                    }
229                }
230            }
231        }
232
233        Ok(results)
234    }
235
236    async fn get_item(
237        &self,
238        item_id: &ConversationItemId,
239    ) -> ConversationItemResult<Option<ConversationItem>> {
240        Ok(self.inner.read().items.get(item_id).cloned())
241    }
242
243    async fn is_item_linked(
244        &self,
245        conversation_id: &ConversationId,
246        item_id: &ConversationItemId,
247    ) -> ConversationItemResult<bool> {
248        let store = self.inner.read();
249        Ok(store
250            .rev_index
251            .get(conversation_id)
252            .is_some_and(|idx| idx.contains_key(&item_id.0)))
253    }
254
255    async fn delete_item(
256        &self,
257        conversation_id: &ConversationId,
258        item_id: &ConversationItemId,
259    ) -> ConversationItemResult<()> {
260        let mut store = self.inner.write();
261        let key_to_remove = store
262            .rev_index
263            .get_mut(conversation_id)
264            .and_then(|idx| idx.remove(&item_id.0));
265
266        if let Some(key) = key_to_remove {
267            if let Some(conv_links) = store.links.get_mut(conversation_id) {
268                conv_links.remove(&key);
269            }
270        }
271
272        Ok(())
273    }
274}
275
276// ============================================================================
277// PART 3: MemoryResponseStorage
278// ============================================================================
279
280/// Internal store structure holding both maps together
281#[derive(Default)]
282struct InnerStore {
283    /// All stored responses indexed by ID
284    responses: HashMap<ResponseId, StoredResponse>,
285    /// Index of response IDs by safety identifier
286    identifier_index: HashMap<String, Vec<ResponseId>>,
287}
288
289/// In-memory implementation of response storage
290pub struct MemoryResponseStorage {
291    /// Single lock wrapping both maps to prevent deadlocks and ensure atomic updates
292    store: Arc<RwLock<InnerStore>>,
293}
294
295impl MemoryResponseStorage {
296    pub fn new() -> Self {
297        Self {
298            store: Arc::new(RwLock::new(InnerStore::default())),
299        }
300    }
301
302    /// Get statistics about the store
303    #[cfg(test)]
304    pub(super) fn stats(&self) -> MemoryStoreStats {
305        let store = self.store.read();
306        MemoryStoreStats {
307            response_count: store.responses.len(),
308            identifier_count: store.identifier_index.len(),
309        }
310    }
311
312    /// Clear all data (useful for testing)
313    pub fn clear(&self) {
314        let mut store = self.store.write();
315        store.responses.clear();
316        store.identifier_index.clear();
317    }
318}
319
320impl Default for MemoryResponseStorage {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326#[async_trait]
327impl ResponseStorage for MemoryResponseStorage {
328    async fn store_response(&self, mut response: StoredResponse) -> ResponseResult<ResponseId> {
329        // Generate ID if not set
330        if response.id.0.is_empty() {
331            response.id = ResponseId::new();
332        }
333
334        let response_id = response.id.clone();
335
336        // Single lock acquisition for atomic update
337        let mut store = self.store.write();
338
339        // Update safety identifier index if specified
340        if let Some(ref safety_identifier) = response.safety_identifier {
341            store
342                .identifier_index
343                .entry(safety_identifier.clone())
344                .or_default()
345                .push(response_id.clone());
346        }
347
348        store.responses.insert(response_id.clone(), response);
349        tracing::debug!(
350            memory_store_size = store.responses.len(),
351            "Response stored in memory"
352        );
353
354        Ok(response_id)
355    }
356
357    async fn get_response(
358        &self,
359        response_id: &ResponseId,
360    ) -> ResponseResult<Option<StoredResponse>> {
361        let store = self.store.read();
362        let result = store.responses.get(response_id).cloned();
363        tracing::debug!(response_id = %response_id.0, found = result.is_some(), "Memory response lookup");
364        Ok(result)
365    }
366
367    async fn delete_response(&self, response_id: &ResponseId) -> ResponseResult<()> {
368        let mut store = self.store.write();
369
370        // Remove the response and update user index if needed
371        if let Some(response) = store.responses.remove(response_id) {
372            if let Some(ref safety_identifier) = response.safety_identifier {
373                if let Some(user_responses) = store.identifier_index.get_mut(safety_identifier) {
374                    user_responses.retain(|id| id != response_id);
375                }
376            }
377        }
378
379        Ok(())
380    }
381
382    async fn get_response_chain(
383        &self,
384        response_id: &ResponseId,
385        max_depth: Option<usize>,
386    ) -> ResponseResult<ResponseChain> {
387        let mut chain = ResponseChain::new();
388        let max_depth = max_depth.unwrap_or(100); // Default max depth to prevent infinite loops
389
390        // Single lock acquisition: walk the chain and collect responses atomically
391        // to prevent concurrent writers from causing silent data loss between reads.
392        let store = self.store.read();
393        let mut current_id = Some(response_id.clone());
394        let mut depth = 0;
395
396        while let Some(id) = current_id {
397            if depth >= max_depth {
398                break;
399            }
400
401            if let Some(response) = store.responses.get(&id) {
402                #[expect(
403                    clippy::assigning_clones,
404                    reason = "false positive: while-let moves out of current_id, making clone_from invalid"
405                )]
406                {
407                    current_id = response.previous_response_id.clone();
408                }
409                chain.add_response(response.clone());
410                depth += 1;
411            } else {
412                break;
413            }
414        }
415        drop(store);
416
417        // Reverse to get chronological order (oldest first)
418        chain.responses.reverse();
419
420        Ok(chain)
421    }
422
423    async fn list_identifier_responses(
424        &self,
425        identifier: &str,
426        limit: Option<usize>,
427    ) -> ResponseResult<Vec<StoredResponse>> {
428        let store = self.store.read();
429
430        if let Some(user_response_ids) = store.identifier_index.get(identifier) {
431            // Collect responses with their timestamps for sorting
432            let mut responses_with_time: Vec<_> = user_response_ids
433                .iter()
434                .filter_map(|id| store.responses.get(id).map(|r| (r.created_at, r)))
435                .collect();
436
437            // Sort by creation time (newest first)
438            responses_with_time.sort_by(|a, b| b.0.cmp(&a.0));
439
440            // Apply limit and collect the actual responses
441            let limit = limit.unwrap_or(responses_with_time.len());
442            let user_responses: Vec<StoredResponse> = responses_with_time
443                .into_iter()
444                .take(limit)
445                .map(|(_, r)| r.clone())
446                .collect();
447
448            Ok(user_responses)
449        } else {
450            Ok(Vec::new())
451        }
452    }
453
454    async fn delete_identifier_responses(&self, identifier: &str) -> ResponseResult<usize> {
455        let mut store = self.store.write();
456
457        if let Some(user_response_ids) = store.identifier_index.remove(identifier) {
458            let count = user_response_ids.len();
459            for id in user_response_ids {
460                store.responses.remove(&id);
461            }
462            Ok(count)
463        } else {
464            Ok(0)
465        }
466    }
467}
468
469/// Statistics for the memory store
470#[cfg(test)]
471#[derive(Debug, Clone)]
472pub(super) struct MemoryStoreStats {
473    pub response_count: usize,
474    pub identifier_count: usize,
475}
476
477#[cfg(test)]
478mod tests {
479    use chrono::{TimeZone, Utc};
480    use serde_json::json;
481
482    use super::*;
483
484    // ========================================================================
485    // ConversationItem Tests
486    // ========================================================================
487
488    fn make_item(
489        item_type: &str,
490        role: Option<&str>,
491        content: serde_json::Value,
492    ) -> NewConversationItem {
493        NewConversationItem {
494            id: None,
495            response_id: None,
496            item_type: item_type.to_string(),
497            role: role.map(|r| r.to_string()),
498            content,
499            status: Some("completed".to_string()),
500        }
501    }
502
503    #[tokio::test]
504    async fn test_list_ordering_and_cursors() {
505        let store = MemoryConversationItemStorage::new();
506        let conv: ConversationId = "conv_test".into();
507
508        // Create 3 items and link them at controlled timestamps
509        let i1 = store
510            .create_item(make_item("message", Some("user"), json!([])))
511            .await
512            .unwrap();
513        let i2 = store
514            .create_item(make_item("message", Some("assistant"), json!([])))
515            .await
516            .unwrap();
517        let i3 = store
518            .create_item(make_item("reasoning", None, json!([])))
519            .await
520            .unwrap();
521
522        let t1 = Utc.timestamp_opt(1_700_000_001, 0).single().unwrap();
523        let t2 = Utc.timestamp_opt(1_700_000_002, 0).single().unwrap();
524        let t3 = Utc.timestamp_opt(1_700_000_003, 0).single().unwrap();
525
526        store.link_item(&conv, &i1.id, t1).await.unwrap();
527        store.link_item(&conv, &i2.id, t2).await.unwrap();
528        store.link_item(&conv, &i3.id, t3).await.unwrap();
529
530        // Desc order, no cursor
531        let desc = store
532            .list_items(
533                &conv,
534                ListParams {
535                    limit: 2,
536                    order: SortOrder::Desc,
537                    after: None,
538                },
539            )
540            .await
541            .unwrap();
542        assert!(desc.len() >= 2);
543        assert_eq!(desc[0].id, i3.id);
544        assert_eq!(desc[1].id, i2.id);
545
546        // Desc with cursor = i2 -> expect i1 next
547        let desc_after = store
548            .list_items(
549                &conv,
550                ListParams {
551                    limit: 2,
552                    order: SortOrder::Desc,
553                    after: Some(i2.id.0.clone()),
554                },
555            )
556            .await
557            .unwrap();
558        assert!(!desc_after.is_empty());
559        assert_eq!(desc_after[0].id, i1.id);
560
561        // Asc order, no cursor
562        let asc = store
563            .list_items(
564                &conv,
565                ListParams {
566                    limit: 2,
567                    order: SortOrder::Asc,
568                    after: None,
569                },
570            )
571            .await
572            .unwrap();
573        assert!(asc.len() >= 2);
574        assert_eq!(asc[0].id, i1.id);
575        assert_eq!(asc[1].id, i2.id);
576
577        // Asc with cursor = i2 -> expect i3 next
578        let asc_after = store
579            .list_items(
580                &conv,
581                ListParams {
582                    limit: 2,
583                    order: SortOrder::Asc,
584                    after: Some(i2.id.0.clone()),
585                },
586            )
587            .await
588            .unwrap();
589        assert!(!asc_after.is_empty());
590        assert_eq!(asc_after[0].id, i3.id);
591    }
592
593    // ========================================================================
594    // Response Tests
595    // ========================================================================
596
597    #[tokio::test]
598    async fn test_store_with_custom_id() {
599        let store = MemoryResponseStorage::new();
600        let mut response = StoredResponse::new(None);
601        response.id = ResponseId::from("resp_custom");
602        response.input = json!("Input");
603        response.raw_response = json!({"output": "Output"});
604        store.store_response(response.clone()).await.unwrap();
605        let retrieved = store
606            .get_response(&ResponseId::from("resp_custom"))
607            .await
608            .unwrap();
609        assert!(retrieved.is_some());
610        assert_eq!(retrieved.unwrap().raw_response["output"], json!("Output"));
611    }
612
613    #[tokio::test]
614    async fn test_memory_store_basic() {
615        let store = MemoryResponseStorage::new();
616
617        // Store a response
618        let mut response = StoredResponse::new(None);
619        response.input = json!("Hello");
620        response.raw_response = json!({"output": "Hi there!"});
621        let response_id = store.store_response(response).await.unwrap();
622
623        // Retrieve it
624        let retrieved = store.get_response(&response_id).await.unwrap();
625        assert!(retrieved.is_some());
626        assert_eq!(retrieved.unwrap().input, json!("Hello"));
627
628        // Delete it
629        store.delete_response(&response_id).await.unwrap();
630        let deleted = store.get_response(&response_id).await.unwrap();
631        assert!(deleted.is_none());
632    }
633
634    #[tokio::test]
635    async fn test_response_chain() {
636        let store = MemoryResponseStorage::new();
637
638        // Create a chain of responses
639        let mut response1 = StoredResponse::new(None);
640        response1.input = json!("First");
641        response1.raw_response = json!({"output": "First response"});
642        let id1 = store.store_response(response1).await.unwrap();
643
644        let mut response2 = StoredResponse::new(Some(id1.clone()));
645        response2.input = json!("Second");
646        response2.raw_response = json!({"output": "Second response"});
647        let id2 = store.store_response(response2).await.unwrap();
648
649        let mut response3 = StoredResponse::new(Some(id2.clone()));
650        response3.input = json!("Third");
651        response3.raw_response = json!({"output": "Third response"});
652        let id3 = store.store_response(response3).await.unwrap();
653
654        // Get the chain
655        let chain = store.get_response_chain(&id3, None).await.unwrap();
656        assert_eq!(chain.responses.len(), 3);
657        assert_eq!(chain.responses[0].input, json!("First"));
658        assert_eq!(chain.responses[1].input, json!("Second"));
659        assert_eq!(chain.responses[2].input, json!("Third"));
660
661        let limited_chain = store.get_response_chain(&id3, Some(2)).await.unwrap();
662        assert_eq!(limited_chain.responses.len(), 2);
663        assert_eq!(limited_chain.responses[0].input, json!("Second"));
664        assert_eq!(limited_chain.responses[1].input, json!("Third"));
665    }
666
667    #[tokio::test]
668    async fn test_user_responses() {
669        let store = MemoryResponseStorage::new();
670
671        // Store responses for different users
672        let mut response1 = StoredResponse::new(None);
673        response1.input = json!("User1 message");
674        response1.safety_identifier = Some("user1".to_string());
675        store.store_response(response1).await.unwrap();
676
677        let mut response2 = StoredResponse::new(None);
678        response2.input = json!("Another user1 message");
679        response2.safety_identifier = Some("user1".to_string());
680        store.store_response(response2).await.unwrap();
681
682        let mut response3 = StoredResponse::new(None);
683        response3.input = json!("User2 message");
684        response3.safety_identifier = Some("user2".to_string());
685        store.store_response(response3).await.unwrap();
686
687        // List user1's responses
688        let user1_responses = store
689            .list_identifier_responses("user1", None)
690            .await
691            .unwrap();
692        assert_eq!(user1_responses.len(), 2);
693
694        // List user2's responses
695        let user2_responses = store
696            .list_identifier_responses("user2", None)
697            .await
698            .unwrap();
699        assert_eq!(user2_responses.len(), 1);
700
701        // Delete user1's responses
702        let deleted_count = store.delete_identifier_responses("user1").await.unwrap();
703        assert_eq!(deleted_count, 2);
704
705        let user1_responses_after = store
706            .list_identifier_responses("user1", None)
707            .await
708            .unwrap();
709        assert_eq!(user1_responses_after.len(), 0);
710
711        // User2's responses should still be there
712        let user2_responses_after = store
713            .list_identifier_responses("user2", None)
714            .await
715            .unwrap();
716        assert_eq!(user2_responses_after.len(), 1);
717    }
718
719    #[tokio::test]
720    async fn test_memory_store_stats() {
721        let store = MemoryResponseStorage::new();
722
723        let mut response1 = StoredResponse::new(None);
724        response1.input = json!("Test1");
725        response1.safety_identifier = Some("user1".to_string());
726        store.store_response(response1).await.unwrap();
727
728        let mut response2 = StoredResponse::new(None);
729        response2.input = json!("Test2");
730        response2.safety_identifier = Some("user2".to_string());
731        store.store_response(response2).await.unwrap();
732
733        let stats = store.stats();
734        assert_eq!(stats.response_count, 2);
735        assert_eq!(stats.identifier_count, 2);
736    }
737
738    #[tokio::test]
739    async fn test_conversation_item_storage_clone_shares_state() {
740        let store = MemoryConversationItemStorage::new();
741        let clone = store.clone();
742
743        // Write through original
744        let item = store
745            .create_item(make_item("message", Some("user"), json!([])))
746            .await
747            .unwrap();
748
749        // Read through clone — should see the same item
750        let found = clone.get_item(&item.id).await.unwrap();
751        assert!(found.is_some());
752        assert_eq!(found.unwrap().id, item.id);
753    }
754
755    // ========================================================================
756    // MemoryConversationStorage Tests
757    // ========================================================================
758
759    #[tokio::test]
760    async fn test_conversation_create_generates_id() {
761        let store = MemoryConversationStorage::new();
762        let conv = store
763            .create_conversation(NewConversation::default())
764            .await
765            .expect("create_conversation should succeed");
766        assert!(
767            conv.id.0.starts_with("conv_"),
768            "generated ID should have conv_ prefix"
769        );
770    }
771
772    #[tokio::test]
773    async fn test_conversation_create_with_custom_id() {
774        let store = MemoryConversationStorage::new();
775        let input = NewConversation {
776            id: Some(ConversationId::from("conv_my_custom")),
777            metadata: None,
778        };
779        let conv = store
780            .create_conversation(input)
781            .await
782            .expect("create_conversation should succeed");
783        assert_eq!(conv.id.0, "conv_my_custom");
784    }
785
786    #[tokio::test]
787    async fn test_conversation_create_preserves_metadata() {
788        let store = MemoryConversationStorage::new();
789        let mut metadata = serde_json::Map::new();
790        metadata.insert("key".to_string(), json!("value"));
791        metadata.insert("count".to_string(), json!(42));
792
793        let input = NewConversation {
794            id: None,
795            metadata: Some(metadata.clone()),
796        };
797        let conv = store
798            .create_conversation(input)
799            .await
800            .expect("create_conversation should succeed");
801        let stored_metadata = conv.metadata.expect("metadata should be present");
802        assert_eq!(stored_metadata["key"], json!("value"));
803        assert_eq!(stored_metadata["count"], json!(42));
804    }
805
806    #[tokio::test]
807    async fn test_conversation_get_nonexistent_returns_none() {
808        let store = MemoryConversationStorage::new();
809        let result = store
810            .get_conversation(&ConversationId::from("conv_does_not_exist"))
811            .await
812            .expect("get_conversation should succeed");
813        assert!(result.is_none());
814    }
815
816    #[tokio::test]
817    async fn test_conversation_get_returns_stored() {
818        let store = MemoryConversationStorage::new();
819        let conv = store
820            .create_conversation(NewConversation {
821                id: Some(ConversationId::from("conv_stored")),
822                metadata: None,
823            })
824            .await
825            .expect("create_conversation should succeed");
826
827        let retrieved = store
828            .get_conversation(&conv.id)
829            .await
830            .expect("get_conversation should succeed")
831            .expect("conversation should exist");
832        assert_eq!(retrieved.id, conv.id);
833        assert_eq!(retrieved.created_at, conv.created_at);
834    }
835
836    #[tokio::test]
837    async fn test_conversation_update_metadata() {
838        let store = MemoryConversationStorage::new();
839        let conv = store
840            .create_conversation(NewConversation {
841                id: Some(ConversationId::from("conv_update")),
842                metadata: None,
843            })
844            .await
845            .expect("create_conversation should succeed");
846
847        // Update with new metadata
848        let mut new_metadata = serde_json::Map::new();
849        new_metadata.insert("updated".to_string(), json!(true));
850
851        let updated = store
852            .update_conversation(&conv.id, Some(new_metadata))
853            .await
854            .expect("update_conversation should succeed")
855            .expect("conversation should exist for update");
856        let meta = updated
857            .metadata
858            .expect("metadata should be present after update");
859        assert_eq!(meta["updated"], json!(true));
860
861        // Verify the update persists on subsequent get
862        let fetched = store
863            .get_conversation(&conv.id)
864            .await
865            .expect("get_conversation should succeed")
866            .expect("conversation should still exist");
867        let fetched_meta = fetched.metadata.expect("metadata should persist");
868        assert_eq!(fetched_meta["updated"], json!(true));
869    }
870
871    #[tokio::test]
872    async fn test_conversation_update_nonexistent_returns_none() {
873        let store = MemoryConversationStorage::new();
874        let result = store
875            .update_conversation(&ConversationId::from("conv_ghost"), None)
876            .await
877            .expect("update_conversation should succeed");
878        assert!(result.is_none());
879    }
880
881    #[tokio::test]
882    async fn test_conversation_delete_removes() {
883        let store = MemoryConversationStorage::new();
884        let conv = store
885            .create_conversation(NewConversation {
886                id: Some(ConversationId::from("conv_to_delete")),
887                metadata: None,
888            })
889            .await
890            .expect("create_conversation should succeed");
891
892        let deleted = store
893            .delete_conversation(&conv.id)
894            .await
895            .expect("delete_conversation should succeed");
896        assert!(
897            deleted,
898            "delete should return true for existing conversation"
899        );
900
901        let after = store
902            .get_conversation(&conv.id)
903            .await
904            .expect("get_conversation should succeed");
905        assert!(after.is_none(), "conversation should be gone after delete");
906    }
907
908    #[tokio::test]
909    async fn test_conversation_delete_nonexistent_returns_false() {
910        let store = MemoryConversationStorage::new();
911        let deleted = store
912            .delete_conversation(&ConversationId::from("conv_never_existed"))
913            .await
914            .expect("delete_conversation should succeed");
915        assert!(
916            !deleted,
917            "delete should return false for non-existent conversation"
918        );
919    }
920
921    #[tokio::test]
922    async fn test_multiple_conversations_coexist() {
923        let store = MemoryConversationStorage::new();
924
925        let conv1 = store
926            .create_conversation(NewConversation {
927                id: Some(ConversationId::from("conv_alpha")),
928                metadata: None,
929            })
930            .await
931            .expect("create conv1 should succeed");
932
933        let conv2 = store
934            .create_conversation(NewConversation {
935                id: Some(ConversationId::from("conv_beta")),
936                metadata: None,
937            })
938            .await
939            .expect("create conv2 should succeed");
940
941        // Both should be retrievable
942        let got1 = store
943            .get_conversation(&conv1.id)
944            .await
945            .expect("get conv1 should succeed")
946            .expect("conv1 should exist");
947        let got2 = store
948            .get_conversation(&conv2.id)
949            .await
950            .expect("get conv2 should succeed")
951            .expect("conv2 should exist");
952
953        assert_eq!(got1.id.0, "conv_alpha");
954        assert_eq!(got2.id.0, "conv_beta");
955
956        // Deleting one doesn't affect the other
957        store
958            .delete_conversation(&conv1.id)
959            .await
960            .expect("delete conv1 should succeed");
961
962        assert!(store
963            .get_conversation(&conv1.id)
964            .await
965            .expect("get should succeed")
966            .is_none());
967        assert!(store
968            .get_conversation(&conv2.id)
969            .await
970            .expect("get should succeed")
971            .is_some());
972    }
973
974    #[tokio::test]
975    async fn test_delete_item_unlinks_but_preserves_item() {
976        let store = MemoryConversationItemStorage::new();
977        let conv: ConversationId = "conv_del".into();
978
979        let item = store
980            .create_item(make_item("message", Some("user"), json!([])))
981            .await
982            .unwrap();
983        let t = Utc::now();
984        store.link_item(&conv, &item.id, t).await.unwrap();
985
986        // Item is linked
987        assert!(store.is_item_linked(&conv, &item.id).await.unwrap());
988
989        // Delete (unlink)
990        store.delete_item(&conv, &item.id).await.unwrap();
991
992        // No longer linked
993        assert!(!store.is_item_linked(&conv, &item.id).await.unwrap());
994
995        // But item data itself is still retrievable
996        assert!(store.get_item(&item.id).await.unwrap().is_some());
997    }
998}