//! Task result values, result-store traits, and the [`AsyncResult`] handle
//! used to poll for and retrieve the outcome of a submitted task.

#![allow(
    clippy::missing_errors_doc,
    clippy::missing_panics_doc,
    clippy::missing_fields_in_debug
)]
#![allow(clippy::cast_precision_loss)]

use crate::state::TaskState;
use crate::TaskId;
use async_trait::async_trait;
use serde_json::Value;
use std::time::Duration;

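/// Storage backend abstraction for task results.
///
/// Implementations persist one [`TaskResultValue`] per [`TaskId`] and expose the
/// task's current [`TaskState`]. `store_result` writes (or overwrites) a result,
/// `get_result` and `get_state` read it back, `forget` removes it, and
/// `has_result` checks for existence. The `MockBackend` in this module's tests
/// is a minimal in-memory implementation.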
#[async_trait]
pub trait ResultStore: Send + Sync {
    async fn store_result(&self, task_id: TaskId, result: TaskResultValue) -> crate::Result<()>;

    async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>>;

    async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState>;

    async fn forget(&self, task_id: TaskId) -> crate::Result<()>;

    async fn has_result(&self, task_id: TaskId) -> crate::Result<bool>;
}

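/// The stored outcome (or in-flight status) of a task.
///
/// `Pending`, `Received`, `Started`, and `Retry` are non-terminal; `Success`,
/// `Failure`, `Revoked`, and `Rejected` are terminal (see
/// [`TaskResultValue::is_terminal`]).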
#[derive(Debug, Clone)]
pub enum TaskResultValue {
    Pending,

    Received,

    Started,

    Success(Value),

    Failure {
        error: String,
        traceback: Option<String>,
    },

    Revoked,

    Retry { attempt: u32, max_retries: u32 },

    Rejected { reason: String },
}

impl TaskResultValue {
    #[inline]
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        matches!(
            self,
            TaskResultValue::Success(_)
                | TaskResultValue::Failure { .. }
                | TaskResultValue::Revoked
                | TaskResultValue::Rejected { .. }
        )
    }

    #[inline]
    #[must_use]
    pub const fn is_pending(&self) -> bool {
        matches!(self, TaskResultValue::Pending)
    }

    #[inline]
    #[must_use]
    pub const fn is_ready(&self) -> bool {
        self.is_terminal()
    }

    #[inline]
    #[must_use]
    pub const fn is_successful(&self) -> bool {
        matches!(self, TaskResultValue::Success(_))
    }

    #[inline]
    #[must_use]
    pub const fn is_failed(&self) -> bool {
        matches!(
            self,
            TaskResultValue::Failure { .. } | TaskResultValue::Rejected { .. }
        )
    }

    #[inline]
    #[must_use]
    pub fn success_value(&self) -> Option<&Value> {
        match self {
            TaskResultValue::Success(v) => Some(v),
            _ => None,
        }
    }

    #[inline]
    #[must_use]
    pub fn error_message(&self) -> Option<&str> {
        match self {
            TaskResultValue::Failure { error, .. } => Some(error),
            TaskResultValue::Rejected { reason } => Some(reason),
            _ => None,
        }
    }

    #[inline]
    #[must_use]
    pub fn traceback(&self) -> Option<&str> {
        match self {
            TaskResultValue::Failure { traceback, .. } => traceback.as_deref(),
            _ => None,
        }
    }
}

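/// A handle to the eventual result of a submitted task, backed by a [`ResultStore`].
///
/// The handle can be queried for state, polled until completion, and linked to
/// parent and child results.
///
/// Illustrative usage; `store` and `task_id` come from the surrounding
/// application, so this is a sketch rather than a runnable doctest:
///
/// ```ignore
/// let result = AsyncResult::new(task_id, store);
/// if result.ready().await? {
///     // Polls for at most five seconds.
///     let value = result.get(Some(Duration::from_secs(5))).await?;
///     println!("task finished with {value:?}");
/// }
/// ```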
#[derive(Clone)]
pub struct AsyncResult<S: ResultStore> {
    task_id: TaskId,

    store: S,

    parent: Option<Box<AsyncResult<S>>>,

    children: Vec<AsyncResult<S>>,
}

impl<S: ResultStore + Clone> AsyncResult<S> {
    pub fn new(task_id: TaskId, store: S) -> Self {
        Self {
            task_id,
            store,
            parent: None,
            children: Vec::new(),
        }
    }

    pub fn with_parent(task_id: TaskId, store: S, parent: AsyncResult<S>) -> Self {
        Self {
            task_id,
            store,
            parent: Some(Box::new(parent)),
            children: Vec::new(),
        }
    }

    pub fn with_children(task_id: TaskId, store: S, children: Vec<AsyncResult<S>>) -> Self {
        Self {
            task_id,
            store,
            parent: None,
            children,
        }
    }

    #[inline]
    #[must_use]
    pub fn task_id(&self) -> TaskId {
        self.task_id
    }

    #[inline]
    #[must_use]
    pub fn parent(&self) -> Option<&AsyncResult<S>> {
        self.parent.as_deref()
    }

    #[inline]
    #[must_use]
    pub fn children(&self) -> &[AsyncResult<S>] {
        &self.children
    }

    pub fn add_child(&mut self, child: AsyncResult<S>) {
        self.children.push(child);
    }

    pub async fn children_ready(&self) -> crate::Result<bool> {
        for child in &self.children {
            if !child.ready().await? {
                return Ok(false);
            }
        }
        Ok(true)
    }

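    /// Waits for every child result in order and collects their values.
    ///
    /// Each child is awaited via [`AsyncResult::get`], so the same `timeout`
    /// applies to each child individually, not to the batch as a whole.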
    pub async fn collect_children(
        &self,
        timeout: Option<Duration>,
    ) -> crate::Result<Vec<Option<Value>>> {
        let mut results = Vec::with_capacity(self.children.len());
        for child in &self.children {
            results.push(child.get(timeout).await?);
        }
        Ok(results)
    }

    pub async fn ready(&self) -> crate::Result<bool> {
        let state = self.store.get_state(self.task_id).await?;
        Ok(state.is_terminal())
    }

    pub async fn successful(&self) -> crate::Result<bool> {
        match self.store.get_result(self.task_id).await? {
            Some(result) => Ok(result.is_successful()),
            None => Ok(false),
        }
    }

    pub async fn failed(&self) -> crate::Result<bool> {
        match self.store.get_result(self.task_id).await? {
            Some(result) => Ok(result.is_failed()),
            None => Ok(false),
        }
    }

    pub async fn state(&self) -> crate::Result<TaskState> {
        self.store.get_state(self.task_id).await
    }

    pub async fn info(&self) -> crate::Result<Option<TaskResultValue>> {
        self.store.get_result(self.task_id).await
    }

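    /// Polls the store until the task reaches a terminal state and returns its value.
    ///
    /// `Success` yields `Ok(Some(value))`; `Failure`, `Revoked`, and `Rejected`
    /// are mapped to errors. With `timeout: None` this polls indefinitely;
    /// otherwise it returns a timeout error once the duration elapses.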
    pub async fn get(&self, timeout: Option<Duration>) -> crate::Result<Option<Value>> {
        let start = std::time::Instant::now();
        let poll_interval = Duration::from_millis(100);

        loop {
            if let Some(timeout_duration) = timeout {
                if start.elapsed() > timeout_duration {
                    return Err(crate::CelersError::Timeout(format!(
                        "Task {} did not complete within {:?}",
                        self.task_id, timeout_duration
                    )));
                }
            }

            if let Some(result) = self.store.get_result(self.task_id).await? {
                match result {
                    TaskResultValue::Success(value) => return Ok(Some(value)),
                    TaskResultValue::Failure { error, traceback } => {
                        let msg = if let Some(tb) = traceback {
                            format!("{error}\n{tb}")
                        } else {
                            error
                        };
                        return Err(crate::CelersError::TaskExecution(msg));
                    }
                    TaskResultValue::Revoked => {
                        return Err(crate::CelersError::TaskRevoked(self.task_id));
                    }
                    TaskResultValue::Rejected { reason } => {
                        return Err(crate::CelersError::TaskExecution(format!(
                            "Task rejected: {reason}"
                        )));
                    }
                    // Non-terminal states (Pending, Received, Started, Retry): keep polling.
                    _ => {}
                }
            }

            // No terminal result yet; back off briefly before polling again.
            tokio::time::sleep(poll_interval).await;
        }
    }

    pub async fn result(&self) -> crate::Result<Option<Value>> {
        match self.store.get_result(self.task_id).await? {
            Some(TaskResultValue::Success(value)) => Ok(Some(value)),
            _ => Ok(None),
        }
    }

    pub async fn traceback(&self) -> crate::Result<Option<String>> {
        match self.store.get_result(self.task_id).await? {
            Some(result) => Ok(result.traceback().map(String::from)),
            None => Ok(None),
        }
    }

    pub async fn revoke(&self) -> crate::Result<()> {
        self.store
            .store_result(self.task_id, TaskResultValue::Revoked)
            .await
    }

    pub async fn forget(&self) -> crate::Result<()> {
        self.store.forget(self.task_id).await
    }

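    /// Like [`AsyncResult::get`], but treats a missing value as an error,
    /// so callers always receive a concrete [`Value`] on success.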
    pub async fn wait(&self, timeout: Option<Duration>) -> crate::Result<Value> {
        match self.get(timeout).await? {
            Some(value) => Ok(value),
            None => Err(crate::CelersError::TaskExecution(
                "Task completed but returned no value".to_string(),
            )),
        }
    }
}

impl<S: ResultStore + Clone> std::fmt::Debug for AsyncResult<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsyncResult")
            .field("store", &"<ResultStore>")
            .field("task_id", &self.task_id)
            .field("has_parent", &self.parent.is_some())
            .field("num_children", &self.children.len())
            .finish()
    }
}

impl<S: ResultStore + Clone> std::fmt::Display for AsyncResult<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "AsyncResult[{}]", &self.task_id.to_string()[..8])
    }
}

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

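/// Metadata stored alongside a task result: tags, custom fields, expiry,
/// and bookkeeping for compression and chunking.
///
/// Built with a fluent API; a small sketch (values are illustrative):
///
/// ```ignore
/// let meta = ResultMetadata::new()
///     .with_tag("billing")
///     .with_ttl(Duration::from_secs(3600))
///     .with_field("attempt", serde_json::json!(1));
/// assert!(!meta.is_expired());
/// ```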
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultMetadata {
    pub tags: Vec<String>,

    pub custom_fields: HashMap<String, Value>,

    pub created_at: DateTime<Utc>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<DateTime<Utc>>,

    pub compressed: bool,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub compression_algorithm: Option<String>,

    pub chunked: bool,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_chunks: Option<usize>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub original_size: Option<usize>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub compressed_size: Option<usize>,
}

impl ResultMetadata {
    #[must_use]
    pub fn new() -> Self {
        Self {
            tags: Vec::new(),
            custom_fields: HashMap::new(),
            created_at: Utc::now(),
            expires_at: None,
            compressed: false,
            compression_algorithm: None,
            chunked: false,
            total_chunks: None,
            original_size: None,
            compressed_size: None,
        }
    }

    #[must_use]
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
        self.tags.push(tag.into());
        self
    }

    #[must_use]
    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
        self.tags.extend(tags);
        self
    }

    #[must_use]
    pub fn with_field(mut self, key: impl Into<String>, value: Value) -> Self {
        self.custom_fields.insert(key.into(), value);
        self
    }

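    /// Sets `expires_at` to now plus `ttl`.
    ///
    /// # Panics
    ///
    /// Panics if `ttl` cannot be represented as a `chrono::Duration`
    /// (the conversion uses `expect`).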
    #[must_use]
    pub fn with_ttl(mut self, ttl: Duration) -> Self {
        self.expires_at = Some(
            Utc::now() + chrono::Duration::from_std(ttl).expect("TTL duration should be valid"),
        );
        self
    }

    #[must_use]
    pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
        self.expires_at = Some(expires_at);
        self
    }

    #[inline]
    #[must_use]
    pub fn is_expired(&self) -> bool {
        self.expires_at.is_some_and(|exp| Utc::now() > exp)
    }

    #[inline]
    #[must_use]
    pub fn time_until_expiration(&self) -> Option<Duration> {
        self.expires_at.and_then(|exp| {
            let diff = exp - Utc::now();
            diff.to_std().ok()
        })
    }

    #[must_use]
    pub fn with_compression(
        mut self,
        algorithm: impl Into<String>,
        original_size: usize,
        compressed_size: usize,
    ) -> Self {
        self.compressed = true;
        self.compression_algorithm = Some(algorithm.into());
        self.original_size = Some(original_size);
        self.compressed_size = Some(compressed_size);
        self
    }

    #[must_use]
    pub fn with_chunking(mut self, total_chunks: usize) -> Self {
        self.chunked = true;
        self.total_chunks = Some(total_chunks);
        self
    }

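    /// Returns `compressed_size / original_size` when both sizes are recorded
    /// and the original size is non-zero; smaller is better.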
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn compression_ratio(&self) -> Option<f64> {
        if let (Some(orig), Some(comp)) = (self.original_size, self.compressed_size) {
            if orig > 0 {
                return Some(comp as f64 / orig as f64);
            }
        }
        None
    }
}

impl Default for ResultMetadata {
    fn default() -> Self {
        Self::new()
    }
}

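/// One fragment of a large result payload that has been split for storage.
///
/// `index` is zero-based, `total` is the number of chunks in the payload,
/// and `checksum` is an optional integrity hash supplied by the caller.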
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultChunk {
    pub index: usize,

    pub total: usize,

    pub data: Vec<u8>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub checksum: Option<String>,
}

impl ResultChunk {
    #[must_use]
    pub fn new(index: usize, total: usize, data: Vec<u8>) -> Self {
        Self {
            index,
            total,
            data,
            checksum: None,
        }
    }

    #[must_use]
    pub fn with_checksum(mut self, checksum: impl Into<String>) -> Self {
        self.checksum = Some(checksum.into());
        self
    }

    #[inline]
    #[must_use]
    pub const fn is_last(&self) -> bool {
        // Written as `index + 1 == total` to avoid underflow when `total` is 0.
        self.index + 1 == self.total
    }
}

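/// Marker recording that a task's result was deliberately deleted,
/// including when, by whom, why, and how long the marker should be kept.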
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultTombstone {
    pub task_id: TaskId,

    pub deleted_at: DateTime<Utc>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub deleted_by: Option<String>,

    #[serde(skip_serializing_if = "Option::is_none")]
    pub tombstone_ttl: Option<Duration>,
}

impl ResultTombstone {
    #[must_use]
    pub fn new(task_id: TaskId) -> Self {
        Self {
            task_id,
            deleted_at: Utc::now(),
            reason: None,
            deleted_by: None,
            tombstone_ttl: None,
        }
    }

    #[must_use]
    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
        self.reason = Some(reason.into());
        self
    }

    #[must_use]
    pub fn with_deleted_by(mut self, deleted_by: impl Into<String>) -> Self {
        self.deleted_by = Some(deleted_by.into());
        self
    }

    #[must_use]
    pub fn with_ttl(mut self, ttl: Duration) -> Self {
        self.tombstone_ttl = Some(ttl);
        self
    }
}

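/// Optional extensions to [`ResultStore`] for backends that also support
/// metadata, chunked payloads, tombstones, expiry cleanup, and tag queries.
///
/// `has_tombstone` has a default implementation in terms of `get_tombstone`;
/// everything else must be provided by the backend.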
#[async_trait]
pub trait ExtendedResultStore: ResultStore {
    async fn store_result_with_metadata(
        &self,
        task_id: TaskId,
        result: TaskResultValue,
        metadata: ResultMetadata,
    ) -> crate::Result<()>;

    async fn get_metadata(&self, task_id: TaskId) -> crate::Result<Option<ResultMetadata>>;

    async fn store_chunk(&self, task_id: TaskId, chunk: ResultChunk) -> crate::Result<()>;

    async fn get_chunk(&self, task_id: TaskId, index: usize) -> crate::Result<Option<ResultChunk>>;

    async fn get_all_chunks(&self, task_id: TaskId) -> crate::Result<Vec<ResultChunk>>;

    async fn store_tombstone(&self, tombstone: ResultTombstone) -> crate::Result<()>;

    async fn get_tombstone(&self, task_id: TaskId) -> crate::Result<Option<ResultTombstone>>;

    async fn has_tombstone(&self, task_id: TaskId) -> crate::Result<bool> {
        Ok(self.get_tombstone(task_id).await?.is_some())
    }

    async fn cleanup_expired(&self) -> crate::Result<usize>;

    async fn query_by_tags(&self, tags: &[String]) -> crate::Result<Vec<TaskId>>;
}

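/// Size-threshold policy for deciding when a result payload should be compressed.
///
/// `compress` and `decompress` are intentionally unimplemented here and return
/// an error; concrete backends are expected to supply the actual codec.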
pub struct ResultCompressor {
    threshold_bytes: usize,
}

impl ResultCompressor {
    #[must_use]
    pub fn new(threshold_bytes: usize) -> Self {
        Self { threshold_bytes }
    }

    #[must_use]
    pub const fn should_compress(&self, data: &[u8]) -> bool {
        data.len() >= self.threshold_bytes
    }

    pub fn compress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
        Err(crate::CelersError::Other(
            "Compression not available - use backend-specific implementation".to_string(),
        ))
    }

    pub fn decompress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
        Err(crate::CelersError::Other(
            "Decompression not available - use backend-specific implementation".to_string(),
        ))
    }
}

impl Default for ResultCompressor {
    fn default() -> Self {
        // 1 MiB compression threshold by default.
        Self::new(1024 * 1024)
    }
}

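/// Splits large result payloads into fixed-size [`ResultChunk`]s and
/// reassembles them, validating chunk count and ordering.
///
/// Round-trip sketch (sizes are illustrative):
///
/// ```ignore
/// let chunker = ResultChunker::new(4);
/// let chunks = chunker.chunk(b"hello world");
/// assert_eq!(chunks.len(), 3);
/// assert_eq!(chunker.reassemble(&chunks)?, b"hello world".to_vec());
/// ```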
pub struct ResultChunker {
    chunk_size: usize,
}

impl ResultChunker {
    #[must_use]
    pub fn new(chunk_size: usize) -> Self {
        Self { chunk_size }
    }

    #[must_use]
    pub fn chunk(&self, data: &[u8]) -> Vec<ResultChunk> {
        let total = data.len().div_ceil(self.chunk_size);

        data.chunks(self.chunk_size)
            .enumerate()
            .map(|(index, chunk)| ResultChunk::new(index, total, chunk.to_vec()))
            .collect()
    }

    pub fn reassemble(&self, chunks: &[ResultChunk]) -> crate::Result<Vec<u8>> {
        if chunks.is_empty() {
            return Ok(Vec::new());
        }

        let total = chunks[0].total;
        if chunks.len() != total {
            return Err(crate::CelersError::Other(format!(
                "Incomplete chunks: expected {}, got {}",
                total,
                chunks.len()
            )));
        }

        let mut result = Vec::new();
        for (i, chunk) in chunks.iter().enumerate() {
            if chunk.index != i {
                return Err(crate::CelersError::Other(format!(
                    "Chunk out of order: expected index {}, got {}",
                    i, chunk.index
                )));
            }
            result.extend_from_slice(&chunk.data);
        }

        Ok(result)
    }
}

impl Default for ResultChunker {
    fn default() -> Self {
        // 256 KiB chunks by default.
        Self::new(256 * 1024)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use uuid::Uuid;

    #[derive(Clone)]
    struct MockBackend {
        results: Arc<Mutex<HashMap<TaskId, TaskResultValue>>>,
        states: Arc<Mutex<HashMap<TaskId, TaskState>>>,
    }

    impl MockBackend {
        fn new() -> Self {
            Self {
                results: Arc::new(Mutex::new(HashMap::new())),
                states: Arc::new(Mutex::new(HashMap::new())),
            }
        }

        fn set_result(&self, task_id: TaskId, result: TaskResultValue, state: TaskState) {
            self.results.lock().unwrap().insert(task_id, result);
            self.states.lock().unwrap().insert(task_id, state);
        }
    }

    #[async_trait]
    impl ResultStore for MockBackend {
        async fn store_result(
            &self,
            task_id: TaskId,
            result: TaskResultValue,
        ) -> crate::Result<()> {
            self.results.lock().unwrap().insert(task_id, result);
            Ok(())
        }

        async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>> {
            Ok(self.results.lock().unwrap().get(&task_id).cloned())
        }

        async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState> {
            Ok(self
                .states
                .lock()
                .unwrap()
                .get(&task_id)
                .cloned()
                .unwrap_or(TaskState::Pending))
        }

        async fn forget(&self, task_id: TaskId) -> crate::Result<()> {
            self.results.lock().unwrap().remove(&task_id);
            self.states.lock().unwrap().remove(&task_id);
            Ok(())
        }

        async fn has_result(&self, task_id: TaskId) -> crate::Result<bool> {
            Ok(self.results.lock().unwrap().contains_key(&task_id))
        }
    }

    #[tokio::test]
    async fn test_async_result_ready() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();

        backend.set_result(
            task_id,
            TaskResultValue::Success(Value::String("test".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let result = AsyncResult::new(task_id, backend);
        assert!(result.ready().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_successful() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();

        backend.set_result(
            task_id,
            TaskResultValue::Success(Value::String("test".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let result = AsyncResult::new(task_id, backend);
        assert!(result.successful().await.unwrap());
        assert!(!result.failed().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_failed() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();

        backend.set_result(
            task_id,
            TaskResultValue::Failure {
                error: "Test error".to_string(),
                traceback: None,
            },
            TaskState::Failed(String::from("Test error")),
        );

        let result = AsyncResult::new(task_id, backend);
        assert!(result.failed().await.unwrap());
        assert!(!result.successful().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_get_success() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();

        backend.set_result(
            task_id,
            TaskResultValue::Success(Value::String("success".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let result = AsyncResult::new(task_id, backend);
        let value = result.get(Some(Duration::from_secs(1))).await.unwrap();
        assert_eq!(value, Some(Value::String("success".to_string())));
    }

    #[tokio::test]
    async fn test_async_result_forget() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();

        backend.set_result(
            task_id,
            TaskResultValue::Success(Value::String("test".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let result = AsyncResult::new(task_id, backend.clone());
        assert!(backend.has_result(task_id).await.unwrap());

        result.forget().await.unwrap();
        assert!(!backend.has_result(task_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_children() {
        let backend = MockBackend::new();

        let parent_id = Uuid::new_v4();
        backend.set_result(
            parent_id,
            TaskResultValue::Success(Value::String("parent".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let child1_id = Uuid::new_v4();
        let child2_id = Uuid::new_v4();
        backend.set_result(
            child1_id,
            TaskResultValue::Success(Value::Number(serde_json::Number::from(1))),
            TaskState::Succeeded(vec![]),
        );
        backend.set_result(
            child2_id,
            TaskResultValue::Success(Value::Number(serde_json::Number::from(2))),
            TaskState::Succeeded(vec![]),
        );

        let child1 = AsyncResult::new(child1_id, backend.clone());
        let child2 = AsyncResult::new(child2_id, backend.clone());

        let parent = AsyncResult::with_children(parent_id, backend, vec![child1, child2]);

        assert_eq!(parent.children().len(), 2);
        assert_eq!(parent.children()[0].task_id(), child1_id);
        assert_eq!(parent.children()[1].task_id(), child2_id);

        assert!(parent.children_ready().await.unwrap());

        let results = parent
            .collect_children(Some(Duration::from_secs(1)))
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], Some(Value::Number(serde_json::Number::from(1))));
        assert_eq!(results[1], Some(Value::Number(serde_json::Number::from(2))));
    }

    #[tokio::test]
    async fn test_async_result_add_child() {
        let backend = MockBackend::new();

        let parent_id = Uuid::new_v4();
        let child_id = Uuid::new_v4();

        backend.set_result(
            child_id,
            TaskResultValue::Success(Value::String("child".to_string())),
            TaskState::Succeeded(vec![]),
        );

        let mut parent = AsyncResult::new(parent_id, backend.clone());
        assert_eq!(parent.children().len(), 0);

        let child = AsyncResult::new(child_id, backend);
        parent.add_child(child);

        assert_eq!(parent.children().len(), 1);
        assert_eq!(parent.children()[0].task_id(), child_id);
    }
}