1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
13const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
14const RAW_MIRROR_VERSION_DIR: &str = "v1";
15const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
16const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
17const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";
18
19static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
20static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
21 OnceLock::new();
22static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
24#[derive(Debug, Clone)]
25pub struct RawMirrorCaptureInput<'a> {
26 pub data_dir: &'a Path,
27 pub provider: &'a str,
28 pub source_id: &'a str,
29 pub origin_kind: &'a str,
30 pub origin_host: Option<&'a str>,
31 pub source_path: &'a Path,
32 pub db_links: &'a [RawMirrorDbLink],
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36pub struct RawMirrorCaptureRecord {
37 pub manifest_id: String,
38 pub manifest_relative_path: String,
39 pub blob_relative_path: String,
40 pub blob_blake3: String,
41 pub blob_size_bytes: u64,
42 pub captured_at_ms: i64,
43 pub source_mtime_ms: Option<i64>,
44 pub already_present: bool,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
48pub struct RawMirrorDbLink {
49 pub conversation_id: Option<i64>,
50 pub message_count: Option<usize>,
51 pub source_path: Option<String>,
52 pub started_at_ms: Option<i64>,
53}
54
55#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
56pub struct RawMirrorStorageSummary {
57 pub initialized: bool,
58 pub root_path: String,
59 pub total_storage_bytes: u64,
60 pub manifest_count: u64,
61 pub manifest_bytes: u64,
62 pub unique_blob_count: u64,
63 pub total_blob_bytes: u64,
64 pub largest_blob_bytes: u64,
65 pub missing_blob_count: u64,
66 pub invalid_manifest_count: u64,
67 pub oldest_capture_at_ms: Option<i64>,
68 pub newest_capture_at_ms: Option<i64>,
69 pub oldest_source_mtime_ms: Option<i64>,
70 pub newest_source_mtime_ms: Option<i64>,
71}
72
73pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
74 let root = raw_mirror_root(data_dir);
75 let mut summary = RawMirrorStorageSummary {
76 root_path: root.display().to_string(),
77 ..RawMirrorStorageSummary::default()
78 };
79 let root_metadata = match fs::symlink_metadata(&root) {
80 Ok(metadata) => metadata,
81 Err(_) => return summary,
82 };
83 summary.initialized = true;
84 if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
85 summary.invalid_manifest_count = 1;
86 return summary;
87 }
88
89 summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
90
91 let manifests_dir = root.join("manifests");
92 let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
93 return summary;
94 };
95 if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
96 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
97 return summary;
98 }
99 let entries = match fs::read_dir(&manifests_dir) {
100 Ok(entries) => entries,
101 Err(_) => return summary,
102 };
103 let mut seen_blobs = HashSet::new();
104 for entry in entries {
105 let Ok(entry) = entry else {
106 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
107 continue;
108 };
109 let path = entry.path();
110 let manifest_metadata = match fs::symlink_metadata(&path) {
111 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
112 _ => {
113 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
114 continue;
115 }
116 };
117 summary.manifest_bytes = summary
118 .manifest_bytes
119 .saturating_add(manifest_metadata.len());
120 let manifest = match read_raw_mirror_manifest(&path) {
121 Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
122 _ => {
123 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
124 continue;
125 }
126 };
127 summary.manifest_count = summary.manifest_count.saturating_add(1);
128 merge_min_max(
129 &mut summary.oldest_capture_at_ms,
130 &mut summary.newest_capture_at_ms,
131 Some(manifest.captured_at_ms),
132 );
133 merge_min_max(
134 &mut summary.oldest_source_mtime_ms,
135 &mut summary.newest_source_mtime_ms,
136 manifest.source_mtime_ms,
137 );
138
139 let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
140 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
141 continue;
142 };
143 if manifest.blob_relative_path != blob_relative_path {
144 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
145 continue;
146 }
147
148 if !seen_blobs.insert(blob_relative_path.clone()) {
149 continue;
150 }
151 let blob_path = root.join(blob_relative_path);
152 match fs::symlink_metadata(&blob_path) {
153 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
154 let size = metadata.len();
155 summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
156 summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
157 summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
158 }
159 _ => {
160 summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
161 }
162 }
163 }
164
165 summary
166}
167
/// Retention policy and execution mode for [`prune`].
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// Age threshold in milliseconds: unpinned manifests captured at or before
    /// `now - older_than_ms` are selected. Negative values are treated as `0`.
    pub older_than_ms: Option<i64>,
    /// Blob-byte budget: when current blob bytes exceed it, the oldest unpinned blobs are
    /// retired until the projected total fits.
    pub max_size_bytes: Option<u64>,
    /// Tag names whose conversations pin the manifests linked to them.
    pub keep_tags: Vec<String>,
    /// Manifests captured within this many milliseconds of "now" are pinned (ignored when `<= 0`).
    pub safety_hold_down_ms: i64,
    /// `true` deletes the planned files; `false` only produces the plan ("dry-run").
    pub apply: bool,
}
176
177#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
178pub struct RawMirrorPruneReport {
179 pub initialized: bool,
180 pub root_path: String,
181 pub mode: String,
182 pub manifest_count: u64,
183 pub unique_blob_count: u64,
184 pub current_blob_bytes: u64,
185 pub safety_hold_down_ms: i64,
186 pub keep_tags: Vec<String>,
187 pub pinned_manifest_count: u64,
188 pub pinned_blob_count: u64,
189 pub planned_manifest_count: u64,
190 pub planned_blob_count: u64,
191 pub planned_reclaim_bytes: u64,
192 pub applied_manifest_count: u64,
193 pub applied_blob_count: u64,
194 pub applied_reclaim_bytes: u64,
195 pub audit_log_path: Option<String>,
196 pub entries: Vec<RawMirrorPruneEntry>,
197}
198
199#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
200pub struct RawMirrorPruneEntry {
201 pub kind: String,
202 pub path: String,
203 pub blob_blake3: Option<String>,
204 pub size_bytes: u64,
205 pub reason: String,
206 pub applied: bool,
207}
208
209#[derive(Debug, Clone)]
210struct RawMirrorPruneManifest {
211 manifest_id: String,
212 relative_path: String,
213 size_bytes: u64,
214 blob_blake3: String,
215 blob_relative_path: String,
216 blob_size_bytes: u64,
217 captured_at_ms: i64,
218 provider: String,
219 original_path: String,
220 db_links: Vec<RawMirrorDbLink>,
221}
222
223pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
224 let root = raw_mirror_root(data_dir);
225 let mut report = RawMirrorPruneReport {
226 initialized: false,
227 root_path: root.display().to_string(),
228 mode: if options.apply {
229 "apply".to_string()
230 } else {
231 "dry-run".to_string()
232 },
233 manifest_count: 0,
234 unique_blob_count: 0,
235 current_blob_bytes: 0,
236 safety_hold_down_ms: options.safety_hold_down_ms,
237 keep_tags: options.keep_tags.clone(),
238 pinned_manifest_count: 0,
239 pinned_blob_count: 0,
240 planned_manifest_count: 0,
241 planned_blob_count: 0,
242 planned_reclaim_bytes: 0,
243 applied_manifest_count: 0,
244 applied_blob_count: 0,
245 applied_reclaim_bytes: 0,
246 audit_log_path: None,
247 entries: Vec::new(),
248 };
249
250 let metadata = match fs::symlink_metadata(&root) {
251 Ok(metadata) => metadata,
252 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
253 Err(err) => {
254 return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
255 }
256 };
257 if metadata.file_type().is_symlink() || !metadata.is_dir() {
258 anyhow::bail!(
259 "refusing to prune invalid raw mirror root {}",
260 root.display()
261 );
262 }
263 report.initialized = true;
264
265 let manifests = collect_prune_manifests(&root)?;
266 report.manifest_count = manifests.len() as u64;
267
268 let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
269 let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
270 let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
271 for manifest in &manifests {
272 manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
273 blob_to_manifests
274 .entry(manifest.blob_relative_path.clone())
275 .or_default()
276 .push(manifest.manifest_id.clone());
277 blob_size_by_relative
278 .entry(manifest.blob_relative_path.clone())
279 .or_insert_with(|| {
280 blob_file_size(&root.join(&manifest.blob_relative_path))
281 .unwrap_or(manifest.blob_size_bytes)
282 });
283 }
284 report.unique_blob_count = blob_size_by_relative.len() as u64;
285 report.current_blob_bytes = blob_size_by_relative
286 .values()
287 .copied()
288 .fold(0u64, u64::saturating_add);
289
290 let now = now_ms();
291 let pinned_manifests = pinned_prune_manifest_ids(
292 data_dir,
293 &manifests,
294 &options.keep_tags,
295 options.safety_hold_down_ms,
296 now,
297 )?;
298 report.pinned_manifest_count = pinned_manifests.len() as u64;
299 let pinned_blobs: HashSet<String> = blob_to_manifests
300 .iter()
301 .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
302 .map(|(blob_relative_path, _)| blob_relative_path.clone())
303 .collect();
304 report.pinned_blob_count = pinned_blobs.len() as u64;
305
306 let mut selected_manifests: HashSet<String> = HashSet::new();
307 let mut manifest_reasons: HashMap<String, String> = HashMap::new();
308
309 if let Some(older_than_ms) = options.older_than_ms {
310 let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
311 for manifest in &manifests {
312 if manifest.captured_at_ms <= cutoff_ms
313 && !pinned_manifests.contains(&manifest.manifest_id)
314 {
315 selected_manifests.insert(manifest.manifest_id.clone());
316 manifest_reasons
317 .entry(manifest.manifest_id.clone())
318 .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
319 }
320 }
321 }
322
323 if let Some(max_size_bytes) = options.max_size_bytes
324 && report.current_blob_bytes > max_size_bytes
325 {
326 let mut blob_groups: Vec<_> = blob_to_manifests
327 .iter()
328 .map(|(blob_relative_path, manifest_ids)| {
329 let oldest_capture = manifest_ids
330 .iter()
331 .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
332 .min()
333 .unwrap_or(i64::MAX);
334 let size = blob_size_by_relative
335 .get(blob_relative_path)
336 .copied()
337 .unwrap_or(0);
338 (
339 blob_relative_path.clone(),
340 manifest_ids.clone(),
341 oldest_capture,
342 size,
343 )
344 })
345 .collect::<Vec<_>>();
346 blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
347
348 let mut projected_bytes = report.current_blob_bytes;
349 for (blob_relative_path, manifest_ids, _, size) in blob_groups {
350 if projected_bytes <= max_size_bytes {
351 break;
352 }
353 if pinned_blobs.contains(&blob_relative_path) {
354 continue;
355 }
356 for manifest_id in manifest_ids {
357 if !pinned_manifests.contains(&manifest_id) {
358 selected_manifests.insert(manifest_id.clone());
359 manifest_reasons.entry(manifest_id).or_insert_with(|| {
360 format!("max-size over budget; retiring blob {blob_relative_path}")
361 });
362 }
363 }
364 projected_bytes = projected_bytes.saturating_sub(size);
365 }
366 }
367
368 let selected_blobs: HashSet<String> = blob_to_manifests
369 .iter()
370 .filter(|(_, manifest_ids)| {
371 manifest_ids
372 .iter()
373 .all(|id| selected_manifests.contains(id))
374 })
375 .map(|(blob_relative_path, _)| blob_relative_path.clone())
376 .collect();
377
378 let mut entries = Vec::new();
379 let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
380 selected_manifest_ids.sort();
381 for manifest_id in selected_manifest_ids {
382 let Some(manifest) = manifest_by_id.get(&manifest_id) else {
383 continue;
384 };
385 let reason = manifest_reasons
386 .remove(&manifest_id)
387 .unwrap_or_else(|| "selected by retention policy".to_string());
388 entries.push(RawMirrorPruneEntry {
389 kind: "manifest".to_string(),
390 path: manifest.relative_path.clone(),
391 blob_blake3: Some(manifest.blob_blake3.clone()),
392 size_bytes: manifest.size_bytes,
393 reason,
394 applied: false,
395 });
396 }
397
398 let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
399 selected_blob_paths.sort();
400 for blob_relative_path in selected_blob_paths {
401 let size = blob_size_by_relative
402 .get(&blob_relative_path)
403 .copied()
404 .unwrap_or(0);
405 let blob_blake3 = blob_relative_path
406 .rsplit('/')
407 .next()
408 .and_then(|name| name.strip_suffix(".raw"))
409 .map(ToOwned::to_owned);
410 entries.push(RawMirrorPruneEntry {
411 kind: "blob".to_string(),
412 path: blob_relative_path,
413 blob_blake3,
414 size_bytes: size,
415 reason: "no retained manifest references this blob after prune plan".to_string(),
416 applied: false,
417 });
418 }
419
420 report.planned_manifest_count = entries
421 .iter()
422 .filter(|entry| entry.kind == "manifest")
423 .count() as u64;
424 report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
425 report.planned_reclaim_bytes = entries
426 .iter()
427 .map(|entry| entry.size_bytes)
428 .fold(0, u64::saturating_add);
429
430 if options.apply {
431 for entry in &mut entries {
432 let path = root.join(&entry.path);
433 let removed = remove_prune_target_file(&path)
434 .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
435 entry.applied = removed;
436 if removed {
437 if entry.kind == "manifest" {
438 report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
439 } else if entry.kind == "blob" {
440 report.applied_blob_count = report.applied_blob_count.saturating_add(1);
441 }
442 report.applied_reclaim_bytes = report
443 .applied_reclaim_bytes
444 .saturating_add(entry.size_bytes);
445 }
446 }
447 }
448
449 report.entries = entries;
450 if !report.entries.is_empty() {
451 let audit_path = append_prune_audit_log(&root, &report)?;
452 report.audit_log_path = Some(audit_path.display().to_string());
453 }
454 Ok(report)
455}
456
457fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
458 let manifests_dir = root.join("manifests");
459 let metadata = match fs::symlink_metadata(&manifests_dir) {
460 Ok(metadata) => metadata,
461 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
462 Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
463 };
464 if metadata.file_type().is_symlink() || !metadata.is_dir() {
465 anyhow::bail!(
466 "refusing to prune invalid raw mirror manifests directory {}",
467 manifests_dir.display()
468 );
469 }
470
471 let mut manifests = Vec::new();
472 for entry in
473 fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
474 {
475 let entry = entry?;
476 let path = entry.path();
477 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
478 continue;
479 }
480 let manifest_metadata = fs::symlink_metadata(&path)
481 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
482 if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
483 anyhow::bail!(
484 "refusing to prune with non-regular raw mirror manifest {}",
485 path.display()
486 );
487 }
488 let manifest = read_raw_mirror_manifest(&path)?;
489 if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
490 anyhow::bail!(
491 "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
492 manifest.manifest_kind,
493 path.display()
494 );
495 }
496 let Some(expected_blob_relative_path) =
497 raw_mirror_blob_relative_path(&manifest.blob_blake3)
498 else {
499 anyhow::bail!(
500 "refusing to prune raw mirror manifest {} with invalid blob hash",
501 path.display()
502 );
503 };
504 if manifest.blob_relative_path != expected_blob_relative_path {
505 anyhow::bail!(
506 "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
507 path.display(),
508 manifest.blob_relative_path
509 );
510 }
511 let relative_path = path
512 .strip_prefix(root)
513 .unwrap_or(&path)
514 .display()
515 .to_string();
516 manifests.push(RawMirrorPruneManifest {
517 manifest_id: manifest.manifest_id,
518 relative_path,
519 size_bytes: manifest_metadata.len(),
520 blob_blake3: manifest.blob_blake3,
521 blob_relative_path: manifest.blob_relative_path,
522 blob_size_bytes: manifest.blob_size_bytes,
523 captured_at_ms: manifest.captured_at_ms,
524 provider: manifest.provider,
525 original_path: manifest.original_path,
526 db_links: manifest.db_links,
527 });
528 }
529 manifests.sort_by(|left, right| {
530 left.captured_at_ms
531 .cmp(&right.captured_at_ms)
532 .then_with(|| left.provider.cmp(&right.provider))
533 .then_with(|| left.original_path.cmp(&right.original_path))
534 .then_with(|| left.manifest_id.cmp(&right.manifest_id))
535 });
536 Ok(manifests)
537}
538
539fn pinned_prune_manifest_ids(
540 data_dir: &Path,
541 manifests: &[RawMirrorPruneManifest],
542 keep_tags: &[String],
543 safety_hold_down_ms: i64,
544 now_ms: i64,
545) -> Result<HashSet<String>> {
546 let mut pinned = HashSet::new();
547 if safety_hold_down_ms > 0 {
548 let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
549 for manifest in manifests {
550 if manifest.captured_at_ms > cutoff_ms {
551 pinned.insert(manifest.manifest_id.clone());
552 }
553 }
554 }
555
556 let normalized_keep_tags = keep_tags
557 .iter()
558 .map(|tag| tag.trim())
559 .filter(|tag| !tag.is_empty())
560 .map(ToOwned::to_owned)
561 .collect::<Vec<_>>();
562 if normalized_keep_tags.is_empty() {
563 return Ok(pinned);
564 }
565
566 let keep_tag_conversation_ids =
567 load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
568 for manifest in manifests {
569 if manifest.db_links.iter().any(|link| {
570 link.conversation_id
571 .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
572 }) {
573 pinned.insert(manifest.manifest_id.clone());
574 }
575 }
576 Ok(pinned)
577}
578
579fn load_keep_tag_conversation_ids(
580 data_dir: &Path,
581 manifests: &[RawMirrorPruneManifest],
582 keep_tags: &[String],
583) -> Result<HashSet<i64>> {
584 use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
585
586 let mut conversation_ids = manifests
587 .iter()
588 .flat_map(|manifest| manifest.db_links.iter())
589 .filter_map(|link| link.conversation_id)
590 .collect::<Vec<_>>();
591 conversation_ids.sort_unstable();
592 conversation_ids.dedup();
593 if conversation_ids.is_empty() {
594 return Ok(HashSet::new());
595 }
596
597 let db_path = data_dir.join("agent_search.db");
598 let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
599 &db_path,
600 Duration::from_secs(30),
601 )
602 .with_context(|| {
603 format!(
604 "open {} to honor raw-mirror prune --keep-tag",
605 db_path.display()
606 )
607 })?;
608 let _ = conn.execute("PRAGMA query_only = 1;");
609
610 let mut pinned = HashSet::new();
611 for id_chunk in conversation_ids.chunks(400) {
612 let tag_placeholders = (0..keep_tags.len())
613 .map(|idx| format!("?{}", idx + 1))
614 .collect::<Vec<_>>()
615 .join(", ");
616 let id_offset = keep_tags.len();
617 let id_placeholders = (0..id_chunk.len())
618 .map(|idx| format!("?{}", id_offset + idx + 1))
619 .collect::<Vec<_>>()
620 .join(", ");
621 let sql = format!(
622 "SELECT DISTINCT ct.conversation_id \
623 FROM conversation_tags ct \
624 JOIN tags t ON t.id = ct.tag_id \
625 WHERE t.name IN ({tag_placeholders}) \
626 AND ct.conversation_id IN ({id_placeholders})"
627 );
628 let mut params = keep_tags
629 .iter()
630 .map(|tag| ParamValue::from(tag.as_str()))
631 .collect::<Vec<_>>();
632 params.extend(id_chunk.iter().copied().map(ParamValue::from));
633 let rows: Vec<i64> = conn
634 .query_map_collect(&sql, ¶ms, |row: &frankensqlite::Row| row.get_typed(0))
635 .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
636 pinned.extend(rows);
637 }
638
639 Ok(pinned)
640}
641
/// Returns the size of `path` when it is a regular file.
///
/// Yields `None` when the path cannot be stat'ed, is a symlink (links are not followed), or is
/// anything other than a regular file.
fn blob_file_size(path: &Path) -> Option<u64> {
    let metadata = fs::symlink_metadata(path).ok()?;
    if metadata.file_type().is_symlink() || !metadata.is_file() {
        return None;
    }
    Some(metadata.len())
}
648
649fn remove_prune_target_file(path: &Path) -> Result<bool> {
650 let metadata = match fs::symlink_metadata(path) {
651 Ok(metadata) => metadata,
652 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
653 Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
654 };
655 if metadata.file_type().is_symlink() || !metadata.is_file() {
656 anyhow::bail!(
657 "refusing to prune non-regular raw mirror file {}",
658 path.display()
659 );
660 }
661 fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
662 sync_parent(path)?;
663 Ok(true)
664}
665
666fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
667 ensure_private_dir(root)?;
668 let audit_path = root.join("pruned.jsonl");
669 let mut file = OpenOptions::new()
670 .create(true)
671 .append(true)
672 .open(&audit_path)
673 .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
674 set_private_file_permissions(&audit_path)?;
675 let now = now_ms();
676 for entry in &report.entries {
677 let record = json!({
678 "schema_version": 1,
679 "recorded_at_ms": now,
680 "mode": report.mode,
681 "kind": entry.kind,
682 "path": entry.path,
683 "blob_blake3": entry.blob_blake3,
684 "size_bytes": entry.size_bytes,
685 "reason": entry.reason,
686 "applied": entry.applied,
687 });
688 writeln!(file, "{record}")
689 .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
690 }
691 file.sync_all()
692 .with_context(|| format!("sync raw mirror prune audit {}", audit_path.display()))?;
693 sync_parent(&audit_path)?;
694 Ok(audit_path)
695}
696
/// Folds `value` into a running minimum and maximum.
///
/// `None` leaves both bounds untouched; an unset bound is initialised with the value.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(candidate) = value {
        *min = Some(match *min {
            Some(lowest) if lowest <= candidate => lowest,
            _ => candidate,
        });
        *max = Some(match *max {
            Some(highest) if highest >= candidate => highest,
            _ => candidate,
        });
    }
}
704
/// Sums the sizes of all regular files at or below `root`.
///
/// The walk is iterative, never follows symlinks, and silently skips anything that cannot be
/// stat'ed or listed. The total saturates instead of overflowing.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending = vec![root.to_path_buf()];
    let mut total_bytes: u64 = 0;

    while let Some(current) = pending.pop() {
        let metadata = match fs::symlink_metadata(&current) {
            Ok(metadata) => metadata,
            Err(_) => continue,
        };
        let file_type = metadata.file_type();
        if file_type.is_symlink() {
            continue;
        }
        if file_type.is_file() {
            total_bytes = total_bytes.saturating_add(metadata.len());
            continue;
        }
        if !file_type.is_dir() {
            continue;
        }
        if let Ok(children) = fs::read_dir(&current) {
            pending.extend(children.flatten().map(|child| child.path()));
        }
    }

    total_bytes
}
728
/// Key of the in-process blob capture cache.
///
/// NOTE(review): presumably two captures with equal keys are treated as the same unchanged
/// source file — the key builder and the cache lookup are outside this view.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    /// Data directory the capture targets.
    data_dir: PathBuf,
    /// Path of the captured source file.
    source_path: PathBuf,
    /// Optional textual identity of the source file.
    source_identity: Option<String>,
    /// Source size in bytes.
    source_size_bytes: u64,
    /// Source modification time in nanoseconds, when available.
    source_mtime_ns: Option<u128>,
    /// Source change time in nanoseconds, when available.
    source_change_time_ns: Option<u128>,
}
738
/// Value stored in the blob capture cache for an already captured source file.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// Hex BLAKE3 digest of the captured content.
    blob_blake3: String,
    /// Number of bytes that were copied when the blob was captured.
    bytes_copied: u64,
}
744
745#[derive(Debug, Clone, Serialize, Deserialize)]
746struct RawMirrorCompressionEnvelope {
747 state: String,
748 algorithm: Option<String>,
749 uncompressed_size_bytes: Option<u64>,
750}
751
752#[derive(Debug, Clone, Serialize, Deserialize)]
753struct RawMirrorEncryptionEnvelope {
754 state: String,
755 algorithm: Option<String>,
756 key_id: Option<String>,
757 envelope_version: Option<u32>,
758}
759
760#[derive(Debug, Clone, Serialize, Deserialize)]
761struct RawMirrorVerificationRecord {
762 status: String,
763 verifier: String,
764 content_blake3: Option<String>,
765 verified_at_ms: Option<i64>,
766}
767
768#[derive(Debug, Clone, Serialize, Deserialize)]
769struct RawMirrorManifestFile {
770 schema_version: u32,
771 manifest_kind: String,
772 manifest_id: String,
773 blob_hash_algorithm: String,
774 blob_relative_path: String,
775 blob_blake3: String,
776 blob_size_bytes: u64,
777 provider: String,
778 source_id: String,
779 origin_kind: String,
780 origin_host: Option<String>,
781 original_path: String,
782 redacted_original_path: String,
783 original_path_blake3: String,
784 captured_at_ms: i64,
785 source_mtime_ms: Option<i64>,
786 source_size_bytes: u64,
787 compression: RawMirrorCompressionEnvelope,
788 encryption: RawMirrorEncryptionEnvelope,
789 db_links: Vec<RawMirrorDbLink>,
790 verification: RawMirrorVerificationRecord,
791 manifest_blake3: Option<String>,
792}
793
794pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
795 let source_metadata = fs::symlink_metadata(input.source_path)
796 .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
797 if source_metadata.file_type().is_symlink() {
798 return Err(anyhow!(
799 "refusing to raw-mirror symlink source {}",
800 input.source_path.display()
801 ));
802 }
803 if !source_metadata.is_file() {
804 return Err(anyhow!(
805 "refusing to raw-mirror non-file source {}",
806 input.source_path.display()
807 ));
808 }
809
810 let root = ensure_raw_mirror_root(input.data_dir)?;
811 ensure_private_dir_descendant(&root, &root.join("tmp"))?;
812
813 let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
814 let (blob_blake3, bytes_copied, blob_already_present) =
815 match cached_raw_mirror_blob_record(&cache_key, &root) {
816 Some(record) => (record.blob_blake3, record.bytes_copied, true),
817 None => {
818 let temp_dir = unique_capture_temp_dir(&root);
819 ensure_private_dir_descendant(&root, &temp_dir)?;
820 let CopyToTempResult {
821 temp_path,
822 blob_blake3,
823 bytes_copied,
824 } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
825 let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
826 .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
827 let blob_path = root.join(&blob_relative_path);
828 let already_present =
829 publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
830 remove_empty_temp_dir_best_effort(&temp_dir);
831 cache_raw_mirror_blob_record(
832 cache_key.clone(),
833 RawMirrorBlobRecord {
834 blob_blake3: blob_blake3.clone(),
835 bytes_copied,
836 },
837 );
838 (blob_blake3, bytes_copied, already_present)
839 }
840 };
841 let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
842 .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
843
844 let original_path = input.source_path.display().to_string();
845 let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
846 let manifest_id = raw_mirror_manifest_id(
847 input.provider,
848 input.source_id,
849 input.origin_kind,
850 input.origin_host,
851 &original_path_blake3,
852 &blob_blake3,
853 );
854 let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
855 let manifest_path = root.join(&manifest_relative_path);
856 let captured_at_ms = now_ms();
857 let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
858 let mut manifest = RawMirrorManifestFile {
859 schema_version: RAW_MIRROR_SCHEMA_VERSION,
860 manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
861 manifest_id: manifest_id.clone(),
862 blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
863 blob_relative_path: blob_relative_path.clone(),
864 blob_blake3: blob_blake3.clone(),
865 blob_size_bytes: bytes_copied,
866 provider: input.provider.to_string(),
867 source_id: input.source_id.to_string(),
868 origin_kind: input.origin_kind.to_string(),
869 origin_host: input.origin_host.map(ToOwned::to_owned),
870 original_path,
871 redacted_original_path: redacted_original_path(input.provider, input.source_path),
872 original_path_blake3,
873 captured_at_ms,
874 source_mtime_ms,
875 source_size_bytes: source_metadata.len(),
876 compression: RawMirrorCompressionEnvelope {
877 state: "none".to_string(),
878 algorithm: None,
879 uncompressed_size_bytes: Some(bytes_copied),
880 },
881 encryption: RawMirrorEncryptionEnvelope {
882 state: "none".to_string(),
883 algorithm: None,
884 key_id: None,
885 envelope_version: None,
886 },
887 db_links: unique_db_links(input.db_links),
888 verification: RawMirrorVerificationRecord {
889 status: "captured".to_string(),
890 verifier: "cass_indexer".to_string(),
891 content_blake3: Some(blob_blake3.clone()),
892 verified_at_ms: Some(captured_at_ms),
893 },
894 manifest_blake3: None,
895 };
896 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
897 let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
898 let manifest_already_present =
899 publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
900 let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
901 if manifest_already_present {
902 merge_raw_mirror_manifest_db_links(
903 &root,
904 &manifest_path,
905 input.db_links,
906 Some(&blob_blake3),
907 )?;
908 let published = read_raw_mirror_manifest(&manifest_path)?;
909 (
910 published.blob_size_bytes,
911 published.captured_at_ms,
912 published.source_mtime_ms,
913 )
914 } else {
915 (bytes_copied, captured_at_ms, source_mtime_ms)
916 };
917
918 Ok(RawMirrorCaptureRecord {
919 manifest_id,
920 manifest_relative_path,
921 blob_relative_path,
922 blob_blake3,
923 blob_size_bytes: record_blob_size_bytes,
924 captured_at_ms: record_captured_at_ms,
925 source_mtime_ms: record_source_mtime_ms,
926 already_present: blob_already_present && manifest_already_present,
927 })
928}
929
930pub fn merge_manifest_db_links(
931 data_dir: &Path,
932 manifest_relative_path: &str,
933 links: &[RawMirrorDbLink],
934) -> Result<()> {
935 if links.is_empty() {
936 return Ok(());
937 }
938 let root = raw_mirror_root(data_dir);
939 let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
940 merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
941}
942
/// Result of copying a source file into a private temp file.
struct CopyToTempResult {
    /// Location of the temp file holding the copied bytes.
    temp_path: PathBuf,
    /// Hex BLAKE3 digest of the copied bytes.
    blob_blake3: String,
    /// Number of bytes copied.
    bytes_copied: u64,
}
948
949fn copy_source_to_private_temp(
950 source_path: &Path,
951 temp_dir: &Path,
952 source_metadata: &fs::Metadata,
953) -> Result<CopyToTempResult> {
954 let temp_path = unique_temp_path(temp_dir, "blob");
955 let mut source = open_stable_source_file(source_path, source_metadata)?;
956 let mut temp = private_create_new_file(&temp_path)?;
957 let mut hasher = blake3::Hasher::new();
958 let mut buffer = [0u8; 64 * 1024];
959 let mut bytes_copied = 0u64;
960 loop {
961 let read = source
962 .read(&mut buffer)
963 .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
964 if read == 0 {
965 break;
966 }
967 temp.write_all(&buffer[..read])
968 .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
969 hasher.update(&buffer[..read]);
970 bytes_copied = bytes_copied.saturating_add(read as u64);
971 }
972 temp.sync_all()
973 .with_context(|| format!("sync raw mirror temp {}", temp_path.display()))?;
974
975 let final_source_metadata = source
976 .metadata()
977 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
978 if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
979 remove_temp_best_effort(&temp_path);
980 return Err(anyhow!(
981 "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
982 source_path.display()
983 ));
984 }
985
986 Ok(CopyToTempResult {
987 temp_path,
988 blob_blake3: hasher.finalize().to_hex().to_string(),
989 bytes_copied,
990 })
991}
992
993fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
994 let source = File::open(source_path)
995 .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
996 let opened_metadata = source
997 .metadata()
998 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
999 if !same_source_identity(expected_metadata, &opened_metadata) {
1000 return Err(anyhow!(
1001 "raw mirror source {} changed identity before capture",
1002 source_path.display()
1003 ));
1004 }
1005 let current_path_metadata = fs::symlink_metadata(source_path)
1006 .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1007 if current_path_metadata.file_type().is_symlink() {
1008 return Err(anyhow!(
1009 "refusing to raw-mirror symlink source {}",
1010 source_path.display()
1011 ));
1012 }
1013 if !same_source_identity(expected_metadata, ¤t_path_metadata) {
1014 return Err(anyhow!(
1015 "raw mirror source {} changed identity before capture",
1016 source_path.display()
1017 ));
1018 }
1019 Ok(source)
1020}
1021
/// Reports whether `actual` is a regular file with the same device and inode
/// numbers as `expected`.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;

    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}

