adaptive_pipeline/infrastructure/adapters/
async_checksum.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Async Checksum Adapter
9//!
10//! This module provides an async adapter for the synchronous `ChecksumService`
11//! domain trait. It demonstrates the proper pattern for handling sync domain
12//! services in async infrastructure contexts.
13//!
14//! ## Architecture Pattern
15//!
16//! Following the hybrid DDD/Clean/Hexagonal architecture:
17//!
18//! - **Domain**: Defines sync `ChecksumService` trait (business logic)
19//! - **Infrastructure**: Provides async adapter (execution model)
20//! - **Separation of Concerns**: Domain doesn't dictate async/sync execution
21//!
22//! ## Usage
23//!
24//! ```rust,ignore
25//! use std::sync::Arc;
26//! use adaptive_pipeline::infrastructure::adapters::AsyncChecksumAdapter;
27//! use adaptive_pipeline_domain::services::checksum_service::ChecksumProcessor;
28//!
29//! // Create sync implementation
30//! let sync_service = Arc::new(ChecksumProcessor::sha256_processor(true));
31//!
32//! // Wrap in async adapter
33//! let async_service = AsyncChecksumAdapter::new(sync_service);
34//!
35//! // Use in async context
36//! let result = async_service.process_chunk_async(chunk, &mut context, "stage").await?;
37//! ```
38
39use adaptive_pipeline_domain::entities::ProcessingContext;
40use adaptive_pipeline_domain::services::checksum_service::ChecksumService;
41use adaptive_pipeline_domain::value_objects::FileChunk;
42use adaptive_pipeline_domain::PipelineError;
43use std::sync::Arc;
44
45/// Async adapter for `ChecksumService`
46///
47/// Wraps a synchronous `ChecksumService` implementation and provides
48/// async methods that execute the sync operations in a way that doesn't
49/// block the async runtime.
50///
51/// ## Design Rationale
52///
53/// - **Domain Purity**: Domain traits remain sync and portable
54/// - **Infrastructure Flexibility**: Async execution is an implementation
55///   detail
56/// - **Non-Blocking**: Uses `spawn_blocking` for CPU-intensive operations
57/// - **Zero-Cost When Sync**: No overhead if used in sync contexts
58pub struct AsyncChecksumAdapter<T: ChecksumService + 'static> {
59    inner: Arc<T>,
60}
61
62impl<T: ChecksumService + 'static> AsyncChecksumAdapter<T> {
63    /// Creates a new async adapter wrapping a sync checksum service
64    pub fn new(service: Arc<T>) -> Self {
65        Self { inner: service }
66    }
67
68    /// Processes a chunk asynchronously, updating the running checksum
69    ///
70    /// Executes the synchronous process operation in a blocking task pool
71    /// to avoid blocking the async runtime.
72    pub async fn process_chunk_async(
73        &self,
74        chunk: FileChunk,
75        context: &mut ProcessingContext,
76        stage_name: &str,
77    ) -> Result<FileChunk, PipelineError> {
78        let service = self.inner.clone();
79        let mut context_clone = context.clone();
80        let stage_name = stage_name.to_string();
81
82        tokio::task::spawn_blocking(move || service.process_chunk(chunk, &mut context_clone, &stage_name))
83            .await
84            .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
85    }
86
87    /// Processes multiple chunks in parallel (infrastructure concern)
88    ///
89    /// This method demonstrates how parallelization is an infrastructure
90    /// concern, not a domain concern. The domain just defines process_chunk.
91    pub async fn process_chunks_parallel(
92        &self,
93        chunks: Vec<FileChunk>,
94        context: &mut ProcessingContext,
95        stage_name: &str,
96    ) -> Result<Vec<FileChunk>, PipelineError> {
97        let mut tasks = Vec::new();
98
99        for chunk in chunks {
100            let service = self.inner.clone();
101            let mut context_clone = context.clone();
102            let stage_name = stage_name.to_string();
103
104            let task =
105                tokio::task::spawn_blocking(move || service.process_chunk(chunk, &mut context_clone, &stage_name));
106
107            tasks.push(task);
108        }
109
110        let mut results = Vec::new();
111        for task in tasks {
112            let result = task
113                .await
114                .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))??;
115            results.push(result);
116        }
117
118        Ok(results)
119    }
120
121    /// Gets the final checksum value (sync operation)
122    pub fn get_checksum(&self, context: &ProcessingContext, stage_name: &str) -> Option<String> {
123        self.inner.get_checksum(context, stage_name)
124    }
125}
126
127impl<T: ChecksumService + 'static> Clone for AsyncChecksumAdapter<T> {
128    fn clone(&self) -> Self {
129        Self {
130            inner: self.inner.clone(),
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal stand-in for a real checksum service: passes chunks through
    /// untouched and always reports the same checksum string.
    struct FakeChecksumService;

    impl ChecksumService for FakeChecksumService {
        fn process_chunk(
            &self,
            chunk: FileChunk,
            _context: &mut ProcessingContext,
            _stage_name: &str,
        ) -> Result<FileChunk, PipelineError> {
            // Identity transform: the chunk comes back unchanged.
            Ok(chunk)
        }

        fn get_checksum(&self, _context: &ProcessingContext, _stage_name: &str) -> Option<String> {
            Some("fake_checksum".to_string())
        }
    }

    /// Verifies that the adapter forwards `get_checksum` to the wrapped
    /// synchronous service.
    #[tokio::test]
    async fn test_async_adapter_pattern() {
        use adaptive_pipeline_domain::entities::{ProcessingContext, SecurityContext, SecurityLevel};

        let adapter = AsyncChecksumAdapter::new(Arc::new(FakeChecksumService));

        // Build the smallest context the sync accessor needs.
        let security = SecurityContext::new(Some("test".to_string()), SecurityLevel::Internal);
        let ctx = ProcessingContext::new(1024, security);

        let reported = adapter.get_checksum(&ctx, "test_stage");
        assert_eq!(reported.as_deref(), Some("fake_checksum"));
    }
}