stt-cli 0.2.1

Speech to text Cli using Groq API and OpenAI API
// src/app.rs
//
// This module defines the main application structure and lifecycle.
// It coordinates all the components and manages the application state.

use anyhow::Result;
use std::sync::{Arc, Mutex};
use tokio::signal;
use tokio::sync::{broadcast, Mutex as TokioMutex, watch};
use tokio::task::JoinHandle;
use tracing::{error, info, warn};

use crate::audio::device::{find_device_by_name, select_audio_device, AudioDevice};
use crate::audio::stream_manager::AudioStreamManager;
use crate::audio_state::RecordingState;
use crate::config::{AppConfig, TranscriptionMode, TranscriptionProviderAPI};
use crate::hotkey_service::{AppContext, HotkeyService};
use crate::providers::{self, TranscriptionProvider};
use crate::shutdown_handler::{ExitPriority, ShutdownManager};
use tower::{Service, util::ServiceExt};
use super::audio::constants::SAMPLE_RATE;
use tokio_util::sync::CancellationToken;

/// Main application structure
pub struct App {
    /// Application configuration
    config: AppConfig,
    /// Audio stream manager
    stream_manager: Option<Arc<TokioMutex<AudioStreamManager>>>,
    /// Transcription provider
    provider: Arc<TokioMutex<Box<dyn TranscriptionProvider + Send + Sync>>>,
    /// Shutdown manager
    shutdown_manager: Arc<ShutdownManager>,
    /// Task handles
    task_handles: Vec<JoinHandle<()>>,
    /// Recording state
    recording_state: Arc<RecordingState>,
    /// Transcription cancellation channel
    transcription_cancel: Option<watch::Sender<bool>>,
    /// Handle for the transcription task
    transcription_handle: Option<JoinHandle<()>>,
    /// Cancellation token for pipeline/capture tasks (None if not running)
    pipeline_cancel_token: Option<CancellationToken>,
    /// JoinHandle for the pipeline task (None if not running)
    pipeline_handle: Option<JoinHandle<()>>,
}

impl App {
    /// Create a new application instance
    pub async fn new(config: AppConfig) -> Result<Self> {
        let shutdown_manager = Arc::new(ShutdownManager::new("tts-groq"));
        let recording_state = Arc::new(RecordingState::new(
            config.mode == TranscriptionMode::AlwaysOn,
        ));

        let provider: Box<dyn TranscriptionProvider + Send + Sync> = match config
            .transcription_provider
        {
            TranscriptionProviderAPI::Groq => Box::new(providers::GroqProvider::new().await),
            TranscriptionProviderAPI::OpenAI => Box::new(providers::OpenAIProvider::new().await),
        };

        Ok(Self {
            config,
            stream_manager: None,
            provider: Arc::new(TokioMutex::new(provider)),
            shutdown_manager,
            task_handles: Vec::new(),
            recording_state,
            transcription_cancel: None,
            transcription_handle: None,
            pipeline_cancel_token: None,
            pipeline_handle: None,
        })
    }

    /// Run the application
    pub async fn run(&mut self) -> Result<()> {
        // Initialize audio device
        let device = self.initialize_audio_device().await?;
        let provider = self.provider.clone();

        // Create and initialize stream manager
        let stream_manager = Arc::new(TokioMutex::new(AudioStreamManager::new(device, provider)?));
        self.stream_manager = Some(stream_manager.clone());

        // Register shutdown handler for audio stream
        if let Some(manager) = &self.stream_manager {
            AudioStreamManager::register_shutdown_handler(manager.clone(), &self.shutdown_manager);
        }

        // Start the stream
        if let Some(manager) = &self.stream_manager {
            let mut mgr = manager.lock().await;
            mgr.start_stream().await?;
        }

        // Start background tasks
        self.start_tasks().await?;

        // Wait for shutdown signal and handle cleanup
        info!("Waiting for shutdown signal...");
        self.wait_for_shutdown().await?;

        Ok(())
    }

    /// Initialize the audio device
    async fn initialize_audio_device(&self) -> Result<Arc<AudioDevice>> {
        // Try to find the device specified in config
        if let Some(device_name) = &self.config.device {
            let host = cpal::default_host();
            if let Ok(device) = find_device_by_name(&host, device_name) {
                return Ok(device);
            }
            warn!(
                "Specified device '{}' not found, falling back to default",
                device_name
            );
        }

        // Fall back to default device
        select_audio_device(&cpal::default_host())
    }

