kiromi-ai-memory 0.2.2

Local-first multi-tenant memory store engine: Markdown/text content on object storage, metadata in SQLite, plugin-shaped embedder/storage/metadata, hybrid text+vector search.
Documentation
// SPDX-License-Identifier: Apache-2.0 OR MIT
//! Plan 12 phase H — partition scheme evolution.
//!
//! `Memory::migrate_scheme(new_scheme, mapper, opts)` walks every live
//! memory under the tenant write lock, calls the caller-supplied
//! mapper to compute the new partitioning, renames the on-disk blob
//! via `Storage::rename_within`, and updates SQL state. The migration
//! is resumable through the `migration_state` table — the cursor is
//! checkpointed every `batch_size` moved memories, so after a crash
//! mid-run, re-running the same call resumes after the last checkpoint
//! and revisits any memories processed since then.

use std::sync::Arc;
use std::time::Instant;

use crate::audit::AuditOp;
use crate::error::{Error, Result};
use crate::handle::{Memory, MemoryInner};
use crate::memory::{MemoryId, MemoryRecord, MemoryRef};
use crate::metadata::{AuditEntry, MigrationStateRow, SchemaMeta};
use crate::partition::{PartitionPath, PartitionScheme, Partitions};
use crate::snapshot::{SnapshotOpts, SnapshotRef};
use crate::storage::StorageKey;
use crate::summary::Scope;

/// Mapper closure: turns a memory record into a new partitioning.
///
/// [`Memory::migrate_scheme`] calls it once per visited memory with the
/// record returned by `Memory::get`, then resolves the returned
/// [`Partitions`] against the new scheme. Returning `Err` does not abort the
/// migration: the error is recorded in [`MigrationReport::errors`] and that
/// memory is not moved.
pub type SchemeMapper = Box<dyn Fn(&MemoryRecord) -> Result<Partitions> + Send + Sync>;

/// Caller-tunable knobs on [`Memory::migrate_scheme`].
///
/// Start from [`MigrationOpts::default`] and adjust individual settings with
/// the chainable `with_*` methods.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct MigrationOpts {
    /// When `true`, returns the move plan without applying it. Default `false`.
    pub dry_run: bool,
    /// Memories per batch: the resume cursor is checkpointed after this many
    /// moves. Default 64.
    pub batch_size: u32,
    /// When `true`, take a snapshot before the migration starts so the
    /// caller can roll back. Default `true`.
    pub take_snapshot_first: bool,
}

impl Default for MigrationOpts {
    /// A real (non-dry) run, batches of 64, with a snapshot taken first.
    fn default() -> Self {
        let dry_run = false;
        let batch_size = 64;
        let take_snapshot_first = true;
        Self {
            dry_run,
            batch_size,
            take_snapshot_first,
        }
    }
}

impl MigrationOpts {
    /// Returns the options with the dry-run flag set to `v`.
    #[must_use]
    pub fn with_dry_run(self, v: bool) -> Self {
        Self { dry_run: v, ..self }
    }

    /// Returns the options with the batch size set to `v`; zero is raised to 1.
    #[must_use]
    pub fn with_batch_size(self, v: u32) -> Self {
        let batch_size = if v == 0 { 1 } else { v };
        Self { batch_size, ..self }
    }

    /// Returns the options with the snapshot-first flag set to `v`.
    #[must_use]
    pub fn with_take_snapshot_first(self, v: bool) -> Self {
        Self {
            take_snapshot_first: v,
            ..self
        }
    }
}

/// Report from a [`Memory::migrate_scheme`] run.
///
/// Counters cover only the memories visited by this call; a resumed run does
/// not include work done before the stored cursor.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct MigrationReport {
    /// Number of memories whose partition_path actually changed, i.e. whose
    /// blob was renamed and whose row was relocated. Stays 0 on a dry run.
    pub memories_moved: u64,
    /// Number of memories whose partition stayed the same (the path resolved
    /// under the new scheme equals the current one).
    pub memories_skipped: u64,
    /// Snapshot taken before the migration started, if any. `None` on a dry
    /// run or when `take_snapshot_first` is `false`.
    pub snapshot: Option<SnapshotRef>,
    /// Wall-clock duration in millis.
    pub duration_ms: u64,
    /// Move plan (when `dry_run`). Each entry is `(memory_id, new_path)`.
    /// Left empty on a real run.
    pub plan: Vec<(MemoryId, PartitionPath)>,
    /// First 32 errors (`subject_id`, message). Collected from per-memory
    /// failures (loading the record, the mapper, or resolving its output);
    /// later failures are dropped rather than reported.
    pub errors: Vec<(String, String)>,
}

