Skip to main content

construct/gateway/
mod.rs

1//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts.
2//!
3//! This module replaces the raw TCP implementation with axum for:
4//! - Proper HTTP/1.1 parsing and compliance
5//! - Content-Length validation (handled by hyper)
6//! - Request body size limits (64KB max)
7//! - Request timeouts (30s) to prevent slow-loris attacks
8//! - Header sanitization (handled by axum/hyper)
9
10pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34// portable-pty needs `openpty` from libutil which Android's NDK does not
35// reliably link.  The websocket terminal isn't a meaningful surface on a
36// phone runtime anyway, so we drop the module + its route on Android.
37#[cfg(not(target_os = "android"))]
38pub mod terminal;
39pub mod tls;
40pub mod ws;
41pub mod ws_mcp_events;
42
43use crate::channels::{
44    Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
45    WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
46};
47use crate::config::Config;
48use crate::cost::CostTracker;
49use crate::memory::{self, Memory, MemoryCategory};
50use crate::providers::{self, ChatMessage, Provider};
51use crate::runtime;
52use crate::security::SecurityPolicy;
53use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
54use crate::tools;
55use crate::tools::canvas::CanvasStore;
56use crate::tools::traits::ToolSpec;
57use crate::util::truncate_with_ellipsis;
58use anyhow::{Context, Result};
59use axum::{
60    Router,
61    body::Bytes,
62    extract::{ConnectInfo, Query, State},
63    http::{HeaderMap, StatusCode, header},
64    response::{IntoResponse, Json},
65    routing::{delete, get, post, put},
66};
67use parking_lot::Mutex;
68use std::collections::HashMap;
69use std::net::{IpAddr, SocketAddr};
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tower_http::limit::RequestBodyLimitLayer;
73use tower_http::timeout::TimeoutLayer;
74use uuid::Uuid;
75
76/// Maximum request body size (64KB) — prevents memory exhaustion
77pub const MAX_BODY_SIZE: usize = 65_536;
/// Default request timeout (30s) — prevents slow-loris attacks.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Read gateway request timeout from `CONSTRUCT_GATEWAY_TIMEOUT_SECS` env var
/// at runtime, falling back to [`REQUEST_TIMEOUT_SECS`].
///
/// Agentic workloads with tool use (web search, MCP tools, sub-agent
/// delegation) regularly exceed 30 seconds. This allows operators to
/// increase the timeout without recompiling.
///
/// The value is interpreted as a whole number of seconds. Surrounding
/// whitespace is ignored. The default is used when the variable is unset,
/// is not valid Unicode, does not parse as a `u64`, or is `0` (a zero-length
/// timeout would expire every request immediately).
pub fn gateway_request_timeout_secs() -> u64 {
    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
        .ok()
        // Tolerate values such as " 120 " coming from shell exports or env files.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // Zero is never a usable timeout; treat it like an invalid value.
        .filter(|&secs| secs > 0)
        .unwrap_or(REQUEST_TIMEOUT_SECS)
}
/// Length, in seconds, of the sliding window used by gateway rate limiting.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback max distinct client keys tracked in gateway rate limiter
/// (applied when the configured value is `0`).
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
/// Fallback max distinct idempotency keys retained in gateway memory
/// (applied when the configured value is `0`).
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
99
100fn webhook_memory_key() -> String {
101    format!("webhook_msg_{}", Uuid::new_v4())
102}
103
104fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105    format!("whatsapp_{}_{}", msg.sender, msg.id)
106}
107
108fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109    format!("linq_{}_{}", msg.sender, msg.id)
110}
111
112fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113    format!("wati_{}_{}", msg.sender, msg.id)
114}
115
116fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
117    format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
118}
119
120fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
121    match &msg.thread_ts {
122        Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
123        None => format!("{channel}_{}", msg.sender),
124    }
125}
126
127fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
128    headers
129        .get("X-Session-Id")
130        .and_then(|v| v.to_str().ok())
131        .map(str::trim)
132        .filter(|value| !value.is_empty())
133        .map(str::to_owned)
134}
135
136fn hash_webhook_secret(value: &str) -> String {
137    use sha2::{Digest, Sha256};
138
139    let digest = Sha256::digest(value.as_bytes());
140    hex::encode(digest)
141}
142
143/// How often the rate limiter sweeps stale IP entries from its map.
144const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
145
146#[derive(Debug)]
147struct SlidingWindowRateLimiter {
148    limit_per_window: u32,
149    window: Duration,
150    max_keys: usize,
151    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
152}
153
154impl SlidingWindowRateLimiter {
155    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
156        Self {
157            limit_per_window,
158            window,
159            max_keys: max_keys.max(1),
160            requests: Mutex::new((HashMap::new(), Instant::now())),
161        }
162    }
163
164    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
165        requests.retain(|_, timestamps| {
166            timestamps.retain(|t| *t > cutoff);
167            !timestamps.is_empty()
168        });
169    }
170
171    fn allow(&self, key: &str) -> bool {
172        if self.limit_per_window == 0 {
173            return true;
174        }
175
176        let now = Instant::now();
177        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
178
179        let mut guard = self.requests.lock();
180        let (requests, last_sweep) = &mut *guard;
181
182        // Periodic sweep: remove keys with no recent requests
183        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
184            Self::prune_stale(requests, cutoff);
185            *last_sweep = now;
186        }
187
188        if !requests.contains_key(key) && requests.len() >= self.max_keys {
189            // Opportunistic stale cleanup before eviction under cardinality pressure.
190            Self::prune_stale(requests, cutoff);
191            *last_sweep = now;
192
193            if requests.len() >= self.max_keys {
194                let evict_key = requests
195                    .iter()
196                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
197                    .map(|(k, _)| k.clone());
198                if let Some(evict_key) = evict_key {
199                    requests.remove(&evict_key);
200                }
201            }
202        }
203
204        let entry = requests.entry(key.to_owned()).or_default();
205        entry.retain(|instant| *instant > cutoff);
206
207        if entry.len() >= self.limit_per_window as usize {
208            return false;
209        }
210
211        entry.push(now);
212        true
213    }
214}
215
216#[derive(Debug)]
217pub struct GatewayRateLimiter {
218    pair: SlidingWindowRateLimiter,
219    webhook: SlidingWindowRateLimiter,
220}
221
222impl GatewayRateLimiter {
223    fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
224        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
225        Self {
226            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
227            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
228        }
229    }
230
231    fn allow_pair(&self, key: &str) -> bool {
232        self.pair.allow(key)
233    }
234
235    fn allow_webhook(&self, key: &str) -> bool {
236        self.webhook.allow(key)
237    }
238}
239
240#[derive(Debug)]
241pub struct IdempotencyStore {
242    ttl: Duration,
243    max_keys: usize,
244    keys: Mutex<HashMap<String, Instant>>,
245}
246
247impl IdempotencyStore {
248    fn new(ttl: Duration, max_keys: usize) -> Self {
249        Self {
250            ttl,
251            max_keys: max_keys.max(1),
252            keys: Mutex::new(HashMap::new()),
253        }
254    }
255
256    /// Returns true if this key is new and is now recorded.
257    fn record_if_new(&self, key: &str) -> bool {
258        let now = Instant::now();
259        let mut keys = self.keys.lock();
260
261        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
262
263        if keys.contains_key(key) {
264            return false;
265        }
266
267        if keys.len() >= self.max_keys {
268            let evict_key = keys
269                .iter()
270                .min_by_key(|(_, seen_at)| *seen_at)
271                .map(|(k, _)| k.clone());
272            if let Some(evict_key) = evict_key {
273                keys.remove(&evict_key);
274            }
275        }
276
277        keys.insert(key.to_owned(), now);
278        true
279    }
280}
281
/// Parses a client IP out of a single forwarded-header token.
///
/// Surrounding whitespace and double quotes are stripped first. The token is
/// then tried, in order, as a bare IP address, as a socket address (`ip:port`
/// or `[v6]:port`, with the port discarded), and finally as an IP address
/// wrapped in square brackets. Returns `None` if nothing matches.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let candidate = value.trim().trim_matches('"').trim();
    if candidate.is_empty() {
        return None;
    }

    candidate
        .parse::<IpAddr>()
        .ok()
        .or_else(|| {
            candidate
                .parse::<SocketAddr>()
                .ok()
                .map(|socket| socket.ip())
        })
        .or_else(|| candidate.trim_matches(['[', ']']).parse::<IpAddr>().ok())
}
299
300fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
301    // Read the rightmost X-Forwarded-For hop. Proxies APPEND to XFF, so the
302    // leftmost value is supplied by the client (attacker-controlled) while the
303    // rightmost was written by the immediate upstream proxy we are trusting.
304    // Taking the leftmost would let any caller spoof an arbitrary source IP.
305    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
306        for candidate in xff.rsplit(',') {
307            if let Some(ip) = parse_client_ip(candidate) {
308                return Some(ip);
309            }
310        }
311    }
312
313    headers
314        .get("X-Real-IP")
315        .and_then(|v| v.to_str().ok())
316        .and_then(parse_client_ip)
317}
318
319pub(super) fn client_key_from_request(
320    peer_addr: Option<SocketAddr>,
321    headers: &HeaderMap,
322    trust_forwarded_headers: bool,
323) -> String {
324    if trust_forwarded_headers {
325        if let Some(ip) = forwarded_client_ip(headers) {
326            return ip.to_string();
327        }
328    }
329
330    peer_addr
331        .map(|addr| addr.ip().to_string())
332        .unwrap_or_else(|| "unknown".to_string())
333}
334
/// Resolves a configured key cap, treating `0` as "not configured".
///
/// Returns `configured` when it is non-zero; otherwise returns `fallback`,
/// raised to at least 1 so the result is never zero.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
342
/// Shared state for all axum handlers
///
/// The struct derives `Clone`; most fields are `Arc`-wrapped or otherwise
/// handle-like, so a clone shares the underlying data rather than copying it.
#[derive(Clone)]
pub struct AppState {
    /// Gateway configuration behind a shared mutex.
    pub config: Arc<Mutex<Config>>,
    /// Shared provider trait object.
    // NOTE(review): presumably the LLM backend used to answer requests — confirm in handlers.
    pub provider: Arc<dyn Provider>,
    /// Model identifier string.
    // NOTE(review): presumably the resolved default model — confirm at the construction site.
    pub model: String,
    /// Sampling temperature.
    // NOTE(review): presumably passed to the provider alongside `model` — confirm.
    pub temperature: f64,
    /// Shared memory backend trait object.
    pub mem: Arc<dyn Memory>,
    /// Auto-save flag.
    // NOTE(review): presumably controls whether incoming messages are stored in `mem` — confirm.
    pub auto_save: bool,
    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
    pub webhook_secret_hash: Option<Arc<str>>,
    /// Shared pairing guard.
    pub pairing: Arc<PairingGuard>,
    /// Whether forwarded-for style headers may be trusted.
    // NOTE(review): presumably fed into `client_key_from_request` — confirm at call sites.
    pub trust_forwarded_headers: bool,
    /// Shared sliding-window rate limiters (`pair` and `webhook`).
    pub rate_limiter: Arc<GatewayRateLimiter>,
    /// Shared limiter from the `auth_rate_limit` module.
    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
    /// Shared store of recently seen idempotency keys.
    pub idempotency_store: Arc<IdempotencyStore>,
    /// `WhatsApp` channel, if configured.
    pub whatsapp: Option<Arc<WhatsAppChannel>>,
    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
    pub whatsapp_app_secret: Option<Arc<str>>,
    /// Linq channel, if configured.
    pub linq: Option<Arc<LinqChannel>>,
    /// Linq webhook signing secret for signature verification
    pub linq_signing_secret: Option<Arc<str>>,
    /// Nextcloud Talk channel, if configured.
    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
    /// Nextcloud Talk webhook secret for signature verification
    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
    /// WATI channel, if configured.
    pub wati: Option<Arc<WatiChannel>>,
    /// Gmail Pub/Sub push notification channel
    pub gmail_push: Option<Arc<GmailPushChannel>>,
    /// Observability backend for metrics scraping
    pub observer: Arc<dyn crate::observability::Observer>,
    /// Registered tool specs (for web dashboard tools page)
    pub tools_registry: Arc<Vec<ToolSpec>>,
    /// Cost tracker (optional, for web dashboard cost page)
    pub cost_tracker: Option<Arc<CostTracker>>,
    /// Audit logger (optional, for web dashboard audit viewer)
    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
    /// SSE broadcast channel for real-time events
    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Shutdown signal sender for graceful shutdown
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Registry of dynamically connected nodes
    pub node_registry: Arc<nodes::NodeRegistry>,
    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
    pub path_prefix: String,
    /// Session backend for persisting gateway WS chat sessions
    pub session_backend: Option<Arc<dyn SessionBackend>>,
    /// Per-session actor queue for serializing concurrent turns
    pub session_queue: Arc<session_queue::SessionActorQueue>,
    /// Device registry for paired device management
    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
    /// Pending pairing request store
    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
    /// Shared canvas store for Live Canvas (A2UI) system
    pub canvas_store: CanvasStore,
    /// MCP registry for direct tool invocation from HTTP handlers (memory graph, etc.)
    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
    #[cfg(feature = "webauthn")]
    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
    /// Registry of pending human approval requests from workflow runs
    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
    /// Base URL (e.g. `http://127.0.0.1:60004`) of the in-process MCP server,
    /// used by gateway reverse-proxy handlers in [`api_mcp`]. `None` if the
    /// MCP server failed to bind — proxy handlers should then return 503.
    /// Populated after MCP server bind in [`run_gateway`].
    pub mcp_local_url: Option<Arc<str>>,
}
410
411/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
412#[allow(clippy::too_many_lines)]
413pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
414    // ── Security: warn on public bind without tunnel or explicit opt-in ──
415    if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
416    {
417        tracing::warn!(
418            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
419             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
420             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
421             Docker/VM: if you are running inside a container or VM, this is expected."
422        );
423    }
424    let config_state = Arc::new(Mutex::new(config.clone()));
425
426    // ── Hooks ──────────────────────────────────────────────────────
427    let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
428        Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
429    } else {
430        None
431    };
432
433    let addr: SocketAddr = format!("{host}:{port}").parse()?;
434    let listener = tokio::net::TcpListener::bind(addr).await?;
435    let actual_port = listener.local_addr()?.port();
436    let display_addr = format!("{host}:{actual_port}");
437
438    let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
439        config.default_provider.as_deref().unwrap_or("openrouter"),
440        config.api_key.as_deref(),
441        config.api_url.as_deref(),
442        &config.reliability,
443        &providers::ProviderRuntimeOptions {
444            auth_profile_override: None,
445            provider_api_url: config.api_url.clone(),
446            construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
447            secrets_encrypt: config.secrets.encrypt,
448            reasoning_enabled: config.runtime.reasoning_enabled,
449            reasoning_effort: config.runtime.reasoning_effort.clone(),
450            provider_timeout_secs: Some(config.provider_timeout_secs),
451            extra_headers: config.extra_headers.clone(),
452            api_path: config.api_path.clone(),
453            provider_max_tokens: config.provider_max_tokens,
454        },
455    )?);
456    let model = config
457        .default_model
458        .clone()
459        .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
460    let temperature = config.default_temperature;
461    let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
462        &config.memory,
463        &config.embedding_routes,
464        Some(&config.storage.provider.config),
465        &config.workspace_dir,
466        config.api_key.as_deref(),
467    )?);
468    let runtime: Arc<dyn runtime::RuntimeAdapter> =
469        Arc::from(runtime::create_runtime(&config.runtime)?);
470    let security = Arc::new(SecurityPolicy::from_config(
471        &config.autonomy,
472        &config.workspace_dir,
473    ));
474
475    let (composio_key, composio_entity_id) = if config.composio.enabled {
476        (
477            config.composio.api_key.as_deref(),
478            Some(config.composio.entity_id.as_str()),
479        )
480    } else {
481        (None, None)
482    };
483
484    let canvas_store = tools::canvas::global_store();
485
486    let (
487        mut tools_registry_raw,
488        delegate_handle_gw,
489        _reaction_handle_gw,
490        _channel_map_handle,
491        _ask_user_handle_gw,
492        _escalate_handle_gw,
493    ) = tools::all_tools_with_runtime(
494        Arc::new(config.clone()),
495        &security,
496        runtime,
497        Arc::clone(&mem),
498        composio_key,
499        composio_entity_id,
500        &config.browser,
501        &config.http_request,
502        &config.web_fetch,
503        &config.workspace_dir,
504        &config.agents,
505        config.api_key.as_deref(),
506        &config,
507        Some(canvas_store.clone()),
508    );
509
510    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
511    // Without this, the `/api/tools` endpoint misses MCP tools.
512    // Inject operator + kumiho MCP server configs (same as agent/channels do)
513    // so the gateway can call operator tools directly from HTTP handlers.
514    let gateway_mcp_config = {
515        let mut c = config.clone();
516        c = crate::agent::kumiho::inject_kumiho(c, false);
517        c = crate::agent::operator::inject_operator(c, false);
518        c.mcp
519    };
520    let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
521    if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
522        tracing::info!(
523            "Gateway: initializing MCP client — {} server(s) configured",
524            gateway_mcp_config.servers.len()
525        );
526        match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
527            Ok(registry) => {
528                let registry = std::sync::Arc::new(registry);
529                mcp_registry_shared = Some(std::sync::Arc::clone(&registry));
530                if gateway_mcp_config.deferred_loading {
531                    let operator_prefix =
532                        format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
533                    let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
534                    let all_names = registry.tool_names();
535                    let mut eager_count = 0usize;
536
537                    for name in &all_names {
538                        if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
539                            if let Some(def) = registry.get_tool_def(name).await {
540                                let wrapper: std::sync::Arc<dyn tools::Tool> =
541                                    std::sync::Arc::new(tools::McpToolWrapper::new(
542                                        name.clone(),
543                                        def,
544                                        std::sync::Arc::clone(&registry),
545                                    ));
546                                if let Some(ref handle) = delegate_handle_gw {
547                                    handle.write().push(std::sync::Arc::clone(&wrapper));
548                                }
549                                tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
550                                eager_count += 1;
551                            }
552                        }
553                    }
554
555                    let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
556                        std::sync::Arc::clone(&registry),
557                        |name| {
558                            !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
559                        },
560                    )
561                    .await;
562                    tracing::info!(
563                        "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
564                        eager_count,
565                        deferred_set.len(),
566                        registry.server_count()
567                    );
568                    let activated =
569                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
570                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
571                        deferred_set,
572                        activated,
573                    )));
574                } else {
575                    let names = registry.tool_names();
576                    let mut registered = 0usize;
577                    for name in names {
578                        if let Some(def) = registry.get_tool_def(&name).await {
579                            let wrapper: std::sync::Arc<dyn tools::Tool> =
580                                std::sync::Arc::new(tools::McpToolWrapper::new(
581                                    name,
582                                    def,
583                                    std::sync::Arc::clone(&registry),
584                                ));
585                            if let Some(ref handle) = delegate_handle_gw {
586                                handle.write().push(std::sync::Arc::clone(&wrapper));
587                            }
588                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
589                            registered += 1;
590                        }
591                    }
592                    tracing::info!(
593                        "Gateway MCP: {} tool(s) registered from {} server(s)",
594                        registered,
595                        registry.server_count()
596                    );
597                }
598            }
599            Err(e) => {
600                tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
601            }
602        }
603    }
604
605    let tools_registry: Arc<Vec<ToolSpec>> =
606        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
607
608    // Cost tracker — process-global singleton so channels share the same instance
609    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
610
611    // Audit logger — optional, for dashboard audit viewer
612    let audit_logger = if config.security.audit.enabled {
613        match crate::security::audit::AuditLogger::new(
614            config.security.audit.clone(),
615            std::path::PathBuf::from(&config.workspace_dir),
616        ) {
617            Ok(logger) => Some(Arc::new(logger)),
618            Err(e) => {
619                tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
620                None
621            }
622        }
623    } else {
624        None
625    };
626
627    // SSE broadcast channel for real-time events
628    let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
629    // Extract webhook secret for authentication
630    let webhook_secret_hash: Option<Arc<str>> =
631        config.channels_config.webhook.as_ref().and_then(|webhook| {
632            webhook.secret.as_ref().and_then(|raw_secret| {
633                let trimmed_secret = raw_secret.trim();
634                (!trimmed_secret.is_empty())
635                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
636            })
637        });
638
639    // WhatsApp channel (if configured)
640    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
641        .channels_config
642        .whatsapp
643        .as_ref()
644        .filter(|wa| wa.is_cloud_config())
645        .map(|wa| {
646            Arc::new(WhatsAppChannel::new(
647                wa.access_token.clone().unwrap_or_default(),
648                wa.phone_number_id.clone().unwrap_or_default(),
649                wa.verify_token.clone().unwrap_or_default(),
650                wa.allowed_numbers.clone(),
651            ))
652        });
653
654    // WhatsApp app secret for webhook signature verification
655    // Priority: environment variable > config file
656    let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
657        .ok()
658        .and_then(|secret| {
659            let secret = secret.trim();
660            (!secret.is_empty()).then(|| secret.to_owned())
661        })
662        .or_else(|| {
663            config.channels_config.whatsapp.as_ref().and_then(|wa| {
664                wa.app_secret
665                    .as_deref()
666                    .map(str::trim)
667                    .filter(|secret| !secret.is_empty())
668                    .map(ToOwned::to_owned)
669            })
670        })
671        .map(Arc::from);
672
673    // Linq channel (if configured)
674    let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
675        Arc::new(LinqChannel::new(
676            lq.api_token.clone(),
677            lq.from_phone.clone(),
678            lq.allowed_senders.clone(),
679        ))
680    });
681
682    // Linq signing secret for webhook signature verification
683    // Priority: environment variable > config file
684    let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
685        .ok()
686        .and_then(|secret| {
687            let secret = secret.trim();
688            (!secret.is_empty()).then(|| secret.to_owned())
689        })
690        .or_else(|| {
691            config.channels_config.linq.as_ref().and_then(|lq| {
692                lq.signing_secret
693                    .as_deref()
694                    .map(str::trim)
695                    .filter(|secret| !secret.is_empty())
696                    .map(ToOwned::to_owned)
697            })
698        })
699        .map(Arc::from);
700
701    // WATI channel (if configured)
702    let wati_channel: Option<Arc<WatiChannel>> =
703        config.channels_config.wati.as_ref().map(|wati_cfg| {
704            Arc::new(
705                WatiChannel::new(
706                    wati_cfg.api_token.clone(),
707                    wati_cfg.api_url.clone(),
708                    wati_cfg.tenant_id.clone(),
709                    wati_cfg.allowed_numbers.clone(),
710                )
711                .with_transcription(config.transcription.clone()),
712            )
713        });
714
715    // Nextcloud Talk channel (if configured)
716    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
717        config.channels_config.nextcloud_talk.as_ref().map(|nc| {
718            Arc::new(NextcloudTalkChannel::new(
719                nc.base_url.clone(),
720                nc.app_token.clone(),
721                nc.bot_name.clone().unwrap_or_default(),
722                nc.allowed_users.clone(),
723            ))
724        });
725
726    // Nextcloud Talk webhook secret for signature verification
727    // Priority: environment variable > config file
728    let nextcloud_talk_webhook_secret: Option<Arc<str>> =
729        std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
730            .ok()
731            .and_then(|secret| {
732                let secret = secret.trim();
733                (!secret.is_empty()).then(|| secret.to_owned())
734            })
735            .or_else(|| {
736                config
737                    .channels_config
738                    .nextcloud_talk
739                    .as_ref()
740                    .and_then(|nc| {
741                        nc.webhook_secret
742                            .as_deref()
743                            .map(str::trim)
744                            .filter(|secret| !secret.is_empty())
745                            .map(ToOwned::to_owned)
746                    })
747            })
748            .map(Arc::from);
749
750    // Gmail Push channel (if configured and enabled)
751    let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
752        .channels_config
753        .gmail_push
754        .as_ref()
755        .filter(|gp| gp.enabled)
756        .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
757
758    // ── Session persistence for WS chat ─────────────────────
759    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
760        match SqliteSessionBackend::new(&config.workspace_dir) {
761            Ok(b) => {
762                tracing::info!("Gateway session persistence enabled (SQLite)");
763                if config.gateway.session_ttl_hours > 0 {
764                    if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
765                        if cleaned > 0 {
766                            tracing::info!("Cleaned up {cleaned} stale gateway sessions");
767                        }
768                    }
769                }
770                Some(Arc::new(b))
771            }
772            Err(e) => {
773                tracing::warn!("Session persistence disabled: {e}");
774                None
775            }
776        }
777    } else {
778        None
779    };
780
781    // ── Pairing guard ──────────────────────────────────────
782    let pairing = Arc::new(PairingGuard::new(
783        config.gateway.require_pairing,
784        &config.gateway.paired_tokens,
785    ));
786    let rate_limit_max_keys = normalize_max_keys(
787        config.gateway.rate_limit_max_keys,
788        RATE_LIMIT_MAX_KEYS_DEFAULT,
789    );
790    let rate_limiter = Arc::new(GatewayRateLimiter::new(
791        config.gateway.pair_rate_limit_per_minute,
792        config.gateway.webhook_rate_limit_per_minute,
793        rate_limit_max_keys,
794    ));
795    let idempotency_max_keys = normalize_max_keys(
796        config.gateway.idempotency_max_keys,
797        IDEMPOTENCY_MAX_KEYS_DEFAULT,
798    );
799    let idempotency_store = Arc::new(IdempotencyStore::new(
800        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
801        idempotency_max_keys,
802    ));
803
804    // Resolve optional path prefix for reverse-proxy deployments.
805    let path_prefix: Option<&str> = config
806        .gateway
807        .path_prefix
808        .as_deref()
809        .filter(|p| !p.is_empty());
810
811    // ── Tunnel ────────────────────────────────────────────────
812    let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
813    let mut tunnel_url: Option<String> = None;
814
815    if let Some(ref tun) = tunnel {
816        println!("🔗 Starting {} tunnel...", tun.name());
817        match tun.start(host, actual_port).await {
818            Ok(url) => {
819                println!("🌐 Tunnel active: {url}");
820                tunnel_url = Some(url);
821            }
822            Err(e) => {
823                println!("⚠️  Tunnel failed to start: {e}");
824                println!("   Falling back to local-only mode.");
825            }
826        }
827    }
828
829    let pfx = path_prefix.unwrap_or("");
830    println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
831    if let Some(ref url) = tunnel_url {
832        println!("  🌐 Public URL: {url}");
833    }
834    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
835    if let Some(code) = pairing.pairing_code() {
836        println!();
837        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
838        println!("     ┌──────────────┐");
839        println!("     │  {code}  │");
840        println!("     └──────────────┘");
841        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
842    } else if pairing.require_pairing() {
843        println!("  🔒 Pairing: ACTIVE (bearer token required)");
844        println!("     To pair a new device: construct gateway get-paircode --new");
845        println!();
846    } else {
847        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
848        println!();
849    }
850    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
851    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
852    if whatsapp_channel.is_some() {
853        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
854        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
855    }
856    if linq_channel.is_some() {
857        println!("  POST {pfx}/linq      — Linq message webhook (iMessage/RCS/SMS)");
858    }
859    if wati_channel.is_some() {
860        println!("  GET  {pfx}/wati      — WATI webhook verification");
861        println!("  POST {pfx}/wati      — WATI message webhook");
862    }
863    if nextcloud_talk_channel.is_some() {
864        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
865    }
866    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
867    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
868    if config.nodes.enabled {
869        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
870    }
871    println!("  GET  {pfx}/health    — health check");
872    println!("  GET  {pfx}/metrics   — Prometheus metrics");
873    println!("  Press Ctrl+C to stop.\n");
874
875    crate::health::mark_component_ok("gateway");
876
877    // Fire gateway start hook
878    if let Some(ref hooks) = hooks {
879        hooks.fire_gateway_start(host, actual_port).await;
880    }
881
882    // Wrap observer with broadcast capability for SSE
883    let broadcast_observer: Arc<dyn crate::observability::Observer> =
884        Arc::new(sse::BroadcastObserver::new(
885            crate::observability::create_observer(&config.observability),
886            event_tx.clone(),
887        ));
888
889    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
890
891    // Node registry for dynamic node discovery
892    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
893
894    // Device registry and pairing store (only when pairing is required)
895    let device_registry = if config.gateway.require_pairing {
896        Some(Arc::new(api_pairing::DeviceRegistry::new(
897            &config.workspace_dir,
898        )?))
899    } else {
900        None
901    };
902    let pending_pairings = if config.gateway.require_pairing {
903        Some(Arc::new(api_pairing::PairingStore::new(
904            config.gateway.pairing_dashboard.max_pending_codes,
905        )))
906    } else {
907        None
908    };
909
910    // ── Build RuntimeHandles for the in-process MCP server ──────────────
911    //
912    // The MCP task needs live handles so the tools the standalone binary
913    // used to skip (workspace, channel-bound tools, session_store, discord
914    // memory, delegate, …) register properly. We clone individual Arcs out
915    // of the pieces we already built above and feed them in.
916    //
917    // Tools that keep shared handles (reaction/ask_user/escalate/poll) are
918    // easiest to wire by handing their already-built, channel-map-aware
919    // instances over via `pre_built_tools`. We do this by building a second
920    // parallel Arc-vec via `all_tools_with_runtime` — the per-tool state is
921    // stateless for registry purposes, and delegate/swarm get their own
922    // per-call depth counters anyway.
923    let mcp_workspace_dir = config.workspace_dir.clone();
924    let mcp_config_snapshot = config.clone();
925    let mcp_runtime_handles = {
926        let mut rh = crate::mcp_server::RuntimeHandles::empty();
927
928        // Workspace manager (if workspace isolation is on).
929        if config.workspace.enabled {
930            let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
931                let home = directories::UserDirs::new()
932                    .map(|u| u.home_dir().to_path_buf())
933                    .unwrap_or_else(|| std::path::PathBuf::from("."));
934                home.join(&config.workspace.workspaces_dir[2..])
935            } else {
936                std::path::PathBuf::from(&config.workspace.workspaces_dir)
937            };
938            let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
939            rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
940        }
941
942        // Session backend from channels::session_store::SessionStore.
943        if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
944        {
945            let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
946                Arc::new(store);
947            rh.session_store = Some(backend);
948        }
949
950        // Discord memory (formerly for discord_search) — removed with the
951        // SQLite backend; persistent cross-session memory should use Kumiho MCP.
952
953        // Agent config + provider options for delegate / swarm.
954        if !config.agents.is_empty() {
955            rh.agent_config = Some(Arc::new(config.agents.clone()));
956            rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
957            rh.provider_runtime_options =
958                Some(Arc::new(crate::providers::ProviderRuntimeOptions {
959                    auth_profile_override: None,
960                    provider_api_url: config.api_url.clone(),
961                    construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
962                    secrets_encrypt: config.secrets.encrypt,
963                    reasoning_enabled: config.runtime.reasoning_enabled,
964                    reasoning_effort: config.runtime.reasoning_effort.clone(),
965                    provider_timeout_secs: Some(config.provider_timeout_secs),
966                    extra_headers: config.extra_headers.clone(),
967                    api_path: config.api_path.clone(),
968                    provider_max_tokens: config.provider_max_tokens,
969                }));
970        }
971
972        // Build a second parallel tool set and hand it to MCP via
973        // `pre_built_tools` so every channel-aware / delegate tool lands in
974        // the MCP registry fully wired (even though its ChannelMap handle is
975        // distinct from the gateway's — the channel supervisor's populate
976        // path still routes through the gateway's tool instances; the MCP
977        // versions advertise and execute independently).
978        let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
979            Arc::new(config.clone()),
980            &security,
981            Arc::new(crate::runtime::NativeRuntime::new()),
982            Arc::clone(&mem),
983            composio_key,
984            composio_entity_id,
985            &config.browser,
986            &config.http_request,
987            &config.web_fetch,
988            &config.workspace_dir,
989            &config.agents,
990            config.api_key.as_deref(),
991            &config,
992            Some(canvas_store.clone()),
993        );
994        // Convert Box<dyn Tool> to Arc<dyn Tool>.
995        let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
996            .into_iter()
997            .map(|b| Arc::<dyn tools::Tool>::from(b))
998            .collect();
999        rh.pre_built_tools = Some(mcp_tool_arcs);
1000
1001        rh
1002    };
1003
1004    let mut state = AppState {
1005        config: config_state,
1006        provider,
1007        model,
1008        temperature,
1009        mem,
1010        auto_save: config.memory.auto_save,
1011        webhook_secret_hash,
1012        pairing,
1013        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1014        rate_limiter,
1015        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1016        idempotency_store,
1017        whatsapp: whatsapp_channel,
1018        whatsapp_app_secret,
1019        linq: linq_channel,
1020        linq_signing_secret,
1021        nextcloud_talk: nextcloud_talk_channel,
1022        nextcloud_talk_webhook_secret,
1023        wati: wati_channel,
1024        gmail_push: gmail_push_channel,
1025        observer: broadcast_observer,
1026        tools_registry,
1027        cost_tracker,
1028        audit_logger,
1029        event_tx,
1030        shutdown_tx,
1031        node_registry,
1032        session_backend,
1033        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1034        device_registry,
1035        pending_pairings,
1036        path_prefix: path_prefix.unwrap_or("").to_string(),
1037        canvas_store,
1038        mcp_registry: mcp_registry_shared,
1039        approval_registry: approval_registry::global(),
1040        // Populated after the in-process MCP server binds (see below).
1041        mcp_local_url: None,
1042        #[cfg(feature = "webauthn")]
1043        webauthn: if config.security.webauthn.enabled {
1044            let secret_store = Arc::new(crate::security::SecretStore::new(
1045                &config.workspace_dir,
1046                true,
1047            ));
1048            let wa_config = crate::security::webauthn::WebAuthnConfig {
1049                enabled: true,
1050                rp_id: config.security.webauthn.rp_id.clone(),
1051                rp_origin: config.security.webauthn.rp_origin.clone(),
1052                rp_name: config.security.webauthn.rp_name.clone(),
1053            };
1054            Some(Arc::new(api_webauthn::WebAuthnState {
1055                manager: crate::security::webauthn::WebAuthnManager::new(
1056                    wa_config,
1057                    secret_store,
1058                    &config.workspace_dir,
1059                ),
1060                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1061                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1062            }))
1063        } else {
1064            None
1065        },
1066    };
1067
1068    // ── Skill effectiveness cache ───────────────────────────────────────
1069    //
1070    // Build a process-wide cache of recency-weighted skill outcomes so the
1071    // channels system-prompt builder can rerank skills before injecting
1072    // them into agent prompts.  See `src/skills/effectiveness_cache.rs`.
1073    //
1074    // On startup we:
1075    //   1. Construct an empty `Arc<EffectivenessCache>`.
1076    //   2. Install it as the process-wide global so
1077    //      `effectiveness_cache::global_provider()` returns it.
1078    //   3. Spawn a background task that refreshes scores from Kumiho every
1079    //      `DEFAULT_REFRESH_INTERVAL` (5 minutes).  Until the first refresh
1080    //      completes the cache is empty and skills inject in their static
1081    //      load order — same as before this feature shipped.
1082    {
1083        let effectiveness_cache = crate::skills::EffectivenessCache::new();
1084        let _ = crate::skills::effectiveness_cache::set_global(effectiveness_cache.clone());
1085
1086        let memory_project = config.kumiho.memory_project.clone();
1087        let workspace_dir = config.workspace_dir.clone();
1088        let config_for_skills = config.clone();
1089
1090        // Daemon-startup skill registration scan.
1091        //
1092        // Walk every directory under `<workspace>/skills/` and register
1093        // any skill whose SKILL.toml is missing `[skill].kref`.  This is
1094        // the safety-net that catches skills installed without a Kumiho
1095        // round-trip (CLI install with Kumiho unreachable, manual file
1096        // drop, skills materialised by a sibling tool).  Each call is
1097        // idempotent — already-registered skills are a cheap no-op.
1098        // The scan runs in the background so a slow Kumiho doesn't
1099        // delay gateway readiness.
1100        {
1101            let scan_workspace = workspace_dir.clone();
1102            let scan_project = memory_project.clone();
1103            let scan_client = crate::gateway::kumiho_client::build_client_from_config(&config);
1104            drop(tokio::spawn(async move {
1105                let skills_root = crate::skills::skills_dir(&scan_workspace);
1106                let mut entries = match tokio::fs::read_dir(&skills_root).await {
1107                    Ok(e) => e,
1108                    Err(_) => return,
1109                };
1110                while let Ok(Some(entry)) = entries.next_entry().await {
1111                    let dir = entry.path();
1112                    if !dir.is_dir() {
1113                        continue;
1114                    }
1115                    if !dir.join("SKILL.toml").exists() {
1116                        continue;
1117                    }
1118                    let registration_ok =
1119                        match crate::skills::registration::register_skill_with_kumiho(
1120                            &dir,
1121                            &scan_client,
1122                            &scan_project,
1123                        )
1124                        .await
1125                        {
1126                            Ok(crate::skills::registration::SkillRegistration::Registered {
1127                                kref,
1128                                ..
1129                            }) => {
1130                                tracing::info!(
1131                                    skill_dir = %dir.display(),
1132                                    kref,
1133                                    "daemon-startup: registered skill with Kumiho",
1134                                );
1135                                true
1136                            }
1137                            Ok(
1138                                crate::skills::registration::SkillRegistration::AlreadyRegistered {
1139                                    ..
1140                                },
1141                            ) => {
1142                                // No-op — common case after first run.
1143                                true
1144                            }
1145                            Err(e) => {
1146                                tracing::warn!(
1147                                    skill_dir = %dir.display(),
1148                                    error = ?e,
1149                                    "daemon-startup: skill registration failed; will retry next start",
1150                                );
1151                                false
1152                            }
1153                        };
1154
1155                    // Step 6d: keep SKILL.toml's content_file consistent
1156                    // with whatever revision currently holds the
1157                    // `published` tag.  Catches improvements that
1158                    // landed while the daemon was offline.  No-op when
1159                    // already in sync.  Skipped when registration just
1160                    // failed because the kref isn't valid yet.
1161                    if !registration_ok {
1162                        continue;
1163                    }
1164                    match crate::skills::registration::sync_published_content_path(
1165                        &dir,
1166                        &scan_client,
1167                    )
1168                    .await
1169                    {
1170                        Ok(crate::skills::registration::SkillContentSync::Updated {
1171                            new_content_file,
1172                            ..
1173                        }) => {
1174                            tracing::info!(
1175                                skill_dir = %dir.display(),
1176                                content_file = %new_content_file,
1177                                "daemon-startup: synced skill content_file from published kref",
1178                            );
1179                        }
1180                        Ok(_) => {
1181                            // NotRegistered (we just registered it) or
1182                            // AlreadyCurrent — both silent.
1183                        }
1184                        Err(e) => {
1185                            tracing::warn!(
1186                                skill_dir = %dir.display(),
1187                                error = ?e,
1188                                "daemon-startup: skill content_file sync failed; \
1189                                 loader will use existing pointer until next sync",
1190                            );
1191                        }
1192                    }
1193                }
1194            }));
1195        }
1196
1197        // Load the skill names once at startup — the refresh task only
1198        // queries Kumiho for skills already loaded into the runtime, so a
1199        // brand-new skill added later won't be reranked until restart.
1200        let skill_names: Vec<String> =
1201            crate::skills::load_skills_with_config(&workspace_dir, &config_for_skills)
1202                .into_iter()
1203                .map(|s| s.name)
1204                .collect();
1205
1206        if skill_names.is_empty() {
1207            tracing::info!("skill effectiveness: no skills loaded; refresh task skipped");
1208        } else {
1209            let kumiho_client = Arc::new(crate::gateway::kumiho_client::build_client_from_config(
1210                &config,
1211            ));
1212            tracing::info!(
1213                count = skill_names.len(),
1214                project = %memory_project,
1215                "skill effectiveness: starting background refresh task",
1216            );
1217            // The refresh task is fire-and-forget and lives until process
1218            // exit, so its JoinHandle is dropped without being awaited.
1219            // We use `drop()` rather than `let _ =` to
1220            // silence clippy::let_underscore_future (the JoinHandle is
1221            // a Future, but dropping it does not cancel a tokio task).
1222            // Future improvement: store the handle on AppState and
1223            // abort on shutdown so tests don't leak the task.
1224            drop(effectiveness_cache.clone().spawn_refresh_task(
1225                kumiho_client.clone(),
1226                memory_project,
1227                skill_names,
1228                crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1229            ));
1230
1231            // ── Auto-improve loop (closes the self-improvement loop) ─────
1232            //
1233            // On the same cadence as the cache refresh, walk the latest
1234            // candidates and ask the LLM to rewrite each regressed skill.
1235            // Cooldowns + atomic-write live inside SkillImprover, so this
1236            // task is just the dispatcher.  Skipped entirely when the
1237            // skill-creation feature is disabled, when the user has
1238            // turned auto-improvement off in config, or when the daemon
1239            // has no chat provider configured.
1240            #[cfg(feature = "skill-creation")]
1241            if config.skills.skill_improvement.enabled {
1242                let cache_for_improver = effectiveness_cache.clone();
1243                let auto_workspace = workspace_dir.clone();
1244                let auto_provider = state.provider.clone();
1245                let auto_model = state.model.clone();
1246                let auto_improver_config = config.skills.skill_improvement.clone();
1247                let auto_kumiho = kumiho_client.clone();
1248                let auto_project = config.kumiho.memory_project.clone();
1249
1250                tracing::info!(
1251                    cooldown_secs = auto_improver_config.cooldown_secs,
1252                    "skill auto-improve: starting LLM-driven rewrite loop",
1253                );
1254
1255                drop(tokio::spawn(async move {
1256                    let ctx = crate::skills::auto_improve::AutoImproveContext {
1257                        workspace_dir: auto_workspace.clone(),
1258                        provider: auto_provider,
1259                        model: auto_model,
1260                        temperature: crate::skills::auto_improve::DEFAULT_REWRITE_TEMPERATURE,
1261                        kumiho_client: auto_kumiho.clone(),
1262                        memory_project: auto_project.clone(),
1263                    };
1264                    let mut improver = crate::skills::improver::SkillImprover::new(
1265                        auto_workspace.clone(),
1266                        auto_improver_config,
1267                    );
1268
1269                    // Step 6f-B: rollback context shares the workspace
1270                    // and Kumiho client with the improver so a single
1271                    // background loop runs both the improvement pass
1272                    // and the regression-rollback pass on each tick.
1273                    let rollback_ctx = crate::skills::auto_rollback::AutoRollbackContext {
1274                        workspace_dir: auto_workspace.clone(),
1275                        kumiho_client: auto_kumiho,
1276                        memory_project: auto_project,
1277                    };
1278                    let mut rollback_tracker =
1279                        crate::skills::auto_rollback::SkillRollbackTracker::new(auto_workspace);
1280
1281                    let mut ticker = tokio::time::interval(
1282                        crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1283                    );
1284                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1285                    // First tick fires immediately; cache is still empty
1286                    // at startup so improvement_candidates() returns [].
1287                    // Subsequent ticks see whatever the refresh task has
1288                    // populated.
1289
1290                    loop {
1291                        ticker.tick().await;
1292                        for cand in cache_for_improver.improvement_candidates() {
1293                            match crate::skills::auto_improve::attempt_skill_improvement(
1294                                &ctx,
1295                                &cand,
1296                                &mut improver,
1297                            )
1298                            .await
1299                            {
1300                                Ok(Some(outcome)) => tracing::info!(
1301                                    skill = %outcome.slug,
1302                                    revision_kref = %outcome.revision_kref,
1303                                    content_file = %outcome.content_file,
1304                                    rate = cand.rate,
1305                                    total = cand.total,
1306                                    "auto-improve: published new skill revision",
1307                                ),
1308                                Ok(None) => {
1309                                    // cooldown / file missing / no markdown fence — silent skip
1310                                }
1311                                Err(e) => tracing::warn!(
1312                                    skill = %cand.skill_name,
1313                                    error = %e,
1314                                    "auto-improve: attempt failed",
1315                                ),
1316                            }
1317                        }
1318
1319                        // Step 6f-B: regression rollback pass.  Iterates
1320                        // skills whose freshly-published revision is
1321                        // measurably worse than its predecessor and
1322                        // retags `published` back onto the previous
1323                        // revision.  Cooldown is per-skill so a healthy
1324                        // skill that ping-pongs once won't be rolled
1325                        // back again before its stats settle.
1326                        for cand in cache_for_improver.regression_candidates() {
1327                            match crate::skills::auto_rollback::attempt_skill_rollback(
1328                                &rollback_ctx,
1329                                &cand,
1330                                &mut rollback_tracker,
1331                            )
1332                            .await
1333                            {
1334                                Ok(Some(outcome)) => tracing::warn!(
1335                                    skill = %outcome.slug,
1336                                    restored_revision_kref = %outcome.restored_revision_kref,
1337                                    demoted_revision_kref = %outcome.demoted_revision_kref,
1338                                    content_file = %outcome.content_file,
1339                                    current_rate = cand.current_rate,
1340                                    previous_rate = cand.previous_rate,
1341                                    "auto-rollback: reverted regressed skill revision",
1342                                ),
1343                                Ok(None) => {
1344                                    // cooldown / no rollback target — silent skip
1345                                }
1346                                Err(e) => tracing::warn!(
1347                                    skill = %cand.skill_name,
1348                                    error = %e,
1349                                    "auto-rollback: attempt failed",
1350                                ),
1351                            }
1352                        }
1353                    }
1354                }));
1355            }
1356        }
1357    }
1358
1359    // ── Spawn the in-process MCP server ─────────────────────────────────
1360    //
1361    // Binds 127.0.0.1:0 (ephemeral, as external clients expect), writes
1362    // ~/.construct/mcp.json with {url,pid,started_at} atomically, and tears
1363    // down on gateway shutdown. If the bind fails we log and move on —
1364    // this is non-fatal for the gateway itself.
1365    let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1366    let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1367        let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1368            &mcp_workspace_dir,
1369            &mcp_config_snapshot,
1370            &mcp_runtime_handles,
1371        );
1372        for (name, reason) in &mcp_skipped {
1373            tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1374        }
1375        tracing::info!(
1376            "mcp-server: advertising {} tools to external MCP clients",
1377            mcp_state.tools.len()
1378        );
1379
1380        match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1381            Ok(handle) => {
1382                let url = format!("http://{}/mcp", handle.addr);
1383                if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1384                    tracing::warn!("mcp-server: failed to write discovery file: {e}");
1385                } else {
1386                    tracing::info!(
1387                        "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1388                    );
1389                }
1390
1391                // Expose the bound base URL (no `/mcp` suffix) to the gateway
1392                // reverse-proxy handlers in `api_mcp`.
1393                state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1394
1395                let mut watch = mcp_shutdown_watch;
1396                Some(tokio::spawn(async move {
1397                    // Wait for the gateway's shutdown signal.
1398                    loop {
1399                        if *watch.borrow() {
1400                            break;
1401                        }
1402                        if watch.changed().await.is_err() {
1403                            break;
1404                        }
1405                    }
1406                    let _ = handle.shutdown.send(());
1407                    let _ = handle.joined.await;
1408                    crate::mcp_server::server::cleanup_discovery_file();
1409                    tracing::info!("mcp-server: stopped");
1410                }))
1411            }
1412            Err(e) => {
1413                tracing::error!("mcp-server: failed to bind: {e}");
1414                None
1415            }
1416        }
1417    };
1418
1419    // Config PUT needs larger body limit (1MB)
1420    let config_put_router = Router::new()
1421        .route("/api/config", put(api::handle_api_config_put))
1422        .layer(RequestBodyLimitLayer::new(1_048_576));
1423
1424    // Memory graph needs longer timeout (aggregates many Kumiho calls via operator).
1425    // Built as a separate router that gets merged AFTER the global timeout layer.
1426    let memory_graph_router = Router::new()
1427        .route(
1428            "/api/memory/graph",
1429            get(api_memory_graph::handle_memory_graph),
1430        )
1431        .with_state(state.clone())
1432        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1433        .layer(TimeoutLayer::with_status_code(
1434            StatusCode::REQUEST_TIMEOUT,
1435            Duration::from_secs(60),
1436        ));
1437
1438    // Build router with middleware
1439    let inner = Router::new()
1440        // ── Admin routes (for CLI management) ──
1441        .route("/admin/shutdown", post(handle_admin_shutdown))
1442        .route("/admin/paircode", get(handle_admin_paircode))
1443        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1444        // ── Existing routes ──
1445        .route("/health", get(handle_health))
1446        .route("/metrics", get(handle_metrics))
1447        .route("/pair", post(handle_pair))
1448        .route("/pair/code", get(handle_pair_code))
1449        .route("/webhook", post(handle_webhook))
1450        .route("/whatsapp", get(handle_whatsapp_verify))
1451        .route("/whatsapp", post(handle_whatsapp_message))
1452        .route("/linq", post(handle_linq_webhook))
1453        .route("/wati", get(handle_wati_verify))
1454        .route("/wati", post(handle_wati_webhook))
1455        .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1456        .route("/webhook/gmail", post(handle_gmail_push_webhook))
1457        // ── Claude Code runner hooks ──
1458        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1459        // ── Web Dashboard API routes ──
1460        .route("/api/status", get(api::handle_api_status))
1461        .route("/api/config", get(api::handle_api_config_get))
1462        .route("/api/tools", get(api::handle_api_tools))
1463        .route("/api/cron", get(api::handle_api_cron_list))
1464        .route("/api/cron", post(api::handle_api_cron_add))
1465        .route(
1466            "/api/cron/settings",
1467            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1468        )
1469        .route(
1470            "/api/cron/{id}",
1471            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1472        )
1473        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1474        .route("/api/integrations", get(api::handle_api_integrations))
1475        .route(
1476            "/api/integrations/settings",
1477            get(api::handle_api_integrations_settings),
1478        )
1479        .route(
1480            "/api/doctor",
1481            get(api::handle_api_doctor).post(api::handle_api_doctor),
1482        )
1483        // Old /api/memory CRUD removed — use Kumiho via /api/memory/graph instead.
1484        .route("/api/cost", get(api::handle_api_cost))
1485        .route("/api/audit", get(api::handle_api_audit))
1486        .route("/api/audit/verify", get(api::handle_api_audit_verify))
1487        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1488        .route("/api/health", get(api::handle_api_health))
1489        .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1490        .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1491        // ── MCP HTTP reverse-proxy (browser stays same-origin) ──
1492        .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1493        .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1494        .route(
1495            "/api/mcp/session/{session_id}/events",
1496            get(api_mcp::handle_api_mcp_session_events),
1497        )
1498        .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1499        .route("/api/nodes", get(api::handle_api_nodes))
1500        .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1501        .route("/api/sessions", get(api::handle_api_sessions_list))
1502        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1503        .route(
1504            "/api/sessions/{id}/messages",
1505            get(api::handle_api_session_messages),
1506        )
1507        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1508        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1509        // ── Channel detail API ──
1510        .route("/api/channels", get(api::handle_api_channels))
1511        .route("/api/channel-events", post(api::handle_api_channel_events))
1512        // ── Agent management API (proxied to Kumiho FastAPI) ──
1513        .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1514        .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1515        .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1516        // ── Skill management API (proxied to Kumiho FastAPI) ──
1517        .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1518        .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1519        .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1520        // ── Team management API (proxied to Kumiho FastAPI) ──
1521        .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1522        .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1523        .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1524        // ── Workflow management API (proxied to Kumiho FastAPI) ──
1525        .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1526        .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1527        .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1528        .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1529        .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1530        .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1531        .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1532        .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1533        .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1534        .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1535        .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1536        // ── ClawHub marketplace API ──
1537        .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1538        .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1539        .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1540        .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1541        // NOTE: Memory graph route is merged separately with its own 60s timeout
1542        // ── Generic Kumiho API proxy (for Asset Browser, Memory Auditor, etc.) ──
1543        .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1544        // ── Artifact body (serve local file bytes referenced by Kumiho) ──
1545        .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1546        // ── Pairing + Device management API ──
1547        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1548        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1549        .route("/api/devices", get(api_pairing::list_devices))
1550        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1551        .route(
1552            "/api/devices/{id}/token/rotate",
1553            post(api_pairing::rotate_token),
1554        )
1555        // ── Live Canvas (A2UI) routes ──
1556        .route("/api/canvas", get(canvas::handle_canvas_list))
1557        .route(
1558            "/api/canvas/{id}",
1559            get(canvas::handle_canvas_get)
1560                .post(canvas::handle_canvas_post)
1561                .delete(canvas::handle_canvas_clear),
1562        )
1563        .route(
1564            "/api/canvas/{id}/history",
1565            get(canvas::handle_canvas_history),
1566        );
1567
1568    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1569    #[cfg(feature = "webauthn")]
1570    let inner = inner
1571        .route(
1572            "/api/webauthn/register/start",
1573            post(api_webauthn::handle_register_start),
1574        )
1575        .route(
1576            "/api/webauthn/register/finish",
1577            post(api_webauthn::handle_register_finish),
1578        )
1579        .route(
1580            "/api/webauthn/auth/start",
1581            post(api_webauthn::handle_auth_start),
1582        )
1583        .route(
1584            "/api/webauthn/auth/finish",
1585            post(api_webauthn::handle_auth_finish),
1586        )
1587        .route(
1588            "/api/webauthn/credentials",
1589            get(api_webauthn::handle_list_credentials),
1590        )
1591        .route(
1592            "/api/webauthn/credentials/{id}",
1593            delete(api_webauthn::handle_delete_credential),
1594        );
1595
1596    // ── Plugin management API (requires plugins-wasm feature) ──
1597    #[cfg(feature = "plugins-wasm")]
1598    let inner = inner.route(
1599        "/api/plugins",
1600        get(api_plugins::plugin_routes::list_plugins),
1601    );
1602
1603    let inner = inner
1604        // ── SSE event stream ──
1605        .route("/api/events", get(sse::handle_sse_events))
1606        .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1607        // ── WebSocket agent chat ──
1608        .route("/ws/chat", get(ws::handle_ws_chat))
1609        // ── WebSocket canvas updates ──
1610        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1611        // ── WebSocket node discovery ──
1612        .route("/ws/nodes", get(nodes::handle_ws_nodes));
1613
1614    // ── WebSocket PTY terminal ──
1615    // portable-pty needs `openpty` from libutil which Android's NDK does not
1616    // reliably link, and a phone-runtime websocket terminal isn't a real
1617    // surface anyway.  Skip the route on Android.
1618    #[cfg(not(target_os = "android"))]
1619    let inner = inner.route("/ws/terminal", get(terminal::handle_ws_terminal));
1620
1621    let inner = inner
1622        // ── WebSocket proxy onto the in-process MCP server's session events ──
1623        .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1624        // ── Static assets (web dashboard) ──
1625        .route("/_app/{*path}", get(static_files::handle_static))
1626        // ── Config PUT with larger body limit ──
1627        .merge(config_put_router)
1628        // ── SPA fallback: non-API GET requests serve index.html ──
1629        .fallback(get(static_files::handle_spa_fallback))
1630        .with_state(state)
1631        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1632        .layer(TimeoutLayer::with_status_code(
1633            StatusCode::REQUEST_TIMEOUT,
1634            Duration::from_secs(gateway_request_timeout_secs()),
1635        ));
1636
1637    // Merge memory graph router (has its own 60s timeout, outside the global 30s)
1638    let inner = inner.merge(memory_graph_router);
1639
1640    // Nest under path prefix when configured (axum strips prefix before routing).
1641    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1642    // with a trailing slash, so we add a fallback redirect for that case.
1643    let app = if let Some(prefix) = path_prefix {
1644        let redirect_target = prefix.to_string();
1645        Router::new().nest(prefix, inner).route(
1646            &format!("{prefix}/"),
1647            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1648        )
1649    } else {
1650        inner
1651    };
1652
1653    // ── TLS / mTLS setup ───────────────────────────────────────────
1654    let tls_acceptor = match &config.gateway.tls {
1655        Some(tls_cfg) if tls_cfg.enabled => {
1656            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1657            if has_mtls {
1658                tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1659            } else {
1660                tracing::info!("TLS enabled (no client certificate requirement)");
1661            }
1662            Some(tls::build_tls_acceptor(tls_cfg)?)
1663        }
1664        _ => None,
1665    };
1666
1667    if let Some(tls_acceptor) = tls_acceptor {
1668        // Manual TLS accept loop — serves each connection via hyper.
1669        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1670        let mut app = app;
1671
1672        let mut shutdown_signal = shutdown_rx;
1673        loop {
1674            tokio::select! {
1675                conn = listener.accept() => {
1676                    let (tcp_stream, remote_addr) = conn?;
1677                    let tls_acceptor = tls_acceptor.clone();
1678                    let svc = tower::MakeService::<
1679                        SocketAddr,
1680                        hyper::Request<hyper::body::Incoming>,
1681                    >::make_service(&mut app, remote_addr)
1682                    .await
1683                    .expect("infallible make_service");
1684
1685                    tokio::spawn(async move {
1686                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1687                            Ok(s) => s,
1688                            Err(e) => {
1689                                tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1690                                return;
1691                            }
1692                        };
1693                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1694                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1695                            let mut svc = svc.clone();
1696                            async move {
1697                                tower::Service::call(&mut svc, req).await
1698                            }
1699                        });
1700                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1701                            hyper_util::rt::TokioExecutor::new(),
1702                        )
1703                        .serve_connection(io, hyper_svc)
1704                        .await
1705                        {
1706                            tracing::debug!("connection error from {remote_addr}: {e}");
1707                        }
1708                    });
1709                }
1710                _ = shutdown_signal.changed() => {
1711                    tracing::info!("🦀 Construct Gateway shutting down...");
1712                    break;
1713                }
1714            }
1715        }
1716    } else {
1717        // Plain TCP — use axum's built-in serve.
1718        axum::serve(
1719            listener,
1720            app.into_make_service_with_connect_info::<SocketAddr>(),
1721        )
1722        .with_graceful_shutdown(async move {
1723            let _ = shutdown_rx.changed().await;
1724            tracing::info!("🦀 Construct Gateway shutting down...");
1725        })
1726        .await?;
1727    }
1728
1729    // Wait for the in-process MCP task to finish its own graceful shutdown.
1730    // It watches the same `shutdown_tx` we just flipped above.
1731    if let Some(task) = mcp_task {
1732        let _ = task.await;
1733    }
1734
1735    Ok(())
1736}
1737
1738// ══════════════════════════════════════════════════════════════════════════════
1739// AXUM HANDLERS
1740// ══════════════════════════════════════════════════════════════════════════════
1741
1742/// GET /health — always public (no secrets leaked)
1743async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1744    let body = serde_json::json!({
1745        "status": "ok",
1746        "paired": state.pairing.is_paired(),
1747        "require_pairing": state.pairing.require_pairing(),
1748        "runtime": crate::health::snapshot_json(),
1749    });
1750    Json(body)
1751}
1752
/// `Content-Type` header value sent with `/metrics` responses
/// (Prometheus text exposition format, version 0.0.4, UTF-8).
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1755
/// Returns the comment-only body served by `/metrics` when no Prometheus
/// observer is available, telling the operator how to enable the backend.
fn prometheus_disabled_hint() -> String {
    const HINT: &str =
        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n";
    HINT.to_owned()
}
1761
/// Locates the Prometheus observer behind a type-erased [`Observer`].
///
/// The observer is first tried as a `PrometheusObserver` directly. If that
/// fails, it is tried as an `sse::BroadcastObserver`, and the broadcast
/// wrapper's inner observer is then tried as a `PrometheusObserver`.
/// Returns `None` when neither downcast path succeeds.
///
/// [`Observer`]: crate::observability::Observer
#[cfg(feature = "observability-prometheus")]
fn prometheus_observer_from_state(
    observer: &dyn crate::observability::Observer,
) -> Option<&crate::observability::PrometheusObserver> {
    let erased = observer.as_any();

    // Direct hit: the observer itself is the Prometheus backend.
    if let Some(direct) = erased.downcast_ref::<crate::observability::PrometheusObserver>() {
        return Some(direct);
    }

    // Otherwise look one level down, through the SSE broadcast wrapper.
    let broadcast = erased.downcast_ref::<sse::BroadcastObserver>()?;
    broadcast
        .inner()
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
}
1781
1782/// GET /metrics — Prometheus text exposition format
1783async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1784    let body = {
1785        #[cfg(feature = "observability-prometheus")]
1786        {
1787            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1788                prom.encode()
1789            } else {
1790                prometheus_disabled_hint()
1791            }
1792        }
1793        #[cfg(not(feature = "observability-prometheus"))]
1794        {
1795            let _ = &state;
1796            prometheus_disabled_hint()
1797        }
1798    };
1799
1800    (
1801        StatusCode::OK,
1802        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1803        body,
1804    )
1805}
1806
1807/// POST /pair — exchange one-time code for bearer token
1808#[axum::debug_handler]
1809async fn handle_pair(
1810    State(state): State<AppState>,
1811    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1812    headers: HeaderMap,
1813) -> impl IntoResponse {
1814    let rate_key =
1815        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1816    let peer_is_loopback = peer_addr.ip().is_loopback();
1817    if !state.rate_limiter.allow_pair(&rate_key) {
1818        tracing::warn!("/pair rate limit exceeded");
1819        let err = serde_json::json!({
1820            "error": "Too many pairing requests. Please retry later.",
1821            "retry_after": RATE_LIMIT_WINDOW_SECS,
1822        });
1823        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1824    }
1825
1826    // ── Auth rate limiting (brute-force protection) ──
1827    if let Err(e) = state
1828        .auth_limiter
1829        .check_rate_limit(&rate_key, peer_is_loopback)
1830    {
1831        tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1832        let err = serde_json::json!({
1833            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1834            "retry_after": e.retry_after_secs,
1835        });
1836        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1837    }
1838
1839    let code = headers
1840        .get("X-Pairing-Code")
1841        .and_then(|v| v.to_str().ok())
1842        .unwrap_or("");
1843
1844    match state.pairing.try_pair(code, &rate_key).await {
1845        Ok(Some(token)) => {
1846            tracing::info!("🔐 New client paired successfully");
1847            if let Some(ref logger) = state.audit_logger {
1848                let _ =
1849                    logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1850            }
1851            if let Err(err) =
1852                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1853            {
1854                tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1855                let body = serde_json::json!({
1856                    "paired": true,
1857                    "persisted": false,
1858                    "token": token,
1859                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1860                });
1861                return (StatusCode::OK, Json(body));
1862            }
1863
1864            let body = serde_json::json!({
1865                "paired": true,
1866                "persisted": true,
1867                "token": token,
1868                "message": "Save this token — use it as Authorization: Bearer <token>"
1869            });
1870            (StatusCode::OK, Json(body))
1871        }
1872        Ok(None) => {
1873            state
1874                .auth_limiter
1875                .record_attempt(&rate_key, peer_is_loopback);
1876            tracing::warn!("🔐 Pairing attempt with invalid code");
1877            if let Some(ref logger) = state.audit_logger {
1878                let _ = logger
1879                    .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1880            }
1881            let err = serde_json::json!({"error": "Invalid pairing code"});
1882            (StatusCode::FORBIDDEN, Json(err))
1883        }
1884        Err(lockout_secs) => {
1885            tracing::warn!(
1886                "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1887            );
1888            if let Some(ref logger) = state.audit_logger {
1889                let _ = logger.log_auth_failure(
1890                    "gateway",
1891                    &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1892                );
1893            }
1894            let err = serde_json::json!({
1895                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1896                "retry_after": lockout_secs
1897            });
1898            (StatusCode::TOO_MANY_REQUESTS, Json(err))
1899        }
1900    }
1901}
1902
1903async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1904    let paired_tokens = pairing.tokens();
1905    // This is needed because parking_lot's guard is not Send so we clone the inner
1906    // this should be removed once async mutexes are used everywhere
1907    let mut updated_cfg = { config.lock().clone() };
1908    updated_cfg.gateway.paired_tokens = paired_tokens;
1909    updated_cfg
1910        .save()
1911        .await
1912        .context("Failed to persist paired tokens to config.toml")?;
1913
1914    // Keep shared runtime config in sync with persisted tokens.
1915    *config.lock() = updated_cfg;
1916    Ok(())
1917}
1918
1919/// Simple chat for webhook endpoint (no tools, for backward compatibility and testing).
1920async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1921    let user_messages = vec![ChatMessage::user(message)];
1922
1923    // Keep webhook/gateway prompts aligned with channel behavior by injecting
1924    // workspace-aware system context before model invocation.
1925    let system_prompt = {
1926        let config_guard = state.config.lock();
1927        crate::channels::build_system_prompt(
1928            &config_guard.workspace_dir,
1929            &state.model,
1930            &[], // tools - empty for simple chat
1931            &[], // skills
1932            Some(&config_guard.identity),
1933            None, // bootstrap_max_chars - use default
1934        )
1935    };
1936
1937    let mut messages = Vec::with_capacity(1 + user_messages.len());
1938    messages.push(ChatMessage::system(system_prompt));
1939    messages.extend(user_messages);
1940
1941    let multimodal_config = state.config.lock().multimodal.clone();
1942    let prepared =
1943        crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1944
1945    state
1946        .provider
1947        .chat_with_history(&prepared.messages, &state.model, state.temperature)
1948        .await
1949}
1950
1951/// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk).
1952async fn run_gateway_chat_with_tools(
1953    state: &AppState,
1954    message: &str,
1955    session_id: Option<&str>,
1956) -> anyhow::Result<String> {
1957    let config = state.config.lock().clone();
1958    Box::pin(crate::agent::process_message(config, message, session_id)).await
1959}
1960
1961/// Webhook request body
1962#[derive(serde::Deserialize)]
1963pub struct WebhookBody {
1964    pub message: String,
1965}
1966
1967/// POST /webhook — main webhook endpoint
1968async fn handle_webhook(
1969    State(state): State<AppState>,
1970    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1971    headers: HeaderMap,
1972    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
1973) -> impl IntoResponse {
1974    let rate_key =
1975        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1976    let peer_is_loopback = peer_addr.ip().is_loopback();
1977    if !state.rate_limiter.allow_webhook(&rate_key) {
1978        tracing::warn!("/webhook rate limit exceeded");
1979        let err = serde_json::json!({
1980            "error": "Too many webhook requests. Please retry later.",
1981            "retry_after": RATE_LIMIT_WINDOW_SECS,
1982        });
1983        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1984    }
1985
1986    // ── Bearer token auth (pairing) with auth rate limiting ──
1987    if state.pairing.require_pairing() {
1988        if let Err(e) = state
1989            .auth_limiter
1990            .check_rate_limit(&rate_key, peer_is_loopback)
1991        {
1992            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
1993            let err = serde_json::json!({
1994                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1995                "retry_after": e.retry_after_secs,
1996            });
1997            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1998        }
1999        let auth = headers
2000            .get(header::AUTHORIZATION)
2001            .and_then(|v| v.to_str().ok())
2002            .unwrap_or("");
2003        let token = auth.strip_prefix("Bearer ").unwrap_or("");
2004        if !state.pairing.is_authenticated(token) {
2005            state
2006                .auth_limiter
2007                .record_attempt(&rate_key, peer_is_loopback);
2008            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
2009            let err = serde_json::json!({
2010                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2011            });
2012            return (StatusCode::UNAUTHORIZED, Json(err));
2013        }
2014    }
2015
2016    // ── Webhook secret auth (optional, additional layer) ──
2017    if let Some(ref secret_hash) = state.webhook_secret_hash {
2018        let header_hash = headers
2019            .get("X-Webhook-Secret")
2020            .and_then(|v| v.to_str().ok())
2021            .map(str::trim)
2022            .filter(|value| !value.is_empty())
2023            .map(hash_webhook_secret);
2024        match header_hash {
2025            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2026            _ => {
2027                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
2028                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2029                return (StatusCode::UNAUTHORIZED, Json(err));
2030            }
2031        }
2032    }
2033
2034    // ── Parse body ──
2035    let Json(webhook_body) = match body {
2036        Ok(b) => b,
2037        Err(e) => {
2038            tracing::warn!("Webhook JSON parse error: {e}");
2039            let err = serde_json::json!({
2040                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2041            });
2042            return (StatusCode::BAD_REQUEST, Json(err));
2043        }
2044    };
2045
2046    // ── Idempotency (optional) ──
2047    if let Some(idempotency_key) = headers
2048        .get("X-Idempotency-Key")
2049        .and_then(|v| v.to_str().ok())
2050        .map(str::trim)
2051        .filter(|value| !value.is_empty())
2052    {
2053        if !state.idempotency_store.record_if_new(idempotency_key) {
2054            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
2055            let body = serde_json::json!({
2056                "status": "duplicate",
2057                "idempotent": true,
2058                "message": "Request already processed for this idempotency key"
2059            });
2060            return (StatusCode::OK, Json(body));
2061        }
2062    }
2063
2064    let message = &webhook_body.message;
2065    let session_id = webhook_session_id(&headers);
2066
2067    if state.auto_save && !memory::should_skip_autosave_content(message) {
2068        let key = webhook_memory_key();
2069        let _ = state
2070            .mem
2071            .store(
2072                &key,
2073                message,
2074                MemoryCategory::Conversation,
2075                session_id.as_deref(),
2076            )
2077            .await;
2078    }
2079
2080    let provider_label = state
2081        .config
2082        .lock()
2083        .default_provider
2084        .clone()
2085        .unwrap_or_else(|| "unknown".to_string());
2086    let model_label = state.model.clone();
2087    let started_at = Instant::now();
2088
2089    state
2090        .observer
2091        .record_event(&crate::observability::ObserverEvent::AgentStart {
2092            provider: provider_label.clone(),
2093            model: model_label.clone(),
2094        });
2095    state
2096        .observer
2097        .record_event(&crate::observability::ObserverEvent::LlmRequest {
2098            provider: provider_label.clone(),
2099            model: model_label.clone(),
2100            messages_count: 1,
2101        });
2102
2103    match run_gateway_chat_simple(&state, message).await {
2104        Ok(response) => {
2105            let duration = started_at.elapsed();
2106            state
2107                .observer
2108                .record_event(&crate::observability::ObserverEvent::LlmResponse {
2109                    provider: provider_label.clone(),
2110                    model: model_label.clone(),
2111                    duration,
2112                    success: true,
2113                    error_message: None,
2114                    input_tokens: None,
2115                    output_tokens: None,
2116                });
2117            state.observer.record_metric(
2118                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2119            );
2120            state
2121                .observer
2122                .record_event(&crate::observability::ObserverEvent::AgentEnd {
2123                    provider: provider_label,
2124                    model: model_label,
2125                    duration,
2126                    tokens_used: None,
2127                    cost_usd: None,
2128                });
2129
2130            let body = serde_json::json!({"response": response, "model": state.model});
2131            (StatusCode::OK, Json(body))
2132        }
2133        Err(e) => {
2134            let duration = started_at.elapsed();
2135            let sanitized = providers::sanitize_api_error(&e.to_string());
2136
2137            state
2138                .observer
2139                .record_event(&crate::observability::ObserverEvent::LlmResponse {
2140                    provider: provider_label.clone(),
2141                    model: model_label.clone(),
2142                    duration,
2143                    success: false,
2144                    error_message: Some(sanitized.clone()),
2145                    input_tokens: None,
2146                    output_tokens: None,
2147                });
2148            state.observer.record_metric(
2149                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2150            );
2151            state
2152                .observer
2153                .record_event(&crate::observability::ObserverEvent::Error {
2154                    component: "gateway".to_string(),
2155                    message: sanitized.clone(),
2156                });
2157            state
2158                .observer
2159                .record_event(&crate::observability::ObserverEvent::AgentEnd {
2160                    provider: provider_label,
2161                    model: model_label,
2162                    duration,
2163                    tokens_used: None,
2164                    cost_usd: None,
2165                });
2166
2167            tracing::error!("Webhook provider error: {}", sanitized);
2168            let err = serde_json::json!({"error": "LLM request failed"});
2169            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2170        }
2171    }
2172}
2173
/// Query parameters of the `WhatsApp` webhook verification request (`GET /whatsapp`).
///
/// Every field is optional at the deserialization layer; `handle_whatsapp_verify`
/// decides which combinations are accepted.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// Value of the `hub.mode` query parameter, if present.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// Value of the `hub.verify_token` query parameter, if present.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// Value of the `hub.challenge` query parameter, if present.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2184
2185/// GET /whatsapp — Meta webhook verification
2186async fn handle_whatsapp_verify(
2187    State(state): State<AppState>,
2188    Query(params): Query<WhatsAppVerifyQuery>,
2189) -> impl IntoResponse {
2190    let Some(ref wa) = state.whatsapp else {
2191        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2192    };
2193
2194    // Verify the token matches (constant-time comparison to prevent timing attacks)
2195    let token_matches = params
2196        .verify_token
2197        .as_deref()
2198        .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2199    if params.mode.as_deref() == Some("subscribe") && token_matches {
2200        if let Some(ch) = params.challenge {
2201            tracing::info!("WhatsApp webhook verified successfully");
2202            return (StatusCode::OK, ch);
2203        }
2204        return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2205    }
2206
2207    tracing::warn!("WhatsApp webhook verification failed — token mismatch");
2208    (StatusCode::FORBIDDEN, "Forbidden".to_string())
2209}
2210
2211/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
2212/// Returns true if the signature is valid, false otherwise.
2213/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
2214pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2215    use hmac::{Hmac, Mac};
2216    use sha2::Sha256;
2217
2218    // Signature format: "sha256=<hex_signature>"
2219    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2220        return false;
2221    };
2222
2223    // Decode hex signature
2224    let Ok(expected) = hex::decode(hex_sig) else {
2225        return false;
2226    };
2227
2228    // Compute HMAC-SHA256
2229    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2230        return false;
2231    };
2232    mac.update(body);
2233
2234    // Constant-time comparison
2235    mac.verify_slice(&expected).is_ok()
2236}
2237
2238/// POST /whatsapp — incoming message webhook
2239async fn handle_whatsapp_message(
2240    State(state): State<AppState>,
2241    headers: HeaderMap,
2242    body: Bytes,
2243) -> impl IntoResponse {
2244    let Some(ref wa) = state.whatsapp else {
2245        return (
2246            StatusCode::NOT_FOUND,
2247            Json(serde_json::json!({"error": "WhatsApp not configured"})),
2248        );
2249    };
2250
2251    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
2252    if let Some(ref app_secret) = state.whatsapp_app_secret {
2253        let signature = headers
2254            .get("X-Hub-Signature-256")
2255            .and_then(|v| v.to_str().ok())
2256            .unwrap_or("");
2257
2258        if !verify_whatsapp_signature(app_secret, &body, signature) {
2259            tracing::warn!(
2260                "WhatsApp webhook signature verification failed (signature: {})",
2261                if signature.is_empty() {
2262                    "missing"
2263                } else {
2264                    "invalid"
2265                }
2266            );
2267            return (
2268                StatusCode::UNAUTHORIZED,
2269                Json(serde_json::json!({"error": "Invalid signature"})),
2270            );
2271        }
2272    }
2273
2274    // Parse JSON body
2275    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2276        return (
2277            StatusCode::BAD_REQUEST,
2278            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2279        );
2280    };
2281
2282    // Parse messages from the webhook payload
2283    let messages = wa.parse_webhook_payload(&payload);
2284
2285    if messages.is_empty() {
2286        // Acknowledge the webhook even if no messages (could be status updates)
2287        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2288    }
2289
2290    // Process each message
2291    for msg in &messages {
2292        tracing::info!(
2293            "WhatsApp message from {}: {}",
2294            msg.sender,
2295            truncate_with_ellipsis(&msg.content, 50)
2296        );
2297        let session_id = sender_session_id("whatsapp", msg);
2298
2299        // Auto-save to memory
2300        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2301            let key = whatsapp_memory_key(msg);
2302            let _ = state
2303                .mem
2304                .store(
2305                    &key,
2306                    &msg.content,
2307                    MemoryCategory::Conversation,
2308                    Some(&session_id),
2309                )
2310                .await;
2311        }
2312
2313        match Box::pin(run_gateway_chat_with_tools(
2314            &state,
2315            &msg.content,
2316            Some(&session_id),
2317        ))
2318        .await
2319        {
2320            Ok(response) => {
2321                // Send reply via WhatsApp
2322                if let Err(e) = wa
2323                    .send(&SendMessage::new(response, &msg.reply_target))
2324                    .await
2325                {
2326                    tracing::error!("Failed to send WhatsApp reply: {e}");
2327                }
2328            }
2329            Err(e) => {
2330                tracing::error!("LLM error for WhatsApp message: {e:#}");
2331                let _ = wa
2332                    .send(&SendMessage::new(
2333                        "Sorry, I couldn't process your message right now.",
2334                        &msg.reply_target,
2335                    ))
2336                    .await;
2337            }
2338        }
2339    }
2340
2341    // Acknowledge the webhook
2342    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2343}
2344
2345/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
2346async fn handle_linq_webhook(
2347    State(state): State<AppState>,
2348    headers: HeaderMap,
2349    body: Bytes,
2350) -> impl IntoResponse {
2351    let Some(ref linq) = state.linq else {
2352        return (
2353            StatusCode::NOT_FOUND,
2354            Json(serde_json::json!({"error": "Linq not configured"})),
2355        );
2356    };
2357
2358    let body_str = String::from_utf8_lossy(&body);
2359
2360    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
2361    if let Some(ref signing_secret) = state.linq_signing_secret {
2362        let timestamp = headers
2363            .get("X-Webhook-Timestamp")
2364            .and_then(|v| v.to_str().ok())
2365            .unwrap_or("");
2366
2367        let signature = headers
2368            .get("X-Webhook-Signature")
2369            .and_then(|v| v.to_str().ok())
2370            .unwrap_or("");
2371
2372        if !crate::channels::linq::verify_linq_signature(
2373            signing_secret,
2374            &body_str,
2375            timestamp,
2376            signature,
2377        ) {
2378            tracing::warn!(
2379                "Linq webhook signature verification failed (signature: {})",
2380                if signature.is_empty() {
2381                    "missing"
2382                } else {
2383                    "invalid"
2384                }
2385            );
2386            return (
2387                StatusCode::UNAUTHORIZED,
2388                Json(serde_json::json!({"error": "Invalid signature"})),
2389            );
2390        }
2391    }
2392
2393    // Parse JSON body
2394    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2395        return (
2396            StatusCode::BAD_REQUEST,
2397            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2398        );
2399    };
2400
2401    // Parse messages from the webhook payload
2402    let messages = linq.parse_webhook_payload(&payload);
2403
2404    if messages.is_empty() {
2405        // Acknowledge the webhook even if no messages (could be status/delivery events)
2406        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2407    }
2408
2409    // Process each message
2410    for msg in &messages {
2411        tracing::info!(
2412            "Linq message from {}: {}",
2413            msg.sender,
2414            truncate_with_ellipsis(&msg.content, 50)
2415        );
2416        let session_id = sender_session_id("linq", msg);
2417
2418        // Auto-save to memory
2419        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2420            let key = linq_memory_key(msg);
2421            let _ = state
2422                .mem
2423                .store(
2424                    &key,
2425                    &msg.content,
2426                    MemoryCategory::Conversation,
2427                    Some(&session_id),
2428                )
2429                .await;
2430        }
2431
2432        // Call the LLM
2433        match Box::pin(run_gateway_chat_with_tools(
2434            &state,
2435            &msg.content,
2436            Some(&session_id),
2437        ))
2438        .await
2439        {
2440            Ok(response) => {
2441                // Send reply via Linq
2442                if let Err(e) = linq
2443                    .send(&SendMessage::new(response, &msg.reply_target))
2444                    .await
2445                {
2446                    tracing::error!("Failed to send Linq reply: {e}");
2447                }
2448            }
2449            Err(e) => {
2450                tracing::error!("LLM error for Linq message: {e:#}");
2451                let _ = linq
2452                    .send(&SendMessage::new(
2453                        "Sorry, I couldn't process your message right now.",
2454                        &msg.reply_target,
2455                    ))
2456                    .await;
2457            }
2458        }
2459    }
2460
2461    // Acknowledge the webhook
2462    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2463}
2464
2465/// GET /wati — WATI webhook verification (echoes hub.challenge)
2466async fn handle_wati_verify(
2467    State(state): State<AppState>,
2468    Query(params): Query<WatiVerifyQuery>,
2469) -> impl IntoResponse {
2470    if state.wati.is_none() {
2471        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2472    }
2473
2474    // WATI may use Meta-style webhook verification; echo the challenge
2475    if let Some(challenge) = params.challenge {
2476        tracing::info!("WATI webhook verified successfully");
2477        return (StatusCode::OK, challenge);
2478    }
2479
2480    (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2481}
2482
/// Query parameters of the WATI webhook verification request (`GET /wati`).
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// Value of the `hub.challenge` query parameter, if present; the verify
    /// handler echoes it back to the caller.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2488
2489/// POST /wati — incoming WATI WhatsApp message webhook
2490async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2491    let Some(ref wati) = state.wati else {
2492        return (
2493            StatusCode::NOT_FOUND,
2494            Json(serde_json::json!({"error": "WATI not configured"})),
2495        );
2496    };
2497
2498    // Parse JSON body
2499    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2500        return (
2501            StatusCode::BAD_REQUEST,
2502            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2503        );
2504    };
2505
2506    // Detect audio before the synchronous parse
2507    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2508
2509    let messages = if matches!(msg_type, "audio" | "voice") {
2510        // Build a synthetic ChannelMessage from the audio transcript
2511        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2512            wati.parse_audio_as_message(&payload, transcript)
2513        } else {
2514            vec![]
2515        }
2516    } else {
2517        wati.parse_webhook_payload(&payload)
2518    };
2519
2520    if messages.is_empty() {
2521        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2522    }
2523
2524    // Process each message
2525    for msg in &messages {
2526        tracing::info!(
2527            "WATI message from {}: {}",
2528            msg.sender,
2529            truncate_with_ellipsis(&msg.content, 50)
2530        );
2531        let session_id = sender_session_id("wati", msg);
2532
2533        // Auto-save to memory
2534        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2535            let key = wati_memory_key(msg);
2536            let _ = state
2537                .mem
2538                .store(
2539                    &key,
2540                    &msg.content,
2541                    MemoryCategory::Conversation,
2542                    Some(&session_id),
2543                )
2544                .await;
2545        }
2546
2547        // Call the LLM
2548        match Box::pin(run_gateway_chat_with_tools(
2549            &state,
2550            &msg.content,
2551            Some(&session_id),
2552        ))
2553        .await
2554        {
2555            Ok(response) => {
2556                // Send reply via WATI
2557                if let Err(e) = wati
2558                    .send(&SendMessage::new(response, &msg.reply_target))
2559                    .await
2560                {
2561                    tracing::error!("Failed to send WATI reply: {e}");
2562                }
2563            }
2564            Err(e) => {
2565                tracing::error!("LLM error for WATI message: {e:#}");
2566                let _ = wati
2567                    .send(&SendMessage::new(
2568                        "Sorry, I couldn't process your message right now.",
2569                        &msg.reply_target,
2570                    ))
2571                    .await;
2572            }
2573        }
2574    }
2575
2576    // Acknowledge the webhook
2577    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2578}
2579
2580/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
2581async fn handle_nextcloud_talk_webhook(
2582    State(state): State<AppState>,
2583    headers: HeaderMap,
2584    body: Bytes,
2585) -> impl IntoResponse {
2586    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2587        return (
2588            StatusCode::NOT_FOUND,
2589            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2590        );
2591    };
2592
2593    let body_str = String::from_utf8_lossy(&body);
2594
2595    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
2596    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2597        let random = headers
2598            .get("X-Nextcloud-Talk-Random")
2599            .and_then(|v| v.to_str().ok())
2600            .unwrap_or("");
2601
2602        let signature = headers
2603            .get("X-Nextcloud-Talk-Signature")
2604            .and_then(|v| v.to_str().ok())
2605            .unwrap_or("");
2606
2607        if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2608            webhook_secret,
2609            random,
2610            &body_str,
2611            signature,
2612        ) {
2613            tracing::warn!(
2614                "Nextcloud Talk webhook signature verification failed (signature: {})",
2615                if signature.is_empty() {
2616                    "missing"
2617                } else {
2618                    "invalid"
2619                }
2620            );
2621            return (
2622                StatusCode::UNAUTHORIZED,
2623                Json(serde_json::json!({"error": "Invalid signature"})),
2624            );
2625        }
2626    }
2627
2628    // Parse JSON body
2629    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2630        return (
2631            StatusCode::BAD_REQUEST,
2632            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2633        );
2634    };
2635
2636    // Parse messages from webhook payload
2637    let messages = nextcloud_talk.parse_webhook_payload(&payload);
2638    if messages.is_empty() {
2639        // Acknowledge webhook even if payload does not contain actionable user messages.
2640        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2641    }
2642
2643    for msg in &messages {
2644        tracing::info!(
2645            "Nextcloud Talk message from {}: {}",
2646            msg.sender,
2647            truncate_with_ellipsis(&msg.content, 50)
2648        );
2649        let session_id = sender_session_id("nextcloud_talk", msg);
2650
2651        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2652            let key = nextcloud_talk_memory_key(msg);
2653            let _ = state
2654                .mem
2655                .store(
2656                    &key,
2657                    &msg.content,
2658                    MemoryCategory::Conversation,
2659                    Some(&session_id),
2660                )
2661                .await;
2662        }
2663
2664        match Box::pin(run_gateway_chat_with_tools(
2665            &state,
2666            &msg.content,
2667            Some(&session_id),
2668        ))
2669        .await
2670        {
2671            Ok(response) => {
2672                if let Err(e) = nextcloud_talk
2673                    .send(&SendMessage::new(response, &msg.reply_target))
2674                    .await
2675                {
2676                    tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2677                }
2678            }
2679            Err(e) => {
2680                tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2681                let _ = nextcloud_talk
2682                    .send(&SendMessage::new(
2683                        "Sorry, I couldn't process your message right now.",
2684                        &msg.reply_target,
2685                    ))
2686                    .await;
2687            }
2688        }
2689    }
2690
2691    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2692}
2693
/// Upper bound, in bytes, on the request body accepted by the Gmail webhook
/// endpoint: 1 MiB (`1 << 20`).
/// Google Pub/Sub messages are typically under 10 KB.
const GMAIL_WEBHOOK_MAX_BODY: usize = 1 << 20;
2697
2698/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
2699async fn handle_gmail_push_webhook(
2700    State(state): State<AppState>,
2701    headers: HeaderMap,
2702    body: Bytes,
2703) -> impl IntoResponse {
2704    let Some(ref gmail_push) = state.gmail_push else {
2705        return (
2706            StatusCode::NOT_FOUND,
2707            Json(serde_json::json!({"error": "Gmail push not configured"})),
2708        );
2709    };
2710
2711    // Enforce body size limit.
2712    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2713        return (
2714            StatusCode::PAYLOAD_TOO_LARGE,
2715            Json(serde_json::json!({"error": "Request body too large"})),
2716        );
2717    }
2718
2719    // Authenticate the webhook request using a shared secret.
2720    let secret = gmail_push.resolve_webhook_secret();
2721    if !secret.is_empty() {
2722        let provided = headers
2723            .get(axum::http::header::AUTHORIZATION)
2724            .and_then(|v| v.to_str().ok())
2725            .and_then(|auth| auth.strip_prefix("Bearer "))
2726            .unwrap_or("");
2727
2728        if provided != secret {
2729            tracing::warn!("Gmail push webhook: unauthorized request");
2730            return (
2731                StatusCode::UNAUTHORIZED,
2732                Json(serde_json::json!({"error": "Unauthorized"})),
2733            );
2734        }
2735    }
2736
2737    let body_str = String::from_utf8_lossy(&body);
2738    let envelope: crate::channels::gmail_push::PubSubEnvelope =
2739        match serde_json::from_str(&body_str) {
2740            Ok(e) => e,
2741            Err(e) => {
2742                tracing::warn!("Gmail push webhook: invalid payload: {e}");
2743                return (
2744                    StatusCode::BAD_REQUEST,
2745                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2746                );
2747            }
2748        };
2749
2750    // Process the notification asynchronously (non-blocking for the webhook response)
2751    let channel = Arc::clone(gmail_push);
2752    tokio::spawn(async move {
2753        if let Err(e) = channel.handle_notification(&envelope).await {
2754            tracing::error!("Gmail push notification processing failed: {e:#}");
2755        }
2756    });
2757
2758    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
2759    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2760}
2761
2762// ══════════════════════════════════════════════════════════════════════════════
2763// ADMIN HANDLERS (for CLI management)
2764// ══════════════════════════════════════════════════════════════════════════════
2765
/// JSON response body for admin endpoints; serialized as
/// `{"success": <bool>, "message": <string>}`.
#[derive(serde::Serialize)]
struct AdminResponse {
    /// Whether the requested admin action was accepted.
    success: bool,
    /// Human-readable description of the outcome.
    message: String,
}
2772
2773/// Reject requests that do not originate from a loopback address.
2774fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2775    if peer.ip().is_loopback() {
2776        Ok(())
2777    } else {
2778        Err((
2779            StatusCode::FORBIDDEN,
2780            Json(serde_json::json!({
2781                "error": "Admin endpoints are restricted to localhost"
2782            })),
2783        ))
2784    }
2785}
2786
2787/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
2788async fn handle_admin_shutdown(
2789    State(state): State<AppState>,
2790    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2791) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2792    require_localhost(&peer)?;
2793    tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2794
2795    let body = AdminResponse {
2796        success: true,
2797        message: "Gateway shutdown initiated".to_string(),
2798    };
2799
2800    let _ = state.shutdown_tx.send(true);
2801
2802    Ok((StatusCode::OK, Json(body)))
2803}
2804
2805/// GET /admin/paircode — fetch current pairing code (localhost only)
2806async fn handle_admin_paircode(
2807    State(state): State<AppState>,
2808    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2809) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2810    require_localhost(&peer)?;
2811    let code = state.pairing.pairing_code();
2812
2813    let body = if let Some(c) = code {
2814        serde_json::json!({
2815            "success": true,
2816            "pairing_required": state.pairing.require_pairing(),
2817            "pairing_code": c,
2818            "message": "Use this one-time code to pair"
2819        })
2820    } else {
2821        serde_json::json!({
2822            "success": true,
2823            "pairing_required": state.pairing.require_pairing(),
2824            "pairing_code": null,
2825            "message": if state.pairing.require_pairing() {
2826                "Pairing is active but no new code available (already paired or code expired)"
2827            } else {
2828                "Pairing is disabled for this gateway"
2829            }
2830        })
2831    };
2832
2833    Ok((StatusCode::OK, Json(body)))
2834}
2835
2836/// POST /admin/paircode/new — generate a new pairing code (localhost only)
2837async fn handle_admin_paircode_new(
2838    State(state): State<AppState>,
2839    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2840) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2841    require_localhost(&peer)?;
2842    match state.pairing.generate_new_pairing_code() {
2843        Some(code) => {
2844            tracing::info!("🔐 New pairing code generated via admin endpoint");
2845            let body = serde_json::json!({
2846                "success": true,
2847                "pairing_required": state.pairing.require_pairing(),
2848                "pairing_code": code,
2849                "message": "New pairing code generated — use this one-time code to pair"
2850            });
2851            Ok((StatusCode::OK, Json(body)))
2852        }
2853        None => {
2854            let body = serde_json::json!({
2855                "success": false,
2856                "pairing_required": false,
2857                "pairing_code": null,
2858                "message": "Pairing is disabled for this gateway"
2859            });
2860            Ok((StatusCode::BAD_REQUEST, Json(body)))
2861        }
2862    }
2863}
2864
2865/// GET /pair/code — fetch the initial pairing code.
2866///
2867/// Requires a loopback peer. A publicly-reachable endpoint would let any caller
2868/// (e.g. an attacker scanning exposed ngrok/Cloudflare tunnels during first-run)
2869/// fetch the code before the legitimate operator. Host-side dashboards should
2870/// reach the gateway over loopback; containerized setups can call this via
2871/// `docker exec` or fetch the code from `construct onboard` output.
2872async fn handle_pair_code(
2873    State(state): State<AppState>,
2874    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2875) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2876    require_localhost(&peer)?;
2877
2878    let require = state.pairing.require_pairing();
2879    let is_paired = state.pairing.is_paired();
2880
2881    let code = if require && !is_paired {
2882        state.pairing.pairing_code()
2883    } else {
2884        None
2885    };
2886
2887    let body = serde_json::json!({
2888        "success": true,
2889        "pairing_required": require,
2890        "pairing_code": code,
2891    });
2892
2893    Ok((StatusCode::OK, Json(body)))
2894}
2895
2896#[cfg(test)]
2897mod tests {
2898    use super::*;
2899    use crate::channels::traits::ChannelMessage;
2900    use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2901    use crate::providers::Provider;
2902    use async_trait::async_trait;
2903    use axum::http::HeaderValue;
2904    use axum::response::IntoResponse;
2905    use http_body_util::BodyExt;
2906    use parking_lot::Mutex;
2907    use std::sync::atomic::{AtomicUsize, Ordering};
2908
2909    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
2910    fn generate_test_secret() -> String {
2911        let bytes: [u8; 32] = rand::random();
2912        hex::encode(bytes)
2913    }
2914
2915    #[test]
2916    fn security_body_limit_is_64kb() {
2917        assert_eq!(MAX_BODY_SIZE, 65_536);
2918    }
2919
2920    #[test]
2921    fn security_timeout_default_is_30_seconds() {
2922        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2923    }
2924
2925    #[test]
2926    fn gateway_timeout_falls_back_to_default() {
2927        // When env var is not set, should return the default constant
2928        // SAFETY: test-only, single-threaded test runner.
2929        unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2930        assert_eq!(gateway_request_timeout_secs(), 30);
2931    }
2932
2933    #[test]
2934    fn webhook_body_requires_message_field() {
2935        let valid = r#"{"message": "hello"}"#;
2936        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2937        assert!(parsed.is_ok());
2938        assert_eq!(parsed.unwrap().message, "hello");
2939
2940        let missing = r#"{"other": "field"}"#;
2941        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2942        assert!(parsed.is_err());
2943    }
2944
2945    #[test]
2946    fn whatsapp_query_fields_are_optional() {
2947        let q = WhatsAppVerifyQuery {
2948            mode: None,
2949            verify_token: None,
2950            challenge: None,
2951        };
2952        assert!(q.mode.is_none());
2953    }
2954
2955    #[test]
2956    fn app_state_is_clone() {
2957        fn assert_clone<T: Clone>() {}
2958        assert_clone::<AppState>();
2959    }
2960
    /// With a `NoopObserver` installed, the metrics handler must still answer
    /// `200` with the Prometheus content type and a body explaining that the
    /// Prometheus backend is not enabled.
    #[tokio::test]
    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
        // Minimal state: mock provider and memory, every optional channel and
        // integration left as `None`, and a no-op observer.
        let state = AppState {
            config: Arc::new(Mutex::new(Config::default())),
            provider: Arc::new(MockProvider::default()),
            model: "test-model".into(),
            temperature: 0.0,
            mem: Arc::new(MockMemory),
            auto_save: false,
            webhook_secret_hash: None,
            pairing: Arc::new(PairingGuard::new(false, &[])),
            trust_forwarded_headers: false,
            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
            whatsapp: None,
            whatsapp_app_secret: None,
            linq: None,
            linq_signing_secret: None,
            nextcloud_talk: None,
            nextcloud_talk_webhook_secret: None,
            wati: None,
            gmail_push: None,
            observer: Arc::new(crate::observability::NoopObserver),
            tools_registry: Arc::new(Vec::new()),
            cost_tracker: None,
            audit_logger: None,
            event_tx: tokio::sync::broadcast::channel(16).0,
            shutdown_tx: tokio::sync::watch::channel(false).0,
            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
            path_prefix: String::new(),
            session_backend: None,
            session_queue: std::sync::Arc::new(
                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
            ),
            device_registry: None,
            pending_pairings: None,
            canvas_store: CanvasStore::new(),
            mcp_registry: None,
            approval_registry: approval_registry::global(),
            mcp_local_url: None,
            #[cfg(feature = "webauthn")]
            webauthn: None,
        };

        // Call the handler directly, bypassing the router.
        let response = handle_metrics(State(state)).await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response
                .headers()
                .get(header::CONTENT_TYPE)
                .and_then(|value| value.to_str().ok()),
            Some(PROMETHEUS_CONTENT_TYPE)
        );

        // Drain the body and check for the hint text.
        let body = response.into_body().collect().await.unwrap().to_bytes();
        let text = String::from_utf8(body.to_vec()).unwrap();
        assert!(text.contains("Prometheus backend not enabled"));
    }
