use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use futures_util::StreamExt;
use zbus::zvariant::{OwnedObjectPath, Value};
use waydriver::{CaptureBackend, Error, PipeWireStream, Result, StreamToken};
use waydriver_compositor_mutter::MutterState;
/// Capture backend that talks to Mutter over D-Bus using the shared
/// [`MutterState`].
pub struct MutterCapture {
    /// Shared compositor state; the backend reads the D-Bus connection,
    /// remote-desktop session details and runtime directory from it.
    state: Arc<MutterState>,
}
impl MutterCapture {
pub fn new(state: Arc<MutterState>) -> Self {
Self { state }
}
}
/// D-Bus destination name of Mutter's ScreenCast service.
const SCREENCAST_SERVICE: &str = "org.gnome.Mutter.ScreenCast";
/// Interface of the ScreenCast root object (same string as the service name).
const SCREENCAST_IFACE: &str = "org.gnome.Mutter.ScreenCast";
/// Interface implemented by individual ScreenCast session objects.
const SCREENCAST_SESSION_IFACE: &str = "org.gnome.Mutter.ScreenCast.Session";
/// Interface implemented by individual ScreenCast stream objects.
const SCREENCAST_STREAM_IFACE: &str = "org.gnome.Mutter.ScreenCast.Stream";
/// How long `start_stream` waits for the `PipeWireStreamAdded` signal.
const STREAM_ADDED_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

