Skip to main content

construct/gateway/
mod.rs

1//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts.
2//!
3//! This module replaces the raw TCP implementation with axum for:
4//! - Proper HTTP/1.1 parsing and compliance
5//! - Content-Length validation (handled by hyper)
6//! - Request body size limits (64KB max)
7//! - Request timeouts (30s) to prevent slow-loris attacks
8//! - Header sanitization (handled by axum/hyper)
9
10pub mod api;
11pub mod api_agents;
12pub mod api_clawhub;
13pub mod api_kumiho_proxy;
14pub mod api_mcp;
15pub mod api_memory_graph;
16pub mod api_pairing;
17#[cfg(feature = "plugins-wasm")]
18pub mod api_plugins;
19pub mod api_skills;
20pub mod api_teams;
21#[cfg(feature = "webauthn")]
22pub mod api_webauthn;
23pub mod api_workflows;
24pub mod approval_registry;
25pub mod auth_rate_limit;
26pub mod canvas;
27pub mod kumiho_client;
28pub mod mcp_discovery;
29pub mod nodes;
30pub mod session_queue;
31pub mod sse;
32pub mod static_files;
33pub mod terminal;
34pub mod tls;
35pub mod ws;
36pub mod ws_mcp_events;
37
38use crate::channels::{
39    Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
40    WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
41};
42use crate::config::Config;
43use crate::cost::CostTracker;
44use crate::memory::{self, Memory, MemoryCategory};
45use crate::providers::{self, ChatMessage, Provider};
46use crate::runtime;
47use crate::security::SecurityPolicy;
48use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
49use crate::tools;
50use crate::tools::canvas::CanvasStore;
51use crate::tools::traits::ToolSpec;
52use crate::util::truncate_with_ellipsis;
53use anyhow::{Context, Result};
54use axum::{
55    Router,
56    body::Bytes,
57    extract::{ConnectInfo, Query, State},
58    http::{HeaderMap, StatusCode, header},
59    response::{IntoResponse, Json},
60    routing::{delete, get, post, put},
61};
62use parking_lot::Mutex;
63use std::collections::HashMap;
64use std::net::{IpAddr, SocketAddr};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67use tower_http::limit::RequestBodyLimitLayer;
68use tower_http::timeout::TimeoutLayer;
69use uuid::Uuid;
70
71/// Maximum request body size (64KB) — prevents memory exhaustion
72pub const MAX_BODY_SIZE: usize = 65_536;
73/// Default request timeout (30s) — prevents slow-loris attacks.
74pub const REQUEST_TIMEOUT_SECS: u64 = 30;
75
76/// Read gateway request timeout from `CONSTRUCT_GATEWAY_TIMEOUT_SECS` env var
77/// at runtime, falling back to [`REQUEST_TIMEOUT_SECS`].
78///
79/// Agentic workloads with tool use (web search, MCP tools, sub-agent
80/// delegation) regularly exceed 30 seconds. This allows operators to
81/// increase the timeout without recompiling.
82pub fn gateway_request_timeout_secs() -> u64 {
83    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
84        .ok()
85        .and_then(|v| v.parse().ok())
86        .unwrap_or(REQUEST_TIMEOUT_SECS)
87}
88/// Sliding window used by gateway rate limiting.
89pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
90/// Fallback max distinct client keys tracked in gateway rate limiter.
91pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
92/// Fallback max distinct idempotency keys retained in gateway memory.
93pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
94
95fn webhook_memory_key() -> String {
96    format!("webhook_msg_{}", Uuid::new_v4())
97}
98
99fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
100    format!("whatsapp_{}_{}", msg.sender, msg.id)
101}
102
103fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
104    format!("linq_{}_{}", msg.sender, msg.id)
105}
106
107fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
108    format!("wati_{}_{}", msg.sender, msg.id)
109}
110
111fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
112    format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
113}
114
115fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
116    match &msg.thread_ts {
117        Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
118        None => format!("{channel}_{}", msg.sender),
119    }
120}
121
122fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
123    headers
124        .get("X-Session-Id")
125        .and_then(|v| v.to_str().ok())
126        .map(str::trim)
127        .filter(|value| !value.is_empty())
128        .map(str::to_owned)
129}
130
131fn hash_webhook_secret(value: &str) -> String {
132    use sha2::{Digest, Sha256};
133
134    let digest = Sha256::digest(value.as_bytes());
135    hex::encode(digest)
136}
137
138/// How often the rate limiter sweeps stale IP entries from its map.
139const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
140
141#[derive(Debug)]
142struct SlidingWindowRateLimiter {
143    limit_per_window: u32,
144    window: Duration,
145    max_keys: usize,
146    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
147}
148
149impl SlidingWindowRateLimiter {
150    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
151        Self {
152            limit_per_window,
153            window,
154            max_keys: max_keys.max(1),
155            requests: Mutex::new((HashMap::new(), Instant::now())),
156        }
157    }
158
159    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
160        requests.retain(|_, timestamps| {
161            timestamps.retain(|t| *t > cutoff);
162            !timestamps.is_empty()
163        });
164    }
165
166    fn allow(&self, key: &str) -> bool {
167        if self.limit_per_window == 0 {
168            return true;
169        }
170
171        let now = Instant::now();
172        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
173
174        let mut guard = self.requests.lock();
175        let (requests, last_sweep) = &mut *guard;
176
177        // Periodic sweep: remove keys with no recent requests
178        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
179            Self::prune_stale(requests, cutoff);
180            *last_sweep = now;
181        }
182
183        if !requests.contains_key(key) && requests.len() >= self.max_keys {
184            // Opportunistic stale cleanup before eviction under cardinality pressure.
185            Self::prune_stale(requests, cutoff);
186            *last_sweep = now;
187
188            if requests.len() >= self.max_keys {
189                let evict_key = requests
190                    .iter()
191                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
192                    .map(|(k, _)| k.clone());
193                if let Some(evict_key) = evict_key {
194                    requests.remove(&evict_key);
195                }
196            }
197        }
198
199        let entry = requests.entry(key.to_owned()).or_default();
200        entry.retain(|instant| *instant > cutoff);
201
202        if entry.len() >= self.limit_per_window as usize {
203            return false;
204        }
205
206        entry.push(now);
207        true
208    }
209}
210
211#[derive(Debug)]
212pub struct GatewayRateLimiter {
213    pair: SlidingWindowRateLimiter,
214    webhook: SlidingWindowRateLimiter,
215}
216
217impl GatewayRateLimiter {
218    fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
219        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
220        Self {
221            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
222            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
223        }
224    }
225
226    fn allow_pair(&self, key: &str) -> bool {
227        self.pair.allow(key)
228    }
229
230    fn allow_webhook(&self, key: &str) -> bool {
231        self.webhook.allow(key)
232    }
233}
234
235#[derive(Debug)]
236pub struct IdempotencyStore {
237    ttl: Duration,
238    max_keys: usize,
239    keys: Mutex<HashMap<String, Instant>>,
240}
241
242impl IdempotencyStore {
243    fn new(ttl: Duration, max_keys: usize) -> Self {
244        Self {
245            ttl,
246            max_keys: max_keys.max(1),
247            keys: Mutex::new(HashMap::new()),
248        }
249    }
250
251    /// Returns true if this key is new and is now recorded.
252    fn record_if_new(&self, key: &str) -> bool {
253        let now = Instant::now();
254        let mut keys = self.keys.lock();
255
256        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
257
258        if keys.contains_key(key) {
259            return false;
260        }
261
262        if keys.len() >= self.max_keys {
263            let evict_key = keys
264                .iter()
265                .min_by_key(|(_, seen_at)| *seen_at)
266                .map(|(k, _)| k.clone());
267            if let Some(evict_key) = evict_key {
268                keys.remove(&evict_key);
269            }
270        }
271
272        keys.insert(key.to_owned(), now);
273        true
274    }
275}
276
277fn parse_client_ip(value: &str) -> Option<IpAddr> {
278    let value = value.trim().trim_matches('"').trim();
279    if value.is_empty() {
280        return None;
281    }
282
283    if let Ok(ip) = value.parse::<IpAddr>() {
284        return Some(ip);
285    }
286
287    if let Ok(addr) = value.parse::<SocketAddr>() {
288        return Some(addr.ip());
289    }
290
291    let value = value.trim_matches(['[', ']']);
292    value.parse::<IpAddr>().ok()
293}
294
295fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
296    // Read the rightmost X-Forwarded-For hop. Proxies APPEND to XFF, so the
297    // leftmost value is supplied by the client (attacker-controlled) while the
298    // rightmost was written by the immediate upstream proxy we are trusting.
299    // Taking the leftmost would let any caller spoof an arbitrary source IP.
300    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
301        for candidate in xff.rsplit(',') {
302            if let Some(ip) = parse_client_ip(candidate) {
303                return Some(ip);
304            }
305        }
306    }
307
308    headers
309        .get("X-Real-IP")
310        .and_then(|v| v.to_str().ok())
311        .and_then(parse_client_ip)
312}
313
314pub(super) fn client_key_from_request(
315    peer_addr: Option<SocketAddr>,
316    headers: &HeaderMap,
317    trust_forwarded_headers: bool,
318) -> String {
319    if trust_forwarded_headers {
320        if let Some(ip) = forwarded_client_ip(headers) {
321            return ip.to_string();
322        }
323    }
324
325    peer_addr
326        .map(|addr| addr.ip().to_string())
327        .unwrap_or_else(|| "unknown".to_string())
328}
329
330fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
331    if configured == 0 {
332        fallback.max(1)
333    } else {
334        configured
335    }
336}
337
338/// Shared state for all axum handlers
339#[derive(Clone)]
340pub struct AppState {
341    pub config: Arc<Mutex<Config>>,
342    pub provider: Arc<dyn Provider>,
343    pub model: String,
344    pub temperature: f64,
345    pub mem: Arc<dyn Memory>,
346    pub auto_save: bool,
347    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
348    pub webhook_secret_hash: Option<Arc<str>>,
349    pub pairing: Arc<PairingGuard>,
350    pub trust_forwarded_headers: bool,
351    pub rate_limiter: Arc<GatewayRateLimiter>,
352    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
353    pub idempotency_store: Arc<IdempotencyStore>,
354    pub whatsapp: Option<Arc<WhatsAppChannel>>,
355    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
356    pub whatsapp_app_secret: Option<Arc<str>>,
357    pub linq: Option<Arc<LinqChannel>>,
358    /// Linq webhook signing secret for signature verification
359    pub linq_signing_secret: Option<Arc<str>>,
360    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
361    /// Nextcloud Talk webhook secret for signature verification
362    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
363    pub wati: Option<Arc<WatiChannel>>,
364    /// Gmail Pub/Sub push notification channel
365    pub gmail_push: Option<Arc<GmailPushChannel>>,
366    /// Observability backend for metrics scraping
367    pub observer: Arc<dyn crate::observability::Observer>,
368    /// Registered tool specs (for web dashboard tools page)
369    pub tools_registry: Arc<Vec<ToolSpec>>,
370    /// Cost tracker (optional, for web dashboard cost page)
371    pub cost_tracker: Option<Arc<CostTracker>>,
372    /// Audit logger (optional, for web dashboard audit viewer)
373    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
374    /// SSE broadcast channel for real-time events
375    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
376    /// Shutdown signal sender for graceful shutdown
377    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
378    /// Registry of dynamically connected nodes
379    pub node_registry: Arc<nodes::NodeRegistry>,
380    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
381    pub path_prefix: String,
382    /// Session backend for persisting gateway WS chat sessions
383    pub session_backend: Option<Arc<dyn SessionBackend>>,
384    /// Per-session actor queue for serializing concurrent turns
385    pub session_queue: Arc<session_queue::SessionActorQueue>,
386    /// Device registry for paired device management
387    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
388    /// Pending pairing request store
389    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
390    /// Shared canvas store for Live Canvas (A2UI) system
391    pub canvas_store: CanvasStore,
392    /// MCP registry for direct tool invocation from HTTP handlers (memory graph, etc.)
393    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
394    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
395    #[cfg(feature = "webauthn")]
396    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
397    /// Registry of pending human approval requests from workflow runs
398    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
399    /// Base URL (e.g. `http://127.0.0.1:60004`) of the in-process MCP server,
400    /// used by gateway reverse-proxy handlers in [`api_mcp`]. `None` if the
401    /// MCP server failed to bind — proxy handlers should then return 503.
402    /// Populated after MCP server bind in [`run_gateway`].
403    pub mcp_local_url: Option<Arc<str>>,
404}
405
406/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
407#[allow(clippy::too_many_lines)]
408pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
409    // ── Security: warn on public bind without tunnel or explicit opt-in ──
410    if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
411    {
412        tracing::warn!(
413            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
414             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
415             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
416             Docker/VM: if you are running inside a container or VM, this is expected."
417        );
418    }
419    let config_state = Arc::new(Mutex::new(config.clone()));
420
421    // ── Hooks ──────────────────────────────────────────────────────
422    let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
423        Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
424    } else {
425        None
426    };
427
428    let addr: SocketAddr = format!("{host}:{port}").parse()?;
429    let listener = tokio::net::TcpListener::bind(addr).await?;
430    let actual_port = listener.local_addr()?.port();
431    let display_addr = format!("{host}:{actual_port}");
432
433    let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
434        config.default_provider.as_deref().unwrap_or("openrouter"),
435        config.api_key.as_deref(),
436        config.api_url.as_deref(),
437        &config.reliability,
438        &providers::ProviderRuntimeOptions {
439            auth_profile_override: None,
440            provider_api_url: config.api_url.clone(),
441            construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
442            secrets_encrypt: config.secrets.encrypt,
443            reasoning_enabled: config.runtime.reasoning_enabled,
444            reasoning_effort: config.runtime.reasoning_effort.clone(),
445            provider_timeout_secs: Some(config.provider_timeout_secs),
446            extra_headers: config.extra_headers.clone(),
447            api_path: config.api_path.clone(),
448            provider_max_tokens: config.provider_max_tokens,
449        },
450    )?);
451    let model = config
452        .default_model
453        .clone()
454        .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
455    let temperature = config.default_temperature;
456    let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
457        &config.memory,
458        &config.embedding_routes,
459        Some(&config.storage.provider.config),
460        &config.workspace_dir,
461        config.api_key.as_deref(),
462    )?);
463    let runtime: Arc<dyn runtime::RuntimeAdapter> =
464        Arc::from(runtime::create_runtime(&config.runtime)?);
465    let security = Arc::new(SecurityPolicy::from_config(
466        &config.autonomy,
467        &config.workspace_dir,
468    ));
469
470    let (composio_key, composio_entity_id) = if config.composio.enabled {
471        (
472            config.composio.api_key.as_deref(),
473            Some(config.composio.entity_id.as_str()),
474        )
475    } else {
476        (None, None)
477    };
478
479    let canvas_store = tools::canvas::global_store();
480
481    let (
482        mut tools_registry_raw,
483        delegate_handle_gw,
484        _reaction_handle_gw,
485        _channel_map_handle,
486        _ask_user_handle_gw,
487        _escalate_handle_gw,
488    ) = tools::all_tools_with_runtime(
489        Arc::new(config.clone()),
490        &security,
491        runtime,
492        Arc::clone(&mem),
493        composio_key,
494        composio_entity_id,
495        &config.browser,
496        &config.http_request,
497        &config.web_fetch,
498        &config.workspace_dir,
499        &config.agents,
500        config.api_key.as_deref(),
501        &config,
502        Some(canvas_store.clone()),
503    );
504
505    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
506    // Without this, the `/api/tools` endpoint misses MCP tools.
507    // Inject operator + kumiho MCP server configs (same as agent/channels do)
508    // so the gateway can call operator tools directly from HTTP handlers.
509    let gateway_mcp_config = {
510        let mut c = config.clone();
511        c = crate::agent::kumiho::inject_kumiho(c, false);
512        c = crate::agent::operator::inject_operator(c, false);
513        c.mcp
514    };
515    let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
516    if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
517        tracing::info!(
518            "Gateway: initializing MCP client — {} server(s) configured",
519            gateway_mcp_config.servers.len()
520        );
521        match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
522            Ok(registry) => {
523                let registry = std::sync::Arc::new(registry);
524                mcp_registry_shared = Some(std::sync::Arc::clone(&registry));
525                if gateway_mcp_config.deferred_loading {
526                    let operator_prefix =
527                        format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
528                    let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
529                    let all_names = registry.tool_names();
530                    let mut eager_count = 0usize;
531
532                    for name in &all_names {
533                        if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
534                            if let Some(def) = registry.get_tool_def(name).await {
535                                let wrapper: std::sync::Arc<dyn tools::Tool> =
536                                    std::sync::Arc::new(tools::McpToolWrapper::new(
537                                        name.clone(),
538                                        def,
539                                        std::sync::Arc::clone(&registry),
540                                    ));
541                                if let Some(ref handle) = delegate_handle_gw {
542                                    handle.write().push(std::sync::Arc::clone(&wrapper));
543                                }
544                                tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
545                                eager_count += 1;
546                            }
547                        }
548                    }
549
550                    let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
551                        std::sync::Arc::clone(&registry),
552                        |name| {
553                            !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
554                        },
555                    )
556                    .await;
557                    tracing::info!(
558                        "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
559                        eager_count,
560                        deferred_set.len(),
561                        registry.server_count()
562                    );
563                    let activated =
564                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
565                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
566                        deferred_set,
567                        activated,
568                    )));
569                } else {
570                    let names = registry.tool_names();
571                    let mut registered = 0usize;
572                    for name in names {
573                        if let Some(def) = registry.get_tool_def(&name).await {
574                            let wrapper: std::sync::Arc<dyn tools::Tool> =
575                                std::sync::Arc::new(tools::McpToolWrapper::new(
576                                    name,
577                                    def,
578                                    std::sync::Arc::clone(&registry),
579                                ));
580                            if let Some(ref handle) = delegate_handle_gw {
581                                handle.write().push(std::sync::Arc::clone(&wrapper));
582                            }
583                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
584                            registered += 1;
585                        }
586                    }
587                    tracing::info!(
588                        "Gateway MCP: {} tool(s) registered from {} server(s)",
589                        registered,
590                        registry.server_count()
591                    );
592                }
593            }
594            Err(e) => {
595                tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
596            }
597        }
598    }
599
600    let tools_registry: Arc<Vec<ToolSpec>> =
601        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
602
603    // Cost tracker — process-global singleton so channels share the same instance
604    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
605
606    // Audit logger — optional, for dashboard audit viewer
607    let audit_logger = if config.security.audit.enabled {
608        match crate::security::audit::AuditLogger::new(
609            config.security.audit.clone(),
610            std::path::PathBuf::from(&config.workspace_dir),
611        ) {
612            Ok(logger) => Some(Arc::new(logger)),
613            Err(e) => {
614                tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
615                None
616            }
617        }
618    } else {
619        None
620    };
621
622    // SSE broadcast channel for real-time events
623    let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
624    // Extract webhook secret for authentication
625    let webhook_secret_hash: Option<Arc<str>> =
626        config.channels_config.webhook.as_ref().and_then(|webhook| {
627            webhook.secret.as_ref().and_then(|raw_secret| {
628                let trimmed_secret = raw_secret.trim();
629                (!trimmed_secret.is_empty())
630                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
631            })
632        });
633
634    // WhatsApp channel (if configured)
635    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
636        .channels_config
637        .whatsapp
638        .as_ref()
639        .filter(|wa| wa.is_cloud_config())
640        .map(|wa| {
641            Arc::new(WhatsAppChannel::new(
642                wa.access_token.clone().unwrap_or_default(),
643                wa.phone_number_id.clone().unwrap_or_default(),
644                wa.verify_token.clone().unwrap_or_default(),
645                wa.allowed_numbers.clone(),
646            ))
647        });
648
649    // WhatsApp app secret for webhook signature verification
650    // Priority: environment variable > config file
651    let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
652        .ok()
653        .and_then(|secret| {
654            let secret = secret.trim();
655            (!secret.is_empty()).then(|| secret.to_owned())
656        })
657        .or_else(|| {
658            config.channels_config.whatsapp.as_ref().and_then(|wa| {
659                wa.app_secret
660                    .as_deref()
661                    .map(str::trim)
662                    .filter(|secret| !secret.is_empty())
663                    .map(ToOwned::to_owned)
664            })
665        })
666        .map(Arc::from);
667
668    // Linq channel (if configured)
669    let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
670        Arc::new(LinqChannel::new(
671            lq.api_token.clone(),
672            lq.from_phone.clone(),
673            lq.allowed_senders.clone(),
674        ))
675    });
676
677    // Linq signing secret for webhook signature verification
678    // Priority: environment variable > config file
679    let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
680        .ok()
681        .and_then(|secret| {
682            let secret = secret.trim();
683            (!secret.is_empty()).then(|| secret.to_owned())
684        })
685        .or_else(|| {
686            config.channels_config.linq.as_ref().and_then(|lq| {
687                lq.signing_secret
688                    .as_deref()
689                    .map(str::trim)
690                    .filter(|secret| !secret.is_empty())
691                    .map(ToOwned::to_owned)
692            })
693        })
694        .map(Arc::from);
695
696    // WATI channel (if configured)
697    let wati_channel: Option<Arc<WatiChannel>> =
698        config.channels_config.wati.as_ref().map(|wati_cfg| {
699            Arc::new(
700                WatiChannel::new(
701                    wati_cfg.api_token.clone(),
702                    wati_cfg.api_url.clone(),
703                    wati_cfg.tenant_id.clone(),
704                    wati_cfg.allowed_numbers.clone(),
705                )
706                .with_transcription(config.transcription.clone()),
707            )
708        });
709
710    // Nextcloud Talk channel (if configured)
711    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
712        config.channels_config.nextcloud_talk.as_ref().map(|nc| {
713            Arc::new(NextcloudTalkChannel::new(
714                nc.base_url.clone(),
715                nc.app_token.clone(),
716                nc.bot_name.clone().unwrap_or_default(),
717                nc.allowed_users.clone(),
718            ))
719        });
720
721    // Nextcloud Talk webhook secret for signature verification
722    // Priority: environment variable > config file
723    let nextcloud_talk_webhook_secret: Option<Arc<str>> =
724        std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
725            .ok()
726            .and_then(|secret| {
727                let secret = secret.trim();
728                (!secret.is_empty()).then(|| secret.to_owned())
729            })
730            .or_else(|| {
731                config
732                    .channels_config
733                    .nextcloud_talk
734                    .as_ref()
735                    .and_then(|nc| {
736                        nc.webhook_secret
737                            .as_deref()
738                            .map(str::trim)
739                            .filter(|secret| !secret.is_empty())
740                            .map(ToOwned::to_owned)
741                    })
742            })
743            .map(Arc::from);
744
745    // Gmail Push channel (if configured and enabled)
746    let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
747        .channels_config
748        .gmail_push
749        .as_ref()
750        .filter(|gp| gp.enabled)
751        .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
752
753    // ── Session persistence for WS chat ─────────────────────
754    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
755        match SqliteSessionBackend::new(&config.workspace_dir) {
756            Ok(b) => {
757                tracing::info!("Gateway session persistence enabled (SQLite)");
758                if config.gateway.session_ttl_hours > 0 {
759                    if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
760                        if cleaned > 0 {
761                            tracing::info!("Cleaned up {cleaned} stale gateway sessions");
762                        }
763                    }
764                }
765                Some(Arc::new(b))
766            }
767            Err(e) => {
768                tracing::warn!("Session persistence disabled: {e}");
769                None
770            }
771        }
772    } else {
773        None
774    };
775
776    // ── Pairing guard ──────────────────────────────────────
777    let pairing = Arc::new(PairingGuard::new(
778        config.gateway.require_pairing,
779        &config.gateway.paired_tokens,
780    ));
781    let rate_limit_max_keys = normalize_max_keys(
782        config.gateway.rate_limit_max_keys,
783        RATE_LIMIT_MAX_KEYS_DEFAULT,
784    );
785    let rate_limiter = Arc::new(GatewayRateLimiter::new(
786        config.gateway.pair_rate_limit_per_minute,
787        config.gateway.webhook_rate_limit_per_minute,
788        rate_limit_max_keys,
789    ));
790    let idempotency_max_keys = normalize_max_keys(
791        config.gateway.idempotency_max_keys,
792        IDEMPOTENCY_MAX_KEYS_DEFAULT,
793    );
794    let idempotency_store = Arc::new(IdempotencyStore::new(
795        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
796        idempotency_max_keys,
797    ));
798
799    // Resolve optional path prefix for reverse-proxy deployments.
800    let path_prefix: Option<&str> = config
801        .gateway
802        .path_prefix
803        .as_deref()
804        .filter(|p| !p.is_empty());
805
806    // ── Tunnel ────────────────────────────────────────────────
807    let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
808    let mut tunnel_url: Option<String> = None;
809
810    if let Some(ref tun) = tunnel {
811        println!("🔗 Starting {} tunnel...", tun.name());
812        match tun.start(host, actual_port).await {
813            Ok(url) => {
814                println!("🌐 Tunnel active: {url}");
815                tunnel_url = Some(url);
816            }
817            Err(e) => {
818                println!("⚠️  Tunnel failed to start: {e}");
819                println!("   Falling back to local-only mode.");
820            }
821        }
822    }
823
824    let pfx = path_prefix.unwrap_or("");
825    println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
826    if let Some(ref url) = tunnel_url {
827        println!("  🌐 Public URL: {url}");
828    }
829    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
830    if let Some(code) = pairing.pairing_code() {
831        println!();
832        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
833        println!("     ┌──────────────┐");
834        println!("     │  {code}  │");
835        println!("     └──────────────┘");
836        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
837    } else if pairing.require_pairing() {
838        println!("  🔒 Pairing: ACTIVE (bearer token required)");
839        println!("     To pair a new device: construct gateway get-paircode --new");
840        println!();
841    } else {
842        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
843        println!();
844    }
845    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
846    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
847    if whatsapp_channel.is_some() {
848        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
849        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
850    }
851    if linq_channel.is_some() {
852        println!("  POST {pfx}/linq      — Linq message webhook (iMessage/RCS/SMS)");
853    }
854    if wati_channel.is_some() {
855        println!("  GET  {pfx}/wati      — WATI webhook verification");
856        println!("  POST {pfx}/wati      — WATI message webhook");
857    }
858    if nextcloud_talk_channel.is_some() {
859        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
860    }
861    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
862    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
863    if config.nodes.enabled {
864        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
865    }
866    println!("  GET  {pfx}/health    — health check");
867    println!("  GET  {pfx}/metrics   — Prometheus metrics");
868    println!("  Press Ctrl+C to stop.\n");
869
870    crate::health::mark_component_ok("gateway");
871
872    // Fire gateway start hook
873    if let Some(ref hooks) = hooks {
874        hooks.fire_gateway_start(host, actual_port).await;
875    }
876
877    // Wrap observer with broadcast capability for SSE
878    let broadcast_observer: Arc<dyn crate::observability::Observer> =
879        Arc::new(sse::BroadcastObserver::new(
880            crate::observability::create_observer(&config.observability),
881            event_tx.clone(),
882        ));
883
884    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
885
886    // Node registry for dynamic node discovery
887    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
888
889    // Device registry and pairing store (only when pairing is required)
890    let device_registry = if config.gateway.require_pairing {
891        Some(Arc::new(api_pairing::DeviceRegistry::new(
892            &config.workspace_dir,
893        )?))
894    } else {
895        None
896    };
897    let pending_pairings = if config.gateway.require_pairing {
898        Some(Arc::new(api_pairing::PairingStore::new(
899            config.gateway.pairing_dashboard.max_pending_codes,
900        )))
901    } else {
902        None
903    };
904
905    // ── Build RuntimeHandles for the in-process MCP server ──────────────
906    //
907    // The MCP task needs live handles so the tools the standalone binary
908    // used to skip (workspace, channel-bound tools, session_store, discord
909    // memory, delegate, …) register properly. We clone individual Arcs out
910    // of the pieces we already built above and feed them in.
911    //
912    // Tools that keep shared handles (reaction/ask_user/escalate/poll) are
913    // easiest to wire by handing their already-built, channel-map-aware
914    // instances over via `pre_built_tools`. We do this by building a second
915    // parallel Arc-vec via `all_tools_with_runtime` — the per-tool state is
916    // stateless for registry purposes, and delegate/swarm get their own
917    // per-call depth counters anyway.
918    let mcp_workspace_dir = config.workspace_dir.clone();
919    let mcp_config_snapshot = config.clone();
920    let mcp_runtime_handles = {
921        let mut rh = crate::mcp_server::RuntimeHandles::empty();
922
923        // Workspace manager (if workspace isolation is on).
924        if config.workspace.enabled {
925            let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
926                let home = directories::UserDirs::new()
927                    .map(|u| u.home_dir().to_path_buf())
928                    .unwrap_or_else(|| std::path::PathBuf::from("."));
929                home.join(&config.workspace.workspaces_dir[2..])
930            } else {
931                std::path::PathBuf::from(&config.workspace.workspaces_dir)
932            };
933            let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
934            rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
935        }
936
937        // Session backend from channels::session_store::SessionStore.
938        if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
939        {
940            let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
941                Arc::new(store);
942            rh.session_store = Some(backend);
943        }
944
945        // Discord memory (formerly for discord_search) — removed with the
946        // SQLite backend; persistent cross-session memory should use Kumiho MCP.
947
948        // Agent config + provider options for delegate / swarm.
949        if !config.agents.is_empty() {
950            rh.agent_config = Some(Arc::new(config.agents.clone()));
951            rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
952            rh.provider_runtime_options =
953                Some(Arc::new(crate::providers::ProviderRuntimeOptions {
954                    auth_profile_override: None,
955                    provider_api_url: config.api_url.clone(),
956                    construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
957                    secrets_encrypt: config.secrets.encrypt,
958                    reasoning_enabled: config.runtime.reasoning_enabled,
959                    reasoning_effort: config.runtime.reasoning_effort.clone(),
960                    provider_timeout_secs: Some(config.provider_timeout_secs),
961                    extra_headers: config.extra_headers.clone(),
962                    api_path: config.api_path.clone(),
963                    provider_max_tokens: config.provider_max_tokens,
964                }));
965        }
966
967        // Build a second parallel tool set and hand it to MCP via
968        // `pre_built_tools` so every channel-aware / delegate tool lands in
969        // the MCP registry fully wired (even though its ChannelMap handle is
970        // distinct from the gateway's — the channel supervisor's populate
971        // path still routes through the gateway's tool instances; the MCP
972        // versions advertise and execute independently).
973        let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
974            Arc::new(config.clone()),
975            &security,
976            Arc::new(crate::runtime::NativeRuntime::new()),
977            Arc::clone(&mem),
978            composio_key,
979            composio_entity_id,
980            &config.browser,
981            &config.http_request,
982            &config.web_fetch,
983            &config.workspace_dir,
984            &config.agents,
985            config.api_key.as_deref(),
986            &config,
987            Some(canvas_store.clone()),
988        );
989        // Convert Box<dyn Tool> to Arc<dyn Tool>.
990        let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
991            .into_iter()
992            .map(|b| Arc::<dyn tools::Tool>::from(b))
993            .collect();
994        rh.pre_built_tools = Some(mcp_tool_arcs);
995
996        rh
997    };
998
999    let mut state = AppState {
1000        config: config_state,
1001        provider,
1002        model,
1003        temperature,
1004        mem,
1005        auto_save: config.memory.auto_save,
1006        webhook_secret_hash,
1007        pairing,
1008        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1009        rate_limiter,
1010        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1011        idempotency_store,
1012        whatsapp: whatsapp_channel,
1013        whatsapp_app_secret,
1014        linq: linq_channel,
1015        linq_signing_secret,
1016        nextcloud_talk: nextcloud_talk_channel,
1017        nextcloud_talk_webhook_secret,
1018        wati: wati_channel,
1019        gmail_push: gmail_push_channel,
1020        observer: broadcast_observer,
1021        tools_registry,
1022        cost_tracker,
1023        audit_logger,
1024        event_tx,
1025        shutdown_tx,
1026        node_registry,
1027        session_backend,
1028        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1029        device_registry,
1030        pending_pairings,
1031        path_prefix: path_prefix.unwrap_or("").to_string(),
1032        canvas_store,
1033        mcp_registry: mcp_registry_shared,
1034        approval_registry: approval_registry::global(),
1035        // Populated after the in-process MCP server binds (see below).
1036        mcp_local_url: None,
1037        #[cfg(feature = "webauthn")]
1038        webauthn: if config.security.webauthn.enabled {
1039            let secret_store = Arc::new(crate::security::SecretStore::new(
1040                &config.workspace_dir,
1041                true,
1042            ));
1043            let wa_config = crate::security::webauthn::WebAuthnConfig {
1044                enabled: true,
1045                rp_id: config.security.webauthn.rp_id.clone(),
1046                rp_origin: config.security.webauthn.rp_origin.clone(),
1047                rp_name: config.security.webauthn.rp_name.clone(),
1048            };
1049            Some(Arc::new(api_webauthn::WebAuthnState {
1050                manager: crate::security::webauthn::WebAuthnManager::new(
1051                    wa_config,
1052                    secret_store,
1053                    &config.workspace_dir,
1054                ),
1055                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1056                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1057            }))
1058        } else {
1059            None
1060        },
1061    };
1062
1063    // ── Spawn the in-process MCP server ─────────────────────────────────
1064    //
1065    // Binds 127.0.0.1:0 (ephemeral, as external clients expect), writes
1066    // ~/.construct/mcp.json with {url,pid,started_at} atomically, and tears
1067    // down on gateway shutdown. If the bind fails we log and move on —
1068    // this is non-fatal for the gateway itself.
1069    let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1070    let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1071        let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1072            &mcp_workspace_dir,
1073            &mcp_config_snapshot,
1074            &mcp_runtime_handles,
1075        );
1076        for (name, reason) in &mcp_skipped {
1077            tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1078        }
1079        tracing::info!(
1080            "mcp-server: advertising {} tools to external MCP clients",
1081            mcp_state.tools.len()
1082        );
1083
1084        match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1085            Ok(handle) => {
1086                let url = format!("http://{}/mcp", handle.addr);
1087                if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1088                    tracing::warn!("mcp-server: failed to write discovery file: {e}");
1089                } else {
1090                    tracing::info!(
1091                        "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1092                    );
1093                }
1094
1095                // Expose the bound base URL (no `/mcp` suffix) to the gateway
1096                // reverse-proxy handlers in `api_mcp`.
1097                state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1098
1099                let mut watch = mcp_shutdown_watch;
1100                Some(tokio::spawn(async move {
1101                    // Wait for the gateway's shutdown signal.
1102                    loop {
1103                        if *watch.borrow() {
1104                            break;
1105                        }
1106                        if watch.changed().await.is_err() {
1107                            break;
1108                        }
1109                    }
1110                    let _ = handle.shutdown.send(());
1111                    let _ = handle.joined.await;
1112                    crate::mcp_server::server::cleanup_discovery_file();
1113                    tracing::info!("mcp-server: stopped");
1114                }))
1115            }
1116            Err(e) => {
1117                tracing::error!("mcp-server: failed to bind: {e}");
1118                None
1119            }
1120        }
1121    };
1122
1123    // Config PUT needs larger body limit (1MB)
1124    let config_put_router = Router::new()
1125        .route("/api/config", put(api::handle_api_config_put))
1126        .layer(RequestBodyLimitLayer::new(1_048_576));
1127
1128    // Memory graph needs longer timeout (aggregates many Kumiho calls via operator).
1129    // Built as a separate router that gets merged AFTER the global timeout layer.
1130    let memory_graph_router = Router::new()
1131        .route(
1132            "/api/memory/graph",
1133            get(api_memory_graph::handle_memory_graph),
1134        )
1135        .with_state(state.clone())
1136        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1137        .layer(TimeoutLayer::with_status_code(
1138            StatusCode::REQUEST_TIMEOUT,
1139            Duration::from_secs(60),
1140        ));
1141
1142    // Build router with middleware
1143    let inner = Router::new()
1144        // ── Admin routes (for CLI management) ──
1145        .route("/admin/shutdown", post(handle_admin_shutdown))
1146        .route("/admin/paircode", get(handle_admin_paircode))
1147        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1148        // ── Existing routes ──
1149        .route("/health", get(handle_health))
1150        .route("/metrics", get(handle_metrics))
1151        .route("/pair", post(handle_pair))
1152        .route("/pair/code", get(handle_pair_code))
1153        .route("/webhook", post(handle_webhook))
1154        .route("/whatsapp", get(handle_whatsapp_verify))
1155        .route("/whatsapp", post(handle_whatsapp_message))
1156        .route("/linq", post(handle_linq_webhook))
1157        .route("/wati", get(handle_wati_verify))
1158        .route("/wati", post(handle_wati_webhook))
1159        .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1160        .route("/webhook/gmail", post(handle_gmail_push_webhook))
1161        // ── Claude Code runner hooks ──
1162        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1163        // ── Web Dashboard API routes ──
1164        .route("/api/status", get(api::handle_api_status))
1165        .route("/api/config", get(api::handle_api_config_get))
1166        .route("/api/tools", get(api::handle_api_tools))
1167        .route("/api/cron", get(api::handle_api_cron_list))
1168        .route("/api/cron", post(api::handle_api_cron_add))
1169        .route(
1170            "/api/cron/settings",
1171            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1172        )
1173        .route(
1174            "/api/cron/{id}",
1175            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1176        )
1177        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1178        .route("/api/integrations", get(api::handle_api_integrations))
1179        .route(
1180            "/api/integrations/settings",
1181            get(api::handle_api_integrations_settings),
1182        )
1183        .route(
1184            "/api/doctor",
1185            get(api::handle_api_doctor).post(api::handle_api_doctor),
1186        )
1187        // Old /api/memory CRUD removed — use Kumiho via /api/memory/graph instead.
1188        .route("/api/cost", get(api::handle_api_cost))
1189        .route("/api/audit", get(api::handle_api_audit))
1190        .route("/api/audit/verify", get(api::handle_api_audit_verify))
1191        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1192        .route("/api/health", get(api::handle_api_health))
1193        .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1194        .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1195        // ── MCP HTTP reverse-proxy (browser stays same-origin) ──
1196        .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1197        .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1198        .route(
1199            "/api/mcp/session/{session_id}/events",
1200            get(api_mcp::handle_api_mcp_session_events),
1201        )
1202        .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1203        .route("/api/nodes", get(api::handle_api_nodes))
1204        .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1205        .route("/api/sessions", get(api::handle_api_sessions_list))
1206        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1207        .route(
1208            "/api/sessions/{id}/messages",
1209            get(api::handle_api_session_messages),
1210        )
1211        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1212        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1213        // ── Channel detail API ──
1214        .route("/api/channels", get(api::handle_api_channels))
1215        .route("/api/channel-events", post(api::handle_api_channel_events))
1216        // ── Agent management API (proxied to Kumiho FastAPI) ──
1217        .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1218        .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1219        .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1220        // ── Skill management API (proxied to Kumiho FastAPI) ──
1221        .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1222        .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1223        .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1224        // ── Team management API (proxied to Kumiho FastAPI) ──
1225        .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1226        .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1227        .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1228        // ── Workflow management API (proxied to Kumiho FastAPI) ──
1229        .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1230        .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1231        .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1232        .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1233        .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1234        .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1235        .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1236        .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1237        .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1238        .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1239        .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1240        // ── ClawHub marketplace API ──
1241        .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1242        .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1243        .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1244        .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1245        // NOTE: Memory graph route is merged separately with its own 60s timeout
1246        // ── Generic Kumiho API proxy (for Asset Browser, Memory Auditor, etc.) ──
1247        .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1248        // ── Pairing + Device management API ──
1249        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1250        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1251        .route("/api/devices", get(api_pairing::list_devices))
1252        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1253        .route(
1254            "/api/devices/{id}/token/rotate",
1255            post(api_pairing::rotate_token),
1256        )
1257        // ── Live Canvas (A2UI) routes ──
1258        .route("/api/canvas", get(canvas::handle_canvas_list))
1259        .route(
1260            "/api/canvas/{id}",
1261            get(canvas::handle_canvas_get)
1262                .post(canvas::handle_canvas_post)
1263                .delete(canvas::handle_canvas_clear),
1264        )
1265        .route(
1266            "/api/canvas/{id}/history",
1267            get(canvas::handle_canvas_history),
1268        );
1269
1270    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1271    #[cfg(feature = "webauthn")]
1272    let inner = inner
1273        .route(
1274            "/api/webauthn/register/start",
1275            post(api_webauthn::handle_register_start),
1276        )
1277        .route(
1278            "/api/webauthn/register/finish",
1279            post(api_webauthn::handle_register_finish),
1280        )
1281        .route(
1282            "/api/webauthn/auth/start",
1283            post(api_webauthn::handle_auth_start),
1284        )
1285        .route(
1286            "/api/webauthn/auth/finish",
1287            post(api_webauthn::handle_auth_finish),
1288        )
1289        .route(
1290            "/api/webauthn/credentials",
1291            get(api_webauthn::handle_list_credentials),
1292        )
1293        .route(
1294            "/api/webauthn/credentials/{id}",
1295            delete(api_webauthn::handle_delete_credential),
1296        );
1297
1298    // ── Plugin management API (requires plugins-wasm feature) ──
1299    #[cfg(feature = "plugins-wasm")]
1300    let inner = inner.route(
1301        "/api/plugins",
1302        get(api_plugins::plugin_routes::list_plugins),
1303    );
1304
1305    let inner = inner
1306        // ── SSE event stream ──
1307        .route("/api/events", get(sse::handle_sse_events))
1308        .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1309        // ── WebSocket agent chat ──
1310        .route("/ws/chat", get(ws::handle_ws_chat))
1311        // ── WebSocket canvas updates ──
1312        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1313        // ── WebSocket node discovery ──
1314        .route("/ws/nodes", get(nodes::handle_ws_nodes))
1315        // ── WebSocket PTY terminal ──
1316        .route("/ws/terminal", get(terminal::handle_ws_terminal))
1317        // ── WebSocket proxy onto the in-process MCP server's session events ──
1318        .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1319        // ── Static assets (web dashboard) ──
1320        .route("/_app/{*path}", get(static_files::handle_static))
1321        // ── Config PUT with larger body limit ──
1322        .merge(config_put_router)
1323        // ── SPA fallback: non-API GET requests serve index.html ──
1324        .fallback(get(static_files::handle_spa_fallback))
1325        .with_state(state)
1326        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1327        .layer(TimeoutLayer::with_status_code(
1328            StatusCode::REQUEST_TIMEOUT,
1329            Duration::from_secs(gateway_request_timeout_secs()),
1330        ));
1331
1332    // Merge memory graph router (has its own 60s timeout, outside the global 30s)
1333    let inner = inner.merge(memory_graph_router);
1334
1335    // Nest under path prefix when configured (axum strips prefix before routing).
1336    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1337    // with a trailing slash, so we add a fallback redirect for that case.
1338    let app = if let Some(prefix) = path_prefix {
1339        let redirect_target = prefix.to_string();
1340        Router::new().nest(prefix, inner).route(
1341            &format!("{prefix}/"),
1342            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1343        )
1344    } else {
1345        inner
1346    };
1347
1348    // ── TLS / mTLS setup ───────────────────────────────────────────
1349    let tls_acceptor = match &config.gateway.tls {
1350        Some(tls_cfg) if tls_cfg.enabled => {
1351            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1352            if has_mtls {
1353                tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1354            } else {
1355                tracing::info!("TLS enabled (no client certificate requirement)");
1356            }
1357            Some(tls::build_tls_acceptor(tls_cfg)?)
1358        }
1359        _ => None,
1360    };
1361
1362    if let Some(tls_acceptor) = tls_acceptor {
1363        // Manual TLS accept loop — serves each connection via hyper.
1364        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1365        let mut app = app;
1366
1367        let mut shutdown_signal = shutdown_rx;
1368        loop {
1369            tokio::select! {
1370                conn = listener.accept() => {
1371                    let (tcp_stream, remote_addr) = conn?;
1372                    let tls_acceptor = tls_acceptor.clone();
1373                    let svc = tower::MakeService::<
1374                        SocketAddr,
1375                        hyper::Request<hyper::body::Incoming>,
1376                    >::make_service(&mut app, remote_addr)
1377                    .await
1378                    .expect("infallible make_service");
1379
1380                    tokio::spawn(async move {
1381                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1382                            Ok(s) => s,
1383                            Err(e) => {
1384                                tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1385                                return;
1386                            }
1387                        };
1388                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1389                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1390                            let mut svc = svc.clone();
1391                            async move {
1392                                tower::Service::call(&mut svc, req).await
1393                            }
1394                        });
1395                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1396                            hyper_util::rt::TokioExecutor::new(),
1397                        )
1398                        .serve_connection(io, hyper_svc)
1399                        .await
1400                        {
1401                            tracing::debug!("connection error from {remote_addr}: {e}");
1402                        }
1403                    });
1404                }
1405                _ = shutdown_signal.changed() => {
1406                    tracing::info!("🦀 Construct Gateway shutting down...");
1407                    break;
1408                }
1409            }
1410        }
1411    } else {
1412        // Plain TCP — use axum's built-in serve.
1413        axum::serve(
1414            listener,
1415            app.into_make_service_with_connect_info::<SocketAddr>(),
1416        )
1417        .with_graceful_shutdown(async move {
1418            let _ = shutdown_rx.changed().await;
1419            tracing::info!("🦀 Construct Gateway shutting down...");
1420        })
1421        .await?;
1422    }
1423
1424    // Wait for the in-process MCP task to finish its own graceful shutdown.
1425    // It watches the same `shutdown_tx` we just flipped above.
1426    if let Some(task) = mcp_task {
1427        let _ = task.await;
1428    }
1429
1430    Ok(())
1431}
1432
1433// ══════════════════════════════════════════════════════════════════════════════
1434// AXUM HANDLERS
1435// ══════════════════════════════════════════════════════════════════════════════
1436
1437/// GET /health — always public (no secrets leaked)
1438async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1439    let body = serde_json::json!({
1440        "status": "ok",
1441        "paired": state.pairing.is_paired(),
1442        "require_pairing": state.pairing.require_pairing(),
1443        "runtime": crate::health::snapshot_json(),
1444    });
1445    Json(body)
1446}
1447
1448/// Prometheus content type for text exposition format.
1449const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1450
1451fn prometheus_disabled_hint() -> String {
1452    String::from(
1453        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n",
1454    )
1455}
1456
1457#[cfg(feature = "observability-prometheus")]
1458fn prometheus_observer_from_state(
1459    observer: &dyn crate::observability::Observer,
1460) -> Option<&crate::observability::PrometheusObserver> {
1461    observer
1462        .as_any()
1463        .downcast_ref::<crate::observability::PrometheusObserver>()
1464        .or_else(|| {
1465            observer
1466                .as_any()
1467                .downcast_ref::<sse::BroadcastObserver>()
1468                .and_then(|broadcast| {
1469                    broadcast
1470                        .inner()
1471                        .as_any()
1472                        .downcast_ref::<crate::observability::PrometheusObserver>()
1473                })
1474        })
1475}
1476
1477/// GET /metrics — Prometheus text exposition format
1478async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1479    let body = {
1480        #[cfg(feature = "observability-prometheus")]
1481        {
1482            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1483                prom.encode()
1484            } else {
1485                prometheus_disabled_hint()
1486            }
1487        }
1488        #[cfg(not(feature = "observability-prometheus"))]
1489        {
1490            let _ = &state;
1491            prometheus_disabled_hint()
1492        }
1493    };
1494
1495    (
1496        StatusCode::OK,
1497        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1498        body,
1499    )
1500}
1501
1502/// POST /pair — exchange one-time code for bearer token
1503#[axum::debug_handler]
1504async fn handle_pair(
1505    State(state): State<AppState>,
1506    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1507    headers: HeaderMap,
1508) -> impl IntoResponse {
1509    let rate_key =
1510        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1511    let peer_is_loopback = peer_addr.ip().is_loopback();
1512    if !state.rate_limiter.allow_pair(&rate_key) {
1513        tracing::warn!("/pair rate limit exceeded");
1514        let err = serde_json::json!({
1515            "error": "Too many pairing requests. Please retry later.",
1516            "retry_after": RATE_LIMIT_WINDOW_SECS,
1517        });
1518        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1519    }
1520
1521    // ── Auth rate limiting (brute-force protection) ──
1522    if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key, peer_is_loopback) {
1523        tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1524        let err = serde_json::json!({
1525            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1526            "retry_after": e.retry_after_secs,
1527        });
1528        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1529    }
1530
1531    let code = headers
1532        .get("X-Pairing-Code")
1533        .and_then(|v| v.to_str().ok())
1534        .unwrap_or("");
1535
1536    match state.pairing.try_pair(code, &rate_key).await {
1537        Ok(Some(token)) => {
1538            tracing::info!("🔐 New client paired successfully");
1539            if let Some(ref logger) = state.audit_logger {
1540                let _ =
1541                    logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1542            }
1543            if let Err(err) =
1544                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1545            {
1546                tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1547                let body = serde_json::json!({
1548                    "paired": true,
1549                    "persisted": false,
1550                    "token": token,
1551                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1552                });
1553                return (StatusCode::OK, Json(body));
1554            }
1555
1556            let body = serde_json::json!({
1557                "paired": true,
1558                "persisted": true,
1559                "token": token,
1560                "message": "Save this token — use it as Authorization: Bearer <token>"
1561            });
1562            (StatusCode::OK, Json(body))
1563        }
1564        Ok(None) => {
1565            state.auth_limiter.record_attempt(&rate_key, peer_is_loopback);
1566            tracing::warn!("🔐 Pairing attempt with invalid code");
1567            if let Some(ref logger) = state.audit_logger {
1568                let _ = logger
1569                    .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1570            }
1571            let err = serde_json::json!({"error": "Invalid pairing code"});
1572            (StatusCode::FORBIDDEN, Json(err))
1573        }
1574        Err(lockout_secs) => {
1575            tracing::warn!(
1576                "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1577            );
1578            if let Some(ref logger) = state.audit_logger {
1579                let _ = logger.log_auth_failure(
1580                    "gateway",
1581                    &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1582                );
1583            }
1584            let err = serde_json::json!({
1585                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1586                "retry_after": lockout_secs
1587            });
1588            (StatusCode::TOO_MANY_REQUESTS, Json(err))
1589        }
1590    }
1591}
1592
1593async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1594    let paired_tokens = pairing.tokens();
1595    // This is needed because parking_lot's guard is not Send so we clone the inner
1596    // this should be removed once async mutexes are used everywhere
1597    let mut updated_cfg = { config.lock().clone() };
1598    updated_cfg.gateway.paired_tokens = paired_tokens;
1599    updated_cfg
1600        .save()
1601        .await
1602        .context("Failed to persist paired tokens to config.toml")?;
1603
1604    // Keep shared runtime config in sync with persisted tokens.
1605    *config.lock() = updated_cfg;
1606    Ok(())
1607}
1608
1609/// Simple chat for webhook endpoint (no tools, for backward compatibility and testing).
1610async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1611    let user_messages = vec![ChatMessage::user(message)];
1612
1613    // Keep webhook/gateway prompts aligned with channel behavior by injecting
1614    // workspace-aware system context before model invocation.
1615    let system_prompt = {
1616        let config_guard = state.config.lock();
1617        crate::channels::build_system_prompt(
1618            &config_guard.workspace_dir,
1619            &state.model,
1620            &[], // tools - empty for simple chat
1621            &[], // skills
1622            Some(&config_guard.identity),
1623            None, // bootstrap_max_chars - use default
1624        )
1625    };
1626
1627    let mut messages = Vec::with_capacity(1 + user_messages.len());
1628    messages.push(ChatMessage::system(system_prompt));
1629    messages.extend(user_messages);
1630
1631    let multimodal_config = state.config.lock().multimodal.clone();
1632    let prepared =
1633        crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1634
1635    state
1636        .provider
1637        .chat_with_history(&prepared.messages, &state.model, state.temperature)
1638        .await
1639}
1640
1641/// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk).
1642async fn run_gateway_chat_with_tools(
1643    state: &AppState,
1644    message: &str,
1645    session_id: Option<&str>,
1646) -> anyhow::Result<String> {
1647    let config = state.config.lock().clone();
1648    Box::pin(crate::agent::process_message(config, message, session_id)).await
1649}
1650
1651/// Webhook request body
1652#[derive(serde::Deserialize)]
1653pub struct WebhookBody {
1654    pub message: String,
1655}
1656
1657/// POST /webhook — main webhook endpoint
1658async fn handle_webhook(
1659    State(state): State<AppState>,
1660    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1661    headers: HeaderMap,
1662    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
1663) -> impl IntoResponse {
1664    let rate_key =
1665        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1666    let peer_is_loopback = peer_addr.ip().is_loopback();
1667    if !state.rate_limiter.allow_webhook(&rate_key) {
1668        tracing::warn!("/webhook rate limit exceeded");
1669        let err = serde_json::json!({
1670            "error": "Too many webhook requests. Please retry later.",
1671            "retry_after": RATE_LIMIT_WINDOW_SECS,
1672        });
1673        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1674    }
1675
1676    // ── Bearer token auth (pairing) with auth rate limiting ──
1677    if state.pairing.require_pairing() {
1678        if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key, peer_is_loopback) {
1679            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
1680            let err = serde_json::json!({
1681                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1682                "retry_after": e.retry_after_secs,
1683            });
1684            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1685        }
1686        let auth = headers
1687            .get(header::AUTHORIZATION)
1688            .and_then(|v| v.to_str().ok())
1689            .unwrap_or("");
1690        let token = auth.strip_prefix("Bearer ").unwrap_or("");
1691        if !state.pairing.is_authenticated(token) {
1692            state.auth_limiter.record_attempt(&rate_key, peer_is_loopback);
1693            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
1694            let err = serde_json::json!({
1695                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
1696            });
1697            return (StatusCode::UNAUTHORIZED, Json(err));
1698        }
1699    }
1700
1701    // ── Webhook secret auth (optional, additional layer) ──
1702    if let Some(ref secret_hash) = state.webhook_secret_hash {
1703        let header_hash = headers
1704            .get("X-Webhook-Secret")
1705            .and_then(|v| v.to_str().ok())
1706            .map(str::trim)
1707            .filter(|value| !value.is_empty())
1708            .map(hash_webhook_secret);
1709        match header_hash {
1710            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
1711            _ => {
1712                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
1713                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
1714                return (StatusCode::UNAUTHORIZED, Json(err));
1715            }
1716        }
1717    }
1718
1719    // ── Parse body ──
1720    let Json(webhook_body) = match body {
1721        Ok(b) => b,
1722        Err(e) => {
1723            tracing::warn!("Webhook JSON parse error: {e}");
1724            let err = serde_json::json!({
1725                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
1726            });
1727            return (StatusCode::BAD_REQUEST, Json(err));
1728        }
1729    };
1730
1731    // ── Idempotency (optional) ──
1732    if let Some(idempotency_key) = headers
1733        .get("X-Idempotency-Key")
1734        .and_then(|v| v.to_str().ok())
1735        .map(str::trim)
1736        .filter(|value| !value.is_empty())
1737    {
1738        if !state.idempotency_store.record_if_new(idempotency_key) {
1739            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
1740            let body = serde_json::json!({
1741                "status": "duplicate",
1742                "idempotent": true,
1743                "message": "Request already processed for this idempotency key"
1744            });
1745            return (StatusCode::OK, Json(body));
1746        }
1747    }
1748
1749    let message = &webhook_body.message;
1750    let session_id = webhook_session_id(&headers);
1751
1752    if state.auto_save && !memory::should_skip_autosave_content(message) {
1753        let key = webhook_memory_key();
1754        let _ = state
1755            .mem
1756            .store(
1757                &key,
1758                message,
1759                MemoryCategory::Conversation,
1760                session_id.as_deref(),
1761            )
1762            .await;
1763    }
1764
1765    let provider_label = state
1766        .config
1767        .lock()
1768        .default_provider
1769        .clone()
1770        .unwrap_or_else(|| "unknown".to_string());
1771    let model_label = state.model.clone();
1772    let started_at = Instant::now();
1773
1774    state
1775        .observer
1776        .record_event(&crate::observability::ObserverEvent::AgentStart {
1777            provider: provider_label.clone(),
1778            model: model_label.clone(),
1779        });
1780    state
1781        .observer
1782        .record_event(&crate::observability::ObserverEvent::LlmRequest {
1783            provider: provider_label.clone(),
1784            model: model_label.clone(),
1785            messages_count: 1,
1786        });
1787
1788    match run_gateway_chat_simple(&state, message).await {
1789        Ok(response) => {
1790            let duration = started_at.elapsed();
1791            state
1792                .observer
1793                .record_event(&crate::observability::ObserverEvent::LlmResponse {
1794                    provider: provider_label.clone(),
1795                    model: model_label.clone(),
1796                    duration,
1797                    success: true,
1798                    error_message: None,
1799                    input_tokens: None,
1800                    output_tokens: None,
1801                });
1802            state.observer.record_metric(
1803                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
1804            );
1805            state
1806                .observer
1807                .record_event(&crate::observability::ObserverEvent::AgentEnd {
1808                    provider: provider_label,
1809                    model: model_label,
1810                    duration,
1811                    tokens_used: None,
1812                    cost_usd: None,
1813                });
1814
1815            let body = serde_json::json!({"response": response, "model": state.model});
1816            (StatusCode::OK, Json(body))
1817        }
1818        Err(e) => {
1819            let duration = started_at.elapsed();
1820            let sanitized = providers::sanitize_api_error(&e.to_string());
1821
1822            state
1823                .observer
1824                .record_event(&crate::observability::ObserverEvent::LlmResponse {
1825                    provider: provider_label.clone(),
1826                    model: model_label.clone(),
1827                    duration,
1828                    success: false,
1829                    error_message: Some(sanitized.clone()),
1830                    input_tokens: None,
1831                    output_tokens: None,
1832                });
1833            state.observer.record_metric(
1834                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
1835            );
1836            state
1837                .observer
1838                .record_event(&crate::observability::ObserverEvent::Error {
1839                    component: "gateway".to_string(),
1840                    message: sanitized.clone(),
1841                });
1842            state
1843                .observer
1844                .record_event(&crate::observability::ObserverEvent::AgentEnd {
1845                    provider: provider_label,
1846                    model: model_label,
1847                    duration,
1848                    tokens_used: None,
1849                    cost_usd: None,
1850                });
1851
1852            tracing::error!("Webhook provider error: {}", sanitized);
1853            let err = serde_json::json!({"error": "LLM request failed"});
1854            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
1855        }
1856    }
1857}
1858
1859/// `WhatsApp` verification query params
1860#[derive(serde::Deserialize)]
1861pub struct WhatsAppVerifyQuery {
1862    #[serde(rename = "hub.mode")]
1863    pub mode: Option<String>,
1864    #[serde(rename = "hub.verify_token")]
1865    pub verify_token: Option<String>,
1866    #[serde(rename = "hub.challenge")]
1867    pub challenge: Option<String>,
1868}
1869
1870/// GET /whatsapp — Meta webhook verification
1871async fn handle_whatsapp_verify(
1872    State(state): State<AppState>,
1873    Query(params): Query<WhatsAppVerifyQuery>,
1874) -> impl IntoResponse {
1875    let Some(ref wa) = state.whatsapp else {
1876        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
1877    };
1878
1879    // Verify the token matches (constant-time comparison to prevent timing attacks)
1880    let token_matches = params
1881        .verify_token
1882        .as_deref()
1883        .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
1884    if params.mode.as_deref() == Some("subscribe") && token_matches {
1885        if let Some(ch) = params.challenge {
1886            tracing::info!("WhatsApp webhook verified successfully");
1887            return (StatusCode::OK, ch);
1888        }
1889        return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
1890    }
1891
1892    tracing::warn!("WhatsApp webhook verification failed — token mismatch");
1893    (StatusCode::FORBIDDEN, "Forbidden".to_string())
1894}
1895
1896/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
1897/// Returns true if the signature is valid, false otherwise.
1898/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
1899pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
1900    use hmac::{Hmac, Mac};
1901    use sha2::Sha256;
1902
1903    // Signature format: "sha256=<hex_signature>"
1904    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
1905        return false;
1906    };
1907
1908    // Decode hex signature
1909    let Ok(expected) = hex::decode(hex_sig) else {
1910        return false;
1911    };
1912
1913    // Compute HMAC-SHA256
1914    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
1915        return false;
1916    };
1917    mac.update(body);
1918
1919    // Constant-time comparison
1920    mac.verify_slice(&expected).is_ok()
1921}
1922
1923/// POST /whatsapp — incoming message webhook
1924async fn handle_whatsapp_message(
1925    State(state): State<AppState>,
1926    headers: HeaderMap,
1927    body: Bytes,
1928) -> impl IntoResponse {
1929    let Some(ref wa) = state.whatsapp else {
1930        return (
1931            StatusCode::NOT_FOUND,
1932            Json(serde_json::json!({"error": "WhatsApp not configured"})),
1933        );
1934    };
1935
1936    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
1937    if let Some(ref app_secret) = state.whatsapp_app_secret {
1938        let signature = headers
1939            .get("X-Hub-Signature-256")
1940            .and_then(|v| v.to_str().ok())
1941            .unwrap_or("");
1942
1943        if !verify_whatsapp_signature(app_secret, &body, signature) {
1944            tracing::warn!(
1945                "WhatsApp webhook signature verification failed (signature: {})",
1946                if signature.is_empty() {
1947                    "missing"
1948                } else {
1949                    "invalid"
1950                }
1951            );
1952            return (
1953                StatusCode::UNAUTHORIZED,
1954                Json(serde_json::json!({"error": "Invalid signature"})),
1955            );
1956        }
1957    }
1958
1959    // Parse JSON body
1960    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
1961        return (
1962            StatusCode::BAD_REQUEST,
1963            Json(serde_json::json!({"error": "Invalid JSON payload"})),
1964        );
1965    };
1966
1967    // Parse messages from the webhook payload
1968    let messages = wa.parse_webhook_payload(&payload);
1969
1970    if messages.is_empty() {
1971        // Acknowledge the webhook even if no messages (could be status updates)
1972        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
1973    }
1974
1975    // Process each message
1976    for msg in &messages {
1977        tracing::info!(
1978            "WhatsApp message from {}: {}",
1979            msg.sender,
1980            truncate_with_ellipsis(&msg.content, 50)
1981        );
1982        let session_id = sender_session_id("whatsapp", msg);
1983
1984        // Auto-save to memory
1985        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
1986            let key = whatsapp_memory_key(msg);
1987            let _ = state
1988                .mem
1989                .store(
1990                    &key,
1991                    &msg.content,
1992                    MemoryCategory::Conversation,
1993                    Some(&session_id),
1994                )
1995                .await;
1996        }
1997
1998        match Box::pin(run_gateway_chat_with_tools(
1999            &state,
2000            &msg.content,
2001            Some(&session_id),
2002        ))
2003        .await
2004        {
2005            Ok(response) => {
2006                // Send reply via WhatsApp
2007                if let Err(e) = wa
2008                    .send(&SendMessage::new(response, &msg.reply_target))
2009                    .await
2010                {
2011                    tracing::error!("Failed to send WhatsApp reply: {e}");
2012                }
2013            }
2014            Err(e) => {
2015                tracing::error!("LLM error for WhatsApp message: {e:#}");
2016                let _ = wa
2017                    .send(&SendMessage::new(
2018                        "Sorry, I couldn't process your message right now.",
2019                        &msg.reply_target,
2020                    ))
2021                    .await;
2022            }
2023        }
2024    }
2025
2026    // Acknowledge the webhook
2027    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2028}
2029
2030/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
2031async fn handle_linq_webhook(
2032    State(state): State<AppState>,
2033    headers: HeaderMap,
2034    body: Bytes,
2035) -> impl IntoResponse {
2036    let Some(ref linq) = state.linq else {
2037        return (
2038            StatusCode::NOT_FOUND,
2039            Json(serde_json::json!({"error": "Linq not configured"})),
2040        );
2041    };
2042
2043    let body_str = String::from_utf8_lossy(&body);
2044
2045    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
2046    if let Some(ref signing_secret) = state.linq_signing_secret {
2047        let timestamp = headers
2048            .get("X-Webhook-Timestamp")
2049            .and_then(|v| v.to_str().ok())
2050            .unwrap_or("");
2051
2052        let signature = headers
2053            .get("X-Webhook-Signature")
2054            .and_then(|v| v.to_str().ok())
2055            .unwrap_or("");
2056
2057        if !crate::channels::linq::verify_linq_signature(
2058            signing_secret,
2059            &body_str,
2060            timestamp,
2061            signature,
2062        ) {
2063            tracing::warn!(
2064                "Linq webhook signature verification failed (signature: {})",
2065                if signature.is_empty() {
2066                    "missing"
2067                } else {
2068                    "invalid"
2069                }
2070            );
2071            return (
2072                StatusCode::UNAUTHORIZED,
2073                Json(serde_json::json!({"error": "Invalid signature"})),
2074            );
2075        }
2076    }
2077
2078    // Parse JSON body
2079    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2080        return (
2081            StatusCode::BAD_REQUEST,
2082            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2083        );
2084    };
2085
2086    // Parse messages from the webhook payload
2087    let messages = linq.parse_webhook_payload(&payload);
2088
2089    if messages.is_empty() {
2090        // Acknowledge the webhook even if no messages (could be status/delivery events)
2091        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2092    }
2093
2094    // Process each message
2095    for msg in &messages {
2096        tracing::info!(
2097            "Linq message from {}: {}",
2098            msg.sender,
2099            truncate_with_ellipsis(&msg.content, 50)
2100        );
2101        let session_id = sender_session_id("linq", msg);
2102
2103        // Auto-save to memory
2104        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2105            let key = linq_memory_key(msg);
2106            let _ = state
2107                .mem
2108                .store(
2109                    &key,
2110                    &msg.content,
2111                    MemoryCategory::Conversation,
2112                    Some(&session_id),
2113                )
2114                .await;
2115        }
2116
2117        // Call the LLM
2118        match Box::pin(run_gateway_chat_with_tools(
2119            &state,
2120            &msg.content,
2121            Some(&session_id),
2122        ))
2123        .await
2124        {
2125            Ok(response) => {
2126                // Send reply via Linq
2127                if let Err(e) = linq
2128                    .send(&SendMessage::new(response, &msg.reply_target))
2129                    .await
2130                {
2131                    tracing::error!("Failed to send Linq reply: {e}");
2132                }
2133            }
2134            Err(e) => {
2135                tracing::error!("LLM error for Linq message: {e:#}");
2136                let _ = linq
2137                    .send(&SendMessage::new(
2138                        "Sorry, I couldn't process your message right now.",
2139                        &msg.reply_target,
2140                    ))
2141                    .await;
2142            }
2143        }
2144    }
2145
2146    // Acknowledge the webhook
2147    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2148}
2149
2150/// GET /wati — WATI webhook verification (echoes hub.challenge)
2151async fn handle_wati_verify(
2152    State(state): State<AppState>,
2153    Query(params): Query<WatiVerifyQuery>,
2154) -> impl IntoResponse {
2155    if state.wati.is_none() {
2156        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2157    }
2158
2159    // WATI may use Meta-style webhook verification; echo the challenge
2160    if let Some(challenge) = params.challenge {
2161        tracing::info!("WATI webhook verified successfully");
2162        return (StatusCode::OK, challenge);
2163    }
2164
2165    (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2166}
2167
2168#[derive(Debug, serde::Deserialize)]
2169pub struct WatiVerifyQuery {
2170    #[serde(rename = "hub.challenge")]
2171    pub challenge: Option<String>,
2172}
2173
2174/// POST /wati — incoming WATI WhatsApp message webhook
2175async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2176    let Some(ref wati) = state.wati else {
2177        return (
2178            StatusCode::NOT_FOUND,
2179            Json(serde_json::json!({"error": "WATI not configured"})),
2180        );
2181    };
2182
2183    // Parse JSON body
2184    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2185        return (
2186            StatusCode::BAD_REQUEST,
2187            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2188        );
2189    };
2190
2191    // Detect audio before the synchronous parse
2192    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2193
2194    let messages = if matches!(msg_type, "audio" | "voice") {
2195        // Build a synthetic ChannelMessage from the audio transcript
2196        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2197            wati.parse_audio_as_message(&payload, transcript)
2198        } else {
2199            vec![]
2200        }
2201    } else {
2202        wati.parse_webhook_payload(&payload)
2203    };
2204
2205    if messages.is_empty() {
2206        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2207    }
2208
2209    // Process each message
2210    for msg in &messages {
2211        tracing::info!(
2212            "WATI message from {}: {}",
2213            msg.sender,
2214            truncate_with_ellipsis(&msg.content, 50)
2215        );
2216        let session_id = sender_session_id("wati", msg);
2217
2218        // Auto-save to memory
2219        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2220            let key = wati_memory_key(msg);
2221            let _ = state
2222                .mem
2223                .store(
2224                    &key,
2225                    &msg.content,
2226                    MemoryCategory::Conversation,
2227                    Some(&session_id),
2228                )
2229                .await;
2230        }
2231
2232        // Call the LLM
2233        match Box::pin(run_gateway_chat_with_tools(
2234            &state,
2235            &msg.content,
2236            Some(&session_id),
2237        ))
2238        .await
2239        {
2240            Ok(response) => {
2241                // Send reply via WATI
2242                if let Err(e) = wati
2243                    .send(&SendMessage::new(response, &msg.reply_target))
2244                    .await
2245                {
2246                    tracing::error!("Failed to send WATI reply: {e}");
2247                }
2248            }
2249            Err(e) => {
2250                tracing::error!("LLM error for WATI message: {e:#}");
2251                let _ = wati
2252                    .send(&SendMessage::new(
2253                        "Sorry, I couldn't process your message right now.",
2254                        &msg.reply_target,
2255                    ))
2256                    .await;
2257            }
2258        }
2259    }
2260
2261    // Acknowledge the webhook
2262    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2263}
2264
2265/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
2266async fn handle_nextcloud_talk_webhook(
2267    State(state): State<AppState>,
2268    headers: HeaderMap,
2269    body: Bytes,
2270) -> impl IntoResponse {
2271    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2272        return (
2273            StatusCode::NOT_FOUND,
2274            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2275        );
2276    };
2277
2278    let body_str = String::from_utf8_lossy(&body);
2279
2280    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
2281    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2282        let random = headers
2283            .get("X-Nextcloud-Talk-Random")
2284            .and_then(|v| v.to_str().ok())
2285            .unwrap_or("");
2286
2287        let signature = headers
2288            .get("X-Nextcloud-Talk-Signature")
2289            .and_then(|v| v.to_str().ok())
2290            .unwrap_or("");
2291
2292        if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2293            webhook_secret,
2294            random,
2295            &body_str,
2296            signature,
2297        ) {
2298            tracing::warn!(
2299                "Nextcloud Talk webhook signature verification failed (signature: {})",
2300                if signature.is_empty() {
2301                    "missing"
2302                } else {
2303                    "invalid"
2304                }
2305            );
2306            return (
2307                StatusCode::UNAUTHORIZED,
2308                Json(serde_json::json!({"error": "Invalid signature"})),
2309            );
2310        }
2311    }
2312
2313    // Parse JSON body
2314    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2315        return (
2316            StatusCode::BAD_REQUEST,
2317            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2318        );
2319    };
2320
2321    // Parse messages from webhook payload
2322    let messages = nextcloud_talk.parse_webhook_payload(&payload);
2323    if messages.is_empty() {
2324        // Acknowledge webhook even if payload does not contain actionable user messages.
2325        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2326    }
2327
2328    for msg in &messages {
2329        tracing::info!(
2330            "Nextcloud Talk message from {}: {}",
2331            msg.sender,
2332            truncate_with_ellipsis(&msg.content, 50)
2333        );
2334        let session_id = sender_session_id("nextcloud_talk", msg);
2335
2336        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2337            let key = nextcloud_talk_memory_key(msg);
2338            let _ = state
2339                .mem
2340                .store(
2341                    &key,
2342                    &msg.content,
2343                    MemoryCategory::Conversation,
2344                    Some(&session_id),
2345                )
2346                .await;
2347        }
2348
2349        match Box::pin(run_gateway_chat_with_tools(
2350            &state,
2351            &msg.content,
2352            Some(&session_id),
2353        ))
2354        .await
2355        {
2356            Ok(response) => {
2357                if let Err(e) = nextcloud_talk
2358                    .send(&SendMessage::new(response, &msg.reply_target))
2359                    .await
2360                {
2361                    tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2362                }
2363            }
2364            Err(e) => {
2365                tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2366                let _ = nextcloud_talk
2367                    .send(&SendMessage::new(
2368                        "Sorry, I couldn't process your message right now.",
2369                        &msg.reply_target,
2370                    ))
2371                    .await;
2372            }
2373        }
2374    }
2375
2376    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2377}
2378
2379/// Maximum request body size for the Gmail webhook endpoint (1 MB).
2380/// Google Pub/Sub messages are typically under 10 KB.
2381const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2382
2383/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
2384async fn handle_gmail_push_webhook(
2385    State(state): State<AppState>,
2386    headers: HeaderMap,
2387    body: Bytes,
2388) -> impl IntoResponse {
2389    let Some(ref gmail_push) = state.gmail_push else {
2390        return (
2391            StatusCode::NOT_FOUND,
2392            Json(serde_json::json!({"error": "Gmail push not configured"})),
2393        );
2394    };
2395
2396    // Enforce body size limit.
2397    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2398        return (
2399            StatusCode::PAYLOAD_TOO_LARGE,
2400            Json(serde_json::json!({"error": "Request body too large"})),
2401        );
2402    }
2403
2404    // Authenticate the webhook request using a shared secret.
2405    let secret = gmail_push.resolve_webhook_secret();
2406    if !secret.is_empty() {
2407        let provided = headers
2408            .get(axum::http::header::AUTHORIZATION)
2409            .and_then(|v| v.to_str().ok())
2410            .and_then(|auth| auth.strip_prefix("Bearer "))
2411            .unwrap_or("");
2412
2413        if provided != secret {
2414            tracing::warn!("Gmail push webhook: unauthorized request");
2415            return (
2416                StatusCode::UNAUTHORIZED,
2417                Json(serde_json::json!({"error": "Unauthorized"})),
2418            );
2419        }
2420    }
2421
2422    let body_str = String::from_utf8_lossy(&body);
2423    let envelope: crate::channels::gmail_push::PubSubEnvelope =
2424        match serde_json::from_str(&body_str) {
2425            Ok(e) => e,
2426            Err(e) => {
2427                tracing::warn!("Gmail push webhook: invalid payload: {e}");
2428                return (
2429                    StatusCode::BAD_REQUEST,
2430                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2431                );
2432            }
2433        };
2434
2435    // Process the notification asynchronously (non-blocking for the webhook response)
2436    let channel = Arc::clone(gmail_push);
2437    tokio::spawn(async move {
2438        if let Err(e) = channel.handle_notification(&envelope).await {
2439            tracing::error!("Gmail push notification processing failed: {e:#}");
2440        }
2441    });
2442
2443    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
2444    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2445}
2446
2447// ══════════════════════════════════════════════════════════════════════════════
2448// ADMIN HANDLERS (for CLI management)
2449// ══════════════════════════════════════════════════════════════════════════════
2450
2451/// Response for admin endpoints
2452#[derive(serde::Serialize)]
2453struct AdminResponse {
2454    success: bool,
2455    message: String,
2456}
2457
2458/// Reject requests that do not originate from a loopback address.
2459fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2460    if peer.ip().is_loopback() {
2461        Ok(())
2462    } else {
2463        Err((
2464            StatusCode::FORBIDDEN,
2465            Json(serde_json::json!({
2466                "error": "Admin endpoints are restricted to localhost"
2467            })),
2468        ))
2469    }
2470}
2471
2472/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
2473async fn handle_admin_shutdown(
2474    State(state): State<AppState>,
2475    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2476) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2477    require_localhost(&peer)?;
2478    tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2479
2480    let body = AdminResponse {
2481        success: true,
2482        message: "Gateway shutdown initiated".to_string(),
2483    };
2484
2485    let _ = state.shutdown_tx.send(true);
2486
2487    Ok((StatusCode::OK, Json(body)))
2488}
2489
2490/// GET /admin/paircode — fetch current pairing code (localhost only)
2491async fn handle_admin_paircode(
2492    State(state): State<AppState>,
2493    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2494) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2495    require_localhost(&peer)?;
2496    let code = state.pairing.pairing_code();
2497
2498    let body = if let Some(c) = code {
2499        serde_json::json!({
2500            "success": true,
2501            "pairing_required": state.pairing.require_pairing(),
2502            "pairing_code": c,
2503            "message": "Use this one-time code to pair"
2504        })
2505    } else {
2506        serde_json::json!({
2507            "success": true,
2508            "pairing_required": state.pairing.require_pairing(),
2509            "pairing_code": null,
2510            "message": if state.pairing.require_pairing() {
2511                "Pairing is active but no new code available (already paired or code expired)"
2512            } else {
2513                "Pairing is disabled for this gateway"
2514            }
2515        })
2516    };
2517
2518    Ok((StatusCode::OK, Json(body)))
2519}
2520
2521/// POST /admin/paircode/new — generate a new pairing code (localhost only)
2522async fn handle_admin_paircode_new(
2523    State(state): State<AppState>,
2524    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2525) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2526    require_localhost(&peer)?;
2527    match state.pairing.generate_new_pairing_code() {
2528        Some(code) => {
2529            tracing::info!("🔐 New pairing code generated via admin endpoint");
2530            let body = serde_json::json!({
2531                "success": true,
2532                "pairing_required": state.pairing.require_pairing(),
2533                "pairing_code": code,
2534                "message": "New pairing code generated — use this one-time code to pair"
2535            });
2536            Ok((StatusCode::OK, Json(body)))
2537        }
2538        None => {
2539            let body = serde_json::json!({
2540                "success": false,
2541                "pairing_required": false,
2542                "pairing_code": null,
2543                "message": "Pairing is disabled for this gateway"
2544            });
2545            Ok((StatusCode::BAD_REQUEST, Json(body)))
2546        }
2547    }
2548}
2549
2550/// GET /pair/code — fetch the initial pairing code.
2551///
2552/// Requires a loopback peer. A publicly-reachable endpoint would let any caller
2553/// (e.g. an attacker scanning exposed ngrok/Cloudflare tunnels during first-run)
2554/// fetch the code before the legitimate operator. Host-side dashboards should
2555/// reach the gateway over loopback; containerized setups can call this via
2556/// `docker exec` or fetch the code from `construct onboard` output.
2557async fn handle_pair_code(
2558    State(state): State<AppState>,
2559    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2560) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2561    require_localhost(&peer)?;
2562
2563    let require = state.pairing.require_pairing();
2564    let is_paired = state.pairing.is_paired();
2565
2566    let code = if require && !is_paired {
2567        state.pairing.pairing_code()
2568    } else {
2569        None
2570    };
2571
2572    let body = serde_json::json!({
2573        "success": true,
2574        "pairing_required": require,
2575        "pairing_code": code,
2576    });
2577
2578    Ok((StatusCode::OK, Json(body)))
2579}
2580
2581#[cfg(test)]
2582mod tests {
2583    use super::*;
2584    use crate::channels::traits::ChannelMessage;
2585    use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2586    use crate::providers::Provider;
2587    use async_trait::async_trait;
2588    use axum::http::HeaderValue;
2589    use axum::response::IntoResponse;
2590    use http_body_util::BodyExt;
2591    use parking_lot::Mutex;
2592    use std::sync::atomic::{AtomicUsize, Ordering};
2593
2594    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
2595    fn generate_test_secret() -> String {
2596        let bytes: [u8; 32] = rand::random();
2597        hex::encode(bytes)
2598    }
2599
2600    #[test]
2601    fn security_body_limit_is_64kb() {
2602        assert_eq!(MAX_BODY_SIZE, 65_536);
2603    }
2604
2605    #[test]
2606    fn security_timeout_default_is_30_seconds() {
2607        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2608    }
2609
2610    #[test]
2611    fn gateway_timeout_falls_back_to_default() {
2612        // When env var is not set, should return the default constant
2613        // SAFETY: test-only, single-threaded test runner.
2614        unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2615        assert_eq!(gateway_request_timeout_secs(), 30);
2616    }
2617
2618    #[test]
2619    fn webhook_body_requires_message_field() {
2620        let valid = r#"{"message": "hello"}"#;
2621        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2622        assert!(parsed.is_ok());
2623        assert_eq!(parsed.unwrap().message, "hello");
2624
2625        let missing = r#"{"other": "field"}"#;
2626        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2627        assert!(parsed.is_err());
2628    }
2629
2630    #[test]
2631    fn whatsapp_query_fields_are_optional() {
2632        let q = WhatsAppVerifyQuery {
2633            mode: None,
2634            verify_token: None,
2635            challenge: None,
2636        };
2637        assert!(q.mode.is_none());
2638    }
2639
2640    #[test]
2641    fn app_state_is_clone() {
2642        fn assert_clone<T: Clone>() {}
2643        assert_clone::<AppState>();
2644    }
2645
2646    #[tokio::test]
2647    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2648        let state = AppState {
2649            config: Arc::new(Mutex::new(Config::default())),
2650            provider: Arc::new(MockProvider::default()),
2651            model: "test-model".into(),
2652            temperature: 0.0,
2653            mem: Arc::new(MockMemory),
2654            auto_save: false,
2655            webhook_secret_hash: None,
2656            pairing: Arc::new(PairingGuard::new(false, &[])),
2657            trust_forwarded_headers: false,
2658            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2659            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2660            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2661            whatsapp: None,
2662            whatsapp_app_secret: None,
2663            linq: None,
2664            linq_signing_secret: None,
2665            nextcloud_talk: None,
2666            nextcloud_talk_webhook_secret: None,
2667            wati: None,
2668            gmail_push: None,
2669            observer: Arc::new(crate::observability::NoopObserver),
2670            tools_registry: Arc::new(Vec::new()),
2671            cost_tracker: None,
2672            audit_logger: None,
2673            event_tx: tokio::sync::broadcast::channel(16).0,
2674            shutdown_tx: tokio::sync::watch::channel(false).0,
2675            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2676            path_prefix: String::new(),
2677            session_backend: None,
2678            session_queue: std::sync::Arc::new(
2679                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2680            ),
2681            device_registry: None,
2682            pending_pairings: None,
2683            canvas_store: CanvasStore::new(),
2684            mcp_registry: None,
2685            approval_registry: approval_registry::global(),
2686            mcp_local_url: None,
2687            #[cfg(feature = "webauthn")]
2688            webauthn: None,
2689        };
2690
2691        let response = handle_metrics(State(state)).await.into_response();
2692        assert_eq!(response.status(), StatusCode::OK);
2693        assert_eq!(
2694            response
2695                .headers()
2696                .get(header::CONTENT_TYPE)
2697                .and_then(|value| value.to_str().ok()),
2698            Some(PROMETHEUS_CONTENT_TYPE)
2699        );
2700
2701        let body = response.into_body().collect().await.unwrap().to_bytes();
2702        let text = String::from_utf8(body.to_vec()).unwrap();
2703        assert!(text.contains("Prometheus backend not enabled"));
2704    }
2705
2706    #[cfg(feature = "observability-prometheus")]
2707    #[tokio::test]
2708    async fn metrics_endpoint_renders_prometheus_output() {
2709        let event_tx = tokio::sync::broadcast::channel(16).0;
2710        let wrapped = sse::BroadcastObserver::new(
2711            Box::new(crate::observability::PrometheusObserver::new()),
2712            event_tx.clone(),
2713        );
2714        crate::observability::Observer::record_event(
2715            &wrapped,
2716            &crate::observability::ObserverEvent::HeartbeatTick,
2717        );
2718
2719        let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
2720        let state = AppState {
2721            config: Arc::new(Mutex::new(Config::default())),
2722            provider: Arc::new(MockProvider::default()),
2723            model: "test-model".into(),
2724            temperature: 0.0,
2725            mem: Arc::new(MockMemory),
2726            auto_save: false,
2727            webhook_secret_hash: None,
2728            pairing: Arc::new(PairingGuard::new(false, &[])),
2729            trust_forwarded_headers: false,
2730            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2731            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2732            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2733            whatsapp: None,
2734            whatsapp_app_secret: None,
2735            linq: None,
2736            linq_signing_secret: None,
2737            nextcloud_talk: None,
2738            nextcloud_talk_webhook_secret: None,
2739            wati: None,
2740            gmail_push: None,
2741            observer,
2742            tools_registry: Arc::new(Vec::new()),
2743            cost_tracker: None,
2744            audit_logger: None,
2745            event_tx,
2746            shutdown_tx: tokio::sync::watch::channel(false).0,
2747            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2748            path_prefix: String::new(),
2749            session_backend: None,
2750            session_queue: std::sync::Arc::new(
2751                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2752            ),
2753            device_registry: None,
2754            pending_pairings: None,
2755            canvas_store: CanvasStore::new(),
2756            mcp_registry: None,
2757            approval_registry: approval_registry::global(),
2758            mcp_local_url: None,
2759            #[cfg(feature = "webauthn")]
2760            webauthn: None,
2761        };
2762
2763        let response = handle_metrics(State(state)).await.into_response();
2764        assert_eq!(response.status(), StatusCode::OK);
2765
2766        let body = response.into_body().collect().await.unwrap().to_bytes();
2767        let text = String::from_utf8(body.to_vec()).unwrap();
2768        assert!(text.contains("construct_heartbeat_ticks_total 1"));
2769    }
2770
2771    #[test]
2772    fn gateway_rate_limiter_blocks_after_limit() {
2773        let limiter = GatewayRateLimiter::new(2, 2, 100);
2774        assert!(limiter.allow_pair("127.0.0.1"));
2775        assert!(limiter.allow_pair("127.0.0.1"));
2776        assert!(!limiter.allow_pair("127.0.0.1"));
2777    }
2778
2779    #[test]
2780    fn rate_limiter_sweep_removes_stale_entries() {
2781        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
2782        // Add entries for multiple IPs
2783        assert!(limiter.allow("ip-1"));
2784        assert!(limiter.allow("ip-2"));
2785        assert!(limiter.allow("ip-3"));
2786
2787        {
2788            let guard = limiter.requests.lock();
2789            assert_eq!(guard.0.len(), 3);
2790        }
2791
2792        // Force a sweep by backdating last_sweep
2793        {
2794            let mut guard = limiter.requests.lock();
2795            guard.1 = Instant::now()
2796                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
2797                .unwrap();
2798            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
2799            guard.0.get_mut("ip-2").unwrap().clear();
2800            guard.0.get_mut("ip-3").unwrap().clear();
2801        }
2802
2803        // Next allow() call should trigger sweep and remove stale entries
2804        assert!(limiter.allow("ip-1"));
2805
2806        {
2807            let guard = limiter.requests.lock();
2808            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
2809            assert!(guard.0.contains_key("ip-1"));
2810        }
2811    }
2812
2813    #[test]
2814    fn rate_limiter_zero_limit_always_allows() {
2815        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
2816        for _ in 0..100 {
2817            assert!(limiter.allow("any-key"));
2818        }
2819    }
2820
2821    #[test]
2822    fn idempotency_store_rejects_duplicate_key() {
2823        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
2824        assert!(store.record_if_new("req-1"));
2825        assert!(!store.record_if_new("req-1"));
2826        assert!(store.record_if_new("req-2"));
2827    }
2828
2829    #[test]
2830    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
2831        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
2832        assert!(limiter.allow("ip-1"));
2833        assert!(limiter.allow("ip-2"));
2834        assert!(limiter.allow("ip-3"));
2835
2836        let guard = limiter.requests.lock();
2837        assert_eq!(guard.0.len(), 2);
2838        assert!(guard.0.contains_key("ip-2"));
2839        assert!(guard.0.contains_key("ip-3"));
2840    }
2841
2842    #[test]
2843    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
2844        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
2845        assert!(store.record_if_new("k1"));
2846        std::thread::sleep(Duration::from_millis(2));
2847        assert!(store.record_if_new("k2"));
2848        std::thread::sleep(Duration::from_millis(2));
2849        assert!(store.record_if_new("k3"));
2850
2851        let keys = store.keys.lock();
2852        assert_eq!(keys.len(), 2);
2853        assert!(!keys.contains_key("k1"));
2854        assert!(keys.contains_key("k2"));
2855        assert!(keys.contains_key("k3"));
2856    }
2857
2858    #[test]
2859    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
2860        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2861        let mut headers = HeaderMap::new();
2862        headers.insert(
2863            "X-Forwarded-For",
2864            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2865        );
2866
2867        let key = client_key_from_request(Some(peer), &headers, false);
2868        assert_eq!(key, "10.0.0.5");
2869    }
2870
2871    #[test]
2872    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
2873        // Rightmost XFF hop is the one appended by our trusted upstream proxy;
2874        // leftmost values are attacker-controlled (clients send whatever).
2875        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2876        let mut headers = HeaderMap::new();
2877        headers.insert(
2878            "X-Forwarded-For",
2879            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2880        );
2881
2882        let key = client_key_from_request(Some(peer), &headers, true);
2883        assert_eq!(key, "203.0.113.11");
2884    }
2885
2886    #[test]
2887    fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
2888        // Attacker sets `X-Forwarded-For: 127.0.0.1, <legit-upstream>`. The
2889        // rightmost (trusted proxy's appended value) must win — if we took
2890        // the leftmost, a remote attacker could spoof loopback and evade
2891        // rate limits / lockouts.
2892        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2893        let mut headers = HeaderMap::new();
2894        headers.insert(
2895            "X-Forwarded-For",
2896            HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
2897        );
2898
2899        let key = client_key_from_request(Some(peer), &headers, true);
2900        assert_eq!(key, "203.0.113.11");
2901    }
2902
2903    #[test]
2904    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
2905        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2906        let mut headers = HeaderMap::new();
2907        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
2908
2909        let key = client_key_from_request(Some(peer), &headers, true);
2910        assert_eq!(key, "10.0.0.5");
2911    }
2912
2913    #[test]
2914    fn normalize_max_keys_uses_fallback_for_zero() {
2915        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
2916        assert_eq!(normalize_max_keys(0, 0), 1);
2917    }
2918
2919    #[test]
2920    fn normalize_max_keys_preserves_nonzero_values() {
2921        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
2922        assert_eq!(normalize_max_keys(1, 10_000), 1);
2923    }
2924
2925    #[tokio::test]
2926    async fn persist_pairing_tokens_writes_config_tokens() {
2927        let temp = tempfile::tempdir().unwrap();
2928        let config_path = temp.path().join("config.toml");
2929        let workspace_path = temp.path().join("workspace");
2930
2931        let mut config = Config::default();
2932        config.config_path = config_path.clone();
2933        config.workspace_dir = workspace_path;
2934        config.save().await.unwrap();
2935
2936        let guard = PairingGuard::new(true, &[]);
2937        let code = guard.pairing_code().unwrap();
2938        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
2939        assert!(guard.is_authenticated(&token));
2940
2941        let shared_config = Arc::new(Mutex::new(config));
2942        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
2943            .await
2944            .unwrap();
2945
2946        // In-memory tokens should remain as plaintext 64-char hex hashes.
2947        let plaintext = {
2948            let in_memory = shared_config.lock();
2949            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
2950            in_memory.gateway.paired_tokens[0].clone()
2951        };
2952        assert_eq!(plaintext.len(), 64);
2953        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
2954
2955        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
2956        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
2957        let raw_parsed: Config = toml::from_str(&saved).unwrap();
2958        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
2959        let on_disk = &raw_parsed.gateway.paired_tokens[0];
2960        assert!(
2961            crate::security::SecretStore::is_encrypted(on_disk),
2962            "paired_token should be encrypted on disk"
2963        );
2964    }
2965
2966    #[test]
2967    fn webhook_memory_key_is_unique() {
2968        let key1 = webhook_memory_key();
2969        let key2 = webhook_memory_key();
2970
2971        assert!(key1.starts_with("webhook_msg_"));
2972        assert!(key2.starts_with("webhook_msg_"));
2973        assert_ne!(key1, key2);
2974    }
2975
2976    #[test]
2977    fn whatsapp_memory_key_includes_sender_and_message_id() {
2978        let msg = ChannelMessage {
2979            id: "wamid-123".into(),
2980            sender: "+1234567890".into(),
2981            reply_target: "+1234567890".into(),
2982            content: "hello".into(),
2983            channel: "whatsapp".into(),
2984            timestamp: 1,
2985            thread_ts: None,
2986            interruption_scope_id: None,
2987            attachments: vec![],
2988        };
2989
2990        let key = whatsapp_memory_key(&msg);
2991        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
2992    }
2993
2994    #[derive(Default)]
2995    struct MockMemory;
2996
2997    #[async_trait]
2998    impl Memory for MockMemory {
2999        fn name(&self) -> &str {
3000            "mock"
3001        }
3002
3003        async fn store(
3004            &self,
3005            _key: &str,
3006            _content: &str,
3007            _category: MemoryCategory,
3008            _session_id: Option<&str>,
3009        ) -> anyhow::Result<()> {
3010            Ok(())
3011        }
3012
3013        async fn recall(
3014            &self,
3015            _query: &str,
3016            _limit: usize,
3017            _session_id: Option<&str>,
3018            _since: Option<&str>,
3019            _until: Option<&str>,
3020        ) -> anyhow::Result<Vec<MemoryEntry>> {
3021            Ok(Vec::new())
3022        }
3023
3024        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3025            Ok(None)
3026        }
3027
3028        async fn list(
3029            &self,
3030            _category: Option<&MemoryCategory>,
3031            _session_id: Option<&str>,
3032        ) -> anyhow::Result<Vec<MemoryEntry>> {
3033            Ok(Vec::new())
3034        }
3035
3036        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3037            Ok(false)
3038        }
3039
3040        async fn count(&self) -> anyhow::Result<usize> {
3041            Ok(0)
3042        }
3043
3044        async fn health_check(&self) -> bool {
3045            true
3046        }
3047    }
3048
3049    #[derive(Default)]
3050    struct MockProvider {
3051        calls: AtomicUsize,
3052    }
3053
3054    #[async_trait]
3055    impl Provider for MockProvider {
3056        async fn chat_with_system(
3057            &self,
3058            _system_prompt: Option<&str>,
3059            _message: &str,
3060            _model: &str,
3061            _temperature: f64,
3062        ) -> anyhow::Result<String> {
3063            self.calls.fetch_add(1, Ordering::SeqCst);
3064            Ok("ok".into())
3065        }
3066    }
3067
3068    #[derive(Default)]
3069    struct TrackingMemory {
3070        keys: Mutex<Vec<String>>,
3071    }
3072
3073    #[async_trait]
3074    impl Memory for TrackingMemory {
3075        fn name(&self) -> &str {
3076            "tracking"
3077        }
3078
3079        async fn store(
3080            &self,
3081            key: &str,
3082            _content: &str,
3083            _category: MemoryCategory,
3084            _session_id: Option<&str>,
3085        ) -> anyhow::Result<()> {
3086            self.keys.lock().push(key.to_string());
3087            Ok(())
3088        }
3089
3090        async fn recall(
3091            &self,
3092            _query: &str,
3093            _limit: usize,
3094            _session_id: Option<&str>,
3095            _since: Option<&str>,
3096            _until: Option<&str>,
3097        ) -> anyhow::Result<Vec<MemoryEntry>> {
3098            Ok(Vec::new())
3099        }
3100
3101        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3102            Ok(None)
3103        }
3104
3105        async fn list(
3106            &self,
3107            _category: Option<&MemoryCategory>,
3108            _session_id: Option<&str>,
3109        ) -> anyhow::Result<Vec<MemoryEntry>> {
3110            Ok(Vec::new())
3111        }
3112
3113        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3114            Ok(false)
3115        }
3116
3117        async fn count(&self) -> anyhow::Result<usize> {
3118            let size = self.keys.lock().len();
3119            Ok(size)
3120        }
3121
3122        async fn health_check(&self) -> bool {
3123            true
3124        }
3125    }
3126
3127    fn test_connect_info() -> ConnectInfo<SocketAddr> {
3128        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3129    }
3130
3131    #[tokio::test]
3132    async fn webhook_idempotency_skips_duplicate_provider_calls() {
3133        let provider_impl = Arc::new(MockProvider::default());
3134        let provider: Arc<dyn Provider> = provider_impl.clone();
3135        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3136
3137        let state = AppState {
3138            config: Arc::new(Mutex::new(Config::default())),
3139            provider,
3140            model: "test-model".into(),
3141            temperature: 0.0,
3142            mem: memory,
3143            auto_save: false,
3144            webhook_secret_hash: None,
3145            pairing: Arc::new(PairingGuard::new(false, &[])),
3146            trust_forwarded_headers: false,
3147            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3148            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3149            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3150            whatsapp: None,
3151            whatsapp_app_secret: None,
3152            linq: None,
3153            linq_signing_secret: None,
3154            nextcloud_talk: None,
3155            nextcloud_talk_webhook_secret: None,
3156            wati: None,
3157            gmail_push: None,
3158            observer: Arc::new(crate::observability::NoopObserver),
3159            tools_registry: Arc::new(Vec::new()),
3160            cost_tracker: None,
3161            audit_logger: None,
3162            event_tx: tokio::sync::broadcast::channel(16).0,
3163            shutdown_tx: tokio::sync::watch::channel(false).0,
3164            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3165            path_prefix: String::new(),
3166            session_backend: None,
3167            session_queue: std::sync::Arc::new(
3168                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3169            ),
3170            device_registry: None,
3171            pending_pairings: None,
3172            canvas_store: CanvasStore::new(),
3173            mcp_registry: None,
3174            approval_registry: approval_registry::global(),
3175            mcp_local_url: None,
3176            #[cfg(feature = "webauthn")]
3177            webauthn: None,
3178        };
3179
3180        let mut headers = HeaderMap::new();
3181        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3182
3183        let body = Ok(Json(WebhookBody {
3184            message: "hello".into(),
3185        }));
3186        let first = handle_webhook(
3187            State(state.clone()),
3188            test_connect_info(),
3189            headers.clone(),
3190            body,
3191        )
3192        .await
3193        .into_response();
3194        assert_eq!(first.status(), StatusCode::OK);
3195
3196        let body = Ok(Json(WebhookBody {
3197            message: "hello".into(),
3198        }));
3199        let second = handle_webhook(State(state), test_connect_info(), headers, body)
3200            .await
3201            .into_response();
3202        assert_eq!(second.status(), StatusCode::OK);
3203
3204        let payload = second.into_body().collect().await.unwrap().to_bytes();
3205        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3206        assert_eq!(parsed["status"], "duplicate");
3207        assert_eq!(parsed["idempotent"], true);
3208        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3209    }
3210
3211    #[tokio::test]
3212    async fn webhook_autosave_stores_distinct_keys_per_request() {
3213        let provider_impl = Arc::new(MockProvider::default());
3214        let provider: Arc<dyn Provider> = provider_impl.clone();
3215
3216        let tracking_impl = Arc::new(TrackingMemory::default());
3217        let memory: Arc<dyn Memory> = tracking_impl.clone();
3218
3219        let state = AppState {
3220            config: Arc::new(Mutex::new(Config::default())),
3221            provider,
3222            model: "test-model".into(),
3223            temperature: 0.0,
3224            mem: memory,
3225            auto_save: true,
3226            webhook_secret_hash: None,
3227            pairing: Arc::new(PairingGuard::new(false, &[])),
3228            trust_forwarded_headers: false,
3229            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3230            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3231            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3232            whatsapp: None,
3233            whatsapp_app_secret: None,
3234            linq: None,
3235            linq_signing_secret: None,
3236            nextcloud_talk: None,
3237            nextcloud_talk_webhook_secret: None,
3238            wati: None,
3239            gmail_push: None,
3240            observer: Arc::new(crate::observability::NoopObserver),
3241            tools_registry: Arc::new(Vec::new()),
3242            cost_tracker: None,
3243            audit_logger: None,
3244            event_tx: tokio::sync::broadcast::channel(16).0,
3245            shutdown_tx: tokio::sync::watch::channel(false).0,
3246            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3247            path_prefix: String::new(),
3248            session_backend: None,
3249            session_queue: std::sync::Arc::new(
3250                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3251            ),
3252            device_registry: None,
3253            pending_pairings: None,
3254            canvas_store: CanvasStore::new(),
3255            mcp_registry: None,
3256            approval_registry: approval_registry::global(),
3257            mcp_local_url: None,
3258            #[cfg(feature = "webauthn")]
3259            webauthn: None,
3260        };
3261
3262        let headers = HeaderMap::new();
3263
3264        let body1 = Ok(Json(WebhookBody {
3265            message: "hello one".into(),
3266        }));
3267        let first = handle_webhook(
3268            State(state.clone()),
3269            test_connect_info(),
3270            headers.clone(),
3271            body1,
3272        )
3273        .await
3274        .into_response();
3275        assert_eq!(first.status(), StatusCode::OK);
3276
3277        let body2 = Ok(Json(WebhookBody {
3278            message: "hello two".into(),
3279        }));
3280        let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3281            .await
3282            .into_response();
3283        assert_eq!(second.status(), StatusCode::OK);
3284
3285        let keys = tracking_impl.keys.lock().clone();
3286        assert_eq!(keys.len(), 2);
3287        assert_ne!(keys[0], keys[1]);
3288        assert!(keys[0].starts_with("webhook_msg_"));
3289        assert!(keys[1].starts_with("webhook_msg_"));
3290        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3291    }
3292
3293    #[test]
3294    fn webhook_secret_hash_is_deterministic_and_nonempty() {
3295        let secret_a = generate_test_secret();
3296        let secret_b = generate_test_secret();
3297        let one = hash_webhook_secret(&secret_a);
3298        let two = hash_webhook_secret(&secret_a);
3299        let other = hash_webhook_secret(&secret_b);
3300
3301        assert_eq!(one, two);
3302        assert_ne!(one, other);
3303        assert_eq!(one.len(), 64);
3304    }
3305
3306    #[tokio::test]
3307    async fn webhook_secret_hash_rejects_missing_header() {
3308        let provider_impl = Arc::new(MockProvider::default());
3309        let provider: Arc<dyn Provider> = provider_impl.clone();
3310        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3311        let secret = generate_test_secret();
3312
3313        let state = AppState {
3314            config: Arc::new(Mutex::new(Config::default())),
3315            provider,
3316            model: "test-model".into(),
3317            temperature: 0.0,
3318            mem: memory,
3319            auto_save: false,
3320            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3321            pairing: Arc::new(PairingGuard::new(false, &[])),
3322            trust_forwarded_headers: false,
3323            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3324            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3325            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3326            whatsapp: None,
3327            whatsapp_app_secret: None,
3328            linq: None,
3329            linq_signing_secret: None,
3330            nextcloud_talk: None,
3331            nextcloud_talk_webhook_secret: None,
3332            wati: None,
3333            gmail_push: None,
3334            observer: Arc::new(crate::observability::NoopObserver),
3335            tools_registry: Arc::new(Vec::new()),
3336            cost_tracker: None,
3337            audit_logger: None,
3338            event_tx: tokio::sync::broadcast::channel(16).0,
3339            shutdown_tx: tokio::sync::watch::channel(false).0,
3340            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3341            path_prefix: String::new(),
3342            session_backend: None,
3343            session_queue: std::sync::Arc::new(
3344                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3345            ),
3346            device_registry: None,
3347            pending_pairings: None,
3348            canvas_store: CanvasStore::new(),
3349            mcp_registry: None,
3350            approval_registry: approval_registry::global(),
3351            mcp_local_url: None,
3352            #[cfg(feature = "webauthn")]
3353            webauthn: None,
3354        };
3355
3356        let response = handle_webhook(
3357            State(state),
3358            test_connect_info(),
3359            HeaderMap::new(),
3360            Ok(Json(WebhookBody {
3361                message: "hello".into(),
3362            })),
3363        )
3364        .await
3365        .into_response();
3366
3367        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3368        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3369    }
3370
3371    #[tokio::test]
3372    async fn webhook_secret_hash_rejects_invalid_header() {
3373        let provider_impl = Arc::new(MockProvider::default());
3374        let provider: Arc<dyn Provider> = provider_impl.clone();
3375        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3376        let valid_secret = generate_test_secret();
3377        let wrong_secret = generate_test_secret();
3378
3379        let state = AppState {
3380            config: Arc::new(Mutex::new(Config::default())),
3381            provider,
3382            model: "test-model".into(),
3383            temperature: 0.0,
3384            mem: memory,
3385            auto_save: false,
3386            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3387            pairing: Arc::new(PairingGuard::new(false, &[])),
3388            trust_forwarded_headers: false,
3389            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3390            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3391            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3392            whatsapp: None,
3393            whatsapp_app_secret: None,
3394            linq: None,
3395            linq_signing_secret: None,
3396            nextcloud_talk: None,
3397            nextcloud_talk_webhook_secret: None,
3398            wati: None,
3399            gmail_push: None,
3400            observer: Arc::new(crate::observability::NoopObserver),
3401            tools_registry: Arc::new(Vec::new()),
3402            cost_tracker: None,
3403            audit_logger: None,
3404            event_tx: tokio::sync::broadcast::channel(16).0,
3405            shutdown_tx: tokio::sync::watch::channel(false).0,
3406            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3407            path_prefix: String::new(),
3408            session_backend: None,
3409            session_queue: std::sync::Arc::new(
3410                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3411            ),
3412            device_registry: None,
3413            pending_pairings: None,
3414            canvas_store: CanvasStore::new(),
3415            mcp_registry: None,
3416            approval_registry: approval_registry::global(),
3417            mcp_local_url: None,
3418            #[cfg(feature = "webauthn")]
3419            webauthn: None,
3420        };
3421
3422        let mut headers = HeaderMap::new();
3423        headers.insert(
3424            "X-Webhook-Secret",
3425            HeaderValue::from_str(&wrong_secret).unwrap(),
3426        );
3427
3428        let response = handle_webhook(
3429            State(state),
3430            test_connect_info(),
3431            headers,
3432            Ok(Json(WebhookBody {
3433                message: "hello".into(),
3434            })),
3435        )
3436        .await
3437        .into_response();
3438
3439        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3440        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3441    }
3442
3443    #[tokio::test]
3444    async fn webhook_secret_hash_accepts_valid_header() {
3445        let provider_impl = Arc::new(MockProvider::default());
3446        let provider: Arc<dyn Provider> = provider_impl.clone();
3447        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3448        let secret = generate_test_secret();
3449
3450        let state = AppState {
3451            config: Arc::new(Mutex::new(Config::default())),
3452            provider,
3453            model: "test-model".into(),
3454            temperature: 0.0,
3455            mem: memory,
3456            auto_save: false,
3457            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3458            pairing: Arc::new(PairingGuard::new(false, &[])),
3459            trust_forwarded_headers: false,
3460            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3461            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3462            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3463            whatsapp: None,
3464            whatsapp_app_secret: None,
3465            linq: None,
3466            linq_signing_secret: None,
3467            nextcloud_talk: None,
3468            nextcloud_talk_webhook_secret: None,
3469            wati: None,
3470            gmail_push: None,
3471            observer: Arc::new(crate::observability::NoopObserver),
3472            tools_registry: Arc::new(Vec::new()),
3473            cost_tracker: None,
3474            audit_logger: None,
3475            event_tx: tokio::sync::broadcast::channel(16).0,
3476            shutdown_tx: tokio::sync::watch::channel(false).0,
3477            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3478            path_prefix: String::new(),
3479            session_backend: None,
3480            session_queue: std::sync::Arc::new(
3481                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3482            ),
3483            device_registry: None,
3484            pending_pairings: None,
3485            canvas_store: CanvasStore::new(),
3486            mcp_registry: None,
3487            approval_registry: approval_registry::global(),
3488            mcp_local_url: None,
3489            #[cfg(feature = "webauthn")]
3490            webauthn: None,
3491        };
3492
3493        let mut headers = HeaderMap::new();
3494        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3495
3496        let response = handle_webhook(
3497            State(state),
3498            test_connect_info(),
3499            headers,
3500            Ok(Json(WebhookBody {
3501                message: "hello".into(),
3502            })),
3503        )
3504        .await
3505        .into_response();
3506
3507        assert_eq!(response.status(), StatusCode::OK);
3508        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3509    }
3510
3511    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3512        use hmac::{Hmac, Mac};
3513        use sha2::Sha256;
3514
3515        let payload = format!("{random}{body}");
3516        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3517        mac.update(payload.as_bytes());
3518        hex::encode(mac.finalize().into_bytes())
3519    }
3520
3521    #[tokio::test]
3522    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3523        let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3524        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3525
3526        let state = AppState {
3527            config: Arc::new(Mutex::new(Config::default())),
3528            provider,
3529            model: "test-model".into(),
3530            temperature: 0.0,
3531            mem: memory,
3532            auto_save: false,
3533            webhook_secret_hash: None,
3534            pairing: Arc::new(PairingGuard::new(false, &[])),
3535            trust_forwarded_headers: false,
3536            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3537            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3538            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3539            whatsapp: None,
3540            whatsapp_app_secret: None,
3541            linq: None,
3542            linq_signing_secret: None,
3543            nextcloud_talk: None,
3544            nextcloud_talk_webhook_secret: None,
3545            wati: None,
3546            gmail_push: None,
3547            observer: Arc::new(crate::observability::NoopObserver),
3548            tools_registry: Arc::new(Vec::new()),
3549            cost_tracker: None,
3550            audit_logger: None,
3551            event_tx: tokio::sync::broadcast::channel(16).0,
3552            shutdown_tx: tokio::sync::watch::channel(false).0,
3553            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3554            path_prefix: String::new(),
3555            session_backend: None,
3556            session_queue: std::sync::Arc::new(
3557                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3558            ),
3559            device_registry: None,
3560            pending_pairings: None,
3561            canvas_store: CanvasStore::new(),
3562            mcp_registry: None,
3563            approval_registry: approval_registry::global(),
3564            mcp_local_url: None,
3565            #[cfg(feature = "webauthn")]
3566            webauthn: None,
3567        };
3568
3569        let response = Box::pin(handle_nextcloud_talk_webhook(
3570            State(state),
3571            HeaderMap::new(),
3572            Bytes::from_static(br#"{"type":"message"}"#),
3573        ))
3574        .await
3575        .into_response();
3576
3577        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3578    }
3579
3580    #[tokio::test]
3581    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3582        let provider_impl = Arc::new(MockProvider::default());
3583        let provider: Arc<dyn Provider> = provider_impl.clone();
3584        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3585
3586        let channel = Arc::new(NextcloudTalkChannel::new(
3587            "https://cloud.example.com".into(),
3588            "app-token".into(),
3589            String::new(),
3590            vec!["*".into()],
3591        ));
3592
3593        let secret = "nextcloud-test-secret";
3594        let random = "seed-value";
3595        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3596        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3597        let invalid_signature = "deadbeef";
3598
3599        let state = AppState {
3600            config: Arc::new(Mutex::new(Config::default())),
3601            provider,
3602            model: "test-model".into(),
3603            temperature: 0.0,
3604            mem: memory,
3605            auto_save: false,
3606            webhook_secret_hash: None,
3607            pairing: Arc::new(PairingGuard::new(false, &[])),
3608            trust_forwarded_headers: false,
3609            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3610            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3611            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3612            whatsapp: None,
3613            whatsapp_app_secret: None,
3614            linq: None,
3615            linq_signing_secret: None,
3616            nextcloud_talk: Some(channel),
3617            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3618            wati: None,
3619            gmail_push: None,
3620            observer: Arc::new(crate::observability::NoopObserver),
3621            tools_registry: Arc::new(Vec::new()),
3622            cost_tracker: None,
3623            audit_logger: None,
3624            event_tx: tokio::sync::broadcast::channel(16).0,
3625            shutdown_tx: tokio::sync::watch::channel(false).0,
3626            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3627            path_prefix: String::new(),
3628            session_backend: None,
3629            session_queue: std::sync::Arc::new(
3630                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3631            ),
3632            device_registry: None,
3633            pending_pairings: None,
3634            canvas_store: CanvasStore::new(),
3635            mcp_registry: None,
3636            approval_registry: approval_registry::global(),
3637            mcp_local_url: None,
3638            #[cfg(feature = "webauthn")]
3639            webauthn: None,
3640        };
3641
3642        let mut headers = HeaderMap::new();
3643        headers.insert(
3644            "X-Nextcloud-Talk-Random",
3645            HeaderValue::from_str(random).unwrap(),
3646        );
3647        headers.insert(
3648            "X-Nextcloud-Talk-Signature",
3649            HeaderValue::from_str(invalid_signature).unwrap(),
3650        );
3651
3652        let response = Box::pin(handle_nextcloud_talk_webhook(
3653            State(state),
3654            headers,
3655            Bytes::from(body),
3656        ))
3657        .await
3658        .into_response();
3659        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3660        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3661    }
3662
3663    // ══════════════════════════════════════════════════════════
3664    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
3665    // ══════════════════════════════════════════════════════════
3666
3667    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3668        use hmac::{Hmac, Mac};
3669        use sha2::Sha256;
3670
3671        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3672        mac.update(body);
3673        hex::encode(mac.finalize().into_bytes())
3674    }
3675
3676    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3677        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3678    }
3679
3680    #[test]
3681    fn whatsapp_signature_valid() {
3682        let app_secret = generate_test_secret();
3683        let body = b"test body content";
3684
3685        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3686
3687        assert!(verify_whatsapp_signature(
3688            &app_secret,
3689            body,
3690            &signature_header
3691        ));
3692    }
3693
3694    #[test]
3695    fn whatsapp_signature_invalid_wrong_secret() {
3696        let app_secret = generate_test_secret();
3697        let wrong_secret = generate_test_secret();
3698        let body = b"test body content";
3699
3700        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
3701
3702        assert!(!verify_whatsapp_signature(
3703            &app_secret,
3704            body,
3705            &signature_header
3706        ));
3707    }
3708
3709    #[test]
3710    fn whatsapp_signature_invalid_wrong_body() {
3711        let app_secret = generate_test_secret();
3712        let original_body = b"original body";
3713        let tampered_body = b"tampered body";
3714
3715        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
3716
3717        // Verify with tampered body should fail
3718        assert!(!verify_whatsapp_signature(
3719            &app_secret,
3720            tampered_body,
3721            &signature_header
3722        ));
3723    }
3724
3725    #[test]
3726    fn whatsapp_signature_missing_prefix() {
3727        let app_secret = generate_test_secret();
3728        let body = b"test body";
3729
3730        // Signature without "sha256=" prefix
3731        let signature_header = "abc123def456";
3732
3733        assert!(!verify_whatsapp_signature(
3734            &app_secret,
3735            body,
3736            signature_header
3737        ));
3738    }
3739
3740    #[test]
3741    fn whatsapp_signature_empty_header() {
3742        let app_secret = generate_test_secret();
3743        let body = b"test body";
3744
3745        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
3746    }
3747
3748    #[test]
3749    fn whatsapp_signature_invalid_hex() {
3750        let app_secret = generate_test_secret();
3751        let body = b"test body";
3752
3753        // Invalid hex characters
3754        let signature_header = "sha256=not_valid_hex_zzz";
3755
3756        assert!(!verify_whatsapp_signature(
3757            &app_secret,
3758            body,
3759            signature_header
3760        ));
3761    }
3762
3763    #[test]
3764    fn whatsapp_signature_empty_body() {
3765        let app_secret = generate_test_secret();
3766        let body = b"";
3767
3768        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3769
3770        assert!(verify_whatsapp_signature(
3771            &app_secret,
3772            body,
3773            &signature_header
3774        ));
3775    }
3776
3777    #[test]
3778    fn whatsapp_signature_unicode_body() {
3779        let app_secret = generate_test_secret();
3780        let body = "Hello 🦀 World".as_bytes();
3781
3782        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3783
3784        assert!(verify_whatsapp_signature(
3785            &app_secret,
3786            body,
3787            &signature_header
3788        ));
3789    }
3790
3791    #[test]
3792    fn whatsapp_signature_json_payload() {
3793        let app_secret = generate_test_secret();
3794        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
3795
3796        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3797
3798        assert!(verify_whatsapp_signature(
3799            &app_secret,
3800            body,
3801            &signature_header
3802        ));
3803    }
3804
3805    #[test]
3806    fn whatsapp_signature_case_sensitive_prefix() {
3807        let app_secret = generate_test_secret();
3808        let body = b"test body";
3809
3810        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3811
3812        // Wrong case prefix should fail
3813        let wrong_prefix = format!("SHA256={hex_sig}");
3814        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
3815
3816        // Correct prefix should pass
3817        let correct_prefix = format!("sha256={hex_sig}");
3818        assert!(verify_whatsapp_signature(
3819            &app_secret,
3820            body,
3821            &correct_prefix
3822        ));
3823    }
3824
3825    #[test]
3826    fn whatsapp_signature_truncated_hex() {
3827        let app_secret = generate_test_secret();
3828        let body = b"test body";
3829
3830        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3831        let truncated = &hex_sig[..32]; // Only half the signature
3832        let signature_header = format!("sha256={truncated}");
3833
3834        assert!(!verify_whatsapp_signature(
3835            &app_secret,
3836            body,
3837            &signature_header
3838        ));
3839    }
3840
3841    #[test]
3842    fn whatsapp_signature_extra_bytes() {
3843        let app_secret = generate_test_secret();
3844        let body = b"test body";
3845
3846        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3847        let extended = format!("{hex_sig}deadbeef");
3848        let signature_header = format!("sha256={extended}");
3849
3850        assert!(!verify_whatsapp_signature(
3851            &app_secret,
3852            body,
3853            &signature_header
3854        ));
3855    }
3856
3857    // ══════════════════════════════════════════════════════════
3858    // IdempotencyStore Edge-Case Tests
3859    // ══════════════════════════════════════════════════════════
3860
3861    #[test]
3862    fn idempotency_store_allows_different_keys() {
3863        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
3864        assert!(store.record_if_new("key-a"));
3865        assert!(store.record_if_new("key-b"));
3866        assert!(store.record_if_new("key-c"));
3867        assert!(store.record_if_new("key-d"));
3868    }
3869
3870    #[test]
3871    fn idempotency_store_max_keys_clamped_to_one() {
3872        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
3873        assert!(store.record_if_new("only-key"));
3874        assert!(!store.record_if_new("only-key"));
3875    }
3876
3877    #[test]
3878    fn idempotency_store_rapid_duplicate_rejected() {
3879        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
3880        assert!(store.record_if_new("rapid"));
3881        assert!(!store.record_if_new("rapid"));
3882    }
3883
3884    #[test]
3885    fn idempotency_store_accepts_after_ttl_expires() {
3886        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
3887        assert!(store.record_if_new("ttl-key"));
3888        std::thread::sleep(Duration::from_millis(10));
3889        assert!(store.record_if_new("ttl-key"));
3890    }
3891
3892    #[test]
3893    fn idempotency_store_eviction_preserves_newest() {
3894        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
3895        assert!(store.record_if_new("old-key"));
3896        std::thread::sleep(Duration::from_millis(2));
3897        assert!(store.record_if_new("new-key"));
3898
3899        let keys = store.keys.lock();
3900        assert_eq!(keys.len(), 1);
3901        assert!(!keys.contains_key("old-key"));
3902        assert!(keys.contains_key("new-key"));
3903    }
3904
3905    #[test]
3906    fn rate_limiter_allows_after_window_expires() {
3907        let window = Duration::from_millis(50);
3908        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
3909        assert!(limiter.allow("ip-1"));
3910        assert!(limiter.allow("ip-1"));
3911        assert!(!limiter.allow("ip-1")); // blocked
3912
3913        // Wait for window to expire
3914        std::thread::sleep(Duration::from_millis(60));
3915
3916        // Should be allowed again
3917        assert!(limiter.allow("ip-1"));
3918    }
3919
3920    #[test]
3921    fn rate_limiter_independent_keys_tracked_separately() {
3922        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
3923        assert!(limiter.allow("ip-1"));
3924        assert!(limiter.allow("ip-1"));
3925        assert!(!limiter.allow("ip-1")); // ip-1 blocked
3926
3927        // ip-2 should still work
3928        assert!(limiter.allow("ip-2"));
3929        assert!(limiter.allow("ip-2"));
3930        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
3931    }
3932
3933    #[test]
3934    fn rate_limiter_exact_boundary_at_max_keys() {
3935        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
3936        assert!(limiter.allow("ip-1"));
3937        assert!(limiter.allow("ip-2"));
3938        assert!(limiter.allow("ip-3"));
3939        // At capacity now
3940        assert!(limiter.allow("ip-4")); // should evict ip-1
3941
3942        let guard = limiter.requests.lock();
3943        assert_eq!(guard.0.len(), 3);
3944        assert!(
3945            !guard.0.contains_key("ip-1"),
3946            "ip-1 should have been evicted"
3947        );
3948        assert!(guard.0.contains_key("ip-2"));
3949        assert!(guard.0.contains_key("ip-3"));
3950        assert!(guard.0.contains_key("ip-4"));
3951    }
3952
3953    #[test]
3954    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
3955        let limiter = GatewayRateLimiter::new(2, 3, 100);
3956
3957        // Exhaust pair limit
3958        assert!(limiter.allow_pair("ip-1"));
3959        assert!(limiter.allow_pair("ip-1"));
3960        assert!(!limiter.allow_pair("ip-1")); // pair blocked
3961
3962        // Webhook should still work
3963        assert!(limiter.allow_webhook("ip-1"));
3964        assert!(limiter.allow_webhook("ip-1"));
3965        assert!(limiter.allow_webhook("ip-1"));
3966        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
3967    }
3968
3969    #[test]
3970    fn rate_limiter_single_key_max_allows_one_request() {
3971        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
3972        assert!(limiter.allow("ip-1"));
3973        assert!(limiter.allow("ip-2")); // evicts ip-1
3974
3975        let guard = limiter.requests.lock();
3976        assert_eq!(guard.0.len(), 1);
3977        assert!(guard.0.contains_key("ip-2"));
3978        assert!(!guard.0.contains_key("ip-1"));
3979    }
3980
3981    #[test]
3982    fn rate_limiter_concurrent_access_safe() {
3983        use std::sync::Arc;
3984
3985        let limiter = Arc::new(SlidingWindowRateLimiter::new(
3986            1000,
3987            Duration::from_secs(60),
3988            1000,
3989        ));
3990        let mut handles = Vec::new();
3991
3992        for i in 0..10 {
3993            let limiter = limiter.clone();
3994            handles.push(std::thread::spawn(move || {
3995                for j in 0..100 {
3996                    limiter.allow(&format!("thread-{i}-req-{j}"));
3997                }
3998            }));
3999        }
4000
4001        for handle in handles {
4002            handle.join().unwrap();
4003        }
4004
4005        // Should not panic or deadlock
4006        let guard = limiter.requests.lock();
4007        assert!(guard.0.len() <= 1000, "should respect max_keys");
4008    }
4009
4010    #[test]
4011    fn idempotency_store_concurrent_access_safe() {
4012        use std::sync::Arc;
4013
4014        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
4015        let mut handles = Vec::new();
4016
4017        for i in 0..10 {
4018            let store = store.clone();
4019            handles.push(std::thread::spawn(move || {
4020                for j in 0..100 {
4021                    store.record_if_new(&format!("thread-{i}-key-{j}"));
4022                }
4023            }));
4024        }
4025
4026        for handle in handles {
4027            handle.join().unwrap();
4028        }
4029
4030        let keys = store.keys.lock();
4031        assert!(keys.len() <= 1000, "should respect max_keys");
4032    }
4033
4034    #[test]
4035    fn rate_limiter_rapid_burst_then_cooldown() {
4036        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
4037
4038        // Burst: use all 5 requests
4039        for _ in 0..5 {
4040            assert!(limiter.allow("burst-ip"));
4041        }
4042        assert!(!limiter.allow("burst-ip")); // 6th should fail
4043
4044        // Cooldown
4045        std::thread::sleep(Duration::from_millis(60));
4046
4047        // Should be allowed again
4048        assert!(limiter.allow("burst-ip"));
4049    }
4050
4051    #[test]
4052    fn require_localhost_accepts_ipv4_loopback() {
4053        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
4054        assert!(require_localhost(&peer).is_ok());
4055    }
4056
4057    #[test]
4058    fn require_localhost_accepts_ipv6_loopback() {
4059        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
4060        assert!(require_localhost(&peer).is_ok());
4061    }
4062
4063    #[test]
4064    fn require_localhost_rejects_non_loopback_ipv4() {
4065        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
4066        let err = require_localhost(&peer).unwrap_err();
4067        assert_eq!(err.0, StatusCode::FORBIDDEN);
4068    }
4069
4070    #[test]
4071    fn require_localhost_rejects_non_loopback_ipv6() {
4072        let peer = SocketAddr::from((
4073            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
4074            12345,
4075        ));
4076        let err = require_localhost(&peer).unwrap_err();
4077        assert_eq!(err.0, StatusCode::FORBIDDEN);
4078    }
4079}