1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::{Path, Query, State};
10use axum::http::header::{self, HeaderValue};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::sse::{Event, KeepAlive, Sse};
13use axum::response::IntoResponse;
14use axum::response::Response;
15use axum::{Json, Router};
16use futures::Stream;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use sha2::{Digest, Sha256};
21use tandem_memory::types::GlobalMemoryRecord;
22use tandem_memory::{
23 db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
24 MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
25 MemorySearchResponse, ScrubReport, ScrubStatus,
26};
27use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::StreamExt;
30use uuid::Uuid;
31
32use tandem_channels::start_channel_listeners;
33use tandem_tools::Tool;
34use tandem_types::{
35 CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36 SendMessageRequest, Session, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42 capability_resolver::{
43 classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44 CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45 CapabilityResolveInput,
46 },
47 mcp_catalog,
48 pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49 ActiveRun, AppState, ChannelStatus, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57mod config_providers;
58mod context_run_ledger;
59mod context_run_mutation_checkpoints;
60pub(crate) mod context_runs;
61pub(crate) mod context_types;
62mod external_actions;
63mod global;
64pub(crate) mod mcp;
65mod middleware;
66mod mission_builder;
67mod mission_builder_host;
68mod mission_builder_runtime;
69mod missions_teams;
70mod optimizations;
71mod pack_builder;
72mod packs;
73mod permissions_questions;
74mod presets;
75mod resources;
76mod router;
77mod routes_bug_monitor;
78mod routes_capabilities;
79mod routes_coder;
80mod routes_config_providers;
81mod routes_context;
82mod routes_external_actions;
83mod routes_global;
84mod routes_mcp;
85mod routes_mission_builder;
86mod routes_missions_teams;
87mod routes_optimizations;
88mod routes_pack_builder;
89mod routes_packs;
90mod routes_permissions_questions;
91mod routes_presets;
92mod routes_resources;
93mod routes_routines_automations;
94mod routes_sessions;
95mod routes_setup_understanding;
96mod routes_skills_memory;
97mod routes_system_api;
98mod routes_task_intake;
99mod routes_workflow_planner;
100mod routes_workflows;
101pub(crate) mod routines_automations;
102mod sessions;
103mod setup_understanding;
104mod skills_memory;
105mod system_api;
106mod task_intake;
107pub(crate) mod workflow_planner;
108mod workflow_planner_host;
109mod workflow_planner_policy;
110pub(crate) mod workflow_planner_runtime;
111mod workflow_planner_transport;
112mod workflows;
113
114use capabilities::*;
115use context_run_ledger::*;
116use context_run_mutation_checkpoints::*;
117use context_runs::*;
118use context_types::*;
119use mcp::*;
120use pack_builder::*;
121use packs::*;
122use permissions_questions::*;
123use presets::*;
124use resources::*;
125use sessions::*;
126use setup_understanding::*;
127use skills_memory::*;
128use system_api::*;
129
130#[cfg(test)]
131pub(crate) use context_runs::session_context_run_id;
132pub(crate) use context_runs::sync_workflow_run_blackboard;
133#[cfg(test)]
134pub(crate) use context_runs::workflow_context_run_id;
135pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
136
/// Deserializable set of optional filter and paging values for a session listing.
///
/// Every field is optional, so an empty input deserializes successfully.
/// NOTE(review): the handler that consumes this is outside this file; per-field
/// meanings below are inferred from names and should be confirmed there.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    // Presumably a free-text search term.
    q: Option<String>,
    // Presumably the page index and the number of items per page.
    page: Option<usize>,
    page_size: Option<usize>,
    // Presumably filters on the archived flag.
    archived: Option<bool>,
    // `SessionScope` is not declared in this file (it arrives via an import).
    scope: Option<SessionScope>,
    // Presumably restricts results to one workspace.
    workspace: Option<String>,
}
146
/// Optional session/run identifiers used to filter events.
///
/// Both fields are optional and the struct derives `Default`, so "no filter"
/// is representable as `EventFilterQuery::default()`.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    // Read from the camel-case key `sessionID`, not `session_id`.
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    // Read from the camel-case key `runID`, not `run_id`.
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
154
/// Optional windowing parameters for reading run events.
///
/// The field types match the `since_seq` / `tail` parameters of
/// `load_run_events_jsonl`; presumably they are forwarded to it — confirm at the caller.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    since_seq: Option<u64>,
    tail: Option<usize>,
}
160
/// Optional parameters for an asynchronous prompt request.
///
/// NOTE(review): the accepted values of `return` are defined by the consuming
/// handler, which is not visible in this file.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    // Raw identifier because `return` is a Rust keyword; the serialized key is `return`.
    r#return: Option<String>,
}
165
/// Input for acquiring an engine lease; every field is optional.
///
/// NOTE(review): defaults applied when a field is absent are decided by the
/// consuming handler, which is not visible in this file.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    client_id: Option<String>,
    client_type: Option<String>,
    // Presumably the requested lease lifetime in milliseconds (per the `_ms` suffix).
    ttl_ms: Option<u64>,
}
172
/// Input for renewing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    // Required: deserialization fails when `lease_id` is missing.
    lease_id: String,
}
177
/// Input for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    // Required: deserialization fails when `lease_id` is missing.
    lease_id: String,
}
182
/// Optional input for a storage repair request.
///
/// NOTE(review): what `force` overrides is decided by the consuming handler,
/// which is not visible in this file.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    force: Option<bool>,
}
187
/// Optional parameters for listing storage files.
///
/// NOTE(review): `path` presumably selects the location to list and `limit` caps
/// the number of entries returned — confirm against the consuming handler.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    path: Option<String>,
    limit: Option<usize>,
}
193
/// Partial update for a session; each field is optional.
///
/// NOTE(review): presumably a `None` field leaves the stored value unchanged —
/// confirm against the consuming handler.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    title: Option<String>,
    archived: Option<bool>,
}
199
/// Input for attaching a session to a workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    // Required: deserialization fails when `target_workspace` is missing.
    target_workspace: String,
    // Optional tag; its allowed values are not defined in this file.
    reason_tag: Option<String>,
}
205
/// Input for a workspace override request.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    // Presumably the override lifetime in seconds (per the `_seconds` suffix);
    // the default used when absent is decided by the consuming handler.
    ttl_seconds: Option<u64>,
}
210
/// Optional parameters describing a git worktree operation.
///
/// Every field is optional and the struct derives `Default`.
/// NOTE(review): which fields each worktree endpoint actually requires is decided
/// by the consuming handlers, which are not visible in this file; the groupings
/// below are inferred from field names.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    // Presumably the repository and worktree locations.
    repo_root: Option<String>,
    path: Option<String>,
    // Presumably the branch to use and the ref it is based on.
    branch: Option<String>,
    base: Option<String>,
    // Presumably ownership/bookkeeping identifiers.
    task_id: Option<String>,
    owner_run_id: Option<String>,
    lease_id: Option<String>,
    // Presumably behavior flags for managed worktrees and branch cleanup.
    managed: Option<bool>,
    cleanup_branch: Option<bool>,
}
223
/// Optional filters for listing worktrees.
///
/// NOTE(review): field meanings are inferred from names — confirm against the
/// consuming handler.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    repo_root: Option<String>,
    managed_only: Option<bool>,
}
229
/// Optional fields of a log entry submitted by a client.
///
/// NOTE(review): accepted `level` values are decided by the consuming handler,
/// which is not visible in this file.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    level: Option<String>,
    message: Option<String>,
    // Arbitrary JSON; no schema is imposed at this layer.
    context: Option<Value>,
}
236
/// Serializable error payload with a message and an optional code.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    error: String,
    // Omitted from the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
