waydriver-compositor-mutter 0.2.8

Mutter headless compositor backend for waydriver
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
//! Mutter implementation of [`waydriver::CompositorRuntime`].
//!
//! Owns the private-bus `dbus-daemon`, the `pipewire` + `wireplumber` pair,
//! and a headless `mutter --wayland` instance. After [`MutterCompositor::start`]
//! returns, [`MutterCompositor::state`] exposes an `Arc<MutterState>` that
//! sibling backends (`waydriver-input-mutter`, `waydriver-capture-mutter`) use
//! to talk to the same mutter D-Bus session.
//!
//! ## Shared-state invariant
//!
//! While any `Arc<MutterState>` exists, the mutter child processes and the
//! private D-Bus connection MUST remain alive. [`waydriver::Session::kill`]
//! enforces this by dropping input and capture trait objects before calling
//! `compositor.stop().await`.

mod error;

use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, LazyLock, Mutex};

use async_trait::async_trait;
use tokio::process::{Child, Command};

use waydriver::{CompositorRuntime, Result};

use crate::error::MutterError;

/// Default virtual-monitor geometry passed to mutter when the caller doesn't
/// override it, written in the `<width>x<height>` form that
/// `parse_resolution` validates. Matches mutter's own implicit default.
const DEFAULT_RESOLUTION: &str = "1024x768";

/// Shared mutter-backend state consumed by `waydriver-input-mutter` and
/// `waydriver-capture-mutter`.
///
/// **Invariant:** while any `Arc<MutterState>` exists, the underlying D-Bus
/// connection and the mutter child process must remain alive. See the
/// module docs for details.
///
/// Fields are private — all access goes through the accessor methods
/// below. Sibling crates (`waydriver-input-mutter`,
/// `waydriver-capture-mutter`) that previously read fields directly
/// now call `state.conn()`, `state.rd_session_path()`, etc. The
/// shape of the underlying storage (e.g. how `active_stream_path` is
/// guarded) is therefore an implementation detail that can change
/// without breaking those callers — the contract lives entirely in
/// the method signatures.
pub struct MutterState {
    /// Connection to mutter's private D-Bus, handed out by `conn()`.
    conn: zbus::Connection,
    /// Object path of the RemoteDesktop session returned by
    /// `RemoteDesktop.CreateSession` during startup.
    rd_session_path: String,
    /// Value of the RD session's `SessionId` property, read once at startup.
    rd_session_id: String,
    /// "RD-started" flag; initialised to `false` and only reachable through
    /// `rd_started_lock()`.
    rd_started: Arc<Mutex<bool>>,
    /// Copy of the compositor's per-session runtime directory.
    runtime_dir: PathBuf,
    /// Object path of the currently open ScreenCast stream, or `None` when
    /// no stream is open; only reachable through `active_stream_path_lock()`.
    active_stream_path: Arc<Mutex<Option<String>>>,
}

impl MutterState {
    /// Persistent connection to mutter's private D-Bus.
    ///
    /// Both sibling backends (`waydriver-input-mutter`,
    /// `waydriver-capture-mutter`) issue all their RemoteDesktop and
    /// ScreenCast method calls through this connection.
    pub fn conn(&self) -> &zbus::Connection {
        &self.conn
    }

    /// RemoteDesktop session object path. Used by
    /// `waydriver-input-mutter` as the `path` argument on every
    /// pointer / keyboard `Notify*` D-Bus call.
    pub fn rd_session_path(&self) -> &str {
        &self.rd_session_path
    }

    /// RemoteDesktop session id, read from the `SessionId` property on
    /// the RD session. `waydriver-capture-mutter` passes this as the
    /// `remote-desktop-session-id` option to
    /// `ScreenCast.CreateSession` so mutter links the two; the link is
    /// required for `NotifyPointerMotionAbsolute` to be accepted.
    pub fn rd_session_id(&self) -> &str {
        &self.rd_session_id
    }

    /// Per-session `XDG_RUNTIME_DIR`. `waydriver-capture-mutter` joins
    /// this with `pipewire-0` to locate the PipeWire socket.
    pub fn runtime_dir(&self) -> &Path {
        &self.runtime_dir
    }

