// assay_core/trace/ingest.rs
use anyhow::Context;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
5
/// Summary statistics produced by a single trace-ingest run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct IngestStats {
    /// Number of trace events successfully upgraded and written out.
    pub event_count: usize,
}
9
10pub fn ingest_file(input: &Path, output: &Path) -> anyhow::Result<IngestStats> {
11 let is_sqlite = output.to_str() == Some(":memory:")
13 || output
14 .extension()
15 .is_some_and(|ext| ext == "db" || ext == "sqlite");
16
17 if is_sqlite {
18 let store = crate::storage::store::Store::open(output)?;
19 store.init_schema()?;
20 ingest_into_store(&store, input)
21 } else {
22 let file = File::open(input).context("failed to open input file")?;
24 let reader = BufReader::new(file);
25
26 let upgrader = super::upgrader::StreamUpgrader::new(reader);
28
29 let mut out_file = File::create(output).context("failed to create output file")?;
31 let mut count = 0;
32 for event_result in upgrader {
33 let event = event_result.context("failed to process trace entry")?;
34 let out_line = serde_json::to_string(&event)?;
35 writeln!(out_file, "{}", out_line)?;
36 count += 1;
37 }
38 Ok(IngestStats { event_count: count })
39 }
40}
41
42pub fn ingest_into_store(
43 store: &crate::storage::store::Store,
44 input: &Path,
45) -> anyhow::Result<IngestStats> {
46 let file = File::open(input).context("failed to open input file")?;
48 let reader = BufReader::new(file);
49
50 let upgrader = super::upgrader::StreamUpgrader::new(reader);
52
53 let mut count = 0;
54
55 store.init_schema()?;
57 let mut batch = Vec::with_capacity(1000);
58
59 for event_result in upgrader {
60 let event = event_result.context("failed to process trace entry")?;
61 batch.push(event);
62 count += 1;
63
64 if batch.len() >= 1000 {
65 store.insert_batch(&batch, None, None)?;
66 batch.clear();
67 }
68 }
69 if !batch.is_empty() {
70 store.insert_batch(&batch, None, None)?;
71 }
72
73 Ok(IngestStats { event_count: count })
74}