adaptive_pipeline_domain/events/pipeline_events.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Pipeline Domain Events
//!
//! This module defines domain events for the pipeline processing system,
//! implementing event-driven architecture patterns for decoupled communication
//! and event sourcing.
//!
//! ## Overview
//!
//! Pipeline events capture significant business occurrences within the pipeline
//! processing domain. These events enable:
//!
//! - **Event Sourcing**: Reconstruct aggregate state from event history
//! - **Integration**: Communicate with external systems and bounded contexts
//! - **Audit Trail**: Track all significant operations for compliance
//! - **Monitoring**: Real-time system observability and alerting
//! - **Workflow Coordination**: Trigger downstream processes and reactions
//!
//! ## Event Categories
//!
//! ### Pipeline Lifecycle Events
//! Events related to pipeline management:
//! - `PipelineCreated`: New pipeline definition created
//! - `PipelineUpdated`: Pipeline configuration modified
//! - `PipelineDeleted`: Pipeline removed from system
//!
//! ### Processing Events
//! Events during file processing operations:
//! - `ProcessingStarted`: File processing initiated
//! - `ProcessingCompleted`: Processing finished successfully
//! - `ProcessingFailed`: Processing encountered errors
//! - `ProcessingPaused`: Processing temporarily suspended
//! - `ProcessingResumed`: Processing continued from pause
//! - `ProcessingCancelled`: Processing terminated by user
//!
//! ### Stage Events
//! Events for individual processing stages:
//! - `StageStarted`: Processing stage began execution
//! - `StageCompleted`: Stage finished successfully
//! - `StageFailed`: Stage encountered errors
//!
//! ### Operational Events
//! System and operational events:
//! - `ChunkProcessed`: Individual data chunk processed
//! - `MetricsUpdated`: Performance metrics updated
//! - `SecurityViolation`: Security policy violation detected
//! - `ResourceExhausted`: System resource limits reached
//!
//! ## Event Structure
//!
//! All events follow a consistent structure:
//!
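//! Each event carries a unique `event_id`, the `Uuid` of the owning pipeline
//! aggregate, an `occurred_at` timestamp, a schema `version`, and an
//! event-specific payload. Taking `PipelineCreatedEvent` (defined below) as a
//! representative example:
//!
//! ```rust,ignore
//! pub struct PipelineCreatedEvent {
//!     pub event_id: Uuid,                              // unique identity of this event
//!     pub pipeline_id: Uuid,                           // owning aggregate (pipeline)
//!     pub pipeline_name: String,                       // event-specific payload
//!     pub stage_count: usize,                          // event-specific payload
//!     pub created_by: Option<String>,                  // event-specific payload
//!     pub occurred_at: chrono::DateTime<chrono::Utc>,  // occurrence timestamp
//!     pub version: u64,                                // event schema version
//! }
//! ```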
//!
//! ## Usage Examples
//!
//! ### Creating Pipeline Events
//!
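//! A minimal sketch of constructing a lifecycle event with the factory
//! function defined in this module (the identifier and name are example
//! values):
//!
//! ```rust,ignore
//! use uuid::Uuid;
//!
//! let pipeline_id = Uuid::new_v4();
//! let event = PipelineCreatedEvent::new(
//!     pipeline_id,
//!     "document-processing".to_string(), // pipeline name (example value)
//!     4,                                 // number of configured stages
//!     Some("admin".to_string()),         // created_by
//! );
//!
//! assert_eq!(event.pipeline_id, pipeline_id);
//! assert_eq!(event.version, 1); // factory functions emit schema version 1
//! ```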
//!
//! ### Processing Events
//!
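//! A sketch of emitting start and completion events for one processing run.
//! How `SecurityContext` and `ProcessingMetrics` are constructed depends on
//! their own APIs; `Default::default()` is assumed here purely for
//! illustration:
//!
//! ```rust,ignore
//! let started = ProcessingStartedEvent::new(
//!     pipeline_id,
//!     processing_id,
//!     "/data/input.bin".to_string(),  // input path (example value)
//!     "/data/output.bin".to_string(), // output path (example value)
//!     1_048_576,                      // input file size in bytes
//!     SecurityContext::default(),     // assumed constructor, see note above
//! );
//!
//! let completed = ProcessingCompletedEvent::new(
//!     pipeline_id,
//!     processing_id,
//!     ProcessingMetrics::default(),   // assumed constructor, see note above
//!     524_288,                        // output size in bytes
//! );
//! ```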
//!
//! ### Security Events
//!
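//! A sketch of recording a security violation; the violation type and
//! description are example values, and the optional context fields start
//! empty:
//!
//! ```rust,ignore
//! let violation = SecurityViolationEvent::new(
//!     pipeline_id,
//!     "unauthorized_access".to_string(),                 // violation type (example value)
//!     "API key lacks processing permission".to_string(), // description (example value)
//!     SecurityViolationSeverity::High,
//! );
//!
//! // Optional context starts empty and can be filled in by the caller.
//! assert!(violation.user_id.is_none());
//! assert!(violation.source_ip.is_none());
//! ```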
//!
//! ## Event Sourcing Integration
//!
//! Events are designed for event sourcing patterns:
//!
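//! A sketch of appending events to a stream and reading common metadata
//! through the `DomainEvent` trait (a `Vec` stands in for a real event store
//! here):
//!
//! ```rust,ignore
//! let created = PipelineCreatedEvent::new(pipeline_id, "demo".to_string(), 2, None);
//!
//! // Common metadata is available through the DomainEvent trait.
//! assert_eq!(created.event_type(), "PipelineCreated");
//! assert_eq!(created.aggregate_id(), pipeline_id);
//!
//! // Wrap the concrete event in the PipelineEvent enum so a single stream
//! // can hold every kind of event for the aggregate.
//! let mut stream: Vec<PipelineEvent> = Vec::new();
//! stream.push(PipelineEvent::PipelineCreated(created));
//! // Replaying `stream` in order reconstructs the aggregate's state.
//! ```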
//!
//! ## Serialization and Persistence
//!
//! All events support JSON serialization for persistence:
//!
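//! A sketch using `serde_json` (assumed to be available as a dependency; any
//! serde-compatible format works, since the events derive `Serialize` and
//! `Deserialize`):
//!
//! ```rust,ignore
//! let event = PipelineEvent::PipelineCreated(
//!     PipelineCreatedEvent::new(pipeline_id, "demo".to_string(), 2, None),
//! );
//!
//! // Serialize for persistence or transport.
//! let json = serde_json::to_string(&event).expect("event serializes to JSON");
//!
//! // Deserialize when replaying or consuming the event.
//! let restored: PipelineEvent = serde_json::from_str(&json).expect("event deserializes");
//! ```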
//!
//! ## Event Versioning
//!
//! Events include version information for schema evolution (a handling sketch
//! follows this list):
//!
//! - **Version 1**: Initial event schema
//! - **Future Versions**: Backward-compatible schema changes
//! - **Migration**: Automatic handling of version differences
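//!
//! A sketch of a handler that checks the schema version before applying an
//! event; the handler itself is hypothetical, and version 1 is the only
//! schema currently emitted by the factory functions below:
//!
//! ```rust,ignore
//! fn apply_created(event: &PipelineCreatedEvent) -> Result<(), String> {
//!     match event.version {
//!         1 => {
//!             // Apply the version-1 schema.
//!             Ok(())
//!         }
//!         other => Err(format!("unsupported event schema version: {}", other)),
//!     }
//! }
//! ```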
//!
//! ## Best Practices
//!
//! ### Event Design
//! - **Immutable**: Events should never be modified after creation
//! - **Complete**: Include all necessary information for event handlers
//! - **Focused**: Each event should represent a single business occurrence
//! - **Timestamped**: Always include accurate occurrence timestamps
//!
//! ### Event Handling
//! - **Idempotent**: Event handlers should be safe to replay
//! - **Atomic**: Handle events in atomic operations where possible
//! - **Resilient**: Handle missing or corrupted events gracefully
//! - **Ordered**: Process events in chronological order when sequence matters
//!
//! ### Performance Considerations
//! - **Batching**: Process multiple events in batches for efficiency
//! - **Async Processing**: Use asynchronous handlers for non-blocking
//!   operations
//! - **Partitioning**: Partition events by aggregate ID for parallel processing
//! - **Compression**: Compress event payloads for storage efficiency
//!
//! ## Error Handling
//!
//! Event processing includes comprehensive error handling:
//!
//! - **Validation**: Events are validated before persistence
//! - **Retry Logic**: Failed event processing can be retried
//! - **Dead Letter Queue**: Persistently failing events are quarantined
//! - **Monitoring**: Event processing failures trigger alerts
//!
//! ## Integration Patterns
//!
//! Events enable various integration patterns (a subscriber sketch follows
//! this list):
//!
//! - **Publish-Subscribe**: Broadcast events to multiple subscribers
//! - **Event Streaming**: Real-time event processing pipelines
//! - **Saga Orchestration**: Coordinate complex multi-step processes
//! - **CQRS**: Separate read and write models with event synchronization
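//!
//! A minimal subscriber sketch dispatching on the `PipelineEvent` enum (the
//! log lines are illustrative only):
//!
//! ```rust,ignore
//! fn handle(event: &PipelineEvent) {
//!     match event {
//!         PipelineEvent::ProcessingCompleted(e) => {
//!             println!("pipeline {} finished ({} bytes written)", e.pipeline_id, e.output_size);
//!         }
//!         PipelineEvent::SecurityViolation(e) => {
//!             eprintln!("security violation: {} - {}", e.violation_type, e.description);
//!         }
//!         _ => { /* remaining variants are handled by other subscribers */ }
//!     }
//! }
//! ```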

use crate::services::datetime_serde;
use crate::{ProcessingMetrics, SecurityContext};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Domain events for pipeline processing operations
///
/// This enum represents all possible events that can occur within the pipeline
/// processing domain. Each variant contains a specific event type with its
/// associated data payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PipelineEvent {
    PipelineCreated(PipelineCreatedEvent),
    PipelineUpdated(PipelineUpdatedEvent),
    PipelineDeleted(PipelineDeletedEvent),
    ProcessingStarted(ProcessingStartedEvent),
    ProcessingCompleted(ProcessingCompletedEvent),
    ProcessingFailed(ProcessingFailedEvent),
    ProcessingPaused(ProcessingPausedEvent),
    ProcessingResumed(ProcessingResumedEvent),
    ProcessingCancelled(ProcessingCancelledEvent),
    StageStarted(StageStartedEvent),
    StageCompleted(StageCompletedEvent),
    StageFailed(StageFailedEvent),
    ChunkProcessed(ChunkProcessedEvent),
    MetricsUpdated(MetricsUpdatedEvent),
    SecurityViolation(SecurityViolationEvent),
    ResourceExhausted(ResourceExhaustedEvent),
}

/// Base trait exposing the metadata common to domain events: event identity,
/// owning aggregate, event type name, occurrence timestamp, and schema version.
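///
/// A sketch of a consumer that relies only on this trait (the log line is
/// illustrative):
///
/// ```rust,ignore
/// fn log_event<E: DomainEvent>(event: &E) {
///     println!(
///         "{} v{} on aggregate {} at {}",
///         event.event_type(),
///         event.version(),
///         event.aggregate_id(),
///         event.occurred_at(),
///     );
/// }
/// ```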
pub trait DomainEvent {
    fn event_id(&self) -> Uuid;
    fn aggregate_id(&self) -> Uuid;
    fn event_type(&self) -> &'static str;
    fn occurred_at(&self) -> chrono::DateTime<chrono::Utc>;
    fn version(&self) -> u64;
}

/// Pipeline created event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineCreatedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub pipeline_name: String,
    pub stage_count: usize,
    pub created_by: Option<String>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Pipeline updated event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineUpdatedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub changes: Vec<String>,
    pub updated_by: Option<String>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Pipeline deleted event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineDeletedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub deleted_by: Option<String>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing started event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingStartedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub input_path: String,
    pub output_path: String,
    pub file_size: u64,
    pub security_context: SecurityContext,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing completed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingCompletedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub metrics: ProcessingMetrics,
    pub output_size: u64,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing failed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingFailedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub error_message: String,
    pub error_code: String,
    pub stage_name: Option<String>,
    pub partial_metrics: Option<ProcessingMetrics>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing paused event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingPausedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub reason: String,
    pub checkpoint_data: Option<Vec<u8>>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing resumed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingResumedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub resumed_from_checkpoint: bool,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Processing cancelled event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingCancelledEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub reason: String,
    pub cancelled_by: Option<String>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Stage started event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageStartedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub stage_id: Uuid,
    pub stage_name: String,
    pub stage_type: String,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Stage completed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageCompletedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub stage_id: Uuid,
    pub stage_name: String,
    pub processing_time_ms: u64,
    pub bytes_processed: u64,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Stage failed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageFailedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub stage_id: Uuid,
    pub stage_name: String,
    pub error_message: String,
    pub error_code: String,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Chunk processed event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkProcessedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub chunk_id: Uuid,
    pub chunk_sequence: u64,
    pub chunk_size: usize,
    pub stage_name: String,
    pub processing_time_ms: u64,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Metrics updated event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsUpdatedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub metrics: ProcessingMetrics,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Security violation event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityViolationEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Option<Uuid>,
    pub violation_type: String,
    pub description: String,
    pub severity: SecurityViolationSeverity,
    pub user_id: Option<String>,
    pub source_ip: Option<String>,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Resource exhausted event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceExhaustedEvent {
    pub event_id: Uuid,
    pub pipeline_id: Uuid,
    pub processing_id: Uuid,
    pub resource_type: String,
    pub current_usage: u64,
    pub limit: u64,
    pub action_taken: String,
    #[serde(with = "datetime_serde")]
    pub occurred_at: chrono::DateTime<chrono::Utc>,
    pub version: u64,
}

/// Security violation severity levels
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SecurityViolationSeverity {
    Low,
    Medium,
    High,
    Critical,
}

// Implement DomainEvent for the core event types
impl DomainEvent for PipelineCreatedEvent {
    fn event_id(&self) -> Uuid {
        self.event_id
    }
    fn aggregate_id(&self) -> Uuid {
        self.pipeline_id
    }
    fn event_type(&self) -> &'static str {
        "PipelineCreated"
    }
    fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
        self.occurred_at
    }
    fn version(&self) -> u64 {
        self.version
    }
}

impl DomainEvent for ProcessingStartedEvent {
    fn event_id(&self) -> Uuid {
        self.event_id
    }
    fn aggregate_id(&self) -> Uuid {
        self.pipeline_id
    }
    fn event_type(&self) -> &'static str {
        "ProcessingStarted"
    }
    fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
        self.occurred_at
    }
    fn version(&self) -> u64 {
        self.version
    }
}

impl DomainEvent for ProcessingCompletedEvent {
    fn event_id(&self) -> Uuid {
        self.event_id
    }
    fn aggregate_id(&self) -> Uuid {
        self.pipeline_id
    }
    fn event_type(&self) -> &'static str {
        "ProcessingCompleted"
    }
    fn occurred_at(&self) -> chrono::DateTime<chrono::Utc> {
        self.occurred_at
    }
    fn version(&self) -> u64 {
        self.version
    }
}

// Factory functions for creating events
impl PipelineCreatedEvent {
    pub fn new(pipeline_id: Uuid, pipeline_name: String, stage_count: usize, created_by: Option<String>) -> Self {
        Self {
            event_id: Uuid::new_v4(),
            pipeline_id,
            pipeline_name,
            stage_count,
            created_by,
            occurred_at: chrono::Utc::now(),
            version: 1,
        }
    }
}

impl ProcessingStartedEvent {
    pub fn new(
        pipeline_id: Uuid,
        processing_id: Uuid,
        input_path: String,
        output_path: String,
        file_size: u64,
        security_context: SecurityContext,
    ) -> Self {
        Self {
            event_id: Uuid::new_v4(),
            pipeline_id,
            processing_id,
            input_path,
            output_path,
            file_size,
            security_context,
            occurred_at: chrono::Utc::now(),
            version: 1,
        }
    }
}

impl ProcessingCompletedEvent {
    pub fn new(pipeline_id: Uuid, processing_id: Uuid, metrics: ProcessingMetrics, output_size: u64) -> Self {
        Self {
            event_id: Uuid::new_v4(),
            pipeline_id,
            processing_id,
            metrics,
            output_size,
            occurred_at: chrono::Utc::now(),
            version: 1,
        }
    }
}

impl SecurityViolationEvent {
    pub fn new(
        pipeline_id: Uuid,
        violation_type: String,
        description: String,
        severity: SecurityViolationSeverity,
    ) -> Self {
        Self {
            event_id: Uuid::new_v4(),
            pipeline_id,
            processing_id: None,
            violation_type,
            description,
            severity,
            user_id: None,
            source_ip: None,
            occurred_at: chrono::Utc::now(),
            version: 1,
        }
    }
}