1use anyhow::{Context, Result, bail};
2use glob::glob;
3use std::{
4 collections::BTreeSet,
5 fs,
6 io::{BufRead, BufReader, BufWriter, Write},
7 path::{Path, PathBuf},
8};
9
10use crate::{
11 config::{AppConfig, ensure_dirs},
12 consent::require_consent,
13 episode::{EpisodeRecord, build_episodes},
14 models::CanonicalEvent,
15 parser::parse_jsonl_file,
16 publish::index_episode_pointer,
17 sanitize::{SanitizationReport, sanitize_events},
18 state::StateStore,
19 worker::upload_episode,
20};
21
/// Outcome of [`scan_to_dir`]: how many files were read, how many canonical
/// events were produced, and where they were written.
#[derive(Debug, Clone)]
pub struct ScanResult {
    /// Number of input files that were parsed.
    pub input_files: usize,
    /// Total canonical events written to `output_file`.
    pub produced_events: usize,
    /// Path to the produced `canonical_events.jsonl`.
    pub output_file: PathBuf,
}
28
/// Outcome of [`sanitize_to_dir`]: event counts before/after sanitization and
/// the paths of the sanitized output and redaction report.
#[derive(Debug, Clone)]
pub struct SanitizeResult {
    /// Number of canonical events read from the input.
    pub input_events: usize,
    /// Number of events remaining after sanitization.
    pub output_events: usize,
    /// Path to the produced `sanitized_events.jsonl`.
    pub output_file: PathBuf,
    /// Path to the written `redaction_report.json`.
    pub report_file: PathBuf,
    /// The in-memory sanitization report (also serialized to `report_file`).
    pub report: SanitizationReport,
}
37
/// Outcome of [`publish_from_input`]: document and byte counters for what was
/// produced, uploaded, skipped as already present, or capped by the byte limit.
#[derive(Debug, Clone)]
pub struct PublishResult {
    /// Episodes produced (read from input or built from canonical events).
    pub produced_docs: usize,
    /// Episodes eligible for upload (not already recorded in local state).
    pub would_upload_docs: usize,
    /// Episodes actually uploaded (0 on dry runs).
    pub uploaded_docs: usize,
    /// Episodes skipped because local state already records an upload.
    pub skipped_existing_docs: usize,
    /// Episodes withheld because they would exceed `max_upload_bytes`.
    pub capped_docs: usize,
    /// Serialized size of all eligible episodes.
    pub would_upload_bytes: u64,
    /// Serialized size of episodes actually uploaded.
    pub uploaded_bytes: u64,
    /// Serialized size of episodes withheld by the byte cap.
    pub capped_bytes: u64,
}
49
50pub fn scan_to_dir(input: &str, out_dir: &Path) -> Result<ScanResult> {
51 ensure_dirs()?;
52 fs::create_dir_all(out_dir)?;
53
54 let input_files = collect_input_files(input)?;
55 if input_files.is_empty() {
56 bail!("no files found for input: {input}");
57 }
58
59 let output_file = out_dir.join("canonical_events.jsonl");
60 let mut writer = BufWriter::new(fs::File::create(&output_file)?);
61
62 let mut produced_events = 0usize;
63 for path in &input_files {
64 let events = parse_jsonl_file(path, "manual_scan")?;
65 for event in events {
66 serde_json::to_writer(&mut writer, &event)?;
67 writer.write_all(b"\n")?;
68 produced_events += 1;
69 }
70 }
71 writer.flush()?;
72
73 let summary = serde_json::json!({
74 "input_files": input_files.len(),
75 "produced_events": produced_events,
76 "output_file": output_file,
77 });
78 fs::write(
79 out_dir.join("scan_summary.json"),
80 serde_json::to_vec_pretty(&summary)?,
81 )?;
82
83 Ok(ScanResult {
84 input_files: input_files.len(),
85 produced_events,
86 output_file,
87 })
88}
89
90pub fn sanitize_to_dir(
91 input: &Path,
92 out_dir: &Path,
93 _policy: Option<&Path>,
94) -> Result<SanitizeResult> {
95 ensure_dirs()?;
96 fs::create_dir_all(out_dir)?;
97
98 let events = read_canonical_events(input)?;
99 let input_events = events.len();
100 let (sanitized, report) = sanitize_events(&events);
101
102 let output_file = out_dir.join("sanitized_events.jsonl");
103 let mut writer = BufWriter::new(fs::File::create(&output_file)?);
104 for event in &sanitized {
105 serde_json::to_writer(&mut writer, event)?;
106 writer.write_all(b"\n")?;
107 }
108 writer.flush()?;
109
110 let report_file = out_dir.join("redaction_report.json");
111 fs::write(&report_file, serde_json::to_vec_pretty(&report)?)?;
112
113 Ok(SanitizeResult {
114 input_events,
115 output_events: sanitized.len(),
116 output_file,
117 report_file,
118 report,
119 })
120}
121
122#[allow(clippy::too_many_arguments)]
123pub async fn publish_from_input(
124 config: &AppConfig,
125 input: &Path,
126 namespace: Option<&str>,
127 dry_run: bool,
128 review: bool,
129 yes: bool,
130 include_raw: bool,
131 max_upload_bytes: Option<u64>,
132) -> Result<PublishResult> {
133 if !dry_run && (!yes || !review) {
134 bail!("publish requires --review and --yes unless --dry-run");
135 }
136
137 ensure_dirs()?;
138 let store = StateStore::open_default()?;
139
140 let mut episodes = read_episode_records(input)?;
141 if episodes.is_empty() {
142 let consent = require_consent(&store)?;
143 let events = read_canonical_events(input)?;
144 if !events.is_empty() {
145 let built = build_episodes(
146 "manual_scan",
147 &events[0].session_id,
148 &events,
149 include_raw,
150 &consent.accepted_at,
151 &consent.consent_version,
152 &consent.license,
153 "policy-v1",
154 "sanitizer-v1",
155 );
156 episodes.extend(built);
157 }
158 }
159
160 if let Some(ns) = namespace {
161 for ep in &mut episodes {
162 ep.source_tool = format!("{ns}:{}", ep.source_tool);
163 }
164 }
165
166 let produced_docs = episodes.len();
167 let mut would_upload_docs = 0usize;
168 let mut uploaded_docs = 0usize;
169 let mut skipped_existing_docs = 0usize;
170 let mut capped_docs = 0usize;
171 let mut would_upload_bytes = 0u64;
172 let mut uploaded_bytes = 0u64;
173 let mut capped_bytes = 0u64;
174
175 for (idx, episode) in episodes.iter().enumerate() {
176 if store.has_episode_upload(&episode.id)? {
177 skipped_existing_docs += 1;
178 continue;
179 }
180 let episode_bytes = serde_json::to_vec(episode)?.len() as u64;
181 would_upload_docs += 1;
182 would_upload_bytes += episode_bytes;
183
184 if review && idx < 5 {
185 println!(
186 "[review] episode_id={} source_tool={} ts_start={}",
187 episode.id, episode.source_tool, episode.ts_start
188 );
189 let preview = if episode.result.len() > 240 {
190 format!("{}...", &episode.result[..240])
191 } else {
192 episode.result.clone()
193 };
194 println!("[review] text_preview={}", preview.replace('\n', " "));
195 }
196
197 if dry_run {
198 continue;
199 }
200
201 if let Some(limit) = max_upload_bytes {
202 if limit > 0 && uploaded_bytes + episode_bytes > limit {
203 capped_docs += 1;
204 capped_bytes += episode_bytes;
205 continue;
206 }
207 }
208
209 let upload = upload_episode(config, episode).await?;
210 index_episode_pointer(config, episode, &upload.object_key, None).await?;
211 store.upsert_episode_upload(
212 &episode.id,
213 &episode.content_hash,
214 &episode.source_tool,
215 &episode.session_id,
216 &upload.object_key,
217 &episode.consent.consent_version,
218 &episode.license,
219 )?;
220 uploaded_docs += 1;
221 uploaded_bytes += episode_bytes;
222 }
223
224 Ok(PublishResult {
225 produced_docs,
226 would_upload_docs,
227 uploaded_docs,
228 skipped_existing_docs,
229 capped_docs,
230 would_upload_bytes,
231 uploaded_bytes,
232 capped_bytes,
233 })
234}
235
236fn read_canonical_events(input: &Path) -> Result<Vec<CanonicalEvent>> {
237 let files = collect_files_from_path(input)?;
238 let mut out = Vec::new();
239 for path in files {
240 let file = fs::File::open(&path)?;
241 let reader = BufReader::new(file);
242 for line in reader.lines() {
243 let line = line?;
244 if line.trim().is_empty() {
245 continue;
246 }
247 if let Ok(event) = serde_json::from_str::<CanonicalEvent>(&line) {
248 out.push(event);
249 }
250 }
251 }
252 Ok(out)
253}
254
255fn read_episode_records(input: &Path) -> Result<Vec<EpisodeRecord>> {
256 let files = collect_files_from_path(input)?;
257 let mut out = Vec::new();
258 for path in files {
259 let file = fs::File::open(&path)?;
260 let reader = BufReader::new(file);
261 for line in reader.lines() {
262 let line = line?;
263 if line.trim().is_empty() {
264 continue;
265 }
266 if let Ok(ep) = serde_json::from_str::<EpisodeRecord>(&line) {
267 out.push(ep);
268 }
269 }
270 }
271 Ok(out)
272}
273
274fn collect_input_files(input: &str) -> Result<Vec<PathBuf>> {
275 let mut files = BTreeSet::new();
276
277 let has_glob = input.contains('*') || input.contains('?') || input.contains('[');
278 if has_glob {
279 for path in glob(input)
280 .with_context(|| format!("invalid input glob: {input}"))?
281 .flatten()
282 {
283 if path.is_file() {
284 files.insert(path);
285 }
286 }
287 return Ok(files.into_iter().collect());
288 }
289
290 let path = PathBuf::from(input);
291 if path.is_file() {
292 files.insert(path);
293 } else if path.is_dir() {
294 for entry in ignore::WalkBuilder::new(&path)
295 .hidden(false)
296 .git_ignore(false)
297 .build()
298 {
299 let entry = match entry {
300 Ok(v) => v,
301 Err(_) => continue,
302 };
303 if entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
304 files.insert(entry.path().to_path_buf());
305 }
306 }
307 }
308
309 Ok(files.into_iter().collect())
310}
311
312fn collect_files_from_path(input: &Path) -> Result<Vec<PathBuf>> {
313 let mut files = Vec::new();
314 if input.is_file() {
315 files.push(input.to_path_buf());
316 return Ok(files);
317 }
318
319 if input.is_dir() {
320 for entry in ignore::WalkBuilder::new(input)
321 .hidden(false)
322 .git_ignore(false)
323 .build()
324 {
325 let entry = match entry {
326 Ok(v) => v,
327 Err(_) => continue,
328 };
329 if entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
330 files.push(entry.path().to_path_buf());
331 }
332 }
333 files.sort();
334 files.dedup();
335 return Ok(files);
336 }
337
338 bail!("input path does not exist: {}", input.display())
339}