semantic-memory 0.5.0

Hybrid semantic search with SQLite, FTS5, and HNSW — built for AI agents
Documentation
use crate::db;
use crate::error::MemoryError;
use crate::knowledge;
use crate::projection_import;
use crate::quantize;
use crate::MemoryStore;

pub(crate) async fn import_envelope(
    store: &MemoryStore,
    envelope: &projection_import::ImportEnvelope,
) -> Result<projection_import::ImportReceipt, MemoryError> {
    envelope.validate()?;

    let (eid, sv, cd) = envelope.dedupe_key();
    let eid = eid.to_string();
    let sv = sv.to_string();
    let cd = cd.to_string();

    let already = {
        let eid_c = eid.clone();
        let sv_c = sv.clone();
        let cd_c = cd.clone();
        store
            .with_read_conn(move |conn| {
                projection_import::check_import_exists(conn, &eid_c, &sv_c, &cd_c)
            })
            .await?
    };
    if already {
        return Ok(projection_import::ImportReceipt {
            envelope_id: envelope.envelope_id.clone(),
            schema_version: envelope.schema_version.clone(),
            content_digest: envelope.content_digest.clone(),
            status: projection_import::ImportStatus::AlreadyImported,
            record_count: envelope.records.len(),
            imported_at: String::new(),
            was_duplicate: true,
            trace_id: envelope.trace_id.clone(),
        });
    }

    let mut prepared = Vec::new();
    for (i, record) in envelope.records.iter().enumerate() {
        let embedding = store.embed_text_internal(record.content_text()).await?;
        store.validate_embedding_dimensions(&embedding)?;
        let embedding_bytes = db::embedding_to_bytes(&embedding);
        let q8_bytes = quantize::Quantizer::new(store.inner.config.embedding.dimensions)
            .quantize(&embedding)
            .map(|qv| quantize::pack_quantized(&qv))
            .ok();
        prepared.push((i, embedding_bytes, q8_bytes));
    }

    let envelope_c = envelope.clone();
    let record_count = envelope.records.len();
    let receipt = store
        .with_write_conn(move |conn| {
            db::with_transaction(conn, |tx| {
                if projection_import::check_import_exists(
                    tx,
                    envelope_c.envelope_id.as_str(),
                    &envelope_c.schema_version,
                    &envelope_c.content_digest,
                )? {
                    return Ok(projection_import::ImportReceipt {
                        envelope_id: envelope_c.envelope_id.clone(),
                        schema_version: envelope_c.schema_version.clone(),
                        content_digest: envelope_c.content_digest.clone(),
                        status: projection_import::ImportStatus::AlreadyImported,
                        record_count,
                        imported_at: String::new(),
                        was_duplicate: true,
                        trace_id: envelope_c.trace_id.clone(),
                    });
                }

                for (i, embedding_bytes, q8_bytes) in &prepared {
                    let record = &envelope_c.records[*i];
                    let provenance_meta = serde_json::json!({
                        "import_envelope_id": envelope_c.envelope_id.as_str(),
                        "import_source_authority": envelope_c.source_authority,
                        "import_schema_version": envelope_c.schema_version,
                    });

                    match record {
                        projection_import::ImportRecord::Fact {
                            content,
                            source,
                            metadata,
                        } => {
                            let fact_id = uuid::Uuid::new_v4().to_string();
                            let mut meta = metadata.clone().unwrap_or(serde_json::json!({}));
                            if let Some(obj) = meta.as_object_mut() {
                                obj.insert("_import".into(), provenance_meta.clone());
                            }
                            if let Some(trace_id) = &envelope_c.trace_id {
                                if let Some(obj) = meta.as_object_mut() {
                                    obj.insert(
                                        "trace_id".into(),
                                        serde_json::Value::String(trace_id.0.clone()),
                                    );
                                }
                            }

                            knowledge::insert_fact_in_tx(
                                tx,
                                &fact_id,
                                &envelope_c.namespace,
                                content,
                                embedding_bytes,
                                q8_bytes.as_deref(),
                                source.as_deref(),
                                Some(&meta),
                            )?;
                        }
                        projection_import::ImportRecord::Episode {
                            document_id,
                            meta,
                        } => {
                            let episode_id = uuid::Uuid::new_v4().to_string();
                            let meta_json = serde_json::to_value(meta).map_err(|e| {
                                MemoryError::ImportInvalid {
                                    reason: format!("failed to serialize episode meta: {e}"),
                                }
                            })?;
                            let cause_ids_json =
                                serde_json::to_string(&meta.cause_ids).unwrap_or_default();
                            let verification_json =
                                serde_json::to_string(&meta.verification_status)
                                    .unwrap_or_default();
                            let search_text = format!(
                                "{} {} {} {}",
                                meta.effect_type,
                                meta.outcome.as_str(),
                                meta.experiment_id.as_deref().unwrap_or(""),
                                cause_ids_json
                            );
                            let _ = meta_json;

                            tx.execute(
                                "INSERT INTO episodes
                                 (episode_id, document_id, cause_ids, effect_type, outcome,
                                  confidence, verification_status, experiment_id,
                                  search_text, embedding, embedding_q8, trace_id)
                                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
                                rusqlite::params![
                                    episode_id,
                                    document_id,
                                    cause_ids_json,
                                    meta.effect_type,
                                    meta.outcome.as_str(),
                                    meta.confidence,
                                    verification_json,
                                    meta.experiment_id,
                                    search_text,
                                    embedding_bytes,
                                    q8_bytes.as_deref(),
                                    envelope_c
                                        .trace_id
                                        .as_ref()
                                        .map(|t| t.as_str().to_string()),
                                ],
                            )?;

                            tx.execute(
                                "INSERT INTO episodes_rowid_map (episode_id, document_id) VALUES (?1, ?2)",
                                rusqlite::params![episode_id, document_id],
                            )?;
                            let fts_rowid = tx.last_insert_rowid();
                            tx.execute(
                                "INSERT INTO episodes_fts(rowid, content) VALUES (?1, ?2)",
                                rusqlite::params![fts_rowid, search_text],
                            )?;

                            for (ordinal, cause_id) in meta.cause_ids.iter().enumerate() {
                                tx.execute(
                                    "INSERT OR IGNORE INTO episode_causes (episode_id, cause_node_id, ordinal)
                                     VALUES (?1, ?2, ?3)",
                                    rusqlite::params![episode_id, cause_id, ordinal as i64],
                                )?;
                            }

                            #[cfg(feature = "hnsw")]
                            db::enqueue_pending_index_op(
                                tx,
                                &format!("episode:{}", episode_id),
                                "episode",
                                db::PendingIndexOpKind::Upsert,
                            )?;
                        }
                    }
                }

                projection_import::insert_import_log(
                    tx,
                    &envelope_c,
                    &projection_import::ImportStatus::Complete,
                    record_count,
                )?;

                Ok(projection_import::ImportReceipt {
                    envelope_id: envelope_c.envelope_id.clone(),
                    schema_version: envelope_c.schema_version.clone(),
                    content_digest: envelope_c.content_digest.clone(),
                    status: projection_import::ImportStatus::Complete,
                    record_count,
                    imported_at: chrono::Utc::now().to_rfc3339(),
                    was_duplicate: false,
                    trace_id: envelope_c.trace_id.clone(),
                })
            })
        })
        .await?;

    #[cfg(feature = "hnsw")]
    store
        .sync_pending_hnsw_ops_best_effort("import_envelope")
        .await;

    Ok(receipt)
}

