Skip to main content

coding_agent_search/
raw_mirror.rs

1use anyhow::{Context, Result, anyhow};
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use std::collections::{HashMap, HashSet};
5use std::fs::{self, File, OpenOptions};
6use std::io::{Read, Write};
7use std::path::{Component, Path, PathBuf};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Mutex, OnceLock};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12const RAW_MIRROR_SCHEMA_VERSION: u32 = 1;
13const RAW_MIRROR_ROOT_DIR: &str = "raw-mirror";
14const RAW_MIRROR_VERSION_DIR: &str = "v1";
15const RAW_MIRROR_MANIFEST_KIND: &str = "cass_raw_session_mirror_v1";
16const RAW_MIRROR_HASH_ALGORITHM: &str = "blake3";
17const RAW_MIRROR_BLOB_EXTENSION: &str = "raw";
18
19static TEMP_NONCE: AtomicU64 = AtomicU64::new(0);
20static BLOB_CAPTURE_CACHE: OnceLock<Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>> =
21    OnceLock::new();
22static MANIFEST_UPDATE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
23
24fn raw_mirror_fsync_enabled() -> bool {
25    dotenvy::var("CASS_RAW_MIRROR_FSYNC")
26        .ok()
27        .is_some_and(|value| matches!(value.trim(), "1" | "true" | "TRUE" | "yes" | "YES"))
28}
29
30#[derive(Debug, Clone)]
31pub struct RawMirrorCaptureInput<'a> {
32    pub data_dir: &'a Path,
33    pub provider: &'a str,
34    pub source_id: &'a str,
35    pub origin_kind: &'a str,
36    pub origin_host: Option<&'a str>,
37    pub source_path: &'a Path,
38    pub db_links: &'a [RawMirrorDbLink],
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
42pub struct RawMirrorCaptureRecord {
43    pub manifest_id: String,
44    pub manifest_relative_path: String,
45    pub blob_relative_path: String,
46    pub blob_blake3: String,
47    pub blob_size_bytes: u64,
48    pub captured_at_ms: i64,
49    pub source_mtime_ms: Option<i64>,
50    pub already_present: bool,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
54pub struct RawMirrorDbLink {
55    pub conversation_id: Option<i64>,
56    pub message_count: Option<usize>,
57    pub source_path: Option<String>,
58    pub started_at_ms: Option<i64>,
59}
60
61#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
62pub struct RawMirrorStorageSummary {
63    pub initialized: bool,
64    pub root_path: String,
65    pub total_storage_bytes: u64,
66    pub manifest_count: u64,
67    pub manifest_bytes: u64,
68    pub unique_blob_count: u64,
69    pub total_blob_bytes: u64,
70    pub largest_blob_bytes: u64,
71    pub missing_blob_count: u64,
72    pub invalid_manifest_count: u64,
73    pub oldest_capture_at_ms: Option<i64>,
74    pub newest_capture_at_ms: Option<i64>,
75    pub oldest_source_mtime_ms: Option<i64>,
76    pub newest_source_mtime_ms: Option<i64>,
77}
78
79pub fn storage_summary(data_dir: &Path) -> RawMirrorStorageSummary {
80    let root = raw_mirror_root(data_dir);
81    let mut summary = RawMirrorStorageSummary {
82        root_path: root.display().to_string(),
83        ..RawMirrorStorageSummary::default()
84    };
85    let root_metadata = match fs::symlink_metadata(&root) {
86        Ok(metadata) => metadata,
87        Err(_) => return summary,
88    };
89    summary.initialized = true;
90    if root_metadata.file_type().is_symlink() || !root_metadata.is_dir() {
91        summary.invalid_manifest_count = 1;
92        return summary;
93    }
94
95    summary.total_storage_bytes = raw_mirror_dir_file_bytes(&root);
96
97    let manifests_dir = root.join("manifests");
98    let Ok(manifests_metadata) = fs::symlink_metadata(&manifests_dir) else {
99        return summary;
100    };
101    if manifests_metadata.file_type().is_symlink() || !manifests_metadata.is_dir() {
102        summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
103        return summary;
104    }
105    let entries = match fs::read_dir(&manifests_dir) {
106        Ok(entries) => entries,
107        Err(_) => return summary,
108    };
109    let mut seen_blobs = HashSet::new();
110    for entry in entries {
111        let Ok(entry) = entry else {
112            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
113            continue;
114        };
115        let path = entry.path();
116        let manifest_metadata = match fs::symlink_metadata(&path) {
117            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => metadata,
118            _ => {
119                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
120                continue;
121            }
122        };
123        summary.manifest_bytes = summary
124            .manifest_bytes
125            .saturating_add(manifest_metadata.len());
126        let manifest = match read_raw_mirror_manifest(&path) {
127            Ok(manifest) if manifest.manifest_kind == RAW_MIRROR_MANIFEST_KIND => manifest,
128            _ => {
129                summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
130                continue;
131            }
132        };
133        summary.manifest_count = summary.manifest_count.saturating_add(1);
134        merge_min_max(
135            &mut summary.oldest_capture_at_ms,
136            &mut summary.newest_capture_at_ms,
137            Some(manifest.captured_at_ms),
138        );
139        merge_min_max(
140            &mut summary.oldest_source_mtime_ms,
141            &mut summary.newest_source_mtime_ms,
142            manifest.source_mtime_ms,
143        );
144
145        let Some(blob_relative_path) = raw_mirror_blob_relative_path(&manifest.blob_blake3) else {
146            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
147            continue;
148        };
149        if manifest.blob_relative_path != blob_relative_path {
150            summary.invalid_manifest_count = summary.invalid_manifest_count.saturating_add(1);
151            continue;
152        }
153
154        if !seen_blobs.insert(blob_relative_path.clone()) {
155            continue;
156        }
157        let blob_path = root.join(blob_relative_path);
158        match fs::symlink_metadata(&blob_path) {
159            Ok(metadata) if metadata.is_file() && !metadata.file_type().is_symlink() => {
160                let size = metadata.len();
161                summary.unique_blob_count = summary.unique_blob_count.saturating_add(1);
162                summary.total_blob_bytes = summary.total_blob_bytes.saturating_add(size);
163                summary.largest_blob_bytes = summary.largest_blob_bytes.max(size);
164            }
165            _ => {
166                summary.missing_blob_count = summary.missing_blob_count.saturating_add(1);
167            }
168        }
169    }
170
171    summary
172}
173
174#[derive(Debug, Clone, Default)]
175pub struct RawMirrorPruneOptions {
176    pub older_than_ms: Option<i64>,
177    pub max_size_bytes: Option<u64>,
178    pub keep_tags: Vec<String>,
179    pub safety_hold_down_ms: i64,
180    pub apply: bool,
181}
182
183#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
184pub struct RawMirrorPruneReport {
185    pub initialized: bool,
186    pub root_path: String,
187    pub mode: String,
188    pub manifest_count: u64,
189    pub unique_blob_count: u64,
190    pub current_blob_bytes: u64,
191    pub safety_hold_down_ms: i64,
192    pub keep_tags: Vec<String>,
193    pub pinned_manifest_count: u64,
194    pub pinned_blob_count: u64,
195    pub planned_manifest_count: u64,
196    pub planned_blob_count: u64,
197    pub planned_reclaim_bytes: u64,
198    pub applied_manifest_count: u64,
199    pub applied_blob_count: u64,
200    pub applied_reclaim_bytes: u64,
201    pub audit_log_path: Option<String>,
202    pub entries: Vec<RawMirrorPruneEntry>,
203}
204
205#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
206pub struct RawMirrorPruneEntry {
207    pub kind: String,
208    pub path: String,
209    pub blob_blake3: Option<String>,
210    pub size_bytes: u64,
211    pub reason: String,
212    pub applied: bool,
213}
214
215#[derive(Debug, Clone)]
216struct RawMirrorPruneManifest {
217    manifest_id: String,
218    relative_path: String,
219    size_bytes: u64,
220    blob_blake3: String,
221    blob_relative_path: String,
222    blob_size_bytes: u64,
223    captured_at_ms: i64,
224    provider: String,
225    original_path: String,
226    db_links: Vec<RawMirrorDbLink>,
227}
228
229pub fn prune(data_dir: &Path, options: RawMirrorPruneOptions) -> Result<RawMirrorPruneReport> {
230    let root = raw_mirror_root(data_dir);
231    let mut report = RawMirrorPruneReport {
232        initialized: false,
233        root_path: root.display().to_string(),
234        mode: if options.apply {
235            "apply".to_string()
236        } else {
237            "dry-run".to_string()
238        },
239        manifest_count: 0,
240        unique_blob_count: 0,
241        current_blob_bytes: 0,
242        safety_hold_down_ms: options.safety_hold_down_ms,
243        keep_tags: options.keep_tags.clone(),
244        pinned_manifest_count: 0,
245        pinned_blob_count: 0,
246        planned_manifest_count: 0,
247        planned_blob_count: 0,
248        planned_reclaim_bytes: 0,
249        applied_manifest_count: 0,
250        applied_blob_count: 0,
251        applied_reclaim_bytes: 0,
252        audit_log_path: None,
253        entries: Vec::new(),
254    };
255
256    let metadata = match fs::symlink_metadata(&root) {
257        Ok(metadata) => metadata,
258        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(report),
259        Err(err) => {
260            return Err(err).with_context(|| format!("stat raw mirror root {}", root.display()));
261        }
262    };
263    if metadata.file_type().is_symlink() || !metadata.is_dir() {
264        anyhow::bail!(
265            "refusing to prune invalid raw mirror root {}",
266            root.display()
267        );
268    }
269    report.initialized = true;
270
271    let manifests = collect_prune_manifests(&root)?;
272    report.manifest_count = manifests.len() as u64;
273
274    let mut blob_to_manifests: HashMap<String, Vec<String>> = HashMap::new();
275    let mut manifest_by_id: HashMap<String, &RawMirrorPruneManifest> = HashMap::new();
276    let mut blob_size_by_relative: HashMap<String, u64> = HashMap::new();
277    for manifest in &manifests {
278        manifest_by_id.insert(manifest.manifest_id.clone(), manifest);
279        blob_to_manifests
280            .entry(manifest.blob_relative_path.clone())
281            .or_default()
282            .push(manifest.manifest_id.clone());
283        blob_size_by_relative
284            .entry(manifest.blob_relative_path.clone())
285            .or_insert_with(|| {
286                blob_file_size(&root.join(&manifest.blob_relative_path))
287                    .unwrap_or(manifest.blob_size_bytes)
288            });
289    }
290    report.unique_blob_count = blob_size_by_relative.len() as u64;
291    report.current_blob_bytes = blob_size_by_relative
292        .values()
293        .copied()
294        .fold(0u64, u64::saturating_add);
295
296    let now = now_ms();
297    let pinned_manifests = pinned_prune_manifest_ids(
298        data_dir,
299        &manifests,
300        &options.keep_tags,
301        options.safety_hold_down_ms,
302        now,
303    )?;
304    report.pinned_manifest_count = pinned_manifests.len() as u64;
305    let pinned_blobs: HashSet<String> = blob_to_manifests
306        .iter()
307        .filter(|(_, manifest_ids)| manifest_ids.iter().any(|id| pinned_manifests.contains(id)))
308        .map(|(blob_relative_path, _)| blob_relative_path.clone())
309        .collect();
310    report.pinned_blob_count = pinned_blobs.len() as u64;
311
312    let mut selected_manifests: HashSet<String> = HashSet::new();
313    let mut manifest_reasons: HashMap<String, String> = HashMap::new();
314
315    if let Some(older_than_ms) = options.older_than_ms {
316        let cutoff_ms = now.saturating_sub(older_than_ms.max(0));
317        for manifest in &manifests {
318            if manifest.captured_at_ms <= cutoff_ms
319                && !pinned_manifests.contains(&manifest.manifest_id)
320            {
321                selected_manifests.insert(manifest.manifest_id.clone());
322                manifest_reasons
323                    .entry(manifest.manifest_id.clone())
324                    .or_insert_with(|| format!("captured_at_ms <= {cutoff_ms}"));
325            }
326        }
327    }
328
329    if let Some(max_size_bytes) = options.max_size_bytes
330        && report.current_blob_bytes > max_size_bytes
331    {
332        let mut blob_groups: Vec<_> = blob_to_manifests
333            .iter()
334            .map(|(blob_relative_path, manifest_ids)| {
335                let oldest_capture = manifest_ids
336                    .iter()
337                    .filter_map(|id| manifest_by_id.get(id).map(|m| m.captured_at_ms))
338                    .min()
339                    .unwrap_or(i64::MAX);
340                let size = blob_size_by_relative
341                    .get(blob_relative_path)
342                    .copied()
343                    .unwrap_or(0);
344                (
345                    blob_relative_path.clone(),
346                    manifest_ids.clone(),
347                    oldest_capture,
348                    size,
349                )
350            })
351            .collect::<Vec<_>>();
352        blob_groups.sort_by(|left, right| left.2.cmp(&right.2).then_with(|| left.0.cmp(&right.0)));
353
354        let mut projected_bytes = report.current_blob_bytes;
355        for (blob_relative_path, manifest_ids, _, size) in blob_groups {
356            if projected_bytes <= max_size_bytes {
357                break;
358            }
359            if pinned_blobs.contains(&blob_relative_path) {
360                continue;
361            }
362            for manifest_id in manifest_ids {
363                if !pinned_manifests.contains(&manifest_id) {
364                    selected_manifests.insert(manifest_id.clone());
365                    manifest_reasons.entry(manifest_id).or_insert_with(|| {
366                        format!("max-size over budget; retiring blob {blob_relative_path}")
367                    });
368                }
369            }
370            projected_bytes = projected_bytes.saturating_sub(size);
371        }
372    }
373
374    let selected_blobs: HashSet<String> = blob_to_manifests
375        .iter()
376        .filter(|(_, manifest_ids)| {
377            manifest_ids
378                .iter()
379                .all(|id| selected_manifests.contains(id))
380        })
381        .map(|(blob_relative_path, _)| blob_relative_path.clone())
382        .collect();
383
384    let mut entries = Vec::new();
385    let mut selected_manifest_ids = selected_manifests.into_iter().collect::<Vec<_>>();
386    selected_manifest_ids.sort();
387    for manifest_id in selected_manifest_ids {
388        let Some(manifest) = manifest_by_id.get(&manifest_id) else {
389            continue;
390        };
391        let reason = manifest_reasons
392            .remove(&manifest_id)
393            .unwrap_or_else(|| "selected by retention policy".to_string());
394        entries.push(RawMirrorPruneEntry {
395            kind: "manifest".to_string(),
396            path: manifest.relative_path.clone(),
397            blob_blake3: Some(manifest.blob_blake3.clone()),
398            size_bytes: manifest.size_bytes,
399            reason,
400            applied: false,
401        });
402    }
403
404    let mut selected_blob_paths = selected_blobs.into_iter().collect::<Vec<_>>();
405    selected_blob_paths.sort();
406    for blob_relative_path in selected_blob_paths {
407        let size = blob_size_by_relative
408            .get(&blob_relative_path)
409            .copied()
410            .unwrap_or(0);
411        let blob_blake3 = blob_relative_path
412            .rsplit('/')
413            .next()
414            .and_then(|name| name.strip_suffix(".raw"))
415            .map(ToOwned::to_owned);
416        entries.push(RawMirrorPruneEntry {
417            kind: "blob".to_string(),
418            path: blob_relative_path,
419            blob_blake3,
420            size_bytes: size,
421            reason: "no retained manifest references this blob after prune plan".to_string(),
422            applied: false,
423        });
424    }
425
426    report.planned_manifest_count = entries
427        .iter()
428        .filter(|entry| entry.kind == "manifest")
429        .count() as u64;
430    report.planned_blob_count = entries.iter().filter(|entry| entry.kind == "blob").count() as u64;
431    report.planned_reclaim_bytes = entries
432        .iter()
433        .map(|entry| entry.size_bytes)
434        .fold(0, u64::saturating_add);
435
436    if options.apply {
437        for entry in &mut entries {
438            let path = root.join(&entry.path);
439            let removed = remove_prune_target_file(&path)
440                .with_context(|| format!("applying raw mirror prune for {}", path.display()))?;
441            entry.applied = removed;
442            if removed {
443                if entry.kind == "manifest" {
444                    report.applied_manifest_count = report.applied_manifest_count.saturating_add(1);
445                } else if entry.kind == "blob" {
446                    report.applied_blob_count = report.applied_blob_count.saturating_add(1);
447                }
448                report.applied_reclaim_bytes = report
449                    .applied_reclaim_bytes
450                    .saturating_add(entry.size_bytes);
451            }
452        }
453    }
454
455    report.entries = entries;
456    if !report.entries.is_empty() {
457        let audit_path = append_prune_audit_log(&root, &report)?;
458        report.audit_log_path = Some(audit_path.display().to_string());
459    }
460    Ok(report)
461}
462
463fn collect_prune_manifests(root: &Path) -> Result<Vec<RawMirrorPruneManifest>> {
464    let manifests_dir = root.join("manifests");
465    let metadata = match fs::symlink_metadata(&manifests_dir) {
466        Ok(metadata) => metadata,
467        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
468        Err(err) => return Err(err).with_context(|| format!("stat {}", manifests_dir.display())),
469    };
470    if metadata.file_type().is_symlink() || !metadata.is_dir() {
471        anyhow::bail!(
472            "refusing to prune invalid raw mirror manifests directory {}",
473            manifests_dir.display()
474        );
475    }
476
477    let mut manifests = Vec::new();
478    for entry in
479        fs::read_dir(&manifests_dir).with_context(|| format!("read {}", manifests_dir.display()))?
480    {
481        let entry = entry?;
482        let path = entry.path();
483        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
484            continue;
485        }
486        let manifest_metadata = fs::symlink_metadata(&path)
487            .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
488        if manifest_metadata.file_type().is_symlink() || !manifest_metadata.is_file() {
489            anyhow::bail!(
490                "refusing to prune with non-regular raw mirror manifest {}",
491                path.display()
492            );
493        }
494        let manifest = read_raw_mirror_manifest(&path)?;
495        if manifest.manifest_kind != RAW_MIRROR_MANIFEST_KIND {
496            anyhow::bail!(
497                "refusing to prune with unexpected raw mirror manifest kind `{}` in {}",
498                manifest.manifest_kind,
499                path.display()
500            );
501        }
502        let Some(expected_blob_relative_path) =
503            raw_mirror_blob_relative_path(&manifest.blob_blake3)
504        else {
505            anyhow::bail!(
506                "refusing to prune raw mirror manifest {} with invalid blob hash",
507                path.display()
508            );
509        };
510        if manifest.blob_relative_path != expected_blob_relative_path {
511            anyhow::bail!(
512                "refusing to prune raw mirror manifest {} with unexpected blob path `{}`",
513                path.display(),
514                manifest.blob_relative_path
515            );
516        }
517        let relative_path = path
518            .strip_prefix(root)
519            .unwrap_or(&path)
520            .display()
521            .to_string();
522        manifests.push(RawMirrorPruneManifest {
523            manifest_id: manifest.manifest_id,
524            relative_path,
525            size_bytes: manifest_metadata.len(),
526            blob_blake3: manifest.blob_blake3,
527            blob_relative_path: manifest.blob_relative_path,
528            blob_size_bytes: manifest.blob_size_bytes,
529            captured_at_ms: manifest.captured_at_ms,
530            provider: manifest.provider,
531            original_path: manifest.original_path,
532            db_links: manifest.db_links,
533        });
534    }
535    manifests.sort_by(|left, right| {
536        left.captured_at_ms
537            .cmp(&right.captured_at_ms)
538            .then_with(|| left.provider.cmp(&right.provider))
539            .then_with(|| left.original_path.cmp(&right.original_path))
540            .then_with(|| left.manifest_id.cmp(&right.manifest_id))
541    });
542    Ok(manifests)
543}
544
545fn pinned_prune_manifest_ids(
546    data_dir: &Path,
547    manifests: &[RawMirrorPruneManifest],
548    keep_tags: &[String],
549    safety_hold_down_ms: i64,
550    now_ms: i64,
551) -> Result<HashSet<String>> {
552    let mut pinned = HashSet::new();
553    if safety_hold_down_ms > 0 {
554        let cutoff_ms = now_ms.saturating_sub(safety_hold_down_ms);
555        for manifest in manifests {
556            if manifest.captured_at_ms > cutoff_ms {
557                pinned.insert(manifest.manifest_id.clone());
558            }
559        }
560    }
561
562    let normalized_keep_tags = keep_tags
563        .iter()
564        .map(|tag| tag.trim())
565        .filter(|tag| !tag.is_empty())
566        .map(ToOwned::to_owned)
567        .collect::<Vec<_>>();
568    if normalized_keep_tags.is_empty() {
569        return Ok(pinned);
570    }
571
572    let keep_tag_conversation_ids =
573        load_keep_tag_conversation_ids(data_dir, manifests, &normalized_keep_tags)?;
574    for manifest in manifests {
575        if manifest.db_links.iter().any(|link| {
576            link.conversation_id
577                .is_some_and(|id| keep_tag_conversation_ids.contains(&id))
578        }) {
579            pinned.insert(manifest.manifest_id.clone());
580        }
581    }
582    Ok(pinned)
583}
584
585fn load_keep_tag_conversation_ids(
586    data_dir: &Path,
587    manifests: &[RawMirrorPruneManifest],
588    keep_tags: &[String],
589) -> Result<HashSet<i64>> {
590    use frankensqlite::compat::{ConnectionExt as _, ParamValue, RowExt as _};
591
592    let mut conversation_ids = manifests
593        .iter()
594        .flat_map(|manifest| manifest.db_links.iter())
595        .filter_map(|link| link.conversation_id)
596        .collect::<Vec<_>>();
597    conversation_ids.sort_unstable();
598    conversation_ids.dedup();
599    if conversation_ids.is_empty() {
600        return Ok(HashSet::new());
601    }
602
603    let db_path = data_dir.join("agent_search.db");
604    let conn = crate::storage::sqlite::open_franken_raw_readonly_connection_with_timeout(
605        &db_path,
606        Duration::from_secs(30),
607    )
608    .with_context(|| {
609        format!(
610            "open {} to honor raw-mirror prune --keep-tag",
611            db_path.display()
612        )
613    })?;
614    let _ = conn.execute("PRAGMA query_only = 1;");
615
616    let mut pinned = HashSet::new();
617    for id_chunk in conversation_ids.chunks(400) {
618        let tag_placeholders = (0..keep_tags.len())
619            .map(|idx| format!("?{}", idx + 1))
620            .collect::<Vec<_>>()
621            .join(", ");
622        let id_offset = keep_tags.len();
623        let id_placeholders = (0..id_chunk.len())
624            .map(|idx| format!("?{}", id_offset + idx + 1))
625            .collect::<Vec<_>>()
626            .join(", ");
627        let sql = format!(
628            "SELECT DISTINCT ct.conversation_id \
629             FROM conversation_tags ct \
630             JOIN tags t ON t.id = ct.tag_id \
631             WHERE t.name IN ({tag_placeholders}) \
632               AND ct.conversation_id IN ({id_placeholders})"
633        );
634        let mut params = keep_tags
635            .iter()
636            .map(|tag| ParamValue::from(tag.as_str()))
637            .collect::<Vec<_>>();
638        params.extend(id_chunk.iter().copied().map(ParamValue::from));
639        let rows: Vec<i64> = conn
640            .query_map_collect(&sql, &params, |row: &frankensqlite::Row| row.get_typed(0))
641            .with_context(|| "query raw-mirror prune keep-tag conversation pins")?;
642        pinned.extend(rows);
643    }
644
645    Ok(pinned)
646}
647
648fn blob_file_size(path: &Path) -> Option<u64> {
649    fs::symlink_metadata(path)
650        .ok()
651        .filter(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
652        .map(|metadata| metadata.len())
653}
654
655fn remove_prune_target_file(path: &Path) -> Result<bool> {
656    let metadata = match fs::symlink_metadata(path) {
657        Ok(metadata) => metadata,
658        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
659        Err(err) => return Err(err).with_context(|| format!("stat {}", path.display())),
660    };
661    if metadata.file_type().is_symlink() || !metadata.is_file() {
662        anyhow::bail!(
663            "refusing to prune non-regular raw mirror file {}",
664            path.display()
665        );
666    }
667    fs::remove_file(path).with_context(|| format!("remove raw mirror file {}", path.display()))?;
668    sync_parent(path)?;
669    Ok(true)
670}
671
672fn append_prune_audit_log(root: &Path, report: &RawMirrorPruneReport) -> Result<PathBuf> {
673    ensure_private_dir(root)?;
674    let audit_path = root.join("pruned.jsonl");
675    ensure_prune_audit_log_appendable(&audit_path)?;
676    let mut file = OpenOptions::new()
677        .create(true)
678        .append(true)
679        .open(&audit_path)
680        .with_context(|| format!("open raw mirror prune audit {}", audit_path.display()))?;
681    set_private_file_permissions(&audit_path)?;
682    let now = now_ms();
683    for entry in &report.entries {
684        let record = json!({
685            "schema_version": 1,
686            "recorded_at_ms": now,
687            "mode": report.mode,
688            "kind": entry.kind,
689            "path": entry.path,
690            "blob_blake3": entry.blob_blake3,
691            "size_bytes": entry.size_bytes,
692            "reason": entry.reason,
693            "applied": entry.applied,
694        });
695        writeln!(file, "{record}")
696            .with_context(|| format!("write raw mirror prune audit {}", audit_path.display()))?;
697    }
698    sync_open_file_if_required(&file, || {
699        format!("sync raw mirror prune audit {}", audit_path.display())
700    })?;
701    sync_parent(&audit_path)?;
702    Ok(audit_path)
703}
704
705fn ensure_prune_audit_log_appendable(path: &Path) -> Result<()> {
706    match fs::symlink_metadata(path) {
707        Ok(metadata) if metadata.file_type().is_symlink() => {
708            anyhow::bail!(
709                "refusing to append raw mirror prune audit through symlink {}",
710                path.display()
711            );
712        }
713        Ok(metadata) if !metadata.is_file() => {
714            anyhow::bail!(
715                "refusing to append raw mirror prune audit to non-file {}",
716                path.display()
717            );
718        }
719        Ok(_) => Ok(()),
720        Err(err) if matches!(err.kind(), std::io::ErrorKind::NotFound) => Ok(()),
721        Err(err) => Err(err).with_context(|| {
722            format!(
723                "inspect raw mirror prune audit before append {}",
724                path.display()
725            )
726        }),
727    }
728}
729
730fn merge_min_max(min: &mut Option<i64>, max: &mut Option<i64>, value: Option<i64>) {
731    let Some(value) = value else {
732        return;
733    };
734    *min = Some(min.map_or(value, |current| current.min(value)));
735    *max = Some(max.map_or(value, |current| current.max(value)));
736}
737
738fn raw_mirror_dir_file_bytes(root: &Path) -> u64 {
739    let mut total = 0u64;
740    let mut stack = vec![root.to_path_buf()];
741    while let Some(path) = stack.pop() {
742        let Ok(metadata) = fs::symlink_metadata(&path) else {
743            continue;
744        };
745        if metadata.file_type().is_symlink() {
746            continue;
747        }
748        if metadata.is_file() {
749            total = total.saturating_add(metadata.len());
750        } else if metadata.is_dir() {
751            let Ok(entries) = fs::read_dir(&path) else {
752                continue;
753            };
754            for entry in entries.flatten() {
755                stack.push(entry.path());
756            }
757        }
758    }
759    total
760}
761
762#[derive(Debug, Clone, PartialEq, Eq, Hash)]
763struct RawMirrorBlobCacheKey {
764    data_dir: PathBuf,
765    source_path: PathBuf,
766    source_identity: Option<String>,
767    source_size_bytes: u64,
768    source_mtime_ns: Option<u128>,
769    source_change_time_ns: Option<u128>,
770}
771
772#[derive(Debug, Clone, PartialEq, Eq)]
773struct RawMirrorBlobRecord {
774    blob_blake3: String,
775    bytes_copied: u64,
776}
777
778#[derive(Debug, Clone, Serialize, Deserialize)]
779struct RawMirrorCompressionEnvelope {
780    state: String,
781    algorithm: Option<String>,
782    uncompressed_size_bytes: Option<u64>,
783}
784
785#[derive(Debug, Clone, Serialize, Deserialize)]
786struct RawMirrorEncryptionEnvelope {
787    state: String,
788    algorithm: Option<String>,
789    key_id: Option<String>,
790    envelope_version: Option<u32>,
791}
792
793#[derive(Debug, Clone, Serialize, Deserialize)]
794struct RawMirrorVerificationRecord {
795    status: String,
796    verifier: String,
797    content_blake3: Option<String>,
798    verified_at_ms: Option<i64>,
799}
800
801#[derive(Debug, Clone, Serialize, Deserialize)]
802struct RawMirrorManifestFile {
803    schema_version: u32,
804    manifest_kind: String,
805    manifest_id: String,
806    blob_hash_algorithm: String,
807    blob_relative_path: String,
808    blob_blake3: String,
809    blob_size_bytes: u64,
810    provider: String,
811    source_id: String,
812    origin_kind: String,
813    origin_host: Option<String>,
814    original_path: String,
815    redacted_original_path: String,
816    original_path_blake3: String,
817    captured_at_ms: i64,
818    source_mtime_ms: Option<i64>,
819    source_size_bytes: u64,
820    compression: RawMirrorCompressionEnvelope,
821    encryption: RawMirrorEncryptionEnvelope,
822    db_links: Vec<RawMirrorDbLink>,
823    verification: RawMirrorVerificationRecord,
824    manifest_blake3: Option<String>,
825}
826
827pub fn capture_source_file(input: RawMirrorCaptureInput<'_>) -> Result<RawMirrorCaptureRecord> {
828    let source_metadata = fs::symlink_metadata(input.source_path)
829        .with_context(|| format!("stat raw mirror source {}", input.source_path.display()))?;
830    if source_metadata.file_type().is_symlink() {
831        return Err(anyhow!(
832            "refusing to raw-mirror symlink source {}",
833            input.source_path.display()
834        ));
835    }
836    if !source_metadata.is_file() {
837        return Err(anyhow!(
838            "refusing to raw-mirror non-file source {}",
839            input.source_path.display()
840        ));
841    }
842
843    let root = ensure_raw_mirror_root(input.data_dir)?;
844    ensure_private_dir_descendant(&root, &root.join("tmp"))?;
845
846    let cache_key = raw_mirror_blob_cache_key(&input, &source_metadata);
847    let (blob_blake3, bytes_copied, blob_already_present) =
848        match cached_raw_mirror_blob_record(&cache_key, &root) {
849            Some(record) => (record.blob_blake3, record.bytes_copied, true),
850            None => {
851                let temp_dir = unique_capture_temp_dir(&root);
852                ensure_private_dir_descendant(&root, &temp_dir)?;
853                let CopyToTempResult {
854                    temp_path,
855                    blob_blake3,
856                    bytes_copied,
857                } = copy_source_to_private_temp(input.source_path, &temp_dir, &source_metadata)?;
858                let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
859                    .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
860                let blob_path = root.join(&blob_relative_path);
861                let already_present =
862                    publish_content_addressed_temp(&root, &temp_path, &blob_path, &blob_blake3)?;
863                remove_empty_temp_dir_best_effort(&temp_dir);
864                cache_raw_mirror_blob_record(
865                    cache_key.clone(),
866                    RawMirrorBlobRecord {
867                        blob_blake3: blob_blake3.clone(),
868                        bytes_copied,
869                    },
870                );
871                (blob_blake3, bytes_copied, already_present)
872            }
873        };
874    let blob_relative_path = raw_mirror_blob_relative_path(&blob_blake3)
875        .ok_or_else(|| anyhow!("computed invalid raw mirror blake3 digest"))?;
876
877    let original_path = input.source_path.display().to_string();
878    let original_path_blake3 = raw_mirror_original_path_blake3(&original_path);
879    let manifest_id = raw_mirror_manifest_id(
880        input.provider,
881        input.source_id,
882        input.origin_kind,
883        input.origin_host,
884        &original_path_blake3,
885        &blob_blake3,
886    );
887    let manifest_relative_path = raw_mirror_manifest_relative_path(&manifest_id);
888    let manifest_path = root.join(&manifest_relative_path);
889    let captured_at_ms = now_ms();
890    let source_mtime_ms = source_metadata.modified().ok().and_then(system_time_to_ms);
891    let mut manifest = RawMirrorManifestFile {
892        schema_version: RAW_MIRROR_SCHEMA_VERSION,
893        manifest_kind: RAW_MIRROR_MANIFEST_KIND.to_string(),
894        manifest_id: manifest_id.clone(),
895        blob_hash_algorithm: RAW_MIRROR_HASH_ALGORITHM.to_string(),
896        blob_relative_path: blob_relative_path.clone(),
897        blob_blake3: blob_blake3.clone(),
898        blob_size_bytes: bytes_copied,
899        provider: input.provider.to_string(),
900        source_id: input.source_id.to_string(),
901        origin_kind: input.origin_kind.to_string(),
902        origin_host: input.origin_host.map(ToOwned::to_owned),
903        original_path,
904        redacted_original_path: redacted_original_path(input.provider, input.source_path),
905        original_path_blake3,
906        captured_at_ms,
907        source_mtime_ms,
908        source_size_bytes: source_metadata.len(),
909        compression: RawMirrorCompressionEnvelope {
910            state: "none".to_string(),
911            algorithm: None,
912            uncompressed_size_bytes: Some(bytes_copied),
913        },
914        encryption: RawMirrorEncryptionEnvelope {
915            state: "none".to_string(),
916            algorithm: None,
917            key_id: None,
918            envelope_version: None,
919        },
920        db_links: unique_db_links(input.db_links),
921        verification: RawMirrorVerificationRecord {
922            status: "captured".to_string(),
923            verifier: "cass_indexer".to_string(),
924            content_blake3: Some(blob_blake3.clone()),
925            verified_at_ms: Some(captured_at_ms),
926        },
927        manifest_blake3: None,
928    };
929    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
930    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
931    let manifest_already_present =
932        publish_manifest_bytes_create_new(&root, &manifest_path, &manifest_bytes, &blob_blake3)?;
933    let (record_blob_size_bytes, record_captured_at_ms, record_source_mtime_ms) =
934        if manifest_already_present {
935            merge_raw_mirror_manifest_db_links(
936                &root,
937                &manifest_path,
938                input.db_links,
939                Some(&blob_blake3),
940            )?;
941            let published = read_raw_mirror_manifest(&manifest_path)?;
942            (
943                published.blob_size_bytes,
944                published.captured_at_ms,
945                published.source_mtime_ms,
946            )
947        } else {
948            (bytes_copied, captured_at_ms, source_mtime_ms)
949        };
950
951    Ok(RawMirrorCaptureRecord {
952        manifest_id,
953        manifest_relative_path,
954        blob_relative_path,
955        blob_blake3,
956        blob_size_bytes: record_blob_size_bytes,
957        captured_at_ms: record_captured_at_ms,
958        source_mtime_ms: record_source_mtime_ms,
959        already_present: blob_already_present && manifest_already_present,
960    })
961}
962
963pub fn merge_manifest_db_links(
964    data_dir: &Path,
965    manifest_relative_path: &str,
966    links: &[RawMirrorDbLink],
967) -> Result<()> {
968    if links.is_empty() {
969        return Ok(());
970    }
971    let root = raw_mirror_root(data_dir);
972    let manifest_path = raw_mirror_manifest_path_from_relative(&root, manifest_relative_path)?;
973    merge_raw_mirror_manifest_db_links(&root, &manifest_path, links, None)
974}
975
/// Outcome of streaming a source file into a private temp file.
struct CopyToTempResult {
    /// Path of the temp file holding the copied bytes; the caller publishes or removes it.
    temp_path: PathBuf,
    /// Hex BLAKE3 digest of the bytes written to `temp_path`.
    blob_blake3: String,
    /// Number of bytes copied into `temp_path`.
    bytes_copied: u64,
}
981
982fn copy_source_to_private_temp(
983    source_path: &Path,
984    temp_dir: &Path,
985    source_metadata: &fs::Metadata,
986) -> Result<CopyToTempResult> {
987    let temp_path = unique_temp_path(temp_dir, "blob");
988    let mut source = open_stable_source_file(source_path, source_metadata)?;
989    let mut temp = private_create_new_file(&temp_path)?;
990    let mut hasher = blake3::Hasher::new();
991    let mut buffer = [0u8; 64 * 1024];
992    let mut bytes_copied = 0u64;
993    loop {
994        let read = source
995            .read(&mut buffer)
996            .with_context(|| format!("read raw mirror source {}", source_path.display()))?;
997        if read == 0 {
998            break;
999        }
1000        temp.write_all(&buffer[..read])
1001            .with_context(|| format!("write raw mirror temp {}", temp_path.display()))?;
1002        hasher.update(&buffer[..read]);
1003        bytes_copied = bytes_copied.saturating_add(read as u64);
1004    }
1005    sync_open_file_if_required(&temp, || {
1006        format!("sync raw mirror temp {}", temp_path.display())
1007    })?;
1008
1009    let final_source_metadata = source
1010        .metadata()
1011        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1012    if source_file_changed_during_capture(source_metadata, &final_source_metadata) {
1013        remove_temp_best_effort(&temp_path);
1014        return Err(anyhow!(
1015            "raw mirror source {} changed while it was being captured; retry indexing to capture a stable copy",
1016            source_path.display()
1017        ));
1018    }
1019
1020    Ok(CopyToTempResult {
1021        temp_path,
1022        blob_blake3: hasher.finalize().to_hex().to_string(),
1023        bytes_copied,
1024    })
1025}
1026
1027fn open_stable_source_file(source_path: &Path, expected_metadata: &fs::Metadata) -> Result<File> {
1028    let source = File::open(source_path)
1029        .with_context(|| format!("open raw mirror source {}", source_path.display()))?;
1030    let opened_metadata = source
1031        .metadata()
1032        .with_context(|| format!("stat opened raw mirror source {}", source_path.display()))?;
1033    if !same_source_identity(expected_metadata, &opened_metadata) {
1034        return Err(anyhow!(
1035            "raw mirror source {} changed identity before capture",
1036            source_path.display()
1037        ));
1038    }
1039    let current_path_metadata = fs::symlink_metadata(source_path)
1040        .with_context(|| format!("restat raw mirror source {}", source_path.display()))?;
1041    if current_path_metadata.file_type().is_symlink() {
1042        return Err(anyhow!(
1043            "refusing to raw-mirror symlink source {}",
1044            source_path.display()
1045        ));
1046    }
1047    if !same_source_identity(expected_metadata, &current_path_metadata) {
1048        return Err(anyhow!(
1049            "raw mirror source {} changed identity before capture",
1050            source_path.display()
1051        ));
1052    }
1053    Ok(source)
1054}
1055
/// Unix: `actual` is the same source as `expected` when it is a regular file
/// with the same `(device, inode)` pair.
#[cfg(unix)]
fn same_source_identity(expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    use std::os::unix::fs::MetadataExt;

    if !actual.is_file() {
        return false;
    }
    (expected.dev(), expected.ino()) == (actual.dev(), actual.ino())
}

/// Non-Unix fallback: no portable file identity is available, so only the
/// regular-file requirement on `actual` is enforced.
#[cfg(not(unix))]
fn same_source_identity(_expected: &fs::Metadata, actual: &fs::Metadata) -> bool {
    actual.file_type().is_file()
}
1066
/// Unix: renders the file's identity as `"<device>:<inode>"` for use in cache keys.
#[cfg(unix)]
fn source_identity_token(metadata: &fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;

    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}

/// Non-Unix fallback: no portable identity token exists.
#[cfg(not(unix))]
fn source_identity_token(_metadata: &fs::Metadata) -> Option<String> {
    None
}
1077
/// Unix: the inode change time (`ctime`) in nanoseconds, or `None` if either
/// component is negative.
#[cfg(unix)]
fn source_change_time_ns(metadata: &fs::Metadata) -> Option<u128> {
    use std::os::unix::fs::MetadataExt;

    const NANOS_PER_SECOND: u128 = 1_000_000_000;
    match (
        u128::try_from(metadata.ctime()),
        u128::try_from(metadata.ctime_nsec()),
    ) {
        (Ok(seconds), Ok(nanos)) => Some(
            seconds
                .saturating_mul(NANOS_PER_SECOND)
                .saturating_add(nanos),
        ),
        _ => None,
    }
}

/// Non-Unix fallback: change time is not exposed portably.
#[cfg(not(unix))]
fn source_change_time_ns(_metadata: &fs::Metadata) -> Option<u128> {
    None
}
1095
/// Reports whether the source appears to have been modified between the
/// initial stat and the end of the copy.
///
/// A size difference always counts as a change. Modification times are only
/// compared when both are available; if either is missing, only size decides.
fn source_file_changed_during_capture(
    initial: &fs::Metadata,
    final_metadata: &fs::Metadata,
) -> bool {
    let size_differs = initial.len() != final_metadata.len();
    let mtime_differs = initial
        .modified()
        .ok()
        .zip(final_metadata.modified().ok())
        .is_some_and(|(before, after)| before != after);
    size_differs || mtime_differs
}
1108
1109fn publish_content_addressed_temp(
1110    root: &Path,
1111    temp_path: &Path,
1112    final_path: &Path,
1113    expected_blake3: &str,
1114) -> Result<bool> {
1115    ensure_private_dir_descendant(
1116        root,
1117        final_path
1118            .parent()
1119            .ok_or_else(|| anyhow!("raw mirror blob path has no parent"))?,
1120    )?;
1121    if final_path.exists() {
1122        verify_existing_file(final_path, expected_blake3)?;
1123        remove_temp_best_effort(temp_path);
1124        return Ok(true);
1125    }
1126
1127    match fs::hard_link(temp_path, final_path) {
1128        Ok(()) => {
1129            sync_file(final_path)?;
1130            sync_parent(final_path)?;
1131            remove_temp_best_effort(temp_path);
1132            Ok(false)
1133        }
1134        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1135            verify_existing_file(final_path, expected_blake3)?;
1136            remove_temp_best_effort(temp_path);
1137            Ok(true)
1138        }
1139        Err(err) => Err(anyhow!(
1140            "publish raw mirror blob {} from {}: {err}",
1141            final_path.display(),
1142            temp_path.display()
1143        )),
1144    }
1145}
1146
1147fn publish_manifest_bytes_create_new(
1148    root: &Path,
1149    manifest_path: &Path,
1150    manifest_bytes: &[u8],
1151    blob_blake3: &str,
1152) -> Result<bool> {
1153    ensure_private_dir_descendant(
1154        root,
1155        manifest_path
1156            .parent()
1157            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1158    )?;
1159    if manifest_path.exists() {
1160        verify_existing_manifest(manifest_path, blob_blake3)?;
1161        return Ok(true);
1162    }
1163
1164    let temp_dir = unique_capture_temp_dir(root);
1165    ensure_private_dir_descendant(root, &temp_dir)?;
1166    let temp_path = unique_temp_path(&temp_dir, "manifest");
1167    let mut temp = private_create_new_file(&temp_path)?;
1168    temp.write_all(manifest_bytes)
1169        .with_context(|| format!("write raw mirror manifest temp {}", temp_path.display()))?;
1170    sync_open_file_if_required(&temp, || {
1171        format!("sync raw mirror manifest temp {}", temp_path.display())
1172    })?;
1173
1174    match fs::hard_link(&temp_path, manifest_path) {
1175        Ok(()) => {
1176            sync_file(manifest_path)?;
1177            sync_parent(manifest_path)?;
1178            remove_temp_best_effort(&temp_path);
1179            remove_empty_temp_dir_best_effort(&temp_dir);
1180            Ok(false)
1181        }
1182        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
1183            verify_existing_manifest(manifest_path, blob_blake3)?;
1184            remove_temp_best_effort(&temp_path);
1185            remove_empty_temp_dir_best_effort(&temp_dir);
1186            Ok(true)
1187        }
1188        Err(err) => Err(anyhow!(
1189            "publish raw mirror manifest {} from {}: {err}",
1190            manifest_path.display(),
1191            temp_path.display()
1192        )),
1193    }
1194}
1195
1196fn merge_raw_mirror_manifest_db_links(
1197    root: &Path,
1198    manifest_path: &Path,
1199    links: &[RawMirrorDbLink],
1200    expected_blob_blake3: Option<&str>,
1201) -> Result<()> {
1202    if links.is_empty() {
1203        return Ok(());
1204    }
1205
1206    let lock = MANIFEST_UPDATE_LOCK.get_or_init(|| Mutex::new(()));
1207    let _guard = lock
1208        .lock()
1209        .map_err(|_| anyhow!("raw mirror manifest update lock poisoned"))?;
1210
1211    let mut manifest = read_raw_mirror_manifest(manifest_path)?;
1212    if let Some(expected_blob_blake3) = expected_blob_blake3
1213        && manifest.blob_blake3 != expected_blob_blake3
1214    {
1215        return Err(anyhow!(
1216            "existing raw mirror manifest {} points at blob {}, expected {}",
1217            manifest_path.display(),
1218            manifest.blob_blake3,
1219            expected_blob_blake3
1220        ));
1221    }
1222
1223    let mut merged_links = manifest.db_links.clone();
1224    merged_links.extend_from_slice(links);
1225    let merged_links = unique_db_links(&merged_links);
1226    if merged_links == manifest.db_links {
1227        return Ok(());
1228    }
1229
1230    manifest.db_links = merged_links;
1231    manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
1232    let manifest_bytes = serde_json::to_vec_pretty(&manifest)?;
1233    replace_manifest_bytes(root, manifest_path, &manifest_bytes)
1234}
1235
1236fn replace_manifest_bytes(root: &Path, manifest_path: &Path, manifest_bytes: &[u8]) -> Result<()> {
1237    ensure_private_dir_descendant(
1238        root,
1239        manifest_path
1240            .parent()
1241            .ok_or_else(|| anyhow!("raw mirror manifest path has no parent"))?,
1242    )?;
1243    let temp_dir = unique_capture_temp_dir(root);
1244    ensure_private_dir_descendant(root, &temp_dir)?;
1245    let temp_path = unique_temp_path(&temp_dir, "manifest-update");
1246    let mut temp = private_create_new_file(&temp_path)?;
1247    temp.write_all(manifest_bytes).with_context(|| {
1248        format!(
1249            "write raw mirror manifest update temp {}",
1250            temp_path.display()
1251        )
1252    })?;
1253    sync_open_file_if_required(&temp, || {
1254        format!(
1255            "sync raw mirror manifest update temp {}",
1256            temp_path.display()
1257        )
1258    })?;
1259    drop(temp);
1260
1261    fs::rename(&temp_path, manifest_path).with_context(|| {
1262        format!(
1263            "replace raw mirror manifest {} from {}",
1264            manifest_path.display(),
1265            temp_path.display()
1266        )
1267    })?;
1268    set_private_file_permissions(manifest_path)?;
1269    sync_file(manifest_path)?;
1270    sync_parent(manifest_path)?;
1271    remove_empty_temp_dir_best_effort(&temp_dir);
1272    Ok(())
1273}
1274
1275fn raw_mirror_manifest_path_from_relative(root: &Path, relative_path: &str) -> Result<PathBuf> {
1276    let relative = Path::new(relative_path);
1277    if relative.is_absolute() {
1278        return Err(anyhow!(
1279            "raw mirror manifest path must be relative: {relative_path}"
1280        ));
1281    }
1282
1283    let mut normal_components = Vec::new();
1284    for component in relative.components() {
1285        match component {
1286            std::path::Component::Normal(part) => normal_components.push(part),
1287            _ => {
1288                return Err(anyhow!(
1289                    "raw mirror manifest path must use only normal relative components: {relative_path}"
1290                ));
1291            }
1292        }
1293    }
1294
1295    if normal_components.len() != 2
1296        || normal_components[0] != std::ffi::OsStr::new("manifests")
1297        || Path::new(normal_components[1])
1298            .extension()
1299            .and_then(|ext| ext.to_str())
1300            != Some("json")
1301    {
1302        return Err(anyhow!(
1303            "raw mirror manifest path must match manifests/<id>.json: {relative_path}"
1304        ));
1305    }
1306
1307    Ok(root.join(relative))
1308}
1309
1310fn verify_existing_file(path: &Path, expected_blake3: &str) -> Result<()> {
1311    let metadata = fs::symlink_metadata(path)
1312        .with_context(|| format!("stat raw mirror blob {}", path.display()))?;
1313    if metadata.file_type().is_symlink() {
1314        return Err(anyhow!(
1315            "refusing to read symlink raw mirror blob {}",
1316            path.display()
1317        ));
1318    }
1319    if !metadata.is_file() {
1320        return Err(anyhow!(
1321            "refusing to read non-file raw mirror blob {}",
1322            path.display()
1323        ));
1324    }
1325    let actual = file_blake3(path)?;
1326    if actual == expected_blake3 {
1327        Ok(())
1328    } else {
1329        Err(anyhow!(
1330            "existing raw mirror blob {} has blake3 {}, expected {}",
1331            path.display(),
1332            actual,
1333            expected_blake3
1334        ))
1335    }
1336}
1337
1338fn verify_existing_manifest(path: &Path, expected_blob_blake3: &str) -> Result<()> {
1339    let manifest = read_raw_mirror_manifest(path)?;
1340    if manifest.blob_blake3 == expected_blob_blake3 {
1341        Ok(())
1342    } else {
1343        Err(anyhow!(
1344            "existing raw mirror manifest {} points at blob {}, expected {}",
1345            path.display(),
1346            manifest.blob_blake3,
1347            expected_blob_blake3
1348        ))
1349    }
1350}
1351
1352fn read_raw_mirror_manifest(path: &Path) -> Result<RawMirrorManifestFile> {
1353    let metadata = fs::symlink_metadata(path)
1354        .with_context(|| format!("stat raw mirror manifest {}", path.display()))?;
1355    if metadata.file_type().is_symlink() {
1356        return Err(anyhow!(
1357            "refusing to read symlink raw mirror manifest {}",
1358            path.display()
1359        ));
1360    }
1361    if !metadata.is_file() {
1362        return Err(anyhow!(
1363            "refusing to read non-file raw mirror manifest {}",
1364            path.display()
1365        ));
1366    }
1367    serde_json::from_slice(
1368        &fs::read(path).with_context(|| format!("read raw mirror manifest {}", path.display()))?,
1369    )
1370    .with_context(|| format!("parse raw mirror manifest {}", path.display()))
1371}
1372
1373fn raw_mirror_root(data_dir: &Path) -> PathBuf {
1374    data_dir
1375        .join(RAW_MIRROR_ROOT_DIR)
1376        .join(RAW_MIRROR_VERSION_DIR)
1377}
1378
1379fn ensure_raw_mirror_root(data_dir: &Path) -> Result<PathBuf> {
1380    let root_parent = data_dir.join(RAW_MIRROR_ROOT_DIR);
1381    ensure_private_dir(&root_parent)?;
1382    let root = root_parent.join(RAW_MIRROR_VERSION_DIR);
1383    ensure_private_dir(&root)?;
1384    Ok(root)
1385}
1386
1387fn raw_mirror_blob_cache_key(
1388    input: &RawMirrorCaptureInput<'_>,
1389    source_metadata: &fs::Metadata,
1390) -> RawMirrorBlobCacheKey {
1391    RawMirrorBlobCacheKey {
1392        data_dir: input.data_dir.to_path_buf(),
1393        source_path: input.source_path.to_path_buf(),
1394        source_identity: source_identity_token(source_metadata),
1395        source_size_bytes: source_metadata.len(),
1396        source_mtime_ns: source_metadata.modified().ok().and_then(system_time_to_ns),
1397        source_change_time_ns: source_change_time_ns(source_metadata),
1398    }
1399}
1400
1401fn cached_raw_mirror_blob_record(
1402    key: &RawMirrorBlobCacheKey,
1403    root: &Path,
1404) -> Option<RawMirrorBlobRecord> {
1405    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1406    let record = {
1407        let mut guard = cache.lock().ok()?;
1408        let record = guard.get(key).cloned()?;
1409        if raw_mirror_blob_relative_path(&record.blob_blake3).is_none() {
1410            guard.remove(key);
1411            return None;
1412        }
1413        record
1414    };
1415
1416    let blob_relative_path = raw_mirror_blob_relative_path(&record.blob_blake3)?;
1417    let blob_path = root.join(blob_relative_path);
1418    let metadata_valid = fs::symlink_metadata(&blob_path)
1419        .map(|metadata| metadata.is_file() && !metadata.file_type().is_symlink())
1420        .unwrap_or(false);
1421    if !metadata_valid {
1422        remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1423        return None;
1424    }
1425
1426    match file_blake3(&blob_path) {
1427        Ok(actual) if actual == record.blob_blake3 => Some(record),
1428        Ok(actual) => {
1429            tracing::warn!(
1430                path = %blob_path.display(),
1431                expected_blake3 = %record.blob_blake3,
1432                actual_blake3 = %actual,
1433                "discarding raw mirror blob cache entry with mismatched content"
1434            );
1435            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1436            None
1437        }
1438        Err(err) => {
1439            tracing::debug!(
1440                path = %blob_path.display(),
1441                error = %err,
1442                "discarding unreadable raw mirror blob cache entry"
1443            );
1444            remove_cached_raw_mirror_blob_record_if_unchanged(cache, key, &record);
1445            None
1446        }
1447    }
1448}
1449
1450fn remove_cached_raw_mirror_blob_record_if_unchanged(
1451    cache: &Mutex<HashMap<RawMirrorBlobCacheKey, RawMirrorBlobRecord>>,
1452    key: &RawMirrorBlobCacheKey,
1453    stale_record: &RawMirrorBlobRecord,
1454) {
1455    if let Ok(mut guard) = cache.lock()
1456        && guard
1457            .get(key)
1458            .is_some_and(|current| current == stale_record)
1459    {
1460        guard.remove(key);
1461    }
1462}
1463
1464fn cache_raw_mirror_blob_record(key: RawMirrorBlobCacheKey, record: RawMirrorBlobRecord) {
1465    let cache = BLOB_CAPTURE_CACHE.get_or_init(|| Mutex::new(HashMap::new()));
1466    if let Ok(mut guard) = cache.lock() {
1467        guard.insert(key, record);
1468    }
1469}
1470
1471fn raw_mirror_blob_relative_path(blob_blake3: &str) -> Option<String> {
1472    if blob_blake3.len() != 64 || !blob_blake3.chars().all(|c| c.is_ascii_hexdigit()) {
1473        return None;
1474    }
1475    let lower = blob_blake3.to_ascii_lowercase();
1476    Some(format!(
1477        "blobs/{}/{}/{}.{}",
1478        RAW_MIRROR_HASH_ALGORITHM,
1479        &lower[..2],
1480        lower,
1481        RAW_MIRROR_BLOB_EXTENSION
1482    ))
1483}
1484
/// Returns the mirror-relative manifest location for `manifest_id`: `manifests/<id>.json`.
fn raw_mirror_manifest_relative_path(manifest_id: &str) -> String {
    let mut relative = String::from("manifests/");
    relative.push_str(manifest_id);
    relative.push_str(".json");
    relative
}
1488
1489fn raw_mirror_original_path_blake3(original_path: &str) -> String {
1490    let mut hasher = blake3::Hasher::new();
1491    hasher.update(b"doctor-raw-mirror-original-path-v1");
1492    hasher.update(&[0]);
1493    hasher.update(original_path.as_bytes());
1494    hasher.finalize().to_hex().to_string()
1495}
1496
1497fn raw_mirror_manifest_id(
1498    provider: &str,
1499    source_id: &str,
1500    origin_kind: &str,
1501    origin_host: Option<&str>,
1502    original_path_blake3: &str,
1503    blob_blake3: &str,
1504) -> String {
1505    canonical_blake3(
1506        "doctor-raw-mirror-manifest-id-v1",
1507        json!({
1508            "provider": provider,
1509            "source_id": source_id,
1510            "origin_kind": origin_kind,
1511            "origin_host": origin_host,
1512            "original_path_blake3": original_path_blake3,
1513            "blob_blake3": blob_blake3,
1514        }),
1515    )
1516}
1517
1518fn raw_mirror_manifest_blake3(manifest: &RawMirrorManifestFile) -> String {
1519    let mut value = serde_json::to_value(manifest).unwrap_or_default();
1520    if let Value::Object(map) = &mut value {
1521        map.remove("manifest_blake3");
1522    }
1523    canonical_blake3("doctor-raw-mirror-manifest-v1", value)
1524}
1525
1526fn canonical_blake3(prefix: &str, value: Value) -> String {
1527    let encoded = serde_json::to_vec(&canonical_json_value(value)).unwrap_or_default();
1528    let mut hasher = blake3::Hasher::new();
1529    hasher.update(prefix.as_bytes());
1530    hasher.update(&[0]);
1531    hasher.update(&encoded);
1532    format!("{prefix}-{}", hasher.finalize().to_hex())
1533}
1534
1535fn canonical_json_value(value: Value) -> Value {
1536    match value {
1537        Value::Array(items) => Value::Array(items.into_iter().map(canonical_json_value).collect()),
1538        Value::Object(map) => {
1539            let mut entries: Vec<_> = map.into_iter().collect();
1540            entries.sort_by(|left, right| left.0.cmp(&right.0));
1541            let mut canonical = serde_json::Map::new();
1542            for (key, value) in entries {
1543                canonical.insert(key, canonical_json_value(value));
1544            }
1545            Value::Object(canonical)
1546        }
1547        other => other,
1548    }
1549}
1550
1551fn unique_db_links(links: &[RawMirrorDbLink]) -> Vec<RawMirrorDbLink> {
1552    let mut dedup = links.to_vec();
1553    dedup.sort_by(|left, right| {
1554        (
1555            left.conversation_id,
1556            left.message_count,
1557            left.started_at_ms,
1558            left.source_path.as_deref().unwrap_or(""),
1559        )
1560            .cmp(&(
1561                right.conversation_id,
1562                right.message_count,
1563                right.started_at_ms,
1564                right.source_path.as_deref().unwrap_or(""),
1565            ))
1566    });
1567    dedup.dedup();
1568    dedup
1569}
1570
1571fn file_blake3(path: &Path) -> Result<String> {
1572    let mut file = File::open(path).with_context(|| format!("open {}", path.display()))?;
1573    let mut hasher = blake3::Hasher::new();
1574    let mut buffer = [0u8; 64 * 1024];
1575    loop {
1576        let read = file
1577            .read(&mut buffer)
1578            .with_context(|| format!("read {}", path.display()))?;
1579        if read == 0 {
1580            break;
1581        }
1582        hasher.update(&buffer[..read]);
1583    }
1584    Ok(hasher.finalize().to_hex().to_string())
1585}
1586
1587fn ensure_private_dir(path: &Path) -> Result<()> {
1588    create_private_dir_all(path)
1589        .with_context(|| format!("create raw mirror dir {}", path.display()))?;
1590    let metadata = fs::symlink_metadata(path)
1591        .with_context(|| format!("stat raw mirror dir {}", path.display()))?;
1592    let file_type = metadata.file_type();
1593    if file_type.is_symlink() {
1594        return Err(anyhow!(
1595            "refusing to use symlink raw mirror dir {}",
1596            path.display()
1597        ));
1598    }
1599    if !file_type.is_dir() {
1600        return Err(anyhow!(
1601            "refusing to use non-directory raw mirror path {}",
1602            path.display()
1603        ));
1604    }
1605    #[cfg(unix)]
1606    {
1607        use std::os::unix::fs::PermissionsExt;
1608        if metadata.permissions().mode() & 0o777 != 0o700 {
1609            set_private_dir_permissions(path)?;
1610        }
1611    }
1612    #[cfg(not(unix))]
1613    {
1614        set_private_dir_permissions(path)?;
1615    }
1616    Ok(())
1617}
1618
1619fn ensure_private_dir_descendant(root: &Path, path: &Path) -> Result<()> {
1620    let relative = path.strip_prefix(root).with_context(|| {
1621        format!(
1622            "raw mirror private dir {} is not under root {}",
1623            path.display(),
1624            root.display()
1625        )
1626    })?;
1627
1628    if let Some(root_parent) = root.parent() {
1629        ensure_private_dir(root_parent)?;
1630    }
1631    ensure_private_dir(root)?;
1632    let mut current = root.to_path_buf();
1633    for component in relative.components() {
1634        match component {
1635            Component::Normal(part) => {
1636                current.push(part);
1637                ensure_private_dir(&current)?;
1638            }
1639            Component::CurDir => {}
1640            _ => {
1641                return Err(anyhow!(
1642                    "raw mirror private dir contains non-normal component: {}",
1643                    path.display()
1644                ));
1645            }
1646        }
1647    }
1648
1649    Ok(())
1650}
1651
1652fn private_create_new_file(path: &Path) -> Result<File> {
1653    let mut options = OpenOptions::new();
1654    options.write(true).create_new(true);
1655    set_private_create_file_mode(&mut options);
1656    let file = options
1657        .open(path)
1658        .with_context(|| format!("create raw mirror file {}", path.display()))?;
1659    Ok(file)
1660}
1661
/// Unix: recursively creates `path`, giving newly created directories mode
/// `0o700` (subject to the process umask).
#[cfg(unix)]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    use std::os::unix::fs::DirBuilderExt;

    fs::DirBuilder::new().recursive(true).mode(0o700).create(path)
}

