1pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34#[cfg(not(target_os = "android"))]
38pub mod terminal;
39pub mod tls;
40pub mod ws;
41pub mod ws_mcp_events;
42
43use crate::channels::{
44 Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
45 WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
46};
47use crate::config::Config;
48use crate::cost::CostTracker;
49use crate::memory::{self, Memory, MemoryCategory};
50use crate::providers::{self, ChatMessage, Provider};
51use crate::runtime;
52use crate::security::SecurityPolicy;
53use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
54use crate::tools;
55use crate::tools::canvas::CanvasStore;
56use crate::tools::traits::ToolSpec;
57use crate::util::truncate_with_ellipsis;
58use anyhow::{Context, Result};
59use axum::{
60 Router,
61 body::Bytes,
62 extract::{ConnectInfo, Query, State},
63 http::{HeaderMap, StatusCode, header},
64 response::{IntoResponse, Json},
65 routing::{delete, get, post, put},
66};
67use parking_lot::Mutex;
68use std::collections::HashMap;
69use std::net::{IpAddr, SocketAddr};
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tower_http::limit::RequestBodyLimitLayer;
73use tower_http::timeout::TimeoutLayer;
74use uuid::Uuid;
75
/// Body size limit constant: 65,536 (64 KiB if interpreted as bytes).
///
/// NOTE(review): presumably the value handed to `RequestBodyLimitLayer`; the
/// use site is not visible in this part of the file — confirm.
pub const MAX_BODY_SIZE: usize = 65_536;
/// Default request timeout in seconds.
///
/// `gateway_request_timeout_secs` returns this value when the
/// `CONSTRUCT_GATEWAY_TIMEOUT_SECS` environment variable is unset or does not
/// parse as a `u64`.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;
80
81pub fn gateway_request_timeout_secs() -> u64 {
88 std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
89 .ok()
90 .and_then(|v| v.parse().ok())
91 .unwrap_or(REQUEST_TIMEOUT_SECS)
92}
/// Length, in seconds, of the sliding window that `GatewayRateLimiter::new`
/// gives to both of its limiters.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback cap on the number of keys tracked per rate limiter; `run_gateway`
/// uses it (via `normalize_max_keys`) when the configured value is `0`.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
/// Fallback cap on the number of keys held by the `IdempotencyStore`;
/// `run_gateway` uses it (via `normalize_max_keys`) when the configured value
/// is `0`.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
99
100fn webhook_memory_key() -> String {
101 format!("webhook_msg_{}", Uuid::new_v4())
102}
103
104fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105 format!("whatsapp_{}_{}", msg.sender, msg.id)
106}
107
108fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109 format!("linq_{}_{}", msg.sender, msg.id)
110}
111
112fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113 format!("wati_{}_{}", msg.sender, msg.id)
114}
115
116fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
117 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
118}
119
120fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
121 match &msg.thread_ts {
122 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
123 None => format!("{channel}_{}", msg.sender),
124 }
125}
126
127fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
128 headers
129 .get("X-Session-Id")
130 .and_then(|v| v.to_str().ok())
131 .map(str::trim)
132 .filter(|value| !value.is_empty())
133 .map(str::to_owned)
134}
135
136fn hash_webhook_secret(value: &str) -> String {
137 use sha2::{Digest, Sha256};
138
139 let digest = Sha256::digest(value.as_bytes());
140 hex::encode(digest)
141}
142
143const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
147struct SlidingWindowRateLimiter {
148 limit_per_window: u32,
149 window: Duration,
150 max_keys: usize,
151 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
152}
153
154impl SlidingWindowRateLimiter {
155 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
156 Self {
157 limit_per_window,
158 window,
159 max_keys: max_keys.max(1),
160 requests: Mutex::new((HashMap::new(), Instant::now())),
161 }
162 }
163
164 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
165 requests.retain(|_, timestamps| {
166 timestamps.retain(|t| *t > cutoff);
167 !timestamps.is_empty()
168 });
169 }
170
171 fn allow(&self, key: &str) -> bool {
172 if self.limit_per_window == 0 {
173 return true;
174 }
175
176 let now = Instant::now();
177 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
178
179 let mut guard = self.requests.lock();
180 let (requests, last_sweep) = &mut *guard;
181
182 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
184 Self::prune_stale(requests, cutoff);
185 *last_sweep = now;
186 }
187
188 if !requests.contains_key(key) && requests.len() >= self.max_keys {
189 Self::prune_stale(requests, cutoff);
191 *last_sweep = now;
192
193 if requests.len() >= self.max_keys {
194 let evict_key = requests
195 .iter()
196 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
197 .map(|(k, _)| k.clone());
198 if let Some(evict_key) = evict_key {
199 requests.remove(&evict_key);
200 }
201 }
202 }
203
204 let entry = requests.entry(key.to_owned()).or_default();
205 entry.retain(|instant| *instant > cutoff);
206
207 if entry.len() >= self.limit_per_window as usize {
208 return false;
209 }
210
211 entry.push(now);
212 true
213 }
214}
215
216#[derive(Debug)]
217pub struct GatewayRateLimiter {
218 pair: SlidingWindowRateLimiter,
219 webhook: SlidingWindowRateLimiter,
220}
221
222impl GatewayRateLimiter {
223 fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
224 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
225 Self {
226 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
227 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
228 }
229 }
230
231 fn allow_pair(&self, key: &str) -> bool {
232 self.pair.allow(key)
233 }
234
235 fn allow_webhook(&self, key: &str) -> bool {
236 self.webhook.allow(key)
237 }
238}
239
240#[derive(Debug)]
241pub struct IdempotencyStore {
242 ttl: Duration,
243 max_keys: usize,
244 keys: Mutex<HashMap<String, Instant>>,
245}
246
247impl IdempotencyStore {
248 fn new(ttl: Duration, max_keys: usize) -> Self {
249 Self {
250 ttl,
251 max_keys: max_keys.max(1),
252 keys: Mutex::new(HashMap::new()),
253 }
254 }
255
256 fn record_if_new(&self, key: &str) -> bool {
258 let now = Instant::now();
259 let mut keys = self.keys.lock();
260
261 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
262
263 if keys.contains_key(key) {
264 return false;
265 }
266
267 if keys.len() >= self.max_keys {
268 let evict_key = keys
269 .iter()
270 .min_by_key(|(_, seen_at)| *seen_at)
271 .map(|(k, _)| k.clone());
272 if let Some(evict_key) = evict_key {
273 keys.remove(&evict_key);
274 }
275 }
276
277 keys.insert(key.to_owned(), now);
278 true
279 }
280}
281
/// Parses a client IP out of a single header-style token.
///
/// Surrounding whitespace and double quotes are ignored. Three shapes are
/// tried in order: a bare IP address, a socket address (`ip:port` or
/// `[v6]:port`, of which only the IP is kept), and an IP wrapped in square
/// brackets. Returns `None` when none of them match.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let candidate = value.trim().trim_matches('"').trim();
    if candidate.is_empty() {
        return None;
    }

    candidate
        .parse::<IpAddr>()
        .ok()
        .or_else(|| {
            candidate
                .parse::<SocketAddr>()
                .ok()
                .map(|socket| socket.ip())
        })
        .or_else(|| {
            candidate
                .trim_matches(['[', ']'])
                .parse::<IpAddr>()
                .ok()
        })
}
299
300fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
301 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
306 for candidate in xff.rsplit(',') {
307 if let Some(ip) = parse_client_ip(candidate) {
308 return Some(ip);
309 }
310 }
311 }
312
313 headers
314 .get("X-Real-IP")
315 .and_then(|v| v.to_str().ok())
316 .and_then(parse_client_ip)
317}
318
319pub(super) fn client_key_from_request(
320 peer_addr: Option<SocketAddr>,
321 headers: &HeaderMap,
322 trust_forwarded_headers: bool,
323) -> String {
324 if trust_forwarded_headers {
325 if let Some(ip) = forwarded_client_ip(headers) {
326 return ip.to_string();
327 }
328 }
329
330 peer_addr
331 .map(|addr| addr.ip().to_string())
332 .unwrap_or_else(|| "unknown".to_string())
333}
334
/// Resolves a configured key cap.
///
/// A `configured` value of `0` means "use the fallback", which is itself
/// raised to at least 1; any other value is returned unchanged.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
342
/// Shared gateway state, assembled once in `run_gateway`.
///
/// The struct derives `Clone`; most fields are `Arc`-wrapped handles.
#[derive(Clone)]
pub struct AppState {
    /// Lock-protected copy of the configuration the gateway was started with.
    pub config: Arc<Mutex<Config>>,
    /// Provider created from the configured default provider
    /// (`"openrouter"` when none is configured).
    pub provider: Arc<dyn Provider>,
    /// Configured default model, or `"anthropic/claude-sonnet-4"` when unset.
    pub model: String,
    /// Taken from `config.default_temperature`.
    pub temperature: f64,
    /// Memory backend built from the memory, embedding-route and storage configuration.
    pub mem: Arc<dyn Memory>,
    /// Taken from `config.memory.auto_save`.
    pub auto_save: bool,
    /// Hex-encoded SHA-256 of the trimmed webhook secret; `None` when no
    /// non-empty secret is configured.
    pub webhook_secret_hash: Option<Arc<str>>,
    /// Pairing guard built from `gateway.require_pairing` and `gateway.paired_tokens`.
    pub pairing: Arc<PairingGuard>,
    /// Taken from `gateway.trust_forwarded_headers`.
    /// NOTE(review): presumably the flag passed to `client_key_from_request` — confirm at the call sites.
    pub trust_forwarded_headers: bool,
    /// Sliding-window limiters for the pair and webhook limits.
    pub rate_limiter: Arc<GatewayRateLimiter>,
    /// Created with `AuthRateLimiter::new()`; its policy lives in `auth_rate_limit`.
    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
    /// TTL-bounded store of already-seen idempotency keys.
    pub idempotency_store: Arc<IdempotencyStore>,
    /// WhatsApp channel; only set when the WhatsApp config passes `is_cloud_config()`.
    pub whatsapp: Option<Arc<WhatsAppChannel>>,
    /// WhatsApp app secret: `CONSTRUCT_WHATSAPP_APP_SECRET` if set and non-empty,
    /// otherwise the trimmed, non-empty `app_secret` from the config.
    pub whatsapp_app_secret: Option<Arc<str>>,
    /// Linq channel, set when a Linq config section exists.
    pub linq: Option<Arc<LinqChannel>>,
    /// Linq signing secret: `CONSTRUCT_LINQ_SIGNING_SECRET` if set and non-empty,
    /// otherwise the trimmed, non-empty `signing_secret` from the config.
    pub linq_signing_secret: Option<Arc<str>>,
    /// Nextcloud Talk channel, set when a Nextcloud Talk config section exists.
    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
    /// Nextcloud Talk webhook secret: `CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET` if
    /// set and non-empty, otherwise the trimmed, non-empty `webhook_secret` from the config.
    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
    /// WATI channel, set when a WATI config section exists.
    pub wati: Option<Arc<WatiChannel>>,
    /// Gmail push channel, set when its config section exists and is enabled.
    pub gmail_push: Option<Arc<GmailPushChannel>>,
    /// `sse::BroadcastObserver` wrapping the configured observer and `event_tx`.
    pub observer: Arc<dyn crate::observability::Observer>,
    /// Specs of every tool registered at startup (built-in tools plus any MCP tools).
    pub tools_registry: Arc<Vec<ToolSpec>>,
    /// Result of `CostTracker::get_or_init_global`.
    pub cost_tracker: Option<Arc<CostTracker>>,
    /// Audit logger; `None` when auditing is disabled or the logger failed to initialise.
    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
    /// Sender side of the JSON event broadcast channel (created with capacity 4096).
    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Sender side of the shutdown watch channel (initial value `false`).
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Node registry sized by `config.nodes.max_nodes`.
    pub node_registry: Arc<nodes::NodeRegistry>,
    /// Configured `gateway.path_prefix`, or the empty string when unset or empty.
    pub path_prefix: String,
    /// SQLite session backend; `None` when session persistence is disabled or
    /// the backend could not be opened.
    pub session_backend: Option<Arc<dyn SessionBackend>>,
    /// Session actor queue, created with `SessionActorQueue::new(8, 30, 600)`.
    pub session_queue: Arc<session_queue::SessionActorQueue>,
    /// Device registry; only present when `gateway.require_pairing` is set.
    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
    /// Store of pending pairing codes; only present when `gateway.require_pairing` is set.
    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
    /// The global canvas store (`tools::canvas::global_store()`).
    pub canvas_store: CanvasStore,
    /// MCP registry; `None` when MCP is disabled, has no servers, or failed to connect.
    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
    /// WebAuthn state; present only when `security.webauthn.enabled` is set.
    #[cfg(feature = "webauthn")]
    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
    /// The global approval registry (`approval_registry::global()`).
    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
    /// Initialised to `None` in `run_gateway`.
    /// NOTE(review): presumably filled in later once a local MCP endpoint is known — not visible here.
    pub mcp_local_url: Option<Arc<str>>,
}
410
411#[allow(clippy::too_many_lines)]
413pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
414 if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
416 {
417 tracing::warn!(
418 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
419 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
420 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
421 Docker/VM: if you are running inside a container or VM, this is expected."
422 );
423 }
424 let config_state = Arc::new(Mutex::new(config.clone()));
425
426 let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
428 Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
429 } else {
430 None
431 };
432
433 let addr: SocketAddr = format!("{host}:{port}").parse()?;
434 let listener = tokio::net::TcpListener::bind(addr).await?;
435 let actual_port = listener.local_addr()?.port();
436 let display_addr = format!("{host}:{actual_port}");
437
438 let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
439 config.default_provider.as_deref().unwrap_or("openrouter"),
440 config.api_key.as_deref(),
441 config.api_url.as_deref(),
442 &config.reliability,
443 &providers::ProviderRuntimeOptions {
444 auth_profile_override: None,
445 provider_api_url: config.api_url.clone(),
446 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
447 secrets_encrypt: config.secrets.encrypt,
448 reasoning_enabled: config.runtime.reasoning_enabled,
449 reasoning_effort: config.runtime.reasoning_effort.clone(),
450 provider_timeout_secs: Some(config.provider_timeout_secs),
451 extra_headers: config.extra_headers.clone(),
452 api_path: config.api_path.clone(),
453 provider_max_tokens: config.provider_max_tokens,
454 },
455 )?);
456 let model = config
457 .default_model
458 .clone()
459 .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
460 let temperature = config.default_temperature;
461 let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
462 &config.memory,
463 &config.embedding_routes,
464 Some(&config.storage.provider.config),
465 &config.workspace_dir,
466 config.api_key.as_deref(),
467 )?);
468 let runtime: Arc<dyn runtime::RuntimeAdapter> =
469 Arc::from(runtime::create_runtime(&config.runtime)?);
470 let security = Arc::new(SecurityPolicy::from_config(
471 &config.autonomy,
472 &config.workspace_dir,
473 ));
474
475 let (composio_key, composio_entity_id) = if config.composio.enabled {
476 (
477 config.composio.api_key.as_deref(),
478 Some(config.composio.entity_id.as_str()),
479 )
480 } else {
481 (None, None)
482 };
483
484 let canvas_store = tools::canvas::global_store();
485
486 let (
487 mut tools_registry_raw,
488 delegate_handle_gw,
489 _reaction_handle_gw,
490 _channel_map_handle,
491 _ask_user_handle_gw,
492 _escalate_handle_gw,
493 ) = tools::all_tools_with_runtime(
494 Arc::new(config.clone()),
495 &security,
496 runtime,
497 Arc::clone(&mem),
498 composio_key,
499 composio_entity_id,
500 &config.browser,
501 &config.http_request,
502 &config.web_fetch,
503 &config.workspace_dir,
504 &config.agents,
505 config.api_key.as_deref(),
506 &config,
507 Some(canvas_store.clone()),
508 );
509
510 let gateway_mcp_config = {
515 let mut c = config.clone();
516 c = crate::agent::kumiho::inject_kumiho(c, false);
517 c = crate::agent::operator::inject_operator(c, false);
518 c.mcp
519 };
520 let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
521 if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
522 tracing::info!(
523 "Gateway: initializing MCP client — {} server(s) configured",
524 gateway_mcp_config.servers.len()
525 );
526 match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
527 Ok(registry) => {
528 let registry = std::sync::Arc::new(registry);
529 mcp_registry_shared = Some(std::sync::Arc::clone(®istry));
530 if gateway_mcp_config.deferred_loading {
531 let operator_prefix =
532 format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
533 let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
534 let all_names = registry.tool_names();
535 let mut eager_count = 0usize;
536
537 for name in &all_names {
538 if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
539 if let Some(def) = registry.get_tool_def(name).await {
540 let wrapper: std::sync::Arc<dyn tools::Tool> =
541 std::sync::Arc::new(tools::McpToolWrapper::new(
542 name.clone(),
543 def,
544 std::sync::Arc::clone(®istry),
545 ));
546 if let Some(ref handle) = delegate_handle_gw {
547 handle.write().push(std::sync::Arc::clone(&wrapper));
548 }
549 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
550 eager_count += 1;
551 }
552 }
553 }
554
555 let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
556 std::sync::Arc::clone(®istry),
557 |name| {
558 !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
559 },
560 )
561 .await;
562 tracing::info!(
563 "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
564 eager_count,
565 deferred_set.len(),
566 registry.server_count()
567 );
568 let activated =
569 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
570 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
571 deferred_set,
572 activated,
573 )));
574 } else {
575 let names = registry.tool_names();
576 let mut registered = 0usize;
577 for name in names {
578 if let Some(def) = registry.get_tool_def(&name).await {
579 let wrapper: std::sync::Arc<dyn tools::Tool> =
580 std::sync::Arc::new(tools::McpToolWrapper::new(
581 name,
582 def,
583 std::sync::Arc::clone(®istry),
584 ));
585 if let Some(ref handle) = delegate_handle_gw {
586 handle.write().push(std::sync::Arc::clone(&wrapper));
587 }
588 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
589 registered += 1;
590 }
591 }
592 tracing::info!(
593 "Gateway MCP: {} tool(s) registered from {} server(s)",
594 registered,
595 registry.server_count()
596 );
597 }
598 }
599 Err(e) => {
600 tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
601 }
602 }
603 }
604
605 let tools_registry: Arc<Vec<ToolSpec>> =
606 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
607
608 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
610
611 let audit_logger = if config.security.audit.enabled {
613 match crate::security::audit::AuditLogger::new(
614 config.security.audit.clone(),
615 std::path::PathBuf::from(&config.workspace_dir),
616 ) {
617 Ok(logger) => Some(Arc::new(logger)),
618 Err(e) => {
619 tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
620 None
621 }
622 }
623 } else {
624 None
625 };
626
627 let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
629 let webhook_secret_hash: Option<Arc<str>> =
631 config.channels_config.webhook.as_ref().and_then(|webhook| {
632 webhook.secret.as_ref().and_then(|raw_secret| {
633 let trimmed_secret = raw_secret.trim();
634 (!trimmed_secret.is_empty())
635 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
636 })
637 });
638
639 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
641 .channels_config
642 .whatsapp
643 .as_ref()
644 .filter(|wa| wa.is_cloud_config())
645 .map(|wa| {
646 Arc::new(WhatsAppChannel::new(
647 wa.access_token.clone().unwrap_or_default(),
648 wa.phone_number_id.clone().unwrap_or_default(),
649 wa.verify_token.clone().unwrap_or_default(),
650 wa.allowed_numbers.clone(),
651 ))
652 });
653
654 let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
657 .ok()
658 .and_then(|secret| {
659 let secret = secret.trim();
660 (!secret.is_empty()).then(|| secret.to_owned())
661 })
662 .or_else(|| {
663 config.channels_config.whatsapp.as_ref().and_then(|wa| {
664 wa.app_secret
665 .as_deref()
666 .map(str::trim)
667 .filter(|secret| !secret.is_empty())
668 .map(ToOwned::to_owned)
669 })
670 })
671 .map(Arc::from);
672
673 let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
675 Arc::new(LinqChannel::new(
676 lq.api_token.clone(),
677 lq.from_phone.clone(),
678 lq.allowed_senders.clone(),
679 ))
680 });
681
682 let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
685 .ok()
686 .and_then(|secret| {
687 let secret = secret.trim();
688 (!secret.is_empty()).then(|| secret.to_owned())
689 })
690 .or_else(|| {
691 config.channels_config.linq.as_ref().and_then(|lq| {
692 lq.signing_secret
693 .as_deref()
694 .map(str::trim)
695 .filter(|secret| !secret.is_empty())
696 .map(ToOwned::to_owned)
697 })
698 })
699 .map(Arc::from);
700
701 let wati_channel: Option<Arc<WatiChannel>> =
703 config.channels_config.wati.as_ref().map(|wati_cfg| {
704 Arc::new(
705 WatiChannel::new(
706 wati_cfg.api_token.clone(),
707 wati_cfg.api_url.clone(),
708 wati_cfg.tenant_id.clone(),
709 wati_cfg.allowed_numbers.clone(),
710 )
711 .with_transcription(config.transcription.clone()),
712 )
713 });
714
715 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
717 config.channels_config.nextcloud_talk.as_ref().map(|nc| {
718 Arc::new(NextcloudTalkChannel::new(
719 nc.base_url.clone(),
720 nc.app_token.clone(),
721 nc.bot_name.clone().unwrap_or_default(),
722 nc.allowed_users.clone(),
723 ))
724 });
725
726 let nextcloud_talk_webhook_secret: Option<Arc<str>> =
729 std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
730 .ok()
731 .and_then(|secret| {
732 let secret = secret.trim();
733 (!secret.is_empty()).then(|| secret.to_owned())
734 })
735 .or_else(|| {
736 config
737 .channels_config
738 .nextcloud_talk
739 .as_ref()
740 .and_then(|nc| {
741 nc.webhook_secret
742 .as_deref()
743 .map(str::trim)
744 .filter(|secret| !secret.is_empty())
745 .map(ToOwned::to_owned)
746 })
747 })
748 .map(Arc::from);
749
750 let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
752 .channels_config
753 .gmail_push
754 .as_ref()
755 .filter(|gp| gp.enabled)
756 .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
757
758 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
760 match SqliteSessionBackend::new(&config.workspace_dir) {
761 Ok(b) => {
762 tracing::info!("Gateway session persistence enabled (SQLite)");
763 if config.gateway.session_ttl_hours > 0 {
764 if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
765 if cleaned > 0 {
766 tracing::info!("Cleaned up {cleaned} stale gateway sessions");
767 }
768 }
769 }
770 Some(Arc::new(b))
771 }
772 Err(e) => {
773 tracing::warn!("Session persistence disabled: {e}");
774 None
775 }
776 }
777 } else {
778 None
779 };
780
781 let pairing = Arc::new(PairingGuard::new(
783 config.gateway.require_pairing,
784 &config.gateway.paired_tokens,
785 ));
786 let rate_limit_max_keys = normalize_max_keys(
787 config.gateway.rate_limit_max_keys,
788 RATE_LIMIT_MAX_KEYS_DEFAULT,
789 );
790 let rate_limiter = Arc::new(GatewayRateLimiter::new(
791 config.gateway.pair_rate_limit_per_minute,
792 config.gateway.webhook_rate_limit_per_minute,
793 rate_limit_max_keys,
794 ));
795 let idempotency_max_keys = normalize_max_keys(
796 config.gateway.idempotency_max_keys,
797 IDEMPOTENCY_MAX_KEYS_DEFAULT,
798 );
799 let idempotency_store = Arc::new(IdempotencyStore::new(
800 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
801 idempotency_max_keys,
802 ));
803
804 let path_prefix: Option<&str> = config
806 .gateway
807 .path_prefix
808 .as_deref()
809 .filter(|p| !p.is_empty());
810
811 let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
813 let mut tunnel_url: Option<String> = None;
814
815 if let Some(ref tun) = tunnel {
816 println!("🔗 Starting {} tunnel...", tun.name());
817 match tun.start(host, actual_port).await {
818 Ok(url) => {
819 println!("🌐 Tunnel active: {url}");
820 tunnel_url = Some(url);
821 }
822 Err(e) => {
823 println!("⚠️ Tunnel failed to start: {e}");
824 println!(" Falling back to local-only mode.");
825 }
826 }
827 }
828
829 let pfx = path_prefix.unwrap_or("");
830 println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
831 if let Some(ref url) = tunnel_url {
832 println!(" 🌐 Public URL: {url}");
833 }
834 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
835 if let Some(code) = pairing.pairing_code() {
836 println!();
837 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
838 println!(" ┌──────────────┐");
839 println!(" │ {code} │");
840 println!(" └──────────────┘");
841 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
842 } else if pairing.require_pairing() {
843 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
844 println!(" To pair a new device: construct gateway get-paircode --new");
845 println!();
846 } else {
847 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
848 println!();
849 }
850 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
851 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
852 if whatsapp_channel.is_some() {
853 println!(" GET {pfx}/whatsapp — Meta webhook verification");
854 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
855 }
856 if linq_channel.is_some() {
857 println!(" POST {pfx}/linq — Linq message webhook (iMessage/RCS/SMS)");
858 }
859 if wati_channel.is_some() {
860 println!(" GET {pfx}/wati — WATI webhook verification");
861 println!(" POST {pfx}/wati — WATI message webhook");
862 }
863 if nextcloud_talk_channel.is_some() {
864 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
865 }
866 println!(" GET {pfx}/api/* — REST API (bearer token required)");
867 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
868 if config.nodes.enabled {
869 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
870 }
871 println!(" GET {pfx}/health — health check");
872 println!(" GET {pfx}/metrics — Prometheus metrics");
873 println!(" Press Ctrl+C to stop.\n");
874
875 crate::health::mark_component_ok("gateway");
876
877 if let Some(ref hooks) = hooks {
879 hooks.fire_gateway_start(host, actual_port).await;
880 }
881
882 let broadcast_observer: Arc<dyn crate::observability::Observer> =
884 Arc::new(sse::BroadcastObserver::new(
885 crate::observability::create_observer(&config.observability),
886 event_tx.clone(),
887 ));
888
889 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
890
891 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
893
894 let device_registry = if config.gateway.require_pairing {
896 Some(Arc::new(api_pairing::DeviceRegistry::new(
897 &config.workspace_dir,
898 )?))
899 } else {
900 None
901 };
902 let pending_pairings = if config.gateway.require_pairing {
903 Some(Arc::new(api_pairing::PairingStore::new(
904 config.gateway.pairing_dashboard.max_pending_codes,
905 )))
906 } else {
907 None
908 };
909
910 let mcp_workspace_dir = config.workspace_dir.clone();
924 let mcp_config_snapshot = config.clone();
925 let mcp_runtime_handles = {
926 let mut rh = crate::mcp_server::RuntimeHandles::empty();
927
928 if config.workspace.enabled {
930 let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
931 let home = directories::UserDirs::new()
932 .map(|u| u.home_dir().to_path_buf())
933 .unwrap_or_else(|| std::path::PathBuf::from("."));
934 home.join(&config.workspace.workspaces_dir[2..])
935 } else {
936 std::path::PathBuf::from(&config.workspace.workspaces_dir)
937 };
938 let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
939 rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
940 }
941
942 if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
944 {
945 let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
946 Arc::new(store);
947 rh.session_store = Some(backend);
948 }
949
950 if !config.agents.is_empty() {
955 rh.agent_config = Some(Arc::new(config.agents.clone()));
956 rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
957 rh.provider_runtime_options =
958 Some(Arc::new(crate::providers::ProviderRuntimeOptions {
959 auth_profile_override: None,
960 provider_api_url: config.api_url.clone(),
961 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
962 secrets_encrypt: config.secrets.encrypt,
963 reasoning_enabled: config.runtime.reasoning_enabled,
964 reasoning_effort: config.runtime.reasoning_effort.clone(),
965 provider_timeout_secs: Some(config.provider_timeout_secs),
966 extra_headers: config.extra_headers.clone(),
967 api_path: config.api_path.clone(),
968 provider_max_tokens: config.provider_max_tokens,
969 }));
970 }
971
972 let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
979 Arc::new(config.clone()),
980 &security,
981 Arc::new(crate::runtime::NativeRuntime::new()),
982 Arc::clone(&mem),
983 composio_key,
984 composio_entity_id,
985 &config.browser,
986 &config.http_request,
987 &config.web_fetch,
988 &config.workspace_dir,
989 &config.agents,
990 config.api_key.as_deref(),
991 &config,
992 Some(canvas_store.clone()),
993 );
994 let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
996 .into_iter()
997 .map(|b| Arc::<dyn tools::Tool>::from(b))
998 .collect();
999 rh.pre_built_tools = Some(mcp_tool_arcs);
1000
1001 rh
1002 };
1003
1004 let mut state = AppState {
1005 config: config_state,
1006 provider,
1007 model,
1008 temperature,
1009 mem,
1010 auto_save: config.memory.auto_save,
1011 webhook_secret_hash,
1012 pairing,
1013 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1014 rate_limiter,
1015 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1016 idempotency_store,
1017 whatsapp: whatsapp_channel,
1018 whatsapp_app_secret,
1019 linq: linq_channel,
1020 linq_signing_secret,
1021 nextcloud_talk: nextcloud_talk_channel,
1022 nextcloud_talk_webhook_secret,
1023 wati: wati_channel,
1024 gmail_push: gmail_push_channel,
1025 observer: broadcast_observer,
1026 tools_registry,
1027 cost_tracker,
1028 audit_logger,
1029 event_tx,
1030 shutdown_tx,
1031 node_registry,
1032 session_backend,
1033 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1034 device_registry,
1035 pending_pairings,
1036 path_prefix: path_prefix.unwrap_or("").to_string(),
1037 canvas_store,
1038 mcp_registry: mcp_registry_shared,
1039 approval_registry: approval_registry::global(),
1040 mcp_local_url: None,
1042 #[cfg(feature = "webauthn")]
1043 webauthn: if config.security.webauthn.enabled {
1044 let secret_store = Arc::new(crate::security::SecretStore::new(
1045 &config.workspace_dir,
1046 true,
1047 ));
1048 let wa_config = crate::security::webauthn::WebAuthnConfig {
1049 enabled: true,
1050 rp_id: config.security.webauthn.rp_id.clone(),
1051 rp_origin: config.security.webauthn.rp_origin.clone(),
1052 rp_name: config.security.webauthn.rp_name.clone(),
1053 };
1054 Some(Arc::new(api_webauthn::WebAuthnState {
1055 manager: crate::security::webauthn::WebAuthnManager::new(
1056 wa_config,
1057 secret_store,
1058 &config.workspace_dir,
1059 ),
1060 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1061 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1062 }))
1063 } else {
1064 None
1065 },
1066 };
1067
1068 {
1083 let effectiveness_cache = crate::skills::EffectivenessCache::new();
1084 let _ = crate::skills::effectiveness_cache::set_global(effectiveness_cache.clone());
1085
1086 let memory_project = config.kumiho.memory_project.clone();
1087 let workspace_dir = config.workspace_dir.clone();
1088 let config_for_skills = config.clone();
1089
1090 {
1101 let scan_workspace = workspace_dir.clone();
1102 let scan_project = memory_project.clone();
1103 let scan_client = crate::gateway::kumiho_client::build_client_from_config(&config);
1104 drop(tokio::spawn(async move {
1105 let skills_root = crate::skills::skills_dir(&scan_workspace);
1106 let mut entries = match tokio::fs::read_dir(&skills_root).await {
1107 Ok(e) => e,
1108 Err(_) => return,
1109 };
1110 while let Ok(Some(entry)) = entries.next_entry().await {
1111 let dir = entry.path();
1112 if !dir.is_dir() {
1113 continue;
1114 }
1115 if !dir.join("SKILL.toml").exists() {
1116 continue;
1117 }
1118 let registration_ok =
1119 match crate::skills::registration::register_skill_with_kumiho(
1120 &dir,
1121 &scan_client,
1122 &scan_project,
1123 )
1124 .await
1125 {
1126 Ok(crate::skills::registration::SkillRegistration::Registered {
1127 kref,
1128 ..
1129 }) => {
1130 tracing::info!(
1131 skill_dir = %dir.display(),
1132 kref,
1133 "daemon-startup: registered skill with Kumiho",
1134 );
1135 true
1136 }
1137 Ok(
1138 crate::skills::registration::SkillRegistration::AlreadyRegistered {
1139 ..
1140 },
1141 ) => {
1142 true
1144 }
1145 Err(e) => {
1146 tracing::warn!(
1147 skill_dir = %dir.display(),
1148 error = ?e,
1149 "daemon-startup: skill registration failed; will retry next start",
1150 );
1151 false
1152 }
1153 };
1154
1155 if !registration_ok {
1162 continue;
1163 }
1164 match crate::skills::registration::sync_published_content_path(
1165 &dir,
1166 &scan_client,
1167 )
1168 .await
1169 {
1170 Ok(crate::skills::registration::SkillContentSync::Updated {
1171 new_content_file,
1172 ..
1173 }) => {
1174 tracing::info!(
1175 skill_dir = %dir.display(),
1176 content_file = %new_content_file,
1177 "daemon-startup: synced skill content_file from published kref",
1178 );
1179 }
1180 Ok(_) => {
1181 }
1184 Err(e) => {
1185 tracing::warn!(
1186 skill_dir = %dir.display(),
1187 error = ?e,
1188 "daemon-startup: skill content_file sync failed; \
1189 loader will use existing pointer until next sync",
1190 );
1191 }
1192 }
1193 }
1194 }));
1195 }
1196
1197 let skill_names: Vec<String> =
1201 crate::skills::load_skills_with_config(&workspace_dir, &config_for_skills)
1202 .into_iter()
1203 .map(|s| s.name)
1204 .collect();
1205
1206 if skill_names.is_empty() {
1207 tracing::info!("skill effectiveness: no skills loaded; refresh task skipped");
1208 } else {
1209 let kumiho_client = Arc::new(crate::gateway::kumiho_client::build_client_from_config(
1210 &config,
1211 ));
1212 tracing::info!(
1213 count = skill_names.len(),
1214 project = %memory_project,
1215 "skill effectiveness: starting background refresh task",
1216 );
1217 drop(effectiveness_cache.clone().spawn_refresh_task(
1225 kumiho_client.clone(),
1226 memory_project,
1227 skill_names,
1228 crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1229 ));
1230
1231 #[cfg(feature = "skill-creation")]
1241 if config.skills.skill_improvement.enabled {
1242 let cache_for_improver = effectiveness_cache.clone();
1243 let auto_workspace = workspace_dir.clone();
1244 let auto_provider = state.provider.clone();
1245 let auto_model = state.model.clone();
1246 let auto_improver_config = config.skills.skill_improvement.clone();
1247 let auto_kumiho = kumiho_client.clone();
1248 let auto_project = config.kumiho.memory_project.clone();
1249
1250 tracing::info!(
1251 cooldown_secs = auto_improver_config.cooldown_secs,
1252 "skill auto-improve: starting LLM-driven rewrite loop",
1253 );
1254
1255 drop(tokio::spawn(async move {
1256 let ctx = crate::skills::auto_improve::AutoImproveContext {
1257 workspace_dir: auto_workspace.clone(),
1258 provider: auto_provider,
1259 model: auto_model,
1260 temperature: crate::skills::auto_improve::DEFAULT_REWRITE_TEMPERATURE,
1261 kumiho_client: auto_kumiho.clone(),
1262 memory_project: auto_project.clone(),
1263 };
1264 let mut improver = crate::skills::improver::SkillImprover::new(
1265 auto_workspace.clone(),
1266 auto_improver_config,
1267 );
1268
1269 let rollback_ctx = crate::skills::auto_rollback::AutoRollbackContext {
1274 workspace_dir: auto_workspace.clone(),
1275 kumiho_client: auto_kumiho,
1276 memory_project: auto_project,
1277 };
1278 let mut rollback_tracker =
1279 crate::skills::auto_rollback::SkillRollbackTracker::new(auto_workspace);
1280
1281 let mut ticker = tokio::time::interval(
1282 crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1283 );
1284 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1285 loop {
1291 ticker.tick().await;
1292 for cand in cache_for_improver.improvement_candidates() {
1293 match crate::skills::auto_improve::attempt_skill_improvement(
1294 &ctx,
1295 &cand,
1296 &mut improver,
1297 )
1298 .await
1299 {
1300 Ok(Some(outcome)) => tracing::info!(
1301 skill = %outcome.slug,
1302 revision_kref = %outcome.revision_kref,
1303 content_file = %outcome.content_file,
1304 rate = cand.rate,
1305 total = cand.total,
1306 "auto-improve: published new skill revision",
1307 ),
1308 Ok(None) => {
1309 }
1311 Err(e) => tracing::warn!(
1312 skill = %cand.skill_name,
1313 error = %e,
1314 "auto-improve: attempt failed",
1315 ),
1316 }
1317 }
1318
1319 for cand in cache_for_improver.regression_candidates() {
1327 match crate::skills::auto_rollback::attempt_skill_rollback(
1328 &rollback_ctx,
1329 &cand,
1330 &mut rollback_tracker,
1331 )
1332 .await
1333 {
1334 Ok(Some(outcome)) => tracing::warn!(
1335 skill = %outcome.slug,
1336 restored_revision_kref = %outcome.restored_revision_kref,
1337 demoted_revision_kref = %outcome.demoted_revision_kref,
1338 content_file = %outcome.content_file,
1339 current_rate = cand.current_rate,
1340 previous_rate = cand.previous_rate,
1341 "auto-rollback: reverted regressed skill revision",
1342 ),
1343 Ok(None) => {
1344 }
1346 Err(e) => tracing::warn!(
1347 skill = %cand.skill_name,
1348 error = %e,
1349 "auto-rollback: attempt failed",
1350 ),
1351 }
1352 }
1353 }
1354 }));
1355 }
1356 }
1357 }
1358
1359 let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1366 let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1367 let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1368 &mcp_workspace_dir,
1369 &mcp_config_snapshot,
1370 &mcp_runtime_handles,
1371 );
1372 for (name, reason) in &mcp_skipped {
1373 tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1374 }
1375 tracing::info!(
1376 "mcp-server: advertising {} tools to external MCP clients",
1377 mcp_state.tools.len()
1378 );
1379
1380 match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1381 Ok(handle) => {
1382 let url = format!("http://{}/mcp", handle.addr);
1383 if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1384 tracing::warn!("mcp-server: failed to write discovery file: {e}");
1385 } else {
1386 tracing::info!(
1387 "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1388 );
1389 }
1390
1391 state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1394
1395 let mut watch = mcp_shutdown_watch;
1396 Some(tokio::spawn(async move {
1397 loop {
1399 if *watch.borrow() {
1400 break;
1401 }
1402 if watch.changed().await.is_err() {
1403 break;
1404 }
1405 }
1406 let _ = handle.shutdown.send(());
1407 let _ = handle.joined.await;
1408 crate::mcp_server::server::cleanup_discovery_file();
1409 tracing::info!("mcp-server: stopped");
1410 }))
1411 }
1412 Err(e) => {
1413 tracing::error!("mcp-server: failed to bind: {e}");
1414 None
1415 }
1416 }
1417 };
1418
1419 let config_put_router = Router::new()
1421 .route("/api/config", put(api::handle_api_config_put))
1422 .layer(RequestBodyLimitLayer::new(1_048_576));
1423
1424 let memory_graph_router = Router::new()
1427 .route(
1428 "/api/memory/graph",
1429 get(api_memory_graph::handle_memory_graph),
1430 )
1431 .with_state(state.clone())
1432 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1433 .layer(TimeoutLayer::with_status_code(
1434 StatusCode::REQUEST_TIMEOUT,
1435 Duration::from_secs(60),
1436 ));
1437
1438 let inner = Router::new()
1440 .route("/admin/shutdown", post(handle_admin_shutdown))
1442 .route("/admin/paircode", get(handle_admin_paircode))
1443 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1444 .route("/health", get(handle_health))
1446 .route("/metrics", get(handle_metrics))
1447 .route("/pair", post(handle_pair))
1448 .route("/pair/code", get(handle_pair_code))
1449 .route("/webhook", post(handle_webhook))
1450 .route("/whatsapp", get(handle_whatsapp_verify))
1451 .route("/whatsapp", post(handle_whatsapp_message))
1452 .route("/linq", post(handle_linq_webhook))
1453 .route("/wati", get(handle_wati_verify))
1454 .route("/wati", post(handle_wati_webhook))
1455 .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1456 .route("/webhook/gmail", post(handle_gmail_push_webhook))
1457 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1459 .route("/api/status", get(api::handle_api_status))
1461 .route("/api/config", get(api::handle_api_config_get))
1462 .route("/api/tools", get(api::handle_api_tools))
1463 .route("/api/cron", get(api::handle_api_cron_list))
1464 .route("/api/cron", post(api::handle_api_cron_add))
1465 .route(
1466 "/api/cron/settings",
1467 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1468 )
1469 .route(
1470 "/api/cron/{id}",
1471 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1472 )
1473 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1474 .route("/api/integrations", get(api::handle_api_integrations))
1475 .route(
1476 "/api/integrations/settings",
1477 get(api::handle_api_integrations_settings),
1478 )
1479 .route(
1480 "/api/doctor",
1481 get(api::handle_api_doctor).post(api::handle_api_doctor),
1482 )
1483 .route("/api/cost", get(api::handle_api_cost))
1485 .route("/api/audit", get(api::handle_api_audit))
1486 .route("/api/audit/verify", get(api::handle_api_audit_verify))
1487 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1488 .route("/api/health", get(api::handle_api_health))
1489 .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1490 .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1491 .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1493 .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1494 .route(
1495 "/api/mcp/session/{session_id}/events",
1496 get(api_mcp::handle_api_mcp_session_events),
1497 )
1498 .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1499 .route("/api/nodes", get(api::handle_api_nodes))
1500 .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1501 .route("/api/sessions", get(api::handle_api_sessions_list))
1502 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1503 .route(
1504 "/api/sessions/{id}/messages",
1505 get(api::handle_api_session_messages),
1506 )
1507 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1508 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1509 .route("/api/channels", get(api::handle_api_channels))
1511 .route("/api/channel-events", post(api::handle_api_channel_events))
1512 .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1514 .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1515 .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1516 .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1518 .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1519 .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1520 .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1522 .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1523 .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1524 .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1526 .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1527 .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1528 .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1529 .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1530 .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1531 .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1532 .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1533 .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1534 .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1535 .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1536 .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1538 .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1539 .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1540 .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1541 .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1544 .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1546 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1548 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1549 .route("/api/devices", get(api_pairing::list_devices))
1550 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1551 .route(
1552 "/api/devices/{id}/token/rotate",
1553 post(api_pairing::rotate_token),
1554 )
1555 .route("/api/canvas", get(canvas::handle_canvas_list))
1557 .route(
1558 "/api/canvas/{id}",
1559 get(canvas::handle_canvas_get)
1560 .post(canvas::handle_canvas_post)
1561 .delete(canvas::handle_canvas_clear),
1562 )
1563 .route(
1564 "/api/canvas/{id}/history",
1565 get(canvas::handle_canvas_history),
1566 );
1567
1568 #[cfg(feature = "webauthn")]
1570 let inner = inner
1571 .route(
1572 "/api/webauthn/register/start",
1573 post(api_webauthn::handle_register_start),
1574 )
1575 .route(
1576 "/api/webauthn/register/finish",
1577 post(api_webauthn::handle_register_finish),
1578 )
1579 .route(
1580 "/api/webauthn/auth/start",
1581 post(api_webauthn::handle_auth_start),
1582 )
1583 .route(
1584 "/api/webauthn/auth/finish",
1585 post(api_webauthn::handle_auth_finish),
1586 )
1587 .route(
1588 "/api/webauthn/credentials",
1589 get(api_webauthn::handle_list_credentials),
1590 )
1591 .route(
1592 "/api/webauthn/credentials/{id}",
1593 delete(api_webauthn::handle_delete_credential),
1594 );
1595
1596 #[cfg(feature = "plugins-wasm")]
1598 let inner = inner.route(
1599 "/api/plugins",
1600 get(api_plugins::plugin_routes::list_plugins),
1601 );
1602
1603 let inner = inner
1604 .route("/api/events", get(sse::handle_sse_events))
1606 .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1607 .route("/ws/chat", get(ws::handle_ws_chat))
1609 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1611 .route("/ws/nodes", get(nodes::handle_ws_nodes));
1613
1614 #[cfg(not(target_os = "android"))]
1619 let inner = inner.route("/ws/terminal", get(terminal::handle_ws_terminal));
1620
1621 let inner = inner
1622 .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1624 .route("/_app/{*path}", get(static_files::handle_static))
1626 .merge(config_put_router)
1628 .fallback(get(static_files::handle_spa_fallback))
1630 .with_state(state)
1631 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1632 .layer(TimeoutLayer::with_status_code(
1633 StatusCode::REQUEST_TIMEOUT,
1634 Duration::from_secs(gateway_request_timeout_secs()),
1635 ));
1636
1637 let inner = inner.merge(memory_graph_router);
1639
1640 let app = if let Some(prefix) = path_prefix {
1644 let redirect_target = prefix.to_string();
1645 Router::new().nest(prefix, inner).route(
1646 &format!("{prefix}/"),
1647 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1648 )
1649 } else {
1650 inner
1651 };
1652
1653 let tls_acceptor = match &config.gateway.tls {
1655 Some(tls_cfg) if tls_cfg.enabled => {
1656 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1657 if has_mtls {
1658 tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1659 } else {
1660 tracing::info!("TLS enabled (no client certificate requirement)");
1661 }
1662 Some(tls::build_tls_acceptor(tls_cfg)?)
1663 }
1664 _ => None,
1665 };
1666
1667 if let Some(tls_acceptor) = tls_acceptor {
1668 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1670 let mut app = app;
1671
1672 let mut shutdown_signal = shutdown_rx;
1673 loop {
1674 tokio::select! {
1675 conn = listener.accept() => {
1676 let (tcp_stream, remote_addr) = conn?;
1677 let tls_acceptor = tls_acceptor.clone();
1678 let svc = tower::MakeService::<
1679 SocketAddr,
1680 hyper::Request<hyper::body::Incoming>,
1681 >::make_service(&mut app, remote_addr)
1682 .await
1683 .expect("infallible make_service");
1684
1685 tokio::spawn(async move {
1686 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1687 Ok(s) => s,
1688 Err(e) => {
1689 tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1690 return;
1691 }
1692 };
1693 let io = hyper_util::rt::TokioIo::new(tls_stream);
1694 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1695 let mut svc = svc.clone();
1696 async move {
1697 tower::Service::call(&mut svc, req).await
1698 }
1699 });
1700 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1701 hyper_util::rt::TokioExecutor::new(),
1702 )
1703 .serve_connection(io, hyper_svc)
1704 .await
1705 {
1706 tracing::debug!("connection error from {remote_addr}: {e}");
1707 }
1708 });
1709 }
1710 _ = shutdown_signal.changed() => {
1711 tracing::info!("🦀 Construct Gateway shutting down...");
1712 break;
1713 }
1714 }
1715 }
1716 } else {
1717 axum::serve(
1719 listener,
1720 app.into_make_service_with_connect_info::<SocketAddr>(),
1721 )
1722 .with_graceful_shutdown(async move {
1723 let _ = shutdown_rx.changed().await;
1724 tracing::info!("🦀 Construct Gateway shutting down...");
1725 })
1726 .await?;
1727 }
1728
1729 if let Some(task) = mcp_task {
1732 let _ = task.await;
1733 }
1734
1735 Ok(())
1736}
1737
1738async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1744 let body = serde_json::json!({
1745 "status": "ok",
1746 "paired": state.pairing.is_paired(),
1747 "require_pairing": state.pairing.require_pairing(),
1748 "runtime": crate::health::snapshot_json(),
1749 });
1750 Json(body)
1751}
1752
/// `Content-Type` header value sent with every `/metrics` response.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1755
/// Returns the comment-style placeholder body served by `/metrics` when no
/// Prometheus observer is available.
fn prometheus_disabled_hint() -> String {
    const HINT: &str =
        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n";
    HINT.to_owned()
}
1761
/// Locates the `PrometheusObserver` behind `observer`, if there is one.
///
/// Two shapes are recognised: the observer is itself a `PrometheusObserver`,
/// or it is an `sse::BroadcastObserver` whose `inner()` observer is one.
/// Anything else yields `None`.
#[cfg(feature = "observability-prometheus")]
fn prometheus_observer_from_state(
    observer: &dyn crate::observability::Observer,
) -> Option<&crate::observability::PrometheusObserver> {
    let erased = observer.as_any();

    // Direct hit: the observer is the Prometheus backend itself.
    if let Some(direct) = erased.downcast_ref::<crate::observability::PrometheusObserver>() {
        return Some(direct);
    }

    // Otherwise look one level down, through the SSE broadcast wrapper.
    let broadcast = erased.downcast_ref::<sse::BroadcastObserver>()?;
    broadcast
        .inner()
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
}
1781
1782async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1784 let body = {
1785 #[cfg(feature = "observability-prometheus")]
1786 {
1787 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1788 prom.encode()
1789 } else {
1790 prometheus_disabled_hint()
1791 }
1792 }
1793 #[cfg(not(feature = "observability-prometheus"))]
1794 {
1795 let _ = &state;
1796 prometheus_disabled_hint()
1797 }
1798 };
1799
1800 (
1801 StatusCode::OK,
1802 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1803 body,
1804 )
1805}
1806
1807#[axum::debug_handler]
1809async fn handle_pair(
1810 State(state): State<AppState>,
1811 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1812 headers: HeaderMap,
1813) -> impl IntoResponse {
1814 let rate_key =
1815 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1816 let peer_is_loopback = peer_addr.ip().is_loopback();
1817 if !state.rate_limiter.allow_pair(&rate_key) {
1818 tracing::warn!("/pair rate limit exceeded");
1819 let err = serde_json::json!({
1820 "error": "Too many pairing requests. Please retry later.",
1821 "retry_after": RATE_LIMIT_WINDOW_SECS,
1822 });
1823 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1824 }
1825
1826 if let Err(e) = state
1828 .auth_limiter
1829 .check_rate_limit(&rate_key, peer_is_loopback)
1830 {
1831 tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1832 let err = serde_json::json!({
1833 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1834 "retry_after": e.retry_after_secs,
1835 });
1836 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1837 }
1838
1839 let code = headers
1840 .get("X-Pairing-Code")
1841 .and_then(|v| v.to_str().ok())
1842 .unwrap_or("");
1843
1844 match state.pairing.try_pair(code, &rate_key).await {
1845 Ok(Some(token)) => {
1846 tracing::info!("🔐 New client paired successfully");
1847 if let Some(ref logger) = state.audit_logger {
1848 let _ =
1849 logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1850 }
1851 if let Err(err) =
1852 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1853 {
1854 tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1855 let body = serde_json::json!({
1856 "paired": true,
1857 "persisted": false,
1858 "token": token,
1859 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1860 });
1861 return (StatusCode::OK, Json(body));
1862 }
1863
1864 let body = serde_json::json!({
1865 "paired": true,
1866 "persisted": true,
1867 "token": token,
1868 "message": "Save this token — use it as Authorization: Bearer <token>"
1869 });
1870 (StatusCode::OK, Json(body))
1871 }
1872 Ok(None) => {
1873 state
1874 .auth_limiter
1875 .record_attempt(&rate_key, peer_is_loopback);
1876 tracing::warn!("🔐 Pairing attempt with invalid code");
1877 if let Some(ref logger) = state.audit_logger {
1878 let _ = logger
1879 .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1880 }
1881 let err = serde_json::json!({"error": "Invalid pairing code"});
1882 (StatusCode::FORBIDDEN, Json(err))
1883 }
1884 Err(lockout_secs) => {
1885 tracing::warn!(
1886 "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1887 );
1888 if let Some(ref logger) = state.audit_logger {
1889 let _ = logger.log_auth_failure(
1890 "gateway",
1891 &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1892 );
1893 }
1894 let err = serde_json::json!({
1895 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1896 "retry_after": lockout_secs
1897 });
1898 (StatusCode::TOO_MANY_REQUESTS, Json(err))
1899 }
1900 }
1901}
1902
1903async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1904 let paired_tokens = pairing.tokens();
1905 let mut updated_cfg = { config.lock().clone() };
1908 updated_cfg.gateway.paired_tokens = paired_tokens;
1909 updated_cfg
1910 .save()
1911 .await
1912 .context("Failed to persist paired tokens to config.toml")?;
1913
1914 *config.lock() = updated_cfg;
1916 Ok(())
1917}
1918
1919async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1921 let user_messages = vec![ChatMessage::user(message)];
1922
1923 let system_prompt = {
1926 let config_guard = state.config.lock();
1927 crate::channels::build_system_prompt(
1928 &config_guard.workspace_dir,
1929 &state.model,
1930 &[], &[], Some(&config_guard.identity),
1933 None, )
1935 };
1936
1937 let mut messages = Vec::with_capacity(1 + user_messages.len());
1938 messages.push(ChatMessage::system(system_prompt));
1939 messages.extend(user_messages);
1940
1941 let multimodal_config = state.config.lock().multimodal.clone();
1942 let prepared =
1943 crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1944
1945 state
1946 .provider
1947 .chat_with_history(&prepared.messages, &state.model, state.temperature)
1948 .await
1949}
1950
1951async fn run_gateway_chat_with_tools(
1953 state: &AppState,
1954 message: &str,
1955 session_id: Option<&str>,
1956) -> anyhow::Result<String> {
1957 let config = state.config.lock().clone();
1958 Box::pin(crate::agent::process_message(config, message, session_id)).await
1959}
1960
/// JSON request body accepted by the `POST /webhook` handler.
#[derive(serde::Deserialize)]
pub struct WebhookBody {
    /// Text that the handler forwards to the provider as the user message.
    pub message: String,
}
1966
1967async fn handle_webhook(
1969 State(state): State<AppState>,
1970 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1971 headers: HeaderMap,
1972 body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
1973) -> impl IntoResponse {
1974 let rate_key =
1975 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1976 let peer_is_loopback = peer_addr.ip().is_loopback();
1977 if !state.rate_limiter.allow_webhook(&rate_key) {
1978 tracing::warn!("/webhook rate limit exceeded");
1979 let err = serde_json::json!({
1980 "error": "Too many webhook requests. Please retry later.",
1981 "retry_after": RATE_LIMIT_WINDOW_SECS,
1982 });
1983 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1984 }
1985
1986 if state.pairing.require_pairing() {
1988 if let Err(e) = state
1989 .auth_limiter
1990 .check_rate_limit(&rate_key, peer_is_loopback)
1991 {
1992 tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
1993 let err = serde_json::json!({
1994 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1995 "retry_after": e.retry_after_secs,
1996 });
1997 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1998 }
1999 let auth = headers
2000 .get(header::AUTHORIZATION)
2001 .and_then(|v| v.to_str().ok())
2002 .unwrap_or("");
2003 let token = auth.strip_prefix("Bearer ").unwrap_or("");
2004 if !state.pairing.is_authenticated(token) {
2005 state
2006 .auth_limiter
2007 .record_attempt(&rate_key, peer_is_loopback);
2008 tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
2009 let err = serde_json::json!({
2010 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
2011 });
2012 return (StatusCode::UNAUTHORIZED, Json(err));
2013 }
2014 }
2015
2016 if let Some(ref secret_hash) = state.webhook_secret_hash {
2018 let header_hash = headers
2019 .get("X-Webhook-Secret")
2020 .and_then(|v| v.to_str().ok())
2021 .map(str::trim)
2022 .filter(|value| !value.is_empty())
2023 .map(hash_webhook_secret);
2024 match header_hash {
2025 Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
2026 _ => {
2027 tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
2028 let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
2029 return (StatusCode::UNAUTHORIZED, Json(err));
2030 }
2031 }
2032 }
2033
2034 let Json(webhook_body) = match body {
2036 Ok(b) => b,
2037 Err(e) => {
2038 tracing::warn!("Webhook JSON parse error: {e}");
2039 let err = serde_json::json!({
2040 "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
2041 });
2042 return (StatusCode::BAD_REQUEST, Json(err));
2043 }
2044 };
2045
2046 if let Some(idempotency_key) = headers
2048 .get("X-Idempotency-Key")
2049 .and_then(|v| v.to_str().ok())
2050 .map(str::trim)
2051 .filter(|value| !value.is_empty())
2052 {
2053 if !state.idempotency_store.record_if_new(idempotency_key) {
2054 tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
2055 let body = serde_json::json!({
2056 "status": "duplicate",
2057 "idempotent": true,
2058 "message": "Request already processed for this idempotency key"
2059 });
2060 return (StatusCode::OK, Json(body));
2061 }
2062 }
2063
2064 let message = &webhook_body.message;
2065 let session_id = webhook_session_id(&headers);
2066
2067 if state.auto_save && !memory::should_skip_autosave_content(message) {
2068 let key = webhook_memory_key();
2069 let _ = state
2070 .mem
2071 .store(
2072 &key,
2073 message,
2074 MemoryCategory::Conversation,
2075 session_id.as_deref(),
2076 )
2077 .await;
2078 }
2079
2080 let provider_label = state
2081 .config
2082 .lock()
2083 .default_provider
2084 .clone()
2085 .unwrap_or_else(|| "unknown".to_string());
2086 let model_label = state.model.clone();
2087 let started_at = Instant::now();
2088
2089 state
2090 .observer
2091 .record_event(&crate::observability::ObserverEvent::AgentStart {
2092 provider: provider_label.clone(),
2093 model: model_label.clone(),
2094 });
2095 state
2096 .observer
2097 .record_event(&crate::observability::ObserverEvent::LlmRequest {
2098 provider: provider_label.clone(),
2099 model: model_label.clone(),
2100 messages_count: 1,
2101 });
2102
2103 match run_gateway_chat_simple(&state, message).await {
2104 Ok(response) => {
2105 let duration = started_at.elapsed();
2106 state
2107 .observer
2108 .record_event(&crate::observability::ObserverEvent::LlmResponse {
2109 provider: provider_label.clone(),
2110 model: model_label.clone(),
2111 duration,
2112 success: true,
2113 error_message: None,
2114 input_tokens: None,
2115 output_tokens: None,
2116 });
2117 state.observer.record_metric(
2118 &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2119 );
2120 state
2121 .observer
2122 .record_event(&crate::observability::ObserverEvent::AgentEnd {
2123 provider: provider_label,
2124 model: model_label,
2125 duration,
2126 tokens_used: None,
2127 cost_usd: None,
2128 });
2129
2130 let body = serde_json::json!({"response": response, "model": state.model});
2131 (StatusCode::OK, Json(body))
2132 }
2133 Err(e) => {
2134 let duration = started_at.elapsed();
2135 let sanitized = providers::sanitize_api_error(&e.to_string());
2136
2137 state
2138 .observer
2139 .record_event(&crate::observability::ObserverEvent::LlmResponse {
2140 provider: provider_label.clone(),
2141 model: model_label.clone(),
2142 duration,
2143 success: false,
2144 error_message: Some(sanitized.clone()),
2145 input_tokens: None,
2146 output_tokens: None,
2147 });
2148 state.observer.record_metric(
2149 &crate::observability::traits::ObserverMetric::RequestLatency(duration),
2150 );
2151 state
2152 .observer
2153 .record_event(&crate::observability::ObserverEvent::Error {
2154 component: "gateway".to_string(),
2155 message: sanitized.clone(),
2156 });
2157 state
2158 .observer
2159 .record_event(&crate::observability::ObserverEvent::AgentEnd {
2160 provider: provider_label,
2161 model: model_label,
2162 duration,
2163 tokens_used: None,
2164 cost_usd: None,
2165 });
2166
2167 tracing::error!("Webhook provider error: {}", sanitized);
2168 let err = serde_json::json!({"error": "LLM request failed"});
2169 (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
2170 }
2171 }
2172}
2173
/// Query-string parameters read by the `GET /whatsapp` verification handler.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// `hub.mode`; the handler only accepts the value `"subscribe"`.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// `hub.verify_token`; compared against the WhatsApp channel's verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// `hub.challenge`; echoed back as the response body when verification succeeds.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2184
2185async fn handle_whatsapp_verify(
2187 State(state): State<AppState>,
2188 Query(params): Query<WhatsAppVerifyQuery>,
2189) -> impl IntoResponse {
2190 let Some(ref wa) = state.whatsapp else {
2191 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2192 };
2193
2194 let token_matches = params
2196 .verify_token
2197 .as_deref()
2198 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2199 if params.mode.as_deref() == Some("subscribe") && token_matches {
2200 if let Some(ch) = params.challenge {
2201 tracing::info!("WhatsApp webhook verified successfully");
2202 return (StatusCode::OK, ch);
2203 }
2204 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2205 }
2206
2207 tracing::warn!("WhatsApp webhook verification failed — token mismatch");
2208 (StatusCode::FORBIDDEN, "Forbidden".to_string())
2209}
2210
2211pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2215 use hmac::{Hmac, Mac};
2216 use sha2::Sha256;
2217
2218 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2220 return false;
2221 };
2222
2223 let Ok(expected) = hex::decode(hex_sig) else {
2225 return false;
2226 };
2227
2228 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2230 return false;
2231 };
2232 mac.update(body);
2233
2234 mac.verify_slice(&expected).is_ok()
2236}
2237
2238async fn handle_whatsapp_message(
2240 State(state): State<AppState>,
2241 headers: HeaderMap,
2242 body: Bytes,
2243) -> impl IntoResponse {
2244 let Some(ref wa) = state.whatsapp else {
2245 return (
2246 StatusCode::NOT_FOUND,
2247 Json(serde_json::json!({"error": "WhatsApp not configured"})),
2248 );
2249 };
2250
2251 if let Some(ref app_secret) = state.whatsapp_app_secret {
2253 let signature = headers
2254 .get("X-Hub-Signature-256")
2255 .and_then(|v| v.to_str().ok())
2256 .unwrap_or("");
2257
2258 if !verify_whatsapp_signature(app_secret, &body, signature) {
2259 tracing::warn!(
2260 "WhatsApp webhook signature verification failed (signature: {})",
2261 if signature.is_empty() {
2262 "missing"
2263 } else {
2264 "invalid"
2265 }
2266 );
2267 return (
2268 StatusCode::UNAUTHORIZED,
2269 Json(serde_json::json!({"error": "Invalid signature"})),
2270 );
2271 }
2272 }
2273
2274 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2276 return (
2277 StatusCode::BAD_REQUEST,
2278 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2279 );
2280 };
2281
2282 let messages = wa.parse_webhook_payload(&payload);
2284
2285 if messages.is_empty() {
2286 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2288 }
2289
2290 for msg in &messages {
2292 tracing::info!(
2293 "WhatsApp message from {}: {}",
2294 msg.sender,
2295 truncate_with_ellipsis(&msg.content, 50)
2296 );
2297 let session_id = sender_session_id("whatsapp", msg);
2298
2299 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2301 let key = whatsapp_memory_key(msg);
2302 let _ = state
2303 .mem
2304 .store(
2305 &key,
2306 &msg.content,
2307 MemoryCategory::Conversation,
2308 Some(&session_id),
2309 )
2310 .await;
2311 }
2312
2313 match Box::pin(run_gateway_chat_with_tools(
2314 &state,
2315 &msg.content,
2316 Some(&session_id),
2317 ))
2318 .await
2319 {
2320 Ok(response) => {
2321 if let Err(e) = wa
2323 .send(&SendMessage::new(response, &msg.reply_target))
2324 .await
2325 {
2326 tracing::error!("Failed to send WhatsApp reply: {e}");
2327 }
2328 }
2329 Err(e) => {
2330 tracing::error!("LLM error for WhatsApp message: {e:#}");
2331 let _ = wa
2332 .send(&SendMessage::new(
2333 "Sorry, I couldn't process your message right now.",
2334 &msg.reply_target,
2335 ))
2336 .await;
2337 }
2338 }
2339 }
2340
2341 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2343}
2344
2345async fn handle_linq_webhook(
2347 State(state): State<AppState>,
2348 headers: HeaderMap,
2349 body: Bytes,
2350) -> impl IntoResponse {
2351 let Some(ref linq) = state.linq else {
2352 return (
2353 StatusCode::NOT_FOUND,
2354 Json(serde_json::json!({"error": "Linq not configured"})),
2355 );
2356 };
2357
2358 let body_str = String::from_utf8_lossy(&body);
2359
2360 if let Some(ref signing_secret) = state.linq_signing_secret {
2362 let timestamp = headers
2363 .get("X-Webhook-Timestamp")
2364 .and_then(|v| v.to_str().ok())
2365 .unwrap_or("");
2366
2367 let signature = headers
2368 .get("X-Webhook-Signature")
2369 .and_then(|v| v.to_str().ok())
2370 .unwrap_or("");
2371
2372 if !crate::channels::linq::verify_linq_signature(
2373 signing_secret,
2374 &body_str,
2375 timestamp,
2376 signature,
2377 ) {
2378 tracing::warn!(
2379 "Linq webhook signature verification failed (signature: {})",
2380 if signature.is_empty() {
2381 "missing"
2382 } else {
2383 "invalid"
2384 }
2385 );
2386 return (
2387 StatusCode::UNAUTHORIZED,
2388 Json(serde_json::json!({"error": "Invalid signature"})),
2389 );
2390 }
2391 }
2392
2393 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2395 return (
2396 StatusCode::BAD_REQUEST,
2397 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2398 );
2399 };
2400
2401 let messages = linq.parse_webhook_payload(&payload);
2403
2404 if messages.is_empty() {
2405 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2407 }
2408
2409 for msg in &messages {
2411 tracing::info!(
2412 "Linq message from {}: {}",
2413 msg.sender,
2414 truncate_with_ellipsis(&msg.content, 50)
2415 );
2416 let session_id = sender_session_id("linq", msg);
2417
2418 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2420 let key = linq_memory_key(msg);
2421 let _ = state
2422 .mem
2423 .store(
2424 &key,
2425 &msg.content,
2426 MemoryCategory::Conversation,
2427 Some(&session_id),
2428 )
2429 .await;
2430 }
2431
2432 match Box::pin(run_gateway_chat_with_tools(
2434 &state,
2435 &msg.content,
2436 Some(&session_id),
2437 ))
2438 .await
2439 {
2440 Ok(response) => {
2441 if let Err(e) = linq
2443 .send(&SendMessage::new(response, &msg.reply_target))
2444 .await
2445 {
2446 tracing::error!("Failed to send Linq reply: {e}");
2447 }
2448 }
2449 Err(e) => {
2450 tracing::error!("LLM error for Linq message: {e:#}");
2451 let _ = linq
2452 .send(&SendMessage::new(
2453 "Sorry, I couldn't process your message right now.",
2454 &msg.reply_target,
2455 ))
2456 .await;
2457 }
2458 }
2459 }
2460
2461 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2463}
2464
2465async fn handle_wati_verify(
2467 State(state): State<AppState>,
2468 Query(params): Query<WatiVerifyQuery>,
2469) -> impl IntoResponse {
2470 if state.wati.is_none() {
2471 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2472 }
2473
2474 if let Some(challenge) = params.challenge {
2476 tracing::info!("WATI webhook verified successfully");
2477 return (StatusCode::OK, challenge);
2478 }
2479
2480 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2481}
2482
/// Query-string parameters read by the WATI webhook verification handler.
#[derive(Debug, serde::Deserialize)]
pub struct WatiVerifyQuery {
    /// Value of the `hub.challenge` query parameter; optional so that a
    /// request without it still deserializes.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2488
2489async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2491 let Some(ref wati) = state.wati else {
2492 return (
2493 StatusCode::NOT_FOUND,
2494 Json(serde_json::json!({"error": "WATI not configured"})),
2495 );
2496 };
2497
2498 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2500 return (
2501 StatusCode::BAD_REQUEST,
2502 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2503 );
2504 };
2505
2506 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2508
2509 let messages = if matches!(msg_type, "audio" | "voice") {
2510 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2512 wati.parse_audio_as_message(&payload, transcript)
2513 } else {
2514 vec![]
2515 }
2516 } else {
2517 wati.parse_webhook_payload(&payload)
2518 };
2519
2520 if messages.is_empty() {
2521 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2522 }
2523
2524 for msg in &messages {
2526 tracing::info!(
2527 "WATI message from {}: {}",
2528 msg.sender,
2529 truncate_with_ellipsis(&msg.content, 50)
2530 );
2531 let session_id = sender_session_id("wati", msg);
2532
2533 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2535 let key = wati_memory_key(msg);
2536 let _ = state
2537 .mem
2538 .store(
2539 &key,
2540 &msg.content,
2541 MemoryCategory::Conversation,
2542 Some(&session_id),
2543 )
2544 .await;
2545 }
2546
2547 match Box::pin(run_gateway_chat_with_tools(
2549 &state,
2550 &msg.content,
2551 Some(&session_id),
2552 ))
2553 .await
2554 {
2555 Ok(response) => {
2556 if let Err(e) = wati
2558 .send(&SendMessage::new(response, &msg.reply_target))
2559 .await
2560 {
2561 tracing::error!("Failed to send WATI reply: {e}");
2562 }
2563 }
2564 Err(e) => {
2565 tracing::error!("LLM error for WATI message: {e:#}");
2566 let _ = wati
2567 .send(&SendMessage::new(
2568 "Sorry, I couldn't process your message right now.",
2569 &msg.reply_target,
2570 ))
2571 .await;
2572 }
2573 }
2574 }
2575
2576 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2578}
2579
2580async fn handle_nextcloud_talk_webhook(
2582 State(state): State<AppState>,
2583 headers: HeaderMap,
2584 body: Bytes,
2585) -> impl IntoResponse {
2586 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2587 return (
2588 StatusCode::NOT_FOUND,
2589 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2590 );
2591 };
2592
2593 let body_str = String::from_utf8_lossy(&body);
2594
2595 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2597 let random = headers
2598 .get("X-Nextcloud-Talk-Random")
2599 .and_then(|v| v.to_str().ok())
2600 .unwrap_or("");
2601
2602 let signature = headers
2603 .get("X-Nextcloud-Talk-Signature")
2604 .and_then(|v| v.to_str().ok())
2605 .unwrap_or("");
2606
2607 if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2608 webhook_secret,
2609 random,
2610 &body_str,
2611 signature,
2612 ) {
2613 tracing::warn!(
2614 "Nextcloud Talk webhook signature verification failed (signature: {})",
2615 if signature.is_empty() {
2616 "missing"
2617 } else {
2618 "invalid"
2619 }
2620 );
2621 return (
2622 StatusCode::UNAUTHORIZED,
2623 Json(serde_json::json!({"error": "Invalid signature"})),
2624 );
2625 }
2626 }
2627
2628 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2630 return (
2631 StatusCode::BAD_REQUEST,
2632 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2633 );
2634 };
2635
2636 let messages = nextcloud_talk.parse_webhook_payload(&payload);
2638 if messages.is_empty() {
2639 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2641 }
2642
2643 for msg in &messages {
2644 tracing::info!(
2645 "Nextcloud Talk message from {}: {}",
2646 msg.sender,
2647 truncate_with_ellipsis(&msg.content, 50)
2648 );
2649 let session_id = sender_session_id("nextcloud_talk", msg);
2650
2651 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2652 let key = nextcloud_talk_memory_key(msg);
2653 let _ = state
2654 .mem
2655 .store(
2656 &key,
2657 &msg.content,
2658 MemoryCategory::Conversation,
2659 Some(&session_id),
2660 )
2661 .await;
2662 }
2663
2664 match Box::pin(run_gateway_chat_with_tools(
2665 &state,
2666 &msg.content,
2667 Some(&session_id),
2668 ))
2669 .await
2670 {
2671 Ok(response) => {
2672 if let Err(e) = nextcloud_talk
2673 .send(&SendMessage::new(response, &msg.reply_target))
2674 .await
2675 {
2676 tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2677 }
2678 }
2679 Err(e) => {
2680 tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2681 let _ = nextcloud_talk
2682 .send(&SendMessage::new(
2683 "Sorry, I couldn't process your message right now.",
2684 &msg.reply_target,
2685 ))
2686 .await;
2687 }
2688 }
2689 }
2690
2691 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2692}
2693
/// Largest request body, in bytes (1 MiB), that the Gmail push webhook
/// handler accepts before answering `413 Payload Too Large`.
const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
2697
2698async fn handle_gmail_push_webhook(
2700 State(state): State<AppState>,
2701 headers: HeaderMap,
2702 body: Bytes,
2703) -> impl IntoResponse {
2704 let Some(ref gmail_push) = state.gmail_push else {
2705 return (
2706 StatusCode::NOT_FOUND,
2707 Json(serde_json::json!({"error": "Gmail push not configured"})),
2708 );
2709 };
2710
2711 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2713 return (
2714 StatusCode::PAYLOAD_TOO_LARGE,
2715 Json(serde_json::json!({"error": "Request body too large"})),
2716 );
2717 }
2718
2719 let secret = gmail_push.resolve_webhook_secret();
2721 if !secret.is_empty() {
2722 let provided = headers
2723 .get(axum::http::header::AUTHORIZATION)
2724 .and_then(|v| v.to_str().ok())
2725 .and_then(|auth| auth.strip_prefix("Bearer "))
2726 .unwrap_or("");
2727
2728 if provided != secret {
2729 tracing::warn!("Gmail push webhook: unauthorized request");
2730 return (
2731 StatusCode::UNAUTHORIZED,
2732 Json(serde_json::json!({"error": "Unauthorized"})),
2733 );
2734 }
2735 }
2736
2737 let body_str = String::from_utf8_lossy(&body);
2738 let envelope: crate::channels::gmail_push::PubSubEnvelope =
2739 match serde_json::from_str(&body_str) {
2740 Ok(e) => e,
2741 Err(e) => {
2742 tracing::warn!("Gmail push webhook: invalid payload: {e}");
2743 return (
2744 StatusCode::BAD_REQUEST,
2745 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2746 );
2747 }
2748 };
2749
2750 let channel = Arc::clone(gmail_push);
2752 tokio::spawn(async move {
2753 if let Err(e) = channel.handle_notification(&envelope).await {
2754 tracing::error!("Gmail push notification processing failed: {e:#}");
2755 }
2756 });
2757
2758 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2760}
2761
/// JSON body returned by admin endpoints that only report an outcome.
#[derive(serde::Serialize)]
struct AdminResponse {
    /// Whether the requested admin action was accepted.
    success: bool,
    /// Human-readable description of the outcome.
    message: String,
}
2772
2773fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2775 if peer.ip().is_loopback() {
2776 Ok(())
2777 } else {
2778 Err((
2779 StatusCode::FORBIDDEN,
2780 Json(serde_json::json!({
2781 "error": "Admin endpoints are restricted to localhost"
2782 })),
2783 ))
2784 }
2785}
2786
2787async fn handle_admin_shutdown(
2789 State(state): State<AppState>,
2790 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2791) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2792 require_localhost(&peer)?;
2793 tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2794
2795 let body = AdminResponse {
2796 success: true,
2797 message: "Gateway shutdown initiated".to_string(),
2798 };
2799
2800 let _ = state.shutdown_tx.send(true);
2801
2802 Ok((StatusCode::OK, Json(body)))
2803}
2804
2805async fn handle_admin_paircode(
2807 State(state): State<AppState>,
2808 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2809) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2810 require_localhost(&peer)?;
2811 let code = state.pairing.pairing_code();
2812
2813 let body = if let Some(c) = code {
2814 serde_json::json!({
2815 "success": true,
2816 "pairing_required": state.pairing.require_pairing(),
2817 "pairing_code": c,
2818 "message": "Use this one-time code to pair"
2819 })
2820 } else {
2821 serde_json::json!({
2822 "success": true,
2823 "pairing_required": state.pairing.require_pairing(),
2824 "pairing_code": null,
2825 "message": if state.pairing.require_pairing() {
2826 "Pairing is active but no new code available (already paired or code expired)"
2827 } else {
2828 "Pairing is disabled for this gateway"
2829 }
2830 })
2831 };
2832
2833 Ok((StatusCode::OK, Json(body)))
2834}
2835
2836async fn handle_admin_paircode_new(
2838 State(state): State<AppState>,
2839 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2840) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2841 require_localhost(&peer)?;
2842 match state.pairing.generate_new_pairing_code() {
2843 Some(code) => {
2844 tracing::info!("🔐 New pairing code generated via admin endpoint");
2845 let body = serde_json::json!({
2846 "success": true,
2847 "pairing_required": state.pairing.require_pairing(),
2848 "pairing_code": code,
2849 "message": "New pairing code generated — use this one-time code to pair"
2850 });
2851 Ok((StatusCode::OK, Json(body)))
2852 }
2853 None => {
2854 let body = serde_json::json!({
2855 "success": false,
2856 "pairing_required": false,
2857 "pairing_code": null,
2858 "message": "Pairing is disabled for this gateway"
2859 });
2860 Ok((StatusCode::BAD_REQUEST, Json(body)))
2861 }
2862 }
2863}
2864
2865async fn handle_pair_code(
2873 State(state): State<AppState>,
2874 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2875) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2876 require_localhost(&peer)?;
2877
2878 let require = state.pairing.require_pairing();
2879 let is_paired = state.pairing.is_paired();
2880
2881 let code = if require && !is_paired {
2882 state.pairing.pairing_code()
2883 } else {
2884 None
2885 };
2886
2887 let body = serde_json::json!({
2888 "success": true,
2889 "pairing_required": require,
2890 "pairing_code": code,
2891 });
2892
2893 Ok((StatusCode::OK, Json(body)))
2894}
2895
2896#[cfg(test)]
2897mod tests {
2898 use super::*;
2899 use crate::channels::traits::ChannelMessage;
2900 use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2901 use crate::providers::Provider;
2902 use async_trait::async_trait;
2903 use axum::http::HeaderValue;
2904 use axum::response::IntoResponse;
2905 use http_body_util::BodyExt;
2906 use parking_lot::Mutex;
2907 use std::sync::atomic::{AtomicUsize, Ordering};
2908
2909 fn generate_test_secret() -> String {
2911 let bytes: [u8; 32] = rand::random();
2912 hex::encode(bytes)
2913 }
2914
2915 #[test]
2916 fn security_body_limit_is_64kb() {
2917 assert_eq!(MAX_BODY_SIZE, 65_536);
2918 }
2919
2920 #[test]
2921 fn security_timeout_default_is_30_seconds() {
2922 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2923 }
2924
    /// With `CONSTRUCT_GATEWAY_TIMEOUT_SECS` unset, the effective timeout is
    /// 30 seconds.
    #[test]
    fn gateway_timeout_falls_back_to_default() {
        // SAFETY: NOTE(review) — mutating the process environment is only
        // sound when no other thread reads or writes it concurrently. The test
        // harness may run tests on several threads; confirm no other test
        // touches the environment at the same time.
        unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
        assert_eq!(gateway_request_timeout_secs(), 30);
    }
2932
2933 #[test]
2934 fn webhook_body_requires_message_field() {
2935 let valid = r#"{"message": "hello"}"#;
2936 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2937 assert!(parsed.is_ok());
2938 assert_eq!(parsed.unwrap().message, "hello");
2939
2940 let missing = r#"{"other": "field"}"#;
2941 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2942 assert!(parsed.is_err());
2943 }
2944
    /// `WhatsAppVerifyQuery` can be built with every field set to `None`.
    ///
    /// This only constructs the struct directly; it does not exercise
    /// query-string deserialization.
    #[test]
    fn whatsapp_query_fields_are_optional() {
        let q = WhatsAppVerifyQuery {
            mode: None,
            verify_token: None,
            challenge: None,
        };
        assert!(q.mode.is_none());
    }
2954
2955 #[test]
2956 fn app_state_is_clone() {
2957 fn assert_clone<T: Clone>() {}
2958 assert_clone::<AppState>();
2959 }
2960
    /// With a `NoopObserver`, the metrics handler still answers `200` with the
    /// Prometheus content type, and the body carries a hint that the
    /// Prometheus backend is not enabled.
    #[tokio::test]
    async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
        // Minimal state: every optional integration is off and the observer
        // is the no-op implementation.
        let state = AppState {
            config: Arc::new(Mutex::new(Config::default())),
            provider: Arc::new(MockProvider::default()),
            model: "test-model".into(),
            temperature: 0.0,
            mem: Arc::new(MockMemory),
            auto_save: false,
            webhook_secret_hash: None,
            pairing: Arc::new(PairingGuard::new(false, &[])),
            trust_forwarded_headers: false,
            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
            whatsapp: None,
            whatsapp_app_secret: None,
            linq: None,
            linq_signing_secret: None,
            nextcloud_talk: None,
            nextcloud_talk_webhook_secret: None,
            wati: None,
            gmail_push: None,
            observer: Arc::new(crate::observability::NoopObserver),
            tools_registry: Arc::new(Vec::new()),
            cost_tracker: None,
            audit_logger: None,
            event_tx: tokio::sync::broadcast::channel(16).0,
            shutdown_tx: tokio::sync::watch::channel(false).0,
            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
            path_prefix: String::new(),
            session_backend: None,
            session_queue: std::sync::Arc::new(
                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
            ),
            device_registry: None,
            pending_pairings: None,
            canvas_store: CanvasStore::new(),
            mcp_registry: None,
            approval_registry: approval_registry::global(),
            mcp_local_url: None,
            #[cfg(feature = "webauthn")]
            webauthn: None,
        };

        let response = handle_metrics(State(state)).await.into_response();
        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response
                .headers()
                .get(header::CONTENT_TYPE)
                .and_then(|value| value.to_str().ok()),
            Some(PROMETHEUS_CONTENT_TYPE)
        );

        // Drain the body and check for the hint text.
        let body = response.into_body().collect().await.unwrap().to_bytes();
        let text = String::from_utf8(body.to_vec()).unwrap();
        assert!(text.contains("Prometheus backend not enabled"));
    }
