assay_core/trace/
ingest.rs

1use anyhow::Context;
2use std::fs::File;
3use std::io::{BufReader, Write};
4use std::path::Path;
5
/// Summary of a completed ingest run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestStats {
    /// Number of events that were read from the input and handed to the output.
    pub event_count: usize,
}
9
10pub fn ingest_file(input: &Path, output: &Path) -> anyhow::Result<IngestStats> {
11    // Check output format
12    let is_sqlite = output.to_str() == Some(":memory:")
13        || output
14            .extension()
15            .is_some_and(|ext| ext == "db" || ext == "sqlite");
16
17    if is_sqlite {
18        let store = crate::storage::store::Store::open(output)?;
19        store.init_schema()?;
20        ingest_into_store(&store, input)
21    } else {
22        // Open input stream
23        let file = File::open(input).context("failed to open input file")?;
24        let reader = BufReader::new(file);
25
26        // Use Upgrader to stream events (V1->V2 or V2 passthrough)
27        let upgrader = super::upgrader::StreamUpgrader::new(reader);
28
29        // JSONL Output
30        let mut out_file = File::create(output).context("failed to create output file")?;
31        let mut count = 0;
32        for event_result in upgrader {
33            let event = event_result.context("failed to process trace entry")?;
34            let out_line = serde_json::to_string(&event)?;
35            writeln!(out_file, "{}", out_line)?;
36            count += 1;
37        }
38        Ok(IngestStats { event_count: count })
39    }
40}
41
42pub fn ingest_into_store(
43    store: &crate::storage::store::Store,
44    input: &Path,
45) -> anyhow::Result<IngestStats> {
46    // Open input stream
47    let file = File::open(input).context("failed to open input file")?;
48    let reader = BufReader::new(file);
49
50    // Use Upgrader to stream events (V1->V2 or V2 passthrough)
51    let upgrader = super::upgrader::StreamUpgrader::new(reader);
52
53    let mut count = 0;
54
55    // Ensure schema (idempotent) - caller usually does this but safe to repeat
56    store.init_schema()?;
57    let mut batch = Vec::with_capacity(1000);
58
59    for event_result in upgrader {
60        let event = event_result.context("failed to process trace entry")?;
61        batch.push(event);
62        count += 1;
63
64        if batch.len() >= 1000 {
65            store.insert_batch(&batch, None, None)?;
66            batch.clear();
67        }
68    }
69    if !batch.is_empty() {
70        store.insert_batch(&batch, None, None)?;
71    }
72
73    Ok(IngestStats { event_count: count })
74}