//! tandem_server/http.rs — engine HTTP server entry point and background-task wiring.

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::Extension;
10use axum::extract::{Path, Query, State};
11use axum::http::header::{self, HeaderValue};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::sse::{Event, KeepAlive, Sse};
14use axum::response::IntoResponse;
15use axum::response::Response;
16use axum::{Json, Router};
17use futures::Stream;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use sha2::{Digest, Sha256};
22use tandem_memory::types::GlobalMemoryRecord;
23use tandem_memory::{
24    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
25    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
26    MemorySearchResponse, ScrubReport, ScrubStatus,
27};
28use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
29use tokio_stream::wrappers::BroadcastStream;
30use tokio_stream::StreamExt;
31use uuid::Uuid;
32
33use tandem_tools::Tool;
34use tandem_types::{
35    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
36    SendMessageRequest, Session, TenantContext, TodoItem, ToolResult, ToolSchema,
37};
38use tandem_wire::{WireSession, WireSessionMessage};
39
40use crate::ResourceStoreError;
41use crate::{
42    capability_resolver::{
43        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
44        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
45        CapabilityResolveInput,
46    },
47    mcp_catalog,
48    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
49    ActiveRun, AppState, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
50};
51
52mod approvals;
53mod automation_projection_runtime;
54pub(crate) mod bug_monitor;
55mod capabilities;
56pub(crate) mod channel_automation_drafts;
57mod channels_api;
58mod coder;
59pub(crate) mod config_providers;
60pub(crate) mod context_packs;
61mod context_run_ledger;
62mod context_run_mutation_checkpoints;
63pub(crate) mod context_runs;
64pub(crate) mod context_types;
65mod discord_interactions;
66mod enterprise;
67mod external_actions;
68mod global;
69pub(crate) mod governance;
70mod marketplace;
71pub(crate) mod mcp;
72pub(crate) mod mcp_discovery;
73mod middleware;
74mod mission_builder;
75mod mission_builder_host;
76mod mission_builder_runtime;
77mod missions_teams;
78mod optimizations;
79mod pack_builder;
80mod packs;
81mod permissions_questions;
82mod presets;
83mod resources;
84mod router;
85mod routes_approvals;
86mod routes_bug_monitor;
87mod routes_capabilities;
88mod routes_channel_automation_drafts;
89mod routes_coder;
90mod routes_config_providers;
91mod routes_context;
92mod routes_external_actions;
93mod routes_global;
94mod routes_governance;
95mod routes_marketplace;
96mod routes_mcp;
97mod routes_mission_builder;
98mod routes_missions_teams;
99mod routes_optimizations;
100mod routes_pack_builder;
101mod routes_packs;
102mod routes_permissions_questions;
103mod routes_presets;
104mod routes_resources;
105mod routes_routines_automations;
106mod routes_sessions;
107mod routes_setup_understanding;
108mod routes_skills_memory;
109mod routes_system_api;
110mod routes_task_intake;
111mod routes_workflow_planner;
112mod routes_workflows;
113pub(crate) mod routines_automations;
114mod session_kb_grounding;
115mod sessions;
116mod setup_understanding;
117mod skills_memory;
118mod slack_interactions;
119mod system_api;
120mod task_intake;
121mod telegram_interactions;
122pub(crate) mod workflow_planner;
123mod workflow_planner_host;
124mod workflow_planner_policy;
125pub(crate) mod workflow_planner_runtime;
126mod workflow_planner_transport;
127mod workflows;
128
129use capabilities::*;
130use context_runs::*;
131use context_types::*;
132use marketplace::*;
133use mcp::*;
134use pack_builder::*;
135use packs::*;
136use permissions_questions::*;
137use presets::*;
138use resources::*;
139use sessions::*;
140use setup_understanding::*;
141use skills_memory::*;
142use system_api::*;
143
144#[cfg(test)]
145pub(crate) use context_runs::session_context_run_id;
146pub(crate) use context_runs::sync_workflow_run_blackboard;
147#[cfg(test)]
148pub(crate) use context_runs::workflow_context_run_id;
149pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
150
/// Query-string parameters accepted by the session listing endpoint.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    // Free-text filter (presumably matched against session titles — confirm in handler).
    q: Option<String>,
    page: Option<usize>,
    page_size: Option<usize>,
    archived: Option<bool>,
    scope: Option<SessionScope>,
    workspace: Option<String>,
}
160
/// Filters for the event-stream endpoint; wire names are camelCase
/// (`sessionID` / `runID`) per the serde renames, matching the event
/// payload keys published in `serve`'s reaper task.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
168
/// Windowing controls for reading persisted run events: only rows with
/// seq strictly greater than `since_seq`, optionally limited to the
/// last `tail` rows (see `load_run_events_jsonl`).
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    since_seq: Option<u64>,
    tail: Option<usize>,
}
174
/// Query for async prompt submission. `return` is a Rust keyword, so the
/// raw identifier `r#return` is required to keep the wire name `return`.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    r#return: Option<String>,
}
179
/// Request body for acquiring an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    client_id: Option<String>,
    client_type: Option<String>,
    // Requested lease time-to-live in milliseconds; when omitted the
    // handler presumably applies a server-side default — confirm.
    ttl_ms: Option<u64>,
}

