// adaptive_pipeline_domain/aggregates/pipeline_aggregate.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! Example UUID conversion:
use crate::entities::pipeline::pipeline_id_to_uuid;
use crate::events::{
    PipelineCreatedEvent, PipelineUpdatedEvent, ProcessingCompletedEvent, ProcessingFailedEvent, ProcessingStartedEvent,
};
use crate::{Pipeline, PipelineError, PipelineEvent, ProcessingContext, ProcessingMetrics, SecurityContext};
use std::collections::HashMap;
use uuid::Uuid;
///
/// ### Managing Processing Operations
///
///
/// ### Error Handling and Recovery
///
///
/// ### Pipeline Configuration Updates
///
///
/// ## Event Management
///
/// The aggregate generates domain events for all significant state changes:
///
/// - `PipelineCreated`: When a new pipeline is created
/// - `PipelineUpdated`: When pipeline configuration changes
/// - `ProcessingStarted`: When file processing begins
/// - `ProcessingCompleted`: When processing finishes successfully
/// - `ProcessingFailed`: When processing encounters errors
///
/// ## Concurrency and Thread Safety
///
/// The aggregate is designed for single-threaded access within a transaction
/// boundary. Concurrent access should be managed through:
///
/// - Repository-level locking mechanisms
/// - Optimistic concurrency control using version numbers
/// - Event store transaction boundaries
/// - Application-level coordination
///
/// ## Performance Considerations
///
/// - **Memory Usage**: Scales with number of active processing contexts
/// - **Event Storage**: Uncommitted events held in memory until persistence
/// - **Validation Overhead**: All operations include business rule validation
/// - **Version Tracking**: Minimal overhead for optimistic concurrency control
///
/// ## Error Recovery
///
/// The aggregate provides robust error handling:
///
/// - **Validation Errors**: Prevent invalid state transitions
/// - **Processing Failures**: Tracked with detailed error information
/// - **Event Application**: Supports replay for crash recovery
/// - **State Consistency**: Maintains valid state even during failures
#[derive(Debug, Clone)]
pub struct PipelineAggregate {
    // The pipeline entity guarded by this aggregate; invariants are
    // enforced through `Pipeline::validate`.
    pipeline: Pipeline,
    // Monotonically increasing version for optimistic concurrency control;
    // bumped once per raised or applied event (see `add_event`/`apply_event`).
    version: u64,
    // Domain events raised since the last persistence; cleared by
    // `mark_events_as_committed`.
    uncommitted_events: Vec<PipelineEvent>,
    // In-flight processing operations keyed by processing id; entries are
    // removed when processing completes or fails.
    active_processing_contexts: HashMap<Uuid, ProcessingContext>,
}
69
70impl PipelineAggregate {
71    /// Creates a new pipeline aggregate
72    pub fn new(pipeline: Pipeline) -> Result<Self, PipelineError> {
73        pipeline.validate()?;
74
75        let mut aggregate = Self {
76            pipeline: pipeline.clone(),
77            version: 1,
78            uncommitted_events: Vec::new(),
79            active_processing_contexts: HashMap::new(),
80        };
81
82        // Raise pipeline created event
83        let event = PipelineCreatedEvent::new(
84            pipeline_id_to_uuid(pipeline.id()),
85            pipeline.name().to_string(),
86            pipeline.stages().len(),
87            None, // TODO: Get from security context
88        );
89        aggregate.add_event(PipelineEvent::PipelineCreated(event));
90
91        Ok(aggregate)
92    }
93
94    /// Loads aggregate from events (event sourcing)
95    pub fn from_events(events: Vec<PipelineEvent>) -> Result<Self, PipelineError> {
96        if events.is_empty() {
97            return Err(PipelineError::InvalidConfiguration("No events provided".to_string()));
98        }
99
100        // Find the first PipelineCreated event to initialize the aggregate
101        let created_event = events
102            .iter()
103            .find_map(|e| match e {
104                PipelineEvent::PipelineCreated(event) => Some(event),
105                _ => None,
106            })
107            .ok_or_else(|| PipelineError::InvalidConfiguration("No PipelineCreated event found".to_string()))?;
108
109        // Create initial pipeline (this would normally be reconstructed from events)
110        let pipeline = Pipeline::new(
111            created_event.pipeline_name.clone(),
112            Vec::new(), // Stages would be reconstructed from events
113        )?;
114
115        let mut aggregate = Self {
116            pipeline,
117            version: 0,
118            uncommitted_events: Vec::new(),
119            active_processing_contexts: HashMap::new(),
120        };
121
122        // Apply all events to reconstruct state
123        for event in events {
124            aggregate.apply_event(&event)?;
125        }
126
127        Ok(aggregate)
128    }
129
130    /// Gets the pipeline
131    pub fn pipeline(&self) -> &Pipeline {
132        &self.pipeline
133    }
134
135    /// Gets the aggregate version
136    pub fn version(&self) -> u64 {
137        self.version
138    }
139
140    /// Gets uncommitted events
141    pub fn uncommitted_events(&self) -> &[PipelineEvent] {
142        &self.uncommitted_events
143    }
144
145    /// Marks events as committed
146    pub fn mark_events_as_committed(&mut self) {
147        self.uncommitted_events.clear();
148    }
149
150    /// Updates the pipeline configuration
151    pub fn update_pipeline(&mut self, updated_pipeline: Pipeline) -> Result<(), PipelineError> {
152        updated_pipeline.validate()?;
153
154        // Track changes
155        let mut changes = Vec::new();
156        if self.pipeline.name() != updated_pipeline.name() {
157            changes.push(format!(
158                "Name changed from '{}' to '{}'",
159                self.pipeline.name(),
160                updated_pipeline.name()
161            ));
162        }
163        if self.pipeline.stages().len() != updated_pipeline.stages().len() {
164            changes.push(format!(
165                "Stage count changed from {} to {}",
166                self.pipeline.stages().len(),
167                updated_pipeline.stages().len()
168            ));
169        }
170
171        self.pipeline = updated_pipeline;
172
173        // Raise pipeline updated event
174        let event = PipelineUpdatedEvent {
175            event_id: Uuid::new_v4(),
176            pipeline_id: pipeline_id_to_uuid(self.pipeline.id()),
177            changes,
178            updated_by: None, // TODO: Get from security context
179            occurred_at: chrono::Utc::now(),
180            version: self.version + 1,
181        };
182        self.add_event(PipelineEvent::PipelineUpdated(event));
183
184        Ok(())
185    }
186
187    /// Starts processing a file
188    pub fn start_processing(
189        &mut self,
190        input_path: String,
191        output_path: String,
192        file_size: u64,
193        security_context: SecurityContext,
194    ) -> Result<Uuid, PipelineError> {
195        // Validate security context
196        security_context.validate()?;
197
198        // Create processing context
199        let processing_id = Uuid::new_v4();
200        let context = ProcessingContext::new(
201            file_size,
202            security_context.clone(),
203        );
204
205        self.active_processing_contexts.insert(processing_id, context);
206
207        // Raise processing started event
208        let event = ProcessingStartedEvent::new(
209            pipeline_id_to_uuid(self.pipeline.id()),
210            processing_id,
211            input_path,
212            output_path,
213            file_size,
214            security_context,
215        );
216        self.add_event(PipelineEvent::ProcessingStarted(event));
217
218        Ok(processing_id)
219    }
220
221    /// Completes processing
222    pub fn complete_processing(
223        &mut self,
224        processing_id: Uuid,
225        metrics: ProcessingMetrics,
226        output_size: u64,
227    ) -> Result<(), PipelineError> {
228        if !self.active_processing_contexts.contains_key(&processing_id) {
229            return Err(PipelineError::InvalidConfiguration(
230                "Processing context not found".to_string(),
231            ));
232        }
233
234        // Remove from active contexts
235        self.active_processing_contexts.remove(&processing_id);
236
237        // Raise processing completed event
238        let event = ProcessingCompletedEvent::new(
239            pipeline_id_to_uuid(self.pipeline.id()),
240            processing_id,
241            metrics,
242            output_size,
243        );
244        self.add_event(PipelineEvent::ProcessingCompleted(event));
245
246        Ok(())
247    }
248
249    /// Fails processing
250    pub fn fail_processing(
251        &mut self,
252        processing_id: Uuid,
253        error_message: String,
254        error_code: String,
255        stage_name: Option<String>,
256        partial_metrics: Option<ProcessingMetrics>,
257    ) -> Result<(), PipelineError> {
258        if !self.active_processing_contexts.contains_key(&processing_id) {
259            return Err(PipelineError::InvalidConfiguration(
260                "Processing context not found".to_string(),
261            ));
262        }
263
264        // Remove from active contexts
265        self.active_processing_contexts.remove(&processing_id);
266
267        // Raise processing failed event
268        let event = ProcessingFailedEvent {
269            event_id: Uuid::new_v4(),
270            pipeline_id: pipeline_id_to_uuid(self.pipeline.id()),
271            processing_id,
272            error_message,
273            error_code,
274            stage_name,
275            partial_metrics,
276            occurred_at: chrono::Utc::now(),
277            version: self.version + 1,
278        };
279        self.add_event(PipelineEvent::ProcessingFailed(event));
280
281        Ok(())
282    }
283
284    /// Gets active processing contexts
285    pub fn active_processing_contexts(&self) -> &HashMap<Uuid, ProcessingContext> {
286        &self.active_processing_contexts
287    }
288
289    /// Gets a specific processing context
290    pub fn get_processing_context(&self, processing_id: Uuid) -> Option<&ProcessingContext> {
291        self.active_processing_contexts.get(&processing_id)
292    }
293
294    /// Updates a processing context
295    pub fn update_processing_context(
296        &mut self,
297        processing_id: Uuid,
298        context: ProcessingContext,
299    ) -> Result<(), PipelineError> {
300        if !self.active_processing_contexts.contains_key(&processing_id) {
301            return Err(PipelineError::InvalidConfiguration(
302                "Processing context not found".to_string(),
303            ));
304        }
305
306        self.active_processing_contexts.insert(processing_id, context);
307        Ok(())
308    }
309
310    /// Validates the aggregate state
311    pub fn validate(&self) -> Result<(), PipelineError> {
312        self.pipeline.validate()?;
313
314        // Validate all active processing contexts
315        for context in self.active_processing_contexts.values() {
316            context.security_context().validate()?;
317        }
318
319        Ok(())
320    }
321
322    /// Adds an event to uncommitted events
323    fn add_event(&mut self, event: PipelineEvent) {
324        self.version += 1;
325        self.uncommitted_events.push(event);
326    }
327
328    /// Applies an event to the aggregate state
329    fn apply_event(&mut self, event: &PipelineEvent) -> Result<(), PipelineError> {
330        match event {
331            PipelineEvent::PipelineCreated(_) => {
332                // Pipeline already created in constructor
333                self.version += 1;
334            }
335            PipelineEvent::PipelineUpdated(_) => {
336                // Pipeline updates would be applied here
337                self.version += 1;
338            }
339            PipelineEvent::ProcessingStarted(event) => {
340                let context = ProcessingContext::new(
341                    event.file_size,
342                    event.security_context.clone(),
343                );
344                self.active_processing_contexts.insert(event.processing_id, context);
345                self.version += 1;
346            }
347            PipelineEvent::ProcessingCompleted(event) => {
348                self.active_processing_contexts.remove(&event.processing_id);
349                self.version += 1;
350            }
351            PipelineEvent::ProcessingFailed(event) => {
352                self.active_processing_contexts.remove(&event.processing_id);
353                self.version += 1;
354            }
355            _ => {
356                // Handle other events as needed
357                self.version += 1;
358            }
359        }
360
361        Ok(())
362    }
363
364    /// Gets the aggregate ID
365    pub fn id(&self) -> Uuid {
366        pipeline_id_to_uuid(self.pipeline.id())
367    }
368
369    /// Checks if the aggregate has uncommitted events
370    pub fn has_uncommitted_events(&self) -> bool {
371        !self.uncommitted_events.is_empty()
372    }
373
374    /// Gets the number of active processing contexts
375    pub fn active_processing_count(&self) -> usize {
376        self.active_processing_contexts.len()
377    }
378
379    /// Checks if processing is active
380    pub fn is_processing_active(&self) -> bool {
381        !self.active_processing_contexts.is_empty()
382    }
383}