1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::{Path, Query, State};
10use axum::http::header::{self, HeaderValue};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::sse::{Event, KeepAlive, Sse};
13use axum::response::IntoResponse;
14use axum::response::Response;
15use axum::{Json, Router};
16use futures::Stream;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use sha2::{Digest, Sha256};
21use tandem_memory::types::GlobalMemoryRecord;
22use tandem_memory::{
23 db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
24 MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
25 MemorySearchResponse, ScrubReport, ScrubStatus,
26};
27use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::StreamExt;
30use uuid::Uuid;
31
32use tandem_channels::start_channel_listeners;
33use tandem_tools::Tool;
34use tandem_types::{
35 CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36 SendMessageRequest, Session, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42 capability_resolver::{
43 classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44 CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45 CapabilityResolveInput,
46 },
47 mcp_catalog,
48 pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49 ActiveRun, AppState, ChannelStatus, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
// Server submodules, declared alphabetically. `pub(crate)` modules are
// referenced from elsewhere in the crate; the rest are internal to the
// HTTP server layer.
mod automation_projection_runtime;
pub(crate) mod bug_monitor;
mod capabilities;
mod channels_api;
mod coder;
mod config_providers;
mod context_run_ledger;
mod context_run_mutation_checkpoints;
pub(crate) mod context_runs;
pub(crate) mod context_types;
mod external_actions;
mod global;
pub(crate) mod mcp;
mod middleware;
mod mission_builder;
mod mission_builder_runtime;
mod missions_teams;
mod optimizations;
mod pack_builder;
mod packs;
mod permissions_questions;
mod presets;
mod resources;
mod router;
// `routes_*` modules pair by name with the implementation modules
// above (e.g. `routes_coder` ↔ `coder`) — presumably HTTP route
// wiring assembled by `router::build_router`.
mod routes_bug_monitor;
mod routes_capabilities;
mod routes_coder;
mod routes_config_providers;
mod routes_context;
mod routes_external_actions;
mod routes_global;
mod routes_mcp;
mod routes_mission_builder;
mod routes_missions_teams;
mod routes_optimizations;
mod routes_pack_builder;
mod routes_packs;
mod routes_permissions_questions;
mod routes_presets;
mod routes_resources;
mod routes_routines_automations;
mod routes_sessions;
mod routes_setup_understanding;
mod routes_skills_memory;
mod routes_system_api;
mod routes_task_intake;
mod routes_workflow_planner;
mod routes_workflows;
pub(crate) mod routines_automations;
mod sessions;
mod setup_understanding;
mod skills_memory;
mod system_api;
mod task_intake;
mod workflow_planner;
mod workflow_planner_host;
mod workflow_planner_policy;
pub(crate) mod workflow_planner_runtime;
mod workflow_planner_transport;
mod workflows;
112
113use capabilities::*;
114use context_run_ledger::*;
115use context_run_mutation_checkpoints::*;
116use context_runs::*;
117use context_types::*;
118use mcp::*;
119use pack_builder::*;
120use packs::*;
121use permissions_questions::*;
122use presets::*;
123use resources::*;
124use sessions::*;
125use setup_understanding::*;
126use skills_memory::*;
127use system_api::*;
128
129#[cfg(test)]
130pub(crate) use context_runs::session_context_run_id;
131pub(crate) use context_runs::sync_workflow_run_blackboard;
132#[cfg(test)]
133pub(crate) use context_runs::workflow_context_run_id;
134pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
135
/// Query-string parameters accepted when listing sessions.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    /// Free-text search term.
    q: Option<String>,
    /// Page index for pagination.
    page: Option<usize>,
    /// Number of results per page.
    page_size: Option<usize>,
    /// Filter by archived state.
    archived: Option<bool>,
    /// Restrict results to a session scope (see `SessionScope`).
    scope: Option<SessionScope>,
    /// Restrict results to a workspace.
    workspace: Option<String>,
}
145
/// Query-string filter for event delivery. Wire names are camelCase
/// (`sessionID`, `runID`) and are mapped onto snake_case fields.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    /// Only deliver events for this session.
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    /// Only deliver events for this run.
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
153
/// Query-string parameters for reading persisted run events
/// (parameter names mirror `load_run_events_jsonl`).
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    /// Only return events whose `seq` is strictly greater than this.
    since_seq: Option<u64>,
    /// Only return the last N events.
    tail: Option<usize>,
}
159
/// Query-string parameters for asynchronous prompt submission.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    // Raw identifier: the wire parameter is literally named "return",
    // which is a Rust keyword.
    r#return: Option<String>,
}
164
/// Request body for acquiring an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    /// Identifier of the requesting client.
    client_id: Option<String>,
    /// Kind of client making the request.
    client_type: Option<String>,
    /// Requested lease time-to-live, in milliseconds.
    ttl_ms: Option<u64>,
}
171
/// Request body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    /// Lease to renew; required, unlike the optional acquire fields.
    lease_id: String,
}
176
/// Request body for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    /// Lease to release.
    lease_id: String,
}
181
/// Request body for triggering a storage repair.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    /// Force the repair — exact semantics defined by the handler.
    force: Option<bool>,
}
186
/// Query-string parameters for listing storage files.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    /// Path to list under.
    path: Option<String>,
    /// Maximum number of entries to return.
    limit: Option<usize>,
}
192
/// Request body for updating a session. Fields are optional —
/// presumably patch semantics (only provided fields change); confirm
/// against the handler.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    /// New session title.
    title: Option<String>,
    /// New archived state.
    archived: Option<bool>,
}
198
/// Request body for attaching a session to another workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    /// Workspace to attach the session to (required).
    target_workspace: String,
    /// Optional tag recording the reason for the attachment.
    reason_tag: Option<String>,
}
204
/// Request body for a workspace override.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    /// How long the override lasts, in seconds.
    ttl_seconds: Option<u64>,
}
209
/// Request body for git-worktree operations. All fields are optional;
/// which ones a given endpoint requires is decided by its handler.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    /// Root of the repository the worktree belongs to.
    repo_root: Option<String>,
    /// Filesystem path of the worktree.
    path: Option<String>,
    /// Branch checked out in the worktree.
    branch: Option<String>,
    /// Base ref the branch is created from.
    base: Option<String>,
    /// Task associated with the worktree.
    task_id: Option<String>,
    /// Run that owns the worktree.
    owner_run_id: Option<String>,
    /// Lease guarding the worktree.
    lease_id: Option<String>,
    /// Whether the worktree is engine-managed.
    managed: Option<bool>,
    /// Whether to delete the branch during cleanup.
    cleanup_branch: Option<bool>,
}
222
/// Query-string parameters for listing worktrees.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    /// Restrict the listing to one repository root.
    repo_root: Option<String>,
    /// Only include engine-managed worktrees.
    managed_only: Option<bool>,
}
228
/// Request body for submitting a log entry.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    /// Log severity (free-form string on the wire).
    level: Option<String>,
    /// Log message text.
    message: Option<String>,
    /// Arbitrary structured context attached to the entry.
    context: Option<Value>,
}
235
/// Standard JSON error response body.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    /// Human-readable error description.
    error: String,
    /// Optional machine-readable error code; omitted from the JSON
    /// payload entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
