1pub mod api;
11pub mod api_agents;
12pub mod api_artifact_body;
13pub mod api_clawhub;
14pub mod api_kumiho_proxy;
15pub mod api_mcp;
16pub mod api_memory_graph;
17pub mod api_pairing;
18#[cfg(feature = "plugins-wasm")]
19pub mod api_plugins;
20pub mod api_skills;
21pub mod api_teams;
22#[cfg(feature = "webauthn")]
23pub mod api_webauthn;
24pub mod api_workflows;
25pub mod approval_registry;
26pub mod auth_rate_limit;
27pub mod canvas;
28pub mod kumiho_client;
29pub mod mcp_discovery;
30pub mod nodes;
31pub mod session_queue;
32pub mod sse;
33pub mod static_files;
34#[cfg(not(target_os = "android"))]
38pub mod terminal;
39pub mod tls;
40pub mod ws;
41pub mod ws_mcp_events;
42
43use crate::channels::{
44 Channel, GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel,
45 WhatsAppChannel, session_backend::SessionBackend, session_sqlite::SqliteSessionBackend,
46};
47use crate::config::Config;
48use crate::cost::CostTracker;
49use crate::memory::{self, Memory, MemoryCategory};
50use crate::providers::{self, ChatMessage, Provider};
51use crate::runtime;
52use crate::security::SecurityPolicy;
53use crate::security::pairing::{PairingGuard, constant_time_eq, is_public_bind};
54use crate::tools;
55use crate::tools::canvas::CanvasStore;
56use crate::tools::traits::ToolSpec;
57use crate::util::truncate_with_ellipsis;
58use anyhow::{Context, Result};
59use axum::{
60 Router,
61 body::Bytes,
62 extract::{ConnectInfo, Query, State},
63 http::{HeaderMap, StatusCode, header},
64 response::{IntoResponse, Json},
65 routing::{delete, get, post, put},
66};
67use parking_lot::Mutex;
68use std::collections::HashMap;
69use std::net::{IpAddr, SocketAddr};
70use std::sync::Arc;
71use std::time::{Duration, Instant};
72use tower_http::limit::RequestBodyLimitLayer;
73use tower_http::timeout::TimeoutLayer;
74use uuid::Uuid;
75
/// Upper bound on request body size, in bytes (64 KiB).
pub const MAX_BODY_SIZE: usize = 65_536;
/// Default request timeout, in seconds, used when no valid override is set.
pub const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Returns the gateway request timeout in seconds.
///
/// Reads `CONSTRUCT_GATEWAY_TIMEOUT_SECS` from the environment on every call.
/// Surrounding whitespace is ignored, so values such as `"120\n"` coming from
/// env files are honoured. If the variable is unset, not valid Unicode, or not
/// an unsigned integer, [`REQUEST_TIMEOUT_SECS`] is returned instead.
pub fn gateway_request_timeout_secs() -> u64 {
    std::env::var("CONSTRUCT_GATEWAY_TIMEOUT_SECS")
        .ok()
        // Trim first: `u64::from_str` rejects any leading/trailing whitespace.
        .and_then(|raw| raw.trim().parse().ok())
        .unwrap_or(REQUEST_TIMEOUT_SECS)
}

