actionqueue_engine/scheduler/
attempt_finish.rs1use actionqueue_core::ids::{AttemptId, RunId};
8use actionqueue_core::mutation::{
9 AttemptFinishCommand, AttemptOutcome, DurabilityPolicy, MutationAuthority, MutationCommand,
10 MutationOutcome,
11};
12use actionqueue_executor_local::ExecutorResponse;
13
14pub fn map_executor_response_to_outcome(response: &ExecutorResponse) -> AttemptOutcome {
20 match response {
21 ExecutorResponse::Success { output } => match output {
22 Some(bytes) => AttemptOutcome::success_with_output(bytes.clone()),
23 None => AttemptOutcome::success(),
24 },
25 ExecutorResponse::RetryableFailure { error }
26 | ExecutorResponse::TerminalFailure { error } => AttemptOutcome::failure(error),
27 ExecutorResponse::Timeout { timeout_secs } => {
28 AttemptOutcome::timeout(format!("attempt timed out after {timeout_secs}s"))
29 }
30 ExecutorResponse::Suspended { output } => match output {
31 Some(bytes) => AttemptOutcome::suspended_with_output(bytes.clone()),
32 None => AttemptOutcome::suspended(),
33 },
34 }
35}
36
37pub fn build_attempt_finish_command(
39 sequence: u64,
40 run_id: RunId,
41 attempt_id: AttemptId,
42 response: &ExecutorResponse,
43 timestamp: u64,
44) -> AttemptFinishCommand {
45 AttemptFinishCommand::new(
46 sequence,
47 run_id,
48 attempt_id,
49 map_executor_response_to_outcome(response),
50 timestamp,
51 )
52}
53
54#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum AttemptFinishSubmissionError<AuthorityError> {
57 Authority {
59 run_id: RunId,
61 source: AuthorityError,
63 },
64}
65
66impl<E: std::fmt::Display> std::fmt::Display for AttemptFinishSubmissionError<E> {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 AttemptFinishSubmissionError::Authority { run_id, source } => {
70 write!(f, "attempt-finish authority error for run {run_id}: {source}")
71 }
72 }
73 }
74}
75
76impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for AttemptFinishSubmissionError<E> {}
77
78impl<E> AttemptFinishSubmissionError<E> {
79 pub fn into_source(self) -> E {
81 match self {
82 Self::Authority { source, .. } => source,
83 }
84 }
85}
86
87pub fn submit_attempt_finish_via_authority<A: MutationAuthority>(
93 finish_command: AttemptFinishCommand,
94 durability: DurabilityPolicy,
95 authority: &mut A,
96) -> Result<MutationOutcome, AttemptFinishSubmissionError<A::Error>> {
97 let run_id = finish_command.run_id();
98 let command = MutationCommand::AttemptFinish(finish_command);
99
100 authority
101 .submit_command(command, durability)
102 .map_err(|source| AttemptFinishSubmissionError::Authority { run_id, source })
103}
104
105#[cfg(test)]
106mod tests {
107 use actionqueue_core::ids::{AttemptId, RunId};
108 use actionqueue_core::mutation::{
109 AppliedMutation, AttemptResultKind, DurabilityPolicy, MutationAuthority, MutationCommand,
110 MutationOutcome,
111 };
112 use actionqueue_executor_local::ExecutorResponse;
113
114 use super::{
115 build_attempt_finish_command, map_executor_response_to_outcome,
116 submit_attempt_finish_via_authority,
117 };
118
119 #[derive(Debug, Default)]
120 struct MockAuthority {
121 submitted: Vec<MutationCommand>,
122 }
123
124 impl MutationAuthority for MockAuthority {
125 type Error = &'static str;
126
127 fn submit_command(
128 &mut self,
129 command: MutationCommand,
130 _durability: DurabilityPolicy,
131 ) -> Result<MutationOutcome, Self::Error> {
132 self.submitted.push(command.clone());
133 match command {
134 MutationCommand::AttemptFinish(details) => Ok(MutationOutcome::new(
135 details.sequence(),
136 AppliedMutation::AttemptFinish {
137 run_id: details.run_id(),
138 attempt_id: details.attempt_id(),
139 outcome: details.outcome().clone(),
140 },
141 )),
142 _ => Err("unexpected command"),
143 }
144 }
145 }
146
147 #[test]
148 fn timeout_response_maps_to_timeout_result_kind() {
149 let response = ExecutorResponse::Timeout { timeout_secs: 5 };
150 assert_eq!(
151 map_executor_response_to_outcome(&response).result(),
152 AttemptResultKind::Timeout
153 );
154 }
155
156 #[test]
157 fn failure_responses_map_to_failure_result_kind() {
158 let retryable = ExecutorResponse::RetryableFailure { error: "retryable".to_string() };
159 let terminal = ExecutorResponse::TerminalFailure { error: "terminal".to_string() };
160
161 assert_eq!(
162 map_executor_response_to_outcome(&retryable).result(),
163 AttemptResultKind::Failure
164 );
165 assert_eq!(
166 map_executor_response_to_outcome(&terminal).result(),
167 AttemptResultKind::Failure
168 );
169 }
170
171 #[test]
172 fn success_response_maps_to_success_result_kind() {
173 let response = ExecutorResponse::Success { output: Some(vec![1, 2, 3]) };
174 assert_eq!(
175 map_executor_response_to_outcome(&response).result(),
176 AttemptResultKind::Success
177 );
178 }
179
180 #[test]
181 fn build_attempt_finish_command_populates_result_and_error_from_response() {
182 let run_id = RunId::new();
183 let attempt_id = AttemptId::new();
184 let response = ExecutorResponse::Timeout { timeout_secs: 9 };
185
186 let command = build_attempt_finish_command(11, run_id, attempt_id, &response, 1_234);
187
188 assert_eq!(command.sequence(), 11);
189 assert_eq!(command.run_id(), run_id);
190 assert_eq!(command.attempt_id(), attempt_id);
191 assert_eq!(command.result(), AttemptResultKind::Timeout);
192 assert_eq!(command.error(), Some("attempt timed out after 9s"));
193 assert_eq!(command.timestamp(), 1_234);
194 }
195
196 #[test]
197 fn submit_attempt_finish_via_authority_maps_timeout_and_submits_canonical_command() {
198 let run_id = RunId::new();
199 let attempt_id = AttemptId::new();
200 let response = ExecutorResponse::Timeout { timeout_secs: 5 };
201 let mut authority = MockAuthority::default();
202
203 let finish_cmd = build_attempt_finish_command(17, run_id, attempt_id, &response, 1_700);
204 let outcome = submit_attempt_finish_via_authority(
205 finish_cmd,
206 DurabilityPolicy::Immediate,
207 &mut authority,
208 )
209 .expect("authority submission should succeed");
210
211 assert_eq!(outcome.sequence(), 17);
212 assert!(matches!(
213 outcome.applied(),
214 AppliedMutation::AttemptFinish {
215 run_id: applied_run_id,
216 attempt_id: applied_attempt_id,
217 outcome: ref o,
218 } if *applied_run_id == run_id
219 && *applied_attempt_id == attempt_id
220 && o.result() == AttemptResultKind::Timeout
221 && o.error() == Some("attempt timed out after 5s")
222 ));
223
224 assert_eq!(authority.submitted.len(), 1);
225 assert!(matches!(
226 &authority.submitted[0],
227 MutationCommand::AttemptFinish(details)
228 if details.sequence() == 17
229 && details.run_id() == run_id
230 && details.attempt_id() == attempt_id
231 && details.result() == AttemptResultKind::Timeout
232 && details.error() == Some("attempt timed out after 5s")
233 && details.timestamp() == 1_700
234 ));
235 }
236
237 #[test]
238 fn success_with_output_maps_to_outcome_with_output() {
239 let response = ExecutorResponse::Success { output: Some(b"data".to_vec()) };
240 let outcome = map_executor_response_to_outcome(&response);
241 assert_eq!(outcome.result(), AttemptResultKind::Success);
242 assert_eq!(outcome.output(), Some(b"data".as_slice()));
243 }
244
245 #[test]
246 fn success_without_output_maps_to_outcome_without_output() {
247 let response = ExecutorResponse::Success { output: None };
248 let outcome = map_executor_response_to_outcome(&response);
249 assert_eq!(outcome.result(), AttemptResultKind::Success);
250 assert!(outcome.output().is_none());
251 }
252
253 #[test]
254 fn failure_maps_to_outcome_without_output() {
255 let response = ExecutorResponse::RetryableFailure { error: "err".to_string() };
256 let outcome = map_executor_response_to_outcome(&response);
257 assert_eq!(outcome.result(), AttemptResultKind::Failure);
258 assert!(outcome.output().is_none());
259 }
260
261 #[test]
262 fn submit_attempt_finish_via_authority_maps_failure_to_failure_taxonomy() {
263 let run_id = RunId::new();
264 let attempt_id = AttemptId::new();
265 let response =
266 ExecutorResponse::RetryableFailure { error: "transient network failure".to_string() };
267 let mut authority = MockAuthority::default();
268
269 let finish_cmd = build_attempt_finish_command(21, run_id, attempt_id, &response, 2_100);
270 let outcome = submit_attempt_finish_via_authority(
271 finish_cmd,
272 DurabilityPolicy::Immediate,
273 &mut authority,
274 )
275 .expect("authority submission should succeed");
276
277 assert!(matches!(
278 outcome.applied(),
279 AppliedMutation::AttemptFinish {
280 run_id: applied_run_id,
281 attempt_id: applied_attempt_id,
282 outcome: ref o,
283 } if *applied_run_id == run_id
284 && *applied_attempt_id == attempt_id
285 && o.result() == AttemptResultKind::Failure
286 && o.error() == Some("transient network failure")
287 ));
288 }
289}