// tandem_server/http.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::Extension;
10use axum::extract::{Path, Query, State};
11use axum::http::header::{self, HeaderValue};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::sse::{Event, KeepAlive, Sse};
14use axum::response::IntoResponse;
15use axum::response::Response;
16use axum::{Json, Router};
17use futures::Stream;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use sha2::{Digest, Sha256};
22use tandem_memory::types::GlobalMemoryRecord;
23use tandem_memory::{
24    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
25    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
26    MemorySearchResponse, ScrubReport, ScrubStatus,
27};
28use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
29use tokio_stream::wrappers::BroadcastStream;
30use tokio_stream::StreamExt;
31use uuid::Uuid;
32
33use tandem_tools::Tool;
34use tandem_types::{
35    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36    SendMessageRequest, Session, TenantContext, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42    capability_resolver::{
43        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45        CapabilityResolveInput,
46    },
47    mcp_catalog,
48    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49    ActiveRun, AppState, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod automation_projection_runtime;
53pub(crate) mod bug_monitor;
54mod capabilities;
55mod channels_api;
56mod coder;
57pub(crate) mod config_providers;
58pub(crate) mod context_packs;
59mod context_run_ledger;
60mod context_run_mutation_checkpoints;
61pub(crate) mod context_runs;
62pub(crate) mod context_types;
63mod enterprise;
64mod external_actions;
65mod global;
66pub(crate) mod governance;
67mod marketplace;
68pub(crate) mod mcp;
69pub(crate) mod mcp_discovery;
70mod middleware;
71mod mission_builder;
72mod mission_builder_host;
73mod mission_builder_runtime;
74mod missions_teams;
75mod optimizations;
76mod pack_builder;
77mod packs;
78mod permissions_questions;
79mod presets;
80mod resources;
81mod router;
82mod routes_bug_monitor;
83mod routes_capabilities;
84mod routes_coder;
85mod routes_config_providers;
86mod routes_context;
87mod routes_external_actions;
88mod routes_global;
89mod routes_governance;
90mod routes_marketplace;
91mod routes_mcp;
92mod routes_mission_builder;
93mod routes_missions_teams;
94mod routes_optimizations;
95mod routes_pack_builder;
96mod routes_packs;
97mod routes_permissions_questions;
98mod routes_presets;
99mod routes_resources;
100mod routes_routines_automations;
101mod routes_sessions;
102mod routes_setup_understanding;
103mod routes_skills_memory;
104mod routes_system_api;
105mod routes_task_intake;
106mod routes_workflow_planner;
107mod routes_workflows;
108pub(crate) mod routines_automations;
109mod sessions;
110mod setup_understanding;
111mod skills_memory;
112mod system_api;
113mod task_intake;
114pub(crate) mod workflow_planner;
115mod workflow_planner_host;
116mod workflow_planner_policy;
117pub(crate) mod workflow_planner_runtime;
118mod workflow_planner_transport;
119mod workflows;
120
121use capabilities::*;
122use context_runs::*;
123use context_types::*;
124use marketplace::*;
125use mcp::*;
126use pack_builder::*;
127use packs::*;
128use permissions_questions::*;
129use presets::*;
130use resources::*;
131use sessions::*;
132use setup_understanding::*;
133use skills_memory::*;
134use system_api::*;
135
136#[cfg(test)]
137pub(crate) use context_runs::session_context_run_id;
138pub(crate) use context_runs::sync_workflow_run_blackboard;
139#[cfg(test)]
140pub(crate) use context_runs::workflow_context_run_id;
141pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
142
/// Query-string parameters for listing sessions (handler not visible in this
/// chunk; presumably the session-list route).
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    /// Free-text search filter.
    q: Option<String>,
    /// Page index for pagination (base index not visible here — confirm in handler).
    page: Option<usize>,
    /// Maximum number of sessions per page.
    page_size: Option<usize>,
    /// Filter by archived state.
    archived: Option<bool>,
    /// Restrict results to a session scope.
    scope: Option<SessionScope>,
    /// Restrict results to a workspace.
    workspace: Option<String>,
}
152
/// Event-stream filter: narrows events to a session and/or run.
/// Wire names are camelCase (`sessionID`, `runID`), matching the event payloads
/// published elsewhere in this file.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
160
/// Query parameters for fetching persisted run events; the field names mirror
/// the parameters of `load_run_events_jsonl` below (only events with
/// `seq > since_seq`, optionally keeping only the last `tail` rows).
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    since_seq: Option<u64>,
    tail: Option<usize>,
}
166
/// Query parameters for the async-prompt endpoint.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    // `return` is a Rust keyword, so a raw identifier is needed to accept a
    // query parameter literally named `return`.
    r#return: Option<String>,
}
171
/// Request body for acquiring an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    /// Caller-supplied client identifier.
    client_id: Option<String>,
    /// Free-form client type tag (semantics defined by the handler).
    client_type: Option<String>,
    /// Requested lease time-to-live in milliseconds.
    ttl_ms: Option<u64>,
}
178
/// Request body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    /// Identifier of the lease to renew (required).
    lease_id: String,
}
183
/// Request body for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    /// Identifier of the lease to release (required).
    lease_id: String,
}
188
/// Request body for the storage-repair endpoint.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    /// When true, presumably forces the repair even if not needed — confirm in handler.
    force: Option<bool>,
}
193
/// Query parameters for listing storage files.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    /// Path filter/prefix (semantics defined by the handler).
    path: Option<String>,
    /// Maximum number of entries to return.
    limit: Option<usize>,
}
199
/// Partial-update body for a session; absent fields are left unchanged.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    /// New session title.
    title: Option<String>,
    /// New archived state.
    archived: Option<bool>,
    /// Permission entries as opaque JSON values (schema defined by the handler).
    permission: Option<Vec<serde_json::Value>>,
}
206
/// Request body for attaching a session to another workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    /// Workspace the session should be attached to (required).
    target_workspace: String,
    /// Optional audit/reason tag for the move.
    reason_tag: Option<String>,
}
212
/// Request body for a temporary workspace override.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    /// Override lifetime in seconds; default behavior is decided by the handler.
    ttl_seconds: Option<u64>,
}
217
/// Request body for git-worktree operations. All fields are optional; which
/// ones are required depends on the specific worktree endpoint (not visible
/// in this chunk).
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    /// Repository root the worktree belongs to.
    repo_root: Option<String>,
    /// Filesystem path of the worktree.
    path: Option<String>,
    /// Branch checked out in the worktree.
    branch: Option<String>,
    /// Base ref the branch is created from — TODO confirm against handler.
    base: Option<String>,
    /// Associated task identifier.
    task_id: Option<String>,
    /// Run that owns the worktree.
    owner_run_id: Option<String>,
    /// Lease guarding the worktree.
    lease_id: Option<String>,
    /// Whether the worktree is engine-managed.
    managed: Option<bool>,
    /// Whether to delete the branch during cleanup.
    cleanup_branch: Option<bool>,
}
230
/// Query parameters for listing worktrees.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    /// Restrict to worktrees under this repository root.
    repo_root: Option<String>,
    /// When true, only engine-managed worktrees are returned.
    managed_only: Option<bool>,
}
236
/// Request body for client-submitted log entries.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    /// Log level as a string (accepted values defined by the handler).
    level: Option<String>,
    /// Log message text.
    message: Option<String>,
    /// Arbitrary structured context attached to the entry.
    context: Option<Value>,
}
243
/// JSON error body returned to HTTP clients.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    /// Human-readable error message.
    error: String,
    /// Optional machine-readable error code; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
