use anyhow::{anyhow, Context as _};
use chrono::Utc;
use reasonkit_mem::docset::{
open_default_docset_retriever, Docset, DocsetIngestOptions, DocsetIngestor, DocsetStore,
DocsetStoreConfig, RefreshStatus,
};
use reasonkit_mem::storage::{AccessContext, AccessLevel};
use serde::Serialize;
use std::collections::HashMap;
use std::env;
use std::path::{Path, PathBuf};
use uuid::Uuid;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Collect argv once; args[0] is the binary name.
    let args: Vec<String> = env::args().collect();

    // With no command at all, show help and exit successfully.
    let Some(command) = args.get(1) else {
        print_usage();
        return Ok(());
    };

    match command.as_str() {
        "docs" => docs_main(&args[2..]).await?,
        "help" | "-h" | "--help" => print_usage(),
        other => {
            eprintln!("Unknown command: {other}");
            print_usage();
        }
    }
    Ok(())
}
/// Prints the top-level CLI help text to stdout.
fn print_usage() {
    // Help text is kept as a single verbatim block so the layout is obvious.
    const USAGE: &str = r#"rk-mem — ReasonKit Mem utilities
USAGE:
rk-mem <COMMAND> [ARGS...]
COMMANDS:
docs Docset ingestion and refresh (Cursor-like @Docs)
Run:
rk-mem docs --help
ENV:
RKMEM_DATA_DIR Base directory for docsets and indexes
"#;
    println!("{USAGE}");
}
/// Dispatches `rk-mem docs <SUBCOMMAND>` to the matching handler.
///
/// `args` is argv with the leading `docs` stripped; `args[0]` is the
/// subcommand name.
async fn docs_main(args: &[String]) -> anyhow::Result<()> {
    // Missing subcommand or an explicit help flag both mean: show usage.
    let Some(sub) = args.first() else {
        print_docs_usage();
        return Ok(());
    };
    if matches!(sub.as_str(), "help" | "-h" | "--help") {
        print_docs_usage();
        return Ok(());
    }

    let base = data_dir();
    let store = DocsetStore::new(&base, DocsetStoreConfig::default());

    match sub.as_str() {
        "add" => cmd_docs_add(&store, args).await,
        "list" => cmd_docs_list(&store).await,
        "query" => cmd_docs_query(&store, &base, args).await,
        "remove" | "delete" => cmd_docs_remove(&store, &base, args).await,
        "refresh" => cmd_docs_refresh(&store, &base, args).await,
        other => Err(anyhow!("Unknown docs subcommand: {other}")),
    }
}
/// Prints usage for the `docs` subcommand family to stdout.
fn print_docs_usage() {
    // Verbatim help block; printed exactly as written.
    const DOCS_USAGE: &str = r#"rk-mem docs — docset ingestion (Cursor-like @Docs)
USAGE:
rk-mem docs add <NAME> <START_URL> [ALLOWED_PREFIX...]
rk-mem docs list
rk-mem docs query <QUERY> [--docset <NAME|ID>] [--top-k N] [--json]
rk-mem docs remove <DOCSET_ID> [--keep-index]
rk-mem docs refresh [--due] [--max-pages N] [--concurrency N] [--timeout-secs N]
NOTES:
- If ALLOWED_PREFIX is omitted, START_URL is used as the only allowed prefix.
- `refresh --due` only refreshes docsets whose refresh policy is due.
ENV:
- RKMEM_DATA_DIR (default: platform data dir + /reasonkit/mem)
"#;
    println!("{DOCS_USAGE}");
}
/// Resolves the base data directory for docsets and indexes.
///
/// The `RKMEM_DATA_DIR` environment variable wins when set; otherwise the
/// platform-local data dir (falling back to the current directory) is used
/// with a `reasonkit/mem` suffix.
fn data_dir() -> PathBuf {
    match env::var("RKMEM_DATA_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => dirs::data_local_dir()
            .unwrap_or_else(|| PathBuf::from("."))
            .join("reasonkit")
            .join("mem"),
    }
}
/// `docs add <NAME> <START_URL> [ALLOWED_PREFIX...]` — registers a docset.
async fn cmd_docs_add(store: &DocsetStore, args: &[String]) -> anyhow::Result<()> {
    // Require at least: "add" <NAME> <START_URL>.
    let (Some(name), Some(start_url)) = (args.get(1), args.get(2)) else {
        print_docs_usage();
        return Ok(());
    };

    // Remaining positionals are allowed crawl prefixes; when absent, the
    // start URL itself is the only allowed prefix.
    let allowed_prefixes: Vec<String> = match args.get(3..) {
        Some(rest) if !rest.is_empty() => rest.to_vec(),
        _ => vec![start_url.clone()],
    };

    let saved = store
        .upsert(Docset::new(name.clone(), start_url.clone(), allowed_prefixes))
        .await?;
    println!("Added docset: {} ({})", saved.name, saved.id);
    Ok(())
}
/// `docs list` — prints every configured docset with its refresh state.
async fn cmd_docs_list(store: &DocsetStore) -> anyhow::Result<()> {
    let docsets = store.load().await?;
    if docsets.is_empty() {
        println!("No docsets configured.");
        return Ok(());
    }

    // Evaluate "due" against a single timestamp so all rows are consistent.
    let now = Utc::now();
    docsets.into_iter().for_each(|ds| {
        println!(
            "{}\n name: {}\n start_url: {}\n refresh: {:?}\n status: {:?}\n due: {}\n",
            ds.id,
            ds.name,
            ds.start_url,
            ds.refresh,
            ds.status,
            ds.is_due(now)
        );
    });
    Ok(())
}
/// `docs remove <DOCSET_ID> [--keep-index]` — deletes a docset config and,
/// unless `--keep-index` is given, purges its indexed documents.
///
/// Index cleanup is best-effort: per-document delete failures are reported
/// but do not abort the sweep.
async fn cmd_docs_remove(
    store: &DocsetStore,
    data_dir: &Path,
    args: &[String],
) -> anyhow::Result<()> {
    if args.len() < 2 {
        print_docs_usage();
        return Ok(());
    }
    let id = Uuid::parse_str(&args[1]).context("Invalid DOCSET_ID")?;
    let keep_index = args.iter().any(|a| a == "--keep-index");

    let deleted = store.delete(id).await?;
    if !deleted {
        println!("Docset not found: {id}");
        return Ok(());
    }
    println!("Removed docset config: {id}");
    if keep_index {
        return Ok(());
    }

    // Scan all indexed documents and delete those tagged with this docset id.
    let retriever = open_default_docset_retriever(data_dir.to_path_buf()).await?;
    let ctx = AccessContext::new(
        "rk-mem".to_string(),
        AccessLevel::Admin,
        "docs_remove".to_string(),
    );
    let doc_ids = retriever
        .storage()
        .list_documents(&ctx)
        .await
        .context("Failed to list documents")?;
    let wanted_tag = format!("docset_id:{id}");
    let mut removed_docs = 0usize;
    let mut failed_docs = 0usize;
    for doc_id in doc_ids {
        let doc = retriever
            .storage()
            .get_document(&doc_id, &ctx)
            .await
            .context("Failed to read document")?;
        let Some(doc) = doc else {
            continue;
        };
        if doc.metadata.tags.iter().any(|t| t == &wanted_tag) {
            // BUG FIX: a failed delete used to be silently ignored while still
            // counted as removed; only count confirmed deletions.
            match retriever.delete_document(&doc_id).await {
                Ok(_) => removed_docs += 1,
                Err(e) => {
                    failed_docs += 1;
                    eprintln!("Failed to delete document {doc_id}: {e}");
                }
            }
        }
    }
    println!("Removed {removed_docs} document(s) from index.");
    if failed_docs > 0 {
        eprintln!("Failed to delete {failed_docs} document(s); see errors above.");
    }
    Ok(())
}
/// One search hit produced by `docs query`; also the element shape of the
/// `--json` output array (serde serializes fields in declaration order).
#[derive(Debug, Serialize)]
struct DocsQueryHit {
    /// Relevance score reported by the retriever for this chunk.
    score: f32,
    /// Sparse-retrieval score component, when the retriever provides one.
    sparse_score: Option<f32>,
    /// Parent document id (UUID rendered as a string).
    doc_id: String,
    /// Id of the matched chunk within the document.
    chunk_id: String,
    /// Source URL of the document, if recorded at ingest time.
    url: Option<String>,
    /// Document title from metadata, if present.
    title: Option<String>,
    /// Docset name, extracted from the document's `docset:` tag.
    docset: Option<String>,
    /// Docset UUID, extracted from the document's `docset_id:` tag.
    docset_id: Option<String>,
    /// The matched chunk text.
    text: String,
}
/// `docs query <QUERY> [--docset <NAME|ID>] [--top-k N] [--json]`
///
/// Runs a sparse search over the index, keeps only hits whose document
/// carries a `docset_id:` tag (optionally restricted to one docset), and
/// prints either a human-readable list or pretty-printed JSON.
async fn cmd_docs_query(
    store: &DocsetStore,
    data_dir: &Path,
    args: &[String],
) -> anyhow::Result<()> {
    if args.len() < 2 {
        print_docs_usage();
        return Ok(());
    }
    let query = args[1].as_str();
    let top_k: usize = if let Some(v) = parse_arg_value(args, "--top-k") {
        v.parse().context("Invalid --top-k")?
    } else {
        8
    };
    let json = args.iter().any(|a| a == "--json");

    // Resolve `--docset` — either a UUID or a case-insensitive docset name.
    let docset_filter = parse_arg_value(args, "--docset");
    let wanted_docset_id = if let Some(filter) = docset_filter.as_deref() {
        if let Ok(id) = Uuid::parse_str(filter) {
            Some(id)
        } else {
            let docsets = store.load().await?;
            let Some(found) = docsets.iter().find(|d| d.name.eq_ignore_ascii_case(filter)) else {
                return Err(anyhow!(
                    "Unknown docset: {filter} (expected name or UUID; see `rk-mem docs list`)"
                ));
            };
            Some(found.id)
        }
    } else {
        None
    };

    let retriever = open_default_docset_retriever(data_dir.to_path_buf()).await?;
    let ctx = AccessContext::new(
        "rk-mem".to_string(),
        AccessLevel::Admin,
        "docs_query".to_string(),
    );

    // Over-fetch candidates (5x, capped at 200) so that tag filtering below
    // can still fill `top_k` slots.
    let candidate_k = if top_k >= 200 {
        top_k
    } else {
        top_k.saturating_mul(5).min(200)
    };
    let results = retriever.search_sparse(query, candidate_k).await?;

    // Cache per-document lookups — several chunks usually share a document.
    // PERF FIX: the cache is consulted by reference; previously every
    // candidate chunk deep-cloned the cached Option<Document>.
    let mut doc_cache: HashMap<Uuid, Option<reasonkit_mem::Document>> = HashMap::new();
    let mut hits: Vec<DocsQueryHit> = Vec::with_capacity(top_k);
    for r in results {
        if hits.len() >= top_k {
            break;
        }
        if !doc_cache.contains_key(&r.doc_id) {
            let loaded = retriever
                .storage()
                .get_document(&r.doc_id, &ctx)
                .await
                .context("Failed to read document")?;
            doc_cache.insert(r.doc_id, loaded);
        }
        let Some(doc) = doc_cache.get(&r.doc_id).and_then(|d| d.as_ref()) else {
            continue;
        };

        // Only surface documents that came from a docset ingest.
        let docset_id_tag = doc
            .metadata
            .tags
            .iter()
            .find(|t| t.starts_with("docset_id:"))
            .cloned();
        if docset_id_tag.is_none() {
            continue;
        }
        if let Some(wanted) = wanted_docset_id {
            let wanted_tag = format!("docset_id:{wanted}");
            if !doc.metadata.tags.iter().any(|t| t == &wanted_tag) {
                continue;
            }
        }
        let docset_tag = doc
            .metadata
            .tags
            .iter()
            .find(|t| t.starts_with("docset:"))
            .map(|t| t.trim_start_matches("docset:").to_string());
        hits.push(DocsQueryHit {
            score: r.score,
            sparse_score: r.sparse_score,
            doc_id: r.doc_id.to_string(),
            chunk_id: r.chunk_id.to_string(),
            url: doc.source.url.clone(),
            title: doc.metadata.title.clone(),
            docset: docset_tag,
            docset_id: docset_id_tag.map(|t| t.trim_start_matches("docset_id:").to_string()),
            text: r.text,
        });
    }

    if json {
        println!("{}", serde_json::to_string_pretty(&hits)?);
        return Ok(());
    }
    if hits.is_empty() {
        println!("No results.");
        return Ok(());
    }
    for (i, h) in hits.iter().enumerate() {
        let url = h.url.as_deref().unwrap_or("<no-url>");
        let title = h.title.as_deref().unwrap_or("<no-title>");
        println!(
            "#{:02} score={:.4} docset={} url={}\n {}\n",
            i + 1,
            h.score,
            h.docset.as_deref().unwrap_or("<unknown>"),
            url,
            title
        );
    }
    Ok(())
}
/// `docs refresh [--due] [--max-pages N] [--concurrency N] [--timeout-secs N]`
///
/// Re-ingests every configured docset (with `--due`, only those whose refresh
/// policy is due) and persists each docset's updated state back to the store.
async fn cmd_docs_refresh(
    store: &DocsetStore,
    data_dir: &Path,
    args: &[String],
) -> anyhow::Result<()> {
    let mut opts = DocsetIngestOptions {
        manifest_dir: Some(data_dir.join("docsets")),
        ..Default::default()
    };
    // Flag-only option; value-bearing options are handled below.
    if args.iter().skip(1).any(|w| w == "--due") {
        opts.refresh_due_only = true;
    }
    if let Some(v) = parse_arg_value(args, "--max-pages") {
        opts.max_pages = v.parse().context("Invalid --max-pages")?;
    }
    if let Some(v) = parse_arg_value(args, "--concurrency") {
        opts.concurrency = v.parse().context("Invalid --concurrency")?;
    }
    if let Some(v) = parse_arg_value(args, "--timeout-secs") {
        let secs: u64 = v.parse().context("Invalid --timeout-secs")?;
        opts.request_timeout = std::time::Duration::from_secs(secs);
    }

    let mut docsets = store.load().await?;
    if docsets.is_empty() {
        println!("No docsets configured.");
        return Ok(());
    }
    let retriever = open_default_docset_retriever(data_dir.to_path_buf()).await?;
    let ingestor = DocsetIngestor::new(retriever)?;
    for ds in docsets.iter_mut() {
        match ingestor.ingest_docset(ds, &opts).await {
            Ok(report) => {
                println!(
                    "Docset {} ({}) — method={}, discovered={}, fetched={}, indexed={}, skipped={}, removed={}, failures={}",
                    report.docset_name,
                    report.docset_id,
                    report.discovery_method,
                    report.discovered_urls,
                    report.fetched_pages,
                    report.indexed_pages,
                    report.skipped_unchanged,
                    report.removed_pages,
                    report.failures
                );
            }
            Err(e) => {
                // Record the failure on the docset so `docs list` surfaces it.
                ds.status = RefreshStatus::Error {
                    at: Utc::now(),
                    message: e.to_string(),
                };
                eprintln!("Docset {} ({}) — ERROR: {e}", ds.name, ds.id);
            }
        }
        // BUG FIX: a failed upsert used to be silently discarded, losing the
        // refresh status recorded above. Report it without aborting the run.
        if let Err(e) = store.upsert(ds.clone()).await {
            eprintln!("Failed to persist docset {} ({}): {e}", ds.name, ds.id);
        }
    }
    Ok(())
}
/// Returns the value of `key` in `args`, supporting both the space-separated
/// form (`--key value`) and the inline form (`--key=value`).
///
/// The first occurrence (in either form) wins. Returns `None` when the key is
/// absent, or when the space-separated key is the final argument and there is
/// no later inline occurrence.
fn parse_arg_value(args: &[String], key: &str) -> Option<String> {
    let inline_prefix = format!("{key}=");
    args.iter().enumerate().find_map(|(idx, arg)| {
        if arg == key {
            // Space-separated form: the value is the next argument, if any.
            args.get(idx + 1).cloned()
        } else {
            // Inline form: everything after `--key=`.
            arg.strip_prefix(&inline_prefix).map(str::to_string)
        }
    })
}