/// Non-Unix fallback: only checks that `actual` is a regular file; `expected`
/// is not consulted.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    actual.is_file()
}
1032
/// Builds a `"<dev>:<ino>"` token from the device and inode numbers in
/// `metadata`.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;

    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}

/// Non-Unix fallback: no identity token is produced.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    None
}
1043
/// Returns the status-change time (`ctime`) of `metadata` as nanoseconds
/// since the Unix epoch, or `None` when either component is negative.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    u128::try_from(metadata.ctime())
        .ok()
        .zip(u128::try_from(metadata.ctime_nsec()).ok())
        .map(|(whole_seconds, sub_second_nanos)| {
            whole_seconds
                .saturating_mul(1_000_000_000)
                .saturating_add(sub_second_nanos)
        })
}

/// Non-Unix fallback: no change time is reported.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    None
}
1061
/// Reports whether the source looks modified between two metadata snapshots.
///
/// A change is reported when the lengths differ, or when both modification
/// times are available and differ. If either modification time is
/// unavailable, only the length comparison decides.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    initial.len() != final_metadata.len()
        || matches!(
            (initial.modified(), final_metadata.modified()),
            (Ok(before), Ok(after)) if before != after
        )
}
1074
1075fn publish_content_addressed_temp(
1076 root: &Path,
1077 temp_path: &Path,
1078 final_path: &Path,
1079 expected_blake3: &str,
1080) -> Result<bool> {
1081 ensure_private_dir_descendant(
1082 root,
1083 final_path
1084 .parent()
1085 .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1086 )?;
1087 if final_path.exists() {
1088 verify_existing_file(final_path, expected_blake3)?;
1089 remove_temp_best_effort(temp_path);
1090 return Ok(true);
1091 }
1092
1093 match fs::hard_link(temp_path, final_path) {
1094 Ok(()) => {
1095 sync_file(final_path)?;
1096 sync_parent(final_path)?;
1097 remove_temp_best_effort(temp_path);
1098 Ok(false)
1099 }
1100 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1101 verify_existing_file(final_path, expected_blake3)?;
1102 remove_temp_best_effort(temp_path);
1103 Ok(true)
1104 }
1105 Err(err) => Err(anyhow!(
1106 "publish raw mirror blob {} from {}: {err}",
1107 final_path.display(),
1108 temp_path.display()
1109 )),
1110 }
1111}
1112
1113fn publish_manifest_bytes_create_new(
1114 root: &Path,
1115 manifest_path: &Path,
1116 manifest_bytes: &[u8],
1117 blob_blake3: &str,
1118) -> Result<bool> {
1119 ensure_private_dir_descendant(
1120 root,
1121 manifest_path
1122 .parent()
1123 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1124 )?;
1125 if manifest_path.exists() {
1126 verify_existing_manifest(manifest_path, blob_blake3)?;
1127 return Ok(true);
1128 }
1129
1130 let temp_dir = unique_capture_temp_dir(root);
1131 ensure_private_dir_descendant(root, &temp_dir)?;
1132 let temp_path = unique_temp_path(&temp_dir, "manifest");
1133 let mut temp = private_create_new_file(&temp_path)?;
1134 temp.write_all(manifest_bytes)
1135 .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1136 temp.sync_all()
1137 .with_context(|| format!("sync raw mirror manifest temp {}", temp_path.display()))?;
1138
1139 match fs::hard_link(&temp_path, manifest_path) {
1140 Ok(()) => {
1141 sync_file(manifest_path)?;
1142 sync_parent(manifest_path)?;
1143 remove_temp_best_effort(&temp_path);
1144 remove_empty_temp_dir_best_effort(&temp_dir);
1145 Ok(false)
1146 }
1147 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1148 verify_existing_manifest(manifest_path, blob_blake3)?;
1149 remove_temp_best_effort(&temp_path);
1150 remove_empty_temp_dir_best_effort(&temp_dir);
1151 Ok(true)
1152 }
1153 Err(err) => Err(anyhow!(
1154 "publish raw mirror manifest {} from {}: {err}",
1155 manifest_path.display(),
1156 temp_path.display()
1157 )),
1158 }
1159}
1160
1161fn merge_raw_mirror_manifest_db_links(
1162 root: &Path,
1163 manifest_path: &Path,
1164 links: &[RawMirrorDbLink],
1165 expected_blob_blake3: Option<&str>,
1166) -> Result<()> {
1167 if links.is_empty() {
1168 return Ok(());
1169 }
1170
1171 let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1172 let _guard = lock
1173 .lock()
1174 .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1175
1176 let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1177 if let Some(expected_blob_blake3) = expected_blob_blake3
1178 && manifest.blob_blake3 != expected_blob_blake3
1179 {
1180 return Err(anyhow!(
1181 "existing raw mirror manifest {} points at blob {}, expected {}",
1182 manifest_path.display(),
1183 manifest.blob_blake3,
1184 expected_blob_blake3
1185 ));
1186 }
1187
1188 let mut merged_links = manifest.db_links.clone();
1189 merged_links.extend_from_slice(links);
1190 let merged_links = unique_db_links(&merged_links);
1191 if merged_links == manifest.db_links {
1192 return Ok(());
1193 }
1194
1195 manifest.db_links = merged_links;
1196 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1197 let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1198 replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1199}
1200
1201fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1202 ensure_private_dir_descendant(
1203 root,
1204 manifest_path
1205 .parent()
1206 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1207 )?;
1208 let temp_dir = unique_capture_temp_dir(root);
1209 ensure_private_dir_descendant(root, &temp_dir)?;
1210 let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1211 let mut temp = private_create_new_file(&temp_path)?;
1212 temp.write_all(manifest_bytes).with_context(|| {
1213 format!(
1214 "write raw mirror manifest update temp {}",
1215 temp_path.display()
1216 )
1217 })?;
1218 temp.sync_all().with_context(|| {
1219 format!(
1220 "sync raw mirror manifest update temp {}",
1221 temp_path.display()
1222 )
1223 })?;
1224 drop(temp);
1225
1226 fs::rename(&temp_path, manifest_path).with_context(|| {
1227 format!(
1228 "replace raw mirror manifest {} from {}",
1229 manifest_path.display(),
1230 temp_path.display()
1231 )
1232 })?;
1233 set_private_file_permissions(manifest_path)?;
1234 sync_file(manifest_path)?;
1235 sync_parent(manifest_path)?;
1236 remove_empty_temp_dir_best_effort(&temp_dir);
1237 Ok(())
1238}
1239
1240fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1241 let relative = Path::new(relative_path);
1242 if relative.is_absolute() {
1243 return Err(anyhow!(
1244 "raw mirror manifest path must be relative: {relative_path}"
1245 ));
1246 }
1247
1248 let mut normal_components = Vec::new();
1249 for component in relative.components() {
1250 match component {
1251 std::path::Component::Normal(part) => normal_components.push(part),
1252 _ => {
1253 return Err(anyhow!(
1254 "raw mirror manifest path must use only normal relative components: {relative_path}"
1255 ));
1256 }
1257 }
1258 }
1259
1260 if normal_components.len() != 2
1261 || normal_components[0] != std::ffi::OsStr::new("manifests")
1262 || Path::new(normal_components[1])
1263 .extension()
1264 .and_then(|ext| ext.to_str())
1265 != Some("json")
1266 {
1267 return Err(anyhow!(
1268 "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1269 ));
1270 }
1271
1272 Ok(root.join(relative))
1273}
1274
1275fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1276 let metadata = fs::symlink_metadata(path)
1277 .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1278 if metadata.file_type().is_symlink() {
1279 return Err(anyhow!(
1280 "refusing to read symlink raw mirror blob {}",
1281 path.display()
1282 ));
1283 }
1284 if !metadata.is_file() {
1285 return Err(anyhow!(
1286 "refusing to read non-file raw mirror blob {}",
1287 path.display()
1288 ));
1289 }
1290 let actual = file_blake3(path)?;
1291 if actual == expected_blake3 {
1292 Ok(())
1293 } else {
1294 Err(anyhow!(
1295 "existing raw mirror blob {} has blake3 {}, expected {}",
1296 path.display(),
1297 actual,
1298 expected_blake3
1299 ))
1300 }
1301}
1302
1303fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1304 let manifest = read_raw_mirror_manifest(path)?;
1305 if manifest.blob_blake3 == expected_blob_blake3 {
1306 Ok(())
1307 } else {
1308 Err(anyhow!(
1309 "existing raw mirror manifest {} points at blob {}, expected {}",
1310 path.display(),
1311 manifest.blob_blake3,
1312 expected_blob_blake3
1313 ))
1314 }
1315}
1316
1317fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1318 let metadata = fs::symlink_metadata(path)
1319 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1320 if metadata.file_type().is_symlink() {
1321 return Err(anyhow!(
1322 "refusing to read symlink raw mirror manifest {}",
1323 path.display()
1324 ));
1325 }
1326 if !metadata.is_file() {
1327 return Err(anyhow!(
1328 "refusing to read non-file raw mirror manifest {}",
1329 path.display()
1330 ));
1331 }
1332 serde_json::from_slice(
1333 &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1334 )
1335 .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1336}
1337
1338fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1339 data_dir
1340 .join(RAW_MIRROR_ROOT_DIR)
1341 .join(RAW_MIRROR_VERSION_DIR)
1342}
1343
1344fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1345 let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1346 ensure_private_dir(&root_parent)?;
1347 let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1348 ensure_private_dir(&root)?;
1349 Ok(root)
1350}
1351
1352fn raw_mirror_blob_cache_key(
1353 input: &RawMirrorCaptureInput<'_>,
1354 source_metadata: &fs::Metadata,
1355) -> RawMirrorBlobCacheKey {
1356 RawMirrorBlobCacheKey {
1357 data_dir: input.data_dir.to_path_buf(),
1358 source_path: input.source_path.to_path_buf(),
1359 source_identity: source_identity_token(source_metadata),
1360 source_size_bytes: source_metadata.len(),
1361 source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1362 source_change_time_ns: source_change_time_ns(source_metadata),
1363 }
1364}
1365
1366fn cached_raw_mirror_blob_record(
1367 key: &RawMirrorBlobCacheKey,
1368 root: &Path,
1369) -> Option<RawMirrorBlobRecord> {
1370 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1371 let record = {
1372 let mut guard = cache.lock().ok()?;
1373 let record = guard.get(key).cloned()?;
1374 if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1375 guard.remove(key);
1376 return None;
1377 }
1378 record
1379 };
1380
1381 let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1382 let blob_path = root.join(blob_relative_path);
1383 let metadata_valid = fs::symlink_metadata(&blob_path)
1384 .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1385 .unwrap_or(false);
1386 if !metadata_valid {
1387 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1388 return None;
1389 }
1390
1391 match file_blake3(&blob_path) {
1392 Ok(actual) if actual == record.blob_blake3 => Some(record),
1393 Ok(actual) => {
1394 tracing::warn!(
1395 path = %blob_path.display(),
1396 expected_blake3 = %record.blob_blake3,
1397 actual_blake3 = %actual,
1398 "discarding raw mirror blob cache entry with mismatched content"
1399 );
1400 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1401 None
1402 }
1403 Err(err) => {
1404 tracing::debug!(
1405 path = %blob_path.display(),
1406 error = %err,
1407 "discarding unreadable raw mirror blob cache entry"
1408 );
1409 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1410 None
1411 }
1412 }
1413}
1414
1415fn remove_cached_raw_mirror_blob_record_if_unchanged(
1416 cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1417 key: &RawMirrorBlobCacheKey,
1418 stale_record: &RawMirrorBlobRecord,
1419) {
1420 if let Ok(mut guard) = cache.lock()
1421 && guard
1422 .get(key)
1423 .is_some_and(|current| current == stale_record)
1424 {
1425 guard.remove(key);
1426 }
1427}
1428
1429fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1430 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1431 if let Ok(mut guard) = cache.lock() {
1432 guard.insert(key, record);
1433 }
1434}
1435
1436fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1437 if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1438 return None;
1439 }
1440 let lower = blob_blake3.to_ascii_lowercase();
1441 Some(format!(
1442 "blobs/{}/{}/{}.{}",
1443 RAW_MIRROR_HASH_ALGORITHM,
1444 &lower[..2],
1445 lower,
1446 RAW_MIRROR_BLOB_EXTENSION
1447 ))
1448}
1449
/// Returns the manifest path for `manifest_id`, relative to the raw mirror root.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    ["manifests/", manifest_id, ".json"].concat()
}
1453
1454fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1455 let mut hasher = blake3::Hasher::new();
1456 hasher.update(b"doctor-raw-mirror-original-path-v1");
1457 hasher.update(&[0]);
1458 hasher.update(original_path.as_bytes());
1459 hasher.finalize().to_hex().to_string()
1460}
1461
1462fn raw_mirror_manifest_id(
1463 provider: &str,
1464 source_id: &str,
1465 origin_kind: &str,
1466 origin_host: Option<&str>,
1467 original_path_blake3: &str,
1468 blob_blake3: &str,
1469) -> String {
1470 canonical_blake3(
1471 "doctor-raw-mirror-manifest-id-v1",
1472 json!({
1473 "provider": provider,
1474 "source_id": source_id,
1475 "origin_kind": origin_kind,
1476 "origin_host": origin_host,
1477 "original_path_blake3": original_path_blake3,
1478 "blob_blake3": blob_blake3,
1479 }),
1480 )
1481}
1482
1483fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1484 let mut value = serde_json::to_value(manifest).unwrap_or_default();
1485 if let Value::Object(map) = &mut value {
1486 map.remove("manifest_blake3");
1487 }
1488 canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1489}
1490
1491fn canonical_blake3(prefix: &str, value: Value) -> String {
1492 let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1493 let mut hasher = blake3::Hasher::new();
1494 hasher.update(prefix.as_bytes());
1495 hasher.update(&[0]);
1496 hasher.update(&encoded);
1497 format!("{prefix}-{}", hasher.finalize().to_hex())
1498}
1499
1500fn canonical_json_value(value: Value) -> Value {
1501 match value {
1502 Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1503 Value::Object(map) => {
1504 let mut entries: Vec<_> = map.into_iter().collect();
1505 entries.sort_by(|left, right| left.0.cmp(&right.0));
1506 let mut canonical = serde_json::Map::new();
1507 for (key, value) in entries {
1508 canonical.insert(key, canonical_json_value(value));
1509 }
1510 Value::Object(canonical)
1511 }
1512 other => other,
1513 }
1514}
1515
1516fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1517 let mut dedup = links.to_vec();
1518 dedup.sort_by(|left, right| {
1519 (
1520 left.conversation_id,
1521 left.message_count,
1522 left.started_at_ms,
1523 left.source_path.as_deref().unwrap_or(""),
1524 )
1525 .cmp(&(
1526 right.conversation_id,
1527 right.message_count,
1528 right.started_at_ms,
1529 right.source_path.as_deref().unwrap_or(""),
1530 ))
1531 });
1532 dedup.dedup();
1533 dedup
1534}
1535
1536fn file_blake3(path: &Path) -> Result<String> {
1537 let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1538 let mut hasher = blake3::Hasher::new();
1539 let mut buffer = [0u8; 64 * 1024];
1540 loop {
1541 let read = file
1542 .read(&mut buffer)
1543 .with_context(|| format!("read {}", path.display()))?;
1544 if read == 0 {
1545 break;
1546 }
1547 hasher.update(&buffer[..read]);
1548 }
1549 Ok(hasher.finalize().to_hex().to_string())
1550}
1551
1552fn ensure_private_dir(path: &Path) -> Result<()> {
1553 create_private_dir_all(path)
1554 .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1555 let metadata = fs::symlink_metadata(path)
1556 .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1557 let file_type = metadata.file_type();
1558 if file_type.is_symlink() {
1559 return Err(anyhow!(
1560 "refusing to use symlink raw mirror dir {}",
1561 path.display()
1562 ));
1563 }
1564 if !file_type.is_dir() {
1565 return Err(anyhow!(
1566 "refusing to use non-directory raw mirror path {}",
1567 path.display()
1568 ));
1569 }
1570 #[cfg(unix)]
1571 {
1572 use std::os::unix::fs::PermissionsExt;
1573 if metadata.permissions().mode() & 0o777 != 0o700 {
1574 set_private_dir_permissions(path)?;
1575 }
1576 }
1577 #[cfg(not(unix))]
1578 {
1579 set_private_dir_permissions(path)?;
1580 }
1581 Ok(())
1582}
1583
1584fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1585 let relative = path.strip_prefix(root).with_context(|| {
1586 format!(
1587 "raw mirror private dir {} is not under root {}",
1588 path.display(),
1589 root.display()
1590 )
1591 })?;
1592
1593 if let Some(root_parent) = root.parent() {
1594 ensure_private_dir(root_parent)?;
1595 }
1596 ensure_private_dir(root)?;
1597 let mut current = root.to_path_buf();
1598 for component in relative.components() {
1599 match component {
1600 Component::Normal(part) => {
1601 current.push(part);
1602 ensure_private_dir(¤t)?;
1603 }
1604 Component::CurDir => {}
1605 _ => {
1606 return Err(anyhow!(
1607 "raw mirror private dir contains non-normal component: {}",
1608 path.display()
1609 ));
1610 }
1611 }
1612 }
1613
1614 Ok(())
1615}
1616
1617fn private_create_new_file(path: &Path) -> Result<File> {
1618 let mut options = OpenOptions::new();
1619 options.write(true).create_new(true);
1620 set_private_create_file_mode(&mut options);
1621 let file = options
1622 .open(path)
1623 .with_context(|| format!("create raw mirror file {}", path.display()))?;
1624 Ok(file)
1625}
1626
/// Recursively creates `path`, requesting mode `0o700` for created directories.
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    let mut dir_builder = fs::DirBuilder::new();
    dir_builder.mode(0o700);
    dir_builder.recursive(true);
    dir_builder.create(path)
}