3020
    /// With a `PrometheusObserver` wrapped in a `BroadcastObserver`, a recorded
    /// heartbeat tick shows up in the metrics handler output.
    ///
    /// Only compiled when the `observability-prometheus` feature is enabled.
    #[cfg(feature = "observability-prometheus")]
    #[tokio::test]
    async fn metrics_endpoint_renders_prometheus_output() {
        let event_tx = tokio::sync::broadcast::channel(16).0;
        let wrapped = sse::BroadcastObserver::new(
            Box::new(crate::observability::PrometheusObserver::new()),
            event_tx.clone(),
        );
        // Record exactly one heartbeat so the counter asserted below reads 1.
        crate::observability::Observer::record_event(
            &wrapped,
            &crate::observability::ObserverEvent::HeartbeatTick,
        );

        let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
        let state = AppState {
            config: Arc::new(Mutex::new(Config::default())),
            provider: Arc::new(MockProvider::default()),
            model: "test-model".into(),
            temperature: 0.0,
            mem: Arc::new(MockMemory),
            auto_save: false,
            webhook_secret_hash: None,
            pairing: Arc::new(PairingGuard::new(false, &[])),
            trust_forwarded_headers: false,
            rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
            auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
            idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
            whatsapp: None,
            whatsapp_app_secret: None,
            linq: None,
            linq_signing_secret: None,
            nextcloud_talk: None,
            nextcloud_talk_webhook_secret: None,
            wati: None,
            gmail_push: None,
            observer,
            tools_registry: Arc::new(Vec::new()),
            cost_tracker: None,
            audit_logger: None,
            event_tx,
            shutdown_tx: tokio::sync::watch::channel(false).0,
            node_registry: Arc::new(nodes::NodeRegistry::new(16)),
            path_prefix: String::new(),
            session_backend: None,
            session_queue: std::sync::Arc::new(
                crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
            ),
            device_registry: None,
            pending_pairings: None,
            canvas_store: CanvasStore::new(),
            mcp_registry: None,
            approval_registry: approval_registry::global(),
            mcp_local_url: None,
            #[cfg(feature = "webauthn")]
            webauthn: None,
        };

        let response = handle_metrics(State(state)).await.into_response();
        assert_eq!(response.status(), StatusCode::OK);

        // Drain the body and look for the heartbeat counter line.
        let body = response.into_body().collect().await.unwrap().to_bytes();
        let text = String::from_utf8(body.to_vec()).unwrap();
        assert!(text.contains("construct_heartbeat_ticks_total 1"));
    }
