use std::{
io::{self, Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream},
path::Path,
process::{Child, Command, ExitStatus, Stdio},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
thread::JoinHandle,
time::{Duration, Instant},
};
use regex::Regex;
use serde::Deserialize;
/// Errors produced while launching, attaching to, or driving a Speculos instance.
#[derive(Debug)]
pub enum Error {
/// Underlying I/O failure (process spawn, sockets, files, or a missing ELF path).
Io(io::Error),
/// HTTP request failure reported by `ureq` (stored boxed).
Http(Box<ureq::Error>),
/// JSON error reported by `serde_json`.
Json(serde_json::Error),
/// Readiness polling passed its deadline before the emulator answered.
LaunchTimeout,
/// `reset` did not observe a successful `/events` response before its deadline.
ResetTimeout,
/// The child process exited while readiness was still being polled.
SpeculosExited {
/// Exit status reported by the child process.
status: ExitStatus,
/// Text captured from the child's stderr; may be empty.
stderr: String,
},
/// No matching screen event was seen before the `wait_for*` deadline.
WaitTimeout,
/// A button operation (`press`) was requested on a touchscreen model.
TouchscreenOnly,
/// A touch operation (`tap`, `drag`, ...) was requested on a button model.
ButtonsOnly,
/// The screenshot response did not start with the PNG signature.
ScreenshotNotPng,
/// A required environment variable is not set; the payload is its name.
EnvMissing(String),
/// A variable (or model string) named `var` has an unusable value, explained by `reason`.
EnvInvalid { var: String, reason: String },
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl From<ureq::Error> for Error {
fn from(e: ureq::Error) -> Self {
Error::Http(Box::new(e))
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::Json(e)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(e) => write!(f, "speculos io: {e}"),
Error::Http(e) => write!(f, "speculos http: {e}"),
Error::Json(e) => write!(f, "speculos json: {e}"),
Error::LaunchTimeout => write!(f, "speculos launch timeout"),
Error::ResetTimeout => write!(f, "speculos reset timeout"),
Error::SpeculosExited { status, stderr } => {
if stderr.is_empty() {
write!(f, "speculos process exited before ready: {status}")
} else {
write!(
f,
"speculos process exited before ready: {status}\nstderr:\n{stderr}"
)
}
}
Error::WaitTimeout => write!(f, "speculos wait_for timeout"),
Error::TouchscreenOnly => write!(f, "press() not supported on touchscreen models"),
Error::ButtonsOnly => write!(
f,
"tap/drag/press_at/release_at/move_to not supported on button models"
),
Error::ScreenshotNotPng => write!(f, "speculos screenshot: response is not a PNG"),
Error::EnvMissing(v) => write!(f, "speculos env: missing {v}"),
Error::EnvInvalid { var, reason } => {
write!(f, "speculos env: invalid {var}: {reason}")
}
}
}
}
/// Marks [`Error`] as a standard error type; no method is overridden, so the
/// trait's default implementations apply.
impl std::error::Error for Error {}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Device models the emulator can be asked to run.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Model {
    /// Button-driven model, identified as `nanox`.
    NanoX,
    /// Button-driven model, identified as `nanosp`.
    NanoSP,
    /// Touchscreen model, identified as `stax`.
    Stax,
    /// Touchscreen model, identified as `flex`.
    Flex,
}