3020
3021    #[cfg(feature = "observability-prometheus")]
3022    #[tokio::test]
3023    async fn metrics_endpoint_renders_prometheus_output() {
3024        let event_tx = tokio::sync::broadcast::channel(16).0;
3025        let wrapped = sse::BroadcastObserver::new(
3026            Box::new(crate::observability::PrometheusObserver::new()),
3027            event_tx.clone(),
3028        );
3029        crate::observability::Observer::record_event(
3030            &wrapped,
3031            &crate::observability::ObserverEvent::HeartbeatTick,
3032        );
3033
3034        let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
3035        let state = AppState {
3036            config: Arc::new(Mutex::new(Config::default())),
3037            provider: Arc::new(MockProvider::default()),
3038            model: "test-model".into(),
3039            temperature: 0.0,
3040            mem: Arc::new(MockMemory),
3041            auto_save: false,
3042            webhook_secret_hash: None,
3043            pairing: Arc::new(PairingGuard::new(false, &[])),
3044            trust_forwarded_headers: false,
3045            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3046            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3047            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3048            whatsapp: None,
3049            whatsapp_app_secret: None,
3050            linq: None,
3051            linq_signing_secret: None,
3052            nextcloud_talk: None,
3053            nextcloud_talk_webhook_secret: None,
3054            wati: None,
3055            gmail_push: None,
3056            observer,
3057            tools_registry: Arc::new(Vec::new()),
3058            cost_tracker: None,
3059            audit_logger: None,
3060            event_tx,
3061            shutdown_tx: tokio::sync::watch::channel(false).0,
3062            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3063            path_prefix: String::new(),
3064            session_backend: None,
3065            session_queue: std::sync::Arc::new(
3066                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3067            ),
3068            device_registry: None,
3069            pending_pairings: None,
3070            canvas_store: CanvasStore::new(),
3071            mcp_registry: None,
3072            approval_registry: approval_registry::global(),
3073            mcp_local_url: None,
3074            #[cfg(feature = "webauthn")]
3075            webauthn: None,
3076        };
3077
3078        let response = handle_metrics(State(state)).await.into_response();
3079        assert_eq!(response.status(), StatusCode::OK);
3080
3081        let body = response.into_body().collect().await.unwrap().to_bytes();
3082        let text = String::from_utf8(body.to_vec()).unwrap();
3083        assert!(text.contains("construct_heartbeat_ticks_total 1"));
3084    }
3085
3086    #[test]
3087    fn gateway_rate_limiter_blocks_after_limit() {
3088        let limiter = GatewayRateLimiter::new(2, 2, 100);
3089        assert!(limiter.allow_pair("127.0.0.1"));
3090        assert!(limiter.allow_pair("127.0.0.1"));
3091        assert!(!limiter.allow_pair("127.0.0.1"));
3092    }
3093
3094    #[test]
3095    fn rate_limiter_sweep_removes_stale_entries() {
3096        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3097        // Add entries for multiple IPs
3098        assert!(limiter.allow("ip-1"));
3099        assert!(limiter.allow("ip-2"));
3100        assert!(limiter.allow("ip-3"));
3101
3102        {
3103            let guard = limiter.requests.lock();
3104            assert_eq!(guard.0.len(), 3);
3105        }
3106
3107        // Force a sweep by backdating last_sweep
3108        {
3109            let mut guard = limiter.requests.lock();
3110            guard.1 = Instant::now()
3111                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3112                .unwrap();
3113            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
3114            guard.0.get_mut("ip-2").unwrap().clear();
3115            guard.0.get_mut("ip-3").unwrap().clear();
3116        }
3117
3118        // Next allow() call should trigger sweep and remove stale entries
3119        assert!(limiter.allow("ip-1"));
3120
3121        {
3122            let guard = limiter.requests.lock();
3123            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3124            assert!(guard.0.contains_key("ip-1"));
3125        }
3126    }
3127
3128    #[test]
3129    fn rate_limiter_zero_limit_always_allows() {
3130        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3131        for _ in 0..100 {
3132            assert!(limiter.allow("any-key"));
3133        }
3134    }
3135
3136    #[test]
3137    fn idempotency_store_rejects_duplicate_key() {
3138        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3139        assert!(store.record_if_new("req-1"));
3140        assert!(!store.record_if_new("req-1"));
3141        assert!(store.record_if_new("req-2"));
3142    }
3143
3144    #[test]
3145    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3146        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3147        assert!(limiter.allow("ip-1"));
3148        assert!(limiter.allow("ip-2"));
3149        assert!(limiter.allow("ip-3"));
3150
3151        let guard = limiter.requests.lock();
3152        assert_eq!(guard.0.len(), 2);
3153        assert!(guard.0.contains_key("ip-2"));
3154        assert!(guard.0.contains_key("ip-3"));
3155    }
3156
3157    #[test]
3158    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3159        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3160        assert!(store.record_if_new("k1"));
3161        std::thread::sleep(Duration::from_millis(2));
3162        assert!(store.record_if_new("k2"));
3163        std::thread::sleep(Duration::from_millis(2));
3164        assert!(store.record_if_new("k3"));
3165
3166        let keys = store.keys.lock();
3167        assert_eq!(keys.len(), 2);
3168        assert!(!keys.contains_key("k1"));
3169        assert!(keys.contains_key("k2"));
3170        assert!(keys.contains_key("k3"));
3171    }
3172
3173    #[test]
3174    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3175        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3176        let mut headers = HeaderMap::new();
3177        headers.insert(
3178            "X-Forwarded-For",
3179            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3180        );
3181
3182        let key = client_key_from_request(Some(peer), &headers, false);
3183        assert_eq!(key, "10.0.0.5");
3184    }
3185
3186    #[test]
3187    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3188        // Rightmost XFF hop is the one appended by our trusted upstream proxy;
3189        // leftmost values are attacker-controlled (clients send whatever).
3190        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3191        let mut headers = HeaderMap::new();
3192        headers.insert(
3193            "X-Forwarded-For",
3194            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3195        );
3196
3197        let key = client_key_from_request(Some(peer), &headers, true);
3198        assert_eq!(key, "203.0.113.11");
3199    }
3200
3201    #[test]
3202    fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
3203        // Attacker sets `X-Forwarded-For: 127.0.0.1, <legit-upstream>`. The
3204        // rightmost (trusted proxy's appended value) must win — if we took
3205        // the leftmost, a remote attacker could spoof loopback and evade
3206        // rate limits / lockouts.
3207        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3208        let mut headers = HeaderMap::new();
3209        headers.insert(
3210            "X-Forwarded-For",
3211            HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
3212        );
3213
3214        let key = client_key_from_request(Some(peer), &headers, true);
3215        assert_eq!(key, "203.0.113.11");
3216    }
3217
3218    #[test]
3219    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3220        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3221        let mut headers = HeaderMap::new();
3222        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3223
3224        let key = client_key_from_request(Some(peer), &headers, true);
3225        assert_eq!(key, "10.0.0.5");
3226    }
3227
3228    #[test]
3229    fn normalize_max_keys_uses_fallback_for_zero() {
3230        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3231        assert_eq!(normalize_max_keys(0, 0), 1);
3232    }
3233
3234    #[test]
3235    fn normalize_max_keys_preserves_nonzero_values() {
3236        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3237        assert_eq!(normalize_max_keys(1, 10_000), 1);
3238    }
3239
3240    #[tokio::test]
3241    async fn persist_pairing_tokens_writes_config_tokens() {
3242        let temp = tempfile::tempdir().unwrap();
3243        let config_path = temp.path().join("config.toml");
3244        let workspace_path = temp.path().join("workspace");
3245
3246        let mut config = Config::default();
3247        config.config_path = config_path.clone();
3248        config.workspace_dir = workspace_path;
3249        config.save().await.unwrap();
3250
3251        let guard = PairingGuard::new(true, &[]);
3252        let code = guard.pairing_code().unwrap();
3253        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
3254        assert!(guard.is_authenticated(&token));
3255
3256        let shared_config = Arc::new(Mutex::new(config));
3257        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
3258            .await
3259            .unwrap();
3260
3261        // In-memory tokens should remain as plaintext 64-char hex hashes.
3262        let plaintext = {
3263            let in_memory = shared_config.lock();
3264            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
3265            in_memory.gateway.paired_tokens[0].clone()
3266        };
3267        assert_eq!(plaintext.len(), 64);
3268        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
3269
3270        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
3271        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
3272        let raw_parsed: Config = toml::from_str(&saved).unwrap();
3273        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
3274        let on_disk = &raw_parsed.gateway.paired_tokens[0];
3275        assert!(
3276            crate::security::SecretStore::is_encrypted(on_disk),
3277            "paired_token should be encrypted on disk"
3278        );
3279    }
3280
3281    #[test]
3282    fn webhook_memory_key_is_unique() {
3283        let key1 = webhook_memory_key();
3284        let key2 = webhook_memory_key();
3285
3286        assert!(key1.starts_with("webhook_msg_"));
3287        assert!(key2.starts_with("webhook_msg_"));
3288        assert_ne!(key1, key2);
3289    }
3290
3291    #[test]
3292    fn whatsapp_memory_key_includes_sender_and_message_id() {
3293        let msg = ChannelMessage {
3294            id: "wamid-123".into(),
3295            sender: "+1234567890".into(),
3296            reply_target: "+1234567890".into(),
3297            content: "hello".into(),
3298            channel: "whatsapp".into(),
3299            timestamp: 1,
3300            thread_ts: None,
3301            interruption_scope_id: None,
3302            attachments: vec![],
3303        };
3304
3305        let key = whatsapp_memory_key(&msg);
3306        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3307    }
3308
3309    #[derive(Default)]
3310    struct MockMemory;
3311
3312    #[async_trait]
3313    impl Memory for MockMemory {
3314        fn name(&self) -> &str {
3315            "mock"
3316        }
3317
3318        async fn store(
3319            &self,
3320            _key: &str,
3321            _content: &str,
3322            _category: MemoryCategory,
3323            _session_id: Option<&str>,
3324        ) -> anyhow::Result<()> {
3325            Ok(())
3326        }
3327
3328        async fn recall(
3329            &self,
3330            _query: &str,
3331            _limit: usize,
3332            _session_id: Option<&str>,
3333            _since: Option<&str>,
3334            _until: Option<&str>,
3335        ) -> anyhow::Result<Vec<MemoryEntry>> {
3336            Ok(Vec::new())
3337        }
3338
3339        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3340            Ok(None)
3341        }
3342
3343        async fn list(
3344            &self,
3345            _category: Option<&MemoryCategory>,
3346            _session_id: Option<&str>,
3347        ) -> anyhow::Result<Vec<MemoryEntry>> {
3348            Ok(Vec::new())
3349        }
3350
3351        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3352            Ok(false)
3353        }
3354
3355        async fn count(&self) -> anyhow::Result<usize> {
3356            Ok(0)
3357        }
3358
3359        async fn health_check(&self) -> bool {
3360            true
3361        }
3362    }
3363
3364    #[derive(Default)]
3365    struct MockProvider {
3366        calls: AtomicUsize,
3367    }
3368
3369    #[async_trait]
3370    impl Provider for MockProvider {
3371        async fn chat_with_system(
3372            &self,
3373            _system_prompt: Option<&str>,
3374            _message: &str,
3375            _model: &str,
3376            _temperature: f64,
3377        ) -> anyhow::Result<String> {
3378            self.calls.fetch_add(1, Ordering::SeqCst);
3379            Ok("ok".into())
3380        }
3381    }
3382
3383    #[derive(Default)]
3384    struct TrackingMemory {
3385        keys: Mutex<Vec<String>>,
3386    }
3387
3388    #[async_trait]
3389    impl Memory for TrackingMemory {
3390        fn name(&self) -> &str {
3391            "tracking"
3392        }
3393
3394        async fn store(
3395            &self,
3396            key: &str,
3397            _content: &str,
3398            _category: MemoryCategory,
3399            _session_id: Option<&str>,
3400        ) -> anyhow::Result<()> {
3401            self.keys.lock().push(key.to_string());
3402            Ok(())
3403        }
3404
3405        async fn recall(
3406            &self,
3407            _query: &str,
3408            _limit: usize,
3409            _session_id: Option<&str>,
3410            _since: Option<&str>,
3411            _until: Option<&str>,
3412        ) -> anyhow::Result<Vec<MemoryEntry>> {
3413            Ok(Vec::new())
3414        }
3415
3416        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3417            Ok(None)
3418        }
3419
3420        async fn list(
3421            &self,
3422            _category: Option<&MemoryCategory>,
3423            _session_id: Option<&str>,
3424        ) -> anyhow::Result<Vec<MemoryEntry>> {
3425            Ok(Vec::new())
3426        }
3427
3428        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3429            Ok(false)
3430        }
3431
3432        async fn count(&self) -> anyhow::Result<usize> {
3433            let size = self.keys.lock().len();
3434            Ok(size)
3435        }
3436
3437        async fn health_check(&self) -> bool {
3438            true
3439        }
3440    }
3441
3442    fn test_connect_info() -> ConnectInfo<SocketAddr> {
3443        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3444    }
3445
3446    #[tokio::test]
3447    async fn webhook_idempotency_skips_duplicate_provider_calls() {
3448        let provider_impl = Arc::new(MockProvider::default());
3449        let provider: Arc<dyn Provider> = provider_impl.clone();
3450        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3451
3452        let state = AppState {
3453            config: Arc::new(Mutex::new(Config::default())),
3454            provider,
3455            model: "test-model".into(),
3456            temperature: 0.0,
3457            mem: memory,
3458            auto_save: false,
3459            webhook_secret_hash: None,
3460            pairing: Arc::new(PairingGuard::new(false, &[])),
3461            trust_forwarded_headers: false,
3462            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3463            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3464            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3465            whatsapp: None,
3466            whatsapp_app_secret: None,
3467            linq: None,
3468            linq_signing_secret: None,
3469            nextcloud_talk: None,
3470            nextcloud_talk_webhook_secret: None,
3471            wati: None,
3472            gmail_push: None,
3473            observer: Arc::new(crate::observability::NoopObserver),
3474            tools_registry: Arc::new(Vec::new()),
3475            cost_tracker: None,
3476            audit_logger: None,
3477            event_tx: tokio::sync::broadcast::channel(16).0,
3478            shutdown_tx: tokio::sync::watch::channel(false).0,
3479            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3480            path_prefix: String::new(),
3481            session_backend: None,
3482            session_queue: std::sync::Arc::new(
3483                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3484            ),
3485            device_registry: None,
3486            pending_pairings: None,
3487            canvas_store: CanvasStore::new(),
3488            mcp_registry: None,
3489            approval_registry: approval_registry::global(),
3490            mcp_local_url: None,
3491            #[cfg(feature = "webauthn")]
3492            webauthn: None,
3493        };
3494
3495        let mut headers = HeaderMap::new();
3496        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3497
3498        let body = Ok(Json(WebhookBody {
3499            message: "hello".into(),
3500        }));
3501        let first = handle_webhook(
3502            State(state.clone()),
3503            test_connect_info(),
3504            headers.clone(),
3505            body,
3506        )
3507        .await
3508        .into_response();
3509        assert_eq!(first.status(), StatusCode::OK);
3510
3511        let body = Ok(Json(WebhookBody {
3512            message: "hello".into(),
3513        }));
3514        let second = handle_webhook(State(state), test_connect_info(), headers, body)
3515            .await
3516            .into_response();
3517        assert_eq!(second.status(), StatusCode::OK);
3518
3519        let payload = second.into_body().collect().await.unwrap().to_bytes();
3520        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3521        assert_eq!(parsed["status"], "duplicate");
3522        assert_eq!(parsed["idempotent"], true);
3523        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3524    }
3525
3526    #[tokio::test]
3527    async fn webhook_autosave_stores_distinct_keys_per_request() {
3528        let provider_impl = Arc::new(MockProvider::default());
3529        let provider: Arc<dyn Provider> = provider_impl.clone();
3530
3531        let tracking_impl = Arc::new(TrackingMemory::default());
3532        let memory: Arc<dyn Memory> = tracking_impl.clone();
3533
3534        let state = AppState {
3535            config: Arc::new(Mutex::new(Config::default())),
3536            provider,
3537            model: "test-model".into(),
3538            temperature: 0.0,
3539            mem: memory,
3540            auto_save: true,
3541            webhook_secret_hash: None,
3542            pairing: Arc::new(PairingGuard::new(false, &[])),
3543            trust_forwarded_headers: false,
3544            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3545            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3546            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3547            whatsapp: None,
3548            whatsapp_app_secret: None,
3549            linq: None,
3550            linq_signing_secret: None,
3551            nextcloud_talk: None,
3552            nextcloud_talk_webhook_secret: None,
3553            wati: None,
3554            gmail_push: None,
3555            observer: Arc::new(crate::observability::NoopObserver),
3556            tools_registry: Arc::new(Vec::new()),
3557            cost_tracker: None,
3558            audit_logger: None,
3559            event_tx: tokio::sync::broadcast::channel(16).0,
3560            shutdown_tx: tokio::sync::watch::channel(false).0,
3561            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3562            path_prefix: String::new(),
3563            session_backend: None,
3564            session_queue: std::sync::Arc::new(
3565                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3566            ),
3567            device_registry: None,
3568            pending_pairings: None,
3569            canvas_store: CanvasStore::new(),
3570            mcp_registry: None,
3571            approval_registry: approval_registry::global(),
3572            mcp_local_url: None,
3573            #[cfg(feature = "webauthn")]
3574            webauthn: None,
3575        };
3576
3577        let headers = HeaderMap::new();
3578
3579        let body1 = Ok(Json(WebhookBody {
3580            message: "hello one".into(),
3581        }));
3582        let first = handle_webhook(
3583            State(state.clone()),
3584            test_connect_info(),
3585            headers.clone(),
3586            body1,
3587        )
3588        .await
3589        .into_response();
3590        assert_eq!(first.status(), StatusCode::OK);
3591
3592        let body2 = Ok(Json(WebhookBody {
3593            message: "hello two".into(),
3594        }));
3595        let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3596            .await
3597            .into_response();
3598        assert_eq!(second.status(), StatusCode::OK);
3599
3600        let keys = tracking_impl.keys.lock().clone();
3601        assert_eq!(keys.len(), 2);
3602        assert_ne!(keys[0], keys[1]);
3603        assert!(keys[0].starts_with("webhook_msg_"));
3604        assert!(keys[1].starts_with("webhook_msg_"));
3605        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3606    }
3607
3608    #[test]
3609    fn webhook_secret_hash_is_deterministic_and_nonempty() {
3610        let secret_a = generate_test_secret();
3611        let secret_b = generate_test_secret();
3612        let one = hash_webhook_secret(&secret_a);
3613        let two = hash_webhook_secret(&secret_a);
3614        let other = hash_webhook_secret(&secret_b);
3615
3616        assert_eq!(one, two);
3617        assert_ne!(one, other);
3618        assert_eq!(one.len(), 64);
3619    }
3620
3621    #[tokio::test]
3622    async fn webhook_secret_hash_rejects_missing_header() {
3623        let provider_impl = Arc::new(MockProvider::default());
3624        let provider: Arc<dyn Provider> = provider_impl.clone();
3625        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3626        let secret = generate_test_secret();
3627
3628        let state = AppState {
3629            config: Arc::new(Mutex::new(Config::default())),
3630            provider,
3631            model: "test-model".into(),
3632            temperature: 0.0,
3633            mem: memory,
3634            auto_save: false,
3635            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3636            pairing: Arc::new(PairingGuard::new(false, &[])),
3637            trust_forwarded_headers: false,
3638            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3639            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3640            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3641            whatsapp: None,
3642            whatsapp_app_secret: None,
3643            linq: None,
3644            linq_signing_secret: None,
3645            nextcloud_talk: None,
3646            nextcloud_talk_webhook_secret: None,
3647            wati: None,
3648            gmail_push: None,
3649            observer: Arc::new(crate::observability::NoopObserver),
3650            tools_registry: Arc::new(Vec::new()),
3651            cost_tracker: None,
3652            audit_logger: None,
3653            event_tx: tokio::sync::broadcast::channel(16).0,
3654            shutdown_tx: tokio::sync::watch::channel(false).0,
3655            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3656            path_prefix: String::new(),
3657            session_backend: None,
3658            session_queue: std::sync::Arc::new(
3659                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3660            ),
3661            device_registry: None,
3662            pending_pairings: None,
3663            canvas_store: CanvasStore::new(),
3664            mcp_registry: None,
3665            approval_registry: approval_registry::global(),
3666            mcp_local_url: None,
3667            #[cfg(feature = "webauthn")]
3668            webauthn: None,
3669        };
3670
3671        let response = handle_webhook(
3672            State(state),
3673            test_connect_info(),
3674            HeaderMap::new(),
3675            Ok(Json(WebhookBody {
3676                message: "hello".into(),
3677            })),
3678        )
3679        .await
3680        .into_response();
3681
3682        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3683        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3684    }
3685
3686    #[tokio::test]
3687    async fn webhook_secret_hash_rejects_invalid_header() {
3688        let provider_impl = Arc::new(MockProvider::default());
3689        let provider: Arc<dyn Provider> = provider_impl.clone();
3690        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3691        let valid_secret = generate_test_secret();
3692        let wrong_secret = generate_test_secret();
3693
3694        let state = AppState {
3695            config: Arc::new(Mutex::new(Config::default())),
3696            provider,
3697            model: "test-model".into(),
3698            temperature: 0.0,
3699            mem: memory,
3700            auto_save: false,
3701            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3702            pairing: Arc::new(PairingGuard::new(false, &[])),
3703            trust_forwarded_headers: false,
3704            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3705            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3706            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3707            whatsapp: None,
3708            whatsapp_app_secret: None,
3709            linq: None,
3710            linq_signing_secret: None,
3711            nextcloud_talk: None,
3712            nextcloud_talk_webhook_secret: None,
3713            wati: None,
3714            gmail_push: None,
3715            observer: Arc::new(crate::observability::NoopObserver),
3716            tools_registry: Arc::new(Vec::new()),
3717            cost_tracker: None,
3718            audit_logger: None,
3719            event_tx: tokio::sync::broadcast::channel(16).0,
3720            shutdown_tx: tokio::sync::watch::channel(false).0,
3721            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3722            path_prefix: String::new(),
3723            session_backend: None,
3724            session_queue: std::sync::Arc::new(
3725                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3726            ),
3727            device_registry: None,
3728            pending_pairings: None,
3729            canvas_store: CanvasStore::new(),
3730            mcp_registry: None,
3731            approval_registry: approval_registry::global(),
3732            mcp_local_url: None,
3733            #[cfg(feature = "webauthn")]
3734            webauthn: None,
3735        };
3736
3737        let mut headers = HeaderMap::new();
3738        headers.insert(
3739            "X-Webhook-Secret",
3740            HeaderValue::from_str(&wrong_secret).unwrap(),
3741        );
3742
3743        let response = handle_webhook(
3744            State(state),
3745            test_connect_info(),
3746            headers,
3747            Ok(Json(WebhookBody {
3748                message: "hello".into(),
3749            })),
3750        )
3751        .await
3752        .into_response();
3753
3754        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3755        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3756    }
3757
3758    #[tokio::test]
3759    async fn webhook_secret_hash_accepts_valid_header() {
3760        let provider_impl = Arc::new(MockProvider::default());
3761        let provider: Arc<dyn Provider> = provider_impl.clone();
3762        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3763        let secret = generate_test_secret();
3764
3765        let state = AppState {
3766            config: Arc::new(Mutex::new(Config::default())),
3767            provider,
3768            model: "test-model".into(),
3769            temperature: 0.0,
3770            mem: memory,
3771            auto_save: false,
3772            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3773            pairing: Arc::new(PairingGuard::new(false, &[])),
3774            trust_forwarded_headers: false,
3775            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3776            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3777            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3778            whatsapp: None,
3779            whatsapp_app_secret: None,
3780            linq: None,
3781            linq_signing_secret: None,
3782            nextcloud_talk: None,
3783            nextcloud_talk_webhook_secret: None,
3784            wati: None,
3785            gmail_push: None,
3786            observer: Arc::new(crate::observability::NoopObserver),
3787            tools_registry: Arc::new(Vec::new()),
3788            cost_tracker: None,
3789            audit_logger: None,
3790            event_tx: tokio::sync::broadcast::channel(16).0,
3791            shutdown_tx: tokio::sync::watch::channel(false).0,
3792            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3793            path_prefix: String::new(),
3794            session_backend: None,
3795            session_queue: std::sync::Arc::new(
3796                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3797            ),
3798            device_registry: None,
3799            pending_pairings: None,
3800            canvas_store: CanvasStore::new(),
3801            mcp_registry: None,
3802            approval_registry: approval_registry::global(),
3803            mcp_local_url: None,
3804            #[cfg(feature = "webauthn")]
3805            webauthn: None,
3806        };
3807
3808        let mut headers = HeaderMap::new();
3809        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3810
3811        let response = handle_webhook(
3812            State(state),
3813            test_connect_info(),
3814            headers,
3815            Ok(Json(WebhookBody {
3816                message: "hello".into(),
3817            })),
3818        )
3819        .await
3820        .into_response();
3821
3822        assert_eq!(response.status(), StatusCode::OK);
3823        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3824    }
3825
3826    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3827        use hmac::{Hmac, Mac};
3828        use sha2::Sha256;
3829
3830        let payload = format!("{random}{body}");
3831        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3832        mac.update(payload.as_bytes());
3833        hex::encode(mac.finalize().into_bytes())
3834    }
3835
3836    #[tokio::test]
3837    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3838        let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3839        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3840
3841        let state = AppState {
3842            config: Arc::new(Mutex::new(Config::default())),
3843            provider,
3844            model: "test-model".into(),
3845            temperature: 0.0,
3846            mem: memory,
3847            auto_save: false,
3848            webhook_secret_hash: None,
3849            pairing: Arc::new(PairingGuard::new(false, &[])),
3850            trust_forwarded_headers: false,
3851            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3852            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3853            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3854            whatsapp: None,
3855            whatsapp_app_secret: None,
3856            linq: None,
3857            linq_signing_secret: None,
3858            nextcloud_talk: None,
3859            nextcloud_talk_webhook_secret: None,
3860            wati: None,
3861            gmail_push: None,
3862            observer: Arc::new(crate::observability::NoopObserver),
3863            tools_registry: Arc::new(Vec::new()),
3864            cost_tracker: None,
3865            audit_logger: None,
3866            event_tx: tokio::sync::broadcast::channel(16).0,
3867            shutdown_tx: tokio::sync::watch::channel(false).0,
3868            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3869            path_prefix: String::new(),
3870            session_backend: None,
3871            session_queue: std::sync::Arc::new(
3872                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3873            ),
3874            device_registry: None,
3875            pending_pairings: None,
3876            canvas_store: CanvasStore::new(),
3877            mcp_registry: None,
3878            approval_registry: approval_registry::global(),
3879            mcp_local_url: None,
3880            #[cfg(feature = "webauthn")]
3881            webauthn: None,
3882        };
3883
3884        let response = Box::pin(handle_nextcloud_talk_webhook(
3885            State(state),
3886            HeaderMap::new(),
3887            Bytes::from_static(br#"{"type":"message"}"#),
3888        ))
3889        .await
3890        .into_response();
3891
3892        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3893    }
3894
3895    #[tokio::test]
3896    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3897        let provider_impl = Arc::new(MockProvider::default());
3898        let provider: Arc<dyn Provider> = provider_impl.clone();
3899        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3900
3901        let channel = Arc::new(NextcloudTalkChannel::new(
3902            "https://cloud.example.com".into(),
3903            "app-token".into(),
3904            String::new(),
3905            vec!["*".into()],
3906        ));
3907
3908        let secret = "nextcloud-test-secret";
3909        let random = "seed-value";
3910        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3911        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3912        let invalid_signature = "deadbeef";
3913
3914        let state = AppState {
3915            config: Arc::new(Mutex::new(Config::default())),
3916            provider,
3917            model: "test-model".into(),
3918            temperature: 0.0,
3919            mem: memory,
3920            auto_save: false,
3921            webhook_secret_hash: None,
3922            pairing: Arc::new(PairingGuard::new(false, &[])),
3923            trust_forwarded_headers: false,
3924            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3925            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3926            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3927            whatsapp: None,
3928            whatsapp_app_secret: None,
3929            linq: None,
3930            linq_signing_secret: None,
3931            nextcloud_talk: Some(channel),
3932            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3933            wati: None,
3934            gmail_push: None,
3935            observer: Arc::new(crate::observability::NoopObserver),
3936            tools_registry: Arc::new(Vec::new()),
3937            cost_tracker: None,
3938            audit_logger: None,
3939            event_tx: tokio::sync::broadcast::channel(16).0,
3940            shutdown_tx: tokio::sync::watch::channel(false).0,
3941            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3942            path_prefix: String::new(),
3943            session_backend: None,
3944            session_queue: std::sync::Arc::new(
3945                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3946            ),
3947            device_registry: None,
3948            pending_pairings: None,
3949            canvas_store: CanvasStore::new(),
3950            mcp_registry: None,
3951            approval_registry: approval_registry::global(),
3952            mcp_local_url: None,
3953            #[cfg(feature = "webauthn")]
3954            webauthn: None,
3955        };
3956
3957        let mut headers = HeaderMap::new();
3958        headers.insert(
3959            "X-Nextcloud-Talk-Random",
3960            HeaderValue::from_str(random).unwrap(),
3961        );
3962        headers.insert(
3963            "X-Nextcloud-Talk-Signature",
3964            HeaderValue::from_str(invalid_signature).unwrap(),
3965        );
3966
3967        let response = Box::pin(handle_nextcloud_talk_webhook(
3968            State(state),
3969            headers,
3970            Bytes::from(body),
3971        ))
3972        .await
3973        .into_response();
3974        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3975        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3976    }
3977
3978    // ══════════════════════════════════════════════════════════
3979    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
3980    // ══════════════════════════════════════════════════════════
3981
3982    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3983        use hmac::{Hmac, Mac};
3984        use sha2::Sha256;
3985
3986        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3987        mac.update(body);
3988        hex::encode(mac.finalize().into_bytes())
3989    }
3990
3991    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3992        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3993    }
3994
3995    #[test]
3996    fn whatsapp_signature_valid() {
3997        let app_secret = generate_test_secret();
3998        let body = b"test body content";
3999
4000        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4001
4002        assert!(verify_whatsapp_signature(
4003            &app_secret,
4004            body,
4005            &signature_header
4006        ));
4007    }
4008
4009    #[test]
4010    fn whatsapp_signature_invalid_wrong_secret() {
4011        let app_secret = generate_test_secret();
4012        let wrong_secret = generate_test_secret();
4013        let body = b"test body content";
4014
4015        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
4016
4017        assert!(!verify_whatsapp_signature(
4018            &app_secret,
4019            body,
4020            &signature_header
4021        ));
4022    }
4023
4024    #[test]
4025    fn whatsapp_signature_invalid_wrong_body() {
4026        let app_secret = generate_test_secret();
4027        let original_body = b"original body";
4028        let tampered_body = b"tampered body";
4029
4030        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
4031
4032        // Verify with tampered body should fail
4033        assert!(!verify_whatsapp_signature(
4034            &app_secret,
4035            tampered_body,
4036            &signature_header
4037        ));
4038    }
4039
4040    #[test]
4041    fn whatsapp_signature_missing_prefix() {
4042        let app_secret = generate_test_secret();
4043        let body = b"test body";
4044
4045        // Signature without "sha256=" prefix
4046        let signature_header = "abc123def456";
4047
4048        assert!(!verify_whatsapp_signature(
4049            &app_secret,
4050            body,
4051            signature_header
4052        ));
4053    }
4054
4055    #[test]
4056    fn whatsapp_signature_empty_header() {
4057        let app_secret = generate_test_secret();
4058        let body = b"test body";
4059
4060        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
4061    }
4062
4063    #[test]
4064    fn whatsapp_signature_invalid_hex() {
4065        let app_secret = generate_test_secret();
4066        let body = b"test body";
4067
4068        // Invalid hex characters
4069        let signature_header = "sha256=not_valid_hex_zzz";
4070
4071        assert!(!verify_whatsapp_signature(
4072            &app_secret,
4073            body,
4074            signature_header
4075        ));
4076    }
4077
4078    #[test]
4079    fn whatsapp_signature_empty_body() {
4080        let app_secret = generate_test_secret();
4081        let body = b"";
4082
4083        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4084
4085        assert!(verify_whatsapp_signature(
4086            &app_secret,
4087            body,
4088            &signature_header
4089        ));
4090    }
4091
4092    #[test]
4093    fn whatsapp_signature_unicode_body() {
4094        let app_secret = generate_test_secret();
4095        let body = "Hello 🦀 World".as_bytes();
4096
4097        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4098
4099        assert!(verify_whatsapp_signature(
4100            &app_secret,
4101            body,
4102            &signature_header
4103        ));
4104    }
4105
4106    #[test]
4107    fn whatsapp_signature_json_payload() {
4108        let app_secret = generate_test_secret();
4109        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
4110
4111        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4112
4113        assert!(verify_whatsapp_signature(
4114            &app_secret,
4115            body,
4116            &signature_header
4117        ));
4118    }
4119
4120    #[test]
4121    fn whatsapp_signature_case_sensitive_prefix() {
4122        let app_secret = generate_test_secret();
4123        let body = b"test body";
4124
4125        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4126
4127        // Wrong case prefix should fail
4128        let wrong_prefix = format!("SHA256={hex_sig}");
4129        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
4130
4131        // Correct prefix should pass
4132        let correct_prefix = format!("sha256={hex_sig}");
4133        assert!(verify_whatsapp_signature(
4134            &app_secret,
4135            body,
4136            &correct_prefix
4137        ));
4138    }
4139
4140    #[test]
4141    fn whatsapp_signature_truncated_hex() {
4142        let app_secret = generate_test_secret();
4143        let body = b"test body";
4144
4145        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4146        let truncated = &hex_sig[..32]; // Only half the signature
4147        let signature_header = format!("sha256={truncated}");
4148
4149        assert!(!verify_whatsapp_signature(
4150            &app_secret,
4151            body,
4152            &signature_header
4153        ));
4154    }
4155
4156    #[test]
4157    fn whatsapp_signature_extra_bytes() {
4158        let app_secret = generate_test_secret();
4159        let body = b"test body";
4160
4161        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4162        let extended = format!("{hex_sig}deadbeef");
4163        let signature_header = format!("sha256={extended}");
4164
4165        assert!(!verify_whatsapp_signature(
4166            &app_secret,
4167            body,
4168            &signature_header
4169        ));
4170    }
4171
4172    // ══════════════════════════════════════════════════════════
    // IdempotencyStore, Rate Limiter, and Localhost Guard Edge-Case Tests
