1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// Value written to the `schema_version` field of manifests produced by this module.
const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
// NOTE(review): the two directory names below are presumably joined under the data
// directory by `raw_mirror_root`, which is not visible in this part of the file — confirm.
const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
const RAW_MIRROR_VERSION_DIR: &str = "v1";
/// Expected `manifest_kind`; written at capture time and checked whenever a manifest is
/// read for summaries or pruning.
const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
/// Name recorded in the manifest's `blob_hash_algorithm` field.
const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
// NOTE(review): presumably the file extension of stored blobs (`prune` strips a literal
// ".raw" suffix from blob file names) — confirm against `raw_mirror_blob_relative_path`.
const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";

// NOTE(review): counter presumably used to build unique temp names; its users are not
// visible in this part of the file.
static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
/// Lazily initialised, mutex-guarded map from a source-file cache key to the blob record
/// computed for that source.
static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
    OnceLock::new();
// NOTE(review): presumably serialises manifest rewrites; its users are not visible here.
static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
/// Borrowed description of one source file to capture with [`capture_source_file`].
#[derive(Debug, Clone)]
pub struct RawMirrorCaptureInput<'a> {
    /// Data directory under which the raw mirror root is created.
    pub data_dir: &'a Path,
    /// Provider name; copied into the manifest and mixed into the manifest id.
    pub provider: &'a str,
    /// Source identifier; copied into the manifest and mixed into the manifest id.
    pub source_id: &'a str,
    /// Origin kind; copied into the manifest and mixed into the manifest id.
    pub origin_kind: &'a str,
    /// Optional origin host; copied into the manifest and mixed into the manifest id.
    pub origin_host: Option<&'a str>,
    /// File to capture. Capture is refused when this is a symlink or not a regular file.
    pub source_path: &'a Path,
    /// Database links stored in a new manifest, or merged into the manifest that
    /// already exists for the same manifest id.
    pub db_links: &'a [RawMirrorDbLink],
}
34
/// Result of [`capture_source_file`]: where the manifest and blob live and what was recorded.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorCaptureRecord {
    /// Identifier of the manifest describing this capture.
    pub manifest_id: String,
    /// Manifest location; joined onto the raw mirror root to obtain the file path.
    pub manifest_relative_path: String,
    /// Blob location; joined onto the raw mirror root to obtain the file path.
    pub blob_relative_path: String,
    /// Hex BLAKE3 digest of the captured bytes.
    pub blob_blake3: String,
    /// Blob size: bytes copied for a new manifest, otherwise the value stored in the
    /// manifest that was already published.
    pub blob_size_bytes: u64,
    /// Capture time in milliseconds; taken from the already-published manifest when one existed.
    pub captured_at_ms: i64,
    /// Source modification time in milliseconds, when available; taken from the
    /// already-published manifest when one existed.
    pub source_mtime_ms: Option<i64>,
    /// `true` only when both the blob and the manifest were already present.
    pub already_present: bool,
}
46
/// Link from a raw mirror manifest to a database conversation.
///
/// Within the visible code only `conversation_id` is interpreted (to honour keep-tags
/// while pruning); the other fields are carried through unchanged.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorDbLink {
    /// Conversation id; matched against `conversation_tags.conversation_id` when
    /// resolving keep-tag pins.
    pub conversation_id: Option<i64>,
    // NOTE(review): the three fields below are not read in the visible code; the meanings
    // suggested by their names (message count, source path, start time) should be
    // confirmed against the producers of these links.
    pub message_count: Option<usize>,
    pub source_path: Option<String>,
    pub started_at_ms: Option<i64>,
}
54
/// Aggregate statistics about the raw mirror, as produced by [`storage_summary`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorStorageSummary {
    /// `true` once the root path exists on disk, even if it turns out to be invalid.
    pub initialized: bool,
    /// Display form of the raw mirror root path.
    pub root_path: String,
    /// Total size of regular files found under the root; symlinks are skipped.
    pub total_storage_bytes: u64,
    /// Number of manifests that parsed and carried the expected manifest kind.
    pub manifest_count: u64,
    /// Combined size of the regular files in the manifests directory, including files
    /// that later fail to parse.
    pub manifest_bytes: u64,
    /// Number of distinct referenced blobs that exist as regular files.
    pub unique_blob_count: u64,
    /// Combined size of those blobs.
    pub total_blob_bytes: u64,
    /// Size of the largest of those blobs.
    pub largest_blob_bytes: u64,
    /// Number of distinct referenced blobs that are missing or not regular files.
    pub missing_blob_count: u64,
    /// Number of problems found: unreadable or non-regular entries, unparsable or
    /// wrong-kind manifests, manifests with an unexpected blob path, and an invalid
    /// root or manifests directory.
    pub invalid_manifest_count: u64,
    /// Smallest `captured_at_ms` among counted manifests.
    pub oldest_capture_at_ms: Option<i64>,
    /// Largest `captured_at_ms` among counted manifests.
    pub newest_capture_at_ms: Option<i64>,
    /// Smallest `source_mtime_ms` among counted manifests that recorded one.
    pub oldest_source_mtime_ms: Option<i64>,
    /// Largest `source_mtime_ms` among counted manifests that recorded one.
    pub newest_source_mtime_ms: Option<i64>,
}
72
73pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
74 let root = raw_mirror_root(data_dir);
75 let mut summary = RawMirrorStorageSummary {
76 root_path: root.display().to_string(),
77 ..RawMirrorStorageSummary::default()
78 };
79 let root_metadata = match fs::symlink_metadata(&root) {
80 Ok(metadata) => metadata,
81 Err(_) => return summary,
82 };
83 summary.initialized = true;
84 if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
85 summary.invalid_manifest_count = 1;
86 return summary;
87 }
88
89 summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
90
91 let manifests_dir = root.join("manifests");
92 let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
93 return summary;
94 };
95 if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
96 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
97 return summary;
98 }
99 let entries = match fs::read_dir(&manifests_dir) {
100 Ok(entries) => entries,
101 Err(_) => return summary,
102 };
103 let mut seen_blobs = HashSet::new();
104 for entry in entries {
105 let Ok(entry) = entry else {
106 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
107 continue;
108 };
109 let path = entry.path();
110 let manifest_metadata = match fs::symlink_metadata(&path) {
111 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
112 _ => {
113 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
114 continue;
115 }
116 };
117 summary.manifest_bytes = summary
118 .manifest_bytes
119 .saturating_add(manifest_metadata.len());
120 let manifest = match read_raw_mirror_manifest(&path) {
121 Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
122 _ => {
123 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
124 continue;
125 }
126 };
127 summary.manifest_count = summary.manifest_count.saturating_add(1);
128 merge_min_max(
129 &mut summary.oldest_capture_at_ms,
130 &mut summary.newest_capture_at_ms,
131 Some(manifest.captured_at_ms),
132 );
133 merge_min_max(
134 &mut summary.oldest_source_mtime_ms,
135 &mut summary.newest_source_mtime_ms,
136 manifest.source_mtime_ms,
137 );
138
139 let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
140 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
141 continue;
142 };
143 if manifest.blob_relative_path != blob_relative_path {
144 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
145 continue;
146 }
147
148 if !seen_blobs.insert(blob_relative_path.clone()) {
149 continue;
150 }
151 let blob_path = root.join(blob_relative_path);
152 match fs::symlink_metadata(&blob_path) {
153 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
154 let size = metadata.len();
155 summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
156 summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
157 summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
158 }
159 _ => {
160 summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
161 }
162 }
163 }
164
165 summary
166}
167
/// Retention policy and execution mode for [`prune`].
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// Age threshold in milliseconds: unpinned manifests captured at or before
    /// `now - older_than_ms` are selected. Negative values are treated as zero.
    pub older_than_ms: Option<i64>,
    /// Blob byte budget: when current blob bytes exceed it, unpinned blobs are retired
    /// oldest-first until the projected total fits.
    pub max_size_bytes: Option<u64>,
    /// Tag names whose conversations pin their manifests. Names are trimmed and empty
    /// names are ignored.
    pub keep_tags: Vec<String>,
    /// Manifests captured more recently than this many milliseconds ago are pinned;
    /// values `<= 0` disable the hold-down.
    pub safety_hold_down_ms: i64,
    /// `true` deletes the planned files; `false` only reports the plan ("dry-run").
    pub apply: bool,
}
176
/// Outcome of a [`prune`] run: the store state, the plan, and what was actually removed.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneReport {
    /// `false` when the raw mirror root does not exist; all counters are then zero.
    pub initialized: bool,
    /// Display form of the raw mirror root path.
    pub root_path: String,
    /// `"apply"` or `"dry-run"`.
    pub mode: String,
    /// Number of manifests considered.
    pub manifest_count: u64,
    /// Number of distinct blobs referenced by those manifests.
    pub unique_blob_count: u64,
    /// Combined size of those blobs (on-disk size, falling back to the size recorded in
    /// the manifest when the blob file cannot be measured).
    pub current_blob_bytes: u64,
    /// Echo of the hold-down option used for this run.
    pub safety_hold_down_ms: i64,
    /// Echo of the keep-tag option used for this run.
    pub keep_tags: Vec<String>,
    /// Manifests protected by the hold-down or by keep-tags.
    pub pinned_manifest_count: u64,
    /// Blobs referenced by at least one pinned manifest.
    pub pinned_blob_count: u64,
    /// Manifest entries in the plan.
    pub planned_manifest_count: u64,
    /// Blob entries in the plan.
    pub planned_blob_count: u64,
    /// Sum of `size_bytes` over all planned entries (manifests and blobs).
    pub planned_reclaim_bytes: u64,
    /// Manifest files actually removed.
    pub applied_manifest_count: u64,
    /// Blob files actually removed.
    pub applied_blob_count: u64,
    /// Sum of `size_bytes` over the entries actually removed.
    pub applied_reclaim_bytes: u64,
    /// Audit log path, set when the plan had entries and the log was written.
    pub audit_log_path: Option<String>,
    /// Planned entries: manifests first, then blobs, each group sorted by key.
    pub entries: Vec<RawMirrorPruneEntry>,
}
198
/// One file in a prune plan.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneEntry {
    /// `"manifest"` or `"blob"`.
    pub kind: String,
    /// File path; joined onto the raw mirror root when the entry is applied.
    pub path: String,
    /// Blob digest: taken from the manifest for manifest entries, derived from the file
    /// name for blob entries.
    pub blob_blake3: Option<String>,
    /// Size counted towards the reclaim totals.
    pub size_bytes: u64,
    /// Human-readable explanation of why the file was selected.
    pub reason: String,
    /// `true` once the file has actually been removed.
    pub applied: bool,
}
208
/// The subset of a validated manifest that prune planning needs.
#[derive(Debug, Clone)]
struct RawMirrorPruneManifest {
    /// Manifest id as recorded inside the manifest file.
    manifest_id: String,
    /// Manifest path with the raw mirror root prefix stripped when possible.
    relative_path: String,
    /// On-disk size of the manifest file.
    size_bytes: u64,
    /// Blob digest recorded in the manifest.
    blob_blake3: String,
    /// Blob path recorded in the manifest; validated to match the path derived from the digest.
    blob_relative_path: String,
    /// Blob size recorded in the manifest; used when the blob file cannot be measured.
    blob_size_bytes: u64,
    /// Capture time recorded in the manifest.
    captured_at_ms: i64,
    /// Provider recorded in the manifest; used as a sort tie-breaker.
    provider: String,
    /// Original source path recorded in the manifest; used as a sort tie-breaker.
    original_path: String,
    /// Database links recorded in the manifest; consulted for keep-tag pinning.
    db_links: Vec<RawMirrorDbLink>,
}
222
223pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
224 let root = raw_mirror_root(data_dir);
225 let mut report = RawMirrorPruneReport {
226 initialized: false,
227 root_path: root.display().to_string(),
228 mode: if options.apply {
229 "apply".to_string()
230 } else {
231 "dry-run".to_string()
232 },
233 manifest_count: 0,
234 unique_blob_count: 0,
235 current_blob_bytes: 0,
236 safety_hold_down_ms: options.safety_hold_down_ms,
237 keep_tags: options.keep_tags.clone(),
238 pinned_manifest_count: 0,
239 pinned_blob_count: 0,
240 planned_manifest_count: 0,
241 planned_blob_count: 0,
242 planned_reclaim_bytes: 0,
243 applied_manifest_count: 0,
244 applied_blob_count: 0,
245 applied_reclaim_bytes: 0,
246 audit_log_path: None,
247 entries: Vec::new(),
248 };
249
250 let metadata = match fs::symlink_metadata(&root) {
251 Ok(metadata) => metadata,
252 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
253 Err(err) => {
254 return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
255 }
256 };
257 if metadata.file_type().is_symlink() || !metadata.is_dir() {
258 anyhow::bail!(
259 "refusing to prune invalid raw mirror root {}",
260 root.display()
261 );
262 }
263 report.initialized = true;
264
265 let manifests = collect_prune_manifests(&root)?;
266 report.manifest_count = manifests.len() as u64;
267
268 let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
269 let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
270 let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
271 for manifest in &manifests {
272 manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
273 blob_to_manifests
274 .entry(manifest.blob_relative_path.clone())
275 .or_default()
276 .push(manifest.manifest_id.clone());
277 blob_size_by_relative
278 .entry(manifest.blob_relative_path.clone())
279 .or_insert_with(|| {
280 blob_file_size(&root.join(&manifest.blob_relative_path))
281 .unwrap_or(manifest.blob_size_bytes)
282 });
283 }
284 report.unique_blob_count = blob_size_by_relative.len() as u64;
285 report.current_blob_bytes = blob_size_by_relative
286 .values()
287 .copied()
288 .fold(0u64, u64::saturating_add);
289
290 let now = now_ms();
291 let pinned_manifests = pinned_prune_manifest_ids(
292 data_dir,
293 &manifests,
294 &options.keep_tags,
295 options.safety_hold_down_ms,
296 now,
297 )?;
298 report.pinned_manifest_count = pinned_manifests.len() as u64;
299 let pinned_blobs: HashSet<String> = blob_to_manifests
300 .iter()
301 .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
302 .map(|(blob_relative_path, _)| blob_relative_path.clone())
303 .collect();
304 report.pinned_blob_count = pinned_blobs.len() as u64;
305
306 let mut selected_manifests: HashSet<String> = HashSet::new();
307 let mut manifest_reasons: HashMap<String, String> = HashMap::new();
308
309 if let Some(older_than_ms) = options.older_than_ms {
310 let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
311 for manifest in &manifests {
312 if manifest.captured_at_ms <= cutoff_ms
313 && !pinned_manifests.contains(&manifest.manifest_id)
314 {
315 selected_manifests.insert(manifest.manifest_id.clone());
316 manifest_reasons
317 .entry(manifest.manifest_id.clone())
318 .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
319 }
320 }
321 }
322
323 if let Some(max_size_bytes) = options.max_size_bytes
324 && report.current_blob_bytes > max_size_bytes
325 {
326 let mut blob_groups: Vec<_> = blob_to_manifests
327 .iter()
328 .map(|(blob_relative_path, manifest_ids)| {
329 let oldest_capture = manifest_ids
330 .iter()
331 .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
332 .min()
333 .unwrap_or(i64::MAX);
334 let size = blob_size_by_relative
335 .get(blob_relative_path)
336 .copied()
337 .unwrap_or(0);
338 (
339 blob_relative_path.clone(),
340 manifest_ids.clone(),
341 oldest_capture,
342 size,
343 )
344 })
345 .collect::<Vec<_>>();
346 blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
347
348 let mut projected_bytes = report.current_blob_bytes;
349 for (blob_relative_path, manifest_ids, _, size) in blob_groups {
350 if projected_bytes <= max_size_bytes {
351 break;
352 }
353 if pinned_blobs.contains(&blob_relative_path) {
354 continue;
355 }
356 for manifest_id in manifest_ids {
357 if !pinned_manifests.contains(&manifest_id) {
358 selected_manifests.insert(manifest_id.clone());
359 manifest_reasons.entry(manifest_id).or_insert_with(|| {
360 format!("max-size over budget; retiring blob {blob_relative_path}")
361 });
362 }
363 }
364 projected_bytes = projected_bytes.saturating_sub(size);
365 }
366 }
367
368 let selected_blobs: HashSet<String> = blob_to_manifests
369 .iter()
370 .filter(|(_, manifest_ids)| {
371 manifest_ids
372 .iter()
373 .all(|id| selected_manifests.contains(id))
374 })
375 .map(|(blob_relative_path, _)| blob_relative_path.clone())
376 .collect();
377
378 let mut entries = Vec::new();
379 let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
380 selected_manifest_ids.sort();
381 for manifest_id in selected_manifest_ids {
382 let Some(manifest) = manifest_by_id.get(&manifest_id) else {
383 continue;
384 };
385 let reason = manifest_reasons
386 .remove(&manifest_id)
387 .unwrap_or_else(|| "selected by retention policy".to_string());
388 entries.push(RawMirrorPruneEntry {
389 kind: "manifest".to_string(),
390 path: manifest.relative_path.clone(),
391 blob_blake3: Some(manifest.blob_blake3.clone()),
392 size_bytes: manifest.size_bytes,
393 reason,
394 applied: false,
395 });
396 }
397
398 let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
399 selected_blob_paths.sort();
400 for blob_relative_path in selected_blob_paths {
401 let size = blob_size_by_relative
402 .get(&blob_relative_path)
403 .copied()
404 .unwrap_or(0);
405 let blob_blake3 = blob_relative_path
406 .rsplit('/')
407 .next()
408 .and_then(|name| name.strip_suffix(".raw"))
409 .map(ToOwned::to_owned);
410 entries.push(RawMirrorPruneEntry {
411 kind: "blob".to_string(),
412 path: blob_relative_path,
413 blob_blake3,
414 size_bytes: size,
415 reason: "no retained manifest references this blob after prune plan".to_string(),
416 applied: false,
417 });
418 }
419
420 report.planned_manifest_count = entries
421 .iter()
422 .filter(|entry| entry.kind == "manifest")
423 .count() as u64;
424 report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
425 report.planned_reclaim_bytes = entries
426 .iter()
427 .map(|entry| entry.size_bytes)
428 .fold(0, u64::saturating_add);
429
430 if options.apply {
431 for entry in &mut entries {
432 let path = root.join(&entry.path);
433 let removed = remove_prune_target_file(&path)
434 .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
435 entry.applied = removed;
436 if removed {
437 if entry.kind == "manifest" {
438 report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
439 } else if entry.kind == "blob" {
440 report.applied_blob_count = report.applied_blob_count.saturating_add(1);
441 }
442 report.applied_reclaim_bytes = report
443 .applied_reclaim_bytes
444 .saturating_add(entry.size_bytes);
445 }
446 }
447 }
448
449 report.entries = entries;
450 if !report.entries.is_empty() {
451 let audit_path = append_prune_audit_log(&root, &report)?;
452 report.audit_log_path = Some(audit_path.display().to_string());
453 }
454 Ok(report)
455}
456
457fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
458 let manifests_dir = root.join("manifests");
459 let metadata = match fs::symlink_metadata(&manifests_dir) {
460 Ok(metadata) => metadata,
461 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
462 Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
463 };
464 if metadata.file_type().is_symlink() || !metadata.is_dir() {
465 anyhow::bail!(
466 "refusing to prune invalid raw mirror manifests directory {}",
467 manifests_dir.display()
468 );
469 }
470
471 let mut manifests = Vec::new();
472 for entry in
473 fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
474 {
475 let entry = entry?;
476 let path = entry.path();
477 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
478 continue;
479 }
480 let manifest_metadata = fs::symlink_metadata(&path)
481 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
482 if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
483 anyhow::bail!(
484 "refusing to prune with non-regular raw mirror manifest {}",
485 path.display()
486 );
487 }
488 let manifest = read_raw_mirror_manifest(&path)?;
489 if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
490 anyhow::bail!(
491 "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
492 manifest.manifest_kind,
493 path.display()
494 );
495 }
496 let Some(expected_blob_relative_path) =
497 raw_mirror_blob_relative_path(&manifest.blob_blake3)
498 else {
499 anyhow::bail!(
500 "refusing to prune raw mirror manifest {} with invalid blob hash",
501 path.display()
502 );
503 };
504 if manifest.blob_relative_path != expected_blob_relative_path {
505 anyhow::bail!(
506 "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
507 path.display(),
508 manifest.blob_relative_path
509 );
510 }
511 let relative_path = path
512 .strip_prefix(root)
513 .unwrap_or(&path)
514 .display()
515 .to_string();
516 manifests.push(RawMirrorPruneManifest {
517 manifest_id: manifest.manifest_id,
518 relative_path,
519 size_bytes: manifest_metadata.len(),
520 blob_blake3: manifest.blob_blake3,
521 blob_relative_path: manifest.blob_relative_path,
522 blob_size_bytes: manifest.blob_size_bytes,
523 captured_at_ms: manifest.captured_at_ms,
524 provider: manifest.provider,
525 original_path: manifest.original_path,
526 db_links: manifest.db_links,
527 });
528 }
529 manifests.sort_by(|left, right| {
530 left.captured_at_ms
531 .cmp(&right.captured_at_ms)
532 .then_with(|| left.provider.cmp(&right.provider))
533 .then_with(|| left.original_path.cmp(&right.original_path))
534 .then_with(|| left.manifest_id.cmp(&right.manifest_id))
535 });
536 Ok(manifests)
537}
538
539fn pinned_prune_manifest_ids(
540 data_dir: &Path,
541 manifests: &[RawMirrorPruneManifest],
542 keep_tags: &[String],
543 safety_hold_down_ms: i64,
544 now_ms: i64,
545) -> Result<HashSet<String>> {
546 let mut pinned = HashSet::new();
547 if safety_hold_down_ms > 0 {
548 let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
549 for manifest in manifests {
550 if manifest.captured_at_ms > cutoff_ms {
551 pinned.insert(manifest.manifest_id.clone());
552 }
553 }
554 }
555
556 let normalized_keep_tags = keep_tags
557 .iter()
558 .map(|tag| tag.trim())
559 .filter(|tag| !tag.is_empty())
560 .map(ToOwned::to_owned)
561 .collect::<Vec<_>>();
562 if normalized_keep_tags.is_empty() {
563 return Ok(pinned);
564 }
565
566 let keep_tag_conversation_ids =
567 load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
568 for manifest in manifests {
569 if manifest.db_links.iter().any(|link| {
570 link.conversation_id
571 .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
572 }) {
573 pinned.insert(manifest.manifest_id.clone());
574 }
575 }
576 Ok(pinned)
577}
578
579fn load_keep_tag_conversation_ids(
580 data_dir: &Path,
581 manifests: &[RawMirrorPruneManifest],
582 keep_tags: &[String],
583) -> Result<HashSet<i64>> {
584 use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
585
586 let mut conversation_ids = manifests
587 .iter()
588 .flat_map(|manifest| manifest.db_links.iter())
589 .filter_map(|link| link.conversation_id)
590 .collect::<Vec<_>>();
591 conversation_ids.sort_unstable();
592 conversation_ids.dedup();
593 if conversation_ids.is_empty() {
594 return Ok(HashSet::new());
595 }
596
597 let db_path = data_dir.join("agent_search.db");
598 let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
599 &db_path,
600 Duration::from_secs(30),
601 )
602 .with_context(|| {
603 format!(
604 "open {} to honor raw-mirror prune --keep-tag",
605 db_path.display()
606 )
607 })?;
608 let _ = conn.execute("PRAGMA query_only = 1;");
609
610 let mut pinned = HashSet::new();
611 for id_chunk in conversation_ids.chunks(400) {
612 let tag_placeholders = (0..keep_tags.len())
613 .map(|idx| format!("?{}", idx + 1))
614 .collect::<Vec<_>>()
615 .join(", ");
616 let id_offset = keep_tags.len();
617 let id_placeholders = (0..id_chunk.len())
618 .map(|idx| format!("?{}", id_offset + idx + 1))
619 .collect::<Vec<_>>()
620 .join(", ");
621 let sql = format!(
622 "SELECT DISTINCT ct.conversation_id \
623 FROM conversation_tags ct \
624 JOIN tags t ON t.id = ct.tag_id \
625 WHERE t.name IN ({tag_placeholders}) \
626 AND ct.conversation_id IN ({id_placeholders})"
627 );
628 let mut params = keep_tags
629 .iter()
630 .map(|tag| ParamValue::from(tag.as_str()))
631 .collect::<Vec<_>>();
632 params.extend(id_chunk.iter().copied().map(ParamValue::from));
633 let rows: Vec<i64> = conn
634 .query_map_collect(&sql, ¶ms, |row: &frankensqlite::Row| row.get_typed(0))
635 .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
636 pinned.extend(rows);
637 }
638
639 Ok(pinned)
640}
641
/// Returns the size of `path` when it is a regular file, and `None` when it is missing,
/// unreadable, a symlink, or any other kind of entry. Symlinks are not followed.
fn blob_file_size(path: &Path) -> Option<u64> {
    let metadata = fs::symlink_metadata(path).ok()?;
    if metadata.file_type().is_symlink() || !metadata.is_file() {
        return None;
    }
    Some(metadata.len())
}
648
649fn remove_prune_target_file(path: &Path) -> Result<bool> {
650 let metadata = match fs::symlink_metadata(path) {
651 Ok(metadata) => metadata,
652 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
653 Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
654 };
655 if metadata.file_type().is_symlink() || !metadata.is_file() {
656 anyhow::bail!(
657 "refusing to prune non-regular raw mirror file {}",
658 path.display()
659 );
660 }
661 fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
662 sync_parent(path)?;
663 Ok(true)
664}
665
666fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
667 ensure_private_dir(root)?;
668 let audit_path = root.join("pruned.jsonl");
669 ensure_prune_audit_log_appendable(&audit_path)?;
670 let mut file = OpenOptions::new()
671 .create(true)
672 .append(true)
673 .open(&audit_path)
674 .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
675 set_private_file_permissions(&audit_path)?;
676 let now = now_ms();
677 for entry in &report.entries {
678 let record = json!({
679 "schema_version": 1,
680 "recorded_at_ms": now,
681 "mode": report.mode,
682 "kind": entry.kind,
683 "path": entry.path,
684 "blob_blake3": entry.blob_blake3,
685 "size_bytes": entry.size_bytes,
686 "reason": entry.reason,
687 "applied": entry.applied,
688 });
689 writeln!(file, "{record}")
690 .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
691 }
692 file.sync_all()
693 .with_context(|| format!("sync raw mirror prune audit {}", audit_path.display()))?;
694 sync_parent(&audit_path)?;
695 Ok(audit_path)
696}
697
698fn ensure_prune_audit_log_appendable(path: &Path) -> Result<()> {
699 match fs::symlink_metadata(path) {
700 Ok(metadata) if metadata.file_type().is_symlink() => {
701 anyhow::bail!(
702 "refusing to append raw mirror prune audit through symlink {}",
703 path.display()
704 );
705 }
706 Ok(metadata) if !metadata.is_file() => {
707 anyhow::bail!(
708 "refusing to append raw mirror prune audit to non-file {}",
709 path.display()
710 );
711 }
712 Ok(_) => Ok(()),
713 Err(err) if matches!(err.kind(), std::io::ErrorKind::NotFound) => Ok(()),
714 Err(err) => Err(err).with_context(|| {
715 format!(
716 "inspect raw mirror prune audit before append {}",
717 path.display()
718 )
719 }),
720 }
721}
722
/// Folds `value` into a running minimum and maximum. A `None` value leaves both
/// untouched; the first `Some` value initialises both.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(sample) = value {
        *min = Some(match *min {
            Some(lowest) => lowest.min(sample),
            None => sample,
        });
        *max = Some(match *max {
            Some(highest) => highest.max(sample),
            None => sample,
        });
    }
}
730
/// Sums the sizes of all regular files at or below `root`.
///
/// The walk is iterative and best effort: symlinks are neither followed nor counted,
/// and paths or directories that cannot be inspected are silently skipped. The total
/// saturates instead of overflowing.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending = vec![root.to_path_buf()];
    let mut byte_total: u64 = 0;
    while let Some(current) = pending.pop() {
        let metadata = match fs::symlink_metadata(&current) {
            Ok(metadata) => metadata,
            Err(_) => continue,
        };
        let file_type = metadata.file_type();
        if file_type.is_symlink() {
            continue;
        }
        if file_type.is_file() {
            byte_total = byte_total.saturating_add(metadata.len());
            continue;
        }
        if file_type.is_dir() {
            match fs::read_dir(&current) {
                Ok(children) => pending.extend(children.flatten().map(|child| child.path())),
                Err(_) => continue,
            }
        }
    }
    byte_total
}
754
/// Key of the in-process blob capture cache: identifies one observed state of one
/// source file for one data directory.
// NOTE(review): the key is built by `raw_mirror_blob_cache_key`, which is not visible in
// this part of the file; the field notes below follow the field names and types only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    data_dir: PathBuf,
    source_path: PathBuf,
    // Presumably a platform file identity (e.g. device/inode) when available — confirm.
    source_identity: Option<String>,
    source_size_bytes: u64,
    // Presumably modification and status-change times in nanoseconds — confirm.
    source_mtime_ns: Option<u128>,
    source_change_time_ns: Option<u128>,
}
764
/// Cached result of copying a source file into the blob store.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// Hex BLAKE3 digest of the copied bytes.
    blob_blake3: String,
    /// Number of bytes that were copied.
    bytes_copied: u64,
}
770
/// Compression description stored in a manifest.
///
/// Captures in this file always write `state = "none"`, no algorithm, and the copied
/// byte count as the uncompressed size.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorCompressionEnvelope {
    state: String,
    algorithm: Option<String>,
    uncompressed_size_bytes: Option<u64>,
}
777
/// Encryption description stored in a manifest.
///
/// Captures in this file always write `state = "none"` with every other field unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorEncryptionEnvelope {
    state: String,
    algorithm: Option<String>,
    key_id: Option<String>,
    envelope_version: Option<u32>,
}
785
/// Verification state stored in a manifest.
///
/// Captures in this file write `status = "captured"`, `verifier = "cass_indexer"`, the
/// blob digest as `content_blake3`, and the capture time as `verified_at_ms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorVerificationRecord {
    status: String,
    verifier: String,
    content_blake3: Option<String>,
    verified_at_ms: Option<i64>,
}
793
/// On-disk manifest describing one captured source file and the blob holding its bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorManifestFile {
    /// Written as `RAW_MIRROR_SCHEMA_VERSION`.
    schema_version: u32,
    /// Written as `RAW_MIRROR_MANIFEST_KIND`; readers reject any other value.
    manifest_kind: String,
    /// Id derived from provider, source id, origin, original-path hash and blob digest.
    manifest_id: String,
    /// Written as `RAW_MIRROR_HASH_ALGORITHM`.
    blob_hash_algorithm: String,
    /// Blob location relative to the raw mirror root; must equal the path derived from
    /// `blob_blake3`.
    blob_relative_path: String,
    /// Hex BLAKE3 digest of the blob.
    blob_blake3: String,
    /// Number of bytes copied into the blob.
    blob_size_bytes: u64,
    provider: String,
    source_id: String,
    origin_kind: String,
    origin_host: Option<String>,
    /// Display form of the captured source path.
    original_path: String,
    /// Redacted form of the source path, as produced by `redacted_original_path`.
    redacted_original_path: String,
    /// Digest of `original_path`, as produced by `raw_mirror_original_path_blake3`.
    original_path_blake3: String,
    /// Capture time in milliseconds.
    captured_at_ms: i64,
    /// Source modification time in milliseconds, when it could be determined.
    source_mtime_ms: Option<i64>,
    /// Source size taken from the metadata read before the copy started.
    source_size_bytes: u64,
    compression: RawMirrorCompressionEnvelope,
    encryption: RawMirrorEncryptionEnvelope,
    /// Database links associated with this capture.
    db_links: Vec<RawMirrorDbLink>,
    verification: RawMirrorVerificationRecord,
    /// Manifest digest; computed by `raw_mirror_manifest_blake3` while this field is
    /// still `None`, then stored here.
    manifest_blake3: Option<String>,
}
819
820pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
821 let source_metadata = fs::symlink_metadata(input.source_path)
822 .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
823 if source_metadata.file_type().is_symlink() {
824 return Err(anyhow!(
825 "refusing to raw-mirror symlink source {}",
826 input.source_path.display()
827 ));
828 }
829 if !source_metadata.is_file() {
830 return Err(anyhow!(
831 "refusing to raw-mirror non-file source {}",
832 input.source_path.display()
833 ));
834 }
835
836 let root = ensure_raw_mirror_root(input.data_dir)?;
837 ensure_private_dir_descendant(&root, &root.join("tmp"))?;
838
839 let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
840 let (blob_blake3, bytes_copied, blob_already_present) =
841 match cached_raw_mirror_blob_record(&cache_key, &root) {
842 Some(record) => (record.blob_blake3, record.bytes_copied, true),
843 None => {
844 let temp_dir = unique_capture_temp_dir(&root);
845 ensure_private_dir_descendant(&root, &temp_dir)?;
846 let CopyToTempResult {
847 temp_path,
848 blob_blake3,
849 bytes_copied,
850 } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
851 let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
852 .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
853 let blob_path = root.join(&blob_relative_path);
854 let already_present =
855 publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
856 remove_empty_temp_dir_best_effort(&temp_dir);
857 cache_raw_mirror_blob_record(
858 cache_key.clone(),
859 RawMirrorBlobRecord {
860 blob_blake3: blob_blake3.clone(),
861 bytes_copied,
862 },
863 );
864 (blob_blake3, bytes_copied, already_present)
865 }
866 };
867 let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
868 .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
869
870 let original_path = input.source_path.display().to_string();
871 let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
872 let manifest_id = raw_mirror_manifest_id(
873 input.provider,
874 input.source_id,
875 input.origin_kind,
876 input.origin_host,
877 &original_path_blake3,
878 &blob_blake3,
879 );
880 let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
881 let manifest_path = root.join(&manifest_relative_path);
882 let captured_at_ms = now_ms();
883 let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
884 let mut manifest = RawMirrorManifestFile {
885 schema_version: RAW_MIRROR_SCHEMA_VERSION,
886 manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
887 manifest_id: manifest_id.clone(),
888 blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
889 blob_relative_path: blob_relative_path.clone(),
890 blob_blake3: blob_blake3.clone(),
891 blob_size_bytes: bytes_copied,
892 provider: input.provider.to_string(),
893 source_id: input.source_id.to_string(),
894 origin_kind: input.origin_kind.to_string(),
895 origin_host: input.origin_host.map(ToOwned::to_owned),
896 original_path,
897 redacted_original_path: redacted_original_path(input.provider, input.source_path),
898 original_path_blake3,
899 captured_at_ms,
900 source_mtime_ms,
901 source_size_bytes: source_metadata.len(),
902 compression: RawMirrorCompressionEnvelope {
903 state: "none".to_string(),
904 algorithm: None,
905 uncompressed_size_bytes: Some(bytes_copied),
906 },
907 encryption: RawMirrorEncryptionEnvelope {
908 state: "none".to_string(),
909 algorithm: None,
910 key_id: None,
911 envelope_version: None,
912 },
913 db_links: unique_db_links(input.db_links),
914 verification: RawMirrorVerificationRecord {
915 status: "captured".to_string(),
916 verifier: "cass_indexer".to_string(),
917 content_blake3: Some(blob_blake3.clone()),
918 verified_at_ms: Some(captured_at_ms),
919 },
920 manifest_blake3: None,
921 };
922 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
923 let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
924 let manifest_already_present =
925 publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
926 let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
927 if manifest_already_present {
928 merge_raw_mirror_manifest_db_links(
929 &root,
930 &manifest_path,
931 input.db_links,
932 Some(&blob_blake3),
933 )?;
934 let published = read_raw_mirror_manifest(&manifest_path)?;
935 (
936 published.blob_size_bytes,
937 published.captured_at_ms,
938 published.source_mtime_ms,
939 )
940 } else {
941 (bytes_copied, captured_at_ms, source_mtime_ms)
942 };
943
944 Ok(RawMirrorCaptureRecord {
945 manifest_id,
946 manifest_relative_path,
947 blob_relative_path,
948 blob_blake3,
949 blob_size_bytes: record_blob_size_bytes,
950 captured_at_ms: record_captured_at_ms,
951 source_mtime_ms: record_source_mtime_ms,
952 already_present: blob_already_present && manifest_already_present,
953 })
954}
955
956pub fn merge_manifest_db_links(
957 data_dir: &Path,
958 manifest_relative_path: &str,
959 links: &[RawMirrorDbLink],
960) -> Result<()> {
961 if links.is_empty() {
962 return Ok(());
963 }
964 let root = raw_mirror_root(data_dir);
965 let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
966 merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
967}
968
/// Outcome of copying a source file into a private temp file.
struct CopyToTempResult {
    /// Location of the temp file holding the copied bytes.
    temp_path: PathBuf,
    /// Hex BLAKE3 digest of the copied bytes.
    blob_blake3: String,
    /// Number of bytes copied.
    bytes_copied: u64,
}
974
975fn copy_source_to_private_temp(
976 source_path: &Path,
977 temp_dir: &Path,
978 source_metadata: &fs::Metadata,
979) -> Result<CopyToTempResult> {
980 let temp_path = unique_temp_path(temp_dir, "blob");
981 let mut source = open_stable_source_file(source_path, source_metadata)?;
982 let mut temp = private_create_new_file(&temp_path)?;
983 let mut hasher = blake3::Hasher::new();
984 let mut buffer = [0u8; 64 * 1024];
985 let mut bytes_copied = 0u64;
986 loop {
987 let read = source
988 .read(&mut buffer)
989 .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
990 if read == 0 {
991 break;
992 }
993 temp.write_all(&buffer[..read])
994 .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
995 hasher.update(&buffer[..read]);
996 bytes_copied = bytes_copied.saturating_add(read as u64);
997 }
998 temp.sync_all()
999 .with_context(|| format!("sync raw mirror temp {}", temp_path.display()))?;
1000
1001 let final_source_metadata = source
1002 .metadata()
1003 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1004 if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
1005 remove_temp_best_effort(&temp_path);
1006 return Err(anyhow!(
1007 "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
1008 source_path.display()
1009 ));
1010 }
1011
1012 Ok(CopyToTempResult {
1013 temp_path,
1014 blob_blake3: hasher.finalize().to_hex().to_string(),
1015 bytes_copied,
1016 })
1017}
1018
1019fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
1020 let source = File::open(source_path)
1021 .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
1022 let opened_metadata = source
1023 .metadata()
1024 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1025 if !same_source_identity(expected_metadata, &opened_metadata) {
1026 return Err(anyhow!(
1027 "raw mirror source {} changed identity before capture",
1028 source_path.display()
1029 ));
1030 }
1031 let current_path_metadata = fs::symlink_metadata(source_path)
1032 .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1033 if current_path_metadata.file_type().is_symlink() {
1034 return Err(anyhow!(
1035 "refusing to raw-mirror symlink source {}",
1036 source_path.display()
1037 ));
1038 }
1039 if !same_source_identity(expected_metadata, ¤t_path_metadata) {
1040 return Err(anyhow!(
1041 "raw mirror source {} changed identity before capture",
1042 source_path.display()
1043 ));
1044 }
1045 Ok(source)
1046}
1047
/// Returns `true` when `actual` is a regular file with the same device and
/// inode numbers as `expected`.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;

    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}

