1mod error;
17
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::sync::{Arc, LazyLock, Mutex};
21
22use async_trait::async_trait;
23use tokio::process::{Child, Command};
24
25use waydriver::{CompositorRuntime, Result};
26
27use crate::error::MutterError;
28
/// Virtual-monitor resolution (`WIDTHxHEIGHT`) used by `start_inner` when the
/// caller passes `None`.
const DEFAULT_RESOLUTION: &str = "1024x768";
32
/// Handles and identifiers for one live Mutter RemoteDesktop session.
///
/// An instance is built at the end of `MutterCompositor::start_inner` and is
/// shared with callers as an `Arc<MutterState>` through
/// `MutterCompositor::state`.
pub struct MutterState {
    /// Connection to the private D-Bus instance that mutter was launched on.
    conn: zbus::Connection,
    /// Object path returned by `org.gnome.Mutter.RemoteDesktop.CreateSession`.
    rd_session_path: String,
    /// Value of the session object's `SessionId` property.
    rd_session_id: String,
    /// Initialised to `false`. Presumably set once the RemoteDesktop session
    /// has been started by a consumer of this state — confirm against callers.
    rd_started: Arc<Mutex<bool>>,
    /// Copy of the compositor's per-session runtime directory.
    runtime_dir: PathBuf,
    /// Initialised to `None`. Presumably holds the D-Bus path of the stream
    /// currently in use — confirm against callers.
    active_stream_path: Arc<Mutex<Option<String>>>,
}
56
57impl MutterState {
58 pub fn conn(&self) -> &zbus::Connection {
64 &self.conn
65 }
66
67 pub fn rd_session_path(&self) -> &str {
71 &self.rd_session_path
72 }
73
74 pub fn rd_session_id(&self) -> &str {
80 &self.rd_session_id
81 }
82
83 pub fn runtime_dir(&self) -> &Path {
86 &self.runtime_dir
87 }
88
89 pub fn rd_started_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>> {
98 self.rd_started
99 .lock()
100 .map_err(|_| waydriver::Error::process("rd_started mutex poisoned"))
101 }
102
103 pub fn active_stream_path_lock(&self) -> Result<std::sync::MutexGuard<'_, Option<String>>> {
111 self.active_stream_path
112 .lock()
113 .map_err(|_| waydriver::Error::process("active_stream_path mutex poisoned"))
114 }
115}
116
/// A headless mutter compositor together with the helper processes it needs
/// (a private D-Bus daemon, PipeWire and WirePlumber).
///
/// `new` only computes names and paths; processes are spawned by `start` and
/// torn down by `stop` or, as a fallback, by `Drop`.
pub struct MutterCompositor {
    /// Short session identifier: the first eight characters of a random UUID.
    id: String,
    /// Wayland display name passed to mutter, `wayland-wd-<id>`.
    wayland_display: String,
    /// Per-session directory used as `XDG_RUNTIME_DIR` for the child processes.
    runtime_dir: PathBuf,
    /// Address of the private D-Bus session; empty until `start` has run.
    mutter_dbus_address: String,
    /// PID of the D-Bus daemon reported by `dbus-launch`, if one was started.
    mutter_dbus_pid: Option<u32>,
    /// Handle of the spawned `mutter` process.
    mutter: Option<Child>,
    /// Handle of the spawned `pipewire` process.
    pipewire: Option<Child>,
    /// Handle of the spawned `wireplumber` process.
    wireplumber: Option<Child>,
    /// Session state; `None` before a successful `start` and after `stop`.
    state: Option<Arc<MutterState>>,
}
129
130static HOST_RUNTIME_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
152 let root = std::env::var("XDG_RUNTIME_DIR")
153 .unwrap_or_else(|_| format!("/run/user/{}", unsafe { libc::getuid() }));
154 PathBuf::from(root)
155});
156
157impl MutterCompositor {
158 pub fn new() -> Self {
161 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
162 let wayland_display = format!("wayland-wd-{}", id);
163
164 let runtime_dir = HOST_RUNTIME_ROOT.join(format!("wd-session-{}", id));
165
166 Self {
167 id,
168 wayland_display,
169 runtime_dir,
170 mutter_dbus_address: String::new(),
171 mutter_dbus_pid: None,
172 mutter: None,
173 pipewire: None,
174 wireplumber: None,
175 state: None,
176 }
177 }
178
179 pub fn state(&self) -> Option<Arc<MutterState>> {
192 self.state.clone()
193 }
194}
195
196impl Default for MutterCompositor {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
impl MutterCompositor {
    /// Brings up the whole session and records its state in `self.state`.
    ///
    /// Steps, in order:
    /// 1. validate `resolution` (or `DEFAULT_RESOLUTION` when `None`);
    /// 2. create the per-session runtime directory;
    /// 3. start a private D-Bus daemon with `dbus-launch`;
    /// 4. spawn `pipewire` and wait for its `pipewire-0` socket;
    /// 5. spawn `wireplumber`;
    /// 6. spawn headless `mutter` and wait for its Wayland socket;
    /// 7. connect to the private bus and create a RemoteDesktop session;
    /// 8. read the session's `SessionId` and store a new `MutterState`.
    ///
    /// # Errors
    ///
    /// Returns the `MutterError` of the first step that fails. Child handles
    /// and the D-Bus PID obtained before the failure stay stored in `self`,
    /// where `stop` and `Drop` look for them.
    ///
    /// # Panics
    ///
    /// Panics if the runtime directory path is not valid UTF-8.
    async fn start_inner(
        &mut self,
        resolution: Option<&str>,
    ) -> std::result::Result<(), MutterError> {
        let resolution = resolution.unwrap_or(DEFAULT_RESOLUTION);
        // Reject a malformed resolution before any process is spawned.
        parse_resolution(resolution)?;

        tracing::info!(id = self.id, resolution, "starting mutter compositor");

        tokio::fs::create_dir_all(&self.runtime_dir).await?;
        let runtime_str = self
            .runtime_dir
            .to_str()
            .expect("invariant: runtime_dir built from UTF-8 inputs in new()")
            .to_string();

        // Private session bus: dbus-launch prints shell assignments holding
        // the bus address and the daemon's PID.
        let dbus_output = Command::new("dbus-launch")
            .arg("--sh-syntax")
            .output()
            .await?;
        if !dbus_output.status.success() {
            return Err(MutterError::DbusLaunchFailed(
                String::from_utf8_lossy(&dbus_output.stderr).into_owned(),
            ));
        }
        let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
        self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
        self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
        tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");

        // Every child gets the private bus and the per-session runtime dir.
        // PIPEWIRE_REMOTE is removed, presumably so that no child attaches to
        // a PipeWire instance inherited from the host environment.
        let pipewire = Command::new("pipewire")
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "pipewire",
                source,
            })?;
        // Stored right away so the child is reachable even if a later step fails.
        self.pipewire = Some(pipewire);

        // WirePlumber is only started once PipeWire's socket exists.
        wait_for_pipewire_socket(&runtime_str).await?;

        let wireplumber = Command::new("wireplumber")
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "wireplumber",
                source,
            })?;
        self.wireplumber = Some(wireplumber);

        tracing::debug!(id = self.id, "PipeWire + WirePlumber started");

        // Unlike the helpers above, mutter inherits stderr so its diagnostics
        // show up in the parent's stderr.
        let mutter = Command::new("mutter")
            .args([
                "--headless",
                "--wayland",
                "--no-x11",
                "--wayland-display",
                &self.wayland_display,
                "--virtual-monitor",
                resolution,
            ])
            .env_remove("PIPEWIRE_REMOTE")
            .env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
            .env("XDG_RUNTIME_DIR", &runtime_str)
            .stdout(Stdio::null())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|source| MutterError::Spawn {
                process: "mutter",
                source,
            })?;
        self.mutter = Some(mutter);
        tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");

        wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
        tracing::debug!(id = self.id, "wayland socket ready");

        // Connect to the private bus (not the host session bus).
        let mutter_addr: zbus::address::Address = self
            .mutter_dbus_address
            .as_str()
            .try_into()
            .map_err(|source: zbus::Error| MutterError::DbusAddressInvalid {
                addr: self.mutter_dbus_address.clone(),
                source,
            })?;
        let mutter_conn = zbus::connection::Builder::address(mutter_addr)
            .map_err(|source| MutterError::DbusConnect {
                stage: "build connection builder",
                source,
            })?
            .build()
            .await
            .map_err(|source| MutterError::DbusConnect {
                stage: "connect",
                source,
            })?;

        // The RemoteDesktop service may not be registered yet even though the
        // Wayland socket exists: try up to 50 times with a 100 ms pause
        // between attempts, and surface the error of the final attempt.
        let mut rd_reply = None;
        for i in 0..50 {
            match mutter_conn
                .call_method(
                    Some("org.gnome.Mutter.RemoteDesktop"),
                    "/org/gnome/Mutter/RemoteDesktop",
                    Some("org.gnome.Mutter.RemoteDesktop"),
                    "CreateSession",
                    &(),
                )
                .await
            {
                Ok(reply) => {
                    rd_reply = Some(reply);
                    break;
                }
                Err(e) if i < 49 => {
                    tracing::debug!(
                        id = self.id,
                        attempt = i,
                        "waiting for RemoteDesktop service: {e}"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                }
                Err(e) => {
                    return Err(MutterError::RemoteDesktopCreate(e));
                }
            }
        }
        let rd_reply = rd_reply.expect("retry loop sets Some on break or returns Err");
        let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
            .body()
            .deserialize()
            .map_err(MutterError::RdSessionPathParse)?;
        // Read the session's `SessionId` through the generic Properties interface.
        let rd_session_id_reply = mutter_conn
            .call_method(
                Some("org.gnome.Mutter.RemoteDesktop"),
                rd_session_path.as_str(),
                Some("org.freedesktop.DBus.Properties"),
                "Get",
                &("org.gnome.Mutter.RemoteDesktop.Session", "SessionId"),
            )
            .await
            .map_err(MutterError::SessionIdGet)?;
        let rd_session_id_body = rd_session_id_reply.body();
        // `Get` replies with a variant; unwrap it and require a string inside.
        let rd_session_id_variant: zbus::zvariant::OwnedValue = rd_session_id_body
            .deserialize()
            .map_err(MutterError::SessionIdVariantParse)?;
        let rd_session_id: String = rd_session_id_variant
            .try_into()
            .map_err(MutterError::SessionIdNotString)?;

        let rd_session_path = rd_session_path.to_string();
        tracing::debug!(
            id = self.id,
            rd_session_path = %rd_session_path,
            rd_session_id = %rd_session_id,
            "RemoteDesktop session started"
        );

        // Publish the state only after every step above has succeeded.
        self.state = Some(Arc::new(MutterState {
            conn: mutter_conn,
            rd_session_path,
            rd_session_id,
            rd_started: Arc::new(Mutex::new(false)),
            runtime_dir: self.runtime_dir.clone(),
            active_stream_path: Arc::new(Mutex::new(None)),
        }));

        Ok(())
    }
}
451
452#[async_trait]
453impl CompositorRuntime for MutterCompositor {
454 async fn start(&mut self, resolution: Option<&str>) -> Result<()> {
455 Ok(self.start_inner(resolution).await?)
461 }
462
463 async fn stop(&mut self) -> Result<()> {
464 tracing::info!(id = self.id, "stopping mutter compositor");
465
466 if let Some(state) = &self.state {
472 let _ = state
473 .conn()
474 .call_method(
475 Some("org.gnome.Mutter.RemoteDesktop"),
476 state.rd_session_path(),
477 Some("org.gnome.Mutter.RemoteDesktop.Session"),
478 "Stop",
479 &(),
480 )
481 .await;
482 }
483
484 self.state = None;
489
490 if let Some(mut mutter) = self.mutter.take() {
491 let _ = mutter.kill().await;
492 let _ = mutter.wait().await;
493 }
494 if let Some(mut wireplumber) = self.wireplumber.take() {
495 let _ = wireplumber.kill().await;
496 let _ = wireplumber.wait().await;
497 }
498 if let Some(mut pipewire) = self.pipewire.take() {
499 let _ = pipewire.kill().await;
500 let _ = pipewire.wait().await;
501 }
502
503 if let Some(pid) = self.mutter_dbus_pid.take() {
504 unsafe {
505 libc::kill(pid as i32, libc::SIGTERM);
506 }
507 }
508
509 let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
510
511 tracing::debug!(id = self.id, "mutter compositor stopped");
512 Ok(())
513 }
514
515 fn id(&self) -> &str {
516 &self.id
517 }
518
519 fn wayland_display(&self) -> &str {
520 &self.wayland_display
521 }
522
523 fn runtime_dir(&self) -> &Path {
524 &self.runtime_dir
525 }
526}
527
528impl Drop for MutterCompositor {
529 fn drop(&mut self) {
530 self.state = None;
533
534 if let Some(ref mut child) = self.mutter {
535 let _ = child.start_kill();
536 }
537 if let Some(ref mut child) = self.wireplumber {
538 let _ = child.start_kill();
539 }
540 if let Some(ref mut child) = self.pipewire {
541 let _ = child.start_kill();
542 }
543 if let Some(pid) = self.mutter_dbus_pid {
544 unsafe {
545 libc::kill(pid as i32, libc::SIGKILL);
546 }
547 }
548 let _ = std::fs::remove_dir_all(&self.runtime_dir);
549 }
550}
551
552fn parse_dbus_address(output: &str) -> std::result::Result<String, MutterError> {
555 for line in output.lines() {
556 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
557 if let Some(addr) = rest.strip_suffix("';") {
558 return Ok(addr.to_string());
559 }
560 }
561 }
562 Err(MutterError::DbusOutputMissingField {
563 field: "DBUS_SESSION_BUS_ADDRESS",
564 })
565}
566
567fn parse_dbus_pid(output: &str) -> std::result::Result<u32, MutterError> {
568 for line in output.lines() {
569 if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
570 let pid_str = rest.trim_end_matches(';').trim();
571 return pid_str.parse().map_err(MutterError::DbusPidParse);
572 }
573 }
574 Err(MutterError::DbusOutputMissingField {
575 field: "DBUS_SESSION_BUS_PID",
576 })
577}
578
579fn parse_resolution(s: &str) -> std::result::Result<(u32, u32), MutterError> {
580 let invalid = || MutterError::ResolutionInvalid {
581 value: s.to_string(),
582 };
583 let (w, h) = s.split_once('x').ok_or_else(invalid)?;
584 let parse = |part: &str| -> std::result::Result<u32, MutterError> {
585 part.parse::<u32>()
586 .ok()
587 .filter(|n| *n > 0)
588 .ok_or_else(invalid)
589 };
590 Ok((parse(w)?, parse(h)?))
591}
592
593async fn wait_for_wayland_socket(
594 runtime_dir: &str,
595 display: &str,
596) -> std::result::Result<(), MutterError> {
597 let socket_path = PathBuf::from(runtime_dir).join(display);
598 for _ in 0..50 {
599 if socket_path.exists() {
600 return Ok(());
601 }
602 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
603 }
604 Err(MutterError::WaylandSocketTimeout {
605 socket: socket_path.display().to_string(),
606 })
607}
608
609async fn wait_for_pipewire_socket(runtime_dir: &str) -> std::result::Result<(), MutterError> {
615 let socket_path = PathBuf::from(runtime_dir).join("pipewire-0");
616 for _ in 0..50 {
617 if socket_path.exists() {
618 return Ok(());
619 }
620 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
621 }
622 Err(MutterError::PipewireSocketTimeout {
623 socket: socket_path.display().to_string(),
624 })
625}
626
/// Unit tests for the parsing helpers, the socket pollers and the values
/// derived in `MutterCompositor::new`. None of them spawns a real process.
#[cfg(test)]
mod tests {
    use super::*;