4174    // ══════════════════════════════════════════════════════════
4175
4176    #[test]
4177    fn idempotency_store_allows_different_keys() {
4178        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
4179        assert!(store.record_if_new("key-a"));
4180        assert!(store.record_if_new("key-b"));
4181        assert!(store.record_if_new("key-c"));
4182        assert!(store.record_if_new("key-d"));
4183    }
4184
4185    #[test]
4186    fn idempotency_store_max_keys_clamped_to_one() {
4187        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
4188        assert!(store.record_if_new("only-key"));
4189        assert!(!store.record_if_new("only-key"));
4190    }
4191
4192    #[test]
4193    fn idempotency_store_rapid_duplicate_rejected() {
4194        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
4195        assert!(store.record_if_new("rapid"));
4196        assert!(!store.record_if_new("rapid"));
4197    }
4198
4199    #[test]
4200    fn idempotency_store_accepts_after_ttl_expires() {
4201        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
4202        assert!(store.record_if_new("ttl-key"));
4203        std::thread::sleep(Duration::from_millis(10));
4204        assert!(store.record_if_new("ttl-key"));
4205    }
4206
4207    #[test]
4208    fn idempotency_store_eviction_preserves_newest() {
4209        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
4210        assert!(store.record_if_new("old-key"));
4211        std::thread::sleep(Duration::from_millis(2));
4212        assert!(store.record_if_new("new-key"));
4213
4214        let keys = store.keys.lock();
4215        assert_eq!(keys.len(), 1);
4216        assert!(!keys.contains_key("old-key"));
4217        assert!(keys.contains_key("new-key"));
4218    }
4219
4220    #[test]
4221    fn rate_limiter_allows_after_window_expires() {
4222        let window = Duration::from_millis(50);
4223        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
4224        assert!(limiter.allow("ip-1"));
4225        assert!(limiter.allow("ip-1"));
4226        assert!(!limiter.allow("ip-1")); // blocked
4227
4228        // Wait for window to expire
4229        std::thread::sleep(Duration::from_millis(60));
4230
4231        // Should be allowed again
4232        assert!(limiter.allow("ip-1"));
4233    }
4234
4235    #[test]
4236    fn rate_limiter_independent_keys_tracked_separately() {
4237        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
4238        assert!(limiter.allow("ip-1"));
4239        assert!(limiter.allow("ip-1"));
4240        assert!(!limiter.allow("ip-1")); // ip-1 blocked
4241
4242        // ip-2 should still work
4243        assert!(limiter.allow("ip-2"));
4244        assert!(limiter.allow("ip-2"));
4245        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
4246    }
4247
4248    #[test]
4249    fn rate_limiter_exact_boundary_at_max_keys() {
4250        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
4251        assert!(limiter.allow("ip-1"));
4252        assert!(limiter.allow("ip-2"));
4253        assert!(limiter.allow("ip-3"));
4254        // At capacity now
4255        assert!(limiter.allow("ip-4")); // should evict ip-1
4256
4257        let guard = limiter.requests.lock();
4258        assert_eq!(guard.0.len(), 3);
4259        assert!(
4260            !guard.0.contains_key("ip-1"),
4261            "ip-1 should have been evicted"
4262        );
4263        assert!(guard.0.contains_key("ip-2"));
4264        assert!(guard.0.contains_key("ip-3"));
4265        assert!(guard.0.contains_key("ip-4"));
4266    }
4267
4268    #[test]
4269    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
4270        let limiter = GatewayRateLimiter::new(2, 3, 100);
4271
4272        // Exhaust pair limit
4273        assert!(limiter.allow_pair("ip-1"));
4274        assert!(limiter.allow_pair("ip-1"));
4275        assert!(!limiter.allow_pair("ip-1")); // pair blocked
4276
4277        // Webhook should still work
4278        assert!(limiter.allow_webhook("ip-1"));
4279        assert!(limiter.allow_webhook("ip-1"));
4280        assert!(limiter.allow_webhook("ip-1"));
4281        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
4282    }
4283
4284    #[test]
4285    fn rate_limiter_single_key_max_allows_one_request() {
4286        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
4287        assert!(limiter.allow("ip-1"));
4288        assert!(limiter.allow("ip-2")); // evicts ip-1
4289
4290        let guard = limiter.requests.lock();
4291        assert_eq!(guard.0.len(), 1);
4292        assert!(guard.0.contains_key("ip-2"));
4293        assert!(!guard.0.contains_key("ip-1"));
4294    }
4295
4296    #[test]
4297    fn rate_limiter_concurrent_access_safe() {
4298        use std::sync::Arc;
4299
4300        let limiter = Arc::new(SlidingWindowRateLimiter::new(
4301            1000,
4302            Duration::from_secs(60),
4303            1000,
4304        ));
4305        let mut handles = Vec::new();
4306
4307        for i in 0..10 {
4308            let limiter = limiter.clone();
4309            handles.push(std::thread::spawn(move || {
4310                for j in 0..100 {
4311                    limiter.allow(&format!("thread-{i}-req-{j}"));
4312                }
4313            }));
4314        }
4315
4316        for handle in handles {
4317            handle.join().unwrap();
4318        }
4319
4320        // Should not panic or deadlock
4321        let guard = limiter.requests.lock();
4322        assert!(guard.0.len() <= 1000, "should respect max_keys");
4323    }
4324
4325    #[test]
4326    fn idempotency_store_concurrent_access_safe() {
4327        use std::sync::Arc;
4328
4329        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
4330        let mut handles = Vec::new();
4331
4332        for i in 0..10 {
4333            let store = store.clone();
4334            handles.push(std::thread::spawn(move || {
4335                for j in 0..100 {
4336                    store.record_if_new(&format!("thread-{i}-key-{j}"));
4337                }
4338            }));
4339        }
4340
4341        for handle in handles {
4342            handle.join().unwrap();
4343        }
4344
4345        let keys = store.keys.lock();
4346        assert!(keys.len() <= 1000, "should respect max_keys");
4347    }
4348
4349    #[test]
4350    fn rate_limiter_rapid_burst_then_cooldown() {
4351        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
4352
4353        // Burst: use all 5 requests
4354        for _ in 0..5 {
4355            assert!(limiter.allow("burst-ip"));
4356        }
4357        assert!(!limiter.allow("burst-ip")); // 6th should fail
4358
4359        // Cooldown
4360        std::thread::sleep(Duration::from_millis(60));
4361
4362        // Should be allowed again
4363        assert!(limiter.allow("burst-ip"));
4364    }
4365
4366    #[test]
4367    fn require_localhost_accepts_ipv4_loopback() {
4368        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
4369        assert!(require_localhost(&peer).is_ok());
4370    }
4371
4372    #[test]
4373    fn require_localhost_accepts_ipv6_loopback() {
4374        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
4375        assert!(require_localhost(&peer).is_ok());
4376    }
4377
4378    #[test]
4379    fn require_localhost_rejects_non_loopback_ipv4() {
4380        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
4381        let err = require_localhost(&peer).unwrap_err();
4382        assert_eq!(err.0, StatusCode::FORBIDDEN);
4383    }
4384
4385    #[test]
4386    fn require_localhost_rejects_non_loopback_ipv6() {
4387        let peer = SocketAddr::from((
4388            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
4389            12345,
4390        ));
4391        let err = require_localhost(&peer).unwrap_err();
4392        assert_eq!(err.0, StatusCode::FORBIDDEN);
4393    }
4394}