1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
//! SharedGraph recovery helpers backed by selene-persist.
use std::path::Path;
use std::sync::Arc;
use selene_core::{Change, GraphId};
use selene_persist::{
AuditLog, DEFAULT_AUDIT_FILE_NAME, DEFAULT_WAL_FILE_NAME, ProviderRegistry, RecoveryError,
RecoveryProvider, RecoveryResult, WalConfig, WalWriter,
};
use crate::core_provider::CoreProvider;
use crate::graph_types::GraphTypeDef;
use crate::index_provider::{IndexProvider, ProviderError, SubTag};
use crate::shared::validate_unique_provider_tags;
use crate::{GraphResult, SharedGraph};
impl SharedGraph {
    /// Recover an open (GG01) shared graph from a persistence directory.
    ///
    /// `graph_id` is the caller-asserted identity. If a snapshot is present
    /// and declares a `bound_type`, recovery fails — a closed graph must be
    /// recovered via [`SharedGraph::recover_closed`].
    ///
    /// # Errors
    ///
    /// Returns persistence errors, [`crate::GraphError::Provider`] when a
    /// snapshot disagrees with `graph_id` or declares a closed binding, or
    /// graph errors when the recovered state cannot be materialized.
    pub fn recover(dir: &Path, graph_id: GraphId) -> GraphResult<Self> {
        Self::recover_inner(dir, graph_id, None, Vec::new())
    }

    /// Recover an open (GG01) shared graph with extension index providers.
    ///
    /// Each supplied provider participates in snapshot-section recovery and WAL
    /// replay, then remains attached to the returned graph so future commits
    /// route to the same provider set.
    ///
    /// # Errors
    ///
    /// Returns persistence errors, [`crate::GraphError::Provider`] when provider
    /// tags are duplicated or recovered provider state is inconsistent, or graph
    /// errors when the recovered state cannot be materialized.
    pub fn recover_with_providers(
        dir: &Path,
        graph_id: GraphId,
        providers: Vec<Arc<dyn IndexProvider>>,
    ) -> GraphResult<Self> {
        Self::recover_inner(dir, graph_id, None, providers)
    }

