Skip to main content

actionqueue_engine/scheduler/
attempt_finish.rs

1//! Attempt-finish mutation command derivation from executor outcomes.
2//!
3//! This module defines the engine-side integration seam that maps executor-local
4//! terminal responses into canonical durable attempt result taxonomy used by
5//! storage mutation authority payloads.
6
7use actionqueue_core::ids::{AttemptId, RunId};
8use actionqueue_core::mutation::{
9    AttemptFinishCommand, AttemptOutcome, DurabilityPolicy, MutationAuthority, MutationCommand,
10    MutationOutcome,
11};
12use actionqueue_executor_local::ExecutorResponse;
13
14/// Maps an executor terminal response into a canonical attempt outcome.
15///
16/// For successful responses, the handler's opaque output bytes (if any) are
17/// threaded through to the `AttemptOutcome` and from there into the WAL and
18/// projection, making them queryable from the run's attempt history.
19pub fn map_executor_response_to_outcome(response: &ExecutorResponse) -> AttemptOutcome {
20    match response {
21        ExecutorResponse::Success { output } => match output {
22            Some(bytes) => AttemptOutcome::success_with_output(bytes.clone()),
23            None => AttemptOutcome::success(),
24        },
25        ExecutorResponse::RetryableFailure { error }
26        | ExecutorResponse::TerminalFailure { error } => AttemptOutcome::failure(error),
27        ExecutorResponse::Timeout { timeout_secs } => {
28            AttemptOutcome::timeout(format!("attempt timed out after {timeout_secs}s"))
29        }
30        ExecutorResponse::Suspended { output } => match output {
31            Some(bytes) => AttemptOutcome::suspended_with_output(bytes.clone()),
32            None => AttemptOutcome::suspended(),
33        },
34    }
35}
36
37/// Builds an attempt-finish command from executor response truth.
38pub fn build_attempt_finish_command(
39    sequence: u64,
40    run_id: RunId,
41    attempt_id: AttemptId,
42    response: &ExecutorResponse,
43    timestamp: u64,
44) -> AttemptFinishCommand {
45    AttemptFinishCommand::new(
46        sequence,
47        run_id,
48        attempt_id,
49        map_executor_response_to_outcome(response),
50        timestamp,
51    )
52}
53
54/// Error returned when authority-mediated attempt-finish submission fails.
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub enum AttemptFinishSubmissionError<AuthorityError> {
57    /// Storage authority rejected or failed processing attempt-finish command.
58    Authority {
59        /// Run whose attempt-finish submission failed.
60        run_id: RunId,
61        /// Underlying authority error.
62        source: AuthorityError,
63    },
64}
65
66impl<E: std::fmt::Display> std::fmt::Display for AttemptFinishSubmissionError<E> {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            AttemptFinishSubmissionError::Authority { run_id, source } => {
70                write!(f, "attempt-finish authority error for run {run_id}: {source}")
71            }
72        }
73    }
74}
75
76impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for AttemptFinishSubmissionError<E> {}
77
78impl<E> AttemptFinishSubmissionError<E> {
79    /// Extracts the inner authority error, discarding wrapper context.
80    pub fn into_source(self) -> E {
81        match self {
82            Self::Authority { source, .. } => source,
83        }
84    }
85}
86
87/// Builds and submits an attempt-finish command through the mutation authority lane.
88///
89/// This is the engine-side integration seam that takes a pre-built attempt-finish
90/// command and emits it as `MutationCommand::AttemptFinish` through storage-owned
91/// authority.
92pub fn submit_attempt_finish_via_authority<A: MutationAuthority>(
93    finish_command: AttemptFinishCommand,
94    durability: DurabilityPolicy,
95    authority: &mut A,
96) -> Result<MutationOutcome, AttemptFinishSubmissionError<A::Error>> {
97    let run_id = finish_command.run_id();
98    let command = MutationCommand::AttemptFinish(finish_command);
99
100    authority
101        .submit_command(command, durability)
102        .map_err(|source| AttemptFinishSubmissionError::Authority { run_id, source })
103}
104
105#[cfg(test)]
106mod tests {
107    use actionqueue_core::ids::{AttemptId, RunId};
108    use actionqueue_core::mutation::{
109        AppliedMutation, AttemptResultKind, DurabilityPolicy, MutationAuthority, MutationCommand,
110        MutationOutcome,
111    };
112    use actionqueue_executor_local::ExecutorResponse;
113
114    use super::{
115        build_attempt_finish_command, map_executor_response_to_outcome,
116        submit_attempt_finish_via_authority,
117    };
118
119    #[derive(Debug, Default)]
120    struct MockAuthority {
121        submitted: Vec<MutationCommand>,
122    }
123
124    impl MutationAuthority for MockAuthority {
125        type Error = &'static str;
126
127        fn submit_command(
128            &mut self,
129            command: MutationCommand,
130            _durability: DurabilityPolicy,
131        ) -> Result<MutationOutcome, Self::Error> {
132            self.submitted.push(command.clone());
133            match command {
134                MutationCommand::AttemptFinish(details) => Ok(MutationOutcome::new(
135                    details.sequence(),
136                    AppliedMutation::AttemptFinish {
137                        run_id: details.run_id(),
138                        attempt_id: details.attempt_id(),
139                        outcome: details.outcome().clone(),
140                    },
141                )),
142                _ => Err("unexpected command"),
143            }
144        }
145    }
146
147    #[test]
148    fn timeout_response_maps_to_timeout_result_kind() {
149        let response = ExecutorResponse::Timeout { timeout_secs: 5 };
150        assert_eq!(
151            map_executor_response_to_outcome(&response).result(),
152            AttemptResultKind::Timeout
153        );
154    }
155
156    #[test]
157    fn failure_responses_map_to_failure_result_kind() {
158        let retryable = ExecutorResponse::RetryableFailure { error: "retryable".to_string() };
159        let terminal = ExecutorResponse::TerminalFailure { error: "terminal".to_string() };
160
161        assert_eq!(
162            map_executor_response_to_outcome(&retryable).result(),
163            AttemptResultKind::Failure
164        );
165        assert_eq!(
166            map_executor_response_to_outcome(&terminal).result(),
167            AttemptResultKind::Failure
168        );
169    }
170
171    #[test]
172    fn success_response_maps_to_success_result_kind() {
173        let response = ExecutorResponse::Success { output: Some(vec![1, 2, 3]) };
174        assert_eq!(
175            map_executor_response_to_outcome(&response).result(),
176            AttemptResultKind::Success
177        );
178    }
179
180    #[test]
181    fn build_attempt_finish_command_populates_result_and_error_from_response() {
182        let run_id = RunId::new();
183        let attempt_id = AttemptId::new();
184        let response = ExecutorResponse::Timeout { timeout_secs: 9 };
185
186        let command = build_attempt_finish_command(11, run_id, attempt_id, &response, 1_234);
187
188        assert_eq!(command.sequence(), 11);
189        assert_eq!(command.run_id(), run_id);
190        assert_eq!(command.attempt_id(), attempt_id);
191        assert_eq!(command.result(), AttemptResultKind::Timeout);
192        assert_eq!(command.error(), Some("attempt timed out after 9s"));
193        assert_eq!(command.timestamp(), 1_234);
194    }
195
196    #[test]
197    fn submit_attempt_finish_via_authority_maps_timeout_and_submits_canonical_command() {
198        let run_id = RunId::new();
199        let attempt_id = AttemptId::new();
200        let response = ExecutorResponse::Timeout { timeout_secs: 5 };
201        let mut authority = MockAuthority::default();
202
203        let finish_cmd = build_attempt_finish_command(17, run_id, attempt_id, &response, 1_700);
204        let outcome = submit_attempt_finish_via_authority(
205            finish_cmd,
206            DurabilityPolicy::Immediate,
207            &mut authority,
208        )
209        .expect("authority submission should succeed");
210
211        assert_eq!(outcome.sequence(), 17);
212        assert!(matches!(
213            outcome.applied(),
214            AppliedMutation::AttemptFinish {
215                run_id: applied_run_id,
216                attempt_id: applied_attempt_id,
217                outcome: ref o,
218            } if *applied_run_id == run_id
219                && *applied_attempt_id == attempt_id
220                && o.result() == AttemptResultKind::Timeout
221                && o.error() == Some("attempt timed out after 5s")
222        ));
223
224        assert_eq!(authority.submitted.len(), 1);
225        assert!(matches!(
226            &authority.submitted[0],
227            MutationCommand::AttemptFinish(details)
228                if details.sequence() == 17
229                    && details.run_id() == run_id
230                    && details.attempt_id() == attempt_id
231                    && details.result() == AttemptResultKind::Timeout
232                    && details.error() == Some("attempt timed out after 5s")
233                    && details.timestamp() == 1_700
234        ));
235    }
236
237    #[test]
238    fn success_with_output_maps_to_outcome_with_output() {
239        let response = ExecutorResponse::Success { output: Some(b"data".to_vec()) };
240        let outcome = map_executor_response_to_outcome(&response);
241        assert_eq!(outcome.result(), AttemptResultKind::Success);
242        assert_eq!(outcome.output(), Some(b"data".as_slice()));
243    }
244
245    #[test]
246    fn success_without_output_maps_to_outcome_without_output() {
247        let response = ExecutorResponse::Success { output: None };
248        let outcome = map_executor_response_to_outcome(&response);
249        assert_eq!(outcome.result(), AttemptResultKind::Success);
250        assert!(outcome.output().is_none());
251    }
252
253    #[test]
254    fn failure_maps_to_outcome_without_output() {
255        let response = ExecutorResponse::RetryableFailure { error: "err".to_string() };
256        let outcome = map_executor_response_to_outcome(&response);
257        assert_eq!(outcome.result(), AttemptResultKind::Failure);
258        assert!(outcome.output().is_none());
259    }
260
261    #[test]
262    fn submit_attempt_finish_via_authority_maps_failure_to_failure_taxonomy() {
263        let run_id = RunId::new();
264        let attempt_id = AttemptId::new();
265        let response =
266            ExecutorResponse::RetryableFailure { error: "transient network failure".to_string() };
267        let mut authority = MockAuthority::default();
268
269        let finish_cmd = build_attempt_finish_command(21, run_id, attempt_id, &response, 2_100);
270        let outcome = submit_attempt_finish_via_authority(
271            finish_cmd,
272            DurabilityPolicy::Immediate,
273            &mut authority,
274        )
275        .expect("authority submission should succeed");
276
277        assert!(matches!(
278            outcome.applied(),
279            AppliedMutation::AttemptFinish {
280                run_id: applied_run_id,
281                attempt_id: applied_attempt_id,
282                outcome: ref o,
283            } if *applied_run_id == run_id
284                && *applied_attempt_id == attempt_id
285                && o.result() == AttemptResultKind::Failure
286                && o.error() == Some("transient network failure")
287        ));
288    }
289}