use std::process::Stdio;
use kiromi_ai_memory::{MigrationOpts, Partitions, SchemeMapper};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex;
use crate::cli::{GlobalArgs, MigrateSchemeArgs};
use crate::error::{CliError, ExitCode};
use crate::runtime::Runtime;
pub(crate) async fn run(args: MigrateSchemeArgs, globals: &GlobalArgs) -> Result<(), CliError> {
let rt = Runtime::open(globals).await?;
let mapper_binary = args.mapper_binary.clone();
if !mapper_binary.exists() {
return Err(CliError {
kind: ExitCode::Config,
source: anyhow::anyhow!("mapper-binary does not exist: {}", mapper_binary.display()),
});
}
let handle = tokio::runtime::Handle::current();
let mut child = Command::new(&mapper_binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.map_err(|e| CliError {
kind: ExitCode::Backend,
source: anyhow::anyhow!("spawn mapper: {e}"),
})?;
let stdin = child.stdin.take().ok_or_else(|| CliError {
kind: ExitCode::Backend,
source: anyhow::anyhow!("mapper has no stdin"),
})?;
let stdout = child.stdout.take().ok_or_else(|| CliError {
kind: ExitCode::Backend,
source: anyhow::anyhow!("mapper has no stdout"),
})?;
let lock = std::sync::Arc::new(Mutex::new((stdin, BufReader::new(stdout).lines())));
let mapper: SchemeMapper = {
let lock = std::sync::Arc::clone(&lock);
let handle = handle.clone();
Box::new(move |rec| {
let lock = std::sync::Arc::clone(&lock);
let payload = serde_json::json!({
"id": rec.r#ref.id.to_string(),
"partition": rec.r#ref.partition.as_str(),
"content": rec.content.as_str(),
});
handle.block_on(async move {
let mut g = lock.lock().await;
let line = format!("{}\n", payload);
g.0.write_all(line.as_bytes())
.await
.map_err(kiromi_ai_memory::Error::Io)?;
g.0.flush().await.map_err(kiromi_ai_memory::Error::Io)?;
let line = match g.1.next_line().await {
Ok(Some(l)) => l,
_ => {
return Err(kiromi_ai_memory::Error::Config(
"mapper closed stdout".into(),
));
}
};
let v: serde_json::Value = serde_json::from_str(&line)
.map_err(|e| kiromi_ai_memory::Error::Config(format!("mapper json: {e}")))?;
let kvs = v
.get("partitions")
.and_then(|x| x.as_object())
.ok_or_else(|| {
kiromi_ai_memory::Error::Config(
"mapper output missing `partitions` object".into(),
)
})?;
let mut p = Partitions::new();
for (k, val) in kvs {
let s = val.as_str().ok_or_else(|| {
kiromi_ai_memory::Error::Config(format!(
"mapper partition value not a string for `{k}`"
))
})?;
p = p.with(k, s.to_string());
}
Ok(p)
})
})
};
let opts = MigrationOpts::default()
.with_dry_run(args.dry_run)
.with_take_snapshot_first(!args.no_snapshot)
.with_batch_size(args.batch_size);
let report = rt
.mem
.migrate_scheme(&args.new_scheme, mapper, opts)
.await?;
drop(lock);
let _ = child.kill().await;
if rt.json {
println!(
"{}",
serde_json::json!({
"memories_moved": report.memories_moved,
"memories_skipped": report.memories_skipped,
"duration_ms": report.duration_ms,
"snapshot_id": report.snapshot.as_ref().map(|s| s.id.to_string()),
"errors": report.errors,
"plan_len": report.plan.len(),
"dry_run": args.dry_run,
})
);
} else {
println!(
"moved={} skipped={} duration_ms={} dry_run={}",
report.memories_moved, report.memories_skipped, report.duration_ms, args.dry_run,
);
}
rt.mem.close().await?;
Ok(())
}