3085
3086 #[test]
3087 fn gateway_rate_limiter_blocks_after_limit() {
3088 let limiter = GatewayRateLimiter::new(2, 2, 100);
3089 assert!(limiter.allow_pair("127.0.0.1"));
3090 assert!(limiter.allow_pair("127.0.0.1"));
3091 assert!(!limiter.allow_pair("127.0.0.1"));
3092 }
3093
3094 #[test]
3095 fn rate_limiter_sweep_removes_stale_entries() {
3096 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3097 assert!(limiter.allow("ip-1"));
3099 assert!(limiter.allow("ip-2"));
3100 assert!(limiter.allow("ip-3"));
3101
3102 {
3103 let guard = limiter.requests.lock();
3104 assert_eq!(guard.0.len(), 3);
3105 }
3106
3107 {
3109 let mut guard = limiter.requests.lock();
3110 guard.1 = Instant::now()
3111 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3112 .unwrap();
3113 guard.0.get_mut("ip-2").unwrap().clear();
3115 guard.0.get_mut("ip-3").unwrap().clear();
3116 }
3117
3118 assert!(limiter.allow("ip-1"));
3120
3121 {
3122 let guard = limiter.requests.lock();
3123 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3124 assert!(guard.0.contains_key("ip-1"));
3125 }
3126 }
3127
3128 #[test]
3129 fn rate_limiter_zero_limit_always_allows() {
3130 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3131 for _ in 0..100 {
3132 assert!(limiter.allow("any-key"));
3133 }
3134 }
3135
3136 #[test]
3137 fn idempotency_store_rejects_duplicate_key() {
3138 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3139 assert!(store.record_if_new("req-1"));
3140 assert!(!store.record_if_new("req-1"));
3141 assert!(store.record_if_new("req-2"));
3142 }
3143
3144 #[test]
3145 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3146 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3147 assert!(limiter.allow("ip-1"));
3148 assert!(limiter.allow("ip-2"));
3149 assert!(limiter.allow("ip-3"));
3150
3151 let guard = limiter.requests.lock();
3152 assert_eq!(guard.0.len(), 2);
3153 assert!(guard.0.contains_key("ip-2"));
3154 assert!(guard.0.contains_key("ip-3"));
3155 }
3156
3157 #[test]
3158 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3159 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3160 assert!(store.record_if_new("k1"));
3161 std::thread::sleep(Duration::from_millis(2));
3162 assert!(store.record_if_new("k2"));
3163 std::thread::sleep(Duration::from_millis(2));
3164 assert!(store.record_if_new("k3"));
3165
3166 let keys = store.keys.lock();
3167 assert_eq!(keys.len(), 2);
3168 assert!(!keys.contains_key("k1"));
3169 assert!(keys.contains_key("k2"));
3170 assert!(keys.contains_key("k3"));
3171 }
3172
3173 #[test]
3174 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3175 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3176 let mut headers = HeaderMap::new();
3177 headers.insert(
3178 "X-Forwarded-For",
3179 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3180 );
3181
3182 let key = client_key_from_request(Some(peer), &headers, false);
3183 assert_eq!(key, "10.0.0.5");
3184 }
3185
3186 #[test]
3187 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3188 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3191 let mut headers = HeaderMap::new();
3192 headers.insert(
3193 "X-Forwarded-For",
3194 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3195 );
3196
3197 let key = client_key_from_request(Some(peer), &headers, true);
3198 assert_eq!(key, "203.0.113.11");
3199 }
3200
3201 #[test]
3202 fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
3203 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3208 let mut headers = HeaderMap::new();
3209 headers.insert(
3210 "X-Forwarded-For",
3211 HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
3212 );
3213
3214 let key = client_key_from_request(Some(peer), &headers, true);
3215 assert_eq!(key, "203.0.113.11");
3216 }
3217
3218 #[test]
3219 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3220 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3221 let mut headers = HeaderMap::new();
3222 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3223
3224 let key = client_key_from_request(Some(peer), &headers, true);
3225 assert_eq!(key, "10.0.0.5");
3226 }
3227
3228 #[test]
3229 fn normalize_max_keys_uses_fallback_for_zero() {
3230 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3231 assert_eq!(normalize_max_keys(0, 0), 1);
3232 }
3233
3234 #[test]
3235 fn normalize_max_keys_preserves_nonzero_values() {
3236 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3237 assert_eq!(normalize_max_keys(1, 10_000), 1);
3238 }
3239
    /// After a successful pairing, `persist_pairing_tokens` records the token
    /// in the in-memory config and writes it to disk in encrypted form.
    #[tokio::test]
    async fn persist_pairing_tokens_writes_config_tokens() {
        // Point the config at a throwaway directory and write it out once so
        // a file exists at `config_path`.
        let temp = tempfile::tempdir().unwrap();
        let config_path = temp.path().join("config.toml");
        let workspace_path = temp.path().join("workspace");

        let mut config = Config::default();
        config.config_path = config_path.clone();
        config.workspace_dir = workspace_path;
        config.save().await.unwrap();

        // Pair one client so the guard holds exactly one token.
        let guard = PairingGuard::new(true, &[]);
        let code = guard.pairing_code().unwrap();
        let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
        assert!(guard.is_authenticated(&token));

        let shared_config = Arc::new(Mutex::new(config));
        Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
            .await
            .unwrap();

        // In memory: a single 64-character hex string.
        let plaintext = {
            // Scope the lock so it is released before the awaits below.
            let in_memory = shared_config.lock();
            assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
            in_memory.gateway.paired_tokens[0].clone()
        };
        assert_eq!(plaintext.len(), 64);
        assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));

        // On disk: parse the raw TOML and check the stored value is encrypted.
        let saved = tokio::fs::read_to_string(config_path).await.unwrap();
        let raw_parsed: Config = toml::from_str(&saved).unwrap();
        assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
        let on_disk = &raw_parsed.gateway.paired_tokens[0];
        assert!(
            crate::security::SecretStore::is_encrypted(on_disk),
            "paired_token should be encrypted on disk"
        );
    }