    /// Lock the "RD-started" flag.
    ///
    /// Acquires the underlying mutex and returns the guard so the
    /// caller can perform a check-and-set under one critical section
    /// (the capture backend defers `RD.Session.Start` until the first
    /// linked `ScreenCast.CreateSession` succeeds — that's a load,
    /// some D-Bus work, and a store; splitting the read and write
    /// would race). `Error::Process` if the mutex is poisoned.
    pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
        self.rd_started
            .lock()
            .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
    }

    /// Lock the active ScreenCast Stream object path.
    ///
    /// Set by `waydriver-capture-mutter` in `start_stream`, cleared in
    /// `stop_stream`. `waydriver-input-mutter` reads it to route
    /// `NotifyPointerMotionAbsolute` at the correct monitor. `None`
    /// inside the guard means no stream is open — absolute pointer
    /// motion will error.
    pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
        self.active_stream_path
            .lock()
            .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
    }
}

/// Headless mutter instance.
///
/// Holds the handles needed to tear down everything `start` spawns: the
/// mutter, pipewire and wireplumber children, the PID of the private
/// `dbus-daemon`, and the per-session runtime directory.
pub struct MutterCompositor {
    /// Short session id generated in `new()`.
    id: String,
    /// Wayland display name, `wayland-wd-<id>`.
    wayland_display: String,
    /// Per-session runtime directory, `<host runtime root>/wd-session-<id>`.
    runtime_dir: PathBuf,
    /// Address of the private D-Bus; empty until `start` has parsed it from
    /// the `dbus-launch` output.
    mutter_dbus_address: String,
    /// PID of the private `dbus-daemon`; `None` before `start` and after
    /// `stop`.
    mutter_dbus_pid: Option<u32>,
    /// Handle to the spawned `mutter` process, if any.
    mutter: Option<Child>,
    /// Handle to the spawned `pipewire` process, if any.
    pipewire: Option<Child>,
    /// Handle to the spawned `wireplumber` process, if any.
    wireplumber: Option<Child>,
    /// Shared state published at the end of a successful `start` and
    /// dropped again by `stop`.
    state: Option<Arc<MutterState>>,
}

/// The host runtime root under which every session's `wd-session-<id>`
/// directory is created. Snapshotted once, lazily, on the first
/// `MutterCompositor::new()` call.
///
/// This is deliberately read **once** and cached, rather than re-read from
/// `XDG_RUNTIME_DIR` per session. `waydriver`'s screenshot and video pipelines
/// (`waydriver::capture`) mutate the parent process's `XDG_RUNTIME_DIR` to
/// point `pipewiresrc` at the *live* session's pipewire socket, and never
/// restore it. If `new()` re-read the live env each time, session N+1's
/// runtime dir would be created **inside** session N's dir
/// (`…/wd-session-A/wd-session-B/…`), nesting one level deeper per session.
/// After ~4 levels the `<dir>/pipewire-0` path exceeds the ~107-byte AF_UNIX
/// `sun_path` limit, pipewire can no longer bind its socket, and every
/// subsequent `start_session` fails with a "timeout: pipewire socket" error
/// until the server is restarted (which resets the process env). Snapshotting
/// the root keeps each session dir a flat sibling under the original
/// `XDG_RUNTIME_DIR`, independent of how many sessions preceded it.
///
/// The first `new()` runs before any session exists, so the env is still the
/// pristine value set by the launcher (e.g. the Docker entrypoint) — capturing
/// it then is safe.
static HOST_RUNTIME_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
    // An unset, non-UTF-8 or *empty* `XDG_RUNTIME_DIR` all fall back to the
    // conventional per-user directory. Accepting the empty string would
    // produce an empty root, turning every session dir into a path relative
    // to the current working directory.
    std::env::var("XDG_RUNTIME_DIR")
        .ok()
        .filter(|dir| !dir.is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| {
            // SAFETY: `getuid` takes no arguments, touches no memory owned
            // by us, and is documented as always succeeding.
            let uid = unsafe { libc::getuid() };
            PathBuf::from(format!("/run/user/{uid}"))
        })
});

impl MutterCompositor {
    /// Build an unstarted compositor handle.
    ///
    /// Picks a fresh session id (the first eight characters of a random
    /// UUIDv4) and derives from it the Wayland display name and the
    /// location of the per-session runtime directory. Nothing is spawned
    /// and nothing is created on disk here.
    pub fn new() -> Self {
        let uuid = uuid::Uuid::new_v4().to_string();
        let id = uuid[..8].to_owned();

        let wayland_display = format!("wayland-wd-{id}");
        let session_dir_name = format!("wd-session-{id}");

        Self {
            runtime_dir: HOST_RUNTIME_ROOT.join(session_dir_name),
            wayland_display,
            id,
            mutter_dbus_address: String::new(),
            mutter_dbus_pid: None,
            mutter: None,
            pipewire: None,
            wireplumber: None,
            state: None,
        }
    }

