Skip to main content

coding_agent_search/
raw_mirror.rs

1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
// Layout and format identifiers for the raw mirror.
// NOTE(review): only `RAW_MIRROR_MANIFEST_KIND` is referenced in the code
// reviewed alongside these declarations; the remaining descriptions are
// inferred from names and values and should be confirmed against their users.

/// Schema version number for raw mirror manifests (presumably the value
/// written to a manifest's `schema_version` field — confirm).
const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
/// Directory name of the mirror root (presumably relative to the data dir — confirm).
const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
/// Version directory name (presumably nested under the root dir — confirm).
const RAW_MIRROR_VERSION_DIR: &str = "v1";
/// Expected `manifest_kind` value; manifests carrying any other kind are
/// counted as invalid by the storage summary and abort pruning.
const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
/// Name of the content hash algorithm (presumably recorded in manifests — confirm).
const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
/// File extension of stored blobs, without the leading dot.
const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";

/// Process-wide counter (presumably used to build unique temp names — confirm).
static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
/// Lazily initialised, mutex-guarded map from a source-file cache key to the
/// blob record previously captured for it.
static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
    OnceLock::new();
/// Lazily initialised mutex (presumably serialising manifest updates — confirm).
static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
/// Borrowed arguments for [`capture_source_file`].
#[derive(Debug, Clone)]
pub struct RawMirrorCaptureInput<'a> {
    /// Data directory from which the raw mirror root is resolved.
    pub data_dir: &'a Path,
    /// Provider name; one of the inputs to the manifest id.
    pub provider: &'a str,
    /// Source identifier; one of the inputs to the manifest id.
    pub source_id: &'a str,
    /// Origin kind label (presumably copied into the manifest — confirm).
    pub origin_kind: &'a str,
    /// Optional origin host (presumably copied into the manifest — confirm).
    pub origin_host: Option<&'a str>,
    /// File to mirror. Capture refuses symlinks and anything that is not a
    /// regular file.
    pub source_path: &'a Path,
    /// Database links to associate with the capture (presumably stored in the
    /// manifest's `db_links` — confirm).
    pub db_links: &'a [RawMirrorDbLink],
}
34
/// Value returned by [`capture_source_file`].
///
/// NOTE(review): the code that fills this record was not visible during this
/// review; field descriptions below follow the field names and should be
/// confirmed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorCaptureRecord {
    /// Identifier of the manifest describing the capture.
    pub manifest_id: String,
    /// Manifest location (presumably relative to the mirror root — confirm).
    pub manifest_relative_path: String,
    /// Blob location (presumably relative to the mirror root — confirm).
    pub blob_relative_path: String,
    /// BLAKE3 digest of the blob.
    pub blob_blake3: String,
    /// Blob size in bytes.
    pub blob_size_bytes: u64,
    /// Capture timestamp in milliseconds.
    pub captured_at_ms: i64,
    /// Source file modification time in milliseconds, when known.
    pub source_mtime_ms: Option<i64>,
    /// Presumably true when the capture already existed — confirm exact meaning.
    pub already_present: bool,
}
46
/// Link between a raw mirror manifest and a database conversation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorDbLink {
    /// Conversation id; pruning matches this against tagged conversations to
    /// decide whether a manifest is pinned by a keep-tag.
    pub conversation_id: Option<i64>,
    /// Message count of the linked conversation (presumably — confirm).
    pub message_count: Option<usize>,
    /// Source path recorded for the linked conversation (presumably — confirm).
    pub source_path: Option<String>,
    /// Conversation start time in milliseconds (presumably — confirm).
    pub started_at_ms: Option<i64>,
}
54
/// Read-only statistics about the raw mirror, produced by [`storage_summary`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct RawMirrorStorageSummary {
    /// True once the mirror root path can be stat'ed, even if it later turns
    /// out not to be a real directory.
    pub initialized: bool,
    /// Display form of the mirror root path.
    pub root_path: String,
    /// Sum of the sizes of all regular files under the root (symlinks skipped).
    pub total_storage_bytes: u64,
    /// Number of manifests that parsed and carried the expected manifest kind.
    pub manifest_count: u64,
    /// Bytes of every regular file in the manifests directory; this is added
    /// before parsing, so files that later fail validation are included.
    pub manifest_bytes: u64,
    /// Distinct blobs referenced by valid manifests and present as regular files.
    pub unique_blob_count: u64,
    /// Combined size of the blobs counted in `unique_blob_count`.
    pub total_blob_bytes: u64,
    /// Size of the largest counted blob.
    pub largest_blob_bytes: u64,
    /// Distinct referenced blobs that are absent or not regular files.
    pub missing_blob_count: u64,
    /// Problems found: an invalid root or manifests directory, unreadable
    /// directory entries, non-regular manifest files, unparsable or wrong-kind
    /// manifests, and manifests whose blob path does not match their hash.
    pub invalid_manifest_count: u64,
    /// Smallest `captured_at_ms` among counted manifests.
    pub oldest_capture_at_ms: Option<i64>,
    /// Largest `captured_at_ms` among counted manifests.
    pub newest_capture_at_ms: Option<i64>,
    /// Smallest `source_mtime_ms` among counted manifests that record one.
    pub oldest_source_mtime_ms: Option<i64>,
    /// Largest `source_mtime_ms` among counted manifests that record one.
    pub newest_source_mtime_ms: Option<i64>,
}
72
73pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
74    let root = raw_mirror_root(data_dir);
75    let mut summary = RawMirrorStorageSummary {
76        root_path: root.display().to_string(),
77        ..RawMirrorStorageSummary::default()
78    };
79    let root_metadata = match fs::symlink_metadata(&root) {
80        Ok(metadata) => metadata,
81        Err(_) => return summary,
82    };
83    summary.initialized = true;
84    if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
85        summary.invalid_manifest_count = 1;
86        return summary;
87    }
88
89    summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
90
91    let manifests_dir = root.join("manifests");
92    let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
93        return summary;
94    };
95    if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
96        summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
97        return summary;
98    }
99    let entries = match fs::read_dir(&manifests_dir) {
100        Ok(entries) => entries,
101        Err(_) => return summary,
102    };
103    let mut seen_blobs = HashSet::new();
104    for entry in entries {
105        let Ok(entry) = entry else {
106            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
107            continue;
108        };
109        let path = entry.path();
110        let manifest_metadata = match fs::symlink_metadata(&path) {
111            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
112            _ => {
113                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
114                continue;
115            }
116        };
117        summary.manifest_bytes = summary
118            .manifest_bytes
119            .saturating_add(manifest_metadata.len());
120        let manifest = match read_raw_mirror_manifest(&path) {
121            Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
122            _ => {
123                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
124                continue;
125            }
126        };
127        summary.manifest_count = summary.manifest_count.saturating_add(1);
128        merge_min_max(
129            &mut summary.oldest_capture_at_ms,
130            &mut summary.newest_capture_at_ms,
131            Some(manifest.captured_at_ms),
132        );
133        merge_min_max(
134            &mut summary.oldest_source_mtime_ms,
135            &mut summary.newest_source_mtime_ms,
136            manifest.source_mtime_ms,
137        );
138
139        let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
140            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
141            continue;
142        };
143        if manifest.blob_relative_path != blob_relative_path {
144            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
145            continue;
146        }
147
148        if !seen_blobs.insert(blob_relative_path.clone()) {
149            continue;
150        }
151        let blob_path = root.join(blob_relative_path);
152        match fs::symlink_metadata(&blob_path) {
153            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
154                let size = metadata.len();
155                summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
156                summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
157                summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
158            }
159            _ => {
160                summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
161            }
162        }
163    }
164
165    summary
166}
167
/// Retention policy and execution mode for [`prune`].
#[derive(Debug, Clone, Default)]
pub struct RawMirrorPruneOptions {
    /// Age threshold in milliseconds: unpinned manifests captured at or before
    /// `now - older_than_ms` are selected. Negative values are treated as 0.
    pub older_than_ms: Option<i64>,
    /// Blob byte budget: when current blob bytes exceed it, unpinned blobs are
    /// retired oldest-first until the projected total fits.
    pub max_size_bytes: Option<u64>,
    /// Tag names whose conversations pin their manifests. Entries are trimmed
    /// and blank ones ignored.
    pub keep_tags: Vec<String>,
    /// Manifests captured within this many milliseconds of now are pinned;
    /// values `<= 0` disable the hold-down.
    pub safety_hold_down_ms: i64,
    /// When true the plan is executed (files deleted); otherwise dry-run.
    pub apply: bool,
}
176
/// Outcome of a [`prune`] run: what exists, what is pinned, what was planned
/// and what was actually removed.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneReport {
    /// False when the mirror root does not exist (nothing was inspected).
    pub initialized: bool,
    /// Display form of the mirror root path.
    pub root_path: String,
    /// `"apply"` or `"dry-run"`.
    pub mode: String,
    /// Number of manifests collected for pruning.
    pub manifest_count: u64,
    /// Number of distinct blobs referenced by those manifests.
    pub unique_blob_count: u64,
    /// Combined size of those blobs (on-disk size, falling back to the size
    /// recorded in the manifest when the blob cannot be stat'ed).
    pub current_blob_bytes: u64,
    /// Echo of the hold-down option used for this run.
    pub safety_hold_down_ms: i64,
    /// Echo of the keep-tag option used for this run (as supplied).
    pub keep_tags: Vec<String>,
    /// Manifests protected by the hold-down window or a keep-tag.
    pub pinned_manifest_count: u64,
    /// Blobs referenced by at least one pinned manifest.
    pub pinned_blob_count: u64,
    /// Manifest entries in the plan.
    pub planned_manifest_count: u64,
    /// Blob entries in the plan.
    pub planned_blob_count: u64,
    /// Sum of `size_bytes` over all planned entries (manifests and blobs).
    pub planned_reclaim_bytes: u64,
    /// Manifest files actually removed (apply mode only).
    pub applied_manifest_count: u64,
    /// Blob files actually removed (apply mode only).
    pub applied_blob_count: u64,
    /// Sum of `size_bytes` over entries actually removed.
    pub applied_reclaim_bytes: u64,
    /// Path of the audit log appended to, set only when the plan is non-empty.
    pub audit_log_path: Option<String>,
    /// Planned entries: manifests (sorted by id) followed by blobs (sorted by path).
    pub entries: Vec<RawMirrorPruneEntry>,
}
198
/// One file selected by a prune plan.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RawMirrorPruneEntry {
    /// `"manifest"` or `"blob"`.
    pub kind: String,
    /// Path of the file; joined onto the mirror root when the plan is applied.
    pub path: String,
    /// Blob digest: taken from the manifest for manifest entries, derived from
    /// the blob file name for blob entries (`None` if the name has no `.raw` suffix).
    pub blob_blake3: Option<String>,
    /// Size credited to this entry when computing reclaimed bytes.
    pub size_bytes: u64,
    /// Human-readable explanation of why the entry was selected.
    pub reason: String,
    /// True once the file has actually been removed.
    pub applied: bool,
}
208
/// The subset of a validated manifest that pruning needs, built by
/// `collect_prune_manifests`.
#[derive(Debug, Clone)]
struct RawMirrorPruneManifest {
    /// Manifest id as recorded in the manifest file.
    manifest_id: String,
    /// Manifest file path with the mirror root prefix stripped (display form).
    relative_path: String,
    /// On-disk size of the manifest file.
    size_bytes: u64,
    /// Blob digest recorded in the manifest.
    blob_blake3: String,
    /// Blob path recorded in the manifest; validated to equal the path derived
    /// from `blob_blake3`.
    blob_relative_path: String,
    /// Blob size recorded in the manifest; used as a fallback when the blob
    /// file cannot be stat'ed.
    blob_size_bytes: u64,
    /// Capture timestamp in milliseconds; drives age and hold-down decisions.
    captured_at_ms: i64,
    /// Provider recorded in the manifest (used as a sort tie-breaker).
    provider: String,
    /// Original source path recorded in the manifest (used as a sort tie-breaker).
    original_path: String,
    /// Conversation links used to honour keep-tags.
    db_links: Vec<RawMirrorDbLink>,
}
222
223pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
224    let root = raw_mirror_root(data_dir);
225    let mut report = RawMirrorPruneReport {
226        initialized: false,
227        root_path: root.display().to_string(),
228        mode: if options.apply {
229            "apply".to_string()
230        } else {
231            "dry-run".to_string()
232        },
233        manifest_count: 0,
234        unique_blob_count: 0,
235        current_blob_bytes: 0,
236        safety_hold_down_ms: options.safety_hold_down_ms,
237        keep_tags: options.keep_tags.clone(),
238        pinned_manifest_count: 0,
239        pinned_blob_count: 0,
240        planned_manifest_count: 0,
241        planned_blob_count: 0,
242        planned_reclaim_bytes: 0,
243        applied_manifest_count: 0,
244        applied_blob_count: 0,
245        applied_reclaim_bytes: 0,
246        audit_log_path: None,
247        entries: Vec::new(),
248    };
249
250    let metadata = match fs::symlink_metadata(&root) {
251        Ok(metadata) => metadata,
252        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
253        Err(err) => {
254            return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
255        }
256    };
257    if metadata.file_type().is_symlink() || !metadata.is_dir() {
258        anyhow::bail!(
259            "refusing to prune invalid raw mirror root {}",
260            root.display()
261        );
262    }
263    report.initialized = true;
264
265    let manifests = collect_prune_manifests(&root)?;
266    report.manifest_count = manifests.len() as u64;
267
268    let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
269    let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
270    let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
271    for manifest in &manifests {
272        manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
273        blob_to_manifests
274            .entry(manifest.blob_relative_path.clone())
275            .or_default()
276            .push(manifest.manifest_id.clone());
277        blob_size_by_relative
278            .entry(manifest.blob_relative_path.clone())
279            .or_insert_with(|| {
280                blob_file_size(&root.join(&manifest.blob_relative_path))
281                    .unwrap_or(manifest.blob_size_bytes)
282            });
283    }
284    report.unique_blob_count = blob_size_by_relative.len() as u64;
285    report.current_blob_bytes = blob_size_by_relative
286        .values()
287        .copied()
288        .fold(0u64, u64::saturating_add);
289
290    let now = now_ms();
291    let pinned_manifests = pinned_prune_manifest_ids(
292        data_dir,
293        &manifests,
294        &options.keep_tags,
295        options.safety_hold_down_ms,
296        now,
297    )?;
298    report.pinned_manifest_count = pinned_manifests.len() as u64;
299    let pinned_blobs: HashSet<String> = blob_to_manifests
300        .iter()
301        .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
302        .map(|(blob_relative_path, _)| blob_relative_path.clone())
303        .collect();
304    report.pinned_blob_count = pinned_blobs.len() as u64;
305
306    let mut selected_manifests: HashSet<String> = HashSet::new();
307    let mut manifest_reasons: HashMap<String, String> = HashMap::new();
308
309    if let Some(older_than_ms) = options.older_than_ms {
310        let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
311        for manifest in &manifests {
312            if manifest.captured_at_ms <= cutoff_ms
313                && !pinned_manifests.contains(&manifest.manifest_id)
314            {
315                selected_manifests.insert(manifest.manifest_id.clone());
316                manifest_reasons
317                    .entry(manifest.manifest_id.clone())
318                    .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
319            }
320        }
321    }
322
323    if let Some(max_size_bytes) = options.max_size_bytes
324        && report.current_blob_bytes > max_size_bytes
325    {
326        let mut blob_groups: Vec<_> = blob_to_manifests
327            .iter()
328            .map(|(blob_relative_path, manifest_ids)| {
329                let oldest_capture = manifest_ids
330                    .iter()
331                    .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
332                    .min()
333                    .unwrap_or(i64::MAX);
334                let size = blob_size_by_relative
335                    .get(blob_relative_path)
336                    .copied()
337                    .unwrap_or(0);
338                (
339                    blob_relative_path.clone(),
340                    manifest_ids.clone(),
341                    oldest_capture,
342                    size,
343                )
344            })
345            .collect::<Vec<_>>();
346        blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
347
348        let mut projected_bytes = report.current_blob_bytes;
349        for (blob_relative_path, manifest_ids, _, size) in blob_groups {
350            if projected_bytes <= max_size_bytes {
351                break;
352            }
353            if pinned_blobs.contains(&blob_relative_path) {
354                continue;
355            }
356            for manifest_id in manifest_ids {
357                if !pinned_manifests.contains(&manifest_id) {
358                    selected_manifests.insert(manifest_id.clone());
359                    manifest_reasons.entry(manifest_id).or_insert_with(|| {
360                        format!("max-size over budget; retiring blob {blob_relative_path}")
361                    });
362                }
363            }
364            projected_bytes = projected_bytes.saturating_sub(size);
365        }
366    }
367
368    let selected_blobs: HashSet<String> = blob_to_manifests
369        .iter()
370        .filter(|(_, manifest_ids)| {
371            manifest_ids
372                .iter()
373                .all(|id| selected_manifests.contains(id))
374        })
375        .map(|(blob_relative_path, _)| blob_relative_path.clone())
376        .collect();
377
378    let mut entries = Vec::new();
379    let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
380    selected_manifest_ids.sort();
381    for manifest_id in selected_manifest_ids {
382        let Some(manifest) = manifest_by_id.get(&manifest_id) else {
383            continue;
384        };
385        let reason = manifest_reasons
386            .remove(&manifest_id)
387            .unwrap_or_else(|| "selected by retention policy".to_string());
388        entries.push(RawMirrorPruneEntry {
389            kind: "manifest".to_string(),
390            path: manifest.relative_path.clone(),
391            blob_blake3: Some(manifest.blob_blake3.clone()),
392            size_bytes: manifest.size_bytes,
393            reason,
394            applied: false,
395        });
396    }
397
398    let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
399    selected_blob_paths.sort();
400    for blob_relative_path in selected_blob_paths {
401        let size = blob_size_by_relative
402            .get(&blob_relative_path)
403            .copied()
404            .unwrap_or(0);
405        let blob_blake3 = blob_relative_path
406            .rsplit('/')
407            .next()
408            .and_then(|name| name.strip_suffix(".raw"))
409            .map(ToOwned::to_owned);
410        entries.push(RawMirrorPruneEntry {
411            kind: "blob".to_string(),
412            path: blob_relative_path,
413            blob_blake3,
414            size_bytes: size,
415            reason: "no retained manifest references this blob after prune plan".to_string(),
416            applied: false,
417        });
418    }
419
420    report.planned_manifest_count = entries
421        .iter()
422        .filter(|entry| entry.kind == "manifest")
423        .count() as u64;
424    report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
425    report.planned_reclaim_bytes = entries
426        .iter()
427        .map(|entry| entry.size_bytes)
428        .fold(0, u64::saturating_add);
429
430    if options.apply {
431        for entry in &mut entries {
432            let path = root.join(&entry.path);
433            let removed = remove_prune_target_file(&path)
434                .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
435            entry.applied = removed;
436            if removed {
437                if entry.kind == "manifest" {
438                    report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
439                } else if entry.kind == "blob" {
440                    report.applied_blob_count = report.applied_blob_count.saturating_add(1);
441                }
442                report.applied_reclaim_bytes = report
443                    .applied_reclaim_bytes
444                    .saturating_add(entry.size_bytes);
445            }
446        }
447    }
448
449    report.entries = entries;
450    if !report.entries.is_empty() {
451        let audit_path = append_prune_audit_log(&root, &report)?;
452        report.audit_log_path = Some(audit_path.display().to_string());
453    }
454    Ok(report)
455}
456
457fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
458    let manifests_dir = root.join("manifests");
459    let metadata = match fs::symlink_metadata(&manifests_dir) {
460        Ok(metadata) => metadata,
461        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
462        Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
463    };
464    if metadata.file_type().is_symlink() || !metadata.is_dir() {
465        anyhow::bail!(
466            "refusing to prune invalid raw mirror manifests directory {}",
467            manifests_dir.display()
468        );
469    }
470
471    let mut manifests = Vec::new();
472    for entry in
473        fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
474    {
475        let entry = entry?;
476        let path = entry.path();
477        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
478            continue;
479        }
480        let manifest_metadata = fs::symlink_metadata(&path)
481            .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
482        if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
483            anyhow::bail!(
484                "refusing to prune with non-regular raw mirror manifest {}",
485                path.display()
486            );
487        }
488        let manifest = read_raw_mirror_manifest(&path)?;
489        if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
490            anyhow::bail!(
491                "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
492                manifest.manifest_kind,
493                path.display()
494            );
495        }
496        let Some(expected_blob_relative_path) =
497            raw_mirror_blob_relative_path(&manifest.blob_blake3)
498        else {
499            anyhow::bail!(
500                "refusing to prune raw mirror manifest {} with invalid blob hash",
501                path.display()
502            );
503        };
504        if manifest.blob_relative_path != expected_blob_relative_path {
505            anyhow::bail!(
506                "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
507                path.display(),
508                manifest.blob_relative_path
509            );
510        }
511        let relative_path = path
512            .strip_prefix(root)
513            .unwrap_or(&path)
514            .display()
515            .to_string();
516        manifests.push(RawMirrorPruneManifest {
517            manifest_id: manifest.manifest_id,
518            relative_path,
519            size_bytes: manifest_metadata.len(),
520            blob_blake3: manifest.blob_blake3,
521            blob_relative_path: manifest.blob_relative_path,
522            blob_size_bytes: manifest.blob_size_bytes,
523            captured_at_ms: manifest.captured_at_ms,
524            provider: manifest.provider,
525            original_path: manifest.original_path,
526            db_links: manifest.db_links,
527        });
528    }
529    manifests.sort_by(|left, right| {
530        left.captured_at_ms
531            .cmp(&right.captured_at_ms)
532            .then_with(|| left.provider.cmp(&right.provider))
533            .then_with(|| left.original_path.cmp(&right.original_path))
534            .then_with(|| left.manifest_id.cmp(&right.manifest_id))
535    });
536    Ok(manifests)
537}
538
539fn pinned_prune_manifest_ids(
540    data_dir: &Path,
541    manifests: &[RawMirrorPruneManifest],
542    keep_tags: &[String],
543    safety_hold_down_ms: i64,
544    now_ms: i64,
545) -> Result<HashSet<String>> {
546    let mut pinned = HashSet::new();
547    if safety_hold_down_ms > 0 {
548        let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
549        for manifest in manifests {
550            if manifest.captured_at_ms > cutoff_ms {
551                pinned.insert(manifest.manifest_id.clone());
552            }
553        }
554    }
555
556    let normalized_keep_tags = keep_tags
557        .iter()
558        .map(|tag| tag.trim())
559        .filter(|tag| !tag.is_empty())
560        .map(ToOwned::to_owned)
561        .collect::<Vec<_>>();
562    if normalized_keep_tags.is_empty() {
563        return Ok(pinned);
564    }
565
566    let keep_tag_conversation_ids =
567        load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
568    for manifest in manifests {
569        if manifest.db_links.iter().any(|link| {
570            link.conversation_id
571                .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
572        }) {
573            pinned.insert(manifest.manifest_id.clone());
574        }
575    }
576    Ok(pinned)
577}
578
579fn load_keep_tag_conversation_ids(
580    data_dir: &Path,
581    manifests: &[RawMirrorPruneManifest],
582    keep_tags: &[String],
583) -> Result<HashSet<i64>> {
584    use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
585
586    let mut conversation_ids = manifests
587        .iter()
588        .flat_map(|manifest| manifest.db_links.iter())
589        .filter_map(|link| link.conversation_id)
590        .collect::<Vec<_>>();
591    conversation_ids.sort_unstable();
592    conversation_ids.dedup();
593    if conversation_ids.is_empty() {
594        return Ok(HashSet::new());
595    }
596
597    let db_path = data_dir.join("agent_search.db");
598    let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
599        &db_path,
600        Duration::from_secs(30),
601    )
602    .with_context(|| {
603        format!(
604            "open {} to honor raw-mirror prune --keep-tag",
605            db_path.display()
606        )
607    })?;
608    let _ = conn.execute("PRAGMA query_only = 1;");
609
610    let mut pinned = HashSet::new();
611    for id_chunk in conversation_ids.chunks(400) {
612        let tag_placeholders = (0..keep_tags.len())
613            .map(|idx| format!("?{}", idx + 1))
614            .collect::<Vec<_>>()
615            .join(", ");
616        let id_offset = keep_tags.len();
617        let id_placeholders = (0..id_chunk.len())
618            .map(|idx| format!("?{}", id_offset + idx + 1))
619            .collect::<Vec<_>>()
620            .join(", ");
621        let sql = format!(
622            "SELECT DISTINCT ct.conversation_id \
623             FROM conversation_tags ct \
624             JOIN tags t ON t.id = ct.tag_id \
625             WHERE t.name IN ({tag_placeholders}) \
626               AND ct.conversation_id IN ({id_placeholders})"
627        );
628        let mut params = keep_tags
629            .iter()
630            .map(|tag| ParamValue::from(tag.as_str()))
631            .collect::<Vec<_>>();
632        params.extend(id_chunk.iter().copied().map(ParamValue::from));
633        let rows: Vec<i64> = conn
634            .query_map_collect(&sql, &params, |row: &frankensqlite::Row| row.get_typed(0))
635            .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
636        pinned.extend(rows);
637    }
638
639    Ok(pinned)
640}
641
/// Returns the size of `path` when it is a regular, non-symlink file.
///
/// Yields `None` when the path cannot be stat'ed, is a symlink, or is not a
/// regular file.
fn blob_file_size(path: &Path) -> Option<u64> {
    let stat = fs::symlink_metadata(path).ok()?;
    if stat.file_type().is_symlink() || !stat.is_file() {
        return None;
    }
    Some(stat.len())
}
648
649fn remove_prune_target_file(path: &Path) -> Result<bool> {
650    let metadata = match fs::symlink_metadata(path) {
651        Ok(metadata) => metadata,
652        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
653        Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
654    };
655    if metadata.file_type().is_symlink() || !metadata.is_file() {
656        anyhow::bail!(
657            "refusing to prune non-regular raw mirror file {}",
658            path.display()
659        );
660    }
661    fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
662    sync_parent(path)?;
663    Ok(true)
664}
665
666fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
667    ensure_private_dir(root)?;
668    let audit_path = root.join("pruned.jsonl");
669    ensure_prune_audit_log_appendable(&audit_path)?;
670    let mut file = OpenOptions::new()
671        .create(true)
672        .append(true)
673        .open(&audit_path)
674        .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
675    set_private_file_permissions(&audit_path)?;
676    let now = now_ms();
677    for entry in &report.entries {
678        let record = json!({
679            "schema_version": 1,
680            "recorded_at_ms": now,
681            "mode": report.mode,
682            "kind": entry.kind,
683            "path": entry.path,
684            "blob_blake3": entry.blob_blake3,
685            "size_bytes": entry.size_bytes,
686            "reason": entry.reason,
687            "applied": entry.applied,
688        });
689        writeln!(file, "{record}")
690            .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
691    }
692    file.sync_all()
693        .with_context(|| format!("sync raw mirror prune audit {}", audit_path.display()))?;
694    sync_parent(&audit_path)?;
695    Ok(audit_path)
696}
697
698fn ensure_prune_audit_log_appendable(path: &Path) -> Result<()> {
699    match fs::symlink_metadata(path) {
700        Ok(metadata) if metadata.file_type().is_symlink() => {
701            anyhow::bail!(
702                "refusing to append raw mirror prune audit through symlink {}",
703                path.display()
704            );
705        }
706        Ok(metadata) if !metadata.is_file() => {
707            anyhow::bail!(
708                "refusing to append raw mirror prune audit to non-file {}",
709                path.display()
710            );
711        }
712        Ok(_) => Ok(()),
713        Err(err) if matches!(err.kind(), std::io::ErrorKind::NotFound) => Ok(()),
714        Err(err) => Err(err).with_context(|| {
715            format!(
716                "inspect raw mirror prune audit before append {}",
717                path.display()
718            )
719        }),
720    }
721}
722
/// Folds `value` into a running minimum and maximum.
///
/// `None` leaves both bounds untouched; the first `Some` initialises both.
fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
    if let Some(sample) = value {
        *min = Some(match *min {
            Some(lowest) => lowest.min(sample),
            None => sample,
        });
        *max = Some(match *max {
            Some(highest) => highest.max(sample),
            None => sample,
        });
    }
}
730
/// Sums the sizes of all regular files beneath `root`.
///
/// Symlinks are never followed or counted, and anything that cannot be
/// stat'ed or listed is silently skipped. The total saturates instead of
/// overflowing.
fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
    let mut pending = vec![root.to_path_buf()];
    let mut byte_total: u64 = 0;

    while let Some(current) = pending.pop() {
        let stat = match fs::symlink_metadata(&current) {
            Ok(stat) => stat,
            Err(_) => continue,
        };
        let kind = stat.file_type();
        if kind.is_symlink() {
            continue;
        }
        if kind.is_file() {
            byte_total = byte_total.saturating_add(stat.len());
        } else if kind.is_dir() {
            if let Ok(children) = fs::read_dir(&current) {
                pending.extend(children.flatten().map(|child| child.path()));
            }
        }
    }

    byte_total
}
754
/// Key of the in-process blob capture cache.
///
/// NOTE(review): the function that builds this key was not visible during this
/// review; it is given the capture input and the source file metadata, so the
/// fields presumably describe the state of the source file at capture time —
/// confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct RawMirrorBlobCacheKey {
    /// Data directory the capture targets.
    data_dir: PathBuf,
    /// Path of the captured source file.
    source_path: PathBuf,
    /// Optional identity string for the source file (format not visible — confirm).
    source_identity: Option<String>,
    /// Source file size in bytes.
    source_size_bytes: u64,
    /// Source modification time in nanoseconds, when available.
    source_mtime_ns: Option<u128>,
    /// Source change time in nanoseconds, when available.
    source_change_time_ns: Option<u128>,
}
764
/// Cached result of copying a source file into the mirror.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawMirrorBlobRecord {
    /// BLAKE3 digest computed while copying the source.
    blob_blake3: String,
    /// Number of bytes copied from the source.
    bytes_copied: u64,
}
770
/// Compression section of a manifest.
///
/// NOTE(review): no code producing or consuming these fields was visible
/// during this review; the accepted values of `state` should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorCompressionEnvelope {
    /// Compression state label.
    state: String,
    /// Compression algorithm name, if any.
    algorithm: Option<String>,
    /// Size before compression, if recorded.
    uncompressed_size_bytes: Option<u64>,
}
777
/// Encryption section of a manifest.
///
/// NOTE(review): no code producing or consuming these fields was visible
/// during this review; the accepted values of `state` should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorEncryptionEnvelope {
    /// Encryption state label.
    state: String,
    /// Encryption algorithm name, if any.
    algorithm: Option<String>,
    /// Identifier of the key used, if any.
    key_id: Option<String>,
    /// Version of the encryption envelope format, if any.
    envelope_version: Option<u32>,
}
785
/// Verification section of a manifest.
///
/// NOTE(review): no code producing or consuming these fields was visible
/// during this review; the accepted values of `status` should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorVerificationRecord {
    /// Verification status label.
    status: String,
    /// Name of the component that performed the verification.
    verifier: String,
    /// BLAKE3 digest of the verified content, if recorded.
    content_blake3: Option<String>,
    /// Verification timestamp in milliseconds, if recorded.
    verified_at_ms: Option<i64>,
}
793
/// Serialized form of one raw mirror manifest.
///
/// The storage summary and pruning read `manifest_kind`, `manifest_id`,
/// `blob_blake3`, `blob_relative_path`, `blob_size_bytes`, `captured_at_ms`,
/// `source_mtime_ms`, `provider`, `original_path` and `db_links` from parsed
/// manifests (presumably values of this type — confirm). Descriptions of the
/// remaining fields follow their names and should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RawMirrorManifestFile {
    /// Manifest schema version.
    schema_version: u32,
    /// Manifest kind marker; must equal `RAW_MIRROR_MANIFEST_KIND` to be accepted.
    manifest_kind: String,
    /// Identifier of this manifest.
    manifest_id: String,
    /// Name of the algorithm used for the blob digest.
    blob_hash_algorithm: String,
    /// Blob path relative to the mirror root; must match the path derived
    /// from `blob_blake3`.
    blob_relative_path: String,
    /// BLAKE3 digest of the blob.
    blob_blake3: String,
    /// Recorded blob size in bytes.
    blob_size_bytes: u64,
    /// Provider the source belongs to.
    provider: String,
    /// Source identifier.
    source_id: String,
    /// Origin kind label.
    origin_kind: String,
    /// Origin host, if any.
    origin_host: Option<String>,
    /// Original path of the captured source file.
    original_path: String,
    /// Redacted form of the original path.
    redacted_original_path: String,
    /// BLAKE3 digest of the original path string.
    original_path_blake3: String,
    /// Capture timestamp in milliseconds.
    captured_at_ms: i64,
    /// Source modification time in milliseconds, if known.
    source_mtime_ms: Option<i64>,
    /// Source file size in bytes.
    source_size_bytes: u64,
    /// Compression details for the blob.
    compression: RawMirrorCompressionEnvelope,
    /// Encryption details for the blob.
    encryption: RawMirrorEncryptionEnvelope,
    /// Links to database conversations; consulted when honouring keep-tags.
    db_links: Vec<RawMirrorDbLink>,
    /// Verification details for the blob.
    verification: RawMirrorVerificationRecord,
    /// Digest of the manifest itself, if recorded.
    manifest_blake3: Option<String>,
}
819
820pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
821    let source_metadata = fs::symlink_metadata(input.source_path)
822        .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
823    if source_metadata.file_type().is_symlink() {
824        return Err(anyhow!(
825            "refusing to raw-mirror symlink source {}",
826            input.source_path.display()
827        ));
828    }
829    if !source_metadata.is_file() {
830        return Err(anyhow!(
831            "refusing to raw-mirror non-file source {}",
832            input.source_path.display()
833        ));
834    }
835
836    let root = ensure_raw_mirror_root(input.data_dir)?;
837    ensure_private_dir_descendant(&root, &root.join("tmp"))?;
838
839    let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
840    let (blob_blake3, bytes_copied, blob_already_present) =
841        match cached_raw_mirror_blob_record(&cache_key, &root) {
842            Some(record) => (record.blob_blake3, record.bytes_copied, true),
843            None => {
844                let temp_dir = unique_capture_temp_dir(&root);
845                ensure_private_dir_descendant(&root, &temp_dir)?;
846                let CopyToTempResult {
847                    temp_path,
848                    blob_blake3,
849                    bytes_copied,
850                } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
851                let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
852                    .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
853                let blob_path = root.join(&blob_relative_path);
854                let already_present =
855                    publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
856                remove_empty_temp_dir_best_effort(&temp_dir);
857                cache_raw_mirror_blob_record(
858                    cache_key.clone(),
859                    RawMirrorBlobRecord {
860                        blob_blake3: blob_blake3.clone(),
861                        bytes_copied,
862                    },
863                );
864                (blob_blake3, bytes_copied, already_present)
865            }
866        };
867    let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
868        .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
869
870    let original_path = input.source_path.display().to_string();
871    let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
872    let manifest_id = raw_mirror_manifest_id(
873        input.provider,
874        input.source_id,
875        input.origin_kind,
876        input.origin_host,
877        &original_path_blake3,
878        &blob_blake3,
879    );
880    let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
881    let manifest_path = root.join(&manifest_relative_path);
882    let captured_at_ms = now_ms();
883    let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
884    let mut manifest = RawMirrorManifestFile {
885        schema_version: RAW_MIRROR_SCHEMA_VERSION,
886        manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
887        manifest_id: manifest_id.clone(),
888        blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
889        blob_relative_path: blob_relative_path.clone(),
890        blob_blake3: blob_blake3.clone(),
891        blob_size_bytes: bytes_copied,
892        provider: input.provider.to_string(),
893        source_id: input.source_id.to_string(),
894        origin_kind: input.origin_kind.to_string(),
895        origin_host: input.origin_host.map(ToOwned::to_owned),
896        original_path,
897        redacted_original_path: redacted_original_path(input.provider, input.source_path),
898        original_path_blake3,
899        captured_at_ms,
900        source_mtime_ms,
901        source_size_bytes: source_metadata.len(),
902        compression: RawMirrorCompressionEnvelope {
903            state: "none".to_string(),
904            algorithm: None,
905            uncompressed_size_bytes: Some(bytes_copied),
906        },
907        encryption: RawMirrorEncryptionEnvelope {
908            state: "none".to_string(),
909            algorithm: None,
910            key_id: None,
911            envelope_version: None,
912        },
913        db_links: unique_db_links(input.db_links),
914        verification: RawMirrorVerificationRecord {
915            status: "captured".to_string(),
916            verifier: "cass_indexer".to_string(),
917            content_blake3: Some(blob_blake3.clone()),
918            verified_at_ms: Some(captured_at_ms),
919        },
920        manifest_blake3: None,
921    };
922    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
923    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
924    let manifest_already_present =
925        publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
926    let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
927        if manifest_already_present {
928            merge_raw_mirror_manifest_db_links(
929                &root,
930                &manifest_path,
931                input.db_links,
932                Some(&blob_blake3),
933            )?;
934            let published = read_raw_mirror_manifest(&manifest_path)?;
935            (
936                published.blob_size_bytes,
937                published.captured_at_ms,
938                published.source_mtime_ms,
939            )
940        } else {
941            (bytes_copied, captured_at_ms, source_mtime_ms)
942        };
943
944    Ok(RawMirrorCaptureRecord {
945        manifest_id,
946        manifest_relative_path,
947        blob_relative_path,
948        blob_blake3,
949        blob_size_bytes: record_blob_size_bytes,
950        captured_at_ms: record_captured_at_ms,
951        source_mtime_ms: record_source_mtime_ms,
952        already_present: blob_already_present && manifest_already_present,
953    })
954}
955
956pub fn merge_manifest_db_links(
957    data_dir: &Path,
958    manifest_relative_path: &str,
959    links: &[RawMirrorDbLink],
960) -> Result<()> {
961    if links.is_empty() {
962        return Ok(());
963    }
964    let root = raw_mirror_root(data_dir);
965    let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
966    merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
967}
968
/// Outcome of `copy_source_to_private_temp`: where the private copy lives and
/// what was copied.
969struct CopyToTempResult {
    /// Path of the temp file holding the copied bytes.
970    temp_path: PathBuf,
    /// BLAKE3 hex digest of the bytes that were copied.
971    blob_blake3: String,
    /// Total number of bytes written to the temp file.
972    bytes_copied: u64,
973}
974
975fn copy_source_to_private_temp(
976    source_path: &Path,
977    temp_dir: &Path,
978    source_metadata: &fs::Metadata,
979) -> Result<CopyToTempResult> {
980    let temp_path = unique_temp_path(temp_dir, "blob");
981    let mut source = open_stable_source_file(source_path, source_metadata)?;
982    let mut temp = private_create_new_file(&temp_path)?;
983    let mut hasher = blake3::Hasher::new();
984    let mut buffer = [0u8; 64 * 1024];
985    let mut bytes_copied = 0u64;
986    loop {
987        let read = source
988            .read(&mut buffer)
989            .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
990        if read == 0 {
991            break;
992        }
993        temp.write_all(&buffer[..read])
994            .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
995        hasher.update(&buffer[..read]);
996        bytes_copied = bytes_copied.saturating_add(read as u64);
997    }
998    temp.sync_all()
999        .with_context(|| format!("sync raw mirror temp {}", temp_path.display()))?;
1000
1001    let final_source_metadata = source
1002        .metadata()
1003        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1004    if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
1005        remove_temp_best_effort(&temp_path);
1006        return Err(anyhow!(
1007            "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
1008            source_path.display()
1009        ));
1010    }
1011
1012    Ok(CopyToTempResult {
1013        temp_path,
1014        blob_blake3: hasher.finalize().to_hex().to_string(),
1015        bytes_copied,
1016    })
1017}
1018
1019fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
1020    let source = File::open(source_path)
1021        .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
1022    let opened_metadata = source
1023        .metadata()
1024        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1025    if !same_source_identity(expected_metadata, &opened_metadata) {
1026        return Err(anyhow!(
1027            "raw mirror source {} changed identity before capture",
1028            source_path.display()
1029        ));
1030    }
1031    let current_path_metadata = fs::symlink_metadata(source_path)
1032        .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1033    if current_path_metadata.file_type().is_symlink() {
1034        return Err(anyhow!(
1035            "refusing to raw-mirror symlink source {}",
1036            source_path.display()
1037        ));
1038    }
1039    if !same_source_identity(expected_metadata, &current_path_metadata) {
1040        return Err(anyhow!(
1041            "raw mirror source {} changed identity before capture",
1042            source_path.display()
1043        ));
1044    }
1045    Ok(source)
1046}
1047
/// Unix: `actual` must be a regular file living on the same device and inode
/// as `expected`.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;
    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}
