use anyhow::{Context, Result};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex as StdMutex,
};
use thiserror::Error;
use tokio::sync::{broadcast, Mutex as TokioMutex};
use tracing::{error, info};
use super::buffer::AudioBuffer;
use super::constants::{CHUNK_DURATION_MS, SAMPLE_RATE};
use super::device::AudioDevice;
use super::stream::AudioStream;
use crate::audio_state::RecordingState;
use crate::providers::TranscriptionProvider;
use crate::shutdown_handler::ShutdownManager;
#[derive(Debug, thiserror::Error)]
pub enum AudioStreamError {
#[error("Failed to initialize stream: {0}")]
InitializationError(String),
#[error("Stream disconnected: {0}")]
DisconnectionError(String),
#[error("Buffer error: {0}")]
BufferError(String),
}
/// Owns an optional [`AudioStream`] together with the device, buffer,
/// recording state and transcription provider it was constructed with.
pub struct AudioStreamManager {
/// The active stream: `None` after `new`, `Some` once `start_stream`
/// succeeds, and `None` again after `stop_stream` takes it.
stream: Option<AudioStream>,
/// Shared audio buffer, built in `new` from a clone of `recording_state`,
/// `SAMPLE_RATE` and a chunk duration of `CHUNK_DURATION_MS` milliseconds.
buffer: Arc<StdMutex<AudioBuffer>>,
/// Recording state created in `new` via `RecordingState::new(false)`.
recording_state: RecordingState,
/// Device that `start_stream` creates the stream from.
device: Arc<AudioDevice>,
/// Transcription provider supplied by the caller of `new`.
provider: Arc<TokioMutex<Box<dyn TranscriptionProvider + Send + Sync>>>,
/// Receiver obtained from the stream's `subscribe()` in `start_stream`;
/// `get_receiver` hands out resubscribed copies of it.
chunk_receiver: Option<broadcast::Receiver<Vec<f32>>>,
}
impl AudioStreamManager {
pub fn new(
device: Arc<AudioDevice>,
provider: Arc<TokioMutex<Box<dyn TranscriptionProvider + Send + Sync>>>,
) -> Result<Self> {
let recording_state = RecordingState::new(false);
let buffer = Arc::new(StdMutex::new(AudioBuffer::new(
recording_state.clone(),
SAMPLE_RATE,
std::time::Duration::from_millis(CHUNK_DURATION_MS),
)));
Ok(Self {
stream: None,
buffer,
recording_state,
device,
provider,
chunk_receiver: None,
})
}
pub async fn start_stream(&mut self) -> Result<()> {
if self.stream.is_some() {
info!("Stream already running");
return Ok(());
}
let recording = Arc::new(AtomicBool::new(true));
let stream = AudioStream::from_device(self.device.clone(), recording)
.await
.context("Failed to create audio stream")?;
self.chunk_receiver = Some(stream.subscribe().await);
self.stream = Some(stream);
info!("Audio stream started successfully");
Ok(())
}
pub async fn stop_stream(&mut self) -> Result<()> {
if let Some(stream) = self.stream.take() {
stream
.stop()
.await
.with_context(|| "Failed to stop audio stream")?;
info!("Audio stream stopped successfully");
}
self.chunk_receiver = None;
Ok(())
}
pub fn get_receiver(&self) -> Option<broadcast::Receiver<Vec<f32>>> {
self.chunk_receiver.as_ref().map(|r| r.resubscribe())
}
pub fn register_shutdown_handler(
manager: Arc<TokioMutex<Self>>,
shutdown_manager: &ShutdownManager,
) {
shutdown_manager.register(
"Stop audio stream manager",
crate::shutdown_handler::ExitPriority::Normal,
move || async move {
let mut mgr = manager.lock().await;
let _ = mgr.stop_stream().await;
},
);
}
}