1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// Schema version stamped into the `schema_version` field of every manifest written by
/// `capture_source_file`.
const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
// NOTE(review): presumably the directory name (under the data dir) used by `raw_mirror_root`;
// that helper is not visible in this part of the file — confirm.
const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
// NOTE(review): presumably a layout-version path component below the root dir — confirm
// against `raw_mirror_root`.
const RAW_MIRROR_VERSION_DIR: &str = "v1";
/// Value written to (and required in) the `manifest_kind` field of a manifest; manifests
/// carrying any other kind are counted as invalid by `storage_summary` and rejected by prune.
const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
/// Name of the hash algorithm recorded in a manifest's `blob_hash_algorithm` field.
const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
// NOTE(review): presumably the file extension of stored blobs (prune strips a literal
// ".raw" suffix when recovering the hash from a blob path) — confirm against
// `raw_mirror_blob_relative_path`.
const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";

// NOTE(review): process-wide counter; presumably used to make temp file/dir names unique —
// its users are outside this part of the file.
static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
/// Lazily initialised, mutex-guarded map from a source-file fingerprint
/// (`RawMirrorBlobCacheKey`) to the blob record previously captured for it.
static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
    OnceLock::new();
// NOTE(review): presumably serialises read-modify-write updates of manifest files; the code
// taking this lock is not visible here — confirm.
static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
24fn raw_mirror_fsync_enabled() -> bool {
25 dotenvy::var("CASS_RAW_MIRROR_FSYNC")
26 .ok()
27 .is_some_and(|value| matches!(value.trim(), "1" | "true" | "TRUE" | "yes" | "YES"))
28}
29
/// Borrowed arguments for [`capture_source_file`].
#[derive(Debug, Clone)]
pub struct RawMirrorCaptureInput<'a> {
    /// Data directory handed to `ensure_raw_mirror_root` to locate/create the mirror root.
    pub data_dir: &'a Path,
    /// Provider name; recorded in the manifest and fed into the manifest id.
    pub provider: &'a str,
    /// Source identifier; recorded in the manifest and fed into the manifest id.
    pub source_id: &'a str,
    /// Origin kind; recorded in the manifest and fed into the manifest id.
    pub origin_kind: &'a str,
    /// Optional origin host; recorded in the manifest and fed into the manifest id.
    pub origin_host: Option<&'a str>,
    /// File to capture. Capture refuses symlinks and anything that is not a regular file.
    pub source_path: &'a Path,
    /// Database links to record in the manifest (or merge into an already-published one).
    pub db_links: &'a [RawMirrorDbLink],
}
40
/// Result of [`capture_source_file`]: where the manifest and blob live and what was recorded.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorCaptureRecord {
    /// Identifier of the manifest describing this capture.
    pub manifest_id: String,
    /// Manifest path relative to the raw mirror root.
    pub manifest_relative_path: String,
    /// Blob path relative to the raw mirror root.
    pub blob_relative_path: String,
    /// Hex BLAKE3 digest of the captured blob content.
    pub blob_blake3: String,
    /// Blob size; taken from the already-published manifest when one existed, otherwise the
    /// number of bytes copied during this capture.
    pub blob_size_bytes: u64,
    /// Capture timestamp in milliseconds; taken from the already-published manifest when one
    /// existed, otherwise the time of this capture.
    pub captured_at_ms: i64,
    /// Source modification time in milliseconds, when available (same provenance rule as
    /// `captured_at_ms`).
    pub source_mtime_ms: Option<i64>,
    /// `true` only when both the blob and the manifest were already present.
    pub already_present: bool,
}
52
/// Optional link from a raw-mirror manifest to a database record; every field may be absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorDbLink {
    /// Conversation id; prune uses it to look up `--keep-tag` pins in `conversation_tags`.
    pub conversation_id: Option<i64>,
    // NOTE(review): presumably the number of messages in the linked conversation — confirm.
    pub message_count: Option<usize>,
    // NOTE(review): presumably the source path as stored in the database — confirm.
    pub source_path: Option<String>,
    // NOTE(review): presumably the conversation start time in milliseconds — confirm.
    pub started_at_ms: Option<i64>,
}
60
/// Read-only statistics about the raw mirror, produced by [`storage_summary`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorStorageSummary {
    /// `true` once the root path could be stat'ed at all (even if it turns out to be invalid).
    pub initialized: bool,
    /// Display form of the raw mirror root path.
    pub root_path: String,
    /// Sum of the sizes of all regular files found under the root (symlinks are skipped).
    pub total_storage_bytes: u64,
    /// Number of manifests that parsed and carried the expected manifest kind.
    pub manifest_count: u64,
    /// Total size of regular files in the manifests directory; counted before parsing, so it
    /// includes files that later turn out to be invalid manifests.
    pub manifest_bytes: u64,
    /// Number of distinct referenced blobs that exist as regular files.
    pub unique_blob_count: u64,
    /// Combined size of those distinct blobs.
    pub total_blob_bytes: u64,
    /// Size of the largest of those blobs.
    pub largest_blob_bytes: u64,
    /// Number of distinct referenced blobs that are missing or not regular files.
    pub missing_blob_count: u64,
    /// Number of problems found: unreadable directory entries, non-regular or unparsable
    /// manifests, wrong manifest kind, invalid or mismatching blob paths, and an invalid
    /// root or manifests directory.
    pub invalid_manifest_count: u64,
    /// Earliest `captured_at_ms` among counted manifests.
    pub oldest_capture_at_ms: Option<i64>,
    /// Latest `captured_at_ms` among counted manifests.
    pub newest_capture_at_ms: Option<i64>,
    /// Earliest `source_mtime_ms` among counted manifests that have one.
    pub oldest_source_mtime_ms: Option<i64>,
    /// Latest `source_mtime_ms` among counted manifests that have one.
    pub newest_source_mtime_ms: Option<i64>,
}
78
79pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
80 let root = raw_mirror_root(data_dir);
81 let mut summary = RawMirrorStorageSummary {
82 root_path: root.display().to_string(),
83 ..RawMirrorStorageSummary::default()
84 };
85 let root_metadata = match fs::symlink_metadata(&root) {
86 Ok(metadata) => metadata,
87 Err(_) => return summary,
88 };
89 summary.initialized = true;
90 if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
91 summary.invalid_manifest_count = 1;
92 return summary;
93 }
94
95 summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
96
97 let manifests_dir = root.join("manifests");
98 let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
99 return summary;
100 };
101 if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
102 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
103 return summary;
104 }
105 let entries = match fs::read_dir(&manifests_dir) {
106 Ok(entries) => entries,
107 Err(_) => return summary,
108 };
109 let mut seen_blobs = HashSet::new();
110 for entry in entries {
111 let Ok(entry) = entry else {
112 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
113 continue;
114 };
115 let path = entry.path();
116 let manifest_metadata = match fs::symlink_metadata(&path) {
117 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
118 _ => {
119 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
120 continue;
121 }
122 };
123 summary.manifest_bytes = summary
124 .manifest_bytes
125 .saturating_add(manifest_metadata.len());
126 let manifest = match read_raw_mirror_manifest(&path) {
127 Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
128 _ => {
129 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
130 continue;
131 }
132 };
133 summary.manifest_count = summary.manifest_count.saturating_add(1);
134 merge_min_max(
135 &mut summary.oldest_capture_at_ms,
136 &mut summary.newest_capture_at_ms,
137 Some(manifest.captured_at_ms),
138 );
139 merge_min_max(
140 &mut summary.oldest_source_mtime_ms,
141 &mut summary.newest_source_mtime_ms,
142 manifest.source_mtime_ms,
143 );
144
145 let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
146 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
147 continue;
148 };
149 if manifest.blob_relative_path != blob_relative_path {
150 summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
151 continue;
152 }
153
154 if !seen_blobs.insert(blob_relative_path.clone()) {
155 continue;
156 }
157 let blob_path = root.join(blob_relative_path);
158 match fs::symlink_metadata(&blob_path) {
159 Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
160 let size = metadata.len();
161 summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
162 summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
163 summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
164 }
165 _ => {
166 summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
167 }
168 }
169 }
170
171 summary
172}
173
/// Retention policy and mode for [`prune`].
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// When set, unpinned manifests whose `captured_at_ms` is at or before
    /// `now - older_than_ms` are selected. Negative values are treated as zero.
    pub older_than_ms: Option<i64>,
    /// When set and the current total blob size exceeds it, unpinned blobs are retired
    /// oldest-first until the projected total fits the budget.
    pub max_size_bytes: Option<u64>,
    /// Tag names; manifests linked to a conversation carrying any of them are pinned.
    /// Entries are trimmed and empty ones ignored.
    pub keep_tags: Vec<String>,
    /// Manifests captured within this many milliseconds of now are pinned. Only applied
    /// when greater than zero.
    pub safety_hold_down_ms: i64,
    /// `true` deletes the planned files; `false` only reports the plan ("dry-run").
    pub apply: bool,
}
182
/// Outcome of a [`prune`] run: the inventory, what was pinned, what was planned and what was
/// actually removed.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneReport {
    /// `false` when the raw mirror root does not exist (nothing else is filled in then).
    pub initialized: bool,
    /// Display form of the raw mirror root path.
    pub root_path: String,
    /// `"apply"` or `"dry-run"`.
    pub mode: String,
    /// Number of manifests considered.
    pub manifest_count: u64,
    /// Number of distinct blob paths referenced by those manifests.
    pub unique_blob_count: u64,
    /// Combined size of those blobs (on-disk size, falling back to the manifest's recorded
    /// size when the blob cannot be measured).
    pub current_blob_bytes: u64,
    /// Echo of the hold-down option used for this run.
    pub safety_hold_down_ms: i64,
    /// Echo of the keep-tag option used for this run.
    pub keep_tags: Vec<String>,
    /// Number of manifests protected from pruning.
    pub pinned_manifest_count: u64,
    /// Number of blobs referenced by at least one pinned manifest.
    pub pinned_blob_count: u64,
    /// Number of manifest entries in the plan.
    pub planned_manifest_count: u64,
    /// Number of blob entries in the plan.
    pub planned_blob_count: u64,
    /// Sum of `size_bytes` over all planned entries (manifests and blobs).
    pub planned_reclaim_bytes: u64,
    /// Number of manifests actually removed (apply mode only).
    pub applied_manifest_count: u64,
    /// Number of blobs actually removed (apply mode only).
    pub applied_blob_count: u64,
    /// Sum of `size_bytes` over entries actually removed.
    pub applied_reclaim_bytes: u64,
    /// Path of the audit log that received this run's entries, when any were written.
    pub audit_log_path: Option<String>,
    /// Planned entries: manifests first (sorted by id), then blobs (sorted by path).
    pub entries: Vec<RawMirrorPruneEntry>,
}
204
/// One file in a prune plan.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneEntry {
    /// `"manifest"` or `"blob"`.
    pub kind: String,
    /// Path of the file; joined onto the raw mirror root when the plan is applied.
    pub path: String,
    /// Blob hash: the manifest's recorded hash for manifest entries, or the blob file name
    /// with its `.raw` suffix stripped for blob entries (`None` if that suffix is absent).
    pub blob_blake3: Option<String>,
    /// Size attributed to this file in the reclaim totals.
    pub size_bytes: u64,
    /// Human-readable explanation of why the file was selected.
    pub reason: String,
    /// `true` once the file has actually been removed.
    pub applied: bool,
}
214
/// The subset of a manifest that prune planning needs, as gathered by
/// `collect_prune_manifests`.
#[derive(Debug, Clone)]
struct RawMirrorPruneManifest {
    /// Manifest identifier as recorded in the manifest file.
    manifest_id: String,
    /// Manifest file path with the raw mirror root prefix stripped (when it can be).
    relative_path: String,
    /// On-disk size of the manifest file.
    size_bytes: u64,
    /// Blob hash recorded in the manifest.
    blob_blake3: String,
    /// Blob path recorded in the manifest (validated against the hash-derived path).
    blob_relative_path: String,
    /// Blob size recorded in the manifest; used when the blob itself cannot be measured.
    blob_size_bytes: u64,
    /// Capture timestamp recorded in the manifest.
    captured_at_ms: i64,
    /// Provider recorded in the manifest; used as a sort tie-breaker.
    provider: String,
    /// Original source path recorded in the manifest; used as a sort tie-breaker.
    original_path: String,
    /// Database links recorded in the manifest; consulted for keep-tag pinning.
    db_links: Vec<RawMirrorDbLink>,
}
228
229pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
230 let root = raw_mirror_root(data_dir);
231 let mut report = RawMirrorPruneReport {
232 initialized: false,
233 root_path: root.display().to_string(),
234 mode: if options.apply {
235 "apply".to_string()
236 } else {
237 "dry-run".to_string()
238 },
239 manifest_count: 0,
240 unique_blob_count: 0,
241 current_blob_bytes: 0,
242 safety_hold_down_ms: options.safety_hold_down_ms,
243 keep_tags: options.keep_tags.clone(),
244 pinned_manifest_count: 0,
245 pinned_blob_count: 0,
246 planned_manifest_count: 0,
247 planned_blob_count: 0,
248 planned_reclaim_bytes: 0,
249 applied_manifest_count: 0,
250 applied_blob_count: 0,
251 applied_reclaim_bytes: 0,
252 audit_log_path: None,
253 entries: Vec::new(),
254 };
255
256 let metadata = match fs::symlink_metadata(&root) {
257 Ok(metadata) => metadata,
258 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
259 Err(err) => {
260 return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
261 }
262 };
263 if metadata.file_type().is_symlink() || !metadata.is_dir() {
264 anyhow::bail!(
265 "refusing to prune invalid raw mirror root {}",
266 root.display()
267 );
268 }
269 report.initialized = true;
270
271 let manifests = collect_prune_manifests(&root)?;
272 report.manifest_count = manifests.len() as u64;
273
274 let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
275 let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
276 let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
277 for manifest in &manifests {
278 manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
279 blob_to_manifests
280 .entry(manifest.blob_relative_path.clone())
281 .or_default()
282 .push(manifest.manifest_id.clone());
283 blob_size_by_relative
284 .entry(manifest.blob_relative_path.clone())
285 .or_insert_with(|| {
286 blob_file_size(&root.join(&manifest.blob_relative_path))
287 .unwrap_or(manifest.blob_size_bytes)
288 });
289 }
290 report.unique_blob_count = blob_size_by_relative.len() as u64;
291 report.current_blob_bytes = blob_size_by_relative
292 .values()
293 .copied()
294 .fold(0u64, u64::saturating_add);
295
296 let now = now_ms();
297 let pinned_manifests = pinned_prune_manifest_ids(
298 data_dir,
299 &manifests,
300 &options.keep_tags,
301 options.safety_hold_down_ms,
302 now,
303 )?;
304 report.pinned_manifest_count = pinned_manifests.len() as u64;
305 let pinned_blobs: HashSet<String> = blob_to_manifests
306 .iter()
307 .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
308 .map(|(blob_relative_path, _)| blob_relative_path.clone())
309 .collect();
310 report.pinned_blob_count = pinned_blobs.len() as u64;
311
312 let mut selected_manifests: HashSet<String> = HashSet::new();
313 let mut manifest_reasons: HashMap<String, String> = HashMap::new();
314
315 if let Some(older_than_ms) = options.older_than_ms {
316 let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
317 for manifest in &manifests {
318 if manifest.captured_at_ms <= cutoff_ms
319 && !pinned_manifests.contains(&manifest.manifest_id)
320 {
321 selected_manifests.insert(manifest.manifest_id.clone());
322 manifest_reasons
323 .entry(manifest.manifest_id.clone())
324 .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
325 }
326 }
327 }
328
329 if let Some(max_size_bytes) = options.max_size_bytes
330 && report.current_blob_bytes > max_size_bytes
331 {
332 let mut blob_groups: Vec<_> = blob_to_manifests
333 .iter()
334 .map(|(blob_relative_path, manifest_ids)| {
335 let oldest_capture = manifest_ids
336 .iter()
337 .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
338 .min()
339 .unwrap_or(i64::MAX);
340 let size = blob_size_by_relative
341 .get(blob_relative_path)
342 .copied()
343 .unwrap_or(0);
344 (
345 blob_relative_path.clone(),
346 manifest_ids.clone(),
347 oldest_capture,
348 size,
349 )
350 })
351 .collect::<Vec<_>>();
352 blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
353
354 let mut projected_bytes = report.current_blob_bytes;
355 for (blob_relative_path, manifest_ids, _, size) in blob_groups {
356 if projected_bytes <= max_size_bytes {
357 break;
358 }
359 if pinned_blobs.contains(&blob_relative_path) {
360 continue;
361 }
362 for manifest_id in manifest_ids {
363 if !pinned_manifests.contains(&manifest_id) {
364 selected_manifests.insert(manifest_id.clone());
365 manifest_reasons.entry(manifest_id).or_insert_with(|| {
366 format!("max-size over budget; retiring blob {blob_relative_path}")
367 });
368 }
369 }
370 projected_bytes = projected_bytes.saturating_sub(size);
371 }
372 }
373
374 let selected_blobs: HashSet<String> = blob_to_manifests
375 .iter()
376 .filter(|(_, manifest_ids)| {
377 manifest_ids
378 .iter()
379 .all(|id| selected_manifests.contains(id))
380 })
381 .map(|(blob_relative_path, _)| blob_relative_path.clone())
382 .collect();
383
384 let mut entries = Vec::new();
385 let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
386 selected_manifest_ids.sort();
387 for manifest_id in selected_manifest_ids {
388 let Some(manifest) = manifest_by_id.get(&manifest_id) else {
389 continue;
390 };
391 let reason = manifest_reasons
392 .remove(&manifest_id)
393 .unwrap_or_else(|| "selected by retention policy".to_string());
394 entries.push(RawMirrorPruneEntry {
395 kind: "manifest".to_string(),
396 path: manifest.relative_path.clone(),
397 blob_blake3: Some(manifest.blob_blake3.clone()),
398 size_bytes: manifest.size_bytes,
399 reason,
400 applied: false,
401 });
402 }
403
404 let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
405 selected_blob_paths.sort();
406 for blob_relative_path in selected_blob_paths {
407 let size = blob_size_by_relative
408 .get(&blob_relative_path)
409 .copied()
410 .unwrap_or(0);
411 let blob_blake3 = blob_relative_path
412 .rsplit('/')
413 .next()
414 .and_then(|name| name.strip_suffix(".raw"))
415 .map(ToOwned::to_owned);
416 entries.push(RawMirrorPruneEntry {
417 kind: "blob".to_string(),
418 path: blob_relative_path,
419 blob_blake3,
420 size_bytes: size,
421 reason: "no retained manifest references this blob after prune plan".to_string(),
422 applied: false,
423 });
424 }
425
426 report.planned_manifest_count = entries
427 .iter()
428 .filter(|entry| entry.kind == "manifest")
429 .count() as u64;
430 report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
431 report.planned_reclaim_bytes = entries
432 .iter()
433 .map(|entry| entry.size_bytes)
434 .fold(0, u64::saturating_add);
435
436 if options.apply {
437 for entry in &mut entries {
438 let path = root.join(&entry.path);
439 let removed = remove_prune_target_file(&path)
440 .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
441 entry.applied = removed;
442 if removed {
443 if entry.kind == "manifest" {
444 report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
445 } else if entry.kind == "blob" {
446 report.applied_blob_count = report.applied_blob_count.saturating_add(1);
447 }
448 report.applied_reclaim_bytes = report
449 .applied_reclaim_bytes
450 .saturating_add(entry.size_bytes);
451 }
452 }
453 }
454
455 report.entries = entries;
456 if !report.entries.is_empty() {
457 let audit_path = append_prune_audit_log(&root, &report)?;
458 report.audit_log_path = Some(audit_path.display().to_string());
459 }
460 Ok(report)
461}
462
463fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
464 let manifests_dir = root.join("manifests");
465 let metadata = match fs::symlink_metadata(&manifests_dir) {
466 Ok(metadata) => metadata,
467 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
468 Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
469 };
470 if metadata.file_type().is_symlink() || !metadata.is_dir() {
471 anyhow::bail!(
472 "refusing to prune invalid raw mirror manifests directory {}",
473 manifests_dir.display()
474 );
475 }
476
477 let mut manifests = Vec::new();
478 for entry in
479 fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
480 {
481 let entry = entry?;
482 let path = entry.path();
483 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
484 continue;
485 }
486 let manifest_metadata = fs::symlink_metadata(&path)
487 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
488 if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
489 anyhow::bail!(
490 "refusing to prune with non-regular raw mirror manifest {}",
491 path.display()
492 );
493 }
494 let manifest = read_raw_mirror_manifest(&path)?;
495 if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
496 anyhow::bail!(
497 "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
498 manifest.manifest_kind,
499 path.display()
500 );
501 }
502 let Some(expected_blob_relative_path) =
503 raw_mirror_blob_relative_path(&manifest.blob_blake3)
504 else {
505 anyhow::bail!(
506 "refusing to prune raw mirror manifest {} with invalid blob hash",
507 path.display()
508 );
509 };
510 if manifest.blob_relative_path != expected_blob_relative_path {
511 anyhow::bail!(
512 "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
513 path.display(),
514 manifest.blob_relative_path
515 );
516 }
517 let relative_path = path
518 .strip_prefix(root)
519 .unwrap_or(&path)
520 .display()
521 .to_string();
522 manifests.push(RawMirrorPruneManifest {
523 manifest_id: manifest.manifest_id,
524 relative_path,
525 size_bytes: manifest_metadata.len(),
526 blob_blake3: manifest.blob_blake3,
527 blob_relative_path: manifest.blob_relative_path,
528 blob_size_bytes: manifest.blob_size_bytes,
529 captured_at_ms: manifest.captured_at_ms,
530 provider: manifest.provider,
531 original_path: manifest.original_path,
532 db_links: manifest.db_links,
533 });
534 }
535 manifests.sort_by(|left, right| {
536 left.captured_at_ms
537 .cmp(&right.captured_at_ms)
538 .then_with(|| left.provider.cmp(&right.provider))
539 .then_with(|| left.original_path.cmp(&right.original_path))
540 .then_with(|| left.manifest_id.cmp(&right.manifest_id))
541 });
542 Ok(manifests)
543}
544
545fn pinned_prune_manifest_ids(
546 data_dir: &Path,
547 manifests: &[RawMirrorPruneManifest],
548 keep_tags: &[String],
549 safety_hold_down_ms: i64,
550 now_ms: i64,
551) -> Result<HashSet<String>> {
552 let mut pinned = HashSet::new();
553 if safety_hold_down_ms > 0 {
554 let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
555 for manifest in manifests {
556 if manifest.captured_at_ms > cutoff_ms {
557 pinned.insert(manifest.manifest_id.clone());
558 }
559 }
560 }
561
562 let normalized_keep_tags = keep_tags
563 .iter()
564 .map(|tag| tag.trim())
565 .filter(|tag| !tag.is_empty())
566 .map(ToOwned::to_owned)
567 .collect::<Vec<_>>();
568 if normalized_keep_tags.is_empty() {
569 return Ok(pinned);
570 }
571
572 let keep_tag_conversation_ids =
573 load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
574 for manifest in manifests {
575 if manifest.db_links.iter().any(|link| {
576 link.conversation_id
577 .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
578 }) {
579 pinned.insert(manifest.manifest_id.clone());
580 }
581 }
582 Ok(pinned)
583}
584
585fn load_keep_tag_conversation_ids(
586 data_dir: &Path,
587 manifests: &[RawMirrorPruneManifest],
588 keep_tags: &[String],
589) -> Result<HashSet<i64>> {
590 use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
591
592 let mut conversation_ids = manifests
593 .iter()
594 .flat_map(|manifest| manifest.db_links.iter())
595 .filter_map(|link| link.conversation_id)
596 .collect::<Vec<_>>();
597 conversation_ids.sort_unstable();
598 conversation_ids.dedup();
599 if conversation_ids.is_empty() {
600 return Ok(HashSet::new());
601 }
602
603 let db_path = data_dir.join("agent_search.db");
604 let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
605 &db_path,
606 Duration::from_secs(30),
607 )
608 .with_context(|| {
609 format!(
610 "open {} to honor raw-mirror prune --keep-tag",
611 db_path.display()
612 )
613 })?;
614 let _ = conn.execute("PRAGMA query_only = 1;");
615
616 let mut pinned = HashSet::new();
617 for id_chunk in conversation_ids.chunks(400) {
618 let tag_placeholders = (0..keep_tags.len())
619 .map(|idx| format!("?{}", idx + 1))
620 .collect::<Vec<_>>()
621 .join(", ");
622 let id_offset = keep_tags.len();
623 let id_placeholders = (0..id_chunk.len())
624 .map(|idx| format!("?{}", id_offset + idx + 1))
625 .collect::<Vec<_>>()
626 .join(", ");
627 let sql = format!(
628 "SELECT DISTINCT ct.conversation_id \
629 FROM conversation_tags ct \
630 JOIN tags t ON t.id = ct.tag_id \
631 WHERE t.name IN ({tag_placeholders}) \
632 AND ct.conversation_id IN ({id_placeholders})"
633 );
634 let mut params = keep_tags
635 .iter()
636 .map(|tag| ParamValue::from(tag.as_str()))
637 .collect::<Vec<_>>();
638 params.extend(id_chunk.iter().copied().map(ParamValue::from));
639 let rows: Vec<i64> = conn
640 .query_map_collect(&sql, ¶ms, |row: &frankensqlite::Row| row.get_typed(0))
641 .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
642 pinned.extend(rows);
643 }
644
645 Ok(pinned)
646}
647
/// Returns the length of `path` when it is a regular file, without following symlinks;
/// `None` when it cannot be stat'ed or is anything else (directory, symlink, ...).
fn blob_file_size(path: &Path) -> Option<u64> {
    match fs::symlink_metadata(path) {
        Ok(meta) if meta.is_file() && !meta.file_type().is_symlink() => Some(meta.len()),
        _ => None,
    }
}
654
655fn remove_prune_target_file(path: &Path) -> Result<bool> {
656 let metadata = match fs::symlink_metadata(path) {
657 Ok(metadata) => metadata,
658 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
659 Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
660 };
661 if metadata.file_type().is_symlink() || !metadata.is_file() {
662 anyhow::bail!(
663 "refusing to prune non-regular raw mirror file {}",
664 path.display()
665 );
666 }
667 fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
668 sync_parent(path)?;
669 Ok(true)
670}
671
672fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
673 ensure_private_dir(root)?;
674 let audit_path = root.join("pruned.jsonl");
675 ensure_prune_audit_log_appendable(&audit_path)?;
676 let mut file = OpenOptions::new()
677 .create(true)
678 .append(true)
679 .open(&audit_path)
680 .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
681 set_private_file_permissions(&audit_path)?;
682 let now = now_ms();
683 for entry in &report.entries {
684 let record = json!({
685 "schema_version": 1,
686 "recorded_at_ms": now,
687 "mode": report.mode,
688 "kind": entry.kind,
689 "path": entry.path,
690 "blob_blake3": entry.blob_blake3,
691 "size_bytes": entry.size_bytes,
692 "reason": entry.reason,
693 "applied": entry.applied,
694 });
695 writeln!(file, "{record}")
696 .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
697 }
698 sync_open_file_if_required(&file, || {
699 format!("sync raw mirror prune audit {}", audit_path.display())
700 })?;
701 sync_parent(&audit_path)?;
702 Ok(audit_path)
703}
704
705fn ensure_prune_audit_log_appendable(path: &Path) -> Result<()> {
706 match fs::symlink_metadata(path) {
707 Ok(metadata) if metadata.file_type().is_symlink() => {
708 anyhow::bail!(
709 "refusing to append raw mirror prune audit through symlink {}",
710 path.display()
711 );
712 }
713 Ok(metadata) if !metadata.is_file() => {
714 anyhow::bail!(
715 "refusing to append raw mirror prune audit to non-file {}",
716 path.display()
717 );
718 }
719 Ok(_) => Ok(()),
720 Err(err) if matches!(err.kind(), std::io::ErrorKind::NotFound) => Ok(()),
721 Err(err) => Err(err).with_context(|| {
722 format!(
723 "inspect raw mirror prune audit before append {}",
724 path.display()
725 )
726 }),
727 }
728}
729
/// Folds `value` into a running minimum and maximum. A `None` value leaves both untouched;
/// the first `Some` value initialises both.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(candidate) = value {
        *min = Some(match *min {
            Some(lowest) if lowest <= candidate => lowest,
            _ => candidate,
        });
        *max = Some(match *max {
            Some(highest) if highest >= candidate => highest,
            _ => candidate,
        });
    }
}
737
/// Sums the sizes of all regular files reachable from `root` with a saturating total.
///
/// The walk is iterative (explicit work list), never follows symlinks, and silently skips
/// anything that cannot be stat'ed or listed.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending = vec![root.to_path_buf()];
    let mut total_bytes: u64 = 0;
    while let Some(current) = pending.pop() {
        match fs::symlink_metadata(&current) {
            // Symlinks are neither counted nor descended into.
            Ok(meta) if meta.file_type().is_symlink() => {}
            Ok(meta) if meta.is_file() => {
                total_bytes = total_bytes.saturating_add(meta.len());
            }
            Ok(meta) if meta.is_dir() => {
                if let Ok(children) = fs::read_dir(&current) {
                    pending.extend(children.flatten().map(|child| child.path()));
                }
            }
            _ => {}
        }
    }
    total_bytes
}
761
/// Key of `BLOB_CAPTURE_CACHE`; built by `raw_mirror_blob_cache_key` from the capture input
/// and the source file's metadata.
// NOTE(review): the per-field notes below are inferred from field names and types; the
// constructor is not visible in this part of the file — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    // Presumably the capture's data directory.
    data_dir: PathBuf,
    // Presumably the captured source path.
    source_path: PathBuf,
    // Presumably a platform file identity (e.g. device/inode) when available — confirm.
    source_identity: Option<String>,
    // Presumably the source length at stat time.
    source_size_bytes: u64,
    // Presumably the source modification time in nanoseconds, when available.
    source_mtime_ns: Option<u128>,
    // Presumably the source status-change time in nanoseconds, when available.
    source_change_time_ns: Option<u128>,
}
771
/// Value stored in `BLOB_CAPTURE_CACHE`: the outcome of a previous blob capture.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// Hex BLAKE3 digest of the captured content.
    blob_blake3: String,
    /// Number of bytes copied from the source during that capture.
    bytes_copied: u64,
}
777
/// Compression description embedded in a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorCompressionEnvelope {
    /// Compression state; `capture_source_file` always writes `"none"`.
    state: String,
    /// Compression algorithm name; `None` when written by `capture_source_file`.
    algorithm: Option<String>,
    /// Uncompressed size; `capture_source_file` records the number of bytes copied.
    uncompressed_size_bytes: Option<u64>,
}
784
/// Encryption description embedded in a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorEncryptionEnvelope {
    /// Encryption state; `capture_source_file` always writes `"none"`.
    state: String,
    /// Encryption algorithm name; `None` when written by `capture_source_file`.
    algorithm: Option<String>,
    /// Key identifier; `None` when written by `capture_source_file`.
    key_id: Option<String>,
    /// Envelope format version; `None` when written by `capture_source_file`.
    envelope_version: Option<u32>,
}
792
/// Verification stamp embedded in a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorVerificationRecord {
    /// Verification status; `capture_source_file` writes `"captured"`.
    status: String,
    /// Name of the component that produced the stamp; `capture_source_file` writes
    /// `"cass_indexer"`.
    verifier: String,
    /// Content hash seen by the verifier; set to the blob hash at capture time.
    content_blake3: Option<String>,
    /// Verification timestamp in milliseconds; set to the capture time at capture.
    verified_at_ms: Option<i64>,
}
800
/// On-disk (JSON) representation of a raw-mirror manifest, as written by
/// `capture_source_file` and read back by `read_raw_mirror_manifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorManifestFile {
    /// Written as `RAW_MIRROR_SCHEMA_VERSION`.
    schema_version: u32,
    /// Written as `RAW_MIRROR_MANIFEST_KIND`; readers treat other kinds as invalid.
    manifest_kind: String,
    /// Identifier derived from provider, source id, origin, path hash and blob hash.
    manifest_id: String,
    /// Written as `RAW_MIRROR_HASH_ALGORITHM`.
    blob_hash_algorithm: String,
    /// Blob path relative to the mirror root; must equal the path derived from
    /// `blob_blake3`.
    blob_relative_path: String,
    /// Hex BLAKE3 digest of the blob content.
    blob_blake3: String,
    /// Number of bytes copied into the blob.
    blob_size_bytes: u64,
    /// Provider name supplied by the capture input.
    provider: String,
    /// Source identifier supplied by the capture input.
    source_id: String,
    /// Origin kind supplied by the capture input.
    origin_kind: String,
    /// Optional origin host supplied by the capture input.
    origin_host: Option<String>,
    /// Display form of the captured source path.
    original_path: String,
    /// Output of `redacted_original_path` for the provider and source path.
    redacted_original_path: String,
    /// Output of `raw_mirror_original_path_blake3` for `original_path`.
    original_path_blake3: String,
    /// Capture time in milliseconds.
    captured_at_ms: i64,
    /// Source modification time in milliseconds, when available.
    source_mtime_ms: Option<i64>,
    /// Source length reported by the metadata taken before the copy.
    source_size_bytes: u64,
    /// Compression description.
    compression: RawMirrorCompressionEnvelope,
    /// Encryption description.
    encryption: RawMirrorEncryptionEnvelope,
    /// Database links (de-duplicated at capture time).
    db_links: Vec<RawMirrorDbLink>,
    /// Verification stamp.
    verification: RawMirrorVerificationRecord,
    /// Self-hash from `raw_mirror_manifest_blake3`, computed while this field is still
    /// `None` and then filled in.
    manifest_blake3: Option<String>,
}
826
/// Captures `input.source_path` into the raw mirror and publishes a manifest describing it.
///
/// Steps: validate the source (regular file, not a symlink), ensure the mirror root and its
/// `tmp` directory, obtain the blob (from the in-process cache, or by copying the source to
/// a private temp file, hashing it and publishing it at its content-addressed path), then
/// build and publish the manifest. When the manifest already exists, the supplied database
/// links are merged into it and the returned record reflects the published manifest.
///
/// # Errors
///
/// Fails when the source cannot be stat'ed, is a symlink or not a regular file, or when any
/// of the directory, copy, publish, serialise or merge steps fails.
pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
    // `symlink_metadata` does not follow symlinks, so a symlinked source is detected here.
    let source_metadata = fs::symlink_metadata(input.source_path)
        .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
    if source_metadata.file_type().is_symlink() {
        return Err(anyhow!(
            "refusing to raw-mirror symlink source {}",
            input.source_path.display()
        ));
    }
    if !source_metadata.is_file() {
        return Err(anyhow!(
            "refusing to raw-mirror non-file source {}",
            input.source_path.display()
        ));
    }

    let root = ensure_raw_mirror_root(input.data_dir)?;
    ensure_private_dir_descendant(&root, &root.join("tmp"))?;

    let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
    let (blob_blake3, bytes_copied, blob_already_present) =
        match cached_raw_mirror_blob_record(&cache_key, &root) {
            // Cache hit: no copy is made and the blob is reported as already present.
            Some(record) => (record.blob_blake3, record.bytes_copied, true),
            None => {
                // Cache miss: copy + hash into a fresh temp dir, then publish the temp file
                // at the path derived from its hash.
                let temp_dir = unique_capture_temp_dir(&root);
                ensure_private_dir_descendant(&root, &temp_dir)?;
                let CopyToTempResult {
                    temp_path,
                    blob_blake3,
                    bytes_copied,
                } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
                let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
                    .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
                let blob_path = root.join(&blob_relative_path);
                let already_present =
                    publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
                remove_empty_temp_dir_best_effort(&temp_dir);
                // Remember the result so a later capture with the same key skips the copy.
                cache_raw_mirror_blob_record(
                    cache_key.clone(),
                    RawMirrorBlobRecord {
                        blob_blake3: blob_blake3.clone(),
                        bytes_copied,
                    },
                );
                (blob_blake3, bytes_copied, already_present)
            }
        };
    let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
        .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;

    let original_path = input.source_path.display().to_string();
    let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
    let manifest_id = raw_mirror_manifest_id(
        input.provider,
        input.source_id,
        input.origin_kind,
        input.origin_host,
        &original_path_blake3,
        &blob_blake3,
    );
    let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
    let manifest_path = root.join(&manifest_relative_path);
    let captured_at_ms = now_ms();
    let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
    let mut manifest = RawMirrorManifestFile {
        schema_version: RAW_MIRROR_SCHEMA_VERSION,
        manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
        manifest_id: manifest_id.clone(),
        blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
        blob_relative_path: blob_relative_path.clone(),
        blob_blake3: blob_blake3.clone(),
        blob_size_bytes: bytes_copied,
        provider: input.provider.to_string(),
        source_id: input.source_id.to_string(),
        origin_kind: input.origin_kind.to_string(),
        origin_host: input.origin_host.map(ToOwned::to_owned),
        original_path,
        redacted_original_path: redacted_original_path(input.provider, input.source_path),
        original_path_blake3,
        captured_at_ms,
        source_mtime_ms,
        source_size_bytes: source_metadata.len(),
        compression: RawMirrorCompressionEnvelope {
            state: "none".to_string(),
            algorithm: None,
            uncompressed_size_bytes: Some(bytes_copied),
        },
        encryption: RawMirrorEncryptionEnvelope {
            state: "none".to_string(),
            algorithm: None,
            key_id: None,
            envelope_version: None,
        },
        db_links: unique_db_links(input.db_links),
        verification: RawMirrorVerificationRecord {
            status: "captured".to_string(),
            verifier: "cass_indexer".to_string(),
            content_blake3: Some(blob_blake3.clone()),
            verified_at_ms: Some(captured_at_ms),
        },
        manifest_blake3: None,
    };
    // The self-hash is computed while `manifest_blake3` is still `None`, then stored.
    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
    let manifest_already_present =
        publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
    let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
        if manifest_already_present {
            // Keep the published manifest; only fold the new database links into it and
            // report the values it already carries.
            merge_raw_mirror_manifest_db_links(
                &root,
                &manifest_path,
                input.db_links,
                Some(&blob_blake3),
            )?;
            let published = read_raw_mirror_manifest(&manifest_path)?;
            (
                published.blob_size_bytes,
                published.captured_at_ms,
                published.source_mtime_ms,
            )
        } else {
            (bytes_copied, captured_at_ms, source_mtime_ms)
        };

    Ok(RawMirrorCaptureRecord {
        manifest_id,
        manifest_relative_path,
        blob_relative_path,
        blob_blake3,
        blob_size_bytes: record_blob_size_bytes,
        captured_at_ms: record_captured_at_ms,
        source_mtime_ms: record_source_mtime_ms,
        already_present: blob_already_present && manifest_already_present,
    })
}
962
963pub fn merge_manifest_db_links(
964 data_dir: &Path,
965 manifest_relative_path: &str,
966 links: &[RawMirrorDbLink],
967) -> Result<()> {
968 if links.is_empty() {
969 return Ok(());
970 }
971 let root = raw_mirror_root(data_dir);
972 let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
973 merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
974}
975
/// Outcome of `copy_source_to_private_temp`.
struct CopyToTempResult {
    /// Path of the temp file holding the copied content.
    temp_path: PathBuf,
    /// Hex BLAKE3 digest of the copied bytes.
    blob_blake3: String,
    /// Number of bytes copied.
    bytes_copied: u64,
}
981
982fn copy_source_to_private_temp(
983 source_path: &Path,
984 temp_dir: &Path,
985 source_metadata: &fs::Metadata,
986) -> Result<CopyToTempResult> {
987 let temp_path = unique_temp_path(temp_dir, "blob");
988 let mut source = open_stable_source_file(source_path, source_metadata)?;
989 let mut temp = private_create_new_file(&temp_path)?;
990 let mut hasher = blake3::Hasher::new();
991 let mut buffer = [0u8; 64 * 1024];
992 let mut bytes_copied = 0u64;
993 loop {
994 let read = source
995 .read(&mut buffer)
996 .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
997 if read == 0 {
998 break;
999 }
1000 temp.write_all(&buffer[..read])
1001 .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
1002 hasher.update(&buffer[..read]);
1003 bytes_copied = bytes_copied.saturating_add(read as u64);
1004 }
1005 sync_open_file_if_required(&temp, || {
1006 format!("sync raw mirror temp {}", temp_path.display())
1007 })?;
1008
1009 let final_source_metadata = source
1010 .metadata()
1011 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1012 if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
1013 remove_temp_best_effort(&temp_path);
1014 return Err(anyhow!(
1015 "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
1016 source_path.display()
1017 ));
1018 }
1019
1020 Ok(CopyToTempResult {
1021 temp_path,
1022 blob_blake3: hasher.finalize().to_hex().to_string(),
1023 bytes_copied,
1024 })
1025}
1026
1027fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
1028 let source = File::open(source_path)
1029 .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
1030 let opened_metadata = source
1031 .metadata()
1032 .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1033 if !same_source_identity(expected_metadata, &opened_metadata) {
1034 return Err(anyhow!(
1035 "raw mirror source {} changed identity before capture",
1036 source_path.display()
1037 ));
1038 }
1039 let current_path_metadata = fs::symlink_metadata(source_path)
1040 .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1041 if current_path_metadata.file_type().is_symlink() {
1042 return Err(anyhow!(
1043 "refusing to raw-mirror symlink source {}",
1044 source_path.display()
1045 ));
1046 }
1047 if !same_source_identity(expected_metadata, ¤t_path_metadata) {
1048 return Err(anyhow!(
1049 "raw mirror source {} changed identity before capture",
1050 source_path.display()
1051 ));
1052 }
1053 Ok(source)
1054}
1055
/// Returns `true` when `actual` is a regular file with the same device and
/// inode numbers as `expected`.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;

    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}

