Skip to main content

oo_ide/editor/
highlight_worker.rs

1//! Async syntax highlighting worker.
2//!
3//! Provides background highlighting for large files or slow syntaxes.
4//! The main SyntaxHighlighter handles checkpointing and caching.
5
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{mpsc, oneshot};
11
12use crate::editor::buffer::Version;
13use crate::editor::highlight::{SyntaxHighlighter, Token};
14
15pub struct HighlightWorker {
16    tx: mpsc::Sender<HighlightRequest>,
17    handle: tokio::task::JoinHandle<()>,
18}
19
20#[derive(Debug)]
21pub struct HighlightRequest {
22    pub version: Version,
23    pub start_line: usize,
24    pub count: usize,
25    /// Shared buffer lines to avoid cloning the Vec for every request.
26    pub lines: Arc<Vec<String>>,
27    pub reply: oneshot::Sender<Vec<Arc<Vec<Token>>>>,
28}
29
30impl HighlightWorker {
31    pub fn new(highlighter: SyntaxHighlighter) -> Self {
32        let (tx, rx) = mpsc::channel::<HighlightRequest>(32);
33
34        let handle = tokio::spawn(async move {
35            Self::run_worker(rx, highlighter).await;
36        });
37
38        Self { tx, handle }
39    }
40
41    async fn run_worker(
42        mut rx: mpsc::Receiver<HighlightRequest>,
43        highlighter: SyntaxHighlighter,
44    ) {
45        const DEBOUNCE_MS: u64 = 50;
46
47        let mut pending: Option<HighlightRequest> = None;
48        let mut last_request_time = std::time::Instant::now();
49
50        // Scheduler used to track and cancel active highlight jobs.
51        let scheduler = Arc::new(HighlightScheduler::new());
52
53        loop {
54            tokio::select! {
55                biased;
56
57                request = rx.recv() => {
58                    match request {
59                        None => break,
60                        Some(req) => {
61                            pending = Some(req);
62                            last_request_time = std::time::Instant::now();
63                        }
64                    }
65                }
66
67                _ = tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)), if pending.is_some() => {
68                    if last_request_time.elapsed() >= Duration::from_millis(DEBOUNCE_MS)
69                        && let Some(req) = pending.take() {
70                                    // Offload CPU-heavy highlighting to the blocking thread pool
71                            // to avoid blocking the Tokio runtime.
72                            let theme = highlighter.theme_name.clone();
73                            let syntax = highlighter.syntax_name.clone();
74                            let lines = req.lines;
75                            let version = req.version;
76                            let start_line = req.start_line;
77                            let count = req.count;
78                            let reply = req.reply;
79
80                            // Spawn a blocking task that constructs a local
81                            // SyntaxHighlighter and performs the highlighting. The
82                            // blocking task sends the result back on the oneshot
83                            // channel when complete.
84                            // Cancel any previously running jobs before starting this one.
85                            let job = HighlightJob::new(version, start_line, count);
86                            let cancelled = job.cancelled.clone();
87                            scheduler.cancel_all_jobs();
88                            scheduler.register_job(cancelled.clone());
89
90                            let scheduler_clone = scheduler.clone();
91                            tokio::task::spawn_blocking(move || {
92                                let local = SyntaxHighlighter::new().with_theme(&theme).with_syntax(&syntax);
93                                // Use the cancellable highlighter variant which checks the
94                                // cancellation flag between lines.
95                                let tokens = local.highlight_tokens_cancellable(&lines, version, start_line, count, &cancelled);
96                                // Only send results if this job was not cancelled.
97                                if !cancelled.load(std::sync::atomic::Ordering::SeqCst)
98                                    && reply.send(tokens).is_err() {
99                                        log::debug!("Highlight request receiver dropped");
100                                    }
101                                // Unregister job so scheduler stops tracking it.
102                                scheduler_clone.unregister_job(&cancelled);
103                            });
104                        }
105                }
106            }
107        }
108    }
109
110    pub async fn highlight(
111        &self,
112        version: Version,
113        start_line: usize,
114        count: usize,
115        lines: Arc<Vec<String>>,
116    ) -> Vec<Arc<Vec<Token>>> {
117        let (reply, received) = oneshot::channel();
118
119        let request = HighlightRequest {
120            version,
121            start_line,
122            count,
123            lines,
124            reply,
125        };
126
127        if self.tx.send(request).await.is_err() {
128            log::warn!("Highlight worker channel closed");
129            return Vec::new();
130        }
131
132        received.await.unwrap_or_default()
133    }
134
135    pub async fn shutdown(self) {
136        drop(self.tx);
137        let _ = self.handle.await;
138    }
139}
140
141#[derive(Debug, Clone)]
142pub struct HighlightJob {
143    pub version: Version,
144    pub start_line: usize,
145    pub count: usize,
146    pub cancelled: Arc<AtomicBool>,
147}
148
149impl HighlightJob {
150    pub fn new(version: Version, start_line: usize, count: usize) -> Self {
151        Self {
152            version,
153            start_line,
154            count,
155            cancelled: Arc::new(AtomicBool::new(false)),
156        }
157    }
158
159    pub fn cancel(&self) {
160        self.cancelled.store(true, Ordering::SeqCst);
161    }
162
163    pub fn is_cancelled(&self) -> bool {
164        self.cancelled.load(Ordering::SeqCst)
165    }
166}
167
168pub struct HighlightScheduler {
169    generation: Arc<AtomicU64>,
170    active_jobs: std::sync::Mutex<Vec<Arc<AtomicBool>>>,
171}
172
173impl Default for HighlightScheduler {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179impl HighlightScheduler {
180    pub fn new() -> Self {
181        Self {
182            generation: Arc::new(AtomicU64::new(0)),
183            active_jobs: std::sync::Mutex::new(Vec::new()),
184        }
185    }
186
187    pub fn next_generation(&self) -> u64 {
188        self.generation.fetch_add(1, Ordering::SeqCst) + 1
189    }
190
191    pub fn current_generation(&self) -> u64 {
192        self.generation.load(Ordering::SeqCst)
193    }
194
195    pub fn cancel_all_jobs(&self) {
196        let jobs = self.active_jobs.lock().unwrap();
197        for job in jobs.iter() {
198            job.store(true, Ordering::SeqCst);
199        }
200    }
201
202    pub fn register_job(&self, job: Arc<AtomicBool>) {
203        let mut jobs = self.active_jobs.lock().unwrap();
204        jobs.push(job);
205        if jobs.len() > 100 {
206            jobs.remove(0);
207        }
208    }
209
210    pub fn unregister_job(&self, job: &Arc<AtomicBool>) {
211        let mut jobs = self.active_jobs.lock().unwrap();
212        jobs.retain(|j| !j.as_ptr().eq(&job.as_ptr()));
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn job_cancellation() {
222        let job = HighlightJob::new(Version::new(), 0, 10);
223        assert!(!job.is_cancelled());
224        
225        job.cancel();
226        assert!(job.is_cancelled());
227    }
228
229    #[test]
230    fn scheduler_generation() {
231        let scheduler = HighlightScheduler::new();
232        let gen1 = scheduler.current_generation();
233        let gen2 = scheduler.next_generation();
234        assert!(gen2 > gen1);
235    }
236}