1use crate::linux::linux_default_runtime_event_source as linux_platform_runtime_event_source;
2#[cfg(target_os = "linux")]
3use crate::linux_observer::LinuxObserverBridge;
4use crate::linux_subscriber::{
5 linux_native_runtime_adapter_registered, set_linux_native_runtime_adapter,
6};
7#[cfg(target_os = "linux")]
8use std::io::{BufRead, BufReader};
9#[cfg(target_os = "linux")]
10use std::process::{Child, ChildStdout, Command, Stdio};
11#[cfg(target_os = "linux")]
12use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, Mutex as StdMutex,
15};
16use std::sync::{Mutex, OnceLock};
17#[cfg(target_os = "linux")]
18use std::thread::{self, JoinHandle};
19#[cfg(target_os = "linux")]
20use std::time::Duration;
21
/// Point-in-time view of the default Linux runtime adapter, as returned by
/// `linux_default_runtime_adapter_state`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LinuxDefaultRuntimeAdapterState {
    /// `true` after a successful attach and until the next successful detach.
    pub attached: bool,
    /// Set on attach (only on Linux targets) and cleared on detach; the state
    /// query additionally requires the worker thread to still be running.
    pub worker_running: bool,
    /// Number of attach requests that actually attached the listener.
    pub attach_calls: u64,
    /// Number of detach requests that actually detached the listener.
    pub detach_calls: u64,
    /// Times the worker saw the listener's output end or fail to read.
    pub listener_exits: u64,
    /// Listener restart attempts made by the worker.
    pub listener_restarts: u64,
    /// Failed attempts to start a listener, counted during attach and restart.
    pub listener_failures: u64,
}
32
/// Callback the worker invokes when the listener reports an event; a returned
/// `Some(text)` is forwarded to `LinuxObserverBridge::push_event`.
pub type LinuxDefaultRuntimeEventSource = fn() -> Option<String>;
34
/// Line the listener script prints for each relevant accessibility signal;
/// the worker compares trimmed stdout lines against it.
#[cfg(target_os = "linux")]
const LINUX_RUNTIME_EVENT_MARKER: &str = "__SC_EVENT__";
/// Maximum number of worker spawn attempts made by one attach request.
#[cfg(target_os = "linux")]
const LINUX_ATTACH_RETRY_LIMIT: u32 = 4;
/// Maximum number of listener spawn attempts made by one restart.
#[cfg(target_os = "linux")]
const LINUX_RESTART_RETRY_LIMIT: u32 = 8;
/// Delay used for the first retry; later retries scale it up.
#[cfg(target_os = "linux")]
const LINUX_RETRY_BACKOFF_BASE: Duration = Duration::from_millis(50);
/// Upper bound applied to every retry delay.
#[cfg(target_os = "linux")]
const LINUX_RETRY_BACKOFF_MAX: Duration = Duration::from_millis(800);

// Compile-time guarantee that the capped delay, expressed in milliseconds,
// fits in the `u64` that `retry_backoff_delay` hands to `Duration::from_millis`.
#[cfg(target_os = "linux")]
const _: () = assert!(
    LINUX_RETRY_BACKOFF_MAX.as_millis() <= u64::MAX as u128,
    "LINUX_RETRY_BACKOFF_MAX exceeds u64::MAX ms"
);
51
52#[cfg(target_os = "linux")]
53fn retry_backoff_delay(attempt: u32) -> Duration {
54 let factor = 1u64 << attempt.min(6);
55 let millis = LINUX_RETRY_BACKOFF_BASE
56 .as_millis()
57 .saturating_mul(u128::from(factor))
58 .min(LINUX_RETRY_BACKOFF_MAX.as_millis());
59 Duration::from_millis(millis as u64)
60}
61
/// Python program run by the listener child process.
///
/// It asks `org.a11y.Bus` for the accessibility bus address via `gdbus`,
/// exits with status 0 when no quoted address is found in the reply, and
/// otherwise runs `dbus-monitor` on that address for
/// `org.a11y.atspi.Event.Object` signals. For every monitor line mentioning
/// `TextSelectionChanged`, `TextChanged`, `StateChanged` or `TextCaretMoved`
/// it prints one `__SC_EVENT__` line (flushed) to stdout.
///
/// NOTE(review): the marker is spelled out inside the script and must stay
/// equal to `LINUX_RUNTIME_EVENT_MARKER`.
/// NOTE(review): the Rust side stops this process with `Child::kill`; the
/// `finally` clause presumably does not run in that case — confirm that
/// `dbus-monitor` does not outlive the interpreter.
#[cfg(target_os = "linux")]
const LINUX_RUNTIME_LISTENER_SCRIPT: &str = r#"
import re
import subprocess
import sys