/// Returns `true` when `actual` is a regular file; `_expected` is not
/// consulted on this platform.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    actual.file_type().is_file()
}
1066
/// Formats the device and inode numbers of `metadata` as `"<dev>:<ino>"`.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;

    let device = metadata.dev();
    let inode = metadata.ino();
    Some(format!("{device}:{inode}"))
}

/// No identity token is produced on this platform.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    Option::None
}
1077
/// Returns the inode change time (`ctime`) of `metadata` as nanoseconds,
/// computed as `ctime * 1_000_000_000 + ctime_nsec` with saturating
/// arithmetic.
///
/// Yields `None` when either component is negative and therefore does not fit
/// in a `u128`.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    const NANOS_PER_SECOND: u128 = 1_000_000_000;

    let whole_seconds = u128::try_from(metadata.ctime()).ok();
    let subsecond_nanos = u128::try_from(metadata.ctime_nsec()).ok();
    match (whole_seconds, subsecond_nanos) {
        (Some(seconds), Some(nanos)) => Some(
            seconds
                .saturating_mul(NANOS_PER_SECOND)
                .saturating_add(nanos),
        ),
        _ => None,
    }
}

/// No change time is reported on this platform.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    Option::None
}
1095
/// Reports whether the source looks different after the copy than before it.
///
/// A change is detected when the lengths differ, or when both modification
/// times are available and differ. If either modification time cannot be read,
/// only the length comparison counts.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    initial.len() != final_metadata.len()
        || matches!(
            (initial.modified(), final_metadata.modified()),
            (Ok(before), Ok(after)) if before != after
        )
}
1108
1109fn publish_content_addressed_temp(
1110 root: &Path,
1111 temp_path: &Path,
1112 final_path: &Path,
1113 expected_blake3: &str,
1114) -> Result<bool> {
1115 ensure_private_dir_descendant(
1116 root,
1117 final_path
1118 .parent()
1119 .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1120 )?;
1121 if final_path.exists() {
1122 verify_existing_file(final_path, expected_blake3)?;
1123 remove_temp_best_effort(temp_path);
1124 return Ok(true);
1125 }
1126
1127 match fs::hard_link(temp_path, final_path) {
1128 Ok(()) => {
1129 sync_file(final_path)?;
1130 sync_parent(final_path)?;
1131 remove_temp_best_effort(temp_path);
1132 Ok(false)
1133 }
1134 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1135 verify_existing_file(final_path, expected_blake3)?;
1136 remove_temp_best_effort(temp_path);
1137 Ok(true)
1138 }
1139 Err(err) => Err(anyhow!(
1140 "publish raw mirror blob {} from {}: {err}",
1141 final_path.display(),
1142 temp_path.display()
1143 )),
1144 }
1145}
1146
1147fn publish_manifest_bytes_create_new(
1148 root: &Path,
1149 manifest_path: &Path,
1150 manifest_bytes: &[u8],
1151 blob_blake3: &str,
1152) -> Result<bool> {
1153 ensure_private_dir_descendant(
1154 root,
1155 manifest_path
1156 .parent()
1157 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1158 )?;
1159 if manifest_path.exists() {
1160 verify_existing_manifest(manifest_path, blob_blake3)?;
1161 return Ok(true);
1162 }
1163
1164 let temp_dir = unique_capture_temp_dir(root);
1165 ensure_private_dir_descendant(root, &temp_dir)?;
1166 let temp_path = unique_temp_path(&temp_dir, "manifest");
1167 let mut temp = private_create_new_file(&temp_path)?;
1168 temp.write_all(manifest_bytes)
1169 .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1170 sync_open_file_if_required(&temp, || {
1171 format!("sync raw mirror manifest temp {}", temp_path.display())
1172 })?;
1173
1174 match fs::hard_link(&temp_path, manifest_path) {
1175 Ok(()) => {
1176 sync_file(manifest_path)?;
1177 sync_parent(manifest_path)?;
1178 remove_temp_best_effort(&temp_path);
1179 remove_empty_temp_dir_best_effort(&temp_dir);
1180 Ok(false)
1181 }
1182 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1183 verify_existing_manifest(manifest_path, blob_blake3)?;
1184 remove_temp_best_effort(&temp_path);
1185 remove_empty_temp_dir_best_effort(&temp_dir);
1186 Ok(true)
1187 }
1188 Err(err) => Err(anyhow!(
1189 "publish raw mirror manifest {} from {}: {err}",
1190 manifest_path.display(),
1191 temp_path.display()
1192 )),
1193 }
1194}
1195
1196fn merge_raw_mirror_manifest_db_links(
1197 root: &Path,
1198 manifest_path: &Path,
1199 links: &[RawMirrorDbLink],
1200 expected_blob_blake3: Option<&str>,
1201) -> Result<()> {
1202 if links.is_empty() {
1203 return Ok(());
1204 }
1205
1206 let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1207 let _guard = lock
1208 .lock()
1209 .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1210
1211 let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1212 if let Some(expected_blob_blake3) = expected_blob_blake3
1213 && manifest.blob_blake3 != expected_blob_blake3
1214 {
1215 return Err(anyhow!(
1216 "existing raw mirror manifest {} points at blob {}, expected {}",
1217 manifest_path.display(),
1218 manifest.blob_blake3,
1219 expected_blob_blake3
1220 ));
1221 }
1222
1223 let mut merged_links = manifest.db_links.clone();
1224 merged_links.extend_from_slice(links);
1225 let merged_links = unique_db_links(&merged_links);
1226 if merged_links == manifest.db_links {
1227 return Ok(());
1228 }
1229
1230 manifest.db_links = merged_links;
1231 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1232 let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1233 replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1234}
1235
1236fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1237 ensure_private_dir_descendant(
1238 root,
1239 manifest_path
1240 .parent()
1241 .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1242 )?;
1243 let temp_dir = unique_capture_temp_dir(root);
1244 ensure_private_dir_descendant(root, &temp_dir)?;
1245 let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1246 let mut temp = private_create_new_file(&temp_path)?;
1247 temp.write_all(manifest_bytes).with_context(|| {
1248 format!(
1249 "write raw mirror manifest update temp {}",
1250 temp_path.display()
1251 )
1252 })?;
1253 sync_open_file_if_required(&temp, || {
1254 format!(
1255 "sync raw mirror manifest update temp {}",
1256 temp_path.display()
1257 )
1258 })?;
1259 drop(temp);
1260
1261 fs::rename(&temp_path, manifest_path).with_context(|| {
1262 format!(
1263 "replace raw mirror manifest {} from {}",
1264 manifest_path.display(),
1265 temp_path.display()
1266 )
1267 })?;
1268 set_private_file_permissions(manifest_path)?;
1269 sync_file(manifest_path)?;
1270 sync_parent(manifest_path)?;
1271 remove_empty_temp_dir_best_effort(&temp_dir);
1272 Ok(())
1273}
1274
1275fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1276 let relative = Path::new(relative_path);
1277 if relative.is_absolute() {
1278 return Err(anyhow!(
1279 "raw mirror manifest path must be relative: {relative_path}"
1280 ));
1281 }
1282
1283 let mut normal_components = Vec::new();
1284 for component in relative.components() {
1285 match component {
1286 std::path::Component::Normal(part) => normal_components.push(part),
1287 _ => {
1288 return Err(anyhow!(
1289 "raw mirror manifest path must use only normal relative components: {relative_path}"
1290 ));
1291 }
1292 }
1293 }
1294
1295 if normal_components.len() != 2
1296 || normal_components[0] != std::ffi::OsStr::new("manifests")
1297 || Path::new(normal_components[1])
1298 .extension()
1299 .and_then(|ext| ext.to_str())
1300 != Some("json")
1301 {
1302 return Err(anyhow!(
1303 "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1304 ));
1305 }
1306
1307 Ok(root.join(relative))
1308}
1309
1310fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1311 let metadata = fs::symlink_metadata(path)
1312 .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1313 if metadata.file_type().is_symlink() {
1314 return Err(anyhow!(
1315 "refusing to read symlink raw mirror blob {}",
1316 path.display()
1317 ));
1318 }
1319 if !metadata.is_file() {
1320 return Err(anyhow!(
1321 "refusing to read non-file raw mirror blob {}",
1322 path.display()
1323 ));
1324 }
1325 let actual = file_blake3(path)?;
1326 if actual == expected_blake3 {
1327 Ok(())
1328 } else {
1329 Err(anyhow!(
1330 "existing raw mirror blob {} has blake3 {}, expected {}",
1331 path.display(),
1332 actual,
1333 expected_blake3
1334 ))
1335 }
1336}
1337
1338fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1339 let manifest = read_raw_mirror_manifest(path)?;
1340 if manifest.blob_blake3 == expected_blob_blake3 {
1341 Ok(())
1342 } else {
1343 Err(anyhow!(
1344 "existing raw mirror manifest {} points at blob {}, expected {}",
1345 path.display(),
1346 manifest.blob_blake3,
1347 expected_blob_blake3
1348 ))
1349 }
1350}
1351
1352fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1353 let metadata = fs::symlink_metadata(path)
1354 .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1355 if metadata.file_type().is_symlink() {
1356 return Err(anyhow!(
1357 "refusing to read symlink raw mirror manifest {}",
1358 path.display()
1359 ));
1360 }
1361 if !metadata.is_file() {
1362 return Err(anyhow!(
1363 "refusing to read non-file raw mirror manifest {}",
1364 path.display()
1365 ));
1366 }
1367 serde_json::from_slice(
1368 &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1369 )
1370 .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1371}
1372
1373fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1374 data_dir
1375 .join(RAW_MIRROR_ROOT_DIR)
1376 .join(RAW_MIRROR_VERSION_DIR)
1377}
1378
1379fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1380 let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1381 ensure_private_dir(&root_parent)?;
1382 let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1383 ensure_private_dir(&root)?;
1384 Ok(root)
1385}
1386
1387fn raw_mirror_blob_cache_key(
1388 input: &RawMirrorCaptureInput<'_>,
1389 source_metadata: &fs::Metadata,
1390) -> RawMirrorBlobCacheKey {
1391 RawMirrorBlobCacheKey {
1392 data_dir: input.data_dir.to_path_buf(),
1393 source_path: input.source_path.to_path_buf(),
1394 source_identity: source_identity_token(source_metadata),
1395 source_size_bytes: source_metadata.len(),
1396 source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1397 source_change_time_ns: source_change_time_ns(source_metadata),
1398 }
1399}
1400
1401fn cached_raw_mirror_blob_record(
1402 key: &RawMirrorBlobCacheKey,
1403 root: &Path,
1404) -> Option<RawMirrorBlobRecord> {
1405 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1406 let record = {
1407 let mut guard = cache.lock().ok()?;
1408 let record = guard.get(key).cloned()?;
1409 if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1410 guard.remove(key);
1411 return None;
1412 }
1413 record
1414 };
1415
1416 let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1417 let blob_path = root.join(blob_relative_path);
1418 let metadata_valid = fs::symlink_metadata(&blob_path)
1419 .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1420 .unwrap_or(false);
1421 if !metadata_valid {
1422 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1423 return None;
1424 }
1425
1426 match file_blake3(&blob_path) {
1427 Ok(actual) if actual == record.blob_blake3 => Some(record),
1428 Ok(actual) => {
1429 tracing::warn!(
1430 path = %blob_path.display(),
1431 expected_blake3 = %record.blob_blake3,
1432 actual_blake3 = %actual,
1433 "discarding raw mirror blob cache entry with mismatched content"
1434 );
1435 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1436 None
1437 }
1438 Err(err) => {
1439 tracing::debug!(
1440 path = %blob_path.display(),
1441 error = %err,
1442 "discarding unreadable raw mirror blob cache entry"
1443 );
1444 remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1445 None
1446 }
1447 }
1448}
1449
1450fn remove_cached_raw_mirror_blob_record_if_unchanged(
1451 cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1452 key: &RawMirrorBlobCacheKey,
1453 stale_record: &RawMirrorBlobRecord,
1454) {
1455 if let Ok(mut guard) = cache.lock()
1456 && guard
1457 .get(key)
1458 .is_some_and(|current| current == stale_record)
1459 {
1460 guard.remove(key);
1461 }
1462}
1463
1464fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1465 let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1466 if let Ok(mut guard) = cache.lock() {
1467 guard.insert(key, record);
1468 }
1469}
1470
1471fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1472 if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1473 return None;
1474 }
1475 let lower = blob_blake3.to_ascii_lowercase();
1476 Some(format!(
1477 "blobs/{}/{}/{}.{}",
1478 RAW_MIRROR_HASH_ALGORITHM,
1479 &lower[..2],
1480 lower,
1481 RAW_MIRROR_BLOB_EXTENSION
1482 ))
1483}
1484
/// Returns the manifest path relative to the raw mirror root:
/// `manifests/<manifest_id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    ["manifests/", manifest_id, ".json"].concat()
}
1488
1489fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1490 let mut hasher = blake3::Hasher::new();
1491 hasher.update(b"doctor-raw-mirror-original-path-v1");
1492 hasher.update(&[0]);
1493 hasher.update(original_path.as_bytes());
1494 hasher.finalize().to_hex().to_string()
1495}
1496
1497fn raw_mirror_manifest_id(
1498 provider: &str,
1499 source_id: &str,
1500 origin_kind: &str,
1501 origin_host: Option<&str>,
1502 original_path_blake3: &str,
1503 blob_blake3: &str,
1504) -> String {
1505 canonical_blake3(
1506 "doctor-raw-mirror-manifest-id-v1",
1507 json!({
1508 "provider": provider,
1509 "source_id": source_id,
1510 "origin_kind": origin_kind,
1511 "origin_host": origin_host,
1512 "original_path_blake3": original_path_blake3,
1513 "blob_blake3": blob_blake3,
1514 }),
1515 )
1516}
1517
1518fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1519 let mut value = serde_json::to_value(manifest).unwrap_or_default();
1520 if let Value::Object(map) = &mut value {
1521 map.remove("manifest_blake3");
1522 }
1523 canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1524}
1525
1526fn canonical_blake3(prefix: &str, value: Value) -> String {
1527 let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1528 let mut hasher = blake3::Hasher::new();
1529 hasher.update(prefix.as_bytes());
1530 hasher.update(&[0]);
1531 hasher.update(&encoded);
1532 format!("{prefix}-{}", hasher.finalize().to_hex())
1533}
1534
1535fn canonical_json_value(value: Value) -> Value {
1536 match value {
1537 Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1538 Value::Object(map) => {
1539 let mut entries: Vec<_> = map.into_iter().collect();
1540 entries.sort_by(|left, right| left.0.cmp(&right.0));
1541 let mut canonical = serde_json::Map::new();
1542 for (key, value) in entries {
1543 canonical.insert(key, canonical_json_value(value));
1544 }
1545 Value::Object(canonical)
1546 }
1547 other => other,
1548 }
1549}
1550
1551fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1552 let mut dedup = links.to_vec();
1553 dedup.sort_by(|left, right| {
1554 (
1555 left.conversation_id,
1556 left.message_count,
1557 left.started_at_ms,
1558 left.source_path.as_deref().unwrap_or(""),
1559 )
1560 .cmp(&(
1561 right.conversation_id,
1562 right.message_count,
1563 right.started_at_ms,
1564 right.source_path.as_deref().unwrap_or(""),
1565 ))
1566 });
1567 dedup.dedup();
1568 dedup
1569}
1570
1571fn file_blake3(path: &Path) -> Result<String> {
1572 let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1573 let mut hasher = blake3::Hasher::new();
1574 let mut buffer = [0u8; 64 * 1024];
1575 loop {
1576 let read = file
1577 .read(&mut buffer)
1578 .with_context(|| format!("read {}", path.display()))?;
1579 if read == 0 {
1580 break;
1581 }
1582 hasher.update(&buffer[..read]);
1583 }
1584 Ok(hasher.finalize().to_hex().to_string())
1585}
1586
1587fn ensure_private_dir(path: &Path) -> Result<()> {
1588 create_private_dir_all(path)
1589 .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1590 let metadata = fs::symlink_metadata(path)
1591 .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1592 let file_type = metadata.file_type();
1593 if file_type.is_symlink() {
1594 return Err(anyhow!(
1595 "refusing to use symlink raw mirror dir {}",
1596 path.display()
1597 ));
1598 }
1599 if !file_type.is_dir() {
1600 return Err(anyhow!(
1601 "refusing to use non-directory raw mirror path {}",
1602 path.display()
1603 ));
1604 }
1605 #[cfg(unix)]
1606 {
1607 use std::os::unix::fs::PermissionsExt;
1608 if metadata.permissions().mode() & 0o777 != 0o700 {
1609 set_private_dir_permissions(path)?;
1610 }
1611 }
1612 #[cfg(not(unix))]
1613 {
1614 set_private_dir_permissions(path)?;
1615 }
1616 Ok(())
1617}
1618
1619fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1620 let relative = path.strip_prefix(root).with_context(|| {
1621 format!(
1622 "raw mirror private dir {} is not under root {}",
1623 path.display(),
1624 root.display()
1625 )
1626 })?;
1627
1628 if let Some(root_parent) = root.parent() {
1629 ensure_private_dir(root_parent)?;
1630 }
1631 ensure_private_dir(root)?;
1632 let mut current = root.to_path_buf();
1633 for component in relative.components() {
1634 match component {
1635 Component::Normal(part) => {
1636 current.push(part);
1637 ensure_private_dir(¤t)?;
1638 }
1639 Component::CurDir => {}
1640 _ => {
1641 return Err(anyhow!(
1642 "raw mirror private dir contains non-normal component: {}",
1643 path.display()
1644 ));
1645 }
1646 }
1647 }
1648
1649 Ok(())
1650}
1651
1652fn private_create_new_file(path: &Path) -> Result<File> {
1653 let mut options = OpenOptions::new();
1654 options.write(true).create_new(true);
1655 set_private_create_file_mode(&mut options);
1656 let file = options
1657 .open(path)
1658 .with_context(|| format!("create raw mirror file {}", path.display()))?;
1659 Ok(file)
1660}
1661
/// Recursively creates `path`, requesting mode `0o700` for every directory
/// that has to be created.
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    let mut builder = fs::DirBuilder::new();
    builder.recursive(true);
    builder.mode(0o700);
    builder.create(path)
}