/// Non-Unix fallback: recursively creates `path` with default permissions.
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::create_dir_all(path)
}
1639
/// Requests mode `0o600` for files created through `options`.
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    OpenOptionsExt::mode(options, 0o600);
}

/// Non-Unix fallback: leaves `options` unchanged.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1649
1650fn sync_file(path: &Path) -> Result<()> {
1651 File::open(path)
1652 .and_then(|file| file.sync_all())
1653 .with_context(|| format!("sync raw mirror file {}", path.display()))
1654}
1655
1656fn sync_parent(path: &Path) -> Result<()> {
1657 let Some(parent) = path.parent() else {
1658 return Ok(());
1659 };
1660 File::open(parent)
1661 .and_then(|file| file.sync_all())
1662 .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1663}
1664
1665fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1666 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1667 let nanos = SystemTime::now()
1668 .duration_since(UNIX_EPOCH)
1669 .unwrap_or_default()
1670 .as_nanos();
1671 dir.join(format!(
1672 ".{label}.{}.{}.{}.tmp",
1673 std::process::id(),
1674 nanos,
1675 nonce
1676 ))
1677}
1678
1679fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1680 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1681 let nanos = SystemTime::now()
1682 .duration_since(UNIX_EPOCH)
1683 .unwrap_or_default()
1684 .as_nanos();
1685 root.join("tmp").join(format!(
1686 "capture.{}.{}.{}",
1687 std::process::id(),
1688 nanos,
1689 nonce
1690 ))
1691}
1692
1693fn remove_temp_best_effort(path: &Path) {
1694 if let Err(err) = fs::remove_file(path) {
1695 tracing::debug!(
1696 path = %path.display(),
1697 error = %err,
1698 "failed to remove raw mirror temp file"
1699 );
1700 }
1701}
1702
1703fn remove_empty_temp_dir_best_effort(path: &Path) {
1704 if let Err(err) = fs::remove_dir(path) {
1705 tracing::debug!(
1706 path = %path.display(),
1707 error = %err,
1708 "failed to remove raw mirror temp directory"
1709 );
1710 }
1711}
1712
/// Produces a redacted form of `source_path`: `[<provider>]/<file name>`.
///
/// Directory components are dropped. When the path has no file name, or the
/// name is not valid UTF-8, `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let file_name = match source_path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "session",
    };
    ["[", provider, "]/", file_name].concat()
}
1720
1721fn now_ms() -> i64 {
1722 system_time_to_ms(SystemTime::now()).unwrap_or(0)
1723}
1724
/// Converts `time` to milliseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch or when the value does not fit
/// in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1730
/// Converts `time` to nanoseconds since the Unix epoch, or `None` for times
/// before the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => Some(elapsed.as_nanos()),
        Err(_) => None,
    }
}
1736
1737#[cfg(unix)]
1738fn set_private_dir_permissions(path: &Path) -> Result<()> {
1739 use std::os::unix::fs::PermissionsExt;
1740 fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1741 .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1742}
1743
1744#[cfg(not(unix))]
1745fn set_private_dir_permissions(_path: &Path) -> Result<()> {
1746 Ok(())
1747}
1748
1749#[cfg(unix)]
1750fn set_private_file_permissions(path: &Path) -> Result<()> {
1751 use std::os::unix::fs::PermissionsExt;
1752 fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1753 .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1754}
1755
1756#[cfg(not(unix))]
1757fn set_private_file_permissions(_path: &Path) -> Result<()> {
1758 Ok(())
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763 use super::*;
1764
1765 #[test]
1766 fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1767 let temp = tempfile::TempDir::new().expect("tempdir");
1768 let data_dir = temp.path().join("cass-data");
1769 let source_path = temp.path().join("rollout-fixture.jsonl");
1770 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1771 fs::write(&source_path, source_bytes).expect("write source");
1772 let db_link = RawMirrorDbLink {
1773 conversation_id: Some(42),
1774 message_count: Some(1),
1775 source_path: Some(source_path.display().to_string()),
1776 started_at_ms: Some(1_733_000_000_000),
1777 };
1778
1779 let first = capture_source_file(RawMirrorCaptureInput {
1780 data_dir: &data_dir,
1781 provider: "codex",
1782 source_id: "local",
1783 origin_kind: "local",
1784 origin_host: None,
1785 source_path: &source_path,
1786 db_links: std::slice::from_ref(&db_link),
1787 })
1788 .expect("first capture");
1789 let second = capture_source_file(RawMirrorCaptureInput {
1790 data_dir: &data_dir,
1791 provider: "codex",
1792 source_id: "local",
1793 origin_kind: "local",
1794 origin_host: None,
1795 source_path: &source_path,
1796 db_links: std::slice::from_ref(&db_link),
1797 })
1798 .expect("second capture");
1799
1800 assert_eq!(first.manifest_id, second.manifest_id);
1801 assert_eq!(first.blob_blake3, second.blob_blake3);
1802 assert_eq!(first.captured_at_ms, second.captured_at_ms);
1803 assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1804 assert!(!first.already_present);
1805 assert!(second.already_present);
1806 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1807
1808 let blob_path = data_dir
1809 .join(RAW_MIRROR_ROOT_DIR)
1810 .join(RAW_MIRROR_VERSION_DIR)
1811 .join(&first.blob_relative_path);
1812 let manifest_path = data_dir
1813 .join(RAW_MIRROR_ROOT_DIR)
1814 .join(RAW_MIRROR_VERSION_DIR)
1815 .join(&first.manifest_relative_path);
1816 assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1817
1818 let manifest: Value =
1819 serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1820 .expect("manifest json");
1821 assert_eq!(
1822 manifest["manifest_kind"].as_str(),
1823 Some(RAW_MIRROR_MANIFEST_KIND)
1824 );
1825 assert_eq!(manifest["provider"].as_str(), Some("codex"));
1826 assert_eq!(
1827 manifest["blob_blake3"].as_str(),
1828 Some(first.blob_blake3.as_str())
1829 );
1830 assert_eq!(
1831 manifest["redacted_original_path"].as_str(),
1832 Some("[codex]/rollout-fixture.jsonl")
1833 );
1834 assert_eq!(
1835 manifest["db_links"][0]["conversation_id"].as_i64(),
1836 Some(42)
1837 );
1838 assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1839 assert!(
1840 manifest["manifest_blake3"]
1841 .as_str()
1842 .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1843 );
1844 let tmp_root = data_dir
1845 .join(RAW_MIRROR_ROOT_DIR)
1846 .join(RAW_MIRROR_VERSION_DIR)
1847 .join("tmp");
1848 assert_eq!(
1849 fs::read_dir(&tmp_root)
1850 .expect("raw mirror tmp root")
1851 .collect::<Vec<_>>()
1852 .len(),
1853 0,
1854 "successful captures must not leave doctor-visible interrupted temp artifacts"
1855 );
1856
1857 #[cfg(unix)]
1858 {
1859 use std::os::unix::fs::PermissionsExt;
1860
1861 let root = data_dir
1862 .join(RAW_MIRROR_ROOT_DIR)
1863 .join(RAW_MIRROR_VERSION_DIR);
1864 assert_eq!(
1865 fs::metadata(&root)
1866 .expect("raw mirror root metadata")
1867 .permissions()
1868 .mode()
1869 & 0o777,
1870 0o700
1871 );
1872 assert_eq!(
1873 fs::metadata(&manifest_path)
1874 .expect("manifest metadata")
1875 .permissions()
1876 .mode()
1877 & 0o777,
1878 0o600
1879 );
1880 }
1881 }
1882
1883 #[test]
1884 fn capture_source_file_merges_db_links_into_existing_manifest() {
1885 let temp = tempfile::TempDir::new().expect("tempdir");
1886 let data_dir = temp.path().join("cass-data");
1887 let source_path = temp.path().join("preparse-then-parsed.jsonl");
1888 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1889 fs::write(&source_path, source_bytes).expect("write source");
1890
1891 let preparse = capture_source_file(RawMirrorCaptureInput {
1892 data_dir: &data_dir,
1893 provider: "codex",
1894 source_id: "local",
1895 origin_kind: "local",
1896 origin_host: None,
1897 source_path: &source_path,
1898 db_links: &[],
1899 })
1900 .expect("preparse capture");
1901
1902 let parsed_link = RawMirrorDbLink {
1903 conversation_id: None,
1904 message_count: Some(1),
1905 source_path: Some(source_path.display().to_string()),
1906 started_at_ms: Some(1_733_000_000_000),
1907 };
1908 let parsed = capture_source_file(RawMirrorCaptureInput {
1909 data_dir: &data_dir,
1910 provider: "codex",
1911 source_id: "local",
1912 origin_kind: "local",
1913 origin_host: None,
1914 source_path: &source_path,
1915 db_links: std::slice::from_ref(&parsed_link),
1916 })
1917 .expect("parsed capture");
1918
1919 assert_eq!(preparse.manifest_id, parsed.manifest_id);
1920 assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1921 assert!(parsed.already_present);
1922
1923 let manifest_path = data_dir
1924 .join(RAW_MIRROR_ROOT_DIR)
1925 .join(RAW_MIRROR_VERSION_DIR)
1926 .join(&parsed.manifest_relative_path);
1927 let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1928 assert_eq!(
1929 manifest.db_links,
1930 vec![parsed_link],
1931 "second capture must enrich the pre-parse manifest with DB-link evidence"
1932 );
1933 let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1934 assert_eq!(
1935 manifest.manifest_blake3.as_deref(),
1936 Some(expected_manifest_blake3.as_str()),
1937 "manifest checksum must be recomputed after DB-link merge"
1938 );
1939 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1940 }
1941
1942 #[test]
1943 fn merge_manifest_db_links_rejects_hostile_relative_paths() {
1944 let temp = tempfile::TempDir::new().expect("tempdir");
1945 let data_dir = temp.path().join("cass-data");
1946 let db_link = RawMirrorDbLink {
1947 conversation_id: Some(42),
1948 message_count: Some(1),
1949 source_path: Some("source.jsonl".to_string()),
1950 started_at_ms: Some(1_733_000_000_000),
1951 };
1952
1953 for relative in [
1954 "../escape.json",
1955 "/tmp/escape.json",
1956 "manifests/../escape.json",
1957 "blobs/blake3/ab/not-a-manifest.raw",
1958 "manifests/not-json.txt",
1959 ] {
1960 let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
1961 .expect_err("hostile manifest path should be rejected");
1962 assert!(
1963 err.to_string().contains("raw mirror manifest path"),
1964 "unexpected error for {relative}: {err}"
1965 );
1966 }
1967 }
1968
1969 #[cfg(unix)]
1970 #[test]
1971 fn merge_manifest_db_links_rejects_symlink_manifest_path() {
1972 let temp = tempfile::TempDir::new().expect("tempdir");
1973 let data_dir = temp.path().join("cass-data");
1974 let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
1975 fs::create_dir_all(&manifest_dir).expect("manifest dir");
1976 let outside = temp.path().join("outside.json");
1977 fs::write(&outside, "{}").expect("outside manifest");
1978 std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
1979 .expect("symlink manifest");
1980 let db_link = RawMirrorDbLink {
1981 conversation_id: Some(42),
1982 message_count: Some(1),
1983 source_path: Some("source.jsonl".to_string()),
1984 started_at_ms: Some(1_733_000_000_000),
1985 };
1986
1987 let err = merge_manifest_db_links(
1988 &data_dir,
1989 "manifests/link.json",
1990 std::slice::from_ref(&db_link),
1991 )
1992 .expect_err("symlink manifest should be rejected");
1993 assert!(
1994 err.to_string().contains("symlink raw mirror manifest"),
1995 "unexpected symlink-manifest error: {err}"
1996 );
1997 }
1998
/// Two source files with identical bytes must produce two distinct manifests
/// that share one content-addressed blob, and the storage summary must agree.
#[test]
fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
    let expected_len = source_bytes.len() as u64;

    let first_source = temp.path().join("first.jsonl");
    let second_source = temp.path().join("second.jsonl");
    fs::write(&first_source, source_bytes).expect("write first source");
    fs::write(&second_source, source_bytes).expect("write second source");

    // Both captures use the same settings; only the source path differs.
    let capture = |source_path: &std::path::Path| {
        capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path,
            db_links: &[],
        })
    };
    let first = capture(&first_source).expect("first capture");
    let second = capture(&second_source).expect("second capture");

    // Same blob identity, different manifest identity.
    assert_eq!(first.blob_blake3, second.blob_blake3);
    assert_eq!(first.blob_relative_path, second.blob_relative_path);
    assert_ne!(first.manifest_id, second.manifest_id);
    assert!(
        !second.already_present,
        "a duplicate blob with a new source manifest is not a full capture replay"
    );

    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_root = mirror_root.join("manifests");
    let listing = fs::read_dir(manifest_root).expect("manifest dir");
    let manifests = listing
        .collect::<std::io::Result<Vec<_>>>()
        .expect("manifest entries");
    assert_eq!(manifests.len(), 2);

    let summary = storage_summary(&data_dir);
    assert!(summary.initialized);
    assert_eq!(summary.manifest_count, 2);
    assert_eq!(summary.unique_blob_count, 1);
    assert_eq!(summary.total_blob_bytes, expected_len);
    assert_eq!(summary.largest_blob_bytes, expected_len);
    assert_eq!(summary.missing_blob_count, 0);
    assert_eq!(summary.invalid_manifest_count, 0);
    assert!(summary.total_storage_bytes >= expected_len);
}
2058
/// A manifest rewritten to reference `../outside.raw` is counted as invalid by
/// `storage_summary` and contributes nothing to the blob totals.
#[test]
fn storage_summary_rejects_hostile_blob_relative_path() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"hostile\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    // Tamper with the stored manifest: redirect the blob path outside the mirror
    // and recompute `manifest_blake3` over the modified manifest.
    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = mirror_root.join(&captured.manifest_relative_path);
    let mut tampered = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
    tampered.blob_relative_path = "../outside.raw".to_string();
    tampered.manifest_blake3 = Some(raw_mirror_manifest_blake3(&tampered));
    let serialized = serde_json::to_vec_pretty(&tampered).expect("serialize manifest");
    fs::write(&manifest_path, serialized).expect("tamper manifest");

    let summary = storage_summary(&data_dir);
    assert_eq!(summary.manifest_count, 1);
    assert_eq!(summary.invalid_manifest_count, 1);
    assert_eq!(summary.unique_blob_count, 0);
    assert_eq!(summary.total_blob_bytes, 0);
}
2099
/// `prune` in apply mode must error out, leaving manifest and blob in place and
/// writing no audit log, when a manifest references a blob path outside the mirror.
#[test]
fn prune_fails_closed_on_hostile_manifest_inventory() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"hostile\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = root.join(&captured.blob_relative_path);
    let manifest_path = root.join(&captured.manifest_relative_path);

    // Rewrite the manifest so its blob path escapes the mirror, with
    // `manifest_blake3` recomputed over the modified manifest.
    let mut tampered = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
    tampered.blob_relative_path = "../outside.raw".to_string();
    tampered.manifest_blake3 = Some(raw_mirror_manifest_blake3(&tampered));
    let serialized = serde_json::to_vec_pretty(&tampered).expect("serialize manifest");
    fs::write(&manifest_path, serialized).expect("tamper manifest");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let err = prune(&data_dir, options).expect_err("hostile inventory should fail closed");

    let message = err.to_string();
    assert!(
        message.contains("unexpected blob path"),
        "error should explain the unsafe manifest inventory: {err}"
    );
    // Nothing may have been removed or logged.
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
    let audit_path = root.join("pruned.jsonl");
    assert!(!audit_path.exists());
}
2155
/// A dry-run prune reports what it would remove and appends an audit record,
/// but leaves the manifest and blob on disk.
#[test]
fn prune_dry_run_audits_without_removing_manifest_or_blob() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"old\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: false,
    };
    let report = prune(&data_dir, options).expect("dry-run prune");

    // The report plans the removal without applying it.
    assert!(report.initialized);
    assert_eq!(report.mode, "dry-run");
    assert_eq!(report.planned_manifest_count, 1);
    assert_eq!(report.planned_blob_count, 1);
    assert_eq!(report.applied_reclaim_bytes, 0);

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());

    let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
    assert!(audit.contains("\"mode\":\"dry-run\""));
    assert!(audit.contains("\"applied\":false"));
}
2201
/// An applied prune deletes the selected manifest together with its blob when
/// no other manifest references that blob, and records the action in the audit log.
#[test]
fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"apply\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = root.join(&captured.blob_relative_path);
    let manifest_path = root.join(&captured.manifest_relative_path);

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("apply prune");

    assert_eq!(report.applied_manifest_count, 1);
    assert_eq!(report.applied_blob_count, 1);
    assert!(!manifest_path.exists());
    assert!(!blob_path.exists());

    let audit_path = root.join("pruned.jsonl");
    let audit = fs::read_to_string(audit_path).expect("read audit");
    assert!(audit.contains("\"mode\":\"apply\""));
    assert!(audit.contains("\"applied\":true"));
}
2245
/// When two manifests share one blob and only the older manifest is pruned,
/// the blob must survive because the retained manifest still references it.
#[test]
fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";

    let first_source = temp.path().join("first.jsonl");
    let second_source = temp.path().join("second.jsonl");
    fs::write(&first_source, bytes).expect("write first");
    fs::write(&second_source, bytes).expect("write second");

    // Identical capture settings for both sources; only the path varies.
    let capture = |source_path: &std::path::Path| {
        capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path,
            db_links: &[],
        })
    };
    let first = capture(&first_source).expect("capture first");
    let second = capture(&second_source).expect("capture second");

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let first_manifest_path = root.join(&first.manifest_relative_path);
    let second_manifest_path = root.join(&second.manifest_relative_path);

    // Backdate the first manifest by two days so only it crosses the one-day
    // age threshold, then recompute `manifest_blake3` over the edited manifest.
    let mut aged = read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
    aged.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
    aged.manifest_blake3 = Some(raw_mirror_manifest_blake3(&aged));
    let serialized = serde_json::to_vec_pretty(&aged).expect("serialize first manifest");
    fs::write(&first_manifest_path, serialized).expect("rewrite first manifest");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(86_400_000),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("apply one-manifest prune");

    assert_eq!(report.applied_manifest_count, 1);
    assert_eq!(report.applied_blob_count, 0);
    assert!(!first_manifest_path.exists());
    assert!(second_manifest_path.exists());
    let shared_blob = root.join(&first.blob_relative_path);
    assert!(
        shared_blob.exists(),
        "shared blob must stay while a retained manifest still references it"
    );
}
2311
/// A manifest linked to a conversation carrying a keep tag is pinned: prune
/// reports it (and its blob) as pinned, plans nothing, and leaves both on disk.
#[test]
fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
    use frankensqlite::compat::ConnectionExt as _;

    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    std::fs::create_dir_all(&data_dir).expect("create data dir");

    let source_path = temp.path().join("tagged.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"tagged\"}\n";
    fs::write(&source_path, payload).expect("write source");

    // Link the capture to conversation 7, which is tagged `keep` below.
    let db_link = RawMirrorDbLink {
        conversation_id: Some(7),
        message_count: Some(1),
        source_path: Some(source_path.display().to_string()),
        started_at_ms: Some(1_733_000_000_000),
    };
    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: std::slice::from_ref(&db_link),
    };
    let captured = capture_source_file(input).expect("capture source");

    // Build the tag tables in the data directory's database, one statement at
    // a time, each with its own failure label.
    let db_path = data_dir.join("agent_search.db");
    let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
        .expect("open keep-tag db");
    let setup_statements = [
        (
            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
            "create tags",
        ),
        (
            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
            "create conversation_tags",
        ),
        (
            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
            "insert tag",
        ),
        (
            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
            "insert conversation tag",
        ),
    ];
    for (sql, label) in setup_statements {
        conn.execute_compat(sql, frankensqlite::params![])
            .expect(label);
    }
    drop(conn);

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: Some(0),
        keep_tags: vec!["keep".to_string()],
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("keep-tag prune");

    assert_eq!(report.pinned_manifest_count, 1);
    assert_eq!(report.pinned_blob_count, 1);
    assert_eq!(report.planned_manifest_count, 0);
    assert_eq!(report.planned_blob_count, 0);

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
}
2388
/// With a zero-byte size budget but a seven-day safety hold-down, a freshly
/// captured manifest and its blob are pinned rather than planned for removal.
#[test]
fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("recent.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"recent\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    // Size-only pruning: no age threshold, but the hold-down spans seven days.
    let options = RawMirrorPruneOptions {
        older_than_ms: None,
        max_size_bytes: Some(0),
        keep_tags: Vec::new(),
        safety_hold_down_ms: 7 * 86_400_000,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("hold-down prune");

    assert_eq!(report.pinned_manifest_count, 1);
    assert_eq!(report.pinned_blob_count, 1);
    assert_eq!(report.planned_manifest_count, 0);
    assert_eq!(report.planned_blob_count, 0);

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
}
2432
/// Re-capturing a source after its stored blob was overwritten on disk must
/// fail instead of trusting the existing blob, and must leave the source untouched.
#[test]
fn capture_source_file_revalidates_cached_blob_contents() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("cached-source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
    fs::write(&source_path, source_bytes).expect("write source");

    // The same capture is issued twice, before and after the blob is corrupted.
    let capture = || {
        capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
    };
    let first = capture().expect("first capture");

    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = mirror_root.join(&first.blob_relative_path);
    fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");

    let err = capture().expect_err("corrupted content-addressed blob must be rejected");
    let message = err.to_string();
    assert!(
        message.contains("existing raw mirror blob"),
        "unexpected cached-blob error: {err:#}"
    );

    let source_after = fs::read(&source_path).expect("source bytes");
    assert_eq!(source_after, source_bytes);
}
2474
/// Rewriting a source with same-length content and then restoring its original
/// mtime must not let the second capture reuse the first capture's blob hash.
#[cfg(unix)]
#[test]
fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("same-size-rewrite.jsonl");
    let first_bytes = b"same length payload A\n";
    let second_bytes = b"same length payload B\n";
    fs::write(&source_path, first_bytes).expect("write first source");

    // Remember the original mtime so it can be put back after the rewrite.
    let first_metadata = fs::metadata(&source_path).expect("first metadata");
    let first_modified = first_metadata.modified().expect("first modified time");

    let capture = || {
        capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
    };
    let first = capture().expect("first capture");

    std::thread::sleep(std::time::Duration::from_millis(5));
    fs::write(&source_path, second_bytes).expect("rewrite source");

    // Put the original modification time back on the rewritten file; the
    // handle stays open for the remainder of the test.
    let source = OpenOptions::new()
        .write(true)
        .open(&source_path)
        .expect("open rewritten source");
    let restored_times = std::fs::FileTimes::new().set_modified(first_modified);
    source
        .set_times(restored_times)
        .expect("restore original mtime");

    let second = capture().expect("second capture");

    assert_ne!(first.blob_blake3, second.blob_blake3);
    let expected_hash = blake3::hash(second_bytes).to_hex().to_string();
    assert_eq!(second.blob_blake3, expected_hash);
    let source_after = fs::read(&source_path).expect("source bytes after rewrite");
    assert_eq!(source_after, second_bytes);
}
2531
/// If the content-addressed blob path already exists as a symlink, capture must
/// fail, write no manifest, and leave both the source and the link target unchanged.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_existing_blob_path() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let source_path = temp.path().join("cached-source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
    fs::write(&source_path, source_bytes).expect("write source");

    // Pre-plant a symlink at the location this content's blob would occupy.
    let digest = blake3::hash(source_bytes).to_hex().to_string();
    let blob_relative_path = raw_mirror_blob_relative_path(&digest).expect("blob relative path");
    let blob_path = mirror_root.join(&blob_relative_path);
    let blob_parent = blob_path.parent().expect("blob parent");
    fs::create_dir_all(blob_parent).expect("blob parent dir");
    let outside = temp.path().join("outside.raw");
    fs::write(&outside, source_bytes).expect("outside blob bytes");
    std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err = capture_source_file(input)
        .expect_err("symlinked content-addressed blob path must be rejected");
    let message = err.to_string();
    assert!(
        message.contains("symlink raw mirror blob"),
        "unexpected symlink-blob error: {err:#}"
    );

    let manifest_root = mirror_root.join("manifests");
    assert!(
        !manifest_root.exists(),
        "failed blob publish must not write a manifest pointing at a symlinked blob"
    );
    let source_after = fs::read(&source_path).expect("source bytes");
    assert_eq!(source_after, source_bytes);
    let outside_after = fs::read(&outside).expect("outside bytes");
    assert_eq!(outside_after, source_bytes);
}
2579
/// When the raw-mirror root directory is a symlink to a directory elsewhere,
/// capture must fail without creating the version directory behind the link.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let outside_mirror = temp.path().join("outside-mirror");
    let source_path = temp.path().join("source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";

    fs::create_dir_all(&data_dir).expect("data dir");
    fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
    fs::write(&source_path, source_bytes).expect("write source");

    // Replace the would-be mirror root with a link to the outside directory.
    let mirror_root_link = data_dir.join(RAW_MIRROR_ROOT_DIR);
    std::os::unix::fs::symlink(&outside_mirror, mirror_root_link)
        .expect("symlink raw mirror root");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err =
        capture_source_file(input).expect_err("symlinked raw-mirror root must be rejected");

    let message = err.to_string();
    assert!(
        message.contains("symlink raw mirror dir"),
        "unexpected symlink-root error: {err:#}"
    );
    let redirected_version_dir = outside_mirror.join(RAW_MIRROR_VERSION_DIR);
    assert!(
        !redirected_version_dir.exists(),
        "raw mirror capture must not create redirected archive state outside data_dir"
    );
    let source_after = fs::read(&source_path).expect("source bytes");
    assert_eq!(source_after, source_bytes);
}
2616
/// When the `blobs` directory inside the mirror is a symlink, capture must fail
/// without creating state behind the link and without writing a manifest.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_blob_directory_component() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let outside_blobs = temp.path().join("outside-blobs");
    let source_path = temp.path().join("source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);

    fs::create_dir_all(&root).expect("raw mirror root");
    fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
    fs::write(&source_path, source_bytes).expect("write source");

    // The mirror root is a real directory; only its `blobs` entry is a link.
    let blobs_link = root.join("blobs");
    std::os::unix::fs::symlink(&outside_blobs, blobs_link).expect("symlink blobs dir");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err =
        capture_source_file(input).expect_err("symlinked blob directory must be rejected");

    let message = err.to_string();
    assert!(
        message.contains("symlink raw mirror dir"),
        "unexpected symlink-blob-dir error: {err:#}"
    );
    let redirected_hash_dir = outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM);
    assert!(
        !redirected_hash_dir.exists(),
        "raw mirror capture must not create redirected blob state outside data_dir"
    );
    let manifest_root = root.join("manifests");
    assert!(
        !manifest_root.exists(),
        "failed blob publish must not write a manifest"
    );
    let source_after = fs::read(&source_path).expect("source bytes");
    assert_eq!(source_after, source_bytes);
}
2659
/// Passing a directory as the source must be rejected before any raw mirror
/// storage is created under the data directory.
#[test]
fn capture_source_file_rejects_non_file_sources() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_dir = temp.path().join("source-dir");
    fs::create_dir(&source_dir).expect("source dir");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_dir,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("directory source should be rejected");

    let message = err.to_string();
    assert!(
        message.contains("non-file source"),
        "unexpected non-file-source error: {err}"
    );
    let mirror_root = data_dir.join(RAW_MIRROR_ROOT_DIR);
    assert!(
        !mirror_root.exists(),
        "rejected non-file sources must not initialize raw mirror storage"
    );
}
2686
/// A source file the current user cannot open must be rejected with an
/// "open raw mirror source" error, and no manifests directory may be published.
///
/// The test makes the file unreadable via mode `0o000`. Privileged users
/// (root, or a process holding `CAP_DAC_OVERRIDE`, which is common in container
/// CI) can still read such a file, so the scenario cannot be reproduced there;
/// in that case the test restores the permissions and returns early instead of
/// failing spuriously.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_unreadable_sources_without_manifest() {
    use std::os::unix::fs::PermissionsExt;

    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("unreadable.jsonl");
    fs::write(&source_path, b"private session bytes\n").expect("source");
    fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
        .expect("make source unreadable");

    // Probe whether the mode bits are actually enforced for this user. If the
    // file still opens, the precondition of this test does not hold.
    if fs::File::open(&source_path).is_ok() {
        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
            .expect("restore source perms");
        eprintln!(
            "skipping unreadable-source check: mode 0o000 does not block reads for this user"
        );
        return;
    }

    let outcome = capture_source_file(RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    });
    // Restore permissions before any assertion can panic, so a failing run
    // does not leave a mode-000 file behind.
    fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
        .expect("restore source perms");

    let err = outcome.expect_err("unreadable source should be rejected");
    assert!(
        err.to_string().contains("open raw mirror source"),
        "unexpected unreadable-source error: {err}"
    );
    assert!(
        !data_dir.join("raw-mirror/v1/manifests").exists(),
        "failed unreadable-source captures must not publish manifests"
    );
}
2720
/// A source path that is itself a symlink to a regular file must be rejected
/// with a "symlink source" error.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlink_sources() {
    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");

    // A real file plus a link to it; only the link is handed to capture.
    let real_source = temp.path().join("real.jsonl");
    fs::write(&real_source, b"secret session").expect("write source");
    let symlink_source = temp.path().join("link.jsonl");
    std::os::unix::fs::symlink(&real_source, &symlink_source).expect("symlink");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &symlink_source,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("symlink source should be rejected");

    let message = err.to_string();
    assert!(
        message.contains("symlink source"),
        "unexpected error: {err:#}"
    );
}
2748}