alembic_engine/
apply_retry.rs1use crate::{AppliedOp, Op};
2use anyhow::Result;
3use async_trait::async_trait;
4
5#[derive(Debug)]
6pub struct RetryApplyResult {
7 pub applied: Vec<AppliedOp>,
8 pub pending: Vec<Op>,
9}
10
11#[async_trait]
12pub trait RetryApplyDriver {
13 async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp>;
14 fn is_retryable(&self, err: &anyhow::Error) -> bool;
15}
16
17pub async fn apply_non_delete_with_retries(
18 ops: &[Op],
19 driver: &mut impl RetryApplyDriver,
20) -> Result<RetryApplyResult> {
21 let mut applied = Vec::new();
22 let mut pending: Vec<Op> = ops
23 .iter()
24 .filter(|op| !matches!(op, Op::Delete { .. }))
25 .cloned()
26 .collect();
27
28 while !pending.is_empty() {
29 let current = std::mem::take(&mut pending);
30 let current_len = current.len();
31
32 for op in current {
33 match driver.apply_non_delete(&op).await {
34 Ok(applied_op) => applied.push(applied_op),
35 Err(err) if driver.is_retryable(&err) => pending.push(op),
36 Err(err) => return Err(err),
37 }
38 }
39
40 if pending.len() == current_len {
41 break;
42 }
43 }
44
45 Ok(RetryApplyResult { applied, pending })
46}
47
48#[cfg(test)]
49mod tests {
50 use super::*;
51 use crate::BackendId;
52 use alembic_core::{JsonMap, Key, Object, TypeName, Uid};
53 use anyhow::anyhow;
54
55 fn create_op(uid: Uid) -> Op {
56 Op::Create {
57 uid,
58 type_name: TypeName::new("test.item"),
59 desired: Object {
60 uid,
61 type_name: TypeName::new("test.item"),
62 key: Key::default(),
63 attrs: JsonMap::default(),
64 source: None,
65 },
66 }
67 }
68
69 #[derive(Clone, Copy)]
70 enum Mode {
71 RetryThenOk,
72 AlwaysRetry,
73 Fatal,
74 }
75
76 struct TestDriver {
77 attempts: usize,
78 mode: Mode,
79 }
80
81 #[async_trait]
82 impl RetryApplyDriver for TestDriver {
83 async fn apply_non_delete(&mut self, op: &Op) -> Result<AppliedOp> {
84 self.attempts += 1;
85 match self.mode {
86 Mode::RetryThenOk if self.attempts == 1 => {
87 Err(anyhow!("missing referenced uid {}", op.uid()))
88 }
89 Mode::AlwaysRetry => Err(anyhow!("missing referenced uid {}", op.uid())),
90 Mode::Fatal => Err(anyhow!("boom")),
91 Mode::RetryThenOk => Ok(AppliedOp {
92 uid: op.uid(),
93 type_name: op.type_name().clone(),
94 backend_id: Some(BackendId::Int(1)),
95 }),
96 }
97 }
98
99 fn is_retryable(&self, err: &anyhow::Error) -> bool {
100 err.to_string().contains("missing referenced uid")
101 }
102 }
103
104 #[tokio::test]
105 async fn retries_then_applies() {
106 let uid1 = Uid::from_u128(1);
107 let uid2 = Uid::from_u128(2);
108 let ops = vec![create_op(uid1), create_op(uid2)];
109 let mut driver = TestDriver {
110 attempts: 0,
111 mode: Mode::RetryThenOk,
112 };
113
114 let result = apply_non_delete_with_retries(&ops, &mut driver)
115 .await
116 .unwrap();
117
118 assert_eq!(driver.attempts, 3);
119 assert_eq!(result.applied.len(), 2);
120 assert!(result.pending.is_empty());
121 }
122
123 #[tokio::test]
124 async fn returns_pending_when_stuck() {
125 let uid = Uid::from_u128(1);
126 let ops = vec![create_op(uid)];
127 let mut driver = TestDriver {
128 attempts: 0,
129 mode: Mode::AlwaysRetry,
130 };
131
132 let result = apply_non_delete_with_retries(&ops, &mut driver)
133 .await
134 .unwrap();
135
136 assert!(result.applied.is_empty());
137 assert_eq!(result.pending.len(), 1);
138 }
139
140 #[tokio::test]
141 async fn returns_non_retryable_error() {
142 let uid = Uid::from_u128(1);
143 let ops = vec![create_op(uid)];
144 let mut driver = TestDriver {
145 attempts: 0,
146 mode: Mode::Fatal,
147 };
148
149 let err = apply_non_delete_with_retries(&ops, &mut driver)
150 .await
151 .unwrap_err();
152
153 assert!(err.to_string().contains("boom"));
154 }
155
156 #[tokio::test]
157 async fn ignores_delete_ops() {
158 let uid = Uid::from_u128(1);
159 let ops = vec![Op::Delete {
160 uid,
161 type_name: TypeName::new("test.item"),
162 key: Key::default(),
163 backend_id: None,
164 }];
165 let mut driver = TestDriver {
166 attempts: 0,
167 mode: Mode::Fatal,
168 };
169
170 let result = apply_non_delete_with_retries(&ops, &mut driver)
171 .await
172 .unwrap();
173
174 assert_eq!(driver.attempts, 0);
175 assert!(result.pending.is_empty());
176 assert!(result.applied.is_empty());
177 }
178}