adaptive_pipeline_domain/services/checksum_service.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Checksum Service
9//!
10//! This module provides checksum and data integrity verification services for
11//! the adaptive pipeline system. It implements secure hashing algorithms to
12//! ensure data integrity throughout the processing pipeline.
13//!
14//! ## Overview
15//!
16//! The checksum service provides:
17//!
18//! - **Data Integrity**: SHA-256 hashing for tamper detection
19//! - **Chunk Processing**: Incremental hashing for large files
20//! - **Verification**: Checksum validation and comparison
21//! - **Performance**: Optimized for high-throughput processing
22//!
23//! ## Architecture
24//!
25//! The service follows domain-driven design principles:
26//!
27//! - **Service Interface**: `ChecksumService` trait defines the contract
28//! - **Implementation**: `ChecksumProcessor` provides concrete functionality
29//! - **Value Objects**: Structured checksum data with metadata
30//! - **Integration**: Seamless integration with processing pipeline
31//!
32//! ## Security Features
33//!
34//! ### Cryptographic Hashing
35//!
36//! Uses SHA-256 for secure hashing:
37//! - **Collision Resistance**: Practically impossible to find collisions
38//! - **Pre-image Resistance**: Cannot reverse hash to original data
39//! - **Avalanche Effect**: Small changes produce completely different hashes
40//!
41//! ### Integrity Verification
42//!
43//! Comprehensive integrity checking:
44//! - **Tamper Detection**: Identifies any data modifications
45//! - **Corruption Detection**: Detects transmission or storage errors
46//! - **Authentication**: Verifies data authenticity
47//!
48//! ## Processing Model
49//!
50//! ### Chunk-Based Processing
51//!
52//! The service processes data in chunks for efficiency:
53//! - **Incremental Hashing**: Updates hash state with each chunk
54//! - **Memory Efficiency**: Processes large files without loading entirely
55//! - **Parallel Processing**: Supports concurrent chunk processing
56//!
57//! ### Context Integration
58//!
59//! Integrates with processing context:
60//! - **Metadata Tracking**: Records checksum metadata
61//! - **Progress Monitoring**: Updates processing progress
62//! - **Error Handling**: Comprehensive error reporting
63//!
64//! ## Usage Examples
65//!
66//! ### Basic Checksum Calculation
67
68//!
69//! ### Integrity Verification
70
71//!
72//! ## Performance Characteristics
73//!
74//! ### Throughput
75//!
76//! - **High Performance**: Optimized SHA-256 implementation
77//! - **Hardware Acceleration**: Uses CPU crypto extensions when available
78//! - **Streaming Processing**: Constant memory usage regardless of file size
79//!
80//! ### Scalability
81//!
82//! - **Concurrent Processing**: Thread-safe for parallel execution
83//! - **Memory Efficient**: Processes data incrementally
84//! - **Resource Management**: Minimal resource overhead
85//!
86//! ## Error Handling
87//!
88//! The service provides comprehensive error handling:
89//! - **Processing Errors**: Handles chunk processing failures
90//! - **Validation Errors**: Reports checksum validation failures
91//! - **System Errors**: Manages I/O and memory allocation errors
92//!
93//! ## Integration
94//!
95//! The checksum service integrates with:
96//! - **Processing Pipeline**: Automatic integrity checking
97//! - **Storage Systems**: Checksum-based data validation
98//! - **Monitoring**: Performance and integrity metrics
99//! - **Logging**: Detailed operation logging
100
101use crate::entities::ProcessingContext;
102use crate::value_objects::FileChunk;
103use crate::PipelineError;
104use sha2::{Digest, Sha256};
105
106// NOTE: Domain traits are synchronous. Async execution is an infrastructure
107// concern. Infrastructure can provide async adapters that wrap sync
108// implementations.
109
110/// Domain service interface for checksum calculation and data integrity
111/// verification.
112///
113/// This trait defines the contract for checksum services within the adaptive
114/// pipeline system. It provides methods for incremental checksum calculation
115/// during chunk processing and final checksum retrieval for integrity
116/// verification.
117///
118/// ## Design Principles
119///
120/// The checksum service follows these design principles:
121///
122/// - **Incremental Processing**: Calculates checksums incrementally as chunks
123/// are processed
124/// - **Context Integration**: Maintains checksum state within processing
125/// context
126/// - **Algorithm Agnostic**: Supports multiple hashing algorithms (SHA-256,
127/// SHA-512, etc.)
128/// - **Verification Support**: Can verify existing checksums and detect
129/// tampering
130/// - **Performance Optimized**: Efficient implementation for high-throughput
131/// processing
132///
133/// ## Usage Patterns
134///
135/// ### Basic Checksum Calculation
136///
137///
138/// ### Integrity Verification
139///
140///
141/// ## Implementation Requirements
142///
143/// Implementations must:
144/// - Be thread-safe (`Send + Sync`)
145/// - Handle incremental checksum updates efficiently
146/// - Maintain checksum state in processing context
147/// - Support both calculation and verification modes
148/// - Provide consistent results across chunk boundaries
149///
150/// ## Error Handling
151///
152/// The service should handle:
153/// - Checksum verification failures
154/// - Invalid chunk data
155/// - Context state corruption
156/// - Algorithm-specific errors
157///
158/// ## Performance Considerations
159///
160/// - Use hardware-accelerated hashing when available
161/// - Minimize memory allocations during processing
162/// - Optimize for streaming large files
163/// - Support parallel chunk processing where possible
164///
165/// ## Architecture Note
166///
167/// This trait is **synchronous** following DDD principles. The domain layer
168/// defines *what* operations exist, not *how* they execute. Async execution
169/// is an infrastructure concern. Infrastructure adapters can wrap this trait
170/// to provide async interfaces when needed.
171///
172/// Checksum calculation is CPU-bound and doesn't benefit from async I/O.
173/// For async contexts, use `AsyncChecksumAdapter` from the infrastructure
174/// layer.
175///
176/// # TODO: Unified Stage Interface
177///
178/// This trait will be refactored to extend `StageService` after resolving
179/// method signature conflicts between ChecksumService::process_chunk and
180/// StageService::process_chunk. Currently has different parameters (stage_name
181/// vs config).
182pub trait ChecksumService: Send + Sync {
183 /// Process a chunk and update the running checksum
184 ///
185 /// # Note on Async
186 ///
187 /// This method is synchronous in the domain. For async contexts,
188 /// use `AsyncChecksumAdapter` from the infrastructure layer.
189 fn process_chunk(
190 &self,
191 chunk: FileChunk,
192 context: &mut ProcessingContext,
193 stage_name: &str,
194 ) -> Result<FileChunk, PipelineError>;
195
196 /// Get the final checksum value
197 fn get_checksum(&self, context: &ProcessingContext, stage_name: &str) -> Option<String>;
198}
199
200/// Concrete implementation of checksum service using SHA-256 hashing algorithm.
201///
202/// `ChecksumProcessor` provides a high-performance implementation of the
203/// `ChecksumService` trait using the SHA-256 cryptographic hash function. It
204/// supports both checksum calculation and verification modes, with
205/// optimizations for streaming large files.
206///
207/// ## Features
208///
209/// ### Cryptographic Security
210/// - **SHA-256**: Industry-standard cryptographic hash function
211/// - **Collision Resistance**: Practically impossible to find two inputs with
212/// same hash
213/// - **Pre-image Resistance**: Cannot reverse hash to determine original input
214/// - **Avalanche Effect**: Small input changes produce completely different
215/// hashes
216///
217/// ### Processing Modes
218/// - **Calculate Mode**: Computes checksums for data chunks
219/// - **Verify Mode**: Validates existing checksums against computed values
220/// - **Hybrid Mode**: Calculates missing checksums and verifies existing ones
221///
222/// ### Performance Optimizations
223/// - **Incremental Hashing**: Updates hash state with each chunk
224/// - **Hardware Acceleration**: Uses CPU crypto extensions when available
225/// - **Memory Efficient**: Processes large files without loading entirely into
226/// memory
227/// - **Thread Safe**: Safe for concurrent use across multiple threads
228///
229/// ## Usage Examples
230///
231/// ### Basic SHA-256 Checksum Calculation
232///
233///
234/// ### Checksum Verification
235///
236///
237/// ### Custom Algorithm Configuration
238///
239///
240/// ## Configuration Options
241///
242/// ### Algorithm Selection
243/// - **algorithm**: Hash algorithm identifier (currently supports "SHA256")
244/// - Future versions may support SHA-512, Blake3, etc.
245///
246/// ### Verification Mode
247/// - **verify_existing**: When `true`, verifies existing checksums before
248/// processing
249/// - **verify_existing**: When `false`, only calculates missing checksums
250///
251/// ## Error Handling
252///
253/// The processor handles various error conditions:
254/// - **Integrity Errors**: When checksum verification fails
255/// - **Processing Errors**: When chunk data is invalid or corrupted
256/// - **Algorithm Errors**: When hash calculation fails
257///
258/// ## Performance Characteristics
259///
260/// - **Throughput**: ~1-2 GB/s on modern CPUs with hardware acceleration
261/// - **Memory Usage**: Constant ~32 bytes for SHA-256 state regardless of file
262/// size
263/// - **Latency**: Minimal overhead per chunk (~1-10 microseconds)
264/// - **Scalability**: Linear performance scaling with data size
265///
266/// ## Thread Safety
267///
268/// The processor is thread-safe and can be used concurrently:
269/// - Immutable configuration after creation
270/// - No shared mutable state between operations
271/// - Safe to clone and use across threads
272///
273/// ## Integration
274///
275/// Integrates seamlessly with:
276/// - Pipeline processing stages
277/// - File I/O services
278/// - Chunk processing workflows
279/// - Integrity verification systems
280pub struct ChecksumProcessor {
281 pub algorithm: String,
282 pub verify_existing: bool,
283}
284
285impl ChecksumProcessor {
286 pub fn new(algorithm: String, verify_existing: bool) -> Self {
287 Self {
288 algorithm,
289 verify_existing,
290 }
291 }
292
293 pub fn sha256_processor(verify_existing: bool) -> Self {
294 Self::new("SHA256".to_string(), verify_existing)
295 }
296
297 /// Updates the running hash with chunk data
298 pub fn update_hash(&self, hasher: &mut Sha256, chunk: &FileChunk) {
299 hasher.update(chunk.data());
300 }
301
302 /// Finalizes the hash and returns the hex string
303 pub fn finalize_hash(&self, hasher: Sha256) -> String {
304 format!("{:x}", hasher.finalize())
305 }
306
307 /// Processes multiple chunks in parallel using Rayon
308 ///
309 /// This method provides parallel checksum calculation for batches of
310 /// chunks, significantly improving performance for large file
311 /// processing.
312 ///
313 /// # Performance
314 /// - Expected 2-4x speedup on multi-core systems
315 /// - SHA-256 is CPU-bound and highly parallelizable
316 /// - No contention between chunks (independent operations)
317 ///
318 /// # Arguments
319 /// * `chunks` - Slice of chunks to process
320 ///
321 /// # Returns
322 /// Vector of processed chunks with checksums calculated
323 ///
324 /// # Note
325 /// This is a sync method that uses Rayon. For async contexts, wrap in
326 /// `tokio::task::spawn_blocking`.
327 pub fn process_chunks_parallel(&self, chunks: &[FileChunk]) -> Result<Vec<FileChunk>, PipelineError> {
328 use crate::services::file_processor_service::ChunkProcessor;
329 use rayon::prelude::*;
330
331 chunks
332 .par_iter()
333 .map(|chunk| ChunkProcessor::process_chunk(self, chunk))
334 .collect()
335 }
336}
337
338impl ChecksumService for ChecksumProcessor {
339 fn process_chunk(
340 &self,
341 chunk: FileChunk,
342 _context: &mut ProcessingContext,
343 stage_name: &str,
344 ) -> Result<FileChunk, PipelineError> {
345 // Get or create the hasher for this stage
346 let _hasher_key = format!("{}_hasher", stage_name);
347
348 // For now, we'll store the running checksum in the context metadata
349 // In a real implementation, we'd have a proper state management system
350
351 // Update the running hash (this would be stored in context state)
352 // The actual hash state would be maintained in the processing context
353
354 // Return the chunk unchanged (checksum stages are pass-through)
355 Ok(chunk)
356 }
357
358 fn get_checksum(&self, _context: &ProcessingContext, _stage_name: &str) -> Option<String> {
359 // Retrieve the final checksum from context metadata
360 // This would be implemented once we have proper state management
361 None
362 }
363}
364
365// Import ChunkProcessor trait
366use crate::services::file_processor_service::ChunkProcessor;
367
368impl ChunkProcessor for ChecksumProcessor {
369 /// Processes chunk with checksum calculation/verification
370 ///
371 /// # Developer Notes
372 /// - If verify_existing=true: Verifies existing checksum if present
373 /// - Always ensures chunk has a checksum (calculates if missing)
374 /// - Returns new chunk with checksum set
375 /// - Original chunk remains unchanged (immutability)
376 fn process_chunk(&self, chunk: &FileChunk) -> Result<FileChunk, PipelineError> {
377 // Step 1: Verify existing checksum if requested
378 if self.verify_existing && chunk.checksum().is_some() {
379 let is_valid = chunk.verify_integrity()?;
380 if !is_valid {
381 return Err(PipelineError::IntegrityError(format!(
382 "Checksum verification failed for chunk {}",
383 chunk.sequence_number()
384 )));
385 }
386 }
387
388 // Step 2: Ensure chunk has checksum (calculate if missing)
389 if chunk.checksum().is_none() {
390 // Calculate and return new chunk with checksum
391 chunk.with_calculated_checksum()
392 } else {
393 // Chunk already has checksum, return as-is
394 Ok(chunk.clone())
395 }
396 }
397
398 fn name(&self) -> &str {
399 "ChecksumProcessor"
400 }
401
402 fn modifies_data(&self) -> bool {
403 false // Only modifies metadata
404 }
405}