use std::sync::Arc;
use crate::audio::{generate_recording_filename, save_to_wav};
use super::queue::{QueuedSegment, TranscriptionQueue};
// Default ring-buffer size in samples: 48000 * 30 * 2.
// NOTE(review): presumably 30 s of 2-channel audio at 48 kHz, matching the
// defaults in `TranscribeState::new` — confirm.
const RING_BUFFER_CAPACITY: usize = 48000 * 30 * 2;
// Maximum number of samples kept in the manual (raw) recording buffer. The
// warning logged when it fills up describes this as a 30-minute limit.
const MANUAL_MAX_BUFFER_SAMPLES: usize = 48000 * 60 * 30;
// Maximum number of samples kept in the processed recording buffer (same
// 30-minute wording in its warning).
const PROCESSED_MAX_BUFFER_SAMPLES: usize = 48000 * 60 * 30;
// Fill level (percent of ring-buffer capacity) at which an in-progress segment
// is treated as approaching overflow.
const OVERFLOW_THRESHOLD_PERCENT: usize = 90;
// Segment duration after which the state starts looking for a word break.
const MAX_SEGMENT_DURATION_MS: u64 = 4000;
// How long to keep looking for a word break before the segment is force-extracted.
const WORD_BREAK_GRACE_MS: u64 = 750;
// Word breaks are ignored until the current segment is at least this long.
const WORD_BREAK_ACTIVATION_MS: u64 = 2000;
// Segments shorter than this are neither split off nor sent for transcription.
const MIN_SEGMENT_DURATION_MS: u64 = 500;
// Segments whose RMS level is below this are considered too quiet to transcribe.
const MIN_AUDIO_RMS_THRESHOLD: f32 = 0.01;
// Margin subtracted from a reported word-break offset to get the split point.
const WORD_BREAK_PRE_MARGIN_MS: u64 = 30;
/// Fixed-capacity circular buffer of `f32` audio samples.
///
/// Samples are appended with [`SegmentRingBuffer::write`]; once the write
/// position reaches the end it wraps to index 0 and overwrites the oldest
/// data. Segments are addressed by raw buffer indices, so callers are
/// responsible for extracting a segment before it is overwritten.
pub struct SegmentRingBuffer {
    /// Backing storage; allocated once with exactly `capacity` slots.
    buffer: Vec<f32>,
    /// Index at which the next sample will be stored.
    write_pos: usize,
    /// Number of sample slots in `buffer`.
    capacity: usize,
    /// Number of samples written since construction or the last `clear`.
    total_written: u64,
}