    /// Hand out another reference to the shared `Arc<MutterState>` for the
    /// sibling backends, if the compositor is currently started.
    ///
    /// The result is `None` in two situations:
    /// - `start()` has not completed yet (or it returned an error);
    /// - `stop()` has already run and dropped the state.
    ///
    /// A caller that has just awaited `start()?` knows the state exists,
    /// so `expect()` or `?` with a typed error is fine at that point. The
    /// `Option` return — as opposed to a panic — keeps the lifecycle
    /// visible in the signature and lets callers observe "stopped" as an
    /// ordinary value.
    pub fn state(&self) -> Option<Arc<MutterState>> {
        self.state.as_ref().map(Arc::clone)
    }
}

impl Default for MutterCompositor {
    fn default() -> Self {
        Self::new()
    }
}

impl MutterCompositor {
    /// Typed-error implementation of `start`. The trait method calls
    /// this and converts the result via `From<MutterError>`.
    ///
    /// Steps (each fails with a specific `MutterError` variant):
    /// 1. validate resolution,
    /// 2. ensure the session runtime dir exists,
    /// 3. spawn a private `dbus-daemon` and parse its PID + address,
    /// 4. spawn `pipewire` + `wireplumber` on that bus,
    /// 5. spawn headless `mutter --wayland`,
    /// 6. wait for the Wayland socket,
    /// 7. open a zbus connection, retry-create the RemoteDesktop session,
    /// 8. read its `SessionId` property,
    /// 9. publish the `Arc<MutterState>` for sibling backends.
    ///
    /// On failure, whatever was spawned so far stays recorded in `self`
    /// (child handles, daemon PID) so that `stop()` or `Drop` can still
    /// clean it up; `self.state` is only set on full success.
    async fn start_inner(
        &mut self,
        resolution: Option<&str>,
    ) -> std::result::Result<(), MutterError> {
        // Step 1: validate the resolution.
        let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
        // Validate before we start spawning subprocesses — mutter silently
        // ignores bad --virtual-monitor values and falls back to its own
        // default, which would surprise the caller.
        parse_resolution(resolution)?;

        tracing::info!(id = self.id, resolution, "starting mutter compositor");

        // Step 2: make sure the per-session runtime dir exists.
        tokio::fs::create_dir_all(&self.runtime_dir).await?;
        // `runtime_dir` is built in `new()` from a UTF-8 String
        // (XDG_RUNTIME_DIR or `/run/user/<uid>`) joined with a UTF-8
        // ASCII session id, so the path is guaranteed valid UTF-8.
        // `expect` documents that invariant rather than re-deriving
        // it via the `to_str()` `Option`.
        let runtime_str = self
            .runtime_dir
            .to_str()
            .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
            .to_string();

        // Step 3: Private D-Bus for mutter (so its ScreenCast API doesn't conflict with host).
        let dbus_output = Command::new("dbus-launch")
            .arg("--sh-syntax")
            .output()
            .await?;
        if !dbus_output.status.success() {
            return Err(MutterError::DbusLaunchFailed(
                String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
            ));
        }
        let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
        // Record the PID *before* parsing the address: the daemon is already
        // running at this point, and the PID is the only handle `stop()` /
        // `Drop` have for killing it. Parsing the address first would orphan
        // the daemon whenever the address line turned out to be malformed.
        self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
        self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
        tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");

        // Step 4: PipeWire + WirePlumber (for screenshots via ScreenCast).
        //
        // `env_remove("PIPEWIRE_REMOTE")` is load-bearing: `waydriver`'s
        // `grab_png_sync` mutates the parent's process env to point
        // `pipewiresrc` at the live session's pipewire socket. After a
        // session stops, that socket is gone but the env var lingers in
        // the parent. Without scrubbing it here, a freshly spawned
        // `pipewire`/`wireplumber`/`mutter` for the next session would
        // inherit the stale value and try to connect to the previous
        // session's dead socket — wireplumber/mutter prefer
        // `PIPEWIRE_REMOTE` over `XDG_RUNTIME_DIR/pipewire-0`, so the
        // explicit `XDG_RUNTIME_DIR` override below isn't enough.
        // Symptom: `ScreenCast.Start` fails with "Couldn't connect
        // pipewire context" on every session after the first.
        let pipewire = Command::new("pipewire")
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "pipewire",
                source,
            })?;
        self.pipewire = Some(pipewire);

        // Wait for pipewire's socket to appear before launching
        // wireplumber. Polling for the socket file is the same
        // readiness signal `wait_for_wayland_socket` uses for
        // mutter: it's the actual handshake clients use, so any
        // earlier signal would either be racier (process spawn) or
        // just as expensive to probe.
        wait_for_pipewire_socket(&runtime_str).await?;

        let wireplumber = Command::new("wireplumber")
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "wireplumber",
                source,
            })?;
        self.wireplumber = Some(wireplumber);

        // No bus-readiness signal poll for wireplumber: it's a
        // session-policy daemon that doesn't register a stable D-Bus
        // name we can probe, and its initialisation runs in parallel
        // with mutter's own startup. The downstream
        // `ScreenCast.CreateSession` retry loop in
        // `waydriver-capture-mutter::start_stream` is what actually
        // gates on wireplumber having joined the graph — putting a
        // pessimistic sleep here as well would add startup latency
        // without changing correctness.
        tracing::debug!(id = self.id, "PipeWire + WirePlumber started");

        // Step 5: mutter in headless Wayland mode (on its private D-Bus).
        // Unlike pipewire/wireplumber, mutter's stderr is inherited so its
        // diagnostics show up in the parent's output.
        let mutter = Command::new("mutter")
            .args([
                "--headless",
                "--wayland",
                "--no-x11",
                "--wayland-display",
                &self.wayland_display,
                "--virtual-monitor",
                resolution,
            ])
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "mutter",
                source,
            })?;
        self.mutter = Some(mutter);
        tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");

        // Step 6: Wait for the Wayland socket.
        wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
        tracing::debug!(id = self.id, "wayland socket ready");

        // Step 7: Connect to mutter's private D-Bus and create the RemoteDesktop session.
        let mutter_addr: zbus::address::Address = self
            .mutter_dbus_address
            .as_str()
            .try_into()
            .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
                addr: self.mutter_dbus_address.clone(),
                source,
            })?;
        let mutter_conn = zbus::connection::Builder::address(mutter_addr)
            .map_err(|source| MutterError::DbusConnect {
                stage: "build connection builder",
                source,
            })?
            .build()
            .await
            .map_err(|source| MutterError::DbusConnect {
                stage: "connect",
                source,
            })?;

        // Wait for mutter to register its D-Bus services (may take a moment after socket appears).
        // Up to 50 attempts, 100 ms apart; only the last failure is surfaced.
        let mut rd_reply = None;
        for i in 0..50 {
            match mutter_conn
                .call_method(
                    Some("org.gnome.Mutter.RemoteDesktop"),
                    "/org/gnome/Mutter/RemoteDesktop",
                    Some("org.gnome.Mutter.RemoteDesktop"),
                    "CreateSession",
                    &(),
                )
                .await
            {
                Ok(reply) => {
                    rd_reply = Some(reply);
                    break;
                }
                Err(e) if i < 49 => {
                    tracing::debug!(
                        id = self.id,
                        attempt = i,
                        "waiting for RemoteDesktop service: {e}"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }
                Err(e) => {
                    return Err(MutterError::RemoteDesktopCreate(e));
                }
            }
        }
        // The retry loop above either `break`s with `rd_reply = Some(_)`
        // or returns `Err(...)` from the final attempt — the `expect`
        // below is unreachable by construction.
        let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
        let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
            .body()
            .deserialize()
            .map_err(MutterError::RdSessionPathParse)?;
        // Intentionally do NOT call `RemoteDesktop.Session.Start` here.
        // Mutter only accepts `remote-desktop-session-id` on
        // `ScreenCast.CreateSession` when the RD session is not yet
        // started, so `waydriver-capture-mutter::start_stream` defers
        // the Start call until after it has created the linked
        // ScreenCast session.

        // Step 8: Read the RD session's `SessionId` property — it's the token
        // ScreenCast.CreateSession needs in `remote-desktop-session-id`
        // to link the two sessions. Without that link, mutter rejects
        // NotifyPointerMotionAbsolute with "No screen cast active".
        let rd_session_id_reply = mutter_conn
            .call_method(
                Some("org.gnome.Mutter.RemoteDesktop"),
                rd_session_path.as_str(),
                Some("org.freedesktop.DBus.Properties"),
                "Get",
                &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
            )
            .await
            .map_err(MutterError::SessionIdGet)?;
        // `Get` returns a variant; deserialize as `OwnedValue` to detach
        // the string from the reply's body before the reply is dropped.
        let rd_session_id_body = rd_session_id_reply.body();
        let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
            .deserialize()
            .map_err(MutterError::SessionIdVariantParse)?;
        let rd_session_id: String = rd_session_id_variant
            .try_into()
            .map_err(MutterError::SessionIdNotString)?;

        let rd_session_path = rd_session_path.to_string();
        // The session has only been *created* at this point (see the note
        // above about deferring `Start`), so the log says so.
        tracing::debug!(
            id = self.id,
            rd_session_path = %rd_session_path,
            rd_session_id = %rd_session_id,
            "RemoteDesktop session created"
        );

        // Step 9: Publish the shared state for the sibling backends.
        self.state = Some(Arc::new(MutterState {
            conn: mutter_conn,
            rd_session_path,
            rd_session_id,
            rd_started: Arc::new(Mutex::new(false)),
            runtime_dir: self.runtime_dir.clone(),
            active_stream_path: Arc::new(Mutex::new(None)),
        }));

        Ok(())
    }
}

