Skip to main content

coding_agent_search/
raw_mirror.rs

1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// Schema version written into every manifest's `schema_version` field.
const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
/// Name of the mirror's top-level directory.
/// NOTE(review): presumably joined onto the data dir by `raw_mirror_root` — confirm there.
const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
/// Layout-version directory name.
/// NOTE(review): not referenced in this view; presumably a component of the mirror root path.
const RAW_MIRROR_VERSION_DIR: &str = "v1";
/// `manifest_kind` every valid manifest must carry; manifests with any other kind are
/// counted invalid by `storage_summary` and make `prune` refuse to run.
const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
/// Hash algorithm name recorded in each manifest's `blob_hash_algorithm` field.
const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
/// Blob file extension (without the leading dot).
/// NOTE(review): presumably used by `raw_mirror_blob_relative_path` — confirm.
const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";

/// Process-wide counter.
/// NOTE(review): not used in this view; the name suggests it disambiguates temp names.
static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
/// Process-wide cache from a source-file identity key to the blob it was captured as;
/// on a hit `capture_source_file` reuses the recorded digest instead of copying the source.
static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
    OnceLock::new();
/// Lock for manifest updates.
/// NOTE(review): not used in this view; presumably serializes in-process manifest rewrites.
static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
/// Everything `capture_source_file` needs to mirror one provider session file.
#[derive(Debug, Clone)]
pub struct RawMirrorCaptureInput<'a> {
    /// Application data directory; passed to `ensure_raw_mirror_root` to locate the mirror.
    pub data_dir: &'a Path,
    /// Provider name; part of the manifest id and stored in the manifest.
    pub provider: &'a str,
    /// Source identifier; part of the manifest id and stored in the manifest.
    pub source_id: &'a str,
    /// Origin kind; part of the manifest id.
    pub origin_kind: &'a str,
    /// Optional origin host; part of the manifest id.
    pub origin_host: Option<&'a str>,
    /// File to capture. Symlinks and non-regular files are refused.
    pub source_path: &'a Path,
    /// Links to indexed conversations.
    /// NOTE(review): presumably stored as the manifest's `db_links` — confirm in `capture_source_file`.
    pub db_links: &'a [RawMirrorDbLink],
}

/// Outcome of capturing one source file into the raw mirror.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorCaptureRecord {
    /// Deterministic manifest id derived from provider/source/origin, the original-path
    /// hash and the blob hash.
    pub manifest_id: String,
    /// Manifest location relative to the mirror root.
    pub manifest_relative_path: String,
    /// Content-addressed blob location relative to the mirror root.
    pub blob_relative_path: String,
    /// BLAKE3 digest that addresses the blob.
    pub blob_blake3: String,
    /// Number of bytes copied from the source into the blob.
    pub blob_size_bytes: u64,
    /// Capture timestamp as returned by `now_ms` (presumably Unix epoch milliseconds).
    pub captured_at_ms: i64,
    /// Source modification time in milliseconds, when the platform reports one.
    pub source_mtime_ms: Option<i64>,
    /// NOTE(review): presumably whether the capture found existing state instead of
    /// writing new data — the assignment is outside this view; confirm.
    pub already_present: bool,
}

/// Link from a mirrored file to a conversation in the search index database.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorDbLink {
    /// Conversation row id; `prune --keep-tag` pins manifests whose linked
    /// conversation carries a kept tag.
    pub conversation_id: Option<i64>,
    /// Message count of the linked conversation, if known.
    pub message_count: Option<usize>,
    /// Source path recorded for the linked conversation, if known.
    pub source_path: Option<String>,
    /// Conversation start time in milliseconds, if known.
    pub started_at_ms: Option<i64>,
}
54
/// Read-only inventory of the raw mirror produced by [`storage_summary`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorStorageSummary {
    /// True when anything exists at the mirror root path (even an invalid entry).
    pub initialized: bool,
    /// Display form of the mirror root path.
    pub root_path: String,
    /// Bytes of every regular file under the root (symlinks are neither followed nor counted).
    pub total_storage_bytes: u64,
    /// Manifests that parsed with the expected kind. Such a manifest may additionally be
    /// counted in `invalid_manifest_count` if its blob path turns out non-canonical.
    pub manifest_count: u64,
    /// Bytes of all regular files in the manifests directory.
    pub manifest_bytes: u64,
    /// Distinct referenced blobs present as regular files.
    pub unique_blob_count: u64,
    /// Bytes of those distinct blobs.
    pub total_blob_bytes: u64,
    /// Size of the largest distinct blob.
    pub largest_blob_bytes: u64,
    /// Distinct referenced blobs that are absent or not regular files.
    pub missing_blob_count: u64,
    /// Unusable manifest entries; also set to 1 when the root itself is a symlink or non-directory.
    pub invalid_manifest_count: u64,
    /// Earliest manifest `captured_at_ms`.
    pub oldest_capture_at_ms: Option<i64>,
    /// Latest manifest `captured_at_ms`.
    pub newest_capture_at_ms: Option<i64>,
    /// Earliest recorded source mtime across manifests.
    pub oldest_source_mtime_ms: Option<i64>,
    /// Latest recorded source mtime across manifests.
    pub newest_source_mtime_ms: Option<i64>,
}
72
73pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
74    let root = raw_mirror_root(data_dir);
75    let mut summary = RawMirrorStorageSummary {
76        root_path: root.display().to_string(),
77        ..RawMirrorStorageSummary::default()
78    };
79    let root_metadata = match fs::symlink_metadata(&root) {
80        Ok(metadata) => metadata,
81        Err(_) => return summary,
82    };
83    summary.initialized = true;
84    if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
85        summary.invalid_manifest_count = 1;
86        return summary;
87    }
88
89    summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
90
91    let manifests_dir = root.join("manifests");
92    let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
93        return summary;
94    };
95    if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
96        summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
97        return summary;
98    }
99    let entries = match fs::read_dir(&manifests_dir) {
100        Ok(entries) => entries,
101        Err(_) => return summary,
102    };
103    let mut seen_blobs = HashSet::new();
104    for entry in entries {
105        let Ok(entry) = entry else {
106            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
107            continue;
108        };
109        let path = entry.path();
110        let manifest_metadata = match fs::symlink_metadata(&path) {
111            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
112            _ => {
113                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
114                continue;
115            }
116        };
117        summary.manifest_bytes = summary
118            .manifest_bytes
119            .saturating_add(manifest_metadata.len());
120        let manifest = match read_raw_mirror_manifest(&path) {
121            Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
122            _ => {
123                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
124                continue;
125            }
126        };
127        summary.manifest_count = summary.manifest_count.saturating_add(1);
128        merge_min_max(
129            &mut summary.oldest_capture_at_ms,
130            &mut summary.newest_capture_at_ms,
131            Some(manifest.captured_at_ms),
132        );
133        merge_min_max(
134            &mut summary.oldest_source_mtime_ms,
135            &mut summary.newest_source_mtime_ms,
136            manifest.source_mtime_ms,
137        );
138
139        let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
140            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
141            continue;
142        };
143        if manifest.blob_relative_path != blob_relative_path {
144            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
145            continue;
146        }
147
148        if !seen_blobs.insert(blob_relative_path.clone()) {
149            continue;
150        }
151        let blob_path = root.join(blob_relative_path);
152        match fs::symlink_metadata(&blob_path) {
153            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
154                let size = metadata.len();
155                summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
156                summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
157                summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
158            }
159            _ => {
160                summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
161            }
162        }
163    }
164
165    summary
166}
167
/// Retention policy for [`prune`].
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// Select manifests captured at or before `now - older_than_ms` (negative values act as 0).
    pub older_than_ms: Option<i64>,
    /// Blob-byte budget: unpinned blobs are retired oldest-first until the remaining
    /// blob bytes fit.
    pub max_size_bytes: Option<u64>,
    /// Conversation tags whose linked manifests are pinned (trimmed; blank tags ignored).
    pub keep_tags: Vec<String>,
    /// Manifests captured within this many ms before now are pinned; `<= 0` disables the hold-down.
    pub safety_hold_down_ms: i64,
    /// Remove the planned files when true; otherwise only report a dry-run plan.
    pub apply: bool,
}

/// Result of a [`prune`] run (dry-run or apply).
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneReport {
    /// False when the mirror root does not exist (nothing was examined).
    pub initialized: bool,
    /// Display form of the mirror root path.
    pub root_path: String,
    /// `"apply"` or `"dry-run"`.
    pub mode: String,
    /// Valid manifests found.
    pub manifest_count: u64,
    /// Distinct blobs referenced by those manifests.
    pub unique_blob_count: u64,
    /// Sum of those blobs' sizes (on-disk size, or the manifest-recorded size when missing).
    pub current_blob_bytes: u64,
    /// Hold-down window echoed from the options.
    pub safety_hold_down_ms: i64,
    /// Keep tags echoed from the options (untrimmed).
    pub keep_tags: Vec<String>,
    /// Manifests protected by hold-down or keep tags.
    pub pinned_manifest_count: u64,
    /// Blobs referenced by at least one pinned manifest.
    pub pinned_blob_count: u64,
    /// Manifests selected for removal.
    pub planned_manifest_count: u64,
    /// Blobs selected for removal (every referencing manifest selected).
    pub planned_blob_count: u64,
    /// Bytes the plan would reclaim (manifests plus blobs).
    pub planned_reclaim_bytes: u64,
    /// Manifests actually removed (apply mode only).
    pub applied_manifest_count: u64,
    /// Blobs actually removed (apply mode only).
    pub applied_blob_count: u64,
    /// Bytes actually reclaimed (apply mode only).
    pub applied_reclaim_bytes: u64,
    /// Audit log path, set when the plan was non-empty and the log was written.
    pub audit_log_path: Option<String>,
    /// Planned entries: manifests (sorted by id) followed by blobs (sorted by path).
    pub entries: Vec<RawMirrorPruneEntry>,
}

