use crate::config::Config;
use crate::error::{LiveTranscriptError, MinutesError, TranscribeError};
use crate::pid;
use crate::streaming::AudioStream;
use crate::streaming_whisper::StreamingWhisper;
use crate::vad::Vad;
use chrono::{DateTime, Local};
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// One transcribed utterance, serialized as a single JSON object per line of
/// the live transcript JSONL file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TranscriptLine {
/// Sequence number of the utterance within the session; the writer in this
/// file assigns numbers starting at 1.
pub line: usize,
/// Wall-clock time at which the line was written.
pub ts: DateTime<Local>,
/// Milliseconds elapsed since the writer was created, measured when the line
/// was written.
pub offset_ms: u64,
/// Utterance duration in milliseconds, as reported by the recognizer.
pub duration_ms: u64,
/// Transcribed text with surrounding whitespace trimmed.
pub text: String,
/// Speaker label; the writer in this file always stores `None`.
pub speaker: Option<String>,
}
/// Summary of the live transcript session, as returned by `session_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionStatus {
/// `true` when the live transcript PID file check returned a PID.
pub active: bool,
/// PID returned by the PID file check, if any.
pub pid: Option<u32>,
/// Number of transcript lines, taken from the status sidecar or counted from
/// the JSONL file.
pub line_count: usize,
/// Session length in seconds: elapsed time since the recorded start while
/// active, otherwise derived from the last recorded offset and duration.
pub duration_secs: f64,
/// Path of the JSONL transcript, present only when the file exists.
pub jsonl_path: Option<String>,
}
/// Owns the output files of a live transcript session: the JSONL transcript,
/// an optional WAV capture, and the bookkeeping needed for the status sidecar.
struct LiveTranscriptWriter {
/// Buffered handle to the JSONL transcript file.
jsonl_writer: BufWriter<File>,
/// WAV writer; `None` when WAV saving is disabled, creation failed, or the
/// writer has already been finalized.
wav_writer: Option<hound::WavWriter<BufWriter<File>>>,
/// Running utterance counter, also used as the line number of each utterance.
line_count: usize,
/// Monotonic start of the session; source of line offsets and total duration.
start_time: std::time::Instant,
/// Wall-clock start of the session, recorded in the status sidecar.
start_wall: DateTime<Local>,
/// Location of the JSONL transcript, handed back by `finalize`.
jsonl_path: PathBuf,
/// Set after a JSONL write or flush error; later utterances are rejected.
jsonl_failed: bool,
/// Set after a WAV sample write error; later audio is dropped.
wav_failed: bool,
}
/// Snapshot of session progress, serialized to the live transcript status
/// sidecar file and read back by `session_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveStatus {
/// Wall-clock time at which the session's writer was created.
pub start_time: DateTime<Local>,
/// Value of the writer's utterance counter when the snapshot was taken.
pub line_count: usize,
/// Milliseconds elapsed since session start when the snapshot was taken.
pub last_offset_ms: u64,
/// Duration component added to `last_offset_ms` by readers; the writer in
/// this file always stores 0.
pub last_duration_ms: u64,
}
impl LiveTranscriptWriter {
    /// Creates the writer for a new session.
    ///
    /// Truncates (or creates) the JSONL transcript file, opens a 16 kHz mono
    /// 16-bit WAV file when `config.live_transcript.save_wav` is set, and
    /// writes an initial status sidecar.
    ///
    /// # Errors
    /// Returns an error if the transcript's parent directory cannot be created
    /// or the JSONL file cannot be opened. A WAV creation failure is only
    /// logged; the session then continues without audio capture.
    fn new(config: &Config) -> Result<Self, MinutesError> {
        let jsonl_path = pid::live_transcript_jsonl_path();
        if let Some(parent) = jsonl_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut open_options = OpenOptions::new();
        open_options.create(true).write(true).truncate(true);
        // On Unix, create the file owner-only from the start so there is no
        // window in which a freshly created transcript has default permissions.
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            open_options.mode(0o600);
        }
        let jsonl_file = open_options.open(&jsonl_path)?;
        // The creation mode only applies to new files; also tighten a file that
        // already existed and was merely truncated.
        set_permissions_0600(&jsonl_path);
        let jsonl_writer = BufWriter::new(jsonl_file);

