stt-cli 0.2.1

Speech-to-text CLI using the Groq API and the OpenAI API
use futures::future::{select, Either};
use futures::FutureExt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{broadcast, oneshot};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::{debug, error, info, warn};

/// Boxed, type-erased future produced by a shutdown action.
///
/// The `Unpin` bound lets the future be polled without further pinning.
type ShutdownFuture = Box<dyn Future<Output = ()> + Unpin + Send>;

/// A type-erased, run-once closure that is invoked during shutdown and
/// returns the future performing the actual work.
type ShutdownAction = Box<dyn FnOnce() -> ShutdownFuture + Send + Sync>;

/// Ordering bucket for shutdown actions.
///
/// Variants compare by their numeric discriminant, so a smaller value sorts
/// (and therefore runs) earlier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ExitPriority {
    /// Earliest bucket: runs before `Normal` and `Last`.
    First = 0,
    /// Middle bucket for actions without special ordering needs.
    Normal = 50,
    /// Latest bucket: runs after `First` and `Normal`.
    Last = 100,
}

/// Outcome of running a single shutdown action.
///
/// The type is a plain, field-less tag, so it derives `Debug` (for logging),
/// `Clone`/`Copy` and `PartialEq`/`Eq` (for comparisons in callers and tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownActionStatus {
    /// Action completed successfully
    Completed,
    /// Action timed out
    TimedOut,
    /// Action was forced to terminate
    Forced,
    /// Action failed with an error
    Failed,
}

/// A single registered shutdown action together with its metadata.
///
/// Instances are private to this module; they pair the type-erased closure
/// with the label and priority it was registered under.
struct ExitHandler {
    /// Human-readable name for the action; used in log messages
    name: String,
    /// The run-once closure to call during shutdown; calling it yields the
    /// future that performs the work
    action: ShutdownAction,
    /// Priority that determines execution order (handlers are sorted by it)
    priority: ExitPriority,
}

/// Per-outcome tally of the actions processed by a shutdown run.
///
/// `Default` yields the all-zero tally; `PartialEq`/`Eq` allow results to be
/// compared directly (for example in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ShutdownResult {
    /// Number of actions that completed successfully
    pub completed: usize,
    /// Number of actions that timed out
    pub timed_out: usize,
    /// Number of actions that were forced to terminate
    pub forced: usize,
    /// Number of actions that failed with an error
    pub failed: usize,
}

/// Manages the graceful shutdown process for the application
///
/// The ShutdownManager allows for registering shutdown actions
/// that will be executed in priority order when the application
/// is shutting down.
pub struct ShutdownManager {
    /// Registered handlers; the list is sorted by priority and drained when
    /// the shutdown sequence runs.
    handlers: Arc<Mutex<Vec<ExitHandler>>>,
    /// Time budget applied to each individual handler (5 seconds unless
    /// changed through `with_timeout`).
    timeout_duration: Duration,
    /// Broadcast channel carrying the "force" signal. The shutdown sequence
    /// subscribes to it, and sends on it itself once two handlers have timed
    /// out; a message that arrives while a handler is being awaited makes
    /// that handler report `Forced`.
    pub force_shutdown_tx: broadcast::Sender<()>,
    /// Label used as a prefix in this manager's log messages.
    name: String,
    /// Idempotency guard: set by the first shutdown request so that later
    /// requests return early instead of running the sequence again.
    shutdown_started: Arc<AtomicBool>,
}

impl Default for ShutdownManager {
    fn default() -> Self {
        Self::new("app")
    }
}