/// One file selected by a prune plan.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneEntry {
    /// `"manifest"` or `"blob"`.
    pub kind: String,
    /// Path relative to the mirror root.
    pub path: String,
    /// Blob hash: from the manifest for manifest entries, from the file name for blob entries.
    pub blob_blake3: Option<String>,
    /// File size used for reclaim accounting.
    pub size_bytes: u64,
    /// Human-readable reason the entry was selected.
    pub reason: String,
    /// True once the file was actually removed.
    pub applied: bool,
}
208
/// Validated manifest facts needed to plan a prune (built by `collect_prune_manifests`).
#[derive(Debug, Clone)]
struct RawMirrorPruneManifest {
    /// Manifest id from the file contents.
    manifest_id: String,
    /// Manifest file path relative to the mirror root.
    relative_path: String,
    /// On-disk size of the manifest file.
    size_bytes: u64,
    /// Hash of the referenced blob.
    blob_blake3: String,
    /// Canonical blob path relative to the mirror root (verified against `blob_blake3`).
    blob_relative_path: String,
    /// Blob size recorded in the manifest (fallback when the blob file is missing).
    blob_size_bytes: u64,
    /// Capture timestamp; drives age selection, hold-down and ordering.
    captured_at_ms: i64,
    /// Provider name (secondary sort key).
    provider: String,
    /// Original source path (tertiary sort key).
    original_path: String,
    /// Conversation links used to honor keep tags.
    db_links: Vec<RawMirrorDbLink>,
}
222
223pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
224    let root = raw_mirror_root(data_dir);
225    let mut report = RawMirrorPruneReport {
226        initialized: false,
227        root_path: root.display().to_string(),
228        mode: if options.apply {
229            "apply".to_string()
230        } else {
231            "dry-run".to_string()
232        },
233        manifest_count: 0,
234        unique_blob_count: 0,
235        current_blob_bytes: 0,
236        safety_hold_down_ms: options.safety_hold_down_ms,
237        keep_tags: options.keep_tags.clone(),
238        pinned_manifest_count: 0,
239        pinned_blob_count: 0,
240        planned_manifest_count: 0,
241        planned_blob_count: 0,
242        planned_reclaim_bytes: 0,
243        applied_manifest_count: 0,
244        applied_blob_count: 0,
245        applied_reclaim_bytes: 0,
246        audit_log_path: None,
247        entries: Vec::new(),
248    };
249
250    let metadata = match fs::symlink_metadata(&root) {
251        Ok(metadata) => metadata,
252        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
253        Err(err) => {
254            return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
255        }
256    };
257    if metadata.file_type().is_symlink() || !metadata.is_dir() {
258        anyhow::bail!(
259            "refusing to prune invalid raw mirror root {}",
260            root.display()
261        );
262    }
263    report.initialized = true;
264
265    let manifests = collect_prune_manifests(&root)?;
266    report.manifest_count = manifests.len() as u64;
267
268    let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
269    let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
270    let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
271    for manifest in &manifests {
272        manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
273        blob_to_manifests
274            .entry(manifest.blob_relative_path.clone())
275            .or_default()
276            .push(manifest.manifest_id.clone());
277        blob_size_by_relative
278            .entry(manifest.blob_relative_path.clone())
279            .or_insert_with(|| {
280                blob_file_size(&root.join(&manifest.blob_relative_path))
281                    .unwrap_or(manifest.blob_size_bytes)
282            });
283    }
284    report.unique_blob_count = blob_size_by_relative.len() as u64;
285    report.current_blob_bytes = blob_size_by_relative
286        .values()
287        .copied()
288        .fold(0u64, u64::saturating_add);
289
290    let now = now_ms();
291    let pinned_manifests = pinned_prune_manifest_ids(
292        data_dir,
293        &manifests,
294        &options.keep_tags,
295        options.safety_hold_down_ms,
296        now,
297    )?;
298    report.pinned_manifest_count = pinned_manifests.len() as u64;
299    let pinned_blobs: HashSet<String> = blob_to_manifests
300        .iter()
301        .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
302        .map(|(blob_relative_path, _)| blob_relative_path.clone())
303        .collect();
304    report.pinned_blob_count = pinned_blobs.len() as u64;
305
306    let mut selected_manifests: HashSet<String> = HashSet::new();
307    let mut manifest_reasons: HashMap<String, String> = HashMap::new();
308
309    if let Some(older_than_ms) = options.older_than_ms {
310        let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
311        for manifest in &manifests {
312            if manifest.captured_at_ms <= cutoff_ms
313                && !pinned_manifests.contains(&manifest.manifest_id)
314            {
315                selected_manifests.insert(manifest.manifest_id.clone());
316                manifest_reasons
317                    .entry(manifest.manifest_id.clone())
318                    .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
319            }
320        }
321    }
322
323    if let Some(max_size_bytes) = options.max_size_bytes
324        && report.current_blob_bytes > max_size_bytes
325    {
326        let mut blob_groups: Vec<_> = blob_to_manifests
327            .iter()
328            .map(|(blob_relative_path, manifest_ids)| {
329                let oldest_capture = manifest_ids
330                    .iter()
331                    .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
332                    .min()
333                    .unwrap_or(i64::MAX);
334                let size = blob_size_by_relative
335                    .get(blob_relative_path)
336                    .copied()
337                    .unwrap_or(0);
338                (
339                    blob_relative_path.clone(),
340                    manifest_ids.clone(),
341                    oldest_capture,
342                    size,
343                )
344            })
345            .collect::<Vec<_>>();
346        blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
347
348        let mut projected_bytes = report.current_blob_bytes;
349        for (blob_relative_path, manifest_ids, _, size) in blob_groups {
350            if projected_bytes <= max_size_bytes {
351                break;
352            }
353            if pinned_blobs.contains(&blob_relative_path) {
354                continue;
355            }
356            for manifest_id in manifest_ids {
357                if !pinned_manifests.contains(&manifest_id) {
358                    selected_manifests.insert(manifest_id.clone());
359                    manifest_reasons.entry(manifest_id).or_insert_with(|| {
360                        format!("max-size over budget; retiring blob {blob_relative_path}")
361                    });
362                }
363            }
364            projected_bytes = projected_bytes.saturating_sub(size);
365        }
366    }
367
368    let selected_blobs: HashSet<String> = blob_to_manifests
369        .iter()
370        .filter(|(_, manifest_ids)| {
371            manifest_ids
372                .iter()
373                .all(|id| selected_manifests.contains(id))
374        })
375        .map(|(blob_relative_path, _)| blob_relative_path.clone())
376        .collect();
377
378    let mut entries = Vec::new();
379    let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
380    selected_manifest_ids.sort();
381    for manifest_id in selected_manifest_ids {
382        let Some(manifest) = manifest_by_id.get(&manifest_id) else {
383            continue;
384        };
385        let reason = manifest_reasons
386            .remove(&manifest_id)
387            .unwrap_or_else(|| "selected by retention policy".to_string());
388        entries.push(RawMirrorPruneEntry {
389            kind: "manifest".to_string(),
390            path: manifest.relative_path.clone(),
391            blob_blake3: Some(manifest.blob_blake3.clone()),
392            size_bytes: manifest.size_bytes,
393            reason,
394            applied: false,
395        });
396    }
397
398    let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
399    selected_blob_paths.sort();
400    for blob_relative_path in selected_blob_paths {
401        let size = blob_size_by_relative
402            .get(&blob_relative_path)
403            .copied()
404            .unwrap_or(0);
405        let blob_blake3 = blob_relative_path
406            .rsplit('/')
407            .next()
408            .and_then(|name| name.strip_suffix(".raw"))
409            .map(ToOwned::to_owned);
410        entries.push(RawMirrorPruneEntry {
411            kind: "blob".to_string(),
412            path: blob_relative_path,
413            blob_blake3,
414            size_bytes: size,
415            reason: "no retained manifest references this blob after prune plan".to_string(),
416            applied: false,
417        });
418    }
419
420    report.planned_manifest_count = entries
421        .iter()
422        .filter(|entry| entry.kind == "manifest")
423        .count() as u64;
424    report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
425    report.planned_reclaim_bytes = entries
426        .iter()
427        .map(|entry| entry.size_bytes)
428        .fold(0, u64::saturating_add);
429
430    if options.apply {
431        for entry in &mut entries {
432            let path = root.join(&entry.path);
433            let removed = remove_prune_target_file(&path)
434                .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
435            entry.applied = removed;
436            if removed {
437                if entry.kind == "manifest" {
438                    report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
439                } else if entry.kind == "blob" {
440                    report.applied_blob_count = report.applied_blob_count.saturating_add(1);
441                }
442                report.applied_reclaim_bytes = report
443                    .applied_reclaim_bytes
444                    .saturating_add(entry.size_bytes);
445            }
446        }
447    }
448
449    report.entries = entries;
450    if !report.entries.is_empty() {
451        let audit_path = append_prune_audit_log(&root, &report)?;
452        report.audit_log_path = Some(audit_path.display().to_string());
453    }
454    Ok(report)
455}
456
457fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
458    let manifests_dir = root.join("manifests");
459    let metadata = match fs::symlink_metadata(&manifests_dir) {
460        Ok(metadata) => metadata,
461        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
462        Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
463    };
464    if metadata.file_type().is_symlink() || !metadata.is_dir() {
465        anyhow::bail!(
466            "refusing to prune invalid raw mirror manifests directory {}",
467            manifests_dir.display()
468        );
469    }
470
471    let mut manifests = Vec::new();
472    for entry in
473        fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
474    {
475        let entry = entry?;
476        let path = entry.path();
477        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
478            continue;
479        }
480        let manifest_metadata = fs::symlink_metadata(&path)
481            .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
482        if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
483            anyhow::bail!(
484                "refusing to prune with non-regular raw mirror manifest {}",
485                path.display()
486            );
487        }
488        let manifest = read_raw_mirror_manifest(&path)?;
489        if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
490            anyhow::bail!(
491                "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
492                manifest.manifest_kind,
493                path.display()
494            );
495        }
496        let Some(expected_blob_relative_path) =
497            raw_mirror_blob_relative_path(&manifest.blob_blake3)
498        else {
499            anyhow::bail!(
500                "refusing to prune raw mirror manifest {} with invalid blob hash",
501                path.display()
502            );
503        };
504        if manifest.blob_relative_path != expected_blob_relative_path {
505            anyhow::bail!(
506                "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
507                path.display(),
508                manifest.blob_relative_path
509            );
510        }
511        let relative_path = path
512            .strip_prefix(root)
513            .unwrap_or(&path)
514            .display()
515            .to_string();
516        manifests.push(RawMirrorPruneManifest {
517            manifest_id: manifest.manifest_id,
518            relative_path,
519            size_bytes: manifest_metadata.len(),
520            blob_blake3: manifest.blob_blake3,
521            blob_relative_path: manifest.blob_relative_path,
522            blob_size_bytes: manifest.blob_size_bytes,
523            captured_at_ms: manifest.captured_at_ms,
524            provider: manifest.provider,
525            original_path: manifest.original_path,
526            db_links: manifest.db_links,
527        });
528    }
529    manifests.sort_by(|left, right| {
530        left.captured_at_ms
531            .cmp(&right.captured_at_ms)
532            .then_with(|| left.provider.cmp(&right.provider))
533            .then_with(|| left.original_path.cmp(&right.original_path))
534            .then_with(|| left.manifest_id.cmp(&right.manifest_id))
535    });
536    Ok(manifests)
537}
538
539fn pinned_prune_manifest_ids(
540    data_dir: &Path,
541    manifests: &[RawMirrorPruneManifest],
542    keep_tags: &[String],
543    safety_hold_down_ms: i64,
544    now_ms: i64,
545) -> Result<HashSet<String>> {
546    let mut pinned = HashSet::new();
547    if safety_hold_down_ms > 0 {
548        let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
549        for manifest in manifests {
550            if manifest.captured_at_ms > cutoff_ms {
551                pinned.insert(manifest.manifest_id.clone());
552            }
553        }
554    }
555
556    let normalized_keep_tags = keep_tags
557        .iter()
558        .map(|tag| tag.trim())
559        .filter(|tag| !tag.is_empty())
560        .map(ToOwned::to_owned)
561        .collect::<Vec<_>>();
562    if normalized_keep_tags.is_empty() {
563        return Ok(pinned);
564    }
565
566    let keep_tag_conversation_ids =
567        load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
568    for manifest in manifests {
569        if manifest.db_links.iter().any(|link| {
570            link.conversation_id
571                .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
572        }) {
573            pinned.insert(manifest.manifest_id.clone());
574        }
575    }
576    Ok(pinned)
577}
578
579fn load_keep_tag_conversation_ids(
580    data_dir: &Path,
581    manifests: &[RawMirrorPruneManifest],
582    keep_tags: &[String],
583) -> Result<HashSet<i64>> {
584    use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
585
586    let mut conversation_ids = manifests
587        .iter()
588        .flat_map(|manifest| manifest.db_links.iter())
589        .filter_map(|link| link.conversation_id)
590        .collect::<Vec<_>>();
591    conversation_ids.sort_unstable();
592    conversation_ids.dedup();
593    if conversation_ids.is_empty() {
594        return Ok(HashSet::new());
595    }
596
597    let db_path = data_dir.join("agent_search.db");
598    let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
599        &db_path,
600        Duration::from_secs(30),
601    )
602    .with_context(|| {
603        format!(
604            "open {} to honor raw-mirror prune --keep-tag",
605            db_path.display()
606        )
607    })?;
608    let _ = conn.execute("PRAGMA query_only = 1;");
609
610    let mut pinned = HashSet::new();
611    for id_chunk in conversation_ids.chunks(400) {
612        let tag_placeholders = (0..keep_tags.len())
613            .map(|idx| format!("?{}", idx + 1))
614            .collect::<Vec<_>>()
615            .join(", ");
616        let id_offset = keep_tags.len();
617        let id_placeholders = (0..id_chunk.len())
618            .map(|idx| format!("?{}", id_offset + idx + 1))
619            .collect::<Vec<_>>()
620            .join(", ");
621        let sql = format!(
622            "SELECT DISTINCT ct.conversation_id \
623             FROM conversation_tags ct \
624             JOIN tags t ON t.id = ct.tag_id \
625             WHERE t.name IN ({tag_placeholders}) \
626               AND ct.conversation_id IN ({id_placeholders})"
627        );
628        let mut params = keep_tags
629            .iter()
630            .map(|tag| ParamValue::from(tag.as_str()))
631            .collect::<Vec<_>>();
632        params.extend(id_chunk.iter().copied().map(ParamValue::from));
633        let rows: Vec<i64> = conn
634            .query_map_collect(&sql, &params, |row: &frankensqlite::Row| row.get_typed(0))
635            .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
636        pinned.extend(rows);
637    }
638
639    Ok(pinned)
640}
641
/// Returns the size in bytes of the regular file at `path`.
///
/// Returns `None` if the path is missing, cannot be stat'ed, is a symlink (never
/// followed), or is not a regular file.
fn blob_file_size(path: &Path) -> Option<u64> {
    match fs::symlink_metadata(path) {
        Ok(meta) if meta.is_file() && !meta.file_type().is_symlink() => Some(meta.len()),
        _ => None,
    }
}
648
649fn remove_prune_target_file(path: &Path) -> Result<bool> {
650    let metadata = match fs::symlink_metadata(path) {
651        Ok(metadata) => metadata,
652        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
653        Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
654    };
655    if metadata.file_type().is_symlink() || !metadata.is_file() {
656        anyhow::bail!(
657            "refusing to prune non-regular raw mirror file {}",
658            path.display()
659        );
660    }
661    fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
662    sync_parent(path)?;
663    Ok(true)
664}
665
666fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
667    ensure_private_dir(root)?;
668    let audit_path = root.join("pruned.jsonl");
669    let mut file = OpenOptions::new()
670        .create(true)
671        .append(true)
672        .open(&audit_path)
673        .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
674    set_private_file_permissions(&audit_path)?;
675    let now = now_ms();
676    for entry in &report.entries {
677        let record = json!({
678            "schema_version": 1,
679            "recorded_at_ms": now,
680            "mode": report.mode,
681            "kind": entry.kind,
682            "path": entry.path,
683            "blob_blake3": entry.blob_blake3,
684            "size_bytes": entry.size_bytes,
685            "reason": entry.reason,
686            "applied": entry.applied,
687        });
688        writeln!(file, "{record}")
689            .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
690    }
691    file.sync_all()
692        .with_context(|| format!("sync raw mirror prune audit {}", audit_path.display()))?;
693    sync_parent(&audit_path)?;
694    Ok(audit_path)
695}
696
/// Widens the running `[min, max]` range so that it includes `value`.
///
/// A `None` value leaves both bounds untouched. The first `Some` initialises both bounds.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(sample) = value {
        *min = Some(min.unwrap_or(sample).min(sample));
        *max = Some(max.unwrap_or(sample).max(sample));
    }
}
704
/// Sums, best-effort, the sizes of all regular files under `root`.
///
/// If `root` is itself a regular file, its own size is returned. Symlinks are neither
/// followed nor counted, and entries that cannot be stat'ed or listed are skipped
/// silently. An explicit work list replaces recursion, so deep trees cannot overflow
/// the stack.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending: Vec<PathBuf> = Vec::new();
    pending.push(root.to_path_buf());
    let mut total_bytes: u64 = 0;

    while let Some(current) = pending.pop() {
        let meta = match fs::symlink_metadata(&current) {
            Ok(meta) if !meta.file_type().is_symlink() => meta,
            _ => continue,
        };
        if meta.is_file() {
            total_bytes = total_bytes.saturating_add(meta.len());
            continue;
        }
        if !meta.is_dir() {
            continue;
        }
        if let Ok(children) = fs::read_dir(&current) {
            pending.extend(
                children
                    .filter_map(|child| child.ok())
                    .map(|child| child.path()),
            );
        }
    }

    total_bytes
}
728
/// Identity of a source file as captured, used as the `BLOB_CAPTURE_CACHE` key.
///
/// NOTE(review): the fields are filled in by `raw_mirror_blob_cache_key`, which is
/// outside this view. Presumably any change to the file's identity, size or
/// timestamps produces a different key and forces a fresh capture — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    /// Data directory the capture targeted.
    data_dir: PathBuf,
    /// Captured source file path.
    source_path: PathBuf,
    /// Platform file identity, if available (presumably device/inode — confirm).
    source_identity: Option<String>,
    /// Source size at capture time.
    source_size_bytes: u64,
    /// Source modification time in nanoseconds, if available.
    source_mtime_ns: Option<u128>,
    /// Source status-change time in nanoseconds, if available.
    source_change_time_ns: Option<u128>,
}

