use crate::models::field_names;
use std::io::Write as _;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use axum::Router;
use clap::{Args, CommandFactory, Parser, Subcommand};
use clap_complete::{Shell, generate};
use rusqlite::Connection;
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tracing_subscriber::EnvFilter;
use crate::cli::agents::{AgentsArgs, PendingArgs};
use crate::cli::archive::ArchiveArgs;
use crate::cli::audit::AuditArgs;
use crate::cli::backup::{BackupArgs, RestoreArgs};
use crate::cli::boot::BootArgs;
use crate::cli::consolidate::{AutoConsolidateArgs, ConsolidateArgs};
use crate::cli::crud::{DeleteArgs, GetArgs, ListArgs};
use crate::cli::curator::CuratorArgs;
use crate::cli::forget::ForgetArgs;
use crate::cli::identity::IdentityArgs;
use crate::cli::install::InstallArgs;
use crate::cli::io::{ImportArgs, MineArgs};
use crate::cli::link::{LinkArgs, ResolveArgs};
use crate::cli::logs::LogsArgs;
use crate::cli::promote::PromoteArgs;
use crate::cli::recall::RecallArgs;
use crate::cli::rules::RulesArgs;
use crate::cli::search::SearchArgs;
use crate::cli::store::StoreArgs;
use crate::cli::sync::{SyncArgs, SyncDaemonArgs};
use crate::cli::update::UpdateArgs;
use crate::cli::verify::VerifyChainArgs;
use crate::cli::verify_signed_events::VerifySignedEventsChainArgs;
use crate::cli::wrap::WrapArgs;
use crate::config::{AppConfig, FeatureTier};
use crate::embeddings::Embedder;
use crate::handlers::{ApiKeyState, AppState, Db};
use crate::hnsw::VectorIndex;
use crate::{bench, cli, db, embeddings, federation, hnsw, llm, mcp, tls};
#[cfg(feature = "sal")]
use crate::migrate;
// Default database path for the global `--db` flag (`Cli::db`). `run` also
// compares the effective path against this value to decide whether a database
// was chosen explicitly.
const DEFAULT_DB: &str = "ai-memory.db";
// Default value of `ServeArgs::port`.
const DEFAULT_PORT: u16 = 9077;
// 30 x `crate::SECS_PER_MINUTE`, in seconds. `as` binds tighter than `*`, so
// only the crate constant is cast to `u64` before the multiplication.
// NOTE(review): presumably the garbage-collection period; the consumer is not
// visible in this part of the file.
const GC_INTERVAL_SECS: u64 = 30 * crate::SECS_PER_MINUTE as u64;
// 10 x `crate::SECS_PER_MINUTE`, in seconds (same cast precedence as above).
// NOTE(review): presumably the WAL checkpoint period; confirm at the use site.
const WAL_CHECKPOINT_INTERVAL_SECS: u64 = 10 * crate::SECS_PER_MINUTE as u64;
// 60 seconds. NOTE(review): presumably how often timed-out pending items are
// swept; confirm at the use site.
const PENDING_TIMEOUT_SWEEP_INTERVAL_SECS: u64 = 60;
// One day in seconds, taken directly from `crate::SECS_PER_DAY`. Signed
// (`i64`), unlike the `u64` interval constants around it.
const PENDING_TIMEOUT_DEFAULT_SECS: i64 = crate::SECS_PER_DAY;
// 600 seconds (10 minutes). NOTE(review): presumably the transcript lifecycle
// sweep period; confirm at the use site.
const TRANSCRIPT_LIFECYCLE_SWEEP_INTERVAL_SECS: u64 = 600;
// 60 seconds. NOTE(review): presumably the agent quota reset period; confirm
// at the use site.
const AGENT_QUOTA_RESET_INTERVAL_SECS: u64 = 60;
// Top-level command-line interface of the `ai-memory` binary, parsed through
// clap's derive API. All flags declared here are `global`, so clap accepts them
// on subcommands as well.
// Plain `//` comments are used inside this type on purpose: clap's derive turns
// `///` doc comments on fields into help text, which would change CLI output.
#[derive(Parser)]
#[command(
name = "ai-memory",
version,
about = "AI-agnostic persistent memory — MCP server, HTTP API, and CLI for any AI platform"
)]
pub struct Cli {
// The subcommand to execute; `run` matches on it to dispatch.
#[command(subcommand)]
pub command: Command,
// Database path from `--db` or the `AI_MEMORY_DB` environment variable,
// defaulting to `DEFAULT_DB`. `run` passes it through
// `AppConfig::effective_db` before use.
#[arg(long, env = "AI_MEMORY_DB", default_value = DEFAULT_DB, global = true)]
pub db: PathBuf,
// `--json` switch, off by default. `run` forwards it to most subcommand
// handlers.
#[arg(long, global = true, default_value_t = false)]
pub json: bool,
// Optional agent id from `--agent-id` or `AI_MEMORY_AGENT_ID`. `run` hands it
// to several handlers as `Option<&str>`.
#[arg(long, env = "AI_MEMORY_AGENT_ID", global = true)]
pub agent_id: Option<String>,
// Optional `--db-passphrase-file <PATH>`. When present, `run` reads the
// passphrase with `passphrase_from_file` and exports it as the
// `AI_MEMORY_DB_PASSPHRASE` environment variable before dispatching.
#[arg(long, global = true, value_name = "PATH")]
pub db_passphrase_file: Option<PathBuf>,
}
// Every subcommand of the `ai-memory` binary. `run` matches on this enum to
// dispatch; most variants wrap an argument struct declared in the matching
// `crate::cli` module. With no rename attribute present, clap derives
// kebab-case subcommand names from the variant names (e.g. `SyncDaemon` ->
// `sync-daemon`).
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments on variants into subcommand help text, which would change CLI output.
#[derive(Subcommand)]
pub enum Command {
// Dispatched to `serve(db_path, args, app_config)`.
Serve(ServeArgs),
// Runs `mcp::run_mcp_server` on a blocking thread.
Mcp {
// Requested feature tier (default "semantic"); `run` resolves it through
// `AppConfig::effective_tier`.
#[arg(long, default_value = "semantic")]
tier: String,
// Optional profile from `--profile` or `AI_MEMORY_PROFILE`. `run` exits the
// process with status 2 when `AppConfig::effective_profile` rejects it.
#[arg(long, env = "AI_MEMORY_PROFILE")]
profile: Option<String>,
},
// Handlers: `cli::store::run`, `cli::update::run`, `cli::recall::run`,
// `cli::search::run`, `cli::crud::{cmd_get, cmd_list, cmd_delete}`,
// `cli::promote::cmd_promote`, `cli::forget::cmd_forget`,
// `cli::link::cmd_link`, `cli::consolidate::run`.
Store(StoreArgs),
Update(UpdateArgs),
Recall(RecallArgs),
Search(SearchArgs),
Get(GetArgs),
List(ListArgs),
Delete(DeleteArgs),
Promote(PromoteArgs),
Forget(ForgetArgs),
Link(LinkArgs),
Consolidate(ConsolidateArgs),
// Argument-less commands handled by `cli::gc::{run_gc, run_stats, run_namespaces}`.
Gc,
Stats,
Namespaces,
// Handled by `cli::namespace::run`.
Namespace(crate::cli::namespace::NamespaceArgs),
// `cli::commands::config::run` returns an exit code; on a non-zero code `run`
// exits the process with it.
Config(crate::cli::commands::config::ConfigCliArgs),
// Handled by `cli::io::export` (takes no arguments) and `cli::io::import`.
Export,
Import(ImportArgs),
// Handled by `cli::link::cmd_resolve`.
Resolve(ResolveArgs),
// Handled by `cli::shell::run`.
Shell,
// `Sync` runs `cli::sync::run`; `SyncDaemon` awaits `cli::sync::run_daemon`.
Sync(SyncArgs),
SyncDaemon(SyncDaemonArgs),
// Handled by `cli::consolidate::run_auto`.
AutoConsolidate(AutoConsolidateArgs),
// Writes a completion script for the requested shell to stdout via
// `clap_complete::generate`.
Completions(CompletionsArgs),
// Renders a man page for the whole CLI to stdout via `clap_mangen`.
Man,
// Handlers: `cli::io::mine`, `cli::archive::run`, `cli::agents::run_agents`,
// `cli::identity::run` (receives no database path),
// `cli::offload::{run_offload, run_deref}`, `cli::rules::run`,
// `cli::agents::run_pending`, `cli::backup::{run_backup, run_restore}`.
Mine(MineArgs),
Archive(ArchiveArgs),
Agents(AgentsArgs),
Identity(IdentityArgs),
Offload(crate::cli::offload::OffloadArgs),
Deref(crate::cli::offload::DerefArgs),
Rules(RulesArgs),
Pending(PendingArgs),
Backup(BackupArgs),
Restore(RestoreArgs),
// `run` initialises tracing, then awaits `cli::curator::run`; when the
// `daemon` flag is set the CLI output writers are replaced by sinks. With the
// `sal` feature, `--store-url` combined with an explicit database is rejected.
Curator(CuratorArgs),
// Handled by `cmd_bench`.
Bench(BenchArgs),
// The next two variants exist only when the `sal` feature is enabled.
#[cfg(feature = "sal")]
Migrate(MigrateArgs),
#[cfg(feature = "sal")]
SchemaInit(crate::cli::schema_init::SchemaInitArgs),
// `run` picks between the token report, the hooks report and the full doctor
// run based on the flags in `DoctorCliArgs`.
Doctor(DoctorCliArgs),
// `run` emits a `SessionBoot` audit event before calling `cli::boot::run`.
Boot(BootArgs),
// Handled by `cli::install::run`.
Install(InstallArgs),
// `cli::wrap::run` returns an exit code; `run` drops the output locks and
// exits with that code when it is non-zero.
Wrap(WrapArgs),
// `Logs` runs `cli::logs::run`; `Audit` runs `cli::audit::run`, whose non-zero
// exit code terminates the process.
Logs(LogsArgs),
Audit(AuditArgs),
// Dispatched further on `GovernanceAction`.
Governance(GovernanceCliArgs),
// The handlers of the following variants, up to and including `Skill`, return
// a process exit code: `run` returns `Ok(())` on 0 and otherwise exits the
// process with the code.
VerifyReflectionChain(VerifyChainArgs),
VerifySignedEventsChain(VerifySignedEventsChainArgs),
ExportForensicBundle(crate::forensic::bundle::ExportForensicBundleArgs),
VerifyForensicBundle(crate::forensic::bundle::VerifyForensicBundleArgs),
ExportReflections(crate::cli::commands::export_reflections::ExportReflectionsArgs),
RecoverPreviousSession(
crate::cli::commands::recover_previous_session::RecoverPreviousSessionArgs,
),
Atomise(crate::cli::commands::atomise::AtomiseArgs),
Persona(crate::cli::commands::persona::PersonaArgs),
Calibrate(crate::cli::commands::calibrate_confidence::CalibrateArgs),
Skill(crate::cli::commands::skill::SkillArgs),
// Handled by `cli::share::cmd_share`.
Share(crate::cli::share::ShareArgs),
// The remaining variants map to `cmd_*` functions in the like-named
// `crate::cli::commands` modules. `Expand`, `CheckDuplicate` and `Reembed` are
// awaited; `Expand` and `Reembed` additionally return an exit code that is
// propagated as described above.
KgQuery(crate::cli::commands::kg_query::KgQueryArgs),
FindPaths(crate::cli::commands::find_paths::FindPathsArgs),
RecallObservations(crate::cli::commands::recall_observations::RecallObservationsArgs),
Expand(crate::cli::commands::expand::ExpandArgs),
CheckDuplicate(crate::cli::commands::check_duplicate::CheckDuplicateArgs),
Reembed(crate::cli::commands::reembed::ReembedArgs),
Replay(crate::cli::commands::replay::ReplayArgs),
Reflect(crate::cli::commands::reflect::ReflectArgs),
Subscribe(crate::cli::commands::subscribe::SubscribeArgs),
Unsubscribe(crate::cli::commands::unsubscribe::UnsubscribeArgs),
ListSubscriptions(crate::cli::commands::list_subscriptions::ListSubscriptionsArgs),
SubscriptionReplay(crate::cli::commands::subscription_replay::SubscriptionReplayArgs),
SubscriptionDlqList(crate::cli::commands::subscription_dlq_list::SubscriptionDlqListArgs),
Notify(crate::cli::commands::notify::NotifyArgs),
Inbox(crate::cli::commands::inbox::InboxArgs),
IngestMultistep(crate::cli::commands::ingest_multistep::IngestMultistepArgs),
KgInvalidate(crate::cli::commands::kg_invalidate::KgInvalidateArgs),
KgTimeline(crate::cli::commands::kg_timeline::KgTimelineArgs),
EntityRegister(crate::cli::commands::entity_register::EntityRegisterArgs),
EntityGetByAlias(crate::cli::commands::entity_get_by_alias::EntityGetByAliasArgs),
// NOTE(review): the dispatch code for the last three variants is outside the
// part of the file reviewed here.
DependentsOfInvalidated(
crate::cli::commands::dependents_of_invalidated::DependentsOfInvalidatedArgs,
),
ReflectionOrigin(crate::cli::commands::reflection_origin::ReflectionOriginArgs),
QuotaStatus(crate::cli::commands::quota_status::QuotaStatusArgs),
}
// Arguments of the `governance` subcommand: a thin wrapper whose only content
// is the nested action to run.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Args)]
pub struct GovernanceCliArgs {
// Nested subcommand; `run` matches on it to pick the governance handler.
#[command(subcommand)]
pub action: GovernanceAction,
}
#[derive(clap::Subcommand)]
pub enum GovernanceAction {
MigrateToPermissions(crate::cli::governance_migrate::MigrateToPermissionsArgs),
InstallDefaults(crate::cli::governance_install_defaults::InstallDefaultsArgs),
CheckAction(crate::cli::governance_check_action::CheckActionArgs),
}
// Flags of the `doctor` subcommand. `run` uses them to choose one of three
// modes: the token report (`tokens` or `raw_table`), the hooks report (`hooks`
// alone), or the full doctor run (neither).
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Args)]
pub struct DoctorCliArgs {
// Optional `--remote <URL>`; forwarded as `DoctorArgs::remote` to the full
// doctor run only.
#[arg(long, value_name = "URL")]
pub remote: Option<String>,
// `--json` for doctor output. `run` passes this field, not the global
// `Cli::json`, to all three doctor modes.
// NOTE(review): shares its flag name with the global `--json` on `Cli`;
// confirm how clap resolves the overlap.
#[arg(long)]
pub json: bool,
// Forwarded as `DoctorArgs::fail_on_warn` to the full doctor run only.
#[arg(long)]
pub fail_on_warn: bool,
// Selects the token report: `run` calls `cli::doctor::run_tokens` and exits
// the process with the code it returns.
#[arg(long)]
pub tokens: bool,
// Optional `--profile <PROFILE>`; only consumed by the token report
// (`TokensArgs::profile`).
#[arg(long, value_name = "PROFILE")]
pub profile: Option<String>,
// Also selects the token report, and is passed on as `TokensArgs::raw_table`.
#[arg(long)]
pub raw_table: bool,
// In token-report mode this is passed on as `TokensArgs::hooks`; when set
// without `tokens`/`raw_table`, `run` calls `cli::doctor::run_hooks` and exits
// with its code.
#[arg(long)]
pub hooks: bool,
}
// Flags of the `bench` subcommand, consumed by `cmd_bench`. That handler is not
// part of the code reviewed here, so the per-field notes below stick to what
// the declarations themselves show.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Args)]
pub struct BenchArgs {
// `--iterations`, defaulting to `bench::DEFAULT_ITERATIONS`.
#[arg(long, default_value_t = bench::DEFAULT_ITERATIONS)]
pub iterations: usize,
// `--warmup`, defaulting to `bench::DEFAULT_WARMUP`.
#[arg(long, default_value_t = bench::DEFAULT_WARMUP)]
pub warmup: usize,
// `--json` switch local to this subcommand.
#[arg(long)]
pub json: bool,
// Optional `--baseline <PATH>`. Kept as a `String`, unlike `history` below,
// which is a `PathBuf`.
#[arg(long, value_name = "PATH")]
pub baseline: Option<String>,
// `--regression-threshold`, defaulting to
// `bench::DEFAULT_REGRESSION_THRESHOLD_PCT`.
// NOTE(review): the constant's name suggests a percentage; confirm in `bench`.
#[arg(long, default_value_t = bench::DEFAULT_REGRESSION_THRESHOLD_PCT)]
pub regression_threshold: f64,
// Optional `--history <PATH>`.
#[arg(long, value_name = "PATH")]
pub history: Option<PathBuf>,
// Optional `--scale <ROWS>`.
// NOTE(review): presumably a row count for the benchmark data set; confirm in
// `cmd_bench`.
#[arg(long, value_name = "ROWS")]
pub scale: Option<usize>,
}
// Default for `MigrateArgs::batch`; compiled only with the `sal` feature, like
// the struct that uses it.
#[cfg(feature = "sal")]
const MIGRATE_BATCH_DEFAULT: usize = 1000;
// Flags of the `migrate` subcommand (available only with the `sal` feature),
// consumed by `cmd_migrate`. That handler is not part of the code reviewed
// here, so field semantics beyond the declarations are marked as assumptions.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[cfg(feature = "sal")]
#[derive(Args)]
pub struct MigrateArgs {
// Required `--from`. NOTE(review): presumably the source store URL; confirm.
#[arg(long)]
pub from: String,
// Required `--to`. NOTE(review): presumably the destination store URL; confirm.
#[arg(long)]
pub to: String,
// `--batch`, defaulting to `MIGRATE_BATCH_DEFAULT` (1000).
#[arg(long, default_value_t = MIGRATE_BATCH_DEFAULT)]
pub batch: usize,
// Optional `--namespace`. NOTE(review): presumably restricts the migration to
// one namespace; confirm.
#[arg(long)]
pub namespace: Option<String>,
// `--dry-run` switch.
#[arg(long)]
pub dry_run: bool,
// `--json` switch local to this subcommand.
#[arg(long)]
pub json: bool,
}
// Flags of the `serve` subcommand; `run` hands the whole struct to `serve`.
// That function is not part of the code reviewed here, so semantics beyond the
// declarations are marked as assumptions.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Args)]
pub struct ServeArgs {
// `--host`, defaulting to the loopback address.
#[arg(long, default_value = "127.0.0.1")]
pub host: String,
// `--port`, defaulting to `DEFAULT_PORT`.
#[arg(long, default_value_t = DEFAULT_PORT)]
pub port: u16,
// `--tls-cert` and `--tls-key` require each other, so clap rejects a command
// line that supplies only one of them.
#[arg(long, requires = "tls_key")]
pub tls_cert: Option<PathBuf>,
#[arg(long, requires = "tls_cert")]
pub tls_key: Option<PathBuf>,
// `--mtls-allowlist` is accepted only together with `--tls-cert` (and thereby
// `--tls-key`).
#[arg(long, requires = "tls_cert")]
pub mtls_allowlist: Option<PathBuf>,
// `--shutdown-grace-secs`, default 30.
#[arg(long, default_value_t = 30)]
pub shutdown_grace_secs: u64,
// `--quorum-writes`, default 0.
// NOTE(review): presumably 0 disables quorum writes; confirm in `serve`.
#[arg(long, default_value_t = 0)]
pub quorum_writes: usize,
// `--quorum-peers`, a comma-separated list that clap splits into a `Vec`.
#[arg(long, value_delimiter = ',')]
pub quorum_peers: Vec<String>,
// `--quorum-timeout-ms`, default 2000.
#[arg(long, default_value_t = 2000)]
pub quorum_timeout_ms: u64,
// Optional client certificate, key and CA certificate paths for quorum
// traffic. Unlike the TLS pair above, no `requires` relation ties them
// together.
#[arg(long)]
pub quorum_client_cert: Option<PathBuf>,
#[arg(long)]
pub quorum_client_key: Option<PathBuf>,
#[arg(long)]
pub quorum_ca_cert: Option<PathBuf>,
// `--catchup-interval-secs`, default 30.
#[arg(long, default_value_t = 30)]
pub catchup_interval_secs: u64,
// Optional `--federation-identity`.
#[arg(long)]
pub federation_identity: Option<String>,
// Optional `--store-url <URL>`, present only with the `sal` feature. `run`
// bails out when it is combined with an explicitly chosen database
// (`AI_MEMORY_DB` set, or a database path different from `DEFAULT_DB`).
#[cfg(feature = "sal")]
#[arg(long, value_name = "URL")]
pub store_url: Option<String>,
}
// Arguments of the `completions` subcommand.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Args)]
pub struct CompletionsArgs {
// Positional argument naming the target shell; `run` passes it to
// `clap_complete::generate`, which writes the script to stdout.
pub shell: Shell,
}
#[allow(clippy::too_many_lines)]
pub async fn run(cli: Cli, app_config: &AppConfig) -> Result<()> {
if let Some(path) = &cli.db_passphrase_file {
let passphrase = passphrase_from_file(path)?;
unsafe { std::env::set_var("AI_MEMORY_DB_PASSPHRASE", passphrase) };
}
let db_path = app_config.effective_db(&cli.db);
{
let limits = app_config.resolve_limits();
crate::quotas::set_quota_defaults(crate::quotas::QuotaDefaults {
max_memories_per_day: limits.max_memories_per_day,
max_storage_bytes: limits.max_storage_bytes,
max_links_per_day: limits.max_links_per_day,
});
}
let resolved_storage = app_config.resolve_storage();
crate::storage::set_db_mmap_size(resolved_storage.db_mmap_size_bytes);
crate::reranker::set_rerank_max_seq(app_config.resolve_reranker().max_seq_tokens);
crate::confidence::decay::set_namespace_half_life_overrides(
app_config.confidence_decay_half_life_overrides(),
);
crate::config::set_configured_default_namespace(
resolved_storage
.explicit_default_namespace()
.map(str::to_string),
);
let j = cli.json;
let cli_agent_id: Option<String> = cli.agent_id.clone();
let needs_checkpoint = is_write_command(&cli.command);
let db_path_for_checkpoint = if needs_checkpoint {
Some(db_path.clone())
} else {
None
};
let result = match cli.command {
Command::Serve(a) => {
#[cfg(feature = "sal")]
if let Some(ref url) = a.store_url {
let db_was_explicit =
std::env::var("AI_MEMORY_DB").is_ok() || db_path != PathBuf::from(DEFAULT_DB);
if db_was_explicit {
anyhow::bail!(
"--db and --store-url are mutually exclusive. \
Pass exactly one. Got --db={} and --store-url={}",
db_path.display(),
crate::logging::redact_url_password(url),
);
}
}
serve(db_path, a, app_config).await
}
Command::Mcp { tier, profile } => {
let feature_tier = app_config.effective_tier(Some(&tier));
let resolved_profile = match app_config.effective_profile(profile.as_deref()) {
Ok(p) => p,
Err(e) => {
eprintln!("ai-memory mcp: invalid profile: {e}");
std::process::exit(2);
}
};
let db_path_owned = db_path.clone();
let app_config_owned = app_config.clone();
tokio::task::spawn_blocking(move || {
mcp::run_mcp_server(
&db_path_owned,
feature_tier,
&app_config_owned,
&resolved_profile,
)
})
.await
.map_err(|e| anyhow::anyhow!("mcp join: {e}"))??;
Ok(())
}
Command::Store(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::store::run(
&db_path,
a,
j,
app_config,
cli_agent_id.as_deref(),
&mut out,
)
}
Command::Update(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::update::run(&db_path, &a, j, &mut out)
}
Command::Recall(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::recall::run(&db_path, &a, j, app_config, &mut out)
}
Command::Search(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::search::run(&db_path, &a, j, &mut out)
}
Command::Get(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::crud::cmd_get(&db_path, &a, j, &mut out)
}
Command::List(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::crud::cmd_list(&db_path, &a, j, app_config, &mut out)
}
Command::Delete(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::crud::cmd_delete(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Promote(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::promote::cmd_promote(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Forget(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::forget::cmd_forget(&db_path, &a, j, &mut out)
}
Command::Link(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::link::cmd_link(&db_path, &a, j, &mut out)
}
Command::Consolidate(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::consolidate::run(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Resolve(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::link::cmd_resolve(&db_path, &a, j, &mut out)
}
Command::Shell => cli::shell::run(&db_path),
Command::Sync(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::sync::run(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
}
Command::SyncDaemon(a) => cli::sync::run_daemon(&db_path, a, cli_agent_id.as_deref()).await,
Command::AutoConsolidate(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::consolidate::run_auto(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Gc => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::gc::run_gc(&db_path, j, app_config, &mut out)
}
Command::Stats => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::gc::run_stats(&db_path, j, &mut out)
}
Command::Namespaces => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::gc::run_namespaces(&db_path, j, &mut out)
}
Command::Namespace(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::namespace::run(&db_path, a, j, &mut out)
}
Command::Config(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::config::run(&db_path, a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Export => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::io::export(&db_path, &mut out)
}
Command::Import(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::io::import(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Completions(a) => {
generate(
a.shell,
&mut Cli::command(),
"ai-memory",
&mut std::io::stdout(),
);
Ok(())
}
Command::Man => {
let cmd = Cli::command();
let man = clap_mangen::Man::new(cmd);
man.render(&mut std::io::stdout())?;
Ok(())
}
Command::Mine(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::io::mine(
&db_path,
a,
j,
app_config,
cli_agent_id.as_deref(),
&mut out,
)
}
Command::Archive(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::archive::run(&db_path, a, j, &mut out)
}
Command::Agents(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::agents::run_agents(&db_path, a, j, &mut out)
}
Command::Identity(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::identity::run(a, j, &mut out)
}
Command::Offload(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::offload::run_offload(&db_path, &a, &mut out)
}
Command::Deref(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::offload::run_deref(&db_path, &a, &mut out)
}
Command::Rules(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::rules::run(&db_path, a, j, &mut out)
}
Command::Pending(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::agents::run_pending(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
}
Command::Backup(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::backup::run_backup(&db_path, &a, j, &mut out)
}
Command::Restore(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::backup::run_restore(&db_path, &a, j, &mut out)
}
Command::Curator(a) => {
#[cfg(feature = "sal")]
if let Some(ref url) = a.store_url {
let db_was_explicit =
std::env::var("AI_MEMORY_DB").is_ok() || db_path != PathBuf::from(DEFAULT_DB);
if db_was_explicit {
anyhow::bail!(
"--db and --store-url are mutually exclusive. \
Pass exactly one. Got --db={} and --store-url={}",
db_path.display(),
crate::logging::redact_url_password(url),
);
}
}
init_tracing();
if a.daemon {
let mut so = std::io::sink();
let mut se = std::io::sink();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::curator::run(&db_path, &a, app_config, &mut out).await
} else {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::curator::run(&db_path, &a, app_config, &mut out).await
}
}
Command::Bench(a) => cmd_bench(&a),
#[cfg(feature = "sal")]
Command::Migrate(a) => cmd_migrate(&a).await,
#[cfg(feature = "sal")]
Command::SchemaInit(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::schema_init::run(&a, &mut out).await
}
Command::Doctor(a) => {
let db_path_doctor = db_path.clone();
if a.tokens || a.raw_table {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
let exit = cli::doctor::run_tokens(
cli::doctor::TokensArgs {
json: a.json,
raw_table: a.raw_table,
profile: a.profile,
hooks: a.hooks,
},
&mut out,
)?;
std::process::exit(exit);
}
if a.hooks {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
let exit = cli::doctor::run_hooks(
cli::doctor::HooksReportArgs { json: a.json },
&mut out,
)?;
std::process::exit(exit);
}
let args = cli::doctor::DoctorArgs {
remote: a.remote,
json: a.json,
fail_on_warn: a.fail_on_warn,
};
let join = tokio::task::spawn_blocking(move || {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::doctor::run(&db_path_doctor, &args, &mut out)
})
.await;
match join {
Ok(Ok(0)) => Ok(()),
Ok(Ok(code)) => std::process::exit(code),
Ok(Err(e)) => Err(e),
Err(e) => Err(anyhow::anyhow!("doctor task join failed: {e}")),
}
}
Command::Boot(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::SessionBoot,
crate::audit::actor(
cli_agent_id.as_deref().unwrap_or("anonymous"),
"explicit_or_default",
None,
),
crate::audit::target_sweep(a.namespace.as_deref().unwrap_or("auto")),
));
cli::boot::run(&db_path, &a, app_config, &mut out)
}
Command::Install(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::install::run(&a, &mut out)
}
Command::Wrap(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
let code = cli::wrap::run(&db_path, &a, app_config, &mut out)?;
drop(out);
drop(so);
drop(se);
if code == 0 {
Ok(())
} else {
std::process::exit(code);
}
}
Command::Logs(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::logs::run(a, app_config, &mut out)
}
Command::Audit(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::audit::run(a, app_config, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Governance(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match a.action {
GovernanceAction::MigrateToPermissions(args) => {
cli::governance_migrate::run(args, &mut out)
}
GovernanceAction::InstallDefaults(args) => {
cli::governance_install_defaults::run(&db_path, args, &mut out)
}
GovernanceAction::CheckAction(args) => {
cli::governance_check_action::run(&db_path, &args, &mut out)
}
}
}
Command::VerifyReflectionChain(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::verify::run(&db_path, &a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::VerifySignedEventsChain(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::verify_signed_events::run(&db_path, &a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::ExportForensicBundle(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::export::export(&db_path, &a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::VerifyForensicBundle(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::export::verify(&a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::ExportReflections(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::export_reflections::run(&db_path, &a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::RecoverPreviousSession(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::recover_previous_session::run(&db_path, &a, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Atomise(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::atomise::run(
&db_path,
&a,
app_config,
cli_agent_id.as_deref(),
&mut out,
)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Persona(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::persona::run(&db_path, &a, None, None, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Calibrate(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match a.subcommand {
cli::commands::calibrate_confidence::CalibrateSubcommand::Confidence(ref conf) => {
match cli::commands::calibrate_confidence::run(&db_path, conf, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
}
}
Command::Skill(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::skill::run(&db_path, &a, None, &mut out)? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Share(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::share::cmd_share(&db_path, &a, &mut out)
}
Command::KgQuery(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::kg_query::cmd_kg_query(&db_path, &a, &mut out)
}
Command::FindPaths(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::find_paths::cmd_find_paths(&db_path, &a, &mut out)
}
Command::RecallObservations(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::recall_observations::cmd_recall_observations(&db_path, &a, &mut out)
}
Command::CheckDuplicate(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::check_duplicate::cmd_check_duplicate(&db_path, &a, app_config, &mut out)
.await
}
Command::Expand(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::expand::cmd_expand(&a, app_config, &mut out).await? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Reembed(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
match cli::commands::reembed::cmd_reembed(&db_path, &a, app_config, &mut out).await? {
0 => Ok(()),
code => std::process::exit(code),
}
}
Command::Replay(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::replay::cmd_replay(&db_path, &a, &mut out)
}
Command::Reflect(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::reflect::cmd_reflect(&db_path, &a, &mut out)
}
Command::Subscribe(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::subscribe::cmd_subscribe(&db_path, &a, &mut out)
}
Command::Unsubscribe(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::unsubscribe::cmd_unsubscribe(&db_path, &a, &mut out)
}
Command::ListSubscriptions(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::list_subscriptions::cmd_list_subscriptions(&db_path, &a, &mut out)
}
Command::SubscriptionReplay(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::subscription_replay::cmd_subscription_replay(&db_path, &a, &mut out)
}
Command::SubscriptionDlqList(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::subscription_dlq_list::cmd_subscription_dlq_list(&db_path, &a, &mut out)
}
Command::Notify(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::notify::cmd_notify(&db_path, &a, app_config, &mut out)
}
Command::Inbox(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::inbox::cmd_inbox(&db_path, &a, &mut out)
}
Command::IngestMultistep(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::ingest_multistep::cmd_ingest_multistep(&a, app_config, &mut out)
}
Command::KgInvalidate(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::kg_invalidate::cmd_kg_invalidate(&db_path, &a, &mut out)
}
Command::KgTimeline(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::kg_timeline::cmd_kg_timeline(&db_path, &a, &mut out)
}
Command::EntityRegister(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::entity_register::cmd_entity_register(&db_path, &a, &mut out)
}
Command::EntityGetByAlias(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::entity_get_by_alias::cmd_entity_get_by_alias(&db_path, &a, &mut out)
}
Command::DependentsOfInvalidated(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::dependents_of_invalidated::cmd_dependents_of_invalidated(
&db_path, &a, &mut out,
)
}
Command::ReflectionOrigin(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::reflection_origin::cmd_reflection_origin(&db_path, &a, &mut out)
}
Command::QuotaStatus(a) => {
let stdout = std::io::stdout();
let stderr = std::io::stderr();
let mut so = stdout.lock();
let mut se = stderr.lock();
let mut out = cli::CliOutput::from_std(&mut so, &mut se);
cli::commands::quota_status::cmd_quota_status(&db_path, &a, &mut out)
}
};
if result.is_ok()
&& let Some(cp_path) = db_path_for_checkpoint
&& let Ok(conn) = db::open(&cp_path)
{
let _ = db::checkpoint(&conn);
}
result
}
/// Returns `true` when `cmd` appears on this function's explicit write list.
///
/// The list is closed: any [`Command`] variant that is not named below is
/// reported as `false`. Variants are listed alphabetically.
#[must_use]
pub fn is_write_command(cmd: &Command) -> bool {
    matches!(
        cmd,
        Command::Atomise(_)
            | Command::AutoConsolidate(_)
            | Command::Consolidate(_)
            | Command::Delete(_)
            | Command::EntityRegister(_)
            | Command::Forget(_)
            | Command::Gc
            | Command::Import(_)
            | Command::IngestMultistep(_)
            | Command::KgInvalidate(_)
            | Command::Link(_)
            | Command::Namespace(_)
            | Command::Notify(_)
            | Command::Promote(_)
            | Command::Reflect(_)
            | Command::Resolve(_)
            | Command::Share(_)
            | Command::Skill(_)
            | Command::Store(_)
            | Command::Subscribe(_)
            | Command::Sync(_)
            | Command::SyncDaemon(_)
            | Command::Unsubscribe(_)
            | Command::Update(_)
    )
}
pub fn passphrase_from_file(path: &Path) -> Result<String> {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let meta = std::fs::metadata(path).with_context(|| {
format!(
"stat passphrase file {} for permission check (#1055)",
path.display()
)
})?;
let mode = meta.permissions().mode();
let lax_bits = mode & 0o077;
if lax_bits != 0 {
let fail_open = std::env::var("AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if fail_open {
tracing::warn!(
target: "ai_memory::daemon_runtime",
path = %path.display(),
mode = format!("{:o}", mode & 0o777),
"passphrase_from_file: file is group/world-readable; \
AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS=1 — accepting \
(UNSAFE, legacy posture). Tighten with `chmod 0400 <path>` \
and clear the env var."
);
} else {
anyhow::bail!(
"passphrase file {} has lax permissions (mode {:o}, group/world bits set); \
tighten with `chmod 0400 {}` OR set \
AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS=1 to opt out (#1055)",
path.display(),
mode & 0o777,
path.display(),
);
}
}
}
let mut raw = std::fs::read_to_string(path)
.with_context(|| format!("reading passphrase file {}", path.display()))?;
let passphrase = raw.trim_end_matches(['\n', '\r']).to_string();
{
use zeroize::Zeroize;
raw.zeroize();
}
if passphrase.is_empty() {
anyhow::bail!("passphrase file {} is empty", path.display());
}
Ok(passphrase)
}
/// Exports the anonymize flag into the process environment when the config
/// asks for it.
///
/// Sets [`crate::identity::ENV_ANONYMIZE`] to `"1"` only when
/// `app_config.effective_anonymize_default()` is true and reading the variable
/// currently fails (unset, or not valid Unicode). A value that is already
/// readable is left untouched.
pub fn apply_anonymize_default(app_config: &AppConfig) {
    if !app_config.effective_anonymize_default() {
        return;
    }
    if std::env::var(crate::identity::ENV_ANONYMIZE).is_ok() {
        return;
    }
    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread is
    // reading or writing the environment. Presumably this runs during
    // single-threaded startup; confirm against the call sites.
    unsafe { std::env::set_var(crate::identity::ENV_ANONYMIZE, "1") };
}
#[must_use]
pub fn resolve_admin_agent_ids(admin_cfg: Option<&crate::config::AdminConfig>) -> Vec<String> {
if let Ok(raw) = std::env::var("AI_MEMORY_ADMIN_AGENT_IDS")
&& !raw.trim().is_empty()
{
let mut out = Vec::new();
for entry in raw.split(',') {
let id = entry.trim();
if id.is_empty() {
continue;
}
match crate::validate::validate_agent_id(id) {
Ok(()) => out.push(id.to_string()),
Err(e) => {
tracing::warn!(
"AI_MEMORY_ADMIN_AGENT_IDS entry '{id}' rejected: {e}; dropping"
);
}
}
}
return out;
}
admin_cfg
.map(crate::config::AdminConfig::validated_agent_ids)
.unwrap_or_default()
}
/// Picks the embedding model for the embedder: an operator-configured model
/// when one is set and recognised, otherwise the tier preset.
///
/// Lookup order for the configured value (blank strings are ignored):
/// 1. `[embeddings].model`
/// 2. the legacy top-level `embedding_model`
///
/// A configured value that `EmbeddingModel::from_canonical_id` does not
/// recognise is logged as a warning and the tier preset is used instead.
#[allow(deprecated)]
pub(crate) fn resolve_embedder_model(
    tier_config: &crate::config::TierConfig,
    app_config: &AppConfig,
) -> Option<crate::config::EmbeddingModel> {
    let preset = tier_config.embedding_model;

    let section_model = app_config
        .embeddings
        .as_ref()
        .and_then(|section| section.model.clone())
        .filter(|raw| !raw.trim().is_empty());
    let (raw, origin) = if let Some(raw) = section_model {
        (raw, "[embeddings].model")
    } else if let Some(raw) = app_config
        .embedding_model
        .clone()
        .filter(|raw| !raw.trim().is_empty())
    {
        (raw, "legacy embedding_model")
    } else {
        // Nothing configured: the tier preset decides.
        return preset;
    };

    let preset_label = preset.map_or_else(|| "none".to_string(), |m| m.hf_model_id().to_string());

    let Some(model) = crate::config::EmbeddingModel::from_canonical_id(&raw) else {
        tracing::warn!(
            "embedder: configured model {raw:?} (from {origin}) is not constructible by the \
             daemon embedder (supported: nomic-embed-text-v1.5, all-MiniLM-L6-v2); \
             falling back to tier-preset {preset_label}"
        );
        return preset;
    };
    tracing::info!(
        "embedder: using configured model {} from {origin} (tier-preset would have been {})",
        model.hf_model_id(),
        preset_label
    );
    Some(model)
}
/// Builds the embedder for `feature_tier`, or returns `None` when embeddings
/// are disabled or cannot be initialised.
///
/// The model hint is the tier preset when the resolved backend is an API
/// backend (per `crate::config::is_api_embed_backend`), otherwise the result
/// of [`resolve_embedder_model`]. With no model the function logs that the tier
/// is keyword-only and returns `None`.
///
/// Construction runs on the blocking pool via `spawn_blocking`. Join failures
/// and construction errors are logged at error level and yield `None`; this
/// function never returns an error to the caller.
#[allow(deprecated)]
pub async fn build_embedder(feature_tier: FeatureTier, app_config: &AppConfig) -> Option<Embedder> {
    let tier_config = feature_tier.config();
    let resolved_embeddings = app_config.resolve_embeddings();
    let model_hint = if crate::config::is_api_embed_backend(&resolved_embeddings.backend) {
        tier_config.embedding_model
    } else {
        resolve_embedder_model(&tier_config, app_config)
    };
    let Some(emb_model) = model_hint else {
        tracing::info!(
            "embedder disabled — tier={} keyword-only (FTS5); semantic recall not wired",
            feature_tier.as_str()
        );
        return None;
    };

    let joined = tokio::task::spawn_blocking(move || {
        embeddings::Embedder::from_resolved(&resolved_embeddings, Some(emb_model))
    })
    .await;

    match joined {
        // The blocking task itself did not complete.
        Err(e) => {
            tracing::error!("embedder spawn_blocking join failed: {e}");
            None
        }
        Ok(Ok(Some(emb))) => {
            tracing::info!(
                "embedder loaded ({}) — tier={} semantic recall enabled",
                emb.model_description(),
                feature_tier.as_str()
            );
            Some(emb)
        }
        Ok(Ok(None)) => None,
        Ok(Err(e)) => {
            tracing::error!(
                "EMBEDDER LOAD FAILED — tier={} requested semantic features, \
                 but embedder init errored: {e:#}. Semantic recall DEGRADED to \
                 keyword (#1593/#1598 fail-closed; the chat LLM client is NEVER \
                 reused for embeddings). Semantic recall, sync_push embedding \
                 refresh (#322), and HNSW index will be NO-OPS. For local \
                 backends check network egress to HuggingFace Hub + available \
                 memory for model weights; for API backends check the resolved \
                 base URL / API key (`ai-memory doctor`). To force keyword-only \
                 explicitly (silences this error), set `tier = \"keyword\"` in \
                 config.toml.",
                feature_tier.as_str()
            );
            None
        }
    }
}
/// Builds the LLM client from the resolved LLM configuration, or returns `None`
/// when no client should or can be created.
///
/// The client is skipped outright (debug log only) when the tier has no
/// `llm_model` preset *and* the resolved config comes from the compiled
/// defaults. Otherwise the client is built through
/// `OllamaClient::build_from_resolved_async`; a resolver that yields no client
/// and a build error are both logged as warnings and produce `None`.
pub async fn build_llm_client(
    feature_tier: FeatureTier,
    app_config: &AppConfig,
) -> Option<llm::OllamaClient> {
    let resolved = app_config.resolve_llm(None, None, None);
    let operator_configured = !matches!(
        resolved.source,
        crate::config::ConfigSource::CompiledDefault
    );
    if feature_tier.config().llm_model.is_none() && !operator_configured {
        tracing::debug!(
            "L5: llm client disabled — tier={} has no llm_model preset AND no \
             operator LLM config; set AI_MEMORY_LLM_BACKEND or [llm] section to enable",
            feature_tier.as_str()
        );
        return None;
    }

    let build = llm::OllamaClient::build_from_resolved_async(&resolved).await;

    // Log fields are borrowed after the await; the names are captured by the
    // format strings below.
    let tier_str = feature_tier.as_str();
    let backend = &resolved.backend;
    let model = &resolved.model;
    let source = resolved.source.as_str();
    let key_source = resolved.api_key_source.as_str();

    match build {
        Err(e) => {
            tracing::warn!(
                "L5: llm client init failed (tier={tier_str} backend={backend} \
                 source={source}); LLM-powered hooks are no-ops: {e}"
            );
            None
        }
        Ok(None) => {
            tracing::warn!(
                "L5: llm client disabled — resolver returned no client \
                 (tier={tier_str} backend={backend} source={source}); \
                 LLM-powered hooks are no-ops"
            );
            None
        }
        Ok(Some(client)) => {
            tracing::info!(
                "L5: llm client ready — tier={tier_str} backend={backend} \
                 model={model} source={source} key_source={key_source} \
                 — auto_tag/expand_query/contradiction-detection/reflection \
                 hooks armed (#1146 resolver path)"
            );
            Some(client)
        }
    }
}
#[must_use]
pub fn build_vector_index(conn: &Connection, embedder_present: bool) -> Option<VectorIndex> {
if !embedder_present {
return None;
}
match db::get_all_embeddings(conn) {
Ok(entries) if !entries.is_empty() => Some(hnsw::VectorIndex::build(entries)),
_ => Some(hnsw::VectorIndex::empty()),
}
}
/// Opens the database at `db_path` and loads every stored embedding for the
/// boot-time index warm-up.
///
/// Returns `None` — after logging a warning — when the database cannot be
/// opened or the embeddings cannot be read. An empty `Vec` is a successful
/// result.
pub(crate) fn load_boot_index_entries(db_path: &Path) -> Option<Vec<(String, Vec<f32>)>> {
    let conn = db::open(db_path)
        .inspect_err(|e| {
            tracing::warn!(
                db_path = %db_path.display(),
                err = %e,
                "HNSW boot warm-up: could not open DB; semantic index stays cold (#1579 B3)"
            );
        })
        .ok()?;
    db::get_all_embeddings(&conn)
        .inspect_err(|e| {
            tracing::warn!(
                err = %e,
                "HNSW boot warm-up: get_all_embeddings failed; semantic index stays cold (#1579 B3)"
            );
        })
        .ok()
}
/// Spawns an OS thread that seeds the shared vector index from the embeddings
/// stored at `db_path` and waits until the index reports itself fully
/// searchable.
///
/// The thread exits early — leaving the index untouched — when the embeddings
/// cannot be loaded, when there are none, or when the shared slot holds no
/// index. The returned handle can be joined to wait for the warm-up to finish.
pub fn spawn_vector_index_boot_load(
db_path: std::path::PathBuf,
vector_index: Arc<tokio::sync::Mutex<Option<VectorIndex>>>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let started = std::time::Instant::now();
// Load failures are already logged by `load_boot_index_entries`.
let Some(entries) = load_boot_index_entries(&db_path) else {
return;
};
if entries.is_empty() {
tracing::info!(
"HNSW boot warm-up: no stored embeddings — index starts empty (#1579 B3)"
);
return;
}
let total = entries.len();
// Hold the lock only long enough to kick off the seed + rebuild; the guard
// is dropped at the end of this block, before the build handle is joined.
let build_handle = {
// `blocking_lock` because this closure runs on a plain thread, not in
// an async task.
let guard = vector_index.blocking_lock();
let Some(idx) = guard.as_ref() else {
return;
};
idx.seed_and_rebuild_async(entries)
};
// The join result is deliberately ignored.
let _ = build_handle.join();
// Keep triggering rebuilds until the index reports itself fully searchable.
loop {
let pending = {
let guard = vector_index.blocking_lock();
let Some(idx) = guard.as_ref() else {
return;
};
if idx.is_fully_searchable() {
None
} else {
Some(idx.rebuild_async())
}
};
match pending {
None => break,
Some(handle) => {
// Wait for this rebuild outside the lock, then pause before re-checking.
let _ = handle.join();
std::thread::sleep(crate::hnsw::REBUILD_WAIT_POLL_INTERVAL);
}
}
}
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = started.elapsed().as_millis() as u64;
tracing::info!(
entries = total,
elapsed_ms,
"HNSW index warm (#1579 B3): async boot build swapped in; \
semantic recall is now index-backed"
);
})
}
use crate::identity::keypair::DAEMON_KEYPAIR_LABEL;
fn ensure_and_load_daemon_keypair() -> (
Option<crate::identity::keypair::AgentKeypair>,
Option<crate::identity::keypair::EnsureOutcome>,
) {
let dir = match crate::identity::keypair::default_key_dir() {
Ok(d) => d,
Err(e) => {
tracing::info!("identity: no default key dir available, link signing disabled: {e}");
return (None, None);
}
};
let outcome = match crate::identity::keypair::ensure_keypair(DAEMON_KEYPAIR_LABEL, &dir, false)
{
Ok(o) => o,
Err(e) => {
tracing::warn!("identity: keypair auto-gen failed: {e:#}");
return (None, None);
}
};
if matches!(
outcome,
crate::identity::keypair::EnsureOutcome::SkippedDisabled
) {
return (None, Some(outcome));
}
let kp = match crate::identity::keypair::load(DAEMON_KEYPAIR_LABEL, &dir) {
Ok(kp) if kp.can_sign() => {
tracing::info!(
"identity: loaded signing keypair for {DAEMON_KEYPAIR_LABEL} from {}",
dir.display()
);
Some(kp)
}
Ok(_) => {
tracing::info!(
"identity: only public key on disk for {DAEMON_KEYPAIR_LABEL}; link signing disabled"
);
None
}
Err(e) => {
tracing::warn!(
"identity: keypair load failed for {DAEMON_KEYPAIR_LABEL}: {e:#}; link signing disabled"
);
None
}
};
(kp, Some(outcome))
}
/// Spawns the periodic GC task using the default shadow-observation retention.
///
/// Equivalent to [`spawn_gc_loop_with_shadow_retention`] with
/// `crate::confidence::shadow::DEFAULT_SHADOW_RETENTION_DAYS`.
#[must_use]
pub fn spawn_gc_loop(
    state: Db,
    archive_max_days: Option<i64>,
    interval: Duration,
) -> JoinHandle<()> {
    let shadow_retention_days = crate::confidence::shadow::DEFAULT_SHADOW_RETENTION_DAYS;
    spawn_gc_loop_with_shadow_retention(state, archive_max_days, shadow_retention_days, interval)
}
/// Spawns a background task that runs the garbage-collection passes every
/// `interval`.
///
/// Each cycle sleeps first, then takes the shared DB lock once and runs, in
/// order: memory expiry (`db::gc`), archive purge (`db::auto_purge_archive`
/// with `archive_max_days`), shadow-observation GC (`shadow_retention_days`),
/// and `recall_observations` pruning. A pass that removed rows is logged at
/// info level; a pass that fails is logged at warn level and the loop carries
/// on with the next pass. The task never terminates on its own.
#[must_use]
pub fn spawn_gc_loop_with_shadow_retention(
    state: Db,
    archive_max_days: Option<i64>,
    shadow_retention_days: i64,
    interval: Duration,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;
            let lock = state.lock().await;
            // NOTE(review): `lock.3` is forwarded to `db::gc` as its second
            // argument — presumably the archive-on-gc flag; confirm against
            // the `Db` tuple definition.
            match db::gc(&lock.0, lock.3) {
                Ok(n) if n > 0 => tracing::info!("gc: expired {n} memories"),
                Ok(_) => {}
                Err(e) => tracing::warn!("gc: expiring memories failed: {e}"),
            }
            match db::auto_purge_archive(&lock.0, archive_max_days) {
                Ok(n) if n > 0 => tracing::info!("gc: purged {n} old archived memories"),
                Ok(_) => {}
                Err(e) => tracing::warn!("gc: archive purge failed: {e}"),
            }
            match crate::confidence::shadow::gc_observations(&lock.0, shadow_retention_days) {
                Ok(n) if n > 0 => tracing::info!(
                    "gc: purged {n} shadow observations older than {shadow_retention_days}d"
                ),
                Ok(_) => {}
                Err(e) => tracing::warn!("shadow observation gc failed: {e}"),
            }
            match crate::observations::gc::prune(&lock.0) {
                Ok(n) if n > 0 => {
                    tracing::info!("gc: pruned {n} expired recall_observations");
                }
                Ok(_) => {}
                Err(e) => tracing::warn!("recall_observations gc failed: {e}"),
            }
        }
    })
}
/// Spawns a background task that expires timed-out pending actions every
/// `interval` and dispatches a `pending_action_expired` event for each one.
///
/// The sweep itself runs under the shared DB lock, which is released before
/// events are dispatched; the lock is then re-taken once per expired row. A
/// failed sweep is logged and treated as "nothing expired".
#[must_use]
pub fn spawn_pending_timeout_sweep_loop(
    state: Db,
    db_path: PathBuf,
    default_secs: i64,
    interval: Duration,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;

            let guard = state.lock().await;
            let swept = db::sweep_pending_action_timeouts(&guard.0, default_secs);
            drop(guard);

            let expired = match swept {
                Ok(rows) => rows,
                Err(e) => {
                    tracing::warn!("pending_actions sweep failed: {e}");
                    continue;
                }
            };
            if expired.is_empty() {
                continue;
            }
            tracing::info!(
                "pending_actions sweep: marked {} row(s) expired",
                expired.len()
            );
            for (id, namespace) in expired {
                // Re-acquire per row so other tasks can interleave between dispatches.
                let guard = state.lock().await;
                crate::subscriptions::dispatch_event(
                    &guard.0,
                    "pending_action_expired",
                    &id,
                    &namespace,
                    None,
                    &db_path,
                );
            }
        }
    })
}
/// Spawns a background task that runs the transcript lifecycle sweep every
/// `interval`.
///
/// Each cycle sleeps, runs `crate::transcripts::sweep_transcript_lifecycle`
/// under the shared DB lock, and logs the report when it archived, pruned, or
/// hit errors on at least one item. A sweep that fails as a whole is logged as
/// a warning and the loop moves on to the next cycle.
#[must_use]
pub fn spawn_transcript_lifecycle_sweep_loop(
    state: Db,
    cfg: crate::config::TranscriptsConfig,
    interval: Duration,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;

            let guard = state.lock().await;
            let outcome = crate::transcripts::sweep_transcript_lifecycle(&guard.0, &cfg);
            drop(guard);

            let report = match outcome {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!("transcript lifecycle sweep failed: {e}");
                    continue;
                }
            };
            let noteworthy = report.archived > 0 || report.pruned > 0 || report.errors > 0;
            if noteworthy {
                tracing::info!(
                    "transcript lifecycle sweep: archived={} pruned={} errors={}",
                    report.archived,
                    report.pruned,
                    report.errors,
                );
            }
        }
    })
}
/// Spawns a background task that calls `crate::quotas::reset_daily` every
/// `interval`.
///
/// The reset runs under the shared DB lock. A non-zero row count is logged at
/// info level; a failure is logged as a warning and retried on the next cycle.
#[must_use]
pub fn spawn_agent_quota_reset_loop(state: Db, interval: Duration) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;

            let guard = state.lock().await;
            let outcome = crate::quotas::reset_daily(&guard.0);
            drop(guard);

            match outcome {
                Err(e) => tracing::warn!("agent_quotas daily reset failed: {e}"),
                Ok(reset_count) => {
                    if reset_count > 0 {
                        tracing::info!("agent_quotas daily reset: {reset_count} row(s) zeroed");
                    }
                }
            }
        }
    })
}
/// Spawns a background task that checkpoints the database WAL every `interval`.
///
/// The first checkpoint runs after half an interval; subsequent ones follow a
/// full interval after the previous checkpoint completes. The shared DB lock
/// is held only for the checkpoint call. Failures are logged as warnings and
/// do not stop the loop.
#[must_use]
pub fn spawn_wal_checkpoint_loop(state: Db, interval: Duration) -> JoinHandle<()> {
    let initial_delay = interval / 2;
    tokio::spawn(async move {
        tokio::time::sleep(initial_delay).await;
        loop {
            let guard = state.lock().await;
            let outcome = db::checkpoint(&guard.0);
            drop(guard);

            if let Err(e) = outcome {
                tracing::warn!("wal checkpoint failed: {e}");
            } else {
                tracing::debug!("wal checkpoint: ok");
            }
            tokio::time::sleep(interval).await;
        }
    })
}
/// Builds the axum [`Router`] by delegating to [`crate::build_router`].
///
/// Note the argument order: this wrapper takes `(app_state, api_key_state)`,
/// while the crate-level builder is called with `(api_key_state, app_state)`.
#[must_use]
pub fn build_router(app_state: AppState, api_key_state: ApiKeyState) -> Router {
crate::build_router(api_key_state, app_state)
}
/// Everything `bootstrap_serve` hands back to its caller after setting up the
/// serve path.
pub struct ServeBootstrap {
/// Handler state; accepted by [`build_router`] together with `api_key_state`.
pub app_state: AppState,
/// API-key state; accepted by [`build_router`] together with `app_state`.
pub api_key_state: ApiKeyState,
/// Shared database handle, the type the `spawn_*_loop` helpers take as `state`.
pub db_state: Db,
/// Archive retention in days, in the shape `spawn_gc_loop` accepts
/// (`None` presumably means "no age-based purge" — TODO confirm).
pub archive_max_days: Option<i64>,
/// Join handles of spawned tasks (presumably the background loops started
/// during bootstrap — TODO confirm against the rest of `bootstrap_serve`).
pub task_handles: Vec<JoinHandle<()>>,
/// Outcome of ensuring the daemon keypair, when one was produced.
pub daemon_keypair_outcome: Option<crate::identity::keypair::EnsureOutcome>,
/// Request timeout (presumably applied per HTTP request — TODO confirm where
/// it is consumed).
pub request_timeout: std::time::Duration,
/// Metrics handle obtained from the deferred-audit queue.
pub deferred_audit_metrics: crate::governance::deferred_audit::DeferredAuditMetrics,
}
/// Resolves the embedding dimension to configure the store with, if any.
///
/// Precedence:
/// 1. the `embedding_dim` of the resolved embeddings config,
/// 2. the dimension of the legacy top-level `embedding_model`, when it parses
///    as an `EmbeddingModel`,
/// 3. the dimension of the tier's preset model.
///
/// A model dimension that does not fit in `u32` falls back to 384.
#[cfg(feature = "sal")]
#[must_use]
#[allow(deprecated)]
fn resolve_configured_embedding_dim(
    app_config: &crate::config::AppConfig,
    tier_config: &crate::config::TierConfig,
) -> Option<u32> {
    let dim_of = |model: crate::config::EmbeddingModel| u32::try_from(model.dim()).unwrap_or(384);

    if let Some(explicit) = app_config.resolve_embeddings().embedding_dim {
        return Some(explicit);
    }
    let legacy_model = app_config
        .embedding_model
        .as_deref()
        .and_then(|raw| raw.parse::<crate::config::EmbeddingModel>().ok());
    legacy_model.or(tier_config.embedding_model).map(dim_of)
}
/// Builds the SAL store handle for the curator.
///
/// Derives the embedding dimension from `app_config` and its effective tier,
/// then delegates to [`build_store_handle`] and discards the backend tag.
///
/// # Errors
///
/// Propagates any failure from [`build_store_handle`], with the context
/// `build SAL store handle for curator`.
#[cfg(feature = "sal")]
pub(crate) async fn build_curator_store(
    store_url: Option<&str>,
    db_path: &Path,
    app_config: &crate::config::AppConfig,
) -> Result<Arc<dyn crate::store::MemoryStore>> {
    let tier_config = app_config.effective_tier(None).config();
    let embedding_dim = resolve_configured_embedding_dim(app_config, &tier_config);
    let statement_timeout = app_config.postgres_statement_timeout_secs;
    let pool = app_config.resolve_pg_pool();
    build_store_handle(store_url, db_path, statement_timeout, embedding_dim, pool)
        .await
        .map(|(_backend, store)| store)
        .context("build SAL store handle for curator")
}
/// Opens the SAL store selected by `store_url`, falling back to SQLite at
/// `db_path` when no URL is given.
///
/// * `None` — opens a SQLite store at `db_path`.
/// * Postgres URL (as recognised by `crate::migrate::is_postgres_url` on the
///   lower-cased URL) — connects the Postgres adapter. With
///   `configured_embedding_dim` set, the auto-migrating constructor is used;
///   otherwise the adapter's default dimension is used. Requires the
///   `sal-postgres` feature; without it the call fails with a build hint.
/// * `sqlite://…` — opens a SQLite store at the path following the scheme.
///   The scheme is matched case-insensitively, like the Postgres one.
///
/// # Errors
///
/// Fails when the URL scheme is not recognised, when Postgres is requested
/// without the `sal-postgres` feature, or when the selected adapter cannot be
/// opened.
#[cfg(feature = "sal")]
async fn build_store_handle(
    store_url: Option<&str>,
    db_path: &Path,
    postgres_statement_timeout_secs: Option<u64>,
    configured_embedding_dim: Option<u32>,
    pool: crate::store::PoolConfig,
) -> Result<(
    crate::handlers::StorageBackend,
    Arc<dyn crate::store::MemoryStore>,
)> {
    use crate::handlers::StorageBackend;
    const SQLITE_SCHEME: &str = "sqlite://";
    match store_url {
        Some(url) => {
            let lowered = url.to_ascii_lowercase();
            if crate::migrate::is_postgres_url(&lowered) {
                #[cfg(feature = "sal-postgres")]
                {
                    let timeout = postgres_statement_timeout_secs
                        .unwrap_or(crate::store::postgres::DEFAULT_STATEMENT_TIMEOUT_SECS);
                    // Never log credentials embedded in the URL.
                    let display_url = crate::logging::redact_url_password(url);
                    let store = if let Some(dim) = configured_embedding_dim {
                        tracing::info!(
                            "Wave-3 (issue #877): opening Postgres SAL store at {display_url} \
                             (statement_timeout={timeout}s, embedding_dim={dim}, auto_migrate=on, \
                             pool_max={}, pool_min={}, acquire_timeout={}s)",
                            pool.max_connections,
                            pool.min_connections,
                            pool.acquire_timeout_secs
                        );
                        crate::store::postgres::PostgresStore::connect_with_dim_and_timeout_auto_migrate(
                            url, dim, timeout, pool,
                        )
                        .await
                        .context("connect postgres adapter (auto-migrate dim)")?
                    } else {
                        tracing::info!(
                            "Wave-3: opening Postgres SAL store at {display_url} \
                             (statement_timeout={timeout}s, no embedder configured, \
                             pool_max={}, pool_min={}, acquire_timeout={}s)",
                            pool.max_connections,
                            pool.min_connections,
                            pool.acquire_timeout_secs
                        );
                        crate::store::postgres::PostgresStore::connect_with_dim_and_timeout(
                            url,
                            crate::store::postgres::DEFAULT_EMBEDDING_DIM,
                            timeout,
                            pool,
                        )
                        .await
                        .context("connect postgres adapter")?
                    };
                    Ok((StorageBackend::Postgres, Arc::new(store)))
                }
                #[cfg(not(feature = "sal-postgres"))]
                {
                    // Mark the Postgres-only inputs as used in this build.
                    let _ = url;
                    let _ = postgres_statement_timeout_secs;
                    let _ = configured_embedding_dim;
                    let _ = pool;
                    anyhow::bail!(
                        "--store-url postgres:// requires the binary to be built with \
                         --features sal-postgres; this binary was built with --features sal only"
                    );
                }
            } else if lowered.starts_with(SQLITE_SCHEME) {
                // URL schemes are case-insensitive, so match on the lower-cased
                // copy. `to_ascii_lowercase` preserves byte length and the
                // scheme is pure ASCII, so the same offset is a valid char
                // boundary in the original `url`.
                let path = &url[SQLITE_SCHEME.len()..];
                // `sqlite:///abs` -> `/abs`, `sqlite:////abs` -> `/abs`,
                // `sqlite://rel` -> `rel`: only a doubled leading slash is collapsed.
                let clean = path
                    .strip_prefix('/')
                    .map_or(path, |p| if p.starts_with('/') { p } else { path });
                tracing::info!("Wave-3: opening SQLite SAL store at {clean} (--store-url)");
                let store = crate::store::sqlite::SqliteStore::open(clean)
                    .map_err(|e| anyhow::anyhow!("open sqlite adapter: {e}"))?;
                Ok((StorageBackend::Sqlite, Arc::new(store)))
            } else {
                anyhow::bail!(
                    "unrecognised --store-url: {} (expected sqlite:///path or postgres://...)",
                    crate::logging::redact_url_password(url)
                )
            }
        }
        None => {
            // The Postgres-only inputs are irrelevant on the SQLite fallback.
            let _ = postgres_statement_timeout_secs;
            let _ = configured_embedding_dim;
            let _ = pool;
            tracing::debug!("Wave-3: --store-url absent; opening SQLite SAL store at --db path");
            let store = crate::store::sqlite::SqliteStore::open(db_path)
                .map_err(|e| anyhow::anyhow!("open sqlite adapter: {e}"))?;
            Ok((StorageBackend::Sqlite, Arc::new(store)))
        }
    }
}
/// Agent id under which the pre-action (wire-check) hook consults the rules
/// and submits refusals to the deferred audit queue.
const WIRE_ACTION_ACTOR: &str = "daemon:wire_action";
/// Reports whether the operator opted into fail-open governance.
///
/// True only when the variable named by `ENV_GOVERNANCE_FAIL_OPEN` is set to
/// `1` or to `true` (case-insensitive); unset or unreadable values count as
/// `false`.
fn governance_fail_open_on_error() -> bool {
    match std::env::var(ENV_GOVERNANCE_FAIL_OPEN) {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
/// Name of the environment variable that switches the governance hooks from
/// failing closed to failing open when rule consultation is unavailable.
/// Read by `governance_fail_open_on_error`, which accepts `1` or `true`.
const ENV_GOVERNANCE_FAIL_OPEN: &str = "AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR";
pub(crate) fn install_governance_pre_write_hook(
db_path: &Path,
deferred_audit_queue: &crate::governance::deferred_audit::DeferredAuditQueue,
rule_cache: &Arc<crate::governance::rule_cache::RuleCache>,
hook_consultation_conn: Option<Arc<std::sync::Mutex<rusqlite::Connection>>>,
) {
use crate::governance::agent_action::{
AgentAction, Decision as RuleDecision, check_agent_action_deferred_cached,
};
let rules_db_path = db_path.to_path_buf();
let queue_for_hook = deferred_audit_queue.clone();
let cache_for_hook = Arc::clone(rule_cache);
let conn_for_hook = hook_consultation_conn;
let install_result = crate::storage::GOVERNANCE_PRE_WRITE.set(Box::new(
move |mem: &crate::models::Memory| -> std::result::Result<(), String> {
let action = AgentAction::Custom {
custom_kind: "memory_write".to_string(),
payload: serde_json::json!({
"namespace": mem.namespace,
"tier": mem.tier.as_str(),
(field_names::MEMORY_KIND): mem.memory_kind.as_str(),
"title": mem.title,
}),
};
let agent_id = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("substrate:pre_write_hook")
.to_string();
let Some(conn_arc) = conn_for_hook.as_ref() else {
return governance_consultation_unavailable(
&queue_for_hook,
&agent_id,
&action,
&rules_db_path,
"L1-6 governance pre-write",
);
};
let conn_guard = match conn_arc.lock() {
Ok(g) => g,
Err(poisoned) => {
tracing::warn!(
"L1-6 governance pre-write: consultation connection mutex poisoned; \
recovering inner connection and continuing"
);
poisoned.into_inner()
}
};
let conn_for_check: &rusqlite::Connection = &conn_guard;
match check_agent_action_deferred_cached(
conn_for_check,
Some(&cache_for_hook),
&agent_id,
&action,
&queue_for_hook,
) {
Ok(RuleDecision::Allow | RuleDecision::Warn { .. }) => Ok(()),
Ok(RuleDecision::Refuse { rule_id, reason }) => {
tracing::info!(
"L1-6 governance pre-write refused namespace={:?} rule_id={} \
reason={} (chain-logged via deferred audit queue)",
mem.namespace,
rule_id,
reason
);
Err(reason)
}
Err(e) => {
let reason = format!("governance:consultation_failed: {e}");
let fail_open = std::env::var("AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let synthetic_refusal = RuleDecision::Refuse {
rule_id: "governance:consultation_failed".to_string(),
reason: reason.clone(),
};
queue_for_hook.submit_refusal(&agent_id, &action, &synthetic_refusal);
if fail_open {
tracing::warn!(
"L1-6 governance pre-write: rule consultation failed: {}; \
AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR=1 — \
degrading to ALLOW (UNSAFE, legacy posture)",
e
);
Ok(())
} else {
tracing::warn!(
"L1-6 governance pre-write: rule consultation failed: {}; \
failing CLOSED (post-#1054 secure default — \
set AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR=1 to revert)",
e
);
Err(reason)
}
}
}
},
));
if install_result.is_err() {
tracing::debug!(
"L1-6 governance pre-write hook already installed (process-wide OnceLock); \
the existing hook remains active for this process"
);
} else {
tracing::info!(
"L1-6 governance pre-write hook installed (substrate-authoritative \
memory_write gate active + deferred chain-log on refusal)"
);
}
}
pub(crate) fn install_governance_pre_action_hook(
db_path: &Path,
deferred_audit_queue: &crate::governance::deferred_audit::DeferredAuditQueue,
rule_cache: &Arc<crate::governance::rule_cache::RuleCache>,
hook_consultation_conn: Option<Arc<std::sync::Mutex<rusqlite::Connection>>>,
) {
use crate::governance::agent_action::{
AgentAction, Decision as RuleDecision, check_agent_action_deferred_cached,
};
let rules_db_path = db_path.to_path_buf();
let cache_for_wire_check = Arc::clone(rule_cache);
let queue_for_wire_check = deferred_audit_queue.clone();
let conn_for_wire_check = hook_consultation_conn;
let install_result = crate::governance::wire_check::GOVERNANCE_PRE_ACTION.set(Box::new(
move |action: &AgentAction| -> std::result::Result<(), String> {
let Some(conn_arc) = conn_for_wire_check.as_ref() else {
return governance_consultation_unavailable(
&queue_for_wire_check,
WIRE_ACTION_ACTOR,
action,
&rules_db_path,
"wire_check",
);
};
let conn_guard = match conn_arc.lock() {
Ok(g) => g,
Err(poisoned) => {
tracing::warn!(
"wire_check: consultation connection mutex poisoned; \
recovering inner connection and continuing"
);
poisoned.into_inner()
}
};
let conn_for_check: &rusqlite::Connection = &conn_guard;
match check_agent_action_deferred_cached(
conn_for_check,
Some(&cache_for_wire_check),
WIRE_ACTION_ACTOR,
action,
&queue_for_wire_check,
) {
Ok(RuleDecision::Allow | RuleDecision::Warn { .. }) => Ok(()),
Ok(RuleDecision::Refuse { rule_id, reason }) => {
tracing::info!(
"wire_check refused action kind={} rule_id={} reason={} \
(chain-logged via deferred audit queue)",
action.kind(),
rule_id,
reason,
);
Err(reason)
}
Err(e) => {
let reason = format!("governance:consultation_failed: {e}");
let fail_open = std::env::var("AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let synthetic_refusal = RuleDecision::Refuse {
rule_id: "governance:consultation_failed".to_string(),
reason: reason.clone(),
};
queue_for_wire_check.submit_refusal(
WIRE_ACTION_ACTOR,
action,
&synthetic_refusal,
);
if fail_open {
tracing::warn!(
"wire_check: rule consultation failed: {}; \
AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR=1 — \
degrading to ALLOW for this action ({}) (UNSAFE, legacy posture)",
e,
action.kind(),
);
Ok(())
} else {
tracing::warn!(
"wire_check: rule consultation failed: {}; failing CLOSED \
for this action ({}) (post-#1054 secure default — set \
AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR=1 to revert)",
e,
action.kind(),
);
Err(reason)
}
}
}
},
));
if install_result.is_err() {
tracing::debug!(
"wire_check pre-action hook already installed (process-wide OnceLock); \
the existing hook remains active for this daemon"
);
} else {
tracing::info!(
"wire_check pre-action hook installed (agent-action gate active for \
FilesystemWrite/NetworkRequest/ProcessSpawn; n26: Bash + Custom \
have no egress sink yet — structural coverage tracked v0.8 #1695)"
);
}
}
/// Decides a hook invocation for which no consultation connection exists.
///
/// Reads the fail-open posture from the environment via
/// `governance_fail_open_on_error` and forwards everything to
/// `governance_consultation_unavailable_inner`, which takes the posture as an
/// explicit argument.
fn governance_consultation_unavailable(
queue: &crate::governance::deferred_audit::DeferredAuditQueue,
agent_id: &str,
action: &crate::governance::agent_action::AgentAction,
rules_db_path: &Path,
surface: &str,
) -> std::result::Result<(), String> {
governance_consultation_unavailable_inner(
queue,
agent_id,
action,
rules_db_path,
surface,
governance_fail_open_on_error(),
)
}
/// Decides a hook invocation that cannot consult the rules because no
/// consultation connection is available.
///
/// A synthetic `governance:consultation_unavailable` refusal is always
/// submitted to `queue`. The call then returns `Ok(())` when `fail_open` is
/// set, or `Err(reason)` otherwise; both outcomes are logged as warnings
/// prefixed with `surface`.
fn governance_consultation_unavailable_inner(
    queue: &crate::governance::deferred_audit::DeferredAuditQueue,
    agent_id: &str,
    action: &crate::governance::agent_action::AgentAction,
    rules_db_path: &Path,
    surface: &str,
    fail_open: bool,
) -> std::result::Result<(), String> {
    use crate::governance::agent_action::Decision as RuleDecision;
    let reason = format!(
        "governance:consultation_unavailable: rules DB at {} could not be opened at hook install",
        rules_db_path.display(),
    );
    queue.submit_refusal(
        agent_id,
        action,
        &RuleDecision::Refuse {
            rule_id: "governance:consultation_unavailable".to_string(),
            reason: reason.clone(),
        },
    );
    if !fail_open {
        tracing::warn!(
            "{surface}: hook consultation connection unavailable (rules DB at {}); failing CLOSED \
             (#1455 secure default — set {ENV_GOVERNANCE_FAIL_OPEN}=1 to revert)",
            rules_db_path.display(),
        );
        return Err(reason);
    }
    tracing::warn!(
        "{surface}: hook consultation connection unavailable (rules DB at {}); \
         {ENV_GOVERNANCE_FAIL_OPEN}=1 — degrading to ALLOW (UNSAFE, legacy posture)",
        rules_db_path.display(),
    );
    Ok(())
}
/// Reports whether `AI_MEMORY_REQUIRE_API_KEY` demands an API key on every bind.
///
/// True only when the variable is set to `1` or to `true` (case-insensitive);
/// unset or unreadable values count as `false`.
fn require_api_key_strict() -> bool {
    match std::env::var("AI_MEMORY_REQUIRE_API_KEY") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
/// Decides whether the daemon may bind to `host` given its API-key posture.
///
/// * With an API key present, any bind is allowed: `Ok(None)`.
/// * Without a key and with `strict` set, every bind is refused: `Err(reason)`.
/// * Without a key and without `strict`, only loopback binds are allowed, and
///   they come with a warning for the caller to log: `Ok(Some(warning))`.
///   Anything else is refused with `Err(reason)`.
///
/// Loopback means `localhost` (case-insensitive), any IP literal for which
/// `IpAddr::is_loopback` holds (all of `127.0.0.0/8`, and `::1` in any
/// spelling), or a bracketed IPv6 loopback literal such as `[::1]`.
fn api_key_bind_guard(
    api_key_present: bool,
    host: &str,
    strict: bool,
) -> std::result::Result<Option<String>, String> {
    if api_key_present {
        return Ok(None);
    }
    if strict {
        return Err(format!(
            "refusing to start without an API key: AI_MEMORY_REQUIRE_API_KEY is set, which \
             mandates `api_key` on every bind (requested host {host:?}). A reverse proxy, \
             --network=host container, or socat forward can present loopback to the daemon \
             while exposing it off-host, so the loopback guard alone is insufficient. \
             Set top-level `api_key = \"...\"` in config (or --api-key on the CLI), or unset \
             AI_MEMORY_REQUIRE_API_KEY to fall back to the loopback-only default. (#1458)"
        ));
    }
    // Brackets are only meaningful around IPv6 literals, so a bracketed host is
    // parsed strictly as IPv6. Everything that fails to parse is non-loopback.
    let bracketed = host.strip_prefix('[').and_then(|h| h.strip_suffix(']'));
    let is_loopback = match bracketed {
        Some(inner) => inner
            .parse::<std::net::Ipv6Addr>()
            .is_ok_and(|ip| ip.is_loopback()),
        None => {
            host.eq_ignore_ascii_case("localhost")
                || host
                    .parse::<std::net::IpAddr>()
                    .is_ok_and(|ip| ip.is_loopback())
        }
    };
    if !is_loopback {
        return Err(format!(
            "refusing to bind to non-loopback address {host:?} without an API key: \
             the daemon's api_key is unset (default-off auth would expose every \
             privileged endpoint to any caller that can reach the bind address). \
             Either set top-level `api_key = \"...\"` in config (or --api-key on the CLI) and rebind, \
             or rebind to 127.0.0.1 / ::1 / localhost for a single-tenant deployment. \
             (v0.7.0 fix campaign S5-C1, 2026-05-13. Note: api_key is a TOP-LEVEL \
             AppConfig field per src/config.rs:2283; [api] subsection is silently ignored by serde.)"
        ));
    }
    Ok(Some(format!(
        "API key NOT configured — daemon bound to loopback {host:?}. \
         Privileged endpoints (POST /memories, /links, /agents, /subscriptions) \
         accept any caller that reaches this listener. #1458: a reverse proxy, \
         --network=host container, or socat forward presents loopback to the daemon \
         while exposing it off-host, re-opening this keyless write surface — set \
         top-level `api_key = \"...\"` (or AI_MEMORY_REQUIRE_API_KEY=1 to hard-require it) \
         for any deployment that is not strictly single-tenant on this host. \
         /approve and /reject remain HMAC-gated regardless."
    )))
}
/// Builds all shared state the HTTP daemon needs before it starts listening.
///
/// In order, this function: runs the API-key bind guard, opens the database,
/// applies the governance operator-pubkey check, installs the deferred-audit
/// drainer and the governance hooks, builds the embedder / vector index / LLM
/// client, resolves the federation config, spawns the background workers, and
/// assembles the `AppState` + `ApiKeyState` pair returned in `ServeBootstrap`.
///
/// # Errors
/// Returns an error when the bind guard refuses the host, when `db::open`
/// fails, when `require_operator_pubkey` is set while enabled rules exist and no
/// operator pubkey resolves, when the federation config cannot be built, or
/// (with the `sal` feature) when the store handle cannot be built.
#[allow(deprecated)]
pub async fn bootstrap_serve(
db_path: &Path,
args: &ServeArgs,
app_config: &AppConfig,
) -> Result<ServeBootstrap> {
// Bind guard: `Ok(None)` binds silently, `Ok(Some(_))` binds with a warning,
// `Err(_)` aborts startup before anything else is opened.
match api_key_bind_guard(
app_config.api_key.is_some(),
args.host.as_str(),
require_api_key_strict(),
) {
Ok(None) => {}
Ok(Some(warning)) => tracing::warn!("{warning}"),
Err(reason) => anyhow::bail!("{reason}"),
}
let resolved_ttl = app_config.effective_ttl();
let archive_on_gc = app_config.effective_archive_on_gc();
let conn = db::open(db_path)?;
// A failure to count enabled rules is treated as "zero enabled rules".
let enabled_rule_count =
crate::governance::rules_store::count_enabled_rules(&conn).unwrap_or(0);
let pubkey_resolved = crate::governance::rules_store::resolve_operator_pubkey().is_some();
// Enabled rules without an operator pubkey: always log; refuse to start only
// when `[governance] require_operator_pubkey` is set.
if enabled_rule_count > 0 && !pubkey_resolved {
crate::governance::rules_store::log_missing_operator_pubkey_once(enabled_rule_count);
if app_config
.governance
.as_ref()
.is_some_and(|g| g.require_operator_pubkey)
{
anyhow::bail!(
"SEC-2 fail-closed: `[governance] require_operator_pubkey = true` is set but \
`governance_rules` contains {enabled_rule_count} enabled row(s) AND no \
operator pubkey is resolved (AI_MEMORY_OPERATOR_PUBKEY unset AND \
~/.config/ai-memory/operator.key.pub absent). Refusing to start: a fail-OPEN \
L1-6 loader would honour every enabled rule without signature verification. \
Run `ai-memory rules keygen` + `ai-memory rules sign-seed` to activate L1-6, \
or unset `require_operator_pubkey` to accept the pre-L1-6 posture."
);
}
}
// The supervisor handle is pushed into `task_handles` further down; the metrics
// handle is returned to the caller via `ServeBootstrap`.
let (deferred_audit_queue, deferred_audit_supervisor) =
crate::governance::deferred_audit::install_deferred_audit_drainer(db_path);
let deferred_audit_metrics = deferred_audit_queue.metrics();
tracing::info!(
"policy-engine item 3: deferred-audit drainer spawned (chain-logs \
storage refusals as `governance.refusal` rows in signed_events)"
);
let rule_cache: Arc<crate::governance::rule_cache::RuleCache> =
Arc::new(crate::governance::rule_cache::RuleCache::new());
// Second connection to the same database, handed to both governance hooks.
// An open failure is logged and the hooks receive `None` instead.
let hook_consultation_conn: Option<Arc<std::sync::Mutex<rusqlite::Connection>>> =
match db::open(db_path) {
Ok(c) => Some(Arc::new(std::sync::Mutex::new(c))),
Err(e) => {
tracing::warn!(
target: "ai_memory::daemon_runtime",
"v0.7.0 #1017: failed to open hook consultation connection at {}: {}; \
governance hooks will degrade to ALLOW on every invocation",
db_path.display(),
e,
);
None
}
};
install_governance_pre_write_hook(
db_path,
&deferred_audit_queue,
&rule_cache,
hook_consultation_conn.clone(),
);
install_governance_pre_action_hook(
db_path,
&deferred_audit_queue,
&rule_cache,
hook_consultation_conn.clone(),
);
let feature_tier = app_config.effective_tier(None);
let tier_config = feature_tier.config();
let embedder = build_embedder(feature_tier, app_config).await;
// The vector index slot holds an empty index only when an embedder exists;
// otherwise it stays `None`.
let vector_index_state: Arc<Mutex<Option<VectorIndex>>> = Arc::new(Mutex::new(
embedder.is_some().then(hnsw::VectorIndex::empty),
));
if embedder.is_some() {
// The value returned by the loader is not retained in `task_handles`.
let _boot_index_loader =
spawn_vector_index_boot_load(db_path.to_path_buf(), Arc::clone(&vector_index_state));
}
let llm = build_llm_client(feature_tier, app_config).await;
// Primary connection moves into the shared `Db` state together with the path
// and the resolved TTL / archive-on-GC settings.
let db_state: Db = Arc::new(Mutex::new((
conn,
db_path.to_path_buf(),
resolved_ttl,
archive_on_gc,
)));
// `federation` is only mutated (DLQ sink assignment) when the `sal` feature is on.
#[cfg_attr(not(feature = "sal"), allow(unused_mut))]
let mut federation = federation::FederationConfig::build(
args.quorum_writes,
&args.quorum_peers,
std::time::Duration::from_millis(args.quorum_timeout_ms),
args.quorum_client_cert.as_deref(),
args.quorum_client_key.as_deref(),
args.quorum_ca_cert.as_deref(),
federation::identity::resolve_federation_identity(args.federation_identity.as_deref()),
app_config.api_key.clone(),
)
.context("federation config")?;
// Handles of background tasks that are returned to the caller.
let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
if let Some(ref fed) = federation {
tracing::info!(
"federation enabled: W={} over {} peer(s), timeout {}ms",
fed.policy.w,
fed.peer_count(),
args.quorum_timeout_ms,
);
if args.catchup_interval_secs > 0 {
tracing::info!(
"catchup loop enabled: polling {} peer(s) every {}s",
fed.peer_count(),
args.catchup_interval_secs,
);
} else {
tracing::info!("catchup loop disabled (--catchup-interval-secs=0)");
}
}
// A profile resolution error falls back to the core profile.
let resolved_profile = app_config
.effective_profile(None)
.unwrap_or_else(|_| crate::profile::Profile::core());
let mcp_config_for_http = app_config.mcp.clone();
let (active_keypair, daemon_keypair_outcome) = ensure_and_load_daemon_keypair();
// Starts as `None`; only the optional precompute task below writes to it.
let family_embeddings: Arc<
tokio::sync::RwLock<Option<Vec<(crate::profile::Family, Vec<f32>)>>>,
> = Arc::new(tokio::sync::RwLock::new(None));
let embedder_arc = Arc::new(embedder);
// Install the reranker into the global runtime context when the tier enables
// the cross-encoder; a non-neural result is logged and still installed.
if tier_config.cross_encoder {
tracing::info!("serve: loading neural cross-encoder (#1691 HTTP recall rerank)");
let ce = crate::reranker::CrossEncoder::new_neural();
if ce.is_neural() {
tracing::info!("serve: neural cross-encoder ready (batched)");
} else {
tracing::warn!("serve: neural cross-encoder unavailable, using lexical fallback");
}
crate::runtime_context::RuntimeContext::global().install_reranker(Arc::new(
crate::reranker::BatchedReranker::with_score_floor(
ce,
app_config.resolve_reranker_score_floor(),
),
));
}
// Family-descriptor precompute runs only when the env var is exactly "1".
if std::env::var("AI_MEMORY_PRECOMPUTE_FAMILY_EMBEDDINGS")
.ok()
.as_deref()
== Some("1")
{
let cache = family_embeddings.clone();
let embedder_for_task = embedder_arc.clone();
task_handles.push(tokio::spawn(async move {
// The computation runs on the blocking pool; a join error is logged and
// replaced with an empty vector, which is still stored as `Some(vec![])`.
let computed = tokio::task::spawn_blocking(move || {
AppState::precompute_family_embeddings(
embedder_for_task
.as_ref()
.as_ref()
.map(|e| e as &dyn crate::embeddings::Embed),
)
})
.await
.unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"B3: family-descriptor precompute task panicked; \
family_embeddings will stay empty",
);
Vec::new()
});
if !computed.is_empty() {
tracing::info!(
"B3: pre-computed {} family-descriptor embeddings (async)",
computed.len(),
);
}
*cache.write().await = Some(computed);
}));
} else {
tracing::debug!(
"B3: family-descriptor precompute disabled \
(AI_MEMORY_PRECOMPUTE_FAMILY_EMBEDDINGS != 1); \
best_family_match will return None until B2 wires \
the smart loader and the gate is flipped on"
);
}
// Storage backend selection: with `sal` the handle is built from `--store-url`
// (or the db path); without it the backend is always SQLite.
#[cfg(feature = "sal")]
let configured_embedding_dim: Option<u32> =
resolve_configured_embedding_dim(app_config, &tier_config);
#[cfg(feature = "sal")]
let (storage_backend, store_handle) = build_store_handle(
args.store_url.as_deref(),
db_path,
app_config.postgres_statement_timeout_secs,
configured_embedding_dim,
app_config.resolve_pg_pool(),
)
.await
.context("build SAL store handle")?;
#[cfg(not(feature = "sal"))]
let storage_backend = crate::handlers::StorageBackend::Sqlite;
// Pick the federation push-DLQ sink to match the storage backend; a failed
// Postgres downcast falls back to the SQLite sink with a warning.
#[cfg(feature = "sal")]
if let Some(ref mut fed) = federation {
let sink: std::sync::Arc<dyn federation::FederationDlqSink> = match storage_backend {
#[cfg(feature = "sal-postgres")]
crate::handlers::StorageBackend::Postgres => {
if let Some(pg) = store_handle
.as_any()
.downcast_ref::<crate::store::postgres::PostgresStore>()
{
std::sync::Arc::new(federation::push_dlq::PostgresDlqSink::new(
std::sync::Arc::new(pg.clone()),
))
} else {
tracing::warn!(
"federation push DLQ: PostgresStore downcast failed; \
falling back to sqlite sink (DLQ writes WILL error \
on postgres-backed daemons until the cast is restored)"
);
std::sync::Arc::new(federation::push_dlq::SqliteDlqSink::new(db_state.clone()))
}
}
_ => std::sync::Arc::new(federation::push_dlq::SqliteDlqSink::new(db_state.clone())),
};
fed.dlq_sink = Some(sink);
}
// Catchup loop (and, with `sal`, the DLQ replay worker) start only when
// federation is configured and the interval is non-zero. The values returned by
// these spawn calls are not added to `task_handles`.
if let Some(ref fed) = federation
&& args.catchup_interval_secs > 0
{
let interval = std::time::Duration::from_secs(args.catchup_interval_secs);
#[cfg(feature = "sal")]
{
federation::spawn_catchup_loop_with_store(
fed.clone(),
db_state.clone(),
Some(store_handle.clone()),
interval,
);
}
#[cfg(not(feature = "sal"))]
{
federation::spawn_catchup_loop(fed.clone(), db_state.clone(), interval);
}
#[cfg(feature = "sal")]
if let Some(sink) = fed.dlq_sink.clone() {
let _replay_handle =
federation::spawn_replay_federation_push_dlq(fed.clone(), sink, interval);
tracing::info!(
"federation push DLQ replay worker enabled: polling every {}s",
args.catchup_interval_secs,
);
}
}
// Boot-time embedding backfill against the SAL store, only when an embedder exists.
#[cfg(feature = "sal")]
if embedder_arc.is_some() {
let backfill_store = store_handle.clone();
let backfill_embedder = embedder_arc.clone();
// A batch size that does not fit in `usize` falls back to the MCP default.
let backfill_batch = usize::try_from(app_config.resolve_embeddings().backfill_batch)
.unwrap_or(crate::mcp::DEFAULT_EMBED_BACKFILL_BATCH_SIZE);
task_handles.push(tokio::spawn(async move {
let Some(emb) = backfill_embedder.as_ref() else {
return;
};
let ctx = crate::store::CallerContext::for_admin(
crate::identity::sentinels::EMBEDDING_BACKFILL,
);
let written = crate::store::run_embedding_backfill_on_store(
backfill_store.as_ref(),
&ctx,
emb,
backfill_batch,
)
.await;
if written > 0 {
tracing::info!(
"embedding backfill (serve boot, #1579 A4): {written} row(s) embedded"
);
}
}));
}
// Outbound credential renewal runs only with federation configured AND the
// credential-path env var present (its value is not inspected here).
if federation.is_some()
&& std::env::var(federation::identity::credential::FED_CREDENTIAL_PATH_ENV).is_ok()
{
let renewal_interval = Duration::from_secs(
federation::identity::renewal::DEFAULT_RENEWAL_INTERVAL_SECS.unsigned_abs(),
);
let _renewal_handle = federation::identity::renewal::spawn_refresh_outbound_credential(
db_state.clone(),
renewal_interval,
);
tracing::info!(
"federation outbound credential renewal worker enabled: refreshing every {}s",
renewal_interval.as_secs(),
);
}
if matches!(storage_backend, crate::handlers::StorageBackend::Postgres) {
tracing::warn!(
"v0.7.0 Wave-3: postgres-backed daemon — handlers that have not \
yet migrated to the SAL trait surface 501 Not Implemented. See \
docs/postgres-age-guide.md for the supported endpoint inventory."
);
}
// Assemble the handler state from everything built above.
let app_state = AppState {
db: db_state.clone(),
embedder: embedder_arc,
vector_index: vector_index_state,
federation: Arc::new(federation),
tier_config: Arc::new(tier_config),
scoring: Arc::new(app_config.effective_scoring()),
profile: Arc::new(resolved_profile),
mcp_config: Arc::new(mcp_config_for_http),
active_keypair: Arc::new(active_keypair),
family_embeddings,
storage_backend,
#[cfg(feature = "sal")]
store: store_handle,
llm: Arc::new(llm),
auto_tag_model: Arc::new(app_config.auto_tag_model.clone()),
llm_call_timeout: Duration::from_secs(app_config.effective_llm_call_timeout_secs()),
replay_cache: Arc::new(crate::identity::replay::ReplayCache::new()),
verify_require_nonce: app_config.verify.as_ref().is_some_and(|v| v.require_nonce),
// Prefer the database-persisted nonce cache; on failure log and fall back to
// the plain `new()` cache.
federation_nonce_cache: Arc::new(
match crate::identity::replay::FederationNonceCache::new_with_db_persistence(db_path) {
Ok(c) => c,
Err(e) => {
tracing::warn!(
target: "ai_memory::identity::replay",
db_path = %db_path.display(),
err = %e,
"#1255: FederationNonceCache persistence open failed; falling back to \
in-memory cache. Daemon restarts will reopen the replay window until \
operators resolve the underlying sqlite issue."
);
crate::identity::replay::FederationNonceCache::new()
}
},
),
autonomous_hooks: app_config.effective_autonomous_hooks(),
recall_scope: Arc::new(app_config.effective_recall_scope().cloned()),
deferred_audit_queue: Arc::new(Some(deferred_audit_queue)),
admin_agent_ids: Arc::new(resolve_admin_agent_ids(app_config.admin.as_ref())),
rule_cache: Arc::clone(&rule_cache),
resolved_models: Arc::new(app_config.resolve_models()),
runtime: crate::runtime_context::RuntimeContext::global_arc(),
max_page_size: app_config.resolve_limits().max_page_size,
};
// Periodic maintenance workers; every handle is returned via `task_handles`.
task_handles.push(deferred_audit_supervisor);
let shadow_retention_days = app_config.confidence.as_ref().map_or(
crate::confidence::shadow::DEFAULT_SHADOW_RETENTION_DAYS,
crate::config::ConfidenceConfig::effective_shadow_retention_days,
);
task_handles.push(spawn_gc_loop_with_shadow_retention(
db_state.clone(),
app_config.archive_max_days,
shadow_retention_days,
Duration::from_secs(GC_INTERVAL_SECS),
));
task_handles.push(crate::background::offload_ttl_sweep::spawn(
db_state.clone(),
crate::background::offload_ttl_sweep::DEFAULT_INTERVAL,
));
task_handles.push(spawn_wal_checkpoint_loop(
db_state.clone(),
Duration::from_secs(WAL_CHECKPOINT_INTERVAL_SECS),
));
task_handles.push(spawn_pending_timeout_sweep_loop(
db_state.clone(),
db_path.to_path_buf(),
PENDING_TIMEOUT_DEFAULT_SECS,
Duration::from_secs(PENDING_TIMEOUT_SWEEP_INTERVAL_SECS),
));
task_handles.push(spawn_transcript_lifecycle_sweep_loop(
db_state.clone(),
app_config.effective_transcripts(),
Duration::from_secs(TRANSCRIPT_LIFECYCLE_SWEEP_INTERVAL_SECS),
));
task_handles.push(spawn_agent_quota_reset_loop(
db_state.clone(),
Duration::from_secs(AGENT_QUOTA_RESET_INTERVAL_SECS),
));
// mTLS counts as enforced only when cert, key, AND allowlist are all supplied.
let mtls_enforced =
args.tls_cert.is_some() && args.tls_key.is_some() && args.mtls_allowlist.is_some();
let api_key_state = ApiKeyState {
key: app_config.api_key.clone(),
mtls_enforced,
};
if api_key_state.key.is_some() {
if mtls_enforced {
tracing::info!(
"API key authentication enabled — federation endpoints (/api/v1/sync/*) \
bypass api-key check because mTLS allowlist is configured"
);
} else {
tracing::info!("API key authentication enabled");
}
}
crate::handlers::admin_role::mark_request_authn_configured(api_key_state.key.is_some());
// Warn when admin agent ids are configured but neither an api_key nor the
// header-trust opt-in is present.
if !app_state.admin_agent_ids.is_empty()
&& api_key_state.key.is_none()
&& !crate::handlers::admin_role::admin_header_trust_enabled()
{
tracing::warn!(
"[admin].agent_ids is configured but no api_key is set: the X-Agent-Id header is \
self-asserted, so admin-role requests will be REFUSED (403) until you either \
configure an api_key or explicitly opt into the legacy header-trust posture with \
{}=1 (#1570 secure default)",
crate::handlers::admin_role::ENV_ADMIN_HEADER_TRUST,
);
}
Ok(ServeBootstrap {
app_state,
api_key_state,
db_state,
archive_max_days: app_config.archive_max_days,
task_handles,
daemon_keypair_outcome,
request_timeout: Duration::from_secs(app_config.effective_request_timeout_secs()),
deferred_audit_metrics,
})
}
/// Installs the global `tracing` subscriber for the daemon.
///
/// The filter starts from the environment, then has the crate's default log
/// directive and a `tower_http=info` directive added. The `try_init` result is
/// deliberately ignored, so a subscriber that is already installed is not an error.
///
/// # Panics
/// Panics if either directive string fails to parse.
fn init_tracing() {
    let from_env = EnvFilter::from_default_env();
    let with_crate_default =
        from_env.add_directive(crate::logging::DEFAULT_LOG_DIRECTIVE.parse().unwrap());
    let filter = with_crate_default.add_directive("tower_http=info".parse().unwrap());
    let _ = tracing_subscriber::fmt()
        .with_env_filter(filter)
        .try_init();
}
/// Runs the HTTP(S) daemon until Ctrl-C, then drains the deferred-audit queue and
/// checkpoints the database.
///
/// Initializes tracing, calls `bootstrap_serve`, logs the startup banner, and
/// serves over TLS when both `--tls-cert` and `--tls-key` are supplied (mTLS when
/// an allowlist is also supplied) or over plain HTTP otherwise.
///
/// # Errors
/// Returns an error when bootstrap fails, when TLS material cannot be loaded,
/// when the TLS bind address does not parse as a `SocketAddr`, or when the
/// server itself fails.
#[allow(clippy::too_many_lines)]
pub async fn serve(db_path: PathBuf, args: ServeArgs, app_config: &AppConfig) -> Result<()> {
init_tracing();
let bootstrap = bootstrap_serve(&db_path, &args, app_config).await?;
// Banner inputs are derived from config plus the keypair outcome of bootstrap.
let banner_inputs = crate::cli::serve_banner::BannerInputs {
configured_permissions_mode: app_config.permissions.as_ref().and_then(|p| p.mode),
auto_generated_keypair_path: bootstrap.daemon_keypair_outcome.as_ref().and_then(
|o| match o {
crate::identity::keypair::EnsureOutcome::Generated { pub_path } => {
Some(pub_path.display().to_string())
}
_ => None,
},
),
identity_disabled: matches!(
bootstrap.daemon_keypair_outcome,
Some(crate::identity::keypair::EnsureOutcome::SkippedDisabled)
),
};
for line in crate::cli::serve_banner::compose_banner(&banner_inputs) {
if line.is_warn() {
tracing::warn!("{}", line.message());
} else {
tracing::info!("{}", line.message());
}
}
let addr = format!("{}:{}", args.host, args.port);
tracing::info!("database: {}", db_path.display());
// Cloned up front because `bootstrap`'s fields are moved into the router below.
let checkpoint_state = bootstrap.db_state.clone();
let drain_metrics = bootstrap.deferred_audit_metrics.clone();
// Resolves on Ctrl-C; an error from the signal handler is ignored and also
// resolves the future.
let shutdown = async move {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("shutting down — draining deferred-audit queue then checkpointing WAL");
};
if let (Some(cert), Some(key)) = (&args.tls_cert, &args.tls_key) {
// The install result is ignored (e.g. when a provider was already installed).
let _ = rustls::crypto::ring::default_provider().install_default();
let tls_config = if let Some(allowlist_path) = &args.mtls_allowlist {
tracing::info!(
"mTLS enabled — client certs required. Allowlist: {}",
allowlist_path.display()
);
tls::load_mtls_rustls_config(cert, key, allowlist_path).await?
} else {
tracing::warn!(
"TLS enabled but mTLS NOT configured — sync endpoints \
(/api/v1/sync/push, /api/v1/sync/since) accept any client. \
Set --mtls-allowlist for production peer-mesh deployments \
(red-team #231)."
);
tls::load_rustls_config(cert, key).await?
};
let app = crate::build_router_with_timeout(
bootstrap.api_key_state,
bootstrap.app_state,
bootstrap.request_timeout,
);
tracing::info!("ai-memory listening on https://{addr}");
// NOTE(review): this path requires `host:port` to parse as a literal
// `SocketAddr`; a hostname such as `localhost` or an unbracketed IPv6
// literal would presumably fail here — confirm against supported `--host` values.
let socket_addr: std::net::SocketAddr = addr.parse()?;
let grace = std::time::Duration::from_secs(args.shutdown_grace_secs);
let handle = axum_server::Handle::new();
let handle_clone = handle.clone();
// Bridge the Ctrl-C future to axum_server's handle-based graceful shutdown.
tokio::spawn(async move {
shutdown.await;
handle_clone.graceful_shutdown(Some(grace));
});
axum_server::bind(socket_addr)
.acceptor(tls::serve_rustls_acceptor(&tls_config))
.handle(handle)
.serve(app.into_make_service())
.await?;
} else {
tracing::warn!(
"TLS NOT enabled — sync endpoints (/api/v1/sync/push, \
/api/v1/sync/since) accept any caller over plain HTTP. \
Set --tls-cert + --tls-key + --mtls-allowlist for production \
peer-mesh deployments (red-team #231)."
);
tracing::info!("ai-memory listening on http://{addr}");
// `args.shutdown_grace_secs` is not passed on this plain-HTTP path.
serve_http_with_shutdown_future_and_timeout(
&addr,
bootstrap.api_key_state,
bootstrap.app_state,
bootstrap.request_timeout,
shutdown,
)
.await?;
}
// Post-shutdown: wait (bounded by the default drain timeout) for the
// deferred-audit queue, then checkpoint regardless of the drain outcome.
let drained = crate::governance::deferred_audit::drain_pending(
&drain_metrics,
crate::governance::deferred_audit::DEFAULT_SHUTDOWN_DRAIN_TIMEOUT,
)
.await;
if drained {
tracing::info!(
"deferred-audit queue drained ({} refusals accounted) — checkpointing WAL",
drain_metrics.submitted_count()
);
} else {
tracing::warn!(
"deferred-audit drain timed out after {:?}: {} submitted but only {} accounted — \
some refusal audit rows may not have flushed before exit",
crate::governance::deferred_audit::DEFAULT_SHUTDOWN_DRAIN_TIMEOUT,
drain_metrics.submitted_count(),
drain_metrics.appended_count()
+ drain_metrics.append_failure_count()
+ drain_metrics.send_failure_count(),
);
}
{
let lock = checkpoint_state.lock().await;
// Best effort: a checkpoint failure is ignored and does not fail shutdown.
let _ = db::checkpoint(&lock.0);
}
Ok(())
}
/// Runs the benchmark suite against a fresh in-memory database and reports it.
///
/// Operator-supplied knobs (`iterations`, `warmup`, `regression_threshold`,
/// `scale`) are clamped to the limits exported by `crate::bench`. Results are
/// printed as JSON when `args.json` is set and as tables otherwise. When a
/// baseline path is given the run is compared against it, and when a history
/// path is given the run is appended to that file.
///
/// # Errors
/// Returns an error when the database cannot be opened, the benchmark, baseline
/// load, JSON rendering, or history append fails, or when at least one operation
/// has a `Fail` status and/or regressed against the baseline.
fn cmd_bench(args: &BenchArgs) -> Result<()> {
    let iterations = args.iterations.clamp(1, crate::bench::MAX_ITERATIONS);
    let warmup = args.warmup.min(crate::bench::MAX_WARMUP);
    let regression_threshold = args
        .regression_threshold
        .clamp(0.0, crate::bench::MAX_REGRESSION_THRESHOLD_PCT);
    let conn = db::open(Path::new(":memory:"))?;
    let scale = args.scale.map(|s| s.clamp(1, crate::bench::MAX_SCALE));
    let config = bench::BenchConfig {
        iterations,
        warmup,
        namespace: bench::BENCH_NAMESPACE.to_string(),
        scale,
    };
    let results = bench::run(&conn, &config)?;

    // `None` means "no baseline requested", which is distinct from an empty comparison.
    let regressions = if let Some(path) = &args.baseline {
        let baseline = bench::load_baseline(Path::new(path))?;
        Some(bench::compare_against_baseline(
            &results,
            &baseline,
            regression_threshold,
        ))
    } else {
        None
    };

    if args.json {
        println!(
            "{}",
            serde_json::to_string_pretty(&serde_json::json!({
                "iterations": iterations,
                "warmup": warmup,
                "scale": scale,
                "results": results,
                "regressions": regressions,
            }))?
        );
    } else {
        print!("{}", bench::render_table(&results));
        // Borrow the comparison rows: `regressions` is read again below for the
        // pass/fail decision.
        if let Some(rows) = &regressions {
            println!();
            print!("{}", bench::render_regression_table(rows));
        }
    }

    if let Some(history_path) = &args.history {
        let captured_at = chrono::Utc::now().to_rfc3339();
        bench::append_history(
            history_path,
            &captured_at,
            iterations,
            warmup,
            scale,
            &results,
        )?;
        // The confirmation goes to stderr so it does not mix into stdout output;
        // a failed write of this notice is ignored.
        let mut stderr = std::io::stderr().lock();
        let _ = writeln!(
            stderr,
            "bench: appended run to history file {}",
            history_path.display()
        );
    }

    let budget_failed = results
        .iter()
        .any(|r| matches!(r.status, bench::Status::Fail));
    let regression_failed = regressions
        .as_ref()
        .is_some_and(|rows| rows.iter().any(|r| r.regressed));
    if budget_failed && regression_failed {
        anyhow::bail!(
            "bench: at least one operation exceeded its p95 budget by >10% AND regressed >{regression_threshold:.1}% vs baseline"
        );
    }
    if budget_failed {
        anyhow::bail!("bench: at least one operation exceeded its p95 budget by >10%");
    }
    if regression_failed {
        anyhow::bail!(
            "bench: at least one operation regressed >{regression_threshold:.1}% vs baseline"
        );
    }
    Ok(())
}
/// Migrates memories from the store at `args.from` to the store at `args.to`
/// and prints a report (JSON when `args.json` is set, plain text otherwise).
///
/// Both URLs are passed through `redact_url_password` before being printed.
///
/// # Errors
/// Returns an error when either store cannot be opened, when the JSON report
/// cannot be rendered, or when the migration report lists at least one error.
#[cfg(feature = "sal")]
async fn cmd_migrate(args: &MigrateArgs) -> Result<()> {
    let source = migrate::open_store(&args.from)
        .await
        .context("open source store")?;
    let destination = migrate::open_store(&args.to)
        .await
        .context("open destination store")?;
    let report = migrate::migrate(
        source.as_ref(),
        destination.as_ref(),
        args.batch,
        args.namespace.clone(),
        args.dry_run,
    )
    .await;

    let from_display = crate::logging::redact_url_password(&args.from);
    let to_display = crate::logging::redact_url_password(&args.to);

    if !args.json {
        println!("migration report");
        println!(" from: {from_display}");
        println!(" to: {to_display}");
        println!(" memories_read: {}", report.memories_read);
        println!(" memories_written: {}", report.memories_written);
        println!(" batches: {}", report.batches);
        println!(" dry_run: {}", report.dry_run);
        println!(" errors: {}", report.errors.len());
        for e in &report.errors {
            println!(" - {e}");
        }
    } else {
        let value = serde_json::json!({
            "from_url": from_display,
            "to_url": to_display,
            "memories_read": report.memories_read,
            "memories_written": report.memories_written,
            "batches": report.batches,
            "errors": report.errors,
            "dry_run": report.dry_run,
        });
        println!("{}", serde_json::to_string_pretty(&value)?);
    }

    // The report is printed in full before a failed migration is turned into an error.
    match report.errors.len() {
        0 => Ok(()),
        failed => anyhow::bail!("migration completed with {} error(s)", failed),
    }
}
/// Serves plain HTTP on `addr` until `shutdown` is notified.
///
/// Thin adapter that turns the `Notify` into a future and delegates to
/// `serve_http_with_shutdown_future`.
///
/// # Errors
/// Propagates whatever `serve_http_with_shutdown_future` returns.
pub async fn serve_http_with_shutdown(
    addr: &str,
    api_key_state: ApiKeyState,
    app_state: AppState,
    shutdown: Arc<Notify>,
) -> Result<()> {
    let until_notified = async move {
        shutdown.notified().await;
    };
    serve_http_with_shutdown_future(addr, api_key_state, app_state, until_notified).await
}
/// Serves plain HTTP on `addr` until the `shutdown` future resolves, using the
/// crate's default request timeout.
///
/// # Errors
/// Propagates whatever `serve_http_with_shutdown_future_and_timeout` returns.
pub async fn serve_http_with_shutdown_future<F>(
    addr: &str,
    api_key_state: ApiKeyState,
    app_state: AppState,
    shutdown: F,
) -> Result<()>
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let default_timeout = Duration::from_secs(crate::config::DEFAULT_REQUEST_TIMEOUT_SECS);
    serve_http_with_shutdown_future_and_timeout(
        addr,
        api_key_state,
        app_state,
        default_timeout,
        shutdown,
    )
    .await
}
/// Builds the router, binds a TCP listener on `addr`, and serves until the
/// `shutdown` future resolves.
///
/// The router is built before the listener is bound.
///
/// # Errors
/// Returns an error with context `bind {addr}` when the listener cannot be
/// bound, or with context `axum::serve` when serving fails.
pub async fn serve_http_with_shutdown_future_and_timeout<F>(
    addr: &str,
    api_key_state: ApiKeyState,
    app_state: AppState,
    request_timeout: Duration,
    shutdown: F,
) -> Result<()>
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let router = crate::build_router_with_timeout(api_key_state, app_state, request_timeout);
    let bind_outcome = tokio::net::TcpListener::bind(addr).await;
    let listener = bind_outcome.with_context(|| format!("bind {addr}"))?;
    let server = axum::serve(listener, router).with_graceful_shutdown(shutdown);
    server.await.context("axum::serve")
}
/// Runs one pull-then-push synchronization cycle against a single peer.
///
/// Pull: loads the stored watermark for `peer_url`, requests
/// `/api/v1/sync/since` from the peer, inserts every pulled memory that passes
/// validation via `db::insert_if_newer`, and records the `updated_at` of the last
/// pulled memory as the new watermark.
///
/// Push: loads the last-pushed watermark, collects up to `batch_size` local
/// memories updated since then, posts them to `/api/v1/sync/push`, and records
/// the `updated_at` of the last pushed memory.
///
/// Database connections are opened in short scopes so that none is held
/// across an `.await`.
///
/// # Parameters
/// - `client`: HTTP client used for both requests.
/// - `db_path`: path handed to `db::open` for every database section.
/// - `local_agent_id`: sent as the agent-id and peer-id headers and as the sender id.
/// - `peer_url`: peer base URL; trailing `/` characters are stripped.
/// - `api_key`: when present, sent as the API-key header on both requests.
/// - `batch_size`: `limit` of the pull request and cap of the outgoing batch.
///
/// # Errors
/// Returns an error when the database cannot be opened or queried, when either
/// HTTP request fails or returns a non-success status, or when the pull response
/// cannot be decoded. Individual insert failures during the pull are ignored.
pub async fn sync_cycle_once(
    client: &reqwest::Client,
    db_path: &Path,
    local_agent_id: &str,
    peer_url: &str,
    api_key: Option<&str>,
    batch_size: usize,
) -> Result<()> {
    let peer_url = peer_url.trim_end_matches('/');

    // ---- Pull ------------------------------------------------------------
    let since = {
        let conn = db::open(db_path)?;
        db::sync_state_load(&conn, local_agent_id)?
            .entries
            .get(peer_url)
            .cloned()
    };
    let mut pull_url = format!(
        "{peer_url}/api/v1/sync/since?limit={batch_size}&peer={}",
        urlencoding_minimal(local_agent_id)
    );
    if let Some(s) = &since {
        pull_url.push_str("&since=");
        pull_url.push_str(&urlencoding_minimal(s));
    }
    let mut req = client
        .get(&pull_url)
        .header(crate::HEADER_AGENT_ID, local_agent_id)
        .header(
            crate::federation::peer_attestation::PEER_ID_HEADER,
            local_agent_id,
        );
    if let Some(key) = api_key {
        req = req.header(crate::HEADER_API_KEY, key);
    }
    let resp = req.send().await?;
    if !resp.status().is_success() {
        anyhow::bail!("sync-daemon: pull status {}", resp.status());
    }
    let pulled: SyncSinceResponse = resp.json().await?;
    let pull_count = pulled.memories.len();
    // NOTE(review): the watermark is taken from the last element, which presumably
    // relies on the peer returning memories ordered by `updated_at` — confirm.
    let latest_pulled = pulled.memories.last().map(|m| m.updated_at.clone());
    {
        let conn = db::open(db_path)?;
        let mut rejected = 0usize;
        for mem in &pulled.memories {
            if crate::validate::RequestValidator::validate_memory(mem).is_ok() {
                // Best effort: one failed insert must not abort the rest of the batch.
                let _ = db::insert_if_newer(&conn, mem);
            } else {
                rejected += 1;
            }
        }
        // Surface dropped records instead of discarding them without a trace.
        if rejected > 0 {
            tracing::warn!(
                "sync-daemon: peer={peer_url} skipped {rejected} pulled memories that failed validation"
            );
        }
        if let Some(at) = &latest_pulled {
            db::sync_state_observe(&conn, local_agent_id, peer_url, at)?;
        }
    }

    // ---- Push ------------------------------------------------------------
    // One connection serves both the watermark lookup and the batch query;
    // there is no `.await` between them, so a second `db::open` is unnecessary.
    let outgoing = {
        let conn = db::open(db_path)?;
        let last_pushed = db::sync_state_last_pushed(&conn, local_agent_id, peer_url);
        db::memories_updated_since(&conn, last_pushed.as_deref(), batch_size)?
    };
    let push_count = outgoing.len();
    let latest_pushed = outgoing.last().map(|m| m.updated_at.clone());
    if !outgoing.is_empty() {
        let body = serde_json::json!({
            (field_names::SENDER_AGENT_ID): local_agent_id,
            "sender_clock": { "entries": {} },
            "memories": outgoing,
            "dry_run": false,
        });
        let mut req = client
            .post(format!("{peer_url}/api/v1/sync/push"))
            .header(crate::HEADER_AGENT_ID, local_agent_id)
            .header(
                crate::federation::peer_attestation::PEER_ID_HEADER,
                local_agent_id,
            )
            .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
            .json(&body);
        if let Some(key) = api_key {
            req = req.header(crate::HEADER_API_KEY, key);
        }
        let resp = req.send().await?;
        if !resp.status().is_success() {
            anyhow::bail!("sync-daemon: push status {}", resp.status());
        }
        // The push watermark only advances after the peer acknowledged the batch.
        if let Some(at) = latest_pushed {
            let conn = db::open(db_path)?;
            db::sync_state_record_push(&conn, local_agent_id, peer_url, &at)?;
        }
    }
    tracing::info!("sync-daemon: peer={peer_url} pulled={pull_count} pushed={push_count}");
    Ok(())
}
/// Runs the sync daemon with a freshly built HTTP client that uses a 30-second
/// request timeout.
///
/// All other arguments are forwarded unchanged to
/// `run_sync_daemon_with_shutdown_using_client`.
///
/// # Errors
/// Returns an error when the HTTP client cannot be built, or whatever the
/// delegated daemon loop returns.
pub async fn run_sync_daemon_with_shutdown(
    db_path: PathBuf,
    local_agent_id: String,
    peers: Vec<String>,
    api_key: Option<String>,
    interval_secs: u64,
    batch_size: usize,
    shutdown: Arc<Notify>,
) -> Result<()> {
    let request_timeout = Duration::from_secs(30);
    let builder = reqwest::Client::builder().timeout(request_timeout);
    let client = builder.build()?;
    run_sync_daemon_with_shutdown_using_client(
        client,
        db_path,
        local_agent_id,
        peers,
        api_key,
        interval_secs,
        batch_size,
        shutdown,
    )
    .await
}
/// Sync daemon loop: runs one `sync_cycle_once` per peer concurrently, waits for
/// all of them, then sleeps for the interval or returns on shutdown.
///
/// `interval_secs` and `batch_size` are both raised to at least 1. A failed peer
/// cycle is logged and does not stop the loop. The shutdown notification is
/// only awaited during the sleep between rounds, after the round's tasks have
/// all finished.
///
/// # Errors
/// The visible body only ever returns `Ok(())`, which happens when `shutdown` is notified.
pub async fn run_sync_daemon_with_shutdown_using_client(
    client: reqwest::Client,
    db_path: PathBuf,
    local_agent_id: String,
    peers: Vec<String>,
    api_key: Option<String>,
    interval_secs: u64,
    batch_size: usize,
    shutdown: Arc<Notify>,
) -> Result<()> {
    let interval = interval_secs.max(1);
    let batch_size = batch_size.max(1);

    // Convert everything the per-peer tasks need into cheaply clonable handles once.
    let shared_db_path: Arc<Path> = Arc::from(db_path.as_path());
    let shared_agent_id: Arc<str> = Arc::from(local_agent_id.as_str());
    let shared_api_key: Option<Arc<str>> = api_key.as_deref().map(Arc::from);
    let shared_peers: Vec<Arc<str>> = peers.iter().map(|p| Arc::from(p.as_str())).collect();

    loop {
        let mut round: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
        for peer_url in shared_peers.iter().cloned() {
            let client = client.clone();
            let db_path = Arc::clone(&shared_db_path);
            let local_agent_id = Arc::clone(&shared_agent_id);
            let api_key = shared_api_key.clone();
            round.spawn(async move {
                let outcome = sync_cycle_once(
                    &client,
                    &db_path,
                    &local_agent_id,
                    &peer_url,
                    api_key.as_deref(),
                    batch_size,
                )
                .await;
                if let Err(e) = outcome {
                    tracing::warn!("sync-daemon: peer {peer_url} cycle failed: {e}");
                }
            });
        }
        // Wait for every peer task of this round before sleeping.
        while let Some(_finished) = round.join_next().await {}

        tokio::select! {
            () = tokio::time::sleep(Duration::from_secs(interval)) => {}
            () = shutdown.notified() => {
                tracing::info!("sync-daemon: shutdown signal received");
                return Ok(());
            }
        }
    }
}
/// Runs the curator daemon on the blocking pool, without an LLM client, until
/// `shutdown` is notified.
///
/// A spawned task waits for the notification and then sets the atomic flag
/// passed to `crate::curator::run_daemon`.
///
/// # Errors
/// Returns an error when the blocking task cannot be joined.
pub async fn run_curator_daemon_with_shutdown(
    db_path: PathBuf,
    cfg: crate::curator::CuratorConfig,
    shutdown: Arc<Notify>,
) -> Result<()> {
    let stop_requested = Arc::new(AtomicBool::new(false));
    let stop_setter = Arc::clone(&stop_requested);
    tokio::spawn(async move {
        shutdown.notified().await;
        stop_setter.store(true, Ordering::Relaxed);
    });

    let no_llm: Option<Arc<crate::llm::OllamaClient>> = None;
    let (loaded_keypair, _keypair_outcome) = ensure_and_load_daemon_keypair();
    let active_keypair = loaded_keypair.map(Arc::new);

    let worker = tokio::task::spawn_blocking(move || {
        crate::curator::run_daemon(db_path, no_llm, cfg, stop_requested, active_keypair);
    });
    worker
        .await
        .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))
}
/// Runs the curator daemon from individual settings instead of a prebuilt
/// `CuratorConfig`.
///
/// The config is assembled from the arguments with the default compaction
/// settings. The daemon runs on the blocking pool until `shutdown` is notified,
/// at which point a spawned task sets the atomic flag passed to
/// `crate::curator::run_daemon`.
///
/// # Errors
/// Returns an error when the blocking task cannot be joined.
#[allow(clippy::too_many_arguments)]
pub async fn run_curator_daemon_with_primitives(
    db_path: PathBuf,
    interval_secs: u64,
    max_ops_per_cycle: usize,
    dry_run: bool,
    include_namespaces: Vec<String>,
    exclude_namespaces: Vec<String>,
    llm: Option<Arc<crate::llm::OllamaClient>>,
    shutdown: Arc<Notify>,
) -> Result<()> {
    let compaction = crate::curator::CompactionConfig::default();
    let cfg = crate::curator::CuratorConfig {
        interval_secs,
        max_ops_per_cycle,
        dry_run,
        include_namespaces,
        exclude_namespaces,
        compaction,
    };

    let stop_requested = Arc::new(AtomicBool::new(false));
    let stop_setter = Arc::clone(&stop_requested);
    tokio::spawn(async move {
        shutdown.notified().await;
        stop_setter.store(true, Ordering::Relaxed);
    });

    let (loaded_keypair, _keypair_outcome) = ensure_and_load_daemon_keypair();
    let active_keypair = loaded_keypair.map(Arc::new);

    let worker = tokio::task::spawn_blocking(move || {
        crate::curator::run_daemon(db_path, llm, cfg, stop_requested, active_keypair);
    });
    worker
        .await
        .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))
}
/// Percent-encodes `s` for use in a URL query component.
///
/// ASCII letters, digits, and `-`, `_`, `.`, `~` pass through unchanged; every
/// other byte of the UTF-8 encoding becomes `%XX` with uppercase hex digits.
fn urlencoding_minimal(s: &str) -> String {
    use std::fmt::Write as _;
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let passes_through =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if passes_through {
                encoded.push(char::from(byte));
            } else {
                // Writing into a `String` cannot fail; the result is ignored.
                let _ = write!(encoded, "%{byte:02X}");
            }
            encoded
        })
}
/// Response body of a peer's `/api/v1/sync/since` endpoint, as decoded by
/// `sync_cycle_once`.
#[derive(serde::Deserialize)]
struct SyncSinceResponse {
// Must be present for decoding to succeed, but never read in the code shown here.
#[allow(dead_code)]
count: usize,
// Must be present for decoding to succeed, but never read in the code shown here.
#[allow(dead_code)]
limit: usize,
// The pulled memories; `sync_cycle_once` takes its watermark from the last element.
memories: Vec<crate::models::Memory>,
}
// No-op function that names `Instant` and `Duration` in its signature.
// NOTE(review): presumably exists only so those imports are not reported as
// unused in some feature configurations — confirm before removing.
#[allow(dead_code)]
fn _imports_in_use(_: Instant, _: Duration) {}
#[cfg(test)]
#[allow(deprecated)] mod tests {
use super::*;
use crate::cli::test_utils::TestEnv;
use crate::config::ResolvedTtl;
use axum::body::Body;
use axum::http::{Request, StatusCode};
use tower::ServiceExt as _;
// Regression test: building a Postgres store handle must log the store URL with
// its password replaced by `****`, and must never log the raw password.
#[cfg(feature = "sal-postgres")]
#[tokio::test]
async fn issue_1579_a3_boot_log_redacts_store_url_password() {
use std::sync::{Arc, Mutex};
// In-memory log sink shared between the subscriber's writer and the assertions.
#[derive(Clone, Default)]
struct SharedBuf(Arc<Mutex<Vec<u8>>>);
impl std::io::Write for SharedBuf {
fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
self.0.lock().expect("buf lock").extend_from_slice(b);
Ok(b.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
let buf = SharedBuf::default();
let writer_buf = buf.clone();
// ANSI is disabled so the assertions can match the log text literally.
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_ansi(false)
.with_writer(move || writer_buf.clone())
.finish();
// The guard keeps this subscriber installed until the end of the test.
let _guard = tracing::subscriber::set_default(subscriber);
let secret = "sup3r-s3cret-pw";
// Port 1 is used so the connection attempt is expected to fail.
let url = format!("postgres://ai_memory:{secret}@127.0.0.1:1/ai_memory");
let dir = tempfile::tempdir().expect("tempdir");
let db_path = dir.path().join("unused.db");
let res = build_store_handle(
Some(&url),
&db_path,
None,
Some(384),
crate::store::PoolConfig::default(),
)
.await;
assert!(res.is_err(), "port 1 must refuse the connection");
let logs = String::from_utf8_lossy(&buf.0.lock().expect("buf lock")).to_string();
assert!(
logs.contains("opening Postgres SAL store at postgres://ai_memory:****@127.0.0.1:1"),
"boot line must log the redacted URL; got:\n{logs}"
);
assert!(
!logs.contains(secret),
"store-URL password leaked into the boot log:\n{logs}"
);
}
// With no consultation connection the helper must refuse by default and only
// allow when the fail-open override is passed.
#[test]
fn governance_consultation_unavailable_fails_closed_by_default_1455() {
    use crate::governance::agent_action::AgentAction;
    use crate::governance::deferred_audit::DeferredAuditQueue;

    let (queue, _rx) = DeferredAuditQueue::new();
    let action = AgentAction::Custom {
        custom_kind: "memory_write".to_string(),
        payload: serde_json::json!({ "namespace": "ns", "tier": "long" }),
    };
    let path = Path::new("/nonexistent/rules.db");

    // Same call both times; only the fail-open switch differs.
    let consult = |fail_open: bool| {
        governance_consultation_unavailable_inner(
            &queue,
            "agent:test",
            &action,
            path,
            "test-surface",
            fail_open,
        )
    };

    let reason = consult(false).expect_err("missing consultation conn MUST fail CLOSED");
    assert!(
        reason.contains("consultation_unavailable"),
        "fail-closed reason must name the cause: {reason}"
    );

    let opened = consult(true);
    assert!(
        opened.is_ok(),
        "fail_open override MUST degrade to ALLOW (legacy posture)"
    );
}
// Unset and "0" read as false; "1" and "TRUE" read as true. The variable is
// removed again at the end.
// NOTE(review): the `unsafe` env mutations are only sound if no other thread
// touches the process environment concurrently — confirm how these tests are run.
#[test]
fn governance_fail_open_on_error_env_parse_1455() {
    const VAR: &str = "AI_MEMORY_GOVERNANCE_FAIL_OPEN_ON_ERROR";

    unsafe { std::env::remove_var(VAR) };
    assert!(!governance_fail_open_on_error());

    for (raw, expected) in [("1", true), ("TRUE", true), ("0", false)] {
        unsafe { std::env::set_var(VAR, raw) };
        assert_eq!(governance_fail_open_on_error(), expected);
    }

    unsafe { std::env::remove_var(VAR) };
}
// With an API key configured the guard neither warns nor refuses, on any host
// and in either strictness mode exercised here.
#[test]
fn api_key_bind_guard_present_binds_silently_1458() {
    for (host, strict) in [("0.0.0.0", false), ("127.0.0.1", true)] {
        let verdict = api_key_bind_guard(true, host, strict).unwrap();
        assert_eq!(verdict, None);
    }
}
// A keyless bind to any recognized loopback spelling is allowed but must carry
// a warning that names the reverse-proxy / off-host exposure hazard.
#[test]
fn api_key_bind_guard_keyless_loopback_warns_1458() {
    const LOOPBACK_HOSTS: [&str; 5] =
        ["127.0.0.1", "::1", "localhost", "[::1]", "0:0:0:0:0:0:0:1"];

    for host in LOOPBACK_HOSTS {
        let Some(warning) = api_key_bind_guard(false, host, false).unwrap() else {
            panic!("keyless loopback {host} must warn, not bind silently");
        };
        let names_hazard = ["reverse proxy", "off-host"]
            .iter()
            .all(|needle| warning.contains(*needle));
        assert!(
            names_hazard,
            "warning must name the proxy hazard for {host}: {warning}"
        );
    }
}
// A keyless bind to a non-loopback address must be refused with the
// "refusing to bind to non-loopback" reason.
#[test]
fn api_key_bind_guard_keyless_non_loopback_refuses_1458() {
    let verdict = api_key_bind_guard(false, "0.0.0.0", false);
    let err = verdict.expect_err("keyless non-loopback bind MUST be refused");
    let names_refusal = err.contains("refusing to bind to non-loopback");
    assert!(names_refusal, "{err}");
}
/// With the third flag set (strict mode, per the test name) a keyless
/// loopback bind is refused and the error names `AI_MEMORY_REQUIRE_API_KEY`;
/// the same bind with a key present still returns `Ok(None)`.
#[test]
fn api_key_bind_guard_strict_refuses_keyless_loopback_1458() {
    let keyless = api_key_bind_guard(false, "127.0.0.1", true);
    let refusal = keyless.expect_err("strict mode MUST refuse keyless loopback bind");
    let names_knob = refusal.contains("AI_MEMORY_REQUIRE_API_KEY");
    assert!(names_knob, "strict refusal must name the knob: {refusal}");

    let keyed = api_key_bind_guard(true, "127.0.0.1", true).unwrap();
    assert_eq!(keyed, None);
}
/// Checks how `require_api_key_strict` interprets the
/// `AI_MEMORY_REQUIRE_API_KEY` environment variable: unset and `"0"` read as
/// `false`, while `"1"` and `"TRUE"` read as `true`.
///
/// The process environment is shared by every test thread, so the test holds
/// `env_var_lock()` for its whole body, like the other env-mutating tests in
/// this module.
#[test]
fn require_api_key_strict_env_parse_1458() {
    const VAR: &str = "AI_MEMORY_REQUIRE_API_KEY";
    let _g = env_var_lock();

    // SAFETY: the guard above serialises this mutation against every other
    // test that takes `env_var_lock()` before touching the environment.
    unsafe { std::env::remove_var(VAR) };
    assert!(!require_api_key_strict());

    // SAFETY: as above — still under `env_var_lock()`.
    unsafe { std::env::set_var(VAR, "1") };
    assert!(require_api_key_strict());

    // SAFETY: as above — still under `env_var_lock()`.
    unsafe { std::env::set_var(VAR, "TRUE") };
    assert!(require_api_key_strict());

    // SAFETY: as above — still under `env_var_lock()`.
    unsafe { std::env::set_var(VAR, "0") };
    assert!(!require_api_key_strict());

    // Leave the environment clean for whichever test runs next.
    // SAFETY: as above — still under `env_var_lock()`.
    unsafe { std::env::remove_var(VAR) };
}
/// Builds the baseline `ServeArgs` used by the `bootstrap_serve` tests:
/// loopback host, port 0, no TLS material, and quorum/catch-up settings
/// zeroed out. The `_db` parameter is accepted but not used.
fn args_with_db(_db: &Path) -> ServeArgs {
    let host = String::from("127.0.0.1");
    let quorum_peers = Vec::new();
    ServeArgs {
        host,
        port: 0,
        tls_cert: None,
        tls_key: None,
        mtls_allowlist: None,
        shutdown_grace_secs: 30,
        quorum_writes: 0,
        quorum_peers,
        quorum_timeout_ms: 2000,
        quorum_client_cert: None,
        quorum_client_key: None,
        quorum_ca_cert: None,
        catchup_interval_secs: 0,
        federation_identity: None,
        // Field only exists when the crate is built with the `sal` feature.
        #[cfg(feature = "sal")]
        store_url: None,
    }
}
/// Builds an `AppState` over the SQLite database at `db_path` for router
/// tests: keyword tier configuration, and no embedder, vector index,
/// federation, MCP config, keypair or LLM attached.
///
/// Panics if the database (or, under the `sal` feature, the `SqliteStore`)
/// cannot be opened.
fn keyword_app_state(db_path: &Path) -> AppState {
    let connection = db::open(db_path).unwrap();
    // NOTE(review): the trailing `true` is an opaque flag of the `Db` tuple —
    // confirm its meaning against the `Db` type alias.
    let shared_db: Db = Arc::new(Mutex::new((
        connection,
        db_path.to_owned(),
        ResolvedTtl::default(),
        true,
    )));
    AppState {
        db: shared_db,
        embedder: Arc::new(None),
        vector_index: Arc::new(Mutex::new(None)),
        federation: Arc::new(None),
        tier_config: Arc::new(FeatureTier::Keyword.config()),
        scoring: Arc::new(crate::config::ResolvedScoring::default()),
        profile: Arc::new(crate::profile::Profile::core()),
        mcp_config: Arc::new(None),
        active_keypair: Arc::new(None),
        family_embeddings: Arc::new(tokio::sync::RwLock::new(Some(Vec::new()))),
        storage_backend: crate::handlers::StorageBackend::Sqlite,
        // Only present when the crate is built with the `sal` feature.
        #[cfg(feature = "sal")]
        store: Arc::new(
            crate::store::sqlite::SqliteStore::open(db_path)
                .expect("open SqliteStore for keyword_app_state"),
        ),
        llm: Arc::new(None),
        auto_tag_model: Arc::new(None),
        llm_call_timeout: Duration::from_secs(crate::config::DEFAULT_LLM_CALL_TIMEOUT_SECS),
        replay_cache: Arc::new(crate::identity::replay::ReplayCache::new()),
        verify_require_nonce: false,
        federation_nonce_cache: Arc::new(crate::identity::replay::FederationNonceCache::new()),
        autonomous_hooks: false,
        recall_scope: Arc::new(None),
        deferred_audit_queue: Arc::new(None),
        admin_agent_ids: Arc::new(Vec::new()),
        rule_cache: Arc::new(crate::governance::rule_cache::RuleCache::new()),
        resolved_models: Arc::new(crate::config::ResolvedModels::default()),
        runtime: crate::runtime_context::RuntimeContext::global_arc(),
        max_page_size: crate::handlers::MAX_BULK_SIZE,
    }
}
/// Acquires a single process-wide mutex that tests take before mutating
/// environment variables.
///
/// A poisoned lock (a previous holder panicked) is recovered instead of
/// propagated, so one failing test does not cascade into every other test
/// that needs the lock.
fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
    static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
    let mutex = LOCK.get_or_init(Default::default);
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Parses a table of command lines and checks `is_write_command` classifies
/// each parsed command as write or read as expected.
///
/// Command lines that fail to parse are skipped, so a floor on the number of
/// successfully checked entries guards against the table silently rotting.
/// A few variants are also checked by constructing `Command` values directly.
#[test]
fn test_is_write_command_all_variants() {
    let write_argvs: &[&[&str]] = &[
        &["ai-memory", "store", "title", "content"],
        &["ai-memory", "update", "id123", "--title", "t"],
        &["ai-memory", "delete", "id123"],
        &["ai-memory", "promote", "id123"],
        &["ai-memory", "forget", "pattern"],
        &["ai-memory", "link", "a", "b"],
        &["ai-memory", "consolidate", "ids"],
        &["ai-memory", "resolve", "a", "b"],
        &["ai-memory", "sync", "--peer", "/tmp/peer.db"],
        &[
            "ai-memory",
            "sync-daemon",
            "--peers",
            "http://x",
            "--interval-secs",
            "60",
        ],
        &["ai-memory", "import"],
        &["ai-memory", "auto-consolidate"],
        &["ai-memory", "gc"],
    ];
    let mut writes_checked = 0;
    for argv in write_argvs {
        let Ok(cli) = Cli::try_parse_from(*argv) else {
            continue;
        };
        let classified_as_write = is_write_command(&cli.command);
        assert!(classified_as_write, "expected write for {argv:?}");
        writes_checked += 1;
    }
    assert!(
        writes_checked >= 5,
        "expected at least 5 write variants checked, got {writes_checked}"
    );

    let read_argvs: &[&[&str]] = &[
        &["ai-memory", "mcp"],
        &["ai-memory", "recall", "context"],
        &["ai-memory", "search", "query"],
        &["ai-memory", "get", "id"],
        &["ai-memory", "list"],
        &["ai-memory", "stats"],
        &["ai-memory", "namespaces"],
        &["ai-memory", "export"],
        &["ai-memory", "shell"],
        &["ai-memory", "man"],
        &["ai-memory", "completions", "bash"],
        &["ai-memory", "archive", "list"],
        &["ai-memory", "agents", "list"],
        &["ai-memory", "pending", "list"],
        &["ai-memory", "bench"],
        &["ai-memory", "serve", "--host", "127.0.0.1", "--port", "0"],
    ];
    let mut reads_checked = 0;
    for argv in read_argvs {
        let Ok(cli) = Cli::try_parse_from(*argv) else {
            continue;
        };
        let classified_as_write = is_write_command(&cli.command);
        assert!(!classified_as_write, "expected read for {argv:?}");
        reads_checked += 1;
    }
    assert!(
        reads_checked >= 8,
        "expected at least 8 read variants checked, got {reads_checked}"
    );

    // Direct construction: independent of whether the CLI parser accepted
    // the argv tables above.
    assert!(is_write_command(&Command::Gc));
    for read_only in [
        Command::Stats,
        Command::Namespaces,
        Command::Export,
        Command::Shell,
        Command::Man,
    ] {
        assert!(!is_write_command(&read_only));
    }
    let mcp = Command::Mcp {
        tier: "keyword".to_string(),
        profile: None,
    };
    assert!(!is_write_command(&mcp));
}
/// `GET /api/v1/health` on a keyless router answers `200 OK`.
#[tokio::test]
async fn test_router_has_health_endpoint() {
    let env = TestEnv::fresh();
    let state = keyword_app_state(&env.db_path);
    let keyless = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    let app = build_router(state, keyless);
    let request = Request::builder()
        .method("GET")
        .uri("/api/v1/health")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
}
/// The metrics endpoint answers `200 OK` at both `/metrics` and
/// `/api/v1/metrics`; a fresh router is built for each request.
#[tokio::test]
async fn test_router_has_metrics_at_both_paths() {
    let env = TestEnv::fresh();
    let state = keyword_app_state(&env.db_path);
    let keyless = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    for path in ["/metrics", "/api/v1/metrics"] {
        let request = Request::builder()
            .method("GET")
            .uri(path)
            .body(Body::empty())
            .unwrap();
        let response = build_router(state.clone(), keyless.clone())
            .oneshot(request)
            .await
            .unwrap();
        assert_eq!(response.status(), StatusCode::OK);
    }
}
/// `GET /api/v1/memories` on a keyless router returns a success status.
#[tokio::test]
async fn test_router_lists_all_v1_memory_routes() {
    let env = TestEnv::fresh();
    let state = keyword_app_state(&env.db_path);
    let keyless = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    let app = build_router(state, keyless);
    let request = Request::builder()
        .method("GET")
        .uri("/api/v1/memories")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    let status = response.status();
    assert!(status.is_success(), "got {}", status);
}
/// With an API key configured, a request that carries no credentials is
/// rejected with `401 Unauthorized`.
#[tokio::test]
async fn test_router_applies_api_key_middleware_when_key_set() {
    let env = TestEnv::fresh();
    let state = keyword_app_state(&env.db_path);
    let keyed = ApiKeyState {
        key: Some("s3cret".to_string()),
        mtls_enforced: false,
    };
    let app = build_router(state, keyed);
    let unauthenticated = Request::builder()
        .method("GET")
        .uri("/api/v1/memories")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(unauthenticated).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
}
/// Without an API key configured, a request that carries no credentials is
/// served normally (`200 OK`).
#[tokio::test]
async fn test_router_skips_api_key_middleware_when_key_none() {
    let env = TestEnv::fresh();
    let state = keyword_app_state(&env.db_path);
    let keyless = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    let app = build_router(state, keyless);
    let unauthenticated = Request::builder()
        .method("GET")
        .uri("/api/v1/memories")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(unauthenticated).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
}
/// The keyword tier with a default config yields no embedder.
#[tokio::test]
async fn test_build_embedder_keyword_tier_returns_none() {
    let config = AppConfig::default();
    let embedder = build_embedder(FeatureTier::Keyword, &config).await;
    assert!(embedder.is_none());
}
/// Placeholder: the body is empty, so this test currently passes without
/// exercising `build_embedder` at all.
// NOTE(review): presumably an embedder load failure cannot be provoked
// cheaply from a unit test — confirm, then either implement the scenario or
// remove this test so it does not read as coverage.
#[tokio::test]
async fn test_build_embedder_load_failure_returns_none() {
}
/// An unrecognised `embedding_model` override combined with the keyword tier
/// still yields no embedder.
#[tokio::test]
async fn test_build_embedder_invalid_override_falls_back_to_preset() {
    let mut config = AppConfig::default();
    let bogus_model = String::from("not-a-real-embedding-model-2026");
    config.embedding_model = Some(bogus_model);
    let embedder = build_embedder(FeatureTier::Keyword, &config).await;
    let absent = embedder.is_none();
    assert!(absent, "unparseable override + keyword tier must return None");
}
/// A model named in the `[embeddings]` section takes precedence over the
/// Semantic tier's preset.
#[test]
fn resolve_embedder_model_section_beats_tier_preset() {
    let section = crate::config::EmbeddingsSection {
        model: Some("nomic_embed_v15".to_string()),
        ..crate::config::EmbeddingsSection::default()
    };
    let mut config = AppConfig::default();
    config.embeddings = Some(section);
    let semantic = FeatureTier::Semantic.config();
    let resolved = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        resolved,
        Some(crate::config::EmbeddingModel::NomicEmbedV15),
        "[embeddings].model must override the Semantic tier MiniLM preset"
    );
}
/// The legacy top-level `embedding_model` setting still overrides the
/// Semantic tier's preset.
#[test]
fn resolve_embedder_model_legacy_flat_still_honored() {
    let mut config = AppConfig::default();
    config.embedding_model = Some(String::from("nomic_embed_v15"));
    let semantic = FeatureTier::Semantic.config();
    let resolved = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        resolved,
        Some(crate::config::EmbeddingModel::NomicEmbedV15),
        "legacy flat embedding_model must still override the preset"
    );
}
/// When both the legacy flat `embedding_model` and `[embeddings].model` are
/// set, the section value wins.
#[test]
fn resolve_embedder_model_section_beats_legacy_flat() {
    let section = crate::config::EmbeddingsSection {
        model: Some("mini_lm_l6_v2".to_string()),
        ..crate::config::EmbeddingsSection::default()
    };
    let mut config = AppConfig::default();
    config.embedding_model = Some("nomic_embed_v15".to_string());
    config.embeddings = Some(section);
    let semantic = FeatureTier::Semantic.config();
    let resolved = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        resolved,
        Some(crate::config::EmbeddingModel::MiniLmL6V2),
        "[embeddings].model must win over legacy flat embedding_model"
    );
}
/// An `[embeddings]` section that only sets `url` (no `model`) leaves the
/// Semantic tier's MiniLM preset in effect.
#[test]
fn resolve_embedder_model_url_only_section_keeps_preset() {
    let url_only = crate::config::EmbeddingsSection {
        url: Some("http://127.0.0.1:11435".to_string()),
        ..crate::config::EmbeddingsSection::default()
    };
    let mut config = AppConfig::default();
    config.embeddings = Some(url_only);
    let semantic = FeatureTier::Semantic.config();
    let resolved = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        resolved,
        Some(crate::config::EmbeddingModel::MiniLmL6V2),
        "url-only section must keep the Semantic MiniLM preset"
    );
}
/// A model id the resolver does not recognise (`bge-large-en`) falls back to
/// the Semantic tier's MiniLM preset.
#[test]
fn resolve_embedder_model_unsupported_id_falls_back_to_preset() {
    let unsupported = crate::config::EmbeddingsSection {
        model: Some("bge-large-en".to_string()),
        ..crate::config::EmbeddingsSection::default()
    };
    let mut config = AppConfig::default();
    config.embeddings = Some(unsupported);
    let semantic = FeatureTier::Semantic.config();
    let resolved = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        resolved,
        Some(crate::config::EmbeddingModel::MiniLmL6V2),
        "unsupported model id must fall back to the tier preset"
    );
}
/// With nothing configured the resolver returns the tier preset: `None` for
/// the keyword tier, MiniLM for the semantic tier.
#[test]
fn resolve_embedder_model_unconfigured_uses_tier_preset() {
    let config = AppConfig::default();

    let keyword = FeatureTier::Keyword.config();
    let keyword_model = resolve_embedder_model(&keyword, &config);
    assert_eq!(keyword_model, None, "keyword tier has no preset → None");

    let semantic = FeatureTier::Semantic.config();
    let semantic_model = resolve_embedder_model(&semantic, &config);
    assert_eq!(
        semantic_model,
        Some(crate::config::EmbeddingModel::MiniLmL6V2),
        "semantic tier preset is MiniLM"
    );
}
/// `build_vector_index` returns `None` when its second argument is `false`
/// (no embedder, per the test name).
#[test]
fn test_build_vector_index_no_embedder_returns_none() {
    let env = TestEnv::fresh();
    let connection = db::open(&env.db_path).unwrap();
    let index = build_vector_index(&connection, false);
    assert!(index.is_none());
}
/// On a fresh database, `build_vector_index(.., true)` returns an index that
/// exists but holds zero entries.
#[test]
fn test_build_vector_index_empty_db_returns_empty_index() {
    let env = TestEnv::fresh();
    let connection = db::open(&env.db_path).unwrap();
    let Some(index) = build_vector_index(&connection, true) else {
        panic!("empty DB with embedder must yield empty index");
    };
    assert_eq!(index.len(), 0);
}
/// Starts the GC loop on a paused tokio clock, advances virtual time by 61
/// seconds, then aborts the task and checks the join error reports
/// cancellation.
#[tokio::test(start_paused = true)]
async fn test_spawn_gc_loop_runs_and_can_be_aborted() {
    let env = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&env.db_path).unwrap(),
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let task = spawn_gc_loop(shared, None, Duration::from_secs(60));

    // Move virtual time forward, then yield so the spawned task can run.
    tokio::time::advance(Duration::from_secs(61)).await;
    tokio::task::yield_now().await;

    task.abort();
    let join_error = task.await.unwrap_err();
    assert!(join_error.is_cancelled());
}
/// Starts the WAL checkpoint loop on a paused tokio clock, advances virtual
/// time by 31 s and then 60 s, aborts the task and checks the join error
/// reports cancellation.
#[tokio::test(start_paused = true)]
async fn test_spawn_wal_checkpoint_loop_runs_and_can_be_aborted() {
    let env = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&env.db_path).unwrap(),
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let task = spawn_wal_checkpoint_loop(shared, Duration::from_secs(60));

    // After each jump of virtual time, yield so the spawned task can run.
    for step_secs in [31, 60] {
        tokio::time::advance(Duration::from_secs(step_secs)).await;
        tokio::task::yield_now().await;
    }

    task.abort();
    let join_error = task.await.unwrap_err();
    assert!(join_error.is_cancelled());
}
/// Inserts a `pending` action requested two hours ago, starts the timeout
/// sweeper with `crate::SECS_PER_HOUR` and a 50 ms `Duration`, and polls (up
/// to 40 × 50 ms) until the row's status becomes `expired`.
// NOTE(review): the sweeper arguments are presumably (timeout, sweep
// interval) — confirm against `spawn_pending_timeout_sweep_loop`.
#[tokio::test]
async fn test_spawn_pending_timeout_sweep_loop_marks_stale_expired() {
    let env = TestEnv::fresh();
    let connection = db::open(&env.db_path).unwrap();
    let requested_at = (chrono::Utc::now() - chrono::Duration::hours(2)).to_rfc3339();
    connection
        .execute(
            "INSERT INTO pending_actions
(id, action_type, namespace, payload, requested_by, requested_at,
status)
VALUES ('sweeper-1', 'store', 'ns/a', '{}', 'tester', ?1, 'pending')",
            rusqlite::params![requested_at],
        )
        .unwrap();
    let shared: Db = Arc::new(Mutex::new((
        connection,
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let sweeper = spawn_pending_timeout_sweep_loop(
        shared.clone(),
        env.db_path.clone(),
        crate::SECS_PER_HOUR,
        Duration::from_millis(50),
    );

    let mut flipped = false;
    let mut polls = 0;
    while polls < 40 && !flipped {
        polls += 1;
        tokio::time::sleep(Duration::from_millis(50)).await;
        // The guard is released at the end of each iteration so the sweeper
        // can take the same lock between polls.
        let guard = shared.lock().await;
        let status: String = guard
            .0
            .query_row(
                "SELECT status FROM pending_actions WHERE id = 'sweeper-1'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        flipped = status == "expired";
    }

    sweeper.abort();
    let _ = sweeper.await;
    assert!(
        flipped,
        "sweeper must transition the stale row to 'expired' within 2s"
    );
}
/// Writes `body` to `path` and then restricts the file to mode `0o400`
/// (owner read-only). Panics if either filesystem call fails.
#[cfg(unix)]
fn write_passphrase_strict(path: &std::path::Path, body: &str) {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let owner_read_only = fs::Permissions::from_mode(0o400);
    fs::write(path, body).unwrap();
    fs::set_permissions(path, owner_read_only).unwrap();
}
/// Non-Unix variant: writes `body` to `path` without touching permissions.
/// Panics if the write fails.
#[cfg(not(unix))]
fn write_passphrase_strict(path: &std::path::Path, body: &str) {
    let written = std::fs::write(path, body);
    written.unwrap();
}
/// A single trailing `\n` is removed from the passphrase read from file.
#[test]
fn test_passphrase_strips_trailing_newline() {
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("pass");
    write_passphrase_strict(&file, "secret\n");
    let passphrase = passphrase_from_file(&file).unwrap();
    assert_eq!(passphrase, "secret");
}
/// A trailing `\r\n` is removed from the passphrase read from file.
#[test]
fn test_passphrase_strips_trailing_crlf() {
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("pass");
    write_passphrase_strict(&file, "secret\r\n");
    let passphrase = passphrase_from_file(&file).unwrap();
    assert_eq!(passphrase, "secret");
}
/// A zero-byte passphrase file is rejected with an error mentioning "empty".
#[test]
fn test_passphrase_empty_file_errors() {
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("empty");
    write_passphrase_strict(&file, "");
    let err = passphrase_from_file(&file).unwrap_err();
    let mentions_empty = err.to_string().contains("empty");
    assert!(mentions_empty, "expected 'empty' error, got: {err}");
}
/// A passphrase file containing only a newline is rejected with an error
/// mentioning "empty".
#[test]
fn test_passphrase_empty_after_trim_errors() {
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("nl-only");
    write_passphrase_strict(&file, "\n");
    let message = passphrase_from_file(&file).unwrap_err().to_string();
    assert!(message.contains("empty"));
}
/// Reading a passphrase from a path that does not exist fails. The test
/// accepts any of several error wordings: the top-level context strings, or
/// "No such file" / "cannot find" anywhere in the error chain.
#[test]
fn test_passphrase_nonexistent_file_errors() {
    let scratch = tempfile::tempdir().unwrap();
    let missing = scratch.path().join("does-not-exist");
    let err = passphrase_from_file(&missing).unwrap_err();

    let top_level = err.to_string();
    let top_level_matches = top_level.contains("reading passphrase file")
        || top_level.contains("stat passphrase file");
    let chain_matches = err.chain().any(|cause| {
        let text = cause.to_string();
        text.contains("No such file") || text.contains("cannot find")
    });
    assert!(top_level_matches || chain_matches, "got: {err:#}");
}
/// Only the trailing newline is trimmed; spaces inside the passphrase
/// survive.
#[test]
fn test_passphrase_preserves_internal_whitespace() {
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("pass");
    write_passphrase_strict(&file, "my pass phrase\n");
    let passphrase = passphrase_from_file(&file).unwrap();
    assert_eq!(passphrase, "my pass phrase");
}
/// #1055: a passphrase file with mode `0o644` is rejected while the
/// `AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS` override is unset, and the
/// error mentions "lax permissions", "0400" and the override variable.
#[cfg(unix)]
#[test]
fn test_passphrase_rejects_lax_permissions_1055() {
    use std::os::unix::fs::PermissionsExt;

    let _env_guard = env_var_lock();
    // SAFETY: environment mutation is serialised by `env_var_lock()` above.
    unsafe { std::env::remove_var("AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS") };

    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("lax");
    std::fs::write(&file, "secret\n").unwrap();
    let world_readable = std::fs::Permissions::from_mode(0o644);
    std::fs::set_permissions(&file, world_readable).unwrap();

    let msg = passphrase_from_file(&file).unwrap_err().to_string();
    let has_chmod_hint = msg.contains("lax permissions") && msg.contains("0400");
    assert!(
        has_chmod_hint,
        "#1055: expected lax-permission rejection with chmod 0400 hint; got: {msg}"
    );
    let names_escape_hatch = msg.contains("AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS");
    assert!(
        names_escape_hatch,
        "#1055: failure message MUST reference the env-var escape hatch; got: {msg}"
    );
}
/// #1055: with `AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS=1` a passphrase
/// file with mode `0o644` is accepted. The variable is removed again before
/// the assertion so a failure cannot leak it into other tests.
#[cfg(unix)]
#[test]
fn test_passphrase_lax_perms_env_overrides_1055() {
    use std::os::unix::fs::PermissionsExt;

    let _env_guard = env_var_lock();
    let scratch = tempfile::tempdir().unwrap();
    let file = scratch.path().join("lax-with-env");
    std::fs::write(&file, "secret\n").unwrap();
    let world_readable = std::fs::Permissions::from_mode(0o644);
    std::fs::set_permissions(&file, world_readable).unwrap();

    // SAFETY: environment mutation is serialised by `env_var_lock()` above.
    unsafe { std::env::set_var("AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS", "1") };
    let outcome = passphrase_from_file(&file);
    // SAFETY: still under `env_var_lock()`.
    unsafe { std::env::remove_var("AI_MEMORY_PASSPHRASE_FILE_ALLOW_LAX_PERMS") };

    assert_eq!(
        outcome.unwrap(),
        "secret",
        "#1055: env-var escape hatch MUST restore legacy permissive posture"
    );
}
/// With `identity.anonymize_default = true` and `AI_MEMORY_ANONYMIZE` unset,
/// `apply_anonymize_default` sets the variable to `"1"`.
#[test]
fn test_anonymize_set_when_config_true_and_env_unset() {
    let _env_guard = env_var_lock();
    // SAFETY: environment mutation is serialised by `env_var_lock()` above.
    unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };

    let identity = crate::config::IdentityConfig {
        anonymize_default: true,
    };
    let mut config = AppConfig::default();
    config.identity = Some(identity);
    apply_anonymize_default(&config);

    let observed = std::env::var("AI_MEMORY_ANONYMIZE").unwrap();
    assert_eq!(observed, "1");
    // SAFETY: still under `env_var_lock()`.
    unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
}
/// An `AI_MEMORY_ANONYMIZE` value that is already set (`"0"`) is left alone
/// even when the config asks for anonymization by default.
#[test]
fn test_anonymize_unchanged_when_env_already_set() {
    let _env_guard = env_var_lock();
    // SAFETY: environment mutation is serialised by `env_var_lock()` above.
    unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };

    let identity = crate::config::IdentityConfig {
        anonymize_default: true,
    };
    let mut config = AppConfig::default();
    config.identity = Some(identity);
    apply_anonymize_default(&config);

    let observed = std::env::var("AI_MEMORY_ANONYMIZE").unwrap();
    assert_eq!(observed, "0");
    // SAFETY: still under `env_var_lock()`.
    unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
}
/// With a default config, `apply_anonymize_default` leaves
/// `AI_MEMORY_ANONYMIZE` unset.
#[test]
fn test_anonymize_unchanged_when_config_false() {
    let _env_guard = env_var_lock();
    // SAFETY: environment mutation is serialised by `env_var_lock()` above.
    unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };

    let config = AppConfig::default();
    apply_anonymize_default(&config);

    let lookup = std::env::var("AI_MEMORY_ANONYMIZE");
    assert!(lookup.is_err());
}
/// Bootstrapping with the keyword tier yields no embedder, no vector index,
/// and exactly seven background task handles, which are aborted at the end.
#[tokio::test]
async fn test_bootstrap_serve_keyword_tier_no_embedder() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));
    let serve_args = args_with_db(&env.db_path);

    let booted = bootstrap_serve(&env.db_path, &serve_args, &config)
        .await
        .unwrap();
    assert!(booted.app_state.embedder.is_none());
    let index_slot = booted.app_state.vector_index.lock().await;
    assert!(index_slot.is_none());
    assert_eq!(booted.task_handles.len(), 7);

    booted
        .task_handles
        .into_iter()
        .for_each(|handle| handle.abort());
}
/// An `api_key` set in the config is carried into the bootstrapped
/// `api_key_state`.
#[tokio::test]
async fn test_bootstrap_serve_with_api_key_logs_enabled() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));
    config.api_key = Some(String::from("test-key"));
    let serve_args = args_with_db(&env.db_path);

    let booted = bootstrap_serve(&env.db_path, &serve_args, &config)
        .await
        .unwrap();
    let configured_key = booted.api_key_state.key.as_deref();
    assert_eq!(configured_key, Some("test-key"));

    booted
        .task_handles
        .into_iter()
        .for_each(|handle| handle.abort());
}
/// With the baseline args (`quorum_writes == 0`) no federation config is
/// attached to the app state.
#[tokio::test]
async fn test_bootstrap_serve_federation_disabled_when_quorum_zero() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));
    let serve_args = args_with_db(&env.db_path);

    let booted = bootstrap_serve(&env.db_path, &serve_args, &config)
        .await
        .unwrap();
    assert!(booted.app_state.federation.is_none());

    booted
        .task_handles
        .into_iter()
        .for_each(|handle| handle.abort());
}
/// With `quorum_writes = 1` and one peer configured, the bootstrapped app
/// state carries a federation config (catch-up interval left at 0).
#[tokio::test]
async fn test_bootstrap_serve_federation_enabled_attaches_config() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));

    let mut serve_args = args_with_db(&env.db_path);
    serve_args.quorum_writes = 1;
    serve_args.quorum_peers = vec![String::from("http://127.0.0.1:65530")];
    serve_args.quorum_timeout_ms = 100;
    serve_args.catchup_interval_secs = 0;

    let booted = bootstrap_serve(&env.db_path, &serve_args, &config)
        .await
        .unwrap();
    assert!(booted.app_state.federation.is_some());

    booted
        .task_handles
        .into_iter()
        .for_each(|handle| handle.abort());
}
/// Same as the federation-enabled case, but with a non-zero catch-up
/// interval (one hour); federation must still be attached.
#[tokio::test]
async fn test_bootstrap_serve_federation_enabled_with_catchup_loop() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));

    let mut serve_args = args_with_db(&env.db_path);
    serve_args.quorum_writes = 1;
    serve_args.quorum_peers = vec![String::from("http://127.0.0.1:65531")];
    serve_args.quorum_timeout_ms = 100;
    serve_args.catchup_interval_secs = crate::SECS_PER_HOUR as u64;

    let booted = bootstrap_serve(&env.db_path, &serve_args, &config)
        .await
        .unwrap();
    assert!(booted.app_state.federation.is_some());

    booted
        .task_handles
        .into_iter()
        .for_each(|handle| handle.abort());
}
/// Two peer URLs that differ only by a trailing slash must make
/// `bootstrap_serve` fail, with an error mentioning "federation" or
/// "duplicate".
#[tokio::test]
async fn test_bootstrap_serve_federation_invalid_peer_errors() {
    let env = TestEnv::fresh();
    let mut config = AppConfig::default();
    config.tier = Some(String::from("keyword"));

    let mut serve_args = args_with_db(&env.db_path);
    serve_args.quorum_writes = 1;
    serve_args.quorum_peers = vec![
        String::from("http://127.0.0.1:65532"),
        String::from("http://127.0.0.1:65532/"),
    ];

    let Err(err) = bootstrap_serve(&env.db_path, &serve_args, &config).await else {
        panic!("expected error from duplicate peer URLs");
    };
    let rendered = format!("{err:#}");
    let names_cause = rendered.contains("federation") || rendered.contains("duplicate");
    assert!(names_cause, "got: {rendered}");
}
/// After inserting one memory and giving it an embedding,
/// `build_vector_index(.., true)` returns an index with at least one entry.
#[test]
fn test_build_vector_index_populated_db_returns_built_index() {
    let env = TestEnv::fresh();
    let connection = db::open(&env.db_path).unwrap();
    let timestamp = chrono::Utc::now().to_rfc3339();
    let memory = crate::models::Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: crate::models::Tier::Mid,
        namespace: String::from("ns"),
        title: String::from("t"),
        content: String::from("c"),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: crate::models::default_metadata(),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    let stored_id = db::insert(&connection, &memory).unwrap();
    db::set_embedding(&connection, &stored_id, &[1.0, 0.0, 0.0]).unwrap();

    let index = build_vector_index(&connection, true).expect("populated index");
    let entries = index.len();
    assert!(entries >= 1, "expected non-empty index, got len={}", entries);
}
/// Seeds three memories with orthogonal unit embeddings, starts the vector
/// index boot loader against an initially empty index, and checks that:
/// the index slot is populated while the loader runs, all three embeddings
/// are present once the loader thread finishes, the index reports itself
/// fully searchable, and a query for the first unit vector returns the first
/// seeded memory.
#[tokio::test]
async fn b3_1579_boot_loader_warms_index_off_the_startup_path() {
    let env = TestEnv::fresh();
    let connection = db::open(&env.db_path).unwrap();
    let timestamp = chrono::Utc::now().to_rfc3339();

    let expected_ids: Vec<_> = (0..3_usize)
        .map(|slot| {
            let memory = crate::models::Memory {
                id: uuid::Uuid::new_v4().to_string(),
                tier: crate::models::Tier::Long,
                namespace: "ns-b3".to_string(),
                title: format!("warm-{slot}"),
                content: format!("warm body {slot}"),
                tags: Vec::new(),
                priority: 5,
                confidence: 1.0,
                source: "test".to_string(),
                access_count: 0,
                created_at: timestamp.clone(),
                updated_at: timestamp.clone(),
                last_accessed_at: None,
                expires_at: None,
                metadata: crate::models::default_metadata(),
                reflection_depth: 0,
                memory_kind: crate::models::MemoryKind::Observation,
                entity_id: None,
                persona_version: None,
                citations: Vec::new(),
                source_uri: None,
                source_span: None,
                confidence_source: crate::models::ConfidenceSource::CallerProvided,
                confidence_signals: None,
                confidence_decayed_at: None,
                version: 1,
            };
            let stored_id = db::insert(&connection, &memory).unwrap();
            // One-hot embedding: memory `slot` points along axis `slot`.
            let mut embedding = [0.0_f32; 3];
            embedding[slot] = 1.0;
            db::set_embedding(&connection, &stored_id, &embedding).unwrap();
            stored_id
        })
        .collect();
    // Close the seeding connection before the loader starts.
    drop(connection);

    let shared_index: Arc<Mutex<Option<VectorIndex>>> =
        Arc::new(Mutex::new(Some(hnsw::VectorIndex::empty())));
    let loader = spawn_vector_index_boot_load(env.db_path.clone(), Arc::clone(&shared_index));

    {
        let during_warmup = shared_index.lock().await;
        assert!(
            during_warmup.is_some(),
            "index present (possibly cold) during warm-up"
        );
    }

    // Join the loader thread from a blocking task so the async test thread
    // is not blocked while waiting.
    tokio::task::spawn_blocking(move || loader.join().expect("loader thread"))
        .await
        .expect("join task");

    let warmed = shared_index.lock().await;
    let index = warmed.as_ref().expect("index");
    assert_eq!(index.len(), 3, "every stored embedding seeded");
    assert!(
        index.is_fully_searchable(),
        "loader must drive the #968 rebuild to a swapped-in graph"
    );
    let hits = index.search(&[1.0, 0.0, 0.0], 1);
    let top_hit = hits.first().map(|hit| hit.id.as_str());
    assert_eq!(
        top_hit,
        Some(expected_ids[0].as_str()),
        "warmed index serves the seeded rows"
    );
}
/// Seeds a memory whose `expires_at` is one day in the past, then runs the
/// GC loop on a paused clock for two 61-second advances before aborting it.
///
/// This is a smoke test: it checks the loop runs against a database with an
/// expired row without panicking, but does not assert the row is gone.
#[tokio::test(start_paused = true)]
async fn test_spawn_gc_loop_purges_expired_memories() {
    let env = TestEnv::fresh();
    let seeding_conn = db::open(&env.db_path).unwrap();
    let expired_at = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
    let timestamp = chrono::Utc::now().to_rfc3339();
    let stale = crate::models::Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: crate::models::Tier::Short,
        namespace: String::from("ns-gc"),
        title: String::from("stale"),
        content: String::from("stale"),
        tags: Vec::new(),
        priority: 1,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: Some(expired_at),
        metadata: crate::models::default_metadata(),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    db::insert(&seeding_conn, &stale).unwrap();
    drop(seeding_conn);

    // The loop gets its own freshly opened connection.
    let shared: Db = Arc::new(Mutex::new((
        db::open(&env.db_path).unwrap(),
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let task = spawn_gc_loop(shared.clone(), Some(1), Duration::from_secs(60));
    for _ in 0..2 {
        tokio::time::advance(Duration::from_secs(61)).await;
        tokio::task::yield_now().await;
    }
    task.abort();
    let _ = task.await;
}
/// Runs the WAL checkpoint loop with a 2-second `Duration` on a paused clock
/// and advances virtual time four times by 2 s before aborting the task.
#[tokio::test(start_paused = true)]
async fn test_spawn_wal_checkpoint_loop_runs_multiple_cycles() {
    let env = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&env.db_path).unwrap(),
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let step = Duration::from_secs(2);
    let task = spawn_wal_checkpoint_loop(shared, Duration::from_secs(2));
    let mut remaining_cycles = 4;
    while remaining_cycles > 0 {
        tokio::time::advance(step).await;
        tokio::task::yield_now().await;
        remaining_cycles -= 1;
    }
    task.abort();
    let _ = task.await;
}
/// Table-driven check of `urlencoding_minimal`: ASCII letters, digits and
/// `-_.~` pass through untouched; `:`, `/`, `@`, `+` and space are
/// percent-encoded; the empty string stays empty.
#[test]
fn test_urlencoding_minimal_round_trip() {
    let cases = [
        ("abcXYZ-_.~", "abcXYZ-_.~"),
        ("0123456789", "0123456789"),
        ("a:b", "a%3Ab"),
        ("a/b", "a%2Fb"),
        ("a@b", "a%40b"),
        ("a+b", "a%2Bb"),
        (" ", "%20"),
        ("", ""),
        (
            "2024-01-02T03:04:05+00:00",
            "2024-01-02T03%3A04%3A05%2B00%3A00",
        ),
    ];
    for (raw, encoded) in cases {
        assert_eq!(urlencoding_minimal(raw), encoded);
    }
}
/// Returns the same guard as `env_var_lock()`; the `run` dispatch tests hold
/// it for their whole body.
// NOTE(review): the name suggests this once also cleared config-related
// environment variables — today it only takes the lock; confirm intent.
fn no_config_env() -> std::sync::MutexGuard<'static, ()> {
env_var_lock()
}
/// `ai-memory --db <tmp> stats` dispatches through `run` without error.
#[tokio::test]
async fn test_run_dispatch_stats_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "stats"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> namespaces` dispatches through `run` without error.
#[tokio::test]
async fn test_run_dispatch_namespaces_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "namespaces"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> export` dispatches through `run` without error.
#[tokio::test]
async fn test_run_dispatch_export_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "export"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> list` dispatches through `run` without error.
#[tokio::test]
async fn test_run_dispatch_list_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> search anyq` dispatches through `run` without
/// error.
#[tokio::test]
async fn test_run_dispatch_search_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "search", "anyq"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> archive list` dispatches through `run` without
/// error.
#[tokio::test]
async fn test_run_dispatch_archive_list_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "archive", "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> agents list` dispatches through `run` without
/// error.
#[tokio::test]
async fn test_run_dispatch_agents_list_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "agents", "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> pending list` dispatches through `run` without
/// error.
#[tokio::test]
async fn test_run_dispatch_pending_list_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "pending", "list"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> completions bash` dispatches through `run` without
/// error.
#[tokio::test]
async fn test_run_dispatch_completions_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "completions", "bash"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> man` dispatches through `run` without error.
#[tokio::test]
async fn test_run_dispatch_man_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "man"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> gc` dispatches through `run` without error. `gc` is
/// classified as a write command by `is_write_command`.
#[tokio::test]
async fn test_run_dispatch_gc_triggers_post_run_checkpoint() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "gc"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// Seeds two memories and runs `ai-memory --db <tmp> resolve <a> <b>`
/// through `run`, expecting success.
#[tokio::test]
async fn test_run_dispatch_resolve_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let older = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "old", "old fact");
    let newer = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "new", "new fact");
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "resolve",
        older.as_str(),
        newer.as_str(),
    ];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// Seeds one memory and runs `ai-memory --db <tmp> get <id>` through `run`,
/// expecting success.
#[tokio::test]
async fn test_run_dispatch_get_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let seeded = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "get", seeded.as_str()];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// `ai-memory --db <tmp> verify-signed-events-chain` dispatches through
/// `run` without error on a fresh database.
#[tokio::test]
async fn test_run_dispatch_verify_signed_events_chain_command() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "verify-signed-events-chain"];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// Seeds one memory and runs `ai-memory --db <tmp> promote <id>` through
/// `run`, expecting success.
#[tokio::test]
async fn test_run_dispatch_promote_triggers_write_checkpoint() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let seeded = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = ["ai-memory", "--db", db_arg, "promote", seeded.as_str()];
    let cli = Cli::try_parse_from(argv).unwrap();
    run(cli, &config).await.unwrap();
}
/// Runs `bench --iterations 1 --warmup 0` through `run`. The outcome is
/// deliberately ignored: the test only requires that parsing succeeds and
/// dispatch does not panic.
#[tokio::test]
async fn test_run_dispatch_bench_smoke_runs_one_iteration() {
    let _env_guard = no_config_env();
    let env = TestEnv::fresh();
    let config = AppConfig::default();
    let db_arg = env.db_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "bench",
        "--iterations",
        "1",
        "--warmup",
        "0",
    ];
    let cli = Cli::try_parse_from(argv).unwrap();
    let _ = run(cli, &config).await;
}
/// Runs `bench --json --history <file>` and, if the history file was written,
/// checks that it has content.
///
/// The run result itself is discarded, and a missing history file is not
/// treated as a failure.
#[tokio::test]
async fn test_run_dispatch_bench_json_with_history() {
    let _env_guard = no_config_env();
    let sandbox = TestEnv::fresh();
    let history_path = sandbox.db_path.with_file_name("hist.jsonl");
    let db_arg = sandbox.db_path.to_str().unwrap();
    let history_arg = history_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "bench",
        "--iterations",
        "1",
        "--warmup",
        "0",
        "--json",
        "--history",
        history_arg,
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    let _ = run(parsed, &app_cfg).await;
    if !history_path.exists() {
        return;
    }
    let body = std::fs::read_to_string(&history_path).unwrap();
    // Note: the first disjunct implies the second, so this is effectively a
    // non-empty check.
    assert!(body.contains("captured_at") || !body.is_empty());
}
/// Dispatches `migrate --from sqlite://… --to sqlite://… --dry-run` between
/// two fresh databases (the source seeded with one memory) and expects `Ok`.
#[cfg(feature = "sal")]
#[tokio::test]
async fn test_run_dispatch_migrate_sqlite_to_sqlite_dry_run() {
    let _env_guard = no_config_env();
    let source = TestEnv::fresh();
    let target = TestEnv::fresh();
    crate::cli::test_utils::seed_memory(&source.db_path, "ns-mig", "t", "c");
    let from_url = format!("sqlite://{}", source.db_path.display());
    let to_url = format!("sqlite://{}", target.db_path.display());
    let db_arg = source.db_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "migrate",
        "--from",
        &from_url,
        "--to",
        &to_url,
        "--dry-run",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    run(parsed, &app_cfg).await.unwrap();
}
/// Dispatches `migrate --from … --to … --json` between two fresh SQLite
/// databases (the source seeded with one memory) and expects `Ok`.
#[cfg(feature = "sal")]
#[tokio::test]
async fn test_run_dispatch_migrate_json_output() {
    let _env_guard = no_config_env();
    let source = TestEnv::fresh();
    let target = TestEnv::fresh();
    crate::cli::test_utils::seed_memory(&source.db_path, "ns-mig", "t", "c");
    let from_url = format!("sqlite://{}", source.db_path.display());
    let to_url = format!("sqlite://{}", target.db_path.display());
    let db_arg = source.db_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "migrate",
        "--from",
        &from_url,
        "--to",
        &to_url,
        "--json",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    run(parsed, &app_cfg).await.unwrap();
}
/// Verifies that `--db-passphrase-file` makes `run` export the file's content
/// (without the trailing newline) as `AI_MEMORY_DB_PASSPHRASE`.
///
/// The exported value is snapshotted and the variable cleared *before* any
/// assertion, so a failing `run` or a mismatched value cannot leak the
/// passphrase variable into other tests in the same process.
#[tokio::test]
async fn test_run_with_db_passphrase_file_exports_env() {
    let _g = env_var_lock();
    // SAFETY: NOTE(review): presumes the `env_var_lock` guard serialises
    // environment mutation across tests — confirm against its definition.
    unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
    let env = TestEnv::fresh();
    let pass_path = env.db_path.with_file_name("pass");
    std::fs::write(&pass_path, "test-passphrase\n").unwrap();
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Owner read-only permissions on the passphrase file.
        std::fs::set_permissions(&pass_path, std::fs::Permissions::from_mode(0o400)).unwrap();
    }
    let cfg = AppConfig::default();
    let cli = Cli::try_parse_from([
        "ai-memory",
        "--db",
        env.db_path.to_str().unwrap(),
        "--db-passphrase-file",
        pass_path.to_str().unwrap(),
        "stats",
    ])
    .unwrap();
    let outcome = run(cli, &cfg).await;
    // Capture, then clean up, then assert: cleanup must not depend on success.
    let exported = std::env::var("AI_MEMORY_DB_PASSPHRASE").ok();
    // SAFETY: same presumption as above; the guard is still held.
    unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
    outcome.unwrap();
    assert_eq!(exported.as_deref(), Some("test-passphrase"));
}
/// Calls `init_tracing` twice in a row; the test passes as long as the second
/// call does not panic.
#[test]
fn test_init_tracing_is_idempotent() {
    for _ in 0..2 {
        init_tracing();
    }
}
/// Starts `serve_http_with_shutdown_future` on an ephemeral loopback port,
/// polls the health endpoint until it answers (or the retry budget runs out),
/// then fires the shutdown signal and expects the serve future to return `Ok`.
#[tokio::test]
async fn test_serve_http_with_shutdown_future_serves_then_stops() {
    let env = TestEnv::fresh();
    let app_state = keyword_app_state(&env.db_path);
    let api_key_state = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    // Bind to port 0 to let the OS pick a free port, then release it so the
    // server under test can bind the same number.
    let port = {
        let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let p = l.local_addr().unwrap().port();
        drop(l);
        p
    };
    let addr = format!("127.0.0.1:{port}");
    let shutdown = Arc::new(Notify::new());
    let shutdown_clone = shutdown.clone();
    let handle = tokio::spawn(async move {
        serve_http_with_shutdown_future(&addr, api_key_state, app_state, async move {
            shutdown_clone.notified().await;
        })
        .await
    });
    // Build the probe client and URL once; they do not change between polls.
    // A failed build is tolerated exactly as before: every poll then counts
    // as "not ready" and the loop just sleeps through its budget.
    let probe = reqwest::Client::builder()
        .timeout(Duration::from_millis(200))
        .build()
        .ok();
    let health_url = format!("http://127.0.0.1:{port}/api/v1/health");
    for _ in 0..40 {
        let ready = match &probe {
            Some(client) => client.get(&health_url).send().await.is_ok(),
            None => false,
        };
        if ready {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    shutdown.notify_one();
    let res = handle.await.unwrap();
    assert!(res.is_ok(), "serve future returned: {res:?}");
}
/// Passes an unusable listen address to `serve_http_with_shutdown_future` and
/// expects an `Err` instead of a running server.
#[tokio::test]
async fn test_serve_http_with_shutdown_future_bind_failure_errors() {
    let sandbox = TestEnv::fresh();
    let state = keyword_app_state(&sandbox.db_path);
    let auth = ApiKeyState {
        key: None,
        mtls_enforced: false,
    };
    let bogus_addr = "definitely-not-an-address:99999";
    let never_used_shutdown = async {};
    let res = serve_http_with_shutdown_future(bogus_addr, auth, state, never_used_shutdown).await;
    assert!(res.is_err(), "expected bind error, got: {res:?}");
}
/// Dispatches `identity --key-dir <dir> list` through `run` with an empty key
/// directory created next to the test database, expecting `Ok`.
#[tokio::test]
async fn test_run_dispatch_identity_list_command() {
    let _env_guard = no_config_env();
    let sandbox = TestEnv::fresh();
    let keys = sandbox.db_path.parent().unwrap().join("keys");
    std::fs::create_dir_all(&keys).unwrap();
    let db_arg = sandbox.db_path.to_str().unwrap();
    let keys_arg = keys.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "identity",
        "--key-dir",
        keys_arg,
        "list",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    run(parsed, &app_cfg).await.unwrap();
}
/// Opens (and immediately closes) the database, then dispatches
/// `rules --key-dir <dir> list` through `run`, expecting `Ok`.
#[tokio::test]
async fn test_run_dispatch_rules_list_command() {
    let _env_guard = no_config_env();
    let sandbox = TestEnv::fresh();
    // Open once up front and drop the connection before dispatching.
    let bootstrap_conn = crate::db::open(&sandbox.db_path).expect("db::open");
    drop(bootstrap_conn);
    let keys = sandbox.db_path.parent().unwrap().join("keys");
    std::fs::create_dir_all(&keys).unwrap();
    let db_arg = sandbox.db_path.to_str().unwrap();
    let keys_arg = keys.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "rules",
        "--key-dir",
        keys_arg,
        "list",
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    run(parsed, &app_cfg).await.unwrap();
}
/// Writes a small legacy governance TOML file and dispatches
/// `governance migrate-to-permissions --config-in <file>` through `run`,
/// expecting `Ok`.
#[tokio::test]
async fn test_run_dispatch_governance_migrate_command() {
    let _env_guard = no_config_env();
    let sandbox = TestEnv::fresh();
    let legacy_path = sandbox.db_path.parent().unwrap().join("legacy_cfg.toml");
    let legacy_toml = r#"
[governance]
[[governance.policy]]
scope = "team/eng/*"
action = "write"
role = "engineer"
decision = "allow"
"#;
    std::fs::write(&legacy_path, legacy_toml).unwrap();
    let db_arg = sandbox.db_path.to_str().unwrap();
    let legacy_arg = legacy_path.to_str().unwrap();
    let argv = [
        "ai-memory",
        "--db",
        db_arg,
        "governance",
        "migrate-to-permissions",
        "--config-in",
        legacy_arg,
    ];
    let parsed = Cli::try_parse_from(argv).unwrap();
    let app_cfg = AppConfig::default();
    run(parsed, &app_cfg).await.unwrap();
}
/// With `tls_cert`, `tls_key` and `mtls_allowlist` all set, `bootstrap_serve`
/// must report `mtls_enforced = true` and carry the configured API key.
#[tokio::test]
async fn test_bootstrap_serve_mtls_enforced_true_with_all_three_tls_args() {
    let sandbox = TestEnv::fresh();
    let mut app_cfg = AppConfig::default();
    app_cfg.tier = Some("keyword".to_string());
    app_cfg.api_key = Some("s3cret".to_string());
    let mut serve_args = args_with_db(&sandbox.db_path);
    let base_dir = sandbox.db_path.parent().unwrap();
    serve_args.tls_cert = Some(base_dir.join("cert.pem"));
    serve_args.tls_key = Some(base_dir.join("key.pem"));
    serve_args.mtls_allowlist = Some(base_dir.join("allowlist.json"));
    let booted = bootstrap_serve(&sandbox.db_path, &serve_args, &app_cfg)
        .await
        .unwrap();
    assert!(
        booted.api_key_state.mtls_enforced,
        "mtls_enforced should be true when cert+key+allowlist all set"
    );
    assert_eq!(booted.api_key_state.key.as_deref(), Some("s3cret"));
    // Stop the background tasks that bootstrap spawned.
    booted.task_handles.into_iter().for_each(|task| task.abort());
}
/// With only `tls_cert` and `tls_key` set (no allowlist), `bootstrap_serve`
/// must leave `mtls_enforced` false while still carrying the API key.
#[tokio::test]
async fn test_bootstrap_serve_mtls_enforced_false_when_allowlist_absent() {
    let sandbox = TestEnv::fresh();
    let mut app_cfg = AppConfig::default();
    app_cfg.tier = Some("keyword".to_string());
    app_cfg.api_key = Some("only-tls".to_string());
    let mut serve_args = args_with_db(&sandbox.db_path);
    let base_dir = sandbox.db_path.parent().unwrap();
    serve_args.tls_cert = Some(base_dir.join("cert.pem"));
    serve_args.tls_key = Some(base_dir.join("key.pem"));
    let booted = bootstrap_serve(&sandbox.db_path, &serve_args, &app_cfg)
        .await
        .unwrap();
    assert!(
        !booted.api_key_state.mtls_enforced,
        "mtls_enforced should be false without --mtls-allowlist"
    );
    assert_eq!(booted.api_key_state.key.as_deref(), Some("only-tls"));
    booted.task_handles.into_iter().for_each(|task| task.abort());
}
/// With only `mtls_allowlist` set (no cert or key), `bootstrap_serve` must
/// leave `mtls_enforced` false.
#[tokio::test]
async fn test_bootstrap_serve_mtls_enforced_false_when_only_allowlist_set() {
    let sandbox = TestEnv::fresh();
    let mut app_cfg = AppConfig::default();
    app_cfg.tier = Some("keyword".to_string());
    let mut serve_args = args_with_db(&sandbox.db_path);
    let allowlist = sandbox.db_path.parent().unwrap().join("allowlist.json");
    serve_args.mtls_allowlist = Some(allowlist);
    let booted = bootstrap_serve(&sandbox.db_path, &serve_args, &app_cfg)
        .await
        .unwrap();
    assert!(
        !booted.api_key_state.mtls_enforced,
        "mtls_enforced should be false without --tls-cert"
    );
    booted.task_handles.into_iter().for_each(|task| task.abort());
}
/// With full mTLS args plus quorum settings (one write, one peer),
/// `bootstrap_serve` must enforce mTLS, carry the API key, and populate
/// `app_state.federation`.
#[tokio::test]
async fn test_bootstrap_serve_mtls_enforced_with_federation_threads_api_key() {
    let sandbox = TestEnv::fresh();
    let mut app_cfg = AppConfig::default();
    app_cfg.tier = Some("keyword".to_string());
    app_cfg.api_key = Some("fed-key".to_string());
    let mut serve_args = args_with_db(&sandbox.db_path);
    let base_dir = sandbox.db_path.parent().unwrap();
    serve_args.tls_cert = Some(base_dir.join("cert.pem"));
    serve_args.tls_key = Some(base_dir.join("key.pem"));
    serve_args.mtls_allowlist = Some(base_dir.join("allowlist.json"));
    serve_args.quorum_writes = 1;
    serve_args.quorum_peers = vec!["http://127.0.0.1:65520".to_string()];
    serve_args.quorum_timeout_ms = 100;
    let booted = bootstrap_serve(&sandbox.db_path, &serve_args, &app_cfg)
        .await
        .unwrap();
    assert!(booted.api_key_state.mtls_enforced);
    assert_eq!(booted.api_key_state.key.as_deref(), Some("fed-key"));
    assert!(
        booted.app_state.federation.is_some(),
        "federation should be wired when quorum_writes>0 and peers nonempty"
    );
    booted.task_handles.into_iter().for_each(|task| task.abort());
}
/// With `mtls_enforced = true`, a keyless `POST /api/v1/sync/push` must not be
/// rejected with 401 (any other status is accepted by this test).
#[tokio::test]
async fn test_build_router_with_mtls_enforced_allows_sync_without_api_key() {
    let sandbox = TestEnv::fresh();
    let auth = ApiKeyState {
        key: Some("s3cret".to_string()),
        mtls_enforced: true,
    };
    let app = build_router(keyword_app_state(&sandbox.db_path), auth);
    let keyless_push = Request::builder()
        .method("POST")
        .uri("/api/v1/sync/push")
        .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
        .body(Body::from("{}"))
        .unwrap();
    let response = app.oneshot(keyless_push).await.unwrap();
    assert_ne!(
        response.status(),
        StatusCode::UNAUTHORIZED,
        "expected /sync/* to bypass api-key with mtls_enforced=true, got 401"
    );
}
/// Even with `mtls_enforced = true`, a keyless `GET /api/v1/memories` must be
/// answered with 401.
#[tokio::test]
async fn test_build_router_with_mtls_enforced_still_requires_key_on_non_sync() {
    let sandbox = TestEnv::fresh();
    let auth = ApiKeyState {
        key: Some("s3cret".to_string()),
        mtls_enforced: true,
    };
    let app = build_router(keyword_app_state(&sandbox.db_path), auth);
    let keyless_list = Request::builder()
        .method("GET")
        .uri("/api/v1/memories")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(keyless_list).await.unwrap();
    assert_eq!(
        response.status(),
        StatusCode::UNAUTHORIZED,
        "non-/sync/ path must still demand x-api-key even with mtls_enforced"
    );
}
/// With `mtls_enforced = false`, a keyless `POST /api/v1/sync/push` must be
/// answered with 401.
#[tokio::test]
async fn test_build_router_with_mtls_off_does_not_bypass_sync() {
    let sandbox = TestEnv::fresh();
    let auth = ApiKeyState {
        key: Some("s3cret".to_string()),
        mtls_enforced: false,
    };
    let app = build_router(keyword_app_state(&sandbox.db_path), auth);
    let keyless_push = Request::builder()
        .method("POST")
        .uri("/api/v1/sync/push")
        .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
        .body(Body::from("{}"))
        .unwrap();
    let response = app.oneshot(keyless_push).await.unwrap();
    assert_eq!(
        response.status(),
        StatusCode::UNAUTHORIZED,
        "without mtls_enforced, /sync/* must still demand x-api-key"
    );
}
/// With `mtls_enforced = true`, `GET /api/v1/memories` carrying the correct
/// `x-api-key` header must return a success status.
#[tokio::test]
async fn test_build_router_with_mtls_enforced_accepts_valid_key_on_non_sync() {
    let sandbox = TestEnv::fresh();
    let auth = ApiKeyState {
        key: Some("s3cret".to_string()),
        mtls_enforced: true,
    };
    let app = build_router(keyword_app_state(&sandbox.db_path), auth);
    let keyed_list = Request::builder()
        .method("GET")
        .uri("/api/v1/memories")
        .header("x-api-key", "s3cret")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(keyed_list).await.unwrap();
    assert!(
        response.status().is_success(),
        "valid api-key on non-/sync/ path should succeed, got {}",
        response.status()
    );
}
/// Spawns the GC loop with shadow retention (30 / 7, 60-second interval),
/// lets it run briefly, then aborts it and awaits the handle.
#[tokio::test]
async fn test_spawn_gc_loop_with_shadow_retention_runs_and_can_be_aborted() {
    let sandbox = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&sandbox.db_path).unwrap(),
        sandbox.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let interval = Duration::from_secs(60);
    let task = spawn_gc_loop_with_shadow_retention(shared, Some(30), 7, interval);
    tokio::time::sleep(Duration::from_millis(20)).await;
    task.abort();
    // The join result of the aborted task is deliberately ignored.
    let _ = task.await;
}
/// Spawns the GC loop with `None` and a `0` retention argument, lets it run
/// briefly, then aborts it; the test passes if nothing panics.
#[tokio::test]
async fn test_spawn_gc_loop_with_shadow_retention_zero_days_is_opt_out() {
    let sandbox = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&sandbox.db_path).unwrap(),
        sandbox.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let interval = Duration::from_secs(60);
    let task = spawn_gc_loop_with_shadow_retention(shared, None, 0, interval);
    tokio::time::sleep(Duration::from_millis(20)).await;
    task.abort();
    let _ = task.await;
}
/// Spawns the transcript lifecycle sweep loop with default config and a
/// 60-second interval, lets it run briefly, then aborts it.
#[tokio::test]
async fn test_spawn_transcript_lifecycle_sweep_loop_runs_and_can_be_aborted() {
    let sandbox = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&sandbox.db_path).unwrap(),
        sandbox.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let transcripts = crate::config::TranscriptsConfig::default();
    let task = spawn_transcript_lifecycle_sweep_loop(shared, transcripts, Duration::from_secs(60));
    tokio::time::sleep(Duration::from_millis(20)).await;
    task.abort();
    let _ = task.await;
}
/// Spawns the agent quota reset loop with a 60-second interval, lets it run
/// briefly, then aborts it and awaits the handle.
#[tokio::test]
async fn test_spawn_agent_quota_reset_loop_runs_and_can_be_aborted() {
    let sandbox = TestEnv::fresh();
    let shared: Db = Arc::new(Mutex::new((
        db::open(&sandbox.db_path).unwrap(),
        sandbox.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let task = spawn_agent_quota_reset_loop(shared, Duration::from_secs(60));
    tokio::time::sleep(Duration::from_millis(20)).await;
    task.abort();
    let _ = task.await;
}
/// Expects `bootstrap_serve` to refuse to start when
/// `governance.require_operator_pubkey` is true, an enabled governance rule
/// row exists, and `AI_MEMORY_OPERATOR_PUBKEY` is unset.
///
/// The error text must mention either "SEC-2 fail-closed" or
/// "require_operator_pubkey".
#[tokio::test]
async fn test_bootstrap_serve_sec2_fail_closed_when_pubkey_missing_and_rules_enabled() {
// NOTE(review): presumably forces pubkey resolution to report "none" for the
// guard's lifetime — confirm against `rules_store`.
let _no_pubkey_guard = crate::governance::rules_store::force_no_operator_pubkey_for_test();
let _gate = env_var_lock();
let env = TestEnv::fresh();
let conn = db::open(&env.db_path).unwrap();
// Make sure the rules table exists, then insert one rule. `enabled` is not
// listed in the INSERT, so it takes the column default of 1.
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS governance_rules (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
matcher TEXT NOT NULL,
severity TEXT NOT NULL CHECK (severity IN ('refuse','warn','log')),
reason TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT '_global',
created_by TEXT NOT NULL,
created_at INTEGER NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
signature BLOB,
attest_level TEXT NOT NULL DEFAULT 'unsigned'
);",
)
.unwrap();
conn.execute(
"INSERT INTO governance_rules (id, kind, matcher, severity, reason, created_by, created_at)
VALUES ('R1', 'bash', '{\"k\":\"v\"}', 'refuse', 'test', 'tester', 100)",
[],
)
.unwrap();
// Close the seeding connection before bootstrap opens the database itself.
drop(conn);
let mut cfg = AppConfig::default();
cfg.tier = Some("keyword".to_string());
cfg.governance = Some(crate::config::GovernanceConfig {
require_operator_pubkey: true,
});
// Remember any ambient pubkey so it can be restored after the call.
let prior = std::env::var("AI_MEMORY_OPERATOR_PUBKEY").ok();
unsafe { std::env::remove_var("AI_MEMORY_OPERATOR_PUBKEY") };
let args = args_with_db(&env.db_path);
let res = bootstrap_serve(&env.db_path, &args, &cfg).await;
// Restore the environment before asserting so a failure cannot leak state.
if let Some(v) = prior {
unsafe { std::env::set_var("AI_MEMORY_OPERATOR_PUBKEY", v) };
}
let err = match res {
Err(e) => format!("{e:#}"),
Ok(_) => panic!("expected SEC-2 fail-closed refusal"),
};
assert!(
err.contains("SEC-2 fail-closed") || err.contains("require_operator_pubkey"),
"got: {err}"
);
}
/// With all LLM-related env vars cleared and a default config,
/// `build_llm_client` for the keyword tier must return `None`.
#[tokio::test]
async fn test_build_llm_client_returns_none_for_keyword_tier() {
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let app_cfg = AppConfig::default();
    let client = build_llm_client(FeatureTier::Keyword, &app_cfg).await;
    assert!(client.is_none(), "keyword tier must not build an LLM client");
}
#[tokio::test]
async fn test_build_llm_client_returns_none_when_ollama_unreachable() {
let _guard = env_var_lock();
fx_f1_clear_llm_env();
let mut cfg = AppConfig::default();
cfg.ollama_url = Some("http://127.0.0.1:1".to_string());
let res = build_llm_client(FeatureTier::Smart, &cfg).await;
let _ = res;
}
/// Inserts one memory with a 384-element embedding and expects
/// `build_vector_index(&conn, true)` to return `Some`.
///
/// Note: despite "db_empty" in the name, the database holds one embedded row
/// by the time the index is built.
#[test]
fn test_build_vector_index_returns_some_when_embedder_present_and_db_empty() {
    let sandbox = TestEnv::fresh();
    let conn = db::open(&sandbox.db_path).unwrap();
    let record = crate::models::Memory {
        id: "vi-1".to_string(),
        tier: crate::models::Tier::Mid,
        namespace: "test".to_string(),
        title: "t".to_string(),
        content: "c".to_string(),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        access_count: 0,
        created_at: chrono::Utc::now().to_rfc3339(),
        updated_at: chrono::Utc::now().to_rfc3339(),
        last_accessed_at: None,
        expires_at: None,
        metadata: crate::models::default_metadata(),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    let stored_id = db::insert(&conn, &record).unwrap();
    // Deterministic ramp: element i is i * 0.001.
    let embedding: Vec<f32> = (0..384).map(|i| i as f32 * 0.001).collect();
    db::set_embedding(&conn, &stored_id, &embedding).unwrap();
    let index = build_vector_index(&conn, true);
    assert!(index.is_some());
}
/// With `[embeddings]` set to the `ollama` backend and the `bge-large-en`
/// model, the resolved dimension must be 1024 under the autonomous tier.
#[cfg(feature = "sal")]
#[test]
fn resolve_configured_embedding_dim_resolver_arm_wins_for_known_model() {
    use crate::config::{AppConfig, EmbeddingsSection, FeatureTier};
    let section = EmbeddingsSection {
        backend: Some("ollama".to_string()),
        model: Some("bge-large-en".to_string()),
        ..EmbeddingsSection::default()
    };
    let app_cfg = AppConfig {
        embeddings: Some(section),
        ..AppConfig::default()
    };
    let preset = FeatureTier::Autonomous.config();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert_eq!(
        resolved,
        Some(1024),
        "bge-large-en is in KNOWN_EMBEDDING_DIMS at 1024-dim; resolver wins"
    );
}
/// With the flat `embedding_model` field set to the alias `nomic_embed_v15`,
/// the resolved dimension must be 768 under the autonomous tier.
#[cfg(feature = "sal")]
#[test]
fn resolve_configured_embedding_dim_handles_legacy_alias_via_resolver() {
    use crate::config::{AppConfig, FeatureTier};
    let mut app_cfg = AppConfig::default();
    app_cfg.embedding_model = Some("nomic_embed_v15".to_string());
    let preset = FeatureTier::Autonomous.config();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert_eq!(
        resolved,
        Some(768),
        "legacy alias nomic_embed_v15 canonicalises to nomic-embed-text-v1.5 (768)"
    );
}
/// With a default config (no embedding override), the autonomous tier must
/// resolve to a dimension of 768.
#[cfg(feature = "sal")]
#[test]
fn resolve_configured_embedding_dim_falls_back_to_tier_preset_when_no_override() {
    use crate::config::{AppConfig, FeatureTier};
    let preset = FeatureTier::Autonomous.config();
    let app_cfg = AppConfig::default();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert_eq!(resolved, Some(768));
}
/// With a default config and the keyword tier preset, no embedding dimension
/// should be resolved.
///
/// The previous assertion expected `Some(768)`, which contradicts this test's
/// name and duplicates the autonomous-tier fallback expectation; it now pins
/// `None`.
#[cfg(feature = "sal")]
#[test]
fn resolve_configured_embedding_dim_returns_none_for_keyword_tier() {
    use crate::config::{AppConfig, FeatureTier};
    let cfg = AppConfig::default();
    let tier_config = FeatureTier::Keyword.config();
    let dim = resolve_configured_embedding_dim(&cfg, &tier_config);
    assert_eq!(
        dim, None,
        "keyword tier must not resolve an embedding dimension"
    );
}
/// With `[embeddings].model` set to a name the test treats as unknown
/// (`my-private-fork-v0.1`), the autonomous tier must still resolve to 768.
#[cfg(feature = "sal")]
#[test]
fn resolve_configured_embedding_dim_unknown_model_falls_to_tier_preset() {
    use crate::config::{AppConfig, EmbeddingsSection, FeatureTier};
    let section = EmbeddingsSection {
        backend: Some("ollama".to_string()),
        model: Some("my-private-fork-v0.1".to_string()),
        ..EmbeddingsSection::default()
    };
    let app_cfg = AppConfig {
        embeddings: Some(section),
        ..AppConfig::default()
    };
    let preset = FeatureTier::Autonomous.config();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert_eq!(resolved, Some(768));
}
/// Removes every LLM-related environment variable in the list below from the
/// current process, so tests start without ambient backend, model, URL or
/// API-key settings.
fn fx_f1_clear_llm_env() {
    const LLM_ENV_VARS: &[&str] = &[
        "AI_MEMORY_LLM_BACKEND",
        "AI_MEMORY_LLM_MODEL",
        "AI_MEMORY_LLM_BASE_URL",
        "AI_MEMORY_LLM_API_KEY",
        "OLLAMA_BASE_URL",
        "XAI_API_KEY",
        "OPENAI_API_KEY",
        "ANTHROPIC_API_KEY",
        "GEMINI_API_KEY",
        "GOOGLE_API_KEY",
        "DEEPSEEK_API_KEY",
        "MOONSHOT_API_KEY",
        "KIMI_API_KEY",
        "DASHSCOPE_API_KEY",
        "QWEN_API_KEY",
        "MISTRAL_API_KEY",
        "GROQ_API_KEY",
        "TOGETHER_API_KEY",
        "CEREBRAS_API_KEY",
        "OPENROUTER_API_KEY",
        "FIREWORKS_API_KEY",
    ];
    LLM_ENV_VARS.iter().copied().for_each(|name| {
        // Removing a variable that is not set is a no-op.
        unsafe { std::env::remove_var(name) };
    });
}
/// With LLM env vars cleared and a default config, `build_llm_client` for the
/// semantic tier must return `None`.
#[tokio::test]
async fn test_build_llm_client_semantic_tier_compiled_default_returns_none() {
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let app_cfg = AppConfig::default();
    let client = build_llm_client(FeatureTier::Semantic, &app_cfg).await;
    assert!(
        client.is_none(),
        "semantic tier with no operator config must short-circuit to None"
    );
}
#[tokio::test]
async fn test_build_llm_client_autonomous_tier_unreachable_ollama_returns_none() {
let _guard = env_var_lock();
fx_f1_clear_llm_env();
let mut cfg = AppConfig::default();
cfg.ollama_url = Some("http://127.0.0.1:1".to_string());
let res = build_llm_client(FeatureTier::Autonomous, &cfg).await;
assert!(
res.is_none(),
"autonomous tier against unreachable ollama must surface as None"
);
}
/// With `[llm]` configured for the `xai` backend and `api_key_env` naming a
/// variable this test never sets, `build_llm_client` must return `None`.
#[tokio::test]
async fn test_build_llm_client_xai_backend_without_api_key_returns_none() {
    use crate::config::LlmSection;
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let llm_section = LlmSection {
        backend: Some("xai".to_string()),
        model: Some("grok-4.3".to_string()),
        api_key_env: Some("AI_MEMORY_FX_F1_NEVER_SET_XAI_KEY".to_string()),
        ..LlmSection::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.llm = Some(llm_section);
    let client = build_llm_client(FeatureTier::Smart, &app_cfg).await;
    assert!(
        client.is_none(),
        "xai backend without API key MUST map to None (Err path)"
    );
}
/// Points `ollama_url` at a wiremock server answering `GET /api/tags` with
/// 200 and expects `build_llm_client` (smart tier) to return `Some`.
#[tokio::test(flavor = "multi_thread")]
async fn test_build_llm_client_ollama_happy_path_against_wiremock() {
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let mock_ollama = MockServer::start().await;
    let tags_ok = ResponseTemplate::new(200).set_body_string(r#"{"models":[]}"#);
    Mock::given(method("GET"))
        .and(path("/api/tags"))
        .respond_with(tags_ok)
        .mount(&mock_ollama)
        .await;
    let mut app_cfg = AppConfig::default();
    app_cfg.ollama_url = Some(mock_ollama.uri());
    app_cfg.llm_model = Some("test-model".to_string());
    let client = build_llm_client(FeatureTier::Smart, &app_cfg).await;
    assert!(
        client.is_some(),
        "wiremock-backed /api/tags must drive build_llm_client to Some"
    );
}
/// Resolves the LLM config against a wiremock server answering
/// `GET /api/tags` with 200, and expects `build_from_resolved_async` to yield
/// `Ok(Some(client))` whose `is_ollama_native()` is true.
#[tokio::test(flavor = "multi_thread")]
async fn test_build_from_resolved_async_ollama_happy_path() {
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let mock_ollama = MockServer::start().await;
    let tags_ok = ResponseTemplate::new(200).set_body_string(r#"{"models":[]}"#);
    Mock::given(method("GET"))
        .and(path("/api/tags"))
        .respond_with(tags_ok)
        .mount(&mock_ollama)
        .await;
    let mut app_cfg = AppConfig::default();
    app_cfg.ollama_url = Some(mock_ollama.uri());
    app_cfg.llm_model = Some("test-model".to_string());
    let resolved = app_cfg.resolve_llm(None, None, None);
    let built = crate::llm::OllamaClient::build_from_resolved_async(&resolved)
        .await
        .expect("build_from_resolved_async must succeed against healthy /api/tags");
    assert!(built.is_some());
    assert!(built.unwrap().is_ollama_native());
}
/// Points `ollama_url` at a loopback port that was bound and then released,
/// and expects `build_from_resolved_async` to return `Err`.
#[tokio::test(flavor = "multi_thread")]
async fn test_build_from_resolved_async_ollama_unreachable_errs() {
    use std::net::TcpListener;
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    // Grab an OS-assigned port, then close the listener so nothing serves it.
    let closed_port = {
        let probe = TcpListener::bind("127.0.0.1:0").unwrap();
        let number = probe.local_addr().unwrap().port();
        drop(probe);
        number
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.ollama_url = Some(format!("http://127.0.0.1:{closed_port}"));
    app_cfg.llm_model = Some("test-model".to_string());
    let resolved = app_cfg.resolve_llm(None, None, None);
    let outcome = crate::llm::OllamaClient::build_from_resolved_async(&resolved).await;
    assert!(
        outcome.is_err(),
        "unreachable Ollama endpoint MUST surface as Err"
    );
}
/// With `[llm]` configured for the `anthropic` backend and `api_key_env`
/// naming a variable this test never sets, `build_from_resolved_async` must
/// return an `Err` whose message contains "requires an API key".
#[tokio::test(flavor = "multi_thread")]
async fn test_build_from_resolved_async_non_ollama_missing_key_errs() {
    use crate::config::LlmSection;
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let llm_section = LlmSection {
        backend: Some("anthropic".to_string()),
        model: Some("claude-opus-4.7".to_string()),
        api_key_env: Some("AI_MEMORY_FX_F1_NEVER_SET_ANTHROPIC_KEY".to_string()),
        ..LlmSection::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.llm = Some(llm_section);
    let resolved = app_cfg.resolve_llm(None, None, None);
    let outcome = crate::llm::OllamaClient::build_from_resolved_async(&resolved).await;
    let Err(failure) = outcome else {
        panic!("anthropic backend without API key MUST Err");
    };
    let msg = format!("{failure}");
    assert!(
        msg.contains("requires an API key"),
        "missing-key error must cite the API key requirement; got: {msg}"
    );
}
/// With `[llm]` configured for the `openai` backend and its `api_key_env`
/// variable set to a fake key, `build_from_resolved_async` must return
/// `Ok(Some(client))` whose `is_ollama_native()` is false.
///
/// The key variable is removed again before any assertion runs.
#[tokio::test(flavor = "multi_thread")]
async fn test_build_from_resolved_async_non_ollama_with_key_returns_some() {
    use crate::config::LlmSection;
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    let key_var = "AI_MEMORY_FX_F1_OPENAI_KEY";
    unsafe { std::env::set_var(key_var, "sk-test-fx-f1-fake-key") };
    let llm_section = LlmSection {
        backend: Some("openai".to_string()),
        model: Some("gpt-5".to_string()),
        api_key_env: Some(key_var.to_string()),
        ..LlmSection::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.llm = Some(llm_section);
    let resolved = app_cfg.resolve_llm(None, None, None);
    let outcome = crate::llm::OllamaClient::build_from_resolved_async(&resolved).await;
    unsafe { std::env::remove_var(key_var) };
    let built = outcome.expect("openai backend with key MUST return Ok");
    assert!(
        built.is_some(),
        "build_from_resolved_async with key MUST produce Some(client)"
    );
    assert!(
        !built.unwrap().is_ollama_native(),
        "openai backend must NOT report ollama-native"
    );
}
/// Sets `AI_MEMORY_LLM_BACKEND=ollama` and `AI_MEMORY_LLM_BASE_URL` to
/// `http://127.0.0.1:1`, then expects `build_llm_client` (keyword tier,
/// default config) to return `None`.
///
/// Both variables are removed again before the assertion.
#[tokio::test]
async fn test_build_llm_client_env_backend_unreachable_returns_none() {
    let _env_lock = env_var_lock();
    fx_f1_clear_llm_env();
    unsafe {
        std::env::set_var("AI_MEMORY_LLM_BACKEND", "ollama");
        std::env::set_var("AI_MEMORY_LLM_BASE_URL", "http://127.0.0.1:1");
    }
    let app_cfg = AppConfig::default();
    let client = build_llm_client(FeatureTier::Keyword, &app_cfg).await;
    unsafe {
        std::env::remove_var("AI_MEMORY_LLM_BACKEND");
        std::env::remove_var("AI_MEMORY_LLM_BASE_URL");
    }
    assert!(
        client.is_none(),
        "env-source backend against unreachable URL MUST map to None"
    );
}
/// With `AI_MEMORY_ANONYMIZE` unset and `identity.anonymize_default = true`,
/// `apply_anonymize_default` must set the variable to `"1"`.
///
/// The previous value of the variable is restored before the assertion.
#[test]
fn test_apply_anonymize_default_sets_env_when_unset() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ANONYMIZE").ok();
    unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
    let identity = crate::config::IdentityConfig {
        anonymize_default: true,
        ..crate::config::IdentityConfig::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.identity = Some(identity);
    apply_anonymize_default(&app_cfg);
    let observed = std::env::var("AI_MEMORY_ANONYMIZE").ok();
    if let Some(original) = saved {
        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", original) };
    } else {
        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
    }
    assert_eq!(
        observed.as_deref(),
        Some("1"),
        "anonymize_default=true with env unset MUST set AI_MEMORY_ANONYMIZE=1"
    );
}
/// With `AI_MEMORY_ANONYMIZE` already set to `"0"`, `apply_anonymize_default`
/// must leave it untouched even when `identity.anonymize_default = true`.
///
/// The previous value of the variable is restored before the assertion.
#[test]
fn test_apply_anonymize_default_preserves_existing_env() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ANONYMIZE").ok();
    unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };
    let identity = crate::config::IdentityConfig {
        anonymize_default: true,
        ..crate::config::IdentityConfig::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.identity = Some(identity);
    apply_anonymize_default(&app_cfg);
    let observed = std::env::var("AI_MEMORY_ANONYMIZE").ok();
    if let Some(original) = saved {
        unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", original) };
    } else {
        unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
    }
    assert_eq!(
        observed.as_deref(),
        Some("0"),
        "env-var precedence: pre-set AI_MEMORY_ANONYMIZE MUST survive apply_anonymize_default"
    );
}
/// With `AI_MEMORY_ADMIN_AGENT_IDS="alice,,bob,,"`, `resolve_admin_agent_ids`
/// must return exactly `["alice", "bob"]`.
#[test]
fn test_resolve_admin_agent_ids_skips_empty_entries() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ADMIN_AGENT_IDS").ok();
    unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", "alice,,bob,,") };
    let resolved = resolve_admin_agent_ids(None);
    // Put the environment back before asserting.
    if let Some(original) = saved {
        unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", original) };
    } else {
        unsafe { std::env::remove_var("AI_MEMORY_ADMIN_AGENT_IDS") };
    }
    assert_eq!(
        resolved,
        vec!["alice".to_string(), "bob".to_string()],
        "empty entries between commas MUST be skipped, not surface as agent_ids"
    );
}
/// With `AI_MEMORY_ADMIN_AGENT_IDS="alice,bad id,*,bob"`, the result must keep
/// `alice` and `bob`, contain no entry with a space, and not contain `*`.
#[test]
fn test_resolve_admin_agent_ids_drops_malformed_entries() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ADMIN_AGENT_IDS").ok();
    unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", "alice,bad id,*,bob") };
    let resolved = resolve_admin_agent_ids(None);
    // Put the environment back before asserting.
    if let Some(original) = saved {
        unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", original) };
    } else {
        unsafe { std::env::remove_var("AI_MEMORY_ADMIN_AGENT_IDS") };
    }
    for expected in ["alice", "bob"] {
        assert!(resolved.contains(&expected.to_string()));
    }
    let has_spaced_entry = resolved.iter().any(|entry| entry.contains(' '));
    assert!(!has_spaced_entry, "malformed entries MUST be dropped");
    assert!(
        !resolved.contains(&"*".to_string()),
        "wildcard `*` MUST be dropped (post-#980)"
    );
}
/// With `AI_MEMORY_ADMIN_AGENT_IDS` unset and no config passed (`None`),
/// `resolve_admin_agent_ids` must return an empty list.
#[test]
fn test_resolve_admin_agent_ids_falls_back_to_config() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ADMIN_AGENT_IDS").ok();
    unsafe { std::env::remove_var("AI_MEMORY_ADMIN_AGENT_IDS") };
    let resolved = resolve_admin_agent_ids(None);
    // The variable is already unset, so only a previously-set value needs
    // putting back.
    match saved {
        Some(original) => unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", original) },
        None => {}
    }
    assert!(
        resolved.is_empty(),
        "no env + no config MUST resolve to empty allowlist (secure default)"
    );
}
/// With `AI_MEMORY_ADMIN_AGENT_IDS` set to a single space and no config
/// passed, `resolve_admin_agent_ids` must return an empty list.
#[test]
fn test_resolve_admin_agent_ids_whitespace_env_falls_to_config() {
    let _env_lock = env_var_lock();
    let saved = std::env::var("AI_MEMORY_ADMIN_AGENT_IDS").ok();
    unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", " ") };
    let resolved = resolve_admin_agent_ids(None);
    // Put the environment back before asserting.
    if let Some(original) = saved {
        unsafe { std::env::set_var("AI_MEMORY_ADMIN_AGENT_IDS", original) };
    } else {
        unsafe { std::env::remove_var("AI_MEMORY_ADMIN_AGENT_IDS") };
    }
    assert!(
        resolved.is_empty(),
        "whitespace-only env MUST be treated as unset"
    );
}
/// Passes a `sqlite:///…` store URL to `build_store_handle` and expects the
/// backend to be `StorageBackend::Sqlite`.
#[cfg(feature = "sal")]
#[tokio::test]
async fn fx_f2_build_store_handle_sqlite_url_scheme() {
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("scheme.db");
    let url = format!("sqlite:///{}", db_file.display());
    let pool = crate::store::PoolConfig::default();
    let (backend, store) = build_store_handle(Some(&url), &db_file, None, None, pool)
        .await
        .expect("sqlite:// URL must dispatch to SqliteStore");
    let is_sqlite = matches!(backend, crate::handlers::StorageBackend::Sqlite);
    assert!(
        is_sqlite,
        "sqlite:// URL MUST resolve to StorageBackend::Sqlite"
    );
    drop(store);
}
/// Passes a `mysql://` store URL to `build_store_handle` and expects an `Err`
/// whose message contains "unrecognised --store-url".
#[cfg(feature = "sal")]
#[tokio::test]
async fn fx_f2_build_store_handle_unknown_scheme_errors() {
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("ignored.db");
    let pool = crate::store::PoolConfig::default();
    let outcome = build_store_handle(Some("mysql://host/db"), &db_file, None, None, pool).await;
    let Err(failure) = outcome else {
        panic!("unrecognised scheme MUST bail; got Ok");
    };
    let msg = format!("{failure:#}");
    assert!(
        msg.contains("unrecognised --store-url"),
        "bail message MUST include the canonical prefix; got: {msg}"
    );
}
/// Calls `build_store_handle` with no store URL and expects the backend to be
/// `StorageBackend::Sqlite`.
#[cfg(feature = "sal")]
#[tokio::test]
async fn fx_f2_build_store_handle_no_url_falls_through_to_db_path() {
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("fallthrough.db");
    let pool = crate::store::PoolConfig::default();
    let (backend, _store) = build_store_handle(None, &db_file, None, None, pool)
        .await
        .expect("absent --store-url MUST resolve to SqliteStore via --db");
    assert!(matches!(backend, crate::handlers::StorageBackend::Sqlite));
}
/// With `[embeddings].model = "nomic-embed-text-v1.5"`, the semantic tier must
/// resolve to a dimension of 768.
#[cfg(feature = "sal")]
#[test]
fn fx_f2_resolve_configured_embedding_dim_canonical_lookup_wins() {
    let _env_lock = env_var_lock();
    let section = crate::config::EmbeddingsSection {
        model: Some("nomic-embed-text-v1.5".to_string()),
        ..crate::config::EmbeddingsSection::default()
    };
    let mut app_cfg = AppConfig::default();
    app_cfg.embeddings = Some(section);
    let preset = FeatureTier::Semantic.config();
    let dim = resolve_configured_embedding_dim(&app_cfg, &preset);
    let is_768 = matches!(dim, Some(d) if d == 768);
    assert!(
        is_768,
        "canonical lookup MUST return 768 for nomic-embed-text-v1.5; got: {dim:?}"
    );
}
/// With the flat `embedding_model` field set to `mini_lm_l6_v2`, the semantic
/// tier must resolve to a dimension of 384.
#[cfg(feature = "sal")]
#[test]
fn fx_f2_resolve_configured_embedding_dim_legacy_flat_field_path() {
    let _env_lock = env_var_lock();
    let app_cfg = AppConfig {
        embedding_model: Some("mini_lm_l6_v2".to_string()),
        ..AppConfig::default()
    };
    let preset = FeatureTier::Semantic.config();
    let dim = resolve_configured_embedding_dim(&app_cfg, &preset);
    let is_384 = matches!(dim, Some(d) if d == 384);
    assert!(
        is_384,
        "legacy flat-field path MUST resolve mini_lm_l6_v2 to 384; got: {dim:?}"
    );
}
/// With a default config, the semantic tier must resolve to *some* dimension;
/// the exact value is not pinned here.
#[cfg(feature = "sal")]
#[test]
fn fx_f2_resolve_configured_embedding_dim_preset_fallback() {
    let _env_lock = env_var_lock();
    let preset = FeatureTier::Semantic.config();
    let app_cfg = AppConfig::default();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert!(
        resolved.is_some(),
        "Semantic tier preset MUST yield a dim via the fallback arm"
    );
}
/// With the flat `embedding_model` field set to `not-a-real-model`, the
/// semantic tier must still resolve to *some* dimension.
#[cfg(feature = "sal")]
#[test]
fn fx_f2_resolve_configured_embedding_dim_malformed_legacy_drops_silently() {
    let _env_lock = env_var_lock();
    let app_cfg = AppConfig {
        embedding_model: Some("not-a-real-model".to_string()),
        ..AppConfig::default()
    };
    let preset = FeatureTier::Semantic.config();
    let resolved = resolve_configured_embedding_dim(&app_cfg, &preset);
    assert!(
        resolved.is_some(),
        "unparseable legacy embedding_model MUST be dropped silently \
         (the .ok() arm), preset fallback fires"
    );
}
/// Drives `spawn_gc_loop` against a database holding one memory whose
/// `expires_at` is far in the past and checks that the row leaves the
/// `memories` table.
///
/// The row count is polled under a generous deadline instead of being sampled
/// once after a fixed sleep, so a heavily loaded host cannot fail the test
/// merely by scheduling the spawned task late. A panic inside the spawned
/// task is reported rather than discarded.
#[tokio::test]
async fn fupc_spawn_gc_loop_body_archives_expired() {
    use crate::models::{Memory, MemoryKind, Tier};

    let poll_every = Duration::from_millis(5);
    let give_up_after = Duration::from_secs(5);

    let env = TestEnv::fresh();
    let conn = db::open(&env.db_path).unwrap();
    let mem = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Short,
        namespace: "gc-ns".to_string(),
        title: "expired".to_string(),
        content: "stale".to_string(),
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        created_at: "2000-01-01T00:00:00Z".to_string(),
        updated_at: "2000-01-01T00:00:00Z".to_string(),
        expires_at: Some("2000-01-01T01:00:00Z".to_string()),
        memory_kind: MemoryKind::Observation,
        ..Memory::default()
    };
    db::insert(&conn, &mem).unwrap();
    let state: Db = Arc::new(Mutex::new((
        conn,
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let h = spawn_gc_loop(state.clone(), Some(30), Duration::from_millis(1));

    let started = std::time::Instant::now();
    let remaining: i64 = loop {
        let lock = state.lock().await;
        let count: i64 = lock
            .0
            .query_row(
                "SELECT COUNT(*) FROM memories WHERE namespace = 'gc-ns'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        // Release the connection before sleeping so the gc task can take it.
        drop(lock);
        if count == 0 || started.elapsed() >= give_up_after {
            break count;
        }
        tokio::time::sleep(poll_every).await;
    };

    h.abort();
    // Cancellation caused by `abort()` is expected; a panic in the task is not.
    if let Err(join_err) = h.await {
        assert!(!join_err.is_panic(), "gc loop task panicked: {join_err}");
    }
    assert_eq!(
        remaining, 0,
        "gc loop body must have archived the expired row"
    );
}
/// Smoke test: runs `spawn_wal_checkpoint_loop` on a freshly opened database
/// for a short window, then aborts it.
///
/// The join result is inspected instead of discarded, so a panic inside the
/// spawned task fails the test rather than passing silently.
#[tokio::test]
async fn fupc_spawn_wal_checkpoint_loop_body_runs() {
    let env = TestEnv::fresh();
    let conn = db::open(&env.db_path).unwrap();
    let state: Db = Arc::new(Mutex::new((
        conn,
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let h = spawn_wal_checkpoint_loop(state, Duration::from_millis(1));
    tokio::time::sleep(Duration::from_millis(30)).await;
    h.abort();
    // Cancellation caused by `abort()` is expected; a panic in the task is not.
    if let Err(join_err) = h.await {
        assert!(
            !join_err.is_panic(),
            "wal checkpoint loop task panicked: {join_err}"
        );
    }
}
/// Smoke test: runs `spawn_transcript_lifecycle_sweep_loop` with the default
/// `TranscriptsConfig` on a freshly opened database for a short window, then
/// aborts it.
///
/// The join result is inspected instead of discarded, so a panic inside the
/// spawned task fails the test rather than passing silently.
#[tokio::test]
async fn fupc_spawn_transcript_lifecycle_sweep_body_runs_clean() {
    let env = TestEnv::fresh();
    let conn = db::open(&env.db_path).unwrap();
    let state: Db = Arc::new(Mutex::new((
        conn,
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let h = spawn_transcript_lifecycle_sweep_loop(
        state,
        crate::config::TranscriptsConfig::default(),
        Duration::from_millis(1),
    );
    tokio::time::sleep(Duration::from_millis(30)).await;
    h.abort();
    // Cancellation caused by `abort()` is expected; a panic in the task is not.
    if let Err(join_err) = h.await {
        assert!(
            !join_err.is_panic(),
            "transcript lifecycle sweep task panicked: {join_err}"
        );
    }
}
/// Smoke test: runs `spawn_agent_quota_reset_loop` on a freshly opened
/// database for a short window, then aborts it.
///
/// The join result is inspected instead of discarded, so a panic inside the
/// spawned task fails the test rather than passing silently.
#[tokio::test]
async fn fupc_spawn_agent_quota_reset_body_runs_clean() {
    let env = TestEnv::fresh();
    let conn = db::open(&env.db_path).unwrap();
    let state: Db = Arc::new(Mutex::new((
        conn,
        env.db_path.clone(),
        ResolvedTtl::default(),
        true,
    )));
    let h = spawn_agent_quota_reset_loop(state, Duration::from_millis(1));
    tokio::time::sleep(Duration::from_millis(30)).await;
    h.abort();
    // Cancellation caused by `abort()` is expected; a panic in the task is not.
    if let Err(join_err) = h.await {
        assert!(
            !join_err.is_panic(),
            "agent quota reset task panicked: {join_err}"
        );
    }
}
}