Skip to main content

construct/gateway/
mod.rs

1//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts.
2//!
3//! This module replaces the raw TCP implementation with axum for:
4//! - Proper HTTP/1.1 parsing and compliance
5//! - Content-Length validation (handled by hyper)
6//! - Request body size limits (64KB max)
7//! - Request timeouts (30s) to prevent slow-loris attacks
8//! - Header sanitization (handled by axum/hyper)
9
10pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34pub mod terminal;
35pub mod tls;
36pub mod ws;
37pub mod ws_mcp_events;
38
39use crate::channels::{
40    Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
41    WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
42};
43use crate::config::Config;
44use crate::cost::CostTracker;
45use crate::memory::{self, Memory, MemoryCategory};
46use crate::providers::{self, ChatMessage, Provider};
47use crate::runtime;
48use crate::security::SecurityPolicy;
49use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
50use crate::tools;
51use crate::tools::canvas::CanvasStore;
52use crate::tools::traits::ToolSpec;
53use crate::util::truncate_with_ellipsis;
54use anyhow::{Context, Result};
55use axum::{
56    Router,
57    body::Bytes,
58    extract::{ConnectInfo, Query, State},
59    http::{HeaderMap, StatusCode, header},
60    response::{IntoResponse, Json},
61    routing::{delete, get, post, put},
62};
63use parking_lot::Mutex;
64use std::collections::HashMap;
65use std::net::{IpAddr, SocketAddr};
66use std::sync::Arc;
67use std::time::{Duration, Instant};
68use tower_http::limit::RequestBodyLimitLayer;
69use tower_http::timeout::TimeoutLayer;
70use uuid::Uuid;
71
72/// Maximum request body size (64KB) — prevents memory exhaustion
73pub const MAX_BODY_SIZE: usize = 65_536;
74/// Default request timeout (30s) — prevents slow-loris attacks.
75pub const REQUEST_TIMEOUT_SECS: u64 = 30;
76
77/// Read gateway request timeout from `CONSTRUCT_GATEWAY_TIMEOUT_SECS` env var
78/// at runtime, falling back to [`REQUEST_TIMEOUT_SECS`].
79///
80/// Agentic workloads with tool use (web search, MCP tools, sub-agent
81/// delegation) regularly exceed 30 seconds. This allows operators to
82/// increase the timeout without recompiling.
83pub fn gateway_request_timeout_secs() -> u64 {
84    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
85        .ok()
86        .and_then(|v| v.parse().ok())
87        .unwrap_or(REQUEST_TIMEOUT_SECS)
88}
89/// Sliding window used by gateway rate limiting.
90pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
91/// Fallback max distinct client keys tracked in gateway rate limiter.
92pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
93/// Fallback max distinct idempotency keys retained in gateway memory.
94pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
95
96fn webhook_memory_key() -> String {
97    format!("webhook_msg_{}", Uuid::new_v4())
98}
99
100fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
101    format!("whatsapp_{}_{}", msg.sender, msg.id)
102}
103
104fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105    format!("linq_{}_{}", msg.sender, msg.id)
106}
107
108fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109    format!("wati_{}_{}", msg.sender, msg.id)
110}
111
112fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113    format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
114}
115
116fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
117    match &msg.thread_ts {
118        Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
119        None => format!("{channel}_{}", msg.sender),
120    }
121}
122
123fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
124    headers
125        .get("X-Session-Id")
126        .and_then(|v| v.to_str().ok())
127        .map(str::trim)
128        .filter(|value| !value.is_empty())
129        .map(str::to_owned)
130}
131
132fn hash_webhook_secret(value: &str) -> String {
133    use sha2::{Digest, Sha256};
134
135    let digest = Sha256::digest(value.as_bytes());
136    hex::encode(digest)
137}
138
139/// How often the rate limiter sweeps stale IP entries from its map.
140const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes
141
142#[derive(Debug)]
143struct SlidingWindowRateLimiter {
144    limit_per_window: u32,
145    window: Duration,
146    max_keys: usize,
147    requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
148}
149
150impl SlidingWindowRateLimiter {
151    fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
152        Self {
153            limit_per_window,
154            window,
155            max_keys: max_keys.max(1),
156            requests: Mutex::new((HashMap::new(), Instant::now())),
157        }
158    }
159
160    fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
161        requests.retain(|_, timestamps| {
162            timestamps.retain(|t| *t > cutoff);
163            !timestamps.is_empty()
164        });
165    }
166
167    fn allow(&self, key: &str) -> bool {
168        if self.limit_per_window == 0 {
169            return true;
170        }
171
172        let now = Instant::now();
173        let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
174
175        let mut guard = self.requests.lock();
176        let (requests, last_sweep) = &mut *guard;
177
178        // Periodic sweep: remove keys with no recent requests
179        if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
180            Self::prune_stale(requests, cutoff);
181            *last_sweep = now;
182        }
183
184        if !requests.contains_key(key) && requests.len() >= self.max_keys {
185            // Opportunistic stale cleanup before eviction under cardinality pressure.
186            Self::prune_stale(requests, cutoff);
187            *last_sweep = now;
188
189            if requests.len() >= self.max_keys {
190                let evict_key = requests
191                    .iter()
192                    .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
193                    .map(|(k, _)| k.clone());
194                if let Some(evict_key) = evict_key {
195                    requests.remove(&evict_key);
196                }
197            }
198        }
199
200        let entry = requests.entry(key.to_owned()).or_default();
201        entry.retain(|instant| *instant > cutoff);
202
203        if entry.len() >= self.limit_per_window as usize {
204            return false;
205        }
206
207        entry.push(now);
208        true
209    }
210}
211
212#[derive(Debug)]
213pub struct GatewayRateLimiter {
214    pair: SlidingWindowRateLimiter,
215    webhook: SlidingWindowRateLimiter,
216}
217
218impl GatewayRateLimiter {
219    fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
220        let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
221        Self {
222            pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
223            webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
224        }
225    }
226
227    fn allow_pair(&self, key: &str) -> bool {
228        self.pair.allow(key)
229    }
230
231    fn allow_webhook(&self, key: &str) -> bool {
232        self.webhook.allow(key)
233    }
234}
235
236#[derive(Debug)]
237pub struct IdempotencyStore {
238    ttl: Duration,
239    max_keys: usize,
240    keys: Mutex<HashMap<String, Instant>>,
241}
242
243impl IdempotencyStore {
244    fn new(ttl: Duration, max_keys: usize) -> Self {
245        Self {
246            ttl,
247            max_keys: max_keys.max(1),
248            keys: Mutex::new(HashMap::new()),
249        }
250    }
251
252    /// Returns true if this key is new and is now recorded.
253    fn record_if_new(&self, key: &str) -> bool {
254        let now = Instant::now();
255        let mut keys = self.keys.lock();
256
257        keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
258
259        if keys.contains_key(key) {
260            return false;
261        }
262
263        if keys.len() >= self.max_keys {
264            let evict_key = keys
265                .iter()
266                .min_by_key(|(_, seen_at)| *seen_at)
267                .map(|(k, _)| k.clone());
268            if let Some(evict_key) = evict_key {
269                keys.remove(&evict_key);
270            }
271        }
272
273        keys.insert(key.to_owned(), now);
274        true
275    }
276}
277
278fn parse_client_ip(value: &str) -> Option<IpAddr> {
279    let value = value.trim().trim_matches('"').trim();
280    if value.is_empty() {
281        return None;
282    }
283
284    if let Ok(ip) = value.parse::<IpAddr>() {
285        return Some(ip);
286    }
287
288    if let Ok(addr) = value.parse::<SocketAddr>() {
289        return Some(addr.ip());
290    }
291
292    let value = value.trim_matches(['[', ']']);
293    value.parse::<IpAddr>().ok()
294}
295
296fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
297    // Read the rightmost X-Forwarded-For hop. Proxies APPEND to XFF, so the
298    // leftmost value is supplied by the client (attacker-controlled) while the
299    // rightmost was written by the immediate upstream proxy we are trusting.
300    // Taking the leftmost would let any caller spoof an arbitrary source IP.
301    if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
302        for candidate in xff.rsplit(',') {
303            if let Some(ip) = parse_client_ip(candidate) {
304                return Some(ip);
305            }
306        }
307    }
308
309    headers
310        .get("X-Real-IP")
311        .and_then(|v| v.to_str().ok())
312        .and_then(parse_client_ip)
313}
314
315pub(super) fn client_key_from_request(
316    peer_addr: Option<SocketAddr>,
317    headers: &HeaderMap,
318    trust_forwarded_headers: bool,
319) -> String {
320    if trust_forwarded_headers {
321        if let Some(ip) = forwarded_client_ip(headers) {
322            return ip.to_string();
323        }
324    }
325
326    peer_addr
327        .map(|addr| addr.ip().to_string())
328        .unwrap_or_else(|| "unknown".to_string())
329}
330
331fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
332    if configured == 0 {
333        fallback.max(1)
334    } else {
335        configured
336    }
337}
338
339/// Shared state for all axum handlers
340#[derive(Clone)]
341pub struct AppState {
342    pub config: Arc<Mutex<Config>>,
343    pub provider: Arc<dyn Provider>,
344    pub model: String,
345    pub temperature: f64,
346    pub mem: Arc<dyn Memory>,
347    pub auto_save: bool,
348    /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext.
349    pub webhook_secret_hash: Option<Arc<str>>,
350    pub pairing: Arc<PairingGuard>,
351    pub trust_forwarded_headers: bool,
352    pub rate_limiter: Arc<GatewayRateLimiter>,
353    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
354    pub idempotency_store: Arc<IdempotencyStore>,
355    pub whatsapp: Option<Arc<WhatsAppChannel>>,
356    /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`)
357    pub whatsapp_app_secret: Option<Arc<str>>,
358    pub linq: Option<Arc<LinqChannel>>,
359    /// Linq webhook signing secret for signature verification
360    pub linq_signing_secret: Option<Arc<str>>,
361    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
362    /// Nextcloud Talk webhook secret for signature verification
363    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
364    pub wati: Option<Arc<WatiChannel>>,
365    /// Gmail Pub/Sub push notification channel
366    pub gmail_push: Option<Arc<GmailPushChannel>>,
367    /// Observability backend for metrics scraping
368    pub observer: Arc<dyn crate::observability::Observer>,
369    /// Registered tool specs (for web dashboard tools page)
370    pub tools_registry: Arc<Vec<ToolSpec>>,
371    /// Cost tracker (optional, for web dashboard cost page)
372    pub cost_tracker: Option<Arc<CostTracker>>,
373    /// Audit logger (optional, for web dashboard audit viewer)
374    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
375    /// SSE broadcast channel for real-time events
376    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
377    /// Shutdown signal sender for graceful shutdown
378    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
379    /// Registry of dynamically connected nodes
380    pub node_registry: Arc<nodes::NodeRegistry>,
381    /// Path prefix for reverse-proxy deployments (empty string = no prefix)
382    pub path_prefix: String,
383    /// Session backend for persisting gateway WS chat sessions
384    pub session_backend: Option<Arc<dyn SessionBackend>>,
385    /// Per-session actor queue for serializing concurrent turns
386    pub session_queue: Arc<session_queue::SessionActorQueue>,
387    /// Device registry for paired device management
388    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
389    /// Pending pairing request store
390    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
391    /// Shared canvas store for Live Canvas (A2UI) system
392    pub canvas_store: CanvasStore,
393    /// MCP registry for direct tool invocation from HTTP handlers (memory graph, etc.)
394    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
395    /// WebAuthn state for hardware key authentication (optional, requires `webauthn` feature)
396    #[cfg(feature = "webauthn")]
397    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
398    /// Registry of pending human approval requests from workflow runs
399    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
400    /// Base URL (e.g. `http://127.0.0.1:60004`) of the in-process MCP server,
401    /// used by gateway reverse-proxy handlers in [`api_mcp`]. `None` if the
402    /// MCP server failed to bind — proxy handlers should then return 503.
403    /// Populated after MCP server bind in [`run_gateway`].
404    pub mcp_local_url: Option<Arc<str>>,
405}
406
407/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
408#[allow(clippy::too_many_lines)]
409pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
410    // ── Security: warn on public bind without tunnel or explicit opt-in ──
411    if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
412    {
413        tracing::warn!(
414            "⚠️  Binding to {host} — gateway will be exposed to all network interfaces.\n\
415             Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
416             [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
417             Docker/VM: if you are running inside a container or VM, this is expected."
418        );
419    }
420    let config_state = Arc::new(Mutex::new(config.clone()));
421
422    // ── Hooks ──────────────────────────────────────────────────────
423    let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
424        Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
425    } else {
426        None
427    };
428
429    let addr: SocketAddr = format!("{host}:{port}").parse()?;
430    let listener = tokio::net::TcpListener::bind(addr).await?;
431    let actual_port = listener.local_addr()?.port();
432    let display_addr = format!("{host}:{actual_port}");
433
434    let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
435        config.default_provider.as_deref().unwrap_or("openrouter"),
436        config.api_key.as_deref(),
437        config.api_url.as_deref(),
438        &config.reliability,
439        &providers::ProviderRuntimeOptions {
440            auth_profile_override: None,
441            provider_api_url: config.api_url.clone(),
442            construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
443            secrets_encrypt: config.secrets.encrypt,
444            reasoning_enabled: config.runtime.reasoning_enabled,
445            reasoning_effort: config.runtime.reasoning_effort.clone(),
446            provider_timeout_secs: Some(config.provider_timeout_secs),
447            extra_headers: config.extra_headers.clone(),
448            api_path: config.api_path.clone(),
449            provider_max_tokens: config.provider_max_tokens,
450        },
451    )?);
452    let model = config
453        .default_model
454        .clone()
455        .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
456    let temperature = config.default_temperature;
457    let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
458        &config.memory,
459        &config.embedding_routes,
460        Some(&config.storage.provider.config),
461        &config.workspace_dir,
462        config.api_key.as_deref(),
463    )?);
464    let runtime: Arc<dyn runtime::RuntimeAdapter> =
465        Arc::from(runtime::create_runtime(&config.runtime)?);
466    let security = Arc::new(SecurityPolicy::from_config(
467        &config.autonomy,
468        &config.workspace_dir,
469    ));
470
471    let (composio_key, composio_entity_id) = if config.composio.enabled {
472        (
473            config.composio.api_key.as_deref(),
474            Some(config.composio.entity_id.as_str()),
475        )
476    } else {
477        (None, None)
478    };
479
480    let canvas_store = tools::canvas::global_store();
481
482    let (
483        mut tools_registry_raw,
484        delegate_handle_gw,
485        _reaction_handle_gw,
486        _channel_map_handle,
487        _ask_user_handle_gw,
488        _escalate_handle_gw,
489    ) = tools::all_tools_with_runtime(
490        Arc::new(config.clone()),
491        &security,
492        runtime,
493        Arc::clone(&mem),
494        composio_key,
495        composio_entity_id,
496        &config.browser,
497        &config.http_request,
498        &config.web_fetch,
499        &config.workspace_dir,
500        &config.agents,
501        config.api_key.as_deref(),
502        &config,
503        Some(canvas_store.clone()),
504    );
505
506    // ── Wire MCP tools into the gateway tool registry (non-fatal) ───
507    // Without this, the `/api/tools` endpoint misses MCP tools.
508    // Inject operator + kumiho MCP server configs (same as agent/channels do)
509    // so the gateway can call operator tools directly from HTTP handlers.
510    let gateway_mcp_config = {
511        let mut c = config.clone();
512        c = crate::agent::kumiho::inject_kumiho(c, false);
513        c = crate::agent::operator::inject_operator(c, false);
514        c.mcp
515    };
516    let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
517    if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
518        tracing::info!(
519            "Gateway: initializing MCP client — {} server(s) configured",
520            gateway_mcp_config.servers.len()
521        );
522        match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
523            Ok(registry) => {
524                let registry = std::sync::Arc::new(registry);
525                mcp_registry_shared = Some(std::sync::Arc::clone(&registry));
526                if gateway_mcp_config.deferred_loading {
527                    let operator_prefix =
528                        format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
529                    let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
530                    let all_names = registry.tool_names();
531                    let mut eager_count = 0usize;
532
533                    for name in &all_names {
534                        if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
535                            if let Some(def) = registry.get_tool_def(name).await {
536                                let wrapper: std::sync::Arc<dyn tools::Tool> =
537                                    std::sync::Arc::new(tools::McpToolWrapper::new(
538                                        name.clone(),
539                                        def,
540                                        std::sync::Arc::clone(&registry),
541                                    ));
542                                if let Some(ref handle) = delegate_handle_gw {
543                                    handle.write().push(std::sync::Arc::clone(&wrapper));
544                                }
545                                tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
546                                eager_count += 1;
547                            }
548                        }
549                    }
550
551                    let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
552                        std::sync::Arc::clone(&registry),
553                        |name| {
554                            !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
555                        },
556                    )
557                    .await;
558                    tracing::info!(
559                        "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
560                        eager_count,
561                        deferred_set.len(),
562                        registry.server_count()
563                    );
564                    let activated =
565                        std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
566                    tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
567                        deferred_set,
568                        activated,
569                    )));
570                } else {
571                    let names = registry.tool_names();
572                    let mut registered = 0usize;
573                    for name in names {
574                        if let Some(def) = registry.get_tool_def(&name).await {
575                            let wrapper: std::sync::Arc<dyn tools::Tool> =
576                                std::sync::Arc::new(tools::McpToolWrapper::new(
577                                    name,
578                                    def,
579                                    std::sync::Arc::clone(&registry),
580                                ));
581                            if let Some(ref handle) = delegate_handle_gw {
582                                handle.write().push(std::sync::Arc::clone(&wrapper));
583                            }
584                            tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
585                            registered += 1;
586                        }
587                    }
588                    tracing::info!(
589                        "Gateway MCP: {} tool(s) registered from {} server(s)",
590                        registered,
591                        registry.server_count()
592                    );
593                }
594            }
595            Err(e) => {
596                tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
597            }
598        }
599    }
600
601    let tools_registry: Arc<Vec<ToolSpec>> =
602        Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
603
604    // Cost tracker — process-global singleton so channels share the same instance
605    let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
606
607    // Audit logger — optional, for dashboard audit viewer
608    let audit_logger = if config.security.audit.enabled {
609        match crate::security::audit::AuditLogger::new(
610            config.security.audit.clone(),
611            std::path::PathBuf::from(&config.workspace_dir),
612        ) {
613            Ok(logger) => Some(Arc::new(logger)),
614            Err(e) => {
615                tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
616                None
617            }
618        }
619    } else {
620        None
621    };
622
623    // SSE broadcast channel for real-time events
624    let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
625    // Extract webhook secret for authentication
626    let webhook_secret_hash: Option<Arc<str>> =
627        config.channels_config.webhook.as_ref().and_then(|webhook| {
628            webhook.secret.as_ref().and_then(|raw_secret| {
629                let trimmed_secret = raw_secret.trim();
630                (!trimmed_secret.is_empty())
631                    .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
632            })
633        });
634
635    // WhatsApp channel (if configured)
636    let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
637        .channels_config
638        .whatsapp
639        .as_ref()
640        .filter(|wa| wa.is_cloud_config())
641        .map(|wa| {
642            Arc::new(WhatsAppChannel::new(
643                wa.access_token.clone().unwrap_or_default(),
644                wa.phone_number_id.clone().unwrap_or_default(),
645                wa.verify_token.clone().unwrap_or_default(),
646                wa.allowed_numbers.clone(),
647            ))
648        });
649
650    // WhatsApp app secret for webhook signature verification
651    // Priority: environment variable > config file
652    let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
653        .ok()
654        .and_then(|secret| {
655            let secret = secret.trim();
656            (!secret.is_empty()).then(|| secret.to_owned())
657        })
658        .or_else(|| {
659            config.channels_config.whatsapp.as_ref().and_then(|wa| {
660                wa.app_secret
661                    .as_deref()
662                    .map(str::trim)
663                    .filter(|secret| !secret.is_empty())
664                    .map(ToOwned::to_owned)
665            })
666        })
667        .map(Arc::from);
668
669    // Linq channel (if configured)
670    let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
671        Arc::new(LinqChannel::new(
672            lq.api_token.clone(),
673            lq.from_phone.clone(),
674            lq.allowed_senders.clone(),
675        ))
676    });
677
678    // Linq signing secret for webhook signature verification
679    // Priority: environment variable > config file
680    let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
681        .ok()
682        .and_then(|secret| {
683            let secret = secret.trim();
684            (!secret.is_empty()).then(|| secret.to_owned())
685        })
686        .or_else(|| {
687            config.channels_config.linq.as_ref().and_then(|lq| {
688                lq.signing_secret
689                    .as_deref()
690                    .map(str::trim)
691                    .filter(|secret| !secret.is_empty())
692                    .map(ToOwned::to_owned)
693            })
694        })
695        .map(Arc::from);
696
697    // WATI channel (if configured)
698    let wati_channel: Option<Arc<WatiChannel>> =
699        config.channels_config.wati.as_ref().map(|wati_cfg| {
700            Arc::new(
701                WatiChannel::new(
702                    wati_cfg.api_token.clone(),
703                    wati_cfg.api_url.clone(),
704                    wati_cfg.tenant_id.clone(),
705                    wati_cfg.allowed_numbers.clone(),
706                )
707                .with_transcription(config.transcription.clone()),
708            )
709        });
710
711    // Nextcloud Talk channel (if configured)
712    let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
713        config.channels_config.nextcloud_talk.as_ref().map(|nc| {
714            Arc::new(NextcloudTalkChannel::new(
715                nc.base_url.clone(),
716                nc.app_token.clone(),
717                nc.bot_name.clone().unwrap_or_default(),
718                nc.allowed_users.clone(),
719            ))
720        });
721
722    // Nextcloud Talk webhook secret for signature verification
723    // Priority: environment variable > config file
724    let nextcloud_talk_webhook_secret: Option<Arc<str>> =
725        std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
726            .ok()
727            .and_then(|secret| {
728                let secret = secret.trim();
729                (!secret.is_empty()).then(|| secret.to_owned())
730            })
731            .or_else(|| {
732                config
733                    .channels_config
734                    .nextcloud_talk
735                    .as_ref()
736                    .and_then(|nc| {
737                        nc.webhook_secret
738                            .as_deref()
739                            .map(str::trim)
740                            .filter(|secret| !secret.is_empty())
741                            .map(ToOwned::to_owned)
742                    })
743            })
744            .map(Arc::from);
745
746    // Gmail Push channel (if configured and enabled)
747    let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
748        .channels_config
749        .gmail_push
750        .as_ref()
751        .filter(|gp| gp.enabled)
752        .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
753
754    // ── Session persistence for WS chat ─────────────────────
755    let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
756        match SqliteSessionBackend::new(&config.workspace_dir) {
757            Ok(b) => {
758                tracing::info!("Gateway session persistence enabled (SQLite)");
759                if config.gateway.session_ttl_hours > 0 {
760                    if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
761                        if cleaned > 0 {
762                            tracing::info!("Cleaned up {cleaned} stale gateway sessions");
763                        }
764                    }
765                }
766                Some(Arc::new(b))
767            }
768            Err(e) => {
769                tracing::warn!("Session persistence disabled: {e}");
770                None
771            }
772        }
773    } else {
774        None
775    };
776
777    // ── Pairing guard ──────────────────────────────────────
778    let pairing = Arc::new(PairingGuard::new(
779        config.gateway.require_pairing,
780        &config.gateway.paired_tokens,
781    ));
782    let rate_limit_max_keys = normalize_max_keys(
783        config.gateway.rate_limit_max_keys,
784        RATE_LIMIT_MAX_KEYS_DEFAULT,
785    );
786    let rate_limiter = Arc::new(GatewayRateLimiter::new(
787        config.gateway.pair_rate_limit_per_minute,
788        config.gateway.webhook_rate_limit_per_minute,
789        rate_limit_max_keys,
790    ));
791    let idempotency_max_keys = normalize_max_keys(
792        config.gateway.idempotency_max_keys,
793        IDEMPOTENCY_MAX_KEYS_DEFAULT,
794    );
795    let idempotency_store = Arc::new(IdempotencyStore::new(
796        Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
797        idempotency_max_keys,
798    ));
799
800    // Resolve optional path prefix for reverse-proxy deployments.
801    let path_prefix: Option<&str> = config
802        .gateway
803        .path_prefix
804        .as_deref()
805        .filter(|p| !p.is_empty());
806
807    // ── Tunnel ────────────────────────────────────────────────
808    let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
809    let mut tunnel_url: Option<String> = None;
810
811    if let Some(ref tun) = tunnel {
812        println!("🔗 Starting {} tunnel...", tun.name());
813        match tun.start(host, actual_port).await {
814            Ok(url) => {
815                println!("🌐 Tunnel active: {url}");
816                tunnel_url = Some(url);
817            }
818            Err(e) => {
819                println!("⚠️  Tunnel failed to start: {e}");
820                println!("   Falling back to local-only mode.");
821            }
822        }
823    }
824
825    let pfx = path_prefix.unwrap_or("");
826    println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
827    if let Some(ref url) = tunnel_url {
828        println!("  🌐 Public URL: {url}");
829    }
830    println!("  🌐 Web Dashboard: http://{display_addr}{pfx}/");
831    if let Some(code) = pairing.pairing_code() {
832        println!();
833        println!("  🔐 PAIRING REQUIRED — use this one-time code:");
834        println!("     ┌──────────────┐");
835        println!("     │  {code}  │");
836        println!("     └──────────────┘");
837        println!("     Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
838    } else if pairing.require_pairing() {
839        println!("  🔒 Pairing: ACTIVE (bearer token required)");
840        println!("     To pair a new device: construct gateway get-paircode --new");
841        println!();
842    } else {
843        println!("  ⚠️  Pairing: DISABLED (all requests accepted)");
844        println!();
845    }
846    println!("  POST {pfx}/pair      — pair a new client (X-Pairing-Code header)");
847    println!("  POST {pfx}/webhook   — {{\"message\": \"your prompt\"}}");
848    if whatsapp_channel.is_some() {
849        println!("  GET  {pfx}/whatsapp  — Meta webhook verification");
850        println!("  POST {pfx}/whatsapp  — WhatsApp message webhook");
851    }
852    if linq_channel.is_some() {
853        println!("  POST {pfx}/linq      — Linq message webhook (iMessage/RCS/SMS)");
854    }
855    if wati_channel.is_some() {
856        println!("  GET  {pfx}/wati      — WATI webhook verification");
857        println!("  POST {pfx}/wati      — WATI message webhook");
858    }
859    if nextcloud_talk_channel.is_some() {
860        println!("  POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
861    }
862    println!("  GET  {pfx}/api/*     — REST API (bearer token required)");
863    println!("  GET  {pfx}/ws/chat   — WebSocket agent chat");
864    if config.nodes.enabled {
865        println!("  GET  {pfx}/ws/nodes  — WebSocket node discovery");
866    }
867    println!("  GET  {pfx}/health    — health check");
868    println!("  GET  {pfx}/metrics   — Prometheus metrics");
869    println!("  Press Ctrl+C to stop.\n");
870
871    crate::health::mark_component_ok("gateway");
872
873    // Fire gateway start hook
874    if let Some(ref hooks) = hooks {
875        hooks.fire_gateway_start(host, actual_port).await;
876    }
877
878    // Wrap observer with broadcast capability for SSE
879    let broadcast_observer: Arc<dyn crate::observability::Observer> =
880        Arc::new(sse::BroadcastObserver::new(
881            crate::observability::create_observer(&config.observability),
882            event_tx.clone(),
883        ));
884
885    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
886
887    // Node registry for dynamic node discovery
888    let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
889
890    // Device registry and pairing store (only when pairing is required)
891    let device_registry = if config.gateway.require_pairing {
892        Some(Arc::new(api_pairing::DeviceRegistry::new(
893            &config.workspace_dir,
894        )?))
895    } else {
896        None
897    };
898    let pending_pairings = if config.gateway.require_pairing {
899        Some(Arc::new(api_pairing::PairingStore::new(
900            config.gateway.pairing_dashboard.max_pending_codes,
901        )))
902    } else {
903        None
904    };
905
906    // ── Build RuntimeHandles for the in-process MCP server ──────────────
907    //
908    // The MCP task needs live handles so the tools the standalone binary
909    // used to skip (workspace, channel-bound tools, session_store, discord
910    // memory, delegate, …) register properly. We clone individual Arcs out
911    // of the pieces we already built above and feed them in.
912    //
913    // Tools that keep shared handles (reaction/ask_user/escalate/poll) are
914    // easiest to wire by handing their already-built, channel-map-aware
915    // instances over via `pre_built_tools`. We do this by building a second
916    // parallel Arc-vec via `all_tools_with_runtime` — the per-tool state is
917    // stateless for registry purposes, and delegate/swarm get their own
918    // per-call depth counters anyway.
919    let mcp_workspace_dir = config.workspace_dir.clone();
920    let mcp_config_snapshot = config.clone();
921    let mcp_runtime_handles = {
922        let mut rh = crate::mcp_server::RuntimeHandles::empty();
923
924        // Workspace manager (if workspace isolation is on).
925        if config.workspace.enabled {
926            let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
927                let home = directories::UserDirs::new()
928                    .map(|u| u.home_dir().to_path_buf())
929                    .unwrap_or_else(|| std::path::PathBuf::from("."));
930                home.join(&config.workspace.workspaces_dir[2..])
931            } else {
932                std::path::PathBuf::from(&config.workspace.workspaces_dir)
933            };
934            let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
935            rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
936        }
937
938        // Session backend from channels::session_store::SessionStore.
939        if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
940        {
941            let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
942                Arc::new(store);
943            rh.session_store = Some(backend);
944        }
945
946        // Discord memory (formerly for discord_search) — removed with the
947        // SQLite backend; persistent cross-session memory should use Kumiho MCP.
948
949        // Agent config + provider options for delegate / swarm.
950        if !config.agents.is_empty() {
951            rh.agent_config = Some(Arc::new(config.agents.clone()));
952            rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
953            rh.provider_runtime_options =
954                Some(Arc::new(crate::providers::ProviderRuntimeOptions {
955                    auth_profile_override: None,
956                    provider_api_url: config.api_url.clone(),
957                    construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
958                    secrets_encrypt: config.secrets.encrypt,
959                    reasoning_enabled: config.runtime.reasoning_enabled,
960                    reasoning_effort: config.runtime.reasoning_effort.clone(),
961                    provider_timeout_secs: Some(config.provider_timeout_secs),
962                    extra_headers: config.extra_headers.clone(),
963                    api_path: config.api_path.clone(),
964                    provider_max_tokens: config.provider_max_tokens,
965                }));
966        }
967
968        // Build a second parallel tool set and hand it to MCP via
969        // `pre_built_tools` so every channel-aware / delegate tool lands in
970        // the MCP registry fully wired (even though its ChannelMap handle is
971        // distinct from the gateway's — the channel supervisor's populate
972        // path still routes through the gateway's tool instances; the MCP
973        // versions advertise and execute independently).
974        let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
975            Arc::new(config.clone()),
976            &security,
977            Arc::new(crate::runtime::NativeRuntime::new()),
978            Arc::clone(&mem),
979            composio_key,
980            composio_entity_id,
981            &config.browser,
982            &config.http_request,
983            &config.web_fetch,
984            &config.workspace_dir,
985            &config.agents,
986            config.api_key.as_deref(),
987            &config,
988            Some(canvas_store.clone()),
989        );
990        // Convert Box<dyn Tool> to Arc<dyn Tool>.
991        let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
992            .into_iter()
993            .map(|b| Arc::<dyn tools::Tool>::from(b))
994            .collect();
995        rh.pre_built_tools = Some(mcp_tool_arcs);
996
997        rh
998    };
999
1000    let mut state = AppState {
1001        config: config_state,
1002        provider,
1003        model,
1004        temperature,
1005        mem,
1006        auto_save: config.memory.auto_save,
1007        webhook_secret_hash,
1008        pairing,
1009        trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1010        rate_limiter,
1011        auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1012        idempotency_store,
1013        whatsapp: whatsapp_channel,
1014        whatsapp_app_secret,
1015        linq: linq_channel,
1016        linq_signing_secret,
1017        nextcloud_talk: nextcloud_talk_channel,
1018        nextcloud_talk_webhook_secret,
1019        wati: wati_channel,
1020        gmail_push: gmail_push_channel,
1021        observer: broadcast_observer,
1022        tools_registry,
1023        cost_tracker,
1024        audit_logger,
1025        event_tx,
1026        shutdown_tx,
1027        node_registry,
1028        session_backend,
1029        session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1030        device_registry,
1031        pending_pairings,
1032        path_prefix: path_prefix.unwrap_or("").to_string(),
1033        canvas_store,
1034        mcp_registry: mcp_registry_shared,
1035        approval_registry: approval_registry::global(),
1036        // Populated after the in-process MCP server binds (see below).
1037        mcp_local_url: None,
1038        #[cfg(feature = "webauthn")]
1039        webauthn: if config.security.webauthn.enabled {
1040            let secret_store = Arc::new(crate::security::SecretStore::new(
1041                &config.workspace_dir,
1042                true,
1043            ));
1044            let wa_config = crate::security::webauthn::WebAuthnConfig {
1045                enabled: true,
1046                rp_id: config.security.webauthn.rp_id.clone(),
1047                rp_origin: config.security.webauthn.rp_origin.clone(),
1048                rp_name: config.security.webauthn.rp_name.clone(),
1049            };
1050            Some(Arc::new(api_webauthn::WebAuthnState {
1051                manager: crate::security::webauthn::WebAuthnManager::new(
1052                    wa_config,
1053                    secret_store,
1054                    &config.workspace_dir,
1055                ),
1056                pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1057                pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1058            }))
1059        } else {
1060            None
1061        },
1062    };
1063
1064    // ── Spawn the in-process MCP server ─────────────────────────────────
1065    //
1066    // Binds 127.0.0.1:0 (ephemeral, as external clients expect), writes
1067    // ~/.construct/mcp.json with {url,pid,started_at} atomically, and tears
1068    // down on gateway shutdown. If the bind fails we log and move on —
1069    // this is non-fatal for the gateway itself.
1070    let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1071    let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1072        let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1073            &mcp_workspace_dir,
1074            &mcp_config_snapshot,
1075            &mcp_runtime_handles,
1076        );
1077        for (name, reason) in &mcp_skipped {
1078            tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1079        }
1080        tracing::info!(
1081            "mcp-server: advertising {} tools to external MCP clients",
1082            mcp_state.tools.len()
1083        );
1084
1085        match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1086            Ok(handle) => {
1087                let url = format!("http://{}/mcp", handle.addr);
1088                if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1089                    tracing::warn!("mcp-server: failed to write discovery file: {e}");
1090                } else {
1091                    tracing::info!(
1092                        "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1093                    );
1094                }
1095
1096                // Expose the bound base URL (no `/mcp` suffix) to the gateway
1097                // reverse-proxy handlers in `api_mcp`.
1098                state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1099
1100                let mut watch = mcp_shutdown_watch;
1101                Some(tokio::spawn(async move {
1102                    // Wait for the gateway's shutdown signal.
1103                    loop {
1104                        if *watch.borrow() {
1105                            break;
1106                        }
1107                        if watch.changed().await.is_err() {
1108                            break;
1109                        }
1110                    }
1111                    let _ = handle.shutdown.send(());
1112                    let _ = handle.joined.await;
1113                    crate::mcp_server::server::cleanup_discovery_file();
1114                    tracing::info!("mcp-server: stopped");
1115                }))
1116            }
1117            Err(e) => {
1118                tracing::error!("mcp-server: failed to bind: {e}");
1119                None
1120            }
1121        }
1122    };
1123
1124    // Config PUT needs larger body limit (1MB)
1125    let config_put_router = Router::new()
1126        .route("/api/config", put(api::handle_api_config_put))
1127        .layer(RequestBodyLimitLayer::new(1_048_576));
1128
1129    // Memory graph needs longer timeout (aggregates many Kumiho calls via operator).
1130    // Built as a separate router that gets merged AFTER the global timeout layer.
1131    let memory_graph_router = Router::new()
1132        .route(
1133            "/api/memory/graph",
1134            get(api_memory_graph::handle_memory_graph),
1135        )
1136        .with_state(state.clone())
1137        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1138        .layer(TimeoutLayer::with_status_code(
1139            StatusCode::REQUEST_TIMEOUT,
1140            Duration::from_secs(60),
1141        ));
1142
1143    // Build router with middleware
1144    let inner = Router::new()
1145        // ── Admin routes (for CLI management) ──
1146        .route("/admin/shutdown", post(handle_admin_shutdown))
1147        .route("/admin/paircode", get(handle_admin_paircode))
1148        .route("/admin/paircode/new", post(handle_admin_paircode_new))
1149        // ── Existing routes ──
1150        .route("/health", get(handle_health))
1151        .route("/metrics", get(handle_metrics))
1152        .route("/pair", post(handle_pair))
1153        .route("/pair/code", get(handle_pair_code))
1154        .route("/webhook", post(handle_webhook))
1155        .route("/whatsapp", get(handle_whatsapp_verify))
1156        .route("/whatsapp", post(handle_whatsapp_message))
1157        .route("/linq", post(handle_linq_webhook))
1158        .route("/wati", get(handle_wati_verify))
1159        .route("/wati", post(handle_wati_webhook))
1160        .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1161        .route("/webhook/gmail", post(handle_gmail_push_webhook))
1162        // ── Claude Code runner hooks ──
1163        .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1164        // ── Web Dashboard API routes ──
1165        .route("/api/status", get(api::handle_api_status))
1166        .route("/api/config", get(api::handle_api_config_get))
1167        .route("/api/tools", get(api::handle_api_tools))
1168        .route("/api/cron", get(api::handle_api_cron_list))
1169        .route("/api/cron", post(api::handle_api_cron_add))
1170        .route(
1171            "/api/cron/settings",
1172            get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1173        )
1174        .route(
1175            "/api/cron/{id}",
1176            delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1177        )
1178        .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1179        .route("/api/integrations", get(api::handle_api_integrations))
1180        .route(
1181            "/api/integrations/settings",
1182            get(api::handle_api_integrations_settings),
1183        )
1184        .route(
1185            "/api/doctor",
1186            get(api::handle_api_doctor).post(api::handle_api_doctor),
1187        )
1188        // Old /api/memory CRUD removed — use Kumiho via /api/memory/graph instead.
1189        .route("/api/cost", get(api::handle_api_cost))
1190        .route("/api/audit", get(api::handle_api_audit))
1191        .route("/api/audit/verify", get(api::handle_api_audit_verify))
1192        .route("/api/cli-tools", get(api::handle_api_cli_tools))
1193        .route("/api/health", get(api::handle_api_health))
1194        .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1195        .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1196        // ── MCP HTTP reverse-proxy (browser stays same-origin) ──
1197        .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1198        .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1199        .route(
1200            "/api/mcp/session/{session_id}/events",
1201            get(api_mcp::handle_api_mcp_session_events),
1202        )
1203        .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1204        .route("/api/nodes", get(api::handle_api_nodes))
1205        .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1206        .route("/api/sessions", get(api::handle_api_sessions_list))
1207        .route("/api/sessions/running", get(api::handle_api_sessions_running))
1208        .route(
1209            "/api/sessions/{id}/messages",
1210            get(api::handle_api_session_messages),
1211        )
1212        .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1213        .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1214        // ── Channel detail API ──
1215        .route("/api/channels", get(api::handle_api_channels))
1216        .route("/api/channel-events", post(api::handle_api_channel_events))
1217        // ── Agent management API (proxied to Kumiho FastAPI) ──
1218        .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1219        .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1220        .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1221        // ── Skill management API (proxied to Kumiho FastAPI) ──
1222        .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1223        .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1224        .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1225        // ── Team management API (proxied to Kumiho FastAPI) ──
1226        .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1227        .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1228        .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1229        // ── Workflow management API (proxied to Kumiho FastAPI) ──
1230        .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1231        .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1232        .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1233        .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1234        .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1235        .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1236        .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1237        .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1238        .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1239        .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1240        .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1241        // ── ClawHub marketplace API ──
1242        .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1243        .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1244        .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1245        .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1246        // NOTE: Memory graph route is merged separately with its own 60s timeout
1247        // ── Generic Kumiho API proxy (for Asset Browser, Memory Auditor, etc.) ──
1248        .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1249        // ── Artifact body (serve local file bytes referenced by Kumiho) ──
1250        .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1251        // ── Pairing + Device management API ──
1252        .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1253        .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1254        .route("/api/devices", get(api_pairing::list_devices))
1255        .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1256        .route(
1257            "/api/devices/{id}/token/rotate",
1258            post(api_pairing::rotate_token),
1259        )
1260        // ── Live Canvas (A2UI) routes ──
1261        .route("/api/canvas", get(canvas::handle_canvas_list))
1262        .route(
1263            "/api/canvas/{id}",
1264            get(canvas::handle_canvas_get)
1265                .post(canvas::handle_canvas_post)
1266                .delete(canvas::handle_canvas_clear),
1267        )
1268        .route(
1269            "/api/canvas/{id}/history",
1270            get(canvas::handle_canvas_history),
1271        );
1272
1273    // ── WebAuthn hardware key authentication API (requires webauthn feature) ──
1274    #[cfg(feature = "webauthn")]
1275    let inner = inner
1276        .route(
1277            "/api/webauthn/register/start",
1278            post(api_webauthn::handle_register_start),
1279        )
1280        .route(
1281            "/api/webauthn/register/finish",
1282            post(api_webauthn::handle_register_finish),
1283        )
1284        .route(
1285            "/api/webauthn/auth/start",
1286            post(api_webauthn::handle_auth_start),
1287        )
1288        .route(
1289            "/api/webauthn/auth/finish",
1290            post(api_webauthn::handle_auth_finish),
1291        )
1292        .route(
1293            "/api/webauthn/credentials",
1294            get(api_webauthn::handle_list_credentials),
1295        )
1296        .route(
1297            "/api/webauthn/credentials/{id}",
1298            delete(api_webauthn::handle_delete_credential),
1299        );
1300
1301    // ── Plugin management API (requires plugins-wasm feature) ──
1302    #[cfg(feature = "plugins-wasm")]
1303    let inner = inner.route(
1304        "/api/plugins",
1305        get(api_plugins::plugin_routes::list_plugins),
1306    );
1307
1308    let inner = inner
1309        // ── SSE event stream ──
1310        .route("/api/events", get(sse::handle_sse_events))
1311        .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1312        // ── WebSocket agent chat ──
1313        .route("/ws/chat", get(ws::handle_ws_chat))
1314        // ── WebSocket canvas updates ──
1315        .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1316        // ── WebSocket node discovery ──
1317        .route("/ws/nodes", get(nodes::handle_ws_nodes))
1318        // ── WebSocket PTY terminal ──
1319        .route("/ws/terminal", get(terminal::handle_ws_terminal))
1320        // ── WebSocket proxy onto the in-process MCP server's session events ──
1321        .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1322        // ── Static assets (web dashboard) ──
1323        .route("/_app/{*path}", get(static_files::handle_static))
1324        // ── Config PUT with larger body limit ──
1325        .merge(config_put_router)
1326        // ── SPA fallback: non-API GET requests serve index.html ──
1327        .fallback(get(static_files::handle_spa_fallback))
1328        .with_state(state)
1329        .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1330        .layer(TimeoutLayer::with_status_code(
1331            StatusCode::REQUEST_TIMEOUT,
1332            Duration::from_secs(gateway_request_timeout_secs()),
1333        ));
1334
1335    // Merge memory graph router (has its own 60s timeout, outside the global 30s)
1336    let inner = inner.merge(memory_graph_router);
1337
1338    // Nest under path prefix when configured (axum strips prefix before routing).
1339    // nest() at "/prefix" handles both "/prefix" and "/prefix/*" but not "/prefix/"
1340    // with a trailing slash, so we add a fallback redirect for that case.
1341    let app = if let Some(prefix) = path_prefix {
1342        let redirect_target = prefix.to_string();
1343        Router::new().nest(prefix, inner).route(
1344            &format!("{prefix}/"),
1345            get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1346        )
1347    } else {
1348        inner
1349    };
1350
1351    // ── TLS / mTLS setup ───────────────────────────────────────────
1352    let tls_acceptor = match &config.gateway.tls {
1353        Some(tls_cfg) if tls_cfg.enabled => {
1354            let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1355            if has_mtls {
1356                tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1357            } else {
1358                tracing::info!("TLS enabled (no client certificate requirement)");
1359            }
1360            Some(tls::build_tls_acceptor(tls_cfg)?)
1361        }
1362        _ => None,
1363    };
1364
1365    if let Some(tls_acceptor) = tls_acceptor {
1366        // Manual TLS accept loop — serves each connection via hyper.
1367        let app = app.into_make_service_with_connect_info::<SocketAddr>();
1368        let mut app = app;
1369
1370        let mut shutdown_signal = shutdown_rx;
1371        loop {
1372            tokio::select! {
1373                conn = listener.accept() => {
1374                    let (tcp_stream, remote_addr) = conn?;
1375                    let tls_acceptor = tls_acceptor.clone();
1376                    let svc = tower::MakeService::<
1377                        SocketAddr,
1378                        hyper::Request<hyper::body::Incoming>,
1379                    >::make_service(&mut app, remote_addr)
1380                    .await
1381                    .expect("infallible make_service");
1382
1383                    tokio::spawn(async move {
1384                        let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1385                            Ok(s) => s,
1386                            Err(e) => {
1387                                tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1388                                return;
1389                            }
1390                        };
1391                        let io = hyper_util::rt::TokioIo::new(tls_stream);
1392                        let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1393                            let mut svc = svc.clone();
1394                            async move {
1395                                tower::Service::call(&mut svc, req).await
1396                            }
1397                        });
1398                        if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1399                            hyper_util::rt::TokioExecutor::new(),
1400                        )
1401                        .serve_connection(io, hyper_svc)
1402                        .await
1403                        {
1404                            tracing::debug!("connection error from {remote_addr}: {e}");
1405                        }
1406                    });
1407                }
1408                _ = shutdown_signal.changed() => {
1409                    tracing::info!("🦀 Construct Gateway shutting down...");
1410                    break;
1411                }
1412            }
1413        }
1414    } else {
1415        // Plain TCP — use axum's built-in serve.
1416        axum::serve(
1417            listener,
1418            app.into_make_service_with_connect_info::<SocketAddr>(),
1419        )
1420        .with_graceful_shutdown(async move {
1421            let _ = shutdown_rx.changed().await;
1422            tracing::info!("🦀 Construct Gateway shutting down...");
1423        })
1424        .await?;
1425    }
1426
1427    // Wait for the in-process MCP task to finish its own graceful shutdown.
1428    // It watches the same `shutdown_tx` we just flipped above.
1429    if let Some(task) = mcp_task {
1430        let _ = task.await;
1431    }
1432
1433    Ok(())
1434}
1435
1436// ══════════════════════════════════════════════════════════════════════════════
1437// AXUM HANDLERS
1438// ══════════════════════════════════════════════════════════════════════════════
1439
1440/// GET /health — always public (no secrets leaked)
1441async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1442    let body = serde_json::json!({
1443        "status": "ok",
1444        "paired": state.pairing.is_paired(),
1445        "require_pairing": state.pairing.require_pairing(),
1446        "runtime": crate::health::snapshot_json(),
1447    });
1448    Json(body)
1449}
1450
1451/// Prometheus content type for text exposition format.
1452const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1453
1454fn prometheus_disabled_hint() -> String {
1455    String::from(
1456        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n",
1457    )
1458}
1459
1460#[cfg(feature = "observability-prometheus")]
1461fn prometheus_observer_from_state(
1462    observer: &dyn crate::observability::Observer,
1463) -> Option<&crate::observability::PrometheusObserver> {
1464    observer
1465        .as_any()
1466        .downcast_ref::<crate::observability::PrometheusObserver>()
1467        .or_else(|| {
1468            observer
1469                .as_any()
1470                .downcast_ref::<sse::BroadcastObserver>()
1471                .and_then(|broadcast| {
1472                    broadcast
1473                        .inner()
1474                        .as_any()
1475                        .downcast_ref::<crate::observability::PrometheusObserver>()
1476                })
1477        })
1478}
1479
1480/// GET /metrics — Prometheus text exposition format
1481async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1482    let body = {
1483        #[cfg(feature = "observability-prometheus")]
1484        {
1485            if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1486                prom.encode()
1487            } else {
1488                prometheus_disabled_hint()
1489            }
1490        }
1491        #[cfg(not(feature = "observability-prometheus"))]
1492        {
1493            let _ = &state;
1494            prometheus_disabled_hint()
1495        }
1496    };
1497
1498    (
1499        StatusCode::OK,
1500        [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1501        body,
1502    )
1503}
1504
1505/// POST /pair — exchange one-time code for bearer token
1506#[axum::debug_handler]
1507async fn handle_pair(
1508    State(state): State<AppState>,
1509    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1510    headers: HeaderMap,
1511) -> impl IntoResponse {
1512    let rate_key =
1513        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1514    let peer_is_loopback = peer_addr.ip().is_loopback();
1515    if !state.rate_limiter.allow_pair(&rate_key) {
1516        tracing::warn!("/pair rate limit exceeded");
1517        let err = serde_json::json!({
1518            "error": "Too many pairing requests. Please retry later.",
1519            "retry_after": RATE_LIMIT_WINDOW_SECS,
1520        });
1521        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1522    }
1523
1524    // ── Auth rate limiting (brute-force protection) ──
1525    if let Err(e) = state
1526        .auth_limiter
1527        .check_rate_limit(&rate_key, peer_is_loopback)
1528    {
1529        tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1530        let err = serde_json::json!({
1531            "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1532            "retry_after": e.retry_after_secs,
1533        });
1534        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1535    }
1536
1537    let code = headers
1538        .get("X-Pairing-Code")
1539        .and_then(|v| v.to_str().ok())
1540        .unwrap_or("");
1541
1542    match state.pairing.try_pair(code, &rate_key).await {
1543        Ok(Some(token)) => {
1544            tracing::info!("🔐 New client paired successfully");
1545            if let Some(ref logger) = state.audit_logger {
1546                let _ =
1547                    logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1548            }
1549            if let Err(err) =
1550                Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1551            {
1552                tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1553                let body = serde_json::json!({
1554                    "paired": true,
1555                    "persisted": false,
1556                    "token": token,
1557                    "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1558                });
1559                return (StatusCode::OK, Json(body));
1560            }
1561
1562            let body = serde_json::json!({
1563                "paired": true,
1564                "persisted": true,
1565                "token": token,
1566                "message": "Save this token — use it as Authorization: Bearer <token>"
1567            });
1568            (StatusCode::OK, Json(body))
1569        }
1570        Ok(None) => {
1571            state
1572                .auth_limiter
1573                .record_attempt(&rate_key, peer_is_loopback);
1574            tracing::warn!("🔐 Pairing attempt with invalid code");
1575            if let Some(ref logger) = state.audit_logger {
1576                let _ = logger
1577                    .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1578            }
1579            let err = serde_json::json!({"error": "Invalid pairing code"});
1580            (StatusCode::FORBIDDEN, Json(err))
1581        }
1582        Err(lockout_secs) => {
1583            tracing::warn!(
1584                "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1585            );
1586            if let Some(ref logger) = state.audit_logger {
1587                let _ = logger.log_auth_failure(
1588                    "gateway",
1589                    &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1590                );
1591            }
1592            let err = serde_json::json!({
1593                "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1594                "retry_after": lockout_secs
1595            });
1596            (StatusCode::TOO_MANY_REQUESTS, Json(err))
1597        }
1598    }
1599}
1600
1601async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1602    let paired_tokens = pairing.tokens();
1603    // This is needed because parking_lot's guard is not Send so we clone the inner
1604    // this should be removed once async mutexes are used everywhere
1605    let mut updated_cfg = { config.lock().clone() };
1606    updated_cfg.gateway.paired_tokens = paired_tokens;
1607    updated_cfg
1608        .save()
1609        .await
1610        .context("Failed to persist paired tokens to config.toml")?;
1611
1612    // Keep shared runtime config in sync with persisted tokens.
1613    *config.lock() = updated_cfg;
1614    Ok(())
1615}
1616
1617/// Simple chat for webhook endpoint (no tools, for backward compatibility and testing).
1618async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1619    let user_messages = vec![ChatMessage::user(message)];
1620
1621    // Keep webhook/gateway prompts aligned with channel behavior by injecting
1622    // workspace-aware system context before model invocation.
1623    let system_prompt = {
1624        let config_guard = state.config.lock();
1625        crate::channels::build_system_prompt(
1626            &config_guard.workspace_dir,
1627            &state.model,
1628            &[], // tools - empty for simple chat
1629            &[], // skills
1630            Some(&config_guard.identity),
1631            None, // bootstrap_max_chars - use default
1632        )
1633    };
1634
1635    let mut messages = Vec::with_capacity(1 + user_messages.len());
1636    messages.push(ChatMessage::system(system_prompt));
1637    messages.extend(user_messages);
1638
1639    let multimodal_config = state.config.lock().multimodal.clone();
1640    let prepared =
1641        crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1642
1643    state
1644        .provider
1645        .chat_with_history(&prepared.messages, &state.model, state.temperature)
1646        .await
1647}
1648
1649/// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk).
1650async fn run_gateway_chat_with_tools(
1651    state: &AppState,
1652    message: &str,
1653    session_id: Option<&str>,
1654) -> anyhow::Result<String> {
1655    let config = state.config.lock().clone();
1656    Box::pin(crate::agent::process_message(config, message, session_id)).await
1657}
1658
1659/// Webhook request body
1660#[derive(serde::Deserialize)]
1661pub struct WebhookBody {
1662    pub message: String,
1663}
1664
1665/// POST /webhook — main webhook endpoint
1666async fn handle_webhook(
1667    State(state): State<AppState>,
1668    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1669    headers: HeaderMap,
1670    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
1671) -> impl IntoResponse {
1672    let rate_key =
1673        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1674    let peer_is_loopback = peer_addr.ip().is_loopback();
1675    if !state.rate_limiter.allow_webhook(&rate_key) {
1676        tracing::warn!("/webhook rate limit exceeded");
1677        let err = serde_json::json!({
1678            "error": "Too many webhook requests. Please retry later.",
1679            "retry_after": RATE_LIMIT_WINDOW_SECS,
1680        });
1681        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1682    }
1683
1684    // ── Bearer token auth (pairing) with auth rate limiting ──
1685    if state.pairing.require_pairing() {
1686        if let Err(e) = state
1687            .auth_limiter
1688            .check_rate_limit(&rate_key, peer_is_loopback)
1689        {
1690            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
1691            let err = serde_json::json!({
1692                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1693                "retry_after": e.retry_after_secs,
1694            });
1695            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1696        }
1697        let auth = headers
1698            .get(header::AUTHORIZATION)
1699            .and_then(|v| v.to_str().ok())
1700            .unwrap_or("");
1701        let token = auth.strip_prefix("Bearer ").unwrap_or("");
1702        if !state.pairing.is_authenticated(token) {
1703            state
1704                .auth_limiter
1705                .record_attempt(&rate_key, peer_is_loopback);
1706            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
1707            let err = serde_json::json!({
1708                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
1709            });
1710            return (StatusCode::UNAUTHORIZED, Json(err));
1711        }
1712    }
1713
1714    // ── Webhook secret auth (optional, additional layer) ──
1715    if let Some(ref secret_hash) = state.webhook_secret_hash {
1716        let header_hash = headers
1717            .get("X-Webhook-Secret")
1718            .and_then(|v| v.to_str().ok())
1719            .map(str::trim)
1720            .filter(|value| !value.is_empty())
1721            .map(hash_webhook_secret);
1722        match header_hash {
1723            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
1724            _ => {
1725                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
1726                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
1727                return (StatusCode::UNAUTHORIZED, Json(err));
1728            }
1729        }
1730    }
1731
1732    // ── Parse body ──
1733    let Json(webhook_body) = match body {
1734        Ok(b) => b,
1735        Err(e) => {
1736            tracing::warn!("Webhook JSON parse error: {e}");
1737            let err = serde_json::json!({
1738                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
1739            });
1740            return (StatusCode::BAD_REQUEST, Json(err));
1741        }
1742    };
1743
1744    // ── Idempotency (optional) ──
1745    if let Some(idempotency_key) = headers
1746        .get("X-Idempotency-Key")
1747        .and_then(|v| v.to_str().ok())
1748        .map(str::trim)
1749        .filter(|value| !value.is_empty())
1750    {
1751        if !state.idempotency_store.record_if_new(idempotency_key) {
1752            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
1753            let body = serde_json::json!({
1754                "status": "duplicate",
1755                "idempotent": true,
1756                "message": "Request already processed for this idempotency key"
1757            });
1758            return (StatusCode::OK, Json(body));
1759        }
1760    }
1761
1762    let message = &webhook_body.message;
1763    let session_id = webhook_session_id(&headers);
1764
1765    if state.auto_save && !memory::should_skip_autosave_content(message) {
1766        let key = webhook_memory_key();
1767        let _ = state
1768            .mem
1769            .store(
1770                &key,
1771                message,
1772                MemoryCategory::Conversation,
1773                session_id.as_deref(),
1774            )
1775            .await;
1776    }
1777
1778    let provider_label = state
1779        .config
1780        .lock()
1781        .default_provider
1782        .clone()
1783        .unwrap_or_else(|| "unknown".to_string());
1784    let model_label = state.model.clone();
1785    let started_at = Instant::now();
1786
1787    state
1788        .observer
1789        .record_event(&crate::observability::ObserverEvent::AgentStart {
1790            provider: provider_label.clone(),
1791            model: model_label.clone(),
1792        });
1793    state
1794        .observer
1795        .record_event(&crate::observability::ObserverEvent::LlmRequest {
1796            provider: provider_label.clone(),
1797            model: model_label.clone(),
1798            messages_count: 1,
1799        });
1800
1801    match run_gateway_chat_simple(&state, message).await {
1802        Ok(response) => {
1803            let duration = started_at.elapsed();
1804            state
1805                .observer
1806                .record_event(&crate::observability::ObserverEvent::LlmResponse {
1807                    provider: provider_label.clone(),
1808                    model: model_label.clone(),
1809                    duration,
1810                    success: true,
1811                    error_message: None,
1812                    input_tokens: None,
1813                    output_tokens: None,
1814                });
1815            state.observer.record_metric(
1816                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
1817            );
1818            state
1819                .observer
1820                .record_event(&crate::observability::ObserverEvent::AgentEnd {
1821                    provider: provider_label,
1822                    model: model_label,
1823                    duration,
1824                    tokens_used: None,
1825                    cost_usd: None,
1826                });
1827
1828            let body = serde_json::json!({"response": response, "model": state.model});
1829            (StatusCode::OK, Json(body))
1830        }
1831        Err(e) => {
1832            let duration = started_at.elapsed();
1833            let sanitized = providers::sanitize_api_error(&e.to_string());
1834
1835            state
1836                .observer
1837                .record_event(&crate::observability::ObserverEvent::LlmResponse {
1838                    provider: provider_label.clone(),
1839                    model: model_label.clone(),
1840                    duration,
1841                    success: false,
1842                    error_message: Some(sanitized.clone()),
1843                    input_tokens: None,
1844                    output_tokens: None,
1845                });
1846            state.observer.record_metric(
1847                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
1848            );
1849            state
1850                .observer
1851                .record_event(&crate::observability::ObserverEvent::Error {
1852                    component: "gateway".to_string(),
1853                    message: sanitized.clone(),
1854                });
1855            state
1856                .observer
1857                .record_event(&crate::observability::ObserverEvent::AgentEnd {
1858                    provider: provider_label,
1859                    model: model_label,
1860                    duration,
1861                    tokens_used: None,
1862                    cost_usd: None,
1863                });
1864
1865            tracing::error!("Webhook provider error: {}", sanitized);
1866            let err = serde_json::json!({"error": "LLM request failed"});
1867            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
1868        }
1869    }
1870}
1871
1872/// `WhatsApp` verification query params
1873#[derive(serde::Deserialize)]
1874pub struct WhatsAppVerifyQuery {
1875    #[serde(rename = "hub.mode")]
1876    pub mode: Option<String>,
1877    #[serde(rename = "hub.verify_token")]
1878    pub verify_token: Option<String>,
1879    #[serde(rename = "hub.challenge")]
1880    pub challenge: Option<String>,
1881}
1882
1883/// GET /whatsapp — Meta webhook verification
1884async fn handle_whatsapp_verify(
1885    State(state): State<AppState>,
1886    Query(params): Query<WhatsAppVerifyQuery>,
1887) -> impl IntoResponse {
1888    let Some(ref wa) = state.whatsapp else {
1889        return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
1890    };
1891
1892    // Verify the token matches (constant-time comparison to prevent timing attacks)
1893    let token_matches = params
1894        .verify_token
1895        .as_deref()
1896        .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
1897    if params.mode.as_deref() == Some("subscribe") && token_matches {
1898        if let Some(ch) = params.challenge {
1899            tracing::info!("WhatsApp webhook verified successfully");
1900            return (StatusCode::OK, ch);
1901        }
1902        return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
1903    }
1904
1905    tracing::warn!("WhatsApp webhook verification failed — token mismatch");
1906    (StatusCode::FORBIDDEN, "Forbidden".to_string())
1907}
1908
1909/// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`).
1910/// Returns true if the signature is valid, false otherwise.
1911/// See: <https://developers.facebook.com/docs/graph-api/webhooks/getting-started#verification-requests>
1912pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
1913    use hmac::{Hmac, Mac};
1914    use sha2::Sha256;
1915
1916    // Signature format: "sha256=<hex_signature>"
1917    let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
1918        return false;
1919    };
1920
1921    // Decode hex signature
1922    let Ok(expected) = hex::decode(hex_sig) else {
1923        return false;
1924    };
1925
1926    // Compute HMAC-SHA256
1927    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
1928        return false;
1929    };
1930    mac.update(body);
1931
1932    // Constant-time comparison
1933    mac.verify_slice(&expected).is_ok()
1934}
1935
1936/// POST /whatsapp — incoming message webhook
1937async fn handle_whatsapp_message(
1938    State(state): State<AppState>,
1939    headers: HeaderMap,
1940    body: Bytes,
1941) -> impl IntoResponse {
1942    let Some(ref wa) = state.whatsapp else {
1943        return (
1944            StatusCode::NOT_FOUND,
1945            Json(serde_json::json!({"error": "WhatsApp not configured"})),
1946        );
1947    };
1948
1949    // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ──
1950    if let Some(ref app_secret) = state.whatsapp_app_secret {
1951        let signature = headers
1952            .get("X-Hub-Signature-256")
1953            .and_then(|v| v.to_str().ok())
1954            .unwrap_or("");
1955
1956        if !verify_whatsapp_signature(app_secret, &body, signature) {
1957            tracing::warn!(
1958                "WhatsApp webhook signature verification failed (signature: {})",
1959                if signature.is_empty() {
1960                    "missing"
1961                } else {
1962                    "invalid"
1963                }
1964            );
1965            return (
1966                StatusCode::UNAUTHORIZED,
1967                Json(serde_json::json!({"error": "Invalid signature"})),
1968            );
1969        }
1970    }
1971
1972    // Parse JSON body
1973    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
1974        return (
1975            StatusCode::BAD_REQUEST,
1976            Json(serde_json::json!({"error": "Invalid JSON payload"})),
1977        );
1978    };
1979
1980    // Parse messages from the webhook payload
1981    let messages = wa.parse_webhook_payload(&payload);
1982
1983    if messages.is_empty() {
1984        // Acknowledge the webhook even if no messages (could be status updates)
1985        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
1986    }
1987
1988    // Process each message
1989    for msg in &messages {
1990        tracing::info!(
1991            "WhatsApp message from {}: {}",
1992            msg.sender,
1993            truncate_with_ellipsis(&msg.content, 50)
1994        );
1995        let session_id = sender_session_id("whatsapp", msg);
1996
1997        // Auto-save to memory
1998        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
1999            let key = whatsapp_memory_key(msg);
2000            let _ = state
2001                .mem
2002                .store(
2003                    &key,
2004                    &msg.content,
2005                    MemoryCategory::Conversation,
2006                    Some(&session_id),
2007                )
2008                .await;
2009        }
2010
2011        match Box::pin(run_gateway_chat_with_tools(
2012            &state,
2013            &msg.content,
2014            Some(&session_id),
2015        ))
2016        .await
2017        {
2018            Ok(response) => {
2019                // Send reply via WhatsApp
2020                if let Err(e) = wa
2021                    .send(&SendMessage::new(response, &msg.reply_target))
2022                    .await
2023                {
2024                    tracing::error!("Failed to send WhatsApp reply: {e}");
2025                }
2026            }
2027            Err(e) => {
2028                tracing::error!("LLM error for WhatsApp message: {e:#}");
2029                let _ = wa
2030                    .send(&SendMessage::new(
2031                        "Sorry, I couldn't process your message right now.",
2032                        &msg.reply_target,
2033                    ))
2034                    .await;
2035            }
2036        }
2037    }
2038
2039    // Acknowledge the webhook
2040    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2041}
2042
2043/// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq)
2044async fn handle_linq_webhook(
2045    State(state): State<AppState>,
2046    headers: HeaderMap,
2047    body: Bytes,
2048) -> impl IntoResponse {
2049    let Some(ref linq) = state.linq else {
2050        return (
2051            StatusCode::NOT_FOUND,
2052            Json(serde_json::json!({"error": "Linq not configured"})),
2053        );
2054    };
2055
2056    let body_str = String::from_utf8_lossy(&body);
2057
2058    // ── Security: Verify X-Webhook-Signature if signing_secret is configured ──
2059    if let Some(ref signing_secret) = state.linq_signing_secret {
2060        let timestamp = headers
2061            .get("X-Webhook-Timestamp")
2062            .and_then(|v| v.to_str().ok())
2063            .unwrap_or("");
2064
2065        let signature = headers
2066            .get("X-Webhook-Signature")
2067            .and_then(|v| v.to_str().ok())
2068            .unwrap_or("");
2069
2070        if !crate::channels::linq::verify_linq_signature(
2071            signing_secret,
2072            &body_str,
2073            timestamp,
2074            signature,
2075        ) {
2076            tracing::warn!(
2077                "Linq webhook signature verification failed (signature: {})",
2078                if signature.is_empty() {
2079                    "missing"
2080                } else {
2081                    "invalid"
2082                }
2083            );
2084            return (
2085                StatusCode::UNAUTHORIZED,
2086                Json(serde_json::json!({"error": "Invalid signature"})),
2087            );
2088        }
2089    }
2090
2091    // Parse JSON body
2092    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2093        return (
2094            StatusCode::BAD_REQUEST,
2095            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2096        );
2097    };
2098
2099    // Parse messages from the webhook payload
2100    let messages = linq.parse_webhook_payload(&payload);
2101
2102    if messages.is_empty() {
2103        // Acknowledge the webhook even if no messages (could be status/delivery events)
2104        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2105    }
2106
2107    // Process each message
2108    for msg in &messages {
2109        tracing::info!(
2110            "Linq message from {}: {}",
2111            msg.sender,
2112            truncate_with_ellipsis(&msg.content, 50)
2113        );
2114        let session_id = sender_session_id("linq", msg);
2115
2116        // Auto-save to memory
2117        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2118            let key = linq_memory_key(msg);
2119            let _ = state
2120                .mem
2121                .store(
2122                    &key,
2123                    &msg.content,
2124                    MemoryCategory::Conversation,
2125                    Some(&session_id),
2126                )
2127                .await;
2128        }
2129
2130        // Call the LLM
2131        match Box::pin(run_gateway_chat_with_tools(
2132            &state,
2133            &msg.content,
2134            Some(&session_id),
2135        ))
2136        .await
2137        {
2138            Ok(response) => {
2139                // Send reply via Linq
2140                if let Err(e) = linq
2141                    .send(&SendMessage::new(response, &msg.reply_target))
2142                    .await
2143                {
2144                    tracing::error!("Failed to send Linq reply: {e}");
2145                }
2146            }
2147            Err(e) => {
2148                tracing::error!("LLM error for Linq message: {e:#}");
2149                let _ = linq
2150                    .send(&SendMessage::new(
2151                        "Sorry, I couldn't process your message right now.",
2152                        &msg.reply_target,
2153                    ))
2154                    .await;
2155            }
2156        }
2157    }
2158
2159    // Acknowledge the webhook
2160    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2161}
2162
2163/// GET /wati — WATI webhook verification (echoes hub.challenge)
2164async fn handle_wati_verify(
2165    State(state): State<AppState>,
2166    Query(params): Query<WatiVerifyQuery>,
2167) -> impl IntoResponse {
2168    if state.wati.is_none() {
2169        return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2170    }
2171
2172    // WATI may use Meta-style webhook verification; echo the challenge
2173    if let Some(challenge) = params.challenge {
2174        tracing::info!("WATI webhook verified successfully");
2175        return (StatusCode::OK, challenge);
2176    }
2177
2178    (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2179}
2180
2181#[derive(Debug, serde::Deserialize)]
2182pub struct WatiVerifyQuery {
2183    #[serde(rename = "hub.challenge")]
2184    pub challenge: Option<String>,
2185}
2186
2187/// POST /wati — incoming WATI WhatsApp message webhook
2188async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2189    let Some(ref wati) = state.wati else {
2190        return (
2191            StatusCode::NOT_FOUND,
2192            Json(serde_json::json!({"error": "WATI not configured"})),
2193        );
2194    };
2195
2196    // Parse JSON body
2197    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2198        return (
2199            StatusCode::BAD_REQUEST,
2200            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2201        );
2202    };
2203
2204    // Detect audio before the synchronous parse
2205    let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2206
2207    let messages = if matches!(msg_type, "audio" | "voice") {
2208        // Build a synthetic ChannelMessage from the audio transcript
2209        if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2210            wati.parse_audio_as_message(&payload, transcript)
2211        } else {
2212            vec![]
2213        }
2214    } else {
2215        wati.parse_webhook_payload(&payload)
2216    };
2217
2218    if messages.is_empty() {
2219        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2220    }
2221
2222    // Process each message
2223    for msg in &messages {
2224        tracing::info!(
2225            "WATI message from {}: {}",
2226            msg.sender,
2227            truncate_with_ellipsis(&msg.content, 50)
2228        );
2229        let session_id = sender_session_id("wati", msg);
2230
2231        // Auto-save to memory
2232        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2233            let key = wati_memory_key(msg);
2234            let _ = state
2235                .mem
2236                .store(
2237                    &key,
2238                    &msg.content,
2239                    MemoryCategory::Conversation,
2240                    Some(&session_id),
2241                )
2242                .await;
2243        }
2244
2245        // Call the LLM
2246        match Box::pin(run_gateway_chat_with_tools(
2247            &state,
2248            &msg.content,
2249            Some(&session_id),
2250        ))
2251        .await
2252        {
2253            Ok(response) => {
2254                // Send reply via WATI
2255                if let Err(e) = wati
2256                    .send(&SendMessage::new(response, &msg.reply_target))
2257                    .await
2258                {
2259                    tracing::error!("Failed to send WATI reply: {e}");
2260                }
2261            }
2262            Err(e) => {
2263                tracing::error!("LLM error for WATI message: {e:#}");
2264                let _ = wati
2265                    .send(&SendMessage::new(
2266                        "Sorry, I couldn't process your message right now.",
2267                        &msg.reply_target,
2268                    ))
2269                    .await;
2270            }
2271        }
2272    }
2273
2274    // Acknowledge the webhook
2275    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2276}
2277
2278/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
2279async fn handle_nextcloud_talk_webhook(
2280    State(state): State<AppState>,
2281    headers: HeaderMap,
2282    body: Bytes,
2283) -> impl IntoResponse {
2284    let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2285        return (
2286            StatusCode::NOT_FOUND,
2287            Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2288        );
2289    };
2290
2291    let body_str = String::from_utf8_lossy(&body);
2292
2293    // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ──
2294    if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2295        let random = headers
2296            .get("X-Nextcloud-Talk-Random")
2297            .and_then(|v| v.to_str().ok())
2298            .unwrap_or("");
2299
2300        let signature = headers
2301            .get("X-Nextcloud-Talk-Signature")
2302            .and_then(|v| v.to_str().ok())
2303            .unwrap_or("");
2304
2305        if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2306            webhook_secret,
2307            random,
2308            &body_str,
2309            signature,
2310        ) {
2311            tracing::warn!(
2312                "Nextcloud Talk webhook signature verification failed (signature: {})",
2313                if signature.is_empty() {
2314                    "missing"
2315                } else {
2316                    "invalid"
2317                }
2318            );
2319            return (
2320                StatusCode::UNAUTHORIZED,
2321                Json(serde_json::json!({"error": "Invalid signature"})),
2322            );
2323        }
2324    }
2325
2326    // Parse JSON body
2327    let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2328        return (
2329            StatusCode::BAD_REQUEST,
2330            Json(serde_json::json!({"error": "Invalid JSON payload"})),
2331        );
2332    };
2333
2334    // Parse messages from webhook payload
2335    let messages = nextcloud_talk.parse_webhook_payload(&payload);
2336    if messages.is_empty() {
2337        // Acknowledge webhook even if payload does not contain actionable user messages.
2338        return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2339    }
2340
2341    for msg in &messages {
2342        tracing::info!(
2343            "Nextcloud Talk message from {}: {}",
2344            msg.sender,
2345            truncate_with_ellipsis(&msg.content, 50)
2346        );
2347        let session_id = sender_session_id("nextcloud_talk", msg);
2348
2349        if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2350            let key = nextcloud_talk_memory_key(msg);
2351            let _ = state
2352                .mem
2353                .store(
2354                    &key,
2355                    &msg.content,
2356                    MemoryCategory::Conversation,
2357                    Some(&session_id),
2358                )
2359                .await;
2360        }
2361
2362        match Box::pin(run_gateway_chat_with_tools(
2363            &state,
2364            &msg.content,
2365            Some(&session_id),
2366        ))
2367        .await
2368        {
2369            Ok(response) => {
2370                if let Err(e) = nextcloud_talk
2371                    .send(&SendMessage::new(response, &msg.reply_target))
2372                    .await
2373                {
2374                    tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2375                }
2376            }
2377            Err(e) => {
2378                tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2379                let _ = nextcloud_talk
2380                    .send(&SendMessage::new(
2381                        "Sorry, I couldn't process your message right now.",
2382                        &msg.reply_target,
2383                    ))
2384                    .await;
2385            }
2386        }
2387    }
2388
2389    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2390}
2391
2392/// Maximum request body size for the Gmail webhook endpoint (1 MB).
2393/// Google Pub/Sub messages are typically under 10 KB.
2394const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2395
2396/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
2397async fn handle_gmail_push_webhook(
2398    State(state): State<AppState>,
2399    headers: HeaderMap,
2400    body: Bytes,
2401) -> impl IntoResponse {
2402    let Some(ref gmail_push) = state.gmail_push else {
2403        return (
2404            StatusCode::NOT_FOUND,
2405            Json(serde_json::json!({"error": "Gmail push not configured"})),
2406        );
2407    };
2408
2409    // Enforce body size limit.
2410    if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2411        return (
2412            StatusCode::PAYLOAD_TOO_LARGE,
2413            Json(serde_json::json!({"error": "Request body too large"})),
2414        );
2415    }
2416
2417    // Authenticate the webhook request using a shared secret.
2418    let secret = gmail_push.resolve_webhook_secret();
2419    if !secret.is_empty() {
2420        let provided = headers
2421            .get(axum::http::header::AUTHORIZATION)
2422            .and_then(|v| v.to_str().ok())
2423            .and_then(|auth| auth.strip_prefix("Bearer "))
2424            .unwrap_or("");
2425
2426        if provided != secret {
2427            tracing::warn!("Gmail push webhook: unauthorized request");
2428            return (
2429                StatusCode::UNAUTHORIZED,
2430                Json(serde_json::json!({"error": "Unauthorized"})),
2431            );
2432        }
2433    }
2434
2435    let body_str = String::from_utf8_lossy(&body);
2436    let envelope: crate::channels::gmail_push::PubSubEnvelope =
2437        match serde_json::from_str(&body_str) {
2438            Ok(e) => e,
2439            Err(e) => {
2440                tracing::warn!("Gmail push webhook: invalid payload: {e}");
2441                return (
2442                    StatusCode::BAD_REQUEST,
2443                    Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2444                );
2445            }
2446        };
2447
2448    // Process the notification asynchronously (non-blocking for the webhook response)
2449    let channel = Arc::clone(gmail_push);
2450    tokio::spawn(async move {
2451        if let Err(e) = channel.handle_notification(&envelope).await {
2452            tracing::error!("Gmail push notification processing failed: {e:#}");
2453        }
2454    });
2455
2456    // Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
2457    (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2458}
2459
2460// ══════════════════════════════════════════════════════════════════════════════
2461// ADMIN HANDLERS (for CLI management)
2462// ══════════════════════════════════════════════════════════════════════════════
2463
2464/// Response for admin endpoints
2465#[derive(serde::Serialize)]
2466struct AdminResponse {
2467    success: bool,
2468    message: String,
2469}
2470
2471/// Reject requests that do not originate from a loopback address.
2472fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2473    if peer.ip().is_loopback() {
2474        Ok(())
2475    } else {
2476        Err((
2477            StatusCode::FORBIDDEN,
2478            Json(serde_json::json!({
2479                "error": "Admin endpoints are restricted to localhost"
2480            })),
2481        ))
2482    }
2483}
2484
2485/// POST /admin/shutdown — graceful shutdown from CLI (localhost only)
2486async fn handle_admin_shutdown(
2487    State(state): State<AppState>,
2488    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2489) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2490    require_localhost(&peer)?;
2491    tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2492
2493    let body = AdminResponse {
2494        success: true,
2495        message: "Gateway shutdown initiated".to_string(),
2496    };
2497
2498    let _ = state.shutdown_tx.send(true);
2499
2500    Ok((StatusCode::OK, Json(body)))
2501}
2502
2503/// GET /admin/paircode — fetch current pairing code (localhost only)
2504async fn handle_admin_paircode(
2505    State(state): State<AppState>,
2506    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2507) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2508    require_localhost(&peer)?;
2509    let code = state.pairing.pairing_code();
2510
2511    let body = if let Some(c) = code {
2512        serde_json::json!({
2513            "success": true,
2514            "pairing_required": state.pairing.require_pairing(),
2515            "pairing_code": c,
2516            "message": "Use this one-time code to pair"
2517        })
2518    } else {
2519        serde_json::json!({
2520            "success": true,
2521            "pairing_required": state.pairing.require_pairing(),
2522            "pairing_code": null,
2523            "message": if state.pairing.require_pairing() {
2524                "Pairing is active but no new code available (already paired or code expired)"
2525            } else {
2526                "Pairing is disabled for this gateway"
2527            }
2528        })
2529    };
2530
2531    Ok((StatusCode::OK, Json(body)))
2532}
2533
2534/// POST /admin/paircode/new — generate a new pairing code (localhost only)
2535async fn handle_admin_paircode_new(
2536    State(state): State<AppState>,
2537    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2538) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2539    require_localhost(&peer)?;
2540    match state.pairing.generate_new_pairing_code() {
2541        Some(code) => {
2542            tracing::info!("🔐 New pairing code generated via admin endpoint");
2543            let body = serde_json::json!({
2544                "success": true,
2545                "pairing_required": state.pairing.require_pairing(),
2546                "pairing_code": code,
2547                "message": "New pairing code generated — use this one-time code to pair"
2548            });
2549            Ok((StatusCode::OK, Json(body)))
2550        }
2551        None => {
2552            let body = serde_json::json!({
2553                "success": false,
2554                "pairing_required": false,
2555                "pairing_code": null,
2556                "message": "Pairing is disabled for this gateway"
2557            });
2558            Ok((StatusCode::BAD_REQUEST, Json(body)))
2559        }
2560    }
2561}
2562
2563/// GET /pair/code — fetch the initial pairing code.
2564///
2565/// Requires a loopback peer. A publicly-reachable endpoint would let any caller
2566/// (e.g. an attacker scanning exposed ngrok/Cloudflare tunnels during first-run)
2567/// fetch the code before the legitimate operator. Host-side dashboards should
2568/// reach the gateway over loopback; containerized setups can call this via
2569/// `docker exec` or fetch the code from `construct onboard` output.
2570async fn handle_pair_code(
2571    State(state): State<AppState>,
2572    ConnectInfo(peer): ConnectInfo<SocketAddr>,
2573) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2574    require_localhost(&peer)?;
2575
2576    let require = state.pairing.require_pairing();
2577    let is_paired = state.pairing.is_paired();
2578
2579    let code = if require && !is_paired {
2580        state.pairing.pairing_code()
2581    } else {
2582        None
2583    };
2584
2585    let body = serde_json::json!({
2586        "success": true,
2587        "pairing_required": require,
2588        "pairing_code": code,
2589    });
2590
2591    Ok((StatusCode::OK, Json(body)))
2592}
2593
2594#[cfg(test)]
2595mod tests {
2596    use super::*;
2597    use crate::channels::traits::ChannelMessage;
2598    use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2599    use crate::providers::Provider;
2600    use async_trait::async_trait;
2601    use axum::http::HeaderValue;
2602    use axum::response::IntoResponse;
2603    use http_body_util::BodyExt;
2604    use parking_lot::Mutex;
2605    use std::sync::atomic::{AtomicUsize, Ordering};
2606
2607    /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values.
2608    fn generate_test_secret() -> String {
2609        let bytes: [u8; 32] = rand::random();
2610        hex::encode(bytes)
2611    }
2612
2613    #[test]
2614    fn security_body_limit_is_64kb() {
2615        assert_eq!(MAX_BODY_SIZE, 65_536);
2616    }
2617
2618    #[test]
2619    fn security_timeout_default_is_30_seconds() {
2620        assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2621    }
2622
2623    #[test]
2624    fn gateway_timeout_falls_back_to_default() {
2625        // When env var is not set, should return the default constant
2626        // SAFETY: test-only, single-threaded test runner.
2627        unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2628        assert_eq!(gateway_request_timeout_secs(), 30);
2629    }
2630
2631    #[test]
2632    fn webhook_body_requires_message_field() {
2633        let valid = r#"{"message": "hello"}"#;
2634        let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2635        assert!(parsed.is_ok());
2636        assert_eq!(parsed.unwrap().message, "hello");
2637
2638        let missing = r#"{"other": "field"}"#;
2639        let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2640        assert!(parsed.is_err());
2641    }
2642
2643    #[test]
2644    fn whatsapp_query_fields_are_optional() {
2645        let q = WhatsAppVerifyQuery {
2646            mode: None,
2647            verify_token: None,
2648            challenge: None,
2649        };
2650        assert!(q.mode.is_none());
2651    }
2652
2653    #[test]
2654    fn app_state_is_clone() {
2655        fn assert_clone<T: Clone>() {}
2656        assert_clone::<AppState>();
2657    }
2658
2659    #[tokio::test]
2660    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2661        let state = AppState {
2662            config: Arc::new(Mutex::new(Config::default())),
2663            provider: Arc::new(MockProvider::default()),
2664            model: "test-model".into(),
2665            temperature: 0.0,
2666            mem: Arc::new(MockMemory),
2667            auto_save: false,
2668            webhook_secret_hash: None,
2669            pairing: Arc::new(PairingGuard::new(false, &[])),
2670            trust_forwarded_headers: false,
2671            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2672            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2673            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2674            whatsapp: None,
2675            whatsapp_app_secret: None,
2676            linq: None,
2677            linq_signing_secret: None,
2678            nextcloud_talk: None,
2679            nextcloud_talk_webhook_secret: None,
2680            wati: None,
2681            gmail_push: None,
2682            observer: Arc::new(crate::observability::NoopObserver),
2683            tools_registry: Arc::new(Vec::new()),
2684            cost_tracker: None,
2685            audit_logger: None,
2686            event_tx: tokio::sync::broadcast::channel(16).0,
2687            shutdown_tx: tokio::sync::watch::channel(false).0,
2688            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2689            path_prefix: String::new(),
2690            session_backend: None,
2691            session_queue: std::sync::Arc::new(
2692                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2693            ),
2694            device_registry: None,
2695            pending_pairings: None,
2696            canvas_store: CanvasStore::new(),
2697            mcp_registry: None,
2698            approval_registry: approval_registry::global(),
2699            mcp_local_url: None,
2700            #[cfg(feature = "webauthn")]
2701            webauthn: None,
2702        };
2703
2704        let response = handle_metrics(State(state)).await.into_response();
2705        assert_eq!(response.status(), StatusCode::OK);
2706        assert_eq!(
2707            response
2708                .headers()
2709                .get(header::CONTENT_TYPE)
2710                .and_then(|value| value.to_str().ok()),
2711            Some(PROMETHEUS_CONTENT_TYPE)
2712        );
2713
2714        let body = response.into_body().collect().await.unwrap().to_bytes();
2715        let text = String::from_utf8(body.to_vec()).unwrap();
2716        assert!(text.contains("Prometheus backend not enabled"));
2717    }
2718
2719    #[cfg(feature = "observability-prometheus")]
2720    #[tokio::test]
2721    async fn metrics_endpoint_renders_prometheus_output() {
2722        let event_tx = tokio::sync::broadcast::channel(16).0;
2723        let wrapped = sse::BroadcastObserver::new(
2724            Box::new(crate::observability::PrometheusObserver::new()),
2725            event_tx.clone(),
2726        );
2727        crate::observability::Observer::record_event(
2728            &wrapped,
2729            &crate::observability::ObserverEvent::HeartbeatTick,
2730        );
2731
2732        let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
2733        let state = AppState {
2734            config: Arc::new(Mutex::new(Config::default())),
2735            provider: Arc::new(MockProvider::default()),
2736            model: "test-model".into(),
2737            temperature: 0.0,
2738            mem: Arc::new(MockMemory),
2739            auto_save: false,
2740            webhook_secret_hash: None,
2741            pairing: Arc::new(PairingGuard::new(false, &[])),
2742            trust_forwarded_headers: false,
2743            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2744            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2745            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2746            whatsapp: None,
2747            whatsapp_app_secret: None,
2748            linq: None,
2749            linq_signing_secret: None,
2750            nextcloud_talk: None,
2751            nextcloud_talk_webhook_secret: None,
2752            wati: None,
2753            gmail_push: None,
2754            observer,
2755            tools_registry: Arc::new(Vec::new()),
2756            cost_tracker: None,
2757            audit_logger: None,
2758            event_tx,
2759            shutdown_tx: tokio::sync::watch::channel(false).0,
2760            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2761            path_prefix: String::new(),
2762            session_backend: None,
2763            session_queue: std::sync::Arc::new(
2764                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2765            ),
2766            device_registry: None,
2767            pending_pairings: None,
2768            canvas_store: CanvasStore::new(),
2769            mcp_registry: None,
2770            approval_registry: approval_registry::global(),
2771            mcp_local_url: None,
2772            #[cfg(feature = "webauthn")]
2773            webauthn: None,
2774        };
2775
2776        let response = handle_metrics(State(state)).await.into_response();
2777        assert_eq!(response.status(), StatusCode::OK);
2778
2779        let body = response.into_body().collect().await.unwrap().to_bytes();
2780        let text = String::from_utf8(body.to_vec()).unwrap();
2781        assert!(text.contains("construct_heartbeat_ticks_total 1"));
2782    }
2783
2784    #[test]
2785    fn gateway_rate_limiter_blocks_after_limit() {
2786        let limiter = GatewayRateLimiter::new(2, 2, 100);
2787        assert!(limiter.allow_pair("127.0.0.1"));
2788        assert!(limiter.allow_pair("127.0.0.1"));
2789        assert!(!limiter.allow_pair("127.0.0.1"));
2790    }
2791
2792    #[test]
2793    fn rate_limiter_sweep_removes_stale_entries() {
2794        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
2795        // Add entries for multiple IPs
2796        assert!(limiter.allow("ip-1"));
2797        assert!(limiter.allow("ip-2"));
2798        assert!(limiter.allow("ip-3"));
2799
2800        {
2801            let guard = limiter.requests.lock();
2802            assert_eq!(guard.0.len(), 3);
2803        }
2804
2805        // Force a sweep by backdating last_sweep
2806        {
2807            let mut guard = limiter.requests.lock();
2808            guard.1 = Instant::now()
2809                .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
2810                .unwrap();
2811            // Clear timestamps for ip-2 and ip-3 to simulate stale entries
2812            guard.0.get_mut("ip-2").unwrap().clear();
2813            guard.0.get_mut("ip-3").unwrap().clear();
2814        }
2815
2816        // Next allow() call should trigger sweep and remove stale entries
2817        assert!(limiter.allow("ip-1"));
2818
2819        {
2820            let guard = limiter.requests.lock();
2821            assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
2822            assert!(guard.0.contains_key("ip-1"));
2823        }
2824    }
2825
2826    #[test]
2827    fn rate_limiter_zero_limit_always_allows() {
2828        let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
2829        for _ in 0..100 {
2830            assert!(limiter.allow("any-key"));
2831        }
2832    }
2833
2834    #[test]
2835    fn idempotency_store_rejects_duplicate_key() {
2836        let store = IdempotencyStore::new(Duration::from_secs(30), 10);
2837        assert!(store.record_if_new("req-1"));
2838        assert!(!store.record_if_new("req-1"));
2839        assert!(store.record_if_new("req-2"));
2840    }
2841
2842    #[test]
2843    fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
2844        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
2845        assert!(limiter.allow("ip-1"));
2846        assert!(limiter.allow("ip-2"));
2847        assert!(limiter.allow("ip-3"));
2848
2849        let guard = limiter.requests.lock();
2850        assert_eq!(guard.0.len(), 2);
2851        assert!(guard.0.contains_key("ip-2"));
2852        assert!(guard.0.contains_key("ip-3"));
2853    }
2854
2855    #[test]
2856    fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
2857        let store = IdempotencyStore::new(Duration::from_secs(300), 2);
2858        assert!(store.record_if_new("k1"));
2859        std::thread::sleep(Duration::from_millis(2));
2860        assert!(store.record_if_new("k2"));
2861        std::thread::sleep(Duration::from_millis(2));
2862        assert!(store.record_if_new("k3"));
2863
2864        let keys = store.keys.lock();
2865        assert_eq!(keys.len(), 2);
2866        assert!(!keys.contains_key("k1"));
2867        assert!(keys.contains_key("k2"));
2868        assert!(keys.contains_key("k3"));
2869    }
2870
2871    #[test]
2872    fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
2873        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2874        let mut headers = HeaderMap::new();
2875        headers.insert(
2876            "X-Forwarded-For",
2877            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2878        );
2879
2880        let key = client_key_from_request(Some(peer), &headers, false);
2881        assert_eq!(key, "10.0.0.5");
2882    }
2883
2884    #[test]
2885    fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
2886        // Rightmost XFF hop is the one appended by our trusted upstream proxy;
2887        // leftmost values are attacker-controlled (clients send whatever).
2888        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2889        let mut headers = HeaderMap::new();
2890        headers.insert(
2891            "X-Forwarded-For",
2892            HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2893        );
2894
2895        let key = client_key_from_request(Some(peer), &headers, true);
2896        assert_eq!(key, "203.0.113.11");
2897    }
2898
2899    #[test]
2900    fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
2901        // Attacker sets `X-Forwarded-For: 127.0.0.1, <legit-upstream>`. The
2902        // rightmost (trusted proxy's appended value) must win — if we took
2903        // the leftmost, a remote attacker could spoof loopback and evade
2904        // rate limits / lockouts.
2905        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2906        let mut headers = HeaderMap::new();
2907        headers.insert(
2908            "X-Forwarded-For",
2909            HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
2910        );
2911
2912        let key = client_key_from_request(Some(peer), &headers, true);
2913        assert_eq!(key, "203.0.113.11");
2914    }
2915
2916    #[test]
2917    fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
2918        let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2919        let mut headers = HeaderMap::new();
2920        headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
2921
2922        let key = client_key_from_request(Some(peer), &headers, true);
2923        assert_eq!(key, "10.0.0.5");
2924    }
2925
2926    #[test]
2927    fn normalize_max_keys_uses_fallback_for_zero() {
2928        assert_eq!(normalize_max_keys(0, 10_000), 10_000);
2929        assert_eq!(normalize_max_keys(0, 0), 1);
2930    }
2931
2932    #[test]
2933    fn normalize_max_keys_preserves_nonzero_values() {
2934        assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
2935        assert_eq!(normalize_max_keys(1, 10_000), 1);
2936    }
2937
2938    #[tokio::test]
2939    async fn persist_pairing_tokens_writes_config_tokens() {
2940        let temp = tempfile::tempdir().unwrap();
2941        let config_path = temp.path().join("config.toml");
2942        let workspace_path = temp.path().join("workspace");
2943
2944        let mut config = Config::default();
2945        config.config_path = config_path.clone();
2946        config.workspace_dir = workspace_path;
2947        config.save().await.unwrap();
2948
2949        let guard = PairingGuard::new(true, &[]);
2950        let code = guard.pairing_code().unwrap();
2951        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
2952        assert!(guard.is_authenticated(&token));
2953
2954        let shared_config = Arc::new(Mutex::new(config));
2955        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
2956            .await
2957            .unwrap();
2958
2959        // In-memory tokens should remain as plaintext 64-char hex hashes.
2960        let plaintext = {
2961            let in_memory = shared_config.lock();
2962            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
2963            in_memory.gateway.paired_tokens[0].clone()
2964        };
2965        assert_eq!(plaintext.len(), 64);
2966        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
2967
2968        // On disk, the token should be encrypted (secrets.encrypt defaults to true).
2969        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
2970        let raw_parsed: Config = toml::from_str(&saved).unwrap();
2971        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
2972        let on_disk = &raw_parsed.gateway.paired_tokens[0];
2973        assert!(
2974            crate::security::SecretStore::is_encrypted(on_disk),
2975            "paired_token should be encrypted on disk"
2976        );
2977    }
2978
2979    #[test]
2980    fn webhook_memory_key_is_unique() {
2981        let key1 = webhook_memory_key();
2982        let key2 = webhook_memory_key();
2983
2984        assert!(key1.starts_with("webhook_msg_"));
2985        assert!(key2.starts_with("webhook_msg_"));
2986        assert_ne!(key1, key2);
2987    }
2988
2989    #[test]
2990    fn whatsapp_memory_key_includes_sender_and_message_id() {
2991        let msg = ChannelMessage {
2992            id: "wamid-123".into(),
2993            sender: "+1234567890".into(),
2994            reply_target: "+1234567890".into(),
2995            content: "hello".into(),
2996            channel: "whatsapp".into(),
2997            timestamp: 1,
2998            thread_ts: None,
2999            interruption_scope_id: None,
3000            attachments: vec![],
3001        };
3002
3003        let key = whatsapp_memory_key(&msg);
3004        assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3005    }
3006
3007    #[derive(Default)]
3008    struct MockMemory;
3009
3010    #[async_trait]
3011    impl Memory for MockMemory {
3012        fn name(&self) -> &str {
3013            "mock"
3014        }
3015
3016        async fn store(
3017            &self,
3018            _key: &str,
3019            _content: &str,
3020            _category: MemoryCategory,
3021            _session_id: Option<&str>,
3022        ) -> anyhow::Result<()> {
3023            Ok(())
3024        }
3025
3026        async fn recall(
3027            &self,
3028            _query: &str,
3029            _limit: usize,
3030            _session_id: Option<&str>,
3031            _since: Option<&str>,
3032            _until: Option<&str>,
3033        ) -> anyhow::Result<Vec<MemoryEntry>> {
3034            Ok(Vec::new())
3035        }
3036
3037        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3038            Ok(None)
3039        }
3040
3041        async fn list(
3042            &self,
3043            _category: Option<&MemoryCategory>,
3044            _session_id: Option<&str>,
3045        ) -> anyhow::Result<Vec<MemoryEntry>> {
3046            Ok(Vec::new())
3047        }
3048
3049        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3050            Ok(false)
3051        }
3052
3053        async fn count(&self) -> anyhow::Result<usize> {
3054            Ok(0)
3055        }
3056
3057        async fn health_check(&self) -> bool {
3058            true
3059        }
3060    }
3061
3062    #[derive(Default)]
3063    struct MockProvider {
3064        calls: AtomicUsize,
3065    }
3066
3067    #[async_trait]
3068    impl Provider for MockProvider {
3069        async fn chat_with_system(
3070            &self,
3071            _system_prompt: Option<&str>,
3072            _message: &str,
3073            _model: &str,
3074            _temperature: f64,
3075        ) -> anyhow::Result<String> {
3076            self.calls.fetch_add(1, Ordering::SeqCst);
3077            Ok("ok".into())
3078        }
3079    }
3080
3081    #[derive(Default)]
3082    struct TrackingMemory {
3083        keys: Mutex<Vec<String>>,
3084    }
3085
3086    #[async_trait]
3087    impl Memory for TrackingMemory {
3088        fn name(&self) -> &str {
3089            "tracking"
3090        }
3091
3092        async fn store(
3093            &self,
3094            key: &str,
3095            _content: &str,
3096            _category: MemoryCategory,
3097            _session_id: Option<&str>,
3098        ) -> anyhow::Result<()> {
3099            self.keys.lock().push(key.to_string());
3100            Ok(())
3101        }
3102
3103        async fn recall(
3104            &self,
3105            _query: &str,
3106            _limit: usize,
3107            _session_id: Option<&str>,
3108            _since: Option<&str>,
3109            _until: Option<&str>,
3110        ) -> anyhow::Result<Vec<MemoryEntry>> {
3111            Ok(Vec::new())
3112        }
3113
3114        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3115            Ok(None)
3116        }
3117
3118        async fn list(
3119            &self,
3120            _category: Option<&MemoryCategory>,
3121            _session_id: Option<&str>,
3122        ) -> anyhow::Result<Vec<MemoryEntry>> {
3123            Ok(Vec::new())
3124        }
3125
3126        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3127            Ok(false)
3128        }
3129
3130        async fn count(&self) -> anyhow::Result<usize> {
3131            let size = self.keys.lock().len();
3132            Ok(size)
3133        }
3134
3135        async fn health_check(&self) -> bool {
3136            true
3137        }
3138    }
3139
3140    fn test_connect_info() -> ConnectInfo<SocketAddr> {
3141        ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3142    }
3143
3144    #[tokio::test]
3145    async fn webhook_idempotency_skips_duplicate_provider_calls() {
3146        let provider_impl = Arc::new(MockProvider::default());
3147        let provider: Arc<dyn Provider> = provider_impl.clone();
3148        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3149
3150        let state = AppState {
3151            config: Arc::new(Mutex::new(Config::default())),
3152            provider,
3153            model: "test-model".into(),
3154            temperature: 0.0,
3155            mem: memory,
3156            auto_save: false,
3157            webhook_secret_hash: None,
3158            pairing: Arc::new(PairingGuard::new(false, &[])),
3159            trust_forwarded_headers: false,
3160            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3161            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3162            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3163            whatsapp: None,
3164            whatsapp_app_secret: None,
3165            linq: None,
3166            linq_signing_secret: None,
3167            nextcloud_talk: None,
3168            nextcloud_talk_webhook_secret: None,
3169            wati: None,
3170            gmail_push: None,
3171            observer: Arc::new(crate::observability::NoopObserver),
3172            tools_registry: Arc::new(Vec::new()),
3173            cost_tracker: None,
3174            audit_logger: None,
3175            event_tx: tokio::sync::broadcast::channel(16).0,
3176            shutdown_tx: tokio::sync::watch::channel(false).0,
3177            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3178            path_prefix: String::new(),
3179            session_backend: None,
3180            session_queue: std::sync::Arc::new(
3181                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3182            ),
3183            device_registry: None,
3184            pending_pairings: None,
3185            canvas_store: CanvasStore::new(),
3186            mcp_registry: None,
3187            approval_registry: approval_registry::global(),
3188            mcp_local_url: None,
3189            #[cfg(feature = "webauthn")]
3190            webauthn: None,
3191        };
3192
3193        let mut headers = HeaderMap::new();
3194        headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3195
3196        let body = Ok(Json(WebhookBody {
3197            message: "hello".into(),
3198        }));
3199        let first = handle_webhook(
3200            State(state.clone()),
3201            test_connect_info(),
3202            headers.clone(),
3203            body,
3204        )
3205        .await
3206        .into_response();
3207        assert_eq!(first.status(), StatusCode::OK);
3208
3209        let body = Ok(Json(WebhookBody {
3210            message: "hello".into(),
3211        }));
3212        let second = handle_webhook(State(state), test_connect_info(), headers, body)
3213            .await
3214            .into_response();
3215        assert_eq!(second.status(), StatusCode::OK);
3216
3217        let payload = second.into_body().collect().await.unwrap().to_bytes();
3218        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3219        assert_eq!(parsed["status"], "duplicate");
3220        assert_eq!(parsed["idempotent"], true);
3221        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3222    }
3223
3224    #[tokio::test]
3225    async fn webhook_autosave_stores_distinct_keys_per_request() {
3226        let provider_impl = Arc::new(MockProvider::default());
3227        let provider: Arc<dyn Provider> = provider_impl.clone();
3228
3229        let tracking_impl = Arc::new(TrackingMemory::default());
3230        let memory: Arc<dyn Memory> = tracking_impl.clone();
3231
3232        let state = AppState {
3233            config: Arc::new(Mutex::new(Config::default())),
3234            provider,
3235            model: "test-model".into(),
3236            temperature: 0.0,
3237            mem: memory,
3238            auto_save: true,
3239            webhook_secret_hash: None,
3240            pairing: Arc::new(PairingGuard::new(false, &[])),
3241            trust_forwarded_headers: false,
3242            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3243            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3244            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3245            whatsapp: None,
3246            whatsapp_app_secret: None,
3247            linq: None,
3248            linq_signing_secret: None,
3249            nextcloud_talk: None,
3250            nextcloud_talk_webhook_secret: None,
3251            wati: None,
3252            gmail_push: None,
3253            observer: Arc::new(crate::observability::NoopObserver),
3254            tools_registry: Arc::new(Vec::new()),
3255            cost_tracker: None,
3256            audit_logger: None,
3257            event_tx: tokio::sync::broadcast::channel(16).0,
3258            shutdown_tx: tokio::sync::watch::channel(false).0,
3259            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3260            path_prefix: String::new(),
3261            session_backend: None,
3262            session_queue: std::sync::Arc::new(
3263                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3264            ),
3265            device_registry: None,
3266            pending_pairings: None,
3267            canvas_store: CanvasStore::new(),
3268            mcp_registry: None,
3269            approval_registry: approval_registry::global(),
3270            mcp_local_url: None,
3271            #[cfg(feature = "webauthn")]
3272            webauthn: None,
3273        };
3274
3275        let headers = HeaderMap::new();
3276
3277        let body1 = Ok(Json(WebhookBody {
3278            message: "hello one".into(),
3279        }));
3280        let first = handle_webhook(
3281            State(state.clone()),
3282            test_connect_info(),
3283            headers.clone(),
3284            body1,
3285        )
3286        .await
3287        .into_response();
3288        assert_eq!(first.status(), StatusCode::OK);
3289
3290        let body2 = Ok(Json(WebhookBody {
3291            message: "hello two".into(),
3292        }));
3293        let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3294            .await
3295            .into_response();
3296        assert_eq!(second.status(), StatusCode::OK);
3297
3298        let keys = tracking_impl.keys.lock().clone();
3299        assert_eq!(keys.len(), 2);
3300        assert_ne!(keys[0], keys[1]);
3301        assert!(keys[0].starts_with("webhook_msg_"));
3302        assert!(keys[1].starts_with("webhook_msg_"));
3303        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3304    }
3305
3306    #[test]
3307    fn webhook_secret_hash_is_deterministic_and_nonempty() {
3308        let secret_a = generate_test_secret();
3309        let secret_b = generate_test_secret();
3310        let one = hash_webhook_secret(&secret_a);
3311        let two = hash_webhook_secret(&secret_a);
3312        let other = hash_webhook_secret(&secret_b);
3313
3314        assert_eq!(one, two);
3315        assert_ne!(one, other);
3316        assert_eq!(one.len(), 64);
3317    }
3318
3319    #[tokio::test]
3320    async fn webhook_secret_hash_rejects_missing_header() {
3321        let provider_impl = Arc::new(MockProvider::default());
3322        let provider: Arc<dyn Provider> = provider_impl.clone();
3323        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3324        let secret = generate_test_secret();
3325
3326        let state = AppState {
3327            config: Arc::new(Mutex::new(Config::default())),
3328            provider,
3329            model: "test-model".into(),
3330            temperature: 0.0,
3331            mem: memory,
3332            auto_save: false,
3333            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3334            pairing: Arc::new(PairingGuard::new(false, &[])),
3335            trust_forwarded_headers: false,
3336            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3337            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3338            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3339            whatsapp: None,
3340            whatsapp_app_secret: None,
3341            linq: None,
3342            linq_signing_secret: None,
3343            nextcloud_talk: None,
3344            nextcloud_talk_webhook_secret: None,
3345            wati: None,
3346            gmail_push: None,
3347            observer: Arc::new(crate::observability::NoopObserver),
3348            tools_registry: Arc::new(Vec::new()),
3349            cost_tracker: None,
3350            audit_logger: None,
3351            event_tx: tokio::sync::broadcast::channel(16).0,
3352            shutdown_tx: tokio::sync::watch::channel(false).0,
3353            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3354            path_prefix: String::new(),
3355            session_backend: None,
3356            session_queue: std::sync::Arc::new(
3357                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3358            ),
3359            device_registry: None,
3360            pending_pairings: None,
3361            canvas_store: CanvasStore::new(),
3362            mcp_registry: None,
3363            approval_registry: approval_registry::global(),
3364            mcp_local_url: None,
3365            #[cfg(feature = "webauthn")]
3366            webauthn: None,
3367        };
3368
3369        let response = handle_webhook(
3370            State(state),
3371            test_connect_info(),
3372            HeaderMap::new(),
3373            Ok(Json(WebhookBody {
3374                message: "hello".into(),
3375            })),
3376        )
3377        .await
3378        .into_response();
3379
3380        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3381        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3382    }
3383
3384    #[tokio::test]
3385    async fn webhook_secret_hash_rejects_invalid_header() {
3386        let provider_impl = Arc::new(MockProvider::default());
3387        let provider: Arc<dyn Provider> = provider_impl.clone();
3388        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3389        let valid_secret = generate_test_secret();
3390        let wrong_secret = generate_test_secret();
3391
3392        let state = AppState {
3393            config: Arc::new(Mutex::new(Config::default())),
3394            provider,
3395            model: "test-model".into(),
3396            temperature: 0.0,
3397            mem: memory,
3398            auto_save: false,
3399            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3400            pairing: Arc::new(PairingGuard::new(false, &[])),
3401            trust_forwarded_headers: false,
3402            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3403            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3404            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3405            whatsapp: None,
3406            whatsapp_app_secret: None,
3407            linq: None,
3408            linq_signing_secret: None,
3409            nextcloud_talk: None,
3410            nextcloud_talk_webhook_secret: None,
3411            wati: None,
3412            gmail_push: None,
3413            observer: Arc::new(crate::observability::NoopObserver),
3414            tools_registry: Arc::new(Vec::new()),
3415            cost_tracker: None,
3416            audit_logger: None,
3417            event_tx: tokio::sync::broadcast::channel(16).0,
3418            shutdown_tx: tokio::sync::watch::channel(false).0,
3419            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3420            path_prefix: String::new(),
3421            session_backend: None,
3422            session_queue: std::sync::Arc::new(
3423                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3424            ),
3425            device_registry: None,
3426            pending_pairings: None,
3427            canvas_store: CanvasStore::new(),
3428            mcp_registry: None,
3429            approval_registry: approval_registry::global(),
3430            mcp_local_url: None,
3431            #[cfg(feature = "webauthn")]
3432            webauthn: None,
3433        };
3434
3435        let mut headers = HeaderMap::new();
3436        headers.insert(
3437            "X-Webhook-Secret",
3438            HeaderValue::from_str(&wrong_secret).unwrap(),
3439        );
3440
3441        let response = handle_webhook(
3442            State(state),
3443            test_connect_info(),
3444            headers,
3445            Ok(Json(WebhookBody {
3446                message: "hello".into(),
3447            })),
3448        )
3449        .await
3450        .into_response();
3451
3452        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3453        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3454    }
3455
3456    #[tokio::test]
3457    async fn webhook_secret_hash_accepts_valid_header() {
3458        let provider_impl = Arc::new(MockProvider::default());
3459        let provider: Arc<dyn Provider> = provider_impl.clone();
3460        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3461        let secret = generate_test_secret();
3462
3463        let state = AppState {
3464            config: Arc::new(Mutex::new(Config::default())),
3465            provider,
3466            model: "test-model".into(),
3467            temperature: 0.0,
3468            mem: memory,
3469            auto_save: false,
3470            webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3471            pairing: Arc::new(PairingGuard::new(false, &[])),
3472            trust_forwarded_headers: false,
3473            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3474            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3475            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3476            whatsapp: None,
3477            whatsapp_app_secret: None,
3478            linq: None,
3479            linq_signing_secret: None,
3480            nextcloud_talk: None,
3481            nextcloud_talk_webhook_secret: None,
3482            wati: None,
3483            gmail_push: None,
3484            observer: Arc::new(crate::observability::NoopObserver),
3485            tools_registry: Arc::new(Vec::new()),
3486            cost_tracker: None,
3487            audit_logger: None,
3488            event_tx: tokio::sync::broadcast::channel(16).0,
3489            shutdown_tx: tokio::sync::watch::channel(false).0,
3490            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3491            path_prefix: String::new(),
3492            session_backend: None,
3493            session_queue: std::sync::Arc::new(
3494                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3495            ),
3496            device_registry: None,
3497            pending_pairings: None,
3498            canvas_store: CanvasStore::new(),
3499            mcp_registry: None,
3500            approval_registry: approval_registry::global(),
3501            mcp_local_url: None,
3502            #[cfg(feature = "webauthn")]
3503            webauthn: None,
3504        };
3505
3506        let mut headers = HeaderMap::new();
3507        headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3508
3509        let response = handle_webhook(
3510            State(state),
3511            test_connect_info(),
3512            headers,
3513            Ok(Json(WebhookBody {
3514                message: "hello".into(),
3515            })),
3516        )
3517        .await
3518        .into_response();
3519
3520        assert_eq!(response.status(), StatusCode::OK);
3521        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3522    }
3523
3524    fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3525        use hmac::{Hmac, Mac};
3526        use sha2::Sha256;
3527
3528        let payload = format!("{random}{body}");
3529        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3530        mac.update(payload.as_bytes());
3531        hex::encode(mac.finalize().into_bytes())
3532    }
3533
3534    #[tokio::test]
3535    async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3536        let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3537        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3538
3539        let state = AppState {
3540            config: Arc::new(Mutex::new(Config::default())),
3541            provider,
3542            model: "test-model".into(),
3543            temperature: 0.0,
3544            mem: memory,
3545            auto_save: false,
3546            webhook_secret_hash: None,
3547            pairing: Arc::new(PairingGuard::new(false, &[])),
3548            trust_forwarded_headers: false,
3549            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3550            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3551            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3552            whatsapp: None,
3553            whatsapp_app_secret: None,
3554            linq: None,
3555            linq_signing_secret: None,
3556            nextcloud_talk: None,
3557            nextcloud_talk_webhook_secret: None,
3558            wati: None,
3559            gmail_push: None,
3560            observer: Arc::new(crate::observability::NoopObserver),
3561            tools_registry: Arc::new(Vec::new()),
3562            cost_tracker: None,
3563            audit_logger: None,
3564            event_tx: tokio::sync::broadcast::channel(16).0,
3565            shutdown_tx: tokio::sync::watch::channel(false).0,
3566            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3567            path_prefix: String::new(),
3568            session_backend: None,
3569            session_queue: std::sync::Arc::new(
3570                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3571            ),
3572            device_registry: None,
3573            pending_pairings: None,
3574            canvas_store: CanvasStore::new(),
3575            mcp_registry: None,
3576            approval_registry: approval_registry::global(),
3577            mcp_local_url: None,
3578            #[cfg(feature = "webauthn")]
3579            webauthn: None,
3580        };
3581
3582        let response = Box::pin(handle_nextcloud_talk_webhook(
3583            State(state),
3584            HeaderMap::new(),
3585            Bytes::from_static(br#"{"type":"message"}"#),
3586        ))
3587        .await
3588        .into_response();
3589
3590        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3591    }
3592
3593    #[tokio::test]
3594    async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3595        let provider_impl = Arc::new(MockProvider::default());
3596        let provider: Arc<dyn Provider> = provider_impl.clone();
3597        let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3598
3599        let channel = Arc::new(NextcloudTalkChannel::new(
3600            "https://cloud.example.com".into(),
3601            "app-token".into(),
3602            String::new(),
3603            vec!["*".into()],
3604        ));
3605
3606        let secret = "nextcloud-test-secret";
3607        let random = "seed-value";
3608        let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3609        let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3610        let invalid_signature = "deadbeef";
3611
3612        let state = AppState {
3613            config: Arc::new(Mutex::new(Config::default())),
3614            provider,
3615            model: "test-model".into(),
3616            temperature: 0.0,
3617            mem: memory,
3618            auto_save: false,
3619            webhook_secret_hash: None,
3620            pairing: Arc::new(PairingGuard::new(false, &[])),
3621            trust_forwarded_headers: false,
3622            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3623            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3624            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3625            whatsapp: None,
3626            whatsapp_app_secret: None,
3627            linq: None,
3628            linq_signing_secret: None,
3629            nextcloud_talk: Some(channel),
3630            nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3631            wati: None,
3632            gmail_push: None,
3633            observer: Arc::new(crate::observability::NoopObserver),
3634            tools_registry: Arc::new(Vec::new()),
3635            cost_tracker: None,
3636            audit_logger: None,
3637            event_tx: tokio::sync::broadcast::channel(16).0,
3638            shutdown_tx: tokio::sync::watch::channel(false).0,
3639            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3640            path_prefix: String::new(),
3641            session_backend: None,
3642            session_queue: std::sync::Arc::new(
3643                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3644            ),
3645            device_registry: None,
3646            pending_pairings: None,
3647            canvas_store: CanvasStore::new(),
3648            mcp_registry: None,
3649            approval_registry: approval_registry::global(),
3650            mcp_local_url: None,
3651            #[cfg(feature = "webauthn")]
3652            webauthn: None,
3653        };
3654
3655        let mut headers = HeaderMap::new();
3656        headers.insert(
3657            "X-Nextcloud-Talk-Random",
3658            HeaderValue::from_str(random).unwrap(),
3659        );
3660        headers.insert(
3661            "X-Nextcloud-Talk-Signature",
3662            HeaderValue::from_str(invalid_signature).unwrap(),
3663        );
3664
3665        let response = Box::pin(handle_nextcloud_talk_webhook(
3666            State(state),
3667            headers,
3668            Bytes::from(body),
3669        ))
3670        .await
3671        .into_response();
3672        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3673        assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3674    }
3675
3676    // ══════════════════════════════════════════════════════════
3677    // WhatsApp Signature Verification Tests (CWE-345 Prevention)
3678    // ══════════════════════════════════════════════════════════
3679
3680    fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3681        use hmac::{Hmac, Mac};
3682        use sha2::Sha256;
3683
3684        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3685        mac.update(body);
3686        hex::encode(mac.finalize().into_bytes())
3687    }
3688
3689    fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3690        format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3691    }
3692
3693    #[test]
3694    fn whatsapp_signature_valid() {
3695        let app_secret = generate_test_secret();
3696        let body = b"test body content";
3697
3698        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3699
3700        assert!(verify_whatsapp_signature(
3701            &app_secret,
3702            body,
3703            &signature_header
3704        ));
3705    }
3706
3707    #[test]
3708    fn whatsapp_signature_invalid_wrong_secret() {
3709        let app_secret = generate_test_secret();
3710        let wrong_secret = generate_test_secret();
3711        let body = b"test body content";
3712
3713        let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
3714
3715        assert!(!verify_whatsapp_signature(
3716            &app_secret,
3717            body,
3718            &signature_header
3719        ));
3720    }
3721
3722    #[test]
3723    fn whatsapp_signature_invalid_wrong_body() {
3724        let app_secret = generate_test_secret();
3725        let original_body = b"original body";
3726        let tampered_body = b"tampered body";
3727
3728        let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
3729
3730        // Verify with tampered body should fail
3731        assert!(!verify_whatsapp_signature(
3732            &app_secret,
3733            tampered_body,
3734            &signature_header
3735        ));
3736    }
3737
3738    #[test]
3739    fn whatsapp_signature_missing_prefix() {
3740        let app_secret = generate_test_secret();
3741        let body = b"test body";
3742
3743        // Signature without "sha256=" prefix
3744        let signature_header = "abc123def456";
3745
3746        assert!(!verify_whatsapp_signature(
3747            &app_secret,
3748            body,
3749            signature_header
3750        ));
3751    }
3752
3753    #[test]
3754    fn whatsapp_signature_empty_header() {
3755        let app_secret = generate_test_secret();
3756        let body = b"test body";
3757
3758        assert!(!verify_whatsapp_signature(&app_secret, body, ""));
3759    }
3760
3761    #[test]
3762    fn whatsapp_signature_invalid_hex() {
3763        let app_secret = generate_test_secret();
3764        let body = b"test body";
3765
3766        // Invalid hex characters
3767        let signature_header = "sha256=not_valid_hex_zzz";
3768
3769        assert!(!verify_whatsapp_signature(
3770            &app_secret,
3771            body,
3772            signature_header
3773        ));
3774    }
3775
3776    #[test]
3777    fn whatsapp_signature_empty_body() {
3778        let app_secret = generate_test_secret();
3779        let body = b"";
3780
3781        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3782
3783        assert!(verify_whatsapp_signature(
3784            &app_secret,
3785            body,
3786            &signature_header
3787        ));
3788    }
3789
3790    #[test]
3791    fn whatsapp_signature_unicode_body() {
3792        let app_secret = generate_test_secret();
3793        let body = "Hello 🦀 World".as_bytes();
3794
3795        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3796
3797        assert!(verify_whatsapp_signature(
3798            &app_secret,
3799            body,
3800            &signature_header
3801        ));
3802    }
3803
3804    #[test]
3805    fn whatsapp_signature_json_payload() {
3806        let app_secret = generate_test_secret();
3807        let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
3808
3809        let signature_header = compute_whatsapp_signature_header(&app_secret, body);
3810
3811        assert!(verify_whatsapp_signature(
3812            &app_secret,
3813            body,
3814            &signature_header
3815        ));
3816    }
3817
3818    #[test]
3819    fn whatsapp_signature_case_sensitive_prefix() {
3820        let app_secret = generate_test_secret();
3821        let body = b"test body";
3822
3823        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3824
3825        // Wrong case prefix should fail
3826        let wrong_prefix = format!("SHA256={hex_sig}");
3827        assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
3828
3829        // Correct prefix should pass
3830        let correct_prefix = format!("sha256={hex_sig}");
3831        assert!(verify_whatsapp_signature(
3832            &app_secret,
3833            body,
3834            &correct_prefix
3835        ));
3836    }
3837
3838    #[test]
3839    fn whatsapp_signature_truncated_hex() {
3840        let app_secret = generate_test_secret();
3841        let body = b"test body";
3842
3843        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3844        let truncated = &hex_sig[..32]; // Only half the signature
3845        let signature_header = format!("sha256={truncated}");
3846
3847        assert!(!verify_whatsapp_signature(
3848            &app_secret,
3849            body,
3850            &signature_header
3851        ));
3852    }
3853
3854    #[test]
3855    fn whatsapp_signature_extra_bytes() {
3856        let app_secret = generate_test_secret();
3857        let body = b"test body";
3858
3859        let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
3860        let extended = format!("{hex_sig}deadbeef");
3861        let signature_header = format!("sha256={extended}");
3862
3863        assert!(!verify_whatsapp_signature(
3864            &app_secret,
3865            body,
3866            &signature_header
3867        ));
3868    }
3869
3870    // ══════════════════════════════════════════════════════════
3871    // IdempotencyStore Edge-Case Tests
3872    // ══════════════════════════════════════════════════════════
3873
3874    #[test]
3875    fn idempotency_store_allows_different_keys() {
3876        let store = IdempotencyStore::new(Duration::from_secs(60), 100);
3877        assert!(store.record_if_new("key-a"));
3878        assert!(store.record_if_new("key-b"));
3879        assert!(store.record_if_new("key-c"));
3880        assert!(store.record_if_new("key-d"));
3881    }
3882
3883    #[test]
3884    fn idempotency_store_max_keys_clamped_to_one() {
3885        let store = IdempotencyStore::new(Duration::from_secs(60), 0);
3886        assert!(store.record_if_new("only-key"));
3887        assert!(!store.record_if_new("only-key"));
3888    }
3889
3890    #[test]
3891    fn idempotency_store_rapid_duplicate_rejected() {
3892        let store = IdempotencyStore::new(Duration::from_secs(300), 100);
3893        assert!(store.record_if_new("rapid"));
3894        assert!(!store.record_if_new("rapid"));
3895    }
3896
3897    #[test]
3898    fn idempotency_store_accepts_after_ttl_expires() {
3899        let store = IdempotencyStore::new(Duration::from_millis(1), 100);
3900        assert!(store.record_if_new("ttl-key"));
3901        std::thread::sleep(Duration::from_millis(10));
3902        assert!(store.record_if_new("ttl-key"));
3903    }
3904
3905    #[test]
3906    fn idempotency_store_eviction_preserves_newest() {
3907        let store = IdempotencyStore::new(Duration::from_secs(300), 1);
3908        assert!(store.record_if_new("old-key"));
3909        std::thread::sleep(Duration::from_millis(2));
3910        assert!(store.record_if_new("new-key"));
3911
3912        let keys = store.keys.lock();
3913        assert_eq!(keys.len(), 1);
3914        assert!(!keys.contains_key("old-key"));
3915        assert!(keys.contains_key("new-key"));
3916    }
3917
3918    #[test]
3919    fn rate_limiter_allows_after_window_expires() {
3920        let window = Duration::from_millis(50);
3921        let limiter = SlidingWindowRateLimiter::new(2, window, 100);
3922        assert!(limiter.allow("ip-1"));
3923        assert!(limiter.allow("ip-1"));
3924        assert!(!limiter.allow("ip-1")); // blocked
3925
3926        // Wait for window to expire
3927        std::thread::sleep(Duration::from_millis(60));
3928
3929        // Should be allowed again
3930        assert!(limiter.allow("ip-1"));
3931    }
3932
3933    #[test]
3934    fn rate_limiter_independent_keys_tracked_separately() {
3935        let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
3936        assert!(limiter.allow("ip-1"));
3937        assert!(limiter.allow("ip-1"));
3938        assert!(!limiter.allow("ip-1")); // ip-1 blocked
3939
3940        // ip-2 should still work
3941        assert!(limiter.allow("ip-2"));
3942        assert!(limiter.allow("ip-2"));
3943        assert!(!limiter.allow("ip-2")); // ip-2 now blocked
3944    }
3945
3946    #[test]
3947    fn rate_limiter_exact_boundary_at_max_keys() {
3948        let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
3949        assert!(limiter.allow("ip-1"));
3950        assert!(limiter.allow("ip-2"));
3951        assert!(limiter.allow("ip-3"));
3952        // At capacity now
3953        assert!(limiter.allow("ip-4")); // should evict ip-1
3954
3955        let guard = limiter.requests.lock();
3956        assert_eq!(guard.0.len(), 3);
3957        assert!(
3958            !guard.0.contains_key("ip-1"),
3959            "ip-1 should have been evicted"
3960        );
3961        assert!(guard.0.contains_key("ip-2"));
3962        assert!(guard.0.contains_key("ip-3"));
3963        assert!(guard.0.contains_key("ip-4"));
3964    }
3965
3966    #[test]
3967    fn gateway_rate_limiter_pair_and_webhook_are_independent() {
3968        let limiter = GatewayRateLimiter::new(2, 3, 100);
3969
3970        // Exhaust pair limit
3971        assert!(limiter.allow_pair("ip-1"));
3972        assert!(limiter.allow_pair("ip-1"));
3973        assert!(!limiter.allow_pair("ip-1")); // pair blocked
3974
3975        // Webhook should still work
3976        assert!(limiter.allow_webhook("ip-1"));
3977        assert!(limiter.allow_webhook("ip-1"));
3978        assert!(limiter.allow_webhook("ip-1"));
3979        assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked
3980    }
3981
3982    #[test]
3983    fn rate_limiter_single_key_max_allows_one_request() {
3984        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
3985        assert!(limiter.allow("ip-1"));
3986        assert!(limiter.allow("ip-2")); // evicts ip-1
3987
3988        let guard = limiter.requests.lock();
3989        assert_eq!(guard.0.len(), 1);
3990        assert!(guard.0.contains_key("ip-2"));
3991        assert!(!guard.0.contains_key("ip-1"));
3992    }
3993
3994    #[test]
3995    fn rate_limiter_concurrent_access_safe() {
3996        use std::sync::Arc;
3997
3998        let limiter = Arc::new(SlidingWindowRateLimiter::new(
3999            1000,
4000            Duration::from_secs(60),
4001            1000,
4002        ));
4003        let mut handles = Vec::new();
4004
4005        for i in 0..10 {
4006            let limiter = limiter.clone();
4007            handles.push(std::thread::spawn(move || {
4008                for j in 0..100 {
4009                    limiter.allow(&format!("thread-{i}-req-{j}"));
4010                }
4011            }));
4012        }
4013
4014        for handle in handles {
4015            handle.join().unwrap();
4016        }
4017
4018        // Should not panic or deadlock
4019        let guard = limiter.requests.lock();
4020        assert!(guard.0.len() <= 1000, "should respect max_keys");
4021    }
4022
4023    #[test]
4024    fn idempotency_store_concurrent_access_safe() {
4025        use std::sync::Arc;
4026
4027        let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
4028        let mut handles = Vec::new();
4029
4030        for i in 0..10 {
4031            let store = store.clone();
4032            handles.push(std::thread::spawn(move || {
4033                for j in 0..100 {
4034                    store.record_if_new(&format!("thread-{i}-key-{j}"));
4035                }
4036            }));
4037        }
4038
4039        for handle in handles {
4040            handle.join().unwrap();
4041        }
4042
4043        let keys = store.keys.lock();
4044        assert!(keys.len() <= 1000, "should respect max_keys");
4045    }
4046
4047    #[test]
4048    fn rate_limiter_rapid_burst_then_cooldown() {
4049        let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
4050
4051        // Burst: use all 5 requests
4052        for _ in 0..5 {
4053            assert!(limiter.allow("burst-ip"));
4054        }
4055        assert!(!limiter.allow("burst-ip")); // 6th should fail
4056
4057        // Cooldown
4058        std::thread::sleep(Duration::from_millis(60));
4059
4060        // Should be allowed again
4061        assert!(limiter.allow("burst-ip"));
4062    }
4063
4064    #[test]
4065    fn require_localhost_accepts_ipv4_loopback() {
4066        let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
4067        assert!(require_localhost(&peer).is_ok());
4068    }
4069
4070    #[test]
4071    fn require_localhost_accepts_ipv6_loopback() {
4072        let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
4073        assert!(require_localhost(&peer).is_ok());
4074    }
4075
4076    #[test]
4077    fn require_localhost_rejects_non_loopback_ipv4() {
4078        let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
4079        let err = require_localhost(&peer).unwrap_err();
4080        assert_eq!(err.0, StatusCode::FORBIDDEN);
4081    }
4082
4083    #[test]
4084    fn require_localhost_rejects_non_loopback_ipv6() {
4085        let peer = SocketAddr::from((
4086            std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
4087            12345,
4088        ));
4089        let err = require_localhost(&peer).unwrap_err();
4090        assert_eq!(err.0, StatusCode::FORBIDDEN);
4091    }
4092}