tcrm-task 0.4.2

Task execution unit for TCRM project
Documentation
use std::time::SystemTime;

use crate::tasks::{
    control::{TaskControl, TaskStatusInfo},
    error::TaskError,
    event::{TaskEvent, TaskTerminateReason},
    process::{
        action::stop::stop_process,
        control::{ProcessControl, ProcessControlAction},
    },
    state::{ProcessState, TaskState},
    tokio::executor::TaskExecutor,
};

impl TaskControl for TaskExecutor {
    fn terminate_task(&mut self, reason: TaskTerminateReason) -> Result<(), TaskError> {
        let current_state = self.get_task_state();
        if current_state == TaskState::Finished {
            return Err(TaskError::Control("Task already finished".to_string()));
        }
        if let Some(tx) = self.terminate_tx.take() {
            if tx.send(reason.clone()).is_err() {
                let msg = "Terminate channel closed while sending signal";
                #[cfg(feature = "tracing")]
                tracing::warn!(terminate_reason=?reason, msg);
                return Err(TaskError::Channel(msg.to_string()));
            }
        } else {
            let msg = "Terminate signal already sent or channel missing";
            #[cfg(feature = "tracing")]
            tracing::warn!(msg);
            return Err(TaskError::Channel(msg.to_string()));
        }

        Ok(())
    }

    #[cfg(all(feature = "signal", unix))]
    fn send_signal(&self, signal: nix::sys::signal::Signal) -> Result<(), TaskError> {
        #[cfg(feature = "process-group")]
        let use_process_group = self
            .shared_context
            .config
            .use_process_group
            .unwrap_or_default();
        #[cfg(not(feature = "process-group"))]
        let use_process_group = false;

        #[cfg(feature = "process-group")]
        let active = {
            match self.shared_context.group.try_lock() {
                Ok(guard) => guard.is_active(),
                Err(_) => false,
            }
        };
        #[cfg(not(feature = "process-group"))]
        let active = false;

        // Get the process ID
        let process_id = match self.shared_context.get_process_id() {
            Some(n) => n,
            None => {
                let msg = "No process ID available to send signal";
                #[cfg(feature = "tracing")]
                tracing::warn!(msg);
                return Err(TaskError::Control(msg.to_string()));
            }
        };

        // Send signal to process group or individual process
        if use_process_group && active {
            match self.shared_context.group.try_lock() {
                    Ok(guard) => guard.send_signal(signal).map_err(|e| {
                        let msg = format!("Failed to send signal {:?} to process group: {}", signal, e);
                        #[cfg(feature = "tracing")]
                        tracing::error!(error=%e, signal=?signal, "Failed to send signal to process group");
                        TaskError::Control(msg)
                    })?,
                    Err(_) => {
                        let msg = "Process group lock is held, cannot send signal";
                        #[cfg(feature = "tracing")]
                        tracing::warn!(msg);
                        return Err(TaskError::Control(msg.to_string()));
                    }
                }
        } else {
            // Send signal to individual process
            use crate::tasks::process::action::signal::send_signal;
            send_signal(process_id, signal).map_err(|e| {
                let msg = format!("Failed to send signal {:?} to process: {}", signal, e);
                #[cfg(feature = "tracing")]
                tracing::error!(error=%e, signal=?signal, pid=process_id, "Failed to send signal to process");
                TaskError::Control(msg)
            })?;
        }

        #[cfg(feature = "tracing")]
        tracing::debug!(signal=?signal, pid=process_id, use_group=use_process_group, "Signal sent successfully");

        Ok(())
    }
}

impl TaskStatusInfo for TaskExecutor {
    fn get_task_state(&self) -> TaskState {
        self.shared_context.get_task_state()
    }

    fn get_process_state(&self) -> ProcessState {
        self.shared_context.get_process_state()
    }

    fn get_process_id(&self) -> Option<u32> {
        self.shared_context.get_process_id()
    }

    fn get_create_at(&self) -> SystemTime {
        self.shared_context.get_create_at()
    }

    fn get_running_at(&self) -> Option<SystemTime> {
        self.shared_context.get_running_at()
    }

    fn get_finished_at(&self) -> Option<SystemTime> {
        self.shared_context.get_finished_at()
    }

    fn get_exit_code(&self) -> Option<i32> {
        self.shared_context.get_exit_code()
    }

    #[cfg(unix)]
    fn get_last_signal_code(&self) -> Option<nix::sys::signal::Signal> {
        self.shared_context.get_last_signal_code()
    }
}
impl ProcessControl for TaskExecutor {
    async fn perform_process_action(
        &mut self,
        action: ProcessControlAction,
    ) -> Result<(), TaskError> {
        #[cfg(feature = "process-group")]
        let use_process_group = self
            .shared_context
            .config
            .use_process_group
            .unwrap_or_default();
        #[cfg(not(feature = "process-group"))]
        let use_process_group = false;

        #[cfg(feature = "process-group")]
        let active = self.shared_context.group.lock().await.is_active();
        #[cfg(not(feature = "process-group"))]
        let active = false;
        let process_id = match self.shared_context.get_process_id() {
            Some(n) => n,
            None => {
                let msg = "No process ID available to perform action";
                #[cfg(feature = "tracing")]
                tracing::warn!(msg);
                return Err(TaskError::Control(msg.to_string()));
            }
        };
        match action {
            ProcessControlAction::Stop => {
                if use_process_group && active {
                    self.shared_context
                        .group
                        .lock()
                        .await
                        .stop_group()
                        .map_err(|e| {
                            let msg = format!("Failed to terminate process group: {}", e);
                            #[cfg(feature = "tracing")]
                            tracing::error!(error=%e, "{}", msg);
                            TaskError::Control(msg)
                        })?;
                } else {
                    stop_process(process_id).map_err(|e| {
                        let msg = format!("Failed to terminate process: {}", e);
                        #[cfg(feature = "tracing")]
                        tracing::error!(error=%e, "{}", msg);
                        TaskError::Control(msg)
                    })?;
                }
            }
            ProcessControlAction::Pause => {
                if use_process_group && active {
                    self.shared_context
                        .group
                        .lock()
                        .await
                        .pause_group()
                        .map_err(|e| {
                            let msg = format!("Failed to pause process group: {}", e);
                            #[cfg(feature = "tracing")]
                            tracing::error!(error=%e, "{}", msg);
                            TaskError::Control(msg)
                        })?;
                } else {
                    use crate::tasks::process::action::pause::pause_process;
                    pause_process(process_id).map_err(|e| {
                        let msg = format!("Failed to pause process: {}", e);
                        #[cfg(feature = "tracing")]
                        tracing::error!(error=%e, "{}", msg);
                        TaskError::Control(msg)
                    })?;
                }
            }
            ProcessControlAction::Resume => {
                if use_process_group && active {
                    self.shared_context
                        .group
                        .lock()
                        .await
                        .resume_group()
                        .map_err(|e| {
                            let msg = format!("Failed to resume process group: {}", e);
                            #[cfg(feature = "tracing")]
                            tracing::error!(error=%e, "{}", msg);
                            TaskError::Control(msg)
                        })?;
                } else {
                    use crate::tasks::process::action::resume::resume_process;
                    resume_process(process_id).map_err(|e| {
                        let msg = format!("Failed to resume process: {}", e);
                        #[cfg(feature = "tracing")]
                        tracing::error!(error=%e, "{}", msg);
                        TaskError::Control(msg)
                    })?;
                }
            }
        }
        let _ = self
            .shared_context
            .send_event(TaskEvent::ProcessControl { action })
            .await;

        Ok(())
    }
}