impl SegmentRingBuffer {
    /// Creates a zero-filled buffer holding `capacity` samples.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: vec![0.0; capacity],
            write_pos: 0,
            capacity,
            total_written: 0,
        }
    }

    /// Creates a buffer with `RING_BUFFER_CAPACITY` slots.
    pub fn with_default_capacity() -> Self {
        Self::new(RING_BUFFER_CAPACITY)
    }

    /// Appends `samples`, wrapping around and overwriting the oldest data
    /// when the end of the storage is reached.
    ///
    /// A zero-capacity buffer cannot store anything, so the call is a no-op
    /// in that case instead of panicking on an out-of-range index.
    pub fn write(&mut self, samples: &[f32]) {
        if self.capacity == 0 {
            return;
        }
        for &sample in samples {
            self.buffer[self.write_pos] = sample;
            self.write_pos = (self.write_pos + 1) % self.capacity;
            self.total_written += 1;
        }
    }

    /// Index at which the next sample will be written.
    pub fn write_position(&self) -> usize {
        self.write_pos
    }

    /// Total number of sample slots.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of samples between `start_idx` and the write position,
    /// accounting for wrap-around.
    pub fn segment_length(&self, start_idx: usize) -> usize {
        if self.write_pos >= start_idx {
            self.write_pos - start_idx
        } else {
            (self.capacity - start_idx) + self.write_pos
        }
    }

    /// Index located `lookback_samples` before the write position.
    ///
    /// A lookback of `capacity` or more cannot be represented and yields the
    /// write position itself (i.e. an empty lookback).
    pub fn index_from_lookback(&self, lookback_samples: usize) -> usize {
        if lookback_samples >= self.capacity {
            self.write_pos
        } else if lookback_samples <= self.write_pos {
            self.write_pos - lookback_samples
        } else {
            // The lookback reaches past index 0: wrap to the tail of the buffer.
            self.capacity - (lookback_samples - self.write_pos)
        }
    }

    /// Returns `true` once the segment starting at `start_idx` occupies at
    /// least `OVERFLOW_THRESHOLD_PERCENT` percent of the capacity.
    pub fn is_approaching_overflow(&self, start_idx: usize) -> bool {
        let segment_len = self.segment_length(start_idx);
        let threshold = (self.capacity * OVERFLOW_THRESHOLD_PERCENT) / 100;
        segment_len >= threshold
    }

    /// Copies out the samples from `start_idx` up to the write position.
    pub fn extract_segment(&self, start_idx: usize) -> Vec<f32> {
        self.extract_segment_to(start_idx, self.write_pos)
    }

    /// Copies out the samples in `[start_idx, end_idx)`, wrapping around the
    /// end of the storage when `end_idx < start_idx`. Equal indices produce
    /// an empty vector.
    pub fn extract_segment_to(&self, start_idx: usize, end_idx: usize) -> Vec<f32> {
        let segment_len = if end_idx >= start_idx {
            end_idx - start_idx
        } else {
            (self.capacity - start_idx) + end_idx
        };
        if segment_len == 0 {
            return Vec::new();
        }
        let mut result = Vec::with_capacity(segment_len);
        if end_idx >= start_idx {
            result.extend_from_slice(&self.buffer[start_idx..end_idx]);
        } else {
            result.extend_from_slice(&self.buffer[start_idx..]);
            result.extend_from_slice(&self.buffer[..end_idx]);
        }
        result
    }

    /// Resets the buffer to its freshly-constructed state.
    ///
    /// The storage is zeroed as well as rewound: a lookback taken shortly
    /// after a reset wraps into the tail of the buffer, and without zeroing
    /// it would return audio left over from before the reset.
    pub fn clear(&mut self) {
        self.buffer.fill(0.0);
        self.write_pos = 0;
        self.total_written = 0;
    }
}
/// Observer notified by `TranscribeState` about saved recordings and
/// transcription-queue activity.
pub trait TranscribeStateCallback: Send + Sync + 'static {
/// Called with the path of a recording WAV file (lossily converted to a `String`).
fn on_recording_saved(&self, path: String);
/// Called with the transcription queue depth after an enqueue attempt.
fn on_queue_update(&self, depth: usize);
}
/// Segmentation state for a capture session: buffers incoming audio, cuts it
/// into segments on speech/word-break events, and hands segments to the
/// transcription queue.
pub struct TranscribeState {
/// Circular storage that `process_samples` writes incoming audio into.
pub ring_buffer: SegmentRingBuffer,
/// Gate for all processing; toggled by `activate` / `deactivate`.
pub is_active: bool,
/// `true` while a speech segment is being accumulated.
pub in_speech: bool,
/// Ring-buffer index at which the current segment begins.
pub segment_start_idx: usize,
/// Sample rate in Hz; set by `init_for_capture` (48000 by default).
pub sample_rate: u32,
/// Channel count; set by `init_for_capture` (2 by default).
pub channels: u16,
/// Destination queue for segments that pass validation.
pub transcription_queue: Arc<TranscriptionQueue>,
// Samples written since the current segment started (lookback not included).
segment_sample_count: u64,
// Set once the segment exceeds MAX_SEGMENT_DURATION_MS and a split point is wanted.
seeking_word_break: bool,
// Value of `segment_sample_count` at the moment word-break seeking began.
word_break_seek_start_samples: u64,
// Lookback samples that precede the speech start inside the current segment.
lookback_sample_count: usize,
// Subtracted from the offsets passed to `on_word_break` to make them
// relative to the current segment.
vad_offset_base_ms: u64,
// Optional observer for saved recordings and queue depth changes.
callback: Option<Arc<dyn TranscribeStateCallback>>,
/// When set, `finalize` submits the accumulated recording instead of ending
/// a speech segment.
pub manual_recording: bool,
// Raw audio accumulated through `write_manual_buffer`.
manual_audio_buffer: Vec<f32>,
// Ensures the "manual buffer full" warning is logged only once per recording.
manual_buffer_full_warned: bool,
// Processed audio accumulated through `write_processed_buffer`.
processed_audio_buffer: Vec<f32>,
// Ensures the "processed buffer full" warning is logged only once per recording.
processed_buffer_full_warned: bool,
// File stem to reuse for output files; when set, raw WAV output is skipped.
playback_source_stem: Option<String>,
}
impl TranscribeState {
/// Builds an inactive state that will feed `transcription_queue`.
///
/// Defaults: 48 kHz, 2 channels, a default-capacity ring buffer, no callback,
/// manual recording off, and empty recording buffers.
pub fn new(transcription_queue: Arc<TranscriptionQueue>) -> Self {
    let ring_buffer = SegmentRingBuffer::with_default_capacity();
    Self {
        // Audio format and storage.
        sample_rate: 48000,
        channels: 2,
        ring_buffer,
        transcription_queue,
        callback: None,
        // Segmentation bookkeeping.
        is_active: false,
        in_speech: false,
        segment_start_idx: 0,
        segment_sample_count: 0,
        lookback_sample_count: 0,
        seeking_word_break: false,
        word_break_seek_start_samples: 0,
        vad_offset_base_ms: 0,
        // Recording buffers.
        manual_recording: false,
        manual_audio_buffer: Vec::new(),
        manual_buffer_full_warned: false,
        processed_audio_buffer: Vec::new(),
        processed_buffer_full_warned: false,
        playback_source_stem: None,
    }
}
/// Sets (or clears, with `None`) the file stem reused for output files.
///
/// Logs an info message when a stem is provided.
pub fn set_playback_source_stem(&mut self, stem: Option<String>) {
    match stem.as_ref() {
        Some(s) => {
            tracing::info!(
                "[TranscribeState] Playback reprocessing mode: stem = {:?}",
                s
            );
        }
        None => {}
    }
    self.playback_source_stem = stem;
}
/// Switches manual recording on or off and logs the resulting mode.
pub fn set_manual_recording(&mut self, enabled: bool) {
    self.manual_recording = enabled;
    match enabled {
        false => {
            tracing::debug!(
                "[TranscribeState] Manual recording disabled - automatic VAD segmentation active"
            );
        }
        true => {
            tracing::debug!(
                "[TranscribeState] Manual recording enabled - VAD segmentation disabled"
            );
        }
    }
}
/// Writes the accumulated recording buffers to WAV files without queueing
/// anything for transcription.
///
/// Both buffers are drained (and their "full" warnings re-armed) even when
/// nothing ends up being written. Nothing is saved if the raw buffer is empty
/// or the recordings directory cannot be created. Output files are named
/// `<stem>.wav` and `<stem>-processed.wav`; when a playback source stem is
/// set, that stem is reused and the raw file is not rewritten.
///
/// The callback's `on_recording_saved` is invoked with the raw path only when
/// that file is known to be in place: either it was written successfully or
/// the write was deliberately skipped in playback-reprocess mode.
pub fn save_recording_wav(&mut self) {
    let raw_samples: Vec<f32> = std::mem::take(&mut self.manual_audio_buffer);
    self.manual_buffer_full_warned = false;
    let processed_samples: Vec<f32> = std::mem::take(&mut self.processed_audio_buffer);
    self.processed_buffer_full_warned = false;

    if raw_samples.is_empty() {
        tracing::debug!("[TranscribeState] save_recording_wav: no audio accumulated");
        return;
    }
    tracing::info!(
        "[TranscribeState] save_recording_wav: saving {} raw samples ({:.1}s)",
        raw_samples.len(),
        raw_samples.len() as f64 / (self.sample_rate as f64 * self.channels as f64),
    );

    let is_playback_reprocess = self.playback_source_stem.is_some();
    let recordings_dir = crate::audio::recordings_dir();
    if let Err(e) = std::fs::create_dir_all(&recordings_dir) {
        tracing::error!(
            "[TranscribeState] Failed to create recordings directory: {}",
            e
        );
        return;
    }

    let stem = self
        .playback_source_stem
        .clone()
        .unwrap_or_else(crate::audio::generate_recording_stem);
    let raw_path = recordings_dir.join(format!("{}.wav", stem));

    // NOTE(review): the raw buffer is written with a channel count of 1 while
    // the duration log above divides by `self.channels` — confirm the intended
    // layout of the raw buffer.
    let raw_available = if is_playback_reprocess {
        tracing::info!(
            "[TranscribeState] Playback reprocessing — skipping raw WAV write for {:?}",
            raw_path
        );
        true
    } else {
        match save_to_wav(&raw_samples, self.sample_rate, 1, &raw_path) {
            Ok(()) => {
                tracing::info!(
                    "[TranscribeState] Saved raw recording WAV to: {:?}",
                    raw_path
                );
                true
            }
            Err(e) => {
                tracing::error!("[TranscribeState] Failed to save raw recording WAV: {}", e);
                false
            }
        }
    };

    if !processed_samples.is_empty() {
        let proc_filename = format!("{}-processed.wav", stem);
        let proc_path = recordings_dir.join(&proc_filename);
        match save_to_wav(&processed_samples, self.sample_rate, 1, &proc_path) {
            Ok(()) => {
                tracing::info!(
                    "[TranscribeState] Saved processed recording WAV to: {:?}",
                    proc_path
                );
            }
            Err(e) => {
                tracing::error!(
                    "[TranscribeState] Failed to save processed recording WAV: {}",
                    e
                );
            }
        }
    }

    // Do not announce a recording whose file failed to be written.
    if raw_available {
        if let Some(ref cb) = self.callback {
            cb.on_recording_saved(raw_path.to_string_lossy().to_string());
        }
    }
}
/// Drains the recording buffers, saves them as WAV files, and queues the
/// recording for transcription.
///
/// Does nothing while inactive. The processed buffer is preferred for
/// transcription (submitted with a channel count of 1); if it is empty the
/// raw buffer is submitted with `self.channels` instead, and if both are
/// empty the call returns without side effects beyond draining the buffers.
/// File output is best-effort: failures are logged and the audio is still
/// queued. Segmentation state is reset before the audio is enqueued.
///
/// The buffers can be very large, so they are only borrowed while the files
/// are written and then moved — not cloned — into the queue.
pub fn submit_recording(&mut self) {
    if !self.is_active {
        return;
    }
    let raw_segment: Vec<f32> = std::mem::take(&mut self.manual_audio_buffer);
    self.manual_buffer_full_warned = false;
    let processed_segment: Vec<f32> = std::mem::take(&mut self.processed_audio_buffer);
    self.processed_buffer_full_warned = false;

    // Decide which buffer will be transcribed without copying it yet.
    let use_processed = !processed_segment.is_empty();
    let (transcription_len, transcription_channels) = if use_processed {
        (processed_segment.len(), 1u16)
    } else if !raw_segment.is_empty() {
        tracing::warn!(
            "[TranscribeState] submit_recording: processed buffer is empty, \
             falling back to raw audio for transcription"
        );
        (raw_segment.len(), self.channels)
    } else {
        tracing::debug!("[TranscribeState] submit_recording: no audio accumulated");
        return;
    };
    tracing::info!(
        "[TranscribeState] submit_recording: submitting {} processed samples ({:.1}s)",
        transcription_len,
        transcription_len as f64 / self.sample_rate as f64,
    );

    let mut proc_wav_path: Option<std::path::PathBuf> = None;
    let is_playback_reprocess = self.playback_source_stem.is_some();
    let recordings_dir = crate::audio::recordings_dir();
    if let Ok(()) = std::fs::create_dir_all(&recordings_dir) {
        let stem = self
            .playback_source_stem
            .clone()
            .unwrap_or_else(crate::audio::generate_recording_stem);
        // In playback-reprocess mode the raw file is not rewritten.
        if !is_playback_reprocess && !raw_segment.is_empty() {
            let raw_path = recordings_dir.join(format!("{}.wav", stem));
            match save_to_wav(&raw_segment, self.sample_rate, 1, &raw_path) {
                Ok(()) => {
                    tracing::info!("[TranscribeState] Saved raw WAV: {:?}", raw_path);
                }
                Err(e) => {
                    tracing::error!("[TranscribeState] Failed to save raw WAV: {}", e);
                }
            }
        }
        if !processed_segment.is_empty() {
            let proc_path = recordings_dir.join(format!("{}-processed.wav", stem));
            match save_to_wav(&processed_segment, self.sample_rate, 1, &proc_path) {
                Ok(()) => {
                    tracing::info!("[TranscribeState] Saved processed WAV: {:?}", proc_path);
                    proc_wav_path = Some(proc_path.clone());
                    if let Some(ref cb) = self.callback {
                        cb.on_recording_saved(proc_path.to_string_lossy().to_string());
                    }
                }
                Err(e) => {
                    tracing::error!("[TranscribeState] Failed to save processed WAV: {}", e);
                }
            }
        }
    } else {
        tracing::error!("[TranscribeState] Failed to create recordings directory");
    }

    // Start the next segment from a clean slate at the current write position.
    self.in_speech = false;
    self.segment_sample_count = 0;
    self.seeking_word_break = false;
    self.lookback_sample_count = 0;
    self.segment_start_idx = self.ring_buffer.write_position();

    // The files are written; hand the chosen buffer over by value.
    let transcription_segment = if use_processed {
        processed_segment
    } else {
        raw_segment
    };
    self.enqueue_for_transcription(
        transcription_segment,
        transcription_channels,
        proc_wav_path,
    );
}
/// Installs `callback` as the observer, replacing any previous one.
pub fn set_callback(&mut self, callback: Arc<dyn TranscribeStateCallback>) {
    let observer = Some(callback);
    self.callback = observer;
}
/// Removes the observer, if one is installed.
pub fn clear_callback(&mut self) {
    drop(self.callback.take());
}
/// Prepares the state for a new capture with the given audio format.
///
/// Stores `sample_rate` and `channels`, clears the ring buffer and both
/// recording buffers, and resets all segmentation bookkeeping. `is_active`,
/// the callback, and the manual-recording flag are left as they are.
pub fn init_for_capture(&mut self, sample_rate: u32, channels: u16) {
    // Audio format for the upcoming capture.
    self.sample_rate = sample_rate;
    self.channels = channels;

    // Drop any audio left over from a previous capture.
    self.ring_buffer.clear();
    self.manual_audio_buffer.clear();
    self.processed_audio_buffer.clear();
    self.manual_buffer_full_warned = false;
    self.processed_buffer_full_warned = false;

    // Segmentation starts from scratch.
    self.in_speech = false;
    self.seeking_word_break = false;
    self.segment_start_idx = 0;
    self.segment_sample_count = 0;
    self.lookback_sample_count = 0;
    self.word_break_seek_start_samples = 0;
    self.vad_offset_base_ms = 0;
}
/// Enables processing and resets the segmentation bookkeeping.
///
/// The ring buffer and the recording buffers are not touched.
pub fn activate(&mut self) {
    self.is_active = true;

    self.in_speech = false;
    self.seeking_word_break = false;

    self.segment_start_idx = 0;
    self.segment_sample_count = 0;
    self.lookback_sample_count = 0;
    self.word_break_seek_start_samples = 0;
    self.vad_offset_base_ms = 0;
}
/// Disables processing and abandons any segment in progress.
pub fn deactivate(&mut self) {
    self.seeking_word_break = false;
    self.in_speech = false;
    self.is_active = false;
}
/// Appends `samples` to the manual (raw) recording buffer, up to
/// `MANUAL_MAX_BUFFER_SAMPLES`.
///
/// Samples that do not fit are discarded; a warning is logged the first time
/// that happens (or the first time the buffer is found already full).
pub fn write_manual_buffer(&mut self, samples: &[f32]) {
    let room = MANUAL_MAX_BUFFER_SAMPLES.saturating_sub(self.manual_audio_buffer.len());
    let accepted = samples.len().min(room);
    self.manual_audio_buffer
        .extend_from_slice(&samples[..accepted]);

    // Either the buffer was already full, or this write had to be truncated.
    let limit_hit = room == 0 || accepted < samples.len();
    if limit_hit && !self.manual_buffer_full_warned {
        tracing::warn!(
            "[TranscribeState] Manual recording has reached the 30-minute maximum \
             buffer limit. Further audio will be discarded."
        );
        self.manual_buffer_full_warned = true;
    }
}
/// Appends `mono_samples` to the processed recording buffer, up to
/// `PROCESSED_MAX_BUFFER_SAMPLES`.
///
/// Samples that do not fit are discarded; a warning is logged the first time
/// that happens (or the first time the buffer is found already full).
pub fn write_processed_buffer(&mut self, mono_samples: &[f32]) {
    let room = PROCESSED_MAX_BUFFER_SAMPLES.saturating_sub(self.processed_audio_buffer.len());
    let accepted = mono_samples.len().min(room);
    self.processed_audio_buffer
        .extend_from_slice(&mono_samples[..accepted]);

    // Either the buffer was already full, or this write had to be truncated.
    let limit_hit = room == 0 || accepted < mono_samples.len();
    if limit_hit && !self.processed_buffer_full_warned {
        tracing::warn!(
            "[TranscribeState] Processed recording buffer has reached the 30-minute \
             maximum. Further processed audio will be discarded."
        );
        self.processed_buffer_full_warned = true;
    }
}
/// Feeds a chunk of audio into the ring buffer and performs time-based
/// segment splitting.
///
/// Returns `None` while inactive. Otherwise, in order:
/// 1. If a segment is in progress and close to overflowing the ring buffer,
///    the audio accumulated so far is cut off as a partial segment.
/// 2. `samples` is written to the ring buffer.
/// 3. While in speech, the segment duration is tracked; once it exceeds
///    `MAX_SEGMENT_DURATION_MS` a word break is awaited, and if none arrives
///    within `WORD_BREAK_GRACE_MS` the segment is force-extracted and returned.
/// 4. A partial segment from step 1 is queued for transcription and returned.
pub fn process_samples(&mut self, samples: &[f32]) -> Option<Vec<f32>> {
    if !self.is_active {
        return None;
    }
    let overflow_segment = if self.in_speech
        && self
            .ring_buffer
            .is_approaching_overflow(self.segment_start_idx)
    {
        let segment = self.ring_buffer.extract_segment(self.segment_start_idx);
        // The next segment starts at the write position, so later word-break
        // offsets must be measured from there: move the base forward by the
        // speech duration that was just cut off.
        let elapsed_ms = self.samples_to_ms(self.segment_sample_count);
        self.vad_offset_base_ms += elapsed_ms;
        self.segment_start_idx = self.ring_buffer.write_position();
        self.segment_sample_count = 0;
        self.seeking_word_break = false;
        self.lookback_sample_count = 0;
        tracing::debug!(
            "[TranscribeState] Buffer overflow - extracted partial segment ({} samples)",
            segment.len()
        );
        Some(segment)
    } else {
        None
    };

    self.ring_buffer.write(samples);

    if self.in_speech {
        self.segment_sample_count += samples.len() as u64;
        let duration_ms = self.samples_to_ms(self.segment_sample_count);
        if !self.seeking_word_break && duration_ms >= MAX_SEGMENT_DURATION_MS {
            self.seeking_word_break = true;
            self.word_break_seek_start_samples = self.segment_sample_count;
            tracing::debug!(
                "[TranscribeState] Duration threshold reached ({}ms), seeking word break",
                duration_ms
            );
        }
        if self.seeking_word_break {
            let samples_since_seek =
                self.segment_sample_count - self.word_break_seek_start_samples;
            let grace_ms = self.samples_to_ms(samples_since_seek);
            if grace_ms >= WORD_BREAK_GRACE_MS {
                // No word break showed up in time: cut the segment here.
                let forced = self.force_segment_extraction();
                if forced.is_some() {
                    return forced;
                }
            }
        }
    }

    // The queue takes ownership, so the returned copy needs its own allocation.
    if let Some(segment) = overflow_segment.clone() {
        self.queue_segment_with_channels(segment, 1);
    }
    overflow_segment
}
/// Marks the start of a speech segment, reaching back `lookback_samples`
/// (per-channel) into already-buffered audio.
///
/// Ignored while inactive. The lookback is scaled by `self.channels` before
/// being applied to the ring buffer. Segment bookkeeping and the word-break
/// offset base are reset.
pub fn on_speech_started(&mut self, lookback_samples: usize) {
    if !self.is_active {
        return;
    }
    let lookback_stereo_samples = lookback_samples * self.channels as usize;
    self.in_speech = true;
    self.segment_start_idx = self
        .ring_buffer
        .index_from_lookback(lookback_stereo_samples);
    self.segment_sample_count = 0;
    self.seeking_word_break = false;
    // Record the lookback the ring buffer actually granted: a request of a
    // full buffer or more yields none, and later split arithmetic must agree
    // with where the segment really starts.
    self.lookback_sample_count = self.ring_buffer.segment_length(self.segment_start_idx);
    self.vad_offset_base_ms = 0;
    tracing::debug!(
        "[TranscribeState] Speech started, segment_start_idx={}, lookback={} mono -> {} stereo",
        self.segment_start_idx,
        lookback_samples,
        lookback_stereo_samples
    );
}
/// Closes the current speech segment.
///
/// Returns `None` when inactive, when no segment is in progress, or when the
/// segment turned out empty. Otherwise the audio from the segment start up to
/// the write position is queued for transcription and returned.
pub fn on_speech_ended(&mut self) -> Option<Vec<f32>> {
    let speaking = self.is_active && self.in_speech;
    if !speaking {
        return None;
    }

    let captured = self.ring_buffer.extract_segment(self.segment_start_idx);

    // The segment is over regardless of what was captured.
    self.lookback_sample_count = 0;
    self.segment_sample_count = 0;
    self.seeking_word_break = false;
    self.in_speech = false;

    if captured.is_empty() {
        tracing::debug!("[TranscribeState] Speech ended but segment is empty");
        return None;
    }
    tracing::debug!(
        "[TranscribeState] Speech ended, extracted {} samples",
        captured.len()
    );
    self.queue_segment_with_channels(captured.clone(), 1);
    Some(captured)
}
/// Converts a sample count to milliseconds using `self.channels` and
/// `self.sample_rate` (integer arithmetic, rounding down).
fn samples_to_ms(&self, samples: u64) -> u64 {
    let per_channel = samples / self.channels as u64;
    (per_channel * 1000) / self.sample_rate as u64
}
/// Converts milliseconds to a sample count using `self.sample_rate` and
/// `self.channels` (integer arithmetic, rounding down).
fn ms_to_samples(&self, ms: u64) -> u64 {
    let per_channel = (ms * self.sample_rate as u64) / 1000;
    per_channel * self.channels as u64
}
/// Splits the current segment at a reported word break.
///
/// `offset_ms` is made relative to the current segment by subtracting
/// `vad_offset_base_ms`; `gap_duration_ms` is only used for logging. The split
/// point is placed `WORD_BREAK_PRE_MARGIN_MS` before the relative offset and
/// clamped to the audio accumulated so far.
///
/// Returns `None` (leaving the segment untouched) when inactive, not in
/// speech, when the segment is shorter than `WORD_BREAK_ACTIVATION_MS`, or
/// when the piece to split off would be shorter than `MIN_SEGMENT_DURATION_MS`.
/// Otherwise the piece is queued for transcription and returned, and the
/// segment continues from the split point.
pub fn on_word_break(&mut self, offset_ms: u32, gap_duration_ms: u32) -> Option<Vec<f32>> {
    if !self.is_active || !self.in_speech {
        return None;
    }
    let relative_offset_ms = (offset_ms as u64).saturating_sub(self.vad_offset_base_ms);
    let current_duration_ms = self.samples_to_ms(self.segment_sample_count);
    if current_duration_ms < WORD_BREAK_ACTIVATION_MS {
        tracing::debug!(
            "[TranscribeState] Word break at vad_offset {}ms (relative {}ms) ignored (segment only {}ms < {}ms activation threshold)",
            offset_ms,
            relative_offset_ms,
            current_duration_ms,
            WORD_BREAK_ACTIVATION_MS
        );
        return None;
    }

    let gap_start_ms = relative_offset_ms;
    let extraction_point_ms = gap_start_ms.saturating_sub(WORD_BREAK_PRE_MARGIN_MS);
    let extraction_point_samples = self.ms_to_samples(extraction_point_ms);
    let extraction_length = self.lookback_sample_count as u64 + extraction_point_samples;
    let total_segment_samples = self.lookback_sample_count as u64 + self.segment_sample_count;
    // Never reach past the audio that has actually been accumulated.
    let extraction_length = extraction_length.min(total_segment_samples);

    // This also rejects a zero-length extraction, so no separate emptiness
    // check on the length is needed.
    let extraction_duration_ms = self.samples_to_ms(extraction_length);
    if extraction_duration_ms < MIN_SEGMENT_DURATION_MS {
        tracing::debug!(
            "[TranscribeState] Word break would create segment too short ({}ms < {}ms), skipping",
            extraction_duration_ms,
            MIN_SEGMENT_DURATION_MS
        );
        return None;
    }

    let extraction_end_idx =
        (self.segment_start_idx + extraction_length as usize) % self.ring_buffer.capacity();
    let segment = self.extract_segment_to(extraction_end_idx);
    if segment.is_empty() {
        tracing::debug!("[TranscribeState] Word break extraction produced empty segment");
        return None;
    }
    tracing::debug!(
        "[TranscribeState] Word break split (vad_offset: {}ms, relative: {}ms, gap: {}ms), extracted {} samples ({} lookback + {} speech) at extraction point {}ms",
        offset_ms,
        relative_offset_ms,
        gap_duration_ms,
        segment.len(),
        self.lookback_sample_count,
        extraction_point_samples,
        extraction_point_ms
    );
    self.queue_segment_with_channels(segment.clone(), 1);

    // Continue the segment from the split point; the remainder has no lookback.
    self.segment_start_idx = extraction_end_idx;
    self.lookback_sample_count = 0;
    self.segment_sample_count = self
        .segment_sample_count
        .saturating_sub(extraction_point_samples);
    self.vad_offset_base_ms += extraction_point_ms;
    self.seeking_word_break = false;
    self.word_break_seek_start_samples = 0;
    Some(segment)
}
/// Copies the audio from the current segment start up to `end_idx`
/// (exclusive) out of the ring buffer.
fn extract_segment_to(&self, end_idx: usize) -> Vec<f32> {
    let start_idx = self.segment_start_idx;
    self.ring_buffer.extract_segment_to(start_idx, end_idx)
}
/// Cuts the current segment at the write position because no word break
/// arrived within the grace period.
///
/// Returns `None` when inactive, not in speech, or when the segment is empty
/// (in which case word-break seeking is simply cancelled). Otherwise the
/// segment is queued for transcription and returned, and a new segment starts
/// at the write position while speech continues.
fn force_segment_extraction(&mut self) -> Option<Vec<f32>> {
    if !self.is_active || !self.in_speech {
        return None;
    }
    let segment = self.ring_buffer.extract_segment(self.segment_start_idx);
    if segment.is_empty() {
        tracing::debug!("[TranscribeState] Grace period expired but segment is empty");
        self.seeking_word_break = false;
        return None;
    }
    tracing::debug!(
        "[TranscribeState] Grace period expired, force extracted {} samples ({} lookback + {} speech)",
        segment.len(),
        self.lookback_sample_count,
        self.segment_sample_count
    );
    self.queue_segment_with_channels(segment.clone(), 1);

    // The next segment starts at the write position, so later word-break
    // offsets must be measured from there: move the base forward by the
    // speech duration that was just cut off.
    let elapsed_ms = self.samples_to_ms(self.segment_sample_count);
    self.vad_offset_base_ms += elapsed_ms;

    self.segment_start_idx = self.ring_buffer.write_position();
    self.segment_sample_count = 0;
    self.lookback_sample_count = 0;
    self.seeking_word_break = false;
    Some(segment)
}
/// Validates `samples` assuming they use `self.channels` channels.
// Currently unused; callers pass an explicit channel count instead.
#[allow(dead_code)]
fn is_segment_valid_for_transcription(&self, samples: &[f32]) -> bool {
    let channels = self.channels;
    self.is_segment_valid_for_transcription_ch(samples, channels)
}
/// Decides whether `samples` (with the given channel count) are worth
/// transcribing.
///
/// A segment is rejected when it is empty, when the audio format is
/// degenerate (zero channels or zero sample rate), when it is shorter than
/// `MIN_SEGMENT_DURATION_MS`, or when its RMS level is below
/// `MIN_AUDIO_RMS_THRESHOLD`. Rejections other than "empty" are logged at
/// debug level.
fn is_segment_valid_for_transcription_ch(&self, samples: &[f32], channels: u16) -> bool {
    if samples.is_empty() {
        return false;
    }
    // A degenerate format would otherwise divide by zero below.
    if channels == 0 || self.sample_rate == 0 {
        tracing::debug!(
            "[TranscribeState] Segment has invalid audio format ({} channels @ {} Hz), skipping",
            channels,
            self.sample_rate
        );
        return false;
    }

    let frames = samples.len() as u64 / channels as u64;
    let duration_ms = frames * 1000 / self.sample_rate as u64;
    if duration_ms < MIN_SEGMENT_DURATION_MS {
        tracing::debug!(
            "[TranscribeState] Segment too short ({}ms < {}ms), skipping",
            duration_ms,
            MIN_SEGMENT_DURATION_MS
        );
        return false;
    }

    // Accumulate in f64: an f32 running sum stops growing once it dwarfs the
    // individual squares, which makes long recordings look quieter than they
    // are and can get them rejected.
    let sum_squares: f64 = samples
        .iter()
        .map(|&s| {
            let s = f64::from(s);
            s * s
        })
        .sum();
    let rms = (sum_squares / samples.len() as f64).sqrt() as f32;
    if rms < MIN_AUDIO_RMS_THRESHOLD {
        tracing::debug!(
            "[TranscribeState] Segment too quiet (RMS {:.6} < {:.6}), skipping",
            rms,
            MIN_AUDIO_RMS_THRESHOLD
        );
        return false;
    }
    true
}
/// Saves and queues `samples` assuming they use `self.channels` channels.
// Currently unused; callers pass an explicit channel count instead.
#[allow(dead_code)]
fn queue_segment(&self, samples: Vec<f32>) {
    let channels = self.channels;
    self.queue_segment_with_channels(samples, channels);
}
/// Writes a segment to a WAV file in the recordings directory and queues it
/// for transcription.
///
/// Segments that fail validation are dropped silently here (the validator
/// logs the reason). File output is best-effort: if the directory cannot be
/// created or the WAV cannot be written, the error is logged and the segment
/// is still queued, just without a file path.
fn queue_segment_with_channels(&self, samples: Vec<f32>, channels: u16) {
    // The validator also rejects empty input, so one check covers both cases.
    let acceptable =
        !samples.is_empty() && self.is_segment_valid_for_transcription_ch(&samples, channels);
    if !acceptable {
        return;
    }

    let filename = generate_recording_filename();
    let recordings_dir = crate::audio::recordings_dir();
    if let Err(e) = std::fs::create_dir_all(&recordings_dir) {
        tracing::error!(
            "[TranscribeState] Failed to create recordings directory: {}",
            e
        );
    }

    let output_path = recordings_dir.join(&filename);
    let write_result = save_to_wav(&samples, self.sample_rate, channels, &output_path);
    let wav_path = if let Err(e) = write_result {
        tracing::error!("[TranscribeState] Failed to save WAV: {}", e);
        None
    } else {
        tracing::info!("[TranscribeState] Saved segment to: {:?}", output_path);
        Some(output_path)
    };

    self.enqueue_for_transcription(samples, channels, wav_path);
}
fn enqueue_for_transcription(
&self,
samples: Vec<f32>,
channels: u16,
wav_path: Option<std::path::PathBuf>,
) {
if samples.is_empty() {
return;
}
if !self.is_segment_valid_for_transcription_ch(&samples, channels) {
return;
}
let queued = QueuedSegment {
samples,
sample_rate: self.sample_rate,
channels,
wav_path,
};
if !self.transcription_queue.enqueue(queued) {
tracing::warn!("[TranscribeState] Transcription queue is full, segment dropped");
}
let depth = self.transcription_queue.queue_depth();
if let Some(ref cb) = self.callback {
cb.on_queue_update(depth);
}
}
/// Flushes whatever is pending at the end of a capture.
///
/// In manual-recording mode the accumulated recording is submitted and `None`
/// is returned. Otherwise, if a speech segment is in progress it is closed and
/// its audio returned; with nothing in progress the result is `None`.
pub fn finalize(&mut self) -> Option<Vec<f32>> {
    if self.manual_recording {
        self.submit_recording();
        return None;
    }
    if !self.in_speech {
        return None;
    }
    self.on_speech_ended()
}
}