1use std::io::Write;
10use std::path::PathBuf;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15
16use super::{Reporter, ReporterEvent, StepFinishedEvent, StepStartedEvent};
17use crate::model::{StepCategory, TestId, TestOutcome, TestStatus};
18
/// Schema version stamped into the `Header` line that `BlobReporter` writes at the start of
/// every blob.
const SCHEMA_VERSION: u32 = 1;
20
/// One record of the `events.jsonl` stream stored inside a blob archive.
///
/// Each value is serialized as a single JSON object. The variant name, kebab-cased, is stored
/// under the `kind` key (internally tagged representation); the variant's fields sit next to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum WireEvent {
  /// Blob preamble carrying the schema version and optional shard coordinates.
  /// It has no runtime counterpart: `into_runtime` drops it.
  Header {
    schema: u32,
    shard_index: Option<u32>,
    shard_total: Option<u32>,
  },
  /// Wire form of `ReporterEvent::RunStarted`.
  RunStarted {
    total_tests: usize,
    num_workers: u32,
    metadata: serde_json::Value,
  },
  /// Wire form of `ReporterEvent::WorkerStarted`.
  WorkerStarted {
    worker_id: u32,
  },
  /// Wire form of `ReporterEvent::TestStarted`.
  TestStarted {
    test_id: WireTestId,
    attempt: u32,
  },
  /// Wire form of `ReporterEvent::StepStarted`.
  StepStarted {
    test_id: WireTestId,
    step_id: String,
    parent_step_id: Option<String>,
    title: String,
    /// Step category label as produced by `step_category_str`.
    category: String,
  },
  /// Wire form of `ReporterEvent::StepFinished`.
  StepFinished {
    test_id: WireTestId,
    step_id: String,
    title: String,
    /// Step category label as produced by `step_category_str`.
    category: String,
    /// Step duration in whole milliseconds.
    duration_ms: u64,
    error: Option<String>,
    metadata: Option<serde_json::Value>,
  },
  /// Wire form of `ReporterEvent::TestFinished`. Only a subset of the outcome is carried.
  TestFinished {
    test_id: WireTestId,
    /// Status label as produced by `status_str`.
    status: String,
    /// Test duration in whole milliseconds.
    duration_ms: u64,
    attempt: u32,
    /// Failure message only; other failure details are not serialized.
    error: Option<String>,
  },
  /// Wire form of `ReporterEvent::WorkerFinished`.
  WorkerFinished {
    worker_id: u32,
  },
  /// Wire form of `ReporterEvent::RunFinished`.
  RunFinished {
    total: usize,
    passed: usize,
    failed: usize,
    skipped: usize,
    flaky: usize,
    /// Run duration in whole milliseconds.
    duration_ms: u64,
  },
}
79
/// Serializable mirror of `TestId`: the same four fields, convertible in both directions via
/// the `From` impls in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WireTestId {
  pub file: String,
  pub suite: Option<String>,
  pub name: String,
  pub line: Option<usize>,
}
87
88impl From<&TestId> for WireTestId {
89 fn from(id: &TestId) -> Self {
90 Self {
91 file: id.file.clone(),
92 suite: id.suite.clone(),
93 name: id.name.clone(),
94 line: id.line,
95 }
96 }
97}
98
99impl From<WireTestId> for TestId {
100 fn from(w: WireTestId) -> Self {
101 Self {
102 file: w.file,
103 suite: w.suite,
104 name: w.name,
105 line: w.line,
106 }
107 }
108}
109
110fn step_category_str(c: StepCategory) -> &'static str {
111 match c {
112 StepCategory::TestStep => "test-step",
113 StepCategory::Expect => "expect",
114 StepCategory::Fixture => "fixture",
115 StepCategory::Hook => "hook",
116 StepCategory::PwApi => "pw-api",
117 }
118}
119
120fn parse_step_category(s: &str) -> StepCategory {
121 match s {
122 "expect" => StepCategory::Expect,
123 "fixture" => StepCategory::Fixture,
124 "hook" => StepCategory::Hook,
125 "pw-api" => StepCategory::PwApi,
126 _ => StepCategory::TestStep,
127 }
128}
129
130fn status_str(s: TestStatus) -> &'static str {
131 match s {
132 TestStatus::Passed => "passed",
133 TestStatus::Failed => "failed",
134 TestStatus::TimedOut => "timed-out",
135 TestStatus::Skipped => "skipped",
136 TestStatus::Flaky => "flaky",
137 TestStatus::Interrupted => "interrupted",
138 }
139}
140
141fn parse_status(s: &str) -> TestStatus {
142 match s {
143 "failed" => TestStatus::Failed,
144 "timed-out" => TestStatus::TimedOut,
145 "skipped" => TestStatus::Skipped,
146 "flaky" => TestStatus::Flaky,
147 "interrupted" => TestStatus::Interrupted,
148 _ => TestStatus::Passed,
149 }
150}
151
152impl WireEvent {
153 pub fn from_runtime(event: &ReporterEvent) -> Option<Self> {
154 Some(match event {
155 ReporterEvent::RunStarted {
156 total_tests,
157 num_workers,
158 metadata,
159 } => Self::RunStarted {
160 total_tests: *total_tests,
161 num_workers: *num_workers,
162 metadata: metadata.clone(),
163 },
164 ReporterEvent::WorkerStarted { worker_id } => Self::WorkerStarted { worker_id: *worker_id },
165 ReporterEvent::TestStarted { test_id, attempt } => Self::TestStarted {
166 test_id: test_id.into(),
167 attempt: *attempt,
168 },
169 ReporterEvent::StepStarted(s) => Self::StepStarted {
170 test_id: (&s.test_id).into(),
171 step_id: s.step_id.clone(),
172 parent_step_id: s.parent_step_id.clone(),
173 title: s.title.clone(),
174 category: step_category_str(s.category.clone()).to_string(),
175 },
176 ReporterEvent::StepFinished(s) => Self::StepFinished {
177 test_id: (&s.test_id).into(),
178 step_id: s.step_id.clone(),
179 title: s.title.clone(),
180 category: step_category_str(s.category.clone()).to_string(),
181 duration_ms: s.duration.as_millis() as u64,
182 error: s.error.clone(),
183 metadata: s.metadata.clone(),
184 },
185 ReporterEvent::TestFinished { test_id, outcome } => Self::TestFinished {
186 test_id: test_id.into(),
187 status: status_str(outcome.status.clone()).to_string(),
188 duration_ms: outcome.duration.as_millis() as u64,
189 attempt: outcome.attempt,
190 error: outcome.error.as_ref().map(|e| e.message.clone()),
191 },
192 ReporterEvent::WorkerFinished { worker_id } => Self::WorkerFinished { worker_id: *worker_id },
193 ReporterEvent::RunFinished {
194 total,
195 passed,
196 failed,
197 skipped,
198 flaky,
199 duration,
200 } => Self::RunFinished {
201 total: *total,
202 passed: *passed,
203 failed: *failed,
204 skipped: *skipped,
205 flaky: *flaky,
206 duration_ms: duration.as_millis() as u64,
207 },
208 })
209 }
210
211 pub fn into_runtime(self) -> Option<ReporterEvent> {
214 Some(match self {
215 Self::Header { .. } => return None,
216 Self::RunStarted {
217 total_tests,
218 num_workers,
219 metadata,
220 } => ReporterEvent::RunStarted {
221 total_tests,
222 num_workers,
223 metadata,
224 },
225 Self::WorkerStarted { worker_id } => ReporterEvent::WorkerStarted { worker_id },
226 Self::TestStarted { test_id, attempt } => ReporterEvent::TestStarted {
227 test_id: test_id.into(),
228 attempt,
229 },
230 Self::StepStarted {
231 test_id,
232 step_id,
233 parent_step_id,
234 title,
235 category,
236 } => ReporterEvent::StepStarted(Box::new(StepStartedEvent {
237 test_id: test_id.into(),
238 step_id,
239 parent_step_id,
240 title,
241 category: parse_step_category(&category),
242 })),
243 Self::StepFinished {
244 test_id,
245 step_id,
246 title,
247 category,
248 duration_ms,
249 error,
250 metadata,
251 } => ReporterEvent::StepFinished(Box::new(StepFinishedEvent {
252 test_id: test_id.into(),
253 step_id,
254 title,
255 category: parse_step_category(&category),
256 duration: Duration::from_millis(duration_ms),
257 error,
258 metadata,
259 })),
260 Self::TestFinished {
261 test_id,
262 status,
263 duration_ms,
264 attempt,
265 error,
266 } => {
267 let status = parse_status(&status);
268 let id: TestId = test_id.into();
269 ReporterEvent::TestFinished {
270 test_id: id.clone(),
271 outcome: TestOutcome {
272 test_id: id,
273 status,
274 duration: Duration::from_millis(duration_ms),
275 attempt,
276 max_attempts: 1,
277 error: error.map(|message| crate::model::TestFailure {
278 message,
279 stack: None,
280 diff: None,
281 screenshot: None,
282 }),
283 attachments: Vec::new(),
284 steps: Vec::new(),
285 stdout: String::new(),
286 stderr: String::new(),
287 annotations: Vec::new(),
288 metadata: serde_json::Value::Null,
289 },
290 }
291 },
292 Self::WorkerFinished { worker_id } => ReporterEvent::WorkerFinished { worker_id },
293 Self::RunFinished {
294 total,
295 passed,
296 failed,
297 skipped,
298 flaky,
299 duration_ms,
300 } => ReporterEvent::RunFinished {
301 total,
302 passed,
303 failed,
304 skipped,
305 flaky,
306 duration: Duration::from_millis(duration_ms),
307 },
308 })
309 }
310}
311
/// Reporter that buffers every event as a JSON line and, on `finalize`, writes the buffer as
/// `events.jsonl` inside a zip archive.
pub struct BlobReporter {
  /// Path of the zip archive created by `finalize`.
  out_path: PathBuf,
  /// Accumulated JSONL bytes: the header line followed by one line per recorded event.
  buffer: Vec<u8>,
  /// Shard index recorded in the header, if set through `with_shard`.
  shard_index: Option<u32>,
  /// Shard count recorded in the header, if set through `with_shard`.
  shard_total: Option<u32>,
}
322
323impl BlobReporter {
324 #[must_use]
328 pub fn new(out_path: PathBuf) -> Self {
329 let mut buffer = Vec::new();
330 write_event(
331 &mut buffer,
332 &WireEvent::Header {
333 schema: SCHEMA_VERSION,
334 shard_index: None,
335 shard_total: None,
336 },
337 );
338 Self {
339 out_path,
340 buffer,
341 shard_index: None,
342 shard_total: None,
343 }
344 }
345
346 pub fn with_shard(mut self, current: u32, total: u32) -> Self {
347 self.shard_index = Some(current);
348 self.shard_total = Some(total);
349 self.buffer.clear();
351 write_event(
352 &mut self.buffer,
353 &WireEvent::Header {
354 schema: SCHEMA_VERSION,
355 shard_index: self.shard_index,
356 shard_total: self.shard_total,
357 },
358 );
359 self
360 }
361}
362
363#[async_trait]
364impl Reporter for BlobReporter {
365 async fn on_event(&mut self, event: &ReporterEvent) {
366 if let Some(wire) = WireEvent::from_runtime(event) {
367 write_event(&mut self.buffer, &wire);
368 }
369 }
370
371 async fn finalize(&mut self) -> ferridriver::error::Result<()> {
372 use ferridriver::FerriError;
373 if let Some(parent) = self.out_path.parent() {
374 std::fs::create_dir_all(parent)?;
375 }
376 let file = std::fs::File::create(&self.out_path)?;
377 let mut zip = zip::ZipWriter::new(file);
378 let opts: zip::write::SimpleFileOptions =
379 zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
380 zip
381 .start_file("events.jsonl", opts)
382 .map_err(|e| FerriError::backend(format!("zip start_file: {e}")))?;
383 zip
384 .write_all(&self.buffer)
385 .map_err(|e| FerriError::backend(format!("zip write: {e}")))?;
386 zip
387 .finish()
388 .map_err(|e| FerriError::backend(format!("zip finish: {e}")))?;
389 Ok(())
390 }
391}
392
393fn write_event(buffer: &mut Vec<u8>, event: &WireEvent) {
394 if let Ok(line) = serde_json::to_string(event) {
395 buffer.extend_from_slice(line.as_bytes());
396 buffer.push(b'\n');
397 }
398}
399
400pub fn read_blob_dir(dir: &std::path::Path) -> Result<Vec<ReporterEvent>, String> {
407 let mut events = Vec::new();
408 let entries = std::fs::read_dir(dir).map_err(|e| format!("read_dir {}: {e}", dir.display()))?;
409 let mut zips: Vec<PathBuf> = Vec::new();
410 for entry in entries {
411 let entry = entry.map_err(|e| format!("dir entry: {e}"))?;
412 let path = entry.path();
413 if path.extension().and_then(|s| s.to_str()) == Some("zip") {
414 zips.push(path);
415 }
416 }
417 zips.sort();
418 for path in zips {
419 let file = std::fs::File::open(&path).map_err(|e| format!("open {}: {e}", path.display()))?;
420 let mut zip = zip::ZipArchive::new(file).map_err(|e| format!("zip read {}: {e}", path.display()))?;
421 let mut events_file = zip
422 .by_name("events.jsonl")
423 .map_err(|e| format!("missing events.jsonl in {}: {e}", path.display()))?;
424 let mut buf = String::new();
425 use std::io::Read;
426 events_file
427 .read_to_string(&mut buf)
428 .map_err(|e| format!("read jsonl: {e}"))?;
429 for (i, line) in buf.lines().enumerate() {
430 if line.trim().is_empty() {
431 continue;
432 }
433 let wire: WireEvent =
434 serde_json::from_str(line).map_err(|e| format!("parse line {i} in {}: {e}", path.display()))?;
435 if let Some(event) = wire.into_runtime() {
436 events.push(event);
437 }
438 }
439 }
440 Ok(events)
441}