Skip to main content

alembic_engine/
apply_retry.rs

1use crate::{AppliedOp, Op};
2use anyhow::Result;
3use async_trait::async_trait;
4
5#[derive(Debug)]
6pub struct RetryApplyResult {
7    pub applied: Vec<AppliedOp>,
8    pub pending: Vec<Op>,
9}
10
11#[async_trait]
12pub trait RetryApplyDriver {
13    async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp>;
14    fn is_retryable(&self, err: &anyhow::Error) -> bool;
15}
16
17pub async fn apply_non_delete_with_retries(
18    ops: &[Op],
19    driver: &mut impl RetryApplyDriver,
20) -> Result<RetryApplyResult> {
21    let mut applied = Vec::new();
22    let mut pending: Vec<Op> = ops
23        .iter()
24        .filter(|op| !matches!(op, Op::Delete { .. }))
25        .cloned()
26        .collect();
27
28    while !pending.is_empty() {
29        let current = std::mem::take(&mut pending);
30        let applied_before = applied.len();
31
32        for op in current {
33            match driver.apply_non_delete(&op).await {
34                Ok(applied_op) => applied.push(applied_op),
35                Err(err) if driver.is_retryable(&err) => pending.push(op),
36                Err(err) => return Err(err),
37            }
38        }
39
40        // Only break if no progress was made (no items applied in this iteration)
41        if applied.len() == applied_before {
42            break;
43        }
44    }
45
46    Ok(RetryApplyResult { applied, pending })
47}
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use crate::BackendId;
53    use alembic_core::{JsonMap, Key, Object, TypeName, Uid};
54    use anyhow::anyhow;
55
56    fn create_op(uid: Uid) -> Op {
57        Op::Create {
58            uid,
59            type_name: TypeName::new("test.item"),
60            desired: Object {
61                uid,
62                type_name: TypeName::new("test.item"),
63                key: Key::default(),
64                attrs: JsonMap::default(),
65                source: None,
66            },
67        }
68    }
69
70    #[derive(Clone, Copy)]
71    enum Mode {
72        RetryThenOk,
73        AlwaysRetry,
74        Fatal,
75    }
76
77    struct TestDriver {
78        attempts: usize,
79        mode: Mode,
80    }
81
82    #[async_trait]
83    impl RetryApplyDriver for TestDriver {
84        async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
85            self.attempts += 1;
86            match self.mode {
87                Mode::RetryThenOk if self.attempts == 1 => {
88                    Err(anyhow!("missing referenced uid {}", op.uid()))
89                }
90                Mode::AlwaysRetry => Err(anyhow!("missing referenced uid {}", op.uid())),
91                Mode::Fatal => Err(anyhow!("boom")),
92                Mode::RetryThenOk => Ok(AppliedOp {
93                    uid: op.uid(),
94                    type_name: op.type_name().clone(),
95                    backend_id: Some(BackendId::Int(1)),
96                }),
97            }
98        }
99
100        fn is_retryable(&self, err: &anyhow::Error) -> bool {
101            err.to_string().contains("missing referenced uid")
102        }
103    }
104
105    #[tokio::test]
106    async fn retries_then_applies() {
107        let uid1 = Uid::from_u128(1);
108        let uid2 = Uid::from_u128(2);
109        let ops = vec![create_op(uid1), create_op(uid2)];
110        let mut driver = TestDriver {
111            attempts: 0,
112            mode: Mode::RetryThenOk,
113        };
114
115        let result = apply_non_delete_with_retries(&ops, &mut driver)
116            .await
117            .unwrap();
118
119        assert_eq!(driver.attempts, 3);
120        assert_eq!(result.applied.len(), 2);
121        assert!(result.pending.is_empty());
122    }
123
124    #[tokio::test]
125    async fn returns_pending_when_stuck() {
126        let uid = Uid::from_u128(1);
127        let ops = vec![create_op(uid)];
128        let mut driver = TestDriver {
129            attempts: 0,
130            mode: Mode::AlwaysRetry,
131        };
132
133        let result = apply_non_delete_with_retries(&ops, &mut driver)
134            .await
135            .unwrap();
136
137        assert!(result.applied.is_empty());
138        assert_eq!(result.pending.len(), 1);
139    }
140
141    #[tokio::test]
142    async fn returns_non_retryable_error() {
143        let uid = Uid::from_u128(1);
144        let ops = vec![create_op(uid)];
145        let mut driver = TestDriver {
146            attempts: 0,
147            mode: Mode::Fatal,
148        };
149
150        let err = apply_non_delete_with_retries(&ops, &mut driver)
151            .await
152            .unwrap_err();
153
154        assert!(err.to_string().contains("boom"));
155    }
156
157    #[tokio::test]
158    async fn ignores_delete_ops() {
159        let uid = Uid::from_u128(1);
160        let ops = vec![Op::Delete {
161            uid,
162            type_name: TypeName::new("test.item"),
163            key: Key::default(),
164            backend_id: None,
165        }];
166        let mut driver = TestDriver {
167            attempts: 0,
168            mode: Mode::Fatal,
169        };
170
171        let result = apply_non_delete_with_retries(&ops, &mut driver)
172            .await
173            .unwrap();
174
175        assert_eq!(driver.attempts, 0);
176        assert!(result.pending.is_empty());
177        assert!(result.applied.is_empty());
178    }
179}