1use std::io::Write as _;
37use std::path::Path;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::time::{Duration, Instant};
42
43use anyhow::{Context, Result};
44use axum::Router;
45use clap::{Args, CommandFactory, Parser, Subcommand};
46use clap_complete::{Shell, generate};
47use rusqlite::Connection;
48use tokio::sync::{Mutex, Notify};
49use tokio::task::JoinHandle;
50use tracing_subscriber::EnvFilter;
51
52use crate::cli::agents::{AgentsArgs, PendingArgs};
53use crate::cli::archive::ArchiveArgs;
54use crate::cli::audit::AuditArgs;
55use crate::cli::backup::{BackupArgs, RestoreArgs};
56use crate::cli::boot::BootArgs;
57use crate::cli::consolidate::{AutoConsolidateArgs, ConsolidateArgs};
58use crate::cli::crud::{DeleteArgs, GetArgs, ListArgs};
59use crate::cli::curator::CuratorArgs;
60use crate::cli::forget::ForgetArgs;
61use crate::cli::install::InstallArgs;
62use crate::cli::io::{ImportArgs, MineArgs};
63use crate::cli::link::{LinkArgs, ResolveArgs};
64use crate::cli::logs::LogsArgs;
65use crate::cli::promote::PromoteArgs;
66use crate::cli::recall::RecallArgs;
67use crate::cli::search::SearchArgs;
68use crate::cli::store::StoreArgs;
69use crate::cli::sync::{SyncArgs, SyncDaemonArgs};
70use crate::cli::update::UpdateArgs;
71use crate::cli::wrap::WrapArgs;
72use crate::config::{AppConfig, FeatureTier};
73use crate::embeddings::Embedder;
74use crate::handlers::{ApiKeyState, AppState, Db};
75use crate::hnsw::VectorIndex;
76use crate::{bench, cli, db, embeddings, federation, hnsw, llm, mcp, tls};
77
78#[cfg(feature = "sal")]
79use crate::migrate;
80
81const DEFAULT_DB: &str = "ai-memory.db";
82const DEFAULT_PORT: u16 = 9077;
83const GC_INTERVAL_SECS: u64 = 1800;
84const WAL_CHECKPOINT_INTERVAL_SECS: u64 = 600;
87
88#[derive(Parser)]
97#[command(
98 name = "ai-memory",
99 version,
100 about = "AI-agnostic persistent memory — MCP server, HTTP API, and CLI for any AI platform"
101)]
102pub struct Cli {
103 #[command(subcommand)]
104 pub command: Command,
105 #[arg(long, env = "AI_MEMORY_DB", default_value = DEFAULT_DB, global = true)]
106 pub db: PathBuf,
107 #[arg(long, global = true, default_value_t = false)]
109 pub json: bool,
110 #[arg(long, env = "AI_MEMORY_AGENT_ID", global = true)]
114 pub agent_id: Option<String>,
115 #[arg(long, global = true, value_name = "PATH")]
124 pub db_passphrase_file: Option<PathBuf>,
125}
126
127#[derive(Subcommand)]
128pub enum Command {
129 Serve(ServeArgs),
131 Mcp {
133 #[arg(long, default_value = "semantic")]
135 tier: String,
136 #[arg(long, env = "AI_MEMORY_PROFILE")]
143 profile: Option<String>,
144 },
145 Store(StoreArgs),
147 Update(UpdateArgs),
149 Recall(RecallArgs),
151 Search(SearchArgs),
153 Get(GetArgs),
155 List(ListArgs),
157 Delete(DeleteArgs),
159 Promote(PromoteArgs),
161 Forget(ForgetArgs),
163 Link(LinkArgs),
165 Consolidate(ConsolidateArgs),
167 Gc,
169 Stats,
171 Namespaces,
173 Export,
175 Import(ImportArgs),
177 Resolve(ResolveArgs),
179 Shell,
181 Sync(SyncArgs),
183 SyncDaemon(SyncDaemonArgs),
188 AutoConsolidate(AutoConsolidateArgs),
190 Completions(CompletionsArgs),
192 Man,
194 Mine(MineArgs),
196 Archive(ArchiveArgs),
198 Agents(AgentsArgs),
200 Pending(PendingArgs),
202 Backup(BackupArgs),
206 Restore(RestoreArgs),
211 Curator(CuratorArgs),
216 Bench(BenchArgs),
222 #[cfg(feature = "sal")]
227 Migrate(MigrateArgs),
228 Doctor(DoctorCliArgs),
235 Boot(BootArgs),
243 Install(InstallArgs),
250 Wrap(WrapArgs),
258 Logs(LogsArgs),
263 Audit(AuditArgs),
267}
268
269#[derive(Args)]
273pub struct DoctorCliArgs {
274 #[arg(long, value_name = "URL")]
278 pub remote: Option<String>,
279 #[arg(long)]
282 pub json: bool,
283 #[arg(long)]
286 pub fail_on_warn: bool,
287 #[arg(long)]
293 pub tokens: bool,
294 #[arg(long, value_name = "PROFILE")]
298 pub profile: Option<String>,
299 #[arg(long)]
303 pub raw_table: bool,
304}
305
306#[derive(Args)]
307pub struct BenchArgs {
308 #[arg(long, default_value_t = bench::DEFAULT_ITERATIONS)]
310 pub iterations: usize,
311 #[arg(long, default_value_t = bench::DEFAULT_WARMUP)]
314 pub warmup: usize,
315 #[arg(long)]
317 pub json: bool,
318 #[arg(long, value_name = "PATH")]
324 pub baseline: Option<String>,
325 #[arg(long, default_value_t = bench::DEFAULT_REGRESSION_THRESHOLD_PCT)]
329 pub regression_threshold: f64,
330 #[arg(long, value_name = "PATH")]
338 pub history: Option<PathBuf>,
339}
340
341#[cfg(feature = "sal")]
342#[derive(Args)]
343pub struct MigrateArgs {
344 #[arg(long)]
347 pub from: String,
348 #[arg(long)]
350 pub to: String,
351 #[arg(long, default_value_t = 1000)]
353 pub batch: usize,
354 #[arg(long)]
356 pub namespace: Option<String>,
357 #[arg(long)]
359 pub dry_run: bool,
360 #[arg(long)]
362 pub json: bool,
363}
364
365#[derive(Args)]
366pub struct ServeArgs {
367 #[arg(long, default_value = "127.0.0.1")]
368 pub host: String,
369 #[arg(long, default_value_t = DEFAULT_PORT)]
370 pub port: u16,
371 #[arg(long, requires = "tls_key")]
376 pub tls_cert: Option<PathBuf>,
377 #[arg(long, requires = "tls_cert")]
379 pub tls_key: Option<PathBuf>,
380 #[arg(long, requires = "tls_cert")]
390 pub mtls_allowlist: Option<PathBuf>,
391 #[arg(long, default_value_t = 30)]
396 pub shutdown_grace_secs: u64,
397
398 #[arg(long, default_value_t = 0)]
405 pub quorum_writes: usize,
406 #[arg(long, value_delimiter = ',')]
410 pub quorum_peers: Vec<String>,
411 #[arg(long, default_value_t = 2000)]
414 pub quorum_timeout_ms: u64,
415 #[arg(long)]
418 pub quorum_client_cert: Option<PathBuf>,
419 #[arg(long)]
421 pub quorum_client_key: Option<PathBuf>,
422 #[arg(long)]
429 pub quorum_ca_cert: Option<PathBuf>,
430 #[arg(long, default_value_t = 30)]
435 pub catchup_interval_secs: u64,
436}
437
438#[derive(Args)]
439pub struct CompletionsArgs {
440 pub shell: Shell,
441}
442
443#[allow(clippy::too_many_lines)]
454pub async fn run(cli: Cli, app_config: &AppConfig) -> Result<()> {
455 if let Some(path) = &cli.db_passphrase_file {
461 let passphrase = passphrase_from_file(path)?;
462 unsafe { std::env::set_var("AI_MEMORY_DB_PASSPHRASE", passphrase) };
464 }
465 let db_path = app_config.effective_db(&cli.db);
466 let j = cli.json;
467 let cli_agent_id: Option<String> = cli.agent_id.clone();
468 let needs_checkpoint = is_write_command(&cli.command);
470 let db_path_for_checkpoint = if needs_checkpoint {
471 Some(db_path.clone())
472 } else {
473 None
474 };
475
476 let result = match cli.command {
477 Command::Serve(a) => serve(db_path, a, app_config).await,
478 Command::Mcp { tier, profile } => {
479 let feature_tier = app_config.effective_tier(Some(&tier));
480 let resolved_profile = match app_config.effective_profile(profile.as_deref()) {
485 Ok(p) => p,
486 Err(e) => {
487 eprintln!("ai-memory mcp: invalid profile: {e}");
488 std::process::exit(2);
489 }
490 };
491 mcp::run_mcp_server(&db_path, feature_tier, app_config, &resolved_profile)?;
492 Ok(())
493 }
494 Command::Store(a) => {
495 let stdout = std::io::stdout();
496 let stderr = std::io::stderr();
497 let mut so = stdout.lock();
498 let mut se = stderr.lock();
499 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
500 cli::store::run(
501 &db_path,
502 a,
503 j,
504 app_config,
505 cli_agent_id.as_deref(),
506 &mut out,
507 )
508 }
509 Command::Update(a) => {
510 let stdout = std::io::stdout();
511 let stderr = std::io::stderr();
512 let mut so = stdout.lock();
513 let mut se = stderr.lock();
514 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
515 cli::update::run(&db_path, &a, j, &mut out)
516 }
517 Command::Recall(a) => {
518 let stdout = std::io::stdout();
519 let stderr = std::io::stderr();
520 let mut so = stdout.lock();
521 let mut se = stderr.lock();
522 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
523 cli::recall::run(&db_path, &a, j, app_config, &mut out)
524 }
525 Command::Search(a) => {
526 let stdout = std::io::stdout();
527 let stderr = std::io::stderr();
528 let mut so = stdout.lock();
529 let mut se = stderr.lock();
530 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
531 cli::search::run(&db_path, &a, j, &mut out)
532 }
533 Command::Get(a) => {
534 let stdout = std::io::stdout();
535 let stderr = std::io::stderr();
536 let mut so = stdout.lock();
537 let mut se = stderr.lock();
538 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
539 cli::crud::cmd_get(&db_path, &a, j, &mut out)
540 }
541 Command::List(a) => {
542 let stdout = std::io::stdout();
543 let stderr = std::io::stderr();
544 let mut so = stdout.lock();
545 let mut se = stderr.lock();
546 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
547 cli::crud::cmd_list(&db_path, &a, j, app_config, &mut out)
548 }
549 Command::Delete(a) => {
550 let stdout = std::io::stdout();
551 let stderr = std::io::stderr();
552 let mut so = stdout.lock();
553 let mut se = stderr.lock();
554 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
555 cli::crud::cmd_delete(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
556 }
557 Command::Promote(a) => {
558 let stdout = std::io::stdout();
559 let stderr = std::io::stderr();
560 let mut so = stdout.lock();
561 let mut se = stderr.lock();
562 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
563 cli::promote::cmd_promote(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
564 }
565 Command::Forget(a) => {
566 let stdout = std::io::stdout();
567 let stderr = std::io::stderr();
568 let mut so = stdout.lock();
569 let mut se = stderr.lock();
570 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
571 cli::forget::cmd_forget(&db_path, &a, j, &mut out)
572 }
573 Command::Link(a) => {
574 let stdout = std::io::stdout();
575 let stderr = std::io::stderr();
576 let mut so = stdout.lock();
577 let mut se = stderr.lock();
578 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
579 cli::link::cmd_link(&db_path, &a, j, &mut out)
580 }
581 Command::Consolidate(a) => {
582 let stdout = std::io::stdout();
583 let stderr = std::io::stderr();
584 let mut so = stdout.lock();
585 let mut se = stderr.lock();
586 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
587 cli::consolidate::run(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
588 }
589 Command::Resolve(a) => {
590 let stdout = std::io::stdout();
591 let stderr = std::io::stderr();
592 let mut so = stdout.lock();
593 let mut se = stderr.lock();
594 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
595 cli::link::cmd_resolve(&db_path, &a, j, &mut out)
596 }
597 Command::Shell => cli::shell::run(&db_path),
598 Command::Sync(a) => {
599 let stdout = std::io::stdout();
600 let stderr = std::io::stderr();
601 let mut so = stdout.lock();
602 let mut se = stderr.lock();
603 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
604 cli::sync::run(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
605 }
606 Command::SyncDaemon(a) => cli::sync::run_daemon(&db_path, a, cli_agent_id.as_deref()).await,
607 Command::AutoConsolidate(a) => {
608 let stdout = std::io::stdout();
609 let stderr = std::io::stderr();
610 let mut so = stdout.lock();
611 let mut se = stderr.lock();
612 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
613 cli::consolidate::run_auto(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
614 }
615 Command::Gc => {
616 let stdout = std::io::stdout();
617 let stderr = std::io::stderr();
618 let mut so = stdout.lock();
619 let mut se = stderr.lock();
620 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
621 cli::gc::run_gc(&db_path, j, app_config, &mut out)
622 }
623 Command::Stats => {
624 let stdout = std::io::stdout();
625 let stderr = std::io::stderr();
626 let mut so = stdout.lock();
627 let mut se = stderr.lock();
628 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
629 cli::gc::run_stats(&db_path, j, &mut out)
630 }
631 Command::Namespaces => {
632 let stdout = std::io::stdout();
633 let stderr = std::io::stderr();
634 let mut so = stdout.lock();
635 let mut se = stderr.lock();
636 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
637 cli::gc::run_namespaces(&db_path, j, &mut out)
638 }
639 Command::Export => {
640 let stdout = std::io::stdout();
641 let stderr = std::io::stderr();
642 let mut so = stdout.lock();
643 let mut se = stderr.lock();
644 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
645 cli::io::export(&db_path, &mut out)
646 }
647 Command::Import(a) => {
648 let stdout = std::io::stdout();
649 let stderr = std::io::stderr();
650 let mut so = stdout.lock();
651 let mut se = stderr.lock();
652 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
653 cli::io::import(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
654 }
655 Command::Completions(a) => {
656 generate(
657 a.shell,
658 &mut Cli::command(),
659 "ai-memory",
660 &mut std::io::stdout(),
661 );
662 Ok(())
663 }
664 Command::Man => {
665 let cmd = Cli::command();
666 let man = clap_mangen::Man::new(cmd);
667 man.render(&mut std::io::stdout())?;
668 Ok(())
669 }
670 Command::Mine(a) => {
671 let stdout = std::io::stdout();
672 let stderr = std::io::stderr();
673 let mut so = stdout.lock();
674 let mut se = stderr.lock();
675 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
676 cli::io::mine(
677 &db_path,
678 a,
679 j,
680 app_config,
681 cli_agent_id.as_deref(),
682 &mut out,
683 )
684 }
685 Command::Archive(a) => {
686 let stdout = std::io::stdout();
687 let stderr = std::io::stderr();
688 let mut so = stdout.lock();
689 let mut se = stderr.lock();
690 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
691 cli::archive::run(&db_path, a, j, &mut out)
692 }
693 Command::Agents(a) => {
694 let stdout = std::io::stdout();
695 let stderr = std::io::stderr();
696 let mut so = stdout.lock();
697 let mut se = stderr.lock();
698 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
699 cli::agents::run_agents(&db_path, a, j, &mut out)
700 }
701 Command::Pending(a) => {
702 let stdout = std::io::stdout();
703 let stderr = std::io::stderr();
704 let mut so = stdout.lock();
705 let mut se = stderr.lock();
706 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
707 cli::agents::run_pending(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
708 }
709 Command::Backup(a) => {
710 let stdout = std::io::stdout();
711 let stderr = std::io::stderr();
712 let mut so = stdout.lock();
713 let mut se = stderr.lock();
714 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
715 cli::backup::run_backup(&db_path, &a, j, &mut out)
716 }
717 Command::Restore(a) => {
718 let stdout = std::io::stdout();
719 let stderr = std::io::stderr();
720 let mut so = stdout.lock();
721 let mut se = stderr.lock();
722 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
723 cli::backup::run_restore(&db_path, &a, j, &mut out)
724 }
725 Command::Curator(a) => {
726 let stdout = std::io::stdout();
727 let stderr = std::io::stderr();
728 let mut so = stdout.lock();
729 let mut se = stderr.lock();
730 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
731 cli::curator::run(&db_path, &a, app_config, &mut out).await
732 }
733 Command::Bench(a) => cmd_bench(&a),
734 #[cfg(feature = "sal")]
735 Command::Migrate(a) => cmd_migrate(&a).await,
736 Command::Doctor(a) => {
737 let db_path_doctor = db_path.clone();
746 if a.tokens || a.raw_table {
751 let stdout = std::io::stdout();
752 let stderr = std::io::stderr();
753 let mut so = stdout.lock();
754 let mut se = stderr.lock();
755 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
756 let exit = cli::doctor::run_tokens(
757 cli::doctor::TokensArgs {
758 json: a.json,
759 raw_table: a.raw_table,
760 profile: a.profile,
761 },
762 &mut out,
763 )?;
764 std::process::exit(exit);
765 }
766 let args = cli::doctor::DoctorArgs {
767 remote: a.remote,
768 json: a.json,
769 fail_on_warn: a.fail_on_warn,
770 };
771 let join = tokio::task::spawn_blocking(move || {
772 let stdout = std::io::stdout();
773 let stderr = std::io::stderr();
774 let mut so = stdout.lock();
775 let mut se = stderr.lock();
776 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
777 cli::doctor::run(&db_path_doctor, &args, &mut out)
778 })
779 .await;
780 match join {
781 Ok(Ok(0)) => Ok(()),
782 Ok(Ok(code)) => std::process::exit(code),
783 Ok(Err(e)) => Err(e),
784 Err(e) => Err(anyhow::anyhow!("doctor task join failed: {e}")),
785 }
786 }
787 Command::Boot(a) => {
788 let stdout = std::io::stdout();
795 let stderr = std::io::stderr();
796 let mut so = stdout.lock();
797 let mut se = stderr.lock();
798 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
799 crate::audit::emit(crate::audit::EventBuilder::new(
802 crate::audit::AuditAction::SessionBoot,
803 crate::audit::actor(
804 cli_agent_id.as_deref().unwrap_or("anonymous"),
805 "explicit_or_default",
806 None,
807 ),
808 crate::audit::target_sweep(a.namespace.as_deref().unwrap_or("auto")),
809 ));
810 cli::boot::run(&db_path, &a, app_config, &mut out)
811 }
812 Command::Install(a) => {
813 let stdout = std::io::stdout();
817 let stderr = std::io::stderr();
818 let mut so = stdout.lock();
819 let mut se = stderr.lock();
820 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
821 cli::install::run(&a, &mut out)
822 }
823 Command::Wrap(a) => {
824 let stdout = std::io::stdout();
830 let stderr = std::io::stderr();
831 let mut so = stdout.lock();
832 let mut se = stderr.lock();
833 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
834 let code = cli::wrap::run(&db_path, &a, app_config, &mut out)?;
835 drop(out);
838 drop(so);
839 drop(se);
840 if code == 0 {
841 Ok(())
842 } else {
843 std::process::exit(code);
844 }
845 }
846 Command::Logs(a) => {
847 let stdout = std::io::stdout();
848 let stderr = std::io::stderr();
849 let mut so = stdout.lock();
850 let mut se = stderr.lock();
851 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
852 cli::logs::run(a, app_config, &mut out)
853 }
854 Command::Audit(a) => {
855 let stdout = std::io::stdout();
856 let stderr = std::io::stderr();
857 let mut so = stdout.lock();
858 let mut se = stderr.lock();
859 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
860 match cli::audit::run(a, app_config, &mut out)? {
861 0 => Ok(()),
862 code => std::process::exit(code),
863 }
864 }
865 };
866
867 if result.is_ok()
869 && let Some(cp_path) = db_path_for_checkpoint
870 && let Ok(conn) = db::open(&cp_path)
871 {
872 let _ = db::checkpoint(&conn);
873 }
874
875 result
876}
877
878#[must_use]
885pub fn is_write_command(cmd: &Command) -> bool {
886 matches!(
887 cmd,
888 Command::Store(_)
889 | Command::Update(_)
890 | Command::Delete(_)
891 | Command::Promote(_)
892 | Command::Forget(_)
893 | Command::Link(_)
894 | Command::Consolidate(_)
895 | Command::Resolve(_)
896 | Command::Sync(_)
897 | Command::SyncDaemon(_)
898 | Command::Import(_)
899 | Command::AutoConsolidate(_)
900 | Command::Gc
901 )
902}
903
904pub fn passphrase_from_file(path: &Path) -> Result<String> {
917 let raw = std::fs::read_to_string(path)
918 .with_context(|| format!("reading passphrase file {}", path.display()))?;
919 let passphrase = raw.trim_end_matches(['\n', '\r']).to_string();
920 if passphrase.is_empty() {
921 anyhow::bail!("passphrase file {} is empty", path.display());
922 }
923 Ok(passphrase)
924}
925
926pub fn apply_anonymize_default(app_config: &AppConfig) {
935 if app_config.effective_anonymize_default() && std::env::var("AI_MEMORY_ANONYMIZE").is_err() {
938 unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "1") };
940 }
941}
942
943pub async fn build_embedder(feature_tier: FeatureTier, app_config: &AppConfig) -> Option<Embedder> {
957 let tier_config = feature_tier.config();
958 let Some(emb_model) = tier_config.embedding_model else {
959 tracing::info!(
960 "embedder disabled — tier={} keyword-only (FTS5); semantic recall not wired",
961 feature_tier.as_str()
962 );
963 return None;
964 };
965 let embed_url = app_config.effective_embed_url().to_string();
966 let build = match tokio::task::spawn_blocking(move || {
972 let embed_client = llm::OllamaClient::new_with_url(&embed_url, "nomic-embed-text")
973 .ok()
974 .map(Arc::new);
975 embeddings::Embedder::for_model(emb_model, embed_client)
976 })
977 .await
978 {
979 Ok(b) => b,
980 Err(e) => {
981 tracing::error!("embedder spawn_blocking join failed: {e}");
982 return None;
983 }
984 };
985 match build {
986 Ok(emb) => {
987 tracing::info!(
988 "embedder loaded ({}) — tier={} semantic recall enabled",
989 emb.model_description(),
990 feature_tier.as_str()
991 );
992 Some(emb)
993 }
994 Err(e) => {
995 tracing::error!(
1003 "EMBEDDER LOAD FAILED — tier={} requested semantic features, \
1004 but embedder init errored: {e}. Daemon falls back to keyword-only. \
1005 Semantic recall, sync_push embedding refresh (#322), and HNSW index \
1006 will be NO-OPS. Check network egress to HuggingFace Hub + available \
1007 memory for model weights. To force keyword-only explicitly (silences \
1008 this error), set `tier = \"keyword\"` in config.toml.",
1009 feature_tier.as_str()
1010 );
1011 None
1012 }
1013 }
1014}
1015
1016#[must_use]
1022pub fn build_vector_index(conn: &Connection, embedder_present: bool) -> Option<VectorIndex> {
1023 if !embedder_present {
1024 return None;
1025 }
1026 match db::get_all_embeddings(conn) {
1027 Ok(entries) if !entries.is_empty() => Some(hnsw::VectorIndex::build(entries)),
1028 _ => Some(hnsw::VectorIndex::empty()),
1029 }
1030}
1031
1032#[must_use]
1041pub fn spawn_gc_loop(
1042 state: Db,
1043 archive_max_days: Option<i64>,
1044 interval: Duration,
1045) -> JoinHandle<()> {
1046 tokio::spawn(async move {
1047 loop {
1048 tokio::time::sleep(interval).await;
1049 let lock = state.lock().await;
1050 match db::gc(&lock.0, lock.3) {
1051 Ok(n) if n > 0 => tracing::info!("gc: expired {n} memories"),
1052 _ => {}
1053 }
1054 match db::auto_purge_archive(&lock.0, archive_max_days) {
1056 Ok(n) if n > 0 => tracing::info!("gc: purged {n} old archived memories"),
1057 _ => {}
1058 }
1059 }
1060 })
1061}
1062
1063#[must_use]
1067pub fn spawn_wal_checkpoint_loop(state: Db, interval: Duration) -> JoinHandle<()> {
1068 let half = interval / 2;
1069 tokio::spawn(async move {
1070 tokio::time::sleep(half).await;
1073 loop {
1074 {
1075 let lock = state.lock().await;
1076 match db::checkpoint(&lock.0) {
1077 Ok(()) => tracing::debug!("wal checkpoint: ok"),
1078 Err(e) => tracing::warn!("wal checkpoint failed: {e}"),
1079 }
1080 }
1081 tokio::time::sleep(interval).await;
1082 }
1083 })
1084}
1085
1086#[must_use]
1097pub fn build_router(app_state: AppState, api_key_state: ApiKeyState) -> Router {
1098 crate::build_router(api_key_state, app_state)
1099}
1100
1101pub struct ServeBootstrap {
1107 pub app_state: AppState,
1108 pub api_key_state: ApiKeyState,
1109 pub db_state: Db,
1110 pub archive_max_days: Option<i64>,
1111 pub task_handles: Vec<JoinHandle<()>>,
1112}
1113
1114pub async fn bootstrap_serve(
1117 db_path: &Path,
1118 args: &ServeArgs,
1119 app_config: &AppConfig,
1120) -> Result<ServeBootstrap> {
1121 let resolved_ttl = app_config.effective_ttl();
1122 let archive_on_gc = app_config.effective_archive_on_gc();
1123 let conn = db::open(db_path)?;
1124
1125 let feature_tier = app_config.effective_tier(None);
1132 let tier_config = feature_tier.config();
1133 let embedder = build_embedder(feature_tier, app_config).await;
1134 let vector_index = build_vector_index(&conn, embedder.is_some());
1135
1136 let db_state: Db = Arc::new(Mutex::new((
1137 conn,
1138 db_path.to_path_buf(),
1139 resolved_ttl,
1140 archive_on_gc,
1141 )));
1142
1143 let federation = federation::FederationConfig::build(
1147 args.quorum_writes,
1148 &args.quorum_peers,
1149 std::time::Duration::from_millis(args.quorum_timeout_ms),
1150 args.quorum_client_cert.as_deref(),
1151 args.quorum_client_key.as_deref(),
1152 args.quorum_ca_cert.as_deref(),
1153 format!("host:{}", gethostname::gethostname().to_string_lossy()),
1154 )
1155 .context("federation config")?;
1156
1157 let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
1158
1159 if let Some(ref fed) = federation {
1160 tracing::info!(
1161 "federation enabled: W={} over {} peer(s), timeout {}ms",
1162 fed.policy.w,
1163 fed.peer_count(),
1164 args.quorum_timeout_ms,
1165 );
1166 if args.catchup_interval_secs > 0 {
1169 let interval = std::time::Duration::from_secs(args.catchup_interval_secs);
1170 tracing::info!(
1171 "catchup loop enabled: polling {} peer(s) every {}s",
1172 fed.peer_count(),
1173 args.catchup_interval_secs,
1174 );
1175 federation::spawn_catchup_loop(fed.clone(), db_state.clone(), interval);
1176 } else {
1177 tracing::info!("catchup loop disabled (--catchup-interval-secs=0)");
1178 }
1179 }
1180
1181 let app_state = AppState {
1182 db: db_state.clone(),
1183 embedder: Arc::new(embedder),
1184 vector_index: Arc::new(Mutex::new(vector_index)),
1185 federation: Arc::new(federation),
1186 tier_config: Arc::new(tier_config),
1187 scoring: Arc::new(app_config.effective_scoring()),
1188 };
1189
1190 task_handles.push(spawn_gc_loop(
1192 db_state.clone(),
1193 app_config.archive_max_days,
1194 Duration::from_secs(GC_INTERVAL_SECS),
1195 ));
1196
1197 task_handles.push(spawn_wal_checkpoint_loop(
1206 db_state.clone(),
1207 Duration::from_secs(WAL_CHECKPOINT_INTERVAL_SECS),
1208 ));
1209
1210 let api_key_state = ApiKeyState {
1211 key: app_config.api_key.clone(),
1212 };
1213 if api_key_state.key.is_some() {
1214 tracing::info!("API key authentication enabled");
1215 }
1216
1217 Ok(ServeBootstrap {
1218 app_state,
1219 api_key_state,
1220 db_state,
1221 archive_max_days: app_config.archive_max_days,
1222 task_handles,
1223 })
1224}
1225
1226fn init_tracing() {
1230 let _ = tracing_subscriber::fmt()
1231 .with_env_filter(
1232 EnvFilter::from_default_env()
1233 .add_directive("ai_memory=info".parse().unwrap())
1234 .add_directive("tower_http=info".parse().unwrap()),
1235 )
1236 .try_init();
1237}
1238
1239#[allow(clippy::too_many_lines)]
1245pub async fn serve(db_path: PathBuf, args: ServeArgs, app_config: &AppConfig) -> Result<()> {
1246 init_tracing();
1247
1248 let bootstrap = bootstrap_serve(&db_path, &args, app_config).await?;
1249
1250 let addr = format!("{}:{}", args.host, args.port);
1251 tracing::info!("database: {}", db_path.display());
1252
1253 let shutdown_state = bootstrap.db_state.clone();
1255 let shutdown = async move {
1256 let _ = tokio::signal::ctrl_c().await;
1257 tracing::info!("shutting down — checkpointing WAL");
1258 let lock = shutdown_state.lock().await;
1259 let _ = db::checkpoint(&lock.0);
1260 };
1261
1262 if let (Some(cert), Some(key)) = (&args.tls_cert, &args.tls_key) {
1267 let _ = rustls::crypto::ring::default_provider().install_default();
1271 let tls_config = if let Some(allowlist_path) = &args.mtls_allowlist {
1275 tracing::info!(
1276 "mTLS enabled — client certs required. Allowlist: {}",
1277 allowlist_path.display()
1278 );
1279 tls::load_mtls_rustls_config(cert, key, allowlist_path).await?
1280 } else {
1281 tracing::warn!(
1282 "TLS enabled but mTLS NOT configured — sync endpoints \
1283 (/api/v1/sync/push, /api/v1/sync/since) accept any client. \
1284 Set --mtls-allowlist for production peer-mesh deployments \
1285 (red-team #231)."
1286 );
1287 tls::load_rustls_config(cert, key).await?
1288 };
1289 let app = build_router(bootstrap.app_state, bootstrap.api_key_state);
1290 tracing::info!("ai-memory listening on https://{addr}");
1291 let socket_addr: std::net::SocketAddr = addr.parse()?;
1292 let grace = std::time::Duration::from_secs(args.shutdown_grace_secs);
1298 let handle = axum_server::Handle::new();
1299 let handle_clone = handle.clone();
1300 tokio::spawn(async move {
1301 shutdown.await;
1302 handle_clone.graceful_shutdown(Some(grace));
1303 });
1304 axum_server::bind_rustls(socket_addr, tls_config)
1305 .handle(handle)
1306 .serve(app.into_make_service())
1307 .await?;
1308 } else {
1309 tracing::warn!(
1310 "TLS NOT enabled — sync endpoints (/api/v1/sync/push, \
1311 /api/v1/sync/since) accept any caller over plain HTTP. \
1312 Set --tls-cert + --tls-key + --mtls-allowlist for production \
1313 peer-mesh deployments (red-team #231)."
1314 );
1315 tracing::info!("ai-memory listening on http://{addr}");
1316 serve_http_with_shutdown_future(
1323 &addr,
1324 bootstrap.api_key_state,
1325 bootstrap.app_state,
1326 shutdown,
1327 )
1328 .await?;
1329 }
1330 Ok(())
1331}
1332
1333fn cmd_bench(args: &BenchArgs) -> Result<()> {
1338 let iterations = args.iterations.clamp(1, 100_000);
1339 let warmup = args.warmup.min(10_000);
1340 let regression_threshold = args.regression_threshold.clamp(0.0, 1000.0);
1341 let conn = db::open(Path::new(":memory:"))?;
1345 let config = bench::BenchConfig {
1346 iterations,
1347 warmup,
1348 namespace: bench::BENCH_NAMESPACE.to_string(),
1349 };
1350 let results = bench::run(&conn, &config)?;
1351
1352 let regressions = if let Some(path) = &args.baseline {
1353 let baseline = bench::load_baseline(Path::new(path))?;
1354 Some(bench::compare_against_baseline(
1355 &results,
1356 &baseline,
1357 regression_threshold,
1358 ))
1359 } else {
1360 None
1361 };
1362
1363 if args.json {
1364 println!(
1365 "{}",
1366 serde_json::to_string_pretty(&serde_json::json!({
1367 "iterations": iterations,
1368 "warmup": warmup,
1369 "results": results,
1370 "regressions": regressions,
1371 }))?
1372 );
1373 } else {
1374 print!("{}", bench::render_table(&results));
1375 if let Some(rows) = ®ressions {
1376 println!();
1377 print!("{}", bench::render_regression_table(rows));
1378 }
1379 }
1380
1381 if let Some(history_path) = &args.history {
1382 let captured_at = chrono::Utc::now().to_rfc3339();
1383 bench::append_history(history_path, &captured_at, iterations, warmup, &results)?;
1384 let mut stderr = std::io::stderr().lock();
1385 let _ = writeln!(
1386 stderr,
1387 "bench: appended run to history file {}",
1388 history_path.display()
1389 );
1390 }
1391
1392 let budget_failed = results
1393 .iter()
1394 .any(|r| matches!(r.status, bench::Status::Fail));
1395 let regression_failed = regressions
1396 .as_ref()
1397 .is_some_and(|rows| rows.iter().any(|r| r.regressed));
1398
1399 if budget_failed && regression_failed {
1400 anyhow::bail!(
1401 "bench: at least one operation exceeded its p95 budget by >10% AND regressed >{regression_threshold:.1}% vs baseline"
1402 );
1403 }
1404 if budget_failed {
1405 anyhow::bail!("bench: at least one operation exceeded its p95 budget by >10%");
1406 }
1407 if regression_failed {
1408 anyhow::bail!(
1409 "bench: at least one operation regressed >{regression_threshold:.1}% vs baseline"
1410 );
1411 }
1412 Ok(())
1413}
1414
1415#[cfg(feature = "sal")]
1416async fn cmd_migrate(args: &MigrateArgs) -> Result<()> {
1417 let src = migrate::open_store(&args.from)
1418 .await
1419 .context("open source store")?;
1420 let dst = migrate::open_store(&args.to)
1421 .await
1422 .context("open destination store")?;
1423 let report = migrate::migrate(
1424 src.as_ref(),
1425 dst.as_ref(),
1426 args.batch,
1427 args.namespace.clone(),
1428 args.dry_run,
1429 )
1430 .await;
1431 if args.json {
1432 let value = serde_json::json!({
1433 "from_url": args.from,
1434 "to_url": args.to,
1435 "memories_read": report.memories_read,
1436 "memories_written": report.memories_written,
1437 "batches": report.batches,
1438 "errors": report.errors,
1439 "dry_run": report.dry_run,
1440 });
1441 println!("{}", serde_json::to_string_pretty(&value)?);
1442 } else {
1443 println!("migration report");
1444 println!(" from: {}", args.from);
1445 println!(" to: {}", args.to);
1446 println!(" memories_read: {}", report.memories_read);
1447 println!(" memories_written: {}", report.memories_written);
1448 println!(" batches: {}", report.batches);
1449 println!(" dry_run: {}", report.dry_run);
1450 println!(" errors: {}", report.errors.len());
1451 for e in &report.errors {
1452 println!(" - {e}");
1453 }
1454 }
1455 if !report.errors.is_empty() {
1456 anyhow::bail!("migration completed with {} error(s)", report.errors.len());
1457 }
1458 Ok(())
1459}
1460
1461pub async fn serve_http_with_shutdown(
1476 addr: &str,
1477 api_key_state: ApiKeyState,
1478 app_state: AppState,
1479 shutdown: Arc<Notify>,
1480) -> Result<()> {
1481 serve_http_with_shutdown_future(addr, api_key_state, app_state, async move {
1482 shutdown.notified().await;
1483 })
1484 .await
1485}
1486
1487pub async fn serve_http_with_shutdown_future<F>(
1495 addr: &str,
1496 api_key_state: ApiKeyState,
1497 app_state: AppState,
1498 shutdown: F,
1499) -> Result<()>
1500where
1501 F: std::future::Future<Output = ()> + Send + 'static,
1502{
1503 let app = crate::build_router(api_key_state, app_state);
1504 let listener = tokio::net::TcpListener::bind(addr)
1505 .await
1506 .with_context(|| format!("bind {addr}"))?;
1507 axum::serve(listener, app)
1508 .with_graceful_shutdown(shutdown)
1509 .await
1510 .context("axum::serve")?;
1511 Ok(())
1512}
1513
1514pub async fn sync_cycle_once(
1521 client: &reqwest::Client,
1522 db_path: &Path,
1523 local_agent_id: &str,
1524 peer_url: &str,
1525 api_key: Option<&str>,
1526 batch_size: usize,
1527) -> Result<()> {
1528 let peer_url = peer_url.trim_end_matches('/');
1529
1530 let since = {
1532 let conn = db::open(db_path)?;
1533 db::sync_state_load(&conn, local_agent_id)?
1534 .entries
1535 .get(peer_url)
1536 .cloned()
1537 };
1538
1539 let mut pull_url = format!(
1540 "{peer_url}/api/v1/sync/since?limit={batch_size}&peer={}",
1541 urlencoding_minimal(local_agent_id)
1542 );
1543 if let Some(ref s) = since {
1544 pull_url.push_str("&since=");
1545 pull_url.push_str(&urlencoding_minimal(s));
1546 }
1547
1548 let mut req = client.get(&pull_url).header("x-agent-id", local_agent_id);
1549 if let Some(key) = api_key {
1550 req = req.header("x-api-key", key);
1551 }
1552 let resp = req.send().await?;
1553 if !resp.status().is_success() {
1554 anyhow::bail!("sync-daemon: pull status {}", resp.status());
1555 }
1556 let pulled: SyncSinceResponse = resp.json().await?;
1557 let pull_count = pulled.memories.len();
1558 let latest_pulled = pulled.memories.last().map(|m| m.updated_at.clone());
1559
1560 {
1561 let conn = db::open(db_path)?;
1562 for mem in &pulled.memories {
1563 if crate::validate::validate_memory(mem).is_ok() {
1564 let _ = db::insert_if_newer(&conn, mem);
1565 }
1566 }
1567 if let Some(ref at) = latest_pulled {
1568 db::sync_state_observe(&conn, local_agent_id, peer_url, at)?;
1569 }
1570 }
1571
1572 let last_pushed = {
1574 let conn = db::open(db_path)?;
1575 db::sync_state_last_pushed(&conn, local_agent_id, peer_url)
1576 };
1577 let outgoing = {
1578 let conn = db::open(db_path)?;
1579 db::memories_updated_since(&conn, last_pushed.as_deref(), batch_size)?
1580 };
1581 let push_count = outgoing.len();
1582 let latest_pushed = outgoing.last().map(|m| m.updated_at.clone());
1583
1584 if !outgoing.is_empty() {
1585 let body = serde_json::json!({
1586 "sender_agent_id": local_agent_id,
1587 "sender_clock": { "entries": {} },
1588 "memories": outgoing,
1589 "dry_run": false,
1590 });
1591 let mut req = client
1592 .post(format!("{peer_url}/api/v1/sync/push"))
1593 .header("x-agent-id", local_agent_id)
1594 .header("content-type", "application/json")
1595 .json(&body);
1596 if let Some(key) = api_key {
1597 req = req.header("x-api-key", key);
1598 }
1599 let resp = req.send().await?;
1600 if !resp.status().is_success() {
1601 anyhow::bail!("sync-daemon: push status {}", resp.status());
1602 }
1603 if let Some(at) = latest_pushed {
1604 let conn = db::open(db_path)?;
1605 db::sync_state_record_push(&conn, local_agent_id, peer_url, &at)?;
1606 }
1607 }
1608
1609 tracing::info!("sync-daemon: peer={peer_url} pulled={pull_count} pushed={push_count}");
1610 Ok(())
1611}
1612
1613pub async fn run_sync_daemon_with_shutdown(
1621 db_path: PathBuf,
1622 local_agent_id: String,
1623 peers: Vec<String>,
1624 api_key: Option<String>,
1625 interval_secs: u64,
1626 batch_size: usize,
1627 shutdown: Arc<Notify>,
1628) -> Result<()> {
1629 let client = reqwest::Client::builder()
1630 .timeout(Duration::from_secs(30))
1631 .build()?;
1632 run_sync_daemon_with_shutdown_using_client(
1633 client,
1634 db_path,
1635 local_agent_id,
1636 peers,
1637 api_key,
1638 interval_secs,
1639 batch_size,
1640 shutdown,
1641 )
1642 .await
1643}
1644
1645pub async fn run_sync_daemon_with_shutdown_using_client(
1652 client: reqwest::Client,
1653 db_path: PathBuf,
1654 local_agent_id: String,
1655 peers: Vec<String>,
1656 api_key: Option<String>,
1657 interval_secs: u64,
1658 batch_size: usize,
1659 shutdown: Arc<Notify>,
1660) -> Result<()> {
1661 let interval = interval_secs.max(1);
1662 let batch_size = batch_size.max(1);
1663
1664 let db_path_owned: Arc<Path> = Arc::from(db_path.as_path());
1665 let local_agent_id_arc: Arc<str> = Arc::from(local_agent_id.as_str());
1666 let api_key_arc: Option<Arc<str>> = api_key.as_deref().map(Arc::from);
1667 let peers_arc: Vec<Arc<str>> = peers.iter().map(|s| Arc::from(s.as_str())).collect();
1668 loop {
1669 let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1670 for peer_url in &peers_arc {
1671 let client = client.clone();
1672 let db_path = db_path_owned.clone();
1673 let local_agent_id = local_agent_id_arc.clone();
1674 let peer_url = peer_url.clone();
1675 let api_key = api_key_arc.clone();
1676 set.spawn(async move {
1677 if let Err(e) = sync_cycle_once(
1678 &client,
1679 &db_path,
1680 &local_agent_id,
1681 &peer_url,
1682 api_key.as_deref(),
1683 batch_size,
1684 )
1685 .await
1686 {
1687 tracing::warn!("sync-daemon: peer {peer_url} cycle failed: {e}");
1688 }
1689 });
1690 }
1691 while set.join_next().await.is_some() {}
1692
1693 tokio::select! {
1694 () = tokio::time::sleep(Duration::from_secs(interval)) => {}
1695 () = shutdown.notified() => {
1696 tracing::info!("sync-daemon: shutdown signal received");
1697 return Ok(());
1698 }
1699 }
1700 }
1701}
1702
1703pub async fn run_curator_daemon_with_shutdown(
1711 db_path: PathBuf,
1712 cfg: crate::curator::CuratorConfig,
1713 shutdown: Arc<Notify>,
1714) -> Result<()> {
1715 let shutdown_flag = Arc::new(AtomicBool::new(false));
1716 let shutdown_flag_for_signal = shutdown_flag.clone();
1717 tokio::spawn(async move {
1718 shutdown.notified().await;
1719 shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1720 });
1721
1722 let llm_arc: Option<Arc<crate::llm::OllamaClient>> = None;
1723 let db_owned = db_path;
1724 tokio::task::spawn_blocking(move || {
1725 crate::curator::run_daemon(db_owned, llm_arc, cfg, shutdown_flag);
1726 })
1727 .await
1728 .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1729 Ok(())
1730}
1731
1732#[allow(clippy::too_many_arguments)]
1737pub async fn run_curator_daemon_with_primitives(
1738 db_path: PathBuf,
1739 interval_secs: u64,
1740 max_ops_per_cycle: usize,
1741 dry_run: bool,
1742 include_namespaces: Vec<String>,
1743 exclude_namespaces: Vec<String>,
1744 ollama_model: Option<String>,
1745 shutdown: Arc<Notify>,
1746) -> Result<()> {
1747 let cfg = crate::curator::CuratorConfig {
1748 interval_secs,
1749 max_ops_per_cycle,
1750 dry_run,
1751 include_namespaces,
1752 exclude_namespaces,
1753 };
1754 let llm: Option<Arc<crate::llm::OllamaClient>> =
1755 ollama_model.and_then(|m| crate::llm::OllamaClient::new(&m).ok().map(Arc::new));
1756
1757 let shutdown_flag = Arc::new(AtomicBool::new(false));
1758 let shutdown_flag_for_signal = shutdown_flag.clone();
1759 tokio::spawn(async move {
1760 shutdown.notified().await;
1761 shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1762 });
1763
1764 tokio::task::spawn_blocking(move || {
1765 crate::curator::run_daemon(db_path, llm, cfg, shutdown_flag);
1766 })
1767 .await
1768 .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1769 Ok(())
1770}
1771
1772fn urlencoding_minimal(s: &str) -> String {
1781 use std::fmt::Write as _;
1782 let mut out = String::with_capacity(s.len());
1783 for b in s.bytes() {
1784 match b {
1785 b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
1786 out.push(b as char);
1787 }
1788 _ => {
1789 let _ = write!(out, "%{b:02X}");
1790 }
1791 }
1792 }
1793 out
1794}
1795
1796#[derive(serde::Deserialize)]
1801struct SyncSinceResponse {
1802 #[allow(dead_code)]
1803 count: usize,
1804 #[allow(dead_code)]
1805 limit: usize,
1806 memories: Vec<crate::models::Memory>,
1807}
1808
1809#[allow(dead_code)]
1812fn _imports_in_use(_: Instant, _: Duration) {}
1813
1814#[cfg(test)]
1819mod tests {
1820 use super::*;
1821 use crate::cli::test_utils::TestEnv;
1822 use crate::config::ResolvedTtl;
1823 use axum::body::Body;
1824 use axum::http::{Request, StatusCode};
1825 use tower::ServiceExt as _;
1826
1827 fn args_with_db(_db: &Path) -> ServeArgs {
1830 ServeArgs {
1831 host: "127.0.0.1".to_string(),
1832 port: 0,
1833 tls_cert: None,
1834 tls_key: None,
1835 mtls_allowlist: None,
1836 shutdown_grace_secs: 30,
1837 quorum_writes: 0,
1838 quorum_peers: vec![],
1839 quorum_timeout_ms: 2000,
1840 quorum_client_cert: None,
1841 quorum_client_key: None,
1842 quorum_ca_cert: None,
1843 catchup_interval_secs: 0,
1844 }
1845 }
1846
1847 fn keyword_app_state(db_path: &Path) -> AppState {
1848 let conn = db::open(db_path).unwrap();
1849 let db_state: Db = Arc::new(Mutex::new((
1850 conn,
1851 db_path.to_path_buf(),
1852 ResolvedTtl::default(),
1853 true,
1854 )));
1855 AppState {
1856 db: db_state,
1857 embedder: Arc::new(None),
1858 vector_index: Arc::new(Mutex::new(None)),
1859 federation: Arc::new(None),
1860 tier_config: Arc::new(FeatureTier::Keyword.config()),
1861 scoring: Arc::new(crate::config::ResolvedScoring::default()),
1862 }
1863 }
1864
1865 fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
1869 use std::sync::OnceLock;
1870 static LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1871 LOCK.get_or_init(|| std::sync::Mutex::new(()))
1872 .lock()
1873 .unwrap_or_else(|e| e.into_inner())
1874 }
1875
1876 #[test]
1879 fn test_is_write_command_all_variants() {
1880 let writes: &[&[&str]] = &[
1887 &["ai-memory", "store", "title", "content"],
1888 &["ai-memory", "update", "id123", "--title", "t"],
1889 &["ai-memory", "delete", "id123"],
1890 &["ai-memory", "promote", "id123"],
1891 &["ai-memory", "forget", "pattern"],
1892 &["ai-memory", "link", "a", "b"],
1893 &["ai-memory", "consolidate", "ids"],
1894 &["ai-memory", "resolve", "a", "b"],
1895 &["ai-memory", "sync", "--peer", "/tmp/peer.db"],
1896 &[
1897 "ai-memory",
1898 "sync-daemon",
1899 "--peers",
1900 "http://x",
1901 "--interval-secs",
1902 "60",
1903 ],
1904 &["ai-memory", "import"],
1905 &["ai-memory", "auto-consolidate"],
1906 &["ai-memory", "gc"],
1907 ];
1908 let mut writes_checked = 0;
1909 for argv in writes {
1910 if let Ok(cli) = Cli::try_parse_from(*argv) {
1914 assert!(
1915 is_write_command(&cli.command),
1916 "expected write for {argv:?}"
1917 );
1918 writes_checked += 1;
1919 }
1920 }
1921 assert!(
1922 writes_checked >= 5,
1923 "expected at least 5 write variants checked, got {writes_checked}"
1924 );
1925
1926 let reads: &[&[&str]] = &[
1928 &["ai-memory", "mcp"],
1929 &["ai-memory", "recall", "context"],
1930 &["ai-memory", "search", "query"],
1931 &["ai-memory", "get", "id"],
1932 &["ai-memory", "list"],
1933 &["ai-memory", "stats"],
1934 &["ai-memory", "namespaces"],
1935 &["ai-memory", "export"],
1936 &["ai-memory", "shell"],
1937 &["ai-memory", "man"],
1938 &["ai-memory", "completions", "bash"],
1939 &["ai-memory", "archive", "list"],
1940 &["ai-memory", "agents", "list"],
1941 &["ai-memory", "pending", "list"],
1942 &["ai-memory", "bench"],
1943 &["ai-memory", "serve", "--host", "127.0.0.1", "--port", "0"],
1944 ];
1945 let mut reads_checked = 0;
1946 for argv in reads {
1947 if let Ok(cli) = Cli::try_parse_from(*argv) {
1948 assert!(
1949 !is_write_command(&cli.command),
1950 "expected read for {argv:?}"
1951 );
1952 reads_checked += 1;
1953 }
1954 }
1955 assert!(
1956 reads_checked >= 8,
1957 "expected at least 8 read variants checked, got {reads_checked}"
1958 );
1959
1960 assert!(is_write_command(&Command::Gc));
1964 assert!(!is_write_command(&Command::Stats));
1965 assert!(!is_write_command(&Command::Namespaces));
1966 assert!(!is_write_command(&Command::Export));
1967 assert!(!is_write_command(&Command::Shell));
1968 assert!(!is_write_command(&Command::Man));
1969 assert!(!is_write_command(&Command::Mcp {
1970 tier: "keyword".to_string(),
1971 profile: None,
1972 }));
1973 }
1974
1975 #[tokio::test]
1978 async fn test_router_has_health_endpoint() {
1979 let env = TestEnv::fresh();
1980 let app_state = keyword_app_state(&env.db_path);
1981 let api_key_state = ApiKeyState { key: None };
1982 let router = build_router(app_state, api_key_state);
1983 let resp = router
1984 .oneshot(
1985 Request::builder()
1986 .method("GET")
1987 .uri("/api/v1/health")
1988 .body(Body::empty())
1989 .unwrap(),
1990 )
1991 .await
1992 .unwrap();
1993 assert_eq!(resp.status(), StatusCode::OK);
1994 }
1995
1996 #[tokio::test]
1997 async fn test_router_has_metrics_at_both_paths() {
1998 let env = TestEnv::fresh();
1999 let app_state = keyword_app_state(&env.db_path);
2000 let api_key_state = ApiKeyState { key: None };
2001 let r1 = build_router(app_state.clone(), api_key_state.clone())
2003 .oneshot(
2004 Request::builder()
2005 .method("GET")
2006 .uri("/metrics")
2007 .body(Body::empty())
2008 .unwrap(),
2009 )
2010 .await
2011 .unwrap();
2012 assert_eq!(r1.status(), StatusCode::OK);
2013 let r2 = build_router(app_state, api_key_state)
2015 .oneshot(
2016 Request::builder()
2017 .method("GET")
2018 .uri("/api/v1/metrics")
2019 .body(Body::empty())
2020 .unwrap(),
2021 )
2022 .await
2023 .unwrap();
2024 assert_eq!(r2.status(), StatusCode::OK);
2025 }
2026
2027 #[tokio::test]
2028 async fn test_router_lists_all_v1_memory_routes() {
2029 let env = TestEnv::fresh();
2030 let app_state = keyword_app_state(&env.db_path);
2031 let api_key_state = ApiKeyState { key: None };
2032 let router = build_router(app_state, api_key_state);
2033 let resp = router
2034 .oneshot(
2035 Request::builder()
2036 .method("GET")
2037 .uri("/api/v1/memories")
2038 .body(Body::empty())
2039 .unwrap(),
2040 )
2041 .await
2042 .unwrap();
2043 assert!(resp.status().is_success(), "got {}", resp.status());
2046 }
2047
2048 #[tokio::test]
2049 async fn test_router_applies_api_key_middleware_when_key_set() {
2050 let env = TestEnv::fresh();
2051 let app_state = keyword_app_state(&env.db_path);
2052 let api_key_state = ApiKeyState {
2053 key: Some("s3cret".to_string()),
2054 };
2055 let router = build_router(app_state, api_key_state);
2056 let resp = router
2057 .oneshot(
2058 Request::builder()
2059 .method("GET")
2060 .uri("/api/v1/memories")
2061 .body(Body::empty())
2062 .unwrap(),
2063 )
2064 .await
2065 .unwrap();
2066 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2067 }
2068
2069 #[tokio::test]
2070 async fn test_router_skips_api_key_middleware_when_key_none() {
2071 let env = TestEnv::fresh();
2072 let app_state = keyword_app_state(&env.db_path);
2073 let api_key_state = ApiKeyState { key: None };
2074 let router = build_router(app_state, api_key_state);
2075 let resp = router
2076 .oneshot(
2077 Request::builder()
2078 .method("GET")
2079 .uri("/api/v1/memories")
2080 .body(Body::empty())
2081 .unwrap(),
2082 )
2083 .await
2084 .unwrap();
2085 assert_eq!(resp.status(), StatusCode::OK);
2086 }
2087
2088 #[tokio::test]
2091 async fn test_build_embedder_keyword_tier_returns_none() {
2092 let cfg = AppConfig::default();
2093 let emb = build_embedder(FeatureTier::Keyword, &cfg).await;
2094 assert!(emb.is_none());
2095 }
2096
2097 #[tokio::test]
2098 async fn test_build_embedder_load_failure_returns_none() {
2099 }
2107
2108 #[test]
2111 fn test_build_vector_index_no_embedder_returns_none() {
2112 let env = TestEnv::fresh();
2113 let conn = db::open(&env.db_path).unwrap();
2114 assert!(build_vector_index(&conn, false).is_none());
2115 }
2116
2117 #[test]
2118 fn test_build_vector_index_empty_db_returns_empty_index() {
2119 let env = TestEnv::fresh();
2120 let conn = db::open(&env.db_path).unwrap();
2121 let idx = build_vector_index(&conn, true);
2122 assert!(
2123 idx.is_some(),
2124 "empty DB with embedder must yield empty index"
2125 );
2126 assert_eq!(idx.unwrap().len(), 0);
2127 }
2128
2129 #[tokio::test(start_paused = true)]
2132 async fn test_spawn_gc_loop_runs_and_can_be_aborted() {
2133 let env = TestEnv::fresh();
2134 let conn = db::open(&env.db_path).unwrap();
2135 let state: Db = Arc::new(Mutex::new((
2136 conn,
2137 env.db_path.clone(),
2138 ResolvedTtl::default(),
2139 true,
2140 )));
2141 let h = spawn_gc_loop(state, None, Duration::from_secs(60));
2142 tokio::time::advance(Duration::from_secs(61)).await;
2147 tokio::task::yield_now().await;
2149 h.abort();
2150 let err = h.await.unwrap_err();
2152 assert!(err.is_cancelled());
2153 }
2154
2155 #[tokio::test(start_paused = true)]
2156 async fn test_spawn_wal_checkpoint_loop_runs_and_can_be_aborted() {
2157 let env = TestEnv::fresh();
2158 let conn = db::open(&env.db_path).unwrap();
2159 let state: Db = Arc::new(Mutex::new((
2160 conn,
2161 env.db_path.clone(),
2162 ResolvedTtl::default(),
2163 true,
2164 )));
2165 let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(60));
2166 tokio::time::advance(Duration::from_secs(31)).await;
2169 tokio::task::yield_now().await;
2170 tokio::time::advance(Duration::from_secs(60)).await;
2171 tokio::task::yield_now().await;
2172 h.abort();
2173 let err = h.await.unwrap_err();
2174 assert!(err.is_cancelled());
2175 }
2176
2177 #[test]
2180 fn test_passphrase_strips_trailing_newline() {
2181 let dir = tempfile::tempdir().unwrap();
2182 let p = dir.path().join("pass");
2183 std::fs::write(&p, "secret\n").unwrap();
2184 assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
2185 }
2186
2187 #[test]
2188 fn test_passphrase_strips_trailing_crlf() {
2189 let dir = tempfile::tempdir().unwrap();
2190 let p = dir.path().join("pass");
2191 std::fs::write(&p, "secret\r\n").unwrap();
2192 assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
2193 }
2194
2195 #[test]
2196 fn test_passphrase_empty_file_errors() {
2197 let dir = tempfile::tempdir().unwrap();
2198 let p = dir.path().join("empty");
2199 std::fs::write(&p, "").unwrap();
2200 let err = passphrase_from_file(&p).unwrap_err();
2201 assert!(
2202 err.to_string().contains("empty"),
2203 "expected 'empty' error, got: {err}"
2204 );
2205 }
2206
2207 #[test]
2208 fn test_passphrase_empty_after_trim_errors() {
2209 let dir = tempfile::tempdir().unwrap();
2213 let p = dir.path().join("nl-only");
2214 std::fs::write(&p, "\n").unwrap();
2215 let err = passphrase_from_file(&p).unwrap_err();
2216 assert!(err.to_string().contains("empty"));
2217 }
2218
2219 #[test]
2220 fn test_passphrase_nonexistent_file_errors() {
2221 let dir = tempfile::tempdir().unwrap();
2222 let p = dir.path().join("does-not-exist");
2223 let err = passphrase_from_file(&p).unwrap_err();
2224 assert!(
2225 err.to_string().contains("reading passphrase file")
2226 || err.chain().any(|e| e.to_string().contains("No such file"))
2227 || err.chain().any(|e| e.to_string().contains("cannot find")),
2228 "got: {err:#}"
2229 );
2230 }
2231
2232 #[test]
2233 fn test_passphrase_preserves_internal_whitespace() {
2234 let dir = tempfile::tempdir().unwrap();
2235 let p = dir.path().join("pass");
2236 std::fs::write(&p, "my pass phrase\n").unwrap();
2237 assert_eq!(passphrase_from_file(&p).unwrap(), "my pass phrase");
2238 }
2239
2240 #[test]
2243 fn test_anonymize_set_when_config_true_and_env_unset() {
2244 let _g = env_var_lock();
2245 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2247 let mut cfg = AppConfig::default();
2248 cfg.identity = Some(crate::config::IdentityConfig {
2249 anonymize_default: true,
2250 });
2251 apply_anonymize_default(&cfg);
2252 assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "1");
2253 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2255 }
2256
2257 #[test]
2258 fn test_anonymize_unchanged_when_env_already_set() {
2259 let _g = env_var_lock();
2260 unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };
2262 let mut cfg = AppConfig::default();
2263 cfg.identity = Some(crate::config::IdentityConfig {
2264 anonymize_default: true,
2265 });
2266 apply_anonymize_default(&cfg);
2267 assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "0");
2269 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2271 }
2272
2273 #[test]
2274 fn test_anonymize_unchanged_when_config_false() {
2275 let _g = env_var_lock();
2276 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2278 let cfg = AppConfig::default();
2279 apply_anonymize_default(&cfg);
2281 assert!(std::env::var("AI_MEMORY_ANONYMIZE").is_err());
2282 }
2283
2284 #[tokio::test]
2287 async fn test_bootstrap_serve_keyword_tier_no_embedder() {
2288 let env = TestEnv::fresh();
2289 let mut cfg = AppConfig::default();
2290 cfg.tier = Some("keyword".to_string());
2291 let args = args_with_db(&env.db_path);
2292 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2293 assert!(bs.app_state.embedder.is_none());
2295 let vi = bs.app_state.vector_index.lock().await;
2296 assert!(vi.is_none());
2297 assert_eq!(bs.task_handles.len(), 2);
2299 for h in bs.task_handles {
2301 h.abort();
2302 }
2303 }
2304
2305 #[tokio::test]
2306 async fn test_bootstrap_serve_with_api_key_logs_enabled() {
2307 let env = TestEnv::fresh();
2308 let mut cfg = AppConfig::default();
2309 cfg.tier = Some("keyword".to_string());
2310 cfg.api_key = Some("test-key".to_string());
2311 let args = args_with_db(&env.db_path);
2312 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2313 assert_eq!(bs.api_key_state.key.as_deref(), Some("test-key"));
2314 for h in bs.task_handles {
2315 h.abort();
2316 }
2317 }
2318
2319 #[tokio::test]
2320 async fn test_bootstrap_serve_federation_disabled_when_quorum_zero() {
2321 let env = TestEnv::fresh();
2322 let mut cfg = AppConfig::default();
2323 cfg.tier = Some("keyword".to_string());
2324 let args = args_with_db(&env.db_path);
2325 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2326 assert!(bs.app_state.federation.is_none());
2327 for h in bs.task_handles {
2328 h.abort();
2329 }
2330 }
2331
2332 #[tokio::test]
2344 async fn test_bootstrap_serve_federation_enabled_attaches_config() {
2345 let env = TestEnv::fresh();
2350 let mut cfg = AppConfig::default();
2351 cfg.tier = Some("keyword".to_string());
2352 let mut args = args_with_db(&env.db_path);
2353 args.quorum_writes = 1;
2354 args.quorum_peers = vec!["http://127.0.0.1:65530".to_string()];
2355 args.quorum_timeout_ms = 100;
2356 args.catchup_interval_secs = 0;
2357 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2358 assert!(bs.app_state.federation.is_some());
2359 for h in bs.task_handles {
2360 h.abort();
2361 }
2362 }
2363
2364 #[tokio::test]
2365 async fn test_bootstrap_serve_federation_enabled_with_catchup_loop() {
2366 let env = TestEnv::fresh();
2372 let mut cfg = AppConfig::default();
2373 cfg.tier = Some("keyword".to_string());
2374 let mut args = args_with_db(&env.db_path);
2375 args.quorum_writes = 1;
2376 args.quorum_peers = vec!["http://127.0.0.1:65531".to_string()];
2377 args.quorum_timeout_ms = 100;
2378 args.catchup_interval_secs = 3600; let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2380 assert!(bs.app_state.federation.is_some());
2381 for h in bs.task_handles {
2382 h.abort();
2383 }
2384 }
2385
2386 #[tokio::test]
2387 async fn test_bootstrap_serve_federation_invalid_peer_errors() {
2388 let env = TestEnv::fresh();
2392 let mut cfg = AppConfig::default();
2393 cfg.tier = Some("keyword".to_string());
2394 let mut args = args_with_db(&env.db_path);
2395 args.quorum_writes = 1;
2396 args.quorum_peers = vec![
2397 "http://127.0.0.1:65532".to_string(),
2398 "http://127.0.0.1:65532/".to_string(), ];
2400 let res = bootstrap_serve(&env.db_path, &args, &cfg).await;
2401 let err = match res {
2402 Ok(_) => panic!("expected error from duplicate peer URLs"),
2403 Err(e) => e,
2404 };
2405 let s = format!("{err:#}");
2406 assert!(
2407 s.contains("federation") || s.contains("duplicate"),
2408 "got: {s}"
2409 );
2410 }
2411
2412 #[test]
2415 fn test_build_vector_index_populated_db_returns_built_index() {
2416 let env = TestEnv::fresh();
2420 let conn = db::open(&env.db_path).unwrap();
2421 let now = chrono::Utc::now().to_rfc3339();
2423 let mem = crate::models::Memory {
2424 id: uuid::Uuid::new_v4().to_string(),
2425 tier: crate::models::Tier::Mid,
2426 namespace: "ns".to_string(),
2427 title: "t".to_string(),
2428 content: "c".to_string(),
2429 tags: vec![],
2430 priority: 5,
2431 confidence: 1.0,
2432 source: "test".to_string(),
2433 access_count: 0,
2434 created_at: now.clone(),
2435 updated_at: now,
2436 last_accessed_at: None,
2437 expires_at: None,
2438 metadata: crate::models::default_metadata(),
2439 };
2440 let id = db::insert(&conn, &mem).unwrap();
2441 db::set_embedding(&conn, &id, &[1.0, 0.0, 0.0]).unwrap();
2442 let idx = build_vector_index(&conn, true).expect("populated index");
2443 assert!(
2444 idx.len() >= 1,
2445 "expected non-empty index, got len={}",
2446 idx.len()
2447 );
2448 }
2449
2450 #[tokio::test(start_paused = true)]
2458 async fn test_spawn_gc_loop_purges_expired_memories() {
2459 let env = TestEnv::fresh();
2460 let conn = db::open(&env.db_path).unwrap();
2461 let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
2463 let now = chrono::Utc::now().to_rfc3339();
2464 let mem = crate::models::Memory {
2465 id: uuid::Uuid::new_v4().to_string(),
2466 tier: crate::models::Tier::Short,
2467 namespace: "ns-gc".to_string(),
2468 title: "stale".to_string(),
2469 content: "stale".to_string(),
2470 tags: vec![],
2471 priority: 1,
2472 confidence: 1.0,
2473 source: "test".to_string(),
2474 access_count: 0,
2475 created_at: now.clone(),
2476 updated_at: now,
2477 last_accessed_at: None,
2478 expires_at: Some(past),
2479 metadata: crate::models::default_metadata(),
2480 };
2481 db::insert(&conn, &mem).unwrap();
2482 drop(conn);
2483
2484 let conn = db::open(&env.db_path).unwrap();
2485 let state: Db = Arc::new(Mutex::new((
2486 conn,
2487 env.db_path.clone(),
2488 ResolvedTtl::default(),
2489 true,
2490 )));
2491 let h = spawn_gc_loop(state.clone(), Some(1), Duration::from_secs(60));
2494 tokio::time::advance(Duration::from_secs(61)).await;
2497 tokio::task::yield_now().await;
2498 tokio::time::advance(Duration::from_secs(61)).await;
2499 tokio::task::yield_now().await;
2500 h.abort();
2501 let _ = h.await;
2502 }
2503
2504 #[tokio::test(start_paused = true)]
2507 async fn test_spawn_wal_checkpoint_loop_runs_multiple_cycles() {
2508 let env = TestEnv::fresh();
2509 let conn = db::open(&env.db_path).unwrap();
2510 let state: Db = Arc::new(Mutex::new((
2511 conn,
2512 env.db_path.clone(),
2513 ResolvedTtl::default(),
2514 true,
2515 )));
2516 let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(2));
2517 for _ in 0..4 {
2520 tokio::time::advance(Duration::from_secs(2)).await;
2521 tokio::task::yield_now().await;
2522 }
2523 h.abort();
2524 let _ = h.await;
2525 }
2526
2527 #[test]
2530 fn test_urlencoding_minimal_round_trip() {
2531 assert_eq!(urlencoding_minimal("abcXYZ-_.~"), "abcXYZ-_.~");
2533 assert_eq!(urlencoding_minimal("0123456789"), "0123456789");
2534 assert_eq!(urlencoding_minimal("a:b"), "a%3Ab");
2536 assert_eq!(urlencoding_minimal("a/b"), "a%2Fb");
2537 assert_eq!(urlencoding_minimal("a@b"), "a%40b");
2538 assert_eq!(urlencoding_minimal("a+b"), "a%2Bb");
2539 assert_eq!(urlencoding_minimal(" "), "%20");
2540 assert_eq!(urlencoding_minimal(""), "");
2542 assert_eq!(
2544 urlencoding_minimal("2024-01-02T03:04:05+00:00"),
2545 "2024-01-02T03%3A04%3A05%2B00%3A00"
2546 );
2547 }
2548
2549 fn no_config_env() -> std::sync::MutexGuard<'static, ()> {
2558 env_var_lock()
2563 }
2564
2565 #[tokio::test]
2566 async fn test_run_dispatch_stats_command() {
2567 let _g = no_config_env();
2568 let env = TestEnv::fresh();
2569 let cfg = AppConfig::default();
2570 let cli =
2571 Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "stats"])
2572 .unwrap();
2573 run(cli, &cfg).await.unwrap();
2574 }
2575
2576 #[tokio::test]
2577 async fn test_run_dispatch_namespaces_command() {
2578 let _g = no_config_env();
2579 let env = TestEnv::fresh();
2580 let cfg = AppConfig::default();
2581 let cli = Cli::try_parse_from([
2582 "ai-memory",
2583 "--db",
2584 env.db_path.to_str().unwrap(),
2585 "namespaces",
2586 ])
2587 .unwrap();
2588 run(cli, &cfg).await.unwrap();
2589 }
2590
2591 #[tokio::test]
2592 async fn test_run_dispatch_export_command() {
2593 let _g = no_config_env();
2594 let env = TestEnv::fresh();
2595 let cfg = AppConfig::default();
2596 let cli =
2597 Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "export"])
2598 .unwrap();
2599 run(cli, &cfg).await.unwrap();
2600 }
2601
2602 #[tokio::test]
2603 async fn test_run_dispatch_list_command() {
2604 let _g = no_config_env();
2605 let env = TestEnv::fresh();
2606 let cfg = AppConfig::default();
2607 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "list"])
2608 .unwrap();
2609 run(cli, &cfg).await.unwrap();
2610 }
2611
2612 #[tokio::test]
2613 async fn test_run_dispatch_search_command() {
2614 let _g = no_config_env();
2615 let env = TestEnv::fresh();
2616 let cfg = AppConfig::default();
2617 let cli = Cli::try_parse_from([
2618 "ai-memory",
2619 "--db",
2620 env.db_path.to_str().unwrap(),
2621 "search",
2622 "anyq",
2623 ])
2624 .unwrap();
2625 run(cli, &cfg).await.unwrap();
2626 }
2627
2628 #[tokio::test]
2629 async fn test_run_dispatch_archive_list_command() {
2630 let _g = no_config_env();
2631 let env = TestEnv::fresh();
2632 let cfg = AppConfig::default();
2633 let cli = Cli::try_parse_from([
2634 "ai-memory",
2635 "--db",
2636 env.db_path.to_str().unwrap(),
2637 "archive",
2638 "list",
2639 ])
2640 .unwrap();
2641 run(cli, &cfg).await.unwrap();
2642 }
2643
2644 #[tokio::test]
2645 async fn test_run_dispatch_agents_list_command() {
2646 let _g = no_config_env();
2647 let env = TestEnv::fresh();
2648 let cfg = AppConfig::default();
2649 let cli = Cli::try_parse_from([
2650 "ai-memory",
2651 "--db",
2652 env.db_path.to_str().unwrap(),
2653 "agents",
2654 "list",
2655 ])
2656 .unwrap();
2657 run(cli, &cfg).await.unwrap();
2658 }
2659
2660 #[tokio::test]
2661 async fn test_run_dispatch_pending_list_command() {
2662 let _g = no_config_env();
2663 let env = TestEnv::fresh();
2664 let cfg = AppConfig::default();
2665 let cli = Cli::try_parse_from([
2666 "ai-memory",
2667 "--db",
2668 env.db_path.to_str().unwrap(),
2669 "pending",
2670 "list",
2671 ])
2672 .unwrap();
2673 run(cli, &cfg).await.unwrap();
2674 }
2675
2676 #[tokio::test]
2677 async fn test_run_dispatch_completions_command() {
2678 let _g = no_config_env();
2679 let env = TestEnv::fresh();
2680 let cfg = AppConfig::default();
2681 let cli = Cli::try_parse_from([
2682 "ai-memory",
2683 "--db",
2684 env.db_path.to_str().unwrap(),
2685 "completions",
2686 "bash",
2687 ])
2688 .unwrap();
2689 run(cli, &cfg).await.unwrap();
2690 }
2691
2692 #[tokio::test]
2693 async fn test_run_dispatch_man_command() {
2694 let _g = no_config_env();
2695 let env = TestEnv::fresh();
2696 let cfg = AppConfig::default();
2697 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "man"])
2698 .unwrap();
2699 run(cli, &cfg).await.unwrap();
2700 }
2701
2702 #[tokio::test]
2703 async fn test_run_dispatch_gc_triggers_post_run_checkpoint() {
2704 let _g = no_config_env();
2707 let env = TestEnv::fresh();
2708 let cfg = AppConfig::default();
2709 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "gc"])
2710 .unwrap();
2711 run(cli, &cfg).await.unwrap();
2712 }
2713
2714 #[tokio::test]
2715 async fn test_run_dispatch_resolve_command() {
2716 let _g = no_config_env();
2718 let env = TestEnv::fresh();
2719 let id_a = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "old", "old fact");
2720 let id_b = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "new", "new fact");
2721 let cfg = AppConfig::default();
2722 let cli = Cli::try_parse_from([
2723 "ai-memory",
2724 "--db",
2725 env.db_path.to_str().unwrap(),
2726 "resolve",
2727 &id_a,
2728 &id_b,
2729 ])
2730 .unwrap();
2731 run(cli, &cfg).await.unwrap();
2732 }
2733
2734 #[tokio::test]
2735 async fn test_run_dispatch_get_command() {
2736 let _g = no_config_env();
2737 let env = TestEnv::fresh();
2738 let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2739 let cfg = AppConfig::default();
2740 let cli = Cli::try_parse_from([
2741 "ai-memory",
2742 "--db",
2743 env.db_path.to_str().unwrap(),
2744 "get",
2745 &id,
2746 ])
2747 .unwrap();
2748 run(cli, &cfg).await.unwrap();
2749 }
2750
2751 #[tokio::test]
2752 async fn test_run_dispatch_promote_triggers_write_checkpoint() {
2753 let _g = no_config_env();
2756 let env = TestEnv::fresh();
2757 let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2758 let cfg = AppConfig::default();
2759 let cli = Cli::try_parse_from([
2760 "ai-memory",
2761 "--db",
2762 env.db_path.to_str().unwrap(),
2763 "promote",
2764 &id,
2765 ])
2766 .unwrap();
2767 run(cli, &cfg).await.unwrap();
2768 }
2769
2770 #[tokio::test]
2773 async fn test_run_dispatch_bench_smoke_runs_one_iteration() {
2774 let _g = no_config_env();
2779 let env = TestEnv::fresh();
2780 let cfg = AppConfig::default();
2781 let cli = Cli::try_parse_from([
2782 "ai-memory",
2783 "--db",
2784 env.db_path.to_str().unwrap(),
2785 "bench",
2786 "--iterations",
2787 "1",
2788 "--warmup",
2789 "0",
2790 ])
2791 .unwrap();
2792 let _ = run(cli, &cfg).await;
2795 }
2796
2797 #[tokio::test]
2798 async fn test_run_dispatch_bench_json_with_history() {
2799 let _g = no_config_env();
2801 let env = TestEnv::fresh();
2802 let history = env.db_path.with_file_name("hist.jsonl");
2803 let cfg = AppConfig::default();
2804 let cli = Cli::try_parse_from([
2805 "ai-memory",
2806 "--db",
2807 env.db_path.to_str().unwrap(),
2808 "bench",
2809 "--iterations",
2810 "1",
2811 "--warmup",
2812 "0",
2813 "--json",
2814 "--history",
2815 history.to_str().unwrap(),
2816 ])
2817 .unwrap();
2818 let _ = run(cli, &cfg).await;
2819 if history.exists() {
2821 let content = std::fs::read_to_string(&history).unwrap();
2822 assert!(content.contains("captured_at") || !content.is_empty());
2823 }
2824 }
2825
2826 #[cfg(feature = "sal")]
2829 #[tokio::test]
2830 async fn test_run_dispatch_migrate_sqlite_to_sqlite_dry_run() {
2831 let _g = no_config_env();
2833 let src_env = TestEnv::fresh();
2834 let dst_env = TestEnv::fresh();
2835 crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2837 let from = format!("sqlite://{}", src_env.db_path.display());
2838 let to = format!("sqlite://{}", dst_env.db_path.display());
2839 let cfg = AppConfig::default();
2840 let cli = Cli::try_parse_from([
2841 "ai-memory",
2842 "--db",
2843 src_env.db_path.to_str().unwrap(),
2844 "migrate",
2845 "--from",
2846 &from,
2847 "--to",
2848 &to,
2849 "--dry-run",
2850 ])
2851 .unwrap();
2852 run(cli, &cfg).await.unwrap();
2853 }
2854
2855 #[cfg(feature = "sal")]
2856 #[tokio::test]
2857 async fn test_run_dispatch_migrate_json_output() {
2858 let _g = no_config_env();
2860 let src_env = TestEnv::fresh();
2861 let dst_env = TestEnv::fresh();
2862 crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2863 let from = format!("sqlite://{}", src_env.db_path.display());
2864 let to = format!("sqlite://{}", dst_env.db_path.display());
2865 let cfg = AppConfig::default();
2866 let cli = Cli::try_parse_from([
2867 "ai-memory",
2868 "--db",
2869 src_env.db_path.to_str().unwrap(),
2870 "migrate",
2871 "--from",
2872 &from,
2873 "--to",
2874 &to,
2875 "--json",
2876 ])
2877 .unwrap();
2878 run(cli, &cfg).await.unwrap();
2879 }
2880
2881 #[tokio::test]
2884 async fn test_run_with_db_passphrase_file_exports_env() {
2885 let _g = env_var_lock();
2889 unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2891 let env = TestEnv::fresh();
2892 let pass_path = env.db_path.with_file_name("pass");
2893 std::fs::write(&pass_path, "test-passphrase\n").unwrap();
2894 let cfg = AppConfig::default();
2895 let cli = Cli::try_parse_from([
2896 "ai-memory",
2897 "--db",
2898 env.db_path.to_str().unwrap(),
2899 "--db-passphrase-file",
2900 pass_path.to_str().unwrap(),
2901 "stats",
2902 ])
2903 .unwrap();
2904 run(cli, &cfg).await.unwrap();
2905 assert_eq!(
2907 std::env::var("AI_MEMORY_DB_PASSPHRASE").unwrap(),
2908 "test-passphrase"
2909 );
2910 unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2912 }
2913
2914 #[test]
2917 fn test_init_tracing_is_idempotent() {
2918 init_tracing();
2923 init_tracing();
2924 }
2925
2926 #[tokio::test]
2934 async fn test_serve_http_with_shutdown_future_serves_then_stops() {
2935 let env = TestEnv::fresh();
2936 let app_state = keyword_app_state(&env.db_path);
2937 let api_key_state = ApiKeyState { key: None };
2938 let port = {
2940 let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2941 let p = l.local_addr().unwrap().port();
2942 drop(l);
2943 p
2944 };
2945 let addr = format!("127.0.0.1:{port}");
2946 let shutdown = Arc::new(Notify::new());
2947 let shutdown_clone = shutdown.clone();
2948 let handle = tokio::spawn(async move {
2949 serve_http_with_shutdown_future(&addr, api_key_state, app_state, async move {
2950 shutdown_clone.notified().await;
2951 })
2952 .await
2953 });
2954 for _ in 0..40 {
2956 if let Ok(client) = reqwest::Client::builder()
2957 .timeout(Duration::from_millis(200))
2958 .build()
2959 && client
2960 .get(format!("http://127.0.0.1:{port}/api/v1/health"))
2961 .send()
2962 .await
2963 .is_ok()
2964 {
2965 break;
2966 }
2967 tokio::time::sleep(Duration::from_millis(50)).await;
2968 }
2969 shutdown.notify_one();
2970 let res = handle.await.unwrap();
2971 assert!(res.is_ok(), "serve future returned: {res:?}");
2972 }
2973
2974 #[tokio::test]
2977 async fn test_serve_http_with_shutdown_future_bind_failure_errors() {
2978 let env = TestEnv::fresh();
2982 let app_state = keyword_app_state(&env.db_path);
2983 let api_key_state = ApiKeyState { key: None };
2984 let res = serve_http_with_shutdown_future(
2989 "definitely-not-an-address:99999",
2990 api_key_state,
2991 app_state,
2992 async {},
2993 )
2994 .await;
2995 assert!(res.is_err(), "expected bind error, got: {res:?}");
2996 }
2997}