
trace_share_core/pipeline.rs

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    fs,
    io::{BufWriter, Write},
    path::{Path, PathBuf},
};
use uuid::Uuid;

use crate::{
    config::{AppConfig, ensure_dirs},
    consent::require_consent,
    episode::build_episodes,
    parser::{parse_jsonl_file_from_offset, parse_source_file},
    publish::index_episode_pointer,
    sanitize::{SanitizationReport, sanitize_events},
    sources::{SourceDef, discover_files, resolve_sources},
    state::{RunStats, StateStore},
    worker::upload_episode,
};

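/// Options for a single pipeline run. `dry_run` skips all uploads; real
/// uploads additionally require both `review` and `yes`.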
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunOptions {
    pub sources: Vec<String>,
    pub dry_run: bool,
    pub review: bool,
    pub yes: bool,
    pub include_raw: bool,
    pub show_payload: bool,
    pub preview_limit: usize,
    pub explain_size: bool,
    pub export_payload_path: Option<PathBuf>,
    pub export_limit: Option<usize>,
    pub max_upload_bytes: Option<u64>,
}

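/// Per-source byte accounting at each pipeline stage (raw input files,
/// parsed event text, sanitized event text, serialized episode payloads),
/// used to explain where upload size comes from.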
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SourceSizeStats {
    pub source: String,
    pub scanned_files: usize,
    pub input_file_bytes: u64,
    pub parsed_event_text_bytes: u64,
    pub sanitized_event_text_bytes: u64,
    pub episode_payload_bytes: u64,
    pub would_upload_docs: usize,
    pub skipped_existing_docs: usize,
}

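/// Aggregate outcome of one `run_once` call. The `would_upload_*` fields
/// report projected uploads, so dry runs produce meaningful totals too.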
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunResult {
    pub scanned_files: usize,
    pub produced_docs: usize,     // episode count
    pub uploaded_docs: usize,     // uploaded episode count
    pub would_upload_docs: usize, // would-upload episode count
    pub skipped_existing_docs: usize,
    pub capped_docs: usize,
    pub redactions: usize,
    pub would_upload_bytes: u64,
    pub uploaded_bytes: u64,
    pub capped_bytes: u64,
    pub by_source: HashMap<String, usize>,
    pub payload_preview: Vec<crate::episode::EpisodeRecord>,
    pub source_size_stats: Vec<SourceSizeStats>,
    pub exported_payload_docs: usize,
}

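/// Resume state for one source: for every file, the byte offset already
/// consumed plus a fingerprint; a fingerprint mismatch invalidates the
/// offset and forces a re-parse from the start.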
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct SourceCursor {
    files: HashMap<String, FileCursor>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct FileCursor {
    last_byte_offset: u64,
    file_fingerprint: String,
}

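/// Runs the pipeline once: discover files per source, parse them
/// (incrementally for JSONL), sanitize events, build episodes, and
/// pseudonymize session ids. Unless `dry_run` is set, each new episode is
/// then uploaded, indexed, and recorded so reruns skip it.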
pub async fn run_once(config: &AppConfig, options: &RunOptions) -> Result<RunResult> {
    if !options.dry_run && (!options.yes || !options.review) {
        anyhow::bail!("run uploads require --review and --yes (or use --dry-run)");
    }

    ensure_dirs()?;
    let store = StateStore::open_default()?;
    let consent = require_consent(&store)?;
    let run_id = Uuid::new_v4().to_string();
    store.start_run(&run_id)?;

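    // Resolve every configured source, then narrow to the requested ids (if any).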
    let selected_sources = select_sources(resolve_sources(config).await?, &options.sources);

    let mut scanned_files = 0usize;
    let mut produced_docs = 0usize;
    let mut uploaded_docs = 0usize;
    let mut would_upload_docs = 0usize;
    let mut skipped_existing_docs = 0usize;
    let mut capped_docs = 0usize;
    let mut redactions = 0usize;
    let mut parse_errors = 0usize;
    let mut would_upload_bytes = 0u64;
    let mut uploaded_bytes = 0u64;
    let mut capped_bytes = 0u64;
    let mut by_source = HashMap::new();
    let mut payload_preview = Vec::new();
    let mut source_size_stats = Vec::new();

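    // Optional payload export: episodes are written as JSON Lines, one episode per line.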
    let mut export_writer = if let Some(path) = &options.export_payload_path {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        Some(BufWriter::new(fs::File::create(path)?))
    } else {
        None
    };
    let export_limit = options.export_limit.unwrap_or(usize::MAX);
    let mut exported_payload_docs = 0usize;

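    // Process each source in turn, threading its resume cursor across files.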
    for source in selected_sources {
        let files = discover_files(&source)?;
        let mut source_docs = 0usize;
        let mut source_cursor = load_source_cursor(&store, &source.id)?;
        let mut source_stats = SourceSizeStats {
            source: source.id.clone(),
            ..Default::default()
        };

        for file in files {
            scanned_files += 1;
            source_stats.scanned_files += 1;
            let path_str = file.to_string_lossy().to_string();
            let fingerprint = file_fingerprint(&file)?;
            if let Ok(md) = fs::metadata(&file) {
                source_stats.input_file_bytes += md.len();
            }

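            // JSONL files resume from the stored byte offset as long as the
            // fingerprint still matches; other formats (or changed files) are
            // parsed from the beginning.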
            let prior = source_cursor.files.get(&path_str);
            let parsed: Result<(Vec<crate::models::CanonicalEvent>, u64)> =
                if source.format == "jsonl" {
                    let start_offset = prior
                        .filter(|c| c.file_fingerprint == fingerprint)
                        .map(|c| c.last_byte_offset)
                        .unwrap_or(0);
                    parse_jsonl_file_from_offset(&file, &source.id, start_offset)
                } else {
                    Ok((
                        parse_source_file(
                            &file,
                            &source.id,
                            &source.format,
                            source.parser_hint.as_deref(),
                        )?,
                        0,
                    ))
                };
            let (events, next_offset) = match parsed {
                Ok(v) => v,
                Err(e) => {
                    eprintln!(
                        "[warn] source={} file={} parse failed: {}",
                        source.id,
                        file.display(),
                        e
                    );
                    parse_errors += 1;
                    continue;
                }
            };

            source_cursor.files.insert(
                path_str.clone(),
                FileCursor {
                    last_byte_offset: next_offset,
                    file_fingerprint: fingerprint.clone(),
                },
            );

            if events.is_empty() {
                store.upsert_file_fingerprint(&path_str, &fingerprint)?;
                continue;
            }
            source_stats.parsed_event_text_bytes +=
                events.iter().map(|e| e.text.len() as u64).sum::<u64>();

            let (sanitized, report): (_, SanitizationReport) = sanitize_events(&events);
            redactions += report.total_redactions;
            source_stats.sanitized_event_text_bytes +=
                sanitized.iter().map(|e| e.text.len() as u64).sum::<u64>();

            if options.review {
                print_review(&source.id, &file, &report);
            }

            let mut episodes = build_episodes(
                &source.id,
                &events[0].session_id,
                &sanitized,
                options.include_raw,
                &consent.accepted_at,
                &consent.consent_version,
                &consent.license,
                "policy-v1",
                "sanitizer-v1",
            );

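            // Pseudonymize session ids with the locally stored salt so the raw
            // identifier is never part of an uploaded episode.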
            for episode in &mut episodes {
                episode.session_id = crate::publish::hash_identifier(
                    &crate::publish::load_or_create_anonymization_salt()?,
                    &episode.session_id,
                );
            }

            produced_docs += episodes.len();
            source_docs += episodes.len();

            for episode in episodes {
                if store.has_episode_upload(&episode.id)? {
                    skipped_existing_docs += 1;
                    source_stats.skipped_existing_docs += 1;
                    continue;
                }

                let episode_bytes = serde_json::to_vec(&episode)?.len() as u64;
                source_stats.episode_payload_bytes += episode_bytes;
                would_upload_docs += 1;
                source_stats.would_upload_docs += 1;
                would_upload_bytes += episode_bytes;

                if (options.dry_run || options.show_payload)
                    && payload_preview.len() < options.preview_limit
                {
                    payload_preview.push(episode.clone());
                }
                if let Some(writer) = export_writer.as_mut() {
                    if exported_payload_docs < export_limit {
                        serde_json::to_writer(&mut *writer, &episode)?;
                        writer.write_all(b"\n")?;
                        exported_payload_docs += 1;
                    }
                }

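                // Live run: honor the optional upload byte cap, then upload,
                // index a pointer, and record the episode so later runs skip it.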
                if !options.dry_run {
                    if let Some(limit) = options.max_upload_bytes {
                        if limit > 0 && uploaded_bytes + episode_bytes > limit {
                            capped_docs += 1;
                            capped_bytes += episode_bytes;
                            continue;
                        }
                    }

                    let upload = upload_episode(config, &episode).await?;
                    index_episode_pointer(config, &episode, &upload.object_key, None).await?;
                    uploaded_docs += 1;
                    uploaded_bytes += episode_bytes;
                    store.upsert_episode_upload(
                        &episode.id,
                        &episode.content_hash,
                        &episode.source_tool,
                        &episode.session_id,
                        &upload.object_key,
                        &episode.consent.consent_version,
                        &episode.license,
                    )?;
                }
            }

            store.upsert_file_fingerprint(&path_str, &fingerprint)?;
        }

        let cursor_json = serde_json::to_string(&source_cursor)?;
        store.upsert_source_cursor(&source.id, &cursor_json)?;
        by_source.insert(source.id, source_docs);
        source_size_stats.push(source_stats);
    }

    if let Some(writer) = export_writer.as_mut() {
        writer.flush()?;
    }

    store.finish_run(&RunStats {
        run_id,
        scanned_files,
        produced_docs,
        uploaded_docs,
        redactions,
        errors: parse_errors,
    })?;

    Ok(RunResult {
        scanned_files,
        produced_docs,
        uploaded_docs,
        would_upload_docs,
        skipped_existing_docs,
        capped_docs,
        redactions,
        would_upload_bytes,
        uploaded_bytes,
        capped_bytes,
        by_source,
        payload_preview,
        source_size_stats,
        exported_payload_docs,
    })
}

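/// Keeps only the sources whose ids were requested; an empty filter selects
/// every source.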
fn select_sources(all: Vec<SourceDef>, only: &[String]) -> Vec<SourceDef> {
    if only.is_empty() {
        return all;
    }
    all.into_iter()
        .filter(|s| only.iter().any(|x| x == &s.id))
        .collect()
}

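/// Cheap change detector: hashes `size:mtime` rather than file contents, so
/// a same-size edit within the same second would go undetected.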
fn file_fingerprint(path: &Path) -> Result<String> {
    let md = fs::metadata(path)?;
    let size = md.len();
    let modified = md
        .modified()
        .ok()
        .and_then(|m| m.duration_since(std::time::UNIX_EPOCH).ok())
        .map(|d| d.as_secs())
        .unwrap_or_default();
    let seed = format!("{}:{}", size, modified);
    Ok(blake3::hash(seed.as_bytes()).to_hex().to_string())
}

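/// Loads a source's cursor from the state store; a missing, empty, or
/// unparseable cursor falls back to a fresh default rather than erroring.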
fn load_source_cursor(store: &StateStore, source_id: &str) -> Result<SourceCursor> {
    if let Some(raw) = store.source_cursor(source_id)? {
        if raw.trim().is_empty() {
            return Ok(SourceCursor::default());
        }
        if let Ok(cursor) = serde_json::from_str::<SourceCursor>(&raw) {
            return Ok(cursor);
        }
    }
    Ok(SourceCursor::default())
}

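/// Prints the sanitization summary and sample redactions shown to the user
/// during `--review`.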
fn print_review(source_id: &str, path: &Path, report: &SanitizationReport) {
    println!("[review] source={source_id} file={}", path.display());
    println!(
        "[review] redactions total={} secrets={} email={} ip={} path={}",
        report.total_redactions,
        report.secret_redactions,
        report.email_redactions,
        report.ip_redactions,
        report.path_redactions,
    );
    for (idx, sample) in report.sample_redacted.iter().enumerate() {
        println!("[review][sample:{}] {}", idx + 1, sample);
    }
}