use crate::calendar::CalendarEvent;
use crate::config::Config;
use crate::error::{MinutesError, TranscribeError};
use crate::markdown::{ContentType, OutputStatus};
use crate::pid::{self, CaptureMode, PidGuard};
use crate::pipeline::{self, BackgroundPipelineContext, PipelineStage};
use chrono::{DateTime, Local};
use serde_json::json;
use std::fs;
use std::io::ErrorKind;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
pub const MAX_AUTO_RETRIES: u32 = 2;
static JOB_COUNTER: AtomicU64 = AtomicU64::new(0);
const MIGRATION_MARKER: &str = ".archive-initialized-v1";
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum JobState {
Queued,
Transcribing,
TranscriptOnly,
Diarizing,
Summarizing,
Saving,
NeedsReview,
Complete,
Failed,
}
impl JobState {
pub fn is_terminal(self) -> bool {
matches!(self, Self::NeedsReview | Self::Complete | Self::Failed)
}
pub fn default_stage(self) -> Option<String> {
match self {
Self::Queued => Some("Queued for processing".into()),
Self::Transcribing => Some("Transcribing meeting".into()),
Self::TranscriptOnly => Some("Transcript ready, enriching artifact".into()),
Self::Diarizing => Some("Separating speakers".into()),
Self::Summarizing => Some("Generating summary".into()),
Self::Saving => Some("Saving artifact".into()),
Self::NeedsReview => Some("Needs review — raw capture preserved".into()),
Self::Complete => None,
Self::Failed => Some("Processing failed".into()),
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProcessingJob {
pub id: String,
pub mode: CaptureMode,
pub content_type: ContentType,
pub title: Option<String>,
pub audio_path: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_path: Option<String>,
pub state: JobState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub stage: Option<String>,
pub created_at: DateTime<Local>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at: Option<DateTime<Local>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub finished_at: Option<DateTime<Local>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub notice_dismissed_at: Option<DateTime<Local>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub recording_started_at: Option<DateTime<Local>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub recording_finished_at: Option<DateTime<Local>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_session_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user_notes: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pre_context: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub consent: Option<crate::markdown::ConsentBasis>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub consent_notice: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub calendar_event: Option<CalendarEvent>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub template_slug: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub recording_health: Option<crate::markdown::RecordingHealth>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub word_count: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub owner_pid: Option<u32>,
#[serde(default)]
pub retry_count: u32,
}
fn next_job_id() -> String {
let ts = Local::now().format("%Y%m%d%H%M%S%3f");
let pid = std::process::id();
let counter = JOB_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("job-{}-{}-{}", ts, pid, counter)
}
fn describe_panic_payload(payload: &(dyn std::any::Any + Send)) -> String {
if let Some(message) = payload.downcast_ref::<&str>() {
(*message).to_string()
} else if let Some(message) = payload.downcast_ref::<String>() {
message.clone()
} else {
"panic payload was not a string".to_string()
}
}
#[allow(clippy::too_many_arguments)]
pub fn queue_live_capture(
mode: CaptureMode,
title: Option<String>,
current_wav: &Path,
user_notes: Option<String>,
pre_context: Option<String>,
recording_started_at: Option<DateTime<Local>>,
recording_finished_at: Option<DateTime<Local>>,
context_session_id: Option<String>,
calendar_event: Option<CalendarEvent>,
template_slug: Option<String>,
) -> std::io::Result<ProcessingJob> {
queue_live_capture_with_recording_health(
mode,
title,
current_wav,
user_notes,
pre_context,
recording_started_at,
recording_finished_at,
context_session_id,
calendar_event,
template_slug,
None,
None,
None,
)
}
#[allow(clippy::too_many_arguments)]
pub fn queue_live_capture_with_recording_health(
mode: CaptureMode,
title: Option<String>,
current_wav: &Path,
user_notes: Option<String>,
pre_context: Option<String>,
recording_started_at: Option<DateTime<Local>>,
recording_finished_at: Option<DateTime<Local>>,
context_session_id: Option<String>,
calendar_event: Option<CalendarEvent>,
template_slug: Option<String>,
consent: Option<crate::markdown::ConsentBasis>,
consent_notice: Option<String>,
recording_health: Option<crate::markdown::RecordingHealth>,
) -> std::io::Result<ProcessingJob> {
let job_id = next_job_id();
let old_screen_dir = crate::screen::screens_dir_for(current_wav);
let audio_path = move_capture_into_job(&job_id, current_wav)?;
let new_screen_dir = crate::screen::screens_dir_for(&audio_path);
let job = ProcessingJob {
id: job_id,
mode,
content_type: mode.content_type(),
title,
audio_path: audio_path.display().to_string(),
output_path: None,
state: JobState::Queued,
stage: JobState::Queued.default_stage(),
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at,
recording_finished_at,
context_session_id,
user_notes,
pre_context,
consent,
consent_notice,
calendar_event,
template_slug,
recording_health,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
if let Err(error) = write_job(&job) {
if audio_path.exists() {
fs::rename(&audio_path, current_wav).ok();
move_stems_with_audio(&audio_path, current_wav).ok();
}
if new_screen_dir.exists() {
if old_screen_dir.exists() {
fs::remove_dir_all(&old_screen_dir).ok();
}
fs::rename(&new_screen_dir, &old_screen_dir).ok();
}
return Err(error);
}
maybe_mark_context_session_processing(&job, &audio_path);
Ok(job)
}
pub fn jobs_dir() -> PathBuf {
Config::minutes_dir().join("jobs")
}
pub fn archive_dir() -> PathBuf {
jobs_dir().join("archive")
}
pub fn worker_pid_path() -> PathBuf {
Config::minutes_dir().join("processing-worker.pid")
}
pub fn job_path(job_id: &str) -> PathBuf {
jobs_dir().join(format!("{}.json", job_id))
}
pub fn job_archive_path(job_id: &str) -> PathBuf {
archive_dir().join(format!("{}.json", job_id))
}
pub fn job_capture_path(job_id: &str) -> PathBuf {
jobs_dir().join(format!("{}.wav", job_id))
}
fn job_capture_path_for_source(job_id: &str, source: &Path) -> PathBuf {
let ext = source
.extension()
.and_then(|value| value.to_str())
.filter(|value| !value.is_empty())
.unwrap_or("wav");
jobs_dir().join(format!("{}.{}", job_id, ext))
}
pub fn create_worker_guard() -> Result<PidGuard, crate::error::PidError> {
pid::create_pid_guard(&worker_pid_path())
}
pub fn current_worker_pid() -> Option<u32> {
pid::check_pid_file(&worker_pid_path()).ok().flatten()
}
pub fn worker_active() -> bool {
pid::inspect_pid_file(&worker_pid_path()).is_active()
}
pub fn move_capture_into_job(job_id: &str, current_wav: &Path) -> std::io::Result<PathBuf> {
let dest = job_capture_path_for_source(job_id, current_wav);
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)?;
}
fs::rename(current_wav, &dest)?;
if let Err(error) = move_stems_with_audio(current_wav, &dest) {
fs::rename(&dest, current_wav).ok();
move_stems_with_audio(&dest, current_wav).ok();
return Err(error);
}
let old_screen_dir = crate::screen::screens_dir_for(current_wav);
if old_screen_dir.exists() {
let new_screen_dir = crate::screen::screens_dir_for(&dest);
if let Some(parent) = new_screen_dir.parent() {
fs::create_dir_all(parent)?;
}
if new_screen_dir.exists() {
fs::remove_dir_all(&new_screen_dir).ok();
}
fs::rename(old_screen_dir, new_screen_dir)?;
}
Ok(dest)
}
fn move_stems_with_audio(src_audio: &Path, dest_audio: &Path) -> std::io::Result<()> {
let Some(src_stems) = crate::capture::stem_paths_for(src_audio) else {
return Ok(());
};
let Some(dest_stems) = crate::capture::stem_paths_for(dest_audio) else {
return Ok(());
};
if src_stems.voice.exists() {
fs::rename(&src_stems.voice, &dest_stems.voice)?;
}
if src_stems.system.exists() {
fs::rename(&src_stems.system, &dest_stems.system)?;
}
Ok(())
}
fn preserve_sidecar_stems(audio_src: &Path, audio_dest: &Path) {
let Some(src_stems) = crate::capture::stem_paths_for(audio_src) else {
return;
};
let Some(dest_stems) = crate::capture::stem_paths_for(audio_dest) else {
return;
};
for (src, dest) in [
(src_stems.voice, dest_stems.voice),
(src_stems.system, dest_stems.system),
] {
if !src.exists() {
continue;
}
if let Err(rename_error) = fs::rename(&src, &dest) {
if let Err(copy_error) = fs::copy(&src, &dest) {
tracing::warn!(
src = %src.display(),
dest = %dest.display(),
rename_error = %rename_error,
copy_error = %copy_error,
"failed to preserve capture stem alongside output"
);
continue;
}
fs::remove_file(&src).ok();
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
fs::set_permissions(&dest, fs::Permissions::from_mode(0o600)).ok();
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn enqueue_capture_job(
mode: CaptureMode,
title: Option<String>,
audio_path: PathBuf,
user_notes: Option<String>,
pre_context: Option<String>,
recording_started_at: Option<DateTime<Local>>,
recording_finished_at: Option<DateTime<Local>>,
context_session_id: Option<String>,
calendar_event: Option<CalendarEvent>,
template_slug: Option<String>,
) -> std::io::Result<ProcessingJob> {
let job = ProcessingJob {
id: next_job_id(),
mode,
content_type: mode.content_type(),
title,
audio_path: audio_path.display().to_string(),
output_path: None,
state: JobState::Queued,
stage: JobState::Queued.default_stage(),
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at,
recording_finished_at,
context_session_id,
user_notes,
pre_context,
consent: None,
consent_notice: None,
calendar_event,
template_slug,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
write_job(&job)?;
maybe_mark_context_session_processing(&job, Path::new(&job.audio_path));
Ok(job)
}
fn maybe_mark_context_session_processing(job: &ProcessingJob, audio_path: &Path) {
let Some(session_id) = job.context_session_id.as_deref() else {
return;
};
if let Err(error) = crate::context_store::mark_capture_session_processing(
session_id,
&job.id,
audio_path,
job.recording_finished_at,
) {
tracing::warn!(
session_id,
job_id = %job.id,
error = %error,
"failed to mark context session as processing"
);
}
}
fn maybe_mark_context_session_complete(job: &ProcessingJob, content_type: ContentType) {
let Some(session_id) = job.context_session_id.as_deref() else {
return;
};
let Some(output_path) = job.output_path.as_deref() else {
return;
};
let metadata = json!({
"job_id": job.id,
"job_state": match job.state {
JobState::NeedsReview => "needs-review",
JobState::Complete => "complete",
JobState::Failed => "failed",
JobState::Queued => "queued",
JobState::Transcribing => "transcribing",
JobState::TranscriptOnly => "transcript-only",
JobState::Diarizing => "diarizing",
JobState::Summarizing => "summarizing",
JobState::Saving => "saving",
},
});
if let Err(error) = crate::context_store::mark_capture_session_complete(
session_id,
Path::new(output_path),
Some(Path::new(&job.audio_path)),
content_type,
job.finished_at,
metadata,
) {
tracing::warn!(
session_id,
job_id = %job.id,
error = %error,
"failed to finalize context session"
);
}
}
fn maybe_mark_context_session_failed(
job: &ProcessingJob,
diagnostic: &str,
preserved_path: Option<&Path>,
) {
let Some(session_id) = job.context_session_id.as_deref() else {
return;
};
if let Err(error) = crate::context_store::mark_capture_session_failed(
session_id,
job.finished_at.or(job.recording_finished_at),
diagnostic,
preserved_path,
) {
tracing::warn!(
session_id,
job_id = %job.id,
error = %error,
"failed to mark context session as failed"
);
}
}
pub fn write_job(job: &ProcessingJob) -> std::io::Result<()> {
write_job_to(job, &job_path(&job.id))
}
fn write_job_to(job: &ProcessingJob, dest: &Path) -> std::io::Result<()> {
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)?;
}
let tmp = dest.with_extension("json.tmp");
let json = serde_json::to_string_pretty(job)?;
fs::write(&tmp, json)?;
fs::rename(tmp, dest)?;
Ok(())
}
pub fn load_job(job_id: &str) -> Option<ProcessingJob> {
load_job_with_source(job_id).map(|(job, _)| job)
}
fn load_job_with_source(job_id: &str) -> Option<(ProcessingJob, PathBuf)> {
let active = job_path(job_id);
if let Ok(text) = fs::read_to_string(&active) {
if let Ok(job) = serde_json::from_str::<ProcessingJob>(&text) {
return Some((job, active));
}
}
let archive = job_archive_path(job_id);
if let Ok(text) = fs::read_to_string(&archive) {
if let Ok(job) = serde_json::from_str::<ProcessingJob>(&text) {
return Some((job, archive));
}
}
None
}
fn list_jobs_raw() -> Vec<ProcessingJob> {
ensure_archive_initialized();
let mut jobs = Vec::new();
let dir = jobs_dir();
if !dir.exists() {
return jobs;
}
for entry in fs::read_dir(dir).into_iter().flatten().flatten() {
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
if let Ok(text) = fs::read_to_string(&path) {
if let Ok(job) = serde_json::from_str::<ProcessingJob>(&text) {
jobs.push(job);
}
}
}
jobs.sort_by_key(|job| job.created_at);
jobs
}
pub fn latest_terminal_job() -> Option<ProcessingJob> {
let mut latest: Option<ProcessingJob> = None;
for job in list_archive_jobs() {
if !job.state.is_terminal() {
continue;
}
let candidate_key = job.finished_at.unwrap_or(job.created_at);
match latest.as_ref() {
None => latest = Some(job),
Some(current) => {
let current_key = current.finished_at.unwrap_or(current.created_at);
if candidate_key > current_key {
latest = Some(job);
}
}
}
}
latest
}
fn list_archive_jobs() -> Vec<ProcessingJob> {
ensure_archive_initialized();
let dir = archive_dir();
if !dir.exists() {
return Vec::new();
}
let mut jobs = Vec::new();
for entry in fs::read_dir(dir).into_iter().flatten().flatten() {
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
if let Ok(text) = fs::read_to_string(&path) {
if let Ok(job) = serde_json::from_str::<ProcessingJob>(&text) {
jobs.push(job);
}
}
}
jobs
}
fn ensure_archive_initialized() {
let root = jobs_dir();
if let Err(error) = fs::create_dir_all(&root) {
tracing::warn!(error = %error, "failed to initialize jobs directory");
return;
}
let marker = root.join(MIGRATION_MARKER);
if marker.exists() {
return;
}
if let Err(error) = migrate_terminal_jobs_to_archive() {
tracing::warn!(error = %error, "jobs archive migration failed");
return;
}
if let Err(error) = fs::write(&marker, b"v1") {
tracing::warn!(
error = %error,
"failed to write archive migration marker; sweep will retry next call"
);
}
}
fn migrate_terminal_jobs_to_archive() -> std::io::Result<()> {
let active = jobs_dir();
if !active.exists() {
return Ok(());
}
fs::create_dir_all(archive_dir())?;
for entry in fs::read_dir(&active).into_iter().flatten().flatten() {
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
let Ok(text) = fs::read_to_string(&path) else {
continue;
};
let Ok(job) = serde_json::from_str::<ProcessingJob>(&text) else {
continue;
};
if !job.state.is_terminal() {
continue;
}
let dest = job_archive_path(&job.id);
if let Err(error) = move_to_archive(&path, &dest) {
tracing::warn!(
job_id = %job.id,
error = %error,
"migrate: failed to move job to archive"
);
}
}
Ok(())
}
fn move_to_archive(source: &Path, dest: &Path) -> std::io::Result<()> {
if let Some(parent) = dest.parent() {
fs::create_dir_all(parent)?;
}
match fs::hard_link(source, dest) {
Ok(()) => {
if let Err(error) = fs::remove_file(source) {
if error.kind() != ErrorKind::NotFound {
tracing::warn!(
source = %source.display(),
error = %error,
"move_to_archive: failed to remove source after link"
);
}
}
Ok(())
}
Err(error) if error.kind() == ErrorKind::AlreadyExists => {
tracing::debug!(
source = %source.display(),
dest = %dest.display(),
"archive already populated; dropping active duplicate"
);
if let Err(remove_error) = fs::remove_file(source) {
if remove_error.kind() != ErrorKind::NotFound {
tracing::warn!(
source = %source.display(),
error = %remove_error,
"move_to_archive: failed to drop duplicate"
);
}
}
Ok(())
}
Err(error) if error.kind() == ErrorKind::NotFound => {
if dest.exists() {
Ok(())
} else {
Err(error)
}
}
Err(error) => Err(error),
}
}
pub fn list_jobs() -> Vec<ProcessingJob> {
let snapshot = list_jobs_raw();
let recovery_candidates: Vec<String> = snapshot
.iter()
.filter(|job| {
!job.state.is_terminal()
&& job.owner_pid.is_some()
&& !job.owner_pid.map(pid::is_process_alive).unwrap_or(false)
})
.map(|job| job.id.clone())
.collect();
for id in &recovery_candidates {
let _ = update_job_state(id, |current| {
if current.state.is_terminal()
|| current.owner_pid.is_none()
|| current
.owner_pid
.map(pid::is_process_alive)
.unwrap_or(false)
{
return;
}
current.retry_count = current.retry_count.saturating_add(1);
current.owner_pid = None;
current.started_at = None;
if current.retry_count > MAX_AUTO_RETRIES {
current.state = JobState::Failed;
current.stage = JobState::Failed.default_stage();
current.finished_at = Some(Local::now());
if current.error.is_none() {
current.error = Some(format!(
"transcription worker crashed {} times without producing output; left as Failed for manual retry",
current.retry_count
));
}
} else {
current.state = JobState::Queued;
current.stage = JobState::Queued.default_stage();
}
});
}
if recovery_candidates.is_empty() {
snapshot
} else {
list_jobs_raw()
}
}
pub fn display_jobs(limit: Option<usize>, include_terminal: bool) -> Vec<ProcessingJob> {
let mut jobs = list_jobs();
if include_terminal {
jobs.extend(list_archive_jobs());
}
jobs.sort_by(|a, b| {
job_sort_bucket(a)
.cmp(&job_sort_bucket(b))
.then_with(|| b.created_at.cmp(&a.created_at))
});
if !include_terminal {
jobs.retain(|job| !job.state.is_terminal());
}
if let Some(limit) = limit {
jobs.truncate(limit);
}
jobs
}
pub fn active_jobs() -> Vec<ProcessingJob> {
display_jobs(None, false)
}
pub fn active_job_count() -> usize {
active_jobs().len()
}
pub fn requeue_job(job_id: &str) -> std::io::Result<Option<ProcessingJob>> {
let Some(job) = load_job(job_id) else {
return Ok(None);
};
if !matches!(job.state, JobState::Failed | JobState::NeedsReview) {
return Ok(None);
}
let audio_path = PathBuf::from(&job.audio_path);
if !audio_path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("audio file missing for job {}", job_id),
));
}
let Some(requeued) = update_job_state(job_id, |job| {
if !matches!(job.state, JobState::Failed | JobState::NeedsReview) {
return;
}
job.state = JobState::Queued;
job.stage = JobState::Queued.default_stage();
job.started_at = None;
job.finished_at = None;
job.notice_dismissed_at = None;
job.error = None;
job.owner_pid = None;
job.retry_count = 0;
})?
else {
return Ok(None);
};
if requeued.state != JobState::Queued {
return Ok(None);
}
sync_processing_status();
Ok(Some(requeued))
}
pub fn dismiss_job_notice(job_id: &str) -> std::io::Result<Option<ProcessingJob>> {
update_job_state(job_id, |job| {
if matches!(job.state, JobState::Failed | JobState::NeedsReview) {
job.notice_dismissed_at = Some(Local::now());
}
})
}
pub fn processing_summary() -> Option<ProcessingJob> {
active_jobs().into_iter().next()
}
fn job_sort_bucket(job: &ProcessingJob) -> u8 {
if job.state.is_terminal() {
2
} else if job.state == JobState::Queued {
1
} else {
0
}
}
pub fn next_pending_job() -> Option<ProcessingJob> {
list_jobs()
.into_iter()
.find(|job| job.state == JobState::Queued)
}
pub fn update_job_state<F>(job_id: &str, update: F) -> std::io::Result<Option<ProcessingJob>>
where
F: FnOnce(&mut ProcessingJob),
{
let Some((mut job, source)) = load_job_with_source(job_id) else {
return Ok(None);
};
update(&mut job);
let dest = if job.state.is_terminal() {
job_archive_path(&job.id)
} else {
job_path(&job.id)
};
if source == dest {
write_job_to(&job, &dest)?;
return Ok(Some(job));
}
let source_in_active = source == job_path(&job.id);
if !source_in_active {
write_job_to(&job, &dest)?;
if let Err(e) = fs::remove_file(&source) {
if e.kind() != ErrorKind::NotFound {
tracing::warn!(
job_id = %job.id,
error = %e,
"update_job_state: failed to remove archive copy after requeue"
);
}
}
return Ok(Some(job));
}
write_job_to(&job, &source)?;
if let Err(error) = move_to_archive(&source, &dest) {
tracing::warn!(
job_id = %job.id,
error = %error,
"update_job_state: move_to_archive failed (job stays in active dir but is terminal-state)"
);
}
Ok(Some(job))
}
pub fn remove_capture_artifacts(job: &ProcessingJob) {
let audio_path = PathBuf::from(&job.audio_path);
if audio_path.exists() {
fs::remove_file(&audio_path).ok();
}
let screens_dir = crate::screen::screens_dir_for(&audio_path);
if screens_dir.exists() {
fs::remove_dir_all(screens_dir).ok();
}
}
fn terminal_state_for_artifact(artifact: &pipeline::TranscriptArtifact) -> JobState {
if artifact.frontmatter.status == Some(OutputStatus::NoSpeech) {
JobState::NeedsReview
} else {
JobState::Complete
}
}
fn preserve_audio_alongside_output(job: &ProcessingJob) {
let Some(ref output_path) = job.output_path else {
return;
};
let output = PathBuf::from(output_path);
let audio_src = PathBuf::from(&job.audio_path);
if !audio_src.exists() {
return;
}
let audio_dest = match audio_src.extension().filter(|ext| !ext.is_empty()) {
Some(ext) => output.with_extension(ext),
None => output.with_extension("wav"),
};
if let Err(e) = fs::rename(&audio_src, &audio_dest) {
if let Err(e2) = fs::copy(&audio_src, &audio_dest) {
tracing::warn!(
src = %audio_src.display(),
dest = %audio_dest.display(),
error = %e2,
"failed to preserve audio alongside output (rename: {}, copy: {})", e, e2
);
return;
}
fs::remove_file(&audio_src).ok();
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
fs::set_permissions(&audio_dest, fs::Permissions::from_mode(0o600)).ok();
}
let screens_dir = crate::screen::screens_dir_for(&audio_src);
if screens_dir.exists() {
fs::remove_dir_all(screens_dir).ok();
}
let dest_str = audio_dest.display().to_string();
update_job_state(&job.id, |j| {
j.audio_path = dest_str;
})
.ok();
preserve_sidecar_stems(&audio_src, &audio_dest);
tracing::info!(
path = %audio_dest.display(),
"preserved audio alongside transcript"
);
}
fn sync_processing_status() {
if let Some(job) = processing_summary() {
let title = job.title.as_deref().or(job.output_path.as_deref());
let _ = pid::set_processing_status(
job.stage.as_deref(),
Some(job.mode),
title,
Some(&job.id),
active_job_count(),
);
} else {
let _ = pid::clear_processing_status();
}
}
fn recording_duration(job: &ProcessingJob) -> String {
match (job.recording_started_at, job.recording_finished_at) {
(Some(start), Some(end)) => {
let secs = end.signed_duration_since(start).num_seconds().max(0);
let mins = secs / 60;
let rem = secs % 60;
if mins > 0 {
format!("{}m {}s", mins, rem)
} else {
format!("{}s", rem)
}
}
_ => "unknown".into(),
}
}
fn refresh_qmd_collection(config: &Config) {
let Some(collection) = config.search.qmd_collection.as_ref() else {
return;
};
let status = std::process::Command::new("qmd")
.args(["update", "-c", collection])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status();
if let Err(error) = status {
tracing::debug!(error = %error, collection = %collection, "qmd update skipped");
}
}
fn job_context(job: &ProcessingJob) -> BackgroundPipelineContext {
let template = job.template_slug.as_deref().and_then(|slug| {
match crate::template::TemplateResolver::new().resolve(slug) {
Ok(t) => Some(t),
Err(error) => {
tracing::warn!(slug = %slug, error = %error, "queued template could not be resolved; processing without template");
None
}
}
});
BackgroundPipelineContext {
sidecar: None,
user_notes: job.user_notes.clone(),
pre_context: job.pre_context.clone(),
consent: job.consent,
consent_notice: job.consent_notice.clone(),
calendar_event: job.calendar_event.clone(),
recorded_at: job.recording_started_at.or(job.recording_finished_at),
requested_title: job.title.clone(),
recording_health: job.recording_health.clone(),
template,
}
}
fn stage_state(stage: PipelineStage) -> JobState {
match stage {
PipelineStage::Transcribing => JobState::Transcribing,
PipelineStage::Diarizing => JobState::Diarizing,
PipelineStage::Summarizing => JobState::Summarizing,
PipelineStage::Saving => JobState::Saving,
}
}
pub fn process_pending_jobs<F>(config: &Config, mut on_job_update: F) -> Result<(), MinutesError>
where
F: FnMut(&ProcessingJob),
{
let _guard = create_worker_guard()?;
while let Some(job) = next_pending_job() {
let owner_pid = std::process::id();
let Some(mut job) = update_job_state(&job.id, |job| {
job.state = JobState::Transcribing;
job.stage = JobState::Transcribing.default_stage();
job.owner_pid = Some(owner_pid);
job.started_at.get_or_insert_with(Local::now);
job.error = None;
})?
else {
continue;
};
sync_processing_status();
on_job_update(&job);
let audio_path = PathBuf::from(&job.audio_path);
let context = job_context(&job);
let transcribe_outcome = catch_unwind(AssertUnwindSafe(|| {
pipeline::transcribe_to_artifact(
&audio_path,
job.content_type,
job.title.as_deref(),
config,
&context,
job.output_path.as_deref().map(Path::new),
)
}));
let artifact = match transcribe_outcome {
Ok(Ok(artifact)) => artifact,
Ok(Err(error)) => {
let Some(failed_job) = update_job_state(&job.id, |job| {
job.state = JobState::Failed;
job.stage = JobState::Failed.default_stage();
job.finished_at = Some(Local::now());
job.error = Some(error.to_string());
job.owner_pid = None;
})?
else {
sync_processing_status();
continue;
};
maybe_mark_context_session_failed(&failed_job, &error.to_string(), None);
sync_processing_status();
on_job_update(&failed_job);
continue;
}
Err(payload) => {
let message = format!(
"transcription worker panicked: {}",
describe_panic_payload(payload.as_ref())
);
tracing::error!(
job_id = %job.id,
error = %message,
"transcription worker caught panic"
);
let Some(failed_job) = update_job_state(&job.id, |job| {
job.state = JobState::Failed;
job.stage = JobState::Failed.default_stage();
job.finished_at = Some(Local::now());
job.error = Some(message.clone());
job.owner_pid = None;
})?
else {
sync_processing_status();
continue;
};
maybe_mark_context_session_failed(&failed_job, &message, None);
sync_processing_status();
on_job_update(&failed_job);
continue;
}
};
if artifact.frontmatter.status == Some(OutputStatus::NoSpeech) {
let terminal_state = terminal_state_for_artifact(&artifact);
let Some(review_job) = update_job_state(&job.id, |job| {
job.state = terminal_state;
job.stage = terminal_state.default_stage();
job.output_path = Some(artifact.write_result.path.display().to_string());
job.title = Some(artifact.write_result.title.clone());
job.word_count = Some(artifact.write_result.word_count);
job.finished_at = Some(Local::now());
job.owner_pid = None;
job.error = Some(
artifact
.frontmatter
.filter_diagnosis
.clone()
.unwrap_or_else(|| "Transcript requires manual review.".into()),
);
})?
else {
sync_processing_status();
continue;
};
crate::events::append_event(crate::events::audio_processed_event(
&artifact.write_result,
&audio_path.display().to_string(),
));
crate::events::append_event(crate::events::recording_completed_event(
&artifact.write_result,
&recording_duration(&review_job),
));
maybe_mark_context_session_complete(&review_job, artifact.write_result.content_type);
if let Err(error) = crate::graph::rebuild_index(config) {
tracing::warn!(error = %error, "graph index rebuild failed after queued job");
}
refresh_qmd_collection(config);
sync_processing_status();
on_job_update(&review_job);
continue;
}
let Some(updated_job) = update_job_state(&job.id, |job| {
job.state = JobState::TranscriptOnly;
job.stage = JobState::TranscriptOnly.default_stage();
job.output_path = Some(artifact.write_result.path.display().to_string());
job.title = Some(artifact.write_result.title.clone());
job.word_count = Some(artifact.write_result.word_count);
})?
else {
sync_processing_status();
continue;
};
job = updated_job;
sync_processing_status();
on_job_update(&job);
let enrich_outcome = catch_unwind(AssertUnwindSafe(|| {
pipeline::enrich_transcript_artifact(
&audio_path,
&artifact,
config,
&context,
|stage| {
let state = stage_state(stage);
if let Ok(Some(job)) = update_job_state(&job.id, |job| {
job.state = state;
job.stage = state.default_stage();
}) {
sync_processing_status();
on_job_update(&job);
}
},
)
}));
let enrich_result = match enrich_outcome {
Ok(result) => result,
Err(payload) => {
let message = format!(
"enrichment worker panicked: {}",
describe_panic_payload(payload.as_ref())
);
tracing::error!(
job_id = %job.id,
error = %message,
"enrichment worker caught panic"
);
Err(MinutesError::Transcribe(
TranscribeError::TranscriptionFailed(message),
))
}
};
match enrich_result {
Ok(result) => {
let terminal_state = terminal_state_for_artifact(&artifact);
let Some(completed_job) = update_job_state(&job.id, |job| {
job.state = terminal_state;
job.stage = terminal_state.default_stage();
job.output_path = Some(result.path.display().to_string());
job.title = Some(result.title.clone());
job.word_count = Some(result.word_count);
job.finished_at = Some(Local::now());
job.owner_pid = None;
})?
else {
sync_processing_status();
continue;
};
crate::events::append_event(crate::events::audio_processed_event(
&result,
&audio_path.display().to_string(),
));
crate::events::append_event(crate::events::recording_completed_event(
&result,
&recording_duration(&completed_job),
));
if let Err(error) = crate::graph::rebuild_index(config) {
tracing::warn!(error = %error, "graph index rebuild failed after queued job");
}
refresh_qmd_collection(config);
pipeline::run_post_record_hook(config, &result.path);
if completed_job.state == JobState::Complete {
preserve_audio_alongside_output(&completed_job);
}
let final_job = load_job(&completed_job.id).unwrap_or(completed_job);
maybe_mark_context_session_complete(&final_job, result.content_type);
sync_processing_status();
on_job_update(&final_job);
}
Err(error) => {
let Some(failed_job) = update_job_state(&job.id, |job| {
job.state = JobState::Failed;
job.stage = JobState::Failed.default_stage();
job.finished_at = Some(Local::now());
job.error = Some(error.to_string());
job.owner_pid = None;
})?
else {
sync_processing_status();
continue;
};
maybe_mark_context_session_failed(&failed_job, &error.to_string(), None);
sync_processing_status();
on_job_update(&failed_job);
}
}
}
sync_processing_status();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::markdown::{Frontmatter, WriteResult};
fn with_temp_home<T>(f: impl FnOnce(&tempfile::TempDir) -> T) -> T {
let _guard = crate::test_home_env_lock();
let dir = tempfile::tempdir().unwrap();
let original_home = std::env::var_os("HOME");
let original_userprofile = std::env::var_os("USERPROFILE");
std::env::set_var("HOME", dir.path());
std::env::set_var("USERPROFILE", dir.path());
let result = f(&dir);
if let Some(home) = original_home {
std::env::set_var("HOME", home);
} else {
std::env::remove_var("HOME");
}
if let Some(up) = original_userprofile {
std::env::set_var("USERPROFILE", up);
} else {
std::env::remove_var("USERPROFILE");
}
result
}
#[test]
fn queue_live_capture_moves_audio_and_writes_job_file() {
with_temp_home(|_| {
let current_wav = pid::current_wav_path();
if let Some(parent) = current_wav.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(¤t_wav, b"fake-wav").unwrap();
let current_screens = crate::screen::screens_dir_for(¤t_wav);
fs::create_dir_all(¤t_screens).unwrap();
fs::write(current_screens.join("screen-0000-0000s.png"), b"png").unwrap();
let job = queue_live_capture(
CaptureMode::Meeting,
Some("Back to back".into()),
¤t_wav,
Some("note".into()),
Some("context".into()),
Some(Local::now()),
Some(Local::now()),
None,
None,
None,
)
.unwrap();
assert!(!current_wav.exists());
assert!(job_path(&job.id).exists());
assert!(PathBuf::from(&job.audio_path).exists());
assert!(crate::screen::screens_dir_for(Path::new(&job.audio_path)).exists());
});
}
#[test]
fn queue_live_capture_moves_stems_with_audio() {
with_temp_home(|_| {
let current_wav = pid::current_wav_path();
if let Some(parent) = current_wav.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(¤t_wav, b"fake-wav").unwrap();
let stems = crate::capture::stem_paths_for(¤t_wav).unwrap();
fs::write(&stems.voice, b"voice").unwrap();
fs::write(&stems.system, b"system").unwrap();
let job = queue_live_capture(
CaptureMode::Meeting,
Some("Dual source".into()),
¤t_wav,
None,
None,
Some(Local::now()),
Some(Local::now()),
None,
None,
None,
)
.unwrap();
let job_audio = PathBuf::from(&job.audio_path);
let moved_stems = crate::capture::stem_paths_for(&job_audio).unwrap();
assert!(job_audio.exists());
assert!(moved_stems.voice.exists());
assert!(moved_stems.system.exists());
assert!(!stems.voice.exists());
assert!(!stems.system.exists());
});
}
#[test]
fn queue_live_capture_preserves_native_mov_extension_and_stems() {
with_temp_home(|tmp| {
let native_dir = tmp.path().join(".minutes/native-captures");
fs::create_dir_all(&native_dir).unwrap();
let current_mov = native_dir.join("2026-05-19-120148-call.mov");
fs::write(¤t_mov, b"mov-placeholder").unwrap();
let stems = crate::capture::stem_paths_for(¤t_mov).unwrap();
fs::write(&stems.voice, b"voice").unwrap();
fs::write(&stems.system, b"system").unwrap();
let job = queue_live_capture(
CaptureMode::Meeting,
Some("Native call".into()),
¤t_mov,
None,
None,
Some(Local::now()),
Some(Local::now()),
None,
None,
None,
)
.unwrap();
let job_audio = PathBuf::from(&job.audio_path);
assert_eq!(
job_audio.extension().and_then(|ext| ext.to_str()),
Some("mov")
);
assert!(job_audio.exists());
assert!(!current_mov.exists());
let moved_stems = crate::capture::stem_paths_for(&job_audio).unwrap();
assert!(moved_stems.voice.exists());
assert!(moved_stems.system.exists());
assert!(!stems.voice.exists());
assert!(!stems.system.exists());
});
}
#[test]
fn queue_live_capture_persists_recording_health() {
with_temp_home(|_| {
let current_wav = pid::current_wav_path();
if let Some(parent) = current_wav.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(¤t_wav, b"fake-wav").unwrap();
let health = crate::health::recording_health_for_skipped_system_audio_probe(
"operator accepted mic-only call",
);
let job = queue_live_capture_with_recording_health(
CaptureMode::Meeting,
Some("Probe skipped".into()),
¤t_wav,
None,
None,
Some(Local::now()),
Some(Local::now()),
None,
None,
None,
None,
None,
Some(health.clone()),
)
.unwrap();
let loaded = load_job(&job.id).unwrap();
assert_eq!(loaded.recording_health, Some(health));
});
}
#[test]
fn queue_live_capture_persists_consent_metadata() {
with_temp_home(|_| {
let current_wav = pid::current_wav_path();
if let Some(parent) = current_wav.parent() {
fs::create_dir_all(parent).unwrap();
}
fs::write(¤t_wav, b"fake-wav").unwrap();
let job = queue_live_capture_with_recording_health(
CaptureMode::Meeting,
Some("Consent metadata".into()),
¤t_wav,
None,
None,
Some(Local::now()),
Some(Local::now()),
None,
None,
None,
Some(crate::markdown::ConsentBasis::NoticeInInvite),
Some("Shared in the invite.".into()),
None,
)
.unwrap();
let loaded = load_job(&job.id).unwrap();
assert_eq!(
loaded.consent,
Some(crate::markdown::ConsentBasis::NoticeInInvite)
);
assert_eq!(
loaded.consent_notice.as_deref(),
Some("Shared in the invite.")
);
let context = job_context(&loaded);
assert_eq!(
context.consent,
Some(crate::markdown::ConsentBasis::NoticeInInvite)
);
assert_eq!(
context.consent_notice.as_deref(),
Some("Shared in the invite.")
);
});
}
#[test]
fn preserve_audio_alongside_output_moves_stems_too() {
with_temp_home(|tmp| {
let jobs_root = jobs_dir();
fs::create_dir_all(&jobs_root).unwrap();
let audio_path = jobs_root.join("job-preserve.wav");
fs::write(&audio_path, b"mixed").unwrap();
let stems = crate::capture::stem_paths_for(&audio_path).unwrap();
fs::write(&stems.voice, b"voice").unwrap();
fs::write(&stems.system, b"system").unwrap();
let output_path = tmp.path().join("meetings/final.md");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
fs::write(&output_path, "# final").unwrap();
let job = ProcessingJob {
id: "job-preserve".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("preserve".into()),
audio_path: audio_path.display().to_string(),
output_path: Some(output_path.display().to_string()),
state: JobState::Complete,
stage: None,
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
write_job(&job).unwrap();
preserve_audio_alongside_output(&job);
let preserved_audio = output_path.with_extension("wav");
let preserved_stems = crate::capture::stem_paths_for(&preserved_audio).unwrap();
assert!(preserved_audio.exists());
assert!(preserved_stems.voice.exists());
assert!(preserved_stems.system.exists());
assert!(!audio_path.exists());
assert!(!stems.voice.exists());
assert!(!stems.system.exists());
});
}
#[test]
fn preserve_audio_alongside_output_preserves_native_mov_extension_and_stems() {
with_temp_home(|tmp| {
let jobs_root = jobs_dir();
fs::create_dir_all(&jobs_root).unwrap();
let audio_path = jobs_root.join("job-preserve-native.mov");
fs::write(&audio_path, b"mov-anchor").unwrap();
let stems = crate::capture::stem_paths_for(&audio_path).unwrap();
fs::write(&stems.voice, b"voice").unwrap();
fs::write(&stems.system, b"system").unwrap();
let output_path = tmp.path().join("meetings/native-final.md");
fs::create_dir_all(output_path.parent().unwrap()).unwrap();
fs::write(&output_path, "# native final").unwrap();
let job = ProcessingJob {
id: "job-preserve-native".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("preserve native".into()),
audio_path: audio_path.display().to_string(),
output_path: Some(output_path.display().to_string()),
state: JobState::Complete,
stage: None,
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
write_job(&job).unwrap();
preserve_audio_alongside_output(&job);
let preserved_audio = output_path.with_extension("mov");
let preserved_stems = crate::capture::stem_paths_for(&preserved_audio).unwrap();
assert!(preserved_audio.exists());
assert!(!output_path.with_extension("wav").exists());
assert!(preserved_stems.voice.exists());
assert!(preserved_stems.system.exists());
assert!(!audio_path.exists());
assert!(!stems.voice.exists());
assert!(!stems.system.exists());
});
}
#[test]
fn no_speech_artifacts_require_review_and_preserve_capture() {
let artifact = pipeline::TranscriptArtifact {
write_result: WriteResult {
path: PathBuf::from("/tmp/review.md"),
title: "Untitled Recording".into(),
word_count: 0,
content_type: ContentType::Meeting,
},
frontmatter: Frontmatter {
title: "Untitled Recording".into(),
r#type: ContentType::Meeting,
date: Local::now(),
duration: "5m".into(),
source: None,
status: Some(OutputStatus::NoSpeech),
tags: vec![],
attendees: vec![],
attendees_raw: None,
calendar_event: None,
people: vec![],
entities: crate::markdown::EntityLinks::default(),
device: None,
captured_at: None,
context: None,
action_items: vec![],
decisions: vec![],
intents: vec![],
recorded_by: None,
consent: None,
consent_notice: None,
visibility: None,
speaker_map: vec![],
recording_health: None,
processing_warnings: Vec::new(),
template: None,
filter_diagnosis: Some("silence strip removed ALL audio".into()),
},
transcript: String::new(),
};
assert_eq!(
terminal_state_for_artifact(&artifact),
JobState::NeedsReview
);
assert!(JobState::NeedsReview.is_terminal());
}
#[test]
fn list_jobs_recovers_stale_worker_claims() {
with_temp_home(|_| {
let job = ProcessingJob {
id: "job-stale".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("stale".into()),
audio_path: "/tmp/fake.wav".into(),
output_path: None,
state: JobState::Transcribing,
stage: Some("Transcribing meeting".into()),
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: Some(99_999_999),
retry_count: 0,
};
write_job(&job).unwrap();
let jobs = list_jobs();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].state, JobState::Queued);
assert_eq!(jobs[0].owner_pid, None);
assert_eq!(jobs[0].retry_count, 1);
});
}
#[test]
fn list_jobs_demotes_to_failed_when_retry_cap_exceeded() {
with_temp_home(|_| {
let job = ProcessingJob {
id: "job-burned".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("burned".into()),
audio_path: "/tmp/fake.wav".into(),
output_path: None,
state: JobState::Transcribing,
stage: Some("Transcribing meeting".into()),
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: Some(99_999_999),
retry_count: MAX_AUTO_RETRIES,
};
write_job(&job).unwrap();
let _ = list_jobs();
let recovered = load_job(&job.id).expect("archived job should be loadable");
assert_eq!(recovered.state, JobState::Failed);
assert_eq!(recovered.owner_pid, None);
assert_eq!(recovered.retry_count, MAX_AUTO_RETRIES + 1);
assert!(recovered
.error
.as_deref()
.is_some_and(|e| e.contains("crashed")));
assert!(!job_path(&job.id).exists());
assert!(job_archive_path(&job.id).exists());
});
}
#[test]
fn manual_requeue_resets_retry_count() {
with_temp_home(|dir| {
let audio_path = dir.path().join("fake.wav");
let job = ProcessingJob {
id: "job-retry-reset".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("retry reset".into()),
audio_path: audio_path.display().to_string(),
output_path: None,
state: JobState::Failed,
stage: Some("Processing failed".into()),
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: Some(Local::now()),
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: Some("crashed three times".into()),
owner_pid: None,
retry_count: MAX_AUTO_RETRIES + 1,
};
write_job(&job).unwrap();
fs::write(&audio_path, b"fake-wav").unwrap();
let requeued = requeue_job(&job.id).unwrap().unwrap();
assert_eq!(requeued.state, JobState::Queued);
assert_eq!(requeued.retry_count, 0);
});
}
#[test]
fn requeue_job_preserves_existing_output_path() {
with_temp_home(|dir| {
let audio_path = dir.path().join("fake.wav");
let output_path = dir.path().join("existing.md").display().to_string();
let job = ProcessingJob {
id: "job-failed".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("retry me".into()),
audio_path: audio_path.display().to_string(),
output_path: Some(output_path.clone()),
state: JobState::Failed,
stage: Some("Processing failed".into()),
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: Some(Local::now()),
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: Some(42),
error: Some("boom".into()),
owner_pid: None,
retry_count: 0,
};
write_job(&job).unwrap();
fs::write(&audio_path, b"fake-wav").unwrap();
let requeued = requeue_job(&job.id).unwrap().unwrap();
assert_eq!(requeued.id, job.id);
assert_eq!(requeued.output_path.as_deref(), Some(output_path.as_str()));
assert_eq!(requeued.state, JobState::Queued);
assert_eq!(requeued.error, None);
assert_eq!(requeued.finished_at, None);
});
}
#[test]
fn dismiss_job_notice_marks_retryable_job_and_requeue_clears_it() {
with_temp_home(|dir| {
let audio_path = dir.path().join("fake.wav");
fs::write(&audio_path, b"fake-wav").unwrap();
let job = ProcessingJob {
id: "job-dismissed".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("dismiss me".into()),
audio_path: audio_path.display().to_string(),
output_path: None,
state: JobState::Failed,
stage: Some("Processing failed".into()),
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: Some(Local::now()),
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: Some("boom".into()),
owner_pid: None,
retry_count: 0,
};
write_job(&job).unwrap();
let dismissed = dismiss_job_notice(&job.id).unwrap().unwrap();
assert!(dismissed.notice_dismissed_at.is_some());
let requeued = requeue_job(&job.id).unwrap().unwrap();
assert_eq!(requeued.state, JobState::Queued);
assert_eq!(requeued.notice_dismissed_at, None);
assert_eq!(requeued.error, None);
});
}
#[test]
fn requeue_job_rejects_non_retryable_state() {
with_temp_home(|dir| {
let audio_path = dir.path().join("fake.wav");
fs::write(&audio_path, b"fake-wav").unwrap();
let job = ProcessingJob {
id: "job-complete".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("done".into()),
audio_path: audio_path.display().to_string(),
output_path: Some(dir.path().join("done.md").display().to_string()),
state: JobState::Complete,
stage: None,
created_at: Local::now(),
started_at: Some(Local::now()),
finished_at: Some(Local::now()),
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: Some(42),
error: None,
owner_pid: None,
retry_count: 0,
};
write_job(&job).unwrap();
let requeued = requeue_job(&job.id).unwrap();
assert!(requeued.is_none());
let unchanged = load_job(&job.id).unwrap();
assert_eq!(unchanged.state, JobState::Complete);
assert_eq!(unchanged.output_path, job.output_path);
assert_eq!(unchanged.finished_at, job.finished_at);
});
}
#[test]
fn processing_summary_prefers_active_work_over_newer_queued_jobs() {
with_temp_home(|_| {
let active = ProcessingJob {
id: "job-active".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("Older active job".into()),
audio_path: "/tmp/old.wav".into(),
output_path: None,
state: JobState::Transcribing,
stage: Some("Transcribing meeting".into()),
created_at: Local::now() - chrono::Duration::minutes(5),
started_at: Some(Local::now() - chrono::Duration::minutes(4)),
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
let queued = ProcessingJob {
id: "job-queued".into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some("Newer queued job".into()),
audio_path: "/tmp/new.wav".into(),
output_path: None,
state: JobState::Queued,
stage: Some("Queued for processing".into()),
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
};
write_job(&active).unwrap();
write_job(&queued).unwrap();
let summary = processing_summary().unwrap();
assert_eq!(summary.id, "job-active");
assert_eq!(summary.state, JobState::Transcribing);
});
}
fn make_test_job(id: &str, state: JobState) -> ProcessingJob {
ProcessingJob {
id: id.into(),
mode: CaptureMode::Meeting,
content_type: ContentType::Meeting,
title: Some(format!("title-{id}")),
audio_path: format!("/tmp/{id}.wav"),
output_path: None,
state,
stage: state.default_stage(),
created_at: Local::now(),
started_at: None,
finished_at: None,
notice_dismissed_at: None,
recording_started_at: None,
recording_finished_at: None,
context_session_id: None,
user_notes: None,
pre_context: None,
consent: None,
consent_notice: None,
calendar_event: None,
template_slug: None,
recording_health: None,
word_count: None,
error: None,
owner_pid: None,
retry_count: 0,
}
}
#[test]
fn migration_moves_terminal_jobs_to_archive() {
with_temp_home(|_| {
let terminal = make_test_job("job-old-complete", JobState::Complete);
let queued = make_test_job("job-still-queued", JobState::Queued);
write_job(&terminal).unwrap();
write_job(&queued).unwrap();
assert!(job_path(&terminal.id).exists());
migrate_terminal_jobs_to_archive().unwrap();
assert!(
!job_path(&terminal.id).exists(),
"terminal job should be moved out of active dir"
);
assert!(
job_archive_path(&terminal.id).exists(),
"terminal job should now live in archive"
);
assert!(
job_path(&queued.id).exists(),
"queued job stays in active dir"
);
});
}
#[test]
fn migration_is_idempotent_when_already_archived() {
with_temp_home(|_| {
let job = make_test_job("job-double-migrate", JobState::Failed);
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&job, &job_archive_path(&job.id)).unwrap();
write_job(&job).unwrap();
migrate_terminal_jobs_to_archive().unwrap();
assert!(!job_path(&job.id).exists());
assert!(job_archive_path(&job.id).exists());
let loaded = load_job(&job.id).unwrap();
assert_eq!(loaded.state, JobState::Failed);
});
}
#[test]
fn move_to_archive_preserves_canonical_copy_on_already_exists() {
with_temp_home(|_| {
let job_id = "job-race";
let canonical = make_test_job(job_id, JobState::Complete);
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&canonical, &job_archive_path(job_id)).unwrap();
let mut stale = make_test_job(job_id, JobState::Failed);
stale.error = Some("stale snapshot".into());
write_job(&stale).unwrap();
move_to_archive(&job_path(job_id), &job_archive_path(job_id)).unwrap();
assert!(
!job_path(job_id).exists(),
"active duplicate must be dropped after race detection"
);
let archived = load_job(job_id).unwrap();
assert_eq!(
archived.state,
JobState::Complete,
"canonical archive copy must survive the race"
);
assert!(
archived.error.is_none(),
"stale state must not have leaked into archive"
);
});
}
#[test]
fn ensure_archive_initialized_writes_marker_and_skips_on_repeat() {
with_temp_home(|_| {
let terminal = make_test_job("job-marker", JobState::Complete);
write_job(&terminal).unwrap();
assert!(!jobs_dir().join(MIGRATION_MARKER).exists());
ensure_archive_initialized();
assert!(
jobs_dir().join(MIGRATION_MARKER).exists(),
"marker should be written after a successful sweep"
);
assert!(job_archive_path(&terminal.id).exists());
let later = make_test_job("job-after-marker", JobState::Failed);
write_job(&later).unwrap();
ensure_archive_initialized();
assert!(
job_path(&later.id).exists(),
"post-marker terminal jobs are routed by update_job_state, not by migration"
);
});
}
#[test]
fn ensure_archive_initialized_writes_marker_when_jobs_dir_is_missing() {
with_temp_home(|_| {
assert!(
!jobs_dir().exists(),
"fresh installs should start without a jobs directory"
);
ensure_archive_initialized();
assert!(
jobs_dir().join(MIGRATION_MARKER).exists(),
"fresh installs should still record a quiet successful migration"
);
});
}
#[test]
fn update_job_state_moves_to_archive_on_terminal_transition() {
with_temp_home(|_| {
let job = make_test_job("job-becoming-terminal", JobState::Transcribing);
write_job(&job).unwrap();
assert!(job_path(&job.id).exists());
assert!(!job_archive_path(&job.id).exists());
update_job_state(&job.id, |j| j.state = JobState::Complete)
.unwrap()
.unwrap();
assert!(
!job_path(&job.id).exists(),
"terminal job should leave active dir on transition"
);
assert!(
job_archive_path(&job.id).exists(),
"terminal job should land in archive"
);
let loaded = load_job(&job.id).unwrap();
assert_eq!(loaded.state, JobState::Complete);
});
}
#[test]
fn update_job_state_moves_back_to_active_on_requeue() {
with_temp_home(|_| {
let job = make_test_job("job-requeue", JobState::Failed);
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&job, &job_archive_path(&job.id)).unwrap();
update_job_state(&job.id, |j| {
j.state = JobState::Queued;
j.error = None;
})
.unwrap()
.unwrap();
assert!(
job_path(&job.id).exists(),
"requeued job should land back in active dir"
);
assert!(
!job_archive_path(&job.id).exists(),
"archive copy should be cleaned up after requeue"
);
let loaded = load_job(&job.id).unwrap();
assert_eq!(loaded.state, JobState::Queued);
assert_eq!(loaded.error, None);
});
}
#[test]
fn update_job_state_no_move_when_dir_unchanged() {
with_temp_home(|_| {
let job = make_test_job("job-no-move", JobState::Queued);
write_job(&job).unwrap();
update_job_state(&job.id, |j| j.state = JobState::Transcribing)
.unwrap()
.unwrap();
assert!(job_path(&job.id).exists());
assert!(!job_archive_path(&job.id).exists());
assert_eq!(load_job(&job.id).unwrap().state, JobState::Transcribing);
});
}
#[test]
fn load_job_finds_archived_jobs_via_fallback() {
with_temp_home(|_| {
let job = make_test_job("job-only-in-archive", JobState::Complete);
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&job, &job_archive_path(&job.id)).unwrap();
let loaded = load_job(&job.id).unwrap();
assert_eq!(loaded.id, job.id);
assert_eq!(loaded.state, JobState::Complete);
});
}
#[test]
fn list_jobs_raw_does_not_see_archive_subdir() {
with_temp_home(|_| {
let active_job = make_test_job("job-active-list", JobState::Transcribing);
let archived_job = make_test_job("job-archived-list", JobState::Complete);
write_job(&active_job).unwrap();
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&archived_job, &job_archive_path(&archived_job.id)).unwrap();
let raw = list_jobs_raw();
let ids: Vec<&str> = raw.iter().map(|j| j.id.as_str()).collect();
assert!(ids.contains(&"job-active-list"));
assert!(
!ids.contains(&"job-archived-list"),
"list_jobs_raw must skip archive subdir contents"
);
});
}
#[test]
fn display_jobs_with_terminal_includes_archive_contents() {
with_temp_home(|_| {
let active_job = make_test_job("job-active-display", JobState::Transcribing);
let archived_job = make_test_job("job-archived-display", JobState::Complete);
write_job(&active_job).unwrap();
fs::create_dir_all(archive_dir()).unwrap();
write_job_to(&archived_job, &job_archive_path(&archived_job.id)).unwrap();
let active_only = display_jobs(None, false);
let active_ids: Vec<&str> = active_only.iter().map(|j| j.id.as_str()).collect();
assert!(active_ids.contains(&"job-active-display"));
assert!(!active_ids.contains(&"job-archived-display"));
let all = display_jobs(None, true);
let all_ids: Vec<&str> = all.iter().map(|j| j.id.as_str()).collect();
assert!(all_ids.contains(&"job-active-display"));
assert!(all_ids.contains(&"job-archived-display"));
});
}
#[test]
fn latest_terminal_job_picks_finished_at_over_created_at() {
with_temp_home(|_| {
fs::create_dir_all(archive_dir()).unwrap();
let now = Local::now();
let mut older_created = make_test_job("job-old-created", JobState::Complete);
older_created.created_at = now - chrono::Duration::hours(3);
older_created.finished_at = Some(now - chrono::Duration::seconds(10));
let mut newer_created = make_test_job("job-new-created", JobState::Complete);
newer_created.created_at = now - chrono::Duration::minutes(5);
newer_created.finished_at = Some(now - chrono::Duration::minutes(4));
write_job_to(&older_created, &job_archive_path(&older_created.id)).unwrap();
write_job_to(&newer_created, &job_archive_path(&newer_created.id)).unwrap();
let latest = latest_terminal_job().expect("at least one terminal job");
assert_eq!(
latest.id, "job-old-created",
"finished_at must dominate created_at in the latest-terminal selection"
);
});
}
#[test]
fn latest_terminal_job_falls_back_to_created_at_when_finished_at_missing() {
with_temp_home(|_| {
fs::create_dir_all(archive_dir()).unwrap();
let now = Local::now();
let mut earlier = make_test_job("job-earlier", JobState::Complete);
earlier.created_at = now - chrono::Duration::hours(2);
earlier.finished_at = None;
let mut later = make_test_job("job-later", JobState::Failed);
later.created_at = now - chrono::Duration::minutes(1);
later.finished_at = None;
write_job_to(&earlier, &job_archive_path(&earlier.id)).unwrap();
write_job_to(&later, &job_archive_path(&later.id)).unwrap();
let latest = latest_terminal_job().expect("at least one terminal job");
assert_eq!(latest.id, "job-later");
});
}
#[test]
fn latest_terminal_job_returns_none_when_archive_empty() {
with_temp_home(|_| {
assert!(latest_terminal_job().is_none());
});
}
}