adaptive_pipeline_domain/entities/processing_context.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Processing Context Entity
9//!
10//! The `ProcessingContext` entity maintains runtime state and context
11//! information throughout pipeline execution. It serves as a central repository
12//! for tracking processing progress, configuration parameters, and execution
13//! metadata.
14//!
15//! ## Overview
16//!
17//! The processing context acts as a stateful carrier object that:
18//!
19//! - **Tracks Progress**: Monitors bytes processed and completion status
20//! - **Manages Configuration**: Maintains processing parameters and settings
21//! - **Collects Metrics**: Aggregates performance and operational data
22//! - **Stores Metadata**: Preserves stage-specific results and information
23//! - **Enforces Security**: Maintains security context throughout processing
24//!
25//! ## Entity Characteristics
26//!
27//! - **Mutable State**: Tracks changing values during processing
28//! - **Unique Identity**: Each context has a distinct `ProcessingContextId`
//! - **Thread Safety**: No internal synchronization; mutators take `&mut self`,
//!   so sharing a context across threads requires external coordination
30//! - **Serializable**: Can be persisted and restored for long-running
31//! operations
32//!
33//! ## State Management
34//!
35//! The context maintains several categories of state:
36//!
37//! ### Chunk Processing State
38//! - Total file size and bytes processed (for progress tracking)
39//! - Progress calculation and completion status
40//!
41//! ### Configuration State
42//! - Chunk size for processing operations
43//! - Worker count for parallel processing
44//! - Security context and permissions
45//!
46//! ### Runtime State
47//! - Processing metrics and performance data
48//! - Stage-specific results and outputs
49//! - Custom metadata and annotations
50
51use crate::services::datetime_serde;
52use crate::value_objects::{ChunkSize, ProcessingContextId, WorkerCount};
53use crate::{ProcessingMetrics, SecurityContext};
54use serde::{Deserialize, Serialize};
55use std::collections::HashMap;
56
57/// Processing context entity that maintains runtime state during pipeline
58/// execution.
59///
60/// The `ProcessingContext` serves as a central state container that travels
61/// through the pipeline, collecting information and tracking progress as each
62/// stage processes the data. It provides a unified interface for accessing and
63/// updating processing state across all pipeline stages.
64///
65/// ## Entity Purpose
66///
67/// - **State Coordination**: Centralizes processing state across pipeline
68/// stages
69/// - **Progress Tracking**: Monitors processing progress and completion status
70/// - **Configuration Management**: Maintains processing parameters and settings
71/// - **Metrics Collection**: Aggregates performance and operational metrics
72/// - **Security Enforcement**: Preserves security context throughout processing
73///
74/// ## Usage Examples
75///
76/// ### Creating a Processing Context
77///
78///
79/// ### Tracking Processing Progress
80///
81///
82/// ### Managing Stage Results
83///
84///
85/// ### Adding Custom Metadata
86///
87///
88/// ### Updating Processing Metrics
89///
90///
91/// ## State Lifecycle
92///
93/// The processing context follows a predictable lifecycle:
94///
95/// ### 1. Initialization
96///
97/// ### 2. Processing Updates
98///
99/// ### 3. Completion
100///
101/// ## Thread Safety and Concurrency
102///
103/// While the context itself is not thread-safe, it's designed for safe
104/// concurrent patterns:
105///
106///
107/// ## Serialization and Persistence
108///
109/// The context supports serialization for checkpointing and recovery:
110///
111///
112/// ## Performance Considerations
113///
114/// - Context updates are lightweight and fast
115/// - Metadata and stage results use efficient HashMap storage
116/// - Progress calculations are performed on-demand
117/// - Timestamps are updated only when state changes
118/// - Memory usage scales with the amount of stored metadata
119///
120/// ## Error Handling
121///
122/// The context provides safe access to all state with appropriate defaults:
123///
124/// - Missing metadata returns `None` rather than panicking
125/// - Progress calculations handle edge cases (zero file size)
126/// - All numeric operations are checked for overflow
127/// - Timestamp operations are guaranteed to succeed
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct ProcessingContext {
130 // Identity fields (always first)
131 id: ProcessingContextId,
132
133 // Core business fields (alphabetical within group)
134 chunk_size: ChunkSize,
135 file_size: u64,
136 metadata: HashMap<String, String>,
137 metrics: ProcessingMetrics,
138 processed_bytes: u64,
139 security_context: SecurityContext,
140 stage_results: HashMap<String, String>,
141 worker_count: WorkerCount,
142
143 // Metadata fields (always last)
144 #[serde(with = "datetime_serde")]
145 created_at: chrono::DateTime<chrono::Utc>,
146 #[serde(with = "datetime_serde")]
147 updated_at: chrono::DateTime<chrono::Utc>,
148}
149
150impl ProcessingContext {
151 /// Creates a new processing context for pipeline execution
152 ///
153 /// Initializes a chunk-scoped context with default configuration values and
154 /// empty state. The context starts with zero processed bytes and will track
155 /// progress and metadata as the chunk flows through pipeline stages.
156 ///
157 /// # Design Note
158 ///
159 /// This context is chunk-scoped, not file-scoped. File paths are managed
160 /// by the pipeline worker (via `CpuWorkerContext`) using dependency injection.
161 /// This separation ensures the context focuses on chunk processing metadata
162 /// without coupling to file I/O concerns.
163 ///
164 /// # Arguments
165 ///
166 /// * `file_size` - Total size of the file being processed (for progress tracking)
167 /// * `security_context` - Security context for authorization and access control
168 ///
169 /// # Returns
170 ///
171 /// A new `ProcessingContext` with initialized state
172 ///
173 /// # Examples
174 pub fn new(file_size: u64, security_context: SecurityContext) -> Self {
175 let now = chrono::Utc::now();
176
177 ProcessingContext {
178 // Identity fields
179 id: ProcessingContextId::new(),
180
181 // Core business fields (alphabetical)
182 chunk_size: ChunkSize::from_mb(1).unwrap_or_else(|_| ChunkSize::default()),
183 file_size,
184 metadata: HashMap::new(),
185 metrics: ProcessingMetrics::default(),
186 processed_bytes: 0,
187 security_context,
188 stage_results: HashMap::new(),
189 worker_count: WorkerCount::new(4), // Default to 4 workers
190
191 // Metadata fields
192 created_at: now,
193 updated_at: now,
194 }
195 }
196
197 /// Gets the unique identifier for this processing context
198 ///
199 /// # Returns
200 ///
201 /// Reference to the context's unique identifier
202 pub fn id(&self) -> &ProcessingContextId {
203 &self.id
204 }
205
206 /// Gets the total size of the file being processed
207 ///
208 /// # Returns
209 ///
210 /// Total file size in bytes
211 pub fn file_size(&self) -> u64 {
212 self.file_size
213 }
214
215 /// Gets the number of bytes processed so far
216 ///
217 /// # Returns
218 ///
219 /// Number of bytes processed
220 pub fn processed_bytes(&self) -> u64 {
221 self.processed_bytes
222 }
223
224 /// Gets the security context for authorization and access control
225 ///
226 /// # Returns
227 ///
228 /// Reference to the security context
229 pub fn security_context(&self) -> &SecurityContext {
230 &self.security_context
231 }
232
233 /// Gets the current processing metrics
234 ///
235 /// # Returns
236 ///
237 /// Reference to the processing metrics
238 pub fn metrics(&self) -> &ProcessingMetrics {
239 &self.metrics
240 }
241
242 /// Gets the chunk size configuration for processing
243 ///
244 /// # Returns
245 ///
246 /// Reference to the chunk size configuration
247 pub fn chunk_size(&self) -> &ChunkSize {
248 &self.chunk_size
249 }
250
251 /// Gets the number of worker threads for parallel processing
252 ///
253 /// # Returns
254 ///
255 /// Reference to the worker count configuration
256 pub fn worker_count(&self) -> &WorkerCount {
257 &self.worker_count
258 }
259
260 /// Gets all custom metadata associated with this context
261 ///
262 /// # Returns
263 ///
264 /// Reference to the metadata HashMap
265 pub fn metadata(&self) -> &HashMap<String, String> {
266 &self.metadata
267 }
268
269 /// Gets all stage processing results
270 ///
271 /// # Returns
272 ///
273 /// Reference to the stage results HashMap
274 pub fn stage_results(&self) -> &HashMap<String, String> {
275 &self.stage_results
276 }
277
278 /// Sets the total number of bytes processed
279 ///
280 /// Replaces the current processed byte count with a new absolute value.
281 ///
282 /// # Arguments
283 ///
284 /// * `bytes` - New total byte count
285 ///
286 /// # Side Effects
287 ///
288 /// Updates the `updated_at` timestamp
289 pub fn update_processed_bytes(&mut self, bytes: u64) {
290 self.processed_bytes = bytes;
291 self.updated_at = chrono::Utc::now();
292 }
293
294 /// Increments the processed byte count
295 ///
296 /// Adds the specified number of bytes to the current processed total.
297 ///
298 /// # Arguments
299 ///
300 /// * `bytes` - Number of additional bytes to add
301 ///
302 /// # Side Effects
303 ///
304 /// Updates the `updated_at` timestamp
305 pub fn add_processed_bytes(&mut self, bytes: u64) {
306 self.processed_bytes += bytes;
307 self.updated_at = chrono::Utc::now();
308 }
309
310 /// Updates the processing metrics with new values
311 ///
312 /// # Arguments
313 ///
314 /// * `metrics` - New metrics to replace current metrics
315 ///
316 /// # Side Effects
317 ///
318 /// Updates the `updated_at` timestamp
319 pub fn update_metrics(&mut self, metrics: ProcessingMetrics) {
320 self.metrics = metrics;
321 self.updated_at = chrono::Utc::now();
322 }
323
324 /// Adds or updates a metadata key-value pair
325 ///
326 /// # Arguments
327 ///
328 /// * `key` - Metadata key
329 /// * `value` - Metadata value
330 ///
331 /// # Side Effects
332 ///
333 /// Updates the `updated_at` timestamp
334 pub fn add_metadata(&mut self, key: String, value: String) {
335 self.metadata.insert(key, value);
336 self.updated_at = chrono::Utc::now();
337 }
338
339 /// Retrieves a metadata value by key
340 ///
341 /// # Arguments
342 ///
343 /// * `key` - Metadata key to look up
344 ///
345 /// # Returns
346 ///
347 /// * `Some(&String)` - Value if key exists
348 /// * `None` - If key not found
349 pub fn get_metadata(&self, key: &str) -> Option<&String> {
350 self.metadata.get(key)
351 }
352
353 /// Records the result of a processing stage
354 ///
355 /// # Arguments
356 ///
357 /// * `stage_name` - Name of the stage
358 /// * `result` - Processing result or status
359 ///
360 /// # Side Effects
361 ///
362 /// Updates the `updated_at` timestamp
363 pub fn add_stage_result(&mut self, stage_name: String, result: String) {
364 self.stage_results.insert(stage_name, result);
365 self.updated_at = chrono::Utc::now();
366 }
367
368 /// Updates the security context
369 ///
370 /// # Arguments
371 ///
372 /// * `security_context` - New security context
373 ///
374 /// # Side Effects
375 ///
376 /// Updates the `updated_at` timestamp
377 pub fn update_security_context(&mut self, security_context: SecurityContext) {
378 self.security_context = security_context;
379 self.updated_at = chrono::Utc::now();
380 }
381
382 /// Calculates processing progress as a percentage
383 ///
384 /// # Returns
385 ///
386 /// Progress as a percentage (0.0 to 100.0)
387 ///
388 /// # Examples
389 pub fn progress_percentage(&self) -> f64 {
390 if self.file_size == 0 {
391 return 0.0;
392 }
393 ((self.processed_bytes as f64) / (self.file_size as f64)) * 100.0
394 }
395
396 /// Checks if processing is complete
397 ///
398 /// # Returns
399 ///
400 /// `true` if all bytes have been processed, `false` otherwise
401 pub fn is_complete(&self) -> bool {
402 self.processed_bytes >= self.file_size
403 }
404
405 /// Gets the timestamp when this context was created
406 ///
407 /// # Returns
408 ///
409 /// UTC creation timestamp
410 pub fn created_at(&self) -> chrono::DateTime<chrono::Utc> {
411 self.created_at
412 }
413
414 /// Gets the timestamp of the last update to this context
415 ///
416 /// # Returns
417 ///
418 /// UTC timestamp of last modification
419 pub fn updated_at(&self) -> chrono::DateTime<chrono::Utc> {
420 self.updated_at
421 }
422}