1pub mod api;
11pub mod api_agents;
12pub mod api_clawhub;
13pub mod api_kumiho_proxy;
14pub mod api_mcp;
15pub mod api_memory_graph;
16pub mod api_pairing;
17#[cfg(feature = "plugins-wasm")]
18pub mod api_plugins;
19pub mod api_skills;
20pub mod api_teams;
21#[cfg(feature = "webauthn")]
22pub mod api_webauthn;
23pub mod api_workflows;
24pub mod approval_registry;
25pub mod auth_rate_limit;
26pub mod canvas;
27pub mod kumiho_client;
28pub mod mcp_discovery;
29pub mod nodes;
30pub mod session_queue;
31pub mod sse;
32pub mod static_files;
33pub mod terminal;
34pub mod tls;
35pub mod ws;
36pub mod ws_mcp_events;
37
38use crate::channels::{
39 Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
40 WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
41};
42use crate::config::Config;
43use crate::cost::CostTracker;
44use crate::memory::{self, Memory, MemoryCategory};
45use crate::providers::{self, ChatMessage, Provider};
46use crate::runtime;
47use crate::security::SecurityPolicy;
48use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
49use crate::tools;
50use crate::tools::canvas::CanvasStore;
51use crate::tools::traits::ToolSpec;
52use crate::util::truncate_with_ellipsis;
53use anyhow::{Context, Result};
54use axum::{
55 Router,
56 body::Bytes,
57 extract::{ConnectInfo, Query, State},
58 http::{HeaderMap, StatusCode, header},
59 response::{IntoResponse, Json},
60 routing::{delete, get, post, put},
61};
62use parking_lot::Mutex;
63use std::collections::HashMap;
64use std::net::{IpAddr, SocketAddr};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67use tower_http::limit::RequestBodyLimitLayer;
68use tower_http::timeout::TimeoutLayer;
69use uuid::Uuid;
70
/// Default maximum request body size, in bytes (64 KiB).
pub const MAX_BODY_SIZE: usize = 65_536;

/// Default per-request timeout, in seconds, used when no override is configured.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Returns the gateway request timeout in seconds.
///
/// Reads the `CONSTRUCT_GATEWAY_TIMEOUT_SECS` environment variable and parses it as a `u64`.
/// Surrounding whitespace is ignored, so values such as `" 45 "` (common when the variable
/// comes from an env file or shell substitution) are honoured instead of silently discarded.
/// Falls back to [`REQUEST_TIMEOUT_SECS`] when the variable is unset, not valid Unicode, or
/// not a valid unsigned integer.
pub fn gateway_request_timeout_secs() -> u64 {
    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
        .ok()
        .and_then(|raw| raw.trim().parse().ok())
        .unwrap_or(REQUEST_TIMEOUT_SECS)
}

/// Length of the sliding rate-limit window, in seconds.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;

/// Fallback cap on the number of distinct client keys tracked by the rate limiter.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;