pub(crate) async fn import_status(
    store: &MemoryStore,
    envelope_id: &projection_import::EnvelopeId,
) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
    let eid = envelope_id.0.clone();
    store
        .with_read_conn(move |conn| {
            let mut stmt = conn.prepare(
                "SELECT envelope_id, schema_version, content_digest, status,
                        record_count, imported_at, trace_id
                 FROM import_log
                 WHERE envelope_id = ?1
                 ORDER BY imported_at DESC",
            )?;
            let rows = stmt
                .query_map(rusqlite::params![eid], |row| {
                    Ok((
                        row.get::<_, String>(0)?,
                        row.get::<_, String>(1)?,
                        row.get::<_, String>(2)?,
                        row.get::<_, String>(3)?,
                        row.get::<_, i64>(4)?,
                        row.get::<_, String>(5)?,
                        row.get::<_, Option<String>>(6)?,
                    ))
                })?
                .collect::<Result<Vec<_>, _>>()?;

            Ok(rows
                .into_iter()
                .map(|(eid, sv, cd, status, rc, ts, tid)| {
                    let status_parsed = projection_import::ImportStatus::from_str_value(&status);
                    let was_dup = matches!(
                        status_parsed,
                        projection_import::ImportStatus::AlreadyImported
                    );
                    projection_import::ImportReceipt {
                        envelope_id: projection_import::EnvelopeId(eid),
                        schema_version: sv,
                        content_digest: cd,
                        status: status_parsed,
                        record_count: rc as usize,
                        imported_at: ts,
                        was_duplicate: was_dup,
                        trace_id: tid.map(crate::types::TraceId::new),
                    }
                })
                .collect())
        })
        .await
}

pub(crate) async fn list_imports(
    store: &MemoryStore,
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
    let ns = namespace.map(str::to_string);
    store
        .with_read_conn(move |conn| projection_import::query_import_log(conn, ns.as_deref(), limit))
        .await
}

pub(crate) async fn last_import_at(
    store: &MemoryStore,
    namespace: &str,
) -> Result<Option<String>, MemoryError> {
    let ns = namespace.to_string();
    store
        .with_read_conn(move |conn| projection_import::last_import_at(conn, &ns))
        .await
}