use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use atspi::connection::AccessibilityConnection;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Notify;
use crate::atspi as atspi_client;
use crate::backend::{CaptureBackend, CompositorRuntime, InputBackend};
use crate::capture::VideoRecorder;
use crate::error::{Error, Result};
use crate::locator::Locator;
/// Timeout used when `WAYDRIVER_DEFAULT_TIMEOUT_MS` is unset or does not parse
/// as an unsigned integer; also the initial timeout of test-support sessions.
const FALLBACK_DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
/// Name of the environment variable that overrides the default timeout.
/// Its value is interpreted as a number of milliseconds.
pub const DEFAULT_TIMEOUT_ENV_VAR: &str = "WAYDRIVER_DEFAULT_TIMEOUT_MS";
/// Launch parameters for a [`Session`]: which program to run and whether to
/// record video of it.
pub struct SessionConfig {
/// Program to execute; handed to `Command::new` when the app is spawned.
pub command: String,
/// Arguments passed to `command`.
pub args: Vec<String>,
/// Working directory for the app; when `None` no working directory is set on the command.
pub cwd: Option<String>,
/// Name the app is matched against when scanning the AT-SPI registry.
pub app_name: String,
/// When `Some`, a recording to this path is started as part of session start.
pub video_output: Option<PathBuf>,
/// Bitrate passed to the recorder; falls back to
/// `crate::capture::DEFAULT_VIDEO_BITRATE` when `None`.
pub video_bitrate: Option<u32>,
}
/// Shared buffer of stdout lines captured from the app, plus a notifier that
/// is signalled every time a line is appended.
#[derive(Default)]
struct AppStdout {
// Every captured line, in arrival order; lines are only ever appended.
lines: Mutex<Vec<String>>,
// Signalled with `notify_waiters()` after each push so waiters can re-scan `lines`.
notify: Notify,
}
/// A running app under test together with the compositor, input and capture
/// backends that drive it.
pub struct Session {
/// Session identifier, copied from the compositor's `id()` at start.
pub id: String,
/// App name taken from the [`SessionConfig`] (or supplied directly in test sessions).
pub app_name: String,
/// Bus name of the AT-SPI registry child that matched `app_name`; empty in test sessions.
pub app_bus_name: String,
/// Object path of the AT-SPI registry child that matched `app_name`; empty in test sessions.
pub app_path: String,
/// AT-SPI connection; `None` for sessions built with `new_for_test`.
pub a11y_connection: Option<AccessibilityConnection>,
// Default timeout stored as nanoseconds so it can be updated through `&self`.
default_timeout_ns: AtomicU64,
// Child process of the app under test; killed in `kill` and in `Drop`.
app: Child,
// Stream opened at session start; screenshots and video recording read from it.
keepalive_stream: Option<crate::backend::PipeWireStream>,
// Active recorder when `video_output` was configured.
video_recorder: Option<VideoRecorder>,
// Backend that receives key and pointer events.
input: Box<dyn InputBackend>,
// Backend used for streams, screenshots and recording.
capture: Box<dyn CaptureBackend>,
// Compositor the app is displayed on; stopped by `kill`.
compositor: Box<dyn CompositorRuntime>,
// Captured stdout lines, shared with the background reader task.
stdout: Arc<AppStdout>,
}
impl Session {
pub async fn start(
compositor: Box<dyn CompositorRuntime>,
input: Box<dyn InputBackend>,
capture: Box<dyn CaptureBackend>,
cfg: SessionConfig,
) -> Result<Self> {
let id = compositor.id().to_string();
tracing::info!(id, "starting session");
let dbus_address = get_host_session_bus()?;
let mut app = spawn_app(
&cfg,
compositor.wayland_display(),
compositor.runtime_dir(),
&dbus_address,
)?;
tracing::debug!(id, app_name = %cfg.app_name, "app spawned");
let stdout = Arc::new(AppStdout::default());
if let Some(child_stdout) = app.stdout.take() {
let captured = stdout.clone();
let id_for_task = id.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(child_stdout).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
tracing::trace!(id = id_for_task, line = %line, "app stdout");
{
let mut guard = captured.lines.lock().unwrap();
guard.push(line);
}
captured.notify.notify_waiters();
}
Ok(None) => break,
Err(e) => {
tracing::debug!(id = id_for_task, error = %e, "app stdout read error");
break;
}
}
}
});
}
let a11y_connection = atspi_client::connect_a11y(&dbus_address).await?;
let (app_bus_name, app_path) = wait_for_app(&a11y_connection, &cfg.app_name).await?;
tracing::info!(id, app_name = %cfg.app_name, %app_bus_name, "session ready");
let keepalive_stream = capture.start_stream().await?;
let video_recorder = if let Some(ref path) = cfg.video_output {
let bitrate = cfg
.video_bitrate
.unwrap_or(crate::capture::DEFAULT_VIDEO_BITRATE);
Some(
capture
.start_recording(&keepalive_stream, path, bitrate)
.await?,
)
} else {
None
};
let session = Session {
id,
app_name: cfg.app_name,
app_bus_name,
app_path,
a11y_connection: Some(a11y_connection),
default_timeout_ns: AtomicU64::new(resolve_default_timeout().as_nanos() as u64),
app,
keepalive_stream: Some(keepalive_stream),
video_recorder,
input,
capture,
compositor,
stdout,
};
Ok(session)
}
pub async fn kill(mut self) -> Result<()> {
tracing::info!(id = self.id, "killing session");
let _ = self.app.kill().await;
let _ = self.app.wait().await;
if let Some(recorder) = self.video_recorder.take() {
if let Err(e) = self.capture.stop_recording(recorder).await {
tracing::warn!(error = %e, "stop_recording failed");
}
}
if let Some(stream) = self.keepalive_stream.take() {
let _ = self.capture.stop_stream(stream).await;
}
self.compositor.stop().await?;
Ok(())
}
/// Forwards a single key press for `keysym` to the input backend and returns its result.
pub async fn press_keysym(&self, keysym: u32) -> Result<()> {
self.input.press_keysym(keysym).await
}
/// Presses a key chord such as `"Ctrl+Shift+A"`.
///
/// Modifiers are pressed in the order given, the target key is pressed,
/// and the modifiers are released in reverse order. The release pass always
/// runs for every modifier that was successfully pressed, including when a
/// later modifier or the target key fails, so no modifier is left held.
///
/// # Errors
///
/// Returns `Error::Process` when `chord` cannot be parsed. Otherwise
/// returns the first error from pressing a modifier or the target key.
/// Failures while releasing modifiers are logged as warnings, not returned.
pub async fn press_chord(&self, chord: &str) -> Result<()> {
    let parsed = crate::keysym::parse_chord(chord)
        .ok_or_else(|| Error::Process(format!("invalid chord: {chord:?}")))?;

    // Track which modifiers actually went down so a failure midway only
    // releases those, and does release them.
    let mut held = Vec::new();
    let mut outcome: Result<()> = Ok(());
    for m in &parsed.modifiers {
        match self.input.key_down(*m).await {
            Ok(()) => held.push(m),
            Err(e) => {
                outcome = Err(e);
                break;
            }
        }
    }

    // Only press the target when every modifier is down.
    if outcome.is_ok() {
        outcome = self.input.press_keysym(parsed.key).await;
    }

    for m in held.into_iter().rev() {
        if let Err(e) = self.input.key_up(*m).await {
            tracing::warn!(error = %e, keysym = m, "key_up failed during chord unwind");
        }
    }
    outcome
}
/// Forwards a relative pointer motion of (`dx`, `dy`) to the input backend and returns its result.
pub async fn pointer_motion_relative(&self, dx: f64, dy: f64) -> Result<()> {
self.input.pointer_motion_relative(dx, dy).await
}
/// Forwards a pointer button event for `button` to the input backend and returns its result.
pub async fn pointer_button(&self, button: u32) -> Result<()> {
self.input.pointer_button(button).await
}
/// Returns the Wayland display name reported by the compositor; the same
/// value is exported to the app as `WAYLAND_DISPLAY` when it is spawned.
pub fn wayland_display(&self) -> &str {
self.compositor.wayland_display()
}
pub async fn take_screenshot(&self) -> Result<Vec<u8>> {
let stream = self
.keepalive_stream
.as_ref()
.ok_or_else(|| Error::Screenshot("no keepalive stream".into()))?;
self.capture.grab_screenshot(stream).await
}
/// Returns the session's current default timeout, read from the stored
/// nanosecond count with relaxed ordering.
pub fn default_timeout(&self) -> Duration {
Duration::from_nanos(self.default_timeout_ns.load(Ordering::Relaxed))
}
/// Replaces the session's default timeout.
///
/// The value is stored as a `u64` nanosecond count. Durations too large to
/// fit (more than roughly 584 years) saturate at `u64::MAX` nanoseconds
/// instead of wrapping around to a small value.
pub fn set_default_timeout(&self, timeout: Duration) {
    // `as_nanos` yields a u128; clamp first so the narrowing cast is lossless.
    let nanos = timeout.as_nanos().min(u128::from(u64::MAX)) as u64;
    self.default_timeout_ns.store(nanos, Ordering::Relaxed);
}
/// Returns a copy of every stdout line captured so far.
///
/// # Panics
///
/// Panics if the line buffer's mutex is poisoned.
pub fn stdout_lines(&self) -> Vec<String> {
self.stdout.lines.lock().unwrap().clone()
}
/// Returns the number of stdout lines captured so far. Passing this value as
/// `after` to `wait_for_stdout_line` restricts the search to lines captured later.
///
/// # Panics
///
/// Panics if the line buffer's mutex is poisoned.
pub fn stdout_cursor(&self) -> usize {
self.stdout.lines.lock().unwrap().len()
}
pub async fn wait_for_stdout_line<F>(
&self,
after: usize,
pred: F,
timeout: Duration,
) -> Result<String>
where
F: Fn(&str) -> bool,
{
let deadline = tokio::time::Instant::now() + timeout;
loop {
let notified = self.stdout.notify.notified();
tokio::pin!(notified);
{
let guard = self.stdout.lines.lock().unwrap();
for line in guard.iter().skip(after) {
if pred(line) {
return Ok(line.clone());
}
}
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(Error::Timeout(format!(
"no stdout line matched within {timeout:?} (buffer had {} line(s) after cursor {after})",
self.stdout.lines.lock().unwrap().len().saturating_sub(after),
)));
}
if tokio::time::timeout(remaining, &mut notified)
.await
.is_err()
{
return Err(Error::Timeout(format!(
"no stdout line matched within {timeout:?} (buffer had {} line(s) after cursor {after})",
self.stdout.lines.lock().unwrap().len().saturating_sub(after),
)));
}
}
}
pub async fn dump_tree(&self) -> Result<String> {
let a11y = self
.a11y_connection
.as_ref()
.ok_or_else(|| Error::Atspi("session has no AT-SPI connection".into()))?;
atspi_client::snapshot_tree(a11y, &self.app_bus_name, &self.app_path).await
}
}
/// Locator constructors. They take `&Arc<Self>` because each [`Locator`]
/// receives its own clone of the session handle.
impl Session {
    /// Builds a [`Locator`] for an arbitrary XPath expression.
    pub fn locate(self: &Arc<Self>, xpath: &str) -> Locator {
        let session = Arc::clone(self);
        Locator::new(session, String::from(xpath))
    }

