Skip to main content

semantic_memory/
episodes.rs

1use crate::db;
2#[cfg(feature = "hnsw")]
3use crate::db::IndexOpKind;
4use crate::error::MemoryError;
5use crate::quantize::{self, Quantizer};
6use crate::types::{EpisodeMeta, EpisodeOutcome, VerificationStatus};
7use crate::{build_episode_search_text, verification_status_for_outcome, MemoryStore};
8use rusqlite::{params, Connection};
9use stack_ids::TraceCtx;
10
11// ─── Centralized episode identity helpers ──────────────────────────────
12
/// Build the key under which an episode is tracked in the HNSW vector index.
///
/// The key is the episode_id prefixed with `episode:`.
pub(crate) fn episode_item_key(episode_id: &str) -> String {
    ["episode:", episode_id].concat()
}
17
/// Build the graph node identifier for an episode: the episode_id prefixed with `episode:`.
pub(crate) fn episode_node_id(episode_id: &str) -> String {
    let mut node_id = String::from("episode:");
    node_id.push_str(episode_id);
    node_id
}
22
23/// Resolve the primary (first-created) episode_id for a document.
24/// This is **legacy compatibility** behavior for APIs that still target
25/// a single episode by document_id. Canonical code should use episode_id directly.
26pub(crate) fn resolve_primary_episode_id_legacy(
27    conn: &Connection,
28    document_id: &str,
29) -> Result<Option<String>, MemoryError> {
30    match conn.query_row(
31        "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC LIMIT 1",
32        params![document_id],
33        |row| row.get(0),
34    ) {
35        Ok(id) => Ok(Some(id)),
36        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
37        Err(err) => Err(MemoryError::Database(err)),
38    }
39}
40
41/// List all episode_ids for a document, ordered by creation time.
42pub(crate) fn list_document_episode_ids(
43    conn: &Connection,
44    document_id: &str,
45) -> Result<Vec<String>, MemoryError> {
46    let mut stmt = conn.prepare(
47        "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC",
48    )?;
49    let ids = stmt
50        .query_map(params![document_id], |row| row.get::<_, String>(0))?
51        .collect::<Result<Vec<_>, _>>()?;
52    Ok(ids)
53}
54
55/// Insert a new episode with an explicit episode_id (canonical path).
56/// Returns the episode_id.
57#[allow(clippy::too_many_arguments)]
58pub(crate) fn create_episode(
59    conn: &Connection,
60    episode_id: &str,
61    document_id: &str,
62    meta: &EpisodeMeta,
63    search_text: &str,
64    embedding_bytes: &[u8],
65    q8_bytes: Option<&[u8]>,
66    trace_id: Option<&str>,
67) -> Result<String, MemoryError> {
68    let cause_ids_json =
69        serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
70    let verification_json = serde_json::to_string(&meta.verification_status)
71        .map_err(|e| MemoryError::Other(e.to_string()))?;
72    #[cfg(feature = "hnsw")]
73    let item_key = episode_item_key(episode_id);
74
75    db::with_transaction(conn, |tx| {
76        let exists: bool = tx.query_row(
77            "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
78            params![document_id],
79            |row| row.get(0),
80        )?;
81        if !exists {
82            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
83        }
84
85        tx.execute(
86            "INSERT INTO episodes
87                (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
88                 verification_status, experiment_id, search_text, embedding, embedding_q8,
89                 trace_id, updated_at)
90             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
91            params![
92                episode_id,
93                document_id,
94                cause_ids_json,
95                meta.effect_type,
96                meta.outcome.as_str(),
97                meta.confidence,
98                verification_json,
99                meta.experiment_id,
100                search_text,
101                embedding_bytes,
102                q8_bytes,
103                trace_id
104            ],
105        )?;
106
107        // Insert FTS mapping
108        tx.execute(
109            "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
110            params![episode_id, document_id],
111        )?;
112        let fts_rowid: i64 = tx.query_row(
113            "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
114            params![episode_id],
115            |row| row.get(0),
116        )?;
117        tx.execute(
118            "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
119            params![fts_rowid, search_text],
120        )?;
121
122        // Populate normalized causal edges
123        sync_causal_edges(tx, episode_id, &meta.cause_ids)?;
124
125        #[cfg(feature = "hnsw")]
126        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
127        Ok(episode_id.to_string())
128    })
129}
130
131/// Legacy compatibility: upsert the primary episode for a document.
132///
133/// If an episode already exists for this document, updates the first one.
134/// Otherwise creates a new one with a deterministic `{document_id}-ep0` episode_id.
135/// Canonical callers should use `create_episode()` with an explicit episode_id instead.
136#[allow(clippy::too_many_arguments)]
137pub(crate) fn upsert_episode(
138    conn: &Connection,
139    document_id: &str,
140    meta: &EpisodeMeta,
141    search_text: &str,
142    embedding_bytes: &[u8],
143    q8_bytes: Option<&[u8]>,
144    trace_id: Option<&str>,
145) -> Result<String, MemoryError> {
146    let cause_ids_json =
147        serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
148    let verification_json = serde_json::to_string(&meta.verification_status)
149        .map_err(|e| MemoryError::Other(e.to_string()))?;
150
151    // Legacy compat: resolve the primary episode for this document
152    let existing_episode_id = resolve_primary_episode_id_legacy(conn, document_id)?;
153
154    let episode_id = existing_episode_id.unwrap_or_else(|| format!("{}-ep0", document_id));
155
156    #[cfg(feature = "hnsw")]
157    let item_key = episode_item_key(&episode_id);
158
159    db::with_transaction(conn, |tx| {
160        // INTENTIONAL: episode may not exist yet on first upsert
161        let old_search_text: Option<String> = tx
162            .query_row(
163                "SELECT search_text FROM episodes WHERE episode_id = ?1",
164                params![episode_id],
165                |row| row.get(0),
166            )
167            .ok();
168        let exists: bool = tx.query_row(
169            "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
170            params![document_id],
171            |row| row.get(0),
172        )?;
173        if !exists {
174            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
175        }
176
177        if old_search_text.is_some() {
178            // Update existing episode
179            tx.execute(
180                "UPDATE episodes SET
181                    cause_ids = ?1,
182                    effect_type = ?2,
183                    outcome = ?3,
184                    confidence = ?4,
185                    verification_status = ?5,
186                    experiment_id = ?6,
187                    search_text = ?7,
188                    embedding = ?8,
189                    embedding_q8 = ?9,
190                    trace_id = COALESCE(?10, trace_id),
191                    updated_at = datetime('now')
192                 WHERE episode_id = ?11",
193                params![
194                    cause_ids_json,
195                    meta.effect_type,
196                    meta.outcome.as_str(),
197                    meta.confidence,
198                    verification_json,
199                    meta.experiment_id,
200                    search_text,
201                    embedding_bytes,
202                    q8_bytes,
203                    trace_id,
204                    episode_id
205                ],
206            )?;
207
208            // Update FTS
209            let fts_rowid: i64 = tx.query_row(
210                "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
211                params![episode_id],
212                |row| row.get(0),
213            )?;
214            if let Some(old_text) = old_search_text {
215                tx.execute(
216                    "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
217                    params![fts_rowid, old_text],
218                )?;
219            }
220            tx.execute(
221                "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
222                params![fts_rowid, search_text],
223            )?;
224        } else {
225            // Insert new episode
226            tx.execute(
227                "INSERT INTO episodes
228                    (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
229                     verification_status, experiment_id, search_text, embedding, embedding_q8,
230                     trace_id, updated_at)
231                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
232                params![
233                    episode_id,
234                    document_id,
235                    cause_ids_json,
236                    meta.effect_type,
237                    meta.outcome.as_str(),
238                    meta.confidence,
239                    verification_json,
240                    meta.experiment_id,
241                    search_text,
242                    embedding_bytes,
243                    q8_bytes,
244                    trace_id
245                ],
246            )?;
247
248            // Insert FTS mapping
249            tx.execute(
250                "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
251                params![episode_id, document_id],
252            )?;
253            let fts_rowid: i64 = tx.query_row(
254                "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
255                params![episode_id],
256                |row| row.get(0),
257            )?;
258            tx.execute(
259                "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
260                params![fts_rowid, search_text],
261            )?;
262        }
263
264        // Sync normalized causal edges
265        sync_causal_edges(tx, &episode_id, &meta.cause_ids)?;
266
267        #[cfg(feature = "hnsw")]
268        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
269        Ok(episode_id.to_string())
270    })
271}
272
273/// Synchronize the episode_causes table with the given cause_ids.
274fn sync_causal_edges(
275    tx: &rusqlite::Transaction<'_>,
276    episode_id: &str,
277    cause_ids: &[String],
278) -> Result<(), MemoryError> {
279    tx.execute(
280        "DELETE FROM episode_causes WHERE episode_id = ?1",
281        params![episode_id],
282    )?;
283    for (ordinal, cause_id) in cause_ids.iter().enumerate() {
284        tx.execute(
285            "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
286             VALUES (?1, ?2, ?3)",
287            params![episode_id, cause_id, ordinal as i64],
288        )?;
289    }
290    Ok(())
291}
292
293/// Legacy compatibility: update the primary episode's outcome for a document.
294/// Resolves the first-created episode and delegates to `update_episode_outcome_by_id`.
295#[allow(clippy::too_many_arguments)]
296pub(crate) fn update_episode_outcome(
297    conn: &Connection,
298    document_id: &str,
299    outcome: EpisodeOutcome,
300    confidence: f32,
301    experiment_id: Option<&str>,
302    verification_status: &VerificationStatus,
303    search_text: &str,
304    embedding_bytes: &[u8],
305    q8_bytes: Option<&[u8]>,
306) -> Result<(), MemoryError> {
307    // Legacy compat: resolve the primary episode for this document
308    let episode_id = resolve_primary_episode_id_legacy(conn, document_id)?
309        .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
310
311    update_episode_outcome_by_id(
312        conn,
313        &episode_id,
314        outcome,
315        confidence,
316        experiment_id,
317        verification_status,
318        search_text,
319        embedding_bytes,
320        q8_bytes,
321    )
322}
323
324/// Update the outcome of an episode by its episode_id (canonical path).
325#[allow(clippy::too_many_arguments)]
326pub(crate) fn update_episode_outcome_by_id(
327    conn: &Connection,
328    episode_id: &str,
329    outcome: EpisodeOutcome,
330    confidence: f32,
331    experiment_id: Option<&str>,
332    verification_status: &VerificationStatus,
333    search_text: &str,
334    embedding_bytes: &[u8],
335    q8_bytes: Option<&[u8]>,
336) -> Result<(), MemoryError> {
337    let verification_json = serde_json::to_string(verification_status)
338        .map_err(|e| MemoryError::Other(e.to_string()))?;
339    #[cfg(feature = "hnsw")]
340    let item_key = episode_item_key(episode_id);
341
342    db::with_transaction(conn, |tx| {
343        let old_search_text: String = tx
344            .query_row(
345                "SELECT search_text FROM episodes WHERE episode_id = ?1",
346                params![episode_id],
347                |row| row.get(0),
348            )
349            .map_err(|e| MemoryError::EpisodeNotFound(format!("{}: {e}", episode_id)))?;
350        let fts_rowid: i64 = tx.query_row(
351            "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
352            params![episode_id],
353            |row| row.get(0),
354        )?;
355
356        tx.execute(
357            "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
358            params![fts_rowid, old_search_text],
359        )?;
360        tx.execute(
361            "UPDATE episodes
362             SET outcome = ?1,
363                 confidence = ?2,
364                 experiment_id = COALESCE(?3, experiment_id),
365                 verification_status = ?4,
366                 search_text = ?5,
367                 embedding = ?6,
368                 embedding_q8 = ?7,
369                 updated_at = datetime('now')
370             WHERE episode_id = ?8",
371            params![
372                outcome.as_str(),
373                confidence,
374                experiment_id,
375                verification_json,
376                search_text,
377                embedding_bytes,
378                q8_bytes,
379                episode_id
380            ],
381        )?;
382        tx.execute(
383            "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
384            params![fts_rowid, search_text],
385        )?;
386
387        #[cfg(feature = "hnsw")]
388        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
389        Ok(())
390    })
391}
392
393pub(crate) fn search_episodes(
394    conn: &Connection,
395    effect_type: Option<&str>,
396    outcome: Option<&EpisodeOutcome>,
397    limit: usize,
398) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
399    let effect_type = effect_type.map(ToOwned::to_owned);
400    let outcome = outcome.map(|value| value.as_str().to_string());
401
402    let mut sql = String::from(
403        "SELECT episode_id, document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
404         FROM episodes
405         WHERE 1 = 1",
406    );
407    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
408
409    if let Some(effect_type) = &effect_type {
410        sql.push_str(&format!(" AND effect_type = ?{}", params.len() + 1));
411        params.push(Box::new(effect_type.clone()));
412    }
413    if let Some(outcome) = &outcome {
414        sql.push_str(&format!(" AND outcome = ?{}", params.len() + 1));
415        params.push(Box::new(outcome.clone()));
416    }
417    sql.push_str(&format!(" ORDER BY updated_at DESC LIMIT {}", limit));
418
419    let param_refs: Vec<&dyn rusqlite::types::ToSql> =
420        params.iter().map(|value| value.as_ref()).collect();
421    let mut stmt = conn.prepare(&sql)?;
422    let rows = stmt
423        .query_map(&*param_refs, |row| {
424            Ok((
425                row.get::<_, String>(0)?,
426                row.get::<_, String>(1)?,
427                row.get::<_, String>(2)?,
428                row.get::<_, String>(3)?,
429                row.get::<_, String>(4)?,
430                row.get::<_, f32>(5)?,
431                row.get::<_, String>(6)?,
432                row.get::<_, Option<String>>(7)?,
433            ))
434        })?
435        .collect::<Result<Vec<_>, _>>()?;
436
437    rows.into_iter()
438        .map(
439            |(
440                _episode_id,
441                document_id,
442                cause_ids_raw,
443                effect_type,
444                outcome_raw,
445                confidence,
446                verification_status_raw,
447                experiment_id,
448            )| {
449                Ok((
450                    document_id.clone(),
451                    EpisodeMeta {
452                        cause_ids: db::parse_string_list_json(
453                            "episodes",
454                            &document_id,
455                            "cause_ids",
456                            &cause_ids_raw,
457                        )?,
458                        effect_type,
459                        outcome: db::parse_episode_outcome(&document_id, &outcome_raw)?,
460                        confidence,
461                        verification_status: db::parse_verification_status(
462                            &document_id,
463                            &verification_status_raw,
464                        )?,
465                        experiment_id,
466                    },
467                ))
468            },
469        )
470        .collect()
471}
472
473/// Get episode by episode_id.
474pub(crate) fn get_episode(
475    conn: &Connection,
476    episode_id: &str,
477) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
478    let row = conn.query_row(
479        "SELECT document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
480         FROM episodes
481         WHERE episode_id = ?1",
482        params![episode_id],
483        |row| {
484            Ok((
485                row.get::<_, String>(0)?,
486                row.get::<_, String>(1)?,
487                row.get::<_, String>(2)?,
488                row.get::<_, String>(3)?,
489                row.get::<_, f32>(4)?,
490                row.get::<_, String>(5)?,
491                row.get::<_, Option<String>>(6)?,
492            ))
493        },
494    );
495
496    match row {
497        Ok((
498            document_id,
499            cause_ids_raw,
500            effect_type,
501            outcome_raw,
502            confidence,
503            verification_status_raw,
504            experiment_id,
505        )) => Ok(Some((
506            document_id.clone(),
507            EpisodeMeta {
508                cause_ids: db::parse_string_list_json(
509                    "episodes",
510                    &document_id,
511                    "cause_ids",
512                    &cause_ids_raw,
513                )?,
514                effect_type,
515                outcome: db::parse_episode_outcome(&document_id, &outcome_raw)?,
516                confidence,
517                verification_status: db::parse_verification_status(
518                    &document_id,
519                    &verification_status_raw,
520                )?,
521                experiment_id,
522            },
523        ))),
524        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
525        Err(err) => Err(MemoryError::Database(err)),
526    }
527}
528
529/// Legacy compatibility: load the primary episode's metadata for a document.
530/// Returns the first-created episode's metadata, or None if no episodes exist.
531pub(crate) fn load_episode_meta(
532    conn: &Connection,
533    document_id: &str,
534) -> Result<Option<EpisodeMeta>, MemoryError> {
535    let row = conn.query_row(
536        "SELECT cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
537         FROM episodes
538         WHERE document_id = ?1
539         ORDER BY created_at ASC
540         LIMIT 1",
541        params![document_id],
542        |row| {
543            Ok((
544                row.get::<_, String>(0)?,
545                row.get::<_, String>(1)?,
546                row.get::<_, String>(2)?,
547                row.get::<_, f32>(3)?,
548                row.get::<_, String>(4)?,
549                row.get::<_, Option<String>>(5)?,
550            ))
551        },
552    );
553
554    match row {
555        Ok((
556            cause_ids_raw,
557            effect_type,
558            outcome_raw,
559            confidence,
560            verification_status_raw,
561            experiment_id,
562        )) => Ok(Some(EpisodeMeta {
563            cause_ids: db::parse_string_list_json(
564                "episodes",
565                document_id,
566                "cause_ids",
567                &cause_ids_raw,
568            )?,
569            effect_type,
570            outcome: db::parse_episode_outcome(document_id, &outcome_raw)?,
571            confidence,
572            verification_status: db::parse_verification_status(
573                document_id,
574                &verification_status_raw,
575            )?,
576            experiment_id,
577        })),
578        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
579        Err(err) => Err(MemoryError::Database(err)),
580    }
581}
582
583pub(crate) fn load_episode_context(
584    conn: &Connection,
585    document_id: &str,
586) -> Result<(String, String), MemoryError> {
587    let title: String = conn
588        .query_row(
589            "SELECT title FROM documents WHERE id = ?1",
590            params![document_id],
591            |row| row.get(0),
592        )
593        .map_err(|e| MemoryError::DocumentNotFound(format!("{}: {e}", document_id)))?;
594
595    let mut stmt =
596        conn.prepare("SELECT content FROM chunks WHERE document_id = ?1 ORDER BY chunk_index ASC")?;
597    let chunks = stmt
598        .query_map(params![document_id], |row| row.get::<_, String>(0))?
599        .collect::<Result<Vec<_>, _>>()?;
600
601    Ok((title, chunks.join("\n")))
602}
603
604impl MemoryStore {
605    /// Ingest or update a causal episode attached to a document.
606    ///
607    /// The document must already exist. Existing episodes keep their original `created_at`
608    /// timestamp while their searchable text, outcome state, verification metadata, embeddings,
609    /// and `updated_at` are refreshed.
610    pub async fn ingest_episode(
611        &self,
612        document_id: &str,
613        meta: &EpisodeMeta,
614    ) -> Result<String, MemoryError> {
615        self.ingest_episode_with_trace(document_id, meta, None)
616            .await
617    }
618
619    /// Ingest a causal episode with optional trace metadata. Returns the episode_id.
620    pub async fn ingest_episode_with_trace(
621        &self,
622        document_id: &str,
623        meta: &EpisodeMeta,
624        trace_ctx: Option<&TraceCtx>,
625    ) -> Result<String, MemoryError> {
626        self.validate_content("episodes.effect_type", &meta.effect_type)?;
627        Self::validate_confidence(meta.confidence)?;
628        let doc_id = document_id.to_string();
629        let meta = meta.clone();
630        let (document_title, document_context) = self
631            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
632            .await?;
633        let search_text = build_episode_search_text(&document_title, &document_context, &meta);
634        let embedding = self.embed_text_internal(&search_text).await?;
635        self.validate_embedding_dimensions(&embedding)?;
636        let embedding_bytes = db::embedding_to_bytes(&embedding);
637        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
638        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
639            .quantize(&embedding)
640            .map(|vector| quantize::pack_quantized(&vector))
641            .ok();
642        let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
643
644        let doc_id = document_id.to_string();
645        let episode_id = self
646            .with_write_conn(move |conn| {
647                upsert_episode(
648                    conn,
649                    &doc_id,
650                    &meta,
651                    &search_text,
652                    &embedding_bytes,
653                    q8_bytes.as_deref(),
654                    trace_id_owned.as_deref(),
655                )
656            })
657            .await?;
658
659        #[cfg(feature = "hnsw")]
660        self.sync_pending_hnsw_ops_best_effort("ingest_episode")
661            .await;
662
663        Ok(episode_id)
664    }
665
666    /// Create a new episode with an explicit episode_id. Returns the episode_id.
667    pub async fn create_episode(
668        &self,
669        episode_id: &str,
670        document_id: &str,
671        meta: &EpisodeMeta,
672    ) -> Result<String, MemoryError> {
673        self.create_episode_with_trace(episode_id, document_id, meta, None)
674            .await
675    }
676
677    /// Create a new episode with an explicit episode_id and optional trace metadata.
678    pub async fn create_episode_with_trace(
679        &self,
680        episode_id: &str,
681        document_id: &str,
682        meta: &EpisodeMeta,
683        trace_ctx: Option<&TraceCtx>,
684    ) -> Result<String, MemoryError> {
685        self.validate_content("episodes.effect_type", &meta.effect_type)?;
686        Self::validate_confidence(meta.confidence)?;
687        let doc_id = document_id.to_string();
688        let meta = meta.clone();
689        let (document_title, document_context) = self
690            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
691            .await?;
692        let search_text = build_episode_search_text(&document_title, &document_context, &meta);
693        let embedding = self.embed_text_internal(&search_text).await?;
694        self.validate_embedding_dimensions(&embedding)?;
695        let embedding_bytes = db::embedding_to_bytes(&embedding);
696        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
697        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
698            .quantize(&embedding)
699            .map(|vector| quantize::pack_quantized(&vector))
700            .ok();
701        let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
702
703        let ep_id = episode_id.to_string();
704        let doc_id = document_id.to_string();
705        let created_ep_id = self
706            .with_write_conn(move |conn| {
707                crate::episodes::create_episode(
708                    conn,
709                    &ep_id,
710                    &doc_id,
711                    &meta,
712                    &search_text,
713                    &embedding_bytes,
714                    q8_bytes.as_deref(),
715                    trace_id_owned.as_deref(),
716                )
717            })
718            .await?;
719
720        #[cfg(feature = "hnsw")]
721        self.sync_pending_hnsw_ops_best_effort("create_episode")
722            .await;
723
724        Ok(created_ep_id)
725    }
726
727    /// Retrieve an episode by its episode_id.
728    pub async fn get_episode(
729        &self,
730        episode_id: &str,
731    ) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
732        let ep_id = episode_id.to_string();
733        self.with_read_conn(move |conn| get_episode(conn, &ep_id))
734            .await
735    }
736
737    /// Update the outcome of an episode by its episode_id.
738    pub async fn update_episode_outcome_by_id(
739        &self,
740        episode_id: &str,
741        outcome: EpisodeOutcome,
742        confidence: f32,
743        experiment_id: Option<&str>,
744    ) -> Result<(), MemoryError> {
745        Self::validate_confidence(confidence)?;
746        let ep_id = episode_id.to_string();
747        let ep_id_clone = ep_id.clone();
748        let (doc_id, current_meta) = self
749            .with_read_conn(move |conn| {
750                get_episode(conn, &ep_id_clone)?
751                    .ok_or_else(|| MemoryError::EpisodeNotFound(ep_id_clone.clone()))
752            })
753            .await?;
754
755        let experiment_id_owned = experiment_id.map(|value| value.to_string());
756        let verification_status =
757            verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
758        let updated_meta = EpisodeMeta {
759            cause_ids: current_meta.cause_ids,
760            effect_type: current_meta.effect_type,
761            outcome: outcome.clone(),
762            confidence,
763            verification_status: verification_status.clone(),
764            experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
765        };
766
767        let (document_title, document_context) = self
768            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
769            .await?;
770        let search_text =
771            build_episode_search_text(&document_title, &document_context, &updated_meta);
772        let embedding = self.embed_text_internal(&search_text).await?;
773        self.validate_embedding_dimensions(&embedding)?;
774        let embedding_bytes = db::embedding_to_bytes(&embedding);
775        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
776        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
777            .quantize(&embedding)
778            .map(|vector| quantize::pack_quantized(&vector))
779            .ok();
780
781        self.with_write_conn(move |conn| {
782            crate::episodes::update_episode_outcome_by_id(
783                conn,
784                &ep_id,
785                outcome,
786                confidence,
787                experiment_id_owned.as_deref(),
788                &verification_status,
789                &search_text,
790                &embedding_bytes,
791                q8_bytes.as_deref(),
792            )
793        })
794        .await?;
795
796        #[cfg(feature = "hnsw")]
797        self.sync_pending_hnsw_ops_best_effort("update_episode_outcome_by_id")
798            .await;
799
800        Ok(())
801    }
802
803    /// Update the outcome of an existing episode.
804    pub async fn update_episode_outcome(
805        &self,
806        document_id: &str,
807        outcome: EpisodeOutcome,
808        confidence: f32,
809        experiment_id: Option<&str>,
810    ) -> Result<(), MemoryError> {
811        Self::validate_confidence(confidence)?;
812        let doc_id = document_id.to_string();
813        let current_meta = self
814            .with_read_conn(move |conn| load_episode_meta(conn, &doc_id))
815            .await?
816            .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
817
818        let experiment_id_owned = experiment_id.map(|value| value.to_string());
819        let verification_status =
820            verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
821        let updated_meta = EpisodeMeta {
822            cause_ids: current_meta.cause_ids,
823            effect_type: current_meta.effect_type,
824            outcome: outcome.clone(),
825            confidence,
826            verification_status: verification_status.clone(),
827            experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
828        };
829
830        let doc_id = document_id.to_string();
831        let (document_title, document_context) = self
832            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
833            .await?;
834        let search_text =
835            build_episode_search_text(&document_title, &document_context, &updated_meta);
836        let embedding = self.embed_text_internal(&search_text).await?;
837        self.validate_embedding_dimensions(&embedding)?;
838        let embedding_bytes = db::embedding_to_bytes(&embedding);
839        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
840        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
841            .quantize(&embedding)
842            .map(|vector| quantize::pack_quantized(&vector))
843            .ok();
844
845        let doc_id = document_id.to_string();
846        self.with_write_conn(move |conn| {
847            crate::episodes::update_episode_outcome(
848                conn,
849                &doc_id,
850                outcome,
851                confidence,
852                experiment_id_owned.as_deref(),
853                &verification_status,
854                &search_text,
855                &embedding_bytes,
856                q8_bytes.as_deref(),
857            )
858        })
859        .await?;
860
861        #[cfg(feature = "hnsw")]
862        self.sync_pending_hnsw_ops_best_effort("update_episode_outcome")
863            .await;
864
865        Ok(())
866    }
867
868    /// Search for episodes by effect_type and/or outcome.
869    pub async fn search_episodes(
870        &self,
871        effect_type: Option<&str>,
872        outcome: Option<&EpisodeOutcome>,
873        limit: usize,
874    ) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
875        let et = effect_type.map(|s| s.to_string());
876        let outcome_owned = outcome.cloned();
877
878        self.with_read_conn(move |conn| {
879            search_episodes(conn, et.as_deref(), outcome_owned.as_ref(), limit)
880        })
881        .await
882    }
883}