    /// Recover a closed (GG02) shared graph bound to `bound_type`.
    ///
    /// `graph_id` and `bound_type` are caller-asserted; recovery validates
    /// against the snapshot:
    ///
    /// - If the snapshot's `CORE/META` references a `bound_type` via
    ///   `CORE/GTYP`, it must equal `bound_type` or recovery fails (drift).
    /// - If the snapshot declares no binding but `bound_type` is provided,
    ///   recovery fails (snapshot says open, caller says closed).
    /// - If no snapshot is present (WAL-only or empty-dir), the caller's
    ///   `bound_type` is used and validation runs against replayed state.
    ///
    /// # Errors
    ///
    /// Returns persistence errors, [`crate::GraphError::Provider`] on type
    /// drift / inconsistency, [`crate::GraphError::TypeViolation`] when
    /// recovered entities don't conform to `bound_type`, or graph errors
    /// when the recovered state cannot be materialized.
    pub fn recover_closed(
        dir: &Path,
        graph_id: GraphId,
        bound_type: GraphTypeDef,
    ) -> GraphResult<Self> {
        Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), Vec::new())
    }

    /// Recover a closed (GG02) shared graph with extension index providers.
    ///
    /// This is the provider-aware counterpart to [`SharedGraph::recover_closed`].
    /// Recovery validates the caller-supplied `bound_type`, rebuilds the supplied
    /// index providers from snapshot sections and WAL replay, and returns a live
    /// WAL-backed graph with those providers still attached.
    ///
    /// # Errors
    ///
    /// Returns persistence errors, [`crate::GraphError::Provider`] on provider
    /// tag duplication or type drift,
    /// [`crate::GraphError::TypeViolation`] when recovered entities don't
    /// conform to `bound_type`, or graph errors when the recovered state cannot
    /// be materialized.
    pub fn recover_closed_with_providers(
        dir: &Path,
        graph_id: GraphId,
        bound_type: GraphTypeDef,
        providers: Vec<Arc<dyn IndexProvider>>,
    ) -> GraphResult<Self> {
        Self::recover_inner(dir, graph_id, Some(Arc::new(bound_type)), providers)
    }

    /// Shared implementation behind every public `recover*` entry point.
    ///
    /// In order, this:
    ///
    /// 1. creates a recovery-mode [`CoreProvider`] and checks that its tag and
    ///    the tags of `providers` are unique;
    /// 2. registers the core provider and a recovery wrapper for each index
    ///    provider in a [`ProviderRegistry`], then runs
    ///    `selene_persist::recover` over `dir`;
    /// 3. materializes the graph via `CoreProvider::finish_recovery`, passing
    ///    `expected_bound_type` (`None` for an open graph);
    /// 4. raises the graph generation to at least the last replayed WAL
    ///    sequence and brings the index providers up to that generation;
    /// 5. reopens the WAL for appending and reattaches the audit log when its
    ///    file exists, then builds the returned [`SharedGraph`].
    ///
    /// # Errors
    ///
    /// Propagates any failure from the steps above through [`GraphResult`].
    fn recover_inner(
        dir: &Path,
        graph_id: GraphId,
        expected_bound_type: Option<Arc<GraphTypeDef>>,
        providers: Vec<Arc<dyn IndexProvider>>,
    ) -> GraphResult<Self> {
        let core = CoreProvider::new_for_recovery();
        validate_recovery_provider_tags(&core, &providers)?;

        let mut registry = ProviderRegistry::new();
        let provider: Arc<dyn RecoveryProvider> = core.clone();
        registry.register(provider)?;
        register_index_recovery_providers(&mut registry, &providers)?;

        let outcome = selene_persist::recover(dir, &registry)?;
        let mut graph = core.finish_recovery(graph_id, expected_bound_type)?;

        // The committed graph generation must reflect every change that was
        // replayed. Snapshot+WAL recovery applies WAL entries past the
        // snapshot's sequence; without this bump, the next mutation would
        // increment from a stale snapshot generation, regressing or
        // duplicating sequencing relative to the recovered tip.
        graph.meta.generation = graph.meta.generation.max(outcome.last_wal_seq);
        mark_recovered_provider_generation(&providers, &graph, &outcome)?;

        // Reopen the WAL file as a live writer so post-recovery commits
        // continue to append durably. Without this, recover() returns a
        // graph whose commits go to memory only — a crash after recovery
        // would lose every post-recovery change even though the feature
        // advertises live WAL durability.
        //
        // v1.2 BRIEF 2: the single committer is the sole fsync caller, so the
        // reopened WAL is driven in OnFlushOnly (the committer flushes once per
        // drained run). Recovery uses CommitBatching::Off below, so the committer
        // still fsyncs once per post-recovery commit — durability is unchanged.
        let writer = WalWriter::open(
            &dir.join(DEFAULT_WAL_FILE_NAME),
            WalConfig {
                sync_policy: selene_persist::SyncPolicy::OnFlushOnly,
                ..WalConfig::default()
            },
        )?;

        // Reattach the audit log iff one exists on disk (its `SLAU` presence
        // means the embedder enabled audit) so post-recovery lifecycle commits
        // keep being mirrored. The historical events already persist in the file
        // — recovery never re-derives them, and the WAL replay above does not
        // re-mirror (write_commit is live-only). Absent file → no audit.
        let audit_path = dir.join(DEFAULT_AUDIT_FILE_NAME);
        let audit_log = if audit_path.exists() {
            Some(AuditLog::open(&audit_path)?)
        } else {
            None
        };

        Self::from_graph_with_core_and_durables(
            graph,
            providers,
            Vec::new(),
            Some(writer),
            audit_log,
            // Recovery uses the BRIEF-1-equivalent policy: one fsync per commit.
            crate::committer_batch::CommitBatching::Off,
        )
    }
}
/// Adapter that lets a runtime [`IndexProvider`] be registered as a
/// [`RecoveryProvider`], so it takes part in snapshot-section reads and WAL
/// replay during recovery.
struct IndexRecoveryProvider {
    /// The wrapped index provider; every recovery callback is forwarded to it.
    provider: Arc<dyn IndexProvider>,
}
impl RecoveryProvider for IndexRecoveryProvider {
    /// Returns the raw four-byte tag of the wrapped provider.
    fn provider_tag(&self) -> [u8; 4] {
        let tag = self.provider.provider_tag();
        tag.0
    }