3280
3281 #[test]
3282 fn webhook_memory_key_is_unique() {
3283 let key1 = webhook_memory_key();
3284 let key2 = webhook_memory_key();
3285
3286 assert!(key1.starts_with("webhook_msg_"));
3287 assert!(key2.starts_with("webhook_msg_"));
3288 assert_ne!(key1, key2);
3289 }
3290
3291 #[test]
3292 fn whatsapp_memory_key_includes_sender_and_message_id() {
3293 let msg = ChannelMessage {
3294 id: "wamid-123".into(),
3295 sender: "+1234567890".into(),
3296 reply_target: "+1234567890".into(),
3297 content: "hello".into(),
3298 channel: "whatsapp".into(),
3299 timestamp: 1,
3300 thread_ts: None,
3301 interruption_scope_id: None,
3302 attachments: vec![],
3303 };
3304
3305 let key = whatsapp_memory_key(&msg);
3306 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3307 }
3308
    /// Stateless `Memory` stub: writes succeed and are discarded, reads come
    /// back empty.
    #[derive(Default)]
    struct MockMemory;

    #[async_trait]
    impl Memory for MockMemory {
        /// Backend name reported by this stub.
        fn name(&self) -> &str {
            "mock"
        }

        /// Accepts the write and stores nothing.
        async fn store(
            &self,
            _key: &str,
            _content: &str,
            _category: MemoryCategory,
            _session_id: Option<&str>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        /// Always returns no entries.
        async fn recall(
            &self,
            _query: &str,
            _limit: usize,
            _session_id: Option<&str>,
            _since: Option<&str>,
            _until: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(Vec::new())
        }

        /// Always reports the key as absent.
        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
            Ok(None)
        }

        /// Always returns no entries.
        async fn list(
            &self,
            _category: Option<&MemoryCategory>,
            _session_id: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(Vec::new())
        }

        /// Always returns `false`.
        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
            Ok(false)
        }

        /// Always reports zero entries.
        async fn count(&self) -> anyhow::Result<usize> {
            Ok(0)
        }

        /// Always healthy.
        async fn health_check(&self) -> bool {
            true
        }
    }
