// tandem_server — http.rs: HTTP API surface and server entry point.

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::path::Path as FsPath;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use axum::extract::Extension;
10use axum::extract::{Path, Query, State};
11use axum::http::header::{self, HeaderValue};
12use axum::http::{HeaderMap, StatusCode};
13use axum::response::sse::{Event, KeepAlive, Sse};
14use axum::response::IntoResponse;
15use axum::response::Response;
16use axum::{Json, Router};
17use futures::Stream;
18use regex::Regex;
19use serde::{Deserialize, Serialize};
20use serde_json::{json, Value};
21use sha2::{Digest, Sha256};
22use tandem_memory::types::GlobalMemoryRecord;
23use tandem_memory::{
24    db::MemoryDatabase, MemoryCapabilities, MemoryCapabilityToken, MemoryPromoteRequest,
25    MemoryPromoteResponse, MemoryPutRequest, MemoryPutResponse, MemorySearchRequest,
26    MemorySearchResponse, ScrubReport, ScrubStatus,
27};
28use tandem_skills::{SkillBundleArtifacts, SkillLocation, SkillService, SkillsConflictPolicy};
29use tokio_stream::wrappers::BroadcastStream;
30use tokio_stream::StreamExt;
31use uuid::Uuid;
32
33use tandem_channels::start_channel_listeners;
34use tandem_tools::Tool;
35use tandem_types::{
36    CreateSessionRequest, EngineEvent, Message, MessagePart, MessagePartInput, MessageRole,
37    SendMessageRequest, Session, TenantContext, TodoItem, ToolResult, ToolSchema,
38};
39use tandem_wire::{WireSession, WireSessionMessage};
40
41use crate::ResourceStoreError;
42use crate::{
43    capability_resolver::{
44        classify_missing_required, providers_for_capability, CapabilityBindingsFile,
45        CapabilityBlockingIssue, CapabilityReadinessInput, CapabilityReadinessOutput,
46        CapabilityResolveInput,
47    },
48    mcp_catalog,
49    pack_manager::{PackExportRequest, PackInstallRequest, PackUninstallRequest},
50    ActiveRun, AppState, ChannelStatus, DiscordConfigFile, SlackConfigFile, TelegramConfigFile,
51};
52
// HTTP-layer submodules. `pub(crate)` modules are shared with the rest of the
// crate; all others are internal to the HTTP surface. By convention each
// feature module has a matching `routes_*` module that wires its handlers
// into the router.
mod automation_projection_runtime;
pub(crate) mod bug_monitor;
mod capabilities;
mod channels_api;
mod coder;
mod config_providers;
pub(crate) mod context_packs;
mod context_run_ledger;
mod context_run_mutation_checkpoints;
pub(crate) mod context_runs;
pub(crate) mod context_types;
mod enterprise;
mod external_actions;
mod global;
mod marketplace;
pub(crate) mod mcp;
mod middleware;
mod mission_builder;
mod mission_builder_host;
mod mission_builder_runtime;
mod missions_teams;
mod optimizations;
mod pack_builder;
mod packs;
mod permissions_questions;
mod presets;
mod resources;
// Router assembly lives in `router`; see `app_router` below.
mod router;
mod routes_bug_monitor;
mod routes_capabilities;
mod routes_coder;
mod routes_config_providers;
mod routes_context;
mod routes_external_actions;
mod routes_global;
mod routes_marketplace;
mod routes_mcp;
mod routes_mission_builder;
mod routes_missions_teams;
mod routes_optimizations;
mod routes_pack_builder;
mod routes_packs;
mod routes_permissions_questions;
mod routes_presets;
mod routes_resources;
mod routes_routines_automations;
mod routes_sessions;
mod routes_setup_understanding;
mod routes_skills_memory;
mod routes_system_api;
mod routes_task_intake;
mod routes_workflow_planner;
mod routes_workflows;
pub(crate) mod routines_automations;
mod sessions;
mod setup_understanding;
mod skills_memory;
mod system_api;
mod task_intake;
pub(crate) mod workflow_planner;
mod workflow_planner_host;
mod workflow_planner_policy;
pub(crate) mod workflow_planner_runtime;
mod workflow_planner_transport;
mod workflows;
118
119use capabilities::*;
120use context_runs::*;
121use context_types::*;
122use marketplace::*;
123use mcp::*;
124use pack_builder::*;
125use packs::*;
126use permissions_questions::*;
127use presets::*;
128use resources::*;
129use sessions::*;
130use setup_understanding::*;
131use skills_memory::*;
132use system_api::*;
133
134#[cfg(test)]
135pub(crate) use context_runs::session_context_run_id;
136pub(crate) use context_runs::sync_workflow_run_blackboard;
137#[cfg(test)]
138pub(crate) use context_runs::workflow_context_run_id;
139pub(crate) use workflow_planner_runtime::compile_plan_to_automation_v2;
140
/// Query parameters for the session-list endpoint.
#[derive(Debug, Deserialize)]
struct ListSessionsQuery {
    // Free-text search filter — presumably matched against session titles;
    // confirm against the list handler.
    q: Option<String>,
    // Pagination controls; defaults are applied by the handler.
    page: Option<usize>,
    page_size: Option<usize>,
    // When set, restrict results to archived / non-archived sessions.
    archived: Option<bool>,
    scope: Option<SessionScope>,
    // Optional workspace filter.
    workspace: Option<String>,
}
150
/// Filters for the event-stream endpoint. The serde renames map the
/// camelCase query keys (`sessionID`, `runID`) onto snake_case fields.
#[derive(Debug, Deserialize, Default)]
struct EventFilterQuery {
    #[serde(rename = "sessionID")]
    session_id: Option<String>,
    #[serde(rename = "runID")]
    run_id: Option<String>,
}
158
/// Paging controls for a run's persisted event log (see
/// `load_run_events_jsonl`): `since_seq` keeps only events with a strictly
/// greater sequence number; `tail` keeps only the last N after sorting.
#[derive(Debug, Deserialize, Default, Clone, Copy)]
struct RunEventsQuery {
    since_seq: Option<u64>,
    tail: Option<usize>,
}
164
/// Query for async prompt submission. The field is named with a raw
/// identifier because the query key is the keyword `return`.
#[derive(Debug, Deserialize, Default)]
struct PromptAsyncQuery {
    r#return: Option<String>,
}
169
/// Body for acquiring an engine lease; all fields optional so the server can
/// fill in defaults (e.g. a generated client id or default TTL).
#[derive(Debug, Deserialize)]
struct EngineLeaseAcquireInput {
    client_id: Option<String>,
    client_type: Option<String>,
    // Requested lease time-to-live in milliseconds.
    ttl_ms: Option<u64>,
}
176
/// Body for renewing an existing engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseRenewInput {
    lease_id: String,
}
181
/// Body for releasing an engine lease.
#[derive(Debug, Deserialize)]
struct EngineLeaseReleaseInput {
    lease_id: String,
}
186
/// Body for the storage-repair endpoint.
#[derive(Debug, Deserialize, Default)]
struct StorageRepairInput {
    // NOTE(review): presumably forces repair even when storage looks
    // healthy — confirm against the handler.
    force: Option<bool>,
}
191
/// Query for listing storage files: an optional path filter and a result
/// limit.
#[derive(Debug, Deserialize, Default)]
struct StorageFilesQuery {
    path: Option<String>,
    limit: Option<usize>,
}
197
/// Partial-update body for a session; only the fields present are changed.
#[derive(Debug, Deserialize, Default)]
struct UpdateSessionInput {
    title: Option<String>,
    archived: Option<bool>,
    // Free-form permission entries; shape is interpreted by the handler.
    permission: Option<Vec<serde_json::Value>>,
}
204
/// Body for attaching a session to another workspace. `target_workspace` is
/// required; `reason_tag` is an optional audit annotation.
#[derive(Debug, Deserialize)]
struct AttachSessionInput {
    target_workspace: String,
    reason_tag: Option<String>,
}
210
/// Body for a temporary workspace override; `ttl_seconds` bounds how long
/// the override lasts (server default applies when absent).
#[derive(Debug, Deserialize)]
struct WorkspaceOverrideInput {
    ttl_seconds: Option<u64>,
}
215
/// Request body shared by the git-worktree endpoints. Every field is
/// optional so each route can require only the subset it needs.
#[derive(Debug, Deserialize, Default)]
struct WorktreeInput {
    repo_root: Option<String>,
    path: Option<String>,
    branch: Option<String>,
    // Base ref to branch from.
    base: Option<String>,
    task_id: Option<String>,
    owner_run_id: Option<String>,
    lease_id: Option<String>,
    // NOTE(review): `managed`/`cleanup_branch` presumably control whether the
    // server owns the worktree lifecycle and deletes the branch on removal —
    // confirm against the worktree handlers.
    managed: Option<bool>,
    cleanup_branch: Option<bool>,
}
228
/// Query for listing worktrees, optionally scoped to one repository and/or
/// only server-managed entries.
#[derive(Debug, Deserialize, Default)]
struct WorktreeListQuery {
    repo_root: Option<String>,
    managed_only: Option<bool>,
}
234
/// Body for the client log-forwarding endpoint: a severity level, message
/// text, and an arbitrary JSON context blob.
#[derive(Debug, Deserialize, Default)]
struct LogInput {
    level: Option<String>,
    message: Option<String>,
    context: Option<Value>,
}
241
/// JSON error body returned by handlers: `{"error": "...", "code": "..."}`.
/// The `code` key is omitted from the serialized output when `None`.
#[derive(Debug, Serialize)]
struct ErrorEnvelope {
    // Human-readable error description.
    error: String,
    // Optional machine-readable error code.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<String>,
}
248
/// Bind `addr` and run the Tandem HTTP server until shutdown.
///
/// Spawns every background worker (stale-run reaper, persisters, schedulers,
/// executors, memory hygiene, optional channel listeners) before serving,
/// then aborts them all once axum's graceful shutdown completes.
///
/// # Errors
/// Returns an error if the TCP listener cannot bind or if `axum::serve`
/// itself fails.
pub async fn serve(addr: SocketAddr, state: AppState) -> anyhow::Result<()> {
    // Each background task receives its own clone of the shared AppState
    // (cloned up front so the `state` binding stays usable below).
    let reaper_state = state.clone();
    let session_part_persister_state = state.clone();
    let session_context_run_journaler_state = state.clone();
    let status_indexer_state = state.clone();
    let routine_scheduler_state = state.clone();
    let routine_executor_state = state.clone();
    let usage_aggregator_state = state.clone();
    let automation_v2_scheduler_state = state.clone();
    let automation_v2_executor_state = state.clone();
    let optimization_scheduler_state = state.clone();
    let workflow_dispatcher_state = state.clone();
    let agent_team_supervisor_state = state.clone();
    let global_memory_ingestor_state = state.clone();
    let bug_monitor_state = state.clone();
    let mcp_bootstrap_state = state.clone();
    // Fire-and-forget: MCP bootstrap runs in the background and is not
    // aborted on shutdown (no handle is kept).
    tokio::spawn(async move {
        bootstrap_mcp_servers_when_ready(mcp_bootstrap_state).await;
    });
    let app = app_router(state.clone());
    // Stale-run reaper: every 5 seconds, cancel runs that exceeded
    // `run_stale_ms`, close their browser sessions, and publish a
    // `session.run.finished` event with status "timeout".
    let reaper = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let stale = reaper_state
                .run_registry
                .reap_stale(reaper_state.run_stale_ms)
                .await;
            for (session_id, run) in stale {
                // Best-effort cleanup; errors are intentionally ignored.
                let _ = reaper_state.cancellations.cancel(&session_id).await;
                let _ = reaper_state
                    .close_browser_sessions_for_owner(&session_id)
                    .await;
                reaper_state.event_bus.publish(EngineEvent::new(
                    "session.run.finished",
                    json!({
                        "sessionID": session_id,
                        "runID": run.run_id,
                        "finishedAtMs": crate::now_ms(),
                        "status": "timeout",
                    }),
                ));
            }
        }
    });
    // Long-running workers; each handle is aborted after the server exits.
    let session_part_persister = tokio::spawn(crate::run_session_part_persister(
        session_part_persister_state,
    ));
    let session_context_run_journaler = tokio::spawn(crate::run_session_context_run_journaler(
        session_context_run_journaler_state,
    ));
    let status_indexer = tokio::spawn(crate::run_status_indexer(status_indexer_state));
    let routine_scheduler = tokio::spawn(crate::run_routine_scheduler(routine_scheduler_state));
    let routine_executor = tokio::spawn(crate::run_routine_executor(routine_executor_state));
    let usage_aggregator = tokio::spawn(crate::run_usage_aggregator(usage_aggregator_state));
    let automation_v2_scheduler = tokio::spawn(crate::run_automation_v2_scheduler(
        automation_v2_scheduler_state,
    ));
    let automation_v2_executor = tokio::spawn(crate::run_automation_v2_executor(
        automation_v2_executor_state,
    ));
    let optimization_scheduler = tokio::spawn(crate::run_optimization_scheduler(
        optimization_scheduler_state,
    ));
    let workflow_dispatcher =
        tokio::spawn(crate::run_workflow_dispatcher(workflow_dispatcher_state));
    let agent_team_supervisor = tokio::spawn(crate::run_agent_team_supervisor(
        agent_team_supervisor_state,
    ));
    let bug_monitor = tokio::spawn(crate::run_bug_monitor(bug_monitor_state));
    let global_memory_ingestor =
        tokio::spawn(run_global_memory_ingestor(global_memory_ingestor_state));
    let shutdown_state = state.clone();
    let shutdown_timeout_secs = crate::config::env::resolve_scheduler_shutdown_timeout_secs();

    // --- Memory hygiene background task (runs every 12 hours) ---
    // Opens a fresh connection to memory.sqlite each cycle — safe because WAL
    // mode allows concurrent readers alongside the main engine connection.
    let hygiene_task = tokio::spawn(async move {
        // Initial delay so startup is not impacted.
        tokio::time::sleep(Duration::from_secs(60)).await;
        loop {
            // Retention window is re-read each cycle so the env var can be
            // changed without a restart; 0 disables hygiene for that cycle.
            let retention_days: u32 = std::env::var("TANDEM_MEMORY_RETENTION_DAYS")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(30);
            if retention_days > 0 {
                match tandem_core::resolve_shared_paths() {
                    Ok(paths) => {
                        match tandem_memory::db::MemoryDatabase::new(&paths.memory_db_path).await {
                            Ok(db) => {
                                if let Err(e) = db.run_hygiene(retention_days).await {
                                    tracing::warn!("memory hygiene failed: {}", e);
                                }
                            }
                            Err(e) => tracing::warn!("memory hygiene: could not open DB: {}", e),
                        }
                    }
                    Err(e) => tracing::warn!("memory hygiene: could not resolve paths: {}", e),
                }
            }
            tokio::time::sleep(Duration::from_secs(12 * 60 * 60)).await;
        }
    });

    // --- Channel listeners (optional) ---
    // Reads TANDEM_TELEGRAM_BOT_TOKEN, TANDEM_DISCORD_BOT_TOKEN, TANDEM_SLACK_BOT_TOKEN etc.
    // If no channels are configured the server starts normally without them.
    let channel_listener_set = match tandem_channels::config::ChannelsConfig::from_env() {
        Ok(config) => {
            tracing::info!("tandem-channels: starting configured channel listeners");
            let set = start_channel_listeners(config).await;
            Some(set)
        }
        Err(e) => {
            tracing::info!("tandem-channels: no channels configured ({})", e);
            None
        }
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;
    let result = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            // If the ctrl-c signal handler cannot be installed, never
            // trigger graceful shutdown (pend forever) rather than
            // shutting down immediately.
            if tokio::signal::ctrl_c().await.is_err() {
                futures::future::pending::<()>().await;
            }
            // Stop accepting new automation work, give in-flight runs a
            // grace period, then mark whatever is still running as failed.
            shutdown_state.set_automation_scheduler_stopping(true);
            tokio::time::sleep(Duration::from_secs(shutdown_timeout_secs)).await;
            let failed = shutdown_state
                .fail_running_automation_runs_for_shutdown()
                .await;
            if failed > 0 {
                tracing::warn!(
                    failed_runs = failed,
                    "automation runs marked failed during scheduler shutdown"
                );
            }
        })
        .await;
    // Server has exited; tear down every background worker we spawned.
    reaper.abort();
    session_part_persister.abort();
    session_context_run_journaler.abort();
    status_indexer.abort();
    routine_scheduler.abort();
    routine_executor.abort();
    usage_aggregator.abort();
    automation_v2_scheduler.abort();
    automation_v2_executor.abort();
    optimization_scheduler.abort();
    workflow_dispatcher.abort();
    agent_team_supervisor.abort();
    bug_monitor.abort();
    global_memory_ingestor.abort();
    hygiene_task.abort();
    if let Some(mut set) = channel_listener_set {
        set.abort_all();
    }
    // Propagate any serve error only after cleanup has run.
    result?;
    Ok(())
}
408
409fn app_router(state: AppState) -> Router {
410    router::build_router(state)
411}
412fn load_run_events_jsonl(path: &FsPath, since_seq: Option<u64>, tail: Option<usize>) -> Vec<Value> {
413    let content = match std::fs::read_to_string(path) {
414        Ok(value) => value,
415        Err(_) => return Vec::new(),
416    };
417    let mut rows: Vec<Value> = content
418        .lines()
419        .filter_map(|line| serde_json::from_str::<Value>(line).ok())
420        .filter(|row| {
421            if let Some(since) = since_seq {
422                return row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0) > since;
423            }
424            true
425        })
426        .collect();
427    rows.sort_by_key(|row| row.get("seq").and_then(|value| value.as_u64()).unwrap_or(0));
428    if let Some(tail_count) = tail {
429        if rows.len() > tail_count {
430            rows = rows.split_off(rows.len().saturating_sub(tail_count));
431        }
432    }
433    rows
434}
435
436pub(super) fn truncate_for_stream(input: &str, max_len: usize) -> String {
437    if input.len() <= max_len {
438        return input.to_string();
439    }
440    let mut out = input[..max_len].to_string();
441    out.push_str("...<truncated>");
442    out
443}
444
445#[cfg(test)]
446mod tests;