242
/// Bind `addr` and run the HTTP API until shutdown, alongside the
/// engine's background tasks.
///
/// Spawns: MCP bootstrap, a stale-run reaper, the session
/// persister/journaler, status indexer, routine/automation/optimization
/// schedulers and executors, the workflow dispatcher, agent-team
/// supervisor, bug monitor, global-memory ingestor, a periodic memory
/// hygiene task, and any configured channel listeners. After the
/// server's graceful shutdown completes, every spawned task is aborted.
///
/// # Errors
/// Returns an error if the TCP listener cannot bind, or if the server
/// loop itself fails.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // Each background task gets its own clone of the shared state.
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // Fire-and-forget: MCP servers are bootstrapped once readiness allows.
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Reaper: every 5s, reap runs stale for longer than `run_stale_ms`,
    // cancel them, close their browser sessions, and publish a
    // `session.run.finished` event with status "timeout".
    let reaper = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Best-effort cleanup; failures are intentionally ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    // Long-running background loops, one task each.
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // Memory hygiene: after a 60s warm-up, prune memory records older
    // than TANDEM_MEMORY_RETENTION_DAYS (default 30; 0 disables) once
    // every 12 hours. Env var and DB handle are re-resolved each cycle
    // so configuration changes take effect without a restart.
    let hygiene_task = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // Channel listeners start only when channel config is present in the
    // environment; a config error here just means "nothing configured".
    let channel_listener_set = match tandem_channels::config::ChannelsConfig::from_env() {
        Ok(config) => {
            tracing::info!("tandem-channels: starting configured channel listeners");
            let set = start_channel_listeners(config).await;
            Some(set)
        }
        Err(e) => {
            tracing::info!("tandem-channels: no channels configured ({})", e);
            None
        }
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If ctrl-c registration fails, park forever: shutdown then
            // only happens when the process is killed.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            // Flag schedulers to stop, then give in-flight automation
            // runs a grace period before marking them failed.
            shutdown_state.set_automation_scheduler_stopping(true);
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Server stopped: tear down every background task.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    if let Some(mut set) = channel_listener_set {
        set.abort_all();
    }
    result?;
    Ok(())
}
402
/// Build the engine's axum `Router` from shared state; all route
/// registration lives in the `router` module.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
406fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
407 let content = match std::fs::read_to_string(path) {
408 Ok(value) => value,
409 Err(_) => return Vec::new(),
410 };
411 let mut rows: Vec<Value> = content
412 .lines()
413 .filter_map(|line| serde_json::from_str::<Value>(line).ok())
414 .filter(|row| {
415 if let Some(since) = since_seq {
416 return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
417 }
418 true
419 })
420 .collect();
421 rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
422 if let Some(tail_count) = tail {
423 if rows.len() > tail_count {
424 rows = rows.split_off(rows.len().saturating_sub(tail_count));
425 }
426 }
427 rows
428}
429
430pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
431 if input.len() <= max_len {
432 return input.to_string();
433 }
434 let mut out = input[..max_len].to_string();
435 out.push_str("...<truncated>");
436 out
437}
438
439#[cfg(test)]
440mod tests;