coalesce-worker 0.1.2

Coalescing worker thread with generation-counter stale-result rejection — the discipline needed to run tree-sitter (or any expensive computation) off the main thread without applying out-of-date results
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//! A coalescing worker thread with generation-counter stale-result
//! rejection — the discipline needed to run tree-sitter (or any
//! expensive incremental parser) off the main thread without
//! corrupting state with out-of-date results.
//!
//! # The problem
//!
//! Suppose a GUI editor offloads syntax highlighting to a background
//! thread. The user types rapidly; the main thread fires off highlight
//! requests, one per keystroke. The worker can only process one at a
//! time. By the time request N-5 finishes, the source has moved to
//! state N, and the result computed from the stale N-5 source is no
//! longer valid — if the main thread applies it anyway, the UI shows
//! spans pointing at byte offsets that no longer exist, causing
//! rendering glitches or panics.
//!
//! This class of bug is documented in `goliajp/devops/dotclaude/common/classic-errors.md`
//! as **"Stale async cache after mutation"**.
//!
//! # The fix
//!
//! Two disciplines enforced by this crate:
//!
//! - **Request coalescing** — when the worker finishes one job, it
//!   drains any queued requests and processes only the latest; older
//!   requests that never started are silently discarded.
//! - **Generation counting** — every submitted request gets a monotonic
//!   generation number. When the main thread polls for results, it
//!   drains the receive channel and keeps only the newest generation;
//!   older results are dropped without being applied.
//!
//! # Example: tree-sitter highlighting
//!
//! ```no_run
//! use coalesce_worker::{Worker, Coalescer};
//! use std::sync::Arc;
//!
//! # #[cfg(any())] {
//! struct HighlightWorker {
//!     highlighter: tree_sitter_highlight::Highlighter,
//! }
//!
//! struct HighlightRequest {
//!     source: Arc<Vec<u8>>,
//!     config: Arc<tree_sitter_highlight::HighlightConfiguration>,
//! }
//!
//! struct HighlightResponse {
//!     events: Vec<tree_sitter_highlight::HighlightEvent>,
//! }
//!
//! impl Worker for HighlightWorker {
//!     type Request = HighlightRequest;
//!     type Response = HighlightResponse;
//!     fn handle(&mut self, req: Self::Request) -> Self::Response {
//!         let events = self.highlighter
//!             .highlight(&req.config, &req.source, None, |_| None)
//!             .unwrap()
//!             .collect::<Result<Vec<_>, _>>()
//!             .unwrap();
//!         HighlightResponse { events }
//!     }
//! }
//! # }
//! ```
//!
//! Then drive the coalescer from the main thread:
//!
//! ```no_run
//! # use coalesce_worker::{Worker, Coalescer, Output};
//! # struct MyWorker;
//! # struct Req;
//! # struct Res;
//! # impl Worker for MyWorker {
//! #     type Request = Req;
//! #     type Response = Res;
//! #     fn handle(&mut self, _r: Req) -> Res { Res }
//! # }
//! # let mut coalescer = Coalescer::new(MyWorker);
//! # fn current_source() -> Req { Req }
//! # fn render_highlights(_r: Res) {}
//! // main loop
//! coalescer.submit(current_source());
//! if let Some(Output { generation: _, value }) = coalescer.poll() {
//!     render_highlights(value);
//! }
//! ```
//!
//! # Not only for tree-sitter
//!
//! Any long-running background computation that can be superseded —
//! rebuilding a suggestion index, recompiling a preview, running a
//! linter — fits the same pattern.

#![deny(missing_docs)]

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// A worker that owns its processing state and handles one request
/// at a time.
///
/// The implementation should be free of shared mutable state — the
/// worker runs on its own thread and communicates with the main
/// thread only via request/response values.
/// A worker that owns its processing state and handles one request
/// at a time.
///
/// The implementation should be free of shared mutable state — the
/// worker runs on its own thread and communicates with the main
/// thread only via request/response values.
///
/// `handle` may block as long as it needs: coalescing happens around
/// it (queued requests are collapsed while it runs), so a slow job
/// only delays newer results, it never corrupts them.
pub trait Worker: Send + 'static {
    /// Input type sent from the main thread.
    type Request: Send + 'static;
    /// Output type returned after processing.
    type Response: Send + 'static;
    /// Processes one request. Called on the worker thread.
    ///
    /// Takes `&mut self`, so the worker may keep scratch state
    /// between calls (e.g. a reusable parser instance).
    fn handle(&mut self, req: Self::Request) -> Self::Response;
}

/// One response from the worker, tagged with the generation of the
/// request that produced it.
#[derive(Debug, Clone)]
pub struct Output<T> {
    /// Generation number of the request.
    pub generation: u64,
    /// The worker's response.
    pub value: T,
}

