// coalesce_worker/lib.rs
1//! A coalescing worker thread with generation-counter stale-result
2//! rejection — the discipline needed to run tree-sitter (or any
3//! expensive incremental parser) off the main thread without
4//! corrupting state with out-of-date results.
5//!
6//! # The problem
7//!
8//! Suppose a GUI editor offloads syntax highlighting to a background
9//! thread. The user types rapidly; the main thread fires off highlight
10//! requests, one per keystroke. The worker can only process one at a
11//! time. By the time request N-5 finishes, the source has moved to
12//! state N, and the result computed from the stale N-5 source is no
13//! longer valid — if the main thread applies it anyway, the UI shows
14//! spans pointing at byte offsets that no longer exist, causing
15//! rendering glitches or panics.
16//!
17//! This class of bug is documented in `goliajp/devops/dotclaude/common/classic-errors.md`
18//! as **"Stale async cache after mutation"**.
19//!
20//! # The fix
21//!
22//! Two disciplines enforced by this crate:
23//!
24//! - **Request coalescing** — when the worker finishes one job, it
25//!   drains any queued requests and processes only the latest; older
26//!   requests that never started are silently discarded.
27//! - **Generation counting** — every submitted request gets a monotonic
28//!   generation number. When the main thread polls for results, it
29//!   drains the receive channel and keeps only the newest generation;
30//!   older results are dropped without being applied.
31//!
32//! # Example: tree-sitter highlighting
33//!
34//! ```no_run
35//! use coalesce_worker::{Worker, Coalescer};
36//! use std::sync::Arc;
37//!
38//! # #[cfg(any())] {
39//! struct HighlightWorker {
40//!     highlighter: tree_sitter_highlight::Highlighter,
41//! }
42//!
43//! struct HighlightRequest {
44//!     source: Arc<Vec<u8>>,
45//!     config: Arc<tree_sitter_highlight::HighlightConfiguration>,
46//! }
47//!
48//! struct HighlightResponse {
49//!     events: Vec<tree_sitter_highlight::HighlightEvent>,
50//! }
51//!
52//! impl Worker for HighlightWorker {
53//!     type Request = HighlightRequest;
54//!     type Response = HighlightResponse;
55//!     fn handle(&mut self, req: Self::Request) -> Self::Response {
56//!         let events = self.highlighter
57//!             .highlight(&req.config, &req.source, None, |_| None)
58//!             .unwrap()
59//!             .collect::<Result<Vec<_>, _>>()
60//!             .unwrap();
61//!         HighlightResponse { events }
62//!     }
63//! }
64//! # }
65//! ```
66//!
67//! Then drive the coalescer from the main thread:
68//!
69//! ```no_run
70//! # use coalesce_worker::{Worker, Coalescer, Output};
71//! # struct MyWorker;
72//! # struct Req;
73//! # struct Res;
74//! # impl Worker for MyWorker {
75//! #     type Request = Req;
76//! #     type Response = Res;
77//! #     fn handle(&mut self, _r: Req) -> Res { Res }
78//! # }
79//! # let mut coalescer = Coalescer::new(MyWorker);
80//! # fn current_source() -> Req { Req }
81//! # fn render_highlights(_r: Res) {}
82//! // main loop
83//! coalescer.submit(current_source());
84//! if let Some(Output { generation: _, value }) = coalescer.poll() {
85//!     render_highlights(value);
86//! }
87//! ```
88//!
89//! # Not only for tree-sitter
90//!
91//! Any long-running background computation that can be superseded —
92//! rebuilding a suggestion index, recompiling a preview, running a
93//! linter — fits the same pattern.
94
95#![deny(missing_docs)]
96
97use std::sync::mpsc::{self, Receiver, Sender};
98use std::thread;
99
/// A worker that owns its processing state and handles one request
/// at a time.
///
/// The implementation should be free of shared mutable state — the
/// worker runs on its own thread and communicates with the main
/// thread only via request/response values.
pub trait Worker: Send + 'static {
    /// Input type sent from the main thread.
    type Request: Send + 'static;
    /// Output type returned after processing.
    type Response: Send + 'static;
    /// Processes one request. Called on the worker thread.
    ///
    /// Blocking here is expected — while this runs, newer requests
    /// queue up and are coalesced down to the latest one before the
    /// worker is called again.
    fn handle(&mut self, req: Self::Request) -> Self::Response;
}
114
/// One response from the worker, tagged with the generation of the
/// request that produced it.
///
/// Compare [`Output::generation`] against the value returned by
/// `Coalescer::submit` to decide whether the response is current.
///
/// `PartialEq`/`Eq` are derived (with the usual conditional bounds on
/// `T`) so outputs can be compared directly in tests and callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Output<T> {
    /// Generation number of the request.
    pub generation: u64,
    /// The worker's response.
    pub value: T,
}
124
/// Internal message format between the `Coalescer` handle and its
/// worker thread (private, not part of the public API).
enum Msg<R> {
    /// Run `request`; the eventual response is tagged with `generation`.
    Run { generation: u64, request: R },
    /// Ask the worker loop to exit (sent from `Drop`).
    Shutdown,
}
129
/// Coalescing async dispatcher around a [`Worker`].
///
/// See the [crate-level docs](crate) for the problem it solves.
pub struct Coalescer<W: Worker> {
    // Request channel into the worker thread.
    tx: Sender<Msg<W::Request>>,
    // Response channel back from the worker thread.
    rx: Receiver<Output<W::Response>>,
    // Last generation number handed out by `submit` (0 = none yet).
    generation: u64,
    // Holds the worker thread's handle; nothing ever joins it — `Drop`
    // deliberately avoids blocking (see the `Drop` impl below).
    _thread: Option<thread::JoinHandle<()>>,
}
139
140impl<W: Worker> Coalescer<W> {
141    /// Spawns the worker thread and returns a handle.
142    ///
143    /// The thread is named `coalesce-worker` by default. Use
144    /// [`Coalescer::spawn_named`] to override.
145    pub fn new(worker: W) -> Self {
146        Self::spawn_named("coalesce-worker", worker)
147    }
148
149    /// Spawns the worker thread with a custom name (useful for
150    /// profilers and panic backtraces).
151    pub fn spawn_named(name: &str, worker: W) -> Self {
152        let (req_tx, req_rx) = mpsc::channel::<Msg<W::Request>>();
153        let (res_tx, res_rx) = mpsc::channel::<Output<W::Response>>();
154
155        let thread = thread::Builder::new()
156            .name(name.to_owned())
157            .spawn(move || worker_loop(worker, req_rx, res_tx))
158            .expect("failed to spawn coalescer worker thread");
159
160        Self {
161            tx: req_tx,
162            rx: res_rx,
163            generation: 0,
164            _thread: Some(thread),
165        }
166    }
167
168    /// Submits a new request. Older requests queued but not yet
169    /// started are silently discarded.
170    ///
171    /// Returns the generation number assigned to the new request;
172    /// use it later to match against [`Output::generation`].
173    ///
174    /// Non-blocking: just sends on a channel. A send error means the
175    /// worker has exited (panicked or dropped), in which case the
176    /// returned generation will never produce a response.
177    pub fn submit(&mut self, request: W::Request) -> u64 {
178        self.generation += 1;
179        let _ = self.tx.send(Msg::Run {
180            generation: self.generation,
181            request,
182        });
183        self.generation
184    }
185
186    /// Polls for the newest completed response.
187    ///
188    /// Drains all pending results from the channel and returns the
189    /// one with the highest generation number, silently dropping any
190    /// older responses. Returns `None` if no response is ready.
191    pub fn poll(&mut self) -> Option<Output<W::Response>> {
192        let mut latest: Option<Output<W::Response>> = None;
193        while let Ok(out) = self.rx.try_recv() {
194            match &latest {
195                Some(cur) if cur.generation >= out.generation => {}
196                _ => latest = Some(out),
197            }
198        }
199        latest
200    }
201
202    /// Discards all pending results without taking ownership of them.
203    ///
204    /// Use when switching context (tab switch, file close) so that a
205    /// response for the *previous* context doesn't leak into the new
206    /// one. Does not cancel in-flight worker computation — only
207    /// discards what's already been sent back.
208    pub fn flush_pending(&mut self) {
209        while self.rx.try_recv().is_ok() {}
210    }
211
212    /// Current generation counter (the number that was last assigned
213    /// by [`submit`](Self::submit), or 0 if nothing has been submitted).
214    pub fn current_generation(&self) -> u64 {
215        self.generation
216    }
217}
218
impl<W: Worker> Drop for Coalescer<W> {
    fn drop(&mut self) {
        // Signal worker to exit; don't join — if the worker is busy
        // we don't want Drop to block. A send error only means the
        // worker already exited, which is fine during teardown.
        let _ = self.tx.send(Msg::Shutdown);
    }
}
226
227fn worker_loop<W: Worker>(
228    mut worker: W,
229    req_rx: Receiver<Msg<W::Request>>,
230    res_tx: Sender<Output<W::Response>>,
231) {
232    loop {
233        // Block for the first message.
234        let first = match req_rx.recv() {
235            Ok(m) => m,
236            Err(_) => return, // sender dropped
237        };
238
239        // Coalesce: if multiple messages are already queued, keep only
240        // the newest request (by generation) and act on any Shutdown
241        // we encounter.
242        let mut latest: Option<(u64, W::Request)> = None;
243        let mut shutdown = false;
244
245        let process = |m: Msg<W::Request>, latest: &mut Option<(u64, W::Request)>| -> bool {
246            match m {
247                Msg::Run {
248                    generation,
249                    request,
250                } => {
251                    match latest {
252                        Some((g, _)) if *g >= generation => {}
253                        _ => *latest = Some((generation, request)),
254                    }
255                    false
256                }
257                Msg::Shutdown => true,
258            }
259        };
260
261        shutdown = process(first, &mut latest) || shutdown;
262
263        loop {
264            match req_rx.try_recv() {
265                Ok(m) => {
266                    shutdown = process(m, &mut latest) || shutdown;
267                }
268                Err(mpsc::TryRecvError::Empty) => break,
269                Err(mpsc::TryRecvError::Disconnected) => return,
270            }
271        }
272
273        if shutdown {
274            return;
275        }
276
277        if let Some((generation, request)) = latest {
278            let value = worker.handle(request);
279            if res_tx.send(Output { generation, value }).is_err() {
280                return; // receiver dropped
281            }
282        }
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// Deliberately slow worker: echoes its input unchanged and counts
    /// how many times `handle` ran. The 10 ms sleep lets submissions
    /// outpace processing so the coalescing path is actually exercised.
    struct CountingWorker {
        calls: Arc<AtomicUsize>,
    }

    impl Worker for CountingWorker {
        type Request = u32;
        type Response = u32;

        fn handle(&mut self, req: u32) -> u32 {
            self.calls.fetch_add(1, Ordering::SeqCst);
            // Simulate expensive work.
            thread::sleep(Duration::from_millis(10));
            req
        }
    }

    /// Polls `cond` every millisecond until it returns true or
    /// `timeout` elapses; returns whether the condition was met.
    fn wait_until<F: FnMut() -> bool>(mut cond: F, timeout: Duration) -> bool {
        let start = Instant::now();
        while start.elapsed() < timeout {
            if cond() {
                return true;
            }
            thread::sleep(Duration::from_millis(1));
        }
        false
    }

    #[test]
    fn submit_and_poll_roundtrip() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        // First submission should be assigned generation 1.
        let generation = c.submit(42);
        assert_eq!(generation, 1);

        // Poll until the asynchronous response lands.
        let mut got = None;
        assert!(
            wait_until(
                || {
                    got = c.poll();
                    got.is_some()
                },
                Duration::from_secs(1),
            ),
            "timed out waiting for response",
        );

        // Echo worker: value comes back unchanged, tagged with gen 1.
        let out = got.unwrap();
        assert_eq!(out.generation, 1);
        assert_eq!(out.value, 42);
    }

    #[test]
    fn poll_returns_newest_when_multiple_pending() {
        // Spin the worker by submitting many requests and only polling
        // at the end. The worker is slow enough that multiple responses
        // will queue up.
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        for i in 0..5 {
            c.submit(i);
        }

        // Wait for the worker to process at least one; responses may queue.
        // NOTE(review): the fixed 100 ms sleep assumes the ~20 ms of total
        // worker time (at most 2 coalesced jobs x 10 ms) finishes in time;
        // could flake on a heavily loaded machine.
        thread::sleep(Duration::from_millis(100));
        let out = c.poll().expect("should receive at least one response");
        // Whatever was newest wins — must be the highest generation observed.
        // NOTE(review): generations here are 1..=5, so `<= 5` is always
        // true — this only proves *a* response arrived, not that the
        // newest one won. Consider asserting equality with 5 instead.
        assert!(out.generation <= 5);
        // No later response should surface — poll() drains everything.
        assert!(c.poll().is_none());
    }

    #[test]
    fn coalescing_drops_intermediate_requests() {
        // Submit 100 requests faster than the worker can handle — most
        // should be dropped before starting. At minimum, the final
        // generation's response must arrive.
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        for i in 0..100 {
            c.submit(i);
        }

        // Track the highest generation seen across repeated polls until
        // the final request (generation 100) completes or we time out.
        let mut max_gen = 0;
        let _ = wait_until(
            || {
                if let Some(out) = c.poll() {
                    max_gen = max_gen.max(out.generation);
                }
                max_gen == 100
            },
            Duration::from_secs(3),
        );

        assert_eq!(max_gen, 100, "final request should eventually complete");
        // Workers should have been called far fewer than 100 times
        // because coalescing drops stale requests.
        let total_calls = calls.load(Ordering::SeqCst);
        assert!(
            total_calls < 100,
            "expected coalescing to drop work, got {total_calls} calls"
        );
    }

    #[test]
    fn flush_pending_drops_unread_responses() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });

        c.submit(1);
        // Wait for the worker to start, then give the response time to
        // travel back through the channel before flushing.
        wait_until(|| calls.load(Ordering::SeqCst) >= 1, Duration::from_secs(1));
        thread::sleep(Duration::from_millis(20));

        c.flush_pending();
        assert!(
            c.poll().is_none(),
            "flush should have dropped the pending response"
        );
    }

    #[test]
    fn generation_monotonic() {
        let calls = Arc::new(AtomicUsize::new(0));
        let mut c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });
        // Counter starts at 0 and increments by exactly 1 per submit.
        assert_eq!(c.current_generation(), 0);
        for i in 1..=5 {
            let g = c.submit(i);
            assert_eq!(g, i as u64);
        }
        assert_eq!(c.current_generation(), 5);
    }

    #[test]
    fn drop_shuts_down_cleanly() {
        // Verify creating and dropping doesn't panic or hang.
        // (Drop only sends Shutdown; it never joins the thread.)
        let calls = Arc::new(AtomicUsize::new(0));
        let c = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });
        drop(c);
    }
}