1use super::AppState;
6use axum::{
7 extract::{ConnectInfo, Path, Query, State},
8 http::{HeaderMap, StatusCode, header},
9 response::{IntoResponse, Json},
10};
11use serde::Deserialize;
12use uuid::Uuid;
13
/// Placeholder written in place of secret values by the `mask_*` helpers.
/// The `restore_*` helpers treat an incoming value equal to this string as
/// "unchanged" and substitute the currently stored secret.
const MASKED_SECRET: &str = "***MASKED***";
15
16fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> {
20 headers
21 .get(header::AUTHORIZATION)
22 .and_then(|v| v.to_str().ok())
23 .and_then(|auth| auth.strip_prefix("Bearer "))
24}
25
26pub(super) fn require_auth(
28 state: &AppState,
29 headers: &HeaderMap,
30) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
31 if !state.pairing.require_pairing() {
32 return Ok(());
33 }
34
35 let token = extract_bearer_token(headers).unwrap_or("");
36 if state.pairing.is_authenticated(token) {
37 Ok(())
38 } else {
39 Err((
40 StatusCode::UNAUTHORIZED,
41 Json(serde_json::json!({
42 "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
43 })),
44 ))
45 }
46}
47
/// Query parameters accepted by `handle_api_cron_runs`.
#[derive(Deserialize)]
pub struct CronRunsQuery {
    /// Maximum number of runs to return. The handler defaults this to 20 and
    /// clamps it to the range 1..=100.
    pub limit: Option<u32>,
}
54
/// JSON body accepted by `handle_api_cron_add`.
#[derive(Deserialize)]
pub struct CronAddBody {
    /// Optional job name, forwarded to the cron layer.
    pub name: Option<String>,
    /// Cron expression; the handler wraps it in `Schedule::Cron` with no
    /// timezone.
    pub schedule: String,
    /// Command for a shell job; must be non-blank when a shell job is created.
    pub command: Option<String>,
    /// `"agent"` selects an agent job. When omitted, the presence of `prompt`
    /// selects an agent job; otherwise a shell job is created.
    pub job_type: Option<String>,
    /// Prompt for an agent job; must be non-blank when an agent job is created.
    pub prompt: Option<String>,
    /// Optional delivery settings, checked with `validate_delivery_config`.
    pub delivery: Option<crate::cron::DeliveryConfig>,
    /// Agent jobs only: parsed with `SessionTarget::parse`; the default target
    /// is used when absent.
    pub session_target: Option<String>,
    /// Agent jobs only: forwarded to `add_agent_job`.
    pub model: Option<String>,
    /// Agent jobs only: forwarded to `add_agent_job`.
    pub allowed_tools: Option<Vec<String>>,
    /// Agent jobs only: whether to delete the job after it runs.
    pub delete_after_run: Option<bool>,
}
68
/// JSON body accepted by `handle_api_cron_patch`. Every field is optional.
#[derive(Deserialize)]
pub struct CronPatchBody {
    /// New job name.
    pub name: Option<String>,
    /// New cron expression; a missing or blank value leaves the schedule
    /// untouched.
    pub schedule: Option<String>,
    /// New command text. The handler merges `command` and `prompt`
    /// (`command` wins when both are present) and applies the result to the
    /// field that matches the existing job's type.
    pub command: Option<String>,
    /// New prompt text; used when `command` is absent (see `command`).
    pub prompt: Option<String>,
    /// Enables or disables the job.
    pub enabled: Option<bool>,
}
77
/// Query parameters accepted by `handle_api_audit`.
#[derive(Deserialize)]
pub struct AuditQuery {
    /// Maximum number of events to return. The handler defaults this to 50
    /// and caps it at 500.
    pub limit: Option<usize>,
    /// Optional event-type filter, forwarded to the audit logger.
    pub event_type: Option<String>,
    /// Optional lower time bound, parsed by the handler as an RFC 3339
    /// timestamp and converted to UTC.
    pub since: Option<String>,
}
87
88pub async fn handle_api_status(
95 State(state): State<AppState>,
96 headers: HeaderMap,
97) -> impl IntoResponse {
98 let health = crate::health::snapshot();
99 let authed = require_auth(&state, &headers).is_ok();
100
101 if !authed {
102 let body = serde_json::json!({
104 "uptime_seconds": health.uptime_seconds,
105 "paired": state.pairing.is_paired(),
106 "health": health,
107 });
108 return Json(body).into_response();
109 }
110
111 let config = state.config.lock().clone();
112
113 let mut channels = serde_json::Map::new();
114
115 for (channel, present) in config.channels_config.channels() {
116 channels.insert(channel.name().to_string(), serde_json::Value::Bool(present));
117 }
118
119 let body = serde_json::json!({
120 "provider": config.default_provider,
121 "model": state.model,
122 "temperature": state.temperature,
123 "uptime_seconds": health.uptime_seconds,
124 "gateway_port": config.gateway.port,
125 "locale": "en",
126 "memory_backend": if config.memory.backend == "kumiho" && config.kumiho.enabled {
127 "kumiho"
128 } else {
129 state.mem.name()
130 },
131 "paired": state.pairing.is_paired(),
132 "channels": channels,
133 "health": health,
134 });
135
136 Json(body).into_response()
137}
138
139pub async fn handle_api_config_get(
141 State(state): State<AppState>,
142 headers: HeaderMap,
143) -> impl IntoResponse {
144 if let Err(e) = require_auth(&state, &headers) {
145 return e.into_response();
146 }
147
148 let config = state.config.lock().clone();
149
150 let masked_config = mask_sensitive_fields(&config);
152 let toml_str = match toml::to_string_pretty(&masked_config) {
153 Ok(s) => s,
154 Err(e) => {
155 return (
156 StatusCode::INTERNAL_SERVER_ERROR,
157 Json(serde_json::json!({"error": format!("Failed to serialize config: {e}")})),
158 )
159 .into_response();
160 }
161 };
162
163 Json(serde_json::json!({
164 "format": "toml",
165 "content": toml_str,
166 }))
167 .into_response()
168}
169
170pub async fn handle_api_config_put(
172 State(state): State<AppState>,
173 headers: HeaderMap,
174 body: String,
175) -> impl IntoResponse {
176 if let Err(e) = require_auth(&state, &headers) {
177 return e.into_response();
178 }
179
180 let incoming: crate::config::Config = match toml::from_str(&body) {
182 Ok(c) => c,
183 Err(e) => {
184 return (
185 StatusCode::BAD_REQUEST,
186 Json(serde_json::json!({"error": format!("Invalid TOML: {e}")})),
187 )
188 .into_response();
189 }
190 };
191
192 let current_config = state.config.lock().clone();
193 let new_config = hydrate_config_for_save(incoming, ¤t_config);
194
195 if let Err(e) = new_config.validate() {
196 return (
197 StatusCode::BAD_REQUEST,
198 Json(serde_json::json!({"error": format!("Invalid config: {e}")})),
199 )
200 .into_response();
201 }
202
203 if let Err(e) = new_config.save().await {
205 return (
206 StatusCode::INTERNAL_SERVER_ERROR,
207 Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
208 )
209 .into_response();
210 }
211
212 *state.config.lock() = new_config;
214
215 if let Some(ref logger) = state.audit_logger {
217 let _ = logger.log_config_change("dashboard", "Configuration updated via REST API");
218 }
219
220 Json(serde_json::json!({"status": "ok"})).into_response()
221}
222
223pub async fn handle_api_tools(
225 State(state): State<AppState>,
226 headers: HeaderMap,
227) -> impl IntoResponse {
228 if let Err(e) = require_auth(&state, &headers) {
229 return e.into_response();
230 }
231
232 let tools: Vec<serde_json::Value> = state
233 .tools_registry
234 .iter()
235 .map(|spec| {
236 serde_json::json!({
237 "name": spec.name,
238 "description": spec.description,
239 "parameters": spec.parameters,
240 })
241 })
242 .collect();
243
244 Json(serde_json::json!({"tools": tools})).into_response()
245}
246
247pub async fn handle_api_cron_list(
249 State(state): State<AppState>,
250 headers: HeaderMap,
251) -> impl IntoResponse {
252 if let Err(e) = require_auth(&state, &headers) {
253 return e.into_response();
254 }
255
256 let config = state.config.lock().clone();
257 match crate::cron::list_jobs(&config) {
258 Ok(jobs) => Json(serde_json::json!({"jobs": jobs})).into_response(),
259 Err(e) => (
260 StatusCode::INTERNAL_SERVER_ERROR,
261 Json(serde_json::json!({"error": format!("Failed to list cron jobs: {e}")})),
262 )
263 .into_response(),
264 }
265}
266
267pub async fn handle_api_cron_add(
269 State(state): State<AppState>,
270 headers: HeaderMap,
271 Json(body): Json<CronAddBody>,
272) -> impl IntoResponse {
273 if let Err(e) = require_auth(&state, &headers) {
274 return e.into_response();
275 }
276
277 let CronAddBody {
278 name,
279 schedule,
280 command,
281 job_type,
282 prompt,
283 delivery,
284 session_target,
285 model,
286 allowed_tools,
287 delete_after_run,
288 } = body;
289
290 let config = state.config.lock().clone();
291 let schedule = crate::cron::Schedule::Cron {
292 expr: schedule,
293 tz: None,
294 };
295 if let Err(e) = crate::cron::validate_delivery_config(delivery.as_ref()) {
296 return (
297 StatusCode::BAD_REQUEST,
298 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
299 )
300 .into_response();
301 }
302
303 let is_agent =
305 matches!(job_type.as_deref(), Some("agent")) || (job_type.is_none() && prompt.is_some());
306
307 let result = if is_agent {
308 let prompt = match prompt.as_deref() {
309 Some(p) if !p.trim().is_empty() => p,
310 _ => {
311 return (
312 StatusCode::BAD_REQUEST,
313 Json(serde_json::json!({"error": "Missing 'prompt' for agent job"})),
314 )
315 .into_response();
316 }
317 };
318
319 let session_target = session_target
320 .as_deref()
321 .map(crate::cron::SessionTarget::parse)
322 .unwrap_or_default();
323
324 let default_delete = matches!(schedule, crate::cron::Schedule::At { .. });
325 let delete_after_run = delete_after_run.unwrap_or(default_delete);
326
327 crate::cron::add_agent_job(
328 &config,
329 name,
330 schedule,
331 prompt,
332 session_target,
333 model,
334 delivery,
335 delete_after_run,
336 allowed_tools,
337 )
338 } else {
339 let command = match command.as_deref() {
340 Some(c) if !c.trim().is_empty() => c,
341 _ => {
342 return (
343 StatusCode::BAD_REQUEST,
344 Json(serde_json::json!({"error": "Missing 'command' for shell job"})),
345 )
346 .into_response();
347 }
348 };
349
350 crate::cron::add_shell_job_with_approval(&config, name, schedule, command, delivery, false)
351 };
352
353 match result {
354 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
355 Err(e) => (
356 StatusCode::INTERNAL_SERVER_ERROR,
357 Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
358 )
359 .into_response(),
360 }
361}
362
363pub async fn handle_api_cron_runs(
365 State(state): State<AppState>,
366 headers: HeaderMap,
367 Path(id): Path<String>,
368 Query(params): Query<CronRunsQuery>,
369) -> impl IntoResponse {
370 if let Err(e) = require_auth(&state, &headers) {
371 return e.into_response();
372 }
373
374 let limit = params.limit.unwrap_or(20).clamp(1, 100) as usize;
375 let config = state.config.lock().clone();
376
377 if let Err(e) = crate::cron::get_job(&config, &id) {
379 return (
380 StatusCode::NOT_FOUND,
381 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
382 )
383 .into_response();
384 }
385
386 match crate::cron::list_runs(&config, &id, limit) {
387 Ok(runs) => {
388 let runs_json: Vec<serde_json::Value> = runs
389 .iter()
390 .map(|r| {
391 serde_json::json!({
392 "id": r.id,
393 "job_id": r.job_id,
394 "started_at": r.started_at.to_rfc3339(),
395 "finished_at": r.finished_at.to_rfc3339(),
396 "status": r.status,
397 "output": r.output,
398 "duration_ms": r.duration_ms,
399 })
400 })
401 .collect();
402 Json(serde_json::json!({"runs": runs_json})).into_response()
403 }
404 Err(e) => (
405 StatusCode::INTERNAL_SERVER_ERROR,
406 Json(serde_json::json!({"error": format!("Failed to list cron runs: {e}")})),
407 )
408 .into_response(),
409 }
410}
411
412pub async fn handle_api_cron_patch(
414 State(state): State<AppState>,
415 headers: HeaderMap,
416 Path(id): Path<String>,
417 Json(body): Json<CronPatchBody>,
418) -> impl IntoResponse {
419 if let Err(e) = require_auth(&state, &headers) {
420 return e.into_response();
421 }
422
423 let config = state.config.lock().clone();
424
425 let schedule = match body.schedule {
427 Some(expr) if !expr.trim().is_empty() => Some(crate::cron::Schedule::Cron {
428 expr: expr.trim().to_string(),
429 tz: None,
430 }),
431 _ => None,
432 };
433
434 let existing = match crate::cron::get_job(&config, &id) {
438 Ok(j) => j,
439 Err(e) => {
440 return (
441 StatusCode::NOT_FOUND,
442 Json(serde_json::json!({"error": format!("Cron job not found: {e}")})),
443 )
444 .into_response();
445 }
446 };
447 let is_agent = matches!(existing.job_type, crate::cron::JobType::Agent);
448 let (patch_command, patch_prompt) = if is_agent {
449 (None, body.command.or(body.prompt))
450 } else {
451 (body.command.or(body.prompt), None)
452 };
453
454 let patch = crate::cron::CronJobPatch {
455 name: body.name,
456 schedule,
457 command: patch_command,
458 prompt: patch_prompt,
459 enabled: body.enabled,
460 ..crate::cron::CronJobPatch::default()
461 };
462
463 match crate::cron::update_shell_job_with_approval(&config, &id, patch, false) {
464 Ok(job) => Json(serde_json::json!({"status": "ok", "job": job})).into_response(),
465 Err(e) => (
466 StatusCode::INTERNAL_SERVER_ERROR,
467 Json(serde_json::json!({"error": format!("Failed to update cron job: {e}")})),
468 )
469 .into_response(),
470 }
471}
472
473pub async fn handle_api_cron_delete(
475 State(state): State<AppState>,
476 headers: HeaderMap,
477 Path(id): Path<String>,
478) -> impl IntoResponse {
479 if let Err(e) = require_auth(&state, &headers) {
480 return e.into_response();
481 }
482
483 let config = state.config.lock().clone();
484 match crate::cron::remove_job(&config, &id) {
485 Ok(()) => Json(serde_json::json!({"status": "ok"})).into_response(),
486 Err(e) => (
487 StatusCode::INTERNAL_SERVER_ERROR,
488 Json(serde_json::json!({"error": format!("Failed to remove cron job: {e}")})),
489 )
490 .into_response(),
491 }
492}
493
494pub async fn handle_api_cron_settings_get(
496 State(state): State<AppState>,
497 headers: HeaderMap,
498) -> impl IntoResponse {
499 if let Err(e) = require_auth(&state, &headers) {
500 return e.into_response();
501 }
502
503 let config = state.config.lock().clone();
504 Json(serde_json::json!({
505 "enabled": config.cron.enabled,
506 "catch_up_on_startup": config.cron.catch_up_on_startup,
507 "max_run_history": config.cron.max_run_history,
508 }))
509 .into_response()
510}
511
512pub async fn handle_api_cron_settings_patch(
514 State(state): State<AppState>,
515 headers: HeaderMap,
516 Json(body): Json<serde_json::Value>,
517) -> impl IntoResponse {
518 if let Err(e) = require_auth(&state, &headers) {
519 return e.into_response();
520 }
521
522 let mut config = state.config.lock().clone();
523
524 if let Some(v) = body.get("enabled").and_then(|v| v.as_bool()) {
525 config.cron.enabled = v;
526 }
527 if let Some(v) = body.get("catch_up_on_startup").and_then(|v| v.as_bool()) {
528 config.cron.catch_up_on_startup = v;
529 }
530 if let Some(v) = body.get("max_run_history").and_then(|v| v.as_u64()) {
531 config.cron.max_run_history = u32::try_from(v).unwrap_or(u32::MAX);
532 }
533
534 if let Err(e) = config.save().await {
535 return (
536 StatusCode::INTERNAL_SERVER_ERROR,
537 Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
538 )
539 .into_response();
540 }
541
542 *state.config.lock() = config.clone();
543
544 Json(serde_json::json!({
545 "status": "ok",
546 "enabled": config.cron.enabled,
547 "catch_up_on_startup": config.cron.catch_up_on_startup,
548 "max_run_history": config.cron.max_run_history,
549 }))
550 .into_response()
551}
552
553pub async fn handle_api_integrations(
555 State(state): State<AppState>,
556 headers: HeaderMap,
557) -> impl IntoResponse {
558 if let Err(e) = require_auth(&state, &headers) {
559 return e.into_response();
560 }
561
562 let config = state.config.lock().clone();
563 let entries = crate::integrations::registry::all_integrations();
564
565 let integrations: Vec<serde_json::Value> = entries
566 .iter()
567 .map(|entry| {
568 let status = (entry.status_fn)(&config);
569 serde_json::json!({
570 "name": entry.name,
571 "description": entry.description,
572 "category": entry.category,
573 "status": status,
574 })
575 })
576 .collect();
577
578 Json(serde_json::json!({"integrations": integrations})).into_response()
579}
580
581pub async fn handle_api_integrations_settings(
583 State(state): State<AppState>,
584 headers: HeaderMap,
585) -> impl IntoResponse {
586 if let Err(e) = require_auth(&state, &headers) {
587 return e.into_response();
588 }
589
590 let config = state.config.lock().clone();
591 let entries = crate::integrations::registry::all_integrations();
592
593 let mut settings = serde_json::Map::new();
594 for entry in &entries {
595 let status = (entry.status_fn)(&config);
596 let enabled = matches!(status, crate::integrations::IntegrationStatus::Active);
597 settings.insert(
598 entry.name.to_string(),
599 serde_json::json!({
600 "enabled": enabled,
601 "category": entry.category,
602 "status": status,
603 }),
604 );
605 }
606
607 Json(serde_json::json!({"settings": settings})).into_response()
608}
609
610pub async fn handle_api_doctor(
612 State(state): State<AppState>,
613 headers: HeaderMap,
614) -> impl IntoResponse {
615 if let Err(e) = require_auth(&state, &headers) {
616 return e.into_response();
617 }
618
619 let config = state.config.lock().clone();
620 let results = crate::doctor::diagnose(&config);
621
622 let ok_count = results
623 .iter()
624 .filter(|r| r.severity == crate::doctor::Severity::Ok)
625 .count();
626 let warn_count = results
627 .iter()
628 .filter(|r| r.severity == crate::doctor::Severity::Warn)
629 .count();
630 let error_count = results
631 .iter()
632 .filter(|r| r.severity == crate::doctor::Severity::Error)
633 .count();
634
635 Json(serde_json::json!({
636 "results": results,
637 "summary": {
638 "ok": ok_count,
639 "warnings": warn_count,
640 "errors": error_count,
641 }
642 }))
643 .into_response()
644}
645
646pub async fn handle_api_cost(
652 State(state): State<AppState>,
653 _headers: HeaderMap,
654) -> impl IntoResponse {
655 if let Some(ref tracker) = state.cost_tracker {
656 match tracker.get_summary() {
657 Ok(summary) => Json(serde_json::json!({"cost": summary})).into_response(),
658 Err(e) => (
659 StatusCode::INTERNAL_SERVER_ERROR,
660 Json(serde_json::json!({"error": format!("Cost summary failed: {e}")})),
661 )
662 .into_response(),
663 }
664 } else {
665 Json(serde_json::json!({
666 "cost": {
667 "session_cost_usd": 0.0,
668 "daily_cost_usd": 0.0,
669 "monthly_cost_usd": 0.0,
670 "total_tokens": 0,
671 "request_count": 0,
672 "by_model": {},
673 }
674 }))
675 .into_response()
676 }
677}
678
679pub async fn handle_api_audit(
681 State(state): State<AppState>,
682 headers: HeaderMap,
683 Query(params): Query<AuditQuery>,
684) -> impl IntoResponse {
685 if let Err(e) = require_auth(&state, &headers) {
686 return e.into_response();
687 }
688
689 let Some(ref logger) = state.audit_logger else {
690 return Json(serde_json::json!({
691 "events": [],
692 "count": 0,
693 "audit_enabled": false,
694 }))
695 .into_response();
696 };
697
698 let limit = params.limit.unwrap_or(50).min(500);
699 let event_type = params.event_type.as_deref();
700 let since = params.since.as_deref().and_then(|s| {
701 chrono::DateTime::parse_from_rfc3339(s)
702 .ok()
703 .map(|dt| dt.with_timezone(&chrono::Utc))
704 });
705
706 match logger.read_events(limit, event_type, since) {
707 Ok(events) => {
708 let count = events.len();
709 Json(serde_json::json!({
710 "events": events,
711 "count": count,
712 "audit_enabled": true,
713 }))
714 .into_response()
715 }
716 Err(e) => (
717 StatusCode::INTERNAL_SERVER_ERROR,
718 Json(serde_json::json!({"error": format!("Audit read failed: {e}")})),
719 )
720 .into_response(),
721 }
722}
723
724pub async fn handle_api_audit_verify(
726 State(state): State<AppState>,
727 headers: HeaderMap,
728) -> impl IntoResponse {
729 if let Err(e) = require_auth(&state, &headers) {
730 return e.into_response();
731 }
732
733 let Some(ref logger) = state.audit_logger else {
734 return Json(serde_json::json!({
735 "verified": false,
736 "error": "Audit logging not enabled",
737 }))
738 .into_response();
739 };
740
741 match crate::security::audit::verify_chain(logger.log_path()) {
742 Ok(count) => Json(serde_json::json!({
743 "verified": true,
744 "entry_count": count,
745 }))
746 .into_response(),
747 Err(e) => Json(serde_json::json!({
748 "verified": false,
749 "error": format!("{e}"),
750 }))
751 .into_response(),
752 }
753}
754
755pub async fn handle_api_cli_tools(
757 State(state): State<AppState>,
758 headers: HeaderMap,
759) -> impl IntoResponse {
760 if let Err(e) = require_auth(&state, &headers) {
761 return e.into_response();
762 }
763
764 let tools = crate::tools::cli_discovery::discover_cli_tools(&[], &[]);
765
766 Json(serde_json::json!({"cli_tools": tools})).into_response()
767}
768
769pub async fn handle_api_health(
771 State(state): State<AppState>,
772 headers: HeaderMap,
773) -> impl IntoResponse {
774 if let Err(e) = require_auth(&state, &headers) {
775 return e.into_response();
776 }
777
778 let snapshot = crate::health::snapshot();
779 Json(serde_json::json!({"health": snapshot})).into_response()
780}
781
782pub async fn handle_api_nodes(
786 State(state): State<AppState>,
787 headers: HeaderMap,
788) -> impl IntoResponse {
789 if let Err(e) = require_auth(&state, &headers) {
790 return e.into_response();
791 }
792
793 let caps = state.node_registry.all_capabilities();
794 let node_ids = state.node_registry.node_ids();
795
796 let mut nodes: std::collections::HashMap<String, Vec<serde_json::Value>> =
798 std::collections::HashMap::new();
799 for (node_id, cap_name, cap) in &caps {
800 nodes
801 .entry(node_id.clone())
802 .or_default()
803 .push(serde_json::json!({
804 "name": cap_name,
805 "description": cap.description,
806 }));
807 }
808
809 let node_list: Vec<serde_json::Value> = node_ids
810 .iter()
811 .map(|id| {
812 let capabilities = nodes.get(id).cloned().unwrap_or_default();
813 serde_json::json!({
814 "node_id": id,
815 "capabilities": capabilities,
816 "capability_count": capabilities.len(),
817 })
818 })
819 .collect();
820
821 Json(serde_json::json!({
822 "nodes": node_list,
823 "count": node_list.len(),
824 }))
825 .into_response()
826}
827
828pub async fn handle_api_node_invoke(
830 State(state): State<AppState>,
831 headers: HeaderMap,
832 Path(node_id): Path<String>,
833 Json(body): Json<serde_json::Value>,
834) -> impl IntoResponse {
835 if let Err(e) = require_auth(&state, &headers) {
836 return e.into_response();
837 }
838
839 let capability = body
840 .get("capability")
841 .and_then(|v| v.as_str())
842 .unwrap_or("");
843 if capability.is_empty() {
844 return (
845 StatusCode::BAD_REQUEST,
846 Json(serde_json::json!({"error": "capability is required"})),
847 )
848 .into_response();
849 }
850
851 let args = body.get("args").cloned().unwrap_or(serde_json::json!({}));
852
853 let Some(invoke_tx) = state.node_registry.invoke_tx(&node_id) else {
854 return (
855 StatusCode::NOT_FOUND,
856 Json(serde_json::json!({"error": format!("Node not found: {node_id}")})),
857 )
858 .into_response();
859 };
860
861 let call_id = Uuid::new_v4().to_string();
862 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
863
864 let invocation = super::nodes::NodeInvocation {
865 call_id: call_id.clone(),
866 capability: capability.to_string(),
867 args,
868 response_tx,
869 };
870
871 if invoke_tx.send(invocation).await.is_err() {
872 return (
873 StatusCode::INTERNAL_SERVER_ERROR,
874 Json(serde_json::json!({"error": "Failed to send to node"})),
875 )
876 .into_response();
877 }
878
879 match tokio::time::timeout(std::time::Duration::from_secs(30), response_rx).await {
880 Ok(Ok(result)) => Json(serde_json::json!({
881 "call_id": call_id,
882 "success": result.success,
883 "output": result.output,
884 "error": result.error,
885 }))
886 .into_response(),
887 Ok(Err(_)) => (
888 StatusCode::INTERNAL_SERVER_ERROR,
889 Json(serde_json::json!({"error": "Node dropped the response channel"})),
890 )
891 .into_response(),
892 Err(_) => (
893 StatusCode::GATEWAY_TIMEOUT,
894 Json(serde_json::json!({"error": "Node invocation timed out (30s)"})),
895 )
896 .into_response(),
897 }
898}
899
/// Returns `true` when `value` is exactly the mask placeholder.
fn is_masked_secret(value: &str) -> bool {
    matches!(value, MASKED_SECRET)
}
905
906fn mask_optional_secret(value: &mut Option<String>) {
907 if value.is_some() {
908 *value = Some(MASKED_SECRET.to_string());
909 }
910}
911
912fn mask_required_secret(value: &mut String) {
913 if !value.is_empty() {
914 *value = MASKED_SECRET.to_string();
915 }
916}
917
918fn mask_vec_secrets(values: &mut [String]) {
919 for value in values.iter_mut() {
920 if !value.is_empty() {
921 *value = MASKED_SECRET.to_string();
922 }
923 }
924}
925
/// Replaces `value` with a copy of `current` when `value` holds the mask
/// placeholder; any other value (including `None`) is kept as submitted.
///
/// When `current` is `None`, a masked `value` therefore becomes `None`.
// `&Option<String>` is kept because callers pass references to config fields
// of that type directly.
#[allow(clippy::ref_option)]
fn restore_optional_secret(value: &mut Option<String>, current: &Option<String>) {
    if matches!(value.as_deref(), Some(MASKED_SECRET)) {
        value.clone_from(current);
    }
}
932
933fn restore_required_secret(value: &mut String, current: &str) {
934 if is_masked_secret(value) {
935 *value = current.to_string();
936 }
937}
938
939fn restore_vec_secrets(values: &mut [String], current: &[String]) {
940 for (idx, value) in values.iter_mut().enumerate() {
941 if is_masked_secret(value) {
942 if let Some(existing) = current.get(idx) {
943 *value = existing.clone();
944 }
945 }
946 }
947}
948
/// Normalizes a route field for comparison: surrounding whitespace is
/// trimmed and ASCII letters are lowercased (non-ASCII characters are kept).
fn normalize_route_field(value: &str) -> String {
    let mut normalized = value.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
952
953fn model_route_identity_matches(
954 incoming: &crate::config::schema::ModelRouteConfig,
955 current: &crate::config::schema::ModelRouteConfig,
956) -> bool {
957 normalize_route_field(&incoming.hint) == normalize_route_field(¤t.hint)
958 && normalize_route_field(&incoming.provider) == normalize_route_field(¤t.provider)
959 && normalize_route_field(&incoming.model) == normalize_route_field(¤t.model)
960}
961
962fn model_route_provider_model_matches(
963 incoming: &crate::config::schema::ModelRouteConfig,
964 current: &crate::config::schema::ModelRouteConfig,
965) -> bool {
966 normalize_route_field(&incoming.provider) == normalize_route_field(¤t.provider)
967 && normalize_route_field(&incoming.model) == normalize_route_field(¤t.model)
968}
969
970fn embedding_route_identity_matches(
971 incoming: &crate::config::schema::EmbeddingRouteConfig,
972 current: &crate::config::schema::EmbeddingRouteConfig,
973) -> bool {
974 normalize_route_field(&incoming.hint) == normalize_route_field(¤t.hint)
975 && normalize_route_field(&incoming.provider) == normalize_route_field(¤t.provider)
976 && normalize_route_field(&incoming.model) == normalize_route_field(¤t.model)
977}
978
979fn embedding_route_provider_model_matches(
980 incoming: &crate::config::schema::EmbeddingRouteConfig,
981 current: &crate::config::schema::EmbeddingRouteConfig,
982) -> bool {
983 normalize_route_field(&incoming.provider) == normalize_route_field(¤t.provider)
984 && normalize_route_field(&incoming.model) == normalize_route_field(¤t.model)
985}
986
987fn restore_model_route_api_keys(
988 incoming: &mut [crate::config::schema::ModelRouteConfig],
989 current: &[crate::config::schema::ModelRouteConfig],
990) {
991 let mut used_current = vec![false; current.len()];
992 for incoming_route in incoming {
993 if !incoming_route
994 .api_key
995 .as_deref()
996 .is_some_and(is_masked_secret)
997 {
998 continue;
999 }
1000
1001 let exact_match_idx = current
1002 .iter()
1003 .enumerate()
1004 .find(|(idx, current_route)| {
1005 !used_current[*idx] && model_route_identity_matches(incoming_route, current_route)
1006 })
1007 .map(|(idx, _)| idx);
1008
1009 let match_idx = exact_match_idx.or_else(|| {
1010 current
1011 .iter()
1012 .enumerate()
1013 .find(|(idx, current_route)| {
1014 !used_current[*idx]
1015 && model_route_provider_model_matches(incoming_route, current_route)
1016 })
1017 .map(|(idx, _)| idx)
1018 });
1019
1020 if let Some(idx) = match_idx {
1021 used_current[idx] = true;
1022 incoming_route.api_key = current[idx].api_key.clone();
1023 } else {
1024 incoming_route.api_key = None;
1026 }
1027 }
1028}
1029
1030fn restore_embedding_route_api_keys(
1031 incoming: &mut [crate::config::schema::EmbeddingRouteConfig],
1032 current: &[crate::config::schema::EmbeddingRouteConfig],
1033) {
1034 let mut used_current = vec![false; current.len()];
1035 for incoming_route in incoming {
1036 if !incoming_route
1037 .api_key
1038 .as_deref()
1039 .is_some_and(is_masked_secret)
1040 {
1041 continue;
1042 }
1043
1044 let exact_match_idx = current
1045 .iter()
1046 .enumerate()
1047 .find(|(idx, current_route)| {
1048 !used_current[*idx]
1049 && embedding_route_identity_matches(incoming_route, current_route)
1050 })
1051 .map(|(idx, _)| idx);
1052
1053 let match_idx = exact_match_idx.or_else(|| {
1054 current
1055 .iter()
1056 .enumerate()
1057 .find(|(idx, current_route)| {
1058 !used_current[*idx]
1059 && embedding_route_provider_model_matches(incoming_route, current_route)
1060 })
1061 .map(|(idx, _)| idx)
1062 });
1063
1064 if let Some(idx) = match_idx {
1065 used_current[idx] = true;
1066 incoming_route.api_key = current[idx].api_key.clone();
1067 } else {
1068 incoming_route.api_key = None;
1070 }
1071 }
1072}
1073
1074fn mask_sensitive_fields(config: &crate::config::Config) -> crate::config::Config {
1075 let mut masked = config.clone();
1076
1077 mask_optional_secret(&mut masked.api_key);
1078 mask_vec_secrets(&mut masked.reliability.api_keys);
1079 mask_vec_secrets(&mut masked.gateway.paired_tokens);
1080 mask_optional_secret(&mut masked.composio.api_key);
1081 mask_optional_secret(&mut masked.browser.computer_use.api_key);
1082 mask_optional_secret(&mut masked.web_search.brave_api_key);
1083 mask_optional_secret(&mut masked.storage.provider.config.db_url);
1084 if let Some(cloudflare) = masked.tunnel.cloudflare.as_mut() {
1086 mask_required_secret(&mut cloudflare.token);
1087 }
1088 if let Some(ngrok) = masked.tunnel.ngrok.as_mut() {
1089 mask_required_secret(&mut ngrok.auth_token);
1090 }
1091
1092 for agent in masked.agents.values_mut() {
1093 mask_optional_secret(&mut agent.api_key);
1094 }
1095 for route in &mut masked.model_routes {
1096 mask_optional_secret(&mut route.api_key);
1097 }
1098 for route in &mut masked.embedding_routes {
1099 mask_optional_secret(&mut route.api_key);
1100 }
1101
1102 if let Some(telegram) = masked.channels_config.telegram.as_mut() {
1103 mask_required_secret(&mut telegram.bot_token);
1104 }
1105 if let Some(discord) = masked.channels_config.discord.as_mut() {
1106 mask_required_secret(&mut discord.bot_token);
1107 }
1108 if let Some(slack) = masked.channels_config.slack.as_mut() {
1109 mask_required_secret(&mut slack.bot_token);
1110 mask_optional_secret(&mut slack.app_token);
1111 }
1112 if let Some(mattermost) = masked.channels_config.mattermost.as_mut() {
1113 mask_required_secret(&mut mattermost.bot_token);
1114 }
1115 if let Some(webhook) = masked.channels_config.webhook.as_mut() {
1116 mask_optional_secret(&mut webhook.secret);
1117 }
1118 if let Some(matrix) = masked.channels_config.matrix.as_mut() {
1119 mask_required_secret(&mut matrix.access_token);
1120 }
1121 if let Some(whatsapp) = masked.channels_config.whatsapp.as_mut() {
1122 mask_optional_secret(&mut whatsapp.access_token);
1123 mask_optional_secret(&mut whatsapp.app_secret);
1124 mask_optional_secret(&mut whatsapp.verify_token);
1125 }
1126 if let Some(linq) = masked.channels_config.linq.as_mut() {
1127 mask_required_secret(&mut linq.api_token);
1128 mask_optional_secret(&mut linq.signing_secret);
1129 }
1130 if let Some(nextcloud) = masked.channels_config.nextcloud_talk.as_mut() {
1131 mask_required_secret(&mut nextcloud.app_token);
1132 mask_optional_secret(&mut nextcloud.webhook_secret);
1133 }
1134 if let Some(wati) = masked.channels_config.wati.as_mut() {
1135 mask_required_secret(&mut wati.api_token);
1136 }
1137 if let Some(irc) = masked.channels_config.irc.as_mut() {
1138 mask_optional_secret(&mut irc.server_password);
1139 mask_optional_secret(&mut irc.nickserv_password);
1140 mask_optional_secret(&mut irc.sasl_password);
1141 }
1142 if let Some(lark) = masked.channels_config.lark.as_mut() {
1143 mask_required_secret(&mut lark.app_secret);
1144 mask_optional_secret(&mut lark.encrypt_key);
1145 mask_optional_secret(&mut lark.verification_token);
1146 }
1147 if let Some(feishu) = masked.channels_config.feishu.as_mut() {
1148 mask_required_secret(&mut feishu.app_secret);
1149 mask_optional_secret(&mut feishu.encrypt_key);
1150 mask_optional_secret(&mut feishu.verification_token);
1151 }
1152 if let Some(dingtalk) = masked.channels_config.dingtalk.as_mut() {
1153 mask_required_secret(&mut dingtalk.client_secret);
1154 }
1155 if let Some(qq) = masked.channels_config.qq.as_mut() {
1156 mask_required_secret(&mut qq.app_secret);
1157 }
1158 #[cfg(feature = "channel-nostr")]
1159 if let Some(nostr) = masked.channels_config.nostr.as_mut() {
1160 mask_required_secret(&mut nostr.private_key);
1161 }
1162 if let Some(clawdtalk) = masked.channels_config.clawdtalk.as_mut() {
1163 mask_required_secret(&mut clawdtalk.api_key);
1164 mask_optional_secret(&mut clawdtalk.webhook_secret);
1165 }
1166 if let Some(email) = masked.channels_config.email.as_mut() {
1167 mask_required_secret(&mut email.password);
1168 }
1169 mask_optional_secret(&mut masked.transcription.api_key);
1170 mask_optional_secret(&mut masked.clawhub.api_token);
1171 masked
1172}
1173
1174fn restore_masked_sensitive_fields(
1175 incoming: &mut crate::config::Config,
1176 current: &crate::config::Config,
1177) {
1178 restore_optional_secret(&mut incoming.api_key, ¤t.api_key);
1179 restore_vec_secrets(
1180 &mut incoming.gateway.paired_tokens,
1181 ¤t.gateway.paired_tokens,
1182 );
1183 restore_vec_secrets(
1184 &mut incoming.reliability.api_keys,
1185 ¤t.reliability.api_keys,
1186 );
1187 restore_optional_secret(&mut incoming.composio.api_key, ¤t.composio.api_key);
1188 restore_optional_secret(
1189 &mut incoming.browser.computer_use.api_key,
1190 ¤t.browser.computer_use.api_key,
1191 );
1192 restore_optional_secret(
1193 &mut incoming.web_search.brave_api_key,
1194 ¤t.web_search.brave_api_key,
1195 );
1196 restore_optional_secret(
1197 &mut incoming.storage.provider.config.db_url,
1198 ¤t.storage.provider.config.db_url,
1199 );
1200 if let (Some(incoming_tunnel), Some(current_tunnel)) = (
1202 incoming.tunnel.cloudflare.as_mut(),
1203 current.tunnel.cloudflare.as_ref(),
1204 ) {
1205 restore_required_secret(&mut incoming_tunnel.token, ¤t_tunnel.token);
1206 }
1207 if let (Some(incoming_tunnel), Some(current_tunnel)) = (
1208 incoming.tunnel.ngrok.as_mut(),
1209 current.tunnel.ngrok.as_ref(),
1210 ) {
1211 restore_required_secret(&mut incoming_tunnel.auth_token, ¤t_tunnel.auth_token);
1212 }
1213
1214 for (name, agent) in &mut incoming.agents {
1215 if let Some(current_agent) = current.agents.get(name) {
1216 restore_optional_secret(&mut agent.api_key, ¤t_agent.api_key);
1217 }
1218 }
1219 restore_model_route_api_keys(&mut incoming.model_routes, ¤t.model_routes);
1220 restore_embedding_route_api_keys(&mut incoming.embedding_routes, ¤t.embedding_routes);
1221
1222 if let (Some(incoming_ch), Some(current_ch)) = (
1223 incoming.channels_config.telegram.as_mut(),
1224 current.channels_config.telegram.as_ref(),
1225 ) {
1226 restore_required_secret(&mut incoming_ch.bot_token, ¤t_ch.bot_token);
1227 }
1228 if let (Some(incoming_ch), Some(current_ch)) = (
1229 incoming.channels_config.discord.as_mut(),
1230 current.channels_config.discord.as_ref(),
1231 ) {
1232 restore_required_secret(&mut incoming_ch.bot_token, ¤t_ch.bot_token);
1233 }
1234 if let (Some(incoming_ch), Some(current_ch)) = (
1235 incoming.channels_config.slack.as_mut(),
1236 current.channels_config.slack.as_ref(),
1237 ) {
1238 restore_required_secret(&mut incoming_ch.bot_token, ¤t_ch.bot_token);
1239 restore_optional_secret(&mut incoming_ch.app_token, ¤t_ch.app_token);
1240 }
1241 if let (Some(incoming_ch), Some(current_ch)) = (
1242 incoming.channels_config.mattermost.as_mut(),
1243 current.channels_config.mattermost.as_ref(),
1244 ) {
1245 restore_required_secret(&mut incoming_ch.bot_token, ¤t_ch.bot_token);
1246 }
1247 if let (Some(incoming_ch), Some(current_ch)) = (
1248 incoming.channels_config.webhook.as_mut(),
1249 current.channels_config.webhook.as_ref(),
1250 ) {
1251 restore_optional_secret(&mut incoming_ch.secret, ¤t_ch.secret);
1252 }
1253 if let (Some(incoming_ch), Some(current_ch)) = (
1254 incoming.channels_config.matrix.as_mut(),
1255 current.channels_config.matrix.as_ref(),
1256 ) {
1257 restore_required_secret(&mut incoming_ch.access_token, ¤t_ch.access_token);
1258 }
1259 if let (Some(incoming_ch), Some(current_ch)) = (
1260 incoming.channels_config.whatsapp.as_mut(),
1261 current.channels_config.whatsapp.as_ref(),
1262 ) {
1263 restore_optional_secret(&mut incoming_ch.access_token, ¤t_ch.access_token);
1264 restore_optional_secret(&mut incoming_ch.app_secret, ¤t_ch.app_secret);
1265 restore_optional_secret(&mut incoming_ch.verify_token, ¤t_ch.verify_token);
1266 }
1267 if let (Some(incoming_ch), Some(current_ch)) = (
1268 incoming.channels_config.linq.as_mut(),
1269 current.channels_config.linq.as_ref(),
1270 ) {
1271 restore_required_secret(&mut incoming_ch.api_token, ¤t_ch.api_token);
1272 restore_optional_secret(&mut incoming_ch.signing_secret, ¤t_ch.signing_secret);
1273 }
1274 if let (Some(incoming_ch), Some(current_ch)) = (
1275 incoming.channels_config.nextcloud_talk.as_mut(),
1276 current.channels_config.nextcloud_talk.as_ref(),
1277 ) {
1278 restore_required_secret(&mut incoming_ch.app_token, ¤t_ch.app_token);
1279 restore_optional_secret(&mut incoming_ch.webhook_secret, ¤t_ch.webhook_secret);
1280 }
1281 if let (Some(incoming_ch), Some(current_ch)) = (
1282 incoming.channels_config.wati.as_mut(),
1283 current.channels_config.wati.as_ref(),
1284 ) {
1285 restore_required_secret(&mut incoming_ch.api_token, ¤t_ch.api_token);
1286 }
1287 if let (Some(incoming_ch), Some(current_ch)) = (
1288 incoming.channels_config.irc.as_mut(),
1289 current.channels_config.irc.as_ref(),
1290 ) {
1291 restore_optional_secret(
1292 &mut incoming_ch.server_password,
1293 ¤t_ch.server_password,
1294 );
1295 restore_optional_secret(
1296 &mut incoming_ch.nickserv_password,
1297 ¤t_ch.nickserv_password,
1298 );
1299 restore_optional_secret(&mut incoming_ch.sasl_password, ¤t_ch.sasl_password);
1300 }
1301 if let (Some(incoming_ch), Some(current_ch)) = (
1302 incoming.channels_config.lark.as_mut(),
1303 current.channels_config.lark.as_ref(),
1304 ) {
1305 restore_required_secret(&mut incoming_ch.app_secret, ¤t_ch.app_secret);
1306 restore_optional_secret(&mut incoming_ch.encrypt_key, ¤t_ch.encrypt_key);
1307 restore_optional_secret(
1308 &mut incoming_ch.verification_token,
1309 ¤t_ch.verification_token,
1310 );
1311 }
1312 if let (Some(incoming_ch), Some(current_ch)) = (
1313 incoming.channels_config.feishu.as_mut(),
1314 current.channels_config.feishu.as_ref(),
1315 ) {
1316 restore_required_secret(&mut incoming_ch.app_secret, ¤t_ch.app_secret);
1317 restore_optional_secret(&mut incoming_ch.encrypt_key, ¤t_ch.encrypt_key);
1318 restore_optional_secret(
1319 &mut incoming_ch.verification_token,
1320 ¤t_ch.verification_token,
1321 );
1322 }
1323 if let (Some(incoming_ch), Some(current_ch)) = (
1324 incoming.channels_config.dingtalk.as_mut(),
1325 current.channels_config.dingtalk.as_ref(),
1326 ) {
1327 restore_required_secret(&mut incoming_ch.client_secret, ¤t_ch.client_secret);
1328 }
1329 if let (Some(incoming_ch), Some(current_ch)) = (
1330 incoming.channels_config.qq.as_mut(),
1331 current.channels_config.qq.as_ref(),
1332 ) {
1333 restore_required_secret(&mut incoming_ch.app_secret, ¤t_ch.app_secret);
1334 }
1335 #[cfg(feature = "channel-nostr")]
1336 if let (Some(incoming_ch), Some(current_ch)) = (
1337 incoming.channels_config.nostr.as_mut(),
1338 current.channels_config.nostr.as_ref(),
1339 ) {
1340 restore_required_secret(&mut incoming_ch.private_key, ¤t_ch.private_key);
1341 }
1342 if let (Some(incoming_ch), Some(current_ch)) = (
1343 incoming.channels_config.clawdtalk.as_mut(),
1344 current.channels_config.clawdtalk.as_ref(),
1345 ) {
1346 restore_required_secret(&mut incoming_ch.api_key, ¤t_ch.api_key);
1347 restore_optional_secret(&mut incoming_ch.webhook_secret, ¤t_ch.webhook_secret);
1348 }
1349 if let (Some(incoming_ch), Some(current_ch)) = (
1350 incoming.channels_config.email.as_mut(),
1351 current.channels_config.email.as_ref(),
1352 ) {
1353 restore_required_secret(&mut incoming_ch.password, ¤t_ch.password);
1354 }
1355 restore_optional_secret(
1356 &mut incoming.transcription.api_key,
1357 ¤t.transcription.api_key,
1358 );
1359 restore_optional_secret(&mut incoming.clawhub.api_token, ¤t.clawhub.api_token);
1360}
1361
1362fn hydrate_config_for_save(
1363 mut incoming: crate::config::Config,
1364 current: &crate::config::Config,
1365) -> crate::config::Config {
1366 restore_masked_sensitive_fields(&mut incoming, current);
1367 incoming.config_path = current.config_path.clone();
1369 incoming.workspace_dir = current.workspace_dir.clone();
1370 incoming
1371}
1372
1373pub async fn handle_api_sessions_list(
1377 State(state): State<AppState>,
1378 headers: HeaderMap,
1379) -> impl IntoResponse {
1380 if let Err(e) = require_auth(&state, &headers) {
1381 return e.into_response();
1382 }
1383
1384 let Some(ref backend) = state.session_backend else {
1385 return Json(serde_json::json!({
1386 "sessions": [],
1387 "message": "Session persistence is disabled"
1388 }))
1389 .into_response();
1390 };
1391
1392 let now = chrono::Utc::now();
1393 let all_metadata = backend.list_sessions_with_metadata();
1394 let gw_sessions: Vec<serde_json::Value> = all_metadata
1395 .into_iter()
1396 .filter_map(|meta| {
1397 let id = meta.key.strip_prefix("gw_")?;
1398 let status = if (now - meta.last_activity).num_minutes() < 5 {
1399 "active"
1400 } else {
1401 "idle"
1402 };
1403 let mut entry = serde_json::json!({
1404 "id": id,
1405 "channel": "gateway",
1406 "started_at": meta.created_at.to_rfc3339(),
1407 "last_activity": meta.last_activity.to_rfc3339(),
1408 "status": status,
1409 "message_count": meta.message_count,
1410 });
1411 if let Some(name) = meta.name {
1412 entry["name"] = serde_json::Value::String(name);
1413 }
1414 Some(entry)
1415 })
1416 .collect();
1417
1418 Json(serde_json::json!({ "sessions": gw_sessions })).into_response()
1419}
1420
1421pub async fn handle_api_session_messages(
1423 State(state): State<AppState>,
1424 headers: HeaderMap,
1425 Path(id): Path<String>,
1426) -> impl IntoResponse {
1427 if let Err(e) = require_auth(&state, &headers) {
1428 return e.into_response();
1429 }
1430
1431 let Some(ref backend) = state.session_backend else {
1432 return Json(serde_json::json!({
1433 "session_id": id,
1434 "messages": [],
1435 "session_persistence": false,
1436 }))
1437 .into_response();
1438 };
1439
1440 let session_key = format!("gw_{id}");
1441 let msgs = backend.load(&session_key);
1442 let messages: Vec<serde_json::Value> = msgs
1443 .into_iter()
1444 .map(|m| serde_json::json!({ "role": m.role, "content": m.content }))
1445 .collect();
1446
1447 Json(serde_json::json!({
1448 "session_id": id,
1449 "messages": messages,
1450 "session_persistence": true,
1451 }))
1452 .into_response()
1453}
1454
1455pub async fn handle_api_session_delete(
1457 State(state): State<AppState>,
1458 headers: HeaderMap,
1459 Path(id): Path<String>,
1460) -> impl IntoResponse {
1461 if let Err(e) = require_auth(&state, &headers) {
1462 return e.into_response();
1463 }
1464
1465 let Some(ref backend) = state.session_backend else {
1466 return (
1467 StatusCode::NOT_FOUND,
1468 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1469 )
1470 .into_response();
1471 };
1472
1473 let session_key = format!("gw_{id}");
1474 match backend.delete_session(&session_key) {
1475 Ok(true) => Json(serde_json::json!({"deleted": true, "session_id": id})).into_response(),
1476 Ok(false) => (
1477 StatusCode::NOT_FOUND,
1478 Json(serde_json::json!({"error": "Session not found"})),
1479 )
1480 .into_response(),
1481 Err(e) => (
1482 StatusCode::INTERNAL_SERVER_ERROR,
1483 Json(serde_json::json!({"error": format!("Failed to delete session: {e}")})),
1484 )
1485 .into_response(),
1486 }
1487}
1488
1489pub async fn handle_api_session_rename(
1491 State(state): State<AppState>,
1492 headers: HeaderMap,
1493 Path(id): Path<String>,
1494 Json(body): Json<serde_json::Value>,
1495) -> impl IntoResponse {
1496 if let Err(e) = require_auth(&state, &headers) {
1497 return e.into_response();
1498 }
1499
1500 let Some(ref backend) = state.session_backend else {
1501 return (
1502 StatusCode::NOT_FOUND,
1503 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1504 )
1505 .into_response();
1506 };
1507
1508 let name = body["name"].as_str().unwrap_or("").trim();
1509 if name.is_empty() {
1510 return (
1511 StatusCode::BAD_REQUEST,
1512 Json(serde_json::json!({"error": "name is required"})),
1513 )
1514 .into_response();
1515 }
1516
1517 let session_key = format!("gw_{id}");
1518
1519 let sessions = backend.list_sessions();
1521 if !sessions.contains(&session_key) {
1522 return (
1523 StatusCode::NOT_FOUND,
1524 Json(serde_json::json!({"error": "Session not found"})),
1525 )
1526 .into_response();
1527 }
1528
1529 match backend.set_session_name(&session_key, name) {
1530 Ok(()) => Json(serde_json::json!({"session_id": id, "name": name})).into_response(),
1531 Err(e) => (
1532 StatusCode::INTERNAL_SERVER_ERROR,
1533 Json(serde_json::json!({"error": format!("Failed to rename session: {e}")})),
1534 )
1535 .into_response(),
1536 }
1537}
1538
1539pub async fn handle_api_sessions_running(
1541 State(state): State<AppState>,
1542 headers: HeaderMap,
1543) -> impl IntoResponse {
1544 if let Err(e) = require_auth(&state, &headers) {
1545 return e.into_response();
1546 }
1547
1548 let Some(ref backend) = state.session_backend else {
1549 return Json(serde_json::json!({
1550 "sessions": [],
1551 "message": "Session persistence is disabled"
1552 }))
1553 .into_response();
1554 };
1555
1556 let running = backend.list_running_sessions();
1557 let sessions: Vec<serde_json::Value> = running
1558 .into_iter()
1559 .filter_map(|meta| {
1560 let session_id = meta.key.strip_prefix("gw_")?;
1561 Some(serde_json::json!({
1562 "session_id": session_id,
1563 "created_at": meta.created_at.to_rfc3339(),
1564 "last_activity": meta.last_activity.to_rfc3339(),
1565 "message_count": meta.message_count,
1566 }))
1567 })
1568 .collect();
1569
1570 Json(serde_json::json!({ "sessions": sessions })).into_response()
1571}
1572
1573pub async fn handle_api_session_state(
1575 State(state): State<AppState>,
1576 headers: HeaderMap,
1577 Path(id): Path<String>,
1578) -> impl IntoResponse {
1579 if let Err(e) = require_auth(&state, &headers) {
1580 return e.into_response();
1581 }
1582
1583 let Some(ref backend) = state.session_backend else {
1584 return (
1585 StatusCode::NOT_FOUND,
1586 Json(serde_json::json!({"error": "Session persistence is disabled"})),
1587 )
1588 .into_response();
1589 };
1590
1591 let session_key = format!("gw_{id}");
1592 match backend.get_session_state(&session_key) {
1593 Ok(Some(ss)) => {
1594 let mut resp = serde_json::json!({
1595 "session_id": id,
1596 "state": ss.state,
1597 });
1598 if let Some(turn_id) = ss.turn_id {
1599 resp["turn_id"] = serde_json::Value::String(turn_id);
1600 }
1601 if let Some(started) = ss.turn_started_at {
1602 resp["turn_started_at"] = serde_json::Value::String(started.to_rfc3339());
1603 }
1604 Json(resp).into_response()
1605 }
1606 Ok(None) => (
1607 StatusCode::NOT_FOUND,
1608 Json(serde_json::json!({"error": "Session not found"})),
1609 )
1610 .into_response(),
1611 Err(e) => (
1612 StatusCode::INTERNAL_SERVER_ERROR,
1613 Json(serde_json::json!({"error": format!("Failed to get session state: {e}")})),
1614 )
1615 .into_response(),
1616 }
1617}
1618
1619pub async fn handle_claude_code_hook(
1628 State(state): State<AppState>,
1629 Json(payload): Json<crate::tools::claude_code_runner::ClaudeCodeHookEvent>,
1630) -> impl IntoResponse {
1631 let _ = &state; tracing::info!(
1637 session_id = %payload.session_id,
1638 event_type = %payload.event_type,
1639 tool_name = ?payload.tool_name,
1640 summary = ?payload.summary,
1641 "Claude Code hook event received"
1642 );
1643
1644 Json(serde_json::json!({ "ok": true }))
1645}
1646
1647pub async fn handle_api_channel_events(
1656 State(state): State<AppState>,
1657 ConnectInfo(addr): ConnectInfo<std::net::SocketAddr>,
1658 Json(body): Json<serde_json::Value>,
1659) -> impl IntoResponse {
1660 if !addr.ip().is_loopback() {
1662 return (
1663 StatusCode::FORBIDDEN,
1664 Json(
1665 serde_json::json!({ "error": "channel-events only accepts localhost connections" }),
1666 ),
1667 )
1668 .into_response();
1669 }
1670
1671 let event = serde_json::json!({
1675 "type": "channel_event",
1676 "payload": body,
1677 "timestamp": chrono::Utc::now().to_rfc3339(),
1678 });
1679 let _ = state.event_tx.send(event);
1680
1681 if body.get("type").and_then(|v| v.as_str()) == Some("human_approval_request") {
1683 if let (Some(run_id), Some(step_id)) = (
1684 body.get("run_id").and_then(|v| v.as_str()),
1685 body.get("step_id").and_then(|v| v.as_str()),
1686 ) {
1687 let approve_keywords: Vec<String> = body
1688 .get("approve_keywords")
1689 .and_then(|v| v.as_array())
1690 .map(|arr| {
1691 arr.iter()
1692 .filter_map(|v| v.as_str().map(String::from))
1693 .collect()
1694 })
1695 .unwrap_or_default();
1696 let reject_keywords: Vec<String> = body
1697 .get("reject_keywords")
1698 .and_then(|v| v.as_array())
1699 .map(|arr| {
1700 arr.iter()
1701 .filter_map(|v| v.as_str().map(String::from))
1702 .collect()
1703 })
1704 .unwrap_or_default();
1705 let approval = super::approval_registry::PendingApproval::new(
1706 run_id.to_string(),
1707 step_id.to_string(),
1708 body.get("workflow_name")
1709 .and_then(|v| v.as_str())
1710 .unwrap_or("")
1711 .to_string(),
1712 approve_keywords,
1713 reject_keywords,
1714 body.get("cwd")
1715 .and_then(|v| v.as_str())
1716 .unwrap_or("/tmp")
1717 .to_string(),
1718 );
1719 state.approval_registry.register(approval);
1720
1721 let message_text = body
1724 .pointer("/content/message")
1725 .and_then(|v| v.as_str())
1726 .unwrap_or("");
1727 let title_text = body
1728 .pointer("/content/title")
1729 .and_then(|v| v.as_str())
1730 .unwrap_or("Human approval needed");
1731 let _ = state.event_tx.send(serde_json::json!({
1732 "type": "human_approval_request",
1733 "run_id": run_id,
1734 "step_id": step_id,
1735 "workflow_name": body.get("workflow_name").and_then(|v| v.as_str()).unwrap_or(""),
1736 "title": title_text,
1737 "message": message_text,
1738 "timestamp": chrono::Utc::now().to_rfc3339(),
1739 }));
1740 }
1741 }
1742
1743 if let Some(channels) = body.get("channels").and_then(|c| c.as_array()) {
1745 let config = state.config.lock().clone();
1746 let title = body
1747 .pointer("/content/title")
1748 .and_then(|v| v.as_str())
1749 .unwrap_or("Notification")
1750 .to_string();
1751 let message = body
1752 .pointer("/content/message")
1753 .and_then(|v| v.as_str())
1754 .unwrap_or("")
1755 .to_string();
1756
1757 let event_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
1759 let keyword_instructions = if event_type == "human_approval_request" {
1760 let approve_kws: Vec<String> = body
1761 .get("approve_keywords")
1762 .and_then(|v| v.as_array())
1763 .map(|arr| {
1764 arr.iter()
1765 .filter_map(|v| v.as_str().map(|s| format!("`{s}`")))
1766 .collect()
1767 })
1768 .unwrap_or_default();
1769 let reject_kws: Vec<String> = body
1770 .get("reject_keywords")
1771 .and_then(|v| v.as_array())
1772 .map(|arr| {
1773 arr.iter()
1774 .filter_map(|v| v.as_str().map(|s| format!("`{s}`")))
1775 .collect()
1776 })
1777 .unwrap_or_default();
1778 let approve_list = if approve_kws.is_empty() {
1779 "`approve`, `yes`, `lgtm`".to_string()
1780 } else {
1781 approve_kws.join(", ")
1782 };
1783 let reject_list = if reject_kws.is_empty() {
1784 "`reject <your feedback>`".to_string()
1785 } else {
1786 format!("{} <your feedback>", reject_kws.join(", "))
1787 };
1788 format!(
1789 "\n---\nReply with one of: {approve_list} to approve.\nReply with {reject_list} to reject."
1790 )
1791 } else {
1792 String::new()
1793 };
1794
1795 let is_approval_request = event_type == "human_approval_request";
1796 let approval_run_id = if is_approval_request {
1797 body.get("run_id")
1798 .and_then(|v| v.as_str())
1799 .map(String::from)
1800 } else {
1801 None
1802 };
1803 let approval_workflow_name = body
1804 .get("workflow_name")
1805 .and_then(|v| v.as_str())
1806 .unwrap_or("workflow")
1807 .to_string();
1808
1809 for ch in channels.iter().filter_map(|v| v.as_str()) {
1810 match ch.to_ascii_lowercase().as_str() {
1814 "discord" => {
1815 if let Some(dc) = &config.channels_config.discord {
1816 if let Some(ch_id) = &dc.notification_channel_id {
1817 let token = dc.bot_token.clone();
1818 let channel_id = ch_id.clone();
1819 let title_c = title.clone();
1820 let message_c = format!("{message}{keyword_instructions}");
1821 let approval_run_id = approval_run_id.clone();
1822 let workflow_name = approval_workflow_name.clone();
1823 let registry = state.approval_registry.clone();
1824 tokio::spawn(async move {
1825 let first_msg_id = dispatch_discord_long(
1826 &token,
1827 &channel_id,
1828 &title_c,
1829 &message_c,
1830 )
1831 .await;
1832 if let Some(run_id) = approval_run_id {
1838 let thread_name = format!(
1839 "Approval: {}",
1840 truncate_thread_name(&workflow_name)
1841 );
1842 let thread_id = match &first_msg_id {
1843 Some(msg_id) => {
1844 create_discord_thread(
1845 &token,
1846 &channel_id,
1847 msg_id,
1848 &thread_name,
1849 )
1850 .await
1851 }
1852 None => None,
1853 };
1854 registry.attach_discord(
1855 &run_id,
1856 Some(channel_id.clone()),
1857 thread_id,
1858 first_msg_id,
1859 );
1860 }
1861 });
1862 } else {
1863 tracing::debug!(
1864 "channel-events: discord requested but notification_channel_id not configured"
1865 );
1866 }
1867 }
1868 }
1869 "slack" => {
1870 if let Some(sc) = &config.channels_config.slack {
1871 if let Some(ch_id) = &sc.notification_channel_id {
1872 let token = sc.bot_token.clone();
1873 let channel_id = ch_id.clone();
1874 let title_c = title.clone();
1875 let message_c = format!("{message}{keyword_instructions}");
1876 let approval_run_id = approval_run_id.clone();
1877 let registry = state.approval_registry.clone();
1878 tokio::spawn(async move {
1879 let text = format!("*{title_c}*\n\n{message_c}");
1880 let thread_ts =
1881 dispatch_slack_notification(&token, &channel_id, &text).await;
1882 if let Some(run_id) = approval_run_id {
1883 registry.attach_slack(
1884 &run_id,
1885 Some(channel_id.clone()),
1886 thread_ts,
1887 );
1888 }
1889 });
1890 } else {
1891 tracing::debug!(
1892 "channel-events: slack requested but notification_channel_id not configured"
1893 );
1894 }
1895 }
1896 }
1897 "telegram" => {
1898 if let Some(tc) = &config.channels_config.telegram {
1899 if let Some(chat_id) = &tc.notification_chat_id {
1900 let token = tc.bot_token.clone();
1901 let chat_id_c = chat_id.clone();
1902 let title_c = title.clone();
1903 let message_c = format!("{message}{keyword_instructions}");
1904 let approval_run_id = approval_run_id.clone();
1905 let registry = state.approval_registry.clone();
1906 tokio::spawn(async move {
1907 let text = format!("*{title_c}*\n\n{message_c}");
1908 let msg_id =
1909 dispatch_telegram_notification(&token, &chat_id_c, &text).await;
1910 if let Some(run_id) = approval_run_id {
1911 registry.attach_telegram(
1912 &run_id,
1913 Some(chat_id_c.clone()),
1914 msg_id,
1915 );
1916 }
1917 });
1918 } else {
1919 tracing::debug!(
1920 "channel-events: telegram requested but notification_chat_id not configured"
1921 );
1922 }
1923 }
1924 }
1925 "dashboard" => {}
1927 _ => {
1928 tracing::debug!("channel-events: unsupported channel '{ch}', skipping");
1929 }
1930 }
1931 }
1932 }
1933
1934 Json(serde_json::json!({ "ok": true })).into_response()
1935}
1936
1937async fn dispatch_discord_notification(
1940 bot_token: &str,
1941 channel_id: &str,
1942 content: &str,
1943) -> Option<String> {
1944 let url = format!(
1945 "https://discord.com/api/v10/channels/{}/messages",
1946 channel_id
1947 );
1948 let client = reqwest::Client::new();
1949 match client
1950 .post(&url)
1951 .header("Authorization", format!("Bot {bot_token}"))
1952 .json(&serde_json::json!({ "content": content }))
1953 .send()
1954 .await
1955 {
1956 Ok(resp) if resp.status().is_success() => {
1957 tracing::info!("channel-events: Discord notification sent to {channel_id}");
1958 let body: serde_json::Value = resp.json().await.unwrap_or(serde_json::Value::Null);
1959 body.get("id").and_then(|v| v.as_str()).map(String::from)
1960 }
1961 Ok(resp) => {
1962 let status = resp.status();
1963 let body = resp.text().await.unwrap_or_default();
1964 tracing::warn!(
1965 "channel-events: Discord API {status}: {}",
1966 &body[..body.len().min(200)]
1967 );
1968 None
1969 }
1970 Err(e) => {
1971 tracing::warn!("channel-events: Discord send failed: {e}");
1972 None
1973 }
1974 }
1975}
1976
1977async fn create_discord_thread(
1983 bot_token: &str,
1984 channel_id: &str,
1985 message_id: &str,
1986 name: &str,
1987) -> Option<String> {
1988 let url =
1989 format!("https://discord.com/api/v10/channels/{channel_id}/messages/{message_id}/threads",);
1990 let client = reqwest::Client::new();
1991 match client
1992 .post(&url)
1993 .header("Authorization", format!("Bot {bot_token}"))
1994 .json(&serde_json::json!({
1995 "name": name,
1996 "auto_archive_duration": 1440, }))
1998 .send()
1999 .await
2000 {
2001 Ok(resp) if resp.status().is_success() => {
2002 let body: serde_json::Value = resp.json().await.unwrap_or(serde_json::Value::Null);
2003 let thread_id = body.get("id").and_then(|v| v.as_str()).map(String::from);
2004 if thread_id.is_some() {
2005 tracing::info!(
2006 channel_id,
2007 message_id,
2008 "channel-events: Discord thread created for approval"
2009 );
2010 }
2011 thread_id
2012 }
2013 Ok(resp) => {
2014 let status = resp.status();
2015 let body = resp.text().await.unwrap_or_default();
2016 tracing::warn!(
2017 "channel-events: Discord thread create {status}: {} (falling back to reply-only scoping)",
2018 &body[..body.len().min(200)]
2019 );
2020 None
2021 }
2022 Err(e) => {
2023 tracing::warn!("channel-events: Discord thread create failed: {e}");
2024 None
2025 }
2026 }
2027}
2028
2029async fn dispatch_slack_notification(
2032 bot_token: &str,
2033 channel_id: &str,
2034 text: &str,
2035) -> Option<String> {
2036 let client = reqwest::Client::new();
2037 match client
2038 .post("https://slack.com/api/chat.postMessage")
2039 .bearer_auth(bot_token)
2040 .json(&serde_json::json!({
2041 "channel": channel_id,
2042 "text": text,
2043 }))
2044 .send()
2045 .await
2046 {
2047 Ok(resp) => {
2048 let body: serde_json::Value = resp.json().await.unwrap_or(serde_json::Value::Null);
2049 if body.get("ok") == Some(&serde_json::Value::Bool(true)) {
2050 tracing::info!("channel-events: Slack notification sent to {channel_id}");
2051 body.get("ts").and_then(|v| v.as_str()).map(String::from)
2052 } else {
2053 let err = body
2054 .get("error")
2055 .and_then(|e| e.as_str())
2056 .unwrap_or("unknown");
2057 tracing::warn!("channel-events: Slack chat.postMessage error: {err}");
2058 None
2059 }
2060 }
2061 Err(e) => {
2062 tracing::warn!("channel-events: Slack send failed: {e}");
2063 None
2064 }
2065 }
2066}
2067
2068async fn dispatch_telegram_notification(bot_token: &str, chat_id: &str, text: &str) -> Option<i64> {
2071 let url = format!("https://api.telegram.org/bot{bot_token}/sendMessage");
2072 let client = reqwest::Client::new();
2073 match client
2074 .post(&url)
2075 .json(&serde_json::json!({
2076 "chat_id": chat_id,
2077 "text": text,
2078 }))
2079 .send()
2080 .await
2081 {
2082 Ok(resp) => {
2083 let body: serde_json::Value = resp.json().await.unwrap_or(serde_json::Value::Null);
2084 if body.get("ok") == Some(&serde_json::Value::Bool(true)) {
2085 tracing::info!("channel-events: Telegram notification sent to {chat_id}");
2086 body.pointer("/result/message_id").and_then(|v| v.as_i64())
2087 } else {
2088 let desc = body
2089 .get("description")
2090 .and_then(|e| e.as_str())
2091 .unwrap_or("unknown");
2092 tracing::warn!("channel-events: Telegram sendMessage error: {desc}");
2093 None
2094 }
2095 }
2096 Err(e) => {
2097 tracing::warn!("channel-events: Telegram send failed: {e}");
2098 None
2099 }
2100 }
2101}
2102
/// Shortens `name` to at most 80 characters.
///
/// Names of 80 characters or fewer are returned unchanged. Longer names keep their first
/// 79 characters and end with a single `…`. Lengths are counted in `char`s, not bytes.
fn truncate_thread_name(name: &str) -> String {
    const MAX: usize = 80;

    // A character at index MAX exists only when the name is longer than MAX characters.
    match name.chars().nth(MAX) {
        None => name.to_string(),
        Some(_) => {
            let mut shortened: String = name.chars().take(MAX - 1).collect();
            shortened.push('…');
            shortened
        }
    }
}
2114
2115async fn dispatch_discord_long(
2118 bot_token: &str,
2119 channel_id: &str,
2120 title: &str,
2121 message: &str,
2122) -> Option<String> {
2123 let header = format!("**{}**\n\n", title);
2124 let full = format!("{}{}", header, message);
2125
2126 if full.len() <= 1900 {
2127 return dispatch_discord_notification(bot_token, channel_id, &full).await;
2128 }
2129
2130 let mut chunks: Vec<String> = Vec::new();
2132 let mut current = header;
2133
2134 for line in message.lines() {
2135 let sub_lines: Vec<&str> = if line.len() > 1850 {
2137 let mut parts = Vec::new();
2139 let mut start = 0;
2140 while start < line.len() {
2141 let end = std::cmp::min(start + 1850, line.len());
2142 let break_at = if end < line.len() {
2144 line[start..end]
2145 .rfind(' ')
2146 .map(|p| start + p + 1)
2147 .unwrap_or(end)
2148 } else {
2149 end
2150 };
2151 parts.push(&line[start..break_at]);
2152 start = break_at;
2153 }
2154 parts
2155 } else {
2156 vec![line]
2157 };
2158
2159 for sub in sub_lines {
2160 let candidate = if current.is_empty() {
2161 sub.to_string()
2162 } else {
2163 format!("{}\n{}", current, sub)
2164 };
2165
2166 if candidate.len() > 1900 && !current.is_empty() {
2167 chunks.push(current);
2168 current = sub.to_string();
2169 } else {
2170 current = candidate;
2171 }
2172 }
2173 }
2174 if !current.is_empty() {
2175 chunks.push(current);
2176 }
2177
2178 let n = chunks.len();
2179 let mut first_msg_id: Option<String> = None;
2180 for (i, chunk) in chunks.iter().enumerate() {
2181 let text = if n > 1 {
2182 format!("{}\n`({}/{})`", chunk, i + 1, n)
2183 } else {
2184 chunk.clone()
2185 };
2186 let msg_id = dispatch_discord_notification(bot_token, channel_id, &text).await;
2187 if i == 0 {
2188 first_msg_id = msg_id;
2189 }
2190 if i + 1 < n {
2192 tokio::time::sleep(std::time::Duration::from_millis(600)).await;
2193 }
2194 }
2195 tracing::info!("channel-events: Discord long message sent in {n} chunk(s) to {channel_id}");
2196 first_msg_id
2197}
2198
2199pub async fn handle_api_channels(
2207 State(state): State<AppState>,
2208 headers: HeaderMap,
2209) -> impl IntoResponse {
2210 if let Err(e) = require_auth(&state, &headers) {
2211 return e.into_response();
2212 }
2213
2214 let config = state.config.lock().clone();
2215 let cc = &config.channels_config;
2216
2217 let mut channels = Vec::new();
2218
2219 channels.push(serde_json::json!({
2221 "name": "cli",
2222 "type": "cli",
2223 "enabled": cc.cli,
2224 "status": if cc.cli { "active" } else { "inactive" },
2225 "message_count": 0,
2226 "last_message_at": null,
2227 "health": if cc.cli { "healthy" } else { "down" },
2228 }));
2229
2230 for (handle, present) in cc.channels() {
2232 channels.push(serde_json::json!({
2233 "name": handle.name(),
2234 "type": handle.name(),
2235 "enabled": present,
2236 "status": if present { "active" } else { "inactive" },
2237 "message_count": 0,
2238 "last_message_at": null,
2239 "health": if present { "healthy" } else { "down" },
2240 }));
2241 }
2242
2243 Json(serde_json::json!({ "channels": channels })).into_response()
2244}
2245
2246#[cfg(test)]
2247mod tests {
2248 use super::*;
2249 use crate::gateway::{AppState, GatewayRateLimiter, IdempotencyStore, nodes};
2250 use crate::memory::{Memory, MemoryCategory, MemoryEntry};
2251 use crate::providers::Provider;
2252 use crate::security::pairing::PairingGuard;
2253 use async_trait::async_trait;
2254 use axum::response::IntoResponse;
2255 use http_body_util::BodyExt;
2256 use parking_lot::Mutex;
2257 use std::sync::Arc;
2258 use std::time::Duration;
2259
2260 struct MockMemory;
2261
2262 #[async_trait]
2263 impl Memory for MockMemory {
2264 fn name(&self) -> &str {
2265 "mock"
2266 }
2267
2268 async fn store(
2269 &self,
2270 _key: &str,
2271 _content: &str,
2272 _category: MemoryCategory,
2273 _session_id: Option<&str>,
2274 ) -> anyhow::Result<()> {
2275 Ok(())
2276 }
2277
2278 async fn recall(
2279 &self,
2280 _query: &str,
2281 _limit: usize,
2282 _session_id: Option<&str>,
2283 _since: Option<&str>,
2284 _until: Option<&str>,
2285 ) -> anyhow::Result<Vec<MemoryEntry>> {
2286 Ok(Vec::new())
2287 }
2288
2289 async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
2290 Ok(None)
2291 }
2292
2293 async fn list(
2294 &self,
2295 _category: Option<&MemoryCategory>,
2296 _session_id: Option<&str>,
2297 ) -> anyhow::Result<Vec<MemoryEntry>> {
2298 Ok(Vec::new())
2299 }
2300
2301 async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
2302 Ok(false)
2303 }
2304
2305 async fn count(&self) -> anyhow::Result<usize> {
2306 Ok(0)
2307 }
2308
2309 async fn health_check(&self) -> bool {
2310 true
2311 }
2312 }
2313
2314 struct MockProvider;
2315
2316 #[async_trait]
2317 impl Provider for MockProvider {
2318 async fn chat_with_system(
2319 &self,
2320 _system_prompt: Option<&str>,
2321 _message: &str,
2322 _model: &str,
2323 _temperature: f64,
2324 ) -> anyhow::Result<String> {
2325 Ok("ok".to_string())
2326 }
2327 }
2328
2329 fn test_state(config: crate::config::Config) -> AppState {
2330 AppState {
2331 config: Arc::new(Mutex::new(config)),
2332 provider: Arc::new(MockProvider),
2333 model: "test-model".into(),
2334 temperature: 0.0,
2335 mem: Arc::new(MockMemory),
2336 auto_save: false,
2337 webhook_secret_hash: None,
2338 pairing: Arc::new(PairingGuard::new(false, &[])),
2339 trust_forwarded_headers: false,
2340 rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
2341 auth_limiter: Arc::new(crate::gateway::auth_rate_limit::AuthRateLimiter::new()),
2342 idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
2343 whatsapp: None,
2344 whatsapp_app_secret: None,
2345 linq: None,
2346 linq_signing_secret: None,
2347 nextcloud_talk: None,
2348 nextcloud_talk_webhook_secret: None,
2349 wati: None,
2350 gmail_push: None,
2351 observer: Arc::new(crate::observability::NoopObserver),
2352 tools_registry: Arc::new(Vec::new()),
2353 cost_tracker: None,
2354 audit_logger: None,
2355 event_tx: tokio::sync::broadcast::channel(16).0,
2356 shutdown_tx: tokio::sync::watch::channel(false).0,
2357 node_registry: Arc::new(nodes::NodeRegistry::new(16)),
2358 session_backend: None,
2359 session_queue: Arc::new(crate::gateway::session_queue::SessionActorQueue::new(
2360 8, 30, 600,
2361 )),
2362 device_registry: None,
2363 pending_pairings: None,
2364 path_prefix: String::new(),
2365 canvas_store: crate::tools::canvas::CanvasStore::new(),
2366 mcp_registry: None,
2367 approval_registry: crate::gateway::approval_registry::global(),
2368 mcp_local_url: None,
2369 #[cfg(feature = "webauthn")]
2370 webauthn: None,
2371 }
2372 }
2373
2374 async fn response_json(response: axum::response::Response) -> serde_json::Value {
2375 let body = response
2376 .into_body()
2377 .collect()
2378 .await
2379 .expect("response body")
2380 .to_bytes();
2381 serde_json::from_slice(&body).expect("valid json response")
2382 }
2383
/// Masks a config that carries secrets in its provider, reliability, gateway,
/// tunnel, channel, and route sections, round-trips the result through TOML, and
/// checks that each of those secrets came back as `MASKED_SECRET`.
#[test]
fn masking_keeps_toml_valid_and_preserves_api_keys_type() {
    use crate::config::schema;

    let wildcard = || vec![String::from("*")];

    // --- Fixture: nested sections holding secrets -------------------------
    let cloudflare = schema::CloudflareTunnelConfig {
        token: String::from("cf-token"),
    };
    let wati = schema::WatiConfig {
        api_token: String::from("wati-token"),
        api_url: String::from("https://live-mt-server.wati.io"),
        tenant_id: None,
        allowed_numbers: Vec::new(),
        proxy_url: None,
    };
    let feishu = schema::FeishuConfig {
        app_id: String::from("cli_aabbcc"),
        app_secret: String::from("feishu-secret"),
        encrypt_key: Some(String::from("feishu-encrypt")),
        verification_token: Some(String::from("feishu-verify")),
        allowed_users: wildcard(),
        receive_mode: schema::LarkReceiveMode::Websocket,
        port: None,
        proxy_url: None,
    };
    let email = crate::channels::email_channel::EmailConfig {
        imap_host: String::from("imap.example.com"),
        imap_port: 993,
        imap_folder: String::from("INBOX"),
        smtp_host: String::from("smtp.example.com"),
        smtp_port: 465,
        smtp_tls: true,
        username: String::from("agent@example.com"),
        password: String::from("email-password-secret"),
        from_address: String::from("agent@example.com"),
        idle_timeout_secs: 1740,
        allowed_senders: wildcard(),
        default_subject: String::from("Construct Message"),
    };
    let model_route = schema::ModelRouteConfig {
        hint: String::from("reasoning"),
        provider: String::from("openrouter"),
        model: String::from("anthropic/claude-sonnet-4.6"),
        api_key: Some(String::from("route-model-key")),
    };
    let embedding_route = schema::EmbeddingRouteConfig {
        hint: String::from("semantic"),
        provider: String::from("openai"),
        model: String::from("text-embedding-3-small"),
        dimensions: Some(1536),
        api_key: Some(String::from("route-embed-key")),
    };

    let mut cfg = crate::config::Config::default();
    cfg.api_key = Some(String::from("sk-live-123"));
    cfg.reliability.api_keys = vec![String::from("rk-1"), String::from("rk-2")];
    cfg.gateway.paired_tokens = vec![String::from("pair-token-1")];
    cfg.tunnel.cloudflare = Some(cloudflare);
    cfg.channels_config.wati = Some(wati);
    cfg.channels_config.feishu = Some(feishu);
    cfg.channels_config.email = Some(email);
    cfg.model_routes = vec![model_route];
    cfg.embedding_routes = vec![embedding_route];

    // --- Mask, then round-trip through TOML --------------------------------
    let masked = mask_sensitive_fields(&cfg);
    let rendered = toml::to_string_pretty(&masked).expect("masked config should serialize");
    let parsed: crate::config::Config =
        toml::from_str(&rendered).expect("masked config should remain valid TOML for Config");

    // --- Every secret must now be the placeholder --------------------------
    let placeholder = Some(MASKED_SECRET);

    assert_eq!(parsed.api_key.as_deref(), placeholder);
    // List-typed secrets stay lists: one placeholder per original entry.
    assert_eq!(
        parsed.reliability.api_keys,
        vec![MASKED_SECRET.to_string(); 2]
    );
    assert_eq!(
        parsed.gateway.paired_tokens,
        vec![MASKED_SECRET.to_string(); 1]
    );
    assert_eq!(
        parsed.tunnel.cloudflare.as_ref().map(|cf| cf.token.as_str()),
        placeholder
    );

    let channels = &parsed.channels_config;
    assert_eq!(
        channels.wati.as_ref().map(|w| w.api_token.as_str()),
        placeholder
    );

    let parsed_feishu = channels.feishu.as_ref();
    assert_eq!(parsed_feishu.map(|f| f.app_secret.as_str()), placeholder);
    assert_eq!(
        parsed_feishu.and_then(|f| f.encrypt_key.as_deref()),
        placeholder
    );
    assert_eq!(
        parsed_feishu.and_then(|f| f.verification_token.as_deref()),
        placeholder
    );

    assert_eq!(
        parsed
            .model_routes
            .first()
            .and_then(|route| route.api_key.as_deref()),
        placeholder
    );
    assert_eq!(
        parsed
            .embedding_routes
            .first()
            .and_then(|route| route.api_key.as_deref()),
        placeholder
    );
    assert_eq!(
        channels.email.as_ref().map(|e| e.password.as_str()),
        placeholder
    );
}
2513
/// Checks that `hydrate_config_for_save` replaces `MASKED_SECRET` placeholders
/// in an incoming config with the values held by the current config, keeps the
/// values the caller actually changed, and carries over `config_path` and
/// `workspace_dir` from the current config.
#[test]
fn hydrate_config_for_save_restores_masked_secrets_and_paths() {
    use crate::config::schema;

    // Small builders so the fixture reads as data rather than boilerplate.
    let strings = |items: &[&str]| -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    };
    let model_route = |hint: &str, model: &str, key: &str| schema::ModelRouteConfig {
        hint: hint.to_string(),
        provider: "openrouter".to_string(),
        model: model.to_string(),
        api_key: Some(key.to_string()),
    };

    // --- Current (on-disk) config with the real secrets --------------------
    let mut current = crate::config::Config::default();
    current.config_path = std::path::PathBuf::from("/tmp/current/config.toml");
    current.workspace_dir = std::path::PathBuf::from("/tmp/current/workspace");
    current.api_key = Some("real-key".to_string());
    current.reliability.api_keys = strings(&["r1", "r2"]);
    current.gateway.paired_tokens = strings(&["pair-1", "pair-2"]);
    current.tunnel.cloudflare = Some(schema::CloudflareTunnelConfig {
        token: "cf-token-real".to_string(),
    });
    current.tunnel.ngrok = Some(schema::NgrokTunnelConfig {
        auth_token: "ngrok-token-real".to_string(),
        domain: None,
    });
    current.channels_config.wati = Some(schema::WatiConfig {
        api_token: "wati-real".to_string(),
        api_url: "https://live-mt-server.wati.io".to_string(),
        tenant_id: None,
        allowed_numbers: Vec::new(),
        proxy_url: None,
    });
    current.channels_config.feishu = Some(schema::FeishuConfig {
        app_id: "cli_current".to_string(),
        app_secret: "feishu-secret-real".to_string(),
        encrypt_key: Some("feishu-encrypt-real".to_string()),
        verification_token: Some("feishu-verify-real".to_string()),
        allowed_users: strings(&["*"]),
        receive_mode: schema::LarkReceiveMode::Websocket,
        port: None,
        proxy_url: None,
    });
    current.channels_config.email = Some(crate::channels::email_channel::EmailConfig {
        imap_host: "imap.example.com".to_string(),
        imap_port: 993,
        imap_folder: "INBOX".to_string(),
        smtp_host: "smtp.example.com".to_string(),
        smtp_port: 465,
        smtp_tls: true,
        username: "agent@example.com".to_string(),
        password: "email-password-real".to_string(),
        from_address: "agent@example.com".to_string(),
        idle_timeout_secs: 1740,
        allowed_senders: strings(&["*"]),
        default_subject: "Construct Message".to_string(),
    });
    current.model_routes = vec![
        model_route(
            "reasoning",
            "anthropic/claude-sonnet-4.6",
            "route-model-key-1",
        ),
        model_route("fast", "openai/gpt-4.1-mini", "route-model-key-2"),
    ];
    current.embedding_routes = vec![
        schema::EmbeddingRouteConfig {
            hint: "semantic".to_string(),
            provider: "openai".to_string(),
            model: "text-embedding-3-small".to_string(),
            dimensions: Some(1536),
            api_key: Some("route-embed-key-1".to_string()),
        },
        schema::EmbeddingRouteConfig {
            hint: "archive".to_string(),
            provider: "custom:https://emb.example.com/v1".to_string(),
            model: "bge-m3".to_string(),
            dimensions: Some(1024),
            api_key: Some("route-embed-key-2".to_string()),
        },
    ];

    // --- Incoming config: masked copy with a handful of real edits ---------
    let mut incoming = mask_sensitive_fields(&current);
    incoming.default_model = Some("gpt-4.1-mini".to_string());
    // First entry stays masked (must be restored), second is a new value.
    incoming.reliability.api_keys = strings(&[MASKED_SECRET, "r2-new"]);
    incoming.gateway.paired_tokens = strings(&[MASKED_SECRET, "pair-2-new"]);
    // The placeholders are set explicitly so the test does not depend on what
    // `mask_sensitive_fields` chose to mask.
    if let Some(cloudflare) = incoming.tunnel.cloudflare.as_mut() {
        cloudflare.token = MASKED_SECRET.to_string();
    }
    if let Some(ngrok) = incoming.tunnel.ngrok.as_mut() {
        ngrok.auth_token = MASKED_SECRET.to_string();
    }
    if let Some(wati) = incoming.channels_config.wati.as_mut() {
        wati.api_token = MASKED_SECRET.to_string();
    }
    if let Some(feishu) = incoming.channels_config.feishu.as_mut() {
        feishu.app_secret = MASKED_SECRET.to_string();
        feishu.encrypt_key = Some(MASKED_SECRET.to_string());
        feishu.verification_token = Some("feishu-verify-new".to_string());
    }
    if let Some(email) = incoming.channels_config.email.as_mut() {
        email.password = MASKED_SECRET.to_string();
    }
    incoming.model_routes[1].api_key = Some("route-model-key-2-new".to_string());
    incoming.embedding_routes[1].api_key = Some("route-embed-key-2-new".to_string());

    let hydrated = hydrate_config_for_save(incoming, &current);

    // --- Paths and top-level values ----------------------------------------
    assert_eq!(hydrated.config_path, current.config_path);
    assert_eq!(hydrated.workspace_dir, current.workspace_dir);
    assert_eq!(hydrated.api_key, current.api_key);
    assert_eq!(hydrated.default_model.as_deref(), Some("gpt-4.1-mini"));

    // --- List secrets: masked slots restored, edited slots kept -------------
    assert_eq!(hydrated.reliability.api_keys, strings(&["r1", "r2-new"]));
    assert_eq!(
        hydrated.gateway.paired_tokens,
        strings(&["pair-1", "pair-2-new"])
    );

    // --- Tunnel secrets -----------------------------------------------------
    let tunnel = &hydrated.tunnel;
    assert_eq!(
        tunnel.cloudflare.as_ref().map(|cf| cf.token.as_str()),
        Some("cf-token-real")
    );
    assert_eq!(
        tunnel.ngrok.as_ref().map(|ng| ng.auth_token.as_str()),
        Some("ngrok-token-real")
    );

    // --- Channel secrets ----------------------------------------------------
    let channels = &hydrated.channels_config;
    assert_eq!(
        channels.wati.as_ref().map(|w| w.api_token.as_str()),
        Some("wati-real")
    );
    let feishu = channels.feishu.as_ref();
    assert_eq!(
        feishu.map(|f| f.app_secret.as_str()),
        Some("feishu-secret-real")
    );
    assert_eq!(
        feishu.and_then(|f| f.encrypt_key.as_deref()),
        Some("feishu-encrypt-real")
    );
    assert_eq!(
        feishu.and_then(|f| f.verification_token.as_deref()),
        Some("feishu-verify-new")
    );

    // --- Route keys ---------------------------------------------------------
    assert_eq!(
        hydrated.model_routes[0].api_key.as_deref(),
        Some("route-model-key-1")
    );
    assert_eq!(
        hydrated.model_routes[1].api_key.as_deref(),
        Some("route-model-key-2-new")
    );
    assert_eq!(
        hydrated.embedding_routes[0].api_key.as_deref(),
        Some("route-embed-key-1")
    );
    assert_eq!(
        hydrated.embedding_routes[1].api_key.as_deref(),
        Some("route-embed-key-2-new")
    );

    assert_eq!(
        channels.email.as_ref().map(|e| e.password.as_str()),
        Some("email-password-real")
    );
}
2706
/// Checks that route API keys are restored by matching routes rather than by
/// position, and that a masked key on a route with no counterpart in the
/// current config is cleared to `None` instead of being saved as the
/// placeholder.
#[test]
fn hydrate_config_for_save_restores_route_keys_by_identity_and_clears_unmatched_masks() {
    use crate::config::schema;

    let model_route = |hint: &str, model: &str, key: &str| schema::ModelRouteConfig {
        hint: hint.to_string(),
        provider: "openrouter".to_string(),
        model: model.to_string(),
        api_key: Some(key.to_string()),
    };

    let mut current = crate::config::Config::default();
    current.model_routes = vec![
        model_route(
            "reasoning",
            "anthropic/claude-sonnet-4.6",
            "route-model-key-1",
        ),
        model_route("fast", "openai/gpt-4.1-mini", "route-model-key-2"),
    ];
    current.embedding_routes = vec![
        schema::EmbeddingRouteConfig {
            hint: "semantic".to_string(),
            provider: "openai".to_string(),
            model: "text-embedding-3-small".to_string(),
            dimensions: Some(1536),
            api_key: Some("route-embed-key-1".to_string()),
        },
        schema::EmbeddingRouteConfig {
            hint: "archive".to_string(),
            provider: "custom:https://emb.example.com/v1".to_string(),
            model: "bge-m3".to_string(),
            dimensions: Some(1024),
            api_key: Some("route-embed-key-2".to_string()),
        },
    ];

    // Incoming config: the masked routes in reversed order, plus one brand-new
    // route per list whose key is (wrongly) the placeholder.
    let mut incoming = mask_sensitive_fields(&current);
    incoming.model_routes.swap(0, 1);
    incoming.embedding_routes.swap(0, 1);
    incoming.model_routes.push(schema::ModelRouteConfig {
        hint: "new".to_string(),
        provider: "openai".to_string(),
        model: "gpt-4.1".to_string(),
        api_key: Some(MASKED_SECRET.to_string()),
    });
    incoming.embedding_routes.push(schema::EmbeddingRouteConfig {
        hint: "new-embed".to_string(),
        provider: "custom:https://emb2.example.com/v1".to_string(),
        model: "bge-small".to_string(),
        dimensions: Some(768),
        api_key: Some(MASKED_SECRET.to_string()),
    });

    let hydrated = hydrate_config_for_save(incoming, &current);

    // Swapped routes get the key of the route they match, not the key that
    // used to sit at their index.
    let model_routes = &hydrated.model_routes;
    assert_eq!(
        model_routes[0].api_key.as_deref(),
        Some("route-model-key-2")
    );
    assert_eq!(
        model_routes[1].api_key.as_deref(),
        Some("route-model-key-1")
    );
    // No matching current route: the placeholder is dropped.
    assert_eq!(model_routes[2].api_key, None);

    let embedding_routes = &hydrated.embedding_routes;
    assert_eq!(
        embedding_routes[0].api_key.as_deref(),
        Some("route-embed-key-2")
    );
    assert_eq!(
        embedding_routes[1].api_key.as_deref(),
        Some("route-embed-key-1")
    );
    assert_eq!(embedding_routes[2].api_key, None);

    // The placeholder must never survive into the config that gets saved.
    let is_placeholder = |key: Option<&str>| key == Some(MASKED_SECRET);
    assert!(
        model_routes
            .iter()
            .all(|route| !is_placeholder(route.api_key.as_deref()))
    );
    assert!(
        embedding_routes
            .iter()
            .all(|route| !is_placeholder(route.api_key.as_deref()))
    );
}
2795
2796 #[tokio::test]
2797 async fn cron_api_shell_roundtrip_includes_delivery() {
2798 let tmp = tempfile::TempDir::new().unwrap();
2799 let config = crate::config::Config {
2800 workspace_dir: tmp.path().join("workspace"),
2801 config_path: tmp.path().join("config.toml"),
2802 ..crate::config::Config::default()
2803 };
2804 std::fs::create_dir_all(&config.workspace_dir).unwrap();
2805 let state = test_state(config);
2806
2807 let add_response = handle_api_cron_add(
2808 State(state.clone()),
2809 HeaderMap::new(),
2810 Json(
2811 serde_json::from_value::<CronAddBody>(serde_json::json!({
2812 "name": "test-job",
2813 "schedule": "*/5 * * * *",
2814 "command": "echo hello",
2815 "delivery": {
2816 "mode": "announce",
2817 "channel": "discord",
2818 "to": "1234567890",
2819 "best_effort": true
2820 }
2821 }))
2822 .expect("body should deserialize"),
2823 ),
2824 )
2825 .await
2826 .into_response();
2827
2828 let add_json = response_json(add_response).await;
2829 assert_eq!(add_json["status"], "ok");
2830 assert_eq!(add_json["job"]["delivery"]["mode"], "announce");
2831 assert_eq!(add_json["job"]["delivery"]["channel"], "discord");
2832 assert_eq!(add_json["job"]["delivery"]["to"], "1234567890");
2833
2834 let list_response = handle_api_cron_list(State(state), HeaderMap::new())
2835 .await
2836 .into_response();
2837 let list_json = response_json(list_response).await;
2838 let jobs = list_json["jobs"].as_array().expect("jobs array");
2839 assert_eq!(jobs.len(), 1);
2840 assert_eq!(jobs[0]["delivery"]["mode"], "announce");
2841 assert_eq!(jobs[0]["delivery"]["channel"], "discord");
2842 assert_eq!(jobs[0]["delivery"]["to"], "1234567890");
2843 }
2844
2845 #[tokio::test]
2846 async fn cron_api_accepts_agent_jobs() {
2847 let tmp = tempfile::TempDir::new().unwrap();
2848 let config = crate::config::Config {
2849 workspace_dir: tmp.path().join("workspace"),
2850 config_path: tmp.path().join("config.toml"),
2851 ..crate::config::Config::default()
2852 };
2853 std::fs::create_dir_all(&config.workspace_dir).unwrap();
2854 let state = test_state(config);
2855
2856 let response = handle_api_cron_add(
2857 State(state.clone()),
2858 HeaderMap::new(),
2859 Json(
2860 serde_json::from_value::<CronAddBody>(serde_json::json!({
2861 "name": "agent-job",
2862 "schedule": "*/5 * * * *",
2863 "job_type": "agent",
2864 "command": "ignored shell command",
2865 "prompt": "summarize the latest logs"
2866 }))
2867 .expect("body should deserialize"),
2868 ),
2869 )
2870 .await
2871 .into_response();
2872
2873 let json = response_json(response).await;
2874 assert_eq!(json["status"], "ok");
2875
2876 let config = state.config.lock().clone();
2877 let jobs = crate::cron::list_jobs(&config).unwrap();
2878 assert_eq!(jobs.len(), 1);
2879 assert_eq!(jobs[0].job_type, crate::cron::JobType::Agent);
2880 assert_eq!(jobs[0].prompt.as_deref(), Some("summarize the latest logs"));
2881 }
2882
2883 #[tokio::test]
2884 async fn cron_api_rejects_announce_delivery_without_target() {
2885 let tmp = tempfile::TempDir::new().unwrap();
2886 let config = crate::config::Config {
2887 workspace_dir: tmp.path().join("workspace"),
2888 config_path: tmp.path().join("config.toml"),
2889 ..crate::config::Config::default()
2890 };
2891 std::fs::create_dir_all(&config.workspace_dir).unwrap();
2892 let state = test_state(config);
2893
2894 let response = handle_api_cron_add(
2895 State(state.clone()),
2896 HeaderMap::new(),
2897 Json(
2898 serde_json::from_value::<CronAddBody>(serde_json::json!({
2899 "name": "invalid-delivery-job",
2900 "schedule": "*/5 * * * *",
2901 "command": "echo hello",
2902 "delivery": {
2903 "mode": "announce",
2904 "channel": "discord"
2905 }
2906 }))
2907 .expect("body should deserialize"),
2908 ),
2909 )
2910 .await
2911 .into_response();
2912
2913 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2914 let json = response_json(response).await;
2915 assert!(
2916 json["error"]
2917 .as_str()
2918 .unwrap_or_default()
2919 .contains("delivery.to is required")
2920 );
2921
2922 let config = state.config.lock().clone();
2923 assert!(crate::cron::list_jobs(&config).unwrap().is_empty());
2924 }
2925
2926 #[tokio::test]
2927 async fn cron_api_rejects_announce_delivery_with_unsupported_channel() {
2928 let tmp = tempfile::TempDir::new().unwrap();
2929 let config = crate::config::Config {
2930 workspace_dir: tmp.path().join("workspace"),
2931 config_path: tmp.path().join("config.toml"),
2932 ..crate::config::Config::default()
2933 };
2934 std::fs::create_dir_all(&config.workspace_dir).unwrap();
2935 let state = test_state(config);
2936
2937 let response = handle_api_cron_add(
2938 State(state.clone()),
2939 HeaderMap::new(),
2940 Json(
2941 serde_json::from_value::<CronAddBody>(serde_json::json!({
2942 "name": "invalid-delivery-job",
2943 "schedule": "*/5 * * * *",
2944 "command": "echo hello",
2945 "delivery": {
2946 "mode": "announce",
2947 "channel": "email",
2948 "to": "alerts@example.com"
2949 }
2950 }))
2951 .expect("body should deserialize"),
2952 ),
2953 )
2954 .await
2955 .into_response();
2956
2957 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
2958 let json = response_json(response).await;
2959 assert!(
2960 json["error"]
2961 .as_str()
2962 .unwrap_or_default()
2963 .contains("unsupported delivery channel")
2964 );
2965
2966 let config = state.config.lock().clone();
2967 assert!(crate::cron::list_jobs(&config).unwrap().is_empty());
2968 }
2969}