viewpoint_core/context/trace/
mod.rs1mod capture;
10mod network;
11mod sources;
12mod types;
13mod writer;
14
15use std::sync::Arc;
16
17use chrono::Utc;
18use tokio::sync::RwLock;
19use tracing::{debug, info, instrument};
20
21use viewpoint_cdp::protocol::tracing as cdp_tracing;
22use viewpoint_cdp::CdpConnection;
23
24use crate::context::PageInfo;
25use crate::error::ContextError;
26use crate::network::har::HarPage;
27
28pub use types::{ActionEntry, TracingOptions};
29use types::{SourceFileEntry, TracingState};
30
31pub struct Tracing {
53 connection: Arc<CdpConnection>,
55 context_id: String,
57 pages: Arc<RwLock<Vec<PageInfo>>>,
59 state: Arc<RwLock<TracingState>>,
61}
62
63impl std::fmt::Debug for Tracing {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("Tracing")
66 .field("context_id", &self.context_id)
67 .finish_non_exhaustive()
68 }
69}
70
71impl Tracing {
72 pub(crate) fn new(
74 connection: Arc<CdpConnection>,
75 context_id: String,
76 pages: Arc<RwLock<Vec<PageInfo>>>,
77 ) -> Self {
78 Self {
79 connection,
80 context_id,
81 pages,
82 state: Arc::new(RwLock::new(TracingState::default())),
83 }
84 }
85
86 async fn get_session_ids(&self) -> Vec<String> {
88 let pages = self.pages.read().await;
89 pages
90 .iter()
91 .filter(|p| !p.session_id.is_empty())
92 .map(|p| p.session_id.clone())
93 .collect()
94 }
95
96 #[instrument(level = "info", skip(self, options))]
112 pub async fn start(&self, options: TracingOptions) -> Result<(), ContextError> {
113 let mut state = self.state.write().await;
114
115 if state.is_recording {
116 return Err(ContextError::Internal(
117 "Tracing is already active".to_string(),
118 ));
119 }
120
121 info!(
122 screenshots = options.screenshots,
123 snapshots = options.snapshots,
124 "Starting trace"
125 );
126
127 let categories = [
129 "devtools.timeline",
130 "disabled-by-default-devtools.timeline",
131 "disabled-by-default-devtools.timeline.frame",
132 ];
133
134 for session_id in self.get_session_ids().await {
136 let params = cdp_tracing::StartParams {
137 categories: Some(categories.join(",")),
138 transfer_mode: Some(cdp_tracing::TransferMode::ReturnAsStream),
139 ..Default::default()
140 };
141
142 self.connection
143 .send_command::<_, serde_json::Value>(
144 "Tracing.start",
145 Some(params),
146 Some(&session_id),
147 )
148 .await?;
149
150 self.connection
152 .send_command::<_, serde_json::Value>(
153 "Network.enable",
154 Some(serde_json::json!({})),
155 Some(&session_id),
156 )
157 .await?;
158 }
159
160 state.is_recording = true;
162 state.options = options;
163 state.actions.clear();
164 state.events.clear();
165 state.screenshots.clear();
166 state.snapshots.clear();
167 state.pending_requests.clear();
168 state.network_entries.clear();
169 state.har_pages.clear();
170 state.source_files.clear();
171
172 drop(state); network::start_network_listener(
175 self.connection.clone(),
176 self.state.clone(),
177 self.pages.clone(),
178 );
179
180 Ok(())
181 }
182
183 #[instrument(level = "info", skip(self), fields(path = %path.as_ref().display()))]
200 pub async fn stop(&self, path: impl AsRef<std::path::Path>) -> Result<(), ContextError> {
201 let path = path.as_ref();
202 let mut state = self.state.write().await;
203
204 if !state.is_recording {
205 return Err(ContextError::Internal("Tracing is not active".to_string()));
206 }
207
208 info!("Stopping trace and saving");
209
210 for session_id in self.get_session_ids().await {
212 let _ = self
213 .connection
214 .send_command::<_, serde_json::Value>("Tracing.end", None::<()>, Some(&session_id))
215 .await;
216 }
217
218 state.is_recording = false;
219
220 writer::write_trace_zip(path, &state)?;
222
223 Ok(())
224 }
225
226 #[instrument(level = "info", skip(self))]
234 pub async fn stop_discard(&self) -> Result<(), ContextError> {
235 let mut state = self.state.write().await;
236
237 if !state.is_recording {
238 return Err(ContextError::Internal("Tracing is not active".to_string()));
239 }
240
241 info!("Stopping trace and discarding");
242
243 for session_id in self.get_session_ids().await {
245 let _ = self
246 .connection
247 .send_command::<_, serde_json::Value>("Tracing.end", None::<()>, Some(&session_id))
248 .await;
249 }
250
251 state.is_recording = false;
253 state.actions.clear();
254 state.events.clear();
255 state.screenshots.clear();
256 state.snapshots.clear();
257 state.pending_requests.clear();
258 state.network_entries.clear();
259 state.har_pages.clear();
260 state.source_files.clear();
261
262 Ok(())
263 }
264
265 #[instrument(level = "debug", skip(self))]
274 pub async fn start_chunk(&self) -> Result<(), ContextError> {
275 let state = self.state.read().await;
276
277 if !state.is_recording {
278 return Err(ContextError::Internal("Tracing is not active".to_string()));
279 }
280
281 debug!("Starting new trace chunk");
282
283 Ok(())
287 }
288
289 #[instrument(level = "debug", skip(self), fields(path = %path.as_ref().display()))]
295 pub async fn stop_chunk(&self, path: impl AsRef<std::path::Path>) -> Result<(), ContextError> {
296 let path = path.as_ref();
297 let state = self.state.read().await;
298
299 if !state.is_recording {
300 return Err(ContextError::Internal("Tracing is not active".to_string()));
301 }
302
303 debug!("Stopping trace chunk and saving");
304
305 writer::write_trace_zip(path, &state)?;
307
308 Ok(())
312 }
313
314 pub async fn is_recording(&self) -> bool {
316 self.state.read().await.is_recording
317 }
318
319 pub async fn add_source_file(&self, path: impl Into<String>, content: impl Into<String>) {
332 let mut state = self.state.write().await;
333 state.source_files.push(SourceFileEntry {
334 path: path.into(),
335 content: content.into(),
336 });
337 }
338
339 pub async fn collect_sources(
352 &self,
353 dir: impl AsRef<std::path::Path>,
354 extensions: &[&str],
355 ) -> Result<(), ContextError> {
356 let files = sources::collect_sources_from_dir(dir.as_ref(), extensions)?;
357
358 let mut state = self.state.write().await;
359 for (path, content) in files {
360 state.source_files.push(SourceFileEntry { path, content });
361 }
362
363 Ok(())
364 }
365
366 pub(crate) async fn record_action(
370 &self,
371 action_type: &str,
372 selector: Option<&str>,
373 page_id: Option<&str>,
374 ) -> ActionHandle<'_> {
375 let start_time = std::time::SystemTime::now()
376 .duration_since(std::time::UNIX_EPOCH)
377 .unwrap_or_default()
378 .as_secs_f64()
379 * 1000.0;
380
381 let action = ActionEntry {
382 action_type: action_type.to_string(),
383 selector: selector.map(ToString::to_string),
384 page_id: page_id.map(ToString::to_string),
385 start_time,
386 end_time: None,
387 result: None,
388 value: None,
389 url: None,
390 screenshot: None,
391 snapshot: None,
392 };
393
394 let mut state = self.state.write().await;
395 let index = state.actions.len();
396 state.actions.push(action);
397
398 ActionHandle {
399 tracing: self,
400 index,
401 }
402 }
403
404 pub(crate) async fn record_page(&self, page_id: &str, title: &str) {
406 let mut state = self.state.write().await;
407 let started_date_time = Utc::now().to_rfc3339();
408 let page = HarPage::new(page_id, title, &started_date_time);
409 state.har_pages.push(page);
410 state.current_page_id = Some(page_id.to_string());
411 }
412
413 pub(crate) async fn capture_screenshot(
415 &self,
416 session_id: &str,
417 name: Option<&str>,
418 ) -> Result<(), ContextError> {
419 capture::capture_screenshot(&self.connection, &self.state, session_id, name).await
420 }
421
422 pub(crate) async fn capture_dom_snapshot(
424 &self,
425 session_id: &str,
426 ) -> Result<(), ContextError> {
427 capture::capture_dom_snapshot(&self.connection, &self.state, session_id).await
428 }
429
430 pub(crate) async fn capture_action_context(
432 &self,
433 session_id: &str,
434 action_name: Option<&str>,
435 ) -> Result<(), ContextError> {
436 capture::capture_action_context(&self.connection, &self.state, session_id, action_name).await
437 }
438}
439
440pub struct ActionHandle<'a> {
442 tracing: &'a Tracing,
443 index: usize,
444}
445
446impl ActionHandle<'_> {
447 pub async fn complete(self, result: Option<serde_json::Value>) {
449 let end_time = std::time::SystemTime::now()
450 .duration_since(std::time::UNIX_EPOCH)
451 .unwrap_or_default()
452 .as_secs_f64()
453 * 1000.0;
454
455 let mut state = self.tracing.state.write().await;
456 if let Some(action) = state.actions.get_mut(self.index) {
457 action.end_time = Some(end_time);
458 action.result = result;
459 }
460 }
461
462 pub async fn fail(self, error: &str) {
464 let end_time = std::time::SystemTime::now()
465 .duration_since(std::time::UNIX_EPOCH)
466 .unwrap_or_default()
467 .as_secs_f64()
468 * 1000.0;
469
470 let mut state = self.tracing.state.write().await;
471 if let Some(action) = state.actions.get_mut(self.index) {
472 action.end_time = Some(end_time);
473 action.result = Some(serde_json::json!({ "error": error }));
474 }
475 }
476}