Skip to main content

selection_capture/
linux_runtime_adapter.rs

1use crate::linux::linux_default_runtime_event_source as linux_platform_runtime_event_source;
2#[cfg(target_os = "linux")]
3use crate::linux_observer::LinuxObserverBridge;
4use crate::linux_subscriber::{
5    linux_native_runtime_adapter_registered, set_linux_native_runtime_adapter,
6};
7#[cfg(target_os = "linux")]
8use std::io::{BufRead, BufReader};
9#[cfg(target_os = "linux")]
10use std::process::{Child, ChildStdout, Command, Stdio};
11#[cfg(target_os = "linux")]
12use std::sync::{
13    atomic::{AtomicBool, Ordering},
14    Arc, Mutex as StdMutex,
15};
16use std::sync::{Mutex, OnceLock};
17#[cfg(target_os = "linux")]
18use std::thread::{self, JoinHandle};
19#[cfg(target_os = "linux")]
20use std::time::Duration;
21
/// Point-in-time view of the default Linux runtime adapter: its attach state plus
/// lifecycle and listener counters.
///
/// Values are returned by copy from [`linux_default_runtime_adapter_state`]; every field
/// starts at its `Default` value (`false` / `0`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LinuxDefaultRuntimeAdapterState {
    /// Set when an activation request succeeds and cleared when a deactivation succeeds.
    pub attached: bool,
    /// Set on a successful attach (only on Linux builds) and cleared on detach; the state
    /// snapshot additionally requires the worker thread to still be running.
    pub worker_running: bool,
    /// Number of activation requests that actually performed an attach.
    pub attach_calls: u64,
    /// Number of deactivation requests that actually performed a detach.
    pub detach_calls: u64,
    /// Number of times the worker saw the listener's output end or fail to read.
    pub listener_exits: u64,
    /// Number of listener respawn attempts made by the worker.
    pub listener_restarts: u64,
    /// Number of listener spawn attempts that failed (during attach or restart).
    pub listener_failures: u64,
}
32
/// Callback the Linux worker invokes when the listener reports an event; a returned
/// `Some(text)` is forwarded to the observer bridge and `None` is ignored.
pub type LinuxDefaultRuntimeEventSource = fn() -> Option<String>;
34
/// Line the listener script prints on stdout for every matching signal; the worker
/// compares each trimmed output line against it.
#[cfg(target_os = "linux")]
const LINUX_RUNTIME_EVENT_MARKER: &str = "__SC_EVENT__";
/// Maximum number of worker spawn attempts made by one attach request.
#[cfg(target_os = "linux")]
const LINUX_ATTACH_RETRY_LIMIT: u32 = 4;
/// Maximum number of listener respawn attempts made by one restart cycle.
#[cfg(target_os = "linux")]
const LINUX_RESTART_RETRY_LIMIT: u32 = 8;
/// Backoff delay used for the first retry (attempt 0); later attempts double it.
#[cfg(target_os = "linux")]
const LINUX_RETRY_BACKOFF_BASE: Duration = Duration::from_millis(50);
/// Upper bound applied to every computed backoff delay.
#[cfg(target_os = "linux")]
const LINUX_RETRY_BACKOFF_MAX: Duration = Duration::from_millis(800);

