kiromi-ai-memory 0.2.2

Local-first multi-tenant memory store engine: Markdown/text content on object storage, metadata in SQLite, plugin-shaped embedder/storage/metadata, hybrid text+vector search.
Documentation
// SPDX-License-Identifier: Apache-2.0 OR MIT
//! Plan 10 backfill: populate `summary_input` rows from the legacy
//! `summary.inputs` JSON column.
//!
//! Plan 9 stored summary inputs as a JSON array on `summary.inputs`. Plan 10
//! adds the indexed `summary_input` table for traverse + reverse-edge lookups.
//! This migrator iterates every summary at `Memory::open()` time and inserts
//! denormalised rows. Idempotent — guarded by a `migration_state` row.

use crate::error::Result;
use crate::metadata::{MetadataStore, MigrationStateRow};
use crate::summary::SummarySubject;
use crate::util::Clock;

/// Key identifying this backfill's `migration_state` row: it is passed to
/// `read_migration_state` and written as `MigrationStateRow::name` by
/// [`migrate`].
// NOTE(review): the `_v1` suffix presumably versions the backfill so a future
// revision can run under a new key — confirm before renaming.
const MIGRATION_NAME: &str = "summary_input_backfill_v1";

/// Run the backfill. Idempotent: a `migration_state` row with status `done`
/// short-circuits subsequent runs.
///
/// Otherwise the function:
///
/// 1. upserts the state row as `in_progress`, stamped with the current clock
///    reading as `started_at_ms`;
/// 2. lists every summary via `list_all_summaries_for_backfill` and inserts
///    one `summary_input` row per entry of its `inputs`, encoded as a
///    `(kind, id)` pair — `("memory", <id>)`, `("partition", <path>)` or
///    `("tenant", "<root>")`;
/// 3. upserts the state row as `done`, keeping the `started_at_ms` from step 1
///    and stamping `finished_at_ms` with a fresh clock reading.
///
/// # Errors
///
/// Returns the first error produced by any `MetadataStore` call. If that
/// happens after step 1, the state row stays `in_progress`, so the next call
/// does not short-circuit and repeats the whole backfill.
pub async fn migrate(metadata: &dyn MetadataStore, clock: &dyn Clock) -> Result<()> {
    if let Some(state) = metadata.read_migration_state(MIGRATION_NAME).await?
        && state.status == "done"
    {
        return Ok(());
    }

    // Captured once and reused for the final `done` row, so the recorded
    // start time is not overwritten with the finish time.
    let started_at_ms = clock.now_ms();
    metadata
        .upsert_migration_state(MigrationStateRow {
            name: MIGRATION_NAME.to_string(),
            status: "in_progress".into(),
            cursor: None,
            started_at_ms,
            finished_at_ms: None,
        })
        .await?;

    // NOTE(review): a run that failed part-way is replayed from the first
    // summary, so `insert_summary_input` presumably has to tolerate rows that
    // already exist — confirm against the store implementation.
    let summaries = metadata.list_all_summaries_for_backfill().await?;
    for s in summaries {
        for input in &s.inputs {
            let (kind, id) = match input {
                SummarySubject::Memory(r) => ("memory", r.id.to_string()),
                SummarySubject::Partition(p) => ("partition", p.as_str().to_string()),
                // The tenant subject carries no identifier of its own; a fixed
                // sentinel id is stored instead.
                SummarySubject::Tenant => ("tenant", "<root>".to_string()),
            };
            metadata.insert_summary_input(&s.id, kind, &id).await?;
        }
    }

    let finished_at_ms = clock.now_ms();
    metadata
        .upsert_migration_state(MigrationStateRow {
            name: MIGRATION_NAME.to_string(),
            status: "done".into(),
            cursor: None,
            started_at_ms,
            finished_at_ms: Some(finished_at_ms),
        })
        .await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::AuditOp;
    use crate::content::ContentHash;
    use crate::metadata::{AuditEntry, SqliteMetadata, SummaryRow};
    use crate::partition::PartitionPath;
    use crate::summary::SummaryId;
    use crate::util::SystemClock;

    /// Running the migrator on a store without summaries records `done`, and
    /// a second invocation still succeeds.
    #[tokio::test]
    async fn migrate_is_idempotent_after_first_run() {
        let store = SqliteMetadata::connect_memory().await.unwrap();
        store.migrate().await.unwrap();
        let clock = SystemClock;

        // Nothing to backfill yet: the first pass only has to mark completion.
        migrate(&store, &clock).await.unwrap();
        let recorded = store
            .read_migration_state(MIGRATION_NAME)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recorded.status, "done");

        // With the state row at `done`, the repeat call returns early.
        migrate(&store, &clock).await.unwrap();
    }

    /// A summary inserted before the migrator runs gets its `Tenant` input
    /// written out so that `summaries_citing` can find it.
    #[tokio::test]
    async fn migrate_backfills_inputs_for_existing_summary() {
        let store = SqliteMetadata::connect_memory().await.unwrap();
        store.migrate().await.unwrap();

        let partition = "user=alex".parse::<PartitionPath>().unwrap();
        store
            .ensure_partition_chain(&partition, true, 1)
            .await
            .unwrap();

        // Fixture: one partition summary whose only input is the tenant root.
        let summary_id = SummaryId::generate();
        let audit = AuditEntry {
            ts_ms: 1,
            actor: None,
            op: AuditOp::SummaryAttach,
            partition_path: Some(partition.clone()),
            memory_id: None,
            detail: serde_json::Value::Null,
        };
        let row = SummaryRow {
            id: summary_id,
            subject_kind: "partition".into(),
            subject_path: Some(partition.clone()),
            subject_memory: None,
            style: "compact".into(),
            version: 1,
            data_path: "x".into(),
            content_hash: ContentHash([0; 32]),
            bytes: 0,
            summarizer_id: "x:1".into(),
            inputs: vec![SummarySubject::Tenant],
            superseded_by: None,
            tombstoned: false,
            created_at_ms: 1,
        };
        let request = crate::metadata::InsertSummaryRequest {
            row,
            audit,
            embedding_blob: None,
            content_for_index: String::new(),
            parent_path: partition.as_str().to_string(),
        };
        store.insert_summary(request).await.unwrap();

        // After the backfill, the reverse lookup on the tenant sentinel must
        // return exactly the summary inserted above.
        migrate(&store, &SystemClock).await.unwrap();
        let citing_tenant = store.summaries_citing("tenant", "<root>").await.unwrap();
        assert_eq!(citing_tenant, vec![summary_id]);
    }
}