use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    fs,
    io::{BufWriter, Write},
    path::{Path, PathBuf},
};
use uuid::Uuid;

use crate::{
    config::{AppConfig, ensure_dirs},
    consent::require_consent,
    episode::build_episodes,
    parser::{parse_jsonl_file_from_offset, parse_source_file},
    publish::index_episode_pointer,
    sanitize::{SanitizationReport, sanitize_events},
    sources::{SourceDef, discover_files, resolve_sources},
    state::{RunStats, StateStore},
    worker::upload_episode,
};

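/// Options for a single ingestion run, as supplied by the CLI.
///
/// Uploads are gated: a real run requires both `review` and `yes`; otherwise
/// only `dry_run` is allowed (see the check at the top of `run_once`).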
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunOptions {
    pub sources: Vec<String>,
    pub dry_run: bool,
    pub review: bool,
    pub yes: bool,
    pub include_raw: bool,
    pub show_payload: bool,
    pub preview_limit: usize,
    pub explain_size: bool,
    pub export_payload_path: Option<PathBuf>,
    pub export_limit: Option<usize>,
    pub max_upload_bytes: Option<u64>,
}

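/// Per-source counters tracking how many bytes survive each pipeline stage
/// (raw input files -> parsed event text -> sanitized text -> episode payload).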
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SourceSizeStats {
    pub source: String,
    pub scanned_files: usize,
    pub input_file_bytes: u64,
    pub parsed_event_text_bytes: u64,
    pub sanitized_event_text_bytes: u64,
    pub episode_payload_bytes: u64,
    pub would_upload_docs: usize,
    pub skipped_existing_docs: usize,
}

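/// Aggregate outcome of one run, covering both real uploads and dry runs
/// (the `would_upload_*` fields count what a dry run would have sent).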
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunResult {
    pub scanned_files: usize,
    pub produced_docs: usize,
    pub uploaded_docs: usize,
    pub would_upload_docs: usize,
    pub skipped_existing_docs: usize,
    pub capped_docs: usize,
    pub redactions: usize,
    pub would_upload_bytes: u64,
    pub uploaded_bytes: u64,
    pub capped_bytes: u64,
    pub by_source: HashMap<String, usize>,
    pub payload_preview: Vec<crate::episode::EpisodeRecord>,
    pub source_size_stats: Vec<SourceSizeStats>,
    pub exported_payload_docs: usize,
}

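/// Per-source resume state: one `FileCursor` per discovered file, persisted
/// as JSON in the state store between runs.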
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct SourceCursor {
    files: HashMap<String, FileCursor>,
}

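/// Byte offset reached in a file, paired with the fingerprint observed at the
/// time; a fingerprint mismatch invalidates the offset and forces a re-parse.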
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FileCursor {
    last_byte_offset: u64,
    file_fingerprint: String,
}

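/// Executes one ingestion pass: discover files per source, parse them
/// (incrementally for JSONL), sanitize, build episodes, and either upload
/// them or account for what would be uploaded.
///
/// Refuses to upload unless the caller passed both `--review` and `--yes`;
/// `--dry-run` is always permitted.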
pub async fn run_once(config: &AppConfig, options: &RunOptions) -> Result<RunResult> {
    if !options.dry_run && (!options.yes || !options.review) {
        anyhow::bail!("run uploads require --review and --yes (or use --dry-run)");
    }

    ensure_dirs()?;
    let store = StateStore::open_default()?;
    let consent = require_consent(&store)?;
    let run_id = Uuid::new_v4().to_string();
    store.start_run(&run_id)?;

    let selected_sources = select_sources(resolve_sources(config).await?, &options.sources);

    let mut scanned_files = 0usize;
    let mut produced_docs = 0usize;
    let mut uploaded_docs = 0usize;
    let mut would_upload_docs = 0usize;
    let mut skipped_existing_docs = 0usize;
    let mut capped_docs = 0usize;
    let mut redactions = 0usize;
    // Count parse failures so the final RunStats reports them instead of a
    // hard-coded zero.
    let mut parse_errors = 0usize;
    let mut would_upload_bytes = 0u64;
    let mut uploaded_bytes = 0u64;
    let mut capped_bytes = 0u64;
    let mut by_source = HashMap::new();
    let mut payload_preview = Vec::new();
    let mut source_size_stats = Vec::new();

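    // Optional JSONL export: stream each would-be payload to a file so it can
    // be inspected offline before any real upload.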
    let mut export_writer = if let Some(path) = &options.export_payload_path {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        Some(BufWriter::new(fs::File::create(path)?))
    } else {
        None
    };
    let export_limit = options.export_limit.unwrap_or(usize::MAX);
    let mut exported_payload_docs = 0usize;

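    // Each source keeps its own resume cursor and size accounting.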
    for source in selected_sources {
        let files = discover_files(&source)?;
        let mut source_docs = 0usize;
        let mut source_cursor = load_source_cursor(&store, &source.id)?;
        let mut source_stats = SourceSizeStats {
            source: source.id.clone(),
            ..Default::default()
        };

        for file in files {
            scanned_files += 1;
            source_stats.scanned_files += 1;
            let path_str = file.to_string_lossy().to_string();
            let fingerprint = file_fingerprint(&file)?;
            if let Ok(md) = fs::metadata(&file) {
                source_stats.input_file_bytes += md.len();
            }

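            // JSONL files are parsed incrementally: resume from the stored
            // byte offset if the fingerprint still matches, otherwise start
            // over. Other formats are always parsed whole.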
            let prior = source_cursor.files.get(&path_str);
            let parsed: Result<(Vec<crate::models::CanonicalEvent>, u64)> =
                if source.format == "jsonl" {
                    let start_offset = prior
                        .filter(|c| c.file_fingerprint == fingerprint)
                        .map(|c| c.last_byte_offset)
                        .unwrap_or(0);
                    parse_jsonl_file_from_offset(&file, &source.id, start_offset)
                } else {
                    Ok((
                        parse_source_file(
                            &file,
                            &source.id,
                            &source.format,
                            source.parser_hint.as_deref(),
                        )?,
                        0,
                    ))
                };
            let (events, next_offset) = match parsed {
                Ok(v) => v,
                Err(e) => {
                    parse_errors += 1;
                    eprintln!(
                        "[warn] source={} file={} parse failed: {}",
                        source.id,
                        file.display(),
                        e
                    );
                    continue;
                }
            };

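            // Advance the in-memory cursor even when a file yields nothing,
            // so an unchanged file is not re-parsed on the next run.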
            source_cursor.files.insert(
                path_str.clone(),
                FileCursor {
                    last_byte_offset: next_offset,
                    file_fingerprint: fingerprint.clone(),
                },
            );

            if events.is_empty() {
                store.upsert_file_fingerprint(&path_str, &fingerprint)?;
                continue;
            }
            source_stats.parsed_event_text_bytes +=
                events.iter().map(|e| e.text.len() as u64).sum::<u64>();

            let (sanitized, report): (_, SanitizationReport) = sanitize_events(&events);
            redactions += report.total_redactions;
            source_stats.sanitized_event_text_bytes +=
                sanitized.iter().map(|e| e.text.len() as u64).sum::<u64>();

            if options.review {
                print_review(&source.id, &file, &report);
            }

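            // Group sanitized events into episodes, stamping in the consent
            // metadata, then pseudonymize session ids with the local salt so
            // the raw ids are never uploaded.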
            let mut episodes = build_episodes(
                &source.id,
                &events[0].session_id,
                &sanitized,
                options.include_raw,
                &consent.accepted_at,
                &consent.consent_version,
                &consent.license,
                "policy-v1",
                "sanitizer-v1",
            );

            // Load the salt once per batch rather than once per episode.
            let salt = crate::publish::load_or_create_anonymization_salt()?;
            for episode in &mut episodes {
                episode.session_id =
                    crate::publish::hash_identifier(&salt, &episode.session_id);
            }

            produced_docs += episodes.len();
            source_docs += episodes.len();

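            // Per episode: skip anything already uploaded, account for size,
            // optionally preview/export the payload, then upload unless this
            // is a dry run or the byte cap has been reached.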
            for episode in episodes {
                if store.has_episode_upload(&episode.id)? {
                    skipped_existing_docs += 1;
                    source_stats.skipped_existing_docs += 1;
                    continue;
                }

                let episode_bytes = serde_json::to_vec(&episode)?.len() as u64;
                source_stats.episode_payload_bytes += episode_bytes;
                would_upload_docs += 1;
                source_stats.would_upload_docs += 1;
                would_upload_bytes += episode_bytes;

                if (options.dry_run || options.show_payload)
                    && payload_preview.len() < options.preview_limit
                {
                    payload_preview.push(episode.clone());
                }
                if let Some(writer) = export_writer.as_mut() {
                    if exported_payload_docs < export_limit {
                        serde_json::to_writer(&mut *writer, &episode)?;
                        writer.write_all(b"\n")?;
                        exported_payload_docs += 1;
                    }
                }

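                // `max_upload_bytes` is a cumulative per-run cap: an episode
                // that would push the total over the limit is counted as
                // capped and skipped, but smaller episodes later in the run
                // may still fit.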
                if !options.dry_run {
                    if let Some(limit) = options.max_upload_bytes {
                        if limit > 0 && uploaded_bytes + episode_bytes > limit {
                            capped_docs += 1;
                            capped_bytes += episode_bytes;
                            continue;
                        }
                    }

                    let upload = upload_episode(config, &episode).await?;
                    index_episode_pointer(config, &episode, &upload.object_key, None).await?;
                    uploaded_docs += 1;
                    uploaded_bytes += episode_bytes;
                    store.upsert_episode_upload(
                        &episode.id,
                        &episode.content_hash,
                        &episode.source_tool,
                        &episode.session_id,
                        &upload.object_key,
                        &episode.consent.consent_version,
                        &episode.license,
                    )?;
                }
            }

            store.upsert_file_fingerprint(&path_str, &fingerprint)?;
        }

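        // Persist the per-file offsets for this source so the next run can
        // resume where this one stopped.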
        let cursor_json = serde_json::to_string(&source_cursor)?;
        store.upsert_source_cursor(&source.id, &cursor_json)?;
        by_source.insert(source.id, source_docs);
        source_size_stats.push(source_stats);
    }

    if let Some(writer) = export_writer.as_mut() {
        writer.flush()?;
    }

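    // Record the run summary, including how many files failed to parse.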
    store.finish_run(&RunStats {
        run_id,
        scanned_files,
        produced_docs,
        uploaded_docs,
        redactions,
        errors: parse_errors,
    })?;

    Ok(RunResult {
        scanned_files,
        produced_docs,
        uploaded_docs,
        would_upload_docs,
        skipped_existing_docs,
        capped_docs,
        redactions,
        would_upload_bytes,
        uploaded_bytes,
        capped_bytes,
        by_source,
        payload_preview,
        source_size_stats,
        exported_payload_docs,
    })
}

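/// Returns all sources when no filter was given, otherwise only the sources
/// whose ids were named.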
fn select_sources(all: Vec<SourceDef>, only: &[String]) -> Vec<SourceDef> {
    if only.is_empty() {
        return all;
    }
    all.into_iter()
        .filter(|s| only.iter().any(|x| x == &s.id))
        .collect()
}

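/// Cheap change-detection fingerprint built from file size and mtime (not
/// contents), hashed with BLAKE3. Note this can miss an in-place edit that
/// preserves both the size and the modification second.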
fn file_fingerprint(path: &Path) -> Result<String> {
    let md = fs::metadata(path)?;
    let size = md.len();
    let modified = md
        .modified()
        .ok()
        .and_then(|m| m.duration_since(std::time::UNIX_EPOCH).ok())
        .map(|d| d.as_secs())
        .unwrap_or_default();
    let seed = format!("{}:{}", size, modified);
    Ok(blake3::hash(seed.as_bytes()).to_hex().to_string())
}

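/// Loads a source's cursor from the state store, falling back to an empty
/// cursor (full re-parse) if the stored JSON is missing, empty, or corrupt.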
fn load_source_cursor(store: &StateStore, source_id: &str) -> Result<SourceCursor> {
    if let Some(raw) = store.source_cursor(source_id)? {
        if raw.trim().is_empty() {
            return Ok(SourceCursor::default());
        }
        if let Ok(cursor) = serde_json::from_str::<SourceCursor>(&raw) {
            return Ok(cursor);
        }
    }
    Ok(SourceCursor::default())
}

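/// Prints the sanitization summary for one file: redaction counts by category
/// plus a few sample redacted snippets.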
fn print_review(source_id: &str, path: &Path, report: &SanitizationReport) {
    println!("[review] source={source_id} file={}", path.display());
    println!(
        "[review] redactions total={} secrets={} email={} ip={} path={}",
        report.total_redactions,
        report.secret_redactions,
        report.email_redactions,
        report.ip_redactions,
        report.path_redactions,
    );
    for (idx, sample) in report.sample_redacted.iter().enumerate() {
        println!("[review][sample:{}] {}", idx + 1, sample);
    }
}