    // --- dbus-launch output parsing ---

    #[test]
    fn test_parse_dbus_address_valid() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
        let addr = parse_dbus_address(output).unwrap();
        assert_eq!(addr, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
    }

    #[test]
    fn test_parse_dbus_address_missing() {
        let output = "DBUS_SESSION_BUS_PID=12345;\n";
        assert!(parse_dbus_address(output).is_err());
    }

    #[test]
    fn test_parse_dbus_pid_valid() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";
        let pid = parse_dbus_pid(output).unwrap();
        assert_eq!(pid, 12345);
    }

    #[test]
    fn test_parse_dbus_pid_missing() {
        let output = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    #[test]
    fn test_parse_dbus_pid_invalid() {
        let output = "DBUS_SESSION_BUS_PID=notanumber;\n";
        assert!(parse_dbus_pid(output).is_err());
    }

    // --- socket polling ---
    // The pollers only test for existence, so a regular file stands in for
    // the socket.

    #[tokio::test]
    async fn test_wait_for_socket_found() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let display = "wayland-test-99";
        std::fs::File::create(dir.path().join(display)).unwrap();
        wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_wait_for_pipewire_socket_found() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        std::fs::File::create(dir.path().join("pipewire-0")).unwrap();
        wait_for_pipewire_socket(&runtime_dir).await.unwrap();
    }

    // Runs the full polling loop against an empty directory.
    #[tokio::test]
    async fn test_wait_for_pipewire_socket_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let err = wait_for_pipewire_socket(&runtime_dir).await.unwrap_err();
        assert!(
            matches!(err, MutterError::PipewireSocketTimeout { .. }),
            "expected PipewireSocketTimeout, got: {err}"
        );
        // The internal error must surface as the public `Timeout` variant.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    // Runs the full polling loop against an empty directory.
    #[tokio::test]
    async fn test_wait_for_socket_timeout() {
        let dir = tempfile::tempdir().unwrap();
        let runtime_dir = dir.path().to_str().unwrap().to_string();
        let display = "wayland-nonexistent-0";
        let err = wait_for_wayland_socket(&runtime_dir, display)
            .await
            .unwrap_err();
        assert!(
            matches!(err, MutterError::WaylandSocketTimeout { .. }),
            "expected WaylandSocketTimeout, got: {err}"
        );
        // The internal error must surface as the public `Timeout` variant.
        let public: waydriver::Error = err.into();
        assert!(
            matches!(public, waydriver::Error::Timeout(_)),
            "expected waydriver::Error::Timeout, got: {public}"
        );
    }

    // --- values derived in `new()` ---

    #[test]
    fn test_new_generates_unique_ids() {
        let a = MutterCompositor::new();
        let b = MutterCompositor::new();
        assert_ne!(a.id(), b.id());
    }

    #[test]
    fn test_new_wayland_display_contains_id() {
        let c = MutterCompositor::new();
        assert!(
            c.wayland_display().contains(c.id()),
            "display '{}' should contain id '{}'",
            c.wayland_display(),
            c.id()
        );
    }

    #[test]
    fn test_new_runtime_dir_contains_id() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(
            dir_str.contains(c.id()),
            "runtime_dir '{}' should contain id '{}'",
            dir_str,
            c.id()
        );
    }

    // `HOST_RUNTIME_ROOT` is resolved once, so pointing XDG_RUNTIME_DIR at an
    // existing session directory must not nest the next session inside it.
    #[test]
    fn test_session_runtime_dirs_are_siblings_not_nested() {
        let a = MutterCompositor::new();
        let dir_a = a.runtime_dir().to_path_buf();

        // NOTE(review): this changes the process-wide environment and never
        // restores it; tests running in parallel may observe the new value.
        unsafe {
            std::env::set_var("XDG_RUNTIME_DIR", &dir_a);
        }

        let b = MutterCompositor::new();
        let dir_b = b.runtime_dir().to_path_buf();

        assert_eq!(
            dir_a.parent(),
            dir_b.parent(),
            "session dirs must share a parent (siblings), got a={dir_a:?} b={dir_b:?}"
        );
        assert!(
            !dir_b.starts_with(&dir_a),
            "session B nested inside session A: {dir_b:?}"
        );
    }

    #[test]
    fn test_new_wayland_display_prefix() {
        let c = MutterCompositor::new();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
    }

    #[test]
    fn test_new_runtime_dir_contains_session_prefix() {
        let c = MutterCompositor::new();
        let dir_str = c.runtime_dir().to_str().unwrap();
        assert!(dir_str.contains("wd-session-"));
    }

    #[test]
    fn test_state_returns_none_before_start() {
        let c = MutterCompositor::new();
        // No session state exists until `start` has succeeded.
        assert!(c.state().is_none());
    }

    // --- resolution validation ---

    #[test]
    fn test_parse_resolution_accepts_hd() {
        assert_eq!(parse_resolution("1920x1080").unwrap(), (1920, 1080));
        assert_eq!(parse_resolution("1024x768").unwrap(), (1024, 768));
    }

    #[test]
    fn test_parse_resolution_rejects_garbage() {
        for bad in [
            "",
            "1920",
            "1920x",
            "x1080",
            "0x0",
            "1920x0",
            "0x1080",
            "1920x1080x1",
            "abcxdef",
            "-1x1080",
            "1920 x 1080",
        ] {
            assert!(parse_resolution(bad).is_err(), "expected error for {bad:?}");
        }
    }

    // `Default` delegates to `new()`, so the same naming scheme applies.
    #[test]
    fn test_default_same_structure_as_new() {
        let c = MutterCompositor::default();
        assert!(c.wayland_display().starts_with("wayland-wd-"));
        assert!(c.runtime_dir().to_str().unwrap().contains("wd-session-"));
    }
}