1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
use crate::coordinate::Region;
use crate::store::index::{IndexEntry, StoreIndex};
use crate::store::{RestartPolicy, Store, StoreError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Cursor: pull-based event consumption with guaranteed delivery.
/// Reads from index, not channels. Cannot lose events.
/// [SPEC:src/store/cursor.rs]
///
/// The cursor remembers the `global_sequence` of the last entry it handed out
/// and, from then on, only yields entries whose sequence is strictly greater.
pub struct Cursor {
    /// Region whose matching index entries this cursor walks.
    region: Region,
    /// `global_sequence` of the most recently consumed entry; the next poll
    /// resumes strictly after it. Only meaningful once `started` is true.
    position: u64,
    /// Becomes true when the first entry is consumed. A separate flag is needed
    /// because `global_sequence` 0 is valid, so `position == 0` alone cannot
    /// mean "nothing consumed yet".
    started: bool,
    /// Shared store index, queried on every poll.
    index: Arc<StoreIndex>,
}
impl Cursor {
    /// Upper bound on the up-front allocation made by [`Cursor::poll_batch`].
    ///
    /// `max` is caller-controlled: preallocating exactly `max` slots would
    /// over-allocate for generous limits and panic with "capacity overflow"
    /// for values such as `usize::MAX`. The batch still grows past this
    /// bound if more entries are actually returned.
    const MAX_BATCH_PREALLOC: usize = 1024;

    /// Create a cursor over `region` that has not consumed anything yet.
    ///
    /// The first poll returns the first matching entry the index query yields,
    /// whatever its `global_sequence` (0 included).
    pub(crate) fn new(region: Region, index: Arc<StoreIndex>) -> Self {
        Self {
            region,
            position: 0,
            started: false,
            index,
        }
    }

    /// Whether an entry with `sequence` has not been consumed by this cursor yet.
    ///
    /// Takes `&self` (not `&mut self`) so it can be called while the result of
    /// `self.index.query(..)` is still alive.
    fn is_unseen(&self, sequence: u64) -> bool {
        !self.started || sequence > self.position
    }

    /// Poll for the next matching event strictly after the last one returned.
    ///
    /// Before anything has been consumed, the first entry yielded by the index
    /// query is returned. Returns `None` when no newer matching entry exists.
    pub fn poll(&mut self) -> Option<IndexEntry> {
        for entry in self.index.query(&self.region) {
            if self.is_unseen(entry.global_sequence) {
                self.position = entry.global_sequence;
                self.started = true;
                return Some(entry);
            }
        }
        None
    }

    /// Poll for up to `max` matching events, in the order the index yields them.
    ///
    /// Every accepted entry advances the cursor, so the returned entries have
    /// strictly increasing `global_sequence`s. `max == 0` returns an empty
    /// batch without querying the index or moving the cursor.
    pub fn poll_batch(&mut self, max: usize) -> Vec<IndexEntry> {
        if max == 0 {
            return Vec::new();
        }
        let mut batch = Vec::with_capacity(max.min(Self::MAX_BATCH_PREALLOC));
        for entry in self.index.query(&self.region) {
            if !self.is_unseen(entry.global_sequence) {
                continue;
            }
            self.position = entry.global_sequence;
            self.started = true;
            batch.push(entry);
            if batch.len() >= max {
                break;
            }
        }
        batch
    }

    /// Snapshot of the cursor position as `(position, started)`.
    pub(crate) fn checkpoint(&self) -> (u64, bool) {
        (self.position, self.started)
    }

    /// Rewind or advance the cursor to a snapshot taken with [`Cursor::checkpoint`].
    pub(crate) fn restore_checkpoint(&mut self, position: u64, started: bool) {
        self.position = position;
        self.started = started;
    }
}
/// What a cursor worker's batch handler asks the worker to do next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorWorkerAction {
    /// Keep going: commit this batch and poll for further events.
    Continue,
    /// Commit this batch, then shut the worker down cleanly once the handler has returned.
    Stop,
}
/// Tuning knobs for a supervised cursor worker thread.
#[derive(Clone, Debug)]
pub struct CursorWorkerConfig {
    /// Upper limit on how many matching events are passed to the handler in a single call.
    pub batch_size: usize,
    /// How long the worker sleeps after a poll that found no matching events.
    pub idle_sleep: Duration,
    /// Restart policy applied when the handler panics inside the worker loop.
    pub restart: RestartPolicy,
}
impl Default for CursorWorkerConfig {
fn default() -> Self {
Self {
batch_size: 64,
idle_sleep: Duration::from_millis(10),
restart: RestartPolicy::Once,
}
}
}
/// Handle for a background cursor worker.
///
/// Dropping the handle raises the worker's stop flag without waiting for the
/// thread, so a handle that is discarded immediately stops the worker almost
/// as soon as it starts. `#[must_use]` makes the compiler flag that mistake.
#[must_use = "dropping a CursorWorkerHandle signals the worker to stop"]
pub struct CursorWorkerHandle {
    /// Stop flag shared with the worker thread.
    stop: Arc<AtomicBool>,
    /// Worker thread handle; `None` once it has been taken for joining.
    join: Option<std::thread::JoinHandle<()>>,
}
impl CursorWorkerHandle {
    /// Ask the worker to shut down without waiting for it.
    ///
    /// This only raises the shared stop flag. The worker loop checks that flag
    /// before each iteration, so an in-flight handler call or idle sleep
    /// finishes first. Use [`CursorWorkerHandle::join`] to wait for the exit.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Signal the worker to stop, then block until its thread has exited.
    ///
    /// # Errors
    /// Returns [`StoreError::WriterCrashed`] if the worker thread panicked
    /// before it could exit cleanly.
    pub fn join(mut self) -> Result<(), StoreError> {
        self.stop();
        match self.join.take() {
            Some(worker) => worker.join().map_err(|_| StoreError::WriterCrashed),
            None => Ok(()),
        }
    }
}
impl Drop for CursorWorkerHandle {
    /// Signal the worker to stop without joining it.
    ///
    /// The inner `JoinHandle` is simply dropped, which detaches the thread, so
    /// the worker finishes its current iteration and exits on its own.
    fn drop(&mut self) {
        let flag = &self.stop;
        flag.store(true, Ordering::Release);
    }
}
impl Store<crate::store::Open> {
/// Spawn a supervised cursor worker that processes guaranteed-delivery batches.
///
/// # Errors
/// Returns [`StoreError::Io`] if the background worker thread cannot be
/// spawned.
pub fn cursor_worker<F>(
self: &Arc<Self>,
region: &Region,
config: CursorWorkerConfig,
mut handler: F,
) -> Result<CursorWorkerHandle, StoreError>
where
F: FnMut(&[IndexEntry], &Store<crate::store::Open>) -> CursorWorkerAction + Send + 'static,
{
let store = Arc::clone(self);
let region = region.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_thread = Arc::clone(&stop);
let join = std::thread::Builder::new()
.name("batpak-cursor-worker".into())
.spawn(move || {
let mut cursor = store.cursor_guaranteed(®ion);
let mut committed = cursor.checkpoint();
let mut restarts = 0u32;
let mut window_start = Instant::now();
while !stop_thread.load(Ordering::Acquire) {
let batch = cursor.poll_batch(config.batch_size);
if batch.is_empty() {
std::thread::sleep(config.idle_sleep);
continue;
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
handler(&batch, &store)
}));
match result {
Ok(CursorWorkerAction::Continue) => {
committed = cursor.checkpoint();
}
Ok(CursorWorkerAction::Stop) => {
committed = cursor.checkpoint();
stop_thread.store(true, Ordering::Release);
}
Err(_) => {
let budget_ok = match &config.restart {
RestartPolicy::Once => {
if restarts >= 1 {
false
} else {
restarts += 1;
true
}
}
RestartPolicy::Bounded {
max_restarts,
within_ms,
} => {
if window_start.elapsed() > Duration::from_millis(*within_ms) {
restarts = 0;
window_start = Instant::now();
}
if restarts >= *max_restarts {
false
} else {
restarts += 1;
true
}
}
};
if !budget_ok {
tracing::error!(
"cursor worker restart budget exhausted; stopping worker"
);
stop_thread.store(true, Ordering::Release);
continue;
}
tracing::warn!(
"cursor worker panicked; restarting from last checkpoint"
);
cursor = store.cursor_guaranteed(®ion);
cursor.restore_checkpoint(committed.0, committed.1);
}
}
}
})
.map_err(StoreError::Io)?;
Ok(CursorWorkerHandle {
stop,
join: Some(join),
})
}
}