/// Non-Unix: recursively creates `path` with default permissions (the same
/// operation `fs::create_dir_all` performs).
#[cfg(not(unix))]
fn create_private_dir_all(path: &Path) -> std::io::Result<()> {
    fs::DirBuilder::new().recursive(true).create(path)
}
1674
/// Unix: makes files created through `options` owner read/write only (`0o600`).
#[cfg(unix)]
fn set_private_create_file_mode(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    const OWNER_READ_WRITE: u32 = 0o600;
    options.mode(OWNER_READ_WRITE);
}

/// Non-Unix: no creation-mode equivalent is applied; `options` is left untouched.
#[cfg(not(unix))]
fn set_private_create_file_mode(_options: &mut OpenOptions) {}
1684
1685fn sync_open_file_if_required(message_file: &File, context: impl FnOnce() -> String) -> Result<()> {
1686    if !raw_mirror_fsync_enabled() {
1687        return Ok(());
1688    }
1689    message_file.sync_all().with_context(context)
1690}
1691
1692fn sync_file(path: &Path) -> Result<()> {
1693    if !raw_mirror_fsync_enabled() {
1694        return Ok(());
1695    }
1696    let mut options = OpenOptions::new();
1697    options.read(true);
1698    #[cfg(windows)]
1699    options.write(true);
1700    options
1701        .open(path)
1702        .and_then(|file| file.sync_all())
1703        .with_context(|| format!("sync raw mirror file {}", path.display()))
1704}
1705
1706#[cfg(not(windows))]
1707fn sync_parent(path: &Path) -> Result<()> {
1708    if !raw_mirror_fsync_enabled() {
1709        return Ok(());
1710    }
1711    let Some(parent) = path.parent() else {
1712        return Ok(());
1713    };
1714    File::open(parent)
1715        .and_then(|file| file.sync_all())
1716        .with_context(|| format!("sync raw mirror parent {}", parent.display()))
1717}
1718
/// Directory syncing is skipped on Windows: always succeeds without touching
/// the filesystem.
#[cfg(windows)]
fn sync_parent(path: &Path) -> Result<()> {
    let _ = path;
    Ok(())
}
1723
1724fn unique_temp_path(dir: &Path, label: &str) -> PathBuf {
1725    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1726    let nanos = SystemTime::now()
1727        .duration_since(UNIX_EPOCH)
1728        .unwrap_or_default()
1729        .as_nanos();
1730    dir.join(format!(
1731        ".{label}.{}.{}.{}.tmp",
1732        std::process::id(),
1733        nanos,
1734        nonce
1735    ))
1736}
1737
1738fn unique_capture_temp_dir(root: &Path) -> PathBuf {
1739    let nonce = TEMP_NONCE.fetch_add(1, Ordering::Relaxed);
1740    let nanos = SystemTime::now()
1741        .duration_since(UNIX_EPOCH)
1742        .unwrap_or_default()
1743        .as_nanos();
1744    root.join("tmp").join(format!(
1745        "capture.{}.{}.{}",
1746        std::process::id(),
1747        nanos,
1748        nonce
1749    ))
1750}
1751
1752fn remove_temp_best_effort(path: &Path) {
1753    if let Err(err) = fs::remove_file(path) {
1754        tracing::debug!(
1755            path = %path.display(),
1756            error = %err,
1757            "failed to remove raw mirror temp file"
1758        );
1759    }
1760}
1761
1762fn remove_empty_temp_dir_best_effort(path: &Path) {
1763    if let Err(err) = fs::remove_dir(path) {
1764        tracing::debug!(
1765            path = %path.display(),
1766            error = %err,
1767            "failed to remove raw mirror temp directory"
1768        );
1769    }
1770}
1771
/// Formats `source_path` as `[{provider}]/{file_name}`, discarding every
/// directory component.
///
/// If the path has no final component (e.g. `/` or a path ending in `..`), or
/// that component is not valid UTF-8, the literal `session` is used instead.
fn redacted_original_path(provider: &str, source_path: &Path) -> String {
    let leaf = match source_path.file_name().map(|name| name.to_str()) {
        Some(Some(name)) => name,
        _ => "session",
    };
    format!("[{}]/{}", provider, leaf)
}
1779
/// Current wall-clock time in whole milliseconds since the Unix epoch.
/// Returns `0` if the clock reads before the epoch or the value overflows `i64`.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .ok()
        .and_then(|elapsed| i64::try_from(elapsed.as_millis()).ok())
        .unwrap_or(0)
}
1783
/// Converts `time` to whole milliseconds since the Unix epoch, truncating any
/// sub-millisecond remainder. Returns `None` for instants before the epoch or
/// when the millisecond count does not fit in an `i64`.
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
1789
/// Converts `time` to nanoseconds since the Unix epoch, or `None` for instants
/// before the epoch.
fn system_time_to_ns(time: SystemTime) -> Option<u128> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => Some(since_epoch.as_nanos()),
        Err(_) => None,
    }
}
1795
1796#[cfg(unix)]
1797fn set_private_dir_permissions(path: &Path) -> Result<()> {
1798    use std::os::unix::fs::PermissionsExt;
1799    fs::set_permissions(path, fs::Permissions::from_mode(0o700))
1800        .with_context(|| format!("set raw mirror dir permissions {}", path.display()))
1801}
1802
/// No-op on non-Unix targets, which have no POSIX mode bits to apply.
#[cfg(not(unix))]
fn set_private_dir_permissions(path: &Path) -> Result<()> {
    let _ = path;
    Ok(())
}
1807
1808#[cfg(unix)]
1809fn set_private_file_permissions(path: &Path) -> Result<()> {
1810    use std::os::unix::fs::PermissionsExt;
1811    fs::set_permissions(path, fs::Permissions::from_mode(0o600))
1812        .with_context(|| format!("set raw mirror file permissions {}", path.display()))
1813}
1814
/// No-op on non-Unix targets, which have no POSIX mode bits to apply.
#[cfg(not(unix))]
fn set_private_file_permissions(path: &Path) -> Result<()> {
    let _ = path;
    Ok(())
}
1819
1820#[cfg(test)]
1821mod tests {
1822    use super::*;
1823
    /// Captures the same source file twice with the same DB link and checks that
    /// the second call is a replay: identical manifest id, blob hash, capture
    /// timestamp and source mtime, with `already_present` false then true.
    ///
    /// Also verifies that the source file is unchanged, that the stored blob
    /// bytes and selected manifest JSON fields are correct, and that the `tmp`
    /// directory is empty afterwards. On Unix it also checks that the mirror root
    /// is `0o700` and the manifest is `0o600`.
    #[test]
    fn capture_source_file_writes_doctor_compatible_manifest_idempotently() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("rollout-fixture.jsonl");
        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
        fs::write(&source_path, source_bytes).expect("write source");
        let db_link = RawMirrorDbLink {
            conversation_id: Some(42),
            message_count: Some(1),
            source_path: Some(source_path.display().to_string()),
            started_at_ms: Some(1_733_000_000_000),
        };

        let first = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: std::slice::from_ref(&db_link),
        })
        .expect("first capture");
        let second = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: std::slice::from_ref(&db_link),
        })
        .expect("second capture");

        assert_eq!(first.manifest_id, second.manifest_id);
        assert_eq!(first.blob_blake3, second.blob_blake3);
        assert_eq!(first.captured_at_ms, second.captured_at_ms);
        assert_eq!(first.source_mtime_ms, second.source_mtime_ms);
        assert!(!first.already_present);
        assert!(second.already_present);
        // Capturing must leave the original source bytes untouched.
        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);

        let blob_path = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join(&first.blob_relative_path);
        let manifest_path = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join(&first.manifest_relative_path);
        assert_eq!(fs::read(blob_path).expect("blob bytes"), source_bytes);

        // Inspect the manifest as raw JSON so field names are pinned, not just the Rust struct.
        let manifest: Value =
            serde_json::from_slice(&fs::read(&manifest_path).expect("manifest bytes"))
                .expect("manifest json");
        assert_eq!(
            manifest["manifest_kind"].as_str(),
            Some(RAW_MIRROR_MANIFEST_KIND)
        );
        assert_eq!(manifest["provider"].as_str(), Some("codex"));
        assert_eq!(
            manifest["blob_blake3"].as_str(),
            Some(first.blob_blake3.as_str())
        );
        assert_eq!(
            manifest["redacted_original_path"].as_str(),
            Some("[codex]/rollout-fixture.jsonl")
        );
        assert_eq!(
            manifest["db_links"][0]["conversation_id"].as_i64(),
            Some(42)
        );
        assert_eq!(manifest["db_links"][0]["message_count"].as_u64(), Some(1));
        assert!(
            manifest["manifest_blake3"]
                .as_str()
                .is_some_and(|value| value.starts_with("doctor-raw-mirror-manifest-v1-"))
        );
        let tmp_root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join("tmp");
        assert_eq!(
            fs::read_dir(&tmp_root)
                .expect("raw mirror tmp root")
                .collect::<Vec<_>>()
                .len(),
            0,
            "successful captures must not leave doctor-visible interrupted temp artifacts"
        );

        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;

            let root = data_dir
                .join(RAW_MIRROR_ROOT_DIR)
                .join(RAW_MIRROR_VERSION_DIR);
            assert_eq!(
                fs::metadata(&root)
                    .expect("raw mirror root metadata")
                    .permissions()
                    .mode()
                    & 0o777,
                0o700
            );
            assert_eq!(
                fs::metadata(&manifest_path)
                    .expect("manifest metadata")
                    .permissions()
                    .mode()
                    & 0o777,
                0o600
            );
        }
    }