#[async_trait]
impl CompositorRuntime for MutterCompositor {
    /// Start the compositor stack; see `start_inner` for the individual
    /// steps. `resolution` is a `<width>x<height>` string, or `None` for
    /// `DEFAULT_RESOLUTION`.
    async fn start(&mut self, resolution: Option<&str>) -> Result<()> {
        // Body uses the crate-local typed `MutterError`. The `?` at the
        // end of `self.start_inner(...).await?` runs the
        // `From<MutterError> for waydriver::Error` impl in `error.rs`,
        // which is the single boundary at which the typed enum becomes
        // the workspace's shared `waydriver::Error`.
        Ok(self.start_inner(resolution).await?)
    }

    /// Best-effort teardown: stop the RemoteDesktop session, drop the
    /// shared state, kill and reap mutter / wireplumber / pipewire, send
    /// SIGTERM to the private `dbus-daemon`, and remove the runtime dir.
    ///
    /// Every individual failure is ignored, so this always returns
    /// `Ok(())`. Calling it again afterwards finds nothing left to stop.
    async fn stop(&mut self) -> Result<()> {
        tracing::info!(id = self.id, "stopping mutter compositor");

        // Stop RemoteDesktop session if still reachable. We could
        // touch the private fields directly here (same crate), but
        // routing through the public accessors keeps the contract
        // visible and means a future change to the field layout
        // doesn't need to update this site.
        if let Some(state) = &self.state {
            let _ = state
                .conn()
                .call_method(
                    Some("org.gnome.Mutter.RemoteDesktop"),
                    state.rd_session_path(),
                    Some("org.gnome.Mutter.RemoteDesktop.Session"),
                    "Stop",
                    &(),
                )
                .await;
        }

        // Drop our strong ref to the shared state. If callers haven't dropped
        // theirs (the input/capture trait objects), their Arc still points at
        // the D-Bus connection we're about to tear down below — any method
        // call on them after this will fail with "connection closed".
        self.state = None;

        if let Some(mut mutter) = self.mutter.take() {
            let _ = mutter.kill().await;
            let _ = mutter.wait().await;
        }
        if let Some(mut wireplumber) = self.wireplumber.take() {
            let _ = wireplumber.kill().await;
            let _ = wireplumber.wait().await;
        }
        if let Some(mut pipewire) = self.pipewire.take() {
            let _ = pipewire.kill().await;
            let _ = pipewire.wait().await;
        }

        if let Some(pid) = self.mutter_dbus_pid.take() {
            // `kill(2)` gives special meaning to non-positive PIDs: 0 targets
            // the caller's whole process group and negative values target a
            // process group or every process. A plain `pid as i32` would map
            // 0 and anything above `i32::MAX` onto exactly those values, so
            // convert with a range check and only signal a real PID.
            match libc::pid_t::try_from(pid) {
                Ok(pid) if pid > 0 => {
                    // SAFETY: `kill` only takes plain integers and does not
                    // touch any memory of ours; the result is deliberately
                    // ignored because teardown is best-effort.
                    unsafe {
                        libc::kill(pid, libc::SIGTERM);
                    }
                }
                _ => tracing::warn!(
                    id = self.id,
                    pid,
                    "refusing to signal dbus-daemon: PID is not a valid single-process target"
                ),
            }
        }

        let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;

        tracing::debug!(id = self.id, "mutter compositor stopped");
        Ok(())
    }

