Skip to main content

pf_core/
snapshot.rs

1// SPDX-License-Identifier: MIT
//! Atomic five-layer snapshot orchestration.
//!
//! [`Snapshotter`] runs the five [`LayerCapture`] implementations (model,
//! cache, world, effects, trace) concurrently, collects their layer
//! descriptors, assembles a [`Manifest`](crate::manifest::Manifest), persists
//! it via [`PfStore`], and returns the resulting content-id. The whole
//! orchestration is wall-clock ≤500 ms for the synthetic 4-layer fixture
//! (Phase-1 acceptance gate).
9
10use crate::digest::Digest256;
11use crate::error::Result;
12use crate::manifest::{
13    AgentInfo, CacheLayer, EffectsLayer, MEDIATYPE_V1, Manifest, ModelLayer, TraceLayer, WorldLayer,
14};
15use crate::store::PfStore;
16
17use chrono::Utc;
18use std::sync::Arc;
19use std::thread;
20
21/// One layer's contribution to a snapshot. Implementations capture their live
22/// state, write blobs into the supplied store, and return a typed descriptor
23/// suitable for [`Manifest`] embedding.
24pub trait LayerCapture: Send + Sync {
25    /// Strongly-typed descriptor enum naming the four layer kinds.
26    fn kind(&self) -> LayerKind;
27    /// Run the capture, returning a layer descriptor.
28    fn capture(&self, store: &PfStore) -> Result<LayerDescriptor>;
29}
30
/// Discriminator for the five layer kinds a snapshot manifest is built from.
///
/// Each variant corresponds to one [`LayerDescriptor`] variant and to one
/// capture slot of [`Snapshotter`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LayerKind {
    /// Model weights / adapters.
    Model,
    /// Paged KV cache.
    Cache,
    /// Filesystem / env / processes / DOM.
    World,
    /// Effect ledger.
    Effects,
    /// Reasoning trace (chat + tool messages).
    Trace,
}
45
46/// Heterogeneous capture-result shape — one variant per layer kind.
47#[derive(Clone, Debug)]
48pub enum LayerDescriptor {
49    /// Model layer descriptor.
50    Model(ModelLayer),
51    /// Cache layer descriptor.
52    Cache(CacheLayer),
53    /// World layer descriptor.
54    World(WorldLayer),
55    /// Effects layer descriptor.
56    Effects(EffectsLayer),
57    /// Trace layer descriptor.
58    Trace(TraceLayer),
59}
60
61/// Atomic four-layer snapshot orchestrator.
62///
63/// Constructed with one [`LayerCapture`] per layer (model, cache, world,
64/// effects, trace). Captures run concurrently on stdlib threads (not tokio:
65/// each capture is CPU-bound by hashing/compression, not I/O-bound; threads
66/// outperform a runtime).
67///
68/// ```no_run
69/// use std::sync::Arc;
70/// use pf_core::{
71///     snapshot::{Snapshotter, LayerCapture, LayerDescriptor, LayerKind},
72///     store::PfStore,
73///     manifest::AgentInfo,
74/// };
75/// # struct StubModel; impl LayerCapture for StubModel {
76/// #     fn kind(&self) -> LayerKind { LayerKind::Model }
77/// #     fn capture(&self, _: &PfStore) -> pf_core::Result<LayerDescriptor> { unimplemented!() }
78/// # }
79/// # let store = PfStore::open("/tmp/_pf_doctest").unwrap();
80/// # let model = Arc::new(StubModel);
81/// # let cache = Arc::new(StubModel);
82/// # let world = Arc::new(StubModel);
83/// # let effects = Arc::new(StubModel);
84/// # let trace = Arc::new(StubModel);
85/// let agent = AgentInfo {
86///     kind: "demo".into(), version: "0".into(), fingerprint: "abc".into(),
87/// };
88/// let snapper = Snapshotter::new(agent, model, cache, world, effects, trace);
89/// // let cid = snapper.snapshot(&store, vec![]).unwrap();
90/// ```
91pub struct Snapshotter {
92    agent: AgentInfo,
93    model: Arc<dyn LayerCapture>,
94    cache: Arc<dyn LayerCapture>,
95    world: Arc<dyn LayerCapture>,
96    effects: Arc<dyn LayerCapture>,
97    trace: Arc<dyn LayerCapture>,
98}
99
100impl Snapshotter {
101    /// Construct a snapshotter with one capture per layer.
102    pub fn new(
103        agent: AgentInfo,
104        model: Arc<dyn LayerCapture>,
105        cache: Arc<dyn LayerCapture>,
106        world: Arc<dyn LayerCapture>,
107        effects: Arc<dyn LayerCapture>,
108        trace: Arc<dyn LayerCapture>,
109    ) -> Self {
110        Self {
111            agent,
112            model,
113            cache,
114            world,
115            effects,
116            trace,
117        }
118    }
119
120    /// Run all five captures concurrently, assemble the manifest, persist it
121    /// via `store`, and return its content-id. `parents` becomes the
122    /// manifest's `parents` field (use `vec![]` for a root snapshot).
123    pub fn snapshot(&self, store: &PfStore, parents: Vec<Digest256>) -> Result<Digest256> {
124        // Spawn one OS thread per layer. We own the captures behind Arcs,
125        // and `PfStore` is `Send + Sync` because its inner `BlobStore` is.
126        // We re-create per-thread `&PfStore` borrows via raw pointer scopes
127        // through `thread::scope`.
128        let descriptors: Result<Vec<LayerDescriptor>> = thread::scope(|scope| {
129            let model_cap = self.model.clone();
130            let cache_cap = self.cache.clone();
131            let world_cap = self.world.clone();
132            let effects_cap = self.effects.clone();
133            let trace_cap = self.trace.clone();
134
135            let h_model = scope.spawn(move || model_cap.capture(store));
136            let h_cache = scope.spawn(move || cache_cap.capture(store));
137            let h_world = scope.spawn(move || world_cap.capture(store));
138            let h_effects = scope.spawn(move || effects_cap.capture(store));
139            let h_trace = scope.spawn(move || trace_cap.capture(store));
140
141            let mut out = Vec::with_capacity(5);
142            for handle in [h_model, h_cache, h_world, h_effects, h_trace] {
143                out.push(handle.join().expect("capture thread panicked")?);
144            }
145            Ok(out)
146        });
147        let descriptors = descriptors?;
148
149        // Stitch descriptors into the typed manifest layers. Order of
150        // descriptors matches the spawn order above.
151        let mut model = None;
152        let mut cache = None;
153        let mut world = None;
154        let mut effects = None;
155        let mut trace = None;
156        for d in descriptors {
157            match d {
158                LayerDescriptor::Model(x) => model = Some(x),
159                LayerDescriptor::Cache(x) => cache = Some(x),
160                LayerDescriptor::World(x) => world = Some(x),
161                LayerDescriptor::Effects(x) => effects = Some(x),
162                LayerDescriptor::Trace(x) => trace = Some(x),
163            }
164        }
165
166        let manifest = Manifest {
167            schema_version: 1,
168            media_type: MEDIATYPE_V1.to_owned(),
169            agent: self.agent.clone(),
170            model: model.expect("model capture must produce ModelLayer"),
171            cache: cache.expect("cache capture must produce CacheLayer"),
172            world: world.expect("world capture must produce WorldLayer"),
173            effects: effects.expect("effects capture must produce EffectsLayer"),
174            trace: trace.expect("trace capture must produce TraceLayer"),
175            created_at: Utc::now(),
176            parents,
177        };
178
179        store.put_manifest(&manifest)
180    }
181}