1941
    /// Runs a pre-parse capture with no DB links, then a capture of the same file
    /// with one link. Checks that the same manifest and blob are reused, that
    /// `already_present` is true, that the stored manifest ends up with exactly the
    /// new link, and that `manifest_blake3` matches a fresh recomputation.
    #[test]
    fn capture_source_file_merges_db_links_into_existing_manifest() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("preparse-then-parsed.jsonl");
        let source_bytes = b"{\"type\":\"message\",\"text\":\"hello\"}\n";
        fs::write(&source_path, source_bytes).expect("write source");

        let preparse = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
        .expect("preparse capture");

        let parsed_link = RawMirrorDbLink {
            conversation_id: None,
            message_count: Some(1),
            source_path: Some(source_path.display().to_string()),
            started_at_ms: Some(1_733_000_000_000),
        };
        let parsed = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: std::slice::from_ref(&parsed_link),
        })
        .expect("parsed capture");

        assert_eq!(preparse.manifest_id, parsed.manifest_id);
        assert_eq!(preparse.blob_blake3, parsed.blob_blake3);
        assert!(parsed.already_present);

        let manifest_path = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join(&parsed.manifest_relative_path);
        let manifest = read_raw_mirror_manifest(&manifest_path).expect("merged manifest");
        assert_eq!(
            manifest.db_links,
            vec![parsed_link],
            "second capture must enrich the pre-parse manifest with DB-link evidence"
        );
        // The stored checksum must cover the merged content, not the pre-parse content.
        let expected_manifest_blake3 = raw_mirror_manifest_blake3(&manifest);
        assert_eq!(
            manifest.manifest_blake3.as_deref(),
            Some(expected_manifest_blake3.as_str()),
            "manifest checksum must be recomputed after DB-link merge"
        );
        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
    }
