Skip to main content

waydriver_compositor_mutter/
lib.rs

1//! Mutter implementation of [`waydriver::CompositorRuntime`].
2//!
3//! Owns the private-bus `dbus-daemon`, the `pipewire` + `wireplumber` pair,
4//! and a headless `mutter --wayland` instance. After [`MutterCompositor::start`]
5//! returns, [`MutterCompositor::state`] exposes an `Arc<MutterState>` that
6//! sibling backends (`waydriver-input-mutter`, `waydriver-capture-mutter`) use
7//! to talk to the same mutter D-Bus session.
8//!
9//! ## Shared-state invariant
10//!
11//! While any `Arc<MutterState>` exists, the mutter child processes and the
12//! private D-Bus connection MUST remain alive. [`waydriver::Session::kill`]
13//! enforces this by dropping input and capture trait objects before calling
14//! `compositor.stop().await`.
15
16mod error;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::process::Stdio;
21use std::sync::{Arc, LazyLock, Mutex};
22
23use async_trait::async_trait;
24use tokio::process::{Child, Command};
25use zbus::zvariant::OwnedValue;
26
27use waydriver::gsettings::{self, GSettingEntry, GSettingsConfig};
28use waydriver::{CompositorRuntime, Result};
29
30use crate::error::MutterError;
31
32/// Default virtual-monitor geometry passed to mutter when the caller doesn't
33/// override it. Matches mutter's own implicit default.
34const DEFAULT_RESOLUTION: &str = "1024x768";
35
36/// Default logical-monitor scale: 1:1, i.e. `resolution` pixels are also the
37/// logical (application) size. Any other value drives the HiDPI path in
38/// [`apply_scale`].
39const DEFAULT_SCALE: f64 = 1.0;
40
41/// GVariant-text value seeded into `org.gnome.mutter experimental-features`
42/// when GSettings isolation is on. `scale-monitor-framebuffer` switches the
43/// native headless backend to logical layout mode, which is what makes
44/// fractional scales (1.5, 1.75, …) appear in a mode's `supported-scales` and
45/// be accepted by `ApplyMonitorsConfig`. Harmless at integer/1.0 scales.
46const MUTTER_FRACTIONAL_SCALING: &str = "['scale-monitor-framebuffer']";
47
48/// Accepted scale range. Below 0.5 the UI is unusably small; above 4.0 mutter
49/// won't offer the scale for any virtual-monitor mode we'd create. Validated
50/// up-front so a typo fails before we spawn any subprocess.
51const MIN_SCALE: f64 = 0.5;
52const MAX_SCALE: f64 = 4.0;
53
54/// How far a requested scale may sit from the nearest mutter-supported scale
55/// before we log a warning about snapping to it. Mutter only accepts scales it
56/// lists in a mode's `supported-scales`, so an exact arbitrary value (e.g.
57/// 1.66) may be nudged to the closest legal step.
58const SCALE_SNAP_TOLERANCE: f64 = 0.01;
59
60// ── DisplayConfig D-Bus shapes ───────────────────────────────────────────────
61//
62// Type aliases mirroring the `org.gnome.Mutter.DisplayConfig` wire types so
63// `body().deserialize::<CurrentState>()` validates the reply against the exact
64// signature mutter sends. The `a{sv}` property dicts are kept as
65// `HashMap<String, OwnedValue>` (signature `a{sv}`) and ignored — we only need
66// the connector, mode id, and supported-scales list.
67
68/// `a{sv}` — a D-Bus property dict.
69type DbusProps = HashMap<String, OwnedValue>;
70/// `(siiddada{sv})` — one monitor mode: id, width, height, refresh, preferred
71/// scale, supported scales, properties.
72type MonitorMode = (String, i32, i32, f64, f64, Vec<f64>, DbusProps);
73/// `(ssss)` — connector, vendor, product, serial.
74type MonitorSpec = (String, String, String, String);
75/// `((ssss)a(siiddada{sv})a{sv})` — one physical monitor: spec, modes, props.
76type PhysicalMonitor = (MonitorSpec, Vec<MonitorMode>, DbusProps);
77/// `(iiduba(ssss)a{sv})` — one logical monitor in the current layout.
78type LogicalMonitor = (i32, i32, f64, u32, bool, Vec<MonitorSpec>, DbusProps);
79/// Return tuple of `GetCurrentState`: `(serial, monitors, logical, props)`.
80type CurrentState = (u32, Vec<PhysicalMonitor>, Vec<LogicalMonitor>, DbusProps);
81
82/// `(ssa{sv})` — one monitor assignment in an `ApplyMonitorsConfig` request:
83/// connector, mode id, properties.
84type MonitorAssignment = (String, String, DbusProps);
85/// `(iiduba(ssa{sv}))` — one logical monitor to apply: x, y, scale, transform,
86/// primary, assigned monitors.
87type LogicalMonitorConfig = (i32, i32, f64, u32, bool, Vec<MonitorAssignment>);
88
89/// Shared mutter-backend state consumed by `waydriver-input-mutter` and
90/// `waydriver-capture-mutter`.
91///
92/// **Invariant:** while any `Arc<MutterState>` exists, the underlying D-Bus
93/// connection and the mutter child process must remain alive. See the
94/// module docs for details.
95///
96/// Fields are private — all access goes through the accessor methods
97/// below. Sibling crates (`waydriver-input-mutter`,
98/// `waydriver-capture-mutter`) that previously read fields directly
99/// now call `state.conn()`, `state.rd_session_path()`, etc. The
100/// shape of the underlying storage (e.g. how `active_stream_path` is
101/// guarded) is therefore an implementation detail that can change
102/// without breaking those callers — the contract lives entirely in
103/// the method signatures.
104pub struct MutterState {
105    conn: zbus::Connection,
106    rd_session_path: String,
107    rd_session_id: String,
108    rd_started: Arc<Mutex<bool>>,
109    runtime_dir: PathBuf,
110    active_stream_path: Arc<Mutex<Option<String>>>,
111}
112
113impl MutterState {
114    /// Persistent connection to mutter's private D-Bus.
115    ///
116    /// Both sibling backends (`waydriver-input-mutter`,
117    /// `waydriver-capture-mutter`) issue all their RemoteDesktop and
118    /// ScreenCast method calls through this connection.
119    pub fn conn(&self) -> &zbus::Connection {
120        &self.conn
121    }
122
123    /// RemoteDesktop session object path. Used by
124    /// `waydriver-input-mutter` as the `path` argument on every
125    /// pointer / keyboard `Notify*` D-Bus call.
126    pub fn rd_session_path(&self) -> &str {
127        &self.rd_session_path
128    }
129
130    /// RemoteDesktop session id, read from the `SessionId` property on
131    /// the RD session. `waydriver-capture-mutter` passes this as the
132    /// `remote-desktop-session-id` option to
133    /// `ScreenCast.CreateSession` so mutter links the two; the link is
134    /// required for `NotifyPointerMotionAbsolute` to be accepted.
135    pub fn rd_session_id(&self) -> &str {
136        &self.rd_session_id
137    }
138
139    /// Per-session `XDG_RUNTIME_DIR`. `waydriver-capture-mutter` joins
140    /// this with `pipewire-0` to locate the PipeWire socket.
141    pub fn runtime_dir(&self) -> &Path {
142        &self.runtime_dir
143    }
144
145    /// Lock the "RD-started" flag.
146    ///
147    /// Acquires the underlying mutex and returns the guard so the
148    /// caller can perform a check-and-set under one critical section
149    /// (the capture backend defers `RD.Session.Start` until the first
150    /// linked `ScreenCast.CreateSession` succeeds — that's a load,
151    /// some D-Bus work, and a store; splitting the read and write
152    /// would race). `Error::Process` if the mutex is poisoned.
153    pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
154        self.rd_started
155            .lock()
156            .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
157    }
158
159    /// Lock the active ScreenCast Stream object path.
160    ///
161    /// Set by `waydriver-capture-mutter` in `start_stream`, cleared in
162    /// `stop_stream`. `waydriver-input-mutter` reads it to route
163    /// `NotifyPointerMotionAbsolute` at the correct monitor. `None`
164    /// inside the guard means no stream is open — absolute pointer
165    /// motion will error.
166    pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
167        self.active_stream_path
168            .lock()
169            .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
170    }
171}
172
173/// Headless mutter instance.
174pub struct MutterCompositor {
175    id: String,
176    wayland_display: String,
177    runtime_dir: PathBuf,
178    mutter_dbus_address: String,
179    mutter_dbus_pid: Option<u32>,
180    mutter: Option<Child>,
181    pipewire: Option<Child>,
182    wireplumber: Option<Child>,
183    state: Option<Arc<MutterState>>,
184    gsettings: GSettingsConfig,
185}
186
187/// The host runtime root under which every session's `wd-session-<id>`
188/// directory is created. Snapshotted once, lazily, on the first
189/// `MutterCompositor::new()` call.
190///
191/// This is deliberately read **once** and cached, rather than re-read from
192/// `XDG_RUNTIME_DIR` per session. `waydriver`'s screenshot and video pipelines
193/// (`waydriver::capture`) mutate the parent process's `XDG_RUNTIME_DIR` to
194/// point `pipewiresrc` at the *live* session's pipewire socket, and never
195/// restore it. If `new()` re-read the live env each time, session N+1's
196/// runtime dir would be created **inside** session N's dir
197/// (`…/wd-session-A/wd-session-B/…`), nesting one level deeper per session.
198/// After ~4 levels the `<dir>/pipewire-0` path exceeds the ~107-byte AF_UNIX
199/// `sun_path` limit, pipewire can no longer bind its socket, and every
200/// subsequent `start_session` fails with a "timeout: pipewire socket" error
201/// until the server is restarted (which resets the process env). Snapshotting
202/// the root keeps each session dir a flat sibling under the original
203/// `XDG_RUNTIME_DIR`, independent of how many sessions preceded it.
204///
205/// The first `new()` runs before any session exists, so the env is still the
206/// pristine value set by the launcher (e.g. the Docker entrypoint) — capturing
207/// it then is safe.
208static HOST_RUNTIME_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
209    let root = std::env::var("XDG_RUNTIME_DIR")
210        .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
211    PathBuf::from(root)
212});
213
214impl MutterCompositor {
215    /// Construct but do not start. Generates the session id and computes
216    /// where the Wayland socket and runtime dir will live. No I/O.
217    pub fn new() -> Self {
218        let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
219        let wayland_display = format!("wayland-wd-{}", id);
220
221        let runtime_dir = HOST_RUNTIME_ROOT.join(format!("wd-session-{}", id));
222
223        Self {
224            id,
225            wayland_display,
226            runtime_dir,
227            mutter_dbus_address: String::new(),
228            mutter_dbus_pid: None,
229            mutter: None,
230            pipewire: None,
231            wireplumber: None,
232            state: None,
233            gsettings: GSettingsConfig::default(),
234        }
235    }
236
237    /// Set the per-session GSettings isolation config (see
238    /// [`waydriver::gsettings`]). Defaults to isolated with no seeded entries.
239    /// When isolated, [`start`](Self::start) writes a private keyfile (seeded
240    /// with `org.gnome.mutter experimental-features` plus `config.initial`)
241    /// and points mutter at it, so fractional scales work and the host's dconf
242    /// is neither read nor written. Pass `isolated: false` to run mutter
243    /// against the host's GSettings instead.
244    pub fn with_gsettings(mut self, config: GSettingsConfig) -> Self {
245        self.gsettings = config;
246        self
247    }
248
249    /// Returns the shared `Arc<MutterState>` for passing to sibling
250    /// backends, or `None` when called outside the started window.
251    ///
252    /// `None` is returned when:
253    /// - `start()` has not yet completed (or returned an error), or
254    /// - `stop()` has been called and dropped the state.
255    ///
256    /// Callers that have just awaited `start()?` know the state is
257    /// present — `expect()` or `?`-with-typed-error is appropriate
258    /// there. Returning `Option` instead of panicking keeps the API
259    /// honest about the lifecycle and lets callers detect "stopped"
260    /// without first matching on a panic.
261    pub fn state(&self) -> Option<Arc<MutterState>> {
262        self.state.clone()
263    }
264}
265
266impl Default for MutterCompositor {
267    fn default() -> Self {
268        Self::new()
269    }
270}
271
272impl MutterCompositor {
273    /// Typed-error implementation of `start`. The trait method calls
274    /// this and converts the result via `From<MutterError>`.
275    ///
276    /// Steps (each fails with a specific `MutterError` variant):
277    /// 1. validate resolution + scale,
278    /// 2. ensure the session runtime dir exists,
279    /// 3. spawn a private `dbus-daemon` and parse its address + PID,
280    /// 4. spawn `pipewire` + `wireplumber` on that bus,
281    /// 5. spawn headless `mutter --wayland`,
282    /// 6. wait for the Wayland socket,
283    /// 7. open a zbus connection, retry-create the RemoteDesktop session,
284    /// 8. read its `SessionId` property,
285    /// 9. apply a non-default logical-monitor scale via DisplayConfig,
286    /// 10. publish the `Arc<MutterState>` for sibling backends.
287    async fn start_inner(
288        &mut self,
289        resolution: Option<&str>,
290        scale: Option<f64>,
291    ) -> std::result::Result<(), MutterError> {
292        let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
293        // Validate before we start spawning subprocesses — mutter silently
294        // ignores bad --virtual-monitor values and falls back to its own
295        // default, which would surprise the caller.
296        parse_resolution(resolution)?;
297        let scale = scale.unwrap_or(DEFAULT_SCALE);
298        // Fail fast on a nonsense scale too, for the same reason — the
299        // DisplayConfig apply that consumes it doesn't run until mutter is up.
300        validate_scale(scale)?;
301
302        tracing::info!(
303            id = self.id,
304            resolution,
305            scale,
306            isolated = self.gsettings.isolated,
307            "starting mutter compositor"
308        );
309
310        tokio::fs::create_dir_all(&self.runtime_dir).await?;
311        // `runtime_dir` is built in `new()` from a UTF-8 String
312        // (XDG_RUNTIME_DIR or `/run/user/<uid>`) joined with a UTF-8
313        // ASCII session id, so the path is guaranteed valid UTF-8.
314        // `expect` documents that invariant rather than re-deriving
315        // it via the `to_str()` `Option`.
316        let runtime_str = self
317            .runtime_dir
318            .to_str()
319            .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
320            .to_string();
321
322        // GSettings isolation: when on, write the session's private keyfile
323        // (read by both mutter and the app — see `waydriver::gsettings`) and
324        // compute the env that points mutter at it. The keyfile is seeded with
325        // the fractional-scaling experimental feature so a non-integer `scale`
326        // is actually advertised by mutter, then any caller-supplied entries
327        // are appended (last-wins, so callers can override). When off, mutter
328        // reads the host's GSettings and `config_env` stays empty.
329        let config_env: Vec<(&str, String)> = if self.gsettings.isolated {
330            let mut entries = vec![GSettingEntry::new(
331                "org.gnome.mutter",
332                "experimental-features",
333                MUTTER_FRACTIONAL_SCALING,
334            )];
335            entries.extend(self.gsettings.initial.iter().cloned());
336            gsettings::write_keyfile(&self.runtime_dir, &entries)?;
337            let config_dir = gsettings::config_dir(&self.runtime_dir)
338                .to_str()
339                .expect("invariant: config_dir is runtime_dir (UTF-8) + ASCII suffix")
340                .to_string();
341            vec![
342                ("XDG_CONFIG_HOME", config_dir),
343                ("GSETTINGS_BACKEND", gsettings::KEYFILE_BACKEND.to_string()),
344            ]
345        } else {
346            Vec::new()
347        };
348
349        // Step 1: Private D-Bus for mutter (so its ScreenCast API doesn't conflict with host).
350        let dbus_output = Command::new("dbus-launch")
351            .arg("--sh-syntax")
352            .output()
353            .await?;
354        if !dbus_output.status.success() {
355            return Err(MutterError::DbusLaunchFailed(
356                String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
357            ));
358        }
359        let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
360        self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
361        self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
362        tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
363
364        // Step 2: PipeWire + WirePlumber (for screenshots via ScreenCast).
365        //
366        // `env_remove("PIPEWIRE_REMOTE")` is load-bearing: `waydriver`'s
367        // `grab_png_sync` mutates the parent's process env to point
368        // `pipewiresrc` at the live session's pipewire socket. After a
369        // session stops, that socket is gone but the env var lingers in
370        // the parent. Without scrubbing it here, a freshly spawned
371        // `pipewire`/`wireplumber`/`mutter` for the next session would
372        // inherit the stale value and try to connect to the previous
373        // session's dead socket — wireplumber/mutter prefer
374        // `PIPEWIRE_REMOTE` over `XDG_RUNTIME_DIR/pipewire-0`, so the
375        // explicit `XDG_RUNTIME_DIR` override below isn't enough.
376        // Symptom: `ScreenCast.Start` fails with "Couldn't connect
377        // pipewire context" on every session after the first.
378        let pipewire = Command::new("pipewire")
379            .env_remove("PIPEWIRE_REMOTE")
380            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
381            .env("XDG_RUNTIME_DIR", &runtime_str)
382            .stdout(Stdio::null())
383            .stderr(Stdio::null())
384            .spawn()
385            .map_err(|source| MutterError::Spawn {
386                process: "pipewire",
387                source,
388            })?;
389        self.pipewire = Some(pipewire);
390
391        // Wait for pipewire's socket to appear before launching
392        // wireplumber. Polling for the socket file is the same
393        // readiness signal `wait_for_wayland_socket` uses for
394        // mutter: it's the actual handshake clients use, so any
395        // earlier signal would either be racier (process spawn) or
396        // just as expensive to probe.
397        wait_for_pipewire_socket(&runtime_str).await?;
398
399        let wireplumber = Command::new("wireplumber")
400            .env_remove("PIPEWIRE_REMOTE")
401            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
402            .env("XDG_RUNTIME_DIR", &runtime_str)
403            .stdout(Stdio::null())
404            .stderr(Stdio::null())
405            .spawn()
406            .map_err(|source| MutterError::Spawn {
407                process: "wireplumber",
408                source,
409            })?;
410        self.wireplumber = Some(wireplumber);
411
412        // No bus-readiness signal poll for wireplumber: it's a
413        // session-policy daemon that doesn't register a stable D-Bus
414        // name we can probe, and its initialisation runs in parallel
415        // with mutter's own startup. The downstream
416        // `ScreenCast.CreateSession` retry loop in
417        // `waydriver-capture-mutter::start_stream` is what actually
418        // gates on wireplumber having joined the graph — putting a
419        // pessimistic sleep here as well would add startup latency
420        // without changing correctness.
421        tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
422
423        // Step 3: mutter in headless Wayland mode (on its private D-Bus).
424        let mutter = Command::new("mutter")
425            .args([
426                "--headless",
427                "--wayland",
428                "--no-x11",
429                "--wayland-display",
430                &self.wayland_display,
431                "--virtual-monitor",
432                resolution,
433            ])
434            .env_remove("PIPEWIRE_REMOTE")
435            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
436            .env("XDG_RUNTIME_DIR", &runtime_str)
437            // Empty when isolation is off; otherwise points mutter at the
438            // per-session keyfile GSettings store written above.
439            .envs(config_env.iter().map(|(k, v)| (*k, v.as_str())))
440            .stdout(Stdio::null())
441            .stderr(Stdio::inherit())
442            .spawn()
443            .map_err(|source| MutterError::Spawn {
444                process: "mutter",
445                source,
446            })?;
447        self.mutter = Some(mutter);
448        tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
449
450        // Step 4: Wait for the Wayland socket.
451        wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
452        tracing::debug!(id = self.id, "wayland socket ready");
453
454        // Step 5: Connect to mutter's private D-Bus and start RemoteDesktop session.
455        let mutter_addr: zbus::address::Address = self
456            .mutter_dbus_address
457            .as_str()
458            .try_into()
459            .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
460                addr: self.mutter_dbus_address.clone(),
461                source,
462            })?;
463        let mutter_conn = zbus::connection::Builder::address(mutter_addr)
464            .map_err(|source| MutterError::DbusConnect {
465                stage: "build connection builder",
466                source,
467            })?
468            .build()
469            .await
470            .map_err(|source| MutterError::DbusConnect {
471                stage: "connect",
472                source,
473            })?;
474
475        // Wait for mutter to register its D-Bus services (may take a moment after socket appears)
476        let mut rd_reply = None;
477        for i in 0..50 {
478            match mutter_conn
479                .call_method(
480                    Some("org.gnome.Mutter.RemoteDesktop"),
481                    "/org/gnome/Mutter/RemoteDesktop",
482                    Some("org.gnome.Mutter.RemoteDesktop"),
483                    "CreateSession",
484                    &(),
485                )
486                .await
487            {
488                Ok(reply) => {
489                    rd_reply = Some(reply);
490                    break;
491                }
492                Err(e) if i < 49 => {
493                    tracing::debug!(
494                        id = self.id,
495                        attempt = i,
496                        "waiting for RemoteDesktop service: {e}"
497                    );
498                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
499                }
500                Err(e) => {
501                    return Err(MutterError::RemoteDesktopCreate(e));
502                }
503            }
504        }
505        // The retry loop above either `break`s with `rd_reply = Some(_)`
506        // or returns `Err(...)` from the final attempt — `unwrap` here
507        // is unreachable by construction.
508        let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
509        let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
510            .body()
511            .deserialize()
512            .map_err(MutterError::RdSessionPathParse)?;
513        // Intentionally do NOT call `RemoteDesktop.Session.Start` here.
514        // Mutter only accepts `remote-desktop-session-id` on
515        // `ScreenCast.CreateSession` when the RD session is not yet
516        // started, so `waydriver-capture-mutter::start_stream` defers
517        // the Start call until after it has created the linked
518        // ScreenCast session.
519        // Read the RD session's `SessionId` property — it's the token
520        // ScreenCast.CreateSession needs in `remote-desktop-session-id`
521        // to link the two sessions. Without that link, mutter rejects
522        // NotifyPointerMotionAbsolute with "No screen cast active".
523        let rd_session_id_reply = mutter_conn
524            .call_method(
525                Some("org.gnome.Mutter.RemoteDesktop"),
526                rd_session_path.as_str(),
527                Some("org.freedesktop.DBus.Properties"),
528                "Get",
529                &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
530            )
531            .await
532            .map_err(MutterError::SessionIdGet)?;
533        // `Get` returns a variant; deserialize as `OwnedValue` to detach
534        // the string from the reply's body before the reply is dropped.
535        let rd_session_id_body = rd_session_id_reply.body();
536        let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
537            .deserialize()
538            .map_err(MutterError::SessionIdVariantParse)?;
539        let rd_session_id: String = rd_session_id_variant
540            .try_into()
541            .map_err(MutterError::SessionIdNotString)?;
542
543        let rd_session_path = rd_session_path.to_string();
544        tracing::debug!(
545            id = self.id,
546            rd_session_path = %rd_session_path,
547            rd_session_id = %rd_session_id,
548            "RemoteDesktop session started"
549        );
550
551        // Step: apply a non-default logical-monitor scale. `--virtual-monitor`
552        // has no scale component, so HiDPI is configured here over mutter's
553        // private bus once DisplayConfig is up. Skipped at 1.0 to leave the
554        // default 1:1 path completely untouched.
555        if (scale - DEFAULT_SCALE).abs() > f64::EPSILON {
556            let applied = apply_scale(&mutter_conn, scale, &self.id).await?;
557            tracing::info!(
558                id = self.id,
559                requested = scale,
560                applied,
561                "applied logical-monitor scale"
562            );
563        }
564
565        self.state = Some(Arc::new(MutterState {
566            conn: mutter_conn,
567            rd_session_path,
568            rd_session_id,
569            rd_started: Arc::new(Mutex::new(false)),
570            runtime_dir: self.runtime_dir.clone(),
571            active_stream_path: Arc::new(Mutex::new(None)),
572        }));
573
574        Ok(())
575    }
576}
577
578#[async_trait]
579impl CompositorRuntime for MutterCompositor {
580    async fn start(&mut self, resolution: Option<&str>, scale: Option<f64>) -> Result<()> {
581        // Body uses the crate-local typed `MutterError`. The `?` at the
582        // end of `self.start_inner(...).await?` runs the
583        // `From<MutterError> for waydriver::Error` impl in `error.rs`,
584        // which is the single boundary at which the typed enum becomes
585        // the workspace's shared `waydriver::Error`.
586        Ok(self.start_inner(resolution, scale).await?)
587    }
588
589    async fn stop(&mut self) -> Result<()> {
590        tracing::info!(id = self.id, "stopping mutter compositor");
591
592        // Stop RemoteDesktop session if still reachable. We could
593        // touch the private fields directly here (same crate), but
594        // routing through the public accessors keeps the contract
595        // visible and means a future change to the field layout
596        // doesn't need to update this site.
597        if let Some(state) = &self.state {
598            let _ = state
599                .conn()
600                .call_method(
601                    Some("org.gnome.Mutter.RemoteDesktop"),
602                    state.rd_session_path(),
603                    Some("org.gnome.Mutter.RemoteDesktop.Session"),
604                    "Stop",
605                    &(),
606                )
607                .await;
608        }
609
610        // Drop our strong ref to the shared state. If callers haven't dropped
611        // theirs (the input/capture trait objects), their Arc still points at
612        // the D-Bus connection we're about to tear down below — any method
613        // call on them after this will fail with "connection closed".
614        self.state = None;
615
616        if let Some(mut mutter) = self.mutter.take() {
617            let _ = mutter.kill().await;
618            let _ = mutter.wait().await;
619        }
620        if let Some(mut wireplumber) = self.wireplumber.take() {
621            let _ = wireplumber.kill().await;
622            let _ = wireplumber.wait().await;
623        }
624        if let Some(mut pipewire) = self.pipewire.take() {
625            let _ = pipewire.kill().await;
626            let _ = pipewire.wait().await;
627        }
628
629        if let Some(pid) = self.mutter_dbus_pid.take() {
630            unsafe {
631                libc::kill(pid as i32, libc::SIGTERM);
632            }
633        }
634
635        let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
636
637        tracing::debug!(id = self.id, "mutter compositor stopped");
638        Ok(())
639    }
640
641    fn id(&self) -> &str {
642        &self.id
643    }
644
645    fn wayland_display(&self) -> &str {
646        &self.wayland_display
647    }
648
649    fn runtime_dir(&self) -> &Path {
650        &self.runtime_dir
651    }
652}
653
654impl Drop for MutterCompositor {
655    fn drop(&mut self) {
656        // Best-effort cleanup when dropped without calling stop().
657        // Can't use async here, so send SIGKILL synchronously.
658        self.state = None;
659
660        if let Some(ref mut child) = self.mutter {
661            let _ = child.start_kill();
662        }
663        if let Some(ref mut child) = self.wireplumber {
664            let _ = child.start_kill();
665        }
666        if let Some(ref mut child) = self.pipewire {
667            let _ = child.start_kill();
668        }
669        if let Some(pid) = self.mutter_dbus_pid {
670            unsafe {
671                libc::kill(pid as i32, libc::SIGKILL);
672            }
673        }
674        let _ = std::fs::remove_dir_all(&self.runtime_dir);
675    }
676}
677
678// ── Helpers ─────────────────────────────────────────────────────────────────
679
680fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
681    for line in output.lines() {
682        if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
683            if let Some(addr) = rest.strip_suffix("';") {
684                return Ok(addr.to_string());
685            }
686        }
687    }
688    Err(MutterError::DbusOutputMissingField {
689        field: "DBUS_SESSION_BUS_ADDRESS",
690    })
691}
692
693fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
694    for line in output.lines() {
695        if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
696            let pid_str = rest.trim_end_matches(';').trim();
697            return pid_str.parse().map_err(MutterError::DbusPidParse);
698        }
699    }
700    Err(MutterError::DbusOutputMissingField {
701        field: "DBUS_SESSION_BUS_PID",
702    })
703}
704
705fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
706    let invalid = || MutterError::ResolutionInvalid {
707        value: s.to_string(),
708    };
709    let (w, h) = s.split_once('x').ok_or_else(invalid)?;
710    let parse = |part: &str| -> std::result::Result<u32, MutterError> {
711        part.parse::<u32>()
712            .ok()
713            .filter(|n| *n > 0)
714            .ok_or_else(invalid)
715    };
716    Ok((parse(w)?, parse(h)?))
717}
718
719/// Reject a scale that isn't a finite, positive factor inside
720/// [`MIN_SCALE`]..=[`MAX_SCALE`]. Run before any subprocess spawns so a bad
721/// value fails fast.
722fn validate_scale(scale: f64) -> std::result::Result<(), MutterError> {
723    if scale.is_finite() && (MIN_SCALE..=MAX_SCALE).contains(&scale) {
724        Ok(())
725    } else {
726        Err(MutterError::ScaleInvalid {
727            value: scale,
728            min: MIN_SCALE,
729            max: MAX_SCALE,
730        })
731    }
732}
733
734/// Pick the entry of `supported` closest to `requested`. Mutter only accepts
735/// scales it advertises for a mode, so an arbitrary request (e.g. 1.66) is
736/// snapped to the nearest legal step. Falls back to `requested` when the list
737/// is empty (mutter then validates — and likely rejects — it).
738fn nearest_supported_scale(requested: f64, supported: &[f64]) -> f64 {
739    supported
740        .iter()
741        .copied()
742        .min_by(|a, b| {
743            let da = (a - requested).abs();
744            let db = (b - requested).abs();
745            da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
746        })
747        .unwrap_or(requested)
748}
749
750/// Apply `requested` as the logical-monitor scale of the (single) virtual
751/// monitor via `org.gnome.Mutter.DisplayConfig`, returning the scale mutter
752/// actually accepted (snapped to a supported step).
753///
754/// Reads `GetCurrentState` fresh each call so the `serial` is current —
755/// mutter rejects `ApplyMonitorsConfig` on a stale serial, and the serial
756/// bumps when the virtual monitor first appears. Fractional scales (1.5,
757/// 1.75, …) require the `scale-monitor-framebuffer` experimental feature; the
758/// container entrypoint enables it. Without it only integer scales are
759/// advertised, so [`nearest_supported_scale`] would snap a fractional request
760/// to 1.0 or 2.0.
761async fn apply_scale(
762    conn: &zbus::Connection,
763    requested: f64,
764    id: &str,
765) -> std::result::Result<f64, MutterError> {
766    let state_reply = conn
767        .call_method(
768            Some("org.gnome.Mutter.DisplayConfig"),
769            "/org/gnome/Mutter/DisplayConfig",
770            Some("org.gnome.Mutter.DisplayConfig"),
771            "GetCurrentState",
772            &(),
773        )
774        .await
775        .map_err(|source| MutterError::DisplayConfigState {
776            stage: "call",
777            source,
778        })?;
779    let state_body = state_reply.body();
780    let (serial, monitors, _logical, _props): CurrentState =
781        state_body
782            .deserialize()
783            .map_err(|source| MutterError::DisplayConfigState {
784                stage: "deserialize",
785                source,
786            })?;
787
788    // Headless mutter started with a single `--virtual-monitor` exposes
789    // exactly one monitor advertising exactly the mode we asked for, so the
790    // first monitor / first mode is the one to scale.
791    let (spec, modes, _mprops) = monitors
792        .into_iter()
793        .next()
794        .ok_or(MutterError::DisplayConfigNoMonitor)?;
795    let connector = spec.0;
796    let (mode_id, _w, _h, _refresh, _preferred, supported, _modeprops) =
797        modes
798            .into_iter()
799            .next()
800            .ok_or(MutterError::DisplayConfigNoMonitor)?;
801
802    let applied = nearest_supported_scale(requested, &supported);
803    if (applied - requested).abs() > SCALE_SNAP_TOLERANCE {
804        tracing::warn!(
805            id,
806            requested,
807            applied,
808            supported = ?supported,
809            "requested scale not advertised by mutter; snapped to nearest supported"
810        );
811    }
812
813    // (x, y, scale, transform, primary, [(connector, mode_id, {})]).
814    let logical: LogicalMonitorConfig = (
815        0,
816        0,
817        applied,
818        0,
819        true,
820        vec![(connector, mode_id, DbusProps::new())],
821    );
822    // method 1 = temporary: applies for this session without writing
823    // ~/.config/monitors.xml, which is all a throwaway headless run needs.
824    conn.call_method(
825        Some("org.gnome.Mutter.DisplayConfig"),
826        "/org/gnome/Mutter/DisplayConfig",
827        Some("org.gnome.Mutter.DisplayConfig"),
828        "ApplyMonitorsConfig",
829        &(serial, 1u32, vec![logical], DbusProps::new()),
830    )
831    .await
832    .map_err(|source| MutterError::DisplayConfigApply {
833        scale: applied,
834        source,
835    })?;
836
837    Ok(applied)
838}
839
840async fn wait_for_wayland_socket(
841    runtime_dir: &str,
842    display: &str,
843) -> std::result::Result<(), MutterError> {
844    let socket_path = PathBuf::from(runtime_dir).join(display);
845    for _ in 0..50 {
846        if socket_path.exists() {
847            return Ok(());
848        }
849        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
850    }
851    Err(MutterError::WaylandSocketTimeout {
852        socket: socket_path.display().to_string(),
853    })
854}
855
856/// PipeWire creates `<runtime_dir>/pipewire-0` as soon as it's ready
857/// to accept client connections. Polling for that file replaces the
858/// previous unconditional `sleep(1s)` after spawning the pipewire
859/// process — same readiness model as
860/// [`wait_for_wayland_socket`].
861async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
862    let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
863    for _ in 0..50 {
864        if socket_path.exists() {
865            return Ok(());
866        }
867        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
868    }
869    Err(MutterError::PipewireSocketTimeout {
870        socket: socket_path.display().to_string(),
871    })
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877
878    /// Live end-to-end check that a fractional scale is actually applied by a
879    /// real mutter. Requires the runtime stack (mutter, pipewire, wireplumber,
880    /// dbus-launch) on `PATH` plus the GSettings schemas in `XDG_DATA_DIRS`, so
881    /// it's `#[ignore]`d by default; run with the dev shell's env via
882    /// `cargo test -p waydriver-compositor-mutter -- --ignored`.
883    ///
884    /// This exercises the whole chain at once: the per-session keyfile must
885    /// enable `scale-monitor-framebuffer` (otherwise 1.5 is not advertised and
886    /// would snap to 1.0/2.0), and `apply_scale` must drive DisplayConfig
887    /// correctly. We read the scale straight back from `GetCurrentState`.
888    #[tokio::test]
889    #[ignore = "requires a live mutter/pipewire/dbus runtime stack"]
890    async fn applies_fractional_scale_against_real_mutter() {
891        let mut compositor = MutterCompositor::new();
892        compositor
893            .start(Some("1920x1080"), Some(1.5))
894            .await
895            .expect("compositor should start");
896        let state = compositor.state().expect("state present after start");
897
898        let reply = state
899            .conn()
900            .call_method(
901                Some("org.gnome.Mutter.DisplayConfig"),
902                "/org/gnome/Mutter/DisplayConfig",
903                Some("org.gnome.Mutter.DisplayConfig"),
904                "GetCurrentState",
905                &(),
906            )
907            .await
908            .expect("GetCurrentState should succeed");
909        let body = reply.body();
910        let (_serial, _monitors, logical, _props): CurrentState = body
911            .deserialize()
912            .expect("GetCurrentState body should deserialize");
913        let applied = logical.first().expect("at least one logical monitor").2;
914
915        compositor.stop().await.expect("compositor should stop");
916
917        assert!(
918            (applied - 1.5).abs() < 0.01,
919            "expected logical scale ~1.5 (fractional scaling enabled via keyfile), got {applied}"
920        );
921    }
922
923    #[test]
924    fn test_parse_dbus_address_valid() {
925        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
926        let addr = parse_dbus_address(output).unwrap();
927        assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
928    }
929
930    #[test]
931    fn test_parse_dbus_address_missing() {
932        let output = "DBUS_SESSION_BUS_PID=12345;\n";
933        assert!(parse_dbus_address(output).is_err());
934    }
935
936    #[test]
937    fn test_parse_dbus_pid_valid() {
938        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
939        let pid = parse_dbus_pid(output).unwrap();
940        assert_eq!(pid, 12345);
941    }
942
943    #[test]
944    fn test_parse_dbus_pid_missing() {
945        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
946        assert!(parse_dbus_pid(output).is_err());
947    }
948
949    #[test]
950    fn test_parse_dbus_pid_invalid() {
951        let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
952        assert!(parse_dbus_pid(output).is_err());
953    }
954
955    #[tokio::test]
956    async fn test_wait_for_socket_found() {
957        let dir = tempfile::tempdir().unwrap();
958        let runtime_dir = dir.path().to_str().unwrap().to_string();
959        let display = "wayland-test-99";
960        std::fs::File::create(dir.path().join(display)).unwrap();
961        wait_for_wayland_socket(&runtime_dir, display)
962            .await
963            .unwrap();
964    }
965
966    #[tokio::test]
967    async fn test_wait_for_pipewire_socket_found() {
968        let dir = tempfile::tempdir().unwrap();
969        let runtime_dir = dir.path().to_str().unwrap().to_string();
970        std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
971        wait_for_pipewire_socket(&runtime_dir).await.unwrap();
972    }
973
974    #[tokio::test]
975    async fn test_wait_for_pipewire_socket_timeout() {
976        let dir = tempfile::tempdir().unwrap();
977        let runtime_dir = dir.path().to_str().unwrap().to_string();
978        let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
979        assert!(
980            matches!(err, MutterError::PipewireSocketTimeout { .. }),
981            "expected PipewireSocketTimeout, got: {err}"
982        );
983        // Public mapping: same Timeout bucket as the wayland one,
984        // so workspace callers matching `Error::Timeout(_)` (e.g.
985        // the e2e tests) keep working.
986        let public: waydriver::Error = err.into();
987        assert!(
988            matches!(public, waydriver::Error::Timeout(_)),
989            "expected waydriver::Error::Timeout, got: {public}"
990        );
991    }
992
993    #[tokio::test]
994    async fn test_wait_for_socket_timeout() {
995        let dir = tempfile::tempdir().unwrap();
996        let runtime_dir = dir.path().to_str().unwrap().to_string();
997        let display = "wayland-nonexistent-0";
998        let err = wait_for_wayland_socket(&runtime_dir, display)
999            .await
1000            .unwrap_err();
1001        assert!(
1002            matches!(err, MutterError::WaylandSocketTimeout { .. }),
1003            "expected WaylandSocketTimeout, got: {err}"
1004        );
1005        // And confirm the From<MutterError> -> waydriver::Error mapping
1006        // still produces the public Timeout variant — workspace callers
1007        // (notably the e2e tests) match on it.
1008        let public: waydriver::Error = err.into();
1009        assert!(
1010            matches!(public, waydriver::Error::Timeout(_)),
1011            "expected waydriver::Error::Timeout, got: {public}"
1012        );
1013    }
1014
1015    #[test]
1016    fn test_new_generates_unique_ids() {
1017        let a = MutterCompositor::new();
1018        let b = MutterCompositor::new();
1019        assert_ne!(a.id(), b.id());
1020    }
1021
1022    #[test]
1023    fn test_new_wayland_display_contains_id() {
1024        let c = MutterCompositor::new();
1025        assert!(
1026            c.wayland_display().contains(c.id()),
1027            "display '{}' should contain id '{}'",
1028            c.wayland_display(),
1029            c.id()
1030        );
1031    }
1032
1033    #[test]
1034    fn test_new_runtime_dir_contains_id() {
1035        let c = MutterCompositor::new();
1036        let dir_str = c.runtime_dir().to_str().unwrap();
1037        assert!(
1038            dir_str.contains(c.id()),
1039            "runtime_dir '{}' should contain id '{}'",
1040            dir_str,
1041            c.id()
1042        );
1043    }
1044
1045    /// Regression: session runtime dirs must be flat siblings under one root,
1046    /// never nested inside each other. `waydriver::capture` repoints the
1047    /// process-wide `XDG_RUNTIME_DIR` at the live session's dir after a
1048    /// screenshot/recording; if `new()` re-read that mutated value, each
1049    /// session would nest one level deeper and eventually overflow the
1050    /// AF_UNIX `sun_path` limit, wedging pipewire socket creation. See
1051    /// `HOST_RUNTIME_ROOT`.
1052    #[test]
1053    fn test_session_runtime_dirs_are_siblings_not_nested() {
1054        let a = MutterCompositor::new();
1055        let dir_a = a.runtime_dir().to_path_buf();
1056
1057        // Simulate what a screenshot/recording does: point XDG_RUNTIME_DIR at
1058        // the live session's runtime dir and leave it there.
1059        unsafe {
1060            std::env::set_var("XDG_RUNTIME_DIR", &dir_a);
1061        }
1062
1063        let b = MutterCompositor::new();
1064        let dir_b = b.runtime_dir().to_path_buf();
1065
1066        assert_eq!(
1067            dir_a.parent(),
1068            dir_b.parent(),
1069            "session dirs must share a parent (siblings), got a={dir_a:?} b={dir_b:?}"
1070        );
1071        assert!(
1072            !dir_b.starts_with(&dir_a),
1073            "session B nested inside session A: {dir_b:?}"
1074        );
1075    }
1076
1077    #[test]
1078    fn test_new_wayland_display_prefix() {
1079        let c = MutterCompositor::new();
1080        assert!(c.wayland_display().starts_with("wayland-wd-"));
1081    }
1082
1083    #[test]
1084    fn test_new_runtime_dir_contains_session_prefix() {
1085        let c = MutterCompositor::new();
1086        let dir_str = c.runtime_dir().to_str().unwrap();
1087        assert!(dir_str.contains("wd-session-"));
1088    }
1089
1090    #[test]
1091    fn test_state_returns_none_before_start() {
1092        // `state()` previously panicked when called outside the started
1093        // window. The current contract returns `None` so callers can
1094        // detect the lifecycle without trapping a panic.
1095        let c = MutterCompositor::new();
1096        assert!(c.state().is_none());
1097    }
1098
1099    #[test]
1100    fn test_parse_resolution_accepts_hd() {
1101        assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
1102        assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
1103    }
1104
1105    #[test]
1106    fn test_parse_resolution_rejects_garbage() {
1107        for bad in [
1108            "",
1109            "1920",
1110            "1920x",
1111            "x1080",
1112            "0x0",
1113            "1920x0",
1114            "0x1080",
1115            "1920x1080x1",
1116            "abcxdef",
1117            "-1x1080",
1118            "1920 x 1080",
1119        ] {
1120            assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
1121        }
1122    }
1123
1124    #[test]
1125    fn test_validate_scale_accepts_common_factors() {
1126        for ok in [0.5, 1.0, 1.25, 1.5, 1.6666, 1.75, 2.0, 3.0, 4.0] {
1127            assert!(validate_scale(ok).is_ok(), "expected {ok} to validate");
1128        }
1129    }
1130
1131    #[test]
1132    fn test_validate_scale_rejects_out_of_range_and_nonfinite() {
1133        for bad in [0.0, 0.49, 4.01, -1.0, f64::NAN, f64::INFINITY] {
1134            assert!(
1135                validate_scale(bad).is_err(),
1136                "expected {bad} to be rejected"
1137            );
1138        }
1139    }
1140
1141    #[test]
1142    fn test_nearest_supported_scale_snaps_to_closest() {
1143        let supported = [1.0, 1.25, 1.5, 1.75, 2.0];
1144        // Exact hits pass straight through.
1145        assert_eq!(nearest_supported_scale(1.5, &supported), 1.5);
1146        assert_eq!(nearest_supported_scale(2.0, &supported), 2.0);
1147        // 1.66 (166%) isn't advertised → nearest is 1.75.
1148        assert_eq!(nearest_supported_scale(1.66, &supported), 1.75);
1149        // 1.6 is closer to 1.5.
1150        assert_eq!(nearest_supported_scale(1.6, &supported), 1.5);
1151    }
1152
1153    #[test]
1154    fn test_nearest_supported_scale_empty_list_returns_request() {
1155        assert_eq!(nearest_supported_scale(1.5, &[]), 1.5);
1156    }
1157
1158    #[test]
1159    fn test_default_same_structure_as_new() {
1160        let c = MutterCompositor::default();
1161        assert!(c.wayland_display().starts_with("wayland-wd-"));
1162        assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
1163    }
1164}