// Compile-time guard: the backoff cap must fit in `u64` milliseconds, which is what keeps
// the `as u64` narrowing in `retry_backoff_delay` lossless.
#[cfg(target_os = "linux")]
const _: () = assert!(
    LINUX_RETRY_BACKOFF_MAX.as_millis() <= u64::MAX as u128,
    "LINUX_RETRY_BACKOFF_MAX exceeds u64::MAX ms"
);
51
52#[cfg(target_os = "linux")]
53fn retry_backoff_delay(attempt: u32) -> Duration {
54    let factor = 1u64 << attempt.min(6);
55    let millis = LINUX_RETRY_BACKOFF_BASE
56        .as_millis()
57        .saturating_mul(u128::from(factor))
58        .min(LINUX_RETRY_BACKOFF_MAX.as_millis());
59    Duration::from_millis(millis as u64)
60}
61
/// Python program run by the listener child process (`python3 -u -c <script>`).
///
/// The script asks `org.a11y.Bus` for the accessibility bus address via `gdbus`, then
/// runs `dbus-monitor` on that address filtered to `org.a11y.atspi.Event.Object`
/// signals. For every monitor line mentioning `TextSelectionChanged`, `TextChanged`,
/// `StateChanged` or `TextCaretMoved` it prints `__SC_EVENT__` (flushed) on stdout.
/// It exits with status 0 and no output when no address can be parsed, and raises when
/// the `gdbus` call returns a non-zero status.
#[cfg(target_os = "linux")]
const LINUX_RUNTIME_LISTENER_SCRIPT: &str = r#"
import re
import subprocess
import sys

def call(cmd):
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError((proc.stderr or proc.stdout).strip())
    return proc.stdout.strip()

def parse_address(output):
    match = re.search(r"'([^']+)'", output)
    return match.group(1) if match else None

address_output = call([
    "gdbus", "call",
    "--session",
    "--dest", "org.a11y.Bus",
    "--object-path", "/org/a11y/bus",
    "--method", "org.a11y.Bus.GetAddress",
])

address = parse_address(address_output)
if not address:
    sys.exit(0)

monitor = subprocess.Popen(
    [
        "dbus-monitor",
        "--address",
        address,
        "type='signal',interface='org.a11y.atspi.Event.Object'",
    ],
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL,
    text=True,
    bufsize=1,
)

try:
    for line in monitor.stdout or []:
        if (
            "member=TextSelectionChanged" in line
            or "member=TextChanged" in line
            or "member=StateChanged" in line
            or "member=TextCaretMoved" in line
        ):
            print("__SC_EVENT__", flush=True)
finally:
    monitor.terminate()
"#;
115
/// Handle to the background thread that reads the listener process's output, together
/// with the state shared with that thread.
#[cfg(target_os = "linux")]
struct LinuxRuntimeWorker {
    /// Shutdown flag; set by `stop` and polled by the worker thread.
    stop: Arc<AtomicBool>,
    /// Slot holding the current listener process; shared with the worker thread, which
    /// replaces its contents on restart.
    child: Arc<StdMutex<Option<Child>>>,
    /// Exit / restart / failure counters updated by the worker thread.
    telemetry: Arc<LinuxWorkerTelemetry>,
    /// Join handle of the worker thread.
    handle: JoinHandle<()>,
}
123
/// Atomic counters the worker thread updates while supervising the listener process.
#[cfg(target_os = "linux")]
#[derive(Default)]
struct LinuxWorkerTelemetry {
    /// Times the listener's output ended or could not be read.
    listener_exits: std::sync::atomic::AtomicU64,
    /// Respawn attempts.
    listener_restarts: std::sync::atomic::AtomicU64,
    /// Respawn attempts that failed.
    listener_failures: std::sync::atomic::AtomicU64,
}

