1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use axum::body::Bytes;
5use axum::extract::{Path as AxumPath, State};
6use axum::http::{StatusCode, header};
7use axum::response::IntoResponse;
8use moire_types::{RecordCurrentResponse, RecordStartRequest, RecordingImportBody};
9use tokio::sync::Notify;
10use tracing::warn;
11
12use crate::api::snapshot::take_snapshot_internal;
13use crate::app::AppState;
14use crate::recording::session::{
15 RecordingState, build_imported_frames, export_frame_rows, frame_json_by_index, push_frame,
16 recording_session_info,
17};
18use crate::util::http::{json_error, json_ok};
19use crate::util::time::now_ms;
20
21pub async fn api_record_start(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
22 let req: RecordStartRequest = if body.is_empty() {
23 RecordStartRequest {
24 interval_ms: None,
25 max_frames: None,
26 max_memory_bytes: None,
27 }
28 } else {
29 match facet_json::from_slice(&body) {
30 Ok(req) => req,
31 Err(e) => {
32 return json_error(
33 StatusCode::BAD_REQUEST,
34 format!("invalid request json: {e}"),
35 );
36 }
37 }
38 };
39
40 let (session_id, stop_signal) = {
41 let mut guard = state.inner.lock().await;
42 if guard
43 .recording
44 .as_ref()
45 .is_some_and(|r| r.stopped_at_unix_ms.is_none())
46 {
47 return json_error(StatusCode::CONFLICT, "recording already in progress");
48 }
49
50 let session_num = guard.next_session_id;
51 guard.next_session_id = guard.next_session_id.next();
52 let session_id = session_num.to_session_id();
53 let interval_ms = req.interval_ms.unwrap_or(500);
54 let max_frames = req.max_frames.unwrap_or(1000);
55 let max_memory_bytes = req.max_memory_bytes.unwrap_or(256 * 1024 * 1024);
56 let stop_signal = Arc::new(Notify::new());
57
58 guard.recording = Some(RecordingState {
59 session_id: session_id.clone(),
60 interval_ms,
61 started_at_unix_ms: now_ms(),
62 stopped_at_unix_ms: None,
63 frames: Vec::new(),
64 max_frames,
65 max_memory_bytes,
66 overflowed: false,
67 total_frames_captured: 0,
68 approx_memory_bytes: 0,
69 total_capture_ms: 0.0,
70 max_capture_ms: 0.0,
71 stop_signal: stop_signal.clone(),
72 });
73
74 (session_id, stop_signal)
75 };
76
77 let loop_state = state.clone();
78 let loop_session_id = session_id.clone();
79 tokio::spawn(async move {
80 let interval_ms = {
81 let guard = loop_state.inner.lock().await;
82 guard.recording.as_ref().map_or(500, |r| r.interval_ms)
83 };
84 loop {
85 tokio::select! {
86 _ = stop_signal.notified() => break,
87 _ = tokio::time::sleep(Duration::from_millis(interval_ms as u64)) => {
88 let capture_start = Instant::now();
89 let snapshot = take_snapshot_internal(&loop_state).await;
90 let json = match facet_json::to_string(&snapshot) {
91 Ok(json) => json,
92 Err(e) => {
93 warn!(%e, "failed to serialize recording frame");
94 continue;
95 }
96 };
97 let capture_duration_ms = capture_start.elapsed().as_secs_f64() * 1000.0;
98 let process_count = snapshot.processes.len() as u32;
99 let captured_at_unix_ms = snapshot.captured_at_unix_ms;
100 let mut guard = loop_state.inner.lock().await;
101 let Some(recording) = &mut guard.recording else { break };
102 if recording.session_id != loop_session_id || recording.stopped_at_unix_ms.is_some() {
103 break;
104 }
105 push_frame(
106 recording,
107 captured_at_unix_ms,
108 process_count,
109 capture_duration_ms,
110 json,
111 );
112 }
113 }
114 }
115 });
116
117 let guard = state.inner.lock().await;
118 let rec = guard.recording.as_ref().unwrap();
119 json_ok(&RecordCurrentResponse {
120 session: Some(recording_session_info(rec)),
121 })
122}
123
124pub async fn api_record_stop(State(state): State<AppState>) -> impl IntoResponse {
125 let stop_signal = {
126 let mut guard = state.inner.lock().await;
127 match &mut guard.recording {
128 None => return json_error(StatusCode::NOT_FOUND, "no recording in progress"),
129 Some(rec) if rec.stopped_at_unix_ms.is_some() => {
130 return json_error(StatusCode::NOT_FOUND, "no recording in progress");
131 }
132 Some(rec) => {
133 rec.stopped_at_unix_ms = Some(now_ms());
134 rec.stop_signal.clone()
135 }
136 }
137 };
138
139 stop_signal.notify_one();
140
141 let guard = state.inner.lock().await;
142 let rec = guard.recording.as_ref().unwrap();
143 json_ok(&RecordCurrentResponse {
144 session: Some(recording_session_info(rec)),
145 })
146}
147
148pub async fn api_record_current(State(state): State<AppState>) -> impl IntoResponse {
149 let guard = state.inner.lock().await;
150 let session = guard.recording.as_ref().map(recording_session_info);
151 json_ok(&RecordCurrentResponse { session })
152}
153
154pub async fn api_record_frame(
155 State(state): State<AppState>,
156 AxumPath(frame_index): AxumPath<u32>,
157) -> impl IntoResponse {
158 let guard = state.inner.lock().await;
159 let Some(recording) = &guard.recording else {
160 return json_error(StatusCode::NOT_FOUND, "no recording");
161 };
162 let Some(frame_json) = frame_json_by_index(recording, frame_index) else {
163 return json_error(StatusCode::NOT_FOUND, "frame not found");
164 };
165 (
166 StatusCode::OK,
167 [(header::CONTENT_TYPE, "application/json; charset=utf-8")],
168 frame_json.to_string(),
169 )
170 .into_response()
171}
172
173pub async fn api_record_export(State(state): State<AppState>) -> impl IntoResponse {
174 let (session_info, frames_json) = {
175 let guard = state.inner.lock().await;
176 let Some(recording) = &guard.recording else {
177 return json_error(StatusCode::NOT_FOUND, "no recording");
178 };
179 if recording.stopped_at_unix_ms.is_none() {
180 return json_error(StatusCode::CONFLICT, "recording is still in progress");
181 }
182 let session_info = recording_session_info(recording);
183 let frames_json = export_frame_rows(&recording.frames);
184 (session_info, frames_json)
185 };
186
187 let session_json = match facet_json::to_string(&session_info) {
188 Ok(s) => s,
189 Err(e) => {
190 return json_error(
191 StatusCode::INTERNAL_SERVER_ERROR,
192 format!("failed to serialize session: {e}"),
193 );
194 }
195 };
196
197 let export_json = format!(
198 r#"{{"version":1,"session":{},"frames":[{}]}}"#,
199 session_json,
200 frames_json.join(",")
201 );
202
203 let filename = format!(
204 "recording-{}.json",
205 session_info.session_id.as_str().replace(':', "_")
206 );
207 let mut response = (
208 StatusCode::OK,
209 [(header::CONTENT_TYPE, "application/json; charset=utf-8")],
210 export_json,
211 )
212 .into_response();
213 if let Ok(value) =
214 header::HeaderValue::from_str(&format!("attachment; filename=\"{filename}\""))
215 {
216 response
217 .headers_mut()
218 .insert(header::CONTENT_DISPOSITION, value);
219 }
220 response
221}
222
223pub async fn api_record_import(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
224 let import: RecordingImportBody = match facet_json::from_slice(&body) {
225 Ok(v) => v,
226 Err(e) => return json_error(StatusCode::BAD_REQUEST, format!("invalid import json: {e}")),
227 };
228
229 if import.version != 1 {
230 return json_error(
231 StatusCode::BAD_REQUEST,
232 format!("unsupported export version: {}", import.version),
233 );
234 }
235
236 let frames = match build_imported_frames(&import) {
237 Ok(frames) => frames,
238 Err(error) => return json_error(StatusCode::BAD_REQUEST, error),
239 };
240
241 let approx_memory_bytes: u64 = frames.iter().map(|f| f.json.len() as u64).sum();
242 let total_frames_captured = frames.len() as u32;
243
244 let existing_stop_signal = {
245 let mut guard = state.inner.lock().await;
246 let existing_stop_signal = guard
247 .recording
248 .as_ref()
249 .filter(|r| r.stopped_at_unix_ms.is_none())
250 .map(|r| r.stop_signal.clone());
251
252 guard.recording = Some(RecordingState {
253 session_id: import.session.session_id.clone(),
254 interval_ms: import.session.interval_ms,
255 started_at_unix_ms: import.session.started_at_unix_ms,
256 stopped_at_unix_ms: Some(import.session.stopped_at_unix_ms.unwrap_or_else(now_ms)),
257 frames,
258 max_frames: import.session.max_frames,
259 max_memory_bytes: import.session.max_memory_bytes,
260 overflowed: import.session.overflowed,
261 total_frames_captured,
262 approx_memory_bytes,
263 total_capture_ms: import.session.total_capture_ms,
264 max_capture_ms: import.session.max_capture_ms,
265 stop_signal: Arc::new(Notify::new()),
266 });
267
268 existing_stop_signal
269 };
270
271 if let Some(sig) = existing_stop_signal {
272 sig.notify_one();
273 }
274
275 let guard = state.inner.lock().await;
276 let rec = guard.recording.as_ref().unwrap();
277 json_ok(&RecordCurrentResponse {
278 session: Some(recording_session_info(rec)),
279 })
280}