Skip to main content

coding_agent_search/
raw_mirror.rs

1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
// Raw-mirror layout and manifest identity constants.
// NOTE(review): most of these are consumed by helpers outside this section; descriptions marked
// "presumably" are inferred from the names only and should be confirmed against their usage.

// Presumably the value written into a manifest's `schema_version` field.
const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
// Presumably the directory name of the mirror under the data dir.
const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
// Presumably a versioned sub-directory name inside the mirror layout.
const RAW_MIRROR_VERSION_DIR: &str = "v1";
// Expected `manifest_kind`; `storage_summary` counts manifests with any other kind as invalid and
// `collect_prune_manifests` refuses to prune when it meets one.
const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
// Presumably the value written into a manifest's `blob_hash_algorithm` field.
const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
// Presumably the file extension of stored blobs (`prune` strips a literal ".raw" suffix).
const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";

// Process-wide atomic counter. Presumably used to make temp names unique — confirm.
static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
// Lazily initialized, mutex-guarded map from a source-file cache key to the blob record captured
// for it. NOTE(review): the lookup/insert helpers live outside this section.
static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
    OnceLock::new();
// Lazily initialized mutex carrying no data. Presumably serializes manifest updates — confirm.
static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
/// Borrowed inputs for `capture_source_file`.
#[derive(Debug, Clone)]
pub struct RawMirrorCaptureInput<'a> {
    /// Data directory under which the raw-mirror root is ensured before capturing.
    pub data_dir: &'a Path,
    /// Provider name; one of the inputs to the manifest id.
    pub provider: &'a str,
    /// Source identifier; one of the inputs to the manifest id.
    pub source_id: &'a str,
    /// Origin kind label. NOTE(review): accepted values are not visible in this section.
    pub origin_kind: &'a str,
    /// Optional origin host. NOTE(review): presumably set for non-local origins — confirm.
    pub origin_host: Option<&'a str>,
    /// File to mirror. Capture refuses symlinks and anything that is not a regular file.
    pub source_path: &'a Path,
    /// Database links for this source. NOTE(review): presumably stored in the manifest's
    /// `db_links`; the write happens outside this section.
    pub db_links: &'a [RawMirrorDbLink],
}
34
/// Outcome of one capture, as returned by `capture_source_file`.
///
/// NOTE(review): the code that fills this record is outside this section, so the field notes
/// below are inferred from names and from the matching manifest fields — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorCaptureRecord {
    /// Identifier of the manifest describing this capture.
    pub manifest_id: String,
    /// Presumably the manifest path relative to the raw-mirror root.
    pub manifest_relative_path: String,
    /// Presumably the blob path relative to the raw-mirror root.
    pub blob_relative_path: String,
    /// BLAKE3 digest of the blob content (hex string, presumably).
    pub blob_blake3: String,
    /// Size of the blob in bytes.
    pub blob_size_bytes: u64,
    /// Capture time in milliseconds (presumably since the Unix epoch).
    pub captured_at_ms: i64,
    /// Source file modification time in milliseconds, when known.
    pub source_mtime_ms: Option<i64>,
    /// Presumably true when the capture found existing content instead of writing new data.
    pub already_present: bool,
}
46
/// Link from a mirrored source to a conversation row in the search database.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorDbLink {
    /// Conversation id; prune pins a manifest when this id carries one of the keep tags.
    pub conversation_id: Option<i64>,
    /// NOTE(review): presumably the number of messages in the linked conversation — confirm.
    pub message_count: Option<usize>,
    /// NOTE(review): presumably the source path as recorded in the database — confirm.
    pub source_path: Option<String>,
    /// NOTE(review): presumably the conversation start time in milliseconds — confirm.
    pub started_at_ms: Option<i64>,
}
54
/// Storage usage snapshot produced by `storage_summary`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorStorageSummary {
    /// True once the root path could be stat'ed, even if it then turns out to be invalid.
    pub initialized: bool,
    /// Display form of the raw-mirror root path.
    pub root_path: String,
    /// Sum of the sizes of all regular files under the root; symlinks are skipped.
    pub total_storage_bytes: u64,
    /// Manifests that parsed and carried the expected manifest kind.
    pub manifest_count: u64,
    /// Sum of the sizes of regular-file entries in the manifests directory, counted before
    /// the entry is parsed (so unparseable files contribute too).
    pub manifest_bytes: u64,
    /// Distinct referenced blobs that exist as regular files.
    pub unique_blob_count: u64,
    /// Sum of the sizes of the blobs counted in `unique_blob_count`.
    pub total_blob_bytes: u64,
    /// Size of the largest blob counted in `unique_blob_count`.
    pub largest_blob_bytes: u64,
    /// Distinct referenced blobs that are missing or not regular files.
    pub missing_blob_count: u64,
    /// Entries that failed validation (unreadable entry, non-regular file, parse failure, wrong
    /// kind, or blob path not matching the blob hash); also bumped when the root or the
    /// manifests directory is a symlink or not a directory.
    pub invalid_manifest_count: u64,
    /// Smallest `captured_at_ms` among counted manifests.
    pub oldest_capture_at_ms: Option<i64>,
    /// Largest `captured_at_ms` among counted manifests.
    pub newest_capture_at_ms: Option<i64>,
    /// Smallest `source_mtime_ms` among counted manifests that have one.
    pub oldest_source_mtime_ms: Option<i64>,
    /// Largest `source_mtime_ms` among counted manifests that have one.
    pub newest_source_mtime_ms: Option<i64>,
}
72
73pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
74    let root = raw_mirror_root(data_dir);
75    let mut summary = RawMirrorStorageSummary {
76        root_path: root.display().to_string(),
77        ..RawMirrorStorageSummary::default()
78    };
79    let root_metadata = match fs::symlink_metadata(&root) {
80        Ok(metadata) => metadata,
81        Err(_) => return summary,
82    };
83    summary.initialized = true;
84    if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
85        summary.invalid_manifest_count = 1;
86        return summary;
87    }
88
89    summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
90
91    let manifests_dir = root.join("manifests");
92    let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
93        return summary;
94    };
95    if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
96        summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
97        return summary;
98    }
99    let entries = match fs::read_dir(&manifests_dir) {
100        Ok(entries) => entries,
101        Err(_) => return summary,
102    };
103    let mut seen_blobs = HashSet::new();
104    for entry in entries {
105        let Ok(entry) = entry else {
106            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
107            continue;
108        };
109        let path = entry.path();
110        let manifest_metadata = match fs::symlink_metadata(&path) {
111            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
112            _ => {
113                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
114                continue;
115            }
116        };
117        summary.manifest_bytes = summary
118            .manifest_bytes
119            .saturating_add(manifest_metadata.len());
120        let manifest = match read_raw_mirror_manifest(&path) {
121            Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
122            _ => {
123                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
124                continue;
125            }
126        };
127        summary.manifest_count = summary.manifest_count.saturating_add(1);
128        merge_min_max(
129            &mut summary.oldest_capture_at_ms,
130            &mut summary.newest_capture_at_ms,
131            Some(manifest.captured_at_ms),
132        );
133        merge_min_max(
134            &mut summary.oldest_source_mtime_ms,
135            &mut summary.newest_source_mtime_ms,
136            manifest.source_mtime_ms,
137        );
138
139        let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
140            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
141            continue;
142        };
143        if manifest.blob_relative_path != blob_relative_path {
144            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
145            continue;
146        }
147
148        if !seen_blobs.insert(blob_relative_path.clone()) {
149            continue;
150        }
151        let blob_path = root.join(blob_relative_path);
152        match fs::symlink_metadata(&blob_path) {
153            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
154                let size = metadata.len();
155                summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
156                summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
157                summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
158            }
159            _ => {
160                summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
161            }
162        }
163    }
164
165    summary
166}
167
/// Retention policy and mode for `prune`.
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// Age policy: select unpinned manifests whose `captured_at_ms` is at or before
    /// `now - older_than_ms`. Negative values are treated as 0. `None` disables the policy.
    pub older_than_ms: Option<i64>,
    /// Size policy: when the total size of unique blobs exceeds this budget, retire unpinned
    /// blobs oldest-first until the projected total fits. `None` disables the policy.
    pub max_size_bytes: Option<u64>,
    /// Tag names whose conversations pin their manifests. Entries are trimmed and empty ones
    /// are ignored.
    pub keep_tags: Vec<String>,
    /// When greater than 0, manifests captured more recently than this many milliseconds ago
    /// are pinned.
    pub safety_hold_down_ms: i64,
    /// `true` deletes the planned files; `false` only produces the plan ("dry-run").
    pub apply: bool,
}
176
/// Result of a `prune` call: the inventory it saw, the plan, and what was actually removed.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneReport {
    /// False when the raw-mirror root does not exist (nothing else is filled in then).
    pub initialized: bool,
    /// Display form of the raw-mirror root path.
    pub root_path: String,
    /// Either "apply" or "dry-run".
    pub mode: String,
    /// Number of manifests collected for planning.
    pub manifest_count: u64,
    /// Number of distinct blobs referenced by those manifests.
    pub unique_blob_count: u64,
    /// Total size of the distinct blobs (on-disk size, or the manifest's recorded size when the
    /// blob file cannot be measured).
    pub current_blob_bytes: u64,
    /// Echo of the hold-down option used for this run.
    pub safety_hold_down_ms: i64,
    /// Echo of the keep-tag option used for this run.
    pub keep_tags: Vec<String>,
    /// Manifests protected from pruning by the hold-down window or a keep tag.
    pub pinned_manifest_count: u64,
    /// Blobs referenced by at least one pinned manifest.
    pub pinned_blob_count: u64,
    /// Manifest entries in the plan.
    pub planned_manifest_count: u64,
    /// Blob entries in the plan.
    pub planned_blob_count: u64,
    /// Sum of `size_bytes` over all planned entries (manifests and blobs).
    pub planned_reclaim_bytes: u64,
    /// Manifest files actually removed (apply mode only).
    pub applied_manifest_count: u64,
    /// Blob files actually removed (apply mode only).
    pub applied_blob_count: u64,
    /// Sum of `size_bytes` over entries actually removed.
    pub applied_reclaim_bytes: u64,
    /// Path of the audit log the plan was appended to; `None` when the plan was empty.
    pub audit_log_path: Option<String>,
    /// Planned entries: manifests first (sorted by manifest id), then blobs (sorted by path).
    pub entries: Vec<RawMirrorPruneEntry>,
}
198
/// One file in a prune plan.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneEntry {
    /// "manifest" or "blob".
    pub kind: String,
    /// File path; `prune` joins it onto the raw-mirror root before removing the file.
    pub path: String,
    /// Blob hash: taken from the manifest for manifest entries, derived from the file name
    /// for blob entries (`None` if the name does not end in ".raw").
    pub blob_blake3: Option<String>,
    /// Size attributed to this file in the plan.
    pub size_bytes: u64,
    /// Human-readable explanation of why the file was selected.
    pub reason: String,
    /// True only when the file was actually removed in apply mode.
    pub applied: bool,
}
208
/// Validated subset of a manifest that prune planning needs; built by `collect_prune_manifests`.
#[derive(Debug, Clone)]
struct RawMirrorPruneManifest {
    /// Manifest id as stored in the manifest file.
    manifest_id: String,
    /// Manifest file path with the raw-mirror root prefix stripped (full path if stripping fails).
    relative_path: String,
    /// On-disk size of the manifest file.
    size_bytes: u64,
    /// Blob hash recorded in the manifest.
    blob_blake3: String,
    /// Blob path recorded in the manifest; verified to match the path derived from the hash.
    blob_relative_path: String,
    /// Blob size recorded in the manifest; used only when the blob file cannot be measured.
    blob_size_bytes: u64,
    /// Capture time used by the age policy, the hold-down window, and ordering.
    captured_at_ms: i64,
    /// Provider recorded in the manifest; used as a sort tie-breaker.
    provider: String,
    /// Original path recorded in the manifest; used as a sort tie-breaker.
    original_path: String,
    /// Conversation links consulted for keep-tag pinning.
    db_links: Vec<RawMirrorDbLink>,
}
222
223pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
224    let root = raw_mirror_root(data_dir);
225    let mut report = RawMirrorPruneReport {
226        initialized: false,
227        root_path: root.display().to_string(),
228        mode: if options.apply {
229            "apply".to_string()
230        } else {
231            "dry-run".to_string()
232        },
233        manifest_count: 0,
234        unique_blob_count: 0,
235        current_blob_bytes: 0,
236        safety_hold_down_ms: options.safety_hold_down_ms,
237        keep_tags: options.keep_tags.clone(),
238        pinned_manifest_count: 0,
239        pinned_blob_count: 0,
240        planned_manifest_count: 0,
241        planned_blob_count: 0,
242        planned_reclaim_bytes: 0,
243        applied_manifest_count: 0,
244        applied_blob_count: 0,
245        applied_reclaim_bytes: 0,
246        audit_log_path: None,
247        entries: Vec::new(),
248    };
249
250    let metadata = match fs::symlink_metadata(&root) {
251        Ok(metadata) => metadata,
252        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
253        Err(err) => {
254            return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
255        }
256    };
257    if metadata.file_type().is_symlink() || !metadata.is_dir() {
258        anyhow::bail!(
259            "refusing to prune invalid raw mirror root {}",
260            root.display()
261        );
262    }
263    report.initialized = true;
264
265    let manifests = collect_prune_manifests(&root)?;
266    report.manifest_count = manifests.len() as u64;
267
268    let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
269    let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
270    let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
271    for manifest in &manifests {
272        manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
273        blob_to_manifests
274            .entry(manifest.blob_relative_path.clone())
275            .or_default()
276            .push(manifest.manifest_id.clone());
277        blob_size_by_relative
278            .entry(manifest.blob_relative_path.clone())
279            .or_insert_with(|| {
280                blob_file_size(&root.join(&manifest.blob_relative_path))
281                    .unwrap_or(manifest.blob_size_bytes)
282            });
283    }
284    report.unique_blob_count = blob_size_by_relative.len() as u64;
285    report.current_blob_bytes = blob_size_by_relative
286        .values()
287        .copied()
288        .fold(0u64, u64::saturating_add);
289
290    let now = now_ms();
291    let pinned_manifests = pinned_prune_manifest_ids(
292        data_dir,
293        &manifests,
294        &options.keep_tags,
295        options.safety_hold_down_ms,
296        now,
297    )?;
298    report.pinned_manifest_count = pinned_manifests.len() as u64;
299    let pinned_blobs: HashSet<String> = blob_to_manifests
300        .iter()
301        .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
302        .map(|(blob_relative_path, _)| blob_relative_path.clone())
303        .collect();
304    report.pinned_blob_count = pinned_blobs.len() as u64;
305
306    let mut selected_manifests: HashSet<String> = HashSet::new();
307    let mut manifest_reasons: HashMap<String, String> = HashMap::new();
308
309    if let Some(older_than_ms) = options.older_than_ms {
310        let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
311        for manifest in &manifests {
312            if manifest.captured_at_ms <= cutoff_ms
313                && !pinned_manifests.contains(&manifest.manifest_id)
314            {
315                selected_manifests.insert(manifest.manifest_id.clone());
316                manifest_reasons
317                    .entry(manifest.manifest_id.clone())
318                    .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
319            }
320        }
321    }
322
323    if let Some(max_size_bytes) = options.max_size_bytes
324        && report.current_blob_bytes > max_size_bytes
325    {
326        let mut blob_groups: Vec<_> = blob_to_manifests
327            .iter()
328            .map(|(blob_relative_path, manifest_ids)| {
329                let oldest_capture = manifest_ids
330                    .iter()
331                    .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
332                    .min()
333                    .unwrap_or(i64::MAX);
334                let size = blob_size_by_relative
335                    .get(blob_relative_path)
336                    .copied()
337                    .unwrap_or(0);
338                (
339                    blob_relative_path.clone(),
340                    manifest_ids.clone(),
341                    oldest_capture,
342                    size,
343                )
344            })
345            .collect::<Vec<_>>();
346        blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
347
348        let mut projected_bytes = report.current_blob_bytes;
349        for (blob_relative_path, manifest_ids, _, size) in blob_groups {
350            if projected_bytes <= max_size_bytes {
351                break;
352            }
353            if pinned_blobs.contains(&blob_relative_path) {
354                continue;
355            }
356            for manifest_id in manifest_ids {
357                if !pinned_manifests.contains(&manifest_id) {
358                    selected_manifests.insert(manifest_id.clone());
359                    manifest_reasons.entry(manifest_id).or_insert_with(|| {
360                        format!("max-size over budget; retiring blob {blob_relative_path}")
361                    });
362                }
363            }
364            projected_bytes = projected_bytes.saturating_sub(size);
365        }
366    }
367
368    let selected_blobs: HashSet<String> = blob_to_manifests
369        .iter()
370        .filter(|(_, manifest_ids)| {
371            manifest_ids
372                .iter()
373                .all(|id| selected_manifests.contains(id))
374        })
375        .map(|(blob_relative_path, _)| blob_relative_path.clone())
376        .collect();
377
378    let mut entries = Vec::new();
379    let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
380    selected_manifest_ids.sort();
381    for manifest_id in selected_manifest_ids {
382        let Some(manifest) = manifest_by_id.get(&manifest_id) else {
383            continue;
384        };
385        let reason = manifest_reasons
386            .remove(&manifest_id)
387            .unwrap_or_else(|| "selected by retention policy".to_string());
388        entries.push(RawMirrorPruneEntry {
389            kind: "manifest".to_string(),
390            path: manifest.relative_path.clone(),
391            blob_blake3: Some(manifest.blob_blake3.clone()),
392            size_bytes: manifest.size_bytes,
393            reason,
394            applied: false,
395        });
396    }
397
398    let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
399    selected_blob_paths.sort();
400    for blob_relative_path in selected_blob_paths {
401        let size = blob_size_by_relative
402            .get(&blob_relative_path)
403            .copied()
404            .unwrap_or(0);
405        let blob_blake3 = blob_relative_path
406            .rsplit('/')
407            .next()
408            .and_then(|name| name.strip_suffix(".raw"))
409            .map(ToOwned::to_owned);
410        entries.push(RawMirrorPruneEntry {
411            kind: "blob".to_string(),
412            path: blob_relative_path,
413            blob_blake3,
414            size_bytes: size,
415            reason: "no retained manifest references this blob after prune plan".to_string(),
416            applied: false,
417        });
418    }
419
420    report.planned_manifest_count = entries
421        .iter()
422        .filter(|entry| entry.kind == "manifest")
423        .count() as u64;
424    report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
425    report.planned_reclaim_bytes = entries
426        .iter()
427        .map(|entry| entry.size_bytes)
428        .fold(0, u64::saturating_add);
429
430    if options.apply {
431        for entry in &mut entries {
432            let path = root.join(&entry.path);
433            let removed = remove_prune_target_file(&path)
434                .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
435            entry.applied = removed;
436            if removed {
437                if entry.kind == "manifest" {
438                    report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
439                } else if entry.kind == "blob" {
440                    report.applied_blob_count = report.applied_blob_count.saturating_add(1);
441                }
442                report.applied_reclaim_bytes = report
443                    .applied_reclaim_bytes
444                    .saturating_add(entry.size_bytes);
445            }
446        }
447    }
448
449    report.entries = entries;
450    if !report.entries.is_empty() {
451        let audit_path = append_prune_audit_log(&root, &report)?;
452        report.audit_log_path = Some(audit_path.display().to_string());
453    }
454    Ok(report)
455}
456
457fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
458    let manifests_dir = root.join("manifests");
459    let metadata = match fs::symlink_metadata(&manifests_dir) {
460        Ok(metadata) => metadata,
461        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
462        Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
463    };
464    if metadata.file_type().is_symlink() || !metadata.is_dir() {
465        anyhow::bail!(
466            "refusing to prune invalid raw mirror manifests directory {}",
467            manifests_dir.display()
468        );
469    }
470
471    let mut manifests = Vec::new();
472    for entry in
473        fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
474    {
475        let entry = entry?;
476        let path = entry.path();
477        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
478            continue;
479        }
480        let manifest_metadata = fs::symlink_metadata(&path)
481            .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
482        if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
483            anyhow::bail!(
484                "refusing to prune with non-regular raw mirror manifest {}",
485                path.display()
486            );
487        }
488        let manifest = read_raw_mirror_manifest(&path)?;
489        if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
490            anyhow::bail!(
491                "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
492                manifest.manifest_kind,
493                path.display()
494            );
495        }
496        let Some(expected_blob_relative_path) =
497            raw_mirror_blob_relative_path(&manifest.blob_blake3)
498        else {
499            anyhow::bail!(
500                "refusing to prune raw mirror manifest {} with invalid blob hash",
501                path.display()
502            );
503        };
504        if manifest.blob_relative_path != expected_blob_relative_path {
505            anyhow::bail!(
506                "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
507                path.display(),
508                manifest.blob_relative_path
509            );
510        }
511        let relative_path = path
512            .strip_prefix(root)
513            .unwrap_or(&path)
514            .display()
515            .to_string();
516        manifests.push(RawMirrorPruneManifest {
517            manifest_id: manifest.manifest_id,
518            relative_path,
519            size_bytes: manifest_metadata.len(),
520            blob_blake3: manifest.blob_blake3,
521            blob_relative_path: manifest.blob_relative_path,
522            blob_size_bytes: manifest.blob_size_bytes,
523            captured_at_ms: manifest.captured_at_ms,
524            provider: manifest.provider,
525            original_path: manifest.original_path,
526            db_links: manifest.db_links,
527        });
528    }
529    manifests.sort_by(|left, right| {
530        left.captured_at_ms
531            .cmp(&right.captured_at_ms)
532            .then_with(|| left.provider.cmp(&right.provider))
533            .then_with(|| left.original_path.cmp(&right.original_path))
534            .then_with(|| left.manifest_id.cmp(&right.manifest_id))
535    });
536    Ok(manifests)
537}
538
539fn pinned_prune_manifest_ids(
540    data_dir: &Path,
541    manifests: &[RawMirrorPruneManifest],
542    keep_tags: &[String],
543    safety_hold_down_ms: i64,
544    now_ms: i64,
545) -> Result<HashSet<String>> {
546    let mut pinned = HashSet::new();
547    if safety_hold_down_ms > 0 {
548        let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
549        for manifest in manifests {
550            if manifest.captured_at_ms > cutoff_ms {
551                pinned.insert(manifest.manifest_id.clone());
552            }
553        }
554    }
555
556    let normalized_keep_tags = keep_tags
557        .iter()
558        .map(|tag| tag.trim())
559        .filter(|tag| !tag.is_empty())
560        .map(ToOwned::to_owned)
561        .collect::<Vec<_>>();
562    if normalized_keep_tags.is_empty() {
563        return Ok(pinned);
564    }
565
566    let keep_tag_conversation_ids =
567        load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
568    for manifest in manifests {
569        if manifest.db_links.iter().any(|link| {
570            link.conversation_id
571                .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
572        }) {
573            pinned.insert(manifest.manifest_id.clone());
574        }
575    }
576    Ok(pinned)
577}
578
579fn load_keep_tag_conversation_ids(
580    data_dir: &Path,
581    manifests: &[RawMirrorPruneManifest],
582    keep_tags: &[String],
583) -> Result<HashSet<i64>> {
584    use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
585
586    let mut conversation_ids = manifests
587        .iter()
588        .flat_map(|manifest| manifest.db_links.iter())
589        .filter_map(|link| link.conversation_id)
590        .collect::<Vec<_>>();
591    conversation_ids.sort_unstable();
592    conversation_ids.dedup();
593    if conversation_ids.is_empty() {
594        return Ok(HashSet::new());
595    }
596
597    let db_path = data_dir.join("agent_search.db");
598    let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
599        &db_path,
600        Duration::from_secs(30),
601    )
602    .with_context(|| {
603        format!(
604            "open {} to honor raw-mirror prune --keep-tag",
605            db_path.display()
606        )
607    })?;
608    let _ = conn.execute("PRAGMA query_only = 1;");
609
610    let mut pinned = HashSet::new();
611    for id_chunk in conversation_ids.chunks(400) {
612        let tag_placeholders = (0..keep_tags.len())
613            .map(|idx| format!("?{}", idx + 1))
614            .collect::<Vec<_>>()
615            .join(", ");
616        let id_offset = keep_tags.len();
617        let id_placeholders = (0..id_chunk.len())
618            .map(|idx| format!("?{}", id_offset + idx + 1))
619            .collect::<Vec<_>>()
620            .join(", ");
621        let sql = format!(
622            "SELECT DISTINCT ct.conversation_id \
623             FROM conversation_tags ct \
624             JOIN tags t ON t.id = ct.tag_id \
625             WHERE t.name IN ({tag_placeholders}) \
626               AND ct.conversation_id IN ({id_placeholders})"
627        );
628        let mut params = keep_tags
629            .iter()
630            .map(|tag| ParamValue::from(tag.as_str()))
631            .collect::<Vec<_>>();
632        params.extend(id_chunk.iter().copied().map(ParamValue::from));
633        let rows: Vec<i64> = conn
634            .query_map_collect(&sql, &params, |row: &frankensqlite::Row| row.get_typed(0))
635            .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
636        pinned.extend(rows);
637    }
638
639    Ok(pinned)
640}
641
/// Returns the size of `path` when it is a regular file, without following symlinks.
///
/// Yields `None` when the path cannot be stat'ed or is anything other than a regular file.
fn blob_file_size(path: &Path) -> Option<u64> {
    let metadata = fs::symlink_metadata(path).ok()?;
    let is_regular_file = metadata.is_file() && !metadata.file_type().is_symlink();
    is_regular_file.then(|| metadata.len())
}
648
649fn remove_prune_target_file(path: &Path) -> Result<bool> {
650    let metadata = match fs::symlink_metadata(path) {
651        Ok(metadata) => metadata,
652        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
653        Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
654    };
655    if metadata.file_type().is_symlink() || !metadata.is_file() {
656        anyhow::bail!(
657            "refusing to prune non-regular raw mirror file {}",
658            path.display()
659        );
660    }
661    fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
662    sync_parent(path)?;
663    Ok(true)
664}
665
666fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
667    ensure_private_dir(root)?;
668    let audit_path = root.join("pruned.jsonl");
669    ensure_prune_audit_log_appendable(&audit_path)?;
670    let mut file = OpenOptions::new()
671        .create(true)
672        .append(true)
673        .open(&audit_path)
674        .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
675    set_private_file_permissions(&audit_path)?;
676    let now = now_ms();
677    for entry in &report.entries {
678        let record = json!({
679            "schema_version": 1,
680            "recorded_at_ms": now,
681            "mode": report.mode,
682            "kind": entry.kind,
683            "path": entry.path,
684            "blob_blake3": entry.blob_blake3,
685            "size_bytes": entry.size_bytes,
686            "reason": entry.reason,
687            "applied": entry.applied,
688        });
689        writeln!(file, "{record}")
690            .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
691    }
692    file.sync_all()
693        .with_context(|| format!("sync raw mirror prune audit {}", audit_path.display()))?;
694    sync_parent(&audit_path)?;
695    Ok(audit_path)
696}
697
698fn ensure_prune_audit_log_appendable(path: &Path) -> Result<()> {
699    match fs::symlink_metadata(path) {
700        Ok(metadata) if metadata.file_type().is_symlink() => {
701            anyhow::bail!(
702                "refusing to append raw mirror prune audit through symlink {}",
703                path.display()
704            );
705        }
706        Ok(metadata) if !metadata.is_file() => {
707            anyhow::bail!(
708                "refusing to append raw mirror prune audit to non-file {}",
709                path.display()
710            );
711        }
712        Ok(_) => Ok(()),
713        Err(err) if matches!(err.kind(), std::io::ErrorKind::NotFound) => Ok(()),
714        Err(err) => Err(err).with_context(|| {
715            format!(
716                "inspect raw mirror prune audit before append {}",
717                path.display()
718            )
719        }),
720    }
721}
722
/// Folds `value` into a running minimum and maximum.
///
/// `None` leaves both bounds untouched; the first `Some` initializes both of them.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(sample) = value {
        *min = match *min {
            Some(lowest) if lowest <= sample => Some(lowest),
            _ => Some(sample),
        };
        *max = match *max {
            Some(highest) if highest >= sample => Some(highest),
            _ => Some(sample),
        };
    }
}
730
/// Sums the sizes of all regular files at or below `root`.
///
/// The walk is iterative and best-effort: symlinks are skipped (never followed), and paths or
/// directories that cannot be read are silently ignored. The total saturates at `u64::MAX`.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending = vec![root.to_path_buf()];
    let mut byte_total: u64 = 0;

    while let Some(current) = pending.pop() {
        let metadata = match fs::symlink_metadata(&current) {
            Ok(metadata) => metadata,
            Err(_) => continue,
        };
        if metadata.file_type().is_symlink() {
            continue;
        }
        if metadata.is_file() {
            byte_total = byte_total.saturating_add(metadata.len());
            continue;
        }
        if metadata.is_dir() {
            if let Ok(children) = fs::read_dir(&current) {
                pending.extend(children.flatten().map(|child| child.path()));
            }
        }
    }

    byte_total
}
754
/// Key of the in-process blob capture cache (`BLOB_CAPTURE_CACHE`).
///
/// NOTE(review): the key is built by a helper outside this section from the capture input and
/// the source file metadata; the field notes below are inferred from names — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    /// Data directory the capture targets.
    data_dir: PathBuf,
    /// Path of the captured source file.
    source_path: PathBuf,
    /// Presumably a platform file identity (e.g. device/inode), when available.
    source_identity: Option<String>,
    /// Size of the source file in bytes.
    source_size_bytes: u64,
    /// Presumably the source modification time in nanoseconds, when available.
    source_mtime_ns: Option<u128>,
    /// Presumably the source change time in nanoseconds, when available.
    source_change_time_ns: Option<u128>,
}
764
/// Value stored in the blob capture cache: what a previous capture of the same source produced.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// BLAKE3 digest of the captured content.
    blob_blake3: String,
    /// Number of bytes copied during that capture.
    bytes_copied: u64,
}
770
/// Compression section of a manifest.
///
/// NOTE(review): the values written here are produced outside this section; field notes are
/// inferred from names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorCompressionEnvelope {
    /// Compression state label.
    state: String,
    /// Compression algorithm name, when one applies.
    algorithm: Option<String>,
    /// Presumably the size before compression, when one applies.
    uncompressed_size_bytes: Option<u64>,
}
777
/// Encryption section of a manifest.
///
/// NOTE(review): the values written here are produced outside this section; field notes are
/// inferred from names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorEncryptionEnvelope {
    /// Encryption state label.
    state: String,
    /// Encryption algorithm name, when one applies.
    algorithm: Option<String>,
    /// Identifier of the key used, when one applies.
    key_id: Option<String>,
    /// Version of the encryption envelope format, when one applies.
    envelope_version: Option<u32>,
}
785
/// Verification section of a manifest.
///
/// NOTE(review): the values written here are produced outside this section; field notes are
/// inferred from names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorVerificationRecord {
    /// Verification status label.
    status: String,
    /// Name of the component that performed the verification.
    verifier: String,
    /// Presumably the BLAKE3 digest observed during verification, when recorded.
    content_blake3: Option<String>,
    /// Presumably the verification time in milliseconds, when recorded.
    verified_at_ms: Option<i64>,
}
793
/// Serialized manifest describing one captured source file and the blob holding its content.
///
/// NOTE(review): the writer lives outside this section. Notes marked "presumably" are inferred
/// from names; the others describe how `storage_summary` and prune planning read the field.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorManifestFile {
    /// Presumably `RAW_MIRROR_SCHEMA_VERSION` at write time.
    schema_version: u32,
    /// Must equal `RAW_MIRROR_MANIFEST_KIND`; otherwise the manifest is treated as invalid.
    manifest_kind: String,
    /// Identifier of this manifest; prune planning keys manifests by it.
    manifest_id: String,
    /// Presumably `RAW_MIRROR_HASH_ALGORITHM` at write time.
    blob_hash_algorithm: String,
    /// Blob path relative to the mirror root; must equal the path derived from `blob_blake3`.
    blob_relative_path: String,
    /// Digest of the blob content; the expected blob path is derived from it.
    blob_blake3: String,
    /// Recorded blob size; prune uses it when the blob file itself cannot be measured.
    blob_size_bytes: u64,
    /// Provider name; used as a sort tie-breaker in prune planning.
    provider: String,
    /// Presumably the source identifier given at capture time.
    source_id: String,
    /// Presumably the origin kind given at capture time.
    origin_kind: String,
    /// Presumably the origin host given at capture time.
    origin_host: Option<String>,
    /// Original source path; used as a sort tie-breaker in prune planning.
    original_path: String,
    /// Presumably a privacy-preserving rendering of `original_path`.
    redacted_original_path: String,
    /// Presumably the BLAKE3 digest of `original_path`.
    original_path_blake3: String,
    /// Capture time in milliseconds; drives the age policy, hold-down window, and summaries.
    captured_at_ms: i64,
    /// Source modification time in milliseconds, when known; aggregated by `storage_summary`.
    source_mtime_ms: Option<i64>,
    /// Presumably the size of the source file at capture time.
    source_size_bytes: u64,
    /// Compression details for the blob.
    compression: RawMirrorCompressionEnvelope,
    /// Encryption details for the blob.
    encryption: RawMirrorEncryptionEnvelope,
    /// Conversation links; prune consults them for keep-tag pinning.
    db_links: Vec<RawMirrorDbLink>,
    /// Verification details for the blob.
    verification: RawMirrorVerificationRecord,
    /// Presumably a digest of the manifest itself, when recorded.
    manifest_blake3: Option<String>,
}
819
820pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
821    let source_metadata = fs::symlink_metadata(input.source_path)
822        .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
823    if source_metadata.file_type().is_symlink() {
824        return Err(anyhow!(
825            "refusing to raw-mirror symlink source {}",
826            input.source_path.display()
827        ));
828    }
829    if !source_metadata.is_file() {
830        return Err(anyhow!(
831            "refusing to raw-mirror non-file source {}",
832            input.source_path.display()
833        ));
834    }
835
836    let root = ensure_raw_mirror_root(input.data_dir)?;
837    ensure_private_dir_descendant(&root, &root.join("tmp"))?;
838
839    let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
840    let (blob_blake3, bytes_copied, blob_already_present) =
841        match cached_raw_mirror_blob_record(&cache_key, &root) {
842            Some(record) => (record.blob_blake3, record.bytes_copied, true),
843            None => {
844                let temp_dir = unique_capture_temp_dir(&root);
845                ensure_private_dir_descendant(&root, &temp_dir)?;
846                let CopyToTempResult {
847                    temp_path,
848                    blob_blake3,
849                    bytes_copied,
850                } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
851                let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
852                    .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
853                let blob_path = root.join(&blob_relative_path);
854                let already_present =
855                    publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
856                remove_empty_temp_dir_best_effort(&temp_dir);
857                cache_raw_mirror_blob_record(
858                    cache_key.clone(),
859                    RawMirrorBlobRecord {
860                        blob_blake3: blob_blake3.clone(),
861                        bytes_copied,
862                    },
863                );
864                (blob_blake3, bytes_copied, already_present)
865            }
866        };
867    let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
868        .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
869
870    let original_path = input.source_path.display().to_string();
871    let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
872    let manifest_id = raw_mirror_manifest_id(
873        input.provider,
874        input.source_id,
875        input.origin_kind,
876        input.origin_host,
877        &original_path_blake3,
878        &blob_blake3,
879    );
880    let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
881    let manifest_path = root.join(&manifest_relative_path);
882    let captured_at_ms = now_ms();
883    let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
884    let mut manifest = RawMirrorManifestFile {
885        schema_version: RAW_MIRROR_SCHEMA_VERSION,
886        manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
887        manifest_id: manifest_id.clone(),
888        blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
889        blob_relative_path: blob_relative_path.clone(),
890        blob_blake3: blob_blake3.clone(),
891        blob_size_bytes: bytes_copied,
892        provider: input.provider.to_string(),
893        source_id: input.source_id.to_string(),
894        origin_kind: input.origin_kind.to_string(),
895        origin_host: input.origin_host.map(ToOwned::to_owned),
896        original_path,
897        redacted_original_path: redacted_original_path(input.provider, input.source_path),
898        original_path_blake3,
899        captured_at_ms,
900        source_mtime_ms,
901        source_size_bytes: source_metadata.len(),
902        compression: RawMirrorCompressionEnvelope {
903            state: "none".to_string(),
904            algorithm: None,
905            uncompressed_size_bytes: Some(bytes_copied),
906        },
907        encryption: RawMirrorEncryptionEnvelope {
908            state: "none".to_string(),
909            algorithm: None,
910            key_id: None,
911            envelope_version: None,
912        },
913        db_links: unique_db_links(input.db_links),
914        verification: RawMirrorVerificationRecord {
915            status: "captured".to_string(),
916            verifier: "cass_indexer".to_string(),
917            content_blake3: Some(blob_blake3.clone()),
918            verified_at_ms: Some(captured_at_ms),
919        },
920        manifest_blake3: None,
921    };
922    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
923    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
924    let manifest_already_present =
925        publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
926    let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
927        if manifest_already_present {
928            merge_raw_mirror_manifest_db_links(
929                &root,
930                &manifest_path,
931                input.db_links,
932                Some(&blob_blake3),
933            )?;
934            let published = read_raw_mirror_manifest(&manifest_path)?;
935            (
936                published.blob_size_bytes,
937                published.captured_at_ms,
938                published.source_mtime_ms,
939            )
940        } else {
941            (bytes_copied, captured_at_ms, source_mtime_ms)
942        };
943
944    Ok(RawMirrorCaptureRecord {
945        manifest_id,
946        manifest_relative_path,
947        blob_relative_path,
948        blob_blake3,
949        blob_size_bytes: record_blob_size_bytes,
950        captured_at_ms: record_captured_at_ms,
951        source_mtime_ms: record_source_mtime_ms,
952        already_present: blob_already_present && manifest_already_present,
953    })
954}
955
956pub fn merge_manifest_db_links(
957    data_dir: &Path,
958    manifest_relative_path: &str,
959    links: &[RawMirrorDbLink],
960) -> Result<()> {
961    if links.is_empty() {
962        return Ok(());
963    }
964    let root = raw_mirror_root(data_dir);
965    let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
966    merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
967}
968
/// Outcome of streaming a source file into a private temp file.
struct CopyToTempResult {
    /// Location of the temp file that now holds the copied bytes.
    temp_path: PathBuf,
    /// Hex blake3 digest of the bytes that were copied.
    blob_blake3: String,
    /// Total number of bytes written to the temp file.
    bytes_copied: u64,
}
974
975fn copy_source_to_private_temp(
976    source_path: &Path,
977    temp_dir: &Path,
978    source_metadata: &fs::Metadata,
979) -> Result<CopyToTempResult> {
980    let temp_path = unique_temp_path(temp_dir, "blob");
981    let mut source = open_stable_source_file(source_path, source_metadata)?;
982    let mut temp = private_create_new_file(&temp_path)?;
983    let mut hasher = blake3::Hasher::new();
984    let mut buffer = [0u8; 64 * 1024];
985    let mut bytes_copied = 0u64;
986    loop {
987        let read = source
988            .read(&mut buffer)
989            .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
990        if read == 0 {
991            break;
992        }
993        temp.write_all(&buffer[..read])
994            .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
995        hasher.update(&buffer[..read]);
996        bytes_copied = bytes_copied.saturating_add(read as u64);
997    }
998    temp.sync_all()
999        .with_context(|| format!("sync raw mirror temp {}", temp_path.display()))?;
1000
1001    let final_source_metadata = source
1002        .metadata()
1003        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1004    if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
1005        remove_temp_best_effort(&temp_path);
1006        return Err(anyhow!(
1007            "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
1008            source_path.display()
1009        ));
1010    }
1011
1012    Ok(CopyToTempResult {
1013        temp_path,
1014        blob_blake3: hasher.finalize().to_hex().to_string(),
1015        bytes_copied,
1016    })
1017}
1018
1019fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
1020    let source = File::open(source_path)
1021        .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
1022    let opened_metadata = source
1023        .metadata()
1024        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1025    if !same_source_identity(expected_metadata, &opened_metadata) {
1026        return Err(anyhow!(
1027            "raw mirror source {} changed identity before capture",
1028            source_path.display()
1029        ));
1030    }
1031    let current_path_metadata = fs::symlink_metadata(source_path)
1032        .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1033    if current_path_metadata.file_type().is_symlink() {
1034        return Err(anyhow!(
1035            "refusing to raw-mirror symlink source {}",
1036            source_path.display()
1037        ));
1038    }
1039    if !same_source_identity(expected_metadata, &current_path_metadata) {
1040        return Err(anyhow!(
1041            "raw mirror source {} changed identity before capture",
1042            source_path.display()
1043        ));
1044    }
1045    Ok(source)
1046}
1047
/// Returns `true` when `actual` is a regular file with the same device and
/// inode numbers as `expected`.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;

    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}
