1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::Extension;
10use axum::extract::{Path, Query, State};
11use axum::http::header::{self, HeaderValue};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::sse::{Event, KeepAlive, Sse};
14use axum::response::IntoResponse;
15use axum::response::Response;
16use axum::{Json, Router};
17use futures::Stream;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use sha2::{Digest, Sha256};
22use tandem_memory::types::GlobalMemoryRecord;
23use tandem_memory::{
24 db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
25 MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
26 MemorySearchResponse, ScrubReport, ScrubStatus,
27};
28use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
29use tokio_stream::wrappers::BroadcastStream;
30use tokio_stream::StreamExt;
31use uuid::Uuid;
32
33use tandem_tools::Tool;
34use tandem_types::{
35 CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36 SendMessageRequest, Session, TenantContext, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42 capability_resolver::{
43 classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44 CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45 CapabilityResolveInput,
46 },
47 mcp_catalog,
48 pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49 ActiveRun, AppState, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57pub(crate) mod config_providers;
58pub(crate) mod context_packs;
59mod context_run_ledger;
60mod context_run_mutation_checkpoints;
61pub(crate) mod context_runs;
62pub(crate) mod context_types;
63mod enterprise;
64mod external_actions;
65mod global;
66mod marketplace;
67pub(crate) mod mcp;
68mod middleware;
69mod mission_builder;
70mod mission_builder_host;
71mod mission_builder_runtime;
72mod missions_teams;
73mod optimizations;
74mod pack_builder;
75mod packs;
76mod permissions_questions;
77mod presets;
78mod resources;
79mod router;
80mod routes_bug_monitor;
81mod routes_capabilities;
82mod routes_coder;
83mod routes_config_providers;
84mod routes_context;
85mod routes_external_actions;
86mod routes_global;
87mod routes_marketplace;
88mod routes_mcp;
89mod routes_mission_builder;
90mod routes_missions_teams;
91mod routes_optimizations;
92mod routes_pack_builder;
93mod routes_packs;
94mod routes_permissions_questions;
95mod routes_presets;
96mod routes_resources;
97mod routes_routines_automations;
98mod routes_sessions;
99mod routes_setup_understanding;
100mod routes_skills_memory;
101mod routes_system_api;
102mod routes_task_intake;
103mod routes_workflow_planner;
104mod routes_workflows;
105pub(crate) mod routines_automations;
106mod sessions;
107mod setup_understanding;
108mod skills_memory;
109mod system_api;
110mod task_intake;
111pub(crate) mod workflow_planner;
112mod workflow_planner_host;
113mod workflow_planner_policy;
114pub(crate) mod workflow_planner_runtime;
115mod workflow_planner_transport;
116mod workflows;
117
118use capabilities::*;
119use context_runs::*;
120use context_types::*;
121use marketplace::*;
122use mcp::*;
123use pack_builder::*;
124use packs::*;
125use permissions_questions::*;
126use presets::*;
127use resources::*;
128use sessions::*;
129use setup_understanding::*;
130use skills_memory::*;
131use system_api::*;
132
133#[cfg(test)]
134pub(crate) use context_runs::session_context_run_id;
135pub(crate) use context_runs::sync_workflow_run_blackboard;
136#[cfg(test)]
137pub(crate) use context_runs::workflow_context_run_id;
138pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
139
/// Optional filter and paging parameters for a session-listing request.
///
/// Every field may be omitted. NOTE(review): the handler that consumes this type is
/// not in this file; the per-field meanings below are inferred from the names and
/// should be confirmed against it.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    /// Presumably a free-text search string.
    q: Option<String>,
    /// Requested page — TODO confirm whether numbering starts at 0 or 1.
    page: Option<usize>,
    /// Presumably the number of sessions per page.
    page_size: Option<usize>,
    /// Presumably filters on the session's archived flag.
    archived: Option<bool>,
    /// Optional `SessionScope` selector (type declared outside this file).
    scope: Option<SessionScope>,
    /// Presumably restricts results to a single workspace.
    workspace: Option<String>,
}
149
/// Optional event filter keyed by session and/or run identifier.
///
/// Both fields default to `None`. The wire names use the `ID` suffix casing
/// (`sessionID`, `runID`) rather than the Rust field names.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    /// Deserialized from the `sessionID` key.
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    /// Deserialized from the `runID` key.
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
157
/// Optional windowing parameters for reading a run's events.
///
/// NOTE(review): the field names and types mirror the `since_seq` / `tail`
/// arguments of `load_run_events_jsonl` in this file; presumably they are
/// forwarded to it — confirm against the handler.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    /// Sequence-number cursor; presumably only later events are wanted.
    since_seq: Option<u64>,
    /// Presumably the maximum number of trailing events to return.
    tail: Option<usize>,
}
163
/// Optional parameters for an asynchronous prompt request.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    /// Written as a raw identifier because `return` is a Rust keyword.
    /// NOTE(review): serde is expected to expose this under the key `return`;
    /// the accepted values are not visible in this file — confirm with the handler.
    r#return: Option<String>,
}
168
/// Request payload for acquiring an engine lease; every field is optional.
///
/// NOTE(review): defaults applied when a field is missing are decided by the
/// consuming handler, which is not in this file.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    /// Presumably identifies the client requesting the lease.
    client_id: Option<String>,
    /// Presumably a free-form label for the kind of client.
    client_type: Option<String>,
    /// Presumably the requested lease lifetime in milliseconds (per the `_ms` suffix).
    ttl_ms: Option<u64>,
}
175
/// Request payload for renewing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    /// Required identifier of the lease to renew.
    lease_id: String,
}
180
/// Request payload for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    /// Required identifier of the lease to release.
    lease_id: String,
}
185
/// Request payload for a storage repair operation.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    /// Optional force flag. NOTE(review): what "force" overrides is decided by the
    /// handler, which is not in this file.
    force: Option<bool>,
}
190
/// Optional parameters for listing storage files.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    /// Presumably the path to list — TODO confirm what it is resolved relative to.
    path: Option<String>,
    /// Presumably caps the number of entries returned.
    limit: Option<usize>,
}
196
/// Partial-update payload for a session; every field is optional.
///
/// NOTE(review): presumably a missing field leaves the corresponding session
/// attribute unchanged — confirm against the handler.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    /// New session title, if provided.
    title: Option<String>,
    /// New archived flag, if provided.
    archived: Option<bool>,
    /// Permission entries kept as untyped JSON; their schema is not visible here.
    permission: Option<Vec<serde_json::Value>>,
}
203
/// Request payload for attaching a session to a workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    /// Required target workspace. NOTE(review): presumably a filesystem path — confirm.
    target_workspace: String,
    /// Optional tag describing why the session is being attached.
    reason_tag: Option<String>,
}
209
/// Request payload for a workspace override.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    /// Optional lifetime of the override, presumably in seconds (per the field name).
    ttl_seconds: Option<u64>,
}
214
/// Request payload for worktree operations; every field is optional.
///
/// NOTE(review): which fields each worktree endpoint actually requires is decided
/// by handlers outside this file; the meanings below are inferred from the names.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    /// Presumably the root of the repository that owns the worktree.
    repo_root: Option<String>,
    /// Presumably the filesystem location of the worktree.
    path: Option<String>,
    /// Presumably the branch checked out in the worktree.
    branch: Option<String>,
    /// Presumably the base ref the branch is created from.
    base: Option<String>,
    /// Presumably the task this worktree belongs to.
    task_id: Option<String>,
    /// Presumably the run that owns this worktree.
    owner_run_id: Option<String>,
    /// Presumably the lease under which the worktree is held.
    lease_id: Option<String>,
    /// Presumably marks the worktree as managed by the engine.
    managed: Option<bool>,
    /// Presumably requests branch deletion when the worktree is cleaned up.
    cleanup_branch: Option<bool>,
}
227
/// Optional parameters for listing worktrees.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    /// Presumably the repository whose worktrees should be listed.
    repo_root: Option<String>,
    /// Presumably restricts the listing to managed worktrees.
    managed_only: Option<bool>,
}
233
/// Payload for a client-submitted log entry; every field is optional.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    /// Log level as a string. NOTE(review): accepted values are not visible here.
    level: Option<String>,
    /// Log message text.
    message: Option<String>,
    /// Arbitrary JSON attached to the entry.
    context: Option<Value>,
}
240
/// Serializable error body with a message and an optional error code.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    /// Error message.
    error: String,
    /// Optional error code; the `code` key is left out of the output when this is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