    /// Short session id generated in `new()`.
    fn id(&self) -> &str {
        &self.id
    }

    /// Wayland display name this compositor serves.
    fn wayland_display(&self) -> &str {
        &self.wayland_display
    }

    /// Per-session runtime directory.
    fn runtime_dir(&self) -> &Path {
        &self.runtime_dir
    }
}

impl Drop for MutterCompositor {
    /// Best-effort cleanup for a compositor that is dropped without a
    /// prior `stop()`: drops the shared state, requests a kill of each
    /// remaining child, SIGKILLs the private `dbus-daemon`, and removes
    /// the runtime dir. All errors are ignored.
    fn drop(&mut self) {
        // Best-effort cleanup when dropped without calling stop().
        // Can't use async here, so send SIGKILL synchronously.
        self.state = None;

        if let Some(ref mut child) = self.mutter {
            let _ = child.start_kill();
        }
        if let Some(ref mut child) = self.wireplumber {
            let _ = child.start_kill();
        }
        if let Some(ref mut child) = self.pipewire {
            let _ = child.start_kill();
        }
        if let Some(pid) = self.mutter_dbus_pid.take() {
            // `kill(2)` treats 0 as "my whole process group" and negative
            // values as a process group / every process. `pid as i32` would
            // turn 0 or anything above `i32::MAX` into one of those, so only
            // signal after a checked conversion to a strictly positive PID.
            if let Ok(pid) = libc::pid_t::try_from(pid) {
                if pid > 0 {
                    // SAFETY: `kill` only takes plain integers and does not
                    // touch any memory of ours; the result is ignored
                    // because there is nothing useful to do with it in drop.
                    unsafe {
                        libc::kill(pid, libc::SIGKILL);
                    }
                }
            }
        }
        let _ = std::fs::remove_dir_all(&self.runtime_dir);
    }
}

