1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
use super::*;
impl Store<Open> {
/// LIFECYCLE
///
/// # Errors
/// Returns `StoreError::Io` if flushing the active segment to disk fails.
pub fn sync(&self) -> Result<(), StoreError> {
lifecycle::sync(self)
}
/// Block until the named watermark reaches `point` or `timeout` elapses.
///
/// This is the canonical wait entry point; the per-kind `wait_for_<kind>`
/// methods forward here. The single `match` over [`WatermarkKind`] is the
/// one source of truth mirroring the durability-gate dispatch.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if the watermark does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for(
&self,
kind: WatermarkKind,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
match kind {
WatermarkKind::Accepted => self.watermark_handle.wait_for_accepted(point, timeout),
WatermarkKind::Written => self.watermark_handle.wait_for_written(point, timeout),
WatermarkKind::Durable => self.watermark_handle.wait_for_durable(point, timeout),
WatermarkKind::Applied => self.watermark_handle.wait_for_applied(point, timeout),
WatermarkKind::Visible => self.watermark_handle.wait_for_visible(point, timeout),
WatermarkKind::Emitted => self.watermark_handle.wait_for_emitted(point, timeout),
}
}
/// Block until one lane's logical watermark reaches `point` or `timeout`.
///
/// Canonical lane wait entry point; the per-kind `wait_for_<kind>_lane`
/// methods forward here.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's watermark does not
/// reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if
/// the writer panicked while the caller was waiting.
pub fn wait_for_on_lane(
&self,
kind: WatermarkKind,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
match kind {
WatermarkKind::Accepted => self
.watermark_handle
.wait_for_accepted_on_lane(lane, point, timeout),
WatermarkKind::Written => self
.watermark_handle
.wait_for_written_on_lane(lane, point, timeout),
WatermarkKind::Durable => self
.watermark_handle
.wait_for_durable_on_lane(lane, point, timeout),
WatermarkKind::Applied => self
.watermark_handle
.wait_for_applied_on_lane(lane, point, timeout),
WatermarkKind::Visible => self
.watermark_handle
.wait_for_visible_on_lane(lane, point, timeout),
WatermarkKind::Emitted => self
.watermark_handle
.wait_for_emitted_on_lane(lane, point, timeout),
}
}
/// Block until the durable frontier reaches `point` or `timeout` elapses.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `durable_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_durable(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Durable, point, timeout)
}
/// Block until the accepted frontier reaches `point` or `timeout` elapses.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `accepted_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_accepted(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Accepted, point, timeout)
}
/// Block until one lane's logical accepted frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's accepted frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_accepted_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Accepted, lane, point, timeout)
}
/// Block until the written frontier reaches `point` or `timeout` elapses.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `written_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_written(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Written, point, timeout)
}
/// Block until one lane's logical written frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's written frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_written_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Written, lane, point, timeout)
}
/// Block until one lane's logical durable frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's durable frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_durable_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Durable, lane, point, timeout)
}
/// Block until the applied frontier reaches `point` or `timeout` elapses.
///
/// `applied_hlc` is the minimum applied HLC across registered projections,
/// so a single lagging projection can keep this wait blocked.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `applied_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_applied(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Applied, point, timeout)
}
/// Block until one lane's logical applied frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's applied frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_applied_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Applied, lane, point, timeout)
}
/// Block until the visible frontier reaches `point` or `timeout` elapses.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `visible_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_visible(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Visible, point, timeout)
}
/// Block until one lane's logical visible frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's visible frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_visible_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Visible, lane, point, timeout)
}
/// Block until the emitted frontier reaches `point` or `timeout` elapses.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if `emitted_hlc` does not reach
/// `point` before `timeout`. Returns [`StoreError::WriterCrashed`] if the
/// writer panicked while the caller was waiting.
pub fn wait_for_emitted(
&self,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for(WatermarkKind::Emitted, point, timeout)
}
/// Block until one lane's logical emitted frontier reaches `point`.
///
/// # Errors
/// Returns [`StoreError::WaitTimeout`] if that lane's emitted frontier does
/// not reach `point` before `timeout`. Returns [`StoreError::WriterCrashed`]
/// if the writer panicked while the caller was waiting.
pub fn wait_for_emitted_lane(
&self,
lane: u32,
point: HlcPoint,
timeout: std::time::Duration,
) -> Result<(), StoreError> {
self.wait_for_on_lane(WatermarkKind::Emitted, lane, point, timeout)
}
/// Snapshot the current index to a destination directory and return
/// deterministic snapshot evidence.
///
/// # Errors
/// Returns `StoreError::Io` if creating the destination directory or copying segment files fails.
pub fn snapshot_with_evidence(
&self,
dest: &std::path::Path,
) -> Result<SnapshotEvidenceReport, StoreError> {
lifecycle::snapshot(self, dest, SnapshotOptions::default())
}
/// Snapshot with explicit [`SnapshotOptions`] — notably the keyset
/// portability policy for a store with payload encryption active (D24).
///
/// With the default [`KeysetPolicy::Refuse`] an encryption-active store fails
/// closed with `StoreError::KeysetNotPortable`; pass
/// [`KeysetPolicy::ExcludeKeys`] to produce a keys-excluded snapshot (the
/// keyset must then be carried out-of-band). A store without encryption is
/// unaffected.
///
/// # Errors
/// Returns `StoreError::KeysetNotPortable` when encryption is active under the
/// default policy, or `StoreError::Io` on a copy/setup failure.
pub fn snapshot_with_evidence_with_options(
&self,
dest: &std::path::Path,
options: SnapshotOptions,
) -> Result<SnapshotEvidenceReport, StoreError> {
lifecycle::snapshot(self, dest, options)
}
/// Deprecated snapshot wrapper that drops [`SnapshotEvidenceReport`].
///
/// # Errors
/// Returns `StoreError::Io` if creating the destination directory or copying segment files fails.
#[deprecated(note = "use snapshot_with_evidence; snapshot evidence is now first-class")]
pub fn snapshot(&self, dest: &std::path::Path) -> Result<(), StoreError> {
self.snapshot_with_evidence(dest).map(|_| ())
}
/// Fork the current store into a self-contained destination directory and
/// return deterministic fork evidence.
///
/// The destination is not opened by this method. Callers that want to use
/// the fork should open it explicitly after this method returns, preserving
/// the copied directory without appending lifecycle events during the copy.
///
/// # Errors
/// Returns `StoreError::Io` if creating the destination, clearing stale
/// store artifacts, or copying/linking source files fails.
pub fn fork_with_evidence(
&self,
dest: &std::path::Path,
options: ForkOptions,
) -> Result<ForkReport, StoreError> {
lifecycle::fork(self, dest, options)
}
/// Fork the current store with default [`ForkOptions`], dropping the
/// deterministic evidence report.
///
/// # Errors
/// Returns any error surfaced by [`Store::fork_with_evidence`].
pub fn fork(&self, dest: &std::path::Path) -> Result<(), StoreError> {
self.fork_with_evidence(dest, ForkOptions::default())
.map(|_| ())
}
/// Compact: merge sealed segments, optionally filtering events.
/// The active (currently-written) segment is never touched.
///
/// # F6 / FREEZE-4 swap contract
///
/// The in-memory index is rebuilt off-side from the post-merge segment
/// layout and then published as a single atomic swap under an exclusive
/// lock (see `StoreIndex::replace_contents_from_fresh`). Reader-facing
/// methods (`query`, `stream`, `cursor_guaranteed` polls, etc.) take a
/// read guard on the same lock, so a concurrent reader observes either
/// the pre-compact index or the post-compact index — never a cleared or
/// partially rebuilt view.
///
/// Failure modes are surfaced through the returned
/// [`segment::CompactionResult`]. The accompanying
/// [`CompactionReportBody`] is always returned as deterministic evidence
/// for the compaction decision and observed outcome.
///
/// * [`segment::CompactionOutcome::Performed`] — the segment merge
/// happened and the live index has been swapped for the fresh one.
/// * [`segment::CompactionOutcome::Skipped`] — the sealed-segment count
/// was below `min_segments`; no disk or index work was done.
/// * [`segment::CompactionOutcome::Failed`] — the off-side rebuild
/// aborted before the swap point; the live index has not been
/// mutated, and the pending-compaction marker preserves a coherent
/// reopen path until cleanup completes.
///
/// Appends that arrive during compaction are safe (they go to the active
/// segment which is not compacted). `sync()` is called before and after
/// the segment merge so the off-side rebuild sees a quiescent on-disk
/// state; for maximum safety, avoid high-throughput appends during
/// compaction.
///
/// # Errors
/// Returns `StoreError::Io` if reading, writing, or removing segment
/// files fails. A rebuild failure is NOT an error — it is reported via
/// `CompactionOutcome::Failed`.
pub fn compact(
&self,
config: &CompactionConfig,
) -> Result<
(
crate::store::segment::CompactionResult,
CompactionReportBody,
),
StoreError,
> {
lifecycle::compact(self, config)
}
/// LIFECYCLE: flush pending writes and shut down the writer thread cleanly.
///
/// # Errors
/// Returns `StoreError::WriterCrashed` if the writer thread has already exited unexpectedly.
pub fn close(self) -> Result<Closed, StoreError> {
lifecycle::close(self)
}
}