1use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use axum::Router;
6use axum::routing::{any, get, post};
7use tower_http::services::{ServeDir, ServeFile};
8
9use crate::api::connections::{api_connections, api_cut_status, api_trigger_cut};
10use crate::api::recording::{
11 api_record_current, api_record_export, api_record_frame, api_record_import, api_record_start,
12 api_record_stop,
13};
14use crate::api::snapshot::{api_snapshot, api_snapshot_current, api_snapshot_symbolication_ws};
15use crate::api::source::{api_source_preview, api_source_previews};
16use crate::api::sql::{api_query, api_sql};
17use crate::api::theme::api_arborium_theme_css;
18use crate::db::{Db, StoredModuleManifestEntry};
19use crate::proxy::proxy_vite;
20use crate::recording::session::RecordingState;
21use moire_trace_types::BacktraceId;
22use moire_types::{ProcessId, SnapshotCutResponse};
23use moire_wire::SnapshotReply;
24use tokio::sync::{Mutex, Notify, mpsc};
25
26pub mod ids;
27pub use ids::{ConnectionId, CutOrdinal, SessionOrdinal};
28
29#[derive(Clone)]
30pub struct AppState {
31 pub inner: Arc<Mutex<ServerState>>,
32 pub db: Arc<Db>,
33 pub dev_proxy: Option<DevProxyState>,
34 pub frontend_dist: Option<PathBuf>,
35}
36
37#[derive(Clone)]
38pub struct DevProxyState {
39 pub base_url: Arc<String>,
40}
41
42pub struct ServerState {
43 pub next_conn_id: ConnectionId,
44 pub next_cut_id: CutOrdinal,
45 pub next_snapshot_id: i64,
46 pub next_session_id: SessionOrdinal,
47 pub connections: HashMap<ConnectionId, ConnectedProcess>,
48 pub cuts: BTreeMap<moire_types::CutId, CutState>,
49 pub pending_snapshots: HashMap<i64, SnapshotPending>,
50 pub snapshot_streams: HashMap<i64, SnapshotStreamState>,
51 pub last_snapshot_json: Option<String>,
52 pub snapshot_history_ids: VecDeque<i64>,
53 pub snapshot_history_json: BTreeMap<i64, String>,
54 pub recording: Option<RecordingState>,
55}
56
57pub struct ConnectedProcess {
58 pub process_id: Option<ProcessId>,
59 pub process_name: String,
60 pub pid: u32,
61 pub handshake_received: bool,
62 pub module_manifest: Vec<StoredModuleManifestEntry>,
63 pub tx: mpsc::Sender<Vec<u8>>,
64}
65
66pub struct CutState {
67 pub requested_at_ns: i64,
68 pub pending_conn_ids: BTreeSet<ConnectionId>,
69 pub acks: BTreeMap<ConnectionId, moire_types::CutAck>,
70}
71
72pub struct SnapshotPending {
73 pub pending_conn_ids: BTreeSet<ConnectionId>,
74 pub replies: HashMap<ConnectionId, SnapshotReply>,
75 pub notify: Arc<Notify>,
76}
77
78pub struct SnapshotStreamState {
79 pub backtrace_ids: Vec<BacktraceId>,
80}
81
82impl ServerState {
83 pub fn new(next_conn_id: ConnectionId) -> Self {
84 Self {
85 next_conn_id,
86 next_cut_id: CutOrdinal::ONE,
87 next_snapshot_id: 1,
88 next_session_id: SessionOrdinal::ONE,
89 connections: HashMap::new(),
90 cuts: BTreeMap::new(),
91 pending_snapshots: HashMap::new(),
92 snapshot_streams: HashMap::new(),
93 last_snapshot_json: None,
94 snapshot_history_ids: VecDeque::new(),
95 snapshot_history_json: BTreeMap::new(),
96 recording: None,
97 }
98 }
99}
100
101impl AppState {
102 pub fn new(
103 db: Db,
104 next_conn_id: ConnectionId,
105 dev_proxy: Option<DevProxyState>,
106 frontend_dist: Option<PathBuf>,
107 ) -> Self {
108 Self {
109 inner: Arc::new(Mutex::new(ServerState::new(next_conn_id))),
110 db: Arc::new(db),
111 dev_proxy,
112 frontend_dist,
113 }
114 }
115}
116
117pub fn build_router(state: AppState) -> Router {
118 let mut app = Router::new()
119 .route("/health", get(health))
120 .route("/api/connections", get(api_connections))
121 .route("/api/cuts", post(api_trigger_cut))
122 .route("/api/cuts/{cut_id}", get(api_cut_status))
123 .route("/api/sql", post(api_sql))
124 .route("/api/query", post(api_query))
125 .route("/api/snapshot", post(api_snapshot))
126 .route("/api/snapshot/current", get(api_snapshot_current))
127 .route(
128 "/api/snapshot/{snapshot_id}/symbolication/ws",
129 get(api_snapshot_symbolication_ws),
130 )
131 .route("/api/record/start", post(api_record_start))
132 .route("/api/record/stop", post(api_record_stop))
133 .route("/api/record/current", get(api_record_current))
134 .route(
135 "/api/record/current/frame/{frame_index}",
136 get(api_record_frame),
137 )
138 .route("/api/record/current/export", get(api_record_export))
139 .route("/api/record/import", post(api_record_import))
140 .route("/api/source/preview", get(api_source_preview))
141 .route("/api/source/previews", post(api_source_previews))
142 .route("/api/arborium-theme.css", get(api_arborium_theme_css));
143 if state.dev_proxy.is_some() {
144 app = app.fallback(any(proxy_vite));
145 } else if let Some(frontend_dist) = &state.frontend_dist {
146 let spa = ServeDir::new(frontend_dist)
147 .not_found_service(ServeFile::new(frontend_dist.join("index.html")));
148 app = app.fallback_service(spa);
149 }
150 app.with_state(state)
151}
152
153pub async fn health() -> &'static str {
154 "ok"
155}
156
157pub async fn remember_snapshot(state: &AppState, snapshot: &SnapshotCutResponse) {
158 const SNAPSHOT_HISTORY_LIMIT: usize = 64;
159
160 let Ok(json) = facet_json::to_string(snapshot) else {
161 tracing::warn!("failed to serialize snapshot for cache");
162 return;
163 };
164 let mut guard = state.inner.lock().await;
165 guard.last_snapshot_json = Some(json.clone());
166 guard
167 .snapshot_history_json
168 .insert(snapshot.snapshot_id, json);
169 if !guard.snapshot_history_ids.contains(&snapshot.snapshot_id) {
170 guard.snapshot_history_ids.push_back(snapshot.snapshot_id);
171 }
172 while guard.snapshot_history_ids.len() > SNAPSHOT_HISTORY_LIMIT {
173 let Some(oldest) = guard.snapshot_history_ids.pop_front() else {
174 break;
175 };
176 guard.snapshot_history_json.remove(&oldest);
177 }
178}