Skip to main content

waydriver_compositor_mutter/
lib.rs

1//! Mutter implementation of [`waydriver::CompositorRuntime`].
2//!
3//! Owns the private-bus `dbus-daemon`, the `pipewire` + `wireplumber` pair,
4//! and a headless `mutter --wayland` instance. After [`MutterCompositor::start`]
5//! returns, [`MutterCompositor::state`] exposes an `Arc<MutterState>` that
6//! sibling backends (`waydriver-input-mutter`, `waydriver-capture-mutter`) use
7//! to talk to the same mutter D-Bus session.
8//!
9//! ## Shared-state invariant
10//!
11//! While any `Arc<MutterState>` exists, the mutter child processes and the
12//! private D-Bus connection MUST remain alive. [`waydriver::Session::kill`]
13//! enforces this by dropping input and capture trait objects before calling
14//! `compositor.stop().await`.
15
16mod error;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::process::Stdio;
21use std::sync::{Arc, LazyLock, Mutex};
22
23use async_trait::async_trait;
24use tokio::process::{Child, Command};
25use zbus::zvariant::OwnedValue;
26
27use waydriver::gsettings::{self, GSettingEntry, GSettingsConfig};
28use waydriver::{CompositorRuntime, Result};
29
30use crate::error::MutterError;
31
32/// Default virtual-monitor geometry passed to mutter when the caller doesn't
33/// override it. Matches mutter's own implicit default.
34const DEFAULT_RESOLUTION: &str = "1024x768";
35
36/// Default logical-monitor scale: 1:1, i.e. `resolution` pixels are also the
37/// logical (application) size. Any other value drives the HiDPI path in
38/// [`apply_scale`].
39const DEFAULT_SCALE: f64 = 1.0;
40
41/// GVariant-text value seeded into `org.gnome.mutter experimental-features`
42/// when GSettings isolation is on. `scale-monitor-framebuffer` switches the
43/// native headless backend to logical layout mode, which is what makes
44/// fractional scales (1.5, 1.75, …) appear in a mode's `supported-scales` and
45/// be accepted by `ApplyMonitorsConfig`. Harmless at integer/1.0 scales.
46const MUTTER_FRACTIONAL_SCALING: &str = "['scale-monitor-framebuffer']";
47
48/// Accepted scale range. Below 0.5 the UI is unusably small; above 4.0 mutter
49/// won't offer the scale for any virtual-monitor mode we'd create. Validated
50/// up-front so a typo fails before we spawn any subprocess.
51const MIN_SCALE: f64 = 0.5;
52const MAX_SCALE: f64 = 4.0;
53
54/// How far a requested scale may sit from the nearest mutter-supported scale
55/// before we log a warning about snapping to it. Mutter only accepts scales it
56/// lists in a mode's `supported-scales`, so an exact arbitrary value (e.g.
57/// 1.66) may be nudged to the closest legal step.
58const SCALE_SNAP_TOLERANCE: f64 = 0.01;
59
60// ── DisplayConfig D-Bus shapes ───────────────────────────────────────────────
61//
62// Type aliases mirroring the `org.gnome.Mutter.DisplayConfig` wire types so
63// `body().deserialize::<CurrentState>()` validates the reply against the exact
64// signature mutter sends. The `a{sv}` property dicts are kept as
65// `HashMap<String, OwnedValue>` (signature `a{sv}`) and ignored — we only need
66// the connector, mode id, and supported-scales list.
67
68/// `a{sv}` — a D-Bus property dict.
69type DbusProps = HashMap<String, OwnedValue>;
70/// `(siiddada{sv})` — one monitor mode: id, width, height, refresh, preferred
71/// scale, supported scales, properties.
72type MonitorMode = (String, i32, i32, f64, f64, Vec<f64>, DbusProps);
73/// `(ssss)` — connector, vendor, product, serial.
74type MonitorSpec = (String, String, String, String);
75/// `((ssss)a(siiddada{sv})a{sv})` — one physical monitor: spec, modes, props.
76type PhysicalMonitor = (MonitorSpec, Vec<MonitorMode>, DbusProps);
77/// `(iiduba(ssss)a{sv})` — one logical monitor in the current layout.
78type LogicalMonitor = (i32, i32, f64, u32, bool, Vec<MonitorSpec>, DbusProps);
79/// Return tuple of `GetCurrentState`: `(serial, monitors, logical, props)`.
80type CurrentState = (u32, Vec<PhysicalMonitor>, Vec<LogicalMonitor>, DbusProps);
81
82/// `(ssa{sv})` — one monitor assignment in an `ApplyMonitorsConfig` request:
83/// connector, mode id, properties.
84type MonitorAssignment = (String, String, DbusProps);
85/// `(iiduba(ssa{sv}))` — one logical monitor to apply: x, y, scale, transform,
86/// primary, assigned monitors.
87type LogicalMonitorConfig = (i32, i32, f64, u32, bool, Vec<MonitorAssignment>);
88
89/// Shared mutter-backend state consumed by `waydriver-input-mutter` and
90/// `waydriver-capture-mutter`.
91///
92/// **Invariant:** while any `Arc<MutterState>` exists, the underlying D-Bus
93/// connection and the mutter child process must remain alive. See the
94/// module docs for details.
95///
96/// Fields are private — all access goes through the accessor methods
97/// below. Sibling crates (`waydriver-input-mutter`,
98/// `waydriver-capture-mutter`) that previously read fields directly
99/// now call `state.conn()`, `state.rd_session_path()`, etc. The
100/// shape of the underlying storage (e.g. how `active_stream_path` is
101/// guarded) is therefore an implementation detail that can change
102/// without breaking those callers — the contract lives entirely in
103/// the method signatures.
104pub struct MutterState {
105    conn: zbus::Connection,
106    rd_session_path: String,
107    rd_session_id: String,
108    rd_started: Arc<Mutex<bool>>,
109    runtime_dir: PathBuf,
110    active_stream_path: Arc<Mutex<Option<String>>>,
111}
112
113impl MutterState {
114    /// Persistent connection to mutter's private D-Bus.
115    ///
116    /// Both sibling backends (`waydriver-input-mutter`,
117    /// `waydriver-capture-mutter`) issue all their RemoteDesktop and
118    /// ScreenCast method calls through this connection.
119    pub fn conn(&self) -> &zbus::Connection {
120        &self.conn
121    }
122
123    /// RemoteDesktop session object path. Used by
124    /// `waydriver-input-mutter` as the `path` argument on every
125    /// pointer / keyboard `Notify*` D-Bus call.
126    pub fn rd_session_path(&self) -> &str {
127        &self.rd_session_path
128    }
129
130    /// RemoteDesktop session id, read from the `SessionId` property on
131    /// the RD session. `waydriver-capture-mutter` passes this as the
132    /// `remote-desktop-session-id` option to
133    /// `ScreenCast.CreateSession` so mutter links the two; the link is
134    /// required for `NotifyPointerMotionAbsolute` to be accepted.
135    pub fn rd_session_id(&self) -> &str {
136        &self.rd_session_id
137    }
138
139    /// Per-session `XDG_RUNTIME_DIR`. `waydriver-capture-mutter` joins
140    /// this with `pipewire-0` to locate the PipeWire socket.
141    pub fn runtime_dir(&self) -> &Path {
142        &self.runtime_dir
143    }
144
145    /// Lock the "RD-started" flag.
146    ///
147    /// Acquires the underlying mutex and returns the guard so the
148    /// caller can perform a check-and-set under one critical section
149    /// (the capture backend defers `RD.Session.Start` until the first
150    /// linked `ScreenCast.CreateSession` succeeds — that's a load,
151    /// some D-Bus work, and a store; splitting the read and write
152    /// would race). `Error::Process` if the mutex is poisoned.
153    pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
154        self.rd_started
155            .lock()
156            .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
157    }
158
159    /// Lock the active ScreenCast Stream object path.
160    ///
161    /// Set by `waydriver-capture-mutter` in `start_stream`, cleared in
162    /// `stop_stream`. `waydriver-input-mutter` reads it to route
163    /// `NotifyPointerMotionAbsolute` at the correct monitor. `None`
164    /// inside the guard means no stream is open — absolute pointer
165    /// motion will error.
166    pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
167        self.active_stream_path
168            .lock()
169            .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
170    }
171}
172
173/// Headless mutter instance.
174pub struct MutterCompositor {
175    id: String,
176    wayland_display: String,
177    runtime_dir: PathBuf,
178    mutter_dbus_address: String,
179    mutter_dbus_pid: Option<u32>,
180    mutter: Option<Child>,
181    pipewire: Option<Child>,
182    wireplumber: Option<Child>,
183    state: Option<Arc<MutterState>>,
184    gsettings: GSettingsConfig,
185}
186
187/// The host runtime root under which every session's `wd-session-<id>`
188/// directory is created. Snapshotted once, lazily, on the first
189/// `MutterCompositor::new()` call.
190///
191/// This is deliberately read **once** and cached, rather than re-read from
192/// `XDG_RUNTIME_DIR` per session. `waydriver`'s screenshot and video pipelines
193/// (`waydriver::capture`) mutate the parent process's `XDG_RUNTIME_DIR` to
194/// point `pipewiresrc` at the *live* session's pipewire socket, and never
195/// restore it. If `new()` re-read the live env each time, session N+1's
196/// runtime dir would be created **inside** session N's dir
197/// (`…/wd-session-A/wd-session-B/…`), nesting one level deeper per session.
198/// After ~4 levels the `<dir>/pipewire-0` path exceeds the ~107-byte AF_UNIX
199/// `sun_path` limit, pipewire can no longer bind its socket, and every
200/// subsequent `start_session` fails with a "timeout: pipewire socket" error
201/// until the server is restarted (which resets the process env). Snapshotting
202/// the root keeps each session dir a flat sibling under the original
203/// `XDG_RUNTIME_DIR`, independent of how many sessions preceded it.
204///
205/// The first `new()` runs before any session exists, so the env is still the
206/// pristine value set by the launcher (e.g. the Docker entrypoint) — capturing
207/// it then is safe.
208static HOST_RUNTIME_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
209    let root = std::env::var("XDG_RUNTIME_DIR")
210        .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
211    PathBuf::from(root)
212});
213
214/// Eagerly capture [`HOST_RUNTIME_ROOT`] from the current `XDG_RUNTIME_DIR`
215/// and return it.
216///
217/// The snapshot is otherwise taken lazily on the first [`MutterCompositor::new`].
218/// Call this once at process startup — before any session is created and
219/// before anything can mutate `XDG_RUNTIME_DIR` — to pin the root to the
220/// pristine launcher value deterministically, rather than relying on `new()`
221/// happening first. Idempotent: subsequent calls (and `new()`) return the same
222/// captured value.
223pub fn establish_runtime_root() -> &'static std::path::Path {
224    HOST_RUNTIME_ROOT.as_path()
225}
226
227impl MutterCompositor {
228    /// Construct but do not start. Generates the session id and computes
229    /// where the Wayland socket and runtime dir will live. No I/O.
230    pub fn new() -> Self {
231        let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
232        let wayland_display = format!("wayland-wd-{}", id);
233
234        let runtime_dir = HOST_RUNTIME_ROOT.join(format!("wd-session-{}", id));
235
236        Self {
237            id,
238            wayland_display,
239            runtime_dir,
240            mutter_dbus_address: String::new(),
241            mutter_dbus_pid: None,
242            mutter: None,
243            pipewire: None,
244            wireplumber: None,
245            state: None,
246            gsettings: GSettingsConfig::default(),
247        }
248    }
249
250    /// Set the per-session GSettings isolation config (see
251    /// [`waydriver::gsettings`]). Defaults to isolated with no seeded entries.
252    /// When isolated, [`start`](Self::start) writes a private keyfile (seeded
253    /// with `org.gnome.mutter experimental-features` plus `config.initial`)
254    /// and points mutter at it, so fractional scales work and the host's dconf
255    /// is neither read nor written. Pass `isolated: false` to run mutter
256    /// against the host's GSettings instead.
257    pub fn with_gsettings(mut self, config: GSettingsConfig) -> Self {
258        self.gsettings = config;
259        self
260    }
261
262    /// Returns the shared `Arc<MutterState>` for passing to sibling
263    /// backends, or `None` when called outside the started window.
264    ///
265    /// `None` is returned when:
266    /// - `start()` has not yet completed (or returned an error), or
267    /// - `stop()` has been called and dropped the state.
268    ///
269    /// Callers that have just awaited `start()?` know the state is
270    /// present — `expect()` or `?`-with-typed-error is appropriate
271    /// there. Returning `Option` instead of panicking keeps the API
272    /// honest about the lifecycle and lets callers detect "stopped"
273    /// without first matching on a panic.
274    pub fn state(&self) -> Option<Arc<MutterState>> {
275        self.state.clone()
276    }
277}
278
279impl Default for MutterCompositor {
280    fn default() -> Self {
281        Self::new()
282    }
283}
284
285impl MutterCompositor {
286    /// Typed-error implementation of `start`. The trait method calls
287    /// this and converts the result via `From<MutterError>`.
288    ///
289    /// Steps (each fails with a specific `MutterError` variant):
290    /// 1. validate resolution + scale,
291    /// 2. ensure the session runtime dir exists,
292    /// 3. spawn a private `dbus-daemon` and parse its address + PID,
293    /// 4. spawn `pipewire` + `wireplumber` on that bus,
294    /// 5. spawn headless `mutter --wayland`,
295    /// 6. wait for the Wayland socket,
296    /// 7. open a zbus connection, retry-create the RemoteDesktop session,
297    /// 8. read its `SessionId` property,
298    /// 9. apply a non-default logical-monitor scale via DisplayConfig,
299    /// 10. publish the `Arc<MutterState>` for sibling backends.
300    async fn start_inner(
301        &mut self,
302        resolution: Option<&str>,
303        scale: Option<f64>,
304    ) -> std::result::Result<(), MutterError> {
305        let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
306        // Validate before we start spawning subprocesses — mutter silently
307        // ignores bad --virtual-monitor values and falls back to its own
308        // default, which would surprise the caller.
309        parse_resolution(resolution)?;
310        let scale = scale.unwrap_or(DEFAULT_SCALE);
311        // Fail fast on a nonsense scale too, for the same reason — the
312        // DisplayConfig apply that consumes it doesn't run until mutter is up.
313        validate_scale(scale)?;
314
315        tracing::info!(
316            id = self.id,
317            resolution,
318            scale,
319            isolated = self.gsettings.isolated,
320            "starting mutter compositor"
321        );
322
323        tokio::fs::create_dir_all(&self.runtime_dir).await?;
324        // `runtime_dir` is built in `new()` from a UTF-8 String
325        // (XDG_RUNTIME_DIR or `/run/user/<uid>`) joined with a UTF-8
326        // ASCII session id, so the path is guaranteed valid UTF-8.
327        // `expect` documents that invariant rather than re-deriving
328        // it via the `to_str()` `Option`.
329        let runtime_str = self
330            .runtime_dir
331            .to_str()
332            .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
333            .to_string();
334
335        // GSettings isolation: when on, write the session's private keyfile
336        // (read by both mutter and the app — see `waydriver::gsettings`) and
337        // compute the env that points mutter at it. The keyfile is seeded with
338        // the fractional-scaling experimental feature so a non-integer `scale`
339        // is actually advertised by mutter, then any caller-supplied entries
340        // are appended (last-wins, so callers can override). When off, mutter
341        // reads the host's GSettings and `config_env` stays empty.
342        let config_env: Vec<(&str, String)> = if self.gsettings.isolated {
343            let mut entries = vec![GSettingEntry::new(
344                "org.gnome.mutter",
345                "experimental-features",
346                MUTTER_FRACTIONAL_SCALING,
347            )];
348            entries.extend(self.gsettings.initial.iter().cloned());
349            gsettings::write_keyfile(&self.runtime_dir, &entries)?;
350            let config_dir = gsettings::config_dir(&self.runtime_dir)
351                .to_str()
352                .expect("invariant: config_dir is runtime_dir (UTF-8) + ASCII suffix")
353                .to_string();
354            vec![
355                ("XDG_CONFIG_HOME", config_dir),
356                ("GSETTINGS_BACKEND", gsettings::KEYFILE_BACKEND.to_string()),
357            ]
358        } else {
359            Vec::new()
360        };
361
362        // Step 1: Private D-Bus for mutter (so its ScreenCast API doesn't conflict with host).
363        let dbus_output = Command::new("dbus-launch")
364            .arg("--sh-syntax")
365            .output()
366            .await?;
367        if !dbus_output.status.success() {
368            return Err(MutterError::DbusLaunchFailed(
369                String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
370            ));
371        }
372        let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
373        self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
374        self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
375        tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
376
377        // Step 2: PipeWire + WirePlumber (for screenshots via ScreenCast).
378        //
379        // `env_remove("PIPEWIRE_REMOTE")` is load-bearing: `waydriver`'s
380        // `grab_png_sync` mutates the parent's process env to point
381        // `pipewiresrc` at the live session's pipewire socket. After a
382        // session stops, that socket is gone but the env var lingers in
383        // the parent. Without scrubbing it here, a freshly spawned
384        // `pipewire`/`wireplumber`/`mutter` for the next session would
385        // inherit the stale value and try to connect to the previous
386        // session's dead socket — wireplumber/mutter prefer
387        // `PIPEWIRE_REMOTE` over `XDG_RUNTIME_DIR/pipewire-0`, so the
388        // explicit `XDG_RUNTIME_DIR` override below isn't enough.
389        // Symptom: `ScreenCast.Start` fails with "Couldn't connect
390        // pipewire context" on every session after the first.
391        let pipewire = Command::new("pipewire")
392            .env_remove("PIPEWIRE_REMOTE")
393            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
394            .env("XDG_RUNTIME_DIR", &runtime_str)
395            .stdout(Stdio::null())
396            .stderr(Stdio::null())
397            .spawn()
398            .map_err(|source| MutterError::Spawn {
399                process: "pipewire",
400                source,
401            })?;
402        self.pipewire = Some(pipewire);
403
404        // Wait for pipewire's socket to appear before launching
405        // wireplumber. Polling for the socket file is the same
406        // readiness signal `wait_for_wayland_socket` uses for
407        // mutter: it's the actual handshake clients use, so any
408        // earlier signal would either be racier (process spawn) or
409        // just as expensive to probe.
410        wait_for_pipewire_socket(&runtime_str).await?;
411
412        let wireplumber = Command::new("wireplumber")
413            .env_remove("PIPEWIRE_REMOTE")
414            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
415            .env("XDG_RUNTIME_DIR", &runtime_str)
416            .stdout(Stdio::null())
417            .stderr(Stdio::null())
418            .spawn()
419            .map_err(|source| MutterError::Spawn {
420                process: "wireplumber",
421                source,
422            })?;
423        self.wireplumber = Some(wireplumber);
424
425        // No bus-readiness signal poll for wireplumber: it's a
426        // session-policy daemon that doesn't register a stable D-Bus
427        // name we can probe, and its initialisation runs in parallel
428        // with mutter's own startup. The downstream
429        // `ScreenCast.CreateSession` retry loop in
430        // `waydriver-capture-mutter::start_stream` is what actually
431        // gates on wireplumber having joined the graph — putting a
432        // pessimistic sleep here as well would add startup latency
433        // without changing correctness.
434        tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
435
436        // Step 3: mutter in headless Wayland mode (on its private D-Bus).
437        let mutter = Command::new("mutter")
438            .args([
439                "--headless",
440                "--wayland",
441                "--no-x11",
442                "--wayland-display",
443                &self.wayland_display,
444                "--virtual-monitor",
445                resolution,
446            ])
447            .env_remove("PIPEWIRE_REMOTE")
448            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
449            .env("XDG_RUNTIME_DIR", &runtime_str)
450            // Empty when isolation is off; otherwise points mutter at the
451            // per-session keyfile GSettings store written above.
452            .envs(config_env.iter().map(|(k, v)| (*k, v.as_str())))
453            .stdout(Stdio::null())
454            .stderr(Stdio::inherit())
455            .spawn()
456            .map_err(|source| MutterError::Spawn {
457                process: "mutter",
458                source,
459            })?;
460        self.mutter = Some(mutter);
461        tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
462
463        // Step 4: Wait for the Wayland socket.
464        wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
465        tracing::debug!(id = self.id, "wayland socket ready");
466
467        // Step 5: Connect to mutter's private D-Bus and start RemoteDesktop session.
468        let mutter_addr: zbus::address::Address = self
469            .mutter_dbus_address
470            .as_str()
471            .try_into()
472            .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
473                addr: self.mutter_dbus_address.clone(),
474                source,
475            })?;
476        let mutter_conn = zbus::connection::Builder::address(mutter_addr)
477            .map_err(|source| MutterError::DbusConnect {
478                stage: "build connection builder",
479                source,
480            })?
481            .build()
482            .await
483            .map_err(|source| MutterError::DbusConnect {
484                stage: "connect",
485                source,
486            })?;
487
488        // Wait for mutter to register its D-Bus services (may take a moment after socket appears)
489        let mut rd_reply = None;
490        for i in 0..50 {
491            match mutter_conn
492                .call_method(
493                    Some("org.gnome.Mutter.RemoteDesktop"),
494                    "/org/gnome/Mutter/RemoteDesktop",
495                    Some("org.gnome.Mutter.RemoteDesktop"),
496                    "CreateSession",
497                    &(),
498                )
499                .await
500            {
501                Ok(reply) => {
502                    rd_reply = Some(reply);
503                    break;
504                }
505                Err(e) if i < 49 => {
506                    tracing::debug!(
507                        id = self.id,
508                        attempt = i,
509                        "waiting for RemoteDesktop service: {e}"
510                    );
511                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
512                }
513                Err(e) => {
514                    return Err(MutterError::RemoteDesktopCreate(e));
515                }
516            }
517        }
518        // The retry loop above either `break`s with `rd_reply = Some(_)`
519        // or returns `Err(...)` from the final attempt — `unwrap` here
520        // is unreachable by construction.
521        let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
522        let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
523            .body()
524            .deserialize()
525            .map_err(MutterError::RdSessionPathParse)?;
526        // Intentionally do NOT call `RemoteDesktop.Session.Start` here.
527        // Mutter only accepts `remote-desktop-session-id` on
528        // `ScreenCast.CreateSession` when the RD session is not yet
529        // started, so `waydriver-capture-mutter::start_stream` defers
530        // the Start call until after it has created the linked
531        // ScreenCast session.
532        // Read the RD session's `SessionId` property — it's the token
533        // ScreenCast.CreateSession needs in `remote-desktop-session-id`
534        // to link the two sessions. Without that link, mutter rejects
535        // NotifyPointerMotionAbsolute with "No screen cast active".
536        let rd_session_id_reply = mutter_conn
537            .call_method(
538                Some("org.gnome.Mutter.RemoteDesktop"),
539                rd_session_path.as_str(),
540                Some("org.freedesktop.DBus.Properties"),
541                "Get",
542                &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
543            )
544            .await
545            .map_err(MutterError::SessionIdGet)?;
546        // `Get` returns a variant; deserialize as `OwnedValue` to detach
547        // the string from the reply's body before the reply is dropped.
548        let rd_session_id_body = rd_session_id_reply.body();
549        let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
550            .deserialize()
551            .map_err(MutterError::SessionIdVariantParse)?;
552        let rd_session_id: String = rd_session_id_variant
553            .try_into()
554            .map_err(MutterError::SessionIdNotString)?;
555
556        let rd_session_path = rd_session_path.to_string();
557        tracing::debug!(
558            id = self.id,
559            rd_session_path = %rd_session_path,
560            rd_session_id = %rd_session_id,
561            "RemoteDesktop session started"
562        );
563
564        // Step: apply a non-default logical-monitor scale. `--virtual-monitor`
565        // has no scale component, so HiDPI is configured here over mutter's
566        // private bus once DisplayConfig is up. Skipped at 1.0 to leave the
567        // default 1:1 path completely untouched.
568        if (scale - DEFAULT_SCALE).abs() > f64::EPSILON {
569            let applied = apply_scale(&mutter_conn, scale, &self.id).await?;
570            tracing::info!(
571                id = self.id,
572                requested = scale,
573                applied,
574                "applied logical-monitor scale"
575            );
576        }
577
578        self.state = Some(Arc::new(MutterState {
579            conn: mutter_conn,
580            rd_session_path,
581            rd_session_id,
582            rd_started: Arc::new(Mutex::new(false)),
583            runtime_dir: self.runtime_dir.clone(),
584            active_stream_path: Arc::new(Mutex::new(None)),
585        }));
586
587        Ok(())
588    }
589}
590
591#[async_trait]
592impl CompositorRuntime for MutterCompositor {
593    async fn start(&mut self, resolution: Option<&str>, scale: Option<f64>) -> Result<()> {
594        // Body uses the crate-local typed `MutterError`. The `?` at the
595        // end of `self.start_inner(...).await?` runs the
596        // `From<MutterError> for waydriver::Error` impl in `error.rs`,
597        // which is the single boundary at which the typed enum becomes
598        // the workspace's shared `waydriver::Error`.
599        Ok(self.start_inner(resolution, scale).await?)
600    }
601
602    async fn stop(&mut self) -> Result<()> {
603        tracing::info!(id = self.id, "stopping mutter compositor");
604
605        // Stop RemoteDesktop session if still reachable. We could
606        // touch the private fields directly here (same crate), but
607        // routing through the public accessors keeps the contract
608        // visible and means a future change to the field layout
609        // doesn't need to update this site.
610        if let Some(state) = &self.state {
611            let _ = state
612                .conn()
613                .call_method(
614                    Some("org.gnome.Mutter.RemoteDesktop"),
615                    state.rd_session_path(),
616                    Some("org.gnome.Mutter.RemoteDesktop.Session"),
617                    "Stop",
618                    &(),
619                )
620                .await;
621        }
622
623        // Drop our strong ref to the shared state. If callers haven't dropped
624        // theirs (the input/capture trait objects), their Arc still points at
625        // the D-Bus connection we're about to tear down below — any method
626        // call on them after this will fail with "connection closed".
627        self.state = None;
628
629        if let Some(mut mutter) = self.mutter.take() {
630            let _ = mutter.kill().await;
631            let _ = mutter.wait().await;
632        }
633        if let Some(mut wireplumber) = self.wireplumber.take() {
634            let _ = wireplumber.kill().await;
635            let _ = wireplumber.wait().await;
636        }
637        if let Some(mut pipewire) = self.pipewire.take() {
638            let _ = pipewire.kill().await;
639            let _ = pipewire.wait().await;
640        }
641
642        if let Some(pid) = self.mutter_dbus_pid.take() {
643            unsafe {
644                libc::kill(pid as i32, libc::SIGTERM);
645            }
646        }
647
648        let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
649
650        tracing::debug!(id = self.id, "mutter compositor stopped");
651        Ok(())
652    }
653
654    fn id(&self) -> &str {
655        &self.id
656    }
657
658    fn wayland_display(&self) -> &str {
659        &self.wayland_display
660    }
661
662    fn runtime_dir(&self) -> &Path {
663        &self.runtime_dir
664    }
665}
666
667impl Drop for MutterCompositor {
668    fn drop(&mut self) {
669        // Best-effort cleanup when dropped without calling stop().
670        // Can't use async here, so send SIGKILL synchronously.
671        self.state = None;
672
673        if let Some(ref mut child) = self.mutter {
674            let _ = child.start_kill();
675        }
676        if let Some(ref mut child) = self.wireplumber {
677            let _ = child.start_kill();
678        }
679        if let Some(ref mut child) = self.pipewire {
680            let _ = child.start_kill();
681        }
682        if let Some(pid) = self.mutter_dbus_pid {
683            unsafe {
684                libc::kill(pid as i32, libc::SIGKILL);
685            }
686        }
687        let _ = std::fs::remove_dir_all(&self.runtime_dir);
688    }
689}
690
691// ── Helpers ─────────────────────────────────────────────────────────────────
692
693fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
694    for line in output.lines() {
695        if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
696            if let Some(addr) = rest.strip_suffix("';") {
697                return Ok(addr.to_string());
698            }
699        }
700    }
701    Err(MutterError::DbusOutputMissingField {
702        field: "DBUS_SESSION_BUS_ADDRESS",
703    })
704}
705
706fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
707    for line in output.lines() {
708        if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
709            let pid_str = rest.trim_end_matches(';').trim();
710            return pid_str.parse().map_err(MutterError::DbusPidParse);
711        }
712    }
713    Err(MutterError::DbusOutputMissingField {
714        field: "DBUS_SESSION_BUS_PID",
715    })
716}
717
718fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
719    let invalid = || MutterError::ResolutionInvalid {
720        value: s.to_string(),
721    };
722    let (w, h) = s.split_once('x').ok_or_else(invalid)?;
723    let parse = |part: &str| -> std::result::Result<u32, MutterError> {
724        part.parse::<u32>()
725            .ok()
726            .filter(|n| *n > 0)
727            .ok_or_else(invalid)
728    };
729    Ok((parse(w)?, parse(h)?))
730}
731
732/// Reject a scale that isn't a finite, positive factor inside
733/// [`MIN_SCALE`]..=[`MAX_SCALE`]. Run before any subprocess spawns so a bad
734/// value fails fast.
735fn validate_scale(scale: f64) -> std::result::Result<(), MutterError> {
736    if scale.is_finite() && (MIN_SCALE..=MAX_SCALE).contains(&scale) {
737        Ok(())
738    } else {
739        Err(MutterError::ScaleInvalid {
740            value: scale,
741            min: MIN_SCALE,
742            max: MAX_SCALE,
743        })
744    }
745}
746
747/// Pick the entry of `supported` closest to `requested`. Mutter only accepts
748/// scales it advertises for a mode, so an arbitrary request (e.g. 1.66) is
749/// snapped to the nearest legal step. Falls back to `requested` when the list
750/// is empty (mutter then validates — and likely rejects — it).
751fn nearest_supported_scale(requested: f64, supported: &[f64]) -> f64 {
752    supported
753        .iter()
754        .copied()
755        .min_by(|a, b| {
756            let da = (a - requested).abs();
757            let db = (b - requested).abs();
758            da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
759        })
760        .unwrap_or(requested)
761}
762
763/// Apply `requested` as the logical-monitor scale of the (single) virtual
764/// monitor via `org.gnome.Mutter.DisplayConfig`, returning the scale mutter
765/// actually accepted (snapped to a supported step).
766///
767/// Reads `GetCurrentState` fresh each call so the `serial` is current —
768/// mutter rejects `ApplyMonitorsConfig` on a stale serial, and the serial
769/// bumps when the virtual monitor first appears. Fractional scales (1.5,
770/// 1.75, …) require the `scale-monitor-framebuffer` experimental feature; the
771/// container entrypoint enables it. Without it only integer scales are
772/// advertised, so [`nearest_supported_scale`] would snap a fractional request
773/// to 1.0 or 2.0.
774async fn apply_scale(
775    conn: &zbus::Connection,
776    requested: f64,
777    id: &str,
778) -> std::result::Result<f64, MutterError> {
779    let state_reply = conn
780        .call_method(
781            Some("org.gnome.Mutter.DisplayConfig"),
782            "/org/gnome/Mutter/DisplayConfig",
783            Some("org.gnome.Mutter.DisplayConfig"),
784            "GetCurrentState",
785            &(),
786        )
787        .await
788        .map_err(|source| MutterError::DisplayConfigState {
789            stage: "call",
790            source,
791        })?;
792    let state_body = state_reply.body();
793    let (serial, monitors, _logical, _props): CurrentState =
794        state_body
795            .deserialize()
796            .map_err(|source| MutterError::DisplayConfigState {
797                stage: "deserialize",
798                source,
799            })?;
800
801    // Headless mutter started with a single `--virtual-monitor` exposes
802    // exactly one monitor advertising exactly the mode we asked for, so the
803    // first monitor / first mode is the one to scale.
804    let (spec, modes, _mprops) = monitors
805        .into_iter()
806        .next()
807        .ok_or(MutterError::DisplayConfigNoMonitor)?;
808    let connector = spec.0;
809    let (mode_id, _w, _h, _refresh, _preferred, supported, _modeprops) =
810        modes
811            .into_iter()
812            .next()
813            .ok_or(MutterError::DisplayConfigNoMonitor)?;
814
815    let applied = nearest_supported_scale(requested, &supported);
816    if (applied - requested).abs() > SCALE_SNAP_TOLERANCE {
817        tracing::warn!(
818            id,
819            requested,
820            applied,
821            supported = ?supported,
822            "requested scale not advertised by mutter; snapped to nearest supported"
823        );
824    }
825
826    // (x, y, scale, transform, primary, [(connector, mode_id, {})]).
827    let logical: LogicalMonitorConfig = (
828        0,
829        0,
830        applied,
831        0,
832        true,
833        vec![(connector, mode_id, DbusProps::new())],
834    );
835    // method 1 = temporary: applies for this session without writing
836    // ~/.config/monitors.xml, which is all a throwaway headless run needs.
837    conn.call_method(
838        Some("org.gnome.Mutter.DisplayConfig"),
839        "/org/gnome/Mutter/DisplayConfig",
840        Some("org.gnome.Mutter.DisplayConfig"),
841        "ApplyMonitorsConfig",
842        &(serial, 1u32, vec![logical], DbusProps::new()),
843    )
844    .await
845    .map_err(|source| MutterError::DisplayConfigApply {
846        scale: applied,
847        source,
848    })?;
849
850    Ok(applied)
851}
852
853async fn wait_for_wayland_socket(
854    runtime_dir: &str,
855    display: &str,
856) -> std::result::Result<(), MutterError> {
857    let socket_path = PathBuf::from(runtime_dir).join(display);
858    for _ in 0..50 {
859        if socket_path.exists() {
860            return Ok(());
861        }
862        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
863    }
864    Err(MutterError::WaylandSocketTimeout {
865        socket: socket_path.display().to_string(),
866    })
867}
868
869/// PipeWire creates `<runtime_dir>/pipewire-0` as soon as it's ready
870/// to accept client connections. Polling for that file replaces the
871/// previous unconditional `sleep(1s)` after spawning the pipewire
872/// process — same readiness model as
873/// [`wait_for_wayland_socket`].
874async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
875    let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
876    for _ in 0..50 {
877        if socket_path.exists() {
878            return Ok(());
879        }
880        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
881    }
882    Err(MutterError::PipewireSocketTimeout {
883        socket: socket_path.display().to_string(),
884    })
885}
886
887#[cfg(test)]
888mod tests {
889    use super::*;
890
891    /// Live end-to-end check that a fractional scale is actually applied by a
892    /// real mutter. Requires the runtime stack (mutter, pipewire, wireplumber,
893    /// dbus-launch) on `PATH` plus the GSettings schemas in `XDG_DATA_DIRS`, so
894    /// it's `#[ignore]`d by default; run with the dev shell's env via
895    /// `cargo test -p waydriver-compositor-mutter -- --ignored`.
896    ///
897    /// This exercises the whole chain at once: the per-session keyfile must
898    /// enable `scale-monitor-framebuffer` (otherwise 1.5 is not advertised and
899    /// would snap to 1.0/2.0), and `apply_scale` must drive DisplayConfig
900    /// correctly. We read the scale straight back from `GetCurrentState`.
901    #[tokio::test]
902    #[ignore = "requires a live mutter/pipewire/dbus runtime stack"]
903    async fn applies_fractional_scale_against_real_mutter() {
904        let mut compositor = MutterCompositor::new();
905        compositor
906            .start(Some("1920x1080"), Some(1.5))
907            .await
908            .expect("compositor should start");
909        let state = compositor.state().expect("state present after start");
910
911        let reply = state
912            .conn()
913            .call_method(
914                Some("org.gnome.Mutter.DisplayConfig"),
915                "/org/gnome/Mutter/DisplayConfig",
916                Some("org.gnome.Mutter.DisplayConfig"),
917                "GetCurrentState",
918                &(),
919            )
920            .await
921            .expect("GetCurrentState should succeed");
922        let body = reply.body();
923        let (_serial, _monitors, logical, _props): CurrentState = body
924            .deserialize()
925            .expect("GetCurrentState body should deserialize");
926        let applied = logical.first().expect("at least one logical monitor").2;
927
928        compositor.stop().await.expect("compositor should stop");
929
930        assert!(
931            (applied - 1.5).abs() < 0.01,
932            "expected logical scale ~1.5 (fractional scaling enabled via keyfile), got {applied}"
933        );
934    }
935
936    #[test]
937    fn test_parse_dbus_address_valid() {
938        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
939        let addr = parse_dbus_address(output).unwrap();
940        assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
941    }
942
943    #[test]
944    fn test_parse_dbus_address_missing() {
945        let output = "DBUS_SESSION_BUS_PID=12345;\n";
946        assert!(parse_dbus_address(output).is_err());
947    }
948
949    #[test]
950    fn test_parse_dbus_pid_valid() {
951        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
952        let pid = parse_dbus_pid(output).unwrap();
953        assert_eq!(pid, 12345);
954    }
955
956    #[test]
957    fn test_parse_dbus_pid_missing() {
958        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
959        assert!(parse_dbus_pid(output).is_err());
960    }
961
962    #[test]
963    fn test_parse_dbus_pid_invalid() {
964        let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
965        assert!(parse_dbus_pid(output).is_err());
966    }
967
968    #[tokio::test]
969    async fn test_wait_for_socket_found() {
970        let dir = tempfile::tempdir().unwrap();
971        let runtime_dir = dir.path().to_str().unwrap().to_string();
972        let display = "wayland-test-99";
973        std::fs::File::create(dir.path().join(display)).unwrap();
974        wait_for_wayland_socket(&runtime_dir, display)
975            .await
976            .unwrap();
977    }
978
979    #[tokio::test]
980    async fn test_wait_for_pipewire_socket_found() {
981        let dir = tempfile::tempdir().unwrap();
982        let runtime_dir = dir.path().to_str().unwrap().to_string();
983        std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
984        wait_for_pipewire_socket(&runtime_dir).await.unwrap();
985    }
986
987    #[tokio::test]
988    async fn test_wait_for_pipewire_socket_timeout() {
989        let dir = tempfile::tempdir().unwrap();
990        let runtime_dir = dir.path().to_str().unwrap().to_string();
991        let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
992        assert!(
993            matches!(err, MutterError::PipewireSocketTimeout { .. }),
994            "expected PipewireSocketTimeout, got: {err}"
995        );
996        // Public mapping: same Timeout bucket as the wayland one,
997        // so workspace callers matching `Error::Timeout(_)` (e.g.
998        // the e2e tests) keep working.
999        let public: waydriver::Error = err.into();
1000        assert!(
1001            matches!(public, waydriver::Error::Timeout(_)),
1002            "expected waydriver::Error::Timeout, got: {public}"
1003        );
1004    }
1005
1006    #[tokio::test]
1007    async fn test_wait_for_socket_timeout() {
1008        let dir = tempfile::tempdir().unwrap();
1009        let runtime_dir = dir.path().to_str().unwrap().to_string();
1010        let display = "wayland-nonexistent-0";
1011        let err = wait_for_wayland_socket(&runtime_dir, display)
1012            .await
1013            .unwrap_err();
1014        assert!(
1015            matches!(err, MutterError::WaylandSocketTimeout { .. }),
1016            "expected WaylandSocketTimeout, got: {err}"
1017        );
1018        // And confirm the From<MutterError> -> waydriver::Error mapping
1019        // still produces the public Timeout variant — workspace callers
1020        // (notably the e2e tests) match on it.
1021        let public: waydriver::Error = err.into();
1022        assert!(
1023            matches!(public, waydriver::Error::Timeout(_)),
1024            "expected waydriver::Error::Timeout, got: {public}"
1025        );
1026    }
1027
1028    #[test]
1029    fn test_new_generates_unique_ids() {
1030        let a = MutterCompositor::new();
1031        let b = MutterCompositor::new();
1032        assert_ne!(a.id(), b.id());
1033    }
1034
1035    #[test]
1036    fn test_new_wayland_display_contains_id() {
1037        let c = MutterCompositor::new();
1038        assert!(
1039            c.wayland_display().contains(c.id()),
1040            "display '{}' should contain id '{}'",
1041            c.wayland_display(),
1042            c.id()
1043        );
1044    }
1045
1046    #[test]
1047    fn test_new_runtime_dir_contains_id() {
1048        let c = MutterCompositor::new();
1049        let dir_str = c.runtime_dir().to_str().unwrap();
1050        assert!(
1051            dir_str.contains(c.id()),
1052            "runtime_dir '{}' should contain id '{}'",
1053            dir_str,
1054            c.id()
1055        );
1056    }
1057
1058    /// Regression: session runtime dirs must be flat siblings under one root,
1059    /// never nested inside each other. `waydriver::capture` repoints the
1060    /// process-wide `XDG_RUNTIME_DIR` at the live session's dir after a
1061    /// screenshot/recording; if `new()` re-read that mutated value, each
1062    /// session would nest one level deeper and eventually overflow the
1063    /// AF_UNIX `sun_path` limit, wedging pipewire socket creation. See
1064    /// `HOST_RUNTIME_ROOT`.
1065    #[test]
1066    fn test_session_runtime_dirs_are_siblings_not_nested() {
1067        let a = MutterCompositor::new();
1068        let dir_a = a.runtime_dir().to_path_buf();
1069
1070        // Simulate what a screenshot/recording does: point XDG_RUNTIME_DIR at
1071        // the live session's runtime dir and leave it there.
1072        unsafe {
1073            std::env::set_var("XDG_RUNTIME_DIR", &dir_a);
1074        }
1075
1076        let b = MutterCompositor::new();
1077        let dir_b = b.runtime_dir().to_path_buf();
1078
1079        assert_eq!(
1080            dir_a.parent(),
1081            dir_b.parent(),
1082            "session dirs must share a parent (siblings), got a={dir_a:?} b={dir_b:?}"
1083        );
1084        assert!(
1085            !dir_b.starts_with(&dir_a),
1086            "session B nested inside session A: {dir_b:?}"
1087        );
1088    }
1089
1090    #[test]
1091    fn test_new_wayland_display_prefix() {
1092        let c = MutterCompositor::new();
1093        assert!(c.wayland_display().starts_with("wayland-wd-"));
1094    }
1095
1096    #[test]
1097    fn test_new_runtime_dir_contains_session_prefix() {
1098        let c = MutterCompositor::new();
1099        let dir_str = c.runtime_dir().to_str().unwrap();
1100        assert!(dir_str.contains("wd-session-"));
1101    }
1102
1103    #[test]
1104    fn test_state_returns_none_before_start() {
1105        // `state()` previously panicked when called outside the started
1106        // window. The current contract returns `None` so callers can
1107        // detect the lifecycle without trapping a panic.
1108        let c = MutterCompositor::new();
1109        assert!(c.state().is_none());
1110    }
1111
1112    #[test]
1113    fn test_parse_resolution_accepts_hd() {
1114        assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
1115        assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
1116    }
1117
1118    #[test]
1119    fn test_parse_resolution_rejects_garbage() {
1120        for bad in [
1121            "",
1122            "1920",
1123            "1920x",
1124            "x1080",
1125            "0x0",
1126            "1920x0",
1127            "0x1080",
1128            "1920x1080x1",
1129            "abcxdef",
1130            "-1x1080",
1131            "1920 x 1080",
1132        ] {
1133            assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
1134        }
1135    }
1136
1137    #[test]
1138    fn test_validate_scale_accepts_common_factors() {
1139        for ok in [0.5, 1.0, 1.25, 1.5, 1.6666, 1.75, 2.0, 3.0, 4.0] {
1140            assert!(validate_scale(ok).is_ok(), "expected {ok} to validate");
1141        }
1142    }
1143
1144    #[test]
1145    fn test_validate_scale_rejects_out_of_range_and_nonfinite() {
1146        for bad in [0.0, 0.49, 4.01, -1.0, f64::NAN, f64::INFINITY] {
1147            assert!(
1148                validate_scale(bad).is_err(),
1149                "expected {bad} to be rejected"
1150            );
1151        }
1152    }
1153
1154    #[test]
1155    fn test_nearest_supported_scale_snaps_to_closest() {
1156        let supported = [1.0, 1.25, 1.5, 1.75, 2.0];
1157        // Exact hits pass straight through.
1158        assert_eq!(nearest_supported_scale(1.5, &supported), 1.5);
1159        assert_eq!(nearest_supported_scale(2.0, &supported), 2.0);
1160        // 1.66 (166%) isn't advertised → nearest is 1.75.
1161        assert_eq!(nearest_supported_scale(1.66, &supported), 1.75);
1162        // 1.6 is closer to 1.5.
1163        assert_eq!(nearest_supported_scale(1.6, &supported), 1.5);
1164    }
1165
1166    #[test]
1167    fn test_nearest_supported_scale_empty_list_returns_request() {
1168        assert_eq!(nearest_supported_scale(1.5, &[]), 1.5);
1169    }
1170
1171    #[test]
1172    fn test_default_same_structure_as_new() {
1173        let c = MutterCompositor::default();
1174        assert!(c.wayland_display().starts_with("wayland-wd-"));
1175        assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
1176    }
1177}