#![allow(dead_code)]
use std::time::Instant;
use axum::Json;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use mnem_core::guard::{CommitBudgetGuard, Decision};
use mnem_core::id::{CODEC_RAW, Cid, Multihash};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::state::AppState;
/// Server-side settings for the experimental `traverse_answer` endpoint.
///
/// Fields are stored exactly as given; nothing here enforces a range. Use
/// [`TraverseAnswerCfg::clamped`] to force the numeric fields into the
/// supported envelope.
#[derive(Clone, Debug)]
pub struct TraverseAnswerCfg {
/// Hard wall-clock budget in milliseconds. `clamped` keeps it within
/// `HARD_WALL_MIN_MS..=HARD_WALL_MAX_MS`.
pub hard_wall_budget_ms: u32,
/// Maximum number of hops. `clamped` keeps it within
/// `1..=MAX_HOPS_CEILING`.
pub max_hops: u32,
/// Opt-in switch for the experimental endpoint; `false` by default.
pub experimental_enabled: bool,
}
impl TraverseAnswerCfg {
    /// Lowest hard-wall budget (milliseconds) that [`Self::clamped`] lets through.
    pub const HARD_WALL_MIN_MS: u32 = 500;
    /// Highest hard-wall budget (milliseconds) that [`Self::clamped`] lets through.
    pub const HARD_WALL_MAX_MS: u32 = 30_000;
    /// Highest hop count that [`Self::clamped`] lets through.
    pub const MAX_HOPS_CEILING: u32 = 16;

