selene_graph/recover.rs
1//! SharedGraph recovery helpers backed by selene-persist.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use selene_core::{Change, GraphId};
7use selene_persist::{
8 AuditLog, DEFAULT_AUDIT_FILE_NAME, DEFAULT_WAL_FILE_NAME, ProviderRegistry, RecoveryError,
9 RecoveryProvider, RecoveryResult, WalConfig, WalWriter,
10};
11
12use crate::core_provider::CoreProvider;
13use crate::graph_types::GraphTypeDef;
14use crate::index_provider::{IndexProvider, ProviderError, SubTag};
15use crate::shared::validate_unique_provider_tags;
16use crate::{GraphResult, SharedGraph};
17
18impl SharedGraph {
19 /// Recover an open (GG01) shared graph from a persistence directory.
20 ///
21 /// `graph_id` is the caller-asserted identity. If a snapshot is present
22 /// and declares a `bound_type`, recovery fails — a closed graph must be
23 /// recovered via [`SharedGraph::recover_closed`].
24 ///
25 /// # Errors
26 ///
27 /// Returns persistence errors, [`crate::GraphError::Provider`] when a
28 /// snapshot disagrees with `graph_id` or declares a closed binding, or
29 /// graph errors when the recovered state cannot be materialized.
30 pub fn recover(dir: &Path, graph_id: GraphId) -> GraphResult<Self> {
31 Self::recover_inner(dir, graph_id, None, Vec::new())
32 }
33
34 /// Recover an open (GG01) shared graph with extension index providers.
35 ///
36 /// Each supplied provider participates in snapshot-section recovery and WAL
37 /// replay, then remains attached to the returned graph so future commits
38 /// route to the same provider set.
39 ///
40 /// # Errors
41 ///
42 /// Returns persistence errors, [`crate::GraphError::Provider`] when provider
43 /// tags are duplicated or recovered provider state is inconsistent, or graph
44 /// errors when the recovered state cannot be materialized.
45 pub fn recover_with_providers(
46 dir: &Path,
47 graph_id: GraphId,
48 providers: Vec<Arc<dyn IndexProvider>>,
49 ) -> GraphResult<Self> {
50 Self::recover_inner(dir, graph_id, None, providers)
51 }
52
53 /// Recover a closed (GG02) shared graph bound to `bound_type`.
54 ///
55 /// `graph_id` and `bound_type` are caller-asserted; recovery validates
56 /// against the snapshot:
57 ///
58 /// - If the snapshot's `CORE/META` references a `bound_type` via
59 /// `CORE/GTYP`, it must equal `bound_type` or recovery fails (drift).
60 /// - If the snapshot declares no binding but `bound_type` is provided,
61 /// recovery fails (snapshot says open, caller says closed).
62 /// - If no snapshot is present (WAL-only or empty-dir), the caller's
63 /// `bound_type` is used and validation runs against replayed state.
64 ///
65 /// # Errors
66 ///
67 /// Returns persistence errors, [`crate::GraphError::Provider`] on type
68 /// drift / inconsistency, [`crate::GraphError::TypeViolation`] when
69 /// recovered entities don't conform to `bound_type`, or graph errors
70 /// when the recovered state cannot be materialized.
71 pub fn recover_closed(
72 dir: &Path,
73 graph_id: GraphId,
74 bound_type: GraphTypeDef,
75 ) -> GraphResult<Self> {
76 Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), Vec::new())
77 }
78
79 /// Recover a closed (GG02) shared graph with extension index providers.
80 ///
81 /// This is the provider-aware counterpart to [`SharedGraph::recover_closed`].
82 /// Recovery validates the caller-supplied `bound_type`, rebuilds the supplied
83 /// index providers from snapshot sections and WAL replay, and returns a live
84 /// WAL-backed graph with those providers still attached.
85 ///
86 /// # Errors
87 ///
88 /// Returns persistence errors, [`crate::GraphError::Provider`] on provider
89 /// tag duplication or type drift,
90 /// [`crate::GraphError::TypeViolation`] when recovered entities don't
91 /// conform to `bound_type`, or graph errors when the recovered state cannot
92 /// be materialized.
93 pub fn recover_closed_with_providers(
94 dir: &Path,
95 graph_id: GraphId,
96 bound_type: GraphTypeDef,
97 providers: Vec<Arc<dyn IndexProvider>>,
98 ) -> GraphResult<Self> {
99 Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), providers)
100 }
101
102 fn recover_inner(
103 dir: &Path,
104 graph_id: GraphId,
105 expected_bound_type: Option<Arc<GraphTypeDef>>,
106 providers: Vec<Arc<dyn IndexProvider>>,
107 ) -> GraphResult<Self> {
108 let core = CoreProvider::new_for_recovery();
109 validate_recovery_provider_tags(&core, &providers)?;
110 let mut registry = ProviderRegistry::new();
111 let provider: Arc<dyn RecoveryProvider> = core.clone();
112 registry.register(provider)?;
113 register_index_recovery_providers(&mut registry, &providers)?;
114 let outcome = selene_persist::recover(dir, ®istry)?;
115 let mut graph = core.finish_recovery(graph_id, expected_bound_type)?;
116 // The committed graph generation must reflect every change that was
117 // replayed. Snapshot+WAL recovery applies WAL entries past the
118 // snapshot's sequence; without this bump, the next mutation would
119 // increment from a stale snapshot generation, regressing or
120 // duplicating sequencing relative to the recovered tip.
121 graph.meta.generation = graph.meta.generation.max(outcome.last_wal_seq);
122 mark_recovered_provider_generation(&providers, &graph, &outcome)?;
123 // Reopen the WAL file as a live writer so post-recovery commits
124 // continue to append durably. Without this, recover() returns a
125 // graph whose commits go to memory only — a crash after recovery
126 // would lose every post-recovery change even though the feature
127 // advertises live WAL durability.
128 //
129 // v1.2 BRIEF 2: the single committer is the sole fsync caller, so the
130 // reopened WAL is driven in OnFlushOnly (the committer flushes once per
131 // drained run). Recovery uses CommitBatching::Off below, so the committer
132 // still fsyncs once per post-recovery commit — durability is unchanged.
133 let writer = WalWriter::open(
134 &dir.join(DEFAULT_WAL_FILE_NAME),
135 WalConfig {
136 sync_policy: selene_persist::SyncPolicy::OnFlushOnly,
137 ..WalConfig::default()
138 },
139 )?;
140 // Reattach the audit log iff one exists on disk (its `SLAU` presence
141 // means the embedder enabled audit) so post-recovery lifecycle commits
142 // keep being mirrored. The historical events already persist in the file
143 // — recovery never re-derives them, and the WAL replay above does not
144 // re-mirror (write_commit is live-only). Absent file → no audit.
145 let audit_path = dir.join(DEFAULT_AUDIT_FILE_NAME);
146 let audit_log = if audit_path.exists() {
147 Some(AuditLog::open(&audit_path)?)
148 } else {
149 None
150 };
151 Self::from_graph_with_core_and_durables(
152 graph,
153 providers,
154 Vec::new(),
155 Some(writer),
156 audit_log,
157 // Recovery uses the BRIEF-1-equivalent policy: one fsync per commit.
158 crate::committer_batch::CommitBatching::Off,
159 )
160 }
161}
162
163/// Recovery wrapper that drives a runtime [`IndexProvider`] through
164/// [`selene-persist`]'s `RecoveryProvider` interface during snapshot-section
165/// reads and WAL replay.
166struct IndexRecoveryProvider {
167 provider: Arc<dyn IndexProvider>,
168}
169
170impl RecoveryProvider for IndexRecoveryProvider {
171 fn provider_tag(&self) -> [u8; 4] {
172 self.provider.provider_tag().0
173 }
174
175 fn read_section(&self, sub: [u8; 4], bytes: &[u8]) -> RecoveryResult<()> {
176 self.provider
177 .read_section(SubTag(sub), bytes)
178 .map_err(recovery_error)
179 }
180
181 fn on_change(&self, change: &Change) -> RecoveryResult<()> {
182 self.provider.on_change(change).map_err(recovery_error)
183 }
184
185 fn on_changes(&self, changes: &[Change]) -> RecoveryResult<()> {
186 self.provider.on_changes(changes).map_err(recovery_error)
187 }
188}
189
190fn validate_recovery_provider_tags(
191 core: &Arc<CoreProvider>,
192 providers: &[Arc<dyn IndexProvider>],
193) -> GraphResult<()> {
194 let mut all_providers = Vec::with_capacity(providers.len() + 1);
195 let core_provider: Arc<dyn IndexProvider> = core.clone();
196 all_providers.push(core_provider);
197 all_providers.extend(providers.iter().cloned());
198 validate_unique_provider_tags(&all_providers)
199}
200
201fn register_index_recovery_providers(
202 registry: &mut ProviderRegistry,
203 providers: &[Arc<dyn IndexProvider>],
204) -> GraphResult<()> {
205 for provider in providers {
206 let recovery_provider: Arc<dyn RecoveryProvider> = Arc::new(IndexRecoveryProvider {
207 provider: Arc::clone(provider),
208 });
209 registry.register(recovery_provider)?;
210 }
211 Ok(())
212}
213
214fn mark_recovered_provider_generation(
215 providers: &[Arc<dyn IndexProvider>],
216 graph: &crate::SeleneGraph,
217 outcome: &selene_persist::RecoveryOutcome,
218) -> GraphResult<()> {
219 for provider in providers {
220 let provider_tag = provider.provider_tag().0;
221 let missing_snapshot_state = outcome.applied_snapshot_seq > 0
222 && !provider.declared_sub_tags().is_empty()
223 && outcome
224 .snapshot_providers_invoked
225 .binary_search(&provider_tag)
226 .is_err();
227 if missing_snapshot_state {
228 provider.rebuild_from_graph(graph)?;
229 }
230 provider.on_commit_applied(graph.meta.generation)?;
231 }
232 Ok(())
233}
234
235fn recovery_error(error: ProviderError) -> RecoveryError {
236 Box::new(error)
237}
238
// Unit tests for this module live in the sibling file `recover_tests.rs`;
// the `#[path]` attribute points the module there instead of the default
// `recover/tests.rs` location. Compiled only for test builds.
#[cfg(test)]
#[path = "recover_tests.rs"]
mod tests;