use std::{
env,
io::{Read, Write},
net::SocketAddr,
path::PathBuf,
sync::{Arc, Mutex},
};
use anyhow::{Context, Result};
use axum::{
extract::{ws::Message, Path, Query, State, WebSocketUpgrade},
http::{
header::{AUTHORIZATION, WWW_AUTHENTICATE},
HeaderMap, HeaderValue, Request, StatusCode,
},
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{any, delete, get, post},
Extension, Json, Router,
};
use base64::{
engine::general_purpose::{STANDARD as BASE64, URL_SAFE_NO_PAD as BASE64URL},
Engine,
};
use futures_util::{SinkExt, StreamExt};
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use rand::{distr::Alphanumeric, Rng};
use regex::Regex;
use serde::Deserialize;
use serde_json::json;
#[derive(rust_embed::RustEmbed)]
#[folder = "web/static"]
#[exclude = "vendor/*.map"]
#[exclude = "install/*"]
#[exclude = ".well-known/*"]
struct StaticAssets;
mod db;
mod mesh;
mod push;
mod relay;
mod shell_integration;
mod ssl;
mod tmux;
mod transcribe;
mod update;
#[derive(Clone, Debug, PartialEq)]
enum InstallPhase {
Idle,
Running,
Success,
Failed(String),
}
struct SttInstallState {
phase: InstallPhase,
output_tail: Vec<String>,
}
#[derive(Clone)]
struct AppState {
session_name_re: Arc<Regex>,
auth: Option<AuthConfig>,
cache_bust: String,
db: Arc<db::Db>,
internal_token: Arc<String>,
port: u16,
data_dir: PathBuf,
use_tls: bool,
update: update::UpdateState,
dev_mode: bool,
build_hash: String,
stt_install: Arc<tokio::sync::Mutex<SttInstallState>>,
}
#[derive(Clone)]
struct AuthConfig {
user: String,
pass: String,
session_cookie_name: String,
session_cookie_value: String,
}
#[tokio::main]
async fn main() -> Result<()> {
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.map_err(|_| anyhow::anyhow!("failed to install rustls crypto provider"))?;
let auth = load_auth_config();
let data_dir = resolve_data_dir()?;
std::fs::create_dir_all(&data_dir)
.with_context(|| format!("creating data dir: {}", data_dir.display()))?;
let db_path = data_dir.join("mobux.db");
println!("data dir: {}", data_dir.display());
let db = Arc::new(db::Db::open(&db_path)?);
let _ = db.vapid_keys()?;
let internal_token: String = (&mut rand::rng())
.sample_iter(Alphanumeric)
.take(32)
.map(char::from)
.collect();
let port = env::var("PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or(8080);
let use_tls = env::var("MOBUX_TLS")
.map(|v| v != "0" && v.to_lowercase() != "false")
.unwrap_or(true);
let dev_mode = env::var("MOBUX_DEV")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let build_hash = std::fs::read_to_string(
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("web/static/build-info.json"),
)
.ok()
.and_then(|s| serde_json::from_str::<serde_json::Value>(&s).ok())
.and_then(|v| v["hash"].as_str().map(str::to_owned))
.unwrap_or_else(|| "unknown".to_string());
let update_state = update::UpdateState::new();
update::spawn_checker(update_state.clone());
let state = AppState {
session_name_re: Arc::new(Regex::new(r"^[a-zA-Z0-9_-]+$")?),
auth,
cache_bust: format!(
"{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
),
db,
internal_token: Arc::new(internal_token),
port,
data_dir: data_dir.clone(),
use_tls,
update: update_state,
dev_mode,
build_hash,
stt_install: Arc::new(tokio::sync::Mutex::new(SttInstallState {
phase: InstallPhase::Idle,
output_tail: vec![],
})),
};
let internal_app = Router::new()
.route("/internal/trigger", post(api_internal_trigger))
.with_state(state.clone());
let internal_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let internal_port = internal_listener.local_addr()?.port();
tokio::spawn(async move {
if let Err(e) = axum::serve(internal_listener, internal_app).await {
eprintln!("internal listener error: {e:#}");
}
});
if let Err(e) = tmux::install_bell_hook(internal_port, &state.internal_token).await {
eprintln!("warning: failed to install tmux alert-bell hook: {e:#}");
} else {
println!("tmux alert-bell hook installed (internal port {internal_port})");
}
let state_for_mw = state.clone();
let app = Router::new()
.route("/", get(root_redirect))
.route("/api/identify", get(api_identify))
.route("/api/build-info", get(api_build_info))
.route("/api/peers", get(api_peers))
.route("/api/sessions", get(api_sessions).post(api_create_session))
.route("/api/sessions/{name}/kill", post(api_kill_session))
.route("/api/sessions/{name}/rename", post(api_rename_session))
.route("/api/sessions/{name}/panes", get(api_list_panes))
.route(
"/api/sessions/{name}/panes/{pane}/select",
post(api_select_pane),
)
.route("/api/sessions/{name}/history", get(api_session_history))
.route("/api/sessions/{name}/command", post(api_tmux_command))
.route(
"/api/telemetry",
post(api_telemetry).layer(axum::extract::DefaultBodyLimit::max(64 * 1024)),
)
.route(
"/api/upload",
post(api_upload).layer(axum::extract::DefaultBodyLimit::max(200 * 1024 * 1024)),
)
.route(
"/transcribe",
post(api_transcribe).layer(axum::extract::DefaultBodyLimit::max(8 * 1024 * 1024)),
)
.route("/api/push/vapid-public-key", get(api_push_vapid_public_key))
.route(
"/api/push/subscribe",
post(api_push_subscribe).delete(api_push_unsubscribe),
)
.route("/api/push/devices", get(api_push_devices))
.route("/api/push/notify", post(api_push_notify))
.route(
"/api/settings/notifications",
get(api_get_notification_prefs).put(api_set_notification_prefs),
)
.route(
"/api/settings/stt",
get(api_get_stt_config).put(api_set_stt_config),
)
.route(
"/api/settings/mesh",
get(api_get_mesh_settings).put(api_set_mesh_settings),
)
.route("/api/stt/status", get(api_stt_status))
.route("/api/stt/models", get(api_stt_models))
.route(
"/api/stt/install",
post(api_stt_install).layer(axum::extract::DefaultBodyLimit::max(1024)),
)
.route("/api/stt/install/status", get(api_stt_install_status))
.route("/api/stt/start", post(api_stt_start))
.route("/api/stt/stop", post(api_stt_stop))
.route(
"/api/shell-integration/status",
get(api_shell_integration_status),
)
.route(
"/api/shell-integration/install",
post(api_shell_integration_install),
)
.route(
"/api/shell-integration/uninstall",
post(api_shell_integration_uninstall),
)
.route("/api/update/status", get(api_update_status))
.route("/api/update/check", post(api_update_check))
.route("/api/update/run", post(api_update_run))
.route("/r/{peer}/ws/{*rest}", get(relay::relay_ws))
.route("/r/{peer}/{*rest}", any(relay::relay_http))
.route("/api/peers/{peer}/pin", delete(relay::delete_peer_pin))
.route("/settings", get(settings_page))
.route("/s/{name}", get(terminal_page))
.route("/s/{host}/{name}", get(terminal_page_pinned))
.route("/ws/{name}", get(terminal_ws))
.route("/sw.js", get(serve_sw))
.route("/install", get(install_page))
.route("/install/mobux.apk", get(serve_install_apk))
.route("/install/mobux-ca.crt", get(serve_install_ca))
.route("/.well-known/assetlinks.json", get(serve_assetlinks))
.route("/app", get(serve_spa_index))
.route("/app/{*rest}", get(serve_spa_index))
.route("/static/{*path}", get(serve_static));
let app = if std::env::var_os("MOBUX_UPDATE_TEST_INDEX").is_some() {
app.route("/api/update/test-index", get(api_update_test_index))
} else {
app
};
let app = app
.fallback(get(|| async { axum::response::Redirect::temporary("/") }))
.with_state(state.clone())
.layer(middleware::from_fn_with_state(
state_for_mw,
auth_middleware,
));
let addr = SocketAddr::from(([0, 0, 0, 0], port));
if state.auth.is_some() {
println!("auth: enabled (HTTP Basic)");
} else {
println!("auth: disabled (set MOBUX_AUTH_USER/MOBUX_AUTH_PASS or MOBUX_PIN)");
}
if state.dev_mode {
println!("dev mode: ON (MOBUX_DEV) — /api/telemetry active, logs to stderr");
}
if use_tls {
let extra_hosts: Vec<String> = env::var("MOBUX_TLS_HOSTS")
.unwrap_or_default()
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
let (cert_path, key_path) = match (env::var("MOBUX_CERT_FILE"), env::var("MOBUX_KEY_FILE"))
{
(Ok(c), Ok(k)) => {
eprintln!("[ssl] Using provided cert: {c}, key: {k}");
(std::path::PathBuf::from(c), std::path::PathBuf::from(k))
}
_ => {
let challenges = if ssl::acme_mode_enabled() {
let c = ssl::new_acme_challenges();
spawn_acme_http_server(c.clone()).await?;
Some(c)
} else {
None
};
let paths = ssl::ensure_certs(&extra_hosts, challenges).await?;
(paths.cert, paths.key)
}
};
let tls_config = ssl::load_rustls_config(&cert_path, &key_path)?;
let rustls_config =
axum_server::tls_rustls::RustlsConfig::from_config(std::sync::Arc::new(tls_config));
println!("mobux listening on https://{}", addr);
axum_server::bind_rustls(addr, rustls_config)
.serve(app.into_make_service())
.await?;
} else {
println!("mobux listening on http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
}
Ok(())
}
fn resolve_data_dir() -> Result<PathBuf> {
if let Some(override_dir) = env::var_os("MOBUX_DATA_DIR") {
let path = PathBuf::from(override_dir);
if path.as_os_str().is_empty() {
return Err(anyhow::anyhow!("MOBUX_DATA_DIR is set but empty"));
}
return Ok(path);
}
let dirs = directories::ProjectDirs::from("", "", "mobux")
.ok_or_else(|| anyhow::anyhow!("could not resolve user home directory for data dir"))?;
Ok(dirs.data_dir().to_path_buf())
}
async fn spawn_acme_http_server(challenges: ssl::AcmeChallenges) -> Result<()> {
let port: u16 = env::var("MOBUX_ACME_HTTP_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(80);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let router = Router::new()
.route(
"/.well-known/acme-challenge/{token}",
get(serve_acme_challenge),
)
.layer(Extension(challenges));
let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
anyhow::anyhow!(
"ACME mode: failed to bind HTTP listener on {addr} for HTTP-01 challenges \
(set MOBUX_ACME_HTTP_PORT to override): {e}"
)
})?;
eprintln!("[ssl] ACME: HTTP-01 challenge server listening on http://{addr}");
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, router).await {
eprintln!("[ssl] ACME HTTP server exited with error: {e}");
}
});
Ok(())
}
async fn serve_acme_challenge(
Path(token): Path<String>,
Extension(challenges): Extension<ssl::AcmeChallenges>,
) -> Response {
match ssl::lookup_acme_challenge(&challenges, &token) {
Some(value) => (
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "text/plain")],
value,
)
.into_response(),
None => (StatusCode::NOT_FOUND, "unknown acme challenge token").into_response(),
}
}
fn ensure_session_cookie_value() -> String {
let path = ssl::config_dir().join("session-cookie");
if let Ok(existing) = std::fs::read_to_string(&path) {
let trimmed = existing.trim();
if trimmed.len() >= 32 {
return trimmed.to_string();
}
}
let value: String = rand::rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect();
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Err(e) = std::fs::write(&path, &value) {
eprintln!(
"[auth] WARN: could not persist session cookie to {}: {e}. \
Restarts will re-prompt clients for credentials.",
path.display()
);
} else {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
}
}
value
}
fn load_auth_config() -> Option<AuthConfig> {
let user_env = env::var("MOBUX_AUTH_USER")
.ok()
.map(|v| v.trim().to_string());
let pass_env = env::var("MOBUX_AUTH_PASS")
.ok()
.map(|v| v.trim().to_string());
let pin_env = env::var("MOBUX_PIN").ok().map(|v| v.trim().to_string());
let session_cookie_name = "mobux_session".to_string();
let session_cookie_value = ensure_session_cookie_value();
match (user_env, pass_env, pin_env) {
(Some(user), Some(pass), _) if !user.is_empty() && !pass.is_empty() => Some(AuthConfig {
user,
pass,
session_cookie_name,
session_cookie_value,
}),
(user_opt, None, Some(pin)) if !pin.is_empty() => Some(AuthConfig {
user: user_opt
.filter(|u| !u.is_empty())
.unwrap_or_else(|| "mobux".to_string()),
pass: pin,
session_cookie_name,
session_cookie_value,
}),
_ => None,
}
}
fn is_public_path(path: &str) -> bool {
path == "/api/identify"
|| path == "/api/update/test-index"
|| path == "/install"
|| path.starts_with("/install/")
|| path.starts_with("/.well-known/")
|| path.starts_with("/static/icon-")
|| path == "/static/manifest.json"
|| path == "/sw.js"
}
async fn auth_middleware(
State(state): State<AppState>,
req: Request<axum::body::Body>,
next: Next,
) -> Response {
let Some(auth) = &state.auth else {
return next.run(req).await;
};
if is_public_path(req.uri().path()) {
return next.run(req).await;
}
let cookie_ok = req
.headers()
.get(axum::http::header::COOKIE)
.and_then(|v| v.to_str().ok())
.map(|cookie| {
cookie
.split(';')
.filter_map(|p| p.trim().split_once('='))
.any(|(k, v)| k == auth.session_cookie_name && v == auth.session_cookie_value)
})
.unwrap_or(false);
if cookie_ok {
return next.run(req).await;
}
let basic_ok = req
.headers()
.get(AUTHORIZATION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.strip_prefix("Basic "))
.and_then(|b64| BASE64.decode(b64).ok())
.and_then(|bytes| String::from_utf8(bytes).ok())
.and_then(|pair| {
let mut parts = pair.splitn(2, ':');
let user = parts.next()?.to_string();
let pass = parts.next()?.to_string();
Some((user, pass))
})
.map(|(user, pass)| user == auth.user && pass == auth.pass)
.unwrap_or(false);
if basic_ok {
let mut resp = next.run(req).await;
let set_cookie = format!(
"{}={}; Path=/; HttpOnly; SameSite=Lax; Secure; Max-Age=2592000",
auth.session_cookie_name, auth.session_cookie_value
);
if let Ok(v) = HeaderValue::from_str(&set_cookie) {
resp.headers_mut().append(axum::http::header::SET_COOKIE, v);
}
return resp;
}
let mut resp = (StatusCode::UNAUTHORIZED, "Authentication required").into_response();
resp.headers_mut().insert(
WWW_AUTHENTICATE,
HeaderValue::from_static("Basic realm=\"mobux\""),
);
resp
}
async fn root_redirect() -> impl IntoResponse {
(
axum::http::StatusCode::TEMPORARY_REDIRECT,
[
(axum::http::header::LOCATION, "/app"),
(axum::http::header::CACHE_CONTROL, "no-store"),
],
)
}
async fn api_sessions() -> Result<Json<Vec<tmux::Session>>, AppError> {
let sessions = tmux::list_sessions().await.map_err(AppError::bad_request)?;
Ok(Json(sessions))
}
async fn api_identify() -> Json<mesh::Identify> {
Json(mesh::Identify {
app: "mobux".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
})
}
async fn api_build_info(State(state): State<AppState>) -> Json<serde_json::Value> {
Json(json!({
"version": PKG_VERSION,
"build_hash": state.build_hash,
}))
}
async fn api_peers(State(state): State<AppState>) -> Response {
let probe_port = tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.mesh_peer_port()
})
.await
.ok()
.and_then(|r| r.ok())
.unwrap_or(5151);
match mesh::enumerate(probe_port).await {
Ok(peers) => Json(json!({ "peers": peers })).into_response(),
Err(err) => {
(StatusCode::BAD_GATEWAY, Json(json!({ "error": err }))).into_response()
}
}
}
async fn api_update_status(State(state): State<AppState>) -> Json<update::UpdateStatus> {
Json(state.update.status().await)
}
async fn api_update_check(State(state): State<AppState>) -> Json<update::UpdateStatus> {
Json(state.update.refresh().await)
}
async fn api_update_run(State(state): State<AppState>) -> Response {
let status = state.update.status().await;
let Some(latest) = status.latest.clone() else {
let err = update::RunError::NoUpdateAvailable {
message: "no latest version known yet; run a check first".to_string(),
};
return (StatusCode::CONFLICT, Json(json!({ "error": err }))).into_response();
};
if !status.available {
let err = update::RunError::NoUpdateAvailable {
message: format!(
"already on the latest version ({})",
update::UpdateState::current_version()
),
};
return (StatusCode::CONFLICT, Json(json!({ "error": err }))).into_response();
}
if !state.update.try_begin_run() {
let err = update::RunError::AlreadyRunning {
message: "an update is already in progress".to_string(),
};
return (StatusCode::CONFLICT, Json(json!({ "error": err }))).into_response();
}
match update::spawn_updater(&state.data_dir, &latest, state.port, state.use_tls) {
Ok(log_path) => {
(
StatusCode::ACCEPTED,
Json(json!({
"started": true,
"version": latest,
"log": log_path.to_string_lossy(),
})),
)
.into_response()
}
Err(err) => {
state.update.end_run();
let status = match err {
update::RunError::NotSystemd { .. } => StatusCode::PRECONDITION_FAILED,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
(status, Json(json!({ "error": err }))).into_response()
}
}
}
async fn api_update_test_index() -> impl IntoResponse {
let body = std::env::var("MOBUX_UPDATE_TEST_INDEX").unwrap_or_default();
([(axum::http::header::CONTENT_TYPE, "text/plain")], body)
}
#[derive(Deserialize)]
struct CreateReq {
name: String,
}
async fn api_create_session(
State(state): State<AppState>,
Json(payload): Json<CreateReq>,
) -> Result<Json<serde_json::Value>, AppError> {
let name = payload.name.trim();
validate_session_name(&state, name)?;
tmux::new_session(name)
.await
.map_err(AppError::bad_request)?;
Ok(Json(json!({"ok": true, "name": name})))
}
async fn api_kill_session(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Result<Json<serde_json::Value>, AppError> {
validate_session_name(&state, &name)?;
tmux::kill_session(&name)
.await
.map_err(AppError::bad_request)?;
Ok(Json(json!({"ok": true})))
}
#[derive(Deserialize)]
struct RenameReq {
name: String,
}
async fn api_rename_session(
State(state): State<AppState>,
Path(old_name): Path<String>,
Json(payload): Json<RenameReq>,
) -> Result<Json<serde_json::Value>, AppError> {
validate_session_name(&state, &old_name)?;
validate_session_name(&state, &payload.name)?;
tmux::rename_session(&old_name, &payload.name)
.await
.map_err(AppError::bad_request)?;
Ok(Json(json!({"ok": true})))
}
async fn api_list_panes(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Result<Json<Vec<tmux::Pane>>, AppError> {
validate_session_name(&state, &name)?;
let panes = tmux::list_panes(&name)
.await
.map_err(AppError::bad_request)?;
Ok(Json(panes))
}
async fn api_select_pane(
State(state): State<AppState>,
Path((name, pane)): Path<(String, String)>,
) -> Result<Json<serde_json::Value>, AppError> {
validate_session_name(&state, &name)?;
tmux::select_pane(&name, &pane)
.await
.map_err(AppError::bad_request)?;
Ok(Json(json!({"ok": true})))
}
async fn api_session_history(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Result<String, AppError> {
validate_session_name(&state, &name)?;
let history = tmux::capture_history(&name, 10000)
.await
.map_err(AppError::bad_request)?;
Ok(history)
}
#[derive(Deserialize)]
struct CommandReq {
command: String,
}
async fn api_tmux_command(
State(state): State<AppState>,
Path(name): Path<String>,
Json(payload): Json<CommandReq>,
) -> Result<Json<serde_json::Value>, AppError> {
validate_session_name(&state, &name)?;
let result = tmux::run_command(&name, &payload.command)
.await
.map_err(AppError::bad_request)?;
Ok(Json(json!({"ok": true, "output": result})))
}
async fn api_telemetry(State(state): State<AppState>, body: String) -> StatusCode {
if !state.dev_mode {
return StatusCode::NOT_FOUND;
}
let ts = chrono::Local::now().format("%H:%M:%S%.3f");
eprintln!("[telemetry {ts}] {body}");
StatusCode::NO_CONTENT
}
async fn api_upload(
mut multipart: axum::extract::Multipart,
) -> Result<Json<serde_json::Value>, AppError> {
use std::fs;
use std::path::PathBuf;
let upload_dir = PathBuf::from("/tmp/mobux-uploads");
fs::create_dir_all(&upload_dir).map_err(|e| AppError::bad_request(e.into()))?;
if let Some(field) = multipart
.next_field()
.await
.map_err(|e| AppError::bad_request(e.into()))?
{
let filename = field.file_name().unwrap_or("upload").to_string();
let safe_name = filename
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '.' || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect::<String>();
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
let dest = upload_dir.join(format!("{ts}-{safe_name}"));
let data = field
.bytes()
.await
.map_err(|e| AppError::bad_request(e.into()))?;
fs::write(&dest, &data).map_err(|e| AppError::bad_request(e.into()))?;
return Ok(Json(json!({
"path": dest.to_string_lossy(),
"size": data.len(),
"name": safe_name,
})));
}
Err(AppError::bad_request(anyhow::anyhow!("no file in upload")))
}
async fn api_transcribe(
State(state): State<AppState>,
mut multipart: axum::extract::Multipart,
) -> Result<Json<serde_json::Value>, AppError> {
let mut audio_bytes: Option<Vec<u8>> = None;
let mut filename = "speech.wav".to_string();
while let Some(field) = multipart
.next_field()
.await
.map_err(|e| anyhow::anyhow!("multipart: {e}"))
.map_err(AppError::bad_request)?
{
if field.name() == Some("audio") {
if let Some(fname) = field.file_name() {
filename = fname.to_string();
}
audio_bytes = Some(
field
.bytes()
.await
.map_err(|e| anyhow::anyhow!("read field: {e}"))
.map_err(AppError::bad_request)?
.to_vec(),
);
} else {
let _ = field.bytes().await;
}
}
let audio = audio_bytes
.ok_or_else(|| AppError::bad_request(anyhow::anyhow!("missing 'audio' field")))?;
let provider_cfg = tokio::task::spawn_blocking({
let db = state.db.clone();
move || -> anyhow::Result<transcribe::ProviderConfig> {
let kind = db.stt_active_kind()?;
let row = db
.stt_provider(&kind)?
.unwrap_or_else(|| db::SttProviderRow::default_for(&kind));
Ok(transcribe::ProviderConfig {
url: row.transcription_url(),
model: row.model,
api_key: row.api_key,
})
}
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
match transcribe::transcribe_with_provider(&provider_cfg, audio, &filename).await {
Ok(text) => Ok(Json(json!({ "text": text }))),
Err(transcribe::TranscribeError::ProviderUnavailable(msg)) => Err(AppError {
status: StatusCode::SERVICE_UNAVAILABLE,
message: msg,
}),
Err(e) => Err(AppError::internal(anyhow::anyhow!("{e}"))),
}
}
#[derive(Deserialize)]
struct PushSubscribeReq {
endpoint: String,
p256dh: String,
auth: String,
label: Option<String>,
}
#[derive(Deserialize)]
struct PushUnsubscribeReq {
endpoint: String,
}
fn decode_b64url(field: &str, value: &str) -> Result<Vec<u8>, AppError> {
BASE64URL
.decode(value)
.map_err(|e| AppError::bad_request(anyhow::anyhow!("invalid base64url in '{field}': {e}")))
}
async fn api_push_vapid_public_key(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let keys = state
.db
.vapid_keys()
.map_err(|e| AppError::internal(anyhow::anyhow!("loading vapid keys: {e}")))?;
Ok(Json(json!({ "key": BASE64URL.encode(&keys.public_key) })))
}
async fn api_push_subscribe(
State(state): State<AppState>,
Json(payload): Json<PushSubscribeReq>,
) -> Result<StatusCode, AppError> {
if payload.endpoint.trim().is_empty() {
return Err(AppError::bad_request(anyhow::anyhow!(
"endpoint must not be empty"
)));
}
let p256dh = decode_b64url("p256dh", &payload.p256dh)?;
let auth = decode_b64url("auth", &payload.auth)?;
let label = payload
.label
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty());
state
.db
.insert_subscription(db::NewSubscription {
endpoint: payload.endpoint,
p256dh,
auth,
label,
})
.map_err(|e| AppError::internal(anyhow::anyhow!("storing subscription: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
async fn api_push_unsubscribe(
State(state): State<AppState>,
Json(payload): Json<PushUnsubscribeReq>,
) -> Result<StatusCode, AppError> {
if payload.endpoint.trim().is_empty() {
return Err(AppError::bad_request(anyhow::anyhow!(
"endpoint must not be empty"
)));
}
state
.db
.remove_subscription(&payload.endpoint)
.map_err(|e| AppError::internal(anyhow::anyhow!("removing subscription: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
async fn api_push_devices(
State(state): State<AppState>,
) -> Result<Json<Vec<serde_json::Value>>, AppError> {
let subs = state
.db
.list_subscriptions()
.map_err(|e| AppError::internal(anyhow::anyhow!("listing subscriptions: {e}")))?;
let out: Vec<serde_json::Value> = subs
.into_iter()
.map(|s| {
json!({
"id": s.id,
"label": s.label,
"created_at": s.created_at,
"last_seen_at": s.last_seen_at,
})
})
.collect();
Ok(Json(out))
}
#[derive(Deserialize)]
struct PushNotifyRequest {
title: Option<String>,
body: String,
tag: Option<String>,
url: Option<String>,
}
async fn api_push_notify(
State(state): State<AppState>,
Json(req): Json<PushNotifyRequest>,
) -> Result<StatusCode, AppError> {
if req.body.trim().is_empty() {
return Err(AppError::bad_request(anyhow::anyhow!("body is required")));
}
let payload = push::Payload {
title: req.title.unwrap_or_else(|| "mobux".to_string()),
body: req.body,
tag: req.tag,
url: req.url,
};
tokio::spawn(push::notify(state.db.clone(), payload));
Ok(StatusCode::NO_CONTENT)
}
#[derive(Deserialize)]
struct InternalTriggerQuery {
kind: String,
session: String,
window: Option<String>,
}
#[derive(serde::Deserialize)]
struct SttModelsQuery {
kind: Option<String>,
host: Option<String>,
port: Option<String>,
}
async fn api_internal_trigger(
State(state): State<AppState>,
headers: HeaderMap,
axum::extract::Query(q): axum::extract::Query<InternalTriggerQuery>,
) -> StatusCode {
let token = headers
.get("X-Mobux-Token")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if token != state.internal_token.as_str() {
return StatusCode::UNAUTHORIZED;
}
if !state.session_name_re.is_match(&q.session) {
return StatusCode::BAD_REQUEST;
}
match q.kind.as_str() {
"bell" => {
let prefs = state.db.notification_prefs().unwrap_or_default();
if prefs.bell {
push::fire_bell(state.db.clone(), &q.session, q.window.as_deref());
}
}
_ => return StatusCode::BAD_REQUEST,
}
StatusCode::NO_CONTENT
}
#[derive(serde::Serialize, Deserialize)]
struct NotifPrefsJson {
bell: bool,
bell_emoji: bool,
program_exit: bool,
program_exit_nonzero: bool,
}
impl From<db::NotificationPrefs> for NotifPrefsJson {
fn from(p: db::NotificationPrefs) -> Self {
Self {
bell: p.bell,
bell_emoji: p.bell_emoji,
program_exit: p.program_exit,
program_exit_nonzero: p.program_exit_nonzero,
}
}
}
impl From<NotifPrefsJson> for db::NotificationPrefs {
fn from(j: NotifPrefsJson) -> Self {
Self {
bell: j.bell,
bell_emoji: j.bell_emoji,
program_exit: j.program_exit,
program_exit_nonzero: j.program_exit_nonzero,
}
}
}
async fn api_get_notification_prefs(
State(state): State<AppState>,
) -> Result<Json<NotifPrefsJson>, AppError> {
let prefs = state
.db
.notification_prefs()
.map_err(|e| AppError::internal(anyhow::anyhow!("reading prefs: {e}")))?;
Ok(Json(prefs.into()))
}
async fn api_set_notification_prefs(
State(state): State<AppState>,
Json(req): Json<NotifPrefsJson>,
) -> Result<StatusCode, AppError> {
state
.db
.set_notification_prefs(req.into())
.map_err(|e| AppError::internal(anyhow::anyhow!("writing prefs: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Deserialize)]
struct ShellIntegrationReq {
shell: shell_integration::Shell,
}
async fn api_shell_integration_status() -> Result<Json<shell_integration::Status>, AppError> {
let s = shell_integration::status().map_err(AppError::internal)?;
Ok(Json(s))
}
async fn api_shell_integration_install(
Json(req): Json<ShellIntegrationReq>,
) -> Result<Json<shell_integration::Status>, AppError> {
let s = shell_integration::install(req.shell).map_err(AppError::internal)?;
Ok(Json(s))
}
async fn api_shell_integration_uninstall(
Json(req): Json<ShellIntegrationReq>,
) -> Result<Json<shell_integration::Status>, AppError> {
let s = shell_integration::uninstall(req.shell).map_err(AppError::internal)?;
Ok(Json(s))
}
const PKG_VERSION: &str = env!("CARGO_PKG_VERSION");
async fn settings_page() -> impl IntoResponse {
(
axum::http::StatusCode::TEMPORARY_REDIRECT,
[
(axum::http::header::LOCATION, "/app#/settings"),
(axum::http::header::CACHE_CONTROL, "no-store"),
],
)
}
async fn terminal_page(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Result<impl IntoResponse, AppError> {
validate_session_name(&state, &name)?;
let location = format!("/app#/s/{name}");
Ok((
axum::http::StatusCode::TEMPORARY_REDIRECT,
[
(axum::http::header::LOCATION, location),
(axum::http::header::CACHE_CONTROL, "no-store".to_string()),
],
))
}
async fn terminal_page_pinned(
State(state): State<AppState>,
Path((host, name)): Path<(String, String)>,
) -> Result<impl IntoResponse, AppError> {
validate_session_name(&state, &name)?;
let peer = validate_pinned_host(&host)?;
let peer_enc = peer.replace(':', "%3A");
let location = format!("/app#/s/{peer_enc}/{name}");
Ok((
axum::http::StatusCode::TEMPORARY_REDIRECT,
[
(axum::http::header::LOCATION, location),
(axum::http::header::CACHE_CONTROL, "no-store".to_string()),
],
))
}
fn validate_pinned_host(host: &str) -> Result<String, AppError> {
let peer = relay::canonical_peer(host)
.map_err(|e| AppError::bad_request(anyhow::anyhow!("invalid host: {e}")))?;
if !peer
.chars()
.all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | ':' | '_' | '-'))
{
return Err(AppError::bad_request(anyhow::anyhow!("invalid host")));
}
Ok(peer)
}
async fn serve_sw(State(state): State<AppState>) -> impl axum::response::IntoResponse {
use axum::http::header;
let body = format!(
"{}\n// sw-version: {}\n",
include_str!("../web/static/sw.js"),
state.cache_bust,
);
(
[
(header::CONTENT_TYPE, "text/javascript"),
(header::CACHE_CONTROL, "no-store, must-revalidate"),
],
body,
)
}
async fn serve_static(Path(path): Path<String>) -> Response {
use axum::http::header;
match StaticAssets::get(&path) {
Some(file) => {
let mime = file.metadata.mimetype();
let mut resp = (StatusCode::OK, file.data).into_response();
let h = resp.headers_mut();
if let Ok(v) = HeaderValue::from_str(mime) {
h.insert(header::CONTENT_TYPE, v);
}
h.insert(
header::CACHE_CONTROL,
HeaderValue::from_static("no-store, must-revalidate"),
);
resp
}
None => (StatusCode::NOT_FOUND, "not found").into_response(),
}
}
async fn serve_spa_index() -> Response {
use axum::http::header;
match StaticAssets::get("spa/index.html") {
Some(file) => {
let mut resp = (StatusCode::OK, file.data).into_response();
let h = resp.headers_mut();
h.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("text/html; charset=utf-8"),
);
h.insert(
header::CACHE_CONTROL,
HeaderValue::from_static("no-store, must-revalidate"),
);
resp
}
None => (
StatusCode::NOT_FOUND,
"SPA not built — run `node web/build.js` (or `make build`).",
)
.into_response(),
}
}
const INSTALL_APK_PATH: &str = "web/static/install/mobux.apk";
const INSTALL_ASSETLINKS_PATH: &str = "web/static/.well-known/assetlinks.json";
async fn install_page() -> impl IntoResponse {
(
axum::http::StatusCode::TEMPORARY_REDIRECT,
[
(axum::http::header::LOCATION, "/app#/install"),
(axum::http::header::CACHE_CONTROL, "no-store"),
],
)
}
async fn serve_install_apk() -> Response {
serve_file_or_404(
INSTALL_APK_PATH,
"application/vnd.android.package-archive",
Some("mobux.apk"),
)
.await
}
async fn serve_install_ca() -> Response {
if ssl::acme_mode_enabled() {
return (StatusCode::NOT_FOUND, "ACME mode: no local CA to install").into_response();
}
let path = ssl::ca_cert_path();
serve_file_or_404(
path.to_string_lossy().as_ref(),
"application/x-x509-ca-cert",
Some("mobux-ca.crt"),
)
.await
}
async fn serve_assetlinks() -> Response {
serve_file_or_404(INSTALL_ASSETLINKS_PATH, "application/json", None).await
}
async fn serve_file_or_404(
path: &str,
content_type: &'static str,
download_name: Option<&'static str>,
) -> Response {
use axum::http::header;
let bytes = match tokio::fs::read(path).await {
Ok(b) => b,
Err(_) => return (StatusCode::NOT_FOUND, "not found").into_response(),
};
let mut resp = (
StatusCode::OK,
[(header::CONTENT_TYPE, content_type)],
bytes,
)
.into_response();
if let Some(name) = download_name {
let disp = format!("attachment; filename=\"{name}\"");
if let Ok(v) = HeaderValue::from_str(&disp) {
resp.headers_mut().insert(header::CONTENT_DISPOSITION, v);
}
}
resp
}
async fn terminal_ws(
State(state): State<AppState>,
Path(name): Path<String>,
ws: WebSocketUpgrade,
) -> Result<Response, AppError> {
validate_session_name(&state, &name)?;
Ok(ws.on_upgrade(move |socket| async move {
if let Err(err) = handle_ws(socket, name).await {
eprintln!("ws error: {err:#}");
}
}))
}
#[derive(Deserialize)]
struct ResizeMsg {
#[serde(rename = "type")]
kind: String,
cols: u16,
rows: u16,
}
async fn handle_ws(socket: axum::extract::ws::WebSocket, session_name: String) -> Result<()> {
let pty_system = native_pty_system();
let pair = pty_system.openpty(PtySize {
rows: 35,
cols: 120,
pixel_width: 0,
pixel_height: 0,
})?;
let mut cmd = CommandBuilder::new("bash");
let tmux_bin = match std::env::var("MOBUX_TMUX_SOCKET") {
Ok(s) if !s.is_empty() => format!("tmux -L {}", s),
_ => "tmux".to_string(),
};
cmd.env("TERM", "xterm-256color");
cmd.args([
"-c",
&format!(
"{tmux} set-option -g mouse on 2>/dev/null; {tmux} set-option -g allow-passthrough on 2>/dev/null; {tmux} set-window-option -g aggressive-resize on 2>/dev/null; {tmux} attach-session -t {session}",
tmux = tmux_bin,
session = session_name,
),
]);
let mut child = pair.slave.spawn_command(cmd)?;
let mut reader = pair.master.try_clone_reader()?;
let writer = pair.master.take_writer()?;
let master = pair.master;
let writer = Arc::new(Mutex::new(writer));
let master = Arc::new(Mutex::new(master));
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
std::thread::spawn(move || {
let mut buf = vec![0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
if tx.send(buf[..n].to_vec()).is_err() {
break;
}
}
Err(_) => break,
}
}
});
let (mut ws_sender, mut ws_receiver) = socket.split();
loop {
tokio::select! {
maybe_out = rx.recv() => {
match maybe_out {
Some(chunk) => {
let text = String::from_utf8_lossy(&chunk).to_string();
if ws_sender.send(Message::Text(text.into())).await.is_err() {
break;
}
}
None => break,
}
}
maybe_in = ws_receiver.next() => {
match maybe_in {
Some(Ok(msg)) => {
match msg {
Message::Text(t) => {
if let Ok(rz) = serde_json::from_str::<ResizeMsg>(&t) {
if rz.kind == "resize" && rz.cols > 0 && rz.rows > 0 {
if let Ok(m) = master.lock() {
let _ = m.resize(PtySize { rows: rz.rows, cols: rz.cols, pixel_width: 0, pixel_height: 0});
}
continue;
}
}
if let Ok(mut w) = writer.lock() {
let _ = w.write_all(t.as_bytes());
let _ = w.flush();
}
}
Message::Binary(b) => {
if let Ok(mut w) = writer.lock() {
let _ = w.write_all(&b);
let _ = w.flush();
}
}
Message::Close(_) => break,
Message::Ping(_) | Message::Pong(_) => {}
}
}
Some(Err(_)) | None => break,
}
}
}
}
let _ = child.kill();
let _ = child.wait();
Ok(())
}
fn validate_session_name(state: &AppState, name: &str) -> Result<(), AppError> {
if name.is_empty() || !state.session_name_re.is_match(name) {
return Err(AppError::bad_request(anyhow::anyhow!(
"invalid session name"
)));
}
Ok(())
}
#[derive(serde::Serialize)]
struct MeshSettingsJson {
peer_port: u16,
}
#[derive(serde::Deserialize)]
struct MeshSettingsPutJson {
peer_port: u16,
}
async fn api_get_mesh_settings(
State(state): State<AppState>,
) -> Result<Json<MeshSettingsJson>, AppError> {
let peer_port = tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.mesh_peer_port()
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
Ok(Json(MeshSettingsJson { peer_port }))
}
async fn api_set_mesh_settings(
State(state): State<AppState>,
Json(req): Json<MeshSettingsPutJson>,
) -> Result<StatusCode, AppError> {
if req.peer_port == 0 {
return Err(AppError::bad_request(anyhow::anyhow!(
"peer_port must be 1–65535"
)));
}
tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.set_mesh_peer_port(req.peer_port)
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
Ok(StatusCode::NO_CONTENT)
}
#[derive(serde::Serialize)]
struct SttProviderJson {
host: String,
port: String,
model: String,
has_key: bool,
}
#[derive(serde::Serialize)]
struct SttConfigGetJson {
#[serde(rename = "activeKind")]
active_kind: String,
providers: std::collections::HashMap<String, SttProviderJson>,
#[serde(skip_serializing_if = "Option::is_none")]
install_cmd: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
start_cmd: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
stop_cmd: Option<String>,
}
#[derive(serde::Deserialize)]
struct SttConfigPutJson {
kind: String,
host: String,
port: String,
model: String,
#[serde(default)]
api_key: Option<String>,
}
async fn api_get_stt_config(
State(state): State<AppState>,
) -> Result<Json<SttConfigGetJson>, AppError> {
let (active_kind, providers, legacy) = tokio::task::spawn_blocking({
let db = state.db.clone();
move || -> anyhow::Result<_> {
let active_kind = db.stt_active_kind()?;
let rows = db.stt_all_providers()?;
let legacy = db.stt_config()?;
Ok((active_kind, rows, legacy))
}
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
let mut map = std::collections::HashMap::new();
for row in &providers {
map.insert(
row.kind.clone(),
SttProviderJson {
host: row.host.clone(),
port: row.port.clone(),
model: row.model.clone(),
has_key: row.api_key.as_deref().is_some_and(|k| !k.is_empty()),
},
);
}
Ok(Json(SttConfigGetJson {
active_kind,
providers: map,
install_cmd: legacy.install_cmd,
start_cmd: legacy.start_cmd,
stop_cmd: legacy.stop_cmd,
}))
}
async fn api_set_stt_config(
State(state): State<AppState>,
Json(req): Json<SttConfigPutJson>,
) -> Result<StatusCode, AppError> {
let row = db::SttProviderRow {
kind: req.kind.clone(),
host: req.host,
port: req.port,
model: req.model,
api_key: req.api_key,
};
tokio::task::spawn_blocking({
let db = state.db.clone();
let kind = req.kind.clone();
move || -> anyhow::Result<()> {
db.set_stt_provider(row)?;
db.set_stt_active_kind(&kind)?;
let provider = db
.stt_provider(&kind)?
.unwrap_or_else(|| db::SttProviderRow::default_for(&kind));
let legacy = db.stt_config()?;
db.set_stt_config(db::SttConfig {
kind: kind.clone(),
url: provider.transcription_url(),
model: provider.model,
api_key: provider.api_key,
install_cmd: legacy.install_cmd,
start_cmd: legacy.start_cmd,
stop_cmd: legacy.stop_cmd,
})?;
Ok(())
}
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
Ok(StatusCode::NO_CONTENT)
}
async fn api_stt_status(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let (cfg, active_url, active_kind_str) = tokio::task::spawn_blocking({
let db = state.db.clone();
move || -> anyhow::Result<_> {
let cfg = db.stt_config()?;
let kind = db.stt_active_kind()?;
let row = db
.stt_provider(&kind)?
.unwrap_or_else(|| db::SttProviderRow::default_for(&kind));
let url = row.transcription_url();
Ok((cfg, url, kind))
}
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
let reachable = transcribe::probe_provider(&active_url).await;
let local_process_running = tokio::process::Command::new("podman")
.args([
"ps",
"--filter",
"name=^mobux-stt$",
"--filter",
"status=running",
"--format",
"{{.Names}}",
])
.output()
.await
.map(|o| !o.stdout.trim_ascii().is_empty())
.unwrap_or(false);
let installed = state.data_dir.join("stt").join(".installed").exists();
let (install_phase, install_error, install_output) = {
let guard = state.stt_install.lock().await;
let (phase_str, error) = match &guard.phase {
InstallPhase::Idle => ("idle", None),
InstallPhase::Running => ("running", None),
InstallPhase::Success => ("success", None),
InstallPhase::Failed(e) => ("failed", Some(e.clone())),
};
(phase_str, error, guard.output_tail.clone())
};
let mut body = json!({
"kind": active_kind_str,
"url": active_url,
"reachable": reachable,
"local_process_running": local_process_running,
"installed": installed,
"install_phase": install_phase,
"install_output": install_output,
});
let _ = cfg; if let Some(err) = install_error {
body["install_error"] = serde_json::Value::String(err);
}
Ok(Json(body))
}
async fn api_stt_install(State(state): State<AppState>) -> Result<impl IntoResponse, AppError> {
{
let mut guard = state.stt_install.lock().await;
if guard.phase == InstallPhase::Running {
return Ok((
StatusCode::CONFLICT,
Json(json!({"status": "already_running"})),
));
}
guard.phase = InstallPhase::Running;
guard.output_tail.clear();
}
let install_state = state.stt_install.clone();
let db = state.db.clone();
tokio::spawn(async move {
let cfg = tokio::task::spawn_blocking({
let db = db.clone();
move || db.stt_config()
})
.await;
let cmd_str = match cfg {
Ok(Ok(c)) => match c.install_cmd {
Some(s) => s,
None => {
let mut guard = install_state.lock().await;
guard.phase = InstallPhase::Failed("no install_cmd configured".to_string());
return;
}
},
Ok(Err(e)) => {
let mut guard = install_state.lock().await;
guard.phase = InstallPhase::Failed(format!("db error: {e}"));
return;
}
Err(e) => {
let mut guard = install_state.lock().await;
guard.phase = InstallPhase::Failed(format!("spawn_blocking error: {e}"));
return;
}
};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
let mut child = match tokio::process::Command::new("sh")
.arg("-c")
.arg(&cmd_str)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
let mut guard = install_state.lock().await;
guard.phase = InstallPhase::Failed(format!("spawn error: {e}"));
return;
}
};
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let state_for_stdout = install_state.clone();
let state_for_stderr = install_state.clone();
let stdout_task = tokio::spawn(async move {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
let mut guard = state_for_stdout.lock().await;
if guard.output_tail.len() >= 200 {
guard.output_tail.remove(0);
}
guard.output_tail.push(line);
}
});
let stderr_task = tokio::spawn(async move {
let mut last_line = String::new();
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
let mut guard = state_for_stderr.lock().await;
if guard.output_tail.len() >= 200 {
guard.output_tail.remove(0);
}
guard.output_tail.push(line.clone());
drop(guard);
last_line = line;
}
last_line
});
let _ = stdout_task.await;
let stderr_summary = stderr_task.await.unwrap_or_default();
let exit_status = child.wait().await;
let mut guard = install_state.lock().await;
match exit_status {
Ok(s) if s.success() => {
guard.phase = InstallPhase::Success;
}
Ok(s) => {
guard.phase = InstallPhase::Failed(format!(
"exit {}: {}",
s.code().unwrap_or(-1),
stderr_summary
));
}
Err(e) => {
guard.phase = InstallPhase::Failed(format!("wait error: {e}"));
}
}
});
Ok((StatusCode::ACCEPTED, Json(json!({"status": "started"}))))
}
async fn api_stt_install_status(
State(state): State<AppState>,
) -> Result<Json<serde_json::Value>, AppError> {
let guard = state.stt_install.lock().await;
let (phase_str, error) = match &guard.phase {
InstallPhase::Idle => ("idle", None),
InstallPhase::Running => ("running", None),
InstallPhase::Success => ("success", None),
InstallPhase::Failed(e) => ("failed", Some(e.clone())),
};
Ok(Json(json!({
"phase": phase_str,
"output": guard.output_tail,
"error": error,
})))
}
async fn api_stt_start(State(state): State<AppState>) -> Result<StatusCode, AppError> {
let cfg = tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.stt_config()
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
let cmd_str = cfg
.start_cmd
.ok_or_else(|| AppError::bad_request(anyhow::anyhow!("no start_cmd configured")))?;
tokio::process::Command::new("sh")
.arg("-c")
.arg(&cmd_str)
.spawn()
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn start: {e}")))?
.wait()
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("start cmd failed: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
async fn api_stt_stop(State(state): State<AppState>) -> Result<StatusCode, AppError> {
let cfg = tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.stt_config()
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?;
let cmd_str = cfg
.stop_cmd
.ok_or_else(|| AppError::bad_request(anyhow::anyhow!("no stop_cmd configured")))?;
tokio::process::Command::new("sh")
.arg("-c")
.arg(&cmd_str)
.spawn()
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn stop: {e}")))?
.wait()
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("stop cmd failed: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
async fn api_stt_models(
State(state): State<AppState>,
Query(q): Query<SttModelsQuery>,
) -> Result<Json<serde_json::Value>, AppError> {
use std::time::Duration;
let fallback_for_kind = |kind: &str| -> Vec<String> {
if kind == "openai" {
vec![
"whisper-1".to_string(),
"gpt-4o-transcribe".to_string(),
"gpt-4o-mini-transcribe".to_string(),
]
} else {
vec![
"Systran/faster-whisper-small".to_string(),
"Systran/faster-whisper-small.en".to_string(),
"Systran/faster-whisper-medium.en".to_string(),
]
}
};
let (base_url, api_key, kind) = if q.host.as_deref().map(|h| !h.is_empty()).unwrap_or(false) {
let raw_host = q.host.as_deref().unwrap_or("").trim_end_matches('/');
let host = if raw_host.contains("://") {
raw_host.to_string()
} else {
format!("http://{}", raw_host)
};
let port = q.port.as_deref().unwrap_or("");
let base = if port.is_empty() {
host
} else {
format!("{}:{}", host, port)
};
let k = q.kind.clone().unwrap_or_default();
let api_key = if k == "openai" {
let kc = k.clone();
tokio::task::spawn_blocking({
let db = state.db.clone();
move || db.stt_provider(&kc)
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?
.and_then(|r| r.api_key)
.filter(|k| !k.is_empty())
} else {
None
};
(base, api_key, k)
} else {
tokio::task::spawn_blocking({
let db = state.db.clone();
move || -> anyhow::Result<_> {
let kind = db.stt_active_kind()?;
let row = db
.stt_provider(&kind)?
.unwrap_or_else(|| db::SttProviderRow::default_for(&kind));
let base = {
let raw = row.host.trim_end_matches('/');
let h = if raw.contains("://") {
raw.to_string()
} else {
format!("http://{}", raw)
};
if row.port.is_empty() {
h
} else {
format!("{}:{}", h, row.port)
}
};
let key = row.api_key.filter(|k| !k.is_empty());
Ok((base, key, kind))
}
})
.await
.map_err(|e| AppError::internal(anyhow::anyhow!("spawn_blocking: {e}")))?
.map_err(AppError::internal)?
};
if base_url.is_empty() {
return Ok(Json(serde_json::json!({
"models": fallback_for_kind(&kind)
})));
}
let models_url = format!("{}/v1/models", base_url.trim_end_matches('/'));
let client = match reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.build()
{
Ok(c) => c,
Err(_) => {
return Ok(Json(
serde_json::json!({ "models": fallback_for_kind(&kind) }),
));
}
};
let mut req = client.get(&models_url);
if let Some(key) = &api_key {
req = req.bearer_auth(key);
}
let ids: Vec<String> = match req.send().await {
Ok(resp) if resp.status().is_success() => resp
.json::<serde_json::Value>()
.await
.ok()
.and_then(|v| v.get("data").cloned())
.and_then(|d| d.as_array().cloned())
.map(|arr| {
arr.iter()
.filter_map(|m| m.get("id").and_then(|id| id.as_str()).map(String::from))
.collect()
})
.filter(|v: &Vec<String>| !v.is_empty())
.unwrap_or_else(|| fallback_for_kind(&kind)),
_ => fallback_for_kind(&kind),
};
Ok(Json(serde_json::json!({ "models": ids })))
}
#[derive(Debug)]
struct AppError {
status: StatusCode,
message: String,
}
impl AppError {
fn bad_request(err: anyhow::Error) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: err.to_string(),
}
}
fn internal(err: anyhow::Error) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: err.to_string(),
}
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
(self.status, self.message).into_response()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn serve_static_is_no_store() {
use axum::http::header;
let resp = serve_static(Path("style.css".to_string())).await;
assert_eq!(resp.status(), StatusCode::OK);
let cc = resp
.headers()
.get(header::CACHE_CONTROL)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
assert!(
cc.contains("no-store"),
"static assets must be no-store, got Cache-Control: {cc:?}"
);
assert!(
!cc.contains("immutable"),
"static assets must never be immutable, got Cache-Control: {cc:?}"
);
}
#[test]
fn twa_declares_record_audio_when_web_uses_getusermedia() {
use std::fs;
let static_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("web/static");
let mut uses_mic = false;
let mut stack = vec![static_dir];
while let Some(dir) = stack.pop() {
let Ok(entries) = fs::read_dir(&dir) else {
continue;
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else if path.extension().and_then(|e| e.to_str()) == Some("js") {
if let Ok(src) = fs::read_to_string(&path) {
if src.contains("getUserMedia") {
uses_mic = true;
}
}
}
}
}
if uses_mic {
let init_js = fs::read_to_string(
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("twa/init.js"),
)
.expect("twa/init.js must exist");
assert!(
init_js.contains("android.permission.RECORD_AUDIO"),
"web/static uses getUserMedia but twa/init.js does not inject \
android.permission.RECORD_AUDIO — the TWA mic prompt will be \
denied at the OS layer"
);
}
}
#[test]
fn base64url_round_trip_p256_point() {
let bytes: Vec<u8> = (0..65u8).collect();
let encoded = BASE64URL.encode(&bytes);
assert!(
!encoded.contains('='),
"URL_SAFE_NO_PAD must not emit padding"
);
assert!(
!encoded.contains('+') && !encoded.contains('/'),
"URL_SAFE_NO_PAD must use URL-safe alphabet"
);
let decoded = BASE64URL.decode(encoded).expect("round-trip decode");
assert_eq!(decoded, bytes);
}
#[test]
fn base64url_decode_rejects_bad_input() {
assert!(BASE64URL.decode("AAAA=").is_err());
assert!(BASE64URL.decode("AA+/").is_err());
}
#[test]
fn decode_b64url_helper_returns_400_on_garbage() {
let err = decode_b64url("p256dh", "!!not-valid!!").expect_err("must error");
assert_eq!(err.status, StatusCode::BAD_REQUEST);
assert!(
err.message.contains("p256dh"),
"error mentions field name: {}",
err.message
);
}
#[test]
fn session_name_regex_rejects_tmux_unsafe_chars() {
let re = Regex::new(r"^[a-zA-Z0-9_-]+$").unwrap();
for ok in ["foo", "my_app", "build-2", "ABC", "0"] {
assert!(re.is_match(ok), "should accept {ok:?}");
}
for bad in ["my.app", "a:b", "with space", ""] {
assert!(!re.is_match(bad), "should reject {bad:?}");
}
}
#[test]
fn pinned_host_rejects_script_breakout() {
for bad in [
"</script><script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"<svg",
"a>b",
"a\"b",
"a'b",
"",
] {
let err = validate_pinned_host(bad).expect_err(&format!("should reject {bad:?}"));
assert_eq!(err.status, StatusCode::BAD_REQUEST, "for {bad:?}");
}
assert_eq!(validate_pinned_host("box:8443").unwrap(), "box:8443");
assert_eq!(
validate_pinned_host("host-1.tailnet.ts.net").unwrap(),
"host-1.tailnet.ts.net:8080"
);
}
fn test_state(dev_mode: bool) -> (AppState, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let db = Arc::new(db::Db::open(&dir.path().join("mobux.db")).expect("open db"));
let state = AppState {
session_name_re: Arc::new(Regex::new(r"^[a-zA-Z0-9_-]+$").unwrap()),
auth: None,
cache_bust: "test".to_string(),
db,
internal_token: Arc::new("test-token".to_string()),
port: 8080,
data_dir: dir.path().to_path_buf(),
use_tls: false,
update: update::UpdateState::new(),
dev_mode,
build_hash: "test".to_string(),
stt_install: Arc::new(tokio::sync::Mutex::new(SttInstallState {
phase: InstallPhase::Idle,
output_tail: vec![],
})),
};
(state, dir)
}
#[tokio::test]
async fn telemetry_endpoint_inert_when_dev_off() {
let (state, _dir) = test_state(false);
let status = api_telemetry(State(state), "hello".to_string()).await;
assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn telemetry_endpoint_active_when_dev_on() {
let (state, _dir) = test_state(true);
let status = api_telemetry(State(state), "hello".to_string()).await;
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn stt_install_returns_409_when_already_running() {
let (state, _dir) = test_state(false);
{
let mut guard = state.stt_install.lock().await;
guard.phase = InstallPhase::Running;
}
let result = api_stt_install(State(state)).await;
match result {
Ok(resp) => {
let resp = resp.into_response();
assert_eq!(resp.status(), StatusCode::CONFLICT);
}
Err(_) => panic!("expected Ok with 409"),
}
}
#[tokio::test]
async fn stt_status_installed_reflects_sentinel() {
let (state, dir) = test_state(false);
let resp = api_stt_status(State(state.clone())).await.unwrap();
assert_eq!(resp.0["installed"], false);
let stt_dir = dir.path().join("stt");
std::fs::create_dir_all(&stt_dir).unwrap();
std::fs::File::create(stt_dir.join(".installed")).unwrap();
let resp2 = api_stt_status(State(state)).await.unwrap();
assert_eq!(resp2.0["installed"], true);
}
#[tokio::test]
async fn stt_models_returns_fallback_when_no_config() {
let (state, _dir) = test_state(false);
let q = SttModelsQuery {
kind: None,
host: None,
port: None,
};
let result = api_stt_models(State(state), Query(q)).await;
let Json(val) = result.expect("handler should not error");
let models = val["models"].as_array().expect("models array");
assert!(!models.is_empty(), "fallback models must not be empty");
}
#[tokio::test]
async fn stt_models_returns_openai_fallback_for_openai_kind() {
let (state, _dir) = test_state(false);
let q = SttModelsQuery {
kind: Some("openai".to_string()),
host: Some("https://api.openai.com".to_string()),
port: Some("443".to_string()),
};
let result = api_stt_models(State(state), Query(q)).await;
let Json(val) = result.expect("handler should not error");
let models: Vec<String> = val["models"]
.as_array()
.expect("models array")
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
assert!(!models.is_empty());
for m in &models {
assert!(!m.is_empty(), "model id must not be empty");
}
}
}