/// Request body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    lease_id: String,
}

/// Request body for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    lease_id: String,
}
196
/// Input for the storage repair endpoint.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    force: Option<bool>,
}

/// Query for listing storage files, optionally scoped to a path and
/// capped at `limit` entries.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    path: Option<String>,
    limit: Option<usize>,
}
207
/// Patch-style session update; every field is optional so callers can
/// change any subset of properties.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    title: Option<String>,
    archived: Option<bool>,
    // NOTE(review): loosely typed permission entries — the expected
    // schema is enforced (or not) by the handler; confirm there.
    permission: Option<Vec<serde_json::Value>>,
}
214
/// Input for attaching a session to a different workspace.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    target_workspace: String,
    reason_tag: Option<String>,
}

/// Input for a temporary workspace override; by field name `ttl_seconds`
/// bounds how long the override lasts — confirm default in the handler.
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    ttl_seconds: Option<u64>,
}
225
/// Parameters for creating or mutating a worktree (presumably a git
/// worktree, given the branch/base fields — confirm). All fields are
/// optional; handlers fill in defaults for whatever is omitted.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    repo_root: Option<String>,
    path: Option<String>,
    branch: Option<String>,
    base: Option<String>,
    task_id: Option<String>,
    owner_run_id: Option<String>,
    lease_id: Option<String>,
    managed: Option<bool>,
    cleanup_branch: Option<bool>,
}

/// Filters for listing worktrees.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    repo_root: Option<String>,
    managed_only: Option<bool>,
}

