#![forbid(unsafe_code)]
#![deny(missing_docs)]
use std::path::Path;
use std::sync::{Arc, Mutex};
use anyhow::Context;
use anyhow::Result;
use axum::Router;
use axum::extract::DefaultBodyLimit;
use axum::routing::{get, post};
use mnem_backend_redb::open_or_init;
use mnem_core::repo::ReadonlyRepo;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
mod auth;
mod correlation;
mod error;
mod handlers;
mod handlers_ingest;
mod metrics;
mod routes;
mod state;
pub use error::{Error, RemoteError};
pub use handlers::derive_max_path_bytes;
pub use metrics::Metrics;
pub use state::AppState;
pub mod leiden_state {
pub use crate::state::{
COMMIT_LATENCY_WINDOW, COMMIT_STORM_CAP_PER_MIN, DEBOUNCE_FLOOR_MS, DELTA_RATIO_FORCE_FULL,
GRAPH_SIZE_GATE_V, LeidenCache, LeidenMode, derive_debounce_ms,
};
}
/// Knobs accepted by [`app_with_options`].
///
/// The `Default` value means: label policy taken from the environment,
/// on-disk (redb) storage, and no `/metrics` route.
#[derive(Debug, Default, Clone)]
pub struct AppOptions {
    /// Whether caller-supplied `label` fields are honoured on ingest and
    /// retrieve. `None` defers to the environment-derived default.
    pub allow_labels: Option<bool>,
    /// Use RAM-only stores instead of `repo.redb`; every commit is lost when
    /// the process exits.
    pub in_memory: bool,
    /// Mount the `GET /metrics` Prometheus endpoint.
    pub metrics_enabled: bool,
}
/// Builds the HTTP router for the repository at `repo_dir` with
/// [`AppOptions::default`]: on-disk storage, no `/metrics` route, and the
/// label policy resolved from the environment.
///
/// # Errors
///
/// Propagates every error returned by [`app_with_options`].
pub fn app(repo_dir: &Path) -> Result<Router> {
    let defaults = AppOptions::default();
    app_with_options(repo_dir, defaults)
}
/// Lists the routes served by the router as `(method, path, description)`
/// triples, in registration order.
///
/// `GET /metrics` is appended last only when `metrics_enabled` is `true`.
/// This table is maintained separately from the router built in
/// [`app_with_options`]; keep the two in sync when adding or removing routes.
pub fn route_table(metrics_enabled: bool) -> Vec<(&'static str, &'static str, &'static str)> {
    // Routes that are always mounted.
    const ALWAYS: [(&str, &str, &str); 14] = [
        ("GET", "/v1/healthz", "liveness probe"),
        ("GET", "/v1/stats", "head op-id, commit CID, ref + label counts"),
        ("POST", "/v1/nodes", "commit a new node"),
        ("POST", "/v1/nodes/bulk", "commit N nodes in one transaction"),
        ("GET/DELETE", "/v1/nodes/{id}", "fetch / delete a node"),
        ("POST", "/v1/nodes/{id}/tombstone", "tombstone a node"),
        ("GET/POST", "/v1/retrieve", "agent-facing retrieval"),
        ("POST", "/v1/ingest", "ingest a Markdown / PDF / JSON source"),
        ("POST", "/v1/explain", "explain a retrieve result"),
        ("POST", "/v1/traverse_answer", "single-call multihop (gated)"),
        ("GET", "/remote/v1/refs", "transport: list refs"),
        ("POST", "/remote/v1/fetch-blocks", "transport: fetch blocks"),
        ("POST", "/remote/v1/push-blocks", "transport: push blocks (auth)"),
        ("POST", "/remote/v1/advance-head", "transport: advance head (auth)"),
    ];
    // Optional Prometheus scrape endpoint.
    const METRICS: (&str, &str, &str) = ("GET", "/metrics", "Prometheus text-exposition");

    ALWAYS
        .iter()
        .copied()
        .chain(metrics_enabled.then_some(METRICS))
        .collect()
}
pub fn app_with_options(repo_dir: &Path, opts: AppOptions) -> Result<Router> {
let data_dir = if repo_dir.ends_with(".mnem") {
repo_dir.to_path_buf()
} else {
repo_dir.join(".mnem")
};
std::fs::create_dir_all(&data_dir)?;
let (bs, ohs): (
std::sync::Arc<dyn mnem_core::store::Blockstore>,
std::sync::Arc<dyn mnem_core::store::OpHeadsStore>,
) = if opts.in_memory {
eprintln!(
"mnem http: --in-memory ACTIVE. All commits are RAM-only and lost on process exit. This is intended for benchmarks and ephemeral sessions; NEVER use in a durable deployment."
);
(
std::sync::Arc::new(mnem_core::store::MemoryBlockstore::new()),
std::sync::Arc::new(mnem_core::store::MemoryOpHeadsStore::new()),
)
} else {
let (bs, ohs, _file) = open_or_init(&data_dir.join("repo.redb"))?;
(bs as _, ohs as _)
};
let repo = ReadonlyRepo::open(bs.clone(), ohs.clone()).or_else(|e| {
if e.is_uninitialized() {
ReadonlyRepo::init(bs.clone(), ohs.clone())
} else {
Err(e)
}
})?;
let embed_cfg = load_embed_config(&data_dir);
let sparse_cfg = load_sparse_config(&data_dir);
let ner_cfg = load_ner_config(&data_dir);
let allow_labels = opts
.allow_labels
.unwrap_or_else(AppState::resolve_allow_labels_from_env);
if allow_labels && opts.allow_labels.is_none() {
eprintln!(
"mnem http: MNEM_BENCH set; caller-supplied `label` fields will be honoured on ingest and retrieve."
);
}
let push_token = AppState::resolve_push_token_from_env();
if push_token.is_some() {
tracing::info!(
"mnem http: MNEM_HTTP_PUSH_TOKEN configured; /remote/v1/push-blocks + /remote/v1/advance-head enabled."
);
} else {
tracing::info!(
"mnem http: MNEM_HTTP_PUSH_TOKEN not set; remote write verbs administratively disabled (503)."
);
}
let state = AppState {
repo: Arc::new(Mutex::new(repo)),
embed_cfg,
sparse_cfg,
indexes: Arc::new(Mutex::new(state::IndexCache::default())),
allow_labels,
metrics: Metrics::new(),
push_token,
graph_cache: Arc::new(Mutex::new(state::GraphCache::default())),
traverse_cfg: Arc::new(routes::traverse::TraverseAnswerCfg::default()),
ner_cfg,
};
let cors = CorsLayer::new()
.allow_methods(Any)
.allow_headers(Any)
.allow_origin(Any);
let body_limit_bytes: usize = std::env::var("MNEM_MAX_BODY_MB")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(64)
.saturating_mul(1024 * 1024);
let mut router = Router::new()
.route("/v1/healthz", get(handlers::healthz))
.route("/v1/stats", get(handlers::stats))
.route("/v1/nodes", post(handlers::post_node))
.route("/v1/nodes/bulk", post(handlers::post_nodes_bulk))
.route(
"/v1/nodes/{id}",
get(handlers::get_node).delete(handlers::delete_node),
)
.route("/v1/nodes/{id}/tombstone", post(handlers::tombstone_node))
.route(
"/v1/retrieve",
get(handlers::retrieve).post(handlers::retrieve_full),
)
.route("/v1/ingest", post(handlers_ingest::ingest))
.route("/v1/explain", post(handlers::explain))
.route(
"/v1/traverse_answer",
post(routes::traverse::traverse_answer),
)
.route("/remote/v1/refs", get(routes::remote::get_refs))
.route(
"/remote/v1/fetch-blocks",
post(routes::remote::post_fetch_blocks),
)
.route(
"/remote/v1/push-blocks",
post(routes::remote::post_push_blocks),
)
.route(
"/remote/v1/advance-head",
post(routes::remote::post_advance_head),
);
if opts.metrics_enabled {
router = router.route("/metrics", get(metrics::metrics_handler));
}
Ok(router
.layer(axum::middleware::from_fn_with_state(
state.clone(),
metrics::track_metrics,
))
.layer(DefaultBodyLimit::max(body_limit_bytes))
.layer(axum::middleware::from_fn(error::json_rejection_envelope))
.layer(cors)
.layer(TraceLayer::new_for_http())
.layer(axum::middleware::from_fn(correlation::correlation_id))
.with_state(state))
}
/// Reads `<data_dir>/config.toml` and deserializes it as `T`.
///
/// A missing file yields `None` silently, because the config file is
/// optional. Any other read failure (permissions, invalid UTF-8, ...) and any
/// TOML/schema error are logged at `warn` level as
/// `config.toml [<section>] ... ; <on_failure>` and also yield `None`. A
/// broken config therefore disables the dependent feature instead of
/// aborting startup.
fn read_config_toml<T: serde::de::DeserializeOwned>(
    data_dir: &Path,
    section: &str,
    on_failure: &str,
) -> Option<T> {
    let path = data_dir.join("config.toml");
    let text = match std::fs::read_to_string(&path) {
        Ok(text) => text,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
        Err(e) => {
            tracing::warn!(
                path = %path.display(),
                error = %e,
                "config.toml [{}] read failed; {}",
                section,
                on_failure
            );
            return None;
        }
    };
    match toml::from_str::<T>(&text) {
        Ok(parsed) => Some(parsed),
        Err(e) => {
            tracing::warn!(
                path = %path.display(),
                error = %e,
                "config.toml [{}] parse failed; {}",
                section,
                on_failure
            );
            None
        }
    }
}

