1use std::io::Write as _;
37use std::path::Path;
38use std::path::PathBuf;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::time::{Duration, Instant};
42
43use anyhow::{Context, Result};
44use axum::Router;
45use clap::{Args, CommandFactory, Parser, Subcommand};
46use clap_complete::{Shell, generate};
47use rusqlite::Connection;
48use tokio::sync::{Mutex, Notify};
49use tokio::task::JoinHandle;
50use tracing_subscriber::EnvFilter;
51
52use crate::cli::agents::{AgentsArgs, PendingArgs};
53use crate::cli::archive::ArchiveArgs;
54use crate::cli::backup::{BackupArgs, RestoreArgs};
55use crate::cli::consolidate::{AutoConsolidateArgs, ConsolidateArgs};
56use crate::cli::crud::{DeleteArgs, GetArgs, ListArgs};
57use crate::cli::curator::CuratorArgs;
58use crate::cli::forget::ForgetArgs;
59use crate::cli::io::{ImportArgs, MineArgs};
60use crate::cli::link::{LinkArgs, ResolveArgs};
61use crate::cli::promote::PromoteArgs;
62use crate::cli::recall::RecallArgs;
63use crate::cli::search::SearchArgs;
64use crate::cli::store::StoreArgs;
65use crate::cli::sync::{SyncArgs, SyncDaemonArgs};
66use crate::cli::update::UpdateArgs;
67use crate::config::{AppConfig, FeatureTier};
68use crate::embeddings::Embedder;
69use crate::handlers::{ApiKeyState, AppState, Db};
70use crate::hnsw::VectorIndex;
71use crate::{bench, cli, db, embeddings, federation, hnsw, llm, mcp, tls};
72
73#[cfg(feature = "sal")]
74use crate::migrate;
75
/// Database file name used as the default value of the global `--db` flag.
const DEFAULT_DB: &str = "ai-memory.db";
/// Port used as the default value of `serve --port`.
const DEFAULT_PORT: u16 = 9077;
/// Seconds between passes of the background GC loop (30 minutes).
const GC_INTERVAL_SECS: u64 = 30 * 60;
/// Seconds between passes of the background WAL checkpoint loop (10 minutes).
const WAL_CHECKPOINT_INTERVAL_SECS: u64 = 10 * 60;
82
// Top-level command line of the `ai-memory` binary: one subcommand plus a set
// of flags marked `global = true`.
//
// NOTE: plain `//` comments are used on this type on purpose. With clap's
// derive macros, `///` doc comments are turned into help text, so adding them
// here would change the `--help` output.
#[derive(Parser)]
#[command(
    name = "ai-memory",
    version,
    about = "AI-agnostic persistent memory — MCP server, HTTP API, and CLI for any AI platform"
)]
pub struct Cli {
    // Subcommand to execute; `run` dispatches on it.
    #[command(subcommand)]
    pub command: Command,
    // Database path; may also come from `AI_MEMORY_DB`. `run` passes it through
    // `AppConfig::effective_db` before use.
    #[arg(long, env = "AI_MEMORY_DB", default_value = DEFAULT_DB, global = true)]
    pub db: PathBuf,
    // Forwarded by `run` to most subcommand handlers as their JSON-output flag.
    #[arg(long, global = true, default_value_t = false)]
    pub json: bool,
    // Optional agent identifier; may also come from `AI_MEMORY_AGENT_ID`.
    // Forwarded by `run` to the handlers that accept one.
    #[arg(long, env = "AI_MEMORY_AGENT_ID", global = true)]
    pub agent_id: Option<String>,
    // File holding the database passphrase. When set, `run` reads it with
    // `passphrase_from_file` and exports it as `AI_MEMORY_DB_PASSPHRASE`.
    #[arg(long, global = true, value_name = "PATH")]
    pub db_passphrase_file: Option<PathBuf>,
}
121
// Subcommands of the `ai-memory` binary. `run` contains the dispatch table
// that maps each variant to its handler; `is_write_command` lists the variants
// that trigger a WAL checkpoint after a successful run.
//
// NOTE: plain `//` comments are used on purpose — with clap's derive macros,
// `///` doc comments on variants become subcommand help text.
#[derive(Subcommand)]
pub enum Command {
    // Handled by `serve` (HTTP or HTTPS server).
    Serve(ServeArgs),
    // Handled by `mcp::run_mcp_server`; `tier` is resolved through
    // `AppConfig::effective_tier` first.
    Mcp {
        #[arg(long, default_value = "semantic")]
        tier: String,
    },
    // The variants below are handled by functions in the `cli::*` modules.
    Store(StoreArgs),
    Update(UpdateArgs),
    Recall(RecallArgs),
    Search(SearchArgs),
    Get(GetArgs),
    List(ListArgs),
    Delete(DeleteArgs),
    Promote(PromoteArgs),
    Forget(ForgetArgs),
    Link(LinkArgs),
    Consolidate(ConsolidateArgs),
    Gc,
    Stats,
    Namespaces,
    Export,
    Import(ImportArgs),
    Resolve(ResolveArgs),
    Shell,
    Sync(SyncArgs),
    SyncDaemon(SyncDaemonArgs),
    AutoConsolidate(AutoConsolidateArgs),
    // Writes a shell completion script to stdout via `clap_complete::generate`.
    Completions(CompletionsArgs),
    // Renders a man page to stdout via `clap_mangen`.
    Man,
    Mine(MineArgs),
    Archive(ArchiveArgs),
    Agents(AgentsArgs),
    Pending(PendingArgs),
    Backup(BackupArgs),
    Restore(RestoreArgs),
    Curator(CuratorArgs),
    // Handled by `cmd_bench`.
    Bench(BenchArgs),
    // Handled by `cmd_migrate`; only compiled with the `sal` feature.
    #[cfg(feature = "sal")]
    Migrate(MigrateArgs),
}
216
// Arguments of the `bench` subcommand (consumed by `cmd_bench`).
// `//` comments are used instead of `///` so clap's generated help is unchanged.
#[derive(Args)]
pub struct BenchArgs {
    // Measured iterations; `cmd_bench` clamps this to 1..=100_000.
    #[arg(long, default_value_t = bench::DEFAULT_ITERATIONS)]
    pub iterations: usize,
    // Warm-up iterations; `cmd_bench` caps this at 10_000.
    #[arg(long, default_value_t = bench::DEFAULT_WARMUP)]
    pub warmup: usize,
    // Print results as pretty-printed JSON instead of tables.
    #[arg(long)]
    pub json: bool,
    // Optional baseline file; when given, results are compared against it.
    #[arg(long, value_name = "PATH")]
    pub baseline: Option<String>,
    // Regression threshold in percent; `cmd_bench` clamps it to 0.0..=1000.0.
    #[arg(long, default_value_t = bench::DEFAULT_REGRESSION_THRESHOLD_PCT)]
    pub regression_threshold: f64,
    // Optional history file; when given, this run is appended to it.
    #[arg(long, value_name = "PATH")]
    pub history: Option<PathBuf>,
}
251
// Arguments of the `migrate` subcommand (consumed by `cmd_migrate`); only
// compiled with the `sal` feature.
// `//` comments are used instead of `///` so clap's generated help is unchanged.
#[cfg(feature = "sal")]
#[derive(Args)]
pub struct MigrateArgs {
    // Source store locator, passed to `migrate::open_store`.
    #[arg(long)]
    pub from: String,
    // Destination store locator, passed to `migrate::open_store`.
    #[arg(long)]
    pub to: String,
    // Batch size handed to `migrate::migrate`.
    #[arg(long, default_value_t = 1000)]
    pub batch: usize,
    // Optional namespace handed to `migrate::migrate`.
    // NOTE(review): presumably restricts the migration to that namespace — confirm.
    #[arg(long)]
    pub namespace: Option<String>,
    // Dry-run flag handed to `migrate::migrate`.
    #[arg(long)]
    pub dry_run: bool,
    // Print the migration report as JSON instead of plain text.
    #[arg(long)]
    pub json: bool,
}
275
// Arguments of the `serve` subcommand (consumed by `serve` and
// `bootstrap_serve`).
// `//` comments are used instead of `///` so clap's generated help is unchanged.
#[derive(Args)]
pub struct ServeArgs {
    // Host part of the listen address (`{host}:{port}`).
    #[arg(long, default_value = "127.0.0.1")]
    pub host: String,
    // Port part of the listen address.
    #[arg(long, default_value_t = DEFAULT_PORT)]
    pub port: u16,
    // TLS certificate; clap requires `--tls-key` alongside it. HTTPS is used
    // only when both the certificate and the key are present.
    #[arg(long, requires = "tls_key")]
    pub tls_cert: Option<PathBuf>,
    // TLS private key; clap requires `--tls-cert` alongside it.
    #[arg(long, requires = "tls_cert")]
    pub tls_key: Option<PathBuf>,
    // mTLS allowlist file; clap requires `--tls-cert` alongside it. When set,
    // `serve` loads the TLS config through `tls::load_mtls_rustls_config`.
    #[arg(long, requires = "tls_cert")]
    pub mtls_allowlist: Option<PathBuf>,
    // Graceful-shutdown grace period in seconds; in the code visible here it is
    // read only on the HTTPS path of `serve`.
    #[arg(long, default_value_t = 30)]
    pub shutdown_grace_secs: u64,