/// Returns `true` when `actual` is a regular file; on this platform no
/// further identity comparison against `_expected` is made.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    let is_regular_file = actual.is_file();
    is_regular_file
}
1058
/// Builds a `"<dev>:<ino>"` token from the file's device and inode numbers.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;

    let (device, inode) = (metadata.dev(), metadata.ino());
    Some([device.to_string(), inode.to_string()].join(":"))
}

/// No identity token is produced on this platform.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    Option::None
}
1069
/// Returns the file's `ctime` as nanoseconds, combining the seconds and
/// nanoseconds parts with saturating arithmetic.
///
/// Yields `None` when either part is negative and therefore does not fit in
/// a `u128`.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    match (
        u128::try_from(metadata.ctime()),
        u128::try_from(metadata.ctime_nsec()),
    ) {
        (Ok(whole_seconds), Ok(sub_second_nanos)) => Some(
            whole_seconds
                .saturating_mul(1_000_000_000)
                .saturating_add(sub_second_nanos),
        ),
        _ => None,
    }
}

/// No change time is reported on this platform.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    Option::None
}
1087
/// Reports whether the source looks different after the copy than before it.
///
/// A length difference always counts as a change. Otherwise the modification
/// times are compared, but only when both are available; if either cannot be
/// read the file is treated as unchanged.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    let length_differs = initial.len() != final_metadata.len();
    length_differs
        || matches!(
            (initial.modified(), final_metadata.modified()),
            (Ok(before), Ok(after)) if before != after
        )
}
1100
1101fn publish_content_addressed_temp(
1102 root: &Path,
1103 temp_path: &Path,
1104 final_path: &Path,
1105 expected_blake3: &str,
1106) -> Result<bool> {
1107 ensure_private_dir_descendant(
1108 root,
1109 final_path
1110 .parent()
1111 .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1112 )?;
1113 if final_path.exists() {
1114 verify_existing_file(final_path, expected_blake3)?;
1115 remove_temp_best_effort(temp_path);
1116 return Ok(true);
1117 }
1118
1119 match fs::hard_link(temp_path, final_path) {
1120 Ok(()) => {
1121 sync_file(final_path)?;
1122 sync_parent(final_path)?;
1123 remove_temp_best_effort(temp_path);
1124 Ok(false)
1125 }
1126 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1127 verify_existing_file(final_path, expected_blake3)?;
1128 remove_temp_best_effort(temp_path);
1129 Ok(true)
1130 }
1131 Err(err) => Err(anyhow!(
1132 "publish raw mirror blob {} from {}: {err}",
1133 final_path.display(),
1134 temp_path.display()
1135 )),
1136 }
1137}
1138
1139fn publish_manifest_bytes_create_new(
1140 root: &Path,
1141 manifest_path: &Path,
1142 manifest_bytes: &[u8],
1143 blob_blake3: &str,
1144) -> Result<bool> {
1145 ensure_private_dir_descendant(
1146 root,
1147 manifest_path
1148 .parent()
1149 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1150 )?;
1151 if manifest_path.exists() {
1152 verify_existing_manifest(manifest_path, blob_blake3)?;
1153 return Ok(true);
1154 }
1155
1156 let temp_dir = unique_capture_temp_dir(root);
1157 ensure_private_dir_descendant(root, &temp_dir)?;
1158 let temp_path = unique_temp_path(&temp_dir, "manifest");
1159 let mut temp = private_create_new_file(&temp_path)?;
1160 temp.write_all(manifest_bytes)
1161 .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1162 temp.sync_all()
1163 .with_context(|| format!("sync raw mirror manifest temp {}", temp_path.display()))?;
1164
1165 match fs::hard_link(&temp_path, manifest_path) {
1166 Ok(()) => {
1167 sync_file(manifest_path)?;
1168 sync_parent(manifest_path)?;
1169 remove_temp_best_effort(&temp_path);
1170 remove_empty_temp_dir_best_effort(&temp_dir);
1171 Ok(false)
1172 }
1173 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1174 verify_existing_manifest(manifest_path, blob_blake3)?;
1175 remove_temp_best_effort(&temp_path);
1176 remove_empty_temp_dir_best_effort(&temp_dir);
1177 Ok(true)
1178 }
1179 Err(err) => Err(anyhow!(
1180 "publish raw mirror manifest {} from {}: {err}",
1181 manifest_path.display(),
1182 temp_path.display()
1183 )),
1184 }
1185}
1186
1187fn merge_raw_mirror_manifest_db_links(
1188 root: &Path,
1189 manifest_path: &Path,
1190 links: &[RawMirrorDbLink],
1191 expected_blob_blake3: Option<&str>,
1192) -> Result<()> {
1193 if links.is_empty() {
1194 return Ok(());
1195 }
1196
1197 let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1198 let _guard = lock
1199 .lock()
1200 .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1201
1202 let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1203 if let Some(expected_blob_blake3) = expected_blob_blake3
1204 && manifest.blob_blake3 != expected_blob_blake3
1205 {
1206 return Err(anyhow!(
1207 "existing raw mirror manifest {} points at blob {}, expected {}",
1208 manifest_path.display(),
1209 manifest.blob_blake3,
1210 expected_blob_blake3
1211 ));
1212 }
1213
1214 let mut merged_links = manifest.db_links.clone();
1215 merged_links.extend_from_slice(links);
1216 let merged_links = unique_db_links(&merged_links);
1217 if merged_links == manifest.db_links {
1218 return Ok(());
1219 }
1220
1221 manifest.db_links = merged_links;
1222 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1223 let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1224 replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1225}
1226
1227fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1228 ensure_private_dir_descendant(
1229 root,
1230 manifest_path
1231 .parent()
1232 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1233 )?;
1234 let temp_dir = unique_capture_temp_dir(root);
1235 ensure_private_dir_descendant(root, &temp_dir)?;
1236 let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1237 let mut temp = private_create_new_file(&temp_path)?;
1238 temp.write_all(manifest_bytes).with_context(|| {
1239 format!(
1240 "write raw mirror manifest update temp {}",
1241 temp_path.display()
1242 )
1243 })?;
1244 temp.sync_all().with_context(|| {
1245 format!(
1246 "sync raw mirror manifest update temp {}",
1247 temp_path.display()
1248 )
1249 })?;
1250 drop(temp);
1251
1252 fs::rename(&temp_path, manifest_path).with_context(|| {
1253 format!(
1254 "replace raw mirror manifest {} from {}",
1255 manifest_path.display(),
1256 temp_path.display()
1257 )
1258 })?;
1259 set_private_file_permissions(manifest_path)?;
1260 sync_file(manifest_path)?;
1261 sync_parent(manifest_path)?;
1262 remove_empty_temp_dir_best_effort(&temp_dir);
1263 Ok(())
1264}
1265
1266fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1267 let relative = Path::new(relative_path);
1268 if relative.is_absolute() {
1269 return Err(anyhow!(
1270 "raw mirror manifest path must be relative: {relative_path}"
1271 ));
1272 }
1273
1274 let mut normal_components = Vec::new();
1275 for component in relative.components() {
1276 match component {
1277 std::path::Component::Normal(part) => normal_components.push(part),
1278 _ => {
1279 return Err(anyhow!(
1280 "raw mirror manifest path must use only normal relative components: {relative_path}"
1281 ));
1282 }
1283 }
1284 }
1285
1286 if normal_components.len() != 2
1287 || normal_components[0] != std::ffi::OsStr::new("manifests")
1288 || Path::new(normal_components[1])
1289 .extension()
1290 .and_then(|ext| ext.to_str())
1291 != Some("json")
1292 {
1293 return Err(anyhow!(
1294 "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1295 ));
1296 }
1297
1298 Ok(root.join(relative))
1299}
1300
1301fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1302 let metadata = fs::symlink_metadata(path)
1303 .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1304 if metadata.file_type().is_symlink() {
1305 return Err(anyhow!(
1306 "refusing to read symlink raw mirror blob {}",
1307 path.display()
1308 ));
1309 }
1310 if !metadata.is_file() {
1311 return Err(anyhow!(
1312 "refusing to read non-file raw mirror blob {}",
1313 path.display()
1314 ));
1315 }
1316 let actual = file_blake3(path)?;
1317 if actual == expected_blake3 {
1318 Ok(())
1319 } else {
1320 Err(anyhow!(
1321 "existing raw mirror blob {} has blake3 {}, expected {}",
1322 path.display(),
1323 actual,
1324 expected_blake3
1325 ))
1326 }
1327}
1328
1329fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1330 let manifest = read_raw_mirror_manifest(path)?;
1331 if manifest.blob_blake3 == expected_blob_blake3 {
1332 Ok(())
1333 } else {
1334 Err(anyhow!(
1335 "existing raw mirror manifest {} points at blob {}, expected {}",
1336 path.display(),
1337 manifest.blob_blake3,
1338 expected_blob_blake3
1339 ))
1340 }
1341}
1342
1343fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1344 let metadata = fs::symlink_metadata(path)
1345 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1346 if metadata.file_type().is_symlink() {
1347 return Err(anyhow!(
1348 "refusing to read symlink raw mirror manifest {}",
1349 path.display()
1350 ));
1351 }
1352 if !metadata.is_file() {
1353 return Err(anyhow!(
1354 "refusing to read non-file raw mirror manifest {}",
1355 path.display()
1356 ));
1357 }
1358 serde_json::from_slice(
1359 &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1360 )
1361 .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1362}
1363
1364fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1365 data_dir
1366 .join(RAW_MIRROR_ROOT_DIR)
1367 .join(RAW_MIRROR_VERSION_DIR)
1368}
1369
1370fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1371 let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1372 ensure_private_dir(&root_parent)?;
1373 let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1374 ensure_private_dir(&root)?;
1375 Ok(root)
1376}
1377
1378fn raw_mirror_blob_cache_key(
1379 input: &RawMirrorCaptureInput<'_>,
1380 source_metadata: &fs::Metadata,
1381) -> RawMirrorBlobCacheKey {
1382 RawMirrorBlobCacheKey {
1383 data_dir: input.data_dir.to_path_buf(),
1384 source_path: input.source_path.to_path_buf(),
1385 source_identity: source_identity_token(source_metadata),
1386 source_size_bytes: source_metadata.len(),
1387 source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1388 source_change_time_ns: source_change_time_ns(source_metadata),
1389 }
1390}
1391
1392fn cached_raw_mirror_blob_record(
1393 key: &RawMirrorBlobCacheKey,
1394 root: &Path,
1395) -> Option<RawMirrorBlobRecord> {
1396 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1397 let record = {
1398 let mut guard = cache.lock().ok()?;
1399 let record = guard.get(key).cloned()?;
1400 if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1401 guard.remove(key);
1402 return None;
1403 }
1404 record
1405 };
1406
1407 let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1408 let blob_path = root.join(blob_relative_path);
1409 let metadata_valid = fs::symlink_metadata(&blob_path)
1410 .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1411 .unwrap_or(false);
1412 if !metadata_valid {
1413 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1414 return None;
1415 }
1416
1417 match file_blake3(&blob_path) {
1418 Ok(actual) if actual == record.blob_blake3 => Some(record),
1419 Ok(actual) => {
1420 tracing::warn!(
1421 path = %blob_path.display(),
1422 expected_blake3 = %record.blob_blake3,
1423 actual_blake3 = %actual,
1424 "discarding raw mirror blob cache entry with mismatched content"
1425 );
1426 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1427 None
1428 }
1429 Err(err) => {
1430 tracing::debug!(
1431 path = %blob_path.display(),
1432 error = %err,
1433 "discarding unreadable raw mirror blob cache entry"
1434 );
1435 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1436 None
1437 }
1438 }
1439}
1440
1441fn remove_cached_raw_mirror_blob_record_if_unchanged(
1442 cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1443 key: &RawMirrorBlobCacheKey,
1444 stale_record: &RawMirrorBlobRecord,
1445) {
1446 if let Ok(mut guard) = cache.lock()
1447 && guard
1448 .get(key)
1449 .is_some_and(|current| current == stale_record)
1450 {
1451 guard.remove(key);
1452 }
1453}
1454
1455fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1456 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1457 if let Ok(mut guard) = cache.lock() {
1458 guard.insert(key, record);
1459 }
1460}
1461
1462fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1463 if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1464 return None;
1465 }
1466 let lower = blob_blake3.to_ascii_lowercase();
1467 Some(format!(
1468 "blobs/{}/{}/{}.{}",
1469 RAW_MIRROR_HASH_ALGORITHM,
1470 &lower[..2],
1471 lower,
1472 RAW_MIRROR_BLOB_EXTENSION
1473 ))
1474}
1475
/// Returns the manifest path relative to the raw mirror root for
/// `manifest_id`, i.e. `manifests/<manifest_id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    let relative_path = format!("manifests/{manifest_id}.json");
    relative_path
}
1479
1480fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1481 let mut hasher = blake3::Hasher::new();
1482 hasher.update(b"doctor-raw-mirror-original-path-v1");
1483 hasher.update(&[0]);
1484 hasher.update(original_path.as_bytes());
1485 hasher.finalize().to_hex().to_string()
1486}
1487
1488fn raw_mirror_manifest_id(
1489 provider: &str,
1490 source_id: &str,
1491 origin_kind: &str,
1492 origin_host: Option<&str>,
1493 original_path_blake3: &str,
1494 blob_blake3: &str,
1495) -> String {
1496 canonical_blake3(
1497 "doctor-raw-mirror-manifest-id-v1",
1498 json!({
1499 "provider": provider,
1500 "source_id": source_id,
1501 "origin_kind": origin_kind,
1502 "origin_host": origin_host,
1503 "original_path_blake3": original_path_blake3,
1504 "blob_blake3": blob_blake3,
1505 }),
1506 )
1507}
1508
1509fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1510 let mut value = serde_json::to_value(manifest).unwrap_or_default();
1511 if let Value::Object(map) = &mut value {
1512 map.remove("manifest_blake3");
1513 }
1514 canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1515}
1516
1517fn canonical_blake3(prefix: &str, value: Value) -> String {
1518 let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1519 let mut hasher = blake3::Hasher::new();
1520 hasher.update(prefix.as_bytes());
1521 hasher.update(&[0]);
1522 hasher.update(&encoded);
1523 format!("{prefix}-{}", hasher.finalize().to_hex())
1524}
1525
1526fn canonical_json_value(value: Value) -> Value {
1527 match value {
1528 Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1529 Value::Object(map) => {
1530 let mut entries: Vec<_> = map.into_iter().collect();
1531 entries.sort_by(|left, right| left.0.cmp(&right.0));
1532 let mut canonical = serde_json::Map::new();
1533 for (key, value) in entries {
1534 canonical.insert(key, canonical_json_value(value));
1535 }
1536 Value::Object(canonical)
1537 }
1538 other => other,
1539 }
1540}
1541
1542fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1543 let mut dedup = links.to_vec();
1544 dedup.sort_by(|left, right| {
1545 (
1546 left.conversation_id,
1547 left.message_count,
1548 left.started_at_ms,
1549 left.source_path.as_deref().unwrap_or(""),
1550 )
1551 .cmp(&(
1552 right.conversation_id,
1553 right.message_count,
1554 right.started_at_ms,
1555 right.source_path.as_deref().unwrap_or(""),
1556 ))
1557 });
1558 dedup.dedup();
1559 dedup
1560}
1561
1562fn file_blake3(path: &Path) -> Result<String> {
1563 let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1564 let mut hasher = blake3::Hasher::new();
1565 let mut buffer = [0u8; 64 * 1024];
1566 loop {
1567 let read = file
1568 .read(&mut buffer)
1569 .with_context(|| format!("read {}", path.display()))?;
1570 if read == 0 {
1571 break;
1572 }
1573 hasher.update(&buffer[..read]);
1574 }
1575 Ok(hasher.finalize().to_hex().to_string())
1576}
1577
1578fn ensure_private_dir(path: &Path) -> Result<()> {
1579 create_private_dir_all(path)
1580 .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1581 let metadata = fs::symlink_metadata(path)
1582 .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1583 let file_type = metadata.file_type();
1584 if file_type.is_symlink() {
1585 return Err(anyhow!(
1586 "refusing to use symlink raw mirror dir {}",
1587 path.display()
1588 ));
1589 }
1590 if !file_type.is_dir() {
1591 return Err(anyhow!(
1592 "refusing to use non-directory raw mirror path {}",
1593 path.display()
1594 ));
1595 }
1596 #[cfg(unix)]
1597 {
1598 use std::os::unix::fs::PermissionsExt;
1599 if metadata.permissions().mode() & 0o777 != 0o700 {
1600 set_private_dir_permissions(path)?;
1601 }
1602 }
1603 #[cfg(not(unix))]
1604 {
1605 set_private_dir_permissions(path)?;
1606 }
1607 Ok(())
1608}
1609
1610fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1611 let relative = path.strip_prefix(root).with_context(|| {
1612 format!(
1613 "raw mirror private dir {} is not under root {}",
1614 path.display(),
1615 root.display()
1616 )
1617 })?;
1618
1619 if let Some(root_parent) = root.parent() {
1620 ensure_private_dir(root_parent)?;
1621 }
1622 ensure_private_dir(root)?;
1623 let mut current = root.to_path_buf();
1624 for component in relative.components() {
1625 match component {
1626 Component::Normal(part) => {
1627 current.push(part);
1628 ensure_private_dir(¤t)?;
1629 }
1630 Component::CurDir => {}
1631 _ => {
1632 return Err(anyhow!(
1633 "raw mirror private dir contains non-normal component: {}",
1634 path.display()
1635 ));
1636 }
1637 }
1638 }
1639
1640 Ok(())
1641}
1642
1643fn private_create_new_file(path: &Path) -> Result<File> {
1644 let mut options = OpenOptions::new();
1645 options.write(true).create_new(true);
1646 set_private_create_file_mode(&mut options);
1647 let file = options
1648 .open(path)
1649 .with_context(|| format!("create raw mirror file {}", path.display()))?;
1650 Ok(file)
1651}
1652
/// Recursively creates `path`, requesting mode `0o700` for directories it
/// creates.
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    fs::DirBuilder::new()
        .recursive(true)
        .mode(0o700)
        .create(path)
}