    /// Builds a [`Locator`] for the XPath `/*`.
    pub fn root(self: &Arc<Self>) -> Locator {
        self.locate("/*")
    }

    /// Builds a [`Locator`] matching any element whose `id` attribute equals `id`.
    pub fn find_by_id(self: &Arc<Self>, id: &str) -> Locator {
        let query = find_by_id_xpath(id);
        self.locate(query.as_str())
    }

    /// Builds a [`Locator`] matching any element whose `name` attribute equals `name`.
    pub fn find_by_name(self: &Arc<Self>, name: &str) -> Locator {
        let query = find_by_name_xpath(name);
        self.locate(query.as_str())
    }

    /// Builds a [`Locator`] matching elements named `role` whose `name`
    /// attribute equals `name`.
    pub fn find_by_role_name(self: &Arc<Self>, role: &str, name: &str) -> Locator {
        let query = find_by_role_name_xpath(role, name);
        self.locate(query.as_str())
    }
}
/// Builds the XPath `//*[@id=<literal>]`, with `id` quoted by `xpath_literal`.
fn find_by_id_xpath(id: &str) -> String {
    let quoted = xpath_literal(id);
    format!("//*[@id={quoted}]")
}
/// Builds the XPath `//*[@name=<literal>]`, with `name` quoted by `xpath_literal`.
fn find_by_name_xpath(name: &str) -> String {
    let quoted = xpath_literal(name);
    format!("//*[@name={quoted}]")
}
/// Builds the XPath `//<role>[@name=<literal>]`.
///
/// `role` is inserted verbatim as the element name; it is not quoted or
/// escaped. `name` is quoted by `xpath_literal`.
fn find_by_role_name_xpath(role: &str, name: &str) -> String {
    let quoted_name = xpath_literal(name);
    format!("//{role}[@name={quoted_name}]")
}
/// Renders `s` as an XPath string expression.
///
/// * no apostrophe: wrapped in single quotes;
/// * apostrophes but no double quote: wrapped in double quotes;
/// * both kinds of quote: a `concat(...)` call whose single-quoted pieces are
///   joined by the double-quoted literal `"'"`.
fn xpath_literal(s: &str) -> String {
    if !s.contains('\'') {
        return format!("'{s}'");
    }
    if !s.contains('"') {
        return format!("\"{s}\"");
    }
    // Split on the apostrophes; each remaining piece is safe inside single quotes.
    let pieces: Vec<String> = s
        .split('\'')
        .map(|piece| format!("'{piece}'"))
        .collect();
    format!("concat({})", pieces.join(", \"'\", "))
}
/// Helpers for building sessions without a real app, compositor or AT-SPI bus.
#[cfg(any(test, feature = "test-support"))]
impl Session {
    /// Builds a session around a placeholder `sleep 86400` child process.
    ///
    /// The session has no AT-SPI connection, no capture stream and no
    /// recorder; its bus name and object path are empty and its default
    /// timeout is `FALLBACK_DEFAULT_TIMEOUT`.
    ///
    /// # Panics
    ///
    /// Panics if the `sleep` process cannot be spawned.
    pub fn new_for_test(
        id: String,
        app_name: String,
        input: Box<dyn InputBackend>,
        capture: Box<dyn CaptureBackend>,
        compositor: Box<dyn CompositorRuntime>,
    ) -> Self {
        // A long-lived, silent child stands in for the app under test.
        let mut placeholder = Command::new("sleep");
        placeholder
            .arg("86400")
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        let app = placeholder
            .spawn()
            .expect("failed to spawn sleep for test session");

        let default_timeout_ns = AtomicU64::new(FALLBACK_DEFAULT_TIMEOUT.as_nanos() as u64);
        let stdout = Arc::new(AppStdout::default());

        Session {
            id,
            app_name,
            app_bus_name: String::new(),
            app_path: String::new(),
            a11y_connection: None,
            default_timeout_ns,
            app,
            keepalive_stream: None,
            video_recorder: None,
            input,
            capture,
            compositor,
            stdout,
        }
    }