#[cfg(target_os = "linux")]
impl LinuxWorkerTelemetry {
    /// Reads the counters as `(listener_exits, listener_restarts, listener_failures)`.
    fn snapshot(&self) -> (u64, u64, u64) {
        let exits = self.listener_exits.load(Ordering::SeqCst);
        let restarts = self.listener_restarts.load(Ordering::SeqCst);
        let failures = self.listener_failures.load(Ordering::SeqCst);
        (exits, restarts, failures)
    }
}
142
143#[cfg(target_os = "linux")]
144impl LinuxRuntimeWorker {
145    fn spawn() -> Option<Self> {
146        let stop = Arc::new(AtomicBool::new(false));
147        let child = Arc::new(StdMutex::new(None));
148        let telemetry = Arc::new(LinuxWorkerTelemetry::default());
149        let stdout = install_new_linux_listener(&child)?;
150        let stop_signal = Arc::clone(&stop);
151        let child_signal = Arc::clone(&child);
152        let telemetry_signal = Arc::clone(&telemetry);
153        let handle = thread::Builder::new()
154            .name("selection-capture-linux-runtime".to_string())
155            .spawn(move || {
156                let mut reader = BufReader::new(stdout);
157                loop {
158                    if stop_signal.load(Ordering::SeqCst) {
159                        break;
160                    }
161
162                    let mut line = String::new();
163                    let Ok(read) = reader.read_line(&mut line) else {
164                        telemetry_signal
165                            .listener_exits
166                            .fetch_add(1, Ordering::SeqCst);
167                        if !restart_linux_listener(
168                            &child_signal,
169                            &stop_signal,
170                            &telemetry_signal,
171                            &mut reader,
172                        ) {
173                            break;
174                        }
175                        continue;
176                    };
177                    if read == 0 {
178                        telemetry_signal
179                            .listener_exits
180                            .fetch_add(1, Ordering::SeqCst);
181                        if !restart_linux_listener(
182                            &child_signal,
183                            &stop_signal,
184                            &telemetry_signal,
185                            &mut reader,
186                        ) {
187                            break;
188                        }
189                        continue;
190                    }
191
192                    if line.trim() == LINUX_RUNTIME_EVENT_MARKER {
193                        if let Some(source) = linux_default_runtime_event_source() {
194                            if let Some(text) = source() {
195                                let _ = LinuxObserverBridge::push_event(text);
196                            }
197                        }
198                    }
199                }
200
201                if let Ok(mut slot) = child_signal.lock() {
202                    if let Some(mut child) = slot.take() {
203                        let _ = child.kill();
204                        let _ = child.wait();
205                    }
206                }
207            })
208            .ok()?;
209        Some(Self {
210            stop,
211            child,
212            telemetry,
213            handle,
214        })
215    }
216
217    fn stop(self) -> bool {
218        self.stop.store(true, Ordering::SeqCst);
219        if let Ok(mut slot) = self.child.lock() {
220            if let Some(mut child) = slot.take() {
221                let _ = child.kill();
222                let _ = child.wait();
223            }
224        }
225        self.handle.join().is_ok()
226    }
227
228    fn telemetry_snapshot(&self) -> (u64, u64, u64) {
229        self.telemetry.snapshot()
230    }
231
232    fn is_running(&self) -> bool {
233        !self.handle.is_finished()
234    }
235}
236
237#[cfg(target_os = "linux")]
238fn spawn_linux_runtime_listener_process() -> Option<Child> {
239    Command::new("python3")
240        .args(["-u", "-c", LINUX_RUNTIME_LISTENER_SCRIPT])
241        .stdin(Stdio::null())
242        .stdout(Stdio::piped())
243        .stderr(Stdio::null())
244        .spawn()
245        .ok()
246}
247
248#[cfg(target_os = "linux")]
249fn install_new_linux_listener(child_slot: &Arc<StdMutex<Option<Child>>>) -> Option<ChildStdout> {
250    let mut child = spawn_linux_runtime_listener_process()?;
251    let stdout = child.stdout.take()?;
252    if let Ok(mut slot) = child_slot.lock() {
253        if let Some(mut previous) = slot.replace(child) {
254            let _ = previous.kill();
255            let _ = previous.wait();
256        }
257    }
258    Some(stdout)
259}
260
261#[cfg(target_os = "linux")]
262fn restart_linux_listener(
263    child_slot: &Arc<StdMutex<Option<Child>>>,
264    stop_signal: &Arc<AtomicBool>,
265    telemetry: &Arc<LinuxWorkerTelemetry>,
266    reader: &mut BufReader<ChildStdout>,
267) -> bool {
268    for attempt in 0..LINUX_RESTART_RETRY_LIMIT {
269        if stop_signal.load(Ordering::SeqCst) {
270            return false;
271        }
272        telemetry.listener_restarts.fetch_add(1, Ordering::SeqCst);
273        if let Some(stdout) = install_new_linux_listener(child_slot) {
274            *reader = BufReader::new(stdout);
275            return true;
276        }
277        telemetry.listener_failures.fetch_add(1, Ordering::SeqCst);
278        thread::sleep(retry_backoff_delay(attempt));
279    }
280    false
281}
282
/// Process-wide adapter bookkeeping kept behind the mutex returned by `adapter_runtime`.
#[derive(Default)]
struct LinuxDefaultRuntimeAdapterRuntime {
    /// Persistent flags and counters; the public state snapshot starts from this value.
    state: LinuxDefaultRuntimeAdapterState,
    /// Live listener worker, present between a successful attach and the next detach.
    #[cfg(target_os = "linux")]
    worker: Option<LinuxRuntimeWorker>,
}
289
290fn adapter_runtime() -> &'static Mutex<LinuxDefaultRuntimeAdapterRuntime> {
291    static RUNTIME: OnceLock<Mutex<LinuxDefaultRuntimeAdapterRuntime>> = OnceLock::new();
292    RUNTIME.get_or_init(|| Mutex::new(LinuxDefaultRuntimeAdapterRuntime::default()))
293}
294
295fn event_source_slot() -> &'static Mutex<Option<LinuxDefaultRuntimeEventSource>> {
296    static SOURCE: OnceLock<Mutex<Option<LinuxDefaultRuntimeEventSource>>> = OnceLock::new();
297    SOURCE.get_or_init(|| Mutex::new(None))
298}
299
300#[cfg(target_os = "linux")]
301fn linux_default_runtime_event_source() -> Option<LinuxDefaultRuntimeEventSource> {
302    event_source_slot().lock().ok().and_then(|slot| *slot)
303}
304
305fn attach_default_linux_listener(runtime: &mut LinuxDefaultRuntimeAdapterRuntime) -> bool {
306    #[cfg(target_os = "linux")]
307    {
308        if runtime.worker.is_some() {
309            return true;
310        }
311        for attempt in 0..LINUX_ATTACH_RETRY_LIMIT {
312            if let Some(worker) = LinuxRuntimeWorker::spawn() {
313                runtime.worker = Some(worker);
314                return true;
315            }
316            runtime.state.listener_failures += 1;
317            thread::sleep(retry_backoff_delay(attempt));
318        }
319        false
320    }
321    #[cfg(not(target_os = "linux"))]
322    {
323        let _ = runtime;
324        true
325    }
326}
327
328fn detach_default_linux_listener(runtime: &mut LinuxDefaultRuntimeAdapterRuntime) -> bool {
329    #[cfg(target_os = "linux")]
330    {
331        runtime
332            .worker
333            .take()
334            .map(|worker| worker.stop())
335            .unwrap_or(true)
336    }
337    #[cfg(not(target_os = "linux"))]
338    {
339        let _ = runtime;
340        true
341    }
342}
343
344fn default_linux_runtime_adapter(active: bool) -> bool {
345    let Ok(mut runtime) = adapter_runtime().lock() else {
346        return false;
347    };
348
349    if active {
350        if runtime.state.attached {
351            return true;
352        }
353        if !attach_default_linux_listener(&mut runtime) {
354            return false;
355        }
356        runtime.state.attached = true;
357        runtime.state.worker_running = cfg!(target_os = "linux");
358        runtime.state.attach_calls += 1;
359        return true;
360    }
361
362    if !runtime.state.attached {
363        return true;
364    }
365    if !detach_default_linux_listener(&mut runtime) {
366        return false;
367    }
368    runtime.state.attached = false;
369    runtime.state.worker_running = false;
370    runtime.state.detach_calls += 1;
371    true
372}
373
374pub fn linux_default_runtime_adapter_state() -> LinuxDefaultRuntimeAdapterState {
375    adapter_runtime()
376        .lock()
377        .map(|runtime| {
378            #[cfg(target_os = "linux")]
379            {
380                let mut state = runtime.state;
381                if let Some(worker) = runtime.worker.as_ref() {
382                    state.worker_running = state.worker_running && worker.is_running();
383                    let (listener_exits, listener_restarts, listener_failures) =
384                        worker.telemetry_snapshot();
385                    state.listener_exits = state.listener_exits.saturating_add(listener_exits);
386                    state.listener_restarts =
387                        state.listener_restarts.saturating_add(listener_restarts);
388                    state.listener_failures =
389                        state.listener_failures.saturating_add(listener_failures);
390                }
391                state
392            }
393            #[cfg(not(target_os = "linux"))]
394            {
395                runtime.state
396            }
397        })
398        .unwrap_or_default()
399}
400
401pub fn set_linux_default_runtime_event_source(source: Option<LinuxDefaultRuntimeEventSource>) {
402    if let Ok(mut slot) = event_source_slot().lock() {
403        *slot = source;
404    }
405}
406
407pub fn linux_default_runtime_event_source_registered() -> bool {
408    event_source_slot()
409        .lock()
410        .map(|slot| slot.is_some())
411        .unwrap_or(false)
412}
413
/// Test helper: detaches the adapter, resets the runtime bookkeeping to its defaults, and
/// clears the registered event source.
#[cfg(test)]
fn reset_linux_default_runtime_adapter_state() {
    // The outcome of the detach is deliberately ignored; the runtime is reset regardless.
    let _ = default_linux_runtime_adapter(false);
    if let Ok(mut runtime) = adapter_runtime().lock() {
        // Swap in a default runtime; the previous one is dropped while the lock is held.
        let _previous = std::mem::take(&mut *runtime);
    }
    set_linux_default_runtime_event_source(None);
}
422
/// Test helper: kills the current listener process without removing it from the worker's
/// slot, so the worker observes the exit on its own.
///
/// Returns `false` when a lock cannot be acquired, no worker or process exists, or the
/// kill itself fails.
#[cfg(all(test, target_os = "linux"))]
fn kill_linux_listener_for_tests() -> bool {
    let Ok(runtime) = adapter_runtime().lock() else {
        return false;
    };
    let killed = runtime
        .worker
        .as_ref()
        .and_then(|worker| worker.child.lock().ok())
        .and_then(|mut slot| slot.as_mut().map(|child| child.kill().is_ok()))
        .unwrap_or(false);
    killed
}
439
440pub fn install_default_linux_runtime_adapter_if_absent() {
441    if !linux_default_runtime_event_source_registered() {
442        set_linux_default_runtime_event_source(Some(linux_platform_runtime_event_source));
443    }
444    if !linux_native_runtime_adapter_registered() {
445        set_linux_native_runtime_adapter(Some(default_linux_runtime_adapter));
446    }
447}
448
// Tests for the default Linux runtime adapter. Each test that touches the global adapter
// or observer state first takes `linux_observer_test_lock()` (presumably to serialise
// tests sharing that state — confirm against `linux_observer`).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::linux_observer::linux_observer_test_lock;
    use crate::{
        ensure_linux_native_subscriber_hook_installed, linux_native_subscriber_stats,
        LinuxObserverBridge,
    };

    /// After clearing all registrations, installing the defaults registers both the
    /// adapter and the event source, and a start/stop cycle does not lower the
    /// subscriber's `adapter_attempts` counter.
    #[test]
    fn installing_default_adapter_enables_lifecycle_attempt_tracking() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        // Start from a clean slate: bridge stopped, no hook, no adapter, no event source.
        let _ = LinuxObserverBridge::stop();
        LinuxObserverBridge::set_lifecycle_hook(None);
        reset_linux_default_runtime_adapter_state();
        set_linux_native_runtime_adapter(None);
        set_linux_default_runtime_event_source(None);
        ensure_linux_native_subscriber_hook_installed();
        install_default_linux_runtime_adapter_if_absent();
        assert!(linux_native_runtime_adapter_registered());
        assert!(linux_default_runtime_event_source_registered());

        let before = linux_native_subscriber_stats();
        let _ = LinuxObserverBridge::start();
        let _ = LinuxObserverBridge::stop();
        let after = linux_native_subscriber_stats();

        // NOTE(review): `>=` only checks that the counter did not decrease.
        assert!(after.adapter_attempts >= before.adapter_attempts);
    }

    /// Repeated attach and repeated detach requests each change the state and bump the
    /// corresponding counter exactly once.
    #[test]
    fn default_adapter_state_tracks_attach_detach_idempotently() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        reset_linux_default_runtime_adapter_state();
        assert_eq!(
            linux_default_runtime_adapter_state(),
            LinuxDefaultRuntimeAdapterState::default()
        );

        // The second activation must be a no-op.
        assert!(default_linux_runtime_adapter(true));
        assert!(default_linux_runtime_adapter(true));
        let started = linux_default_runtime_adapter_state();
        assert!(started.attached);
        assert_eq!(started.worker_running, cfg!(target_os = "linux"));
        assert_eq!(started.attach_calls, 1);
        assert_eq!(started.detach_calls, 0);

        // The second deactivation must be a no-op.
        assert!(default_linux_runtime_adapter(false));
        assert!(default_linux_runtime_adapter(false));
        let stopped = linux_default_runtime_adapter_state();
        assert!(!stopped.attached);
        assert!(!stopped.worker_running);
        assert_eq!(stopped.attach_calls, 1);
        assert_eq!(stopped.detach_calls, 1);
    }

    /// The backoff doubles from 50 ms and is capped at 800 ms.
    #[test]
    #[cfg(target_os = "linux")]
    fn retry_backoff_delay_is_bounded_exponential() {
        assert_eq!(retry_backoff_delay(0), Duration::from_millis(50));
        assert_eq!(retry_backoff_delay(1), Duration::from_millis(100));
        assert_eq!(retry_backoff_delay(2), Duration::from_millis(200));
        assert_eq!(retry_backoff_delay(4), Duration::from_millis(800));
        assert_eq!(retry_backoff_delay(8), Duration::from_millis(800));
    }

    /// Kills the listener process behind a running worker and polls the adapter state
    /// for up to 30 x 50 ms for the exit/restart counters to move. The test returns early
    /// (without asserting) when the worker is not running or the kill fails.
    #[test]
    #[cfg(target_os = "linux")]
    fn listener_restart_updates_telemetry_after_forced_kill() {
        let _guard = linux_observer_test_lock()
            .lock()
            .expect("test lock poisoned");
        // Start from a clean slate, then install the default adapter and event source.
        let _ = LinuxObserverBridge::stop();
        LinuxObserverBridge::set_lifecycle_hook(None);
        reset_linux_default_runtime_adapter_state();
        set_linux_native_runtime_adapter(None);
        set_linux_default_runtime_event_source(None);
        ensure_linux_native_subscriber_hook_installed();
        install_default_linux_runtime_adapter_if_absent();

        let _ = LinuxObserverBridge::start();
        let before = linux_default_runtime_adapter_state();
        // Nothing to exercise if the listener could not be attached.
        if !before.attached || !before.worker_running {
            let _ = LinuxObserverBridge::stop();
            return;
        }

        if !kill_linux_listener_for_tests() {
            let _ = LinuxObserverBridge::stop();
            return;
        }

        // Poll until the worker has noticed the exit or attempted a restart.
        let mut after = before;
        for _ in 0..30 {
            std::thread::sleep(Duration::from_millis(50));
            after = linux_default_runtime_adapter_state();
            if after.listener_restarts > before.listener_restarts
                || after.listener_exits > before.listener_exits
            {
                break;
            }
        }

        // NOTE(review): these `>=` checks also pass if the counters never moved.
        assert!(after.listener_exits >= before.listener_exits);
        assert!(after.listener_restarts >= before.listener_restarts);
        let _ = LinuxObserverBridge::stop();
    }
}