impl ShutdownManager {
    /// Create a new ShutdownManager with default settings
    pub fn new(name: &str) -> Self {
        let (force_shutdown_tx, _) = broadcast::channel(1);
        Self {
            handlers: Arc::new(Mutex::new(Vec::new())),
            timeout_duration: Duration::from_secs(5),
            force_shutdown_tx,
            name: name.to_string(),
            shutdown_started: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Set the timeout duration for each shutdown action
    pub fn with_timeout(mut self, duration: Duration) -> Self {
        self.timeout_duration = duration;
        self
    }

    /// Register an async action to be executed during shutdown
    pub fn register<F, Fut>(&self, name: &str, priority: ExitPriority, action: F)
    where
        F: FnOnce() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let boxed_action = Box::new(move || {
            let fut = action();
            Box::new(Box::pin(fut)) as Box<dyn Future<Output = ()> + Unpin + Send>
        });

        let mut handlers = self.handlers.lock().unwrap();
        handlers.push(ExitHandler {
            name: name.to_string(),
            action: boxed_action,
            priority,
        });

        debug!(
            "[{}] Registered shutdown handler: {} (priority: {:?})",
            self.name, name, priority
        );
    }

    /// Execute a single shutdown action with timeout and force support
    async fn execute_action(
        &self,
        handler: ExitHandler,
        force_rx: &mut broadcast::Receiver<()>,
    ) -> ShutdownActionStatus {
        info!(
            "[{}] Executing shutdown handler: {} (priority: {:?})",
            self.name, handler.name, handler.priority
        );

        let future = (handler.action)();
        let timeout_fut = Box::pin(timeout(self.timeout_duration, future));
        let force_fut = Box::pin(force_rx.recv().fuse());

        match select(force_fut, timeout_fut).await {
            // Force signal received
            Either::Left((_, _)) => {
                warn!(
                    "[{}] Shutdown handler forced to terminate: {}",
                    self.name, handler.name
                );
                ShutdownActionStatus::Forced
            }
            // Completed or timed out
            Either::Right((timeout_result, _)) => match timeout_result {
                Ok(_) => {
                    info!(
                        "[{}] Shutdown handler completed: {}",
                        self.name, handler.name
                    );
                    ShutdownActionStatus::Completed
                }
                Err(_) => {
                    error!(
                        "[{}] Shutdown handler timed out after {:?}: {}",
                        self.name, self.timeout_duration, handler.name
                    );
                    ShutdownActionStatus::TimedOut
                }
            },
        }
    }

    /// Execute all registered shutdown actions in priority order
    pub async fn execute_shutdown(&self) -> ShutdownResult {
        // Idempotency: Only allow shutdown once
        if self.shutdown_started.swap(true, Ordering::SeqCst) {
            warn!("Shutdown already in progress, ignoring duplicate request");
            return ShutdownResult { completed: 0, timed_out: 0, forced: 0, failed: 0 };
        }
        info!("Starting shutdown sequence for {}", self.name);

        let handlers = {
            let mut handlers = self.handlers.lock().unwrap();
            // Sort by priority
            handlers.sort_by_key(|h| h.priority);
            std::mem::take(&mut *handlers)
        };

        let mut result = ShutdownResult {
            completed: 0,
            timed_out: 0,
            forced: 0,
            failed: 0,
        };

        let mut force_rx = self.force_shutdown_tx.subscribe();

        for handler in handlers {
            let status = self.execute_action(handler, &mut force_rx).await;

            match status {
                ShutdownActionStatus::Completed => result.completed += 1,
                ShutdownActionStatus::TimedOut => {
                    result.timed_out += 1;
                    // If an action times out, consider sending a force signal
                    // to expedite the remaining shutdowns
                    if result.timed_out >= 2 {
                        warn!(
                            "[{}] Multiple actions timed out, forcing remaining shutdowns",
                            self.name
                        );
                        let _ = self.force_shutdown_tx.send(());
                    }
                }
                ShutdownActionStatus::Forced => result.forced += 1,
                ShutdownActionStatus::Failed => result.failed += 1,
            }
        }

        info!(
            "[{}] Graceful shutdown sequence completed: {:?}",
            self.name, result
        );
        result
    }

    /// Execute a forced shutdown, skipping any pending actions
    pub async fn force_shutdown(&self) -> ShutdownResult {
        // Idempotency: Only allow shutdown once
        if self.shutdown_started.swap(true, Ordering::SeqCst) {
            warn!("Shutdown already in progress, ignoring duplicate request");
            return ShutdownResult { completed: 0, timed_out: 0, forced: 0, failed: 0 };
        }
        warn!("Force shutdown triggered for {}", self.name);

        // Still call execute_shutdown to process any registered handlers
        // but they'll all receive the force signal
        self.execute_shutdown().await
    }

    /// Wait for termination signals
    pub async fn wait_for_shutdown(&self) {
        crate::shutdown_handler::wait_for_signal().await;
    }
}

/// Wait for termination signals (Ctrl+C/SIGINT, SIGTERM, SIGQUIT on Unix)
#[cfg(unix)]
pub async fn wait_for_signal() {
    use tokio::signal::unix::{signal, SignalKind};
    let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
    let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");

    tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            info!("Received SIGINT (Ctrl+C)");
        },
        _ = sigterm.recv() => {
            info!("Received SIGTERM");
        },
        _ = sigquit.recv() => {
            info!("Received SIGQUIT");
        }
    }
}

/// Resolves once Ctrl+C is received (the only signal handled off Unix).
///
/// Panics if the Ctrl+C listener reports an error.
#[cfg(not(unix))]
pub async fn wait_for_signal() {
    let outcome = tokio::signal::ctrl_c().await;
    outcome.expect("Failed to listen for Ctrl+C");
    info!("Received SIGINT (Ctrl+C)");
}

// Unit-test module; currently a placeholder that defines no tests.
#[cfg(test)]
mod tests {
    // Brings the parent module's items into scope for future tests.
    use super::*;
}

// TODO: Review all shutdown handlers to ensure they log start, completion, and errors/timeouts
// TODO: Ensure all registered handlers are robust to double invocation or forced shutdown