243
244pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
245 let reaper_state = state.clone();
246 let session_part_persister_state = state.clone();
247 let session_context_run_journaler_state = state.clone();
248 let status_indexer_state = state.clone();
249 let routine_scheduler_state = state.clone();
250 let routine_executor_state = state.clone();
251 let usage_aggregator_state = state.clone();
252 let automation_v2_scheduler_state = state.clone();
253 let automation_v2_executor_state = state.clone();
254 let optimization_scheduler_state = state.clone();
255 let workflow_dispatcher_state = state.clone();
256 let agent_team_supervisor_state = state.clone();
257 let global_memory_ingestor_state = state.clone();
258 let bug_monitor_state = state.clone();
259 let mcp_bootstrap_state = state.clone();
260 tokio::spawn(async move {
261 bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
262 });
263 let app = app_router(state.clone());
264 let reaper = tokio::spawn(async move {
265 loop {
266 tokio::time::sleep(Duration::from_secs(5)).await;
267 let stale = reaper_state
268 .run_registry
269 .reap_stale(reaper_state.run_stale_ms)
270 .await;
271 for (session_id, run) in stale {
272 let _ = reaper_state.cancellations.cancel(&session_id).await;
273 let _ = reaper_state
274 .close_browser_sessions_for_owner(&session_id)
275 .await;
276 reaper_state.event_bus.publish(EngineEvent::new(
277 "session.run.finished",
278 json!({
279 "sessionID": session_id,
280 "runID": run.run_id,
281 "finishedAtMs": crate::now_ms(),
282 "status": "timeout",
283 }),
284 ));
285 }
286 }
287 });
288 let session_part_persister = tokio::spawn(crate::run_session_part_persister(
289 session_part_persister_state,
290 ));
291 let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
292 session_context_run_journaler_state,
293 ));
294 let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
295 let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
296 let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
297 let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
298 let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
299 automation_v2_scheduler_state,
300 ));
301 let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
302 automation_v2_executor_state,
303 ));
304 let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
305 optimization_scheduler_state,
306 ));
307 let workflow_dispatcher =
308 tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
309 let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
310 agent_team_supervisor_state,
311 ));
312 let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
313 let global_memory_ingestor =
314 tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
315 let shutdown_state = state.clone();
316 let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();
317
318 let hygiene_task = tokio::spawn(async move {
322 tokio::time::sleep(Duration::from_secs(60)).await;
324 loop {
325 let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
326 .ok()
327 .and_then(|v| v.parse().ok())
328 .unwrap_or(30);
329 if retention_days > 0 {
330 match tandem_core::resolve_shared_paths() {
331 Ok(paths) => {
332 match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
333 Ok(db) => {
334 if let Err(e) = db.run_hygiene(retention_days).await {
335 tracing::warn!("memory hygiene failed: {}", e);
336 }
337 }
338 Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
339 }
340 }
341 Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
342 }
343 }
344 tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
345 }
346 });
347
348 let channel_listener_set = match tandem_channels::config::ChannelsConfig::from_env() {
352 Ok(config) => {
353 tracing::info!("tandem-channels: starting configured channel listeners");
354 let set = start_channel_listeners(config).await;
355 Some(set)
356 }
357 Err(e) => {
358 tracing::info!("tandem-channels: no channels configured ({})", e);
359 None
360 }
361 };
362
363 let listener = tokio::net::TcpListener::bind(addr).await?;
364 let result = axum::serve(listener, app)
365 .with_graceful_shutdown(async move {
366 if tokio::signal::ctrl_c().await.is_err() {
367 futures::future::pending::<()>().await;
368 }
369 shutdown_state.set_automation_scheduler_stopping(true);
370 tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
371 let failed = shutdown_state
372 .fail_running_automation_runs_for_shutdown()
373 .await;
374 if failed > 0 {
375 tracing::warn!(
376 failed_runs = failed,
377 "automation runs marked failed during scheduler shutdown"
378 );
379 }
380 })
381 .await;
382 reaper.abort();
383 session_part_persister.abort();
384 session_context_run_journaler.abort();
385 status_indexer.abort();
386 routine_scheduler.abort();
387 routine_executor.abort();
388 usage_aggregator.abort();
389 automation_v2_scheduler.abort();
390 automation_v2_executor.abort();
391 optimization_scheduler.abort();
392 workflow_dispatcher.abort();
393 agent_team_supervisor.abort();
394 bug_monitor.abort();
395 global_memory_ingestor.abort();
396 hygiene_task.abort();
397 if let Some(mut set) = channel_listener_set {
398 set.abort_all();
399 }
400 result?;
401 Ok(())
402}
403
/// Builds the application's `Router` for `state` by delegating to `router::build_router`.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
407fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
408 let content = match std::fs::read_to_string(path) {
409 Ok(value) => value,
410 Err(_) => return Vec::new(),
411 };
412 let mut rows: Vec<Value> = content
413 .lines()
414 .filter_map(|line| serde_json::from_str::<Value>(line).ok())
415 .filter(|row| {
416 if let Some(since) = since_seq {
417 return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
418 }
419 true
420 })
421 .collect();
422 rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
423 if let Some(tail_count) = tail {
424 if rows.len() > tail_count {
425 rows = rows.split_off(rows.len().saturating_sub(tail_count));
426 }
427 }
428 rows
429}
430
431pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
432 if input.len() <= max_len {
433 return input.to_string();
434 }
435 let mut out = input[..max_len].to_string();
436 out.push_str("...<truncated>");
437 out
438}
439
440#[cfg(test)]
441mod tests;