    /// Hands one section's bytes to the wrapped provider, wrapping the raw
    /// sub-tag in [`SubTag`] and converting any provider error into a
    /// recovery error.
    fn read_section(&self, sub: [u8; 4], bytes: &[u8]) -> RecoveryResult<()> {
        let sub_tag = SubTag(sub);
        let restored = self.provider.read_section(sub_tag, bytes);
        restored.map_err(recovery_error)
    }

    /// Forwards a single change to the wrapped provider, converting any
    /// provider error into a recovery error.
    fn on_change(&self, change: &Change) -> RecoveryResult<()> {
        let applied = self.provider.on_change(change);
        applied.map_err(recovery_error)
    }

    /// Forwards a batch of changes to the wrapped provider, converting any
    /// provider error into a recovery error.
    fn on_changes(&self, changes: &[Change]) -> RecoveryResult<()> {
        let applied = self.provider.on_changes(changes);
        applied.map_err(recovery_error)
    }
}
/// Checks that the core provider and every extension provider carry distinct
/// provider tags.
///
/// The core provider is placed first, followed by `providers` in their given
/// order, and the combined list is passed to `validate_unique_provider_tags`.
///
/// # Errors
///
/// Returns whatever error `validate_unique_provider_tags` reports.
fn validate_recovery_provider_tags(
    core: &Arc<CoreProvider>,
    providers: &[Arc<dyn IndexProvider>],
) -> GraphResult<()> {
    let core_as_index: Arc<dyn IndexProvider> = core.clone();
    let combined: Vec<Arc<dyn IndexProvider>> = std::iter::once(core_as_index)
        .chain(providers.iter().cloned())
        .collect();
    validate_unique_provider_tags(&combined)
}
/// Wraps each index provider in an [`IndexRecoveryProvider`] and registers the
/// wrapper with `registry`, in the order the providers were supplied.
///
/// # Errors
///
/// Stops at and returns the first registration failure.
fn register_index_recovery_providers(
    registry: &mut ProviderRegistry,
    providers: &[Arc<dyn IndexProvider>],
) -> GraphResult<()> {
    for index_provider in providers.iter().cloned() {
        let adapter: Arc<dyn RecoveryProvider> = Arc::new(IndexRecoveryProvider {
            provider: index_provider,
        });
        registry.register(adapter)?;
    }
    Ok(())
}
/// Brings every extension provider up to the recovered graph's generation.
///
/// A provider is rebuilt from `graph` when all of the following hold:
/// a snapshot was applied (`applied_snapshot_seq > 0`), the provider declares
/// at least one sub-tag, and its tag is absent from
/// `outcome.snapshot_providers_invoked`. Every provider, rebuilt or not, is
/// then told that the commit at `graph.meta.generation` has been applied.
///
/// NOTE(review): the tag lookup uses `binary_search`, so
/// `snapshot_providers_invoked` is presumably sorted — confirm in
/// `selene_persist`.
///
/// # Errors
///
/// Stops at and returns the first failure from `rebuild_from_graph` or
/// `on_commit_applied`.
fn mark_recovered_provider_generation(
    providers: &[Arc<dyn IndexProvider>],
    graph: &crate::SeleneGraph,
    outcome: &selene_persist::RecoveryOutcome,
) -> GraphResult<()> {
    let snapshot_applied = outcome.applied_snapshot_seq > 0;
    let recovered_generation = graph.meta.generation;

    for index_provider in providers {
        let tag = index_provider.provider_tag().0;

        // Short-circuit order matters: sub-tags are only queried when a
        // snapshot was actually applied, and the lookup only runs for
        // providers that declare snapshot sections.
        if snapshot_applied
            && !index_provider.declared_sub_tags().is_empty()
            && outcome
                .snapshot_providers_invoked
                .binary_search(&tag)
                .is_err()
        {
            index_provider.rebuild_from_graph(graph)?;
        }

        index_provider.on_commit_applied(recovered_generation)?;
    }
    Ok(())
}
fn recovery_error(error: ProviderError) -> RecoveryError {
Box::new(error)
}
#[cfg(test)]
#[path = "recover_tests.rs"]
mod tests;