    // The `quorum_*` fields are passed to `federation::FederationConfig::build`.
    #[arg(long, default_value_t = 0)]
    pub quorum_writes: usize,
    // Comma-separated peer list.
    #[arg(long, value_delimiter = ',')]
    pub quorum_peers: Vec<String>,
    // Converted to a `Duration` in milliseconds before being passed on.
    #[arg(long, default_value_t = 2000)]
    pub quorum_timeout_ms: u64,
    #[arg(long)]
    pub quorum_client_cert: Option<PathBuf>,
    #[arg(long)]
    pub quorum_client_key: Option<PathBuf>,
    #[arg(long)]
    pub quorum_ca_cert: Option<PathBuf>,
    // Catch-up polling interval in seconds; `0` disables the catch-up loop.
    // Only consulted when federation is enabled.
    #[arg(long, default_value_t = 30)]
    pub catchup_interval_secs: u64,
}
348
// Arguments of the `completions` subcommand.
// `//` comments are used instead of `///` so clap's generated help is unchanged.
#[derive(Args)]
pub struct CompletionsArgs {
    // Positional: target shell, passed to `clap_complete::generate` by `run`.
    pub shell: Shell,
}
353
354#[allow(clippy::too_many_lines)]
365pub async fn run(cli: Cli, app_config: &AppConfig) -> Result<()> {
366 if let Some(path) = &cli.db_passphrase_file {
372 let passphrase = passphrase_from_file(path)?;
373 unsafe { std::env::set_var("AI_MEMORY_DB_PASSPHRASE", passphrase) };
375 }
376 let db_path = app_config.effective_db(&cli.db);
377 let j = cli.json;
378 let cli_agent_id: Option<String> = cli.agent_id.clone();
379 let needs_checkpoint = is_write_command(&cli.command);
381 let db_path_for_checkpoint = if needs_checkpoint {
382 Some(db_path.clone())
383 } else {
384 None
385 };
386
387 let result = match cli.command {
388 Command::Serve(a) => serve(db_path, a, app_config).await,
389 Command::Mcp { tier } => {
390 let feature_tier = app_config.effective_tier(Some(&tier));
391 mcp::run_mcp_server(&db_path, feature_tier, app_config)?;
392 Ok(())
393 }
394 Command::Store(a) => {
395 let stdout = std::io::stdout();
396 let stderr = std::io::stderr();
397 let mut so = stdout.lock();
398 let mut se = stderr.lock();
399 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
400 cli::store::run(
401 &db_path,
402 a,
403 j,
404 app_config,
405 cli_agent_id.as_deref(),
406 &mut out,
407 )
408 }
409 Command::Update(a) => {
410 let stdout = std::io::stdout();
411 let stderr = std::io::stderr();
412 let mut so = stdout.lock();
413 let mut se = stderr.lock();
414 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
415 cli::update::run(&db_path, &a, j, &mut out)
416 }
417 Command::Recall(a) => {
418 let stdout = std::io::stdout();
419 let stderr = std::io::stderr();
420 let mut so = stdout.lock();
421 let mut se = stderr.lock();
422 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
423 cli::recall::run(&db_path, &a, j, app_config, &mut out)
424 }
425 Command::Search(a) => {
426 let stdout = std::io::stdout();
427 let stderr = std::io::stderr();
428 let mut so = stdout.lock();
429 let mut se = stderr.lock();
430 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
431 cli::search::run(&db_path, &a, j, &mut out)
432 }
433 Command::Get(a) => {
434 let stdout = std::io::stdout();
435 let stderr = std::io::stderr();
436 let mut so = stdout.lock();
437 let mut se = stderr.lock();
438 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
439 cli::crud::cmd_get(&db_path, &a, j, &mut out)
440 }
441 Command::List(a) => {
442 let stdout = std::io::stdout();
443 let stderr = std::io::stderr();
444 let mut so = stdout.lock();
445 let mut se = stderr.lock();
446 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
447 cli::crud::cmd_list(&db_path, &a, j, app_config, &mut out)
448 }
449 Command::Delete(a) => {
450 let stdout = std::io::stdout();
451 let stderr = std::io::stderr();
452 let mut so = stdout.lock();
453 let mut se = stderr.lock();
454 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
455 cli::crud::cmd_delete(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
456 }
457 Command::Promote(a) => {
458 let stdout = std::io::stdout();
459 let stderr = std::io::stderr();
460 let mut so = stdout.lock();
461 let mut se = stderr.lock();
462 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
463 cli::promote::cmd_promote(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
464 }
465 Command::Forget(a) => {
466 let stdout = std::io::stdout();
467 let stderr = std::io::stderr();
468 let mut so = stdout.lock();
469 let mut se = stderr.lock();
470 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
471 cli::forget::cmd_forget(&db_path, &a, j, &mut out)
472 }
473 Command::Link(a) => {
474 let stdout = std::io::stdout();
475 let stderr = std::io::stderr();
476 let mut so = stdout.lock();
477 let mut se = stderr.lock();
478 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
479 cli::link::cmd_link(&db_path, &a, j, &mut out)
480 }
481 Command::Consolidate(a) => {
482 let stdout = std::io::stdout();
483 let stderr = std::io::stderr();
484 let mut so = stdout.lock();
485 let mut se = stderr.lock();
486 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
487 cli::consolidate::run(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
488 }
489 Command::Resolve(a) => {
490 let stdout = std::io::stdout();
491 let stderr = std::io::stderr();
492 let mut so = stdout.lock();
493 let mut se = stderr.lock();
494 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
495 cli::link::cmd_resolve(&db_path, &a, j, &mut out)
496 }
497 Command::Shell => cli::shell::run(&db_path),
498 Command::Sync(a) => {
499 let stdout = std::io::stdout();
500 let stderr = std::io::stderr();
501 let mut so = stdout.lock();
502 let mut se = stderr.lock();
503 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
504 cli::sync::run(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
505 }
506 Command::SyncDaemon(a) => cli::sync::run_daemon(&db_path, a, cli_agent_id.as_deref()).await,
507 Command::AutoConsolidate(a) => {
508 let stdout = std::io::stdout();
509 let stderr = std::io::stderr();
510 let mut so = stdout.lock();
511 let mut se = stderr.lock();
512 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
513 cli::consolidate::run_auto(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
514 }
515 Command::Gc => {
516 let stdout = std::io::stdout();
517 let stderr = std::io::stderr();
518 let mut so = stdout.lock();
519 let mut se = stderr.lock();
520 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
521 cli::gc::run_gc(&db_path, j, app_config, &mut out)
522 }
523 Command::Stats => {
524 let stdout = std::io::stdout();
525 let stderr = std::io::stderr();
526 let mut so = stdout.lock();
527 let mut se = stderr.lock();
528 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
529 cli::gc::run_stats(&db_path, j, &mut out)
530 }
531 Command::Namespaces => {
532 let stdout = std::io::stdout();
533 let stderr = std::io::stderr();
534 let mut so = stdout.lock();
535 let mut se = stderr.lock();
536 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
537 cli::gc::run_namespaces(&db_path, j, &mut out)
538 }
539 Command::Export => {
540 let stdout = std::io::stdout();
541 let stderr = std::io::stderr();
542 let mut so = stdout.lock();
543 let mut se = stderr.lock();
544 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
545 cli::io::export(&db_path, &mut out)
546 }
547 Command::Import(a) => {
548 let stdout = std::io::stdout();
549 let stderr = std::io::stderr();
550 let mut so = stdout.lock();
551 let mut se = stderr.lock();
552 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
553 cli::io::import(&db_path, &a, j, cli_agent_id.as_deref(), &mut out)
554 }
555 Command::Completions(a) => {
556 generate(
557 a.shell,
558 &mut Cli::command(),
559 "ai-memory",
560 &mut std::io::stdout(),
561 );
562 Ok(())
563 }
564 Command::Man => {
565 let cmd = Cli::command();
566 let man = clap_mangen::Man::new(cmd);
567 man.render(&mut std::io::stdout())?;
568 Ok(())
569 }
570 Command::Mine(a) => {
571 let stdout = std::io::stdout();
572 let stderr = std::io::stderr();
573 let mut so = stdout.lock();
574 let mut se = stderr.lock();
575 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
576 cli::io::mine(
577 &db_path,
578 a,
579 j,
580 app_config,
581 cli_agent_id.as_deref(),
582 &mut out,
583 )
584 }
585 Command::Archive(a) => {
586 let stdout = std::io::stdout();
587 let stderr = std::io::stderr();
588 let mut so = stdout.lock();
589 let mut se = stderr.lock();
590 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
591 cli::archive::run(&db_path, a, j, &mut out)
592 }
593 Command::Agents(a) => {
594 let stdout = std::io::stdout();
595 let stderr = std::io::stderr();
596 let mut so = stdout.lock();
597 let mut se = stderr.lock();
598 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
599 cli::agents::run_agents(&db_path, a, j, &mut out)
600 }
601 Command::Pending(a) => {
602 let stdout = std::io::stdout();
603 let stderr = std::io::stderr();
604 let mut so = stdout.lock();
605 let mut se = stderr.lock();
606 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
607 cli::agents::run_pending(&db_path, a, j, cli_agent_id.as_deref(), &mut out)
608 }
609 Command::Backup(a) => {
610 let stdout = std::io::stdout();
611 let stderr = std::io::stderr();
612 let mut so = stdout.lock();
613 let mut se = stderr.lock();
614 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
615 cli::backup::run_backup(&db_path, &a, j, &mut out)
616 }
617 Command::Restore(a) => {
618 let stdout = std::io::stdout();
619 let stderr = std::io::stderr();
620 let mut so = stdout.lock();
621 let mut se = stderr.lock();
622 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
623 cli::backup::run_restore(&db_path, &a, j, &mut out)
624 }
625 Command::Curator(a) => {
626 let stdout = std::io::stdout();
627 let stderr = std::io::stderr();
628 let mut so = stdout.lock();
629 let mut se = stderr.lock();
630 let mut out = cli::CliOutput::from_std(&mut so, &mut se);
631 cli::curator::run(&db_path, &a, app_config, &mut out).await
632 }
633 Command::Bench(a) => cmd_bench(&a),
634 #[cfg(feature = "sal")]
635 Command::Migrate(a) => cmd_migrate(&a).await,
636 };
637
638 if result.is_ok()
640 && let Some(cp_path) = db_path_for_checkpoint
641 && let Ok(conn) = db::open(&cp_path)
642 {
643 let _ = db::checkpoint(&conn);
644 }
645
646 result
647}
648
649#[must_use]
656pub fn is_write_command(cmd: &Command) -> bool {
657 matches!(
658 cmd,
659 Command::Store(_)
660 | Command::Update(_)
661 | Command::Delete(_)
662 | Command::Promote(_)
663 | Command::Forget(_)
664 | Command::Link(_)
665 | Command::Consolidate(_)
666 | Command::Resolve(_)
667 | Command::Sync(_)
668 | Command::SyncDaemon(_)
669 | Command::Import(_)
670 | Command::AutoConsolidate(_)
671 | Command::Gc
672 )
673}
674
675pub fn passphrase_from_file(path: &Path) -> Result<String> {
688 let raw = std::fs::read_to_string(path)
689 .with_context(|| format!("reading passphrase file {}", path.display()))?;
690 let passphrase = raw.trim_end_matches(['\n', '\r']).to_string();
691 if passphrase.is_empty() {
692 anyhow::bail!("passphrase file {} is empty", path.display());
693 }
694 Ok(passphrase)
695}
696
697pub fn apply_anonymize_default(app_config: &AppConfig) {
706 if app_config.effective_anonymize_default() && std::env::var("AI_MEMORY_ANONYMIZE").is_err() {
709 unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "1") };
711 }
712}
713
714pub async fn build_embedder(feature_tier: FeatureTier, app_config: &AppConfig) -> Option<Embedder> {
728 let tier_config = feature_tier.config();
729 let Some(emb_model) = tier_config.embedding_model else {
730 tracing::info!(
731 "embedder disabled — tier={} keyword-only (FTS5); semantic recall not wired",
732 feature_tier.as_str()
733 );
734 return None;
735 };
736 let embed_url = app_config.effective_embed_url().to_string();
737 let build = match tokio::task::spawn_blocking(move || {
743 let embed_client = llm::OllamaClient::new_with_url(&embed_url, "nomic-embed-text")
744 .ok()
745 .map(Arc::new);
746 embeddings::Embedder::for_model(emb_model, embed_client)
747 })
748 .await
749 {
750 Ok(b) => b,
751 Err(e) => {
752 tracing::error!("embedder spawn_blocking join failed: {e}");
753 return None;
754 }
755 };
756 match build {
757 Ok(emb) => {
758 tracing::info!(
759 "embedder loaded ({}) — tier={} semantic recall enabled",
760 emb.model_description(),
761 feature_tier.as_str()
762 );
763 Some(emb)
764 }
765 Err(e) => {
766 tracing::error!(
774 "EMBEDDER LOAD FAILED — tier={} requested semantic features, \
775 but embedder init errored: {e}. Daemon falls back to keyword-only. \
776 Semantic recall, sync_push embedding refresh (#322), and HNSW index \
777 will be NO-OPS. Check network egress to HuggingFace Hub + available \
778 memory for model weights. To force keyword-only explicitly (silences \
779 this error), set `tier = \"keyword\"` in config.toml.",
780 feature_tier.as_str()
781 );
782 None
783 }
784 }
785}
786
787#[must_use]
793pub fn build_vector_index(conn: &Connection, embedder_present: bool) -> Option<VectorIndex> {
794 if !embedder_present {
795 return None;
796 }
797 match db::get_all_embeddings(conn) {
798 Ok(entries) if !entries.is_empty() => Some(hnsw::VectorIndex::build(entries)),
799 _ => Some(hnsw::VectorIndex::empty()),
800 }
801}
802
803#[must_use]
812pub fn spawn_gc_loop(
813 state: Db,
814 archive_max_days: Option<i64>,
815 interval: Duration,
816) -> JoinHandle<()> {
817 tokio::spawn(async move {
818 loop {
819 tokio::time::sleep(interval).await;
820 let lock = state.lock().await;
821 match db::gc(&lock.0, lock.3) {
822 Ok(n) if n > 0 => tracing::info!("gc: expired {n} memories"),
823 _ => {}
824 }
825 match db::auto_purge_archive(&lock.0, archive_max_days) {
827 Ok(n) if n > 0 => tracing::info!("gc: purged {n} old archived memories"),
828 _ => {}
829 }
830 }
831 })
832}
833
834#[must_use]
838pub fn spawn_wal_checkpoint_loop(state: Db, interval: Duration) -> JoinHandle<()> {
839 let half = interval / 2;
840 tokio::spawn(async move {
841 tokio::time::sleep(half).await;
844 loop {
845 {
846 let lock = state.lock().await;
847 match db::checkpoint(&lock.0) {
848 Ok(()) => tracing::debug!("wal checkpoint: ok"),
849 Err(e) => tracing::warn!("wal checkpoint failed: {e}"),
850 }
851 }
852 tokio::time::sleep(interval).await;
853 }
854 })
855}
856
/// Builds the HTTP router by delegating to `crate::build_router`.
///
/// Note the argument order: this wrapper takes `(app_state, api_key_state)`,
/// while the crate-level function takes `(api_key_state, app_state)`.
#[must_use]
pub fn build_router(app_state: AppState, api_key_state: ApiKeyState) -> Router {
    crate::build_router(api_key_state, app_state)
}
871
/// Everything `bootstrap_serve` prepares before a listener is bound.
pub struct ServeBootstrap {
    /// Shared state handed to the router.
    pub app_state: AppState,
    /// API-key state handed to the router; its key is copied from the
    /// application config.
    pub api_key_state: ApiKeyState,
    /// Handle to the shared database tuple (also reachable through
    /// `app_state.db`).
    pub db_state: Db,
    /// Copy of `AppConfig::archive_max_days`.
    pub archive_max_days: Option<i64>,
    /// Handles of the background tasks spawned during bootstrap (the GC loop
    /// and the WAL checkpoint loop).
    pub task_handles: Vec<JoinHandle<()>>,
}
884
885pub async fn bootstrap_serve(
888 db_path: &Path,
889 args: &ServeArgs,
890 app_config: &AppConfig,
891) -> Result<ServeBootstrap> {
892 let resolved_ttl = app_config.effective_ttl();
893 let archive_on_gc = app_config.effective_archive_on_gc();
894 let conn = db::open(db_path)?;
895
896 let feature_tier = app_config.effective_tier(None);
903 let tier_config = feature_tier.config();
904 let embedder = build_embedder(feature_tier, app_config).await;
905 let vector_index = build_vector_index(&conn, embedder.is_some());
906
907 let db_state: Db = Arc::new(Mutex::new((
908 conn,
909 db_path.to_path_buf(),
910 resolved_ttl,
911 archive_on_gc,
912 )));
913
914 let federation = federation::FederationConfig::build(
918 args.quorum_writes,
919 &args.quorum_peers,
920 std::time::Duration::from_millis(args.quorum_timeout_ms),
921 args.quorum_client_cert.as_deref(),
922 args.quorum_client_key.as_deref(),
923 args.quorum_ca_cert.as_deref(),
924 format!("host:{}", gethostname::gethostname().to_string_lossy()),
925 )
926 .context("federation config")?;
927
928 let mut task_handles: Vec<JoinHandle<()>> = Vec::new();
929
930 if let Some(ref fed) = federation {
931 tracing::info!(
932 "federation enabled: W={} over {} peer(s), timeout {}ms",
933 fed.policy.w,
934 fed.peer_count(),
935 args.quorum_timeout_ms,
936 );
937 if args.catchup_interval_secs > 0 {
940 let interval = std::time::Duration::from_secs(args.catchup_interval_secs);
941 tracing::info!(
942 "catchup loop enabled: polling {} peer(s) every {}s",
943 fed.peer_count(),
944 args.catchup_interval_secs,
945 );
946 federation::spawn_catchup_loop(fed.clone(), db_state.clone(), interval);
947 } else {
948 tracing::info!("catchup loop disabled (--catchup-interval-secs=0)");
949 }
950 }
951
952 let app_state = AppState {
953 db: db_state.clone(),
954 embedder: Arc::new(embedder),
955 vector_index: Arc::new(Mutex::new(vector_index)),
956 federation: Arc::new(federation),
957 tier_config: Arc::new(tier_config),
958 scoring: Arc::new(app_config.effective_scoring()),
959 };
960
961 task_handles.push(spawn_gc_loop(
963 db_state.clone(),
964 app_config.archive_max_days,
965 Duration::from_secs(GC_INTERVAL_SECS),
966 ));
967
968 task_handles.push(spawn_wal_checkpoint_loop(
977 db_state.clone(),
978 Duration::from_secs(WAL_CHECKPOINT_INTERVAL_SECS),
979 ));
980
981 let api_key_state = ApiKeyState {
982 key: app_config.api_key.clone(),
983 };
984 if api_key_state.key.is_some() {
985 tracing::info!("API key authentication enabled");
986 }
987
988 Ok(ServeBootstrap {
989 app_state,
990 api_key_state,
991 db_state,
992 archive_max_days: app_config.archive_max_days,
993 task_handles,
994 })
995}
996
997fn init_tracing() {
1001 let _ = tracing_subscriber::fmt()
1002 .with_env_filter(
1003 EnvFilter::from_default_env()
1004 .add_directive("ai_memory=info".parse().unwrap())
1005 .add_directive("tower_http=info".parse().unwrap()),
1006 )
1007 .try_init();
1008}
1009
1010#[allow(clippy::too_many_lines)]
1016pub async fn serve(db_path: PathBuf, args: ServeArgs, app_config: &AppConfig) -> Result<()> {
1017 init_tracing();
1018
1019 let bootstrap = bootstrap_serve(&db_path, &args, app_config).await?;
1020
1021 let addr = format!("{}:{}", args.host, args.port);
1022 tracing::info!("database: {}", db_path.display());
1023
1024 let shutdown_state = bootstrap.db_state.clone();
1026 let shutdown = async move {
1027 let _ = tokio::signal::ctrl_c().await;
1028 tracing::info!("shutting down — checkpointing WAL");
1029 let lock = shutdown_state.lock().await;
1030 let _ = db::checkpoint(&lock.0);
1031 };
1032
1033 if let (Some(cert), Some(key)) = (&args.tls_cert, &args.tls_key) {
1038 let _ = rustls::crypto::ring::default_provider().install_default();
1042 let tls_config = if let Some(allowlist_path) = &args.mtls_allowlist {
1046 tracing::info!(
1047 "mTLS enabled — client certs required. Allowlist: {}",
1048 allowlist_path.display()
1049 );
1050 tls::load_mtls_rustls_config(cert, key, allowlist_path).await?
1051 } else {
1052 tracing::warn!(
1053 "TLS enabled but mTLS NOT configured — sync endpoints \
1054 (/api/v1/sync/push, /api/v1/sync/since) accept any client. \
1055 Set --mtls-allowlist for production peer-mesh deployments \
1056 (red-team #231)."
1057 );
1058 tls::load_rustls_config(cert, key).await?
1059 };
1060 let app = build_router(bootstrap.app_state, bootstrap.api_key_state);
1061 tracing::info!("ai-memory listening on https://{addr}");
1062 let socket_addr: std::net::SocketAddr = addr.parse()?;
1063 let grace = std::time::Duration::from_secs(args.shutdown_grace_secs);
1069 let handle = axum_server::Handle::new();
1070 let handle_clone = handle.clone();
1071 tokio::spawn(async move {
1072 shutdown.await;
1073 handle_clone.graceful_shutdown(Some(grace));
1074 });
1075 axum_server::bind_rustls(socket_addr, tls_config)
1076 .handle(handle)
1077 .serve(app.into_make_service())
1078 .await?;
1079 } else {
1080 tracing::warn!(
1081 "TLS NOT enabled — sync endpoints (/api/v1/sync/push, \
1082 /api/v1/sync/since) accept any caller over plain HTTP. \
1083 Set --tls-cert + --tls-key + --mtls-allowlist for production \
1084 peer-mesh deployments (red-team #231)."
1085 );
1086 tracing::info!("ai-memory listening on http://{addr}");
1087 serve_http_with_shutdown_future(
1094 &addr,
1095 bootstrap.api_key_state,
1096 bootstrap.app_state,
1097 shutdown,
1098 )
1099 .await?;
1100 }
1101 Ok(())
1102}
1103
1104fn cmd_bench(args: &BenchArgs) -> Result<()> {
1109 let iterations = args.iterations.clamp(1, 100_000);
1110 let warmup = args.warmup.min(10_000);
1111 let regression_threshold = args.regression_threshold.clamp(0.0, 1000.0);
1112 let conn = db::open(Path::new(":memory:"))?;
1116 let config = bench::BenchConfig {
1117 iterations,
1118 warmup,
1119 namespace: bench::BENCH_NAMESPACE.to_string(),
1120 };
1121 let results = bench::run(&conn, &config)?;
1122
1123 let regressions = if let Some(path) = &args.baseline {
1124 let baseline = bench::load_baseline(Path::new(path))?;
1125 Some(bench::compare_against_baseline(
1126 &results,
1127 &baseline,
1128 regression_threshold,
1129 ))
1130 } else {
1131 None
1132 };
1133
1134 if args.json {
1135 println!(
1136 "{}",
1137 serde_json::to_string_pretty(&serde_json::json!({
1138 "iterations": iterations,
1139 "warmup": warmup,
1140 "results": results,
1141 "regressions": regressions,
1142 }))?
1143 );
1144 } else {
1145 print!("{}", bench::render_table(&results));
1146 if let Some(rows) = ®ressions {
1147 println!();
1148 print!("{}", bench::render_regression_table(rows));
1149 }
1150 }
1151
1152 if let Some(history_path) = &args.history {
1153 let captured_at = chrono::Utc::now().to_rfc3339();
1154 bench::append_history(history_path, &captured_at, iterations, warmup, &results)?;
1155 let mut stderr = std::io::stderr().lock();
1156 let _ = writeln!(
1157 stderr,
1158 "bench: appended run to history file {}",
1159 history_path.display()
1160 );
1161 }
1162
1163 let budget_failed = results
1164 .iter()
1165 .any(|r| matches!(r.status, bench::Status::Fail));
1166 let regression_failed = regressions
1167 .as_ref()
1168 .is_some_and(|rows| rows.iter().any(|r| r.regressed));
1169
1170 if budget_failed && regression_failed {
1171 anyhow::bail!(
1172 "bench: at least one operation exceeded its p95 budget by >10% AND regressed >{regression_threshold:.1}% vs baseline"
1173 );
1174 }
1175 if budget_failed {
1176 anyhow::bail!("bench: at least one operation exceeded its p95 budget by >10%");
1177 }
1178 if regression_failed {
1179 anyhow::bail!(
1180 "bench: at least one operation regressed >{regression_threshold:.1}% vs baseline"
1181 );
1182 }
1183 Ok(())
1184}
1185
/// Migrates memories from the `--from` store to the `--to` store and prints a
/// report, either as pretty-printed JSON or as plain text.
///
/// # Errors
///
/// Fails when either store cannot be opened, when the JSON report cannot be
/// rendered, or — after the report has been printed — when the migration
/// recorded at least one error.
#[cfg(feature = "sal")]
async fn cmd_migrate(args: &MigrateArgs) -> Result<()> {
    let source = migrate::open_store(&args.from)
        .await
        .context("open source store")?;
    let destination = migrate::open_store(&args.to)
        .await
        .context("open destination store")?;

    let report = migrate::migrate(
        source.as_ref(),
        destination.as_ref(),
        args.batch,
        args.namespace.clone(),
        args.dry_run,
    )
    .await;
    let error_count = report.errors.len();

    if args.json {
        let value = serde_json::json!({
            "from_url": args.from,
            "to_url": args.to,
            "memories_read": report.memories_read,
            "memories_written": report.memories_written,
            "batches": report.batches,
            "errors": report.errors,
            "dry_run": report.dry_run,
        });
        println!("{}", serde_json::to_string_pretty(&value)?);
    } else {
        println!("migration report");
        println!(" from: {}", args.from);
        println!(" to: {}", args.to);
        println!(" memories_read: {}", report.memories_read);
        println!(" memories_written: {}", report.memories_written);
        println!(" batches: {}", report.batches);
        println!(" dry_run: {}", report.dry_run);
        println!(" errors: {}", error_count);
        for e in &report.errors {
            println!(" - {e}");
        }
    }

    // The report is always printed first; errors then turn into a failure.
    if error_count > 0 {
        anyhow::bail!("migration completed with {} error(s)", error_count);
    }
    Ok(())
}
1231
1232pub async fn serve_http_with_shutdown(
1247 addr: &str,
1248 api_key_state: ApiKeyState,
1249 app_state: AppState,
1250 shutdown: Arc<Notify>,
1251) -> Result<()> {
1252 serve_http_with_shutdown_future(addr, api_key_state, app_state, async move {
1253 shutdown.notified().await;
1254 })
1255 .await
1256}
1257
1258pub async fn serve_http_with_shutdown_future<F>(
1266 addr: &str,
1267 api_key_state: ApiKeyState,
1268 app_state: AppState,
1269 shutdown: F,
1270) -> Result<()>
1271where
1272 F: std::future::Future<Output = ()> + Send + 'static,
1273{
1274 let app = crate::build_router(api_key_state, app_state);
1275 let listener = tokio::net::TcpListener::bind(addr)
1276 .await
1277 .with_context(|| format!("bind {addr}"))?;
1278 axum::serve(listener, app)
1279 .with_graceful_shutdown(shutdown)
1280 .await
1281 .context("axum::serve")?;
1282 Ok(())
1283}
1284
1285pub async fn sync_cycle_once(
1292 client: &reqwest::Client,
1293 db_path: &Path,
1294 local_agent_id: &str,
1295 peer_url: &str,
1296 api_key: Option<&str>,
1297 batch_size: usize,
1298) -> Result<()> {
1299 let peer_url = peer_url.trim_end_matches('/');
1300
1301 let since = {
1303 let conn = db::open(db_path)?;
1304 db::sync_state_load(&conn, local_agent_id)?
1305 .entries
1306 .get(peer_url)
1307 .cloned()
1308 };
1309
1310 let mut pull_url = format!(
1311 "{peer_url}/api/v1/sync/since?limit={batch_size}&peer={}",
1312 urlencoding_minimal(local_agent_id)
1313 );
1314 if let Some(ref s) = since {
1315 pull_url.push_str("&since=");
1316 pull_url.push_str(&urlencoding_minimal(s));
1317 }
1318
1319 let mut req = client.get(&pull_url).header("x-agent-id", local_agent_id);
1320 if let Some(key) = api_key {
1321 req = req.header("x-api-key", key);
1322 }
1323 let resp = req.send().await?;
1324 if !resp.status().is_success() {
1325 anyhow::bail!("sync-daemon: pull status {}", resp.status());
1326 }
1327 let pulled: SyncSinceResponse = resp.json().await?;
1328 let pull_count = pulled.memories.len();
1329 let latest_pulled = pulled.memories.last().map(|m| m.updated_at.clone());
1330
1331 {
1332 let conn = db::open(db_path)?;
1333 for mem in &pulled.memories {
1334 if crate::validate::validate_memory(mem).is_ok() {
1335 let _ = db::insert_if_newer(&conn, mem);
1336 }
1337 }
1338 if let Some(ref at) = latest_pulled {
1339 db::sync_state_observe(&conn, local_agent_id, peer_url, at)?;
1340 }
1341 }
1342
1343 let last_pushed = {
1345 let conn = db::open(db_path)?;
1346 db::sync_state_last_pushed(&conn, local_agent_id, peer_url)
1347 };
1348 let outgoing = {
1349 let conn = db::open(db_path)?;
1350 db::memories_updated_since(&conn, last_pushed.as_deref(), batch_size)?
1351 };
1352 let push_count = outgoing.len();
1353 let latest_pushed = outgoing.last().map(|m| m.updated_at.clone());
1354
1355 if !outgoing.is_empty() {
1356 let body = serde_json::json!({
1357 "sender_agent_id": local_agent_id,
1358 "sender_clock": { "entries": {} },
1359 "memories": outgoing,
1360 "dry_run": false,
1361 });
1362 let mut req = client
1363 .post(format!("{peer_url}/api/v1/sync/push"))
1364 .header("x-agent-id", local_agent_id)
1365 .header("content-type", "application/json")
1366 .json(&body);
1367 if let Some(key) = api_key {
1368 req = req.header("x-api-key", key);
1369 }
1370 let resp = req.send().await?;
1371 if !resp.status().is_success() {
1372 anyhow::bail!("sync-daemon: push status {}", resp.status());
1373 }
1374 if let Some(at) = latest_pushed {
1375 let conn = db::open(db_path)?;
1376 db::sync_state_record_push(&conn, local_agent_id, peer_url, &at)?;
1377 }
1378 }
1379
1380 tracing::info!("sync-daemon: peer={peer_url} pulled={pull_count} pushed={push_count}");
1381 Ok(())
1382}
1383
1384pub async fn run_sync_daemon_with_shutdown(
1392 db_path: PathBuf,
1393 local_agent_id: String,
1394 peers: Vec<String>,
1395 api_key: Option<String>,
1396 interval_secs: u64,
1397 batch_size: usize,
1398 shutdown: Arc<Notify>,
1399) -> Result<()> {
1400 let client = reqwest::Client::builder()
1401 .timeout(Duration::from_secs(30))
1402 .build()?;
1403 run_sync_daemon_with_shutdown_using_client(
1404 client,
1405 db_path,
1406 local_agent_id,
1407 peers,
1408 api_key,
1409 interval_secs,
1410 batch_size,
1411 shutdown,
1412 )
1413 .await
1414}
1415
1416pub async fn run_sync_daemon_with_shutdown_using_client(
1423 client: reqwest::Client,
1424 db_path: PathBuf,
1425 local_agent_id: String,
1426 peers: Vec<String>,
1427 api_key: Option<String>,
1428 interval_secs: u64,
1429 batch_size: usize,
1430 shutdown: Arc<Notify>,
1431) -> Result<()> {
1432 let interval = interval_secs.max(1);
1433 let batch_size = batch_size.max(1);
1434
1435 let db_path_owned: Arc<Path> = Arc::from(db_path.as_path());
1436 let local_agent_id_arc: Arc<str> = Arc::from(local_agent_id.as_str());
1437 let api_key_arc: Option<Arc<str>> = api_key.as_deref().map(Arc::from);
1438 let peers_arc: Vec<Arc<str>> = peers.iter().map(|s| Arc::from(s.as_str())).collect();
1439 loop {
1440 let mut set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
1441 for peer_url in &peers_arc {
1442 let client = client.clone();
1443 let db_path = db_path_owned.clone();
1444 let local_agent_id = local_agent_id_arc.clone();
1445 let peer_url = peer_url.clone();
1446 let api_key = api_key_arc.clone();
1447 set.spawn(async move {
1448 if let Err(e) = sync_cycle_once(
1449 &client,
1450 &db_path,
1451 &local_agent_id,
1452 &peer_url,
1453 api_key.as_deref(),
1454 batch_size,
1455 )
1456 .await
1457 {
1458 tracing::warn!("sync-daemon: peer {peer_url} cycle failed: {e}");
1459 }
1460 });
1461 }
1462 while set.join_next().await.is_some() {}
1463
1464 tokio::select! {
1465 () = tokio::time::sleep(Duration::from_secs(interval)) => {}
1466 () = shutdown.notified() => {
1467 tracing::info!("sync-daemon: shutdown signal received");
1468 return Ok(());
1469 }
1470 }
1471 }
1472}
1473
1474pub async fn run_curator_daemon_with_shutdown(
1482 db_path: PathBuf,
1483 cfg: crate::curator::CuratorConfig,
1484 shutdown: Arc<Notify>,
1485) -> Result<()> {
1486 let shutdown_flag = Arc::new(AtomicBool::new(false));
1487 let shutdown_flag_for_signal = shutdown_flag.clone();
1488 tokio::spawn(async move {
1489 shutdown.notified().await;
1490 shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1491 });
1492
1493 let llm_arc: Option<Arc<crate::llm::OllamaClient>> = None;
1494 let db_owned = db_path;
1495 tokio::task::spawn_blocking(move || {
1496 crate::curator::run_daemon(db_owned, llm_arc, cfg, shutdown_flag);
1497 })
1498 .await
1499 .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1500 Ok(())
1501}
1502
1503#[allow(clippy::too_many_arguments)]
1508pub async fn run_curator_daemon_with_primitives(
1509 db_path: PathBuf,
1510 interval_secs: u64,
1511 max_ops_per_cycle: usize,
1512 dry_run: bool,
1513 include_namespaces: Vec<String>,
1514 exclude_namespaces: Vec<String>,
1515 ollama_model: Option<String>,
1516 shutdown: Arc<Notify>,
1517) -> Result<()> {
1518 let cfg = crate::curator::CuratorConfig {
1519 interval_secs,
1520 max_ops_per_cycle,
1521 dry_run,
1522 include_namespaces,
1523 exclude_namespaces,
1524 };
1525 let llm: Option<Arc<crate::llm::OllamaClient>> =
1526 ollama_model.and_then(|m| crate::llm::OllamaClient::new(&m).ok().map(Arc::new));
1527
1528 let shutdown_flag = Arc::new(AtomicBool::new(false));
1529 let shutdown_flag_for_signal = shutdown_flag.clone();
1530 tokio::spawn(async move {
1531 shutdown.notified().await;
1532 shutdown_flag_for_signal.store(true, Ordering::Relaxed);
1533 });
1534
1535 tokio::task::spawn_blocking(move || {
1536 crate::curator::run_daemon(db_path, llm, cfg, shutdown_flag);
1537 })
1538 .await
1539 .map_err(|e| anyhow::anyhow!("curator daemon join: {e}"))?;
1540 Ok(())
1541}
1542
/// Percent-encodes `s` byte by byte.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged;
/// every other byte becomes `%XX` with two upper-case hexadecimal digits.
fn urlencoding_minimal(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let passthrough =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if passthrough {
            encoded.push(char::from(byte));
        } else {
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
1566
/// Deserialization target for a response that carries a batch of memories.
///
/// NOTE(review): presumably the body returned by a peer's "sync since"
/// endpoint — confirm against the code that parses into this type.
#[derive(serde::Deserialize)]
struct SyncSinceResponse {
    /// Deserialized from the payload; the `dead_code` allowance suggests it is
    /// not read afterwards.
    #[allow(dead_code)]
    count: usize,
    /// Deserialized from the payload; the `dead_code` allowance suggests it is
    /// not read afterwards.
    #[allow(dead_code)]
    limit: usize,
    /// The memories contained in the response.
    memories: Vec<crate::models::Memory>,
}
1579
/// Empty function whose only content is a signature naming `Instant` and
/// `Duration`.
///
/// NOTE(review): presumably exists to keep those imports referenced so they
/// are not reported as unused — confirm before removing.
#[allow(dead_code)]
fn _imports_in_use(_: Instant, _: Duration) {}
1584
1585#[cfg(test)]
1590mod tests {
1591 use super::*;
1592 use crate::cli::test_utils::TestEnv;
1593 use crate::config::ResolvedTtl;
1594 use axum::body::Body;
1595 use axum::http::{Request, StatusCode};
1596 use tower::ServiceExt as _;
1597
1598 fn args_with_db(_db: &Path) -> ServeArgs {
1601 ServeArgs {
1602 host: "127.0.0.1".to_string(),
1603 port: 0,
1604 tls_cert: None,
1605 tls_key: None,
1606 mtls_allowlist: None,
1607 shutdown_grace_secs: 30,
1608 quorum_writes: 0,
1609 quorum_peers: vec![],
1610 quorum_timeout_ms: 2000,
1611 quorum_client_cert: None,
1612 quorum_client_key: None,
1613 quorum_ca_cert: None,
1614 catchup_interval_secs: 0,
1615 }
1616 }
1617
1618 fn keyword_app_state(db_path: &Path) -> AppState {
1619 let conn = db::open(db_path).unwrap();
1620 let db_state: Db = Arc::new(Mutex::new((
1621 conn,
1622 db_path.to_path_buf(),
1623 ResolvedTtl::default(),
1624 true,
1625 )));
1626 AppState {
1627 db: db_state,
1628 embedder: Arc::new(None),
1629 vector_index: Arc::new(Mutex::new(None)),
1630 federation: Arc::new(None),
1631 tier_config: Arc::new(FeatureTier::Keyword.config()),
1632 scoring: Arc::new(crate::config::ResolvedScoring::default()),
1633 }
1634 }
1635
1636 fn env_var_lock() -> std::sync::MutexGuard<'static, ()> {
1640 use std::sync::OnceLock;
1641 static LOCK: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
1642 LOCK.get_or_init(|| std::sync::Mutex::new(()))
1643 .lock()
1644 .unwrap_or_else(|e| e.into_inner())
1645 }
1646
1647 #[test]
1650 fn test_is_write_command_all_variants() {
1651 let writes: &[&[&str]] = &[
1658 &["ai-memory", "store", "title", "content"],
1659 &["ai-memory", "update", "id123", "--title", "t"],
1660 &["ai-memory", "delete", "id123"],
1661 &["ai-memory", "promote", "id123"],
1662 &["ai-memory", "forget", "pattern"],
1663 &["ai-memory", "link", "a", "b"],
1664 &["ai-memory", "consolidate", "ids"],
1665 &["ai-memory", "resolve", "a", "b"],
1666 &["ai-memory", "sync", "--peer", "/tmp/peer.db"],
1667 &[
1668 "ai-memory",
1669 "sync-daemon",
1670 "--peers",
1671 "http://x",
1672 "--interval-secs",
1673 "60",
1674 ],
1675 &["ai-memory", "import"],
1676 &["ai-memory", "auto-consolidate"],
1677 &["ai-memory", "gc"],
1678 ];
1679 let mut writes_checked = 0;
1680 for argv in writes {
1681 if let Ok(cli) = Cli::try_parse_from(*argv) {
1685 assert!(
1686 is_write_command(&cli.command),
1687 "expected write for {argv:?}"
1688 );
1689 writes_checked += 1;
1690 }
1691 }
1692 assert!(
1693 writes_checked >= 5,
1694 "expected at least 5 write variants checked, got {writes_checked}"
1695 );
1696
1697 let reads: &[&[&str]] = &[
1699 &["ai-memory", "mcp"],
1700 &["ai-memory", "recall", "context"],
1701 &["ai-memory", "search", "query"],
1702 &["ai-memory", "get", "id"],
1703 &["ai-memory", "list"],
1704 &["ai-memory", "stats"],
1705 &["ai-memory", "namespaces"],
1706 &["ai-memory", "export"],
1707 &["ai-memory", "shell"],
1708 &["ai-memory", "man"],
1709 &["ai-memory", "completions", "bash"],
1710 &["ai-memory", "archive", "list"],
1711 &["ai-memory", "agents", "list"],
1712 &["ai-memory", "pending", "list"],
1713 &["ai-memory", "bench"],
1714 &["ai-memory", "serve", "--host", "127.0.0.1", "--port", "0"],
1715 ];
1716 let mut reads_checked = 0;
1717 for argv in reads {
1718 if let Ok(cli) = Cli::try_parse_from(*argv) {
1719 assert!(
1720 !is_write_command(&cli.command),
1721 "expected read for {argv:?}"
1722 );
1723 reads_checked += 1;
1724 }
1725 }
1726 assert!(
1727 reads_checked >= 8,
1728 "expected at least 8 read variants checked, got {reads_checked}"
1729 );
1730
1731 assert!(is_write_command(&Command::Gc));
1735 assert!(!is_write_command(&Command::Stats));
1736 assert!(!is_write_command(&Command::Namespaces));
1737 assert!(!is_write_command(&Command::Export));
1738 assert!(!is_write_command(&Command::Shell));
1739 assert!(!is_write_command(&Command::Man));
1740 assert!(!is_write_command(&Command::Mcp {
1741 tier: "keyword".to_string()
1742 }));
1743 }
1744
1745 #[tokio::test]
1748 async fn test_router_has_health_endpoint() {
1749 let env = TestEnv::fresh();
1750 let app_state = keyword_app_state(&env.db_path);
1751 let api_key_state = ApiKeyState { key: None };
1752 let router = build_router(app_state, api_key_state);
1753 let resp = router
1754 .oneshot(
1755 Request::builder()
1756 .method("GET")
1757 .uri("/api/v1/health")
1758 .body(Body::empty())
1759 .unwrap(),
1760 )
1761 .await
1762 .unwrap();
1763 assert_eq!(resp.status(), StatusCode::OK);
1764 }
1765
1766 #[tokio::test]
1767 async fn test_router_has_metrics_at_both_paths() {
1768 let env = TestEnv::fresh();
1769 let app_state = keyword_app_state(&env.db_path);
1770 let api_key_state = ApiKeyState { key: None };
1771 let r1 = build_router(app_state.clone(), api_key_state.clone())
1773 .oneshot(
1774 Request::builder()
1775 .method("GET")
1776 .uri("/metrics")
1777 .body(Body::empty())
1778 .unwrap(),
1779 )
1780 .await
1781 .unwrap();
1782 assert_eq!(r1.status(), StatusCode::OK);
1783 let r2 = build_router(app_state, api_key_state)
1785 .oneshot(
1786 Request::builder()
1787 .method("GET")
1788 .uri("/api/v1/metrics")
1789 .body(Body::empty())
1790 .unwrap(),
1791 )
1792 .await
1793 .unwrap();
1794 assert_eq!(r2.status(), StatusCode::OK);
1795 }
1796
1797 #[tokio::test]
1798 async fn test_router_lists_all_v1_memory_routes() {
1799 let env = TestEnv::fresh();
1800 let app_state = keyword_app_state(&env.db_path);
1801 let api_key_state = ApiKeyState { key: None };
1802 let router = build_router(app_state, api_key_state);
1803 let resp = router
1804 .oneshot(
1805 Request::builder()
1806 .method("GET")
1807 .uri("/api/v1/memories")
1808 .body(Body::empty())
1809 .unwrap(),
1810 )
1811 .await
1812 .unwrap();
1813 assert!(resp.status().is_success(), "got {}", resp.status());
1816 }
1817
1818 #[tokio::test]
1819 async fn test_router_applies_api_key_middleware_when_key_set() {
1820 let env = TestEnv::fresh();
1821 let app_state = keyword_app_state(&env.db_path);
1822 let api_key_state = ApiKeyState {
1823 key: Some("s3cret".to_string()),
1824 };
1825 let router = build_router(app_state, api_key_state);
1826 let resp = router
1827 .oneshot(
1828 Request::builder()
1829 .method("GET")
1830 .uri("/api/v1/memories")
1831 .body(Body::empty())
1832 .unwrap(),
1833 )
1834 .await
1835 .unwrap();
1836 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1837 }
1838
1839 #[tokio::test]
1840 async fn test_router_skips_api_key_middleware_when_key_none() {
1841 let env = TestEnv::fresh();
1842 let app_state = keyword_app_state(&env.db_path);
1843 let api_key_state = ApiKeyState { key: None };
1844 let router = build_router(app_state, api_key_state);
1845 let resp = router
1846 .oneshot(
1847 Request::builder()
1848 .method("GET")
1849 .uri("/api/v1/memories")
1850 .body(Body::empty())
1851 .unwrap(),
1852 )
1853 .await
1854 .unwrap();
1855 assert_eq!(resp.status(), StatusCode::OK);
1856 }
1857
1858 #[tokio::test]
1861 async fn test_build_embedder_keyword_tier_returns_none() {
1862 let cfg = AppConfig::default();
1863 let emb = build_embedder(FeatureTier::Keyword, &cfg).await;
1864 assert!(emb.is_none());
1865 }
1866
    /// Placeholder: the body is empty, so this test currently asserts nothing
    /// and always passes.
    ///
    /// NOTE(review): the name promises coverage of the embedder load-failure
    /// path — either implement it or mark the test `#[ignore]` with a reason.
    #[tokio::test]
    async fn test_build_embedder_load_failure_returns_none() {
    }
1877
1878 #[test]
1881 fn test_build_vector_index_no_embedder_returns_none() {
1882 let env = TestEnv::fresh();
1883 let conn = db::open(&env.db_path).unwrap();
1884 assert!(build_vector_index(&conn, false).is_none());
1885 }
1886
1887 #[test]
1888 fn test_build_vector_index_empty_db_returns_empty_index() {
1889 let env = TestEnv::fresh();
1890 let conn = db::open(&env.db_path).unwrap();
1891 let idx = build_vector_index(&conn, true);
1892 assert!(
1893 idx.is_some(),
1894 "empty DB with embedder must yield empty index"
1895 );
1896 assert_eq!(idx.unwrap().len(), 0);
1897 }
1898
1899 #[tokio::test(start_paused = true)]
1902 async fn test_spawn_gc_loop_runs_and_can_be_aborted() {
1903 let env = TestEnv::fresh();
1904 let conn = db::open(&env.db_path).unwrap();
1905 let state: Db = Arc::new(Mutex::new((
1906 conn,
1907 env.db_path.clone(),
1908 ResolvedTtl::default(),
1909 true,
1910 )));
1911 let h = spawn_gc_loop(state, None, Duration::from_secs(60));
1912 tokio::time::advance(Duration::from_secs(61)).await;
1917 tokio::task::yield_now().await;
1919 h.abort();
1920 let err = h.await.unwrap_err();
1922 assert!(err.is_cancelled());
1923 }
1924
1925 #[tokio::test(start_paused = true)]
1926 async fn test_spawn_wal_checkpoint_loop_runs_and_can_be_aborted() {
1927 let env = TestEnv::fresh();
1928 let conn = db::open(&env.db_path).unwrap();
1929 let state: Db = Arc::new(Mutex::new((
1930 conn,
1931 env.db_path.clone(),
1932 ResolvedTtl::default(),
1933 true,
1934 )));
1935 let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(60));
1936 tokio::time::advance(Duration::from_secs(31)).await;
1939 tokio::task::yield_now().await;
1940 tokio::time::advance(Duration::from_secs(60)).await;
1941 tokio::task::yield_now().await;
1942 h.abort();
1943 let err = h.await.unwrap_err();
1944 assert!(err.is_cancelled());
1945 }
1946
1947 #[test]
1950 fn test_passphrase_strips_trailing_newline() {
1951 let dir = tempfile::tempdir().unwrap();
1952 let p = dir.path().join("pass");
1953 std::fs::write(&p, "secret\n").unwrap();
1954 assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
1955 }
1956
1957 #[test]
1958 fn test_passphrase_strips_trailing_crlf() {
1959 let dir = tempfile::tempdir().unwrap();
1960 let p = dir.path().join("pass");
1961 std::fs::write(&p, "secret\r\n").unwrap();
1962 assert_eq!(passphrase_from_file(&p).unwrap(), "secret");
1963 }
1964
1965 #[test]
1966 fn test_passphrase_empty_file_errors() {
1967 let dir = tempfile::tempdir().unwrap();
1968 let p = dir.path().join("empty");
1969 std::fs::write(&p, "").unwrap();
1970 let err = passphrase_from_file(&p).unwrap_err();
1971 assert!(
1972 err.to_string().contains("empty"),
1973 "expected 'empty' error, got: {err}"
1974 );
1975 }
1976
1977 #[test]
1978 fn test_passphrase_empty_after_trim_errors() {
1979 let dir = tempfile::tempdir().unwrap();
1983 let p = dir.path().join("nl-only");
1984 std::fs::write(&p, "\n").unwrap();
1985 let err = passphrase_from_file(&p).unwrap_err();
1986 assert!(err.to_string().contains("empty"));
1987 }
1988
1989 #[test]
1990 fn test_passphrase_nonexistent_file_errors() {
1991 let dir = tempfile::tempdir().unwrap();
1992 let p = dir.path().join("does-not-exist");
1993 let err = passphrase_from_file(&p).unwrap_err();
1994 assert!(
1995 err.to_string().contains("reading passphrase file")
1996 || err.chain().any(|e| e.to_string().contains("No such file"))
1997 || err.chain().any(|e| e.to_string().contains("cannot find")),
1998 "got: {err:#}"
1999 );
2000 }
2001
2002 #[test]
2003 fn test_passphrase_preserves_internal_whitespace() {
2004 let dir = tempfile::tempdir().unwrap();
2005 let p = dir.path().join("pass");
2006 std::fs::write(&p, "my pass phrase\n").unwrap();
2007 assert_eq!(passphrase_from_file(&p).unwrap(), "my pass phrase");
2008 }
2009
2010 #[test]
2013 fn test_anonymize_set_when_config_true_and_env_unset() {
2014 let _g = env_var_lock();
2015 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2017 let mut cfg = AppConfig::default();
2018 cfg.identity = Some(crate::config::IdentityConfig {
2019 anonymize_default: true,
2020 });
2021 apply_anonymize_default(&cfg);
2022 assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "1");
2023 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2025 }
2026
2027 #[test]
2028 fn test_anonymize_unchanged_when_env_already_set() {
2029 let _g = env_var_lock();
2030 unsafe { std::env::set_var("AI_MEMORY_ANONYMIZE", "0") };
2032 let mut cfg = AppConfig::default();
2033 cfg.identity = Some(crate::config::IdentityConfig {
2034 anonymize_default: true,
2035 });
2036 apply_anonymize_default(&cfg);
2037 assert_eq!(std::env::var("AI_MEMORY_ANONYMIZE").unwrap(), "0");
2039 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2041 }
2042
2043 #[test]
2044 fn test_anonymize_unchanged_when_config_false() {
2045 let _g = env_var_lock();
2046 unsafe { std::env::remove_var("AI_MEMORY_ANONYMIZE") };
2048 let cfg = AppConfig::default();
2049 apply_anonymize_default(&cfg);
2051 assert!(std::env::var("AI_MEMORY_ANONYMIZE").is_err());
2052 }
2053
2054 #[tokio::test]
2057 async fn test_bootstrap_serve_keyword_tier_no_embedder() {
2058 let env = TestEnv::fresh();
2059 let mut cfg = AppConfig::default();
2060 cfg.tier = Some("keyword".to_string());
2061 let args = args_with_db(&env.db_path);
2062 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2063 assert!(bs.app_state.embedder.is_none());
2065 let vi = bs.app_state.vector_index.lock().await;
2066 assert!(vi.is_none());
2067 assert_eq!(bs.task_handles.len(), 2);
2069 for h in bs.task_handles {
2071 h.abort();
2072 }
2073 }
2074
2075 #[tokio::test]
2076 async fn test_bootstrap_serve_with_api_key_logs_enabled() {
2077 let env = TestEnv::fresh();
2078 let mut cfg = AppConfig::default();
2079 cfg.tier = Some("keyword".to_string());
2080 cfg.api_key = Some("test-key".to_string());
2081 let args = args_with_db(&env.db_path);
2082 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2083 assert_eq!(bs.api_key_state.key.as_deref(), Some("test-key"));
2084 for h in bs.task_handles {
2085 h.abort();
2086 }
2087 }
2088
2089 #[tokio::test]
2090 async fn test_bootstrap_serve_federation_disabled_when_quorum_zero() {
2091 let env = TestEnv::fresh();
2092 let mut cfg = AppConfig::default();
2093 cfg.tier = Some("keyword".to_string());
2094 let args = args_with_db(&env.db_path);
2095 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2096 assert!(bs.app_state.federation.is_none());
2097 for h in bs.task_handles {
2098 h.abort();
2099 }
2100 }
2101
2102 #[tokio::test]
2114 async fn test_bootstrap_serve_federation_enabled_attaches_config() {
2115 let env = TestEnv::fresh();
2120 let mut cfg = AppConfig::default();
2121 cfg.tier = Some("keyword".to_string());
2122 let mut args = args_with_db(&env.db_path);
2123 args.quorum_writes = 1;
2124 args.quorum_peers = vec!["http://127.0.0.1:65530".to_string()];
2125 args.quorum_timeout_ms = 100;
2126 args.catchup_interval_secs = 0;
2127 let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2128 assert!(bs.app_state.federation.is_some());
2129 for h in bs.task_handles {
2130 h.abort();
2131 }
2132 }
2133
2134 #[tokio::test]
2135 async fn test_bootstrap_serve_federation_enabled_with_catchup_loop() {
2136 let env = TestEnv::fresh();
2142 let mut cfg = AppConfig::default();
2143 cfg.tier = Some("keyword".to_string());
2144 let mut args = args_with_db(&env.db_path);
2145 args.quorum_writes = 1;
2146 args.quorum_peers = vec!["http://127.0.0.1:65531".to_string()];
2147 args.quorum_timeout_ms = 100;
2148 args.catchup_interval_secs = 3600; let bs = bootstrap_serve(&env.db_path, &args, &cfg).await.unwrap();
2150 assert!(bs.app_state.federation.is_some());
2151 for h in bs.task_handles {
2152 h.abort();
2153 }
2154 }
2155
2156 #[tokio::test]
2157 async fn test_bootstrap_serve_federation_invalid_peer_errors() {
2158 let env = TestEnv::fresh();
2162 let mut cfg = AppConfig::default();
2163 cfg.tier = Some("keyword".to_string());
2164 let mut args = args_with_db(&env.db_path);
2165 args.quorum_writes = 1;
2166 args.quorum_peers = vec![
2167 "http://127.0.0.1:65532".to_string(),
2168 "http://127.0.0.1:65532/".to_string(), ];
2170 let res = bootstrap_serve(&env.db_path, &args, &cfg).await;
2171 let err = match res {
2172 Ok(_) => panic!("expected error from duplicate peer URLs"),
2173 Err(e) => e,
2174 };
2175 let s = format!("{err:#}");
2176 assert!(
2177 s.contains("federation") || s.contains("duplicate"),
2178 "got: {s}"
2179 );
2180 }
2181
2182 #[test]
2185 fn test_build_vector_index_populated_db_returns_built_index() {
2186 let env = TestEnv::fresh();
2190 let conn = db::open(&env.db_path).unwrap();
2191 let now = chrono::Utc::now().to_rfc3339();
2193 let mem = crate::models::Memory {
2194 id: uuid::Uuid::new_v4().to_string(),
2195 tier: crate::models::Tier::Mid,
2196 namespace: "ns".to_string(),
2197 title: "t".to_string(),
2198 content: "c".to_string(),
2199 tags: vec![],
2200 priority: 5,
2201 confidence: 1.0,
2202 source: "test".to_string(),
2203 access_count: 0,
2204 created_at: now.clone(),
2205 updated_at: now,
2206 last_accessed_at: None,
2207 expires_at: None,
2208 metadata: crate::models::default_metadata(),
2209 };
2210 let id = db::insert(&conn, &mem).unwrap();
2211 db::set_embedding(&conn, &id, &[1.0, 0.0, 0.0]).unwrap();
2212 let idx = build_vector_index(&conn, true).expect("populated index");
2213 assert!(
2214 idx.len() >= 1,
2215 "expected non-empty index, got len={}",
2216 idx.len()
2217 );
2218 }
2219
2220 #[tokio::test(start_paused = true)]
2228 async fn test_spawn_gc_loop_purges_expired_memories() {
2229 let env = TestEnv::fresh();
2230 let conn = db::open(&env.db_path).unwrap();
2231 let past = (chrono::Utc::now() - chrono::Duration::days(1)).to_rfc3339();
2233 let now = chrono::Utc::now().to_rfc3339();
2234 let mem = crate::models::Memory {
2235 id: uuid::Uuid::new_v4().to_string(),
2236 tier: crate::models::Tier::Short,
2237 namespace: "ns-gc".to_string(),
2238 title: "stale".to_string(),
2239 content: "stale".to_string(),
2240 tags: vec![],
2241 priority: 1,
2242 confidence: 1.0,
2243 source: "test".to_string(),
2244 access_count: 0,
2245 created_at: now.clone(),
2246 updated_at: now,
2247 last_accessed_at: None,
2248 expires_at: Some(past),
2249 metadata: crate::models::default_metadata(),
2250 };
2251 db::insert(&conn, &mem).unwrap();
2252 drop(conn);
2253
2254 let conn = db::open(&env.db_path).unwrap();
2255 let state: Db = Arc::new(Mutex::new((
2256 conn,
2257 env.db_path.clone(),
2258 ResolvedTtl::default(),
2259 true,
2260 )));
2261 let h = spawn_gc_loop(state.clone(), Some(1), Duration::from_secs(60));
2264 tokio::time::advance(Duration::from_secs(61)).await;
2267 tokio::task::yield_now().await;
2268 tokio::time::advance(Duration::from_secs(61)).await;
2269 tokio::task::yield_now().await;
2270 h.abort();
2271 let _ = h.await;
2272 }
2273
2274 #[tokio::test(start_paused = true)]
2277 async fn test_spawn_wal_checkpoint_loop_runs_multiple_cycles() {
2278 let env = TestEnv::fresh();
2279 let conn = db::open(&env.db_path).unwrap();
2280 let state: Db = Arc::new(Mutex::new((
2281 conn,
2282 env.db_path.clone(),
2283 ResolvedTtl::default(),
2284 true,
2285 )));
2286 let h = spawn_wal_checkpoint_loop(state, Duration::from_secs(2));
2287 for _ in 0..4 {
2290 tokio::time::advance(Duration::from_secs(2)).await;
2291 tokio::task::yield_now().await;
2292 }
2293 h.abort();
2294 let _ = h.await;
2295 }
2296
2297 #[test]
2300 fn test_urlencoding_minimal_round_trip() {
2301 assert_eq!(urlencoding_minimal("abcXYZ-_.~"), "abcXYZ-_.~");
2303 assert_eq!(urlencoding_minimal("0123456789"), "0123456789");
2304 assert_eq!(urlencoding_minimal("a:b"), "a%3Ab");
2306 assert_eq!(urlencoding_minimal("a/b"), "a%2Fb");
2307 assert_eq!(urlencoding_minimal("a@b"), "a%40b");
2308 assert_eq!(urlencoding_minimal("a+b"), "a%2Bb");
2309 assert_eq!(urlencoding_minimal(" "), "%20");
2310 assert_eq!(urlencoding_minimal(""), "");
2312 assert_eq!(
2314 urlencoding_minimal("2024-01-02T03:04:05+00:00"),
2315 "2024-01-02T03%3A04%3A05%2B00%3A00"
2316 );
2317 }
2318
    /// Returns the guard from `env_var_lock()`; as written it does nothing
    /// beyond taking that lock.
    fn no_config_env() -> std::sync::MutexGuard<'static, ()> {
        env_var_lock()
    }
2334
2335 #[tokio::test]
2336 async fn test_run_dispatch_stats_command() {
2337 let _g = no_config_env();
2338 let env = TestEnv::fresh();
2339 let cfg = AppConfig::default();
2340 let cli =
2341 Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "stats"])
2342 .unwrap();
2343 run(cli, &cfg).await.unwrap();
2344 }
2345
2346 #[tokio::test]
2347 async fn test_run_dispatch_namespaces_command() {
2348 let _g = no_config_env();
2349 let env = TestEnv::fresh();
2350 let cfg = AppConfig::default();
2351 let cli = Cli::try_parse_from([
2352 "ai-memory",
2353 "--db",
2354 env.db_path.to_str().unwrap(),
2355 "namespaces",
2356 ])
2357 .unwrap();
2358 run(cli, &cfg).await.unwrap();
2359 }
2360
2361 #[tokio::test]
2362 async fn test_run_dispatch_export_command() {
2363 let _g = no_config_env();
2364 let env = TestEnv::fresh();
2365 let cfg = AppConfig::default();
2366 let cli =
2367 Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "export"])
2368 .unwrap();
2369 run(cli, &cfg).await.unwrap();
2370 }
2371
2372 #[tokio::test]
2373 async fn test_run_dispatch_list_command() {
2374 let _g = no_config_env();
2375 let env = TestEnv::fresh();
2376 let cfg = AppConfig::default();
2377 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "list"])
2378 .unwrap();
2379 run(cli, &cfg).await.unwrap();
2380 }
2381
2382 #[tokio::test]
2383 async fn test_run_dispatch_search_command() {
2384 let _g = no_config_env();
2385 let env = TestEnv::fresh();
2386 let cfg = AppConfig::default();
2387 let cli = Cli::try_parse_from([
2388 "ai-memory",
2389 "--db",
2390 env.db_path.to_str().unwrap(),
2391 "search",
2392 "anyq",
2393 ])
2394 .unwrap();
2395 run(cli, &cfg).await.unwrap();
2396 }
2397
2398 #[tokio::test]
2399 async fn test_run_dispatch_archive_list_command() {
2400 let _g = no_config_env();
2401 let env = TestEnv::fresh();
2402 let cfg = AppConfig::default();
2403 let cli = Cli::try_parse_from([
2404 "ai-memory",
2405 "--db",
2406 env.db_path.to_str().unwrap(),
2407 "archive",
2408 "list",
2409 ])
2410 .unwrap();
2411 run(cli, &cfg).await.unwrap();
2412 }
2413
2414 #[tokio::test]
2415 async fn test_run_dispatch_agents_list_command() {
2416 let _g = no_config_env();
2417 let env = TestEnv::fresh();
2418 let cfg = AppConfig::default();
2419 let cli = Cli::try_parse_from([
2420 "ai-memory",
2421 "--db",
2422 env.db_path.to_str().unwrap(),
2423 "agents",
2424 "list",
2425 ])
2426 .unwrap();
2427 run(cli, &cfg).await.unwrap();
2428 }
2429
2430 #[tokio::test]
2431 async fn test_run_dispatch_pending_list_command() {
2432 let _g = no_config_env();
2433 let env = TestEnv::fresh();
2434 let cfg = AppConfig::default();
2435 let cli = Cli::try_parse_from([
2436 "ai-memory",
2437 "--db",
2438 env.db_path.to_str().unwrap(),
2439 "pending",
2440 "list",
2441 ])
2442 .unwrap();
2443 run(cli, &cfg).await.unwrap();
2444 }
2445
2446 #[tokio::test]
2447 async fn test_run_dispatch_completions_command() {
2448 let _g = no_config_env();
2449 let env = TestEnv::fresh();
2450 let cfg = AppConfig::default();
2451 let cli = Cli::try_parse_from([
2452 "ai-memory",
2453 "--db",
2454 env.db_path.to_str().unwrap(),
2455 "completions",
2456 "bash",
2457 ])
2458 .unwrap();
2459 run(cli, &cfg).await.unwrap();
2460 }
2461
2462 #[tokio::test]
2463 async fn test_run_dispatch_man_command() {
2464 let _g = no_config_env();
2465 let env = TestEnv::fresh();
2466 let cfg = AppConfig::default();
2467 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "man"])
2468 .unwrap();
2469 run(cli, &cfg).await.unwrap();
2470 }
2471
2472 #[tokio::test]
2473 async fn test_run_dispatch_gc_triggers_post_run_checkpoint() {
2474 let _g = no_config_env();
2477 let env = TestEnv::fresh();
2478 let cfg = AppConfig::default();
2479 let cli = Cli::try_parse_from(["ai-memory", "--db", env.db_path.to_str().unwrap(), "gc"])
2480 .unwrap();
2481 run(cli, &cfg).await.unwrap();
2482 }
2483
2484 #[tokio::test]
2485 async fn test_run_dispatch_resolve_command() {
2486 let _g = no_config_env();
2488 let env = TestEnv::fresh();
2489 let id_a = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "old", "old fact");
2490 let id_b = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "new", "new fact");
2491 let cfg = AppConfig::default();
2492 let cli = Cli::try_parse_from([
2493 "ai-memory",
2494 "--db",
2495 env.db_path.to_str().unwrap(),
2496 "resolve",
2497 &id_a,
2498 &id_b,
2499 ])
2500 .unwrap();
2501 run(cli, &cfg).await.unwrap();
2502 }
2503
2504 #[tokio::test]
2505 async fn test_run_dispatch_get_command() {
2506 let _g = no_config_env();
2507 let env = TestEnv::fresh();
2508 let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2509 let cfg = AppConfig::default();
2510 let cli = Cli::try_parse_from([
2511 "ai-memory",
2512 "--db",
2513 env.db_path.to_str().unwrap(),
2514 "get",
2515 &id,
2516 ])
2517 .unwrap();
2518 run(cli, &cfg).await.unwrap();
2519 }
2520
2521 #[tokio::test]
2522 async fn test_run_dispatch_promote_triggers_write_checkpoint() {
2523 let _g = no_config_env();
2526 let env = TestEnv::fresh();
2527 let id = crate::cli::test_utils::seed_memory(&env.db_path, "ns", "t", "c");
2528 let cfg = AppConfig::default();
2529 let cli = Cli::try_parse_from([
2530 "ai-memory",
2531 "--db",
2532 env.db_path.to_str().unwrap(),
2533 "promote",
2534 &id,
2535 ])
2536 .unwrap();
2537 run(cli, &cfg).await.unwrap();
2538 }
2539
2540 #[tokio::test]
2543 async fn test_run_dispatch_bench_smoke_runs_one_iteration() {
2544 let _g = no_config_env();
2549 let env = TestEnv::fresh();
2550 let cfg = AppConfig::default();
2551 let cli = Cli::try_parse_from([
2552 "ai-memory",
2553 "--db",
2554 env.db_path.to_str().unwrap(),
2555 "bench",
2556 "--iterations",
2557 "1",
2558 "--warmup",
2559 "0",
2560 ])
2561 .unwrap();
2562 let _ = run(cli, &cfg).await;
2565 }
2566
2567 #[tokio::test]
2568 async fn test_run_dispatch_bench_json_with_history() {
2569 let _g = no_config_env();
2571 let env = TestEnv::fresh();
2572 let history = env.db_path.with_file_name("hist.jsonl");
2573 let cfg = AppConfig::default();
2574 let cli = Cli::try_parse_from([
2575 "ai-memory",
2576 "--db",
2577 env.db_path.to_str().unwrap(),
2578 "bench",
2579 "--iterations",
2580 "1",
2581 "--warmup",
2582 "0",
2583 "--json",
2584 "--history",
2585 history.to_str().unwrap(),
2586 ])
2587 .unwrap();
2588 let _ = run(cli, &cfg).await;
2589 if history.exists() {
2591 let content = std::fs::read_to_string(&history).unwrap();
2592 assert!(content.contains("captured_at") || !content.is_empty());
2593 }
2594 }
2595
2596 #[cfg(feature = "sal")]
2599 #[tokio::test]
2600 async fn test_run_dispatch_migrate_sqlite_to_sqlite_dry_run() {
2601 let _g = no_config_env();
2603 let src_env = TestEnv::fresh();
2604 let dst_env = TestEnv::fresh();
2605 crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2607 let from = format!("sqlite://{}", src_env.db_path.display());
2608 let to = format!("sqlite://{}", dst_env.db_path.display());
2609 let cfg = AppConfig::default();
2610 let cli = Cli::try_parse_from([
2611 "ai-memory",
2612 "--db",
2613 src_env.db_path.to_str().unwrap(),
2614 "migrate",
2615 "--from",
2616 &from,
2617 "--to",
2618 &to,
2619 "--dry-run",
2620 ])
2621 .unwrap();
2622 run(cli, &cfg).await.unwrap();
2623 }
2624
2625 #[cfg(feature = "sal")]
2626 #[tokio::test]
2627 async fn test_run_dispatch_migrate_json_output() {
2628 let _g = no_config_env();
2630 let src_env = TestEnv::fresh();
2631 let dst_env = TestEnv::fresh();
2632 crate::cli::test_utils::seed_memory(&src_env.db_path, "ns-mig", "t", "c");
2633 let from = format!("sqlite://{}", src_env.db_path.display());
2634 let to = format!("sqlite://{}", dst_env.db_path.display());
2635 let cfg = AppConfig::default();
2636 let cli = Cli::try_parse_from([
2637 "ai-memory",
2638 "--db",
2639 src_env.db_path.to_str().unwrap(),
2640 "migrate",
2641 "--from",
2642 &from,
2643 "--to",
2644 &to,
2645 "--json",
2646 ])
2647 .unwrap();
2648 run(cli, &cfg).await.unwrap();
2649 }
2650
2651 #[tokio::test]
2654 async fn test_run_with_db_passphrase_file_exports_env() {
2655 let _g = env_var_lock();
2659 unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2661 let env = TestEnv::fresh();
2662 let pass_path = env.db_path.with_file_name("pass");
2663 std::fs::write(&pass_path, "test-passphrase\n").unwrap();
2664 let cfg = AppConfig::default();
2665 let cli = Cli::try_parse_from([
2666 "ai-memory",
2667 "--db",
2668 env.db_path.to_str().unwrap(),
2669 "--db-passphrase-file",
2670 pass_path.to_str().unwrap(),
2671 "stats",
2672 ])
2673 .unwrap();
2674 run(cli, &cfg).await.unwrap();
2675 assert_eq!(
2677 std::env::var("AI_MEMORY_DB_PASSPHRASE").unwrap(),
2678 "test-passphrase"
2679 );
2680 unsafe { std::env::remove_var("AI_MEMORY_DB_PASSPHRASE") };
2682 }
2683
/// Calls `init_tracing` twice back to back; the test passes as long as
/// neither call panics.
#[test]
fn test_init_tracing_is_idempotent() {
    for _ in 0..2 {
        init_tracing();
    }
}
2695
2696 #[tokio::test]
2704 async fn test_serve_http_with_shutdown_future_serves_then_stops() {
2705 let env = TestEnv::fresh();
2706 let app_state = keyword_app_state(&env.db_path);
2707 let api_key_state = ApiKeyState { key: None };
2708 let port = {
2710 let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
2711 let p = l.local_addr().unwrap().port();
2712 drop(l);
2713 p
2714 };
2715 let addr = format!("127.0.0.1:{port}");
2716 let shutdown = Arc::new(Notify::new());
2717 let shutdown_clone = shutdown.clone();
2718 let handle = tokio::spawn(async move {
2719 serve_http_with_shutdown_future(&addr, api_key_state, app_state, async move {
2720 shutdown_clone.notified().await;
2721 })
2722 .await
2723 });
2724 for _ in 0..40 {
2726 if let Ok(client) = reqwest::Client::builder()
2727 .timeout(Duration::from_millis(200))
2728 .build()
2729 && client
2730 .get(format!("http://127.0.0.1:{port}/api/v1/health"))
2731 .send()
2732 .await
2733 .is_ok()
2734 {
2735 break;
2736 }
2737 tokio::time::sleep(Duration::from_millis(50)).await;
2738 }
2739 shutdown.notify_one();
2740 let res = handle.await.unwrap();
2741 assert!(res.is_ok(), "serve future returned: {res:?}");
2742 }
2743
2744 #[tokio::test]
2747 async fn test_serve_http_with_shutdown_future_bind_failure_errors() {
2748 let env = TestEnv::fresh();
2752 let app_state = keyword_app_state(&env.db_path);
2753 let api_key_state = ApiKeyState { key: None };
2754 let res = serve_http_with_shutdown_future(
2759 "definitely-not-an-address:99999",
2760 api_key_state,
2761 app_state,
2762 async {},
2763 )
2764 .await;
2765 assert!(res.is_err(), "expected bind error, got: {res:?}");
2766 }
2767}