use anyhow::{Result, bail};
use std::path::{Path, PathBuf};
/// Contents of `../../firmware/pico/revka-pico.uf2` (relative to this source file),
/// embedded into the binary at compile time.
const PICO_UF2: &[u8] = include_bytes!("../../firmware/pico/revka-pico.uf2");
/// Contents of `../../firmware/pico/main.py` (relative to this source file),
/// embedded into the binary at compile time.
pub const PICO_MAIN_PY: &[u8] = include_bytes!("../../firmware/pico/main.py");
/// The four bytes a UF2 image must start with to pass `validate_real_uf2`
/// (ASCII `UF2\n`).
const UF2_MAGIC1: [u8; 4] = [0x55, 0x46, 0x32, 0x0A];
/// Images smaller than this (100 KiB) are rejected by `validate_real_uf2` as placeholders.
const MIN_REAL_UF2_BYTES: usize = 100 * 1024;
/// Checks that `data` looks like a genuine firmware image rather than a stub.
///
/// `source` is a human-readable description of where the bytes came from and is
/// interpolated at the start of any error message.
///
/// # Errors
/// - "magic mismatch" when `data` is shorter than 8 bytes or does not begin with
///   [`UF2_MAGIC1`].
/// - "placeholder" when the magic matches but `data` is shorter than
///   [`MIN_REAL_UF2_BYTES`].
fn validate_real_uf2(data: &[u8], source: &str) -> Result<()> {
    let size = data.len();

    // The magic check comes first so that tiny garbage files are reported as
    // "not a UF2" rather than as an undersized UF2.
    let has_magic = size >= 8 && data.starts_with(&UF2_MAGIC1);
    if !has_magic {
        bail!(
            "{source} is not a valid UF2 file (magic mismatch). Download the real \
             MicroPython UF2 from https://micropython.org/download/RPI_PICO/."
        );
    }

    if size >= MIN_REAL_UF2_BYTES {
        return Ok(());
    }

    bail!(
        "{source} is a {len}-byte placeholder, not real MicroPython firmware \
         (a real RPI_PICO UF2 is hundreds of KB). Flashing it would brick the \
         Pico's runtime. Download the real UF2 from \
         https://micropython.org/download/RPI_PICO/ and place it at \
         ~/.revka/firmware/pico/revka-pico.uf2 (existing files are never \
         overwritten), or replace firmware/pico/revka-pico.uf2 and rebuild Revka.",
        len = size
    )
}
/// Reports whether `data` is a Python source file containing nothing but
/// blank lines and `#` comment lines.
///
/// Input that is not valid UTF-8 is never considered a placeholder. Input with
/// no non-blank lines at all (including empty input) is considered one.
fn main_py_is_placeholder(data: &[u8]) -> bool {
    let text = match std::str::from_utf8(data) {
        Ok(text) => text,
        Err(_) => return false,
    };

    for raw_line in text.lines() {
        let line = raw_line.trim();
        if line.is_empty() {
            continue;
        }
        // The first line that is neither blank nor a comment settles it.
        if !line.starts_with('#') {
            return false;
        }
    }
    true
}
/// Looks for a mounted `RPI-RP2` volume and returns its path.
///
/// Probes, in order:
/// 1. `/Volumes/RPI-RP2`
/// 2. `<entry>/RPI-RP2` for every directory entry of `/media`
/// 3. `<entry>/RPI-RP2` for every directory entry of `/run/media`
///
/// Directories that cannot be read and entries that fail to enumerate are
/// skipped. Returns `None` when no candidate exists.
pub fn find_rpi_rp2_mount() -> Option<PathBuf> {
    let volumes_mount = PathBuf::from("/Volumes/RPI-RP2");
    if volumes_mount.exists() {
        return Some(volumes_mount);
    }

    // The chain is lazy, so `/run/media` is only read when `/media` yields nothing.
    ["/media", "/run/media"]
        .iter()
        .filter_map(|root| std::fs::read_dir(root).ok())
        .flat_map(|entries| entries.flatten())
        .map(|entry| entry.path().join("RPI-RP2"))
        .find(|candidate| candidate.exists())
}
/// Creates `~/.revka/firmware/pico` if needed and extracts the bundled firmware
/// files into it, returning the directory path.
///
/// `revka-pico.uf2` and `main.py` are each written only when the file does not
/// already exist; existing files are left untouched and are not validated here.
///
/// # Errors
/// - The home directory cannot be determined.
/// - Creating the directory or writing a file fails.
/// - `revka-pico.uf2` is missing and the bundled UF2 fails [`validate_real_uf2`].
/// - `main.py` is missing and the bundled `main.py` is a placeholder.
pub fn ensure_firmware_dir() -> Result<PathBuf> {
    use directories::BaseDirs;

    let dirs = BaseDirs::new().ok_or_else(|| anyhow::anyhow!("cannot determine home directory"))?;
    let mut firmware_dir = dirs.home_dir().to_path_buf();
    firmware_dir.extend([".revka", "firmware", "pico"]);
    std::fs::create_dir_all(&firmware_dir)?;

    // UF2 image: validate the bundled bytes before they ever reach disk.
    let uf2_path = firmware_dir.join("revka-pico.uf2");
    let uf2_missing = !uf2_path.exists();
    if uf2_missing {
        validate_real_uf2(PICO_UF2, "the bundled UF2")?;
        std::fs::write(&uf2_path, PICO_UF2)?;
        tracing::info!(path = %uf2_path.display(), "extracted bundled UF2");
    }

    // main.py: nothing more to do when the user already has one in place.
    let main_py_path = firmware_dir.join("main.py");
    if main_py_path.exists() {
        return Ok(firmware_dir);
    }
    if main_py_is_placeholder(PICO_MAIN_PY) {
        bail!(
            "The bundled main.py is a placeholder with no serial-protocol handler. \
             Provide a real MicroPython main.py at {} (existing files are never \
             overwritten), or replace firmware/pico/main.py and rebuild Revka.",
            main_py_path.display()
        );
    }
    std::fs::write(&main_py_path, PICO_MAIN_PY)?;
    tracing::info!(path = %main_py_path.display(), "extracted bundled main.py");

    Ok(firmware_dir)
}
/// Copies `revka-pico.uf2` from `firmware_dir` to `firmware.uf2` under `mount_point`.
///
/// The source file is read and checked with [`validate_real_uf2`] before anything
/// is written. Three copy strategies are then tried in order, returning on the
/// first success:
///
/// 1. `std::fs::copy` on a blocking thread,
/// 2. `cp <src> <dst>` with a 10 second timeout,
/// 3. `sudo -n cp <src> <dst>` with a 10 second timeout.
///
/// # Errors
/// - The source UF2 cannot be read or fails validation.
/// - All three copy strategies fail; the error message contains the manual
///   `sudo cp` command to run.
pub async fn flash_uf2(mount_point: &Path, firmware_dir: &Path) -> Result<()> {
    let uf2_src = firmware_dir.join("revka-pico.uf2");
    let uf2_dst = mount_point.join("firmware.uf2");
    let src_str = uf2_src.to_string_lossy().into_owned();
    let dst_str = uf2_dst.to_string_lossy().into_owned();
    tracing::info!(
        src = %src_str,
        dst = %dst_str,
        "flashing UF2"
    );

    // Refuse to flash anything that does not pass validation.
    let data = std::fs::read(&uf2_src)?;
    validate_real_uf2(&data, &format!("UF2 at {}", uf2_src.display()))?;

    // Attempt 1: in-process copy on the blocking pool. The paths are not needed
    // afterwards (the fallbacks use the string forms), so they are moved in.
    {
        let result = tokio::task::spawn_blocking(move || std::fs::copy(&uf2_src, &uf2_dst))
            .await
            .map_err(|e| anyhow::anyhow!("copy task panicked: {e}"));
        match result {
            Ok(Ok(_)) => {
                tracing::info!("UF2 copy complete (std::fs::copy) — Pico will reboot");
                return Ok(());
            }
            Ok(Err(e)) => tracing::warn!("std::fs::copy failed ({}), trying cp", e),
            Err(e) => tracing::warn!("std::fs::copy task failed ({}), trying cp", e),
        }
    }

    // Attempt 2: external `cp`.
    {
        const CP_TIMEOUT_SECS: u64 = 10;
        let out = tokio::time::timeout(
            std::time::Duration::from_secs(CP_TIMEOUT_SECS),
            tokio::process::Command::new("cp")
                .arg(&src_str)
                .arg(&dst_str)
                // Without this, a timed-out `cp` keeps running after its future is
                // dropped and may still be writing the destination while the next
                // fallback starts.
                .kill_on_drop(true)
                .output(),
        )
        .await;
        match out {
            Err(_elapsed) => {
                tracing::warn!("cp timed out after {}s, trying sudo cp", CP_TIMEOUT_SECS);
            }
            Ok(Ok(o)) if o.status.success() => {
                tracing::info!("UF2 copy complete (cp) — Pico will reboot");
                return Ok(());
            }
            Ok(Ok(o)) => {
                let stderr = String::from_utf8_lossy(&o.stderr);
                tracing::warn!("cp failed ({}), trying sudo cp", stderr.trim());
            }
            Ok(Err(e)) => tracing::warn!("cp spawn failed ({}), trying sudo cp", e),
        }
    }

    // Attempt 3: `sudo -n cp` (`-n` is passed so sudo is asked not to prompt).
    {
        const SUDO_CP_TIMEOUT_SECS: u64 = 10;
        let out = tokio::time::timeout(
            std::time::Duration::from_secs(SUDO_CP_TIMEOUT_SECS),
            tokio::process::Command::new("sudo")
                .args(["-n", "cp", &src_str, &dst_str])
                // Best effort: this targets the `sudo` process itself on timeout.
                // NOTE(review): whether the inner `cp` is also stopped depends on
                // sudo's signal handling — confirm if that matters.
                .kill_on_drop(true)
                .output(),
        )
        .await;
        match out {
            Err(_elapsed) => {
                tracing::warn!("sudo cp timed out after {}s", SUDO_CP_TIMEOUT_SECS);
            }
            Ok(Ok(o)) if o.status.success() => {
                tracing::info!("UF2 copy complete (sudo cp) — Pico will reboot");
                return Ok(());
            }
            Ok(Ok(o)) => {
                let stderr = String::from_utf8_lossy(&o.stderr);
                tracing::warn!("sudo cp failed: {}", stderr.trim());
            }
            Ok(Err(e)) => tracing::warn!("sudo cp spawn failed: {}", e),
        }
    }

    bail!(
        "All copy methods failed. Run this command manually, then restart Revka:\n\
         \n  sudo cp {src_str} {dst_str}\n"
    )
}
/// Polls for a serial device node until one appears or `timeout` elapses.
///
/// The glob pattern depends on the target OS: `/dev/cu.usbmodem*` on macOS,
/// `/dev/ttyACM*` on Linux, and no patterns at all elsewhere (so the function
/// can only return `None` there).
///
/// * `timeout` — total time to keep polling, measured from the call.
/// * `interval` — pause between polls; the final pause is shortened so the
///   function does not run past `timeout`.
///
/// Returns the first path the glob yields successfully, or `None` on timeout.
/// At least one poll is always performed, even with a zero `timeout`.
pub async fn wait_for_serial_port(
    timeout: std::time::Duration,
    interval: std::time::Duration,
) -> Option<PathBuf> {
    #[cfg(target_os = "macos")]
    let patterns = &["/dev/cu.usbmodem*"];
    #[cfg(target_os = "linux")]
    let patterns = &["/dev/ttyACM*"];
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    let patterns: &[&str] = &[];

    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        for pattern in patterns {
            if let Ok(mut hits) = glob::glob(pattern) {
                // Skip entries the glob could not read instead of giving up on
                // the whole pattern when the first entry is an error.
                if let Some(path) = hits.find_map(Result::ok) {
                    return Some(path);
                }
            }
        }

        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            return None;
        }
        // Never sleep past the deadline.
        tokio::time::sleep(interval.min(remaining)).await;
    }
}
/// Uploads `main.py` from `firmware_dir` to the board on `port` and resets it.
///
/// Runs `mpremote connect <port> cp <main.py> :main.py + reset` and waits for it
/// to finish.
///
/// # Errors
/// - `main.py` does not exist in `firmware_dir`, or cannot be read.
/// - `main.py` contains only comments and blank lines (a placeholder).
/// - `mpremote` cannot be started, or exits unsuccessfully; both messages
///   include the command to run by hand.
pub async fn deploy_main_py(port: &Path, firmware_dir: &Path) -> Result<()> {
    let main_py_src = firmware_dir.join("main.py");
    if !main_py_src.exists() {
        bail!(
            "main.py not found at {} — run ensure_firmware_dir() first",
            main_py_src.display()
        );
    }

    let contents = std::fs::read(&main_py_src)?;
    if main_py_is_placeholder(&contents) {
        bail!(
            "main.py at {} is a placeholder with no serial-protocol handler. \
             Replace it with a real MicroPython handler before deploying.",
            main_py_src.display()
        );
    }

    let src_str = main_py_src.to_string_lossy().into_owned();
    let port_str = port.to_string_lossy().into_owned();
    tracing::info!(
        src = %src_str,
        port = %port_str,
        "deploying main.py via mpremote"
    );

    let spawned = tokio::process::Command::new("mpremote")
        .args([
            "connect",
            port_str.as_str(),
            "cp",
            src_str.as_str(),
            ":main.py",
            "+",
            "reset",
        ])
        .output()
        .await;

    // Failure to launch the tool at all gets its own install hint.
    let output = match spawned {
        Ok(output) => output,
        Err(e) => {
            bail!(
                "mpremote not found or could not start ({e}).\n\
                 Install it with: pip install mpremote\n\
                 Then run: mpremote connect {port_str} cp {src_str} :main.py + reset"
            )
        }
    };

    if output.status.success() {
        tracing::info!("main.py deployed and Pico reset via mpremote");
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    bail!(
        "mpremote failed (exit {}): {}.\n\
         Run manually:\n  mpremote connect {port_str} cp {src_str} :main.py + reset",
        output.status,
        stderr.trim()
    )
}
/// Unit tests for UF2 validation, placeholder detection, and the early-exit
/// error paths of the flashing and deploy helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- validate_real_uf2 -------------------------------------------------

    #[test]
    fn validate_real_uf2_accepts_real_sized_image() {
        let mut img = UF2_MAGIC1.to_vec();
        img.resize(MIN_REAL_UF2_BYTES, 0);
        assert!(validate_real_uf2(&img, "test").is_ok());
    }

    #[test]
    fn validate_real_uf2_rejects_wrong_magic() {
        let img = vec![0u8; MIN_REAL_UF2_BYTES];
        let err = validate_real_uf2(&img, "test").unwrap_err().to_string();
        assert!(err.contains("magic"), "got: {err}");
    }

    #[test]
    fn validate_real_uf2_rejects_placeholder_sized_image() {
        let mut img = UF2_MAGIC1.to_vec();
        img.resize(512, 0);
        let err = validate_real_uf2(&img, "test").unwrap_err().to_string();
        assert!(err.contains("placeholder"), "got: {err}");
    }

    // Tripwire: fails once a real UF2 is bundled, prompting an update of this test.
    #[test]
    fn bundled_uf2_is_currently_a_placeholder() {
        assert!(
            validate_real_uf2(PICO_UF2, "bundled").is_err(),
            "bundled UF2 now looks real — replace this tripwire with an is_ok() check"
        );
    }

    // ---- main.py placeholder detection --------------------------------------

    #[test]
    fn pico_main_py_is_non_empty() {
        assert!(!PICO_MAIN_PY.is_empty(), "bundled main.py is empty");
    }

    // Tripwire: fails once the bundled main.py contains executable code.
    #[test]
    fn bundled_main_py_is_currently_a_placeholder() {
        assert!(
            main_py_is_placeholder(PICO_MAIN_PY),
            "bundled main.py now has executable code — replace this tripwire"
        );
    }

    #[test]
    fn main_py_is_placeholder_detects_real_code() {
        assert!(!main_py_is_placeholder(
            b"# comment\nimport sys\nprint('hi')\n"
        ));
        assert!(main_py_is_placeholder(b"# only\n# comments\n\n"));
    }

    // ---- mount discovery and constants ---------------------------------------

    // The result depends on whether a board is attached to the machine running
    // the tests, so only the shape of a positive result can be asserted.
    #[test]
    fn find_rpi_rp2_mount_returns_none_when_not_connected() {
        if let Some(mount) = find_rpi_rp2_mount() {
            assert!(
                mount.ends_with("RPI-RP2"),
                "unexpected mount path: {}",
                mount.display()
            );
        }
    }

    // Compare against an independent spelling of the magic rather than a copy
    // of the constant's own literal.
    #[test]
    fn uf2_magic_constant_is_correct() {
        assert_eq!(&UF2_MAGIC1, b"UF2\n");
    }

    // ---- ensure_firmware_dir -------------------------------------------------

    // NOTE: this exercises the real home directory; both outcomes are accepted
    // because the result depends on what is bundled and what already exists there.
    #[test]
    fn ensure_firmware_dir_creates_directory() {
        let result = ensure_firmware_dir();
        match result {
            Ok(dir) => {
                assert!(
                    dir.exists(),
                    "firmware dir should exist after ensure_firmware_dir"
                );
                assert!(dir.ends_with("pico"), "firmware dir should end with 'pico'");
            }
            Err(e) => {
                let msg = e.to_string();
                assert!(
                    msg.contains("placeholder") || msg.contains("UF2"),
                    "error should mention placeholder UF2; got: {msg}"
                );
            }
        }
    }

    // ---- flash_uf2 -----------------------------------------------------------

    #[tokio::test]
    async fn flash_uf2_rejects_invalid_magic() {
        let tmp = tempfile::tempdir().expect("create temp dir");
        let firmware_dir = tmp.path();
        std::fs::write(firmware_dir.join("revka-pico.uf2"), b"NOT_A_UF2_FILE").unwrap();
        let mount = tempfile::tempdir().expect("create mount dir");
        let result = flash_uf2(mount.path(), firmware_dir).await;
        assert!(result.is_err(), "flash_uf2 should reject invalid UF2 magic");
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("magic"),
            "error should mention magic mismatch; got: {err}"
        );
    }

    #[tokio::test]
    async fn flash_uf2_rejects_too_small_file() {
        let tmp = tempfile::tempdir().expect("create temp dir");
        let firmware_dir = tmp.path();
        std::fs::write(firmware_dir.join("revka-pico.uf2"), b"tiny").unwrap();
        let mount = tempfile::tempdir().expect("create mount dir");
        let result = flash_uf2(mount.path(), firmware_dir).await;
        assert!(result.is_err(), "flash_uf2 should reject too-small UF2");
        // A file shorter than 8 bytes is reported through the magic check.
        let err = result.unwrap_err().to_string();
        assert!(err.contains("magic"), "got: {err}");
    }

    #[tokio::test]
    async fn flash_uf2_rejects_placeholder_sized_uf2() {
        let tmp = tempfile::tempdir().expect("create temp dir");
        let firmware_dir = tmp.path();
        let mut stub = UF2_MAGIC1.to_vec();
        stub.resize(512, 0);
        std::fs::write(firmware_dir.join("revka-pico.uf2"), &stub).unwrap();
        let mount = tempfile::tempdir().expect("create mount dir");
        let err = flash_uf2(mount.path(), firmware_dir)
            .await
            .unwrap_err()
            .to_string();
        assert!(err.contains("placeholder"), "got: {err}");
    }

    // ---- deploy_main_py ------------------------------------------------------

    #[tokio::test]
    async fn deploy_main_py_rejects_placeholder() {
        let tmp = tempfile::tempdir().expect("create temp dir");
        let firmware_dir = tmp.path();
        std::fs::write(
            firmware_dir.join("main.py"),
            b"# Placeholder: replace with real firmware\n",
        )
        .unwrap();
        let port = std::path::Path::new("/dev/ttyACM_fake_test");
        let err = deploy_main_py(port, firmware_dir)
            .await
            .unwrap_err()
            .to_string();
        assert!(err.contains("placeholder"), "got: {err}");
    }

    #[tokio::test]
    async fn deploy_main_py_fails_when_file_missing() {
        let tmp = tempfile::tempdir().expect("create temp dir");
        let firmware_dir = tmp.path();
        let port = std::path::Path::new("/dev/ttyACM_fake_test");
        let result = deploy_main_py(port, firmware_dir).await;
        assert!(
            result.is_err(),
            "deploy should fail when main.py is missing"
        );
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("main.py not found"),
            "error should mention missing main.py; got: {err}"
        );
    }
}