/// Recursively creates `path`; no creation mode is requested on this
/// platform.
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::DirBuilder::new().recursive(true).create(path)
}
1674
/// Requests mode `0o600` for files created through `options`.
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    OpenOptionsExt::mode(options, 0o600);
}

/// Leaves `options` untouched on this platform.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1684
1685fn sync_open_file_if_required(message_file: &File, context: impl FnOnce() -> String) -> Result<()> {
1686 if !raw_mirror_fsync_enabled() {
1687 return Ok(());
1688 }
1689 message_file.sync_all().with_context(context)
1690}
1691
1692fn sync_file(path: &Path) -> Result<()> {
1693 if !raw_mirror_fsync_enabled() {
1694 return Ok(());
1695 }
1696 let mut options = OpenOptions::new();
1697 options.read(true);
1698 #[cfg(windows)]
1699 options.write(true);
1700 options
1701 .open(path)
1702 .and_then(|file| file.sync_all())
1703 .with_context(|| format!("sync raw mirror file {}", path.display()))
1704}
1705
1706#[cfg(not(windows))]
1707fn sync_parent(path: &Path) -> Result<()> {
1708 if !raw_mirror_fsync_enabled() {
1709 return Ok(());
1710 }
1711 let Some(parent) = path.parent() else {
1712 return Ok(());
1713 };
1714 File::open(parent)
1715 .and_then(|file| file.sync_all())
1716 .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1717}
1718
1719#[cfg(windows)]
1720fn sync_parent(_path: &Path) -> Result<()> {
1721 Ok(())
1722}
1723
1724fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1725 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1726 let nanos = SystemTime::now()
1727 .duration_since(UNIX_EPOCH)
1728 .unwrap_or_default()
1729 .as_nanos();
1730 dir.join(format!(
1731 ".{label}.{}.{}.{}.tmp",
1732 std::process::id(),
1733 nanos,
1734 nonce
1735 ))
1736}
1737
1738fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1739 let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1740 let nanos = SystemTime::now()
1741 .duration_since(UNIX_EPOCH)
1742 .unwrap_or_default()
1743 .as_nanos();
1744 root.join("tmp").join(format!(
1745 "capture.{}.{}.{}",
1746 std::process::id(),
1747 nanos,
1748 nonce
1749 ))
1750}
1751
1752fn remove_temp_best_effort(path: &Path) {
1753 if let Err(err) = fs::remove_file(path) {
1754 tracing::debug!(
1755 path = %path.display(),
1756 error = %err,
1757 "failed to remove raw mirror temp file"
1758 );
1759 }
1760}
1761
1762fn remove_empty_temp_dir_best_effort(path: &Path) {
1763 if let Err(err) = fs::remove_dir(path) {
1764 tracing::debug!(
1765 path = %path.display(),
1766 error = %err,
1767 "failed to remove raw mirror temp directory"
1768 );
1769 }
1770}
1771
/// Produces the redacted form of a source path: `[<provider>]/<file name>`.
///
/// Only the final path component is kept. When there is no file name, or it
/// is not valid UTF-8, the placeholder `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let leaf = source_path
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("session");
    ["[", provider, "]/", leaf].concat()
}
1779
1780fn now_ms() -> i64 {
1781 system_time_to_ms(SystemTime::now()).unwrap_or(0)
1782}
1783
/// Converts `time` to milliseconds since the Unix epoch.
///
/// Returns `None` when `time` precedes the epoch or the millisecond count does
/// not fit in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let elapsed = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(elapsed.as_millis()).ok()
}
1789
/// Converts `time` to nanoseconds since the Unix epoch, or `None` when `time`
/// precedes the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => Some(elapsed.as_nanos()),
        Err(_) => None,
    }
}
1795
1796#[cfg(unix)]
1797fn set_private_dir_permissions(path: &Path) -> Result<()> {
1798 use std::os::unix::fs::PermissionsExt;
1799 fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1800 .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1801}
1802
1803#[cfg(not(unix))]
1804fn set_private_dir_permissions(_path: &Path) -> Result<()> {
1805 Ok(())
1806}
1807
1808#[cfg(unix)]
1809fn set_private_file_permissions(path: &Path) -> Result<()> {
1810 use std::os::unix::fs::PermissionsExt;
1811 fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1812 .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1813}
1814
1815#[cfg(not(unix))]
1816fn set_private_file_permissions(_path: &Path) -> Result<()> {
1817 Ok(())
1818}
1819
1820#[cfg(test)]
1821mod tests {
1822 use super::*;
1823
1824 #[test]
1825 fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1826 let temp = tempfile::TempDir::new().expect("tempdir");
1827 let data_dir = temp.path().join("cass-data");
1828 let source_path = temp.path().join("rollout-fixture.jsonl");
1829 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1830 fs::write(&source_path, source_bytes).expect("write source");
1831 let db_link = RawMirrorDbLink {
1832 conversation_id: Some(42),
1833 message_count: Some(1),
1834 source_path: Some(source_path.display().to_string()),
1835 started_at_ms: Some(1_733_000_000_000),
1836 };
1837
1838 let first = capture_source_file(RawMirrorCaptureInput {
1839 data_dir: &data_dir,
1840 provider: "codex",
1841 source_id: "local",
1842 origin_kind: "local",
1843 origin_host: None,
1844 source_path: &source_path,
1845 db_links: std::slice::from_ref(&db_link),
1846 })
1847 .expect("first capture");
1848 let second = capture_source_file(RawMirrorCaptureInput {
1849 data_dir: &data_dir,
1850 provider: "codex",
1851 source_id: "local",
1852 origin_kind: "local",
1853 origin_host: None,
1854 source_path: &source_path,
1855 db_links: std::slice::from_ref(&db_link),
1856 })
1857 .expect("second capture");
1858
1859 assert_eq!(first.manifest_id, second.manifest_id);
1860 assert_eq!(first.blob_blake3, second.blob_blake3);
1861 assert_eq!(first.captured_at_ms, second.captured_at_ms);
1862 assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1863 assert!(!first.already_present);
1864 assert!(second.already_present);
1865 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1866
1867 let blob_path = data_dir
1868 .join(RAW_MIRROR_ROOT_DIR)
1869 .join(RAW_MIRROR_VERSION_DIR)
1870 .join(&first.blob_relative_path);
1871 let manifest_path = data_dir
1872 .join(RAW_MIRROR_ROOT_DIR)
1873 .join(RAW_MIRROR_VERSION_DIR)
1874 .join(&first.manifest_relative_path);
1875 assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1876
1877 let manifest: Value =
1878 serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1879 .expect("manifest json");
1880 assert_eq!(
1881 manifest["manifest_kind"].as_str(),
1882 Some(RAW_MIRROR_MANIFEST_KIND)
1883 );
1884 assert_eq!(manifest["provider"].as_str(), Some("codex"));
1885 assert_eq!(
1886 manifest["blob_blake3"].as_str(),
1887 Some(first.blob_blake3.as_str())
1888 );
1889 assert_eq!(
1890 manifest["redacted_original_path"].as_str(),
1891 Some("[codex]/rollout-fixture.jsonl")
1892 );
1893 assert_eq!(
1894 manifest["db_links"][0]["conversation_id"].as_i64(),
1895 Some(42)
1896 );
1897 assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1898 assert!(
1899 manifest["manifest_blake3"]
1900 .as_str()
1901 .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1902 );
1903 let tmp_root = data_dir
1904 .join(RAW_MIRROR_ROOT_DIR)
1905 .join(RAW_MIRROR_VERSION_DIR)
1906 .join("tmp");
1907 assert_eq!(
1908 fs::read_dir(&tmp_root)
1909 .expect("raw mirror tmp root")
1910 .collect::<Vec<_>>()
1911 .len(),
1912 0,
1913 "successful captures must not leave doctor-visible interrupted temp artifacts"
1914 );
1915
1916 #[cfg(unix)]
1917 {
1918 use std::os::unix::fs::PermissionsExt;
1919
1920 let root = data_dir
1921 .join(RAW_MIRROR_ROOT_DIR)
1922 .join(RAW_MIRROR_VERSION_DIR);
1923 assert_eq!(
1924 fs::metadata(&root)
1925 .expect("raw mirror root metadata")
1926 .permissions()
1927 .mode()
1928 & 0o777,
1929 0o700
1930 );
1931 assert_eq!(
1932 fs::metadata(&manifest_path)
1933 .expect("manifest metadata")
1934 .permissions()
1935 .mode()
1936 & 0o777,
1937 0o600
1938 );
1939 }
1940 }
1941
1942 #[test]
1943 fn capture_source_file_merges_db_links_into_existing_manifest() {
1944 let temp = tempfile::TempDir::new().expect("tempdir");
1945 let data_dir = temp.path().join("cass-data");
1946 let source_path = temp.path().join("preparse-then-parsed.jsonl");
1947 let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1948 fs::write(&source_path, source_bytes).expect("write source");
1949
1950 let preparse = capture_source_file(RawMirrorCaptureInput {
1951 data_dir: &data_dir,
1952 provider: "codex",
1953 source_id: "local",
1954 origin_kind: "local",
1955 origin_host: None,
1956 source_path: &source_path,
1957 db_links: &[],
1958 })
1959 .expect("preparse capture");
1960
1961 let parsed_link = RawMirrorDbLink {
1962 conversation_id: None,
1963 message_count: Some(1),
1964 source_path: Some(source_path.display().to_string()),
1965 started_at_ms: Some(1_733_000_000_000),
1966 };
1967 let parsed = capture_source_file(RawMirrorCaptureInput {
1968 data_dir: &data_dir,
1969 provider: "codex",
1970 source_id: "local",
1971 origin_kind: "local",
1972 origin_host: None,
1973 source_path: &source_path,
1974 db_links: std::slice::from_ref(&parsed_link),
1975 })
1976 .expect("parsed capture");
1977
1978 assert_eq!(preparse.manifest_id, parsed.manifest_id);
1979 assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1980 assert!(parsed.already_present);
1981
1982 let manifest_path = data_dir
1983 .join(RAW_MIRROR_ROOT_DIR)
1984 .join(RAW_MIRROR_VERSION_DIR)
1985 .join(&parsed.manifest_relative_path);
1986 let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1987 assert_eq!(
1988 manifest.db_links,
1989 vec![parsed_link],
1990 "second capture must enrich the pre-parse manifest with DB-link evidence"
1991 );
1992 let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1993 assert_eq!(
1994 manifest.manifest_blake3.as_deref(),
1995 Some(expected_manifest_blake3.as_str()),
1996 "manifest checksum must be recomputed after DB-link merge"
1997 );
1998 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1999 }
2000
2001 #[test]
2002 fn merge_manifest_db_links_rejects_hostile_relative_paths() {
2003 let temp = tempfile::TempDir::new().expect("tempdir");
2004 let data_dir = temp.path().join("cass-data");
2005 let db_link = RawMirrorDbLink {
2006 conversation_id: Some(42),
2007 message_count: Some(1),
2008 source_path: Some("source.jsonl".to_string()),
2009 started_at_ms: Some(1_733_000_000_000),
2010 };
2011
2012 for relative in [
2013 "../escape.json",
2014 "/tmp/escape.json",
2015 "manifests/../escape.json",
2016 "blobs/blake3/ab/not-a-manifest.raw",
2017 "manifests/not-json.txt",
2018 ] {
2019 let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
2020 .expect_err("hostile manifest path should be rejected");
2021 assert!(
2022 err.to_string().contains("raw mirror manifest path"),
2023 "unexpected error for {relative}: {err}"
2024 );
2025 }
2026 }
2027
2028 #[cfg(unix)]
2029 #[test]
2030 fn merge_manifest_db_links_rejects_symlink_manifest_path() {
2031 let temp = tempfile::TempDir::new().expect("tempdir");
2032 let data_dir = temp.path().join("cass-data");
2033 let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
2034 fs::create_dir_all(&manifest_dir).expect("manifest dir");
2035 let outside = temp.path().join("outside.json");
2036 fs::write(&outside, "{}").expect("outside manifest");
2037 std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
2038 .expect("symlink manifest");
2039 let db_link = RawMirrorDbLink {
2040 conversation_id: Some(42),
2041 message_count: Some(1),
2042 source_path: Some("source.jsonl".to_string()),
2043 started_at_ms: Some(1_733_000_000_000),
2044 };
2045
2046 let err = merge_manifest_db_links(
2047 &data_dir,
2048 "manifests/link.json",
2049 std::slice::from_ref(&db_link),
2050 )
2051 .expect_err("symlink manifest should be rejected");
2052 assert!(
2053 err.to_string().contains("symlink raw mirror manifest"),
2054 "unexpected symlink-manifest error: {err}"
2055 );
2056 }
2057
2058 #[test]
2059 fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
2060 let temp = tempfile::TempDir::new().expect("tempdir");
2061 let data_dir = temp.path().join("cass-data");
2062 let first_source = temp.path().join("first.jsonl");
2063 let second_source = temp.path().join("second.jsonl");
2064 let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
2065 fs::write(&first_source, source_bytes).expect("write first source");
2066 fs::write(&second_source, source_bytes).expect("write second source");
2067
2068 let first = capture_source_file(RawMirrorCaptureInput {
2069 data_dir: &data_dir,
2070 provider: "codex",
2071 source_id: "local",
2072 origin_kind: "local",
2073 origin_host: None,
2074 source_path: &first_source,
2075 db_links: &[],
2076 })
2077 .expect("first capture");
2078 let second = capture_source_file(RawMirrorCaptureInput {
2079 data_dir: &data_dir,
2080 provider: "codex",
2081 source_id: "local",
2082 origin_kind: "local",
2083 origin_host: None,
2084 source_path: &second_source,
2085 db_links: &[],
2086 })
2087 .expect("second capture");
2088
2089 assert_eq!(first.blob_blake3, second.blob_blake3);
2090 assert_eq!(first.blob_relative_path, second.blob_relative_path);
2091 assert_ne!(first.manifest_id, second.manifest_id);
2092 assert!(
2093 !second.already_present,
2094 "a duplicate blob with a new source manifest is not a full capture replay"
2095 );
2096
2097 let manifest_root = data_dir
2098 .join(RAW_MIRROR_ROOT_DIR)
2099 .join(RAW_MIRROR_VERSION_DIR)
2100 .join("manifests");
2101 let manifests = fs::read_dir(manifest_root)
2102 .expect("manifest dir")
2103 .collect::<std::io::Result<Vec<_>>>()
2104 .expect("manifest entries");
2105 assert_eq!(manifests.len(), 2);
2106
2107 let summary = storage_summary(&data_dir);
2108 assert!(summary.initialized);
2109 assert_eq!(summary.manifest_count, 2);
2110 assert_eq!(summary.unique_blob_count, 1);
2111 assert_eq!(summary.total_blob_bytes, source_bytes.len() as u64);
2112 assert_eq!(summary.largest_blob_bytes, source_bytes.len() as u64);
2113 assert_eq!(summary.missing_blob_count, 0);
2114 assert_eq!(summary.invalid_manifest_count, 0);
2115 assert!(summary.total_storage_bytes >= source_bytes.len() as u64);
2116 }
2117
2118 #[test]
2119 fn storage_summary_rejects_hostile_blob_relative_path() {
2120 let temp = tempfile::TempDir::new().expect("tempdir");
2121 let data_dir = temp.path().join("cass-data");
2122 let source_path = temp.path().join("source.jsonl");
2123 fs::write(
2124 &source_path,
2125 b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2126 )
2127 .expect("write source");
2128
2129 let captured = capture_source_file(RawMirrorCaptureInput {
2130 data_dir: &data_dir,
2131 provider: "codex",
2132 source_id: "local",
2133 origin_kind: "local",
2134 origin_host: None,
2135 source_path: &source_path,
2136 db_links: &[],
2137 })
2138 .expect("capture source");
2139 let manifest_path = data_dir
2140 .join(RAW_MIRROR_ROOT_DIR)
2141 .join(RAW_MIRROR_VERSION_DIR)
2142 .join(&captured.manifest_relative_path);
2143 let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2144 manifest.blob_relative_path = "../outside.raw".to_string();
2145 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2146 fs::write(
2147 &manifest_path,
2148 serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2149 )
2150 .expect("tamper manifest");
2151
2152 let summary = storage_summary(&data_dir);
2153 assert_eq!(summary.manifest_count, 1);
2154 assert_eq!(summary.invalid_manifest_count, 1);
2155 assert_eq!(summary.unique_blob_count, 0);
2156 assert_eq!(summary.total_blob_bytes, 0);
2157 }
2158
2159 #[test]
2160 fn prune_fails_closed_on_hostile_manifest_inventory() {
2161 let temp = tempfile::TempDir::new().expect("tempdir");
2162 let data_dir = temp.path().join("cass-data");
2163 let source_path = temp.path().join("source.jsonl");
2164 fs::write(
2165 &source_path,
2166 b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2167 )
2168 .expect("write source");
2169
2170 let captured = capture_source_file(RawMirrorCaptureInput {
2171 data_dir: &data_dir,
2172 provider: "codex",
2173 source_id: "local",
2174 origin_kind: "local",
2175 origin_host: None,
2176 source_path: &source_path,
2177 db_links: &[],
2178 })
2179 .expect("capture source");
2180 let root = data_dir
2181 .join(RAW_MIRROR_ROOT_DIR)
2182 .join(RAW_MIRROR_VERSION_DIR);
2183 let manifest_path = root.join(&captured.manifest_relative_path);
2184 let blob_path = root.join(&captured.blob_relative_path);
2185 let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2186 manifest.blob_relative_path = "../outside.raw".to_string();
2187 manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2188 fs::write(
2189 &manifest_path,
2190 serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2191 )
2192 .expect("tamper manifest");
2193
2194 let err = prune(
2195 &data_dir,
2196 RawMirrorPruneOptions {
2197 older_than_ms: Some(0),
2198 max_size_bytes: None,
2199 keep_tags: Vec::new(),
2200 safety_hold_down_ms: 0,
2201 apply: true,
2202 },
2203 )
2204 .expect_err("hostile inventory should fail closed");
2205
2206 assert!(
2207 err.to_string().contains("unexpected blob path"),
2208 "error should explain the unsafe manifest inventory: {err}"
2209 );
2210 assert!(manifest_path.exists());
2211 assert!(blob_path.exists());
2212 assert!(!root.join("pruned.jsonl").exists());
2213 }
2214
2215 #[test]
2216 fn prune_dry_run_audits_without_removing_manifest_or_blob() {
2217 let temp = tempfile::TempDir::new().expect("tempdir");
2218 let data_dir = temp.path().join("cass-data");
2219 let source_path = temp.path().join("source.jsonl");
2220 fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")
2221 .expect("write source");
2222 let captured = capture_source_file(RawMirrorCaptureInput {
2223 data_dir: &data_dir,
2224 provider: "codex",
2225 source_id: "local",
2226 origin_kind: "local",
2227 origin_host: None,
2228 source_path: &source_path,
2229 db_links: &[],
2230 })
2231 .expect("capture source");
2232
2233 let report = prune(
2234 &data_dir,
2235 RawMirrorPruneOptions {
2236 older_than_ms: Some(0),
2237 max_size_bytes: None,
2238 keep_tags: Vec::new(),
2239 safety_hold_down_ms: 0,
2240 apply: false,
2241 },
2242 )
2243 .expect("dry-run prune");
2244
2245 assert!(report.initialized);
2246 assert_eq!(report.mode, "dry-run");
2247 assert_eq!(report.planned_manifest_count, 1);
2248 assert_eq!(report.planned_blob_count, 1);
2249 assert_eq!(report.applied_reclaim_bytes, 0);
2250 let root = data_dir
2251 .join(RAW_MIRROR_ROOT_DIR)
2252 .join(RAW_MIRROR_VERSION_DIR);
2253 assert!(root.join(&captured.manifest_relative_path).exists());
2254 assert!(root.join(&captured.blob_relative_path).exists());
2255 let audit_path = root.join("pruned.jsonl");
2256 let audit = fs::read_to_string(audit_path).expect("read audit");
2257 assert!(audit.contains("\"mode\":\"dry-run\""));
2258 assert!(audit.contains("\"applied\":false"));
2259 }
2260
2261 #[test]
2262 #[cfg(unix)]
2263 fn prune_refuses_symlinked_audit_log_without_writing_target() -> Result<()> {
2264 use std::os::unix::fs::symlink;
2265
2266 let temp = tempfile::TempDir::new()?;
2267 let data_dir = temp.path().join("cass-data");
2268 let source_path = temp.path().join("source.jsonl");
2269 let protected_audit_target = temp.path().join("protected-prune-audit.jsonl");
2270 fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")?;
2271 fs::write(&protected_audit_target, b"protected\n")?;
2272
2273 let captured = capture_source_file(RawMirrorCaptureInput {
2274 data_dir: &data_dir,
2275 provider: "codex",
2276 source_id: "local",
2277 origin_kind: "local",
2278 origin_host: None,
2279 source_path: &source_path,
2280 db_links: &[],
2281 })?;
2282 let root = data_dir
2283 .join(RAW_MIRROR_ROOT_DIR)
2284 .join(RAW_MIRROR_VERSION_DIR);
2285 let audit_path = root.join("pruned.jsonl");
2286 symlink(&protected_audit_target, &audit_path)?;
2287
2288 let err = match prune(
2289 &data_dir,
2290 RawMirrorPruneOptions {
2291 older_than_ms: Some(0),
2292 max_size_bytes: None,
2293 keep_tags: Vec::new(),
2294 safety_hold_down_ms: 0,
2295 apply: false,
2296 },
2297 ) {
2298 Ok(_) => anyhow::bail!("symlinked prune audit log was accepted"),
2299 Err(err) => err,
2300 };
2301
2302 if !err.to_string().contains("prune audit through symlink") {
2303 anyhow::bail!("unexpected audit symlink error: {err:#}");
2304 }
2305 if !fs::read(&protected_audit_target)?
2306 .as_slice()
2307 .eq(b"protected\n")
2308 {
2309 anyhow::bail!("protected audit target was modified");
2310 }
2311 if !fs::read_link(&audit_path)?
2312 .as_os_str()
2313 .eq(protected_audit_target.as_os_str())
2314 {
2315 anyhow::bail!("audit path did not remain a symlink to the protected target");
2316 }
2317 if !root.join(&captured.manifest_relative_path).exists() {
2318 anyhow::bail!("failed audit append removed the captured manifest");
2319 }
2320 if !root.join(&captured.blob_relative_path).exists() {
2321 anyhow::bail!("failed audit append removed the captured blob");
2322 }
2323 Ok(())
2324 }
2325
2326 #[test]
2327 fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
2328 let temp = tempfile::TempDir::new().expect("tempdir");
2329 let data_dir = temp.path().join("cass-data");
2330 let source_path = temp.path().join("source.jsonl");
2331 fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"apply\"}\n")
2332 .expect("write source");
2333 let captured = capture_source_file(RawMirrorCaptureInput {
2334 data_dir: &data_dir,
2335 provider: "codex",
2336 source_id: "local",
2337 origin_kind: "local",
2338 origin_host: None,
2339 source_path: &source_path,
2340 db_links: &[],
2341 })
2342 .expect("capture source");
2343 let root = data_dir
2344 .join(RAW_MIRROR_ROOT_DIR)
2345 .join(RAW_MIRROR_VERSION_DIR);
2346 let manifest_path = root.join(&captured.manifest_relative_path);
2347 let blob_path = root.join(&captured.blob_relative_path);
2348
2349 let report = prune(
2350 &data_dir,
2351 RawMirrorPruneOptions {
2352 older_than_ms: Some(0),
2353 max_size_bytes: None,
2354 keep_tags: Vec::new(),
2355 safety_hold_down_ms: 0,
2356 apply: true,
2357 },
2358 )
2359 .expect("apply prune");
2360
2361 assert_eq!(report.applied_manifest_count, 1);
2362 assert_eq!(report.applied_blob_count, 1);
2363 assert!(!manifest_path.exists());
2364 assert!(!blob_path.exists());
2365 let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
2366 assert!(audit.contains("\"mode\":\"apply\""));
2367 assert!(audit.contains("\"applied\":true"));
2368 }
2369
2370 #[test]
2371 fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
2372 let temp = tempfile::TempDir::new().expect("tempdir");
2373 let data_dir = temp.path().join("cass-data");
2374 let first_source = temp.path().join("first.jsonl");
2375 let second_source = temp.path().join("second.jsonl");
2376 let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";
2377 fs::write(&first_source, bytes).expect("write first");
2378 fs::write(&second_source, bytes).expect("write second");
2379 let first = capture_source_file(RawMirrorCaptureInput {
2380 data_dir: &data_dir,
2381 provider: "codex",
2382 source_id: "local",
2383 origin_kind: "local",
2384 origin_host: None,
2385 source_path: &first_source,
2386 db_links: &[],
2387 })
2388 .expect("capture first");
2389 let second = capture_source_file(RawMirrorCaptureInput {
2390 data_dir: &data_dir,
2391 provider: "codex",
2392 source_id: "local",
2393 origin_kind: "local",
2394 origin_host: None,
2395 source_path: &second_source,
2396 db_links: &[],
2397 })
2398 .expect("capture second");
2399 let root = data_dir
2400 .join(RAW_MIRROR_ROOT_DIR)
2401 .join(RAW_MIRROR_VERSION_DIR);
2402 let first_manifest_path = root.join(&first.manifest_relative_path);
2403 let second_manifest_path = root.join(&second.manifest_relative_path);
2404 let mut first_manifest =
2405 read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
2406 first_manifest.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
2407 first_manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&first_manifest));
2408 fs::write(
2409 &first_manifest_path,
2410 serde_json::to_vec_pretty(&first_manifest).expect("serialize first manifest"),
2411 )
2412 .expect("rewrite first manifest");
2413
2414 let report = prune(
2415 &data_dir,
2416 RawMirrorPruneOptions {
2417 older_than_ms: Some(86_400_000),
2418 max_size_bytes: None,
2419 keep_tags: Vec::new(),
2420 safety_hold_down_ms: 0,
2421 apply: true,
2422 },
2423 )
2424 .expect("apply one-manifest prune");
2425
2426 assert_eq!(report.applied_manifest_count, 1);
2427 assert_eq!(report.applied_blob_count, 0);
2428 assert!(!first_manifest_path.exists());
2429 assert!(second_manifest_path.exists());
2430 assert!(
2431 root.join(&first.blob_relative_path).exists(),
2432 "shared blob must stay while a retained manifest still references it"
2433 );
2434 }
2435
2436 #[test]
2437 fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
2438 use frankensqlite::compat::ConnectionExt as _;
2439
2440 let temp = tempfile::TempDir::new().expect("tempdir");
2441 let data_dir = temp.path().join("cass-data");
2442 std::fs::create_dir_all(&data_dir).expect("create data dir");
2443 let source_path = temp.path().join("tagged.jsonl");
2444 fs::write(
2445 &source_path,
2446 b"{\"type\":\"message\",\"text\":\"tagged\"}\n",
2447 )
2448 .expect("write source");
2449 let db_link = RawMirrorDbLink {
2450 conversation_id: Some(7),
2451 message_count: Some(1),
2452 source_path: Some(source_path.display().to_string()),
2453 started_at_ms: Some(1_733_000_000_000),
2454 };
2455 let captured = capture_source_file(RawMirrorCaptureInput {
2456 data_dir: &data_dir,
2457 provider: "codex",
2458 source_id: "local",
2459 origin_kind: "local",
2460 origin_host: None,
2461 source_path: &source_path,
2462 db_links: std::slice::from_ref(&db_link),
2463 })
2464 .expect("capture source");
2465 let db_path = data_dir.join("agent_search.db");
2466 let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
2467 .expect("open keep-tag db");
2468 conn.execute_compat(
2469 "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
2470 frankensqlite::params![],
2471 )
2472 .expect("create tags");
2473 conn.execute_compat(
2474 "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
2475 frankensqlite::params![],
2476 )
2477 .expect("create conversation_tags");
2478 conn.execute_compat(
2479 "INSERT INTO tags (id, name) VALUES (1, 'keep')",
2480 frankensqlite::params![],
2481 )
2482 .expect("insert tag");
2483 conn.execute_compat(
2484 "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
2485 frankensqlite::params![],
2486 )
2487 .expect("insert conversation tag");
2488 drop(conn);
2489
2490 let report = prune(
2491 &data_dir,
2492 RawMirrorPruneOptions {
2493 older_than_ms: Some(0),
2494 max_size_bytes: Some(0),
2495 keep_tags: vec!["keep".to_string()],
2496 safety_hold_down_ms: 0,
2497 apply: true,
2498 },
2499 )
2500 .expect("keep-tag prune");
2501
2502 let root = data_dir
2503 .join(RAW_MIRROR_ROOT_DIR)
2504 .join(RAW_MIRROR_VERSION_DIR);
2505 assert_eq!(report.pinned_manifest_count, 1);
2506 assert_eq!(report.pinned_blob_count, 1);
2507 assert_eq!(report.planned_manifest_count, 0);
2508 assert_eq!(report.planned_blob_count, 0);
2509 assert!(root.join(&captured.manifest_relative_path).exists());
2510 assert!(root.join(&captured.blob_relative_path).exists());
2511 }
2512
2513 #[test]
2514 fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
2515 let temp = tempfile::TempDir::new().expect("tempdir");
2516 let data_dir = temp.path().join("cass-data");
2517 let source_path = temp.path().join("recent.jsonl");
2518 fs::write(
2519 &source_path,
2520 b"{\"type\":\"message\",\"text\":\"recent\"}\n",
2521 )
2522 .expect("write source");
2523 let captured = capture_source_file(RawMirrorCaptureInput {
2524 data_dir: &data_dir,
2525 provider: "codex",
2526 source_id: "local",
2527 origin_kind: "local",
2528 origin_host: None,
2529 source_path: &source_path,
2530 db_links: &[],
2531 })
2532 .expect("capture source");
2533
2534 let report = prune(
2535 &data_dir,
2536 RawMirrorPruneOptions {
2537 older_than_ms: None,
2538 max_size_bytes: Some(0),
2539 keep_tags: Vec::new(),
2540 safety_hold_down_ms: 7 * 86_400_000,
2541 apply: true,
2542 },
2543 )
2544 .expect("hold-down prune");
2545
2546 let root = data_dir
2547 .join(RAW_MIRROR_ROOT_DIR)
2548 .join(RAW_MIRROR_VERSION_DIR);
2549 assert_eq!(report.pinned_manifest_count, 1);
2550 assert_eq!(report.pinned_blob_count, 1);
2551 assert_eq!(report.planned_manifest_count, 0);
2552 assert_eq!(report.planned_blob_count, 0);
2553 assert!(root.join(&captured.manifest_relative_path).exists());
2554 assert!(root.join(&captured.blob_relative_path).exists());
2555 }
2556
2557 #[test]
2558 fn capture_source_file_revalidates_cached_blob_contents() {
2559 let temp = tempfile::TempDir::new().expect("tempdir");
2560 let data_dir = temp.path().join("cass-data");
2561 let source_path = temp.path().join("cached-source.jsonl");
2562 let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2563 fs::write(&source_path, source_bytes).expect("write source");
2564
2565 let first = capture_source_file(RawMirrorCaptureInput {
2566 data_dir: &data_dir,
2567 provider: "codex",
2568 source_id: "local",
2569 origin_kind: "local",
2570 origin_host: None,
2571 source_path: &source_path,
2572 db_links: &[],
2573 })
2574 .expect("first capture");
2575
2576 let blob_path = data_dir
2577 .join(RAW_MIRROR_ROOT_DIR)
2578 .join(RAW_MIRROR_VERSION_DIR)
2579 .join(&first.blob_relative_path);
2580 fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");
2581
2582 let err = capture_source_file(RawMirrorCaptureInput {
2583 data_dir: &data_dir,
2584 provider: "codex",
2585 source_id: "local",
2586 origin_kind: "local",
2587 origin_host: None,
2588 source_path: &source_path,
2589 db_links: &[],
2590 })
2591 .expect_err("corrupted content-addressed blob must be rejected");
2592 assert!(
2593 err.to_string().contains("existing raw mirror blob"),
2594 "unexpected cached-blob error: {err:#}"
2595 );
2596 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2597 }
2598
2599 #[cfg(unix)]
2600 #[test]
2601 fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
2602 let temp = tempfile::TempDir::new().expect("tempdir");
2603 let data_dir = temp.path().join("cass-data");
2604 let source_path = temp.path().join("same-size-rewrite.jsonl");
2605 let first_bytes = b"same length payload A\n";
2606 let second_bytes = b"same length payload B\n";
2607 fs::write(&source_path, first_bytes).expect("write first source");
2608
2609 let first_modified = fs::metadata(&source_path)
2610 .expect("first metadata")
2611 .modified()
2612 .expect("first modified time");
2613 let first = capture_source_file(RawMirrorCaptureInput {
2614 data_dir: &data_dir,
2615 provider: "codex",
2616 source_id: "local",
2617 origin_kind: "local",
2618 origin_host: None,
2619 source_path: &source_path,
2620 db_links: &[],
2621 })
2622 .expect("first capture");
2623
2624 std::thread::sleep(std::time::Duration::from_millis(5));
2625 fs::write(&source_path, second_bytes).expect("rewrite source");
2626 let source = OpenOptions::new()
2627 .write(true)
2628 .open(&source_path)
2629 .expect("open rewritten source");
2630 source
2631 .set_times(std::fs::FileTimes::new().set_modified(first_modified))
2632 .expect("restore original mtime");
2633
2634 let second = capture_source_file(RawMirrorCaptureInput {
2635 data_dir: &data_dir,
2636 provider: "codex",
2637 source_id: "local",
2638 origin_kind: "local",
2639 origin_host: None,
2640 source_path: &source_path,
2641 db_links: &[],
2642 })
2643 .expect("second capture");
2644
2645 assert_ne!(first.blob_blake3, second.blob_blake3);
2646 assert_eq!(
2647 second.blob_blake3,
2648 blake3::hash(second_bytes).to_hex().to_string()
2649 );
2650 assert_eq!(
2651 fs::read(&source_path).expect("source bytes after rewrite"),
2652 second_bytes
2653 );
2654 }
2655
2656 #[cfg(unix)]
2657 #[test]
2658 fn capture_source_file_rejects_symlinked_existing_blob_path() {
2659 let temp = tempfile::TempDir::new().expect("tempdir");
2660 let data_dir = temp.path().join("cass-data");
2661 let source_path = temp.path().join("cached-source.jsonl");
2662 let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2663 fs::write(&source_path, source_bytes).expect("write source");
2664
2665 let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
2666 let blob_relative_path =
2667 raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
2668 let blob_path = data_dir
2669 .join(RAW_MIRROR_ROOT_DIR)
2670 .join(RAW_MIRROR_VERSION_DIR)
2671 .join(&blob_relative_path);
2672 fs::create_dir_all(blob_path.parent().expect("blob parent")).expect("blob parent dir");
2673 let outside = temp.path().join("outside.raw");
2674 fs::write(&outside, source_bytes).expect("outside blob bytes");
2675 std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");
2676
2677 let err = capture_source_file(RawMirrorCaptureInput {
2678 data_dir: &data_dir,
2679 provider: "codex",
2680 source_id: "local",
2681 origin_kind: "local",
2682 origin_host: None,
2683 source_path: &source_path,
2684 db_links: &[],
2685 })
2686 .expect_err("symlinked content-addressed blob path must be rejected");
2687 assert!(
2688 err.to_string().contains("symlink raw mirror blob"),
2689 "unexpected symlink-blob error: {err:#}"
2690 );
2691
2692 let manifest_root = data_dir
2693 .join(RAW_MIRROR_ROOT_DIR)
2694 .join(RAW_MIRROR_VERSION_DIR)
2695 .join("manifests");
2696 assert!(
2697 !manifest_root.exists(),
2698 "failed blob publish must not write a manifest pointing at a symlinked blob"
2699 );
2700 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2701 assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
2702 }
2703
2704 #[cfg(unix)]
2705 #[test]
2706 fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
2707 let temp = tempfile::TempDir::new().expect("tempdir");
2708 let data_dir = temp.path().join("cass-data");
2709 let source_path = temp.path().join("source.jsonl");
2710 let outside_mirror = temp.path().join("outside-mirror");
2711 let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";
2712
2713 fs::create_dir_all(&data_dir).expect("data dir");
2714 fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
2715 fs::write(&source_path, source_bytes).expect("write source");
2716 std::os::unix::fs::symlink(&outside_mirror, data_dir.join(RAW_MIRROR_ROOT_DIR))
2717 .expect("symlink raw mirror root");
2718
2719 let err = capture_source_file(RawMirrorCaptureInput {
2720 data_dir: &data_dir,
2721 provider: "codex",
2722 source_id: "local",
2723 origin_kind: "local",
2724 origin_host: None,
2725 source_path: &source_path,
2726 db_links: &[],
2727 })
2728 .expect_err("symlinked raw-mirror root must be rejected");
2729
2730 assert!(
2731 err.to_string().contains("symlink raw mirror dir"),
2732 "unexpected symlink-root error: {err:#}"
2733 );
2734 assert!(
2735 !outside_mirror.join(RAW_MIRROR_VERSION_DIR).exists(),
2736 "raw mirror capture must not create redirected archive state outside data_dir"
2737 );
2738 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2739 }
2740
2741 #[cfg(unix)]
2742 #[test]
2743 fn capture_source_file_rejects_symlinked_blob_directory_component() {
2744 let temp = tempfile::TempDir::new().expect("tempdir");
2745 let data_dir = temp.path().join("cass-data");
2746 let root = data_dir
2747 .join(RAW_MIRROR_ROOT_DIR)
2748 .join(RAW_MIRROR_VERSION_DIR);
2749 let source_path = temp.path().join("source.jsonl");
2750 let outside_blobs = temp.path().join("outside-blobs");
2751 let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
2752
2753 fs::create_dir_all(&root).expect("raw mirror root");
2754 fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
2755 fs::write(&source_path, source_bytes).expect("write source");
2756 std::os::unix::fs::symlink(&outside_blobs, root.join("blobs")).expect("symlink blobs dir");
2757
2758 let err = capture_source_file(RawMirrorCaptureInput {
2759 data_dir: &data_dir,
2760 provider: "codex",
2761 source_id: "local",
2762 origin_kind: "local",
2763 origin_host: None,
2764 source_path: &source_path,
2765 db_links: &[],
2766 })
2767 .expect_err("symlinked blob directory must be rejected");
2768
2769 assert!(
2770 err.to_string().contains("symlink raw mirror dir"),
2771 "unexpected symlink-blob-dir error: {err:#}"
2772 );
2773 assert!(
2774 !outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM).exists(),
2775 "raw mirror capture must not create redirected blob state outside data_dir"
2776 );
2777 assert!(
2778 !root.join("manifests").exists(),
2779 "failed blob publish must not write a manifest"
2780 );
2781 assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2782 }
2783
2784 #[test]
2785 fn capture_source_file_rejects_non_file_sources() {
2786 let temp = tempfile::TempDir::new().expect("tempdir");
2787 let data_dir = temp.path().join("cass-data");
2788 let source_dir = temp.path().join("source-dir");
2789 fs::create_dir(&source_dir).expect("source dir");
2790
2791 let err = capture_source_file(RawMirrorCaptureInput {
2792 data_dir: &data_dir,
2793 provider: "codex",
2794 source_id: "local",
2795 origin_kind: "local",
2796 origin_host: None,
2797 source_path: &source_dir,
2798 db_links: &[],
2799 })
2800 .expect_err("directory source should be rejected");
2801 assert!(
2802 err.to_string().contains("non-file source"),
2803 "unexpected non-file-source error: {err}"
2804 );
2805 assert!(
2806 !data_dir.join(RAW_MIRROR_ROOT_DIR).exists(),
2807 "rejected non-file sources must not initialize raw mirror storage"
2808 );
2809 }
2810
2811 #[cfg(unix)]
2812 #[test]
2813 fn capture_source_file_rejects_unreadable_sources_without_manifest() {
2814 use std::os::unix::fs::PermissionsExt;
2815
2816 let temp = tempfile::TempDir::new().expect("tempdir");
2817 let data_dir = temp.path().join("cass-data");
2818 let source_path = temp.path().join("unreadable.jsonl");
2819 fs::write(&source_path, b"private session bytes\n").expect("source");
2820 fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
2821 .expect("make source unreadable");
2822
2823 let err = capture_source_file(RawMirrorCaptureInput {
2824 data_dir: &data_dir,
2825 provider: "codex",
2826 source_id: "local",
2827 origin_kind: "local",
2828 origin_host: None,
2829 source_path: &source_path,
2830 db_links: &[],
2831 })
2832 .expect_err("unreadable source should be rejected");
2833 fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
2834 .expect("restore source perms");
2835 assert!(
2836 err.to_string().contains("open raw mirror source"),
2837 "unexpected unreadable-source error: {err}"
2838 );
2839 assert!(
2840 !data_dir.join("raw-mirror/v1/manifests").exists(),
2841 "failed unreadable-source captures must not publish manifests"
2842 );
2843 }
2844
2845 #[cfg(unix)]
2846 #[test]
2847 fn capture_source_file_rejects_symlink_sources() {
2848 use std::os::unix::fs::symlink;
2849
2850 let temp = tempfile::TempDir::new().expect("tempdir");
2851 let data_dir = temp.path().join("cass-data");
2852 let real_source = temp.path().join("real.jsonl");
2853 let symlink_source = temp.path().join("link.jsonl");
2854 fs::write(&real_source, b"secret session").expect("write source");
2855 symlink(&real_source, &symlink_source).expect("symlink");
2856
2857 let err = capture_source_file(RawMirrorCaptureInput {
2858 data_dir: &data_dir,
2859 provider: "codex",
2860 source_id: "local",
2861 origin_kind: "local",
2862 origin_host: None,
2863 source_path: &symlink_source,
2864 db_links: &[],
2865 })
2866 .expect_err("symlink source should be rejected");
2867 assert!(
2868 err.to_string().contains("symlink source"),
2869 "unexpected error: {err:#}"
2870 );
2871 }
2872}