rustvello_test_suite/
lifecycle.rs1use std::sync::Arc;
8
9use rustvello_core::broker::Broker;
10use rustvello_core::error::TaskError;
11use rustvello_core::orchestrator::Orchestrator;
12use rustvello_core::state_backend::StateBackend;
13use rustvello_proto::call::{CallDTO, SerializedArguments};
14use rustvello_proto::identifiers::RunnerId;
15use rustvello_proto::invocation::InvocationDTO;
16use rustvello_proto::status::InvocationStatus;
17
18use crate::helpers::test_task_id;
19
20pub struct BackendTriple {
22 pub broker: Arc<dyn Broker>,
23 pub orchestrator: Arc<dyn Orchestrator>,
24 pub state_backend: Arc<dyn StateBackend>,
25}
26
27pub async fn test_full_lifecycle_success(b: &BackendTriple) {
29 let task_id = test_task_id("lifecycle_task");
30 let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
31 let runner_id = RunnerId::new();
32
33 let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
35 let record = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
36 assert_eq!(record.status, InvocationStatus::Registered);
37
38 b.broker.route_invocation(&inv_id).await.unwrap();
40
41 let retrieved = b.broker.retrieve_invocation(None).await.unwrap();
43 assert_eq!(retrieved, Some(inv_id.clone()));
44
45 let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
47 b.state_backend
48 .upsert_invocation(&inv_dto, &call)
49 .await
50 .unwrap();
51
52 b.orchestrator
54 .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
55 .await
56 .unwrap();
57 b.orchestrator
58 .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
59 .await
60 .unwrap();
61 b.orchestrator
62 .set_invocation_status(&inv_id, InvocationStatus::Success, Some(&runner_id))
63 .await
64 .unwrap();
65
66 b.state_backend
68 .store_result(&inv_id, r#""done""#)
69 .await
70 .unwrap();
71
72 let status = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
74 assert_eq!(status.status, InvocationStatus::Success);
75
76 let result = b.state_backend.get_result(&inv_id).await.unwrap();
77 assert_eq!(result, Some(r#""done""#.to_string()));
78}
79
80pub async fn test_full_lifecycle_failure(b: &BackendTriple) {
82 let task_id = test_task_id("failing_task");
83 let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
84 let runner_id = RunnerId::new();
85
86 let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
87 b.broker.route_invocation(&inv_id).await.unwrap();
88
89 let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
90 b.state_backend
91 .upsert_invocation(&inv_dto, &call)
92 .await
93 .unwrap();
94
95 b.orchestrator
97 .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
98 .await
99 .unwrap();
100 b.orchestrator
101 .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
102 .await
103 .unwrap();
104 b.orchestrator
105 .set_invocation_status(&inv_id, InvocationStatus::Failed, Some(&runner_id))
106 .await
107 .unwrap();
108
109 let err = TaskError {
111 error_type: "RuntimeError".to_string(),
112 message: "something broke".to_string(),
113 traceback: Some("at test".to_string()),
114 };
115 b.state_backend.store_error(&inv_id, &err).await.unwrap();
116
117 let status = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
118 assert_eq!(status.status, InvocationStatus::Failed);
119
120 let got_err = b.state_backend.get_error(&inv_id).await.unwrap().unwrap();
121 assert_eq!(got_err.error_type, "RuntimeError");
122}
123
124pub async fn test_multiple_invocations(b: &BackendTriple) {
126 let task_id = test_task_id("multi_task");
127 let runner_id = RunnerId::new();
128
129 let mut inv_ids = Vec::new();
131 for i in 0..3 {
132 let mut args = SerializedArguments::new();
133 args.insert("n", i.to_string());
134 let call = CallDTO::new(task_id.clone(), args);
135 let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
136 b.broker.route_invocation(&inv_id).await.unwrap();
137 inv_ids.push(inv_id);
138 }
139
140 assert_eq!(b.broker.count_invocations(None).await.unwrap(), 3);
142
143 for inv_id in &inv_ids {
145 let retrieved = b.broker.retrieve_invocation(None).await.unwrap();
146 assert_eq!(retrieved.as_ref(), Some(inv_id));
147
148 b.orchestrator
149 .set_invocation_status(inv_id, InvocationStatus::Pending, Some(&runner_id))
150 .await
151 .unwrap();
152 b.orchestrator
153 .set_invocation_status(inv_id, InvocationStatus::Running, Some(&runner_id))
154 .await
155 .unwrap();
156 b.orchestrator
157 .set_invocation_status(inv_id, InvocationStatus::Success, Some(&runner_id))
158 .await
159 .unwrap();
160 }
161
162 assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
164}
165
166pub async fn test_purge_all_backends(b: &BackendTriple) {
168 let task_id = test_task_id("purge_task");
169 let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
170
171 let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
172 b.broker.route_invocation(&inv_id).await.unwrap();
173
174 let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
175 b.state_backend
176 .upsert_invocation(&inv_dto, &call)
177 .await
178 .unwrap();
179 b.state_backend
180 .store_result(&inv_id, r#""purged""#)
181 .await
182 .unwrap();
183
184 b.broker.purge(None).await.unwrap();
186 b.orchestrator.purge().await.unwrap();
187 b.state_backend.purge().await.unwrap();
188
189 assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
191 assert_eq!(
192 b.orchestrator.count_invocations(None, None).await.unwrap(),
193 0
194 );
195 assert!(b.state_backend.get_invocation(&inv_id).await.is_err());
196}
197
198pub async fn test_broker_orchestrator_consistency(b: &BackendTriple) {
200 let task_id = test_task_id("consistency_task");
201 let runner_id = RunnerId::new();
202
203 let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
204 let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
205 b.broker.route_invocation(&inv_id).await.unwrap();
206
207 assert_eq!(
209 b.orchestrator.count_invocations(None, None).await.unwrap(),
210 1
211 );
212 assert_eq!(b.broker.count_invocations(None).await.unwrap(), 1);
213
214 let _ = b.broker.retrieve_invocation(None).await.unwrap();
216 b.orchestrator
217 .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
218 .await
219 .unwrap();
220 b.orchestrator
221 .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
222 .await
223 .unwrap();
224 b.orchestrator
225 .set_invocation_status(&inv_id, InvocationStatus::Success, Some(&runner_id))
226 .await
227 .unwrap();
228
229 assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
231 assert_eq!(
232 b.orchestrator.count_invocations(None, None).await.unwrap(),
233 1
234 );
235}
236
/// Expands to the five lifecycle scenarios as `#[tokio::test]` functions.
///
/// The argument is an expression that evaluates to a `BackendTriple`. It is
/// evaluated once per generated test, so each test builds its own triple.
///
/// The generated function names are fixed (`suite_lifecycle_*`), so the macro
/// can be invoked at most once per module, and not in the same module as
/// `async_lifecycle_suite!`, which generates the same names.
#[macro_export]
macro_rules! lifecycle_suite {
    ($make_backends:expr) => {
        #[tokio::test]
        async fn suite_lifecycle_full_success() {
            let backends = $make_backends;
            $crate::lifecycle::test_full_lifecycle_success(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_full_failure() {
            let backends = $make_backends;
            $crate::lifecycle::test_full_lifecycle_failure(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_multiple_invocations() {
            let backends = $make_backends;
            $crate::lifecycle::test_multiple_invocations(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_purge_all() {
            let backends = $make_backends;
            $crate::lifecycle::test_purge_all_backends(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_broker_orchestrator_consistency() {
            let backends = $make_backends;
            $crate::lifecycle::test_broker_orchestrator_consistency(&backends).await;
        }
    };
}
287
/// Expands to the five lifecycle scenarios as `#[tokio::test]` functions that
/// are ignored by default with the reason `"requires Docker"`.
///
/// The argument is an expression that evaluates to a future. Awaiting it must
/// yield a 2-tuple whose second element is a `BackendTriple`. The first
/// element is bound to a named `_`-prefixed variable, so it is kept alive
/// until the end of the test and dropped only afterwards (presumably a
/// container handle, given the ignore reason; confirm against callers).
///
/// The generated function names are fixed (`suite_lifecycle_*`), so the macro
/// can be invoked at most once per module, and not in the same module as
/// `lifecycle_suite!`, which generates the same names.
#[macro_export]
macro_rules! async_lifecycle_suite {
    ($make_backends:expr) => {
        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_full_success() {
            let (_guard, backends) = $make_backends.await;
            $crate::lifecycle::test_full_lifecycle_success(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_full_failure() {
            let (_guard, backends) = $make_backends.await;
            $crate::lifecycle::test_full_lifecycle_failure(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_multiple_invocations() {
            let (_guard, backends) = $make_backends.await;
            $crate::lifecycle::test_multiple_invocations(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_purge_all() {
            let (_guard, backends) = $make_backends.await;
            $crate::lifecycle::test_purge_all_backends(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_broker_orchestrator_consistency() {
            let (_guard, backends) = $make_backends.await;
            $crate::lifecycle::test_broker_orchestrator_consistency(&backends).await;
        }
    };
}