Skip to main content

tandem_server/
http.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::{Path, Query, State};
10use axum::http::header::{self, HeaderValue};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::sse::{Event, KeepAlive, Sse};
13use axum::response::IntoResponse;
14use axum::response::Response;
15use axum::{Json, Router};
16use futures::Stream;
17use regex::Regex;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Value};
20use sha2::{Digest, Sha256};
21use tandem_memory::types::GlobalMemoryRecord;
22use tandem_memory::{
23    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
24    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
25    MemorySearchResponse, ScrubReport, ScrubStatus,
26};
27use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::StreamExt;
30use uuid::Uuid;
31
32use tandem_channels::start_channel_listeners;
33use tandem_tools::Tool;
34use tandem_types::{
35    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36    SendMessageRequest, Session, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42    capability_resolver::{
43        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45        CapabilityResolveInput,
46    },
47    mcp_catalog,
48    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49    ActiveRun, AppState, ChannelStatus, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57mod config_providers;
58mod context_run_ledger;
59mod context_run_mutation_checkpoints;
60pub(crate) mod context_runs;
61pub(crate) mod context_types;
62mod external_actions;
63mod global;
64pub(crate) mod mcp;
65mod middleware;
66mod mission_builder;
67mod mission_builder_runtime;
68mod missions_teams;
69mod optimizations;
70mod pack_builder;
71mod packs;
72mod permissions_questions;
73mod presets;
74mod resources;
75mod router;
76mod routes_bug_monitor;
77mod routes_capabilities;
78mod routes_coder;
79mod routes_config_providers;
80mod routes_context;
81mod routes_external_actions;
82mod routes_global;
83mod routes_mcp;
84mod routes_mission_builder;
85mod routes_missions_teams;
86mod routes_optimizations;
87mod routes_pack_builder;
88mod routes_packs;
89mod routes_permissions_questions;
90mod routes_presets;
91mod routes_resources;
92mod routes_routines_automations;
93mod routes_sessions;
94mod routes_setup_understanding;
95mod routes_skills_memory;
96mod routes_system_api;
97mod routes_task_intake;
98mod routes_workflow_planner;
99mod routes_workflows;
100pub(crate) mod routines_automations;
101mod sessions;
102mod setup_understanding;
103mod skills_memory;
104mod system_api;
105mod task_intake;
106mod workflow_planner;
107mod workflow_planner_host;
108mod workflow_planner_policy;
109pub(crate) mod workflow_planner_runtime;
110mod workflow_planner_transport;
111mod workflows;
112
113use capabilities::*;
114use context_run_ledger::*;
115use context_run_mutation_checkpoints::*;
116use context_runs::*;
117use context_types::*;
118use mcp::*;
119use pack_builder::*;
120use packs::*;
121use permissions_questions::*;
122use presets::*;
123use resources::*;
124use sessions::*;
125use setup_understanding::*;
126use skills_memory::*;
127use system_api::*;
128
129#[cfg(test)]
130pub(crate) use context_runs::session_context_run_id;
131pub(crate) use context_runs::sync_workflow_run_blackboard;
132#[cfg(test)]
133pub(crate) use context_runs::workflow_context_run_id;
134pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
135
/// Optional parameters deserialized for a session-listing request.
///
/// NOTE(review): the handler that consumes this struct is not visible in this
/// part of the file; the per-field notes are inferred from names — confirm
/// against the handler.
136#[derive(Debug, Deserialize)]
137struct ListSessionsQuery {
    /// Presumably a free-text search term.
138    q: Option<String>,
    /// Presumably the page index; whether it is 0- or 1-based is not shown here.
139    page: Option<usize>,
    /// Presumably the number of sessions per page.
140    page_size: Option<usize>,
    /// Presumably filters on the session's archived flag.
141    archived: Option<bool>,
    /// Scope selector; `SessionScope` is declared outside this view.
142    scope: Option<SessionScope>,
    /// Presumably restricts the listing to a single workspace.
143    workspace: Option<String>,
144}
145
/// Optional session/run identifiers deserialized from an event request.
///
/// Both fields use camel-case wire names via `serde(rename)`. How the
/// identifiers are applied as a filter is decided by the consumer, which is
/// not visible here.
146#[derive(Debug, Deserialize, Default)]
147struct EventFilterQuery {
    /// Read from the `sessionID` key.
148    #[serde(rename = "sessionID")]
149    session_id: Option<String>,
    /// Read from the `runID` key.
150    #[serde(rename = "runID")]
151    run_id: Option<String>,
152}
153
/// Optional windowing parameters for reading run events.
///
/// The field names and types match the `since_seq` / `tail` parameters of
/// `load_run_events_jsonl` in this file — presumably they are forwarded to it;
/// confirm at the call site.
154#[derive(Debug, Deserialize, Default, Clone, Copy)]
155struct RunEventsQuery {
    /// Presumably a lower bound on the event sequence number.
156    since_seq: Option<u64>,
    /// Presumably the maximum number of trailing events to return.
157    tail: Option<usize>,
158}
159
/// Optional parameters deserialized for an asynchronous prompt request.
///
/// NOTE(review): the accepted values of `return` are defined by the handler,
/// which is not visible here.
160#[derive(Debug, Deserialize, Default)]
161struct PromptAsyncQuery {
    /// Written as a raw identifier because `return` is a Rust keyword.
162    r#return: Option<String>,
163}
164
/// Request body for acquiring an engine lease; every field is optional.
///
/// NOTE(review): defaults applied when a field is absent are chosen by the
/// handler, which is not visible here.
165#[derive(Debug, Deserialize)]
166struct EngineLeaseAcquireInput {
    /// Presumably identifies the client requesting the lease.
167    client_id: Option<String>,
    /// Presumably a label for the kind of client.
168    client_type: Option<String>,
    /// Presumably the requested lease lifetime in milliseconds (per the `_ms` suffix).
169    ttl_ms: Option<u64>,
170}
171
/// Request body for renewing an engine lease.
172#[derive(Debug, Deserialize)]
173struct EngineLeaseRenewInput {
    /// Identifier of the lease to renew (required).
174    lease_id: String,
175}
176
/// Request body for releasing an engine lease.
177#[derive(Debug, Deserialize)]
178struct EngineLeaseReleaseInput {
    /// Identifier of the lease to release (required).
179    lease_id: String,
180}
181
/// Request body for a storage repair operation.
///
/// NOTE(review): what `force` overrides is decided by the handler, which is
/// not visible here.
182#[derive(Debug, Deserialize, Default)]
183struct StorageRepairInput {
    /// Optional force flag.
184    force: Option<bool>,
185}
186
/// Optional parameters deserialized for a storage file listing request.
///
/// NOTE(review): field semantics are inferred from names — confirm against the
/// handler.
187#[derive(Debug, Deserialize, Default)]
188struct StorageFilesQuery {
    /// Presumably the path to list.
189    path: Option<String>,
    /// Presumably the maximum number of entries to return.
190    limit: Option<usize>,
191}
192
/// Request body for updating a session; both fields are optional.
///
/// NOTE(review): presumably an absent field leaves the current value
/// untouched — confirm against the handler.
193#[derive(Debug, Deserialize, Default)]
194struct UpdateSessionInput {
    /// New session title, when provided.
195    title: Option<String>,
    /// New archived flag, when provided.
196    archived: Option<bool>,
197}
198
/// Request body for attaching a session to a workspace.
199#[derive(Debug, Deserialize)]
200struct AttachSessionInput {
    /// Workspace to attach to (required).
201    target_workspace: String,
    /// Optional tag; presumably records why the attach happened — confirm
    /// against the handler.
202    reason_tag: Option<String>,
203}
204
/// Request body for a workspace override.
205#[derive(Debug, Deserialize)]
206struct WorkspaceOverrideInput {
    /// Presumably the override lifetime in seconds (per the `_seconds`
    /// suffix); the default when absent is not visible here.
207    ttl_seconds: Option<u64>,
208}
209
/// Request body shared by worktree operations; every field is optional.
///
/// NOTE(review): which fields each operation actually reads is decided by the
/// handlers, which are not visible here. Field notes are inferred from names.
210#[derive(Debug, Deserialize, Default)]
211struct WorktreeInput {
    /// Presumably the root of the repository that owns the worktree.
212    repo_root: Option<String>,
    /// Presumably the worktree's filesystem path.
213    path: Option<String>,
    /// Presumably the branch checked out in the worktree.
214    branch: Option<String>,
    /// Presumably the base ref the branch is created from.
215    base: Option<String>,
    /// Presumably the task associated with the worktree.
216    task_id: Option<String>,
    /// Presumably the run that owns the worktree.
217    owner_run_id: Option<String>,
    /// Presumably a lease tied to the worktree.
218    lease_id: Option<String>,
    /// Presumably marks the worktree as engine-managed.
219    managed: Option<bool>,
    /// Presumably requests branch removal during cleanup.
220    cleanup_branch: Option<bool>,
221}
222
/// Optional parameters deserialized for a worktree listing request.
///
/// NOTE(review): field semantics are inferred from names — confirm against the
/// handler.
223#[derive(Debug, Deserialize, Default)]
224struct WorktreeListQuery {
    /// Presumably the repository whose worktrees are listed.
225    repo_root: Option<String>,
    /// Presumably restricts the listing to managed worktrees.
226    managed_only: Option<bool>,
227}
228
/// Request body for a client-submitted log entry; every field is optional.
///
/// NOTE(review): accepted `level` values and defaults are decided by the
/// handler, which is not visible here.
229#[derive(Debug, Deserialize, Default)]
230struct LogInput {
    /// Log level as a string.
231    level: Option<String>,
    /// Log message text.
232    message: Option<String>,
    /// Arbitrary JSON value attached to the entry.
233    context: Option<Value>,
234}
235
/// Serializable error payload consisting of a message and an optional code.
236#[derive(Debug, Serialize)]
237struct ErrorEnvelope {
    /// Error message.
238    error: String,
    /// Optional error code; left out of the serialized output when `None`.
239    #[serde(skip_serializing_if = "Option::is_none")]
240    code: Option<String>,
241}
242
243pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
244    let reaper_state = state.clone();
245    let session_part_persister_state = state.clone();
246    let session_context_run_journaler_state = state.clone();
247    let status_indexer_state = state.clone();
248    let routine_scheduler_state = state.clone();
249    let routine_executor_state = state.clone();
250    let usage_aggregator_state = state.clone();
251    let automation_v2_scheduler_state = state.clone();
252    let automation_v2_executor_state = state.clone();
253    let optimization_scheduler_state = state.clone();
254    let workflow_dispatcher_state = state.clone();
255    let agent_team_supervisor_state = state.clone();
256    let global_memory_ingestor_state = state.clone();
257    let bug_monitor_state = state.clone();
258    let mcp_bootstrap_state = state.clone();
259    tokio::spawn(async move {
260        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
261    });
262    let app = app_router(state.clone());
263    let reaper = tokio::spawn(async move {
264        loop {
265            tokio::time::sleep(Duration::from_secs(5)).await;
266            let stale = reaper_state
267                .run_registry
268                .reap_stale(reaper_state.run_stale_ms)
269                .await;
270            for (session_id, run) in stale {
271                let _ = reaper_state.cancellations.cancel(&session_id).await;
272                let _ = reaper_state
273                    .close_browser_sessions_for_owner(&session_id)
274                    .await;
275                reaper_state.event_bus.publish(EngineEvent::new(
276                    "session.run.finished",
277                    json!({
278                        "sessionID": session_id,
279                        "runID": run.run_id,
280                        "finishedAtMs": crate::now_ms(),
281                        "status": "timeout",
282                    }),
283                ));
284            }
285        }
286    });
287    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
288        session_part_persister_state,
289    ));
290    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
291        session_context_run_journaler_state,
292    ));
293    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
294    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
295    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
296    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
297    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
298        automation_v2_scheduler_state,
299    ));
300    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
301        automation_v2_executor_state,
302    ));
303    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
304        optimization_scheduler_state,
305    ));
306    let workflow_dispatcher =
307        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
308    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
309        agent_team_supervisor_state,
310    ));
311    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
312    let global_memory_ingestor =
313        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
314    let shutdown_state = state.clone();
315    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();
316
317    // --- Memory hygiene background task (runs every 12 hours) ---
318    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
319    // mode allows concurrent readers alongside the main engine connection.
320    let hygiene_task = tokio::spawn(async move {
321        // Initial delay so startup is not impacted.
322        tokio::time::sleep(Duration::from_secs(60)).await;
323        loop {
324            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
325                .ok()
326                .and_then(|v| v.parse().ok())
327                .unwrap_or(30);
328            if retention_days > 0 {
329                match tandem_core::resolve_shared_paths() {
330                    Ok(paths) => {
331                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
332                            Ok(db) => {
333                                if let Err(e) = db.run_hygiene(retention_days).await {
334                                    tracing::warn!("memory hygiene failed: {}", e);
335                                }
336                            }
337                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
338                        }
339                    }
340                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
341                }
342            }
343            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
344        }
345    });
346
347    // --- Channel listeners (optional) ---
348    // Reads TANDEM_TELEGRAM_BOT_TOKEN, TANDEM_DISCORD_BOT_TOKEN, TANDEM_SLACK_BOT_TOKEN etc.
349    // If no channels are configured the server starts normally without them.
350    let channel_listener_set = match tandem_channels::config::ChannelsConfig::from_env() {
351        Ok(config) => {
352            tracing::info!("tandem-channels: starting configured channel listeners");
353            let set = start_channel_listeners(config).await;
354            Some(set)
355        }
356        Err(e) => {
357            tracing::info!("tandem-channels: no channels configured ({})", e);
358            None
359        }
360    };
361
362    let listener = tokio::net::TcpListener::bind(addr).await?;
363    let result = axum::serve(listener, app)
364        .with_graceful_shutdown(async move {
365            if tokio::signal::ctrl_c().await.is_err() {
366                futures::future::pending::<()>().await;
367            }
368            shutdown_state.set_automation_scheduler_stopping(true);
369            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
370            let failed = shutdown_state
371                .fail_running_automation_runs_for_shutdown()
372                .await;
373            if failed > 0 {
374                tracing::warn!(
375                    failed_runs = failed,
376                    "automation runs marked failed during scheduler shutdown"
377                );
378            }
379        })
380        .await;
381    reaper.abort();
382    session_part_persister.abort();
383    session_context_run_journaler.abort();
384    status_indexer.abort();
385    routine_scheduler.abort();
386    routine_executor.abort();
387    usage_aggregator.abort();
388    automation_v2_scheduler.abort();
389    automation_v2_executor.abort();
390    optimization_scheduler.abort();
391    workflow_dispatcher.abort();
392    agent_team_supervisor.abort();
393    bug_monitor.abort();
394    global_memory_ingestor.abort();
395    hygiene_task.abort();
396    if let Some(mut set) = channel_listener_set {
397        set.abort_all();
398    }
399    result?;
400    Ok(())
401}
402
/// Builds the application [`Router`] for `state` by delegating to
/// `router::build_router`.
403fn app_router(state: AppState) -> Router {
404    router::build_router(state)
405}
406fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
407    let content = match std::fs::read_to_string(path) {
408        Ok(value) => value,
409        Err(_) => return Vec::new(),
410    };
411    let mut rows: Vec<Value> = content
412        .lines()
413        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
414        .filter(|row| {
415            if let Some(since) = since_seq {
416                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
417            }
418            true
419        })
420        .collect();
421    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
422    if let Some(tail_count) = tail {
423        if rows.len() > tail_count {
424            rows = rows.split_off(rows.len().saturating_sub(tail_count));
425        }
426    }
427    rows
428}
429
430pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
431    if input.len() <= max_len {
432        return input.to_string();
433    }
434    let mut out = input[..max_len].to_string();
435    out.push_str("...<truncated>");
436    out
437}
438
439#[cfg(test)]
440mod tests;