/// Internal message sent from the main thread to the worker thread.
enum Msg<R> {
    /// Process `request`; tag the eventual response with `generation`.
    Run { generation: u64, request: R },
    /// Ask the worker loop to exit (sent from `Coalescer::drop`).
    Shutdown,
}

/// Coalescing async dispatcher around a [`Worker`].
///
/// See the [crate-level docs](crate) for the problem it solves.
/// Coalescing async dispatcher around a [`Worker`].
///
/// See the [crate-level docs](crate) for the problem it solves.
pub struct Coalescer<W: Worker> {
    // Request channel into the worker thread.
    tx: Sender<Msg<W::Request>>,
    // Response channel back from the worker thread.
    rx: Receiver<Output<W::Response>>,
    // Last generation handed out by `submit`; 0 = nothing submitted.
    generation: u64,
    // Kept only so the handle isn't dropped immediately; never joined
    // (see `Drop` — joining could block behind an in-flight job).
    _thread: Option<thread::JoinHandle<()>>,
}

impl<W: Worker> Coalescer<W> {
    /// Spawns the worker thread and returns a handle.
    ///
    /// The thread is named `coalesce-worker` by default. Use
    /// [`Coalescer::spawn_named`] to override.
    pub fn new(worker: W) -> Self {
        Self::spawn_named("coalesce-worker", worker)
    }

    /// Spawns the worker thread with a custom name (useful for
    /// profilers and panic backtraces).
    pub fn spawn_named(name: &str, worker: W) -> Self {
        // One channel in each direction; the worker owns the far ends.
        let (to_worker, worker_inbox) = mpsc::channel::<Msg<W::Request>>();
        let (worker_outbox, from_worker) = mpsc::channel::<Output<W::Response>>();

        let handle = thread::Builder::new()
            .name(name.to_owned())
            .spawn(move || worker_loop(worker, worker_inbox, worker_outbox))
            .expect("failed to spawn coalescer worker thread");

        Self {
            tx: to_worker,
            rx: from_worker,
            generation: 0,
            _thread: Some(handle),
        }
    }

    /// Submits a new request. Older requests queued but not yet
    /// started are silently discarded.
    ///
    /// Returns the generation number assigned to the new request;
    /// use it later to match against [`Output::generation`].
    ///
    /// Non-blocking: just sends on a channel. A send error means the
    /// worker has exited (panicked or dropped), in which case the
    /// returned generation will never produce a response.
    pub fn submit(&mut self, request: W::Request) -> u64 {
        self.generation += 1;
        let generation = self.generation;
        // Ignore send failure: a dead worker simply means no response
        // will ever arrive for this generation.
        let _ = self.tx.send(Msg::Run { generation, request });
        generation
    }

    /// Polls for the newest completed response.
    ///
    /// Drains all pending results from the channel and returns the
    /// one with the highest generation number, silently dropping any
    /// older responses. Returns `None` if no response is ready.
    pub fn poll(&mut self) -> Option<Output<W::Response>> {
        // Drain everything currently queued, keeping the first result
        // with the highest generation seen so far.
        self.rx.try_iter().fold(None, |best, candidate| match best {
            Some(kept) if kept.generation >= candidate.generation => Some(kept),
            _ => Some(candidate),
        })
    }

    /// Discards all pending results without taking ownership of them.
    ///
    /// Use when switching context (tab switch, file close) so that a
    /// response for the *previous* context doesn't leak into the new
    /// one. Does not cancel in-flight worker computation — only
    /// discards what's already been sent back.
    pub fn flush_pending(&mut self) {
        for _dropped in self.rx.try_iter() {}
    }

    /// Current generation counter (the number that was last assigned
    /// by [`submit`](Self::submit), or 0 if nothing has been submitted).
    pub fn current_generation(&self) -> u64 {
        self.generation
    }
}

impl<W: Worker> Drop for Coalescer<W> {
    fn drop(&mut self) {
        // Best-effort shutdown signal. Deliberately no join: a busy
        // worker must not make Drop block.
        self.tx.send(Msg::Shutdown).ok();
    }
}

