// coalesce_worker/lib.rs
1//! A coalescing worker thread with generation-counter stale-result
2//! rejection — the discipline needed to run tree-sitter (or any
3//! expensive incremental parser) off the main thread without
4//! corrupting state with out-of-date results.
5//!
6//! # The problem
7//!
8//! Suppose a GUI editor offloads syntax highlighting to a background
9//! thread. The user types rapidly; the main thread fires off highlight
10//! requests, one per keystroke. The worker can only process one at a
11//! time. By the time request N-5 finishes, the source has moved to
12//! state N, and the result computed from the stale N-5 source is no
13//! longer valid — if the main thread applies it anyway, the UI shows
14//! spans pointing at byte offsets that no longer exist, causing
15//! rendering glitches or panics.
16//!
17//! This class of bug is documented in `goliajp/devops/dotclaude/common/classic-errors.md`
18//! as **"Stale async cache after mutation"**.
19//!
20//! # The fix
21//!
22//! Two disciplines enforced by this crate:
23//!
24//! - **Request coalescing** — when the worker finishes one job, it
25//! drains any queued requests and processes only the latest; older
26//! requests that never started are silently discarded.
27//! - **Generation counting** — every submitted request gets a monotonic
28//! generation number. When the main thread polls for results, it
29//! drains the receive channel and keeps only the newest generation;
30//! older results are dropped without being applied.
31//!
32//! # Example: tree-sitter highlighting
33//!
34//! ```no_run
35//! use coalesce_worker::{Worker, Coalescer};
36//! use std::sync::Arc;
37//!
38//! # #[cfg(any())] {
39//! struct HighlightWorker {
40//! highlighter: tree_sitter_highlight::Highlighter,
41//! }
42//!
43//! struct HighlightRequest {
44//! source: Arc<Vec<u8>>,
45//! config: Arc<tree_sitter_highlight::HighlightConfiguration>,
46//! }
47//!
48//! struct HighlightResponse {
49//! events: Vec<tree_sitter_highlight::HighlightEvent>,
50//! }
51//!
52//! impl Worker for HighlightWorker {
53//! type Request = HighlightRequest;
54//! type Response = HighlightResponse;
55//! fn handle(&mut self, req: Self::Request) -> Self::Response {
56//! let events = self.highlighter
57//! .highlight(&req.config, &req.source, None, |_| None)
58//! .unwrap()
59//! .collect::<Result<Vec<_>, _>>()
60//! .unwrap();
61//! HighlightResponse { events }
62//! }
63//! }
64//! # }
65//! ```
66//!
67//! Then drive the coalescer from the main thread:
68//!
69//! ```no_run
70//! # use coalesce_worker::{Worker, Coalescer, Output};
71//! # struct MyWorker;
72//! # struct Req;
73//! # struct Res;
74//! # impl Worker for MyWorker {
75//! # type Request = Req;
76//! # type Response = Res;
77//! # fn handle(&mut self, _r: Req) -> Res { Res }
78//! # }
79//! # let mut coalescer = Coalescer::new(MyWorker);
80//! # fn current_source() -> Req { Req }
81//! # fn render_highlights(_r: Res) {}
82//! // main loop
83//! coalescer.submit(current_source());
84//! if let Some(Output { generation: _, value }) = coalescer.poll() {
85//! render_highlights(value);
86//! }
87//! ```
88//!
89//! # Not only for tree-sitter
90//!
91//! Any long-running background computation that can be superseded —
92//! rebuilding a suggestion index, recompiling a preview, running a
93//! linter — fits the same pattern.
94
95#![deny(missing_docs)]
96
97use std::sync::mpsc::{self, Receiver, Sender};
98use std::thread;
99
/// A worker that owns its processing state and handles one request
/// at a time.
///
/// The implementation should be free of shared mutable state — the
/// worker runs on its own thread and communicates with the main
/// thread only via request/response values.
pub trait Worker: Send + 'static {
    /// Input type sent from the main thread.
    type Request: Send + 'static;
    /// Output type returned after processing.
    type Response: Send + 'static;
    /// Processes one request. Called on the worker thread.
    ///
    /// May block for as long as the work takes: while it runs, newer
    /// requests accumulate in the channel and are coalesced before
    /// the next call, so only the latest one is handled.
    fn handle(&mut self, req: Self::Request) -> Self::Response;
}
114
/// One response from the worker, tagged with the generation of the
/// request that produced it.
///
/// Compare [`generation`](Self::generation) against the value
/// returned by [`Coalescer::submit`] to decide whether this response
/// is current or already superseded.
#[derive(Debug, Clone)]
pub struct Output<T> {
    /// Generation number of the request.
    pub generation: u64,
    /// The worker's response.
    pub value: T,
}
124
/// Wire format on the request channel: either a job tagged with its
/// generation, or an explicit shutdown signal (sent from `Drop`).
enum Msg<R> {
    /// Process `request` and tag the response with `generation`.
    Run { generation: u64, request: R },
    /// Ask the worker loop to exit.
    Shutdown,
}
129
/// Coalescing async dispatcher around a [`Worker`].
///
/// See the [crate-level docs](crate) for the problem it solves.
pub struct Coalescer<W: Worker> {
    /// Request channel into the worker thread.
    tx: Sender<Msg<W::Request>>,
    /// Response channel back from the worker thread.
    rx: Receiver<Output<W::Response>>,
    /// Last generation number handed out by `submit` (0 = none yet).
    generation: u64,
    /// Worker thread handle; held but never joined — see the `Drop`
    /// impl for why joining is deliberately avoided.
    _thread: Option<thread::JoinHandle<()>>,
}
139
140impl<W: Worker> Coalescer<W> {
141 /// Spawns the worker thread and returns a handle.
142 ///
143 /// The thread is named `coalesce-worker` by default. Use
144 /// [`Coalescer::spawn_named`] to override.
145 pub fn new(worker: W) -> Self {
146 Self::spawn_named("coalesce-worker", worker)
147 }
148
149 /// Spawns the worker thread with a custom name (useful for
150 /// profilers and panic backtraces).
151 pub fn spawn_named(name: &str, worker: W) -> Self {
152 let (req_tx, req_rx) = mpsc::channel::<Msg<W::Request>>();
153 let (res_tx, res_rx) = mpsc::channel::<Output<W::Response>>();
154
155 let thread = thread::Builder::new()
156 .name(name.to_owned())
157 .spawn(move || worker_loop(worker, req_rx, res_tx))
158 .expect("failed to spawn coalescer worker thread");
159
160 Self {
161 tx: req_tx,
162 rx: res_rx,
163 generation: 0,
164 _thread: Some(thread),
165 }
166 }
167
168 /// Submits a new request. Older requests queued but not yet
169 /// started are silently discarded.
170 ///
171 /// Returns the generation number assigned to the new request;
172 /// use it later to match against [`Output::generation`].
173 ///
174 /// Non-blocking: just sends on a channel. A send error means the
175 /// worker has exited (panicked or dropped), in which case the
176 /// returned generation will never produce a response.
177 pub fn submit(&mut self, request: W::Request) -> u64 {
178 self.generation += 1;
179 let _ = self.tx.send(Msg::Run {
180 generation: self.generation,
181 request,
182 });
183 self.generation
184 }
185
186 /// Polls for the newest completed response.
187 ///
188 /// Drains all pending results from the channel and returns the
189 /// one with the highest generation number, silently dropping any
190 /// older responses. Returns `None` if no response is ready.
191 pub fn poll(&mut self) -> Option<Output<W::Response>> {
192 let mut latest: Option<Output<W::Response>> = None;
193 while let Ok(out) = self.rx.try_recv() {
194 match &latest {
195 Some(cur) if cur.generation >= out.generation => {}
196 _ => latest = Some(out),
197 }
198 }
199 latest
200 }
201
202 /// Discards all pending results without taking ownership of them.
203 ///
204 /// Use when switching context (tab switch, file close) so that a
205 /// response for the *previous* context doesn't leak into the new
206 /// one. Does not cancel in-flight worker computation — only
207 /// discards what's already been sent back.
208 pub fn flush_pending(&mut self) {
209 while self.rx.try_recv().is_ok() {}
210 }
211
212 /// Current generation counter (the number that was last assigned
213 /// by [`submit`](Self::submit), or 0 if nothing has been submitted).
214 pub fn current_generation(&self) -> u64 {
215 self.generation
216 }
217}
218
219impl<W: Worker> Drop for Coalescer<W> {
220 fn drop(&mut self) {
221 // Signal worker to exit; don't join — if the worker is busy
222 // we don't want Drop to block.
223 let _ = self.tx.send(Msg::Shutdown);
224 }
225}
226
227fn worker_loop<W: Worker>(
228 mut worker: W,
229 req_rx: Receiver<Msg<W::Request>>,
230 res_tx: Sender<Output<W::Response>>,
231) {
232 loop {
233 // Block for the first message.
234 let first = match req_rx.recv() {
235 Ok(m) => m,
236 Err(_) => return, // sender dropped
237 };
238
239 // Coalesce: if multiple messages are already queued, keep only
240 // the newest request (by generation) and act on any Shutdown
241 // we encounter.
242 let mut latest: Option<(u64, W::Request)> = None;
243 let mut shutdown = false;
244
245 let process = |m: Msg<W::Request>, latest: &mut Option<(u64, W::Request)>| -> bool {
246 match m {
247 Msg::Run {
248 generation,
249 request,
250 } => {
251 match latest {
252 Some((g, _)) if *g >= generation => {}
253 _ => *latest = Some((generation, request)),
254 }
255 false
256 }
257 Msg::Shutdown => true,
258 }
259 };
260
261 shutdown = process(first, &mut latest) || shutdown;
262
263 loop {
264 match req_rx.try_recv() {
265 Ok(m) => {
266 shutdown = process(m, &mut latest) || shutdown;
267 }
268 Err(mpsc::TryRecvError::Empty) => break,
269 Err(mpsc::TryRecvError::Disconnected) => return,
270 }
271 }
272
273 if shutdown {
274 return;
275 }
276
277 if let Some((generation, request)) = latest {
278 let value = worker.handle(request);
279 if res_tx.send(Output { generation, value }).is_err() {
280 return; // receiver dropped
281 }
282 }
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289 use std::sync::Arc;
290 use std::sync::atomic::{AtomicUsize, Ordering};
291 use std::time::{Duration, Instant};
292
    /// Slow test worker that counts how many times `handle` ran (via
    /// a shared atomic, so the test thread can observe progress) and
    /// echoes each request back unchanged.
    struct CountingWorker {
        // Bumped once per handle() call; shared with the test thread.
        calls: Arc<AtomicUsize>,
    }
298
    impl Worker for CountingWorker {
        type Request = u32;
        type Response = u32;

        // Counts the call, sleeps briefly, and echoes the request.
        fn handle(&mut self, req: u32) -> u32 {
            self.calls.fetch_add(1, Ordering::SeqCst);
            // Artificial 10 ms delay so rapid submissions queue up and
            // the worker's coalescing path is actually exercised.
            thread::sleep(Duration::from_millis(10));
            req
        }
    }
309
310 fn wait_until<F: FnMut() -> bool>(mut cond: F, timeout: Duration) -> bool {
311 let start = Instant::now();
312 while start.elapsed() < timeout {
313 if cond() {
314 return true;
315 }
316 thread::sleep(Duration::from_millis(1));
317 }
318 false
319 }
320
    #[test]
    fn submit_and_poll_roundtrip() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        // The first ever submission must be assigned generation 1.
        let generation = c.submit(42);
        assert_eq!(generation, 1);

        // Poll until the worker's response arrives; handle() sleeps
        // 10 ms, so this normally succeeds well within the timeout.
        let mut got = None;
        assert!(
            wait_until(
                || {
                    got = c.poll();
                    got.is_some()
                },
                Duration::from_secs(1),
            ),
            "timed out waiting for response",
        );

        // The response echoes the input and carries the generation
        // number that submit() returned.
        let out = got.unwrap();
        assert_eq!(out.generation, 1);
        assert_eq!(out.value, 42);
    }
347
    #[test]
    fn poll_returns_newest_when_multiple_pending() {
        // Spin the worker by submitting many requests and only polling
        // at the end. The worker is slow enough that multiple responses
        // will queue up.
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        for i in 0..5 {
            c.submit(i);
        }

        // Wait for the worker to process at least one; responses may queue.
        thread::sleep(Duration::from_millis(100));
        let out = c.poll().expect("should receive at least one response");
        // Whatever was newest wins — must be the highest generation observed.
        // NOTE(review): this assertion is weaker than the comment implies —
        // `<= 5` is satisfied by *any* response. `assert_eq!(out.generation, 5)`
        // would actually pin "newest wins", but could flake if generation 5
        // hasn't finished within the 100 ms sleep. Flagged, not changed.
        assert!(out.generation <= 5);
        // No later response should surface — poll() drains everything.
        assert!(c.poll().is_none());
    }
370
    #[test]
    fn coalescing_drops_intermediate_requests() {
        // Submit 100 requests faster than the worker can handle — most
        // should be dropped before starting. At minimum, the final
        // generation's response must arrive.
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        for i in 0..100 {
            c.submit(i);
        }

        // Keep polling until the response for generation 100 (the last
        // submission) shows up, tracking the highest generation seen.
        let mut max_gen = 0;
        let _ = wait_until(
            || {
                if let Some(out) = c.poll() {
                    max_gen = max_gen.max(out.generation);
                }
                max_gen == 100
            },
            Duration::from_secs(3),
        );

        assert_eq!(max_gen, 100, "final request should eventually complete");
        // Workers should have been called far fewer than 100 times
        // because coalescing drops stale requests.
        let total_calls = calls.load(Ordering::SeqCst);
        assert!(
            total_calls < 100,
            "expected coalescing to drop work, got {total_calls} calls"
        );
    }
405
    #[test]
    fn flush_pending_drops_unread_responses() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        c.submit(1);
        // Wait for the response to arrive.
        wait_until(|| calls.load(Ordering::SeqCst) >= 1, Duration::from_secs(1));
        // Grace period: the call counter is bumped *before* handle()
        // returns and the response is sent, so give the send a moment.
        thread::sleep(Duration::from_millis(20));

        // flush_pending() must discard the queued response so that a
        // subsequent poll() sees nothing.
        c.flush_pending();
        assert!(
            c.poll().is_none(),
            "flush should have dropped the pending response"
        );
    }
424
425 #[test]
426 fn generation_monotonic() {
427 let calls = Arc::new(AtomicUsize::new(0));
428 let mut c = Coalescer::new(CountingWorker {
429 calls: Arc::clone(&calls),
430 });
431 assert_eq!(c.current_generation(), 0);
432 for i in 1..=5 {
433 let g = c.submit(i);
434 assert_eq!(g, i as u64);
435 }
436 assert_eq!(c.current_generation(), 5);
437 }
438
439 #[test]
440 fn drop_shuts_down_cleanly() {
441 // Verify creating and dropping doesn't panic or hang.
442 let calls = Arc::new(AtomicUsize::new(0));
443 let c = Coalescer::new(CountingWorker {
444 calls: Arc::clone(&calls),
445 });
446 drop(c);
447 }
448}