Skip to main content

selene_graph/
core_provider.rs

1//! Core graph snapshot provider for persistence integration.
2
3mod recovery_state;
4mod sections;
5
6use std::sync::{
7    Arc,
8    atomic::{AtomicU64, Ordering},
9};
10
11use arc_swap::ArcSwap;
12use parking_lot::Mutex;
13use selene_core::{Change, HlcTimestamp, Origin};
14use selene_persist::{
15    AuditLog, AuditRecord, RecoveryError, RecoveryProvider, RecoveryResult, WalWriter,
16};
17
18use crate::core_provider::recovery_state::RecoveryState;
19use crate::core_provider::sections::{
20    encode_composite_schemas, encode_edges, encode_graph_types, encode_meta, encode_nodes,
21    encode_schemas, encode_text_schemas, encode_vector_schemas,
22};
23use crate::durable_provider::DurableProvider;
24use crate::error::GraphResult;
25use crate::graph::SeleneGraph;
26use crate::index_provider::{IndexProvider, ProviderError, ProviderTag, SubTag};
27
28/// Core graph provider tag used in snapshot section tables.
29pub const CORE_PROVIDER_TAG: [u8; 4] = *b"CORE";
30/// Core metadata subsection tag under [`CORE_PROVIDER_TAG`].
31pub const CORE_META_SUB: [u8; 4] = *b"META";
32/// Core graph-type subsection tag under [`CORE_PROVIDER_TAG`].
33pub const CORE_GTYP_SUB: [u8; 4] = *b"GTYP";
34/// Core node-column subsection tag under [`CORE_PROVIDER_TAG`].
35pub const CORE_NODE_SUB: [u8; 4] = *b"NODE";
36/// Core edge-column subsection tag under [`CORE_PROVIDER_TAG`].
37pub const CORE_EDGE_SUB: [u8; 4] = *b"EDGE";
38/// Core schema subsection tag under [`CORE_PROVIDER_TAG`].
39pub const CORE_SCMA_SUB: [u8; 4] = *b"SCMA";
40/// Core composite-property-index schema subsection tag under [`CORE_PROVIDER_TAG`].
41pub const CORE_CPIX_SUB: [u8; 4] = *b"CPIX";
42/// Core vector-index schema subsection tag under [`CORE_PROVIDER_TAG`].
43pub const CORE_VIDX_SUB: [u8; 4] = *b"VIDX";
44/// Core text-index schema subsection tag under [`CORE_PROVIDER_TAG`].
45pub const CORE_TIDX_SUB: [u8; 4] = *b"TIDX";
46
47const CORE_SUB_TAGS: &[SubTag] = &[
48    SubTag(CORE_GTYP_SUB),
49    SubTag(CORE_META_SUB),
50    SubTag(CORE_NODE_SUB),
51    SubTag(CORE_EDGE_SUB),
52    SubTag(CORE_SCMA_SUB),
53    SubTag(CORE_CPIX_SUB),
54    SubTag(CORE_VIDX_SUB),
55    SubTag(CORE_TIDX_SUB),
56];
57
58/// Shared provider implementation for live snapshots and recovery replay.
59///
60/// Live instances hold the exact [`ArcSwap`] used by [`crate::SharedGraph`].
61/// Recovery instances accumulate snapshot sections and WAL changes until
62/// [`CoreProvider::finish_recovery`] materializes a [`SeleneGraph`].
63pub struct CoreProvider {
64    inner: Mutex<CoreInner>,
65}
66
67enum CoreInner {
68    Live {
69        snapshot: Arc<ArcSwap<SeleneGraph>>,
70        durable: Option<DurableState>,
71    },
72    Recovery {
73        state: Box<RecoveryState>,
74    },
75}
76
77/// Durable WAL state owned by a live [`CoreProvider`].
78///
79/// The HLC counter is seeded from [`WalWriter::last_sequence`]. On a fresh WAL
80/// this is zero and the first commit receives `HlcTimestamp::new(1, 0)`; after
81/// reopening, the next timestamp advances past the recovered WAL sequence.
82pub struct DurableState {
83    writer: Mutex<WalWriter>,
84    next_hlc: AtomicU64,
85    audit: Option<Mutex<AuditLog>>,
86}
87
88impl DurableState {
89    /// Construct durable state from an already-open WAL writer.
90    #[must_use]
91    pub fn new(writer: WalWriter) -> Self {
92        let last_sequence = writer.last_sequence();
93        Self {
94            writer: Mutex::new(writer),
95            next_hlc: AtomicU64::new(last_sequence),
96            audit: None,
97        }
98    }
99
100    /// Attach an audit log so engine-owned events committed through this
101    /// provider are mirrored to it (Item 7 / D24).
102    ///
103    /// Mirroring is **WAL-first, audit-after**: the WAL append is the source of
104    /// truth and gates the commit; the audit write runs only after it succeeds
105    /// and is best-effort (a failure is logged, never failing the commit). The
106    /// event also remains in the WAL, so a failed mirror degrades to the
107    /// pre-Item-7 WAL-only behavior rather than losing the event. Per the donor
108    /// lesson "audit lag is recoverable, fiction is not," the audit can only lag
109    /// the WAL, never lead it.
110    #[must_use]
111    pub fn with_audit_log(mut self, audit: AuditLog) -> Self {
112        self.audit = Some(Mutex::new(audit));
113        self
114    }
115
116    /// Append one engine-owned event to the attached audit log, if any.
117    ///
118    /// The D24 audit log is the durable "events" surface in the
119    /// snapshot=state / WAL=changes / audit=events split, with retention
120    /// independent of the WAL lineage. Appends are **best-effort and audit-after**:
121    /// the caller stamps the wall clock here, the record is serialized by the
122    /// caller into an opaque `kind`-tagged payload, and an append failure is
123    /// logged and skipped rather than propagated, so the audit can only lag a
124    /// committed change, never lead it (the donor lesson "audit lag is
125    /// recoverable, fiction is not"). Returns `false` when no audit log is
126    /// attached or the append failed.
127    ///
128    /// The pack-lifecycle producer that previously fed this surface was removed
129    /// in the extension teardown; the framework remains wired (persisted,
130    /// reattached on recovery) for future user-action audit events (D24).
131    pub fn append_audit_event(&self, kind: u16, payload: Vec<u8>) -> bool {
132        let Some(audit) = &self.audit else {
133            return false;
134        };
135        let record = AuditRecord {
136            recorded_at_unix_nanos: unix_nanos_now(),
137            kind,
138            payload,
139        };
140        let mut log = audit.lock();
141        match log.append(&record) {
142            Ok(()) => true,
143            Err(error) => {
144                tracing::error!(%error, "audit: failed to append engine event");
145                false
146            }
147        }
148    }
149}
150
/// Nanoseconds elapsed since the Unix epoch according to the system clock.
///
/// Saturates at `u64::MAX` if the count does not fit, and yields `0` when the
/// clock reads earlier than the epoch.
fn unix_nanos_now() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => match u64::try_from(since_epoch.as_nanos()) {
            Ok(nanos) => nanos,
            Err(_) => u64::MAX,
        },
        Err(_) => 0,
    }
}
158
159impl CoreProvider {
160    /// Construct a live provider bound to a shared graph snapshot pointer.
161    #[must_use]
162    pub fn new_for_live(snapshot: Arc<ArcSwap<SeleneGraph>>) -> Arc<Self> {
163        Self::new_for_live_with_wal(snapshot, None)
164    }
165
166    /// Construct a live provider with optional commit-critical WAL state.
167    #[must_use]
168    pub fn new_for_live_with_wal(
169        snapshot: Arc<ArcSwap<SeleneGraph>>,
170        durable: Option<DurableState>,
171    ) -> Arc<Self> {
172        Arc::new(Self {
173            inner: Mutex::new(CoreInner::Live { snapshot, durable }),
174        })
175    }
176
177    /// Construct a recovery-mode provider with an empty accumulator.
178    #[must_use]
179    pub fn new_for_recovery() -> Arc<Self> {
180        Arc::new(Self {
181            inner: Mutex::new(CoreInner::Recovery {
182                state: Box::new(RecoveryState::new()),
183            }),
184        })
185    }
186
187    /// Drain the recovery accumulator into a graph snapshot.
188    ///
189    /// `expected_graph_id` is the caller-asserted graph identity. If a
190    /// snapshot's `CORE/META` was applied and disagrees with this id,
191    /// recovery fails. If no `CORE/META` was applied (WAL-only or empty
192    /// recovery), `expected_graph_id` is used directly with default scalar
193    /// metadata fields.
194    ///
195    /// # Errors
196    ///
197    /// Returns [`crate::GraphError::Provider`] if this provider was
198    /// constructed for live mode, if META disagrees with
199    /// `expected_graph_id`, or if the accumulated section/changelog state
200    /// cannot be materialized into graph columns.
201    pub fn finish_recovery(
202        self: Arc<Self>,
203        expected_graph_id: selene_core::GraphId,
204        expected_bound_type: Option<Arc<crate::graph_types::GraphTypeDef>>,
205    ) -> GraphResult<SeleneGraph> {
206        let mut inner = self.inner.lock();
207        match &mut *inner {
208            CoreInner::Live { .. } => {
209                Err(inconsistent("finish_recovery called on live-mode CoreProvider").into())
210            }
211            CoreInner::Recovery { state } => {
212                let state = std::mem::take(&mut **state);
213                state.into_graph(expected_graph_id, expected_bound_type)
214            }
215        }
216    }
217
218    fn read_section_inner(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError> {
219        let mut inner = self.inner.lock();
220        match &mut *inner {
221            CoreInner::Live { .. } => Err(inconsistent(
222                "read_section called on live-mode CoreProvider",
223            )),
224            CoreInner::Recovery { state } => state.read_section(sub_tag, bytes),
225        }
226    }
227
228    fn write_section_inner(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
229        // The snapshot is an `Arc<ArcSwap<SeleneGraph>>`: `load_full()` clones the
230        // Arc lock-free. Hold `inner` ONLY long enough to grab that Arc, then drop
231        // the guard so the (potentially ≤1 GiB) rkyv encode runs lock-free and
232        // never blocks the commit hot path that shares this Mutex.
233        let graph = {
234            let inner = self.inner.lock();
235            match &*inner {
236                CoreInner::Live { snapshot, .. } => snapshot.load_full(),
237                CoreInner::Recovery { .. } => {
238                    return Err(inconsistent(
239                        "write_section called on recovery-mode CoreProvider",
240                    ));
241                }
242            }
243        };
244        match sub_tag.0 {
245            CORE_GTYP_SUB => encode_graph_types(&graph),
246            CORE_META_SUB => encode_meta(&graph.meta, graph.meta.generation),
247            CORE_NODE_SUB => encode_nodes(&graph),
248            CORE_EDGE_SUB => encode_edges(&graph),
249            CORE_SCMA_SUB => encode_schemas(&graph),
250            CORE_CPIX_SUB => encode_composite_schemas(&graph),
251            CORE_VIDX_SUB => encode_vector_schemas(&graph),
252            CORE_TIDX_SUB => encode_text_schemas(&graph),
253            _ => Err(invalid_sub_tag(sub_tag)),
254        }
255    }
256
257    fn on_change_inner(&self, change: &Change) -> Result<(), ProviderError> {
258        let mut inner = self.inner.lock();
259        match &mut *inner {
260            CoreInner::Live { .. } => Ok(()),
261            CoreInner::Recovery { state } => state.apply_change(change),
262        }
263    }
264}
265
266impl IndexProvider for CoreProvider {
267    fn provider_tag(&self) -> ProviderTag {
268        ProviderTag(CORE_PROVIDER_TAG)
269    }
270
271    fn read_section(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError> {
272        self.read_section_inner(sub_tag, bytes)
273    }
274
275    fn write_section(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
276        self.write_section_inner(sub_tag)
277    }
278
279    fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
280        self.on_change_inner(change)
281    }
282
283    fn declared_sub_tags(&self) -> &[SubTag] {
284        CORE_SUB_TAGS
285    }
286}
287
288impl DurableProvider for CoreProvider {
289    fn provider_tag(&self) -> ProviderTag {
290        ProviderTag(CORE_PROVIDER_TAG)
291    }
292
293    fn next_timestamp(&self) -> HlcTimestamp {
294        let inner = self.inner.lock();
295        match &*inner {
296            CoreInner::Live {
297                durable: Some(durable),
298                ..
299            } => {
300                let seconds = durable
301                    .next_hlc
302                    .fetch_add(1, Ordering::Relaxed)
303                    .saturating_add(1);
304                HlcTimestamp::new(seconds, 0)
305            }
306            CoreInner::Live { durable: None, .. } | CoreInner::Recovery { .. } => {
307                HlcTimestamp::zero()
308            }
309        }
310    }
311
312    fn write_commit(
313        &self,
314        principal: Option<&Arc<[u8]>>,
315        changes: &[Change],
316        timestamp: HlcTimestamp,
317    ) -> Result<u64, ProviderError> {
318        let mut inner = self.inner.lock();
319        match &mut *inner {
320            CoreInner::Live {
321                durable: Some(durable),
322                ..
323            } => {
324                // Cheap refcount bump — the caller already owns the bytes as an
325                // `Arc<[u8]>`; no slice re-allocation or copy.
326                let principal = principal.cloned();
327                // WAL-first: the append gates the commit (its error fails it).
328                let sequence = {
329                    let mut writer = durable.writer.lock();
330                    writer
331                        .append(timestamp, Origin::Local, principal, changes)
332                        .map_err(durable_error)?
333                };
334                Ok(sequence)
335            }
336            CoreInner::Live { durable: None, .. } => Ok(0),
337            CoreInner::Recovery { .. } => Err(inconsistent(
338                "write_commit called on recovery-mode CoreProvider",
339            )),
340        }
341    }
342
343    fn flush(&self) -> Result<Option<u64>, ProviderError> {
344        let mut inner = self.inner.lock();
345        match &mut *inner {
346            CoreInner::Live {
347                durable: Some(durable),
348                ..
349            } => {
350                let mut writer = durable.writer.lock();
351                writer.flush().map_err(durable_error)?;
352                Ok(Some(writer.last_sequence()))
353            }
354            CoreInner::Live { durable: None, .. } | CoreInner::Recovery { .. } => Ok(None),
355        }
356    }
357}
358
359impl RecoveryProvider for CoreProvider {
360    fn provider_tag(&self) -> [u8; 4] {
361        CORE_PROVIDER_TAG
362    }
363
364    fn read_section(&self, sub: [u8; 4], bytes: &[u8]) -> RecoveryResult<()> {
365        self.read_section_inner(SubTag(sub), bytes)
366            .map_err(box_provider_error)
367    }
368
369    // `on_changes` is the sole entry point WAL replay invokes; the per-change
370    // `RecoveryProvider::on_change` default is unused for this provider, so it is
371    // deliberately not overridden.
372    fn on_changes(&self, changes: &[Change]) -> RecoveryResult<()> {
373        for change in changes {
374            self.on_change_inner(change).map_err(box_provider_error)?;
375        }
376        Ok(())
377    }
378}
379
380pub(crate) fn invalid_payload(reason: impl Into<String>) -> ProviderError {
381    ProviderError::InvalidPayload {
382        reason: reason.into(),
383    }
384}
385
386fn durable_error(error: impl std::error::Error) -> ProviderError {
387    ProviderError::SerializationFailed {
388        reason: error.to_string(),
389    }
390}
391
392pub(crate) fn serialization_failed(reason: impl Into<String>) -> ProviderError {
393    ProviderError::SerializationFailed {
394        reason: reason.into(),
395    }
396}
397
398pub(crate) fn inconsistent(reason: impl Into<String>) -> ProviderError {
399    ProviderError::Inconsistent {
400        reason: reason.into(),
401    }
402}
403
404fn invalid_sub_tag(sub_tag: SubTag) -> ProviderError {
405    invalid_payload(format!("unknown CORE sub-tag {sub_tag}"))
406}
407
408fn box_provider_error(error: ProviderError) -> RecoveryError {
409    Box::new(error)
410}
411
412#[cfg(test)]
413#[path = "core_provider/tests.rs"]
414mod tests;