kiromi-ai-cli 0.2.2

Operator and developer CLI for the kiromi-ai-memory store: append, search, snapshot, regenerate, migrate-scheme, gc, audit-tail.
// SPDX-License-Identifier: Apache-2.0 OR MIT
//! `kiromi-ai-memory migrate-scheme` (Plan 12 phase J).
//!
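//! The mapper protocol: the CLI spawns the mapper binary once and keeps it
//! running for the whole migration. Each memory record is written to its
//! stdin as one JSON object per line,
//! `{"id": "...", "partition": "...", "content": "..."}`; the mapper must
//! answer every request with exactly one JSON line of the form
//! `{"partitions": {"key": "value", ...}}`. Anything the mapper writes to
//! stderr is passed through to the CLI's stderr. The mapper is free to
//! ignore the input and emit a constant rule.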

use std::process::Stdio;

use kiromi_ai_memory::{MigrationOpts, Partitions, SchemeMapper};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex;

use crate::cli::{GlobalArgs, MigrateSchemeArgs};
use crate::error::{CliError, ExitCode};
use crate::runtime::Runtime;

pub(crate) async fn run(args: MigrateSchemeArgs, globals: &GlobalArgs) -> Result<(), CliError> {
    let rt = Runtime::open(globals).await?;
    let mapper_binary = args.mapper_binary.clone();
    if !mapper_binary.exists() {
        return Err(CliError {
            kind: ExitCode::Config,
            source: anyhow::anyhow!("mapper-binary does not exist: {}", mapper_binary.display()),
        });
    }

    // The mapper subprocess is spawned per call and held inside an
    // async Mutex so the SchemeMapper closure can hand records to it
    // serially. We use a blocking sub-channel: the `Fn` passed into
    // `migrate_scheme` is sync but the mapper communication is async,
    // so we tunnel through a `tokio::runtime::Handle::block_on`.
    let handle = tokio::runtime::Handle::current();

    let mut child = Command::new(&mapper_binary)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit())
        .spawn()
        .map_err(|e| CliError {
            kind: ExitCode::Backend,
            source: anyhow::anyhow!("spawn mapper: {e}"),
        })?;
    let stdin = child.stdin.take().ok_or_else(|| CliError {
        kind: ExitCode::Backend,
        source: anyhow::anyhow!("mapper has no stdin"),
    })?;
    let stdout = child.stdout.take().ok_or_else(|| CliError {
        kind: ExitCode::Backend,
        source: anyhow::anyhow!("mapper has no stdout"),
    })?;
    let lock = std::sync::Arc::new(Mutex::new((stdin, BufReader::new(stdout).lines())));

    let mapper: SchemeMapper = {
        let lock = std::sync::Arc::clone(&lock);
        let handle = handle.clone();
        Box::new(move |rec| {
            let lock = std::sync::Arc::clone(&lock);
            let payload = serde_json::json!({
                "id": rec.r#ref.id.to_string(),
                "partition": rec.r#ref.partition.as_str(),
                "content": rec.content.as_str(),
            });
            handle.block_on(async move {
                let mut g = lock.lock().await;
                let line = format!("{}\n", payload);
                g.0.write_all(line.as_bytes())
                    .await
                    .map_err(kiromi_ai_memory::Error::Io)?;
                g.0.flush().await.map_err(kiromi_ai_memory::Error::Io)?;
                let line = match g.1.next_line().await {
                    Ok(Some(l)) => l,
                    _ => {
                        return Err(kiromi_ai_memory::Error::Config(
                            "mapper closed stdout".into(),
                        ));
                    }
                };
                let v: serde_json::Value = serde_json::from_str(&line)
                    .map_err(|e| kiromi_ai_memory::Error::Config(format!("mapper json: {e}")))?;
                let kvs = v
                    .get("partitions")
                    .and_then(|x| x.as_object())
                    .ok_or_else(|| {
                        kiromi_ai_memory::Error::Config(
                            "mapper output missing `partitions` object".into(),
                        )
                    })?;
                let mut p = Partitions::new();
                for (k, val) in kvs {
                    let s = val.as_str().ok_or_else(|| {
                        kiromi_ai_memory::Error::Config(format!(
                            "mapper partition value not a string for `{k}`"
                        ))
                    })?;
                    p = p.with(k, s.to_string());
                }
                Ok(p)
            })
        })
    };

    let opts = MigrationOpts::default()
        .with_dry_run(args.dry_run)
        .with_take_snapshot_first(!args.no_snapshot)
        .with_batch_size(args.batch_size);

    let report = rt
        .mem
        .migrate_scheme(&args.new_scheme, mapper, opts)
        .await?;

    // Reap mapper.
    drop(lock);
    let _ = child.kill().await;

    if rt.json {
        println!(
            "{}",
            serde_json::json!({
                "memories_moved": report.memories_moved,
                "memories_skipped": report.memories_skipped,
                "duration_ms": report.duration_ms,
                "snapshot_id": report.snapshot.as_ref().map(|s| s.id.to_string()),
                "errors": report.errors,
                "plan_len": report.plan.len(),
                "dry_run": args.dry_run,
            })
        );
    } else {
        println!(
            "moved={} skipped={} duration_ms={} dry_run={}",
            report.memories_moved, report.memories_skipped, report.duration_ms, args.dry_run,
        );
    }
    rt.mem.close().await?;
    Ok(())
}