2000
    /// `merge_manifest_db_links` must reject each of the following relative
    /// manifest paths with an error mentioning "raw mirror manifest path":
    ///
    /// - a parent-escaping path
    /// - an absolute path
    /// - a `manifests/..` traversal
    /// - a path under `blobs/`
    /// - a non-`.json` file
    #[test]
    fn merge_manifest_db_links_rejects_hostile_relative_paths() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let db_link = RawMirrorDbLink {
            conversation_id: Some(42),
            message_count: Some(1),
            source_path: Some("source.jsonl".to_string()),
            started_at_ms: Some(1_733_000_000_000),
        };

        for relative in [
            "../escape.json",
            "/tmp/escape.json",
            "manifests/../escape.json",
            "blobs/blake3/ab/not-a-manifest.raw",
            "manifests/not-json.txt",
        ] {
            let err = merge_manifest_db_links(&data_dir, relative, std::slice::from_ref(&db_link))
                .expect_err("hostile manifest path should be rejected");
            assert!(
                err.to_string().contains("raw mirror manifest path"),
                "unexpected error for {relative}: {err}"
            );
        }
    }
2027
    /// Unix only. Sets up `manifests/link.json` as a symlink to a file outside
    /// the data directory and checks that `merge_manifest_db_links` rejects it
    /// with a "symlink raw mirror manifest" error.
    #[cfg(unix)]
    #[test]
    fn merge_manifest_db_links_rejects_symlink_manifest_path() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let manifest_dir = data_dir.join("raw-mirror/v1/manifests");
        fs::create_dir_all(&manifest_dir).expect("manifest dir");
        let outside = temp.path().join("outside.json");
        fs::write(&outside, "{}").expect("outside manifest");
        std::os::unix::fs::symlink(&outside, manifest_dir.join("link.json"))
            .expect("symlink manifest");
        let db_link = RawMirrorDbLink {
            conversation_id: Some(42),
            message_count: Some(1),
            source_path: Some("source.jsonl".to_string()),
            started_at_ms: Some(1_733_000_000_000),
        };

        let err = merge_manifest_db_links(
            &data_dir,
            "manifests/link.json",
            std::slice::from_ref(&db_link),
        )
        .expect_err("symlink manifest should be rejected");
        assert!(
            err.to_string().contains("symlink raw mirror manifest"),
            "unexpected symlink-manifest error: {err}"
        );
    }
