waydriver_compositor_mutter/
lib.rs1mod error;
17
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::sync::{Arc, Mutex};
21
22use async_trait::async_trait;
23use tokio::process::{Child, Command};
24
25use waydriver::{CompositorRuntime, Result};
26
27use crate::error::MutterError;
28
/// Virtual-monitor size, in `WIDTHxHEIGHT` form, used when the caller of
/// `start` does not request a specific resolution.
const DEFAULT_RESOLUTION: &str = "1024x768";
32
33pub struct MutterState {
49 conn: zbus::Connection,
50 rd_session_path: String,
51 rd_session_id: String,
52 rd_started: Arc<Mutex<bool>>,
53 runtime_dir: PathBuf,
54 active_stream_path: Arc<Mutex<Option<String>>>,
55}
56
57impl MutterState {
58 pub fn conn(&self) -> &zbus::Connection {
64 &self.conn
65 }
66
67 pub fn rd_session_path(&self) -> &str {
71 &self.rd_session_path
72 }
73
74 pub fn rd_session_id(&self) -> &str {
80 &self.rd_session_id
81 }
82
83 pub fn runtime_dir(&self) -> &Path {
86 &self.runtime_dir
87 }
88
89 pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
98 self.rd_started
99 .lock()
100 .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
101 }
102
103 pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
111 self.active_stream_path
112 .lock()
113 .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
114 }
115}
116
117pub struct MutterCompositor {
119 id: String,
120 wayland_display: String,
121 runtime_dir: PathBuf,
122 mutter_dbus_address: String,
123 mutter_dbus_pid: Option<u32>,
124 mutter: Option<Child>,
125 pipewire: Option<Child>,
126 wireplumber: Option<Child>,
127 state: Option<Arc<MutterState>>,
128}
129
130impl MutterCompositor {
131 pub fn new() -> Self {
134 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
135 let wayland_display = format!("wayland-wd-{}", id);
136
137 let host_runtime = std::env::var("XDG_RUNTIME_DIR")
138 .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
139 let runtime_dir = PathBuf::from(&host_runtime).join(format!("wd-session-{}", id));
140
141 Self {
142 id,
143 wayland_display,
144 runtime_dir,
145 mutter_dbus_address: String::new(),
146 mutter_dbus_pid: None,
147 mutter: None,
148 pipewire: None,
149 wireplumber: None,
150 state: None,
151 }
152 }
153
154 pub fn state(&self) -> Option<Arc<MutterState>> {
167 self.state.clone()
168 }
169}
170
171impl Default for MutterCompositor {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl MutterCompositor {
178 async fn start_inner(
192 &mut self,
193 resolution: Option<&str>,
194 ) -> std::result::Result<(), MutterError> {
195 let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
196 parse_resolution(resolution)?;
200
201 tracing::info!(id = self.id, resolution, "starting mutter compositor");
202
203 tokio::fs::create_dir_all(&self.runtime_dir).await?;
204 let runtime_str = self
210 .runtime_dir
211 .to_str()
212 .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
213 .to_string();
214
215 let dbus_output = Command::new("dbus-launch")
217 .arg("--sh-syntax")
218 .output()
219 .await?;
220 if !dbus_output.status.success() {
221 return Err(MutterError::DbusLaunchFailed(
222 String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
223 ));
224 }
225 let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
226 self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
227 self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
228 tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
229
230 let pipewire = Command::new("pipewire")
232 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
233 .env("XDG_RUNTIME_DIR", &runtime_str)
234 .stdout(Stdio::null())
235 .stderr(Stdio::null())
236 .spawn()
237 .map_err(|source| MutterError::Spawn {
238 process: "pipewire",
239 source,
240 })?;
241 self.pipewire = Some(pipewire);
242
243 wait_for_pipewire_socket(&runtime_str).await?;
250
251 let wireplumber = Command::new("wireplumber")
252 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
253 .env("XDG_RUNTIME_DIR", &runtime_str)
254 .stdout(Stdio::null())
255 .stderr(Stdio::null())
256 .spawn()
257 .map_err(|source| MutterError::Spawn {
258 process: "wireplumber",
259 source,
260 })?;
261 self.wireplumber = Some(wireplumber);
262
263 tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
273
274 let mutter = Command::new("mutter")
276 .args([
277 "--headless",
278 "--wayland",
279 "--no-x11",
280 "--wayland-display",
281 &self.wayland_display,
282 "--virtual-monitor",
283 resolution,
284 ])
285 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
286 .env("XDG_RUNTIME_DIR", &runtime_str)
287 .stdout(Stdio::null())
288 .stderr(Stdio::inherit())
289 .spawn()
290 .map_err(|source| MutterError::Spawn {
291 process: "mutter",
292 source,
293 })?;
294 self.mutter = Some(mutter);
295 tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
296
297 wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
299 tracing::debug!(id = self.id, "wayland socket ready");
300
301 let mutter_addr: zbus::address::Address = self
303 .mutter_dbus_address
304 .as_str()
305 .try_into()
306 .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
307 addr: self.mutter_dbus_address.clone(),
308 source,
309 })?;
310 let mutter_conn = zbus::connection::Builder::address(mutter_addr)
311 .map_err(|source| MutterError::DbusConnect {
312 stage: "build connection builder",
313 source,
314 })?
315 .build()
316 .await
317 .map_err(|source| MutterError::DbusConnect {
318 stage: "connect",
319 source,
320 })?;
321
322 let mut rd_reply = None;
324 for i in 0..50 {
325 match mutter_conn
326 .call_method(
327 Some("org.gnome.Mutter.RemoteDesktop"),
328 "/org/gnome/Mutter/RemoteDesktop",
329 Some("org.gnome.Mutter.RemoteDesktop"),
330 "CreateSession",
331 &(),
332 )
333 .await
334 {
335 Ok(reply) => {
336 rd_reply = Some(reply);
337 break;
338 }
339 Err(e) if i < 49 => {
340 tracing::debug!(
341 id = self.id,
342 attempt = i,
343 "waiting for RemoteDesktop service: {e}"
344 );
345 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
346 }
347 Err(e) => {
348 return Err(MutterError::RemoteDesktopCreate(e));
349 }
350 }
351 }
352 let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
356 let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
357 .body()
358 .deserialize()
359 .map_err(MutterError::RdSessionPathParse)?;
360 let rd_session_id_reply = mutter_conn
371 .call_method(
372 Some("org.gnome.Mutter.RemoteDesktop"),
373 rd_session_path.as_str(),
374 Some("org.freedesktop.DBus.Properties"),
375 "Get",
376 &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
377 )
378 .await
379 .map_err(MutterError::SessionIdGet)?;
380 let rd_session_id_body = rd_session_id_reply.body();
383 let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
384 .deserialize()
385 .map_err(MutterError::SessionIdVariantParse)?;
386 let rd_session_id: String = rd_session_id_variant
387 .try_into()
388 .map_err(MutterError::SessionIdNotString)?;
389
390 let rd_session_path = rd_session_path.to_string();
391 tracing::debug!(
392 id = self.id,
393 rd_session_path = %rd_session_path,
394 rd_session_id = %rd_session_id,
395 "RemoteDesktop session started"
396 );
397
398 self.state = Some(Arc::new(MutterState {
399 conn: mutter_conn,
400 rd_session_path,
401 rd_session_id,
402 rd_started: Arc::new(Mutex::new(false)),
403 runtime_dir: self.runtime_dir.clone(),
404 active_stream_path: Arc::new(Mutex::new(None)),
405 }));
406
407 Ok(())
408 }
409}
410
411#[async_trait]
412impl CompositorRuntime for MutterCompositor {
413 async fn start(&mut self, resolution: Option<&str>) -> Result<()> {
414 Ok(self.start_inner(resolution).await?)
420 }
421
422 async fn stop(&mut self) -> Result<()> {
423 tracing::info!(id = self.id, "stopping mutter compositor");
424
425 if let Some(state) = &self.state {
431 let _ = state
432 .conn()
433 .call_method(
434 Some("org.gnome.Mutter.RemoteDesktop"),
435 state.rd_session_path(),
436 Some("org.gnome.Mutter.RemoteDesktop.Session"),
437 "Stop",
438 &(),
439 )
440 .await;
441 }
442
443 self.state = None;
448
449 if let Some(mut mutter) = self.mutter.take() {
450 let _ = mutter.kill().await;
451 let _ = mutter.wait().await;
452 }
453 if let Some(mut wireplumber) = self.wireplumber.take() {
454 let _ = wireplumber.kill().await;
455 let _ = wireplumber.wait().await;
456 }
457 if let Some(mut pipewire) = self.pipewire.take() {
458 let _ = pipewire.kill().await;
459 let _ = pipewire.wait().await;
460 }
461
462 if let Some(pid) = self.mutter_dbus_pid.take() {
463 unsafe {
464 libc::kill(pid as i32, libc::SIGTERM);
465 }
466 }
467
468 let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
469
470 tracing::debug!(id = self.id, "mutter compositor stopped");
471 Ok(())
472 }
473
474 fn id(&self) -> &str {
475 &self.id
476 }
477
478 fn wayland_display(&self) -> &str {
479 &self.wayland_display
480 }
481
482 fn runtime_dir(&self) -> &Path {
483 &self.runtime_dir
484 }
485}
486
487impl Drop for MutterCompositor {
488 fn drop(&mut self) {
489 self.state = None;
492
493 if let Some(ref mut child) = self.mutter {
494 let _ = child.start_kill();
495 }
496 if let Some(ref mut child) = self.wireplumber {
497 let _ = child.start_kill();
498 }
499 if let Some(ref mut child) = self.pipewire {
500 let _ = child.start_kill();
501 }
502 if let Some(pid) = self.mutter_dbus_pid {
503 unsafe {
504 libc::kill(pid as i32, libc::SIGKILL);
505 }
506 }
507 let _ = std::fs::remove_dir_all(&self.runtime_dir);
508 }
509}
510
511fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
514 for line in output.lines() {
515 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
516 if let Some(addr) = rest.strip_suffix("';") {
517 return Ok(addr.to_string());
518 }
519 }
520 }
521 Err(MutterError::DbusOutputMissingField {
522 field: "DBUS_SESSION_BUS_ADDRESS",
523 })
524}
525
526fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
527 for line in output.lines() {
528 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
529 let pid_str = rest.trim_end_matches(';').trim();
530 return pid_str.parse().map_err(MutterError::DbusPidParse);
531 }
532 }
533 Err(MutterError::DbusOutputMissingField {
534 field: "DBUS_SESSION_BUS_PID",
535 })
536}
537
538fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
539 let invalid = || MutterError::ResolutionInvalid {
540 value: s.to_string(),
541 };
542 let (w, h) = s.split_once('x').ok_or_else(invalid)?;
543 let parse = |part: &str| -> std::result::Result<u32, MutterError> {
544 part.parse::<u32>()
545 .ok()
546 .filter(|n| *n > 0)
547 .ok_or_else(invalid)
548 };
549 Ok((parse(w)?, parse(h)?))
550}
551
552async fn wait_for_wayland_socket(
553 runtime_dir: &str,
554 display: &str,
555) -> std::result::Result<(), MutterError> {
556 let socket_path = PathBuf::from(runtime_dir).join(display);
557 for _ in 0..50 {
558 if socket_path.exists() {
559 return Ok(());
560 }
561 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
562 }
563 Err(MutterError::WaylandSocketTimeout {
564 socket: socket_path.display().to_string(),
565 })
566}
567
568async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
574 let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
575 for _ in 0..50 {
576 if socket_path.exists() {
577 return Ok(());
578 }
579 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
580 }
581 Err(MutterError::PipewireSocketTimeout {
582 socket: socket_path.display().to_string(),
583 })
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// `dbus-launch --sh-syntax` style output carrying both variables.
    const DBUS_LAUNCH_OUTPUT: &str = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";

    /// Fresh temporary directory plus its path as a `String`. The guard must
    /// be kept alive for as long as the directory is needed.
    fn scratch_runtime_dir() -> (tempfile::TempDir, String) {
        let dir = tempfile::tempdir().unwrap();
        let as_text = dir.path().to_str().unwrap().to_string();
        (dir, as_text)
    }

    // ---- dbus-launch output parsing ----

    #[test]
    fn test_parse_dbus_address_valid() {
        let addr = parse_dbus_address(DBUS_LAUNCH_OUTPUT).unwrap();
        assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
    }

    #[test]
    fn test_parse_dbus_address_missing() {
        let output = "DBUS_SESSION_BUS_PID=12345;\n";
        assert!(parse_dbus_address(output).is_err());
    }

    #[test]
    fn test_parse_dbus_pid_valid() {
        let pid = parse_dbus_pid(DBUS_LAUNCH_OUTPUT).unwrap();
        assert_eq!(pid, 12345);
    }

    #[test]
    fn test_parse_dbus_pid_missing() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    #[test]
    fn test_parse_dbus_pid_invalid() {
        let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    // ---- socket polling ----

    #[tokio::test]
    async fn test_wait_for_socket_found() {
        let (dir, runtime_dir) = scratch_runtime_dir();
        let display = "wayland-test-99";
        // A regular file is enough: the helper only checks for existence.
        std::fs::File::create(dir.path().join(display)).unwrap();
        wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_wait_for_pipewire_socket_found() {
        let (dir, runtime_dir) = scratch_runtime_dir();
        std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
        wait_for_pipewire_socket(&runtime_dir).await.unwrap();
    }

    #[tokio::test]
    async fn test_wait_for_pipewire_socket_timeout() {
        let (_dir, runtime_dir) = scratch_runtime_dir();
        let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
        assert!(
            matches!(err, MutterError::PipewireSocketTimeout { .. }),
            "expected PipewireSocketTimeout, got: {err}"
        );
        // The crate-private error must surface publicly as a timeout.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    #[tokio::test]
    async fn test_wait_for_socket_timeout() {
        let (_dir, runtime_dir) = scratch_runtime_dir();
        let display = "wayland-nonexistent-0";
        let err = wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap_err();
        assert!(
            matches!(err, MutterError::WaylandSocketTimeout { .. }),
            "expected WaylandSocketTimeout, got: {err}"
        );
        // The crate-private error must surface publicly as a timeout.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    // ---- constructor naming ----

    #[test]
    fn test_new_generates_unique_ids() {
        let first = MutterCompositor::new();
        let second = MutterCompositor::new();
        assert_ne!(first.id(), second.id());
    }

    #[test]
    fn test_new_wayland_display_contains_id() {
        let c = MutterCompositor::new();
        assert!(
            c.wayland_display().contains(c.id()),
            "display '{}' should contain id '{}'",
            c.wayland_display(),
            c.id()
        );
    }

    #[test]
    fn test_new_runtime_dir_contains_id() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(
            dir_str.contains(c.id()),
            "runtime_dir '{}' should contain id '{}'",
            dir_str,
            c.id()
        );
    }

    #[test]
    fn test_new_wayland_display_prefix() {
        let c = MutterCompositor::new();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
    }

    #[test]
    fn test_new_runtime_dir_contains_session_prefix() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(dir_str.contains("wd-session-"));
    }

    #[test]
    fn test_state_returns_none_before_start() {
        let c = MutterCompositor::new();
        assert!(c.state().is_none());
    }

    // ---- resolution validation ----

    #[test]
    fn test_parse_resolution_accepts_hd() {
        assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
        assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
    }

    #[test]
    fn test_parse_resolution_rejects_garbage() {
        let rejected = [
            "",
            "1920",
            "1920x",
            "x1080",
            "0x0",
            "1920x0",
            "0x1080",
            "1920x1080x1",
            "abcxdef",
            "-1x1080",
            "1920 x 1080",
        ];
        for bad in rejected {
            assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
        }
    }

    #[test]
    fn test_default_same_structure_as_new() {
        let c = MutterCompositor::default();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
        assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
    }
}