3363
    /// `Provider` stub that counts how many times it is called and always
    /// answers `"ok"`.
    #[derive(Default)]
    struct MockProvider {
        /// Number of `chat_with_system` invocations so far.
        calls: AtomicUsize,
    }

    #[async_trait]
    impl Provider for MockProvider {
        /// Increments the call counter and returns `"ok"`, ignoring every
        /// argument.
        async fn chat_with_system(
            &self,
            _system_prompt: Option<&str>,
            _message: &str,
            _model: &str,
            _temperature: f64,
        ) -> anyhow::Result<String> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok("ok".into())
        }
    }
3382
    /// `Memory` stub that records the key of every `store` call so tests can
    /// inspect which keys were written.
    #[derive(Default)]
    struct TrackingMemory {
        /// Keys passed to `store`, in call order.
        keys: Mutex<Vec<String>>,
    }

    #[async_trait]
    impl Memory for TrackingMemory {
        /// Backend name reported by this stub.
        fn name(&self) -> &str {
            "tracking"
        }

        /// Records `key`; content, category and session are discarded.
        async fn store(
            &self,
            key: &str,
            _content: &str,
            _category: MemoryCategory,
            _session_id: Option<&str>,
        ) -> anyhow::Result<()> {
            self.keys.lock().push(key.to_string());
            Ok(())
        }

        /// Always returns no entries.
        async fn recall(
            &self,
            _query: &str,
            _limit: usize,
            _session_id: Option<&str>,
            _since: Option<&str>,
            _until: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(Vec::new())
        }

        /// Always reports the key as absent, even if it was stored.
        async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
            Ok(None)
        }

        /// Always returns no entries.
        async fn list(
            &self,
            _category: Option<&MemoryCategory>,
            _session_id: Option<&str>,
        ) -> anyhow::Result<Vec<MemoryEntry>> {
            Ok(Vec::new())
        }

        /// Always returns `false`; recorded keys are never removed.
        async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
            Ok(false)
        }

        /// Returns how many keys have been recorded.
        async fn count(&self) -> anyhow::Result<usize> {
            // Read the length into a local first, then return it.
            let size = self.keys.lock().len();
            Ok(size)
        }

        /// Always healthy.
        async fn health_check(&self) -> bool {
            true
        }
    }
