Skip to main content

semantic_memory/
episodes.rs

1use crate::db;
2#[cfg(feature = "hnsw")]
3use crate::db::IndexOpKind;
4use crate::error::MemoryError;
5use crate::quantize::{self, Quantizer};
6use crate::types::{EpisodeMeta, EpisodeOutcome, VerificationStatus};
7use crate::{build_episode_search_text, verification_status_for_outcome, MemoryStore};
8use rusqlite::{params, Connection};
9use stack_ids::{DigestBuilder, TraceCtx};
10use std::collections::BTreeSet;
11
12// ─── Centralized episode identity helpers ──────────────────────────────
13
/// Builds the canonical HNSW item key for an episode.
///
/// The key is the literal prefix `episode:` followed by `episode_id`.
pub(crate) fn episode_item_key(episode_id: &str) -> String {
    const PREFIX: &str = "episode:";
    let mut key = String::with_capacity(PREFIX.len() + episode_id.len());
    key.push_str(PREFIX);
    key.push_str(episode_id);
    key
}
18
/// Builds the canonical graph node ID for an episode: `episode:` followed by `episode_id`.
pub(crate) fn episode_node_id(episode_id: &str) -> String {
    ["episode:", episode_id].concat()
}
23
24/// Resolve the primary (first-created) episode_id for a document.
25/// This is **legacy compatibility** behavior for APIs that still target
26/// a single episode by document_id. Canonical code should use episode_id directly.
27pub(crate) fn resolve_primary_episode_id_legacy(
28    conn: &Connection,
29    document_id: &str,
30) -> Result<Option<String>, MemoryError> {
31    match conn.query_row(
32        "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC LIMIT 1",
33        params![document_id],
34        |row| row.get(0),
35    ) {
36        Ok(id) => Ok(Some(id)),
37        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
38        Err(err) => Err(MemoryError::Database(err)),
39    }
40}
41
42/// List all episode_ids for a document, ordered by creation time.
43pub(crate) fn list_document_episode_ids(
44    conn: &Connection,
45    document_id: &str,
46) -> Result<Vec<String>, MemoryError> {
47    let mut stmt = conn.prepare(
48        "SELECT episode_id FROM episodes WHERE document_id = ?1 ORDER BY created_at ASC",
49    )?;
50    let ids = stmt
51        .query_map(params![document_id], |row| row.get::<_, String>(0))?
52        .collect::<Result<Vec<_>, _>>()?;
53    Ok(ids)
54}
55
56/// Insert a new episode with an explicit episode_id (canonical path).
57/// Returns the episode_id.
58#[allow(clippy::too_many_arguments)]
59pub(crate) fn create_episode(
60    conn: &Connection,
61    episode_id: &str,
62    document_id: &str,
63    meta: &EpisodeMeta,
64    search_text: &str,
65    embedding_bytes: &[u8],
66    q8_bytes: Option<&[u8]>,
67    trace_id: Option<&str>,
68) -> Result<String, MemoryError> {
69    let cause_ids_json =
70        serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
71    let verification_json = serde_json::to_string(&meta.verification_status)
72        .map_err(|e| MemoryError::Other(e.to_string()))?;
73    let item_key = episode_item_key(episode_id);
74
75    db::with_transaction(conn, |tx| {
76        let exists: bool = tx.query_row(
77            "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
78            params![document_id],
79            |row| row.get(0),
80        )?;
81        if !exists {
82            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
83        }
84
85        tx.execute(
86            "INSERT INTO episodes
87                (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
88                 verification_status, experiment_id, search_text, embedding, embedding_q8,
89                 trace_id, updated_at)
90             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
91            params![
92                episode_id,
93                document_id,
94                cause_ids_json,
95                meta.effect_type,
96                meta.outcome.as_str(),
97                meta.confidence,
98                verification_json,
99                meta.experiment_id,
100                search_text,
101                embedding_bytes,
102                q8_bytes,
103                trace_id
104            ],
105        )?;
106
107        // Insert FTS mapping
108        tx.execute(
109            "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
110            params![episode_id, document_id],
111        )?;
112        let fts_rowid: i64 = tx.query_row(
113            "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
114            params![episode_id],
115            |row| row.get(0),
116        )?;
117        tx.execute(
118            "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
119            params![fts_rowid, search_text],
120        )?;
121
122        // Populate normalized causal edges
123        sync_causal_edges(tx, episode_id, &meta.cause_ids)?;
124
125        #[cfg(feature = "hnsw")]
126        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
127        db::invalidate_derived_vector_artifact(tx, &item_key)?;
128        Ok(episode_id.to_string())
129    })
130}
131
132/// Legacy compatibility: upsert the primary episode for a document.
133///
134/// If an episode already exists for this document, updates the first one.
135/// Otherwise creates a new one with a deterministic `{document_id}-ep0` episode_id.
136/// Canonical callers should use `create_episode()` with an explicit episode_id instead.
137#[allow(clippy::too_many_arguments)]
138pub(crate) fn upsert_episode(
139    conn: &Connection,
140    document_id: &str,
141    meta: &EpisodeMeta,
142    search_text: &str,
143    embedding_bytes: &[u8],
144    q8_bytes: Option<&[u8]>,
145    trace_id: Option<&str>,
146) -> Result<String, MemoryError> {
147    let cause_ids_json =
148        serde_json::to_string(&meta.cause_ids).map_err(|e| MemoryError::Other(e.to_string()))?;
149    let verification_json = serde_json::to_string(&meta.verification_status)
150        .map_err(|e| MemoryError::Other(e.to_string()))?;
151
152    // Legacy compat: resolve the primary episode for this document
153    let existing_episode_id = resolve_primary_episode_id_legacy(conn, document_id)?;
154
155    let episode_id = existing_episode_id.unwrap_or_else(|| format!("{}-ep0", document_id));
156
157    let item_key = episode_item_key(&episode_id);
158
159    db::with_transaction(conn, |tx| {
160        // INTENTIONAL: episode may not exist yet on first upsert
161        let old_search_text: Option<String> = tx
162            .query_row(
163                "SELECT search_text FROM episodes WHERE episode_id = ?1",
164                params![episode_id],
165                |row| row.get(0),
166            )
167            .ok();
168        let exists: bool = tx.query_row(
169            "SELECT EXISTS(SELECT 1 FROM documents WHERE id = ?1)",
170            params![document_id],
171            |row| row.get(0),
172        )?;
173        if !exists {
174            return Err(MemoryError::DocumentNotFound(document_id.to_string()));
175        }
176
177        if old_search_text.is_some() {
178            // ── Bitemporal append-supersede via UPDATE ──────────────────────────────
179            // Read the current row's fact_digest (if any) so we can mark what it supersedes.
180            let prior_fact_digest: Option<String> = tx
181                .query_row(
182                    "SELECT fact_digest FROM episodes WHERE episode_id = ?1
183                     ORDER BY recorded_time DESC LIMIT 1",
184                    params![episode_id],
185                    |row| row.get(0),
186                )
187                .ok()
188                .flatten();
189
190            // Compute content-addressed digest of the new fact payload.
191            let mut digest_builder = DigestBuilder::new();
192            digest_builder.update_str("semantic-memory.episode.v1");
193            digest_builder.separator();
194            digest_builder.update_str(&cause_ids_json);
195            digest_builder.separator();
196            digest_builder.update_str(meta.effect_type.as_str());
197            digest_builder.separator();
198            digest_builder.update_str(meta.outcome.as_str());
199            digest_builder.separator();
200            digest_builder.update(&meta.confidence.to_le_bytes());
201            let new_fact_digest = format!("blake3:{}", digest_builder.finalize().hex());
202
203            // valid_time: when this episode fact is true in the domain
204            let valid_time_sql: Option<String> =
205                meta.valid_time.map(|dt| format!("'{}'", dt.to_rfc3339()));
206
207            // Advance the row in place: new recorded_time, new valid_time,
208            // superseded_by chains to prior_fact_digest, fact_digest is the new digest.
209            tx.execute(
210                &format!(
211                    "UPDATE episodes SET
212                         cause_ids = ?1,
213                         effect_type = ?2,
214                         outcome = ?3,
215                         confidence = ?4,
216                         verification_status = ?5,
217                         experiment_id = ?6,
218                         search_text = ?7,
219                         embedding = ?8,
220                         embedding_q8 = ?9,
221                         trace_id = COALESCE(?10, trace_id),
222                         updated_at = datetime('now'),
223                         valid_time = {},
224                         recorded_time = datetime('now'),
225                         superseded_by = ?11,
226                         fact_digest = ?12
227                     WHERE episode_id = ?13",
228                    valid_time_sql.as_deref().unwrap_or("NULL"),
229                ),
230                params![
231                    cause_ids_json,
232                    meta.effect_type,
233                    meta.outcome.as_str(),
234                    meta.confidence,
235                    verification_json,
236                    meta.experiment_id,
237                    search_text,
238                    embedding_bytes,
239                    q8_bytes,
240                    trace_id,
241                    prior_fact_digest,
242                    new_fact_digest,
243                    episode_id,
244                ],
245            )?;
246            // FTS entry already exists for this episode; search_text update is a no-op for FTS
247            // since the existing rowid stays the same — the content change is handled by
248            // the text-based search query, not a separate FTS entry.
249        } else {
250            // Insert new episode
251            tx.execute(
252                "INSERT INTO episodes
253                    (episode_id, document_id, cause_ids, effect_type, outcome, confidence,
254                     verification_status, experiment_id, search_text, embedding, embedding_q8,
255                     trace_id, updated_at)
256                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, datetime('now'))",
257                params![
258                    episode_id,
259                    document_id,
260                    cause_ids_json,
261                    meta.effect_type,
262                    meta.outcome.as_str(),
263                    meta.confidence,
264                    verification_json,
265                    meta.experiment_id,
266                    search_text,
267                    embedding_bytes,
268                    q8_bytes,
269                    trace_id
270                ],
271            )?;
272
273            // Insert FTS mapping
274            tx.execute(
275                "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
276                params![episode_id, document_id],
277            )?;
278            let fts_rowid: i64 = tx.query_row(
279                "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
280                params![episode_id],
281                |row| row.get(0),
282            )?;
283            tx.execute(
284                "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
285                params![fts_rowid, search_text],
286            )?;
287        }
288
289        // Sync normalized causal edges
290        sync_causal_edges(tx, &episode_id, &meta.cause_ids)?;
291
292        #[cfg(feature = "hnsw")]
293        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
294        db::invalidate_derived_vector_artifact(tx, &item_key)?;
295        Ok(episode_id.to_string())
296    })
297}
298
299/// Synchronize the episode_causes table with the given cause_ids.
300fn sync_causal_edges(
301    tx: &rusqlite::Transaction<'_>,
302    episode_id: &str,
303    cause_ids: &[String],
304) -> Result<(), MemoryError> {
305    let mut seen = BTreeSet::new();
306    for cause_id in cause_ids {
307        if !seen.insert(cause_id) {
308            return Err(MemoryError::InvalidConfig {
309                field: "episodes.cause_ids",
310                reason: format!("duplicate cause id: {cause_id}"),
311            });
312        }
313    }
314    tx.execute(
315        "DELETE FROM episode_causes WHERE episode_id = ?1",
316        params![episode_id],
317    )?;
318    for (ordinal, cause_id) in cause_ids.iter().enumerate() {
319        tx.execute(
320            "INSERT INTO episode_causes (episode_id, cause_node_id, ordinal)
321             VALUES (?1, ?2, ?3)",
322            params![episode_id, cause_id, ordinal as i64],
323        )?;
324    }
325    Ok(())
326}
327
328/// Legacy compatibility: update the primary episode's outcome for a document.
329/// Resolves the first-created episode and delegates to `update_episode_outcome_by_id`.
330#[allow(clippy::too_many_arguments)]
331pub(crate) fn update_episode_outcome(
332    conn: &Connection,
333    document_id: &str,
334    outcome: EpisodeOutcome,
335    confidence: f32,
336    experiment_id: Option<&str>,
337    verification_status: &VerificationStatus,
338    search_text: &str,
339    embedding_bytes: &[u8],
340    q8_bytes: Option<&[u8]>,
341) -> Result<(), MemoryError> {
342    // Legacy compat: resolve the primary episode for this document
343    let episode_id = resolve_primary_episode_id_legacy(conn, document_id)?
344        .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
345
346    update_episode_outcome_by_id(
347        conn,
348        &episode_id,
349        outcome,
350        confidence,
351        experiment_id,
352        verification_status,
353        search_text,
354        embedding_bytes,
355        q8_bytes,
356    )
357}
358
359/// Update the outcome of an episode by its episode_id (canonical path).
360#[allow(clippy::too_many_arguments)]
361pub(crate) fn update_episode_outcome_by_id(
362    conn: &Connection,
363    episode_id: &str,
364    outcome: EpisodeOutcome,
365    confidence: f32,
366    experiment_id: Option<&str>,
367    verification_status: &VerificationStatus,
368    search_text: &str,
369    embedding_bytes: &[u8],
370    q8_bytes: Option<&[u8]>,
371) -> Result<(), MemoryError> {
372    let verification_json = serde_json::to_string(verification_status)
373        .map_err(|e| MemoryError::Other(e.to_string()))?;
374    let item_key = episode_item_key(episode_id);
375
376    db::with_transaction(conn, |tx| {
377        let old_search_text: String = tx
378            .query_row(
379                "SELECT search_text FROM episodes WHERE episode_id = ?1",
380                params![episode_id],
381                |row| row.get(0),
382            )
383            .map_err(|e| MemoryError::EpisodeNotFound(format!("{}: {e}", episode_id)))?;
384        let fts_rowid: i64 = tx.query_row(
385            "SELECT rowid FROM episodes_rowid_map WHERE episode_id = ?1",
386            params![episode_id],
387            |row| row.get(0),
388        )?;
389
390        tx.execute(
391            "INSERT INTO episodes_fts (episodes_fts, rowid, content) VALUES ('delete', ?1, ?2)",
392            params![fts_rowid, old_search_text],
393        )?;
394        tx.execute(
395            "UPDATE episodes
396             SET outcome = ?1,
397                 confidence = ?2,
398                 experiment_id = COALESCE(?3, experiment_id),
399                 verification_status = ?4,
400                 search_text = ?5,
401                 embedding = ?6,
402                 embedding_q8 = ?7,
403                 updated_at = datetime('now')
404             WHERE episode_id = ?8",
405            params![
406                outcome.as_str(),
407                confidence,
408                experiment_id,
409                verification_json,
410                search_text,
411                embedding_bytes,
412                q8_bytes,
413                episode_id
414            ],
415        )?;
416        tx.execute(
417            "INSERT INTO episodes_fts (rowid, content) VALUES (?1, ?2)",
418            params![fts_rowid, search_text],
419        )?;
420
421        #[cfg(feature = "hnsw")]
422        db::queue_pending_index_op(tx, &item_key, "episode", IndexOpKind::Upsert)?;
423        db::invalidate_derived_vector_artifact(tx, &item_key)?;
424        Ok(())
425    })
426}
427
428pub(crate) fn search_episodes(
429    conn: &Connection,
430    effect_type: Option<&str>,
431    outcome: Option<&EpisodeOutcome>,
432    limit: usize,
433) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
434    const MAX_EPISODE_SEARCH_LIMIT: usize = 1_000;
435    let limit = limit.clamp(1, MAX_EPISODE_SEARCH_LIMIT);
436    let effect_type = effect_type.map(ToOwned::to_owned);
437    let outcome = outcome.map(|value| value.as_str().to_string());
438
439    let mut sql = String::from(
440        "SELECT episode_id, document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
441         FROM episodes
442         WHERE 1 = 1",
443    );
444    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
445
446    if let Some(effect_type) = &effect_type {
447        sql.push_str(&format!(" AND effect_type = ?{}", params.len() + 1));
448        params.push(Box::new(effect_type.clone()));
449    }
450    if let Some(outcome) = &outcome {
451        sql.push_str(&format!(" AND outcome = ?{}", params.len() + 1));
452        params.push(Box::new(outcome.clone()));
453    }
454    let limit_param = params.len() + 1;
455    sql.push_str(&format!(" ORDER BY updated_at DESC LIMIT ?{}", limit_param));
456    params.push(Box::new(limit as i64));
457
458    let param_refs: Vec<&dyn rusqlite::types::ToSql> =
459        params.iter().map(|value| value.as_ref()).collect();
460    let mut stmt = conn.prepare(&sql)?;
461    let rows = stmt
462        .query_map(&*param_refs, |row| {
463            Ok((
464                row.get::<_, String>(0)?,
465                row.get::<_, String>(1)?,
466                row.get::<_, String>(2)?,
467                row.get::<_, String>(3)?,
468                row.get::<_, String>(4)?,
469                row.get::<_, f32>(5)?,
470                row.get::<_, String>(6)?,
471                row.get::<_, Option<String>>(7)?,
472            ))
473        })?
474        .collect::<Result<Vec<_>, _>>()?;
475
476    rows.into_iter()
477        .map(
478            |(
479                episode_id,
480                _document_id,
481                cause_ids_raw,
482                effect_type,
483                outcome_raw,
484                confidence,
485                verification_status_raw,
486                experiment_id,
487            )| {
488                Ok((
489                    episode_id.clone(),
490                    EpisodeMeta {
491                        cause_ids: db::parse_string_list_json(
492                            "episodes",
493                            &episode_id,
494                            "cause_ids",
495                            &cause_ids_raw,
496                        )?,
497                        effect_type,
498                        outcome: db::parse_episode_outcome(&episode_id, &outcome_raw)?,
499                        confidence,
500                        verification_status: db::parse_verification_status(
501                            &episode_id,
502                            &verification_status_raw,
503                        )?,
504                        experiment_id,
505                        valid_time: None,
506                        fact_digest: None,
507                    },
508                ))
509            },
510        )
511        .collect()
512}
513
514/// Get episode by episode_id.
515pub(crate) fn get_episode(
516    conn: &Connection,
517    episode_id: &str,
518) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
519    let row = conn.query_row(
520        "SELECT document_id, cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
521         FROM episodes
522         WHERE episode_id = ?1",
523        params![episode_id],
524        |row| {
525            Ok((
526                row.get::<_, String>(0)?,
527                row.get::<_, String>(1)?,
528                row.get::<_, String>(2)?,
529                row.get::<_, String>(3)?,
530                row.get::<_, f32>(4)?,
531                row.get::<_, String>(5)?,
532                row.get::<_, Option<String>>(6)?,
533            ))
534        },
535    );
536
537    match row {
538        Ok((
539            document_id,
540            cause_ids_raw,
541            effect_type,
542            outcome_raw,
543            confidence,
544            verification_status_raw,
545            experiment_id,
546        )) => Ok(Some((
547            document_id.clone(),
548            EpisodeMeta {
549                cause_ids: db::parse_string_list_json(
550                    "episodes",
551                    episode_id,
552                    "cause_ids",
553                    &cause_ids_raw,
554                )?,
555                effect_type,
556                outcome: db::parse_episode_outcome(episode_id, &outcome_raw)?,
557                confidence,
558                verification_status: db::parse_verification_status(
559                    episode_id,
560                    &verification_status_raw,
561                )?,
562                experiment_id,
563                valid_time: None,
564                fact_digest: None,
565            },
566        ))),
567        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
568        Err(err) => Err(MemoryError::Database(err)),
569    }
570}
571
572/// Legacy compatibility: load the primary episode's metadata for a document.
573/// Returns the first-created episode's metadata, or None if no episodes exist.
574pub(crate) fn load_episode_meta(
575    conn: &Connection,
576    document_id: &str,
577) -> Result<Option<EpisodeMeta>, MemoryError> {
578    let row = conn.query_row(
579        "SELECT cause_ids, effect_type, outcome, confidence, verification_status, experiment_id
580         FROM episodes
581         WHERE document_id = ?1
582         ORDER BY created_at ASC
583         LIMIT 1",
584        params![document_id],
585        |row| {
586            Ok((
587                row.get::<_, String>(0)?,
588                row.get::<_, String>(1)?,
589                row.get::<_, String>(2)?,
590                row.get::<_, f32>(3)?,
591                row.get::<_, String>(4)?,
592                row.get::<_, Option<String>>(5)?,
593            ))
594        },
595    );
596
597    match row {
598        Ok((
599            cause_ids_raw,
600            effect_type,
601            outcome_raw,
602            confidence,
603            verification_status_raw,
604            experiment_id,
605        )) => Ok(Some(EpisodeMeta {
606            cause_ids: db::parse_string_list_json(
607                "episodes",
608                document_id,
609                "cause_ids",
610                &cause_ids_raw,
611            )?,
612            effect_type,
613            outcome: db::parse_episode_outcome(document_id, &outcome_raw)?,
614            confidence,
615            verification_status: db::parse_verification_status(
616                document_id,
617                &verification_status_raw,
618            )?,
619            experiment_id,
620            valid_time: None,
621            fact_digest: None,
622        })),
623        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
624        Err(err) => Err(MemoryError::Database(err)),
625    }
626}
627
628pub(crate) fn load_episode_context(
629    conn: &Connection,
630    document_id: &str,
631) -> Result<(String, String), MemoryError> {
632    let title: String = conn
633        .query_row(
634            "SELECT title FROM documents WHERE id = ?1",
635            params![document_id],
636            |row| row.get(0),
637        )
638        .map_err(|e| MemoryError::DocumentNotFound(format!("{}: {e}", document_id)))?;
639
640    let mut stmt =
641        conn.prepare("SELECT content FROM chunks WHERE document_id = ?1 ORDER BY chunk_index ASC")?;
642    let chunks = stmt
643        .query_map(params![document_id], |row| row.get::<_, String>(0))?
644        .collect::<Result<Vec<_>, _>>()?;
645
646    Ok((title, chunks.join("\n")))
647}
648
649impl MemoryStore {
650    /// Ingest or update a causal episode attached to a document.
651    ///
652    /// The document must already exist. Existing episodes keep their original `created_at`
653    /// timestamp while their searchable text, outcome state, verification metadata, embeddings,
654    /// and `updated_at` are refreshed.
655    pub async fn ingest_episode(
656        &self,
657        document_id: &str,
658        meta: &EpisodeMeta,
659    ) -> Result<String, MemoryError> {
660        self.ingest_episode_with_trace(document_id, meta, None)
661            .await
662    }
663
664    /// Ingest a causal episode with optional trace metadata. Returns the episode_id.
665    pub async fn ingest_episode_with_trace(
666        &self,
667        document_id: &str,
668        meta: &EpisodeMeta,
669        trace_ctx: Option<&TraceCtx>,
670    ) -> Result<String, MemoryError> {
671        self.validate_content("episodes.effect_type", &meta.effect_type)?;
672        Self::validate_confidence(meta.confidence)?;
673        let doc_id = document_id.to_string();
674        let meta = meta.clone();
675        let (document_title, document_context) = self
676            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
677            .await?;
678        let search_text = build_episode_search_text(&document_title, &document_context, &meta);
679        let embedding = self.embed_text_internal(&search_text).await?;
680        self.validate_embedding_dimensions(&embedding)?;
681        let embedding_bytes = db::embedding_to_bytes(&embedding);
682        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
683        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
684            .quantize(&embedding)
685            .map(|vector| quantize::pack_quantized(&vector))
686            .ok();
687        let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
688
689        let doc_id = document_id.to_string();
690        let episode_id = self
691            .with_write_conn(move |conn| {
692                upsert_episode(
693                    conn,
694                    &doc_id,
695                    &meta,
696                    &search_text,
697                    &embedding_bytes,
698                    q8_bytes.as_deref(),
699                    trace_id_owned.as_deref(),
700                )
701            })
702            .await?;
703
704        #[cfg(feature = "hnsw")]
705        self.sync_pending_hnsw_ops_best_effort("ingest_episode")
706            .await;
707
708        Ok(episode_id)
709    }
710
711    /// Create a new episode with an explicit episode_id. Returns the episode_id.
712    pub async fn create_episode(
713        &self,
714        episode_id: &str,
715        document_id: &str,
716        meta: &EpisodeMeta,
717    ) -> Result<String, MemoryError> {
718        self.create_episode_with_trace(episode_id, document_id, meta, None)
719            .await
720    }
721
722    /// Create a new episode with an explicit episode_id and optional trace metadata.
723    pub async fn create_episode_with_trace(
724        &self,
725        episode_id: &str,
726        document_id: &str,
727        meta: &EpisodeMeta,
728        trace_ctx: Option<&TraceCtx>,
729    ) -> Result<String, MemoryError> {
730        self.validate_content("episodes.effect_type", &meta.effect_type)?;
731        Self::validate_confidence(meta.confidence)?;
732        let doc_id = document_id.to_string();
733        let meta = meta.clone();
734        let (document_title, document_context) = self
735            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
736            .await?;
737        let search_text = build_episode_search_text(&document_title, &document_context, &meta);
738        let embedding = self.embed_text_internal(&search_text).await?;
739        self.validate_embedding_dimensions(&embedding)?;
740        let embedding_bytes = db::embedding_to_bytes(&embedding);
741        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
742        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
743            .quantize(&embedding)
744            .map(|vector| quantize::pack_quantized(&vector))
745            .ok();
746        let trace_id_owned = trace_ctx.map(|value| value.trace_id.clone());
747
748        let ep_id = episode_id.to_string();
749        let doc_id = document_id.to_string();
750        let created_ep_id = self
751            .with_write_conn(move |conn| {
752                crate::episodes::create_episode(
753                    conn,
754                    &ep_id,
755                    &doc_id,
756                    &meta,
757                    &search_text,
758                    &embedding_bytes,
759                    q8_bytes.as_deref(),
760                    trace_id_owned.as_deref(),
761                )
762            })
763            .await?;
764
765        #[cfg(feature = "hnsw")]
766        self.sync_pending_hnsw_ops_best_effort("create_episode")
767            .await;
768
769        Ok(created_ep_id)
770    }
771
772    /// Retrieve an episode by its episode_id.
773    pub async fn get_episode(
774        &self,
775        episode_id: &str,
776    ) -> Result<Option<(String, EpisodeMeta)>, MemoryError> {
777        let ep_id = episode_id.to_string();
778        self.with_read_conn(move |conn| get_episode(conn, &ep_id))
779            .await
780    }
781
782    /// Update the outcome of an episode by its episode_id.
783    pub async fn update_episode_outcome_by_id(
784        &self,
785        episode_id: &str,
786        outcome: EpisodeOutcome,
787        confidence: f32,
788        experiment_id: Option<&str>,
789    ) -> Result<(), MemoryError> {
790        Self::validate_confidence(confidence)?;
791        let ep_id = episode_id.to_string();
792        let ep_id_clone = ep_id.clone();
793        let (doc_id, current_meta) = self
794            .with_read_conn(move |conn| {
795                get_episode(conn, &ep_id_clone)?
796                    .ok_or_else(|| MemoryError::EpisodeNotFound(ep_id_clone.clone()))
797            })
798            .await?;
799
800        let experiment_id_owned = experiment_id.map(|value| value.to_string());
801        let verification_status =
802            verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
803        let updated_meta = EpisodeMeta {
804            cause_ids: current_meta.cause_ids,
805            effect_type: current_meta.effect_type,
806            outcome: outcome.clone(),
807            confidence,
808            verification_status: verification_status.clone(),
809            experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
810            valid_time: current_meta.valid_time,
811            fact_digest: current_meta.fact_digest.clone(),
812        };
813
814        let (document_title, document_context) = self
815            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
816            .await?;
817        let search_text =
818            build_episode_search_text(&document_title, &document_context, &updated_meta);
819        let embedding = self.embed_text_internal(&search_text).await?;
820        self.validate_embedding_dimensions(&embedding)?;
821        let embedding_bytes = db::embedding_to_bytes(&embedding);
822        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
823        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
824            .quantize(&embedding)
825            .map(|vector| quantize::pack_quantized(&vector))
826            .ok();
827
828        self.with_write_conn(move |conn| {
829            crate::episodes::update_episode_outcome_by_id(
830                conn,
831                &ep_id,
832                outcome,
833                confidence,
834                experiment_id_owned.as_deref(),
835                &verification_status,
836                &search_text,
837                &embedding_bytes,
838                q8_bytes.as_deref(),
839            )
840        })
841        .await?;
842
843        #[cfg(feature = "hnsw")]
844        self.sync_pending_hnsw_ops_best_effort("update_episode_outcome_by_id")
845            .await;
846
847        Ok(())
848    }
849
850    /// Update the outcome of an existing episode.
851    pub async fn update_episode_outcome(
852        &self,
853        document_id: &str,
854        outcome: EpisodeOutcome,
855        confidence: f32,
856        experiment_id: Option<&str>,
857    ) -> Result<(), MemoryError> {
858        Self::validate_confidence(confidence)?;
859        let doc_id = document_id.to_string();
860        let current_meta = self
861            .with_read_conn(move |conn| load_episode_meta(conn, &doc_id))
862            .await?
863            .ok_or_else(|| MemoryError::DocumentNotFound(document_id.to_string()))?;
864
865        let experiment_id_owned = experiment_id.map(|value| value.to_string());
866        let verification_status =
867            verification_status_for_outcome(&outcome, experiment_id_owned.as_deref());
868        let updated_meta = EpisodeMeta {
869            cause_ids: current_meta.cause_ids,
870            effect_type: current_meta.effect_type,
871            outcome: outcome.clone(),
872            confidence,
873            verification_status: verification_status.clone(),
874            experiment_id: experiment_id_owned.clone().or(current_meta.experiment_id),
875            valid_time: current_meta.valid_time,
876            fact_digest: current_meta.fact_digest.clone(),
877        };
878
879        let doc_id = document_id.to_string();
880        let (document_title, document_context) = self
881            .with_read_conn(move |conn| load_episode_context(conn, &doc_id))
882            .await?;
883        let search_text =
884            build_episode_search_text(&document_title, &document_context, &updated_meta);
885        let embedding = self.embed_text_internal(&search_text).await?;
886        self.validate_embedding_dimensions(&embedding)?;
887        let embedding_bytes = db::embedding_to_bytes(&embedding);
888        // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
889        let q8_bytes = Quantizer::new(self.inner.config.embedding.dimensions)
890            .quantize(&embedding)
891            .map(|vector| quantize::pack_quantized(&vector))
892            .ok();
893
894        let doc_id = document_id.to_string();
895        self.with_write_conn(move |conn| {
896            crate::episodes::update_episode_outcome(
897                conn,
898                &doc_id,
899                outcome,
900                confidence,
901                experiment_id_owned.as_deref(),
902                &verification_status,
903                &search_text,
904                &embedding_bytes,
905                q8_bytes.as_deref(),
906            )
907        })
908        .await?;
909
910        #[cfg(feature = "hnsw")]
911        self.sync_pending_hnsw_ops_best_effort("update_episode_outcome")
912            .await;
913
914        Ok(())
915    }
916
917    /// Search for episodes by effect_type and/or outcome.
918    pub async fn search_episodes(
919        &self,
920        effect_type: Option<&str>,
921        outcome: Option<&EpisodeOutcome>,
922        limit: usize,
923    ) -> Result<Vec<(String, EpisodeMeta)>, MemoryError> {
924        let et = effect_type.map(|s| s.to_string());
925        let outcome_owned = outcome.cloned();
926
927        self.with_read_conn(move |conn| {
928            search_episodes(conn, et.as_deref(), outcome_owned.as_ref(), limit)
929        })
930        .await
931    }
932}