// oo_ide/editor/highlight_worker.rs
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{mpsc, oneshot};

use crate::editor::buffer::Version;
use crate::editor::highlight::{SyntaxHighlighter, Token};

/// Debounced, cancellable front end for background syntax highlighting.
///
/// Owns a spawned worker task (see `run_worker`) and the sender side of the
/// bounded channel used to feed it [`HighlightRequest`]s.
pub struct HighlightWorker {
    /// Sender half of the request channel consumed by the worker task.
    tx: mpsc::Sender<HighlightRequest>,
    /// Join handle of the worker task; awaited in `shutdown` so the task can
    /// drain and exit cleanly.
    handle: tokio::task::JoinHandle<()>,
}
19
/// A single request to highlight a window of buffer lines.
#[derive(Debug)]
pub struct HighlightRequest {
    /// Buffer version the `lines` snapshot was taken from.
    pub version: Version,
    /// First line of the window to highlight.
    // NOTE(review): assumed to be a buffer-line index (it is forwarded
    // verbatim to `highlight_tokens_cancellable`) — confirm against that API.
    pub start_line: usize,
    /// Number of lines to highlight starting at `start_line`.
    pub count: usize,
    /// Shared snapshot of the buffer's lines.
    pub lines: Arc<Vec<String>>,
    /// One-shot channel over which token rows are delivered. Dropped without
    /// sending when the request is debounced away or cancelled, which the
    /// caller observes as a closed channel.
    pub reply: oneshot::Sender<Vec<Arc<Vec<Token>>>>,
}
29
impl HighlightWorker {
    /// Spawns the background worker task and returns a handle that submits
    /// highlight requests to it over a bounded (32-slot) channel.
    pub fn new(highlighter: SyntaxHighlighter) -> Self {
        let (tx, rx) = mpsc::channel::<HighlightRequest>(32);

        let handle = tokio::spawn(async move {
            Self::run_worker(rx, highlighter).await;
        });

        Self { tx, handle }
    }

    /// Worker loop: debounces incoming requests, then runs the actual
    /// highlighting on the blocking thread pool.
    ///
    /// Only the most recent request is kept in `pending`; a newer request
    /// replaces an older undispatched one, dropping its `reply` sender so
    /// the waiting caller receives the channel-closed default (empty Vec).
    async fn run_worker(
        mut rx: mpsc::Receiver<HighlightRequest>,
        highlighter: SyntaxHighlighter,
    ) {
        // Quiet period required after the last request before dispatching work.
        const DEBOUNCE_MS: u64 = 50;

        let mut pending: Option<HighlightRequest> = None;
        let mut last_request_time = std::time::Instant::now();

        // Tracks cancellation flags of in-flight blocking jobs so a newer
        // request can cancel all previous ones.
        let scheduler = Arc::new(HighlightScheduler::new());

        loop {
            tokio::select! {
                // `biased` makes the receive arm win when both arms are ready,
                // so a fresh request always supersedes a due debounce timer.
                biased;

                request = rx.recv() => {
                    match request {
                        // Channel closed (all senders dropped) -> shut down.
                        None => break,
                        Some(req) => {
                            // Replace any undispatched request and restart
                            // the debounce window.
                            pending = Some(req);
                            last_request_time = std::time::Instant::now();
                        }
                    }
                }

                // The sleep future is recreated on each loop iteration, so it
                // fires DEBOUNCE_MS after the iteration began; the elapsed()
                // check below re-verifies the full quiet period has passed
                // since the most recent request.
                _ = tokio::time::sleep(Duration::from_millis(DEBOUNCE_MS)), if pending.is_some() => {
                    if last_request_time.elapsed() >= Duration::from_millis(DEBOUNCE_MS)
                        && let Some(req) = pending.take() {
                        // Capture owned theme/syntax names so the blocking
                        // closure can rebuild a highlighter on its own thread.
                        let theme = highlighter.theme_name.clone();
                        let syntax = highlighter.syntax_name.clone();
                        let lines = req.lines;
                        let version = req.version;
                        let start_line = req.start_line;
                        let count = req.count;
                        let reply = req.reply;

                        let job = HighlightJob::new(version, start_line, count);
                        let cancelled = job.cancelled.clone();
                        // Newer work invalidates everything in flight: cancel
                        // every registered job, then register this one.
                        scheduler.cancel_all_jobs();
                        scheduler.register_job(cancelled.clone());

                        let scheduler_clone = scheduler.clone();
                        tokio::task::spawn_blocking(move || {
                            // A fresh highlighter is constructed from the
                            // captured names rather than moving `highlighter`
                            // in — presumably because SyntaxHighlighter cannot
                            // be sent across threads; TODO confirm.
                            let local = SyntaxHighlighter::new().with_theme(&theme).with_syntax(&syntax);
                            let tokens = local.highlight_tokens_cancellable(&lines, version, start_line, count, &cancelled);
                            // Deliver results only if the job was not cancelled
                            // mid-flight; a dropped receiver is expected when
                            // the caller stopped waiting.
                            if !cancelled.load(std::sync::atomic::Ordering::SeqCst)
                                && reply.send(tokens).is_err() {
                                log::debug!("Highlight request receiver dropped");
                            }
                            scheduler_clone.unregister_job(&cancelled);
                        });
                    }
                }
            }
        }
    }

