// tandem_server/http.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::{Path, Query, State};
10use axum::http::header::{self, HeaderValue};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::sse::{Event, KeepAlive, Sse};
13use axum::response::IntoResponse;
14use axum::response::Response;
15use axum::{Json, Router};
16use futures::Stream;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use sha2::{Digest, Sha256};
21use tandem_memory::types::GlobalMemoryRecord;
22use tandem_memory::{
23    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
24    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
25    MemorySearchResponse, ScrubReport, ScrubStatus,
26};
27use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::StreamExt;
30use uuid::Uuid;
31
32use tandem_channels::start_channel_listeners;
33use tandem_tools::Tool;
34use tandem_types::{
35    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36    SendMessageRequest, Session, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42    capability_resolver::{
43        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45        CapabilityResolveInput,
46    },
47    mcp_catalog,
48    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49    ActiveRun, AppState, ChannelStatus, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57mod config_providers;
58mod context_run_ledger;
59mod context_run_mutation_checkpoints;
60pub(crate) mod context_runs;
61pub(crate) mod context_types;
62mod external_actions;
63mod global;
64pub(crate) mod mcp;
65mod middleware;
66mod mission_builder;
67mod mission_builder_host;
68mod mission_builder_runtime;
69mod missions_teams;
70mod optimizations;
71mod pack_builder;
72mod packs;
73mod permissions_questions;
74mod presets;
75mod resources;
76mod router;
77mod routes_bug_monitor;
78mod routes_capabilities;
79mod routes_coder;
80mod routes_config_providers;
81mod routes_context;
82mod routes_external_actions;
83mod routes_global;
84mod routes_mcp;
85mod routes_mission_builder;
86mod routes_missions_teams;
87mod routes_optimizations;
88mod routes_pack_builder;
89mod routes_packs;
90mod routes_permissions_questions;
91mod routes_presets;
92mod routes_resources;
93mod routes_routines_automations;
94mod routes_sessions;
95mod routes_setup_understanding;
96mod routes_skills_memory;
97mod routes_system_api;
98mod routes_task_intake;
99mod routes_workflow_planner;
100mod routes_workflows;
101pub(crate) mod routines_automations;
102mod sessions;
103mod setup_understanding;
104mod skills_memory;
105mod system_api;
106mod task_intake;
107pub(crate) mod workflow_planner;
108mod workflow_planner_host;
109mod workflow_planner_policy;
110pub(crate) mod workflow_planner_runtime;
111mod workflow_planner_transport;
112mod workflows;
113
114use capabilities::*;
115use context_run_ledger::*;
116use context_run_mutation_checkpoints::*;
117use context_runs::*;
118use context_types::*;
119use mcp::*;
120use pack_builder::*;
121use packs::*;
122use permissions_questions::*;
123use presets::*;
124use resources::*;
125use sessions::*;
126use setup_understanding::*;
127use skills_memory::*;
128use system_api::*;
129
130#[cfg(test)]
131pub(crate) use context_runs::session_context_run_id;
132pub(crate) use context_runs::sync_workflow_run_blackboard;
133#[cfg(test)]
134pub(crate) use context_runs::workflow_context_run_id;
135pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
136
/// Query parameters for the session-list endpoint: optional free-text
/// search, pagination, archive filtering, and scope/workspace narrowing.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    // Free-text search term.
    q: Option<String>,
    // Pagination — NOTE(review): page base (0 or 1) and default page_size
    // are decided by the handler; confirm there.
    page: Option<usize>,
    page_size: Option<usize>,
    // When set, filter sessions by archived state.
    archived: Option<bool>,
    scope: Option<SessionScope>,
    workspace: Option<String>,
}
146
/// Filter for the event stream: narrow events to one session and/or one
/// run. Wire names are camelCase (`sessionID`, `runID`).
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
154
/// Pagination for persisted run events: `since_seq` is an exclusive lower
/// bound on the event `seq`, `tail` keeps only the last N events.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    since_seq: Option<u64>,
    tail: Option<usize>,
}
160
/// Async-prompt query; the raw identifier `r#return` maps the reserved
/// keyword onto a query parameter literally named `return`.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    r#return: Option<String>,
}
165
/// Request body for acquiring an engine lease; all fields optional so
/// the handler can apply defaults.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    client_id: Option<String>,
    client_type: Option<String>,
    // Requested lease lifetime in milliseconds.
    ttl_ms: Option<u64>,
}
172
/// Request body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    lease_id: String,
}
177
/// Request body for releasing an engine lease before it expires.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    lease_id: String,
}
182
/// Request body for the storage-repair endpoint; `force` presumably skips
/// safety checks — confirm against the handler.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    force: Option<bool>,
}
187
/// Query for listing storage files: optional path prefix and result cap.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    path: Option<String>,
    limit: Option<usize>,
}
193
/// Partial session update: only the fields present in the request body
/// are changed.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    title: Option<String>,
    archived: Option<bool>,
}
199
/// Request to attach a session to another workspace, with an optional
/// audit reason tag.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    target_workspace: String,
    reason_tag: Option<String>,
}
205
/// Request body for a workspace override; `ttl_seconds` bounds how long
/// the override stays in effect.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    ttl_seconds: Option<u64>,
}
210
/// Request body shared by the git-worktree endpoints (create/remove).
/// Every field is optional; which ones are required is enforced per
/// handler — NOTE(review): confirm required combinations at the call
/// sites.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    repo_root: Option<String>,
    path: Option<String>,
    branch: Option<String>,
    // Base ref the new branch/worktree starts from.
    base: Option<String>,
    task_id: Option<String>,
    owner_run_id: Option<String>,
    lease_id: Option<String>,
    // Whether the server manages (and may clean up) this worktree.
    managed: Option<bool>,
    // Whether removal should also delete the associated branch.
    cleanup_branch: Option<bool>,
}
223
/// Query for listing worktrees, optionally scoped to one repository and
/// to server-managed worktrees only.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    repo_root: Option<String>,
    managed_only: Option<bool>,
}
229
/// Client-submitted log entry: severity, message, and free-form JSON
/// context.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    level: Option<String>,
    message: Option<String>,
    context: Option<Value>,
}
236
/// JSON error response body; `code` is omitted from the payload entirely
/// when absent rather than serialized as null.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    error: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
243
/// Binds `addr`, spawns every background worker, and serves the HTTP API
/// until Ctrl-C.
///
/// On shutdown: the automation scheduler is flagged as stopping, in-flight
/// automation runs get `shutdown_timeout_secs` to finish before being
/// marked failed, then all spawned worker tasks are aborted and the serve
/// result is propagated.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // One handle per background task. NOTE(review): presumably AppState is
    // a cheap Arc-style clone — confirm before adding more workers.
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // MCP bootstrap is fire-and-forget: its handle is not kept, so it is
    // never aborted during shutdown below.
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Stale-run reaper: every 5s, reap runs idle longer than `run_stale_ms`,
    // cancel them, close their browser sessions, and publish a "timeout"
    // finish event for each.
    let reaper = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Cancellation and browser cleanup are best-effort; errors
                // are deliberately ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    // Long-lived workers; each owns its own state handle and is aborted
    // once the HTTP server returns.
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // --- Memory hygiene background task (runs every 12 hours) ---
    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
    // mode allows concurrent readers alongside the main engine connection.
    let hygiene_task = tokio::spawn(async move {
        // Initial delay so startup is not impacted.
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            // Re-read the env var each cycle so retention can be changed
            // without a restart; 0 disables hygiene, default is 30 days.
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // --- Channel listeners (optional) ---
    // Reads TANDEM_TELEGRAM_BOT_TOKEN, TANDEM_DISCORD_BOT_TOKEN, TANDEM_SLACK_BOT_TOKEN etc.
    // If no channels are configured the server starts normally without them.
    let channel_listener_set = match tandem_channels::config::ChannelsConfig::from_env() {
        Ok(config) => {
            tracing::info!("tandem-channels: starting configured channel listeners");
            let set = start_channel_listeners(config).await;
            Some(set)
        }
        Err(e) => {
            // "No channels configured" is an expected state, hence info-level.
            tracing::info!("tandem-channels: no channels configured ({})", e);
            None
        }
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If the Ctrl-C handler cannot be installed, never trigger a
            // signal-driven shutdown: park this future forever instead.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            shutdown_state.set_automation_scheduler_stopping(true);
            // Grace period for in-flight automation runs before they are
            // force-failed below. Note this sleep always runs in full,
            // even when no runs are active.
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Server has stopped: tear down every background worker.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    if let Some(mut set) = channel_listener_set {
        set.abort_all();
    }
    // Propagate any bind/serve error only after cleanup has run.
    result?;
    Ok(())
}
403
/// Builds the full axum `Router` for the server; the route table itself
/// lives in the `router` submodule.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
407fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
408    let content = match std::fs::read_to_string(path) {
409        Ok(value) => value,
410        Err(_) => return Vec::new(),
411    };
412    let mut rows: Vec<Value> = content
413        .lines()
414        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
415        .filter(|row| {
416            if let Some(since) = since_seq {
417                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
418            }
419            true
420        })
421        .collect();
422    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
423    if let Some(tail_count) = tail {
424        if rows.len() > tail_count {
425            rows = rows.split_off(rows.len().saturating_sub(tail_count));
426        }
427    }
428    rows
429}
430
431pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
432    if input.len() <= max_len {
433        return input.to_string();
434    }
435    let mut out = input[..max_len].to_string();
436    out.push_str("...<truncated>");
437    out
438}
439
440#[cfg(test)]
441mod tests;