use axum::{
Json,
extract::{Multipart, Path, State},
http::{HeaderMap, StatusCode},
};
use serde_json::{Value, json};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use uuid::Uuid;
use crate::api::jobs::{Job, JobRequest, JobResult, JobStatus, JobStore, parse_model};
use crate::credits::{self, CreditStore, is_valid_device_id};
use crate::llm::summarize_and_diagram;
use crate::transcriber::{TranscriberEngine, TranscriptionOptions};
use crate::utils::paths::get_default_output_dir;
/// Shared application state handed to every request handler.
#[derive(Clone)]
pub struct AppState {
/// Job store; handlers lock it and look jobs up or insert them by job id.
pub jobs: JobStore,
/// Transcription engine, shared behind an async mutex.
pub engine: Arc<Mutex<TranscriberEngine>>,
/// Credit store used to reserve, refund and query credits per device id.
pub credits: CreditStore,
}
/// Name of the request header from which the caller's device id is read.
const DEVICE_ID_HEADER: &str = "x-device-id";
/// Crate-visible entry point that forwards to `require_device_id`.
///
/// Returns the validated device id, or the `(StatusCode, Json)` rejection
/// produced by `require_device_id`.
pub(crate) fn require_device_id_pub(
headers: &HeaderMap,
) -> Result<String, (StatusCode, Json<Value>)> {
require_device_id(headers)
}
/// Reads the device id from the `x-device-id` header and validates it.
///
/// The header value is trimmed before use. A 400 response is returned when
/// the header is absent, cannot be read as a string, is empty after
/// trimming, or is rejected by `is_valid_device_id`.
fn require_device_id(headers: &HeaderMap) -> Result<String, (StatusCode, Json<Value>)> {
    // An unreadable header value is treated exactly like an absent one.
    let trimmed = match headers.get(DEVICE_ID_HEADER) {
        Some(value) => value.to_str().map(str::trim).unwrap_or(""),
        None => "",
    };
    if trimmed.is_empty() {
        return Err(bad_request(
            "missing required header: X-Device-Id (client must generate and persist a UUID)",
        ));
    }

    let id = trimmed.to_string();
    if is_valid_device_id(&id) {
        Ok(id)
    } else {
        Err(bad_request(
            "invalid X-Device-Id: must be alphanumeric + dashes, ≤128 chars",
        ))
    }
}
/// Builds the 402 response sent when a device has no credits left.
///
/// The JSON body carries the error text, the supplied `balance`, and the
/// endpoint the client should use to check out.
fn payment_required(balance: i32) -> (StatusCode, Json<Value>) {
    let body = json!({
        "error": "out of credits",
        "balance": balance,
        "checkout_endpoint": "/api/checkout",
    });
    (StatusCode::PAYMENT_REQUIRED, Json(body))
}
/// Handler: creates a job for the URL in the request body.
///
/// Requires a valid device id header and a successful credit reservation.
/// The job is stored as `Queued`, the pipeline is spawned in the background,
/// and the response is 202 with the new `job_id`.
pub async fn create_job(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(req): Json<JobRequest>,
) -> (StatusCode, Json<Value>) {
    let device_id = match require_device_id(&headers) {
        Err(rejection) => return rejection,
        Ok(id) => id,
    };
    if credits::reserve(&state.credits, &device_id).await.is_err() {
        return payment_required(0);
    }

    let job_id = Uuid::new_v4();
    let created = now_unix();
    let cancel = CancellationToken::new();
    let queued = Job {
        id: job_id,
        status: JobStatus::Queued,
        url: req.url.clone(),
        device_id: device_id.clone(),
        created_at: created,
        updated_at: created,
        result: None,
        error: None,
        cancel: cancel.clone(),
    };
    // The guard is a temporary, so the lock is released at the end of this statement.
    state.jobs.lock().await.insert(job_id, queued);
    info!("Created job {} for url {}", job_id, req.url);

    // The background task owns its own handles to the shared stores.
    let jobs = state.jobs.clone();
    let engine = state.engine.clone();
    let credit_store = state.credits.clone();
    tokio::spawn(run_with_cancel(
        job_id,
        req,
        engine,
        jobs,
        credit_store,
        device_id,
        cancel,
    ));

    let body = json!({ "job_id": job_id });
    (StatusCode::ACCEPTED, Json(body))
}
/// Handler: reports the credit balance for the calling device.
///
/// Responds 200 with `{"balance": ...}`, or with the rejection from
/// `require_device_id` when the device id header is missing or invalid.
pub async fn get_balance(
    State(state): State<AppState>,
    headers: HeaderMap,
) -> (StatusCode, Json<Value>) {
    match require_device_id(&headers) {
        Err(rejection) => rejection,
        Ok(device_id) => {
            let bal = credits::balance(&state.credits, &device_id).await;
            let body = json!({ "balance": bal });
            (StatusCode::OK, Json(body))
        }
    }
}
pub async fn get_job(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<Json<Job>, StatusCode> {
let store = state.jobs.lock().await;
store
.get(&id)
.cloned()
.map(Json)
.ok_or(StatusCode::NOT_FOUND)
}
/// Handler: signals cancellation for the job with the given id.
///
/// Triggers the job's cancellation token and responds with `ok: true` plus
/// the job status as read at the time of the call. Responds 404 when the id
/// is not in the store.
pub async fn cancel_job(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> Result<Json<Value>, StatusCode> {
    let jobs = state.jobs.lock().await;
    let job = match jobs.get(&id) {
        Some(found) => found,
        None => return Err(StatusCode::NOT_FOUND),
    };

    job.cancel.cancel();
    info!("Cancel signalled for job {} (current status: {:?})", id, job.status);

    let body = json!({ "ok": true, "status": job.status });
    Ok(Json(body))
}
pub async fn upload_job(
State(state): State<AppState>,
headers: HeaderMap,
mut multipart: Multipart,
) -> (StatusCode, Json<Value>) {
let device_id = match require_device_id(&headers) {
Ok(id) => id,
Err(e) => return e,
};
if credits::reserve(&state.credits, &device_id).await.is_err() {
return payment_required(0);
}
let cancel = CancellationToken::new();
let mut saved_path: Option<PathBuf> = None;
let mut original_filename: Option<String> = None;
let mut model_str: Option<String> = None;
let mut language: Option<String> = None;
loop {
let field = match multipart.next_field().await {
Ok(Some(f)) => f,
Ok(None) => break,
Err(e) => {
return bad_request(&format!("multipart error: {}", e));
}
};
let name = field.name().unwrap_or("").to_string();
match name.as_str() {
"file" => {
let raw_name = field
.file_name()
.unwrap_or("upload.bin")
.to_string();
let safe_name = sanitize_filename(&raw_name);
let dir = std::env::temp_dir()
.join("transcriber-uploads")
.join(Uuid::new_v4().to_string());
if let Err(e) = tokio::fs::create_dir_all(&dir).await {
return server_error(&format!("mkdir failed: {}", e));
}
let path = dir.join(&safe_name);
let mut file = match tokio::fs::File::create(&path).await {
Ok(f) => f,
Err(e) => return server_error(&format!("file create: {}", e)),
};
let mut field = field;
loop {
match field.chunk().await {
Ok(Some(chunk)) => {
if let Err(e) = file.write_all(&chunk).await {
return server_error(&format!("write: {}", e));
}
}
Ok(None) => break,
Err(e) => {
return bad_request(&format!("read chunk: {}", e));
}
}
}
if let Err(e) = file.flush().await {
return server_error(&format!("flush: {}", e));
}
original_filename = Some(raw_name);
saved_path = Some(path);
}
"model" => model_str = field.text().await.ok(),
"language" => language = field.text().await.ok(),
_ => {
let _ = field.bytes().await;
}
}
}
let path = match saved_path {
Some(p) => p,
None => return bad_request("missing 'file' field"),
};
let url = path.to_string_lossy().to_string();
let job_id = Uuid::new_v4();
let now = now_unix();
let job = Job {
id: job_id,
status: JobStatus::Queued,
url: url.clone(),
device_id: device_id.clone(),
created_at: now,
updated_at: now,
result: None,
error: None,
cancel: cancel.clone(),
};
{
let mut store = state.jobs.lock().await;
store.insert(job_id, job);
}
info!(
"Created upload job {} for file {} ({})",
job_id,
original_filename.as_deref().unwrap_or("?"),
url
);
let req = JobRequest {
url,
model: model_str,
language,
};
let store = state.jobs.clone();
let engine = state.engine.clone();
let credit_store = state.credits.clone();
tokio::spawn(async move {
run_with_cancel(job_id, req, engine, store, credit_store, device_id, cancel).await
});
(StatusCode::ACCEPTED, Json(json!({ "job_id": job_id })))
}
/// Turns a client-supplied file name into a name that is safe to use as a
/// single path component.
///
/// Path separators, characters reserved on common file systems, and NUL are
/// replaced with `_`, and the result is limited to 200 characters. The name
/// `upload.bin` is returned instead when the result would be empty or
/// whitespace-only, or would be `.` or `..` (ignoring surrounding
/// whitespace). Those two names refer to directories, so joining them onto
/// the upload directory would never yield a creatable file.
fn sanitize_filename(name: &str) -> String {
    const FALLBACK: &str = "upload.bin";
    const MAX_CHARS: usize = 200;

    // Truncate before the emptiness check so a long run of leading
    // whitespace cannot survive as an all-blank file name.
    let cleaned: String = name
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' | '\0' => '_',
            other => other,
        })
        .take(MAX_CHARS)
        .collect();

    if matches!(cleaned.trim(), "" | "." | "..") {
        FALLBACK.to_string()
    } else {
        cleaned
    }
}
fn bad_request(msg: &str) -> (StatusCode, Json<Value>) {
(StatusCode::BAD_REQUEST, Json(json!({ "error": msg })))
}
fn server_error(msg: &str) -> (StatusCode, Json<Value>) {
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": msg })),
)
}
async fn run_with_cancel(
job_id: Uuid,
req: JobRequest,
engine: Arc<Mutex<TranscriberEngine>>,
store: JobStore,
credit_store: CreditStore,
device_id: String,
cancel: CancellationToken,
) {
tokio::select! {
_ = cancel.cancelled() => {
info!("Job {} cancelled by client", job_id);
mark_cancelled(&store, job_id).await;
credits::refund(&credit_store, &device_id).await;
}
_ = run_pipeline(job_id, req, engine, store.clone(), credit_store.clone(), device_id.clone()) => {
}
}
}
/// Executes one job: transcription, then summarization, then storing the result.
///
/// Status updates are written to the job store as the job advances. If
/// either step fails, the error is logged, the job is marked failed and the
/// credit is refunded. On success the job is marked `Complete` with its
/// result attached.
async fn run_pipeline(
    job_id: Uuid,
    req: JobRequest,
    engine: Arc<Mutex<TranscriberEngine>>,
    store: JobStore,
    credit_store: CreditStore,
    device_id: String,
) {
    let model = parse_model(req.model.as_deref());
    let options = TranscriptionOptions {
        url: req.url.clone(),
        output_dir: get_default_output_dir().to_string_lossy().to_string(),
        model,
        language: req.language.clone(),
    };

    update_status(&store, job_id, JobStatus::Downloading).await;
    update_status(&store, job_id, JobStatus::Transcribing).await;

    // The engine lock is held only for the duration of the transcribe call.
    let outcome = {
        let guard = engine.lock().await;
        guard.transcribe(options).await
    };
    let transcription = match outcome {
        Err(e) => {
            error!("Transcription failed for job {}: {:#}", job_id, e);
            mark_failed(&store, job_id, format!("{:#}", e)).await;
            credits::refund(&credit_store, &device_id).await;
            return;
        }
        Ok(done) => done,
    };

    update_status(&store, job_id, JobStatus::Summarizing).await;
    let summary =
        summarize_and_diagram(&transcription.transcript, &transcription.metadata).await;
    let llm = match summary {
        Err(e) => {
            error!("LLM step failed for job {}: {:#}", job_id, e);
            mark_failed(&store, job_id, format!("{:#}", e)).await;
            credits::refund(&credit_store, &device_id).await;
            return;
        }
        Ok(output) => output,
    };

    let result = JobResult {
        transcript: transcription.transcript.clone(),
        segments: transcription.segments.clone(),
        metadata: transcription.metadata.clone(),
        summary_md: llm.summary_md,
        mermaid_src: llm.mermaid_src,
        key_points: llm.key_points,
        model_used: transcription.model_used.as_str().to_string(),
    };

    let mut jobs = store.lock().await;
    if let Some(job) = jobs.get_mut(&job_id) {
        job.status = JobStatus::Complete;
        job.result = Some(result);
        job.updated_at = now_unix();
    }
    // Release the store before logging, as the original scoped block did.
    drop(jobs);

    info!("Job {} complete", job_id);
}
/// Sets the status of a job and refreshes its `updated_at` timestamp.
///
/// Does nothing when the job id is not in the store.
async fn update_status(store: &JobStore, job_id: Uuid, status: JobStatus) {
    let mut jobs = store.lock().await;
    let job = match jobs.get_mut(&job_id) {
        Some(job) => job,
        None => return,
    };
    job.status = status;
    job.updated_at = now_unix();
}
/// Marks a job as failed and records the error message.
///
/// Does nothing when the job id is not in the store, or when the job is
/// already `Complete`, `Failed` or `Cancelled`.
async fn mark_failed(store: &JobStore, job_id: Uuid, error: String) {
    let mut jobs = store.lock().await;
    let job = match jobs.get_mut(&job_id) {
        Some(job) => job,
        None => return,
    };

    let already_terminal = matches!(
        job.status,
        JobStatus::Complete | JobStatus::Failed | JobStatus::Cancelled
    );
    if already_terminal {
        return;
    }

    job.status = JobStatus::Failed;
    job.error = Some(error);
    job.updated_at = now_unix();
}
/// Marks a job as cancelled.
///
/// Does nothing when the job id is not in the store, or when the job is
/// already `Complete`, `Failed` or `Cancelled`.
async fn mark_cancelled(store: &JobStore, job_id: Uuid) {
    let mut jobs = store.lock().await;
    let job = match jobs.get_mut(&job_id) {
        Some(job) => job,
        None => return,
    };

    let already_terminal = matches!(
        job.status,
        JobStatus::Complete | JobStatus::Failed | JobStatus::Cancelled
    );
    if already_terminal {
        return;
    }

    job.status = JobStatus::Cancelled;
    job.updated_at = now_unix();
}
/// Returns the current time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}