waydriver_compositor_mutter/
lib.rs1mod error;
17
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::sync::{Arc, Mutex};
21
22use async_trait::async_trait;
23use tokio::process::{Child, Command};
24
25use waydriver::{CompositorRuntime, Result};
26
27use crate::error::MutterError;
28
29const DEFAULT_RESOLUTION: &str = "1024x768";
32
33pub struct MutterState {
49 conn: zbus::Connection,
50 rd_session_path: String,
51 rd_session_id: String,
52 rd_started: Arc<Mutex<bool>>,
53 runtime_dir: PathBuf,
54 active_stream_path: Arc<Mutex<Option<String>>>,
55}
56
57impl MutterState {
58 pub fn conn(&self) -> &zbus::Connection {
64 &self.conn
65 }
66
67 pub fn rd_session_path(&self) -> &str {
71 &self.rd_session_path
72 }
73
74 pub fn rd_session_id(&self) -> &str {
80 &self.rd_session_id
81 }
82
83 pub fn runtime_dir(&self) -> &Path {
86 &self.runtime_dir
87 }
88
89 pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
98 self.rd_started
99 .lock()
100 .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
101 }
102
103 pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
111 self.active_stream_path
112 .lock()
113 .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
114 }
115}
116
117pub struct MutterCompositor {
119 id: String,
120 wayland_display: String,
121 runtime_dir: PathBuf,
122 mutter_dbus_address: String,
123 mutter_dbus_pid: Option<u32>,
124 mutter: Option<Child>,
125 pipewire: Option<Child>,
126 wireplumber: Option<Child>,
127 state: Option<Arc<MutterState>>,
128}
129
130impl MutterCompositor {
131 pub fn new() -> Self {
134 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
135 let wayland_display = format!("wayland-wd-{}", id);
136
137 let host_runtime = std::env::var("XDG_RUNTIME_DIR")
138 .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
139 let runtime_dir = PathBuf::from(&host_runtime).join(format!("wd-session-{}", id));
140
141 Self {
142 id,
143 wayland_display,
144 runtime_dir,
145 mutter_dbus_address: String::new(),
146 mutter_dbus_pid: None,
147 mutter: None,
148 pipewire: None,
149 wireplumber: None,
150 state: None,
151 }
152 }
153
154 pub fn state(&self) -> Option<Arc<MutterState>> {
167 self.state.clone()
168 }
169}
170
171impl Default for MutterCompositor {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl MutterCompositor {
178 async fn start_inner(
192 &mut self,
193 resolution: Option<&str>,
194 ) -> std::result::Result<(), MutterError> {
195 let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
196 parse_resolution(resolution)?;
200
201 tracing::info!(id = self.id, resolution, "starting mutter compositor");
202
203 tokio::fs::create_dir_all(&self.runtime_dir).await?;
204 let runtime_str = self
210 .runtime_dir
211 .to_str()
212 .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
213 .to_string();
214
215 let dbus_output = Command::new("dbus-launch")
217 .arg("--sh-syntax")
218 .output()
219 .await?;
220 if !dbus_output.status.success() {
221 return Err(MutterError::DbusLaunchFailed(
222 String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
223 ));
224 }
225 let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
226 self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
227 self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
228 tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
229
230 let pipewire = Command::new("pipewire")
245 .env_remove("PIPEWIRE_REMOTE")
246 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
247 .env("XDG_RUNTIME_DIR", &runtime_str)
248 .stdout(Stdio::null())
249 .stderr(Stdio::null())
250 .spawn()
251 .map_err(|source| MutterError::Spawn {
252 process: "pipewire",
253 source,
254 })?;
255 self.pipewire = Some(pipewire);
256
257 wait_for_pipewire_socket(&runtime_str).await?;
264
265 let wireplumber = Command::new("wireplumber")
266 .env_remove("PIPEWIRE_REMOTE")
267 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
268 .env("XDG_RUNTIME_DIR", &runtime_str)
269 .stdout(Stdio::null())
270 .stderr(Stdio::null())
271 .spawn()
272 .map_err(|source| MutterError::Spawn {
273 process: "wireplumber",
274 source,
275 })?;
276 self.wireplumber = Some(wireplumber);
277
278 tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
288
289 let mutter = Command::new("mutter")
291 .args([
292 "--headless",
293 "--wayland",
294 "--no-x11",
295 "--wayland-display",
296 &self.wayland_display,
297 "--virtual-monitor",
298 resolution,
299 ])
300 .env_remove("PIPEWIRE_REMOTE")
301 .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
302 .env("XDG_RUNTIME_DIR", &runtime_str)
303 .stdout(Stdio::null())
304 .stderr(Stdio::inherit())
305 .spawn()
306 .map_err(|source| MutterError::Spawn {
307 process: "mutter",
308 source,
309 })?;
310 self.mutter = Some(mutter);
311 tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
312
313 wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
315 tracing::debug!(id = self.id, "wayland socket ready");
316
317 let mutter_addr: zbus::address::Address = self
319 .mutter_dbus_address
320 .as_str()
321 .try_into()
322 .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
323 addr: self.mutter_dbus_address.clone(),
324 source,
325 })?;
326 let mutter_conn = zbus::connection::Builder::address(mutter_addr)
327 .map_err(|source| MutterError::DbusConnect {
328 stage: "build connection builder",
329 source,
330 })?
331 .build()
332 .await
333 .map_err(|source| MutterError::DbusConnect {
334 stage: "connect",
335 source,
336 })?;
337
338 let mut rd_reply = None;
340 for i in 0..50 {
341 match mutter_conn
342 .call_method(
343 Some("org.gnome.Mutter.RemoteDesktop"),
344 "/org/gnome/Mutter/RemoteDesktop",
345 Some("org.gnome.Mutter.RemoteDesktop"),
346 "CreateSession",
347 &(),
348 )
349 .await
350 {
351 Ok(reply) => {
352 rd_reply = Some(reply);
353 break;
354 }
355 Err(e) if i < 49 => {
356 tracing::debug!(
357 id = self.id,
358 attempt = i,
359 "waiting for RemoteDesktop service: {e}"
360 );
361 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
362 }
363 Err(e) => {
364 return Err(MutterError::RemoteDesktopCreate(e));
365 }
366 }
367 }
368 let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
372 let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
373 .body()
374 .deserialize()
375 .map_err(MutterError::RdSessionPathParse)?;
376 let rd_session_id_reply = mutter_conn
387 .call_method(
388 Some("org.gnome.Mutter.RemoteDesktop"),
389 rd_session_path.as_str(),
390 Some("org.freedesktop.DBus.Properties"),
391 "Get",
392 &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
393 )
394 .await
395 .map_err(MutterError::SessionIdGet)?;
396 let rd_session_id_body = rd_session_id_reply.body();
399 let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
400 .deserialize()
401 .map_err(MutterError::SessionIdVariantParse)?;
402 let rd_session_id: String = rd_session_id_variant
403 .try_into()
404 .map_err(MutterError::SessionIdNotString)?;
405
406 let rd_session_path = rd_session_path.to_string();
407 tracing::debug!(
408 id = self.id,
409 rd_session_path = %rd_session_path,
410 rd_session_id = %rd_session_id,
411 "RemoteDesktop session started"
412 );
413
414 self.state = Some(Arc::new(MutterState {
415 conn: mutter_conn,
416 rd_session_path,
417 rd_session_id,
418 rd_started: Arc::new(Mutex::new(false)),
419 runtime_dir: self.runtime_dir.clone(),
420 active_stream_path: Arc::new(Mutex::new(None)),
421 }));
422
423 Ok(())
424 }
425}
426
427#[async_trait]
428impl CompositorRuntime for MutterCompositor {
429 async fn start(&mut self, resolution: Option<&str>) -> Result<()> {
430 Ok(self.start_inner(resolution).await?)
436 }
437
438 async fn stop(&mut self) -> Result<()> {
439 tracing::info!(id = self.id, "stopping mutter compositor");
440
441 if let Some(state) = &self.state {
447 let _ = state
448 .conn()
449 .call_method(
450 Some("org.gnome.Mutter.RemoteDesktop"),
451 state.rd_session_path(),
452 Some("org.gnome.Mutter.RemoteDesktop.Session"),
453 "Stop",
454 &(),
455 )
456 .await;
457 }
458
459 self.state = None;
464
465 if let Some(mut mutter) = self.mutter.take() {
466 let _ = mutter.kill().await;
467 let _ = mutter.wait().await;
468 }
469 if let Some(mut wireplumber) = self.wireplumber.take() {
470 let _ = wireplumber.kill().await;
471 let _ = wireplumber.wait().await;
472 }
473 if let Some(mut pipewire) = self.pipewire.take() {
474 let _ = pipewire.kill().await;
475 let _ = pipewire.wait().await;
476 }
477
478 if let Some(pid) = self.mutter_dbus_pid.take() {
479 unsafe {
480 libc::kill(pid as i32, libc::SIGTERM);
481 }
482 }
483
484 let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
485
486 tracing::debug!(id = self.id, "mutter compositor stopped");
487 Ok(())
488 }
489
490 fn id(&self) -> &str {
491 &self.id
492 }
493
494 fn wayland_display(&self) -> &str {
495 &self.wayland_display
496 }
497
498 fn runtime_dir(&self) -> &Path {
499 &self.runtime_dir
500 }
501}
502
503impl Drop for MutterCompositor {
504 fn drop(&mut self) {
505 self.state = None;
508
509 if let Some(ref mut child) = self.mutter {
510 let _ = child.start_kill();
511 }
512 if let Some(ref mut child) = self.wireplumber {
513 let _ = child.start_kill();
514 }
515 if let Some(ref mut child) = self.pipewire {
516 let _ = child.start_kill();
517 }
518 if let Some(pid) = self.mutter_dbus_pid {
519 unsafe {
520 libc::kill(pid as i32, libc::SIGKILL);
521 }
522 }
523 let _ = std::fs::remove_dir_all(&self.runtime_dir);
524 }
525}
526
527fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
530 for line in output.lines() {
531 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
532 if let Some(addr) = rest.strip_suffix("';") {
533 return Ok(addr.to_string());
534 }
535 }
536 }
537 Err(MutterError::DbusOutputMissingField {
538 field: "DBUS_SESSION_BUS_ADDRESS",
539 })
540}
541
542fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
543 for line in output.lines() {
544 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
545 let pid_str = rest.trim_end_matches(';').trim();
546 return pid_str.parse().map_err(MutterError::DbusPidParse);
547 }
548 }
549 Err(MutterError::DbusOutputMissingField {
550 field: "DBUS_SESSION_BUS_PID",
551 })
552}
553
554fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
555 let invalid = || MutterError::ResolutionInvalid {
556 value: s.to_string(),
557 };
558 let (w, h) = s.split_once('x').ok_or_else(invalid)?;
559 let parse = |part: &str| -> std::result::Result<u32, MutterError> {
560 part.parse::<u32>()
561 .ok()
562 .filter(|n| *n > 0)
563 .ok_or_else(invalid)
564 };
565 Ok((parse(w)?, parse(h)?))
566}
567
568async fn wait_for_wayland_socket(
569 runtime_dir: &str,
570 display: &str,
571) -> std::result::Result<(), MutterError> {
572 let socket_path = PathBuf::from(runtime_dir).join(display);
573 for _ in 0..50 {
574 if socket_path.exists() {
575 return Ok(());
576 }
577 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
578 }
579 Err(MutterError::WaylandSocketTimeout {
580 socket: socket_path.display().to_string(),
581 })
582}
583
584async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
590 let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
591 for _ in 0..50 {
592 if socket_path.exists() {
593 return Ok(());
594 }
595 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
596 }
597 Err(MutterError::PipewireSocketTimeout {
598 socket: socket_path.display().to_string(),
599 })
600}
601
602#[cfg(test)]
603mod tests {
604 use super::*;
605
606 #[test]
607 fn test_parse_dbus_address_valid() {
608 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
609 let addr = parse_dbus_address(output).unwrap();
610 assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
611 }
612
613 #[test]
614 fn test_parse_dbus_address_missing() {
615 let output = "DBUS_SESSION_BUS_PID=12345;\n";
616 assert!(parse_dbus_address(output).is_err());
617 }
618
619 #[test]
620 fn test_parse_dbus_pid_valid() {
621 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
622 let pid = parse_dbus_pid(output).unwrap();
623 assert_eq!(pid, 12345);
624 }
625
626 #[test]
627 fn test_parse_dbus_pid_missing() {
628 let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
629 assert!(parse_dbus_pid(output).is_err());
630 }
631
632 #[test]
633 fn test_parse_dbus_pid_invalid() {
634 let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
635 assert!(parse_dbus_pid(output).is_err());
636 }
637
638 #[tokio::test]
639 async fn test_wait_for_socket_found() {
640 let dir = tempfile::tempdir().unwrap();
641 let runtime_dir = dir.path().to_str().unwrap().to_string();
642 let display = "wayland-test-99";
643 std::fs::File::create(dir.path().join(display)).unwrap();
644 wait_for_wayland_socket(&runtime_dir, display)
645 .await
646 .unwrap();
647 }
648
649 #[tokio::test]
650 async fn test_wait_for_pipewire_socket_found() {
651 let dir = tempfile::tempdir().unwrap();
652 let runtime_dir = dir.path().to_str().unwrap().to_string();
653 std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
654 wait_for_pipewire_socket(&runtime_dir).await.unwrap();
655 }
656
657 #[tokio::test]
658 async fn test_wait_for_pipewire_socket_timeout() {
659 let dir = tempfile::tempdir().unwrap();
660 let runtime_dir = dir.path().to_str().unwrap().to_string();
661 let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
662 assert!(
663 matches!(err, MutterError::PipewireSocketTimeout { .. }),
664 "expected PipewireSocketTimeout, got: {err}"
665 );
666 let public: waydriver::Error = err.into();
670 assert!(
671 matches!(public, waydriver::Error::Timeout(_)),
672 "expected waydriver::Error::Timeout, got: {public}"
673 );
674 }
675
676 #[tokio::test]
677 async fn test_wait_for_socket_timeout() {
678 let dir = tempfile::tempdir().unwrap();
679 let runtime_dir = dir.path().to_str().unwrap().to_string();
680 let display = "wayland-nonexistent-0";
681 let err = wait_for_wayland_socket(&runtime_dir, display)
682 .await
683 .unwrap_err();
684 assert!(
685 matches!(err, MutterError::WaylandSocketTimeout { .. }),
686 "expected WaylandSocketTimeout, got: {err}"
687 );
688 let public: waydriver::Error = err.into();
692 assert!(
693 matches!(public, waydriver::Error::Timeout(_)),
694 "expected waydriver::Error::Timeout, got: {public}"
695 );
696 }
697
698 #[test]
699 fn test_new_generates_unique_ids() {
700 let a = MutterCompositor::new();
701 let b = MutterCompositor::new();
702 assert_ne!(a.id(), b.id());
703 }
704
705 #[test]
706 fn test_new_wayland_display_contains_id() {
707 let c = MutterCompositor::new();
708 assert!(
709 c.wayland_display().contains(c.id()),
710 "display '{}' should contain id '{}'",
711 c.wayland_display(),
712 c.id()
713 );
714 }
715
716 #[test]
717 fn test_new_runtime_dir_contains_id() {
718 let c = MutterCompositor::new();
719 let dir_str = c.runtime_dir().to_str().unwrap();
720 assert!(
721 dir_str.contains(c.id()),
722 "runtime_dir '{}' should contain id '{}'",
723 dir_str,
724 c.id()
725 );
726 }
727
728 #[test]
729 fn test_new_wayland_display_prefix() {
730 let c = MutterCompositor::new();
731 assert!(c.wayland_display().starts_with("wayland-wd-"));
732 }
733
734 #[test]
735 fn test_new_runtime_dir_contains_session_prefix() {
736 let c = MutterCompositor::new();
737 let dir_str = c.runtime_dir().to_str().unwrap();
738 assert!(dir_str.contains("wd-session-"));
739 }
740
741 #[test]
742 fn test_state_returns_none_before_start() {
743 let c = MutterCompositor::new();
747 assert!(c.state().is_none());
748 }
749
750 #[test]
751 fn test_parse_resolution_accepts_hd() {
752 assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
753 assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
754 }
755
756 #[test]
757 fn test_parse_resolution_rejects_garbage() {
758 for bad in [
759 "",
760 "1920",
761 "1920x",
762 "x1080",
763 "0x0",
764 "1920x0",
765 "0x1080",
766 "1920x1080x1",
767 "abcxdef",
768 "-1x1080",
769 "1920 x 1080",
770 ] {
771 assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
772 }
773 }
774
775 #[test]
776 fn test_default_same_structure_as_new() {
777 let c = MutterCompositor::default();
778 assert!(c.wayland_display().starts_with("wayland-wd-"));
779 assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
780 }
781}