2057
    /// Two distinct source files with identical bytes must share one blob (same
    /// hash and blob path) but get two separate manifests. The second capture is
    /// not reported as `already_present`.
    ///
    /// Also checks the `storage_summary` counts for that layout: 2 manifests,
    /// 1 unique blob, no missing or invalid entries.
    #[test]
    fn capture_source_file_deduplicates_blob_for_distinct_source_paths() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let first_source = temp.path().join("first.jsonl");
        let second_source = temp.path().join("second.jsonl");
        let source_bytes = b"{\"type\":\"message\",\"text\":\"shared\"}\n";
        fs::write(&first_source, source_bytes).expect("write first source");
        fs::write(&second_source, source_bytes).expect("write second source");

        let first = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &first_source,
            db_links: &[],
        })
        .expect("first capture");
        let second = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &second_source,
            db_links: &[],
        })
        .expect("second capture");

        assert_eq!(first.blob_blake3, second.blob_blake3);
        assert_eq!(first.blob_relative_path, second.blob_relative_path);
        assert_ne!(first.manifest_id, second.manifest_id);
        assert!(
            !second.already_present,
            "a duplicate blob with a new source manifest is not a full capture replay"
        );

        let manifest_root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join("manifests");
        let manifests = fs::read_dir(manifest_root)
            .expect("manifest dir")
            .collect::<std::io::Result<Vec<_>>>()
            .expect("manifest entries");
        assert_eq!(manifests.len(), 2);

        // Shared bytes are counted once in the blob totals.
        let summary = storage_summary(&data_dir);
        assert!(summary.initialized);
        assert_eq!(summary.manifest_count, 2);
        assert_eq!(summary.unique_blob_count, 1);
        assert_eq!(summary.total_blob_bytes, source_bytes.len() as u64);
        assert_eq!(summary.largest_blob_bytes, source_bytes.len() as u64);
        assert_eq!(summary.missing_blob_count, 0);
        assert_eq!(summary.invalid_manifest_count, 0);
        assert!(summary.total_storage_bytes >= source_bytes.len() as u64);
    }