        let wav_writer = if config.live_transcript.save_wav {
            let wav_path = pid::live_transcript_wav_path();
            let spec = hound::WavSpec {
                channels: 1,
                sample_rate: 16000,
                bits_per_sample: 16,
                sample_format: hound::SampleFormat::Int,
            };
            match hound::WavWriter::create(&wav_path, spec) {
                Ok(w) => {
                    set_permissions_0600(&wav_path);
                    Some(w)
                }
                Err(e) => {
                    tracing::warn!("could not create WAV file, continuing without: {}", e);
                    None
                }
            }
        } else {
            None
        };

        let start_wall = Local::now();
        let writer = Self {
            jsonl_writer,
            wav_writer,
            line_count: 0,
            start_time: std::time::Instant::now(),
            start_wall,
            jsonl_path,
            jsonl_failed: false,
            wav_failed: false,
        };
        writer.write_sidecar();
        Ok(writer)
    }

    /// Writes the current progress snapshot to the status sidecar file.
    ///
    /// Best-effort: the snapshot is written to a temporary file and then
    /// renamed into place, and every error along the way is ignored.
    fn write_sidecar(&self) {
        let status = LiveStatus {
            start_time: self.start_wall,
            line_count: self.line_count,
            last_offset_ms: self.start_time.elapsed().as_millis() as u64,
            last_duration_ms: 0,
        };
        let path = pid::live_transcript_status_path();
        let tmp = path.with_extension("json.tmp");
        if let Ok(json) = serde_json::to_string(&status) {
            if std::fs::write(&tmp, json).is_ok() {
                std::fs::rename(&tmp, &path).ok();
            }
        }
    }

    /// Appends one utterance to the JSONL transcript and refreshes the sidecar.
    ///
    /// Blank text is skipped. The line counter only advances once the line has
    /// been written and flushed, so a failed write neither consumes a line
    /// number nor inflates the count reported by `finalize`.
    ///
    /// Returns `false` when the JSONL file has failed (now or earlier) and
    /// `true` otherwise, including when the text was blank or the line could
    /// not be serialized.
    fn write_utterance(&mut self, text: &str, duration_secs: f64) -> bool {
        let text = text.trim();
        if text.is_empty() {
            return true;
        }
        if self.jsonl_failed {
            return false;
        }

        let next_line = self.line_count + 1;
        let offset = self.start_time.elapsed();
        let record = TranscriptLine {
            line: next_line,
            ts: Local::now(),
            offset_ms: offset.as_millis() as u64,
            // Float-to-int `as` saturates, so negative or NaN durations become 0.
            duration_ms: (duration_secs * 1000.0) as u64,
            text: text.to_string(),
            speaker: None,
        };

        match serde_json::to_string(&record) {
            Ok(json) => {
                if let Err(e) = writeln!(self.jsonl_writer, "{}", json) {
                    tracing::error!("JSONL write failed (disk full?): {}", e);
                    self.jsonl_failed = true;
                    return false;
                }
                // Flush per line so readers tailing the file see it promptly.
                if let Err(e) = self.jsonl_writer.flush() {
                    tracing::error!("JSONL flush failed: {}", e);
                    self.jsonl_failed = true;
                    return false;
                }
                // Commit the line number only after the line reached the file.
                self.line_count = next_line;
            }
            Err(e) => {
                tracing::error!("failed to serialize transcript line: {}", e);
            }
        }
        self.write_sidecar();
        true
    }

    /// Appends audio samples to the WAV file, if one is open.
    ///
    /// Samples are scaled from floating point to 16-bit integers and clamped
    /// to the `i16` range. After the first write error the WAV output is
    /// abandoned for the rest of the session.
    fn write_audio(&mut self, samples: &[f32]) {
        if self.wav_failed {
            return;
        }
        if let Some(ref mut writer) = self.wav_writer {
            for &sample in samples {
                let s = (sample * 32767.0).clamp(-32768.0, 32767.0) as i16;
                if let Err(e) = writer.write_sample(s) {
                    tracing::warn!("WAV write failed (disk full?), continuing without: {}", e);
                    self.wav_failed = true;
                    return;
                }
            }
        }
    }

    /// Finishes the session and returns `(line_count, duration_secs, jsonl_path)`.
    ///
    /// Finalizes the WAV file when present; a finalize error is only logged.
    fn finalize(mut self) -> (usize, f64, PathBuf) {
        if let Some(writer) = self.wav_writer.take() {
            if let Err(e) = writer.finalize() {
                tracing::warn!("WAV finalize failed: {}", e);
            }
        }
        let duration = self.start_time.elapsed().as_secs_f64();
        (self.line_count, duration, self.jsonl_path)
    }
}
/// Starts a live transcript session and blocks until it ends.
///
/// Refuses to start while a recording, a dictation session, or another live
/// transcript session is active. On success returns
/// `(line_count, duration_secs, jsonl_path)`.
///
/// # Errors
/// * `LiveTranscriptError::RecordingActive` when a recording is in progress.
/// * `LiveTranscriptError::DictationActive` when the dictation PID file check
///   reports a PID.
/// * `LiveTranscriptError::AlreadyActive` when the live transcript PID guard
///   reports an existing session; other PID errors are passed through.
/// * Any error returned by `run_inner`.
#[cfg(feature = "whisper")]
pub fn run(
    stop_flag: Arc<AtomicBool>,
    config: &Config,
) -> Result<(usize, f64, PathBuf), MinutesError> {
    if matches!(pid::check_recording(), Ok(Some(_))) {
        return Err(LiveTranscriptError::RecordingActive.into());
    }

    let dict_pid = pid::dictation_pid_path();
    if matches!(pid::check_pid_file(&dict_pid), Ok(Some(_))) {
        return Err(LiveTranscriptError::DictationActive.into());
    }

    // Result intentionally ignored: this only clears a sentinel that is
    // already present (presumably left over from an earlier session) so it
    // cannot stop the new session immediately.
    pid::check_and_clear_sentinel();

    let pid_path = pid::live_transcript_pid_path();
    // Keep the guard alive for the whole session; it goes out of scope when
    // this function returns.
    let _pid_guard = pid::create_pid_guard(&pid_path).map_err(|e| match e {
        crate::error::PidError::AlreadyRecording(pid) => {
            MinutesError::LiveTranscript(LiveTranscriptError::AlreadyActive(pid))
        }
        other => MinutesError::Pid(other),
    })?;

    run_inner(stop_flag, config)
}
/// Core loop of a live transcript session: loads the whisper model, opens the
/// audio stream, and turns detected speech into JSONL transcript lines until a
/// stop is requested or an unrecoverable error occurs.
///
/// Returns `(line_count, duration_secs, jsonl_path)` as reported by
/// `LiveTranscriptWriter::finalize`.
///
/// # Errors
/// Fails if the model path cannot be resolved or loaded, the audio stream
/// cannot be started, or the transcript writer cannot be created. Failures
/// after the loop has started (reconnect failure, JSONL write failure) end the
/// session but still return `Ok` with what was written so far.
#[cfg(feature = "whisper")]
fn run_inner(
stop_flag: Arc<AtomicBool>,
config: &Config,
) -> Result<(usize, f64, PathBuf), MinutesError> {
let whisper_ctx = {
// Use the live-transcript model when one is configured; otherwise fall back
// to the model resolver used for dictation.
let model_path = if config.live_transcript.model.is_empty() {
crate::transcribe::resolve_model_path_for_dictation(config)?
} else {
crate::transcribe::resolve_model_path_by_name(&config.live_transcript.model, config)?
};
tracing::info!(model = %model_path.display(), "loading whisper model for live transcript");
whisper_rs::WhisperContext::new_with_params(
model_path
.to_str()
.ok_or_else(|| TranscribeError::ModelLoadError("invalid path".into()))?,
whisper_rs::WhisperContextParameters::default(),
)
.map_err(|e| TranscribeError::ModelLoadError(format!("{}", e)))?
};
// NOTE(review): `None` presumably selects the default input device — confirm
// against `AudioStream::start`.
let device_override = config.recording.device.as_deref();
let mut stream = AudioStream::start(device_override)?;
tracing::info!(device = %stream.device_name, "live transcript audio stream started");
let mut device_monitor = crate::device_monitor::DeviceMonitor::new(&stream.device_name);
let mut writer = LiveTranscriptWriter::new(config)?;
let mut vad = Vad::new();
let mut streaming = StreamingWhisper::new(config.transcription.language.clone());
// `was_speaking` / `utterance_samples` track the utterance currently being
// accumulated; both are reset whenever an utterance is finalized.
let mut was_speaking = false;
let mut utterance_samples: usize = 0;
// The configured maximum is floored at 5 seconds. The sample budget multiplies
// by 16000, i.e. it assumes 16 kHz audio — TODO confirm the stream's rate.
let max_utterance_secs = config.live_transcript.max_utterance_secs.max(5);
let max_utterance_samples = (max_utterance_secs as usize).saturating_mul(16000);
tracing::info!("live transcript session started");
loop {
// Stop requested through the shared flag: flush any in-progress utterance
// and leave the loop.
if stop_flag.load(Ordering::Relaxed) {
if utterance_samples > 0 {
if let Some(sr) = streaming.finalize(&whisper_ctx) {
writer.write_utterance(&sr.text, sr.duration_secs);
}
}
break;
}
// Stop requested through the sentinel checked by the `pid` module: same
// flush-and-exit handling as above.
if pid::check_and_clear_sentinel() {
if utterance_samples > 0 {
if let Some(sr) = streaming.finalize(&whisper_ctx) {
writer.write_utterance(&sr.text, sr.duration_secs);
}
}
break;
}
// Stream error or device change: tear the stream down and try to reopen it.
if stream.has_error() || device_monitor.has_device_changed() {
let old_name = stream.device_name.clone();
tracing::info!(device = %old_name, "audio stream error or device change — reconnecting");
// Release the old stream before opening the replacement.
drop(stream);
match AudioStream::start(device_override) {
Ok(new_stream) => {
tracing::info!(
old = %old_name, new = %new_stream.device_name,
"live transcript audio stream reconnected"
);
device_monitor.update_device(&new_stream.device_name);
stream = new_stream;
continue;
}
Err(e) => {
// Reconnect failed: keep what was recognized so far, then end the session.
tracing::error!("live transcript reconnect failed: {}", e);
if utterance_samples > 0 {
if let Some(sr) = streaming.finalize(&whisper_ctx) {
writer.write_utterance(&sr.text, sr.duration_secs);
}
}
break;
}
}
}
// Wait at most 100 ms for audio so the stop conditions above are polled
// regularly even when no chunks arrive.
let chunk = match stream
.receiver
.recv_timeout(std::time::Duration::from_millis(100))
{
Ok(chunk) => chunk,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => continue,
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
// NOTE(review): unlike the branch above, the old stream is still alive
// while the replacement is opened — confirm this is intended.
let old_name = stream.device_name.clone();
tracing::warn!("audio stream disconnected — attempting reconnect");
match AudioStream::start(device_override) {
Ok(new_stream) => {
tracing::info!(
old = %old_name, new = %new_stream.device_name,
"live transcript audio stream reconnected after disconnect"
);
device_monitor.update_device(&new_stream.device_name);
stream = new_stream;
continue;
}
Err(e) => {
tracing::error!("reconnect after disconnect failed: {}", e);
if utterance_samples > 0 {
if let Some(sr) = streaming.finalize(&whisper_ctx) {
writer.write_utterance(&sr.text, sr.duration_secs);
}
}
break;
}
}
}
};
// Every chunk goes to the WAV capture, whether or not it contains speech.
writer.write_audio(&chunk.samples);
let vad_result = vad.process(chunk.rms);
if vad_result.speaking {
was_speaking = true;
utterance_samples += chunk.samples.len();
// Intermediate results returned by `feed` are discarded; only finalized
// text is written to the transcript.
if let Some(_sr) = streaming.feed(&chunk.samples, &whisper_ctx) {
}
// Cap utterance length: finalize and start over once the budget is reached.
if utterance_samples >= max_utterance_samples {
tracing::info!("max utterance duration reached, force-finalizing");
if let Some(sr) = streaming.finalize(&whisper_ctx) {
if !writer.write_utterance(&sr.text, sr.duration_secs) {
tracing::error!(
"JSONL write failed — stopping session to prevent data loss"
);
break;
}
}
streaming.reset();
utterance_samples = 0;
was_speaking = false;
}
} else if was_speaking && utterance_samples > 0 {
// Speech just ended: finalize the utterance and write it out.
if let Some(sr) = streaming.finalize(&whisper_ctx) {
if !writer.write_utterance(&sr.text, sr.duration_secs) {
tracing::error!("JSONL write failed — stopping session to prevent data loss");
break;
}
}
streaming.reset();
utterance_samples = 0;
was_speaking = false;
}
}
// Close the output files and report the session totals.
let (lines, duration, path) = writer.finalize();
tracing::info!(
lines = lines,
duration_secs = format!("{:.1}", duration),
"live transcript session ended"
);
Ok((lines, duration, path))
}
/// Stand-in for `run` when the crate is built without the `whisper` feature.
///
/// Both arguments are ignored and the call always fails with a
/// `TranscribeError::ModelLoadError` stating that the feature is required.
#[cfg(not(feature = "whisper"))]
pub fn run(
    _stop_flag: Arc<AtomicBool>,
    _config: &Config,
) -> Result<(usize, f64, PathBuf), MinutesError> {
    let unavailable =
        TranscribeError::ModelLoadError("live transcript requires the whisper feature".into());
    Err(unavailable.into())
}
pub fn read_since_line(since_line: usize) -> Result<Vec<TranscriptLine>, MinutesError> {
let path = pid::live_transcript_jsonl_path();
if !path.exists() {
return Ok(Vec::new());
}
let file = File::open(&path)?;
let reader = BufReader::new(file);
let mut lines = Vec::new();
for line_result in reader.lines() {
let line_str = match line_result {
Ok(s) => s,
Err(e) => {
tracing::warn!("skipping unreadable JSONL line: {}", e);
continue;
}
};
if line_str.trim().is_empty() {
continue;
}
match serde_json::from_str::<TranscriptLine>(&line_str) {
Ok(tl) if tl.line > since_line => lines.push(tl),
Ok(_) => {} Err(e) => {
tracing::warn!("skipping malformed JSONL line: {}", e);
}
}
}
Ok(lines)
}
/// Returns the transcript lines whose timestamp falls within the last
/// `duration_ms` milliseconds, in file order.
///
/// A missing or empty transcript yields an empty list. A window so large that
/// its start cannot be represented as a timestamp selects every line.
///
/// # Errors
/// Propagates any error from `read_since_line`.
pub fn read_since_duration(duration_ms: u64) -> Result<Vec<TranscriptLine>, MinutesError> {
    // `read_since_line` already returns an empty list when there is no file.
    let all = read_since_line(0)?;
    if all.is_empty() {
        return Ok(all);
    }
    let ms = i64::try_from(duration_ms).unwrap_or(i64::MAX);
    // `DateTime - Duration` panics when the result is out of range, which a
    // very large `duration_ms` would trigger; use the checked form instead.
    match Local::now().checked_sub_signed(chrono::Duration::milliseconds(ms)) {
        Some(cutoff) => Ok(all.into_iter().filter(|l| l.ts >= cutoff).collect()),
        // The window starts before any representable time: keep everything.
        None => Ok(all),
    }
}
/// Reports whether a live transcript session is active, together with its line
/// count, duration, and transcript path.
///
/// Line count and duration come from the status sidecar when it can be read
/// and parsed; otherwise they are derived by scanning the JSONL transcript.
/// While a session is active the duration is the whole seconds elapsed since
/// the recorded start time; otherwise it is the last recorded offset plus
/// duration. This function never fails: unreadable inputs result in zero
/// counts.
pub fn session_status() -> SessionStatus {
    let pid_path = pid::live_transcript_pid_path();
    let active_pid = pid::check_pid_file(&pid_path).ok().flatten();
    let active = active_pid.is_some();
    let jsonl_path = pid::live_transcript_jsonl_path();
    let status_path = pid::live_transcript_status_path();

    // A sidecar that is missing, unreadable, or unparsable is treated the
    // same way: fall back to the transcript itself.
    let sidecar = std::fs::read_to_string(&status_path)
        .ok()
        .and_then(|content| serde_json::from_str::<LiveStatus>(&content).ok());

    let (line_count, duration_secs) = match sidecar {
        Some(status) => {
            let dur = if active {
                (Local::now() - status.start_time).num_seconds().max(0) as f64
            } else {
                // Values come from a file; saturate rather than overflow.
                status.last_offset_ms.saturating_add(status.last_duration_ms) as f64 / 1000.0
            };
            (status.line_count, dur)
        }
        None => {
            let lines = if jsonl_path.exists() {
                read_since_line(0).unwrap_or_default()
            } else {
                Vec::new()
            };
            let dur = lines
                .last()
                .map(|l| l.offset_ms.saturating_add(l.duration_ms) as f64 / 1000.0)
                .unwrap_or(0.0);
            (lines.len(), dur)
        }
    };

    SessionStatus {
        active,
        pid: active_pid,
        line_count,
        duration_secs,
        jsonl_path: if jsonl_path.exists() {
            Some(jsonl_path.to_string_lossy().to_string())
        } else {
            None
        },
    }
}
/// Restricts `path` to owner read/write (mode `0600`) on Unix.
///
/// Best-effort: a failure to change permissions is ignored, and on non-Unix
/// targets the function does nothing.
fn set_permissions_0600(path: &std::path::Path) {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let owner_only = std::fs::Permissions::from_mode(0o600);
        std::fs::set_permissions(path, owner_only).ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Builds a transcript line with the current timestamp and no speaker.
    fn sample_line(line: usize, offset_ms: u64, duration_ms: u64, text: &str) -> TranscriptLine {
        TranscriptLine {
            line,
            ts: Local::now(),
            offset_ms,
            duration_ms,
            text: text.into(),
            speaker: None,
        }
    }

    /// A line survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn test_transcript_line_roundtrip() {
        let original = sample_line(1, 5000, 3200, "hello world");
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: TranscriptLine = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.line, 1);
        assert_eq!(decoded.text, "hello world");
        assert_eq!(decoded.offset_ms, 5000);
        assert_eq!(decoded.duration_ms, 3200);
        assert!(decoded.speaker.is_none());
    }

    /// Writes five lines to a temp JSONL file and checks the "line > N" filter.
    ///
    /// This mirrors the predicate used by `read_since_line` on a temp file; it
    /// does not call `read_since_line` itself.
    #[test]
    fn test_read_since_line_filters() {
        let mut tmpfile = NamedTempFile::new().unwrap();
        for i in 1..=5 {
            let entry = sample_line(i, i as u64 * 10000, 3000, &format!("utterance {}", i));
            writeln!(tmpfile, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
        }

        let reader = BufReader::new(File::open(tmpfile.path()).unwrap());
        let kept: Vec<TranscriptLine> = reader
            .lines()
            .map(|raw| raw.unwrap())
            .filter_map(|raw| serde_json::from_str::<TranscriptLine>(&raw).ok())
            .filter(|tl| tl.line > 3)
            .collect();

        assert_eq!(kept.len(), 2);
        assert_eq!(kept[0].line, 4);
        assert_eq!(kept[1].line, 5);
    }

    /// `session_status` must not panic and never reports a negative duration.
    #[test]
    fn test_session_status_no_session() {
        let report = session_status();
        assert!(report.duration_secs >= 0.0);
    }

    /// An empty text field is blank after trimming.
    #[test]
    fn test_empty_utterance_skipped() {
        let blank = sample_line(1, 0, 0, "");
        assert!(blank.text.trim().is_empty());
    }
}