/// Cached outcome of a blob capture: the digest and the number of bytes copied.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// BLAKE3 digest addressing the stored blob.
    blob_blake3: String,
    /// Bytes copied from the source when the blob was captured.
    bytes_copied: u64,
}
744
/// Compression metadata stored in a manifest.
/// NOTE(review): the `state` values are set outside this view — confirm the vocabulary.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorCompressionEnvelope {
    state: String,
    algorithm: Option<String>,
    uncompressed_size_bytes: Option<u64>,
}

/// Encryption metadata stored in a manifest.
/// NOTE(review): the `state` values are set outside this view — confirm the vocabulary.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorEncryptionEnvelope {
    state: String,
    algorithm: Option<String>,
    key_id: Option<String>,
    envelope_version: Option<u32>,
}

/// Verification status of the mirrored content, as stored in a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorVerificationRecord {
    status: String,
    verifier: String,
    content_blake3: Option<String>,
    verified_at_ms: Option<i64>,
}

/// On-disk JSON manifest describing one captured source file and the blob holding its bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorManifestFile {
    /// `RAW_MIRROR_SCHEMA_VERSION` at write time.
    schema_version: u32,
    /// Must equal `RAW_MIRROR_MANIFEST_KIND` for the manifest to be considered valid.
    manifest_kind: String,
    /// Deterministic id from provider/source/origin, the original-path hash and the blob hash.
    manifest_id: String,
    /// `RAW_MIRROR_HASH_ALGORITHM` at write time.
    blob_hash_algorithm: String,
    /// Must equal `raw_mirror_blob_relative_path(blob_blake3)`; readers reject anything else.
    blob_relative_path: String,
    blob_blake3: String,
    /// Bytes copied into the blob.
    blob_size_bytes: u64,
    provider: String,
    source_id: String,
    origin_kind: String,
    origin_host: Option<String>,
    /// Display form of the captured source path.
    original_path: String,
    /// NOTE(review): presumably a privacy-redacted form of `original_path` — confirm.
    redacted_original_path: String,
    /// Hash of `original_path` (from `raw_mirror_original_path_blake3`).
    original_path_blake3: String,
    /// Capture timestamp from `now_ms`.
    captured_at_ms: i64,
    /// Source modification time in milliseconds, when available.
    source_mtime_ms: Option<i64>,
    source_size_bytes: u64,
    compression: RawMirrorCompressionEnvelope,
    encryption: RawMirrorEncryptionEnvelope,
    /// Conversation links; `prune --keep-tag` reads `conversation_id` from these.
    db_links: Vec<RawMirrorDbLink>,
    verification: RawMirrorVerificationRecord,
    /// NOTE(review): presumably a self-hash of the manifest content — confirm at write site.
    manifest_blake3: Option<String>,
}
793
794pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
795    let source_metadata = fs::symlink_metadata(input.source_path)
796        .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
797    if source_metadata.file_type().is_symlink() {
798        return Err(anyhow!(
799            "refusing to raw-mirror symlink source {}",
800            input.source_path.display()
801        ));
802    }
803    if !source_metadata.is_file() {
804        return Err(anyhow!(
805            "refusing to raw-mirror non-file source {}",
806            input.source_path.display()
807        ));
808    }
809
810    let root = ensure_raw_mirror_root(input.data_dir)?;
811    ensure_private_dir_descendant(&root, &root.join("tmp"))?;
812
813    let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
814    let (blob_blake3, bytes_copied, blob_already_present) =
815        match cached_raw_mirror_blob_record(&cache_key, &root) {
816            Some(record) => (record.blob_blake3, record.bytes_copied, true),
817            None => {
818                let temp_dir = unique_capture_temp_dir(&root);
819                ensure_private_dir_descendant(&root, &temp_dir)?;
820                let CopyToTempResult {
821                    temp_path,
822                    blob_blake3,
823                    bytes_copied,
824                } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
825                let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
826                    .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
827                let blob_path = root.join(&blob_relative_path);
828                let already_present =
829                    publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
830                remove_empty_temp_dir_best_effort(&temp_dir);
831                cache_raw_mirror_blob_record(
832                    cache_key.clone(),
833                    RawMirrorBlobRecord {
834                        blob_blake3: blob_blake3.clone(),
835                        bytes_copied,
836                    },
837                );
838                (blob_blake3, bytes_copied, already_present)
839            }
840        };
841    let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
842        .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
843
844    let original_path = input.source_path.display().to_string();
845    let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
846    let manifest_id = raw_mirror_manifest_id(
847        input.provider,
848        input.source_id,
849        input.origin_kind,
850        input.origin_host,
851        &original_path_blake3,
852        &blob_blake3,
853    );
854    let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
855    let manifest_path = root.join(&manifest_relative_path);
856    let captured_at_ms = now_ms();
857    let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
858    let mut manifest = RawMirrorManifestFile {
859        schema_version: RAW_MIRROR_SCHEMA_VERSION,
860        manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
861        manifest_id: manifest_id.clone(),
862        blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
863        blob_relative_path: blob_relative_path.clone(),
864        blob_blake3: blob_blake3.clone(),
865        blob_size_bytes: bytes_copied,
866        provider: input.provider.to_string(),
867        source_id: input.source_id.to_string(),
868        origin_kind: input.origin_kind.to_string(),
869        origin_host: input.origin_host.map(ToOwned::to_owned),
870        original_path,
871        redacted_original_path: redacted_original_path(input.provider, input.source_path),
872        original_path_blake3,
873        captured_at_ms,
874        source_mtime_ms,
875        source_size_bytes: source_metadata.len(),
876        compression: RawMirrorCompressionEnvelope {
877            state: "none".to_string(),
878            algorithm: None,
879            uncompressed_size_bytes: Some(bytes_copied),
880        },
881        encryption: RawMirrorEncryptionEnvelope {
882            state: "none".to_string(),
883            algorithm: None,
884            key_id: None,
885            envelope_version: None,
886        },
887        db_links: unique_db_links(input.db_links),
888        verification: RawMirrorVerificationRecord {
889            status: "captured".to_string(),
890            verifier: "cass_indexer".to_string(),
891            content_blake3: Some(blob_blake3.clone()),
892            verified_at_ms: Some(captured_at_ms),
893        },
894        manifest_blake3: None,
895    };
896    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
897    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
898    let manifest_already_present =
899        publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
900    let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
901        if manifest_already_present {
902            merge_raw_mirror_manifest_db_links(
903                &root,
904                &manifest_path,
905                input.db_links,
906                Some(&blob_blake3),
907            )?;
908            let published = read_raw_mirror_manifest(&manifest_path)?;
909            (
910                published.blob_size_bytes,
911                published.captured_at_ms,
912                published.source_mtime_ms,
913            )
914        } else {
915            (bytes_copied, captured_at_ms, source_mtime_ms)
916        };
917
918    Ok(RawMirrorCaptureRecord {
919        manifest_id,
920        manifest_relative_path,
921        blob_relative_path,
922        blob_blake3,
923        blob_size_bytes: record_blob_size_bytes,
924        captured_at_ms: record_captured_at_ms,
925        source_mtime_ms: record_source_mtime_ms,
926        already_present: blob_already_present && manifest_already_present,
927    })
928}
929
930pub fn merge_manifest_db_links(
931    data_dir: &Path,
932    manifest_relative_path: &str,
933    links: &[RawMirrorDbLink],
934) -> Result<()> {
935    if links.is_empty() {
936        return Ok(());
937    }
938    let root = raw_mirror_root(data_dir);
939    let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
940    merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
941}
942
/// Outcome of streaming a source file into a private temp file.
struct CopyToTempResult {
    /// Lowercase hex BLAKE3 digest of the bytes written to `temp_path`.
    blob_blake3: String,
    /// Number of bytes read from the source and written to the temp file.
    bytes_copied: u64,
    /// Location of the fsynced temp copy, ready to be hard-linked into place.
    temp_path: PathBuf,
}
948
949fn copy_source_to_private_temp(
950    source_path: &Path,
951    temp_dir: &Path,
952    source_metadata: &fs::Metadata,
953) -> Result<CopyToTempResult> {
954    let temp_path = unique_temp_path(temp_dir, "blob");
955    let mut source = open_stable_source_file(source_path, source_metadata)?;
956    let mut temp = private_create_new_file(&temp_path)?;
957    let mut hasher = blake3::Hasher::new();
958    let mut buffer = [0u8; 64 * 1024];
959    let mut bytes_copied = 0u64;
960    loop {
961        let read = source
962            .read(&mut buffer)
963            .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
964        if read == 0 {
965            break;
966        }
967        temp.write_all(&buffer[..read])
968            .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
969        hasher.update(&buffer[..read]);
970        bytes_copied = bytes_copied.saturating_add(read as u64);
971    }
972    temp.sync_all()
973        .with_context(|| format!("sync raw mirror temp {}", temp_path.display()))?;
974
975    let final_source_metadata = source
976        .metadata()
977        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
978    if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
979        remove_temp_best_effort(&temp_path);
980        return Err(anyhow!(
981            "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
982            source_path.display()
983        ));
984    }
985
986    Ok(CopyToTempResult {
987        temp_path,
988        blob_blake3: hasher.finalize().to_hex().to_string(),
989        bytes_copied,
990    })
991}
992
993fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
994    let source = File::open(source_path)
995        .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
996    let opened_metadata = source
997        .metadata()
998        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
999    if !same_source_identity(expected_metadata, &opened_metadata) {
1000        return Err(anyhow!(
1001            "raw mirror source {} changed identity before capture",
1002            source_path.display()
1003        ));
1004    }
1005    let current_path_metadata = fs::symlink_metadata(source_path)
1006        .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1007    if current_path_metadata.file_type().is_symlink() {
1008        return Err(anyhow!(
1009            "refusing to raw-mirror symlink source {}",
1010            source_path.display()
1011        ));
1012    }
1013    if !same_source_identity(expected_metadata, &current_path_metadata) {
1014        return Err(anyhow!(
1015            "raw mirror source {} changed identity before capture",
1016            source_path.display()
1017        ));
1018    }
1019    Ok(source)
1020}
1021
#[cfg(unix)]
/// Returns `true` when `actual` is a regular file with the same
/// (device, inode) pair as `expected`, i.e. the same underlying file.
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;
    let same_inode = (expected.dev(), expected.ino()) == (actual.dev(), actual.ino());
    same_inode && actual.is_file()
}
1027
#[cfg(not(unix))]
/// Identity fallback for platforms without inode numbers: the best available
/// check is that `actual` is still a regular file.
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    let kind = actual.file_type();
    kind.is_file()
}
1032
#[cfg(unix)]
/// Encodes the file's (device, inode) pair as `"<dev>:<ino>"` for use in the
/// blob cache key.
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;
    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}
