use anyhow::Result;
use std::sync::{Arc, Mutex};
use tokio::signal;
use tokio::sync::{broadcast, Mutex as TokioMutex, watch};
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use crate::audio::device::{find_device_by_name, select_audio_device, AudioDevice};
use crate::audio::stream_manager::AudioStreamManager;
use crate::audio_state::RecordingState;
use crate::config::{AppConfig, TranscriptionMode, TranscriptionProviderAPI};
use crate::hotkey_service::{AppContext, HotkeyService};
use crate::providers::{self, TranscriptionProvider};
use crate::shutdown_handler::{ExitPriority, ShutdownManager};
use tower::{Service, util::ServiceExt};
use super::audio::constants::SAMPLE_RATE;
use tokio_util::sync::CancellationToken;
/// Top-level application state: owns the configuration, the audio stream
/// manager, the transcription provider and the handles of background tasks.
///
/// Built by `App::new` and driven by `App::run`.
pub struct App {
/// Application configuration passed to `new`; read for the audio device
/// name, the transcription mode and the hotkey.
config: AppConfig,
/// Shared audio stream manager. `None` after `new`; set by `run` once the
/// audio device has been selected.
stream_manager: Option<Arc<TokioMutex<AudioStreamManager>>>,
/// Transcription provider chosen in `new` from
/// `config.transcription_provider`; shared with the stream manager and the
/// hotkey service context.
provider: Arc<TokioMutex<Box<dyn TranscriptionProvider + Send + Sync>>>,
/// Shutdown coordinator created in `new`; shutdown handlers are registered
/// on it and `wait_for_shutdown` waits on it.
shutdown_manager: Arc<ShutdownManager>,
/// Handles of background tasks spawned by `start_tasks` (the hotkey task);
/// drained and awaited during shutdown.
task_handles: Vec<JoinHandle<()>>,
/// Shared recording state, constructed in `new` with `true` when the mode is
/// `TranscriptionMode::AlwaysOn`.
recording_state: Arc<RecordingState>,
/// Initialised to `None` in `new`; not touched by the methods visible here.
transcription_cancel: Option<watch::Sender<bool>>,
/// Initialised to `None` in `new`; not touched by the methods visible here.
transcription_handle: Option<JoinHandle<()>>,
/// Cancellation token for a pipeline task. Initialised to `None`; if set,
/// it is cancelled during shutdown.
pipeline_cancel_token: Option<CancellationToken>,
/// Join handle for a pipeline task. Initialised to `None`; if set, it is
/// awaited (with a timeout) during shutdown.
pipeline_handle: Option<JoinHandle<()>>,
}
impl App {
/// Builds an `App` from `config` without opening any audio stream or
/// spawning any task.
///
/// Creates the shutdown manager, the shared recording state (constructed with
/// `true` only when the mode is `TranscriptionMode::AlwaysOn`) and the
/// transcription provider selected by `config.transcription_provider`.
/// The stream manager and all task/cancellation slots start out empty.
///
/// # Errors
///
/// The visible body has no failing path; the `Result` return type is kept
/// for callers.
pub async fn new(config: AppConfig) -> Result<Self> {
    let shutdown_manager = Arc::new(ShutdownManager::new("tts-groq"));

    let always_on = config.mode == TranscriptionMode::AlwaysOn;
    let recording_state = Arc::new(RecordingState::new(always_on));

    // Box the concrete provider behind the trait object so the rest of the
    // application does not depend on which backend was configured.
    let backend: Box<dyn TranscriptionProvider + Send + Sync> =
        match config.transcription_provider {
            TranscriptionProviderAPI::Groq => Box::new(providers::GroqProvider::new().await),
            TranscriptionProviderAPI::OpenAI => {
                Box::new(providers::OpenAIProvider::new().await)
            }
        };
    let provider = Arc::new(TokioMutex::new(backend));

    let app = Self {
        config,
        stream_manager: None,
        provider,
        shutdown_manager,
        task_handles: Vec::new(),
        recording_state,
        transcription_cancel: None,
        transcription_handle: None,
        pipeline_cancel_token: None,
        pipeline_handle: None,
    };
    Ok(app)
}
/// Runs the application until a shutdown is requested.
///
/// Steps, in order:
/// 1. select the audio input device,
/// 2. create the `AudioStreamManager`, store it on `self` and register its
///    shutdown handler,
/// 3. start the audio stream,
/// 4. start background tasks (`start_tasks`),
/// 5. block in `wait_for_shutdown` until shutdown has been handled.
///
/// # Errors
///
/// Returns the first error from device selection, stream-manager creation,
/// starting the stream, starting tasks, or the shutdown sequence.
pub async fn run(&mut self) -> Result<()> {
    let device = self.initialize_audio_device().await?;

    let stream_manager = Arc::new(TokioMutex::new(AudioStreamManager::new(
        device,
        Arc::clone(&self.provider),
    )?));
    self.stream_manager = Some(Arc::clone(&stream_manager));

    // Work with the local handle directly: it was just created, so there is
    // no need to re-match on `self.stream_manager`.
    AudioStreamManager::register_shutdown_handler(
        Arc::clone(&stream_manager),
        &self.shutdown_manager,
    );
    // The lock guard is a temporary and is released at the end of this
    // statement, before any task is started.
    stream_manager.lock().await.start_stream().await?;

    self.start_tasks().await?;

    // `wait_for_shutdown` logs the "waiting" message itself, so it is not
    // logged here as well.
    self.wait_for_shutdown().await
}
/// Chooses the audio input device.
///
/// If `config.device` names a device, it is looked up on the default cpal
/// host and returned when the lookup succeeds. Any lookup failure is logged
/// as "not found" and selection falls back to `select_audio_device` on the
/// default host, which is also the path taken when no device is configured.
///
/// # Errors
///
/// Propagates the error from `select_audio_device`.
async fn initialize_audio_device(&self) -> Result<Arc<AudioDevice>> {
    if let Some(requested) = &self.config.device {
        let host = cpal::default_host();
        match find_device_by_name(&host, requested) {
            Ok(found) => return Ok(found),
            Err(_) => {
                warn!(
                    "Specified device '{}' not found, falling back to default",
                    requested
                );
            }
        }
    }

    let fallback_host = cpal::default_host();
    select_audio_device(&fallback_host)
}
async fn start_tasks(&mut self) -> Result<()> {
if let TranscriptionMode::Hotkey = self.config.mode {
let mut hotkey_service = HotkeyService::new(
self.recording_state.clone(),
self.shutdown_manager.force_shutdown_tx.subscribe(),
)?;
hotkey_service.register_hotkey(&self.config.hotkey)?;
let app_context = Arc::new(AppContext {
stream_manager: self.stream_manager.as_ref().unwrap().clone(),
provider: self.provider.clone(),
pipeline_cancel_token: Mutex::new(None),
pipeline_handle: Mutex::new(None),
config: self.config.clone(),
});
hotkey_service.set_app_context(app_context);
let hotkey_service = Arc::new(hotkey_service);
HotkeyService::register_shutdown_handler(
hotkey_service.clone(),
&self.shutdown_manager,
);
let hotkey_handle = tokio::task::spawn_local(async move {
hotkey_service.run().await;
});
self.task_handles.push(hotkey_handle);
}
Ok(())
}
/// Encodes `samples` as an in-memory WAV file.
///
/// The output is mono, 16-bit signed integer PCM at `SAMPLE_RATE`. Each
/// `f32` sample is mapped to `i16`: values `>= 1.0` become `i16::MAX`,
/// values `<= -1.0` become `i16::MIN`, and everything else is scaled by
/// `i16::MAX` and truncated by the `as` cast.
///
/// # Errors
///
/// Propagates any error reported by the `hound` writer while creating the
/// writer, writing a sample, or finalizing the file.
fn convert_samples_to_wav(samples: &[f32]) -> Result<Vec<u8>> {
    use hound::{SampleFormat, WavSpec, WavWriter};
    use std::io::Cursor;

    /// Maps one floating-point sample to a 16-bit PCM value.
    fn quantize(sample: f32) -> i16 {
        match sample {
            s if s >= 1.0 => i16::MAX,
            s if s <= -1.0 => i16::MIN,
            s => (s * i16::MAX as f32) as i16,
        }
    }

    let mut wav_bytes = Vec::new();
    let mut writer = WavWriter::new(
        Cursor::new(&mut wav_bytes),
        WavSpec {
            channels: 1,
            sample_rate: SAMPLE_RATE,
            bits_per_sample: 16,
            sample_format: SampleFormat::Int,
        },
    )?;

    // Stops at the first write error, like a `for` loop with `?`.
    samples
        .iter()
        .try_for_each(|&sample| writer.write_sample(quantize(sample)))?;

    // `finalize` consumes the writer, which releases the borrow on
    // `wav_bytes` so the buffer can be returned.
    writer.finalize()?;
    Ok(wav_bytes)
}
async fn wait_for_shutdown(&mut self) -> Result<()> {
info!("Waiting for shutdown signal...");
let shutdown_manager = Arc::clone(&self.shutdown_manager);
tokio::select! {
_ = signal::ctrl_c() => {
info!("Received Ctrl+C signal");
let _ = shutdown_manager.force_shutdown_tx.send(());
shutdown_manager.execute_shutdown().await;
}
_ = shutdown_manager.wait_for_shutdown() => {
info!("Received shutdown signal");
}
}
if let Some(manager) = &self.stream_manager {
let mut mgr = manager.lock().await;
mgr.stop_stream().await?;
}
if let Some(token) = self.pipeline_cancel_token.take() {
token.cancel();
info!("Shutdown: cancellation signal sent to pipeline");
}
if let Some(handle) = self.pipeline_handle.take() {
use tokio::time::{timeout, Duration};
match timeout(Duration::from_secs(2), handle).await {
Ok(Ok(_)) => info!("Pipeline task finished after shutdown cancellation"),
Ok(Err(e)) => error!("Pipeline task panicked during shutdown: {}", e),
Err(_) => warn!("Timeout waiting for pipeline task to finish during shutdown"),
}
}
for mut handle in self.task_handles.drain(..) {
match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
Ok(join_result) => {
if let Err(e) = join_result {
warn!("Task panicked or failed: {}", e);
}
}
Err(_) => {
warn!("Task did not complete within timeout, aborting");
handle.abort();
}
}
}
info!("Shutdown complete");
Ok(())
}
}