use std::collections::{BTreeMap, HashMap};
use std::io::{Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::path::PathBuf;
use std::time::Duration;
use reddb::cli;
use reddb::cli::types::FlagValue;
use reddb::service_cli::{
install_systemd_service, probe_listener, render_systemd_unit, run_server_with_large_stack,
ServerCommandConfig, ServerTransport, SystemdServiceConfig,
};
/// True when the caller asked for JSON output, either via the boolean
/// `--json` flag or via `--output json`.
fn wants_json(flags: &HashMap<String, FlagValue>) -> bool {
    if flag_bool(flags, "json") {
        return true;
    }
    matches!(flag_string(flags, "output").as_deref(), Some("json"))
}
/// Print a JSON success envelope to stdout.
///
/// `data` must already be a valid JSON fragment; only `command` is
/// escaped here.
fn json_ok(command: &str, data: &str) {
    let escaped = json_escape(command);
    println!("{{\"ok\":true,\"command\":\"{escaped}\",\"data\":{data}}}");
}
/// Open a local runtime: persistent when a non-empty `--path` flag was
/// given, otherwise an in-memory database. Errors are stringified for
/// direct CLI reporting.
fn open_local_runtime(flags: &HashMap<String, FlagValue>) -> Result<reddb::RedDBRuntime, String> {
    let path = flag_string(flags, "path").filter(|p| !p.is_empty());
    match path {
        Some(path) => {
            reddb::RedDBRuntime::with_options(reddb::api::RedDBOptions::persistent(&path))
                .map_err(|e| format!("open {path}: {e}"))
        }
        None => reddb::RedDBRuntime::in_memory().map_err(|e| e.to_string()),
    }
}
/// Best-effort checkpoint after a mutating CLI command; a checkpoint
/// failure is deliberately ignored rather than surfaced to the user.
fn checkpoint_local_runtime(rt: &reddb::RedDBRuntime) {
    let _ = rt.checkpoint();
}
/// True when a vault unlock key is present in the environment: a
/// non-empty `REDDB_CERTIFICATE` or `REDDB_VAULT_KEY` value.
fn has_cli_vault_key() -> bool {
    ["REDDB_CERTIFICATE", "REDDB_VAULT_KEY"]
        .iter()
        .any(|name| std::env::var(name).map_or(false, |value| !value.is_empty()))
}
/// Attach an `AuthStore`-backed vault to the runtime when a vault key is
/// available in the environment.
///
/// Returns:
/// * `Ok(Some(store))` — vault opened and attached to the runtime;
/// * `Ok(None)` — no vault applicable (in-memory DB without `required`,
///   or no key and no saved vault state);
/// * `Err(..)` — a vault is required (or saved state exists) but cannot
///   be opened.
fn attach_cli_vault(
    rt: &reddb::RedDBRuntime,
    required: bool,
) -> Result<Option<std::sync::Arc<reddb::auth::AuthStore>>, String> {
    let db = rt.db();
    let store = db.store();
    // A pager only exists for persistent databases; the vault state is
    // stored through it, so an in-memory runtime cannot carry a vault.
    let Some(pager) = store.pager() else {
        if required {
            return Err("vault requires a persistent database".to_string());
        }
        return Ok(None);
    };
    let has_saved_vault = reddb::auth::vault::Vault::has_saved_state(pager);
    if !has_cli_vault_key() {
        // Refuse silently skipping the vault when one already exists on
        // disk (it would be unreadable) or when the caller requires it.
        if required || has_saved_vault {
            return Err(
                "vault export/import requires REDDB_CERTIFICATE or REDDB_VAULT_KEY".to_string(),
            );
        }
        return Ok(None);
    }
    // Key is available: open (or create) the vault-backed auth store and
    // register it with the runtime so later commands can use it.
    let auth = std::sync::Arc::new(
        reddb::auth::AuthStore::with_vault(
            reddb::auth::AuthConfig::default(),
            std::sync::Arc::clone(pager),
            None,
        )
        .map_err(|err| format!("open vault: {err}"))?,
    );
    rt.set_auth_store(std::sync::Arc::clone(&auth));
    Ok(Some(auth))
}
/// Render a schema `Value` as a self-contained JSON fragment (already
/// quoted and escaped where needed) for `format_result_json`.
fn value_to_json_fragment(value: &reddb::storage::schema::Value) -> String {
    use reddb::storage::schema::Value;
    match value {
        Value::Null => "null".to_string(),
        Value::Boolean(b) => b.to_string(),
        Value::Integer(n) => n.to_string(),
        Value::UnsignedInteger(n) => n.to_string(),
        Value::Float(n) => n.to_string(),
        Value::BigInt(n) => n.to_string(),
        // Time-like and decimal values are emitted as bare numbers.
        Value::TimestampMs(n) | Value::Timestamp(n) | Value::Duration(n) | Value::Decimal(n) => {
            n.to_string()
        }
        // Never leak secret material into CLI output.
        Value::Password(_) | Value::Secret(_) => "\"***\"".to_string(),
        Value::Text(s) => format!("\"{}\"", json_escape(s.as_ref())),
        Value::Email(s) | Value::Url(s) | Value::NodeRef(s) | Value::EdgeRef(s) => {
            format!("\"{}\"", json_escape(s))
        }
        // Fallback: stringify via Display, then quote and escape.
        other => format!("\"{}\"", json_escape(&format!("{other}"))),
    }
}
/// Render a query result for human consumption.
///
/// Non-SELECT statements produce a single summary line; SELECT results
/// produce one numbered line per row (fields sorted by key) followed by
/// a row-count footer.
fn format_result_pretty(result: &reddb::runtime::RuntimeQueryResult) -> String {
    use std::fmt::Write;
    // Non-SELECT: just report how many rows were affected.
    if result.statement_type != "select" {
        let affected = result.affected_rows;
        let plural = if affected == 1 { "" } else { "s" };
        return format!(
            "{} ok ({} row{} affected)\n",
            result.statement_type, affected, plural
        );
    }
    let records = &result.result.records;
    if records.is_empty() {
        return "(no rows)\n".to_string();
    }
    let mut out = String::new();
    for (i, record) in records.iter().enumerate() {
        // Sort fields by name so output is deterministic.
        let mut entries: Vec<(&str, &reddb::storage::schema::Value)> =
            record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
        entries.sort_by_key(|entry| entry.0);
        let _ = write!(out, "{}.", i + 1);
        for (key, value) in entries {
            let _ = write!(out, " {key}={value}");
        }
        out.push('\n');
    }
    let plural = if records.len() == 1 { "" } else { "s" };
    let _ = writeln!(out, "({} row{})", records.len(), plural);
    out
}
/// Serialize a query result as a compact JSON object of the shape
/// `{"statement":"...","affected":N,"rows":[{...},...]}`.
/// Field keys within each row are sorted for deterministic output.
fn format_result_json(result: &reddb::runtime::RuntimeQueryResult) -> String {
    use std::fmt::Write;
    let mut out = String::from("{\"statement\":\"");
    // NOTE(review): statement_type is embedded without json_escape;
    // presumably it is always a plain keyword like "select" — confirm.
    out.push_str(result.statement_type);
    out.push_str("\",\"affected\":");
    let _ = write!(out, "{}", result.affected_rows);
    out.push_str(",\"rows\":[");
    for (i, record) in result.result.records.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        out.push('{');
        // Sort fields by name so output is deterministic.
        let mut entries: Vec<(&str, &reddb::storage::schema::Value)> =
            record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
        entries.sort_by(|a, b| a.0.cmp(b.0));
        for (j, (key, value)) in entries.iter().enumerate() {
            if j > 0 {
                out.push(',');
            }
            let _ = write!(
                out,
                "\"{}\":{}",
                json_escape(key),
                value_to_json_fragment(value)
            );
        }
        out.push('}');
    }
    out.push_str("]}");
    out
}
fn json_error(command: &str, error: &str) -> ! {
eprintln!(
"{{\"ok\":false,\"command\":\"{}\",\"error\":\"{}\"}}",
json_escape(command),
json_escape(error)
);
std::process::exit(1);
}
/// CLI entry point for the `red` binary.
///
/// Flow:
/// 1. Expand `REDDB_*_FILE` secret indirection from the environment.
/// 2. Handle the shell-completion protocol (`--complete`) before parsing.
/// 3. Tokenize and parse flags using the per-command schema.
/// 4. Dispatch on the first positional argument to the command handlers.
///
/// Errors are reported as human-readable text on stderr, or as a JSON
/// envelope (see `json_error`) when `--json` / `--output json` is set.
fn main() {
    // Fail fast if any *_FILE secret expansion failed.
    if let Some((name, err)) = reddb::utils::expand_all_reddb_secrets().into_iter().next() {
        eprintln!("error: failed to expand {name}_FILE: {err}");
        std::process::exit(2);
    }
    let args: Vec<String> = std::env::args().skip(1).collect();
    if args.is_empty() {
        print!("{}", cli::commands::main_help_text());
        return;
    }
    // Shell completion: print one candidate per line and exit.
    if args.first().map(|s| s.as_str()) == Some("--complete") {
        let rest: Vec<&str> = args[1..].iter().map(|s| s.as_str()).collect();
        let domain_tree = build_completion_tree();
        let completions = cli::complete::complete_partial(&rest, &domain_tree);
        for c in completions {
            println!("{}", c);
        }
        return;
    }
    // Parse flags against the schema for the identified command.
    let command = identify_command(&args);
    let flags = build_flags_for_command(command.as_deref());
    let tokens = cli::token::tokenize(&args);
    let parser = cli::schema::SchemaParser::new(flags);
    let result = parser.parse(&tokens);
    // --help short-circuits everything else.
    if result.flags.get("help").is_some_and(|v| v.is_truthy()) {
        match command.as_deref() {
            Some(cmd) => match cli::commands::command_help_text(cmd) {
                Some(text) => {
                    print!("{}", text);
                    return;
                }
                None => {
                    print!("{}", cli::commands::main_help_text());
                    return;
                }
            },
            None => {
                print!("{}", cli::commands::main_help_text());
                return;
            }
        }
    }
    // --version short-circuits as well.
    if result.flags.get("version").is_some_and(|v| v.is_truthy()) {
        if wants_json(&result.flags) {
            json_ok(
                "version",
                &format!("{{\"version\":\"{}\"}}", env!("CARGO_PKG_VERSION")),
            );
        } else {
            println!("reddb {}", env!("CARGO_PKG_VERSION"));
        }
        return;
    }
    // Flag-parsing errors abort before dispatch.
    if !result.errors.is_empty() {
        if wants_json(&result.flags) {
            let msgs: Vec<String> = result
                .errors
                .iter()
                .map(|e| json_escape(&e.format_human()))
                .collect();
            json_error("parse", &msgs.join("; "));
        }
        for err in &result.errors {
            eprint!("{}", err.format_human());
        }
        std::process::exit(1);
    }
    let positionals = &result.positionals;
    if positionals.is_empty() {
        print!("{}", cli::commands::main_help_text());
        return;
    }
    // First positional selects the command; the rest are its arguments.
    let cmd = positionals[0].as_str();
    let remaining = &positionals[1..];
    match cmd {
        "help" => {
            if let Some(cmd_name) = remaining.first() {
                match cli::commands::command_help_text(cmd_name) {
                    Some(text) => print!("{}", text),
                    None => {
                        eprintln!("Unknown command: {}", cmd_name);
                        eprintln!("Run 'red help' for a list of commands.");
                        std::process::exit(1);
                    }
                }
            } else {
                print!("{}", cli::commands::main_help_text());
            }
        }
        "version" => {
            if wants_json(&result.flags) {
                json_ok(
                    "version",
                    &format!("{{\"version\":\"{}\"}}", env!("CARGO_PKG_VERSION")),
                );
            } else {
                println!("reddb {}", env!("CARGO_PKG_VERSION"));
            }
        }
        "doctor" => {
            std::process::exit(run_doctor(&result));
        }
        "bootstrap" => {
            std::process::exit(run_bootstrap_command(&result.flags));
        }
        // Run the full server (router transport) in the foreground.
        "server" => {
            let json_mode = wants_json(&result.flags);
            let config = build_server_config(&result.flags, None).unwrap_or_else(|err| {
                if json_mode {
                    json_error("server", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            if json_mode {
                eprintln!("{}", server_command_json("server", &config));
            }
            if let Err(err) = run_server_with_large_stack(config) {
                if json_mode {
                    json_error("server", &err.to_string());
                }
                eprintln!("red server: {err}");
                std::process::exit(1);
            }
        }
        // systemd unit management: install, or print the unit text.
        "service" => {
            let json_mode = wants_json(&result.flags);
            let subcommand = remaining.first().map(|s| s.as_str()).unwrap_or("help");
            match subcommand {
                "install" => {
                    let config =
                        build_systemd_service_config(&result.flags).unwrap_or_else(|err| {
                            if json_mode {
                                json_error("service.install", &err);
                            }
                            eprintln!("error: {err}");
                            std::process::exit(1);
                        });
                    install_systemd_service(&config).unwrap_or_else(|err| {
                        if json_mode {
                            json_error("service.install", &err);
                        }
                        eprintln!("error: {err}");
                        std::process::exit(1);
                    });
                    let unit_name = format!("{}.service", config.service_name);
                    if json_mode {
                        json_ok(
                            "service.install",
                            &format!(
                                "{{\"unit\":\"{}\",\"path\":\"{}\",\"router_bind\":{},\"grpc_bind\":{},\"http_bind\":{}}}",
                                json_escape(&unit_name),
                                json_escape(&config.unit_path().display().to_string()),
                                json_optional_string(config.router_bind_addr.as_deref()),
                                json_optional_string(config.grpc_bind_addr.as_deref()),
                                json_optional_string(config.http_bind_addr.as_deref())
                            ),
                        );
                    } else {
                        println!("Installed and started {}", unit_name);
                        println!("Status: systemctl status {}", unit_name);
                    }
                }
                "print-unit" => {
                    let config =
                        build_systemd_service_config(&result.flags).unwrap_or_else(|err| {
                            if json_mode {
                                json_error("service.print-unit", &err);
                            }
                            eprintln!("error: {err}");
                            std::process::exit(1);
                        });
                    let unit = render_systemd_unit(&config);
                    if json_mode {
                        json_ok("service.print-unit", &format!("{{\"unit\":{:?}}}", unit));
                    } else {
                        print!("{unit}");
                    }
                }
                _ => {
                    let help = "Usage: red service <install|print-unit> [flags]\n\nExamples:\n sudo red service install --binary /usr/local/bin/red --bind 0.0.0.0:5050 --path /var/lib/reddb/data.rdb\n red service print-unit --http --path /var/lib/reddb/data.rdb --bind 127.0.0.1:8080\n";
                    if json_mode {
                        json_ok("service", "{\"subcommands\":[\"install\",\"print-unit\"]}");
                    } else {
                        print!("{help}");
                    }
                }
            }
        }
        // Same as "server" but in replica mode.
        "replica" => {
            let json_mode = wants_json(&result.flags);
            let config =
                build_server_config(&result.flags, Some("replica")).unwrap_or_else(|err| {
                    if json_mode {
                        json_error("replica", &err);
                    }
                    eprintln!("error: {err}");
                    std::process::exit(1);
                });
            if json_mode {
                eprintln!("{}", server_command_json("replica", &config));
            }
            if let Err(err) = run_server_with_large_stack(config) {
                if json_mode {
                    json_error("replica", &err.to_string());
                }
                eprintln!("red replica: {err}");
                std::process::exit(1);
            }
        }
        // JSON-RPC over stdio, either against a remote gRPC endpoint or a
        // local runtime.
        "rpc" => {
            let stdio = result.flags.get("stdio").is_some_and(|v| v.is_truthy());
            if !stdio {
                eprintln!("Usage: red rpc --stdio [--path file | --connect grpc://host:port]");
                eprintln!("Only --stdio mode is currently implemented.");
                std::process::exit(1);
            }
            if let Some(connect) = flag_string(&result.flags, "connect") {
                if !connect.is_empty() {
                    let token = flag_string(&result.flags, "token").filter(|s| !s.is_empty());
                    let endpoint = connect
                        .strip_prefix("grpc://")
                        .map(|rest| format!("http://{rest}"))
                        .unwrap_or_else(|| connect.clone());
                    let code = reddb::rpc_stdio::run_remote(&endpoint, token);
                    std::process::exit(code);
                }
            }
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                eprintln!("rpc: {err}");
                std::process::exit(1);
            });
            let code = reddb::rpc_stdio::run(&rt);
            // Consistency: use the shared best-effort checkpoint helper
            // (was an inline `let _ = rt.checkpoint();`).
            checkpoint_local_runtime(&rt);
            std::process::exit(code);
        }
        // One-shot SQL against a local database.
        "query" => {
            let json_mode = wants_json(&result.flags);
            let sql = remaining.first().map(|s| s.as_str()).unwrap_or("");
            if sql.is_empty() {
                if json_mode {
                    json_error("query", "Usage: red query [--path file] <sql>");
                }
                eprintln!("Usage: red query [--path file] <sql>");
                eprintln!("Example: red query \"SELECT * FROM users\"");
                std::process::exit(1);
            }
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("query", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            match rt.execute_query(sql) {
                Ok(qr) => {
                    checkpoint_local_runtime(&rt);
                    if json_mode {
                        json_ok("query", &format_result_json(&qr));
                    } else {
                        print!("{}", format_result_pretty(&qr));
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("query", &err.to_string());
                    }
                    eprintln!("query error: {err}");
                    std::process::exit(1);
                }
            }
        }
        // Insert a JSON object as a row; values are converted to SQL
        // literals (single quotes doubled for escaping).
        "insert" => {
            let json_mode = wants_json(&result.flags);
            if remaining.len() < 2 {
                if json_mode {
                    json_error(
                        "insert",
                        "Usage: red insert [--path file] <collection> <json>",
                    );
                }
                eprintln!("Usage: red insert [--path file] <collection> <json>");
                eprintln!("Example: red insert users '{{\"name\": \"Alice\"}}'");
                std::process::exit(1);
            }
            let collection = &remaining[0];
            let json_data = &remaining[1];
            let parsed: reddb::json::Value =
                reddb::json::from_str(json_data).unwrap_or_else(|err| {
                    if json_mode {
                        json_error("insert", &format!("invalid JSON: {err}"));
                    }
                    eprintln!("invalid JSON: {err}");
                    std::process::exit(1);
                });
            let object = match parsed {
                reddb::json::Value::Object(map) => map,
                _ => {
                    if json_mode {
                        json_error("insert", "expected a JSON object");
                    }
                    eprintln!("expected a JSON object");
                    std::process::exit(1);
                }
            };
            let mut cols = Vec::new();
            let mut vals = Vec::new();
            for (k, v) in object.iter() {
                cols.push(k.clone());
                vals.push(match v {
                    reddb::json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                    reddb::json::Value::Number(n) => n.to_string(),
                    reddb::json::Value::Bool(b) => b.to_string(),
                    reddb::json::Value::Null => "NULL".to_string(),
                    other => format!("'{}'", other.to_string().replace('\'', "''")),
                });
            }
            let sql = format!(
                "INSERT INTO {collection} ({}) VALUES ({})",
                cols.join(", "),
                vals.join(", "),
            );
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("insert", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            match rt.execute_query(&sql) {
                Ok(qr) => {
                    checkpoint_local_runtime(&rt);
                    if json_mode {
                        json_ok("insert", &format_result_json(&qr));
                    } else {
                        print!("{}", format_result_pretty(&qr));
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("insert", &err.to_string());
                    }
                    eprintln!("insert error: {err}");
                    std::process::exit(1);
                }
            }
        }
        // Fetch a single row by entity id.
        // NOTE(review): collection and id are interpolated into SQL
        // unescaped — acceptable for a local CLI, but worth confirming
        // against the query parser's identifier rules.
        "get" => {
            let json_mode = wants_json(&result.flags);
            if remaining.len() < 2 {
                if json_mode {
                    json_error("get", "Usage: red get [--path file] <collection> <id>");
                }
                eprintln!("Usage: red get [--path file] <collection> <id>");
                eprintln!("Example: red get users 42");
                std::process::exit(1);
            }
            let collection = &remaining[0];
            let id = &remaining[1];
            let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id}");
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("get", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            match rt.execute_query(&sql) {
                Ok(qr) => {
                    if json_mode {
                        json_ok("get", &format_result_json(&qr));
                    } else {
                        print!("{}", format_result_pretty(&qr));
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("get", &err.to_string());
                    }
                    eprintln!("get error: {err}");
                    std::process::exit(1);
                }
            }
        }
        // Delete a single row by entity id.
        "delete" => {
            let json_mode = wants_json(&result.flags);
            if remaining.len() < 2 {
                if json_mode {
                    json_error(
                        "delete",
                        "Usage: red delete [--path file] <collection> <id>",
                    );
                }
                eprintln!("Usage: red delete [--path file] <collection> <id>");
                eprintln!("Example: red delete users 42");
                std::process::exit(1);
            }
            let collection = &remaining[0];
            let id = &remaining[1];
            let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("delete", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            match rt.execute_query(&sql) {
                Ok(qr) => {
                    checkpoint_local_runtime(&rt);
                    if json_mode {
                        json_ok("delete", &format_result_json(&qr));
                    } else {
                        print!("{}", format_result_pretty(&qr));
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("delete", &err.to_string());
                    }
                    eprintln!("delete error: {err}");
                    std::process::exit(1);
                }
            }
        }
        // Probe a listener to report reachability; exits 1 if unreachable.
        "health" => {
            let json_mode = wants_json(&result.flags);
            let explicit_transport =
                result.flags.contains_key("grpc") || result.flags.contains_key("http");
            let transport = select_transport(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("health", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            let bind_addr = flag_string(&result.flags, "bind").unwrap_or_else(|| {
                if explicit_transport {
                    transport.default_bind_addr().to_string()
                } else {
                    reddb::service_cli::DEFAULT_ROUTER_BIND_ADDR.to_string()
                }
            });
            let transport_label = if explicit_transport {
                transport.as_str()
            } else {
                "router"
            };
            let ok = probe_listener(&bind_addr, Duration::from_secs(1));
            if json_mode {
                json_ok(
                    "health",
                    &format!(
                        "{{\"healthy\":{},\"transport\":\"{}\",\"address\":\"{}\"}}",
                        ok,
                        json_escape(transport_label),
                        json_escape(&bind_addr)
                    ),
                );
                if !ok {
                    std::process::exit(1);
                }
            } else if ok {
                println!("ok {} {}", transport_label, bind_addr);
            } else {
                eprintln!("unreachable {} {}", transport_label, bind_addr);
                std::process::exit(1);
            }
        }
        // POST a tick payload to a running HTTP server.
        "tick" => {
            let json_mode = wants_json(&result.flags);
            let bind_addr =
                flag_string(&result.flags, "bind").unwrap_or_else(|| "127.0.0.1:8080".to_string());
            let operations = flag_string(&result.flags, "operations");
            let dry_run = flag_bool(&result.flags, "dry-run");
            let payload = build_tick_payload(operations.as_deref(), dry_run);
            let body = post_json_to_http(&bind_addr, "/tick", &payload).unwrap_or_else(|err| {
                if json_mode {
                    json_error("tick", &err.to_string());
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            if json_mode {
                json_ok("tick", &body);
            } else {
                println!("{body}");
            }
        }
        "migrate-from-redis" => {
            std::process::exit(run_migrate_from_redis_command(&result.flags));
        }
        // Report basic runtime statistics for a local database.
        "status" => {
            let json_mode = wants_json(&result.flags);
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("status", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            let stats = rt.stats();
            let now_ms = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_millis())
                .unwrap_or(0);
            let uptime_ms = now_ms.saturating_sub(stats.started_at_unix_ms);
            if json_mode {
                json_ok(
                    "status",
                    &format!(
                        "{{\"uptime_ms\":{},\"collections\":{},\"entities\":{},\"pid\":{}}}",
                        uptime_ms,
                        stats.store.collection_count,
                        stats.store.total_entities,
                        stats.system.pid,
                    ),
                );
            } else {
                println!("uptime_ms: {}", uptime_ms);
                println!("collections: {}", stats.store.collection_count);
                println!("entities: {}", stats.store.total_entities);
                println!("pid: {}", stats.system.pid);
                println!("hostname: {}", stats.system.hostname);
                println!("os/arch: {}/{}", stats.system.os, stats.system.arch);
            }
        }
        // Run the Model Context Protocol server over stdio.
        "mcp" => {
            let path = result
                .flags
                .get("path")
                .map(|v| v.as_str_value())
                .unwrap_or_default();
            let runtime = if path.is_empty() {
                reddb::runtime::RedDBRuntime::in_memory().unwrap()
            } else {
                reddb::runtime::RedDBRuntime::with_options(reddb::api::RedDBOptions::persistent(
                    &path,
                ))
                .unwrap()
            };
            let mut server = reddb::mcp::server::McpServer::new(runtime);
            server.run_stdio();
        }
        // Auth management (currently only the bootstrap subcommand).
        "auth" => {
            let json_mode = wants_json(&result.flags);
            // BUGFIX: read the subcommand from `remaining` (the positionals
            // *after* "auth"). It previously read `result.positionals.first()`,
            // which is always "auth" itself, so `red auth bootstrap` could
            // never match and always fell through to the help text.
            let subcommand = remaining.first().map(|s| s.as_str()).unwrap_or("help");
            let _rt = reddb::RedDBRuntime::in_memory().expect("failed to create runtime");
            let auth_store = std::sync::Arc::new(reddb::auth::store::AuthStore::new(
                reddb::auth::AuthConfig {
                    enabled: true,
                    ..Default::default()
                },
            ));
            match subcommand {
                "bootstrap" => {
                    let user = result
                        .flags
                        .get("user")
                        .map(|v| v.as_str_value())
                        .unwrap_or_else(|| "admin".to_string());
                    let password = result
                        .flags
                        .get("password")
                        .map(|v| v.as_str_value())
                        .unwrap_or_else(|| {
                            if json_mode {
                                json_error(
                                    "auth.bootstrap",
                                    "--password is required for bootstrap",
                                );
                            }
                            eprintln!("error: --password is required for bootstrap");
                            std::process::exit(1);
                        });
                    match auth_store.bootstrap(&user, &password) {
                        Ok(br) => {
                            if json_mode {
                                let cert_json = br
                                    .certificate
                                    .as_ref()
                                    .map(|c| format!("\"{}\"", json_escape(c)))
                                    .unwrap_or_else(|| "null".to_string());
                                json_ok(
                                    "auth.bootstrap",
                                    &format!(
                                        "{{\"username\":\"{}\",\"role\":\"{}\",\"api_key\":\"{}\",\"certificate\":{}}}",
                                        json_escape(&br.user.username),
                                        json_escape(br.user.role.as_str()),
                                        json_escape(&br.api_key.key),
                                        cert_json
                                    ),
                                );
                            } else {
                                println!(
                                    "Admin user '{}' created (role: {})",
                                    br.user.username,
                                    br.user.role.as_str()
                                );
                                println!("API Key: {}", br.api_key.key);
                                if let Some(cert) = br.certificate {
                                    println!();
                                    println!(
                                        "CERTIFICATE (save this — required to unseal the vault):"
                                    );
                                    println!(" {}", cert);
                                    println!();
                                    println!("Without this certificate, the vault cannot be decrypted after restart.");
                                } else {
                                    println!();
                                    println!("Save this API key — it will not be shown again.");
                                }
                            }
                        }
                        Err(e) => {
                            if json_mode {
                                json_error("auth.bootstrap", &format!("{e}"));
                            }
                            eprintln!("error: {e}");
                            std::process::exit(1);
                        }
                    }
                }
                _ => {
                    if json_mode {
                        json_ok(
                            "auth",
                            "{\"subcommands\":[\"bootstrap\",\"create-user\",\"list-users\",\"login\"],\"message\":\"use a subcommand, e.g. red auth bootstrap --password s3cret!\"}",
                        );
                    } else {
                        println!("Usage: red auth <subcommand>");
                        println!();
                        println!("Subcommands:");
                        println!(
                            " bootstrap Create the first admin user (only when no users exist)"
                        );
                        println!(" create-user Create a new user (requires admin)");
                        println!(" list-users List all users");
                        println!(" login Login and get a session token");
                        println!();
                        println!("Examples:");
                        println!(" red auth bootstrap --password s3cret!");
                        println!(
                            " red auth create-user --user alice --password pass --role write"
                        );
                    }
                }
            }
        }
        // Connect to a remote server: one-shot query or interactive REPL.
        "connect" => {
            let json_mode = wants_json(&result.flags);
            let addr = remaining
                .first()
                .map(|s| s.to_string())
                .unwrap_or_else(|| "localhost:6380".to_string());
            let token = result.flags.get("token").map(|v| v.as_str_value());
            let one_shot_query = result.flags.get("query").map(|v| v.as_str_value());
            let rt = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("tokio runtime");
            rt.block_on(async {
                let mut client = match reddb::client::RedDBClient::connect(&addr, token).await {
                    Ok(c) => c,
                    Err(e) => {
                        if json_mode {
                            json_error("connect", &format!("Failed to connect to {}: {}", addr, e));
                        }
                        eprintln!("Failed to connect to {}: {}", addr, e);
                        std::process::exit(1);
                    }
                };
                if let Some(query) = one_shot_query {
                    match client.query(&query).await {
                        Ok(json) => {
                            if json_mode {
                                json_ok("connect", &json);
                            } else {
                                println!("{}", json);
                            }
                        }
                        Err(e) => {
                            if json_mode {
                                json_error("connect", &format!("{}", e));
                            }
                            eprintln!("error: {}", e);
                            std::process::exit(1);
                        }
                    }
                } else {
                    reddb::client::repl::run_repl(&mut client).await;
                }
            });
        }
        // Dump collections (and the encrypted vault blob, when available)
        // as newline-delimited JSON, to stdout or --output.
        "dump" => {
            let json_mode = wants_json(&result.flags);
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("dump", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            let auth_store = attach_cli_vault(&rt, false).unwrap_or_else(|err| {
                if json_mode {
                    json_error("dump", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            let store = rt.db().store();
            // When a single collection is requested, always include
            // red_config too (if present) so restores keep configuration.
            let mut targets: Vec<String> = match flag_string(&result.flags, "collection") {
                Some(name) if !name.is_empty() => {
                    let mut names = vec![name];
                    if names.iter().all(|name| name != "red_config")
                        && store.get_collection("red_config").is_some()
                    {
                        names.push("red_config".to_string());
                    }
                    names
                }
                _ => store.list_collections(),
            };
            targets.sort();
            targets.dedup();
            let output_path = flag_string(&result.flags, "output");
            let mut buf = String::new();
            let mut total_rows = 0usize;
            let mut secret_keys = 0usize;
            for collection in &targets {
                let manager = match store.get_collection(collection) {
                    Some(m) => m,
                    None => continue,
                };
                for entity in manager.query_all(|_| true) {
                    let mut row_obj = reddb::json::Map::new();
                    if let reddb::storage::EntityData::Row(ref row) = entity.data {
                        if let Some(named) = &row.named {
                            for (k, v) in named {
                                row_obj
                                    .insert(k.clone(), reddb::json::Value::String(v.to_string()));
                            }
                        }
                    }
                    let mut wrapper = reddb::json::Map::new();
                    wrapper.insert(
                        "collection".to_string(),
                        reddb::json::Value::String(collection.clone()),
                    );
                    wrapper.insert("fields".to_string(), reddb::json::Value::Object(row_obj));
                    let line = reddb::json::Value::Object(wrapper).to_string_compact();
                    buf.push_str(&line);
                    buf.push('\n');
                    total_rows += 1;
                }
            }
            // Append one vault-export record, kept encrypted at rest.
            if let Some(auth_store) = auth_store.as_ref() {
                match auth_store.vault_kv_export_encrypted() {
                    Ok(Some(blob)) => {
                        let mut keys = auth_store.vault_kv_keys();
                        keys.sort();
                        secret_keys = keys.len();
                        let keys_json = keys
                            .iter()
                            .map(|key| reddb::json::Value::String(key.clone()))
                            .collect();
                        let mut wrapper = reddb::json::Map::new();
                        wrapper.insert(
                            "kind".to_string(),
                            reddb::json::Value::String("reddb.vault_kv.v1".to_string()),
                        );
                        wrapper.insert("encrypted".to_string(), reddb::json::Value::Bool(true));
                        wrapper.insert("keys".to_string(), reddb::json::Value::Array(keys_json));
                        wrapper.insert("blob".to_string(), reddb::json::Value::String(blob));
                        buf.push_str(&reddb::json::Value::Object(wrapper).to_string_compact());
                        buf.push('\n');
                    }
                    Ok(None) => {}
                    Err(err) => {
                        if json_mode {
                            json_error("dump", &err.to_string());
                        }
                        eprintln!("dump error: {err}");
                        std::process::exit(1);
                    }
                }
            }
            match output_path {
                Some(path) if !path.is_empty() => {
                    if let Err(e) = std::fs::write(&path, &buf) {
                        if json_mode {
                            json_error("dump", &format!("write failed: {e}"));
                        }
                        eprintln!("write failed: {e}");
                        std::process::exit(1);
                    }
                    if json_mode {
                        json_ok(
                            "dump",
                            &format!(
                                "{{\"path\":\"{}\",\"rows\":{},\"collections\":{},\"secret_keys\":{}}}",
                                path,
                                total_rows,
                                targets.len(),
                                secret_keys
                            ),
                        );
                    } else {
                        println!(
                            "dumped {} rows from {} collection(s), {} secret key(s) to {}",
                            total_rows,
                            targets.len(),
                            secret_keys,
                            path
                        );
                    }
                }
                _ => {
                    print!("{}", buf);
                    if json_mode {
                        json_ok(
                            "dump",
                            &format!(
                                "{{\"rows\":{},\"collections\":{},\"secret_keys\":{}}}",
                                total_rows,
                                targets.len(),
                                secret_keys
                            ),
                        );
                    }
                }
            }
        }
        // Restore rows (and vault records) from a dump file.
        "restore" => {
            let json_mode = wants_json(&result.flags);
            let input_path = match flag_string(&result.flags, "input") {
                Some(p) if !p.is_empty() => p,
                _ => {
                    if json_mode {
                        json_error("restore", "--input / -i is required");
                    }
                    eprintln!("Usage: red restore -i FILE [--collection NAME] [--path DB]");
                    std::process::exit(1);
                }
            };
            let override_collection = flag_string(&result.flags, "collection");
            let file_text = std::fs::read_to_string(&input_path).unwrap_or_else(|e| {
                if json_mode {
                    json_error("restore", &format!("read failed: {e}"));
                }
                eprintln!("read failed: {e}");
                std::process::exit(1);
            });
            let rt = open_local_runtime(&result.flags).unwrap_or_else(|err| {
                if json_mode {
                    json_error("restore", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            });
            let mut restored = 0usize;
            let mut errors = 0usize;
            let mut restored_secret_keys = 0usize;
            // Lazily attached on the first vault record encountered.
            let mut auth_store: Option<std::sync::Arc<reddb::auth::AuthStore>> = None;
            for (line_no, line) in file_text.lines().enumerate() {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                let parsed: reddb::json::Value = match reddb::json::from_str(trimmed) {
                    Ok(v) => v,
                    Err(_) => {
                        errors += 1;
                        eprintln!("line {}: invalid JSON", line_no + 1);
                        continue;
                    }
                };
                // Vault-export records are handled separately from rows.
                if let reddb::json::Value::Object(map) = &parsed {
                    if map.get("kind").and_then(|v| v.as_str()) == Some("reddb.vault_kv.v1") {
                        let keys: Vec<String> = map
                            .get("keys")
                            .and_then(|value| value.as_array())
                            .map(|values| {
                                values
                                    .iter()
                                    .filter_map(|value| value.as_str().map(|s| s.to_string()))
                                    .collect()
                            })
                            .unwrap_or_default();
                        let blob = match map.get("blob").and_then(|value| value.as_str()) {
                            Some(blob) => blob,
                            None => {
                                errors += 1;
                                eprintln!("line {}: missing vault export blob", line_no + 1);
                                continue;
                            }
                        };
                        if auth_store.is_none() {
                            match attach_cli_vault(&rt, true) {
                                Ok(store) => auth_store = store,
                                Err(err) => {
                                    errors += 1;
                                    eprintln!("line {}: {}", line_no + 1, err);
                                    continue;
                                }
                            }
                        }
                        let Some(auth) = auth_store.as_ref() else {
                            errors += 1;
                            eprintln!("line {}: vault is unavailable", line_no + 1);
                            continue;
                        };
                        // Try a real decrypt+import; fall back to placeholder
                        // keys so operators can see what was lost.
                        match reddb::auth::vault::Vault::unseal_logical_export(blob, None) {
                            Ok(state) => match auth.vault_kv_try_import(state.kv) {
                                Ok(count) => restored_secret_keys += count,
                                Err(err) => {
                                    errors += 1;
                                    eprintln!("line {}: vault import failed: {}", line_no + 1, err);
                                }
                            },
                            Err(err) => {
                                errors += 1;
                                eprintln!(
                                    "line {}: vault import failed: {}; importing false placeholders",
                                    line_no + 1,
                                    err
                                );
                                match auth.vault_kv_try_import_placeholders(&keys) {
                                    Ok(count) => restored_secret_keys += count,
                                    Err(placeholder_err) => {
                                        eprintln!(
                                            "line {}: vault placeholder import failed: {}",
                                            line_no + 1,
                                            placeholder_err
                                        );
                                    }
                                }
                            }
                        }
                        continue;
                    }
                }
                // Plain row record: resolve the target collection.
                // `red_config` rows always go to red_config, even when a
                // --collection override is given.
                let (collection, fields) = match &parsed {
                    reddb::json::Value::Object(map) => {
                        let embedded_collection = map
                            .get("collection")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string());
                        let coll = if embedded_collection.as_deref() == Some("red_config") {
                            embedded_collection
                        } else {
                            override_collection.clone().or(embedded_collection)
                        };
                        let fields = map.get("fields").cloned();
                        match (coll, fields) {
                            (Some(c), Some(f)) => (c, f),
                            _ => {
                                errors += 1;
                                continue;
                            }
                        }
                    }
                    _ => {
                        errors += 1;
                        continue;
                    }
                };
                let obj = match fields {
                    reddb::json::Value::Object(m) => m,
                    _ => {
                        errors += 1;
                        continue;
                    }
                };
                let mut cols = Vec::new();
                let mut vals = Vec::new();
                for (k, v) in obj.iter() {
                    cols.push(k.clone());
                    vals.push(match v {
                        reddb::json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
                        reddb::json::Value::Number(n) => n.to_string(),
                        reddb::json::Value::Bool(b) => b.to_string(),
                        reddb::json::Value::Null => "NULL".to_string(),
                        other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
                    });
                }
                let sql = format!(
                    "INSERT INTO {} ({}) VALUES ({})",
                    collection,
                    cols.join(", "),
                    vals.join(", ")
                );
                match rt.execute_query(&sql) {
                    Ok(_) => restored += 1,
                    Err(e) => {
                        errors += 1;
                        eprintln!("line {}: {}", line_no + 1, e);
                    }
                }
            }
            checkpoint_local_runtime(&rt);
            if json_mode {
                json_ok(
                    "restore",
                    &format!(
                        "{{\"restored\":{},\"secret_keys\":{},\"errors\":{}}}",
                        restored, restored_secret_keys, errors
                    ),
                );
            } else {
                println!(
                    "restored {} rows, {} secret key(s) ({} errors)",
                    restored, restored_secret_keys, errors
                );
            }
        }
        // List point-in-time restore points from local snapshot/WAL dirs.
        "pitr-list" => {
            let json_mode = wants_json(&result.flags);
            let snapshot_prefix = flag_string(&result.flags, "snapshot-prefix")
                .unwrap_or_else(|| "./data/snapshots".to_string());
            let wal_prefix = flag_string(&result.flags, "wal-prefix")
                .unwrap_or_else(|| "./data/wal-archive".to_string());
            let backend = std::sync::Arc::new(reddb::storage::backend::local::LocalBackend)
                as std::sync::Arc<dyn reddb::storage::backend::RemoteBackend>;
            let pitr =
                reddb::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
            match pitr.list_restore_points() {
                Ok(points) => {
                    if json_mode {
                        let mut out = String::from("[");
                        for (i, p) in points.iter().enumerate() {
                            if i > 0 {
                                out.push(',');
                            }
                            out.push_str(&format!(
                                "{{\"snapshot_id\":{},\"snapshot_time\":{},\"wal_segments\":{},\"latest_recoverable_time\":{}}}",
                                p.snapshot_id,
                                p.snapshot_time,
                                p.wal_segment_count,
                                p.latest_recoverable_time
                            ));
                        }
                        out.push(']');
                        json_ok("pitr-list", &out);
                    } else if points.is_empty() {
                        println!("no restore points found");
                    } else {
                        println!(
                            "{:<15} {:<24} {:<14} {:<24}",
                            "snapshot_id",
                            "snapshot_time (unix ms)",
                            "wal_segments",
                            "latest_recoverable_time"
                        );
                        for p in &points {
                            println!(
                                "{:<15} {:<24} {:<14} {:<24}",
                                p.snapshot_id,
                                p.snapshot_time,
                                p.wal_segment_count,
                                p.latest_recoverable_time
                            );
                        }
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("pitr-list", &err.to_string());
                    }
                    eprintln!("pitr-list error: {err}");
                    std::process::exit(1);
                }
            }
        }
        // Restore a database to a point in time at --dest.
        "pitr-restore" => {
            let json_mode = wants_json(&result.flags);
            // target-time defaults to 0 (earliest) on missing/unparsable.
            let target_time: u64 = flag_string(&result.flags, "target-time")
                .and_then(|s| s.parse().ok())
                .unwrap_or(0);
            let dest = match flag_string(&result.flags, "dest") {
                Some(p) if !p.is_empty() => p,
                _ => {
                    if json_mode {
                        json_error("pitr-restore", "--dest is required");
                    }
                    eprintln!(
                        "Usage: red pitr-restore --dest PATH --target-time MS \
                         --snapshot-prefix DIR --wal-prefix DIR"
                    );
                    std::process::exit(1);
                }
            };
            let snapshot_prefix = flag_string(&result.flags, "snapshot-prefix")
                .unwrap_or_else(|| "./data/snapshots".to_string());
            let wal_prefix = flag_string(&result.flags, "wal-prefix")
                .unwrap_or_else(|| "./data/wal-archive".to_string());
            let backend = std::sync::Arc::new(reddb::storage::backend::local::LocalBackend)
                as std::sync::Arc<dyn reddb::storage::backend::RemoteBackend>;
            let pitr =
                reddb::storage::wal::PointInTimeRecovery::new(backend, snapshot_prefix, wal_prefix);
            match pitr.restore_to(target_time, std::path::Path::new(&dest)) {
                Ok(res) => {
                    if json_mode {
                        json_ok(
                            "pitr-restore",
                            &format!(
                                "{{\"snapshot_used\":{},\"wal_segments_replayed\":{},\"records_applied\":{},\"recovered_to_lsn\":{},\"recovered_to_time\":{}}}",
                                res.snapshot_used,
                                res.wal_segments_replayed,
                                res.records_applied,
                                res.recovered_to_lsn,
                                res.recovered_to_time
                            ),
                        );
                    } else {
                        println!(
                            "restored to {} at lsn {} (snapshot {}, {} WAL segments, {} records applied)",
                            res.recovered_to_time,
                            res.recovered_to_lsn,
                            res.snapshot_used,
                            res.wal_segments_replayed,
                            res.records_applied,
                        );
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("pitr-restore", &err.to_string());
                    }
                    eprintln!("pitr-restore error: {err}");
                    std::process::exit(1);
                }
            }
        }
        "vcs" => {
            run_vcs_command(&result.flags, remaining);
        }
        "admin" => {
            run_admin_command(&result.flags, remaining);
        }
        _ => {
            if wants_json(&result.flags) {
                json_error("unknown", &format!("Unknown command: {}", cmd));
            }
            eprintln!("Unknown command: {}", cmd);
            eprintln!();
            print!("{}", cli::commands::main_help_text());
            std::process::exit(1);
        }
    }
}
/// Handle `red migrate-from-redis`: a connectivity-only dry-run validator
/// for Redis -> RedDB migrations.
///
/// Returns the process exit code: 0 on success, 1 when a connectivity
/// probe fails, 2 on usage errors (unsupported phase, missing flags).
fn run_migrate_from_redis_command(flags: &HashMap<String, FlagValue>) -> i32 {
    let json_mode = wants_json(flags);
    let dry_run = flag_bool(flags, "dry-run");
    let phase = flag_string(flags, "phase")
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| "dry-run".to_string());
    let namespace = flag_string(flags, "namespace")
        .filter(|value| !value.is_empty())
        .unwrap_or_else(|| "redis-migration".to_string());

    // Every guard reports the same way in both output modes; centralize it.
    let fail = |code: i32, msg: &str| -> i32 {
        if json_mode {
            json_error("migrate-from-redis", msg);
        }
        eprintln!("migrate-from-redis: {msg}");
        code
    };

    // The dual-write shadow phase is deliberately application-owned.
    if phase == "dual-write" {
        return fail(
            2,
            "red migrate-from-redis does not own the dual-write shadow phase; use the documented application-owned helper pattern in docs/guides/migrate-redis-to-blob-cache.md",
        );
    }
    if phase != "dry-run" {
        return fail(
            2,
            &format!("unsupported migration phase '{phase}'; supported phases: dry-run, dual-write"),
        );
    }
    if !dry_run {
        return fail(
            2,
            "only --dry-run is supported; dual-write must stay in the application-owned helper",
        );
    }
    let Some(redis_url) = flag_string(flags, "redis-url").filter(|value| !value.is_empty()) else {
        return fail(2, "--redis-url is required for migrate-from-redis --dry-run");
    };

    // Probe both sides; either failure aborts with exit code 1.
    if let Err(err) = validate_redis_tcp_connectivity(&redis_url) {
        return fail(1, &format!("Redis connectivity check failed: {err}"));
    }
    if let Err(err) = open_local_runtime(flags) {
        return fail(1, &format!("RedDB connectivity check failed: {err}"));
    }

    // A dry run never scans or writes entries, so the counters are fixed.
    let data = format!(
        "{{\"mode\":\"dry-run\",\"redis_reachable\":true,\"reddb_reachable\":true,\"namespace\":\"{}\",\"entries_scanned\":0,\"entries_written\":0,\"mismatch_count\":0}}",
        json_escape(&namespace)
    );
    if json_mode {
        json_ok("migrate-from-redis", &data);
    } else {
        println!("migrate-from-redis dry-run ok");
        println!("redis_reachable: true");
        println!("reddb_reachable: true");
        println!("namespace: {}", namespace);
        println!("entries_scanned: 0");
        println!("entries_written: 0");
        println!("mismatch_count: 0");
    }
    0
}
/// Best-effort TCP reachability probe for the Redis endpoint in `redis_url`.
///
/// Resolves the extracted `host:port` and attempts a single connect with a
/// one-second timeout; all failures are reported as human-readable strings.
fn validate_redis_tcp_connectivity(redis_url: &str) -> Result<(), String> {
    let endpoint = redis_socket_addr(redis_url)?;
    // Only the first resolved address is probed.
    let resolved = endpoint
        .to_socket_addrs()
        .map_err(|err| format!("resolve {endpoint}: {err}"))?
        .next()
        .ok_or_else(|| format!("resolve {endpoint}: no addresses"))?;
    match TcpStream::connect_timeout(&resolved, Duration::from_secs(1)) {
        Ok(_) => Ok(()),
        Err(err) => Err(format!("connect {endpoint}: {err}")),
    }
}
/// Extract the `host:port` authority from a Redis URL for TCP validation.
///
/// Accepts `redis://...` URLs or bare `host:port` strings; any other scheme
/// is rejected. Userinfo (`user:pass@`) and a trailing `/db` path are
/// stripped. The final `:`-segment must be a non-empty run of ASCII digits.
fn redis_socket_addr(redis_url: &str) -> Result<String, String> {
    let without_scheme = match redis_url.strip_prefix("redis://") {
        Some(rest) => rest,
        None if redis_url.contains("://") => {
            return Err("only redis:// URLs are supported for dry-run validation".to_string());
        }
        None => redis_url,
    };
    // Authority = everything before the first '/', minus any userinfo.
    let authority = without_scheme.split('/').next().unwrap_or_default();
    let host_port = authority.rsplit('@').next().unwrap_or_default();
    if host_port.is_empty() {
        return Err("missing Redis host".to_string());
    }
    let port_ok = host_port
        .rsplit(':')
        .next()
        .is_some_and(|port| !port.is_empty() && port.bytes().all(|b| b.is_ascii_digit()));
    if !port_ok {
        return Err("Redis URL must include host:port".to_string());
    }
    Ok(host_port.to_string())
}
/// Dispatch `red vcs <subcommand>` against a locally opened runtime.
///
/// Opens the database named by `--path` (in-memory when absent), runs the
/// requested version-control operation through `VcsUseCases`, and hands the
/// rendered payload (JSON object text or plain text, chosen by the output
/// mode) to `emit_vcs_result`, which checkpoints the runtime and exits
/// non-zero on error.
fn run_vcs_command(flags: &HashMap<String, FlagValue>, remaining: &[String]) {
    let json_mode = wants_json(flags);
    // First positional is the subcommand; everything after it becomes args.
    let subcommand = remaining.first().map(|s| s.as_str()).unwrap_or("help");
    let args: Vec<&str> = remaining.iter().skip(1).map(|s| s.as_str()).collect();
    let rt = match open_local_runtime(flags) {
        Ok(rt) => rt,
        Err(err) => {
            if json_mode {
                json_error("vcs", &err);
            }
            eprintln!("vcs error: {err}");
            std::process::exit(1);
        }
    };
    // Workset scoping: defaults to connection 1 when absent or unparseable.
    let connection_id = flag_string(flags, "connection")
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(1);
    let author = reddb::application::Author {
        name: flag_string(flags, "author").unwrap_or_else(|| "reddb".to_string()),
        email: flag_string(flags, "email").unwrap_or_else(|| "reddb@localhost".to_string()),
    };
    let vcs = reddb::application::VcsUseCases::new(&rt);
    // Each arm yields Ok(rendered payload) or Err(message). Arms that must
    // bail out before calling the use case return through emit_vcs_result
    // directly so the checkpoint still happens.
    let outcome: Result<String, String> = match subcommand {
        // vcs commit [-m MSG | MSG]
        "commit" => {
            let message = flag_string(flags, "message")
                .or_else(|| args.first().map(|s| s.to_string()))
                .unwrap_or_else(|| "no message".to_string());
            vcs.commit(reddb::application::CreateCommitInput {
                connection_id,
                message,
                author,
                committer: None,
                amend: false,
                allow_empty: true,
            })
            .map(|c| {
                if json_mode {
                    format!(
                        "{{\"hash\":\"{}\",\"height\":{},\"parents\":{}}}",
                        json_escape(&c.hash),
                        c.height,
                        format_args!("[{}]", c.parents.iter().map(|p| format!("\"{}\"", json_escape(p))).collect::<Vec<_>>().join(","))
                    )
                } else {
                    format!("commit {}\nHeight {}\nMessage: {}\n", c.hash, c.height, c.message)
                }
            })
            .map_err(|e| e.to_string())
        }
        // vcs branch <name> [--from ref]
        "branch" => match args.first() {
            None => Err("usage: red vcs branch <name> [--from ref]".to_string()),
            Some(name) => vcs
                .branch_create(reddb::application::CreateBranchInput {
                    name: name.to_string(),
                    from: flag_string(flags, "from"),
                    connection_id,
                })
                .map(|r| {
                    if json_mode {
                        format!(
                            "{{\"name\":\"{}\",\"target\":\"{}\"}}",
                            json_escape(&r.name),
                            json_escape(&r.target)
                        )
                    } else {
                        format!("branch {} -> {}\n", r.name, r.target)
                    }
                })
                .map_err(|e| e.to_string()),
        },
        // vcs branches — list all branch refs
        "branches" => {
            vcs.branch_list()
                .map(|refs| {
                    if json_mode {
                        let items: Vec<String> = refs
                            .iter()
                            .map(|r| format!(
                                "{{\"name\":\"{}\",\"target\":\"{}\"}}",
                                json_escape(&r.name),
                                json_escape(&r.target)
                            ))
                            .collect();
                        format!("[{}]", items.join(","))
                    } else {
                        let mut out = String::new();
                        for r in refs {
                            out.push_str(&format!("{}\t{}\n", r.name, r.target));
                        }
                        out
                    }
                })
                .map_err(|e| e.to_string())
        }
        // vcs tag <name> [target] — target falls back to --from, then "main"
        "tag" => match args.first() {
            None => Err("usage: red vcs tag <name> [target]".to_string()),
            Some(name) => {
                let target = args
                    .get(1)
                    .map(|s| s.to_string())
                    .or_else(|| flag_string(flags, "from"))
                    .unwrap_or_else(|| "main".to_string());
                vcs.tag(reddb::application::CreateTagInput {
                    name: name.to_string(),
                    target,
                    annotation: None,
                })
                .map(|r| {
                    if json_mode {
                        format!(
                            "{{\"name\":\"{}\",\"target\":\"{}\"}}",
                            json_escape(&r.name),
                            json_escape(&r.target)
                        )
                    } else {
                        format!("tag {} -> {}\n", r.name, r.target)
                    }
                })
                .map_err(|e| e.to_string())
            }
        },
        // vcs tags — list all tag refs
        "tags" => {
            vcs.tag_list()
                .map(|refs| {
                    if json_mode {
                        let items: Vec<String> = refs
                            .iter()
                            .map(|r| format!(
                                "{{\"name\":\"{}\",\"target\":\"{}\"}}",
                                json_escape(&r.name),
                                json_escape(&r.target)
                            ))
                            .collect();
                        format!("[{}]", items.join(","))
                    } else {
                        let mut out = String::new();
                        for r in refs {
                            out.push_str(&format!("{}\t{}\n", r.name, r.target));
                        }
                        out
                    }
                })
                .map_err(|e| e.to_string())
        }
        // vcs checkout <branch|tag|commit> — a 64-char hex string is treated
        // as a commit hash, a refs/tags/ prefix as a tag, anything else as a
        // branch name.
        "checkout" => match args.first() {
            None => Err("usage: red vcs checkout <branch|tag|commit>".to_string()),
            Some(target) => {
                let target = target.to_string();
                let kind = if target.len() == 64
                    && target.chars().all(|c| c.is_ascii_hexdigit())
                {
                    reddb::application::CheckoutTarget::Commit(target.clone())
                } else if target.starts_with("refs/tags/") {
                    reddb::application::CheckoutTarget::Tag(target.clone())
                } else {
                    reddb::application::CheckoutTarget::Branch(target.clone())
                };
                vcs.checkout(reddb::application::CheckoutInput {
                    connection_id,
                    target: kind,
                    force: false,
                })
                .map(|r| {
                    if json_mode {
                        format!("{{\"ref\":\"{}\"}}", json_escape(&r.name))
                    } else {
                        format!("switched to {}\n", r.name)
                    }
                })
                .map_err(|e| e.to_string())
            }
        },
        // vcs merge <branch> [--ff-only | --no-ff] [-m MSG]
        "merge" => {
            let from_opt = args
                .first()
                .map(|s| s.to_string())
                .or_else(|| flag_string(flags, "from"));
            let Some(from) = from_opt else {
                // Early return still routes through emit_vcs_result so the
                // runtime gets checkpointed.
                return emit_vcs_result(
                    &rt,
                    "merge",
                    json_mode,
                    Err("usage: red vcs merge <branch>".to_string()),
                );
            };
            // --ff-only wins over --no-ff when both are set.
            let strategy = if flag_bool(flags, "ff-only") {
                reddb::application::MergeStrategy::FastForwardOnly
            } else if flag_bool(flags, "no-ff") {
                reddb::application::MergeStrategy::NoFastForward
            } else {
                reddb::application::MergeStrategy::Auto
            };
            vcs.merge(reddb::application::MergeInput {
                connection_id,
                from,
                opts: reddb::application::MergeOpts {
                    strategy,
                    message: flag_string(flags, "message"),
                    abort_on_conflict: false,
                },
                author,
            })
            .map(|outcome| {
                if json_mode {
                    format!(
                        "{{\"fast_forward\":{},\"conflicts\":{},\"commit\":{}}}",
                        outcome.fast_forward,
                        outcome.conflicts.len(),
                        outcome
                            .merge_commit
                            .as_ref()
                            .map(|c| format!("\"{}\"", json_escape(&c.hash)))
                            .unwrap_or_else(|| "null".to_string())
                    )
                } else if outcome.fast_forward {
                    "fast-forward\n".to_string()
                } else {
                    format!(
                        "merged (non-ff)\ncommit {}\nmerge_state {}\n",
                        outcome.merge_commit.as_ref().map(|c| c.hash.as_str()).unwrap_or("?"),
                        outcome.merge_state_id.as_deref().unwrap_or("?")
                    )
                }
            })
            .map_err(|e| e.to_string())
        }
        // vcs log [--to ref] [--from ref] [--limit N] — default limit 20
        "log" => {
            let limit = flag_string(flags, "limit")
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(20);
            vcs.log(reddb::application::LogInput {
                connection_id,
                range: reddb::application::LogRange {
                    to: flag_string(flags, "to").or_else(|| flag_string(flags, "branch")),
                    from: flag_string(flags, "from"),
                    limit: Some(limit),
                    skip: None,
                    no_merges: false,
                },
            })
            .map(|commits| {
                if json_mode {
                    let items: Vec<String> = commits
                        .iter()
                        .map(|c| format!(
                            "{{\"hash\":\"{}\",\"height\":{},\"message\":\"{}\",\"author\":\"{}\"}}",
                            json_escape(&c.hash),
                            c.height,
                            json_escape(&c.message),
                            json_escape(&c.author.name)
                        ))
                        .collect();
                    format!("[{}]", items.join(","))
                } else {
                    let mut out = String::new();
                    for c in commits {
                        out.push_str(&format!(
                            "commit {}\nAuthor: {} <{}>\n\n    {}\n\n",
                            c.hash, c.author.name, c.author.email, c.message
                        ));
                    }
                    out
                }
            })
            .map_err(|e| e.to_string())
        }
        // vcs status — current HEAD ref/commit and detached state
        "status" => {
            vcs.status(reddb::application::StatusInput { connection_id })
                .map(|s| {
                    if json_mode {
                        format!(
                            "{{\"head_ref\":{},\"head_commit\":{},\"detached\":{}}}",
                            s.head_ref.as_deref().map(|r| format!("\"{}\"", json_escape(r))).unwrap_or_else(|| "null".to_string()),
                            s.head_commit.as_deref().map(|h| format!("\"{}\"", json_escape(h))).unwrap_or_else(|| "null".to_string()),
                            s.detached
                        )
                    } else {
                        format!(
                            "On branch {}\nHead commit {}\n",
                            s.head_ref.as_deref().unwrap_or("(detached)"),
                            s.head_commit.as_deref().unwrap_or("(none)")
                        )
                    }
                })
                .map_err(|e| e.to_string())
        }
        // vcs lca <a> <b> — lowest common ancestor of two commitish values
        "lca" => {
            let (Some(a), Some(b)) = (args.first(), args.get(1)) else {
                return emit_vcs_result(
                    &rt,
                    "lca",
                    json_mode,
                    Err("usage: red vcs lca <a> <b>".to_string()),
                );
            };
            vcs.lca(a, b)
                .map(|hash| {
                    if json_mode {
                        format!(
                            "{{\"lca\":{}}}",
                            hash.as_ref().map(|h| format!("\"{}\"", json_escape(h))).unwrap_or_else(|| "null".to_string())
                        )
                    } else {
                        hash.map(|h| format!("{h}\n")).unwrap_or_else(|| "(no common ancestor)\n".to_string())
                    }
                })
                .map_err(|e| e.to_string())
        }
        // vcs resolve <ref|hash|prefix> — resolve to a full commit hash
        "resolve" => {
            let Some(spec) = args.first() else {
                return emit_vcs_result(
                    &rt,
                    "resolve",
                    json_mode,
                    Err("usage: red vcs resolve <ref|hash|prefix>".to_string()),
                );
            };
            vcs.resolve_commitish(spec)
                .map(|hash| {
                    if json_mode {
                        format!("{{\"hash\":\"{}\"}}", json_escape(&hash))
                    } else {
                        format!("{hash}\n")
                    }
                })
                .map_err(|e| e.to_string())
        }
        // vcs versioned [list|on|off|check] <collection> — manage per-
        // collection version-control opt-in; action defaults to "list".
        "versioned" => {
            let action = args.first().copied().unwrap_or("list");
            match action {
                "list" => vcs
                    .list_versioned()
                    .map(|list| {
                        if json_mode {
                            let items: Vec<String> = list
                                .iter()
                                .map(|s| format!("\"{}\"", json_escape(s)))
                                .collect();
                            format!("[{}]", items.join(","))
                        } else if list.is_empty() {
                            "(no versioned collections)\n".to_string()
                        } else {
                            list.into_iter()
                                .map(|s| format!("{s}\n"))
                                .collect::<String>()
                        }
                    })
                    .map_err(|e| e.to_string()),
                "on" | "enable" | "add" => match args.get(1) {
                    None => Err("usage: red vcs versioned on <collection>".to_string()),
                    Some(coll) => vcs
                        .set_versioned(coll, true)
                        .map(|()| {
                            if json_mode {
                                format!(
                                    "{{\"collection\":\"{}\",\"versioned\":true}}",
                                    json_escape(coll)
                                )
                            } else {
                                format!("opted in: {coll}\n")
                            }
                        })
                        .map_err(|e| e.to_string()),
                },
                "off" | "disable" | "remove" => match args.get(1) {
                    None => Err("usage: red vcs versioned off <collection>".to_string()),
                    Some(coll) => vcs
                        .set_versioned(coll, false)
                        .map(|()| {
                            if json_mode {
                                format!(
                                    "{{\"collection\":\"{}\",\"versioned\":false}}",
                                    json_escape(coll)
                                )
                            } else {
                                format!("opted out: {coll}\n")
                            }
                        })
                        .map_err(|e| e.to_string()),
                },
                "check" => match args.get(1) {
                    None => Err("usage: red vcs versioned check <collection>".to_string()),
                    Some(coll) => vcs
                        .is_versioned(coll)
                        .map(|b| {
                            if json_mode {
                                format!(
                                    "{{\"collection\":\"{}\",\"versioned\":{}}}",
                                    json_escape(coll),
                                    b
                                )
                            } else if b {
                                format!("{coll}: versioned\n")
                            } else {
                                format!("{coll}: NOT versioned\n")
                            }
                        })
                        .map_err(|e| e.to_string()),
                },
                _ => Err(format!(
                    "usage: red vcs versioned [list|on|off|check] <collection>\n\
                     got: {action}"
                )),
            }
        }
        // vcs reset <ref|hash> [--mode soft|mixed|hard] — unknown mode
        // strings fall back to mixed.
        "reset" => {
            let Some(target) = args.first() else {
                return emit_vcs_result(
                    &rt,
                    "reset",
                    json_mode,
                    Err("usage: red vcs reset <ref|hash> [--mode soft|mixed|hard]".to_string()),
                );
            };
            let mode_str = flag_string(flags, "mode").unwrap_or_else(|| "mixed".to_string());
            let mode = match mode_str.as_str() {
                "soft" => reddb::application::ResetMode::Soft,
                "hard" => reddb::application::ResetMode::Hard,
                _ => reddb::application::ResetMode::Mixed,
            };
            vcs.reset(reddb::application::ResetInput {
                connection_id,
                target: target.to_string(),
                mode,
            })
            .map(|()| {
                if json_mode {
                    "{\"ok\":true}".to_string()
                } else {
                    format!("reset ({mode_str}) to {target}\n")
                }
            })
            .map_err(|e| e.to_string())
        }
        _ => Err(format!(
            "Unknown vcs subcommand `{subcommand}`\n\n\
             Usage: red vcs <commit|branch|branches|tag|tags|checkout|merge|reset|log|status|lca|resolve|versioned> [args] [flags]\n"
        )),
    };
    emit_vcs_result(&rt, subcommand, json_mode, outcome);
}
/// Report a vcs subcommand outcome and flush the runtime.
///
/// Checkpoints first so mutations persist whether or not the command
/// succeeded; on error, emits the message in the selected mode and exits
/// the process with code 1.
fn emit_vcs_result(
    rt: &reddb::RedDBRuntime,
    subcommand: &str,
    json_mode: bool,
    outcome: Result<String, String>,
) {
    checkpoint_local_runtime(rt);
    let command = format!("vcs.{subcommand}");
    match outcome {
        Ok(text) if json_mode => json_ok(&command, &text),
        Ok(text) => print!("{text}"),
        Err(err) => {
            if json_mode {
                json_error(&command, &err);
            }
            eprintln!("vcs {subcommand} error: {err}");
            std::process::exit(1);
        }
    }
}
/// Return the first positional (non-flag) argument before any `--`
/// separator, or `None` when the invocation contains no command word.
fn identify_command(args: &[String]) -> Option<String> {
    args.iter()
        .take_while(|arg| arg.as_str() != "--")
        .find(|arg| !arg.starts_with('-'))
        .cloned()
}
/// Build the flag schema for a given top-level command.
///
/// Starts from the global flag set and appends command-specific flags.
/// For `dump`, the global `--output`/`-o` flag is removed first because the
/// command redefines it as the destination file.
fn build_flags_for_command(command: Option<&str>) -> Vec<cli::types::FlagSchema> {
    let mut flags = cli::types::global_flags();
    // `dump` reuses --output/-o for its own purpose; drop the global one.
    if matches!(command, Some("dump")) {
        flags.retain(|flag| flag.long != "output" && flag.short != Some('o'));
    }
    match command {
        // Full server: storage, transports, TLS, replication role, workers.
        Some("server") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path (omit for in-memory)")
                    .with_default("./data/reddb.rdb"),
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Bind address (host:port) for the default routed front-door or legacy single-transport mode"),
                cli::types::FlagSchema::boolean("grpc").with_description("Enable the gRPC API"),
                cli::types::FlagSchema::boolean("http").with_description("Serve the HTTP API"),
                cli::types::FlagSchema::new("grpc-bind")
                    .with_description("Explicit gRPC bind address (host:port)"),
                cli::types::FlagSchema::new("http-bind")
                    .with_description("Explicit HTTP bind address (host:port)"),
                cli::types::FlagSchema::new("http-tls-bind")
                    .with_description("HTTPS bind address (host:port). Runs alongside --http-bind"),
                cli::types::FlagSchema::new("http-tls-cert")
                    .with_description("Path to HTTPS server certificate PEM"),
                cli::types::FlagSchema::new("http-tls-key")
                    .with_description("Path to HTTPS server private key PEM"),
                cli::types::FlagSchema::new("http-tls-client-ca")
                    .with_description("Path to PEM CA bundle for client cert verification (mTLS)"),
                cli::types::FlagSchema::new("wire-bind")
                    .with_description("Wire protocol TCP bind address (host:port)"),
                cli::types::FlagSchema::new("wire-tls-bind")
                    .with_description("Wire protocol TLS bind address (host:port)"),
                cli::types::FlagSchema::new("wire-tls-cert")
                    .with_description("Path to TLS certificate PEM (auto-generated if omitted)"),
                cli::types::FlagSchema::new("wire-tls-key")
                    .with_description("Path to TLS private key PEM"),
                cli::types::FlagSchema::new("role")
                    .with_short('r')
                    .with_description("Server role")
                    .with_choices(&["standalone", "primary", "replica"])
                    .with_default("standalone"),
                cli::types::FlagSchema::new("primary-addr")
                    .with_description("Primary gRPC address for replica mode"),
                cli::types::FlagSchema::boolean("read-only")
                    .with_description("Open the database in read-only mode"),
                cli::types::FlagSchema::boolean("no-create-if-missing")
                    .with_description("Fail instead of creating the database file"),
                cli::types::FlagSchema::boolean("vault")
                    .with_description("Enable the encrypted auth vault"),
                cli::types::FlagSchema::new("workers")
                    .with_short('w')
                    .with_description("Worker thread count (default: auto-detect from CPUs)"),
            ]);
        }
        // Replica server: subset of server flags plus the primary address.
        Some("replica") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path for the replica")
                    .with_default("./data/reddb.rdb"),
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Bind address (host:port) for the default routed front-door or legacy single-transport mode"),
                cli::types::FlagSchema::new("primary-addr")
                    .with_short('p')
                    .with_description("Primary gRPC address for replication"),
                cli::types::FlagSchema::boolean("grpc").with_description("Enable the gRPC API"),
                cli::types::FlagSchema::boolean("http").with_description("Serve the HTTP API"),
                cli::types::FlagSchema::new("grpc-bind")
                    .with_description("Explicit gRPC bind address (host:port)"),
                cli::types::FlagSchema::new("http-bind")
                    .with_description("Explicit HTTP bind address (host:port)"),
                cli::types::FlagSchema::new("wire-bind")
                    .with_description("Wire protocol TCP bind address (host:port)"),
                cli::types::FlagSchema::boolean("vault")
                    .with_description("Enable the encrypted auth vault"),
            ]);
        }
        // systemd service installation parameters.
        Some("service") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("binary")
                    .with_description("Path to the red binary")
                    .with_default("/usr/local/bin/red"),
                cli::types::FlagSchema::new("service-name")
                    .with_description("systemd unit name")
                    .with_default("reddb"),
                cli::types::FlagSchema::new("user")
                    .with_description("Service user")
                    .with_default("reddb"),
                cli::types::FlagSchema::new("group")
                    .with_description("Service group")
                    .with_default("reddb"),
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path")
                    .with_default("/var/lib/reddb/data.rdb"),
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Bind address (host:port) for the default routed front-door or legacy single-transport mode"),
                cli::types::FlagSchema::boolean("grpc")
                    .with_description("Enable the gRPC API in the service"),
                cli::types::FlagSchema::boolean("http").with_description("Install an HTTP service"),
                cli::types::FlagSchema::new("grpc-bind")
                    .with_description("Explicit gRPC bind address (host:port)"),
                cli::types::FlagSchema::new("http-bind")
                    .with_description("Explicit HTTP bind address (host:port)"),
            ]);
        }
        // MCP server: only a data-directory path.
        Some("mcp") => {
            flags.push(
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Data directory path (omit for in-memory)")
                    .with_default(""),
            );
        }
        // Client data commands: remote address or local embedded file.
        Some("query") | Some("insert") | Some("get") | Some("delete") | Some("status") => {
            flags.push(
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Server address")
                    .with_default("0.0.0.0:6380"),
            );
            flags.push(
                cli::types::FlagSchema::new("path")
                    .with_short('p')
                    .with_description("Open a local .rdb file in embedded mode"),
            );
        }
        // Admin HTTP client: auth token, filters, confirmation bypass.
        Some("admin") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Server HTTP address")
                    .with_default("127.0.0.1:8080"),
                cli::types::FlagSchema::new("token")
                    .with_short('t')
                    .with_description("Admin bearer token (env: RED_ADMIN_TOKEN)"),
                cli::types::FlagSchema::boolean("csv")
                    .with_description("Emit CSV for tabular admin catalog commands"),
                cli::types::FlagSchema::new("limit")
                    .with_description("Max rows for list/stats/query commands"),
                cli::types::FlagSchema::new("type")
                    .with_description("Filter red.admin collections list by model"),
                cli::types::FlagSchema::boolean("include-internal")
                    .with_description("Include internal collections in admin collections list"),
                cli::types::FlagSchema::new("collection")
                    .with_description("Filter red.admin indices/policies by collection"),
                cli::types::FlagSchema::boolean("yes")
                    .with_short('y')
                    .with_description("Skip interactive confirmation for destructive commands"),
                cli::types::FlagSchema::boolean("if-exists")
                    .with_description("Suppress error when collection does not exist"),
            ]);
        }
        // dump: redefines --output/-o (global one removed above).
        Some("dump") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_description("Local database file to dump from")
                    .with_default("./data/reddb.rdb"),
                cli::types::FlagSchema::new("collection")
                    .with_short('c')
                    .with_description("Single collection to dump"),
                cli::types::FlagSchema::new("output")
                    .with_short('o')
                    .with_description("Destination file"),
            ]);
        }
        Some("restore") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_description("Local database file to restore into")
                    .with_default("./data/reddb.rdb"),
                cli::types::FlagSchema::new("input")
                    .with_short('i')
                    .with_description("Dump file to read"),
                cli::types::FlagSchema::new("collection")
                    .with_short('c')
                    .with_description("Override target collection name"),
            ]);
        }
        // vcs: shared flags for all vcs subcommands (see run_vcs_command).
        Some("vcs") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path (omit for in-memory)"),
                cli::types::FlagSchema::new("connection")
                    .with_short('c')
                    .with_description("Connection id for workset scoping")
                    .with_default("1"),
                cli::types::FlagSchema::new("branch")
                    .with_description("Branch name (log/checkout/merge)"),
                cli::types::FlagSchema::new("from")
                    .with_description("Source ref or commit (branch create / merge)"),
                cli::types::FlagSchema::new("to").with_description("Upper bound for log range"),
                cli::types::FlagSchema::new("author")
                    .with_description("Commit author name")
                    .with_default("reddb"),
                cli::types::FlagSchema::new("email")
                    .with_description("Commit author email")
                    .with_default("reddb@localhost"),
                cli::types::FlagSchema::new("message")
                    .with_short('m')
                    .with_description("Commit message"),
                cli::types::FlagSchema::new("limit")
                    .with_description("Max log entries")
                    .with_default("20"),
                cli::types::FlagSchema::new("mode")
                    .with_description("Reset mode: soft | mixed | hard")
                    .with_default("mixed"),
                cli::types::FlagSchema::boolean("ff-only")
                    .with_description("Merge only if fast-forward"),
                cli::types::FlagSchema::boolean("no-ff")
                    .with_description("Always create a merge commit"),
            ]);
        }
        // Listener probe: which transport to check and where.
        Some("health") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Server bind address; defaults to the router on 127.0.0.1:5050 when no transport is selected"),
                cli::types::FlagSchema::boolean("grpc")
                    .with_description("Probe a gRPC listener (default transport)"),
                cli::types::FlagSchema::boolean("http").with_description("Probe an HTTP listener"),
            ]);
        }
        // Maintenance tick trigger over HTTP.
        Some("tick") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("bind")
                    .with_short('b')
                    .with_description("Server HTTP bind address")
                    .with_default("127.0.0.1:8080"),
                cli::types::FlagSchema::new("operations").with_description(
                    "Comma-separated operations: maintenance,retention,checkpoint",
                ),
                cli::types::FlagSchema::boolean("dry-run")
                    .with_description("Validate operations without applying"),
            ]);
        }
        // Migration dry-run validator (see run_migrate_from_redis_command).
        Some("migrate-from-redis") => {
            flags.extend(vec![
                cli::types::FlagSchema::boolean("dry-run")
                    .with_description("Validate Redis and RedDB connectivity without cache writes"),
                cli::types::FlagSchema::new("redis-url").with_description(
                    "Redis URL to validate, for example redis://127.0.0.1:6379/0",
                ),
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Local RedDB .rdb file to open for connectivity validation"),
                cli::types::FlagSchema::new("phase")
                    .with_description("Migration phase: dry-run | dual-write")
                    .with_default("dry-run"),
                cli::types::FlagSchema::new("namespace")
                    .with_description("Blob Cache namespace recorded in dry-run output")
                    .with_default("redis-migration"),
            ]);
        }
        // JSON-RPC bridge: local embedded or remote gRPC proxy mode.
        Some("rpc") => {
            flags.extend(vec![
                cli::types::FlagSchema::boolean("stdio")
                    .with_description("Speak JSON-RPC 2.0 line-delimited over stdin/stdout"),
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path (omit for in-memory)"),
                cli::types::FlagSchema::new("connect")
                    .with_short('c')
                    .with_description("Proxy to a remote gRPC server (e.g. grpc://host:5055)"),
                cli::types::FlagSchema::new("token")
                    .with_short('t')
                    .with_description("Auth token forwarded to the remote server"),
            ]);
        }
        // Interactive client session.
        Some("connect") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("token")
                    .with_short('t')
                    .with_description("Auth token (session or API key)"),
                cli::types::FlagSchema::new("query")
                    .with_short('q')
                    .with_description("Execute a single query and exit"),
                cli::types::FlagSchema::new("user")
                    .with_short('u')
                    .with_description("Username for login"),
                cli::types::FlagSchema::new("password")
                    .with_short('p')
                    .with_description("Password for login"),
            ]);
        }
        // Initial admin credential bootstrap.
        Some("bootstrap") => {
            flags.extend(vec![
                cli::types::FlagSchema::new("path")
                    .with_short('d')
                    .with_description("Persistent database file path"),
                cli::types::FlagSchema::boolean("vault")
                    .with_description("Required: seal credentials in the encrypted vault"),
                cli::types::FlagSchema::new("username")
                    .with_short('u')
                    .with_description("Admin username (defaults to REDDB_USERNAME)"),
                cli::types::FlagSchema::new("password").with_description(
                    "Admin password (DEV ONLY — leaks to ps; prefer --password-stdin)",
                ),
                cli::types::FlagSchema::boolean("password-stdin")
                    .with_description("Read the admin password from stdin (one line)"),
                cli::types::FlagSchema::boolean("print-certificate")
                    .with_description("Print only the issued certificate to stdout"),
            ]);
        }
        // Unknown or no command: global flags only.
        _ => {}
    }
    flags
}
/// Static command/subcommand tree consumed by shell-completion generation.
///
/// Top-level entries map to `(command, subcommands)`; each subcommand in
/// turn maps to `(name, actions)`. Only `admin` currently has subcommands.
#[allow(clippy::type_complexity)]
fn build_completion_tree() -> Vec<(String, Vec<(String, Vec<String>)>)> {
    // Top-level command with no subcommands.
    fn leaf(name: &str) -> (String, Vec<(String, Vec<String>)>) {
        (name.to_string(), Vec::new())
    }
    // Subcommand entry with its action words.
    fn verbs(name: &str, actions: &[&str]) -> (String, Vec<String>) {
        (
            name.to_string(),
            actions.iter().map(|a| a.to_string()).collect(),
        )
    }
    let mut tree = Vec::with_capacity(15);
    for name in [
        "server",
        "service",
        "replica",
        "query",
        "insert",
        "get",
        "delete",
        "tick",
        "migrate-from-redis",
        "health",
    ] {
        tree.push(leaf(name));
    }
    tree.push((
        "admin".to_string(),
        vec![
            verbs("collections", &["list", "show", "stats", "drop", "truncate"]),
            verbs("indices", &["list"]),
            verbs("policies", &["list"]),
            verbs("query", &[]),
            verbs("cache", &[]),
        ],
    ));
    for name in ["status", "mcp", "connect", "version"] {
        tree.push(leaf(name));
    }
    tree
}
/// Assemble a `ServerCommandConfig` from CLI flags and environment variables.
///
/// Precedence rules implemented here:
/// - Explicit transport binds come from flags first, then `REDDB_*_BIND_ADDR`
///   env vars.
/// - The legacy `--bind` / `REDDB_BIND_ADDR` address is only consulted when
///   no explicit per-transport bind was given.
/// - When no transport is selected at all (no explicit binds, no --grpc /
///   --http flags), the routed front-door is used at the legacy bind or the
///   default router address.
/// - `forced_role` (e.g. from the `replica` command) overrides the --role
///   flag.
fn build_server_config(
    flags: &HashMap<String, FlagValue>,
    forced_role: Option<&str>,
) -> Result<ServerCommandConfig, String> {
    let grpc_flag = flag_bool(flags, "grpc");
    let http_flag = flag_bool(flags, "http");
    let explicit_grpc_bind = flag_string(flags, "grpc-bind")
        .filter(|value| !value.is_empty())
        .or_else(|| env_string("REDDB_GRPC_BIND_ADDR"));
    let explicit_http_bind = flag_string(flags, "http-bind")
        .filter(|value| !value.is_empty())
        .or_else(|| env_string("REDDB_HTTP_BIND_ADDR"));
    // Legacy single-address mode; the env fallback applies only when no
    // explicit per-transport bind was supplied.
    let legacy_bind = flag_string(flags, "bind")
        .filter(|value| !value.is_empty())
        .or_else(|| {
            if explicit_grpc_bind.is_none() && explicit_http_bind.is_none() {
                env_string("REDDB_BIND_ADDR")
            } else {
                None
            }
        });
    let wire_bind_addr = flag_string(flags, "wire-bind").filter(|v| !v.is_empty());
    let wire_tls_bind_addr = flag_string(flags, "wire-tls-bind").filter(|v| !v.is_empty());
    // Routed front-door is the default when nothing selects a transport.
    let router_bind_addr = if explicit_grpc_bind.is_none()
        && explicit_http_bind.is_none()
        && wire_bind_addr.is_none()
        && wire_tls_bind_addr.is_none()
        && !grpc_flag
        && !http_flag
    {
        Some(
            legacy_bind
                .clone()
                .unwrap_or_else(|| reddb::service_cli::DEFAULT_ROUTER_BIND_ADDR.to_string()),
        )
    } else {
        None
    };
    // Router mode and per-transport binds are mutually exclusive.
    let (grpc_bind_addr, http_bind_addr) = if router_bind_addr.is_some() {
        (None, None)
    } else {
        resolve_server_binds(flags)?
    };
    let path = resolve_server_path(flags).map(PathBuf::from);
    let role = forced_role
        .map(|value| value.to_string())
        .or_else(|| flag_string(flags, "role"))
        .unwrap_or_else(|| "standalone".to_string());
    // Unparseable --workers values fall back to auto-detection (None).
    let workers = flag_string(flags, "workers").and_then(|v| v.parse::<usize>().ok());
    let wire_tls_cert = flag_string(flags, "wire-tls-cert")
        .filter(|v| !v.is_empty())
        .map(PathBuf::from);
    let wire_tls_key = flag_string(flags, "wire-tls-key")
        .filter(|v| !v.is_empty())
        .map(PathBuf::from);
    let pg_bind_addr = flag_string(flags, "pg-bind").filter(|v| !v.is_empty());
    let telemetry = build_telemetry_config(flags, path.as_deref());
    // gRPC TLS settings are env-only (no dedicated flags).
    let grpc_tls_bind_addr = std::env::var("REDDB_GRPC_TLS_BIND")
        .ok()
        .filter(|v| !v.trim().is_empty());
    let grpc_tls_cert = std::env::var("REDDB_GRPC_TLS_CERT")
        .ok()
        .filter(|v| !v.trim().is_empty())
        .map(PathBuf::from);
    let grpc_tls_key = std::env::var("REDDB_GRPC_TLS_KEY")
        .ok()
        .filter(|v| !v.trim().is_empty())
        .map(PathBuf::from);
    let grpc_tls_client_ca = std::env::var("REDDB_GRPC_TLS_CLIENT_CA")
        .ok()
        .filter(|v| !v.trim().is_empty())
        .map(PathBuf::from);
    // HTTP TLS settings accept a flag first, then the env var.
    let http_tls_bind_addr = flag_string(flags, "http-tls-bind")
        .filter(|v| !v.is_empty())
        .or_else(|| {
            std::env::var("REDDB_HTTP_TLS_BIND")
                .ok()
                .filter(|v| !v.trim().is_empty())
        });
    let http_tls_cert = flag_string(flags, "http-tls-cert")
        .filter(|v| !v.is_empty())
        .or_else(|| {
            std::env::var("REDDB_HTTP_TLS_CERT")
                .ok()
                .filter(|v| !v.trim().is_empty())
        })
        .map(PathBuf::from);
    let http_tls_key = flag_string(flags, "http-tls-key")
        .filter(|v| !v.is_empty())
        .or_else(|| {
            std::env::var("REDDB_HTTP_TLS_KEY")
                .ok()
                .filter(|v| !v.trim().is_empty())
        })
        .map(PathBuf::from);
    let http_tls_client_ca = flag_string(flags, "http-tls-client-ca")
        .filter(|v| !v.is_empty())
        .or_else(|| {
            std::env::var("REDDB_HTTP_TLS_CLIENT_CA")
                .ok()
                .filter(|v| !v.trim().is_empty())
        })
        .map(PathBuf::from);
    Ok(ServerCommandConfig {
        path,
        router_bind_addr,
        grpc_bind_addr,
        grpc_tls_bind_addr,
        grpc_tls_cert,
        grpc_tls_key,
        grpc_tls_client_ca,
        http_bind_addr,
        http_tls_bind_addr,
        http_tls_cert,
        http_tls_key,
        http_tls_client_ca,
        wire_bind_addr,
        wire_tls_bind_addr,
        wire_tls_cert,
        wire_tls_key,
        pg_bind_addr,
        create_if_missing: !flag_bool(flags, "no-create-if-missing"),
        read_only: flag_bool(flags, "read-only"),
        role,
        primary_addr: flag_string(flags, "primary-addr").filter(|value| !value.is_empty()),
        vault: flag_bool(flags, "vault"),
        workers,
        telemetry: Some(telemetry),
    })
}
/// Build the telemetry configuration for a server process.
///
/// Starts from the path-derived defaults and layers on each explicit
/// logging flag, marking the matching `*_explicit` field so downstream
/// code can distinguish user choices from defaults.
fn build_telemetry_config(
    flags: &HashMap<String, FlagValue>,
    db_path: Option<&std::path::Path>,
) -> reddb::telemetry::TelemetryConfig {
    let mut cfg = reddb::service_cli::default_telemetry_for_path(db_path);
    // Empty flag values are treated the same as absent flags.
    let non_empty = |name: &str| flag_string(flags, name).filter(|v| !v.is_empty());
    if let Some(dir) = non_empty("log-dir") {
        cfg.log_dir = Some(PathBuf::from(dir));
        cfg.log_dir_explicit = true;
    }
    // --no-log-file wins over --log-dir, so it is applied afterwards.
    if flag_bool(flags, "no-log-file") {
        cfg.log_dir = None;
        cfg.log_file_disabled = true;
    }
    if let Some(level) = non_empty("log-level") {
        cfg.level_filter = level;
        cfg.level_explicit = true;
    }
    // An unparseable --log-format is ignored and the default stays.
    if let Some(parsed) =
        non_empty("log-format").and_then(|fmt| reddb::telemetry::LogFormat::parse(&fmt))
    {
        cfg.format = parsed;
        cfg.format_explicit = true;
    }
    if let Some(prefix) = non_empty("log-file-prefix") {
        cfg.file_prefix = prefix;
        cfg.file_prefix_explicit = true;
    }
    if let Some(keep) = flag_string(flags, "log-keep-days").and_then(|v| v.parse::<u16>().ok()) {
        cfg.rotation_keep_days = keep;
        cfg.rotation_keep_days_explicit = true;
    }
    cfg
}
/// Assembles the systemd service configuration from CLI flags.
///
/// When no transport flag (`--grpc`/`--http`) and no explicit transport
/// bind is given, the service runs in "router" mode on `--bind` (or the
/// default router address). Otherwise the explicit gRPC/HTTP binds are
/// resolved via [`resolve_server_binds`]. Paths, user/group, and service
/// name all fall back to fixed defaults when unset or empty.
fn build_systemd_service_config(
    flags: &HashMap<String, FlagValue>,
) -> Result<SystemdServiceConfig, String> {
    // Helper: flag value, treating an empty string the same as "not set".
    let non_empty = |name: &str| flag_string(flags, name).filter(|v| !v.is_empty());
    let wants_grpc = flag_bool(flags, "grpc");
    let wants_http = flag_bool(flags, "http");
    let legacy_bind = non_empty("bind");
    let has_explicit_grpc_bind = non_empty("grpc-bind").is_some();
    let has_explicit_http_bind = non_empty("http-bind").is_some();
    // Router mode applies only when nothing transport-specific was asked for.
    let use_router =
        !wants_grpc && !wants_http && !has_explicit_grpc_bind && !has_explicit_http_bind;
    let router_bind_addr = if use_router {
        Some(match legacy_bind {
            Some(addr) => addr,
            None => reddb::service_cli::DEFAULT_ROUTER_BIND_ADDR.to_string(),
        })
    } else {
        None
    };
    let (grpc_bind_addr, http_bind_addr) = if router_bind_addr.is_some() {
        (None, None)
    } else {
        resolve_server_binds(flags)?
    };
    let path_or = |flag: &str, fallback: &str| {
        non_empty(flag)
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from(fallback))
    };
    let string_or =
        |flag: &str, fallback: &str| non_empty(flag).unwrap_or_else(|| fallback.to_string());
    Ok(SystemdServiceConfig {
        service_name: string_or("service-name", "reddb"),
        binary_path: path_or("binary", "/usr/local/bin/red"),
        run_user: string_or("user", "reddb"),
        run_group: string_or("group", "reddb"),
        data_path: path_or("path", "/var/lib/reddb/data.rdb"),
        router_bind_addr,
        grpc_bind_addr,
        http_bind_addr,
    })
}
/// Resolves the gRPC and HTTP bind addresses for dedicated-transport mode.
///
/// Precedence per transport: explicit flag (`--grpc-bind`/`--http-bind`),
/// then env (`REDDB_GRPC_BIND_ADDR`/`REDDB_HTTP_BIND_ADDR`). The legacy
/// `--bind` flag is mapped onto whichever single transport is enabled;
/// combining it with the explicit binds is an error, as is using it while
/// both `--grpc` and `--http` are requested. If nothing selects any
/// address, gRPC gets its default bind so at least one listener exists.
fn resolve_server_binds(
    flags: &HashMap<String, FlagValue>,
) -> Result<(Option<String>, Option<String>), String> {
    let grpc = flag_bool(flags, "grpc");
    let http = flag_bool(flags, "http");
    let mut grpc_bind = flag_string(flags, "grpc-bind")
        .filter(|value| !value.is_empty())
        .or_else(|| env_string("REDDB_GRPC_BIND_ADDR"));
    let mut http_bind = flag_string(flags, "http-bind")
        .filter(|value| !value.is_empty())
        .or_else(|| env_string("REDDB_HTTP_BIND_ADDR"));
    // The legacy env var is only consulted when neither transport-specific
    // source produced an address, so it cannot silently override them.
    let legacy_bind = flag_string(flags, "bind")
        .filter(|value| !value.is_empty())
        .or_else(|| {
            if grpc_bind.is_none() && http_bind.is_none() {
                env_string("REDDB_BIND_ADDR")
            } else {
                None
            }
        });
    if legacy_bind.is_some() && (grpc_bind.is_some() || http_bind.is_some()) {
        return Err("use either --bind or the explicit --grpc-bind/--http-bind flags".to_string());
    }
    if let Some(bind_addr) = legacy_bind {
        match (grpc, http) {
            // Ambiguous: --bind names one address but two transports want it.
            (true, true) => {
                return Err(
                    "--bind is ambiguous when both --grpc and --http are enabled; use --grpc-bind and --http-bind".to_string(),
                )
            }
            (false, true) => http_bind = Some(bind_addr),
            // Covers (true, false) and (false, false): gRPC is the default.
            _ => grpc_bind = Some(bind_addr),
        }
    } else {
        // Explicitly requested transports get their defaults if no address
        // was supplied for them.
        if grpc {
            grpc_bind.get_or_insert_with(|| ServerTransport::Grpc.default_bind_addr().to_string());
        }
        if http {
            http_bind.get_or_insert_with(|| ServerTransport::Http.default_bind_addr().to_string());
        }
    }
    // Last resort: never return (None, None); default to gRPC.
    if grpc_bind.is_none() && http_bind.is_none() {
        grpc_bind = Some(ServerTransport::Grpc.default_bind_addr().to_string());
    }
    Ok((grpc_bind, http_bind))
}
/// Selects the single server transport from `--grpc`/`--http` flags.
///
/// Exactly one transport may be requested; gRPC is the default when
/// neither flag is given. Both at once is an error.
fn select_transport(flags: &HashMap<String, FlagValue>) -> Result<ServerTransport, String> {
    let grpc_requested = flag_bool(flags, "grpc");
    let http_requested = flag_bool(flags, "http");
    if grpc_requested && http_requested {
        return Err("use only one of --grpc or --http".to_string());
    }
    if http_requested {
        Ok(ServerTransport::Http)
    } else {
        // gRPC both when explicitly requested and as the default.
        Ok(ServerTransport::Grpc)
    }
}
/// Returns whether the named flag is present and truthy; absent flags are false.
fn flag_bool(flags: &HashMap<String, FlagValue>, name: &str) -> bool {
    match flags.get(name) {
        Some(value) => value.is_truthy(),
        None => false,
    }
}
/// Returns the named flag's string value, or `None` when the flag is absent.
fn flag_string(flags: &HashMap<String, FlagValue>, name: &str) -> Option<String> {
    let value = flags.get(name)?;
    Some(value.as_str_value())
}
/// Reads an environment variable, returning `None` when it is unset,
/// not valid Unicode, or empty.
fn env_string(name: &str) -> Option<String> {
    match std::env::var(name) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}
