rust_supervisor/task/context.rs
1//! Task execution context.
2//!
3//! This module provides the per-attempt handles that a task uses to observe
4//! cancellation, emit heartbeats, and report readiness.
5
6use crate::id::types::{Attempt, ChildId, Generation, SupervisorPath};
7use crate::readiness::signal::ReadySignal;
8use tokio::sync::watch;
9use tokio::time::Instant;
10use tokio_util::sync::CancellationToken;
11
12/// Context passed to a task for a single attempt.
13#[derive(Debug, Clone)]
14pub struct TaskContext {
15 /// Stable child identifier for the task attempt.
16 pub child_id: ChildId,
17 /// Full path of the child in the supervisor tree.
18 pub path: SupervisorPath,
19 /// Generation for the runtime slot that owns this attempt.
20 pub generation: Generation,
21 /// Attempt number for the running task.
22 pub attempt: Attempt,
23 /// Token that tells the task when cancellation was requested.
24 cancellation_token: CancellationToken,
25 /// Sender used to report readiness.
26 ready_signal: ReadySignal,
27 /// Sender used to publish the latest heartbeat time.
28 heartbeat_sender: watch::Sender<Option<Instant>>,
29}
30
31impl TaskContext {
32 /// Creates a task context for a child attempt.
33 ///
34 /// # Arguments
35 ///
36 /// - `child_id`: Stable child identifier.
37 /// - `path`: Full supervisor tree path for this child.
38 /// - `generation`: Runtime slot generation.
39 /// - `attempt`: Attempt number for this execution.
40 ///
41 /// # Returns
42 ///
43 /// Returns the context and a heartbeat receiver for runtime observers.
44 ///
45 /// # Examples
46 ///
47 /// ```
48 /// let (ctx, _heartbeat) = rust_supervisor::task::context::TaskContext::new(
49 /// rust_supervisor::id::types::ChildId::new("worker"),
50 /// rust_supervisor::id::types::SupervisorPath::root().join("worker"),
51 /// rust_supervisor::id::types::Generation::initial(),
52 /// rust_supervisor::id::types::Attempt::first(),
53 /// );
54 /// assert!(!ctx.is_cancelled());
55 /// ```
56 pub fn new(
57 child_id: ChildId,
58 path: SupervisorPath,
59 generation: Generation,
60 attempt: Attempt,
61 ) -> (Self, watch::Receiver<Option<Instant>>) {
62 let (ready_signal, _ready_receiver) = ReadySignal::new();
63 let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
64 (
65 Self {
66 child_id,
67 path,
68 generation,
69 attempt,
70 cancellation_token: CancellationToken::new(),
71 ready_signal,
72 heartbeat_sender,
73 },
74 heartbeat_receiver,
75 )
76 }
77
78 /// Creates a task context with an existing readiness signal.
79 ///
80 /// # Arguments
81 ///
82 /// - `child_id`: Stable child identifier.
83 /// - `path`: Full supervisor tree path for this child.
84 /// - `generation`: Runtime slot generation.
85 /// - `attempt`: Attempt number for this execution.
86 /// - `ready_signal`: Signal used to publish readiness.
87 ///
88 /// # Returns
89 ///
90 /// Returns the context and a heartbeat receiver for runtime observers.
91 pub fn with_ready_signal(
92 child_id: ChildId,
93 path: SupervisorPath,
94 generation: Generation,
95 attempt: Attempt,
96 ready_signal: ReadySignal,
97 ) -> (Self, watch::Receiver<Option<Instant>>) {
98 let (heartbeat_sender, heartbeat_receiver) = watch::channel(None);
99 (
100 Self {
101 child_id,
102 path,
103 generation,
104 attempt,
105 cancellation_token: CancellationToken::new(),
106 ready_signal,
107 heartbeat_sender,
108 },
109 heartbeat_receiver,
110 )
111 }
112
113 /// Reports that the task is ready.
114 ///
115 /// # Arguments
116 ///
117 /// This function has no arguments.
118 ///
119 /// # Returns
120 ///
121 /// This function does not return a value.
122 pub fn mark_ready(&self) {
123 self.ready_signal.mark_ready();
124 }
125
126 /// Emits a heartbeat with the current monotonic time.
127 ///
128 /// # Arguments
129 ///
130 /// This function has no arguments.
131 ///
132 /// # Returns
133 ///
134 /// This function does not return a value.
135 pub fn heartbeat(&self) {
136 let _ignored = self.heartbeat_sender.send(Some(Instant::now()));
137 }
138
139 /// Requests cancellation for this task attempt.
140 ///
141 /// # Arguments
142 ///
143 /// This function has no arguments.
144 ///
145 /// # Returns
146 ///
147 /// This function does not return a value.
148 pub fn cancel(&self) {
149 self.cancellation_token.cancel();
150 }
151
152 /// Returns whether cancellation was requested.
153 ///
154 /// # Arguments
155 ///
156 /// This function has no arguments.
157 ///
158 /// # Returns
159 ///
160 /// Returns `true` when cancellation was requested.
161 pub fn is_cancelled(&self) -> bool {
162 self.cancellation_token.is_cancelled()
163 }
164
165 /// Returns a clone of the cancellation token.
166 ///
167 /// # Arguments
168 ///
169 /// This function has no arguments.
170 ///
171 /// # Returns
172 ///
173 /// Returns the cancellation token for asynchronous selection.
174 pub fn cancellation_token(&self) -> CancellationToken {
175 self.cancellation_token.clone()
176 }
177
178 /// Subscribes to readiness updates.
179 ///
180 /// # Arguments
181 ///
182 /// This function has no arguments.
183 ///
184 /// # Returns
185 ///
186 /// Returns a receiver that observes readiness changes.
187 pub fn readiness_receiver(&self) -> watch::Receiver<bool> {
188 self.ready_signal.subscribe()
189 }
190}