1053
/// Non-Unix fallback: no device/inode comparison is made, so `expected` is
/// ignored and any regular file is accepted as the same source.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    actual.is_file()
}
1058
/// Builds a `"<dev>:<ino>"` token identifying the file behind `metadata`.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;

    let device = metadata.dev();
    let inode = metadata.ino();
    Some(format!("{}:{}", device, inode))
}
1064
/// Non-Unix fallback: no identity token is available, so this always
/// returns `None`.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    None
}
1069
/// Returns the inode change time (`ctime`) of `metadata` in nanoseconds.
///
/// Yields `None` when either the seconds or the nanoseconds component is
/// negative and therefore does not fit in a `u128`.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    let whole_seconds = u128::try_from(metadata.ctime()).ok();
    let sub_second_nanos = u128::try_from(metadata.ctime_nsec()).ok();
    whole_seconds
        .zip(sub_second_nanos)
        .map(|(secs, nanos)| secs.saturating_mul(1_000_000_000).saturating_add(nanos))
}
1082
/// Non-Unix fallback: no change time is read from `metadata`, so this always
/// returns `None`.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    None
}
1087
/// Reports whether the source looks different after the copy than before it.
///
/// A size difference always counts as a change. Otherwise the modification
/// times are compared, but only when both are available; if either is missing
/// the file is treated as unchanged.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    initial.len() != final_metadata.len()
        || matches!(
            (initial.modified(), final_metadata.modified()),
            (Ok(before), Ok(after)) if before != after
        )
}
1100
1101fn publish_content_addressed_temp(
1102    root: &Path,
1103    temp_path: &Path,
1104    final_path: &Path,
1105    expected_blake3: &str,
1106) -> Result<bool> {
1107    ensure_private_dir_descendant(
1108        root,
1109        final_path
1110            .parent()
1111            .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1112    )?;
1113    if final_path.exists() {
1114        verify_existing_file(final_path, expected_blake3)?;
1115        remove_temp_best_effort(temp_path);
1116        return Ok(true);
1117    }
1118
1119    match fs::hard_link(temp_path, final_path) {
1120        Ok(()) => {
1121            sync_file(final_path)?;
1122            sync_parent(final_path)?;
1123            remove_temp_best_effort(temp_path);
1124            Ok(false)
1125        }
1126        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1127            verify_existing_file(final_path, expected_blake3)?;
1128            remove_temp_best_effort(temp_path);
1129            Ok(true)
1130        }
1131        Err(err) => Err(anyhow!(
1132            "publish raw mirror blob {} from {}: {err}",
1133            final_path.display(),
1134            temp_path.display()
1135        )),
1136    }
1137}
1138
1139fn publish_manifest_bytes_create_new(
1140    root: &Path,
1141    manifest_path: &Path,
1142    manifest_bytes: &[u8],
1143    blob_blake3: &str,
1144) -> Result<bool> {
1145    ensure_private_dir_descendant(
1146        root,
1147        manifest_path
1148            .parent()
1149            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1150    )?;
1151    if manifest_path.exists() {
1152        verify_existing_manifest(manifest_path, blob_blake3)?;
1153        return Ok(true);
1154    }
1155
1156    let temp_dir = unique_capture_temp_dir(root);
1157    ensure_private_dir_descendant(root, &temp_dir)?;
1158    let temp_path = unique_temp_path(&temp_dir, "manifest");
1159    let mut temp = private_create_new_file(&temp_path)?;
1160    temp.write_all(manifest_bytes)
1161        .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1162    temp.sync_all()
1163        .with_context(|| format!("sync raw mirror manifest temp {}", temp_path.display()))?;
1164
1165    match fs::hard_link(&temp_path, manifest_path) {
1166        Ok(()) => {
1167            sync_file(manifest_path)?;
1168            sync_parent(manifest_path)?;
1169            remove_temp_best_effort(&temp_path);
1170            remove_empty_temp_dir_best_effort(&temp_dir);
1171            Ok(false)
1172        }
1173        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1174            verify_existing_manifest(manifest_path, blob_blake3)?;
1175            remove_temp_best_effort(&temp_path);
1176            remove_empty_temp_dir_best_effort(&temp_dir);
1177            Ok(true)
1178        }
1179        Err(err) => Err(anyhow!(
1180            "publish raw mirror manifest {} from {}: {err}",
1181            manifest_path.display(),
1182            temp_path.display()
1183        )),
1184    }
1185}
1186
1187fn merge_raw_mirror_manifest_db_links(
1188    root: &Path,
1189    manifest_path: &Path,
1190    links: &[RawMirrorDbLink],
1191    expected_blob_blake3: Option<&str>,
1192) -> Result<()> {
1193    if links.is_empty() {
1194        return Ok(());
1195    }
1196
1197    let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1198    let _guard = lock
1199        .lock()
1200        .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1201
1202    let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1203    if let Some(expected_blob_blake3) = expected_blob_blake3
1204        && manifest.blob_blake3 != expected_blob_blake3
1205    {
1206        return Err(anyhow!(
1207            "existing raw mirror manifest {} points at blob {}, expected {}",
1208            manifest_path.display(),
1209            manifest.blob_blake3,
1210            expected_blob_blake3
1211        ));
1212    }
1213
1214    let mut merged_links = manifest.db_links.clone();
1215    merged_links.extend_from_slice(links);
1216    let merged_links = unique_db_links(&merged_links);
1217    if merged_links == manifest.db_links {
1218        return Ok(());
1219    }
1220
1221    manifest.db_links = merged_links;
1222    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1223    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1224    replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1225}
1226
1227fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1228    ensure_private_dir_descendant(
1229        root,
1230        manifest_path
1231            .parent()
1232            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1233    )?;
1234    let temp_dir = unique_capture_temp_dir(root);
1235    ensure_private_dir_descendant(root, &temp_dir)?;
1236    let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1237    let mut temp = private_create_new_file(&temp_path)?;
1238    temp.write_all(manifest_bytes).with_context(|| {
1239        format!(
1240            "write raw mirror manifest update temp {}",
1241            temp_path.display()
1242        )
1243    })?;
1244    temp.sync_all().with_context(|| {
1245        format!(
1246            "sync raw mirror manifest update temp {}",
1247            temp_path.display()
1248        )
1249    })?;
1250    drop(temp);
1251
1252    fs::rename(&temp_path, manifest_path).with_context(|| {
1253        format!(
1254            "replace raw mirror manifest {} from {}",
1255            manifest_path.display(),
1256            temp_path.display()
1257        )
1258    })?;
1259    set_private_file_permissions(manifest_path)?;
1260    sync_file(manifest_path)?;
1261    sync_parent(manifest_path)?;
1262    remove_empty_temp_dir_best_effort(&temp_dir);
1263    Ok(())
1264}
1265
1266fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1267    let relative = Path::new(relative_path);
1268    if relative.is_absolute() {
1269        return Err(anyhow!(
1270            "raw mirror manifest path must be relative: {relative_path}"
1271        ));
1272    }
1273
1274    let mut normal_components = Vec::new();
1275    for component in relative.components() {
1276        match component {
1277            std::path::Component::Normal(part) => normal_components.push(part),
1278            _ => {
1279                return Err(anyhow!(
1280                    "raw mirror manifest path must use only normal relative components: {relative_path}"
1281                ));
1282            }
1283        }
1284    }
1285
1286    if normal_components.len() != 2
1287        || normal_components[0] != std::ffi::OsStr::new("manifests")
1288        || Path::new(normal_components[1])
1289            .extension()
1290            .and_then(|ext| ext.to_str())
1291            != Some("json")
1292    {
1293        return Err(anyhow!(
1294            "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1295        ));
1296    }
1297
1298    Ok(root.join(relative))
1299}
1300
1301fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1302    let metadata = fs::symlink_metadata(path)
1303        .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1304    if metadata.file_type().is_symlink() {
1305        return Err(anyhow!(
1306            "refusing to read symlink raw mirror blob {}",
1307            path.display()
1308        ));
1309    }
1310    if !metadata.is_file() {
1311        return Err(anyhow!(
1312            "refusing to read non-file raw mirror blob {}",
1313            path.display()
1314        ));
1315    }
1316    let actual = file_blake3(path)?;
1317    if actual == expected_blake3 {
1318        Ok(())
1319    } else {
1320        Err(anyhow!(
1321            "existing raw mirror blob {} has blake3 {}, expected {}",
1322            path.display(),
1323            actual,
1324            expected_blake3
1325        ))
1326    }
1327}
1328
1329fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1330    let manifest = read_raw_mirror_manifest(path)?;
1331    if manifest.blob_blake3 == expected_blob_blake3 {
1332        Ok(())
1333    } else {
1334        Err(anyhow!(
1335            "existing raw mirror manifest {} points at blob {}, expected {}",
1336            path.display(),
1337            manifest.blob_blake3,
1338            expected_blob_blake3
1339        ))
1340    }
1341}
1342
1343fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1344    let metadata = fs::symlink_metadata(path)
1345        .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1346    if metadata.file_type().is_symlink() {
1347        return Err(anyhow!(
1348            "refusing to read symlink raw mirror manifest {}",
1349            path.display()
1350        ));
1351    }
1352    if !metadata.is_file() {
1353        return Err(anyhow!(
1354            "refusing to read non-file raw mirror manifest {}",
1355            path.display()
1356        ));
1357    }
1358    serde_json::from_slice(
1359        &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1360    )
1361    .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1362}
1363
1364fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1365    data_dir
1366        .join(RAW_MIRROR_ROOT_DIR)
1367        .join(RAW_MIRROR_VERSION_DIR)
1368}
1369
1370fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1371    let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1372    ensure_private_dir(&root_parent)?;
1373    let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1374    ensure_private_dir(&root)?;
1375    Ok(root)
1376}
1377
1378fn raw_mirror_blob_cache_key(
1379    input: &RawMirrorCaptureInput<'_>,
1380    source_metadata: &fs::Metadata,
1381) -> RawMirrorBlobCacheKey {
1382    RawMirrorBlobCacheKey {
1383        data_dir: input.data_dir.to_path_buf(),
1384        source_path: input.source_path.to_path_buf(),
1385        source_identity: source_identity_token(source_metadata),
1386        source_size_bytes: source_metadata.len(),
1387        source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1388        source_change_time_ns: source_change_time_ns(source_metadata),
1389    }
1390}
1391
1392fn cached_raw_mirror_blob_record(
1393    key: &RawMirrorBlobCacheKey,
1394    root: &Path,
1395) -> Option<RawMirrorBlobRecord> {
1396    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1397    let record = {
1398        let mut guard = cache.lock().ok()?;
1399        let record = guard.get(key).cloned()?;
1400        if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1401            guard.remove(key);
1402            return None;
1403        }
1404        record
1405    };
1406
1407    let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1408    let blob_path = root.join(blob_relative_path);
1409    let metadata_valid = fs::symlink_metadata(&blob_path)
1410        .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1411        .unwrap_or(false);
1412    if !metadata_valid {
1413        remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1414        return None;
1415    }
1416
1417    match file_blake3(&blob_path) {
1418        Ok(actual) if actual == record.blob_blake3 => Some(record),
1419        Ok(actual) => {
1420            tracing::warn!(
1421                path = %blob_path.display(),
1422                expected_blake3 = %record.blob_blake3,
1423                actual_blake3 = %actual,
1424                "discarding raw mirror blob cache entry with mismatched content"
1425            );
1426            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1427            None
1428        }
1429        Err(err) => {
1430            tracing::debug!(
1431                path = %blob_path.display(),
1432                error = %err,
1433                "discarding unreadable raw mirror blob cache entry"
1434            );
1435            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1436            None
1437        }
1438    }
1439}
1440
1441fn remove_cached_raw_mirror_blob_record_if_unchanged(
1442    cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1443    key: &RawMirrorBlobCacheKey,
1444    stale_record: &RawMirrorBlobRecord,
1445) {
1446    if let Ok(mut guard) = cache.lock()
1447        && guard
1448            .get(key)
1449            .is_some_and(|current| current == stale_record)
1450    {
1451        guard.remove(key);
1452    }
1453}
1454
1455fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1456    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1457    if let Ok(mut guard) = cache.lock() {
1458        guard.insert(key, record);
1459    }
1460}
1461
1462fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1463    if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1464        return None;
1465    }
1466    let lower = blob_blake3.to_ascii_lowercase();
1467    Some(format!(
1468        "blobs/{}/{}/{}.{}",
1469        RAW_MIRROR_HASH_ALGORITHM,
1470        &lower[..2],
1471        lower,
1472        RAW_MIRROR_BLOB_EXTENSION
1473    ))
1474}
1475
/// Returns the manifest path relative to the mirror root:
/// `manifests/<manifest_id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    format!("manifests/{manifest_id}.json")
}
1479
1480fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1481    let mut hasher = blake3::Hasher::new();
1482    hasher.update(b"doctor-raw-mirror-original-path-v1");
1483    hasher.update(&[0]);
1484    hasher.update(original_path.as_bytes());
1485    hasher.finalize().to_hex().to_string()
1486}
1487
1488fn raw_mirror_manifest_id(
1489    provider: &str,
1490    source_id: &str,
1491    origin_kind: &str,
1492    origin_host: Option<&str>,
1493    original_path_blake3: &str,
1494    blob_blake3: &str,
1495) -> String {
1496    canonical_blake3(
1497        "doctor-raw-mirror-manifest-id-v1",
1498        json!({
1499            "provider": provider,
1500            "source_id": source_id,
1501            "origin_kind": origin_kind,
1502            "origin_host": origin_host,
1503            "original_path_blake3": original_path_blake3,
1504            "blob_blake3": blob_blake3,
1505        }),
1506    )
1507}
1508
1509fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1510    let mut value = serde_json::to_value(manifest).unwrap_or_default();
1511    if let Value::Object(map) = &mut value {
1512        map.remove("manifest_blake3");
1513    }
1514    canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1515}
1516
1517fn canonical_blake3(prefix: &str, value: Value) -> String {
1518    let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1519    let mut hasher = blake3::Hasher::new();
1520    hasher.update(prefix.as_bytes());
1521    hasher.update(&[0]);
1522    hasher.update(&encoded);
1523    format!("{prefix}-{}", hasher.finalize().to_hex())
1524}
1525
1526fn canonical_json_value(value: Value) -> Value {
1527    match value {
1528        Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1529        Value::Object(map) => {
1530            let mut entries: Vec<_> = map.into_iter().collect();
1531            entries.sort_by(|left, right| left.0.cmp(&right.0));
1532            let mut canonical = serde_json::Map::new();
1533            for (key, value) in entries {
1534                canonical.insert(key, canonical_json_value(value));
1535            }
1536            Value::Object(canonical)
1537        }
1538        other => other,
1539    }
1540}
1541
1542fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1543    let mut dedup = links.to_vec();
1544    dedup.sort_by(|left, right| {
1545        (
1546            left.conversation_id,
1547            left.message_count,
1548            left.started_at_ms,
1549            left.source_path.as_deref().unwrap_or(""),
1550        )
1551            .cmp(&(
1552                right.conversation_id,
1553                right.message_count,
1554                right.started_at_ms,
1555                right.source_path.as_deref().unwrap_or(""),
1556            ))
1557    });
1558    dedup.dedup();
1559    dedup
1560}
1561
1562fn file_blake3(path: &Path) -> Result<String> {
1563    let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1564    let mut hasher = blake3::Hasher::new();
1565    let mut buffer = [0u8; 64 * 1024];
1566    loop {
1567        let read = file
1568            .read(&mut buffer)
1569            .with_context(|| format!("read {}", path.display()))?;
1570        if read == 0 {
1571            break;
1572        }
1573        hasher.update(&buffer[..read]);
1574    }
1575    Ok(hasher.finalize().to_hex().to_string())
1576}
1577
1578fn ensure_private_dir(path: &Path) -> Result<()> {
1579    create_private_dir_all(path)
1580        .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1581    let metadata = fs::symlink_metadata(path)
1582        .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1583    let file_type = metadata.file_type();
1584    if file_type.is_symlink() {
1585        return Err(anyhow!(
1586            "refusing to use symlink raw mirror dir {}",
1587            path.display()
1588        ));
1589    }
1590    if !file_type.is_dir() {
1591        return Err(anyhow!(
1592            "refusing to use non-directory raw mirror path {}",
1593            path.display()
1594        ));
1595    }
1596    #[cfg(unix)]
1597    {
1598        use std::os::unix::fs::PermissionsExt;
1599        if metadata.permissions().mode() & 0o777 != 0o700 {
1600            set_private_dir_permissions(path)?;
1601        }
1602    }
1603    #[cfg(not(unix))]
1604    {
1605        set_private_dir_permissions(path)?;
1606    }
1607    Ok(())
1608}
1609
1610fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1611    let relative = path.strip_prefix(root).with_context(|| {
1612        format!(
1613            "raw mirror private dir {} is not under root {}",
1614            path.display(),
1615            root.display()
1616        )
1617    })?;
1618
1619    if let Some(root_parent) = root.parent() {
1620        ensure_private_dir(root_parent)?;
1621    }
1622    ensure_private_dir(root)?;
1623    let mut current = root.to_path_buf();
1624    for component in relative.components() {
1625        match component {
1626            Component::Normal(part) => {
1627                current.push(part);
1628                ensure_private_dir(&current)?;
1629            }
1630            Component::CurDir => {}
1631            _ => {
1632                return Err(anyhow!(
1633                    "raw mirror private dir contains non-normal component: {}",
1634                    path.display()
1635                ));
1636            }
1637        }
1638    }
1639
1640    Ok(())
1641}
1642
1643fn private_create_new_file(path: &Path) -> Result<File> {
1644    let mut options = OpenOptions::new();
1645    options.write(true).create_new(true);
1646    set_private_create_file_mode(&mut options);
1647    let file = options
1648        .open(path)
1649        .with_context(|| format!("create raw mirror file {}", path.display()))?;
1650    Ok(file)
1651}
1652
/// Recursively creates `path`, requesting mode `0o700` for every directory
/// that has to be created.
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    let mut dir_builder = fs::DirBuilder::new();
    dir_builder.recursive(true);
    dir_builder.mode(0o700);
    dir_builder.create(path)
}
1660
/// Non-Unix fallback: recursively creates `path` with the platform's default
/// permissions; no mode is requested at creation time.
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::create_dir_all(path)
}
1665
/// Configures `options` so that a newly created file is requested with mode
/// `0o600`.
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    std::os::unix::fs::OpenOptionsExt::mode(options, 0o600);
}
1672
/// Non-Unix fallback: leaves `options` untouched, so no creation mode is set.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1675
1676fn sync_file(path: &Path) -> Result<()> {
1677    File::open(path)
1678        .and_then(|file| file.sync_all())
1679        .with_context(|| format!("sync raw mirror file {}", path.display()))
1680}
1681
1682fn sync_parent(path: &Path) -> Result<()> {
1683    let Some(parent) = path.parent() else {
1684        return Ok(());
1685    };
1686    File::open(parent)
1687        .and_then(|file| file.sync_all())
1688        .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1689}
1690
1691fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1692    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1693    let nanos = SystemTime::now()
1694        .duration_since(UNIX_EPOCH)
1695        .unwrap_or_default()
1696        .as_nanos();
1697    dir.join(format!(
1698        ".{label}.{}.{}.{}.tmp",
1699        std::process::id(),
1700        nanos,
1701        nonce
1702    ))
1703}
1704
1705fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1706    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1707    let nanos = SystemTime::now()
1708        .duration_since(UNIX_EPOCH)
1709        .unwrap_or_default()
1710        .as_nanos();
1711    root.join("tmp").join(format!(
1712        "capture.{}.{}.{}",
1713        std::process::id(),
1714        nanos,
1715        nonce
1716    ))
1717}
1718
1719fn remove_temp_best_effort(path: &Path) {
1720    if let Err(err) = fs::remove_file(path) {
1721        tracing::debug!(
1722            path = %path.display(),
1723            error = %err,
1724            "failed to remove raw mirror temp file"
1725        );
1726    }
1727}
1728
1729fn remove_empty_temp_dir_best_effort(path: &Path) {
1730    if let Err(err) = fs::remove_dir(path) {
1731        tracing::debug!(
1732            path = %path.display(),
1733            error = %err,
1734            "failed to remove raw mirror temp directory"
1735        );
1736    }
1737}
1738
/// Renders `source_path` as `[<provider>]/<file name>`, dropping every
/// directory component.
///
/// When the path has no final file name, or that name is not valid UTF-8,
/// the literal `session` is used in its place.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let leaf = match source_path.file_name() {
        Some(raw_name) => raw_name.to_str().unwrap_or("session"),
        None => "session",
    };
    ["[", provider, "]/", leaf].concat()
}
1746
1747fn now_ms() -> i64 {
1748    system_time_to_ms(SystemTime::now()).unwrap_or(0)
1749}
1750
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` when `time` is earlier than the epoch or when the
/// millisecond count does not fit in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1756
/// Converts `time` to nanoseconds since the Unix epoch, or `None` when
/// `time` is earlier than the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_nanos()),
        Err(_) => None,
    }
}
1762
1763#[cfg(unix)]
1764fn set_private_dir_permissions(path: &Path) -> Result<()> {
1765    use std::os::unix::fs::PermissionsExt;
1766    fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1767        .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1768}
1769
1770#[cfg(not(unix))]
1771fn set_private_dir_permissions(_path: &Path) -> Result<()> {
1772    Ok(())
1773}
1774
1775#[cfg(unix)]
1776fn set_private_file_permissions(path: &Path) -> Result<()> {
1777    use std::os::unix::fs::PermissionsExt;
1778    fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1779        .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1780}
1781
1782#[cfg(not(unix))]
1783fn set_private_file_permissions(_path: &Path) -> Result<()> {
1784    Ok(())
1785}
1786
1787#[cfg(test)]
1788mod tests {
1789    use super::*;
1790
1791    #[test]
1792    fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1793        let temp = tempfile::TempDir::new().expect("tempdir");
1794        let data_dir = temp.path().join("cass-data");
1795        let source_path = temp.path().join("rollout-fixture.jsonl");
1796        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1797        fs::write(&source_path, source_bytes).expect("write source");
1798        let db_link = RawMirrorDbLink {
1799            conversation_id: Some(42),
1800            message_count: Some(1),
1801            source_path: Some(source_path.display().to_string()),
1802            started_at_ms: Some(1_733_000_000_000),
1803        };
1804
1805        let first = capture_source_file(RawMirrorCaptureInput {
1806            data_dir: &data_dir,
1807            provider: "codex",
1808            source_id: "local",
1809            origin_kind: "local",
1810            origin_host: None,
1811            source_path: &source_path,
1812            db_links: std::slice::from_ref(&db_link),
1813        })
1814        .expect("first capture");
1815        let second = capture_source_file(RawMirrorCaptureInput {
1816            data_dir: &data_dir,
1817            provider: "codex",
1818            source_id: "local",
1819            origin_kind: "local",
1820            origin_host: None,
1821            source_path: &source_path,
1822            db_links: std::slice::from_ref(&db_link),
1823        })
1824        .expect("second capture");
1825
1826        assert_eq!(first.manifest_id, second.manifest_id);
1827        assert_eq!(first.blob_blake3, second.blob_blake3);
1828        assert_eq!(first.captured_at_ms, second.captured_at_ms);
1829        assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1830        assert!(!first.already_present);
1831        assert!(second.already_present);
1832        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1833
1834        let blob_path = data_dir
1835            .join(RAW_MIRROR_ROOT_DIR)
1836            .join(RAW_MIRROR_VERSION_DIR)
1837            .join(&first.blob_relative_path);
1838        let manifest_path = data_dir
1839            .join(RAW_MIRROR_ROOT_DIR)
1840            .join(RAW_MIRROR_VERSION_DIR)
1841            .join(&first.manifest_relative_path);
1842        assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1843
1844        let manifest: Value =
1845            serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1846                .expect("manifest json");
1847        assert_eq!(
1848            manifest["manifest_kind"].as_str(),
1849            Some(RAW_MIRROR_MANIFEST_KIND)
1850        );
1851        assert_eq!(manifest["provider"].as_str(), Some("codex"));
1852        assert_eq!(
1853            manifest["blob_blake3"].as_str(),
1854            Some(first.blob_blake3.as_str())
1855        );
1856        assert_eq!(
1857            manifest["redacted_original_path"].as_str(),
1858            Some("[codex]/rollout-fixture.jsonl")
1859        );
1860        assert_eq!(
1861            manifest["db_links"][0]["conversation_id"].as_i64(),
1862            Some(42)
1863        );
1864        assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1865        assert!(
1866            manifest["manifest_blake3"]
1867                .as_str()
1868                .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1869        );
1870        let tmp_root = data_dir
1871            .join(RAW_MIRROR_ROOT_DIR)
1872            .join(RAW_MIRROR_VERSION_DIR)
1873            .join("tmp");
1874        assert_eq!(
1875            fs::read_dir(&tmp_root)
1876                .expect("raw mirror tmp root")
1877                .collect::<Vec<_>>()
1878                .len(),
1879            0,
1880            "successful captures must not leave doctor-visible interrupted temp artifacts"
1881        );
1882
1883        #[cfg(unix)]
1884        {
1885            use std::os::unix::fs::PermissionsExt;
1886
1887            let root = data_dir
1888                .join(RAW_MIRROR_ROOT_DIR)
1889                .join(RAW_MIRROR_VERSION_DIR);
1890            assert_eq!(
1891                fs::metadata(&root)
1892                    .expect("raw mirror root metadata")
1893                    .permissions()
1894                    .mode()
1895                    & 0o777,
1896                0o700
1897            );
1898            assert_eq!(
1899                fs::metadata(&manifest_path)
1900                    .expect("manifest metadata")
1901                    .permissions()
1902                    .mode()
1903                    & 0o777,
1904                0o600
1905            );
1906        }
1907    }
1908
1909    #[test]
1910    fn capture_source_file_merges_db_links_into_existing_manifest() {
1911        let temp = tempfile::TempDir::new().expect("tempdir");
1912        let data_dir = temp.path().join("cass-data");
1913        let source_path = temp.path().join("preparse-then-parsed.jsonl");
1914        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1915        fs::write(&source_path, source_bytes).expect("write source");
1916
1917        let preparse = capture_source_file(RawMirrorCaptureInput {
1918            data_dir: &data_dir,
1919            provider: "codex",
1920            source_id: "local",
1921            origin_kind: "local",
1922            origin_host: None,
1923            source_path: &source_path,
1924            db_links: &[],
1925        })
1926        .expect("preparse capture");
1927
1928        let parsed_link = RawMirrorDbLink {
1929            conversation_id: None,
1930            message_count: Some(1),
1931            source_path: Some(source_path.display().to_string()),
1932            started_at_ms: Some(1_733_000_000_000),
1933        };
1934        let parsed = capture_source_file(RawMirrorCaptureInput {
1935            data_dir: &data_dir,
1936            provider: "codex",
1937            source_id: "local",
1938            origin_kind: "local",
1939            origin_host: None,
1940            source_path: &source_path,
1941            db_links: std::slice::from_ref(&parsed_link),
1942        })
1943        .expect("parsed capture");
1944
1945        assert_eq!(preparse.manifest_id, parsed.manifest_id);
1946        assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1947        assert!(parsed.already_present);
1948
1949        let manifest_path = data_dir
1950            .join(RAW_MIRROR_ROOT_DIR)
1951            .join(RAW_MIRROR_VERSION_DIR)
1952            .join(&parsed.manifest_relative_path);
1953        let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1954        assert_eq!(
1955            manifest.db_links,
1956            vec![parsed_link],
1957            "second capture must enrich the pre-parse manifest with DB-link evidence"
1958        );
1959        let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1960        assert_eq!(
1961            manifest.manifest_blake3.as_deref(),
1962            Some(expected_manifest_blake3.as_str()),
1963            "manifest checksum must be recomputed after DB-link merge"
1964        );
1965        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1966    }
1967
1968    #[test]
1969    fn merge_manifest_db_links_rejects_hostile_relative_paths() {
1970        let temp = tempfile::TempDir::new().expect("tempdir");
1971        let data_dir = temp.path().join("cass-data");
1972        let db_link = RawMirrorDbLink {
1973            conversation_id: Some(42),
1974            message_count: Some(1),
1975            source_path: Some("source.jsonl".to_string()),
1976            started_at_ms: Some(1_733_000_000_000),
1977        };
1978
1979        for relative in [
1980            "../escape.json",
1981            "/tmp/escape.json",
1982            "manifests/../escape.json",
1983            "blobs/blake3/ab/not-a-manifest.raw",
1984            "manifests/not-json.txt",
1985        ] {
1986            let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
1987                .expect_err("hostile manifest path should be rejected");
1988            assert!(
1989                err.to_string().contains("raw mirror manifest path"),
1990                "unexpected error for {relative}: {err}"
1991            );
1992        }
1993    }
1994
1995    #[cfg(unix)]
1996    #[test]
1997    fn merge_manifest_db_links_rejects_symlink_manifest_path() {
1998        let temp = tempfile::TempDir::new().expect("tempdir");
1999        let data_dir = temp.path().join("cass-data");
2000        let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
2001        fs::create_dir_all(&manifest_dir).expect("manifest dir");
2002        let outside = temp.path().join("outside.json");
2003        fs::write(&outside, "{}").expect("outside manifest");
2004        std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
2005            .expect("symlink manifest");
2006        let db_link = RawMirrorDbLink {
2007            conversation_id: Some(42),
2008            message_count: Some(1),
2009            source_path: Some("source.jsonl".to_string()),
2010            started_at_ms: Some(1_733_000_000_000),
2011        };
2012
2013        let err = merge_manifest_db_links(
2014            &data_dir,
2015            "manifests/link.json",
2016            std::slice::from_ref(&db_link),
2017        )
2018        .expect_err("symlink manifest should be rejected");
2019        assert!(
2020            err.to_string().contains("symlink raw mirror manifest"),
2021            "unexpected symlink-manifest error: {err}"
2022        );
2023    }
2024
2025    #[test]
2026    fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
2027        let temp = tempfile::TempDir::new().expect("tempdir");
2028        let data_dir = temp.path().join("cass-data");
2029        let first_source = temp.path().join("first.jsonl");
2030        let second_source = temp.path().join("second.jsonl");
2031        let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
2032        fs::write(&first_source, source_bytes).expect("write first source");
2033        fs::write(&second_source, source_bytes).expect("write second source");
2034
2035        let first = capture_source_file(RawMirrorCaptureInput {
2036            data_dir: &data_dir,
2037            provider: "codex",
2038            source_id: "local",
2039            origin_kind: "local",
2040            origin_host: None,
2041            source_path: &first_source,
2042            db_links: &[],
2043        })
2044        .expect("first capture");
2045        let second = capture_source_file(RawMirrorCaptureInput {
2046            data_dir: &data_dir,
2047            provider: "codex",
2048            source_id: "local",
2049            origin_kind: "local",
2050            origin_host: None,
2051            source_path: &second_source,
2052            db_links: &[],
2053        })
2054        .expect("second capture");
2055
2056        assert_eq!(first.blob_blake3, second.blob_blake3);
2057        assert_eq!(first.blob_relative_path, second.blob_relative_path);
2058        assert_ne!(first.manifest_id, second.manifest_id);
2059        assert!(
2060            !second.already_present,
2061            "a duplicate blob with a new source manifest is not a full capture replay"
2062        );
2063
2064        let manifest_root = data_dir
2065            .join(RAW_MIRROR_ROOT_DIR)
2066            .join(RAW_MIRROR_VERSION_DIR)
2067            .join("manifests");
2068        let manifests = fs::read_dir(manifest_root)
2069            .expect("manifest dir")
2070            .collect::<std::io::Result<Vec<_>>>()
2071            .expect("manifest entries");
2072        assert_eq!(manifests.len(), 2);
2073
2074        let summary = storage_summary(&data_dir);
2075        assert!(summary.initialized);
2076        assert_eq!(summary.manifest_count, 2);
2077        assert_eq!(summary.unique_blob_count, 1);
2078        assert_eq!(summary.total_blob_bytes, source_bytes.len() as u64);
2079        assert_eq!(summary.largest_blob_bytes, source_bytes.len() as u64);
2080        assert_eq!(summary.missing_blob_count, 0);
2081        assert_eq!(summary.invalid_manifest_count, 0);
2082        assert!(summary.total_storage_bytes >= source_bytes.len() as u64);
2083    }
2084
2085    #[test]
2086    fn storage_summary_rejects_hostile_blob_relative_path() {
2087        let temp = tempfile::TempDir::new().expect("tempdir");
2088        let data_dir = temp.path().join("cass-data");
2089        let source_path = temp.path().join("source.jsonl");
2090        fs::write(
2091            &source_path,
2092            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2093        )
2094        .expect("write source");
2095
2096        let captured = capture_source_file(RawMirrorCaptureInput {
2097            data_dir: &data_dir,
2098            provider: "codex",
2099            source_id: "local",
2100            origin_kind: "local",
2101            origin_host: None,
2102            source_path: &source_path,
2103            db_links: &[],
2104        })
2105        .expect("capture source");
2106        let manifest_path = data_dir
2107            .join(RAW_MIRROR_ROOT_DIR)
2108            .join(RAW_MIRROR_VERSION_DIR)
2109            .join(&captured.manifest_relative_path);
2110        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2111        manifest.blob_relative_path = "../outside.raw".to_string();
2112        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2113        fs::write(
2114            &manifest_path,
2115            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2116        )
2117        .expect("tamper manifest");
2118
2119        let summary = storage_summary(&data_dir);
2120        assert_eq!(summary.manifest_count, 1);
2121        assert_eq!(summary.invalid_manifest_count, 1);
2122        assert_eq!(summary.unique_blob_count, 0);
2123        assert_eq!(summary.total_blob_bytes, 0);
2124    }
2125
2126    #[test]
2127    fn prune_fails_closed_on_hostile_manifest_inventory() {
2128        let temp = tempfile::TempDir::new().expect("tempdir");
2129        let data_dir = temp.path().join("cass-data");
2130        let source_path = temp.path().join("source.jsonl");
2131        fs::write(
2132            &source_path,
2133            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2134        )
2135        .expect("write source");
2136
2137        let captured = capture_source_file(RawMirrorCaptureInput {
2138            data_dir: &data_dir,
2139            provider: "codex",
2140            source_id: "local",
2141            origin_kind: "local",
2142            origin_host: None,
2143            source_path: &source_path,
2144            db_links: &[],
2145        })
2146        .expect("capture source");
2147        let root = data_dir
2148            .join(RAW_MIRROR_ROOT_DIR)
2149            .join(RAW_MIRROR_VERSION_DIR);
2150        let manifest_path = root.join(&captured.manifest_relative_path);
2151        let blob_path = root.join(&captured.blob_relative_path);
2152        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2153        manifest.blob_relative_path = "../outside.raw".to_string();
2154        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2155        fs::write(
2156            &manifest_path,
2157            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2158        )
2159        .expect("tamper manifest");
2160
2161        let err = prune(
2162            &data_dir,
2163            RawMirrorPruneOptions {
2164                older_than_ms: Some(0),
2165                max_size_bytes: None,
2166                keep_tags: Vec::new(),
2167                safety_hold_down_ms: 0,
2168                apply: true,
2169            },
2170        )
2171        .expect_err("hostile inventory should fail closed");
2172
2173        assert!(
2174            err.to_string().contains("unexpected blob path"),
2175            "error should explain the unsafe manifest inventory: {err}"
2176        );
2177        assert!(manifest_path.exists());
2178        assert!(blob_path.exists());
2179        assert!(!root.join("pruned.jsonl").exists());
2180    }
2181
2182    #[test]
2183    fn prune_dry_run_audits_without_removing_manifest_or_blob() {
2184        let temp = tempfile::TempDir::new().expect("tempdir");
2185        let data_dir = temp.path().join("cass-data");
2186        let source_path = temp.path().join("source.jsonl");
2187        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")
2188            .expect("write source");
2189        let captured = capture_source_file(RawMirrorCaptureInput {
2190            data_dir: &data_dir,
2191            provider: "codex",
2192            source_id: "local",
2193            origin_kind: "local",
2194            origin_host: None,
2195            source_path: &source_path,
2196            db_links: &[],
2197        })
2198        .expect("capture source");
2199
2200        let report = prune(
2201            &data_dir,
2202            RawMirrorPruneOptions {
2203                older_than_ms: Some(0),
2204                max_size_bytes: None,
2205                keep_tags: Vec::new(),
2206                safety_hold_down_ms: 0,
2207                apply: false,
2208            },
2209        )
2210        .expect("dry-run prune");
2211
2212        assert!(report.initialized);
2213        assert_eq!(report.mode, "dry-run");
2214        assert_eq!(report.planned_manifest_count, 1);
2215        assert_eq!(report.planned_blob_count, 1);
2216        assert_eq!(report.applied_reclaim_bytes, 0);
2217        let root = data_dir
2218            .join(RAW_MIRROR_ROOT_DIR)
2219            .join(RAW_MIRROR_VERSION_DIR);
2220        assert!(root.join(&captured.manifest_relative_path).exists());
2221        assert!(root.join(&captured.blob_relative_path).exists());
2222        let audit_path = root.join("pruned.jsonl");
2223        let audit = fs::read_to_string(audit_path).expect("read audit");
2224        assert!(audit.contains("\"mode\":\"dry-run\""));
2225        assert!(audit.contains("\"applied\":false"));
2226    }
2227
2228    #[test]
2229    #[cfg(unix)]
2230    fn prune_refuses_symlinked_audit_log_without_writing_target() -> Result<()> {
2231        use std::os::unix::fs::symlink;
2232
2233        let temp = tempfile::TempDir::new()?;
2234        let data_dir = temp.path().join("cass-data");
2235        let source_path = temp.path().join("source.jsonl");
2236        let protected_audit_target = temp.path().join("protected-prune-audit.jsonl");
2237        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")?;
2238        fs::write(&protected_audit_target, b"protected\n")?;
2239
2240        let captured = capture_source_file(RawMirrorCaptureInput {
2241            data_dir: &data_dir,
2242            provider: "codex",
2243            source_id: "local",
2244            origin_kind: "local",
2245            origin_host: None,
2246            source_path: &source_path,
2247            db_links: &[],
2248        })?;
2249        let root = data_dir
2250            .join(RAW_MIRROR_ROOT_DIR)
2251            .join(RAW_MIRROR_VERSION_DIR);
2252        let audit_path = root.join("pruned.jsonl");
2253        symlink(&protected_audit_target, &audit_path)?;
2254
2255        let err = match prune(
2256            &data_dir,
2257            RawMirrorPruneOptions {
2258                older_than_ms: Some(0),
2259                max_size_bytes: None,
2260                keep_tags: Vec::new(),
2261                safety_hold_down_ms: 0,
2262                apply: false,
2263            },
2264        ) {
2265            Ok(_) => anyhow::bail!("symlinked prune audit log was accepted"),
2266            Err(err) => err,
2267        };
2268
2269        if !err.to_string().contains("prune audit through symlink") {
2270            anyhow::bail!("unexpected audit symlink error: {err:#}");
2271        }
2272        if !fs::read(&protected_audit_target)?
2273            .as_slice()
2274            .eq(b"protected\n")
2275        {
2276            anyhow::bail!("protected audit target was modified");
2277        }
2278        if !fs::read_link(&audit_path)?
2279            .as_os_str()
2280            .eq(protected_audit_target.as_os_str())
2281        {
2282            anyhow::bail!("audit path did not remain a symlink to the protected target");
2283        }
2284        if !root.join(&captured.manifest_relative_path).exists() {
2285            anyhow::bail!("failed audit append removed the captured manifest");
2286        }
2287        if !root.join(&captured.blob_relative_path).exists() {
2288            anyhow::bail!("failed audit append removed the captured blob");
2289        }
2290        Ok(())
2291    }
2292
2293    #[test]
2294    fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
2295        let temp = tempfile::TempDir::new().expect("tempdir");
2296        let data_dir = temp.path().join("cass-data");
2297        let source_path = temp.path().join("source.jsonl");
2298        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"apply\"}\n")
2299            .expect("write source");
2300        let captured = capture_source_file(RawMirrorCaptureInput {
2301            data_dir: &data_dir,
2302            provider: "codex",
2303            source_id: "local",
2304            origin_kind: "local",
2305            origin_host: None,
2306            source_path: &source_path,
2307            db_links: &[],
2308        })
2309        .expect("capture source");
2310        let root = data_dir
2311            .join(RAW_MIRROR_ROOT_DIR)
2312            .join(RAW_MIRROR_VERSION_DIR);
2313        let manifest_path = root.join(&captured.manifest_relative_path);
2314        let blob_path = root.join(&captured.blob_relative_path);
2315
2316        let report = prune(
2317            &data_dir,
2318            RawMirrorPruneOptions {
2319                older_than_ms: Some(0),
2320                max_size_bytes: None,
2321                keep_tags: Vec::new(),
2322                safety_hold_down_ms: 0,
2323                apply: true,
2324            },
2325        )
2326        .expect("apply prune");
2327
2328        assert_eq!(report.applied_manifest_count, 1);
2329        assert_eq!(report.applied_blob_count, 1);
2330        assert!(!manifest_path.exists());
2331        assert!(!blob_path.exists());
2332        let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
2333        assert!(audit.contains("\"mode\":\"apply\""));
2334        assert!(audit.contains("\"applied\":true"));
2335    }
2336
2337    #[test]
2338    fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
2339        let temp = tempfile::TempDir::new().expect("tempdir");
2340        let data_dir = temp.path().join("cass-data");
2341        let first_source = temp.path().join("first.jsonl");
2342        let second_source = temp.path().join("second.jsonl");
2343        let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";
2344        fs::write(&first_source, bytes).expect("write first");
2345        fs::write(&second_source, bytes).expect("write second");
2346        let first = capture_source_file(RawMirrorCaptureInput {
2347            data_dir: &data_dir,
2348            provider: "codex",
2349            source_id: "local",
2350            origin_kind: "local",
2351            origin_host: None,
2352            source_path: &first_source,
2353            db_links: &[],
2354        })
2355        .expect("capture first");
2356        let second = capture_source_file(RawMirrorCaptureInput {
2357            data_dir: &data_dir,
2358            provider: "codex",
2359            source_id: "local",
2360            origin_kind: "local",
2361            origin_host: None,
2362            source_path: &second_source,
2363            db_links: &[],
2364        })
2365        .expect("capture second");
2366        let root = data_dir
2367            .join(RAW_MIRROR_ROOT_DIR)
2368            .join(RAW_MIRROR_VERSION_DIR);
2369        let first_manifest_path = root.join(&first.manifest_relative_path);
2370        let second_manifest_path = root.join(&second.manifest_relative_path);
2371        let mut first_manifest =
2372            read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
2373        first_manifest.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
2374        first_manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&first_manifest));
2375        fs::write(
2376            &first_manifest_path,
2377            serde_json::to_vec_pretty(&first_manifest).expect("serialize first manifest"),
2378        )
2379        .expect("rewrite first manifest");
2380
2381        let report = prune(
2382            &data_dir,
2383            RawMirrorPruneOptions {
2384                older_than_ms: Some(86_400_000),
2385                max_size_bytes: None,
2386                keep_tags: Vec::new(),
2387                safety_hold_down_ms: 0,
2388                apply: true,
2389            },
2390        )
2391        .expect("apply one-manifest prune");
2392
2393        assert_eq!(report.applied_manifest_count, 1);
2394        assert_eq!(report.applied_blob_count, 0);
2395        assert!(!first_manifest_path.exists());
2396        assert!(second_manifest_path.exists());
2397        assert!(
2398            root.join(&first.blob_relative_path).exists(),
2399            "shared blob must stay while a retained manifest still references it"
2400        );
2401    }
2402
2403    #[test]
2404    fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
2405        use frankensqlite::compat::ConnectionExt as _;
2406
2407        let temp = tempfile::TempDir::new().expect("tempdir");
2408        let data_dir = temp.path().join("cass-data");
2409        std::fs::create_dir_all(&data_dir).expect("create data dir");
2410        let source_path = temp.path().join("tagged.jsonl");
2411        fs::write(
2412            &source_path,
2413            b"{\"type\":\"message\",\"text\":\"tagged\"}\n",
2414        )
2415        .expect("write source");
2416        let db_link = RawMirrorDbLink {
2417            conversation_id: Some(7),
2418            message_count: Some(1),
2419            source_path: Some(source_path.display().to_string()),
2420            started_at_ms: Some(1_733_000_000_000),
2421        };
2422        let captured = capture_source_file(RawMirrorCaptureInput {
2423            data_dir: &data_dir,
2424            provider: "codex",
2425            source_id: "local",
2426            origin_kind: "local",
2427            origin_host: None,
2428            source_path: &source_path,
2429            db_links: std::slice::from_ref(&db_link),
2430        })
2431        .expect("capture source");
2432        let db_path = data_dir.join("agent_search.db");
2433        let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
2434            .expect("open keep-tag db");
2435        conn.execute_compat(
2436            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
2437            frankensqlite::params![],
2438        )
2439        .expect("create tags");
2440        conn.execute_compat(
2441            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
2442            frankensqlite::params![],
2443        )
2444        .expect("create conversation_tags");
2445        conn.execute_compat(
2446            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
2447            frankensqlite::params![],
2448        )
2449        .expect("insert tag");
2450        conn.execute_compat(
2451            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
2452            frankensqlite::params![],
2453        )
2454        .expect("insert conversation tag");
2455        drop(conn);
2456
2457        let report = prune(
2458            &data_dir,
2459            RawMirrorPruneOptions {
2460                older_than_ms: Some(0),
2461                max_size_bytes: Some(0),
2462                keep_tags: vec!["keep".to_string()],
2463                safety_hold_down_ms: 0,
2464                apply: true,
2465            },
2466        )
2467        .expect("keep-tag prune");
2468
2469        let root = data_dir
2470            .join(RAW_MIRROR_ROOT_DIR)
2471            .join(RAW_MIRROR_VERSION_DIR);
2472        assert_eq!(report.pinned_manifest_count, 1);
2473        assert_eq!(report.pinned_blob_count, 1);
2474        assert_eq!(report.planned_manifest_count, 0);
2475        assert_eq!(report.planned_blob_count, 0);
2476        assert!(root.join(&captured.manifest_relative_path).exists());
2477        assert!(root.join(&captured.blob_relative_path).exists());
2478    }
2479
2480    #[test]
2481    fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
2482        let temp = tempfile::TempDir::new().expect("tempdir");
2483        let data_dir = temp.path().join("cass-data");
2484        let source_path = temp.path().join("recent.jsonl");
2485        fs::write(
2486            &source_path,
2487            b"{\"type\":\"message\",\"text\":\"recent\"}\n",
2488        )
2489        .expect("write source");
2490        let captured = capture_source_file(RawMirrorCaptureInput {
2491            data_dir: &data_dir,
2492            provider: "codex",
2493            source_id: "local",
2494            origin_kind: "local",
2495            origin_host: None,
2496            source_path: &source_path,
2497            db_links: &[],
2498        })
2499        .expect("capture source");
2500
2501        let report = prune(
2502            &data_dir,
2503            RawMirrorPruneOptions {
2504                older_than_ms: None,
2505                max_size_bytes: Some(0),
2506                keep_tags: Vec::new(),
2507                safety_hold_down_ms: 7 * 86_400_000,
2508                apply: true,
2509            },
2510        )
2511        .expect("hold-down prune");
2512
2513        let root = data_dir
2514            .join(RAW_MIRROR_ROOT_DIR)
2515            .join(RAW_MIRROR_VERSION_DIR);
2516        assert_eq!(report.pinned_manifest_count, 1);
2517        assert_eq!(report.pinned_blob_count, 1);
2518        assert_eq!(report.planned_manifest_count, 0);
2519        assert_eq!(report.planned_blob_count, 0);
2520        assert!(root.join(&captured.manifest_relative_path).exists());
2521        assert!(root.join(&captured.blob_relative_path).exists());
2522    }
2523
2524    #[test]
2525    fn capture_source_file_revalidates_cached_blob_contents() {
2526        let temp = tempfile::TempDir::new().expect("tempdir");
2527        let data_dir = temp.path().join("cass-data");
2528        let source_path = temp.path().join("cached-source.jsonl");
2529        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2530        fs::write(&source_path, source_bytes).expect("write source");
2531
2532        let first = capture_source_file(RawMirrorCaptureInput {
2533            data_dir: &data_dir,
2534            provider: "codex",
2535            source_id: "local",
2536            origin_kind: "local",
2537            origin_host: None,
2538            source_path: &source_path,
2539            db_links: &[],
2540        })
2541        .expect("first capture");
2542
2543        let blob_path = data_dir
2544            .join(RAW_MIRROR_ROOT_DIR)
2545            .join(RAW_MIRROR_VERSION_DIR)
2546            .join(&first.blob_relative_path);
2547        fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");
2548
2549        let err = capture_source_file(RawMirrorCaptureInput {
2550            data_dir: &data_dir,
2551            provider: "codex",
2552            source_id: "local",
2553            origin_kind: "local",
2554            origin_host: None,
2555            source_path: &source_path,
2556            db_links: &[],
2557        })
2558        .expect_err("corrupted content-addressed blob must be rejected");
2559        assert!(
2560            err.to_string().contains("existing raw mirror blob"),
2561            "unexpected cached-blob error: {err:#}"
2562        );
2563        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2564    }
2565
2566    #[cfg(unix)]
2567    #[test]
2568    fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
2569        let temp = tempfile::TempDir::new().expect("tempdir");
2570        let data_dir = temp.path().join("cass-data");
2571        let source_path = temp.path().join("same-size-rewrite.jsonl");
2572        let first_bytes = b"same length payload A\n";
2573        let second_bytes = b"same length payload B\n";
2574        fs::write(&source_path, first_bytes).expect("write first source");
2575
2576        let first_modified = fs::metadata(&source_path)
2577            .expect("first metadata")
2578            .modified()
2579            .expect("first modified time");
2580        let first = capture_source_file(RawMirrorCaptureInput {
2581            data_dir: &data_dir,
2582            provider: "codex",
2583            source_id: "local",
2584            origin_kind: "local",
2585            origin_host: None,
2586            source_path: &source_path,
2587            db_links: &[],
2588        })
2589        .expect("first capture");
2590
2591        std::thread::sleep(std::time::Duration::from_millis(5));
2592        fs::write(&source_path, second_bytes).expect("rewrite source");
2593        let source = OpenOptions::new()
2594            .write(true)
2595            .open(&source_path)
2596            .expect("open rewritten source");
2597        source
2598            .set_times(std::fs::FileTimes::new().set_modified(first_modified))
2599            .expect("restore original mtime");
2600
2601        let second = capture_source_file(RawMirrorCaptureInput {
2602            data_dir: &data_dir,
2603            provider: "codex",
2604            source_id: "local",
2605            origin_kind: "local",
2606            origin_host: None,
2607            source_path: &source_path,
2608            db_links: &[],
2609        })
2610        .expect("second capture");
2611
2612        assert_ne!(first.blob_blake3, second.blob_blake3);
2613        assert_eq!(
2614            second.blob_blake3,
2615            blake3::hash(second_bytes).to_hex().to_string()
2616        );
2617        assert_eq!(
2618            fs::read(&source_path).expect("source bytes after rewrite"),
2619            second_bytes
2620        );
2621    }
2622
2623    #[cfg(unix)]
2624    #[test]
2625    fn capture_source_file_rejects_symlinked_existing_blob_path() {
2626        let temp = tempfile::TempDir::new().expect("tempdir");
2627        let data_dir = temp.path().join("cass-data");
2628        let source_path = temp.path().join("cached-source.jsonl");
2629        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2630        fs::write(&source_path, source_bytes).expect("write source");
2631
2632        let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
2633        let blob_relative_path =
2634            raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
2635        let blob_path = data_dir
2636            .join(RAW_MIRROR_ROOT_DIR)
2637            .join(RAW_MIRROR_VERSION_DIR)
2638            .join(&blob_relative_path);
2639        fs::create_dir_all(blob_path.parent().expect("blob parent")).expect("blob parent dir");
2640        let outside = temp.path().join("outside.raw");
2641        fs::write(&outside, source_bytes).expect("outside blob bytes");
2642        std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");
2643
2644        let err = capture_source_file(RawMirrorCaptureInput {
2645            data_dir: &data_dir,
2646            provider: "codex",
2647            source_id: "local",
2648            origin_kind: "local",
2649            origin_host: None,
2650            source_path: &source_path,
2651            db_links: &[],
2652        })
2653        .expect_err("symlinked content-addressed blob path must be rejected");
2654        assert!(
2655            err.to_string().contains("symlink raw mirror blob"),
2656            "unexpected symlink-blob error: {err:#}"
2657        );
2658
2659        let manifest_root = data_dir
2660            .join(RAW_MIRROR_ROOT_DIR)
2661            .join(RAW_MIRROR_VERSION_DIR)
2662            .join("manifests");
2663        assert!(
2664            !manifest_root.exists(),
2665            "failed blob publish must not write a manifest pointing at a symlinked blob"
2666        );
2667        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2668        assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
2669    }
2670
2671    #[cfg(unix)]
2672    #[test]
2673    fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
2674        let temp = tempfile::TempDir::new().expect("tempdir");
2675        let data_dir = temp.path().join("cass-data");
2676        let source_path = temp.path().join("source.jsonl");
2677        let outside_mirror = temp.path().join("outside-mirror");
2678        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";
2679
2680        fs::create_dir_all(&data_dir).expect("data dir");
2681        fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
2682        fs::write(&source_path, source_bytes).expect("write source");
2683        std::os::unix::fs::symlink(&outside_mirror, data_dir.join(RAW_MIRROR_ROOT_DIR))
2684            .expect("symlink raw mirror root");
2685
2686        let err = capture_source_file(RawMirrorCaptureInput {
2687            data_dir: &data_dir,
2688            provider: "codex",
2689            source_id: "local",
2690            origin_kind: "local",
2691            origin_host: None,
2692            source_path: &source_path,
2693            db_links: &[],
2694        })
2695        .expect_err("symlinked raw-mirror root must be rejected");
2696
2697        assert!(
2698            err.to_string().contains("symlink raw mirror dir"),
2699            "unexpected symlink-root error: {err:#}"
2700        );
2701        assert!(
2702            !outside_mirror.join(RAW_MIRROR_VERSION_DIR).exists(),
2703            "raw mirror capture must not create redirected archive state outside data_dir"
2704        );
2705        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2706    }
2707
2708    #[cfg(unix)]
2709    #[test]
2710    fn capture_source_file_rejects_symlinked_blob_directory_component() {
2711        let temp = tempfile::TempDir::new().expect("tempdir");
2712        let data_dir = temp.path().join("cass-data");
2713        let root = data_dir
2714            .join(RAW_MIRROR_ROOT_DIR)
2715            .join(RAW_MIRROR_VERSION_DIR);
2716        let source_path = temp.path().join("source.jsonl");
2717        let outside_blobs = temp.path().join("outside-blobs");
2718        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
2719
2720        fs::create_dir_all(&root).expect("raw mirror root");
2721        fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
2722        fs::write(&source_path, source_bytes).expect("write source");
2723        std::os::unix::fs::symlink(&outside_blobs, root.join("blobs")).expect("symlink blobs dir");
2724
2725        let err = capture_source_file(RawMirrorCaptureInput {
2726            data_dir: &data_dir,
2727            provider: "codex",
2728            source_id: "local",
2729            origin_kind: "local",
2730            origin_host: None,
2731            source_path: &source_path,
2732            db_links: &[],
2733        })
2734        .expect_err("symlinked blob directory must be rejected");
2735
2736        assert!(
2737            err.to_string().contains("symlink raw mirror dir"),
2738            "unexpected symlink-blob-dir error: {err:#}"
2739        );
2740        assert!(
2741            !outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM).exists(),
2742            "raw mirror capture must not create redirected blob state outside data_dir"
2743        );
2744        assert!(
2745            !root.join("manifests").exists(),
2746            "failed blob publish must not write a manifest"
2747        );
2748        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2749    }
2750
2751    #[test]
2752    fn capture_source_file_rejects_non_file_sources() {
2753        let temp = tempfile::TempDir::new().expect("tempdir");
2754        let data_dir = temp.path().join("cass-data");
2755        let source_dir = temp.path().join("source-dir");
2756        fs::create_dir(&source_dir).expect("source dir");
2757
2758        let err = capture_source_file(RawMirrorCaptureInput {
2759            data_dir: &data_dir,
2760            provider: "codex",
2761            source_id: "local",
2762            origin_kind: "local",
2763            origin_host: None,
2764            source_path: &source_dir,
2765            db_links: &[],
2766        })
2767        .expect_err("directory source should be rejected");
2768        assert!(
2769            err.to_string().contains("non-file source"),
2770            "unexpected non-file-source error: {err}"
2771        );
2772        assert!(
2773            !data_dir.join(RAW_MIRROR_ROOT_DIR).exists(),
2774            "rejected non-file sources must not initialize raw mirror storage"
2775        );
2776    }
2777
2778    #[cfg(unix)]
2779    #[test]
2780    fn capture_source_file_rejects_unreadable_sources_without_manifest() {
2781        use std::os::unix::fs::PermissionsExt;
2782
2783        let temp = tempfile::TempDir::new().expect("tempdir");
2784        let data_dir = temp.path().join("cass-data");
2785        let source_path = temp.path().join("unreadable.jsonl");
2786        fs::write(&source_path, b"private session bytes\n").expect("source");
2787        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
2788            .expect("make source unreadable");
2789
2790        let err = capture_source_file(RawMirrorCaptureInput {
2791            data_dir: &data_dir,
2792            provider: "codex",
2793            source_id: "local",
2794            origin_kind: "local",
2795            origin_host: None,
2796            source_path: &source_path,
2797            db_links: &[],
2798        })
2799        .expect_err("unreadable source should be rejected");
2800        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
2801            .expect("restore source perms");
2802        assert!(
2803            err.to_string().contains("open raw mirror source"),
2804            "unexpected unreadable-source error: {err}"
2805        );
2806        assert!(
2807            !data_dir.join("raw-mirror/v1/manifests").exists(),
2808            "failed unreadable-source captures must not publish manifests"
2809        );
2810    }
2811
2812    #[cfg(unix)]
2813    #[test]
2814    fn capture_source_file_rejects_symlink_sources() {
2815        use std::os::unix::fs::symlink;
2816
2817        let temp = tempfile::TempDir::new().expect("tempdir");
2818        let data_dir = temp.path().join("cass-data");
2819        let real_source = temp.path().join("real.jsonl");
2820        let symlink_source = temp.path().join("link.jsonl");
2821        fs::write(&real_source, b"secret session").expect("write source");
2822        symlink(&real_source, &symlink_source).expect("symlink");
2823
2824        let err = capture_source_file(RawMirrorCaptureInput {
2825            data_dir: &data_dir,
2826            provider: "codex",
2827            source_id: "local",
2828            origin_kind: "local",
2829            origin_host: None,
2830            source_path: &symlink_source,
2831            db_links: &[],
2832        })
2833        .expect_err("symlink source should be rejected");
2834        assert!(
2835            err.to_string().contains("symlink source"),
2836            "unexpected error: {err:#}"
2837        );
2838    }
2839}