250
/// Bind `addr` and serve the HTTP API, owning the lifecycle of every
/// engine-side background task spawned here (stale-run reaper, persisters,
/// schedulers/executors, memory hygiene, automation-runs archiver, governance
/// health checker, and more).
///
/// Blocks until the server shuts down. Ctrl-C triggers graceful shutdown:
/// schedulers are flagged as stopping, and after a configurable grace period
/// any still-running automation runs are marked failed. Once `axum::serve`
/// returns, all spawned tasks are aborted.
///
/// # Errors
/// Returns an error if the TCP listener cannot bind `addr` or the server
/// itself fails.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // Each background task takes its own clone of the shared state handle.
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let governance_health_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // MCP bootstrap runs detached so it never delays binding the listener.
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Stale-run reaper: every 5s, cancel runs older than `run_stale_ms`,
    // best-effort close their browser sessions, and publish a
    // `session.run.finished` event with status "timeout".
    let reaper = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Cleanup is best-effort; errors are deliberately ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    // Long-lived worker loops defined elsewhere in the crate; handles are kept
    // so they can be aborted after the server exits.
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // --- Memory hygiene background task (runs every 12 hours) ---
    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
    // mode allows concurrent readers alongside the main engine connection.
    let hygiene_task = tokio::spawn(async move {
        // Initial delay so startup is not impacted.
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            // Retention window is re-read every cycle so env changes take
            // effect without a restart; 0 disables hygiene for that cycle.
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // --- Automation v2 runs archiver (runs at startup, then every 24h) ---
    // Moves terminal (completed/failed/blocked/cancelled) runs older than
    // TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS (default 7) from the hot runs
    // file to a sidecar archive file. The hot file is rewritten on every run
    // status change, so keeping it small is critical for persistence
    // throughput. Without this, the file grows unbounded and state writes
    // slow to the point that in-memory state lags on-disk state by minutes.
    let archiver_state = state.clone();
    let _automation_v2_archiver = tokio::spawn(async move {
        // Wait for startup to reach Ready so runtime-backed state is safe.
        loop {
            if archiver_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = archiver_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            let retention_days: u64 = std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.trim().parse().ok())
                .unwrap_or(7);
            if retention_days > 0 {
                match archiver_state
                    .archive_stale_automation_v2_runs(retention_days)
                    .await
                {
                    Ok(n) if n > 0 => {
                        tracing::info!(
                            archived = n,
                            retention_days,
                            "automation v2 archiver: pruned stale terminal runs"
                        );
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e, "automation v2 archiver: archive failed");
                    }
                }
            }
            tokio::time::sleep(Duration::from_secs(24 * 60 * 60)).await;
        }
    });

    // --- Automation governance health checker ---
    // Waits for startup to reach Ready (bailing out on Failed or scheduler
    // stop), then runs periodic health checks at the governed interval,
    // clamped to at least 60s.
    let automation_governance_health_checker = tokio::spawn(async move {
        loop {
            if governance_health_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = governance_health_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            let interval_ms = governance_health_state
                .automation_governance
                .read()
                .await
                .limits
                .health_check_interval_ms
                .max(60 * 1000);
            match governance_health_state
                .run_automation_governance_health_check()
                .await
            {
                Ok(count) if count > 0 => {
                    tracing::info!(
                        finding_count = count,
                        "automation governance health check recorded findings"
                    );
                }
                Ok(_) => {}
                Err(error) => {
                    tracing::warn!(error = %error, "automation governance health check failed");
                }
            }
            tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        }
    });

    // Channel listeners are started during runtime initialization
    // (`initialize_runtime()` in `engine/src/main.rs`) so `serve()` only owns
    // the HTTP server lifecycle.
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If Ctrl-C registration fails, park forever so shutdown is never
            // triggered spuriously from this future.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            // Flag schedulers to stop, give them the configured grace period,
            // then mark any runs still executing as failed.
            shutdown_state.set_automation_scheduler_stopping(true);
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Server has exited: tear down every background task before returning.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    automation_governance_health_checker.abort();
    result?;
    Ok(())
}
488
/// Build the full axum application router; delegates to `router::build_router`.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
492fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
493    let content = match std::fs::read_to_string(path) {
494        Ok(value) => value,
495        Err(_) => return Vec::new(),
496    };
497    let mut rows: Vec<Value> = content
498        .lines()
499        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
500        .filter(|row| {
501            if let Some(since) = since_seq {
502                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
503            }
504            true
505        })
506        .collect();
507    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
508    if let Some(tail_count) = tail {
509        if rows.len() > tail_count {
510            rows = rows.split_off(rows.len().saturating_sub(tail_count));
511        }
512    }
513    rows
514}
515
516pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
517    if input.len() <= max_len {
518        return input.to_string();
519    }
520    let mut end = 0usize;
521    for (idx, ch) in input.char_indices() {
522        let next = idx + ch.len_utf8();
523        if next > max_len {
524            break;
525        }
526        end = next;
527    }
528    let mut out = input[..end].to_string();
529    out.push_str("...<truncated>");
530    out
531}
532
533#[cfg(test)]
534mod tests;