/// Name under which `migrate_scheme` reads and writes its row in the
/// `migration_state` table (status + resume cursor).
const MIGRATE_NAME: &str = "migrate_scheme";

impl Memory {
    /// Repartition every live memory under a new scheme template.
    /// Resumable, snapshot-backed, dry-runnable.
    ///
    /// The mapper is called once per memory record and returns the
    /// `Partitions` struct that resolves under the new scheme. Memories
    /// whose mapper output equals their current partition are skipped.
    /// Storage blobs are renamed via [`crate::Storage::rename_within`];
    /// indices are rebuilt at the end with [`Memory::reindex`].
    ///
    /// **Cost:** O(live memories) — one storage `rename_within` per moved
    /// blob, one SQL transaction per memory + audit, one full `reindex` at
    /// the end (skipped on `dry_run`). Resumes from `migration_state` if
    /// interrupted, so a crash mid-run replays only the unfinished tail on
    /// the next call. **Errors:**
    /// [`Error::MigrationConflict`]
    /// if a previous run is `done` and the cursor was not cleared;
    /// [`Error::Storage`] /
    /// [`Error::Metadata`] for backend
    /// failures; [`Error::PartitionInvalid`]
    /// if the mapper produces a path that does not match the new scheme.
    ///
    /// ```no_run
    /// # async fn _ex(mem: kiromi_ai_memory::Memory) -> kiromi_ai_memory::Result<()> {
    /// use kiromi_ai_memory::{Partitions, migrate::MigrationOpts};
    /// let mapper: kiromi_ai_memory::migrate::SchemeMapper = Box::new(|rec| {
    ///     Ok(Partitions::new().with("user", rec.r#ref.partition.as_str()))
    /// });
    /// let r = mem.migrate_scheme("user={user}", mapper, MigrationOpts::default()).await?;
    /// # let _ = r; Ok(()) }
    /// ```
    pub async fn migrate_scheme(
        &self,
        new_scheme: &str,
        mapper: SchemeMapper,
        opts: MigrationOpts,
    ) -> Result<MigrationReport> {
        let started = Instant::now();
        let new_scheme = PartitionScheme::parse(new_scheme).map_err(Error::from)?;

        // Pre-flight snapshot.
        let snapshot = if opts.take_snapshot_first && !opts.dry_run {
            Some(
                self.snapshot(
                    SnapshotOpts::default()
                        .with_tag("pre-migrate-scheme")
                        .with_reason(format!("scheme: {new_scheme}")),
                )
                .await?,
            )
        } else {
            None
        };

        // Hold the write lock for the move loop, then release before
        // calling reindex (which acquires the same lock). Wrapped in a
        // block so the guard drops at the right point.
        let g = self.inner.locks.lock(&self.inner.tenant).await;

        // Resume from migration_state.
        let state = self
            .inner
            .metadata
            .read_migration_state(MIGRATE_NAME)
            .await?;
        if let Some(s) = state.as_ref()
            && s.status == "done"
            && !opts.dry_run
        {
            return Err(Error::MigrationConflict {
                reason: "previous migrate_scheme already done; clear migration_state to re-run"
                    .into(),
            });
        }
        let cursor = state.as_ref().and_then(|s| s.cursor.clone());

        // Fetch all live memories (sorted by id from list_all_memory_ids).
        let all_ids = self.inner.metadata.list_all_memory_ids().await?;
        let mut report = MigrationReport::default();

        let now = self.inner.clock.now_ms();
        if !opts.dry_run {
            self.inner
                .metadata
                .upsert_migration_state(MigrationStateRow {
                    name: MIGRATE_NAME.to_string(),
                    status: "in_progress".into(),
                    cursor: cursor.clone(),
                    started_at_ms: now,
                    finished_at_ms: None,
                })
                .await?;
        }

        let mut processed_since_checkpoint: u32 = 0;
        let batch_size = opts.batch_size.max(1);

        for id in &all_ids {
            // Resume: skip ids <= cursor.
            if let Some(c) = cursor.as_deref()
                && id.to_string().as_str() <= c
            {
                continue;
            }

            let row = match self.inner.metadata.get_memory(id).await? {
                Some(r) => r,
                None => continue,
            };
            if row.tombstoned {
                continue;
            }
            // Fetch full record so the mapper has body access.
            let mref = MemoryRef {
                id: row.id,
                partition: row.partition_path.clone(),
            };
            let record = match self.get(&mref).await {
                Ok(r) => r,
                Err(e) => {
                    if report.errors.len() < 32 {
                        report.errors.push((id.to_string(), format!("get: {e}")));
                    }
                    continue;
                }
            };
            let new_partitions = match mapper(&record) {
                Ok(p) => p,
                Err(e) => {
                    if report.errors.len() < 32 {
                        report.errors.push((id.to_string(), e.to_string()));
                    }
                    continue;
                }
            };
            let new_path = match new_partitions.resolve(&new_scheme) {
                Ok(p) => p,
                Err(e) => {
                    if report.errors.len() < 32 {
                        report
                            .errors
                            .push((id.to_string(), format!("resolve: {e}")));
                    }
                    continue;
                }
            };

            if new_path == row.partition_path {
                report.memories_skipped += 1;
                continue;
            }

            if opts.dry_run {
                report.plan.push((row.id, new_path));
                continue;
            }

            // Compute new data_path: replace the partition segment.
            let old_key = StorageKey::new(row.data_path.clone());
            let file_name =
                old_key.as_str().rsplit('/').next().ok_or_else(|| {
                    Error::IndexCorrupt(format!("bad data_path: {}", row.data_path))
                })?;
            let new_data_path = format!(
                "{}/data/{}/{}",
                self.inner.tenant.as_str(),
                new_path.as_str(),
                file_name,
            );
            let new_key = StorageKey::new(new_data_path.clone());

            // 1. Ensure the new partition chain exists in SQL.
            self.inner
                .metadata
                .ensure_partition_chain(&new_path, true, now)
                .await?;
            // 2. Rename the blob.
            self.inner.storage.rename_within(&old_key, &new_key).await?;
            // 3. Update memory row.
            self.inner
                .metadata
                .relocate_memory(&row.id, &new_path, &new_data_path, now)
                .await?;
            // 4. Audit.
            let audit = AuditEntry {
                ts_ms: now,
                actor: self.inner.actor.clone(),
                op: AuditOp::MigrateScheme,
                partition_path: Some(new_path.clone()),
                memory_id: Some(row.id),
                detail: serde_json::json!({
                    "from": row.partition_path.as_str(),
                    "to": new_path.as_str(),
                }),
            };
            let _ = self.inner.metadata.insert_restore_audit(audit).await?;

            report.memories_moved += 1;
            processed_since_checkpoint += 1;
            // Checkpoint cursor every batch_size moves.
            if processed_since_checkpoint >= batch_size {
                self.inner
                    .metadata
                    .upsert_migration_state(MigrationStateRow {
                        name: MIGRATE_NAME.to_string(),
                        status: "in_progress".into(),
                        cursor: Some(row.id.to_string()),
                        started_at_ms: now,
                        finished_at_ms: None,
                    })
                    .await?;
                processed_since_checkpoint = 0;
            }
        }

        // Release the write lock before reindex (which re-acquires it).
        drop(g);

        if !opts.dry_run {
            // Final reindex + scheme bump.
            let _ = self.reindex(Scope::All).await?;

            // Bump schema_meta.
            let meta = self
                .inner
                .metadata
                .read_schema_meta()
                .await?
                .ok_or_else(|| Error::Config("schema_meta missing".into()))?;
            let new_meta = SchemaMeta {
                partition_scheme: new_scheme.to_string(),
                scheme_version: meta.scheme_version + 1,
                embedder_id: meta.embedder_id,
                embedder_dims: meta.embedder_dims,
                created_at_ms: meta.created_at_ms,
            };
            self.inner.metadata.write_schema_meta(&new_meta).await?;

            // Mark migration done.
            self.inner
                .metadata
                .upsert_migration_state(MigrationStateRow {
                    name: MIGRATE_NAME.to_string(),
                    status: "done".into(),
                    cursor: None,
                    started_at_ms: state.map_or(now, |s| s.started_at_ms),
                    finished_at_ms: Some(self.inner.clock.now_ms()),
                })
                .await?;

            let _ = self
                .inner
                .event_tx
                .send(crate::event::MemoryEvent::SchemeMigrated {
                    memories_moved: report.memories_moved,
                    ts_ms: now,
                });
        }

        report.snapshot = snapshot;
        report.duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(0);
        Ok(report)
    }
}

// Never-read placeholder. It appears to exist only so the `Arc` and
// `MemoryInner` imports stay referenced (nothing else visible in this module
// uses them), hence the `dead_code` allowance.
// NOTE(review): if neither import is needed, dropping both imports and this
// constant together would be cleaner — confirm nothing relies on it.
#[allow(dead_code)]
const _UNUSED: Option<Arc<MemoryInner>> = None;