/// Input for worktree cleanup; by field name `dry_run` previews without
/// deleting and `remove_orphan_dirs` widens the sweep — confirm in handler.
#[derive(Debug, Deserialize, Default)]
struct WorktreeCleanupInput {
    repo_root: Option<String>,
    #[serde(default)]
    dry_run: Option<bool>,
    #[serde(default)]
    remove_orphan_dirs: Option<bool>,
}
253
/// Client-submitted log entry: level and message strings plus a
/// free-form JSON context payload.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    level: Option<String>,
    message: Option<String>,
    context: Option<Value>,
}
260
/// Standard JSON error response body. `code` is omitted entirely from
/// the serialized output (rather than emitted as `null`) when absent,
/// per the `skip_serializing_if` attribute.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    error: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
267
/// Binds an HTTP listener on `addr` and serves the engine API until a
/// Ctrl-C signal arrives. This function owns the lifecycle of the
/// engine's background tasks: it spawns the run reaper, persisters,
/// schedulers, executors, archiver, and health checkers up front, and
/// aborts them after the server loop exits. The server's own result is
/// propagated to the caller via `?` before returning `Ok(())`.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // Each background task captures its own clone of the shared state.
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let bug_monitor_recovery_sweep_state = state.clone();
    let governance_health_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // Fire-and-forget: MCP server bootstrap waits for readiness itself.
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Run reaper: every 5s, sweep runs whose heartbeat went stale,
    // cancel them, close browser sessions they own, and publish a
    // timeout-flavored `session.run.finished` event.
    let reaper = tokio::spawn(async move {
        // presumably 120 attempts at 250ms intervals (~30s) — TODO confirm
        if !reaper_state.wait_until_ready_or_failed(120, 250).await {
            let startup = reaper_state.startup_snapshot().await;
            tracing::warn!(
                component = "run_reaper",
                startup_status = ?startup.status,
                startup_phase = %startup.phase,
                attempt_id = %startup.attempt_id,
                "run reaper exiting before runtime access because startup did not become ready"
            );
            return;
        }
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Best effort: failures to cancel or close are ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let bug_monitor_recovery_sweep = tokio::spawn(crate::run_bug_monitor_recovery_sweep(
        bug_monitor_recovery_sweep_state,
    ));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // --- Memory hygiene background task (runs every 12 hours) ---
    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
    // mode allows concurrent readers alongside the main engine connection.
    let hygiene_task = tokio::spawn(async move {
        // Initial delay so startup is not impacted.
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            // Retention window re-read every cycle so env changes take
            // effect without a restart; 0 disables hygiene entirely.
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // --- Automation v2 runs archiver (runs at startup, then every 24h) ---
    // Moves terminal (completed/failed/blocked/cancelled) runs older than
    // TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS (default 7) from the hot runs
    // file to a sidecar archive file. The hot file is rewritten on every run
    // status change, so keeping it small is critical for persistence
    // throughput. Without this, the file grows unbounded and state writes
    // slow to the point that in-memory state lags on-disk state by minutes.
    let archiver_state = state.clone();
    // NOTE(review): unlike every other handle below, this task is never
    // aborted during shutdown (underscore binding). It does observe
    // is_automation_scheduler_stopping(), but confirm the omission from
    // the abort list at the bottom of this function is intentional.
    let _automation_v2_archiver = tokio::spawn(async move {
        // Wait for startup to reach Ready so runtime-backed state is safe.
        loop {
            if archiver_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = archiver_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            let retention_days: u64 = std::env::var("TANDEM_AUTOMATION_V2_RUNS_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.trim().parse().ok())
                .unwrap_or(7);
            if retention_days > 0 {
                match archiver_state
                    .archive_stale_automation_v2_runs(retention_days)
                    .await
                {
                    Ok(n) if n > 0 => {
                        tracing::info!(
                            archived = n,
                            retention_days,
                            "automation v2 archiver: pruned stale terminal runs"
                        );
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e, "automation v2 archiver: archive failed");
                    }
                }
            }
            tokio::time::sleep(Duration::from_secs(24 * 60 * 60)).await;
        }
    });

    // Governance health checker: waits for startup Ready (same gate as
    // the archiver above), then polls on a configurable interval.
    let automation_governance_health_checker = tokio::spawn(async move {
        loop {
            if governance_health_state.is_automation_scheduler_stopping() {
                return;
            }
            let startup = governance_health_state.startup_snapshot().await;
            if matches!(startup.status, crate::app::startup::StartupStatus::Ready) {
                break;
            }
            if matches!(startup.status, crate::app::startup::StartupStatus::Failed) {
                return;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        loop {
            // Interval is re-read each cycle and clamped to >= 60s so a
            // misconfigured limit cannot hot-loop the checker.
            let interval_ms = governance_health_state
                .automation_governance
                .read()
                .await
                .limits
                .health_check_interval_ms
                .max(60 * 1000);
            match governance_health_state
                .run_automation_governance_health_check()
                .await
            {
                Ok(count) if count > 0 => {
                    tracing::info!(
                        finding_count = count,
                        "automation governance health check recorded findings"
                    );
                }
                Ok(_) => {}
                Err(error) => {
                    tracing::warn!(error = %error, "automation governance health check failed");
                }
            }
            tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        }
    });

    // Channel listeners are started during runtime initialization
    // (`initialize_runtime()` in `engine/src/main.rs`) so `serve()` only owns
    // the HTTP server lifecycle.
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If the Ctrl-C hook cannot be installed, park forever so the
            // server only stops when the process is killed externally.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            // Signal schedulers to stop, give them the configured grace
            // period, then mark anything still running as failed.
            shutdown_state.set_automation_scheduler_stopping(true);
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Server loop has exited: tear down every owned background task.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    bug_monitor_recovery_sweep.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    automation_governance_health_checker.abort();
    result?;
    Ok(())
}
521
/// Assembles the axum `Router` for the whole HTTP API; the route table
/// itself lives in the `router` submodule.
fn app_router(state: AppState) -> Router {
    router::build_router(state)
}
525fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
526    let content = match std::fs::read_to_string(path) {
527        Ok(value) => value,
528        Err(_) => return Vec::new(),
529    };
530    let mut rows: Vec<Value> = content
531        .lines()
532        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
533        .filter(|row| {
534            if let Some(since) = since_seq {
535                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
536            }
537            true
538        })
539        .collect();
540    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
541    if let Some(tail_count) = tail {
542        if rows.len() > tail_count {
543            rows = rows.split_off(rows.len().saturating_sub(tail_count));
544        }
545    }
546    rows
547}
548
549pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
550    if input.len() <= max_len {
551        return input.to_string();
552    }
553    let mut end = 0usize;
554    for (idx, ch) in input.char_indices() {
555        let next = idx + ch.len_utf8();
556        if next > max_len {
557            break;
558        }
559        end = next;
560    }
561    let mut out = input[..end].to_string();
562    out.push_str("...<truncated>");
563    out
564}
565
566#[cfg(test)]
567mod tests;