//! tandem_server/http.rs — HTTP surface of the engine server.
1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::Extension;
10use axum::extract::{Path, Query, State};
11use axum::http::header::{self, HeaderValue};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::sse::{Event, KeepAlive, Sse};
14use axum::response::IntoResponse;
15use axum::response::Response;
16use axum::{Json, Router};
17use futures::Stream;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use sha2::{Digest, Sha256};
22use tandem_memory::types::GlobalMemoryRecord;
23use tandem_memory::{
24    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
25    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
26    MemorySearchResponse, ScrubReport, ScrubStatus,
27};
28use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
29use tokio_stream::wrappers::BroadcastStream;
30use tokio_stream::StreamExt;
31use uuid::Uuid;
32
33use tandem_tools::Tool;
34use tandem_types::{
35    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36    SendMessageRequest, Session, TenantContext, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42    capability_resolver::{
43        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45        CapabilityResolveInput,
46    },
47    mcp_catalog,
48    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49    ActiveRun, AppState, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57pub(crate) mod config_providers;
58pub(crate) mod context_packs;
59mod context_run_ledger;
60mod context_run_mutation_checkpoints;
61pub(crate) mod context_runs;
62pub(crate) mod context_types;
63mod enterprise;
64mod external_actions;
65mod global;
66pub(crate) mod governance;
67mod marketplace;
68pub(crate) mod mcp;
69mod middleware;
70mod mission_builder;
71mod mission_builder_host;
72mod mission_builder_runtime;
73mod missions_teams;
74mod optimizations;
75mod pack_builder;
76mod packs;
77mod permissions_questions;
78mod presets;
79mod resources;
80mod router;
81mod routes_bug_monitor;
82mod routes_capabilities;
83mod routes_coder;
84mod routes_config_providers;
85mod routes_context;
86mod routes_external_actions;
87mod routes_global;
88mod routes_governance;
89mod routes_marketplace;
90mod routes_mcp;
91mod routes_mission_builder;
92mod routes_missions_teams;
93mod routes_optimizations;
94mod routes_pack_builder;
95mod routes_packs;
96mod routes_permissions_questions;
97mod routes_presets;
98mod routes_resources;
99mod routes_routines_automations;
100mod routes_sessions;
101mod routes_setup_understanding;
102mod routes_skills_memory;
103mod routes_system_api;
104mod routes_task_intake;
105mod routes_workflow_planner;
106mod routes_workflows;
107pub(crate) mod routines_automations;
108mod sessions;
109mod setup_understanding;
110mod skills_memory;
111mod system_api;
112mod task_intake;
113pub(crate) mod workflow_planner;
114mod workflow_planner_host;
115mod workflow_planner_policy;
116pub(crate) mod workflow_planner_runtime;
117mod workflow_planner_transport;
118mod workflows;
119
120use capabilities::*;
121use context_runs::*;
122use context_types::*;
123use marketplace::*;
124use mcp::*;
125use pack_builder::*;
126use packs::*;
127use permissions_questions::*;
128use presets::*;
129use resources::*;
130use sessions::*;
131use setup_understanding::*;
132use skills_memory::*;
133use system_api::*;
134
135#[cfg(test)]
136pub(crate) use context_runs::session_context_run_id;
137pub(crate) use context_runs::sync_workflow_run_blackboard;
138#[cfg(test)]
139pub(crate) use context_runs::workflow_context_run_id;
140pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
141
/// Query parameters for the session-listing endpoint.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    /// Optional free-text search filter.
    q: Option<String>,
    /// Page index for pagination (defaulting behavior decided by the handler).
    page: Option<usize>,
    /// Number of sessions per page.
    page_size: Option<usize>,
    /// When set, filters by archived state.
    archived: Option<bool>,
    /// Session scope filter (project-defined `SessionScope`).
    scope: Option<SessionScope>,
    /// Workspace filter — presumably a workspace id or path; confirm against the handler.
    workspace: Option<String>,
}
151
/// Filters for the event stream; wire names are camelCase (`sessionID`,
/// `runID`) to match the JSON keys used in event payloads.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    /// Restrict events to a single session.
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    /// Restrict events to a single run.
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
159
/// Query parameters for fetching persisted run events (see
/// `load_run_events_jsonl` for how they are applied).
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    /// Only return events whose `seq` is strictly greater than this value.
    since_seq: Option<u64>,
    /// Keep only the last N events after filtering and sorting.
    tail: Option<usize>,
}
165
/// Query for async prompt submission. `return` is spelled as a raw
/// identifier (`r#return`) because it is a Rust keyword.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    r#return: Option<String>,
}
170
/// Request body for acquiring an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    /// Identifier of the requesting client, if it supplies one.
    client_id: Option<String>,
    /// Kind of client (free-form string; semantics defined by the handler).
    client_type: Option<String>,
    /// Requested lease time-to-live, in milliseconds.
    ttl_ms: Option<u64>,
}
177
/// Request body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    /// Lease to renew; required.
    lease_id: String,
}
182
/// Request body for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    /// Lease to release; required.
    lease_id: String,
}
187
/// Request body for storage repair.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    /// Presumably forces repair past safety checks — confirm in the handler.
    force: Option<bool>,
}
192
/// Query parameters for listing storage files.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    /// Optional path prefix/filter.
    path: Option<String>,
    /// Maximum number of entries to return.
    limit: Option<usize>,
}
198
/// Partial-update body for a session; absent fields are left unchanged.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    /// New session title.
    title: Option<String>,
    /// New archived state.
    archived: Option<bool>,
    /// Permission entries as raw JSON values — schema defined by the handler; verify there.
    permission: Option<Vec<serde_json::Value>>,
}
205
/// Request body for attaching a session to another workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    /// Destination workspace; required.
    target_workspace: String,
    /// Optional tag explaining why the attach happened.
    reason_tag: Option<String>,
}
211
/// Request body for a temporary workspace override.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    /// Override lifetime in seconds; default decided by the handler.
    ttl_seconds: Option<u64>,
}
216
/// Request body for git-worktree operations (create/manage/remove).
/// All fields optional; which ones are required depends on the endpoint.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    /// Repository root the worktree belongs to.
    repo_root: Option<String>,
    /// Worktree checkout path.
    path: Option<String>,
    /// Branch to check out in the worktree.
    branch: Option<String>,
    /// Base ref the branch is created from — presumably; confirm in handler.
    base: Option<String>,
    /// Task associated with the worktree, if any.
    task_id: Option<String>,
    /// Run that owns the worktree, if any.
    owner_run_id: Option<String>,
    /// Lease guarding the worktree, if any.
    lease_id: Option<String>,
    /// Whether the engine manages the worktree lifecycle.
    managed: Option<bool>,
    /// Whether to delete the branch when cleaning up.
    cleanup_branch: Option<bool>,
}
229
/// Query parameters for listing worktrees.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    /// Restrict to worktrees of this repository root.
    repo_root: Option<String>,
    /// When true, only list engine-managed worktrees.
    managed_only: Option<bool>,
}
235
/// Request body for client-submitted log entries.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    /// Log level as a string (e.g. "info"/"warn") — mapping done by the handler.
    level: Option<String>,
    /// Human-readable log message.
    message: Option<String>,
    /// Arbitrary structured context attached to the entry.
    context: Option<Value>,
}
242
/// JSON error body returned by handlers: `{"error": "...", "code": "..."}`,
/// with `code` omitted from the payload when `None`.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    /// Human-readable error description.
    error: String,
    /// Optional machine-readable error code.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