1053
/// Non-Unix fallback: no device/inode comparison is made here, so the only
/// requirement is that `actual` is a regular file.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    let is_regular_file = actual.is_file();
    is_regular_file
}
1058
/// Unix: renders the file identity as `"<dev>:<ino>"`.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;
    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}
1064
/// Non-Unix fallback: this build has no identity token, so the result is
/// always `None`.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    Option::None
}
1069
/// Unix: the inode change time (`ctime`) as nanoseconds since the epoch.
///
/// Returns `None` when either the seconds or the nanoseconds part is negative.
/// The arithmetic saturates instead of overflowing.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    const NANOS_PER_SECOND: u128 = 1_000_000_000;
    let whole_seconds = u128::try_from(metadata.ctime()).ok()?;
    let subsecond_nanos = u128::try_from(metadata.ctime_nsec()).ok()?;
    let total = whole_seconds
        .saturating_mul(NANOS_PER_SECOND)
        .saturating_add(subsecond_nanos);
    Some(total)
}
1082
/// Non-Unix fallback: no change time is reported, so the result is always
/// `None`.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    Option::None
}
1087
/// Reports whether the source looks different after the copy than before it.
///
/// A different length always counts as a change. Otherwise the modification
/// times are compared when both are available; if either one cannot be read
/// the file is treated as unchanged.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    let size_changed = initial.len() != final_metadata.len();
    if size_changed {
        return true;
    }
    if let (Ok(before), Ok(after)) = (initial.modified(), final_metadata.modified()) {
        before != after
    } else {
        false
    }
}
1100
1101fn publish_content_addressed_temp(
1102    root: &Path,
1103    temp_path: &Path,
1104    final_path: &Path,
1105    expected_blake3: &str,
1106) -> Result<bool> {
1107    ensure_private_dir_descendant(
1108        root,
1109        final_path
1110            .parent()
1111            .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1112    )?;
1113    if final_path.exists() {
1114        verify_existing_file(final_path, expected_blake3)?;
1115        remove_temp_best_effort(temp_path);
1116        return Ok(true);
1117    }
1118
1119    match fs::hard_link(temp_path, final_path) {
1120        Ok(()) => {
1121            sync_file(final_path)?;
1122            sync_parent(final_path)?;
1123            remove_temp_best_effort(temp_path);
1124            Ok(false)
1125        }
1126        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1127            verify_existing_file(final_path, expected_blake3)?;
1128            remove_temp_best_effort(temp_path);
1129            Ok(true)
1130        }
1131        Err(err) => Err(anyhow!(
1132            "publish raw mirror blob {} from {}: {err}",
1133            final_path.display(),
1134            temp_path.display()
1135        )),
1136    }
1137}
1138
1139fn publish_manifest_bytes_create_new(
1140    root: &Path,
1141    manifest_path: &Path,
1142    manifest_bytes: &[u8],
1143    blob_blake3: &str,
1144) -> Result<bool> {
1145    ensure_private_dir_descendant(
1146        root,
1147        manifest_path
1148            .parent()
1149            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1150    )?;
1151    if manifest_path.exists() {
1152        verify_existing_manifest(manifest_path, blob_blake3)?;
1153        return Ok(true);
1154    }
1155
1156    let temp_dir = unique_capture_temp_dir(root);
1157    ensure_private_dir_descendant(root, &temp_dir)?;
1158    let temp_path = unique_temp_path(&temp_dir, "manifest");
1159    let mut temp = private_create_new_file(&temp_path)?;
1160    temp.write_all(manifest_bytes)
1161        .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1162    temp.sync_all()
1163        .with_context(|| format!("sync raw mirror manifest temp {}", temp_path.display()))?;
1164
1165    match fs::hard_link(&temp_path, manifest_path) {
1166        Ok(()) => {
1167            sync_file(manifest_path)?;
1168            sync_parent(manifest_path)?;
1169            remove_temp_best_effort(&temp_path);
1170            remove_empty_temp_dir_best_effort(&temp_dir);
1171            Ok(false)
1172        }
1173        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1174            verify_existing_manifest(manifest_path, blob_blake3)?;
1175            remove_temp_best_effort(&temp_path);
1176            remove_empty_temp_dir_best_effort(&temp_dir);
1177            Ok(true)
1178        }
1179        Err(err) => Err(anyhow!(
1180            "publish raw mirror manifest {} from {}: {err}",
1181            manifest_path.display(),
1182            temp_path.display()
1183        )),
1184    }
1185}
1186
1187fn merge_raw_mirror_manifest_db_links(
1188    root: &Path,
1189    manifest_path: &Path,
1190    links: &[RawMirrorDbLink],
1191    expected_blob_blake3: Option<&str>,
1192) -> Result<()> {
1193    if links.is_empty() {
1194        return Ok(());
1195    }
1196
1197    let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1198    let _guard = lock
1199        .lock()
1200        .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1201
1202    let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1203    if let Some(expected_blob_blake3) = expected_blob_blake3
1204        && manifest.blob_blake3 != expected_blob_blake3
1205    {
1206        return Err(anyhow!(
1207            "existing raw mirror manifest {} points at blob {}, expected {}",
1208            manifest_path.display(),
1209            manifest.blob_blake3,
1210            expected_blob_blake3
1211        ));
1212    }
1213
1214    let mut merged_links = manifest.db_links.clone();
1215    merged_links.extend_from_slice(links);
1216    let merged_links = unique_db_links(&merged_links);
1217    if merged_links == manifest.db_links {
1218        return Ok(());
1219    }
1220
1221    manifest.db_links = merged_links;
1222    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1223    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1224    replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1225}
1226
1227fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1228    ensure_private_dir_descendant(
1229        root,
1230        manifest_path
1231            .parent()
1232            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1233    )?;
1234    let temp_dir = unique_capture_temp_dir(root);
1235    ensure_private_dir_descendant(root, &temp_dir)?;
1236    let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1237    let mut temp = private_create_new_file(&temp_path)?;
1238    temp.write_all(manifest_bytes).with_context(|| {
1239        format!(
1240            "write raw mirror manifest update temp {}",
1241            temp_path.display()
1242        )
1243    })?;
1244    temp.sync_all().with_context(|| {
1245        format!(
1246            "sync raw mirror manifest update temp {}",
1247            temp_path.display()
1248        )
1249    })?;
1250    drop(temp);
1251
1252    fs::rename(&temp_path, manifest_path).with_context(|| {
1253        format!(
1254            "replace raw mirror manifest {} from {}",
1255            manifest_path.display(),
1256            temp_path.display()
1257        )
1258    })?;
1259    set_private_file_permissions(manifest_path)?;
1260    sync_file(manifest_path)?;
1261    sync_parent(manifest_path)?;
1262    remove_empty_temp_dir_best_effort(&temp_dir);
1263    Ok(())
1264}
1265
1266fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1267    let relative = Path::new(relative_path);
1268    if relative.is_absolute() {
1269        return Err(anyhow!(
1270            "raw mirror manifest path must be relative: {relative_path}"
1271        ));
1272    }
1273
1274    let mut normal_components = Vec::new();
1275    for component in relative.components() {
1276        match component {
1277            std::path::Component::Normal(part) => normal_components.push(part),
1278            _ => {
1279                return Err(anyhow!(
1280                    "raw mirror manifest path must use only normal relative components: {relative_path}"
1281                ));
1282            }
1283        }
1284    }
1285
1286    if normal_components.len() != 2
1287        || normal_components[0] != std::ffi::OsStr::new("manifests")
1288        || Path::new(normal_components[1])
1289            .extension()
1290            .and_then(|ext| ext.to_str())
1291            != Some("json")
1292    {
1293        return Err(anyhow!(
1294            "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1295        ));
1296    }
1297
1298    Ok(root.join(relative))
1299}
1300
1301fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1302    let metadata = fs::symlink_metadata(path)
1303        .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1304    if metadata.file_type().is_symlink() {
1305        return Err(anyhow!(
1306            "refusing to read symlink raw mirror blob {}",
1307            path.display()
1308        ));
1309    }
1310    if !metadata.is_file() {
1311        return Err(anyhow!(
1312            "refusing to read non-file raw mirror blob {}",
1313            path.display()
1314        ));
1315    }
1316    let actual = file_blake3(path)?;
1317    if actual == expected_blake3 {
1318        Ok(())
1319    } else {
1320        Err(anyhow!(
1321            "existing raw mirror blob {} has blake3 {}, expected {}",
1322            path.display(),
1323            actual,
1324            expected_blake3
1325        ))
1326    }
1327}
1328
1329fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1330    let manifest = read_raw_mirror_manifest(path)?;
1331    if manifest.blob_blake3 == expected_blob_blake3 {
1332        Ok(())
1333    } else {
1334        Err(anyhow!(
1335            "existing raw mirror manifest {} points at blob {}, expected {}",
1336            path.display(),
1337            manifest.blob_blake3,
1338            expected_blob_blake3
1339        ))
1340    }
1341}
1342
1343fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1344    let metadata = fs::symlink_metadata(path)
1345        .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1346    if metadata.file_type().is_symlink() {
1347        return Err(anyhow!(
1348            "refusing to read symlink raw mirror manifest {}",
1349            path.display()
1350        ));
1351    }
1352    if !metadata.is_file() {
1353        return Err(anyhow!(
1354            "refusing to read non-file raw mirror manifest {}",
1355            path.display()
1356        ));
1357    }
1358    serde_json::from_slice(
1359        &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1360    )
1361    .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1362}
1363
1364fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1365    data_dir
1366        .join(RAW_MIRROR_ROOT_DIR)
1367        .join(RAW_MIRROR_VERSION_DIR)
1368}
1369
1370fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1371    let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1372    ensure_private_dir(&root_parent)?;
1373    let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1374    ensure_private_dir(&root)?;
1375    Ok(root)
1376}
1377
1378fn raw_mirror_blob_cache_key(
1379    input: &RawMirrorCaptureInput<'_>,
1380    source_metadata: &fs::Metadata,
1381) -> RawMirrorBlobCacheKey {
1382    RawMirrorBlobCacheKey {
1383        data_dir: input.data_dir.to_path_buf(),
1384        source_path: input.source_path.to_path_buf(),
1385        source_identity: source_identity_token(source_metadata),
1386        source_size_bytes: source_metadata.len(),
1387        source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1388        source_change_time_ns: source_change_time_ns(source_metadata),
1389    }
1390}
1391
1392fn cached_raw_mirror_blob_record(
1393    key: &RawMirrorBlobCacheKey,
1394    root: &Path,
1395) -> Option<RawMirrorBlobRecord> {
1396    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1397    let record = {
1398        let mut guard = cache.lock().ok()?;
1399        let record = guard.get(key).cloned()?;
1400        if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1401            guard.remove(key);
1402            return None;
1403        }
1404        record
1405    };
1406
1407    let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1408    let blob_path = root.join(blob_relative_path);
1409    let metadata_valid = fs::symlink_metadata(&blob_path)
1410        .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1411        .unwrap_or(false);
1412    if !metadata_valid {
1413        remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1414        return None;
1415    }
1416
1417    match file_blake3(&blob_path) {
1418        Ok(actual) if actual == record.blob_blake3 => Some(record),
1419        Ok(actual) => {
1420            tracing::warn!(
1421                path = %blob_path.display(),
1422                expected_blake3 = %record.blob_blake3,
1423                actual_blake3 = %actual,
1424                "discarding raw mirror blob cache entry with mismatched content"
1425            );
1426            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1427            None
1428        }
1429        Err(err) => {
1430            tracing::debug!(
1431                path = %blob_path.display(),
1432                error = %err,
1433                "discarding unreadable raw mirror blob cache entry"
1434            );
1435            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1436            None
1437        }
1438    }
1439}
1440
1441fn remove_cached_raw_mirror_blob_record_if_unchanged(
1442    cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1443    key: &RawMirrorBlobCacheKey,
1444    stale_record: &RawMirrorBlobRecord,
1445) {
1446    if let Ok(mut guard) = cache.lock()
1447        && guard
1448            .get(key)
1449            .is_some_and(|current| current == stale_record)
1450    {
1451        guard.remove(key);
1452    }
1453}
1454
1455fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1456    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1457    if let Ok(mut guard) = cache.lock() {
1458        guard.insert(key, record);
1459    }
1460}
1461
1462fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1463    if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1464        return None;
1465    }
1466    let lower = blob_blake3.to_ascii_lowercase();
1467    Some(format!(
1468        "blobs/{}/{}/{}.{}",
1469        RAW_MIRROR_HASH_ALGORITHM,
1470        &lower[..2],
1471        lower,
1472        RAW_MIRROR_BLOB_EXTENSION
1473    ))
1474}
1475
/// Returns the manifest path relative to the mirror root:
/// `manifests/<manifest_id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    ["manifests/", manifest_id, ".json"].concat()
}
1479
1480fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1481    let mut hasher = blake3::Hasher::new();
1482    hasher.update(b"doctor-raw-mirror-original-path-v1");
1483    hasher.update(&[0]);
1484    hasher.update(original_path.as_bytes());
1485    hasher.finalize().to_hex().to_string()
1486}
1487
1488fn raw_mirror_manifest_id(
1489    provider: &str,
1490    source_id: &str,
1491    origin_kind: &str,
1492    origin_host: Option<&str>,
1493    original_path_blake3: &str,
1494    blob_blake3: &str,
1495) -> String {
1496    canonical_blake3(
1497        "doctor-raw-mirror-manifest-id-v1",
1498        json!({
1499            "provider": provider,
1500            "source_id": source_id,
1501            "origin_kind": origin_kind,
1502            "origin_host": origin_host,
1503            "original_path_blake3": original_path_blake3,
1504            "blob_blake3": blob_blake3,
1505        }),
1506    )
1507}
1508
1509fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1510    let mut value = serde_json::to_value(manifest).unwrap_or_default();
1511    if let Value::Object(map) = &mut value {
1512        map.remove("manifest_blake3");
1513    }
1514    canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1515}
1516
1517fn canonical_blake3(prefix: &str, value: Value) -> String {
1518    let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1519    let mut hasher = blake3::Hasher::new();
1520    hasher.update(prefix.as_bytes());
1521    hasher.update(&[0]);
1522    hasher.update(&encoded);
1523    format!("{prefix}-{}", hasher.finalize().to_hex())
1524}
1525
1526fn canonical_json_value(value: Value) -> Value {
1527    match value {
1528        Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1529        Value::Object(map) => {
1530            let mut entries: Vec<_> = map.into_iter().collect();
1531            entries.sort_by(|left, right| left.0.cmp(&right.0));
1532            let mut canonical = serde_json::Map::new();
1533            for (key, value) in entries {
1534                canonical.insert(key, canonical_json_value(value));
1535            }
1536            Value::Object(canonical)
1537        }
1538        other => other,
1539    }
1540}
1541
1542fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1543    let mut dedup = links.to_vec();
1544    dedup.sort_by(|left, right| {
1545        (
1546            left.conversation_id,
1547            left.message_count,
1548            left.started_at_ms,
1549            left.source_path.as_deref().unwrap_or(""),
1550        )
1551            .cmp(&(
1552                right.conversation_id,
1553                right.message_count,
1554                right.started_at_ms,
1555                right.source_path.as_deref().unwrap_or(""),
1556            ))
1557    });
1558    dedup.dedup();
1559    dedup
1560}
1561
1562fn file_blake3(path: &Path) -> Result<String> {
1563    let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1564    let mut hasher = blake3::Hasher::new();
1565    let mut buffer = [0u8; 64 * 1024];
1566    loop {
1567        let read = file
1568            .read(&mut buffer)
1569            .with_context(|| format!("read {}", path.display()))?;
1570        if read == 0 {
1571            break;
1572        }
1573        hasher.update(&buffer[..read]);
1574    }
1575    Ok(hasher.finalize().to_hex().to_string())
1576}
1577
1578fn ensure_private_dir(path: &Path) -> Result<()> {
1579    create_private_dir_all(path)
1580        .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1581    let metadata = fs::symlink_metadata(path)
1582        .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1583    let file_type = metadata.file_type();
1584    if file_type.is_symlink() {
1585        return Err(anyhow!(
1586            "refusing to use symlink raw mirror dir {}",
1587            path.display()
1588        ));
1589    }
1590    if !file_type.is_dir() {
1591        return Err(anyhow!(
1592            "refusing to use non-directory raw mirror path {}",
1593            path.display()
1594        ));
1595    }
1596    #[cfg(unix)]
1597    {
1598        use std::os::unix::fs::PermissionsExt;
1599        if metadata.permissions().mode() & 0o777 != 0o700 {
1600            set_private_dir_permissions(path)?;
1601        }
1602    }
1603    #[cfg(not(unix))]
1604    {
1605        set_private_dir_permissions(path)?;
1606    }
1607    Ok(())
1608}
1609
1610fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1611    let relative = path.strip_prefix(root).with_context(|| {
1612        format!(
1613            "raw mirror private dir {} is not under root {}",
1614            path.display(),
1615            root.display()
1616        )
1617    })?;
1618
1619    if let Some(root_parent) = root.parent() {
1620        ensure_private_dir(root_parent)?;
1621    }
1622    ensure_private_dir(root)?;
1623    let mut current = root.to_path_buf();
1624    for component in relative.components() {
1625        match component {
1626            Component::Normal(part) => {
1627                current.push(part);
1628                ensure_private_dir(&current)?;
1629            }
1630            Component::CurDir => {}
1631            _ => {
1632                return Err(anyhow!(
1633                    "raw mirror private dir contains non-normal component: {}",
1634                    path.display()
1635                ));
1636            }
1637        }
1638    }
1639
1640    Ok(())
1641}
1642
1643fn private_create_new_file(path: &Path) -> Result<File> {
1644    let mut options = OpenOptions::new();
1645    options.write(true).create_new(true);
1646    set_private_create_file_mode(&mut options);
1647    let file = options
1648        .open(path)
1649        .with_context(|| format!("create raw mirror file {}", path.display()))?;
1650    Ok(file)
1651}
1652
/// Unix: recursively creates `path`, requesting mode `0o700` for every
/// directory that has to be created.
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    fs::DirBuilder::new()
        .recursive(true)
        .mode(0o700)
        .create(path)
}
1660
/// Non-Unix fallback: recursively creates `path` with the platform's default
/// permissions.
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::DirBuilder::new().recursive(true).create(path)
}
1665
/// Unix: requests mode `0o600` for files created through `options`.
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    const OWNER_READ_WRITE: u32 = 0o600;
    options.mode(OWNER_READ_WRITE);
}
1672
/// Non-Unix fallback: there is no creation mode to set, so `options` is left
/// untouched.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {
    // Intentionally empty.
}
1675
1676fn sync_file(path: &Path) -> Result<()> {
1677    let mut options = OpenOptions::new();
1678    options.read(true);
1679    #[cfg(windows)]
1680    options.write(true);
1681    options
1682        .open(path)
1683        .and_then(|file| file.sync_all())
1684        .with_context(|| format!("sync raw mirror file {}", path.display()))
1685}
1686
1687#[cfg(not(windows))]
1688fn sync_parent(path: &Path) -> Result<()> {
1689    let Some(parent) = path.parent() else {
1690        return Ok(());
1691    };
1692    File::open(parent)
1693        .and_then(|file| file.sync_all())
1694        .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1695}
1696
/// Windows counterpart of the parent-directory sync: performs no work and
/// always succeeds.
#[cfg(windows)]
fn sync_parent(_path: &Path) -> Result<()> {
    Ok(())
}
1701
1702fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1703    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1704    let nanos = SystemTime::now()
1705        .duration_since(UNIX_EPOCH)
1706        .unwrap_or_default()
1707        .as_nanos();
1708    dir.join(format!(
1709        ".{label}.{}.{}.{}.tmp",
1710        std::process::id(),
1711        nanos,
1712        nonce
1713    ))
1714}
1715
1716fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1717    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1718    let nanos = SystemTime::now()
1719        .duration_since(UNIX_EPOCH)
1720        .unwrap_or_default()
1721        .as_nanos();
1722    root.join("tmp").join(format!(
1723        "capture.{}.{}.{}",
1724        std::process::id(),
1725        nanos,
1726        nonce
1727    ))
1728}
1729
1730fn remove_temp_best_effort(path: &Path) {
1731    if let Err(err) = fs::remove_file(path) {
1732        tracing::debug!(
1733            path = %path.display(),
1734            error = %err,
1735            "failed to remove raw mirror temp file"
1736        );
1737    }
1738}
1739
1740fn remove_empty_temp_dir_best_effort(path: &Path) {
1741    if let Err(err) = fs::remove_dir(path) {
1742        tracing::debug!(
1743            path = %path.display(),
1744            error = %err,
1745            "failed to remove raw mirror temp directory"
1746        );
1747    }
1748}
1749
/// Produces a redacted label for `source_path` of the form
/// `[provider]/file_name`, dropping every directory component.
///
/// When the path has no final component, or that component is not valid
/// UTF-8, the placeholder name `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let utf8_name = source_path
        .file_name()
        .and_then(std::ffi::OsStr::to_str);
    let file_name = match utf8_name {
        Some(name) => name,
        None => "session",
    };
    format!("[{provider}]/{file_name}")
}
1757
1758fn now_ms() -> i64 {
1759    system_time_to_ms(SystemTime::now()).unwrap_or(0)
1760}
1761
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` when `time` is earlier than the epoch or when the
/// millisecond count does not fit in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1767
/// Converts `time` to nanoseconds since the Unix epoch, or `None` when
/// `time` is earlier than the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_nanos()),
        Err(_) => None,
    }
}
1773
1774#[cfg(unix)]
1775fn set_private_dir_permissions(path: &Path) -> Result<()> {
1776    use std::os::unix::fs::PermissionsExt;
1777    fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1778        .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1779}
1780
/// Non-Unix counterpart of the private-directory permission setter: performs
/// no work and always succeeds.
#[cfg(not(unix))]
fn set_private_dir_permissions(_path: &Path) -> Result<()> {
    Ok(())
}
1785
1786#[cfg(unix)]
1787fn set_private_file_permissions(path: &Path) -> Result<()> {
1788    use std::os::unix::fs::PermissionsExt;
1789    fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1790        .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1791}
1792
/// Non-Unix counterpart of the private-file permission setter: performs no
/// work and always succeeds.
#[cfg(not(unix))]
fn set_private_file_permissions(_path: &Path) -> Result<()> {
    Ok(())
}
1797
1798#[cfg(test)]
1799mod tests {
1800    use super::*;
1801
1802    #[test]
1803    fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
1804        let temp = tempfile::TempDir::new().expect("tempdir");
1805        let data_dir = temp.path().join("cass-data");
1806        let source_path = temp.path().join("rollout-fixture.jsonl");
1807        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1808        fs::write(&source_path, source_bytes).expect("write source");
1809        let db_link = RawMirrorDbLink {
1810            conversation_id: Some(42),
1811            message_count: Some(1),
1812            source_path: Some(source_path.display().to_string()),
1813            started_at_ms: Some(1_733_000_000_000),
1814        };
1815
1816        let first = capture_source_file(RawMirrorCaptureInput {
1817            data_dir: &data_dir,
1818            provider: "codex",
1819            source_id: "local",
1820            origin_kind: "local",
1821            origin_host: None,
1822            source_path: &source_path,
1823            db_links: std::slice::from_ref(&db_link),
1824        })
1825        .expect("first capture");
1826        let second = capture_source_file(RawMirrorCaptureInput {
1827            data_dir: &data_dir,
1828            provider: "codex",
1829            source_id: "local",
1830            origin_kind: "local",
1831            origin_host: None,
1832            source_path: &source_path,
1833            db_links: std::slice::from_ref(&db_link),
1834        })
1835        .expect("second capture");
1836
1837        assert_eq!(first.manifest_id, second.manifest_id);
1838        assert_eq!(first.blob_blake3, second.blob_blake3);
1839        assert_eq!(first.captured_at_ms, second.captured_at_ms);
1840        assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
1841        assert!(!first.already_present);
1842        assert!(second.already_present);
1843        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1844
1845        let blob_path = data_dir
1846            .join(RAW_MIRROR_ROOT_DIR)
1847            .join(RAW_MIRROR_VERSION_DIR)
1848            .join(&first.blob_relative_path);
1849        let manifest_path = data_dir
1850            .join(RAW_MIRROR_ROOT_DIR)
1851            .join(RAW_MIRROR_VERSION_DIR)
1852            .join(&first.manifest_relative_path);
1853        assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);
1854
1855        let manifest: Value =
1856            serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
1857                .expect("manifest json");
1858        assert_eq!(
1859            manifest["manifest_kind"].as_str(),
1860            Some(RAW_MIRROR_MANIFEST_KIND)
1861        );
1862        assert_eq!(manifest["provider"].as_str(), Some("codex"));
1863        assert_eq!(
1864            manifest["blob_blake3"].as_str(),
1865            Some(first.blob_blake3.as_str())
1866        );
1867        assert_eq!(
1868            manifest["redacted_original_path"].as_str(),
1869            Some("[codex]/rollout-fixture.jsonl")
1870        );
1871        assert_eq!(
1872            manifest["db_links"][0]["conversation_id"].as_i64(),
1873            Some(42)
1874        );
1875        assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
1876        assert!(
1877            manifest["manifest_blake3"]
1878                .as_str()
1879                .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
1880        );
1881        let tmp_root = data_dir
1882            .join(RAW_MIRROR_ROOT_DIR)
1883            .join(RAW_MIRROR_VERSION_DIR)
1884            .join("tmp");
1885        assert_eq!(
1886            fs::read_dir(&tmp_root)
1887                .expect("raw mirror tmp root")
1888                .collect::<Vec<_>>()
1889                .len(),
1890            0,
1891            "successful captures must not leave doctor-visible interrupted temp artifacts"
1892        );
1893
1894        #[cfg(unix)]
1895        {
1896            use std::os::unix::fs::PermissionsExt;
1897
1898            let root = data_dir
1899                .join(RAW_MIRROR_ROOT_DIR)
1900                .join(RAW_MIRROR_VERSION_DIR);
1901            assert_eq!(
1902                fs::metadata(&root)
1903                    .expect("raw mirror root metadata")
1904                    .permissions()
1905                    .mode()
1906                    & 0o777,
1907                0o700
1908            );
1909            assert_eq!(
1910                fs::metadata(&manifest_path)
1911                    .expect("manifest metadata")
1912                    .permissions()
1913                    .mode()
1914                    & 0o777,
1915                0o600
1916            );
1917        }
1918    }
1919
1920    #[test]
1921    fn capture_source_file_merges_db_links_into_existing_manifest() {
1922        let temp = tempfile::TempDir::new().expect("tempdir");
1923        let data_dir = temp.path().join("cass-data");
1924        let source_path = temp.path().join("preparse-then-parsed.jsonl");
1925        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
1926        fs::write(&source_path, source_bytes).expect("write source");
1927
1928        let preparse = capture_source_file(RawMirrorCaptureInput {
1929            data_dir: &data_dir,
1930            provider: "codex",
1931            source_id: "local",
1932            origin_kind: "local",
1933            origin_host: None,
1934            source_path: &source_path,
1935            db_links: &[],
1936        })
1937        .expect("preparse capture");
1938
1939        let parsed_link = RawMirrorDbLink {
1940            conversation_id: None,
1941            message_count: Some(1),
1942            source_path: Some(source_path.display().to_string()),
1943            started_at_ms: Some(1_733_000_000_000),
1944        };
1945        let parsed = capture_source_file(RawMirrorCaptureInput {
1946            data_dir: &data_dir,
1947            provider: "codex",
1948            source_id: "local",
1949            origin_kind: "local",
1950            origin_host: None,
1951            source_path: &source_path,
1952            db_links: std::slice::from_ref(&parsed_link),
1953        })
1954        .expect("parsed capture");
1955
1956        assert_eq!(preparse.manifest_id, parsed.manifest_id);
1957        assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
1958        assert!(parsed.already_present);
1959
1960        let manifest_path = data_dir
1961            .join(RAW_MIRROR_ROOT_DIR)
1962            .join(RAW_MIRROR_VERSION_DIR)
1963            .join(&parsed.manifest_relative_path);
1964        let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
1965        assert_eq!(
1966            manifest.db_links,
1967            vec![parsed_link],
1968            "second capture must enrich the pre-parse manifest with DB-link evidence"
1969        );
1970        let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
1971        assert_eq!(
1972            manifest.manifest_blake3.as_deref(),
1973            Some(expected_manifest_blake3.as_str()),
1974            "manifest checksum must be recomputed after DB-link merge"
1975        );
1976        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
1977    }
1978
1979    #[test]
1980    fn merge_manifest_db_links_rejects_hostile_relative_paths() {
1981        let temp = tempfile::TempDir::new().expect("tempdir");
1982        let data_dir = temp.path().join("cass-data");
1983        let db_link = RawMirrorDbLink {
1984            conversation_id: Some(42),
1985            message_count: Some(1),
1986            source_path: Some("source.jsonl".to_string()),
1987            started_at_ms: Some(1_733_000_000_000),
1988        };
1989
1990        for relative in [
1991            "../escape.json",
1992            "/tmp/escape.json",
1993            "manifests/../escape.json",
1994            "blobs/blake3/ab/not-a-manifest.raw",
1995            "manifests/not-json.txt",
1996        ] {
1997            let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
1998                .expect_err("hostile manifest path should be rejected");
1999            assert!(
2000                err.to_string().contains("raw mirror manifest path"),
2001                "unexpected error for {relative}: {err}"
2002            );
2003        }
2004    }
2005
2006    #[cfg(unix)]
2007    #[test]
2008    fn merge_manifest_db_links_rejects_symlink_manifest_path() {
2009        let temp = tempfile::TempDir::new().expect("tempdir");
2010        let data_dir = temp.path().join("cass-data");
2011        let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
2012        fs::create_dir_all(&manifest_dir).expect("manifest dir");
2013        let outside = temp.path().join("outside.json");
2014        fs::write(&outside, "{}").expect("outside manifest");
2015        std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
2016            .expect("symlink manifest");
2017        let db_link = RawMirrorDbLink {
2018            conversation_id: Some(42),
2019            message_count: Some(1),
2020            source_path: Some("source.jsonl".to_string()),
2021            started_at_ms: Some(1_733_000_000_000),
2022        };
2023
2024        let err = merge_manifest_db_links(
2025            &data_dir,
2026            "manifests/link.json",
2027            std::slice::from_ref(&db_link),
2028        )
2029        .expect_err("symlink manifest should be rejected");
2030        assert!(
2031            err.to_string().contains("symlink raw mirror manifest"),
2032            "unexpected symlink-manifest error: {err}"
2033        );
2034    }
2035
2036    #[test]
2037    fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
2038        let temp = tempfile::TempDir::new().expect("tempdir");
2039        let data_dir = temp.path().join("cass-data");
2040        let first_source = temp.path().join("first.jsonl");
2041        let second_source = temp.path().join("second.jsonl");
2042        let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
2043        fs::write(&first_source, source_bytes).expect("write first source");
2044        fs::write(&second_source, source_bytes).expect("write second source");
2045
2046        let first = capture_source_file(RawMirrorCaptureInput {
2047            data_dir: &data_dir,
2048            provider: "codex",
2049            source_id: "local",
2050            origin_kind: "local",
2051            origin_host: None,
2052            source_path: &first_source,
2053            db_links: &[],
2054        })
2055        .expect("first capture");
2056        let second = capture_source_file(RawMirrorCaptureInput {
2057            data_dir: &data_dir,
2058            provider: "codex",
2059            source_id: "local",
2060            origin_kind: "local",
2061            origin_host: None,
2062            source_path: &second_source,
2063            db_links: &[],
2064        })
2065        .expect("second capture");
2066
2067        assert_eq!(first.blob_blake3, second.blob_blake3);
2068        assert_eq!(first.blob_relative_path, second.blob_relative_path);
2069        assert_ne!(first.manifest_id, second.manifest_id);
2070        assert!(
2071            !second.already_present,
2072            "a duplicate blob with a new source manifest is not a full capture replay"
2073        );
2074
2075        let manifest_root = data_dir
2076            .join(RAW_MIRROR_ROOT_DIR)
2077            .join(RAW_MIRROR_VERSION_DIR)
2078            .join("manifests");
2079        let manifests = fs::read_dir(manifest_root)
2080            .expect("manifest dir")
2081            .collect::<std::io::Result<Vec<_>>>()
2082            .expect("manifest entries");
2083        assert_eq!(manifests.len(), 2);
2084
2085        let summary = storage_summary(&data_dir);
2086        assert!(summary.initialized);
2087        assert_eq!(summary.manifest_count, 2);
2088        assert_eq!(summary.unique_blob_count, 1);
2089        assert_eq!(summary.total_blob_bytes, source_bytes.len() as u64);
2090        assert_eq!(summary.largest_blob_bytes, source_bytes.len() as u64);
2091        assert_eq!(summary.missing_blob_count, 0);
2092        assert_eq!(summary.invalid_manifest_count, 0);
2093        assert!(summary.total_storage_bytes >= source_bytes.len() as u64);
2094    }
2095
2096    #[test]
2097    fn storage_summary_rejects_hostile_blob_relative_path() {
2098        let temp = tempfile::TempDir::new().expect("tempdir");
2099        let data_dir = temp.path().join("cass-data");
2100        let source_path = temp.path().join("source.jsonl");
2101        fs::write(
2102            &source_path,
2103            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2104        )
2105        .expect("write source");
2106
2107        let captured = capture_source_file(RawMirrorCaptureInput {
2108            data_dir: &data_dir,
2109            provider: "codex",
2110            source_id: "local",
2111            origin_kind: "local",
2112            origin_host: None,
2113            source_path: &source_path,
2114            db_links: &[],
2115        })
2116        .expect("capture source");
2117        let manifest_path = data_dir
2118            .join(RAW_MIRROR_ROOT_DIR)
2119            .join(RAW_MIRROR_VERSION_DIR)
2120            .join(&captured.manifest_relative_path);
2121        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2122        manifest.blob_relative_path = "../outside.raw".to_string();
2123        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2124        fs::write(
2125            &manifest_path,
2126            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2127        )
2128        .expect("tamper manifest");
2129
2130        let summary = storage_summary(&data_dir);
2131        assert_eq!(summary.manifest_count, 1);
2132        assert_eq!(summary.invalid_manifest_count, 1);
2133        assert_eq!(summary.unique_blob_count, 0);
2134        assert_eq!(summary.total_blob_bytes, 0);
2135    }
2136
2137    #[test]
2138    fn prune_fails_closed_on_hostile_manifest_inventory() {
2139        let temp = tempfile::TempDir::new().expect("tempdir");
2140        let data_dir = temp.path().join("cass-data");
2141        let source_path = temp.path().join("source.jsonl");
2142        fs::write(
2143            &source_path,
2144            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
2145        )
2146        .expect("write source");
2147
2148        let captured = capture_source_file(RawMirrorCaptureInput {
2149            data_dir: &data_dir,
2150            provider: "codex",
2151            source_id: "local",
2152            origin_kind: "local",
2153            origin_host: None,
2154            source_path: &source_path,
2155            db_links: &[],
2156        })
2157        .expect("capture source");
2158        let root = data_dir
2159            .join(RAW_MIRROR_ROOT_DIR)
2160            .join(RAW_MIRROR_VERSION_DIR);
2161        let manifest_path = root.join(&captured.manifest_relative_path);
2162        let blob_path = root.join(&captured.blob_relative_path);
2163        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
2164        manifest.blob_relative_path = "../outside.raw".to_string();
2165        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
2166        fs::write(
2167            &manifest_path,
2168            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
2169        )
2170        .expect("tamper manifest");
2171
2172        let err = prune(
2173            &data_dir,
2174            RawMirrorPruneOptions {
2175                older_than_ms: Some(0),
2176                max_size_bytes: None,
2177                keep_tags: Vec::new(),
2178                safety_hold_down_ms: 0,
2179                apply: true,
2180            },
2181        )
2182        .expect_err("hostile inventory should fail closed");
2183
2184        assert!(
2185            err.to_string().contains("unexpected blob path"),
2186            "error should explain the unsafe manifest inventory: {err}"
2187        );
2188        assert!(manifest_path.exists());
2189        assert!(blob_path.exists());
2190        assert!(!root.join("pruned.jsonl").exists());
2191    }
2192
2193    #[test]
2194    fn prune_dry_run_audits_without_removing_manifest_or_blob() {
2195        let temp = tempfile::TempDir::new().expect("tempdir");
2196        let data_dir = temp.path().join("cass-data");
2197        let source_path = temp.path().join("source.jsonl");
2198        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")
2199            .expect("write source");
2200        let captured = capture_source_file(RawMirrorCaptureInput {
2201            data_dir: &data_dir,
2202            provider: "codex",
2203            source_id: "local",
2204            origin_kind: "local",
2205            origin_host: None,
2206            source_path: &source_path,
2207            db_links: &[],
2208        })
2209        .expect("capture source");
2210
2211        let report = prune(
2212            &data_dir,
2213            RawMirrorPruneOptions {
2214                older_than_ms: Some(0),
2215                max_size_bytes: None,
2216                keep_tags: Vec::new(),
2217                safety_hold_down_ms: 0,
2218                apply: false,
2219            },
2220        )
2221        .expect("dry-run prune");
2222
2223        assert!(report.initialized);
2224        assert_eq!(report.mode, "dry-run");
2225        assert_eq!(report.planned_manifest_count, 1);
2226        assert_eq!(report.planned_blob_count, 1);
2227        assert_eq!(report.applied_reclaim_bytes, 0);
2228        let root = data_dir
2229            .join(RAW_MIRROR_ROOT_DIR)
2230            .join(RAW_MIRROR_VERSION_DIR);
2231        assert!(root.join(&captured.manifest_relative_path).exists());
2232        assert!(root.join(&captured.blob_relative_path).exists());
2233        let audit_path = root.join("pruned.jsonl");
2234        let audit = fs::read_to_string(audit_path).expect("read audit");
2235        assert!(audit.contains("\"mode\":\"dry-run\""));
2236        assert!(audit.contains("\"applied\":false"));
2237    }
2238
2239    #[test]
2240    #[cfg(unix)]
2241    fn prune_refuses_symlinked_audit_log_without_writing_target() -> Result<()> {
2242        use std::os::unix::fs::symlink;
2243
2244        let temp = tempfile::TempDir::new()?;
2245        let data_dir = temp.path().join("cass-data");
2246        let source_path = temp.path().join("source.jsonl");
2247        let protected_audit_target = temp.path().join("protected-prune-audit.jsonl");
2248        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")?;
2249        fs::write(&protected_audit_target, b"protected\n")?;
2250
2251        let captured = capture_source_file(RawMirrorCaptureInput {
2252            data_dir: &data_dir,
2253            provider: "codex",
2254            source_id: "local",
2255            origin_kind: "local",
2256            origin_host: None,
2257            source_path: &source_path,
2258            db_links: &[],
2259        })?;
2260        let root = data_dir
2261            .join(RAW_MIRROR_ROOT_DIR)
2262            .join(RAW_MIRROR_VERSION_DIR);
2263        let audit_path = root.join("pruned.jsonl");
2264        symlink(&protected_audit_target, &audit_path)?;
2265
2266        let err = match prune(
2267            &data_dir,
2268            RawMirrorPruneOptions {
2269                older_than_ms: Some(0),
2270                max_size_bytes: None,
2271                keep_tags: Vec::new(),
2272                safety_hold_down_ms: 0,
2273                apply: false,
2274            },
2275        ) {
2276            Ok(_) => anyhow::bail!("symlinked prune audit log was accepted"),
2277            Err(err) => err,
2278        };
2279
2280        if !err.to_string().contains("prune audit through symlink") {
2281            anyhow::bail!("unexpected audit symlink error: {err:#}");
2282        }
2283        if !fs::read(&protected_audit_target)?
2284            .as_slice()
2285            .eq(b"protected\n")
2286        {
2287            anyhow::bail!("protected audit target was modified");
2288        }
2289        if !fs::read_link(&audit_path)?
2290            .as_os_str()
2291            .eq(protected_audit_target.as_os_str())
2292        {
2293            anyhow::bail!("audit path did not remain a symlink to the protected target");
2294        }
2295        if !root.join(&captured.manifest_relative_path).exists() {
2296            anyhow::bail!("failed audit append removed the captured manifest");
2297        }
2298        if !root.join(&captured.blob_relative_path).exists() {
2299            anyhow::bail!("failed audit append removed the captured blob");
2300        }
2301        Ok(())
2302    }
2303
2304    #[test]
2305    fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
2306        let temp = tempfile::TempDir::new().expect("tempdir");
2307        let data_dir = temp.path().join("cass-data");
2308        let source_path = temp.path().join("source.jsonl");
2309        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"apply\"}\n")
2310            .expect("write source");
2311        let captured = capture_source_file(RawMirrorCaptureInput {
2312            data_dir: &data_dir,
2313            provider: "codex",
2314            source_id: "local",
2315            origin_kind: "local",
2316            origin_host: None,
2317            source_path: &source_path,
2318            db_links: &[],
2319        })
2320        .expect("capture source");
2321        let root = data_dir
2322            .join(RAW_MIRROR_ROOT_DIR)
2323            .join(RAW_MIRROR_VERSION_DIR);
2324        let manifest_path = root.join(&captured.manifest_relative_path);
2325        let blob_path = root.join(&captured.blob_relative_path);
2326
2327        let report = prune(
2328            &data_dir,
2329            RawMirrorPruneOptions {
2330                older_than_ms: Some(0),
2331                max_size_bytes: None,
2332                keep_tags: Vec::new(),
2333                safety_hold_down_ms: 0,
2334                apply: true,
2335            },
2336        )
2337        .expect("apply prune");
2338
2339        assert_eq!(report.applied_manifest_count, 1);
2340        assert_eq!(report.applied_blob_count, 1);
2341        assert!(!manifest_path.exists());
2342        assert!(!blob_path.exists());
2343        let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
2344        assert!(audit.contains("\"mode\":\"apply\""));
2345        assert!(audit.contains("\"applied\":true"));
2346    }
2347
2348    #[test]
2349    fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
2350        let temp = tempfile::TempDir::new().expect("tempdir");
2351        let data_dir = temp.path().join("cass-data");
2352        let first_source = temp.path().join("first.jsonl");
2353        let second_source = temp.path().join("second.jsonl");
2354        let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";
2355        fs::write(&first_source, bytes).expect("write first");
2356        fs::write(&second_source, bytes).expect("write second");
2357        let first = capture_source_file(RawMirrorCaptureInput {
2358            data_dir: &data_dir,
2359            provider: "codex",
2360            source_id: "local",
2361            origin_kind: "local",
2362            origin_host: None,
2363            source_path: &first_source,
2364            db_links: &[],
2365        })
2366        .expect("capture first");
2367        let second = capture_source_file(RawMirrorCaptureInput {
2368            data_dir: &data_dir,
2369            provider: "codex",
2370            source_id: "local",
2371            origin_kind: "local",
2372            origin_host: None,
2373            source_path: &second_source,
2374            db_links: &[],
2375        })
2376        .expect("capture second");
2377        let root = data_dir
2378            .join(RAW_MIRROR_ROOT_DIR)
2379            .join(RAW_MIRROR_VERSION_DIR);
2380        let first_manifest_path = root.join(&first.manifest_relative_path);
2381        let second_manifest_path = root.join(&second.manifest_relative_path);
2382        let mut first_manifest =
2383            read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
2384        first_manifest.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
2385        first_manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&first_manifest));
2386        fs::write(
2387            &first_manifest_path,
2388            serde_json::to_vec_pretty(&first_manifest).expect("serialize first manifest"),
2389        )
2390        .expect("rewrite first manifest");
2391
2392        let report = prune(
2393            &data_dir,
2394            RawMirrorPruneOptions {
2395                older_than_ms: Some(86_400_000),
2396                max_size_bytes: None,
2397                keep_tags: Vec::new(),
2398                safety_hold_down_ms: 0,
2399                apply: true,
2400            },
2401        )
2402        .expect("apply one-manifest prune");
2403
2404        assert_eq!(report.applied_manifest_count, 1);
2405        assert_eq!(report.applied_blob_count, 0);
2406        assert!(!first_manifest_path.exists());
2407        assert!(second_manifest_path.exists());
2408        assert!(
2409            root.join(&first.blob_relative_path).exists(),
2410            "shared blob must stay while a retained manifest still references it"
2411        );
2412    }
2413
2414    #[test]
2415    fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
2416        use frankensqlite::compat::ConnectionExt as _;
2417
2418        let temp = tempfile::TempDir::new().expect("tempdir");
2419        let data_dir = temp.path().join("cass-data");
2420        std::fs::create_dir_all(&data_dir).expect("create data dir");
2421        let source_path = temp.path().join("tagged.jsonl");
2422        fs::write(
2423            &source_path,
2424            b"{\"type\":\"message\",\"text\":\"tagged\"}\n",
2425        )
2426        .expect("write source");
2427        let db_link = RawMirrorDbLink {
2428            conversation_id: Some(7),
2429            message_count: Some(1),
2430            source_path: Some(source_path.display().to_string()),
2431            started_at_ms: Some(1_733_000_000_000),
2432        };
2433        let captured = capture_source_file(RawMirrorCaptureInput {
2434            data_dir: &data_dir,
2435            provider: "codex",
2436            source_id: "local",
2437            origin_kind: "local",
2438            origin_host: None,
2439            source_path: &source_path,
2440            db_links: std::slice::from_ref(&db_link),
2441        })
2442        .expect("capture source");
2443        let db_path = data_dir.join("agent_search.db");
2444        let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
2445            .expect("open keep-tag db");
2446        conn.execute_compat(
2447            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
2448            frankensqlite::params![],
2449        )
2450        .expect("create tags");
2451        conn.execute_compat(
2452            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
2453            frankensqlite::params![],
2454        )
2455        .expect("create conversation_tags");
2456        conn.execute_compat(
2457            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
2458            frankensqlite::params![],
2459        )
2460        .expect("insert tag");
2461        conn.execute_compat(
2462            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
2463            frankensqlite::params![],
2464        )
2465        .expect("insert conversation tag");
2466        drop(conn);
2467
2468        let report = prune(
2469            &data_dir,
2470            RawMirrorPruneOptions {
2471                older_than_ms: Some(0),
2472                max_size_bytes: Some(0),
2473                keep_tags: vec!["keep".to_string()],
2474                safety_hold_down_ms: 0,
2475                apply: true,
2476            },
2477        )
2478        .expect("keep-tag prune");
2479
2480        let root = data_dir
2481            .join(RAW_MIRROR_ROOT_DIR)
2482            .join(RAW_MIRROR_VERSION_DIR);
2483        assert_eq!(report.pinned_manifest_count, 1);
2484        assert_eq!(report.pinned_blob_count, 1);
2485        assert_eq!(report.planned_manifest_count, 0);
2486        assert_eq!(report.planned_blob_count, 0);
2487        assert!(root.join(&captured.manifest_relative_path).exists());
2488        assert!(root.join(&captured.blob_relative_path).exists());
2489    }
2490
2491    #[test]
2492    fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
2493        let temp = tempfile::TempDir::new().expect("tempdir");
2494        let data_dir = temp.path().join("cass-data");
2495        let source_path = temp.path().join("recent.jsonl");
2496        fs::write(
2497            &source_path,
2498            b"{\"type\":\"message\",\"text\":\"recent\"}\n",
2499        )
2500        .expect("write source");
2501        let captured = capture_source_file(RawMirrorCaptureInput {
2502            data_dir: &data_dir,
2503            provider: "codex",
2504            source_id: "local",
2505            origin_kind: "local",
2506            origin_host: None,
2507            source_path: &source_path,
2508            db_links: &[],
2509        })
2510        .expect("capture source");
2511
2512        let report = prune(
2513            &data_dir,
2514            RawMirrorPruneOptions {
2515                older_than_ms: None,
2516                max_size_bytes: Some(0),
2517                keep_tags: Vec::new(),
2518                safety_hold_down_ms: 7 * 86_400_000,
2519                apply: true,
2520            },
2521        )
2522        .expect("hold-down prune");
2523
2524        let root = data_dir
2525            .join(RAW_MIRROR_ROOT_DIR)
2526            .join(RAW_MIRROR_VERSION_DIR);
2527        assert_eq!(report.pinned_manifest_count, 1);
2528        assert_eq!(report.pinned_blob_count, 1);
2529        assert_eq!(report.planned_manifest_count, 0);
2530        assert_eq!(report.planned_blob_count, 0);
2531        assert!(root.join(&captured.manifest_relative_path).exists());
2532        assert!(root.join(&captured.blob_relative_path).exists());
2533    }
2534
2535    #[test]
2536    fn capture_source_file_revalidates_cached_blob_contents() {
2537        let temp = tempfile::TempDir::new().expect("tempdir");
2538        let data_dir = temp.path().join("cass-data");
2539        let source_path = temp.path().join("cached-source.jsonl");
2540        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2541        fs::write(&source_path, source_bytes).expect("write source");
2542
2543        let first = capture_source_file(RawMirrorCaptureInput {
2544            data_dir: &data_dir,
2545            provider: "codex",
2546            source_id: "local",
2547            origin_kind: "local",
2548            origin_host: None,
2549            source_path: &source_path,
2550            db_links: &[],
2551        })
2552        .expect("first capture");
2553
2554        let blob_path = data_dir
2555            .join(RAW_MIRROR_ROOT_DIR)
2556            .join(RAW_MIRROR_VERSION_DIR)
2557            .join(&first.blob_relative_path);
2558        fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");
2559
2560        let err = capture_source_file(RawMirrorCaptureInput {
2561            data_dir: &data_dir,
2562            provider: "codex",
2563            source_id: "local",
2564            origin_kind: "local",
2565            origin_host: None,
2566            source_path: &source_path,
2567            db_links: &[],
2568        })
2569        .expect_err("corrupted content-addressed blob must be rejected");
2570        assert!(
2571            err.to_string().contains("existing raw mirror blob"),
2572            "unexpected cached-blob error: {err:#}"
2573        );
2574        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2575    }
2576
2577    #[cfg(unix)]
2578    #[test]
2579    fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
2580        let temp = tempfile::TempDir::new().expect("tempdir");
2581        let data_dir = temp.path().join("cass-data");
2582        let source_path = temp.path().join("same-size-rewrite.jsonl");
2583        let first_bytes = b"same length payload A\n";
2584        let second_bytes = b"same length payload B\n";
2585        fs::write(&source_path, first_bytes).expect("write first source");
2586
2587        let first_modified = fs::metadata(&source_path)
2588            .expect("first metadata")
2589            .modified()
2590            .expect("first modified time");
2591        let first = capture_source_file(RawMirrorCaptureInput {
2592            data_dir: &data_dir,
2593            provider: "codex",
2594            source_id: "local",
2595            origin_kind: "local",
2596            origin_host: None,
2597            source_path: &source_path,
2598            db_links: &[],
2599        })
2600        .expect("first capture");
2601
2602        std::thread::sleep(std::time::Duration::from_millis(5));
2603        fs::write(&source_path, second_bytes).expect("rewrite source");
2604        let source = OpenOptions::new()
2605            .write(true)
2606            .open(&source_path)
2607            .expect("open rewritten source");
2608        source
2609            .set_times(std::fs::FileTimes::new().set_modified(first_modified))
2610            .expect("restore original mtime");
2611
2612        let second = capture_source_file(RawMirrorCaptureInput {
2613            data_dir: &data_dir,
2614            provider: "codex",
2615            source_id: "local",
2616            origin_kind: "local",
2617            origin_host: None,
2618            source_path: &source_path,
2619            db_links: &[],
2620        })
2621        .expect("second capture");
2622
2623        assert_ne!(first.blob_blake3, second.blob_blake3);
2624        assert_eq!(
2625            second.blob_blake3,
2626            blake3::hash(second_bytes).to_hex().to_string()
2627        );
2628        assert_eq!(
2629            fs::read(&source_path).expect("source bytes after rewrite"),
2630            second_bytes
2631        );
2632    }
2633
2634    #[cfg(unix)]
2635    #[test]
2636    fn capture_source_file_rejects_symlinked_existing_blob_path() {
2637        let temp = tempfile::TempDir::new().expect("tempdir");
2638        let data_dir = temp.path().join("cass-data");
2639        let source_path = temp.path().join("cached-source.jsonl");
2640        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2641        fs::write(&source_path, source_bytes).expect("write source");
2642
2643        let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
2644        let blob_relative_path =
2645            raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
2646        let blob_path = data_dir
2647            .join(RAW_MIRROR_ROOT_DIR)
2648            .join(RAW_MIRROR_VERSION_DIR)
2649            .join(&blob_relative_path);
2650        fs::create_dir_all(blob_path.parent().expect("blob parent")).expect("blob parent dir");
2651        let outside = temp.path().join("outside.raw");
2652        fs::write(&outside, source_bytes).expect("outside blob bytes");
2653        std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");
2654
2655        let err = capture_source_file(RawMirrorCaptureInput {
2656            data_dir: &data_dir,
2657            provider: "codex",
2658            source_id: "local",
2659            origin_kind: "local",
2660            origin_host: None,
2661            source_path: &source_path,
2662            db_links: &[],
2663        })
2664        .expect_err("symlinked content-addressed blob path must be rejected");
2665        assert!(
2666            err.to_string().contains("symlink raw mirror blob"),
2667            "unexpected symlink-blob error: {err:#}"
2668        );
2669
2670        let manifest_root = data_dir
2671            .join(RAW_MIRROR_ROOT_DIR)
2672            .join(RAW_MIRROR_VERSION_DIR)
2673            .join("manifests");
2674        assert!(
2675            !manifest_root.exists(),
2676            "failed blob publish must not write a manifest pointing at a symlinked blob"
2677        );
2678        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2679        assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
2680    }
2681
2682    #[cfg(unix)]
2683    #[test]
2684    fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
2685        let temp = tempfile::TempDir::new().expect("tempdir");
2686        let data_dir = temp.path().join("cass-data");
2687        let source_path = temp.path().join("source.jsonl");
2688        let outside_mirror = temp.path().join("outside-mirror");
2689        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";
2690
2691        fs::create_dir_all(&data_dir).expect("data dir");
2692        fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
2693        fs::write(&source_path, source_bytes).expect("write source");
2694        std::os::unix::fs::symlink(&outside_mirror, data_dir.join(RAW_MIRROR_ROOT_DIR))
2695            .expect("symlink raw mirror root");
2696
2697        let err = capture_source_file(RawMirrorCaptureInput {
2698            data_dir: &data_dir,
2699            provider: "codex",
2700            source_id: "local",
2701            origin_kind: "local",
2702            origin_host: None,
2703            source_path: &source_path,
2704            db_links: &[],
2705        })
2706        .expect_err("symlinked raw-mirror root must be rejected");
2707
2708        assert!(
2709            err.to_string().contains("symlink raw mirror dir"),
2710            "unexpected symlink-root error: {err:#}"
2711        );
2712        assert!(
2713            !outside_mirror.join(RAW_MIRROR_VERSION_DIR).exists(),
2714            "raw mirror capture must not create redirected archive state outside data_dir"
2715        );
2716        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2717    }
2718
2719    #[cfg(unix)]
2720    #[test]
2721    fn capture_source_file_rejects_symlinked_blob_directory_component() {
2722        let temp = tempfile::TempDir::new().expect("tempdir");
2723        let data_dir = temp.path().join("cass-data");
2724        let root = data_dir
2725            .join(RAW_MIRROR_ROOT_DIR)
2726            .join(RAW_MIRROR_VERSION_DIR);
2727        let source_path = temp.path().join("source.jsonl");
2728        let outside_blobs = temp.path().join("outside-blobs");
2729        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
2730
2731        fs::create_dir_all(&root).expect("raw mirror root");
2732        fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
2733        fs::write(&source_path, source_bytes).expect("write source");
2734        std::os::unix::fs::symlink(&outside_blobs, root.join("blobs")).expect("symlink blobs dir");
2735
2736        let err = capture_source_file(RawMirrorCaptureInput {
2737            data_dir: &data_dir,
2738            provider: "codex",
2739            source_id: "local",
2740            origin_kind: "local",
2741            origin_host: None,
2742            source_path: &source_path,
2743            db_links: &[],
2744        })
2745        .expect_err("symlinked blob directory must be rejected");
2746
2747        assert!(
2748            err.to_string().contains("symlink raw mirror dir"),
2749            "unexpected symlink-blob-dir error: {err:#}"
2750        );
2751        assert!(
2752            !outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM).exists(),
2753            "raw mirror capture must not create redirected blob state outside data_dir"
2754        );
2755        assert!(
2756            !root.join("manifests").exists(),
2757            "failed blob publish must not write a manifest"
2758        );
2759        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2760    }
2761
2762    #[test]
2763    fn capture_source_file_rejects_non_file_sources() {
2764        let temp = tempfile::TempDir::new().expect("tempdir");
2765        let data_dir = temp.path().join("cass-data");
2766        let source_dir = temp.path().join("source-dir");
2767        fs::create_dir(&source_dir).expect("source dir");
2768
2769        let err = capture_source_file(RawMirrorCaptureInput {
2770            data_dir: &data_dir,
2771            provider: "codex",
2772            source_id: "local",
2773            origin_kind: "local",
2774            origin_host: None,
2775            source_path: &source_dir,
2776            db_links: &[],
2777        })
2778        .expect_err("directory source should be rejected");
2779        assert!(
2780            err.to_string().contains("non-file source"),
2781            "unexpected non-file-source error: {err}"
2782        );
2783        assert!(
2784            !data_dir.join(RAW_MIRROR_ROOT_DIR).exists(),
2785            "rejected non-file sources must not initialize raw mirror storage"
2786        );
2787    }
2788
2789    #[cfg(unix)]
2790    #[test]
2791    fn capture_source_file_rejects_unreadable_sources_without_manifest() {
2792        use std::os::unix::fs::PermissionsExt;
2793
2794        let temp = tempfile::TempDir::new().expect("tempdir");
2795        let data_dir = temp.path().join("cass-data");
2796        let source_path = temp.path().join("unreadable.jsonl");
2797        fs::write(&source_path, b"private session bytes\n").expect("source");
2798        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
2799            .expect("make source unreadable");
2800
2801        let err = capture_source_file(RawMirrorCaptureInput {
2802            data_dir: &data_dir,
2803            provider: "codex",
2804            source_id: "local",
2805            origin_kind: "local",
2806            origin_host: None,
2807            source_path: &source_path,
2808            db_links: &[],
2809        })
2810        .expect_err("unreadable source should be rejected");
2811        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
2812            .expect("restore source perms");
2813        assert!(
2814            err.to_string().contains("open raw mirror source"),
2815            "unexpected unreadable-source error: {err}"
2816        );
2817        assert!(
2818            !data_dir.join("raw-mirror/v1/manifests").exists(),
2819            "failed unreadable-source captures must not publish manifests"
2820        );
2821    }
2822
2823    #[cfg(unix)]
2824    #[test]
2825    fn capture_source_file_rejects_symlink_sources() {
2826        use std::os::unix::fs::symlink;
2827
2828        let temp = tempfile::TempDir::new().expect("tempdir");
2829        let data_dir = temp.path().join("cass-data");
2830        let real_source = temp.path().join("real.jsonl");
2831        let symlink_source = temp.path().join("link.jsonl");
2832        fs::write(&real_source, b"secret session").expect("write source");
2833        symlink(&real_source, &symlink_source).expect("symlink");
2834
2835        let err = capture_source_file(RawMirrorCaptureInput {
2836            data_dir: &data_dir,
2837            provider: "codex",
2838            source_id: "local",
2839            origin_kind: "local",
2840            origin_host: None,
2841            source_path: &symlink_source,
2842            db_links: &[],
2843        })
2844        .expect_err("symlink source should be rejected");
2845        assert!(
2846            err.to_string().contains("symlink source"),
2847            "unexpected error: {err:#}"
2848        );
2849    }
2850}