/// Worker-thread main loop: block for a request, coalesce any backlog
/// down to the newest request, process it, send the result back.
/// Exits on `Msg::Shutdown` or when either channel disconnects.
fn worker_loop<W: Worker>(
    mut worker: W,
    req_rx: Receiver<Msg<W::Request>>,
    res_tx: Sender<Output<W::Response>>,
) {
    loop {
        // Block until the next message arrives (or the sender is gone).
        let mut msg = match req_rx.recv() {
            Ok(m) => m,
            Err(_) => return, // main thread dropped the Coalescer
        };

        // Coalesce: fold the first message plus anything already queued
        // into the single newest request; Shutdown ends the thread.
        let mut newest: Option<(u64, W::Request)> = None;
        loop {
            match msg {
                Msg::Shutdown => return,
                Msg::Run {
                    generation,
                    request,
                } => {
                    let is_newer = match &newest {
                        Some((kept, _)) => *kept < generation,
                        None => true,
                    };
                    if is_newer {
                        newest = Some((generation, request));
                    }
                }
            }
            msg = match req_rx.try_recv() {
                Ok(m) => m,
                Err(mpsc::TryRecvError::Empty) => break,
                Err(mpsc::TryRecvError::Disconnected) => return,
            };
        }

        if let Some((generation, request)) = newest {
            let value = worker.handle(request);
            // Receiver gone means nobody wants results anymore.
            if res_tx.send(Output { generation, value }).is_err() {
                return;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// Slow echo worker: bumps a shared call counter, sleeps 10 ms to
    /// simulate real work, then returns its input unchanged.
    struct CountingWorker {
        calls: Arc<AtomicUsize>,
    }

    impl Worker for CountingWorker {
        type Request = u32;
        type Response = u32;

        fn handle(&mut self, req: u32) -> u32 {
            self.calls.fetch_add(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(10));
            req
        }
    }

    /// Builds a fresh coalescer plus a handle on its call counter.
    fn make() -> (Arc<AtomicUsize>, Coalescer<CountingWorker>) {
        let calls = Arc::new(AtomicUsize::new(0));
        let co = Coalescer::new(CountingWorker {
            calls: Arc::clone(&calls),
        });
        (calls, co)
    }

    /// Spins (1 ms granularity) until `pred` holds or `limit` passes.
    fn spin_until<F: FnMut() -> bool>(mut pred: F, limit: Duration) -> bool {
        let started = Instant::now();
        while started.elapsed() < limit {
            if pred() {
                return true;
            }
            thread::sleep(Duration::from_millis(1));
        }
        false
    }

    #[test]
    fn submit_and_poll_roundtrip() {
        let (_calls, mut co) = make();

        assert_eq!(co.submit(42), 1);

        let mut received = None;
        let arrived = spin_until(
            || {
                received = co.poll();
                received.is_some()
            },
            Duration::from_secs(1),
        );
        assert!(arrived, "timed out waiting for response");

        let out = received.unwrap();
        assert_eq!(out.generation, 1);
        assert_eq!(out.value, 42);
    }

    #[test]
    fn poll_returns_newest_when_multiple_pending() {
        // Spin the worker by submitting many requests and only polling
        // at the end. The worker is slow enough that multiple responses
        // will queue up.
        let (_calls, mut co) = make();

        for req in 0..5 {
            co.submit(req);
        }

        // Wait for the worker to process at least one; responses may queue.
        thread::sleep(Duration::from_millis(100));
        let out = co.poll().expect("should receive at least one response");
        // Whatever was newest wins — must be the highest generation observed.
        assert!(out.generation <= 5);
        // No later response should surface — poll() drains everything.
        assert!(co.poll().is_none());
    }

    #[test]
    fn coalescing_drops_intermediate_requests() {
        // Submit 100 requests faster than the worker can handle — most
        // should be dropped before starting. At minimum, the final
        // generation's response must arrive.
        let (calls, mut co) = make();

        for req in 0..100 {
            co.submit(req);
        }

        let mut newest_seen = 0;
        let _ = spin_until(
            || {
                if let Some(out) = co.poll() {
                    newest_seen = newest_seen.max(out.generation);
                }
                newest_seen == 100
            },
            Duration::from_secs(3),
        );

        assert_eq!(newest_seen, 100, "final request should eventually complete");
        // Workers should have been called far fewer than 100 times
        // because coalescing drops stale requests.
        let total_calls = calls.load(Ordering::SeqCst);
        assert!(
            total_calls < 100,
            "expected coalescing to drop work, got {total_calls} calls"
        );
    }

    #[test]
    fn flush_pending_drops_unread_responses() {
        let (calls, mut co) = make();

        co.submit(1);
        // Wait for the worker to at least start, then give the response
        // time to land in the channel.
        spin_until(|| calls.load(Ordering::SeqCst) >= 1, Duration::from_secs(1));
        thread::sleep(Duration::from_millis(20));

        co.flush_pending();
        assert!(
            co.poll().is_none(),
            "flush should have dropped the pending response"
        );
    }

    #[test]
    fn generation_monotonic() {
        let (_calls, mut co) = make();
        assert_eq!(co.current_generation(), 0);
        for req in 1..=5u32 {
            assert_eq!(co.submit(req), u64::from(req));
        }
        assert_eq!(co.current_generation(), 5);
    }

    #[test]
    fn drop_shuts_down_cleanly() {
        // Verify creating and dropping doesn't panic or hang.
        let (_calls, co) = make();
        drop(co);
    }
}