//! trace_share_core/split_pipeline.rs
//!
//! Scan → sanitize → publish pipeline: parses input traces into canonical
//! events, sanitizes them, and uploads the resulting episode documents.
use anyhow::{Context, Result, bail};
use glob::glob;
use serde::de::DeserializeOwned;
use std::{
    collections::BTreeSet,
    fs,
    io::{BufRead, BufReader, BufWriter, Write},
    path::{Path, PathBuf},
};

use crate::{
    config::{AppConfig, ensure_dirs},
    consent::require_consent,
    episode::{EpisodeRecord, build_episodes},
    models::CanonicalEvent,
    parser::parse_jsonl_file,
    publish::index_episode_pointer,
    sanitize::{SanitizationReport, sanitize_events},
    state::StateStore,
    worker::upload_episode,
};
21
/// Summary of a `scan_to_dir` run.
#[derive(Debug, Clone)]
pub struct ScanResult {
    /// Number of input files that were parsed.
    pub input_files: usize,
    /// Total canonical events written to `output_file`.
    pub produced_events: usize,
    /// Path to the generated `canonical_events.jsonl`.
    pub output_file: PathBuf,
}
28
/// Summary of a `sanitize_to_dir` run.
#[derive(Debug, Clone)]
pub struct SanitizeResult {
    /// Events read from the input before sanitization.
    pub input_events: usize,
    /// Events written after sanitization.
    pub output_events: usize,
    /// Path to the generated `sanitized_events.jsonl`.
    pub output_file: PathBuf,
    /// Path to the generated `redaction_report.json`.
    pub report_file: PathBuf,
    /// In-memory copy of the sanitization report written to `report_file`.
    pub report: SanitizationReport,
}
37
/// Counters describing a `publish_from_input` run.
///
/// "would_upload" counts documents that passed the duplicate check (and is
/// accumulated even in dry runs); "uploaded" counts documents actually sent;
/// "capped" counts documents skipped because `max_upload_bytes` was reached.
#[derive(Debug, Clone)]
pub struct PublishResult {
    /// Total episode documents produced from the input.
    pub produced_docs: usize,
    /// Documents that were new (not previously uploaded).
    pub would_upload_docs: usize,
    /// Documents actually uploaded in this run.
    pub uploaded_docs: usize,
    /// Documents skipped because an upload was already recorded.
    pub skipped_existing_docs: usize,
    /// Documents skipped by the byte cap.
    pub capped_docs: usize,
    /// Serialized size of all new documents.
    pub would_upload_bytes: u64,
    /// Serialized size of documents actually uploaded.
    pub uploaded_bytes: u64,
    /// Serialized size of documents skipped by the byte cap.
    pub capped_bytes: u64,
}
49
50pub fn scan_to_dir(input: &str, out_dir: &Path) -> Result<ScanResult> {
51    ensure_dirs()?;
52    fs::create_dir_all(out_dir)?;
53
54    let input_files = collect_input_files(input)?;
55    if input_files.is_empty() {
56        bail!("no files found for input: {input}");
57    }
58
59    let output_file = out_dir.join("canonical_events.jsonl");
60    let mut writer = BufWriter::new(fs::File::create(&output_file)?);
61
62    let mut produced_events = 0usize;
63    for path in &input_files {
64        let events = parse_jsonl_file(path, "manual_scan")?;
65        for event in events {
66            serde_json::to_writer(&mut writer, &event)?;
67            writer.write_all(b"\n")?;
68            produced_events += 1;
69        }
70    }
71    writer.flush()?;
72
73    let summary = serde_json::json!({
74        "input_files": input_files.len(),
75        "produced_events": produced_events,
76        "output_file": output_file,
77    });
78    fs::write(
79        out_dir.join("scan_summary.json"),
80        serde_json::to_vec_pretty(&summary)?,
81    )?;
82
83    Ok(ScanResult {
84        input_files: input_files.len(),
85        produced_events,
86        output_file,
87    })
88}
89
90pub fn sanitize_to_dir(
91    input: &Path,
92    out_dir: &Path,
93    _policy: Option<&Path>,
94) -> Result<SanitizeResult> {
95    ensure_dirs()?;
96    fs::create_dir_all(out_dir)?;
97
98    let events = read_canonical_events(input)?;
99    let input_events = events.len();
100    let (sanitized, report) = sanitize_events(&events);
101
102    let output_file = out_dir.join("sanitized_events.jsonl");
103    let mut writer = BufWriter::new(fs::File::create(&output_file)?);
104    for event in &sanitized {
105        serde_json::to_writer(&mut writer, event)?;
106        writer.write_all(b"\n")?;
107    }
108    writer.flush()?;
109
110    let report_file = out_dir.join("redaction_report.json");
111    fs::write(&report_file, serde_json::to_vec_pretty(&report)?)?;
112
113    Ok(SanitizeResult {
114        input_events,
115        output_events: sanitized.len(),
116        output_file,
117        report_file,
118        report,
119    })
120}
121
122#[allow(clippy::too_many_arguments)]
123pub async fn publish_from_input(
124    config: &AppConfig,
125    input: &Path,
126    namespace: Option<&str>,
127    dry_run: bool,
128    review: bool,
129    yes: bool,
130    include_raw: bool,
131    max_upload_bytes: Option<u64>,
132) -> Result<PublishResult> {
133    if !dry_run && (!yes || !review) {
134        bail!("publish requires --review and --yes unless --dry-run");
135    }
136
137    ensure_dirs()?;
138    let store = StateStore::open_default()?;
139
140    let mut episodes = read_episode_records(input)?;
141    if episodes.is_empty() {
142        let consent = require_consent(&store)?;
143        let events = read_canonical_events(input)?;
144        if !events.is_empty() {
145            let built = build_episodes(
146                "manual_scan",
147                &events[0].session_id,
148                &events,
149                include_raw,
150                &consent.accepted_at,
151                &consent.consent_version,
152                &consent.license,
153                "policy-v1",
154                "sanitizer-v1",
155            );
156            episodes.extend(built);
157        }
158    }
159
160    if let Some(ns) = namespace {
161        for ep in &mut episodes {
162            ep.source_tool = format!("{ns}:{}", ep.source_tool);
163        }
164    }
165
166    let produced_docs = episodes.len();
167    let mut would_upload_docs = 0usize;
168    let mut uploaded_docs = 0usize;
169    let mut skipped_existing_docs = 0usize;
170    let mut capped_docs = 0usize;
171    let mut would_upload_bytes = 0u64;
172    let mut uploaded_bytes = 0u64;
173    let mut capped_bytes = 0u64;
174
175    for (idx, episode) in episodes.iter().enumerate() {
176        if store.has_episode_upload(&episode.id)? {
177            skipped_existing_docs += 1;
178            continue;
179        }
180        let episode_bytes = serde_json::to_vec(episode)?.len() as u64;
181        would_upload_docs += 1;
182        would_upload_bytes += episode_bytes;
183
184        if review && idx < 5 {
185            println!(
186                "[review] episode_id={} source_tool={} ts_start={}",
187                episode.id, episode.source_tool, episode.ts_start
188            );
189            let preview = if episode.result.len() > 240 {
190                format!("{}...", &episode.result[..240])
191            } else {
192                episode.result.clone()
193            };
194            println!("[review] text_preview={}", preview.replace('\n', " "));
195        }
196
197        if dry_run {
198            continue;
199        }
200
201        if let Some(limit) = max_upload_bytes {
202            if limit > 0 && uploaded_bytes + episode_bytes > limit {
203                capped_docs += 1;
204                capped_bytes += episode_bytes;
205                continue;
206            }
207        }
208
209        let upload = upload_episode(config, episode).await?;
210        index_episode_pointer(config, episode, &upload.object_key, None).await?;
211        store.upsert_episode_upload(
212            &episode.id,
213            &episode.content_hash,
214            &episode.source_tool,
215            &episode.session_id,
216            &upload.object_key,
217            &episode.consent.consent_version,
218            &episode.license,
219        )?;
220        uploaded_docs += 1;
221        uploaded_bytes += episode_bytes;
222    }
223
224    Ok(PublishResult {
225        produced_docs,
226        would_upload_docs,
227        uploaded_docs,
228        skipped_existing_docs,
229        capped_docs,
230        would_upload_bytes,
231        uploaded_bytes,
232        capped_bytes,
233    })
234}
235
236fn read_canonical_events(input: &Path) -> Result<Vec<CanonicalEvent>> {
237    let files = collect_files_from_path(input)?;
238    let mut out = Vec::new();
239    for path in files {
240        let file = fs::File::open(&path)?;
241        let reader = BufReader::new(file);
242        for line in reader.lines() {
243            let line = line?;
244            if line.trim().is_empty() {
245                continue;
246            }
247            if let Ok(event) = serde_json::from_str::<CanonicalEvent>(&line) {
248                out.push(event);
249            }
250        }
251    }
252    Ok(out)
253}
254
255fn read_episode_records(input: &Path) -> Result<Vec<EpisodeRecord>> {
256    let files = collect_files_from_path(input)?;
257    let mut out = Vec::new();
258    for path in files {
259        let file = fs::File::open(&path)?;
260        let reader = BufReader::new(file);
261        for line in reader.lines() {
262            let line = line?;
263            if line.trim().is_empty() {
264                continue;
265            }
266            if let Ok(ep) = serde_json::from_str::<EpisodeRecord>(&line) {
267                out.push(ep);
268            }
269        }
270    }
271    Ok(out)
272}
273
274fn collect_input_files(input: &str) -> Result<Vec<PathBuf>> {
275    let mut files = BTreeSet::new();
276
277    let has_glob = input.contains('*') || input.contains('?') || input.contains('[');
278    if has_glob {
279        for path in glob(input)
280            .with_context(|| format!("invalid input glob: {input}"))?
281            .flatten()
282        {
283            if path.is_file() {
284                files.insert(path);
285            }
286        }
287        return Ok(files.into_iter().collect());
288    }
289
290    let path = PathBuf::from(input);
291    if path.is_file() {
292        files.insert(path);
293    } else if path.is_dir() {
294        for entry in ignore::WalkBuilder::new(&path)
295            .hidden(false)
296            .git_ignore(false)
297            .build()
298        {
299            let entry = match entry {
300                Ok(v) => v,
301                Err(_) => continue,
302            };
303            if entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
304                files.insert(entry.path().to_path_buf());
305            }
306        }
307    }
308
309    Ok(files.into_iter().collect())
310}
311
312fn collect_files_from_path(input: &Path) -> Result<Vec<PathBuf>> {
313    let mut files = Vec::new();
314    if input.is_file() {
315        files.push(input.to_path_buf());
316        return Ok(files);
317    }
318
319    if input.is_dir() {
320        for entry in ignore::WalkBuilder::new(input)
321            .hidden(false)
322            .git_ignore(false)
323            .build()
324        {
325            let entry = match entry {
326                Ok(v) => v,
327                Err(_) => continue,
328            };
329            if entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
330                files.push(entry.path().to_path_buf());
331            }
332        }
333        files.sort();
334        files.dedup();
335        return Ok(files);
336    }
337
338    bail!("input path does not exist: {}", input.display())
339}