2117
    /// Rewrites a captured manifest so its `blob_relative_path` is
    /// `../outside.raw`, with a recomputed checksum. `storage_summary` must
    /// count that manifest as invalid and attribute no blobs or bytes to it.
    #[test]
    fn storage_summary_rejects_hostile_blob_relative_path() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("source.jsonl");
        fs::write(
            &source_path,
            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
        )
        .expect("write source");

        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
        .expect("capture source");
        let manifest_path = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR)
            .join(&captured.manifest_relative_path);
        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
        manifest.blob_relative_path = "../outside.raw".to_string();
        // Recompute the checksum so the only defect left is the hostile blob path.
        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
        fs::write(
            &manifest_path,
            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
        )
        .expect("tamper manifest");

        let summary = storage_summary(&data_dir);
        assert_eq!(summary.manifest_count, 1);
        assert_eq!(summary.invalid_manifest_count, 1);
        assert_eq!(summary.unique_blob_count, 0);
        assert_eq!(summary.total_blob_bytes, 0);
    }
2158
    /// With a manifest tampered to point at `../outside.raw` (checksum
    /// recomputed), an applying `prune` must return an "unexpected blob path"
    /// error. It must leave the manifest and the real blob in place and write no
    /// `pruned.jsonl` audit log.
    #[test]
    fn prune_fails_closed_on_hostile_manifest_inventory() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("source.jsonl");
        fs::write(
            &source_path,
            b"{\"type\":\"message\",\"text\":\"hostile\"}\n",
        )
        .expect("write source");

        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
        .expect("capture source");
        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        let manifest_path = root.join(&captured.manifest_relative_path);
        let blob_path = root.join(&captured.blob_relative_path);
        let mut manifest = read_raw_mirror_manifest(&manifest_path).expect("read manifest");
        manifest.blob_relative_path = "../outside.raw".to_string();
        // Recompute the checksum so the only defect left is the hostile blob path.
        manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&manifest));
        fs::write(
            &manifest_path,
            serde_json::to_vec_pretty(&manifest).expect("serialize manifest"),
        )
        .expect("tamper manifest");

        let err = prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(0),
                max_size_bytes: None,
                keep_tags: Vec::new(),
                safety_hold_down_ms: 0,
                apply: true,
            },
        )
        .expect_err("hostile inventory should fail closed");

        assert!(
            err.to_string().contains("unexpected blob path"),
            "error should explain the unsafe manifest inventory: {err}"
        );
        assert!(manifest_path.exists());
        assert!(blob_path.exists());
        assert!(!root.join("pruned.jsonl").exists());
    }