/// Recursively creates `path`; no mode is requested on this platform.
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    let outcome = fs::create_dir_all(path);
    outcome
}
1665
/// Requests mode `0o600` for files created through `options`.
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    let _ = OpenOptionsExt::mode(options, 0o600);
}

/// Leaves `options` untouched on this platform.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1675
1676fn sync_file(path: &Path) -> Result<()> {
1677 let mut options = OpenOptions::new();
1678 options.read(true);
1679 #[cfg(windows)]
1680 options.write(true);
1681 options
1682 .open(path)
1683 .and_then(|file| file.sync_all())
1684 .with_context(|| format!("sync raw mirror file {}", path.display()))
1685}
1686
1687#[cfg(not(windows))]
1688fn sync_parent(path: &Path) -> Result<()> {
1689 let Some(parent) = path.parent() else {
1690 return Ok(());
1691 };
1692 File::open(parent)
1693 .and_then(|file| file.sync_all())
1694 .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1695}
1696
1697#[cfg(windows)]
1698fn sync_parent(_path: &Path) -> Result<()> {
1699 Ok(())
1700}
1701
1702fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1703 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1704 let nanos = SystemTime::now()
1705 .duration_since(UNIX_EPOCH)
1706 .unwrap_or_default()
1707 .as_nanos();
1708 dir.join(format!(
1709 ".{label}.{}.{}.{}.tmp",
1710 std::process::id(),
1711 nanos,
1712 nonce
1713 ))
1714}
1715
1716fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1717 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1718 let nanos = SystemTime::now()
1719 .duration_since(UNIX_EPOCH)
1720 .unwrap_or_default()
1721 .as_nanos();
1722 root.join("tmp").join(format!(
1723 "capture.{}.{}.{}",
1724 std::process::id(),
1725 nanos,
1726 nonce
1727 ))
1728}
1729
1730fn remove_temp_best_effort(path: &Path) {
1731 if let Err(err) = fs::remove_file(path) {
1732 tracing::debug!(
1733 path = %path.display(),
1734 error = %err,
1735 "failed to remove raw mirror temp file"
1736 );
1737 }
1738}
1739
1740fn remove_empty_temp_dir_best_effort(path: &Path) {
1741 if let Err(err) = fs::remove_dir(path) {
1742 tracing::debug!(
1743 path = %path.display(),
1744 error = %err,
1745 "failed to remove raw mirror temp directory"
1746 );
1747 }
1748}
1749
/// Produces a redacted form of `source_path` that keeps only its final
/// component: `[<provider>]/<file name>`.
///
/// When the path has no final component, or it is not valid UTF-8, the
/// placeholder `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let file_name = match source_path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "session",
    };
    format!("[{provider}]/{file_name}")
}
1757
1758fn now_ms() -> i64 {
1759 system_time_to_ms(SystemTime::now()).unwrap_or(0)
1760}
1761
/// Converts `time` to milliseconds since the Unix epoch.
///
/// Returns `None` when `time` lies before the epoch or the millisecond count
/// does not fit in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1767
/// Converts `time` to nanoseconds since the Unix epoch, or `None` when `time`
/// lies before the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_nanos()),
        Err(_) => None,
    }
}
1773
1774#[cfg(unix)]
1775fn set_private_dir_permissions(path: &Path) -> Result<()> {
1776 use std::os::unix::fs::PermissionsExt;
1777 fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1778 .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1779}
1780
1781#[cfg(not(unix))]
1782fn set_private_dir_permissions(_path: &Path) -> Result<()> {
1783 Ok(())
1784}
1785
1786#[cfg(unix)]
1787fn set_private_file_permissions(path: &Path) -> Result<()> {
1788 use std::os::unix::fs::PermissionsExt;
1789 fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1790 .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1791}
1792
1793#[cfg(not(unix))]
1794fn set_private_file_permissions(_path: &Path) -> Result<()> {
1795 Ok(())
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800 use super::*;
1801
1802 #[test]
1803 fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1804 let temp = tempfile::TempDir::new().expect("tempdir");
1805 let data_dir = temp.path().join("cass-data");
1806 let source_path = temp.path().join("rollout-fixture.jsonl");
1807 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1808 fs::write(&source_path, source_bytes).expect("write source");
1809 let db_link = RawMirrorDbLink {
1810 conversation_id: Some(42),
1811 message_count: Some(1),
1812 source_path: Some(source_path.display().to_string()),
1813 started_at_ms: Some(1_733_000_000_000),
1814 };
1815
1816 let first = capture_source_file(RawMirrorCaptureInput {
1817 data_dir: &data_dir,
1818 provider: "codex",
1819 source_id: "local",
1820 origin_kind: "local",
1821 origin_host: None,
1822 source_path: &source_path,
1823 db_links: std::slice::from_ref(&db_link),
1824 })
1825 .expect("first capture");
1826 let second = capture_source_file(RawMirrorCaptureInput {
1827 data_dir: &data_dir,
1828 provider: "codex",
1829 source_id: "local",
1830 origin_kind: "local",
1831 origin_host: None,
1832 source_path: &source_path,
1833 db_links: std::slice::from_ref(&db_link),
1834 })
1835 .expect("second capture");
1836
1837 assert_eq!(first.manifest_id, second.manifest_id);
1838 assert_eq!(first.blob_blake3, second.blob_blake3);
1839 assert_eq!(first.captured_at_ms, second.captured_at_ms);
1840 assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1841 assert!(!first.already_present);
1842 assert!(second.already_present);
1843 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1844
1845 let blob_path = data_dir
1846 .join(RAW_MIRROR_ROOT_DIR)
1847 .join(RAW_MIRROR_VERSION_DIR)
1848 .join(&first.blob_relative_path);
1849 let manifest_path = data_dir
1850 .join(RAW_MIRROR_ROOT_DIR)
1851 .join(RAW_MIRROR_VERSION_DIR)
1852 .join(&first.manifest_relative_path);
1853 assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1854
1855 let manifest: Value =
1856 serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1857 .expect("manifest json");
1858 assert_eq!(
1859 manifest["manifest_kind"].as_str(),
1860 Some(RAW_MIRROR_MANIFEST_KIND)
1861 );
1862 assert_eq!(manifest["provider"].as_str(), Some("codex"));
1863 assert_eq!(
1864 manifest["blob_blake3"].as_str(),
1865 Some(first.blob_blake3.as_str())
1866 );
1867 assert_eq!(
1868 manifest["redacted_original_path"].as_str(),
1869 Some("[codex]/rollout-fixture.jsonl")
1870 );
1871 assert_eq!(
1872 manifest["db_links"][0]["conversation_id"].as_i64(),
1873 Some(42)
1874 );
1875 assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1876 assert!(
1877 manifest["manifest_blake3"]
1878 .as_str()
1879 .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1880 );
1881 let tmp_root = data_dir
1882 .join(RAW_MIRROR_ROOT_DIR)
1883 .join(RAW_MIRROR_VERSION_DIR)
1884 .join("tmp");
1885 assert_eq!(
1886 fs::read_dir(&tmp_root)
1887 .expect("raw mirror tmp root")
1888 .collect::<Vec<_>>()
1889 .len(),
1890 0,
1891 "successful captures must not leave doctor-visible interrupted temp artifacts"
1892 );
1893
1894 #[cfg(unix)]
1895 {
1896 use std::os::unix::fs::PermissionsExt;
1897
1898 let root = data_dir
1899 .join(RAW_MIRROR_ROOT_DIR)
1900 .join(RAW_MIRROR_VERSION_DIR);
1901 assert_eq!(
1902 fs::metadata(&root)
1903 .expect("raw mirror root metadata")
1904 .permissions()
1905 .mode()
1906 & 0o777,
1907 0o700
1908 );
1909 assert_eq!(
1910 fs::metadata(&manifest_path)
1911 .expect("manifest metadata")
1912 .permissions()
1913 .mode()
1914 & 0o777,
1915 0o600
1916 );
1917 }
1918 }
1919
1920 #[test]
1921 fn capture_source_file_merges_db_links_into_existing_manifest() {
1922 let temp = tempfile::TempDir::new().expect("tempdir");
1923 let data_dir = temp.path().join("cass-data");
1924 let source_path = temp.path().join("preparse-then-parsed.jsonl");
1925 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1926 fs::write(&source_path, source_bytes).expect("write source");
1927
1928 let preparse = capture_source_file(RawMirrorCaptureInput {
1929 data_dir: &data_dir,
1930 provider: "codex",
1931 source_id: "local",
1932 origin_kind: "local",
1933 origin_host: None,
1934 source_path: &source_path,
1935 db_links: &[],
1936 })
1937 .expect("preparse capture");
1938
1939 let parsed_link = RawMirrorDbLink {
1940 conversation_id: None,
1941 message_count: Some(1),
1942 source_path: Some(source_path.display().to_string()),
1943 started_at_ms: Some(1_733_000_000_000),
1944 };
1945 let parsed = capture_source_file(RawMirrorCaptureInput {
1946 data_dir: &data_dir,
1947 provider: "codex",
1948 source_id: "local",
1949 origin_kind: "local",
1950 origin_host: None,
1951 source_path: &source_path,
1952 db_links: std::slice::from_ref(&parsed_link),
1953 })
1954 .expect("parsed capture");
1955
1956 assert_eq!(preparse.manifest_id, parsed.manifest_id);
1957 assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1958 assert!(parsed.already_present);
1959
1960 let manifest_path = data_dir
1961 .join(RAW_MIRROR_ROOT_DIR)
1962 .join(RAW_MIRROR_VERSION_DIR)
1963 .join(&parsed.manifest_relative_path);
1964 let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1965 assert_eq!(
1966 manifest.db_links,
1967 vec![parsed_link],
1968 "second capture must enrich the pre-parse manifest with DB-link evidence"
1969 );
1970 let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1971 assert_eq!(
1972 manifest.manifest_blake3.as_deref(),
1973 Some(expected_manifest_blake3.as_str()),
1974 "manifest checksum must be recomputed after DB-link merge"
1975 );
1976 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1977 }
1978
1979 #[test]
1980 fn merge_manifest_db_links_rejects_hostile_relative_paths() {
1981 let temp = tempfile::TempDir::new().expect("tempdir");
1982 let data_dir = temp.path().join("cass-data");
1983 let db_link = RawMirrorDbLink {
1984 conversation_id: Some(42),
1985 message_count: Some(1),
1986 source_path: Some("source.jsonl".to_string()),
1987 started_at_ms: Some(1_733_000_000_000),
1988 };
1989
1990 for relative in [
1991 "../escape.json",
1992 "/tmp/escape.json",
1993 "manifests/../escape.json",
1994 "blobs/blake3/ab/not-a-manifest.raw",
1995 "manifests/not-json.txt",
1996 ] {
1997 let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
1998 .expect_err("hostile manifest path should be rejected");
1999 assert!(
2000 err.to_string().contains("raw mirror manifest path"),
2001 "unexpected error for {relative}: {err}"
2002 );
2003 }
2004 }
2005
/// Plants a symlink inside the manifests directory that points at a JSON
/// file outside the data dir, then checks that merging DB links through that
/// symlink is refused with a symlink-manifest error.
#[cfg(unix)]
#[test]
fn merge_manifest_db_links_rejects_symlink_manifest_path() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");

    // The symlink target lives next to the data dir, not inside it.
    let outside = scratch.path().join("outside.json");
    fs::write(&outside, "{}").expect("outside manifest");

    let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
    fs::create_dir_all(&manifest_dir).expect("manifest dir");
    let link_path = manifest_dir.join("link.json");
    std::os::unix::fs::symlink(&outside, link_path).expect("symlink manifest");

    let link = RawMirrorDbLink {
        conversation_id: Some(42),
        message_count: Some(1),
        source_path: Some("source.jsonl".to_string()),
        started_at_ms: Some(1_733_000_000_000),
    };

    let outcome =
        merge_manifest_db_links(&data_dir, "manifests/link.json", std::slice::from_ref(&link));
    let err = outcome.expect_err("symlink manifest should be rejected");
    let rendered = err.to_string();
    assert!(
        rendered.contains("symlink raw mirror manifest"),
        "unexpected symlink-manifest error: {err}"
    );
}
2035
/// Captures two distinct source files holding identical bytes and checks
/// that both captures report the same blob while receiving different
/// manifest ids, then cross-checks the manifest directory listing and the
/// figures reported by `storage_summary`.
#[test]
fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
    let payload_len = source_bytes.len() as u64;

    let first_source = scratch.path().join("first.jsonl");
    let second_source = scratch.path().join("second.jsonl");
    fs::write(&first_source, source_bytes).expect("write first source");
    fs::write(&second_source, source_bytes).expect("write second source");

    // Both captures share every input field except the source path.
    let first_input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &first_source,
        db_links: &[],
    };
    let second_input = RawMirrorCaptureInput {
        source_path: &second_source,
        ..first_input.clone()
    };
    let first = capture_source_file(first_input).expect("first capture");
    let second = capture_source_file(second_input).expect("second capture");

    assert_eq!(first.blob_blake3, second.blob_blake3);
    assert_eq!(first.blob_relative_path, second.blob_relative_path);
    assert_ne!(first.manifest_id, second.manifest_id);
    assert!(
        !second.already_present,
        "a duplicate blob with a new source manifest is not a full capture replay"
    );

    // Exactly two entries must exist in the manifests directory.
    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_listing = fs::read_dir(mirror_root.join("manifests")).expect("manifest dir");
    let manifests = manifest_listing
        .collect::<std::io::Result<Vec<_>>>()
        .expect("manifest entries");
    assert_eq!(manifests.len(), 2);

    let summary = storage_summary(&data_dir);
    assert!(summary.initialized);
    assert_eq!(summary.manifest_count, 2);
    assert_eq!(summary.unique_blob_count, 1);
    assert_eq!(summary.total_blob_bytes, payload_len);
    assert_eq!(summary.largest_blob_bytes, payload_len);
    assert_eq!(summary.missing_blob_count, 0);
    assert_eq!(summary.invalid_manifest_count, 0);
    assert!(summary.total_storage_bytes >= payload_len);
}
2095
/// Rewrites a captured manifest so its blob path is `../outside.raw` (with a
/// recomputed manifest checksum) and checks that `storage_summary` counts the
/// manifest as invalid and reports no blobs or blob bytes for it.
#[test]
fn storage_summary_rejects_hostile_blob_relative_path() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"hostile\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = mirror_root.join(&captured.manifest_relative_path);

    // Tamper with the blob path, then refresh the checksum so the only
    // remaining problem with the manifest is the hostile blob path.
    let mut tampered = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
    tampered.blob_relative_path = String::from("../outside.raw");
    tampered.manifest_blake3 = Some(raw_mirror_manifest_blake3(&tampered));
    let encoded = serde_json::to_vec_pretty(&tampered).expect("serialize manifest");
    fs::write(&manifest_path, encoded).expect("tamper manifest");

    let summary = storage_summary(&data_dir);
    assert_eq!(summary.manifest_count, 1);
    assert_eq!(summary.invalid_manifest_count, 1);
    assert_eq!(summary.unique_blob_count, 0);
    assert_eq!(summary.total_blob_bytes, 0);
}
2136
/// Tampers a captured manifest to reference `../outside.raw` and checks that
/// an applying `prune` returns an "unexpected blob path" error while leaving
/// the manifest and blob on disk and creating no `pruned.jsonl` file.
#[test]
fn prune_fails_closed_on_hostile_manifest_inventory() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"hostile\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = root.join(&captured.blob_relative_path);
    let manifest_path = root.join(&captured.manifest_relative_path);

    // Point the manifest outside the mirror root and keep its checksum valid.
    let mut tampered = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
    tampered.blob_relative_path = String::from("../outside.raw");
    tampered.manifest_blake3 = Some(raw_mirror_manifest_blake3(&tampered));
    let encoded = serde_json::to_vec_pretty(&tampered).expect("serialize manifest");
    fs::write(&manifest_path, encoded).expect("tamper manifest");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let err = prune(&data_dir, options).expect_err("hostile inventory should fail closed");

    let rendered = err.to_string();
    assert!(
        rendered.contains("unexpected blob path"),
        "error should explain the unsafe manifest inventory: {err}"
    );
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
    let audit_path = root.join("pruned.jsonl");
    assert!(!audit_path.exists());
}
2192
/// Runs `prune` with `apply: false` over a single capture and checks that the
/// report describes a dry run with one planned manifest and blob, that both
/// files are still on disk, and that `pruned.jsonl` records the dry run.
#[test]
fn prune_dry_run_audits_without_removing_manifest_or_blob() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"old\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: false,
    };
    let report = prune(&data_dir, options).expect("dry-run prune");

    assert!(report.initialized);
    assert_eq!(report.mode, "dry-run");
    assert_eq!(report.planned_manifest_count, 1);
    assert_eq!(report.planned_blob_count, 1);
    assert_eq!(report.applied_reclaim_bytes, 0);

    // Nothing may have been deleted, but the audit log must exist.
    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());

    let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
    assert!(audit.contains("\"mode\":\"dry-run\""));
    assert!(audit.contains("\"applied\":false"));
}
2238
/// Replaces `pruned.jsonl` with a symlink to a file outside the mirror and
/// checks that a dry-run `prune` fails with a symlink audit error, leaves the
/// symlink target's bytes and the symlink itself untouched, and keeps the
/// captured manifest and blob in place.
#[cfg(unix)]
#[test]
fn prune_refuses_symlinked_audit_log_without_writing_target() -> Result<()> {
    use std::os::unix::fs::symlink;

    let scratch = tempfile::TempDir::new()?;
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("source.jsonl");
    let protected_audit_target = scratch.path().join("protected-prune-audit.jsonl");
    fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")?;
    fs::write(&protected_audit_target, b"protected\n")?;

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input)?;

    // Occupy the audit-log location with a symlink before pruning.
    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let audit_path = root.join("pruned.jsonl");
    symlink(&protected_audit_target, &audit_path)?;

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: false,
    };
    let Err(err) = prune(&data_dir, options) else {
        anyhow::bail!("symlinked prune audit log was accepted");
    };

    anyhow::ensure!(
        err.to_string().contains("prune audit through symlink"),
        "unexpected audit symlink error: {err:#}"
    );

    let target_bytes = fs::read(&protected_audit_target)?;
    anyhow::ensure!(
        target_bytes.as_slice().eq(b"protected\n"),
        "protected audit target was modified"
    );

    let link_target = fs::read_link(&audit_path)?;
    anyhow::ensure!(
        link_target.as_os_str() == protected_audit_target.as_os_str(),
        "audit path did not remain a symlink to the protected target"
    );

    anyhow::ensure!(
        root.join(&captured.manifest_relative_path).exists(),
        "failed audit append removed the captured manifest"
    );
    anyhow::ensure!(
        root.join(&captured.blob_relative_path).exists(),
        "failed audit append removed the captured blob"
    );
    Ok(())
}
2303
/// Runs an applying `prune` over a single capture and checks that the report
/// counts one applied manifest and blob, that both files are gone, and that
/// `pruned.jsonl` records an applied run.
#[test]
fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("source.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"apply\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    // Resolve on-disk locations up front so they can be checked after pruning.
    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = root.join(&captured.blob_relative_path);
    let manifest_path = root.join(&captured.manifest_relative_path);

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("apply prune");

    assert_eq!(report.applied_manifest_count, 1);
    assert_eq!(report.applied_blob_count, 1);
    assert!(!manifest_path.exists());
    assert!(!blob_path.exists());

    let audit_path = root.join("pruned.jsonl");
    let audit = fs::read_to_string(audit_path).expect("read audit");
    assert!(audit.contains("\"mode\":\"apply\""));
    assert!(audit.contains("\"applied\":true"));
}
2347
/// Captures two sources with identical bytes, backdates the first manifest by
/// two days, and prunes with a one-day age threshold. Checks that only the
/// first manifest is removed and that the blob both captures reported stays
/// on disk.
#[test]
fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";

    let first_source = scratch.path().join("first.jsonl");
    let second_source = scratch.path().join("second.jsonl");
    fs::write(&first_source, bytes).expect("write first");
    fs::write(&second_source, bytes).expect("write second");

    // The two captures differ only in their source path.
    let first_input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &first_source,
        db_links: &[],
    };
    let second_input = RawMirrorCaptureInput {
        source_path: &second_source,
        ..first_input.clone()
    };
    let first = capture_source_file(first_input).expect("capture first");
    let second = capture_source_file(second_input).expect("capture second");

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let first_manifest_path = root.join(&first.manifest_relative_path);
    let second_manifest_path = root.join(&second.manifest_relative_path);

    // Backdate the first manifest to two days ago and refresh its checksum.
    let mut backdated = read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
    backdated.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
    backdated.manifest_blake3 = Some(raw_mirror_manifest_blake3(&backdated));
    let encoded = serde_json::to_vec_pretty(&backdated).expect("serialize first manifest");
    fs::write(&first_manifest_path, encoded).expect("rewrite first manifest");

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(86_400_000),
        max_size_bytes: None,
        keep_tags: Vec::new(),
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("apply one-manifest prune");

    assert_eq!(report.applied_manifest_count, 1);
    assert_eq!(report.applied_blob_count, 0);
    assert!(!first_manifest_path.exists());
    assert!(second_manifest_path.exists());
    let shared_blob_path = root.join(&first.blob_relative_path);
    assert!(
        shared_blob_path.exists(),
        "shared blob must stay while a retained manifest still references it"
    );
}
2413
/// Captures a source linked to conversation 7, tags that conversation `keep`
/// in an `agent_search.db` database, and runs an applying `prune` with
/// `keep_tags: ["keep"]`. Checks that the manifest and blob are reported as
/// pinned, nothing is planned for removal, and both files remain on disk.
#[test]
fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
    use frankensqlite::compat::ConnectionExt as _;

    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    std::fs::create_dir_all(&data_dir).expect("create data dir");
    let source_path = scratch.path().join("tagged.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"tagged\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let link = RawMirrorDbLink {
        conversation_id: Some(7),
        message_count: Some(1),
        source_path: Some(source_path.display().to_string()),
        started_at_ms: Some(1_733_000_000_000),
    };
    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: std::slice::from_ref(&link),
    };
    let captured = capture_source_file(input).expect("capture source");

    // Build the two tag tables and attach tag `keep` to conversation 7.
    let db_path = data_dir.join("agent_search.db");
    let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
        .expect("open keep-tag db");
    let setup_statements = [
        (
            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
            "create tags",
        ),
        (
            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
            "create conversation_tags",
        ),
        (
            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
            "insert tag",
        ),
        (
            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
            "insert conversation tag",
        ),
    ];
    for (sql, failure_message) in setup_statements {
        conn.execute_compat(sql, frankensqlite::params![])
            .expect(failure_message);
    }
    drop(conn);

    let options = RawMirrorPruneOptions {
        older_than_ms: Some(0),
        max_size_bytes: Some(0),
        keep_tags: vec!["keep".to_string()],
        safety_hold_down_ms: 0,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("keep-tag prune");

    assert_eq!(report.pinned_manifest_count, 1);
    assert_eq!(report.pinned_blob_count, 1);
    assert_eq!(report.planned_manifest_count, 0);
    assert_eq!(report.planned_blob_count, 0);

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
}
2490
/// Runs an applying size-based `prune` (`max_size_bytes: Some(0)`) with a
/// seven-day safety hold-down right after a capture, and checks that the
/// manifest and blob are reported as pinned, nothing is planned, and both
/// files stay on disk.
#[test]
fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("recent.jsonl");
    let payload = b"{\"type\":\"message\",\"text\":\"recent\"}\n";
    fs::write(&source_path, payload).expect("write source");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let captured = capture_source_file(input).expect("capture source");

    let options = RawMirrorPruneOptions {
        older_than_ms: None,
        max_size_bytes: Some(0),
        keep_tags: Vec::new(),
        safety_hold_down_ms: 7 * 86_400_000,
        apply: true,
    };
    let report = prune(&data_dir, options).expect("hold-down prune");

    assert_eq!(report.pinned_manifest_count, 1);
    assert_eq!(report.pinned_blob_count, 1);
    assert_eq!(report.planned_manifest_count, 0);
    assert_eq!(report.planned_blob_count, 0);

    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let manifest_path = root.join(&captured.manifest_relative_path);
    let blob_path = root.join(&captured.blob_relative_path);
    assert!(manifest_path.exists());
    assert!(blob_path.exists());
}
2534
/// Overwrites the stored blob of a completed capture with unrelated bytes and
/// checks that capturing the same source again fails with an "existing raw
/// mirror blob" error, leaving the source file's bytes unchanged.
#[test]
fn capture_source_file_revalidates_cached_blob_contents() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("cached-source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
    fs::write(&source_path, source_bytes).expect("write source");

    // The same input is used for both capture attempts.
    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let first = capture_source_file(input.clone()).expect("first capture");

    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);
    let blob_path = mirror_root.join(&first.blob_relative_path);
    fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");

    let err = capture_source_file(input)
        .expect_err("corrupted content-addressed blob must be rejected");
    let rendered = err.to_string();
    assert!(
        rendered.contains("existing raw mirror blob"),
        "unexpected cached-blob error: {err:#}"
    );
    assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
}
2576
/// Rewrites a captured source with different bytes of the same length,
/// restores its original modification time, and checks that the second
/// capture reports the hash of the new bytes rather than the first hash.
#[cfg(unix)]
#[test]
fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("same-size-rewrite.jsonl");
    let first_bytes = b"same length payload A\n";
    let second_bytes = b"same length payload B\n";
    fs::write(&source_path, first_bytes).expect("write first source");

    // Remember the original mtime so it can be restored after the rewrite.
    let first_metadata = fs::metadata(&source_path).expect("first metadata");
    let first_modified = first_metadata.modified().expect("first modified time");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let first = capture_source_file(input.clone()).expect("first capture");

    std::thread::sleep(std::time::Duration::from_millis(5));
    fs::write(&source_path, second_bytes).expect("rewrite source");
    let rewritten = OpenOptions::new()
        .write(true)
        .open(&source_path)
        .expect("open rewritten source");
    let restored_times = std::fs::FileTimes::new().set_modified(first_modified);
    rewritten
        .set_times(restored_times)
        .expect("restore original mtime");

    let second = capture_source_file(input).expect("second capture");

    let expected_second_hash = blake3::hash(second_bytes).to_hex().to_string();
    assert_ne!(first.blob_blake3, second.blob_blake3);
    assert_eq!(second.blob_blake3, expected_second_hash);
    assert_eq!(
        fs::read(&source_path).expect("source bytes after rewrite"),
        second_bytes
    );
}
2633
/// Pre-creates the blob location for a source's hash as a symlink to a file
/// outside the data dir, then checks that the capture fails with a
/// symlink-blob error, creates no manifests directory, and leaves both the
/// source and the symlink target bytes unchanged.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_existing_blob_path() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_path = scratch.path().join("cached-source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
    fs::write(&source_path, source_bytes).expect("write source");

    let mirror_root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);

    // Work out where the blob for these bytes would be stored and put a
    // symlink to an outside file there instead.
    let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
    let blob_relative_path =
        raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
    let blob_path = mirror_root.join(&blob_relative_path);
    let blob_parent = blob_path.parent().expect("blob parent");
    fs::create_dir_all(blob_parent).expect("blob parent dir");
    let outside = scratch.path().join("outside.raw");
    fs::write(&outside, source_bytes).expect("outside blob bytes");
    std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err = capture_source_file(input)
        .expect_err("symlinked content-addressed blob path must be rejected");
    let rendered = err.to_string();
    assert!(
        rendered.contains("symlink raw mirror blob"),
        "unexpected symlink-blob error: {err:#}"
    );

    let manifest_root = mirror_root.join("manifests");
    assert!(
        !manifest_root.exists(),
        "failed blob publish must not write a manifest pointing at a symlinked blob"
    );
    assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
    assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
}
2681
/// Makes `<data_dir>/raw-mirror` a symlink to a directory outside the data
/// dir and checks that the capture fails with a symlink-dir error, creates no
/// version directory under the symlink target, and leaves the source bytes
/// unchanged.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let outside_mirror = scratch.path().join("outside-mirror");
    let source_path = scratch.path().join("source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";

    fs::create_dir_all(&data_dir).expect("data dir");
    fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
    fs::write(&source_path, source_bytes).expect("write source");
    let redirected_root = data_dir.join(RAW_MIRROR_ROOT_DIR);
    std::os::unix::fs::symlink(&outside_mirror, redirected_root).expect("symlink raw mirror root");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("symlinked raw-mirror root must be rejected");

    let rendered = err.to_string();
    assert!(
        rendered.contains("symlink raw mirror dir"),
        "unexpected symlink-root error: {err:#}"
    );
    let leaked_version_dir = outside_mirror.join(RAW_MIRROR_VERSION_DIR);
    assert!(
        !leaked_version_dir.exists(),
        "raw mirror capture must not create redirected archive state outside data_dir"
    );
    assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
}
2718
/// Makes the `blobs` directory under the mirror root a symlink to an outside
/// directory and checks that the capture fails with a symlink-dir error,
/// creates nothing under the symlink target, writes no manifests directory,
/// and leaves the source bytes unchanged.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlinked_blob_directory_component() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let outside_blobs = scratch.path().join("outside-blobs");
    let source_path = scratch.path().join("source.jsonl");
    let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
    let root = data_dir
        .join(RAW_MIRROR_ROOT_DIR)
        .join(RAW_MIRROR_VERSION_DIR);

    fs::create_dir_all(&root).expect("raw mirror root");
    fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
    fs::write(&source_path, source_bytes).expect("write source");
    let redirected_blobs = root.join("blobs");
    std::os::unix::fs::symlink(&outside_blobs, redirected_blobs).expect("symlink blobs dir");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("symlinked blob directory must be rejected");

    let rendered = err.to_string();
    assert!(
        rendered.contains("symlink raw mirror dir"),
        "unexpected symlink-blob-dir error: {err:#}"
    );
    let leaked_hash_dir = outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM);
    assert!(
        !leaked_hash_dir.exists(),
        "raw mirror capture must not create redirected blob state outside data_dir"
    );
    let manifest_dir = root.join("manifests");
    assert!(
        !manifest_dir.exists(),
        "failed blob publish must not write a manifest"
    );
    assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
}
2761
/// Passes a directory as the capture source and checks that the capture
/// fails with a "non-file source" error without creating the raw mirror root
/// directory under the data dir.
#[test]
fn capture_source_file_rejects_non_file_sources() {
    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let source_dir = scratch.path().join("source-dir");
    fs::create_dir(&source_dir).expect("source dir");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_dir,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("directory source should be rejected");

    let rendered = err.to_string();
    assert!(
        rendered.contains("non-file source"),
        "unexpected non-file-source error: {err}"
    );
    let mirror_dir = data_dir.join(RAW_MIRROR_ROOT_DIR);
    assert!(
        !mirror_dir.exists(),
        "rejected non-file sources must not initialize raw mirror storage"
    );
}
2788
/// Strips all permission bits from a source file and checks that the capture
/// fails with an "open raw mirror source" error and publishes no manifests
/// directory.
///
/// The test skips itself when the process can still open the file after the
/// permission bits are cleared (for example when running as a privileged
/// user), because the failure it asserts cannot be provoked in that case.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_unreadable_sources_without_manifest() {
    use std::os::unix::fs::PermissionsExt;

    let temp = tempfile::TempDir::new().expect("tempdir");
    let data_dir = temp.path().join("cass-data");
    let source_path = temp.path().join("unreadable.jsonl");
    fs::write(&source_path, b"private session bytes\n").expect("source");
    fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
        .expect("make source unreadable");

    // Probe whether mode 0o000 is actually enforced for this process. If the
    // file still opens, the capture would succeed and `expect_err` below
    // would panic for a reason unrelated to the code under test.
    if fs::File::open(&source_path).is_ok() {
        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
            .expect("restore source perms");
        eprintln!("skipping: permission bits are not enforced for this process");
        return;
    }

    let outcome = capture_source_file(RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &source_path,
        db_links: &[],
    });
    // Restore permissions before any assertion can panic, so a failing run
    // does not leave an unreadable file behind.
    fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
        .expect("restore source perms");

    let err = outcome.expect_err("unreadable source should be rejected");
    assert!(
        err.to_string().contains("open raw mirror source"),
        "unexpected unreadable-source error: {err}"
    );
    assert!(
        !data_dir.join("raw-mirror/v1/manifests").exists(),
        "failed unreadable-source captures must not publish manifests"
    );
}
2822
/// Passes a symlink to a regular file as the capture source and checks that
/// the capture fails with an error mentioning a symlink source.
#[cfg(unix)]
#[test]
fn capture_source_file_rejects_symlink_sources() {
    use std::os::unix::fs::symlink;

    let scratch = tempfile::TempDir::new().expect("tempdir");
    let data_dir = scratch.path().join("cass-data");
    let real_source = scratch.path().join("real.jsonl");
    fs::write(&real_source, b"secret session").expect("write source");
    let symlink_source = scratch.path().join("link.jsonl");
    symlink(&real_source, &symlink_source).expect("symlink");

    let input = RawMirrorCaptureInput {
        data_dir: &data_dir,
        provider: "codex",
        source_id: "local",
        origin_kind: "local",
        origin_host: None,
        source_path: &symlink_source,
        db_links: &[],
    };
    let err = capture_source_file(input).expect_err("symlink source should be rejected");

    let rendered = err.to_string();
    assert!(
        rendered.contains("symlink source"),
        "unexpected error: {err:#}"
    );
}
2850}