tcrm_task/tasks/tokio/
control.rs

1use std::time::SystemTime;
2
3use crate::tasks::{
4    control::{TaskControl, TaskStatusInfo},
5    error::TaskError,
6    event::{TaskEvent, TaskTerminateReason},
7    process::{
8        action::stop::stop_process,
9        control::{ProcessControl, ProcessControlAction},
10    },
11    state::{ProcessState, TaskState},
12    tokio::executor::TaskExecutor,
13};
14
15impl TaskControl for TaskExecutor {
16    fn terminate_task(&mut self, reason: TaskTerminateReason) -> Result<(), TaskError> {
17        let current_state = self.get_task_state();
18        if current_state == TaskState::Finished {
19            return Err(TaskError::Control("Task already finished".to_string()));
20        }
21        if let Some(tx) = self.terminate_tx.take() {
22            if tx.send(reason.clone()).is_err() {
23                let msg = "Terminate channel closed while sending signal";
24                #[cfg(feature = "tracing")]
25                tracing::warn!(terminate_reason=?reason, msg);
26                return Err(TaskError::Channel(msg.to_string()));
27            }
28        } else {
29            let msg = "Terminate signal already sent or channel missing";
30            #[cfg(feature = "tracing")]
31            tracing::warn!(msg);
32            return Err(TaskError::Channel(msg.to_string()));
33        }
34
35        Ok(())
36    }
37
38    #[cfg(all(feature = "signal", unix))]
39    fn send_signal(&self, signal: nix::sys::signal::Signal) -> Result<(), TaskError> {
40        #[cfg(feature = "process-group")]
41        let use_process_group = self
42            .shared_context
43            .config
44            .use_process_group
45            .unwrap_or_default();
46        #[cfg(not(feature = "process-group"))]
47        let use_process_group = false;
48
49        #[cfg(feature = "process-group")]
50        let active = {
51            match self.shared_context.group.try_lock() {
52                Ok(guard) => guard.is_active(),
53                Err(_) => false,
54            }
55        };
56        #[cfg(not(feature = "process-group"))]
57        let active = false;
58
59        // Get the process ID
60        let process_id = match self.shared_context.get_process_id() {
61            Some(n) => n,
62            None => {
63                let msg = "No process ID available to send signal";
64                #[cfg(feature = "tracing")]
65                tracing::warn!(msg);
66                return Err(TaskError::Control(msg.to_string()));
67            }
68        };
69
70        // Send signal to process group or individual process
71        if use_process_group && active {
72            match self.shared_context.group.try_lock() {
73                    Ok(guard) => guard.send_signal(signal).map_err(|e| {
74                        let msg = format!("Failed to send signal {:?} to process group: {}", signal, e);
75                        #[cfg(feature = "tracing")]
76                        tracing::error!(error=%e, signal=?signal, "Failed to send signal to process group");
77                        TaskError::Control(msg)
78                    })?,
79                    Err(_) => {
80                        let msg = "Process group lock is held, cannot send signal";
81                        #[cfg(feature = "tracing")]
82                        tracing::warn!(msg);
83                        return Err(TaskError::Control(msg.to_string()));
84                    }
85                }
86        } else {
87            // Send signal to individual process
88            use crate::tasks::process::action::signal::send_signal;
89            send_signal(process_id, signal).map_err(|e| {
90                let msg = format!("Failed to send signal {:?} to process: {}", signal, e);
91                #[cfg(feature = "tracing")]
92                tracing::error!(error=%e, signal=?signal, pid=process_id, "Failed to send signal to process");
93                TaskError::Control(msg)
94            })?;
95        }
96
97        #[cfg(feature = "tracing")]
98        tracing::debug!(signal=?signal, pid=process_id, use_group=use_process_group, "Signal sent successfully");
99
100        Ok(())
101    }
102}
103
104impl TaskStatusInfo for TaskExecutor {
105    fn get_task_state(&self) -> TaskState {
106        self.shared_context.get_task_state()
107    }
108
109    fn get_process_state(&self) -> ProcessState {
110        self.shared_context.get_process_state()
111    }
112
113    fn get_process_id(&self) -> Option<u32> {
114        self.shared_context.get_process_id()
115    }
116
117    fn get_create_at(&self) -> SystemTime {
118        self.shared_context.get_create_at()
119    }
120
121    fn get_running_at(&self) -> Option<SystemTime> {
122        self.shared_context.get_running_at()
123    }
124
125    fn get_finished_at(&self) -> Option<SystemTime> {
126        self.shared_context.get_finished_at()
127    }
128
129    fn get_exit_code(&self) -> Option<i32> {
130        self.shared_context.get_exit_code()
131    }
132
133    #[cfg(unix)]
134    fn get_last_signal_code(&self) -> Option<nix::sys::signal::Signal> {
135        self.shared_context.get_last_signal_code()
136    }
137}
138impl ProcessControl for TaskExecutor {
139    async fn perform_process_action(
140        &mut self,
141        action: ProcessControlAction,
142    ) -> Result<(), TaskError> {
143        #[cfg(feature = "process-group")]
144        let use_process_group = self
145            .shared_context
146            .config
147            .use_process_group
148            .unwrap_or_default();
149        #[cfg(not(feature = "process-group"))]
150        let use_process_group = false;
151
152        #[cfg(feature = "process-group")]
153        let active = self.shared_context.group.lock().await.is_active();
154        #[cfg(not(feature = "process-group"))]
155        let active = false;
156        let process_id = match self.shared_context.get_process_id() {
157            Some(n) => n,
158            None => {
159                let msg = "No process ID available to perform action";
160                #[cfg(feature = "tracing")]
161                tracing::warn!(msg);
162                return Err(TaskError::Control(msg.to_string()));
163            }
164        };
165        match action {
166            ProcessControlAction::Stop => {
167                if use_process_group && active {
168                    self.shared_context
169                        .group
170                        .lock()
171                        .await
172                        .stop_group()
173                        .map_err(|e| {
174                            let msg = format!("Failed to terminate process group: {}", e);
175                            #[cfg(feature = "tracing")]
176                            tracing::error!(error=%e, "{}", msg);
177                            TaskError::Control(msg)
178                        })?;
179                } else {
180                    stop_process(process_id).map_err(|e| {
181                        let msg = format!("Failed to terminate process: {}", e);
182                        #[cfg(feature = "tracing")]
183                        tracing::error!(error=%e, "{}", msg);
184                        TaskError::Control(msg)
185                    })?;
186                }
187            }
188            ProcessControlAction::Pause => {
189                if use_process_group && active {
190                    self.shared_context
191                        .group
192                        .lock()
193                        .await
194                        .pause_group()
195                        .map_err(|e| {
196                            let msg = format!("Failed to pause process group: {}", e);
197                            #[cfg(feature = "tracing")]
198                            tracing::error!(error=%e, "{}", msg);
199                            TaskError::Control(msg)
200                        })?;
201                } else {
202                    use crate::tasks::process::action::pause::pause_process;
203                    pause_process(process_id).map_err(|e| {
204                        let msg = format!("Failed to pause process: {}", e);
205                        #[cfg(feature = "tracing")]
206                        tracing::error!(error=%e, "{}", msg);
207                        TaskError::Control(msg)
208                    })?;
209                }
210            }
211            ProcessControlAction::Resume => {
212                if use_process_group && active {
213                    self.shared_context
214                        .group
215                        .lock()
216                        .await
217                        .resume_group()
218                        .map_err(|e| {
219                            let msg = format!("Failed to resume process group: {}", e);
220                            #[cfg(feature = "tracing")]
221                            tracing::error!(error=%e, "{}", msg);
222                            TaskError::Control(msg)
223                        })?;
224                } else {
225                    use crate::tasks::process::action::resume::resume_process;
226                    resume_process(process_id).map_err(|e| {
227                        let msg = format!("Failed to resume process: {}", e);
228                        #[cfg(feature = "tracing")]
229                        tracing::error!(error=%e, "{}", msg);
230                        TaskError::Control(msg)
231                    })?;
232                }
233            }
234        }
235        let _ = self
236            .shared_context
237            .send_event(TaskEvent::ProcessControl { action })
238            .await;
239
240        Ok(())
241    }
242}