// ── Helpers ─────────────────────────────────────────────────────────────────

/// Pull the bus address out of `dbus-launch --sh-syntax` output.
///
/// Returns the text between the quotes of the first line shaped like
/// `DBUS_SESSION_BUS_ADDRESS='<addr>';`. Lines that carry the prefix but
/// not the `';` terminator are skipped. When no line qualifies the result
/// is `MutterError::DbusOutputMissingField`.
fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
    let quoted_value = |line: &str| -> Option<String> {
        let after_prefix = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='")?;
        let addr = after_prefix.strip_suffix("';")?;
        Some(addr.to_owned())
    };

    match output.lines().find_map(quoted_value) {
        Some(addr) => Ok(addr),
        None => Err(MutterError::DbusOutputMissingField {
            field: "DBUS_SESSION_BUS_ADDRESS",
        }),
    }
}

/// Pull the daemon PID out of `dbus-launch --sh-syntax` output.
///
/// Looks at the first line starting with `DBUS_SESSION_BUS_PID=`, strips
/// any trailing `;` and surrounding whitespace, and parses the rest.
///
/// # Errors
///
/// - `MutterError::DbusOutputMissingField` when no such line exists.
/// - `MutterError::DbusPidParse` when the value is not a decimal `u32`, or
///   when it is `0`. Zero is never a real daemon PID, and passing it to
///   `kill(2)` would signal the caller's entire process group, so it is
///   rejected here instead of being handed to the teardown code.
fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
    let Some(raw) = output
        .lines()
        .find_map(|line| line.strip_prefix("DBUS_SESSION_BUS_PID="))
    else {
        return Err(MutterError::DbusOutputMissingField {
            field: "DBUS_SESSION_BUS_PID",
        });
    };

    // Parsing as `NonZeroU32` keeps the error type (`ParseIntError`) the
    // same as a plain `u32` parse while also refusing `0`.
    raw.trim_end_matches(';')
        .trim()
        .parse::<std::num::NonZeroU32>()
        .map(std::num::NonZeroU32::get)
        .map_err(MutterError::DbusPidParse)
}

