adaptive_pipeline/infrastructure/adapters/async_checksum.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Async Checksum Adapter
9//!
10//! This module provides an async adapter for the synchronous `ChecksumService`
11//! domain trait. It demonstrates the proper pattern for handling sync domain
12//! services in async infrastructure contexts.
13//!
14//! ## Architecture Pattern
15//!
16//! Following the hybrid DDD/Clean/Hexagonal architecture:
17//!
18//! - **Domain**: Defines sync `ChecksumService` trait (business logic)
19//! - **Infrastructure**: Provides async adapter (execution model)
20//! - **Separation of Concerns**: Domain doesn't dictate async/sync execution
21//!
22//! ## Usage
23//!
24//! ```rust,ignore
25//! use std::sync::Arc;
26//! use adaptive_pipeline::infrastructure::adapters::AsyncChecksumAdapter;
27//! use adaptive_pipeline_domain::services::checksum_service::ChecksumProcessor;
28//!
29//! // Create sync implementation
30//! let sync_service = Arc::new(ChecksumProcessor::sha256_processor(true));
31//!
32//! // Wrap in async adapter
33//! let async_service = AsyncChecksumAdapter::new(sync_service);
34//!
35//! // Use in async context
36//! let result = async_service.process_chunk_async(chunk, &mut context, "stage").await?;
37//! ```
38
39use adaptive_pipeline_domain::entities::ProcessingContext;
40use adaptive_pipeline_domain::services::checksum_service::ChecksumService;
41use adaptive_pipeline_domain::value_objects::FileChunk;
42use adaptive_pipeline_domain::PipelineError;
43use std::sync::Arc;
44
45/// Async adapter for `ChecksumService`
46///
47/// Wraps a synchronous `ChecksumService` implementation and provides
48/// async methods that execute the sync operations in a way that doesn't
49/// block the async runtime.
50///
51/// ## Design Rationale
52///
53/// - **Domain Purity**: Domain traits remain sync and portable
54/// - **Infrastructure Flexibility**: Async execution is an implementation
55/// detail
56/// - **Non-Blocking**: Uses `spawn_blocking` for CPU-intensive operations
57/// - **Zero-Cost When Sync**: No overhead if used in sync contexts
58pub struct AsyncChecksumAdapter<T: ChecksumService + 'static> {
59 inner: Arc<T>,
60}
61
62impl<T: ChecksumService + 'static> AsyncChecksumAdapter<T> {
63 /// Creates a new async adapter wrapping a sync checksum service
64 pub fn new(service: Arc<T>) -> Self {
65 Self { inner: service }
66 }
67
68 /// Processes a chunk asynchronously, updating the running checksum
69 ///
70 /// Executes the synchronous process operation in a blocking task pool
71 /// to avoid blocking the async runtime.
72 pub async fn process_chunk_async(
73 &self,
74 chunk: FileChunk,
75 context: &mut ProcessingContext,
76 stage_name: &str,
77 ) -> Result<FileChunk, PipelineError> {
78 let service = self.inner.clone();
79 let mut context_clone = context.clone();
80 let stage_name = stage_name.to_string();
81
82 tokio::task::spawn_blocking(move || service.process_chunk(chunk, &mut context_clone, &stage_name))
83 .await
84 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))?
85 }
86
87 /// Processes multiple chunks in parallel (infrastructure concern)
88 ///
89 /// This method demonstrates how parallelization is an infrastructure
90 /// concern, not a domain concern. The domain just defines process_chunk.
91 pub async fn process_chunks_parallel(
92 &self,
93 chunks: Vec<FileChunk>,
94 context: &mut ProcessingContext,
95 stage_name: &str,
96 ) -> Result<Vec<FileChunk>, PipelineError> {
97 let mut tasks = Vec::new();
98
99 for chunk in chunks {
100 let service = self.inner.clone();
101 let mut context_clone = context.clone();
102 let stage_name = stage_name.to_string();
103
104 let task =
105 tokio::task::spawn_blocking(move || service.process_chunk(chunk, &mut context_clone, &stage_name));
106
107 tasks.push(task);
108 }
109
110 let mut results = Vec::new();
111 for task in tasks {
112 let result = task
113 .await
114 .map_err(|e| PipelineError::InternalError(format!("Task join error: {}", e)))??;
115 results.push(result);
116 }
117
118 Ok(results)
119 }
120
121 /// Gets the final checksum value (sync operation)
122 pub fn get_checksum(&self, context: &ProcessingContext, stage_name: &str) -> Option<String> {
123 self.inner.get_checksum(context, stage_name)
124 }
125}
126
127impl<T: ChecksumService + 'static> Clone for AsyncChecksumAdapter<T> {
128 fn clone(&self) -> Self {
129 Self {
130 inner: self.inner.clone(),
131 }
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::entities::{SecurityContext, SecurityLevel};

    /// Minimal `ChecksumService` stand-in used to exercise the adapter.
    struct FakeChecksumService;

    impl ChecksumService for FakeChecksumService {
        /// Returns the chunk unchanged and leaves the context alone.
        fn process_chunk(
            &self,
            chunk: FileChunk,
            _context: &mut ProcessingContext,
            _stage_name: &str,
        ) -> Result<FileChunk, PipelineError> {
            Ok(chunk)
        }

        /// Always reports the same canned checksum string.
        fn get_checksum(&self, _context: &ProcessingContext, _stage_name: &str) -> Option<String> {
            Some(String::from("fake_checksum"))
        }
    }

    /// The adapter's synchronous accessor must pass straight through to the
    /// wrapped service.
    #[tokio::test]
    async fn test_async_adapter_pattern() {
        let adapter = AsyncChecksumAdapter::new(Arc::new(FakeChecksumService));

        let context = ProcessingContext::new(
            1024,
            SecurityContext::new(Some(String::from("test")), SecurityLevel::Internal),
        );

        assert_eq!(
            adapter.get_checksum(&context, "test_stage"),
            Some(String::from("fake_checksum"))
        );
    }
}