adaptive_pipeline_domain/entities/
processing_context.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////
7
//! # Processing Context Entity
//!
//! The `ProcessingContext` entity maintains runtime state and context
//! information throughout pipeline execution. It serves as a central repository
//! for tracking processing progress, configuration parameters, and execution
//! metadata.
//!
//! ## Overview
//!
//! The processing context acts as a stateful carrier object that:
//!
//! - **Tracks Progress**: Records bytes processed and reports completion status
//! - **Manages Configuration**: Holds processing parameters such as chunk size
//!   and worker count
//! - **Holds Metrics**: Stores the latest processing-metrics snapshot
//! - **Stores Metadata**: Preserves stage-specific results and custom metadata
//! - **Carries Security Context**: Keeps the security context available to
//!   every stage (the context stores it but performs no checks itself)
//!
//! ## Entity Characteristics
//!
//! - **Mutable State**: Tracks changing values during processing
//! - **Unique Identity**: Each context has a distinct `ProcessingContextId`
//! - **No Internal Synchronization**: Mutating methods take `&mut self`, so
//!   concurrent mutation requires external synchronization (e.g. a `Mutex`)
//!   or independent clones
//! - **Serializable**: Can be persisted and restored for long-running
//!   operations
//!
//! ## State Management
//!
//! The context maintains several categories of state:
//!
//! ### Chunk Processing State
//! - Total file size and bytes processed (for progress tracking)
//! - Progress calculation and completion status
//!
//! ### Configuration State
//! - Chunk size for processing operations
//! - Worker count for parallel processing
//! - Security context and permissions
//!
//! ### Runtime State
//! - Processing metrics and performance data
//! - Stage-specific results and outputs
//! - Custom metadata and annotations
50
51use crate::services::datetime_serde;
52use crate::value_objects::{ChunkSize, ProcessingContextId, WorkerCount};
53use crate::{ProcessingMetrics, SecurityContext};
54use serde::{Deserialize, Serialize};
55use std::collections::HashMap;
56
57/// Processing context entity that maintains runtime state during pipeline
58/// execution.
59///
60/// The `ProcessingContext` serves as a central state container that travels
61/// through the pipeline, collecting information and tracking progress as each
62/// stage processes the data. It provides a unified interface for accessing and
63/// updating processing state across all pipeline stages.
64///
65/// ## Entity Purpose
66///
67/// - **State Coordination**: Centralizes processing state across pipeline
68///   stages
69/// - **Progress Tracking**: Monitors processing progress and completion status
70/// - **Configuration Management**: Maintains processing parameters and settings
71/// - **Metrics Collection**: Aggregates performance and operational metrics
72/// - **Security Enforcement**: Preserves security context throughout processing
73///
74/// ## Usage Examples
75///
76/// ### Creating a Processing Context
77///
78///
79/// ### Tracking Processing Progress
80///
81///
82/// ### Managing Stage Results
83///
84///
85/// ### Adding Custom Metadata
86///
87///
88/// ### Updating Processing Metrics
89///
90///
91/// ## State Lifecycle
92///
93/// The processing context follows a predictable lifecycle:
94///
95/// ### 1. Initialization
96///
97/// ### 2. Processing Updates
98///
99/// ### 3. Completion
100///
101/// ## Thread Safety and Concurrency
102///
103/// While the context itself is not thread-safe, it's designed for safe
104/// concurrent patterns:
105///
106///
107/// ## Serialization and Persistence
108///
109/// The context supports serialization for checkpointing and recovery:
110///
111///
112/// ## Performance Considerations
113///
114/// - Context updates are lightweight and fast
115/// - Metadata and stage results use efficient HashMap storage
116/// - Progress calculations are performed on-demand
117/// - Timestamps are updated only when state changes
118/// - Memory usage scales with the amount of stored metadata
119///
120/// ## Error Handling
121///
122/// The context provides safe access to all state with appropriate defaults:
123///
124/// - Missing metadata returns `None` rather than panicking
125/// - Progress calculations handle edge cases (zero file size)
126/// - All numeric operations are checked for overflow
127/// - Timestamp operations are guaranteed to succeed
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct ProcessingContext {
130    // Identity fields (always first)
131    id: ProcessingContextId,
132
133    // Core business fields (alphabetical within group)
134    chunk_size: ChunkSize,
135    file_size: u64,
136    metadata: HashMap<String, String>,
137    metrics: ProcessingMetrics,
138    processed_bytes: u64,
139    security_context: SecurityContext,
140    stage_results: HashMap<String, String>,
141    worker_count: WorkerCount,
142
143    // Metadata fields (always last)
144    #[serde(with = "datetime_serde")]
145    created_at: chrono::DateTime<chrono::Utc>,
146    #[serde(with = "datetime_serde")]
147    updated_at: chrono::DateTime<chrono::Utc>,
148}
149
150impl ProcessingContext {
151    /// Creates a new processing context for pipeline execution
152    ///
153    /// Initializes a chunk-scoped context with default configuration values and
154    /// empty state. The context starts with zero processed bytes and will track
155    /// progress and metadata as the chunk flows through pipeline stages.
156    ///
157    /// # Design Note
158    ///
159    /// This context is chunk-scoped, not file-scoped. File paths are managed
160    /// by the pipeline worker (via `CpuWorkerContext`) using dependency injection.
161    /// This separation ensures the context focuses on chunk processing metadata
162    /// without coupling to file I/O concerns.
163    ///
164    /// # Arguments
165    ///
166    /// * `file_size` - Total size of the file being processed (for progress tracking)
167    /// * `security_context` - Security context for authorization and access control
168    ///
169    /// # Returns
170    ///
171    /// A new `ProcessingContext` with initialized state
172    ///
173    /// # Examples
174    pub fn new(file_size: u64, security_context: SecurityContext) -> Self {
175        let now = chrono::Utc::now();
176
177        ProcessingContext {
178            // Identity fields
179            id: ProcessingContextId::new(),
180
181            // Core business fields (alphabetical)
182            chunk_size: ChunkSize::from_mb(1).unwrap_or_else(|_| ChunkSize::default()),
183            file_size,
184            metadata: HashMap::new(),
185            metrics: ProcessingMetrics::default(),
186            processed_bytes: 0,
187            security_context,
188            stage_results: HashMap::new(),
189            worker_count: WorkerCount::new(4), // Default to 4 workers
190
191            // Metadata fields
192            created_at: now,
193            updated_at: now,
194        }
195    }
196
197    /// Gets the unique identifier for this processing context
198    ///
199    /// # Returns
200    ///
201    /// Reference to the context's unique identifier
202    pub fn id(&self) -> &ProcessingContextId {
203        &self.id
204    }
205
206    /// Gets the total size of the file being processed
207    ///
208    /// # Returns
209    ///
210    /// Total file size in bytes
211    pub fn file_size(&self) -> u64 {
212        self.file_size
213    }
214
215    /// Gets the number of bytes processed so far
216    ///
217    /// # Returns
218    ///
219    /// Number of bytes processed
220    pub fn processed_bytes(&self) -> u64 {
221        self.processed_bytes
222    }
223
224    /// Gets the security context for authorization and access control
225    ///
226    /// # Returns
227    ///
228    /// Reference to the security context
229    pub fn security_context(&self) -> &SecurityContext {
230        &self.security_context
231    }
232
233    /// Gets the current processing metrics
234    ///
235    /// # Returns
236    ///
237    /// Reference to the processing metrics
238    pub fn metrics(&self) -> &ProcessingMetrics {
239        &self.metrics
240    }
241
242    /// Gets the chunk size configuration for processing
243    ///
244    /// # Returns
245    ///
246    /// Reference to the chunk size configuration
247    pub fn chunk_size(&self) -> &ChunkSize {
248        &self.chunk_size
249    }
250
251    /// Gets the number of worker threads for parallel processing
252    ///
253    /// # Returns
254    ///
255    /// Reference to the worker count configuration
256    pub fn worker_count(&self) -> &WorkerCount {
257        &self.worker_count
258    }
259
260    /// Gets all custom metadata associated with this context
261    ///
262    /// # Returns
263    ///
264    /// Reference to the metadata HashMap
265    pub fn metadata(&self) -> &HashMap<String, String> {
266        &self.metadata
267    }
268
269    /// Gets all stage processing results
270    ///
271    /// # Returns
272    ///
273    /// Reference to the stage results HashMap
274    pub fn stage_results(&self) -> &HashMap<String, String> {
275        &self.stage_results
276    }
277
278    /// Sets the total number of bytes processed
279    ///
280    /// Replaces the current processed byte count with a new absolute value.
281    ///
282    /// # Arguments
283    ///
284    /// * `bytes` - New total byte count
285    ///
286    /// # Side Effects
287    ///
288    /// Updates the `updated_at` timestamp
289    pub fn update_processed_bytes(&mut self, bytes: u64) {
290        self.processed_bytes = bytes;
291        self.updated_at = chrono::Utc::now();
292    }
293
294    /// Increments the processed byte count
295    ///
296    /// Adds the specified number of bytes to the current processed total.
297    ///
298    /// # Arguments
299    ///
300    /// * `bytes` - Number of additional bytes to add
301    ///
302    /// # Side Effects
303    ///
304    /// Updates the `updated_at` timestamp
305    pub fn add_processed_bytes(&mut self, bytes: u64) {
306        self.processed_bytes += bytes;
307        self.updated_at = chrono::Utc::now();
308    }
309
310    /// Updates the processing metrics with new values
311    ///
312    /// # Arguments
313    ///
314    /// * `metrics` - New metrics to replace current metrics
315    ///
316    /// # Side Effects
317    ///
318    /// Updates the `updated_at` timestamp
319    pub fn update_metrics(&mut self, metrics: ProcessingMetrics) {
320        self.metrics = metrics;
321        self.updated_at = chrono::Utc::now();
322    }
323
324    /// Adds or updates a metadata key-value pair
325    ///
326    /// # Arguments
327    ///
328    /// * `key` - Metadata key
329    /// * `value` - Metadata value
330    ///
331    /// # Side Effects
332    ///
333    /// Updates the `updated_at` timestamp
334    pub fn add_metadata(&mut self, key: String, value: String) {
335        self.metadata.insert(key, value);
336        self.updated_at = chrono::Utc::now();
337    }
338
339    /// Retrieves a metadata value by key
340    ///
341    /// # Arguments
342    ///
343    /// * `key` - Metadata key to look up
344    ///
345    /// # Returns
346    ///
347    /// * `Some(&String)` - Value if key exists
348    /// * `None` - If key not found
349    pub fn get_metadata(&self, key: &str) -> Option<&String> {
350        self.metadata.get(key)
351    }
352
353    /// Records the result of a processing stage
354    ///
355    /// # Arguments
356    ///
357    /// * `stage_name` - Name of the stage
358    /// * `result` - Processing result or status
359    ///
360    /// # Side Effects
361    ///
362    /// Updates the `updated_at` timestamp
363    pub fn add_stage_result(&mut self, stage_name: String, result: String) {
364        self.stage_results.insert(stage_name, result);
365        self.updated_at = chrono::Utc::now();
366    }
367
368    /// Updates the security context
369    ///
370    /// # Arguments
371    ///
372    /// * `security_context` - New security context
373    ///
374    /// # Side Effects
375    ///
376    /// Updates the `updated_at` timestamp
377    pub fn update_security_context(&mut self, security_context: SecurityContext) {
378        self.security_context = security_context;
379        self.updated_at = chrono::Utc::now();
380    }
381
382    /// Calculates processing progress as a percentage
383    ///
384    /// # Returns
385    ///
386    /// Progress as a percentage (0.0 to 100.0)
387    ///
388    /// # Examples
389    pub fn progress_percentage(&self) -> f64 {
390        if self.file_size == 0 {
391            return 0.0;
392        }
393        ((self.processed_bytes as f64) / (self.file_size as f64)) * 100.0
394    }
395
396    /// Checks if processing is complete
397    ///
398    /// # Returns
399    ///
400    /// `true` if all bytes have been processed, `false` otherwise
401    pub fn is_complete(&self) -> bool {
402        self.processed_bytes >= self.file_size
403    }
404
405    /// Gets the timestamp when this context was created
406    ///
407    /// # Returns
408    ///
409    /// UTC creation timestamp
410    pub fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
411        self.created_at
412    }
413
414    /// Gets the timestamp of the last update to this context
415    ///
416    /// # Returns
417    ///
418    /// UTC timestamp of last modification
419    pub fn updated_at(&self) -> chrono::DateTime<chrono::Utc> {
420        self.updated_at
421    }
422}