/// Validate a `<width>x<height>` resolution string.
///
/// The string is split at the first `x`; each side must parse as a `u32`
/// greater than zero. On success the `(width, height)` pair is returned;
/// any other input yields `MutterError::ResolutionInvalid` carrying the
/// original string.
fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
    // One side of the `x`: a strictly positive `u32`, or nothing.
    fn dimension(text: &str) -> Option<u32> {
        match text.parse::<u32>() {
            Ok(pixels) if pixels > 0 => Some(pixels),
            _ => None,
        }
    }

    let size = s
        .split_once('x')
        .and_then(|(width, height)| Some((dimension(width)?, dimension(height)?)));

    match size {
        Some(pair) => Ok(pair),
        None => Err(MutterError::ResolutionInvalid {
            value: s.to_string(),
        }),
    }
}

/// Poll until `<runtime_dir>/<display>` exists.
///
/// Checks up to 50 times, sleeping 100 ms after every miss, and returns
/// `MutterError::WaylandSocketTimeout` (with the path that was probed)
/// if the file never shows up.
async fn wait_for_wayland_socket(
    runtime_dir: &str,
    display: &str,
) -> std::result::Result<(), MutterError> {
    const ATTEMPTS: u32 = 50;
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);

    let socket_path = Path::new(runtime_dir).join(display);

    let mut attempts_left = ATTEMPTS;
    while attempts_left > 0 {
        if socket_path.exists() {
            return Ok(());
        }
        tokio::time::sleep(POLL_INTERVAL).await;
        attempts_left -= 1;
    }

    let socket = socket_path.display().to_string();
    Err(MutterError::WaylandSocketTimeout { socket })
}

/// Poll until `<runtime_dir>/pipewire-0` exists.
///
/// PipeWire creates that socket file once it is ready to accept client
/// connections, so waiting for it stands in for the unconditional
/// `sleep(1s)` that used to follow the pipewire spawn — the same
/// readiness model as [`wait_for_wayland_socket`]. Checks up to 50 times,
/// sleeping 100 ms after every miss, then gives up with
/// `MutterError::PipewireSocketTimeout`.
async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
    const ATTEMPTS: u32 = 50;
    const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);

    let socket_path = Path::new(runtime_dir).join("pipewire-0");

    let mut attempts_left = ATTEMPTS;
    while attempts_left > 0 {
        if socket_path.exists() {
            return Ok(());
        }
        tokio::time::sleep(POLL_INTERVAL).await;
        attempts_left -= 1;
    }

    let socket = socket_path.display().to_string();
    Err(MutterError::PipewireSocketTimeout { socket })
}

// Unit tests for the pure helpers (dbus-launch output parsing, resolution
// validation, socket polling) and for the no-I/O parts of
// `MutterCompositor` (`new`, `default`, `state`). Nothing here spawns
// mutter, pipewire or a D-Bus daemon.
#[cfg(test)]
mod tests {
    use super::*;