/// Returns the `[embed]` provider settings from `<data_dir>/config.toml`.
///
/// Returns `None` when the file is missing, unreadable, or unparsable, or
/// when it has no `[embed]` table. The unreadable and unparsable cases are
/// logged.
fn load_embed_config(data_dir: &Path) -> Option<mnem_embed_providers::ProviderConfig> {
    #[derive(serde::Deserialize)]
    struct MiniCfg {
        embed: Option<mnem_embed_providers::ProviderConfig>,
    }
    read_config_toml::<MiniCfg>(data_dir, "embed", "auto-embed disabled")?.embed
}

/// Returns the `[sparse]` provider settings from `<data_dir>/config.toml`.
///
/// Returns `None` when the file is missing, unreadable, or unparsable, or
/// when it has no `[sparse]` table. The unreadable and unparsable cases are
/// logged.
fn load_sparse_config(data_dir: &Path) -> Option<mnem_sparse_providers::ProviderConfig> {
    #[derive(serde::Deserialize)]
    struct MiniCfg {
        sparse: Option<mnem_sparse_providers::ProviderConfig>,
    }
    read_config_toml::<MiniCfg>(data_dir, "sparse", "sparse auto-encode disabled")?.sparse
}

/// Resolves the NER configuration.
///
/// The `MNEM_NER_PROVIDER` environment variable, when set to valid unicode,
/// takes precedence over the file. Its value is trimmed and compared
/// case-insensitively: `none` selects `NerConfig::None`, and anything else
/// selects `NerConfig::Rule`. Values other than `rule` also log a warning.
/// Without the variable, the `[ner]` table of `<data_dir>/config.toml` is
/// used. `None` means neither source supplied a setting, or the file could
/// not be read or parsed (logged).
fn load_ner_config(data_dir: &Path) -> Option<mnem_ingest::NerConfig> {
    if let Ok(raw) = std::env::var("MNEM_NER_PROVIDER") {
        let choice = raw.trim().to_ascii_lowercase();
        return Some(match choice.as_str() {
            "none" => mnem_ingest::NerConfig::None,
            "rule" => mnem_ingest::NerConfig::Rule,
            _ => {
                // Unknown values keep the historical rule-based fallback, but
                // no longer silently: a typo such as `nnoe` is now visible.
                tracing::warn!(
                    value = %raw,
                    "MNEM_NER_PROVIDER not recognised; using rule-based NER"
                );
                mnem_ingest::NerConfig::Rule
            }
        });
    }
    #[derive(serde::Deserialize)]
    struct MiniCfg {
        ner: Option<mnem_ingest::NerConfig>,
    }
    read_config_toml::<MiniCfg>(data_dir, "ner", "NER defaults to rule-based")?.ner
}