/// Resolves the data-file path for server commands.
///
/// `REDDB_DATA_PATH` may override the `--path` flag, but only when the
/// flag still holds the literal "./data/reddb.rdb" — used here as a
/// sentinel meaning "user did not pass --path explicitly".
/// NOTE(review): the sentinel must stay in sync with the flag's declared
/// default in the CLI definition — confirm.
fn resolve_server_path(flags: &HashMap<String, FlagValue>) -> Option<String> {
    let env_path = env_string("REDDB_DATA_PATH");
    match flag_string(flags, "path").filter(|value| !value.is_empty()) {
        // Default-looking path: prefer the env override when present.
        Some(path) if path == "./data/reddb.rdb" => env_path.or(Some(path)),
        // An explicit user path always wins over the environment.
        Some(path) => Some(path),
        // No flag at all: fall back to the env var (possibly None).
        None => env_path,
    }
}
/// Renders an optional string as a JSON value: a quoted, escaped string
/// when present, the literal `null` otherwise.
fn json_optional_string(value: Option<&str>) -> String {
    if let Some(inner) = value {
        format!("\"{}\"", json_escape(inner))
    } else {
        "null".to_string()
    }
}
/// Renders the startup summary for a server-style command as one compact
/// JSON line: `{"ok":true,"command":...,"data":{...bind addresses...}}`.
///
/// Unset bind addresses become JSON `null` via `json_optional_string`.
fn server_command_json(command: &str, config: &ServerCommandConfig) -> String {
    format!(
        "{{\"ok\":true,\"command\":\"{}\",\"data\":{{\"router_bind\":{},\"grpc_bind\":{},\"http_bind\":{},\"wire_bind\":{}}}}}",
        json_escape(command),
        json_optional_string(config.router_bind_addr.as_deref()),
        json_optional_string(config.grpc_bind_addr.as_deref()),
        json_optional_string(config.http_bind_addr.as_deref()),
        json_optional_string(config.wire_bind_addr.as_deref()),
    )
}
fn build_tick_payload(operations: Option<&str>, dry_run: bool) -> String {
match operations {
None => {
format!("{{\"dry_run\":{}}}", if dry_run { "true" } else { "false" })
}
Some(raw) => {
let normalized: Vec<String> = raw
.split(',')
.map(str::trim)
.filter(|value| !value.is_empty())
.map(json_escape)
.map(|value| format!("\"{value}\""))
.collect();
if normalized.is_empty() {
return format!("{{\"dry_run\":{}}}", if dry_run { "true" } else { "false" });
}
format!(
"{{\"operations\":[{}],\"dry_run\":{}}}",
normalized.join(","),
if dry_run { "true" } else { "false" }
)
}
}
}
/// Escapes `value` for safe embedding inside a double-quoted JSON string.
///
/// Covers everything RFC 8259 requires: backslash, double quote, and all
/// control characters below U+0020 (the common ones get their short
/// escapes, the rest `\u00XX`). The previous version escaped only `\` and
/// `"`, so any input containing a newline or tab — e.g. a multi-line SQL
/// statement passed to `red admin query` — produced an invalid JSON
/// request body.
fn json_escape(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the generic \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}
/// Entry point for `red admin`: resolves connection settings and
/// dispatches to the subcommand handlers.
///
/// The bearer token comes from `--token` or the `RED_ADMIN_TOKEN` env
/// var; whitespace-only tokens are treated as absent. Unknown (or
/// missing) subcommands print usage help.
/// NOTE(review): the help text advertises `REDDB_BIND_ADDR` as an env
/// override for `--bind`, but this function only reads the flag — confirm
/// whether the env var is applied elsewhere.
fn run_admin_command(flags: &HashMap<String, FlagValue>, remaining: &[String]) {
    let json_mode = wants_json(flags);
    let bind = flag_string(flags, "bind").unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let token = flag_string(flags, "token")
        .or_else(|| std::env::var("RED_ADMIN_TOKEN").ok())
        .filter(|t| !t.trim().is_empty());
    // First remaining arg selects the subcommand; the rest are its args.
    let subcommand = remaining.first().map(|s| s.as_str()).unwrap_or("help");
    let args: Vec<&str> = remaining.iter().skip(1).map(|s| s.as_str()).collect();
    match subcommand {
        "cache" => run_admin_cache_command(flags, &bind, token.as_deref(), json_mode, &args),
        "collections" => {
            run_admin_collections_command(flags, &bind, token.as_deref(), json_mode, &args)
        }
        "indices" => run_admin_indices_command(flags, &bind, token.as_deref(), json_mode, &args),
        "policies" => run_admin_policies_command(flags, &bind, token.as_deref(), json_mode, &args),
        "query" => run_admin_query_command(flags, &bind, token.as_deref(), json_mode, &args),
        _ => {
            // Unknown/absent subcommand: emit help (JSON or plain text).
            if json_mode {
                json_ok(
                    "admin",
                    "{\"subcommands\":[\"cache\",\"collections\",\"indices\",\"policies\",\"query\"],\"message\":\"use a subcommand, e.g. red admin collections list\"}",
                );
            } else {
                println!("Usage: red admin <subcommand>");
                println!();
                println!("Subcommands:");
                println!("  cache        Blob cache admin operations");
                println!("  collections  Collection catalog queries via red.collections/red.columns/red.stats");
                println!("  indices      Index catalog queries via red.indices");
                println!("  policies     Policy catalog queries via red.policies");
                println!("  query        Run a native SQL catalog query via /query");
                println!();
                println!("Flags:");
                println!("  --bind <addr>   Server HTTP address (default: 127.0.0.1:8080, env: REDDB_BIND_ADDR)");
                println!("  --token <tok>   Admin bearer token (env: RED_ADMIN_TOKEN)");
                println!("  --json          JSON output");
                println!("  --csv           CSV output for tabular commands");
                println!("  --limit <n>     Limit rows for list/stats/query commands");
                println!("  --no-color      Disable ANSI colors");
            }
        }
    }
}
/// Handles `red admin collections <list|show|stats|drop|truncate>`.
///
/// Read subcommands are translated into SQL against the `red.*` catalog
/// views and sent to the server's `/query` endpoint; `drop`/`truncate`
/// issue DDL. `drop` asks for interactive confirmation unless `--yes`
/// is given or JSON mode is active.
/// NOTE(review): `drop`/`truncate` interpolate the collection name into
/// the DDL as a bare identifier without quoting/escaping — confirm the
/// server-side parser rejects malformed identifiers.
fn run_admin_collections_command(
    flags: &HashMap<String, FlagValue>,
    bind: &str,
    token: Option<&str>,
    json_mode: bool,
    args: &[&str],
) {
    let subcommand = args.first().copied().unwrap_or("help");
    let sub_args: &[&str] = if args.is_empty() { &[] } else { &args[1..] };
    let format = admin_output_format(flags, json_mode);
    let use_color = !flag_bool(flags, "no-color");
    match subcommand {
        "list" => {
            let mut filters = Vec::new();
            // Internal collections are hidden unless explicitly requested.
            if !flag_bool(flags, "include-internal") {
                filters.push("internal = false".to_string());
            }
            if let Some(model) = flag_string(flags, "type").filter(|s| !s.is_empty()) {
                filters.push(format!("model = '{}'", sql_string_literal(&model)));
            }
            let mut sql = "SELECT * FROM red.collections".to_string();
            if !filters.is_empty() {
                sql.push_str(" WHERE ");
                sql.push_str(&filters.join(" AND "));
            }
            push_limit(&mut sql, flags);
            emit_admin_query_result(
                "admin.collections.list",
                bind,
                token,
                &sql,
                format,
                use_color,
            );
        }
        "show" => {
            // admin_usage_error diverges, so the let-else is sound.
            let Some(name) = sub_args.first().copied().filter(|s| !s.is_empty()) else {
                admin_usage_error(
                    "admin.collections.show",
                    "collection name is required",
                    "usage: red admin collections show <name>",
                    json_mode,
                );
            };
            emit_admin_collection_show(bind, token, name, format, use_color);
        }
        "stats" => {
            // Optional collection name narrows the stats query.
            let mut sql = "SELECT * FROM red.stats".to_string();
            if let Some(name) = sub_args.first().copied().filter(|s| !s.is_empty()) {
                sql.push_str(" WHERE collection = '");
                sql.push_str(&sql_string_literal(name));
                sql.push('\'');
            }
            push_limit(&mut sql, flags);
            emit_admin_query_result(
                "admin.collections.stats",
                bind,
                token,
                &sql,
                format,
                use_color,
            );
        }
        "drop" => {
            let Some(name) = sub_args.first().copied().filter(|s| !s.is_empty()) else {
                admin_usage_error(
                    "admin.collections.drop",
                    "collection name is required",
                    "usage: red admin collections drop <name> [--if-exists] [--yes] [--json]",
                    json_mode,
                );
            };
            let if_exists = flag_bool(flags, "if-exists");
            let skip_confirm = flag_bool(flags, "yes");
            // Destructive: require an interactive "yes" unless --yes was
            // passed or we are in non-interactive JSON mode.
            if !skip_confirm && !json_mode {
                eprint!("Drop collection '{name}'? This is irreversible. Type 'yes' to confirm: ");
                let mut answer = String::new();
                std::io::stdin().read_line(&mut answer).unwrap_or_default();
                if answer.trim() != "yes" {
                    eprintln!("Aborted.");
                    std::process::exit(1);
                }
            }
            let sql = if if_exists {
                format!("DROP COLLECTION IF EXISTS {name}")
            } else {
                format!("DROP COLLECTION {name}")
            };
            admin_execute_ddl_command(
                "admin.collections.drop",
                bind,
                token,
                &sql,
                name,
                "dropped",
                json_mode,
            );
        }
        "truncate" => {
            let Some(name) = sub_args.first().copied().filter(|s| !s.is_empty()) else {
                admin_usage_error(
                    "admin.collections.truncate",
                    "collection name is required",
                    "usage: red admin collections truncate <name> [--if-exists] [--json]",
                    json_mode,
                );
            };
            let if_exists = flag_bool(flags, "if-exists");
            let sql = if if_exists {
                format!("TRUNCATE COLLECTION IF EXISTS {name}")
            } else {
                format!("TRUNCATE COLLECTION {name}")
            };
            admin_execute_ddl_command(
                "admin.collections.truncate",
                bind,
                token,
                &sql,
                name,
                "truncated",
                json_mode,
            );
        }
        _ => {
            // Unknown subcommand: print usage help.
            if json_mode {
                json_ok(
                    "admin.collections",
                    "{\"subcommands\":[\"list\",\"show\",\"stats\",\"drop\",\"truncate\"]}",
                );
            } else {
                println!("Usage: red admin collections <list|show|stats|drop|truncate> [args]");
                println!();
                println!("Subcommands:");
                println!("  list [--type table|queue|vector|document|timeseries|graph|kv] [--include-internal]");
                println!("  show <name>");
                println!("  stats [<name>]");
                println!("  drop <name> [--if-exists] [--yes]");
                println!("  truncate <name> [--if-exists]");
                println!();
                println!("Flags: --json --csv --limit <n> --no-color --bind <addr> --token <tok>");
            }
        }
    }
}
/// Handles `red admin indices list [--collection <name>]` by querying the
/// `red.indices` catalog view; anything else prints usage help.
fn run_admin_indices_command(
    flags: &HashMap<String, FlagValue>,
    bind: &str,
    token: Option<&str>,
    json_mode: bool,
    args: &[&str],
) {
    let subcommand = args.first().copied().unwrap_or("help");
    let format = admin_output_format(flags, json_mode);
    let use_color = !flag_bool(flags, "no-color");
    if subcommand == "list" {
        let mut sql = String::from("SELECT * FROM red.indices");
        // Optional collection filter, escaped for a single-quoted literal.
        let collection_filter = flag_string(flags, "collection").filter(|s| !s.is_empty());
        if let Some(collection) = collection_filter {
            sql = format!(
                "{sql} WHERE collection = '{}'",
                sql_string_literal(&collection)
            );
        }
        push_limit(&mut sql, flags);
        emit_admin_query_result("admin.indices.list", bind, token, &sql, format, use_color);
    } else if json_mode {
        json_ok("admin.indices", "{\"subcommands\":[\"list\"]}");
    } else {
        println!("Usage: red admin indices list [--collection <name>]");
        println!("Flags: --json --csv --limit <n> --no-color --bind <addr> --token <tok>");
    }
}
/// Handles `red admin policies list [--collection <name>]` by querying the
/// `red.policies` catalog view; anything else prints usage help.
fn run_admin_policies_command(
    flags: &HashMap<String, FlagValue>,
    bind: &str,
    token: Option<&str>,
    json_mode: bool,
    args: &[&str],
) {
    let subcommand = args.first().copied().unwrap_or("help");
    let format = admin_output_format(flags, json_mode);
    let use_color = !flag_bool(flags, "no-color");
    if subcommand == "list" {
        let mut sql = String::from("SELECT * FROM red.policies");
        // Optional collection filter, escaped for a single-quoted literal.
        let collection_filter = flag_string(flags, "collection").filter(|s| !s.is_empty());
        if let Some(collection) = collection_filter {
            sql = format!(
                "{sql} WHERE collection = '{}'",
                sql_string_literal(&collection)
            );
        }
        push_limit(&mut sql, flags);
        emit_admin_query_result("admin.policies.list", bind, token, &sql, format, use_color);
    } else if json_mode {
        json_ok("admin.policies", "{\"subcommands\":[\"list\"]}");
    } else {
        println!("Usage: red admin policies list [--collection <name>]");
        println!("Flags: --json --csv --limit <n> --no-color --bind <addr> --token <tok>");
    }
}
fn run_admin_query_command(
flags: &HashMap<String, FlagValue>,
bind: &str,
token: Option<&str>,
json_mode: bool,
args: &[&str],
) {
let Some(sql) = args.first().copied().filter(|s| !s.is_empty()) else {
admin_usage_error(
"admin.query",
"SQL argument is required",
"usage: red admin query \"SELECT * FROM red.collections\"",
json_mode,
);
};
let mut sql = sql.to_string();
push_limit(&mut sql, flags);
emit_admin_query_result(
"admin.query",
bind,
token,
&sql,
admin_output_format(flags, json_mode),
!flag_bool(flags, "no-color"),
);
}
/// Output format for `red admin` tabular results.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AdminOutputFormat {
    /// Human-readable aligned table (the default).
    Text,
    /// Compact JSON.
    Json,
    /// Comma-separated values with quoting for embedded delimiters.
    Csv,
}
/// Tabular result of an admin catalog query.
#[derive(Debug, Clone)]
struct AdminQueryTable {
    /// Column names as reported by the server; may be empty, in which
    /// case renderers infer columns from the row keys.
    columns: Vec<String>,
    /// One map per row: column name -> JSON cell value.
    rows: Vec<BTreeMap<String, reddb::json::Value>>,
}
/// Picks the output format: JSON mode wins, then `--csv` (or
/// `--output csv`), and plain text otherwise.
fn admin_output_format(flags: &HashMap<String, FlagValue>, json_mode: bool) -> AdminOutputFormat {
    if json_mode {
        return AdminOutputFormat::Json;
    }
    let csv_requested =
        flag_bool(flags, "csv") || flag_string(flags, "output").as_deref() == Some("csv");
    if csv_requested {
        AdminOutputFormat::Csv
    } else {
        AdminOutputFormat::Text
    }
}
/// Reports a CLI usage error for `command`, prints the usage string, and
/// exits with status 1; never returns.
///
/// NOTE(review): in JSON mode `json_error` runs first and the plain-text
/// lines still follow — confirm whether `json_error` itself exits, making
/// them unreachable in that mode.
fn admin_usage_error(command: &str, message: &str, usage: &str, json_mode: bool) -> ! {
    if json_mode {
        json_error(command, message);
    }
    eprintln!("error: {message}");
    eprintln!("{usage}");
    std::process::exit(1);
}
/// Escapes a string for use inside a single-quoted SQL literal by
/// doubling embedded single quotes (the surrounding quotes are added by
/// the caller).
fn sql_string_literal(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
/// Appends ` LIMIT <n>` from the `--limit` flag, unless the flag is
/// absent/unparseable or the statement already contains a limit clause
/// (case-insensitive check for " limit ").
fn push_limit(sql: &mut String, flags: &HashMap<String, FlagValue>) {
    let parsed_limit = flag_string(flags, "limit")
        .filter(|s| !s.is_empty())
        .and_then(|s| s.parse::<u64>().ok());
    let Some(limit) = parsed_limit else {
        return;
    };
    let already_limited = sql.to_ascii_lowercase().contains(" limit ");
    if !already_limited {
        sql.push_str(&format!(" LIMIT {limit}"));
    }
}
/// Runs `sql` against the server and prints the result in the requested
/// format; on failure prints the error (as JSON first, in JSON mode) and
/// exits with status 1.
fn emit_admin_query_result(
    command: &str,
    bind: &str,
    token: Option<&str>,
    sql: &str,
    format: AdminOutputFormat,
    use_color: bool,
) {
    match admin_query_table(bind, token, sql) {
        Ok(table) => match format {
            AdminOutputFormat::Json => println!("{}", admin_rows_json(&table.rows)),
            // CSV/text renderers already end with a newline, so print!.
            AdminOutputFormat::Csv => print!("{}", format_admin_csv(&table)),
            AdminOutputFormat::Text => print!("{}", format_admin_table(&table, use_color)),
        },
        Err(err) => {
            if format == AdminOutputFormat::Json {
                json_error(command, &err);
            }
            eprintln!("error: {err}");
            std::process::exit(1);
        }
    }
}
/// Shows a single collection by querying five catalog views (collection,
/// schema, indices, policies, stats) for `name` and rendering them as
/// labeled sections in the requested format.
///
/// Any single query failure aborts the whole command with exit status 1.
fn emit_admin_collection_show(
    bind: &str,
    token: Option<&str>,
    name: &str,
    format: AdminOutputFormat,
    use_color: bool,
) {
    // Escape once; the same literal is embedded in every per-view query.
    let escaped = sql_string_literal(name);
    let queries = [
        (
            "collection",
            format!("SELECT * FROM red.collections WHERE name = '{escaped}'"),
        ),
        (
            "schema",
            format!("SELECT * FROM red.columns WHERE collection = '{escaped}'"),
        ),
        (
            "indices",
            format!("SELECT * FROM red.indices WHERE collection = '{escaped}'"),
        ),
        (
            "policies",
            format!("SELECT * FROM red.policies WHERE collection = '{escaped}'"),
        ),
        (
            "stats",
            format!("SELECT * FROM red.stats WHERE collection = '{escaped}'"),
        ),
    ];
    let mut sections = Vec::new();
    for (label, sql) in queries {
        match admin_query_table(bind, token, &sql) {
            Ok(table) => sections.push((label.to_string(), table)),
            Err(err) => {
                // In JSON mode emit a machine-readable error first.
                if format == AdminOutputFormat::Json {
                    json_error("admin.collections.show", &err);
                }
                eprintln!("error: {err}");
                std::process::exit(1);
            }
        }
    }
    match format {
        AdminOutputFormat::Json => println!("{}", admin_sections_json(&sections)),
        AdminOutputFormat::Csv => print!("{}", format_admin_sections_csv(&sections)),
        AdminOutputFormat::Text => {
            print!("{}", format_admin_sections_text(name, &sections, use_color))
        }
    }
}
/// Wraps `sql` in the `/query` JSON envelope, POSTs it to the server, and
/// parses the reply into an [`AdminQueryTable`].
fn admin_query_table(
    bind: &str,
    token: Option<&str>,
    sql: &str,
) -> Result<AdminQueryTable, String> {
    let request_body = format!("{{\"query\":\"{}\"}}", json_escape(sql));
    let response_body = post_json_to_http_authed(bind, "/query", &request_body, token)?;
    admin_table_from_query_response(&response_body)
}
/// Executes a DDL statement (DROP/TRUNCATE) via `/query` and reports the
/// outcome; on failure exits with status 1.
///
/// NOTE(review): `command` and `name` are interpolated into the JSON
/// success line without `json_escape` — fine for the fixed command ids
/// used today, but a collection name containing `"` would break the
/// output; confirm names are validated upstream.
fn admin_execute_ddl_command(
    command: &str,
    bind: &str,
    token: Option<&str>,
    sql: &str,
    name: &str,
    verb: &str,
    json_mode: bool,
) {
    let payload = format!("{{\"query\":\"{}\"}}", json_escape(sql));
    match post_json_to_http_authed(bind, "/query", &payload, token) {
        Ok(body) => {
            // Best-effort row count; 0 when the response lacks the field.
            let affected = parse_affected_rows_from_body(&body);
            if json_mode {
                println!(
                    "{{\"ok\":true,\"command\":\"{command}\",\"collection\":\"{name}\",\"affected_rows\":{affected}}}"
                );
            } else {
                println!("Collection '{name}' {verb}. ({affected} rows affected)");
            }
        }
        Err(err) => {
            if json_mode {
                json_error(command, &err);
            }
            eprintln!("error: {err}");
            std::process::exit(1);
        }
    }
}
/// Extracts `affected_rows` from a `/query` response body, returning 0
/// when the body is not JSON or lacks a numeric `affected_rows` field.
fn parse_affected_rows_from_body(body: &str) -> u64 {
    let Ok(parsed) = reddb::json::from_str::<reddb::json::Value>(body) else {
        return 0;
    };
    parsed
        .get("affected_rows")
        .and_then(|count| count.as_f64())
        // Truncating cast mirrors the server's numeric JSON representation.
        .map_or(0, |count| count as u64)
}
/// Parses a `/query` JSON response into an [`AdminQueryTable`].
///
/// Expects `{"result":{"columns":[...],"records":[{"values":{...}}, ...]}}`.
/// Non-string column entries are silently skipped; records without an
/// object-valued `values` field become empty rows.
fn admin_table_from_query_response(body: &str) -> Result<AdminQueryTable, String> {
    let parsed: reddb::json::Value =
        reddb::json::from_str(body).map_err(|err| format!("invalid query JSON response: {err}"))?;
    let result = parsed
        .get("result")
        .ok_or_else(|| "query response missing result".to_string())?;
    let columns = result
        .get("columns")
        .and_then(|v| v.as_array())
        .ok_or_else(|| "query response missing result.columns".to_string())?
        .iter()
        .filter_map(|value| value.as_str().map(ToString::to_string))
        .collect::<Vec<_>>();
    let records = result
        .get("records")
        .and_then(|v| v.as_array())
        .ok_or_else(|| "query response missing result.records".to_string())?;
    let rows = records
        .iter()
        .map(|record| {
            record
                .get("values")
                .and_then(|values| values.as_object())
                .cloned()
                // Malformed record: keep an empty row rather than failing.
                .unwrap_or_default()
        })
        .collect::<Vec<_>>();
    Ok(AdminQueryTable { columns, rows })
}
/// Serializes the result rows as a compact JSON array of objects.
fn admin_rows_json(rows: &[BTreeMap<String, reddb::json::Value>]) -> String {
    let mut values = Vec::with_capacity(rows.len());
    for row in rows {
        values.push(reddb::json::Value::Object(row.clone()));
    }
    reddb::json::Value::Array(values).to_string_compact()
}
/// Serializes labeled result sections as one compact JSON object mapping
/// each section name to its array of row objects.
fn admin_sections_json(sections: &[(String, AdminQueryTable)]) -> String {
    let object: BTreeMap<String, reddb::json::Value> = sections
        .iter()
        .map(|(name, table)| {
            let rows: Vec<_> = table
                .rows
                .iter()
                .map(|row| reddb::json::Value::Object(row.clone()))
                .collect();
            (name.clone(), reddb::json::Value::Array(rows))
        })
        .collect();
    reddb::json::Value::Object(object).to_string_compact()
}
/// Renders a query result as an aligned plain-text table with a header
/// row, a dashed separator, and one line per row.
///
/// Column widths are the max of the header and every cell's display
/// width. Note the header padding is applied to the (possibly
/// ANSI-colored) label, so escape codes count toward its width — the
/// header column therefore ends up visually wider when colors are on.
fn format_admin_table(table: &AdminQueryTable, use_color: bool) -> String {
    if table.rows.is_empty() {
        return "(no rows)\n".to_string();
    }
    // Fall back to row keys when the server sent no column list.
    let columns = if table.columns.is_empty() {
        infer_admin_columns(&table.rows)
    } else {
        table.columns.clone()
    };
    let widths = admin_column_widths(&columns, &table.rows);
    let mut out = String::new();
    for (i, column) in columns.iter().enumerate() {
        if i > 0 {
            out.push_str("  ");
        }
        let label = if use_color {
            // Bold cyan header label.
            format!("\x1b[36;1m{column}\x1b[0m")
        } else {
            column.clone()
        };
        out.push_str(&format!("{label:<width$}", width = widths[i]));
    }
    out.push('\n');
    // Dashed separator sized to each column.
    out.push_str(
        &widths
            .iter()
            .map(|width| "-".repeat(*width))
            .collect::<Vec<_>>()
            .join("  "),
    );
    out.push('\n');
    for row in &table.rows {
        for (i, column) in columns.iter().enumerate() {
            if i > 0 {
                out.push_str("  ");
            }
            // Missing cells render as an empty, still-padded field.
            let value = row
                .get(column)
                .map(admin_json_value_display)
                .unwrap_or_default();
            out.push_str(&format!("{value:<width$}", width = widths[i]));
        }
        out.push('\n');
    }
    out
}
/// Renders the `collections show` sections as text: a "Collection:"
/// header followed by each section title (magenta when colored) and its
/// table.
fn format_admin_sections_text(
    collection: &str,
    sections: &[(String, AdminQueryTable)],
    use_color: bool,
) -> String {
    let mut rendered = format!("Collection: {collection}\n");
    for (name, table) in sections {
        // Blank line separates sections.
        rendered.push('\n');
        if use_color {
            rendered.push_str(&format!("\x1b[35;1m{name}\x1b[0m"));
        } else {
            rendered.push_str(name);
        }
        rendered.push('\n');
        rendered.push_str(&format_admin_table(table, use_color));
    }
    rendered
}
fn format_admin_csv(table: &AdminQueryTable) -> String {
let columns = if table.columns.is_empty() {
infer_admin_columns(&table.rows)
} else {
table.columns.clone()
};
let mut out = String::new();
out.push_str(
&columns
.iter()
.map(|value| csv_escape(value))
.collect::<Vec<_>>()
.join(","),
);
out.push('\n');
for row in &table.rows {
out.push_str(
&columns
.iter()
.map(|column| {
row.get(column)
.map(admin_json_value_display)
.map(|value| csv_escape(&value))
.unwrap_or_default()
})
.collect::<Vec<_>>()
.join(","),
);
out.push('\n');
}
out
}
/// Renders the `collections show` sections as CSV, one header+rows group
/// per section, with an extra leading "section" column carrying the
/// section name on every data row.
///
/// NOTE(review): each section repeats its own header line, so the output
/// is several concatenated CSV tables rather than one uniform file —
/// confirm consumers expect that.
fn format_admin_sections_csv(sections: &[(String, AdminQueryTable)]) -> String {
    let mut out = String::new();
    for (section, table) in sections {
        let columns = if table.columns.is_empty() {
            infer_admin_columns(&table.rows)
        } else {
            table.columns.clone()
        };
        // Per-section header: "section" plus the table's columns.
        out.push_str("section,");
        out.push_str(
            &columns
                .iter()
                .map(|value| csv_escape(value))
                .collect::<Vec<_>>()
                .join(","),
        );
        out.push('\n');
        for row in &table.rows {
            out.push_str(&csv_escape(section));
            out.push(',');
            out.push_str(
                &columns
                    .iter()
                    .map(|column| {
                        row.get(column)
                            .map(admin_json_value_display)
                            .map(|value| csv_escape(&value))
                            .unwrap_or_default()
                    })
                    .collect::<Vec<_>>()
                    .join(","),
            );
            out.push('\n');
        }
    }
    out
}
fn infer_admin_columns(rows: &[BTreeMap<String, reddb::json::Value>]) -> Vec<String> {
let mut columns = Vec::new();
for row in rows {
for key in row.keys() {
if !columns.contains(key) {
columns.push(key.clone());
}
}
}
columns
}
/// Computes each column's display width: the maximum of the header
/// length and every cell's rendered length (missing cells ignored).
fn admin_column_widths(
    columns: &[String],
    rows: &[BTreeMap<String, reddb::json::Value>],
) -> Vec<usize> {
    let mut widths = Vec::with_capacity(columns.len());
    for column in columns {
        let mut width = column.len();
        for row in rows {
            if let Some(value) = row.get(column) {
                width = width.max(admin_json_value_display(value).len());
            }
        }
        widths.push(width);
    }
    widths
}
/// Renders a JSON value as a single table/CSV cell.
///
/// Null becomes an empty cell; whole numbers drop the decimal point;
/// arrays and objects are shown as compact JSON.
/// NOTE(review): `*n as i64` truncates/saturates for magnitudes outside
/// the i64 range — confirm catalog numbers stay within it.
fn admin_json_value_display(value: &reddb::json::Value) -> String {
    match value {
        reddb::json::Value::Null => "".to_string(),
        reddb::json::Value::String(s) => s.clone(),
        reddb::json::Value::Bool(b) => b.to_string(),
        reddb::json::Value::Number(n) if n.fract() == 0.0 => (*n as i64).to_string(),
        reddb::json::Value::Number(n) => n.to_string(),
        reddb::json::Value::Array(_) | reddb::json::Value::Object(_) => value.to_string_compact(),
    }
}
/// Escapes a CSV field: values containing a comma, quote, or line break
/// are wrapped in double quotes with embedded quotes doubled; all other
/// values pass through unchanged.
fn csv_escape(value: &str) -> String {
    let needs_quoting = value
        .chars()
        .any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return value.to_string();
    }
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for c in value.chars() {
        if c == '"' {
            quoted.push_str("\"\"");
        } else {
            quoted.push(c);
        }
    }
    quoted.push('"');
    quoted
}
/// Handles `red admin cache <stats|flush-namespace|sweep|compare-and-set>`
/// against the server's blob-cache admin HTTP endpoints.
///
/// Every failure path prints an error (as JSON first, in JSON mode) and
/// exits with status 1.
/// NOTE(review): compare-and-set posts to `/admin/cache/compare-and-set`
/// while the other subcommands use `/admin/blob_cache/...` — confirm the
/// differing path prefix is intentional.
fn run_admin_cache_command(
    _flags: &HashMap<String, FlagValue>,
    bind: &str,
    token: Option<&str>,
    json_mode: bool,
    args: &[&str],
) {
    let subcommand = args.first().copied().unwrap_or("help");
    let sub_args: &[&str] = if args.is_empty() { &[] } else { &args[1..] };
    match subcommand {
        "stats" => {
            let path = "/admin/blob_cache/stats";
            match get_from_http(bind, path, token) {
                // 2xx/3xx: print body raw (JSON) or as a formatted table.
                Ok((status, body)) if status < 400 => {
                    if json_mode {
                        print!("{body}");
                    } else {
                        print_cache_stats_pretty(&body);
                    }
                }
                // HTTP-level error from the server.
                Ok((status, body)) => {
                    if json_mode {
                        json_error(
                            "admin.cache.stats",
                            &format!("server returned {status}: {body}"),
                        );
                    }
                    eprintln!("error: server returned {status}: {body}");
                    std::process::exit(1);
                }
                // Transport-level error (connect/read failure).
                Err(err) => {
                    if json_mode {
                        json_error("admin.cache.stats", &err);
                    }
                    eprintln!("error: {err}");
                    std::process::exit(1);
                }
            }
        }
        "flush-namespace" => {
            let ns = sub_args.first().copied().unwrap_or("");
            if ns.is_empty() {
                if json_mode {
                    json_error(
                        "admin.cache.flush-namespace",
                        "namespace argument is required",
                    );
                }
                eprintln!("error: namespace argument is required");
                eprintln!("usage: red admin cache flush-namespace <namespace>");
                std::process::exit(1);
            }
            let payload = format!("{{\"namespace\":\"{}\"}}", json_escape(ns));
            match post_json_to_http_authed(
                bind,
                "/admin/blob_cache/flush_namespace",
                &payload,
                token,
            ) {
                Ok(body) => {
                    if json_mode {
                        print!("{body}");
                    } else {
                        println!("flushed namespace: {ns}");
                        println!("{body}");
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("admin.cache.flush-namespace", &err);
                    }
                    eprintln!("error: {err}");
                    std::process::exit(1);
                }
            }
        }
        "sweep" => {
            // Both limits are optional flag pairs parsed positionally from
            // sub_args (not via the global flag map).
            let limit_entries: Option<u64> = sub_args
                .windows(2)
                .find(|w| w[0] == "--limit-entries")
                .and_then(|w| w[1].parse().ok());
            let limit_millis: Option<u64> = sub_args
                .windows(2)
                .find(|w| w[0] == "--limit-millis")
                .and_then(|w| w[1].parse().ok());
            let payload = match (limit_entries, limit_millis) {
                (None, None) => "{}".to_string(),
                (Some(e), None) => format!("{{\"limit_entries\":{e}}}"),
                (None, Some(m)) => format!("{{\"limit_millis\":{m}}}"),
                (Some(e), Some(m)) => format!("{{\"limit_entries\":{e},\"limit_millis\":{m}}}"),
            };
            match post_json_to_http_authed(bind, "/admin/blob_cache/sweep", &payload, token) {
                Ok(body) => {
                    if json_mode {
                        print!("{body}");
                    } else {
                        println!("sweep complete");
                        println!("{body}");
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("admin.cache.sweep", &err);
                    }
                    eprintln!("error: {err}");
                    std::process::exit(1);
                }
            }
        }
        "compare-and-set" => {
            // Positional `--name value` parsing over this subcommand's args.
            let get_flag = |name: &str| -> Option<&str> {
                sub_args.windows(2).find(|w| w[0] == name).map(|w| w[1])
            };
            let ns = match get_flag("--namespace") {
                Some(v) => v,
                None => {
                    if json_mode {
                        json_error("admin.cache.compare-and-set", "--namespace is required");
                    }
                    eprintln!("error: --namespace is required");
                    std::process::exit(1);
                }
            };
            let key = match get_flag("--key") {
                Some(v) => v,
                None => {
                    if json_mode {
                        json_error("admin.cache.compare-and-set", "--key is required");
                    }
                    eprintln!("error: --key is required");
                    std::process::exit(1);
                }
            };
            let new_version: u64 = match get_flag("--new-version").and_then(|v| v.parse().ok()) {
                Some(v) => v,
                None => {
                    if json_mode {
                        json_error(
                            "admin.cache.compare-and-set",
                            "--new-version (u64) is required",
                        );
                    }
                    eprintln!("error: --new-version (u64) is required");
                    std::process::exit(1);
                }
            };
            let value_path = match get_flag("--value") {
                Some(v) => v,
                None => {
                    if json_mode {
                        json_error("admin.cache.compare-and-set", "--value <file> is required");
                    }
                    eprintln!("error: --value <file> is required");
                    std::process::exit(1);
                }
            };
            // New value is read from a file and shipped base64-encoded.
            let raw_bytes = match std::fs::read(value_path) {
                Ok(b) => b,
                Err(err) => {
                    if json_mode {
                        json_error(
                            "admin.cache.compare-and-set",
                            &format!("failed to read {value_path}: {err}"),
                        );
                    }
                    eprintln!("error: failed to read {value_path}: {err}");
                    std::process::exit(1);
                }
            };
            let b64 = bytes_to_base64(&raw_bytes);
            // Optional optimistic-concurrency field, appended only when
            // a parseable u64 was supplied.
            let expected_version_field = get_flag("--expected-version")
                .and_then(|v| v.parse::<u64>().ok())
                .map(|v| format!(",\"expected_version\":{v}"))
                .unwrap_or_default();
            let payload = format!(
                "{{\"namespace\":\"{ns}\",\"key\":\"{key}\",\"new_value_b64\":\"{b64}\",\"new_version\":{new_version}{expected_version_field}}}",
                ns = json_escape(ns),
                key = json_escape(key),
            );
            match post_json_to_http_authed(bind, "/admin/cache/compare-and-set", &payload, token) {
                Ok(body) => {
                    if json_mode {
                        print!("{body}");
                    } else {
                        println!("{body}");
                    }
                }
                Err(err) => {
                    if json_mode {
                        json_error("admin.cache.compare-and-set", &err);
                    }
                    eprintln!("error: {err}");
                    std::process::exit(1);
                }
            }
        }
        _ => {
            // Unknown subcommand: print usage help.
            if json_mode {
                json_ok(
                    "admin.cache",
                    "{\"subcommands\":[\"stats\",\"flush-namespace\",\"sweep\",\"compare-and-set\"]}",
                );
            } else {
                println!("Usage: red admin cache <subcommand>");
                println!();
                println!("Subcommands:");
                println!("  stats                      GET  /admin/blob_cache/stats");
                println!("  flush-namespace <ns>       POST /admin/blob_cache/flush_namespace");
                println!("  sweep [--limit-entries N]  POST /admin/blob_cache/sweep");
                println!("        [--limit-millis N]");
                println!("  compare-and-set            POST /admin/cache/compare-and-set");
                println!("      --namespace ns --key k");
                println!("      --new-version V --value <file>");
                println!("      [--expected-version V]");
                println!();
                println!("Env vars:");
                println!("  REDDB_BIND_ADDR   Server HTTP address (overrides --bind)");
                println!("  RED_ADMIN_TOKEN   Admin bearer token (overrides --token)");
            }
        }
    }
}
/// Formats the blob-cache stats JSON body as a two-column metric/value
/// table; falls back to echoing the raw body (newline-terminated) when it
/// is not a JSON object.
///
/// Only the known metric keys listed below are shown, in this fixed
/// order; absent keys are skipped.
fn format_cache_stats_pretty(body: &str) -> String {
    const FIELDS: &[(&str, &str)] = &[
        ("hits", "Hits"),
        ("misses", "Misses"),
        ("insertions", "Insertions"),
        ("evictions", "Evictions"),
        ("expirations", "Expirations"),
        ("invalidations", "Invalidations"),
        ("namespace_flushes", "Namespace flushes"),
        ("version_mismatches", "Version mismatches"),
        ("entries", "Entries"),
        ("bytes_in_use", "L1 bytes in use"),
        ("l1_bytes_max", "L1 bytes max"),
        ("l2_bytes_in_use", "L2 bytes in use"),
        ("l2_bytes_max", "L2 bytes max"),
        ("namespaces", "Namespaces"),
        ("max_namespaces", "Max namespaces"),
        ("l2_compression_ratio_observed", "L2 compression ratio"),
        ("l2_bytes_saved_total", "L2 bytes saved total"),
    ];
    let parsed: Option<reddb::json::Value> = reddb::json::from_str(body).ok();
    let Some(stats) = parsed.as_ref().and_then(|v| v.as_object()) else {
        return format!("{body}\n");
    };
    let mut rendered = format!("{:<30} {}\n{}\n", "Metric", "Value", "-".repeat(50));
    for (key, label) in FIELDS {
        if let Some(value) = stats.get(*key) {
            rendered.push_str(&format!("{:<30} {}\n", label, value));
        }
    }
    rendered
}
/// Prints the blob-cache stats body to stdout via
/// [`format_cache_stats_pretty`] (output is already newline-terminated).
fn print_cache_stats_pretty(body: &str) {
    print!("{}", format_cache_stats_pretty(body));
}
/// Encodes `data` as standard base64 (RFC 4648 alphabet, `=` padding).
///
/// Processes the input in 3-byte groups, each producing four output
/// characters; a trailing 1- or 2-byte group is zero-extended and padded
/// with `=` so the output length is always a multiple of four.
fn bytes_to_base64(data: &[u8]) -> String {
    const CHARS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);
    for chunk in data.chunks(3) {
        // Pack up to three bytes into a 24-bit group (missing bytes = 0).
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let group = (b0 << 16) | (b1 << 8) | b2;
        encoded.push(CHARS[((group >> 18) & 0x3F) as usize] as char);
        encoded.push(CHARS[((group >> 12) & 0x3F) as usize] as char);
        if chunk.len() > 1 {
            encoded.push(CHARS[((group >> 6) & 0x3F) as usize] as char);
        } else {
            encoded.push('=');
        }
        if chunk.len() > 2 {
            encoded.push(CHARS[(group & 0x3F) as usize] as char);
        } else {
            encoded.push('=');
        }
    }
    encoded
}
/// POSTs a JSON payload to `path` on the server over a raw HTTP/1.1 TCP
/// connection, optionally with a bearer-token Authorization header.
///
/// Uses `Connection: close` and reads until EOF, so no chunked/keep-alive
/// handling is needed. Returns the response body on status < 400, or an
/// error string containing the status and body otherwise. Read/write
/// timeouts are 10 seconds each.
fn post_json_to_http_authed(
    bind_addr: &str,
    path: &str,
    payload: &str,
    token: Option<&str>,
) -> Result<String, String> {
    // Accept URL-ish inputs by stripping a scheme prefix; the remainder
    // must be a host:port address for TcpStream::connect.
    let bind_addr = bind_addr
        .trim()
        .trim_start_matches("http://")
        .trim_start_matches("https://");
    let mut stream = TcpStream::connect(bind_addr)
        .map_err(|err| format!("failed to connect to {bind_addr}: {err}"))?;
    stream
        .set_read_timeout(Some(Duration::from_secs(10)))
        .map_err(|err| format!("failed to set read timeout: {err}"))?;
    stream
        .set_write_timeout(Some(Duration::from_secs(10)))
        .map_err(|err| format!("failed to set write timeout: {err}"))?;
    // Header line (including trailing CRLF) only when a token is present.
    let auth_line = token
        .map(|t| format!("Authorization: Bearer {t}\r\n"))
        .unwrap_or_default();
    let request = format!(
        "POST {path} HTTP/1.1\r\nHost: {host}\r\n{auth_line}Content-Type: application/json\r\nContent-Length: {len}\r\nConnection: close\r\n\r\n{payload}",
        path = path,
        host = bind_addr,
        len = payload.len(),
    );
    stream
        .write_all(request.as_bytes())
        .map_err(|err| format!("failed to write request: {err}"))?;
    let mut response = String::new();
    stream
        .read_to_string(&mut response)
        .map_err(|err| format!("failed to read response: {err}"))?;
    // Status code is the second whitespace-separated token of line one,
    // e.g. "HTTP/1.1 200 OK".
    let status_line = response
        .lines()
        .next()
        .ok_or_else(|| "empty response from server".to_string())?;
    let status = status_line
        .split_whitespace()
        .nth(1)
        .and_then(|v| v.parse::<u16>().ok())
        .ok_or_else(|| format!("invalid response status line: {status_line}"))?;
    // Body starts after the blank line separating headers from content.
    let body = response
        .split_once("\r\n\r\n")
        .map(|(_, b)| b.to_string())
        .unwrap_or_default();
    if status >= 400 {
        Err(format!("server responded with {status}: {body}"))
    } else {
        Ok(body)
    }
}
/// Send an unauthenticated JSON `POST` to `path` on the server at
/// `bind_addr` and return the response body.
///
/// An `http://`/`https://` prefix on `bind_addr` is stripped; the connection
/// is always plain TCP. Read/write timeouts of 10 seconds are applied.
/// Responses with status >= 400 become `Err` carrying status and body.
fn post_json_to_http(bind_addr: &str, path: &str, payload: &str) -> Result<String, String> {
    let host = bind_addr
        .trim()
        .trim_start_matches("http://")
        .trim_start_matches("https://");
    let mut stream = TcpStream::connect(host)
        .map_err(|err| format!("failed to connect to {host}: {err}"))?;
    stream
        .set_read_timeout(Some(Duration::from_secs(10)))
        .map_err(|err| format!("failed to set read timeout: {err}"))?;
    stream
        .set_write_timeout(Some(Duration::from_secs(10)))
        .map_err(|err| format!("failed to set write timeout: {err}"))?;
    // Hand-rolled HTTP/1.1 request; `Connection: close` lets us read the
    // whole response until EOF instead of parsing Content-Length.
    let request = format!(
        "POST {path} HTTP/1.1\r\nHost: {host}\r\nContent-Type: application/json\r\nContent-Length: {len}\r\nConnection: close\r\n\r\n{payload}",
        len = payload.len(),
    );
    stream
        .write_all(request.as_bytes())
        .map_err(|err| format!("failed to write request: {err}"))?;
    let mut response = String::new();
    stream
        .read_to_string(&mut response)
        .map_err(|err| format!("failed to read response: {err}"))?;
    let status_line = response
        .lines()
        .next()
        .ok_or_else(|| "empty response from server".to_string())?;
    let status: u16 = status_line
        .split_whitespace()
        .nth(1)
        .and_then(|code| code.parse().ok())
        .ok_or_else(|| format!("invalid response status line: {status_line}"))?;
    let body = match response.split_once("\r\n\r\n") {
        Some((_headers, rest)) => rest.to_string(),
        None => String::new(),
    };
    if status < 400 {
        Ok(body)
    } else {
        Err(format!("server responded with {status}: {body}"))
    }
}
/// Issue a `GET` for `path` on the server at `bind_addr`, optionally with a
/// bearer `token`, and return `(status, body)`.
///
/// Unlike the POST helpers, error statuses are NOT mapped to `Err`; the
/// caller inspects the status itself. 5-second socket timeouts are applied.
fn get_from_http(
    bind_addr: &str,
    path: &str,
    token: Option<&str>,
) -> Result<(u16, String), String> {
    let host = bind_addr
        .trim()
        .trim_start_matches("http://")
        .trim_start_matches("https://");
    let mut stream = TcpStream::connect(host)
        .map_err(|err| format!("failed to connect to {host}: {err}"))?;
    stream
        .set_read_timeout(Some(Duration::from_secs(5)))
        .map_err(|err| format!("failed to set read timeout: {err}"))?;
    stream
        .set_write_timeout(Some(Duration::from_secs(5)))
        .map_err(|err| format!("failed to set write timeout: {err}"))?;
    // `Connection: close` lets us read the full response until EOF.
    let mut request = format!("GET {path} HTTP/1.1\r\nHost: {host}\r\n");
    if let Some(t) = token {
        request.push_str(&format!("Authorization: Bearer {t}\r\n"));
    }
    request.push_str("Connection: close\r\n\r\n");
    stream
        .write_all(request.as_bytes())
        .map_err(|err| format!("failed to write request: {err}"))?;
    let mut response = String::new();
    stream
        .read_to_string(&mut response)
        .map_err(|err| format!("failed to read response: {err}"))?;
    let status_line = response
        .lines()
        .next()
        .ok_or_else(|| "empty response from server".to_string())?;
    let status: u16 = status_line
        .split_whitespace()
        .nth(1)
        .and_then(|code| code.parse().ok())
        .ok_or_else(|| format!("invalid response status line: {status_line}"))?;
    let body = match response.split_once("\r\n\r\n") {
        Some((_headers, rest)) => rest.to_string(),
        None => String::new(),
    };
    Ok((status, body))
}
/// Severity of a single doctor finding. Variant order matters: the derived
/// `Ord` ranks `Ok < Warn < Crit`, so `max()` over all checks yields the
/// worst severity of the run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DoctorSeverity {
    Ok,
    Warn,
    Crit,
}
impl DoctorSeverity {
    /// Lowercase label used in both the JSON and human-readable reports.
    fn label(self) -> &'static str {
        match self {
            DoctorSeverity::Ok => "ok",
            DoctorSeverity::Warn => "warn",
            DoctorSeverity::Crit => "crit",
        }
    }
    /// Process exit code for this severity: 0 (ok), 1 (warn), or 2 (crit).
    fn exit_code(self) -> i32 {
        // Discriminants are assigned 0, 1, 2 in declaration order, which is
        // exactly the exit-code contract.
        self as i32
    }
}
/// One named finding produced by `run_doctor`.
struct DoctorCheck {
    // Stable machine-readable check identifier (e.g. "backup_age").
    name: &'static str,
    // Severity of this finding; `run_doctor` exits with the max over checks.
    severity: DoctorSeverity,
    // Human-readable explanation, also embedded in the JSON report.
    detail: String,
}
/// Find `metric_name` in a Prometheus text-format `body` and return its
/// sample value.
///
/// Matches the first sample line whose metric name (the token before any
/// `{labels}`) equals `metric_name`, regardless of labels. Returns `None`
/// when the metric is absent or its value does not parse as `f64`.
fn parse_prom_metric(body: &str, metric_name: &str) -> Option<f64> {
    for line in body.lines() {
        let line = line.trim();
        // Skip comment lines (# HELP / # TYPE) and blank lines. Trimming
        // first also makes whitespace-only lines harmless; the old code's
        // `next()?` aborted the whole scan when it hit one.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let mut tokens = line.split_whitespace();
        let Some(head) = tokens.next() else {
            continue;
        };
        let name = head.split('{').next().unwrap_or(head);
        if name == metric_name {
            // The sample value is the token immediately after the metric
            // name. Using `last()` here would wrongly pick up the optional
            // trailing timestamp on "name value timestamp" lines.
            return tokens.next().and_then(|value| value.parse().ok());
        }
    }
    None
}
/// Like `parse_prom_metric`, but only match sample lines whose label set
/// contains `label_key="label_value"`.
///
/// Label matching is a substring test on the first whitespace-delimited
/// token, so it assumes label values contain no whitespace — true for the
/// label sets this tool queries (e.g. `verb="put"`).
fn parse_prom_metric_with_label(
    body: &str,
    metric_name: &str,
    label_key: &str,
    label_value: &str,
) -> Option<f64> {
    let needle = format!("{label_key}=\"{label_value}\"");
    for line in body.lines() {
        let line = line.trim();
        // Skip comment lines (# HELP / # TYPE) and blank lines. Trimming
        // first also makes whitespace-only lines harmless; the old code's
        // `next()?` aborted the whole scan when it hit one.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let mut tokens = line.split_whitespace();
        let Some(head) = tokens.next() else {
            continue;
        };
        let name = head.split('{').next().unwrap_or(head);
        if name == metric_name && head.contains(&needle) {
            // The sample value is the token immediately after the metric
            // name. Using `last()` here would wrongly pick up the optional
            // trailing timestamp on "name value timestamp" lines.
            return tokens.next().and_then(|value| value.parse().ok());
        }
    }
    None
}
/// Handle the `bootstrap` subcommand.
///
/// Requires `--path <FILE>`; the username comes from `--username` or,
/// failing that, the `REDDB_USERNAME` environment variable. Returns a
/// process exit code: 0 on success, 1 on a bootstrap failure, 2 on
/// missing arguments. In JSON mode errors are additionally emitted as a
/// JSON envelope before the stderr message.
fn run_bootstrap_command(flags: &HashMap<String, FlagValue>) -> i32 {
    use reddb::cli::bootstrap::{render_success, run, BootstrapArgs};
    let json_mode = wants_json(flags);
    // --path is mandatory; bail out early with exit code 2 when missing.
    let Some(path) = flag_string(flags, "path")
        .filter(|s| !s.is_empty())
        .map(PathBuf::from)
    else {
        let msg = "bootstrap requires --path <FILE>";
        if json_mode {
            json_error("bootstrap", msg);
        }
        eprintln!("error: {msg}");
        return 2;
    };
    let username = match flag_string(flags, "username").filter(|s| !s.is_empty()) {
        Some(name) => name,
        None => std::env::var("REDDB_USERNAME").unwrap_or_default(),
    };
    let print_certificate = flag_bool(flags, "print-certificate");
    let args = BootstrapArgs {
        path,
        vault: flag_bool(flags, "vault"),
        username,
        password: flag_string(flags, "password").filter(|s| !s.is_empty()),
        password_stdin: flag_bool(flags, "password-stdin"),
        print_certificate,
        json: json_mode,
    };
    match run(args) {
        Ok(outcome) => {
            // Rebuild a minimal args struct for rendering; only the
            // username/certificate/json fields influence the output.
            let render_args = BootstrapArgs {
                path: PathBuf::new(),
                vault: true,
                username: outcome.username.clone(),
                password: None,
                password_stdin: false,
                print_certificate,
                json: json_mode,
            };
            render_success(&outcome, &render_args);
            0
        }
        Err(err) => {
            if json_mode {
                json_error("bootstrap", &err);
            }
            eprintln!("bootstrap error: {err}");
            1
        }
    }
}
/// Run the `doctor` subcommand: probe a running server over HTTP and print a
/// health report.
///
/// Fetches `/metrics` and `/admin/status` from the `--bind` address (default
/// `127.0.0.1:8080`), evaluates a fixed set of checks against thresholds
/// taken from flags (`backup-age-warn-secs`, `backup-age-crit-secs`,
/// `wal-lag-warn`, `wal-lag-crit`), and prints the findings either as JSON
/// (when `--json` / `--output json`) or as a human-readable list.
/// Returns the exit code of the worst severity: 0 ok, 1 warn, 2 crit.
fn run_doctor(result: &reddb::cli::schema::SchemaResult) -> i32 {
    let json_mode = wants_json(&result.flags);
    let bind = flag_string(&result.flags, "bind").unwrap_or_else(|| "127.0.0.1:8080".to_string());
    // Admin token: the --token flag wins over RED_ADMIN_TOKEN; blank values
    // are treated as absent.
    let token = flag_string(&result.flags, "token")
        .or_else(|| std::env::var("RED_ADMIN_TOKEN").ok())
        .filter(|t| !t.trim().is_empty());
    // Threshold flags; unparseable values silently fall back to defaults.
    let backup_warn: f64 = flag_string(&result.flags, "backup-age-warn-secs")
        .and_then(|s| s.parse().ok())
        .unwrap_or(600.0);
    let backup_crit: f64 = flag_string(&result.flags, "backup-age-crit-secs")
        .and_then(|s| s.parse().ok())
        .unwrap_or(3600.0);
    let wal_warn: f64 = flag_string(&result.flags, "wal-lag-warn")
        .and_then(|s| s.parse().ok())
        .unwrap_or(1000.0);
    let wal_crit: f64 = flag_string(&result.flags, "wal-lag-crit")
        .and_then(|s| s.parse().ok())
        .unwrap_or(10000.0);
    let mut checks: Vec<DoctorCheck> = Vec::new();
    // Reachability: /metrics must answer 200; otherwise record a critical
    // finding and skip every metrics-based check below.
    let metrics = match get_from_http(&bind, "/metrics", token.as_deref()) {
        Ok((200, body)) => Some(body),
        Ok((status, _)) => {
            checks.push(DoctorCheck {
                name: "reachability",
                severity: DoctorSeverity::Crit,
                detail: format!(
                    "GET /metrics returned HTTP {status} (token mismatch or service down)"
                ),
            });
            None
        }
        Err(err) => {
            checks.push(DoctorCheck {
                name: "reachability",
                severity: DoctorSeverity::Crit,
                detail: format!("connect to {bind} failed: {err}"),
            });
            None
        }
    };
    // /admin/status is a best-effort input: non-200 responses or unparseable
    // JSON only produce a warning, not a critical finding.
    let status_body = get_from_http(&bind, "/admin/status", token.as_deref()).ok();
    let status_json = status_body
        .as_ref()
        .filter(|(s, _)| *s == 200)
        .and_then(|(_, body)| reddb::serde_json::from_str::<reddb::serde_json::Value>(body).ok());
    if status_json.is_none() {
        checks.push(DoctorCheck {
            name: "admin_status",
            severity: DoctorSeverity::Warn,
            detail: "GET /admin/status not parseable; downstream checks rely on /metrics only"
                .to_string(),
        });
    }
    // Backup freshness: compare the reported age against the warn/crit
    // thresholds; an absent metric (no successful backup yet) is a warning.
    if let Some(body) = &metrics {
        if let Some(age) = parse_prom_metric(body, "reddb_backup_age_seconds") {
            let sev = if age >= backup_crit {
                DoctorSeverity::Crit
            } else if age >= backup_warn {
                DoctorSeverity::Warn
            } else {
                DoctorSeverity::Ok
            };
            checks.push(DoctorCheck {
                name: "backup_age",
                severity: sev,
                detail: format!(
                    "{age:.0}s since last successful backup (warn={backup_warn}s crit={backup_crit}s)"
                ),
            });
        } else {
            checks.push(DoctorCheck {
                name: "backup_age",
                severity: DoctorSeverity::Warn,
                detail: "no successful backup recorded yet (reddb_backup_age_seconds absent)"
                    .to_string(),
            });
        }
    }
    // WAL archive lag against warn/crit thresholds; no finding at all when
    // the metric is absent.
    if let Some(body) = &metrics {
        if let Some(lag) = parse_prom_metric(body, "reddb_wal_archive_lag_records") {
            let sev = if lag >= wal_crit {
                DoctorSeverity::Crit
            } else if lag >= wal_warn {
                DoctorSeverity::Warn
            } else {
                DoctorSeverity::Ok
            };
            checks.push(DoctorCheck {
                name: "wal_archive_lag",
                severity: sev,
                detail: format!(
                    "{lag:.0} records between current LSN and last archived (warn={wal_warn} crit={wal_crit})"
                ),
            });
        }
    }
    // Writer lease state from /admin/status: only "not_held" is critical.
    if let Some(json) = &status_json {
        if let Some(lease) = json.get("writer_lease").and_then(|v| v.as_str()) {
            let sev = match lease {
                "not_held" => DoctorSeverity::Crit,
                _ => DoctorSeverity::Ok,
            };
            checks.push(DoctorCheck {
                name: "writer_lease",
                severity: sev,
                detail: format!("lease state: {lease}"),
            });
        }
    }
    // Replica apply health: known-good states are ok, known-bad states are
    // critical, anything unrecognized is treated as a warning.
    if let Some(json) = &status_json {
        if let Some(health) = json
            .get("replica")
            .and_then(|v| v.get("apply_health"))
            .and_then(|v| v.as_str())
        {
            let sev = match health {
                "ok" | "healthy" | "connecting" => DoctorSeverity::Ok,
                "stalled_gap" | "divergence" => DoctorSeverity::Crit,
                _ => DoctorSeverity::Warn,
            };
            checks.push(DoctorCheck {
                name: "replica_apply_health",
                severity: sev,
                detail: format!("replica apply state: {health}"),
            });
        }
    }
    // Read-only mode is worth surfacing but not critical by itself.
    if let Some(json) = &status_json {
        if let Some(true) = json.get("read_only").and_then(|v| v.as_bool()) {
            checks.push(DoctorCheck {
                name: "read_only",
                severity: DoctorSeverity::Warn,
                detail: "instance is read-only; writes will be rejected".to_string(),
            });
        }
    }
    // Informational KV statistics; always severity Ok, missing metrics
    // default to zero.
    if let Some(body) = &metrics {
        let puts =
            parse_prom_metric_with_label(body, "reddb_kv_ops_total", "verb", "put").unwrap_or(0.0);
        let gets =
            parse_prom_metric_with_label(body, "reddb_kv_ops_total", "verb", "get").unwrap_or(0.0);
        let deletes = parse_prom_metric_with_label(body, "reddb_kv_ops_total", "verb", "delete")
            .unwrap_or(0.0);
        let incrs =
            parse_prom_metric_with_label(body, "reddb_kv_ops_total", "verb", "incr").unwrap_or(0.0);
        let watch_active = parse_prom_metric(body, "reddb_kv_watch_streams_active").unwrap_or(0.0);
        let watch_drops = parse_prom_metric(body, "reddb_kv_watch_drops_total").unwrap_or(0.0);
        checks.push(DoctorCheck {
            name: "kv_stats",
            severity: DoctorSeverity::Ok,
            detail: format!(
                "puts={puts:.0} gets={gets:.0} deletes={deletes:.0} incrs={incrs:.0} watch_active={watch_active:.0} watch_drops={watch_drops:.0}"
            ),
        });
    }
    // Aggregate: the worst severity (Ord: Ok < Warn < Crit) drives the exit
    // code; an empty check list counts as Ok.
    let worst = checks
        .iter()
        .map(|c| c.severity)
        .max()
        .unwrap_or(DoctorSeverity::Ok);
    if json_mode {
        // Hand-built JSON envelope: {"checks":[...],"worst":"..."}.
        let mut buf = String::from("{\"checks\":[");
        for (i, c) in checks.iter().enumerate() {
            if i > 0 {
                buf.push(',');
            }
            buf.push_str(&format!(
                "{{\"name\":\"{}\",\"severity\":\"{}\",\"detail\":\"{}\"}}",
                json_escape(c.name),
                c.severity.label(),
                json_escape(&c.detail)
            ));
        }
        buf.push_str(&format!("],\"worst\":\"{}\"}}", worst.label()));
        json_ok("doctor", &buf);
    } else {
        // Human-readable report: one line per check plus a trailing summary.
        for c in &checks {
            let icon = match c.severity {
                DoctorSeverity::Ok => "[ok]  ",
                DoctorSeverity::Warn => "[warn]",
                DoctorSeverity::Crit => "[crit]",
            };
            println!("{icon} {} — {}", c.name, c.detail);
        }
        println!("\nworst: {}", worst.label());
    }
    worst.exit_code()
}
// Unit tests for the CLI helpers above. Network-touching helpers are not
// exercised here; coverage focuses on the pure parsing/formatting functions
// and on server-config resolution, which reads process-global environment
// variables (serialized via `env_lock` + restored via `EnvGuard`).
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;
    use std::sync::{Mutex, OnceLock};

    // --- Prometheus text-format parsing -----------------------------------

    #[test]
    fn doctor_parses_simple_metric_line() {
        let body = "\
# HELP reddb_uptime_seconds Seconds since boot.\n\
# TYPE reddb_uptime_seconds gauge\n\
reddb_uptime_seconds 12345.6\n";
        assert_eq!(
            parse_prom_metric(body, "reddb_uptime_seconds"),
            Some(12345.6)
        );
    }

    #[test]
    fn doctor_parses_metric_with_labels() {
        let body = "\
# HELP reddb_writer_lease_state ...\n\
# TYPE reddb_writer_lease_state gauge\n\
reddb_writer_lease_state{state=\"held\"} 1\n";
        assert_eq!(
            parse_prom_metric(body, "reddb_writer_lease_state"),
            Some(1.0)
        );
    }

    #[test]
    fn doctor_parses_metric_with_specific_label() {
        let body = "\
reddb_kv_ops_total{verb=\"put\"} 2\n\
reddb_kv_ops_total{verb=\"get\"} 3\n";
        assert_eq!(
            parse_prom_metric_with_label(body, "reddb_kv_ops_total", "verb", "get"),
            Some(3.0)
        );
    }

    #[test]
    fn doctor_returns_first_match_when_multiple_label_sets() {
        let body = "\
reddb_replica_lag_records{replica_id=\"a\"} 100\n\
reddb_replica_lag_records{replica_id=\"b\"} 250\n";
        assert_eq!(
            parse_prom_metric(body, "reddb_replica_lag_records"),
            Some(100.0)
        );
    }

    #[test]
    fn doctor_misses_unknown_metric() {
        let body = "reddb_uptime_seconds 1\n";
        assert_eq!(parse_prom_metric(body, "reddb_does_not_exist"), None);
    }

    #[test]
    fn doctor_skips_help_and_type_lines() {
        let body = "\
# HELP reddb_uptime_seconds Time since boot.\n\
# TYPE reddb_uptime_seconds gauge\n";
        assert_eq!(parse_prom_metric(body, "reddb_uptime_seconds"), None);
    }

    // Ordering matters: run_doctor picks the worst check via `max()` and
    // maps it to the process exit code.
    #[test]
    fn doctor_severity_orders_ok_warn_crit() {
        assert!(DoctorSeverity::Ok < DoctorSeverity::Warn);
        assert!(DoctorSeverity::Warn < DoctorSeverity::Crit);
        assert_eq!(DoctorSeverity::Ok.exit_code(), 0);
        assert_eq!(DoctorSeverity::Warn.exit_code(), 1);
        assert_eq!(DoctorSeverity::Crit.exit_code(), 2);
    }

    // --- Test helpers ------------------------------------------------------

    // Shorthand constructors for flag values used in the config tests.
    fn bool_flag(value: bool) -> FlagValue {
        FlagValue::Bool(value)
    }
    fn str_flag(value: &str) -> FlagValue {
        FlagValue::Str(value.to_string())
    }

    // Serializes tests that mutate process-global environment variables.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    // Guard that sets or clears environment variables for the duration of a
    // test and restores the previous values on drop.
    struct EnvGuard {
        // Saved (key, previous value) pairs; `None` means the key was unset.
        saved: Vec<(&'static str, Option<String>)>,
    }
    impl EnvGuard {
        // Set each (key, value) pair, remembering prior values. Duplicate
        // keys are deduplicated (last value wins) so restore is unambiguous.
        fn set(vars: &[(&'static str, &str)]) -> Self {
            let mut saved = Vec::new();
            let mut dedup = BTreeMap::new();
            for (key, value) in vars {
                dedup.insert(*key, *value);
            }
            for (key, value) in dedup {
                saved.push((key, std::env::var(key).ok()));
                unsafe {
                    std::env::set_var(key, value);
                }
            }
            Self { saved }
        }
        // Remove each key, remembering prior values for restoration.
        fn clear(keys: &[&'static str]) -> Self {
            let mut saved = Vec::new();
            let mut dedup = BTreeMap::new();
            for key in keys {
                dedup.insert(*key, ());
            }
            for (key, _) in dedup {
                saved.push((key, std::env::var(key).ok()));
                unsafe {
                    std::env::remove_var(key);
                }
            }
            Self { saved }
        }
    }
    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore in reverse order of capture.
            for (key, value) in self.saved.drain(..).rev() {
                match value {
                    Some(value) => unsafe {
                        std::env::set_var(key, value);
                    },
                    None => unsafe {
                        std::env::remove_var(key);
                    },
                }
            }
        }
    }

    // --- Server bind / config resolution -----------------------------------

    #[test]
    fn resolve_server_binds_defaults_to_grpc() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::new();
        let (grpc_bind, http_bind) = resolve_server_binds(&flags).unwrap();
        assert_eq!(
            grpc_bind.as_deref(),
            Some(reddb::service_cli::ServerTransport::Grpc.default_bind_addr())
        );
        assert_eq!(http_bind, None);
    }

    #[test]
    fn resolve_server_binds_supports_dual_stack_defaults() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::from([
            ("grpc".to_string(), bool_flag(true)),
            ("http".to_string(), bool_flag(true)),
        ]);
        let (grpc_bind, http_bind) = resolve_server_binds(&flags).unwrap();
        assert_eq!(
            grpc_bind.as_deref(),
            Some(reddb::service_cli::ServerTransport::Grpc.default_bind_addr())
        );
        assert_eq!(
            http_bind.as_deref(),
            Some(reddb::service_cli::ServerTransport::Http.default_bind_addr())
        );
    }

    #[test]
    fn resolve_server_binds_rejects_ambiguous_legacy_bind() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::from([
            ("grpc".to_string(), bool_flag(true)),
            ("http".to_string(), bool_flag(true)),
            ("bind".to_string(), str_flag("0.0.0.0:9999")),
        ]);
        let error = resolve_server_binds(&flags).unwrap_err();
        assert!(error.contains("--bind is ambiguous"));
    }

    #[test]
    fn resolve_server_binds_accepts_explicit_dual_addresses() {
        let flags = HashMap::from([
            ("grpc-bind".to_string(), str_flag("0.0.0.0:50051")),
            ("http-bind".to_string(), str_flag("0.0.0.0:8080")),
        ]);
        let (grpc_bind, http_bind) = resolve_server_binds(&flags).unwrap();
        assert_eq!(grpc_bind.as_deref(), Some("0.0.0.0:50051"));
        assert_eq!(http_bind.as_deref(), Some("0.0.0.0:8080"));
    }

    #[test]
    fn build_server_config_defaults_to_router_on_5050() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::new();
        let config = build_server_config(&flags, None).unwrap();
        assert_eq!(
            config.router_bind_addr.as_deref(),
            Some(reddb::service_cli::DEFAULT_ROUTER_BIND_ADDR)
        );
        assert_eq!(config.grpc_bind_addr, None);
        assert_eq!(config.http_bind_addr, None);
        assert_eq!(config.wire_bind_addr, None);
    }

    #[test]
    fn build_server_config_maps_legacy_bind_to_router_when_no_transport_is_selected() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::from([("bind".to_string(), str_flag("0.0.0.0:5050"))]);
        let config = build_server_config(&flags, None).unwrap();
        assert_eq!(config.router_bind_addr.as_deref(), Some("0.0.0.0:5050"));
        assert_eq!(config.grpc_bind_addr, None);
        assert_eq!(config.http_bind_addr, None);
    }

    // Docker-style env vars should override CLI defaults (but not explicit
    // CLI flags — see the next test).
    #[test]
    fn build_server_config_uses_docker_env_defaults() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::set(&[
            ("REDDB_DATA_PATH", "/data/data.rdb"),
            ("REDDB_GRPC_BIND_ADDR", "0.0.0.0:50051"),
            ("REDDB_HTTP_BIND_ADDR", "0.0.0.0:8080"),
            ("REDDB_BIND_ADDR", "0.0.0.0:50051"),
        ]);
        let flags = HashMap::from([("path".to_string(), str_flag("./data/reddb.rdb"))]);
        let config = build_server_config(&flags, None).unwrap();
        assert_eq!(
            config.path.as_deref(),
            Some(std::path::Path::new("/data/data.rdb"))
        );
        assert_eq!(config.router_bind_addr, None);
        assert_eq!(config.grpc_bind_addr.as_deref(), Some("0.0.0.0:50051"));
        assert_eq!(config.http_bind_addr.as_deref(), Some("0.0.0.0:8080"));
    }

    #[test]
    fn build_server_config_prefers_cli_flags_over_env_defaults() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::set(&[
            ("REDDB_DATA_PATH", "/data/data.rdb"),
            ("REDDB_GRPC_BIND_ADDR", "0.0.0.0:50051"),
            ("REDDB_HTTP_BIND_ADDR", "0.0.0.0:8080"),
        ]);
        let flags = HashMap::from([
            ("path".to_string(), str_flag("/tmp/override.rdb")),
            ("http-bind".to_string(), str_flag("127.0.0.1:18080")),
        ]);
        let config = build_server_config(&flags, None).unwrap();
        assert_eq!(
            config.path.as_deref(),
            Some(std::path::Path::new("/tmp/override.rdb"))
        );
        assert_eq!(config.grpc_bind_addr.as_deref(), Some("0.0.0.0:50051"));
        assert_eq!(config.http_bind_addr.as_deref(), Some("127.0.0.1:18080"));
    }

    // End-to-end through the schema parser: a bare `server` invocation picks
    // up the env-provided path rather than the parser's built-in default.
    #[test]
    fn parser_default_path_yields_to_docker_env_path() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::set(&[
            ("REDDB_DATA_PATH", "/data/data.rdb"),
            ("REDDB_GRPC_BIND_ADDR", "0.0.0.0:50051"),
            ("REDDB_HTTP_BIND_ADDR", "0.0.0.0:8080"),
            ("REDDB_BIND_ADDR", "0.0.0.0:50051"),
        ]);
        let args = vec!["server".to_string()];
        let tokens = cli::token::tokenize(&args);
        let parser = cli::schema::SchemaParser::new(build_flags_for_command(Some("server")));
        let result = parser.parse(&tokens);
        assert!(result.errors.is_empty());
        let config = build_server_config(&result.flags, None).unwrap();
        assert_eq!(
            config.path.as_deref(),
            Some(std::path::Path::new("/data/data.rdb"))
        );
        assert_eq!(config.grpc_bind_addr.as_deref(), Some("0.0.0.0:50051"));
        assert_eq!(config.http_bind_addr.as_deref(), Some("0.0.0.0:8080"));
    }

    #[test]
    fn build_systemd_service_config_defaults_to_router_on_5050() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::new();
        let config = build_systemd_service_config(&flags).unwrap();
        assert_eq!(
            config.router_bind_addr.as_deref(),
            Some(reddb::service_cli::DEFAULT_ROUTER_BIND_ADDR)
        );
        assert_eq!(config.grpc_bind_addr, None);
        assert_eq!(config.http_bind_addr, None);
    }

    #[test]
    fn build_systemd_service_config_keeps_explicit_http_bind() {
        let _lock = env_lock().lock().unwrap();
        let _guard = EnvGuard::clear(&[
            "REDDB_BIND_ADDR",
            "REDDB_GRPC_BIND_ADDR",
            "REDDB_HTTP_BIND_ADDR",
        ]);
        let flags = HashMap::from([("http-bind".to_string(), str_flag("0.0.0.0:8080"))]);
        let config = build_systemd_service_config(&flags).unwrap();
        assert_eq!(config.router_bind_addr, None);
        assert_eq!(config.grpc_bind_addr, None);
        assert_eq!(config.http_bind_addr.as_deref(), Some("0.0.0.0:8080"));
    }

    // --- Cache-stats formatting --------------------------------------------

    #[test]
    fn format_cache_stats_pretty_renders_header_and_known_fields() {
        let body = r#"{"ok":true,"hits":10,"misses":2,"entries":5,"bytes_in_use":1024}"#;
        let out = format_cache_stats_pretty(body);
        assert!(out.contains("Metric"), "missing header row");
        assert!(out.contains("Value"), "missing header row");
        assert!(out.contains("Hits"), "missing Hits row");
        assert!(out.contains("10"), "missing hits value");
        assert!(out.contains("Misses"), "missing Misses row");
        assert!(out.contains("2"), "missing misses value");
        assert!(out.contains("Entries"), "missing Entries row");
        assert!(out.contains("L1 bytes in use"), "missing bytes_in_use row");
        assert!(!out.contains("Evictions"), "unexpected Evictions row");
    }

    #[test]
    fn format_cache_stats_pretty_falls_back_to_raw_on_invalid_json() {
        let body = "not json at all";
        let out = format_cache_stats_pretty(body);
        assert_eq!(out.trim(), "not json at all");
    }

    #[test]
    fn format_cache_stats_pretty_renders_separator_line() {
        let body = r#"{"hits":0}"#;
        let out = format_cache_stats_pretty(body);
        assert!(out.contains("--------------------------------------------------"));
    }

    // --- Admin query-result rendering --------------------------------------

    // Canned /admin query response with two rows (one internal table).
    fn sample_admin_query_body() -> &'static str {
        r#"{"ok":true,"result":{"columns":["name","model","internal"],"records":[{"values":{"name":"users","model":"table","internal":false},"nodes":{},"edges":{},"paths":[],"vector_results":[]},{"values":{"name":"red.collections","model":"table","internal":true},"nodes":{},"edges":{},"paths":[],"vector_results":[]}],"stats":{"rows_scanned":2}}}"#
    }

    #[test]
    fn admin_table_from_query_response_extracts_columns_and_rows() {
        let table = admin_table_from_query_response(sample_admin_query_body()).unwrap();
        assert_eq!(table.columns, vec!["name", "model", "internal"]);
        assert_eq!(table.rows.len(), 2);
        assert_eq!(
            table.rows[0].get("name").and_then(|v| v.as_str()),
            Some("users")
        );
    }

    #[test]
    fn format_admin_table_renders_plain_table_when_color_disabled() {
        let table = admin_table_from_query_response(sample_admin_query_body()).unwrap();
        let out = format_admin_table(&table, false);
        assert!(out.contains("name"));
        assert!(out.contains("users"));
        assert!(out.contains("red.collections"));
        assert!(!out.contains("\x1b["));
    }

    #[test]
    fn format_admin_table_renders_ansi_header_when_color_enabled() {
        let table = admin_table_from_query_response(sample_admin_query_body()).unwrap();
        let out = format_admin_table(&table, true);
        assert!(out.contains("\x1b[36;1mname\x1b[0m"));
    }

    #[test]
    fn admin_rows_json_outputs_bare_array_for_jq() {
        let table = admin_table_from_query_response(sample_admin_query_body()).unwrap();
        let out = admin_rows_json(&table.rows);
        assert!(out.starts_with('['));
        assert!(out.contains(r#""name":"users""#));
        assert!(!out.contains(r#""ok""#));
    }

    #[test]
    fn format_admin_csv_escapes_commas_and_quotes() {
        let table = AdminQueryTable {
            columns: vec!["name".to_string(), "model".to_string()],
            rows: vec![BTreeMap::from([
                (
                    "name".to_string(),
                    reddb::json::Value::String("weird,\"name\"".to_string()),
                ),
                (
                    "model".to_string(),
                    reddb::json::Value::String("table".to_string()),
                ),
            ])],
        };
        let out = format_admin_csv(&table);
        assert_eq!(out, "name,model\n\"weird,\"\"name\"\"\",table\n");
    }

    // --- Base64 encoding (RFC 4648 test vectors) ---------------------------

    #[test]
    fn bytes_to_base64_empty_input() {
        assert_eq!(bytes_to_base64(b""), "");
    }

    #[test]
    fn bytes_to_base64_one_byte() {
        assert_eq!(bytes_to_base64(b"f"), "Zg==");
    }

    #[test]
    fn bytes_to_base64_two_bytes() {
        assert_eq!(bytes_to_base64(b"fo"), "Zm8=");
    }

    #[test]
    fn bytes_to_base64_three_bytes_no_padding() {
        assert_eq!(bytes_to_base64(b"foo"), "Zm9v");
    }

    #[test]
    fn bytes_to_base64_rfc_test_vector_foobar() {
        assert_eq!(bytes_to_base64(b"foobar"), "Zm9vYmFy");
    }

    #[test]
    fn bytes_to_base64_binary_zeros() {
        assert_eq!(bytes_to_base64(&[0u8, 0, 0]), "AAAA");
    }
}