Skip to main content

rust_supervisor/task/
context.rs

1//! Task execution context.
2//!
3//! This module provides the per-attempt handles that a task uses to observe
4//! cancellation, emit heartbeats, and report readiness.
5
6use crate::id::types::{Attempt, ChildId, Generation, SupervisorPath};
7use crate::readiness::signal::ReadySignal;
8use tokio::sync::watch;
9use tokio::time::Instant;
10use tokio_util::sync::CancellationToken;
11
12/// Context passed to a task for a single attempt.
13#[derive(Debug, Clone)]
14pub struct TaskContext {
15    /// Stable child identifier for the task attempt.
16    pub child_id: ChildId,
17    /// Full path of the child in the supervisor tree.
18    pub path: SupervisorPath,
19    /// Generation for the runtime slot that owns this attempt.
20    pub generation: Generation,
21    /// Attempt number for the running task.
22    pub attempt: Attempt,
23    /// Token that tells the task when cancellation was requested.
24    cancellation_token: CancellationToken,
25    /// Sender used to report readiness.
26    ready_signal: ReadySignal,
27    /// Sender used to publish the latest heartbeat time.
28    heartbeat_sender: watch::Sender<Option<Instant>>,
29}
30
31impl TaskContext {
32    /// Creates a task context for a child attempt.
33    ///
34    /// # Arguments
35    ///
36    /// - `child_id`: Stable child identifier.
37    /// - `path`: Full supervisor tree path for this child.
38    /// - `generation`: Runtime slot generation.
39    /// - `attempt`: Attempt number for this execution.
40    ///
41    /// # Returns
42    ///
43    /// Returns the context and a heartbeat receiver for runtime observers.
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// let (ctx, _heartbeat) = rust_supervisor::task::context::TaskContext::new(
49    ///     rust_supervisor::id::types::ChildId::new("worker"),
50    ///     rust_supervisor::id::types::SupervisorPath::root().join("worker"),
51    ///     rust_supervisor::id::types::Generation::initial(),
52    ///     rust_supervisor::id::types::Attempt::first(),
53    /// );
54    /// assert!(!ctx.is_cancelled());
55    /// ```
56    pub fn new(
57        child_id: ChildId,
58        path: SupervisorPath,
59        generation: Generation,
60        attempt: Attempt,
61    ) -> (Self, watch::Receiver<Option<Instant>>) {
62        let (ready_signal, _ready_receiver) = ReadySignal::new();
63        let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
64        (
65            Self {
66                child_id,
67                path,
68                generation,
69                attempt,
70                cancellation_token: CancellationToken::new(),
71                ready_signal,
72                heartbeat_sender,
73            },
74            heartbeat_receiver,
75        )
76    }
77
78    /// Creates a task context with an existing readiness signal.
79    ///
80    /// # Arguments
81    ///
82    /// - `child_id`: Stable child identifier.
83    /// - `path`: Full supervisor tree path for this child.
84    /// - `generation`: Runtime slot generation.
85    /// - `attempt`: Attempt number for this execution.
86    /// - `ready_signal`: Signal used to publish readiness.
87    ///
88    /// # Returns
89    ///
90    /// Returns the context and a heartbeat receiver for runtime observers.
91    pub fn with_ready_signal(
92        child_id: ChildId,
93        path: SupervisorPath,
94        generation: Generation,
95        attempt: Attempt,
96        ready_signal: ReadySignal,
97    ) -> (Self, watch::Receiver<Option<Instant>>) {
98        let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
99        (
100            Self {
101                child_id,
102                path,
103                generation,
104                attempt,
105                cancellation_token: CancellationToken::new(),
106                ready_signal,
107                heartbeat_sender,
108            },
109            heartbeat_receiver,
110        )
111    }
112
113    /// Reports that the task is ready.
114    ///
115    /// # Arguments
116    ///
117    /// This function has no arguments.
118    ///
119    /// # Returns
120    ///
121    /// This function does not return a value.
122    pub fn mark_ready(&self) {
123        self.ready_signal.mark_ready();
124    }
125
126    /// Emits a heartbeat with the current monotonic time.
127    ///
128    /// # Arguments
129    ///
130    /// This function has no arguments.
131    ///
132    /// # Returns
133    ///
134    /// This function does not return a value.
135    pub fn heartbeat(&self) {
136        let _ignored = self.heartbeat_sender.send(Some(Instant::now()));
137    }
138
139    /// Requests cancellation for this task attempt.
140    ///
141    /// # Arguments
142    ///
143    /// This function has no arguments.
144    ///
145    /// # Returns
146    ///
147    /// This function does not return a value.
148    pub fn cancel(&self) {
149        self.cancellation_token.cancel();
150    }
151
152    /// Returns whether cancellation was requested.
153    ///
154    /// # Arguments
155    ///
156    /// This function has no arguments.
157    ///
158    /// # Returns
159    ///
160    /// Returns `true` when cancellation was requested.
161    pub fn is_cancelled(&self) -> bool {
162        self.cancellation_token.is_cancelled()
163    }
164
165    /// Returns a clone of the cancellation token.
166    ///
167    /// # Arguments
168    ///
169    /// This function has no arguments.
170    ///
171    /// # Returns
172    ///
173    /// Returns the cancellation token for asynchronous selection.
174    pub fn cancellation_token(&self) -> CancellationToken {
175        self.cancellation_token.clone()
176    }
177
178    /// Subscribes to readiness updates.
179    ///
180    /// # Arguments
181    ///
182    /// This function has no arguments.
183    ///
184    /// # Returns
185    ///
186    /// Returns a receiver that observes readiness changes.
187    pub fn readiness_receiver(&self) -> watch::Receiver<bool> {
188        self.ready_signal.subscribe()
189    }
190}