2214
    /// A dry-run `prune` (`apply: false`, `older_than_ms: Some(0)`) should plan
    /// one manifest and one blob and report zero reclaimed bytes. The captured
    /// files must remain, and `pruned.jsonl` must gain a record marked
    /// `"mode":"dry-run"` and `"applied":false`.
    #[test]
    fn prune_dry_run_audits_without_removing_manifest_or_blob() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("source.jsonl");
        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")
            .expect("write source");
        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
        .expect("capture source");

        let report = prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(0),
                max_size_bytes: None,
                keep_tags: Vec::new(),
                safety_hold_down_ms: 0,
                apply: false,
            },
        )
        .expect("dry-run prune");

        assert!(report.initialized);
        assert_eq!(report.mode, "dry-run");
        assert_eq!(report.planned_manifest_count, 1);
        assert_eq!(report.planned_blob_count, 1);
        assert_eq!(report.applied_reclaim_bytes, 0);
        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        assert!(root.join(&captured.manifest_relative_path).exists());
        assert!(root.join(&captured.blob_relative_path).exists());
        let audit_path = root.join("pruned.jsonl");
        let audit = fs::read_to_string(audit_path).expect("read audit");
        assert!(audit.contains("\"mode\":\"dry-run\""));
        assert!(audit.contains("\"applied\":false"));
    }
2260
    /// Unix only. Replaces `pruned.jsonl` with a symlink to an outside file.
    /// A dry-run `prune` must then fail with "prune audit through symlink".
    ///
    /// Afterwards it also checks that:
    ///
    /// - the symlink target's contents are unchanged;
    /// - the symlink itself is still in place;
    /// - the captured manifest and blob still exist.
    #[test]
    #[cfg(unix)]
    fn prune_refuses_symlinked_audit_log_without_writing_target() -> Result<()> {
        use std::os::unix::fs::symlink;

        let temp = tempfile::TempDir::new()?;
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("source.jsonl");
        let protected_audit_target = temp.path().join("protected-prune-audit.jsonl");
        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"old\"}\n")?;
        fs::write(&protected_audit_target, b"protected\n")?;

        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })?;
        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        let audit_path = root.join("pruned.jsonl");
        symlink(&protected_audit_target, &audit_path)?;

        let err = match prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(0),
                max_size_bytes: None,
                keep_tags: Vec::new(),
                safety_hold_down_ms: 0,
                apply: false,
            },
        ) {
            Ok(_) => anyhow::bail!("symlinked prune audit log was accepted"),
            Err(err) => err,
        };

        if !err.to_string().contains("prune audit through symlink") {
            anyhow::bail!("unexpected audit symlink error: {err:#}");
        }
        if !fs::read(&protected_audit_target)?
            .as_slice()
            .eq(b"protected\n")
        {
            anyhow::bail!("protected audit target was modified");
        }
        if !fs::read_link(&audit_path)?
            .as_os_str()
            .eq(protected_audit_target.as_os_str())
        {
            anyhow::bail!("audit path did not remain a symlink to the protected target");
        }
        if !root.join(&captured.manifest_relative_path).exists() {
            anyhow::bail!("failed audit append removed the captured manifest");
        }
        if !root.join(&captured.blob_relative_path).exists() {
            anyhow::bail!("failed audit append removed the captured blob");
        }
        Ok(())
    }
2325
    /// An applying `prune` with `older_than_ms: Some(0)` over a single capture
    /// must delete both the manifest and its now-unreferenced blob. It must also
    /// append a `pruned.jsonl` record marked `"mode":"apply"` and `"applied":true`.
    #[test]
    fn prune_apply_removes_selected_manifest_and_unreferenced_blob() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let source_path = temp.path().join("source.jsonl");
        fs::write(&source_path, b"{\"type\":\"message\",\"text\":\"apply\"}\n")
            .expect("write source");
        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: &[],
        })
        .expect("capture source");
        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        let manifest_path = root.join(&captured.manifest_relative_path);
        let blob_path = root.join(&captured.blob_relative_path);

        let report = prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(0),
                max_size_bytes: None,
                keep_tags: Vec::new(),
                safety_hold_down_ms: 0,
                apply: true,
            },
        )
        .expect("apply prune");

        assert_eq!(report.applied_manifest_count, 1);
        assert_eq!(report.applied_blob_count, 1);
        assert!(!manifest_path.exists());
        assert!(!blob_path.exists());
        let audit = fs::read_to_string(root.join("pruned.jsonl")).expect("read audit");
        assert!(audit.contains("\"mode\":\"apply\""));
        assert!(audit.contains("\"applied\":true"));
    }
2369
    /// Two manifests share one blob. The first manifest is backdated by two days
    /// (with its checksum recomputed), and an applying prune with a one-day
    /// threshold is then run. Only the first manifest may be removed; the shared
    /// blob must survive because the second manifest still references it.
    #[test]
    fn prune_apply_keeps_blob_referenced_by_retained_manifest() {
        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        let first_source = temp.path().join("first.jsonl");
        let second_source = temp.path().join("second.jsonl");
        let bytes = b"{\"type\":\"message\",\"text\":\"shared-retained\"}\n";
        fs::write(&first_source, bytes).expect("write first");
        fs::write(&second_source, bytes).expect("write second");
        let first = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &first_source,
            db_links: &[],
        })
        .expect("capture first");
        let second = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &second_source,
            db_links: &[],
        })
        .expect("capture second");
        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        let first_manifest_path = root.join(&first.manifest_relative_path);
        let second_manifest_path = root.join(&second.manifest_relative_path);
        // Age only the first manifest past the 1-day threshold used below.
        let mut first_manifest =
            read_raw_mirror_manifest(&first_manifest_path).expect("first manifest");
        first_manifest.captured_at_ms = now_ms().saturating_sub(2 * 86_400_000);
        first_manifest.manifest_blake3 = Some(raw_mirror_manifest_blake3(&first_manifest));
        fs::write(
            &first_manifest_path,
            serde_json::to_vec_pretty(&first_manifest).expect("serialize first manifest"),
        )
        .expect("rewrite first manifest");

        let report = prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(86_400_000),
                max_size_bytes: None,
                keep_tags: Vec::new(),
                safety_hold_down_ms: 0,
                apply: true,
            },
        )
        .expect("apply one-manifest prune");

        assert_eq!(report.applied_manifest_count, 1);
        assert_eq!(report.applied_blob_count, 0);
        assert!(!first_manifest_path.exists());
        assert!(second_manifest_path.exists());
        assert!(
            root.join(&first.blob_relative_path).exists(),
            "shared blob must stay while a retained manifest still references it"
        );
    }
2435
    /// Captures a file linked to conversation 7, then creates a minimal `tags` /
    /// `conversation_tags` schema in `agent_search.db` that tags conversation 7
    /// as `keep`. The prune options (`older_than_ms: Some(0)`,
    /// `max_size_bytes: Some(0)`) would otherwise select the capture. With
    /// `keep_tags: ["keep"]`, the linked manifest and blob must instead be
    /// reported as pinned, nothing may be planned, and both files must remain.
    #[test]
    fn prune_apply_keep_tag_pins_linked_manifest_and_blob() {
        use frankensqlite::compat::ConnectionExt as _;

        let temp = tempfile::TempDir::new().expect("tempdir");
        let data_dir = temp.path().join("cass-data");
        std::fs::create_dir_all(&data_dir).expect("create data dir");
        let source_path = temp.path().join("tagged.jsonl");
        fs::write(
            &source_path,
            b"{\"type\":\"message\",\"text\":\"tagged\"}\n",
        )
        .expect("write source");
        let db_link = RawMirrorDbLink {
            conversation_id: Some(7),
            message_count: Some(1),
            source_path: Some(source_path.display().to_string()),
            started_at_ms: Some(1_733_000_000_000),
        };
        let captured = capture_source_file(RawMirrorCaptureInput {
            data_dir: &data_dir,
            provider: "codex",
            source_id: "local",
            origin_kind: "local",
            origin_host: None,
            source_path: &source_path,
            db_links: std::slice::from_ref(&db_link),
        })
        .expect("capture source");
        // Only the tables needed for the keep-tag lookup are created here.
        let db_path = data_dir.join("agent_search.db");
        let conn = frankensqlite::Connection::open(db_path.to_string_lossy().into_owned())
            .expect("open keep-tag db");
        conn.execute_compat(
            "CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)",
            frankensqlite::params![],
        )
        .expect("create tags");
        conn.execute_compat(
            "CREATE TABLE conversation_tags (conversation_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (conversation_id, tag_id))",
            frankensqlite::params![],
        )
        .expect("create conversation_tags");
        conn.execute_compat(
            "INSERT INTO tags (id, name) VALUES (1, 'keep')",
            frankensqlite::params![],
        )
        .expect("insert tag");
        conn.execute_compat(
            "INSERT INTO conversation_tags (conversation_id, tag_id) VALUES (7, 1)",
            frankensqlite::params![],
        )
        .expect("insert conversation tag");
        // Close the connection before prune reads the database.
        drop(conn);

        let report = prune(
            &data_dir,
            RawMirrorPruneOptions {
                older_than_ms: Some(0),
                max_size_bytes: Some(0),
                keep_tags: vec!["keep".to_string()],
                safety_hold_down_ms: 0,
                apply: true,
            },
        )
        .expect("keep-tag prune");

        let root = data_dir
            .join(RAW_MIRROR_ROOT_DIR)
            .join(RAW_MIRROR_VERSION_DIR);
        assert_eq!(report.pinned_manifest_count, 1);
        assert_eq!(report.pinned_blob_count, 1);
        assert_eq!(report.planned_manifest_count, 0);
        assert_eq!(report.planned_blob_count, 0);
        assert!(root.join(&captured.manifest_relative_path).exists());
        assert!(root.join(&captured.blob_relative_path).exists());
    }