    /// Consumes the configuration and returns it with the numeric fields
    /// forced into the supported envelope.
    ///
    /// * `hard_wall_budget_ms` ends up in
    ///   `HARD_WALL_MIN_MS..=HARD_WALL_MAX_MS`.
    /// * `max_hops` ends up in `1..=MAX_HOPS_CEILING`.
    /// * `experimental_enabled` is carried over untouched.
    #[must_use]
    pub fn clamped(self) -> Self {
        Self {
            hard_wall_budget_ms: self
                .hard_wall_budget_ms
                .clamp(Self::HARD_WALL_MIN_MS, Self::HARD_WALL_MAX_MS),
            max_hops: self.max_hops.clamp(1, Self::MAX_HOPS_CEILING),
            // Remaining field (`experimental_enabled`) is moved over as-is.
            ..self
        }
    }
}
impl Default for TraverseAnswerCfg {
    /// Stock configuration: a 5000 ms hard wall, three hops, and the
    /// experimental endpoint switched off.
    fn default() -> Self {
        const DEFAULT_HARD_WALL_MS: u32 = 5_000;
        const DEFAULT_MAX_HOPS: u32 = 3;

        Self {
            experimental_enabled: false,
            max_hops: DEFAULT_MAX_HOPS,
            hard_wall_budget_ms: DEFAULT_HARD_WALL_MS,
        }
    }
}
/// Deserializable bundle of experimental settings that `into_cfg` turns into
/// a [`TraverseAnswerCfg`].
///
/// Every key is optional: each field is an `Option` marked `#[serde(default)]`,
/// so a missing key deserializes to `None`.
// NOTE(review): presumably this mirrors the `[experimental]` table of
// config.toml mentioned in the handler's error message -- confirm at the
// call site that loads the config.
#[derive(Debug, Default, Deserialize)]
pub(crate) struct ExperimentalSection {
/// Opt-in flag; `None` is treated as `false` by `into_cfg`.
#[serde(default)]
pub(crate) single_call_multihop: Option<bool>,
/// Override for the hard-wall budget in milliseconds; `None` keeps the
/// default from `TraverseAnswerCfg::default()`.
#[serde(default)]
pub(crate) traverse_answer_hard_wall_ms: Option<u32>,
/// Override for the hop limit; `None` keeps the default from
/// `TraverseAnswerCfg::default()`.
#[serde(default)]
pub(crate) traverse_answer_max_hops: Option<u32>,
}
impl ExperimentalSection {
    /// Converts the optional settings into a concrete, clamped
    /// [`TraverseAnswerCfg`].
    ///
    /// Absent numeric keys fall back to `TraverseAnswerCfg::default()`, an
    /// absent opt-in flag means "disabled", and the result is passed through
    /// `clamped()` before being returned.
    #[must_use]
    pub(crate) fn into_cfg(self) -> TraverseAnswerCfg {
        let Self {
            single_call_multihop,
            traverse_answer_hard_wall_ms,
            traverse_answer_max_hops,
        } = self;

        let mut cfg = TraverseAnswerCfg::default();
        if let Some(wall_ms) = traverse_answer_hard_wall_ms {
            cfg.hard_wall_budget_ms = wall_ms;
        }
        if let Some(hops) = traverse_answer_max_hops {
            cfg.max_hops = hops;
        }
        // Only an explicit `true` enables the endpoint.
        cfg.experimental_enabled = matches!(single_call_multihop, Some(true));
        cfg.clamped()
    }
}
/// Derives a concurrency cap from the parallelism the OS reports.
///
/// The cap is three quarters of `std::thread::available_parallelism()`
/// (integer division), but never less than 2. If the parallelism query fails,
/// a single core is assumed, which also yields 2.
#[must_use]
pub(crate) fn derive_concurrency_cap() -> usize {
    const FLOOR: usize = 2;

    let cores = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };
    let three_quarters = cores * 3 / 4;
    std::cmp::max(three_quarters, FLOOR)
}
/// Picks a cold-start budget in milliseconds from an optional rolling p95.
///
/// With no observation the budget is 500 ms; otherwise the observed value is
/// limited to the range 200..=2000 ms.
#[must_use]
pub(crate) fn derive_cold_start_budget_ms(rolling_p95_ms: Option<u32>) -> u32 {
    const FALLBACK_MS: u32 = 500;
    const FLOOR_MS: u32 = 200;
    const CEILING_MS: u32 = 2_000;

    match rolling_p95_ms {
        Some(observed) => observed.max(FLOOR_MS).min(CEILING_MS),
        None => FALLBACK_MS,
    }
}
/// JSON request body accepted by the `traverse_answer` handler.
///
/// All fields are optional; a missing key deserializes to `None`.
#[derive(Debug, Deserialize)]
pub(crate) struct TraverseAnswerRequest {
/// Optional request text. Within this file it is only read by `synth_cid`,
/// where `None` is treated as the empty string.
#[serde(default)]
pub(crate) text: Option<String>,
/// Hop limit requested by the caller; the handler bounds it by the server's
/// configured `max_hops` and falls back to that value when absent.
#[serde(default)]
pub(crate) max_hops: Option<u32>,
/// Hard-wall budget in milliseconds requested by the caller; the handler
/// bounds it by the server's configured budget and falls back to that value
/// when absent.
#[serde(default)]
pub(crate) budget_ms: Option<u32>,
}
/// JSON body returned by the `traverse_answer` handler when the endpoint is
/// enabled.
#[derive(Debug, Serialize)]
pub(crate) struct TraverseAnswerResponse {
/// Schema tag; the handler sets it to `"mnem.v1.traverse_answer"`.
pub(crate) schema: &'static str,
/// Number of hops counted before the loop finished or stopped early.
pub(crate) hops_executed: u32,
/// Wall-clock time spent in the hop loop, in milliseconds (saturates at
/// `u32::MAX`).
pub(crate) elapsed_ms: u32,
/// Hard-wall budget, in milliseconds, that was actually applied to this
/// request.
pub(crate) hard_wall_ms_effective: u32,
/// `true` when the budget guard returned an error while charging a hop, or
/// when the measured elapsed time exceeded the effective hard wall.
pub(crate) hard_wall_cutoff: bool,
/// `true` when the guard's final report is marked breached and no hard-wall
/// cutoff was recorded.
pub(crate) budget_breached: bool,
}
/// Axum handler for the opt-in `traverse_answer` endpoint.
///
/// * If `experimental_enabled` is off in the server configuration, the request
///   is rejected with `410 Gone` and a JSON error body that names the opt-in
///   setting.
/// * Otherwise the caller's `budget_ms` and `max_hops` are bounded by the
///   server configuration, the effective values are published to metrics, and
///   a [`CommitBudgetGuard`] is charged once per hop. The loop ends when the
///   hop limit is reached, when the guard answers `ShouldDefer` (that hop is
///   still counted), or when the guard returns an error (recorded as a
///   hard-wall cutoff; that hop is not counted).
///
/// The handler never panics on out-of-envelope configuration: bounds are
/// applied with `max`/`min` rather than `clamp`, and the soft budget is
/// computed without `u32` overflow.
pub(crate) async fn traverse_answer(
    State(state): State<AppState>,
    Json(req): Json<TraverseAnswerRequest>,
) -> Response {
    if !state.traverse_cfg.experimental_enabled {
        return (
            StatusCode::GONE,
            Json(json!({
                "schema": "mnem.v1.err",
                "error": "traverse_answer: experimental endpoint disabled",
                "remediation_ref":
                    "docs/warnings/traverse_answer_experimental_opt_in.md",
                "opt_in":
                    "set `[experimental] single_call_multihop = true` in config.toml",
            })),
        )
            .into_response();
    }

    let cfg = &state.traverse_cfg;

    // `Ord::clamp` panics when `min > max`. The configuration struct has public
    // fields, so it may reach this handler without having gone through
    // `clamped()`. `max(..).min(..)` yields the same value for every valid
    // configuration and lets the server ceiling win otherwise.
    let effective_hard_wall_ms = match req.budget_ms {
        Some(caller) => caller
            .max(TraverseAnswerCfg::HARD_WALL_MIN_MS)
            .min(cfg.hard_wall_budget_ms),
        None => cfg.hard_wall_budget_ms,
    };
    let effective_max_hops = match req.max_hops {
        Some(caller) => caller.max(1).min(cfg.max_hops),
        None => cfg.max_hops,
    };

    state
        .metrics
        .traverse_answer_hard_wall_ms_effective
        .set(i64::from(effective_hard_wall_ms));
    state
        .metrics
        .traverse_answer_max_hops_effective
        .set(i64::from(effective_max_hops));

    let anchor_cid = synth_cid(&req, effective_hard_wall_ms, effective_max_hops);

    // Soft budget is 80% of the hard wall. Widen to u64 so that `* 4` cannot
    // overflow; the quotient is never larger than the input, so the conversion
    // back to u32 always succeeds.
    let soft_budget_ms = u32::try_from(u64::from(effective_hard_wall_ms) * 4 / 5)
        .unwrap_or(effective_hard_wall_ms);

    let mut guard = CommitBudgetGuard::start(
        "gap-09-traverse-answer",
        soft_budget_ms,
        effective_hard_wall_ms,
        anchor_cid,
    );
    let start = Instant::now();
    let mut hops_executed: u32 = 0;
    let mut hard_wall_cutoff = false;

    for hop in 0..effective_max_hops {
        // The guard takes a `&'static str` stage tag, so hops past the third
        // share one label.
        let hop_stage_tag: &'static str = match hop {
            0 => "hop-0",
            1 => "hop-1",
            2 => "hop-2",
            _ => "hop-n",
        };
        tokio::task::yield_now().await;
        match guard.charge(hop_stage_tag) {
            Ok(Decision::Proceed) => {
                hops_executed = hop.saturating_add(1);
            }
            Ok(Decision::ShouldDefer) => {
                // The current hop still counts; no further hops are started.
                hops_executed = hop.saturating_add(1);
                break;
            }
            Err(_hard_wall) => {
                hard_wall_cutoff = true;
                state.metrics.traverse_answer_hard_wall_exceeded.inc();
                break;
            }
        }
    }

    let elapsed_ms = u32::try_from(start.elapsed().as_millis()).unwrap_or(u32::MAX);
    // Second line of defence: flag an overrun even if the guard did not
    // report one. The counter is incremented at most once per request.
    if !hard_wall_cutoff && elapsed_ms > effective_hard_wall_ms {
        hard_wall_cutoff = true;
        state.metrics.traverse_answer_hard_wall_exceeded.inc();
    }

    let report = guard.into_report();
    // A hard-wall cutoff takes precedence over the softer "breached" flag.
    let budget_breached = report.breached && !hard_wall_cutoff;

    Json(TraverseAnswerResponse {
        schema: "mnem.v1.traverse_answer",
        hops_executed,
        elapsed_ms,
        hard_wall_ms_effective: effective_hard_wall_ms,
        hard_wall_cutoff,
        budget_breached,
    })
    .into_response()
}
/// Builds a synthetic content id for a request.
///
/// The hashed pre-image is the request text (empty when absent) followed by
/// the little-endian bytes of `hard_wall_ms` and then of `max_hops`. It is
/// hashed with SHA2-256 and wrapped as a raw-codec [`Cid`].
fn synth_cid(req: &TraverseAnswerRequest, hard_wall_ms: u32, max_hops: u32) -> Cid {
    let query = req.text.as_deref().unwrap_or_default();
    let wall_bytes = hard_wall_ms.to_le_bytes();
    let hop_bytes = max_hops.to_le_bytes();

    let preimage: Vec<u8> = [
        query.as_bytes(),
        wall_bytes.as_slice(),
        hop_bytes.as_slice(),
    ]
    .concat();

    Cid::new(CODEC_RAW, Multihash::sha2_256(&preimage))
}
// Unit and property tests for the configuration envelope and the pure budget
// helpers. The async handler itself is not exercised here.
#[cfg(test)]
mod tests {
use super::*;
// The stock configuration keeps the endpoint disabled with a 5000 ms wall
// and three hops.
#[test]
fn default_is_off() {
let cfg = TraverseAnswerCfg::default();
assert!(!cfg.experimental_enabled);
assert_eq!(cfg.hard_wall_budget_ms, 5000);
assert_eq!(cfg.max_hops, 3);
}
// `clamped` pulls both extremes (0 and u32::MAX) back into the envelope.
#[test]
fn clamped_enforces_envelope() {
let cfg = TraverseAnswerCfg {
hard_wall_budget_ms: 0,
max_hops: 0,
experimental_enabled: true,
}
.clamped();
assert_eq!(cfg.hard_wall_budget_ms, TraverseAnswerCfg::HARD_WALL_MIN_MS);
assert_eq!(cfg.max_hops, 1);
let cfg = TraverseAnswerCfg {
hard_wall_budget_ms: u32::MAX,
max_hops: u32::MAX,
experimental_enabled: true,
}
.clamped();
assert_eq!(cfg.hard_wall_budget_ms, TraverseAnswerCfg::HARD_WALL_MAX_MS);
assert_eq!(cfg.max_hops, TraverseAnswerCfg::MAX_HOPS_CEILING);
}
// A section with no keys set yields the default, disabled configuration.
#[test]
fn experimental_section_absent_keys_default_off() {
let section = ExperimentalSection::default();
let cfg = section.into_cfg();
assert!(!cfg.experimental_enabled);
assert_eq!(cfg.hard_wall_budget_ms, 5000);
assert_eq!(cfg.max_hops, 3);
}
// Explicit in-range overrides are carried through unchanged.
#[test]
fn experimental_section_opts_in() {
let section = ExperimentalSection {
single_call_multihop: Some(true),
traverse_answer_hard_wall_ms: Some(8000),
traverse_answer_max_hops: Some(5),
};
let cfg = section.into_cfg();
assert!(cfg.experimental_enabled);
assert_eq!(cfg.hard_wall_budget_ms, 8000);
assert_eq!(cfg.max_hops, 5);
}
// The floor of 2 holds regardless of how many cores the test host has.
#[test]
fn concurrency_cap_never_below_two() {
let cap = derive_concurrency_cap();
assert!(cap >= 2, "concurrency cap must never drop below 2");
}
// Without a rolling p95 the cold-start budget is the 500 ms fallback.
#[test]
fn cold_start_budget_fallback_is_500() {
assert_eq!(derive_cold_start_budget_ms(None), 500);
}
// Below, inside and above the 200..=2000 ms range.
#[test]
fn cold_start_budget_clamps_rolling_p95() {
assert_eq!(derive_cold_start_budget_ms(Some(50)), 200);
assert_eq!(derive_cold_start_budget_ms(Some(900)), 900);
assert_eq!(derive_cold_start_budget_ms(Some(9999)), 2000);
}
// Property: for any caller budget and any in-envelope server wall, the
// clamped value stays between the minimum and the server wall. This checks
// the clamp expression only, not the handler.
proptest::proptest! {
#[test]
fn hard_wall_structural_dos_impossible(
caller_budget_ms in 0u32..=u32::MAX,
server_wall_ms in TraverseAnswerCfg::HARD_WALL_MIN_MS
..=TraverseAnswerCfg::HARD_WALL_MAX_MS,
) {
let effective = caller_budget_ms.clamp(
TraverseAnswerCfg::HARD_WALL_MIN_MS,
server_wall_ms,
);
proptest::prop_assert!(effective >= TraverseAnswerCfg::HARD_WALL_MIN_MS);
proptest::prop_assert!(effective <= server_wall_ms);
}
}
// Property: with the default configuration, any caller hop count clamps into
// 1..=3. Despite the name, no benchmark data is consulted here.
proptest::proptest! {
#[test]
fn hops_3_covers_99_pct_multihop_benchmarks(
caller_hops in 0u32..=u32::MAX,
) {
let cfg = TraverseAnswerCfg::default();
let effective = caller_hops.clamp(1, cfg.max_hops);
proptest::prop_assert!(effective >= 1);
proptest::prop_assert!(effective <= 3);
}
}
}