Skip to main content

rustvello_test_suite/
lifecycle.rs

1//! Shared lifecycle test definitions.
2//!
3//! These tests exercise cross-component interactions that require a broker,
4//! orchestrator, and state backend working together — the core invocation
5//! lifecycle: register → route → retrieve → execute → store result.
6
7use std::sync::Arc;
8
9use rustvello_core::broker::Broker;
10use rustvello_core::error::TaskError;
11use rustvello_core::orchestrator::Orchestrator;
12use rustvello_core::state_backend::StateBackend;
13use rustvello_proto::call::{CallDTO, SerializedArguments};
14use rustvello_proto::identifiers::RunnerId;
15use rustvello_proto::invocation::InvocationDTO;
16use rustvello_proto::status::InvocationStatus;
17
18use crate::helpers::test_task_id;
19
/// Convenience bundle for passing all three backends.
///
/// Each lifecycle test in this module takes a `&BackendTriple` and drives the
/// three components together.
pub struct BackendTriple {
    /// Broker the tests route invocations into and retrieve them from.
    pub broker: Arc<dyn Broker>,
    /// Orchestrator the tests register invocations with and use to read and
    /// update invocation status.
    pub orchestrator: Arc<dyn Orchestrator>,
    /// State backend the tests use to persist invocation data, results and
    /// errors.
    pub state_backend: Arc<dyn StateBackend>,
}
26
27/// Full happy-path lifecycle: register → route → retrieve → run → store result.
28pub async fn test_full_lifecycle_success(b: &BackendTriple) {
29    let task_id = test_task_id("lifecycle_task");
30    let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
31    let runner_id = RunnerId::new();
32
33    // 1. Register invocation via orchestrator
34    let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
35    let record = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
36    assert_eq!(record.status, InvocationStatus::Registered);
37
38    // 2. Route via broker
39    b.broker.route_invocation(&inv_id).await.unwrap();
40
41    // 3. Retrieve from broker
42    let retrieved = b.broker.retrieve_invocation(None).await.unwrap();
43    assert_eq!(retrieved, Some(inv_id.clone()));
44
45    // 4. Store invocation data in state backend
46    let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
47    b.state_backend
48        .upsert_invocation(&inv_dto, &call)
49        .await
50        .unwrap();
51
52    // 5. Transition: Registered → Pending → Running → Success
53    b.orchestrator
54        .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
55        .await
56        .unwrap();
57    b.orchestrator
58        .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
59        .await
60        .unwrap();
61    b.orchestrator
62        .set_invocation_status(&inv_id, InvocationStatus::Success, Some(&runner_id))
63        .await
64        .unwrap();
65
66    // 6. Store result
67    b.state_backend
68        .store_result(&inv_id, r#""done""#)
69        .await
70        .unwrap();
71
72    // 7. Verify final state
73    let status = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
74    assert_eq!(status.status, InvocationStatus::Success);
75
76    let result = b.state_backend.get_result(&inv_id).await.unwrap();
77    assert_eq!(result, Some(r#""done""#.to_string()));
78}
79
80/// Lifecycle with failure: register → route → retrieve → fail → store error.
81pub async fn test_full_lifecycle_failure(b: &BackendTriple) {
82    let task_id = test_task_id("failing_task");
83    let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
84    let runner_id = RunnerId::new();
85
86    let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
87    b.broker.route_invocation(&inv_id).await.unwrap();
88
89    let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
90    b.state_backend
91        .upsert_invocation(&inv_dto, &call)
92        .await
93        .unwrap();
94
95    // Transition to Failed
96    b.orchestrator
97        .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
98        .await
99        .unwrap();
100    b.orchestrator
101        .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
102        .await
103        .unwrap();
104    b.orchestrator
105        .set_invocation_status(&inv_id, InvocationStatus::Failed, Some(&runner_id))
106        .await
107        .unwrap();
108
109    // Store error
110    let err = TaskError {
111        error_type: "RuntimeError".to_string(),
112        message: "something broke".to_string(),
113        traceback: Some("at test".to_string()),
114    };
115    b.state_backend.store_error(&inv_id, &err).await.unwrap();
116
117    let status = b.orchestrator.get_invocation_status(&inv_id).await.unwrap();
118    assert_eq!(status.status, InvocationStatus::Failed);
119
120    let got_err = b.state_backend.get_error(&inv_id).await.unwrap().unwrap();
121    assert_eq!(got_err.error_type, "RuntimeError");
122}
123
124/// Multiple invocations with interleaved execution.
125pub async fn test_multiple_invocations(b: &BackendTriple) {
126    let task_id = test_task_id("multi_task");
127    let runner_id = RunnerId::new();
128
129    // Register and route 3 invocations
130    let mut inv_ids = Vec::new();
131    for i in 0..3 {
132        let mut args = SerializedArguments::new();
133        args.insert("n", i.to_string());
134        let call = CallDTO::new(task_id.clone(), args);
135        let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
136        b.broker.route_invocation(&inv_id).await.unwrap();
137        inv_ids.push(inv_id);
138    }
139
140    // Verify 3 invocations in broker
141    assert_eq!(b.broker.count_invocations(None).await.unwrap(), 3);
142
143    // Process them in FIFO order
144    for inv_id in &inv_ids {
145        let retrieved = b.broker.retrieve_invocation(None).await.unwrap();
146        assert_eq!(retrieved.as_ref(), Some(inv_id));
147
148        b.orchestrator
149            .set_invocation_status(inv_id, InvocationStatus::Pending, Some(&runner_id))
150            .await
151            .unwrap();
152        b.orchestrator
153            .set_invocation_status(inv_id, InvocationStatus::Running, Some(&runner_id))
154            .await
155            .unwrap();
156        b.orchestrator
157            .set_invocation_status(inv_id, InvocationStatus::Success, Some(&runner_id))
158            .await
159            .unwrap();
160    }
161
162    // All processed — broker is empty
163    assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
164}
165
166/// Purge across all three backends.
167pub async fn test_purge_all_backends(b: &BackendTriple) {
168    let task_id = test_task_id("purge_task");
169    let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
170
171    let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
172    b.broker.route_invocation(&inv_id).await.unwrap();
173
174    let inv_dto = InvocationDTO::new(inv_id.clone(), task_id, call.call_id.clone());
175    b.state_backend
176        .upsert_invocation(&inv_dto, &call)
177        .await
178        .unwrap();
179    b.state_backend
180        .store_result(&inv_id, r#""purged""#)
181        .await
182        .unwrap();
183
184    // Purge all three
185    b.broker.purge(None).await.unwrap();
186    b.orchestrator.purge().await.unwrap();
187    b.state_backend.purge().await.unwrap();
188
189    // Verify everything is gone
190    assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
191    assert_eq!(
192        b.orchestrator.count_invocations(None, None).await.unwrap(),
193        0
194    );
195    assert!(b.state_backend.get_invocation(&inv_id).await.is_err());
196}
197
198/// Broker and orchestrator counts stay consistent.
199pub async fn test_broker_orchestrator_consistency(b: &BackendTriple) {
200    let task_id = test_task_id("consistency_task");
201    let runner_id = RunnerId::new();
202
203    let call = CallDTO::new(task_id.clone(), SerializedArguments::default());
204    let inv_id = b.orchestrator.register_invocation(&call).await.unwrap();
205    b.broker.route_invocation(&inv_id).await.unwrap();
206
207    // Orchestrator has 1 Registered, broker has 1 queued
208    assert_eq!(
209        b.orchestrator.count_invocations(None, None).await.unwrap(),
210        1
211    );
212    assert_eq!(b.broker.count_invocations(None).await.unwrap(), 1);
213
214    // Retrieve and process
215    let _ = b.broker.retrieve_invocation(None).await.unwrap();
216    b.orchestrator
217        .set_invocation_status(&inv_id, InvocationStatus::Pending, Some(&runner_id))
218        .await
219        .unwrap();
220    b.orchestrator
221        .set_invocation_status(&inv_id, InvocationStatus::Running, Some(&runner_id))
222        .await
223        .unwrap();
224    b.orchestrator
225        .set_invocation_status(&inv_id, InvocationStatus::Success, Some(&runner_id))
226        .await
227        .unwrap();
228
229    // Broker empty, orchestrator still has the record
230    assert_eq!(b.broker.count_invocations(None).await.unwrap(), 0);
231    assert_eq!(
232        b.orchestrator.count_invocations(None, None).await.unwrap(),
233        1
234    );
235}
236
/// Expands to one `#[tokio::test]` per shared lifecycle test.
///
/// `$setup` is an expression producing a `lifecycle::BackendTriple`. It is
/// expanded separately inside every generated test function, so each test
/// evaluates it on its own.
///
/// Generated tests: `suite_lifecycle_full_success`,
/// `suite_lifecycle_full_failure`, `suite_lifecycle_multiple_invocations`,
/// `suite_lifecycle_purge_all` and
/// `suite_lifecycle_broker_orchestrator_consistency`.
///
/// # Example
///
/// ```rust,ignore
/// use rustvello_test_suite::{lifecycle_suite, lifecycle::BackendTriple};
/// use std::sync::Arc;
///
/// lifecycle_suite!({
///     BackendTriple {
///         broker: Arc::new(MemBroker::new()),
///         orchestrator: Arc::new(MemOrchestrator::new()),
///         state_backend: Arc::new(MemStateBackend::new()),
///     }
/// });
/// ```
#[macro_export]
macro_rules! lifecycle_suite {
    ($setup:expr) => {
        #[tokio::test]
        async fn suite_lifecycle_full_success() {
            let backends = $setup;
            $crate::lifecycle::test_full_lifecycle_success(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_full_failure() {
            let backends = $setup;
            $crate::lifecycle::test_full_lifecycle_failure(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_multiple_invocations() {
            let backends = $setup;
            $crate::lifecycle::test_multiple_invocations(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_purge_all() {
            let backends = $setup;
            $crate::lifecycle::test_purge_all_backends(&backends).await;
        }

        #[tokio::test]
        async fn suite_lifecycle_broker_orchestrator_consistency() {
            let backends = $setup;
            $crate::lifecycle::test_broker_orchestrator_consistency(&backends).await;
        }
    };
}
287
/// Async-setup counterpart of [`lifecycle_suite!`], intended for
/// testcontainers backends.
///
/// `$setup` is an expression that is `.await`ed inside every generated test
/// and must yield a `(guard, BackendTriple)` pair. The guard is bound to a
/// named local, so it is kept until the end of the test function while the
/// triple is in use.
///
/// The generated tests carry the same names as those from
/// [`lifecycle_suite!`] and are all marked `#[ignore = "requires Docker"]`.
#[macro_export]
macro_rules! async_lifecycle_suite {
    ($setup:expr) => {
        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_full_success() {
            let (_guard, backends) = $setup.await;
            $crate::lifecycle::test_full_lifecycle_success(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_full_failure() {
            let (_guard, backends) = $setup.await;
            $crate::lifecycle::test_full_lifecycle_failure(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_multiple_invocations() {
            let (_guard, backends) = $setup.await;
            $crate::lifecycle::test_multiple_invocations(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_purge_all() {
            let (_guard, backends) = $setup.await;
            $crate::lifecycle::test_purge_all_backends(&backends).await;
        }

        #[tokio::test]
        #[ignore = "requires Docker"]
        async fn suite_lifecycle_broker_orchestrator_consistency() {
            let (_guard, backends) = $setup.await;
            $crate::lifecycle::test_broker_orchestrator_consistency(&backends).await;
        }
    };
}