use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::process::{Child, Command};
use waydriver::{CompositorRuntime, Error, Result};
/// Live handles produced by a successful `start()` of a [`MutterCompositor`].
///
/// An instance is built at the end of `start()`, stored behind an `Arc`, and
/// handed out to callers through `MutterCompositor::state()`.
pub struct MutterState {
/// D-Bus connection opened by `start()` against the private bus address
/// parsed from the `dbus-launch` output.
pub conn: zbus::Connection,
/// Object path returned by `org.gnome.Mutter.RemoteDesktop.CreateSession`,
/// stored in its string form.
pub rd_session_path: String,
/// Clone of the compositor's per-session runtime directory.
pub runtime_dir: PathBuf,
}
/// A headless Mutter session together with the helper processes it needs.
///
/// `new()` only computes names and paths; processes are spawned by `start()`
/// and torn down by `stop()` or, as a fallback, by `Drop`.
pub struct MutterCompositor {
/// First eight characters of a freshly generated v4 UUID string.
id: String,
/// Wayland display name, `wayland-wd-<id>`.
wayland_display: String,
/// Per-session directory `wd-session-<id>` under `$XDG_RUNTIME_DIR`
/// (or `/run/user/<uid>` when that variable cannot be read).
runtime_dir: PathBuf,
/// Address of the private session bus; empty until `start()` parses it
/// from the `dbus-launch` output.
mutter_dbus_address: String,
/// PID of the private bus daemon as reported by `dbus-launch`; `None`
/// before `start()` and after `stop()`.
mutter_dbus_pid: Option<u32>,
/// Handle of the spawned `mutter` process, if any.
mutter: Option<Child>,
/// Handle of the spawned `pipewire` process, if any.
pipewire: Option<Child>,
/// Handle of the spawned `wireplumber` process, if any.
wireplumber: Option<Child>,
/// Shared session state; `Some` only between a successful `start()` and
/// the next `stop()` / drop.
state: Option<Arc<MutterState>>,
}
impl MutterCompositor {
    /// Creates a compositor description without starting anything.
    ///
    /// The session id is the first eight characters of a random v4 UUID. It is
    /// embedded in both the Wayland display name (`wayland-wd-<id>`) and the
    /// runtime directory (`<host runtime dir>/wd-session-<id>`). The host
    /// runtime dir is `$XDG_RUNTIME_DIR`, falling back to `/run/user/<uid>`
    /// when the variable cannot be read.
    pub fn new() -> Self {
        let uuid_text = uuid::Uuid::new_v4().to_string();
        let id = uuid_text[..8].to_string();

        let host_runtime = match std::env::var("XDG_RUNTIME_DIR") {
            Ok(dir) => dir,
            Err(_) => {
                // SAFETY: `getuid` takes no arguments and only returns the
                // caller's real user id.
                let uid = unsafe { libc::getuid() };
                format!("/run/user/{uid}")
            }
        };

        Self {
            wayland_display: format!("wayland-wd-{id}"),
            runtime_dir: Path::new(&host_runtime).join(format!("wd-session-{id}")),
            id,
            mutter_dbus_address: String::new(),
            mutter_dbus_pid: None,
            mutter: None,
            pipewire: None,
            wireplumber: None,
            state: None,
        }
    }

