tcrm_task/tasks/tokio/
control.rs1use std::time::SystemTime;
2
3use crate::tasks::{
4 control::{TaskControl, TaskStatusInfo},
5 error::TaskError,
6 event::{TaskEvent, TaskTerminateReason},
7 process::{
8 action::stop::stop_process,
9 control::{ProcessControl, ProcessControlAction},
10 },
11 state::{ProcessState, TaskState},
12 tokio::executor::TaskExecutor,
13};
14
15impl TaskControl for TaskExecutor {
16 fn terminate_task(&mut self, reason: TaskTerminateReason) -> Result<(), TaskError> {
17 let current_state = self.get_task_state();
18 if current_state == TaskState::Finished {
19 return Err(TaskError::Control("Task already finished".to_string()));
20 }
21 if let Some(tx) = self.terminate_tx.take() {
22 if tx.send(reason.clone()).is_err() {
23 let msg = "Terminate channel closed while sending signal";
24 #[cfg(feature = "tracing")]
25 tracing::warn!(terminate_reason=?reason, msg);
26 return Err(TaskError::Channel(msg.to_string()));
27 }
28 } else {
29 let msg = "Terminate signal already sent or channel missing";
30 #[cfg(feature = "tracing")]
31 tracing::warn!(msg);
32 return Err(TaskError::Channel(msg.to_string()));
33 }
34
35 Ok(())
36 }
37
38 #[cfg(all(feature = "signal", unix))]
39 fn send_signal(&self, signal: nix::sys::signal::Signal) -> Result<(), TaskError> {
40 #[cfg(feature = "process-group")]
41 let use_process_group = self
42 .shared_context
43 .config
44 .use_process_group
45 .unwrap_or_default();
46 #[cfg(not(feature = "process-group"))]
47 let use_process_group = false;
48
49 #[cfg(feature = "process-group")]
50 let active = {
51 match self.shared_context.group.try_lock() {
52 Ok(guard) => guard.is_active(),
53 Err(_) => false,
54 }
55 };
56 #[cfg(not(feature = "process-group"))]
57 let active = false;
58
59 let process_id = match self.shared_context.get_process_id() {
61 Some(n) => n,
62 None => {
63 let msg = "No process ID available to send signal";
64 #[cfg(feature = "tracing")]
65 tracing::warn!(msg);
66 return Err(TaskError::Control(msg.to_string()));
67 }
68 };
69
70 if use_process_group && active {
72 match self.shared_context.group.try_lock() {
73 Ok(guard) => guard.send_signal(signal).map_err(|e| {
74 let msg = format!("Failed to send signal {:?} to process group: {}", signal, e);
75 #[cfg(feature = "tracing")]
76 tracing::error!(error=%e, signal=?signal, "Failed to send signal to process group");
77 TaskError::Control(msg)
78 })?,
79 Err(_) => {
80 let msg = "Process group lock is held, cannot send signal";
81 #[cfg(feature = "tracing")]
82 tracing::warn!(msg);
83 return Err(TaskError::Control(msg.to_string()));
84 }
85 }
86 } else {
87 use crate::tasks::process::action::signal::send_signal;
89 send_signal(process_id, signal).map_err(|e| {
90 let msg = format!("Failed to send signal {:?} to process: {}", signal, e);
91 #[cfg(feature = "tracing")]
92 tracing::error!(error=%e, signal=?signal, pid=process_id, "Failed to send signal to process");
93 TaskError::Control(msg)
94 })?;
95 }
96
97 #[cfg(feature = "tracing")]
98 tracing::debug!(signal=?signal, pid=process_id, use_group=use_process_group, "Signal sent successfully");
99
100 Ok(())
101 }
102}
103
104impl TaskStatusInfo for TaskExecutor {
105 fn get_task_state(&self) -> TaskState {
106 self.shared_context.get_task_state()
107 }
108
109 fn get_process_state(&self) -> ProcessState {
110 self.shared_context.get_process_state()
111 }
112
113 fn get_process_id(&self) -> Option<u32> {
114 self.shared_context.get_process_id()
115 }
116
117 fn get_create_at(&self) -> SystemTime {
118 self.shared_context.get_create_at()
119 }
120
121 fn get_running_at(&self) -> Option<SystemTime> {
122 self.shared_context.get_running_at()
123 }
124
125 fn get_finished_at(&self) -> Option<SystemTime> {
126 self.shared_context.get_finished_at()
127 }
128
129 fn get_exit_code(&self) -> Option<i32> {
130 self.shared_context.get_exit_code()
131 }
132
133 #[cfg(unix)]
134 fn get_last_signal_code(&self) -> Option<nix::sys::signal::Signal> {
135 self.shared_context.get_last_signal_code()
136 }
137}
138impl ProcessControl for TaskExecutor {
139 async fn perform_process_action(
140 &mut self,
141 action: ProcessControlAction,
142 ) -> Result<(), TaskError> {
143 #[cfg(feature = "process-group")]
144 let use_process_group = self
145 .shared_context
146 .config
147 .use_process_group
148 .unwrap_or_default();
149 #[cfg(not(feature = "process-group"))]
150 let use_process_group = false;
151
152 #[cfg(feature = "process-group")]
153 let active = self.shared_context.group.lock().await.is_active();
154 #[cfg(not(feature = "process-group"))]
155 let active = false;
156 let process_id = match self.shared_context.get_process_id() {
157 Some(n) => n,
158 None => {
159 let msg = "No process ID available to perform action";
160 #[cfg(feature = "tracing")]
161 tracing::warn!(msg);
162 return Err(TaskError::Control(msg.to_string()));
163 }
164 };
165 match action {
166 ProcessControlAction::Stop => {
167 if use_process_group && active {
168 self.shared_context
169 .group
170 .lock()
171 .await
172 .stop_group()
173 .map_err(|e| {
174 let msg = format!("Failed to terminate process group: {}", e);
175 #[cfg(feature = "tracing")]
176 tracing::error!(error=%e, "{}", msg);
177 TaskError::Control(msg)
178 })?;
179 } else {
180 stop_process(process_id).map_err(|e| {
181 let msg = format!("Failed to terminate process: {}", e);
182 #[cfg(feature = "tracing")]
183 tracing::error!(error=%e, "{}", msg);
184 TaskError::Control(msg)
185 })?;
186 }
187 }
188 ProcessControlAction::Pause => {
189 if use_process_group && active {
190 self.shared_context
191 .group
192 .lock()
193 .await
194 .pause_group()
195 .map_err(|e| {
196 let msg = format!("Failed to pause process group: {}", e);
197 #[cfg(feature = "tracing")]
198 tracing::error!(error=%e, "{}", msg);
199 TaskError::Control(msg)
200 })?;
201 } else {
202 use crate::tasks::process::action::pause::pause_process;
203 pause_process(process_id).map_err(|e| {
204 let msg = format!("Failed to pause process: {}", e);
205 #[cfg(feature = "tracing")]
206 tracing::error!(error=%e, "{}", msg);
207 TaskError::Control(msg)
208 })?;
209 }
210 }
211 ProcessControlAction::Resume => {
212 if use_process_group && active {
213 self.shared_context
214 .group
215 .lock()
216 .await
217 .resume_group()
218 .map_err(|e| {
219 let msg = format!("Failed to resume process group: {}", e);
220 #[cfg(feature = "tracing")]
221 tracing::error!(error=%e, "{}", msg);
222 TaskError::Control(msg)
223 })?;
224 } else {
225 use crate::tasks::process::action::resume::resume_process;
226 resume_process(process_id).map_err(|e| {
227 let msg = format!("Failed to resume process: {}", e);
228 #[cfg(feature = "tracing")]
229 tracing::error!(error=%e, "{}", msg);
230 TaskError::Control(msg)
231 })?;
232 }
233 }
234 }
235 let _ = self
236 .shared_context
237 .send_event(TaskEvent::ProcessControl { action })
238 .await;
239
240 Ok(())
241 }
242}