Skip to main content

ferridriver_test/reporter/
blob.rs

//! `blob` reporter — emits a `report.zip` containing every
//! `ReporterEvent` as a JSON-lines stream. Mirrors Playwright's
//! `/tmp/playwright/packages/playwright/src/reporters/blob.ts`.
//!
//! The merge subcommand (`ferridriver-test merge-reports <dir>`)
//! reads every blob in a directory, replays the merged event stream
//! through the configured reporter, and produces a unified report.
8
9use std::io::Write;
10use std::path::PathBuf;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15
16use super::{Reporter, ReporterEvent, StepFinishedEvent, StepStartedEvent};
17use crate::model::{StepCategory, TestId, TestOutcome, TestStatus};
18
/// Schema version stamped into the `Header` frame at the start of every blob.
const SCHEMA_VERSION: u32 = 1;
20
21/// Wire-format mirror of `ReporterEvent`. Distinct from the runtime
22/// enum so adding a new event variant doesn't break stored blobs and
23/// vice-versa — the Wire shape is the contract.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "kebab-case")]
26pub enum WireEvent {
27  Header {
28    schema: u32,
29    shard_index: Option<u32>,
30    shard_total: Option<u32>,
31  },
32  RunStarted {
33    total_tests: usize,
34    num_workers: u32,
35    metadata: serde_json::Value,
36  },
37  WorkerStarted {
38    worker_id: u32,
39  },
40  TestStarted {
41    test_id: WireTestId,
42    attempt: u32,
43  },
44  StepStarted {
45    test_id: WireTestId,
46    step_id: String,
47    parent_step_id: Option<String>,
48    title: String,
49    category: String,
50  },
51  StepFinished {
52    test_id: WireTestId,
53    step_id: String,
54    title: String,
55    category: String,
56    duration_ms: u64,
57    error: Option<String>,
58    metadata: Option<serde_json::Value>,
59  },
60  TestFinished {
61    test_id: WireTestId,
62    status: String,
63    duration_ms: u64,
64    attempt: u32,
65    error: Option<String>,
66  },
67  WorkerFinished {
68    worker_id: u32,
69  },
70  RunFinished {
71    total: usize,
72    passed: usize,
73    failed: usize,
74    skipped: usize,
75    flaky: usize,
76    duration_ms: u64,
77  },
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct WireTestId {
82  pub file: String,
83  pub suite: Option<String>,
84  pub name: String,
85  pub line: Option<usize>,
86}
87
88impl From<&TestId> for WireTestId {
89  fn from(id: &TestId) -> Self {
90    Self {
91      file: id.file.clone(),
92      suite: id.suite.clone(),
93      name: id.name.clone(),
94      line: id.line,
95    }
96  }
97}
98
99impl From<WireTestId> for TestId {
100  fn from(w: WireTestId) -> Self {
101    Self {
102      file: w.file,
103      suite: w.suite,
104      name: w.name,
105      line: w.line,
106    }
107  }
108}
109
110fn step_category_str(c: StepCategory) -> &'static str {
111  match c {
112    StepCategory::TestStep => "test-step",
113    StepCategory::Expect => "expect",
114    StepCategory::Fixture => "fixture",
115    StepCategory::Hook => "hook",
116    StepCategory::PwApi => "pw-api",
117  }
118}
119
120fn parse_step_category(s: &str) -> StepCategory {
121  match s {
122    "expect" => StepCategory::Expect,
123    "fixture" => StepCategory::Fixture,
124    "hook" => StepCategory::Hook,
125    "pw-api" => StepCategory::PwApi,
126    _ => StepCategory::TestStep,
127  }
128}
129
130fn status_str(s: TestStatus) -> &'static str {
131  match s {
132    TestStatus::Passed => "passed",
133    TestStatus::Failed => "failed",
134    TestStatus::TimedOut => "timed-out",
135    TestStatus::Skipped => "skipped",
136    TestStatus::Flaky => "flaky",
137    TestStatus::Interrupted => "interrupted",
138  }
139}
140
141fn parse_status(s: &str) -> TestStatus {
142  match s {
143    "failed" => TestStatus::Failed,
144    "timed-out" => TestStatus::TimedOut,
145    "skipped" => TestStatus::Skipped,
146    "flaky" => TestStatus::Flaky,
147    "interrupted" => TestStatus::Interrupted,
148    _ => TestStatus::Passed,
149  }
150}
151
152impl WireEvent {
153  pub fn from_runtime(event: &ReporterEvent) -> Option<Self> {
154    Some(match event {
155      ReporterEvent::RunStarted {
156        total_tests,
157        num_workers,
158        metadata,
159      } => Self::RunStarted {
160        total_tests: *total_tests,
161        num_workers: *num_workers,
162        metadata: metadata.clone(),
163      },
164      ReporterEvent::WorkerStarted { worker_id } => Self::WorkerStarted { worker_id: *worker_id },
165      ReporterEvent::TestStarted { test_id, attempt } => Self::TestStarted {
166        test_id: test_id.into(),
167        attempt: *attempt,
168      },
169      ReporterEvent::StepStarted(s) => Self::StepStarted {
170        test_id: (&s.test_id).into(),
171        step_id: s.step_id.clone(),
172        parent_step_id: s.parent_step_id.clone(),
173        title: s.title.clone(),
174        category: step_category_str(s.category.clone()).to_string(),
175      },
176      ReporterEvent::StepFinished(s) => Self::StepFinished {
177        test_id: (&s.test_id).into(),
178        step_id: s.step_id.clone(),
179        title: s.title.clone(),
180        category: step_category_str(s.category.clone()).to_string(),
181        duration_ms: s.duration.as_millis() as u64,
182        error: s.error.clone(),
183        metadata: s.metadata.clone(),
184      },
185      ReporterEvent::TestFinished { test_id, outcome } => Self::TestFinished {
186        test_id: test_id.into(),
187        status: status_str(outcome.status.clone()).to_string(),
188        duration_ms: outcome.duration.as_millis() as u64,
189        attempt: outcome.attempt,
190        error: outcome.error.as_ref().map(|e| e.message.clone()),
191      },
192      ReporterEvent::WorkerFinished { worker_id } => Self::WorkerFinished { worker_id: *worker_id },
193      ReporterEvent::RunFinished {
194        total,
195        passed,
196        failed,
197        skipped,
198        flaky,
199        duration,
200      } => Self::RunFinished {
201        total: *total,
202        passed: *passed,
203        failed: *failed,
204        skipped: *skipped,
205        flaky: *flaky,
206        duration_ms: duration.as_millis() as u64,
207      },
208    })
209  }
210
211  /// Lower a wire event back into the runtime variant. Header
212  /// frames return `None` since they're metadata, not test events.
213  pub fn into_runtime(self) -> Option<ReporterEvent> {
214    Some(match self {
215      Self::Header { .. } => return None,
216      Self::RunStarted {
217        total_tests,
218        num_workers,
219        metadata,
220      } => ReporterEvent::RunStarted {
221        total_tests,
222        num_workers,
223        metadata,
224      },
225      Self::WorkerStarted { worker_id } => ReporterEvent::WorkerStarted { worker_id },
226      Self::TestStarted { test_id, attempt } => ReporterEvent::TestStarted {
227        test_id: test_id.into(),
228        attempt,
229      },
230      Self::StepStarted {
231        test_id,
232        step_id,
233        parent_step_id,
234        title,
235        category,
236      } => ReporterEvent::StepStarted(Box::new(StepStartedEvent {
237        test_id: test_id.into(),
238        step_id,
239        parent_step_id,
240        title,
241        category: parse_step_category(&category),
242      })),
243      Self::StepFinished {
244        test_id,
245        step_id,
246        title,
247        category,
248        duration_ms,
249        error,
250        metadata,
251      } => ReporterEvent::StepFinished(Box::new(StepFinishedEvent {
252        test_id: test_id.into(),
253        step_id,
254        title,
255        category: parse_step_category(&category),
256        duration: Duration::from_millis(duration_ms),
257        error,
258        metadata,
259      })),
260      Self::TestFinished {
261        test_id,
262        status,
263        duration_ms,
264        attempt,
265        error,
266      } => {
267        let status = parse_status(&status);
268        let id: TestId = test_id.into();
269        ReporterEvent::TestFinished {
270          test_id: id.clone(),
271          outcome: TestOutcome {
272            test_id: id,
273            status,
274            duration: Duration::from_millis(duration_ms),
275            attempt,
276            max_attempts: 1,
277            error: error.map(|message| crate::model::TestFailure {
278              message,
279              stack: None,
280              diff: None,
281              screenshot: None,
282            }),
283            attachments: Vec::new(),
284            steps: Vec::new(),
285            stdout: String::new(),
286            stderr: String::new(),
287            annotations: Vec::new(),
288            metadata: serde_json::Value::Null,
289          },
290        }
291      },
292      Self::WorkerFinished { worker_id } => ReporterEvent::WorkerFinished { worker_id },
293      Self::RunFinished {
294        total,
295        passed,
296        failed,
297        skipped,
298        flaky,
299        duration_ms,
300      } => ReporterEvent::RunFinished {
301        total,
302        passed,
303        failed,
304        skipped,
305        flaky,
306        duration: Duration::from_millis(duration_ms),
307      },
308    })
309  }
310}
311
/// Reporter behind `--reporter blob`.
///
/// Events are accumulated in memory as JSON lines and written on
/// `finalize()` as a zip at `out_path` with a single `events.jsonl`
/// member. The merge subcommand reads every zip in a directory,
/// concatenates the streams, and replays them through the configured
/// reporter.
pub struct BlobReporter {
  /// Destination of the zip written by `finalize()`.
  out_path: PathBuf,
  /// Serialized frames, one JSON document per line, header first.
  buffer: Vec<u8>,
  /// Shard number recorded in the header, if sharding was configured.
  shard_index: Option<u32>,
  /// Shard count recorded in the header, if sharding was configured.
  shard_total: Option<u32>,
}
322
323impl BlobReporter {
324  /// Construct a blob reporter that writes to `out_path` on
325  /// `finalize()`. Shard metadata (if known) is recorded in the
326  /// header frame so the merger can preserve the run boundary.
327  #[must_use]
328  pub fn new(out_path: PathBuf) -> Self {
329    let mut buffer = Vec::new();
330    write_event(
331      &mut buffer,
332      &WireEvent::Header {
333        schema: SCHEMA_VERSION,
334        shard_index: None,
335        shard_total: None,
336      },
337    );
338    Self {
339      out_path,
340      buffer,
341      shard_index: None,
342      shard_total: None,
343    }
344  }
345
346  pub fn with_shard(mut self, current: u32, total: u32) -> Self {
347    self.shard_index = Some(current);
348    self.shard_total = Some(total);
349    // Rewrite the header now that we know the shard.
350    self.buffer.clear();
351    write_event(
352      &mut self.buffer,
353      &WireEvent::Header {
354        schema: SCHEMA_VERSION,
355        shard_index: self.shard_index,
356        shard_total: self.shard_total,
357      },
358    );
359    self
360  }
361}
362
363#[async_trait]
364impl Reporter for BlobReporter {
365  async fn on_event(&mut self, event: &ReporterEvent) {
366    if let Some(wire) = WireEvent::from_runtime(event) {
367      write_event(&mut self.buffer, &wire);
368    }
369  }
370
371  async fn finalize(&mut self) -> ferridriver::error::Result<()> {
372    use ferridriver::FerriError;
373    if let Some(parent) = self.out_path.parent() {
374      std::fs::create_dir_all(parent)?;
375    }
376    let file = std::fs::File::create(&self.out_path)?;
377    let mut zip = zip::ZipWriter::new(file);
378    let opts: zip::write::SimpleFileOptions =
379      zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
380    zip
381      .start_file("events.jsonl", opts)
382      .map_err(|e| FerriError::backend(format!("zip start_file: {e}")))?;
383    zip
384      .write_all(&self.buffer)
385      .map_err(|e| FerriError::backend(format!("zip write: {e}")))?;
386    zip
387      .finish()
388      .map_err(|e| FerriError::backend(format!("zip finish: {e}")))?;
389    Ok(())
390  }
391}
392
393fn write_event(buffer: &mut Vec<u8>, event: &WireEvent) {
394  if let Ok(line) = serde_json::to_string(event) {
395    buffer.extend_from_slice(line.as_bytes());
396    buffer.push(b'\n');
397  }
398}
399
400/// Read every `report-*.zip` (or any `*.zip`) under `dir` and return
401/// the concatenated runtime event stream.
402///
403/// # Errors
404///
405/// Returns an error if a zip is unreadable or contains malformed JSON.
406pub fn read_blob_dir(dir: &std::path::Path) -> Result<Vec<ReporterEvent>, String> {
407  let mut events = Vec::new();
408  let entries = std::fs::read_dir(dir).map_err(|e| format!("read_dir {}: {e}", dir.display()))?;
409  let mut zips: Vec<PathBuf> = Vec::new();
410  for entry in entries {
411    let entry = entry.map_err(|e| format!("dir entry: {e}"))?;
412    let path = entry.path();
413    if path.extension().and_then(|s| s.to_str()) == Some("zip") {
414      zips.push(path);
415    }
416  }
417  zips.sort();
418  for path in zips {
419    let file = std::fs::File::open(&path).map_err(|e| format!("open {}: {e}", path.display()))?;
420    let mut zip = zip::ZipArchive::new(file).map_err(|e| format!("zip read {}: {e}", path.display()))?;
421    let mut events_file = zip
422      .by_name("events.jsonl")
423      .map_err(|e| format!("missing events.jsonl in {}: {e}", path.display()))?;
424    let mut buf = String::new();
425    use std::io::Read;
426    events_file
427      .read_to_string(&mut buf)
428      .map_err(|e| format!("read jsonl: {e}"))?;
429    for (i, line) in buf.lines().enumerate() {
430      if line.trim().is_empty() {
431        continue;
432      }
433      let wire: WireEvent =
434        serde_json::from_str(line).map_err(|e| format!("parse line {i} in {}: {e}", path.display()))?;
435      if let Some(event) = wire.into_runtime() {
436        events.push(event);
437      }
438    }
439  }
440  Ok(events)
441}