use futures::StreamExt;
use kiromi_ai_memory::{PartitionPath, RegenSubjectOpts, Scope, SummaryStyle};
use crate::cli::{
GlobalArgs, RegenerateCmd, RegenerateEmbeddingsArgs, RegenerateSummariesListArgs, ReindexArgs,
ScopeArg,
};
use crate::error::{CliError, ExitCode};
use crate::runtime::Runtime;
/// Dispatches a [`RegenerateCmd`] to its handler.
///
/// A [`Runtime`] is opened from `globals` first and then moved into the handler
/// that matches the selected subcommand variant.
///
/// # Errors
/// Propagates any error from `Runtime::open` or from the chosen handler.
pub(crate) async fn run(cmd: RegenerateCmd, globals: &GlobalArgs) -> Result<(), CliError> {
    let runtime = Runtime::open(globals).await?;
    match cmd {
        RegenerateCmd::Embeddings(args) => embeddings(runtime, args).await,
        RegenerateCmd::SummariesList(args) => summaries_list(runtime, args).await,
        RegenerateCmd::Reindex(args) => reindex(runtime, args).await,
    }
}
/// Converts the raw `--scope` value into a [`Scope`].
///
/// The value is trimmed before matching. Accepted forms:
/// - `all` maps to `Scope::All`
/// - `tenant` maps to `Scope::Tenant`
/// - `partition=<path>` maps to `Scope::Partition`, where `<path>` is parsed
///   as a [`PartitionPath`]
///
/// # Errors
/// Returns a [`CliError`] of kind `ExitCode::Config` when the partition path
/// fails to parse or when the value matches none of the accepted forms.
fn parse_scope(s: &ScopeArg) -> Result<Scope, CliError> {
    let raw = s.scope.trim();
    match raw {
        "all" => Ok(Scope::All),
        "tenant" => Ok(Scope::Tenant),
        _ => match raw.strip_prefix("partition=") {
            Some(path) => path
                .parse::<PartitionPath>()
                .map(Scope::Partition)
                .map_err(|e| CliError {
                    kind: ExitCode::Config,
                    source: anyhow::anyhow!("scope partition: {e}"),
                }),
            None => Err(CliError {
                kind: ExitCode::Config,
                source: anyhow::anyhow!("invalid --scope: {raw}"),
            }),
        },
    }
}
/// Maps a style name onto a [`SummaryStyle`].
///
/// Only the exact strings `compact` and `detailed` are recognised; the input
/// is compared as-is (no trimming or case folding).
///
/// # Errors
/// Returns a [`CliError`] of kind `ExitCode::Config` for any other value.
fn parse_style(s: &str) -> Result<SummaryStyle, CliError> {
    if s == "compact" {
        return Ok(SummaryStyle::Compact);
    }
    if s == "detailed" {
        return Ok(SummaryStyle::Detailed);
    }
    Err(CliError {
        kind: ExitCode::Config,
        source: anyhow::anyhow!("unknown style: {s}"),
    })
}
/// Handles the embeddings regeneration subcommand.
///
/// Steps, in order:
/// 1. Parse the requested scope.
/// 2. Require an embedder configuration on the runtime and resolve it into an
///    embedder instance.
/// 3. Call `regenerate_embeddings` on the memory handle for that scope.
/// 4. Print the report (`processed`, `skipped`, `failed`, `duration_ms`) as a
///    JSON object when `rt.json` is set, otherwise as a `key=value` line.
/// 5. Close the memory handle.
///
/// # Errors
/// Returns a `Config`-kind [`CliError`] when the scope is invalid, when no
/// embedder configuration is present, or when resolution yields `None`.
/// Errors from resolution, regeneration and `close` are propagated.
async fn embeddings(rt: Runtime, a: RegenerateEmbeddingsArgs) -> Result<(), CliError> {
    let scope = parse_scope(&a.scope)?;

    // An owned copy of the embedder configuration is taken before resolving it.
    let Some(embedder_cfg) = rt.cfg.embedder.clone() else {
        return Err(CliError {
            kind: ExitCode::Config,
            source: anyhow::anyhow!(
                "regenerate embeddings requires --embedder-family / --embedder-config"
            ),
        });
    };

    let embedder = match crate::embedder::resolve(Some(&embedder_cfg)).await? {
        Some(instance) => instance,
        None => {
            return Err(CliError {
                kind: ExitCode::Config,
                source: anyhow::anyhow!("embedder family resolved to None"),
            });
        }
    };

    let report = rt
        .mem
        .regenerate_embeddings(scope, embedder.as_ref())
        .await?;

    // Build the single output line first, then print it once.
    let line = if rt.json {
        serde_json::json!({
            "processed": report.processed,
            "skipped": report.skipped,
            "failed": report.failed,
            "duration_ms": report.duration_ms,
        })
        .to_string()
    } else {
        format!(
            "processed={} skipped={} failed={} duration_ms={}",
            report.processed, report.skipped, report.failed, report.duration_ms
        )
    };
    println!("{line}");

    rt.mem.close().await?;
    Ok(())
}
/// Lists the subjects whose summaries would be regenerated.
///
/// Parses the scope and style arguments, builds [`RegenSubjectOpts`] from the
/// `force` flag and style, and then drains the stream returned by
/// `subjects_to_regenerate`, printing one line per subject:
/// - JSON mode (`rt.json`): the subject serialized with `serde_json`.
/// - Text mode: `<kind>\t<partition path>`, with an empty path column when
///   the subject has no partition path.
///
/// Listing is best-effort: a failed stream item or a subject that cannot be
/// serialized is reported on stderr and iteration continues, so the function
/// still returns `Ok(())` in those cases. The memory handle is closed after
/// the stream is exhausted.
///
/// # Errors
/// Returns a [`CliError`] when the scope or style argument is invalid, or
/// when closing the memory handle fails.
async fn summaries_list(rt: Runtime, a: RegenerateSummariesListArgs) -> Result<(), CliError> {
    let scope = parse_scope(&a.scope)?;
    let style = parse_style(&a.style)?;
    let opts = RegenSubjectOpts::default()
        .with_force(a.force)
        .with_style(style);
    let mut stream = rt.mem.subjects_to_regenerate(scope, opts).boxed();
    while let Some(item) = stream.next().await {
        match item {
            Ok(subject) => {
                if rt.json {
                    // Never emit a blank line on serialization failure: it would
                    // corrupt line-delimited JSON output and hide the error.
                    match serde_json::to_string(&subject) {
                        Ok(line) => println!("{line}"),
                        Err(e) => eprintln!("subjects_to_regenerate: serialize subject: {e}"),
                    }
                } else {
                    println!(
                        "{}\t{}",
                        subject.kind_str(),
                        subject
                            .partition_path()
                            .map(|p| p.as_str().to_string())
                            .unwrap_or_default()
                    );
                }
            }
            // Per-item failures are reported but do not abort the listing.
            Err(e) => {
                eprintln!("subjects_to_regenerate: {e}");
            }
        }
    }
    rt.mem.close().await?;
    Ok(())
}
/// Handles the reindex subcommand.
///
/// Parses the requested scope, calls `reindex` on the memory handle, prints
/// the resulting report, and closes the handle. With `rt.json` set the report
/// is a JSON object with `indices_rebuilt`, `memories_indexed`,
/// `child_summaries_indexed` and `duration_ms`; otherwise it is a single
/// `key=value` line.
///
/// # Errors
/// Returns a [`CliError`] when the scope is invalid; errors from `reindex`
/// and `close` are propagated.
async fn reindex(rt: Runtime, a: ReindexArgs) -> Result<(), CliError> {
    let scope = parse_scope(&a.scope)?;
    let report = rt.mem.reindex(scope).await?;

    // Build the single output line first, then print it once.
    let line = if rt.json {
        serde_json::json!({
            "indices_rebuilt": report.indices_rebuilt,
            "memories_indexed": report.memories_indexed,
            "child_summaries_indexed": report.child_summaries_indexed,
            "duration_ms": report.duration_ms,
        })
        .to_string()
    } else {
        format!(
            "indices_rebuilt={} memories={} child_summaries={} duration_ms={}",
            report.indices_rebuilt,
            report.memories_indexed,
            report.child_summaries_indexed,
            report.duration_ms,
        )
    };
    println!("{line}");

    rt.mem.close().await?;
    Ok(())
}