celers_core/
result.rs

1#![allow(
2    clippy::missing_errors_doc,
3    clippy::missing_panics_doc,
4    clippy::missing_fields_in_debug
5)]
6#![allow(clippy::cast_precision_loss)]
7//! `AsyncResult` API for querying task results
8//!
9//! This module provides a Celery-compatible interface for retrieving task results,
10//! checking task state, and waiting for task completion.
11//!
12//! # Example
13//!
14//! ```no_run
15//! use celers_core::{AsyncResult, ResultStore, TaskId, TaskResultValue};
16//! use uuid::Uuid;
17//! use std::time::Duration;
18//! # use async_trait::async_trait;
19//! #
20//! # #[derive(Clone)]
21//! # struct MockBackend;
22//! #
23//! # #[async_trait]
24//! # impl ResultStore for MockBackend {
25//! #     async fn store_result(&self, _: TaskId, _: TaskResultValue) -> celers_core::Result<()> { Ok(()) }
26//! #     async fn get_result(&self, _: TaskId) -> celers_core::Result<Option<TaskResultValue>> { Ok(None) }
27//! #     async fn get_state(&self, _: TaskId) -> celers_core::Result<celers_core::TaskState> { Ok(celers_core::TaskState::Pending) }
28//! #     async fn forget(&self, _: TaskId) -> celers_core::Result<()> { Ok(()) }
29//! #     async fn has_result(&self, _: TaskId) -> celers_core::Result<bool> { Ok(false) }
30//! # }
31//!
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! let task_id: TaskId = Uuid::new_v4();
34//! let backend = MockBackend; // Use your actual backend (Redis, Database, etc.)
35//! let result = AsyncResult::new(task_id, backend);
36//!
37//! // Check if the task is ready
38//! if result.ready().await? {
39//!     // Get the result
40//!     if result.successful().await? {
41//!         let value = result.get(Some(Duration::from_secs(30))).await?;
42//!         println!("Task succeeded: {:?}", value);
43//!     } else {
44//!         println!("Task failed");
45//!     }
46//! }
47//! # Ok(())
48//! # }
49//! ```
50
51use crate::state::TaskState;
52use crate::TaskId;
53use async_trait::async_trait;
54use serde_json::Value;
55use std::time::Duration;
56
57/// Result store trait for `AsyncResult` API
58///
59/// This trait provides the storage interface needed by `AsyncResult` for querying
60/// task results in a Celery-compatible way. Implementations should provide
61/// lightweight result storage focused on result state and values.
62#[async_trait]
63pub trait ResultStore: Send + Sync {
64    /// Store a task result
65    async fn store_result(&self, task_id: TaskId, result: TaskResultValue) -> crate::Result<()>;
66
67    /// Retrieve a task result
68    async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>>;
69
70    /// Get task state
71    async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState>;
72
73    /// Delete a task result
74    async fn forget(&self, task_id: TaskId) -> crate::Result<()>;
75
76    /// Check if a result exists
77    async fn has_result(&self, task_id: TaskId) -> crate::Result<bool>;
78}
79
80/// Task result value stored in backend
81#[derive(Debug, Clone)]
82pub enum TaskResultValue {
83    /// Task is pending execution
84    Pending,
85
86    /// Task has been received by worker
87    Received,
88
89    /// Task is currently running
90    Started,
91
92    /// Task completed successfully with result
93    Success(Value),
94
95    /// Task failed with error message and optional traceback
96    Failure {
97        error: String,
98        traceback: Option<String>,
99    },
100
101    /// Task was revoked/cancelled
102    Revoked,
103
104    /// Task is being retried
105    Retry { attempt: u32, max_retries: u32 },
106
107    /// Task was rejected (e.g., validation failed)
108    Rejected { reason: String },
109}
110
111impl TaskResultValue {
112    /// Check if the result is in a terminal state
113    #[inline]
114    #[must_use]
115    pub const fn is_terminal(&self) -> bool {
116        matches!(
117            self,
118            TaskResultValue::Success(_)
119                | TaskResultValue::Failure { .. }
120                | TaskResultValue::Revoked
121                | TaskResultValue::Rejected { .. }
122        )
123    }
124
125    /// Check if the task is pending
126    #[inline]
127    #[must_use]
128    pub const fn is_pending(&self) -> bool {
129        matches!(self, TaskResultValue::Pending)
130    }
131
132    /// Check if the task is ready (in terminal state)
133    #[inline]
134    #[must_use]
135    pub const fn is_ready(&self) -> bool {
136        self.is_terminal()
137    }
138
139    /// Check if the task succeeded
140    #[inline]
141    #[must_use]
142    pub const fn is_successful(&self) -> bool {
143        matches!(self, TaskResultValue::Success(_))
144    }
145
146    /// Check if the task failed
147    #[inline]
148    #[must_use]
149    pub const fn is_failed(&self) -> bool {
150        matches!(
151            self,
152            TaskResultValue::Failure { .. } | TaskResultValue::Rejected { .. }
153        )
154    }
155
156    /// Get the success value if available
157    #[inline]
158    #[must_use]
159    pub fn success_value(&self) -> Option<&Value> {
160        match self {
161            TaskResultValue::Success(v) => Some(v),
162            _ => None,
163        }
164    }
165
166    /// Get the error message if failed
167    #[inline]
168    #[must_use]
169    pub fn error_message(&self) -> Option<&str> {
170        match self {
171            TaskResultValue::Failure { error, .. } => Some(error),
172            TaskResultValue::Rejected { reason } => Some(reason),
173            _ => None,
174        }
175    }
176
177    /// Get the traceback if available
178    #[inline]
179    #[must_use]
180    pub fn traceback(&self) -> Option<&str> {
181        match self {
182            TaskResultValue::Failure { traceback, .. } => traceback.as_deref(),
183            _ => None,
184        }
185    }
186}
187
188/// `AsyncResult` handle for querying task results (Celery-compatible API)
189#[derive(Clone)]
190pub struct AsyncResult<S: ResultStore> {
191    /// Task ID
192    task_id: TaskId,
193
194    /// Result store for retrieving results
195    store: S,
196
197    /// Parent result (for chained tasks)
198    parent: Option<Box<AsyncResult<S>>>,
199
200    /// Child results (for group/chord tasks)
201    children: Vec<AsyncResult<S>>,
202}
203
204impl<S: ResultStore + Clone> AsyncResult<S> {
205    /// Create a new `AsyncResult` for a task
206    pub fn new(task_id: TaskId, store: S) -> Self {
207        Self {
208            task_id,
209            store,
210            parent: None,
211            children: Vec::new(),
212        }
213    }
214
215    /// Create an `AsyncResult` with a parent
216    pub fn with_parent(task_id: TaskId, store: S, parent: AsyncResult<S>) -> Self {
217        Self {
218            task_id,
219            store,
220            parent: Some(Box::new(parent)),
221            children: Vec::new(),
222        }
223    }
224
225    /// Create an `AsyncResult` with children (for group/chord results)
226    pub fn with_children(task_id: TaskId, store: S, children: Vec<AsyncResult<S>>) -> Self {
227        Self {
228            task_id,
229            store,
230            parent: None,
231            children,
232        }
233    }
234
235    /// Get the task ID
236    #[inline]
237    #[must_use]
238    pub fn task_id(&self) -> TaskId {
239        self.task_id
240    }
241
242    /// Get the parent result if this is a linked task
243    #[inline]
244    #[must_use]
245    pub fn parent(&self) -> Option<&AsyncResult<S>> {
246        self.parent.as_deref()
247    }
248
249    /// Get child results (for group/chord tasks)
250    #[inline]
251    #[must_use]
252    pub fn children(&self) -> &[AsyncResult<S>] {
253        &self.children
254    }
255
256    /// Add a child result
257    pub fn add_child(&mut self, child: AsyncResult<S>) {
258        self.children.push(child);
259    }
260
261    /// Check if all children are ready (completed)
262    pub async fn children_ready(&self) -> crate::Result<bool> {
263        for child in &self.children {
264            if !child.ready().await? {
265                return Ok(false);
266            }
267        }
268        Ok(true)
269    }
270
271    /// Get results from all children
272    ///
273    /// Returns a vector of results in the same order as children were added.
274    /// Returns an error if any child failed.
275    pub async fn collect_children(
276        &self,
277        timeout: Option<Duration>,
278    ) -> crate::Result<Vec<Option<Value>>> {
279        let mut results = Vec::with_capacity(self.children.len());
280        for child in &self.children {
281            results.push(child.get(timeout).await?);
282        }
283        Ok(results)
284    }
285
286    /// Check if the task is ready (in terminal state)
287    pub async fn ready(&self) -> crate::Result<bool> {
288        let state = self.store.get_state(self.task_id).await?;
289        Ok(state.is_terminal())
290    }
291
292    /// Check if the task completed successfully
293    pub async fn successful(&self) -> crate::Result<bool> {
294        match self.store.get_result(self.task_id).await? {
295            Some(result) => Ok(result.is_successful()),
296            None => Ok(false),
297        }
298    }
299
300    /// Check if the task failed
301    pub async fn failed(&self) -> crate::Result<bool> {
302        match self.store.get_result(self.task_id).await? {
303            Some(result) => Ok(result.is_failed()),
304            None => Ok(false),
305        }
306    }
307
308    /// Get the current task state
309    pub async fn state(&self) -> crate::Result<TaskState> {
310        self.store.get_state(self.task_id).await
311    }
312
313    /// Get task information/metadata
314    pub async fn info(&self) -> crate::Result<Option<TaskResultValue>> {
315        self.store.get_result(self.task_id).await
316    }
317
318    /// Get the result, blocking until it's ready
319    ///
320    /// # Arguments
321    /// * `timeout` - Optional timeout duration. If None, waits indefinitely.
322    ///
323    /// # Returns
324    /// * `Ok(Some(Value))` - Task succeeded with result
325    /// * `Ok(None)` - Task completed but has no result
326    /// * `Err(_)` - Task failed or timeout occurred
327    pub async fn get(&self, timeout: Option<Duration>) -> crate::Result<Option<Value>> {
328        let start = std::time::Instant::now();
329        let poll_interval = Duration::from_millis(100);
330
331        loop {
332            // Check if timeout expired
333            if let Some(timeout_duration) = timeout {
334                if start.elapsed() > timeout_duration {
335                    return Err(crate::CelersError::Timeout(format!(
336                        "Task {} did not complete within {:?}",
337                        self.task_id, timeout_duration
338                    )));
339                }
340            }
341
342            // Get current result
343            if let Some(result) = self.store.get_result(self.task_id).await? {
344                match result {
345                    TaskResultValue::Success(value) => return Ok(Some(value)),
346                    TaskResultValue::Failure { error, traceback } => {
347                        let msg = if let Some(tb) = traceback {
348                            format!("{error}\n{tb}")
349                        } else {
350                            error
351                        };
352                        return Err(crate::CelersError::TaskExecution(msg));
353                    }
354                    TaskResultValue::Revoked => {
355                        return Err(crate::CelersError::TaskRevoked(self.task_id));
356                    }
357                    TaskResultValue::Rejected { reason } => {
358                        return Err(crate::CelersError::TaskExecution(format!(
359                            "Task rejected: {reason}"
360                        )));
361                    }
362                    // Task not ready yet, continue polling
363                    _ => {}
364                }
365            } else {
366                // Result not yet available
367            }
368
369            // Wait before next poll
370            tokio::time::sleep(poll_interval).await;
371        }
372    }
373
374    /// Get the result without blocking
375    ///
376    /// Returns None if the task is not yet complete
377    pub async fn result(&self) -> crate::Result<Option<Value>> {
378        match self.store.get_result(self.task_id).await? {
379            Some(TaskResultValue::Success(value)) => Ok(Some(value)),
380            _ => Ok(None),
381        }
382    }
383
384    /// Get the error traceback if the task failed
385    pub async fn traceback(&self) -> crate::Result<Option<String>> {
386        match self.store.get_result(self.task_id).await? {
387            Some(result) => Ok(result.traceback().map(String::from)),
388            None => Ok(None),
389        }
390    }
391
392    /// Revoke the task
393    pub async fn revoke(&self) -> crate::Result<()> {
394        self.store
395            .store_result(self.task_id, TaskResultValue::Revoked)
396            .await
397    }
398
399    /// Forget the task result (delete from store)
400    pub async fn forget(&self) -> crate::Result<()> {
401        self.store.forget(self.task_id).await
402    }
403
404    /// Wait for the task to complete and return the result
405    ///
406    /// This is a convenience method that combines `ready()` and `get()`
407    pub async fn wait(&self, timeout: Option<Duration>) -> crate::Result<Value> {
408        match self.get(timeout).await? {
409            Some(value) => Ok(value),
410            None => Err(crate::CelersError::TaskExecution(
411                "Task completed but returned no value".to_string(),
412            )),
413        }
414    }
415}
416
417impl<S: ResultStore + Clone> std::fmt::Debug for AsyncResult<S> {
418    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419        f.debug_struct("AsyncResult")
420            .field("store", &"<ResultStore>")
421            .field("task_id", &self.task_id)
422            .field("has_parent", &self.parent.is_some())
423            .field("num_children", &self.children.len())
424            .finish()
425    }
426}
427
428impl<S: ResultStore + Clone> std::fmt::Display for AsyncResult<S> {
429    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430        write!(f, "AsyncResult[{}]", &self.task_id.to_string()[..8])
431    }
432}
433
434// ============================================================================
435// Advanced Result Features
436// ============================================================================
437
438use chrono::{DateTime, Utc};
439use serde::{Deserialize, Serialize};
440use std::collections::HashMap;
441
442/// Result metadata for storing additional information with task results
443#[derive(Debug, Clone, Serialize, Deserialize)]
444pub struct ResultMetadata {
445    /// Custom tags for categorization
446    pub tags: Vec<String>,
447
448    /// Custom key-value fields
449    pub custom_fields: HashMap<String, Value>,
450
451    /// Result creation timestamp
452    pub created_at: DateTime<Utc>,
453
454    /// Result expiration timestamp (TTL)
455    #[serde(skip_serializing_if = "Option::is_none")]
456    pub expires_at: Option<DateTime<Utc>>,
457
458    /// Whether the result is compressed
459    pub compressed: bool,
460
461    /// Compression algorithm used (if compressed)
462    #[serde(skip_serializing_if = "Option::is_none")]
463    pub compression_algorithm: Option<String>,
464
465    /// Whether the result is chunked
466    pub chunked: bool,
467
468    /// Total number of chunks (if chunked)
469    #[serde(skip_serializing_if = "Option::is_none")]
470    pub total_chunks: Option<usize>,
471
472    /// Original size in bytes (before compression/chunking)
473    #[serde(skip_serializing_if = "Option::is_none")]
474    pub original_size: Option<usize>,
475
476    /// Compressed size in bytes
477    #[serde(skip_serializing_if = "Option::is_none")]
478    pub compressed_size: Option<usize>,
479}
480
481impl ResultMetadata {
482    /// Create new result metadata
483    #[must_use]
484    pub fn new() -> Self {
485        Self {
486            tags: Vec::new(),
487            custom_fields: HashMap::new(),
488            created_at: Utc::now(),
489            expires_at: None,
490            compressed: false,
491            compression_algorithm: None,
492            chunked: false,
493            total_chunks: None,
494            original_size: None,
495            compressed_size: None,
496        }
497    }
498
499    /// Add a tag
500    #[must_use]
501    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
502        self.tags.push(tag.into());
503        self
504    }
505
506    /// Add multiple tags
507    #[must_use]
508    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
509        self.tags.extend(tags);
510        self
511    }
512
513    /// Add a custom field
514    #[must_use]
515    pub fn with_field(mut self, key: impl Into<String>, value: Value) -> Self {
516        self.custom_fields.insert(key.into(), value);
517        self
518    }
519
520    ///
521    /// # Panics
522    ///
523    /// Panics if the TTL duration cannot be converted to a chrono duration.
524    /// Set TTL (time to live)
525    #[must_use]
526    pub fn with_ttl(mut self, ttl: Duration) -> Self {
527        self.expires_at = Some(
528            Utc::now() + chrono::Duration::from_std(ttl).expect("TTL duration should be valid"),
529        );
530        self
531    }
532
533    /// Set expiration timestamp
534    #[must_use]
535    pub fn with_expires_at(mut self, expires_at: DateTime<Utc>) -> Self {
536        self.expires_at = Some(expires_at);
537        self
538    }
539
540    /// Check if the result has expired
541    #[inline]
542    #[must_use]
543    pub fn is_expired(&self) -> bool {
544        self.expires_at.is_some_and(|exp| Utc::now() > exp)
545    }
546
547    /// Get time until expiration
548    #[inline]
549    #[must_use]
550    pub fn time_until_expiration(&self) -> Option<Duration> {
551        self.expires_at.and_then(|exp| {
552            let diff = exp - Utc::now();
553            diff.to_std().ok()
554        })
555    }
556
557    /// Mark as compressed
558    #[must_use]
559    pub fn with_compression(
560        mut self,
561        algorithm: impl Into<String>,
562        original_size: usize,
563        compressed_size: usize,
564    ) -> Self {
565        self.compressed = true;
566        self.compression_algorithm = Some(algorithm.into());
567        self.original_size = Some(original_size);
568        self.compressed_size = Some(compressed_size);
569        self
570    }
571
572    /// Mark as chunked
573    #[must_use]
574    pub fn with_chunking(mut self, total_chunks: usize) -> Self {
575        self.chunked = true;
576        self.total_chunks = Some(total_chunks);
577        self
578    }
579
580    /// Get compression ratio
581    #[allow(clippy::cast_precision_loss)]
582    #[must_use]
583    pub fn compression_ratio(&self) -> Option<f64> {
584        if let (Some(orig), Some(comp)) = (self.original_size, self.compressed_size) {
585            if orig > 0 {
586                return Some(comp as f64 / orig as f64);
587            }
588        }
589        None
590    }
591}
592
593impl Default for ResultMetadata {
594    fn default() -> Self {
595        Self::new()
596    }
597}
598
599/// Result chunk for large results
600#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct ResultChunk {
602    /// Chunk index (0-based)
603    pub index: usize,
604
605    /// Total number of chunks
606    pub total: usize,
607
608    /// Chunk data
609    pub data: Vec<u8>,
610
611    /// Checksum for integrity verification
612    #[serde(skip_serializing_if = "Option::is_none")]
613    pub checksum: Option<String>,
614}
615
616impl ResultChunk {
617    /// Create a new result chunk
618    #[must_use]
619    pub fn new(index: usize, total: usize, data: Vec<u8>) -> Self {
620        Self {
621            index,
622            total,
623            data,
624            checksum: None,
625        }
626    }
627
628    /// Add checksum
629    #[must_use]
630    pub fn with_checksum(mut self, checksum: impl Into<String>) -> Self {
631        self.checksum = Some(checksum.into());
632        self
633    }
634
635    /// Check if this is the last chunk
636    #[inline]
637    #[must_use]
638    pub const fn is_last(&self) -> bool {
639        self.index == self.total - 1
640    }
641}
642
643/// Result tombstone marker for deleted tasks
644#[derive(Debug, Clone, Serialize, Deserialize)]
645pub struct ResultTombstone {
646    /// Task ID
647    pub task_id: TaskId,
648
649    /// Deletion timestamp
650    pub deleted_at: DateTime<Utc>,
651
652    /// Reason for deletion
653    #[serde(skip_serializing_if = "Option::is_none")]
654    pub reason: Option<String>,
655
656    /// Who deleted it (user, system, etc.)
657    #[serde(skip_serializing_if = "Option::is_none")]
658    pub deleted_by: Option<String>,
659
660    /// TTL for the tombstone itself
661    #[serde(skip_serializing_if = "Option::is_none")]
662    pub tombstone_ttl: Option<Duration>,
663}
664
665impl ResultTombstone {
666    /// Create a new tombstone
667    #[must_use]
668    pub fn new(task_id: TaskId) -> Self {
669        Self {
670            task_id,
671            deleted_at: Utc::now(),
672            reason: None,
673            deleted_by: None,
674            tombstone_ttl: None,
675        }
676    }
677
678    /// Set deletion reason
679    #[must_use]
680    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
681        self.reason = Some(reason.into());
682        self
683    }
684
685    /// Set who deleted it
686    #[must_use]
687    pub fn with_deleted_by(mut self, deleted_by: impl Into<String>) -> Self {
688        self.deleted_by = Some(deleted_by.into());
689        self
690    }
691
692    /// Set tombstone TTL
693    #[must_use]
694    pub fn with_ttl(mut self, ttl: Duration) -> Self {
695        self.tombstone_ttl = Some(ttl);
696        self
697    }
698}
699
700/// Extended result store with advanced features
701#[async_trait]
702pub trait ExtendedResultStore: ResultStore {
703    /// Store result with metadata
704    async fn store_result_with_metadata(
705        &self,
706        task_id: TaskId,
707        result: TaskResultValue,
708        metadata: ResultMetadata,
709    ) -> crate::Result<()>;
710
711    /// Get result metadata
712    async fn get_metadata(&self, task_id: TaskId) -> crate::Result<Option<ResultMetadata>>;
713
714    /// Store a result chunk
715    async fn store_chunk(&self, task_id: TaskId, chunk: ResultChunk) -> crate::Result<()>;
716
717    /// Get a result chunk
718    async fn get_chunk(&self, task_id: TaskId, index: usize) -> crate::Result<Option<ResultChunk>>;
719
720    /// Get all chunks for a task
721    async fn get_all_chunks(&self, task_id: TaskId) -> crate::Result<Vec<ResultChunk>>;
722
723    /// Store a tombstone
724    async fn store_tombstone(&self, tombstone: ResultTombstone) -> crate::Result<()>;
725
726    /// Get a tombstone
727    async fn get_tombstone(&self, task_id: TaskId) -> crate::Result<Option<ResultTombstone>>;
728
729    /// Check if a task has a tombstone
730    async fn has_tombstone(&self, task_id: TaskId) -> crate::Result<bool> {
731        Ok(self.get_tombstone(task_id).await?.is_some())
732    }
733
734    /// Cleanup expired results
735    async fn cleanup_expired(&self) -> crate::Result<usize>;
736
737    /// Query results by tags
738    async fn query_by_tags(&self, tags: &[String]) -> crate::Result<Vec<TaskId>>;
739}
740
/// Size-threshold policy and compression entry points for result payloads.
pub struct ResultCompressor {
    /// Minimum payload length, in bytes, at which compression is advised.
    threshold_bytes: usize,
}
745
746impl ResultCompressor {
747    /// Create a new compressor with threshold
748    #[must_use]
749    pub fn new(threshold_bytes: usize) -> Self {
750        Self { threshold_bytes }
751    }
752
753    /// Check if value should be compressed
754    #[must_use]
755    pub const fn should_compress(&self, data: &[u8]) -> bool {
756        data.len() >= self.threshold_bytes
757    }
758
759    /// Compress data (to be implemented by backend-specific compressors)
760    ///
761    /// Note: Actual compression implementations are provided by backend crates
762    /// (e.g., celers-backend-redis) which have the compression dependencies.
763    pub fn compress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
764        Err(crate::CelersError::Other(
765            "Compression not available - use backend-specific implementation".to_string(),
766        ))
767    }
768
769    /// Decompress data (to be implemented by backend-specific compressors)
770    ///
771    /// Note: Actual decompression implementations are provided by backend crates
772    /// (e.g., celers-backend-redis) which have the compression dependencies.
773    pub fn decompress(&self, _data: &[u8], _algorithm: &str) -> crate::Result<Vec<u8>> {
774        Err(crate::CelersError::Other(
775            "Decompression not available - use backend-specific implementation".to_string(),
776        ))
777    }
778}
779
780impl Default for ResultCompressor {
781    fn default() -> Self {
782        Self::new(1024 * 1024) // 1MB threshold
783    }
784}
785
/// Splits large result payloads into chunks and puts them back together.
pub struct ResultChunker {
    /// Maximum number of bytes placed in a single chunk.
    chunk_size: usize,
}
790
791impl ResultChunker {
792    /// Create a new chunker
793    #[must_use]
794    pub fn new(chunk_size: usize) -> Self {
795        Self { chunk_size }
796    }
797
798    /// Split data into chunks
799    #[must_use]
800    pub fn chunk(&self, data: &[u8]) -> Vec<ResultChunk> {
801        let total = data.len().div_ceil(self.chunk_size);
802
803        data.chunks(self.chunk_size)
804            .enumerate()
805            .map(|(index, chunk)| ResultChunk::new(index, total, chunk.to_vec()))
806            .collect()
807    }
808
809    /// Reassemble chunks into original data
810    pub fn reassemble(&self, chunks: &[ResultChunk]) -> crate::Result<Vec<u8>> {
811        if chunks.is_empty() {
812            return Ok(Vec::new());
813        }
814
815        // Verify chunks are complete and in order
816        let total = chunks[0].total;
817        if chunks.len() != total {
818            return Err(crate::CelersError::Other(format!(
819                "Incomplete chunks: expected {}, got {}",
820                total,
821                chunks.len()
822            )));
823        }
824
825        let mut result = Vec::new();
826        for (i, chunk) in chunks.iter().enumerate() {
827            if chunk.index != i {
828                return Err(crate::CelersError::Other(format!(
829                    "Chunk out of order: expected index {}, got {}",
830                    i, chunk.index
831                )));
832            }
833            result.extend_from_slice(&chunk.data);
834        }
835
836        Ok(result)
837    }
838}
839
840impl Default for ResultChunker {
841    fn default() -> Self {
842        Self::new(256 * 1024) // 256KB chunks
843    }
844}
845
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use uuid::Uuid;

    /// In-memory `ResultStore` used by the tests; results and states live in
    /// two independent maps shared between clones.
    #[derive(Clone)]
    struct MockBackend {
        results: Arc<Mutex<HashMap<TaskId, TaskResultValue>>>,
        states: Arc<Mutex<HashMap<TaskId, TaskState>>>,
    }

    impl MockBackend {
        /// Empty backend with no results and no states.
        fn new() -> Self {
            Self {
                results: Arc::default(),
                states: Arc::default(),
            }
        }

        /// Record both the result and the state of `task_id`.
        fn set_result(&self, task_id: TaskId, result: TaskResultValue, state: TaskState) {
            self.results.lock().unwrap().insert(task_id, result);
            self.states.lock().unwrap().insert(task_id, state);
        }

        /// Register a fresh task that succeeded with `payload` and return its id.
        fn add_success(&self, payload: Value) -> TaskId {
            let task_id = Uuid::new_v4();
            self.set_result(
                task_id,
                TaskResultValue::Success(payload),
                TaskState::Succeeded(vec![]),
            );
            task_id
        }
    }

    #[async_trait]
    impl ResultStore for MockBackend {
        async fn store_result(
            &self,
            task_id: TaskId,
            result: TaskResultValue,
        ) -> crate::Result<()> {
            // Only the result map is updated; the state map is left alone.
            let mut results = self.results.lock().unwrap();
            results.insert(task_id, result);
            Ok(())
        }

        async fn get_result(&self, task_id: TaskId) -> crate::Result<Option<TaskResultValue>> {
            let results = self.results.lock().unwrap();
            Ok(results.get(&task_id).cloned())
        }

        async fn get_state(&self, task_id: TaskId) -> crate::Result<TaskState> {
            // Unknown tasks are reported as pending.
            let states = self.states.lock().unwrap();
            Ok(states.get(&task_id).cloned().unwrap_or(TaskState::Pending))
        }

        async fn forget(&self, task_id: TaskId) -> crate::Result<()> {
            self.results.lock().unwrap().remove(&task_id);
            self.states.lock().unwrap().remove(&task_id);
            Ok(())
        }

        async fn has_result(&self, task_id: TaskId) -> crate::Result<bool> {
            let results = self.results.lock().unwrap();
            Ok(results.contains_key(&task_id))
        }
    }

    #[tokio::test]
    async fn test_async_result_ready() {
        let backend = MockBackend::new();
        let task_id = backend.add_success(Value::from("test"));

        let handle = AsyncResult::new(task_id, backend);
        assert!(handle.ready().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_successful() {
        let backend = MockBackend::new();
        let task_id = backend.add_success(Value::from("test"));

        let handle = AsyncResult::new(task_id, backend);
        assert!(handle.successful().await.unwrap());
        assert!(!handle.failed().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_failed() {
        let backend = MockBackend::new();
        let task_id = Uuid::new_v4();
        let failure = TaskResultValue::Failure {
            error: "Test error".to_string(),
            traceback: None,
        };
        backend.set_result(
            task_id,
            failure,
            TaskState::Failed(String::from("Test error")),
        );

        let handle = AsyncResult::new(task_id, backend);
        assert!(handle.failed().await.unwrap());
        assert!(!handle.successful().await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_get_success() {
        let backend = MockBackend::new();
        let task_id = backend.add_success(Value::from("success"));

        let handle = AsyncResult::new(task_id, backend);
        let value = handle.get(Some(Duration::from_secs(1))).await.unwrap();
        assert_eq!(value, Some(Value::String("success".to_string())));
    }

    #[tokio::test]
    async fn test_async_result_forget() {
        let backend = MockBackend::new();
        let task_id = backend.add_success(Value::from("test"));

        let handle = AsyncResult::new(task_id, backend.clone());
        assert!(backend.has_result(task_id).await.unwrap());

        handle.forget().await.unwrap();
        assert!(!backend.has_result(task_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_async_result_children() {
        let backend = MockBackend::new();

        // One parent and two children, all already succeeded.
        let parent_id = backend.add_success(Value::from("parent"));
        let child1_id = backend.add_success(Value::from(1));
        let child2_id = backend.add_success(Value::from(2));

        let child_handles = vec![
            AsyncResult::new(child1_id, backend.clone()),
            AsyncResult::new(child2_id, backend.clone()),
        ];
        let parent = AsyncResult::with_children(parent_id, backend, child_handles);

        // Children are exposed in insertion order.
        assert_eq!(parent.children().len(), 2);
        assert_eq!(parent.children()[0].task_id(), child1_id);
        assert_eq!(parent.children()[1].task_id(), child2_id);

        // All children are in a terminal state.
        assert!(parent.children_ready().await.unwrap());

        // Collected values follow the same order.
        let results = parent
            .collect_children(Some(Duration::from_secs(1)))
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], Some(Value::Number(serde_json::Number::from(1))));
        assert_eq!(results[1], Some(Value::Number(serde_json::Number::from(2))));
    }

    #[tokio::test]
    async fn test_async_result_add_child() {
        let backend = MockBackend::new();
        let parent_id = Uuid::new_v4();
        let child_id = backend.add_success(Value::from("child"));

        let mut parent = AsyncResult::new(parent_id, backend.clone());
        assert_eq!(parent.children().len(), 0);

        parent.add_child(AsyncResult::new(child_id, backend));

        assert_eq!(parent.children().len(), 1);
        assert_eq!(parent.children()[0].task_id(), child_id);
    }
}