1038
#[cfg(not(unix))]
/// No portable (device, inode) pair is exposed off Unix, so the identity
/// component of the cache key is left empty.
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    Option::<String>::None
}
1043
#[cfg(unix)]
/// Returns the inode change time (ctime) in nanoseconds since the Unix epoch.
/// Returns `None` if either the seconds or nanoseconds part is negative.
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    const NANOS_PER_SECOND: u128 = 1_000_000_000;
    let whole_seconds: u128 = metadata.ctime().try_into().ok()?;
    let sub_second: u128 = metadata.ctime_nsec().try_into().ok()?;
    Some(
        whole_seconds
            .saturating_mul(NANOS_PER_SECOND)
            .saturating_add(sub_second),
    )
}
1056
#[cfg(not(unix))]
/// Inode change time is a Unix concept; other platforms omit it from the
/// cache key.
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    Option::<u128>::None
}
1061
/// Reports whether a file looks modified between two stats.
///
/// The file is treated as changed when its length differs, or when both
/// stats expose an mtime and those mtimes differ. A missing mtime on either
/// side does not count as a change.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    let size_changed = initial.len() != final_metadata.len();
    size_changed
        || matches!(
            (initial.modified(), final_metadata.modified()),
            (Ok(before), Ok(after)) if before != after
        )
}
1074
1075fn publish_content_addressed_temp(
1076    root: &Path,
1077    temp_path: &Path,
1078    final_path: &Path,
1079    expected_blake3: &str,
1080) -> Result<bool> {
1081    ensure_private_dir_descendant(
1082        root,
1083        final_path
1084            .parent()
1085            .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1086    )?;
1087    if final_path.exists() {
1088        verify_existing_file(final_path, expected_blake3)?;
1089        remove_temp_best_effort(temp_path);
1090        return Ok(true);
1091    }
1092
1093    match fs::hard_link(temp_path, final_path) {
1094        Ok(()) => {
1095            sync_file(final_path)?;
1096            sync_parent(final_path)?;
1097            remove_temp_best_effort(temp_path);
1098            Ok(false)
1099        }
1100        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1101            verify_existing_file(final_path, expected_blake3)?;
1102            remove_temp_best_effort(temp_path);
1103            Ok(true)
1104        }
1105        Err(err) => Err(anyhow!(
1106            "publish raw mirror blob {} from {}: {err}",
1107            final_path.display(),
1108            temp_path.display()
1109        )),
1110    }
1111}
1112
1113fn publish_manifest_bytes_create_new(
1114    root: &Path,
1115    manifest_path: &Path,
1116    manifest_bytes: &[u8],
1117    blob_blake3: &str,
1118) -> Result<bool> {
1119    ensure_private_dir_descendant(
1120        root,
1121        manifest_path
1122            .parent()
1123            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1124    )?;
1125    if manifest_path.exists() {
1126        verify_existing_manifest(manifest_path, blob_blake3)?;
1127        return Ok(true);
1128    }
1129
1130    let temp_dir = unique_capture_temp_dir(root);
1131    ensure_private_dir_descendant(root, &temp_dir)?;
1132    let temp_path = unique_temp_path(&temp_dir, "manifest");
1133    let mut temp = private_create_new_file(&temp_path)?;
1134    temp.write_all(manifest_bytes)
1135        .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1136    temp.sync_all()
1137        .with_context(|| format!("sync raw mirror manifest temp {}", temp_path.display()))?;
1138
1139    match fs::hard_link(&temp_path, manifest_path) {
1140        Ok(()) => {
1141            sync_file(manifest_path)?;
1142            sync_parent(manifest_path)?;
1143            remove_temp_best_effort(&temp_path);
1144            remove_empty_temp_dir_best_effort(&temp_dir);
1145            Ok(false)
1146        }
1147        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1148            verify_existing_manifest(manifest_path, blob_blake3)?;
1149            remove_temp_best_effort(&temp_path);
1150            remove_empty_temp_dir_best_effort(&temp_dir);
1151            Ok(true)
1152        }
1153        Err(err) => Err(anyhow!(
1154            "publish raw mirror manifest {} from {}: {err}",
1155            manifest_path.display(),
1156            temp_path.display()
1157        )),
1158    }
1159}
1160
1161fn merge_raw_mirror_manifest_db_links(
1162    root: &Path,
1163    manifest_path: &Path,
1164    links: &[RawMirrorDbLink],
1165    expected_blob_blake3: Option<&str>,
1166) -> Result<()> {
1167    if links.is_empty() {
1168        return Ok(());
1169    }
1170
1171    let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1172    let _guard = lock
1173        .lock()
1174        .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1175
1176    let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1177    if let Some(expected_blob_blake3) = expected_blob_blake3
1178        && manifest.blob_blake3 != expected_blob_blake3
1179    {
1180        return Err(anyhow!(
1181            "existing raw mirror manifest {} points at blob {}, expected {}",
1182            manifest_path.display(),
1183            manifest.blob_blake3,
1184            expected_blob_blake3
1185        ));
1186    }
1187
1188    let mut merged_links = manifest.db_links.clone();
1189    merged_links.extend_from_slice(links);
1190    let merged_links = unique_db_links(&merged_links);
1191    if merged_links == manifest.db_links {
1192        return Ok(());
1193    }
1194
1195    manifest.db_links = merged_links;
1196    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1197    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1198    replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1199}
1200
1201fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1202    ensure_private_dir_descendant(
1203        root,
1204        manifest_path
1205            .parent()
1206            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1207    )?;
1208    let temp_dir = unique_capture_temp_dir(root);
1209    ensure_private_dir_descendant(root, &temp_dir)?;
1210    let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1211    let mut temp = private_create_new_file(&temp_path)?;
1212    temp.write_all(manifest_bytes).with_context(|| {
1213        format!(
1214            "write raw mirror manifest update temp {}",
1215            temp_path.display()
1216        )
1217    })?;
1218    temp.sync_all().with_context(|| {
1219        format!(
1220            "sync raw mirror manifest update temp {}",
1221            temp_path.display()
1222        )
1223    })?;
1224    drop(temp);
1225
1226    fs::rename(&temp_path, manifest_path).with_context(|| {
1227        format!(
1228            "replace raw mirror manifest {} from {}",
1229            manifest_path.display(),
1230            temp_path.display()
1231        )
1232    })?;
1233    set_private_file_permissions(manifest_path)?;
1234    sync_file(manifest_path)?;
1235    sync_parent(manifest_path)?;
1236    remove_empty_temp_dir_best_effort(&temp_dir);
1237    Ok(())
1238}
1239
1240fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1241    let relative = Path::new(relative_path);
1242    if relative.is_absolute() {
1243        return Err(anyhow!(
1244            "raw mirror manifest path must be relative: {relative_path}"
1245        ));
1246    }
1247
1248    let mut normal_components = Vec::new();
1249    for component in relative.components() {
1250        match component {
1251            std::path::Component::Normal(part) => normal_components.push(part),
1252            _ => {
1253                return Err(anyhow!(
1254                    "raw mirror manifest path must use only normal relative components: {relative_path}"
1255                ));
1256            }
1257        }
1258    }
1259
1260    if normal_components.len() != 2
1261        || normal_components[0] != std::ffi::OsStr::new("manifests")
1262        || Path::new(normal_components[1])
1263            .extension()
1264            .and_then(|ext| ext.to_str())
1265            != Some("json")
1266    {
1267        return Err(anyhow!(
1268            "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1269        ));
1270    }
1271
1272    Ok(root.join(relative))
1273}
1274
1275fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1276    let metadata = fs::symlink_metadata(path)
1277        .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1278    if metadata.file_type().is_symlink() {
1279        return Err(anyhow!(
1280            "refusing to read symlink raw mirror blob {}",
1281            path.display()
1282        ));
1283    }
1284    if !metadata.is_file() {
1285        return Err(anyhow!(
1286            "refusing to read non-file raw mirror blob {}",
1287            path.display()
1288        ));
1289    }
1290    let actual = file_blake3(path)?;
1291    if actual == expected_blake3 {
1292        Ok(())
1293    } else {
1294        Err(anyhow!(
1295            "existing raw mirror blob {} has blake3 {}, expected {}",
1296            path.display(),
1297            actual,
1298            expected_blake3
1299        ))
1300    }
1301}
1302
1303fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1304    let manifest = read_raw_mirror_manifest(path)?;
1305    if manifest.blob_blake3 == expected_blob_blake3 {
1306        Ok(())
1307    } else {
1308        Err(anyhow!(
1309            "existing raw mirror manifest {} points at blob {}, expected {}",
1310            path.display(),
1311            manifest.blob_blake3,
1312            expected_blob_blake3
1313        ))
1314    }
1315}
1316
1317fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1318    let metadata = fs::symlink_metadata(path)
1319        .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1320    if metadata.file_type().is_symlink() {
1321        return Err(anyhow!(
1322            "refusing to read symlink raw mirror manifest {}",
1323            path.display()
1324        ));
1325    }
1326    if !metadata.is_file() {
1327        return Err(anyhow!(
1328            "refusing to read non-file raw mirror manifest {}",
1329            path.display()
1330        ));
1331    }
1332    serde_json::from_slice(
1333        &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1334    )
1335    .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1336}
1337
1338fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1339    data_dir
1340        .join(RAW_MIRROR_ROOT_DIR)
1341        .join(RAW_MIRROR_VERSION_DIR)
1342}
1343
1344fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1345    let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1346    ensure_private_dir(&root_parent)?;
1347    let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1348    ensure_private_dir(&root)?;
1349    Ok(root)
1350}
1351
1352fn raw_mirror_blob_cache_key(
1353    input: &RawMirrorCaptureInput<'_>,
1354    source_metadata: &fs::Metadata,
1355) -> RawMirrorBlobCacheKey {
1356    RawMirrorBlobCacheKey {
1357        data_dir: input.data_dir.to_path_buf(),
1358        source_path: input.source_path.to_path_buf(),
1359        source_identity: source_identity_token(source_metadata),
1360        source_size_bytes: source_metadata.len(),
1361        source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1362        source_change_time_ns: source_change_time_ns(source_metadata),
1363    }
1364}
1365
1366fn cached_raw_mirror_blob_record(
1367    key: &RawMirrorBlobCacheKey,
1368    root: &Path,
1369) -> Option<RawMirrorBlobRecord> {
1370    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1371    let record = {
1372        let mut guard = cache.lock().ok()?;
1373        let record = guard.get(key).cloned()?;
1374        if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1375            guard.remove(key);
1376            return None;
1377        }
1378        record
1379    };
1380
1381    let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1382    let blob_path = root.join(blob_relative_path);
1383    let metadata_valid = fs::symlink_metadata(&blob_path)
1384        .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1385        .unwrap_or(false);
1386    if !metadata_valid {
1387        remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1388        return None;
1389    }
1390
1391    match file_blake3(&blob_path) {
1392        Ok(actual) if actual == record.blob_blake3 => Some(record),
1393        Ok(actual) => {
1394            tracing::warn!(
1395                path = %blob_path.display(),
1396                expected_blake3 = %record.blob_blake3,
1397                actual_blake3 = %actual,
1398                "discarding raw mirror blob cache entry with mismatched content"
1399            );
1400            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1401            None
1402        }
1403        Err(err) => {
1404            tracing::debug!(
1405                path = %blob_path.display(),
1406                error = %err,
1407                "discarding unreadable raw mirror blob cache entry"
1408            );
1409            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1410            None
1411        }
1412    }
1413}
1414
1415fn remove_cached_raw_mirror_blob_record_if_unchanged(
1416    cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1417    key: &RawMirrorBlobCacheKey,
1418    stale_record: &RawMirrorBlobRecord,
1419) {
1420    if let Ok(mut guard) = cache.lock()
1421        && guard
1422            .get(key)
1423            .is_some_and(|current| current == stale_record)
1424    {
1425        guard.remove(key);
1426    }
1427}
1428
1429fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1430    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1431    if let Ok(mut guard) = cache.lock() {
1432        guard.insert(key, record);
1433    }
1434}
1435
1436fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1437    if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1438        return None;
1439    }
1440    let lower = blob_blake3.to_ascii_lowercase();
1441    Some(format!(
1442        "blobs/{}/{}/{}.{}",
1443        RAW_MIRROR_HASH_ALGORITHM,
1444        &lower[..2],
1445        lower,
1446        RAW_MIRROR_BLOB_EXTENSION
1447    ))
1448}
1449
/// Relative location of the manifest with the given id: `manifests/<id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    let mut relative = String::from("manifests/");
    relative.push_str(manifest_id);
    relative.push_str(".json");
    relative
}
1453
1454fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1455    let mut hasher = blake3::Hasher::new();
1456    hasher.update(b"doctor-raw-mirror-original-path-v1");
1457    hasher.update(&[0]);
1458    hasher.update(original_path.as_bytes());
1459    hasher.finalize().to_hex().to_string()
1460}
1461
1462fn raw_mirror_manifest_id(
1463    provider: &str,
1464    source_id: &str,
1465    origin_kind: &str,
1466    origin_host: Option<&str>,
1467    original_path_blake3: &str,
1468    blob_blake3: &str,
1469) -> String {
1470    canonical_blake3(
1471        "doctor-raw-mirror-manifest-id-v1",
1472        json!({
1473            "provider": provider,
1474            "source_id": source_id,
1475            "origin_kind": origin_kind,
1476            "origin_host": origin_host,
1477            "original_path_blake3": original_path_blake3,
1478            "blob_blake3": blob_blake3,
1479        }),
1480    )
1481}
1482
1483fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1484    let mut value = serde_json::to_value(manifest).unwrap_or_default();
1485    if let Value::Object(map) = &mut value {
1486        map.remove("manifest_blake3");
1487    }
1488    canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1489}
1490
1491fn canonical_blake3(prefix: &str, value: Value) -> String {
1492    let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1493    let mut hasher = blake3::Hasher::new();
1494    hasher.update(prefix.as_bytes());
1495    hasher.update(&[0]);
1496    hasher.update(&encoded);
1497    format!("{prefix}-{}", hasher.finalize().to_hex())
1498}
1499
1500fn canonical_json_value(value: Value) -> Value {
1501    match value {
1502        Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1503        Value::Object(map) => {
1504            let mut entries: Vec<_> = map.into_iter().collect();
1505            entries.sort_by(|left, right| left.0.cmp(&right.0));
1506            let mut canonical = serde_json::Map::new();
1507            for (key, value) in entries {
1508                canonical.insert(key, canonical_json_value(value));
1509            }
1510            Value::Object(canonical)
1511        }
1512        other => other,
1513    }
1514}
1515
1516fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1517    let mut dedup = links.to_vec();
1518    dedup.sort_by(|left, right| {
1519        (
1520            left.conversation_id,
1521            left.message_count,
1522            left.started_at_ms,
1523            left.source_path.as_deref().unwrap_or(""),
1524        )
1525            .cmp(&(
1526                right.conversation_id,
1527                right.message_count,
1528                right.started_at_ms,
1529                right.source_path.as_deref().unwrap_or(""),
1530            ))
1531    });
1532    dedup.dedup();
1533    dedup
1534}
1535
1536fn file_blake3(path: &Path) -> Result<String> {
1537    let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1538    let mut hasher = blake3::Hasher::new();
1539    let mut buffer = [0u8; 64 * 1024];
1540    loop {
1541        let read = file
1542            .read(&mut buffer)
1543            .with_context(|| format!("read {}", path.display()))?;
1544        if read == 0 {
1545            break;
1546        }
1547        hasher.update(&buffer[..read]);
1548    }
1549    Ok(hasher.finalize().to_hex().to_string())
1550}
1551
1552fn ensure_private_dir(path: &Path) -> Result<()> {
1553    create_private_dir_all(path)
1554        .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1555    let metadata = fs::symlink_metadata(path)
1556        .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1557    let file_type = metadata.file_type();
1558    if file_type.is_symlink() {
1559        return Err(anyhow!(
1560            "refusing to use symlink raw mirror dir {}",
1561            path.display()
1562        ));
1563    }
1564    if !file_type.is_dir() {
1565        return Err(anyhow!(
1566            "refusing to use non-directory raw mirror path {}",
1567            path.display()
1568        ));
1569    }
1570    #[cfg(unix)]
1571    {
1572        use std::os::unix::fs::PermissionsExt;
1573        if metadata.permissions().mode() & 0o777 != 0o700 {
1574            set_private_dir_permissions(path)?;
1575        }
1576    }
1577    #[cfg(not(unix))]
1578    {
1579        set_private_dir_permissions(path)?;
1580    }
1581    Ok(())
1582}
1583
1584fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1585    let relative = path.strip_prefix(root).with_context(|| {
1586        format!(
1587            "raw mirror private dir {} is not under root {}",
1588            path.display(),
1589            root.display()
1590        )
1591    })?;
1592
1593    if let Some(root_parent) = root.parent() {
1594        ensure_private_dir(root_parent)?;
1595    }
1596    ensure_private_dir(root)?;
1597    let mut current = root.to_path_buf();
1598    for component in relative.components() {
1599        match component {
1600            Component::Normal(part) => {
1601                current.push(part);
1602                ensure_private_dir(&current)?;
1603            }
1604            Component::CurDir => {}
1605            _ => {
1606                return Err(anyhow!(
1607                    "raw mirror private dir contains non-normal component: {}",
1608                    path.display()
1609                ));
1610            }
1611        }
1612    }
1613
1614    Ok(())
1615}
1616
1617fn private_create_new_file(path: &Path) -> Result<File> {
1618    let mut options = OpenOptions::new();
1619    options.write(true).create_new(true);
1620    set_private_create_file_mode(&mut options);
1621    let file = options
1622        .open(path)
1623        .with_context(|| format!("create raw mirror file {}", path.display()))?;
1624    Ok(file)
1625}
1626
#[cfg(unix)]
/// Recursively creates `path`, giving newly created directories mode
/// `0o700` (subject to the process umask). Existing directories are left as-is.
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;
    fs::DirBuilder::new()
        .recursive(true)
        .mode(0o700)
        .create(path)
}
1634
#[cfg(not(unix))]
/// Recursively creates `path`; there is no POSIX mode to apply off Unix.
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::DirBuilder::new().recursive(true).create(path)
}
1639
#[cfg(unix)]
/// Makes files created through `options` owner read/write only (`0o600`).
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    const OWNER_READ_WRITE: u32 = 0o600;
    options.mode(OWNER_READ_WRITE);
}
1646
#[cfg(not(unix))]
/// No creation mode to set off Unix; `options` is left unchanged.
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1649
1650fn sync_file(path: &Path) -> Result<()> {
1651    File::open(path)
1652        .and_then(|file| file.sync_all())
1653        .with_context(|| format!("sync raw mirror file {}", path.display()))
1654}
1655
1656fn sync_parent(path: &Path) -> Result<()> {
1657    let Some(parent) = path.parent() else {
1658        return Ok(());
1659    };
1660    File::open(parent)
1661        .and_then(|file| file.sync_all())
1662        .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1663}
1664
1665fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1666    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1667    let nanos = SystemTime::now()
1668        .duration_since(UNIX_EPOCH)
1669        .unwrap_or_default()
1670        .as_nanos();
1671    dir.join(format!(
1672        ".{label}.{}.{}.{}.tmp",
1673        std::process::id(),
1674        nanos,
1675        nonce
1676    ))
1677}
1678
1679fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1680    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1681    let nanos = SystemTime::now()
1682        .duration_since(UNIX_EPOCH)
1683        .unwrap_or_default()
1684        .as_nanos();
1685    root.join("tmp").join(format!(
1686        "capture.{}.{}.{}",
1687        std::process::id(),
1688        nanos,
1689        nonce
1690    ))
1691}
1692
1693fn remove_temp_best_effort(path: &Path) {
1694    if let Err(err) = fs::remove_file(path) {
1695        tracing::debug!(
1696            path = %path.display(),
1697            error = %err,
1698            "failed to remove raw mirror temp file"
1699        );
1700    }
1701}
1702
1703fn remove_empty_temp_dir_best_effort(path: &Path) {
1704    if let Err(err) = fs::remove_dir(path) {
1705        tracing::debug!(
1706            path = %path.display(),
1707            error = %err,
1708            "failed to remove raw mirror temp directory"
1709        );
1710    }
1711}
1712
/// Builds the display form of a captured source path: `[{provider}]/{file}`.
///
/// Every directory component of `source_path` is dropped and only the final
/// file name is kept. When there is no final component, or it is not valid
/// UTF-8, the placeholder `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let leaf = source_path.file_name().and_then(|name| name.to_str());
    format!("[{provider}]/{}", leaf.unwrap_or("session"))
}
1720
1721fn now_ms() -> i64 {
1722    system_time_to_ms(SystemTime::now()).unwrap_or(0)
1723}
1724
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch, or when the millisecond count
/// does not fit in an `i64`. Sub-millisecond precision is truncated.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1730
/// Converts `time` to nanoseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    Some(since_epoch.as_nanos())
}
1736
1737#[cfg(unix)]
1738fn set_private_dir_permissions(path: &Path) -> Result<()> {
1739    use std::os::unix::fs::PermissionsExt;
1740    fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1741        .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1742}
1743
1744#[cfg(not(unix))]
1745fn set_private_dir_permissions(_path: &Path) -> Result<()> {
1746    Ok(())
1747}
1748
1749#[cfg(unix)]
1750fn set_private_file_permissions(path: &Path) -> Result<()> {
1751    use std::os::unix::fs::PermissionsExt;
1752    fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1753        .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1754}
1755
1756#[cfg(not(unix))]
1757fn set_private_file_permissions(_path: &Path) -> Result<()> {
1758    Ok(())
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763    use super::*;
1764
1765    #[test]
1766    fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1767        let temp = tempfile::TempDir::new().expect("tempdir");
1768        let data_dir = temp.path().join("cass-data");
1769        let source_path = temp.path().join("rollout-fixture.jsonl");
1770        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1771        fs::write(&source_path, source_bytes).expect("write source");
1772        let db_link = RawMirrorDbLink {
1773            conversation_id: Some(42),
1774            message_count: Some(1),
1775            source_path: Some(source_path.display().to_string()),
1776            started_at_ms: Some(1_733_000_000_000),
1777        };
1778
1779        let first = capture_source_file(RawMirrorCaptureInput {
1780            data_dir: &data_dir,
1781            provider: "codex",
1782            source_id: "local",
1783            origin_kind: "local",
1784            origin_host: None,
1785            source_path: &source_path,
1786            db_links: std::slice::from_ref(&db_link),
1787        })
1788        .expect("first capture");
1789        let second = capture_source_file(RawMirrorCaptureInput {
1790            data_dir: &data_dir,
1791            provider: "codex",
1792            source_id: "local",
1793            origin_kind: "local",
1794            origin_host: None,
1795            source_path: &source_path,
1796            db_links: std::slice::from_ref(&db_link),
1797        })
1798        .expect("second capture");
1799
1800        assert_eq!(first.manifest_id, second.manifest_id);
1801        assert_eq!(first.blob_blake3, second.blob_blake3);
1802        assert_eq!(first.captured_at_ms, second.captured_at_ms);
1803        assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1804        assert!(!first.already_present);
1805        assert!(second.already_present);
1806        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1807
1808        let blob_path = data_dir
1809            .join(RAW_MIRROR_ROOT_DIR)
1810            .join(RAW_MIRROR_VERSION_DIR)
1811            .join(&first.blob_relative_path);
1812        let manifest_path = data_dir
1813            .join(RAW_MIRROR_ROOT_DIR)
1814            .join(RAW_MIRROR_VERSION_DIR)
1815            .join(&first.manifest_relative_path);
1816        assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1817
1818        let manifest: Value =
1819            serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1820                .expect("manifest json");
1821        assert_eq!(
1822            manifest["manifest_kind"].as_str(),
1823            Some(RAW_MIRROR_MANIFEST_KIND)
1824        );
1825        assert_eq!(manifest["provider"].as_str(), Some("codex"));
1826        assert_eq!(
1827            manifest["blob_blake3"].as_str(),
1828            Some(first.blob_blake3.as_str())
1829        );
1830        assert_eq!(
1831            manifest["redacted_original_path"].as_str(),
1832            Some("[codex]/rollout-fixture.jsonl")
1833        );
1834        assert_eq!(
1835            manifest["db_links"][0]["conversation_id"].as_i64(),
1836            Some(42)
1837        );
1838        assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1839        assert!(
1840            manifest["manifest_blake3"]
1841                .as_str()
1842                .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1843        );
1844        let tmp_root = data_dir
1845            .join(RAW_MIRROR_ROOT_DIR)
1846            .join(RAW_MIRROR_VERSION_DIR)
1847            .join("tmp");
1848        assert_eq!(
1849            fs::read_dir(&tmp_root)
1850                .expect("raw mirror tmp root")
1851                .collect::<Vec<_>>()
1852                .len(),
1853            0,
1854            "successful captures must not leave doctor-visible interrupted temp artifacts"
1855        );
1856
1857        #[cfg(unix)]
1858        {
1859            use std::os::unix::fs::PermissionsExt;
1860
1861            let root = data_dir
1862                .join(RAW_MIRROR_ROOT_DIR)
1863                .join(RAW_MIRROR_VERSION_DIR);
1864            assert_eq!(
1865                fs::metadata(&root)
1866                    .expect("raw mirror root metadata")
1867                    .permissions()
1868                    .mode()
1869                    & 0o777,
1870                0o700
1871            );
1872            assert_eq!(
1873                fs::metadata(&manifest_path)
1874                    .expect("manifest metadata")
1875                    .permissions()
1876                    .mode()
1877                    & 0o777,
1878                0o600
1879            );
1880        }
1881    }
1882
1883    #[test]
1884    fn capture_source_file_merges_db_links_into_existing_manifest() {
1885        let temp = tempfile::TempDir::new().expect("tempdir");
1886        let data_dir = temp.path().join("cass-data");
1887        let source_path = temp.path().join("preparse-then-parsed.jsonl");
1888        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1889        fs::write(&source_path, source_bytes).expect("write source");
1890
1891        let preparse = capture_source_file(RawMirrorCaptureInput {
1892            data_dir: &data_dir,
1893            provider: "codex",
1894            source_id: "local",
1895            origin_kind: "local",
1896            origin_host: None,
1897            source_path: &source_path,
1898            db_links: &[],
1899        })
1900        .expect("preparse capture");
1901
1902        let parsed_link = RawMirrorDbLink {
1903            conversation_id: None,
1904            message_count: Some(1),
1905            source_path: Some(source_path.display().to_string()),
1906            started_at_ms: Some(1_733_000_000_000),
1907        };
1908        let parsed = capture_source_file(RawMirrorCaptureInput {
1909            data_dir: &data_dir,
1910            provider: "codex",
1911            source_id: "local",
1912            origin_kind: "local",
1913            origin_host: None,
1914            source_path: &source_path,
1915            db_links: std::slice::from_ref(&parsed_link),
1916        })
1917        .expect("parsed capture");
1918
1919        assert_eq!(preparse.manifest_id, parsed.manifest_id);
1920        assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1921        assert!(parsed.already_present);
1922
1923        let manifest_path = data_dir
1924            .join(RAW_MIRROR_ROOT_DIR)
1925            .join(RAW_MIRROR_VERSION_DIR)
1926            .join(&parsed.manifest_relative_path);
1927        let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1928        assert_eq!(
1929            manifest.db_links,
1930            vec![parsed_link],
1931            "second capture must enrich the pre-parse manifest with DB-link evidence"
1932        );
1933        let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1934        assert_eq!(
1935            manifest.manifest_blake3.as_deref(),
1936            Some(expected_manifest_blake3.as_str()),
1937            "manifest checksum must be recomputed after DB-link merge"
1938        );
1939        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1940    }
1941
1942    #[test]
1943    fn merge_manifest_db_links_rejects_hostile_relative_paths() {
1944        let temp = tempfile::TempDir::new().expect("tempdir");
1945        let data_dir = temp.path().join("cass-data");
1946        let db_link = RawMirrorDbLink {
1947            conversation_id: Some(42),
1948            message_count: Some(1),
1949            source_path: Some("source.jsonl".to_string()),
1950            started_at_ms: Some(1_733_000_000_000),
1951        };
1952
1953        for relative in [
1954            "../escape.json",
1955            "/tmp/escape.json",
1956            "manifests/../escape.json",
1957            "blobs/blake3/ab/not-a-manifest.raw",
1958            "manifests/not-json.txt",
1959        ] {
1960            let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
1961                .expect_err("hostile manifest path should be rejected");
1962            assert!(
1963                err.to_string().contains("raw mirror manifest path"),
1964                "unexpected error for {relative}: {err}"
1965            );
1966        }
1967    }
1968
1969    #[cfg(unix)]
1970    #[test]
1971    fn merge_manifest_db_links_rejects_symlink_manifest_path() {
1972        let temp = tempfile::TempDir::new().expect("tempdir");
1973        let data_dir = temp.path().join("cass-data");
1974        let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
1975        fs::create_dir_all(&manifest_dir).expect("manifest dir");
1976        let outside = temp.path().join("outside.json");
1977        fs::write(&outside, "{}").expect("outside manifest");
1978        std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
1979            .expect("symlink manifest");
1980        let db_link = RawMirrorDbLink {
1981            conversation_id: Some(42),
1982            message_count: Some(1),
1983            source_path: Some("source.jsonl".to_string()),
1984            started_at_ms: Some(1_733_000_000_000),
1985        };
1986
1987        let err = merge_manifest_db_links(
1988            &data_dir,
1989            "manifests/link.json",
1990            std::slice::from_ref(&db_link),
1991        )
1992        .expect_err("symlink manifest should be rejected");
1993        assert!(
1994            err.to_string().contains("symlink raw mirror manifest"),
1995            "unexpected symlink-manifest error: {err}"
1996        );
1997    }
1998
1999    #[test]
2000    fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
2001        let temp = tempfile::TempDir::new().expect("tempdir");
2002        let data_dir = temp.path().join("cass-data");
2003        let first_source = temp.path().join("first.jsonl");
2004        let second_source = temp.path().join("second.jsonl");
2005        let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
2006        fs::write(&first_source, source_bytes).expect("write first source");
2007        fs::write(&second_source, source_bytes).expect("write second source");
2008
2009        let first = capture_source_file(RawMirrorCaptureInput {
2010            data_dir: &data_dir,
2011            provider: "codex",
2012            source_id: "local",
2013            origin_kind: "local",
2014            origin_host: None,
2015            source_path: &first_source,
2016            db_links: &[],
2017        })
2018        .expect("first capture");
2019        let second = capture_source_file(RawMirrorCaptureInput {
2020            data_dir: &data_dir,
2021            provider: "codex",
2022            source_id: "local",
2023            origin_kind: "local",
2024            origin_host: None,
2025            source_path: &second_source,
2026            db_links: &[],
2027        })
2028        .expect("second capture");
2029
2030        assert_eq!(first.blob_blake3, second.blob_blake3);
2031        assert_eq!(first.blob_relative_path, second.blob_relative_path);
2032        assert_ne!(first.manifest_id, second.manifest_id);
2033        assert!(
2034            !second.already_present,
2035            "a duplicate blob with a new source manifest is not a full capture replay"
2036        );
2037
2038        let manifest_root = data_dir
2039            .join(RAW_MIRROR_ROOT_DIR)
2040            .join(RAW_MIRROR_VERSION_DIR)
2041            .join("manifests");
2042        let manifests = fs::read_dir(manifest_root)
2043            .expect("manifest dir")
2044            .collect::<std::io::Result<Vec<_>>>()
2045            .expect("manifest entries");
2046        assert_eq!(manifests.len(), 2);
2047
2048        let summary = storage_summary(&data_dir);
2049        assert!(summary.initialized);
2050        assert_eq!(summary.manifest_count, 2);
2051        assert_eq!(summary.unique_blob_count, 1);
2052        assert_eq!(summary.total_blob_bytes, source_bytes.len() as u64);
2053        assert_eq!(summary.largest_blob_bytes, source_bytes.len() as u64);
2054        assert_eq!(summary.missing_blob_count, 0);
2055        assert_eq!(summary.invalid_manifest_count, 0);
2056        assert!(summary.total_storage_bytes >= source_bytes.len() as u64);
2057    }
2058
2059    #[test]
2060    fn storage_summary_rejects_hostile_blob_relative_path() {
2061        let temp = tempfile::TempDir::new().expect("tempdir");
2062        let data_dir = temp.path().join("cass-data");
2063        let source_path = temp.path().join("source.jsonl");
2064        fs::write(
2065            &source_path,
2066            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2067        )
2068        .expect("write source");
2069
2070        let captured = capture_source_file(RawMirrorCaptureInput {
2071            data_dir: &data_dir,
2072            provider: "codex",
2073            source_id: "local",
2074            origin_kind: "local",
2075            origin_host: None,
2076            source_path: &source_path,
2077            db_links: &[],
2078        })
2079        .expect("capture source");
2080        let manifest_path = data_dir
2081            .join(RAW_MIRROR_ROOT_DIR)
2082            .join(RAW_MIRROR_VERSION_DIR)
2083            .join(&captured.manifest_relative_path);
2084        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2085        manifest.blob_relative_path = "../outside.raw".to_string();
2086        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2087        fs::write(
2088            &manifest_path,
2089            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2090        )
2091        .expect("tamper manifest");
2092
2093        let summary = storage_summary(&data_dir);
2094        assert_eq!(summary.manifest_count, 1);
2095        assert_eq!(summary.invalid_manifest_count, 1);
2096        assert_eq!(summary.unique_blob_count, 0);
2097        assert_eq!(summary.total_blob_bytes, 0);
2098    }
2099
2100    #[test]
2101    fn prune_fails_closed_on_hostile_manifest_inventory() {
2102        let temp = tempfile::TempDir::new().expect("tempdir");
2103        let data_dir = temp.path().join("cass-data");
2104        let source_path = temp.path().join("source.jsonl");
2105        fs::write(
2106            &source_path,
2107            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2108        )
2109        .expect("write source");
2110
2111        let captured = capture_source_file(RawMirrorCaptureInput {
2112            data_dir: &data_dir,
2113            provider: "codex",
2114            source_id: "local",
2115            origin_kind: "local",
2116            origin_host: None,
2117            source_path: &source_path,
2118            db_links: &[],
2119        })
2120        .expect("capture source");
2121        let root = data_dir
2122            .join(RAW_MIRROR_ROOT_DIR)
2123            .join(RAW_MIRROR_VERSION_DIR);
2124        let manifest_path = root.join(&captured.manifest_relative_path);
2125        let blob_path = root.join(&captured.blob_relative_path);
2126        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2127        manifest.blob_relative_path = "../outside.raw".to_string();
2128        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2129        fs::write(
2130            &manifest_path,
2131            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2132        )
2133        .expect("tamper manifest");
2134
2135        let err = prune(
2136            &data_dir,
2137            RawMirrorPruneOptions {
2138                older_than_ms: Some(0),
2139                max_size_bytes: None,
2140                keep_tags: Vec::new(),
2141                safety_hold_down_ms: 0,
2142                apply: true,
2143            },
2144        )
2145        .expect_err("hostile inventory should fail closed");
2146
2147        assert!(
2148            err.to_string().contains("unexpected blob path"),
2149            "error should explain the unsafe manifest inventory: {err}"
2150        );
2151        assert!(manifest_path.exists());
2152        assert!(blob_path.exists());
2153        assert!(!root.join("pruned.jsonl").exists());
2154    }
2155
2156    #[test]
2157    fn prune_dry_run_audits_without_removing_manifest_or_blob() {
2158        let temp = tempfile::TempDir::new().expect("tempdir");
2159        let data_dir = temp.path().join("cass-data");
2160        let source_path = temp.path().join("source.jsonl");
2161        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")
2162            .expect("write source");
2163        let captured = capture_source_file(RawMirrorCaptureInput {
2164            data_dir: &data_dir,
2165            provider: "codex",
2166            source_id: "local",
2167            origin_kind: "local",
2168            origin_host: None,
2169            source_path: &source_path,
2170            db_links: &[],
2171        })
2172        .expect("capture source");
2173
2174        let report = prune(
2175            &data_dir,
2176            RawMirrorPruneOptions {
2177                older_than_ms: Some(0),
2178                max_size_bytes: None,
2179                keep_tags: Vec::new(),
2180                safety_hold_down_ms: 0,
2181                apply: false,
2182            },
2183        )
2184        .expect("dry-run prune");
2185
2186        assert!(report.initialized);
2187        assert_eq!(report.mode, "dry-run");
2188        assert_eq!(report.planned_manifest_count, 1);
2189        assert_eq!(report.planned_blob_count, 1);
2190        assert_eq!(report.applied_reclaim_bytes, 0);
2191        let root = data_dir
2192            .join(RAW_MIRROR_ROOT_DIR)
2193            .join(RAW_MIRROR_VERSION_DIR);
2194        assert!(root.join(&captured.manifest_relative_path).exists());
2195        assert!(root.join(&captured.blob_relative_path).exists());
2196        let audit_path = root.join("pruned.jsonl");
2197        let audit = fs::read_to_string(audit_path).expect("read audit");
2198        assert!(audit.contains("\"mode\":\"dry-run\""));
2199        assert!(audit.contains("\"applied\":false"));
2200    }
2201
2202    #[test]
2203    fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
2204        let temp = tempfile::TempDir::new().expect("tempdir");
2205        let data_dir = temp.path().join("cass-data");
2206        let source_path = temp.path().join("source.jsonl");
2207        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"apply\"}\n")
2208            .expect("write source");
2209        let captured = capture_source_file(RawMirrorCaptureInput {
2210            data_dir: &data_dir,
2211            provider: "codex",
2212            source_id: "local",
2213            origin_kind: "local",
2214            origin_host: None,
2215            source_path: &source_path,
2216            db_links: &[],
2217        })
2218        .expect("capture source");
2219        let root = data_dir
2220            .join(RAW_MIRROR_ROOT_DIR)
2221            .join(RAW_MIRROR_VERSION_DIR);
2222        let manifest_path = root.join(&captured.manifest_relative_path);
2223        let blob_path = root.join(&captured.blob_relative_path);
2224
2225        let report = prune(
2226            &data_dir,
2227            RawMirrorPruneOptions {
2228                older_than_ms: Some(0),
2229                max_size_bytes: None,
2230                keep_tags: Vec::new(),
2231                safety_hold_down_ms: 0,
2232                apply: true,
2233            },
2234        )
2235        .expect("apply prune");
2236
2237        assert_eq!(report.applied_manifest_count, 1);
2238        assert_eq!(report.applied_blob_count, 1);
2239        assert!(!manifest_path.exists());
2240        assert!(!blob_path.exists());
2241        let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
2242        assert!(audit.contains("\"mode\":\"apply\""));
2243        assert!(audit.contains("\"applied\":true"));
2244    }
2245
2246    #[test]
2247    fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
2248        let temp = tempfile::TempDir::new().expect("tempdir");
2249        let data_dir = temp.path().join("cass-data");
2250        let first_source = temp.path().join("first.jsonl");
2251        let second_source = temp.path().join("second.jsonl");
2252        let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";
2253        fs::write(&first_source, bytes).expect("write first");
2254        fs::write(&second_source, bytes).expect("write second");
2255        let first = capture_source_file(RawMirrorCaptureInput {
2256            data_dir: &data_dir,
2257            provider: "codex",
2258            source_id: "local",
2259            origin_kind: "local",
2260            origin_host: None,
2261            source_path: &first_source,
2262            db_links: &[],
2263        })
2264        .expect("capture first");
2265        let second = capture_source_file(RawMirrorCaptureInput {
2266            data_dir: &data_dir,
2267            provider: "codex",
2268            source_id: "local",
2269            origin_kind: "local",
2270            origin_host: None,
2271            source_path: &second_source,
2272            db_links: &[],
2273        })
2274        .expect("capture second");
2275        let root = data_dir
2276            .join(RAW_MIRROR_ROOT_DIR)
2277            .join(RAW_MIRROR_VERSION_DIR);
2278        let first_manifest_path = root.join(&first.manifest_relative_path);
2279        let second_manifest_path = root.join(&second.manifest_relative_path);
2280        let mut first_manifest =
2281            read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
2282        first_manifest.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
2283        first_manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&first_manifest));
2284        fs::write(
2285            &first_manifest_path,
2286            serde_json::to_vec_pretty(&first_manifest).expect("serialize first manifest"),
2287        )
2288        .expect("rewrite first manifest");
2289
2290        let report = prune(
2291            &data_dir,
2292            RawMirrorPruneOptions {
2293                older_than_ms: Some(86_400_000),
2294                max_size_bytes: None,
2295                keep_tags: Vec::new(),
2296                safety_hold_down_ms: 0,
2297                apply: true,
2298            },
2299        )
2300        .expect("apply one-manifest prune");
2301
2302        assert_eq!(report.applied_manifest_count, 1);
2303        assert_eq!(report.applied_blob_count, 0);
2304        assert!(!first_manifest_path.exists());
2305        assert!(second_manifest_path.exists());
2306        assert!(
2307            root.join(&first.blob_relative_path).exists(),
2308            "shared blob must stay while a retained manifest still references it"
2309        );
2310    }
2311
2312    #[test]
2313    fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
2314        use frankensqlite::compat::ConnectionExt as _;
2315
2316        let temp = tempfile::TempDir::new().expect("tempdir");
2317        let data_dir = temp.path().join("cass-data");
2318        std::fs::create_dir_all(&data_dir).expect("create data dir");
2319        let source_path = temp.path().join("tagged.jsonl");
2320        fs::write(
2321            &source_path,
2322            b"{\"type\":\"message\",\"text\":\"tagged\"}\n",
2323        )
2324        .expect("write source");
2325        let db_link = RawMirrorDbLink {
2326            conversation_id: Some(7),
2327            message_count: Some(1),
2328            source_path: Some(source_path.display().to_string()),
2329            started_at_ms: Some(1_733_000_000_000),
2330        };
2331        let captured = capture_source_file(RawMirrorCaptureInput {
2332            data_dir: &data_dir,
2333            provider: "codex",
2334            source_id: "local",
2335            origin_kind: "local",
2336            origin_host: None,
2337            source_path: &source_path,
2338            db_links: std::slice::from_ref(&db_link),
2339        })
2340        .expect("capture source");
2341        let db_path = data_dir.join("agent_search.db");
2342        let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
2343            .expect("open keep-tag db");
2344        conn.execute_compat(
2345            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
2346            frankensqlite::params![],
2347        )
2348        .expect("create tags");
2349        conn.execute_compat(
2350            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
2351            frankensqlite::params![],
2352        )
2353        .expect("create conversation_tags");
2354        conn.execute_compat(
2355            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
2356            frankensqlite::params![],
2357        )
2358        .expect("insert tag");
2359        conn.execute_compat(
2360            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
2361            frankensqlite::params![],
2362        )
2363        .expect("insert conversation tag");
2364        drop(conn);
2365
2366        let report = prune(
2367            &data_dir,
2368            RawMirrorPruneOptions {
2369                older_than_ms: Some(0),
2370                max_size_bytes: Some(0),
2371                keep_tags: vec!["keep".to_string()],
2372                safety_hold_down_ms: 0,
2373                apply: true,
2374            },
2375        )
2376        .expect("keep-tag prune");
2377
2378        let root = data_dir
2379            .join(RAW_MIRROR_ROOT_DIR)
2380            .join(RAW_MIRROR_VERSION_DIR);
2381        assert_eq!(report.pinned_manifest_count, 1);
2382        assert_eq!(report.pinned_blob_count, 1);
2383        assert_eq!(report.planned_manifest_count, 0);
2384        assert_eq!(report.planned_blob_count, 0);
2385        assert!(root.join(&captured.manifest_relative_path).exists());
2386        assert!(root.join(&captured.blob_relative_path).exists());
2387    }
2388
2389    #[test]
2390    fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
2391        let temp = tempfile::TempDir::new().expect("tempdir");
2392        let data_dir = temp.path().join("cass-data");
2393        let source_path = temp.path().join("recent.jsonl");
2394        fs::write(
2395            &source_path,
2396            b"{\"type\":\"message\",\"text\":\"recent\"}\n",
2397        )
2398        .expect("write source");
2399        let captured = capture_source_file(RawMirrorCaptureInput {
2400            data_dir: &data_dir,
2401            provider: "codex",
2402            source_id: "local",
2403            origin_kind: "local",
2404            origin_host: None,
2405            source_path: &source_path,
2406            db_links: &[],
2407        })
2408        .expect("capture source");
2409
2410        let report = prune(
2411            &data_dir,
2412            RawMirrorPruneOptions {
2413                older_than_ms: None,
2414                max_size_bytes: Some(0),
2415                keep_tags: Vec::new(),
2416                safety_hold_down_ms: 7 * 86_400_000,
2417                apply: true,
2418            },
2419        )
2420        .expect("hold-down prune");
2421
2422        let root = data_dir
2423            .join(RAW_MIRROR_ROOT_DIR)
2424            .join(RAW_MIRROR_VERSION_DIR);
2425        assert_eq!(report.pinned_manifest_count, 1);
2426        assert_eq!(report.pinned_blob_count, 1);
2427        assert_eq!(report.planned_manifest_count, 0);
2428        assert_eq!(report.planned_blob_count, 0);
2429        assert!(root.join(&captured.manifest_relative_path).exists());
2430        assert!(root.join(&captured.blob_relative_path).exists());
2431    }
2432
2433    #[test]
2434    fn capture_source_file_revalidates_cached_blob_contents() {
2435        let temp = tempfile::TempDir::new().expect("tempdir");
2436        let data_dir = temp.path().join("cass-data");
2437        let source_path = temp.path().join("cached-source.jsonl");
2438        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2439        fs::write(&source_path, source_bytes).expect("write source");
2440
2441        let first = capture_source_file(RawMirrorCaptureInput {
2442            data_dir: &data_dir,
2443            provider: "codex",
2444            source_id: "local",
2445            origin_kind: "local",
2446            origin_host: None,
2447            source_path: &source_path,
2448            db_links: &[],
2449        })
2450        .expect("first capture");
2451
2452        let blob_path = data_dir
2453            .join(RAW_MIRROR_ROOT_DIR)
2454            .join(RAW_MIRROR_VERSION_DIR)
2455            .join(&first.blob_relative_path);
2456        fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");
2457
2458        let err = capture_source_file(RawMirrorCaptureInput {
2459            data_dir: &data_dir,
2460            provider: "codex",
2461            source_id: "local",
2462            origin_kind: "local",
2463            origin_host: None,
2464            source_path: &source_path,
2465            db_links: &[],
2466        })
2467        .expect_err("corrupted content-addressed blob must be rejected");
2468        assert!(
2469            err.to_string().contains("existing raw mirror blob"),
2470            "unexpected cached-blob error: {err:#}"
2471        );
2472        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2473    }
2474
2475    #[cfg(unix)]
2476    #[test]
2477    fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
2478        let temp = tempfile::TempDir::new().expect("tempdir");
2479        let data_dir = temp.path().join("cass-data");
2480        let source_path = temp.path().join("same-size-rewrite.jsonl");
2481        let first_bytes = b"same length payload A\n";
2482        let second_bytes = b"same length payload B\n";
2483        fs::write(&source_path, first_bytes).expect("write first source");
2484
2485        let first_modified = fs::metadata(&source_path)
2486            .expect("first metadata")
2487            .modified()
2488            .expect("first modified time");
2489        let first = capture_source_file(RawMirrorCaptureInput {
2490            data_dir: &data_dir,
2491            provider: "codex",
2492            source_id: "local",
2493            origin_kind: "local",
2494            origin_host: None,
2495            source_path: &source_path,
2496            db_links: &[],
2497        })
2498        .expect("first capture");
2499
2500        std::thread::sleep(std::time::Duration::from_millis(5));
2501        fs::write(&source_path, second_bytes).expect("rewrite source");
2502        let source = OpenOptions::new()
2503            .write(true)
2504            .open(&source_path)
2505            .expect("open rewritten source");
2506        source
2507            .set_times(std::fs::FileTimes::new().set_modified(first_modified))
2508            .expect("restore original mtime");
2509
2510        let second = capture_source_file(RawMirrorCaptureInput {
2511            data_dir: &data_dir,
2512            provider: "codex",
2513            source_id: "local",
2514            origin_kind: "local",
2515            origin_host: None,
2516            source_path: &source_path,
2517            db_links: &[],
2518        })
2519        .expect("second capture");
2520
2521        assert_ne!(first.blob_blake3, second.blob_blake3);
2522        assert_eq!(
2523            second.blob_blake3,
2524            blake3::hash(second_bytes).to_hex().to_string()
2525        );
2526        assert_eq!(
2527            fs::read(&source_path).expect("source bytes after rewrite"),
2528            second_bytes
2529        );
2530    }
2531
2532    #[cfg(unix)]
2533    #[test]
2534    fn capture_source_file_rejects_symlinked_existing_blob_path() {
2535        let temp = tempfile::TempDir::new().expect("tempdir");
2536        let data_dir = temp.path().join("cass-data");
2537        let source_path = temp.path().join("cached-source.jsonl");
2538        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2539        fs::write(&source_path, source_bytes).expect("write source");
2540
2541        let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
2542        let blob_relative_path =
2543            raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
2544        let blob_path = data_dir
2545            .join(RAW_MIRROR_ROOT_DIR)
2546            .join(RAW_MIRROR_VERSION_DIR)
2547            .join(&blob_relative_path);
2548        fs::create_dir_all(blob_path.parent().expect("blob parent")).expect("blob parent dir");
2549        let outside = temp.path().join("outside.raw");
2550        fs::write(&outside, source_bytes).expect("outside blob bytes");
2551        std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");
2552
2553        let err = capture_source_file(RawMirrorCaptureInput {
2554            data_dir: &data_dir,
2555            provider: "codex",
2556            source_id: "local",
2557            origin_kind: "local",
2558            origin_host: None,
2559            source_path: &source_path,
2560            db_links: &[],
2561        })
2562        .expect_err("symlinked content-addressed blob path must be rejected");
2563        assert!(
2564            err.to_string().contains("symlink raw mirror blob"),
2565            "unexpected symlink-blob error: {err:#}"
2566        );
2567
2568        let manifest_root = data_dir
2569            .join(RAW_MIRROR_ROOT_DIR)
2570            .join(RAW_MIRROR_VERSION_DIR)
2571            .join("manifests");
2572        assert!(
2573            !manifest_root.exists(),
2574            "failed blob publish must not write a manifest pointing at a symlinked blob"
2575        );
2576        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2577        assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
2578    }
2579
2580    #[cfg(unix)]
2581    #[test]
2582    fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
2583        let temp = tempfile::TempDir::new().expect("tempdir");
2584        let data_dir = temp.path().join("cass-data");
2585        let source_path = temp.path().join("source.jsonl");
2586        let outside_mirror = temp.path().join("outside-mirror");
2587        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";
2588
2589        fs::create_dir_all(&data_dir).expect("data dir");
2590        fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
2591        fs::write(&source_path, source_bytes).expect("write source");
2592        std::os::unix::fs::symlink(&outside_mirror, data_dir.join(RAW_MIRROR_ROOT_DIR))
2593            .expect("symlink raw mirror root");
2594
2595        let err = capture_source_file(RawMirrorCaptureInput {
2596            data_dir: &data_dir,
2597            provider: "codex",
2598            source_id: "local",
2599            origin_kind: "local",
2600            origin_host: None,
2601            source_path: &source_path,
2602            db_links: &[],
2603        })
2604        .expect_err("symlinked raw-mirror root must be rejected");
2605
2606        assert!(
2607            err.to_string().contains("symlink raw mirror dir"),
2608            "unexpected symlink-root error: {err:#}"
2609        );
2610        assert!(
2611            !outside_mirror.join(RAW_MIRROR_VERSION_DIR).exists(),
2612            "raw mirror capture must not create redirected archive state outside data_dir"
2613        );
2614        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2615    }
2616
2617    #[cfg(unix)]
2618    #[test]
2619    fn capture_source_file_rejects_symlinked_blob_directory_component() {
2620        let temp = tempfile::TempDir::new().expect("tempdir");
2621        let data_dir = temp.path().join("cass-data");
2622        let root = data_dir
2623            .join(RAW_MIRROR_ROOT_DIR)
2624            .join(RAW_MIRROR_VERSION_DIR);
2625        let source_path = temp.path().join("source.jsonl");
2626        let outside_blobs = temp.path().join("outside-blobs");
2627        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
2628
2629        fs::create_dir_all(&root).expect("raw mirror root");
2630        fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
2631        fs::write(&source_path, source_bytes).expect("write source");
2632        std::os::unix::fs::symlink(&outside_blobs, root.join("blobs")).expect("symlink blobs dir");
2633
2634        let err = capture_source_file(RawMirrorCaptureInput {
2635            data_dir: &data_dir,
2636            provider: "codex",
2637            source_id: "local",
2638            origin_kind: "local",
2639            origin_host: None,
2640            source_path: &source_path,
2641            db_links: &[],
2642        })
2643        .expect_err("symlinked blob directory must be rejected");
2644
2645        assert!(
2646            err.to_string().contains("symlink raw mirror dir"),
2647            "unexpected symlink-blob-dir error: {err:#}"
2648        );
2649        assert!(
2650            !outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM).exists(),
2651            "raw mirror capture must not create redirected blob state outside data_dir"
2652        );
2653        assert!(
2654            !root.join("manifests").exists(),
2655            "failed blob publish must not write a manifest"
2656        );
2657        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2658    }
2659
2660    #[test]
2661    fn capture_source_file_rejects_non_file_sources() {
2662        let temp = tempfile::TempDir::new().expect("tempdir");
2663        let data_dir = temp.path().join("cass-data");
2664        let source_dir = temp.path().join("source-dir");
2665        fs::create_dir(&source_dir).expect("source dir");
2666
2667        let err = capture_source_file(RawMirrorCaptureInput {
2668            data_dir: &data_dir,
2669            provider: "codex",
2670            source_id: "local",
2671            origin_kind: "local",
2672            origin_host: None,
2673            source_path: &source_dir,
2674            db_links: &[],
2675        })
2676        .expect_err("directory source should be rejected");
2677        assert!(
2678            err.to_string().contains("non-file source"),
2679            "unexpected non-file-source error: {err}"
2680        );
2681        assert!(
2682            !data_dir.join(RAW_MIRROR_ROOT_DIR).exists(),
2683            "rejected non-file sources must not initialize raw mirror storage"
2684        );
2685    }
2686
2687    #[cfg(unix)]
2688    #[test]
2689    fn capture_source_file_rejects_unreadable_sources_without_manifest() {
2690        use std::os::unix::fs::PermissionsExt;
2691
2692        let temp = tempfile::TempDir::new().expect("tempdir");
2693        let data_dir = temp.path().join("cass-data");
2694        let source_path = temp.path().join("unreadable.jsonl");
2695        fs::write(&source_path, b"private session bytes\n").expect("source");
2696        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
2697            .expect("make source unreadable");
2698
2699        let err = capture_source_file(RawMirrorCaptureInput {
2700            data_dir: &data_dir,
2701            provider: "codex",
2702            source_id: "local",
2703            origin_kind: "local",
2704            origin_host: None,
2705            source_path: &source_path,
2706            db_links: &[],
2707        })
2708        .expect_err("unreadable source should be rejected");
2709        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
2710            .expect("restore source perms");
2711        assert!(
2712            err.to_string().contains("open raw mirror source"),
2713            "unexpected unreadable-source error: {err}"
2714        );
2715        assert!(
2716            !data_dir.join("raw-mirror/v1/manifests").exists(),
2717            "failed unreadable-source captures must not publish manifests"
2718        );
2719    }
2720
2721    #[cfg(unix)]
2722    #[test]
2723    fn capture_source_file_rejects_symlink_sources() {
2724        use std::os::unix::fs::symlink;
2725
2726        let temp = tempfile::TempDir::new().expect("tempdir");
2727        let data_dir = temp.path().join("cass-data");
2728        let real_source = temp.path().join("real.jsonl");
2729        let symlink_source = temp.path().join("link.jsonl");
2730        fs::write(&real_source, b"secret session").expect("write source");
2731        symlink(&real_source, &symlink_source).expect("symlink");
2732
2733        let err = capture_source_file(RawMirrorCaptureInput {
2734            data_dir: &data_dir,
2735            provider: "codex",
2736            source_id: "local",
2737            origin_kind: "local",
2738            origin_host: None,
2739            source_path: &symlink_source,
2740            db_links: &[],
2741        })
2742        .expect_err("symlink source should be rejected");
2743        assert!(
2744            err.to_string().contains("symlink source"),
2745            "unexpected error: {err:#}"
2746        );
2747    }
2748}