1use crate::db;
2#[cfg(feature = "hnsw")]
3use crate::db::IndexOpKind;
4use crate::error::MemoryError;
5use crate::quantize::{self, Quantizer};
6use crate::types::{EpisodeMeta, EpisodeOutcome, VerificationStatus};
7use crate::{build_episode_search_text, verification_status_for_outcome, MemoryStore};
8use rusqlite::{params, Connection};
9use stack_ids::TraceCtx;
10
/// Builds the item key for an episode: the literal prefix `episode:` followed by
/// `episode_id`.
///
/// Within this file the key is what gets handed to `db::queue_pending_index_op`
/// when the `hnsw` feature is enabled.
pub(crate) fn episode_item_key(episode_id: &str) -> String {
    const PREFIX: &str = "episode:";

    let mut key = String::with_capacity(PREFIX.len() + episode_id.len());
    key.push_str(PREFIX);
    key.push_str(episode_id);
    key
}
17
/// Builds the node id for an episode: the literal prefix `episode:` followed by
/// `episode_id`.
///
/// NOTE(review): the output is currently identical to `episode_item_key`; the
/// consumers of node ids are not visible in this file.
pub(crate) fn episode_node_id(episode_id: &str) -> String {
    ["episode:", episode_id].concat()
}
22
23pub(crate) fn resolve_primary_episode_id_legacy(
27 conn: &Connection,
28 document_id: &str,
29) -> Result<Option<String>, MemoryError> {
30 match conn.query_row(
31 "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC LIMIT 1",
32 params![document_id],
33 |row| row.get(0),
34 ) {
35 Ok(id) => Ok(Some(id)),
36 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
37 Err(err) => Err(MemoryError::Database(err)),
38 }
39}
40
41pub(crate) fn list_document_episode_ids(
43 conn: &Connection,
44 document_id: &str,
45) -> Result<Vec<String>, MemoryError> {
46 let mut stmt = conn.prepare(
47 "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC",
48 )?;
49 let ids = stmt
50 .query_map(params![document_id], |row| row.get::<_, String>(0))?
51 .collect::<Result<Vec<_>, _>>()?;
52 Ok(ids)
53}
54
55#[allow(clippy::too_many_arguments)]
58pub(crate) fn create_episode(
59 conn: &Connection,
60 episode_id: &str,
61 document_id: &str,
62 meta: &EpisodeMeta,
63 search_text: &str,
64 embedding_bytes: &[u8],
65 q8_bytes: Option<&[u8]>,
66 trace_id: Option<&str>,
67) -> Result<String, MemoryError> {
68 let cause_ids_json =
69 serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
70 let verification_json = serde_json::to_string(&meta.verification_status)
71 .map_err(|e| MemoryError::Other(e.to_string()))?;
72 #[cfg(feature = "hnsw")]
73 let item_key = episode_item_key(episode_id);
74
75 db::with_transaction(conn, |tx| {
76 let exists: bool = tx.query_row(
77 "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
78 params![document_id],
79 |row| row.get(0),
80 )?;
81 if !exists {
82 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
83 }
84
85 tx.execute(
86 "INSERT INTO episodes
87 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
88 verification_status, experiment_id, search_text, embedding, embedding_q8,
89 trace_id, updated_at)
90 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
91 params![
92 episode_id,
93 document_id,
94 cause_ids_json,
95 meta.effect_type,
96 meta.outcome.as_str(),
97 meta.confidence,
98 verification_json,
99 meta.experiment_id,
100 search_text,
101 embedding_bytes,
102 q8_bytes,
103 trace_id
104 ],
105 )?;
106
107 tx.execute(
109 "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
110 params![episode_id, document_id],
111 )?;
112 let fts_rowid: i64 = tx.query_row(
113 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
114 params![episode_id],
115 |row| row.get(0),
116 )?;
117 tx.execute(
118 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
119 params![fts_rowid, search_text],
120 )?;
121
122 sync_causal_edges(tx, episode_id, &meta.cause_ids)?;
124
125 #[cfg(feature = "hnsw")]
126 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
127 Ok(episode_id.to_string())
128 })
129}
130
131#[allow(clippy::too_many_arguments)]
137pub(crate) fn upsert_episode(
138 conn: &Connection,
139 document_id: &str,
140 meta: &EpisodeMeta,
141 search_text: &str,
142 embedding_bytes: &[u8],
143 q8_bytes: Option<&[u8]>,
144 trace_id: Option<&str>,
145) -> Result<String, MemoryError> {
146 let cause_ids_json =
147 serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
148 let verification_json = serde_json::to_string(&meta.verification_status)
149 .map_err(|e| MemoryError::Other(e.to_string()))?;
150
151 let existing_episode_id = resolve_primary_episode_id_legacy(conn, document_id)?;
153
154 let episode_id = existing_episode_id.unwrap_or_else(|| format!("{}-ep0", document_id));
155
156 #[cfg(feature = "hnsw")]
157 let item_key = episode_item_key(&episode_id);
158
159 db::with_transaction(conn, |tx| {
160 let old_search_text: Option<String> = tx
162 .query_row(
163 "SELECT search_text FROM episodes WHERE episode_id = ?1",
164 params![episode_id],
165 |row| row.get(0),
166 )
167 .ok();
168 let exists: bool = tx.query_row(
169 "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
170 params![document_id],
171 |row| row.get(0),
172 )?;
173 if !exists {
174 return Err(MemoryError::DocumentNotFound(document_id.to_string()));
175 }
176
177 if old_search_text.is_some() {
178 tx.execute(
180 "UPDATE episodes SET
181 cause_ids = ?1,
182 effect_type = ?2,
183 outcome = ?3,
184 confidence = ?4,
185 verification_status = ?5,
186 experiment_id = ?6,
187 search_text = ?7,
188 embedding = ?8,
189 embedding_q8 = ?9,
190 trace_id = COALESCE(?10, trace_id),
191 updated_at = datetime('now')
192 WHERE episode_id = ?11",
193 params![
194 cause_ids_json,
195 meta.effect_type,
196 meta.outcome.as_str(),
197 meta.confidence,
198 verification_json,
199 meta.experiment_id,
200 search_text,
201 embedding_bytes,
202 q8_bytes,
203 trace_id,
204 episode_id
205 ],
206 )?;
207
208 let fts_rowid: i64 = tx.query_row(
210 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
211 params![episode_id],
212 |row| row.get(0),
213 )?;
214 if let Some(old_text) = old_search_text {
215 tx.execute(
216 "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
217 params![fts_rowid, old_text],
218 )?;
219 }
220 tx.execute(
221 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
222 params![fts_rowid, search_text],
223 )?;
224 } else {
225 tx.execute(
227 "INSERT INTO episodes
228 (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
229 verification_status, experiment_id, search_text, embedding, embedding_q8,
230 trace_id, updated_at)
231 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
232 params![
233 episode_id,
234 document_id,
235 cause_ids_json,
236 meta.effect_type,
237 meta.outcome.as_str(),
238 meta.confidence,
239 verification_json,
240 meta.experiment_id,
241 search_text,
242 embedding_bytes,
243 q8_bytes,
244 trace_id
245 ],
246 )?;
247
248 tx.execute(
250 "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
251 params![episode_id, document_id],
252 )?;
253 let fts_rowid: i64 = tx.query_row(
254 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
255 params![episode_id],
256 |row| row.get(0),
257 )?;
258 tx.execute(
259 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
260 params![fts_rowid, search_text],
261 )?;
262 }
263
264 sync_causal_edges(tx, &episode_id, &meta.cause_ids)?;
266
267 #[cfg(feature = "hnsw")]
268 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
269 Ok(episode_id.to_string())
270 })
271}
272
273fn sync_causal_edges(
275 tx: &rusqlite::Transaction<'_>,
276 episode_id: &str,
277 cause_ids: &[String],
278) -> Result<(), MemoryError> {
279 tx.execute(
280 "DELETE FROM episode_causes WHERE episode_id = ?1",
281 params![episode_id],
282 )?;
283 for (ordinal, cause_id) in cause_ids.iter().enumerate() {
284 tx.execute(
285 "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
286 VALUES (?1, ?2, ?3)",
287 params![episode_id, cause_id, ordinal as i64],
288 )?;
289 }
290 Ok(())
291}
292
293#[allow(clippy::too_many_arguments)]
296pub(crate) fn update_episode_outcome(
297 conn: &Connection,
298 document_id: &str,
299 outcome: EpisodeOutcome,
300 confidence: f32,
301 experiment_id: Option<&str>,
302 verification_status: &VerificationStatus,
303 search_text: &str,
304 embedding_bytes: &[u8],
305 q8_bytes: Option<&[u8]>,
306) -> Result<(), MemoryError> {
307 let episode_id = resolve_primary_episode_id_legacy(conn, document_id)?
309 .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
310
311 update_episode_outcome_by_id(
312 conn,
313 &episode_id,
314 outcome,
315 confidence,
316 experiment_id,
317 verification_status,
318 search_text,
319 embedding_bytes,
320 q8_bytes,
321 )
322}
323
324#[allow(clippy::too_many_arguments)]
326pub(crate) fn update_episode_outcome_by_id(
327 conn: &Connection,
328 episode_id: &str,
329 outcome: EpisodeOutcome,
330 confidence: f32,
331 experiment_id: Option<&str>,
332 verification_status: &VerificationStatus,
333 search_text: &str,
334 embedding_bytes: &[u8],
335 q8_bytes: Option<&[u8]>,
336) -> Result<(), MemoryError> {
337 let verification_json = serde_json::to_string(verification_status)
338 .map_err(|e| MemoryError::Other(e.to_string()))?;
339 #[cfg(feature = "hnsw")]
340 let item_key = episode_item_key(episode_id);
341
342 db::with_transaction(conn, |tx| {
343 let old_search_text: String = tx
344 .query_row(
345 "SELECT search_text FROM episodes WHERE episode_id = ?1",
346 params![episode_id],
347 |row| row.get(0),
348 )
349 .map_err(|e| MemoryError::EpisodeNotFound(format!("{}: {e}", episode_id)))?;
350 let fts_rowid: i64 = tx.query_row(
351 "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
352 params![episode_id],
353 |row| row.get(0),
354 )?;
355
356 tx.execute(
357 "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
358 params![fts_rowid, old_search_text],
359 )?;
360 tx.execute(
361 "UPDATE episodes
362 SET outcome = ?1,
363 confidence = ?2,
364 experiment_id = COALESCE(?3, experiment_id),
365 verification_status = ?4,
366 search_text = ?5,
367 embedding = ?6,
368 embedding_q8 = ?7,
369 updated_at = datetime('now')
370 WHERE episode_id = ?8",
371 params![
372 outcome.as_str(),
373 confidence,
374 experiment_id,
375 verification_json,
376 search_text,
377 embedding_bytes,
378 q8_bytes,
379 episode_id
380 ],
381 )?;
382 tx.execute(
383 "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
384 params![fts_rowid, search_text],
385 )?;
386
387 #[cfg(feature = "hnsw")]
388 db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
389 Ok(())
390 })
391}
392
393pub(crate) fn search_episodes(
394 conn: &Connection,
395 effect_type: Option<&str>,
396 outcome: Option<&EpisodeOutcome>,
397 limit: usize,
398) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
399 let effect_type = effect_type.map(ToOwned::to_owned);
400 let outcome = outcome.map(|value| value.as_str().to_string());
401
402 let mut sql = String::from(
403 "SELECT episode_id, document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
404 FROM episodes
405 WHERE 1 = 1",
406 );
407 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
408
409 if let Some(effect_type) = &effect_type {
410 sql.push_str(&format!(" AND effect_type = ?{}", params.len() + 1));
411 params.push(Box::new(effect_type.clone()));
412 }
413 if let Some(outcome) = &outcome {
414 sql.push_str(&format!(" AND outcome = ?{}", params.len() + 1));
415 params.push(Box::new(outcome.clone()));
416 }
417 sql.push_str(&format!(" ORDER BY updated_at DESC LIMIT {}", limit));
418
419 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
420 params.iter().map(|value| value.as_ref()).collect();
421 let mut stmt = conn.prepare(&sql)?;
422 let rows = stmt
423 .query_map(&*param_refs, |row| {
424 Ok((
425 row.get::<_, String>(0)?,
426 row.get::<_, String>(1)?,
427 row.get::<_, String>(2)?,
428 row.get::<_, String>(3)?,
429 row.get::<_, String>(4)?,
430 row.get::<_, f32>(5)?,
431 row.get::<_, String>(6)?,
432 row.get::<_, Option<String>>(7)?,
433 ))
434 })?
435 .collect::<Result<Vec<_>, _>>()?;
436
437 rows.into_iter()
438 .map(
439 |(
440 _episode_id,
441 document_id,
442 cause_ids_raw,
443 effect_type,
444 outcome_raw,
445 confidence,
446 verification_status_raw,
447 experiment_id,
448 )| {
449 Ok((
450 document_id.clone(),
451 EpisodeMeta {
452 cause_ids: db::parse_string_list_json(
453 "episodes",
454 &document_id,
455 "cause_ids",
456 &cause_ids_raw,
457 )?,
458 effect_type,
459 outcome: db::parse_episode_outcome(&document_id, &outcome_raw)?,
460 confidence,
461 verification_status: db::parse_verification_status(
462 &document_id,
463 &verification_status_raw,
464 )?,
465 experiment_id,
466 },
467 ))
468 },
469 )
470 .collect()
471}
472
473pub(crate) fn get_episode(
475 conn: &Connection,
476 episode_id: &str,
477) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
478 let row = conn.query_row(
479 "SELECT document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
480 FROM episodes
481 WHERE episode_id = ?1",
482 params![episode_id],
483 |row| {
484 Ok((
485 row.get::<_, String>(0)?,
486 row.get::<_, String>(1)?,
487 row.get::<_, String>(2)?,
488 row.get::<_, String>(3)?,
489 row.get::<_, f32>(4)?,
490 row.get::<_, String>(5)?,
491 row.get::<_, Option<String>>(6)?,
492 ))
493 },
494 );
495
496 match row {
497 Ok((
498 document_id,
499 cause_ids_raw,
500 effect_type,
501 outcome_raw,
502 confidence,
503 verification_status_raw,
504 experiment_id,
505 )) => Ok(Some((
506 document_id.clone(),
507 EpisodeMeta {
508 cause_ids: db::parse_string_list_json(
509 "episodes",
510 &document_id,
511 "cause_ids",
512 &cause_ids_raw,
513 )?,
514 effect_type,
515 outcome: db::parse_episode_outcome(&document_id, &outcome_raw)?,
516 confidence,
517 verification_status: db::parse_verification_status(
518 &document_id,
519 &verification_status_raw,
520 )?,
521 experiment_id,
522 },
523 ))),
524 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
525 Err(err) => Err(MemoryError::Database(err)),
526 }
527}
528
529pub(crate) fn load_episode_meta(
532 conn: &Connection,
533 document_id: &str,
534) -> Result<Option<EpisodeMeta>, MemoryError> {
535 let row = conn.query_row(
536 "SELECT cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
537 FROM episodes
538 WHERE document_id = ?1
539 ORDER BY created_at ASC
540 LIMIT 1",
541 params![document_id],
542 |row| {
543 Ok((
544 row.get::<_, String>(0)?,
545 row.get::<_, String>(1)?,
546 row.get::<_, String>(2)?,
547 row.get::<_, f32>(3)?,
548 row.get::<_, String>(4)?,
549 row.get::<_, Option<String>>(5)?,
550 ))
551 },
552 );
553
554 match row {
555 Ok((
556 cause_ids_raw,
557 effect_type,
558 outcome_raw,
559 confidence,
560 verification_status_raw,
561 experiment_id,
562 )) => Ok(Some(EpisodeMeta {
563 cause_ids: db::parse_string_list_json(
564 "episodes",
565 document_id,
566 "cause_ids",
567 &cause_ids_raw,
568 )?,
569 effect_type,
570 outcome: db::parse_episode_outcome(document_id, &outcome_raw)?,
571 confidence,
572 verification_status: db::parse_verification_status(
573 document_id,
574 &verification_status_raw,
575 )?,
576 experiment_id,
577 })),
578 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
579 Err(err) => Err(MemoryError::Database(err)),
580 }
581}
582
583pub(crate) fn load_episode_context(
584 conn: &Connection,
585 document_id: &str,
586) -> Result<(String, String), MemoryError> {
587 let title: String = conn
588 .query_row(
589 "SELECT title FROM documents WHERE id = ?1",
590 params![document_id],
591 |row| row.get(0),
592 )
593 .map_err(|e| MemoryError::DocumentNotFound(format!("{}: {e}", document_id)))?;
594
595 let mut stmt =
596 conn.prepare("SELECT content FROM chunks WHERE document_id = ?1 ORDER BY chunk_index ASC")?;
597 let chunks = stmt
598 .query_map(params![document_id], |row| row.get::<_, String>(0))?
599 .collect::<Result<Vec<_>, _>>()?;
600
601 Ok((title, chunks.join("\n")))
602}
603
604impl MemoryStore {
605 pub async fn ingest_episode(
611 &self,
612 document_id: &str,
613 meta: &EpisodeMeta,
614 ) -> Result<String, MemoryError> {
615 self.ingest_episode_with_trace(document_id, meta, None)
616 .await
617 }
618
619 pub async fn ingest_episode_with_trace(
621 &self,
622 document_id: &str,
623 meta: &EpisodeMeta,
624 trace_ctx: Option<&TraceCtx>,
625 ) -> Result<String, MemoryError> {
626 self.validate_content("episodes.effect_type", &meta.effect_type)?;
627 Self::validate_confidence(meta.confidence)?;
628 let doc_id = document_id.to_string();
629 let meta = meta.clone();
630 let (document_title, document_context) = self
631 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
632 .await?;
633 let search_text = build_episode_search_text(&document_title, &document_context, &meta);
634 let embedding = self.embed_text_internal(&search_text).await?;
635 self.validate_embedding_dimensions(&embedding)?;
636 let embedding_bytes = db::embedding_to_bytes(&embedding);
637 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
639 .quantize(&embedding)
640 .map(|vector| quantize::pack_quantized(&vector))
641 .ok();
642 let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
643
644 let doc_id = document_id.to_string();
645 let episode_id = self
646 .with_write_conn(move |conn| {
647 upsert_episode(
648 conn,
649 &doc_id,
650 &meta,
651 &search_text,
652 &embedding_bytes,
653 q8_bytes.as_deref(),
654 trace_id_owned.as_deref(),
655 )
656 })
657 .await?;
658
659 #[cfg(feature = "hnsw")]
660 self.sync_pending_hnsw_ops_best_effort("ingest_episode")
661 .await;
662
663 Ok(episode_id)
664 }
665
666 pub async fn create_episode(
668 &self,
669 episode_id: &str,
670 document_id: &str,
671 meta: &EpisodeMeta,
672 ) -> Result<String, MemoryError> {
673 self.create_episode_with_trace(episode_id, document_id, meta, None)
674 .await
675 }
676
677 pub async fn create_episode_with_trace(
679 &self,
680 episode_id: &str,
681 document_id: &str,
682 meta: &EpisodeMeta,
683 trace_ctx: Option<&TraceCtx>,
684 ) -> Result<String, MemoryError> {
685 self.validate_content("episodes.effect_type", &meta.effect_type)?;
686 Self::validate_confidence(meta.confidence)?;
687 let doc_id = document_id.to_string();
688 let meta = meta.clone();
689 let (document_title, document_context) = self
690 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
691 .await?;
692 let search_text = build_episode_search_text(&document_title, &document_context, &meta);
693 let embedding = self.embed_text_internal(&search_text).await?;
694 self.validate_embedding_dimensions(&embedding)?;
695 let embedding_bytes = db::embedding_to_bytes(&embedding);
696 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
698 .quantize(&embedding)
699 .map(|vector| quantize::pack_quantized(&vector))
700 .ok();
701 let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
702
703 let ep_id = episode_id.to_string();
704 let doc_id = document_id.to_string();
705 let created_ep_id = self
706 .with_write_conn(move |conn| {
707 crate::episodes::create_episode(
708 conn,
709 &ep_id,
710 &doc_id,
711 &meta,
712 &search_text,
713 &embedding_bytes,
714 q8_bytes.as_deref(),
715 trace_id_owned.as_deref(),
716 )
717 })
718 .await?;
719
720 #[cfg(feature = "hnsw")]
721 self.sync_pending_hnsw_ops_best_effort("create_episode")
722 .await;
723
724 Ok(created_ep_id)
725 }
726
727 pub async fn get_episode(
729 &self,
730 episode_id: &str,
731 ) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
732 let ep_id = episode_id.to_string();
733 self.with_read_conn(move |conn| get_episode(conn, &ep_id))
734 .await
735 }
736
737 pub async fn update_episode_outcome_by_id(
739 &self,
740 episode_id: &str,
741 outcome: EpisodeOutcome,
742 confidence: f32,
743 experiment_id: Option<&str>,
744 ) -> Result<(), MemoryError> {
745 Self::validate_confidence(confidence)?;
746 let ep_id = episode_id.to_string();
747 let ep_id_clone = ep_id.clone();
748 let (doc_id, current_meta) = self
749 .with_read_conn(move |conn| {
750 get_episode(conn, &ep_id_clone)?
751 .ok_or_else(|| MemoryError::EpisodeNotFound(ep_id_clone.clone()))
752 })
753 .await?;
754
755 let experiment_id_owned = experiment_id.map(|value| value.to_string());
756 let verification_status =
757 verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
758 let updated_meta = EpisodeMeta {
759 cause_ids: current_meta.cause_ids,
760 effect_type: current_meta.effect_type,
761 outcome: outcome.clone(),
762 confidence,
763 verification_status: verification_status.clone(),
764 experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
765 };
766
767 let (document_title, document_context) = self
768 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
769 .await?;
770 let search_text =
771 build_episode_search_text(&document_title, &document_context, &updated_meta);
772 let embedding = self.embed_text_internal(&search_text).await?;
773 self.validate_embedding_dimensions(&embedding)?;
774 let embedding_bytes = db::embedding_to_bytes(&embedding);
775 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
777 .quantize(&embedding)
778 .map(|vector| quantize::pack_quantized(&vector))
779 .ok();
780
781 self.with_write_conn(move |conn| {
782 crate::episodes::update_episode_outcome_by_id(
783 conn,
784 &ep_id,
785 outcome,
786 confidence,
787 experiment_id_owned.as_deref(),
788 &verification_status,
789 &search_text,
790 &embedding_bytes,
791 q8_bytes.as_deref(),
792 )
793 })
794 .await?;
795
796 #[cfg(feature = "hnsw")]
797 self.sync_pending_hnsw_ops_best_effort("update_episode_outcome_by_id")
798 .await;
799
800 Ok(())
801 }
802
803 pub async fn update_episode_outcome(
805 &self,
806 document_id: &str,
807 outcome: EpisodeOutcome,
808 confidence: f32,
809 experiment_id: Option<&str>,
810 ) -> Result<(), MemoryError> {
811 Self::validate_confidence(confidence)?;
812 let doc_id = document_id.to_string();
813 let current_meta = self
814 .with_read_conn(move |conn| load_episode_meta(conn, &doc_id))
815 .await?
816 .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
817
818 let experiment_id_owned = experiment_id.map(|value| value.to_string());
819 let verification_status =
820 verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
821 let updated_meta = EpisodeMeta {
822 cause_ids: current_meta.cause_ids,
823 effect_type: current_meta.effect_type,
824 outcome: outcome.clone(),
825 confidence,
826 verification_status: verification_status.clone(),
827 experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
828 };
829
830 let doc_id = document_id.to_string();
831 let (document_title, document_context) = self
832 .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
833 .await?;
834 let search_text =
835 build_episode_search_text(&document_title, &document_context, &updated_meta);
836 let embedding = self.embed_text_internal(&search_text).await?;
837 self.validate_embedding_dimensions(&embedding)?;
838 let embedding_bytes = db::embedding_to_bytes(&embedding);
839 let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
841 .quantize(&embedding)
842 .map(|vector| quantize::pack_quantized(&vector))
843 .ok();
844
845 let doc_id = document_id.to_string();
846 self.with_write_conn(move |conn| {
847 crate::episodes::update_episode_outcome(
848 conn,
849 &doc_id,
850 outcome,
851 confidence,
852 experiment_id_owned.as_deref(),
853 &verification_status,
854 &search_text,
855 &embedding_bytes,
856 q8_bytes.as_deref(),
857 )
858 })
859 .await?;
860
861 #[cfg(feature = "hnsw")]
862 self.sync_pending_hnsw_ops_best_effort("update_episode_outcome")
863 .await;
864
865 Ok(())
866 }
867
868 pub async fn search_episodes(
870 &self,
871 effect_type: Option<&str>,
872 outcome: Option<&EpisodeOutcome>,
873 limit: usize,
874 ) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
875 let et = effect_type.map(|s| s.to_string());
876 let outcome_owned = outcome.cloned();
877
878 self.with_read_conn(move |conn| {
879 search_episodes(conn, et.as_deref(), outcome_owned.as_ref(), limit)
880 })
881 .await
882 }
883}