Skip to main content

moire_web/api/
recording.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use axum::body::Bytes;
5use axum::extract::{Path as AxumPath, State};
6use axum::http::{StatusCode, header};
7use axum::response::IntoResponse;
8use moire_types::{RecordCurrentResponse, RecordStartRequest, RecordingImportBody};
9use tokio::sync::Notify;
10use tracing::warn;
11
12use crate::api::snapshot::take_snapshot_internal;
13use crate::app::AppState;
14use crate::recording::session::{
15    RecordingState, build_imported_frames, export_frame_rows, frame_json_by_index, push_frame,
16    recording_session_info,
17};
18use crate::util::http::{json_error, json_ok};
19use crate::util::time::now_ms;
20
21pub async fn api_record_start(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
22    let req: RecordStartRequest = if body.is_empty() {
23        RecordStartRequest {
24            interval_ms: None,
25            max_frames: None,
26            max_memory_bytes: None,
27        }
28    } else {
29        match facet_json::from_slice(&body) {
30            Ok(req) => req,
31            Err(e) => {
32                return json_error(
33                    StatusCode::BAD_REQUEST,
34                    format!("invalid request json: {e}"),
35                );
36            }
37        }
38    };
39
40    let (session_id, stop_signal) = {
41        let mut guard = state.inner.lock().await;
42        if guard
43            .recording
44            .as_ref()
45            .is_some_and(|r| r.stopped_at_unix_ms.is_none())
46        {
47            return json_error(StatusCode::CONFLICT, "recording already in progress");
48        }
49
50        let session_num = guard.next_session_id;
51        guard.next_session_id = guard.next_session_id.next();
52        let session_id = session_num.to_session_id();
53        let interval_ms = req.interval_ms.unwrap_or(500);
54        let max_frames = req.max_frames.unwrap_or(1000);
55        let max_memory_bytes = req.max_memory_bytes.unwrap_or(256 * 1024 * 1024);
56        let stop_signal = Arc::new(Notify::new());
57
58        guard.recording = Some(RecordingState {
59            session_id: session_id.clone(),
60            interval_ms,
61            started_at_unix_ms: now_ms(),
62            stopped_at_unix_ms: None,
63            frames: Vec::new(),
64            max_frames,
65            max_memory_bytes,
66            overflowed: false,
67            total_frames_captured: 0,
68            approx_memory_bytes: 0,
69            total_capture_ms: 0.0,
70            max_capture_ms: 0.0,
71            stop_signal: stop_signal.clone(),
72        });
73
74        (session_id, stop_signal)
75    };
76
77    let loop_state = state.clone();
78    let loop_session_id = session_id.clone();
79    tokio::spawn(async move {
80        let interval_ms = {
81            let guard = loop_state.inner.lock().await;
82            guard.recording.as_ref().map_or(500, |r| r.interval_ms)
83        };
84        loop {
85            tokio::select! {
86                _ = stop_signal.notified() => break,
87                _ = tokio::time::sleep(Duration::from_millis(interval_ms as u64)) => {
88                    let capture_start = Instant::now();
89                    let snapshot = take_snapshot_internal(&loop_state).await;
90                    let json = match facet_json::to_string(&snapshot) {
91                        Ok(json) => json,
92                        Err(e) => {
93                            warn!(%e, "failed to serialize recording frame");
94                            continue;
95                        }
96                    };
97                    let capture_duration_ms = capture_start.elapsed().as_secs_f64() * 1000.0;
98                    let process_count = snapshot.processes.len() as u32;
99                    let captured_at_unix_ms = snapshot.captured_at_unix_ms;
100                    let mut guard = loop_state.inner.lock().await;
101                    let Some(recording) = &mut guard.recording else { break };
102                    if recording.session_id != loop_session_id || recording.stopped_at_unix_ms.is_some() {
103                        break;
104                    }
105                    push_frame(
106                        recording,
107                        captured_at_unix_ms,
108                        process_count,
109                        capture_duration_ms,
110                        json,
111                    );
112                }
113            }
114        }
115    });
116
117    let guard = state.inner.lock().await;
118    let rec = guard.recording.as_ref().unwrap();
119    json_ok(&RecordCurrentResponse {
120        session: Some(recording_session_info(rec)),
121    })
122}
123
124pub async fn api_record_stop(State(state): State<AppState>) -> impl IntoResponse {
125    let stop_signal = {
126        let mut guard = state.inner.lock().await;
127        match &mut guard.recording {
128            None => return json_error(StatusCode::NOT_FOUND, "no recording in progress"),
129            Some(rec) if rec.stopped_at_unix_ms.is_some() => {
130                return json_error(StatusCode::NOT_FOUND, "no recording in progress");
131            }
132            Some(rec) => {
133                rec.stopped_at_unix_ms = Some(now_ms());
134                rec.stop_signal.clone()
135            }
136        }
137    };
138
139    stop_signal.notify_one();
140
141    let guard = state.inner.lock().await;
142    let rec = guard.recording.as_ref().unwrap();
143    json_ok(&RecordCurrentResponse {
144        session: Some(recording_session_info(rec)),
145    })
146}
147
148pub async fn api_record_current(State(state): State<AppState>) -> impl IntoResponse {
149    let guard = state.inner.lock().await;
150    let session = guard.recording.as_ref().map(recording_session_info);
151    json_ok(&RecordCurrentResponse { session })
152}
153
154pub async fn api_record_frame(
155    State(state): State<AppState>,
156    AxumPath(frame_index): AxumPath<u32>,
157) -> impl IntoResponse {
158    let guard = state.inner.lock().await;
159    let Some(recording) = &guard.recording else {
160        return json_error(StatusCode::NOT_FOUND, "no recording");
161    };
162    let Some(frame_json) = frame_json_by_index(recording, frame_index) else {
163        return json_error(StatusCode::NOT_FOUND, "frame not found");
164    };
165    (
166        StatusCode::OK,
167        [(header::CONTENT_TYPE, "application/json; charset=utf-8")],
168        frame_json.to_string(),
169    )
170        .into_response()
171}
172
173pub async fn api_record_export(State(state): State<AppState>) -> impl IntoResponse {
174    let (session_info, frames_json) = {
175        let guard = state.inner.lock().await;
176        let Some(recording) = &guard.recording else {
177            return json_error(StatusCode::NOT_FOUND, "no recording");
178        };
179        if recording.stopped_at_unix_ms.is_none() {
180            return json_error(StatusCode::CONFLICT, "recording is still in progress");
181        }
182        let session_info = recording_session_info(recording);
183        let frames_json = export_frame_rows(&recording.frames);
184        (session_info, frames_json)
185    };
186
187    let session_json = match facet_json::to_string(&session_info) {
188        Ok(s) => s,
189        Err(e) => {
190            return json_error(
191                StatusCode::INTERNAL_SERVER_ERROR,
192                format!("failed to serialize session: {e}"),
193            );
194        }
195    };
196
197    let export_json = format!(
198        r#"{{"version":1,"session":{},"frames":[{}]}}"#,
199        session_json,
200        frames_json.join(",")
201    );
202
203    let filename = format!(
204        "recording-{}.json",
205        session_info.session_id.as_str().replace(':', "_")
206    );
207    let mut response = (
208        StatusCode::OK,
209        [(header::CONTENT_TYPE, "application/json; charset=utf-8")],
210        export_json,
211    )
212        .into_response();
213    if let Ok(value) =
214        header::HeaderValue::from_str(&format!("attachment; filename=\"{filename}\""))
215    {
216        response
217            .headers_mut()
218            .insert(header::CONTENT_DISPOSITION, value);
219    }
220    response
221}
222
223pub async fn api_record_import(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
224    let import: RecordingImportBody = match facet_json::from_slice(&body) {
225        Ok(v) => v,
226        Err(e) => return json_error(StatusCode::BAD_REQUEST, format!("invalid import json: {e}")),
227    };
228
229    if import.version != 1 {
230        return json_error(
231            StatusCode::BAD_REQUEST,
232            format!("unsupported export version: {}", import.version),
233        );
234    }
235
236    let frames = match build_imported_frames(&import) {
237        Ok(frames) => frames,
238        Err(error) => return json_error(StatusCode::BAD_REQUEST, error),
239    };
240
241    let approx_memory_bytes: u64 = frames.iter().map(|f| f.json.len() as u64).sum();
242    let total_frames_captured = frames.len() as u32;
243
244    let existing_stop_signal = {
245        let mut guard = state.inner.lock().await;
246        let existing_stop_signal = guard
247            .recording
248            .as_ref()
249            .filter(|r| r.stopped_at_unix_ms.is_none())
250            .map(|r| r.stop_signal.clone());
251
252        guard.recording = Some(RecordingState {
253            session_id: import.session.session_id.clone(),
254            interval_ms: import.session.interval_ms,
255            started_at_unix_ms: import.session.started_at_unix_ms,
256            stopped_at_unix_ms: Some(import.session.stopped_at_unix_ms.unwrap_or_else(now_ms)),
257            frames,
258            max_frames: import.session.max_frames,
259            max_memory_bytes: import.session.max_memory_bytes,
260            overflowed: import.session.overflowed,
261            total_frames_captured,
262            approx_memory_bytes,
263            total_capture_ms: import.session.total_capture_ms,
264            max_capture_ms: import.session.max_capture_ms,
265            stop_signal: Arc::new(Notify::new()),
266        });
267
268        existing_stop_signal
269    };
270
271    if let Some(sig) = existing_stop_signal {
272        sig.notify_one();
273    }
274
275    let guard = state.inner.lock().await;
276    let rec = guard.recording.as_ref().unwrap();
277    json_ok(&RecordCurrentResponse {
278        session: Some(recording_session_info(rec)),
279    })
280}