Skip to main content

construct/gateway/
mod.rs

1//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts.
2//!
3//! This module replaces the raw TCP implementation with axum for:
4//! - Proper HTTP/1.1 parsing and compliance
5//! - Content-Length validation (handled by hyper)
6//! - Request body size limits (64KB max)
7//! - Request timeouts (30s) to prevent slow-loris attacks
8//! - Header sanitization (handled by axum/hyper)
9
10pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34// portable-pty needs `openpty` from libutil which Android's NDK does not
35// reliably link.  The websocket terminal isn't a meaningful surface on a
36// phone runtime anyway, so we drop the module + its route on Android.
37#[cfg(not(target_os = "android"))]
38pub mod terminal;
39pub mod tls;
40pub mod ws;
41pub mod ws_mcp_events;
42
43use crate::channels::{
44    Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
45    WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
46};
47use crate::config::Config;
48use crate::cost::CostTracker;
49use crate::memory::{self, Memory, MemoryCategory};
50use crate::providers::{self, ChatMessage, Provider};
51use crate::runtime;
52use crate::security::SecurityPolicy;
53use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
54use crate::tools;
55use crate::tools::canvas::CanvasStore;
56use crate::tools::traits::ToolSpec;
57use crate::util::truncate_with_ellipsis;
58use anyhow::{Context, Result};
59use axum::{
60    Router,
61    body::Bytes,
62    extract::{ConnectInfo, Query, State},
63    http::{HeaderMap, StatusCode, header},
64    response::{IntoResponse, Json},
65    routing::{delete, get, post, put},
66};
67use parking_lot::Mutex;
68use std::collections::HashMap;
69use std::net::{IpAddr, SocketAddr};
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tower_http::limit::RequestBodyLimitLayer;
73use tower_http::timeout::TimeoutLayer;
74use uuid::Uuid;
75
76/// Maximum request body size (64KB) — prevents memory exhaustion
77pub const MAX_BODY_SIZE: usize = 65_536;
78/// Default request timeout (30s) — prevents slow-loris attacks.
79pub const REQUEST_TIMEOUT_SECS: u64 = 30;
80
81/// Read gateway request timeout from `CONSTRUCT_GATEWAY_TIMEOUT_SECS` env var
82/// at runtime, falling back to [`REQUEST_TIMEOUT_SECS`].
83///
84/// Agentic workloads with tool use (web search, MCP tools, sub-agent
85/// delegation) regularly exceed 30 seconds. This allows operators to
86/// increase the timeout without recompiling.
87pub fn gateway_request_timeout_secs() -> u64 {
88    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
89        .ok()
90        .and_then(|v| v.parse().ok())
91        .unwrap_or(REQUEST_TIMEOUT_SECS)
92}
93/// Sliding window used by gateway rate limiting.
94pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
95/// Fallback max distinct client keys tracked in gateway rate limiter.
96pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
97/// Fallback max distinct idempotency keys retained in gateway memory.
98pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
99
100fn webhook_memory_key() -> String {
101    format!("webhook_msg_{}", Uuid::new_v4())
102}
103
104fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105    format!("whatsapp_{}_{}", msg.sender, msg.id)
106}
107
108fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109    format!("linq_{}_{}", msg.sender, msg.id)
110}
111
112fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113    format!("wati_{}_{}", msg.sender, msg.id)
114}
115
116fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
117    format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
118}
119
120fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
121    match &msg.thread_ts {
122        Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
123        None => format!("{channel}_{}", msg.sender),
124    }
125}
126
127fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
128    headers
129        .get("X-Session-Id")
130        .and_then(|v| v.to_str().ok())
131        .map(str::trim)
132        .filter(|value| !value.is_empty())
133        .map(str::to_owned)
134}
135
136fn hash_webhook_secret(value: &str) -> String {
137    use sha2::{Digest, Sha256};
138
139    let digest = Sha256::digest(value.as_bytes());
140    hex::encode(digest)
141}
142
143/// How often the rate limiter sweeps stale IP entries from its map.
144const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
145
146#[derive(Debug)]
147struct SlidingWindowRateLimiter {
148    limit_per_window: u32,
149    window: Duration,
150    max_keys: usize,
151    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
152}
153
154impl SlidingWindowRateLimiter {
155    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
156        Self {
157            limit_per_window,
158            window,
159            max_keys: max_keys.max(1),
160            requests: Mutex::new((HashMap::new(), Instant::now())),
161        }
162    }
163
164    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
165        requests.retain(|_, timestamps| {
166            timestamps.retain(|t| *t > cutoff);
167            !timestamps.is_empty()
168        });
169    }
170
171    fn allow(&self, key: &str) -> bool {
172        if self.limit_per_window == 0 {
173            return true;
174        }
175
176        let now = Instant::now();
177        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
178
179        let mut guard = self.requests.lock();
180        let (requests, last_sweep) = &mut *guard;
181
182        // Periodic sweep: remove keys with no recent requests
183        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
184            Self::prune_stale(requests, cutoff);
185            *last_sweep = now;
186        }
187
188        if !requests.contains_key(key) && requests.len() >= self.max_keys {
189            // Opportunistic stale cleanup before eviction under cardinality pressure.
190            Self::prune_stale(requests, cutoff);
191            *last_sweep = now;
192
193            if requests.len() >= self.max_keys {
194                let evict_key = requests
195                    .iter()
196                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
197                    .map(|(k, _)| k.clone());
198                if let Some(evict_key) = evict_key {
199                    requests.remove(&evict_key);
200                }
201            }
202        }
203
204        let entry = requests.entry(key.to_owned()).or_default();
205        entry.retain(|instant| *instant > cutoff);
206
207        if entry.len() >= self.limit_per_window as usize {
208            return false;
209        }
210
211        entry.push(now);
212        true
213    }
214}
215
216#[derive(Debug)]
217pub struct GatewayRateLimiter {
218    pair: SlidingWindowRateLimiter,
219    webhook: SlidingWindowRateLimiter,
220}
221
222impl GatewayRateLimiter {
223    fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
224        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
225        Self {
226            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
227            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
228        }
229    }
230
231    fn allow_pair(&self, key: &str) -> bool {
232        self.pair.allow(key)
233    }
234
235    fn allow_webhook(&self, key: &str) -> bool {
236        self.webhook.allow(key)
237    }
238}
239
240#[derive(Debug)]
241pub struct IdempotencyStore {
242    ttl: Duration,
243    max_keys: usize,
244    keys: Mutex<HashMap<String, Instant>>,
245}
246
247impl IdempotencyStore {
248    fn new(ttl: Duration, max_keys: usize) -> Self {
249        Self {
250            ttl,
251            max_keys: max_keys.max(1),
252            keys: Mutex::new(HashMap::new()),
253        }
254    }
255
256    /// Returns true if this key is new and is now recorded.
257    fn record_if_new(&self, key: &str) -> bool {
258        let now = Instant::now();
259        let mut keys = self.keys.lock();
260
261        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
262
263        if keys.contains_key(key) {
264            return false;
265        }
266
267        if keys.len() >= self.max_keys {
268            let evict_key = keys
269                .iter()
270                .min_by_key(|(_, seen_at)| *seen_at)
271                .map(|(k, _)| k.clone());
272            if let Some(evict_key) = evict_key {
273                keys.remove(&evict_key);
274            }
275        }
276
277        keys.insert(key.to_owned(), now);
278        true
279    }
280}
281
282fn parse_client_ip(value: &str) -> Option<IpAddr> {
283    let value = value.trim().trim_matches('"').trim();
284    if value.is_empty() {
285        return None;
286    }
287
288    if let Ok(ip) = value.parse::<IpAddr>() {
289        return Some(ip);
290    }
291
292    if let Ok(addr) = value.parse::<SocketAddr>() {
293        return Some(addr.ip());
294    }
295
296    let value = value.trim_matches(['[', ']']);
297    value.parse::<IpAddr>().ok()
298}
299
300fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
301    // Read the rightmost X-Forwarded-For hop. Proxies APPEND to XFF, so the
302    // leftmost value is supplied by the client (attacker-controlled) while the
303    // rightmost was written by the immediate upstream proxy we are trusting.
304    // Taking the leftmost would let any caller spoof an arbitrary source IP.
305    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
306        for candidate in xff.rsplit(',') {
307            if let Some(ip) = parse_client_ip(candidate) {
308                return Some(ip);
309            }
310        }
311    }
312
313    headers
314        .get("X-Real-IP")
315        .and_then(|v| v.to_str().ok())
316        .and_then(parse_client_ip)
317}
318
319pub(super) fn client_key_from_request(
320    peer_addr: Option<SocketAddr>,
321    headers: &HeaderMap,
322    trust_forwarded_headers: bool,
323) -> String {
324    if trust_forwarded_headers {
325        if let Some(ip) = forwarded_client_ip(headers) {
326            return ip.to_string();
327        }
328    }
329
330    peer_addr
331        .map(|addr| addr.ip().to_string())
332        .unwrap_or_else(|| "unknown".to_string())
333}
334
335fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
336    if configured == 0 {
337        fallback.max(1)
338    } else {
339        configured
340    }
341}
342
343/// Shared state for all axum handlers
344#[derive(Clone)]
345pub struct AppState {
346    pub config: Arc<Mutex<Config>>,
347    pub provider: Arc<dyn Provider>,
348    pub model: String,
349    pub temperature: f64,
350    pub mem: Arc<dyn Memory>,
351    pub auto_save: bool,
352    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
353    pub webhook_secret_hash: Option<Arc<str>>,
354    pub pairing: Arc<PairingGuard>,
355    pub trust_forwarded_headers: bool,
356    pub rate_limiter: Arc<GatewayRateLimiter>,
357    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
358    pub idempotency_store: Arc<IdempotencyStore>,
359    pub whatsapp: Option<Arc<WhatsAppChannel>>,
360    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
361    pub whatsapp_app_secret: Option<Arc<str>>,
362    pub linq: Option<Arc<LinqChannel>>,
363    /// Linq webhook signing secret for signature verification
364    pub linq_signing_secret: Option<Arc<str>>,
365    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
366    /// Nextcloud Talk webhook secret for signature verification
367    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
368    pub wati: Option<Arc<WatiChannel>>,
369    /// Gmail Pub/Sub push notification channel
370    pub gmail_push: Option<Arc<GmailPushChannel>>,
371    /// Observability backend for metrics scraping
372    pub observer: Arc<dyn crate::observability::Observer>,
373    /// Registered tool specs (for web dashboard tools page)
374    pub tools_registry: Arc<Vec<ToolSpec>>,
375    /// Cost tracker (optional, for web dashboard cost page)
376    pub cost_tracker: Option<Arc<CostTracker>>,
377    /// Audit logger (optional, for web dashboard audit viewer)
378    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
379    /// SSE broadcast channel for real-time events
380    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
381    /// Shutdown signal sender for graceful shutdown
382    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
383    /// Registry of dynamically connected nodes
384    pub node_registry: Arc<nodes::NodeRegistry>,
385    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
386    pub path_prefix: String,
387    /// Session backend for persisting gateway WS chat sessions
388    pub session_backend: Option<Arc<dyn SessionBackend>>,
389    /// Per-session actor queue for serializing concurrent turns
390    pub session_queue: Arc<session_queue::SessionActorQueue>,
391    /// Device registry for paired device management
392    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
393    /// Pending pairing request store
394    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
395    /// Shared canvas store for Live Canvas (A2UI) system
396    pub canvas_store: CanvasStore,
397    /// MCP registry for direct tool invocation from HTTP handlers (memory graph, etc.)
398    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
399    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
400    #[cfg(feature = "webauthn")]
401    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
402    /// Registry of pending human approval requests from workflow runs
403    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
404    /// Base URL (e.g. `http://127.0.0.1:60004`) of the in-process MCP server,
405    /// used by gateway reverse-proxy handlers in [`api_mcp`]. `None` if the
406    /// MCP server failed to bind — proxy handlers should then return 503.
407    /// Populated after MCP server bind in [`run_gateway`].
408    pub mcp_local_url: Option<Arc<str>>,
409}
410
411/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
412#[allow(clippy::too_many_lines)]
413pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
414    // ── Security: warn on public bind without tunnel or explicit opt-in ──
415    if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
416    {
417        tracing::warn!(
418            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
419             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
420             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
421             Docker/VM: if you are running inside a container or VM, this is expected."
422        );
423    }
424    let config_state = Arc::new(Mutex::new(config.clone()));
425
426    // ── Hooks ──────────────────────────────────────────────────────
427    let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
428        Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
429    } else {
430        None
431    };
432
433    let addr: SocketAddr = format!("{host}:{port}").parse()?;
434    let listener = tokio::net::TcpListener::bind(addr).await?;
435    let actual_port = listener.local_addr()?.port();
436    let display_addr = format!("{host}:{actual_port}");
437
438    let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
439        config.default_provider.as_deref().unwrap_or("openrouter"),
440        config.api_key.as_deref(),
441        config.api_url.as_deref(),
442        &config.reliability,
443        &providers::ProviderRuntimeOptions {
444            auth_profile_override: None,
445            provider_api_url: config.api_url.clone(),
446            construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
447            secrets_encrypt: config.secrets.encrypt,
448            reasoning_enabled: config.runtime.reasoning_enabled,
449            reasoning_effort: config.runtime.reasoning_effort.clone(),
450            provider_timeout_secs: Some(config.provider_timeout_secs),
451            extra_headers: config.extra_headers.clone(),
452            api_path: config.api_path.clone(),
453            provider_max_tokens: config.provider_max_tokens,
454        },
455    )?);
456    let model = config
457        .default_model
458        .clone()
459        .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
460    let temperature = config.default_temperature;
461    let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
462        &config.memory,
463        &config.embedding_routes,
464        Some(&config.storage.provider.config),
465        &config.workspace_dir,
466        config.api_key.as_deref(),
467    )?);
468    let runtime: Arc<dyn runtime::RuntimeAdapter> =
469        Arc::from(runtime::create_runtime(&config.runtime)?);
470    let security = Arc::new(SecurityPolicy::from_config(
471        &config.autonomy,
472        &config.workspace_dir,
473    ));
474
475    let (composio_key, composio_entity_id) = if config.composio.enabled {
476        (
477            config.composio.api_key.as_deref(),
478            Some(config.composio.entity_id.as_str()),
479        )
480    } else {
481        (None, None)
482    };
483
484    let canvas_store = tools::canvas::global_store();
485
486    let (
487        mut tools_registry_raw,
488        delegate_handle_gw,
489        _reaction_handle_gw,
490        _channel_map_handle,
491        _ask_user_handle_gw,
492        _escalate_handle_gw,
493    ) = tools::all_tools_with_runtime(
494        Arc::new(config.clone()),
495        &security,
496        runtime,
497        Arc::clone(&mem),
498        composio_key,
499        composio_entity_id,
500        &config.browser,
501        &config.http_request,
502        &config.web_fetch,
503        &config.workspace_dir,
504        &config.agents,
505        config.api_key.as_deref(),
506        &config,
507        Some(canvas_store.clone()),
508    );
509
510    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
511    // Without this, the `/api/tools` endpoint misses MCP tools.
512    // Inject operator + kumiho MCP server configs (same as agent/channels do)
513    // so the gateway can call operator tools directly from HTTP handlers.
514    let gateway_mcp_config = {
515        let mut c = config.clone();
516        c = crate::agent::kumiho::inject_kumiho(c, false);
517        c = crate::agent::operator::inject_operator(c, false);
518        c.mcp
519    };
520    let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
521    if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
522        tracing::info!(
523            "Gateway: initializing MCP client — {} server(s) configured",
524            gateway_mcp_config.servers.len()
525        );
526        match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
527            Ok(registry) => {
528                let registry = std::sync::Arc::new(registry);
529                mcp_registry_shared = Some(std::sync::Arc::clone(&registry));
530                if gateway_mcp_config.deferred_loading {
531                    let operator_prefix =
532                        format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
533                    let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
534                    let all_names = registry.tool_names();
535                    let mut eager_count = 0usize;
536
537                    for name in &all_names {
538                        if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
539                            if let Some(def) = registry.get_tool_def(name).await {
540                                let wrapper: std::sync::Arc<dyn tools::Tool> =
541                                    std::sync::Arc::new(tools::McpToolWrapper::new(
542                                        name.clone(),
543                                        def,
544                                        std::sync::Arc::clone(&registry),
545                                    ));
546                                if let Some(ref handle) = delegate_handle_gw {
547                                    handle.write().push(std::sync::Arc::clone(&wrapper));
548                                }
549                                tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
550                                eager_count += 1;
551                            }
552                        }
553                    }
554
555                    let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
556                        std::sync::Arc::clone(&registry),
557                        |name| {
558                            !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
559                        },
560                    )
561                    .await;
562                    tracing::info!(
563                        "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
564                        eager_count,
565                        deferred_set.len(),
566                        registry.server_count()
567                    );
568                    let activated =
569                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
570                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
571                        deferred_set,
572                        activated,
573                    )));
574                } else {
575                    let names = registry.tool_names();
576                    let mut registered = 0usize;
577                    for name in names {
578                        if let Some(def) = registry.get_tool_def(&name).await {
579                            let wrapper: std::sync::Arc<dyn tools::Tool> =
580                                std::sync::Arc::new(tools::McpToolWrapper::new(
581                                    name,
582                                    def,
583                                    std::sync::Arc::clone(&registry),
584                                ));
585                            if let Some(ref handle) = delegate_handle_gw {
586                                handle.write().push(std::sync::Arc::clone(&wrapper));
587                            }
588                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
589                            registered += 1;
590                        }
591                    }
592                    tracing::info!(
593                        "Gateway MCP: {} tool(s) registered from {} server(s)",
594                        registered,
595                        registry.server_count()
596                    );
597                }
598            }
599            Err(e) => {
600                tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
601            }
602        }
603    }
604
605    let tools_registry: Arc<Vec<ToolSpec>> =
606        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
607
608    // Cost tracker — process-global singleton so channels share the same instance
609    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
610
611    // Audit logger — optional, for dashboard audit viewer
612    let audit_logger = if config.security.audit.enabled {
613        match crate::security::audit::AuditLogger::new(
614            config.security.audit.clone(),
615            std::path::PathBuf::from(&config.workspace_dir),
616        ) {
617            Ok(logger) => Some(Arc::new(logger)),
618            Err(e) => {
619                tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
620                None
621            }
622        }
623    } else {
624        None
625    };
626
627    // SSE broadcast channel for real-time events
628    let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
629    // Extract webhook secret for authentication
630    let webhook_secret_hash: Option<Arc<str>> =
631        config.channels_config.webhook.as_ref().and_then(|webhook| {
632            webhook.secret.as_ref().and_then(|raw_secret| {
633                let trimmed_secret = raw_secret.trim();
634                (!trimmed_secret.is_empty())
635                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
636            })
637        });
638
639    // WhatsApp channel (if configured)
640    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
641        .channels_config
642        .whatsapp
643        .as_ref()
644        .filter(|wa| wa.is_cloud_config())
645        .map(|wa| {
646            Arc::new(WhatsAppChannel::new(
647                wa.access_token.clone().unwrap_or_default(),
648                wa.phone_number_id.clone().unwrap_or_default(),
649                wa.verify_token.clone().unwrap_or_default(),
650                wa.allowed_numbers.clone(),
651            ))
652        });
653
654    // WhatsApp app secret for webhook signature verification
655    // Priority: environment variable > config file
656    let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
657        .ok()
658        .and_then(|secret| {
659            let secret = secret.trim();
660            (!secret.is_empty()).then(|| secret.to_owned())
661        })
662        .or_else(|| {
663            config.channels_config.whatsapp.as_ref().and_then(|wa| {
664                wa.app_secret
665                    .as_deref()
666                    .map(str::trim)
667                    .filter(|secret| !secret.is_empty())
668                    .map(ToOwned::to_owned)
669            })
670        })
671        .map(Arc::from);
672
673    // Linq channel (if configured)
674    let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
675        Arc::new(LinqChannel::new(
676            lq.api_token.clone(),
677            lq.from_phone.clone(),
678            lq.allowed_senders.clone(),
679        ))
680    });
681
682    // Linq signing secret for webhook signature verification
683    // Priority: environment variable > config file
684    let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
685        .ok()
686        .and_then(|secret| {
687            let secret = secret.trim();
688            (!secret.is_empty()).then(|| secret.to_owned())
689        })
690        .or_else(|| {
691            config.channels_config.linq.as_ref().and_then(|lq| {
692                lq.signing_secret
693                    .as_deref()
694                    .map(str::trim)
695                    .filter(|secret| !secret.is_empty())
696                    .map(ToOwned::to_owned)
697            })
698        })
699        .map(Arc::from);
700
701    // WATI channel (if configured)
702    let wati_channel: Option<Arc<WatiChannel>> =
703        config.channels_config.wati.as_ref().map(|wati_cfg| {
704            Arc::new(
705                WatiChannel::new(
706                    wati_cfg.api_token.clone(),
707                    wati_cfg.api_url.clone(),
708                    wati_cfg.tenant_id.clone(),
709                    wati_cfg.allowed_numbers.clone(),
710                )
711                .with_transcription(config.transcription.clone()),
712            )
713        });
714
715    // Nextcloud Talk channel (if configured)
716    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
717        config.channels_config.nextcloud_talk.as_ref().map(|nc| {
718            Arc::new(NextcloudTalkChannel::new(
719                nc.base_url.clone(),
720                nc.app_token.clone(),
721                nc.bot_name.clone().unwrap_or_default(),
722                nc.allowed_users.clone(),
723            ))
724        });
725
726    // Nextcloud Talk webhook secret for signature verification
727    // Priority: environment variable > config file
728    let nextcloud_talk_webhook_secret: Option<Arc<str>> =
729        std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
730            .ok()
731            .and_then(|secret| {
732                let secret = secret.trim();
733                (!secret.is_empty()).then(|| secret.to_owned())
734            })
735            .or_else(|| {
736                config
737                    .channels_config
738                    .nextcloud_talk
739                    .as_ref()
740                    .and_then(|nc| {
741                        nc.webhook_secret
742                            .as_deref()
743                            .map(str::trim)
744                            .filter(|secret| !secret.is_empty())
745                            .map(ToOwned::to_owned)
746                    })
747            })
748            .map(Arc::from);
749
750    // Gmail Push channel (if configured and enabled)
751    let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
752        .channels_config
753        .gmail_push
754        .as_ref()
755        .filter(|gp| gp.enabled)
756        .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
757
758    // ── Session persistence for WS chat ─────────────────────
759    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
760        match SqliteSessionBackend::new(&config.workspace_dir) {
761            Ok(b) => {
762                tracing::info!("Gateway session persistence enabled (SQLite)");
763                if config.gateway.session_ttl_hours > 0 {
764                    if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
765                        if cleaned > 0 {
766                            tracing::info!("Cleaned up {cleaned} stale gateway sessions");
767                        }
768                    }
769                }
770                Some(Arc::new(b))
771            }
772            Err(e) => {
773                tracing::warn!("Session persistence disabled: {e}");
774                None
775            }
776        }
777    } else {
778        None
779    };
780
781    // ── Pairing guard ──────────────────────────────────────
782    let pairing = Arc::new(PairingGuard::new(
783        config.gateway.require_pairing,
784        &config.gateway.paired_tokens,
785    ));
786    let rate_limit_max_keys = normalize_max_keys(
787        config.gateway.rate_limit_max_keys,
788        RATE_LIMIT_MAX_KEYS_DEFAULT,
789    );
790    let rate_limiter = Arc::new(GatewayRateLimiter::new(
791        config.gateway.pair_rate_limit_per_minute,
792        config.gateway.webhook_rate_limit_per_minute,
793        rate_limit_max_keys,
794    ));
795    let idempotency_max_keys = normalize_max_keys(
796        config.gateway.idempotency_max_keys,
797        IDEMPOTENCY_MAX_KEYS_DEFAULT,
798    );
799    let idempotency_store = Arc::new(IdempotencyStore::new(
800        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
801        idempotency_max_keys,
802    ));
803
804    // Resolve optional path prefix for reverse-proxy deployments.
805    let path_prefix: Option<&str> = config
806        .gateway
807        .path_prefix
808        .as_deref()
809        .filter(|p| !p.is_empty());
810
811    // ── Tunnel ────────────────────────────────────────────────
812    let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
813    let mut tunnel_url: Option<String> = None;
814
815    if let Some(ref tun) = tunnel {
816        println!("🔗 Starting {} tunnel...", tun.name());
817        match tun.start(host, actual_port).await {
818            Ok(url) => {
819                println!("🌐 Tunnel active: {url}");
820                tunnel_url = Some(url);
821            }
822            Err(e) => {
823                println!("⚠️  Tunnel failed to start: {e}");
824                println!("   Falling back to local-only mode.");
825            }
826        }
827    }
828
829    let pfx = path_prefix.unwrap_or("");
830    println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
831    if let Some(ref url) = tunnel_url {
832        println!("  🌐 Public URL: {url}");
833    }
834    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
835    if let Some(code) = pairing.pairing_code() {
836        println!();
837        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
838        println!("     ┌──────────────┐");
839        println!("     │  {code}  │");
840        println!("     └──────────────┘");
841        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
842    } else if pairing.require_pairing() {
843        println!("  🔒 Pairing: ACTIVE (bearer token required)");
844        println!("     To pair a new device: construct gateway get-paircode --new");
845        println!();
846    } else {
847        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
848        println!();
849    }
850    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
851    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
852    if whatsapp_channel.is_some() {
853        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
854        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
855    }
856    if linq_channel.is_some() {
857        println!("  POST {pfx}/linq      — Linq message webhook (iMessage/RCS/SMS)");
858    }
859    if wati_channel.is_some() {
860        println!("  GET  {pfx}/wati      — WATI webhook verification");
861        println!("  POST {pfx}/wati      — WATI message webhook");
862    }
863    if nextcloud_talk_channel.is_some() {
864        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
865    }
866    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
867    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
868    if config.nodes.enabled {
869        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
870    }
871    println!("  GET  {pfx}/health    — health check");
872    println!("  GET  {pfx}/metrics   — Prometheus metrics");
873    println!("  Press Ctrl+C to stop.\n");
874
875    crate::health::mark_component_ok("gateway");
876
877    // Fire gateway start hook
878    if let Some(ref hooks) = hooks {
879        hooks.fire_gateway_start(host, actual_port).await;
880    }
881
882    // Wrap observer with broadcast capability for SSE
883    let broadcast_observer: Arc<dyn crate::observability::Observer> =
884        Arc::new(sse::BroadcastObserver::new(
885            crate::observability::create_observer(&config.observability),
886            event_tx.clone(),
887        ));
888
889    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
890
891    // Node registry for dynamic node discovery
892    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
893
894    // Device registry and pairing store (only when pairing is required)
895    let device_registry = if config.gateway.require_pairing {
896        Some(Arc::new(api_pairing::DeviceRegistry::new(
897            &config.workspace_dir,
898        )?))
899    } else {
900        None
901    };
902    let pending_pairings = if config.gateway.require_pairing {
903        Some(Arc::new(api_pairing::PairingStore::new(
904            config.gateway.pairing_dashboard.max_pending_codes,
905        )))
906    } else {
907        None
908    };
909
910    // ── Build RuntimeHandles for the in-process MCP server ──────────────
911    //
912    // The MCP task needs live handles so the tools the standalone binary
913    // used to skip (workspace, channel-bound tools, session_store, discord
914    // memory, delegate, …) register properly. We clone individual Arcs out
915    // of the pieces we already built above and feed them in.
916    //
917    // Tools that keep shared handles (reaction/ask_user/escalate/poll) are
918    // easiest to wire by handing their already-built, channel-map-aware
919    // instances over via `pre_built_tools`. We do this by building a second
920    // parallel Arc-vec via `all_tools_with_runtime` — the per-tool state is
921    // stateless for registry purposes, and delegate/swarm get their own
922    // per-call depth counters anyway.
923    let mcp_workspace_dir = config.workspace_dir.clone();
924    let mcp_config_snapshot = config.clone();
925    let mcp_runtime_handles = {
926        let mut rh = crate::mcp_server::RuntimeHandles::empty();
927
928        // Workspace manager (if workspace isolation is on).
929        if config.workspace.enabled {
930            let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
931                let home = directories::UserDirs::new()
932                    .map(|u| u.home_dir().to_path_buf())
933                    .unwrap_or_else(|| std::path::PathBuf::from("."));
934                home.join(&config.workspace.workspaces_dir[2..])
935            } else {
936                std::path::PathBuf::from(&config.workspace.workspaces_dir)
937            };
938            let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
939            rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
940        }
941
942        // Session backend from channels::session_store::SessionStore.
943        if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
944        {
945            let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
946                Arc::new(store);
947            rh.session_store = Some(backend);
948        }
949
950        // Discord memory (formerly for discord_search) — removed with the
951        // SQLite backend; persistent cross-session memory should use Kumiho MCP.
952
953        // Agent config + provider options for delegate / swarm.
954        if !config.agents.is_empty() {
955            rh.agent_config = Some(Arc::new(config.agents.clone()));
956            rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
957            rh.provider_runtime_options =
958                Some(Arc::new(crate::providers::ProviderRuntimeOptions {
959                    auth_profile_override: None,
960                    provider_api_url: config.api_url.clone(),
961                    construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
962                    secrets_encrypt: config.secrets.encrypt,
963                    reasoning_enabled: config.runtime.reasoning_enabled,
964                    reasoning_effort: config.runtime.reasoning_effort.clone(),
965                    provider_timeout_secs: Some(config.provider_timeout_secs),
966                    extra_headers: config.extra_headers.clone(),
967                    api_path: config.api_path.clone(),
968                    provider_max_tokens: config.provider_max_tokens,
969                }));
970        }
971
972        // Build a second parallel tool set and hand it to MCP via
973        // `pre_built_tools` so every channel-aware / delegate tool lands in
974        // the MCP registry fully wired (even though its ChannelMap handle is
975        // distinct from the gateway's — the channel supervisor's populate
976        // path still routes through the gateway's tool instances; the MCP
977        // versions advertise and execute independently).
978        let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
979            Arc::new(config.clone()),
980            &security,
981            Arc::new(crate::runtime::NativeRuntime::new()),
982            Arc::clone(&mem),
983            composio_key,
984            composio_entity_id,
985            &config.browser,
986            &config.http_request,
987            &config.web_fetch,
988            &config.workspace_dir,
989            &config.agents,
990            config.api_key.as_deref(),
991            &config,
992            Some(canvas_store.clone()),
993        );
994        // Convert Box<dyn Tool> to Arc<dyn Tool>.
995        let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
996            .into_iter()
997            .map(|b| Arc::<dyn tools::Tool>::from(b))
998            .collect();
999        rh.pre_built_tools = Some(mcp_tool_arcs);
1000
1001        rh
1002    };
1003
1004    let mut state = AppState {
1005        config: config_state,
1006        provider,
1007        model,
1008        temperature,
1009        mem,
1010        auto_save: config.memory.auto_save,
1011        webhook_secret_hash,
1012        pairing,
1013        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1014        rate_limiter,
1015        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1016        idempotency_store,
1017        whatsapp: whatsapp_channel,
1018        whatsapp_app_secret,
1019        linq: linq_channel,
1020        linq_signing_secret,
1021        nextcloud_talk: nextcloud_talk_channel,
1022        nextcloud_talk_webhook_secret,
1023        wati: wati_channel,
1024        gmail_push: gmail_push_channel,
1025        observer: broadcast_observer,
1026        tools_registry,
1027        cost_tracker,
1028        audit_logger,
1029        event_tx,
1030        shutdown_tx,
1031        node_registry,
1032        session_backend,
1033        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1034        device_registry,
1035        pending_pairings,
1036        path_prefix: path_prefix.unwrap_or("").to_string(),
1037        canvas_store,
1038        mcp_registry: mcp_registry_shared,
1039        approval_registry: approval_registry::global(),
1040        // Populated after the in-process MCP server binds (see below).
1041        mcp_local_url: None,
1042        #[cfg(feature = "webauthn")]
1043        webauthn: if config.security.webauthn.enabled {
1044            let secret_store = Arc::new(crate::security::SecretStore::new(
1045                &config.workspace_dir,
1046                true,
1047            ));
1048            let wa_config = crate::security::webauthn::WebAuthnConfig {
1049                enabled: true,
1050                rp_id: config.security.webauthn.rp_id.clone(),
1051                rp_origin: config.security.webauthn.rp_origin.clone(),
1052                rp_name: config.security.webauthn.rp_name.clone(),
1053            };
1054            Some(Arc::new(api_webauthn::WebAuthnState {
1055                manager: crate::security::webauthn::WebAuthnManager::new(
1056                    wa_config,
1057                    secret_store,
1058                    &config.workspace_dir,
1059                ),
1060                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1061                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1062            }))
1063        } else {
1064            None
1065        },
1066    };
1067
1068    // ── Skill effectiveness cache ───────────────────────────────────────
1069    //
1070    // Build a process-wide cache of recency-weighted skill outcomes so the
1071    // channels system-prompt builder can rerank skills before injecting
1072    // them into agent prompts.  See `src/skills/effectiveness_cache.rs`.
1073    //
1074    // On startup we:
1075    //   1. Construct an empty `Arc<EffectivenessCache>`.
1076    //   2. Install it as the process-wide global so
1077    //      `effectiveness_cache::global_provider()` returns it.
1078    //   3. Spawn a background task that refreshes scores from Kumiho every
1079    //      `DEFAULT_REFRESH_INTERVAL` (5 minutes).  Until the first refresh
1080    //      completes the cache is empty and skills inject in their static
1081    //      load order — same as before this feature shipped.
1082    {
1083        let effectiveness_cache = crate::skills::EffectivenessCache::new();
1084        let _ = crate::skills::effectiveness_cache::set_global(effectiveness_cache.clone());
1085
1086        let memory_project = config.kumiho.memory_project.clone();
1087        let workspace_dir = config.workspace_dir.clone();
1088        let config_for_skills = config.clone();
1089
1090        // Daemon-startup skill registration scan.
1091        //
1092        // Walk every directory under `<workspace>/skills/` and register
1093        // any skill whose SKILL.toml is missing `[skill].kref`.  This is
1094        // the safety-net that catches skills installed without a Kumiho
1095        // round-trip (CLI install with Kumiho unreachable, manual file
1096        // drop, skills materialised by a sibling tool).  Each call is
1097        // idempotent — already-registered skills are a no-op cheap.
1098        // The scan runs in the background so a slow Kumiho doesn't
1099        // delay gateway readiness.
1100        {
1101            let scan_workspace = workspace_dir.clone();
1102            let scan_project = memory_project.clone();
1103            let scan_client = crate::gateway::kumiho_client::build_client_from_config(&config);
1104            drop(tokio::spawn(async move {
1105                let skills_root = crate::skills::skills_dir(&scan_workspace);
1106                let mut entries = match tokio::fs::read_dir(&skills_root).await {
1107                    Ok(e) => e,
1108                    Err(_) => return,
1109                };
1110                while let Ok(Some(entry)) = entries.next_entry().await {
1111                    let dir = entry.path();
1112                    if !dir.is_dir() {
1113                        continue;
1114                    }
1115                    if !dir.join("SKILL.toml").exists() {
1116                        continue;
1117                    }
1118                    let registration_ok =
1119                        match crate::skills::registration::register_skill_with_kumiho(
1120                            &dir,
1121                            &scan_client,
1122                            &scan_project,
1123                        )
1124                        .await
1125                        {
1126                            Ok(crate::skills::registration::SkillRegistration::Registered {
1127                                kref,
1128                                ..
1129                            }) => {
1130                                tracing::info!(
1131                                    skill_dir = %dir.display(),
1132                                    kref,
1133                                    "daemon-startup: registered skill with Kumiho",
1134                                );
1135                                true
1136                            }
1137                            Ok(
1138                                crate::skills::registration::SkillRegistration::AlreadyRegistered {
1139                                    ..
1140                                },
1141                            ) => {
1142                                // No-op — common case after first run.
1143                                true
1144                            }
1145                            Err(e) => {
1146                                tracing::warn!(
1147                                    skill_dir = %dir.display(),
1148                                    error = ?e,
1149                                    "daemon-startup: skill registration failed; will retry next start",
1150                                );
1151                                false
1152                            }
1153                        };
1154
1155                    // Step 6d: keep SKILL.toml's content_file consistent
1156                    // with whatever revision currently holds the
1157                    // `published` tag.  Catches improvements that
1158                    // landed while the daemon was offline.  No-op when
1159                    // already in sync.  Skipped when registration just
1160                    // failed because the kref isn't valid yet.
1161                    if !registration_ok {
1162                        continue;
1163                    }
1164                    match crate::skills::registration::sync_published_content_path(
1165                        &dir,
1166                        &scan_client,
1167                    )
1168                    .await
1169                    {
1170                        Ok(crate::skills::registration::SkillContentSync::Updated {
1171                            new_content_file,
1172                            ..
1173                        }) => {
1174                            tracing::info!(
1175                                skill_dir = %dir.display(),
1176                                content_file = %new_content_file,
1177                                "daemon-startup: synced skill content_file from published kref",
1178                            );
1179                        }
1180                        Ok(_) => {
1181                            // NotRegistered (we just registered it) or
1182                            // AlreadyCurrent — both silent.
1183                        }
1184                        Err(e) => {
1185                            tracing::warn!(
1186                                skill_dir = %dir.display(),
1187                                error = ?e,
1188                                "daemon-startup: skill content_file sync failed; \
1189                                 loader will use existing pointer until next sync",
1190                            );
1191                        }
1192                    }
1193                }
1194            }));
1195        }
1196
1197        // Load the skill names once at startup — the refresh task only
1198        // queries Kumiho for skills already loaded into the runtime, so a
1199        // brand-new skill added later won't be reranked until restart.
1200        let skill_names: Vec<String> =
1201            crate::skills::load_skills_with_config(&workspace_dir, &config_for_skills)
1202                .into_iter()
1203                .map(|s| s.name)
1204                .collect();
1205
1206        if skill_names.is_empty() {
1207            tracing::info!("skill effectiveness: no skills loaded; refresh task skipped");
1208        } else {
1209            let kumiho_client = Arc::new(crate::gateway::kumiho_client::build_client_from_config(
1210                &config,
1211            ));
1212            tracing::info!(
1213                count = skill_names.len(),
1214                project = %memory_project,
1215                "skill effectiveness: starting background refresh task",
1216            );
1217            // JoinHandle is dropped (refresh task lives until process exit).
1218            // Drop the JoinHandle — task is fire-and-forget and lives
1219            // until process exit.  `drop()` rather than `let _ =` to
1220            // silence clippy::let_underscore_future (the JoinHandle is
1221            // a Future, but dropping it does not cancel a tokio task).
1222            // Future improvement: store the handle on AppState and
1223            // abort on shutdown so tests don't leak the task.
1224            drop(effectiveness_cache.clone().spawn_refresh_task(
1225                kumiho_client.clone(),
1226                memory_project,
1227                skill_names,
1228                crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1229            ));
1230
1231            // ── Auto-improve loop (closes the self-improvement loop) ─────
1232            //
1233            // On the same cadence as the cache refresh, walk the latest
1234            // candidates and ask the LLM to rewrite each regressed skill.
1235            // Cooldowns + atomic-write live inside SkillImprover, so this
1236            // task is just the dispatcher.  Skipped entirely when the
1237            // skill-creation feature is disabled, when the user has
1238            // turned auto-improvement off in config, or when the daemon
1239            // has no chat provider configured.
1240            #[cfg(feature = "skill-creation")]
1241            if config.skills.skill_improvement.enabled {
1242                let cache_for_improver = effectiveness_cache.clone();
1243                let auto_workspace = workspace_dir.clone();
1244                let auto_provider = state.provider.clone();
1245                let auto_model = state.model.clone();
1246                let auto_improver_config = config.skills.skill_improvement.clone();
1247                let auto_kumiho = kumiho_client.clone();
1248                let auto_project = config.kumiho.memory_project.clone();
1249
1250                tracing::info!(
1251                    cooldown_secs = auto_improver_config.cooldown_secs,
1252                    "skill auto-improve: starting LLM-driven rewrite loop",
1253                );
1254
1255                drop(tokio::spawn(async move {
1256                    let ctx = crate::skills::auto_improve::AutoImproveContext {
1257                        workspace_dir: auto_workspace.clone(),
1258                        provider: auto_provider,
1259                        model: auto_model,
1260                        temperature: crate::skills::auto_improve::DEFAULT_REWRITE_TEMPERATURE,
1261                        kumiho_client: auto_kumiho.clone(),
1262                        memory_project: auto_project.clone(),
1263                    };
1264                    let mut improver = crate::skills::improver::SkillImprover::new(
1265                        auto_workspace.clone(),
1266                        auto_improver_config,
1267                    );
1268
1269                    // Step 6f-B: rollback context shares the workspace
1270                    // and Kumiho client with the improver so a single
1271                    // background loop runs both the improvement pass
1272                    // and the regression-rollback pass on each tick.
1273                    let rollback_ctx = crate::skills::auto_rollback::AutoRollbackContext {
1274                        workspace_dir: auto_workspace.clone(),
1275                        kumiho_client: auto_kumiho,
1276                        memory_project: auto_project,
1277                    };
1278                    let mut rollback_tracker =
1279                        crate::skills::auto_rollback::SkillRollbackTracker::new(auto_workspace);
1280
1281                    let mut ticker = tokio::time::interval(
1282                        crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1283                    );
1284                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1285                    // First tick fires immediately; cache is still empty
1286                    // at startup so improvement_candidates() returns [].
1287                    // Subsequent ticks see whatever the refresh task has
1288                    // populated.
1289
1290                    loop {
1291                        ticker.tick().await;
1292                        for cand in cache_for_improver.improvement_candidates() {
1293                            match crate::skills::auto_improve::attempt_skill_improvement(
1294                                &ctx,
1295                                &cand,
1296                                &mut improver,
1297                            )
1298                            .await
1299                            {
1300                                Ok(Some(outcome)) => tracing::info!(
1301                                    skill = %outcome.slug,
1302                                    revision_kref = %outcome.revision_kref,
1303                                    content_file = %outcome.content_file,
1304                                    rate = cand.rate,
1305                                    total = cand.total,
1306                                    "auto-improve: published new skill revision",
1307                                ),
1308                                Ok(None) => {
1309                                    // cooldown / file missing / no markdown fence — silent skip
1310                                }
1311                                Err(e) => tracing::warn!(
1312                                    skill = %cand.skill_name,
1313                                    error = %e,
1314                                    "auto-improve: attempt failed",
1315                                ),
1316                            }
1317                        }
1318
1319                        // Step 6f-B: regression rollback pass.  Iterates
1320                        // skills whose freshly-published revision is
1321                        // measurably worse than its predecessor and
1322                        // retags `published` back onto the previous
1323                        // revision.  Cooldown is per-skill so a healthy
1324                        // skill that ping-pongs once won't be rolled
1325                        // back again before its stats settle.
1326                        for cand in cache_for_improver.regression_candidates() {
1327                            match crate::skills::auto_rollback::attempt_skill_rollback(
1328                                &rollback_ctx,
1329                                &cand,
1330                                &mut rollback_tracker,
1331                            )
1332                            .await
1333                            {
1334                                Ok(Some(outcome)) => tracing::warn!(
1335                                    skill = %outcome.slug,
1336                                    restored_revision_kref = %outcome.restored_revision_kref,
1337                                    demoted_revision_kref = %outcome.demoted_revision_kref,
1338                                    content_file = %outcome.content_file,
1339                                    current_rate = cand.current_rate,
1340                                    previous_rate = cand.previous_rate,
1341                                    "auto-rollback: reverted regressed skill revision",
1342                                ),
1343                                Ok(None) => {
1344                                    // cooldown / no rollback target — silent skip
1345                                }
1346                                Err(e) => tracing::warn!(
1347                                    skill = %cand.skill_name,
1348                                    error = %e,
1349                                    "auto-rollback: attempt failed",
1350                                ),
1351                            }
1352                        }
1353                    }
1354                }));
1355            }
1356        }
1357    }
1358
1359    // ── Spawn the in-process MCP server ─────────────────────────────────
1360    //
1361    // Binds 127.0.0.1:0 (ephemeral, as external clients expect), writes
1362    // ~/.construct/mcp.json with {url,pid,started_at} atomically, and tears
1363    // down on gateway shutdown. If the bind fails we log and move on —
1364    // this is non-fatal for the gateway itself.
1365    let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1366    let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1367        let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1368            &mcp_workspace_dir,
1369            &mcp_config_snapshot,
1370            &mcp_runtime_handles,
1371        );
1372        for (name, reason) in &mcp_skipped {
1373            tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1374        }
1375        tracing::info!(
1376            "mcp-server: advertising {} tools to external MCP clients",
1377            mcp_state.tools.len()
1378        );
1379
1380        match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1381            Ok(handle) => {
1382                let url = format!("http://{}/mcp", handle.addr);
1383                if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1384                    tracing::warn!("mcp-server: failed to write discovery file: {e}");
1385                } else {
1386                    tracing::info!(
1387                        "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1388                    );
1389                }
1390
1391                // Expose the bound base URL (no `/mcp` suffix) to the gateway
1392                // reverse-proxy handlers in `api_mcp`.
1393                state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1394
1395                let mut watch = mcp_shutdown_watch;
1396                Some(tokio::spawn(async move {
1397                    // Wait for the gateway's shutdown signal.
1398                    loop {
1399                        if *watch.borrow() {
1400                            break;
1401                        }
1402                        if watch.changed().await.is_err() {
1403                            break;
1404                        }
1405                    }
1406                    let _ = handle.shutdown.send(());
1407                    let _ = handle.joined.await;
1408                    crate::mcp_server::server::cleanup_discovery_file();
1409                    tracing::info!("mcp-server: stopped");
1410                }))
1411            }
1412            Err(e) => {
1413                tracing::error!("mcp-server: failed to bind: {e}");
1414                None
1415            }
1416        }
1417    };
1418
1419    // Config PUT needs larger body limit (1MB)
1420    let config_put_router = Router::new()
1421        .route("/api/config", put(api::handle_api_config_put))
1422        .layer(RequestBodyLimitLayer::new(1_048_576));
1423
1424    // Memory graph needs longer timeout (aggregates many Kumiho calls via operator).
1425    // Built as a separate router that gets merged AFTER the global timeout layer.
1426    let memory_graph_router = Router::new()
1427        .route(
1428            "/api/memory/graph",
1429            get(api_memory_graph::handle_memory_graph),
1430        )
1431        .with_state(state.clone())
1432        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1433        .layer(TimeoutLayer::with_status_code(
1434            StatusCode::REQUEST_TIMEOUT,
1435            Duration::from_secs(60),
1436        ));
1437
1438    // Build router with middleware
1439    let inner = Router::new()
1440        // ── Admin routes (for CLI management) ──
1441        .route("/admin/shutdown", post(handle_admin_shutdown))
1442        .route("/admin/paircode", get(handle_admin_paircode))
1443        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1444        // ── Existing routes ──
1445        .route("/health", get(handle_health))
1446        .route("/metrics", get(handle_metrics))
1447        .route("/pair", post(handle_pair))
1448        .route("/pair/code", get(handle_pair_code))
1449        .route("/webhook", post(handle_webhook))
1450        .route("/whatsapp", get(handle_whatsapp_verify))
1451        .route("/whatsapp", post(handle_whatsapp_message))
1452        .route("/linq", post(handle_linq_webhook))
1453        .route("/wati", get(handle_wati_verify))
1454        .route("/wati", post(handle_wati_webhook))
1455        .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1456        .route("/webhook/gmail", post(handle_gmail_push_webhook))
1457        // ── Claude Code runner hooks ──
1458        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1459        // ── Web Dashboard API routes ──
1460        .route("/api/status", get(api::handle_api_status))
1461        .route("/api/config", get(api::handle_api_config_get))
1462        .route("/api/tools", get(api::handle_api_tools))
1463        .route("/api/cron", get(api::handle_api_cron_list))
1464        .route("/api/cron", post(api::handle_api_cron_add))
1465        .route(
1466            "/api/cron/settings",
1467            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1468        )
1469        .route(
1470            "/api/cron/{id}",
1471            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1472        )
1473        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1474        .route("/api/integrations", get(api::handle_api_integrations))
1475        .route(
1476            "/api/integrations/settings",
1477            get(api::handle_api_integrations_settings),
1478        )
1479        .route(
1480            "/api/doctor",
1481            get(api::handle_api_doctor).post(api::handle_api_doctor),
1482        )
1483        // Old /api/memory CRUD removed — use Kumiho via /api/memory/graph instead.
1484        .route("/api/cost", get(api::handle_api_cost))
1485        .route("/api/audit", get(api::handle_api_audit))
1486        .route("/api/audit/verify", get(api::handle_api_audit_verify))
1487        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1488        .route("/api/health", get(api::handle_api_health))
1489        .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1490        .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1491        // ── MCP HTTP reverse-proxy (browser stays same-origin) ──
1492        .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1493        .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1494        .route(
1495            "/api/mcp/session/{session_id}/events",
1496            get(api_mcp::handle_api_mcp_session_events),
1497        )
1498        .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1499        .route("/api/nodes", get(api::handle_api_nodes))
1500        .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1501        .route("/api/sessions", get(api::handle_api_sessions_list))
1502        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1503        .route(
1504            "/api/sessions/{id}/messages",
1505            get(api::handle_api_session_messages),
1506        )
1507        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1508        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1509        // ── Channel detail API ──
1510        .route("/api/channels", get(api::handle_api_channels))
1511        .route("/api/channel-events", post(api::handle_api_channel_events))
1512        // ── Agent management API (proxied to Kumiho FastAPI) ──
1513        .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1514        .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1515        .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1516        // ── Skill management API (proxied to Kumiho FastAPI) ──
1517        .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1518        .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1519        .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1520        // ── Team management API (proxied to Kumiho FastAPI) ──
1521        .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1522        .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1523        .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1524        // ── Workflow management API (proxied to Kumiho FastAPI) ──
1525        .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1526        .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1527        .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1528        .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1529        .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1530        .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1531        .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1532        .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1533        .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1534        .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1535        .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1536        // ── ClawHub marketplace API ──
1537        .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1538        .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1539        .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1540        .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1541        // NOTE: Memory graph route is merged separately with its own 60s timeout
1542        // ── Generic Kumiho API proxy (for Asset Browser, Memory Auditor, etc.) ──
1543        .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1544        // ── Artifact body (serve local file bytes referenced by Kumiho) ──
1545        .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1546        // ── Pairing + Device management API ──
1547        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1548        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1549        .route("/api/devices", get(api_pairing::list_devices))
1550        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1551        .route(
1552            "/api/devices/{id}/token/rotate",
1553            post(api_pairing::rotate_token),
1554        )
1555        // ── Live Canvas (A2UI) routes ──
1556        .route("/api/canvas", get(canvas::handle_canvas_list))
1557        .route(
1558            "/api/canvas/{id}",
1559            get(canvas::handle_canvas_get)
1560                .post(canvas::handle_canvas_post)
1561                .delete(canvas::handle_canvas_clear),
1562        )
1563        .route(
1564            "/api/canvas/{id}/history",
1565            get(canvas::handle_canvas_history),
1566        );
1567
1568    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1569    #[cfg(feature = "webauthn")]
1570    let inner = inner
1571        .route(
1572            "/api/webauthn/register/start",
1573            post(api_webauthn::handle_register_start),
1574        )
1575        .route(
1576            "/api/webauthn/register/finish",
1577            post(api_webauthn::handle_register_finish),
1578        )
1579        .route(
1580            "/api/webauthn/auth/start",
1581            post(api_webauthn::handle_auth_start),
1582        )
1583        .route(
1584            "/api/webauthn/auth/finish",
1585            post(api_webauthn::handle_auth_finish),
1586        )
1587        .route(
1588            "/api/webauthn/credentials",
1589            get(api_webauthn::handle_list_credentials),
1590        )
1591        .route(
1592            "/api/webauthn/credentials/{id}",
1593            delete(api_webauthn::handle_delete_credential),
1594        );
1595
1596    // ── Plugin management API (requires plugins-wasm feature) ──
1597    #[cfg(feature = "plugins-wasm")]
1598    let inner = inner.route(
1599        "/api/plugins",
1600        get(api_plugins::plugin_routes::list_plugins),
1601    );
1602
1603    let inner = inner
1604        // ── SSE event stream ──
1605        .route("/api/events", get(sse::handle_sse_events))
1606        .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1607        // ── WebSocket agent chat ──
1608        .route("/ws/chat", get(ws::handle_ws_chat))
1609        // ── WebSocket canvas updates ──
1610        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1611        // ── WebSocket node discovery ──
1612        .route("/ws/nodes", get(nodes::handle_ws_nodes));
1613
1614    // ── WebSocket PTY terminal ──
1615    // portable-pty needs `openpty` from libutil which Android's NDK does not
1616    // reliably link, and a phone-runtime websocket terminal isn't a real
1617    // surface anyway.  Skip the route on Android.
1618    #[cfg(not(target_os = "android"))]
1619    let inner = inner.route("/ws/terminal", get(terminal::handle_ws_terminal));
1620
1621    let inner = inner
1622        // ── WebSocket proxy onto the in-process MCP server's session events ──
1623        .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1624        // ── Static assets (web dashboard) ──
1625        .route("/_app/{*path}", get(static_files::handle_static))
1626        // ── Config PUT with larger body limit ──
1627        .merge(config_put_router)
1628        // ── SPA fallback: non-API GET requests serve index.html ──
1629        .fallback(get(static_files::handle_spa_fallback))
1630        .with_state(state)
1631        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1632        .layer(TimeoutLayer::with_status_code(
1633            StatusCode::REQUEST_TIMEOUT,
1634            Duration::from_secs(gateway_request_timeout_secs()),
1635        ));
1636
1637    // Merge memory graph router (has its own 60s timeout, outside the global 30s)
1638    let inner = inner.merge(memory_graph_router);
1639
1640    // Nest under path prefix when configured (axum strips prefix before routing).
1641    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1642    // with a trailing slash, so we add a fallback redirect for that case.
1643    let app = if let Some(prefix) = path_prefix {
1644        let redirect_target = prefix.to_string();
1645        Router::new().nest(prefix, inner).route(
1646            &format!("{prefix}/"),
1647            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1648        )
1649    } else {
1650        inner
1651    };
1652
1653    // ── TLS / mTLS setup ───────────────────────────────────────────
1654    let tls_acceptor = match &config.gateway.tls {
1655        Some(tls_cfg) if tls_cfg.enabled => {
1656            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1657            if has_mtls {
1658                tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1659            } else {
1660                tracing::info!("TLS enabled (no client certificate requirement)");
1661            }
1662            Some(tls::build_tls_acceptor(tls_cfg)?)
1663        }
1664        _ => None,
1665    };
1666
1667    if let Some(tls_acceptor) = tls_acceptor {
1668        // Manual TLS accept loop — serves each connection via hyper.
1669        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1670        let mut app = app;
1671
1672        let mut shutdown_signal = shutdown_rx;
1673        loop {
1674            tokio::select! {
1675                conn = listener.accept() => {
1676                    let (tcp_stream, remote_addr) = conn?;
1677                    let tls_acceptor = tls_acceptor.clone();
1678                    let svc = tower::MakeService::<
1679                        SocketAddr,
1680                        hyper::Request<hyper::body::Incoming>,
1681                    >::make_service(&mut app, remote_addr)
1682                    .await
1683                    .expect("infallible make_service");
1684
1685                    tokio::spawn(async move {
1686                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1687                            Ok(s) => s,
1688                            Err(e) => {
1689                                tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1690                                return;
1691                            }
1692                        };
1693                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1694                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1695                            let mut svc = svc.clone();
1696                            async move {
1697                                tower::Service::call(&mut svc, req).await
1698                            }
1699                        });
1700                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1701                            hyper_util::rt::TokioExecutor::new(),
1702                        )
1703                        .serve_connection(io, hyper_svc)
1704                        .await
1705                        {
1706                            tracing::debug!("connection error from {remote_addr}: {e}");
1707                        }
1708                    });
1709                }
1710                _ = shutdown_signal.changed() => {
1711                    tracing::info!("🦀 Construct Gateway shutting down...");
1712                    break;
1713                }
1714            }
1715        }
1716    } else {
1717        // Plain TCP — use axum's built-in serve.
1718        axum::serve(
1719            listener,
1720            app.into_make_service_with_connect_info::<SocketAddr>(),
1721        )
1722        .with_graceful_shutdown(async move {
1723            let _ = shutdown_rx.changed().await;
1724            tracing::info!("🦀 Construct Gateway shutting down...");
1725        })
1726        .await?;
1727    }
1728
1729    // Wait for the in-process MCP task to finish its own graceful shutdown.
1730    // It watches the same `shutdown_tx` we just flipped above.
1731    if let Some(task) = mcp_task {
1732        let _ = task.await;
1733    }
1734
1735    Ok(())
1736}
1737
1738// ══════════════════════════════════════════════════════════════════════════════
1739// AXUM HANDLERS
1740// ══════════════════════════════════════════════════════════════════════════════
1741
1742/// GET /health — always public (no secrets leaked)
1743async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1744    let body = serde_json::json!({
1745        "status": "ok",
1746        "paired": state.pairing.is_paired(),
1747        "require_pairing": state.pairing.require_pairing(),
1748        "runtime": crate::health::snapshot_json(),
1749    });
1750    Json(body)
1751}
1752
1753/// Prometheus content type for text exposition format.
1754const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1755
1756fn prometheus_disabled_hint() -> String {
1757    String::from(
1758        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n",
1759    )
1760}
1761
1762#[cfg(feature = "observability-prometheus")]
1763fn prometheus_observer_from_state(
1764    observer: &dyn crate::observability::Observer,
1765) -> Option<&crate::observability::PrometheusObserver> {
1766    observer
1767        .as_any()
1768        .downcast_ref::<crate::observability::PrometheusObserver>()
1769        .or_else(|| {
1770            observer
1771                .as_any()
1772                .downcast_ref::<sse::BroadcastObserver>()
1773                .and_then(|broadcast| {
1774                    broadcast
1775                        .inner()
1776                        .as_any()
1777                        .downcast_ref::<crate::observability::PrometheusObserver>()
1778                })
1779        })
1780}
1781
1782/// GET /metrics — Prometheus text exposition format
1783async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1784    let body = {
1785        #[cfg(feature = "observability-prometheus")]
1786        {
1787            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1788                prom.encode()
1789            } else {
1790                prometheus_disabled_hint()
1791            }
1792        }
1793        #[cfg(not(feature = "observability-prometheus"))]
1794        {
1795            let _ = &state;
1796            prometheus_disabled_hint()
1797        }
1798    };
1799
1800    (
1801        StatusCode::OK,
1802        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1803        body,
1804    )
1805}
1806
1807/// POST /pair — exchange one-time code for bearer token
1808#[axum::debug_handler]
1809async fn handle_pair(
1810    State(state): State<AppState>,
1811    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1812    headers: HeaderMap,
1813) -> impl IntoResponse {
1814    let rate_key =
1815        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1816    let peer_is_loopback = peer_addr.ip().is_loopback();
1817    if !state.rate_limiter.allow_pair(&rate_key) {
1818        tracing::warn!("/pair rate limit exceeded");
1819        let err = serde_json::json!({
1820            "error": "Too many pairing requests. Please retry later.",
1821            "retry_after": RATE_LIMIT_WINDOW_SECS,
1822        });
1823        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1824    }
1825
1826    // ── Auth rate limiting (brute-force protection) ──
1827    if let Err(e) = state
1828        .auth_limiter
1829        .check_rate_limit(&rate_key, peer_is_loopback)
1830    {
1831        tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1832        let err = serde_json::json!({
1833            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1834            "retry_after": e.retry_after_secs,
1835        });
1836        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1837    }
1838
1839    let code = headers
1840        .get("X-Pairing-Code")
1841        .and_then(|v| v.to_str().ok())
1842        .unwrap_or("");
1843
1844    match state.pairing.try_pair(code, &rate_key).await {
1845        Ok(Some(token)) => {
1846            tracing::info!("🔐 New client paired successfully");
1847            if let Some(ref logger) = state.audit_logger {
1848                let _ =
1849                    logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1850            }
1851
1852            // Mirror the pairing into the SQLite device registry so the
1853            // dashboard's Pairing page lists this client. The legacy /pair
1854            // path historically only wrote to gateway.paired_tokens (auth);
1855            // the new /api/pairing/* flow writes both. Without this mirror,
1856            // anyone pairing via X-Pairing-Code (the documented flow for
1857            // the one-time onboarding code, including the QR-printed code
1858            // shown at startup) appears to have an empty Paired Devices
1859            // table even though their bearer token works.
1860            if let Some(ref registry) = state.device_registry {
1861                use chrono::Utc;
1862                let now = Utc::now();
1863                let info = api_pairing::DeviceInfo {
1864                    id: uuid::Uuid::new_v4().to_string(),
1865                    name: None,
1866                    device_type: Some("legacy-pair".to_string()),
1867                    paired_at: now,
1868                    last_seen: now,
1869                    ip_address: Some(peer_addr.ip().to_string()),
1870                };
1871                if let Err(err) =
1872                    registry.register(crate::security::pairing::hash_token(&token), info)
1873                {
1874                    tracing::warn!(
1875                        "🔐 Pairing succeeded but device registry insert failed: {err:#}"
1876                    );
1877                }
1878            }
1879
1880            if let Err(err) =
1881                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1882            {
1883                tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1884                let body = serde_json::json!({
1885                    "paired": true,
1886                    "persisted": false,
1887                    "token": token,
1888                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1889                });
1890                return (StatusCode::OK, Json(body));
1891            }
1892
1893            let body = serde_json::json!({
1894                "paired": true,
1895                "persisted": true,
1896                "token": token,
1897                "message": "Save this token — use it as Authorization: Bearer <token>"
1898            });
1899            (StatusCode::OK, Json(body))
1900        }
1901        Ok(None) => {
1902            state
1903                .auth_limiter
1904                .record_attempt(&rate_key, peer_is_loopback);
1905            tracing::warn!("🔐 Pairing attempt with invalid code");
1906            if let Some(ref logger) = state.audit_logger {
1907                let _ = logger
1908                    .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1909            }
1910            let err = serde_json::json!({"error": "Invalid pairing code"});
1911            (StatusCode::FORBIDDEN, Json(err))
1912        }
1913        Err(lockout_secs) => {
1914            tracing::warn!(
1915                "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1916            );
1917            if let Some(ref logger) = state.audit_logger {
1918                let _ = logger.log_auth_failure(
1919                    "gateway",
1920                    &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1921                );
1922            }
1923            let err = serde_json::json!({
1924                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1925                "retry_after": lockout_secs
1926            });
1927            (StatusCode::TOO_MANY_REQUESTS, Json(err))
1928        }
1929    }
1930}
1931
1932async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1933    let paired_tokens = pairing.tokens();
1934    // This is needed because parking_lot's guard is not Send so we clone the inner
1935    // this should be removed once async mutexes are used everywhere
1936    let mut updated_cfg = { config.lock().clone() };
1937    updated_cfg.gateway.paired_tokens = paired_tokens;
1938    updated_cfg
1939        .save()
1940        .await
1941        .context("Failed to persist paired tokens to config.toml")?;
1942
1943    // Keep shared runtime config in sync with persisted tokens.
1944    *config.lock() = updated_cfg;
1945    Ok(())
1946}
1947
1948/// Simple chat for webhook endpoint (no tools, for backward compatibility and testing).
1949async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1950    let user_messages = vec![ChatMessage::user(message)];
1951
1952    // Keep webhook/gateway prompts aligned with channel behavior by injecting
1953    // workspace-aware system context before model invocation.
1954    let system_prompt = {
1955        let config_guard = state.config.lock();
1956        crate::channels::build_system_prompt(
1957            &config_guard.workspace_dir,
1958            &state.model,
1959            &[], // tools - empty for simple chat
1960            &[], // skills
1961            Some(&config_guard.identity),
1962            None, // bootstrap_max_chars - use default
1963        )
1964    };
1965
1966    let mut messages = Vec::with_capacity(1 + user_messages.len());
1967    messages.push(ChatMessage::system(system_prompt));
1968    messages.extend(user_messages);
1969
1970    let multimodal_config = state.config.lock().multimodal.clone();
1971    let prepared =
1972        crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1973
1974    state
1975        .provider
1976        .chat_with_history(&prepared.messages, &state.model, state.temperature)
1977        .await
1978}
1979
1980/// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk).
1981async fn run_gateway_chat_with_tools(
1982    state: &AppState,
1983    message: &str,
1984    session_id: Option<&str>,
1985) -> anyhow::Result<String> {
1986    let config = state.config.lock().clone();
1987    Box::pin(crate::agent::process_message(config, message, session_id)).await
1988}
1989
1990/// Webhook request body
1991#[derive(serde::Deserialize)]
1992pub struct WebhookBody {
1993    pub message: String,
1994}
1995
1996/// POST /webhook — main webhook endpoint
1997async fn handle_webhook(
1998    State(state): State<AppState>,
1999    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
2000    headers: HeaderMap,
2001    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
2002) -> impl IntoResponse {
2003    let rate_key =
2004        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
2005    let peer_is_loopback = peer_addr.ip().is_loopback();
2006    if !state.rate_limiter.allow_webhook(&rate_key) {
2007        tracing::warn!("/webhook rate limit exceeded");
2008        let err = serde_json::json!({
2009            "error": "Too many webhook requests. Please retry later.",
2010            "retry_after": RATE_LIMIT_WINDOW_SECS,
2011        });
2012        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2013    }
2014
2015    // ── Bearer token auth (pairing) with auth rate limiting ──
2016    if state.pairing.require_pairing() {
2017        if let Err(e) = state
2018            .auth_limiter
2019            .check_rate_limit(&rate_key, peer_is_loopback)
2020        {
2021            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
2022            let err = serde_json::json!({
2023                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
2024                "retry_after": e.retry_after_secs,
2025            });
2026            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
2027        }
2028        let auth = headers
2029            .get(header::AUTHORIZATION)
2030            .and_then(|v| v.to_str().ok())
2031            .unwrap_or("");
2032        let token = auth.strip_prefix("Bearer ").unwrap_or("");
2033        if !state.pairing.is_authenticated(token) {
2034            state
2035                .auth_limiter
2036                .record_attempt(&rate_key, peer_is_loopback);
2037            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
2038            let err = serde_json::json!({
2039                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2040            });
2041            return (StatusCode::UNAUTHORIZED, Json(err));
2042        }
2043    }
2044
2045    // ── Webhook secret auth (optional, additional layer) ──
2046    if let Some(ref secret_hash) = state.webhook_secret_hash {
2047        let header_hash = headers
2048            .get("X-Webhook-Secret")
2049            .and_then(|v| v.to_str().ok())
2050            .map(str::trim)
2051            .filter(|value| !value.is_empty())
2052            .map(hash_webhook_secret);
2053        match header_hash {
2054            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2055            _ => {
2056                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
2057                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2058                return (StatusCode::UNAUTHORIZED, Json(err));
2059            }
2060        }
2061    }
2062
2063    // ── Parse body ──
2064    let Json(webhook_body) = match body {
2065        Ok(b) => b,
2066        Err(e) => {
2067            tracing::warn!("Webhook JSON parse error: {e}");
2068            let err = serde_json::json!({
2069                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2070            });
2071            return (StatusCode::BAD_REQUEST, Json(err));
2072        }
2073    };
2074
2075    // ── Idempotency (optional) ──
2076    if let Some(idempotency_key) = headers
2077        .get("X-Idempotency-Key")
2078        .and_then(|v| v.to_str().ok())
2079        .map(str::trim)
2080        .filter(|value| !value.is_empty())
2081    {
2082        if !state.idempotency_store.record_if_new(idempotency_key) {
2083            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
2084            let body = serde_json::json!({
2085                "status": "duplicate",
2086                "idempotent": true,
2087                "message": "Request already processed for this idempotency key"
2088            });
2089            return (StatusCode::OK, Json(body));
2090        }
2091    }
2092
2093    let message = &webhook_body.message;
2094    let session_id = webhook_session_id(&headers);
2095
2096    if state.auto_save && !memory::should_skip_autosave_content(message) {
2097        let key = webhook_memory_key();
2098        let _ = state
2099            .mem
2100            .store(
2101                &key,
2102                message,
2103                MemoryCategory::Conversation,
2104                session_id.as_deref(),
2105            )
2106            .await;
2107    }
2108
2109    let provider_label = state
2110        .config
2111        .lock()
2112        .default_provider
2113        .clone()
2114        .unwrap_or_else(|| "unknown".to_string());
2115    let model_label = state.model.clone();
2116    let started_at = Instant::now();
2117
2118    state
2119        .observer
2120        .record_event(&crate::observability::ObserverEvent::AgentStart {
2121            provider: provider_label.clone(),
2122            model: model_label.clone(),
2123        });
2124    state
2125        .observer
2126        .record_event(&crate::observability::ObserverEvent::LlmRequest {
2127            provider: provider_label.clone(),
2128            model: model_label.clone(),
2129            messages_count: 1,
2130        });
2131
2132    match run_gateway_chat_simple(&state, message).await {
2133        Ok(response) => {
2134            let duration = started_at.elapsed();
2135            state
2136                .observer
2137                .record_event(&crate::observability::ObserverEvent::LlmResponse {
2138                    provider: provider_label.clone(),
2139                    model: model_label.clone(),
2140                    duration,
2141                    success: true,
2142                    error_message: None,
2143                    input_tokens: None,
2144                    output_tokens: None,
2145                });
2146            state.observer.record_metric(
2147                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2148            );
2149            state
2150                .observer
2151                .record_event(&crate::observability::ObserverEvent::AgentEnd {
2152                    provider: provider_label,
2153                    model: model_label,
2154                    duration,
2155                    tokens_used: None,
2156                    cost_usd: None,
2157                });
2158
2159            let body = serde_json::json!({"response": response, "model": state.model});
2160            (StatusCode::OK, Json(body))
2161        }
2162        Err(e) => {
2163            let duration = started_at.elapsed();
2164            let sanitized = providers::sanitize_api_error(&e.to_string());
2165
2166            state
2167                .observer
2168                .record_event(&crate::observability::ObserverEvent::LlmResponse {
2169                    provider: provider_label.clone(),
2170                    model: model_label.clone(),
2171                    duration,
2172                    success: false,
2173                    error_message: Some(sanitized.clone()),
2174                    input_tokens: None,
2175                    output_tokens: None,
2176                });
2177            state.observer.record_metric(
2178                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2179            );
2180            state
2181                .observer
2182                .record_event(&crate::observability::ObserverEvent::Error {
2183                    component: "gateway".to_string(),
2184                    message: sanitized.clone(),
2185                });
2186            state
2187                .observer
2188                .record_event(&crate::observability::ObserverEvent::AgentEnd {
2189                    provider: provider_label,
2190                    model: model_label,
2191                    duration,
2192                    tokens_used: None,
2193                    cost_usd: None,
2194                });
2195
2196            tracing::error!("Webhook provider error: {}", sanitized);
2197            let err = serde_json::json!({"error": "LLM request failed"});
2198            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2199        }
2200    }
2201}
2202
2203/// `WhatsApp` verification query params
2204#[derive(serde::Deserialize)]
2205pub struct WhatsAppVerifyQuery {
2206    #[serde(rename = "hub.mode")]
2207    pub mode: Option<String>,
2208    #[serde(rename = "hub.verify_token")]
2209    pub verify_token: Option<String>,
2210    #[serde(rename = "hub.challenge")]
2211    pub challenge: Option<String>,
2212}
2213
2214/// GET /whatsapp — Meta webhook verification
2215async fn handle_whatsapp_verify(
2216    State(state): State<AppState>,
2217    Query(params): Query<WhatsAppVerifyQuery>,
2218) -> impl IntoResponse {
2219    let Some(ref wa) = state.whatsapp else {
2220        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2221    };
2222
2223    // Verify the token matches (constant-time comparison to prevent timing attacks)
2224    let token_matches = params
2225        .verify_token
2226        .as_deref()
2227        .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2228    if params.mode.as_deref() == Some("subscribe") && token_matches {
2229        if let Some(ch) = params.challenge {
2230            tracing::info!("WhatsApp webhook verified successfully");
2231            return (StatusCode::OK, ch);
2232        }
2233        return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2234    }
2235
2236    tracing::warn!("WhatsApp webhook verification failed — token mismatch");
2237    (StatusCode::FORBIDDEN, "Forbidden".to_string())
2238}
2239
2240/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
2241/// Returns true if the signature is valid, false otherwise.
2242/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
2243pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2244    use hmac::{Hmac, Mac};
2245    use sha2::Sha256;
2246
2247    // Signature format: "sha256=<hex_signature>"
2248    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2249        return false;
2250    };
2251
2252    // Decode hex signature
2253    let Ok(expected) = hex::decode(hex_sig) else {
2254        return false;
2255    };
2256
2257    // Compute HMAC-SHA256
2258    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2259        return false;
2260    };
2261    mac.update(body);
2262
2263    // Constant-time comparison
2264    mac.verify_slice(&expected).is_ok()
2265}
2266
2267/// POST /whatsapp — incoming message webhook
2268async fn handle_whatsapp_message(
2269    State(state): State<AppState>,
2270    headers: HeaderMap,
2271    body: Bytes,
2272) -> impl IntoResponse {
2273    let Some(ref wa) = state.whatsapp else {
2274        return (
2275            StatusCode::NOT_FOUND,
2276            Json(serde_json::json!({"error": "WhatsApp not configured"})),
2277        );
2278    };
2279
2280    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
2281    if let Some(ref app_secret) = state.whatsapp_app_secret {
2282        let signature = headers
2283            .get("X-Hub-Signature-256")
2284            .and_then(|v| v.to_str().ok())
2285            .unwrap_or("");
2286
2287        if !verify_whatsapp_signature(app_secret, &body, signature) {
2288            tracing::warn!(
2289                "WhatsApp webhook signature verification failed (signature: {})",
2290                if signature.is_empty() {
2291                    "missing"
2292                } else {
2293                    "invalid"
2294                }
2295            );
2296            return (
2297                StatusCode::UNAUTHORIZED,
2298                Json(serde_json::json!({"error": "Invalid signature"})),
2299            );
2300        }
2301    }
2302
2303    // Parse JSON body
2304    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2305        return (
2306            StatusCode::BAD_REQUEST,
2307            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2308        );
2309    };
2310
2311    // Parse messages from the webhook payload
2312    let messages = wa.parse_webhook_payload(&payload);
2313
2314    if messages.is_empty() {
2315        // Acknowledge the webhook even if no messages (could be status updates)
2316        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2317    }
2318
2319    // Process each message
2320    for msg in &messages {
2321        tracing::info!(
2322            "WhatsApp message from {}: {}",
2323            msg.sender,
2324            truncate_with_ellipsis(&msg.content, 50)
2325        );
2326        let session_id = sender_session_id("whatsapp", msg);
2327
2328        // Auto-save to memory
2329        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2330            let key = whatsapp_memory_key(msg);
2331            let _ = state
2332                .mem
2333                .store(
2334                    &key,
2335                    &msg.content,
2336                    MemoryCategory::Conversation,
2337                    Some(&session_id),
2338                )
2339                .await;
2340        }
2341
2342        match Box::pin(run_gateway_chat_with_tools(
2343            &state,
2344            &msg.content,
2345            Some(&session_id),
2346        ))
2347        .await
2348        {
2349            Ok(response) => {
2350                // Send reply via WhatsApp
2351                if let Err(e) = wa
2352                    .send(&SendMessage::new(response, &msg.reply_target))
2353                    .await
2354                {
2355                    tracing::error!("Failed to send WhatsApp reply: {e}");
2356                }
2357            }
2358            Err(e) => {
2359                tracing::error!("LLM error for WhatsApp message: {e:#}");
2360                let _ = wa
2361                    .send(&SendMessage::new(
2362                        "Sorry, I couldn't process your message right now.",
2363                        &msg.reply_target,
2364                    ))
2365                    .await;
2366            }
2367        }
2368    }
2369
2370    // Acknowledge the webhook
2371    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2372}
2373
2374/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
2375async fn handle_linq_webhook(
2376    State(state): State<AppState>,
2377    headers: HeaderMap,
2378    body: Bytes,
2379) -> impl IntoResponse {
2380    let Some(ref linq) = state.linq else {
2381        return (
2382            StatusCode::NOT_FOUND,
2383            Json(serde_json::json!({"error": "Linq not configured"})),
2384        );
2385    };
2386
2387    let body_str = String::from_utf8_lossy(&body);
2388
2389    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
2390    if let Some(ref signing_secret) = state.linq_signing_secret {
2391        let timestamp = headers
2392            .get("X-Webhook-Timestamp")
2393            .and_then(|v| v.to_str().ok())
2394            .unwrap_or("");
2395
2396        let signature = headers
2397            .get("X-Webhook-Signature")
2398            .and_then(|v| v.to_str().ok())
2399            .unwrap_or("");
2400
2401        if !crate::channels::linq::verify_linq_signature(
2402            signing_secret,
2403            &body_str,
2404            timestamp,
2405            signature,
2406        ) {
2407            tracing::warn!(
2408                "Linq webhook signature verification failed (signature: {})",
2409                if signature.is_empty() {
2410                    "missing"
2411                } else {
2412                    "invalid"
2413                }
2414            );
2415            return (
2416                StatusCode::UNAUTHORIZED,
2417                Json(serde_json::json!({"error": "Invalid signature"})),
2418            );
2419        }
2420    }
2421
2422    // Parse JSON body
2423    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2424        return (
2425            StatusCode::BAD_REQUEST,
2426            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2427        );
2428    };
2429
2430    // Parse messages from the webhook payload
2431    let messages = linq.parse_webhook_payload(&payload);
2432
2433    if messages.is_empty() {
2434        // Acknowledge the webhook even if no messages (could be status/delivery events)
2435        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2436    }
2437
2438    // Process each message
2439    for msg in &messages {
2440        tracing::info!(
2441            "Linq message from {}: {}",
2442            msg.sender,
2443            truncate_with_ellipsis(&msg.content, 50)
2444        );
2445        let session_id = sender_session_id("linq", msg);
2446
2447        // Auto-save to memory
2448        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2449            let key = linq_memory_key(msg);
2450            let _ = state
2451                .mem
2452                .store(
2453                    &key,
2454                    &msg.content,
2455                    MemoryCategory::Conversation,
2456                    Some(&session_id),
2457                )
2458                .await;
2459        }
2460
2461        // Call the LLM
2462        match Box::pin(run_gateway_chat_with_tools(
2463            &state,
2464            &msg.content,
2465            Some(&session_id),
2466        ))
2467        .await
2468        {
2469            Ok(response) => {
2470                // Send reply via Linq
2471                if let Err(e) = linq
2472                    .send(&SendMessage::new(response, &msg.reply_target))
2473                    .await
2474                {
2475                    tracing::error!("Failed to send Linq reply: {e}");
2476                }
2477            }
2478            Err(e) => {
2479                tracing::error!("LLM error for Linq message: {e:#}");
2480                let _ = linq
2481                    .send(&SendMessage::new(
2482                        "Sorry, I couldn't process your message right now.",
2483                        &msg.reply_target,
2484                    ))
2485                    .await;
2486            }
2487        }
2488    }
2489
2490    // Acknowledge the webhook
2491    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2492}
2493
2494/// GET /wati — WATI webhook verification (echoes hub.challenge)
2495async fn handle_wati_verify(
2496    State(state): State<AppState>,
2497    Query(params): Query<WatiVerifyQuery>,
2498) -> impl IntoResponse {
2499    if state.wati.is_none() {
2500        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2501    }
2502
2503    // WATI may use Meta-style webhook verification; echo the challenge
2504    if let Some(challenge) = params.challenge {
2505        tracing::info!("WATI webhook verified successfully");
2506        return (StatusCode::OK, challenge);
2507    }
2508
2509    (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2510}
2511
2512#[derive(Debug, serde::Deserialize)]
2513pub struct WatiVerifyQuery {
2514    #[serde(rename = "hub.challenge")]
2515    pub challenge: Option<String>,
2516}
2517
2518/// POST /wati — incoming WATI WhatsApp message webhook
2519async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2520    let Some(ref wati) = state.wati else {
2521        return (
2522            StatusCode::NOT_FOUND,
2523            Json(serde_json::json!({"error": "WATI not configured"})),
2524        );
2525    };
2526
2527    // Parse JSON body
2528    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2529        return (
2530            StatusCode::BAD_REQUEST,
2531            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2532        );
2533    };
2534
2535    // Detect audio before the synchronous parse
2536    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2537
2538    let messages = if matches!(msg_type, "audio" | "voice") {
2539        // Build a synthetic ChannelMessage from the audio transcript
2540        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2541            wati.parse_audio_as_message(&payload, transcript)
2542        } else {
2543            vec![]
2544        }
2545    } else {
2546        wati.parse_webhook_payload(&payload)
2547    };
2548
2549    if messages.is_empty() {
2550        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2551    }
2552
2553    // Process each message
2554    for msg in &messages {
2555        tracing::info!(
2556            "WATI message from {}: {}",
2557            msg.sender,
2558            truncate_with_ellipsis(&msg.content, 50)
2559        );
2560        let session_id = sender_session_id("wati", msg);
2561
2562        // Auto-save to memory
2563        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2564            let key = wati_memory_key(msg);
2565            let _ = state
2566                .mem
2567                .store(
2568                    &key,
2569                    &msg.content,
2570                    MemoryCategory::Conversation,
2571                    Some(&session_id),
2572                )
2573                .await;
2574        }
2575
2576        // Call the LLM
2577        match Box::pin(run_gateway_chat_with_tools(
2578            &state,
2579            &msg.content,
2580            Some(&session_id),
2581        ))
2582        .await
2583        {
2584            Ok(response) => {
2585                // Send reply via WATI
2586                if let Err(e) = wati
2587                    .send(&SendMessage::new(response, &msg.reply_target))
2588                    .await
2589                {
2590                    tracing::error!("Failed to send WATI reply: {e}");
2591                }
2592            }
2593            Err(e) => {
2594                tracing::error!("LLM error for WATI message: {e:#}");
2595                let _ = wati
2596                    .send(&SendMessage::new(
2597                        "Sorry, I couldn't process your message right now.",
2598                        &msg.reply_target,
2599                    ))
2600                    .await;
2601            }
2602        }
2603    }
2604
2605    // Acknowledge the webhook
2606    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2607}
2608
2609/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
2610async fn handle_nextcloud_talk_webhook(
2611    State(state): State<AppState>,
2612    headers: HeaderMap,
2613    body: Bytes,
2614) -> impl IntoResponse {
2615    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2616        return (
2617            StatusCode::NOT_FOUND,
2618            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2619        );
2620    };
2621
2622    let body_str = String::from_utf8_lossy(&body);
2623
2624    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
2625    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2626        let random = headers
2627            .get("X-Nextcloud-Talk-Random")
2628            .and_then(|v| v.to_str().ok())
2629            .unwrap_or("");
2630
2631        let signature = headers
2632            .get("X-Nextcloud-Talk-Signature")
2633            .and_then(|v| v.to_str().ok())
2634            .unwrap_or("");
2635
2636        if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2637            webhook_secret,
2638            random,
2639            &body_str,
2640            signature,
2641        ) {
2642            tracing::warn!(
2643                "Nextcloud Talk webhook signature verification failed (signature: {})",
2644                if signature.is_empty() {
2645                    "missing"
2646                } else {
2647                    "invalid"
2648                }
2649            );
2650            return (
2651                StatusCode::UNAUTHORIZED,
2652                Json(serde_json::json!({"error": "Invalid signature"})),
2653            );
2654        }
2655    }
2656
2657    // Parse JSON body
2658    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2659        return (
2660            StatusCode::BAD_REQUEST,
2661            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2662        );
2663    };
2664
2665    // Parse messages from webhook payload
2666    let messages = nextcloud_talk.parse_webhook_payload(&payload);
2667    if messages.is_empty() {
2668        // Acknowledge webhook even if payload does not contain actionable user messages.
2669        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2670    }
2671
2672    for msg in &messages {
2673        tracing::info!(
2674            "Nextcloud Talk message from {}: {}",
2675            msg.sender,
2676            truncate_with_ellipsis(&msg.content, 50)
2677        );
2678        let session_id = sender_session_id("nextcloud_talk", msg);
2679
2680        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2681            let key = nextcloud_talk_memory_key(msg);
2682            let _ = state
2683                .mem
2684                .store(
2685                    &key,
2686                    &msg.content,
2687                    MemoryCategory::Conversation,
2688                    Some(&session_id),
2689                )
2690                .await;
2691        }
2692
2693        match Box::pin(run_gateway_chat_with_tools(
2694            &state,
2695            &msg.content,
2696            Some(&session_id),
2697        ))
2698        .await
2699        {
2700            Ok(response) => {
2701                if let Err(e) = nextcloud_talk
2702                    .send(&SendMessage::new(response, &msg.reply_target))
2703                    .await
2704                {
2705                    tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2706                }
2707            }
2708            Err(e) => {
2709                tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2710                let _ = nextcloud_talk
2711                    .send(&SendMessage::new(
2712                        "Sorry, I couldn't process your message right now.",
2713                        &msg.reply_target,
2714                    ))
2715                    .await;
2716            }
2717        }
2718    }
2719
2720    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2721}
2722
2723/// Maximum request body size for the Gmail webhook endpoint (1 MB).
2724/// Google Pub/Sub messages are typically under 10 KB.
2725const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2726
2727/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
2728async fn handle_gmail_push_webhook(
2729    State(state): State<AppState>,
2730    headers: HeaderMap,
2731    body: Bytes,
2732) -> impl IntoResponse {
2733    let Some(ref gmail_push) = state.gmail_push else {
2734        return (
2735            StatusCode::NOT_FOUND,
2736            Json(serde_json::json!({"error": "Gmail push not configured"})),
2737        );
2738    };
2739
2740    // Enforce body size limit.
2741    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2742        return (
2743            StatusCode::PAYLOAD_TOO_LARGE,
2744            Json(serde_json::json!({"error": "Request body too large"})),
2745        );
2746    }
2747
2748    // Authenticate the webhook request using a shared secret.
2749    let secret = gmail_push.resolve_webhook_secret();
2750    if !secret.is_empty() {
2751        let provided = headers
2752            .get(axum::http::header::AUTHORIZATION)
2753            .and_then(|v| v.to_str().ok())
2754            .and_then(|auth| auth.strip_prefix("Bearer "))
2755            .unwrap_or("");
2756
2757        if provided != secret {
2758            tracing::warn!("Gmail push webhook: unauthorized request");
2759            return (
2760                StatusCode::UNAUTHORIZED,
2761                Json(serde_json::json!({"error": "Unauthorized"})),
2762            );
2763        }
2764    }
2765
2766    let body_str = String::from_utf8_lossy(&body);
2767    let envelope: crate::channels::gmail_push::PubSubEnvelope =
2768        match serde_json::from_str(&body_str) {
2769            Ok(e) => e,
2770            Err(e) => {
2771                tracing::warn!("Gmail push webhook: invalid payload: {e}");
2772                return (
2773                    StatusCode::BAD_REQUEST,
2774                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2775                );
2776            }
2777        };
2778
2779    // Process the notification asynchronously (non-blocking for the webhook response)
2780    let channel = Arc::clone(gmail_push);
2781    tokio::spawn(async move {
2782        if let Err(e) = channel.handle_notification(&envelope).await {
2783            tracing::error!("Gmail push notification processing failed: {e:#}");
2784        }
2785    });
2786
2787    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
2788    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2789}
2790
2791// ══════════════════════════════════════════════════════════════════════════════
2792// ADMIN HANDLERS (for CLI management)
2793// ══════════════════════════════════════════════════════════════════════════════
2794
2795/// Response for admin endpoints
2796#[derive(serde::Serialize)]
2797struct AdminResponse {
2798    success: bool,
2799    message: String,
2800}
2801
2802/// Reject requests that do not originate from a loopback address.
2803fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2804    if peer.ip().is_loopback() {
2805        Ok(())
2806    } else {
2807        Err((
2808            StatusCode::FORBIDDEN,
2809            Json(serde_json::json!({
2810                "error": "Admin endpoints are restricted to localhost"
2811            })),
2812        ))
2813    }
2814}
2815
2816/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
2817async fn handle_admin_shutdown(
2818    State(state): State<AppState>,
2819    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2820) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2821    require_localhost(&peer)?;
2822    tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2823
2824    let body = AdminResponse {
2825        success: true,
2826        message: "Gateway shutdown initiated".to_string(),
2827    };
2828
2829    let _ = state.shutdown_tx.send(true);
2830
2831    Ok((StatusCode::OK, Json(body)))
2832}
2833
2834/// GET /admin/paircode — fetch current pairing code (localhost only)
2835async fn handle_admin_paircode(
2836    State(state): State<AppState>,
2837    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2838) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2839    require_localhost(&peer)?;
2840    let code = state.pairing.pairing_code();
2841
2842    let body = if let Some(c) = code {
2843        serde_json::json!({
2844            "success": true,
2845            "pairing_required": state.pairing.require_pairing(),
2846            "pairing_code": c,
2847            "message": "Use this one-time code to pair"
2848        })
2849    } else {
2850        serde_json::json!({
2851            "success": true,
2852            "pairing_required": state.pairing.require_pairing(),
2853            "pairing_code": null,
2854            "message": if state.pairing.require_pairing() {
2855                "Pairing is active but no new code available (already paired or code expired)"
2856            } else {
2857                "Pairing is disabled for this gateway"
2858            }
2859        })
2860    };
2861
2862    Ok((StatusCode::OK, Json(body)))
2863}
2864
2865/// POST /admin/paircode/new — generate a new pairing code (localhost only)
2866async fn handle_admin_paircode_new(
2867    State(state): State<AppState>,
2868    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2869) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2870    require_localhost(&peer)?;
2871    match state.pairing.generate_new_pairing_code() {
2872        Some(code) => {
2873            tracing::info!("🔐 New pairing code generated via admin endpoint");
2874            let body = serde_json::json!({
2875                "success": true,
2876                "pairing_required": state.pairing.require_pairing(),
2877                "pairing_code": code,
2878                "message": "New pairing code generated — use this one-time code to pair"
2879            });
2880            Ok((StatusCode::OK, Json(body)))
2881        }
2882        None => {
2883            let body = serde_json::json!({
2884                "success": false,
2885                "pairing_required": false,
2886                "pairing_code": null,
2887                "message": "Pairing is disabled for this gateway"
2888            });
2889            Ok((StatusCode::BAD_REQUEST, Json(body)))
2890        }
2891    }
2892}
2893
2894/// GET /pair/code — fetch the initial pairing code.
2895///
2896/// Requires a loopback peer. A publicly-reachable endpoint would let any caller
2897/// (e.g. an attacker scanning exposed ngrok/Cloudflare tunnels during first-run)
2898/// fetch the code before the legitimate operator. Host-side dashboards should
2899/// reach the gateway over loopback; containerized setups can call this via
2900/// `docker exec` or fetch the code from `construct onboard` output.
2901async fn handle_pair_code(
2902    State(state): State<AppState>,
2903    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2904) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2905    require_localhost(&peer)?;
2906
2907    let require = state.pairing.require_pairing();
2908    let is_paired = state.pairing.is_paired();
2909
2910    let code = if require && !is_paired {
2911        state.pairing.pairing_code()
2912    } else {
2913        None
2914    };
2915
2916    let body = serde_json::json!({
2917        "success": true,
2918        "pairing_required": require,
2919        "pairing_code": code,
2920    });
2921
2922    Ok((StatusCode::OK, Json(body)))
2923}
2924
2925#[cfg(test)]
2926mod tests {
2927    use super::*;
2928    use crate::channels::traits::ChannelMessage;
2929    use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2930    use crate::providers::Provider;
2931    use async_trait::async_trait;
2932    use axum::http::HeaderValue;
2933    use axum::response::IntoResponse;
2934    use http_body_util::BodyExt;
2935    use parking_lot::Mutex;
2936    use std::sync::atomic::{AtomicUsize, Ordering};
2937
2938    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
2939    fn generate_test_secret() -> String {
2940        let bytes: [u8; 32] = rand::random();
2941        hex::encode(bytes)
2942    }
2943
2944    #[test]
2945    fn security_body_limit_is_64kb() {
2946        assert_eq!(MAX_BODY_SIZE, 65_536);
2947    }
2948
2949    #[test]
2950    fn security_timeout_default_is_30_seconds() {
2951        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2952    }
2953
2954    #[test]
2955    fn gateway_timeout_falls_back_to_default() {
2956        // When env var is not set, should return the default constant
2957        // SAFETY: test-only, single-threaded test runner.
2958        unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2959        assert_eq!(gateway_request_timeout_secs(), 30);
2960    }
2961
2962    #[test]
2963    fn webhook_body_requires_message_field() {
2964        let valid = r#"{"message": "hello"}"#;
2965        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2966        assert!(parsed.is_ok());
2967        assert_eq!(parsed.unwrap().message, "hello");
2968
2969        let missing = r#"{"other": "field"}"#;
2970        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2971        assert!(parsed.is_err());
2972    }
2973
2974    #[test]
2975    fn whatsapp_query_fields_are_optional() {
2976        let q = WhatsAppVerifyQuery {
2977            mode: None,
2978            verify_token: None,
2979            challenge: None,
2980        };
2981        assert!(q.mode.is_none());
2982    }
2983
2984    #[test]
2985    fn app_state_is_clone() {
2986        fn assert_clone<T: Clone>() {}
2987        assert_clone::<AppState>();
2988    }
2989
2990    #[tokio::test]
2991    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2992        let state = AppState {
2993            config: Arc::new(Mutex::new(Config::default())),
2994            provider: Arc::new(MockProvider::default()),
2995            model: "test-model".into(),
2996            temperature: 0.0,
2997            mem: Arc::new(MockMemory),
2998            auto_save: false,
2999            webhook_secret_hash: None,
3000            pairing: Arc::new(PairingGuard::new(false, &[])),
3001            trust_forwarded_headers: false,
3002            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3003            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3004            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3005            whatsapp: None,
3006            whatsapp_app_secret: None,
3007            linq: None,
3008            linq_signing_secret: None,
3009            nextcloud_talk: None,
3010            nextcloud_talk_webhook_secret: None,
3011            wati: None,
3012            gmail_push: None,
3013            observer: Arc::new(crate::observability::NoopObserver),
3014            tools_registry: Arc::new(Vec::new()),
3015            cost_tracker: None,
3016            audit_logger: None,
3017            event_tx: tokio::sync::broadcast::channel(16).0,
3018            shutdown_tx: tokio::sync::watch::channel(false).0,
3019            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3020            path_prefix: String::new(),
3021            session_backend: None,
3022            session_queue: std::sync::Arc::new(
3023                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3024            ),
3025            device_registry: None,
3026            pending_pairings: None,
3027            canvas_store: CanvasStore::new(),
3028            mcp_registry: None,
3029            approval_registry: approval_registry::global(),
3030            mcp_local_url: None,
3031            #[cfg(feature = "webauthn")]
3032            webauthn: None,
3033        };
3034
3035        let response = handle_metrics(State(state)).await.into_response();
3036        assert_eq!(response.status(), StatusCode::OK);
3037        assert_eq!(
3038            response
3039                .headers()
3040                .get(header::CONTENT_TYPE)
3041                .and_then(|value| value.to_str().ok()),
3042            Some(PROMETHEUS_CONTENT_TYPE)
3043        );
3044
3045        let body = response.into_body().collect().await.unwrap().to_bytes();
3046        let text = String::from_utf8(body.to_vec()).unwrap();
3047        assert!(text.contains("Prometheus backend not enabled"));
3048    }
3049
3050    #[cfg(feature = "observability-prometheus")]
3051    #[tokio::test]
3052    async fn metrics_endpoint_renders_prometheus_output() {
3053        let event_tx = tokio::sync::broadcast::channel(16).0;
3054        let wrapped = sse::BroadcastObserver::new(
3055            Box::new(crate::observability::PrometheusObserver::new()),
3056            event_tx.clone(),
3057        );
3058        crate::observability::Observer::record_event(
3059            &wrapped,
3060            &crate::observability::ObserverEvent::HeartbeatTick,
3061        );
3062
3063        let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
3064        let state = AppState {
3065            config: Arc::new(Mutex::new(Config::default())),
3066            provider: Arc::new(MockProvider::default()),
3067            model: "test-model".into(),
3068            temperature: 0.0,
3069            mem: Arc::new(MockMemory),
3070            auto_save: false,
3071            webhook_secret_hash: None,
3072            pairing: Arc::new(PairingGuard::new(false, &[])),
3073            trust_forwarded_headers: false,
3074            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3075            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3076            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3077            whatsapp: None,
3078            whatsapp_app_secret: None,
3079            linq: None,
3080            linq_signing_secret: None,
3081            nextcloud_talk: None,
3082            nextcloud_talk_webhook_secret: None,
3083            wati: None,
3084            gmail_push: None,
3085            observer,
3086            tools_registry: Arc::new(Vec::new()),
3087            cost_tracker: None,
3088            audit_logger: None,
3089            event_tx,
3090            shutdown_tx: tokio::sync::watch::channel(false).0,
3091            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3092            path_prefix: String::new(),
3093            session_backend: None,
3094            session_queue: std::sync::Arc::new(
3095                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3096            ),
3097            device_registry: None,
3098            pending_pairings: None,
3099            canvas_store: CanvasStore::new(),
3100            mcp_registry: None,
3101            approval_registry: approval_registry::global(),
3102            mcp_local_url: None,
3103            #[cfg(feature = "webauthn")]
3104            webauthn: None,
3105        };
3106
3107        let response = handle_metrics(State(state)).await.into_response();
3108        assert_eq!(response.status(), StatusCode::OK);
3109
3110        let body = response.into_body().collect().await.unwrap().to_bytes();
3111        let text = String::from_utf8(body.to_vec()).unwrap();
3112        assert!(text.contains("construct_heartbeat_ticks_total 1"));
3113    }
3114
3115    #[test]
3116    fn gateway_rate_limiter_blocks_after_limit() {
3117        let limiter = GatewayRateLimiter::new(2, 2, 100);
3118        assert!(limiter.allow_pair("127.0.0.1"));
3119        assert!(limiter.allow_pair("127.0.0.1"));
3120        assert!(!limiter.allow_pair("127.0.0.1"));
3121    }
3122
3123    #[test]
3124    fn rate_limiter_sweep_removes_stale_entries() {
3125        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3126        // Add entries for multiple IPs
3127        assert!(limiter.allow("ip-1"));
3128        assert!(limiter.allow("ip-2"));
3129        assert!(limiter.allow("ip-3"));
3130
3131        {
3132            let guard = limiter.requests.lock();
3133            assert_eq!(guard.0.len(), 3);
3134        }
3135
3136        // Force a sweep by backdating last_sweep
3137        {
3138            let mut guard = limiter.requests.lock();
3139            guard.1 = Instant::now()
3140                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3141                .unwrap();
3142            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
3143            guard.0.get_mut("ip-2").unwrap().clear();
3144            guard.0.get_mut("ip-3").unwrap().clear();
3145        }
3146
3147        // Next allow() call should trigger sweep and remove stale entries
3148        assert!(limiter.allow("ip-1"));
3149
3150        {
3151            let guard = limiter.requests.lock();
3152            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3153            assert!(guard.0.contains_key("ip-1"));
3154        }
3155    }
3156
3157    #[test]
3158    fn rate_limiter_zero_limit_always_allows() {
3159        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3160        for _ in 0..100 {
3161            assert!(limiter.allow("any-key"));
3162        }
3163    }
3164
3165    #[test]
3166    fn idempotency_store_rejects_duplicate_key() {
3167        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3168        assert!(store.record_if_new("req-1"));
3169        assert!(!store.record_if_new("req-1"));
3170        assert!(store.record_if_new("req-2"));
3171    }
3172
3173    #[test]
3174    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3175        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3176        assert!(limiter.allow("ip-1"));
3177        assert!(limiter.allow("ip-2"));
3178        assert!(limiter.allow("ip-3"));
3179
3180        let guard = limiter.requests.lock();
3181        assert_eq!(guard.0.len(), 2);
3182        assert!(guard.0.contains_key("ip-2"));
3183        assert!(guard.0.contains_key("ip-3"));
3184    }
3185
3186    #[test]
3187    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3188        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3189        assert!(store.record_if_new("k1"));
3190        std::thread::sleep(Duration::from_millis(2));
3191        assert!(store.record_if_new("k2"));
3192        std::thread::sleep(Duration::from_millis(2));
3193        assert!(store.record_if_new("k3"));
3194
3195        let keys = store.keys.lock();
3196        assert_eq!(keys.len(), 2);
3197        assert!(!keys.contains_key("k1"));
3198        assert!(keys.contains_key("k2"));
3199        assert!(keys.contains_key("k3"));
3200    }
3201
3202    #[test]
3203    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3204        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3205        let mut headers = HeaderMap::new();
3206        headers.insert(
3207            "X-Forwarded-For",
3208            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3209        );
3210
3211        let key = client_key_from_request(Some(peer), &headers, false);
3212        assert_eq!(key, "10.0.0.5");
3213    }
3214
3215    #[test]
3216    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3217        // Rightmost XFF hop is the one appended by our trusted upstream proxy;
3218        // leftmost values are attacker-controlled (clients send whatever).
3219        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3220        let mut headers = HeaderMap::new();
3221        headers.insert(
3222            "X-Forwarded-For",
3223            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3224        );
3225
3226        let key = client_key_from_request(Some(peer), &headers, true);
3227        assert_eq!(key, "203.0.113.11");
3228    }
3229
3230    #[test]
3231    fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
3232        // Attacker sets `X-Forwarded-For: 127.0.0.1, <legit-upstream>`. The
3233        // rightmost (trusted proxy's appended value) must win — if we took
3234        // the leftmost, a remote attacker could spoof loopback and evade
3235        // rate limits / lockouts.
3236        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3237        let mut headers = HeaderMap::new();
3238        headers.insert(
3239            "X-Forwarded-For",
3240            HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
3241        );
3242
3243        let key = client_key_from_request(Some(peer), &headers, true);
3244        assert_eq!(key, "203.0.113.11");
3245    }
3246
3247    #[test]
3248    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3249        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3250        let mut headers = HeaderMap::new();
3251        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3252
3253        let key = client_key_from_request(Some(peer), &headers, true);
3254        assert_eq!(key, "10.0.0.5");
3255    }
3256
3257    #[test]
3258    fn normalize_max_keys_uses_fallback_for_zero() {
3259        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3260        assert_eq!(normalize_max_keys(0, 0), 1);
3261    }
3262
3263    #[test]
3264    fn normalize_max_keys_preserves_nonzero_values() {
3265        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3266        assert_eq!(normalize_max_keys(1, 10_000), 1);
3267    }
3268
3269    #[tokio::test]
3270    async fn persist_pairing_tokens_writes_config_tokens() {
3271        let temp = tempfile::tempdir().unwrap();
3272        let config_path = temp.path().join("config.toml");
3273        let workspace_path = temp.path().join("workspace");
3274
3275        let mut config = Config::default();
3276        config.config_path = config_path.clone();
3277        config.workspace_dir = workspace_path;
3278        config.save().await.unwrap();
3279
3280        let guard = PairingGuard::new(true, &[]);
3281        let code = guard.pairing_code().unwrap();
3282        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
3283        assert!(guard.is_authenticated(&token));
3284
3285        let shared_config = Arc::new(Mutex::new(config));
3286        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
3287            .await
3288            .unwrap();
3289
3290        // In-memory tokens should remain as plaintext 64-char hex hashes.
3291        let plaintext = {
3292            let in_memory = shared_config.lock();
3293            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
3294            in_memory.gateway.paired_tokens[0].clone()
3295        };
3296        assert_eq!(plaintext.len(), 64);
3297        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
3298
3299        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
3300        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
3301        let raw_parsed: Config = toml::from_str(&saved).unwrap();
3302        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
3303        let on_disk = &raw_parsed.gateway.paired_tokens[0];
3304        assert!(
3305            crate::security::SecretStore::is_encrypted(on_disk),
3306            "paired_token should be encrypted on disk"
3307        );
3308    }
3309
3310    #[test]
3311    fn webhook_memory_key_is_unique() {
3312        let key1 = webhook_memory_key();
3313        let key2 = webhook_memory_key();
3314
3315        assert!(key1.starts_with("webhook_msg_"));
3316        assert!(key2.starts_with("webhook_msg_"));
3317        assert_ne!(key1, key2);
3318    }
3319
3320    #[test]
3321    fn whatsapp_memory_key_includes_sender_and_message_id() {
3322        let msg = ChannelMessage {
3323            id: "wamid-123".into(),
3324            sender: "+1234567890".into(),
3325            reply_target: "+1234567890".into(),
3326            content: "hello".into(),
3327            channel: "whatsapp".into(),
3328            timestamp: 1,
3329            thread_ts: None,
3330            interruption_scope_id: None,
3331            attachments: vec![],
3332        };
3333
3334        let key = whatsapp_memory_key(&msg);
3335        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3336    }
3337
3338    #[derive(Default)]
3339    struct MockMemory;
3340
3341    #[async_trait]
3342    impl Memory for MockMemory {
3343        fn name(&self) -> &str {
3344            "mock"
3345        }
3346
3347        async fn store(
3348            &self,
3349            _key: &str,
3350            _content: &str,
3351            _category: MemoryCategory,
3352            _session_id: Option<&str>,
3353        ) -> anyhow::Result<()> {
3354            Ok(())
3355        }
3356
3357        async fn recall(
3358            &self,
3359            _query: &str,
3360            _limit: usize,
3361            _session_id: Option<&str>,
3362            _since: Option<&str>,
3363            _until: Option<&str>,
3364        ) -> anyhow::Result<Vec<MemoryEntry>> {
3365            Ok(Vec::new())
3366        }
3367
3368        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3369            Ok(None)
3370        }
3371
3372        async fn list(
3373            &self,
3374            _category: Option<&MemoryCategory>,
3375            _session_id: Option<&str>,
3376        ) -> anyhow::Result<Vec<MemoryEntry>> {
3377            Ok(Vec::new())
3378        }
3379
3380        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3381            Ok(false)
3382        }
3383
3384        async fn count(&self) -> anyhow::Result<usize> {
3385            Ok(0)
3386        }
3387
3388        async fn health_check(&self) -> bool {
3389            true
3390        }
3391    }
3392
3393    #[derive(Default)]
3394    struct MockProvider {
3395        calls: AtomicUsize,
3396    }
3397
3398    #[async_trait]
3399    impl Provider for MockProvider {
3400        async fn chat_with_system(
3401            &self,
3402            _system_prompt: Option<&str>,
3403            _message: &str,
3404            _model: &str,
3405            _temperature: f64,
3406        ) -> anyhow::Result<String> {
3407            self.calls.fetch_add(1, Ordering::SeqCst);
3408            Ok("ok".into())
3409        }
3410    }
3411
3412    #[derive(Default)]
3413    struct TrackingMemory {
3414        keys: Mutex<Vec<String>>,
3415    }
3416
3417    #[async_trait]
3418    impl Memory for TrackingMemory {
3419        fn name(&self) -> &str {
3420            "tracking"
3421        }
3422
3423        async fn store(
3424            &self,
3425            key: &str,
3426            _content: &str,
3427            _category: MemoryCategory,
3428            _session_id: Option<&str>,
3429        ) -> anyhow::Result<()> {
3430            self.keys.lock().push(key.to_string());
3431            Ok(())
3432        }
3433
3434        async fn recall(
3435            &self,
3436            _query: &str,
3437            _limit: usize,
3438            _session_id: Option<&str>,
3439            _since: Option<&str>,
3440            _until: Option<&str>,
3441        ) -> anyhow::Result<Vec<MemoryEntry>> {
3442            Ok(Vec::new())
3443        }
3444
3445        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3446            Ok(None)
3447        }
3448
3449        async fn list(
3450            &self,
3451            _category: Option<&MemoryCategory>,
3452            _session_id: Option<&str>,
3453        ) -> anyhow::Result<Vec<MemoryEntry>> {
3454            Ok(Vec::new())
3455        }
3456
3457        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3458            Ok(false)
3459        }
3460
3461        async fn count(&self) -> anyhow::Result<usize> {
3462            let size = self.keys.lock().len();
3463            Ok(size)
3464        }
3465
3466        async fn health_check(&self) -> bool {
3467            true
3468        }
3469    }
3470
3471    fn test_connect_info() -> ConnectInfo<SocketAddr> {
3472        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3473    }
3474
3475    #[tokio::test]
3476    async fn webhook_idempotency_skips_duplicate_provider_calls() {
3477        let provider_impl = Arc::new(MockProvider::default());
3478        let provider: Arc<dyn Provider> = provider_impl.clone();
3479        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3480
3481        let state = AppState {
3482            config: Arc::new(Mutex::new(Config::default())),
3483            provider,
3484            model: "test-model".into(),
3485            temperature: 0.0,
3486            mem: memory,
3487            auto_save: false,
3488            webhook_secret_hash: None,
3489            pairing: Arc::new(PairingGuard::new(false, &[])),
3490            trust_forwarded_headers: false,
3491            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3492            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3493            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3494            whatsapp: None,
3495            whatsapp_app_secret: None,
3496            linq: None,
3497            linq_signing_secret: None,
3498            nextcloud_talk: None,
3499            nextcloud_talk_webhook_secret: None,
3500            wati: None,
3501            gmail_push: None,
3502            observer: Arc::new(crate::observability::NoopObserver),
3503            tools_registry: Arc::new(Vec::new()),
3504            cost_tracker: None,
3505            audit_logger: None,
3506            event_tx: tokio::sync::broadcast::channel(16).0,
3507            shutdown_tx: tokio::sync::watch::channel(false).0,
3508            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3509            path_prefix: String::new(),
3510            session_backend: None,
3511            session_queue: std::sync::Arc::new(
3512                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3513            ),
3514            device_registry: None,
3515            pending_pairings: None,
3516            canvas_store: CanvasStore::new(),
3517            mcp_registry: None,
3518            approval_registry: approval_registry::global(),
3519            mcp_local_url: None,
3520            #[cfg(feature = "webauthn")]
3521            webauthn: None,
3522        };
3523
3524        let mut headers = HeaderMap::new();
3525        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3526
3527        let body = Ok(Json(WebhookBody {
3528            message: "hello".into(),
3529        }));
3530        let first = handle_webhook(
3531            State(state.clone()),
3532            test_connect_info(),
3533            headers.clone(),
3534            body,
3535        )
3536        .await
3537        .into_response();
3538        assert_eq!(first.status(), StatusCode::OK);
3539
3540        let body = Ok(Json(WebhookBody {
3541            message: "hello".into(),
3542        }));
3543        let second = handle_webhook(State(state), test_connect_info(), headers, body)
3544            .await
3545            .into_response();
3546        assert_eq!(second.status(), StatusCode::OK);
3547
3548        let payload = second.into_body().collect().await.unwrap().to_bytes();
3549        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3550        assert_eq!(parsed["status"], "duplicate");
3551        assert_eq!(parsed["idempotent"], true);
3552        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3553    }
3554
3555    #[tokio::test]
3556    async fn webhook_autosave_stores_distinct_keys_per_request() {
3557        let provider_impl = Arc::new(MockProvider::default());
3558        let provider: Arc<dyn Provider> = provider_impl.clone();
3559
3560        let tracking_impl = Arc::new(TrackingMemory::default());
3561        let memory: Arc<dyn Memory> = tracking_impl.clone();
3562
3563        let state = AppState {
3564            config: Arc::new(Mutex::new(Config::default())),
3565            provider,
3566            model: "test-model".into(),
3567            temperature: 0.0,
3568            mem: memory,
3569            auto_save: true,
3570            webhook_secret_hash: None,
3571            pairing: Arc::new(PairingGuard::new(false, &[])),
3572            trust_forwarded_headers: false,
3573            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3574            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3575            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3576            whatsapp: None,
3577            whatsapp_app_secret: None,
3578            linq: None,
3579            linq_signing_secret: None,
3580            nextcloud_talk: None,
3581            nextcloud_talk_webhook_secret: None,
3582            wati: None,
3583            gmail_push: None,
3584            observer: Arc::new(crate::observability::NoopObserver),
3585            tools_registry: Arc::new(Vec::new()),
3586            cost_tracker: None,
3587            audit_logger: None,
3588            event_tx: tokio::sync::broadcast::channel(16).0,
3589            shutdown_tx: tokio::sync::watch::channel(false).0,
3590            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3591            path_prefix: String::new(),
3592            session_backend: None,
3593            session_queue: std::sync::Arc::new(
3594                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3595            ),
3596            device_registry: None,
3597            pending_pairings: None,
3598            canvas_store: CanvasStore::new(),
3599            mcp_registry: None,
3600            approval_registry: approval_registry::global(),
3601            mcp_local_url: None,
3602            #[cfg(feature = "webauthn")]
3603            webauthn: None,
3604        };
3605
3606        let headers = HeaderMap::new();
3607
3608        let body1 = Ok(Json(WebhookBody {
3609            message: "hello one".into(),
3610        }));
3611        let first = handle_webhook(
3612            State(state.clone()),
3613            test_connect_info(),
3614            headers.clone(),
3615            body1,
3616        )
3617        .await
3618        .into_response();
3619        assert_eq!(first.status(), StatusCode::OK);
3620
3621        let body2 = Ok(Json(WebhookBody {
3622            message: "hello two".into(),
3623        }));
3624        let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3625            .await
3626            .into_response();
3627        assert_eq!(second.status(), StatusCode::OK);
3628
3629        let keys = tracking_impl.keys.lock().clone();
3630        assert_eq!(keys.len(), 2);
3631        assert_ne!(keys[0], keys[1]);
3632        assert!(keys[0].starts_with("webhook_msg_"));
3633        assert!(keys[1].starts_with("webhook_msg_"));
3634        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3635    }
3636
3637    #[test]
3638    fn webhook_secret_hash_is_deterministic_and_nonempty() {
3639        let secret_a = generate_test_secret();
3640        let secret_b = generate_test_secret();
3641        let one = hash_webhook_secret(&secret_a);
3642        let two = hash_webhook_secret(&secret_a);
3643        let other = hash_webhook_secret(&secret_b);
3644
3645        assert_eq!(one, two);
3646        assert_ne!(one, other);
3647        assert_eq!(one.len(), 64);
3648    }
3649
3650    #[tokio::test]
3651    async fn webhook_secret_hash_rejects_missing_header() {
3652        let provider_impl = Arc::new(MockProvider::default());
3653        let provider: Arc<dyn Provider> = provider_impl.clone();
3654        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3655        let secret = generate_test_secret();
3656
3657        let state = AppState {
3658            config: Arc::new(Mutex::new(Config::default())),
3659            provider,
3660            model: "test-model".into(),
3661            temperature: 0.0,
3662            mem: memory,
3663            auto_save: false,
3664            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3665            pairing: Arc::new(PairingGuard::new(false, &[])),
3666            trust_forwarded_headers: false,
3667            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3668            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3669            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3670            whatsapp: None,
3671            whatsapp_app_secret: None,
3672            linq: None,
3673            linq_signing_secret: None,
3674            nextcloud_talk: None,
3675            nextcloud_talk_webhook_secret: None,
3676            wati: None,
3677            gmail_push: None,
3678            observer: Arc::new(crate::observability::NoopObserver),
3679            tools_registry: Arc::new(Vec::new()),
3680            cost_tracker: None,
3681            audit_logger: None,
3682            event_tx: tokio::sync::broadcast::channel(16).0,
3683            shutdown_tx: tokio::sync::watch::channel(false).0,
3684            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3685            path_prefix: String::new(),
3686            session_backend: None,
3687            session_queue: std::sync::Arc::new(
3688                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3689            ),
3690            device_registry: None,
3691            pending_pairings: None,
3692            canvas_store: CanvasStore::new(),
3693            mcp_registry: None,
3694            approval_registry: approval_registry::global(),
3695            mcp_local_url: None,
3696            #[cfg(feature = "webauthn")]
3697            webauthn: None,
3698        };
3699
3700        let response = handle_webhook(
3701            State(state),
3702            test_connect_info(),
3703            HeaderMap::new(),
3704            Ok(Json(WebhookBody {
3705                message: "hello".into(),
3706            })),
3707        )
3708        .await
3709        .into_response();
3710
3711        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3712        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3713    }
3714
3715    #[tokio::test]
3716    async fn webhook_secret_hash_rejects_invalid_header() {
3717        let provider_impl = Arc::new(MockProvider::default());
3718        let provider: Arc<dyn Provider> = provider_impl.clone();
3719        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3720        let valid_secret = generate_test_secret();
3721        let wrong_secret = generate_test_secret();
3722
3723        let state = AppState {
3724            config: Arc::new(Mutex::new(Config::default())),
3725            provider,
3726            model: "test-model".into(),
3727            temperature: 0.0,
3728            mem: memory,
3729            auto_save: false,
3730            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3731            pairing: Arc::new(PairingGuard::new(false, &[])),
3732            trust_forwarded_headers: false,
3733            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3734            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3735            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3736            whatsapp: None,
3737            whatsapp_app_secret: None,
3738            linq: None,
3739            linq_signing_secret: None,
3740            nextcloud_talk: None,
3741            nextcloud_talk_webhook_secret: None,
3742            wati: None,
3743            gmail_push: None,
3744            observer: Arc::new(crate::observability::NoopObserver),
3745            tools_registry: Arc::new(Vec::new()),
3746            cost_tracker: None,
3747            audit_logger: None,
3748            event_tx: tokio::sync::broadcast::channel(16).0,
3749            shutdown_tx: tokio::sync::watch::channel(false).0,
3750            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3751            path_prefix: String::new(),
3752            session_backend: None,
3753            session_queue: std::sync::Arc::new(
3754                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3755            ),
3756            device_registry: None,
3757            pending_pairings: None,
3758            canvas_store: CanvasStore::new(),
3759            mcp_registry: None,
3760            approval_registry: approval_registry::global(),
3761            mcp_local_url: None,
3762            #[cfg(feature = "webauthn")]
3763            webauthn: None,
3764        };
3765
3766        let mut headers = HeaderMap::new();
3767        headers.insert(
3768            "X-Webhook-Secret",
3769            HeaderValue::from_str(&wrong_secret).unwrap(),
3770        );
3771
3772        let response = handle_webhook(
3773            State(state),
3774            test_connect_info(),
3775            headers,
3776            Ok(Json(WebhookBody {
3777                message: "hello".into(),
3778            })),
3779        )
3780        .await
3781        .into_response();
3782
3783        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3784        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3785    }
3786
3787    #[tokio::test]
3788    async fn webhook_secret_hash_accepts_valid_header() {
3789        let provider_impl = Arc::new(MockProvider::default());
3790        let provider: Arc<dyn Provider> = provider_impl.clone();
3791        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3792        let secret = generate_test_secret();
3793
3794        let state = AppState {
3795            config: Arc::new(Mutex::new(Config::default())),
3796            provider,
3797            model: "test-model".into(),
3798            temperature: 0.0,
3799            mem: memory,
3800            auto_save: false,
3801            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3802            pairing: Arc::new(PairingGuard::new(false, &[])),
3803            trust_forwarded_headers: false,
3804            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3805            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3806            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3807            whatsapp: None,
3808            whatsapp_app_secret: None,
3809            linq: None,
3810            linq_signing_secret: None,
3811            nextcloud_talk: None,
3812            nextcloud_talk_webhook_secret: None,
3813            wati: None,
3814            gmail_push: None,
3815            observer: Arc::new(crate::observability::NoopObserver),
3816            tools_registry: Arc::new(Vec::new()),
3817            cost_tracker: None,
3818            audit_logger: None,
3819            event_tx: tokio::sync::broadcast::channel(16).0,
3820            shutdown_tx: tokio::sync::watch::channel(false).0,
3821            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3822            path_prefix: String::new(),
3823            session_backend: None,
3824            session_queue: std::sync::Arc::new(
3825                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3826            ),
3827            device_registry: None,
3828            pending_pairings: None,
3829            canvas_store: CanvasStore::new(),
3830            mcp_registry: None,
3831            approval_registry: approval_registry::global(),
3832            mcp_local_url: None,
3833            #[cfg(feature = "webauthn")]
3834            webauthn: None,
3835        };
3836
3837        let mut headers = HeaderMap::new();
3838        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3839
3840        let response = handle_webhook(
3841            State(state),
3842            test_connect_info(),
3843            headers,
3844            Ok(Json(WebhookBody {
3845                message: "hello".into(),
3846            })),
3847        )
3848        .await
3849        .into_response();
3850
3851        assert_eq!(response.status(), StatusCode::OK);
3852        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3853    }
3854
3855    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3856        use hmac::{Hmac, Mac};
3857        use sha2::Sha256;
3858
3859        let payload = format!("{random}{body}");
3860        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3861        mac.update(payload.as_bytes());
3862        hex::encode(mac.finalize().into_bytes())
3863    }
3864
3865    #[tokio::test]
3866    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3867        let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3868        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3869
3870        let state = AppState {
3871            config: Arc::new(Mutex::new(Config::default())),
3872            provider,
3873            model: "test-model".into(),
3874            temperature: 0.0,
3875            mem: memory,
3876            auto_save: false,
3877            webhook_secret_hash: None,
3878            pairing: Arc::new(PairingGuard::new(false, &[])),
3879            trust_forwarded_headers: false,
3880            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3881            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3882            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3883            whatsapp: None,
3884            whatsapp_app_secret: None,
3885            linq: None,
3886            linq_signing_secret: None,
3887            nextcloud_talk: None,
3888            nextcloud_talk_webhook_secret: None,
3889            wati: None,
3890            gmail_push: None,
3891            observer: Arc::new(crate::observability::NoopObserver),
3892            tools_registry: Arc::new(Vec::new()),
3893            cost_tracker: None,
3894            audit_logger: None,
3895            event_tx: tokio::sync::broadcast::channel(16).0,
3896            shutdown_tx: tokio::sync::watch::channel(false).0,
3897            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3898            path_prefix: String::new(),
3899            session_backend: None,
3900            session_queue: std::sync::Arc::new(
3901                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3902            ),
3903            device_registry: None,
3904            pending_pairings: None,
3905            canvas_store: CanvasStore::new(),
3906            mcp_registry: None,
3907            approval_registry: approval_registry::global(),
3908            mcp_local_url: None,
3909            #[cfg(feature = "webauthn")]
3910            webauthn: None,
3911        };
3912
3913        let response = Box::pin(handle_nextcloud_talk_webhook(
3914            State(state),
3915            HeaderMap::new(),
3916            Bytes::from_static(br#"{"type":"message"}"#),
3917        ))
3918        .await
3919        .into_response();
3920
3921        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3922    }
3923
3924    #[tokio::test]
3925    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3926        let provider_impl = Arc::new(MockProvider::default());
3927        let provider: Arc<dyn Provider> = provider_impl.clone();
3928        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3929
3930        let channel = Arc::new(NextcloudTalkChannel::new(
3931            "https://cloud.example.com".into(),
3932            "app-token".into(),
3933            String::new(),
3934            vec!["*".into()],
3935        ));
3936
3937        let secret = "nextcloud-test-secret";
3938        let random = "seed-value";
3939        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3940        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3941        let invalid_signature = "deadbeef";
3942
3943        let state = AppState {
3944            config: Arc::new(Mutex::new(Config::default())),
3945            provider,
3946            model: "test-model".into(),
3947            temperature: 0.0,
3948            mem: memory,
3949            auto_save: false,
3950            webhook_secret_hash: None,
3951            pairing: Arc::new(PairingGuard::new(false, &[])),
3952            trust_forwarded_headers: false,
3953            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3954            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3955            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3956            whatsapp: None,
3957            whatsapp_app_secret: None,
3958            linq: None,
3959            linq_signing_secret: None,
3960            nextcloud_talk: Some(channel),
3961            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3962            wati: None,
3963            gmail_push: None,
3964            observer: Arc::new(crate::observability::NoopObserver),
3965            tools_registry: Arc::new(Vec::new()),
3966            cost_tracker: None,
3967            audit_logger: None,
3968            event_tx: tokio::sync::broadcast::channel(16).0,
3969            shutdown_tx: tokio::sync::watch::channel(false).0,
3970            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3971            path_prefix: String::new(),
3972            session_backend: None,
3973            session_queue: std::sync::Arc::new(
3974                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3975            ),
3976            device_registry: None,
3977            pending_pairings: None,
3978            canvas_store: CanvasStore::new(),
3979            mcp_registry: None,
3980            approval_registry: approval_registry::global(),
3981            mcp_local_url: None,
3982            #[cfg(feature = "webauthn")]
3983            webauthn: None,
3984        };
3985
3986        let mut headers = HeaderMap::new();
3987        headers.insert(
3988            "X-Nextcloud-Talk-Random",
3989            HeaderValue::from_str(random).unwrap(),
3990        );
3991        headers.insert(
3992            "X-Nextcloud-Talk-Signature",
3993            HeaderValue::from_str(invalid_signature).unwrap(),
3994        );
3995
3996        let response = Box::pin(handle_nextcloud_talk_webhook(
3997            State(state),
3998            headers,
3999            Bytes::from(body),
4000        ))
4001        .await
4002        .into_response();
4003        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4004        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4005    }
4006
4007    // ══════════════════════════════════════════════════════════
4008    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
4009    // ══════════════════════════════════════════════════════════
4010
4011    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
4012        use hmac::{Hmac, Mac};
4013        use sha2::Sha256;
4014
4015        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4016        mac.update(body);
4017        hex::encode(mac.finalize().into_bytes())
4018    }
4019
4020    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
4021        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
4022    }
4023
4024    #[test]
4025    fn whatsapp_signature_valid() {
4026        let app_secret = generate_test_secret();
4027        let body = b"test body content";
4028
4029        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4030
4031        assert!(verify_whatsapp_signature(
4032            &app_secret,
4033            body,
4034            &signature_header
4035        ));
4036    }
4037
4038    #[test]
4039    fn whatsapp_signature_invalid_wrong_secret() {
4040        let app_secret = generate_test_secret();
4041        let wrong_secret = generate_test_secret();
4042        let body = b"test body content";
4043
4044        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
4045
4046        assert!(!verify_whatsapp_signature(
4047            &app_secret,
4048            body,
4049            &signature_header
4050        ));
4051    }
4052
4053    #[test]
4054    fn whatsapp_signature_invalid_wrong_body() {
4055        let app_secret = generate_test_secret();
4056        let original_body = b"original body";
4057        let tampered_body = b"tampered body";
4058
4059        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
4060
4061        // Verify with tampered body should fail
4062        assert!(!verify_whatsapp_signature(
4063            &app_secret,
4064            tampered_body,
4065            &signature_header
4066        ));
4067    }
4068
4069    #[test]
4070    fn whatsapp_signature_missing_prefix() {
4071        let app_secret = generate_test_secret();
4072        let body = b"test body";
4073
4074        // Signature without "sha256=" prefix
4075        let signature_header = "abc123def456";
4076
4077        assert!(!verify_whatsapp_signature(
4078            &app_secret,
4079            body,
4080            signature_header
4081        ));
4082    }
4083
4084    #[test]
4085    fn whatsapp_signature_empty_header() {
4086        let app_secret = generate_test_secret();
4087        let body = b"test body";
4088
4089        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
4090    }
4091
4092    #[test]
4093    fn whatsapp_signature_invalid_hex() {
4094        let app_secret = generate_test_secret();
4095        let body = b"test body";
4096
4097        // Invalid hex characters
4098        let signature_header = "sha256=not_valid_hex_zzz";
4099
4100        assert!(!verify_whatsapp_signature(
4101            &app_secret,
4102            body,
4103            signature_header
4104        ));
4105    }
4106
4107    #[test]
4108    fn whatsapp_signature_empty_body() {
4109        let app_secret = generate_test_secret();
4110        let body = b"";
4111
4112        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4113
4114        assert!(verify_whatsapp_signature(
4115            &app_secret,
4116            body,
4117            &signature_header
4118        ));
4119    }
4120
4121    #[test]
4122    fn whatsapp_signature_unicode_body() {
4123        let app_secret = generate_test_secret();
4124        let body = "Hello 🦀 World".as_bytes();
4125
4126        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4127
4128        assert!(verify_whatsapp_signature(
4129            &app_secret,
4130            body,
4131            &signature_header
4132        ));
4133    }
4134
4135    #[test]
4136    fn whatsapp_signature_json_payload() {
4137        let app_secret = generate_test_secret();
4138        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
4139
4140        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4141
4142        assert!(verify_whatsapp_signature(
4143            &app_secret,
4144            body,
4145            &signature_header
4146        ));
4147    }
4148
4149    #[test]
4150    fn whatsapp_signature_case_sensitive_prefix() {
4151        let app_secret = generate_test_secret();
4152        let body = b"test body";
4153
4154        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4155
4156        // Wrong case prefix should fail
4157        let wrong_prefix = format!("SHA256={hex_sig}");
4158        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
4159
4160        // Correct prefix should pass
4161        let correct_prefix = format!("sha256={hex_sig}");
4162        assert!(verify_whatsapp_signature(
4163            &app_secret,
4164            body,
4165            &correct_prefix
4166        ));
4167    }
4168
4169    #[test]
4170    fn whatsapp_signature_truncated_hex() {
4171        let app_secret = generate_test_secret();
4172        let body = b"test body";
4173
4174        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4175        let truncated = &hex_sig[..32]; // Only half the signature
4176        let signature_header = format!("sha256={truncated}");
4177
4178        assert!(!verify_whatsapp_signature(
4179            &app_secret,
4180            body,
4181            &signature_header
4182        ));
4183    }
4184
4185    #[test]
4186    fn whatsapp_signature_extra_bytes() {
4187        let app_secret = generate_test_secret();
4188        let body = b"test body";
4189
4190        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4191        let extended = format!("{hex_sig}deadbeef");
4192        let signature_header = format!("sha256={extended}");
4193
4194        assert!(!verify_whatsapp_signature(
4195            &app_secret,
4196            body,
4197            &signature_header
4198        ));
4199    }
4200
4201    // ══════════════════════════════════════════════════════════
4202    // IdempotencyStore Edge-Case Tests
4203    // ══════════════════════════════════════════════════════════
4204
4205    #[test]
4206    fn idempotency_store_allows_different_keys() {
4207        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
4208        assert!(store.record_if_new("key-a"));
4209        assert!(store.record_if_new("key-b"));
4210        assert!(store.record_if_new("key-c"));
4211        assert!(store.record_if_new("key-d"));
4212    }
4213
4214    #[test]
4215    fn idempotency_store_max_keys_clamped_to_one() {
4216        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
4217        assert!(store.record_if_new("only-key"));
4218        assert!(!store.record_if_new("only-key"));
4219    }
4220
4221    #[test]
4222    fn idempotency_store_rapid_duplicate_rejected() {
4223        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
4224        assert!(store.record_if_new("rapid"));
4225        assert!(!store.record_if_new("rapid"));
4226    }
4227
4228    #[test]
4229    fn idempotency_store_accepts_after_ttl_expires() {
4230        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
4231        assert!(store.record_if_new("ttl-key"));
4232        std::thread::sleep(Duration::from_millis(10));
4233        assert!(store.record_if_new("ttl-key"));
4234    }
4235
4236    #[test]
4237    fn idempotency_store_eviction_preserves_newest() {
4238        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
4239        assert!(store.record_if_new("old-key"));
4240        std::thread::sleep(Duration::from_millis(2));
4241        assert!(store.record_if_new("new-key"));
4242
4243        let keys = store.keys.lock();
4244        assert_eq!(keys.len(), 1);
4245        assert!(!keys.contains_key("old-key"));
4246        assert!(keys.contains_key("new-key"));
4247    }
4248
4249    #[test]
4250    fn rate_limiter_allows_after_window_expires() {
4251        let window = Duration::from_millis(50);
4252        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
4253        assert!(limiter.allow("ip-1"));
4254        assert!(limiter.allow("ip-1"));
4255        assert!(!limiter.allow("ip-1")); // blocked
4256
4257        // Wait for window to expire
4258        std::thread::sleep(Duration::from_millis(60));
4259
4260        // Should be allowed again
4261        assert!(limiter.allow("ip-1"));
4262    }
4263
4264    #[test]
4265    fn rate_limiter_independent_keys_tracked_separately() {
4266        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
4267        assert!(limiter.allow("ip-1"));
4268        assert!(limiter.allow("ip-1"));
4269        assert!(!limiter.allow("ip-1")); // ip-1 blocked
4270
4271        // ip-2 should still work
4272        assert!(limiter.allow("ip-2"));
4273        assert!(limiter.allow("ip-2"));
4274        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
4275    }
4276
4277    #[test]
4278    fn rate_limiter_exact_boundary_at_max_keys() {
4279        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
4280        assert!(limiter.allow("ip-1"));
4281        assert!(limiter.allow("ip-2"));
4282        assert!(limiter.allow("ip-3"));
4283        // At capacity now
4284        assert!(limiter.allow("ip-4")); // should evict ip-1
4285
4286        let guard = limiter.requests.lock();
4287        assert_eq!(guard.0.len(), 3);
4288        assert!(
4289            !guard.0.contains_key("ip-1"),
4290            "ip-1 should have been evicted"
4291        );
4292        assert!(guard.0.contains_key("ip-2"));
4293        assert!(guard.0.contains_key("ip-3"));
4294        assert!(guard.0.contains_key("ip-4"));
4295    }
4296
4297    #[test]
4298    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
4299        let limiter = GatewayRateLimiter::new(2, 3, 100);
4300
4301        // Exhaust pair limit
4302        assert!(limiter.allow_pair("ip-1"));
4303        assert!(limiter.allow_pair("ip-1"));
4304        assert!(!limiter.allow_pair("ip-1")); // pair blocked
4305
4306        // Webhook should still work
4307        assert!(limiter.allow_webhook("ip-1"));
4308        assert!(limiter.allow_webhook("ip-1"));
4309        assert!(limiter.allow_webhook("ip-1"));
4310        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
4311    }
4312
4313    #[test]
4314    fn rate_limiter_single_key_max_allows_one_request() {
4315        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
4316        assert!(limiter.allow("ip-1"));
4317        assert!(limiter.allow("ip-2")); // evicts ip-1
4318
4319        let guard = limiter.requests.lock();
4320        assert_eq!(guard.0.len(), 1);
4321        assert!(guard.0.contains_key("ip-2"));
4322        assert!(!guard.0.contains_key("ip-1"));
4323    }
4324
4325    #[test]
4326    fn rate_limiter_concurrent_access_safe() {
4327        use std::sync::Arc;
4328
4329        let limiter = Arc::new(SlidingWindowRateLimiter::new(
4330            1000,
4331            Duration::from_secs(60),
4332            1000,
4333        ));
4334        let mut handles = Vec::new();
4335
4336        for i in 0..10 {
4337            let limiter = limiter.clone();
4338            handles.push(std::thread::spawn(move || {
4339                for j in 0..100 {
4340                    limiter.allow(&format!("thread-{i}-req-{j}"));
4341                }
4342            }));
4343        }
4344
4345        for handle in handles {
4346            handle.join().unwrap();
4347        }
4348
4349        // Should not panic or deadlock
4350        let guard = limiter.requests.lock();
4351        assert!(guard.0.len() <= 1000, "should respect max_keys");
4352    }
4353
4354    #[test]
4355    fn idempotency_store_concurrent_access_safe() {
4356        use std::sync::Arc;
4357
4358        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
4359        let mut handles = Vec::new();
4360
4361        for i in 0..10 {
4362            let store = store.clone();
4363            handles.push(std::thread::spawn(move || {
4364                for j in 0..100 {
4365                    store.record_if_new(&format!("thread-{i}-key-{j}"));
4366                }
4367            }));
4368        }
4369
4370        for handle in handles {
4371            handle.join().unwrap();
4372        }
4373
4374        let keys = store.keys.lock();
4375        assert!(keys.len() <= 1000, "should respect max_keys");
4376    }
4377
4378    #[test]
4379    fn rate_limiter_rapid_burst_then_cooldown() {
4380        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
4381
4382        // Burst: use all 5 requests
4383        for _ in 0..5 {
4384            assert!(limiter.allow("burst-ip"));
4385        }
4386        assert!(!limiter.allow("burst-ip")); // 6th should fail
4387
4388        // Cooldown
4389        std::thread::sleep(Duration::from_millis(60));
4390
4391        // Should be allowed again
4392        assert!(limiter.allow("burst-ip"));
4393    }
4394
4395    #[test]
4396    fn require_localhost_accepts_ipv4_loopback() {
4397        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
4398        assert!(require_localhost(&peer).is_ok());
4399    }
4400
4401    #[test]
4402    fn require_localhost_accepts_ipv6_loopback() {
4403        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
4404        assert!(require_localhost(&peer).is_ok());
4405    }
4406
4407    #[test]
4408    fn require_localhost_rejects_non_loopback_ipv4() {
4409        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
4410        let err = require_localhost(&peer).unwrap_err();
4411        assert_eq!(err.0, StatusCode::FORBIDDEN);
4412    }
4413
4414    #[test]
4415    fn require_localhost_rejects_non_loopback_ipv6() {
4416        let peer = SocketAddr::from((
4417            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
4418            12345,
4419        ));
4420        let err = require_localhost(&peer).unwrap_err();
4421        assert_eq!(err.0, StatusCode::FORBIDDEN);
4422    }
4423}