def call(cmd):
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError((proc.stderr or proc.stdout).strip())
    return proc.stdout.strip()

def parse_address(output):
    match = re.search(r"'([^']+)'", output)
    return match.group(1) if match else None

address_output = call([
    "gdbus", "call",
    "--session",
    "--dest", "org.a11y.Bus",
    "--object-path", "/org/a11y/bus",
    "--method", "org.a11y.Bus.GetAddress",
])

address = parse_address(address_output)
if not address:
    sys.exit(0)

monitor = subprocess.Popen(
    [
        "dbus-monitor",
        "--address",
        address,
        "type='signal',interface='org.a11y.atspi.Event.Object'",
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL,
    text=True,
    bufsize=1,
)

try:
    for line in monitor.stdout or []:
        if (
            "member=TextSelectionChanged" in line
            or "member=TextChanged" in line
            or "member=StateChanged" in line
            or "member=TextCaretMoved" in line
        ):
            print("__SC_EVENT__", flush=True)
finally:
    monitor.terminate()
"#;
115
/// Handle to the background thread that reads the listener process output,
/// together with the state shared with that thread.
#[cfg(target_os = "linux")]
struct LinuxRuntimeWorker {
    /// Set by `stop`; the worker thread polls it at the top of its loop and
    /// before each restart attempt.
    stop: Arc<AtomicBool>,
    /// Slot holding the listener child process currently in use, if any.
    child: Arc<StdMutex<Option<Child>>>,
    /// Exit / restart / failure counters updated by the worker thread.
    telemetry: Arc<LinuxWorkerTelemetry>,
    /// Join handle of the worker thread.
    handle: JoinHandle<()>,
}
123
/// Counters shared between a worker thread and whoever inspects the worker.
#[cfg(target_os = "linux")]
#[derive(Default)]
struct LinuxWorkerTelemetry {
    /// Times the listener output ended or could not be read.
    listener_exits: std::sync::atomic::AtomicU64,
    /// Restart attempts made after such an exit.
    listener_restarts: std::sync::atomic::AtomicU64,
    /// Restart attempts that failed to start a new listener.
    listener_failures: std::sync::atomic::AtomicU64,
}

#[cfg(target_os = "linux")]
impl LinuxWorkerTelemetry {
    /// Reads the counters as `(exits, restarts, failures)`.
    ///
    /// The three loads are individual atomic reads, taken in that order.
    fn snapshot(&self) -> (u64, u64, u64) {
        let exits = self.listener_exits.load(Ordering::SeqCst);
        let restarts = self.listener_restarts.load(Ordering::SeqCst);
        let failures = self.listener_failures.load(Ordering::SeqCst);
        (exits, restarts, failures)
    }
}
142
143#[cfg(target_os = "linux")]
144impl LinuxRuntimeWorker {
145 fn spawn() -> Option<Self> {
146 let stop = Arc::new(AtomicBool::new(false));
147 let child = Arc::new(StdMutex::new(None));
148 let telemetry = Arc::new(LinuxWorkerTelemetry::default());
149 let stdout = install_new_linux_listener(&child)?;
150 let stop_signal = Arc::clone(&stop);
151 let child_signal = Arc::clone(&child);
152 let telemetry_signal = Arc::clone(&telemetry);
153 let handle = thread::Builder::new()
154 .name("selection-capture-linux-runtime".to_string())
155 .spawn(move || {
156 let mut reader = BufReader::new(stdout);
157 loop {
158 if stop_signal.load(Ordering::SeqCst) {
159 break;
160 }
161
162 let mut line = String::new();
163 let Ok(read) = reader.read_line(&mut line) else {
164 telemetry_signal
165 .listener_exits
166 .fetch_add(1, Ordering::SeqCst);
167 if !restart_linux_listener(
168 &child_signal,
169 &stop_signal,
170 &telemetry_signal,
171 &mut reader,
172 ) {
173 break;
174 }
175 continue;
176 };
177 if read == 0 {
178 telemetry_signal
179 .listener_exits
180 .fetch_add(1, Ordering::SeqCst);
181 if !restart_linux_listener(
182 &child_signal,
183 &stop_signal,
184 &telemetry_signal,
185 &mut reader,
186 ) {
187 break;
188 }
189 continue;
190 }
191
192 if line.trim() == LINUX_RUNTIME_EVENT_MARKER {
193 if let Some(source) = linux_default_runtime_event_source() {
194 if let Some(text) = source() {
195 let _ = LinuxObserverBridge::push_event(text);
196 }
197 }
198 }
199 }
200
201 if let Ok(mut slot) = child_signal.lock() {
202 if let Some(mut child) = slot.take() {
203 let _ = child.kill();
204 let _ = child.wait();
205 }
206 }
207 })
208 .ok()?;
209 Some(Self {
210 stop,
211 child,
212 telemetry,
213 handle,
214 })
215 }
216
217 fn stop(self) -> bool {
218 self.stop.store(true, Ordering::SeqCst);
219 if let Ok(mut slot) = self.child.lock() {
220 if let Some(mut child) = slot.take() {
221 let _ = child.kill();
222 let _ = child.wait();
223 }
224 }
225 self.handle.join().is_ok()
226 }
227
228 fn telemetry_snapshot(&self) -> (u64, u64, u64) {
229 self.telemetry.snapshot()
230 }
231
232 fn is_running(&self) -> bool {
233 !self.handle.is_finished()
234 }
235}
236
237#[cfg(target_os = "linux")]
238fn spawn_linux_runtime_listener_process() -> Option<Child> {
239 Command::new("python3")
240 .args(["-u", "-c", LINUX_RUNTIME_LISTENER_SCRIPT])
241 .stdin(Stdio::null())
242 .stdout(Stdio::piped())
243 .stderr(Stdio::null())
244 .spawn()
245 .ok()
246}
247
248#[cfg(target_os = "linux")]
249fn install_new_linux_listener(child_slot: &Arc<StdMutex<Option<Child>>>) -> Option<ChildStdout> {
250 let mut child = spawn_linux_runtime_listener_process()?;
251 let stdout = child.stdout.take()?;
252 if let Ok(mut slot) = child_slot.lock() {
253 if let Some(mut previous) = slot.replace(child) {
254 let _ = previous.kill();
255 let _ = previous.wait();
256 }
257 }
258 Some(stdout)
259}
260
261#[cfg(target_os = "linux")]
262fn restart_linux_listener(
263 child_slot: &Arc<StdMutex<Option<Child>>>,
264 stop_signal: &Arc<AtomicBool>,
265 telemetry: &Arc<LinuxWorkerTelemetry>,
266 reader: &mut BufReader<ChildStdout>,
267) -> bool {
268 for attempt in 0..LINUX_RESTART_RETRY_LIMIT {
269 if stop_signal.load(Ordering::SeqCst) {
270 return false;
271 }
272 telemetry.listener_restarts.fetch_add(1, Ordering::SeqCst);
273 if let Some(stdout) = install_new_linux_listener(child_slot) {
274 *reader = BufReader::new(stdout);
275 return true;
276 }
277 telemetry.listener_failures.fetch_add(1, Ordering::SeqCst);
278 thread::sleep(retry_backoff_delay(attempt));
279 }
280 false
281}
282
/// Process-wide adapter bookkeeping guarded by the mutex returned from
/// `adapter_runtime`.
#[derive(Default)]
struct LinuxDefaultRuntimeAdapterRuntime {
    /// Persistent flags and counters reported to callers.
    state: LinuxDefaultRuntimeAdapterState,
    /// Background worker that exists while the listener is attached (Linux only).
    #[cfg(target_os = "linux")]
    worker: Option<LinuxRuntimeWorker>,
}
289
290fn adapter_runtime() -> &'static Mutex<LinuxDefaultRuntimeAdapterRuntime> {
291 static RUNTIME: OnceLock<Mutex<LinuxDefaultRuntimeAdapterRuntime>> = OnceLock::new();
292 RUNTIME.get_or_init(|| Mutex::new(LinuxDefaultRuntimeAdapterRuntime::default()))
293}
294
295fn event_source_slot() -> &'static Mutex<Option<LinuxDefaultRuntimeEventSource>> {
296 static SOURCE: OnceLock<Mutex<Option<LinuxDefaultRuntimeEventSource>>> = OnceLock::new();
297 SOURCE.get_or_init(|| Mutex::new(None))
298}
299
300#[cfg(target_os = "linux")]
301fn linux_default_runtime_event_source() -> Option<LinuxDefaultRuntimeEventSource> {
302 event_source_slot().lock().ok().and_then(|slot| *slot)
303}
304
305fn attach_default_linux_listener(runtime: &mut LinuxDefaultRuntimeAdapterRuntime) -> bool {
306 #[cfg(target_os = "linux")]
307 {
308 if runtime.worker.is_some() {
309 return true;
310 }
311 for attempt in 0..LINUX_ATTACH_RETRY_LIMIT {
312 if let Some(worker) = LinuxRuntimeWorker::spawn() {
313 runtime.worker = Some(worker);
314 return true;
315 }
316 runtime.state.listener_failures += 1;
317 thread::sleep(retry_backoff_delay(attempt));
318 }
319 false
320 }
321 #[cfg(not(target_os = "linux"))]
322 {
323 let _ = runtime;
324 true
325 }
326}
327
328fn detach_default_linux_listener(runtime: &mut LinuxDefaultRuntimeAdapterRuntime) -> bool {
329 #[cfg(target_os = "linux")]
330 {
331 runtime
332 .worker
333 .take()
334 .map(|worker| worker.stop())
335 .unwrap_or(true)
336 }
337 #[cfg(not(target_os = "linux"))]
338 {
339 let _ = runtime;
340 true
341 }
342}
343
344fn default_linux_runtime_adapter(active: bool) -> bool {
345 let Ok(mut runtime) = adapter_runtime().lock() else {
346 return false;
347 };
348
349 if active {
350 if runtime.state.attached {
351 return true;
352 }
353 if !attach_default_linux_listener(&mut runtime) {
354 return false;
355 }
356 runtime.state.attached = true;
357 runtime.state.worker_running = cfg!(target_os = "linux");
358 runtime.state.attach_calls += 1;
359 return true;
360 }
361
362 if !runtime.state.attached {
363 return true;
364 }
365 if !detach_default_linux_listener(&mut runtime) {
366 return false;
367 }
368 runtime.state.attached = false;
369 runtime.state.worker_running = false;
370 runtime.state.detach_calls += 1;
371 true
372}
373
374pub fn linux_default_runtime_adapter_state() -> LinuxDefaultRuntimeAdapterState {
375 adapter_runtime()
376 .lock()
377 .map(|runtime| {
378 #[cfg(target_os = "linux")]
379 {
380 let mut state = runtime.state;
381 if let Some(worker) = runtime.worker.as_ref() {
382 state.worker_running = state.worker_running && worker.is_running();
383 let (listener_exits, listener_restarts, listener_failures) =
384 worker.telemetry_snapshot();
385 state.listener_exits = state.listener_exits.saturating_add(listener_exits);
386 state.listener_restarts =
387 state.listener_restarts.saturating_add(listener_restarts);
388 state.listener_failures =
389 state.listener_failures.saturating_add(listener_failures);
390 }
391 state
392 }
393 #[cfg(not(target_os = "linux"))]
394 {
395 runtime.state
396 }
397 })
398 .unwrap_or_default()
399}
400
401pub fn set_linux_default_runtime_event_source(source: Option<LinuxDefaultRuntimeEventSource>) {
402 if let Ok(mut slot) = event_source_slot().lock() {
403 *slot = source;
404 }
405}
406
407pub fn linux_default_runtime_event_source_registered() -> bool {
408 event_source_slot()
409 .lock()
410 .map(|slot| slot.is_some())
411 .unwrap_or(false)
412}
413
/// Test helper: detaches the listener, resets the adapter runtime to its
/// default value and clears the registered event source, in that order.
#[cfg(test)]
fn reset_linux_default_runtime_adapter_state() {
    // The result is irrelevant here: the runtime is overwritten below either way.
    let _ = default_linux_runtime_adapter(false);
    match adapter_runtime().lock() {
        Ok(mut runtime) => *runtime = LinuxDefaultRuntimeAdapterRuntime::default(),
        Err(_) => {}
    }
    set_linux_default_runtime_event_source(None);
}
422
/// Test helper: kills the listener process currently held by the worker
/// without removing it from its slot, so the worker observes the exit.
///
/// Returns `false` when there is no worker or listener, a lock is poisoned,
/// or the kill itself fails.
#[cfg(all(test, target_os = "linux"))]
fn kill_linux_listener_for_tests() -> bool {
    let kill_current = || -> Option<bool> {
        let runtime = adapter_runtime().lock().ok()?;
        let worker = runtime.worker.as_ref()?;
        let mut slot = worker.child.lock().ok()?;
        let child = slot.as_mut()?;
        Some(child.kill().is_ok())
    };
    kill_current().unwrap_or(false)
}
439
440pub fn install_default_linux_runtime_adapter_if_absent() {
441 if !linux_default_runtime_event_source_registered() {
442 set_linux_default_runtime_event_source(Some(linux_platform_runtime_event_source));
443 }
444 if !linux_native_runtime_adapter_registered() {
445 set_linux_native_runtime_adapter(Some(default_linux_runtime_adapter));
446 }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::linux_observer::linux_observer_test_lock;
    use crate::{
        ensure_linux_native_subscriber_hook_installed, linux_native_subscriber_stats,
        LinuxObserverBridge,
    };

    /// Stops the bridge, clears every global registration and then installs
    /// the subscriber hook plus the default adapter, so each test starts from
    /// the same state.
    fn install_default_adapter_from_clean_slate() {
        let _ = LinuxObserverBridge::stop();
        LinuxObserverBridge::set_lifecycle_hook(None);
        reset_linux_default_runtime_adapter_state();
        set_linux_native_runtime_adapter(None);
        set_linux_default_runtime_event_source(None);
        ensure_linux_native_subscriber_hook_installed();
        install_default_linux_runtime_adapter_if_absent();
    }

    #[test]
    fn installing_default_adapter_enables_lifecycle_attempt_tracking() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        install_default_adapter_from_clean_slate();
        assert!(linux_native_runtime_adapter_registered());
        assert!(linux_default_runtime_event_source_registered());

        let before = linux_native_subscriber_stats();
        let _ = LinuxObserverBridge::start();
        let _ = LinuxObserverBridge::stop();
        let after = linux_native_subscriber_stats();

        assert!(after.adapter_attempts >= before.adapter_attempts);
    }

    #[test]
    fn default_adapter_state_tracks_attach_detach_idempotently() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        reset_linux_default_runtime_adapter_state();
        assert_eq!(
            linux_default_runtime_adapter_state(),
            LinuxDefaultRuntimeAdapterState::default()
        );

        // A repeated attach must not count twice.
        assert!(default_linux_runtime_adapter(true));
        assert!(default_linux_runtime_adapter(true));
        let started = linux_default_runtime_adapter_state();
        assert!(started.attached);
        assert_eq!(started.worker_running, cfg!(target_os = "linux"));
        assert_eq!(started.attach_calls, 1);
        assert_eq!(started.detach_calls, 0);

        // A repeated detach must not count twice either.
        assert!(default_linux_runtime_adapter(false));
        assert!(default_linux_runtime_adapter(false));
        let stopped = linux_default_runtime_adapter_state();
        assert!(!stopped.attached);
        assert!(!stopped.worker_running);
        assert_eq!(stopped.attach_calls, 1);
        assert_eq!(stopped.detach_calls, 1);
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn retry_backoff_delay_is_bounded_exponential() {
        assert_eq!(retry_backoff_delay(0), Duration::from_millis(50));
        assert_eq!(retry_backoff_delay(1), Duration::from_millis(100));
        assert_eq!(retry_backoff_delay(2), Duration::from_millis(200));
        assert_eq!(retry_backoff_delay(4), Duration::from_millis(800));
        assert_eq!(retry_backoff_delay(8), Duration::from_millis(800));
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn listener_restart_updates_telemetry_after_forced_kill() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        install_default_adapter_from_clean_slate();

        let _ = LinuxObserverBridge::start();
        let before = linux_default_runtime_adapter_state();
        // Without a running worker (e.g. no python3 on this machine) there is
        // nothing to exercise.
        if !before.attached || !before.worker_running {
            let _ = LinuxObserverBridge::stop();
            return;
        }

        if !kill_linux_listener_for_tests() {
            let _ = LinuxObserverBridge::stop();
            return;
        }

        // Poll for up to 1.5 s until the worker has noticed the kill.
        let mut after = before;
        let mut reacted = false;
        for _ in 0..30 {
            std::thread::sleep(Duration::from_millis(50));
            after = linux_default_runtime_adapter_state();
            if after.listener_restarts > before.listener_restarts
                || after.listener_exits > before.listener_exits
            {
                reacted = true;
                break;
            }
        }

        // Stop before asserting so a failure does not leave the bridge running.
        let _ = LinuxObserverBridge::stop();
        // The counters never decrease while the worker lives, so `>=` alone
        // would pass even if the kill went unnoticed; require a visible reaction.
        assert!(
            reacted,
            "worker did not record an exit or restart after the listener was killed"
        );
        assert!(after.listener_exits >= before.listener_exits);
        assert!(after.listener_restarts >= before.listener_restarts);
    }
}