rust_supervisor/task/context.rs
//! Task execution context.
//!
//! This module provides the handles that a task uses, for a single child
//! start, to observe cancellation, emit heartbeats, and report readiness.
5
6use crate::id::types::{ChildId, ChildStartCount, Generation, SupervisorPath};
7use crate::readiness::signal::{ReadinessState, ReadySignal};
8use tokio::sync::watch;
9use tokio::time::Instant;
10use tokio_util::sync::CancellationToken;
11
12/// Context passed to a task for a single child_start_count.
13#[derive(Debug, Clone)]
14pub struct TaskContext {
15 /// Child task identifier
16 pub child_id: ChildId,
17 /// Full path of the child task in the supervisor tree.
18 pub path: SupervisorPath,
19 /// Generation for the runtime slot that owns this child_start_count.
20 pub generation: Generation,
21 /// ChildStartCount number for the running task.
22 pub child_start_count: ChildStartCount,
23 /// Token that tells the task when cancellation was requested.
24 cancellation_token: CancellationToken,
25 /// Sender used to report readiness.
26 ready_signal: ReadySignal,
27 /// Sender used to publish the latest heartbeat time.
28 heartbeat_sender: watch::Sender<Option<Instant>>,
29}
30
31impl TaskContext {
32 /// Creates a context for a child task.
33 ///
34 /// # Arguments
35 ///
36 /// - `child_id`: Stable child identifier.
37 /// - `path`: Full supervisor tree path for this child.
38 /// - `generation`: Runtime slot generation.
39 /// - `child_start_count`: ChildStartCount number for this execution.
40 ///
41 /// # Returns
42 ///
43 /// Returns the context and a heartbeat receiver for runtime observers.
44 ///
45 /// # Examples
46 ///
47 /// ```
48 /// let (ctx, _heartbeat) = rust_supervisor::task::context::TaskContext::new(
49 /// rust_supervisor::id::types::ChildId::new("worker"),
50 /// rust_supervisor::id::types::SupervisorPath::root().join("worker"),
51 /// rust_supervisor::id::types::Generation::initial(),
52 /// rust_supervisor::id::types::ChildStartCount::first(),
53 /// );
54 /// assert!(!ctx.is_cancelled());
55 /// ```
56 pub fn new(
57 child_id: ChildId,
58 path: SupervisorPath,
59 generation: Generation,
60 child_start_count: ChildStartCount,
61 ) -> (Self, watch::Receiver<Option<Instant>>) {
62 let (ready_signal, _ready_receiver) = ReadySignal::new();
63 let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
64 (
65 Self {
66 child_id,
67 path,
68 generation,
69 child_start_count,
70 cancellation_token: CancellationToken::new(),
71 ready_signal,
72 heartbeat_sender,
73 },
74 heartbeat_receiver,
75 )
76 }
77
78 /// Creates a task context with an existing readiness signal.
79 ///
80 /// # Arguments
81 ///
82 /// - `child_id`: Stable child identifier.
83 /// - `path`: Full supervisor tree path for this child.
84 /// - `generation`: Runtime slot generation.
85 /// - `child_start_count`: ChildStartCount number for this execution.
86 /// - `ready_signal`: Signal used to publish readiness.
87 ///
88 /// # Returns
89 ///
90 /// Returns the context and a heartbeat receiver for runtime observers.
91 pub fn with_ready_signal(
92 child_id: ChildId,
93 path: SupervisorPath,
94 generation: Generation,
95 child_start_count: ChildStartCount,
96 ready_signal: ReadySignal,
97 ) -> (Self, watch::Receiver<Option<Instant>>) {
98 Self::with_ready_signal_and_cancellation_token(
99 child_id,
100 path,
101 generation,
102 child_start_count,
103 ready_signal,
104 CancellationToken::new(),
105 )
106 }
107
108 /// Creates a task context with an existing readiness signal and token.
109 ///
110 /// # Arguments
111 ///
112 /// - `child_id`: Stable child identifier.
113 /// - `path`: Full supervisor tree path for this child.
114 /// - `generation`: Runtime slot generation.
115 /// - `child_start_count`: ChildStartCount number for this execution.
116 /// - `ready_signal`: Signal used to publish readiness.
117 /// - `cancellation_token`: Token shared with runtime shutdown.
118 ///
119 /// # Returns
120 ///
121 /// Returns the context and a heartbeat receiver for runtime observers.
122 pub fn with_ready_signal_and_cancellation_token(
123 child_id: ChildId,
124 path: SupervisorPath,
125 generation: Generation,
126 child_start_count: ChildStartCount,
127 ready_signal: ReadySignal,
128 cancellation_token: CancellationToken,
129 ) -> (Self, watch::Receiver<Option<Instant>>) {
130 let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
131 (
132 Self {
133 child_id,
134 path,
135 generation,
136 child_start_count,
137 cancellation_token,
138 ready_signal,
139 heartbeat_sender,
140 },
141 heartbeat_receiver,
142 )
143 }
144
145 /// Reports that the task is ready.
146 ///
147 /// # Arguments
148 ///
149 /// This function has no arguments.
150 ///
151 /// # Returns
152 ///
153 /// This function does not return a value.
154 pub fn mark_ready(&self) {
155 self.ready_signal.mark_ready();
156 }
157
158 /// Reports the current readiness state.
159 ///
160 /// # Arguments
161 ///
162 /// - `state`: Readiness state observed by runtime readers.
163 ///
164 /// # Returns
165 ///
166 /// This function does not return a value.
167 pub fn set_readiness(&self, state: ReadinessState) {
168 self.ready_signal.set_readiness(state);
169 }
170
171 /// Emits a heartbeat with the current monotonic time.
172 ///
173 /// # Arguments
174 ///
175 /// This function has no arguments.
176 ///
177 /// # Returns
178 ///
179 /// This function does not return a value.
180 pub fn heartbeat(&self) {
181 let _ignored = self.heartbeat_sender.send(Some(Instant::now()));
182 }
183
184 /// Requests cancellation for this task child_start_count.
185 ///
186 /// # Arguments
187 ///
188 /// This function has no arguments.
189 ///
190 /// # Returns
191 ///
192 /// This function does not return a value.
193 pub fn cancel(&self) {
194 self.cancellation_token.cancel();
195 }
196
197 /// Returns whether cancellation was requested.
198 ///
199 /// # Arguments
200 ///
201 /// This function has no arguments.
202 ///
203 /// # Returns
204 ///
205 /// Returns `true` when cancellation was requested.
206 pub fn is_cancelled(&self) -> bool {
207 self.cancellation_token.is_cancelled()
208 }
209
210 /// Returns a clone of the cancellation token.
211 ///
212 /// # Arguments
213 ///
214 /// This function has no arguments.
215 ///
216 /// # Returns
217 ///
218 /// Returns the cancellation token for asynchronous selection.
219 pub fn cancellation_token(&self) -> CancellationToken {
220 self.cancellation_token.clone()
221 }
222
223 /// Subscribes to readiness updates.
224 ///
225 /// # Arguments
226 ///
227 /// This function has no arguments.
228 ///
229 /// # Returns
230 ///
231 /// Returns a receiver that observes readiness changes.
232 pub fn readiness_receiver(&self) -> watch::Receiver<ReadinessState> {
233 self.ready_signal.subscribe()
234 }
235}