waydriver_compositor_mutter/lib.rs
1//! Mutter implementation of [`waydriver::CompositorRuntime`].
2//!
3//! Owns the private-bus `dbus-daemon`, the `pipewire` + `wireplumber` pair,
4//! and a headless `mutter --wayland` instance. After [`MutterCompositor::start`]
5//! returns, [`MutterCompositor::state`] exposes an `Arc<MutterState>` that
6//! sibling backends (`waydriver-input-mutter`, `waydriver-capture-mutter`) use
7//! to talk to the same mutter D-Bus session.
8//!
9//! ## Shared-state invariant
10//!
11//! While any `Arc<MutterState>` exists, the mutter child processes and the
12//! private D-Bus connection MUST remain alive. [`waydriver::Session::kill`]
13//! enforces this by dropping input and capture trait objects before calling
14//! `compositor.stop().await`.
15
16mod error;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::process::Stdio;
21use std::sync::{Arc, LazyLock, Mutex};
22
23use async_trait::async_trait;
24use tokio::process::{Child, Command};
25use zbus::zvariant::OwnedValue;
26
27use waydriver::gsettings::{self, GSettingEntry, GSettingsConfig};
28use waydriver::{CompositorRuntime, Result};
29
30use crate::error::MutterError;
31
32/// Default virtual-monitor geometry passed to mutter when the caller doesn't
33/// override it. Matches mutter's own implicit default.
34const DEFAULT_RESOLUTION: &str = "1024x768";
35
36/// Default logical-monitor scale: 1:1, i.e. `resolution` pixels are also the
37/// logical (application) size. Any other value drives the HiDPI path in
38/// [`apply_scale`].
39const DEFAULT_SCALE: f64 = 1.0;
40
41/// GVariant-text value seeded into `org.gnome.mutter experimental-features`
42/// when GSettings isolation is on. `scale-monitor-framebuffer` switches the
43/// native headless backend to logical layout mode, which is what makes
44/// fractional scales (1.5, 1.75, …) appear in a mode's `supported-scales` and
45/// be accepted by `ApplyMonitorsConfig`. Harmless at integer/1.0 scales.
46const MUTTER_FRACTIONAL_SCALING: &str = "['scale-monitor-framebuffer']";
47
48/// Accepted scale range. Below 0.5 the UI is unusably small; above 4.0 mutter
49/// won't offer the scale for any virtual-monitor mode we'd create. Validated
50/// up-front so a typo fails before we spawn any subprocess.
51const MIN_SCALE: f64 = 0.5;
52const MAX_SCALE: f64 = 4.0;
53
54/// How far a requested scale may sit from the nearest mutter-supported scale
55/// before we log a warning about snapping to it. Mutter only accepts scales it
56/// lists in a mode's `supported-scales`, so an exact arbitrary value (e.g.
57/// 1.66) may be nudged to the closest legal step.
58const SCALE_SNAP_TOLERANCE: f64 = 0.01;
59
60// ── DisplayConfig D-Bus shapes ───────────────────────────────────────────────
61//
62// Type aliases mirroring the `org.gnome.Mutter.DisplayConfig` wire types so
63// `body().deserialize::<CurrentState>()` validates the reply against the exact
64// signature mutter sends. The `a{sv}` property dicts are kept as
65// `HashMap<String, OwnedValue>` (signature `a{sv}`) and ignored — we only need
66// the connector, mode id, and supported-scales list.
67
68/// `a{sv}` — a D-Bus property dict.
69type DbusProps = HashMap<String, OwnedValue>;
70/// `(siiddada{sv})` — one monitor mode: id, width, height, refresh, preferred
71/// scale, supported scales, properties.
72type MonitorMode = (String, i32, i32, f64, f64, Vec<f64>, DbusProps);
73/// `(ssss)` — connector, vendor, product, serial.
74type MonitorSpec = (String, String, String, String);
75/// `((ssss)a(siiddada{sv})a{sv})` — one physical monitor: spec, modes, props.
76type PhysicalMonitor = (MonitorSpec, Vec<MonitorMode>, DbusProps);
77/// `(iiduba(ssss)a{sv})` — one logical monitor in the current layout.
78type LogicalMonitor = (i32, i32, f64, u32, bool, Vec<MonitorSpec>, DbusProps);
79/// Return tuple of `GetCurrentState`: `(serial, monitors, logical, props)`.
80type CurrentState = (u32, Vec<PhysicalMonitor>, Vec<LogicalMonitor>, DbusProps);
81
82/// `(ssa{sv})` — one monitor assignment in an `ApplyMonitorsConfig` request:
83/// connector, mode id, properties.
84type MonitorAssignment = (String, String, DbusProps);
85/// `(iiduba(ssa{sv}))` — one logical monitor to apply: x, y, scale, transform,
86/// primary, assigned monitors.
87type LogicalMonitorConfig = (i32, i32, f64, u32, bool, Vec<MonitorAssignment>);
88
89/// Shared mutter-backend state consumed by `waydriver-input-mutter` and
90/// `waydriver-capture-mutter`.
91///
92/// **Invariant:** while any `Arc<MutterState>` exists, the underlying D-Bus
93/// connection and the mutter child process must remain alive. See the
94/// module docs for details.
95///
96/// Fields are private — all access goes through the accessor methods
97/// below. Sibling crates (`waydriver-input-mutter`,
98/// `waydriver-capture-mutter`) that previously read fields directly
99/// now call `state.conn()`, `state.rd_session_path()`, etc. The
100/// shape of the underlying storage (e.g. how `active_stream_path` is
101/// guarded) is therefore an implementation detail that can change
102/// without breaking those callers — the contract lives entirely in
103/// the method signatures.
104pub struct MutterState {
105 conn: zbus::Connection,
106 rd_session_path: String,
107 rd_session_id: String,
108 rd_started: Arc<Mutex<bool>>,
109 runtime_dir: PathBuf,
110 active_stream_path: Arc<Mutex<Option<String>>>,
111}
112
113impl MutterState {
114 /// Persistent connection to mutter's private D-Bus.
115 ///
116 /// Both sibling backends (`waydriver-input-mutter`,
117 /// `waydriver-capture-mutter`) issue all their RemoteDesktop and
118 /// ScreenCast method calls through this connection.
119 pub fn conn(&self) -> &zbus::Connection {
120 &self.conn
121 }
122
123 /// RemoteDesktop session object path. Used by
124 /// `waydriver-input-mutter` as the `path` argument on every
125 /// pointer / keyboard `Notify*` D-Bus call.
126 pub fn rd_session_path(&self) -> &str {
127 &self.rd_session_path
128 }
129
130 /// RemoteDesktop session id, read from the `SessionId` property on
131 /// the RD session. `waydriver-capture-mutter` passes this as the
132 /// `remote-desktop-session-id` option to
133 /// `ScreenCast.CreateSession` so mutter links the two; the link is
134 /// required for `NotifyPointerMotionAbsolute` to be accepted.
135 pub fn rd_session_id(&self) -> &str {
136 &self.rd_session_id
137 }
138
139 /// Per-session `XDG_RUNTIME_DIR`. `waydriver-capture-mutter` joins
140 /// this with `pipewire-0` to locate the PipeWire socket.
141 pub fn runtime_dir(&self) -> &Path {
142 &self.runtime_dir
143 }
144
145 /// Lock the "RD-started" flag.
146 ///
147 /// Acquires the underlying mutex and returns the guard so the
148 /// caller can perform a check-and-set under one critical section
149 /// (the capture backend defers `RD.Session.Start` until the first
150 /// linked `ScreenCast.CreateSession` succeeds — that's a load,
151 /// some D-Bus work, and a store; splitting the read and write
152 /// would race). `Error::Process` if the mutex is poisoned.
153 pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
154 self.rd_started
155 .lock()
156 .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
157 }
158
159 /// Lock the active ScreenCast Stream object path.
160 ///
161 /// Set by `waydriver-capture-mutter` in `start_stream`, cleared in
162 /// `stop_stream`. `waydriver-input-mutter` reads it to route
163 /// `NotifyPointerMotionAbsolute` at the correct monitor. `None`
164 /// inside the guard means no stream is open — absolute pointer
165 /// motion will error.
166 pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
167 self.active_stream_path
168 .lock()
169 .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
170 }
171}
172
173/// Headless mutter instance.
174pub struct MutterCompositor {
175 id: String,
176 wayland_display: String,
177 runtime_dir: PathBuf,
178 mutter_dbus_address: String,
179 mutter_dbus_pid: Option<u32>,
180 mutter: Option<Child>,
181 pipewire: Option<Child>,
182 wireplumber: Option<Child>,
183 state: Option<Arc<MutterState>>,
184 gsettings: GSettingsConfig,
185}
186
187/// The host runtime root under which every session's `wd-session-<id>`
188/// directory is created. Snapshotted once, lazily, on the first
189/// `MutterCompositor::new()` call.
190///
191/// This is deliberately read **once** and cached, rather than re-read from
192/// `XDG_RUNTIME_DIR` per session. `waydriver`'s screenshot and video pipelines
193/// (`waydriver::capture`) mutate the parent process's `XDG_RUNTIME_DIR` to
194/// point `pipewiresrc` at the *live* session's pipewire socket, and never
195/// restore it. If `new()` re-read the live env each time, session N+1's
196/// runtime dir would be created **inside** session N's dir
197/// (`…/wd-session-A/wd-session-B/…`), nesting one level deeper per session.
198/// After ~4 levels the `<dir>/pipewire-0` path exceeds the ~107-byte AF_UNIX
199/// `sun_path` limit, pipewire can no longer bind its socket, and every
200/// subsequent `start_session` fails with a "timeout: pipewire socket" error
201/// until the server is restarted (which resets the process env). Snapshotting
202/// the root keeps each session dir a flat sibling under the original
203/// `XDG_RUNTIME_DIR`, independent of how many sessions preceded it.
204///
205/// The first `new()` runs before any session exists, so the env is still the
206/// pristine value set by the launcher (e.g. the Docker entrypoint) — capturing
207/// it then is safe.
208static HOST_RUNTIME_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
209 let root = std::env::var("XDG_RUNTIME_DIR")
210 .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
211 PathBuf::from(root)
212});
213
214impl MutterCompositor {
215 /// Construct but do not start. Generates the session id and computes
216 /// where the Wayland socket and runtime dir will live. No I/O.
217 pub fn new() -> Self {
218 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
219 let wayland_display = format!("wayland-wd-{}", id);
220
221 let runtime_dir = HOST_RUNTIME_ROOT.join(format!("wd-session-{}", id));
222
223 Self {
224 id,
225 wayland_display,
226 runtime_dir,
227 mutter_dbus_address: String::new(),
228 mutter_dbus_pid: None,
229 mutter: None,
230 pipewire: None,
231 wireplumber: None,
232 state: None,
233 gsettings: GSettingsConfig::default(),
234 }
235 }
236
237 /// Set the per-session GSettings isolation config (see
238 /// [`waydriver::gsettings`]). Defaults to isolated with no seeded entries.
239 /// When isolated, [`start`](Self::start) writes a private keyfile (seeded
240 /// with `org.gnome.mutter experimental-features` plus `config.initial`)
241 /// and points mutter at it, so fractional scales work and the host's dconf
242 /// is neither read nor written. Pass `isolated: false` to run mutter
243 /// against the host's GSettings instead.
244 pub fn with_gsettings(mut self, config: GSettingsConfig) -> Self {
245 self.gsettings = config;
246 self
247 }
248
249 /// Returns the shared `Arc<MutterState>` for passing to sibling
250 /// backends, or `None` when called outside the started window.
251 ///
252 /// `None` is returned when:
253 /// - `start()` has not yet completed (or returned an error), or
254 /// - `stop()` has been called and dropped the state.
255 ///
256 /// Callers that have just awaited `start()?` know the state is
257 /// present — `expect()` or `?`-with-typed-error is appropriate
258 /// there. Returning `Option` instead of panicking keeps the API
259 /// honest about the lifecycle and lets callers detect "stopped"
260 /// without first matching on a panic.
261 pub fn state(&self) -> Option<Arc<MutterState>> {
262 self.state.clone()
263 }
264}
265
266impl Default for MutterCompositor {
267 fn default() -> Self {
268 Self::new()
269 }
270}
271
272impl MutterCompositor {
273 /// Typed-error implementation of `start`. The trait method calls
274 /// this and converts the result via `From<MutterError>`.
275 ///
276 /// Steps (each fails with a specific `MutterError` variant):
277 /// 1. validate resolution + scale,
278 /// 2. ensure the session runtime dir exists,
279 /// 3. spawn a private `dbus-daemon` and parse its address + PID,
280 /// 4. spawn `pipewire` + `wireplumber` on that bus,
281 /// 5. spawn headless `mutter --wayland`,
282 /// 6. wait for the Wayland socket,
283 /// 7. open a zbus connection, retry-create the RemoteDesktop session,
284 /// 8. read its `SessionId` property,
285 /// 9. apply a non-default logical-monitor scale via DisplayConfig,
286 /// 10. publish the `Arc<MutterState>` for sibling backends.
287 async fn start_inner(
288 &mut self,
289 resolution: Option<&str>,
290 scale: Option<f64>,
291 ) -> std::result::Result<(), MutterError> {
292 let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
293 // Validate before we start spawning subprocesses — mutter silently
294 // ignores bad --virtual-monitor values and falls back to its own
295 // default, which would surprise the caller.
296 parse_resolution(resolution)?;
297 let scale = scale.unwrap_or(DEFAULT_SCALE);
298 // Fail fast on a nonsense scale too, for the same reason — the
299 // DisplayConfig apply that consumes it doesn't run until mutter is up.
300 validate_scale(scale)?;
301
302 tracing::info!(
303 id = self.id,
304 resolution,
305 scale,
306 isolated = self.gsettings.isolated,
307 "starting mutter compositor"
308 );
309
310 tokio::fs::create_dir_all(&self.runtime_dir).await?;
311 // `runtime_dir` is built in `new()` from a UTF-8 String
312 // (XDG_RUNTIME_DIR or `/run/user/<uid>`) joined with a UTF-8
313 // ASCII session id, so the path is guaranteed valid UTF-8.
314 // `expect` documents that invariant rather than re-deriving
315 // it via the `to_str()` `Option`.
316 let runtime_str = self
317 .runtime_dir
318 .to_str()
319 .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
320 .to_string();
321
322 // GSettings isolation: when on, write the session's private keyfile
323 // (read by both mutter and the app — see `waydriver::gsettings`) and
324 // compute the env that points mutter at it. The keyfile is seeded with
325 // the fractional-scaling experimental feature so a non-integer `scale`
326 // is actually advertised by mutter, then any caller-supplied entries
327 // are appended (last-wins, so callers can override). When off, mutter
328 // reads the host's GSettings and `config_env` stays empty.
329 let config_env: Vec<(&str, String)> = if self.gsettings.isolated {
330 let mut entries = vec![GSettingEntry::new(
331 "org.gnome.mutter",
332 "experimental-features",
333 MUTTER_FRACTIONAL_SCALING,
334 )];
335 entries.extend(self.gsettings.initial.iter().cloned());
336 gsettings::write_keyfile(&self.runtime_dir, &entries)?;
337 let config_dir = gsettings::config_dir(&self.runtime_dir)
338 .to_str()
339 .expect("invariant: config_dir is runtime_dir (UTF-8) + ASCII suffix")
340 .to_string();
341 vec![
342 ("XDG_CONFIG_HOME", config_dir),
343 ("GSETTINGS_BACKEND", gsettings::KEYFILE_BACKEND.to_string()),
344 ]
345 } else {
346 Vec::new()
347 };
348
349 // Step 1: Private D-Bus for mutter (so its ScreenCast API doesn't conflict with host).
350 let dbus_output = Command::new("dbus-launch")
351 .arg("--sh-syntax")
352 .output()
353 .await?;
354 if !dbus_output.status.success() {
355 return Err(MutterError::DbusLaunchFailed(
356 String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
357 ));
358 }
359 let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
360 self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
361 self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
362 tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
363
364 // Step 2: PipeWire + WirePlumber (for screenshots via ScreenCast).
365 //
366 // `env_remove("PIPEWIRE_REMOTE")` is load-bearing: `waydriver`'s
367 // `grab_png_sync` mutates the parent's process env to point
368 // `pipewiresrc` at the live session's pipewire socket. After a
369 // session stops, that socket is gone but the env var lingers in
370 // the parent. Without scrubbing it here, a freshly spawned
371 // `pipewire`/`wireplumber`/`mutter` for the next session would
372 // inherit the stale value and try to connect to the previous
373 // session's dead socket — wireplumber/mutter prefer
374 // `PIPEWIRE_REMOTE` over `XDG_RUNTIME_DIR/pipewire-0`, so the
375 // explicit `XDG_RUNTIME_DIR` override below isn't enough.
376 // Symptom: `ScreenCast.Start` fails with "Couldn't connect
377 // pipewire context" on every session after the first.
378 let pipewire = Command::new("pipewire")
379 .env_remove("PIPEWIRE_REMOTE")
380 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
381 .env("XDG_RUNTIME_DIR", &runtime_str)
382 .stdout(Stdio::null())
383 .stderr(Stdio::null())
384 .spawn()
385 .map_err(|source| MutterError::Spawn {
386 process: "pipewire",
387 source,
388 })?;
389 self.pipewire = Some(pipewire);
390
391 // Wait for pipewire's socket to appear before launching
392 // wireplumber. Polling for the socket file is the same
393 // readiness signal `wait_for_wayland_socket` uses for
394 // mutter: it's the actual handshake clients use, so any
395 // earlier signal would either be racier (process spawn) or
396 // just as expensive to probe.
397 wait_for_pipewire_socket(&runtime_str).await?;
398
399 let wireplumber = Command::new("wireplumber")
400 .env_remove("PIPEWIRE_REMOTE")
401 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
402 .env("XDG_RUNTIME_DIR", &runtime_str)
403 .stdout(Stdio::null())
404 .stderr(Stdio::null())
405 .spawn()
406 .map_err(|source| MutterError::Spawn {
407 process: "wireplumber",
408 source,
409 })?;
410 self.wireplumber = Some(wireplumber);
411
412 // No bus-readiness signal poll for wireplumber: it's a
413 // session-policy daemon that doesn't register a stable D-Bus
414 // name we can probe, and its initialisation runs in parallel
415 // with mutter's own startup. The downstream
416 // `ScreenCast.CreateSession` retry loop in
417 // `waydriver-capture-mutter::start_stream` is what actually
418 // gates on wireplumber having joined the graph — putting a
419 // pessimistic sleep here as well would add startup latency
420 // without changing correctness.
421 tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
422
423 // Step 3: mutter in headless Wayland mode (on its private D-Bus).
424 let mutter = Command::new("mutter")
425 .args([
426 "--headless",
427 "--wayland",
428 "--no-x11",
429 "--wayland-display",
430 &self.wayland_display,
431 "--virtual-monitor",
432 resolution,
433 ])
434 .env_remove("PIPEWIRE_REMOTE")
435 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
436 .env("XDG_RUNTIME_DIR", &runtime_str)
437 // Empty when isolation is off; otherwise points mutter at the
438 // per-session keyfile GSettings store written above.
439 .envs(config_env.iter().map(|(k, v)| (*k, v.as_str())))
440 .stdout(Stdio::null())
441 .stderr(Stdio::inherit())
442 .spawn()
443 .map_err(|source| MutterError::Spawn {
444 process: "mutter",
445 source,
446 })?;
447 self.mutter = Some(mutter);
448 tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
449
450 // Step 4: Wait for the Wayland socket.
451 wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
452 tracing::debug!(id = self.id, "wayland socket ready");
453
454 // Step 5: Connect to mutter's private D-Bus and start RemoteDesktop session.
455 let mutter_addr: zbus::address::Address = self
456 .mutter_dbus_address
457 .as_str()
458 .try_into()
459 .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
460 addr: self.mutter_dbus_address.clone(),
461 source,
462 })?;
463 let mutter_conn = zbus::connection::Builder::address(mutter_addr)
464 .map_err(|source| MutterError::DbusConnect {
465 stage: "build connection builder",
466 source,
467 })?
468 .build()
469 .await
470 .map_err(|source| MutterError::DbusConnect {
471 stage: "connect",
472 source,
473 })?;
474
475 // Wait for mutter to register its D-Bus services (may take a moment after socket appears)
476 let mut rd_reply = None;
477 for i in 0..50 {
478 match mutter_conn
479 .call_method(
480 Some("org.gnome.Mutter.RemoteDesktop"),
481 "/org/gnome/Mutter/RemoteDesktop",
482 Some("org.gnome.Mutter.RemoteDesktop"),
483 "CreateSession",
484 &(),
485 )
486 .await
487 {
488 Ok(reply) => {
489 rd_reply = Some(reply);
490 break;
491 }
492 Err(e) if i < 49 => {
493 tracing::debug!(
494 id = self.id,
495 attempt = i,
496 "waiting for RemoteDesktop service: {e}"
497 );
498 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
499 }
500 Err(e) => {
501 return Err(MutterError::RemoteDesktopCreate(e));
502 }
503 }
504 }
505 // The retry loop above either `break`s with `rd_reply = Some(_)`
506 // or returns `Err(...)` from the final attempt — `unwrap` here
507 // is unreachable by construction.
508 let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
509 let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
510 .body()
511 .deserialize()
512 .map_err(MutterError::RdSessionPathParse)?;
513 // Intentionally do NOT call `RemoteDesktop.Session.Start` here.
514 // Mutter only accepts `remote-desktop-session-id` on
515 // `ScreenCast.CreateSession` when the RD session is not yet
516 // started, so `waydriver-capture-mutter::start_stream` defers
517 // the Start call until after it has created the linked
518 // ScreenCast session.
519 // Read the RD session's `SessionId` property — it's the token
520 // ScreenCast.CreateSession needs in `remote-desktop-session-id`
521 // to link the two sessions. Without that link, mutter rejects
522 // NotifyPointerMotionAbsolute with "No screen cast active".
523 let rd_session_id_reply = mutter_conn
524 .call_method(
525 Some("org.gnome.Mutter.RemoteDesktop"),
526 rd_session_path.as_str(),
527 Some("org.freedesktop.DBus.Properties"),
528 "Get",
529 &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
530 )
531 .await
532 .map_err(MutterError::SessionIdGet)?;
533 // `Get` returns a variant; deserialize as `OwnedValue` to detach
534 // the string from the reply's body before the reply is dropped.
535 let rd_session_id_body = rd_session_id_reply.body();
536 let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
537 .deserialize()
538 .map_err(MutterError::SessionIdVariantParse)?;
539 let rd_session_id: String = rd_session_id_variant
540 .try_into()
541 .map_err(MutterError::SessionIdNotString)?;
542
543 let rd_session_path = rd_session_path.to_string();
544 tracing::debug!(
545 id = self.id,
546 rd_session_path = %rd_session_path,
547 rd_session_id = %rd_session_id,
548 "RemoteDesktop session started"
549 );
550
551 // Step: apply a non-default logical-monitor scale. `--virtual-monitor`
552 // has no scale component, so HiDPI is configured here over mutter's
553 // private bus once DisplayConfig is up. Skipped at 1.0 to leave the
554 // default 1:1 path completely untouched.
555 if (scale - DEFAULT_SCALE).abs() > f64::EPSILON {
556 let applied = apply_scale(&mutter_conn, scale, &self.id).await?;
557 tracing::info!(
558 id = self.id,
559 requested = scale,
560 applied,
561 "applied logical-monitor scale"
562 );
563 }
564
565 self.state = Some(Arc::new(MutterState {
566 conn: mutter_conn,
567 rd_session_path,
568 rd_session_id,
569 rd_started: Arc::new(Mutex::new(false)),
570 runtime_dir: self.runtime_dir.clone(),
571 active_stream_path: Arc::new(Mutex::new(None)),
572 }));
573
574 Ok(())
575 }
576}
577
578#[async_trait]
579impl CompositorRuntime for MutterCompositor {
580 async fn start(&mut self, resolution: Option<&str>, scale: Option<f64>) -> Result<()> {
581 // Body uses the crate-local typed `MutterError`. The `?` at the
582 // end of `self.start_inner(...).await?` runs the
583 // `From<MutterError> for waydriver::Error` impl in `error.rs`,
584 // which is the single boundary at which the typed enum becomes
585 // the workspace's shared `waydriver::Error`.
586 Ok(self.start_inner(resolution, scale).await?)
587 }
588
589 async fn stop(&mut self) -> Result<()> {
590 tracing::info!(id = self.id, "stopping mutter compositor");
591
592 // Stop RemoteDesktop session if still reachable. We could
593 // touch the private fields directly here (same crate), but
594 // routing through the public accessors keeps the contract
595 // visible and means a future change to the field layout
596 // doesn't need to update this site.
597 if let Some(state) = &self.state {
598 let _ = state
599 .conn()
600 .call_method(
601 Some("org.gnome.Mutter.RemoteDesktop"),
602 state.rd_session_path(),
603 Some("org.gnome.Mutter.RemoteDesktop.Session"),
604 "Stop",
605 &(),
606 )
607 .await;
608 }
609
610 // Drop our strong ref to the shared state. If callers haven't dropped
611 // theirs (the input/capture trait objects), their Arc still points at
612 // the D-Bus connection we're about to tear down below — any method
613 // call on them after this will fail with "connection closed".
614 self.state = None;
615
616 if let Some(mut mutter) = self.mutter.take() {
617 let _ = mutter.kill().await;
618 let _ = mutter.wait().await;
619 }
620 if let Some(mut wireplumber) = self.wireplumber.take() {
621 let _ = wireplumber.kill().await;
622 let _ = wireplumber.wait().await;
623 }
624 if let Some(mut pipewire) = self.pipewire.take() {
625 let _ = pipewire.kill().await;
626 let _ = pipewire.wait().await;
627 }
628
629 if let Some(pid) = self.mutter_dbus_pid.take() {
630 unsafe {
631 libc::kill(pid as i32, libc::SIGTERM);
632 }
633 }
634
635 let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
636
637 tracing::debug!(id = self.id, "mutter compositor stopped");
638 Ok(())
639 }
640
641 fn id(&self) -> &str {
642 &self.id
643 }
644
645 fn wayland_display(&self) -> &str {
646 &self.wayland_display
647 }
648
649 fn runtime_dir(&self) -> &Path {
650 &self.runtime_dir
651 }
652}
653
654impl Drop for MutterCompositor {
655 fn drop(&mut self) {
656 // Best-effort cleanup when dropped without calling stop().
657 // Can't use async here, so send SIGKILL synchronously.
658 self.state = None;
659
660 if let Some(ref mut child) = self.mutter {
661 let _ = child.start_kill();
662 }
663 if let Some(ref mut child) = self.wireplumber {
664 let _ = child.start_kill();
665 }
666 if let Some(ref mut child) = self.pipewire {
667 let _ = child.start_kill();
668 }
669 if let Some(pid) = self.mutter_dbus_pid {
670 unsafe {
671 libc::kill(pid as i32, libc::SIGKILL);
672 }
673 }
674 let _ = std::fs::remove_dir_all(&self.runtime_dir);
675 }
676}
677
678// ── Helpers ─────────────────────────────────────────────────────────────────
679
680fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
681 for line in output.lines() {
682 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
683 if let Some(addr) = rest.strip_suffix("';") {
684 return Ok(addr.to_string());
685 }
686 }
687 }
688 Err(MutterError::DbusOutputMissingField {
689 field: "DBUS_SESSION_BUS_ADDRESS",
690 })
691}
692
693fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
694 for line in output.lines() {
695 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
696 let pid_str = rest.trim_end_matches(';').trim();
697 return pid_str.parse().map_err(MutterError::DbusPidParse);
698 }
699 }
700 Err(MutterError::DbusOutputMissingField {
701 field: "DBUS_SESSION_BUS_PID",
702 })
703}
704
705fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
706 let invalid = || MutterError::ResolutionInvalid {
707 value: s.to_string(),
708 };
709 let (w, h) = s.split_once('x').ok_or_else(invalid)?;
710 let parse = |part: &str| -> std::result::Result<u32, MutterError> {
711 part.parse::<u32>()
712 .ok()
713 .filter(|n| *n > 0)
714 .ok_or_else(invalid)
715 };
716 Ok((parse(w)?, parse(h)?))
717}
718
719/// Reject a scale that isn't a finite, positive factor inside
720/// [`MIN_SCALE`]..=[`MAX_SCALE`]. Run before any subprocess spawns so a bad
721/// value fails fast.
722fn validate_scale(scale: f64) -> std::result::Result<(), MutterError> {
723 if scale.is_finite() && (MIN_SCALE..=MAX_SCALE).contains(&scale) {
724 Ok(())
725 } else {
726 Err(MutterError::ScaleInvalid {
727 value: scale,
728 min: MIN_SCALE,
729 max: MAX_SCALE,
730 })
731 }
732}
733
734/// Pick the entry of `supported` closest to `requested`. Mutter only accepts
735/// scales it advertises for a mode, so an arbitrary request (e.g. 1.66) is
736/// snapped to the nearest legal step. Falls back to `requested` when the list
737/// is empty (mutter then validates — and likely rejects — it).
738fn nearest_supported_scale(requested: f64, supported: &[f64]) -> f64 {
739 supported
740 .iter()
741 .copied()
742 .min_by(|a, b| {
743 let da = (a - requested).abs();
744 let db = (b - requested).abs();
745 da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
746 })
747 .unwrap_or(requested)
748}
749
750/// Apply `requested` as the logical-monitor scale of the (single) virtual
751/// monitor via `org.gnome.Mutter.DisplayConfig`, returning the scale mutter
752/// actually accepted (snapped to a supported step).
753///
754/// Reads `GetCurrentState` fresh each call so the `serial` is current —
755/// mutter rejects `ApplyMonitorsConfig` on a stale serial, and the serial
756/// bumps when the virtual monitor first appears. Fractional scales (1.5,
757/// 1.75, …) require the `scale-monitor-framebuffer` experimental feature; the
758/// container entrypoint enables it. Without it only integer scales are
759/// advertised, so [`nearest_supported_scale`] would snap a fractional request
760/// to 1.0 or 2.0.
761async fn apply_scale(
762 conn: &zbus::Connection,
763 requested: f64,
764 id: &str,
765) -> std::result::Result<f64, MutterError> {
766 let state_reply = conn
767 .call_method(
768 Some("org.gnome.Mutter.DisplayConfig"),
769 "/org/gnome/Mutter/DisplayConfig",
770 Some("org.gnome.Mutter.DisplayConfig"),
771 "GetCurrentState",
772 &(),
773 )
774 .await
775 .map_err(|source| MutterError::DisplayConfigState {
776 stage: "call",
777 source,
778 })?;
779 let state_body = state_reply.body();
780 let (serial, monitors, _logical, _props): CurrentState =
781 state_body
782 .deserialize()
783 .map_err(|source| MutterError::DisplayConfigState {
784 stage: "deserialize",
785 source,
786 })?;
787
788 // Headless mutter started with a single `--virtual-monitor` exposes
789 // exactly one monitor advertising exactly the mode we asked for, so the
790 // first monitor / first mode is the one to scale.
791 let (spec, modes, _mprops) = monitors
792 .into_iter()
793 .next()
794 .ok_or(MutterError::DisplayConfigNoMonitor)?;
795 let connector = spec.0;
796 let (mode_id, _w, _h, _refresh, _preferred, supported, _modeprops) =
797 modes
798 .into_iter()
799 .next()
800 .ok_or(MutterError::DisplayConfigNoMonitor)?;
801
802 let applied = nearest_supported_scale(requested, &supported);
803 if (applied - requested).abs() > SCALE_SNAP_TOLERANCE {
804 tracing::warn!(
805 id,
806 requested,
807 applied,
808 supported = ?supported,
809 "requested scale not advertised by mutter; snapped to nearest supported"
810 );
811 }
812
813 // (x, y, scale, transform, primary, [(connector, mode_id, {})]).
814 let logical: LogicalMonitorConfig = (
815 0,
816 0,
817 applied,
818 0,
819 true,
820 vec![(connector, mode_id, DbusProps::new())],
821 );
822 // method 1 = temporary: applies for this session without writing
823 // ~/.config/monitors.xml, which is all a throwaway headless run needs.
824 conn.call_method(
825 Some("org.gnome.Mutter.DisplayConfig"),
826 "/org/gnome/Mutter/DisplayConfig",
827 Some("org.gnome.Mutter.DisplayConfig"),
828 "ApplyMonitorsConfig",
829 &(serial, 1u32, vec![logical], DbusProps::new()),
830 )
831 .await
832 .map_err(|source| MutterError::DisplayConfigApply {
833 scale: applied,
834 source,
835 })?;
836
837 Ok(applied)
838}
839
840async fn wait_for_wayland_socket(
841 runtime_dir: &str,
842 display: &str,
843) -> std::result::Result<(), MutterError> {
844 let socket_path = PathBuf::from(runtime_dir).join(display);
845 for _ in 0..50 {
846 if socket_path.exists() {
847 return Ok(());
848 }
849 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
850 }
851 Err(MutterError::WaylandSocketTimeout {
852 socket: socket_path.display().to_string(),
853 })
854}
855
856/// PipeWire creates `<runtime_dir>/pipewire-0` as soon as it's ready
857/// to accept client connections. Polling for that file replaces the
858/// previous unconditional `sleep(1s)` after spawning the pipewire
859/// process — same readiness model as
860/// [`wait_for_wayland_socket`].
861async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
862 let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
863 for _ in 0..50 {
864 if socket_path.exists() {
865 return Ok(());
866 }
867 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
868 }
869 Err(MutterError::PipewireSocketTimeout {
870 socket: socket_path.display().to_string(),
871 })
872}
873
874#[cfg(test)]
875mod tests {
876 use super::*;
877
878 /// Live end-to-end check that a fractional scale is actually applied by a
879 /// real mutter. Requires the runtime stack (mutter, pipewire, wireplumber,
880 /// dbus-launch) on `PATH` plus the GSettings schemas in `XDG_DATA_DIRS`, so
881 /// it's `#[ignore]`d by default; run with the dev shell's env via
882 /// `cargo test -p waydriver-compositor-mutter -- --ignored`.
883 ///
884 /// This exercises the whole chain at once: the per-session keyfile must
885 /// enable `scale-monitor-framebuffer` (otherwise 1.5 is not advertised and
886 /// would snap to 1.0/2.0), and `apply_scale` must drive DisplayConfig
887 /// correctly. We read the scale straight back from `GetCurrentState`.
888 #[tokio::test]
889 #[ignore = "requires a live mutter/pipewire/dbus runtime stack"]
890 async fn applies_fractional_scale_against_real_mutter() {
891 let mut compositor = MutterCompositor::new();
892 compositor
893 .start(Some("1920x1080"), Some(1.5))
894 .await
895 .expect("compositor should start");
896 let state = compositor.state().expect("state present after start");
897
898 let reply = state
899 .conn()
900 .call_method(
901 Some("org.gnome.Mutter.DisplayConfig"),
902 "/org/gnome/Mutter/DisplayConfig",
903 Some("org.gnome.Mutter.DisplayConfig"),
904 "GetCurrentState",
905 &(),
906 )
907 .await
908 .expect("GetCurrentState should succeed");
909 let body = reply.body();
910 let (_serial, _monitors, logical, _props): CurrentState = body
911 .deserialize()
912 .expect("GetCurrentState body should deserialize");
913 let applied = logical.first().expect("at least one logical monitor").2;
914
915 compositor.stop().await.expect("compositor should stop");
916
917 assert!(
918 (applied - 1.5).abs() < 0.01,
919 "expected logical scale ~1.5 (fractional scaling enabled via keyfile), got {applied}"
920 );
921 }
922
923 #[test]
924 fn test_parse_dbus_address_valid() {
925 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
926 let addr = parse_dbus_address(output).unwrap();
927 assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
928 }
929
930 #[test]
931 fn test_parse_dbus_address_missing() {
932 let output = "DBUS_SESSION_BUS_PID=12345;\n";
933 assert!(parse_dbus_address(output).is_err());
934 }
935
936 #[test]
937 fn test_parse_dbus_pid_valid() {
938 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
939 let pid = parse_dbus_pid(output).unwrap();
940 assert_eq!(pid, 12345);
941 }
942
943 #[test]
944 fn test_parse_dbus_pid_missing() {
945 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
946 assert!(parse_dbus_pid(output).is_err());
947 }
948
949 #[test]
950 fn test_parse_dbus_pid_invalid() {
951 let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
952 assert!(parse_dbus_pid(output).is_err());
953 }
954
955 #[tokio::test]
956 async fn test_wait_for_socket_found() {
957 let dir = tempfile::tempdir().unwrap();
958 let runtime_dir = dir.path().to_str().unwrap().to_string();
959 let display = "wayland-test-99";
960 std::fs::File::create(dir.path().join(display)).unwrap();
961 wait_for_wayland_socket(&runtime_dir, display)
962 .await
963 .unwrap();
964 }
965
966 #[tokio::test]
967 async fn test_wait_for_pipewire_socket_found() {
968 let dir = tempfile::tempdir().unwrap();
969 let runtime_dir = dir.path().to_str().unwrap().to_string();
970 std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
971 wait_for_pipewire_socket(&runtime_dir).await.unwrap();
972 }
973
974 #[tokio::test]
975 async fn test_wait_for_pipewire_socket_timeout() {
976 let dir = tempfile::tempdir().unwrap();
977 let runtime_dir = dir.path().to_str().unwrap().to_string();
978 let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
979 assert!(
980 matches!(err, MutterError::PipewireSocketTimeout { .. }),
981 "expected PipewireSocketTimeout, got: {err}"
982 );
983 // Public mapping: same Timeout bucket as the wayland one,
984 // so workspace callers matching `Error::Timeout(_)` (e.g.
985 // the e2e tests) keep working.
986 let public: waydriver::Error = err.into();
987 assert!(
988 matches!(public, waydriver::Error::Timeout(_)),
989 "expected waydriver::Error::Timeout, got: {public}"
990 );
991 }
992
993 #[tokio::test]
994 async fn test_wait_for_socket_timeout() {
995 let dir = tempfile::tempdir().unwrap();
996 let runtime_dir = dir.path().to_str().unwrap().to_string();
997 let display = "wayland-nonexistent-0";
998 let err = wait_for_wayland_socket(&runtime_dir, display)
999 .await
1000 .unwrap_err();
1001 assert!(
1002 matches!(err, MutterError::WaylandSocketTimeout { .. }),
1003 "expected WaylandSocketTimeout, got: {err}"
1004 );
1005 // And confirm the From<MutterError> -> waydriver::Error mapping
1006 // still produces the public Timeout variant — workspace callers
1007 // (notably the e2e tests) match on it.
1008 let public: waydriver::Error = err.into();
1009 assert!(
1010 matches!(public, waydriver::Error::Timeout(_)),
1011 "expected waydriver::Error::Timeout, got: {public}"
1012 );
1013 }
1014
1015 #[test]
1016 fn test_new_generates_unique_ids() {
1017 let a = MutterCompositor::new();
1018 let b = MutterCompositor::new();
1019 assert_ne!(a.id(), b.id());
1020 }
1021
1022 #[test]
1023 fn test_new_wayland_display_contains_id() {
1024 let c = MutterCompositor::new();
1025 assert!(
1026 c.wayland_display().contains(c.id()),
1027 "display '{}' should contain id '{}'",
1028 c.wayland_display(),
1029 c.id()
1030 );
1031 }
1032
1033 #[test]
1034 fn test_new_runtime_dir_contains_id() {
1035 let c = MutterCompositor::new();
1036 let dir_str = c.runtime_dir().to_str().unwrap();
1037 assert!(
1038 dir_str.contains(c.id()),
1039 "runtime_dir '{}' should contain id '{}'",
1040 dir_str,
1041 c.id()
1042 );
1043 }
1044
1045 /// Regression: session runtime dirs must be flat siblings under one root,
1046 /// never nested inside each other. `waydriver::capture` repoints the
1047 /// process-wide `XDG_RUNTIME_DIR` at the live session's dir after a
1048 /// screenshot/recording; if `new()` re-read that mutated value, each
1049 /// session would nest one level deeper and eventually overflow the
1050 /// AF_UNIX `sun_path` limit, wedging pipewire socket creation. See
1051 /// `HOST_RUNTIME_ROOT`.
1052 #[test]
1053 fn test_session_runtime_dirs_are_siblings_not_nested() {
1054 let a = MutterCompositor::new();
1055 let dir_a = a.runtime_dir().to_path_buf();
1056
1057 // Simulate what a screenshot/recording does: point XDG_RUNTIME_DIR at
1058 // the live session's runtime dir and leave it there.
1059 unsafe {
1060 std::env::set_var("XDG_RUNTIME_DIR", &dir_a);
1061 }
1062
1063 let b = MutterCompositor::new();
1064 let dir_b = b.runtime_dir().to_path_buf();
1065
1066 assert_eq!(
1067 dir_a.parent(),
1068 dir_b.parent(),
1069 "session dirs must share a parent (siblings), got a={dir_a:?} b={dir_b:?}"
1070 );
1071 assert!(
1072 !dir_b.starts_with(&dir_a),
1073 "session B nested inside session A: {dir_b:?}"
1074 );
1075 }
1076
1077 #[test]
1078 fn test_new_wayland_display_prefix() {
1079 let c = MutterCompositor::new();
1080 assert!(c.wayland_display().starts_with("wayland-wd-"));
1081 }
1082
1083 #[test]
1084 fn test_new_runtime_dir_contains_session_prefix() {
1085 let c = MutterCompositor::new();
1086 let dir_str = c.runtime_dir().to_str().unwrap();
1087 assert!(dir_str.contains("wd-session-"));
1088 }
1089
1090 #[test]
1091 fn test_state_returns_none_before_start() {
1092 // `state()` previously panicked when called outside the started
1093 // window. The current contract returns `None` so callers can
1094 // detect the lifecycle without trapping a panic.
1095 let c = MutterCompositor::new();
1096 assert!(c.state().is_none());
1097 }
1098
1099 #[test]
1100 fn test_parse_resolution_accepts_hd() {
1101 assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
1102 assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
1103 }
1104
1105 #[test]
1106 fn test_parse_resolution_rejects_garbage() {
1107 for bad in [
1108 "",
1109 "1920",
1110 "1920x",
1111 "x1080",
1112 "0x0",
1113 "1920x0",
1114 "0x1080",
1115 "1920x1080x1",
1116 "abcxdef",
1117 "-1x1080",
1118 "1920 x 1080",
1119 ] {
1120 assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
1121 }
1122 }
1123
1124 #[test]
1125 fn test_validate_scale_accepts_common_factors() {
1126 for ok in [0.5, 1.0, 1.25, 1.5, 1.6666, 1.75, 2.0, 3.0, 4.0] {
1127 assert!(validate_scale(ok).is_ok(), "expected {ok} to validate");
1128 }
1129 }
1130
1131 #[test]
1132 fn test_validate_scale_rejects_out_of_range_and_nonfinite() {
1133 for bad in [0.0, 0.49, 4.01, -1.0, f64::NAN, f64::INFINITY] {
1134 assert!(
1135 validate_scale(bad).is_err(),
1136 "expected {bad} to be rejected"
1137 );
1138 }
1139 }
1140
1141 #[test]
1142 fn test_nearest_supported_scale_snaps_to_closest() {
1143 let supported = [1.0, 1.25, 1.5, 1.75, 2.0];
1144 // Exact hits pass straight through.
1145 assert_eq!(nearest_supported_scale(1.5, &supported), 1.5);
1146 assert_eq!(nearest_supported_scale(2.0, &supported), 2.0);
1147 // 1.66 (166%) isn't advertised → nearest is 1.75.
1148 assert_eq!(nearest_supported_scale(1.66, &supported), 1.75);
1149 // 1.6 is closer to 1.5.
1150 assert_eq!(nearest_supported_scale(1.6, &supported), 1.5);
1151 }
1152
1153 #[test]
1154 fn test_nearest_supported_scale_empty_list_returns_request() {
1155 assert_eq!(nearest_supported_scale(1.5, &[]), 1.5);
1156 }
1157
1158 #[test]
1159 fn test_default_same_structure_as_new() {
1160 let c = MutterCompositor::default();
1161 assert!(c.wayland_display().starts_with("wayland-wd-"));
1162 assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
1163 }
1164}