Skip to main content

selene_graph/
index_provider.rs

1//! Stateful provider protocol for engine-owned derived state.
2//!
3//! `IndexProvider` is the engine-internal hook through which the CORE storage
4//! provider, maintained candidate-state providers, and recovery-side replay
5//! providers participate in snapshot bootstrap and per-commit mutation
6//! observation. selene-db is a single native engine with no loadable
7//! extensions; providers are first-party engine internals, not loadable packs.
8
9use std::fmt;
10
11use selene_core::{Change, DbString};
12
13use crate::{SeleneGraph, VectorCandidateSet};
14
/// Stable 4-byte ASCII identifier naming one [`IndexProvider`] registration.
///
/// Tag space is partitioned per spec 04 section 4.2:
/// - `CORE` is held back for snapshot sections owned by the engine itself.
/// - `META`, `NODE`, `EDGE`, and `SCMA` are sub-tags nested under `CORE`;
///   they are never valid provider tags.
/// - `CSET`, `TIMS`, and `GRPR` are already allocated to first-party providers.
/// - Any other 4-byte ASCII uppercase sequence is free for provider allocation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProviderTag(
    /// The tag's raw 4 bytes.
    pub [u8; 4],
);
28
29impl fmt::Display for ProviderTag {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        fmt_tag(self.0, f)
32    }
33}
34
/// 4-byte identifier for one subsection inside a provider's snapshot footprint.
///
/// Sub-tags are scoped to their owning provider, so two providers may reuse
/// the same bytes without colliding.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubTag(
    /// The subsection tag's raw 4 bytes, local to the owning provider.
    pub [u8; 4],
);
41
42impl fmt::Display for SubTag {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        fmt_tag(self.0, f)
45    }
46}
47
48/// Stateful hook for engine-owned derived-state participation.
49///
50/// [`IndexProvider::read_section`] and [`IndexProvider::on_change`] take
51/// `&self`: `selene-graph` stores providers as `Arc<dyn IndexProvider>`, so
52/// providers use interior mutability for owned state. The engine guarantees
53/// serialized calls per graph.
54///
55/// ## Change-shape contract
56///
57/// Runtime commit fan-out observes the post-commit graph snapshot. When a
58/// mutation stages per-row tombstone expansions for
59/// [`Change::NodesOfTypeTruncated`], [`Change::EdgesOfTypeTruncated`], or
60/// [`Change::GraphReset`], live fan-out receives the expanded
61/// `NodeDeleted`/`EdgeDeleted` view instead of the persisted declarative change.
62/// WAL replay is different: recovery drives providers with the exact committed
63/// `Change` payloads after CORE applies them. Providers whose derived state
64/// depends on deleted rows must therefore handle the declarative truncate/reset
65/// variants or rebuild their state from the recovered graph before serving
66/// reads.
67///
68/// ## Re-entrancy contract
69///
70/// `on_change` MUST NOT initiate a write transaction on the same graph,
71/// directly or indirectly. The engine detects same-thread re-entry into
72/// `SharedGraph::begin_write` and panics with a clear message; the panic
73/// is caught by the outer `notify_providers` boundary so the outer commit
74/// still completes.
75///
76/// **Cross-thread re-entry is documented misuse.** A provider whose
77/// `on_change` spawns a worker thread, calls `begin_write` on that worker,
78/// and waits for the worker (e.g., `JoinHandle::join`, channel `recv`) is a
79/// circular wait the engine cannot detect: the worker blocks on the held
80/// write lock; the outer `on_change` blocks waiting for the worker; the
81/// outer commit cannot release the lock until `on_change` returns.
82/// Provider authors who spawn threads inside `on_change` must not block
83/// the callback on those threads' progress.
84pub trait IndexProvider: Send + Sync + 'static {
85    /// Stable 4-byte ASCII tag uniquely identifying this provider.
86    fn provider_tag(&self) -> ProviderTag;
87
88    /// Snapshot bootstrap for one provider-owned section.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`ProviderError`] when the payload is missing, malformed, or
93    /// inconsistent with provider state.
94    fn read_section(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError>;
95
96    /// Snapshot publish for one provider-owned section.
97    ///
98    /// # Errors
99    ///
100    /// Returns [`ProviderError`] when serialization cannot produce a stable
101    /// section payload.
102    fn write_section(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError>;
103
104    /// Observe one committed mutation.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`ProviderError`] for provider-local failures. Live commits log
109    /// and continue after these errors because the graph snapshot has already
110    /// been published.
111    fn on_change(&self, change: &Change) -> Result<(), ProviderError>;
112
113    /// Return true when this provider wants one callback per committed change batch.
114    ///
115    /// The default is `false`, preserving per-change panic/error isolation for
116    /// simple observers. Providers that maintain state whose invariants span
117    /// several changes in one commit can opt in and override [`Self::on_changes`]
118    /// to apply the batch under one internal lock.
119    fn handles_change_batches(&self) -> bool {
120        false
121    }
122
123    /// Observe all changes from one committed mutation batch.
124    ///
125    /// The default delegates to [`Self::on_change`] in order. Live fanout calls
126    /// this method only when [`Self::handles_change_batches`] returns true;
127    /// recovery may call it for all providers because no concurrent readers can
128    /// observe recovery-side intermediate state.
129    ///
130    /// # Errors
131    ///
132    /// Returns [`ProviderError`] for provider-local failures.
133    fn on_changes(&self, changes: &[Change]) -> Result<(), ProviderError> {
134        for change in changes {
135            self.on_change(change)?;
136        }
137        Ok(())
138    }
139
140    /// Rebuild provider-owned derived state from a recovered graph snapshot.
141    ///
142    /// Recovery calls this when a provider declares persisted snapshot sections,
143    /// a graph snapshot was applied, and that snapshot did not contain this
144    /// provider's section. Providers that can derive their complete state from
145    /// CORE graph rows should override this method.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`ProviderError`] when the provider cannot rebuild safely.
150    fn rebuild_from_graph(&self, _graph: &SeleneGraph) -> Result<(), ProviderError> {
151        Err(ProviderError::Inconsistent {
152            reason: format!(
153                "provider {} has persisted sections but does not support graph rebuild",
154                self.provider_tag()
155            ),
156        })
157    }
158
159    /// Observe that every change for a committed graph generation was applied.
160    ///
161    /// Live fan-out calls this only after the provider's mutation callback path
162    /// completed without returning an error or panicking. Providers that expose
163    /// read-side state tied to graph generations can use this as their
164    /// visibility watermark.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`ProviderError`] for provider-local failures.
169    fn on_commit_applied(&self, _generation: u64) -> Result<(), ProviderError> {
170        Ok(())
171    }
172
173    /// Return a provider-owned vector candidate set for `name` at `generation`.
174    ///
175    /// Providers that do not own named vector candidate state return `Ok(None)`.
176    /// A provider that owns the name but cannot prove the requested generation
177    /// should return an error instead of serving stale derived state.
178    ///
179    /// # Errors
180    ///
181    /// Returns [`ProviderError`] when the named state exists but is stale or
182    /// internally inconsistent.
183    fn vector_candidate_set(
184        &self,
185        _name: &DbString,
186        _generation: u64,
187    ) -> Result<Option<VectorCandidateSet>, ProviderError> {
188        Ok(None)
189    }
190
191    /// Return provider-owned vector candidate-state descriptors at `generation`.
192    ///
193    /// Providers that do not own named vector candidate state return an empty
194    /// vector. A provider that owns candidate state but cannot prove the
195    /// requested generation should return an error instead of exposing stale
196    /// metadata.
197    ///
198    /// # Errors
199    ///
200    /// Returns [`ProviderError`] when provider metadata is stale or internally
201    /// inconsistent.
202    fn vector_candidate_state_infos(
203        &self,
204        _generation: u64,
205    ) -> Result<Vec<VectorCandidateStateInfo>, ProviderError> {
206        Ok(Vec::new())
207    }
208
209    /// Provider-owned snapshot subsection tags.
210    ///
211    /// Empty means the provider consumes mutation events but owns no persisted
212    /// snapshot state.
213    fn declared_sub_tags(&self) -> &[SubTag];
214}
215
216/// Metadata for one provider-owned vector candidate state.
217#[derive(Clone, Debug, Eq, PartialEq)]
218pub struct VectorCandidateStateInfo {
219    /// Stable set name used by callers to retrieve candidates.
220    pub name: DbString,
221    /// Graph generation the descriptor was derived from.
222    pub generation: u64,
223    /// Number of nodes currently in the candidate set.
224    pub candidate_count: usize,
225    /// Optional node label required for membership.
226    pub required_label: Option<DbString>,
227    /// Outgoing edge labels required on the source node.
228    pub require_outgoing: Vec<DbString>,
229    /// Incoming edge labels required on the target node.
230    pub require_incoming: Vec<DbString>,
231    /// Outgoing edge labels that disqualify the source node.
232    pub exclude_outgoing: Vec<DbString>,
233    /// Incoming edge labels that disqualify the target node.
234    pub exclude_incoming: Vec<DbString>,
235}
236
237/// Errors returned by [`IndexProvider`] implementations.
238#[derive(Debug, thiserror::Error, miette::Diagnostic)]
239#[non_exhaustive]
240pub enum ProviderError {
241    /// Provider payload could not be decoded or validated.
242    #[error("invalid provider payload: {reason}")]
243    #[diagnostic(code(SLENE_G_010))]
244    InvalidPayload {
245        /// Human-readable provider failure reason.
246        reason: String,
247    },
248
249    /// Provider state could not be serialized.
250    #[error("provider serialization failed: {reason}")]
251    #[diagnostic(code(SLENE_G_012))]
252    SerializationFailed {
253        /// Human-readable serialization failure reason.
254        reason: String,
255    },
256
257    /// Provider state or registration is inconsistent.
258    #[error("provider state inconsistency: {reason}")]
259    #[diagnostic(code(SLENE_G_014))]
260    Inconsistent {
261        /// Human-readable inconsistency reason.
262        reason: String,
263    },
264}
265
/// Render a 4-byte tag for the `Display` impls of [`ProviderTag`] and
/// [`SubTag`].
///
/// A tag made entirely of printable, non-space ASCII bytes
/// (`u8::is_ascii_graphic`) is written verbatim, e.g. `CORE`. Any other byte
/// pattern is written as an uppercase big-endian hex literal, e.g.
/// `0x00417FFF`. Control bytes, spaces, DEL, and non-ASCII bytes therefore
/// never reach logs or error messages raw.
///
/// Both forms go through [`fmt::Formatter::pad`], so width, fill, alignment,
/// and precision flags (e.g. `{:>6}`) are honored just as they are for `str`.
fn fmt_tag(bytes: [u8; 4], f: &mut fmt::Formatter<'_>) -> fmt::Result {
    match std::str::from_utf8(&bytes) {
        // Printable ASCII decodes one byte per char, so the tag can be shown
        // as-is.
        Ok(text) if text.bytes().all(|byte| byte.is_ascii_graphic()) => f.pad(text),
        // Anything else (invalid UTF-8 or a non-graphic byte) falls back to
        // hex. `from_be_bytes` keeps byte 0 as the most significant pair,
        // matching the original per-byte `{:02X}` layout.
        _ => f.pad(&format!("0x{:08X}", u32::from_be_bytes(bytes))),
    }
}
280
#[cfg(test)]
mod tests {
    use parking_lot::Mutex;
    use rstest::rstest;
    use selene_core::{LabelSet, NodeId, PropertyMap};

    use super::*;
    use crate::{GraphError, GraphResult};

    /// Minimal provider that records every change delivered to `on_change`
    /// and otherwise relies on the trait's default hooks.
    struct RecordingProvider {
        tag: ProviderTag,
        changes: Mutex<Vec<Change>>,
    }

    impl RecordingProvider {
        fn new(tag: ProviderTag) -> Self {
            Self {
                tag,
                changes: Mutex::new(Vec::new()),
            }
        }
    }

    impl IndexProvider for RecordingProvider {
        fn provider_tag(&self) -> ProviderTag {
            self.tag
        }

        fn read_section(&self, _sub_tag: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
            Ok(())
        }

        fn write_section(&self, _sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
            Ok(Vec::new())
        }

        fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
            self.changes.lock().push(change.clone());
            Ok(())
        }

        fn declared_sub_tags(&self) -> &[SubTag] {
            &[]
        }
    }

    /// Build an unlabeled, property-less `NodeCreated` change for `id`.
    fn node_created(id: NodeId) -> Change {
        Change::NodeCreated {
            id,
            labels: LabelSet::new(),
            properties: PropertyMap::new(),
        }
    }

    fn assert_send_sync_static<T: Send + Sync + 'static>() {}

    #[test]
    fn provider_tag_equality_and_ordering() {
        let demo = ProviderTag(*b"DEMO");
        let meta = ProviderTag(*b"META");
        assert_eq!(demo, ProviderTag(*b"DEMO"));
        assert!(demo < meta);
        assert_eq!(demo.to_string(), "DEMO");
    }

    #[test]
    fn sub_tag_equality_and_ordering() {
        let graph = SubTag(*b"GRPH");
        let subt = SubTag(*b"SUBT");
        assert_eq!(graph, SubTag(*b"GRPH"));
        assert!(graph < subt);
        assert_eq!(graph.to_string(), "GRPH");
    }

    #[test]
    fn non_graphic_tags_display_as_hex() {
        // One non-graphic byte (NUL, DEL, 0xFF, or space) forces the whole
        // tag into the hex form.
        assert_eq!(ProviderTag([0x00, 0x41, 0x7F, 0xFF]).to_string(), "0x00417FFF");
        assert_eq!(SubTag(*b"AB C").to_string(), "0x41422043");
    }

    #[rstest]
    #[case(ProviderError::InvalidPayload { reason: "bad".to_owned() })]
    #[case(ProviderError::SerializationFailed { reason: "io".to_owned() })]
    #[case(ProviderError::Inconsistent { reason: "duplicate".to_owned() })]
    fn provider_error_gqlstatus_mappings(#[case] provider_error: ProviderError) {
        let graph_error = GraphError::Provider(provider_error);
        assert_eq!(graph_error.gqlstatus(), "5GQL0");
    }

    #[test]
    fn dummy_provider_with_interior_mutability() -> GraphResult<()> {
        assert_send_sync_static::<RecordingProvider>();
        let provider = RecordingProvider::new(ProviderTag(*b"TEST"));
        provider.on_change(&node_created(NodeId::new(1)))?;
        assert_eq!(provider.changes.lock().len(), 1);
        assert_eq!(provider.provider_tag(), ProviderTag(*b"TEST"));
        Ok(())
    }

    #[test]
    fn default_hooks_delegate_and_stay_inert() -> GraphResult<()> {
        let provider = RecordingProvider::new(ProviderTag(*b"TEST"));
        assert!(!provider.handles_change_batches());

        // The default `on_changes` forwards every change to `on_change`.
        provider.on_changes(&[node_created(NodeId::new(1)), node_created(NodeId::new(2))])?;
        assert_eq!(provider.changes.lock().len(), 2);

        // The remaining defaults are no-ops for providers without vector state.
        provider.on_commit_applied(7)?;
        assert!(provider.vector_candidate_state_infos(7)?.is_empty());
        Ok(())
    }
}