Skip to main content

selene_graph/
recover.rs

1//! SharedGraph recovery helpers backed by selene-persist.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use selene_core::{Change, GraphId};
7use selene_persist::{
8    AuditLog, DEFAULT_AUDIT_FILE_NAME, DEFAULT_WAL_FILE_NAME, ProviderRegistry, RecoveryError,
9    RecoveryProvider, RecoveryResult, WalConfig, WalWriter,
10};
11
12use crate::core_provider::CoreProvider;
13use crate::graph_types::GraphTypeDef;
14use crate::index_provider::{IndexProvider, ProviderError, SubTag};
15use crate::shared::validate_unique_provider_tags;
16use crate::{GraphResult, SharedGraph};
17
18impl SharedGraph {
19    /// Recover an open (GG01) shared graph from a persistence directory.
20    ///
21    /// `graph_id` is the caller-asserted identity. If a snapshot is present
22    /// and declares a `bound_type`, recovery fails — a closed graph must be
23    /// recovered via [`SharedGraph::recover_closed`].
24    ///
25    /// # Errors
26    ///
27    /// Returns persistence errors, [`crate::GraphError::Provider`] when a
28    /// snapshot disagrees with `graph_id` or declares a closed binding, or
29    /// graph errors when the recovered state cannot be materialized.
30    pub fn recover(dir: &Path, graph_id: GraphId) -> GraphResult<Self> {
31        Self::recover_inner(dir, graph_id, None, Vec::new())
32    }
33
34    /// Recover an open (GG01) shared graph with extension index providers.
35    ///
36    /// Each supplied provider participates in snapshot-section recovery and WAL
37    /// replay, then remains attached to the returned graph so future commits
38    /// route to the same provider set.
39    ///
40    /// # Errors
41    ///
42    /// Returns persistence errors, [`crate::GraphError::Provider`] when provider
43    /// tags are duplicated or recovered provider state is inconsistent, or graph
44    /// errors when the recovered state cannot be materialized.
45    pub fn recover_with_providers(
46        dir: &Path,
47        graph_id: GraphId,
48        providers: Vec<Arc<dyn IndexProvider>>,
49    ) -> GraphResult<Self> {
50        Self::recover_inner(dir, graph_id, None, providers)
51    }
52
53    /// Recover a closed (GG02) shared graph bound to `bound_type`.
54    ///
55    /// `graph_id` and `bound_type` are caller-asserted; recovery validates
56    /// against the snapshot:
57    ///
58    /// - If the snapshot's `CORE/META` references a `bound_type` via
59    ///   `CORE/GTYP`, it must equal `bound_type` or recovery fails (drift).
60    /// - If the snapshot declares no binding but `bound_type` is provided,
61    ///   recovery fails (snapshot says open, caller says closed).
62    /// - If no snapshot is present (WAL-only or empty-dir), the caller's
63    ///   `bound_type` is used and validation runs against replayed state.
64    ///
65    /// # Errors
66    ///
67    /// Returns persistence errors, [`crate::GraphError::Provider`] on type
68    /// drift / inconsistency, [`crate::GraphError::TypeViolation`] when
69    /// recovered entities don't conform to `bound_type`, or graph errors
70    /// when the recovered state cannot be materialized.
71    pub fn recover_closed(
72        dir: &Path,
73        graph_id: GraphId,
74        bound_type: GraphTypeDef,
75    ) -> GraphResult<Self> {
76        Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), Vec::new())
77    }
78
79    /// Recover a closed (GG02) shared graph with extension index providers.
80    ///
81    /// This is the provider-aware counterpart to [`SharedGraph::recover_closed`].
82    /// Recovery validates the caller-supplied `bound_type`, rebuilds the supplied
83    /// index providers from snapshot sections and WAL replay, and returns a live
84    /// WAL-backed graph with those providers still attached.
85    ///
86    /// # Errors
87    ///
88    /// Returns persistence errors, [`crate::GraphError::Provider`] on provider
89    /// tag duplication or type drift,
90    /// [`crate::GraphError::TypeViolation`] when recovered entities don't
91    /// conform to `bound_type`, or graph errors when the recovered state cannot
92    /// be materialized.
93    pub fn recover_closed_with_providers(
94        dir: &Path,
95        graph_id: GraphId,
96        bound_type: GraphTypeDef,
97        providers: Vec<Arc<dyn IndexProvider>>,
98    ) -> GraphResult<Self> {
99        Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), providers)
100    }
101
102    fn recover_inner(
103        dir: &Path,
104        graph_id: GraphId,
105        expected_bound_type: Option<Arc<GraphTypeDef>>,
106        providers: Vec<Arc<dyn IndexProvider>>,
107    ) -> GraphResult<Self> {
108        let core = CoreProvider::new_for_recovery();
109        validate_recovery_provider_tags(&core, &providers)?;
110        let mut registry = ProviderRegistry::new();
111        let provider: Arc<dyn RecoveryProvider> = core.clone();
112        registry.register(provider)?;
113        register_index_recovery_providers(&mut registry, &providers)?;
114        let outcome = selene_persist::recover(dir, &registry)?;
115        let mut graph = core.finish_recovery(graph_id, expected_bound_type)?;
116        // The committed graph generation must reflect every change that was
117        // replayed. Snapshot+WAL recovery applies WAL entries past the
118        // snapshot's sequence; without this bump, the next mutation would
119        // increment from a stale snapshot generation, regressing or
120        // duplicating sequencing relative to the recovered tip.
121        graph.meta.generation = graph.meta.generation.max(outcome.last_wal_seq);
122        mark_recovered_provider_generation(&providers, &graph, &outcome)?;
123        // Reopen the WAL file as a live writer so post-recovery commits
124        // continue to append durably. Without this, recover() returns a
125        // graph whose commits go to memory only — a crash after recovery
126        // would lose every post-recovery change even though the feature
127        // advertises live WAL durability.
128        //
129        // v1.2 BRIEF 2: the single committer is the sole fsync caller, so the
130        // reopened WAL is driven in OnFlushOnly (the committer flushes once per
131        // drained run). Recovery uses CommitBatching::Off below, so the committer
132        // still fsyncs once per post-recovery commit — durability is unchanged.
133        let writer = WalWriter::open(
134            &dir.join(DEFAULT_WAL_FILE_NAME),
135            WalConfig {
136                sync_policy: selene_persist::SyncPolicy::OnFlushOnly,
137                ..WalConfig::default()
138            },
139        )?;
140        // Reattach the audit log iff one exists on disk (its `SLAU` presence
141        // means the embedder enabled audit) so post-recovery lifecycle commits
142        // keep being mirrored. The historical events already persist in the file
143        // — recovery never re-derives them, and the WAL replay above does not
144        // re-mirror (write_commit is live-only). Absent file → no audit.
145        let audit_path = dir.join(DEFAULT_AUDIT_FILE_NAME);
146        let audit_log = if audit_path.exists() {
147            Some(AuditLog::open(&audit_path)?)
148        } else {
149            None
150        };
151        Self::from_graph_with_core_and_durables(
152            graph,
153            providers,
154            Vec::new(),
155            Some(writer),
156            audit_log,
157            // Recovery uses the BRIEF-1-equivalent policy: one fsync per commit.
158            crate::committer_batch::CommitBatching::Off,
159        )
160    }
161}
162
163/// Recovery wrapper that drives a runtime [`IndexProvider`] through
164/// [`selene-persist`]'s `RecoveryProvider` interface during snapshot-section
165/// reads and WAL replay.
166struct IndexRecoveryProvider {
167    provider: Arc<dyn IndexProvider>,
168}
169
170impl RecoveryProvider for IndexRecoveryProvider {
171    fn provider_tag(&self) -> [u8; 4] {
172        self.provider.provider_tag().0
173    }
174
175    fn read_section(&self, sub: [u8; 4], bytes: &[u8]) -> RecoveryResult<()> {
176        self.provider
177            .read_section(SubTag(sub), bytes)
178            .map_err(recovery_error)
179    }
180
181    fn on_change(&self, change: &Change) -> RecoveryResult<()> {
182        self.provider.on_change(change).map_err(recovery_error)
183    }
184
185    fn on_changes(&self, changes: &[Change]) -> RecoveryResult<()> {
186        self.provider.on_changes(changes).map_err(recovery_error)
187    }
188}
189
190fn validate_recovery_provider_tags(
191    core: &Arc<CoreProvider>,
192    providers: &[Arc<dyn IndexProvider>],
193) -> GraphResult<()> {
194    let mut all_providers = Vec::with_capacity(providers.len() + 1);
195    let core_provider: Arc<dyn IndexProvider> = core.clone();
196    all_providers.push(core_provider);
197    all_providers.extend(providers.iter().cloned());
198    validate_unique_provider_tags(&all_providers)
199}
200
201fn register_index_recovery_providers(
202    registry: &mut ProviderRegistry,
203    providers: &[Arc<dyn IndexProvider>],
204) -> GraphResult<()> {
205    for provider in providers {
206        let recovery_provider: Arc<dyn RecoveryProvider> = Arc::new(IndexRecoveryProvider {
207            provider: Arc::clone(provider),
208        });
209        registry.register(recovery_provider)?;
210    }
211    Ok(())
212}
213
214fn mark_recovered_provider_generation(
215    providers: &[Arc<dyn IndexProvider>],
216    graph: &crate::SeleneGraph,
217    outcome: &selene_persist::RecoveryOutcome,
218) -> GraphResult<()> {
219    for provider in providers {
220        let provider_tag = provider.provider_tag().0;
221        let missing_snapshot_state = outcome.applied_snapshot_seq > 0
222            && !provider.declared_sub_tags().is_empty()
223            && outcome
224                .snapshot_providers_invoked
225                .binary_search(&provider_tag)
226                .is_err();
227        if missing_snapshot_state {
228            provider.rebuild_from_graph(graph)?;
229        }
230        provider.on_commit_applied(graph.meta.generation)?;
231    }
232    Ok(())
233}
234
235fn recovery_error(error: ProviderError) -> RecoveryError {
236    Box::new(error)
237}
238
239#[cfg(test)]
240#[path = "recover_tests.rs"]
241mod tests;