247
248pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
249 let reaper_state = state.clone();
250 let session_part_persister_state = state.clone();
251 let session_context_run_journaler_state = state.clone();
252 let status_indexer_state = state.clone();
253 let routine_scheduler_state = state.clone();
254 let routine_executor_state = state.clone();
255 let usage_aggregator_state = state.clone();
256 let automation_v2_scheduler_state = state.clone();
257 let automation_v2_executor_state = state.clone();
258 let optimization_scheduler_state = state.clone();
259 let workflow_dispatcher_state = state.clone();
260 let agent_team_supervisor_state = state.clone();
261 let global_memory_ingestor_state = state.clone();
262 let bug_monitor_state = state.clone();
263 let mcp_bootstrap_state = state.clone();
264 tokio::spawn(async move {
265 bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
266 });
267 let app = app_router(state.clone());
268 let reaper = tokio::spawn(async move {
269 loop {
270 tokio::time::sleep(Duration::from_secs(5)).await;
271 let stale = reaper_state
272 .run_registry
273 .reap_stale(reaper_state.run_stale_ms)
274 .await;
275 for (session_id, run) in stale {
276 let _ = reaper_state.cancellations.cancel(&session_id).await;
277 let _ = reaper_state
278 .close_browser_sessions_for_owner(&session_id)
279 .await;
280 reaper_state.event_bus.publish(EngineEvent::new(
281 "session.run.finished",
282 json!({
283 "sessionID": session_id,
284 "runID": run.run_id,
285 "finishedAtMs": crate::now_ms(),
286 "status": "timeout",
287 }),
288 ));
289 }
290 }
291 });
292 let session_part_persister = tokio::spawn(crate::run_session_part_persister(
293 session_part_persister_state,
294 ));
295 let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
296 session_context_run_journaler_state,
297 ));
298 let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
299 let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
300 let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
301 let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
302 let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
303 automation_v2_scheduler_state,
304 ));
305 let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
306 automation_v2_executor_state,
307 ));
308 let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
309 optimization_scheduler_state,
310 ));
311 let workflow_dispatcher =
312 tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
313 let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
314 agent_team_supervisor_state,
315 ));
316 let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
317 let global_memory_ingestor =
318 tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
319 let shutdown_state = state.clone();
320 let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();
321
322 let hygiene_task = tokio::spawn(async move {
326 tokio::time::sleep(Duration::from_secs(60)).await;
328 loop {
329 let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
330 .ok()
331 .and_then(|v| v.parse().ok())
332 .unwrap_or(30);
333 if retention_days > 0 {
334 match tandem_core::resolve_shared_paths() {
335 Ok(paths) => {
336 match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
337 Ok(db) => {
338 if let Err(e) = db.run_hygiene(retention_days).await {
339 tracing::warn!("memory hygiene failed: {}", e);
340 }
341 }
342 Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
343 }
344 }
345 Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
346 }
347 }
348 tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
349 }
350 });
351
352 let archiver_state = state.clone();
360 let _automation_v2_archiver = tokio::spawn(async move {
361 loop {
363 if archiver_state.is_automation_scheduler_stopping() {
364 return;
365 }
366 let startup = archiver_state.startup_snapshot().await;
367 if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
368 break;
369 }
370 if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
371 return;
372 }
373 tokio::time::sleep(Duration::from_millis(250)).await;
374 }
375 loop {
376 let retention_days: u64 = std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
377 .ok()
378 .and_then(|v| v.trim().parse().ok())
379 .unwrap_or(7);
380 if retention_days > 0 {
381 match archiver_state
382 .archive_stale_automation_v2_runs(retention_days)
383 .await
384 {
385 Ok(n) if n > 0 => {
386 tracing::info!(
387 archived = n,
388 retention_days,
389 "automation v2 archiver: pruned stale terminal runs"
390 );
391 }
392 Ok(_) => {}
393 Err(e) => {
394 tracing::warn!(error = %e, "automation v2 archiver: archive failed");
395 }
396 }
397 }
398 tokio::time::sleep(Duration::from_secs(24 * 60 * 60)).await;
399 }
400 });
401
402 let listener = tokio::net::TcpListener::bind(addr).await?;
406 let result = axum::serve(listener, app)
407 .with_graceful_shutdown(async move {
408 if tokio::signal::ctrl_c().await.is_err() {
409 futures::future::pending::<()>().await;
410 }
411 shutdown_state.set_automation_scheduler_stopping(true);
412 tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
413 let failed = shutdown_state
414 .fail_running_automation_runs_for_shutdown()
415 .await;
416 if failed > 0 {
417 tracing::warn!(
418 failed_runs = failed,
419 "automation runs marked failed during scheduler shutdown"
420 );
421 }
422 })
423 .await;
424 reaper.abort();
425 session_part_persister.abort();
426 session_context_run_journaler.abort();
427 status_indexer.abort();
428 routine_scheduler.abort();
429 routine_executor.abort();
430 usage_aggregator.abort();
431 automation_v2_scheduler.abort();
432 automation_v2_executor.abort();
433 optimization_scheduler.abort();
434 workflow_dispatcher.abort();
435 agent_team_supervisor.abort();
436 bug_monitor.abort();
437 global_memory_ingestor.abort();
438 hygiene_task.abort();
439 result?;
440 Ok(())
441}
442
/// Builds the HTTP [`Router`] for `state` by delegating to `router::build_router`.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
446fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
447 let content = match std::fs::read_to_string(path) {
448 Ok(value) => value,
449 Err(_) => return Vec::new(),
450 };
451 let mut rows: Vec<Value> = content
452 .lines()
453 .filter_map(|line| serde_json::from_str::<Value>(line).ok())
454 .filter(|row| {
455 if let Some(since) = since_seq {
456 return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
457 }
458 true
459 })
460 .collect();
461 rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
462 if let Some(tail_count) = tail {
463 if rows.len() > tail_count {
464 rows = rows.split_off(rows.len().saturating_sub(tail_count));
465 }
466 }
467 rows
468}
469
470pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
471 if input.len() <= max_len {
472 return input.to_string();
473 }
474 let mut end = 0usize;
475 for (idx, ch) in input.char_indices() {
476 let next = idx + ch.len_utf8();
477 if next > max_len {
478 break;
479 }
480 end = next;
481 }
482 let mut out = input[..end].to_string();
483 out.push_str("...<truncated>");
484 out
485}
486
487#[cfg(test)]
488mod tests;