    /// Start all background tasks
    async fn start_tasks(&mut self) -> Result<()> {
        // Initialize hotkey service if in hotkey mode
        if let TranscriptionMode::Hotkey = self.config.mode {
            let mut hotkey_service = HotkeyService::new(
                self.recording_state.clone(),
                self.shutdown_manager.force_shutdown_tx.subscribe(),
            )?;

            // Register hotkey
            hotkey_service.register_hotkey(&self.config.hotkey)?;

            // Create app context for pipeline operations
            let app_context = Arc::new(AppContext {
                stream_manager: self.stream_manager.as_ref().unwrap().clone(),
                provider: self.provider.clone(),
                pipeline_cancel_token: Mutex::new(None),
                pipeline_handle: Mutex::new(None),
                config: self.config.clone(),
            });

            // Set app context
            hotkey_service.set_app_context(app_context);

            // Wrap in Arc
            let hotkey_service = Arc::new(hotkey_service);

            // Register shutdown handler
            HotkeyService::register_shutdown_handler(
                hotkey_service.clone(),
                &self.shutdown_manager,
            );

            // Start the hotkey service
            let hotkey_handle = tokio::task::spawn_local(async move {
                hotkey_service.run().await;
            });
            self.task_handles.push(hotkey_handle);
        }
        Ok(())
    }

    /// Convert raw PCM samples to WAV format
    fn convert_samples_to_wav(samples: &[f32]) -> Result<Vec<u8>> {
        use hound::{SampleFormat, WavSpec, WavWriter};
        use std::io::Cursor;

        let spec = WavSpec {
            channels: 1,
            sample_rate: SAMPLE_RATE,
            bits_per_sample: 16,
            sample_format: SampleFormat::Int,
        };

        let mut buffer = Vec::new();
        {
            let mut writer = WavWriter::new(Cursor::new(&mut buffer), spec)?;
            for &sample in samples {
                let amplitude = if sample >= 1.0 {
                    i16::MAX
                } else if sample <= -1.0 {
                    i16::MIN
                } else {
                    (sample * i16::MAX as f32) as i16
                };
                writer.write_sample(amplitude)?;
            }
            writer.finalize()?;
        }
        Ok(buffer)
    }

    /// Wait for shutdown signal
    async fn wait_for_shutdown(&mut self) -> Result<()> {
        info!("Waiting for shutdown signal...");

        let shutdown_manager = Arc::clone(&self.shutdown_manager);
        tokio::select! {
            _ = signal::ctrl_c() => {
                info!("Received Ctrl+C signal");
                // Send shutdown signal and execute shutdown handlers
                let _ = shutdown_manager.force_shutdown_tx.send(());
                shutdown_manager.execute_shutdown().await;
            }
            _ = shutdown_manager.wait_for_shutdown() => {
                info!("Received shutdown signal");
            }
        }

        // Stop the audio stream
        if let Some(manager) = &self.stream_manager {
            let mut mgr = manager.lock().await;
            mgr.stop_stream().await?;
        }

        // Cancel and await pipeline task if running
        if let Some(token) = self.pipeline_cancel_token.take() {
            token.cancel();
            info!("Shutdown: cancellation signal sent to pipeline");
        }
        if let Some(handle) = self.pipeline_handle.take() {
            use tokio::time::{timeout, Duration};
            match timeout(Duration::from_secs(2), handle).await {
                Ok(Ok(_)) => info!("Pipeline task finished after shutdown cancellation"),
                Ok(Err(e)) => error!("Pipeline task panicked during shutdown: {}", e),
                Err(_) => warn!("Timeout waiting for pipeline task to finish during shutdown"),
            }
        }

        // Wait for other tasks to complete with timeout
        for mut handle in self.task_handles.drain(..) {
            match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
                Ok(join_result) => {
                    if let Err(e) = join_result {
                        warn!("Task panicked or failed: {}", e);
                    }
                }
                Err(_) => {
                    warn!("Task did not complete within timeout, aborting");
                    handle.abort();
                }
            }
        }

        info!("Shutdown complete");
        Ok(())
    }
}

// All shutdown handlers now log start, completion, timeout, and errors; handlers are robust to double invocation.