#[async_trait]
impl CaptureBackend for MutterCapture {
    /// Starts a monitor recording and returns the resulting PipeWire stream.
    ///
    /// Steps, in order:
    /// 1. `CreateSession` on the ScreenCast root object, passing the
    ///    remote-desktop session id from the shared state.
    /// 2. `RecordMonitor` on the new session to obtain a stream object path.
    /// 3. Subscribe to `PipeWireStreamAdded` on that stream object.
    /// 4. Call `Start` — on the remote-desktop session the first time, on the
    ///    ScreenCast session on later calls.
    /// 5. Wait (bounded by [`STREAM_ADDED_TIMEOUT`]) for the signal carrying
    ///    the PipeWire node id.
    ///
    /// On success the stream path is recorded in the shared state and the
    /// session path is stored in the returned stream's token so that
    /// `stop_stream` can address the session.
    ///
    /// # Errors
    ///
    /// Returns a screenshot error if any D-Bus call or reply parse fails, if
    /// the signal stream ends, or if the signal does not arrive in time. Lock
    /// accessor errors from the shared state are propagated as-is.
    ///
    /// NOTE(review): every early return after `CreateSession` leaves the new
    /// session without a `Stop` call — confirm whether cleanup is needed.
    async fn start_stream(&self) -> Result<PipeWireStream> {
        let conn = self.state.conn();

        // Tie the new ScreenCast session to the existing remote-desktop session.
        let mut create_opts: HashMap<&str, Value> = HashMap::new();
        create_opts.insert(
            "remote-desktop-session-id",
            Value::from(self.state.rd_session_id()),
        );
        let create_reply = conn
            .call_method(
                Some(SCREENCAST_SERVICE),
                "/org/gnome/Mutter/ScreenCast",
                Some(SCREENCAST_IFACE),
                "CreateSession",
                &(create_opts,),
            )
            .await
            .map_err(|e| Error::screenshot_with("CreateSession", e))?;
        let session_path: OwnedObjectPath = create_reply
            .body()
            .deserialize()
            .map_err(|e| Error::screenshot_with("parse session path", e))?;

        // Empty connector name and no options.
        // NOTE(review): presumably an empty connector lets Mutter choose the
        // monitor — confirm against the Mutter ScreenCast D-Bus API.
        let record_opts: HashMap<&str, Value> = HashMap::new();
        let record_reply = conn
            .call_method(
                Some(SCREENCAST_SERVICE),
                session_path.as_str(),
                Some(SCREENCAST_SESSION_IFACE),
                "RecordMonitor",
                &("", record_opts),
            )
            .await
            .map_err(|e| Error::screenshot_with("RecordMonitor", e))?;
        let stream_path: OwnedObjectPath = record_reply
            .body()
            .deserialize()
            .map_err(|e| Error::screenshot_with("parse stream path", e))?;

        // The signal subscription is set up before `Start` is issued,
        // presumably so the signal cannot fire before we are listening.
        let stream_proxy: zbus::Proxy<'_> = zbus::proxy::Builder::new(conn)
            .destination(SCREENCAST_SERVICE)
            .map_err(|e| Error::screenshot_with("proxy destination", e))?
            .path(stream_path.as_str())
            .map_err(|e| Error::screenshot_with("proxy path", e))?
            .interface(SCREENCAST_STREAM_IFACE)
            .map_err(|e| Error::screenshot_with("proxy interface", e))?
            .build()
            .await
            .map_err(|e| Error::screenshot_with("build stream proxy", e))?;
        let mut stream_added = stream_proxy
            .receive_signal("PipeWireStreamAdded")
            .await
            .map_err(|e| Error::screenshot_with("receive_signal", e))?;

        // Atomically read-and-set the "remote desktop started" flag. The guard
        // lives only inside this block so it is released before any `.await`.
        // NOTE(review): the flag is set before the `Start` call below has
        // succeeded, so a failed `Start` leaves it set and later calls take the
        // ScreenCast branch — confirm that this is intended.
        let rd_already_started = {
            let mut started = self.state.rd_started_lock()?;
            std::mem::replace(&mut *started, true)
        };

        if !rd_already_started {
            // First stream: start the remote-desktop session itself.
            conn.call_method(
                Some("org.gnome.Mutter.RemoteDesktop"),
                self.state.rd_session_path(),
                Some("org.gnome.Mutter.RemoteDesktop.Session"),
                "Start",
                &(),
            )
            .await
            .map_err(|e| Error::screenshot_with("RemoteDesktop Start", e))?;
        } else {
            // Later streams: start only the new ScreenCast session.
            conn.call_method(
                Some(SCREENCAST_SERVICE),
                session_path.as_str(),
                Some(SCREENCAST_SESSION_IFACE),
                "Start",
                &(),
            )
            .await
            .map_err(|e| Error::screenshot_with("Start", e))?;
        }

        // Outer `?` handles the timeout, inner `?` the signal/parse result.
        let node_id: u32 = tokio::time::timeout(STREAM_ADDED_TIMEOUT, async {
            let signal = stream_added
                .next()
                .await
                .ok_or_else(|| Error::screenshot("signal stream ended"))?;
            signal
                .body()
                .deserialize::<u32>()
                .map_err(|e| Error::screenshot_with("parse node_id", e))
        })
        .await
        .map_err(|_| Error::screenshot("timeout waiting for PipeWireStreamAdded"))??;
        tracing::debug!(node_id, "got PipeWire node id for screenshot");

        *self.state.active_stream_path_lock()? = Some(stream_path.to_string());

        Ok(PipeWireStream {
            node_id,
            token: StreamToken::new(session_path),
        })
    }

    /// Stops the ScreenCast session behind `stream` and clears the recorded
    /// active stream path.
    ///
    /// The D-Bus `Stop` call is best-effort: a failure is logged at debug
    /// level and otherwise ignored, and the active stream path is cleared
    /// either way.
    ///
    /// # Errors
    ///
    /// Fails if the stream token does not hold an `OwnedObjectPath`, or if the
    /// active-stream-path lock accessor returns an error.
    async fn stop_stream(&self, stream: PipeWireStream) -> Result<()> {
        let session_path = stream.token.downcast::<OwnedObjectPath>()?;

        let stop_result = self
            .state
            .conn()
            .call_method(
                Some(SCREENCAST_SERVICE),
                session_path.as_str(),
                Some(SCREENCAST_SESSION_IFACE),
                "Stop",
                &(),
            )
            .await;
        if let Err(err) = stop_result {
            // Deliberately non-fatal, but no longer silent.
            tracing::debug!(
                session = session_path.as_str(),
                error = %err,
                "ScreenCast session Stop failed; ignoring"
            );
        }

        *self.state.active_stream_path_lock()? = None;
        Ok(())
    }

    /// Returns the path `pipewire-0` inside the state's runtime directory.
    fn pipewire_socket(&self) -> PathBuf {
        self.state.runtime_dir().join("pipewire-0")
    }
}