1use crate::services::datetime_serde;
129use crate::{ProcessingMetrics, SecurityContext};
130use serde::{Deserialize, Serialize};
131use uuid::Uuid;
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
139pub enum PipelineEvent {
140 PipelineCreated(PipelineCreatedEvent),
141 PipelineUpdated(PipelineUpdatedEvent),
142 PipelineDeleted(PipelineDeletedEvent),
143 ProcessingStarted(ProcessingStartedEvent),
144 ProcessingCompleted(ProcessingCompletedEvent),
145 ProcessingFailed(ProcessingFailedEvent),
146 ProcessingPaused(ProcessingPausedEvent),
147 ProcessingResumed(ProcessingResumedEvent),
148 ProcessingCancelled(ProcessingCancelledEvent),
149 StageStarted(StageStartedEvent),
150 StageCompleted(StageCompletedEvent),
151 StageFailed(StageFailedEvent),
152 ChunkProcessed(ChunkProcessedEvent),
153 MetricsUpdated(MetricsUpdatedEvent),
154 SecurityViolation(SecurityViolationEvent),
155 ResourceExhausted(ResourceExhaustedEvent),
156}
157
158pub trait DomainEvent {
160 fn event_id(&self) -> Uuid;
161 fn aggregate_id(&self) -> Uuid;
162 fn event_type(&self) -> &'static str;
163 fn occurred_at(&self) -> chrono::DateTime<chrono::Utc>;
164 fn version(&self) -> u64;
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct PipelineCreatedEvent {
170 pub event_id: Uuid,
171 pub pipeline_id: Uuid,
172 pub pipeline_name: String,
173 pub stage_count: usize,
174 pub created_by: Option<String>,
175 #[serde(with = "datetime_serde")]
176 pub occurred_at: chrono::DateTime<chrono::Utc>,
177 pub version: u64,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct PipelineUpdatedEvent {
183 pub event_id: Uuid,
184 pub pipeline_id: Uuid,
185 pub changes: Vec<String>,
186 pub updated_by: Option<String>,
187 #[serde(with = "datetime_serde")]
188 pub occurred_at: chrono::DateTime<chrono::Utc>,
189 pub version: u64,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct PipelineDeletedEvent {
195 pub event_id: Uuid,
196 pub pipeline_id: Uuid,
197 pub deleted_by: Option<String>,
198 #[serde(with = "datetime_serde")]
199 pub occurred_at: chrono::DateTime<chrono::Utc>,
200 pub version: u64,
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct ProcessingStartedEvent {
206 pub event_id: Uuid,
207 pub pipeline_id: Uuid,
208 pub processing_id: Uuid,
209 pub input_path: String,
210 pub output_path: String,
211 pub file_size: u64,
212 pub security_context: SecurityContext,
213 #[serde(with = "datetime_serde")]
214 pub occurred_at: chrono::DateTime<chrono::Utc>,
215 pub version: u64,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct ProcessingCompletedEvent {
221 pub event_id: Uuid,
222 pub pipeline_id: Uuid,
223 pub processing_id: Uuid,
224 pub metrics: ProcessingMetrics,
225 pub output_size: u64,
226 #[serde(with = "datetime_serde")]
227 pub occurred_at: chrono::DateTime<chrono::Utc>,
228 pub version: u64,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct ProcessingFailedEvent {
234 pub event_id: Uuid,
235 pub pipeline_id: Uuid,
236 pub processing_id: Uuid,
237 pub error_message: String,
238 pub error_code: String,
239 pub stage_name: Option<String>,
240 pub partial_metrics: Option<ProcessingMetrics>,
241 #[serde(with = "datetime_serde")]
242 pub occurred_at: chrono::DateTime<chrono::Utc>,
243 pub version: u64,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ProcessingPausedEvent {
249 pub event_id: Uuid,
250 pub pipeline_id: Uuid,
251 pub processing_id: Uuid,
252 pub reason: String,
253 pub checkpoint_data: Option<Vec<u8>>,
254 #[serde(with = "datetime_serde")]
255 pub occurred_at: chrono::DateTime<chrono::Utc>,
256 pub version: u64,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct ProcessingResumedEvent {
262 pub event_id: Uuid,
263 pub pipeline_id: Uuid,
264 pub processing_id: Uuid,
265 pub resumed_from_checkpoint: bool,
266 #[serde(with = "datetime_serde")]
267 pub occurred_at: chrono::DateTime<chrono::Utc>,
268 pub version: u64,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct ProcessingCancelledEvent {
274 pub event_id: Uuid,
275 pub pipeline_id: Uuid,
276 pub processing_id: Uuid,
277 pub reason: String,
278 pub cancelled_by: Option<String>,
279 #[serde(with = "datetime_serde")]
280 pub occurred_at: chrono::DateTime<chrono::Utc>,
281 pub version: u64,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
286pub struct StageStartedEvent {
287 pub event_id: Uuid,
288 pub pipeline_id: Uuid,
289 pub processing_id: Uuid,
290 pub stage_id: Uuid,
291 pub stage_name: String,
292 pub stage_type: String,
293 #[serde(with = "datetime_serde")]
294 pub occurred_at: chrono::DateTime<chrono::Utc>,
295 pub version: u64,
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct StageCompletedEvent {
301 pub event_id: Uuid,
302 pub pipeline_id: Uuid,
303 pub processing_id: Uuid,
304 pub stage_id: Uuid,
305 pub stage_name: String,
306 pub processing_time_ms: u64,
307 pub bytes_processed: u64,
308 #[serde(with = "datetime_serde")]
309 pub occurred_at: chrono::DateTime<chrono::Utc>,
310 pub version: u64,
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct StageFailedEvent {
316 pub event_id: Uuid,
317 pub pipeline_id: Uuid,
318 pub processing_id: Uuid,
319 pub stage_id: Uuid,
320 pub stage_name: String,
321 pub error_message: String,
322 pub error_code: String,
323 #[serde(with = "datetime_serde")]
324 pub occurred_at: chrono::DateTime<chrono::Utc>,
325 pub version: u64,
326}
327
328#[derive(Debug, Clone, Serialize, Deserialize)]
330pub struct ChunkProcessedEvent {
331 pub event_id: Uuid,
332 pub pipeline_id: Uuid,
333 pub processing_id: Uuid,
334 pub chunk_id: Uuid,
335 pub chunk_sequence: u64,
336 pub chunk_size: usize,
337 pub stage_name: String,
338 pub processing_time_ms: u64,
339 #[serde(with = "datetime_serde")]
340 pub occurred_at: chrono::DateTime<chrono::Utc>,
341 pub version: u64,
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize)]
346pub struct MetricsUpdatedEvent {
347 pub event_id: Uuid,
348 pub pipeline_id: Uuid,
349 pub processing_id: Uuid,
350 pub metrics: ProcessingMetrics,
351 #[serde(with = "datetime_serde")]
352 pub occurred_at: chrono::DateTime<chrono::Utc>,
353 pub version: u64,
354}
355
356#[derive(Debug, Clone, Serialize, Deserialize)]
358pub struct SecurityViolationEvent {
359 pub event_id: Uuid,
360 pub pipeline_id: Uuid,
361 pub processing_id: Option<Uuid>,
362 pub violation_type: String,
363 pub description: String,
364 pub severity: SecurityViolationSeverity,
365 pub user_id: Option<String>,
366 pub source_ip: Option<String>,
367 #[serde(with = "datetime_serde")]
368 pub occurred_at: chrono::DateTime<chrono::Utc>,
369 pub version: u64,
370}
371
372#[derive(Debug, Clone, Serialize, Deserialize)]
374pub struct ResourceExhaustedEvent {
375 pub event_id: Uuid,
376 pub pipeline_id: Uuid,
377 pub processing_id: Uuid,
378 pub resource_type: String,
379 pub current_usage: u64,
380 pub limit: u64,
381 pub action_taken: String,
382 #[serde(with = "datetime_serde")]
383 pub occurred_at: chrono::DateTime<chrono::Utc>,
384 pub version: u64,
385}
386
387#[derive(Debug, Clone, Serialize, Deserialize)]
389pub enum SecurityViolationSeverity {
390 Low,
391 Medium,
392 High,
393 Critical,
394}
395
396impl DomainEvent for PipelineCreatedEvent {
398 fn event_id(&self) -> Uuid {
399 self.event_id
400 }
401 fn aggregate_id(&self) -> Uuid {
402 self.pipeline_id
403 }
404 fn event_type(&self) -> &'static str {
405 "PipelineCreated"
406 }
407 fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
408 self.occurred_at
409 }
410 fn version(&self) -> u64 {
411 self.version
412 }
413}
414
415impl DomainEvent for ProcessingStartedEvent {
416 fn event_id(&self) -> Uuid {
417 self.event_id
418 }
419 fn aggregate_id(&self) -> Uuid {
420 self.pipeline_id
421 }
422 fn event_type(&self) -> &'static str {
423 "ProcessingStarted"
424 }
425 fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
426 self.occurred_at
427 }
428 fn version(&self) -> u64 {
429 self.version
430 }
431}
432
433impl DomainEvent for ProcessingCompletedEvent {
434 fn event_id(&self) -> Uuid {
435 self.event_id
436 }
437 fn aggregate_id(&self) -> Uuid {
438 self.pipeline_id
439 }
440 fn event_type(&self) -> &'static str {
441 "ProcessingCompleted"
442 }
443 fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
444 self.occurred_at
445 }
446 fn version(&self) -> u64 {
447 self.version
448 }
449}
450
451impl PipelineCreatedEvent {
453 pub fn new(pipeline_id: Uuid, pipeline_name: String, stage_count: usize, created_by: Option<String>) -> Self {
454 Self {
455 event_id: Uuid::new_v4(),
456 pipeline_id,
457 pipeline_name,
458 stage_count,
459 created_by,
460 occurred_at: chrono::Utc::now(),
461 version: 1,
462 }
463 }
464}
465
466impl ProcessingStartedEvent {
467 pub fn new(
468 pipeline_id: Uuid,
469 processing_id: Uuid,
470 input_path: String,
471 output_path: String,
472 file_size: u64,
473 security_context: SecurityContext,
474 ) -> Self {
475 Self {
476 event_id: Uuid::new_v4(),
477 pipeline_id,
478 processing_id,
479 input_path,
480 output_path,
481 file_size,
482 security_context,
483 occurred_at: chrono::Utc::now(),
484 version: 1,
485 }
486 }
487}
488
489impl ProcessingCompletedEvent {
490 pub fn new(pipeline_id: Uuid, processing_id: Uuid, metrics: ProcessingMetrics, output_size: u64) -> Self {
491 Self {
492 event_id: Uuid::new_v4(),
493 pipeline_id,
494 processing_id,
495 metrics,
496 output_size,
497 occurred_at: chrono::Utc::now(),
498 version: 1,
499 }
500 }
501}
502
503impl SecurityViolationEvent {
504 pub fn new(
505 pipeline_id: Uuid,
506 violation_type: String,
507 description: String,
508 severity: SecurityViolationSeverity,
509 ) -> Self {
510 Self {
511 event_id: Uuid::new_v4(),
512 pipeline_id,
513 processing_id: None,
514 violation_type,
515 description,
516 severity,
517 user_id: None,
518 source_ip: None,
519 occurred_at: chrono::Utc::now(),
520 version: 1,
521 }
522 }
523}