1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
use std::{
sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU32, AtomicU64},
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{Mutex, mpsc, oneshot};
#[cfg(feature = "process-group")]
use crate::tasks::process::group::builder::ProcessGroup;
use crate::tasks::{
config::TaskConfig,
error::TaskError,
event::{TaskEvent, TaskEventEnvelope, TaskStopReason, TaskTerminateReason},
state::TaskState,
};
/// Shared context for task execution state and configuration.
///
/// This structure holds all the runtime state information for a task,
/// including process information, timestamps, and synchronization primitives.
/// Every mutable field is either an atomic or sits behind an async `Mutex`,
/// so all accessors take `&self`.
///
/// Several fields use `0` as an "unset" sentinel instead of an `Option`;
/// the individual field docs call this out.
#[derive(Debug)]
pub(crate) struct TaskExecutorContext {
/// Task configuration supplied at construction; `task_id` is read from it
/// when building event envelopes.
pub(crate) config: TaskConfig,
/// Raw `u8` encoding of the current `TaskState` (written with `as u8`,
/// decoded with `Into`). Starts at `0`.
task_state: AtomicU8,
/// Raw `u8` encoding of the current `ProcessState`, same scheme as
/// `task_state`. Starts at `0`.
process_state: AtomicU8,
/// Process id of the spawned process; `0` means "not set".
process_id: AtomicU32,
/// Creation time as nanoseconds since the Unix epoch.
created_at: AtomicU64,
/// Time the task entered the running state, in nanoseconds since the
/// Unix epoch; `0` means "not set".
running_at: AtomicU64,
/// Time the task finished, in nanoseconds since the Unix epoch;
/// `0` means "not set".
finished_at: AtomicU64,
/// Last stored exit code. Only meaningful while `exit_code_set` is `true`.
exit_code: AtomicI32,
/// Whether `exit_code` currently holds a valid value.
exit_code_set: AtomicBool,
/// Reason the task was stopped, if one has been recorded.
stop_reason: Mutex<Option<TaskStopReason>>,
/// Readiness flag; starts as `false`.
ready_flag: AtomicBool,
/// One-shot sender used to request termination. It is taken out (left as
/// `None`) by the first send, so at most one terminate signal is delivered
/// per installed sender.
internal_terminate_tx: Mutex<Option<oneshot::Sender<TaskTerminateReason>>>,
/// Sender for task event envelopes; becomes `None` once cleared.
event_tx: Mutex<Option<mpsc::Sender<TaskEventEnvelope>>>,
/// Whether the event sender should be dropped when the task finishes.
/// Defaults to `true`.
// NOTE(review): this impl block only writes the flag; the code that reads it
// is presumably elsewhere in the module — confirm.
drop_event_tx_on_finished: AtomicBool,
/// Raw number of the last recorded termination signal; `0` means "none".
#[cfg(unix)]
last_signal_code: AtomicI32,
/// Process group handle, only present with the `process-group` feature.
#[cfg(feature = "process-group")]
pub(crate) group: Mutex<ProcessGroup>,
}
impl TaskExecutorContext {
/// Builds a fresh execution context for a task.
///
/// All state starts out "unset" (zeroed atomics, empty optional slots); the
/// only values populated up front are the configuration, the event sender and
/// the creation timestamp, which is captured from the system clock here.
///
/// # Arguments
///
/// * `config` - The task configuration to store in the context
/// * `event_tx` - Sender half of the channel used for task event envelopes
///
/// # Returns
///
/// A new `TaskExecutorContext` instance
pub(crate) fn new(config: TaskConfig, event_tx: mpsc::Sender<TaskEventEnvelope>) -> Self {
    // A clock set before the Unix epoch degrades to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let created_nanos = since_epoch.as_nanos() as u64;

    Self {
        config,
        // Zero / false defaults double as the "not set yet" sentinels.
        task_state: AtomicU8::default(),
        process_state: AtomicU8::default(),
        process_id: AtomicU32::default(),
        created_at: AtomicU64::new(created_nanos),
        running_at: AtomicU64::default(),
        finished_at: AtomicU64::default(),
        exit_code: AtomicI32::default(),
        exit_code_set: AtomicBool::default(),
        stop_reason: Mutex::new(None),
        ready_flag: AtomicBool::default(),
        internal_terminate_tx: Mutex::new(None),
        event_tx: Mutex::new(Some(event_tx)),
        drop_event_tx_on_finished: AtomicBool::new(true),
        #[cfg(unix)]
        last_signal_code: AtomicI32::default(),
        #[cfg(feature = "process-group")]
        group: Mutex::new(ProcessGroup::new()),
    }
}
/// Gets the last termination signal recorded for the process (Unix only).
///
/// The signal is stored as its raw integer number and converted back to a
/// [`nix::sys::signal::Signal`] on read. The value `0` is what
/// [`Self::set_last_signal_code`] writes for `None`.
///
/// # Returns
///
/// * `Some(Signal)` - The recorded signal
/// * `None` - If no signal was recorded, or the stored number does not
///   convert to a `Signal`
#[cfg(unix)]
pub(crate) fn get_last_signal_code(&self) -> Option<nix::sys::signal::Signal> {
    let raw = self
        .last_signal_code
        .load(std::sync::atomic::Ordering::SeqCst);
    // Any number `try_from` rejects (including the 0 sentinel) becomes `None`.
    nix::sys::signal::Signal::try_from(raw).ok()
}
/// Records (or clears) the last termination signal on Unix systems.
///
/// The signal is stored as its raw integer number; passing `None` stores `0`.
///
/// # Arguments
///
/// * `code` - The signal to record, or `None` to clear the stored value
#[cfg(unix)]
pub(crate) fn set_last_signal_code(&self, code: Option<nix::sys::signal::Signal>) {
    use std::sync::atomic::Ordering;

    let raw = code.map_or(0, |sig| sig as i32);
    self.last_signal_code.store(raw, Ordering::SeqCst);
}
/// Sends a termination reason through the internal oneshot channel.
///
/// The stored sender is taken out of the context, so only the first call
/// after [`Self::set_internal_terminate_tx`] actually sends; later calls find
/// the slot empty. Failures are not returned to the caller: with the
/// `tracing` feature enabled they are logged as warnings, otherwise they are
/// silently ignored.
///
/// # Arguments
///
/// * `reason` - The reason for termination
pub(crate) async fn send_terminate_oneshot(&self, reason: TaskTerminateReason) {
    // Take the sender while holding the lock, then release the lock before sending.
    let sender = self.internal_terminate_tx.lock().await.take();
    match sender {
        Some(tx) => {
            // On failure `send` hands the value back, so the reason can be
            // logged without cloning it up front.
            if let Err(_unsent) = tx.send(reason) {
                #[cfg(feature = "tracing")]
                tracing::warn!(terminate_reason=?_unsent, "Internal Terminate channel closed while sending signal");
            }
        }
        None => {
            #[cfg(feature = "tracing")]
            tracing::warn!("Terminate signal already sent or channel missing");
        }
    }
}
/// Installs the oneshot sender used to signal task termination.
///
/// Any sender that was previously stored is replaced (and therefore dropped).
///
/// # Arguments
///
/// * `tx` - The oneshot sender for termination reasons
pub(crate) async fn set_internal_terminate_tx(&self, tx: oneshot::Sender<TaskTerminateReason>) {
    *self.internal_terminate_tx.lock().await = Some(tx);
}
/// Reads the ready flag of the task.
///
/// # Returns
///
/// * `true` - If the flag has been set via [`Self::set_ready_flag`]
/// * `false` - Otherwise (this is the initial value)
pub(crate) fn get_ready_flag(&self) -> bool {
    use std::sync::atomic::Ordering;

    let flag = &self.ready_flag;
    flag.load(Ordering::SeqCst)
}
/// Updates the ready flag of the task.
///
/// # Arguments
///
/// * `ready` - The new value of the flag
pub(crate) fn set_ready_flag(&self, ready: bool) {
    use std::sync::atomic::Ordering;

    let flag = &self.ready_flag;
    flag.store(ready, Ordering::SeqCst);
}
/// Returns a copy of the recorded stop reason, if any.
///
/// # Returns
///
/// * `Some(TaskStopReason)` - A clone of the reason stored by [`Self::set_stop_reason`]
/// * `None` - If no stop reason has been recorded
pub(crate) async fn get_stop_reason(&self) -> Option<TaskStopReason> {
    let guard = self.stop_reason.lock().await;
    (*guard).clone()
}
/// Records why the task is being stopped.
///
/// Overwrites any reason that was stored before.
///
/// # Arguments
///
/// * `reason` - The reason for stopping the task
pub(crate) async fn set_stop_reason(&self, reason: TaskStopReason) {
    *self.stop_reason.lock().await = Some(reason);
}
/// Returns the exit code of the task, if one is available.
///
/// An exit code is only reported when both conditions hold:
///
/// - the task state is `TaskState::Finished`, and
/// - a code has been stored through `set_exit_code(Some(..))` and not
///   cleared again by `set_exit_code(None)`.
///
/// # Returns
///
/// * `Some(i32)` - The stored exit code
/// * `None` - If the task is not finished or no exit code is currently stored
pub(crate) fn get_exit_code(&self) -> Option<i32> {
    use std::sync::atomic::Ordering::SeqCst;

    if self.get_task_state() != TaskState::Finished {
        return None;
    }
    // The value is only read when the "set" flag says it is valid.
    self.exit_code_set
        .load(SeqCst)
        .then(|| self.exit_code.load(SeqCst))
}
/// Stores or clears the exit code of the task.
///
/// With `Some(code)` the value is written first and then marked as valid.
/// With `None` the value is only marked as not set, so `get_exit_code` will
/// report `None` afterwards; the previously stored number is left in place
/// but ignored.
///
/// # Arguments
///
/// * `code` - The exit code to store, or `None` to mark it as unavailable
pub(crate) fn set_exit_code(&self, code: Option<i32>) {
    use std::sync::atomic::Ordering::SeqCst;

    // Write the value before flipping the flag so a reader that observes
    // the flag as `true` also observes the matching code.
    if let Some(value) = code {
        self.exit_code.store(value, SeqCst);
    }
    self.exit_code_set.store(code.is_some(), SeqCst);
}
/// Returns the recorded "running" timestamp.
///
/// # Returns
///
/// * `Some(SystemTime)` - The stored running timestamp
/// * `None` - If no running timestamp has been stored yet
pub(crate) fn get_running_at(&self) -> Option<SystemTime> {
    let slot = &self.running_at;
    Self::get_time(slot)
}
/// Stamps the "running" timestamp with the current time.
///
/// # Returns
///
/// The `SystemTime` that was recorded
pub(crate) fn set_running_at(&self) -> SystemTime {
    let slot = &self.running_at;
    Self::set_time(slot)
}
/// Returns the recorded "finished" timestamp.
///
/// # Returns
///
/// * `Some(SystemTime)` - The stored finished timestamp
/// * `None` - If no finished timestamp has been stored yet
pub(crate) fn get_finished_at(&self) -> Option<SystemTime> {
    let slot = &self.finished_at;
    Self::get_time(slot)
}
/// Stamps the "finished" timestamp with the current time.
///
/// # Returns
///
/// The `SystemTime` that was recorded
pub(crate) fn set_finished_at(&self) -> SystemTime {
    let slot = &self.finished_at;
    Self::set_time(slot)
}
/// Returns the time at which this context was created.
///
/// The value is rebuilt from the nanosecond count captured in `new`.
///
/// # Returns
///
/// The creation timestamp of the context
pub(crate) fn get_create_at(&self) -> SystemTime {
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    let since_epoch = Duration::from_nanos(self.created_at.load(Ordering::SeqCst));
    UNIX_EPOCH + since_epoch
}
/// Decodes a timestamp held in an atomic nanosecond counter.
///
/// The counter stores nanoseconds since the Unix epoch, with `0` reserved
/// as the "never set" marker.
///
/// # Arguments
///
/// * `store` - The atomic holding the nanosecond count
///
/// # Returns
///
/// * `Some(SystemTime)` - The decoded timestamp when the count is non-zero
/// * `None` - When the count is `0`
fn get_time(store: &AtomicU64) -> Option<SystemTime> {
    match store.load(std::sync::atomic::Ordering::SeqCst) {
        0 => None,
        nanos => Some(UNIX_EPOCH + std::time::Duration::from_nanos(nanos)),
    }
}
/// Writes the current system time into an atomic nanosecond counter.
///
/// The time is encoded as nanoseconds since the Unix epoch. A clock reading
/// earlier than the epoch is stored as `0`.
///
/// # Arguments
///
/// * `store` - The atomic to update
///
/// # Returns
///
/// The `SystemTime` that was sampled and stored
fn set_time(store: &AtomicU64) -> SystemTime {
    let stamp = SystemTime::now();
    let since_epoch = stamp.duration_since(UNIX_EPOCH).unwrap_or_default();
    store.store(
        since_epoch.as_nanos() as u64,
        std::sync::atomic::Ordering::SeqCst,
    );
    stamp
}
/// Returns the stored process ID, if one has been set.
///
/// The ID is kept in an atomic where `0` stands for "not set".
///
/// # Returns
///
/// * `Some(u32)` - The stored process ID
/// * `None` - If the stored value is `0`
pub(crate) fn get_process_id(&self) -> Option<u32> {
    use std::sync::atomic::Ordering;

    match self.process_id.load(Ordering::SeqCst) {
        0 => None,
        pid => Some(pid),
    }
}
/// Stores the process ID for the task.
///
/// Storing `0` is equivalent to clearing the ID, because `get_process_id`
/// treats `0` as "not set".
///
/// # Arguments
///
/// * `pid` - The process ID to store
pub(crate) fn set_process_id(&self, pid: u32) {
    use std::sync::atomic::Ordering;

    let slot = &self.process_id;
    slot.store(pid, Ordering::SeqCst);
}
/// Returns the current state of the task.
///
/// The state is stored as a raw `u8` and converted back to `TaskState`
/// on every read.
///
/// # Returns
///
/// The current `TaskState` of the task
pub(crate) fn get_task_state(&self) -> TaskState {
    use std::sync::atomic::Ordering;

    let raw = self.task_state.load(Ordering::SeqCst);
    raw.into()
}
/// Sets the task state and returns the current timestamp.
///
/// Updates the task's execution state and records when the change occurred.
///
/// # Arguments
///
/// * `new_state` - The new state to set
///
/// # Returns
///
/// The timestamp when the state change occurred
pub(crate) fn set_task_state(&self, new_state: TaskState) -> SystemTime {
self.task_state
.store(new_state as u8, std::sync::atomic::Ordering::SeqCst);
let now = SystemTime::now();
let nanos = now
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
match new_state {
TaskState::Running => {
self.running_at
.store(nanos, std::sync::atomic::Ordering::SeqCst);
}
TaskState::Finished => {
self.finished_at
.store(nanos, std::sync::atomic::Ordering::SeqCst);
}
_ => {}
}
now
}
/// Returns the current state of the process.
///
/// The state is stored as a raw `u8` and converted back to `ProcessState`
/// on every read.
///
/// # Returns
///
/// The current `ProcessState` of the process
pub(crate) fn get_process_state(&self) -> crate::tasks::state::ProcessState {
    use std::sync::atomic::Ordering;

    let raw = self.process_state.load(Ordering::SeqCst);
    raw.into()
}
/// Stores a new process state.
///
/// The state is persisted as its `u8` discriminant.
///
/// # Arguments
///
/// * `new_state` - The process state to store
pub(crate) fn set_process_state(&self, new_state: crate::tasks::state::ProcessState) {
    use std::sync::atomic::Ordering;

    let raw = new_state as u8;
    self.process_state.store(raw, Ordering::SeqCst);
}
/// Sends a task event through the event channel.
///
/// Attempts to send a [`TaskEvent`] through the event channel if it exists.
///
/// # Arguments
///
/// * `event` - The [`TaskEvent`] to send through the event channel.
///
/// # Returns
///
/// * `Ok(())` if the event was sent successfully.
/// * `Err(TaskError::Channel)` if the event channel is missing or closed.
///
/// # Behavior
///
/// - If the event channel exists, the event is sent asynchronously.
/// - If the event channel is missing, returns a [`TaskError::Channel`] and logs a warning (if the `tracing` feature is enabled).
/// - If the event channel is closed, returns a [`TaskError::Channel`] and logs a warning (if the `tracing` feature is enabled).
///
/// # Tracing
///
/// If the `tracing` feature is enabled, warnings are logged for missing or closed event channels, including the event attempted to be sent.
///
pub(crate) async fn send_event(&self, event: TaskEvent) -> Result<(), TaskError> {
let tx = self.get_event_tx().await;
let tx = match tx {
Some(tx) => tx,
None => {
let msg = format!("Event channel missing when sending event: {:?}", event);
#[cfg(feature = "tracing")]
tracing::warn!("{}", msg.clone());
return Err(TaskError::Channel(msg));
}
};
let envelope = TaskEventEnvelope {
id: self.config.task_id.clone(),
event,
};
match tx.send(envelope.clone()).await {
Ok(_) => Ok(()),
Err(_) => {
let msg = format!("Event channel closed when sending event: {:?}", envelope);
#[cfg(feature = "tracing")]
tracing::warn!("{}", msg.clone());
Err(TaskError::Channel(msg))
}
}
}
/// Gets a clone of the current event channel sender.
///
/// # Returns
///
/// * `Some(mpsc::Sender<TaskEventEnvelope>)` - A clone of the event channel sender if it exists.
/// * `None` - If the event channel has been cleared.
pub(crate) async fn get_event_tx(&self) -> Option<mpsc::Sender<TaskEventEnvelope>> {
    // Cloning the `Option` clones the sender inside it; the lock is only
    // held for the duration of this statement.
    self.event_tx.lock().await.clone()
}
/// Removes the event channel sender from the context.
///
/// Afterwards the stored sender is `None`, so `get_event_tx` returns `None`
/// and `send_event` fails with a "channel missing" error. Clones of the
/// sender that were handed out earlier are not affected.
pub(crate) async fn clear_event_tx(&self) {
    let mut slot = self.event_tx.lock().await;
    // Drop the stored sender (if any) while the lock is still held.
    drop(slot.take());
}
/// Configures whether the event channel should be dropped when the task finishes.
///
/// This only stores the flag; the flag starts out as `true` when the context
/// is created.
///
/// # Arguments
///
/// * `drop` - `true` to request that the event sender (`event_tx`) is dropped
///   once the task finishes, `false` to keep it (for example when the channel
///   is shared and its lifetime is managed externally).
// NOTE(review): the code that acts on this flag is not part of this impl block —
// confirm where it is consumed.
pub(crate) fn set_drop_event_tx_on_finished(&self, drop: bool) {
    use std::sync::atomic::Ordering;

    let flag = &self.drop_event_tx_on_finished;
    flag.store(drop, Ordering::SeqCst);
}
}