1pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34pub mod terminal;
35pub mod tls;
36pub mod ws;
37pub mod ws_mcp_events;
38
39use crate::channels::{
40 Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
41 WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
42};
43use crate::config::Config;
44use crate::cost::CostTracker;
45use crate::memory::{self, Memory, MemoryCategory};
46use crate::providers::{self, ChatMessage, Provider};
47use crate::runtime;
48use crate::security::SecurityPolicy;
49use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
50use crate::tools;
51use crate::tools::canvas::CanvasStore;
52use crate::tools::traits::ToolSpec;
53use crate::util::truncate_with_ellipsis;
54use anyhow::{Context, Result};
55use axum::{
56 Router,
57 body::Bytes,
58 extract::{ConnectInfo, Query, State},
59 http::{HeaderMap, StatusCode, header},
60 response::{IntoResponse, Json},
61 routing::{delete, get, post, put},
62};
63use parking_lot::Mutex;
64use std::collections::HashMap;
65use std::net::{IpAddr, SocketAddr};
66use std::sync::Arc;
67use std::time::{Duration, Instant};
68use tower_http::limit::RequestBodyLimitLayer;
69use tower_http::timeout::TimeoutLayer;
70use uuid::Uuid;
71
/// Default request-body size limit in bytes (65 536 = 64 KiB).
///
/// Passed to `RequestBodyLimitLayer::new` when building gateway routers.
pub const MAX_BODY_SIZE: usize = 65_536;
/// Default per-request timeout in seconds, used when no valid override is set.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Returns the gateway request timeout in seconds.
///
/// Reads the `CONSTRUCT_GATEWAY_TIMEOUT_SECS` environment variable. Surrounding
/// whitespace is ignored. Falls back to [`REQUEST_TIMEOUT_SECS`] when the variable
/// is unset, is not valid Unicode, or does not parse as a `u64`.
pub fn gateway_request_timeout_secs() -> u64 {
    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
        .ok()
        // Trim before parsing: a stray space or newline around the number should
        // not silently discard the override (the secret env overrides in this
        // file are trimmed the same way).
        .and_then(|raw| raw.trim().parse().ok())
        .unwrap_or(REQUEST_TIMEOUT_SECS)
}
/// Length of the sliding rate-limit window, in seconds.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback cap on the number of distinct keys a rate limiter tracks, used when
/// the configured `rate_limit_max_keys` is 0.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
/// Fallback cap on the number of remembered idempotency keys, used when the
/// configured `idempotency_max_keys` is 0.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
95
96fn webhook_memory_key() -> String {
97 format!("webhook_msg_{}", Uuid::new_v4())
98}
99
100fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
101 format!("whatsapp_{}_{}", msg.sender, msg.id)
102}
103
104fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105 format!("linq_{}_{}", msg.sender, msg.id)
106}
107
108fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109 format!("wati_{}_{}", msg.sender, msg.id)
110}
111
112fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
114}
115
116fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
117 match &msg.thread_ts {
118 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
119 None => format!("{channel}_{}", msg.sender),
120 }
121}
122
123fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
124 headers
125 .get("X-Session-Id")
126 .and_then(|v| v.to_str().ok())
127 .map(str::trim)
128 .filter(|value| !value.is_empty())
129 .map(str::to_owned)
130}
131
132fn hash_webhook_secret(value: &str) -> String {
133 use sha2::{Digest, Sha256};
134
135 let digest = Sha256::digest(value.as_bytes());
136 hex::encode(digest)
137}
138
139const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
143struct SlidingWindowRateLimiter {
144 limit_per_window: u32,
145 window: Duration,
146 max_keys: usize,
147 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
148}
149
150impl SlidingWindowRateLimiter {
151 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
152 Self {
153 limit_per_window,
154 window,
155 max_keys: max_keys.max(1),
156 requests: Mutex::new((HashMap::new(), Instant::now())),
157 }
158 }
159
160 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
161 requests.retain(|_, timestamps| {
162 timestamps.retain(|t| *t > cutoff);
163 !timestamps.is_empty()
164 });
165 }
166
167 fn allow(&self, key: &str) -> bool {
168 if self.limit_per_window == 0 {
169 return true;
170 }
171
172 let now = Instant::now();
173 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
174
175 let mut guard = self.requests.lock();
176 let (requests, last_sweep) = &mut *guard;
177
178 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
180 Self::prune_stale(requests, cutoff);
181 *last_sweep = now;
182 }
183
184 if !requests.contains_key(key) && requests.len() >= self.max_keys {
185 Self::prune_stale(requests, cutoff);
187 *last_sweep = now;
188
189 if requests.len() >= self.max_keys {
190 let evict_key = requests
191 .iter()
192 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
193 .map(|(k, _)| k.clone());
194 if let Some(evict_key) = evict_key {
195 requests.remove(&evict_key);
196 }
197 }
198 }
199
200 let entry = requests.entry(key.to_owned()).or_default();
201 entry.retain(|instant| *instant > cutoff);
202
203 if entry.len() >= self.limit_per_window as usize {
204 return false;
205 }
206
207 entry.push(now);
208 true
209 }
210}
211
212#[derive(Debug)]
213pub struct GatewayRateLimiter {
214 pair: SlidingWindowRateLimiter,
215 webhook: SlidingWindowRateLimiter,
216}
217
218impl GatewayRateLimiter {
219 fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
220 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
221 Self {
222 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
223 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
224 }
225 }
226
227 fn allow_pair(&self, key: &str) -> bool {
228 self.pair.allow(key)
229 }
230
231 fn allow_webhook(&self, key: &str) -> bool {
232 self.webhook.allow(key)
233 }
234}
235
236#[derive(Debug)]
237pub struct IdempotencyStore {
238 ttl: Duration,
239 max_keys: usize,
240 keys: Mutex<HashMap<String, Instant>>,
241}
242
243impl IdempotencyStore {
244 fn new(ttl: Duration, max_keys: usize) -> Self {
245 Self {
246 ttl,
247 max_keys: max_keys.max(1),
248 keys: Mutex::new(HashMap::new()),
249 }
250 }
251
252 fn record_if_new(&self, key: &str) -> bool {
254 let now = Instant::now();
255 let mut keys = self.keys.lock();
256
257 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
258
259 if keys.contains_key(key) {
260 return false;
261 }
262
263 if keys.len() >= self.max_keys {
264 let evict_key = keys
265 .iter()
266 .min_by_key(|(_, seen_at)| *seen_at)
267 .map(|(k, _)| k.clone());
268 if let Some(evict_key) = evict_key {
269 keys.remove(&evict_key);
270 }
271 }
272
273 keys.insert(key.to_owned(), now);
274 true
275 }
276}
277
/// Parses one client-address token into an IP address.
///
/// Surrounding whitespace and double quotes are stripped first. The remainder
/// is accepted as a bare IP (`1.2.3.4`, `::1`), a socket address whose port is
/// discarded (`1.2.3.4:80`, `[::1]:80`), or a bracketed IP (`[::1]`).
/// Returns `None` for anything else, including empty input.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let candidate = value.trim().trim_matches('"').trim();
    if candidate.is_empty() {
        return None;
    }

    candidate
        .parse::<IpAddr>()
        .ok()
        // `ip:port` / `[ipv6]:port` — keep only the address part.
        .or_else(|| candidate.parse::<SocketAddr>().ok().map(|sock| sock.ip()))
        // Bracketed address without a port.
        .or_else(|| {
            candidate
                .trim_matches(|c: char| c == '[' || c == ']')
                .parse::<IpAddr>()
                .ok()
        })
}
295
296fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
297 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
302 for candidate in xff.rsplit(',') {
303 if let Some(ip) = parse_client_ip(candidate) {
304 return Some(ip);
305 }
306 }
307 }
308
309 headers
310 .get("X-Real-IP")
311 .and_then(|v| v.to_str().ok())
312 .and_then(parse_client_ip)
313}
314
315pub(super) fn client_key_from_request(
316 peer_addr: Option<SocketAddr>,
317 headers: &HeaderMap,
318 trust_forwarded_headers: bool,
319) -> String {
320 if trust_forwarded_headers {
321 if let Some(ip) = forwarded_client_ip(headers) {
322 return ip.to_string();
323 }
324 }
325
326 peer_addr
327 .map(|addr| addr.ip().to_string())
328 .unwrap_or_else(|| "unknown".to_string())
329}
330
/// Resolves a configured key cap: `0` means "use `fallback`" (itself raised to
/// at least 1); any other value is returned unchanged.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
338
/// Shared state handed to the gateway's request handlers.
///
/// Built once in `run_gateway`. The struct is `Clone`; most fields are `Arc`s
/// or small values.
#[derive(Clone)]
pub struct AppState {
    /// Shared, lock-protected copy of the configuration the gateway started with.
    pub config: Arc<Mutex<Config>>,
    /// Chat provider created from the configured default provider.
    pub provider: Arc<dyn Provider>,
    /// Default model name (`run_gateway` falls back to
    /// `"anthropic/claude-sonnet-4"` when none is configured).
    pub model: String,
    /// Default sampling temperature, taken from `config.default_temperature`.
    pub temperature: f64,
    /// Memory backend.
    pub mem: Arc<dyn Memory>,
    /// Value of `config.memory.auto_save`.
    // NOTE(review): presumably controls whether incoming messages are stored
    // in memory automatically — confirm against the handlers.
    pub auto_save: bool,
    /// Hex-encoded SHA-256 of the trimmed webhook secret; `None` when no
    /// non-empty secret is configured.
    pub webhook_secret_hash: Option<Arc<str>>,
    /// Pairing guard built from `gateway.require_pairing` and the paired tokens.
    pub pairing: Arc<PairingGuard>,
    /// Value of `config.gateway.trust_forwarded_headers`.
    pub trust_forwarded_headers: bool,
    /// Sliding-window limiters for pairing and webhook requests.
    pub rate_limiter: Arc<GatewayRateLimiter>,
    /// Separate limiter for authentication attempts.
    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
    /// Store of recently seen idempotency keys.
    pub idempotency_store: Arc<IdempotencyStore>,
    /// WhatsApp channel; `Some` only when the WhatsApp config is a cloud config.
    pub whatsapp: Option<Arc<WhatsAppChannel>>,
    /// WhatsApp app secret: `CONSTRUCT_WHATSAPP_APP_SECRET` if set and
    /// non-empty, otherwise the configured `app_secret`.
    pub whatsapp_app_secret: Option<Arc<str>>,
    /// Linq channel, when configured.
    pub linq: Option<Arc<LinqChannel>>,
    /// Linq signing secret: `CONSTRUCT_LINQ_SIGNING_SECRET` if set and
    /// non-empty, otherwise the configured `signing_secret`.
    pub linq_signing_secret: Option<Arc<str>>,
    /// Nextcloud Talk channel, when configured.
    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
    /// Nextcloud Talk webhook secret: `CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET`
    /// if set and non-empty, otherwise the configured `webhook_secret`.
    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
    /// WATI channel, when configured.
    pub wati: Option<Arc<WatiChannel>>,
    /// Gmail push channel; `Some` only when configured and enabled.
    pub gmail_push: Option<Arc<GmailPushChannel>>,
    /// Observer that wraps the configured observer and also publishes to `event_tx`.
    pub observer: Arc<dyn crate::observability::Observer>,
    /// Specs of every tool registered at startup (including MCP tools).
    pub tools_registry: Arc<Vec<ToolSpec>>,
    /// Global cost tracker, if one is available.
    pub cost_tracker: Option<Arc<CostTracker>>,
    /// Audit logger; `None` when auditing is disabled or its initialisation failed.
    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
    /// Broadcast channel of JSON events (created with capacity 4096).
    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Shutdown signal; background tasks stop once the watched value is `true`.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Registry of connected nodes, sized by `config.nodes.max_nodes`.
    pub node_registry: Arc<nodes::NodeRegistry>,
    /// Configured gateway path prefix, or the empty string when none is set.
    pub path_prefix: String,
    /// Session persistence backend; `Some` only when session persistence is
    /// enabled and the SQLite backend initialised successfully.
    pub session_backend: Option<Arc<dyn SessionBackend>>,
    /// Queue that serialises work per session.
    // NOTE(review): description inferred from the type name — confirm.
    pub session_queue: Arc<session_queue::SessionActorQueue>,
    /// Registry of paired devices; `Some` only when pairing is required.
    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
    /// Pending pairing codes; `Some` only when pairing is required.
    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
    /// Handle to the process-wide canvas store.
    pub canvas_store: CanvasStore,
    /// Connected MCP client registry; `None` when MCP is disabled, has no
    /// servers, or failed to connect.
    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
    /// WebAuthn state; `Some` only when WebAuthn is enabled in the security config.
    #[cfg(feature = "webauthn")]
    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
    /// Process-wide approval registry.
    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
    /// Base URL (`http://<addr>`) of the in-process MCP server; `None` until
    /// that server has bound successfully.
    pub mcp_local_url: Option<Arc<str>>,
}
406
407#[allow(clippy::too_many_lines)]
409pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
410 if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
412 {
413 tracing::warn!(
414 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
415 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
416 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
417 Docker/VM: if you are running inside a container or VM, this is expected."
418 );
419 }
420 let config_state = Arc::new(Mutex::new(config.clone()));
421
422 let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
424 Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
425 } else {
426 None
427 };
428
429 let addr: SocketAddr = format!("{host}:{port}").parse()?;
430 let listener = tokio::net::TcpListener::bind(addr).await?;
431 let actual_port = listener.local_addr()?.port();
432 let display_addr = format!("{host}:{actual_port}");
433
434 let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
435 config.default_provider.as_deref().unwrap_or("openrouter"),
436 config.api_key.as_deref(),
437 config.api_url.as_deref(),
438 &config.reliability,
439 &providers::ProviderRuntimeOptions {
440 auth_profile_override: None,
441 provider_api_url: config.api_url.clone(),
442 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
443 secrets_encrypt: config.secrets.encrypt,
444 reasoning_enabled: config.runtime.reasoning_enabled,
445 reasoning_effort: config.runtime.reasoning_effort.clone(),
446 provider_timeout_secs: Some(config.provider_timeout_secs),
447 extra_headers: config.extra_headers.clone(),
448 api_path: config.api_path.clone(),
449 provider_max_tokens: config.provider_max_tokens,
450 },
451 )?);
452 let model = config
453 .default_model
454 .clone()
455 .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
456 let temperature = config.default_temperature;
457 let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
458 &config.memory,
459 &config.embedding_routes,
460 Some(&config.storage.provider.config),
461 &config.workspace_dir,
462 config.api_key.as_deref(),
463 )?);
464 let runtime: Arc<dyn runtime::RuntimeAdapter> =
465 Arc::from(runtime::create_runtime(&config.runtime)?);
466 let security = Arc::new(SecurityPolicy::from_config(
467 &config.autonomy,
468 &config.workspace_dir,
469 ));
470
471 let (composio_key, composio_entity_id) = if config.composio.enabled {
472 (
473 config.composio.api_key.as_deref(),
474 Some(config.composio.entity_id.as_str()),
475 )
476 } else {
477 (None, None)
478 };
479
480 let canvas_store = tools::canvas::global_store();
481
482 let (
483 mut tools_registry_raw,
484 delegate_handle_gw,
485 _reaction_handle_gw,
486 _channel_map_handle,
487 _ask_user_handle_gw,
488 _escalate_handle_gw,
489 ) = tools::all_tools_with_runtime(
490 Arc::new(config.clone()),
491 &security,
492 runtime,
493 Arc::clone(&mem),
494 composio_key,
495 composio_entity_id,
496 &config.browser,
497 &config.http_request,
498 &config.web_fetch,
499 &config.workspace_dir,
500 &config.agents,
501 config.api_key.as_deref(),
502 &config,
503 Some(canvas_store.clone()),
504 );
505
506 let gateway_mcp_config = {
511 let mut c = config.clone();
512 c = crate::agent::kumiho::inject_kumiho(c, false);
513 c = crate::agent::operator::inject_operator(c, false);
514 c.mcp
515 };
516 let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
517 if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
518 tracing::info!(
519 "Gateway: initializing MCP client — {} server(s) configured",
520 gateway_mcp_config.servers.len()
521 );
522 match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
523 Ok(registry) => {
524 let registry = std::sync::Arc::new(registry);
525 mcp_registry_shared = Some(std::sync::Arc::clone(®istry));
526 if gateway_mcp_config.deferred_loading {
527 let operator_prefix =
528 format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
529 let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
530 let all_names = registry.tool_names();
531 let mut eager_count = 0usize;
532
533 for name in &all_names {
534 if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
535 if let Some(def) = registry.get_tool_def(name).await {
536 let wrapper: std::sync::Arc<dyn tools::Tool> =
537 std::sync::Arc::new(tools::McpToolWrapper::new(
538 name.clone(),
539 def,
540 std::sync::Arc::clone(®istry),
541 ));
542 if let Some(ref handle) = delegate_handle_gw {
543 handle.write().push(std::sync::Arc::clone(&wrapper));
544 }
545 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
546 eager_count += 1;
547 }
548 }
549 }
550
551 let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
552 std::sync::Arc::clone(®istry),
553 |name| {
554 !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
555 },
556 )
557 .await;
558 tracing::info!(
559 "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
560 eager_count,
561 deferred_set.len(),
562 registry.server_count()
563 );
564 let activated =
565 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
566 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
567 deferred_set,
568 activated,
569 )));
570 } else {
571 let names = registry.tool_names();
572 let mut registered = 0usize;
573 for name in names {
574 if let Some(def) = registry.get_tool_def(&name).await {
575 let wrapper: std::sync::Arc<dyn tools::Tool> =
576 std::sync::Arc::new(tools::McpToolWrapper::new(
577 name,
578 def,
579 std::sync::Arc::clone(®istry),
580 ));
581 if let Some(ref handle) = delegate_handle_gw {
582 handle.write().push(std::sync::Arc::clone(&wrapper));
583 }
584 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
585 registered += 1;
586 }
587 }
588 tracing::info!(
589 "Gateway MCP: {} tool(s) registered from {} server(s)",
590 registered,
591 registry.server_count()
592 );
593 }
594 }
595 Err(e) => {
596 tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
597 }
598 }
599 }
600
601 let tools_registry: Arc<Vec<ToolSpec>> =
602 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
603
604 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
606
607 let audit_logger = if config.security.audit.enabled {
609 match crate::security::audit::AuditLogger::new(
610 config.security.audit.clone(),
611 std::path::PathBuf::from(&config.workspace_dir),
612 ) {
613 Ok(logger) => Some(Arc::new(logger)),
614 Err(e) => {
615 tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
616 None
617 }
618 }
619 } else {
620 None
621 };
622
623 let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
625 let webhook_secret_hash: Option<Arc<str>> =
627 config.channels_config.webhook.as_ref().and_then(|webhook| {
628 webhook.secret.as_ref().and_then(|raw_secret| {
629 let trimmed_secret = raw_secret.trim();
630 (!trimmed_secret.is_empty())
631 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
632 })
633 });
634
635 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
637 .channels_config
638 .whatsapp
639 .as_ref()
640 .filter(|wa| wa.is_cloud_config())
641 .map(|wa| {
642 Arc::new(WhatsAppChannel::new(
643 wa.access_token.clone().unwrap_or_default(),
644 wa.phone_number_id.clone().unwrap_or_default(),
645 wa.verify_token.clone().unwrap_or_default(),
646 wa.allowed_numbers.clone(),
647 ))
648 });
649
650 let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
653 .ok()
654 .and_then(|secret| {
655 let secret = secret.trim();
656 (!secret.is_empty()).then(|| secret.to_owned())
657 })
658 .or_else(|| {
659 config.channels_config.whatsapp.as_ref().and_then(|wa| {
660 wa.app_secret
661 .as_deref()
662 .map(str::trim)
663 .filter(|secret| !secret.is_empty())
664 .map(ToOwned::to_owned)
665 })
666 })
667 .map(Arc::from);
668
669 let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
671 Arc::new(LinqChannel::new(
672 lq.api_token.clone(),
673 lq.from_phone.clone(),
674 lq.allowed_senders.clone(),
675 ))
676 });
677
678 let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
681 .ok()
682 .and_then(|secret| {
683 let secret = secret.trim();
684 (!secret.is_empty()).then(|| secret.to_owned())
685 })
686 .or_else(|| {
687 config.channels_config.linq.as_ref().and_then(|lq| {
688 lq.signing_secret
689 .as_deref()
690 .map(str::trim)
691 .filter(|secret| !secret.is_empty())
692 .map(ToOwned::to_owned)
693 })
694 })
695 .map(Arc::from);
696
697 let wati_channel: Option<Arc<WatiChannel>> =
699 config.channels_config.wati.as_ref().map(|wati_cfg| {
700 Arc::new(
701 WatiChannel::new(
702 wati_cfg.api_token.clone(),
703 wati_cfg.api_url.clone(),
704 wati_cfg.tenant_id.clone(),
705 wati_cfg.allowed_numbers.clone(),
706 )
707 .with_transcription(config.transcription.clone()),
708 )
709 });
710
711 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
713 config.channels_config.nextcloud_talk.as_ref().map(|nc| {
714 Arc::new(NextcloudTalkChannel::new(
715 nc.base_url.clone(),
716 nc.app_token.clone(),
717 nc.bot_name.clone().unwrap_or_default(),
718 nc.allowed_users.clone(),
719 ))
720 });
721
722 let nextcloud_talk_webhook_secret: Option<Arc<str>> =
725 std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
726 .ok()
727 .and_then(|secret| {
728 let secret = secret.trim();
729 (!secret.is_empty()).then(|| secret.to_owned())
730 })
731 .or_else(|| {
732 config
733 .channels_config
734 .nextcloud_talk
735 .as_ref()
736 .and_then(|nc| {
737 nc.webhook_secret
738 .as_deref()
739 .map(str::trim)
740 .filter(|secret| !secret.is_empty())
741 .map(ToOwned::to_owned)
742 })
743 })
744 .map(Arc::from);
745
746 let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
748 .channels_config
749 .gmail_push
750 .as_ref()
751 .filter(|gp| gp.enabled)
752 .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
753
754 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
756 match SqliteSessionBackend::new(&config.workspace_dir) {
757 Ok(b) => {
758 tracing::info!("Gateway session persistence enabled (SQLite)");
759 if config.gateway.session_ttl_hours > 0 {
760 if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
761 if cleaned > 0 {
762 tracing::info!("Cleaned up {cleaned} stale gateway sessions");
763 }
764 }
765 }
766 Some(Arc::new(b))
767 }
768 Err(e) => {
769 tracing::warn!("Session persistence disabled: {e}");
770 None
771 }
772 }
773 } else {
774 None
775 };
776
777 let pairing = Arc::new(PairingGuard::new(
779 config.gateway.require_pairing,
780 &config.gateway.paired_tokens,
781 ));
782 let rate_limit_max_keys = normalize_max_keys(
783 config.gateway.rate_limit_max_keys,
784 RATE_LIMIT_MAX_KEYS_DEFAULT,
785 );
786 let rate_limiter = Arc::new(GatewayRateLimiter::new(
787 config.gateway.pair_rate_limit_per_minute,
788 config.gateway.webhook_rate_limit_per_minute,
789 rate_limit_max_keys,
790 ));
791 let idempotency_max_keys = normalize_max_keys(
792 config.gateway.idempotency_max_keys,
793 IDEMPOTENCY_MAX_KEYS_DEFAULT,
794 );
795 let idempotency_store = Arc::new(IdempotencyStore::new(
796 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
797 idempotency_max_keys,
798 ));
799
800 let path_prefix: Option<&str> = config
802 .gateway
803 .path_prefix
804 .as_deref()
805 .filter(|p| !p.is_empty());
806
807 let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
809 let mut tunnel_url: Option<String> = None;
810
811 if let Some(ref tun) = tunnel {
812 println!("🔗 Starting {} tunnel...", tun.name());
813 match tun.start(host, actual_port).await {
814 Ok(url) => {
815 println!("🌐 Tunnel active: {url}");
816 tunnel_url = Some(url);
817 }
818 Err(e) => {
819 println!("⚠️ Tunnel failed to start: {e}");
820 println!(" Falling back to local-only mode.");
821 }
822 }
823 }
824
825 let pfx = path_prefix.unwrap_or("");
826 println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
827 if let Some(ref url) = tunnel_url {
828 println!(" 🌐 Public URL: {url}");
829 }
830 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
831 if let Some(code) = pairing.pairing_code() {
832 println!();
833 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
834 println!(" ┌──────────────┐");
835 println!(" │ {code} │");
836 println!(" └──────────────┘");
837 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
838 } else if pairing.require_pairing() {
839 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
840 println!(" To pair a new device: construct gateway get-paircode --new");
841 println!();
842 } else {
843 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
844 println!();
845 }
846 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
847 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
848 if whatsapp_channel.is_some() {
849 println!(" GET {pfx}/whatsapp — Meta webhook verification");
850 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
851 }
852 if linq_channel.is_some() {
853 println!(" POST {pfx}/linq — Linq message webhook (iMessage/RCS/SMS)");
854 }
855 if wati_channel.is_some() {
856 println!(" GET {pfx}/wati — WATI webhook verification");
857 println!(" POST {pfx}/wati — WATI message webhook");
858 }
859 if nextcloud_talk_channel.is_some() {
860 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
861 }
862 println!(" GET {pfx}/api/* — REST API (bearer token required)");
863 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
864 if config.nodes.enabled {
865 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
866 }
867 println!(" GET {pfx}/health — health check");
868 println!(" GET {pfx}/metrics — Prometheus metrics");
869 println!(" Press Ctrl+C to stop.\n");
870
871 crate::health::mark_component_ok("gateway");
872
873 if let Some(ref hooks) = hooks {
875 hooks.fire_gateway_start(host, actual_port).await;
876 }
877
878 let broadcast_observer: Arc<dyn crate::observability::Observer> =
880 Arc::new(sse::BroadcastObserver::new(
881 crate::observability::create_observer(&config.observability),
882 event_tx.clone(),
883 ));
884
885 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
886
887 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
889
890 let device_registry = if config.gateway.require_pairing {
892 Some(Arc::new(api_pairing::DeviceRegistry::new(
893 &config.workspace_dir,
894 )?))
895 } else {
896 None
897 };
898 let pending_pairings = if config.gateway.require_pairing {
899 Some(Arc::new(api_pairing::PairingStore::new(
900 config.gateway.pairing_dashboard.max_pending_codes,
901 )))
902 } else {
903 None
904 };
905
906 let mcp_workspace_dir = config.workspace_dir.clone();
920 let mcp_config_snapshot = config.clone();
921 let mcp_runtime_handles = {
922 let mut rh = crate::mcp_server::RuntimeHandles::empty();
923
924 if config.workspace.enabled {
926 let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
927 let home = directories::UserDirs::new()
928 .map(|u| u.home_dir().to_path_buf())
929 .unwrap_or_else(|| std::path::PathBuf::from("."));
930 home.join(&config.workspace.workspaces_dir[2..])
931 } else {
932 std::path::PathBuf::from(&config.workspace.workspaces_dir)
933 };
934 let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
935 rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
936 }
937
938 if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
940 {
941 let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
942 Arc::new(store);
943 rh.session_store = Some(backend);
944 }
945
946 if !config.agents.is_empty() {
951 rh.agent_config = Some(Arc::new(config.agents.clone()));
952 rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
953 rh.provider_runtime_options =
954 Some(Arc::new(crate::providers::ProviderRuntimeOptions {
955 auth_profile_override: None,
956 provider_api_url: config.api_url.clone(),
957 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
958 secrets_encrypt: config.secrets.encrypt,
959 reasoning_enabled: config.runtime.reasoning_enabled,
960 reasoning_effort: config.runtime.reasoning_effort.clone(),
961 provider_timeout_secs: Some(config.provider_timeout_secs),
962 extra_headers: config.extra_headers.clone(),
963 api_path: config.api_path.clone(),
964 provider_max_tokens: config.provider_max_tokens,
965 }));
966 }
967
968 let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
975 Arc::new(config.clone()),
976 &security,
977 Arc::new(crate::runtime::NativeRuntime::new()),
978 Arc::clone(&mem),
979 composio_key,
980 composio_entity_id,
981 &config.browser,
982 &config.http_request,
983 &config.web_fetch,
984 &config.workspace_dir,
985 &config.agents,
986 config.api_key.as_deref(),
987 &config,
988 Some(canvas_store.clone()),
989 );
990 let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
992 .into_iter()
993 .map(|b| Arc::<dyn tools::Tool>::from(b))
994 .collect();
995 rh.pre_built_tools = Some(mcp_tool_arcs);
996
997 rh
998 };
999
1000 let mut state = AppState {
1001 config: config_state,
1002 provider,
1003 model,
1004 temperature,
1005 mem,
1006 auto_save: config.memory.auto_save,
1007 webhook_secret_hash,
1008 pairing,
1009 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1010 rate_limiter,
1011 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1012 idempotency_store,
1013 whatsapp: whatsapp_channel,
1014 whatsapp_app_secret,
1015 linq: linq_channel,
1016 linq_signing_secret,
1017 nextcloud_talk: nextcloud_talk_channel,
1018 nextcloud_talk_webhook_secret,
1019 wati: wati_channel,
1020 gmail_push: gmail_push_channel,
1021 observer: broadcast_observer,
1022 tools_registry,
1023 cost_tracker,
1024 audit_logger,
1025 event_tx,
1026 shutdown_tx,
1027 node_registry,
1028 session_backend,
1029 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1030 device_registry,
1031 pending_pairings,
1032 path_prefix: path_prefix.unwrap_or("").to_string(),
1033 canvas_store,
1034 mcp_registry: mcp_registry_shared,
1035 approval_registry: approval_registry::global(),
1036 mcp_local_url: None,
1038 #[cfg(feature = "webauthn")]
1039 webauthn: if config.security.webauthn.enabled {
1040 let secret_store = Arc::new(crate::security::SecretStore::new(
1041 &config.workspace_dir,
1042 true,
1043 ));
1044 let wa_config = crate::security::webauthn::WebAuthnConfig {
1045 enabled: true,
1046 rp_id: config.security.webauthn.rp_id.clone(),
1047 rp_origin: config.security.webauthn.rp_origin.clone(),
1048 rp_name: config.security.webauthn.rp_name.clone(),
1049 };
1050 Some(Arc::new(api_webauthn::WebAuthnState {
1051 manager: crate::security::webauthn::WebAuthnManager::new(
1052 wa_config,
1053 secret_store,
1054 &config.workspace_dir,
1055 ),
1056 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1057 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1058 }))
1059 } else {
1060 None
1061 },
1062 };
1063
1064 let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1071 let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1072 let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1073 &mcp_workspace_dir,
1074 &mcp_config_snapshot,
1075 &mcp_runtime_handles,
1076 );
1077 for (name, reason) in &mcp_skipped {
1078 tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1079 }
1080 tracing::info!(
1081 "mcp-server: advertising {} tools to external MCP clients",
1082 mcp_state.tools.len()
1083 );
1084
1085 match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1086 Ok(handle) => {
1087 let url = format!("http://{}/mcp", handle.addr);
1088 if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1089 tracing::warn!("mcp-server: failed to write discovery file: {e}");
1090 } else {
1091 tracing::info!(
1092 "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1093 );
1094 }
1095
1096 state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1099
1100 let mut watch = mcp_shutdown_watch;
1101 Some(tokio::spawn(async move {
1102 loop {
1104 if *watch.borrow() {
1105 break;
1106 }
1107 if watch.changed().await.is_err() {
1108 break;
1109 }
1110 }
1111 let _ = handle.shutdown.send(());
1112 let _ = handle.joined.await;
1113 crate::mcp_server::server::cleanup_discovery_file();
1114 tracing::info!("mcp-server: stopped");
1115 }))
1116 }
1117 Err(e) => {
1118 tracing::error!("mcp-server: failed to bind: {e}");
1119 None
1120 }
1121 }
1122 };
1123
1124 let config_put_router = Router::new()
1126 .route("/api/config", put(api::handle_api_config_put))
1127 .layer(RequestBodyLimitLayer::new(1_048_576));
1128
1129 let memory_graph_router = Router::new()
1132 .route(
1133 "/api/memory/graph",
1134 get(api_memory_graph::handle_memory_graph),
1135 )
1136 .with_state(state.clone())
1137 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1138 .layer(TimeoutLayer::with_status_code(
1139 StatusCode::REQUEST_TIMEOUT,
1140 Duration::from_secs(60),
1141 ));
1142
1143 let inner = Router::new()
1145 .route("/admin/shutdown", post(handle_admin_shutdown))
1147 .route("/admin/paircode", get(handle_admin_paircode))
1148 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1149 .route("/health", get(handle_health))
1151 .route("/metrics", get(handle_metrics))
1152 .route("/pair", post(handle_pair))
1153 .route("/pair/code", get(handle_pair_code))
1154 .route("/webhook", post(handle_webhook))
1155 .route("/whatsapp", get(handle_whatsapp_verify))
1156 .route("/whatsapp", post(handle_whatsapp_message))
1157 .route("/linq", post(handle_linq_webhook))
1158 .route("/wati", get(handle_wati_verify))
1159 .route("/wati", post(handle_wati_webhook))
1160 .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1161 .route("/webhook/gmail", post(handle_gmail_push_webhook))
1162 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1164 .route("/api/status", get(api::handle_api_status))
1166 .route("/api/config", get(api::handle_api_config_get))
1167 .route("/api/tools", get(api::handle_api_tools))
1168 .route("/api/cron", get(api::handle_api_cron_list))
1169 .route("/api/cron", post(api::handle_api_cron_add))
1170 .route(
1171 "/api/cron/settings",
1172 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1173 )
1174 .route(
1175 "/api/cron/{id}",
1176 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1177 )
1178 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1179 .route("/api/integrations", get(api::handle_api_integrations))
1180 .route(
1181 "/api/integrations/settings",
1182 get(api::handle_api_integrations_settings),
1183 )
1184 .route(
1185 "/api/doctor",
1186 get(api::handle_api_doctor).post(api::handle_api_doctor),
1187 )
1188 .route("/api/cost", get(api::handle_api_cost))
1190 .route("/api/audit", get(api::handle_api_audit))
1191 .route("/api/audit/verify", get(api::handle_api_audit_verify))
1192 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1193 .route("/api/health", get(api::handle_api_health))
1194 .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1195 .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1196 .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1198 .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1199 .route(
1200 "/api/mcp/session/{session_id}/events",
1201 get(api_mcp::handle_api_mcp_session_events),
1202 )
1203 .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1204 .route("/api/nodes", get(api::handle_api_nodes))
1205 .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1206 .route("/api/sessions", get(api::handle_api_sessions_list))
1207 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1208 .route(
1209 "/api/sessions/{id}/messages",
1210 get(api::handle_api_session_messages),
1211 )
1212 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1213 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1214 .route("/api/channels", get(api::handle_api_channels))
1216 .route("/api/channel-events", post(api::handle_api_channel_events))
1217 .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1219 .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1220 .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1221 .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1223 .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1224 .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1225 .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1227 .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1228 .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1229 .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1231 .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1232 .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1233 .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1234 .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1235 .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1236 .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1237 .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1238 .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1239 .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1240 .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1241 .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1243 .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1244 .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1245 .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1246 .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1249 .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1251 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1253 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1254 .route("/api/devices", get(api_pairing::list_devices))
1255 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1256 .route(
1257 "/api/devices/{id}/token/rotate",
1258 post(api_pairing::rotate_token),
1259 )
1260 .route("/api/canvas", get(canvas::handle_canvas_list))
1262 .route(
1263 "/api/canvas/{id}",
1264 get(canvas::handle_canvas_get)
1265 .post(canvas::handle_canvas_post)
1266 .delete(canvas::handle_canvas_clear),
1267 )
1268 .route(
1269 "/api/canvas/{id}/history",
1270 get(canvas::handle_canvas_history),
1271 );
1272
1273 #[cfg(feature = "webauthn")]
1275 let inner = inner
1276 .route(
1277 "/api/webauthn/register/start",
1278 post(api_webauthn::handle_register_start),
1279 )
1280 .route(
1281 "/api/webauthn/register/finish",
1282 post(api_webauthn::handle_register_finish),
1283 )
1284 .route(
1285 "/api/webauthn/auth/start",
1286 post(api_webauthn::handle_auth_start),
1287 )
1288 .route(
1289 "/api/webauthn/auth/finish",
1290 post(api_webauthn::handle_auth_finish),
1291 )
1292 .route(
1293 "/api/webauthn/credentials",
1294 get(api_webauthn::handle_list_credentials),
1295 )
1296 .route(
1297 "/api/webauthn/credentials/{id}",
1298 delete(api_webauthn::handle_delete_credential),
1299 );
1300
1301 #[cfg(feature = "plugins-wasm")]
1303 let inner = inner.route(
1304 "/api/plugins",
1305 get(api_plugins::plugin_routes::list_plugins),
1306 );
1307
1308 let inner = inner
1309 .route("/api/events", get(sse::handle_sse_events))
1311 .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1312 .route("/ws/chat", get(ws::handle_ws_chat))
1314 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1316 .route("/ws/nodes", get(nodes::handle_ws_nodes))
1318 .route("/ws/terminal", get(terminal::handle_ws_terminal))
1320 .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1322 .route("/_app/{*path}", get(static_files::handle_static))
1324 .merge(config_put_router)
1326 .fallback(get(static_files::handle_spa_fallback))
1328 .with_state(state)
1329 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1330 .layer(TimeoutLayer::with_status_code(
1331 StatusCode::REQUEST_TIMEOUT,
1332 Duration::from_secs(gateway_request_timeout_secs()),
1333 ));
1334
1335 let inner = inner.merge(memory_graph_router);
1337
1338 let app = if let Some(prefix) = path_prefix {
1342 let redirect_target = prefix.to_string();
1343 Router::new().nest(prefix, inner).route(
1344 &format!("{prefix}/"),
1345 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1346 )
1347 } else {
1348 inner
1349 };
1350
1351 let tls_acceptor = match &config.gateway.tls {
1353 Some(tls_cfg) if tls_cfg.enabled => {
1354 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1355 if has_mtls {
1356 tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1357 } else {
1358 tracing::info!("TLS enabled (no client certificate requirement)");
1359 }
1360 Some(tls::build_tls_acceptor(tls_cfg)?)
1361 }
1362 _ => None,
1363 };
1364
1365 if let Some(tls_acceptor) = tls_acceptor {
1366 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1368 let mut app = app;
1369
1370 let mut shutdown_signal = shutdown_rx;
1371 loop {
1372 tokio::select! {
1373 conn = listener.accept() => {
1374 let (tcp_stream, remote_addr) = conn?;
1375 let tls_acceptor = tls_acceptor.clone();
1376 let svc = tower::MakeService::<
1377 SocketAddr,
1378 hyper::Request<hyper::body::Incoming>,
1379 >::make_service(&mut app, remote_addr)
1380 .await
1381 .expect("infallible make_service");
1382
1383 tokio::spawn(async move {
1384 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1385 Ok(s) => s,
1386 Err(e) => {
1387 tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1388 return;
1389 }
1390 };
1391 let io = hyper_util::rt::TokioIo::new(tls_stream);
1392 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1393 let mut svc = svc.clone();
1394 async move {
1395 tower::Service::call(&mut svc, req).await
1396 }
1397 });
1398 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1399 hyper_util::rt::TokioExecutor::new(),
1400 )
1401 .serve_connection(io, hyper_svc)
1402 .await
1403 {
1404 tracing::debug!("connection error from {remote_addr}: {e}");
1405 }
1406 });
1407 }
1408 _ = shutdown_signal.changed() => {
1409 tracing::info!("🦀 Construct Gateway shutting down...");
1410 break;
1411 }
1412 }
1413 }
1414 } else {
1415 axum::serve(
1417 listener,
1418 app.into_make_service_with_connect_info::<SocketAddr>(),
1419 )
1420 .with_graceful_shutdown(async move {
1421 let _ = shutdown_rx.changed().await;
1422 tracing::info!("🦀 Construct Gateway shutting down...");
1423 })
1424 .await?;
1425 }
1426
1427 if let Some(task) = mcp_task {
1430 let _ = task.await;
1431 }
1432
1433 Ok(())
1434}
1435
1436async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1442 let body = serde_json::json!({
1443 "status": "ok",
1444 "paired": state.pairing.is_paired(),
1445 "require_pairing": state.pairing.require_pairing(),
1446 "runtime": crate::health::snapshot_json(),
1447 });
1448 Json(body)
1449}
1450
/// `Content-Type` header value attached to every `/metrics` response built by
/// `handle_metrics`.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1453
/// Returns the single-line, `#`-prefixed placeholder body served by `/metrics`
/// when no Prometheus observer is available.
fn prometheus_disabled_hint() -> String {
    "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n"
        .to_owned()
}
1459
/// Locates a `PrometheusObserver` behind a type-erased observer.
///
/// Two layouts are recognised:
/// 1. `observer` is itself a `PrometheusObserver`;
/// 2. `observer` is an `sse::BroadcastObserver` whose `inner()` observer is a
///    `PrometheusObserver`.
///
/// Returns `None` for anything else.
#[cfg(feature = "observability-prometheus")]
fn prometheus_observer_from_state(
    observer: &dyn crate::observability::Observer,
) -> Option<&crate::observability::PrometheusObserver> {
    let erased = observer.as_any();

    // Direct hit: the observer is the Prometheus backend itself.
    if let Some(prom) = erased.downcast_ref::<crate::observability::PrometheusObserver>() {
        return Some(prom);
    }

    // Otherwise look one level down, through the broadcast wrapper.
    let broadcast = erased.downcast_ref::<sse::BroadcastObserver>()?;
    broadcast
        .inner()
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
}
1479
1480async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1482 let body = {
1483 #[cfg(feature = "observability-prometheus")]
1484 {
1485 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1486 prom.encode()
1487 } else {
1488 prometheus_disabled_hint()
1489 }
1490 }
1491 #[cfg(not(feature = "observability-prometheus"))]
1492 {
1493 let _ = &state;
1494 prometheus_disabled_hint()
1495 }
1496 };
1497
1498 (
1499 StatusCode::OK,
1500 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1501 body,
1502 )
1503}
1504
1505#[axum::debug_handler]
1507async fn handle_pair(
1508 State(state): State<AppState>,
1509 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1510 headers: HeaderMap,
1511) -> impl IntoResponse {
1512 let rate_key =
1513 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1514 let peer_is_loopback = peer_addr.ip().is_loopback();
1515 if !state.rate_limiter.allow_pair(&rate_key) {
1516 tracing::warn!("/pair rate limit exceeded");
1517 let err = serde_json::json!({
1518 "error": "Too many pairing requests. Please retry later.",
1519 "retry_after": RATE_LIMIT_WINDOW_SECS,
1520 });
1521 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1522 }
1523
1524 if let Err(e) = state
1526 .auth_limiter
1527 .check_rate_limit(&rate_key, peer_is_loopback)
1528 {
1529 tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1530 let err = serde_json::json!({
1531 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1532 "retry_after": e.retry_after_secs,
1533 });
1534 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1535 }
1536
1537 let code = headers
1538 .get("X-Pairing-Code")
1539 .and_then(|v| v.to_str().ok())
1540 .unwrap_or("");
1541
1542 match state.pairing.try_pair(code, &rate_key).await {
1543 Ok(Some(token)) => {
1544 tracing::info!("🔐 New client paired successfully");
1545 if let Some(ref logger) = state.audit_logger {
1546 let _ =
1547 logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1548 }
1549 if let Err(err) =
1550 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1551 {
1552 tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1553 let body = serde_json::json!({
1554 "paired": true,
1555 "persisted": false,
1556 "token": token,
1557 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1558 });
1559 return (StatusCode::OK, Json(body));
1560 }
1561
1562 let body = serde_json::json!({
1563 "paired": true,
1564 "persisted": true,
1565 "token": token,
1566 "message": "Save this token — use it as Authorization: Bearer <token>"
1567 });
1568 (StatusCode::OK, Json(body))
1569 }
1570 Ok(None) => {
1571 state
1572 .auth_limiter
1573 .record_attempt(&rate_key, peer_is_loopback);
1574 tracing::warn!("🔐 Pairing attempt with invalid code");
1575 if let Some(ref logger) = state.audit_logger {
1576 let _ = logger
1577 .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1578 }
1579 let err = serde_json::json!({"error": "Invalid pairing code"});
1580 (StatusCode::FORBIDDEN, Json(err))
1581 }
1582 Err(lockout_secs) => {
1583 tracing::warn!(
1584 "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1585 );
1586 if let Some(ref logger) = state.audit_logger {
1587 let _ = logger.log_auth_failure(
1588 "gateway",
1589 &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1590 );
1591 }
1592 let err = serde_json::json!({
1593 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1594 "retry_after": lockout_secs
1595 });
1596 (StatusCode::TOO_MANY_REQUESTS, Json(err))
1597 }
1598 }
1599}
1600
1601async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1602 let paired_tokens = pairing.tokens();
1603 let mut updated_cfg = { config.lock().clone() };
1606 updated_cfg.gateway.paired_tokens = paired_tokens;
1607 updated_cfg
1608 .save()
1609 .await
1610 .context("Failed to persist paired tokens to config.toml")?;
1611
1612 *config.lock() = updated_cfg;
1614 Ok(())
1615}
1616
1617async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1619 let user_messages = vec![ChatMessage::user(message)];
1620
1621 let system_prompt = {
1624 let config_guard = state.config.lock();
1625 crate::channels::build_system_prompt(
1626 &config_guard.workspace_dir,
1627 &state.model,
1628 &[], &[], Some(&config_guard.identity),
1631 None, )
1633 };
1634
1635 let mut messages = Vec::with_capacity(1 + user_messages.len());
1636 messages.push(ChatMessage::system(system_prompt));
1637 messages.extend(user_messages);
1638
1639 let multimodal_config = state.config.lock().multimodal.clone();
1640 let prepared =
1641 crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1642
1643 state
1644 .provider
1645 .chat_with_history(&prepared.messages, &state.model, state.temperature)
1646 .await
1647}
1648
1649async fn run_gateway_chat_with_tools(
1651 state: &AppState,
1652 message: &str,
1653 session_id: Option<&str>,
1654) -> anyhow::Result<String> {
1655 let config = state.config.lock().clone();
1656 Box::pin(crate::agent::process_message(config, message, session_id)).await
1657}
1658
/// JSON request body accepted by `handle_webhook`.
#[derive(serde::Deserialize)]
pub struct WebhookBody {
    /// Text forwarded to the provider as the user message.
    pub message: String,
}
1664
/// POST /webhook — runs a single message through the provider and returns the
/// reply as JSON.
///
/// Request gating, in order:
/// 1. per-client webhook rate limit (`429`);
/// 2. when pairing is required: auth-attempt limiter (`429`) and bearer token
///    check (`401`, failed attempt recorded);
/// 3. when a webhook secret hash is configured: `X-Webhook-Secret` must hash to
///    the same value (`401`);
/// 4. the body must deserialize into `WebhookBody` (`400`);
/// 5. a repeated `X-Idempotency-Key` short-circuits with a `duplicate` body.
///
/// Successful provider calls return `{"response", "model"}`; provider failures
/// return `500` with a generic error while the sanitized detail goes to the
/// log and the observer.
async fn handle_webhook(
    State(state): State<AppState>,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
) -> impl IntoResponse {
    // Key used by both limiters; derived from the peer address and headers.
    let rate_key =
        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
    let peer_is_loopback = peer_addr.ip().is_loopback();
    if !state.rate_limiter.allow_webhook(&rate_key) {
        tracing::warn!("/webhook rate limit exceeded");
        let err = serde_json::json!({
            "error": "Too many webhook requests. Please retry later.",
            "retry_after": RATE_LIMIT_WINDOW_SECS,
        });
        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
    }

    // Bearer-token authentication is enforced only when pairing is required.
    if state.pairing.require_pairing() {
        if let Err(e) = state
            .auth_limiter
            .check_rate_limit(&rate_key, peer_is_loopback)
        {
            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
            let err = serde_json::json!({
                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
                "retry_after": e.retry_after_secs,
            });
            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
        }
        // A missing header or a header without the "Bearer " prefix yields an
        // empty token, which is then checked like any other value.
        let auth = headers
            .get(header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        let token = auth.strip_prefix("Bearer ").unwrap_or("");
        if !state.pairing.is_authenticated(token) {
            // Failed attempts feed the auth limiter checked above.
            state
                .auth_limiter
                .record_attempt(&rate_key, peer_is_loopback);
            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
            let err = serde_json::json!({
                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
            });
            return (StatusCode::UNAUTHORIZED, Json(err));
        }
    }

    // Optional shared secret: the trimmed header value is hashed and compared
    // against the stored hash with `constant_time_eq`.
    if let Some(ref secret_hash) = state.webhook_secret_hash {
        let header_hash = headers
            .get("X-Webhook-Secret")
            .and_then(|v| v.to_str().ok())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(hash_webhook_secret);
        match header_hash {
            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
            _ => {
                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
                return (StatusCode::UNAUTHORIZED, Json(err));
            }
        }
    }

    // The body is only inspected after the checks above, so a rejected caller
    // never receives the JSON parse error.
    let Json(webhook_body) = match body {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!("Webhook JSON parse error: {e}");
            let err = serde_json::json!({
                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
            });
            return (StatusCode::BAD_REQUEST, Json(err));
        }
    };

    // Optional idempotency: a key already seen by the store ends the request
    // here with a 200 "duplicate" body.
    if let Some(idempotency_key) = headers
        .get("X-Idempotency-Key")
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        if !state.idempotency_store.record_if_new(idempotency_key) {
            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
            let body = serde_json::json!({
                "status": "duplicate",
                "idempotent": true,
                "message": "Request already processed for this idempotency key"
            });
            return (StatusCode::OK, Json(body));
        }
    }

    let message = &webhook_body.message;
    let session_id = webhook_session_id(&headers);

    // Best-effort autosave of the incoming message; a store failure is ignored.
    if state.auto_save && !memory::should_skip_autosave_content(message) {
        let key = webhook_memory_key();
        let _ = state
            .mem
            .store(
                &key,
                message,
                MemoryCategory::Conversation,
                session_id.as_deref(),
            )
            .await;
    }

    // Labels attached to the observer events emitted below.
    let provider_label = state
        .config
        .lock()
        .default_provider
        .clone()
        .unwrap_or_else(|| "unknown".to_string());
    let model_label = state.model.clone();
    let started_at = Instant::now();

    state
        .observer
        .record_event(&crate::observability::ObserverEvent::AgentStart {
            provider: provider_label.clone(),
            model: model_label.clone(),
        });
    state
        .observer
        .record_event(&crate::observability::ObserverEvent::LlmRequest {
            provider: provider_label.clone(),
            model: model_label.clone(),
            messages_count: 1,
        });

    // This endpoint uses the simple chat path, not the tool-enabled agent path.
    match run_gateway_chat_simple(&state, message).await {
        Ok(response) => {
            let duration = started_at.elapsed();
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: true,
                    error_message: None,
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            let body = serde_json::json!({"response": response, "model": state.model});
            (StatusCode::OK, Json(body))
        }
        Err(e) => {
            let duration = started_at.elapsed();
            // Only the sanitized text is logged or handed to the observer.
            let sanitized = providers::sanitize_api_error(&e.to_string());

            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: false,
                    error_message: Some(sanitized.clone()),
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::Error {
                    component: "gateway".to_string(),
                    message: sanitized.clone(),
                });
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            tracing::error!("Webhook provider error: {}", sanitized);
            // The client gets a generic message; details stay server-side.
            let err = serde_json::json!({"error": "LLM request failed"});
            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
        }
    }
}
1871
/// Query-string parameters read by `handle_whatsapp_verify`.
///
/// Every field is optional and is deserialized from its dotted `hub.*` name.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// `hub.mode`; the handler only accepts the value `"subscribe"`.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// `hub.verify_token`; compared against the channel's configured verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// `hub.challenge`; echoed back verbatim on successful verification.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
1882
1883async fn handle_whatsapp_verify(
1885 State(state): State<AppState>,
1886 Query(params): Query<WhatsAppVerifyQuery>,
1887) -> impl IntoResponse {
1888 let Some(ref wa) = state.whatsapp else {
1889 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
1890 };
1891
1892 let token_matches = params
1894 .verify_token
1895 .as_deref()
1896 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
1897 if params.mode.as_deref() == Some("subscribe") && token_matches {
1898 if let Some(ch) = params.challenge {
1899 tracing::info!("WhatsApp webhook verified successfully");
1900 return (StatusCode::OK, ch);
1901 }
1902 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
1903 }
1904
1905 tracing::warn!("WhatsApp webhook verification failed — token mismatch");
1906 (StatusCode::FORBIDDEN, "Forbidden".to_string())
1907}
1908
1909pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
1913 use hmac::{Hmac, Mac};
1914 use sha2::Sha256;
1915
1916 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
1918 return false;
1919 };
1920
1921 let Ok(expected) = hex::decode(hex_sig) else {
1923 return false;
1924 };
1925
1926 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
1928 return false;
1929 };
1930 mac.update(body);
1931
1932 mac.verify_slice(&expected).is_ok()
1934}
1935
1936async fn handle_whatsapp_message(
1938 State(state): State<AppState>,
1939 headers: HeaderMap,
1940 body: Bytes,
1941) -> impl IntoResponse {
1942 let Some(ref wa) = state.whatsapp else {
1943 return (
1944 StatusCode::NOT_FOUND,
1945 Json(serde_json::json!({"error": "WhatsApp not configured"})),
1946 );
1947 };
1948
1949 if let Some(ref app_secret) = state.whatsapp_app_secret {
1951 let signature = headers
1952 .get("X-Hub-Signature-256")
1953 .and_then(|v| v.to_str().ok())
1954 .unwrap_or("");
1955
1956 if !verify_whatsapp_signature(app_secret, &body, signature) {
1957 tracing::warn!(
1958 "WhatsApp webhook signature verification failed (signature: {})",
1959 if signature.is_empty() {
1960 "missing"
1961 } else {
1962 "invalid"
1963 }
1964 );
1965 return (
1966 StatusCode::UNAUTHORIZED,
1967 Json(serde_json::json!({"error": "Invalid signature"})),
1968 );
1969 }
1970 }
1971
1972 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
1974 return (
1975 StatusCode::BAD_REQUEST,
1976 Json(serde_json::json!({"error": "Invalid JSON payload"})),
1977 );
1978 };
1979
1980 let messages = wa.parse_webhook_payload(&payload);
1982
1983 if messages.is_empty() {
1984 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
1986 }
1987
1988 for msg in &messages {
1990 tracing::info!(
1991 "WhatsApp message from {}: {}",
1992 msg.sender,
1993 truncate_with_ellipsis(&msg.content, 50)
1994 );
1995 let session_id = sender_session_id("whatsapp", msg);
1996
1997 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
1999 let key = whatsapp_memory_key(msg);
2000 let _ = state
2001 .mem
2002 .store(
2003 &key,
2004 &msg.content,
2005 MemoryCategory::Conversation,
2006 Some(&session_id),
2007 )
2008 .await;
2009 }
2010
2011 match Box::pin(run_gateway_chat_with_tools(
2012 &state,
2013 &msg.content,
2014 Some(&session_id),
2015 ))
2016 .await
2017 {
2018 Ok(response) => {
2019 if let Err(e) = wa
2021 .send(&SendMessage::new(response, &msg.reply_target))
2022 .await
2023 {
2024 tracing::error!("Failed to send WhatsApp reply: {e}");
2025 }
2026 }
2027 Err(e) => {
2028 tracing::error!("LLM error for WhatsApp message: {e:#}");
2029 let _ = wa
2030 .send(&SendMessage::new(
2031 "Sorry, I couldn't process your message right now.",
2032 &msg.reply_target,
2033 ))
2034 .await;
2035 }
2036 }
2037 }
2038
2039 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2041}
2042
2043async fn handle_linq_webhook(
2045 State(state): State<AppState>,
2046 headers: HeaderMap,
2047 body: Bytes,
2048) -> impl IntoResponse {
2049 let Some(ref linq) = state.linq else {
2050 return (
2051 StatusCode::NOT_FOUND,
2052 Json(serde_json::json!({"error": "Linq not configured"})),
2053 );
2054 };
2055
2056 let body_str = String::from_utf8_lossy(&body);
2057
2058 if let Some(ref signing_secret) = state.linq_signing_secret {
2060 let timestamp = headers
2061 .get("X-Webhook-Timestamp")
2062 .and_then(|v| v.to_str().ok())
2063 .unwrap_or("");
2064
2065 let signature = headers
2066 .get("X-Webhook-Signature")
2067 .and_then(|v| v.to_str().ok())
2068 .unwrap_or("");
2069
2070 if !crate::channels::linq::verify_linq_signature(
2071 signing_secret,
2072 &body_str,
2073 timestamp,
2074 signature,
2075 ) {
2076 tracing::warn!(
2077 "Linq webhook signature verification failed (signature: {})",
2078 if signature.is_empty() {
2079 "missing"
2080 } else {
2081 "invalid"
2082 }
2083 );
2084 return (
2085 StatusCode::UNAUTHORIZED,
2086 Json(serde_json::json!({"error": "Invalid signature"})),
2087 );
2088 }
2089 }
2090
2091 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2093 return (
2094 StatusCode::BAD_REQUEST,
2095 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2096 );
2097 };
2098
2099 let messages = linq.parse_webhook_payload(&payload);
2101
2102 if messages.is_empty() {
2103 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2105 }
2106
2107 for msg in &messages {
2109 tracing::info!(
2110 "Linq message from {}: {}",
2111 msg.sender,
2112 truncate_with_ellipsis(&msg.content, 50)
2113 );
2114 let session_id = sender_session_id("linq", msg);
2115
2116 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2118 let key = linq_memory_key(msg);
2119 let _ = state
2120 .mem
2121 .store(
2122 &key,
2123 &msg.content,
2124 MemoryCategory::Conversation,
2125 Some(&session_id),
2126 )
2127 .await;
2128 }
2129
2130 match Box::pin(run_gateway_chat_with_tools(
2132 &state,
2133 &msg.content,
2134 Some(&session_id),
2135 ))
2136 .await
2137 {
2138 Ok(response) => {
2139 if let Err(e) = linq
2141 .send(&SendMessage::new(response, &msg.reply_target))
2142 .await
2143 {
2144 tracing::error!("Failed to send Linq reply: {e}");
2145 }
2146 }
2147 Err(e) => {
2148 tracing::error!("LLM error for Linq message: {e:#}");
2149 let _ = linq
2150 .send(&SendMessage::new(
2151 "Sorry, I couldn't process your message right now.",
2152 &msg.reply_target,
2153 ))
2154 .await;
2155 }
2156 }
2157 }
2158
2159 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2161}
2162
2163async fn handle_wati_verify(
2165 State(state): State<AppState>,
2166 Query(params): Query<WatiVerifyQuery>,
2167) -> impl IntoResponse {
2168 if state.wati.is_none() {
2169 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2170 }
2171
2172 if let Some(challenge) = params.challenge {
2174 tracing::info!("WATI webhook verified successfully");
2175 return (StatusCode::OK, challenge);
2176 }
2177
2178 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2179}
2180
/// Query-string parameters read by `handle_wati_verify`.
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// `hub.challenge`; echoed back verbatim by the handler when present.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2186
2187async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2189 let Some(ref wati) = state.wati else {
2190 return (
2191 StatusCode::NOT_FOUND,
2192 Json(serde_json::json!({"error": "WATI not configured"})),
2193 );
2194 };
2195
2196 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2198 return (
2199 StatusCode::BAD_REQUEST,
2200 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2201 );
2202 };
2203
2204 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2206
2207 let messages = if matches!(msg_type, "audio" | "voice") {
2208 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2210 wati.parse_audio_as_message(&payload, transcript)
2211 } else {
2212 vec![]
2213 }
2214 } else {
2215 wati.parse_webhook_payload(&payload)
2216 };
2217
2218 if messages.is_empty() {
2219 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2220 }
2221
2222 for msg in &messages {
2224 tracing::info!(
2225 "WATI message from {}: {}",
2226 msg.sender,
2227 truncate_with_ellipsis(&msg.content, 50)
2228 );
2229 let session_id = sender_session_id("wati", msg);
2230
2231 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2233 let key = wati_memory_key(msg);
2234 let _ = state
2235 .mem
2236 .store(
2237 &key,
2238 &msg.content,
2239 MemoryCategory::Conversation,
2240 Some(&session_id),
2241 )
2242 .await;
2243 }
2244
2245 match Box::pin(run_gateway_chat_with_tools(
2247 &state,
2248 &msg.content,
2249 Some(&session_id),
2250 ))
2251 .await
2252 {
2253 Ok(response) => {
2254 if let Err(e) = wati
2256 .send(&SendMessage::new(response, &msg.reply_target))
2257 .await
2258 {
2259 tracing::error!("Failed to send WATI reply: {e}");
2260 }
2261 }
2262 Err(e) => {
2263 tracing::error!("LLM error for WATI message: {e:#}");
2264 let _ = wati
2265 .send(&SendMessage::new(
2266 "Sorry, I couldn't process your message right now.",
2267 &msg.reply_target,
2268 ))
2269 .await;
2270 }
2271 }
2272 }
2273
2274 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2276}
2277
2278async fn handle_nextcloud_talk_webhook(
2280 State(state): State<AppState>,
2281 headers: HeaderMap,
2282 body: Bytes,
2283) -> impl IntoResponse {
2284 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2285 return (
2286 StatusCode::NOT_FOUND,
2287 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2288 );
2289 };
2290
2291 let body_str = String::from_utf8_lossy(&body);
2292
2293 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2295 let random = headers
2296 .get("X-Nextcloud-Talk-Random")
2297 .and_then(|v| v.to_str().ok())
2298 .unwrap_or("");
2299
2300 let signature = headers
2301 .get("X-Nextcloud-Talk-Signature")
2302 .and_then(|v| v.to_str().ok())
2303 .unwrap_or("");
2304
2305 if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2306 webhook_secret,
2307 random,
2308 &body_str,
2309 signature,
2310 ) {
2311 tracing::warn!(
2312 "Nextcloud Talk webhook signature verification failed (signature: {})",
2313 if signature.is_empty() {
2314 "missing"
2315 } else {
2316 "invalid"
2317 }
2318 );
2319 return (
2320 StatusCode::UNAUTHORIZED,
2321 Json(serde_json::json!({"error": "Invalid signature"})),
2322 );
2323 }
2324 }
2325
2326 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2328 return (
2329 StatusCode::BAD_REQUEST,
2330 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2331 );
2332 };
2333
2334 let messages = nextcloud_talk.parse_webhook_payload(&payload);
2336 if messages.is_empty() {
2337 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2339 }
2340
2341 for msg in &messages {
2342 tracing::info!(
2343 "Nextcloud Talk message from {}: {}",
2344 msg.sender,
2345 truncate_with_ellipsis(&msg.content, 50)
2346 );
2347 let session_id = sender_session_id("nextcloud_talk", msg);
2348
2349 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2350 let key = nextcloud_talk_memory_key(msg);
2351 let _ = state
2352 .mem
2353 .store(
2354 &key,
2355 &msg.content,
2356 MemoryCategory::Conversation,
2357 Some(&session_id),
2358 )
2359 .await;
2360 }
2361
2362 match Box::pin(run_gateway_chat_with_tools(
2363 &state,
2364 &msg.content,
2365 Some(&session_id),
2366 ))
2367 .await
2368 {
2369 Ok(response) => {
2370 if let Err(e) = nextcloud_talk
2371 .send(&SendMessage::new(response, &msg.reply_target))
2372 .await
2373 {
2374 tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2375 }
2376 }
2377 Err(e) => {
2378 tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2379 let _ = nextcloud_talk
2380 .send(&SendMessage::new(
2381 "Sorry, I couldn't process your message right now.",
2382 &msg.reply_target,
2383 ))
2384 .await;
2385 }
2386 }
2387 }
2388
2389 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2390}
2391
/// Largest request body, in bytes (1 MiB), that the Gmail push webhook handler accepts.
const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2395
2396async fn handle_gmail_push_webhook(
2398 State(state): State<AppState>,
2399 headers: HeaderMap,
2400 body: Bytes,
2401) -> impl IntoResponse {
2402 let Some(ref gmail_push) = state.gmail_push else {
2403 return (
2404 StatusCode::NOT_FOUND,
2405 Json(serde_json::json!({"error": "Gmail push not configured"})),
2406 );
2407 };
2408
2409 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2411 return (
2412 StatusCode::PAYLOAD_TOO_LARGE,
2413 Json(serde_json::json!({"error": "Request body too large"})),
2414 );
2415 }
2416
2417 let secret = gmail_push.resolve_webhook_secret();
2419 if !secret.is_empty() {
2420 let provided = headers
2421 .get(axum::http::header::AUTHORIZATION)
2422 .and_then(|v| v.to_str().ok())
2423 .and_then(|auth| auth.strip_prefix("Bearer "))
2424 .unwrap_or("");
2425
2426 if provided != secret {
2427 tracing::warn!("Gmail push webhook: unauthorized request");
2428 return (
2429 StatusCode::UNAUTHORIZED,
2430 Json(serde_json::json!({"error": "Unauthorized"})),
2431 );
2432 }
2433 }
2434
2435 let body_str = String::from_utf8_lossy(&body);
2436 let envelope: crate::channels::gmail_push::PubSubEnvelope =
2437 match serde_json::from_str(&body_str) {
2438 Ok(e) => e,
2439 Err(e) => {
2440 tracing::warn!("Gmail push webhook: invalid payload: {e}");
2441 return (
2442 StatusCode::BAD_REQUEST,
2443 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2444 );
2445 }
2446 };
2447
2448 let channel = Arc::clone(gmail_push);
2450 tokio::spawn(async move {
2451 if let Err(e) = channel.handle_notification(&envelope).await {
2452 tracing::error!("Gmail push notification processing failed: {e:#}");
2453 }
2454 });
2455
2456 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2458}
2459
/// JSON body returned by the admin shutdown endpoint.
#[derive(serde::Serialize)]
struct AdminResponse {
    /// Whether the requested admin action was carried out.
    success: bool,
    /// Human-readable description of the outcome.
    message: String,
}
2470
2471fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2473 if peer.ip().is_loopback() {
2474 Ok(())
2475 } else {
2476 Err((
2477 StatusCode::FORBIDDEN,
2478 Json(serde_json::json!({
2479 "error": "Admin endpoints are restricted to localhost"
2480 })),
2481 ))
2482 }
2483}
2484
2485async fn handle_admin_shutdown(
2487 State(state): State<AppState>,
2488 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2489) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2490 require_localhost(&peer)?;
2491 tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2492
2493 let body = AdminResponse {
2494 success: true,
2495 message: "Gateway shutdown initiated".to_string(),
2496 };
2497
2498 let _ = state.shutdown_tx.send(true);
2499
2500 Ok((StatusCode::OK, Json(body)))
2501}
2502
2503async fn handle_admin_paircode(
2505 State(state): State<AppState>,
2506 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2507) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2508 require_localhost(&peer)?;
2509 let code = state.pairing.pairing_code();
2510
2511 let body = if let Some(c) = code {
2512 serde_json::json!({
2513 "success": true,
2514 "pairing_required": state.pairing.require_pairing(),
2515 "pairing_code": c,
2516 "message": "Use this one-time code to pair"
2517 })
2518 } else {
2519 serde_json::json!({
2520 "success": true,
2521 "pairing_required": state.pairing.require_pairing(),
2522 "pairing_code": null,
2523 "message": if state.pairing.require_pairing() {
2524 "Pairing is active but no new code available (already paired or code expired)"
2525 } else {
2526 "Pairing is disabled for this gateway"
2527 }
2528 })
2529 };
2530
2531 Ok((StatusCode::OK, Json(body)))
2532}
2533
2534async fn handle_admin_paircode_new(
2536 State(state): State<AppState>,
2537 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2538) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2539 require_localhost(&peer)?;
2540 match state.pairing.generate_new_pairing_code() {
2541 Some(code) => {
2542 tracing::info!("🔐 New pairing code generated via admin endpoint");
2543 let body = serde_json::json!({
2544 "success": true,
2545 "pairing_required": state.pairing.require_pairing(),
2546 "pairing_code": code,
2547 "message": "New pairing code generated — use this one-time code to pair"
2548 });
2549 Ok((StatusCode::OK, Json(body)))
2550 }
2551 None => {
2552 let body = serde_json::json!({
2553 "success": false,
2554 "pairing_required": false,
2555 "pairing_code": null,
2556 "message": "Pairing is disabled for this gateway"
2557 });
2558 Ok((StatusCode::BAD_REQUEST, Json(body)))
2559 }
2560 }
2561}
2562
2563async fn handle_pair_code(
2571 State(state): State<AppState>,
2572 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2573) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2574 require_localhost(&peer)?;
2575
2576 let require = state.pairing.require_pairing();
2577 let is_paired = state.pairing.is_paired();
2578
2579 let code = if require && !is_paired {
2580 state.pairing.pairing_code()
2581 } else {
2582 None
2583 };
2584
2585 let body = serde_json::json!({
2586 "success": true,
2587 "pairing_required": require,
2588 "pairing_code": code,
2589 });
2590
2591 Ok((StatusCode::OK, Json(body)))
2592}
2593
2594#[cfg(test)]
2595mod tests {
2596 use super::*;
2597 use crate::channels::traits::ChannelMessage;
2598 use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2599 use crate::providers::Provider;
2600 use async_trait::async_trait;
2601 use axum::http::HeaderValue;
2602 use axum::response::IntoResponse;
2603 use http_body_util::BodyExt;
2604 use parking_lot::Mutex;
2605 use std::sync::atomic::{AtomicUsize, Ordering};
2606
2607 fn generate_test_secret() -> String {
2609 let bytes: [u8; 32] = rand::random();
2610 hex::encode(bytes)
2611 }
2612
2613 #[test]
2614 fn security_body_limit_is_64kb() {
2615 assert_eq!(MAX_BODY_SIZE, 65_536);
2616 }
2617
2618 #[test]
2619 fn security_timeout_default_is_30_seconds() {
2620 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2621 }
2622
2623 #[test]
2624 fn gateway_timeout_falls_back_to_default() {
2625 unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2628 assert_eq!(gateway_request_timeout_secs(), 30);
2629 }
2630
2631 #[test]
2632 fn webhook_body_requires_message_field() {
2633 let valid = r#"{"message": "hello"}"#;
2634 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2635 assert!(parsed.is_ok());
2636 assert_eq!(parsed.unwrap().message, "hello");
2637
2638 let missing = r#"{"other": "field"}"#;
2639 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2640 assert!(parsed.is_err());
2641 }
2642
2643 #[test]
2644 fn whatsapp_query_fields_are_optional() {
2645 let q = WhatsAppVerifyQuery {
2646 mode: None,
2647 verify_token: None,
2648 challenge: None,
2649 };
2650 assert!(q.mode.is_none());
2651 }
2652
2653 #[test]
2654 fn app_state_is_clone() {
2655 fn assert_clone<T: Clone>() {}
2656 assert_clone::<AppState>();
2657 }
2658
2659 #[tokio::test]
2660 async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2661 let state = AppState {
2662 config: Arc::new(Mutex::new(Config::default())),
2663 provider: Arc::new(MockProvider::default()),
2664 model: "test-model".into(),
2665 temperature: 0.0,
2666 mem: Arc::new(MockMemory),
2667 auto_save: false,
2668 webhook_secret_hash: None,
2669 pairing: Arc::new(PairingGuard::new(false, &[])),
2670 trust_forwarded_headers: false,
2671 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2672 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2673 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2674 whatsapp: None,
2675 whatsapp_app_secret: None,
2676 linq: None,
2677 linq_signing_secret: None,
2678 nextcloud_talk: None,
2679 nextcloud_talk_webhook_secret: None,
2680 wati: None,
2681 gmail_push: None,
2682 observer: Arc::new(crate::observability::NoopObserver),
2683 tools_registry: Arc::new(Vec::new()),
2684 cost_tracker: None,
2685 audit_logger: None,
2686 event_tx: tokio::sync::broadcast::channel(16).0,
2687 shutdown_tx: tokio::sync::watch::channel(false).0,
2688 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2689 path_prefix: String::new(),
2690 session_backend: None,
2691 session_queue: std::sync::Arc::new(
2692 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2693 ),
2694 device_registry: None,
2695 pending_pairings: None,
2696 canvas_store: CanvasStore::new(),
2697 mcp_registry: None,
2698 approval_registry: approval_registry::global(),
2699 mcp_local_url: None,
2700 #[cfg(feature = "webauthn")]
2701 webauthn: None,
2702 };
2703
2704 let response = handle_metrics(State(state)).await.into_response();
2705 assert_eq!(response.status(), StatusCode::OK);
2706 assert_eq!(
2707 response
2708 .headers()
2709 .get(header::CONTENT_TYPE)
2710 .and_then(|value| value.to_str().ok()),
2711 Some(PROMETHEUS_CONTENT_TYPE)
2712 );
2713
2714 let body = response.into_body().collect().await.unwrap().to_bytes();
2715 let text = String::from_utf8(body.to_vec()).unwrap();
2716 assert!(text.contains("Prometheus backend not enabled"));
2717 }
2718
2719 #[cfg(feature = "observability-prometheus")]
2720 #[tokio::test]
2721 async fn metrics_endpoint_renders_prometheus_output() {
2722 let event_tx = tokio::sync::broadcast::channel(16).0;
2723 let wrapped = sse::BroadcastObserver::new(
2724 Box::new(crate::observability::PrometheusObserver::new()),
2725 event_tx.clone(),
2726 );
2727 crate::observability::Observer::record_event(
2728 &wrapped,
2729 &crate::observability::ObserverEvent::HeartbeatTick,
2730 );
2731
2732 let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
2733 let state = AppState {
2734 config: Arc::new(Mutex::new(Config::default())),
2735 provider: Arc::new(MockProvider::default()),
2736 model: "test-model".into(),
2737 temperature: 0.0,
2738 mem: Arc::new(MockMemory),
2739 auto_save: false,
2740 webhook_secret_hash: None,
2741 pairing: Arc::new(PairingGuard::new(false, &[])),
2742 trust_forwarded_headers: false,
2743 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2744 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
2745 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2746 whatsapp: None,
2747 whatsapp_app_secret: None,
2748 linq: None,
2749 linq_signing_secret: None,
2750 nextcloud_talk: None,
2751 nextcloud_talk_webhook_secret: None,
2752 wati: None,
2753 gmail_push: None,
2754 observer,
2755 tools_registry: Arc::new(Vec::new()),
2756 cost_tracker: None,
2757 audit_logger: None,
2758 event_tx,
2759 shutdown_tx: tokio::sync::watch::channel(false).0,
2760 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2761 path_prefix: String::new(),
2762 session_backend: None,
2763 session_queue: std::sync::Arc::new(
2764 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
2765 ),
2766 device_registry: None,
2767 pending_pairings: None,
2768 canvas_store: CanvasStore::new(),
2769 mcp_registry: None,
2770 approval_registry: approval_registry::global(),
2771 mcp_local_url: None,
2772 #[cfg(feature = "webauthn")]
2773 webauthn: None,
2774 };
2775
2776 let response = handle_metrics(State(state)).await.into_response();
2777 assert_eq!(response.status(), StatusCode::OK);
2778
2779 let body = response.into_body().collect().await.unwrap().to_bytes();
2780 let text = String::from_utf8(body.to_vec()).unwrap();
2781 assert!(text.contains("construct_heartbeat_ticks_total 1"));
2782 }
2783
2784 #[test]
2785 fn gateway_rate_limiter_blocks_after_limit() {
2786 let limiter = GatewayRateLimiter::new(2, 2, 100);
2787 assert!(limiter.allow_pair("127.0.0.1"));
2788 assert!(limiter.allow_pair("127.0.0.1"));
2789 assert!(!limiter.allow_pair("127.0.0.1"));
2790 }
2791
2792 #[test]
2793 fn rate_limiter_sweep_removes_stale_entries() {
2794 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
2795 assert!(limiter.allow("ip-1"));
2797 assert!(limiter.allow("ip-2"));
2798 assert!(limiter.allow("ip-3"));
2799
2800 {
2801 let guard = limiter.requests.lock();
2802 assert_eq!(guard.0.len(), 3);
2803 }
2804
2805 {
2807 let mut guard = limiter.requests.lock();
2808 guard.1 = Instant::now()
2809 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
2810 .unwrap();
2811 guard.0.get_mut("ip-2").unwrap().clear();
2813 guard.0.get_mut("ip-3").unwrap().clear();
2814 }
2815
2816 assert!(limiter.allow("ip-1"));
2818
2819 {
2820 let guard = limiter.requests.lock();
2821 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
2822 assert!(guard.0.contains_key("ip-1"));
2823 }
2824 }
2825
2826 #[test]
2827 fn rate_limiter_zero_limit_always_allows() {
2828 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
2829 for _ in 0..100 {
2830 assert!(limiter.allow("any-key"));
2831 }
2832 }
2833
2834 #[test]
2835 fn idempotency_store_rejects_duplicate_key() {
2836 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
2837 assert!(store.record_if_new("req-1"));
2838 assert!(!store.record_if_new("req-1"));
2839 assert!(store.record_if_new("req-2"));
2840 }
2841
2842 #[test]
2843 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
2844 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
2845 assert!(limiter.allow("ip-1"));
2846 assert!(limiter.allow("ip-2"));
2847 assert!(limiter.allow("ip-3"));
2848
2849 let guard = limiter.requests.lock();
2850 assert_eq!(guard.0.len(), 2);
2851 assert!(guard.0.contains_key("ip-2"));
2852 assert!(guard.0.contains_key("ip-3"));
2853 }
2854
2855 #[test]
2856 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
2857 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
2858 assert!(store.record_if_new("k1"));
2859 std::thread::sleep(Duration::from_millis(2));
2860 assert!(store.record_if_new("k2"));
2861 std::thread::sleep(Duration::from_millis(2));
2862 assert!(store.record_if_new("k3"));
2863
2864 let keys = store.keys.lock();
2865 assert_eq!(keys.len(), 2);
2866 assert!(!keys.contains_key("k1"));
2867 assert!(keys.contains_key("k2"));
2868 assert!(keys.contains_key("k3"));
2869 }
2870
2871 #[test]
2872 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
2873 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2874 let mut headers = HeaderMap::new();
2875 headers.insert(
2876 "X-Forwarded-For",
2877 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2878 );
2879
2880 let key = client_key_from_request(Some(peer), &headers, false);
2881 assert_eq!(key, "10.0.0.5");
2882 }
2883
2884 #[test]
2885 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
2886 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2889 let mut headers = HeaderMap::new();
2890 headers.insert(
2891 "X-Forwarded-For",
2892 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
2893 );
2894
2895 let key = client_key_from_request(Some(peer), &headers, true);
2896 assert_eq!(key, "203.0.113.11");
2897 }
2898
2899 #[test]
2900 fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
2901 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2906 let mut headers = HeaderMap::new();
2907 headers.insert(
2908 "X-Forwarded-For",
2909 HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
2910 );
2911
2912 let key = client_key_from_request(Some(peer), &headers, true);
2913 assert_eq!(key, "203.0.113.11");
2914 }
2915
2916 #[test]
2917 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
2918 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
2919 let mut headers = HeaderMap::new();
2920 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
2921
2922 let key = client_key_from_request(Some(peer), &headers, true);
2923 assert_eq!(key, "10.0.0.5");
2924 }
2925
2926 #[test]
2927 fn normalize_max_keys_uses_fallback_for_zero() {
2928 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
2929 assert_eq!(normalize_max_keys(0, 0), 1);
2930 }
2931
2932 #[test]
2933 fn normalize_max_keys_preserves_nonzero_values() {
2934 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
2935 assert_eq!(normalize_max_keys(1, 10_000), 1);
2936 }
2937
2938 #[tokio::test]
2939 async fn persist_pairing_tokens_writes_config_tokens() {
2940 let temp = tempfile::tempdir().unwrap();
2941 let config_path = temp.path().join("config.toml");
2942 let workspace_path = temp.path().join("workspace");
2943
2944 let mut config = Config::default();
2945 config.config_path = config_path.clone();
2946 config.workspace_dir = workspace_path;
2947 config.save().await.unwrap();
2948
2949 let guard = PairingGuard::new(true, &[]);
2950 let code = guard.pairing_code().unwrap();
2951 let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
2952 assert!(guard.is_authenticated(&token));
2953
2954 let shared_config = Arc::new(Mutex::new(config));
2955 Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
2956 .await
2957 .unwrap();
2958
2959 let plaintext = {
2961 let in_memory = shared_config.lock();
2962 assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
2963 in_memory.gateway.paired_tokens[0].clone()
2964 };
2965 assert_eq!(plaintext.len(), 64);
2966 assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
2967
2968 let saved = tokio::fs::read_to_string(config_path).await.unwrap();
2970 let raw_parsed: Config = toml::from_str(&saved).unwrap();
2971 assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
2972 let on_disk = &raw_parsed.gateway.paired_tokens[0];
2973 assert!(
2974 crate::security::SecretStore::is_encrypted(on_disk),
2975 "paired_token should be encrypted on disk"
2976 );
2977 }
2978
2979 #[test]
2980 fn webhook_memory_key_is_unique() {
2981 let key1 = webhook_memory_key();
2982 let key2 = webhook_memory_key();
2983
2984 assert!(key1.starts_with("webhook_msg_"));
2985 assert!(key2.starts_with("webhook_msg_"));
2986 assert_ne!(key1, key2);
2987 }
2988
2989 #[test]
2990 fn whatsapp_memory_key_includes_sender_and_message_id() {
2991 let msg = ChannelMessage {
2992 id: "wamid-123".into(),
2993 sender: "+1234567890".into(),
2994 reply_target: "+1234567890".into(),
2995 content: "hello".into(),
2996 channel: "whatsapp".into(),
2997 timestamp: 1,
2998 thread_ts: None,
2999 interruption_scope_id: None,
3000 attachments: vec![],
3001 };
3002
3003 let key = whatsapp_memory_key(&msg);
3004 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3005 }
3006
3007 #[derive(Default)]
3008 struct MockMemory;
3009
3010 #[async_trait]
3011 impl Memory for MockMemory {
3012 fn name(&self) -> &str {
3013 "mock"
3014 }
3015
3016 async fn store(
3017 &self,
3018 _key: &str,
3019 _content: &str,
3020 _category: MemoryCategory,
3021 _session_id: Option<&str>,
3022 ) -> anyhow::Result<()> {
3023 Ok(())
3024 }
3025
3026 async fn recall(
3027 &self,
3028 _query: &str,
3029 _limit: usize,
3030 _session_id: Option<&str>,
3031 _since: Option<&str>,
3032 _until: Option<&str>,
3033 ) -> anyhow::Result<Vec<MemoryEntry>> {
3034 Ok(Vec::new())
3035 }
3036
3037 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3038 Ok(None)
3039 }
3040
3041 async fn list(
3042 &self,
3043 _category: Option<&MemoryCategory>,
3044 _session_id: Option<&str>,
3045 ) -> anyhow::Result<Vec<MemoryEntry>> {
3046 Ok(Vec::new())
3047 }
3048
3049 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3050 Ok(false)
3051 }
3052
3053 async fn count(&self) -> anyhow::Result<usize> {
3054 Ok(0)
3055 }
3056
3057 async fn health_check(&self) -> bool {
3058 true
3059 }
3060 }
3061
3062 #[derive(Default)]
3063 struct MockProvider {
3064 calls: AtomicUsize,
3065 }
3066
3067 #[async_trait]
3068 impl Provider for MockProvider {
3069 async fn chat_with_system(
3070 &self,
3071 _system_prompt: Option<&str>,
3072 _message: &str,
3073 _model: &str,
3074 _temperature: f64,
3075 ) -> anyhow::Result<String> {
3076 self.calls.fetch_add(1, Ordering::SeqCst);
3077 Ok("ok".into())
3078 }
3079 }
3080
3081 #[derive(Default)]
3082 struct TrackingMemory {
3083 keys: Mutex<Vec<String>>,
3084 }
3085
3086 #[async_trait]
3087 impl Memory for TrackingMemory {
3088 fn name(&self) -> &str {
3089 "tracking"
3090 }
3091
3092 async fn store(
3093 &self,
3094 key: &str,
3095 _content: &str,
3096 _category: MemoryCategory,
3097 _session_id: Option<&str>,
3098 ) -> anyhow::Result<()> {
3099 self.keys.lock().push(key.to_string());
3100 Ok(())
3101 }
3102
3103 async fn recall(
3104 &self,
3105 _query: &str,
3106 _limit: usize,
3107 _session_id: Option<&str>,
3108 _since: Option<&str>,
3109 _until: Option<&str>,
3110 ) -> anyhow::Result<Vec<MemoryEntry>> {
3111 Ok(Vec::new())
3112 }
3113
3114 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3115 Ok(None)
3116 }
3117
3118 async fn list(
3119 &self,
3120 _category: Option<&MemoryCategory>,
3121 _session_id: Option<&str>,
3122 ) -> anyhow::Result<Vec<MemoryEntry>> {
3123 Ok(Vec::new())
3124 }
3125
3126 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3127 Ok(false)
3128 }
3129
3130 async fn count(&self) -> anyhow::Result<usize> {
3131 let size = self.keys.lock().len();
3132 Ok(size)
3133 }
3134
3135 async fn health_check(&self) -> bool {
3136 true
3137 }
3138 }
3139
3140 fn test_connect_info() -> ConnectInfo<SocketAddr> {
3141 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3142 }
3143
3144 #[tokio::test]
3145 async fn webhook_idempotency_skips_duplicate_provider_calls() {
3146 let provider_impl = Arc::new(MockProvider::default());
3147 let provider: Arc<dyn Provider> = provider_impl.clone();
3148 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3149
3150 let state = AppState {
3151 config: Arc::new(Mutex::new(Config::default())),
3152 provider,
3153 model: "test-model".into(),
3154 temperature: 0.0,
3155 mem: memory,
3156 auto_save: false,
3157 webhook_secret_hash: None,
3158 pairing: Arc::new(PairingGuard::new(false, &[])),
3159 trust_forwarded_headers: false,
3160 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3161 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3162 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3163 whatsapp: None,
3164 whatsapp_app_secret: None,
3165 linq: None,
3166 linq_signing_secret: None,
3167 nextcloud_talk: None,
3168 nextcloud_talk_webhook_secret: None,
3169 wati: None,
3170 gmail_push: None,
3171 observer: Arc::new(crate::observability::NoopObserver),
3172 tools_registry: Arc::new(Vec::new()),
3173 cost_tracker: None,
3174 audit_logger: None,
3175 event_tx: tokio::sync::broadcast::channel(16).0,
3176 shutdown_tx: tokio::sync::watch::channel(false).0,
3177 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3178 path_prefix: String::new(),
3179 session_backend: None,
3180 session_queue: std::sync::Arc::new(
3181 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3182 ),
3183 device_registry: None,
3184 pending_pairings: None,
3185 canvas_store: CanvasStore::new(),
3186 mcp_registry: None,
3187 approval_registry: approval_registry::global(),
3188 mcp_local_url: None,
3189 #[cfg(feature = "webauthn")]
3190 webauthn: None,
3191 };
3192
3193 let mut headers = HeaderMap::new();
3194 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3195
3196 let body = Ok(Json(WebhookBody {
3197 message: "hello".into(),
3198 }));
3199 let first = handle_webhook(
3200 State(state.clone()),
3201 test_connect_info(),
3202 headers.clone(),
3203 body,
3204 )
3205 .await
3206 .into_response();
3207 assert_eq!(first.status(), StatusCode::OK);
3208
3209 let body = Ok(Json(WebhookBody {
3210 message: "hello".into(),
3211 }));
3212 let second = handle_webhook(State(state), test_connect_info(), headers, body)
3213 .await
3214 .into_response();
3215 assert_eq!(second.status(), StatusCode::OK);
3216
3217 let payload = second.into_body().collect().await.unwrap().to_bytes();
3218 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3219 assert_eq!(parsed["status"], "duplicate");
3220 assert_eq!(parsed["idempotent"], true);
3221 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3222 }
3223
3224 #[tokio::test]
3225 async fn webhook_autosave_stores_distinct_keys_per_request() {
3226 let provider_impl = Arc::new(MockProvider::default());
3227 let provider: Arc<dyn Provider> = provider_impl.clone();
3228
3229 let tracking_impl = Arc::new(TrackingMemory::default());
3230 let memory: Arc<dyn Memory> = tracking_impl.clone();
3231
3232 let state = AppState {
3233 config: Arc::new(Mutex::new(Config::default())),
3234 provider,
3235 model: "test-model".into(),
3236 temperature: 0.0,
3237 mem: memory,
3238 auto_save: true,
3239 webhook_secret_hash: None,
3240 pairing: Arc::new(PairingGuard::new(false, &[])),
3241 trust_forwarded_headers: false,
3242 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3243 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3244 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3245 whatsapp: None,
3246 whatsapp_app_secret: None,
3247 linq: None,
3248 linq_signing_secret: None,
3249 nextcloud_talk: None,
3250 nextcloud_talk_webhook_secret: None,
3251 wati: None,
3252 gmail_push: None,
3253 observer: Arc::new(crate::observability::NoopObserver),
3254 tools_registry: Arc::new(Vec::new()),
3255 cost_tracker: None,
3256 audit_logger: None,
3257 event_tx: tokio::sync::broadcast::channel(16).0,
3258 shutdown_tx: tokio::sync::watch::channel(false).0,
3259 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3260 path_prefix: String::new(),
3261 session_backend: None,
3262 session_queue: std::sync::Arc::new(
3263 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3264 ),
3265 device_registry: None,
3266 pending_pairings: None,
3267 canvas_store: CanvasStore::new(),
3268 mcp_registry: None,
3269 approval_registry: approval_registry::global(),
3270 mcp_local_url: None,
3271 #[cfg(feature = "webauthn")]
3272 webauthn: None,
3273 };
3274
3275 let headers = HeaderMap::new();
3276
3277 let body1 = Ok(Json(WebhookBody {
3278 message: "hello one".into(),
3279 }));
3280 let first = handle_webhook(
3281 State(state.clone()),
3282 test_connect_info(),
3283 headers.clone(),
3284 body1,
3285 )
3286 .await
3287 .into_response();
3288 assert_eq!(first.status(), StatusCode::OK);
3289
3290 let body2 = Ok(Json(WebhookBody {
3291 message: "hello two".into(),
3292 }));
3293 let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3294 .await
3295 .into_response();
3296 assert_eq!(second.status(), StatusCode::OK);
3297
3298 let keys = tracking_impl.keys.lock().clone();
3299 assert_eq!(keys.len(), 2);
3300 assert_ne!(keys[0], keys[1]);
3301 assert!(keys[0].starts_with("webhook_msg_"));
3302 assert!(keys[1].starts_with("webhook_msg_"));
3303 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3304 }
3305
3306 #[test]
3307 fn webhook_secret_hash_is_deterministic_and_nonempty() {
3308 let secret_a = generate_test_secret();
3309 let secret_b = generate_test_secret();
3310 let one = hash_webhook_secret(&secret_a);
3311 let two = hash_webhook_secret(&secret_a);
3312 let other = hash_webhook_secret(&secret_b);
3313
3314 assert_eq!(one, two);
3315 assert_ne!(one, other);
3316 assert_eq!(one.len(), 64);
3317 }
3318
3319 #[tokio::test]
3320 async fn webhook_secret_hash_rejects_missing_header() {
3321 let provider_impl = Arc::new(MockProvider::default());
3322 let provider: Arc<dyn Provider> = provider_impl.clone();
3323 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3324 let secret = generate_test_secret();
3325
3326 let state = AppState {
3327 config: Arc::new(Mutex::new(Config::default())),
3328 provider,
3329 model: "test-model".into(),
3330 temperature: 0.0,
3331 mem: memory,
3332 auto_save: false,
3333 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3334 pairing: Arc::new(PairingGuard::new(false, &[])),
3335 trust_forwarded_headers: false,
3336 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3337 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3338 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3339 whatsapp: None,
3340 whatsapp_app_secret: None,
3341 linq: None,
3342 linq_signing_secret: None,
3343 nextcloud_talk: None,
3344 nextcloud_talk_webhook_secret: None,
3345 wati: None,
3346 gmail_push: None,
3347 observer: Arc::new(crate::observability::NoopObserver),
3348 tools_registry: Arc::new(Vec::new()),
3349 cost_tracker: None,
3350 audit_logger: None,
3351 event_tx: tokio::sync::broadcast::channel(16).0,
3352 shutdown_tx: tokio::sync::watch::channel(false).0,
3353 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3354 path_prefix: String::new(),
3355 session_backend: None,
3356 session_queue: std::sync::Arc::new(
3357 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3358 ),
3359 device_registry: None,
3360 pending_pairings: None,
3361 canvas_store: CanvasStore::new(),
3362 mcp_registry: None,
3363 approval_registry: approval_registry::global(),
3364 mcp_local_url: None,
3365 #[cfg(feature = "webauthn")]
3366 webauthn: None,
3367 };
3368
3369 let response = handle_webhook(
3370 State(state),
3371 test_connect_info(),
3372 HeaderMap::new(),
3373 Ok(Json(WebhookBody {
3374 message: "hello".into(),
3375 })),
3376 )
3377 .await
3378 .into_response();
3379
3380 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3381 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3382 }
3383
3384 #[tokio::test]
3385 async fn webhook_secret_hash_rejects_invalid_header() {
3386 let provider_impl = Arc::new(MockProvider::default());
3387 let provider: Arc<dyn Provider> = provider_impl.clone();
3388 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3389 let valid_secret = generate_test_secret();
3390 let wrong_secret = generate_test_secret();
3391
3392 let state = AppState {
3393 config: Arc::new(Mutex::new(Config::default())),
3394 provider,
3395 model: "test-model".into(),
3396 temperature: 0.0,
3397 mem: memory,
3398 auto_save: false,
3399 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3400 pairing: Arc::new(PairingGuard::new(false, &[])),
3401 trust_forwarded_headers: false,
3402 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3403 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3404 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3405 whatsapp: None,
3406 whatsapp_app_secret: None,
3407 linq: None,
3408 linq_signing_secret: None,
3409 nextcloud_talk: None,
3410 nextcloud_talk_webhook_secret: None,
3411 wati: None,
3412 gmail_push: None,
3413 observer: Arc::new(crate::observability::NoopObserver),
3414 tools_registry: Arc::new(Vec::new()),
3415 cost_tracker: None,
3416 audit_logger: None,
3417 event_tx: tokio::sync::broadcast::channel(16).0,
3418 shutdown_tx: tokio::sync::watch::channel(false).0,
3419 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3420 path_prefix: String::new(),
3421 session_backend: None,
3422 session_queue: std::sync::Arc::new(
3423 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3424 ),
3425 device_registry: None,
3426 pending_pairings: None,
3427 canvas_store: CanvasStore::new(),
3428 mcp_registry: None,
3429 approval_registry: approval_registry::global(),
3430 mcp_local_url: None,
3431 #[cfg(feature = "webauthn")]
3432 webauthn: None,
3433 };
3434
3435 let mut headers = HeaderMap::new();
3436 headers.insert(
3437 "X-Webhook-Secret",
3438 HeaderValue::from_str(&wrong_secret).unwrap(),
3439 );
3440
3441 let response = handle_webhook(
3442 State(state),
3443 test_connect_info(),
3444 headers,
3445 Ok(Json(WebhookBody {
3446 message: "hello".into(),
3447 })),
3448 )
3449 .await
3450 .into_response();
3451
3452 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3453 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3454 }
3455
3456 #[tokio::test]
3457 async fn webhook_secret_hash_accepts_valid_header() {
3458 let provider_impl = Arc::new(MockProvider::default());
3459 let provider: Arc<dyn Provider> = provider_impl.clone();
3460 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3461 let secret = generate_test_secret();
3462
3463 let state = AppState {
3464 config: Arc::new(Mutex::new(Config::default())),
3465 provider,
3466 model: "test-model".into(),
3467 temperature: 0.0,
3468 mem: memory,
3469 auto_save: false,
3470 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3471 pairing: Arc::new(PairingGuard::new(false, &[])),
3472 trust_forwarded_headers: false,
3473 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3474 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3475 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3476 whatsapp: None,
3477 whatsapp_app_secret: None,
3478 linq: None,
3479 linq_signing_secret: None,
3480 nextcloud_talk: None,
3481 nextcloud_talk_webhook_secret: None,
3482 wati: None,
3483 gmail_push: None,
3484 observer: Arc::new(crate::observability::NoopObserver),
3485 tools_registry: Arc::new(Vec::new()),
3486 cost_tracker: None,
3487 audit_logger: None,
3488 event_tx: tokio::sync::broadcast::channel(16).0,
3489 shutdown_tx: tokio::sync::watch::channel(false).0,
3490 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3491 path_prefix: String::new(),
3492 session_backend: None,
3493 session_queue: std::sync::Arc::new(
3494 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3495 ),
3496 device_registry: None,
3497 pending_pairings: None,
3498 canvas_store: CanvasStore::new(),
3499 mcp_registry: None,
3500 approval_registry: approval_registry::global(),
3501 mcp_local_url: None,
3502 #[cfg(feature = "webauthn")]
3503 webauthn: None,
3504 };
3505
3506 let mut headers = HeaderMap::new();
3507 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3508
3509 let response = handle_webhook(
3510 State(state),
3511 test_connect_info(),
3512 headers,
3513 Ok(Json(WebhookBody {
3514 message: "hello".into(),
3515 })),
3516 )
3517 .await
3518 .into_response();
3519
3520 assert_eq!(response.status(), StatusCode::OK);
3521 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3522 }
3523
3524 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3525 use hmac::{Hmac, Mac};
3526 use sha2::Sha256;
3527
3528 let payload = format!("{random}{body}");
3529 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3530 mac.update(payload.as_bytes());
3531 hex::encode(mac.finalize().into_bytes())
3532 }
3533
3534 #[tokio::test]
3535 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3536 let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3537 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3538
3539 let state = AppState {
3540 config: Arc::new(Mutex::new(Config::default())),
3541 provider,
3542 model: "test-model".into(),
3543 temperature: 0.0,
3544 mem: memory,
3545 auto_save: false,
3546 webhook_secret_hash: None,
3547 pairing: Arc::new(PairingGuard::new(false, &[])),
3548 trust_forwarded_headers: false,
3549 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3550 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3551 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3552 whatsapp: None,
3553 whatsapp_app_secret: None,
3554 linq: None,
3555 linq_signing_secret: None,
3556 nextcloud_talk: None,
3557 nextcloud_talk_webhook_secret: None,
3558 wati: None,
3559 gmail_push: None,
3560 observer: Arc::new(crate::observability::NoopObserver),
3561 tools_registry: Arc::new(Vec::new()),
3562 cost_tracker: None,
3563 audit_logger: None,
3564 event_tx: tokio::sync::broadcast::channel(16).0,
3565 shutdown_tx: tokio::sync::watch::channel(false).0,
3566 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3567 path_prefix: String::new(),
3568 session_backend: None,
3569 session_queue: std::sync::Arc::new(
3570 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3571 ),
3572 device_registry: None,
3573 pending_pairings: None,
3574 canvas_store: CanvasStore::new(),
3575 mcp_registry: None,
3576 approval_registry: approval_registry::global(),
3577 mcp_local_url: None,
3578 #[cfg(feature = "webauthn")]
3579 webauthn: None,
3580 };
3581
3582 let response = Box::pin(handle_nextcloud_talk_webhook(
3583 State(state),
3584 HeaderMap::new(),
3585 Bytes::from_static(br#"{"type":"message"}"#),
3586 ))
3587 .await
3588 .into_response();
3589
3590 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3591 }
3592
3593 #[tokio::test]
3594 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3595 let provider_impl = Arc::new(MockProvider::default());
3596 let provider: Arc<dyn Provider> = provider_impl.clone();
3597 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3598
3599 let channel = Arc::new(NextcloudTalkChannel::new(
3600 "https://cloud.example.com".into(),
3601 "app-token".into(),
3602 String::new(),
3603 vec!["*".into()],
3604 ));
3605
3606 let secret = "nextcloud-test-secret";
3607 let random = "seed-value";
3608 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3609 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3610 let invalid_signature = "deadbeef";
3611
3612 let state = AppState {
3613 config: Arc::new(Mutex::new(Config::default())),
3614 provider,
3615 model: "test-model".into(),
3616 temperature: 0.0,
3617 mem: memory,
3618 auto_save: false,
3619 webhook_secret_hash: None,
3620 pairing: Arc::new(PairingGuard::new(false, &[])),
3621 trust_forwarded_headers: false,
3622 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3623 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3624 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3625 whatsapp: None,
3626 whatsapp_app_secret: None,
3627 linq: None,
3628 linq_signing_secret: None,
3629 nextcloud_talk: Some(channel),
3630 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3631 wati: None,
3632 gmail_push: None,
3633 observer: Arc::new(crate::observability::NoopObserver),
3634 tools_registry: Arc::new(Vec::new()),
3635 cost_tracker: None,
3636 audit_logger: None,
3637 event_tx: tokio::sync::broadcast::channel(16).0,
3638 shutdown_tx: tokio::sync::watch::channel(false).0,
3639 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3640 path_prefix: String::new(),
3641 session_backend: None,
3642 session_queue: std::sync::Arc::new(
3643 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3644 ),
3645 device_registry: None,
3646 pending_pairings: None,
3647 canvas_store: CanvasStore::new(),
3648 mcp_registry: None,
3649 approval_registry: approval_registry::global(),
3650 mcp_local_url: None,
3651 #[cfg(feature = "webauthn")]
3652 webauthn: None,
3653 };
3654
3655 let mut headers = HeaderMap::new();
3656 headers.insert(
3657 "X-Nextcloud-Talk-Random",
3658 HeaderValue::from_str(random).unwrap(),
3659 );
3660 headers.insert(
3661 "X-Nextcloud-Talk-Signature",
3662 HeaderValue::from_str(invalid_signature).unwrap(),
3663 );
3664
3665 let response = Box::pin(handle_nextcloud_talk_webhook(
3666 State(state),
3667 headers,
3668 Bytes::from(body),
3669 ))
3670 .await
3671 .into_response();
3672 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3673 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3674 }
3675
3676 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3681 use hmac::{Hmac, Mac};
3682 use sha2::Sha256;
3683
3684 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3685 mac.update(body);
3686 hex::encode(mac.finalize().into_bytes())
3687 }
3688
3689 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3690 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3691 }
3692
/// A header computed with the same secret over the same body verifies.
#[test]
fn whatsapp_signature_valid() {
    let secret = generate_test_secret();
    let payload = b"test body content";

    let header = compute_whatsapp_signature_header(&secret, payload);
    let accepted = verify_whatsapp_signature(&secret, payload, &header);

    assert!(accepted);
}
3706
/// A header signed with a different secret does not verify.
#[test]
fn whatsapp_signature_invalid_wrong_secret() {
    let expected_secret = generate_test_secret();
    let signing_secret = generate_test_secret();
    let payload = b"test body content";

    // Sign with the wrong key, then verify against the expected one.
    let header = compute_whatsapp_signature_header(&signing_secret, payload);
    let accepted = verify_whatsapp_signature(&expected_secret, payload, &header);

    assert!(!accepted);
}
3721
/// A header computed over one body does not verify against a different body.
#[test]
fn whatsapp_signature_invalid_wrong_body() {
    let secret = generate_test_secret();
    let signed_payload = b"original body";
    let received_payload = b"tampered body";

    let header = compute_whatsapp_signature_header(&secret, signed_payload);
    let accepted = verify_whatsapp_signature(&secret, received_payload, &header);

    assert!(!accepted);
}
3737
/// A header value without the `sha256=` prefix does not verify.
#[test]
fn whatsapp_signature_missing_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let header_without_prefix = "abc123def456";

    let accepted = verify_whatsapp_signature(&secret, payload, header_without_prefix);

    assert!(!accepted);
}
3752
/// An empty signature header does not verify.
#[test]
fn whatsapp_signature_empty_header() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let accepted = verify_whatsapp_signature(&secret, payload, "");

    assert!(!accepted);
}
3760
/// A correctly prefixed header whose payload is not valid hex does not verify.
#[test]
fn whatsapp_signature_invalid_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let malformed_header = "sha256=not_valid_hex_zzz";

    let accepted = verify_whatsapp_signature(&secret, payload, malformed_header);

    assert!(!accepted);
}
3775
/// A signature computed over an empty body verifies against an empty body.
#[test]
fn whatsapp_signature_empty_body() {
    let secret = generate_test_secret();
    let empty_payload = b"";

    let header = compute_whatsapp_signature_header(&secret, empty_payload);
    let accepted = verify_whatsapp_signature(&secret, empty_payload, &header);

    assert!(accepted);
}
3789
/// A body containing multi-byte UTF-8 characters is signed and verified over
/// its raw bytes.
#[test]
fn whatsapp_signature_unicode_body() {
    let secret = generate_test_secret();
    let payload = "Hello 🦀 World".as_bytes();

    let header = compute_whatsapp_signature_header(&secret, payload);
    let accepted = verify_whatsapp_signature(&secret, payload, &header);

    assert!(accepted);
}
3803
/// A JSON payload signed with the secret verifies.
#[test]
fn whatsapp_signature_json_payload() {
    let secret = generate_test_secret();
    let payload = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;

    let header = compute_whatsapp_signature_header(&secret, payload);
    let accepted = verify_whatsapp_signature(&secret, payload, &header);

    assert!(accepted);
}
3817
/// The header prefix is case-sensitive: `SHA256=` is rejected while `sha256=`
/// with the same digest is accepted.
#[test]
fn whatsapp_signature_case_sensitive_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let digest_hex = compute_whatsapp_signature_hex(&secret, payload);

    let upper_header = format!("SHA256={digest_hex}");
    let lower_header = format!("sha256={digest_hex}");

    let upper_accepted = verify_whatsapp_signature(&secret, payload, &upper_header);
    assert!(!upper_accepted);

    let lower_accepted = verify_whatsapp_signature(&secret, payload, &lower_header);
    assert!(lower_accepted);
}
3837
/// A header carrying only the first 32 hex characters of the digest does not
/// verify.
#[test]
fn whatsapp_signature_truncated_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let full_hex = compute_whatsapp_signature_hex(&secret, payload);
    let header = format!("sha256={}", &full_hex[..32]);

    let accepted = verify_whatsapp_signature(&secret, payload, &header);

    assert!(!accepted);
}
3853
/// A header whose digest has extra hex characters appended does not verify.
#[test]
fn whatsapp_signature_extra_bytes() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let digest_hex = compute_whatsapp_signature_hex(&secret, payload);
    // The valid digest followed by four surplus bytes of hex.
    let header = format!("sha256={digest_hex}deadbeef");

    let accepted = verify_whatsapp_signature(&secret, payload, &header);

    assert!(!accepted);
}
3869
/// Four distinct keys are each recorded as new.
#[test]
fn idempotency_store_allows_different_keys() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 100);

    for key in ["key-a", "key-b", "key-c", "key-d"] {
        assert!(store.record_if_new(key));
    }
}
3882
/// A store built with `max_keys = 0` still records one key and rejects its
/// duplicate.
#[test]
fn idempotency_store_max_keys_clamped_to_one() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 0);

    let first_seen = store.record_if_new("only-key");
    let second_seen = store.record_if_new("only-key");

    assert!(first_seen);
    assert!(!second_seen);
}
3889
/// Recording the same key twice in immediate succession accepts the first
/// attempt and rejects the second.
#[test]
fn idempotency_store_rapid_duplicate_rejected() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 100);

    let first_seen = store.record_if_new("rapid");
    let second_seen = store.record_if_new("rapid");

    assert!(first_seen);
    assert!(!second_seen);
}
3896
/// With a 1 ms TTL, a key recorded again after a 10 ms pause is treated as
/// new.
#[test]
fn idempotency_store_accepts_after_ttl_expires() {
    let ttl = Duration::from_millis(1);
    let store = IdempotencyStore::new(ttl, 100);

    assert!(store.record_if_new("ttl-key"));

    // Wait well past the TTL before retrying the same key.
    std::thread::sleep(Duration::from_millis(10));

    assert!(store.record_if_new("ttl-key"));
}
3904
/// With capacity for a single key, recording a second key leaves only the
/// newer key in the store.
#[test]
fn idempotency_store_eviction_preserves_newest() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 1);

    let old_recorded = store.record_if_new("old-key");
    assert!(old_recorded);

    // Short pause between the two insertions.
    std::thread::sleep(Duration::from_millis(2));

    let new_recorded = store.record_if_new("new-key");
    assert!(new_recorded);

    let tracked = store.keys.lock();
    assert_eq!(tracked.len(), 1);
    assert!(!tracked.contains_key("old-key"));
    assert!(tracked.contains_key("new-key"));
}
3917
/// After the limit of 2 is exhausted, the same key is allowed again once the
/// 50 ms window has elapsed.
#[test]
fn rate_limiter_allows_after_window_expires() {
    let window = Duration::from_millis(50);
    let limiter = SlidingWindowRateLimiter::new(2, window, 100);

    for _ in 0..2 {
        assert!(limiter.allow("ip-1"));
    }
    assert!(!limiter.allow("ip-1"));

    // Sleep slightly longer than the window so earlier requests age out.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("ip-1"));
}
3932
/// Exhausting the limit for one key does not consume the budget of another
/// key.
#[test]
fn rate_limiter_independent_keys_tracked_separately() {
    let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);

    // Each key gets two allowed requests followed by one rejection.
    for ip in ["ip-1", "ip-2"] {
        assert!(limiter.allow(ip));
        assert!(limiter.allow(ip));
        assert!(!limiter.allow(ip));
    }
}
3945
/// With `max_keys = 3`, a fourth distinct key is still allowed, after which
/// the first key is gone and exactly three keys remain tracked.
#[test]
fn rate_limiter_exact_boundary_at_max_keys() {
    let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);

    for ip in ["ip-1", "ip-2", "ip-3", "ip-4"] {
        assert!(limiter.allow(ip));
    }

    let guard = limiter.requests.lock();
    let tracked = &guard.0;
    assert_eq!(tracked.len(), 3);
    assert!(
        !tracked.contains_key("ip-1"),
        "ip-1 should have been evicted"
    );
    for survivor in ["ip-2", "ip-3", "ip-4"] {
        assert!(tracked.contains_key(survivor));
    }
}
3965
/// The pair limit (2) and the webhook limit (3) are counted separately for the
/// same key: exhausting one leaves the other untouched.
#[test]
fn gateway_rate_limiter_pair_and_webhook_are_independent() {
    let limiter = GatewayRateLimiter::new(2, 3, 100);

    for _ in 0..2 {
        assert!(limiter.allow_pair("ip-1"));
    }
    assert!(!limiter.allow_pair("ip-1"));

    // The webhook budget is still full even though pairing is exhausted.
    for _ in 0..3 {
        assert!(limiter.allow_webhook("ip-1"));
    }
    assert!(!limiter.allow_webhook("ip-1"));
}
3981
/// With `max_keys = 1`, a second distinct key is allowed and replaces the
/// first in the tracked set.
#[test]
fn rate_limiter_single_key_max_allows_one_request() {
    let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);

    let first_allowed = limiter.allow("ip-1");
    assert!(first_allowed);
    let second_allowed = limiter.allow("ip-2");
    assert!(second_allowed);

    let guard = limiter.requests.lock();
    let tracked = &guard.0;
    assert_eq!(tracked.len(), 1);
    assert!(tracked.contains_key("ip-2"));
    assert!(!tracked.contains_key("ip-1"));
}
3993
/// Ten threads each issue 100 requests with distinct keys against one shared
/// limiter; all threads must finish and the tracked key count must stay within
/// `max_keys`.
#[test]
fn rate_limiter_concurrent_access_safe() {
    use std::sync::Arc;

    let limiter = Arc::new(SlidingWindowRateLimiter::new(
        1000,
        Duration::from_secs(60),
        1000,
    ));

    let workers: Vec<_> = (0..10)
        .map(|i| {
            let limiter = limiter.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    limiter.allow(&format!("thread-{i}-req-{j}"));
                }
            })
        })
        .collect();

    // A panic inside any worker surfaces here through `unwrap`.
    for worker in workers {
        worker.join().unwrap();
    }

    let guard = limiter.requests.lock();
    assert!(guard.0.len() <= 1000, "should respect max_keys");
}
4022
/// Ten threads each record 100 distinct keys in one shared store; all threads
/// must finish and the stored key count must stay within `max_keys`.
#[test]
fn idempotency_store_concurrent_access_safe() {
    use std::sync::Arc;

    let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));

    let workers: Vec<_> = (0..10)
        .map(|i| {
            let store = store.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    store.record_if_new(&format!("thread-{i}-key-{j}"));
                }
            })
        })
        .collect();

    // A panic inside any worker surfaces here through `unwrap`.
    for worker in workers {
        worker.join().unwrap();
    }

    let tracked = store.keys.lock();
    assert!(tracked.len() <= 1000, "should respect max_keys");
}
4046
/// A burst of 5 requests is allowed, the 6th is rejected, and after waiting
/// past the 50 ms window the key is allowed again.
#[test]
fn rate_limiter_rapid_burst_then_cooldown() {
    let window = Duration::from_millis(50);
    let limiter = SlidingWindowRateLimiter::new(5, window, 100);

    let mut remaining = 5;
    while remaining > 0 {
        assert!(limiter.allow("burst-ip"));
        remaining -= 1;
    }
    assert!(!limiter.allow("burst-ip"));

    // Cool down for slightly longer than the window.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("burst-ip"));
}
4063
/// A peer at 127.0.0.1 passes the localhost check.
#[test]
fn require_localhost_accepts_ipv4_loopback() {
    let loopback = SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 12345));

    let outcome = require_localhost(&loopback);

    assert!(outcome.is_ok());
}
4069
/// A peer at the IPv6 loopback address (`::1`) passes the localhost check.
#[test]
fn require_localhost_accepts_ipv6_loopback() {
    let loopback = SocketAddr::new(std::net::Ipv6Addr::LOCALHOST.into(), 12345);

    let outcome = require_localhost(&loopback);

    assert!(outcome.is_ok());
}
4075
/// A peer at 192.168.1.100 is rejected with 403 Forbidden.
#[test]
fn require_localhost_rejects_non_loopback_ipv4() {
    let remote_ip = std::net::Ipv4Addr::new(192, 168, 1, 100);
    let remote = SocketAddr::from((remote_ip, 12345));

    let status = require_localhost(&remote).unwrap_err().0;

    assert_eq!(status, StatusCode::FORBIDDEN);
}
4082
/// A peer at the non-loopback IPv6 address `2001:db8::1` is rejected with 403
/// Forbidden.
#[test]
fn require_localhost_rejects_non_loopback_ipv6() {
    let remote_ip = std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1);
    let remote = SocketAddr::from((remote_ip, 12345));

    let status = require_localhost(&remote).unwrap_err().0;

    assert_eq!(status, StatusCode::FORBIDDEN);
}
4092}