alembic_engine/
apply_retry.rs1use crate::{AppliedOp, Op};
2use anyhow::Result;
3use async_trait::async_trait;
4
5#[derive(Debug)]
6pub struct RetryApplyResult {
7 pub applied: Vec<AppliedOp>,
8 pub pending: Vec<Op>,
9}
10
11#[async_trait]
12pub trait RetryApplyDriver {
13 async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp>;
14 fn is_retryable(&self, err: &anyhow::Error) -> bool;
15}
16
17pub async fn apply_non_delete_with_retries(
18 ops: &[Op],
19 driver: &mut impl RetryApplyDriver,
20) -> Result<RetryApplyResult> {
21 let mut applied = Vec::new();
22 let mut pending: Vec<Op> = ops
23 .iter()
24 .filter(|op| !matches!(op, Op::Delete { .. }))
25 .cloned()
26 .collect();
27
28 while !pending.is_empty() {
29 let current = std::mem::take(&mut pending);
30 let applied_before = applied.len();
31
32 for op in current {
33 match driver.apply_non_delete(&op).await {
34 Ok(applied_op) => applied.push(applied_op),
35 Err(err) if driver.is_retryable(&err) => pending.push(op),
36 Err(err) => return Err(err),
37 }
38 }
39
40 if applied.len() == applied_before {
42 break;
43 }
44 }
45
46 Ok(RetryApplyResult { applied, pending })
47}
48
49#[cfg(test)]
50mod tests {
51 use super::*;
52 use crate::BackendId;
53 use alembic_core::{JsonMap, Key, Object, TypeName, Uid};
54 use anyhow::anyhow;
55
56 fn create_op(uid: Uid) -> Op {
57 Op::Create {
58 uid,
59 type_name: TypeName::new("test.item"),
60 desired: Object {
61 uid,
62 type_name: TypeName::new("test.item"),
63 key: Key::default(),
64 attrs: JsonMap::default(),
65 source: None,
66 },
67 }
68 }
69
70 #[derive(Clone, Copy)]
71 enum Mode {
72 RetryThenOk,
73 AlwaysRetry,
74 Fatal,
75 }
76
77 struct TestDriver {
78 attempts: usize,
79 mode: Mode,
80 }
81
82 #[async_trait]
83 impl RetryApplyDriver for TestDriver {
84 async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
85 self.attempts += 1;
86 match self.mode {
87 Mode::RetryThenOk if self.attempts == 1 => {
88 Err(anyhow!("missing referenced uid {}", op.uid()))
89 }
90 Mode::AlwaysRetry => Err(anyhow!("missing referenced uid {}", op.uid())),
91 Mode::Fatal => Err(anyhow!("boom")),
92 Mode::RetryThenOk => Ok(AppliedOp {
93 uid: op.uid(),
94 type_name: op.type_name().clone(),
95 backend_id: Some(BackendId::Int(1)),
96 }),
97 }
98 }
99
100 fn is_retryable(&self, err: &anyhow::Error) -> bool {
101 err.to_string().contains("missing referenced uid")
102 }
103 }
104
105 #[tokio::test]
106 async fn retries_then_applies() {
107 let uid1 = Uid::from_u128(1);
108 let uid2 = Uid::from_u128(2);
109 let ops = vec![create_op(uid1), create_op(uid2)];
110 let mut driver = TestDriver {
111 attempts: 0,
112 mode: Mode::RetryThenOk,
113 };
114
115 let result = apply_non_delete_with_retries(&ops, &mut driver)
116 .await
117 .unwrap();
118
119 assert_eq!(driver.attempts, 3);
120 assert_eq!(result.applied.len(), 2);
121 assert!(result.pending.is_empty());
122 }
123
124 #[tokio::test]
125 async fn returns_pending_when_stuck() {
126 let uid = Uid::from_u128(1);
127 let ops = vec![create_op(uid)];
128 let mut driver = TestDriver {
129 attempts: 0,
130 mode: Mode::AlwaysRetry,
131 };
132
133 let result = apply_non_delete_with_retries(&ops, &mut driver)
134 .await
135 .unwrap();
136
137 assert!(result.applied.is_empty());
138 assert_eq!(result.pending.len(), 1);
139 }
140
141 #[tokio::test]
142 async fn returns_non_retryable_error() {
143 let uid = Uid::from_u128(1);
144 let ops = vec![create_op(uid)];
145 let mut driver = TestDriver {
146 attempts: 0,
147 mode: Mode::Fatal,
148 };
149
150 let err = apply_non_delete_with_retries(&ops, &mut driver)
151 .await
152 .unwrap_err();
153
154 assert!(err.to_string().contains("boom"));
155 }
156
157 #[tokio::test]
158 async fn ignores_delete_ops() {
159 let uid = Uid::from_u128(1);
160 let ops = vec![Op::Delete {
161 uid,
162 type_name: TypeName::new("test.item"),
163 key: Key::default(),
164 backend_id: None,
165 }];
166 let mut driver = TestDriver {
167 attempts: 0,
168 mode: Mode::Fatal,
169 };
170
171 let result = apply_non_delete_with_retries(&ops, &mut driver)
172 .await
173 .unwrap();
174
175 assert_eq!(driver.attempts, 0);
176 assert!(result.pending.is_empty());
177 assert!(result.applied.is_empty());
178 }
179}