249
/// Binds `addr` and serves the engine HTTP API until Ctrl-C.
///
/// Besides the axum server itself, this spawns the long-running background
/// tasks that share `AppState`: the stale-run reaper, persisters/journalers,
/// schedulers and executors, memory hygiene, the automation-runs archiver,
/// and the governance health checker. On graceful shutdown it flips the
/// scheduler stop flag, waits out the configured grace period, marks
/// still-running automation runs as failed, then aborts the task handles.
///
/// # Errors
/// Returns an error if the TCP bind fails or the axum server itself errors.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // Each background task takes its own clone of the shared state handle.
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let governance_health_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // Fire-and-forget: MCP server bootstrap; handle deliberately not kept.
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Stale-run reaper: every 5s, cancel runs older than `run_stale_ms`,
    // close their browser sessions, and publish a timeout-finished event.
    let reaper = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Best-effort cleanup: errors are intentionally ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    // Long-running workers; each loop lives in its own crate-level function.
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // --- Memory hygiene background task (runs every 12 hours) ---
    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
    // mode allows concurrent readers alongside the main engine connection.
    // Retention window comes from TANDEM_MEMORY_RETENTION_DAYS (default 30);
    // a value of 0 disables hygiene entirely.
    let hygiene_task = tokio::spawn(async move {
        // Initial delay so startup is not impacted.
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // --- Automation v2 runs archiver (runs at startup, then every 24h) ---
    // Moves terminal (completed/failed/blocked/cancelled) runs older than
    // TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS (default 7) from the hot runs
    // file to a sidecar archive file. The hot file is rewritten on every run
    // status change, so keeping it small is critical for persistence
    // throughput. Without this, the file grows unbounded and state writes
    // slow to the point that in-memory state lags on-disk state by minutes.
    let archiver_state = state.clone();
    // NOTE(review): unlike the other task handles, this one is discarded and
    // never aborted at shutdown; it exits only via the stopping flag below —
    // confirm this is intentional.
    let _automation_v2_archiver = tokio::spawn(async move {
        // Wait for startup to reach Ready so runtime-backed state is safe.
        loop {
            if archiver_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = archiver_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            // A retention of 0 disables archiving for this cycle.
            let retention_days: u64 = std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.trim().parse().ok())
                .unwrap_or(7);
            if retention_days > 0 {
                match archiver_state
                    .archive_stale_automation_v2_runs(retention_days)
                    .await
                {
                    Ok(n) if n > 0 => {
                        tracing::info!(
                            archived = n,
                            retention_days,
                            "automation v2 archiver: pruned stale terminal runs"
                        );
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e, "automation v2 archiver: archive failed");
                    }
                }
            }
            tokio::time::sleep(Duration::from_secs(24 * 60 * 60)).await;
        }
    });

    // Governance health checker: waits for startup (same Ready/Failed gate as
    // the archiver above), then runs checks on the governance-configured
    // interval, clamped to at least once per minute.
    let automation_governance_health_checker = tokio::spawn(async move {
        loop {
            if governance_health_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = governance_health_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            let interval_ms = governance_health_state
                .automation_governance
                .read()
                .await
                .limits
                .health_check_interval_ms
                .max(60 * 1000);
            match governance_health_state
                .run_automation_governance_health_check()
                .await
            {
                Ok(count) if count > 0 => {
                    tracing::info!(
                        finding_count = count,
                        "automation governance health check recorded findings"
                    );
                }
                Ok(_) => {}
                Err(error) => {
                    tracing::warn!(error = %error, "automation governance health check failed");
                }
            }
            tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        }
    });

    // Channel listeners are started during runtime initialization
    // (`initialize_runtime()` in `engine/src/main.rs`) so `serve()` only owns
    // the HTTP server lifecycle.
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If the Ctrl-C hook cannot be installed, park forever rather
            // than shutting down immediately.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            // Signal schedulers to stop, give them the configured grace
            // period, then mark whatever is still running as failed.
            shutdown_state.set_automation_scheduler_stopping(true);
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Tear down all retained background-task handles once the server exits.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    automation_governance_health_checker.abort();
    result?;
    Ok(())
}
487
/// Assembles the full axum `Router` for the engine HTTP API.
/// Route registration itself lives in the `router` submodule.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
491fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
492    let content = match std::fs::read_to_string(path) {
493        Ok(value) => value,
494        Err(_) => return Vec::new(),
495    };
496    let mut rows: Vec<Value> = content
497        .lines()
498        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
499        .filter(|row| {
500            if let Some(since) = since_seq {
501                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
502            }
503            true
504        })
505        .collect();
506    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
507    if let Some(tail_count) = tail {
508        if rows.len() > tail_count {
509            rows = rows.split_off(rows.len().saturating_sub(tail_count));
510        }
511    }
512    rows
513}
514
515pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
516    if input.len() <= max_len {
517        return input.to_string();
518    }
519    let mut end = 0usize;
520    for (idx, ch) in input.char_indices() {
521        let next = idx + ch.len_utf8();
522        if next > max_len {
523            break;
524        }
525        end = next;
526    }
527    let mut out = input[..end].to_string();
528    out.push_str("...<truncated>");
529    out
530}
531
532#[cfg(test)]
533mod tests;