selene_graph/index_provider.rs
1//! Stateful provider protocol for engine-owned derived state.
2//!
3//! `IndexProvider` is the engine-internal hook through which the CORE storage
4//! provider, maintained candidate-state providers, and recovery-side replay
5//! providers participate in snapshot bootstrap and per-commit mutation
6//! observation. selene-db is a single native engine with no loadable
7//! extensions; providers are first-party engine internals, not loadable packs.
8
9use std::fmt;
10
11use selene_core::{Change, DbString};
12
13use crate::{SeleneGraph, VectorCandidateSet};
14
15/// Stable 4-byte ASCII identifier for an [`IndexProvider`] registration.
16///
17/// Reserved tag space per spec 04 section 4.2:
18/// - `CORE` is reserved for engine-owned snapshot sections.
19/// - `META`/`NODE`/`EDGE`/`SCMA` are reserved sub-tags under `CORE`, not
20/// provider tags.
21/// - First-party provider allocations include `CSET`, `TIMS`, `GRPR`.
22/// - Other ASCII uppercase 4-byte sequences are provider-allocated.
23#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
24pub struct ProviderTag(
25 /// Raw 4-byte provider tag.
26 pub [u8; 4],
27);
28
29impl fmt::Display for ProviderTag {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 fmt_tag(self.0, f)
32 }
33}
34
35/// 4-byte subsection identifier within a provider's snapshot footprint.
36#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
37pub struct SubTag(
38 /// Raw 4-byte provider-local subsection tag.
39 pub [u8; 4],
40);
41
42impl fmt::Display for SubTag {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 fmt_tag(self.0, f)
45 }
46}
47
48/// Stateful hook for engine-owned derived-state participation.
49///
50/// [`IndexProvider::read_section`] and [`IndexProvider::on_change`] take
51/// `&self`: `selene-graph` stores providers as `Arc<dyn IndexProvider>`, so
52/// providers use interior mutability for owned state. The engine guarantees
53/// serialized calls per graph.
54///
55/// ## Change-shape contract
56///
57/// Runtime commit fan-out observes the post-commit graph snapshot. When a
58/// mutation stages per-row tombstone expansions for
59/// [`Change::NodesOfTypeTruncated`], [`Change::EdgesOfTypeTruncated`], or
60/// [`Change::GraphReset`], live fan-out receives the expanded
61/// `NodeDeleted`/`EdgeDeleted` view instead of the persisted declarative change.
62/// WAL replay is different: recovery drives providers with the exact committed
63/// `Change` payloads after CORE applies them. Providers whose derived state
64/// depends on deleted rows must therefore handle the declarative truncate/reset
65/// variants or rebuild their state from the recovered graph before serving
66/// reads.
67///
68/// ## Re-entrancy contract
69///
70/// `on_change` MUST NOT initiate a write transaction on the same graph,
71/// directly or indirectly. The engine detects same-thread re-entry into
72/// `SharedGraph::begin_write` and panics with a clear message; the panic
73/// is caught by the outer `notify_providers` boundary so the outer commit
74/// still completes.
75///
76/// **Cross-thread re-entry is documented misuse.** A provider whose
77/// `on_change` spawns a worker thread, calls `begin_write` on that worker,
78/// and waits for the worker (e.g., `JoinHandle::join`, channel `recv`) is a
79/// circular wait the engine cannot detect: the worker blocks on the held
80/// write lock; the outer `on_change` blocks waiting for the worker; the
81/// outer commit cannot release the lock until `on_change` returns.
82/// Provider authors who spawn threads inside `on_change` must not block
83/// the callback on those threads' progress.
84pub trait IndexProvider: Send + Sync + 'static {
85 /// Stable 4-byte ASCII tag uniquely identifying this provider.
86 fn provider_tag(&self) -> ProviderTag;
87
88 /// Snapshot bootstrap for one provider-owned section.
89 ///
90 /// # Errors
91 ///
92 /// Returns [`ProviderError`] when the payload is missing, malformed, or
93 /// inconsistent with provider state.
94 fn read_section(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError>;
95
96 /// Snapshot publish for one provider-owned section.
97 ///
98 /// # Errors
99 ///
100 /// Returns [`ProviderError`] when serialization cannot produce a stable
101 /// section payload.
102 fn write_section(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError>;
103
104 /// Observe one committed mutation.
105 ///
106 /// # Errors
107 ///
108 /// Returns [`ProviderError`] for provider-local failures. Live commits log
109 /// and continue after these errors because the graph snapshot has already
110 /// been published.
111 fn on_change(&self, change: &Change) -> Result<(), ProviderError>;
112
113 /// Return true when this provider wants one callback per committed change batch.
114 ///
115 /// The default is `false`, preserving per-change panic/error isolation for
116 /// simple observers. Providers that maintain state whose invariants span
117 /// several changes in one commit can opt in and override [`Self::on_changes`]
118 /// to apply the batch under one internal lock.
119 fn handles_change_batches(&self) -> bool {
120 false
121 }
122
123 /// Observe all changes from one committed mutation batch.
124 ///
125 /// The default delegates to [`Self::on_change`] in order. Live fanout calls
126 /// this method only when [`Self::handles_change_batches`] returns true;
127 /// recovery may call it for all providers because no concurrent readers can
128 /// observe recovery-side intermediate state.
129 ///
130 /// # Errors
131 ///
132 /// Returns [`ProviderError`] for provider-local failures.
133 fn on_changes(&self, changes: &[Change]) -> Result<(), ProviderError> {
134 for change in changes {
135 self.on_change(change)?;
136 }
137 Ok(())
138 }
139
140 /// Rebuild provider-owned derived state from a recovered graph snapshot.
141 ///
142 /// Recovery calls this when a provider declares persisted snapshot sections,
143 /// a graph snapshot was applied, and that snapshot did not contain this
144 /// provider's section. Providers that can derive their complete state from
145 /// CORE graph rows should override this method.
146 ///
147 /// # Errors
148 ///
149 /// Returns [`ProviderError`] when the provider cannot rebuild safely.
150 fn rebuild_from_graph(&self, _graph: &SeleneGraph) -> Result<(), ProviderError> {
151 Err(ProviderError::Inconsistent {
152 reason: format!(
153 "provider {} has persisted sections but does not support graph rebuild",
154 self.provider_tag()
155 ),
156 })
157 }
158
159 /// Observe that every change for a committed graph generation was applied.
160 ///
161 /// Live fan-out calls this only after the provider's mutation callback path
162 /// completed without returning an error or panicking. Providers that expose
163 /// read-side state tied to graph generations can use this as their
164 /// visibility watermark.
165 ///
166 /// # Errors
167 ///
168 /// Returns [`ProviderError`] for provider-local failures.
169 fn on_commit_applied(&self, _generation: u64) -> Result<(), ProviderError> {
170 Ok(())
171 }
172
173 /// Return a provider-owned vector candidate set for `name` at `generation`.
174 ///
175 /// Providers that do not own named vector candidate state return `Ok(None)`.
176 /// A provider that owns the name but cannot prove the requested generation
177 /// should return an error instead of serving stale derived state.
178 ///
179 /// # Errors
180 ///
181 /// Returns [`ProviderError`] when the named state exists but is stale or
182 /// internally inconsistent.
183 fn vector_candidate_set(
184 &self,
185 _name: &DbString,
186 _generation: u64,
187 ) -> Result<Option<VectorCandidateSet>, ProviderError> {
188 Ok(None)
189 }
190
191 /// Return provider-owned vector candidate-state descriptors at `generation`.
192 ///
193 /// Providers that do not own named vector candidate state return an empty
194 /// vector. A provider that owns candidate state but cannot prove the
195 /// requested generation should return an error instead of exposing stale
196 /// metadata.
197 ///
198 /// # Errors
199 ///
200 /// Returns [`ProviderError`] when provider metadata is stale or internally
201 /// inconsistent.
202 fn vector_candidate_state_infos(
203 &self,
204 _generation: u64,
205 ) -> Result<Vec<VectorCandidateStateInfo>, ProviderError> {
206 Ok(Vec::new())
207 }
208
209 /// Provider-owned snapshot subsection tags.
210 ///
211 /// Empty means the provider consumes mutation events but owns no persisted
212 /// snapshot state.
213 fn declared_sub_tags(&self) -> &[SubTag];
214}
215
216/// Metadata for one provider-owned vector candidate state.
217#[derive(Clone, Debug, Eq, PartialEq)]
218pub struct VectorCandidateStateInfo {
219 /// Stable set name used by callers to retrieve candidates.
220 pub name: DbString,
221 /// Graph generation the descriptor was derived from.
222 pub generation: u64,
223 /// Number of nodes currently in the candidate set.
224 pub candidate_count: usize,
225 /// Optional node label required for membership.
226 pub required_label: Option<DbString>,
227 /// Outgoing edge labels required on the source node.
228 pub require_outgoing: Vec<DbString>,
229 /// Incoming edge labels required on the target node.
230 pub require_incoming: Vec<DbString>,
231 /// Outgoing edge labels that disqualify the source node.
232 pub exclude_outgoing: Vec<DbString>,
233 /// Incoming edge labels that disqualify the target node.
234 pub exclude_incoming: Vec<DbString>,
235}
236
237/// Errors returned by [`IndexProvider`] implementations.
238#[derive(Debug, thiserror::Error, miette::Diagnostic)]
239#[non_exhaustive]
240pub enum ProviderError {
241 /// Provider payload could not be decoded or validated.
242 #[error("invalid provider payload: {reason}")]
243 #[diagnostic(code(SLENE_G_010))]
244 InvalidPayload {
245 /// Human-readable provider failure reason.
246 reason: String,
247 },
248
249 /// Provider state could not be serialized.
250 #[error("provider serialization failed: {reason}")]
251 #[diagnostic(code(SLENE_G_012))]
252 SerializationFailed {
253 /// Human-readable serialization failure reason.
254 reason: String,
255 },
256
257 /// Provider state or registration is inconsistent.
258 #[error("provider state inconsistency: {reason}")]
259 #[diagnostic(code(SLENE_G_014))]
260 Inconsistent {
261 /// Human-readable inconsistency reason.
262 reason: String,
263 },
264}
265
266fn fmt_tag(bytes: [u8; 4], f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 if bytes.iter().all(|byte| byte.is_ascii_graphic()) {
268 for byte in bytes {
269 f.write_str(char::from(byte).encode_utf8(&mut [0; 4]))?;
270 }
271 Ok(())
272 } else {
273 write!(
274 f,
275 "0x{:02X}{:02X}{:02X}{:02X}",
276 bytes[0], bytes[1], bytes[2], bytes[3]
277 )
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use parking_lot::Mutex;
284 use rstest::rstest;
285 use selene_core::{LabelSet, NodeId, PropertyMap};
286
287 use super::*;
288 use crate::{GraphError, GraphResult};
289
290 struct RecordingProvider {
291 tag: ProviderTag,
292 changes: Mutex<Vec<Change>>,
293 }
294
295 impl RecordingProvider {
296 fn new(tag: ProviderTag) -> Self {
297 Self {
298 tag,
299 changes: Mutex::new(Vec::new()),
300 }
301 }
302 }
303
304 impl IndexProvider for RecordingProvider {
305 fn provider_tag(&self) -> ProviderTag {
306 self.tag
307 }
308
309 fn read_section(&self, _sub_tag: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
310 Ok(())
311 }
312
313 fn write_section(&self, _sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
314 Ok(Vec::new())
315 }
316
317 fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
318 self.changes.lock().push(change.clone());
319 Ok(())
320 }
321
322 fn declared_sub_tags(&self) -> &[SubTag] {
323 &[]
324 }
325 }
326
327 fn assert_send_sync_static<T: Send + Sync + 'static>() {}
328
329 #[test]
330 fn provider_tag_equality_and_ordering() {
331 let demo = ProviderTag(*b"DEMO");
332 let meta = ProviderTag(*b"META");
333 assert_eq!(demo, ProviderTag(*b"DEMO"));
334 assert!(demo < meta);
335 assert_eq!(demo.to_string(), "DEMO");
336 }
337
338 #[test]
339 fn sub_tag_equality_and_ordering() {
340 let graph = SubTag(*b"GRPH");
341 let subt = SubTag(*b"SUBT");
342 assert_eq!(graph, SubTag(*b"GRPH"));
343 assert!(graph < subt);
344 assert_eq!(graph.to_string(), "GRPH");
345 }
346
347 #[rstest]
348 #[case(ProviderError::InvalidPayload { reason: "bad".to_owned() })]
349 #[case(ProviderError::SerializationFailed { reason: "io".to_owned() })]
350 #[case(ProviderError::Inconsistent { reason: "duplicate".to_owned() })]
351 fn provider_error_gqlstatus_mappings(#[case] provider_error: ProviderError) {
352 let graph_error = GraphError::Provider(provider_error);
353 assert_eq!(graph_error.gqlstatus(), "5GQL0");
354 }
355
356 #[test]
357 fn dummy_provider_with_interior_mutability() -> GraphResult<()> {
358 assert_send_sync_static::<RecordingProvider>();
359 let provider = RecordingProvider::new(ProviderTag(*b"TEST"));
360 provider.on_change(&Change::NodeCreated {
361 id: NodeId::new(1),
362 labels: LabelSet::new(),
363 properties: PropertyMap::new(),
364 })?;
365 assert_eq!(provider.changes.lock().len(), 1);
366 assert_eq!(provider.provider_tag(), ProviderTag(*b"TEST"));
367 Ok(())
368 }
369}