/// Fallback cap on the number of idempotency keys retained at once.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
94
95fn webhook_memory_key() -> String {
96 format!("webhook_msg_{}", Uuid::new_v4())
97}
98
99fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
100 format!("whatsapp_{}_{}", msg.sender, msg.id)
101}
102
103fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
104 format!("linq_{}_{}", msg.sender, msg.id)
105}
106
107fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
108 format!("wati_{}_{}", msg.sender, msg.id)
109}
110
111fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
112 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
113}
114
115fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
116 match &msg.thread_ts {
117 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
118 None => format!("{channel}_{}", msg.sender),
119 }
120}
121
122fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
123 headers
124 .get("X-Session-Id")
125 .and_then(|v| v.to_str().ok())
126 .map(str::trim)
127 .filter(|value| !value.is_empty())
128 .map(str::to_owned)
129}
130
131fn hash_webhook_secret(value: &str) -> String {
132 use sha2::{Digest, Sha256};
133
134 let digest = Sha256::digest(value.as_bytes());
135 hex::encode(digest)
136}
137
138const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
142struct SlidingWindowRateLimiter {
143 limit_per_window: u32,
144 window: Duration,
145 max_keys: usize,
146 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
147}
148
149impl SlidingWindowRateLimiter {
150 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
151 Self {
152 limit_per_window,
153 window,
154 max_keys: max_keys.max(1),
155 requests: Mutex::new((HashMap::new(), Instant::now())),
156 }
157 }
158
159 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
160 requests.retain(|_, timestamps| {
161 timestamps.retain(|t| *t > cutoff);
162 !timestamps.is_empty()
163 });
164 }
165
166 fn allow(&self, key: &str) -> bool {
167 if self.limit_per_window == 0 {
168 return true;
169 }
170
171 let now = Instant::now();
172 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
173
174 let mut guard = self.requests.lock();
175 let (requests, last_sweep) = &mut *guard;
176
177 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
179 Self::prune_stale(requests, cutoff);
180 *last_sweep = now;
181 }
182
183 if !requests.contains_key(key) && requests.len() >= self.max_keys {
184 Self::prune_stale(requests, cutoff);
186 *last_sweep = now;
187
188 if requests.len() >= self.max_keys {
189 let evict_key = requests
190 .iter()
191 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
192 .map(|(k, _)| k.clone());
193 if let Some(evict_key) = evict_key {
194 requests.remove(&evict_key);
195 }
196 }
197 }
198
199 let entry = requests.entry(key.to_owned()).or_default();
200 entry.retain(|instant| *instant > cutoff);
201
202 if entry.len() >= self.limit_per_window as usize {
203 return false;
204 }
205
206 entry.push(now);
207 true
208 }
209}
210
211#[derive(Debug)]
212pub struct GatewayRateLimiter {
213 pair: SlidingWindowRateLimiter,
214 webhook: SlidingWindowRateLimiter,
215}
216
217impl GatewayRateLimiter {
218 fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
219 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
220 Self {
221 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
222 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
223 }
224 }
225
226 fn allow_pair(&self, key: &str) -> bool {
227 self.pair.allow(key)
228 }
229
230 fn allow_webhook(&self, key: &str) -> bool {
231 self.webhook.allow(key)
232 }
233}
234
235#[derive(Debug)]
236pub struct IdempotencyStore {
237 ttl: Duration,
238 max_keys: usize,
239 keys: Mutex<HashMap<String, Instant>>,
240}
241
242impl IdempotencyStore {
243 fn new(ttl: Duration, max_keys: usize) -> Self {
244 Self {
245 ttl,
246 max_keys: max_keys.max(1),
247 keys: Mutex::new(HashMap::new()),
248 }
249 }
250
251 fn record_if_new(&self, key: &str) -> bool {
253 let now = Instant::now();
254 let mut keys = self.keys.lock();
255
256 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
257
258 if keys.contains_key(key) {
259 return false;
260 }
261
262 if keys.len() >= self.max_keys {
263 let evict_key = keys
264 .iter()
265 .min_by_key(|(_, seen_at)| *seen_at)
266 .map(|(k, _)| k.clone());
267 if let Some(evict_key) = evict_key {
268 keys.remove(&evict_key);
269 }
270 }
271
272 keys.insert(key.to_owned(), now);
273 true
274 }
275}
276
/// Parses one client-address token (e.g. an `X-Forwarded-For` entry) into an IP address.
///
/// Surrounding whitespace and double quotes are stripped first. The remainder is accepted
/// as a bare IP (`203.0.113.7`, `::1`), as a socket address whose port is discarded
/// (`203.0.113.7:8080`, `[::1]:443`), or as a bracketed IP (`[::1]`). Returns `None` for
/// empty or unparseable input.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let candidate = value.trim().trim_matches('"').trim();
    if candidate.is_empty() {
        return None;
    }

    candidate
        .parse::<IpAddr>()
        .ok()
        .or_else(|| candidate.parse::<SocketAddr>().ok().map(|addr| addr.ip()))
        .or_else(|| candidate.trim_matches(['[', ']']).parse::<IpAddr>().ok())
}
294
295fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
296 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
301 for candidate in xff.rsplit(',') {
302 if let Some(ip) = parse_client_ip(candidate) {
303 return Some(ip);
304 }
305 }
306 }
307
308 headers
309 .get("X-Real-IP")
310 .and_then(|v| v.to_str().ok())
311 .and_then(parse_client_ip)
312}
313
314pub(super) fn client_key_from_request(
315 peer_addr: Option<SocketAddr>,
316 headers: &HeaderMap,
317 trust_forwarded_headers: bool,
318) -> String {
319 if trust_forwarded_headers {
320 if let Some(ip) = forwarded_client_ip(headers) {
321 return ip.to_string();
322 }
323 }
324
325 peer_addr
326 .map(|addr| addr.ip().to_string())
327 .unwrap_or_else(|| "unknown".to_string())
328}
329
/// Resolves a configured key cap, treating `0` as "use the fallback".
///
/// Returns `configured` when it is non-zero; otherwise returns `fallback`, raised to at
/// least 1.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
337
338#[derive(Clone)]
340pub struct AppState {
341 pub config: Arc<Mutex<Config>>,
342 pub provider: Arc<dyn Provider>,
343 pub model: String,
344 pub temperature: f64,
345 pub mem: Arc<dyn Memory>,
346 pub auto_save: bool,
347 pub webhook_secret_hash: Option<Arc<str>>,
349 pub pairing: Arc<PairingGuard>,
350 pub trust_forwarded_headers: bool,
351 pub rate_limiter: Arc<GatewayRateLimiter>,
352 pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
353 pub idempotency_store: Arc<IdempotencyStore>,
354 pub whatsapp: Option<Arc<WhatsAppChannel>>,
355 pub whatsapp_app_secret: Option<Arc<str>>,
357 pub linq: Option<Arc<LinqChannel>>,
358 pub linq_signing_secret: Option<Arc<str>>,
360 pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
361 pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
363 pub wati: Option<Arc<WatiChannel>>,
364 pub gmail_push: Option<Arc<GmailPushChannel>>,
366 pub observer: Arc<dyn crate::observability::Observer>,
368 pub tools_registry: Arc<Vec<ToolSpec>>,
370 pub cost_tracker: Option<Arc<CostTracker>>,
372 pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
374 pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
376 pub shutdown_tx: tokio::sync::watch::Sender<bool>,
378 pub node_registry: Arc<nodes::NodeRegistry>,
380 pub path_prefix: String,
382 pub session_backend: Option<Arc<dyn SessionBackend>>,
384 pub session_queue: Arc<session_queue::SessionActorQueue>,
386 pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
388 pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
390 pub canvas_store: CanvasStore,
392 pub mcp_registry: Option<Arc<tools::McpRegistry>>,
394 #[cfg(feature = "webauthn")]
396 pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
397 pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
399 pub mcp_local_url: Option<Arc<str>>,
404}
405
406#[allow(clippy::too_many_lines)]
408pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
409 if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
411 {
412 tracing::warn!(
413 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
414 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
415 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
416 Docker/VM: if you are running inside a container or VM, this is expected."
417 );
418 }
419 let config_state = Arc::new(Mutex::new(config.clone()));
420
421 let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
423 Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
424 } else {
425 None
426 };
427
428 let addr: SocketAddr = format!("{host}:{port}").parse()?;
429 let listener = tokio::net::TcpListener::bind(addr).await?;
430 let actual_port = listener.local_addr()?.port();
431 let display_addr = format!("{host}:{actual_port}");
432
433 let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
434 config.default_provider.as_deref().unwrap_or("openrouter"),
435 config.api_key.as_deref(),
436 config.api_url.as_deref(),
437 &config.reliability,
438 &providers::ProviderRuntimeOptions {
439 auth_profile_override: None,
440 provider_api_url: config.api_url.clone(),
441 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
442 secrets_encrypt: config.secrets.encrypt,
443 reasoning_enabled: config.runtime.reasoning_enabled,
444 reasoning_effort: config.runtime.reasoning_effort.clone(),
445 provider_timeout_secs: Some(config.provider_timeout_secs),
446 extra_headers: config.extra_headers.clone(),
447 api_path: config.api_path.clone(),
448 provider_max_tokens: config.provider_max_tokens,
449 },
450 )?);
451 let model = config
452 .default_model
453 .clone()
454 .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
455 let temperature = config.default_temperature;
456 let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
457 &config.memory,
458 &config.embedding_routes,
459 Some(&config.storage.provider.config),
460 &config.workspace_dir,
461 config.api_key.as_deref(),
462 )?);
463 let runtime: Arc<dyn runtime::RuntimeAdapter> =
464 Arc::from(runtime::create_runtime(&config.runtime)?);
465 let security = Arc::new(SecurityPolicy::from_config(
466 &config.autonomy,
467 &config.workspace_dir,
468 ));
469
470 let (composio_key, composio_entity_id) = if config.composio.enabled {
471 (
472 config.composio.api_key.as_deref(),
473 Some(config.composio.entity_id.as_str()),
474 )
475 } else {
476 (None, None)
477 };
478
479 let canvas_store = tools::canvas::global_store();
480
481 let (
482 mut tools_registry_raw,
483 delegate_handle_gw,
484 _reaction_handle_gw,
485 _channel_map_handle,
486 _ask_user_handle_gw,
487 _escalate_handle_gw,
488 ) = tools::all_tools_with_runtime(
489 Arc::new(config.clone()),
490 &security,
491 runtime,
492 Arc::clone(&mem),
493 composio_key,
494 composio_entity_id,
495 &config.browser,
496 &config.http_request,
497 &config.web_fetch,
498 &config.workspace_dir,
499 &config.agents,
500 config.api_key.as_deref(),
501 &config,
502 Some(canvas_store.clone()),
503 );
504
505 let gateway_mcp_config = {
510 let mut c = config.clone();
511 c = crate::agent::kumiho::inject_kumiho(c, false);
512 c = crate::agent::operator::inject_operator(c, false);
513 c.mcp
514 };
515 let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
516 if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
517 tracing::info!(
518 "Gateway: initializing MCP client — {} server(s) configured",
519 gateway_mcp_config.servers.len()
520 );
521 match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
522 Ok(registry) => {
523 let registry = std::sync::Arc::new(registry);
524 mcp_registry_shared = Some(std::sync::Arc::clone(®istry));
525 if gateway_mcp_config.deferred_loading {
526 let operator_prefix =
527 format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
528 let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
529 let all_names = registry.tool_names();
530 let mut eager_count = 0usize;
531
532 for name in &all_names {
533 if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
534 if let Some(def) = registry.get_tool_def(name).await {
535 let wrapper: std::sync::Arc<dyn tools::Tool> =
536 std::sync::Arc::new(tools::McpToolWrapper::new(
537 name.clone(),
538 def,
539 std::sync::Arc::clone(®istry),
540 ));
541 if let Some(ref handle) = delegate_handle_gw {
542 handle.write().push(std::sync::Arc::clone(&wrapper));
543 }
544 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
545 eager_count += 1;
546 }
547 }
548 }
549
550 let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
551 std::sync::Arc::clone(®istry),
552 |name| {
553 !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
554 },
555 )
556 .await;
557 tracing::info!(
558 "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
559 eager_count,
560 deferred_set.len(),
561 registry.server_count()
562 );
563 let activated =
564 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
565 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
566 deferred_set,
567 activated,
568 )));
569 } else {
570 let names = registry.tool_names();
571 let mut registered = 0usize;
572 for name in names {
573 if let Some(def) = registry.get_tool_def(&name).await {
574 let wrapper: std::sync::Arc<dyn tools::Tool> =
575 std::sync::Arc::new(tools::McpToolWrapper::new(
576 name,
577 def,
578 std::sync::Arc::clone(®istry),
579 ));
580 if let Some(ref handle) = delegate_handle_gw {
581 handle.write().push(std::sync::Arc::clone(&wrapper));
582 }
583 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
584 registered += 1;
585 }
586 }
587 tracing::info!(
588 "Gateway MCP: {} tool(s) registered from {} server(s)",
589 registered,
590 registry.server_count()
591 );
592 }
593 }
594 Err(e) => {
595 tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
596 }
597 }
598 }
599
600 let tools_registry: Arc<Vec<ToolSpec>> =
601 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
602
603 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
605
606 let audit_logger = if config.security.audit.enabled {
608 match crate::security::audit::AuditLogger::new(
609 config.security.audit.clone(),
610 std::path::PathBuf::from(&config.workspace_dir),
611 ) {
612 Ok(logger) => Some(Arc::new(logger)),
613 Err(e) => {
614 tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
615 None
616 }
617 }
618 } else {
619 None
620 };
621
622 let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
624 let webhook_secret_hash: Option<Arc<str>> =
626 config.channels_config.webhook.as_ref().and_then(|webhook| {
627 webhook.secret.as_ref().and_then(|raw_secret| {
628 let trimmed_secret = raw_secret.trim();
629 (!trimmed_secret.is_empty())
630 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
631 })
632 });
633
634 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
636 .channels_config
637 .whatsapp
638 .as_ref()
639 .filter(|wa| wa.is_cloud_config())
640 .map(|wa| {
641 Arc::new(WhatsAppChannel::new(
642 wa.access_token.clone().unwrap_or_default(),
643 wa.phone_number_id.clone().unwrap_or_default(),
644 wa.verify_token.clone().unwrap_or_default(),
645 wa.allowed_numbers.clone(),
646 ))
647 });
648
649 let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
652 .ok()
653 .and_then(|secret| {
654 let secret = secret.trim();
655 (!secret.is_empty()).then(|| secret.to_owned())
656 })
657 .or_else(|| {
658 config.channels_config.whatsapp.as_ref().and_then(|wa| {
659 wa.app_secret
660 .as_deref()
661 .map(str::trim)
662 .filter(|secret| !secret.is_empty())
663 .map(ToOwned::to_owned)
664 })
665 })
666 .map(Arc::from);
667
668 let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
670 Arc::new(LinqChannel::new(
671 lq.api_token.clone(),
672 lq.from_phone.clone(),
673 lq.allowed_senders.clone(),
674 ))
675 });
676
677 let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
680 .ok()
681 .and_then(|secret| {
682 let secret = secret.trim();
683 (!secret.is_empty()).then(|| secret.to_owned())
684 })
685 .or_else(|| {
686 config.channels_config.linq.as_ref().and_then(|lq| {
687 lq.signing_secret
688 .as_deref()
689 .map(str::trim)
690 .filter(|secret| !secret.is_empty())
691 .map(ToOwned::to_owned)
692 })
693 })
694 .map(Arc::from);
695
696 let wati_channel: Option<Arc<WatiChannel>> =
698 config.channels_config.wati.as_ref().map(|wati_cfg| {
699 Arc::new(
700 WatiChannel::new(
701 wati_cfg.api_token.clone(),
702 wati_cfg.api_url.clone(),
703 wati_cfg.tenant_id.clone(),
704 wati_cfg.allowed_numbers.clone(),
705 )
706 .with_transcription(config.transcription.clone()),
707 )
708 });
709
710 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
712 config.channels_config.nextcloud_talk.as_ref().map(|nc| {
713 Arc::new(NextcloudTalkChannel::new(
714 nc.base_url.clone(),
715 nc.app_token.clone(),
716 nc.bot_name.clone().unwrap_or_default(),
717 nc.allowed_users.clone(),
718 ))
719 });
720
721 let nextcloud_talk_webhook_secret: Option<Arc<str>> =
724 std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
725 .ok()
726 .and_then(|secret| {
727 let secret = secret.trim();
728 (!secret.is_empty()).then(|| secret.to_owned())
729 })
730 .or_else(|| {
731 config
732 .channels_config
733 .nextcloud_talk
734 .as_ref()
735 .and_then(|nc| {
736 nc.webhook_secret
737 .as_deref()
738 .map(str::trim)
739 .filter(|secret| !secret.is_empty())
740 .map(ToOwned::to_owned)
741 })
742 })
743 .map(Arc::from);
744
745 let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
747 .channels_config
748 .gmail_push
749 .as_ref()
750 .filter(|gp| gp.enabled)
751 .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
752
753 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
755 match SqliteSessionBackend::new(&config.workspace_dir) {
756 Ok(b) => {
757 tracing::info!("Gateway session persistence enabled (SQLite)");
758 if config.gateway.session_ttl_hours > 0 {
759 if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
760 if cleaned > 0 {
761 tracing::info!("Cleaned up {cleaned} stale gateway sessions");
762 }
763 }
764 }
765 Some(Arc::new(b))
766 }
767 Err(e) => {
768 tracing::warn!("Session persistence disabled: {e}");
769 None
770 }
771 }
772 } else {
773 None
774 };
775
776 let pairing = Arc::new(PairingGuard::new(
778 config.gateway.require_pairing,
779 &config.gateway.paired_tokens,
780 ));
781 let rate_limit_max_keys = normalize_max_keys(
782 config.gateway.rate_limit_max_keys,
783 RATE_LIMIT_MAX_KEYS_DEFAULT,
784 );
785 let rate_limiter = Arc::new(GatewayRateLimiter::new(
786 config.gateway.pair_rate_limit_per_minute,
787 config.gateway.webhook_rate_limit_per_minute,
788 rate_limit_max_keys,
789 ));
790 let idempotency_max_keys = normalize_max_keys(
791 config.gateway.idempotency_max_keys,
792 IDEMPOTENCY_MAX_KEYS_DEFAULT,
793 );
794 let idempotency_store = Arc::new(IdempotencyStore::new(
795 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
796 idempotency_max_keys,
797 ));
798
799 let path_prefix: Option<&str> = config
801 .gateway
802 .path_prefix
803 .as_deref()
804 .filter(|p| !p.is_empty());
805
806 let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
808 let mut tunnel_url: Option<String> = None;
809
810 if let Some(ref tun) = tunnel {
811 println!("🔗 Starting {} tunnel...", tun.name());
812 match tun.start(host, actual_port).await {
813 Ok(url) => {
814 println!("🌐 Tunnel active: {url}");
815 tunnel_url = Some(url);
816 }
817 Err(e) => {
818 println!("⚠️ Tunnel failed to start: {e}");
819 println!(" Falling back to local-only mode.");
820 }
821 }
822 }
823
824 let pfx = path_prefix.unwrap_or("");
825 println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
826 if let Some(ref url) = tunnel_url {
827 println!(" 🌐 Public URL: {url}");
828 }
829 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
830 if let Some(code) = pairing.pairing_code() {
831 println!();
832 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
833 println!(" ┌──────────────┐");
834 println!(" │ {code} │");
835 println!(" └──────────────┘");
836 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
837 } else if pairing.require_pairing() {
838 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
839 println!(" To pair a new device: construct gateway get-paircode --new");
840 println!();
841 } else {
842 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
843 println!();
844 }
845 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
846 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
847 if whatsapp_channel.is_some() {
848 println!(" GET {pfx}/whatsapp — Meta webhook verification");
849 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
850 }
851 if linq_channel.is_some() {
852 println!(" POST {pfx}/linq — Linq message webhook (iMessage/RCS/SMS)");
853 }
854 if wati_channel.is_some() {
855 println!(" GET {pfx}/wati — WATI webhook verification");
856 println!(" POST {pfx}/wati — WATI message webhook");
857 }
858 if nextcloud_talk_channel.is_some() {
859 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
860 }
861 println!(" GET {pfx}/api/* — REST API (bearer token required)");
862 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
863 if config.nodes.enabled {
864 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
865 }
866 println!(" GET {pfx}/health — health check");
867 println!(" GET {pfx}/metrics — Prometheus metrics");
868 println!(" Press Ctrl+C to stop.\n");
869
870 crate::health::mark_component_ok("gateway");
871
872 if let Some(ref hooks) = hooks {
874 hooks.fire_gateway_start(host, actual_port).await;
875 }
876
877 let broadcast_observer: Arc<dyn crate::observability::Observer> =
879 Arc::new(sse::BroadcastObserver::new(
880 crate::observability::create_observer(&config.observability),
881 event_tx.clone(),
882 ));
883
884 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
885
886 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
888
889 let device_registry = if config.gateway.require_pairing {
891 Some(Arc::new(api_pairing::DeviceRegistry::new(
892 &config.workspace_dir,
893 )?))
894 } else {
895 None
896 };
897 let pending_pairings = if config.gateway.require_pairing {
898 Some(Arc::new(api_pairing::PairingStore::new(
899 config.gateway.pairing_dashboard.max_pending_codes,
900 )))
901 } else {
902 None
903 };
904
905 let mcp_workspace_dir = config.workspace_dir.clone();
919 let mcp_config_snapshot = config.clone();
920 let mcp_runtime_handles = {
921 let mut rh = crate::mcp_server::RuntimeHandles::empty();
922
923 if config.workspace.enabled {
925 let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
926 let home = directories::UserDirs::new()
927 .map(|u| u.home_dir().to_path_buf())
928 .unwrap_or_else(|| std::path::PathBuf::from("."));
929 home.join(&config.workspace.workspaces_dir[2..])
930 } else {
931 std::path::PathBuf::from(&config.workspace.workspaces_dir)
932 };
933 let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
934 rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
935 }
936
937 if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
939 {
940 let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
941 Arc::new(store);
942 rh.session_store = Some(backend);
943 }
944
945 if !config.agents.is_empty() {
950 rh.agent_config = Some(Arc::new(config.agents.clone()));
951 rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
952 rh.provider_runtime_options =
953 Some(Arc::new(crate::providers::ProviderRuntimeOptions {
954 auth_profile_override: None,
955 provider_api_url: config.api_url.clone(),
956 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
957 secrets_encrypt: config.secrets.encrypt,
958 reasoning_enabled: config.runtime.reasoning_enabled,
959 reasoning_effort: config.runtime.reasoning_effort.clone(),
960 provider_timeout_secs: Some(config.provider_timeout_secs),
961 extra_headers: config.extra_headers.clone(),
962 api_path: config.api_path.clone(),
963 provider_max_tokens: config.provider_max_tokens,
964 }));
965 }
966
967 let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
974 Arc::new(config.clone()),
975 &security,
976 Arc::new(crate::runtime::NativeRuntime::new()),
977 Arc::clone(&mem),
978 composio_key,
979 composio_entity_id,
980 &config.browser,
981 &config.http_request,
982 &config.web_fetch,
983 &config.workspace_dir,
984 &config.agents,
985 config.api_key.as_deref(),
986 &config,
987 Some(canvas_store.clone()),
988 );
989 let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
991 .into_iter()
992 .map(|b| Arc::<dyn tools::Tool>::from(b))
993 .collect();
994 rh.pre_built_tools = Some(mcp_tool_arcs);
995
996 rh
997 };
998
999 let mut state = AppState {
1000 config: config_state,
1001 provider,
1002 model,
1003 temperature,
1004 mem,
1005 auto_save: config.memory.auto_save,
1006 webhook_secret_hash,
1007 pairing,
1008 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1009 rate_limiter,
1010 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1011 idempotency_store,
1012 whatsapp: whatsapp_channel,
1013 whatsapp_app_secret,
1014 linq: linq_channel,
1015 linq_signing_secret,
1016 nextcloud_talk: nextcloud_talk_channel,
1017 nextcloud_talk_webhook_secret,
1018 wati: wati_channel,
1019 gmail_push: gmail_push_channel,
1020 observer: broadcast_observer,
1021 tools_registry,
1022 cost_tracker,
1023 audit_logger,
1024 event_tx,
1025 shutdown_tx,
1026 node_registry,
1027 session_backend,
1028 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1029 device_registry,
1030 pending_pairings,
1031 path_prefix: path_prefix.unwrap_or("").to_string(),
1032 canvas_store,
1033 mcp_registry: mcp_registry_shared,
1034 approval_registry: approval_registry::global(),
1035 mcp_local_url: None,
1037 #[cfg(feature = "webauthn")]
1038 webauthn: if config.security.webauthn.enabled {
1039 let secret_store = Arc::new(crate::security::SecretStore::new(
1040 &config.workspace_dir,
1041 true,
1042 ));
1043 let wa_config = crate::security::webauthn::WebAuthnConfig {
1044 enabled: true,
1045 rp_id: config.security.webauthn.rp_id.clone(),
1046 rp_origin: config.security.webauthn.rp_origin.clone(),
1047 rp_name: config.security.webauthn.rp_name.clone(),
1048 };
1049 Some(Arc::new(api_webauthn::WebAuthnState {
1050 manager: crate::security::webauthn::WebAuthnManager::new(
1051 wa_config,
1052 secret_store,
1053 &config.workspace_dir,
1054 ),
1055 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1056 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1057 }))
1058 } else {
1059 None
1060 },
1061 };
1062
1063 let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1070 let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1071 let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1072 &mcp_workspace_dir,
1073 &mcp_config_snapshot,
1074 &mcp_runtime_handles,
1075 );
1076 for (name, reason) in &mcp_skipped {
1077 tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1078 }
1079 tracing::info!(
1080 "mcp-server: advertising {} tools to external MCP clients",
1081 mcp_state.tools.len()
1082 );
1083
1084 match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1085 Ok(handle) => {
1086 let url = format!("http://{}/mcp", handle.addr);
1087 if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1088 tracing::warn!("mcp-server: failed to write discovery file: {e}");
1089 } else {
1090 tracing::info!(
1091 "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1092 );
1093 }
1094
1095 state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1098
1099 let mut watch = mcp_shutdown_watch;
1100 Some(tokio::spawn(async move {
1101 loop {
1103 if *watch.borrow() {
1104 break;
1105 }
1106 if watch.changed().await.is_err() {
1107 break;
1108 }
1109 }
1110 let _ = handle.shutdown.send(());
1111 let _ = handle.joined.await;
1112 crate::mcp_server::server::cleanup_discovery_file();
1113 tracing::info!("mcp-server: stopped");
1114 }))
1115 }
1116 Err(e) => {
1117 tracing::error!("mcp-server: failed to bind: {e}");
1118 None
1119 }
1120 }
1121 };
1122
1123 let config_put_router = Router::new()
1125 .route("/api/config", put(api::handle_api_config_put))
1126 .layer(RequestBodyLimitLayer::new(1_048_576));
1127
1128 let memory_graph_router = Router::new()
1131 .route(
1132 "/api/memory/graph",
1133 get(api_memory_graph::handle_memory_graph),
1134 )
1135 .with_state(state.clone())
1136 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1137 .layer(TimeoutLayer::with_status_code(
1138 StatusCode::REQUEST_TIMEOUT,
1139 Duration::from_secs(60),
1140 ));
1141
1142 let inner = Router::new()
1144 .route("/admin/shutdown", post(handle_admin_shutdown))
1146 .route("/admin/paircode", get(handle_admin_paircode))
1147 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1148 .route("/health", get(handle_health))
1150 .route("/metrics", get(handle_metrics))
1151 .route("/pair", post(handle_pair))
1152 .route("/pair/code", get(handle_pair_code))
1153 .route("/webhook", post(handle_webhook))
1154 .route("/whatsapp", get(handle_whatsapp_verify))
1155 .route("/whatsapp", post(handle_whatsapp_message))
1156 .route("/linq", post(handle_linq_webhook))
1157 .route("/wati", get(handle_wati_verify))
1158 .route("/wati", post(handle_wati_webhook))
1159 .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1160 .route("/webhook/gmail", post(handle_gmail_push_webhook))
1161 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1163 .route("/api/status", get(api::handle_api_status))
1165 .route("/api/config", get(api::handle_api_config_get))
1166 .route("/api/tools", get(api::handle_api_tools))
1167 .route("/api/cron", get(api::handle_api_cron_list))
1168 .route("/api/cron", post(api::handle_api_cron_add))
1169 .route(
1170 "/api/cron/settings",
1171 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1172 )
1173 .route(
1174 "/api/cron/{id}",
1175 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1176 )
1177 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1178 .route("/api/integrations", get(api::handle_api_integrations))
1179 .route(
1180 "/api/integrations/settings",
1181 get(api::handle_api_integrations_settings),
1182 )
1183 .route(
1184 "/api/doctor",
1185 get(api::handle_api_doctor).post(api::handle_api_doctor),
1186 )
1187 .route("/api/cost", get(api::handle_api_cost))
1189 .route("/api/audit", get(api::handle_api_audit))
1190 .route("/api/audit/verify", get(api::handle_api_audit_verify))
1191 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1192 .route("/api/health", get(api::handle_api_health))
1193 .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1194 .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1195 .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1197 .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1198 .route(
1199 "/api/mcp/session/{session_id}/events",
1200 get(api_mcp::handle_api_mcp_session_events),
1201 )
1202 .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1203 .route("/api/nodes", get(api::handle_api_nodes))
1204 .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1205 .route("/api/sessions", get(api::handle_api_sessions_list))
1206 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1207 .route(
1208 "/api/sessions/{id}/messages",
1209 get(api::handle_api_session_messages),
1210 )
1211 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1212 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1213 .route("/api/channels", get(api::handle_api_channels))
1215 .route("/api/channel-events", post(api::handle_api_channel_events))
1216 .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1218 .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1219 .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1220 .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1222 .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1223 .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1224 .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1226 .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1227 .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1228 .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1230 .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1231 .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1232 .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1233 .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1234 .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1235 .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1236 .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1237 .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1238 .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1239 .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1240 .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1242 .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1243 .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1244 .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1245 .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1248 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1250 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1251 .route("/api/devices", get(api_pairing::list_devices))
1252 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1253 .route(
1254 "/api/devices/{id}/token/rotate",
1255 post(api_pairing::rotate_token),
1256 )
1257 .route("/api/canvas", get(canvas::handle_canvas_list))
1259 .route(
1260 "/api/canvas/{id}",
1261 get(canvas::handle_canvas_get)
1262 .post(canvas::handle_canvas_post)
1263 .delete(canvas::handle_canvas_clear),
1264 )
1265 .route(
1266 "/api/canvas/{id}/history",
1267 get(canvas::handle_canvas_history),
1268 );
1269
1270 #[cfg(feature = "webauthn")]
1272 let inner = inner
1273 .route(
1274 "/api/webauthn/register/start",
1275 post(api_webauthn::handle_register_start),
1276 )
1277 .route(
1278 "/api/webauthn/register/finish",
1279 post(api_webauthn::handle_register_finish),
1280 )
1281 .route(
1282 "/api/webauthn/auth/start",
1283 post(api_webauthn::handle_auth_start),
1284 )
1285 .route(
1286 "/api/webauthn/auth/finish",
1287 post(api_webauthn::handle_auth_finish),
1288 )
1289 .route(
1290 "/api/webauthn/credentials",
1291 get(api_webauthn::handle_list_credentials),
1292 )
1293 .route(
1294 "/api/webauthn/credentials/{id}",
1295 delete(api_webauthn::handle_delete_credential),
1296 );
1297
1298 #[cfg(feature = "plugins-wasm")]
1300 let inner = inner.route(
1301 "/api/plugins",
1302 get(api_plugins::plugin_routes::list_plugins),
1303 );
1304
1305 let inner = inner
1306 .route("/api/events", get(sse::handle_sse_events))
1308 .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1309 .route("/ws/chat", get(ws::handle_ws_chat))
1311 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1313 .route("/ws/nodes", get(nodes::handle_ws_nodes))
1315 .route("/ws/terminal", get(terminal::handle_ws_terminal))
1317 .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1319 .route("/_app/{*path}", get(static_files::handle_static))
1321 .merge(config_put_router)
1323 .fallback(get(static_files::handle_spa_fallback))
1325 .with_state(state)
1326 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1327 .layer(TimeoutLayer::with_status_code(
1328 StatusCode::REQUEST_TIMEOUT,
1329 Duration::from_secs(gateway_request_timeout_secs()),
1330 ));
1331
1332 let inner = inner.merge(memory_graph_router);
1334
1335 let app = if let Some(prefix) = path_prefix {
1339 let redirect_target = prefix.to_string();
1340 Router::new().nest(prefix, inner).route(
1341 &format!("{prefix}/"),
1342 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1343 )
1344 } else {
1345 inner
1346 };
1347
1348 let tls_acceptor = match &config.gateway.tls {
1350 Some(tls_cfg) if tls_cfg.enabled => {
1351 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1352 if has_mtls {
1353 tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1354 } else {
1355 tracing::info!("TLS enabled (no client certificate requirement)");
1356 }
1357 Some(tls::build_tls_acceptor(tls_cfg)?)
1358 }
1359 _ => None,
1360 };
1361
1362 if let Some(tls_acceptor) = tls_acceptor {
1363 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1365 let mut app = app;
1366
1367 let mut shutdown_signal = shutdown_rx;
1368 loop {
1369 tokio::select! {
1370 conn = listener.accept() => {
1371 let (tcp_stream, remote_addr) = conn?;
1372 let tls_acceptor = tls_acceptor.clone();
1373 let svc = tower::MakeService::<
1374 SocketAddr,
1375 hyper::Request<hyper::body::Incoming>,
1376 >::make_service(&mut app, remote_addr)
1377 .await
1378 .expect("infallible make_service");
1379
1380 tokio::spawn(async move {
1381 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1382 Ok(s) => s,
1383 Err(e) => {
1384 tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1385 return;
1386 }
1387 };
1388 let io = hyper_util::rt::TokioIo::new(tls_stream);
1389 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1390 let mut svc = svc.clone();
1391 async move {
1392 tower::Service::call(&mut svc, req).await
1393 }
1394 });
1395 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1396 hyper_util::rt::TokioExecutor::new(),
1397 )
1398 .serve_connection(io, hyper_svc)
1399 .await
1400 {
1401 tracing::debug!("connection error from {remote_addr}: {e}");
1402 }
1403 });
1404 }
1405 _ = shutdown_signal.changed() => {
1406 tracing::info!("🦀 Construct Gateway shutting down...");
1407 break;
1408 }
1409 }
1410 }
1411 } else {
1412 axum::serve(
1414 listener,
1415 app.into_make_service_with_connect_info::<SocketAddr>(),
1416 )
1417 .with_graceful_shutdown(async move {
1418 let _ = shutdown_rx.changed().await;
1419 tracing::info!("🦀 Construct Gateway shutting down...");
1420 })
1421 .await?;
1422 }
1423
1424 if let Some(task) = mcp_task {
1427 let _ = task.await;
1428 }
1429
1430 Ok(())
1431}
1432
1433async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1439 let body = serde_json::json!({
1440 "status": "ok",
1441 "paired": state.pairing.is_paired(),
1442 "require_pairing": state.pairing.require_pairing(),
1443 "runtime": crate::health::snapshot_json(),
1444 });
1445 Json(body)
1446}
1447
/// `Content-Type` header value attached to every `/metrics` response.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1450
/// Returns the comment line served by `/metrics` when no Prometheus observer
/// is available, telling the operator which config setting enables it.
fn prometheus_disabled_hint() -> String {
    "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n"
        .to_owned()
}
1456
/// Locates the `PrometheusObserver` behind a type-erased observer.
///
/// The observer is accepted either directly, or when it is wrapped as the
/// `inner()` observer of an `sse::BroadcastObserver`. Returns `None` when
/// neither downcast succeeds.
#[cfg(feature = "observability-prometheus")]
fn prometheus_observer_from_state(
    observer: &dyn crate::observability::Observer,
) -> Option<&crate::observability::PrometheusObserver> {
    // Direct hit: the observer itself is the Prometheus backend.
    if let Some(direct) = observer
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
    {
        return Some(direct);
    }

    // Otherwise look one level down, through the broadcast wrapper.
    let broadcast = observer
        .as_any()
        .downcast_ref::<sse::BroadcastObserver>()?;
    broadcast
        .inner()
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
}
1476
1477async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1479 let body = {
1480 #[cfg(feature = "observability-prometheus")]
1481 {
1482 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1483 prom.encode()
1484 } else {
1485 prometheus_disabled_hint()
1486 }
1487 }
1488 #[cfg(not(feature = "observability-prometheus"))]
1489 {
1490 let _ = &state;
1491 prometheus_disabled_hint()
1492 }
1493 };
1494
1495 (
1496 StatusCode::OK,
1497 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1498 body,
1499 )
1500}
1501
1502#[axum::debug_handler]
1504async fn handle_pair(
1505 State(state): State<AppState>,
1506 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1507 headers: HeaderMap,
1508) -> impl IntoResponse {
1509 let rate_key =
1510 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1511 let peer_is_loopback = peer_addr.ip().is_loopback();
1512 if !state.rate_limiter.allow_pair(&rate_key) {
1513 tracing::warn!("/pair rate limit exceeded");
1514 let err = serde_json::json!({
1515 "error": "Too many pairing requests. Please retry later.",
1516 "retry_after": RATE_LIMIT_WINDOW_SECS,
1517 });
1518 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1519 }
1520
1521 if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key, peer_is_loopback) {
1523 tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1524 let err = serde_json::json!({
1525 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1526 "retry_after": e.retry_after_secs,
1527 });
1528 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1529 }
1530
1531 let code = headers
1532 .get("X-Pairing-Code")
1533 .and_then(|v| v.to_str().ok())
1534 .unwrap_or("");
1535
1536 match state.pairing.try_pair(code, &rate_key).await {
1537 Ok(Some(token)) => {
1538 tracing::info!("🔐 New client paired successfully");
1539 if let Some(ref logger) = state.audit_logger {
1540 let _ =
1541 logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1542 }
1543 if let Err(err) =
1544 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1545 {
1546 tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1547 let body = serde_json::json!({
1548 "paired": true,
1549 "persisted": false,
1550 "token": token,
1551 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1552 });
1553 return (StatusCode::OK, Json(body));
1554 }
1555
1556 let body = serde_json::json!({
1557 "paired": true,
1558 "persisted": true,
1559 "token": token,
1560 "message": "Save this token — use it as Authorization: Bearer <token>"
1561 });
1562 (StatusCode::OK, Json(body))
1563 }
1564 Ok(None) => {
1565 state.auth_limiter.record_attempt(&rate_key, peer_is_loopback);
1566 tracing::warn!("🔐 Pairing attempt with invalid code");
1567 if let Some(ref logger) = state.audit_logger {
1568 let _ = logger
1569 .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1570 }
1571 let err = serde_json::json!({"error": "Invalid pairing code"});
1572 (StatusCode::FORBIDDEN, Json(err))
1573 }
1574 Err(lockout_secs) => {
1575 tracing::warn!(
1576 "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1577 );
1578 if let Some(ref logger) = state.audit_logger {
1579 let _ = logger.log_auth_failure(
1580 "gateway",
1581 &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1582 );
1583 }
1584 let err = serde_json::json!({
1585 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1586 "retry_after": lockout_secs
1587 });
1588 (StatusCode::TOO_MANY_REQUESTS, Json(err))
1589 }
1590 }
1591}
1592
1593async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1594 let paired_tokens = pairing.tokens();
1595 let mut updated_cfg = { config.lock().clone() };
1598 updated_cfg.gateway.paired_tokens = paired_tokens;
1599 updated_cfg
1600 .save()
1601 .await
1602 .context("Failed to persist paired tokens to config.toml")?;
1603
1604 *config.lock() = updated_cfg;
1606 Ok(())
1607}
1608
1609async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1611 let user_messages = vec![ChatMessage::user(message)];
1612
1613 let system_prompt = {
1616 let config_guard = state.config.lock();
1617 crate::channels::build_system_prompt(
1618 &config_guard.workspace_dir,
1619 &state.model,
1620 &[], &[], Some(&config_guard.identity),
1623 None, )
1625 };
1626
1627 let mut messages = Vec::with_capacity(1 + user_messages.len());
1628 messages.push(ChatMessage::system(system_prompt));
1629 messages.extend(user_messages);
1630
1631 let multimodal_config = state.config.lock().multimodal.clone();
1632 let prepared =
1633 crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1634
1635 state
1636 .provider
1637 .chat_with_history(&prepared.messages, &state.model, state.temperature)
1638 .await
1639}
1640
1641async fn run_gateway_chat_with_tools(
1643 state: &AppState,
1644 message: &str,
1645 session_id: Option<&str>,
1646) -> anyhow::Result<String> {
1647 let config = state.config.lock().clone();
1648 Box::pin(crate::agent::process_message(config, message, session_id)).await
1649}
1650
/// JSON request body accepted by `handle_webhook`.
#[derive(serde::Deserialize)]
pub struct WebhookBody {
    /// Message text that is forwarded to the model.
    pub message: String,
}
1656
/// `POST /webhook` — runs one message through the provider and returns the reply.
///
/// Checks are applied in this order, each one short-circuiting with an error
/// response:
/// 1. per-client webhook rate limit (`429`);
/// 2. when pairing is required: auth-attempt limiter (`429`) and bearer token
///    validation (`401`);
/// 3. when a webhook secret hash is configured: `X-Webhook-Secret` must hash to
///    the same value (`401`);
/// 4. the JSON body must deserialize into `WebhookBody` (`400`);
/// 5. an `X-Idempotency-Key` that was already recorded yields a `200`
///    "duplicate" response without calling the provider.
///
/// On success the response is `200` with `response` and `model`; provider
/// failures yield `500` with a generic error (the sanitized detail is only
/// logged and reported to the observer).
async fn handle_webhook(
    State(state): State<AppState>,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
) -> impl IntoResponse {
    let rate_key =
        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
    let peer_is_loopback = peer_addr.ip().is_loopback();
    if !state.rate_limiter.allow_webhook(&rate_key) {
        tracing::warn!("/webhook rate limit exceeded");
        let err = serde_json::json!({
            "error": "Too many webhook requests. Please retry later.",
            "retry_after": RATE_LIMIT_WINDOW_SECS,
        });
        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
    }

    // Bearer-token authentication is only enforced when pairing is required.
    if state.pairing.require_pairing() {
        if let Err(e) = state.auth_limiter.check_rate_limit(&rate_key, peer_is_loopback) {
            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
            let err = serde_json::json!({
                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
                "retry_after": e.retry_after_secs,
            });
            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
        }
        // A missing header, non-UTF-8 value or missing "Bearer " prefix all
        // collapse to an empty token, which then fails authentication.
        let auth = headers
            .get(header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        let token = auth.strip_prefix("Bearer ").unwrap_or("");
        if !state.pairing.is_authenticated(token) {
            // Failed attempts feed the auth limiter checked above.
            state.auth_limiter.record_attempt(&rate_key, peer_is_loopback);
            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
            let err = serde_json::json!({
                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
            });
            return (StatusCode::UNAUTHORIZED, Json(err));
        }
    }

    // Optional shared-secret check: the header value is trimmed, hashed and
    // compared against the stored hash with `constant_time_eq`.
    if let Some(ref secret_hash) = state.webhook_secret_hash {
        let header_hash = headers
            .get("X-Webhook-Secret")
            .and_then(|v| v.to_str().ok())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(hash_webhook_secret);
        match header_hash {
            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
            _ => {
                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
                return (StatusCode::UNAUTHORIZED, Json(err));
            }
        }
    }

    // The body is taken as a `Result` so that parse failures are reported only
    // after the authentication checks above have passed.
    let Json(webhook_body) = match body {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!("Webhook JSON parse error: {e}");
            let err = serde_json::json!({
                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
            });
            return (StatusCode::BAD_REQUEST, Json(err));
        }
    };

    // NOTE(review): the idempotency key is consumed before the provider call,
    // so a retry after a failed call would be reported as a duplicate —
    // confirm this is intended.
    if let Some(idempotency_key) = headers
        .get("X-Idempotency-Key")
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        if !state.idempotency_store.record_if_new(idempotency_key) {
            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
            let body = serde_json::json!({
                "status": "duplicate",
                "idempotent": true,
                "message": "Request already processed for this idempotency key"
            });
            return (StatusCode::OK, Json(body));
        }
    }

    let message = &webhook_body.message;
    let session_id = webhook_session_id(&headers);

    // Best-effort auto-save of the incoming message; a store failure is
    // deliberately ignored so it cannot fail the request.
    if state.auto_save && !memory::should_skip_autosave_content(message) {
        let key = webhook_memory_key();
        let _ = state
            .mem
            .store(
                &key,
                message,
                MemoryCategory::Conversation,
                session_id.as_deref(),
            )
            .await;
    }

    let provider_label = state
        .config
        .lock()
        .default_provider
        .clone()
        .unwrap_or_else(|| "unknown".to_string());
    let model_label = state.model.clone();
    let started_at = Instant::now();

    state
        .observer
        .record_event(&crate::observability::ObserverEvent::AgentStart {
            provider: provider_label.clone(),
            model: model_label.clone(),
        });
    // NOTE(review): `messages_count` is reported as 1 although
    // `run_gateway_chat_simple` sends a system prompt plus the user message —
    // confirm which count the metric is meant to carry.
    state
        .observer
        .record_event(&crate::observability::ObserverEvent::LlmRequest {
            provider: provider_label.clone(),
            model: model_label.clone(),
            messages_count: 1,
        });

    match run_gateway_chat_simple(&state, message).await {
        Ok(response) => {
            let duration = started_at.elapsed();
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: true,
                    error_message: None,
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            let body = serde_json::json!({"response": response, "model": state.model});
            (StatusCode::OK, Json(body))
        }
        Err(e) => {
            let duration = started_at.elapsed();
            // Only the sanitized form of the error is logged or reported.
            let sanitized = providers::sanitize_api_error(&e.to_string());

            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: false,
                    error_message: Some(sanitized.clone()),
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::Error {
                    component: "gateway".to_string(),
                    message: sanitized.clone(),
                });
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            tracing::error!("Webhook provider error: {}", sanitized);
            // The client receives a generic message, not the provider detail.
            let err = serde_json::json!({"error": "LLM request failed"});
            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
        }
    }
}
1858
/// Query parameters read by `handle_whatsapp_verify` (`GET /whatsapp`).
///
/// Every field is optional at the parsing level; the handler decides which
/// combinations are acceptable.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// Value of `hub.mode`; the handler accepts only `"subscribe"`.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// Value of `hub.verify_token`, compared against the channel's verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// Value of `hub.challenge`, echoed back on successful verification.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
1869
1870async fn handle_whatsapp_verify(
1872 State(state): State<AppState>,
1873 Query(params): Query<WhatsAppVerifyQuery>,
1874) -> impl IntoResponse {
1875 let Some(ref wa) = state.whatsapp else {
1876 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
1877 };
1878
1879 let token_matches = params
1881 .verify_token
1882 .as_deref()
1883 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
1884 if params.mode.as_deref() == Some("subscribe") && token_matches {
1885 if let Some(ch) = params.challenge {
1886 tracing::info!("WhatsApp webhook verified successfully");
1887 return (StatusCode::OK, ch);
1888 }
1889 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
1890 }
1891
1892 tracing::warn!("WhatsApp webhook verification failed — token mismatch");
1893 (StatusCode::FORBIDDEN, "Forbidden".to_string())
1894}
1895
1896pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
1900 use hmac::{Hmac, Mac};
1901 use sha2::Sha256;
1902
1903 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
1905 return false;
1906 };
1907
1908 let Ok(expected) = hex::decode(hex_sig) else {
1910 return false;
1911 };
1912
1913 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
1915 return false;
1916 };
1917 mac.update(body);
1918
1919 mac.verify_slice(&expected).is_ok()
1921}
1922
1923async fn handle_whatsapp_message(
1925 State(state): State<AppState>,
1926 headers: HeaderMap,
1927 body: Bytes,
1928) -> impl IntoResponse {
1929 let Some(ref wa) = state.whatsapp else {
1930 return (
1931 StatusCode::NOT_FOUND,
1932 Json(serde_json::json!({"error": "WhatsApp not configured"})),
1933 );
1934 };
1935
1936 if let Some(ref app_secret) = state.whatsapp_app_secret {
1938 let signature = headers
1939 .get("X-Hub-Signature-256")
1940 .and_then(|v| v.to_str().ok())
1941 .unwrap_or("");
1942
1943 if !verify_whatsapp_signature(app_secret, &body, signature) {
1944 tracing::warn!(
1945 "WhatsApp webhook signature verification failed (signature: {})",
1946 if signature.is_empty() {
1947 "missing"
1948 } else {
1949 "invalid"
1950 }
1951 );
1952 return (
1953 StatusCode::UNAUTHORIZED,
1954 Json(serde_json::json!({"error": "Invalid signature"})),
1955 );
1956 }
1957 }
1958
1959 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
1961 return (
1962 StatusCode::BAD_REQUEST,
1963 Json(serde_json::json!({"error": "Invalid JSON payload"})),
1964 );
1965 };
1966
1967 let messages = wa.parse_webhook_payload(&payload);
1969
1970 if messages.is_empty() {
1971 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
1973 }
1974
1975 for msg in &messages {
1977 tracing::info!(
1978 "WhatsApp message from {}: {}",
1979 msg.sender,
1980 truncate_with_ellipsis(&msg.content, 50)
1981 );
1982 let session_id = sender_session_id("whatsapp", msg);
1983
1984 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
1986 let key = whatsapp_memory_key(msg);
1987 let _ = state
1988 .mem
1989 .store(
1990 &key,
1991 &msg.content,
1992 MemoryCategory::Conversation,
1993 Some(&session_id),
1994 )
1995 .await;
1996 }
1997
1998 match Box::pin(run_gateway_chat_with_tools(
1999 &state,
2000 &msg.content,
2001 Some(&session_id),
2002 ))
2003 .await
2004 {
2005 Ok(response) => {
2006 if let Err(e) = wa
2008 .send(&SendMessage::new(response, &msg.reply_target))
2009 .await
2010 {
2011 tracing::error!("Failed to send WhatsApp reply: {e}");
2012 }
2013 }
2014 Err(e) => {
2015 tracing::error!("LLM error for WhatsApp message: {e:#}");
2016 let _ = wa
2017 .send(&SendMessage::new(
2018 "Sorry, I couldn't process your message right now.",
2019 &msg.reply_target,
2020 ))
2021 .await;
2022 }
2023 }
2024 }
2025
2026 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2028}
2029
2030async fn handle_linq_webhook(
2032 State(state): State<AppState>,
2033 headers: HeaderMap,
2034 body: Bytes,
2035) -> impl IntoResponse {
2036 let Some(ref linq) = state.linq else {
2037 return (
2038 StatusCode::NOT_FOUND,
2039 Json(serde_json::json!({"error": "Linq not configured"})),
2040 );
2041 };
2042
2043 let body_str = String::from_utf8_lossy(&body);
2044
2045 if let Some(ref signing_secret) = state.linq_signing_secret {
2047 let timestamp = headers
2048 .get("X-Webhook-Timestamp")
2049 .and_then(|v| v.to_str().ok())
2050 .unwrap_or("");
2051
2052 let signature = headers
2053 .get("X-Webhook-Signature")
2054 .and_then(|v| v.to_str().ok())
2055 .unwrap_or("");
2056
2057 if !crate::channels::linq::verify_linq_signature(
2058 signing_secret,
2059 &body_str,
2060 timestamp,
2061 signature,
2062 ) {
2063 tracing::warn!(
2064 "Linq webhook signature verification failed (signature: {})",
2065 if signature.is_empty() {
2066 "missing"
2067 } else {
2068 "invalid"
2069 }
2070 );
2071 return (
2072 StatusCode::UNAUTHORIZED,
2073 Json(serde_json::json!({"error": "Invalid signature"})),
2074 );
2075 }
2076 }
2077
2078 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2080 return (
2081 StatusCode::BAD_REQUEST,
2082 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2083 );
2084 };
2085
2086 let messages = linq.parse_webhook_payload(&payload);
2088
2089 if messages.is_empty() {
2090 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2092 }
2093
2094 for msg in &messages {
2096 tracing::info!(
2097 "Linq message from {}: {}",
2098 msg.sender,
2099 truncate_with_ellipsis(&msg.content, 50)
2100 );
2101 let session_id = sender_session_id("linq", msg);
2102
2103 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2105 let key = linq_memory_key(msg);
2106 let _ = state
2107 .mem
2108 .store(
2109 &key,
2110 &msg.content,
2111 MemoryCategory::Conversation,
2112 Some(&session_id),
2113 )
2114 .await;
2115 }
2116
2117 match Box::pin(run_gateway_chat_with_tools(
2119 &state,
2120 &msg.content,
2121 Some(&session_id),
2122 ))
2123 .await
2124 {
2125 Ok(response) => {
2126 if let Err(e) = linq
2128 .send(&SendMessage::new(response, &msg.reply_target))
2129 .await
2130 {
2131 tracing::error!("Failed to send Linq reply: {e}");
2132 }
2133 }
2134 Err(e) => {
2135 tracing::error!("LLM error for Linq message: {e:#}");
2136 let _ = linq
2137 .send(&SendMessage::new(
2138 "Sorry, I couldn't process your message right now.",
2139 &msg.reply_target,
2140 ))
2141 .await;
2142 }
2143 }
2144 }
2145
2146 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2148}
2149
2150async fn handle_wati_verify(
2152 State(state): State<AppState>,
2153 Query(params): Query<WatiVerifyQuery>,
2154) -> impl IntoResponse {
2155 if state.wati.is_none() {
2156 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2157 }
2158
2159 if let Some(challenge) = params.challenge {
2161 tracing::info!("WATI webhook verified successfully");
2162 return (StatusCode::OK, challenge);
2163 }
2164
2165 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2166}
2167
/// Query parameters read by `handle_wati_verify` (`GET /wati`).
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// Value of `hub.challenge`, echoed back by the handler when present.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2173
2174async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2176 let Some(ref wati) = state.wati else {
2177 return (
2178 StatusCode::NOT_FOUND,
2179 Json(serde_json::json!({"error": "WATI not configured"})),
2180 );
2181 };
2182
2183 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2185 return (
2186 StatusCode::BAD_REQUEST,
2187 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2188 );
2189 };
2190
2191 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2193
2194 let messages = if matches!(msg_type, "audio" | "voice") {
2195 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2197 wati.parse_audio_as_message(&payload, transcript)
2198 } else {
2199 vec![]
2200 }
2201 } else {
2202 wati.parse_webhook_payload(&payload)
2203 };
2204
2205 if messages.is_empty() {
2206 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2207 }
2208
2209 for msg in &messages {
2211 tracing::info!(
2212 "WATI message from {}: {}",
2213 msg.sender,
2214 truncate_with_ellipsis(&msg.content, 50)
2215 );
2216 let session_id = sender_session_id("wati", msg);
2217
2218 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2220 let key = wati_memory_key(msg);
2221 let _ = state
2222 .mem
2223 .store(
2224 &key,
2225 &msg.content,
2226 MemoryCategory::Conversation,
2227 Some(&session_id),
2228 )
2229 .await;
2230 }
2231
2232 match Box::pin(run_gateway_chat_with_tools(
2234 &state,
2235 &msg.content,
2236 Some(&session_id),
2237 ))
2238 .await
2239 {
2240 Ok(response) => {
2241 if let Err(e) = wati
2243 .send(&SendMessage::new(response, &msg.reply_target))
2244 .await
2245 {
2246 tracing::error!("Failed to send WATI reply: {e}");
2247 }
2248 }
2249 Err(e) => {
2250 tracing::error!("LLM error for WATI message: {e:#}");
2251 let _ = wati
2252 .send(&SendMessage::new(
2253 "Sorry, I couldn't process your message right now.",
2254 &msg.reply_target,
2255 ))
2256 .await;
2257 }
2258 }
2259 }
2260
2261 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2263}
2264
2265async fn handle_nextcloud_talk_webhook(
2267 State(state): State<AppState>,
2268 headers: HeaderMap,
2269 body: Bytes,
2270) -> impl IntoResponse {
2271 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2272 return (
2273 StatusCode::NOT_FOUND,
2274 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2275 );
2276 };
2277
2278 let body_str = String::from_utf8_lossy(&body);
2279
2280 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2282 let random = headers
2283 .get("X-Nextcloud-Talk-Random")
2284 .and_then(|v| v.to_str().ok())
2285 .unwrap_or("");
2286
2287 let signature = headers
2288 .get("X-Nextcloud-Talk-Signature")
2289 .and_then(|v| v.to_str().ok())
2290 .unwrap_or("");
2291
2292 if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2293 webhook_secret,
2294 random,
2295 &body_str,
2296 signature,
2297 ) {
2298 tracing::warn!(
2299 "Nextcloud Talk webhook signature verification failed (signature: {})",
2300 if signature.is_empty() {
2301 "missing"
2302 } else {
2303 "invalid"
2304 }
2305 );
2306 return (
2307 StatusCode::UNAUTHORIZED,
2308 Json(serde_json::json!({"error": "Invalid signature"})),
2309 );
2310 }
2311 }
2312
2313 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2315 return (
2316 StatusCode::BAD_REQUEST,
2317 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2318 );
2319 };
2320
2321 let messages = nextcloud_talk.parse_webhook_payload(&payload);
2323 if messages.is_empty() {
2324 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2326 }
2327
2328 for msg in &messages {
2329 tracing::info!(
2330 "Nextcloud Talk message from {}: {}",
2331 msg.sender,
2332 truncate_with_ellipsis(&msg.content, 50)
2333 );
2334 let session_id = sender_session_id("nextcloud_talk", msg);
2335
2336 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2337 let key = nextcloud_talk_memory_key(msg);
2338 let _ = state
2339 .mem
2340 .store(
2341 &key,
2342 &msg.content,
2343 MemoryCategory::Conversation,
2344 Some(&session_id),
2345 )
2346 .await;
2347 }
2348
2349 match Box::pin(run_gateway_chat_with_tools(
2350 &state,
2351 &msg.content,
2352 Some(&session_id),
2353 ))
2354 .await
2355 {
2356 Ok(response) => {
2357 if let Err(e) = nextcloud_talk
2358 .send(&SendMessage::new(response, &msg.reply_target))
2359 .await
2360 {
2361 tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2362 }
2363 }
2364 Err(e) => {
2365 tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2366 let _ = nextcloud_talk
2367 .send(&SendMessage::new(
2368 "Sorry, I couldn't process your message right now.",
2369 &msg.reply_target,
2370 ))
2371 .await;
2372 }
2373 }
2374 }
2375
2376 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2377}
2378
/// Largest request body, in bytes, accepted by the Gmail push webhook (1 MiB).
const GMAIL_WEBHOOK_MAX_BODY: usize = 1 << 20;
2382
2383async fn handle_gmail_push_webhook(
2385 State(state): State<AppState>,
2386 headers: HeaderMap,
2387 body: Bytes,
2388) -> impl IntoResponse {
2389 let Some(ref gmail_push) = state.gmail_push else {
2390 return (
2391 StatusCode::NOT_FOUND,
2392 Json(serde_json::json!({"error": "Gmail push not configured"})),
2393 );
2394 };
2395
2396 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2398 return (
2399 StatusCode::PAYLOAD_TOO_LARGE,
2400 Json(serde_json::json!({"error": "Request body too large"})),
2401 );
2402 }
2403
2404 let secret = gmail_push.resolve_webhook_secret();
2406 if !secret.is_empty() {
2407 let provided = headers
2408 .get(axum::http::header::AUTHORIZATION)
2409 .and_then(|v| v.to_str().ok())
2410 .and_then(|auth| auth.strip_prefix("Bearer "))
2411 .unwrap_or("");
2412
2413 if provided != secret {
2414 tracing::warn!("Gmail push webhook: unauthorized request");
2415 return (
2416 StatusCode::UNAUTHORIZED,
2417 Json(serde_json::json!({"error": "Unauthorized"})),
2418 );
2419 }
2420 }
2421
2422 let body_str = String::from_utf8_lossy(&body);
2423 let envelope: crate::channels::gmail_push::PubSubEnvelope =
2424 match serde_json::from_str(&body_str) {
2425 Ok(e) => e,
2426 Err(e) => {
2427 tracing::warn!("Gmail push webhook: invalid payload: {e}");
2428 return (
2429 StatusCode::BAD_REQUEST,
2430 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2431 );
2432 }
2433 };
2434
2435 let channel = Arc::clone(gmail_push);
2437 tokio::spawn(async move {
2438 if let Err(e) = channel.handle_notification(&envelope).await {
2439 tracing::error!("Gmail push notification processing failed: {e:#}");
2440 }
2441 });
2442
2443 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2445}
2446
2447#[derive(serde::Serialize)]
2453struct AdminResponse {
2454 success: bool,
2455 message: String,
2456}
2457
2458fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2460 if peer.ip().is_loopback() {
2461 Ok(())
2462 } else {
2463 Err((
2464 StatusCode::FORBIDDEN,
2465 Json(serde_json::json!({
2466 "error": "Admin endpoints are restricted to localhost"
2467 })),
2468 ))
2469 }
2470}
2471
2472async fn handle_admin_shutdown(
2474 State(state): State<AppState>,
2475 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2476) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2477 require_localhost(&peer)?;
2478 tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2479
2480 let body = AdminResponse {
2481 success: true,
2482 message: "Gateway shutdown initiated".to_string(),
2483 };
2484
2485 let _ = state.shutdown_tx.send(true);
2486
2487 Ok((StatusCode::OK, Json(body)))
2488}
2489
2490async fn handle_admin_paircode(
2492 State(state): State<AppState>,
2493 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2494) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2495 require_localhost(&peer)?;
2496 let code = state.pairing.pairing_code();
2497
2498 let body = if let Some(c) = code {
2499 serde_json::json!({
2500 "success": true,
2501 "pairing_required": state.pairing.require_pairing(),
2502 "pairing_code": c,
2503 "message": "Use this one-time code to pair"
2504 })
2505 } else {
2506 serde_json::json!({
2507 "success": true,
2508 "pairing_required": state.pairing.require_pairing(),
2509 "pairing_code": null,
2510 "message": if state.pairing.require_pairing() {
2511 "Pairing is active but no new code available (already paired or code expired)"
2512 } else {
2513 "Pairing is disabled for this gateway"
2514 }
2515 })
2516 };
2517
2518 Ok((StatusCode::OK, Json(body)))
2519}
2520
2521async fn handle_admin_paircode_new(
2523 State(state): State<AppState>,
2524 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2525) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2526 require_localhost(&peer)?;
2527 match state.pairing.generate_new_pairing_code() {
2528 Some(code) => {
2529 tracing::info!("🔐 New pairing code generated via admin endpoint");
2530 let body = serde_json::json!({
2531 "success": true,
2532 "pairing_required": state.pairing.require_pairing(),
2533 "pairing_code": code,
2534 "message": "New pairing code generated — use this one-time code to pair"
2535 });
2536 Ok((StatusCode::OK, Json(body)))
2537 }
2538 None => {
2539 let body = serde_json::json!({
2540 "success": false,
2541 "pairing_required": false,
2542 "pairing_code": null,
2543 "message": "Pairing is disabled for this gateway"
2544 });
2545 Ok((StatusCode::BAD_REQUEST, Json(body)))
2546 }
2547 }
2548}
2549
2550async fn handle_pair_code(
2558 State(state): State<AppState>,
2559 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2560) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2561 require_localhost(&peer)?;
2562
2563 let require = state.pairing.require_pairing();
2564 let is_paired = state.pairing.is_paired();
2565
2566 let code = if require && !is_paired {
2567 state.pairing.pairing_code()
2568 } else {
2569 None
2570 };
2571
2572 let body = serde_json::json!({
2573 "success": true,
2574 "pairing_required": require,
2575 "pairing_code": code,
2576 });
2577
2578 Ok((StatusCode::OK, Json(body)))
2579}
2580
2581#[cfg(test)]
2582mod tests {
2583 use super::*;
2584 use crate::channels::traits::ChannelMessage;
2585 use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2586 use crate::providers::Provider;
2587 use async_trait::async_trait;
2588 use axum::http::HeaderValue;
2589 use axum::response::IntoResponse;
2590 use http_body_util::BodyExt;
2591 use parking_lot::Mutex;
2592 use std::sync::atomic::{AtomicUsize, Ordering};
2593
2594 fn generate_test_secret() -> String {
2596 let bytes: [u8; 32] = rand::random();
2597 hex::encode(bytes)
2598 }
2599
2600 #[test]
2601 fn security_body_limit_is_64kb() {
2602 assert_eq!(MAX_BODY_SIZE, 65_536);
2603 }
2604
2605 #[test]
2606 fn security_timeout_default_is_30_seconds() {
2607 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2608 }
2609
2610 #[test]
2611 fn gateway_timeout_falls_back_to_default() {
2612 unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2615 assert_eq!(gateway_request_timeout_secs(), 30);
2616 }
2617
2618 #[test]
2619 fn webhook_body_requires_message_field() {
2620 let valid = r#"{"message": "hello"}"#;
2621 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2622 assert!(parsed.is_ok());
2623 assert_eq!(parsed.unwrap().message, "hello");
2624
2625 let missing = r#"{"other": "field"}"#;
2626 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2627 assert!(parsed.is_err());
2628 }
2629
2630 #[test]
2631 fn whatsapp_query_fields_are_optional() {
2632 let q = WhatsAppVerifyQuery {
2633 mode: None,
2634 verify_token: None,
2635 challenge: None,
2636 };
2637 assert!(q.mode.is_none());
2638 }
2639
2640 #[test]
2641 fn app_state_is_clone() {
2642 fn assert_clone<T: Clone>() {}
2643 assert_clone::<AppState>();
2644 }
2645
2646 #[tokio::test]
2647 async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2648 let state = AppState {
2649 config: Arc::new(Mutex::new(Config::default())),
2650 provider: Arc::new(MockProvider::default()),
2651 model: "test-model".into(),
2652 temperature: 0.0,
2653 mem: Arc::new(MockMemory),
2654 auto_save: false,
2655 webhook_secret_hash: None,
2656 pairing: Arc::new(PairingGuard::new(false, &[])),
2657 trust_forwarded_headers: false,
2658 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2659 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2660 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2661 whatsapp: None,
2662 whatsapp_app_secret: None,
2663 linq: None,
2664 linq_signing_secret: None,
2665 nextcloud_talk: None,
2666 nextcloud_talk_webhook_secret: None,
2667 wati: None,
2668 gmail_push: None,
2669 observer: Arc::new(crate::observability::NoopObserver),
2670 tools_registry: Arc::new(Vec::new()),
2671 cost_tracker: None,
2672 audit_logger: None,
2673 event_tx: tokio::sync::broadcast::channel(16).0,
2674 shutdown_tx: tokio::sync::watch::channel(false).0,
2675 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2676 path_prefix: String::new(),
2677 session_backend: None,
2678 session_queue: std::sync::Arc::new(
2679 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2680 ),
2681 device_registry: None,
2682 pending_pairings: None,
2683 canvas_store: CanvasStore::new(),
2684 mcp_registry: None,
2685 approval_registry: approval_registry::global(),
2686 mcp_local_url: None,
2687 #[cfg(feature = "webauthn")]
2688 webauthn: None,
2689 };
2690
2691 let response = handle_metrics(State(state)).await.into_response();
2692 assert_eq!(response.status(), StatusCode::OK);
2693 assert_eq!(
2694 response
2695 .headers()
2696 .get(header::CONTENT_TYPE)
2697 .and_then(|value| value.to_str().ok()),
2698 Some(PROMETHEUS_CONTENT_TYPE)
2699 );
2700
2701 let body = response.into_body().collect().await.unwrap().to_bytes();
2702 let text = String::from_utf8(body.to_vec()).unwrap();
2703 assert!(text.contains("Prometheus backend not enabled"));
2704 }
2705
2706 #[cfg(feature = "observability-prometheus")]
2707 #[tokio::test]
2708 async fn metrics_endpoint_renders_prometheus_output() {
2709 let event_tx = tokio::sync::broadcast::channel(16).0;
2710 let wrapped = sse::BroadcastObserver::new(
2711 Box::new(crate::observability::PrometheusObserver::new()),
2712 event_tx.clone(),
2713 );
2714 crate::observability::Observer::record_event(
2715 &wrapped,
2716 &crate::observability::ObserverEvent::HeartbeatTick,
2717 );
2718
2719 let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
2720 let state = AppState {
2721 config: Arc::new(Mutex::new(Config::default())),
2722 provider: Arc::new(MockProvider::default()),
2723 model: "test-model".into(),
2724 temperature: 0.0,
2725 mem: Arc::new(MockMemory),
2726 auto_save: false,
2727 webhook_secret_hash: None,
2728 pairing: Arc::new(PairingGuard::new(false, &[])),
2729 trust_forwarded_headers: false,
2730 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2731 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2732 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2733 whatsapp: None,
2734 whatsapp_app_secret: None,
2735 linq: None,
2736 linq_signing_secret: None,
2737 nextcloud_talk: None,
2738 nextcloud_talk_webhook_secret: None,
2739 wati: None,
2740 gmail_push: None,
2741 observer,
2742 tools_registry: Arc::new(Vec::new()),
2743 cost_tracker: None,
2744 audit_logger: None,
2745 event_tx,
2746 shutdown_tx: tokio::sync::watch::channel(false).0,
2747 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2748 path_prefix: String::new(),
2749 session_backend: None,
2750 session_queue: std::sync::Arc::new(
2751 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2752 ),
2753 device_registry: None,
2754 pending_pairings: None,
2755 canvas_store: CanvasStore::new(),
2756 mcp_registry: None,
2757 approval_registry: approval_registry::global(),
2758 mcp_local_url: None,
2759 #[cfg(feature = "webauthn")]
2760 webauthn: None,
2761 };
2762
2763 let response = handle_metrics(State(state)).await.into_response();
2764 assert_eq!(response.status(), StatusCode::OK);
2765
2766 let body = response.into_body().collect().await.unwrap().to_bytes();
2767 let text = String::from_utf8(body.to_vec()).unwrap();
2768 assert!(text.contains("construct_heartbeat_ticks_total 1"));
2769 }
2770
2771 #[test]
2772 fn gateway_rate_limiter_blocks_after_limit() {
2773 let limiter = GatewayRateLimiter::new(2, 2, 100);
2774 assert!(limiter.allow_pair("127.0.0.1"));
2775 assert!(limiter.allow_pair("127.0.0.1"));
2776 assert!(!limiter.allow_pair("127.0.0.1"));
2777 }
2778
2779 #[test]
2780 fn rate_limiter_sweep_removes_stale_entries() {
2781 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
2782 assert!(limiter.allow("ip-1"));
2784 assert!(limiter.allow("ip-2"));
2785 assert!(limiter.allow("ip-3"));
2786
2787 {
2788 let guard = limiter.requests.lock();
2789 assert_eq!(guard.0.len(), 3);
2790 }
2791
2792 {
2794 let mut guard = limiter.requests.lock();
2795 guard.1 = Instant::now()
2796 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
2797 .unwrap();
2798 guard.0.get_mut("ip-2").unwrap().clear();
2800 guard.0.get_mut("ip-3").unwrap().clear();
2801 }
2802
2803 assert!(limiter.allow("ip-1"));
2805
2806 {
2807 let guard = limiter.requests.lock();
2808 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
2809 assert!(guard.0.contains_key("ip-1"));
2810 }
2811 }
2812
2813 #[test]
2814 fn rate_limiter_zero_limit_always_allows() {
2815 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
2816 for _ in 0..100 {
2817 assert!(limiter.allow("any-key"));
2818 }
2819 }
2820
2821 #[test]
2822 fn idempotency_store_rejects_duplicate_key() {
2823 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
2824 assert!(store.record_if_new("req-1"));
2825 assert!(!store.record_if_new("req-1"));
2826 assert!(store.record_if_new("req-2"));
2827 }
2828
2829 #[test]
2830 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
2831 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
2832 assert!(limiter.allow("ip-1"));
2833 assert!(limiter.allow("ip-2"));
2834 assert!(limiter.allow("ip-3"));
2835
2836 let guard = limiter.requests.lock();
2837 assert_eq!(guard.0.len(), 2);
2838 assert!(guard.0.contains_key("ip-2"));
2839 assert!(guard.0.contains_key("ip-3"));
2840 }
2841
2842 #[test]
2843 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
2844 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
2845 assert!(store.record_if_new("k1"));
2846 std::thread::sleep(Duration::from_millis(2));
2847 assert!(store.record_if_new("k2"));
2848 std::thread::sleep(Duration::from_millis(2));
2849 assert!(store.record_if_new("k3"));
2850
2851 let keys = store.keys.lock();
2852 assert_eq!(keys.len(), 2);
2853 assert!(!keys.contains_key("k1"));
2854 assert!(keys.contains_key("k2"));
2855 assert!(keys.contains_key("k3"));
2856 }
2857
2858 #[test]
2859 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
2860 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2861 let mut headers = HeaderMap::new();
2862 headers.insert(
2863 "X-Forwarded-For",
2864 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2865 );
2866
2867 let key = client_key_from_request(Some(peer), &headers, false);
2868 assert_eq!(key, "10.0.0.5");
2869 }
2870
2871 #[test]
2872 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
2873 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2876 let mut headers = HeaderMap::new();
2877 headers.insert(
2878 "X-Forwarded-For",
2879 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2880 );
2881
2882 let key = client_key_from_request(Some(peer), &headers, true);
2883 assert_eq!(key, "203.0.113.11");
2884 }
2885
2886 #[test]
2887 fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
2888 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2893 let mut headers = HeaderMap::new();
2894 headers.insert(
2895 "X-Forwarded-For",
2896 HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
2897 );
2898
2899 let key = client_key_from_request(Some(peer), &headers, true);
2900 assert_eq!(key, "203.0.113.11");
2901 }
2902
2903 #[test]
2904 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
2905 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2906 let mut headers = HeaderMap::new();
2907 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
2908
2909 let key = client_key_from_request(Some(peer), &headers, true);
2910 assert_eq!(key, "10.0.0.5");
2911 }
2912
2913 #[test]
2914 fn normalize_max_keys_uses_fallback_for_zero() {
2915 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
2916 assert_eq!(normalize_max_keys(0, 0), 1);
2917 }
2918
2919 #[test]
2920 fn normalize_max_keys_preserves_nonzero_values() {
2921 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
2922 assert_eq!(normalize_max_keys(1, 10_000), 1);
2923 }
2924
2925 #[tokio::test]
2926 async fn persist_pairing_tokens_writes_config_tokens() {
2927 let temp = tempfile::tempdir().unwrap();
2928 let config_path = temp.path().join("config.toml");
2929 let workspace_path = temp.path().join("workspace");
2930
2931 let mut config = Config::default();
2932 config.config_path = config_path.clone();
2933 config.workspace_dir = workspace_path;
2934 config.save().await.unwrap();
2935
2936 let guard = PairingGuard::new(true, &[]);
2937 let code = guard.pairing_code().unwrap();
2938 let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
2939 assert!(guard.is_authenticated(&token));
2940
2941 let shared_config = Arc::new(Mutex::new(config));
2942 Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
2943 .await
2944 .unwrap();
2945
2946 let plaintext = {
2948 let in_memory = shared_config.lock();
2949 assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
2950 in_memory.gateway.paired_tokens[0].clone()
2951 };
2952 assert_eq!(plaintext.len(), 64);
2953 assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
2954
2955 let saved = tokio::fs::read_to_string(config_path).await.unwrap();
2957 let raw_parsed: Config = toml::from_str(&saved).unwrap();
2958 assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
2959 let on_disk = &raw_parsed.gateway.paired_tokens[0];
2960 assert!(
2961 crate::security::SecretStore::is_encrypted(on_disk),
2962 "paired_token should be encrypted on disk"
2963 );
2964 }
2965
2966 #[test]
2967 fn webhook_memory_key_is_unique() {
2968 let key1 = webhook_memory_key();
2969 let key2 = webhook_memory_key();
2970
2971 assert!(key1.starts_with("webhook_msg_"));
2972 assert!(key2.starts_with("webhook_msg_"));
2973 assert_ne!(key1, key2);
2974 }
2975
2976 #[test]
2977 fn whatsapp_memory_key_includes_sender_and_message_id() {
2978 let msg = ChannelMessage {
2979 id: "wamid-123".into(),
2980 sender: "+1234567890".into(),
2981 reply_target: "+1234567890".into(),
2982 content: "hello".into(),
2983 channel: "whatsapp".into(),
2984 timestamp: 1,
2985 thread_ts: None,
2986 interruption_scope_id: None,
2987 attachments: vec![],
2988 };
2989
2990 let key = whatsapp_memory_key(&msg);
2991 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
2992 }
2993
2994 #[derive(Default)]
2995 struct MockMemory;
2996
2997 #[async_trait]
2998 impl Memory for MockMemory {
2999 fn name(&self) -> &str {
3000 "mock"
3001 }
3002
3003 async fn store(
3004 &self,
3005 _key: &str,
3006 _content: &str,
3007 _category: MemoryCategory,
3008 _session_id: Option<&str>,
3009 ) -> anyhow::Result<()> {
3010 Ok(())
3011 }
3012
3013 async fn recall(
3014 &self,
3015 _query: &str,
3016 _limit: usize,
3017 _session_id: Option<&str>,
3018 _since: Option<&str>,
3019 _until: Option<&str>,
3020 ) -> anyhow::Result<Vec<MemoryEntry>> {
3021 Ok(Vec::new())
3022 }
3023
3024 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3025 Ok(None)
3026 }
3027
3028 async fn list(
3029 &self,
3030 _category: Option<&MemoryCategory>,
3031 _session_id: Option<&str>,
3032 ) -> anyhow::Result<Vec<MemoryEntry>> {
3033 Ok(Vec::new())
3034 }
3035
3036 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3037 Ok(false)
3038 }
3039
3040 async fn count(&self) -> anyhow::Result<usize> {
3041 Ok(0)
3042 }
3043
3044 async fn health_check(&self) -> bool {
3045 true
3046 }
3047 }
3048
3049 #[derive(Default)]
3050 struct MockProvider {
3051 calls: AtomicUsize,
3052 }
3053
3054 #[async_trait]
3055 impl Provider for MockProvider {
3056 async fn chat_with_system(
3057 &self,
3058 _system_prompt: Option<&str>,
3059 _message: &str,
3060 _model: &str,
3061 _temperature: f64,
3062 ) -> anyhow::Result<String> {
3063 self.calls.fetch_add(1, Ordering::SeqCst);
3064 Ok("ok".into())
3065 }
3066 }
3067
3068 #[derive(Default)]
3069 struct TrackingMemory {
3070 keys: Mutex<Vec<String>>,
3071 }
3072
3073 #[async_trait]
3074 impl Memory for TrackingMemory {
3075 fn name(&self) -> &str {
3076 "tracking"
3077 }
3078
3079 async fn store(
3080 &self,
3081 key: &str,
3082 _content: &str,
3083 _category: MemoryCategory,
3084 _session_id: Option<&str>,
3085 ) -> anyhow::Result<()> {
3086 self.keys.lock().push(key.to_string());
3087 Ok(())
3088 }
3089
3090 async fn recall(
3091 &self,
3092 _query: &str,
3093 _limit: usize,
3094 _session_id: Option<&str>,
3095 _since: Option<&str>,
3096 _until: Option<&str>,
3097 ) -> anyhow::Result<Vec<MemoryEntry>> {
3098 Ok(Vec::new())
3099 }
3100
3101 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3102 Ok(None)
3103 }
3104
3105 async fn list(
3106 &self,
3107 _category: Option<&MemoryCategory>,
3108 _session_id: Option<&str>,
3109 ) -> anyhow::Result<Vec<MemoryEntry>> {
3110 Ok(Vec::new())
3111 }
3112
3113 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3114 Ok(false)
3115 }
3116
3117 async fn count(&self) -> anyhow::Result<usize> {
3118 let size = self.keys.lock().len();
3119 Ok(size)
3120 }
3121
3122 async fn health_check(&self) -> bool {
3123 true
3124 }
3125 }
3126
3127 fn test_connect_info() -> ConnectInfo<SocketAddr> {
3128 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3129 }
3130
3131 #[tokio::test]
3132 async fn webhook_idempotency_skips_duplicate_provider_calls() {
3133 let provider_impl = Arc::new(MockProvider::default());
3134 let provider: Arc<dyn Provider> = provider_impl.clone();
3135 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3136
3137 let state = AppState {
3138 config: Arc::new(Mutex::new(Config::default())),
3139 provider,
3140 model: "test-model".into(),
3141 temperature: 0.0,
3142 mem: memory,
3143 auto_save: false,
3144 webhook_secret_hash: None,
3145 pairing: Arc::new(PairingGuard::new(false, &[])),
3146 trust_forwarded_headers: false,
3147 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3148 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3149 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3150 whatsapp: None,
3151 whatsapp_app_secret: None,
3152 linq: None,
3153 linq_signing_secret: None,
3154 nextcloud_talk: None,
3155 nextcloud_talk_webhook_secret: None,
3156 wati: None,
3157 gmail_push: None,
3158 observer: Arc::new(crate::observability::NoopObserver),
3159 tools_registry: Arc::new(Vec::new()),
3160 cost_tracker: None,
3161 audit_logger: None,
3162 event_tx: tokio::sync::broadcast::channel(16).0,
3163 shutdown_tx: tokio::sync::watch::channel(false).0,
3164 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3165 path_prefix: String::new(),
3166 session_backend: None,
3167 session_queue: std::sync::Arc::new(
3168 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3169 ),
3170 device_registry: None,
3171 pending_pairings: None,
3172 canvas_store: CanvasStore::new(),
3173 mcp_registry: None,
3174 approval_registry: approval_registry::global(),
3175 mcp_local_url: None,
3176 #[cfg(feature = "webauthn")]
3177 webauthn: None,
3178 };
3179
3180 let mut headers = HeaderMap::new();
3181 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3182
3183 let body = Ok(Json(WebhookBody {
3184 message: "hello".into(),
3185 }));
3186 let first = handle_webhook(
3187 State(state.clone()),
3188 test_connect_info(),
3189 headers.clone(),
3190 body,
3191 )
3192 .await
3193 .into_response();
3194 assert_eq!(first.status(), StatusCode::OK);
3195
3196 let body = Ok(Json(WebhookBody {
3197 message: "hello".into(),
3198 }));
3199 let second = handle_webhook(State(state), test_connect_info(), headers, body)
3200 .await
3201 .into_response();
3202 assert_eq!(second.status(), StatusCode::OK);
3203
3204 let payload = second.into_body().collect().await.unwrap().to_bytes();
3205 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3206 assert_eq!(parsed["status"], "duplicate");
3207 assert_eq!(parsed["idempotent"], true);
3208 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3209 }
3210
3211 #[tokio::test]
3212 async fn webhook_autosave_stores_distinct_keys_per_request() {
3213 let provider_impl = Arc::new(MockProvider::default());
3214 let provider: Arc<dyn Provider> = provider_impl.clone();
3215
3216 let tracking_impl = Arc::new(TrackingMemory::default());
3217 let memory: Arc<dyn Memory> = tracking_impl.clone();
3218
3219 let state = AppState {
3220 config: Arc::new(Mutex::new(Config::default())),
3221 provider,
3222 model: "test-model".into(),
3223 temperature: 0.0,
3224 mem: memory,
3225 auto_save: true,
3226 webhook_secret_hash: None,
3227 pairing: Arc::new(PairingGuard::new(false, &[])),
3228 trust_forwarded_headers: false,
3229 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3230 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3231 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3232 whatsapp: None,
3233 whatsapp_app_secret: None,
3234 linq: None,
3235 linq_signing_secret: None,
3236 nextcloud_talk: None,
3237 nextcloud_talk_webhook_secret: None,
3238 wati: None,
3239 gmail_push: None,
3240 observer: Arc::new(crate::observability::NoopObserver),
3241 tools_registry: Arc::new(Vec::new()),
3242 cost_tracker: None,
3243 audit_logger: None,
3244 event_tx: tokio::sync::broadcast::channel(16).0,
3245 shutdown_tx: tokio::sync::watch::channel(false).0,
3246 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3247 path_prefix: String::new(),
3248 session_backend: None,
3249 session_queue: std::sync::Arc::new(
3250 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3251 ),
3252 device_registry: None,
3253 pending_pairings: None,
3254 canvas_store: CanvasStore::new(),
3255 mcp_registry: None,
3256 approval_registry: approval_registry::global(),
3257 mcp_local_url: None,
3258 #[cfg(feature = "webauthn")]
3259 webauthn: None,
3260 };
3261
3262 let headers = HeaderMap::new();
3263
3264 let body1 = Ok(Json(WebhookBody {
3265 message: "hello one".into(),
3266 }));
3267 let first = handle_webhook(
3268 State(state.clone()),
3269 test_connect_info(),
3270 headers.clone(),
3271 body1,
3272 )
3273 .await
3274 .into_response();
3275 assert_eq!(first.status(), StatusCode::OK);
3276
3277 let body2 = Ok(Json(WebhookBody {
3278 message: "hello two".into(),
3279 }));
3280 let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3281 .await
3282 .into_response();
3283 assert_eq!(second.status(), StatusCode::OK);
3284
3285 let keys = tracking_impl.keys.lock().clone();
3286 assert_eq!(keys.len(), 2);
3287 assert_ne!(keys[0], keys[1]);
3288 assert!(keys[0].starts_with("webhook_msg_"));
3289 assert!(keys[1].starts_with("webhook_msg_"));
3290 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3291 }
3292
3293 #[test]
3294 fn webhook_secret_hash_is_deterministic_and_nonempty() {
3295 let secret_a = generate_test_secret();
3296 let secret_b = generate_test_secret();
3297 let one = hash_webhook_secret(&secret_a);
3298 let two = hash_webhook_secret(&secret_a);
3299 let other = hash_webhook_secret(&secret_b);
3300
3301 assert_eq!(one, two);
3302 assert_ne!(one, other);
3303 assert_eq!(one.len(), 64);
3304 }
3305
3306 #[tokio::test]
3307 async fn webhook_secret_hash_rejects_missing_header() {
3308 let provider_impl = Arc::new(MockProvider::default());
3309 let provider: Arc<dyn Provider> = provider_impl.clone();
3310 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3311 let secret = generate_test_secret();
3312
3313 let state = AppState {
3314 config: Arc::new(Mutex::new(Config::default())),
3315 provider,
3316 model: "test-model".into(),
3317 temperature: 0.0,
3318 mem: memory,
3319 auto_save: false,
3320 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3321 pairing: Arc::new(PairingGuard::new(false, &[])),
3322 trust_forwarded_headers: false,
3323 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3324 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3325 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3326 whatsapp: None,
3327 whatsapp_app_secret: None,
3328 linq: None,
3329 linq_signing_secret: None,
3330 nextcloud_talk: None,
3331 nextcloud_talk_webhook_secret: None,
3332 wati: None,
3333 gmail_push: None,
3334 observer: Arc::new(crate::observability::NoopObserver),
3335 tools_registry: Arc::new(Vec::new()),
3336 cost_tracker: None,
3337 audit_logger: None,
3338 event_tx: tokio::sync::broadcast::channel(16).0,
3339 shutdown_tx: tokio::sync::watch::channel(false).0,
3340 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3341 path_prefix: String::new(),
3342 session_backend: None,
3343 session_queue: std::sync::Arc::new(
3344 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3345 ),
3346 device_registry: None,
3347 pending_pairings: None,
3348 canvas_store: CanvasStore::new(),
3349 mcp_registry: None,
3350 approval_registry: approval_registry::global(),
3351 mcp_local_url: None,
3352 #[cfg(feature = "webauthn")]
3353 webauthn: None,
3354 };
3355
3356 let response = handle_webhook(
3357 State(state),
3358 test_connect_info(),
3359 HeaderMap::new(),
3360 Ok(Json(WebhookBody {
3361 message: "hello".into(),
3362 })),
3363 )
3364 .await
3365 .into_response();
3366
3367 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3368 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3369 }
3370
3371 #[tokio::test]
3372 async fn webhook_secret_hash_rejects_invalid_header() {
3373 let provider_impl = Arc::new(MockProvider::default());
3374 let provider: Arc<dyn Provider> = provider_impl.clone();
3375 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3376 let valid_secret = generate_test_secret();
3377 let wrong_secret = generate_test_secret();
3378
3379 let state = AppState {
3380 config: Arc::new(Mutex::new(Config::default())),
3381 provider,
3382 model: "test-model".into(),
3383 temperature: 0.0,
3384 mem: memory,
3385 auto_save: false,
3386 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3387 pairing: Arc::new(PairingGuard::new(false, &[])),
3388 trust_forwarded_headers: false,
3389 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3390 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3391 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3392 whatsapp: None,
3393 whatsapp_app_secret: None,
3394 linq: None,
3395 linq_signing_secret: None,
3396 nextcloud_talk: None,
3397 nextcloud_talk_webhook_secret: None,
3398 wati: None,
3399 gmail_push: None,
3400 observer: Arc::new(crate::observability::NoopObserver),
3401 tools_registry: Arc::new(Vec::new()),
3402 cost_tracker: None,
3403 audit_logger: None,
3404 event_tx: tokio::sync::broadcast::channel(16).0,
3405 shutdown_tx: tokio::sync::watch::channel(false).0,
3406 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3407 path_prefix: String::new(),
3408 session_backend: None,
3409 session_queue: std::sync::Arc::new(
3410 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3411 ),
3412 device_registry: None,
3413 pending_pairings: None,
3414 canvas_store: CanvasStore::new(),
3415 mcp_registry: None,
3416 approval_registry: approval_registry::global(),
3417 mcp_local_url: None,
3418 #[cfg(feature = "webauthn")]
3419 webauthn: None,
3420 };
3421
3422 let mut headers = HeaderMap::new();
3423 headers.insert(
3424 "X-Webhook-Secret",
3425 HeaderValue::from_str(&wrong_secret).unwrap(),
3426 );
3427
3428 let response = handle_webhook(
3429 State(state),
3430 test_connect_info(),
3431 headers,
3432 Ok(Json(WebhookBody {
3433 message: "hello".into(),
3434 })),
3435 )
3436 .await
3437 .into_response();
3438
3439 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3440 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3441 }
3442
3443 #[tokio::test]
3444 async fn webhook_secret_hash_accepts_valid_header() {
3445 let provider_impl = Arc::new(MockProvider::default());
3446 let provider: Arc<dyn Provider> = provider_impl.clone();
3447 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3448 let secret = generate_test_secret();
3449
3450 let state = AppState {
3451 config: Arc::new(Mutex::new(Config::default())),
3452 provider,
3453 model: "test-model".into(),
3454 temperature: 0.0,
3455 mem: memory,
3456 auto_save: false,
3457 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3458 pairing: Arc::new(PairingGuard::new(false, &[])),
3459 trust_forwarded_headers: false,
3460 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3461 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3462 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3463 whatsapp: None,
3464 whatsapp_app_secret: None,
3465 linq: None,
3466 linq_signing_secret: None,
3467 nextcloud_talk: None,
3468 nextcloud_talk_webhook_secret: None,
3469 wati: None,
3470 gmail_push: None,
3471 observer: Arc::new(crate::observability::NoopObserver),
3472 tools_registry: Arc::new(Vec::new()),
3473 cost_tracker: None,
3474 audit_logger: None,
3475 event_tx: tokio::sync::broadcast::channel(16).0,
3476 shutdown_tx: tokio::sync::watch::channel(false).0,
3477 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3478 path_prefix: String::new(),
3479 session_backend: None,
3480 session_queue: std::sync::Arc::new(
3481 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3482 ),
3483 device_registry: None,
3484 pending_pairings: None,
3485 canvas_store: CanvasStore::new(),
3486 mcp_registry: None,
3487 approval_registry: approval_registry::global(),
3488 mcp_local_url: None,
3489 #[cfg(feature = "webauthn")]
3490 webauthn: None,
3491 };
3492
3493 let mut headers = HeaderMap::new();
3494 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3495
3496 let response = handle_webhook(
3497 State(state),
3498 test_connect_info(),
3499 headers,
3500 Ok(Json(WebhookBody {
3501 message: "hello".into(),
3502 })),
3503 )
3504 .await
3505 .into_response();
3506
3507 assert_eq!(response.status(), StatusCode::OK);
3508 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3509 }
3510
3511 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3512 use hmac::{Hmac, Mac};
3513 use sha2::Sha256;
3514
3515 let payload = format!("{random}{body}");
3516 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3517 mac.update(payload.as_bytes());
3518 hex::encode(mac.finalize().into_bytes())
3519 }
3520
3521 #[tokio::test]
3522 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3523 let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3524 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3525
3526 let state = AppState {
3527 config: Arc::new(Mutex::new(Config::default())),
3528 provider,
3529 model: "test-model".into(),
3530 temperature: 0.0,
3531 mem: memory,
3532 auto_save: false,
3533 webhook_secret_hash: None,
3534 pairing: Arc::new(PairingGuard::new(false, &[])),
3535 trust_forwarded_headers: false,
3536 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3537 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3538 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3539 whatsapp: None,
3540 whatsapp_app_secret: None,
3541 linq: None,
3542 linq_signing_secret: None,
3543 nextcloud_talk: None,
3544 nextcloud_talk_webhook_secret: None,
3545 wati: None,
3546 gmail_push: None,
3547 observer: Arc::new(crate::observability::NoopObserver),
3548 tools_registry: Arc::new(Vec::new()),
3549 cost_tracker: None,
3550 audit_logger: None,
3551 event_tx: tokio::sync::broadcast::channel(16).0,
3552 shutdown_tx: tokio::sync::watch::channel(false).0,
3553 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3554 path_prefix: String::new(),
3555 session_backend: None,
3556 session_queue: std::sync::Arc::new(
3557 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3558 ),
3559 device_registry: None,
3560 pending_pairings: None,
3561 canvas_store: CanvasStore::new(),
3562 mcp_registry: None,
3563 approval_registry: approval_registry::global(),
3564 mcp_local_url: None,
3565 #[cfg(feature = "webauthn")]
3566 webauthn: None,
3567 };
3568
3569 let response = Box::pin(handle_nextcloud_talk_webhook(
3570 State(state),
3571 HeaderMap::new(),
3572 Bytes::from_static(br#"{"type":"message"}"#),
3573 ))
3574 .await
3575 .into_response();
3576
3577 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3578 }
3579
3580 #[tokio::test]
3581 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3582 let provider_impl = Arc::new(MockProvider::default());
3583 let provider: Arc<dyn Provider> = provider_impl.clone();
3584 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3585
3586 let channel = Arc::new(NextcloudTalkChannel::new(
3587 "https://cloud.example.com".into(),
3588 "app-token".into(),
3589 String::new(),
3590 vec!["*".into()],
3591 ));
3592
3593 let secret = "nextcloud-test-secret";
3594 let random = "seed-value";
3595 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3596 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3597 let invalid_signature = "deadbeef";
3598
3599 let state = AppState {
3600 config: Arc::new(Mutex::new(Config::default())),
3601 provider,
3602 model: "test-model".into(),
3603 temperature: 0.0,
3604 mem: memory,
3605 auto_save: false,
3606 webhook_secret_hash: None,
3607 pairing: Arc::new(PairingGuard::new(false, &[])),
3608 trust_forwarded_headers: false,
3609 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3610 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3611 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3612 whatsapp: None,
3613 whatsapp_app_secret: None,
3614 linq: None,
3615 linq_signing_secret: None,
3616 nextcloud_talk: Some(channel),
3617 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3618 wati: None,
3619 gmail_push: None,
3620 observer: Arc::new(crate::observability::NoopObserver),
3621 tools_registry: Arc::new(Vec::new()),
3622 cost_tracker: None,
3623 audit_logger: None,
3624 event_tx: tokio::sync::broadcast::channel(16).0,
3625 shutdown_tx: tokio::sync::watch::channel(false).0,
3626 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3627 path_prefix: String::new(),
3628 session_backend: None,
3629 session_queue: std::sync::Arc::new(
3630 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3631 ),
3632 device_registry: None,
3633 pending_pairings: None,
3634 canvas_store: CanvasStore::new(),
3635 mcp_registry: None,
3636 approval_registry: approval_registry::global(),
3637 mcp_local_url: None,
3638 #[cfg(feature = "webauthn")]
3639 webauthn: None,
3640 };
3641
3642 let mut headers = HeaderMap::new();
3643 headers.insert(
3644 "X-Nextcloud-Talk-Random",
3645 HeaderValue::from_str(random).unwrap(),
3646 );
3647 headers.insert(
3648 "X-Nextcloud-Talk-Signature",
3649 HeaderValue::from_str(invalid_signature).unwrap(),
3650 );
3651
3652 let response = Box::pin(handle_nextcloud_talk_webhook(
3653 State(state),
3654 headers,
3655 Bytes::from(body),
3656 ))
3657 .await
3658 .into_response();
3659 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3660 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3661 }
3662
3663 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3668 use hmac::{Hmac, Mac};
3669 use sha2::Sha256;
3670
3671 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3672 mac.update(body);
3673 hex::encode(mac.finalize().into_bytes())
3674 }
3675
3676 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3677 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3678 }
3679
/// A header produced with the same secret and body passes verification.
#[test]
fn whatsapp_signature_valid() {
    let secret = generate_test_secret();
    let payload = b"test body content";

    let header_value = compute_whatsapp_signature_header(&secret, payload);

    assert!(verify_whatsapp_signature(&secret, payload, &header_value));
}
3693
/// A header signed with one secret fails verification against another secret.
#[test]
fn whatsapp_signature_invalid_wrong_secret() {
    let expected_secret = generate_test_secret();
    let signing_secret = generate_test_secret();
    let payload = b"test body content";

    // Signed with the second secret, verified with the first.
    let header_value = compute_whatsapp_signature_header(&signing_secret, payload);

    assert!(!verify_whatsapp_signature(&expected_secret, payload, &header_value));
}
3708
/// A header computed over one body fails verification against a different body.
#[test]
fn whatsapp_signature_invalid_wrong_body() {
    let secret = generate_test_secret();
    let signed_payload = b"original body";
    let received_payload = b"tampered body";

    let header_value = compute_whatsapp_signature_header(&secret, signed_payload);

    assert!(!verify_whatsapp_signature(&secret, received_payload, &header_value));
}
3724
/// A header value that lacks the `sha256=` prefix fails verification.
#[test]
fn whatsapp_signature_missing_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";

    // Bare hex-looking text, no scheme prefix.
    let header_value = "abc123def456";

    assert!(!verify_whatsapp_signature(&secret, payload, header_value));
}
3739
/// An empty header value fails verification.
#[test]
fn whatsapp_signature_empty_header() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let header_value = "";

    assert!(!verify_whatsapp_signature(&secret, payload, header_value));
}
3747
/// A correctly prefixed header whose remainder is not valid hex fails verification.
#[test]
fn whatsapp_signature_invalid_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";

    // Right prefix, but the digest part contains non-hex characters.
    let header_value = "sha256=not_valid_hex_zzz";

    assert!(!verify_whatsapp_signature(&secret, payload, header_value));
}
3762
/// A header computed over an empty body passes verification for that empty body.
#[test]
fn whatsapp_signature_empty_body() {
    let secret = generate_test_secret();
    let payload = b"";

    let header_value = compute_whatsapp_signature_header(&secret, payload);

    assert!(verify_whatsapp_signature(&secret, payload, &header_value));
}
3776
/// A body containing multi-byte UTF-8 text signs and verifies successfully.
#[test]
fn whatsapp_signature_unicode_body() {
    let secret = generate_test_secret();
    let payload = "Hello 🦀 World".as_bytes();

    let header_value = compute_whatsapp_signature_header(&secret, payload);

    assert!(verify_whatsapp_signature(&secret, payload, &header_value));
}
3790
/// A JSON-shaped body signs and verifies successfully.
#[test]
fn whatsapp_signature_json_payload() {
    let secret = generate_test_secret();
    let payload = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;

    let header_value = compute_whatsapp_signature_header(&secret, payload);

    assert!(verify_whatsapp_signature(&secret, payload, &header_value));
}
3804
/// The same hex digest is rejected behind an upper-case `SHA256=` prefix and
/// accepted behind the lower-case `sha256=` prefix.
#[test]
fn whatsapp_signature_case_sensitive_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let hex_sig = compute_whatsapp_signature_hex(&secret, payload);

    // Upper-case scheme name: must not verify.
    let uppercase_header = format!("SHA256={hex_sig}");
    assert!(!verify_whatsapp_signature(&secret, payload, &uppercase_header));

    // Lower-case scheme name: verifies.
    let lowercase_header = format!("sha256={hex_sig}");
    assert!(verify_whatsapp_signature(&secret, payload, &lowercase_header));
}
3824
/// A header carrying only the first 32 characters of the valid hex digest
/// fails verification.
#[test]
fn whatsapp_signature_truncated_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let hex_sig = compute_whatsapp_signature_hex(&secret, payload);
    // Keep just a prefix of the real digest.
    let truncated = &hex_sig[..32];
    let header_value = format!("sha256={truncated}");

    assert!(!verify_whatsapp_signature(&secret, payload, &header_value));
}
3840
/// A header carrying the valid hex digest with extra hex characters appended
/// fails verification.
#[test]
fn whatsapp_signature_extra_bytes() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let hex_sig = compute_whatsapp_signature_hex(&secret, payload);
    // Real digest plus trailing characters.
    let extended = format!("{hex_sig}deadbeef");
    let header_value = format!("sha256={extended}");

    assert!(!verify_whatsapp_signature(&secret, payload, &header_value));
}
3856
/// Four distinct keys are each reported as new.
#[test]
fn idempotency_store_allows_different_keys() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 100);

    for key in ["key-a", "key-b", "key-c", "key-d"] {
        assert!(store.record_if_new(key));
    }
}
3869
/// A store constructed with a key limit of 0 still records one key as new and
/// reports an immediate repeat of that key as a duplicate.
#[test]
fn idempotency_store_max_keys_clamped_to_one() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 0);

    let first_seen = store.record_if_new("only-key");
    let seen_again = store.record_if_new("only-key");

    assert!(first_seen);
    assert!(!seen_again);
}
3876
/// Recording the same key twice back-to-back: the first call reports it as
/// new, the second does not.
#[test]
fn idempotency_store_rapid_duplicate_rejected() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 100);

    let first_seen = store.record_if_new("rapid");
    let seen_again = store.record_if_new("rapid");

    assert!(first_seen);
    assert!(!seen_again);
}
3883
/// With a 1 ms TTL, a key recorded once is reported as new again after a
/// 10 ms pause.
#[test]
fn idempotency_store_accepts_after_ttl_expires() {
    let ttl = Duration::from_millis(1);
    let store = IdempotencyStore::new(ttl, 100);

    assert!(store.record_if_new("ttl-key"));

    // Wait well past the TTL before retrying the same key.
    std::thread::sleep(Duration::from_millis(10));

    assert!(store.record_if_new("ttl-key"));
}
3891
/// With a key limit of 1, recording a second key leaves only that second key
/// in the store.
#[test]
fn idempotency_store_eviction_preserves_newest() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 1);

    assert!(store.record_if_new("old-key"));
    // Small gap between the two insertions.
    std::thread::sleep(Duration::from_millis(2));
    assert!(store.record_if_new("new-key"));

    let stored = store.keys.lock();
    assert_eq!(stored.len(), 1);
    assert!(!stored.contains_key("old-key"));
    assert!(stored.contains_key("new-key"));
}
3904
/// With a limit of 2 per 50 ms window, the third immediate request is denied,
/// and a request made after a 60 ms pause is allowed again.
#[test]
fn rate_limiter_allows_after_window_expires() {
    let limiter = SlidingWindowRateLimiter::new(2, Duration::from_millis(50), 100);

    // Use up the allowance.
    for _ in 0..2 {
        assert!(limiter.allow("ip-1"));
    }
    assert!(!limiter.allow("ip-1"));

    // Sleep past the window length.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("ip-1"));
}
3919
/// Exhausting the allowance of one key does not consume the allowance of another.
#[test]
fn rate_limiter_independent_keys_tracked_separately() {
    let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);

    // Each key gets its own two allowed requests, then a denial.
    for key in ["ip-1", "ip-2"] {
        assert!(limiter.allow(key));
        assert!(limiter.allow(key));
        assert!(!limiter.allow(key));
    }
}
3932
/// With a key limit of 3, allowing a fourth distinct key leaves three tracked
/// keys: `ip-1` is gone and `ip-2`..`ip-4` remain.
#[test]
fn rate_limiter_exact_boundary_at_max_keys() {
    let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);

    // The fourth key goes one past the key limit.
    for key in ["ip-1", "ip-2", "ip-3", "ip-4"] {
        assert!(limiter.allow(key));
    }

    let tracked = limiter.requests.lock();
    assert_eq!(tracked.0.len(), 3);
    assert!(
        !tracked.0.contains_key("ip-1"),
        "ip-1 should have been evicted"
    );
    for survivor in ["ip-2", "ip-3", "ip-4"] {
        assert!(tracked.0.contains_key(survivor));
    }
}
3952
/// For `GatewayRateLimiter::new(2, 3, 100)`, the same key is denied on its
/// third pair request yet still gets three webhook requests before denial.
#[test]
fn gateway_rate_limiter_pair_and_webhook_are_independent() {
    let limiter = GatewayRateLimiter::new(2, 3, 100);
    let client = "ip-1";

    // Pair side: two allowed, third denied.
    assert!(limiter.allow_pair(client));
    assert!(limiter.allow_pair(client));
    assert!(!limiter.allow_pair(client));

    // Webhook side for the same key: three allowed, fourth denied.
    assert!(limiter.allow_webhook(client));
    assert!(limiter.allow_webhook(client));
    assert!(limiter.allow_webhook(client));
    assert!(!limiter.allow_webhook(client));
}
3968
/// With a key limit of 1, a second distinct key is still allowed and replaces
/// the first as the only tracked key.
#[test]
fn rate_limiter_single_key_max_allows_one_request() {
    let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);

    assert!(limiter.allow("ip-1"));
    assert!(limiter.allow("ip-2"));

    let tracked = limiter.requests.lock();
    assert_eq!(tracked.0.len(), 1);
    assert!(tracked.0.contains_key("ip-2"));
    assert!(!tracked.0.contains_key("ip-1"));
}
3980
/// Ten threads each issue 100 requests with distinct keys against one shared
/// limiter; all threads must join cleanly and the tracked key count must not
/// exceed the key limit of 1000.
#[test]
fn rate_limiter_concurrent_access_safe() {
    use std::sync::Arc;

    let limiter = Arc::new(SlidingWindowRateLimiter::new(
        1000,
        Duration::from_secs(60),
        1000,
    ));

    // Spawn every worker first; `collect` is eager, so all threads are running
    // before the first join.
    let workers: Vec<_> = (0..10)
        .map(|i| {
            let limiter = limiter.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    limiter.allow(&format!("thread-{i}-req-{j}"));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let tracked = limiter.requests.lock();
    assert!(tracked.0.len() <= 1000, "should respect max_keys");
}
4009
/// Ten threads each record 100 distinct keys in one shared store; all threads
/// must join cleanly and the stored key count must not exceed the limit of 1000.
#[test]
fn idempotency_store_concurrent_access_safe() {
    use std::sync::Arc;

    let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));

    // Spawn every worker first; `collect` is eager, so all threads are running
    // before the first join.
    let workers: Vec<_> = (0..10)
        .map(|i| {
            let store = store.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    store.record_if_new(&format!("thread-{i}-key-{j}"));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let stored = store.keys.lock();
    assert!(stored.len() <= 1000, "should respect max_keys");
}
4033
/// With a limit of 5 per 50 ms window, five rapid requests pass, the sixth is
/// denied, and a request after a 60 ms pause passes again.
#[test]
fn rate_limiter_rapid_burst_then_cooldown() {
    let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);

    // Burst up to the limit.
    let mut granted = 0;
    while granted < 5 {
        assert!(limiter.allow("burst-ip"));
        granted += 1;
    }
    assert!(!limiter.allow("burst-ip"));

    // Sleep past the window length.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("burst-ip"));
}
4050
/// A peer at 127.0.0.1 is accepted.
#[test]
fn require_localhost_accepts_ipv4_loopback() {
    let loopback_peer = SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 12345));
    let verdict = require_localhost(&loopback_peer);
    assert!(verdict.is_ok());
}
4056
/// A peer at the IPv6 loopback address (`::1`) is accepted.
#[test]
fn require_localhost_accepts_ipv6_loopback() {
    let loopback_ip = std::net::Ipv6Addr::LOCALHOST;
    let loopback_peer = SocketAddr::from((loopback_ip, 12345));
    let verdict = require_localhost(&loopback_peer);
    assert!(verdict.is_ok());
}
4062
/// A peer at 192.168.1.100 is rejected, and the first element of the error is
/// `StatusCode::FORBIDDEN`.
#[test]
fn require_localhost_rejects_non_loopback_ipv4() {
    let lan_ip = std::net::Ipv4Addr::new(192, 168, 1, 100);
    let lan_peer = SocketAddr::from((lan_ip, 12345));

    let rejection = require_localhost(&lan_peer).unwrap_err();

    assert_eq!(rejection.0, StatusCode::FORBIDDEN);
}
4069
/// A peer at the non-loopback IPv6 address `2001:db8::1` is rejected, and the
/// first element of the error is `StatusCode::FORBIDDEN`.
#[test]
fn require_localhost_rejects_non_loopback_ipv6() {
    let remote_ip = std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1);
    let remote_peer = SocketAddr::from((remote_ip, 12345));

    let rejection = require_localhost(&remote_peer).unwrap_err();

    assert_eq!(rejection.0, StatusCode::FORBIDDEN);
}
4079}