    /// Submits a highlight request and awaits the resulting token rows.
    ///
    /// Returns an empty `Vec` when the worker channel is closed, or when the
    /// request was superseded/cancelled before a reply was sent (the oneshot
    /// sender is dropped in those cases).
    pub async fn highlight(
        &self,
        version: Version,
        start_line: usize,
        count: usize,
        lines: Arc<Vec<String>>,
    ) -> Vec<Arc<Vec<Token>>> {
        let (reply, received) = oneshot::channel();

        let request = HighlightRequest {
            version,
            start_line,
            count,
            lines,
            reply,
        };

        if self.tx.send(request).await.is_err() {
            log::warn!("Highlight worker channel closed");
            return Vec::new();
        }

        // A dropped sender (debounced-away or cancelled request) resolves to
        // the default empty Vec rather than propagating an error.
        received.await.unwrap_or_default()
    }

    /// Closes the request channel and waits for the worker task to exit.
    pub async fn shutdown(self) {
        drop(self.tx);
        let _ = self.handle.await;
    }
}
140
/// Metadata for one dispatched highlight job plus its shared cancellation flag.
#[derive(Debug, Clone)]
pub struct HighlightJob {
    /// Buffer version this job was created for.
    pub version: Version,
    /// First line of the requested window.
    pub start_line: usize,
    /// Number of lines in the requested window.
    pub count: usize,
    /// Shared cancellation flag; set to `true` to ask a running job to stop
    /// early. Clones of the job (or of the `Arc` itself) observe the same flag.
    pub cancelled: Arc<AtomicBool>,
}
148
149impl HighlightJob {
150 pub fn new(version: Version, start_line: usize, count: usize) -> Self {
151 Self {
152 version,
153 start_line,
154 count,
155 cancelled: Arc::new(AtomicBool::new(false)),
156 }
157 }
158
159 pub fn cancel(&self) {
160 self.cancelled.store(true, Ordering::SeqCst);
161 }
162
163 pub fn is_cancelled(&self) -> bool {
164 self.cancelled.load(Ordering::SeqCst)
165 }
166}
167
/// Tracks the cancellation flags of in-flight highlight jobs so they can be
/// cancelled en masse, plus a monotonically increasing generation counter.
pub struct HighlightScheduler {
    /// Monotonic counter, bumped by `next_generation`.
    generation: Arc<AtomicU64>,
    /// Cancellation flags of currently registered jobs (capped at 100 entries).
    active_jobs: std::sync::Mutex<Vec<Arc<AtomicBool>>>,
}

impl Default for HighlightScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl HighlightScheduler {
    /// Creates a scheduler at generation 0 with no registered jobs.
    pub fn new() -> Self {
        Self {
            generation: Arc::new(AtomicU64::new(0)),
            active_jobs: std::sync::Mutex::new(Vec::new()),
        }
    }

    /// Atomically bumps the generation counter and returns the new value.
    pub fn next_generation(&self) -> u64 {
        self.generation.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Returns the current generation without modifying it.
    pub fn current_generation(&self) -> u64 {
        self.generation.load(Ordering::SeqCst)
    }

    /// Raises the cancellation flag of every registered job.
    pub fn cancel_all_jobs(&self) {
        let jobs = self.active_jobs.lock().unwrap();
        for job in jobs.iter() {
            job.store(true, Ordering::SeqCst);
        }
    }

    /// Registers a job's cancellation flag. When more than 100 jobs are
    /// tracked, the oldest entry is evicted — it can no longer be cancelled
    /// through this scheduler, though its flag stays valid for its holders.
    pub fn register_job(&self, job: Arc<AtomicBool>) {
        let mut jobs = self.active_jobs.lock().unwrap();
        jobs.push(job);
        if jobs.len() > 100 {
            jobs.remove(0);
        }
    }

    /// Removes `job` from the active set, matching by allocation identity.
    ///
    /// Fix: use `Arc::ptr_eq` instead of comparing `AtomicBool::as_ptr`
    /// raw pointers with `.eq(&…)` — it states the "same allocation" intent
    /// directly and cannot be misread as a value comparison.
    pub fn unregister_job(&self, job: &Arc<AtomicBool>) {
        let mut jobs = self.active_jobs.lock().unwrap();
        jobs.retain(|j| !Arc::ptr_eq(j, job));
    }
}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh job starts uncancelled and flips once `cancel` is called.
    #[test]
    fn job_cancellation() {
        let job = HighlightJob::new(Version::new(), 0, 10);

        assert!(!job.is_cancelled());
        job.cancel();
        assert!(job.is_cancelled());
    }

    /// `next_generation` returns a value strictly greater than the one
    /// observed beforehand.
    #[test]
    fn scheduler_generation() {
        let scheduler = HighlightScheduler::new();

        let before = scheduler.current_generation();
        let after = scheduler.next_generation();
        assert!(after > before);
    }
}