    /// Appends `line` to the captured-stdout buffer and wakes any waiters,
    /// the same two steps the stdout reader task performs for a real line.
    pub fn push_stdout_line_for_test(&self, line: impl Into<String>) {
        // The guard is a temporary, so the lock is released before notifying.
        self.stdout.lines.lock().unwrap().push(line.into());
        self.stdout.notify.notify_waiters();
    }
}
/// Signals the app process to die when the session is dropped, so the child
/// does not outlive a session that was never explicitly `kill`ed.
impl Drop for Session {
fn drop(&mut self) {
// Best effort: the result is ignored (the child may already have exited),
// and the exit status is not awaited here.
let _ = self.app.start_kill();
}
}
/// Reads the default timeout from the `DEFAULT_TIMEOUT_ENV_VAR` environment
/// variable, interpreted as whole milliseconds.
///
/// Falls back to `FALLBACK_DEFAULT_TIMEOUT` when the variable is unset, not
/// valid Unicode, or does not parse as a `u64`.
fn resolve_default_timeout() -> Duration {
    match std::env::var(DEFAULT_TIMEOUT_ENV_VAR) {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => Duration::from_millis(millis),
            Err(_) => FALLBACK_DEFAULT_TIMEOUT,
        },
        Err(_) => FALLBACK_DEFAULT_TIMEOUT,
    }
}
/// Resolves the D-Bus session bus address to hand to the app and to AT-SPI.
///
/// Reads `DBUS_SESSION_BUS_ADDRESS` and defers to
/// `get_host_session_bus_inner` for the fallback. The current implementation
/// always returns `Ok`.
fn get_host_session_bus() -> Result<String> {
    let from_env = std::env::var("DBUS_SESSION_BUS_ADDRESS").ok();
    let address = get_host_session_bus_inner(from_env.as_deref());
    Ok(address)
}
/// Picks the session bus address: `env_addr` when it is present and
/// non-empty, otherwise `unix:path=/run/user/<uid>/bus` for the current user.
///
/// An empty value is treated the same as an unset variable, since an empty
/// string is not a usable bus address.
fn get_host_session_bus_inner(env_addr: Option<&str>) -> String {
    if let Some(addr) = env_addr.filter(|addr| !addr.is_empty()) {
        return addr.to_string();
    }
    // SAFETY: `getuid` takes no arguments, touches no caller memory and is
    // specified to always succeed.
    let uid = unsafe { libc::getuid() };
    format!("unix:path=/run/user/{}/bus", uid)
}
fn spawn_app(
cfg: &SessionConfig,
wayland_display: &str,
runtime_dir: &Path,
dbus_address: &str,
) -> Result<Child> {
let config_dir = runtime_dir.join("config");
let _ = std::fs::create_dir_all(&config_dir);
let mut cmd = Command::new(&cfg.command);
cmd.args(&cfg.args)
.env("WAYLAND_DISPLAY", wayland_display)
.env("DBUS_SESSION_BUS_ADDRESS", dbus_address)
.env("XDG_RUNTIME_DIR", runtime_dir)
.env("XDG_CONFIG_HOME", &config_dir)
.env("GSETTINGS_BACKEND", "keyfile")
.env("NO_AT_BRIDGE", "0")
.env("GTK_A11Y", "atspi")
.stdout(Stdio::piped())
.stderr(Stdio::null());
if let Some(dir) = &cfg.cwd {
cmd.current_dir(dir);
}
cmd.spawn()
.map_err(|e| Error::Process(format!("app '{}': {e}", cfg.command)))
}
/// Canonical form used for app-name comparison: lowercased, with every
/// hyphen replaced by a space.
fn normalize_app_name(name: &str) -> String {
    name.to_lowercase()
        .chars()
        .map(|c| if c == '-' { ' ' } else { c })
        .collect()
}
/// Reports whether an AT-SPI app name and the requested name refer to the
/// same app.
///
/// Both names are normalized with `normalize_app_name`; they match when
/// either normalized name contains the other. An empty name on either side
/// never matches.
fn app_name_matches(found: &str, target: &str) -> bool {
    let both_present = !found.is_empty() && !target.is_empty();
    if !both_present {
        return false;
    }
    let found_key = normalize_app_name(found);
    let target_key = normalize_app_name(target);
    found_key.contains(target_key.as_str()) || target_key.contains(found_key.as_str())
}
/// Polls the AT-SPI registry until a child whose name matches `app_name`
/// appears, and returns that child's `(bus_name, object_path)`.
///
/// The registry is scanned up to `MAX_ATTEMPTS` times with `POLL_INTERVAL`
/// between scans. Errors from individual registry or accessible lookups are
/// treated as "not there yet" and skipped. The names seen are logged at
/// debug level on every `LOG_EVERY`-th attempt whose scan succeeded.
///
/// # Errors
///
/// Returns `Error::Timeout` when no matching app was found in any attempt.
async fn wait_for_app(conn: &AccessibilityConnection, app_name: &str) -> Result<(String, String)> {
    const MAX_ATTEMPTS: u32 = 100;
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    const LOG_EVERY: u32 = 20;

    for attempt in 0..MAX_ATTEMPTS {
        // Sleep *between* attempts only, so the timeout error is not delayed
        // by a pointless sleep after the final scan.
        if attempt > 0 {
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        let Ok(root) = atspi_client::get_registry_root(conn).await else {
            continue;
        };
        let Ok(children) = root.get_children().await else {
            continue;
        };

        let mut found_names = Vec::new();
        for child_ref in &children {
            let Some(bus_name) = child_ref.name_as_str() else {
                continue;
            };
            let path = child_ref.path_as_str();
            let Ok(child) =
                atspi_client::build_accessible(conn.connection(), bus_name, path).await
            else {
                continue;
            };
            let Ok(name) = child.name().await else {
                continue;
            };
            if app_name_matches(&name, app_name) {
                tracing::info!(
                    "found app '{}' as '{}' at {}:{}",
                    app_name,
                    name,
                    bus_name,
                    path
                );
                return Ok((bus_name.to_string(), path.to_string()));
            }
            found_names.push(name);
        }

        if attempt % LOG_EVERY == 0 {
            tracing::debug!(
                "AT-SPI registry has {} apps: {:?} (looking for '{}')",
                found_names.len(),
                found_names,
                app_name
            );
        }
    }

    // Derived from the constants so the message cannot drift from the budget
    // (100 x 100ms renders as "10s").
    Err(Error::Timeout(format!(
        "app '{}' did not appear in AT-SPI registry within {:?}",
        app_name,
        POLL_INTERVAL * MAX_ATTEMPTS
    )))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::PipeWireStream;
    use async_trait::async_trait;

    // ---- Shared stub backends -------------------------------------------
    //
    // These were previously re-declared inside every test that needed a
    // session; they are defined once here and reused.

    /// Compositor stand-in whose lifecycle calls always succeed.
    struct StubCompositor;

    #[async_trait]
    impl CompositorRuntime for StubCompositor {
        async fn start(&mut self, _r: Option<&str>) -> Result<()> {
            Ok(())
        }
        async fn stop(&mut self) -> Result<()> {
            Ok(())
        }
        fn id(&self) -> &str {
            "s"
        }
        fn wayland_display(&self) -> &str {
            "d"
        }
        fn runtime_dir(&self) -> &Path {
            Path::new("/tmp")
        }
    }

    /// Input backend that accepts every event and records nothing.
    struct StubInput;

    #[async_trait]
    impl InputBackend for StubInput {
        async fn press_keysym(&self, _: u32) -> Result<()> {
            Ok(())
        }
        async fn key_down(&self, _: u32) -> Result<()> {
            Ok(())
        }
        async fn key_up(&self, _: u32) -> Result<()> {
            Ok(())
        }
        async fn pointer_motion_relative(&self, _: f64, _: f64) -> Result<()> {
            Ok(())
        }
        async fn pointer_button(&self, _: u32) -> Result<()> {
            Ok(())
        }
    }

    /// Capture backend that never produces a stream; tests built on it must
    /// not call `start_stream`.
    struct StubCapture;

    #[async_trait]
    impl CaptureBackend for StubCapture {
        async fn start_stream(&self) -> Result<PipeWireStream> {
            unimplemented!()
        }
        async fn stop_stream(&self, _: PipeWireStream) -> Result<()> {
            Ok(())
        }
        fn pipewire_socket(&self) -> PathBuf {
            PathBuf::from("/tmp")
        }
    }

    /// Builds a test session around `input`, with stub capture and compositor.
    fn make_test_session_with_input(input: Box<dyn InputBackend>) -> Session {
        Session::new_for_test(
            "t".into(),
            "a".into(),
            input,
            Box::new(StubCapture),
            Box::new(StubCompositor),
        )
    }

    /// Builds a test session made entirely of stubs.
    fn make_test_session() -> Session {
        make_test_session_with_input(Box::new(StubInput))
    }

    // ---- Session bus address ---------------------------------------------

    #[test]
    fn test_get_host_session_bus_from_env() {
        let addr = "unix:path=/run/user/1000/bus";
        let result = get_host_session_bus_inner(Some(addr));
        assert_eq!(result, addr);
    }

    #[test]
    fn test_get_host_session_bus_fallback() {
        let result = get_host_session_bus_inner(None);
        assert!(
            result.contains("/run/user/"),
            "expected /run/user/ path, got: {result}"
        );
    }

    // ---- App name matching -----------------------------------------------

    #[test]
    fn test_normalize_app_name_lowercase() {
        assert_eq!(normalize_app_name("GNOME-Calculator"), "gnome calculator");
    }

    #[test]
    fn test_normalize_app_name_hyphens_to_spaces() {
        assert_eq!(normalize_app_name("gnome-text-editor"), "gnome text editor");
    }

    #[test]
    fn test_normalize_app_name_already_normal() {
        assert_eq!(normalize_app_name("calculator"), "calculator");
    }

    #[test]
    fn test_normalize_app_name_empty() {
        assert_eq!(normalize_app_name(""), "");
    }

    #[test]
    fn test_app_name_matches_exact() {
        assert!(app_name_matches("Calculator", "calculator"));
    }

    #[test]
    fn test_app_name_matches_target_contains_found() {
        assert!(app_name_matches("Calculator", "gnome-calculator"));
    }

    #[test]
    fn test_app_name_matches_found_contains_target() {
        assert!(app_name_matches(
            "GNOME Calculator 46.1",
            "gnome-calculator"
        ));
    }

    #[test]
    fn test_app_name_matches_no_match() {
        assert!(!app_name_matches("Firefox", "gnome-calculator"));
    }

    #[test]
    fn test_app_name_matches_hyphen_vs_space() {
        assert!(app_name_matches("gnome calculator", "gnome-calculator"));
    }

    #[test]
    fn test_app_name_matches_empty_target() {
        assert!(!app_name_matches("Calculator", ""));
    }

    #[test]
    fn test_app_name_matches_empty_found() {
        assert!(!app_name_matches("", "calculator"));
    }

    #[test]
    fn test_app_name_matches_both_empty() {
        assert!(!app_name_matches("", ""));
    }

    // ---- XPath construction ----------------------------------------------

    #[test]
    fn xpath_literal_plain() {
        assert_eq!(xpath_literal("OK"), "'OK'");
    }

    #[test]
    fn xpath_literal_with_apostrophe() {
        assert_eq!(xpath_literal("John's"), "\"John's\"");
    }

    #[test]
    fn xpath_literal_with_double_quote() {
        assert_eq!(xpath_literal("a\"b"), "'a\"b'");
    }

    #[test]
    fn xpath_literal_with_both_quotes() {
        let out = xpath_literal("a'b\"c");
        assert_eq!(out, "concat('a', \"'\", 'b\"c')");
    }

    #[test]
    fn find_by_id_xpath_simple() {
        assert_eq!(find_by_id_xpath("submit-btn"), "//*[@id='submit-btn']");
    }

    #[test]
    fn find_by_id_xpath_escapes_apostrophe() {
        assert_eq!(find_by_id_xpath("a'b"), "//*[@id=\"a'b\"]");
    }

    #[test]
    fn find_by_name_xpath_simple() {
        assert_eq!(find_by_name_xpath("OK"), "//*[@name='OK']");
    }

    #[test]
    fn find_by_name_xpath_with_space() {
        assert_eq!(find_by_name_xpath("Save As"), "//*[@name='Save As']");
    }

    #[test]
    fn find_by_name_xpath_with_both_quotes_uses_concat() {
        assert_eq!(
            find_by_name_xpath("John's \"file\""),
            "//*[@name=concat('John', \"'\", 's \"file\"')]"
        );
    }

    #[test]
    fn find_by_role_name_xpath_composes_role_and_name() {
        assert_eq!(
            find_by_role_name_xpath("PushButton", "OK"),
            "//PushButton[@name='OK']"
        );
    }

    #[test]
    fn find_by_role_name_xpath_preserves_role_as_element_name() {
        assert_eq!(
            find_by_role_name_xpath("MenuItem", "File"),
            "//MenuItem[@name='File']"
        );
    }

    // ---- Default timeout -------------------------------------------------

    // All env-var cases live in one test on purpose: the variable is
    // process-global and tests run in parallel.
    #[test]
    fn resolve_default_timeout_cases() {
        std::env::remove_var(DEFAULT_TIMEOUT_ENV_VAR);
        assert_eq!(resolve_default_timeout(), FALLBACK_DEFAULT_TIMEOUT);
        std::env::set_var(DEFAULT_TIMEOUT_ENV_VAR, "750");
        assert_eq!(resolve_default_timeout(), Duration::from_millis(750));
        std::env::set_var(DEFAULT_TIMEOUT_ENV_VAR, "not-a-number");
        assert_eq!(resolve_default_timeout(), FALLBACK_DEFAULT_TIMEOUT);
        std::env::set_var(DEFAULT_TIMEOUT_ENV_VAR, "");
        assert_eq!(resolve_default_timeout(), FALLBACK_DEFAULT_TIMEOUT);
        std::env::remove_var(DEFAULT_TIMEOUT_ENV_VAR);
    }

    #[tokio::test]
    async fn session_default_timeout_can_be_overridden() {
        let s = make_test_session();
        assert_eq!(s.default_timeout(), FALLBACK_DEFAULT_TIMEOUT);
        s.set_default_timeout(Duration::from_millis(1234));
        assert_eq!(s.default_timeout(), Duration::from_millis(1234));
    }

    // ---- Chords ----------------------------------------------------------

    #[tokio::test]
    async fn press_chord_issues_modifiers_then_target_then_releases_in_reverse() {
        #[derive(Debug, PartialEq, Eq)]
        enum Event {
            Down(u32),
            Up(u32),
            Press(u32),
        }

        /// Input backend that appends every key event to a shared log.
        struct RecordingInput(Arc<Mutex<Vec<Event>>>);

        #[async_trait]
        impl InputBackend for RecordingInput {
            async fn press_keysym(&self, k: u32) -> Result<()> {
                self.0.lock().unwrap().push(Event::Press(k));
                Ok(())
            }
            async fn key_down(&self, k: u32) -> Result<()> {
                self.0.lock().unwrap().push(Event::Down(k));
                Ok(())
            }
            async fn key_up(&self, k: u32) -> Result<()> {
                self.0.lock().unwrap().push(Event::Up(k));
                Ok(())
            }
            async fn pointer_motion_relative(&self, _: f64, _: f64) -> Result<()> {
                Ok(())
            }
            async fn pointer_button(&self, _: u32) -> Result<()> {
                Ok(())
            }
        }

        let events = Arc::new(Mutex::new(Vec::<Event>::new()));
        let s = make_test_session_with_input(Box::new(RecordingInput(events.clone())));
        s.press_chord("Ctrl+Shift+A").await.unwrap();

        let ctrl = 0xffe3_u32;
        let shift = 0xffe1_u32;
        let a = crate::keysym::char_to_keysym('A');
        let got: Vec<Event> = std::mem::take(&mut *events.lock().unwrap());
        assert_eq!(got.len(), 5);
        assert_eq!(
            got,
            vec![
                Event::Down(ctrl),
                Event::Down(shift),
                Event::Press(a),
                Event::Up(shift),
                Event::Up(ctrl),
            ]
        );
    }

    #[tokio::test]
    async fn press_chord_rejects_garbage() {
        let s = make_test_session();
        let err = s.press_chord("Hyper+Nope").await.unwrap_err();
        assert!(
            matches!(err, Error::Process(ref m) if m.contains("invalid chord")),
            "expected process:invalid chord, got {err:?}"
        );
    }

    // ---- Stdout waiting --------------------------------------------------

    #[tokio::test]
    async fn wait_for_stdout_line_returns_existing_match_immediately() {
        let s = make_test_session();
        s.push_stdout_line_for_test("fixture-event: clicked primary-button");
        let line = s
            .wait_for_stdout_line(
                0,
                |l| l.contains("clicked primary-button"),
                Duration::from_millis(100),
            )
            .await
            .expect("should match existing line");
        assert!(line.contains("clicked primary-button"));
    }

    #[tokio::test]
    async fn wait_for_stdout_line_respects_after_cursor() {
        let s = make_test_session();
        s.push_stdout_line_for_test("some startup chatter");
        s.push_stdout_line_for_test("fixture-event: clicked old-button");
        let cursor = s.stdout_cursor();
        assert_eq!(cursor, 2);
        s.push_stdout_line_for_test("fixture-event: clicked new-button");
        let line = s
            .wait_for_stdout_line(
                cursor,
                |l| l.contains("clicked"),
                Duration::from_millis(100),
            )
            .await
            .expect("should match line after cursor");
        assert!(line.contains("new-button"), "got: {line}");
    }

    #[tokio::test]
    async fn wait_for_stdout_line_wakes_on_notify() {
        let s = Arc::new(make_test_session());
        let cursor = s.stdout_cursor();
        let pusher = s.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            pusher.push_stdout_line_for_test("fixture-event: clicked async-button");
        });
        let line = s
            .wait_for_stdout_line(
                cursor,
                |l| l.contains("async-button"),
                Duration::from_secs(2),
            )
            .await
            .expect("should wake on notify");
        assert!(line.contains("async-button"));
    }

    #[tokio::test]
    async fn wait_for_stdout_line_times_out_when_no_match() {
        let s = make_test_session();
        let err = s
            .wait_for_stdout_line(0, |l| l == "never", Duration::from_millis(50))
            .await
            .unwrap_err();
        assert!(matches!(err, Error::Timeout(_)), "got: {err:?}");
    }
}