/// Length, in seconds, of the sliding window used by the gateway rate limiters.
pub const RATE_LIMIT_WINDOW_SECS: u64 = 60;
/// Fallback cap on distinct rate-limiter keys when the configured value is `0`.
pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000;
/// Fallback cap on tracked idempotency keys when the configured value is `0`.
pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000;
99
100fn webhook_memory_key() -> String {
101 format!("webhook_msg_{}", Uuid::new_v4())
102}
103
104fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
105 format!("whatsapp_{}_{}", msg.sender, msg.id)
106}
107
108fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
109 format!("linq_{}_{}", msg.sender, msg.id)
110}
111
112fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
113 format!("wati_{}_{}", msg.sender, msg.id)
114}
115
116fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
117 format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
118}
119
120fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
121 match &msg.thread_ts {
122 Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
123 None => format!("{channel}_{}", msg.sender),
124 }
125}
126
127fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
128 headers
129 .get("X-Session-Id")
130 .and_then(|v| v.to_str().ok())
131 .map(str::trim)
132 .filter(|value| !value.is_empty())
133 .map(str::to_owned)
134}
135
136fn hash_webhook_secret(value: &str) -> String {
137 use sha2::{Digest, Sha256};
138
139 let digest = Sha256::digest(value.as_bytes());
140 hex::encode(digest)
141}
142
143const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; #[derive(Debug)]
147struct SlidingWindowRateLimiter {
148 limit_per_window: u32,
149 window: Duration,
150 max_keys: usize,
151 requests: Mutex<(HashMap<String, Vec<Instant>>, Instant)>,
152}
153
154impl SlidingWindowRateLimiter {
155 fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self {
156 Self {
157 limit_per_window,
158 window,
159 max_keys: max_keys.max(1),
160 requests: Mutex::new((HashMap::new(), Instant::now())),
161 }
162 }
163
164 fn prune_stale(requests: &mut HashMap<String, Vec<Instant>>, cutoff: Instant) {
165 requests.retain(|_, timestamps| {
166 timestamps.retain(|t| *t > cutoff);
167 !timestamps.is_empty()
168 });
169 }
170
171 fn allow(&self, key: &str) -> bool {
172 if self.limit_per_window == 0 {
173 return true;
174 }
175
176 let now = Instant::now();
177 let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now);
178
179 let mut guard = self.requests.lock();
180 let (requests, last_sweep) = &mut *guard;
181
182 if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) {
184 Self::prune_stale(requests, cutoff);
185 *last_sweep = now;
186 }
187
188 if !requests.contains_key(key) && requests.len() >= self.max_keys {
189 Self::prune_stale(requests, cutoff);
191 *last_sweep = now;
192
193 if requests.len() >= self.max_keys {
194 let evict_key = requests
195 .iter()
196 .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff))
197 .map(|(k, _)| k.clone());
198 if let Some(evict_key) = evict_key {
199 requests.remove(&evict_key);
200 }
201 }
202 }
203
204 let entry = requests.entry(key.to_owned()).or_default();
205 entry.retain(|instant| *instant > cutoff);
206
207 if entry.len() >= self.limit_per_window as usize {
208 return false;
209 }
210
211 entry.push(now);
212 true
213 }
214}
215
216#[derive(Debug)]
217pub struct GatewayRateLimiter {
218 pair: SlidingWindowRateLimiter,
219 webhook: SlidingWindowRateLimiter,
220}
221
222impl GatewayRateLimiter {
223 fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self {
224 let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);
225 Self {
226 pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys),
227 webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys),
228 }
229 }
230
231 fn allow_pair(&self, key: &str) -> bool {
232 self.pair.allow(key)
233 }
234
235 fn allow_webhook(&self, key: &str) -> bool {
236 self.webhook.allow(key)
237 }
238}
239
240#[derive(Debug)]
241pub struct IdempotencyStore {
242 ttl: Duration,
243 max_keys: usize,
244 keys: Mutex<HashMap<String, Instant>>,
245}
246
247impl IdempotencyStore {
248 fn new(ttl: Duration, max_keys: usize) -> Self {
249 Self {
250 ttl,
251 max_keys: max_keys.max(1),
252 keys: Mutex::new(HashMap::new()),
253 }
254 }
255
256 fn record_if_new(&self, key: &str) -> bool {
258 let now = Instant::now();
259 let mut keys = self.keys.lock();
260
261 keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl);
262
263 if keys.contains_key(key) {
264 return false;
265 }
266
267 if keys.len() >= self.max_keys {
268 let evict_key = keys
269 .iter()
270 .min_by_key(|(_, seen_at)| *seen_at)
271 .map(|(k, _)| k.clone());
272 if let Some(evict_key) = evict_key {
273 keys.remove(&evict_key);
274 }
275 }
276
277 keys.insert(key.to_owned(), now);
278 true
279 }
280}
281
/// Parses a client IP address out of a single header token.
///
/// Surrounding whitespace and double quotes are stripped first. The remainder
/// is then tried, in order, as a bare IP address, as a socket address
/// (`ip:port` or `[ipv6]:port`, keeping only the IP), and finally as an IP
/// address wrapped in square brackets. Returns `None` if nothing matches.
fn parse_client_ip(value: &str) -> Option<IpAddr> {
    let candidate = value.trim().trim_matches('"').trim();
    if candidate.is_empty() {
        return None;
    }

    candidate
        .parse::<IpAddr>()
        .ok()
        .or_else(|| {
            candidate
                .parse::<SocketAddr>()
                .ok()
                .map(|socket| socket.ip())
        })
        .or_else(|| {
            candidate
                .trim_matches(|c: char| c == '[' || c == ']')
                .parse::<IpAddr>()
                .ok()
        })
}
299
300fn forwarded_client_ip(headers: &HeaderMap) -> Option<IpAddr> {
301 if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) {
306 for candidate in xff.rsplit(',') {
307 if let Some(ip) = parse_client_ip(candidate) {
308 return Some(ip);
309 }
310 }
311 }
312
313 headers
314 .get("X-Real-IP")
315 .and_then(|v| v.to_str().ok())
316 .and_then(parse_client_ip)
317}
318
319pub(super) fn client_key_from_request(
320 peer_addr: Option<SocketAddr>,
321 headers: &HeaderMap,
322 trust_forwarded_headers: bool,
323) -> String {
324 if trust_forwarded_headers {
325 if let Some(ip) = forwarded_client_ip(headers) {
326 return ip.to_string();
327 }
328 }
329
330 peer_addr
331 .map(|addr| addr.ip().to_string())
332 .unwrap_or_else(|| "unknown".to_string())
333}
334
/// Resolves a configured key cap, treating `0` as "use the fallback".
///
/// A non-zero `configured` value is returned unchanged. Otherwise `fallback`
/// is returned, raised to 1 if it is itself zero.
fn normalize_max_keys(configured: usize, fallback: usize) -> usize {
    match configured {
        0 => fallback.max(1),
        explicit => explicit,
    }
}
342
/// Shared gateway state, built once in `run_gateway`.
///
/// The struct is `Clone`; most fields are `Arc`-wrapped handles or plain
/// configuration values.
#[derive(Clone)]
pub struct AppState {
    /// Lockable copy of the configuration the gateway was started with.
    pub config: Arc<Mutex<Config>>,
    /// LLM provider created from the configured default provider.
    pub provider: Arc<dyn Provider>,
    /// Default model name (falls back to `anthropic/claude-sonnet-4`).
    pub model: String,
    /// Default sampling temperature taken from the configuration.
    pub temperature: f64,
    /// Memory backend created from the memory/storage configuration.
    pub mem: Arc<dyn Memory>,
    /// Mirrors `config.memory.auto_save`.
    pub auto_save: bool,
    /// Hex SHA-256 of the trimmed webhook secret; `None` when no non-empty
    /// secret is configured.
    pub webhook_secret_hash: Option<Arc<str>>,
    /// Pairing guard built from `gateway.require_pairing` and the paired tokens.
    pub pairing: Arc<PairingGuard>,
    /// Mirrors `config.gateway.trust_forwarded_headers`.
    pub trust_forwarded_headers: bool,
    /// Sliding-window rate limiters for pairing and webhook requests.
    pub rate_limiter: Arc<GatewayRateLimiter>,
    /// Rate limiter for authentication attempts.
    pub auth_limiter: Arc<auth_rate_limit::AuthRateLimiter>,
    /// Store of recently seen idempotency keys.
    pub idempotency_store: Arc<IdempotencyStore>,
    /// WhatsApp channel; set only when the WhatsApp config is a cloud config.
    pub whatsapp: Option<Arc<WhatsAppChannel>>,
    /// WhatsApp app secret: `CONSTRUCT_WHATSAPP_APP_SECRET` if non-blank,
    /// otherwise the non-blank value from the config.
    pub whatsapp_app_secret: Option<Arc<str>>,
    /// Linq channel, when configured.
    pub linq: Option<Arc<LinqChannel>>,
    /// Linq signing secret: `CONSTRUCT_LINQ_SIGNING_SECRET` if non-blank,
    /// otherwise the non-blank value from the config.
    pub linq_signing_secret: Option<Arc<str>>,
    /// Nextcloud Talk channel, when configured.
    pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
    /// Nextcloud Talk webhook secret: `CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET`
    /// if non-blank, otherwise the non-blank value from the config.
    pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
    /// WATI channel, when configured.
    pub wati: Option<Arc<WatiChannel>>,
    /// Gmail push channel; set only when its config is present and enabled.
    pub gmail_push: Option<Arc<GmailPushChannel>>,
    /// Observer that wraps the configured observer together with `event_tx`.
    pub observer: Arc<dyn crate::observability::Observer>,
    /// Specs of every tool registered at startup (including MCP tools).
    pub tools_registry: Arc<Vec<ToolSpec>>,
    /// Process-wide cost tracker, if one is available.
    pub cost_tracker: Option<Arc<CostTracker>>,
    /// Audit logger; `None` when auditing is disabled or initialisation failed.
    pub audit_logger: Option<Arc<crate::security::audit::AuditLogger>>,
    /// Sender side of the JSON event broadcast channel (capacity 4096).
    pub event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// Sender side of the shutdown watch channel (initial value `false`).
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Registry of connected nodes, sized by `config.nodes.max_nodes`.
    pub node_registry: Arc<nodes::NodeRegistry>,
    /// Configured gateway path prefix; empty string when none is set.
    pub path_prefix: String,
    /// SQLite session backend; `None` when session persistence is disabled or
    /// the backend failed to initialise.
    pub session_backend: Option<Arc<dyn SessionBackend>>,
    /// Per-session actor queue.
    pub session_queue: Arc<session_queue::SessionActorQueue>,
    /// Device registry; present only when pairing is required.
    pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
    /// Pending pairing codes; present only when pairing is required.
    pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
    /// Handle to the global canvas store.
    pub canvas_store: CanvasStore,
    /// MCP registry; `None` when MCP is disabled, has no servers, or failed to
    /// connect.
    pub mcp_registry: Option<Arc<tools::McpRegistry>>,
    /// WebAuthn state; `None` when WebAuthn is disabled in the config.
    #[cfg(feature = "webauthn")]
    pub webauthn: Option<Arc<api_webauthn::WebAuthnState>>,
    /// Handle to the global approval registry.
    pub approval_registry: Arc<approval_registry::ApprovalRegistry>,
    /// Starts as `None` in `run_gateway`.
    /// NOTE(review): presumably filled in once the local MCP server URL is
    /// known — confirm where it is assigned.
    pub mcp_local_url: Option<Arc<str>>,
}
410
411#[allow(clippy::too_many_lines)]
413pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
414 if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind
416 {
417 tracing::warn!(
418 "⚠️ Binding to {host} — gateway will be exposed to all network interfaces.\n\
419 Suggestion: use --host 127.0.0.1 (default), configure a tunnel, or set\n\
420 [gateway] allow_public_bind = true in config.toml to silence this warning.\n\n\
421 Docker/VM: if you are running inside a container or VM, this is expected."
422 );
423 }
424 let config_state = Arc::new(Mutex::new(config.clone()));
425
426 let hooks: Option<std::sync::Arc<crate::hooks::HookRunner>> = if config.hooks.enabled {
428 Some(std::sync::Arc::new(crate::hooks::HookRunner::new()))
429 } else {
430 None
431 };
432
433 let addr: SocketAddr = format!("{host}:{port}").parse()?;
434 let listener = tokio::net::TcpListener::bind(addr).await?;
435 let actual_port = listener.local_addr()?.port();
436 let display_addr = format!("{host}:{actual_port}");
437
438 let provider: Arc<dyn Provider> = Arc::from(providers::create_resilient_provider_with_options(
439 config.default_provider.as_deref().unwrap_or("openrouter"),
440 config.api_key.as_deref(),
441 config.api_url.as_deref(),
442 &config.reliability,
443 &providers::ProviderRuntimeOptions {
444 auth_profile_override: None,
445 provider_api_url: config.api_url.clone(),
446 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
447 secrets_encrypt: config.secrets.encrypt,
448 reasoning_enabled: config.runtime.reasoning_enabled,
449 reasoning_effort: config.runtime.reasoning_effort.clone(),
450 provider_timeout_secs: Some(config.provider_timeout_secs),
451 extra_headers: config.extra_headers.clone(),
452 api_path: config.api_path.clone(),
453 provider_max_tokens: config.provider_max_tokens,
454 },
455 )?);
456 let model = config
457 .default_model
458 .clone()
459 .unwrap_or_else(|| "anthropic/claude-sonnet-4".into());
460 let temperature = config.default_temperature;
461 let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage_and_routes(
462 &config.memory,
463 &config.embedding_routes,
464 Some(&config.storage.provider.config),
465 &config.workspace_dir,
466 config.api_key.as_deref(),
467 )?);
468 let runtime: Arc<dyn runtime::RuntimeAdapter> =
469 Arc::from(runtime::create_runtime(&config.runtime)?);
470 let security = Arc::new(SecurityPolicy::from_config(
471 &config.autonomy,
472 &config.workspace_dir,
473 ));
474
475 let (composio_key, composio_entity_id) = if config.composio.enabled {
476 (
477 config.composio.api_key.as_deref(),
478 Some(config.composio.entity_id.as_str()),
479 )
480 } else {
481 (None, None)
482 };
483
484 let canvas_store = tools::canvas::global_store();
485
486 let (
487 mut tools_registry_raw,
488 delegate_handle_gw,
489 _reaction_handle_gw,
490 _channel_map_handle,
491 _ask_user_handle_gw,
492 _escalate_handle_gw,
493 ) = tools::all_tools_with_runtime(
494 Arc::new(config.clone()),
495 &security,
496 runtime,
497 Arc::clone(&mem),
498 composio_key,
499 composio_entity_id,
500 &config.browser,
501 &config.http_request,
502 &config.web_fetch,
503 &config.workspace_dir,
504 &config.agents,
505 config.api_key.as_deref(),
506 &config,
507 Some(canvas_store.clone()),
508 );
509
510 let gateway_mcp_config = {
515 let mut c = config.clone();
516 c = crate::agent::kumiho::inject_kumiho(c, false);
517 c = crate::agent::operator::inject_operator(c, false);
518 c.mcp
519 };
520 let mut mcp_registry_shared: Option<Arc<tools::McpRegistry>> = None;
521 if gateway_mcp_config.enabled && !gateway_mcp_config.servers.is_empty() {
522 tracing::info!(
523 "Gateway: initializing MCP client — {} server(s) configured",
524 gateway_mcp_config.servers.len()
525 );
526 match tools::McpRegistry::connect_all(&gateway_mcp_config.servers).await {
527 Ok(registry) => {
528 let registry = std::sync::Arc::new(registry);
529 mcp_registry_shared = Some(std::sync::Arc::clone(®istry));
530 if gateway_mcp_config.deferred_loading {
531 let operator_prefix =
532 format!("{}__", crate::agent::operator::OPERATOR_SERVER_NAME);
533 let kumiho_prefix = format!("{}__", crate::agent::kumiho::KUMIHO_SERVER_NAME);
534 let all_names = registry.tool_names();
535 let mut eager_count = 0usize;
536
537 for name in &all_names {
538 if name.starts_with(&operator_prefix) || name.starts_with(&kumiho_prefix) {
539 if let Some(def) = registry.get_tool_def(name).await {
540 let wrapper: std::sync::Arc<dyn tools::Tool> =
541 std::sync::Arc::new(tools::McpToolWrapper::new(
542 name.clone(),
543 def,
544 std::sync::Arc::clone(®istry),
545 ));
546 if let Some(ref handle) = delegate_handle_gw {
547 handle.write().push(std::sync::Arc::clone(&wrapper));
548 }
549 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
550 eager_count += 1;
551 }
552 }
553 }
554
555 let deferred_set = tools::DeferredMcpToolSet::from_registry_filtered(
556 std::sync::Arc::clone(®istry),
557 |name| {
558 !name.starts_with(&operator_prefix) && !name.starts_with(&kumiho_prefix)
559 },
560 )
561 .await;
562 tracing::info!(
563 "Gateway MCP hybrid: {} eager operator+kumiho tool(s), {} deferred stub(s) from {} server(s)",
564 eager_count,
565 deferred_set.len(),
566 registry.server_count()
567 );
568 let activated =
569 std::sync::Arc::new(std::sync::Mutex::new(tools::ActivatedToolSet::new()));
570 tools_registry_raw.push(Box::new(tools::ToolSearchTool::new(
571 deferred_set,
572 activated,
573 )));
574 } else {
575 let names = registry.tool_names();
576 let mut registered = 0usize;
577 for name in names {
578 if let Some(def) = registry.get_tool_def(&name).await {
579 let wrapper: std::sync::Arc<dyn tools::Tool> =
580 std::sync::Arc::new(tools::McpToolWrapper::new(
581 name,
582 def,
583 std::sync::Arc::clone(®istry),
584 ));
585 if let Some(ref handle) = delegate_handle_gw {
586 handle.write().push(std::sync::Arc::clone(&wrapper));
587 }
588 tools_registry_raw.push(Box::new(tools::ArcToolRef(wrapper)));
589 registered += 1;
590 }
591 }
592 tracing::info!(
593 "Gateway MCP: {} tool(s) registered from {} server(s)",
594 registered,
595 registry.server_count()
596 );
597 }
598 }
599 Err(e) => {
600 tracing::error!("Gateway MCP registry failed to initialize: {e:#}");
601 }
602 }
603 }
604
605 let tools_registry: Arc<Vec<ToolSpec>> =
606 Arc::new(tools_registry_raw.iter().map(|t| t.spec()).collect());
607
608 let cost_tracker = CostTracker::get_or_init_global(config.cost.clone(), &config.workspace_dir);
610
611 let audit_logger = if config.security.audit.enabled {
613 match crate::security::audit::AuditLogger::new(
614 config.security.audit.clone(),
615 std::path::PathBuf::from(&config.workspace_dir),
616 ) {
617 Ok(logger) => Some(Arc::new(logger)),
618 Err(e) => {
619 tracing::warn!("Audit logger init failed (dashboard audit viewer disabled): {e}");
620 None
621 }
622 }
623 } else {
624 None
625 };
626
627 let (event_tx, _event_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4096);
629 let webhook_secret_hash: Option<Arc<str>> =
631 config.channels_config.webhook.as_ref().and_then(|webhook| {
632 webhook.secret.as_ref().and_then(|raw_secret| {
633 let trimmed_secret = raw_secret.trim();
634 (!trimmed_secret.is_empty())
635 .then(|| Arc::<str>::from(hash_webhook_secret(trimmed_secret)))
636 })
637 });
638
639 let whatsapp_channel: Option<Arc<WhatsAppChannel>> = config
641 .channels_config
642 .whatsapp
643 .as_ref()
644 .filter(|wa| wa.is_cloud_config())
645 .map(|wa| {
646 Arc::new(WhatsAppChannel::new(
647 wa.access_token.clone().unwrap_or_default(),
648 wa.phone_number_id.clone().unwrap_or_default(),
649 wa.verify_token.clone().unwrap_or_default(),
650 wa.allowed_numbers.clone(),
651 ))
652 });
653
654 let whatsapp_app_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_WHATSAPP_APP_SECRET")
657 .ok()
658 .and_then(|secret| {
659 let secret = secret.trim();
660 (!secret.is_empty()).then(|| secret.to_owned())
661 })
662 .or_else(|| {
663 config.channels_config.whatsapp.as_ref().and_then(|wa| {
664 wa.app_secret
665 .as_deref()
666 .map(str::trim)
667 .filter(|secret| !secret.is_empty())
668 .map(ToOwned::to_owned)
669 })
670 })
671 .map(Arc::from);
672
673 let linq_channel: Option<Arc<LinqChannel>> = config.channels_config.linq.as_ref().map(|lq| {
675 Arc::new(LinqChannel::new(
676 lq.api_token.clone(),
677 lq.from_phone.clone(),
678 lq.allowed_senders.clone(),
679 ))
680 });
681
682 let linq_signing_secret: Option<Arc<str>> = std::env::var("CONSTRUCT_LINQ_SIGNING_SECRET")
685 .ok()
686 .and_then(|secret| {
687 let secret = secret.trim();
688 (!secret.is_empty()).then(|| secret.to_owned())
689 })
690 .or_else(|| {
691 config.channels_config.linq.as_ref().and_then(|lq| {
692 lq.signing_secret
693 .as_deref()
694 .map(str::trim)
695 .filter(|secret| !secret.is_empty())
696 .map(ToOwned::to_owned)
697 })
698 })
699 .map(Arc::from);
700
701 let wati_channel: Option<Arc<WatiChannel>> =
703 config.channels_config.wati.as_ref().map(|wati_cfg| {
704 Arc::new(
705 WatiChannel::new(
706 wati_cfg.api_token.clone(),
707 wati_cfg.api_url.clone(),
708 wati_cfg.tenant_id.clone(),
709 wati_cfg.allowed_numbers.clone(),
710 )
711 .with_transcription(config.transcription.clone()),
712 )
713 });
714
715 let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
717 config.channels_config.nextcloud_talk.as_ref().map(|nc| {
718 Arc::new(NextcloudTalkChannel::new(
719 nc.base_url.clone(),
720 nc.app_token.clone(),
721 nc.bot_name.clone().unwrap_or_default(),
722 nc.allowed_users.clone(),
723 ))
724 });
725
726 let nextcloud_talk_webhook_secret: Option<Arc<str>> =
729 std::env::var("CONSTRUCT_NEXTCLOUD_TALK_WEBHOOK_SECRET")
730 .ok()
731 .and_then(|secret| {
732 let secret = secret.trim();
733 (!secret.is_empty()).then(|| secret.to_owned())
734 })
735 .or_else(|| {
736 config
737 .channels_config
738 .nextcloud_talk
739 .as_ref()
740 .and_then(|nc| {
741 nc.webhook_secret
742 .as_deref()
743 .map(str::trim)
744 .filter(|secret| !secret.is_empty())
745 .map(ToOwned::to_owned)
746 })
747 })
748 .map(Arc::from);
749
750 let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
752 .channels_config
753 .gmail_push
754 .as_ref()
755 .filter(|gp| gp.enabled)
756 .map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
757
758 let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
760 match SqliteSessionBackend::new(&config.workspace_dir) {
761 Ok(b) => {
762 tracing::info!("Gateway session persistence enabled (SQLite)");
763 if config.gateway.session_ttl_hours > 0 {
764 if let Ok(cleaned) = b.cleanup_stale(config.gateway.session_ttl_hours) {
765 if cleaned > 0 {
766 tracing::info!("Cleaned up {cleaned} stale gateway sessions");
767 }
768 }
769 }
770 Some(Arc::new(b))
771 }
772 Err(e) => {
773 tracing::warn!("Session persistence disabled: {e}");
774 None
775 }
776 }
777 } else {
778 None
779 };
780
781 let pairing = Arc::new(PairingGuard::new(
783 config.gateway.require_pairing,
784 &config.gateway.paired_tokens,
785 ));
786 let rate_limit_max_keys = normalize_max_keys(
787 config.gateway.rate_limit_max_keys,
788 RATE_LIMIT_MAX_KEYS_DEFAULT,
789 );
790 let rate_limiter = Arc::new(GatewayRateLimiter::new(
791 config.gateway.pair_rate_limit_per_minute,
792 config.gateway.webhook_rate_limit_per_minute,
793 rate_limit_max_keys,
794 ));
795 let idempotency_max_keys = normalize_max_keys(
796 config.gateway.idempotency_max_keys,
797 IDEMPOTENCY_MAX_KEYS_DEFAULT,
798 );
799 let idempotency_store = Arc::new(IdempotencyStore::new(
800 Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)),
801 idempotency_max_keys,
802 ));
803
804 let path_prefix: Option<&str> = config
806 .gateway
807 .path_prefix
808 .as_deref()
809 .filter(|p| !p.is_empty());
810
811 let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?;
813 let mut tunnel_url: Option<String> = None;
814
815 if let Some(ref tun) = tunnel {
816 println!("🔗 Starting {} tunnel...", tun.name());
817 match tun.start(host, actual_port).await {
818 Ok(url) => {
819 println!("🌐 Tunnel active: {url}");
820 tunnel_url = Some(url);
821 }
822 Err(e) => {
823 println!("⚠️ Tunnel failed to start: {e}");
824 println!(" Falling back to local-only mode.");
825 }
826 }
827 }
828
829 let pfx = path_prefix.unwrap_or("");
830 println!("🦀 Construct Gateway listening on http://{display_addr}{pfx}");
831 if let Some(ref url) = tunnel_url {
832 println!(" 🌐 Public URL: {url}");
833 }
834 println!(" 🌐 Web Dashboard: http://{display_addr}{pfx}/");
835 if let Some(code) = pairing.pairing_code() {
836 println!();
837 println!(" 🔐 PAIRING REQUIRED — use this one-time code:");
838 println!(" ┌──────────────┐");
839 println!(" │ {code} │");
840 println!(" └──────────────┘");
841 println!(" Send: POST {pfx}/pair with header X-Pairing-Code: {code}");
842 } else if pairing.require_pairing() {
843 println!(" 🔒 Pairing: ACTIVE (bearer token required)");
844 println!(" To pair a new device: construct gateway get-paircode --new");
845 println!();
846 } else {
847 println!(" ⚠️ Pairing: DISABLED (all requests accepted)");
848 println!();
849 }
850 println!(" POST {pfx}/pair — pair a new client (X-Pairing-Code header)");
851 println!(" POST {pfx}/webhook — {{\"message\": \"your prompt\"}}");
852 if whatsapp_channel.is_some() {
853 println!(" GET {pfx}/whatsapp — Meta webhook verification");
854 println!(" POST {pfx}/whatsapp — WhatsApp message webhook");
855 }
856 if linq_channel.is_some() {
857 println!(" POST {pfx}/linq — Linq message webhook (iMessage/RCS/SMS)");
858 }
859 if wati_channel.is_some() {
860 println!(" GET {pfx}/wati — WATI webhook verification");
861 println!(" POST {pfx}/wati — WATI message webhook");
862 }
863 if nextcloud_talk_channel.is_some() {
864 println!(" POST {pfx}/nextcloud-talk — Nextcloud Talk bot webhook");
865 }
866 println!(" GET {pfx}/api/* — REST API (bearer token required)");
867 println!(" GET {pfx}/ws/chat — WebSocket agent chat");
868 if config.nodes.enabled {
869 println!(" GET {pfx}/ws/nodes — WebSocket node discovery");
870 }
871 println!(" GET {pfx}/health — health check");
872 println!(" GET {pfx}/metrics — Prometheus metrics");
873 println!(" Press Ctrl+C to stop.\n");
874
875 crate::health::mark_component_ok("gateway");
876
877 if let Some(ref hooks) = hooks {
879 hooks.fire_gateway_start(host, actual_port).await;
880 }
881
882 let broadcast_observer: Arc<dyn crate::observability::Observer> =
884 Arc::new(sse::BroadcastObserver::new(
885 crate::observability::create_observer(&config.observability),
886 event_tx.clone(),
887 ));
888
889 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
890
891 let node_registry = Arc::new(nodes::NodeRegistry::new(config.nodes.max_nodes));
893
894 let device_registry = if config.gateway.require_pairing {
896 Some(Arc::new(api_pairing::DeviceRegistry::new(
897 &config.workspace_dir,
898 )?))
899 } else {
900 None
901 };
902 let pending_pairings = if config.gateway.require_pairing {
903 Some(Arc::new(api_pairing::PairingStore::new(
904 config.gateway.pairing_dashboard.max_pending_codes,
905 )))
906 } else {
907 None
908 };
909
910 let mcp_workspace_dir = config.workspace_dir.clone();
924 let mcp_config_snapshot = config.clone();
925 let mcp_runtime_handles = {
926 let mut rh = crate::mcp_server::RuntimeHandles::empty();
927
928 if config.workspace.enabled {
930 let workspaces_dir = if config.workspace.workspaces_dir.starts_with("~/") {
931 let home = directories::UserDirs::new()
932 .map(|u| u.home_dir().to_path_buf())
933 .unwrap_or_else(|| std::path::PathBuf::from("."));
934 home.join(&config.workspace.workspaces_dir[2..])
935 } else {
936 std::path::PathBuf::from(&config.workspace.workspaces_dir)
937 };
938 let manager = crate::config::workspace::WorkspaceManager::new(workspaces_dir);
939 rh.workspace_manager = Some(Arc::new(tokio::sync::RwLock::new(manager)));
940 }
941
942 if let Ok(store) = crate::channels::session_store::SessionStore::new(&config.workspace_dir)
944 {
945 let backend: Arc<dyn crate::channels::session_backend::SessionBackend> =
946 Arc::new(store);
947 rh.session_store = Some(backend);
948 }
949
950 if !config.agents.is_empty() {
955 rh.agent_config = Some(Arc::new(config.agents.clone()));
956 rh.fallback_api_key = config.api_key.as_deref().map(Arc::<str>::from);
957 rh.provider_runtime_options =
958 Some(Arc::new(crate::providers::ProviderRuntimeOptions {
959 auth_profile_override: None,
960 provider_api_url: config.api_url.clone(),
961 construct_dir: config.config_path.parent().map(std::path::PathBuf::from),
962 secrets_encrypt: config.secrets.encrypt,
963 reasoning_enabled: config.runtime.reasoning_enabled,
964 reasoning_effort: config.runtime.reasoning_effort.clone(),
965 provider_timeout_secs: Some(config.provider_timeout_secs),
966 extra_headers: config.extra_headers.clone(),
967 api_path: config.api_path.clone(),
968 provider_max_tokens: config.provider_max_tokens,
969 }));
970 }
971
972 let (mcp_tools_boxed, _d_h, _r_h, _c_h, _a_h, _e_h) = tools::all_tools_with_runtime(
979 Arc::new(config.clone()),
980 &security,
981 Arc::new(crate::runtime::NativeRuntime::new()),
982 Arc::clone(&mem),
983 composio_key,
984 composio_entity_id,
985 &config.browser,
986 &config.http_request,
987 &config.web_fetch,
988 &config.workspace_dir,
989 &config.agents,
990 config.api_key.as_deref(),
991 &config,
992 Some(canvas_store.clone()),
993 );
994 let mcp_tool_arcs: Vec<Arc<dyn tools::Tool>> = mcp_tools_boxed
996 .into_iter()
997 .map(|b| Arc::<dyn tools::Tool>::from(b))
998 .collect();
999 rh.pre_built_tools = Some(mcp_tool_arcs);
1000
1001 rh
1002 };
1003
1004 let mut state = AppState {
1005 config: config_state,
1006 provider,
1007 model,
1008 temperature,
1009 mem,
1010 auto_save: config.memory.auto_save,
1011 webhook_secret_hash,
1012 pairing,
1013 trust_forwarded_headers: config.gateway.trust_forwarded_headers,
1014 rate_limiter,
1015 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
1016 idempotency_store,
1017 whatsapp: whatsapp_channel,
1018 whatsapp_app_secret,
1019 linq: linq_channel,
1020 linq_signing_secret,
1021 nextcloud_talk: nextcloud_talk_channel,
1022 nextcloud_talk_webhook_secret,
1023 wati: wati_channel,
1024 gmail_push: gmail_push_channel,
1025 observer: broadcast_observer,
1026 tools_registry,
1027 cost_tracker,
1028 audit_logger,
1029 event_tx,
1030 shutdown_tx,
1031 node_registry,
1032 session_backend,
1033 session_queue: Arc::new(session_queue::SessionActorQueue::new(8, 30, 600)),
1034 device_registry,
1035 pending_pairings,
1036 path_prefix: path_prefix.unwrap_or("").to_string(),
1037 canvas_store,
1038 mcp_registry: mcp_registry_shared,
1039 approval_registry: approval_registry::global(),
1040 mcp_local_url: None,
1042 #[cfg(feature = "webauthn")]
1043 webauthn: if config.security.webauthn.enabled {
1044 let secret_store = Arc::new(crate::security::SecretStore::new(
1045 &config.workspace_dir,
1046 true,
1047 ));
1048 let wa_config = crate::security::webauthn::WebAuthnConfig {
1049 enabled: true,
1050 rp_id: config.security.webauthn.rp_id.clone(),
1051 rp_origin: config.security.webauthn.rp_origin.clone(),
1052 rp_name: config.security.webauthn.rp_name.clone(),
1053 };
1054 Some(Arc::new(api_webauthn::WebAuthnState {
1055 manager: crate::security::webauthn::WebAuthnManager::new(
1056 wa_config,
1057 secret_store,
1058 &config.workspace_dir,
1059 ),
1060 pending_registrations: parking_lot::Mutex::new(std::collections::HashMap::new()),
1061 pending_authentications: parking_lot::Mutex::new(std::collections::HashMap::new()),
1062 }))
1063 } else {
1064 None
1065 },
1066 };
1067
1068 {
1083 let effectiveness_cache = crate::skills::EffectivenessCache::new();
1084 let _ = crate::skills::effectiveness_cache::set_global(effectiveness_cache.clone());
1085
1086 let memory_project = config.kumiho.memory_project.clone();
1087 let workspace_dir = config.workspace_dir.clone();
1088 let config_for_skills = config.clone();
1089
1090 {
1101 let scan_workspace = workspace_dir.clone();
1102 let scan_project = memory_project.clone();
1103 let scan_client = crate::gateway::kumiho_client::build_client_from_config(&config);
1104 drop(tokio::spawn(async move {
1105 let skills_root = crate::skills::skills_dir(&scan_workspace);
1106 let mut entries = match tokio::fs::read_dir(&skills_root).await {
1107 Ok(e) => e,
1108 Err(_) => return,
1109 };
1110 while let Ok(Some(entry)) = entries.next_entry().await {
1111 let dir = entry.path();
1112 if !dir.is_dir() {
1113 continue;
1114 }
1115 if !dir.join("SKILL.toml").exists() {
1116 continue;
1117 }
1118 let registration_ok =
1119 match crate::skills::registration::register_skill_with_kumiho(
1120 &dir,
1121 &scan_client,
1122 &scan_project,
1123 )
1124 .await
1125 {
1126 Ok(crate::skills::registration::SkillRegistration::Registered {
1127 kref,
1128 ..
1129 }) => {
1130 tracing::info!(
1131 skill_dir = %dir.display(),
1132 kref,
1133 "daemon-startup: registered skill with Kumiho",
1134 );
1135 true
1136 }
1137 Ok(
1138 crate::skills::registration::SkillRegistration::AlreadyRegistered {
1139 ..
1140 },
1141 ) => {
1142 true
1144 }
1145 Err(e) => {
1146 tracing::warn!(
1147 skill_dir = %dir.display(),
1148 error = ?e,
1149 "daemon-startup: skill registration failed; will retry next start",
1150 );
1151 false
1152 }
1153 };
1154
1155 if !registration_ok {
1162 continue;
1163 }
1164 match crate::skills::registration::sync_published_content_path(
1165 &dir,
1166 &scan_client,
1167 )
1168 .await
1169 {
1170 Ok(crate::skills::registration::SkillContentSync::Updated {
1171 new_content_file,
1172 ..
1173 }) => {
1174 tracing::info!(
1175 skill_dir = %dir.display(),
1176 content_file = %new_content_file,
1177 "daemon-startup: synced skill content_file from published kref",
1178 );
1179 }
1180 Ok(_) => {
1181 }
1184 Err(e) => {
1185 tracing::warn!(
1186 skill_dir = %dir.display(),
1187 error = ?e,
1188 "daemon-startup: skill content_file sync failed; \
1189 loader will use existing pointer until next sync",
1190 );
1191 }
1192 }
1193 }
1194 }));
1195 }
1196
1197 let skill_names: Vec<String> =
1201 crate::skills::load_skills_with_config(&workspace_dir, &config_for_skills)
1202 .into_iter()
1203 .map(|s| s.name)
1204 .collect();
1205
1206 if skill_names.is_empty() {
1207 tracing::info!("skill effectiveness: no skills loaded; refresh task skipped");
1208 } else {
1209 let kumiho_client = Arc::new(crate::gateway::kumiho_client::build_client_from_config(
1210 &config,
1211 ));
1212 tracing::info!(
1213 count = skill_names.len(),
1214 project = %memory_project,
1215 "skill effectiveness: starting background refresh task",
1216 );
1217 drop(effectiveness_cache.clone().spawn_refresh_task(
1225 kumiho_client.clone(),
1226 memory_project,
1227 skill_names,
1228 crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1229 ));
1230
1231 #[cfg(feature = "skill-creation")]
1241 if config.skills.skill_improvement.enabled {
1242 let cache_for_improver = effectiveness_cache.clone();
1243 let auto_workspace = workspace_dir.clone();
1244 let auto_provider = state.provider.clone();
1245 let auto_model = state.model.clone();
1246 let auto_improver_config = config.skills.skill_improvement.clone();
1247 let auto_kumiho = kumiho_client.clone();
1248 let auto_project = config.kumiho.memory_project.clone();
1249
1250 tracing::info!(
1251 cooldown_secs = auto_improver_config.cooldown_secs,
1252 "skill auto-improve: starting LLM-driven rewrite loop",
1253 );
1254
1255 drop(tokio::spawn(async move {
1256 let ctx = crate::skills::auto_improve::AutoImproveContext {
1257 workspace_dir: auto_workspace.clone(),
1258 provider: auto_provider,
1259 model: auto_model,
1260 temperature: crate::skills::auto_improve::DEFAULT_REWRITE_TEMPERATURE,
1261 kumiho_client: auto_kumiho.clone(),
1262 memory_project: auto_project.clone(),
1263 };
1264 let mut improver = crate::skills::improver::SkillImprover::new(
1265 auto_workspace.clone(),
1266 auto_improver_config,
1267 );
1268
1269 let rollback_ctx = crate::skills::auto_rollback::AutoRollbackContext {
1274 workspace_dir: auto_workspace.clone(),
1275 kumiho_client: auto_kumiho,
1276 memory_project: auto_project,
1277 };
1278 let mut rollback_tracker =
1279 crate::skills::auto_rollback::SkillRollbackTracker::new(auto_workspace);
1280
1281 let mut ticker = tokio::time::interval(
1282 crate::skills::effectiveness_cache::DEFAULT_REFRESH_INTERVAL,
1283 );
1284 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1285 loop {
1291 ticker.tick().await;
1292 for cand in cache_for_improver.improvement_candidates() {
1293 match crate::skills::auto_improve::attempt_skill_improvement(
1294 &ctx,
1295 &cand,
1296 &mut improver,
1297 )
1298 .await
1299 {
1300 Ok(Some(outcome)) => tracing::info!(
1301 skill = %outcome.slug,
1302 revision_kref = %outcome.revision_kref,
1303 content_file = %outcome.content_file,
1304 rate = cand.rate,
1305 total = cand.total,
1306 "auto-improve: published new skill revision",
1307 ),
1308 Ok(None) => {
1309 }
1311 Err(e) => tracing::warn!(
1312 skill = %cand.skill_name,
1313 error = %e,
1314 "auto-improve: attempt failed",
1315 ),
1316 }
1317 }
1318
1319 for cand in cache_for_improver.regression_candidates() {
1327 match crate::skills::auto_rollback::attempt_skill_rollback(
1328 &rollback_ctx,
1329 &cand,
1330 &mut rollback_tracker,
1331 )
1332 .await
1333 {
1334 Ok(Some(outcome)) => tracing::warn!(
1335 skill = %outcome.slug,
1336 restored_revision_kref = %outcome.restored_revision_kref,
1337 demoted_revision_kref = %outcome.demoted_revision_kref,
1338 content_file = %outcome.content_file,
1339 current_rate = cand.current_rate,
1340 previous_rate = cand.previous_rate,
1341 "auto-rollback: reverted regressed skill revision",
1342 ),
1343 Ok(None) => {
1344 }
1346 Err(e) => tracing::warn!(
1347 skill = %cand.skill_name,
1348 error = %e,
1349 "auto-rollback: attempt failed",
1350 ),
1351 }
1352 }
1353 }
1354 }));
1355 }
1356 }
1357 }
1358
1359 let mcp_shutdown_watch = state.shutdown_tx.subscribe();
1366 let mcp_task: Option<tokio::task::JoinHandle<()>> = {
1367 let (mcp_state, mcp_skipped) = crate::mcp_server::server::state_from_runtime(
1368 &mcp_workspace_dir,
1369 &mcp_config_snapshot,
1370 &mcp_runtime_handles,
1371 );
1372 for (name, reason) in &mcp_skipped {
1373 tracing::info!("mcp-server: skipped tool `{name}` — {reason}");
1374 }
1375 tracing::info!(
1376 "mcp-server: advertising {} tools to external MCP clients",
1377 mcp_state.tools.len()
1378 );
1379
1380 match crate::mcp_server::serve_on(SocketAddr::from(([127, 0, 0, 1], 0)), mcp_state).await {
1381 Ok(handle) => {
1382 let url = format!("http://{}/mcp", handle.addr);
1383 if let Err(e) = crate::mcp_server::server::write_discovery_file(&url) {
1384 tracing::warn!("mcp-server: failed to write discovery file: {e}");
1385 } else {
1386 tracing::info!(
1387 "mcp-server: listening on {url} (discovery at ~/.construct/mcp.json)"
1388 );
1389 }
1390
1391 state.mcp_local_url = Some(Arc::from(format!("http://{}", handle.addr)));
1394
1395 let mut watch = mcp_shutdown_watch;
1396 Some(tokio::spawn(async move {
1397 loop {
1399 if *watch.borrow() {
1400 break;
1401 }
1402 if watch.changed().await.is_err() {
1403 break;
1404 }
1405 }
1406 let _ = handle.shutdown.send(());
1407 let _ = handle.joined.await;
1408 crate::mcp_server::server::cleanup_discovery_file();
1409 tracing::info!("mcp-server: stopped");
1410 }))
1411 }
1412 Err(e) => {
1413 tracing::error!("mcp-server: failed to bind: {e}");
1414 None
1415 }
1416 }
1417 };
1418
1419 let config_put_router = Router::new()
1421 .route("/api/config", put(api::handle_api_config_put))
1422 .layer(RequestBodyLimitLayer::new(1_048_576));
1423
1424 let memory_graph_router = Router::new()
1427 .route(
1428 "/api/memory/graph",
1429 get(api_memory_graph::handle_memory_graph),
1430 )
1431 .with_state(state.clone())
1432 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1433 .layer(TimeoutLayer::with_status_code(
1434 StatusCode::REQUEST_TIMEOUT,
1435 Duration::from_secs(60),
1436 ));
1437
1438 let inner = Router::new()
1440 .route("/admin/shutdown", post(handle_admin_shutdown))
1442 .route("/admin/paircode", get(handle_admin_paircode))
1443 .route("/admin/paircode/new", post(handle_admin_paircode_new))
1444 .route("/health", get(handle_health))
1446 .route("/metrics", get(handle_metrics))
1447 .route("/pair", post(handle_pair))
1448 .route("/pair/code", get(handle_pair_code))
1449 .route("/webhook", post(handle_webhook))
1450 .route("/whatsapp", get(handle_whatsapp_verify))
1451 .route("/whatsapp", post(handle_whatsapp_message))
1452 .route("/linq", post(handle_linq_webhook))
1453 .route("/wati", get(handle_wati_verify))
1454 .route("/wati", post(handle_wati_webhook))
1455 .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
1456 .route("/webhook/gmail", post(handle_gmail_push_webhook))
1457 .route("/hooks/claude-code", post(api::handle_claude_code_hook))
1459 .route("/api/status", get(api::handle_api_status))
1461 .route("/api/config", get(api::handle_api_config_get))
1462 .route("/api/tools", get(api::handle_api_tools))
1463 .route("/api/cron", get(api::handle_api_cron_list))
1464 .route("/api/cron", post(api::handle_api_cron_add))
1465 .route(
1466 "/api/cron/settings",
1467 get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
1468 )
1469 .route(
1470 "/api/cron/{id}",
1471 delete(api::handle_api_cron_delete).patch(api::handle_api_cron_patch),
1472 )
1473 .route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
1474 .route("/api/integrations", get(api::handle_api_integrations))
1475 .route(
1476 "/api/integrations/settings",
1477 get(api::handle_api_integrations_settings),
1478 )
1479 .route(
1480 "/api/doctor",
1481 get(api::handle_api_doctor).post(api::handle_api_doctor),
1482 )
1483 .route("/api/cost", get(api::handle_api_cost))
1485 .route("/api/audit", get(api::handle_api_audit))
1486 .route("/api/audit/verify", get(api::handle_api_audit_verify))
1487 .route("/api/cli-tools", get(api::handle_api_cli_tools))
1488 .route("/api/health", get(api::handle_api_health))
1489 .route("/api/mcp/discovery", get(api_mcp::handle_api_mcp_discovery))
1490 .route("/api/mcp/servers/test", post(api_mcp::handle_api_mcp_servers_test))
1491 .route("/api/mcp/health", get(api_mcp::handle_api_mcp_health))
1493 .route("/api/mcp/session", post(api_mcp::handle_api_mcp_session_create))
1494 .route(
1495 "/api/mcp/session/{session_id}/events",
1496 get(api_mcp::handle_api_mcp_session_events),
1497 )
1498 .route("/api/mcp/call", post(api_mcp::handle_api_mcp_call))
1499 .route("/api/nodes", get(api::handle_api_nodes))
1500 .route("/api/nodes/{node_id}/invoke", post(api::handle_api_node_invoke))
1501 .route("/api/sessions", get(api::handle_api_sessions_list))
1502 .route("/api/sessions/running", get(api::handle_api_sessions_running))
1503 .route(
1504 "/api/sessions/{id}/messages",
1505 get(api::handle_api_session_messages),
1506 )
1507 .route("/api/sessions/{id}", delete(api::handle_api_session_delete).put(api::handle_api_session_rename))
1508 .route("/api/sessions/{id}/state", get(api::handle_api_session_state))
1509 .route("/api/channels", get(api::handle_api_channels))
1511 .route("/api/channel-events", post(api::handle_api_channel_events))
1512 .route("/api/agents", get(api_agents::handle_list_agents).post(api_agents::handle_create_agent))
1514 .route("/api/agents/deprecate", post(api_agents::handle_deprecate_agent))
1515 .route("/api/agents/{*kref}", put(api_agents::handle_update_agent).delete(api_agents::handle_delete_agent))
1516 .route("/api/skills", get(api_skills::handle_list_skills).post(api_skills::handle_create_skill))
1518 .route("/api/skills/deprecate", post(api_skills::handle_deprecate_skill))
1519 .route("/api/skills/{*kref}", get(api_skills::handle_get_skill).put(api_skills::handle_update_skill).delete(api_skills::handle_delete_skill))
1520 .route("/api/teams", get(api_teams::handle_list_teams).post(api_teams::handle_create_team))
1522 .route("/api/teams/deprecate", post(api_teams::handle_deprecate_team))
1523 .route("/api/teams/{*kref}", get(api_teams::handle_get_team).put(api_teams::handle_update_team).delete(api_teams::handle_delete_team))
1524 .route("/api/workflows", get(api_workflows::handle_list_workflows).post(api_workflows::handle_create_workflow))
1526 .route("/api/workflows/deprecate", post(api_workflows::handle_deprecate_workflow))
1527 .route("/api/workflows/run/{name}", post(api_workflows::handle_run_workflow))
1528 .route("/api/workflows/revisions/{*kref}", get(api_workflows::handle_get_workflow_by_revision))
1529 .route("/api/workflows/{*kref}", put(api_workflows::handle_update_workflow).delete(api_workflows::handle_delete_workflow))
1530 .route("/api/workflows/runs", get(api_workflows::handle_list_workflow_runs))
1531 .route("/api/workflows/runs/{run_id}", get(api_workflows::handle_get_workflow_run).delete(api_workflows::handle_delete_workflow_run))
1532 .route("/api/workflows/runs/{run_id}/approve", post(api_workflows::handle_approve_workflow_run))
1533 .route("/api/workflows/runs/{run_id}/retry", post(api_workflows::handle_retry_workflow_run))
1534 .route("/api/workflows/agent-activity/{agent_id}", get(api_workflows::handle_agent_activity))
1535 .route("/api/workflows/dashboard", get(api_workflows::handle_workflow_dashboard))
1536 .route("/api/clawhub/search", get(api_clawhub::handle_clawhub_search))
1538 .route("/api/clawhub/trending", get(api_clawhub::handle_clawhub_trending))
1539 .route("/api/clawhub/skills/{slug}", get(api_clawhub::handle_clawhub_skill_detail))
1540 .route("/api/clawhub/install/{slug}", post(api_clawhub::handle_clawhub_install))
1541 .route("/api/kumiho/{*path}", get(api_kumiho_proxy::handle_kumiho_proxy))
1544 .route("/api/artifact-body", get(api_artifact_body::handle_artifact_body))
1546 .route("/api/pairing/initiate", post(api_pairing::initiate_pairing))
1548 .route("/api/pair", post(api_pairing::submit_pairing_enhanced))
1549 .route("/api/devices", get(api_pairing::list_devices))
1550 .route("/api/devices/{id}", delete(api_pairing::revoke_device))
1551 .route(
1552 "/api/devices/{id}/token/rotate",
1553 post(api_pairing::rotate_token),
1554 )
1555 .route("/api/canvas", get(canvas::handle_canvas_list))
1557 .route(
1558 "/api/canvas/{id}",
1559 get(canvas::handle_canvas_get)
1560 .post(canvas::handle_canvas_post)
1561 .delete(canvas::handle_canvas_clear),
1562 )
1563 .route(
1564 "/api/canvas/{id}/history",
1565 get(canvas::handle_canvas_history),
1566 );
1567
1568 #[cfg(feature = "webauthn")]
1570 let inner = inner
1571 .route(
1572 "/api/webauthn/register/start",
1573 post(api_webauthn::handle_register_start),
1574 )
1575 .route(
1576 "/api/webauthn/register/finish",
1577 post(api_webauthn::handle_register_finish),
1578 )
1579 .route(
1580 "/api/webauthn/auth/start",
1581 post(api_webauthn::handle_auth_start),
1582 )
1583 .route(
1584 "/api/webauthn/auth/finish",
1585 post(api_webauthn::handle_auth_finish),
1586 )
1587 .route(
1588 "/api/webauthn/credentials",
1589 get(api_webauthn::handle_list_credentials),
1590 )
1591 .route(
1592 "/api/webauthn/credentials/{id}",
1593 delete(api_webauthn::handle_delete_credential),
1594 );
1595
1596 #[cfg(feature = "plugins-wasm")]
1598 let inner = inner.route(
1599 "/api/plugins",
1600 get(api_plugins::plugin_routes::list_plugins),
1601 );
1602
1603 let inner = inner
1604 .route("/api/events", get(sse::handle_sse_events))
1606 .route("/api/daemon/logs", get(sse::handle_api_daemon_logs))
1607 .route("/ws/chat", get(ws::handle_ws_chat))
1609 .route("/ws/canvas/{id}", get(canvas::handle_ws_canvas))
1611 .route("/ws/nodes", get(nodes::handle_ws_nodes));
1613
1614 #[cfg(not(target_os = "android"))]
1619 let inner = inner.route("/ws/terminal", get(terminal::handle_ws_terminal));
1620
1621 let inner = inner
1622 .route("/ws/mcp/events", get(ws_mcp_events::handle_ws_mcp_events))
1624 .route("/_app/{*path}", get(static_files::handle_static))
1626 .merge(config_put_router)
1628 .fallback(get(static_files::handle_spa_fallback))
1630 .with_state(state)
1631 .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE))
1632 .layer(TimeoutLayer::with_status_code(
1633 StatusCode::REQUEST_TIMEOUT,
1634 Duration::from_secs(gateway_request_timeout_secs()),
1635 ));
1636
1637 let inner = inner.merge(memory_graph_router);
1639
1640 let app = if let Some(prefix) = path_prefix {
1644 let redirect_target = prefix.to_string();
1645 Router::new().nest(prefix, inner).route(
1646 &format!("{prefix}/"),
1647 get(|| async move { axum::response::Redirect::permanent(&redirect_target) }),
1648 )
1649 } else {
1650 inner
1651 };
1652
1653 let tls_acceptor = match &config.gateway.tls {
1655 Some(tls_cfg) if tls_cfg.enabled => {
1656 let has_mtls = tls_cfg.client_auth.as_ref().is_some_and(|ca| ca.enabled);
1657 if has_mtls {
1658 tracing::info!("TLS enabled with mutual TLS (mTLS) client verification");
1659 } else {
1660 tracing::info!("TLS enabled (no client certificate requirement)");
1661 }
1662 Some(tls::build_tls_acceptor(tls_cfg)?)
1663 }
1664 _ => None,
1665 };
1666
1667 if let Some(tls_acceptor) = tls_acceptor {
1668 let app = app.into_make_service_with_connect_info::<SocketAddr>();
1670 let mut app = app;
1671
1672 let mut shutdown_signal = shutdown_rx;
1673 loop {
1674 tokio::select! {
1675 conn = listener.accept() => {
1676 let (tcp_stream, remote_addr) = conn?;
1677 let tls_acceptor = tls_acceptor.clone();
1678 let svc = tower::MakeService::<
1679 SocketAddr,
1680 hyper::Request<hyper::body::Incoming>,
1681 >::make_service(&mut app, remote_addr)
1682 .await
1683 .expect("infallible make_service");
1684
1685 tokio::spawn(async move {
1686 let tls_stream = match tls_acceptor.accept(tcp_stream).await {
1687 Ok(s) => s,
1688 Err(e) => {
1689 tracing::debug!("TLS handshake failed from {remote_addr}: {e}");
1690 return;
1691 }
1692 };
1693 let io = hyper_util::rt::TokioIo::new(tls_stream);
1694 let hyper_svc = hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
1695 let mut svc = svc.clone();
1696 async move {
1697 tower::Service::call(&mut svc, req).await
1698 }
1699 });
1700 if let Err(e) = hyper_util::server::conn::auto::Builder::new(
1701 hyper_util::rt::TokioExecutor::new(),
1702 )
1703 .serve_connection(io, hyper_svc)
1704 .await
1705 {
1706 tracing::debug!("connection error from {remote_addr}: {e}");
1707 }
1708 });
1709 }
1710 _ = shutdown_signal.changed() => {
1711 tracing::info!("🦀 Construct Gateway shutting down...");
1712 break;
1713 }
1714 }
1715 }
1716 } else {
1717 axum::serve(
1719 listener,
1720 app.into_make_service_with_connect_info::<SocketAddr>(),
1721 )
1722 .with_graceful_shutdown(async move {
1723 let _ = shutdown_rx.changed().await;
1724 tracing::info!("🦀 Construct Gateway shutting down...");
1725 })
1726 .await?;
1727 }
1728
1729 if let Some(task) = mcp_task {
1732 let _ = task.await;
1733 }
1734
1735 Ok(())
1736}
1737
1738async fn handle_health(State(state): State<AppState>) -> impl IntoResponse {
1744 let body = serde_json::json!({
1745 "status": "ok",
1746 "paired": state.pairing.is_paired(),
1747 "require_pairing": state.pairing.require_pairing(),
1748 "runtime": crate::health::snapshot_json(),
1749 });
1750 Json(body)
1751}
1752
/// `Content-Type` header value attached to the response produced by `handle_metrics`.
const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
1755
/// Returns the placeholder body served by the metrics endpoint when no Prometheus
/// observer is available.
///
/// The text is a single `#`-prefixed line, terminated by a newline, that tells the
/// operator which config setting enables the backend.
fn prometheus_disabled_hint() -> String {
    const HINT: &str =
        "# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n";
    HINT.to_owned()
}
1761
/// Locates the `PrometheusObserver` behind a type-erased observer, if there is one.
///
/// Two shapes are recognised:
/// 1. `observer` is itself a `PrometheusObserver`.
/// 2. `observer` is an `sse::BroadcastObserver` whose `inner()` observer is a
///    `PrometheusObserver`.
///
/// Any other observer yields `None`.
#[cfg(feature = "observability-prometheus")]
fn prometheus_observer_from_state(
    observer: &dyn crate::observability::Observer,
) -> Option<&crate::observability::PrometheusObserver> {
    // Direct hit: the observer is the Prometheus backend itself.
    if let Some(direct) = observer
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
    {
        return Some(direct);
    }

    // Otherwise look one level down, through the broadcast wrapper.
    let wrapper = observer
        .as_any()
        .downcast_ref::<sse::BroadcastObserver>()?;
    wrapper
        .inner()
        .as_any()
        .downcast_ref::<crate::observability::PrometheusObserver>()
}
1781
1782async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
1784 let body = {
1785 #[cfg(feature = "observability-prometheus")]
1786 {
1787 if let Some(prom) = prometheus_observer_from_state(state.observer.as_ref()) {
1788 prom.encode()
1789 } else {
1790 prometheus_disabled_hint()
1791 }
1792 }
1793 #[cfg(not(feature = "observability-prometheus"))]
1794 {
1795 let _ = &state;
1796 prometheus_disabled_hint()
1797 }
1798 };
1799
1800 (
1801 StatusCode::OK,
1802 [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)],
1803 body,
1804 )
1805}
1806
1807#[axum::debug_handler]
1809async fn handle_pair(
1810 State(state): State<AppState>,
1811 ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
1812 headers: HeaderMap,
1813) -> impl IntoResponse {
1814 let rate_key =
1815 client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
1816 let peer_is_loopback = peer_addr.ip().is_loopback();
1817 if !state.rate_limiter.allow_pair(&rate_key) {
1818 tracing::warn!("/pair rate limit exceeded");
1819 let err = serde_json::json!({
1820 "error": "Too many pairing requests. Please retry later.",
1821 "retry_after": RATE_LIMIT_WINDOW_SECS,
1822 });
1823 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1824 }
1825
1826 if let Err(e) = state
1828 .auth_limiter
1829 .check_rate_limit(&rate_key, peer_is_loopback)
1830 {
1831 tracing::warn!("🔐 Pairing auth rate limit exceeded for {rate_key}");
1832 let err = serde_json::json!({
1833 "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
1834 "retry_after": e.retry_after_secs,
1835 });
1836 return (StatusCode::TOO_MANY_REQUESTS, Json(err));
1837 }
1838
1839 let code = headers
1840 .get("X-Pairing-Code")
1841 .and_then(|v| v.to_str().ok())
1842 .unwrap_or("");
1843
1844 match state.pairing.try_pair(code, &rate_key).await {
1845 Ok(Some(token)) => {
1846 tracing::info!("🔐 New client paired successfully");
1847 if let Some(ref logger) = state.audit_logger {
1848 let _ =
1849 logger.log_auth_success("gateway", &format!("Client paired from {rate_key}"));
1850 }
1851
1852 if let Some(ref registry) = state.device_registry {
1861 use chrono::Utc;
1862 let now = Utc::now();
1863 let info = api_pairing::DeviceInfo {
1864 id: uuid::Uuid::new_v4().to_string(),
1865 name: None,
1866 device_type: Some("legacy-pair".to_string()),
1867 paired_at: now,
1868 last_seen: now,
1869 ip_address: Some(peer_addr.ip().to_string()),
1870 };
1871 if let Err(err) =
1872 registry.register(crate::security::pairing::hash_token(&token), info)
1873 {
1874 tracing::warn!(
1875 "🔐 Pairing succeeded but device registry insert failed: {err:#}"
1876 );
1877 }
1878 }
1879
1880 if let Err(err) =
1881 Box::pin(persist_pairing_tokens(state.config.clone(), &state.pairing)).await
1882 {
1883 tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}");
1884 let body = serde_json::json!({
1885 "paired": true,
1886 "persisted": false,
1887 "token": token,
1888 "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.",
1889 });
1890 return (StatusCode::OK, Json(body));
1891 }
1892
1893 let body = serde_json::json!({
1894 "paired": true,
1895 "persisted": true,
1896 "token": token,
1897 "message": "Save this token — use it as Authorization: Bearer <token>"
1898 });
1899 (StatusCode::OK, Json(body))
1900 }
1901 Ok(None) => {
1902 state
1903 .auth_limiter
1904 .record_attempt(&rate_key, peer_is_loopback);
1905 tracing::warn!("🔐 Pairing attempt with invalid code");
1906 if let Some(ref logger) = state.audit_logger {
1907 let _ = logger
1908 .log_auth_failure("gateway", &format!("Invalid pairing code from {rate_key}"));
1909 }
1910 let err = serde_json::json!({"error": "Invalid pairing code"});
1911 (StatusCode::FORBIDDEN, Json(err))
1912 }
1913 Err(lockout_secs) => {
1914 tracing::warn!(
1915 "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)"
1916 );
1917 if let Some(ref logger) = state.audit_logger {
1918 let _ = logger.log_auth_failure(
1919 "gateway",
1920 &format!("Pairing lockout for {rate_key} ({lockout_secs}s)"),
1921 );
1922 }
1923 let err = serde_json::json!({
1924 "error": format!("Too many failed attempts. Try again in {lockout_secs}s."),
1925 "retry_after": lockout_secs
1926 });
1927 (StatusCode::TOO_MANY_REQUESTS, Json(err))
1928 }
1929 }
1930}
1931
1932async fn persist_pairing_tokens(config: Arc<Mutex<Config>>, pairing: &PairingGuard) -> Result<()> {
1933 let paired_tokens = pairing.tokens();
1934 let mut updated_cfg = { config.lock().clone() };
1937 updated_cfg.gateway.paired_tokens = paired_tokens;
1938 updated_cfg
1939 .save()
1940 .await
1941 .context("Failed to persist paired tokens to config.toml")?;
1942
1943 *config.lock() = updated_cfg;
1945 Ok(())
1946}
1947
1948async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result<String> {
1950 let user_messages = vec![ChatMessage::user(message)];
1951
1952 let system_prompt = {
1955 let config_guard = state.config.lock();
1956 crate::channels::build_system_prompt(
1957 &config_guard.workspace_dir,
1958 &state.model,
1959 &[], &[], Some(&config_guard.identity),
1962 None, )
1964 };
1965
1966 let mut messages = Vec::with_capacity(1 + user_messages.len());
1967 messages.push(ChatMessage::system(system_prompt));
1968 messages.extend(user_messages);
1969
1970 let multimodal_config = state.config.lock().multimodal.clone();
1971 let prepared =
1972 crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?;
1973
1974 state
1975 .provider
1976 .chat_with_history(&prepared.messages, &state.model, state.temperature)
1977 .await
1978}
1979
1980async fn run_gateway_chat_with_tools(
1982 state: &AppState,
1983 message: &str,
1984 session_id: Option<&str>,
1985) -> anyhow::Result<String> {
1986 let config = state.config.lock().clone();
1987 Box::pin(crate::agent::process_message(config, message, session_id)).await
1988}
1989
/// JSON request body accepted by `handle_webhook`.
#[derive(serde::Deserialize)]
pub struct WebhookBody {
    /// Text that the handler forwards to the provider as the user message.
    pub message: String,
}
1995
/// Handles `POST /webhook`: authenticates the caller, then runs the JSON `message`
/// through a single provider chat turn and returns the reply.
///
/// Checks run in this order, and the first failure ends the request:
/// 1. Per-client webhook rate limit (`429`).
/// 2. When pairing is required: the auth-attempt limiter (`429`), then the
///    `Authorization: Bearer <token>` check (`401`, with the failure recorded).
/// 3. When a webhook secret hash is configured: the hashed `X-Webhook-Secret`
///    header must match it (`401`).
/// 4. JSON body parsing (`400`).
/// 5. Optional `X-Idempotency-Key`: a key the store has already seen answers
///    `200` with a `"duplicate"` status and skips the provider call.
///
/// On success the response is `200` with `{"response", "model"}`. A provider
/// failure is logged in sanitized form and answered with a generic `500`.
async fn handle_webhook(
    State(state): State<AppState>,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
    // Extracted as a `Result` so a malformed body reaches this handler and is only
    // rejected after the rate-limit and auth checks have run.
    body: Result<Json<WebhookBody>, axum::extract::rejection::JsonRejection>,
) -> impl IntoResponse {
    // Key used by both limiters for this client.
    let rate_key =
        client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers);
    let peer_is_loopback = peer_addr.ip().is_loopback();
    if !state.rate_limiter.allow_webhook(&rate_key) {
        tracing::warn!("/webhook rate limit exceeded");
        let err = serde_json::json!({
            "error": "Too many webhook requests. Please retry later.",
            "retry_after": RATE_LIMIT_WINDOW_SECS,
        });
        return (StatusCode::TOO_MANY_REQUESTS, Json(err));
    }

    // Bearer-token authentication, enforced only when pairing is required.
    if state.pairing.require_pairing() {
        // Refuse early if this client is already over the auth-attempt limit.
        if let Err(e) = state
            .auth_limiter
            .check_rate_limit(&rate_key, peer_is_loopback)
        {
            tracing::warn!("Webhook: auth rate limit exceeded for {rate_key}");
            let err = serde_json::json!({
                "error": format!("Too many auth attempts. Try again in {}s.", e.retry_after_secs),
                "retry_after": e.retry_after_secs,
            });
            return (StatusCode::TOO_MANY_REQUESTS, Json(err));
        }
        // A missing header, non-UTF-8 value, or missing "Bearer " prefix all
        // collapse to an empty token, which then goes through the normal check.
        let auth = headers
            .get(header::AUTHORIZATION)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        let token = auth.strip_prefix("Bearer ").unwrap_or("");
        if !state.pairing.is_authenticated(token) {
            // Count the failure so repeated bad tokens trip the auth limiter.
            state
                .auth_limiter
                .record_attempt(&rate_key, peer_is_loopback);
            tracing::warn!("Webhook: rejected — not paired / invalid bearer token");
            let err = serde_json::json!({
                "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
            });
            return (StatusCode::UNAUTHORIZED, Json(err));
        }
    }

    // Optional shared-secret check: the header value is trimmed, hashed, and
    // compared against the stored hash with `constant_time_eq`.
    if let Some(ref secret_hash) = state.webhook_secret_hash {
        let header_hash = headers
            .get("X-Webhook-Secret")
            .and_then(|v| v.to_str().ok())
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(hash_webhook_secret);
        match header_hash {
            Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {}
            _ => {
                tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret");
                let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"});
                return (StatusCode::UNAUTHORIZED, Json(err));
            }
        }
    }

    // Only now surface a body-parsing failure to the caller.
    let Json(webhook_body) = match body {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!("Webhook JSON parse error: {e}");
            let err = serde_json::json!({
                "error": "Invalid JSON body. Expected: {\"message\": \"...\"}"
            });
            return (StatusCode::BAD_REQUEST, Json(err));
        }
    };

    // Idempotency: a non-empty key is recorded here, before the provider call.
    // A `false` return from the store is treated as a replay.
    if let Some(idempotency_key) = headers
        .get("X-Idempotency-Key")
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        if !state.idempotency_store.record_if_new(idempotency_key) {
            tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})");
            let body = serde_json::json!({
                "status": "duplicate",
                "idempotent": true,
                "message": "Request already processed for this idempotency key"
            });
            return (StatusCode::OK, Json(body));
        }
    }

    let message = &webhook_body.message;
    // The session id is only used for the memory write below; the chat call
    // itself does not receive it.
    let session_id = webhook_session_id(&headers);

    // Best-effort autosave of the incoming message; a store failure is ignored.
    if state.auto_save && !memory::should_skip_autosave_content(message) {
        let key = webhook_memory_key();
        let _ = state
            .mem
            .store(
                &key,
                message,
                MemoryCategory::Conversation,
                session_id.as_deref(),
            )
            .await;
    }

    // Labels attached to every observer event emitted for this request.
    let provider_label = state
        .config
        .lock()
        .default_provider
        .clone()
        .unwrap_or_else(|| "unknown".to_string());
    let model_label = state.model.clone();
    let started_at = Instant::now();

    state
        .observer
        .record_event(&crate::observability::ObserverEvent::AgentStart {
            provider: provider_label.clone(),
            model: model_label.clone(),
        });
    state
        .observer
        .record_event(&crate::observability::ObserverEvent::LlmRequest {
            provider: provider_label.clone(),
            model: model_label.clone(),
            messages_count: 1,
        });

    match run_gateway_chat_simple(&state, message).await {
        Ok(response) => {
            // Success: emit LlmResponse, the latency metric, then AgentEnd.
            let duration = started_at.elapsed();
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: true,
                    error_message: None,
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            let body = serde_json::json!({"response": response, "model": state.model});
            (StatusCode::OK, Json(body))
        }
        Err(e) => {
            let duration = started_at.elapsed();
            // Only the sanitized error text is logged or handed to observers.
            let sanitized = providers::sanitize_api_error(&e.to_string());

            state
                .observer
                .record_event(&crate::observability::ObserverEvent::LlmResponse {
                    provider: provider_label.clone(),
                    model: model_label.clone(),
                    duration,
                    success: false,
                    error_message: Some(sanitized.clone()),
                    input_tokens: None,
                    output_tokens: None,
                });
            state.observer.record_metric(
                &crate::observability::traits::ObserverMetric::RequestLatency(duration),
            );
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::Error {
                    component: "gateway".to_string(),
                    message: sanitized.clone(),
                });
            state
                .observer
                .record_event(&crate::observability::ObserverEvent::AgentEnd {
                    provider: provider_label,
                    model: model_label,
                    duration,
                    tokens_used: None,
                    cost_usd: None,
                });

            // The client receives a generic message; details stay in the log.
            tracing::error!("Webhook provider error: {}", sanitized);
            let err = serde_json::json!({"error": "LLM request failed"});
            (StatusCode::INTERNAL_SERVER_ERROR, Json(err))
        }
    }
}
2202
/// Query-string parameters read by `handle_whatsapp_verify`.
///
/// All three are optional at the parsing level; the handler decides how to respond
/// when one is missing.
#[derive(serde::Deserialize)]
pub struct WhatsAppVerifyQuery {
    /// `hub.mode`; the handler only accepts the value `"subscribe"`.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    /// `hub.verify_token`; compared against the configured channel's verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    /// `hub.challenge`; echoed back as the response body when verification succeeds.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
2213
2214async fn handle_whatsapp_verify(
2216 State(state): State<AppState>,
2217 Query(params): Query<WhatsAppVerifyQuery>,
2218) -> impl IntoResponse {
2219 let Some(ref wa) = state.whatsapp else {
2220 return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string());
2221 };
2222
2223 let token_matches = params
2225 .verify_token
2226 .as_deref()
2227 .is_some_and(|t| constant_time_eq(t, wa.verify_token()));
2228 if params.mode.as_deref() == Some("subscribe") && token_matches {
2229 if let Some(ch) = params.challenge {
2230 tracing::info!("WhatsApp webhook verified successfully");
2231 return (StatusCode::OK, ch);
2232 }
2233 return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string());
2234 }
2235
2236 tracing::warn!("WhatsApp webhook verification failed — token mismatch");
2237 (StatusCode::FORBIDDEN, "Forbidden".to_string())
2238}
2239
2240pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool {
2244 use hmac::{Hmac, Mac};
2245 use sha2::Sha256;
2246
2247 let Some(hex_sig) = signature_header.strip_prefix("sha256=") else {
2249 return false;
2250 };
2251
2252 let Ok(expected) = hex::decode(hex_sig) else {
2254 return false;
2255 };
2256
2257 let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(app_secret.as_bytes()) else {
2259 return false;
2260 };
2261 mac.update(body);
2262
2263 mac.verify_slice(&expected).is_ok()
2265}
2266
2267async fn handle_whatsapp_message(
2269 State(state): State<AppState>,
2270 headers: HeaderMap,
2271 body: Bytes,
2272) -> impl IntoResponse {
2273 let Some(ref wa) = state.whatsapp else {
2274 return (
2275 StatusCode::NOT_FOUND,
2276 Json(serde_json::json!({"error": "WhatsApp not configured"})),
2277 );
2278 };
2279
2280 if let Some(ref app_secret) = state.whatsapp_app_secret {
2282 let signature = headers
2283 .get("X-Hub-Signature-256")
2284 .and_then(|v| v.to_str().ok())
2285 .unwrap_or("");
2286
2287 if !verify_whatsapp_signature(app_secret, &body, signature) {
2288 tracing::warn!(
2289 "WhatsApp webhook signature verification failed (signature: {})",
2290 if signature.is_empty() {
2291 "missing"
2292 } else {
2293 "invalid"
2294 }
2295 );
2296 return (
2297 StatusCode::UNAUTHORIZED,
2298 Json(serde_json::json!({"error": "Invalid signature"})),
2299 );
2300 }
2301 }
2302
2303 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2305 return (
2306 StatusCode::BAD_REQUEST,
2307 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2308 );
2309 };
2310
2311 let messages = wa.parse_webhook_payload(&payload);
2313
2314 if messages.is_empty() {
2315 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2317 }
2318
2319 for msg in &messages {
2321 tracing::info!(
2322 "WhatsApp message from {}: {}",
2323 msg.sender,
2324 truncate_with_ellipsis(&msg.content, 50)
2325 );
2326 let session_id = sender_session_id("whatsapp", msg);
2327
2328 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2330 let key = whatsapp_memory_key(msg);
2331 let _ = state
2332 .mem
2333 .store(
2334 &key,
2335 &msg.content,
2336 MemoryCategory::Conversation,
2337 Some(&session_id),
2338 )
2339 .await;
2340 }
2341
2342 match Box::pin(run_gateway_chat_with_tools(
2343 &state,
2344 &msg.content,
2345 Some(&session_id),
2346 ))
2347 .await
2348 {
2349 Ok(response) => {
2350 if let Err(e) = wa
2352 .send(&SendMessage::new(response, &msg.reply_target))
2353 .await
2354 {
2355 tracing::error!("Failed to send WhatsApp reply: {e}");
2356 }
2357 }
2358 Err(e) => {
2359 tracing::error!("LLM error for WhatsApp message: {e:#}");
2360 let _ = wa
2361 .send(&SendMessage::new(
2362 "Sorry, I couldn't process your message right now.",
2363 &msg.reply_target,
2364 ))
2365 .await;
2366 }
2367 }
2368 }
2369
2370 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2372}
2373
2374async fn handle_linq_webhook(
2376 State(state): State<AppState>,
2377 headers: HeaderMap,
2378 body: Bytes,
2379) -> impl IntoResponse {
2380 let Some(ref linq) = state.linq else {
2381 return (
2382 StatusCode::NOT_FOUND,
2383 Json(serde_json::json!({"error": "Linq not configured"})),
2384 );
2385 };
2386
2387 let body_str = String::from_utf8_lossy(&body);
2388
2389 if let Some(ref signing_secret) = state.linq_signing_secret {
2391 let timestamp = headers
2392 .get("X-Webhook-Timestamp")
2393 .and_then(|v| v.to_str().ok())
2394 .unwrap_or("");
2395
2396 let signature = headers
2397 .get("X-Webhook-Signature")
2398 .and_then(|v| v.to_str().ok())
2399 .unwrap_or("");
2400
2401 if !crate::channels::linq::verify_linq_signature(
2402 signing_secret,
2403 &body_str,
2404 timestamp,
2405 signature,
2406 ) {
2407 tracing::warn!(
2408 "Linq webhook signature verification failed (signature: {})",
2409 if signature.is_empty() {
2410 "missing"
2411 } else {
2412 "invalid"
2413 }
2414 );
2415 return (
2416 StatusCode::UNAUTHORIZED,
2417 Json(serde_json::json!({"error": "Invalid signature"})),
2418 );
2419 }
2420 }
2421
2422 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2424 return (
2425 StatusCode::BAD_REQUEST,
2426 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2427 );
2428 };
2429
2430 let messages = linq.parse_webhook_payload(&payload);
2432
2433 if messages.is_empty() {
2434 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2436 }
2437
2438 for msg in &messages {
2440 tracing::info!(
2441 "Linq message from {}: {}",
2442 msg.sender,
2443 truncate_with_ellipsis(&msg.content, 50)
2444 );
2445 let session_id = sender_session_id("linq", msg);
2446
2447 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2449 let key = linq_memory_key(msg);
2450 let _ = state
2451 .mem
2452 .store(
2453 &key,
2454 &msg.content,
2455 MemoryCategory::Conversation,
2456 Some(&session_id),
2457 )
2458 .await;
2459 }
2460
2461 match Box::pin(run_gateway_chat_with_tools(
2463 &state,
2464 &msg.content,
2465 Some(&session_id),
2466 ))
2467 .await
2468 {
2469 Ok(response) => {
2470 if let Err(e) = linq
2472 .send(&SendMessage::new(response, &msg.reply_target))
2473 .await
2474 {
2475 tracing::error!("Failed to send Linq reply: {e}");
2476 }
2477 }
2478 Err(e) => {
2479 tracing::error!("LLM error for Linq message: {e:#}");
2480 let _ = linq
2481 .send(&SendMessage::new(
2482 "Sorry, I couldn't process your message right now.",
2483 &msg.reply_target,
2484 ))
2485 .await;
2486 }
2487 }
2488 }
2489
2490 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2492}
2493
2494async fn handle_wati_verify(
2496 State(state): State<AppState>,
2497 Query(params): Query<WatiVerifyQuery>,
2498) -> impl IntoResponse {
2499 if state.wati.is_none() {
2500 return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
2501 }
2502
2503 if let Some(challenge) = params.challenge {
2505 tracing::info!("WATI webhook verified successfully");
2506 return (StatusCode::OK, challenge);
2507 }
2508
2509 (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
2510}
2511
2512#[derive(Debug, serde::Deserialize)]
2513pub struct WatiVerifyQuery {
2514 #[serde(rename = "hub.challenge")]
2515 pub challenge: Option<String>,
2516}
2517
2518async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
2520 let Some(ref wati) = state.wati else {
2521 return (
2522 StatusCode::NOT_FOUND,
2523 Json(serde_json::json!({"error": "WATI not configured"})),
2524 );
2525 };
2526
2527 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2529 return (
2530 StatusCode::BAD_REQUEST,
2531 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2532 );
2533 };
2534
2535 let msg_type = payload.get("type").and_then(|v| v.as_str()).unwrap_or("");
2537
2538 let messages = if matches!(msg_type, "audio" | "voice") {
2539 if let Some(transcript) = wati.try_transcribe_audio(&payload).await {
2541 wati.parse_audio_as_message(&payload, transcript)
2542 } else {
2543 vec![]
2544 }
2545 } else {
2546 wati.parse_webhook_payload(&payload)
2547 };
2548
2549 if messages.is_empty() {
2550 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2551 }
2552
2553 for msg in &messages {
2555 tracing::info!(
2556 "WATI message from {}: {}",
2557 msg.sender,
2558 truncate_with_ellipsis(&msg.content, 50)
2559 );
2560 let session_id = sender_session_id("wati", msg);
2561
2562 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2564 let key = wati_memory_key(msg);
2565 let _ = state
2566 .mem
2567 .store(
2568 &key,
2569 &msg.content,
2570 MemoryCategory::Conversation,
2571 Some(&session_id),
2572 )
2573 .await;
2574 }
2575
2576 match Box::pin(run_gateway_chat_with_tools(
2578 &state,
2579 &msg.content,
2580 Some(&session_id),
2581 ))
2582 .await
2583 {
2584 Ok(response) => {
2585 if let Err(e) = wati
2587 .send(&SendMessage::new(response, &msg.reply_target))
2588 .await
2589 {
2590 tracing::error!("Failed to send WATI reply: {e}");
2591 }
2592 }
2593 Err(e) => {
2594 tracing::error!("LLM error for WATI message: {e:#}");
2595 let _ = wati
2596 .send(&SendMessage::new(
2597 "Sorry, I couldn't process your message right now.",
2598 &msg.reply_target,
2599 ))
2600 .await;
2601 }
2602 }
2603 }
2604
2605 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2607}
2608
2609async fn handle_nextcloud_talk_webhook(
2611 State(state): State<AppState>,
2612 headers: HeaderMap,
2613 body: Bytes,
2614) -> impl IntoResponse {
2615 let Some(ref nextcloud_talk) = state.nextcloud_talk else {
2616 return (
2617 StatusCode::NOT_FOUND,
2618 Json(serde_json::json!({"error": "Nextcloud Talk not configured"})),
2619 );
2620 };
2621
2622 let body_str = String::from_utf8_lossy(&body);
2623
2624 if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret {
2626 let random = headers
2627 .get("X-Nextcloud-Talk-Random")
2628 .and_then(|v| v.to_str().ok())
2629 .unwrap_or("");
2630
2631 let signature = headers
2632 .get("X-Nextcloud-Talk-Signature")
2633 .and_then(|v| v.to_str().ok())
2634 .unwrap_or("");
2635
2636 if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature(
2637 webhook_secret,
2638 random,
2639 &body_str,
2640 signature,
2641 ) {
2642 tracing::warn!(
2643 "Nextcloud Talk webhook signature verification failed (signature: {})",
2644 if signature.is_empty() {
2645 "missing"
2646 } else {
2647 "invalid"
2648 }
2649 );
2650 return (
2651 StatusCode::UNAUTHORIZED,
2652 Json(serde_json::json!({"error": "Invalid signature"})),
2653 );
2654 }
2655 }
2656
2657 let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
2659 return (
2660 StatusCode::BAD_REQUEST,
2661 Json(serde_json::json!({"error": "Invalid JSON payload"})),
2662 );
2663 };
2664
2665 let messages = nextcloud_talk.parse_webhook_payload(&payload);
2667 if messages.is_empty() {
2668 return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
2670 }
2671
2672 for msg in &messages {
2673 tracing::info!(
2674 "Nextcloud Talk message from {}: {}",
2675 msg.sender,
2676 truncate_with_ellipsis(&msg.content, 50)
2677 );
2678 let session_id = sender_session_id("nextcloud_talk", msg);
2679
2680 if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
2681 let key = nextcloud_talk_memory_key(msg);
2682 let _ = state
2683 .mem
2684 .store(
2685 &key,
2686 &msg.content,
2687 MemoryCategory::Conversation,
2688 Some(&session_id),
2689 )
2690 .await;
2691 }
2692
2693 match Box::pin(run_gateway_chat_with_tools(
2694 &state,
2695 &msg.content,
2696 Some(&session_id),
2697 ))
2698 .await
2699 {
2700 Ok(response) => {
2701 if let Err(e) = nextcloud_talk
2702 .send(&SendMessage::new(response, &msg.reply_target))
2703 .await
2704 {
2705 tracing::error!("Failed to send Nextcloud Talk reply: {e}");
2706 }
2707 }
2708 Err(e) => {
2709 tracing::error!("LLM error for Nextcloud Talk message: {e:#}");
2710 let _ = nextcloud_talk
2711 .send(&SendMessage::new(
2712 "Sorry, I couldn't process your message right now.",
2713 &msg.reply_target,
2714 ))
2715 .await;
2716 }
2717 }
2718 }
2719
2720 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2721}
2722
/// Largest request body, in bytes, accepted by the Gmail push webhook (1 MiB).
const GMAIL_WEBHOOK_MAX_BODY: usize = 1 << 20;
2726
2727async fn handle_gmail_push_webhook(
2729 State(state): State<AppState>,
2730 headers: HeaderMap,
2731 body: Bytes,
2732) -> impl IntoResponse {
2733 let Some(ref gmail_push) = state.gmail_push else {
2734 return (
2735 StatusCode::NOT_FOUND,
2736 Json(serde_json::json!({"error": "Gmail push not configured"})),
2737 );
2738 };
2739
2740 if body.len() > GMAIL_WEBHOOK_MAX_BODY {
2742 return (
2743 StatusCode::PAYLOAD_TOO_LARGE,
2744 Json(serde_json::json!({"error": "Request body too large"})),
2745 );
2746 }
2747
2748 let secret = gmail_push.resolve_webhook_secret();
2750 if !secret.is_empty() {
2751 let provided = headers
2752 .get(axum::http::header::AUTHORIZATION)
2753 .and_then(|v| v.to_str().ok())
2754 .and_then(|auth| auth.strip_prefix("Bearer "))
2755 .unwrap_or("");
2756
2757 if provided != secret {
2758 tracing::warn!("Gmail push webhook: unauthorized request");
2759 return (
2760 StatusCode::UNAUTHORIZED,
2761 Json(serde_json::json!({"error": "Unauthorized"})),
2762 );
2763 }
2764 }
2765
2766 let body_str = String::from_utf8_lossy(&body);
2767 let envelope: crate::channels::gmail_push::PubSubEnvelope =
2768 match serde_json::from_str(&body_str) {
2769 Ok(e) => e,
2770 Err(e) => {
2771 tracing::warn!("Gmail push webhook: invalid payload: {e}");
2772 return (
2773 StatusCode::BAD_REQUEST,
2774 Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
2775 );
2776 }
2777 };
2778
2779 let channel = Arc::clone(gmail_push);
2781 tokio::spawn(async move {
2782 if let Err(e) = channel.handle_notification(&envelope).await {
2783 tracing::error!("Gmail push notification processing failed: {e:#}");
2784 }
2785 });
2786
2787 (StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
2789}
2790
2791#[derive(serde::Serialize)]
2797struct AdminResponse {
2798 success: bool,
2799 message: String,
2800}
2801
2802fn require_localhost(peer: &SocketAddr) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
2804 if peer.ip().is_loopback() {
2805 Ok(())
2806 } else {
2807 Err((
2808 StatusCode::FORBIDDEN,
2809 Json(serde_json::json!({
2810 "error": "Admin endpoints are restricted to localhost"
2811 })),
2812 ))
2813 }
2814}
2815
2816async fn handle_admin_shutdown(
2818 State(state): State<AppState>,
2819 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2820) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2821 require_localhost(&peer)?;
2822 tracing::info!("🔌 Admin shutdown request received — initiating graceful shutdown");
2823
2824 let body = AdminResponse {
2825 success: true,
2826 message: "Gateway shutdown initiated".to_string(),
2827 };
2828
2829 let _ = state.shutdown_tx.send(true);
2830
2831 Ok((StatusCode::OK, Json(body)))
2832}
2833
2834async fn handle_admin_paircode(
2836 State(state): State<AppState>,
2837 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2838) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2839 require_localhost(&peer)?;
2840 let code = state.pairing.pairing_code();
2841
2842 let body = if let Some(c) = code {
2843 serde_json::json!({
2844 "success": true,
2845 "pairing_required": state.pairing.require_pairing(),
2846 "pairing_code": c,
2847 "message": "Use this one-time code to pair"
2848 })
2849 } else {
2850 serde_json::json!({
2851 "success": true,
2852 "pairing_required": state.pairing.require_pairing(),
2853 "pairing_code": null,
2854 "message": if state.pairing.require_pairing() {
2855 "Pairing is active but no new code available (already paired or code expired)"
2856 } else {
2857 "Pairing is disabled for this gateway"
2858 }
2859 })
2860 };
2861
2862 Ok((StatusCode::OK, Json(body)))
2863}
2864
2865async fn handle_admin_paircode_new(
2867 State(state): State<AppState>,
2868 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2869) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2870 require_localhost(&peer)?;
2871 match state.pairing.generate_new_pairing_code() {
2872 Some(code) => {
2873 tracing::info!("🔐 New pairing code generated via admin endpoint");
2874 let body = serde_json::json!({
2875 "success": true,
2876 "pairing_required": state.pairing.require_pairing(),
2877 "pairing_code": code,
2878 "message": "New pairing code generated — use this one-time code to pair"
2879 });
2880 Ok((StatusCode::OK, Json(body)))
2881 }
2882 None => {
2883 let body = serde_json::json!({
2884 "success": false,
2885 "pairing_required": false,
2886 "pairing_code": null,
2887 "message": "Pairing is disabled for this gateway"
2888 });
2889 Ok((StatusCode::BAD_REQUEST, Json(body)))
2890 }
2891 }
2892}
2893
2894async fn handle_pair_code(
2902 State(state): State<AppState>,
2903 ConnectInfo(peer): ConnectInfo<SocketAddr>,
2904) -> Result<impl IntoResponse, (StatusCode, Json<serde_json::Value>)> {
2905 require_localhost(&peer)?;
2906
2907 let require = state.pairing.require_pairing();
2908 let is_paired = state.pairing.is_paired();
2909
2910 let code = if require && !is_paired {
2911 state.pairing.pairing_code()
2912 } else {
2913 None
2914 };
2915
2916 let body = serde_json::json!({
2917 "success": true,
2918 "pairing_required": require,
2919 "pairing_code": code,
2920 });
2921
2922 Ok((StatusCode::OK, Json(body)))
2923}
2924
2925#[cfg(test)]
2926mod tests {
2927 use super::*;
2928 use crate::channels::traits::ChannelMessage;
2929 use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2930 use crate::providers::Provider;
2931 use async_trait::async_trait;
2932 use axum::http::HeaderValue;
2933 use axum::response::IntoResponse;
2934 use http_body_util::BodyExt;
2935 use parking_lot::Mutex;
2936 use std::sync::atomic::{AtomicUsize, Ordering};
2937
2938 fn generate_test_secret() -> String {
2940 let bytes: [u8; 32] = rand::random();
2941 hex::encode(bytes)
2942 }
2943
2944 #[test]
2945 fn security_body_limit_is_64kb() {
2946 assert_eq!(MAX_BODY_SIZE, 65_536);
2947 }
2948
2949 #[test]
2950 fn security_timeout_default_is_30_seconds() {
2951 assert_eq!(REQUEST_TIMEOUT_SECS, 30);
2952 }
2953
2954 #[test]
2955 fn gateway_timeout_falls_back_to_default() {
2956 unsafe { std::env::remove_var("CONSTRUCT_GATEWAY_TIMEOUT_SECS") };
2959 assert_eq!(gateway_request_timeout_secs(), 30);
2960 }
2961
2962 #[test]
2963 fn webhook_body_requires_message_field() {
2964 let valid = r#"{"message": "hello"}"#;
2965 let parsed: Result<WebhookBody, _> = serde_json::from_str(valid);
2966 assert!(parsed.is_ok());
2967 assert_eq!(parsed.unwrap().message, "hello");
2968
2969 let missing = r#"{"other": "field"}"#;
2970 let parsed: Result<WebhookBody, _> = serde_json::from_str(missing);
2971 assert!(parsed.is_err());
2972 }
2973
2974 #[test]
2975 fn whatsapp_query_fields_are_optional() {
2976 let q = WhatsAppVerifyQuery {
2977 mode: None,
2978 verify_token: None,
2979 challenge: None,
2980 };
2981 assert!(q.mode.is_none());
2982 }
2983
2984 #[test]
2985 fn app_state_is_clone() {
2986 fn assert_clone<T: Clone>() {}
2987 assert_clone::<AppState>();
2988 }
2989
2990 #[tokio::test]
2991 async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() {
2992 let state = AppState {
2993 config: Arc::new(Mutex::new(Config::default())),
2994 provider: Arc::new(MockProvider::default()),
2995 model: "test-model".into(),
2996 temperature: 0.0,
2997 mem: Arc::new(MockMemory),
2998 auto_save: false,
2999 webhook_secret_hash: None,
3000 pairing: Arc::new(PairingGuard::new(false, &[])),
3001 trust_forwarded_headers: false,
3002 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3003 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3004 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3005 whatsapp: None,
3006 whatsapp_app_secret: None,
3007 linq: None,
3008 linq_signing_secret: None,
3009 nextcloud_talk: None,
3010 nextcloud_talk_webhook_secret: None,
3011 wati: None,
3012 gmail_push: None,
3013 observer: Arc::new(crate::observability::NoopObserver),
3014 tools_registry: Arc::new(Vec::new()),
3015 cost_tracker: None,
3016 audit_logger: None,
3017 event_tx: tokio::sync::broadcast::channel(16).0,
3018 shutdown_tx: tokio::sync::watch::channel(false).0,
3019 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3020 path_prefix: String::new(),
3021 session_backend: None,
3022 session_queue: std::sync::Arc::new(
3023 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3024 ),
3025 device_registry: None,
3026 pending_pairings: None,
3027 canvas_store: CanvasStore::new(),
3028 mcp_registry: None,
3029 approval_registry: approval_registry::global(),
3030 mcp_local_url: None,
3031 #[cfg(feature = "webauthn")]
3032 webauthn: None,
3033 };
3034
3035 let response = handle_metrics(State(state)).await.into_response();
3036 assert_eq!(response.status(), StatusCode::OK);
3037 assert_eq!(
3038 response
3039 .headers()
3040 .get(header::CONTENT_TYPE)
3041 .and_then(|value| value.to_str().ok()),
3042 Some(PROMETHEUS_CONTENT_TYPE)
3043 );
3044
3045 let body = response.into_body().collect().await.unwrap().to_bytes();
3046 let text = String::from_utf8(body.to_vec()).unwrap();
3047 assert!(text.contains("Prometheus backend not enabled"));
3048 }
3049
3050 #[cfg(feature = "observability-prometheus")]
3051 #[tokio::test]
3052 async fn metrics_endpoint_renders_prometheus_output() {
3053 let event_tx = tokio::sync::broadcast::channel(16).0;
3054 let wrapped = sse::BroadcastObserver::new(
3055 Box::new(crate::observability::PrometheusObserver::new()),
3056 event_tx.clone(),
3057 );
3058 crate::observability::Observer::record_event(
3059 &wrapped,
3060 &crate::observability::ObserverEvent::HeartbeatTick,
3061 );
3062
3063 let observer: Arc<dyn crate::observability::Observer> = Arc::new(wrapped);
3064 let state = AppState {
3065 config: Arc::new(Mutex::new(Config::default())),
3066 provider: Arc::new(MockProvider::default()),
3067 model: "test-model".into(),
3068 temperature: 0.0,
3069 mem: Arc::new(MockMemory),
3070 auto_save: false,
3071 webhook_secret_hash: None,
3072 pairing: Arc::new(PairingGuard::new(false, &[])),
3073 trust_forwarded_headers: false,
3074 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3075 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3076 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3077 whatsapp: None,
3078 whatsapp_app_secret: None,
3079 linq: None,
3080 linq_signing_secret: None,
3081 nextcloud_talk: None,
3082 nextcloud_talk_webhook_secret: None,
3083 wati: None,
3084 gmail_push: None,
3085 observer,
3086 tools_registry: Arc::new(Vec::new()),
3087 cost_tracker: None,
3088 audit_logger: None,
3089 event_tx,
3090 shutdown_tx: tokio::sync::watch::channel(false).0,
3091 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3092 path_prefix: String::new(),
3093 session_backend: None,
3094 session_queue: std::sync::Arc::new(
3095 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3096 ),
3097 device_registry: None,
3098 pending_pairings: None,
3099 canvas_store: CanvasStore::new(),
3100 mcp_registry: None,
3101 approval_registry: approval_registry::global(),
3102 mcp_local_url: None,
3103 #[cfg(feature = "webauthn")]
3104 webauthn: None,
3105 };
3106
3107 let response = handle_metrics(State(state)).await.into_response();
3108 assert_eq!(response.status(), StatusCode::OK);
3109
3110 let body = response.into_body().collect().await.unwrap().to_bytes();
3111 let text = String::from_utf8(body.to_vec()).unwrap();
3112 assert!(text.contains("construct_heartbeat_ticks_total 1"));
3113 }
3114
3115 #[test]
3116 fn gateway_rate_limiter_blocks_after_limit() {
3117 let limiter = GatewayRateLimiter::new(2, 2, 100);
3118 assert!(limiter.allow_pair("127.0.0.1"));
3119 assert!(limiter.allow_pair("127.0.0.1"));
3120 assert!(!limiter.allow_pair("127.0.0.1"));
3121 }
3122
3123 #[test]
3124 fn rate_limiter_sweep_removes_stale_entries() {
3125 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100);
3126 assert!(limiter.allow("ip-1"));
3128 assert!(limiter.allow("ip-2"));
3129 assert!(limiter.allow("ip-3"));
3130
3131 {
3132 let guard = limiter.requests.lock();
3133 assert_eq!(guard.0.len(), 3);
3134 }
3135
3136 {
3138 let mut guard = limiter.requests.lock();
3139 guard.1 = Instant::now()
3140 .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1))
3141 .unwrap();
3142 guard.0.get_mut("ip-2").unwrap().clear();
3144 guard.0.get_mut("ip-3").unwrap().clear();
3145 }
3146
3147 assert!(limiter.allow("ip-1"));
3149
3150 {
3151 let guard = limiter.requests.lock();
3152 assert_eq!(guard.0.len(), 1, "Stale entries should have been swept");
3153 assert!(guard.0.contains_key("ip-1"));
3154 }
3155 }
3156
3157 #[test]
3158 fn rate_limiter_zero_limit_always_allows() {
3159 let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10);
3160 for _ in 0..100 {
3161 assert!(limiter.allow("any-key"));
3162 }
3163 }
3164
3165 #[test]
3166 fn idempotency_store_rejects_duplicate_key() {
3167 let store = IdempotencyStore::new(Duration::from_secs(30), 10);
3168 assert!(store.record_if_new("req-1"));
3169 assert!(!store.record_if_new("req-1"));
3170 assert!(store.record_if_new("req-2"));
3171 }
3172
3173 #[test]
3174 fn rate_limiter_bounded_cardinality_evicts_oldest_key() {
3175 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2);
3176 assert!(limiter.allow("ip-1"));
3177 assert!(limiter.allow("ip-2"));
3178 assert!(limiter.allow("ip-3"));
3179
3180 let guard = limiter.requests.lock();
3181 assert_eq!(guard.0.len(), 2);
3182 assert!(guard.0.contains_key("ip-2"));
3183 assert!(guard.0.contains_key("ip-3"));
3184 }
3185
3186 #[test]
3187 fn idempotency_store_bounded_cardinality_evicts_oldest_key() {
3188 let store = IdempotencyStore::new(Duration::from_secs(300), 2);
3189 assert!(store.record_if_new("k1"));
3190 std::thread::sleep(Duration::from_millis(2));
3191 assert!(store.record_if_new("k2"));
3192 std::thread::sleep(Duration::from_millis(2));
3193 assert!(store.record_if_new("k3"));
3194
3195 let keys = store.keys.lock();
3196 assert_eq!(keys.len(), 2);
3197 assert!(!keys.contains_key("k1"));
3198 assert!(keys.contains_key("k2"));
3199 assert!(keys.contains_key("k3"));
3200 }
3201
3202 #[test]
3203 fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() {
3204 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3205 let mut headers = HeaderMap::new();
3206 headers.insert(
3207 "X-Forwarded-For",
3208 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3209 );
3210
3211 let key = client_key_from_request(Some(peer), &headers, false);
3212 assert_eq!(key, "10.0.0.5");
3213 }
3214
3215 #[test]
3216 fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() {
3217 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3220 let mut headers = HeaderMap::new();
3221 headers.insert(
3222 "X-Forwarded-For",
3223 HeaderValue::from_static("198.51.100.10, 203.0.113.11"),
3224 );
3225
3226 let key = client_key_from_request(Some(peer), &headers, true);
3227 assert_eq!(key, "203.0.113.11");
3228 }
3229
3230 #[test]
3231 fn client_key_spoofed_leftmost_xff_does_not_bypass_trust() {
3232 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3237 let mut headers = HeaderMap::new();
3238 headers.insert(
3239 "X-Forwarded-For",
3240 HeaderValue::from_static("127.0.0.1, 203.0.113.11"),
3241 );
3242
3243 let key = client_key_from_request(Some(peer), &headers, true);
3244 assert_eq!(key, "203.0.113.11");
3245 }
3246
3247 #[test]
3248 fn client_key_falls_back_to_peer_when_forwarded_header_invalid() {
3249 let peer = SocketAddr::from(([10, 0, 0, 5], 42617));
3250 let mut headers = HeaderMap::new();
3251 headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value"));
3252
3253 let key = client_key_from_request(Some(peer), &headers, true);
3254 assert_eq!(key, "10.0.0.5");
3255 }
3256
3257 #[test]
3258 fn normalize_max_keys_uses_fallback_for_zero() {
3259 assert_eq!(normalize_max_keys(0, 10_000), 10_000);
3260 assert_eq!(normalize_max_keys(0, 0), 1);
3261 }
3262
3263 #[test]
3264 fn normalize_max_keys_preserves_nonzero_values() {
3265 assert_eq!(normalize_max_keys(2_048, 10_000), 2_048);
3266 assert_eq!(normalize_max_keys(1, 10_000), 1);
3267 }
3268
3269 #[tokio::test]
3270 async fn persist_pairing_tokens_writes_config_tokens() {
3271 let temp = tempfile::tempdir().unwrap();
3272 let config_path = temp.path().join("config.toml");
3273 let workspace_path = temp.path().join("workspace");
3274
3275 let mut config = Config::default();
3276 config.config_path = config_path.clone();
3277 config.workspace_dir = workspace_path;
3278 config.save().await.unwrap();
3279
3280 let guard = PairingGuard::new(true, &[]);
3281 let code = guard.pairing_code().unwrap();
3282 let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap();
3283 assert!(guard.is_authenticated(&token));
3284
3285 let shared_config = Arc::new(Mutex::new(config));
3286 Box::pin(persist_pairing_tokens(shared_config.clone(), &guard))
3287 .await
3288 .unwrap();
3289
3290 let plaintext = {
3292 let in_memory = shared_config.lock();
3293 assert_eq!(in_memory.gateway.paired_tokens.len(), 1);
3294 in_memory.gateway.paired_tokens[0].clone()
3295 };
3296 assert_eq!(plaintext.len(), 64);
3297 assert!(plaintext.chars().all(|c: char| c.is_ascii_hexdigit()));
3298
3299 let saved = tokio::fs::read_to_string(config_path).await.unwrap();
3301 let raw_parsed: Config = toml::from_str(&saved).unwrap();
3302 assert_eq!(raw_parsed.gateway.paired_tokens.len(), 1);
3303 let on_disk = &raw_parsed.gateway.paired_tokens[0];
3304 assert!(
3305 crate::security::SecretStore::is_encrypted(on_disk),
3306 "paired_token should be encrypted on disk"
3307 );
3308 }
3309
3310 #[test]
3311 fn webhook_memory_key_is_unique() {
3312 let key1 = webhook_memory_key();
3313 let key2 = webhook_memory_key();
3314
3315 assert!(key1.starts_with("webhook_msg_"));
3316 assert!(key2.starts_with("webhook_msg_"));
3317 assert_ne!(key1, key2);
3318 }
3319
3320 #[test]
3321 fn whatsapp_memory_key_includes_sender_and_message_id() {
3322 let msg = ChannelMessage {
3323 id: "wamid-123".into(),
3324 sender: "+1234567890".into(),
3325 reply_target: "+1234567890".into(),
3326 content: "hello".into(),
3327 channel: "whatsapp".into(),
3328 timestamp: 1,
3329 thread_ts: None,
3330 interruption_scope_id: None,
3331 attachments: vec![],
3332 };
3333
3334 let key = whatsapp_memory_key(&msg);
3335 assert_eq!(key, "whatsapp_+1234567890_wamid-123");
3336 }
3337
3338 #[derive(Default)]
3339 struct MockMemory;
3340
3341 #[async_trait]
3342 impl Memory for MockMemory {
3343 fn name(&self) -> &str {
3344 "mock"
3345 }
3346
3347 async fn store(
3348 &self,
3349 _key: &str,
3350 _content: &str,
3351 _category: MemoryCategory,
3352 _session_id: Option<&str>,
3353 ) -> anyhow::Result<()> {
3354 Ok(())
3355 }
3356
3357 async fn recall(
3358 &self,
3359 _query: &str,
3360 _limit: usize,
3361 _session_id: Option<&str>,
3362 _since: Option<&str>,
3363 _until: Option<&str>,
3364 ) -> anyhow::Result<Vec<MemoryEntry>> {
3365 Ok(Vec::new())
3366 }
3367
3368 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3369 Ok(None)
3370 }
3371
3372 async fn list(
3373 &self,
3374 _category: Option<&MemoryCategory>,
3375 _session_id: Option<&str>,
3376 ) -> anyhow::Result<Vec<MemoryEntry>> {
3377 Ok(Vec::new())
3378 }
3379
3380 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3381 Ok(false)
3382 }
3383
3384 async fn count(&self) -> anyhow::Result<usize> {
3385 Ok(0)
3386 }
3387
3388 async fn health_check(&self) -> bool {
3389 true
3390 }
3391 }
3392
3393 #[derive(Default)]
3394 struct MockProvider {
3395 calls: AtomicUsize,
3396 }
3397
3398 #[async_trait]
3399 impl Provider for MockProvider {
3400 async fn chat_with_system(
3401 &self,
3402 _system_prompt: Option<&str>,
3403 _message: &str,
3404 _model: &str,
3405 _temperature: f64,
3406 ) -> anyhow::Result<String> {
3407 self.calls.fetch_add(1, Ordering::SeqCst);
3408 Ok("ok".into())
3409 }
3410 }
3411
3412 #[derive(Default)]
3413 struct TrackingMemory {
3414 keys: Mutex<Vec<String>>,
3415 }
3416
3417 #[async_trait]
3418 impl Memory for TrackingMemory {
3419 fn name(&self) -> &str {
3420 "tracking"
3421 }
3422
3423 async fn store(
3424 &self,
3425 key: &str,
3426 _content: &str,
3427 _category: MemoryCategory,
3428 _session_id: Option<&str>,
3429 ) -> anyhow::Result<()> {
3430 self.keys.lock().push(key.to_string());
3431 Ok(())
3432 }
3433
3434 async fn recall(
3435 &self,
3436 _query: &str,
3437 _limit: usize,
3438 _session_id: Option<&str>,
3439 _since: Option<&str>,
3440 _until: Option<&str>,
3441 ) -> anyhow::Result<Vec<MemoryEntry>> {
3442 Ok(Vec::new())
3443 }
3444
3445 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
3446 Ok(None)
3447 }
3448
3449 async fn list(
3450 &self,
3451 _category: Option<&MemoryCategory>,
3452 _session_id: Option<&str>,
3453 ) -> anyhow::Result<Vec<MemoryEntry>> {
3454 Ok(Vec::new())
3455 }
3456
3457 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
3458 Ok(false)
3459 }
3460
3461 async fn count(&self) -> anyhow::Result<usize> {
3462 let size = self.keys.lock().len();
3463 Ok(size)
3464 }
3465
3466 async fn health_check(&self) -> bool {
3467 true
3468 }
3469 }
3470
3471 fn test_connect_info() -> ConnectInfo<SocketAddr> {
3472 ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300)))
3473 }
3474
3475 #[tokio::test]
3476 async fn webhook_idempotency_skips_duplicate_provider_calls() {
3477 let provider_impl = Arc::new(MockProvider::default());
3478 let provider: Arc<dyn Provider> = provider_impl.clone();
3479 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3480
3481 let state = AppState {
3482 config: Arc::new(Mutex::new(Config::default())),
3483 provider,
3484 model: "test-model".into(),
3485 temperature: 0.0,
3486 mem: memory,
3487 auto_save: false,
3488 webhook_secret_hash: None,
3489 pairing: Arc::new(PairingGuard::new(false, &[])),
3490 trust_forwarded_headers: false,
3491 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3492 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3493 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3494 whatsapp: None,
3495 whatsapp_app_secret: None,
3496 linq: None,
3497 linq_signing_secret: None,
3498 nextcloud_talk: None,
3499 nextcloud_talk_webhook_secret: None,
3500 wati: None,
3501 gmail_push: None,
3502 observer: Arc::new(crate::observability::NoopObserver),
3503 tools_registry: Arc::new(Vec::new()),
3504 cost_tracker: None,
3505 audit_logger: None,
3506 event_tx: tokio::sync::broadcast::channel(16).0,
3507 shutdown_tx: tokio::sync::watch::channel(false).0,
3508 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3509 path_prefix: String::new(),
3510 session_backend: None,
3511 session_queue: std::sync::Arc::new(
3512 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3513 ),
3514 device_registry: None,
3515 pending_pairings: None,
3516 canvas_store: CanvasStore::new(),
3517 mcp_registry: None,
3518 approval_registry: approval_registry::global(),
3519 mcp_local_url: None,
3520 #[cfg(feature = "webauthn")]
3521 webauthn: None,
3522 };
3523
3524 let mut headers = HeaderMap::new();
3525 headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123"));
3526
3527 let body = Ok(Json(WebhookBody {
3528 message: "hello".into(),
3529 }));
3530 let first = handle_webhook(
3531 State(state.clone()),
3532 test_connect_info(),
3533 headers.clone(),
3534 body,
3535 )
3536 .await
3537 .into_response();
3538 assert_eq!(first.status(), StatusCode::OK);
3539
3540 let body = Ok(Json(WebhookBody {
3541 message: "hello".into(),
3542 }));
3543 let second = handle_webhook(State(state), test_connect_info(), headers, body)
3544 .await
3545 .into_response();
3546 assert_eq!(second.status(), StatusCode::OK);
3547
3548 let payload = second.into_body().collect().await.unwrap().to_bytes();
3549 let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
3550 assert_eq!(parsed["status"], "duplicate");
3551 assert_eq!(parsed["idempotent"], true);
3552 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3553 }
3554
3555 #[tokio::test]
3556 async fn webhook_autosave_stores_distinct_keys_per_request() {
3557 let provider_impl = Arc::new(MockProvider::default());
3558 let provider: Arc<dyn Provider> = provider_impl.clone();
3559
3560 let tracking_impl = Arc::new(TrackingMemory::default());
3561 let memory: Arc<dyn Memory> = tracking_impl.clone();
3562
3563 let state = AppState {
3564 config: Arc::new(Mutex::new(Config::default())),
3565 provider,
3566 model: "test-model".into(),
3567 temperature: 0.0,
3568 mem: memory,
3569 auto_save: true,
3570 webhook_secret_hash: None,
3571 pairing: Arc::new(PairingGuard::new(false, &[])),
3572 trust_forwarded_headers: false,
3573 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3574 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3575 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3576 whatsapp: None,
3577 whatsapp_app_secret: None,
3578 linq: None,
3579 linq_signing_secret: None,
3580 nextcloud_talk: None,
3581 nextcloud_talk_webhook_secret: None,
3582 wati: None,
3583 gmail_push: None,
3584 observer: Arc::new(crate::observability::NoopObserver),
3585 tools_registry: Arc::new(Vec::new()),
3586 cost_tracker: None,
3587 audit_logger: None,
3588 event_tx: tokio::sync::broadcast::channel(16).0,
3589 shutdown_tx: tokio::sync::watch::channel(false).0,
3590 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3591 path_prefix: String::new(),
3592 session_backend: None,
3593 session_queue: std::sync::Arc::new(
3594 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3595 ),
3596 device_registry: None,
3597 pending_pairings: None,
3598 canvas_store: CanvasStore::new(),
3599 mcp_registry: None,
3600 approval_registry: approval_registry::global(),
3601 mcp_local_url: None,
3602 #[cfg(feature = "webauthn")]
3603 webauthn: None,
3604 };
3605
3606 let headers = HeaderMap::new();
3607
3608 let body1 = Ok(Json(WebhookBody {
3609 message: "hello one".into(),
3610 }));
3611 let first = handle_webhook(
3612 State(state.clone()),
3613 test_connect_info(),
3614 headers.clone(),
3615 body1,
3616 )
3617 .await
3618 .into_response();
3619 assert_eq!(first.status(), StatusCode::OK);
3620
3621 let body2 = Ok(Json(WebhookBody {
3622 message: "hello two".into(),
3623 }));
3624 let second = handle_webhook(State(state), test_connect_info(), headers, body2)
3625 .await
3626 .into_response();
3627 assert_eq!(second.status(), StatusCode::OK);
3628
3629 let keys = tracking_impl.keys.lock().clone();
3630 assert_eq!(keys.len(), 2);
3631 assert_ne!(keys[0], keys[1]);
3632 assert!(keys[0].starts_with("webhook_msg_"));
3633 assert!(keys[1].starts_with("webhook_msg_"));
3634 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2);
3635 }
3636
3637 #[test]
3638 fn webhook_secret_hash_is_deterministic_and_nonempty() {
3639 let secret_a = generate_test_secret();
3640 let secret_b = generate_test_secret();
3641 let one = hash_webhook_secret(&secret_a);
3642 let two = hash_webhook_secret(&secret_a);
3643 let other = hash_webhook_secret(&secret_b);
3644
3645 assert_eq!(one, two);
3646 assert_ne!(one, other);
3647 assert_eq!(one.len(), 64);
3648 }
3649
3650 #[tokio::test]
3651 async fn webhook_secret_hash_rejects_missing_header() {
3652 let provider_impl = Arc::new(MockProvider::default());
3653 let provider: Arc<dyn Provider> = provider_impl.clone();
3654 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3655 let secret = generate_test_secret();
3656
3657 let state = AppState {
3658 config: Arc::new(Mutex::new(Config::default())),
3659 provider,
3660 model: "test-model".into(),
3661 temperature: 0.0,
3662 mem: memory,
3663 auto_save: false,
3664 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3665 pairing: Arc::new(PairingGuard::new(false, &[])),
3666 trust_forwarded_headers: false,
3667 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3668 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3669 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3670 whatsapp: None,
3671 whatsapp_app_secret: None,
3672 linq: None,
3673 linq_signing_secret: None,
3674 nextcloud_talk: None,
3675 nextcloud_talk_webhook_secret: None,
3676 wati: None,
3677 gmail_push: None,
3678 observer: Arc::new(crate::observability::NoopObserver),
3679 tools_registry: Arc::new(Vec::new()),
3680 cost_tracker: None,
3681 audit_logger: None,
3682 event_tx: tokio::sync::broadcast::channel(16).0,
3683 shutdown_tx: tokio::sync::watch::channel(false).0,
3684 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3685 path_prefix: String::new(),
3686 session_backend: None,
3687 session_queue: std::sync::Arc::new(
3688 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3689 ),
3690 device_registry: None,
3691 pending_pairings: None,
3692 canvas_store: CanvasStore::new(),
3693 mcp_registry: None,
3694 approval_registry: approval_registry::global(),
3695 mcp_local_url: None,
3696 #[cfg(feature = "webauthn")]
3697 webauthn: None,
3698 };
3699
3700 let response = handle_webhook(
3701 State(state),
3702 test_connect_info(),
3703 HeaderMap::new(),
3704 Ok(Json(WebhookBody {
3705 message: "hello".into(),
3706 })),
3707 )
3708 .await
3709 .into_response();
3710
3711 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3712 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3713 }
3714
3715 #[tokio::test]
3716 async fn webhook_secret_hash_rejects_invalid_header() {
3717 let provider_impl = Arc::new(MockProvider::default());
3718 let provider: Arc<dyn Provider> = provider_impl.clone();
3719 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3720 let valid_secret = generate_test_secret();
3721 let wrong_secret = generate_test_secret();
3722
3723 let state = AppState {
3724 config: Arc::new(Mutex::new(Config::default())),
3725 provider,
3726 model: "test-model".into(),
3727 temperature: 0.0,
3728 mem: memory,
3729 auto_save: false,
3730 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))),
3731 pairing: Arc::new(PairingGuard::new(false, &[])),
3732 trust_forwarded_headers: false,
3733 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3734 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3735 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3736 whatsapp: None,
3737 whatsapp_app_secret: None,
3738 linq: None,
3739 linq_signing_secret: None,
3740 nextcloud_talk: None,
3741 nextcloud_talk_webhook_secret: None,
3742 wati: None,
3743 gmail_push: None,
3744 observer: Arc::new(crate::observability::NoopObserver),
3745 tools_registry: Arc::new(Vec::new()),
3746 cost_tracker: None,
3747 audit_logger: None,
3748 event_tx: tokio::sync::broadcast::channel(16).0,
3749 shutdown_tx: tokio::sync::watch::channel(false).0,
3750 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3751 path_prefix: String::new(),
3752 session_backend: None,
3753 session_queue: std::sync::Arc::new(
3754 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3755 ),
3756 device_registry: None,
3757 pending_pairings: None,
3758 canvas_store: CanvasStore::new(),
3759 mcp_registry: None,
3760 approval_registry: approval_registry::global(),
3761 mcp_local_url: None,
3762 #[cfg(feature = "webauthn")]
3763 webauthn: None,
3764 };
3765
3766 let mut headers = HeaderMap::new();
3767 headers.insert(
3768 "X-Webhook-Secret",
3769 HeaderValue::from_str(&wrong_secret).unwrap(),
3770 );
3771
3772 let response = handle_webhook(
3773 State(state),
3774 test_connect_info(),
3775 headers,
3776 Ok(Json(WebhookBody {
3777 message: "hello".into(),
3778 })),
3779 )
3780 .await
3781 .into_response();
3782
3783 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
3784 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
3785 }
3786
3787 #[tokio::test]
3788 async fn webhook_secret_hash_accepts_valid_header() {
3789 let provider_impl = Arc::new(MockProvider::default());
3790 let provider: Arc<dyn Provider> = provider_impl.clone();
3791 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3792 let secret = generate_test_secret();
3793
3794 let state = AppState {
3795 config: Arc::new(Mutex::new(Config::default())),
3796 provider,
3797 model: "test-model".into(),
3798 temperature: 0.0,
3799 mem: memory,
3800 auto_save: false,
3801 webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))),
3802 pairing: Arc::new(PairingGuard::new(false, &[])),
3803 trust_forwarded_headers: false,
3804 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3805 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3806 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3807 whatsapp: None,
3808 whatsapp_app_secret: None,
3809 linq: None,
3810 linq_signing_secret: None,
3811 nextcloud_talk: None,
3812 nextcloud_talk_webhook_secret: None,
3813 wati: None,
3814 gmail_push: None,
3815 observer: Arc::new(crate::observability::NoopObserver),
3816 tools_registry: Arc::new(Vec::new()),
3817 cost_tracker: None,
3818 audit_logger: None,
3819 event_tx: tokio::sync::broadcast::channel(16).0,
3820 shutdown_tx: tokio::sync::watch::channel(false).0,
3821 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3822 path_prefix: String::new(),
3823 session_backend: None,
3824 session_queue: std::sync::Arc::new(
3825 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3826 ),
3827 device_registry: None,
3828 pending_pairings: None,
3829 canvas_store: CanvasStore::new(),
3830 mcp_registry: None,
3831 approval_registry: approval_registry::global(),
3832 mcp_local_url: None,
3833 #[cfg(feature = "webauthn")]
3834 webauthn: None,
3835 };
3836
3837 let mut headers = HeaderMap::new();
3838 headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap());
3839
3840 let response = handle_webhook(
3841 State(state),
3842 test_connect_info(),
3843 headers,
3844 Ok(Json(WebhookBody {
3845 message: "hello".into(),
3846 })),
3847 )
3848 .await
3849 .into_response();
3850
3851 assert_eq!(response.status(), StatusCode::OK);
3852 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
3853 }
3854
3855 fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String {
3856 use hmac::{Hmac, Mac};
3857 use sha2::Sha256;
3858
3859 let payload = format!("{random}{body}");
3860 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
3861 mac.update(payload.as_bytes());
3862 hex::encode(mac.finalize().into_bytes())
3863 }
3864
3865 #[tokio::test]
3866 async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() {
3867 let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
3868 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3869
3870 let state = AppState {
3871 config: Arc::new(Mutex::new(Config::default())),
3872 provider,
3873 model: "test-model".into(),
3874 temperature: 0.0,
3875 mem: memory,
3876 auto_save: false,
3877 webhook_secret_hash: None,
3878 pairing: Arc::new(PairingGuard::new(false, &[])),
3879 trust_forwarded_headers: false,
3880 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3881 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3882 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3883 whatsapp: None,
3884 whatsapp_app_secret: None,
3885 linq: None,
3886 linq_signing_secret: None,
3887 nextcloud_talk: None,
3888 nextcloud_talk_webhook_secret: None,
3889 wati: None,
3890 gmail_push: None,
3891 observer: Arc::new(crate::observability::NoopObserver),
3892 tools_registry: Arc::new(Vec::new()),
3893 cost_tracker: None,
3894 audit_logger: None,
3895 event_tx: tokio::sync::broadcast::channel(16).0,
3896 shutdown_tx: tokio::sync::watch::channel(false).0,
3897 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3898 path_prefix: String::new(),
3899 session_backend: None,
3900 session_queue: std::sync::Arc::new(
3901 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3902 ),
3903 device_registry: None,
3904 pending_pairings: None,
3905 canvas_store: CanvasStore::new(),
3906 mcp_registry: None,
3907 approval_registry: approval_registry::global(),
3908 mcp_local_url: None,
3909 #[cfg(feature = "webauthn")]
3910 webauthn: None,
3911 };
3912
3913 let response = Box::pin(handle_nextcloud_talk_webhook(
3914 State(state),
3915 HeaderMap::new(),
3916 Bytes::from_static(br#"{"type":"message"}"#),
3917 ))
3918 .await
3919 .into_response();
3920
3921 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3922 }
3923
3924 #[tokio::test]
3925 async fn nextcloud_talk_webhook_rejects_invalid_signature() {
3926 let provider_impl = Arc::new(MockProvider::default());
3927 let provider: Arc<dyn Provider> = provider_impl.clone();
3928 let memory: Arc<dyn Memory> = Arc::new(MockMemory);
3929
3930 let channel = Arc::new(NextcloudTalkChannel::new(
3931 "https://cloud.example.com".into(),
3932 "app-token".into(),
3933 String::new(),
3934 vec!["*".into()],
3935 ));
3936
3937 let secret = "nextcloud-test-secret";
3938 let random = "seed-value";
3939 let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#;
3940 let _valid_signature = compute_nextcloud_signature_hex(secret, random, body);
3941 let invalid_signature = "deadbeef";
3942
3943 let state = AppState {
3944 config: Arc::new(Mutex::new(Config::default())),
3945 provider,
3946 model: "test-model".into(),
3947 temperature: 0.0,
3948 mem: memory,
3949 auto_save: false,
3950 webhook_secret_hash: None,
3951 pairing: Arc::new(PairingGuard::new(false, &[])),
3952 trust_forwarded_headers: false,
3953 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
3954 auth_limiter: Arc::new(auth_rate_limit::AuthRateLimiter::new()),
3955 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
3956 whatsapp: None,
3957 whatsapp_app_secret: None,
3958 linq: None,
3959 linq_signing_secret: None,
3960 nextcloud_talk: Some(channel),
3961 nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
3962 wati: None,
3963 gmail_push: None,
3964 observer: Arc::new(crate::observability::NoopObserver),
3965 tools_registry: Arc::new(Vec::new()),
3966 cost_tracker: None,
3967 audit_logger: None,
3968 event_tx: tokio::sync::broadcast::channel(16).0,
3969 shutdown_tx: tokio::sync::watch::channel(false).0,
3970 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
3971 path_prefix: String::new(),
3972 session_backend: None,
3973 session_queue: std::sync::Arc::new(
3974 crate::gateway::session_queue::SessionActorQueue::new(8, 30, 600),
3975 ),
3976 device_registry: None,
3977 pending_pairings: None,
3978 canvas_store: CanvasStore::new(),
3979 mcp_registry: None,
3980 approval_registry: approval_registry::global(),
3981 mcp_local_url: None,
3982 #[cfg(feature = "webauthn")]
3983 webauthn: None,
3984 };
3985
3986 let mut headers = HeaderMap::new();
3987 headers.insert(
3988 "X-Nextcloud-Talk-Random",
3989 HeaderValue::from_str(random).unwrap(),
3990 );
3991 headers.insert(
3992 "X-Nextcloud-Talk-Signature",
3993 HeaderValue::from_str(invalid_signature).unwrap(),
3994 );
3995
3996 let response = Box::pin(handle_nextcloud_talk_webhook(
3997 State(state),
3998 headers,
3999 Bytes::from(body),
4000 ))
4001 .await
4002 .into_response();
4003 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
4004 assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0);
4005 }
4006
4007 fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String {
4012 use hmac::{Hmac, Mac};
4013 use sha2::Sha256;
4014
4015 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
4016 mac.update(body);
4017 hex::encode(mac.finalize().into_bytes())
4018 }
4019
4020 fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String {
4021 format!("sha256={}", compute_whatsapp_signature_hex(secret, body))
4022 }
4023
4024 #[test]
4025 fn whatsapp_signature_valid() {
4026 let app_secret = generate_test_secret();
4027 let body = b"test body content";
4028
4029 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4030
4031 assert!(verify_whatsapp_signature(
4032 &app_secret,
4033 body,
4034 &signature_header
4035 ));
4036 }
4037
4038 #[test]
4039 fn whatsapp_signature_invalid_wrong_secret() {
4040 let app_secret = generate_test_secret();
4041 let wrong_secret = generate_test_secret();
4042 let body = b"test body content";
4043
4044 let signature_header = compute_whatsapp_signature_header(&wrong_secret, body);
4045
4046 assert!(!verify_whatsapp_signature(
4047 &app_secret,
4048 body,
4049 &signature_header
4050 ));
4051 }
4052
4053 #[test]
4054 fn whatsapp_signature_invalid_wrong_body() {
4055 let app_secret = generate_test_secret();
4056 let original_body = b"original body";
4057 let tampered_body = b"tampered body";
4058
4059 let signature_header = compute_whatsapp_signature_header(&app_secret, original_body);
4060
4061 assert!(!verify_whatsapp_signature(
4063 &app_secret,
4064 tampered_body,
4065 &signature_header
4066 ));
4067 }
4068
4069 #[test]
4070 fn whatsapp_signature_missing_prefix() {
4071 let app_secret = generate_test_secret();
4072 let body = b"test body";
4073
4074 let signature_header = "abc123def456";
4076
4077 assert!(!verify_whatsapp_signature(
4078 &app_secret,
4079 body,
4080 signature_header
4081 ));
4082 }
4083
4084 #[test]
4085 fn whatsapp_signature_empty_header() {
4086 let app_secret = generate_test_secret();
4087 let body = b"test body";
4088
4089 assert!(!verify_whatsapp_signature(&app_secret, body, ""));
4090 }
4091
4092 #[test]
4093 fn whatsapp_signature_invalid_hex() {
4094 let app_secret = generate_test_secret();
4095 let body = b"test body";
4096
4097 let signature_header = "sha256=not_valid_hex_zzz";
4099
4100 assert!(!verify_whatsapp_signature(
4101 &app_secret,
4102 body,
4103 signature_header
4104 ));
4105 }
4106
4107 #[test]
4108 fn whatsapp_signature_empty_body() {
4109 let app_secret = generate_test_secret();
4110 let body = b"";
4111
4112 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4113
4114 assert!(verify_whatsapp_signature(
4115 &app_secret,
4116 body,
4117 &signature_header
4118 ));
4119 }
4120
4121 #[test]
4122 fn whatsapp_signature_unicode_body() {
4123 let app_secret = generate_test_secret();
4124 let body = "Hello 🦀 World".as_bytes();
4125
4126 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4127
4128 assert!(verify_whatsapp_signature(
4129 &app_secret,
4130 body,
4131 &signature_header
4132 ));
4133 }
4134
4135 #[test]
4136 fn whatsapp_signature_json_payload() {
4137 let app_secret = generate_test_secret();
4138 let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#;
4139
4140 let signature_header = compute_whatsapp_signature_header(&app_secret, body);
4141
4142 assert!(verify_whatsapp_signature(
4143 &app_secret,
4144 body,
4145 &signature_header
4146 ));
4147 }
4148
4149 #[test]
4150 fn whatsapp_signature_case_sensitive_prefix() {
4151 let app_secret = generate_test_secret();
4152 let body = b"test body";
4153
4154 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4155
4156 let wrong_prefix = format!("SHA256={hex_sig}");
4158 assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix));
4159
4160 let correct_prefix = format!("sha256={hex_sig}");
4162 assert!(verify_whatsapp_signature(
4163 &app_secret,
4164 body,
4165 &correct_prefix
4166 ));
4167 }
4168
4169 #[test]
4170 fn whatsapp_signature_truncated_hex() {
4171 let app_secret = generate_test_secret();
4172 let body = b"test body";
4173
4174 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4175 let truncated = &hex_sig[..32]; let signature_header = format!("sha256={truncated}");
4177
4178 assert!(!verify_whatsapp_signature(
4179 &app_secret,
4180 body,
4181 &signature_header
4182 ));
4183 }
4184
4185 #[test]
4186 fn whatsapp_signature_extra_bytes() {
4187 let app_secret = generate_test_secret();
4188 let body = b"test body";
4189
4190 let hex_sig = compute_whatsapp_signature_hex(&app_secret, body);
4191 let extended = format!("{hex_sig}deadbeef");
4192 let signature_header = format!("sha256={extended}");
4193
4194 assert!(!verify_whatsapp_signature(
4195 &app_secret,
4196 body,
4197 &signature_header
4198 ));
4199 }
4200
4201 #[test]
4206 fn idempotency_store_allows_different_keys() {
4207 let store = IdempotencyStore::new(Duration::from_secs(60), 100);
4208 assert!(store.record_if_new("key-a"));
4209 assert!(store.record_if_new("key-b"));
4210 assert!(store.record_if_new("key-c"));
4211 assert!(store.record_if_new("key-d"));
4212 }
4213
4214 #[test]
4215 fn idempotency_store_max_keys_clamped_to_one() {
4216 let store = IdempotencyStore::new(Duration::from_secs(60), 0);
4217 assert!(store.record_if_new("only-key"));
4218 assert!(!store.record_if_new("only-key"));
4219 }
4220
4221 #[test]
4222 fn idempotency_store_rapid_duplicate_rejected() {
4223 let store = IdempotencyStore::new(Duration::from_secs(300), 100);
4224 assert!(store.record_if_new("rapid"));
4225 assert!(!store.record_if_new("rapid"));
4226 }
4227
4228 #[test]
4229 fn idempotency_store_accepts_after_ttl_expires() {
4230 let store = IdempotencyStore::new(Duration::from_millis(1), 100);
4231 assert!(store.record_if_new("ttl-key"));
4232 std::thread::sleep(Duration::from_millis(10));
4233 assert!(store.record_if_new("ttl-key"));
4234 }
4235
4236 #[test]
4237 fn idempotency_store_eviction_preserves_newest() {
4238 let store = IdempotencyStore::new(Duration::from_secs(300), 1);
4239 assert!(store.record_if_new("old-key"));
4240 std::thread::sleep(Duration::from_millis(2));
4241 assert!(store.record_if_new("new-key"));
4242
4243 let keys = store.keys.lock();
4244 assert_eq!(keys.len(), 1);
4245 assert!(!keys.contains_key("old-key"));
4246 assert!(keys.contains_key("new-key"));
4247 }
4248
4249 #[test]
4250 fn rate_limiter_allows_after_window_expires() {
4251 let window = Duration::from_millis(50);
4252 let limiter = SlidingWindowRateLimiter::new(2, window, 100);
4253 assert!(limiter.allow("ip-1"));
4254 assert!(limiter.allow("ip-1"));
4255 assert!(!limiter.allow("ip-1")); std::thread::sleep(Duration::from_millis(60));
4259
4260 assert!(limiter.allow("ip-1"));
4262 }
4263
4264 #[test]
4265 fn rate_limiter_independent_keys_tracked_separately() {
4266 let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100);
4267 assert!(limiter.allow("ip-1"));
4268 assert!(limiter.allow("ip-1"));
4269 assert!(!limiter.allow("ip-1")); assert!(limiter.allow("ip-2"));
4273 assert!(limiter.allow("ip-2"));
4274 assert!(!limiter.allow("ip-2")); }
4276
4277 #[test]
4278 fn rate_limiter_exact_boundary_at_max_keys() {
4279 let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3);
4280 assert!(limiter.allow("ip-1"));
4281 assert!(limiter.allow("ip-2"));
4282 assert!(limiter.allow("ip-3"));
4283 assert!(limiter.allow("ip-4")); let guard = limiter.requests.lock();
4287 assert_eq!(guard.0.len(), 3);
4288 assert!(
4289 !guard.0.contains_key("ip-1"),
4290 "ip-1 should have been evicted"
4291 );
4292 assert!(guard.0.contains_key("ip-2"));
4293 assert!(guard.0.contains_key("ip-3"));
4294 assert!(guard.0.contains_key("ip-4"));
4295 }
4296
4297 #[test]
4298 fn gateway_rate_limiter_pair_and_webhook_are_independent() {
4299 let limiter = GatewayRateLimiter::new(2, 3, 100);
4300
4301 assert!(limiter.allow_pair("ip-1"));
4303 assert!(limiter.allow_pair("ip-1"));
4304 assert!(!limiter.allow_pair("ip-1")); assert!(limiter.allow_webhook("ip-1"));
4308 assert!(limiter.allow_webhook("ip-1"));
4309 assert!(limiter.allow_webhook("ip-1"));
4310 assert!(!limiter.allow_webhook("ip-1")); }
4312
4313 #[test]
4314 fn rate_limiter_single_key_max_allows_one_request() {
4315 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1);
4316 assert!(limiter.allow("ip-1"));
4317 assert!(limiter.allow("ip-2")); let guard = limiter.requests.lock();
4320 assert_eq!(guard.0.len(), 1);
4321 assert!(guard.0.contains_key("ip-2"));
4322 assert!(!guard.0.contains_key("ip-1"));
4323 }
4324
4325 #[test]
4326 fn rate_limiter_concurrent_access_safe() {
4327 use std::sync::Arc;
4328
4329 let limiter = Arc::new(SlidingWindowRateLimiter::new(
4330 1000,
4331 Duration::from_secs(60),
4332 1000,
4333 ));
4334 let mut handles = Vec::new();
4335
4336 for i in 0..10 {
4337 let limiter = limiter.clone();
4338 handles.push(std::thread::spawn(move || {
4339 for j in 0..100 {
4340 limiter.allow(&format!("thread-{i}-req-{j}"));
4341 }
4342 }));
4343 }
4344
4345 for handle in handles {
4346 handle.join().unwrap();
4347 }
4348
4349 let guard = limiter.requests.lock();
4351 assert!(guard.0.len() <= 1000, "should respect max_keys");
4352 }
4353
4354 #[test]
4355 fn idempotency_store_concurrent_access_safe() {
4356 use std::sync::Arc;
4357
4358 let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000));
4359 let mut handles = Vec::new();
4360
4361 for i in 0..10 {
4362 let store = store.clone();
4363 handles.push(std::thread::spawn(move || {
4364 for j in 0..100 {
4365 store.record_if_new(&format!("thread-{i}-key-{j}"));
4366 }
4367 }));
4368 }
4369
4370 for handle in handles {
4371 handle.join().unwrap();
4372 }
4373
4374 let keys = store.keys.lock();
4375 assert!(keys.len() <= 1000, "should respect max_keys");
4376 }
4377
4378 #[test]
4379 fn rate_limiter_rapid_burst_then_cooldown() {
4380 let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100);
4381
4382 for _ in 0..5 {
4384 assert!(limiter.allow("burst-ip"));
4385 }
4386 assert!(!limiter.allow("burst-ip")); std::thread::sleep(Duration::from_millis(60));
4390
4391 assert!(limiter.allow("burst-ip"));
4393 }
4394
4395 #[test]
4396 fn require_localhost_accepts_ipv4_loopback() {
4397 let peer = SocketAddr::from(([127, 0, 0, 1], 12345));
4398 assert!(require_localhost(&peer).is_ok());
4399 }
4400
4401 #[test]
4402 fn require_localhost_accepts_ipv6_loopback() {
4403 let peer = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 12345));
4404 assert!(require_localhost(&peer).is_ok());
4405 }
4406
4407 #[test]
4408 fn require_localhost_rejects_non_loopback_ipv4() {
4409 let peer = SocketAddr::from(([192, 168, 1, 100], 12345));
4410 let err = require_localhost(&peer).unwrap_err();
4411 assert_eq!(err.0, StatusCode::FORBIDDEN);
4412 }
4413
4414 #[test]
4415 fn require_localhost_rejects_non_loopback_ipv6() {
4416 let peer = SocketAddr::from((
4417 std::net::Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
4418 12345,
4419 ));
4420 let err = require_localhost(&peer).unwrap_err();
4421 assert_eq!(err.0, StatusCode::FORBIDDEN);
4422 }
4423}