1mod recovery_state;
4mod sections;
5
6use std::sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9};
10
11use arc_swap::ArcSwap;
12use parking_lot::Mutex;
13use selene_core::{Change, HlcTimestamp, Origin};
14use selene_persist::{
15 AuditLog, AuditRecord, RecoveryError, RecoveryProvider, RecoveryResult, WalWriter,
16};
17
18use crate::core_provider::recovery_state::RecoveryState;
19use crate::core_provider::sections::{
20 encode_composite_schemas, encode_edges, encode_graph_types, encode_meta, encode_nodes,
21 encode_schemas, encode_text_schemas, encode_vector_schemas,
22};
23use crate::durable_provider::DurableProvider;
24use crate::error::GraphResult;
25use crate::graph::SeleneGraph;
26use crate::index_provider::{IndexProvider, ProviderError, ProviderTag, SubTag};
27
/// Four-byte provider tag reported by `CoreProvider` through all three provider traits.
pub const CORE_PROVIDER_TAG: [u8; 4] = *b"CORE";
/// Sub-tag of the section written by `encode_meta`.
pub const CORE_META_SUB: [u8; 4] = *b"META";
/// Sub-tag of the section written by `encode_graph_types`.
pub const CORE_GTYP_SUB: [u8; 4] = *b"GTYP";
/// Sub-tag of the section written by `encode_nodes`.
pub const CORE_NODE_SUB: [u8; 4] = *b"NODE";
/// Sub-tag of the section written by `encode_edges`.
pub const CORE_EDGE_SUB: [u8; 4] = *b"EDGE";
/// Sub-tag of the section written by `encode_schemas`.
pub const CORE_SCMA_SUB: [u8; 4] = *b"SCMA";
/// Sub-tag of the section written by `encode_composite_schemas`.
pub const CORE_CPIX_SUB: [u8; 4] = *b"CPIX";
/// Sub-tag of the section written by `encode_vector_schemas`.
pub const CORE_VIDX_SUB: [u8; 4] = *b"VIDX";
/// Sub-tag of the section written by `encode_text_schemas`.
pub const CORE_TIDX_SUB: [u8; 4] = *b"TIDX";
46
/// Every sub-tag `CoreProvider` declares; returned verbatim by
/// `IndexProvider::declared_sub_tags`.
///
/// NOTE(review): `GTYP` is listed ahead of `META`, unlike the order in which the
/// constants are declared. Presumably the order is significant to whoever walks
/// this list — confirm before reordering.
const CORE_SUB_TAGS: &[SubTag] = &[
    SubTag(CORE_GTYP_SUB),
    SubTag(CORE_META_SUB),
    SubTag(CORE_NODE_SUB),
    SubTag(CORE_EDGE_SUB),
    SubTag(CORE_SCMA_SUB),
    SubTag(CORE_CPIX_SUB),
    SubTag(CORE_VIDX_SUB),
    SubTag(CORE_TIDX_SUB),
];
57
/// Provider for the `CORE` sections of a graph.
///
/// A provider is always in exactly one of the two `CoreInner` modes: live
/// (serves section writes from a shared graph snapshot and, optionally, appends
/// commits to a WAL) or recovery (accepts section reads and replayed changes
/// until `finish_recovery` turns them into a graph).
pub struct CoreProvider {
    /// Current mode plus its state. Every mode-dependent method locks this
    /// mutex before looking at the mode.
    inner: Mutex<CoreInner>,
}
66
/// The two mutually exclusive operating modes of a `CoreProvider`.
enum CoreInner {
    /// Normal operation: sections are encoded from the current snapshot.
    Live {
        /// Shared handle to the graph; `write_section` loads the current value
        /// from it before encoding.
        snapshot: Arc<ArcSwap<SeleneGraph>>,
        /// WAL (and optional audit) state. When `None`, `next_timestamp`
        /// returns the zero timestamp, `write_commit` returns `Ok(0)` and
        /// `flush` returns `Ok(None)`.
        durable: Option<DurableState>,
    },
    /// Recovery: sections and changes are fed into `state` until
    /// `finish_recovery` converts it into a graph.
    Recovery {
        /// Accumulated recovery state, boxed so the enum stays small.
        state: Box<RecoveryState>,
    },
}
76
/// Durability state attached to a live-mode `CoreProvider`.
pub struct DurableState {
    /// WAL writer; locked for each append and each flush.
    writer: Mutex<WalWriter>,
    /// Counter behind `next_timestamp`. Seeded from the writer's
    /// `last_sequence()` in `DurableState::new`.
    next_hlc: AtomicU64,
    /// Optional audit log; `append_audit_event` returns `false` while this is
    /// `None`.
    audit: Option<Mutex<AuditLog>>,
}
87
88impl DurableState {
89 #[must_use]
91 pub fn new(writer: WalWriter) -> Self {
92 let last_sequence = writer.last_sequence();
93 Self {
94 writer: Mutex::new(writer),
95 next_hlc: AtomicU64::new(last_sequence),
96 audit: None,
97 }
98 }
99
100 #[must_use]
111 pub fn with_audit_log(mut self, audit: AuditLog) -> Self {
112 self.audit = Some(Mutex::new(audit));
113 self
114 }
115
116 pub fn append_audit_event(&self, kind: u16, payload: Vec<u8>) -> bool {
132 let Some(audit) = &self.audit else {
133 return false;
134 };
135 let record = AuditRecord {
136 recorded_at_unix_nanos: unix_nanos_now(),
137 kind,
138 payload,
139 };
140 let mut log = audit.lock();
141 match log.append(&record) {
142 Ok(()) => true,
143 Err(error) => {
144 tracing::error!(%error, "audit: failed to append engine event");
145 false
146 }
147 }
148 }
149}
150
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch and
/// `u64::MAX` when the nanosecond count does not fit in a `u64`.
fn unix_nanos_now() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
158
159impl CoreProvider {
160 #[must_use]
162 pub fn new_for_live(snapshot: Arc<ArcSwap<SeleneGraph>>) -> Arc<Self> {
163 Self::new_for_live_with_wal(snapshot, None)
164 }
165
166 #[must_use]
168 pub fn new_for_live_with_wal(
169 snapshot: Arc<ArcSwap<SeleneGraph>>,
170 durable: Option<DurableState>,
171 ) -> Arc<Self> {
172 Arc::new(Self {
173 inner: Mutex::new(CoreInner::Live { snapshot, durable }),
174 })
175 }
176
177 #[must_use]
179 pub fn new_for_recovery() -> Arc<Self> {
180 Arc::new(Self {
181 inner: Mutex::new(CoreInner::Recovery {
182 state: Box::new(RecoveryState::new()),
183 }),
184 })
185 }
186
187 pub fn finish_recovery(
202 self: Arc<Self>,
203 expected_graph_id: selene_core::GraphId,
204 expected_bound_type: Option<Arc<crate::graph_types::GraphTypeDef>>,
205 ) -> GraphResult<SeleneGraph> {
206 let mut inner = self.inner.lock();
207 match &mut *inner {
208 CoreInner::Live { .. } => {
209 Err(inconsistent("finish_recovery called on live-mode CoreProvider").into())
210 }
211 CoreInner::Recovery { state } => {
212 let state = std::mem::take(&mut **state);
213 state.into_graph(expected_graph_id, expected_bound_type)
214 }
215 }
216 }
217
218 fn read_section_inner(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError> {
219 let mut inner = self.inner.lock();
220 match &mut *inner {
221 CoreInner::Live { .. } => Err(inconsistent(
222 "read_section called on live-mode CoreProvider",
223 )),
224 CoreInner::Recovery { state } => state.read_section(sub_tag, bytes),
225 }
226 }
227
228 fn write_section_inner(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
229 let graph = {
234 let inner = self.inner.lock();
235 match &*inner {
236 CoreInner::Live { snapshot, .. } => snapshot.load_full(),
237 CoreInner::Recovery { .. } => {
238 return Err(inconsistent(
239 "write_section called on recovery-mode CoreProvider",
240 ));
241 }
242 }
243 };
244 match sub_tag.0 {
245 CORE_GTYP_SUB => encode_graph_types(&graph),
246 CORE_META_SUB => encode_meta(&graph.meta, graph.meta.generation),
247 CORE_NODE_SUB => encode_nodes(&graph),
248 CORE_EDGE_SUB => encode_edges(&graph),
249 CORE_SCMA_SUB => encode_schemas(&graph),
250 CORE_CPIX_SUB => encode_composite_schemas(&graph),
251 CORE_VIDX_SUB => encode_vector_schemas(&graph),
252 CORE_TIDX_SUB => encode_text_schemas(&graph),
253 _ => Err(invalid_sub_tag(sub_tag)),
254 }
255 }
256
257 fn on_change_inner(&self, change: &Change) -> Result<(), ProviderError> {
258 let mut inner = self.inner.lock();
259 match &mut *inner {
260 CoreInner::Live { .. } => Ok(()),
261 CoreInner::Recovery { state } => state.apply_change(change),
262 }
263 }
264}
265
266impl IndexProvider for CoreProvider {
267 fn provider_tag(&self) -> ProviderTag {
268 ProviderTag(CORE_PROVIDER_TAG)
269 }
270
271 fn read_section(&self, sub_tag: SubTag, bytes: &[u8]) -> Result<(), ProviderError> {
272 self.read_section_inner(sub_tag, bytes)
273 }
274
275 fn write_section(&self, sub_tag: SubTag) -> Result<Vec<u8>, ProviderError> {
276 self.write_section_inner(sub_tag)
277 }
278
279 fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
280 self.on_change_inner(change)
281 }
282
283 fn declared_sub_tags(&self) -> &[SubTag] {
284 CORE_SUB_TAGS
285 }
286}
287
288impl DurableProvider for CoreProvider {
289 fn provider_tag(&self) -> ProviderTag {
290 ProviderTag(CORE_PROVIDER_TAG)
291 }
292
293 fn next_timestamp(&self) -> HlcTimestamp {
294 let inner = self.inner.lock();
295 match &*inner {
296 CoreInner::Live {
297 durable: Some(durable),
298 ..
299 } => {
300 let seconds = durable
301 .next_hlc
302 .fetch_add(1, Ordering::Relaxed)
303 .saturating_add(1);
304 HlcTimestamp::new(seconds, 0)
305 }
306 CoreInner::Live { durable: None, .. } | CoreInner::Recovery { .. } => {
307 HlcTimestamp::zero()
308 }
309 }
310 }
311
312 fn write_commit(
313 &self,
314 principal: Option<&Arc<[u8]>>,
315 changes: &[Change],
316 timestamp: HlcTimestamp,
317 ) -> Result<u64, ProviderError> {
318 let mut inner = self.inner.lock();
319 match &mut *inner {
320 CoreInner::Live {
321 durable: Some(durable),
322 ..
323 } => {
324 let principal = principal.cloned();
327 let sequence = {
329 let mut writer = durable.writer.lock();
330 writer
331 .append(timestamp, Origin::Local, principal, changes)
332 .map_err(durable_error)?
333 };
334 Ok(sequence)
335 }
336 CoreInner::Live { durable: None, .. } => Ok(0),
337 CoreInner::Recovery { .. } => Err(inconsistent(
338 "write_commit called on recovery-mode CoreProvider",
339 )),
340 }
341 }
342
343 fn flush(&self) -> Result<Option<u64>, ProviderError> {
344 let mut inner = self.inner.lock();
345 match &mut *inner {
346 CoreInner::Live {
347 durable: Some(durable),
348 ..
349 } => {
350 let mut writer = durable.writer.lock();
351 writer.flush().map_err(durable_error)?;
352 Ok(Some(writer.last_sequence()))
353 }
354 CoreInner::Live { durable: None, .. } | CoreInner::Recovery { .. } => Ok(None),
355 }
356 }
357}
358
359impl RecoveryProvider for CoreProvider {
360 fn provider_tag(&self) -> [u8; 4] {
361 CORE_PROVIDER_TAG
362 }
363
364 fn read_section(&self, sub: [u8; 4], bytes: &[u8]) -> RecoveryResult<()> {
365 self.read_section_inner(SubTag(sub), bytes)
366 .map_err(box_provider_error)
367 }
368
369 fn on_changes(&self, changes: &[Change]) -> RecoveryResult<()> {
373 for change in changes {
374 self.on_change_inner(change).map_err(box_provider_error)?;
375 }
376 Ok(())
377 }
378}
379
380pub(crate) fn invalid_payload(reason: impl Into<String>) -> ProviderError {
381 ProviderError::InvalidPayload {
382 reason: reason.into(),
383 }
384}
385
386fn durable_error(error: impl std::error::Error) -> ProviderError {
387 ProviderError::SerializationFailed {
388 reason: error.to_string(),
389 }
390}
391
392pub(crate) fn serialization_failed(reason: impl Into<String>) -> ProviderError {
393 ProviderError::SerializationFailed {
394 reason: reason.into(),
395 }
396}
397
398pub(crate) fn inconsistent(reason: impl Into<String>) -> ProviderError {
399 ProviderError::Inconsistent {
400 reason: reason.into(),
401 }
402}
403
404fn invalid_sub_tag(sub_tag: SubTag) -> ProviderError {
405 invalid_payload(format!("unknown CORE sub-tag {sub_tag}"))
406}
407
408fn box_provider_error(error: ProviderError) -> RecoveryError {
409 Box::new(error)
410}
411
412#[cfg(test)]
413#[path = "core_provider/tests.rs"]
414mod tests;