1use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8
9use chrono::{DateTime, Utc};
10use redb::{Database, ReadableTable, ReadableTableMetadata, TableDefinition};
11use serde::{Deserialize, Serialize};
12use tantivy::collector::TopDocs;
13use tantivy::query::QueryParser;
14use tantivy::schema::*;
15use tantivy::{Directory, Index, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument};
16use uuid::Uuid;
17
18use cerememory_core::error::CerememoryError;
19use cerememory_core::types::RawJournalRecord;
20use cerememory_store_common::storage_err;
21
22const RAW_JOURNAL_RECORDS: TableDefinition<&[u8], &[u8]> =
24 TableDefinition::new("raw_journal_records");
25const RAW_JOURNAL_SESSION_INDEX: TableDefinition<&str, &[u8]> =
27 TableDefinition::new("raw_journal_session_index");
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30struct SessionIndexEntry {
31 id: Uuid,
32 created_at: DateTime<Utc>,
33}
34
35struct RawFields {
36 id: Field,
37 session_id: Field,
38 body: Field,
39}
40
41#[derive(Clone)]
42struct RawTextSearchHit {
43 record_id: Uuid,
44}
45
46struct RawTextIndex {
47 index: Index,
48 reader: IndexReader,
49 writer: Arc<Mutex<IndexWriter>>,
50 fields: RawFields,
51}
52
53impl RawTextIndex {
54 fn open(path: &Path) -> Result<Self, CerememoryError> {
55 let dir_path = raw_text_index_path(path);
56 std::fs::create_dir_all(&dir_path).map_err(|e| {
57 CerememoryError::Storage(format!("Failed to create raw text index dir: {e}"))
58 })?;
59 let dir = tantivy::directory::MmapDirectory::open(&dir_path)
60 .map_err(|e| CerememoryError::Storage(format!("Tantivy dir open: {e}")))?;
61 Self::open_with_dir(dir)
62 }
63
64 fn open_in_memory() -> Result<Self, CerememoryError> {
65 let dir = tantivy::directory::RamDirectory::create();
66 Self::open_with_dir(dir)
67 }
68
69 fn open_with_dir(dir: impl Directory + 'static) -> Result<Self, CerememoryError> {
70 let schema = Self::build_schema();
71 let index = Index::open_or_create(dir, schema.clone())
72 .map_err(|e| CerememoryError::Storage(format!("Tantivy index open: {e}")))?;
73 let writer = index
74 .writer(20_000_000)
75 .map_err(|e| CerememoryError::Storage(format!("Tantivy writer: {e}")))?;
76 let reader = index
77 .reader_builder()
78 .reload_policy(ReloadPolicy::OnCommitWithDelay)
79 .try_into()
80 .map_err(|e| CerememoryError::Storage(format!("Tantivy reader: {e}")))?;
81 let fields = RawFields {
82 id: schema.get_field("id").unwrap(),
83 session_id: schema.get_field("session_id").unwrap(),
84 body: schema.get_field("body").unwrap(),
85 };
86
87 Ok(Self {
88 index,
89 reader,
90 writer: Arc::new(Mutex::new(writer)),
91 fields,
92 })
93 }
94
95 fn build_schema() -> Schema {
96 let mut builder = Schema::builder();
97 builder.add_text_field("id", STRING | STORED);
98 builder.add_text_field("session_id", STRING | STORED);
99 builder.add_text_field("body", TEXT);
100 builder.build()
101 }
102
103 fn lock_writer(&self) -> Result<std::sync::MutexGuard<'_, IndexWriter>, CerememoryError> {
104 self.writer
105 .lock()
106 .map_err(|e| CerememoryError::Internal(format!("RawTextIndex writer lock: {e}")))
107 }
108
109 fn add(&self, id: Uuid, session_id: &str, body: &str) -> Result<(), CerememoryError> {
110 let mut doc = TantivyDocument::new();
111 doc.add_text(self.fields.id, id.to_string());
112 doc.add_text(self.fields.session_id, session_id);
113 doc.add_text(self.fields.body, body);
114
115 let mut writer = self.lock_writer()?;
116 writer
117 .add_document(doc)
118 .map_err(|e| CerememoryError::Storage(format!("Tantivy add doc: {e}")))?;
119 writer
120 .commit()
121 .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
122 Ok(())
123 }
124
125 fn remove(&self, id: Uuid) -> Result<(), CerememoryError> {
126 let term = tantivy::Term::from_field_text(self.fields.id, &id.to_string());
127 let mut writer = self.lock_writer()?;
128 writer.delete_term(term);
129 writer
130 .commit()
131 .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
132 Ok(())
133 }
134
135 fn update(&self, id: Uuid, session_id: &str, body: &str) -> Result<(), CerememoryError> {
136 let term = tantivy::Term::from_field_text(self.fields.id, &id.to_string());
137 let mut doc = TantivyDocument::new();
138 doc.add_text(self.fields.id, id.to_string());
139 doc.add_text(self.fields.session_id, session_id);
140 doc.add_text(self.fields.body, body);
141
142 let mut writer = self.lock_writer()?;
143 writer.delete_term(term);
144 writer
145 .add_document(doc)
146 .map_err(|e| CerememoryError::Storage(format!("Tantivy add doc: {e}")))?;
147 writer
148 .commit()
149 .map_err(|e| CerememoryError::Storage(format!("Tantivy commit: {e}")))?;
150 Ok(())
151 }
152
153 fn search(
154 &self,
155 query: &str,
156 session_id: Option<&str>,
157 limit: usize,
158 ) -> Result<Vec<RawTextSearchHit>, CerememoryError> {
159 self.reader
160 .reload()
161 .map_err(|e| CerememoryError::Storage(format!("Tantivy reader reload: {e}")))?;
162
163 let searcher = self.reader.searcher();
164 let query_parser = QueryParser::for_index(&self.index, vec![self.fields.body]);
165 let parsed = query_parser
166 .parse_query(query)
167 .map_err(|e| CerememoryError::Validation(format!("Invalid search query: {e}")))?;
168
169 let search_limit = if session_id.is_some() {
170 limit * 5
171 } else {
172 limit * 3
173 };
174 let top_docs = searcher
175 .search(&parsed, &TopDocs::with_limit(search_limit.max(limit)))
176 .map_err(|e| CerememoryError::Storage(format!("Tantivy search: {e}")))?;
177
178 let mut hits = Vec::new();
179 for (score, doc_address) in top_docs {
180 let doc: TantivyDocument = searcher
181 .doc(doc_address)
182 .map_err(|e| CerememoryError::Storage(format!("Tantivy doc fetch: {e}")))?;
183
184 if let Some(session_filter) = session_id {
185 let doc_session = doc
186 .get_first(self.fields.session_id)
187 .and_then(|v| v.as_str())
188 .unwrap_or("");
189 if doc_session != session_filter {
190 continue;
191 }
192 }
193
194 let id_str = doc
195 .get_first(self.fields.id)
196 .and_then(|v| v.as_str())
197 .unwrap_or("");
198 if let Ok(record_id) = Uuid::parse_str(id_str) {
199 let _ = score;
200 hits.push(RawTextSearchHit { record_id });
201 }
202 if hits.len() >= limit {
203 break;
204 }
205 }
206 Ok(hits)
207 }
208}
209
210#[derive(Clone)]
211pub struct RawJournalStore {
212 db: Arc<Database>,
213 text_index: Arc<RawTextIndex>,
214}
215
216impl RawJournalStore {
217 pub fn open(path: impl AsRef<Path>) -> Result<Self, CerememoryError> {
219 let db = Database::create(path.as_ref())
220 .map_err(|e| CerememoryError::Storage(format!("Failed to open redb database: {e}")))?;
221 let text_index = RawTextIndex::open(path.as_ref())?;
222
223 let store = Self {
224 db: Arc::new(db),
225 text_index: Arc::new(text_index),
226 };
227 store.ensure_tables()?;
228 Ok(store)
229 }
230
231 pub fn open_in_memory() -> Result<Self, CerememoryError> {
233 let tmp = tempfile::NamedTempFile::new()
234 .map_err(|e| CerememoryError::Storage(format!("Failed to create temp file: {e}")))?;
235 let path = tmp.into_temp_path();
236 let _ = std::fs::remove_file(&path);
237 let db = Database::create(&path).map_err(|e| {
238 CerememoryError::Storage(format!("Failed to open in-memory redb database: {e}"))
239 })?;
240 let text_index = RawTextIndex::open_in_memory()?;
241
242 let store = Self {
243 db: Arc::new(db),
244 text_index: Arc::new(text_index),
245 };
246 store.ensure_tables()?;
247 Ok(store)
248 }
249
250 fn ensure_tables(&self) -> Result<(), CerememoryError> {
251 let txn = self.db.begin_write().map_err(storage_err)?;
252 {
253 let _ = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
254 let _ = txn
255 .open_table(RAW_JOURNAL_SESSION_INDEX)
256 .map_err(storage_err)?;
257 }
258 txn.commit().map_err(storage_err)?;
259 Ok(())
260 }
261
262 pub async fn append(&self, record: RawJournalRecord) -> Result<Uuid, CerememoryError> {
264 record.validate()?;
265
266 let record_id = record.id;
267 let text_payload = record
268 .text_content()
269 .map(|text| (record_id, record.session_id.clone(), text.to_string()));
270
271 let db = self.db.clone();
272 let _ = tokio::task::spawn_blocking(move || {
273 let id = record.id;
274 let packed = rmp_serde::to_vec(&record)
275 .map_err(|e| CerememoryError::Serialization(format!("msgpack encode: {e}")))?;
276
277 let txn = db.begin_write().map_err(storage_err)?;
278 {
279 let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
280 records
281 .insert(id.as_bytes().as_slice(), packed.as_slice())
282 .map_err(storage_err)?;
283
284 let mut session_index = txn
285 .open_table(RAW_JOURNAL_SESSION_INDEX)
286 .map_err(storage_err)?;
287 let mut entries: Vec<SessionIndexEntry> = match session_index
288 .get(record.session_id.as_str())
289 .map_err(storage_err)?
290 {
291 Some(value_guard) => {
292 rmp_serde::from_slice(value_guard.value()).map_err(|e| {
293 CerememoryError::Serialization(format!(
294 "msgpack decode session index: {e}"
295 ))
296 })?
297 }
298 None => Vec::new(),
299 };
300 entries.push(SessionIndexEntry {
301 id,
302 created_at: record.created_at,
303 });
304 let packed_entries = rmp_serde::to_vec(&entries).map_err(|e| {
305 CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
306 })?;
307 session_index
308 .insert(record.session_id.as_str(), packed_entries.as_slice())
309 .map_err(storage_err)?;
310 }
311 txn.commit().map_err(storage_err)?;
312 Ok::<Uuid, CerememoryError>(id)
313 })
314 .await
315 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
316
317 if let Some((id, session_id, text)) = text_payload {
318 self.text_index.add(id, &session_id, &text)?;
319 }
320 Ok(record_id)
321 }
322
323 pub async fn update(&self, record: RawJournalRecord) -> Result<(), CerememoryError> {
325 record.validate()?;
326
327 let previous = self
328 .get(&record.id)
329 .await?
330 .ok_or_else(|| CerememoryError::RecordNotFound(record.id.to_string()))?;
331 let previous_had_text = previous.text_content().is_some();
332 let record_id = record.id;
333 let text_payload = record
334 .text_content()
335 .map(|text| (record_id, record.session_id.clone(), text.to_string()));
336
337 let previous_for_txn = previous.clone();
338 let record_for_txn = record.clone();
339
340 let db = self.db.clone();
341 tokio::task::spawn_blocking(move || {
342 let packed = rmp_serde::to_vec(&record_for_txn)
343 .map_err(|e| CerememoryError::Serialization(format!("msgpack encode: {e}")))?;
344
345 let txn = db.begin_write().map_err(storage_err)?;
346 {
347 let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
348 records
349 .insert(record_for_txn.id.as_bytes().as_slice(), packed.as_slice())
350 .map_err(storage_err)?;
351
352 let mut session_index = txn
353 .open_table(RAW_JOURNAL_SESSION_INDEX)
354 .map_err(storage_err)?;
355
356 let mut old_entries: Vec<SessionIndexEntry> = match session_index
357 .get(previous_for_txn.session_id.as_str())
358 .map_err(storage_err)?
359 {
360 Some(value_guard) => {
361 rmp_serde::from_slice(value_guard.value()).map_err(|e| {
362 CerememoryError::Serialization(format!(
363 "msgpack decode session index: {e}"
364 ))
365 })?
366 }
367 None => Vec::new(),
368 };
369 old_entries.retain(|entry| entry.id != previous_for_txn.id);
370 let packed_old_entries = rmp_serde::to_vec(&old_entries).map_err(|e| {
371 CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
372 })?;
373 session_index
374 .insert(
375 previous_for_txn.session_id.as_str(),
376 packed_old_entries.as_slice(),
377 )
378 .map_err(storage_err)?;
379
380 let mut new_entries: Vec<SessionIndexEntry> =
381 if previous_for_txn.session_id == record_for_txn.session_id {
382 old_entries
383 } else {
384 match session_index
385 .get(record_for_txn.session_id.as_str())
386 .map_err(storage_err)?
387 {
388 Some(value_guard) => rmp_serde::from_slice(value_guard.value())
389 .map_err(|e| {
390 CerememoryError::Serialization(format!(
391 "msgpack decode session index: {e}"
392 ))
393 })?,
394 None => Vec::new(),
395 }
396 };
397 new_entries.push(SessionIndexEntry {
398 id: record_for_txn.id,
399 created_at: record_for_txn.created_at,
400 });
401 let packed_new_entries = rmp_serde::to_vec(&new_entries).map_err(|e| {
402 CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
403 })?;
404 session_index
405 .insert(
406 record_for_txn.session_id.as_str(),
407 packed_new_entries.as_slice(),
408 )
409 .map_err(storage_err)?;
410 }
411 txn.commit().map_err(storage_err)?;
412 Ok::<(), CerememoryError>(())
413 })
414 .await
415 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
416
417 if previous_had_text && text_payload.is_none() {
418 self.text_index.remove(record_id)?;
419 }
420 if let Some((id, session_id, text)) = text_payload {
421 if previous_had_text {
422 self.text_index.update(id, &session_id, &text)?;
423 } else {
424 self.text_index.add(id, &session_id, &text)?;
425 }
426 }
427 Ok(())
428 }
429
430 pub async fn get(&self, id: &Uuid) -> Result<Option<RawJournalRecord>, CerememoryError> {
432 let db = self.db.clone();
433 let id = *id;
434 tokio::task::spawn_blocking(move || {
435 let txn = db.begin_read().map_err(storage_err)?;
436 let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
437 get_raw_record_sync(&table, &id)
438 })
439 .await
440 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
441 }
442
443 pub async fn delete(&self, id: &Uuid) -> Result<bool, CerememoryError> {
445 let existing = self.get(id).await?;
446 let Some(existing) = existing else {
447 return Ok(false);
448 };
449 let existing_had_text = existing.text_content().is_some();
450 let existing_session_id = existing.session_id.clone();
451 let id = *id;
452 let db = self.db.clone();
453 tokio::task::spawn_blocking(move || {
454 let txn = db.begin_write().map_err(storage_err)?;
455 {
456 let mut records = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
457 let _ = records
458 .remove(id.as_bytes().as_slice())
459 .map_err(storage_err)?;
460
461 let mut session_index = txn
462 .open_table(RAW_JOURNAL_SESSION_INDEX)
463 .map_err(storage_err)?;
464 let mut entries: Vec<SessionIndexEntry> = match session_index
465 .get(existing_session_id.as_str())
466 .map_err(storage_err)?
467 {
468 Some(value_guard) => {
469 rmp_serde::from_slice(value_guard.value()).map_err(|e| {
470 CerememoryError::Serialization(format!(
471 "msgpack decode session index: {e}"
472 ))
473 })?
474 }
475 None => Vec::new(),
476 };
477 entries.retain(|entry| entry.id != id);
478 let packed_entries = rmp_serde::to_vec(&entries).map_err(|e| {
479 CerememoryError::Serialization(format!("msgpack encode session index: {e}"))
480 })?;
481 session_index
482 .insert(existing_session_id.as_str(), packed_entries.as_slice())
483 .map_err(storage_err)?;
484 }
485 txn.commit().map_err(storage_err)?;
486 Ok::<bool, CerememoryError>(true)
487 })
488 .await
489 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))??;
490
491 if existing_had_text {
492 self.text_index.remove(id)?;
493 }
494 Ok(true)
495 }
496
497 pub async fn get_all(&self) -> Result<Vec<RawJournalRecord>, CerememoryError> {
499 let db = self.db.clone();
500 tokio::task::spawn_blocking(move || {
501 let txn = db.begin_read().map_err(storage_err)?;
502 let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
503 get_all_raw_records_sync(&table)
504 })
505 .await
506 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
507 }
508
509 pub async fn query_session(
511 &self,
512 session_id: &str,
513 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
514 let session_id = session_id.trim().to_string();
515 if session_id.is_empty() {
516 return Err(CerememoryError::Validation(
517 "session_id must not be empty".to_string(),
518 ));
519 }
520
521 let db = self.db.clone();
522 tokio::task::spawn_blocking(move || {
523 let txn = db.begin_read().map_err(storage_err)?;
524 let records_table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
525 let session_table = txn
526 .open_table(RAW_JOURNAL_SESSION_INDEX)
527 .map_err(storage_err)?;
528 let entries = get_session_index_entries_sync(&session_table, &session_id)?;
529 let mut records = get_raw_records_by_entries_sync(&records_table, &entries)?;
530 records.sort_by(|a, b| {
531 a.created_at
532 .cmp(&b.created_at)
533 .then_with(|| a.id.cmp(&b.id))
534 });
535 Ok(records)
536 })
537 .await
538 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
539 }
540
541 pub async fn query_session_range(
543 &self,
544 session_id: &str,
545 start: DateTime<Utc>,
546 end: DateTime<Utc>,
547 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
548 let session_id = session_id.trim().to_string();
549 if session_id.is_empty() {
550 return Err(CerememoryError::Validation(
551 "session_id must not be empty".to_string(),
552 ));
553 }
554 if start > end {
555 return Err(CerememoryError::Validation(
556 "Invalid time range: start must be earlier than or equal to end".to_string(),
557 ));
558 }
559
560 let db = self.db.clone();
561 tokio::task::spawn_blocking(move || {
562 let txn = db.begin_read().map_err(storage_err)?;
563 let records_table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
564 let session_table = txn
565 .open_table(RAW_JOURNAL_SESSION_INDEX)
566 .map_err(storage_err)?;
567 let entries = get_session_index_entries_sync(&session_table, &session_id)?;
568 let filtered_entries: Vec<SessionIndexEntry> = entries
569 .into_iter()
570 .filter(|entry| entry.created_at >= start && entry.created_at <= end)
571 .collect();
572 let mut records = get_raw_records_by_entries_sync(&records_table, &filtered_entries)?;
573 records.sort_by(|a, b| {
574 a.created_at
575 .cmp(&b.created_at)
576 .then_with(|| a.id.cmp(&b.id))
577 });
578 Ok(records)
579 })
580 .await
581 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
582 }
583
584 pub async fn count(&self) -> Result<usize, CerememoryError> {
585 let db = self.db.clone();
586 tokio::task::spawn_blocking(move || {
587 let txn = db.begin_read().map_err(storage_err)?;
588 let table = txn.open_table(RAW_JOURNAL_RECORDS).map_err(storage_err)?;
589 let len = table.len().map_err(storage_err)?;
590 usize::try_from(len).map_err(|_| {
591 CerememoryError::Internal(format!("raw journal length {len} does not fit usize"))
592 })
593 })
594 .await
595 .map_err(|e| CerememoryError::Internal(format!("spawn_blocking panicked: {e}")))?
596 }
597
598 pub async fn search_text(
600 &self,
601 query: &str,
602 session_id: Option<&str>,
603 limit: usize,
604 ) -> Result<Vec<RawJournalRecord>, CerememoryError> {
605 let hits = self.text_index.search(query, session_id, limit)?;
606 let mut records = Vec::with_capacity(hits.len());
607 for hit in hits {
608 if let Some(record) = self.get(&hit.record_id).await? {
609 records.push(record);
610 }
611 }
612 Ok(records)
613 }
614}
615
616fn raw_text_index_path(path: &Path) -> PathBuf {
617 let stem = path
618 .file_stem()
619 .and_then(|stem| stem.to_str())
620 .unwrap_or("raw_journal");
621 let dir_name = format!("{stem}_text_index");
622 path.with_file_name(dir_name)
623}
624
625fn get_raw_record_sync(
626 table: &redb::ReadOnlyTable<&[u8], &[u8]>,
627 id: &Uuid,
628) -> Result<Option<RawJournalRecord>, CerememoryError> {
629 match table.get(id.as_bytes().as_slice()).map_err(storage_err)? {
630 Some(value_guard) => {
631 let record: RawJournalRecord = rmp_serde::from_slice(value_guard.value())
632 .map_err(|e| CerememoryError::Serialization(format!("msgpack decode: {e}")))?;
633 Ok(Some(record))
634 }
635 None => Ok(None),
636 }
637}
638
639fn get_all_raw_records_sync(
640 table: &redb::ReadOnlyTable<&[u8], &[u8]>,
641) -> Result<Vec<RawJournalRecord>, CerememoryError> {
642 let mut records = Vec::new();
643 for entry in table.iter().map_err(storage_err)? {
644 let (_, value) = entry.map_err(storage_err)?;
645 let record: RawJournalRecord = rmp_serde::from_slice(value.value())
646 .map_err(|e| CerememoryError::Serialization(format!("msgpack decode: {e}")))?;
647 records.push(record);
648 }
649 Ok(records)
650}
651
652fn get_session_index_entries_sync(
653 table: &redb::ReadOnlyTable<&str, &[u8]>,
654 session_id: &str,
655) -> Result<Vec<SessionIndexEntry>, CerememoryError> {
656 match table.get(session_id).map_err(storage_err)? {
657 Some(value_guard) => rmp_serde::from_slice(value_guard.value()).map_err(|e| {
658 CerememoryError::Serialization(format!("msgpack decode session index: {e}"))
659 }),
660 None => Ok(Vec::new()),
661 }
662}
663
664fn get_raw_records_by_entries_sync(
665 table: &redb::ReadOnlyTable<&[u8], &[u8]>,
666 entries: &[SessionIndexEntry],
667) -> Result<Vec<RawJournalRecord>, CerememoryError> {
668 let mut records = Vec::with_capacity(entries.len());
669 for entry in entries {
670 if let Some(record) = get_raw_record_sync(table, &entry.id)? {
671 records.push(record);
672 }
673 }
674 Ok(records)
675}
676
677#[cfg(test)]
678mod tests {
679 use super::*;
680 use cerememory_core::types::{RawSource, RawSpeaker, RawVisibility, SecrecyLevel};
681
682 fn make_record(session_id: &str, text: &str) -> RawJournalRecord {
683 RawJournalRecord::new_text(
684 session_id,
685 RawSource::Conversation,
686 RawSpeaker::User,
687 RawVisibility::Normal,
688 SecrecyLevel::Public,
689 text,
690 )
691 }
692
693 #[tokio::test]
694 async fn append_and_get_roundtrip() {
695 let store = RawJournalStore::open_in_memory().unwrap();
696 let record = make_record("sess-1", "hello raw journal");
697 let id = record.id;
698
699 store.append(record).await.unwrap();
700 let restored = store.get(&id).await.unwrap().unwrap();
701
702 assert_eq!(restored.id, id);
703 assert_eq!(restored.session_id, "sess-1");
704 assert_eq!(restored.text_content(), Some("hello raw journal"));
705 }
706
707 #[tokio::test]
708 async fn query_session_filters_records() {
709 let store = RawJournalStore::open_in_memory().unwrap();
710 store.append(make_record("sess-1", "one")).await.unwrap();
711 store.append(make_record("sess-2", "two")).await.unwrap();
712 store.append(make_record("sess-1", "three")).await.unwrap();
713
714 let sess_1 = store.query_session("sess-1").await.unwrap();
715 assert_eq!(sess_1.len(), 2);
716 assert!(sess_1.iter().all(|record| record.session_id == "sess-1"));
717 }
718
719 #[tokio::test]
720 async fn query_session_range_filters_by_time() {
721 let store = RawJournalStore::open_in_memory().unwrap();
722 let base = Utc::now();
723
724 let mut early = make_record("sess-1", "early");
725 early.created_at = base - chrono::Duration::hours(2);
726 early.updated_at = early.created_at;
727
728 let mut middle = make_record("sess-1", "middle");
729 middle.created_at = base - chrono::Duration::hours(1);
730 middle.updated_at = middle.created_at;
731
732 let mut late = make_record("sess-1", "late");
733 late.created_at = base + chrono::Duration::hours(1);
734 late.updated_at = late.created_at;
735
736 store.append(early).await.unwrap();
737 store.append(middle).await.unwrap();
738 store.append(late).await.unwrap();
739
740 let records = store
741 .query_session_range(
742 "sess-1",
743 base - chrono::Duration::hours(1),
744 base + chrono::Duration::minutes(10),
745 )
746 .await
747 .unwrap();
748
749 assert_eq!(records.len(), 1);
750 assert_eq!(records[0].text_content(), Some("middle"));
751 }
752
753 #[tokio::test]
754 async fn count_tracks_records() {
755 let store = RawJournalStore::open_in_memory().unwrap();
756 assert_eq!(store.count().await.unwrap(), 0);
757
758 store.append(make_record("sess-1", "one")).await.unwrap();
759 store.append(make_record("sess-1", "two")).await.unwrap();
760
761 assert_eq!(store.count().await.unwrap(), 2);
762 }
763
764 #[tokio::test]
765 async fn search_text_filters_by_query_and_session() {
766 let store = RawJournalStore::open_in_memory().unwrap();
767 store
768 .append(make_record("sess-1", "timeout retries are idempotent only"))
769 .await
770 .unwrap();
771 store
772 .append(make_record("sess-2", "timeout budget differs"))
773 .await
774 .unwrap();
775
776 let sess_1 = store
777 .search_text("idempotent", Some("sess-1"), 10)
778 .await
779 .unwrap();
780 assert_eq!(sess_1.len(), 1);
781 assert_eq!(sess_1[0].session_id, "sess-1");
782
783 let all = store.search_text("timeout", None, 10).await.unwrap();
784 assert_eq!(all.len(), 2);
785 }
786}