impl Model {
    /// Lowercase identifier of the model (the value handed to the emulator's
    /// `--model` flag when a process is spawned).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::NanoX => "nanox",
            Self::NanoSP => "nanosp",
            Self::Stax => "stax",
            Self::Flex => "flex",
        }
    }

    /// Returns `true` for models operated with buttons and `false` for
    /// touchscreen models.
    pub fn is_buttons(self) -> bool {
        match self {
            Self::NanoX | Self::NanoSP => true,
            Self::Stax | Self::Flex => false,
        }
    }

    /// Screen dimensions of the model — presumably `(width, height)` in pixels.
    pub fn screen_size(self) -> (u32, u32) {
        // Both button models share the same screen size.
        const NANO_SCREEN: (u32, u32) = (128, 64);
        match self {
            Self::NanoX => NANO_SCREEN,
            Self::NanoSP => NANO_SCREEN,
            Self::Stax => (400, 672),
            Self::Flex => (480, 600),
        }
    }
}
impl std::str::FromStr for Model {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s.to_ascii_lowercase().as_str() {
"nanox" => Ok(Model::NanoX),
"nanosp" | "nanosplus" | "nanos+" => Ok(Model::NanoSP),
"stax" => Ok(Model::Stax),
"flex" => Ok(Model::Flex),
_ => Err(Error::EnvInvalid {
var: "model".into(),
reason: format!("unknown: {s}"),
}),
}
}
}
/// Physical button selection accepted by `Speculos::press`.
#[derive(Copy, Clone, Debug)]
pub enum Button {
/// The left button.
Left,
/// The right button.
Right,
/// Left and right held down together, then released.
Both,
}
/// Optional settings applied when spawning an emulator process.
#[derive(Default, Clone, Debug)]
pub struct SpawnOptions {
/// Value passed to the emulator's `--seed` flag; the flag is omitted when `None`.
pub seed: Option<String>,
/// Address to use for the APDU port; a free loopback port is picked when `None`.
pub apdu: Option<SocketAddr>,
/// Address to use for the HTTP API port; a free loopback port is picked when `None`.
pub api: Option<SocketAddr>,
}
/// One entry of the `events` array returned by the emulator's `/events` endpoint.
///
/// The numeric fields fall back to `0` when they are absent from the JSON.
#[derive(Debug, Clone, Deserialize)]
pub struct ScreenEvent {
/// Text carried by the event; the `wait_for*` helpers match against it.
pub text: String,
// NOTE(review): x/y/w/h presumably describe the text's position and size on
// screen — confirm against the emulator's API documentation.
#[serde(default)]
pub x: i32,
#[serde(default)]
pub y: i32,
#[serde(default)]
pub w: i32,
#[serde(default)]
pub h: i32,
}
/// JSON body of a `/events` response: an object wrapping the event list.
#[derive(Deserialize)]
struct EventsResponse {
// All events reported by the endpoint, in the order they appear in the JSON.
events: Vec<ScreenEvent>,
}
/// Handle to a running emulator, either spawned by this process or attached to.
///
/// When the handle owns a child process, dropping it kills that process.
#[derive(Debug)]
pub struct Speculos {
// Spawned emulator process; `None` when the handle was created with `attach`.
child: Option<Child>,
// Address of the APDU endpoint.
apdu: SocketAddr,
// Address of the HTTP API used for events, screenshots and input.
api: SocketAddr,
// Number of events already handed out by `new_events`.
cursor: AtomicUsize,
// Device model; decides whether button or touch operations are allowed.
model: Model,
// HTTP client shared by all API requests.
agent: ureq::Agent,
// Thread draining the child's stderr; `None` when there is no such thread.
stderr_thread: Option<JoinHandle<()>>,
}
/// Maximum number of child stderr bytes that are retained (64 KiB); output beyond
/// this is still read from the pipe but discarded.
const STDERR_CAPTURE_CAP: usize = 64 * 1024;
impl Speculos {
/// Spawns an emulator running `elf` as `model`, using default [`SpawnOptions`]
/// (no seed, automatically picked ports).
///
/// # Errors
/// Propagates every error of [`Self::launch_with`].
pub fn launch(elf: &Path, model: Model) -> Result<Self> {
    let defaults = SpawnOptions::default();
    Self::launch_with(elf, model, defaults)
}
/// Spawns an emulator running `elf` as `model` on caller-chosen APDU and API
/// addresses, without a seed.
///
/// # Errors
/// Propagates every error of [`Self::launch_with`].
pub fn launch_on(elf: &Path, model: Model, apdu: SocketAddr, api: SocketAddr) -> Result<Self> {
    let fixed_ports = SpawnOptions {
        seed: None,
        apdu: Some(apdu),
        api: Some(api),
    };
    Self::launch_with(elf, model, fixed_ports)
}
pub fn launch_with(elf: &Path, model: Model, opts: SpawnOptions) -> Result<Self> {
if !elf.exists() {
return Err(Error::Io(io::Error::new(
io::ErrorKind::NotFound,
format!("speculos elf not found: {}", elf.display()),
)));
}
const MAX_ATTEMPTS: u32 = 3;
let can_retry = opts.apdu.is_none() || opts.api.is_none();
let mut last_err = None;
for attempt in 1..=MAX_ATTEMPTS {
let apdu = match opts.apdu {
Some(a) => a,
None => pick_free_port()?,
};
let api = match opts.api {
Some(a) => a,
None => pick_free_port()?,
};
match Self::spawn_and_wait(elf, model, &opts, apdu, api) {
Ok(s) => return Ok(s),
Err(e) => {
last_err = Some(e);
if !can_retry {
break;
}
if attempt < MAX_ATTEMPTS {
std::thread::sleep(Duration::from_millis(200));
}
}
}
}
Err(last_err.expect("loop runs at least once"))
}
fn spawn_and_wait(
elf: &Path,
model: Model,
opts: &SpawnOptions,
apdu: SocketAddr,
api: SocketAddr,
) -> Result<Self> {
let bin = std::env::var("SPECULOS_BIN").unwrap_or_else(|_| "speculos".into());
let mut cmd = Command::new(&bin);
cmd.arg("--model")
.arg(model.as_str())
.arg("--display")
.arg("headless")
.arg("--apdu-port")
.arg(apdu.port().to_string())
.arg("--api-port")
.arg(api.port().to_string());
if let Some(seed) = &opts.seed {
cmd.arg("--seed").arg(seed);
}
cmd.arg(elf).stdout(Stdio::null()).stderr(Stdio::piped());
let mut child = cmd.spawn()?;
let capture = Arc::new(Mutex::new(Vec::<u8>::new()));
let stderr_thread = child.stderr.take().map(|mut s| {
let capture = capture.clone();
std::thread::spawn(move || {
let mut buf = [0u8; 4096];
loop {
match s.read(&mut buf) {
Ok(0) | Err(_) => break,
Ok(n) => {
if let Ok(mut c) = capture.lock() {
let room = STDERR_CAPTURE_CAP.saturating_sub(c.len());
if room > 0 {
c.extend_from_slice(&buf[..n.min(room)]);
}
}
}
}
}
})
});
let agent = ureq::AgentBuilder::new()
.timeout_connect(Duration::from_millis(500))
.timeout_read(Duration::from_secs(5))
.build();
if let Err(e) = poll_until_ready(
&agent,
apdu,
api,
Some(&mut child),
&capture,
Duration::from_secs(30),
) {
let _ = child.kill();
let _ = child.wait();
if let Some(t) = stderr_thread {
let _ = t.join();
}
return Err(e);
}
Ok(Self {
child: Some(child),
apdu,
api,
cursor: AtomicUsize::new(0),
model,
agent,
stderr_thread,
})
}
/// Spawns an emulator for `model`, taking the ELF path and spawn options from
/// the `SPECULOS_*` environment variables read by `options_from_env`.
///
/// # Errors
/// Fails when the environment is incomplete or invalid, or when the launch fails.
pub fn launch_model(model: Model) -> Result<Self> {
    options_from_env()
        .and_then(|(elf_path, opts)| Self::launch_with(Path::new(&elf_path), model, opts))
}
/// Connects to an emulator that is already running on `apdu` / `api`.
///
/// The returned handle owns no child process, so dropping it leaves the
/// emulator running.
///
/// # Errors
/// Returns [`Error::LaunchTimeout`] when the emulator is not ready within
/// five seconds.
pub fn attach(model: Model, apdu: SocketAddr, api: SocketAddr) -> Result<Self> {
    const READY_TIMEOUT: Duration = Duration::from_secs(5);

    let agent = ureq::AgentBuilder::new()
        .timeout_connect(Duration::from_millis(500))
        .timeout_read(Duration::from_secs(5))
        .build();
    // There is no child process here, hence nothing captured from stderr.
    let no_stderr = Arc::new(Mutex::new(Vec::new()));
    poll_until_ready(&agent, apdu, api, None, &no_stderr, READY_TIMEOUT)?;

    let attached = Self {
        child: None,
        stderr_thread: None,
        cursor: AtomicUsize::new(0),
        apdu,
        api,
        model,
        agent,
    };
    Ok(attached)
}
pub fn from_env() -> Result<Self> {
let model_str = std::env::var("SPECULOS_MODEL")
.map_err(|_| Error::EnvMissing("SPECULOS_MODEL".into()))?;
let model: Model = model_str.parse().map_err(|e| match e {
Error::EnvInvalid { reason, .. } => Error::EnvInvalid {
var: "SPECULOS_MODEL".into(),
reason,
},
other => other,
})?;
let (elf, opts) = options_from_env()?;
Self::launch_with(Path::new(&elf), model, opts)
}
/// Address of the APDU endpoint this handle was created with.
pub fn apdu_addr(&self) -> SocketAddr {
self.apdu
}
/// Address of the HTTP API endpoint this handle was created with.
pub fn api_addr(&self) -> SocketAddr {
self.api
}
/// Device model this handle was created for.
pub fn model(&self) -> Model {
self.model
}
pub fn screenshot(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.agent
.get(&format!("http://{}/screenshot", self.api))
.call()?
.into_reader()
.read_to_end(&mut buf)?;
if !buf.starts_with(b"\x89PNG\r\n\x1a\n") {
return Err(Error::ScreenshotNotPng);
}
Ok(buf)
}
pub fn screenshot_to_file(&self, path: &Path) -> Result<()> {
let bytes = self.screenshot()?;
let mut f = std::fs::File::create(path)?;
f.write_all(&bytes)?;
Ok(())
}
/// Fetches every event currently reported by the `/events` endpoint.
///
/// This does not touch the cursor used by [`Self::new_events`].
///
/// # Errors
/// [`Error::Http`] when the request fails, [`Error::Io`] when the body cannot
/// be read or decoded.
pub fn events(&self) -> Result<Vec<ScreenEvent>> {
    let url = format!("http://{}/events", self.api);
    let response = self.agent.get(&url).call()?;
    let parsed: EventsResponse = response.into_json()?;
    Ok(parsed.events)
}
/// Returns the events that were not handed out by an earlier call and moves
/// the cursor to the end of the current event list.
///
/// # Errors
/// Any error of [`Self::events`]; the cursor is left unchanged in that case.
pub fn new_events(&self) -> Result<Vec<ScreenEvent>> {
    let mut all = self.events()?;
    let seen = self.cursor.swap(all.len(), Ordering::AcqRel);
    if seen > all.len() {
        // The list is shorter than what was already consumed (presumably it
        // was cleared on the emulator side), so every entry counts as new.
        Ok(all)
    } else {
        Ok(all.split_off(seen))
    }
}
/// Blocks until a new screen event whose text matches `re` arrives, or until
/// `timeout` elapses.
///
/// # Errors
/// [`Error::WaitTimeout`] on timeout, or any error from fetching events.
pub fn wait_for(&self, re: &Regex, timeout: Duration) -> Result<ScreenEvent> {
    let text_matches = |event: &ScreenEvent| re.is_match(&event.text);
    self.wait_for_match(timeout, text_matches)
}
/// Blocks until a new screen event whose text contains `needle` arrives, or
/// until `timeout` elapses.
///
/// # Errors
/// [`Error::WaitTimeout`] on timeout, or any error from fetching events.
pub fn wait_for_text(&self, needle: &str, timeout: Duration) -> Result<ScreenEvent> {
    let text_contains = |event: &ScreenEvent| event.text.contains(needle);
    self.wait_for_match(timeout, text_contains)
}
/// Polls [`Self::new_events`] until an event satisfies `pred` or `timeout`
/// elapses, sleeping at most 100 ms between polls.
///
/// Events are always fetched at least once, even with a zero timeout. Because
/// polling advances the event cursor, events inspected here are not returned
/// again by later `new_events` calls.
///
/// # Errors
/// [`Error::WaitTimeout`] on timeout, or any error from fetching events.
fn wait_for_match<F>(&self, timeout: Duration, pred: F) -> Result<ScreenEvent>
where
    F: Fn(&ScreenEvent) -> bool,
{
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let deadline = Instant::now() + timeout;
    loop {
        let fresh = self.new_events()?;
        if let Some(hit) = fresh.into_iter().find(|event| pred(event)) {
            return Ok(hit);
        }
        // Zero time left means the deadline has been reached or passed.
        let left = deadline.saturating_duration_since(Instant::now());
        if left.is_zero() {
            return Err(Error::WaitTimeout);
        }
        std::thread::sleep(left.min(POLL_INTERVAL));
    }
}
pub fn reset(&self) -> Result<()> {
let _ = self
.agent
.post(&format!("http://{}/reset", self.api))
.send_string("");
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline {
if self.events().is_ok() {
self.cursor.store(0, Ordering::Release);
let _ = self.new_events();
return Ok(());
}
std::thread::sleep(Duration::from_millis(50));
}
Err(Error::ResetTimeout)
}
/// Presses and releases `button` on a button model.
///
/// For [`Button::Both`] the left then right button is pressed, and they are
/// released in the reverse order.
///
/// # Errors
/// [`Error::TouchscreenOnly`] on touchscreen models; otherwise the first
/// request error, in which case the remaining steps are not sent.
pub fn press(&self, button: Button) -> Result<()> {
    if !self.model.is_buttons() {
        return Err(Error::TouchscreenOnly);
    }
    // Each step is a (button side, action) pair sent in order.
    let steps: &[(&str, &str)] = match button {
        Button::Left => &[("left", "press-and-release")],
        Button::Right => &[("right", "press-and-release")],
        Button::Both => &[
            ("left", "press"),
            ("right", "press"),
            ("right", "release"),
            ("left", "release"),
        ],
    };
    steps
        .iter()
        .try_for_each(|&(side, action)| self.button_action(side, action))
}
/// Sends one `action` for the button `side` to the `/button/{side}` endpoint.
///
/// # Errors
/// [`Error::Http`] when the request fails.
fn button_action(&self, side: &str, action: &str) -> Result<()> {
    let payload = serde_json::json!({ "action": action });
    let endpoint = format!("http://{}/button/{}", self.api, side);
    self.agent.post(&endpoint).send_json(payload)?;
    Ok(())
}
pub fn tap(&self, x: u32, y: u32) -> Result<()> {
if self.model.is_buttons() {
return Err(Error::ButtonsOnly);
}
self.finger_action("press-and-release", x, y)
}
/// Drags a finger from `from` to `to` on a touchscreen model.
///
/// The gesture is a press at `from`, up to seven intermediate moves along the
/// straight line (points equal to the previous one are skipped), and a
/// release at `to`.
///
/// # Errors
/// [`Error::ButtonsOnly`] on button models; otherwise the first request error,
/// in which case the remaining steps are not sent.
pub fn drag(&self, from: (u32, u32), to: (u32, u32)) -> Result<()> {
    const SEGMENTS: i64 = 8;

    if self.model.is_buttons() {
        return Err(Error::ButtonsOnly);
    }

    let (start_x, start_y) = (from.0 as i64, from.1 as i64);
    let (span_x, span_y) = (to.0 as i64 - start_x, to.1 as i64 - start_y);
    // Coordinate reached after `step` of SEGMENTS equal parts along one axis.
    let along = |origin: i64, span: i64, step: i64| -> u32 {
        (origin + span * step / SEGMENTS).clamp(0, u32::MAX as i64) as u32
    };

    self.finger_action("press", from.0, from.1)?;
    let mut previous = from;
    for step in 1..SEGMENTS {
        let point = (along(start_x, span_x, step), along(start_y, span_y, step));
        if point != previous {
            self.finger_action("move", point.0, point.1)?;
            previous = point;
        }
    }
    self.finger_action("release", to.0, to.1)
}
pub fn press_at(&self, x: u32, y: u32) -> Result<()> {
if self.model.is_buttons() {
return Err(Error::ButtonsOnly);
}
self.finger_action("press", x, y)
}
pub fn release_at(&self, x: u32, y: u32) -> Result<()> {
if self.model.is_buttons() {
return Err(Error::ButtonsOnly);
}
self.finger_action("release", x, y)
}
pub fn move_to(&self, x: u32, y: u32) -> Result<()> {
if self.model.is_buttons() {
return Err(Error::ButtonsOnly);
}
self.finger_action("move", x, y)
}
/// Sends one touch `action` at `(x, y)` to the `/finger` endpoint.
///
/// # Errors
/// [`Error::Http`] when the request fails.
fn finger_action(&self, action: &str, x: u32, y: u32) -> Result<()> {
    let payload = serde_json::json!({ "action": action, "x": x, "y": y });
    let endpoint = format!("http://{}/finger", self.api);
    self.agent.post(&endpoint).send_json(payload)?;
    Ok(())
}
}
/// Tears down a spawned emulator: the child is killed and reaped first, then the
/// stderr reader thread is joined. Handles without a child or thread skip the
/// respective step; all failures are ignored.
impl Drop for Speculos {
    fn drop(&mut self) {
        let owned_process = self.child.take();
        if let Some(mut process) = owned_process {
            // Kill before waiting so the wait cannot block on a live emulator.
            let _ = process.kill();
            let _ = process.wait();
        }

        let reader = self.stderr_thread.take();
        if let Some(handle) = reader {
            let _ = handle.join();
        }
    }
}
/// Waits until the emulator is usable: first until a TCP connection to `apdu`
/// succeeds, then until a GET of `/events` on `api` succeeds.
///
/// Before every probe the optional `child` is checked for an early exit and the
/// deadline (`timeout` from the start of the call, shared by both phases) is
/// checked. Probes are spaced 100 ms apart.
///
/// # Errors
/// * [`Error::SpeculosExited`] when `child` has exited; carries a snapshot of
///   `stderr_capture` taken at that moment.
/// * [`Error::LaunchTimeout`] when the deadline passes.
/// * [`Error::Io`] when querying the child's status fails.
fn poll_until_ready(
    agent: &ureq::Agent,
    apdu: SocketAddr,
    api: SocketAddr,
    mut child: Option<&mut Child>,
    stderr_capture: &Arc<Mutex<Vec<u8>>>,
    timeout: Duration,
) -> Result<()> {
    const PROBE_INTERVAL: Duration = Duration::from_millis(100);

    let deadline = Instant::now() + timeout;
    // Shared precondition of both phases: the process is alive and time is left.
    let mut ensure_alive_and_in_time = || -> Result<()> {
        if let Some(process) = child.as_deref_mut() {
            if let Some(status) = process.try_wait()? {
                return Err(Error::SpeculosExited {
                    status,
                    stderr: snapshot_stderr(stderr_capture),
                });
            }
        }
        if Instant::now() > deadline {
            return Err(Error::LaunchTimeout);
        }
        Ok(())
    };

    // Phase 1: the APDU port accepts TCP connections.
    loop {
        ensure_alive_and_in_time()?;
        if TcpStream::connect_timeout(&apdu, Duration::from_millis(200)).is_ok() {
            break;
        }
        std::thread::sleep(PROBE_INTERVAL);
    }

    // Phase 2: the HTTP API answers the events request.
    let events_url = format!("http://{api}/events");
    loop {
        ensure_alive_and_in_time()?;
        if agent.get(&events_url).call().is_ok() {
            return Ok(());
        }
        std::thread::sleep(PROBE_INTERVAL);
    }
}
/// Copies the bytes captured so far into a `String`, replacing invalid UTF-8
/// sequences with U+FFFD. Returns an empty string when the mutex is poisoned.
fn snapshot_stderr(capture: &Arc<Mutex<Vec<u8>>>) -> String {
    match capture.lock() {
        Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
        Err(_) => String::new(),
    }
}
fn options_from_env() -> Result<(String, SpawnOptions)> {
let elf =
std::env::var("SPECULOS_ELF").map_err(|_| Error::EnvMissing("SPECULOS_ELF".into()))?;
let apdu = match std::env::var("SPECULOS_APDU").ok() {
Some(a) => Some(a.parse::<SocketAddr>().map_err(|e| Error::EnvInvalid {
var: "SPECULOS_APDU".into(),
reason: format!("{a}: {e}"),
})?),
None => None,
};
let api = match std::env::var("SPECULOS_API").ok() {
Some(a) => Some(a.parse::<SocketAddr>().map_err(|e| Error::EnvInvalid {
var: "SPECULOS_API".into(),
reason: format!("{a}: {e}"),
})?),
None => None,
};
if apdu.is_some() != api.is_some() {
return Err(Error::EnvInvalid {
var: "SPECULOS_APDU/SPECULOS_API".into(),
reason: "must both be set or both unset".into(),
});
}
let seed = std::env::var("SPECULOS_SEED")
.ok()
.filter(|s| !s.is_empty());
Ok((elf, SpawnOptions { seed, apdu, api }))
}
/// Asks the OS for a currently free TCP port on the IPv4 loopback address.
///
/// The probing listener is closed before returning, so the port is free at
/// the time of the call but not reserved afterwards.
///
/// # Errors
/// [`Error::Io`] when binding or querying the local address fails.
fn pick_free_port() -> Result<SocketAddr> {
    // Port 0 lets the OS choose the port.
    let any_loopback_port = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
    // The temporary listener is dropped at the end of this statement.
    let chosen = TcpListener::bind(any_loopback_port)?.local_addr()?;
    Ok(chosen)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Model names parse case-insensitively, including the NanoSP aliases.
    #[test]
    fn model_parsing() {
        assert_eq!("nanox".parse::<Model>().unwrap(), Model::NanoX);
        assert_eq!("NANOSP".parse::<Model>().unwrap(), Model::NanoSP);
        assert_eq!("nanosplus".parse::<Model>().unwrap(), Model::NanoSP);
        assert_eq!("NanoS+".parse::<Model>().unwrap(), Model::NanoSP);
        assert_eq!("stax".parse::<Model>().unwrap(), Model::Stax);
        assert_eq!("Flex".parse::<Model>().unwrap(), Model::Flex);
        assert!("nano-x".parse::<Model>().is_err());
    }

    /// Every model reports its expected screen size.
    #[test]
    fn screen_sizes() {
        assert_eq!(Model::NanoX.screen_size(), (128, 64));
        assert_eq!(Model::NanoSP.screen_size(), (128, 64));
        assert_eq!(Model::Stax.screen_size(), (400, 672));
        assert_eq!(Model::Flex.screen_size(), (480, 600));
    }

    /// Nano models are button devices; Stax and Flex are not.
    #[test]
    fn is_buttons() {
        assert!(Model::NanoX.is_buttons());
        assert!(Model::NanoSP.is_buttons());
        assert!(!Model::Stax.is_buttons());
        assert!(!Model::Flex.is_buttons());
    }

    /// A second pick must not return a port that is still in use.
    #[test]
    fn pick_free_port_returns_distinct() {
        let a = pick_free_port().unwrap();
        // `pick_free_port` releases the port before returning, so without this
        // the OS would be free to hand out the same port again. Occupy it for
        // the second pick; if the bind fails because somebody else grabbed the
        // port in the meantime, it is occupied all the same.
        let _hold_a = TcpListener::bind(a);
        let b = pick_free_port().unwrap();
        assert_ne!(a.port(), b.port());
    }
}