2512
2513    #[test]
2514    fn prune_apply_safety_hold_down_pins_recent_manifest_during_size_prune() {
2515        let temp = tempfile::TempDir::new().expect("tempdir");
2516        let data_dir = temp.path().join("cass-data");
2517        let source_path = temp.path().join("recent.jsonl");
2518        fs::write(
2519            &source_path,
2520            b"{\"type\":\"message\",\"text\":\"recent\"}\n",
2521        )
2522        .expect("write source");
2523        let captured = capture_source_file(RawMirrorCaptureInput {
2524            data_dir: &data_dir,
2525            provider: "codex",
2526            source_id: "local",
2527            origin_kind: "local",
2528            origin_host: None,
2529            source_path: &source_path,
2530            db_links: &[],
2531        })
2532        .expect("capture source");
2533
2534        let report = prune(
2535            &data_dir,
2536            RawMirrorPruneOptions {
2537                older_than_ms: None,
2538                max_size_bytes: Some(0),
2539                keep_tags: Vec::new(),
2540                safety_hold_down_ms: 7 * 86_400_000,
2541                apply: true,
2542            },
2543        )
2544        .expect("hold-down prune");
2545
2546        let root = data_dir
2547            .join(RAW_MIRROR_ROOT_DIR)
2548            .join(RAW_MIRROR_VERSION_DIR);
2549        assert_eq!(report.pinned_manifest_count, 1);
2550        assert_eq!(report.pinned_blob_count, 1);
2551        assert_eq!(report.planned_manifest_count, 0);
2552        assert_eq!(report.planned_blob_count, 0);
2553        assert!(root.join(&captured.manifest_relative_path).exists());
2554        assert!(root.join(&captured.blob_relative_path).exists());
2555    }
2556
2557    #[test]
2558    fn capture_source_file_revalidates_cached_blob_contents() {
2559        let temp = tempfile::TempDir::new().expect("tempdir");
2560        let data_dir = temp.path().join("cass-data");
2561        let source_path = temp.path().join("cached-source.jsonl");
2562        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2563        fs::write(&source_path, source_bytes).expect("write source");
2564
2565        let first = capture_source_file(RawMirrorCaptureInput {
2566            data_dir: &data_dir,
2567            provider: "codex",
2568            source_id: "local",
2569            origin_kind: "local",
2570            origin_host: None,
2571            source_path: &source_path,
2572            db_links: &[],
2573        })
2574        .expect("first capture");
2575
2576        let blob_path = data_dir
2577            .join(RAW_MIRROR_ROOT_DIR)
2578            .join(RAW_MIRROR_VERSION_DIR)
2579            .join(&first.blob_relative_path);
2580        fs::write(&blob_path, b"corrupted cached blob").expect("corrupt cached blob");
2581
2582        let err = capture_source_file(RawMirrorCaptureInput {
2583            data_dir: &data_dir,
2584            provider: "codex",
2585            source_id: "local",
2586            origin_kind: "local",
2587            origin_host: None,
2588            source_path: &source_path,
2589            db_links: &[],
2590        })
2591        .expect_err("corrupted content-addressed blob must be rejected");
2592        assert!(
2593            err.to_string().contains("existing raw mirror blob"),
2594            "unexpected cached-blob error: {err:#}"
2595        );
2596        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2597    }
2598
2599    #[cfg(unix)]
2600    #[test]
2601    fn capture_source_file_does_not_reuse_cache_after_same_size_mtime_preserving_rewrite() {
2602        let temp = tempfile::TempDir::new().expect("tempdir");
2603        let data_dir = temp.path().join("cass-data");
2604        let source_path = temp.path().join("same-size-rewrite.jsonl");
2605        let first_bytes = b"same length payload A\n";
2606        let second_bytes = b"same length payload B\n";
2607        fs::write(&source_path, first_bytes).expect("write first source");
2608
2609        let first_modified = fs::metadata(&source_path)
2610            .expect("first metadata")
2611            .modified()
2612            .expect("first modified time");
2613        let first = capture_source_file(RawMirrorCaptureInput {
2614            data_dir: &data_dir,
2615            provider: "codex",
2616            source_id: "local",
2617            origin_kind: "local",
2618            origin_host: None,
2619            source_path: &source_path,
2620            db_links: &[],
2621        })
2622        .expect("first capture");
2623
2624        std::thread::sleep(std::time::Duration::from_millis(5));
2625        fs::write(&source_path, second_bytes).expect("rewrite source");
2626        let source = OpenOptions::new()
2627            .write(true)
2628            .open(&source_path)
2629            .expect("open rewritten source");
2630        source
2631            .set_times(std::fs::FileTimes::new().set_modified(first_modified))
2632            .expect("restore original mtime");
2633
2634        let second = capture_source_file(RawMirrorCaptureInput {
2635            data_dir: &data_dir,
2636            provider: "codex",
2637            source_id: "local",
2638            origin_kind: "local",
2639            origin_host: None,
2640            source_path: &source_path,
2641            db_links: &[],
2642        })
2643        .expect("second capture");
2644
2645        assert_ne!(first.blob_blake3, second.blob_blake3);
2646        assert_eq!(
2647            second.blob_blake3,
2648            blake3::hash(second_bytes).to_hex().to_string()
2649        );
2650        assert_eq!(
2651            fs::read(&source_path).expect("source bytes after rewrite"),
2652            second_bytes
2653        );
2654    }
2655
2656    #[cfg(unix)]
2657    #[test]
2658    fn capture_source_file_rejects_symlinked_existing_blob_path() {
2659        let temp = tempfile::TempDir::new().expect("tempdir");
2660        let data_dir = temp.path().join("cass-data");
2661        let source_path = temp.path().join("cached-source.jsonl");
2662        let source_bytes = b"{\"type\":\"message\",\"text\":\"cache me\"}\n";
2663        fs::write(&source_path, source_bytes).expect("write source");
2664
2665        let blob_blake3 = blake3::hash(source_bytes).to_hex().to_string();
2666        let blob_relative_path =
2667            raw_mirror_blob_relative_path(&blob_blake3).expect("blob relative path");
2668        let blob_path = data_dir
2669            .join(RAW_MIRROR_ROOT_DIR)
2670            .join(RAW_MIRROR_VERSION_DIR)
2671            .join(&blob_relative_path);
2672        fs::create_dir_all(blob_path.parent().expect("blob parent")).expect("blob parent dir");
2673        let outside = temp.path().join("outside.raw");
2674        fs::write(&outside, source_bytes).expect("outside blob bytes");
2675        std::os::unix::fs::symlink(&outside, &blob_path).expect("symlink blob");
2676
2677        let err = capture_source_file(RawMirrorCaptureInput {
2678            data_dir: &data_dir,
2679            provider: "codex",
2680            source_id: "local",
2681            origin_kind: "local",
2682            origin_host: None,
2683            source_path: &source_path,
2684            db_links: &[],
2685        })
2686        .expect_err("symlinked content-addressed blob path must be rejected");
2687        assert!(
2688            err.to_string().contains("symlink raw mirror blob"),
2689            "unexpected symlink-blob error: {err:#}"
2690        );
2691
2692        let manifest_root = data_dir
2693            .join(RAW_MIRROR_ROOT_DIR)
2694            .join(RAW_MIRROR_VERSION_DIR)
2695            .join("manifests");
2696        assert!(
2697            !manifest_root.exists(),
2698            "failed blob publish must not write a manifest pointing at a symlinked blob"
2699        );
2700        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2701        assert_eq!(fs::read(&outside).expect("outside bytes"), source_bytes);
2702    }
2703
2704    #[cfg(unix)]
2705    #[test]
2706    fn capture_source_file_rejects_symlinked_raw_mirror_root_dir() {
2707        let temp = tempfile::TempDir::new().expect("tempdir");
2708        let data_dir = temp.path().join("cass-data");
2709        let source_path = temp.path().join("source.jsonl");
2710        let outside_mirror = temp.path().join("outside-mirror");
2711        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect archive\"}\n";
2712
2713        fs::create_dir_all(&data_dir).expect("data dir");
2714        fs::create_dir_all(&outside_mirror).expect("outside mirror dir");
2715        fs::write(&source_path, source_bytes).expect("write source");
2716        std::os::unix::fs::symlink(&outside_mirror, data_dir.join(RAW_MIRROR_ROOT_DIR))
2717            .expect("symlink raw mirror root");
2718
2719        let err = capture_source_file(RawMirrorCaptureInput {
2720            data_dir: &data_dir,
2721            provider: "codex",
2722            source_id: "local",
2723            origin_kind: "local",
2724            origin_host: None,
2725            source_path: &source_path,
2726            db_links: &[],
2727        })
2728        .expect_err("symlinked raw-mirror root must be rejected");
2729
2730        assert!(
2731            err.to_string().contains("symlink raw mirror dir"),
2732            "unexpected symlink-root error: {err:#}"
2733        );
2734        assert!(
2735            !outside_mirror.join(RAW_MIRROR_VERSION_DIR).exists(),
2736            "raw mirror capture must not create redirected archive state outside data_dir"
2737        );
2738        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2739    }
2740
2741    #[cfg(unix)]
2742    #[test]
2743    fn capture_source_file_rejects_symlinked_blob_directory_component() {
2744        let temp = tempfile::TempDir::new().expect("tempdir");
2745        let data_dir = temp.path().join("cass-data");
2746        let root = data_dir
2747            .join(RAW_MIRROR_ROOT_DIR)
2748            .join(RAW_MIRROR_VERSION_DIR);
2749        let source_path = temp.path().join("source.jsonl");
2750        let outside_blobs = temp.path().join("outside-blobs");
2751        let source_bytes = b"{\"type\":\"message\",\"text\":\"do not redirect blobs\"}\n";
2752
2753        fs::create_dir_all(&root).expect("raw mirror root");
2754        fs::create_dir_all(&outside_blobs).expect("outside blobs dir");
2755        fs::write(&source_path, source_bytes).expect("write source");
2756        std::os::unix::fs::symlink(&outside_blobs, root.join("blobs")).expect("symlink blobs dir");
2757
2758        let err = capture_source_file(RawMirrorCaptureInput {
2759            data_dir: &data_dir,
2760            provider: "codex",
2761            source_id: "local",
2762            origin_kind: "local",
2763            origin_host: None,
2764            source_path: &source_path,
2765            db_links: &[],
2766        })
2767        .expect_err("symlinked blob directory must be rejected");
2768
2769        assert!(
2770            err.to_string().contains("symlink raw mirror dir"),
2771            "unexpected symlink-blob-dir error: {err:#}"
2772        );
2773        assert!(
2774            !outside_blobs.join(RAW_MIRROR_HASH_ALGORITHM).exists(),
2775            "raw mirror capture must not create redirected blob state outside data_dir"
2776        );
2777        assert!(
2778            !root.join("manifests").exists(),
2779            "failed blob publish must not write a manifest"
2780        );
2781        assert_eq!(fs::read(&source_path).expect("source bytes"), source_bytes);
2782    }
2783
2784    #[test]
2785    fn capture_source_file_rejects_non_file_sources() {
2786        let temp = tempfile::TempDir::new().expect("tempdir");
2787        let data_dir = temp.path().join("cass-data");
2788        let source_dir = temp.path().join("source-dir");
2789        fs::create_dir(&source_dir).expect("source dir");
2790
2791        let err = capture_source_file(RawMirrorCaptureInput {
2792            data_dir: &data_dir,
2793            provider: "codex",
2794            source_id: "local",
2795            origin_kind: "local",
2796            origin_host: None,
2797            source_path: &source_dir,
2798            db_links: &[],
2799        })
2800        .expect_err("directory source should be rejected");
2801        assert!(
2802            err.to_string().contains("non-file source"),
2803            "unexpected non-file-source error: {err}"
2804        );
2805        assert!(
2806            !data_dir.join(RAW_MIRROR_ROOT_DIR).exists(),
2807            "rejected non-file sources must not initialize raw mirror storage"
2808        );
2809    }
2810
2811    #[cfg(unix)]
2812    #[test]
2813    fn capture_source_file_rejects_unreadable_sources_without_manifest() {
2814        use std::os::unix::fs::PermissionsExt;
2815
2816        let temp = tempfile::TempDir::new().expect("tempdir");
2817        let data_dir = temp.path().join("cass-data");
2818        let source_path = temp.path().join("unreadable.jsonl");
2819        fs::write(&source_path, b"private session bytes\n").expect("source");
2820        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o000))
2821            .expect("make source unreadable");
2822
2823        let err = capture_source_file(RawMirrorCaptureInput {
2824            data_dir: &data_dir,
2825            provider: "codex",
2826            source_id: "local",
2827            origin_kind: "local",
2828            origin_host: None,
2829            source_path: &source_path,
2830            db_links: &[],
2831        })
2832        .expect_err("unreadable source should be rejected");
2833        fs::set_permissions(&source_path, fs::Permissions::from_mode(0o600))
2834            .expect("restore source perms");
2835        assert!(
2836            err.to_string().contains("open raw mirror source"),
2837            "unexpected unreadable-source error: {err}"
2838        );
2839        assert!(
2840            !data_dir.join("raw-mirror/v1/manifests").exists(),
2841            "failed unreadable-source captures must not publish manifests"
2842        );
2843    }
2844
2845    #[cfg(unix)]
2846    #[test]
2847    fn capture_source_file_rejects_symlink_sources() {
2848        use std::os::unix::fs::symlink;
2849
2850        let temp = tempfile::TempDir::new().expect("tempdir");
2851        let data_dir = temp.path().join("cass-data");
2852        let real_source = temp.path().join("real.jsonl");
2853        let symlink_source = temp.path().join("link.jsonl");
2854        fs::write(&real_source, b"secret session").expect("write source");
2855        symlink(&real_source, &symlink_source).expect("symlink");
2856
2857        let err = capture_source_file(RawMirrorCaptureInput {
2858            data_dir: &data_dir,
2859            provider: "codex",
2860            source_id: "local",
2861            origin_kind: "local",
2862            origin_host: None,
2863            source_path: &symlink_source,
2864            db_links: &[],
2865        })
2866        .expect_err("symlink source should be rejected");
2867        assert!(
2868            err.to_string().contains("symlink source"),
2869            "unexpected error: {err:#}"
2870        );
2871    }
2872}