Skip to main content

alembic_engine/
apply_retry.rs

1use crate::{AppliedOp, Op};
2use anyhow::Result;
3use async_trait::async_trait;
4
5#[derive(Debug)]
6pub struct RetryApplyResult {
7    pub applied: Vec<AppliedOp>,
8    pub pending: Vec<Op>,
9}
10
11#[async_trait]
12pub trait RetryApplyDriver {
13    async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp>;
14    fn is_retryable(&self, err: &anyhow::Error) -> bool;
15}
16
17pub async fn apply_non_delete_with_retries(
18    ops: &[Op],
19    driver: &mut impl RetryApplyDriver,
20) -> Result<RetryApplyResult> {
21    let mut applied = Vec::new();
22    let mut pending: Vec<Op> = ops
23        .iter()
24        .filter(|op| !matches!(op, Op::Delete { .. }))
25        .cloned()
26        .collect();
27
28    while !pending.is_empty() {
29        let current = std::mem::take(&mut pending);
30        let current_len = current.len();
31
32        for op in current {
33            match driver.apply_non_delete(&op).await {
34                Ok(applied_op) => applied.push(applied_op),
35                Err(err) if driver.is_retryable(&err) => pending.push(op),
36                Err(err) => return Err(err),
37            }
38        }
39
40        if pending.len() == current_len {
41            break;
42        }
43    }
44
45    Ok(RetryApplyResult { applied, pending })
46}
47
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BackendId;
    use alembic_core::{JsonMap, Key, Object, TypeName, Uid};
    use anyhow::anyhow;

    /// Type name shared by every op built in these tests.
    const ITEM_TYPE: &str = "test.item";

    /// Builds a minimal `Op::Create` for `uid` with default key and attributes.
    fn create_op(uid: Uid) -> Op {
        let desired = Object {
            uid,
            type_name: TypeName::new(ITEM_TYPE),
            key: Key::default(),
            attrs: JsonMap::default(),
            source: None,
        };
        Op::Create {
            uid,
            type_name: TypeName::new(ITEM_TYPE),
            desired,
        }
    }

    /// Scripted behaviour of [`TestDriver`].
    #[derive(Clone, Copy)]
    enum Mode {
        /// Fail the very first attempt with a retryable error, succeed afterwards.
        RetryThenOk,
        /// Fail every attempt with a retryable error.
        AlwaysRetry,
        /// Fail every attempt with a non-retryable error.
        Fatal,
    }

    /// Fake driver that counts calls and answers according to its [`Mode`].
    struct TestDriver {
        attempts: usize,
        mode: Mode,
    }

    impl TestDriver {
        /// Creates a driver in `mode` that has not been called yet.
        fn with_mode(mode: Mode) -> Self {
            Self { attempts: 0, mode }
        }
    }

    #[async_trait]
    impl RetryApplyDriver for TestDriver {
        async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
            self.attempts += 1;
            let first_attempt = self.attempts == 1;

            match (self.mode, first_attempt) {
                (Mode::Fatal, _) => Err(anyhow!("boom")),
                (Mode::AlwaysRetry, _) | (Mode::RetryThenOk, true) => {
                    Err(anyhow!("missing referenced uid {}", op.uid()))
                }
                (Mode::RetryThenOk, false) => Ok(AppliedOp {
                    uid: op.uid(),
                    type_name: op.type_name().clone(),
                    backend_id: Some(BackendId::Int(1)),
                }),
            }
        }

        fn is_retryable(&self, err: &anyhow::Error) -> bool {
            let message = err.to_string();
            message.contains("missing referenced uid")
        }
    }

    #[tokio::test]
    async fn retries_then_applies() {
        let ops: Vec<Op> = [1u128, 2]
            .into_iter()
            .map(|raw| create_op(Uid::from_u128(raw)))
            .collect();
        let mut driver = TestDriver::with_mode(Mode::RetryThenOk);

        let result = apply_non_delete_with_retries(&ops, &mut driver)
            .await
            .unwrap();

        // First op fails once, second succeeds, then the first is retried: 3 calls.
        assert_eq!(driver.attempts, 3);
        assert_eq!(result.applied.len(), 2);
        assert!(result.pending.is_empty());
    }

    #[tokio::test]
    async fn returns_pending_when_stuck() {
        let ops = vec![create_op(Uid::from_u128(1))];
        let mut driver = TestDriver::with_mode(Mode::AlwaysRetry);

        let result = apply_non_delete_with_retries(&ops, &mut driver)
            .await
            .unwrap();

        assert!(result.applied.is_empty());
        assert_eq!(result.pending.len(), 1);
    }

    #[tokio::test]
    async fn returns_non_retryable_error() {
        let ops = vec![create_op(Uid::from_u128(1))];
        let mut driver = TestDriver::with_mode(Mode::Fatal);

        let err = apply_non_delete_with_retries(&ops, &mut driver)
            .await
            .unwrap_err();

        assert!(err.to_string().contains("boom"));
    }

    #[tokio::test]
    async fn ignores_delete_ops() {
        let delete = Op::Delete {
            uid: Uid::from_u128(1),
            type_name: TypeName::new(ITEM_TYPE),
            key: Key::default(),
            backend_id: None,
        };
        let ops = vec![delete];
        // A fatal driver would fail the run if it were ever invoked.
        let mut driver = TestDriver::with_mode(Mode::Fatal);

        let result = apply_non_delete_with_retries(&ops, &mut driver)
            .await
            .unwrap();

        assert_eq!(driver.attempts, 0);
        assert!(result.pending.is_empty());
        assert!(result.applied.is_empty());
    }
}