1use crate::db;
2#[cfg(feature = "hnsw")]
3use crate::db::IndexOpKind;
4use crate::error::MemoryError;
5use crate::quantize::{self, Quantizer};
6use crate::types::{EpisodeMeta, EpisodeOutcome, VerificationStatus};
7use crate::{build_episode_search_text, verification_status_for_outcome, MemoryStore};
8use rusqlite::{params, Connection};
9use stack_ids::{DigestBuilder, TraceCtx};
10use std::collections::BTreeSet;
11
/// Returns the item key for an episode: the episode id prefixed with `episode:`.
///
/// Within this module the key is what gets handed to the pending-index queue
/// and to derived-vector-artifact invalidation.
pub(crate) fn episode_item_key(episode_id: &str) -> String {
    const PREFIX: &str = "episode:";
    let mut key = String::with_capacity(PREFIX.len() + episode_id.len());
    key.push_str(PREFIX);
    key.push_str(episode_id);
    key
}
18
/// Returns the node id for an episode: the episode id prefixed with `episode:`.
///
/// The resulting string currently has the same shape as the one produced by
/// `episode_item_key`.
pub(crate) fn episode_node_id(episode_id: &str) -> String {
    ["episode:", episode_id].concat()
}
23
24pub(crate) fn resolve_primary_episode_id_legacy(
28 conn: &Connection,
29 document_id: &str,
30) -> Result<Option<String>, MemoryError> {
31 match conn.query_row(
32 "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC LIMIT 1",
33 params![document_id],
34 |row| row.get(0),
35 ) {
36 Ok(id) => Ok(Some(id)),
37 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
38 Err(err) => Err(MemoryError::Database(err)),
39 }
40}
41
42pub(crate) fn list_document_episode_ids(
44 conn: &Connection,
45 document_id: &str,
46) -> Result<Vec<String>, MemoryError> {
47 let mut stmt = conn.prepare(
48 "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC",
49 )?;
50 let ids = stmt
51 .query_map(params![document_id], |row| row.get::<_, String>(0))?
52 .collect::<Result<Vec<_>, _>>()?;
53 Ok(ids)
54}
55
56#[allow(clippy::too_many_arguments)]
59pub(crate) fn create_episode(
60 conn: &Connection,
61 episode_id: &str,
62 document_id: &str,
63 meta: &EpisodeMeta,
64 search_text: &str,
65 embedding_bytes: &[u8],
66 q8_bytes: Option<&[u8]>,
67 trace_id: Option<&str>,
68) -> Result<String, MemoryError> {
69 let cause_ids_json =
70 serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
71 let verification_json = serde_json::to_string(&meta.verification_status)
72 .map_err(|e| MemoryError::Other(e.to_string()))?;
73 let item_key = episode_item_key(episode_id);
74
75 db::with_transaction(conn, |tx| {
76 let exists: bool = tx.query_row(
77 "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
78 params![document_id],
79 |row| row.get(0),
80 )?;
81 if !exists {
82 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
83 }
84
85 tx.execute(
86 "INSERT INTO episodes
87 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
88 verification_status, experiment_id, search_text, embedding, embedding_q8,
89 trace_id, updated_at)
90 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
91 params![
92 episode_id,
93 document_id,
94 cause_ids_json,
95 meta.effect_type,
96 meta.outcome.as_str(),
97 meta.confidence,
98 verification_json,
99 meta.experiment_id,
100 search_text,
101 embedding_bytes,
102 q8_bytes,
103 trace_id
104 ],
105 )?;
106
107 tx.execute(
109 "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
110 params![episode_id, document_id],
111 )?;
112 let fts_rowid: i64 = tx.query_row(
113 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
114 params![episode_id],
115 |row| row.get(0),
116 )?;
117 tx.execute(
118 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
119 params![fts_rowid, search_text],
120 )?;
121
122 sync_causal_edges(tx, episode_id, &meta.cause_ids)?;
124
125 #[cfg(feature = "hnsw")]
126 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
127 db::invalidate_derived_vector_artifact(tx, &item_key)?;
128 Ok(episode_id.to_string())
129 })
130}
131
132#[allow(clippy::too_many_arguments)]
138pub(crate) fn upsert_episode(
139 conn: &Connection,
140 document_id: &str,
141 meta: &EpisodeMeta,
142 search_text: &str,
143 embedding_bytes: &[u8],
144 q8_bytes: Option<&[u8]>,
145 trace_id: Option<&str>,
146) -> Result<String, MemoryError> {
147 let cause_ids_json =
148 serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
149 let verification_json = serde_json::to_string(&meta.verification_status)
150 .map_err(|e| MemoryError::Other(e.to_string()))?;
151
152 let existing_episode_id = resolve_primary_episode_id_legacy(conn, document_id)?;
154
155 let episode_id = existing_episode_id.unwrap_or_else(|| format!("{}-ep0", document_id));
156
157 let item_key = episode_item_key(&episode_id);
158
159 db::with_transaction(conn, |tx| {
160 let old_search_text: Option<String> = tx
162 .query_row(
163 "SELECT search_text FROM episodes WHERE episode_id = ?1",
164 params![episode_id],
165 |row| row.get(0),
166 )
167 .ok();
168 let exists: bool = tx.query_row(
169 "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
170 params![document_id],
171 |row| row.get(0),
172 )?;
173 if !exists {
174 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
175 }
176
177 if old_search_text.is_some() {
178 let prior_fact_digest: Option<String> = tx
181 .query_row(
182 "SELECT fact_digest FROM episodes WHERE episode_id = ?1
183 ORDER BY recorded_time DESC LIMIT 1",
184 params![episode_id],
185 |row| row.get(0),
186 )
187 .ok()
188 .flatten();
189
190 let mut digest_builder = DigestBuilder::new();
192 digest_builder.update_str("semantic-memory.episode.v1");
193 digest_builder.separator();
194 digest_builder.update_str(&cause_ids_json);
195 digest_builder.separator();
196 digest_builder.update_str(meta.effect_type.as_str());
197 digest_builder.separator();
198 digest_builder.update_str(meta.outcome.as_str());
199 digest_builder.separator();
200 digest_builder.update(&meta.confidence.to_le_bytes());
201 let new_fact_digest = format!("blake3:{}", digest_builder.finalize().hex());
202
203 let valid_time_sql: Option<String> =
205 meta.valid_time.map(|dt| format!("'{}'", dt.to_rfc3339()));
206
207 tx.execute(
210 &format!(
211 "UPDATE episodes SET
212 cause_ids = ?1,
213 effect_type = ?2,
214 outcome = ?3,
215 confidence = ?4,
216 verification_status = ?5,
217 experiment_id = ?6,
218 search_text = ?7,
219 embedding = ?8,
220 embedding_q8 = ?9,
221 trace_id = COALESCE(?10, trace_id),
222 updated_at = datetime('now'),
223 valid_time = {},
224 recorded_time = datetime('now'),
225 superseded_by = ?11,
226 fact_digest = ?12
227 WHERE episode_id = ?13",
228 valid_time_sql.as_deref().unwrap_or("NULL"),
229 ),
230 params![
231 cause_ids_json,
232 meta.effect_type,
233 meta.outcome.as_str(),
234 meta.confidence,
235 verification_json,
236 meta.experiment_id,
237 search_text,
238 embedding_bytes,
239 q8_bytes,
240 trace_id,
241 prior_fact_digest,
242 new_fact_digest,
243 episode_id,
244 ],
245 )?;
246 } else {
250 tx.execute(
252 "INSERT INTO episodes
253 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
254 verification_status, experiment_id, search_text, embedding, embedding_q8,
255 trace_id, updated_at)
256 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
257 params![
258 episode_id,
259 document_id,
260 cause_ids_json,
261 meta.effect_type,
262 meta.outcome.as_str(),
263 meta.confidence,
264 verification_json,
265 meta.experiment_id,
266 search_text,
267 embedding_bytes,
268 q8_bytes,
269 trace_id
270 ],
271 )?;
272
273 tx.execute(
275 "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
276 params![episode_id, document_id],
277 )?;
278 let fts_rowid: i64 = tx.query_row(
279 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
280 params![episode_id],
281 |row| row.get(0),
282 )?;
283 tx.execute(
284 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
285 params![fts_rowid, search_text],
286 )?;
287 }
288
289 sync_causal_edges(tx, &episode_id, &meta.cause_ids)?;
291
292 #[cfg(feature = "hnsw")]
293 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
294 db::invalidate_derived_vector_artifact(tx, &item_key)?;
295 Ok(episode_id.to_string())
296 })
297}
298
299fn sync_causal_edges(
301 tx: &rusqlite::Transaction<'_>,
302 episode_id: &str,
303 cause_ids: &[String],
304) -> Result<(), MemoryError> {
305 let mut seen = BTreeSet::new();
306 for cause_id in cause_ids {
307 if !seen.insert(cause_id) {
308 return Err(MemoryError::InvalidConfig {
309 field: "episodes.cause_ids",
310 reason: format!("duplicate cause id: {cause_id}"),
311 });
312 }
313 }
314 tx.execute(
315 "DELETE FROM episode_causes WHERE episode_id = ?1",
316 params![episode_id],
317 )?;
318 for (ordinal, cause_id) in cause_ids.iter().enumerate() {
319 tx.execute(
320 "INSERT INTO episode_causes (episode_id, cause_node_id, ordinal)
321 VALUES (?1, ?2, ?3)",
322 params![episode_id, cause_id, ordinal as i64],
323 )?;
324 }
325 Ok(())
326}
327
328#[allow(clippy::too_many_arguments)]
331pub(crate) fn update_episode_outcome(
332 conn: &Connection,
333 document_id: &str,
334 outcome: EpisodeOutcome,
335 confidence: f32,
336 experiment_id: Option<&str>,
337 verification_status: &VerificationStatus,
338 search_text: &str,
339 embedding_bytes: &[u8],
340 q8_bytes: Option<&[u8]>,
341) -> Result<(), MemoryError> {
342 let episode_id = resolve_primary_episode_id_legacy(conn, document_id)?
344 .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
345
346 update_episode_outcome_by_id(
347 conn,
348 &episode_id,
349 outcome,
350 confidence,
351 experiment_id,
352 verification_status,
353 search_text,
354 embedding_bytes,
355 q8_bytes,
356 )
357}
358
359#[allow(clippy::too_many_arguments)]
361pub(crate) fn update_episode_outcome_by_id(
362 conn: &Connection,
363 episode_id: &str,
364 outcome: EpisodeOutcome,
365 confidence: f32,
366 experiment_id: Option<&str>,
367 verification_status: &VerificationStatus,
368 search_text: &str,
369 embedding_bytes: &[u8],
370 q8_bytes: Option<&[u8]>,
371) -> Result<(), MemoryError> {
372 let verification_json = serde_json::to_string(verification_status)
373 .map_err(|e| MemoryError::Other(e.to_string()))?;
374 let item_key = episode_item_key(episode_id);
375
376 db::with_transaction(conn, |tx| {
377 let old_search_text: String = tx
378 .query_row(
379 "SELECT search_text FROM episodes WHERE episode_id = ?1",
380 params![episode_id],
381 |row| row.get(0),
382 )
383 .map_err(|e| MemoryError::EpisodeNotFound(format!("{}: {e}", episode_id)))?;
384 let fts_rowid: i64 = tx.query_row(
385 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
386 params![episode_id],
387 |row| row.get(0),
388 )?;
389
390 tx.execute(
391 "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
392 params![fts_rowid, old_search_text],
393 )?;
394 tx.execute(
395 "UPDATE episodes
396 SET outcome = ?1,
397 confidence = ?2,
398 experiment_id = COALESCE(?3, experiment_id),
399 verification_status = ?4,
400 search_text = ?5,
401 embedding = ?6,
402 embedding_q8 = ?7,
403 updated_at = datetime('now')
404 WHERE episode_id = ?8",
405 params![
406 outcome.as_str(),
407 confidence,
408 experiment_id,
409 verification_json,
410 search_text,
411 embedding_bytes,
412 q8_bytes,
413 episode_id
414 ],
415 )?;
416 tx.execute(
417 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
418 params![fts_rowid, search_text],
419 )?;
420
421 #[cfg(feature = "hnsw")]
422 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
423 db::invalidate_derived_vector_artifact(tx, &item_key)?;
424 Ok(())
425 })
426}
427
428pub(crate) fn search_episodes(
429 conn: &Connection,
430 effect_type: Option<&str>,
431 outcome: Option<&EpisodeOutcome>,
432 limit: usize,
433) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
434 const MAX_EPISODE_SEARCH_LIMIT: usize = 1_000;
435 let limit = limit.clamp(1, MAX_EPISODE_SEARCH_LIMIT);
436 let effect_type = effect_type.map(ToOwned::to_owned);
437 let outcome = outcome.map(|value| value.as_str().to_string());
438
439 let mut sql = String::from(
440 "SELECT episode_id, document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
441 FROM episodes
442 WHERE 1 = 1",
443 );
444 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
445
446 if let Some(effect_type) = &effect_type {
447 sql.push_str(&format!(" AND effect_type = ?{}", params.len() + 1));
448 params.push(Box::new(effect_type.clone()));
449 }
450 if let Some(outcome) = &outcome {
451 sql.push_str(&format!(" AND outcome = ?{}", params.len() + 1));
452 params.push(Box::new(outcome.clone()));
453 }
454 let limit_param = params.len() + 1;
455 sql.push_str(&format!(" ORDER BY updated_at DESC LIMIT ?{}", limit_param));
456 params.push(Box::new(limit as i64));
457
458 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
459 params.iter().map(|value| value.as_ref()).collect();
460 let mut stmt = conn.prepare(&sql)?;
461 let rows = stmt
462 .query_map(&*param_refs, |row| {
463 Ok((
464 row.get::<_, String>(0)?,
465 row.get::<_, String>(1)?,
466 row.get::<_, String>(2)?,
467 row.get::<_, String>(3)?,
468 row.get::<_, String>(4)?,
469 row.get::<_, f32>(5)?,
470 row.get::<_, String>(6)?,
471 row.get::<_, Option<String>>(7)?,
472 ))
473 })?
474 .collect::<Result<Vec<_>, _>>()?;
475
476 rows.into_iter()
477 .map(
478 |(
479 episode_id,
480 _document_id,
481 cause_ids_raw,
482 effect_type,
483 outcome_raw,
484 confidence,
485 verification_status_raw,
486 experiment_id,
487 )| {
488 Ok((
489 episode_id.clone(),
490 EpisodeMeta {
491 cause_ids: db::parse_string_list_json(
492 "episodes",
493 &episode_id,
494 "cause_ids",
495 &cause_ids_raw,
496 )?,
497 effect_type,
498 outcome: db::parse_episode_outcome(&episode_id, &outcome_raw)?,
499 confidence,
500 verification_status: db::parse_verification_status(
501 &episode_id,
502 &verification_status_raw,
503 )?,
504 experiment_id,
505 valid_time: None,
506 fact_digest: None,
507 },
508 ))
509 },
510 )
511 .collect()
512}
513
514pub(crate) fn get_episode(
516 conn: &Connection,
517 episode_id: &str,
518) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
519 let row = conn.query_row(
520 "SELECT document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
521 FROM episodes
522 WHERE episode_id = ?1",
523 params![episode_id],
524 |row| {
525 Ok((
526 row.get::<_, String>(0)?,
527 row.get::<_, String>(1)?,
528 row.get::<_, String>(2)?,
529 row.get::<_, String>(3)?,
530 row.get::<_, f32>(4)?,
531 row.get::<_, String>(5)?,
532 row.get::<_, Option<String>>(6)?,
533 ))
534 },
535 );
536
537 match row {
538 Ok((
539 document_id,
540 cause_ids_raw,
541 effect_type,
542 outcome_raw,
543 confidence,
544 verification_status_raw,
545 experiment_id,
546 )) => Ok(Some((
547 document_id.clone(),
548 EpisodeMeta {
549 cause_ids: db::parse_string_list_json(
550 "episodes",
551 episode_id,
552 "cause_ids",
553 &cause_ids_raw,
554 )?,
555 effect_type,
556 outcome: db::parse_episode_outcome(episode_id, &outcome_raw)?,
557 confidence,
558 verification_status: db::parse_verification_status(
559 episode_id,
560 &verification_status_raw,
561 )?,
562 experiment_id,
563 valid_time: None,
564 fact_digest: None,
565 },
566 ))),
567 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
568 Err(err) => Err(MemoryError::Database(err)),
569 }
570}
571
572pub(crate) fn load_episode_meta(
575 conn: &Connection,
576 document_id: &str,
577) -> Result<Option<EpisodeMeta>, MemoryError> {
578 let row = conn.query_row(
579 "SELECT cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
580 FROM episodes
581 WHERE document_id = ?1
582 ORDER BY created_at ASC
583 LIMIT 1",
584 params![document_id],
585 |row| {
586 Ok((
587 row.get::<_, String>(0)?,
588 row.get::<_, String>(1)?,
589 row.get::<_, String>(2)?,
590 row.get::<_, f32>(3)?,
591 row.get::<_, String>(4)?,
592 row.get::<_, Option<String>>(5)?,
593 ))
594 },
595 );
596
597 match row {
598 Ok((
599 cause_ids_raw,
600 effect_type,
601 outcome_raw,
602 confidence,
603 verification_status_raw,
604 experiment_id,
605 )) => Ok(Some(EpisodeMeta {
606 cause_ids: db::parse_string_list_json(
607 "episodes",
608 document_id,
609 "cause_ids",
610 &cause_ids_raw,
611 )?,
612 effect_type,
613 outcome: db::parse_episode_outcome(document_id, &outcome_raw)?,
614 confidence,
615 verification_status: db::parse_verification_status(
616 document_id,
617 &verification_status_raw,
618 )?,
619 experiment_id,
620 valid_time: None,
621 fact_digest: None,
622 })),
623 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
624 Err(err) => Err(MemoryError::Database(err)),
625 }
626}
627
628pub(crate) fn load_episode_context(
629 conn: &Connection,
630 document_id: &str,
631) -> Result<(String, String), MemoryError> {
632 let title: String = conn
633 .query_row(
634 "SELECT title FROM documents WHERE id = ?1",
635 params![document_id],
636 |row| row.get(0),
637 )
638 .map_err(|e| MemoryError::DocumentNotFound(format!("{}: {e}", document_id)))?;
639
640 let mut stmt =
641 conn.prepare("SELECT content FROM chunks WHERE document_id = ?1 ORDER BY chunk_index ASC")?;
642 let chunks = stmt
643 .query_map(params![document_id], |row| row.get::<_, String>(0))?
644 .collect::<Result<Vec<_>, _>>()?;
645
646 Ok((title, chunks.join("\n")))
647}
648
649impl MemoryStore {
650 pub async fn ingest_episode(
656 &self,
657 document_id: &str,
658 meta: &EpisodeMeta,
659 ) -> Result<String, MemoryError> {
660 self.ingest_episode_with_trace(document_id, meta, None)
661 .await
662 }
663
664 pub async fn ingest_episode_with_trace(
666 &self,
667 document_id: &str,
668 meta: &EpisodeMeta,
669 trace_ctx: Option<&TraceCtx>,
670 ) -> Result<String, MemoryError> {
671 self.validate_content("episodes.effect_type", &meta.effect_type)?;
672 Self::validate_confidence(meta.confidence)?;
673 let doc_id = document_id.to_string();
674 let meta = meta.clone();
675 let (document_title, document_context) = self
676 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
677 .await?;
678 let search_text = build_episode_search_text(&document_title, &document_context, &meta);
679 let embedding = self.embed_text_internal(&search_text).await?;
680 self.validate_embedding_dimensions(&embedding)?;
681 let embedding_bytes = db::embedding_to_bytes(&embedding);
682 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
684 .quantize(&embedding)
685 .map(|vector| quantize::pack_quantized(&vector))
686 .ok();
687 let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
688
689 let doc_id = document_id.to_string();
690 let episode_id = self
691 .with_write_conn(move |conn| {
692 upsert_episode(
693 conn,
694 &doc_id,
695 &meta,
696 &search_text,
697 &embedding_bytes,
698 q8_bytes.as_deref(),
699 trace_id_owned.as_deref(),
700 )
701 })
702 .await?;
703
704 #[cfg(feature = "hnsw")]
705 self.sync_pending_hnsw_ops_best_effort("ingest_episode")
706 .await;
707
708 Ok(episode_id)
709 }
710
711 pub async fn create_episode(
713 &self,
714 episode_id: &str,
715 document_id: &str,
716 meta: &EpisodeMeta,
717 ) -> Result<String, MemoryError> {
718 self.create_episode_with_trace(episode_id, document_id, meta, None)
719 .await
720 }
721
722 pub async fn create_episode_with_trace(
724 &self,
725 episode_id: &str,
726 document_id: &str,
727 meta: &EpisodeMeta,
728 trace_ctx: Option<&TraceCtx>,
729 ) -> Result<String, MemoryError> {
730 self.validate_content("episodes.effect_type", &meta.effect_type)?;
731 Self::validate_confidence(meta.confidence)?;
732 let doc_id = document_id.to_string();
733 let meta = meta.clone();
734 let (document_title, document_context) = self
735 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
736 .await?;
737 let search_text = build_episode_search_text(&document_title, &document_context, &meta);
738 let embedding = self.embed_text_internal(&search_text).await?;
739 self.validate_embedding_dimensions(&embedding)?;
740 let embedding_bytes = db::embedding_to_bytes(&embedding);
741 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
743 .quantize(&embedding)
744 .map(|vector| quantize::pack_quantized(&vector))
745 .ok();
746 let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
747
748 let ep_id = episode_id.to_string();
749 let doc_id = document_id.to_string();
750 let created_ep_id = self
751 .with_write_conn(move |conn| {
752 crate::episodes::create_episode(
753 conn,
754 &ep_id,
755 &doc_id,
756 &meta,
757 &search_text,
758 &embedding_bytes,
759 q8_bytes.as_deref(),
760 trace_id_owned.as_deref(),
761 )
762 })
763 .await?;
764
765 #[cfg(feature = "hnsw")]
766 self.sync_pending_hnsw_ops_best_effort("create_episode")
767 .await;
768
769 Ok(created_ep_id)
770 }
771
772 pub async fn get_episode(
774 &self,
775 episode_id: &str,
776 ) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
777 let ep_id = episode_id.to_string();
778 self.with_read_conn(move |conn| get_episode(conn, &ep_id))
779 .await
780 }
781
782 pub async fn update_episode_outcome_by_id(
784 &self,
785 episode_id: &str,
786 outcome: EpisodeOutcome,
787 confidence: f32,
788 experiment_id: Option<&str>,
789 ) -> Result<(), MemoryError> {
790 Self::validate_confidence(confidence)?;
791 let ep_id = episode_id.to_string();
792 let ep_id_clone = ep_id.clone();
793 let (doc_id, current_meta) = self
794 .with_read_conn(move |conn| {
795 get_episode(conn, &ep_id_clone)?
796 .ok_or_else(|| MemoryError::EpisodeNotFound(ep_id_clone.clone()))
797 })
798 .await?;
799
800 let experiment_id_owned = experiment_id.map(|value| value.to_string());
801 let verification_status =
802 verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
803 let updated_meta = EpisodeMeta {
804 cause_ids: current_meta.cause_ids,
805 effect_type: current_meta.effect_type,
806 outcome: outcome.clone(),
807 confidence,
808 verification_status: verification_status.clone(),
809 experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
810 valid_time: current_meta.valid_time,
811 fact_digest: current_meta.fact_digest.clone(),
812 };
813
814 let (document_title, document_context) = self
815 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
816 .await?;
817 let search_text =
818 build_episode_search_text(&document_title, &document_context, &updated_meta);
819 let embedding = self.embed_text_internal(&search_text).await?;
820 self.validate_embedding_dimensions(&embedding)?;
821 let embedding_bytes = db::embedding_to_bytes(&embedding);
822 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
824 .quantize(&embedding)
825 .map(|vector| quantize::pack_quantized(&vector))
826 .ok();
827
828 self.with_write_conn(move |conn| {
829 crate::episodes::update_episode_outcome_by_id(
830 conn,
831 &ep_id,
832 outcome,
833 confidence,
834 experiment_id_owned.as_deref(),
835 &verification_status,
836 &search_text,
837 &embedding_bytes,
838 q8_bytes.as_deref(),
839 )
840 })
841 .await?;
842
843 #[cfg(feature = "hnsw")]
844 self.sync_pending_hnsw_ops_best_effort("update_episode_outcome_by_id")
845 .await;
846
847 Ok(())
848 }
849
850 pub async fn update_episode_outcome(
852 &self,
853 document_id: &str,
854 outcome: EpisodeOutcome,
855 confidence: f32,
856 experiment_id: Option<&str>,
857 ) -> Result<(), MemoryError> {
858 Self::validate_confidence(confidence)?;
859 let doc_id = document_id.to_string();
860 let current_meta = self
861 .with_read_conn(move |conn| load_episode_meta(conn, &doc_id))
862 .await?
863 .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
864
865 let experiment_id_owned = experiment_id.map(|value| value.to_string());
866 let verification_status =
867 verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
868 let updated_meta = EpisodeMeta {
869 cause_ids: current_meta.cause_ids,
870 effect_type: current_meta.effect_type,
871 outcome: outcome.clone(),
872 confidence,
873 verification_status: verification_status.clone(),
874 experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
875 valid_time: current_meta.valid_time,
876 fact_digest: current_meta.fact_digest.clone(),
877 };
878
879 let doc_id = document_id.to_string();
880 let (document_title, document_context) = self
881 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
882 .await?;
883 let search_text =
884 build_episode_search_text(&document_title, &document_context, &updated_meta);
885 let embedding = self.embed_text_internal(&search_text).await?;
886 self.validate_embedding_dimensions(&embedding)?;
887 let embedding_bytes = db::embedding_to_bytes(&embedding);
888 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
890 .quantize(&embedding)
891 .map(|vector| quantize::pack_quantized(&vector))
892 .ok();
893
894 let doc_id = document_id.to_string();
895 self.with_write_conn(move |conn| {
896 crate::episodes::update_episode_outcome(
897 conn,
898 &doc_id,
899 outcome,
900 confidence,
901 experiment_id_owned.as_deref(),
902 &verification_status,
903 &search_text,
904 &embedding_bytes,
905 q8_bytes.as_deref(),
906 )
907 })
908 .await?;
909
910 #[cfg(feature = "hnsw")]
911 self.sync_pending_hnsw_ops_best_effort("update_episode_outcome")
912 .await;
913
914 Ok(())
915 }
916
917 pub async fn search_episodes(
919 &self,
920 effect_type: Option<&str>,
921 outcome: Option<&EpisodeOutcome>,
922 limit: usize,
923 ) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
924 let et = effect_type.map(|s| s.to_string());
925 let outcome_owned = outcome.cloned();
926
927 self.with_read_conn(move |conn| {
928 search_episodes(conn, et.as_deref(), outcome_owned.as_ref(), limit)
929 })
930 .await
931 }
932}