Skip to main content

cerememory_store_raw/
lib.rs

1//! Cerememory raw journal store implementation backed by redb.
2//!
3//! The raw journal is an append-oriented preservation layer for verbatim
4//! conversation turns, tool I/O, and other externally visible traces.
5
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
11use serde::{Deserialize, Serialize};
12use tantivy::collector::TopDocs;
13use tantivy::query::QueryParser;
14use tantivy::schema::*;
15use tantivy::{Directory, Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument};
16use uuid::Uuid;
17
18use cerememory_core::error::CerememoryError;
19use cerememory_core::types::RawJournalRecord;
20use cerememory_store_common::storage_err;
21
/// Primary table: record UUID (its 16 raw bytes) -> MessagePack-encoded `RawJournalRecord`.
///
/// Keys are written from `Uuid::as_bytes()` and values from `rmp_serde::to_vec`.
const RAW_JOURNAL_RECORDS: TableDefinition<&[u8], &[u8]> =
    TableDefinition::new("raw_journal_records");
/// Secondary table: session id -> MessagePack-encoded list of `SessionIndexEntry`
/// values, one per record that belongs to that session.
///
/// New entries are pushed onto the end of the list; readers sort the fetched records
/// themselves rather than relying on the stored order.
const RAW_JOURNAL_SESSION_INDEX: TableDefinition<&str, &[u8]> =
    TableDefinition::new("raw_journal_session_index");