    /// Returns a new handle to the shared session state.
    ///
    /// # Panics
    ///
    /// Panics when no state is present, i.e. before `start()` has succeeded
    /// or after `stop()` has cleared it.
    pub fn state(&self) -> Arc<MutterState> {
        let shared = self
            .state
            .as_ref()
            .expect("MutterCompositor::state() called before start() or after stop()");
        Arc::clone(shared)
    }
}
impl Default for MutterCompositor {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl CompositorRuntime for MutterCompositor {
async fn start(&mut self) -> Result<()> {
tracing::info!(id = self.id, "starting mutter compositor");
tokio::fs::create_dir_all(&self.runtime_dir).await?;
let runtime_str = self.runtime_dir.to_str().unwrap().to_string();
let dbus_output = Command::new("dbus-launch")
.arg("--sh-syntax")
.output()
.await?;
if !dbus_output.status.success() {
return Err(Error::Process(format!(
"dbus-launch failed: {}",
String::from_utf8_lossy(&dbus_output.stderr)
)));
}
let dbus_stdout = String::from_utf8_lossy(&dbus_output.stdout);
self.mutter_dbus_address = parse_dbus_address(&dbus_stdout)?;
self.mutter_dbus_pid = Some(parse_dbus_pid(&dbus_stdout)?);
tracing::debug!(id = self.id, mutter_dbus_address = %self.mutter_dbus_address, "private D-Bus for mutter");
let pipewire = Command::new("pipewire")
.env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
.env("XDG_RUNTIME_DIR", &runtime_str)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.map_err(|e| Error::Process(format!("pipewire: {e}")))?;
self.pipewire = Some(pipewire);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let wireplumber = Command::new("wireplumber")
.env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
.env("XDG_RUNTIME_DIR", &runtime_str)
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.map_err(|e| Error::Process(format!("wireplumber: {e}")))?;
self.wireplumber = Some(wireplumber);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::debug!(id = self.id, "PipeWire + WirePlumber started");
let mutter = Command::new("mutter")
.args([
"--headless",
"--wayland",
"--no-x11",
"--wayland-display",
&self.wayland_display,
"--virtual-monitor",
"1024x768",
])
.env("DBUS_SESSION_BUS_ADDRESS", &self.mutter_dbus_address)
.env("XDG_RUNTIME_DIR", &runtime_str)
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.spawn()
.map_err(|e| Error::Process(format!("mutter: {e}")))?;
self.mutter = Some(mutter);
tracing::debug!(id = self.id, wayland_display = %self.wayland_display, "mutter spawned");
wait_for_wayland_socket(&runtime_str, &self.wayland_display).await?;
tracing::debug!(id = self.id, "wayland socket ready");
let mutter_addr: zbus::address::Address = self
.mutter_dbus_address
.as_str()
.try_into()
.map_err(|e: zbus::Error| {
Error::Process(format!("invalid mutter dbus address: {e}"))
})?;
let mutter_conn = zbus::connection::Builder::address(mutter_addr)?
.build()
.await
.map_err(|e| Error::Process(format!("connect to mutter dbus: {e}")))?;
let mut rd_reply = None;
for i in 0..50 {
match mutter_conn
.call_method(
Some("org.gnome.Mutter.RemoteDesktop"),
"/org/gnome/Mutter/RemoteDesktop",
Some("org.gnome.Mutter.RemoteDesktop"),
"CreateSession",
&(),
)
.await
{
Ok(reply) => {
rd_reply = Some(reply);
break;
}
Err(e) if i < 49 => {
tracing::debug!(
id = self.id,
attempt = i,
"waiting for RemoteDesktop service: {e}"
);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Err(e) => {
return Err(Error::Process(format!("RemoteDesktop CreateSession: {e}")));
}
}
}
let rd_reply = rd_reply.unwrap();
let rd_session_path: zbus::zvariant::OwnedObjectPath = rd_reply
.body()
.deserialize()
.map_err(|e| Error::Process(format!("parse RD session path: {e}")))?;
mutter_conn
.call_method(
Some("org.gnome.Mutter.RemoteDesktop"),
rd_session_path.as_str(),
Some("org.gnome.Mutter.RemoteDesktop.Session"),
"Start",
&(),
)
.await
.map_err(|e| Error::Process(format!("RemoteDesktop Start: {e}")))?;
let rd_session_path = rd_session_path.to_string();
tracing::debug!(id = self.id, rd_session_path = %rd_session_path, "RemoteDesktop session started");
self.state = Some(Arc::new(MutterState {
conn: mutter_conn,
rd_session_path,
runtime_dir: self.runtime_dir.clone(),
}));
Ok(())
}
async fn stop(&mut self) -> Result<()> {
tracing::info!(id = self.id, "stopping mutter compositor");
if let Some(state) = &self.state {
let _ = state
.conn
.call_method(
Some("org.gnome.Mutter.RemoteDesktop"),
state.rd_session_path.as_str(),
Some("org.gnome.Mutter.RemoteDesktop.Session"),
"Stop",
&(),
)
.await;
}
self.state = None;
if let Some(mut mutter) = self.mutter.take() {
let _ = mutter.kill().await;
let _ = mutter.wait().await;
}
if let Some(mut wireplumber) = self.wireplumber.take() {
let _ = wireplumber.kill().await;
let _ = wireplumber.wait().await;
}
if let Some(mut pipewire) = self.pipewire.take() {
let _ = pipewire.kill().await;
let _ = pipewire.wait().await;
}
if let Some(pid) = self.mutter_dbus_pid.take() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
let _ = tokio::fs::remove_dir_all(&self.runtime_dir).await;
tracing::debug!(id = self.id, "mutter compositor stopped");
Ok(())
}
fn id(&self) -> &str {
&self.id
}
fn wayland_display(&self) -> &str {
&self.wayland_display
}
fn runtime_dir(&self) -> &Path {
&self.runtime_dir
}
}
impl Drop for MutterCompositor {
fn drop(&mut self) {
self.state = None;
if let Some(ref mut child) = self.mutter {
let _ = child.start_kill();
}
if let Some(ref mut child) = self.wireplumber {
let _ = child.start_kill();
}
if let Some(ref mut child) = self.pipewire {
let _ = child.start_kill();
}
if let Some(pid) = self.mutter_dbus_pid {
unsafe {
libc::kill(pid as i32, libc::SIGKILL);
}
}
let _ = std::fs::remove_dir_all(&self.runtime_dir);
}
}
fn parse_dbus_address(output: &str) -> Result<String> {
for line in output.lines() {
if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_ADDRESS='") {
if let Some(addr) = rest.strip_suffix("';") {
return Ok(addr.to_string());
}
}
}
Err(Error::Process(
"could not parse DBUS_SESSION_BUS_ADDRESS from dbus-launch".to_string(),
))
}
fn parse_dbus_pid(output: &str) -> Result<u32> {
for line in output.lines() {
if let Some(rest) = line.strip_prefix("DBUS_SESSION_BUS_PID=") {
let pid_str = rest.trim_end_matches(';').trim();
return pid_str
.parse()
.map_err(|e| Error::Process(format!("invalid dbus PID: {e}")));
}
}
Err(Error::Process(
"could not parse DBUS_SESSION_BUS_PID from dbus-launch".to_string(),
))
}
/// Polls until `<runtime_dir>/<display>` exists.
///
/// The path is checked up to 50 times, sleeping 100 ms after every
/// unsuccessful check.
///
/// # Errors
///
/// Returns `Error::Timeout` when the path has not appeared after the last
/// check.
async fn wait_for_wayland_socket(runtime_dir: &str, display: &str) -> Result<()> {
    let socket_path = Path::new(runtime_dir).join(display);
    let poll_interval = std::time::Duration::from_millis(100);

    let mut checks_left = 50;
    while checks_left > 0 {
        if socket_path.exists() {
            return Ok(());
        }
        tokio::time::sleep(poll_interval).await;
        checks_left -= 1;
    }

    Err(Error::Timeout(format!(
        "wayland socket {} did not appear within 5s",
        socket_path.display()
    )))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample `dbus-launch --sh-syntax` output with both an address and a PID line.
    const FULL_OUTPUT: &str = "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\nDBUS_SESSION_BUS_PID=12345;\n";

    // --- dbus-launch output parsing -------------------------------------

    #[test]
    fn test_parse_dbus_address_valid() {
        let parsed = parse_dbus_address(FULL_OUTPUT).unwrap();
        assert_eq!(parsed, "unix:abstract=/tmp/dbus-XXX,guid=abc123");
    }

    #[test]
    fn test_parse_dbus_address_missing() {
        let result = parse_dbus_address("DBUS_SESSION_BUS_PID=12345;\n");
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_dbus_pid_valid() {
        let parsed = parse_dbus_pid(FULL_OUTPUT).unwrap();
        assert_eq!(parsed, 12345);
    }

    #[test]
    fn test_parse_dbus_pid_missing() {
        let result = parse_dbus_pid(
            "DBUS_SESSION_BUS_ADDRESS='unix:abstract=/tmp/dbus-XXX,guid=abc123';\n",
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_dbus_pid_invalid() {
        let result = parse_dbus_pid("DBUS_SESSION_BUS_PID=notanumber;\n");
        assert!(result.is_err());
    }

    // --- socket polling -------------------------------------------------

    /// A pre-existing file at the socket path makes the wait return at once.
    #[tokio::test]
    async fn test_wait_for_socket_found() {
        let tmp = tempfile::tempdir().unwrap();
        let runtime_dir = tmp.path().to_str().unwrap().to_owned();
        let display = "wayland-test-99";
        let socket_file = tmp.path().join(display);
        std::fs::File::create(socket_file).unwrap();

        let outcome = wait_for_wayland_socket(&runtime_dir, display).await;
        outcome.unwrap();
    }

    /// With nothing at the socket path the wait ends in `Error::Timeout`.
    #[tokio::test]
    async fn test_wait_for_socket_timeout() {
        let tmp = tempfile::tempdir().unwrap();
        let runtime_dir = tmp.path().to_str().unwrap().to_owned();
        let display = "wayland-nonexistent-0";

        let outcome = wait_for_wayland_socket(&runtime_dir, display).await;
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, Error::Timeout(_)),
            "expected Timeout, got: {err}"
        );
    }

    // --- constructor ----------------------------------------------------

    #[test]
    fn test_new_generates_unique_ids() {
        let first = MutterCompositor::new();
        let second = MutterCompositor::new();
        assert_ne!(first.id(), second.id());
    }

    #[test]
    fn test_new_wayland_display_contains_id() {
        let compositor = MutterCompositor::new();
        let display = compositor.wayland_display();
        let id = compositor.id();
        assert!(
            display.contains(id),
            "display '{}' should contain id '{}'",
            display,
            id
        );
    }

    #[test]
    fn test_new_runtime_dir_contains_id() {
        let compositor = MutterCompositor::new();
        let dir_str = compositor.runtime_dir().to_str().unwrap();
        let id = compositor.id();
        assert!(
            dir_str.contains(id),
            "runtime_dir '{}' should contain id '{}'",
            dir_str,
            id
        );
    }

    #[test]
    fn test_new_wayland_display_prefix() {
        let compositor = MutterCompositor::new();
        let display = compositor.wayland_display();
        assert!(display.starts_with("wayland-wd-"));
    }

    #[test]
    fn test_new_runtime_dir_contains_session_prefix() {
        let compositor = MutterCompositor::new();
        let dir_str = compositor.runtime_dir().to_str().unwrap();
        assert!(dir_str.contains("wd-session-"));
    }

    /// `state()` must panic while no session state exists.
    #[test]
    #[should_panic(expected = "before start")]
    fn test_state_panics_before_start() {
        let compositor = MutterCompositor::new();
        let _ = compositor.state();
    }

    #[test]
    fn test_default_same_structure_as_new() {
        let compositor = MutterCompositor::default();
        let display = compositor.wayland_display();
        let dir_str = compositor.runtime_dir().to_str().unwrap();
        assert!(display.starts_with("wayland-wd-"));
        assert!(dir_str.contains("wd-session-"));
    }
}