1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//! Named crash-injection points for `vector.turbo` (issue #694).
//!
//! Defines the contract that the crash-safety slice (#673) drives its
//! kill-point tests against. Production builds compile [`fire`] down
//! to a no-op; test builds and the `turbo-crash-inject` feature build
//! pull in the installable [`TurboCrashInjector`] trait so a test can
//! observe — or panic at — any of the four named boundaries in the
//! durable-write sequence.
//!
//! Boundary semantics, in the INSERT write order:
//!
//! 1. [`InjectionPoint::BeforeWalFsync`] — fired right before the WAL
//! durability handshake on the per-vector `WalRecord::VectorInsert`.
//! A crash here loses the insert entirely (WAL replay is empty).
//! 2. [`InjectionPoint::BeforeIndexCommit`] — fired after the WAL
//! insert is durable but before the in-memory `TurboQuantIndex`
//! observes the new vector. A crash here is recovered on boot by
//! WAL replay.
//! 3. [`InjectionPoint::BeforeExtentFsync`] — fired after the
//! in-memory index has been mutated but before the encoded codes
//! are appended to the persistent `TurboExtent`. A crash here is
//! recovered the same way (WAL replay re-encodes, the
//! deterministic codec seed reproduces the identical extent
//! bytes).
//! 4. [`InjectionPoint::MidCheckpoint`] — fired from inside the
//! checkpoint accounting loop when it encounters a turbo
//! `VectorInsert` record. A crash here leaves the WAL intact and
//! recovery resumes from the prior checkpoint LSN.
//!
//! This slice (#694) wires the production call sites for points 1–3
//! in the INSERT path and point 4 in the checkpoint accounting loop.
//! #673 attaches the actual kill-point tests against these names.
/// Named boundary in the durable-write / checkpoint pipeline. Stable
/// public contract: variant names appear in #673's test assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InjectionPoint {
BeforeWalFsync,
BeforeIndexCommit,
BeforeExtentFsync,
MidCheckpoint,
}
/// Test-only contract for observing the named boundaries.
/// Implementations may panic or longjmp to simulate a process crash.
#[cfg(any(test, feature = "turbo-crash-inject"))]
pub trait TurboCrashInjector: Send + Sync {
fn before(&self, point: InjectionPoint);
}
#[cfg(any(test, feature = "turbo-crash-inject"))]
mod test_only {
use super::*;
use std::sync::Arc;
use std::sync::RwLock;
static INJECTOR: RwLock<Option<Arc<dyn TurboCrashInjector>>> = RwLock::new(None);
/// Install a process-global injector. Test-only / feature-gated.
/// Returns the previously installed injector, if any.
pub fn install(
injector: Option<Arc<dyn TurboCrashInjector>>,
) -> Option<Arc<dyn TurboCrashInjector>> {
let mut guard = INJECTOR.write().expect("turbo injector lock");
std::mem::replace(&mut *guard, injector)
}
pub(crate) fn current() -> Option<Arc<dyn TurboCrashInjector>> {
INJECTOR.read().ok().and_then(|g| g.clone())
}
}
#[cfg(any(test, feature = "turbo-crash-inject"))]
pub use test_only::install;
/// Fire a named injection point. No-op in production builds.
#[inline]
pub fn fire(point: InjectionPoint) {
#[cfg(any(test, feature = "turbo-crash-inject"))]
{
if let Some(injector) = test_only::current() {
injector.before(point);
}
}
#[cfg(not(any(test, feature = "turbo-crash-inject")))]
{
let _ = point;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
struct Counter {
hits: AtomicUsize,
target: InjectionPoint,
}
impl TurboCrashInjector for Counter {
fn before(&self, point: InjectionPoint) {
if point == self.target {
self.hits.fetch_add(1, Ordering::Relaxed);
}
}
}
// The injector slot is process-global; tests that touch it must
// serialize against each other to avoid stomping on each other's
// installed state.
fn injector_test_lock() -> &'static Mutex<()> {
static LOCK: std::sync::OnceLock<Mutex<()>> = std::sync::OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
#[test]
fn fire_invokes_installed_injector_at_named_point() {
let _guard = injector_test_lock().lock().unwrap();
let counter = Arc::new(Counter {
hits: AtomicUsize::new(0),
target: InjectionPoint::BeforeIndexCommit,
});
let prev = install(Some(counter.clone()));
fire(InjectionPoint::BeforeWalFsync);
fire(InjectionPoint::BeforeIndexCommit);
fire(InjectionPoint::BeforeIndexCommit);
fire(InjectionPoint::BeforeExtentFsync);
fire(InjectionPoint::MidCheckpoint);
assert_eq!(counter.hits.load(Ordering::Relaxed), 2);
let _ = install(prev);
}
#[test]
fn fire_without_installed_injector_is_a_no_op() {
let _guard = injector_test_lock().lock().unwrap();
let prev = install(None);
fire(InjectionPoint::BeforeWalFsync);
fire(InjectionPoint::MidCheckpoint);
let _ = install(prev);
}
}