adaptive_pipeline_domain/aggregates/
pipeline_aggregate.rs1use crate::entities::pipeline::pipeline_id_to_uuid;
11use crate::events::{
12 PipelineCreatedEvent, PipelineUpdatedEvent, ProcessingCompletedEvent, ProcessingFailedEvent, ProcessingStartedEvent,
13};
14use crate::{Pipeline, PipelineError, PipelineEvent, ProcessingContext, ProcessingMetrics, SecurityContext};
15use std::collections::HashMap;
16use uuid::Uuid;
17#[derive(Debug, Clone)]
63pub struct PipelineAggregate {
64 pipeline: Pipeline,
65 version: u64,
66 uncommitted_events: Vec<PipelineEvent>,
67 active_processing_contexts: HashMap<Uuid, ProcessingContext>,
68}
69
70impl PipelineAggregate {
71 pub fn new(pipeline: Pipeline) -> Result<Self, PipelineError> {
73 pipeline.validate()?;
74
75 let mut aggregate = Self {
76 pipeline: pipeline.clone(),
77 version: 1,
78 uncommitted_events: Vec::new(),
79 active_processing_contexts: HashMap::new(),
80 };
81
82 let event = PipelineCreatedEvent::new(
84 pipeline_id_to_uuid(pipeline.id()),
85 pipeline.name().to_string(),
86 pipeline.stages().len(),
87 None, );
89 aggregate.add_event(PipelineEvent::PipelineCreated(event));
90
91 Ok(aggregate)
92 }
93
94 pub fn from_events(events: Vec<PipelineEvent>) -> Result<Self, PipelineError> {
96 if events.is_empty() {
97 return Err(PipelineError::InvalidConfiguration("No events provided".to_string()));
98 }
99
100 let created_event = events
102 .iter()
103 .find_map(|e| match e {
104 PipelineEvent::PipelineCreated(event) => Some(event),
105 _ => None,
106 })
107 .ok_or_else(|| PipelineError::InvalidConfiguration("No PipelineCreated event found".to_string()))?;
108
109 let pipeline = Pipeline::new(
111 created_event.pipeline_name.clone(),
112 Vec::new(), )?;
114
115 let mut aggregate = Self {
116 pipeline,
117 version: 0,
118 uncommitted_events: Vec::new(),
119 active_processing_contexts: HashMap::new(),
120 };
121
122 for event in events {
124 aggregate.apply_event(&event)?;
125 }
126
127 Ok(aggregate)
128 }
129
130 pub fn pipeline(&self) -> &Pipeline {
132 &self.pipeline
133 }
134
135 pub fn version(&self) -> u64 {
137 self.version
138 }
139
140 pub fn uncommitted_events(&self) -> &[PipelineEvent] {
142 &self.uncommitted_events
143 }
144
145 pub fn mark_events_as_committed(&mut self) {
147 self.uncommitted_events.clear();
148 }
149
150 pub fn update_pipeline(&mut self, updated_pipeline: Pipeline) -> Result<(), PipelineError> {
152 updated_pipeline.validate()?;
153
154 let mut changes = Vec::new();
156 if self.pipeline.name() != updated_pipeline.name() {
157 changes.push(format!(
158 "Name changed from '{}' to '{}'",
159 self.pipeline.name(),
160 updated_pipeline.name()
161 ));
162 }
163 if self.pipeline.stages().len() != updated_pipeline.stages().len() {
164 changes.push(format!(
165 "Stage count changed from {} to {}",
166 self.pipeline.stages().len(),
167 updated_pipeline.stages().len()
168 ));
169 }
170
171 self.pipeline = updated_pipeline;
172
173 let event = PipelineUpdatedEvent {
175 event_id: Uuid::new_v4(),
176 pipeline_id: pipeline_id_to_uuid(self.pipeline.id()),
177 changes,
178 updated_by: None, occurred_at: chrono::Utc::now(),
180 version: self.version + 1,
181 };
182 self.add_event(PipelineEvent::PipelineUpdated(event));
183
184 Ok(())
185 }
186
187 pub fn start_processing(
189 &mut self,
190 input_path: String,
191 output_path: String,
192 file_size: u64,
193 security_context: SecurityContext,
194 ) -> Result<Uuid, PipelineError> {
195 security_context.validate()?;
197
198 let processing_id = Uuid::new_v4();
200 let context = ProcessingContext::new(
201 file_size,
202 security_context.clone(),
203 );
204
205 self.active_processing_contexts.insert(processing_id, context);
206
207 let event = ProcessingStartedEvent::new(
209 pipeline_id_to_uuid(self.pipeline.id()),
210 processing_id,
211 input_path,
212 output_path,
213 file_size,
214 security_context,
215 );
216 self.add_event(PipelineEvent::ProcessingStarted(event));
217
218 Ok(processing_id)
219 }
220
221 pub fn complete_processing(
223 &mut self,
224 processing_id: Uuid,
225 metrics: ProcessingMetrics,
226 output_size: u64,
227 ) -> Result<(), PipelineError> {
228 if !self.active_processing_contexts.contains_key(&processing_id) {
229 return Err(PipelineError::InvalidConfiguration(
230 "Processing context not found".to_string(),
231 ));
232 }
233
234 self.active_processing_contexts.remove(&processing_id);
236
237 let event = ProcessingCompletedEvent::new(
239 pipeline_id_to_uuid(self.pipeline.id()),
240 processing_id,
241 metrics,
242 output_size,
243 );
244 self.add_event(PipelineEvent::ProcessingCompleted(event));
245
246 Ok(())
247 }
248
249 pub fn fail_processing(
251 &mut self,
252 processing_id: Uuid,
253 error_message: String,
254 error_code: String,
255 stage_name: Option<String>,
256 partial_metrics: Option<ProcessingMetrics>,
257 ) -> Result<(), PipelineError> {
258 if !self.active_processing_contexts.contains_key(&processing_id) {
259 return Err(PipelineError::InvalidConfiguration(
260 "Processing context not found".to_string(),
261 ));
262 }
263
264 self.active_processing_contexts.remove(&processing_id);
266
267 let event = ProcessingFailedEvent {
269 event_id: Uuid::new_v4(),
270 pipeline_id: pipeline_id_to_uuid(self.pipeline.id()),
271 processing_id,
272 error_message,
273 error_code,
274 stage_name,
275 partial_metrics,
276 occurred_at: chrono::Utc::now(),
277 version: self.version + 1,
278 };
279 self.add_event(PipelineEvent::ProcessingFailed(event));
280
281 Ok(())
282 }
283
284 pub fn active_processing_contexts(&self) -> &HashMap<Uuid, ProcessingContext> {
286 &self.active_processing_contexts
287 }
288
289 pub fn get_processing_context(&self, processing_id: Uuid) -> Option<&ProcessingContext> {
291 self.active_processing_contexts.get(&processing_id)
292 }
293
294 pub fn update_processing_context(
296 &mut self,
297 processing_id: Uuid,
298 context: ProcessingContext,
299 ) -> Result<(), PipelineError> {
300 if !self.active_processing_contexts.contains_key(&processing_id) {
301 return Err(PipelineError::InvalidConfiguration(
302 "Processing context not found".to_string(),
303 ));
304 }
305
306 self.active_processing_contexts.insert(processing_id, context);
307 Ok(())
308 }
309
310 pub fn validate(&self) -> Result<(), PipelineError> {
312 self.pipeline.validate()?;
313
314 for context in self.active_processing_contexts.values() {
316 context.security_context().validate()?;
317 }
318
319 Ok(())
320 }
321
322 fn add_event(&mut self, event: PipelineEvent) {
324 self.version += 1;
325 self.uncommitted_events.push(event);
326 }
327
328 fn apply_event(&mut self, event: &PipelineEvent) -> Result<(), PipelineError> {
330 match event {
331 PipelineEvent::PipelineCreated(_) => {
332 self.version += 1;
334 }
335 PipelineEvent::PipelineUpdated(_) => {
336 self.version += 1;
338 }
339 PipelineEvent::ProcessingStarted(event) => {
340 let context = ProcessingContext::new(
341 event.file_size,
342 event.security_context.clone(),
343 );
344 self.active_processing_contexts.insert(event.processing_id, context);
345 self.version += 1;
346 }
347 PipelineEvent::ProcessingCompleted(event) => {
348 self.active_processing_contexts.remove(&event.processing_id);
349 self.version += 1;
350 }
351 PipelineEvent::ProcessingFailed(event) => {
352 self.active_processing_contexts.remove(&event.processing_id);
353 self.version += 1;
354 }
355 _ => {
356 self.version += 1;
358 }
359 }
360
361 Ok(())
362 }
363
364 pub fn id(&self) -> Uuid {
366 pipeline_id_to_uuid(self.pipeline.id())
367 }
368
369 pub fn has_uncommitted_events(&self) -> bool {
371 !self.uncommitted_events.is_empty()
372 }
373
374 pub fn active_processing_count(&self) -> usize {
376 self.active_processing_contexts.len()
377 }
378
379 pub fn is_processing_active(&self) -> bool {
381 !self.active_processing_contexts.is_empty()
382 }
383}