3441
3442 fn test_connect_info() -> ConnectInfo<SocketAddr> {
3443 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3444 }
3445
3446 #[tokio::test]
3447 async fn webhook_idempotency_skips_duplicate_provider_calls() {
3448 let provider_impl = Arc::new(MockProvider::default());
3449 let provider: Arc<dyn Provider> = provider_impl.clone();
3450 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3451
3452 let state = AppState {
3453 config: Arc::new(Mutex::new(Config::default())),
3454 provider,
3455 model: "test-model".into(),
3456 temperature: 0.0,
3457 mem: memory,
3458 auto_save: false,
3459 webhook_secret_hash: None,
3460 pairing: Arc::new(PairingGuard::new(false, &[])),
3461 trust_forwarded_headers: false,
3462 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3463 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3464 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3465 whatsapp: None,
3466 whatsapp_app_secret: None,
3467 linq: None,
3468 linq_signing_secret: None,
3469 nextcloud_talk: None,
3470 nextcloud_talk_webhook_secret: None,
3471 wati: None,
3472 gmail_push: None,
3473 observer: Arc::new(crate::observability::NoopObserver),
3474 tools_registry: Arc::new(Vec::new()),
3475 cost_tracker: None,
3476 audit_logger: None,
3477 event_tx: tokio::sync::broadcast::channel(16).0,
3478 shutdown_tx: tokio::sync::watch::channel(false).0,
3479 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3480 path_prefix: String::new(),
3481 session_backend: None,
3482 session_queue: std::sync::Arc::new(
3483 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3484 ),
3485 device_registry: None,
3486 pending_pairings: None,
3487 canvas_store: CanvasStore::new(),
3488 mcp_registry: None,
3489 approval_registry: approval_registry::global(),
3490 mcp_local_url: None,
3491 #[cfg(feature = "webauthn")]
3492 webauthn: None,
3493 };
3494
3495 let mut headers = HeaderMap::new();
3496 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3497
3498 let body = Ok(Json(WebhookBody {
3499 message: "hello".into(),
3500 }));
3501 let first = handle_webhook(
3502 State(state.clone()),
3503 test_connect_info(),
3504 headers.clone(),
3505 body,
3506 )
3507 .await
3508 .into_response();
3509 assert_eq!(first.status(), StatusCode::OK);
3510
3511 let body = Ok(Json(WebhookBody {
3512 message: "hello".into(),
3513 }));
3514 let second = handle_webhook(State(state), test_connect_info(), headers, body)
3515 .await
3516 .into_response();
3517 assert_eq!(second.status(), StatusCode::OK);
3518
3519 let payload = second.into_body().collect().await.unwrap().to_bytes();
3520 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3521 assert_eq!(parsed["status"], "duplicate");
3522 assert_eq!(parsed["idempotent"], true);
3523 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3524 }
3525
3526 #[tokio::test]
3527 async fn webhook_autosave_stores_distinct_keys_per_request() {
3528 let provider_impl = Arc::new(MockProvider::default());
3529 let provider: Arc<dyn Provider> = provider_impl.clone();
3530
3531 let tracking_impl = Arc::new(TrackingMemory::default());
3532 let memory: Arc<dyn Memory> = tracking_impl.clone();
3533
3534 let state = AppState {
3535 config: Arc::new(Mutex::new(Config::default())),
3536 provider,
3537 model: "test-model".into(),
3538 temperature: 0.0,
3539 mem: memory,
3540 auto_save: true,
3541 webhook_secret_hash: None,
3542 pairing: Arc::new(PairingGuard::new(false, &[])),
3543 trust_forwarded_headers: false,
3544 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3545 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3546 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3547 whatsapp: None,
3548 whatsapp_app_secret: None,
3549 linq: None,
3550 linq_signing_secret: None,
3551 nextcloud_talk: None,
3552 nextcloud_talk_webhook_secret: None,
3553 wati: None,
3554 gmail_push: None,
3555 observer: Arc::new(crate::observability::NoopObserver),
3556 tools_registry: Arc::new(Vec::new()),
3557 cost_tracker: None,
3558 audit_logger: None,
3559 event_tx: tokio::sync::broadcast::channel(16).0,
3560 shutdown_tx: tokio::sync::watch::channel(false).0,
3561 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3562 path_prefix: String::new(),
3563 session_backend: None,
3564 session_queue: std::sync::Arc::new(
3565 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3566 ),
3567 device_registry: None,
3568 pending_pairings: None,
3569 canvas_store: CanvasStore::new(),
3570 mcp_registry: None,
3571 approval_registry: approval_registry::global(),
3572 mcp_local_url: None,
3573 #[cfg(feature = "webauthn")]
3574 webauthn: None,
3575 };
3576
3577 let headers = HeaderMap::new();
3578
3579 let body1 = Ok(Json(WebhookBody {
3580 message: "hello one".into(),
3581 }));
3582 let first = handle_webhook(
3583 State(state.clone()),
3584 test_connect_info(),
3585 headers.clone(),
3586 body1,
3587 )
3588 .await
3589 .into_response();
3590 assert_eq!(first.status(), StatusCode::OK);
3591
3592 let body2 = Ok(Json(WebhookBody {
3593 message: "hello two".into(),
3594 }));
3595 let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3596 .await
3597 .into_response();
3598 assert_eq!(second.status(), StatusCode::OK);
3599
3600 let keys = tracking_impl.keys.lock().clone();
3601 assert_eq!(keys.len(), 2);
3602 assert_ne!(keys[0], keys[1]);
3603 assert!(keys[0].starts_with("webhook_msg_"));
3604 assert!(keys[1].starts_with("webhook_msg_"));
3605 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3606 }
3607
/// Hashing the same secret twice gives the same 64-character digest, and a
/// different secret gives a different digest.
#[test]
fn webhook_secret_hash_is_deterministic_and_nonempty() {
    let first_secret = generate_test_secret();
    let second_secret = generate_test_secret();

    let digest = hash_webhook_secret(&first_secret);
    let digest_again = hash_webhook_secret(&first_secret);
    let unrelated_digest = hash_webhook_secret(&second_secret);

    assert_eq!(digest, digest_again);
    assert_ne!(digest, unrelated_digest);
    assert_eq!(digest.len(), 64);
}
3620
3621 #[tokio::test]
3622 async fn webhook_secret_hash_rejects_missing_header() {
3623 let provider_impl = Arc::new(MockProvider::default());
3624 let provider: Arc<dyn Provider> = provider_impl.clone();
3625 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3626 let secret = generate_test_secret();
3627
3628 let state = AppState {
3629 config: Arc::new(Mutex::new(Config::default())),
3630 provider,
3631 model: "test-model".into(),
3632 temperature: 0.0,
3633 mem: memory,
3634 auto_save: false,
3635 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3636 pairing: Arc::new(PairingGuard::new(false, &[])),
3637 trust_forwarded_headers: false,
3638 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3639 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3640 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3641 whatsapp: None,
3642 whatsapp_app_secret: None,
3643 linq: None,
3644 linq_signing_secret: None,
3645 nextcloud_talk: None,
3646 nextcloud_talk_webhook_secret: None,
3647 wati: None,
3648 gmail_push: None,
3649 observer: Arc::new(crate::observability::NoopObserver),
3650 tools_registry: Arc::new(Vec::new()),
3651 cost_tracker: None,
3652 audit_logger: None,
3653 event_tx: tokio::sync::broadcast::channel(16).0,
3654 shutdown_tx: tokio::sync::watch::channel(false).0,
3655 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3656 path_prefix: String::new(),
3657 session_backend: None,
3658 session_queue: std::sync::Arc::new(
3659 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3660 ),
3661 device_registry: None,
3662 pending_pairings: None,
3663 canvas_store: CanvasStore::new(),
3664 mcp_registry: None,
3665 approval_registry: approval_registry::global(),
3666 mcp_local_url: None,
3667 #[cfg(feature = "webauthn")]
3668 webauthn: None,
3669 };
3670
3671 let response = handle_webhook(
3672 State(state),
3673 test_connect_info(),
3674 HeaderMap::new(),
3675 Ok(Json(WebhookBody {
3676 message: "hello".into(),
3677 })),
3678 )
3679 .await
3680 .into_response();
3681
3682 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3683 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3684 }
3685
3686 #[tokio::test]
3687 async fn webhook_secret_hash_rejects_invalid_header() {
3688 let provider_impl = Arc::new(MockProvider::default());
3689 let provider: Arc<dyn Provider> = provider_impl.clone();
3690 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3691 let valid_secret = generate_test_secret();
3692 let wrong_secret = generate_test_secret();
3693
3694 let state = AppState {
3695 config: Arc::new(Mutex::new(Config::default())),
3696 provider,
3697 model: "test-model".into(),
3698 temperature: 0.0,
3699 mem: memory,
3700 auto_save: false,
3701 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3702 pairing: Arc::new(PairingGuard::new(false, &[])),
3703 trust_forwarded_headers: false,
3704 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3705 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3706 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3707 whatsapp: None,
3708 whatsapp_app_secret: None,
3709 linq: None,
3710 linq_signing_secret: None,
3711 nextcloud_talk: None,
3712 nextcloud_talk_webhook_secret: None,
3713 wati: None,
3714 gmail_push: None,
3715 observer: Arc::new(crate::observability::NoopObserver),
3716 tools_registry: Arc::new(Vec::new()),
3717 cost_tracker: None,
3718 audit_logger: None,
3719 event_tx: tokio::sync::broadcast::channel(16).0,
3720 shutdown_tx: tokio::sync::watch::channel(false).0,
3721 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3722 path_prefix: String::new(),
3723 session_backend: None,
3724 session_queue: std::sync::Arc::new(
3725 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3726 ),
3727 device_registry: None,
3728 pending_pairings: None,
3729 canvas_store: CanvasStore::new(),
3730 mcp_registry: None,
3731 approval_registry: approval_registry::global(),
3732 mcp_local_url: None,
3733 #[cfg(feature = "webauthn")]
3734 webauthn: None,
3735 };
3736
3737 let mut headers = HeaderMap::new();
3738 headers.insert(
3739 "X-Webhook-Secret",
3740 HeaderValue::from_str(&wrong_secret).unwrap(),
3741 );
3742
3743 let response = handle_webhook(
3744 State(state),
3745 test_connect_info(),
3746 headers,
3747 Ok(Json(WebhookBody {
3748 message: "hello".into(),
3749 })),
3750 )
3751 .await
3752 .into_response();
3753
3754 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3755 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3756 }
3757
3758 #[tokio::test]
3759 async fn webhook_secret_hash_accepts_valid_header() {
3760 let provider_impl = Arc::new(MockProvider::default());
3761 let provider: Arc<dyn Provider> = provider_impl.clone();
3762 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3763 let secret = generate_test_secret();
3764
3765 let state = AppState {
3766 config: Arc::new(Mutex::new(Config::default())),
3767 provider,
3768 model: "test-model".into(),
3769 temperature: 0.0,
3770 mem: memory,
3771 auto_save: false,
3772 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3773 pairing: Arc::new(PairingGuard::new(false, &[])),
3774 trust_forwarded_headers: false,
3775 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3776 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3777 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3778 whatsapp: None,
3779 whatsapp_app_secret: None,
3780 linq: None,
3781 linq_signing_secret: None,
3782 nextcloud_talk: None,
3783 nextcloud_talk_webhook_secret: None,
3784 wati: None,
3785 gmail_push: None,
3786 observer: Arc::new(crate::observability::NoopObserver),
3787 tools_registry: Arc::new(Vec::new()),
3788 cost_tracker: None,
3789 audit_logger: None,
3790 event_tx: tokio::sync::broadcast::channel(16).0,
3791 shutdown_tx: tokio::sync::watch::channel(false).0,
3792 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3793 path_prefix: String::new(),
3794 session_backend: None,
3795 session_queue: std::sync::Arc::new(
3796 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3797 ),
3798 device_registry: None,
3799 pending_pairings: None,
3800 canvas_store: CanvasStore::new(),
3801 mcp_registry: None,
3802 approval_registry: approval_registry::global(),
3803 mcp_local_url: None,
3804 #[cfg(feature = "webauthn")]
3805 webauthn: None,
3806 };
3807
3808 let mut headers = HeaderMap::new();
3809 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3810
3811 let response = handle_webhook(
3812 State(state),
3813 test_connect_info(),
3814 headers,
3815 Ok(Json(WebhookBody {
3816 message: "hello".into(),
3817 })),
3818 )
3819 .await
3820 .into_response();
3821
3822 assert_eq!(response.status(), StatusCode::OK);
3823 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3824 }
3825
3826 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3827 use hmac::{Hmac, Mac};
3828 use sha2::Sha256;
3829
3830 let payload = format!("{random}{body}");
3831 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3832 mac.update(payload.as_bytes());
3833 hex::encode(mac.finalize().into_bytes())
3834 }
3835
3836 #[tokio::test]
3837 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3838 let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3839 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3840
3841 let state = AppState {
3842 config: Arc::new(Mutex::new(Config::default())),
3843 provider,
3844 model: "test-model".into(),
3845 temperature: 0.0,
3846 mem: memory,
3847 auto_save: false,
3848 webhook_secret_hash: None,
3849 pairing: Arc::new(PairingGuard::new(false, &[])),
3850 trust_forwarded_headers: false,
3851 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3852 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3853 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3854 whatsapp: None,
3855 whatsapp_app_secret: None,
3856 linq: None,
3857 linq_signing_secret: None,
3858 nextcloud_talk: None,
3859 nextcloud_talk_webhook_secret: None,
3860 wati: None,
3861 gmail_push: None,
3862 observer: Arc::new(crate::observability::NoopObserver),
3863 tools_registry: Arc::new(Vec::new()),
3864 cost_tracker: None,
3865 audit_logger: None,
3866 event_tx: tokio::sync::broadcast::channel(16).0,
3867 shutdown_tx: tokio::sync::watch::channel(false).0,
3868 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3869 path_prefix: String::new(),
3870 session_backend: None,
3871 session_queue: std::sync::Arc::new(
3872 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3873 ),
3874 device_registry: None,
3875 pending_pairings: None,
3876 canvas_store: CanvasStore::new(),
3877 mcp_registry: None,
3878 approval_registry: approval_registry::global(),
3879 mcp_local_url: None,
3880 #[cfg(feature = "webauthn")]
3881 webauthn: None,
3882 };
3883
3884 let response = Box::pin(handle_nextcloud_talk_webhook(
3885 State(state),
3886 HeaderMap::new(),
3887 Bytes::from_static(br#"{"type":"message"}"#),
3888 ))
3889 .await
3890 .into_response();
3891
3892 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3893 }
3894
3895 #[tokio::test]
3896 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3897 let provider_impl = Arc::new(MockProvider::default());
3898 let provider: Arc<dyn Provider> = provider_impl.clone();
3899 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3900
3901 let channel = Arc::new(NextcloudTalkChannel::new(
3902 "https://cloud.example.com".into(),
3903 "app-token".into(),
3904 String::new(),
3905 vec!["*".into()],
3906 ));
3907
3908 let secret = "nextcloud-test-secret";
3909 let random = "seed-value";
3910 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3911 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3912 let invalid_signature = "deadbeef";
3913
3914 let state = AppState {
3915 config: Arc::new(Mutex::new(Config::default())),
3916 provider,
3917 model: "test-model".into(),
3918 temperature: 0.0,
3919 mem: memory,
3920 auto_save: false,
3921 webhook_secret_hash: None,
3922 pairing: Arc::new(PairingGuard::new(false, &[])),
3923 trust_forwarded_headers: false,
3924 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3925 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3926 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3927 whatsapp: None,
3928 whatsapp_app_secret: None,
3929 linq: None,
3930 linq_signing_secret: None,
3931 nextcloud_talk: Some(channel),
3932 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3933 wati: None,
3934 gmail_push: None,
3935 observer: Arc::new(crate::observability::NoopObserver),
3936 tools_registry: Arc::new(Vec::new()),
3937 cost_tracker: None,
3938 audit_logger: None,
3939 event_tx: tokio::sync::broadcast::channel(16).0,
3940 shutdown_tx: tokio::sync::watch::channel(false).0,
3941 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3942 path_prefix: String::new(),
3943 session_backend: None,
3944 session_queue: std::sync::Arc::new(
3945 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3946 ),
3947 device_registry: None,
3948 pending_pairings: None,
3949 canvas_store: CanvasStore::new(),
3950 mcp_registry: None,
3951 approval_registry: approval_registry::global(),
3952 mcp_local_url: None,
3953 #[cfg(feature = "webauthn")]
3954 webauthn: None,
3955 };
3956
3957 let mut headers = HeaderMap::new();
3958 headers.insert(
3959 "X-Nextcloud-Talk-Random",
3960 HeaderValue::from_str(random).unwrap(),
3961 );
3962 headers.insert(
3963 "X-Nextcloud-Talk-Signature",
3964 HeaderValue::from_str(invalid_signature).unwrap(),
3965 );
3966
3967 let response = Box::pin(handle_nextcloud_talk_webhook(
3968 State(state),
3969 headers,
3970 Bytes::from(body),
3971 ))
3972 .await
3973 .into_response();
3974 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3975 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3976 }
3977
3978 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
3983 use hmac::{Hmac, Mac};
3984 use sha2::Sha256;
3985
3986 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3987 mac.update(body);
3988 hex::encode(mac.finalize().into_bytes())
3989 }
3990
3991 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
3992 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
3993 }
3994
/// A header signed with the right secret over the same body verifies.
#[test]
fn whatsapp_signature_valid() {
    let secret = generate_test_secret();
    let payload = b"test body content";
    let header = compute_whatsapp_signature_header(&secret, payload);

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(accepted);
}
4008
/// A header signed with a different secret must not verify.
#[test]
fn whatsapp_signature_invalid_wrong_secret() {
    let expected_secret = generate_test_secret();
    let signing_secret = generate_test_secret();
    let payload = b"test body content";

    // Sign with the other secret, verify against the expected one.
    let header = compute_whatsapp_signature_header(&signing_secret, payload);

    let accepted = verify_whatsapp_signature(&expected_secret, payload, &header);
    assert!(!accepted);
}
4023
/// A header signed over one body must not verify against another body.
#[test]
fn whatsapp_signature_invalid_wrong_body() {
    let secret = generate_test_secret();
    let signed_payload = b"original body";
    let received_payload = b"tampered body";

    let header = compute_whatsapp_signature_header(&secret, signed_payload);

    let accepted = verify_whatsapp_signature(&secret, received_payload, &header);
    assert!(!accepted);
}
4039
/// A header value lacking the `sha256=` prefix must not verify.
#[test]
fn whatsapp_signature_missing_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let bare_hex_header = "abc123def456";

    let accepted = verify_whatsapp_signature(&secret, payload, bare_hex_header);
    assert!(!accepted);
}
4054
/// An empty header value must not verify.
#[test]
fn whatsapp_signature_empty_header() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let empty_header = "";

    let accepted = verify_whatsapp_signature(&secret, payload, empty_header);
    assert!(!accepted);
}
4062
/// A correctly prefixed header whose digest part is not hexadecimal must not verify.
#[test]
fn whatsapp_signature_invalid_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let malformed_header = "sha256=not_valid_hex_zzz";

    let accepted = verify_whatsapp_signature(&secret, payload, malformed_header);
    assert!(!accepted);
}
4077
/// Signing and verifying a zero-length body still succeeds.
#[test]
fn whatsapp_signature_empty_body() {
    let secret = generate_test_secret();
    let payload = b"";
    let header = compute_whatsapp_signature_header(&secret, payload);

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(accepted);
}
4091
/// A body containing multi-byte UTF-8 characters signs and verifies.
#[test]
fn whatsapp_signature_unicode_body() {
    let secret = generate_test_secret();
    let text = "Hello 🦀 World";
    let payload = text.as_bytes();
    let header = compute_whatsapp_signature_header(&secret, payload);

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(accepted);
}
4105
/// A JSON message payload signs and verifies.
#[test]
fn whatsapp_signature_json_payload() {
    let secret = generate_test_secret();
    let payload = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
    let header = compute_whatsapp_signature_header(&secret, payload);

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(accepted);
}
4119
/// The `sha256=` prefix is matched case-sensitively: `SHA256=` is rejected
/// even when the digest is correct.
#[test]
fn whatsapp_signature_case_sensitive_prefix() {
    let secret = generate_test_secret();
    let payload = b"test body";
    let digest = compute_whatsapp_signature_hex(&secret, payload);

    let uppercase_header = format!("SHA256={digest}");
    let lowercase_header = format!("sha256={digest}");

    assert!(!verify_whatsapp_signature(&secret, payload, &uppercase_header));
    assert!(verify_whatsapp_signature(&secret, payload, &lowercase_header));
}
4139
/// A digest cut down to its first 32 hex characters must not verify.
#[test]
fn whatsapp_signature_truncated_hex() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let full_digest = compute_whatsapp_signature_hex(&secret, payload);
    let half_digest = &full_digest[..32];
    let header = format!("sha256={half_digest}");

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(!accepted);
}
4155
/// A correct digest with extra hex characters appended must not verify.
#[test]
fn whatsapp_signature_extra_bytes() {
    let secret = generate_test_secret();
    let payload = b"test body";

    let digest = compute_whatsapp_signature_hex(&secret, payload);
    // Prefix, valid digest, then trailing junk in a single header value.
    let header = format!("sha256={digest}deadbeef");

    let accepted = verify_whatsapp_signature(&secret, payload, &header);
    assert!(!accepted);
}
4171
/// Four distinct keys are each reported as new.
#[test]
fn idempotency_store_allows_different_keys() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 100);
    for key in ["key-a", "key-b", "key-c", "key-d"] {
        assert!(store.record_if_new(key));
    }
}
4184
/// A store built with `max_keys = 0` still records a key and detects its repeat.
#[test]
fn idempotency_store_max_keys_clamped_to_one() {
    let store = IdempotencyStore::new(Duration::from_secs(60), 0);

    let first_seen = store.record_if_new("only-key");
    let second_seen = store.record_if_new("only-key");

    assert!(first_seen);
    assert!(!second_seen);
}
4191
/// Recording the same key twice in immediate succession: new, then duplicate.
#[test]
fn idempotency_store_rapid_duplicate_rejected() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 100);

    let first_seen = store.record_if_new("rapid");
    let second_seen = store.record_if_new("rapid");

    assert!(first_seen);
    assert!(!second_seen);
}
4198
/// Once the TTL has elapsed, a previously recorded key counts as new again.
#[test]
fn idempotency_store_accepts_after_ttl_expires() {
    let ttl = Duration::from_millis(1);
    let store = IdempotencyStore::new(ttl, 100);
    assert!(store.record_if_new("ttl-key"));

    // Wait well past the 1 ms TTL before retrying.
    std::thread::sleep(Duration::from_millis(10));

    assert!(store.record_if_new("ttl-key"));
}
4206
/// With capacity for one key, recording a second key leaves only the newer one.
#[test]
fn idempotency_store_eviction_preserves_newest() {
    let store = IdempotencyStore::new(Duration::from_secs(300), 1);

    assert!(store.record_if_new("old-key"));
    // Short pause between the two insertions.
    std::thread::sleep(Duration::from_millis(2));
    assert!(store.record_if_new("new-key"));

    let retained = store.keys.lock();
    assert_eq!(retained.len(), 1);
    assert!(retained.contains_key("new-key"));
    assert!(!retained.contains_key("old-key"));
}
4219
/// After the window has passed, a key that hit its limit is allowed again.
#[test]
fn rate_limiter_allows_after_window_expires() {
    let window = Duration::from_millis(50);
    let limiter = SlidingWindowRateLimiter::new(2, window, 100);

    // Use up the budget of two, then confirm the third is refused.
    assert!(limiter.allow("ip-1"));
    assert!(limiter.allow("ip-1"));
    assert!(!limiter.allow("ip-1"));

    // Sleep past the 50 ms window.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("ip-1"));
}
4234
/// Exhausting one key's budget does not consume another key's budget.
#[test]
fn rate_limiter_independent_keys_tracked_separately() {
    let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);

    // Same sequence for each key in turn: two allowed, third refused.
    for ip in ["ip-1", "ip-2"] {
        assert!(limiter.allow(ip));
        assert!(limiter.allow(ip));
        assert!(!limiter.allow(ip));
    }
}
4247
/// With room for three keys, admitting a fourth leaves three tracked and
/// drops `ip-1`, the first one inserted.
#[test]
fn rate_limiter_exact_boundary_at_max_keys() {
    let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);

    // The fourth key goes one past the key capacity; all four are allowed.
    for ip in ["ip-1", "ip-2", "ip-3", "ip-4"] {
        assert!(limiter.allow(ip));
    }

    let state = limiter.requests.lock();
    let tracked = &state.0;
    assert_eq!(tracked.len(), 3);
    assert!(
        !tracked.contains_key("ip-1"),
        "ip-1 should have been evicted"
    );
    for survivor in ["ip-2", "ip-3", "ip-4"] {
        assert!(tracked.contains_key(survivor));
    }
}
4267
/// The pair budget (2) and webhook budget (3) for one key are counted separately.
#[test]
fn gateway_rate_limiter_pair_and_webhook_are_independent() {
    let limiter = GatewayRateLimiter::new(2, 3, 100);

    // Exhaust the pair budget.
    for _ in 0..2 {
        assert!(limiter.allow_pair("ip-1"));
    }
    assert!(!limiter.allow_pair("ip-1"));

    // The webhook budget for the same key is still intact.
    for _ in 0..3 {
        assert!(limiter.allow_webhook("ip-1"));
    }
    assert!(!limiter.allow_webhook("ip-1"));
}
4283
/// With capacity for a single key, a second key is allowed and replaces the first.
#[test]
fn rate_limiter_single_key_max_allows_one_request() {
    let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);

    assert!(limiter.allow("ip-1"));
    assert!(limiter.allow("ip-2"));

    let state = limiter.requests.lock();
    let tracked = &state.0;
    assert_eq!(tracked.len(), 1);
    assert!(tracked.contains_key("ip-2"));
    assert!(!tracked.contains_key("ip-1"));
}
4295
/// Ten threads each submit 100 distinct keys; every thread must finish without
/// panicking and the tracked-key count must stay within `max_keys`.
#[test]
fn rate_limiter_concurrent_access_safe() {
    let limiter = Arc::new(SlidingWindowRateLimiter::new(
        1000,
        Duration::from_secs(60),
        1000,
    ));

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..10)
        .map(|i| {
            let limiter = limiter.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    limiter.allow(&format!("thread-{i}-req-{j}"));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let state = limiter.requests.lock();
    assert!(state.0.len() <= 1000, "should respect max_keys");
}
4324
/// Ten threads each record 100 distinct keys; every thread must finish without
/// panicking and the stored-key count must stay within `max_keys`.
#[test]
fn idempotency_store_concurrent_access_safe() {
    let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..10)
        .map(|i| {
            let store = store.clone();
            std::thread::spawn(move || {
                for j in 0..100 {
                    store.record_if_new(&format!("thread-{i}-key-{j}"));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let recorded = store.keys.lock();
    assert!(recorded.len() <= 1000, "should respect max_keys");
}
4348
/// A burst of five fills the budget, the sixth is refused, and after the
/// window elapses the key is allowed again.
#[test]
fn rate_limiter_rapid_burst_then_cooldown() {
    let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);

    // All five burst requests must be allowed.
    assert!((0..5).all(|_| limiter.allow("burst-ip")));
    assert!(!limiter.allow("burst-ip"));

    // Sleep past the 50 ms window.
    std::thread::sleep(Duration::from_millis(60));

    assert!(limiter.allow("burst-ip"));
}
4365
/// 127.0.0.1 passes the localhost check.
#[test]
fn require_localhost_accepts_ipv4_loopback() {
    let peer = SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 12345));
    let outcome = require_localhost(&peer);
    assert!(outcome.is_ok());
}
4371
/// `::1` passes the localhost check.
#[test]
fn require_localhost_accepts_ipv6_loopback() {
    let loopback = std::net::Ipv6Addr::LOCALHOST;
    let peer = SocketAddr::from((loopback, 12345));
    let outcome = require_localhost(&peer);
    assert!(outcome.is_ok());
}
4377
/// A private-network IPv4 peer is refused with 403.
#[test]
fn require_localhost_rejects_non_loopback_ipv4() {
    let lan_peer = SocketAddr::from(([192, 168, 1, 100], 12345));
    let status = require_localhost(&lan_peer).unwrap_err().0;
    assert_eq!(status, StatusCode::FORBIDDEN);
}
4384
/// A non-loopback IPv6 peer (`2001:db8::1`) is refused with 403.
#[test]
fn require_localhost_rejects_non_loopback_ipv6() {
    let remote_ip = std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1);
    let remote_peer = SocketAddr::from((remote_ip, 12345));
    let status = require_localhost(&remote_peer).unwrap_err().0;
    assert_eq!(status, StatusCode::FORBIDDEN);
}
4394}