28
/// One element of a session's index list.
///
/// Carries the record id plus its creation time so that time-range queries can filter
/// candidates before the full records are fetched and decoded.
///
/// NOTE(review): the list is persisted with `rmp_serde::to_vec`; its default struct
/// encoding is presumably positional, so reordering these fields would likely break
/// already-stored data — confirm before changing the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    // Id of the record in the primary table.
    id: Uuid,
    // Copy of the record's `created_at` taken when the entry was written.
    created_at: DateTime<Utc>,
}
34
/// Field handles resolved once from the Tantivy schema so documents can be built and
/// read without repeated name lookups.
struct RawFields {
    // Record UUID in its string form; used as the delete/update key.
    id: Field,
    // Session the record belongs to.
    session_id: Field,
    // Text content that full-text queries run against.
    body: Field,
}
40
/// A single full-text match; only the id of the matching record is kept.
#[derive(Clone)]
struct RawTextSearchHit {
    // Id parsed back from the document's stored `id` field.
    record_id: Uuid,
}
45
/// Tantivy-backed full-text index over the text content of raw journal records.
struct RawTextIndex {
    // The underlying index; needed to build a query parser per search.
    index: Index,
    // Reader used to obtain searchers; reloaded explicitly before each search.
    reader: IndexReader,
    // Single writer shared behind a mutex; every mutation commits before unlocking.
    writer: Arc<Mutex<IndexWriter>>,
    // Pre-resolved schema field handles.
    fields: RawFields,
}
52
53impl RawTextIndex {
54    fn open(path: &Path) -> Result<Self, CerememoryError> {
55        let dir_path = raw_text_index_path(path);
56        std::fs::create_dir_all(&dir_path).map_err(|e| {
57            CerememoryError::Storage(format!("Failed to create raw text index dir: {e}"))
58        })?;
59        let dir = tantivy::directory::MmapDirectory::open(&dir_path)
60            .map_err(|e| CerememoryError::Storage(format!("Tantivy dir open: {e}")))?;
61        Self::open_with_dir(dir)
62    }
63
64    fn open_in_memory() -> Result<Self, CerememoryError> {
65        let dir = tantivy::directory::RamDirectory::create();
66        Self::open_with_dir(dir)
67    }
68
69    fn open_with_dir(dir: impl Directory + 'static) -> Result<Self, CerememoryError> {
70        let schema = Self::build_schema();
71        let index = Index::open_or_create(dir, schema.clone())
72            .map_err(|e| CerememoryError::Storage(format!("Tantivy index open: {e}")))?;
73        let writer = index
74            .writer(20_000_000)
75            .map_err(|e| CerememoryError::Storage(format!("Tantivy writer: {e}")))?;
76        let reader = index
77            .reader_builder()
78            .reload_policy(ReloadPolicy::OnCommitWithDelay)
79            .try_into()
80            .map_err(|e| CerememoryError::Storage(format!("Tantivy reader: {e}")))?;
81        let fields = RawFields {
82            id: schema.get_field("id").unwrap(),
83            session_id: schema.get_field("session_id").unwrap(),
84            body: schema.get_field("body").unwrap(),
85        };
86
87        Ok(Self {
88            index,
89            reader,
90            writer: Arc::new(Mutex::new(writer)),
91            fields,
92        })
93    }
94
95    fn build_schema() -> Schema {
96        let mut builder = Schema::builder();
97        builder.add_text_field("id", STRING | STORED);
98        builder.add_text_field("session_id", STRING | STORED);
99        builder.add_text_field("body", TEXT);
100        builder.build()
101    }
102
103    fn lock_writer(&self) -> Result<std::sync::MutexGuard<'_, IndexWriter>, CerememoryError> {
104        self.writer
105            .lock()
106            .map_err(|e| CerememoryError::Internal(format!("RawTextIndex writer lock: {e}")))
107    }
108
109    fn add(&self, id: Uuid, session_id: &str, body: &str) -> Result<(), CerememoryError> {
110        let mut doc = TantivyDocument::new();
111        doc.add_text(self.fields.id, id.to_string());
112        doc.add_text(self.fields.session_id, session_id);
113        doc.add_text(self.fields.body, body);
114
115        let mut writer = self.lock_writer()?;
116        writer
117            .add_document(doc)
118            .map_err(|e| CerememoryError::Storage(format!("Tantivy add doc: {e}")))?;
119        writer
120            .commit()
121            .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
122        Ok(())
123    }
124
125    fn remove(&self, id: Uuid) -> Result<(), CerememoryError> {
126        let term = tantivy::Term::from_field_text(self.fields.id, &id.to_string());
127        let mut writer = self.lock_writer()?;
128        writer.delete_term(term);
129        writer
130            .commit()
131            .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
132        Ok(())
133    }
134
135    fn update(&self, id: Uuid, session_id: &str, body: &str) -> Result<(), CerememoryError> {
136        let term = tantivy::Term::from_field_text(self.fields.id, &id.to_string());
137        let mut doc = TantivyDocument::new();
138        doc.add_text(self.fields.id, id.to_string());
139        doc.add_text(self.fields.session_id, session_id);
140        doc.add_text(self.fields.body, body);
141
142        let mut writer = self.lock_writer()?;
143        writer.delete_term(term);
144        writer
145            .add_document(doc)
146            .map_err(|e| CerememoryError::Storage(format!("Tantivy add doc: {e}")))?;
147        writer
148            .commit()
149            .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
150        Ok(())
151    }
152
153    fn search(
154        &self,
155        query: &str,
156        session_id: Option<&str>,
157        limit: usize,
158    ) -> Result<Vec<RawTextSearchHit>, CerememoryError> {
159        self.reader
160            .reload()
161            .map_err(|e| CerememoryError::Storage(format!("Tantivy reader reload: {e}")))?;
162
163        let searcher = self.reader.searcher();
164        let query_parser = QueryParser::for_index(&self.index, vec![self.fields.body]);
165        let parsed = query_parser
166            .parse_query(query)
167            .map_err(|e| CerememoryError::Validation(format!("Invalid search query: {e}")))?;
168
169        let search_limit = if session_id.is_some() {
170            limit * 5
171        } else {
172            limit * 3
173        };
174        let top_docs = searcher
175            .search(&parsed, &TopDocs::with_limit(search_limit.max(limit)))
176            .map_err(|e| CerememoryError::Storage(format!("Tantivy search: {e}")))?;
177
178        let mut hits = Vec::new();
179        for (score, doc_address) in top_docs {
180            let doc: TantivyDocument = searcher
181                .doc(doc_address)
182                .map_err(|e| CerememoryError::Storage(format!("Tantivy doc fetch: {e}")))?;
183
184            if let Some(session_filter) = session_id {
185                let doc_session = doc
186                    .get_first(self.fields.session_id)
187                    .and_then(|v| v.as_str())
188                    .unwrap_or("");
189                if doc_session != session_filter {
190                    continue;
191                }
192            }
193
194            let id_str = doc
195                .get_first(self.fields.id)
196                .and_then(|v| v.as_str())
197                .unwrap_or("");
198            if let Ok(record_id) = Uuid::parse_str(id_str) {
199                let _ = score;
200                hits.push(RawTextSearchHit { record_id });
201            }
202            if hits.len() >= limit {
203                break;
204            }
205        }
206        Ok(hits)
207    }
208}
209
/// Handle to the raw journal: a redb database for the records and session index, plus
/// a Tantivy text index over record text.
///
/// Both members sit behind `Arc`, so cloning the store yields another handle to the
/// same database and index.
#[derive(Clone)]
pub struct RawJournalStore {
    // redb database holding the primary and session-index tables.
    db: Arc<Database>,
    // Full-text index over record text content.
    text_index: Arc<RawTextIndex>,
}
215
216impl RawJournalStore {
217    /// Open (or create) a persistent raw journal at `path`.
218    pub fn open(path: impl AsRef<Path>) -> Result<Self, CerememoryError> {
219        let db = Database::create(path.as_ref())
220            .map_err(|e| CerememoryError::Storage(format!("Failed to open redb database: {e}")))?;
221        let text_index = RawTextIndex::open(path.as_ref())?;
222
223        let store = Self {
224            db: Arc::new(db),
225            text_index: Arc::new(text_index),
226        };
227        store.ensure_tables()?;
228        Ok(store)
229    }
230
231    /// Create an ephemeral in-memory raw journal backed by a temporary file.
232    pub fn open_in_memory() -> Result<Self, CerememoryError> {
233        let tmp = tempfile::NamedTempFile::new()
234            .map_err(|e| CerememoryError::Storage(format!("Failed to create temp file: {e}")))?;
235        let path = tmp.into_temp_path();
236        let _ = std::fs::remove_file(&path);
237        let db = Database::create(&path).map_err(|e| {
238            CerememoryError::Storage(format!("Failed to open in-memory redb database: {e}"))
239        })?;
240        let text_index = RawTextIndex::open_in_memory()?;
241
242        let store = Self {
243            db: Arc::new(db),
244            text_index: Arc::new(text_index),
245        };
246        store.ensure_tables()?;
247        Ok(store)
248    }
249
250    fn ensure_tables(&self) -> Result<(), CerememoryError> {
251        let txn = self.db.begin_write().map_err(storage_err)?;
252        {
253            let _ = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
254            let _ = txn
255                .open_table(RAW_JOURNAL_SESSION_INDEX)
256                .map_err(storage_err)?;
257        }
258        txn.commit().map_err(storage_err)?;
259        Ok(())
260    }
261
262    /// Append a new raw journal record.
263    pub async fn append(&self, record: RawJournalRecord) -> Result<Uuid, CerememoryError> {
264        record.validate()?;
265
266        let record_id = record.id;
267        let text_payload = record
268            .text_content()
269            .map(|text| (record_id, record.session_id.clone(), text.to_string()));
270
271        let db = self.db.clone();
272        let _ = tokio::task::spawn_blocking(move || {
273            let id = record.id;
274            let packed = rmp_serde::to_vec(&record)
275                .map_err(|e| CerememoryError::Serialization(format!("msgpack encode: {e}")))?;
276
277            let txn = db.begin_write().map_err(storage_err)?;
278            {
279                let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
280                records
281                    .insert(id.as_bytes().as_slice(), packed.as_slice())
282                    .map_err(storage_err)?;
283
284                let mut session_index = txn
285                    .open_table(RAW_JOURNAL_SESSION_INDEX)
286                    .map_err(storage_err)?;
287                let mut entries: Vec<SessionIndexEntry> = match session_index
288                    .get(record.session_id.as_str())
289                    .map_err(storage_err)?
290                {
291                    Some(value_guard) => {
292                        rmp_serde::from_slice(value_guard.value()).map_err(|e| {
293                            CerememoryError::Serialization(format!(
294                                "msgpack decode session index: {e}"
295                            ))
296                        })?
297                    }
298                    None => Vec::new(),
299                };
300                entries.push(SessionIndexEntry {
301                    id,
302                    created_at: record.created_at,
303                });
304                let packed_entries = rmp_serde::to_vec(&entries).map_err(|e| {
305                    CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
306                })?;
307                session_index
308                    .insert(record.session_id.as_str(), packed_entries.as_slice())
309                    .map_err(storage_err)?;
310            }
311            txn.commit().map_err(storage_err)?;
312            Ok::<Uuid, CerememoryError>(id)
313        })
314        .await
315        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
316
317        if let Some((id, session_id, text)) = text_payload {
318            self.text_index.add(id, &session_id, &text)?;
319        }
320        Ok(record_id)
321    }
322
323    /// Update an existing raw journal record in place.
324    pub async fn update(&self, record: RawJournalRecord) -> Result<(), CerememoryError> {
325        record.validate()?;
326
327        let previous = self
328            .get(&record.id)
329            .await?
330            .ok_or_else(|| CerememoryError::RecordNotFound(record.id.to_string()))?;
331        let previous_had_text = previous.text_content().is_some();
332        let record_id = record.id;
333        let text_payload = record
334            .text_content()
335            .map(|text| (record_id, record.session_id.clone(), text.to_string()));
336
337        let previous_for_txn = previous.clone();
338        let record_for_txn = record.clone();
339
340        let db = self.db.clone();
341        tokio::task::spawn_blocking(move || {
342            let packed = rmp_serde::to_vec(&record_for_txn)
343                .map_err(|e| CerememoryError::Serialization(format!("msgpack encode: {e}")))?;
344
345            let txn = db.begin_write().map_err(storage_err)?;
346            {
347                let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
348                records
349                    .insert(record_for_txn.id.as_bytes().as_slice(), packed.as_slice())
350                    .map_err(storage_err)?;
351
352                let mut session_index = txn
353                    .open_table(RAW_JOURNAL_SESSION_INDEX)
354                    .map_err(storage_err)?;
355
356                let mut old_entries: Vec<SessionIndexEntry> = match session_index
357                    .get(previous_for_txn.session_id.as_str())
358                    .map_err(storage_err)?
359                {
360                    Some(value_guard) => {
361                        rmp_serde::from_slice(value_guard.value()).map_err(|e| {
362                            CerememoryError::Serialization(format!(
363                                "msgpack decode session index: {e}"
364                            ))
365                        })?
366                    }
367                    None => Vec::new(),
368                };
369                old_entries.retain(|entry| entry.id != previous_for_txn.id);
370                let packed_old_entries = rmp_serde::to_vec(&old_entries).map_err(|e| {
371                    CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
372                })?;
373                session_index
374                    .insert(
375                        previous_for_txn.session_id.as_str(),
376                        packed_old_entries.as_slice(),
377                    )
378                    .map_err(storage_err)?;
379
380                let mut new_entries: Vec<SessionIndexEntry> =
381                    if previous_for_txn.session_id == record_for_txn.session_id {
382                        old_entries
383                    } else {
384                        match session_index
385                            .get(record_for_txn.session_id.as_str())
386                            .map_err(storage_err)?
387                        {
388                            Some(value_guard) => rmp_serde::from_slice(value_guard.value())
389                                .map_err(|e| {
390                                    CerememoryError::Serialization(format!(
391                                        "msgpack decode session index: {e}"
392                                    ))
393                                })?,
394                            None => Vec::new(),
395                        }
396                    };
397                new_entries.push(SessionIndexEntry {
398                    id: record_for_txn.id,
399                    created_at: record_for_txn.created_at,
400                });
401                let packed_new_entries = rmp_serde::to_vec(&new_entries).map_err(|e| {
402                    CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
403                })?;
404                session_index
405                    .insert(
406                        record_for_txn.session_id.as_str(),
407                        packed_new_entries.as_slice(),
408                    )
409                    .map_err(storage_err)?;
410            }
411            txn.commit().map_err(storage_err)?;
412            Ok::<(), CerememoryError>(())
413        })
414        .await
415        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
416
417        if previous_had_text && text_payload.is_none() {
418            self.text_index.remove(record_id)?;
419        }
420        if let Some((id, session_id, text)) = text_payload {
421            if previous_had_text {
422                self.text_index.update(id, &session_id, &text)?;
423            } else {
424                self.text_index.add(id, &session_id, &text)?;
425            }
426        }
427        Ok(())
428    }
429
430    /// Retrieve a raw journal record by id.
431    pub async fn get(&self, id: &Uuid) -> Result<Option<RawJournalRecord>, CerememoryError> {
432        let db = self.db.clone();
433        let id = *id;
434        tokio::task::spawn_blocking(move || {
435            let txn = db.begin_read().map_err(storage_err)?;
436            let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
437            get_raw_record_sync(&table, &id)
438        })
439        .await
440        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
441    }
442
443    /// Delete a raw journal record by id.
444    pub async fn delete(&self, id: &Uuid) -> Result<bool, CerememoryError> {
445        let existing = self.get(id).await?;
446        let Some(existing) = existing else {
447            return Ok(false);
448        };
449        let existing_had_text = existing.text_content().is_some();
450        let existing_session_id = existing.session_id.clone();
451        let id = *id;
452        let db = self.db.clone();
453        tokio::task::spawn_blocking(move || {
454            let txn = db.begin_write().map_err(storage_err)?;
455            {
456                let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
457                let _ = records
458                    .remove(id.as_bytes().as_slice())
459                    .map_err(storage_err)?;
460
461                let mut session_index = txn
462                    .open_table(RAW_JOURNAL_SESSION_INDEX)
463                    .map_err(storage_err)?;
464                let mut entries: Vec<SessionIndexEntry> = match session_index
465                    .get(existing_session_id.as_str())
466                    .map_err(storage_err)?
467                {
468                    Some(value_guard) => {
469                        rmp_serde::from_slice(value_guard.value()).map_err(|e| {
470                            CerememoryError::Serialization(format!(
471                                "msgpack decode session index: {e}"
472                            ))
473                        })?
474                    }
475                    None => Vec::new(),
476                };
477                entries.retain(|entry| entry.id != id);
478                let packed_entries = rmp_serde::to_vec(&entries).map_err(|e| {
479                    CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
480                })?;
481                session_index
482                    .insert(existing_session_id.as_str(), packed_entries.as_slice())
483                    .map_err(storage_err)?;
484            }
485            txn.commit().map_err(storage_err)?;
486            Ok::<bool, CerememoryError>(true)
487        })
488        .await
489        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
490
491        if existing_had_text {
492            self.text_index.remove(id)?;
493        }
494        Ok(true)
495    }
496
497    /// Return all raw journal records in storage.
498    pub async fn get_all(&self) -> Result<Vec<RawJournalRecord>, CerememoryError> {
499        let db = self.db.clone();
500        tokio::task::spawn_blocking(move || {
501            let txn = db.begin_read().map_err(storage_err)?;
502            let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
503            get_all_raw_records_sync(&table)
504        })
505        .await
506        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
507    }
508
509    /// Return all records for a session, sorted by creation time.
510    pub async fn query_session(
511        &self,
512        session_id: &str,
513    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
514        let session_id = session_id.trim().to_string();
515        if session_id.is_empty() {
516            return Err(CerememoryError::Validation(
517                "session_id must not be empty".to_string(),
518            ));
519        }
520
521        let db = self.db.clone();
522        tokio::task::spawn_blocking(move || {
523            let txn = db.begin_read().map_err(storage_err)?;
524            let records_table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
525            let session_table = txn
526                .open_table(RAW_JOURNAL_SESSION_INDEX)
527                .map_err(storage_err)?;
528            let entries = get_session_index_entries_sync(&session_table, &session_id)?;
529            let mut records = get_raw_records_by_entries_sync(&records_table, &entries)?;
530            records.sort_by(|a, b| {
531                a.created_at
532                    .cmp(&b.created_at)
533                    .then_with(|| a.id.cmp(&b.id))
534            });
535            Ok(records)
536        })
537        .await
538        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
539    }
540
541    /// Return all records for a session whose `created_at` falls within `[start, end]`.
542    pub async fn query_session_range(
543        &self,
544        session_id: &str,
545        start: DateTime<Utc>,
546        end: DateTime<Utc>,
547    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
548        let session_id = session_id.trim().to_string();
549        if session_id.is_empty() {
550            return Err(CerememoryError::Validation(
551                "session_id must not be empty".to_string(),
552            ));
553        }
554        if start > end {
555            return Err(CerememoryError::Validation(
556                "Invalid time range: start must be earlier than or equal to end".to_string(),
557            ));
558        }
559
560        let db = self.db.clone();
561        tokio::task::spawn_blocking(move || {
562            let txn = db.begin_read().map_err(storage_err)?;
563            let records_table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
564            let session_table = txn
565                .open_table(RAW_JOURNAL_SESSION_INDEX)
566                .map_err(storage_err)?;
567            let entries = get_session_index_entries_sync(&session_table, &session_id)?;
568            let filtered_entries: Vec<SessionIndexEntry> = entries
569                .into_iter()
570                .filter(|entry| entry.created_at >= start && entry.created_at <= end)
571                .collect();
572            let mut records = get_raw_records_by_entries_sync(&records_table, &filtered_entries)?;
573            records.sort_by(|a, b| {
574                a.created_at
575                    .cmp(&b.created_at)
576                    .then_with(|| a.id.cmp(&b.id))
577            });
578            Ok(records)
579        })
580        .await
581        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
582    }
583
584    pub async fn count(&self) -> Result<usize, CerememoryError> {
585        let db = self.db.clone();
586        tokio::task::spawn_blocking(move || {
587            let txn = db.begin_read().map_err(storage_err)?;
588            let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
589            let len = table.len().map_err(storage_err)?;
590            usize::try_from(len).map_err(|_| {
591                CerememoryError::Internal(format!("raw journal length {len} does not fit usize"))
592            })
593        })
594        .await
595        .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
596    }
597
598    /// Search text content in the raw journal, optionally filtered to a session.
599    pub async fn search_text(
600        &self,
601        query: &str,
602        session_id: Option<&str>,
603        limit: usize,
604    ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
605        let hits = self.text_index.search(query, session_id, limit)?;
606        let mut records = Vec::with_capacity(hits.len());
607        for hit in hits {
608            if let Some(record) = self.get(&hit.record_id).await? {
609                records.push(record);
610            }
611        }
612        Ok(records)
613    }
614}
615
/// Derives the text-index directory for a database file: a sibling of `path` named
/// `<file stem>_text_index`.
///
/// Falls back to the stem `raw_journal` when `path` has no file stem or the stem is
/// not valid UTF-8.
fn raw_text_index_path(path: &Path) -> PathBuf {
    let base_name = match path.file_stem() {
        Some(os_stem) => os_stem.to_str().unwrap_or("raw_journal"),
        None => "raw_journal",
    };

    let mut sibling = path.to_path_buf();
    sibling.set_file_name(format!("{base_name}_text_index"));
    sibling
}
624
625fn get_raw_record_sync(
626    table: &redb::ReadOnlyTable<&[u8], &[u8]>,
627    id: &Uuid,
628) -> Result<Option<RawJournalRecord>, CerememoryError> {
629    match table.get(id.as_bytes().as_slice()).map_err(storage_err)? {
630        Some(value_guard) => {
631            let record: RawJournalRecord = rmp_serde::from_slice(value_guard.value())
632                .map_err(|e| CerememoryError::Serialization(format!("msgpack decode: {e}")))?;
633            Ok(Some(record))
634        }
635        None => Ok(None),
636    }
637}
638
639fn get_all_raw_records_sync(
640    table: &redb::ReadOnlyTable<&[u8], &[u8]>,
641) -> Result<Vec<RawJournalRecord>, CerememoryError> {
642    let mut records = Vec::new();
643    for entry in table.iter().map_err(storage_err)? {
644        let (_, value) = entry.map_err(storage_err)?;
645        let record: RawJournalRecord = rmp_serde::from_slice(value.value())
646            .map_err(|e| CerememoryError::Serialization(format!("msgpack decode: {e}")))?;
647        records.push(record);
648    }
649    Ok(records)
650}
651
652fn get_session_index_entries_sync(
653    table: &redb::ReadOnlyTable<&str, &[u8]>,
654    session_id: &str,
655) -> Result<Vec<SessionIndexEntry>, CerememoryError> {
656    match table.get(session_id).map_err(storage_err)? {
657        Some(value_guard) => rmp_serde::from_slice(value_guard.value()).map_err(|e| {
658            CerememoryError::Serialization(format!("msgpack decode session index: {e}"))
659        }),
660        None => Ok(Vec::new()),
661    }
662}
663
664fn get_raw_records_by_entries_sync(
665    table: &redb::ReadOnlyTable<&[u8], &[u8]>,
666    entries: &[SessionIndexEntry],
667) -> Result<Vec<RawJournalRecord>, CerememoryError> {
668    let mut records = Vec::with_capacity(entries.len());
669    for entry in entries {
670        if let Some(record) = get_raw_record_sync(table, &entry.id)? {
671            records.push(record);
672        }
673    }
674    Ok(records)
675}
676
677#[cfg(test)]
678mod tests {
679    use super::*;
680    use cerememory_core::types::{RawSource, RawSpeaker, RawVisibility, SecrecyLevel};
681
682    fn make_record(session_id: &str, text: &str) -> RawJournalRecord {
683        RawJournalRecord::new_text(
684            session_id,
685            RawSource::Conversation,
686            RawSpeaker::User,
687            RawVisibility::Normal,
688            SecrecyLevel::Public,
689            text,
690        )
691    }
692
693    #[tokio::test]
694    async fn append_and_get_roundtrip() {
695        let store = RawJournalStore::open_in_memory().unwrap();
696        let record = make_record("sess-1", "hello raw journal");
697        let id = record.id;
698
699        store.append(record).await.unwrap();
700        let restored = store.get(&id).await.unwrap().unwrap();
701
702        assert_eq!(restored.id, id);
703        assert_eq!(restored.session_id, "sess-1");
704        assert_eq!(restored.text_content(), Some("hello raw journal"));
705    }
706
707    #[tokio::test]
708    async fn query_session_filters_records() {
709        let store = RawJournalStore::open_in_memory().unwrap();
710        store.append(make_record("sess-1", "one")).await.unwrap();
711        store.append(make_record("sess-2", "two")).await.unwrap();
712        store.append(make_record("sess-1", "three")).await.unwrap();
713
714        let sess_1 = store.query_session("sess-1").await.unwrap();
715        assert_eq!(sess_1.len(), 2);
716        assert!(sess_1.iter().all(|record| record.session_id == "sess-1"));
717    }
718
719    #[tokio::test]
720    async fn query_session_range_filters_by_time() {
721        let store = RawJournalStore::open_in_memory().unwrap();
722        let base = Utc::now();
723
724        let mut early = make_record("sess-1", "early");
725        early.created_at = base - chrono::Duration::hours(2);
726        early.updated_at = early.created_at;
727
728        let mut middle = make_record("sess-1", "middle");
729        middle.created_at = base - chrono::Duration::hours(1);
730        middle.updated_at = middle.created_at;
731
732        let mut late = make_record("sess-1", "late");
733        late.created_at = base + chrono::Duration::hours(1);
734        late.updated_at = late.created_at;
735
736        store.append(early).await.unwrap();
737        store.append(middle).await.unwrap();
738        store.append(late).await.unwrap();
739
740        let records = store
741            .query_session_range(
742                "sess-1",
743                base - chrono::Duration::hours(1),
744                base + chrono::Duration::minutes(10),
745            )
746            .await
747            .unwrap();
748
749        assert_eq!(records.len(), 1);
750        assert_eq!(records[0].text_content(), Some("middle"));
751    }
752
753    #[tokio::test]
754    async fn count_tracks_records() {
755        let store = RawJournalStore::open_in_memory().unwrap();
756        assert_eq!(store.count().await.unwrap(), 0);
757
758        store.append(make_record("sess-1", "one")).await.unwrap();
759        store.append(make_record("sess-1", "two")).await.unwrap();
760
761        assert_eq!(store.count().await.unwrap(), 2);
762    }
763
764    #[tokio::test]
765    async fn search_text_filters_by_query_and_session() {
766        let store = RawJournalStore::open_in_memory().unwrap();
767        store
768            .append(make_record("sess-1", "timeout retries are idempotent only"))
769            .await
770            .unwrap();
771        store
772            .append(make_record("sess-2", "timeout budget differs"))
773            .await
774            .unwrap();
775
776        let sess_1 = store
777            .search_text("idempotent", Some("sess-1"), 10)
778            .await
779            .unwrap();
780        assert_eq!(sess_1.len(), 1);
781        assert_eq!(sess_1[0].session_id, "sess-1");
782
783        let all = store.search_text("timeout", None, 10).await.unwrap();
784        assert_eq!(all.len(), 2);
785    }
786}