    /// The address is returned without the surrounding quotes and `;`.
    #[test]
    fn test_parse_dbus_address_valid() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
        let addr = parse_dbus_address(output).unwrap();
        assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
    }

    /// Output without an address line is an error.
    #[test]
    fn test_parse_dbus_address_missing() {
        let output = "DBUS_SESSION_BUS_PID=12345;\n";
        assert!(parse_dbus_address(output).is_err());
    }

    /// The PID is parsed with the trailing `;` stripped.
    #[test]
    fn test_parse_dbus_pid_valid() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
        let pid = parse_dbus_pid(output).unwrap();
        assert_eq!(pid, 12345);
    }

    /// Output without a PID line is an error.
    #[test]
    fn test_parse_dbus_pid_missing() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    /// A PID line whose value is not a number is an error.
    #[test]
    fn test_parse_dbus_pid_invalid() {
        let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    /// A pre-existing file at the socket path makes the wait return at once.
    /// A regular file is enough because the helper only checks existence.
    #[tokio::test]
    async fn test_wait_for_socket_found() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let display = "wayland-test-99";
        std::fs::File::create(dir.path().join(display)).unwrap();
        wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap();
    }

    /// Same as above for the `pipewire-0` socket.
    #[tokio::test]
    async fn test_wait_for_pipewire_socket_found() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
        wait_for_pipewire_socket(&runtime_dir).await.unwrap();
    }

    /// With no socket file the wait runs its full polling budget and then
    /// reports the typed timeout variant.
    #[tokio::test]
    async fn test_wait_for_pipewire_socket_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
        assert!(
            matches!(err, MutterError::PipewireSocketTimeout { .. }),
            "expected PipewireSocketTimeout, got: {err}"
        );
        // Public mapping: same Timeout bucket as the wayland one,
        // so workspace callers matching `Error::Timeout(_)` (e.g.
        // the e2e tests) keep working.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    /// Wayland counterpart of the pipewire timeout test.
    #[tokio::test]
    async fn test_wait_for_socket_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let display = "wayland-nonexistent-0";
        let err = wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap_err();
        assert!(
            matches!(err, MutterError::WaylandSocketTimeout { .. }),
            "expected WaylandSocketTimeout, got: {err}"
        );
        // And confirm the From<MutterError> -> waydriver::Error mapping
        // still produces the public Timeout variant — workspace callers
        // (notably the e2e tests) match on it.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    /// Two consecutive `new()` calls yield different session ids.
    #[test]
    fn test_new_generates_unique_ids() {
        let a = MutterCompositor::new();
        let b = MutterCompositor::new();
        assert_ne!(a.id(), b.id());
    }

    /// The display name embeds the session id.
    #[test]
    fn test_new_wayland_display_contains_id() {
        let c = MutterCompositor::new();
        assert!(
            c.wayland_display().contains(c.id()),
            "display '{}' should contain id '{}'",
            c.wayland_display(),
            c.id()
        );
    }

    /// The runtime dir path embeds the session id.
    #[test]
    fn test_new_runtime_dir_contains_id() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(
            dir_str.contains(c.id()),
            "runtime_dir '{}' should contain id '{}'",
            dir_str,
            c.id()
        );
    }

    /// Regression: session runtime dirs must be flat siblings under one root,
    /// never nested inside each other. `waydriver::capture` repoints the
    /// process-wide `XDG_RUNTIME_DIR` at the live session's dir after a
    /// screenshot/recording; if `new()` re-read that mutated value, each
    /// session would nest one level deeper and eventually overflow the
    /// AF_UNIX `sun_path` limit, wedging pipewire socket creation. See
    /// `HOST_RUNTIME_ROOT`.
    #[test]
    fn test_session_runtime_dirs_are_siblings_not_nested() {
        // The first `new()` forces `HOST_RUNTIME_ROOT` to be initialised (if
        // no other test got there first) before the env is touched below.
        let a = MutterCompositor::new();
        let dir_a = a.runtime_dir().to_path_buf();

        // Simulate what a screenshot/recording does: point XDG_RUNTIME_DIR at
        // the live session's runtime dir and leave it there.
        // NOTE: the variable is not restored afterwards, so the change is
        // visible to the rest of the test process.
        unsafe {
            std::env::set_var("XDG_RUNTIME_DIR", &dir_a);
        }

        let b = MutterCompositor::new();
        let dir_b = b.runtime_dir().to_path_buf();

        assert_eq!(
            dir_a.parent(),
            dir_b.parent(),
            "session dirs must share a parent (siblings), got a={dir_a:?} b={dir_b:?}"
        );
        assert!(
            !dir_b.starts_with(&dir_a),
            "session B nested inside session A: {dir_b:?}"
        );
    }

    /// The display name starts with the fixed `wayland-wd-` prefix.
    #[test]
    fn test_new_wayland_display_prefix() {
        let c = MutterCompositor::new();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
    }

    /// The runtime dir path contains the fixed `wd-session-` prefix.
    #[test]
    fn test_new_runtime_dir_contains_session_prefix() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(dir_str.contains("wd-session-"));
    }

    #[test]
    fn test_state_returns_none_before_start() {
        // `state()` previously panicked when called outside the started
        // window. The current contract returns `None` so callers can
        // detect the lifecycle without trapping a panic.
        let c = MutterCompositor::new();
        assert!(c.state().is_none());
    }

    /// Well-formed `<width>x<height>` strings parse to the numeric pair.
    #[test]
    fn test_parse_resolution_accepts_hd() {
        assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
        assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
    }

    /// Missing parts, zero dimensions, extra components, non-digits, signs
    /// and embedded whitespace are all rejected.
    #[test]
    fn test_parse_resolution_rejects_garbage() {
        for bad in [
            "",
            "1920",
            "1920x",
            "x1080",
            "0x0",
            "1920x0",
            "0x1080",
            "1920x1080x1",
            "abcxdef",
            "-1x1080",
            "1920 x 1080",
        ] {
            assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
        }
    }

    /// `Default::default()` produces the same naming scheme as `new()`.
    #[test]
    fn test_default_same_structure_as_new() {
        let c = MutterCompositor::default();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
        assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
    }
}