1use crate::domain::value_objects::{EventType, TenantId};
2use crate::error::Result;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
/// Lifecycle state of a projection.
///
/// Serialized by serde with SCREAMING_SNAKE_CASE variant names
/// (for example `Running` is written as `"RUNNING"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ProjectionStatus {
    /// Initial status assigned by `Projection::new`.
    Created,
    /// Status set by `Projection::start`.
    Running,
    /// Status set by `Projection::pause`.
    Paused,
    /// Status set by `Projection::mark_failed`.
    Failed,
    /// Status set by `Projection::stop`.
    Stopped,
    /// Status set by `Projection::start_rebuild`.
    Rebuilding,
}
26
27impl ProjectionStatus {
28 pub fn is_active(&self) -> bool {
30 matches!(self, Self::Running | Self::Rebuilding)
31 }
32
33 pub fn can_start(&self) -> bool {
35 matches!(self, Self::Created | Self::Stopped | Self::Paused)
36 }
37
38 pub fn can_pause(&self) -> bool {
40 matches!(self, Self::Running)
41 }
42
43 pub fn can_stop(&self) -> bool {
45 !matches!(self, Self::Stopped)
46 }
47
48 pub fn is_failed(&self) -> bool {
50 matches!(self, Self::Failed)
51 }
52}
53
/// Kind of projection.
///
/// Serialized by serde with snake_case variant names (for example
/// `EntitySnapshot` is written as `"entity_snapshot"`).
///
/// NOTE(review): nothing in this file interprets the variants; the
/// per-variant notes below are inferred from the names and should be
/// confirmed against the code that executes projections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionType {
    /// Presumably keeps a snapshot of entity state — TODO confirm.
    EntitySnapshot,
    /// Presumably counts events — TODO confirm.
    EventCounter,
    /// Presumably user-defined projection logic — TODO confirm.
    Custom,
    /// Presumably aggregates events over time — TODO confirm.
    TimeSeries,
    /// Presumably tracks funnel/conversion steps — TODO confirm.
    Funnel,
}
71
/// Tunable processing settings attached to a projection.
///
/// The `Default` implementation in this file yields a batch size of 100,
/// checkpoints enabled every 1000, and sequential processing with a
/// concurrency limit of 4.
///
/// NOTE(review): this file only stores these values; how each one is
/// applied is decided by the consumer of the config, so the field notes
/// below are inferred from the names — confirm against the processing code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectionConfig {
    /// Presumably the number of events handled per batch (default 100).
    pub batch_size: usize,
    /// Whether checkpointing is turned on (default `true`).
    pub enable_checkpoints: bool,
    /// Presumably the number of events between checkpoints (default 1000).
    pub checkpoint_interval: usize,
    /// Whether parallel processing is turned on (default `false`).
    pub parallel_processing: bool,
    /// Presumably the upper bound on concurrent workers (default 4).
    pub max_concurrency: usize,
}
88
89impl Default for ProjectionConfig {
90 fn default() -> Self {
91 Self {
92 batch_size: 100,
93 enable_checkpoints: true,
94 checkpoint_interval: 1000,
95 parallel_processing: false,
96 max_concurrency: 4,
97 }
98 }
99}
100
/// Running counters and timestamps describing a projection's processing
/// activity.
///
/// Fields are private; they are updated through the `record_*` methods and
/// cleared by `reset`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectionStats {
    /// Number of events recorded via `record_event_processed`.
    events_processed: u64,
    /// Time of the most recent `record_event_processed` call, if any.
    last_processed_at: Option<DateTime<Utc>>,
    /// Time of the most recent `record_checkpoint` call, if any.
    last_checkpoint_at: Option<DateTime<Utc>>,
    /// Number of errors recorded via `record_error`.
    errors_count: u64,
    /// Time of the most recent `record_error` call, if any.
    last_error_at: Option<DateTime<Utc>>,
    /// Sum of the processing times (milliseconds) passed to
    /// `record_event_processed`.
    processing_time_ms: u64,
}
113
114impl ProjectionStats {
115 pub fn new() -> Self {
116 Self {
117 events_processed: 0,
118 last_processed_at: None,
119 last_checkpoint_at: None,
120 errors_count: 0,
121 last_error_at: None,
122 processing_time_ms: 0,
123 }
124 }
125
126 pub fn events_processed(&self) -> u64 {
128 self.events_processed
129 }
130
131 pub fn errors_count(&self) -> u64 {
132 self.errors_count
133 }
134
135 pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
136 self.last_processed_at
137 }
138
139 pub fn processing_time_ms(&self) -> u64 {
140 self.processing_time_ms
141 }
142
143 pub fn record_event_processed(&mut self, processing_time_ms: u64) {
145 self.events_processed += 1;
146 self.last_processed_at = Some(Utc::now());
147 self.processing_time_ms += processing_time_ms;
148 }
149
150 pub fn record_error(&mut self) {
151 self.errors_count += 1;
152 self.last_error_at = Some(Utc::now());
153 }
154
155 pub fn record_checkpoint(&mut self) {
156 self.last_checkpoint_at = Some(Utc::now());
157 }
158
159 pub fn reset(&mut self) {
160 *self = Self::new();
161 }
162
163 pub fn avg_processing_time_ms(&self) -> f64 {
165 if self.events_processed == 0 {
166 0.0
167 } else {
168 self.processing_time_ms as f64 / self.events_processed as f64
169 }
170 }
171}
172
173impl Default for ProjectionStats {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
/// A named, versioned projection owned by a tenant.
///
/// Holds the projection's identity, lifecycle status, configuration,
/// statistics, and an optional event-type filter. All fields are private and
/// are read or changed through the methods of `impl Projection`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Unique identifier; `Projection::new` generates a random v4 UUID.
    id: Uuid,
    /// Tenant that owns this projection.
    tenant_id: TenantId,
    /// Projection name; validated by `Projection::new` (non-empty, length
    /// limited, alphanumerics plus `_` and `-`).
    name: String,
    /// Version number; `Projection::new` rejects `0`.
    version: u32,
    /// Kind of projection.
    projection_type: ProjectionType,
    /// Current lifecycle status.
    status: ProjectionStatus,
    /// Processing settings.
    config: ProjectionConfig,
    /// Processing counters and timestamps.
    stats: ProjectionStats,
    /// Time the projection was created.
    created_at: DateTime<Utc>,
    /// Time of the most recent mutation made through this type's methods.
    updated_at: DateTime<Utc>,
    /// Time of the most recent successful `start`, if any.
    started_at: Option<DateTime<Utc>>,
    /// Time of the most recent successful `stop`, if any.
    stopped_at: Option<DateTime<Utc>>,
    /// Optional free-text description; length-checked by `set_description`.
    description: Option<String>,
    /// Event-type filter; when empty, `processes_event_type` accepts every
    /// event type.
    event_types: Vec<EventType>,
    /// Arbitrary JSON metadata; `Projection::new` initialises it to `{}`.
    metadata: serde_json::Value,
}
211
212impl Projection {
213 pub fn new(
215 tenant_id: TenantId,
216 name: String,
217 version: u32,
218 projection_type: ProjectionType,
219 ) -> Result<Self> {
220 Self::validate_name(&name)?;
221 Self::validate_version(version)?;
222
223 let now = Utc::now();
224 Ok(Self {
225 id: Uuid::new_v4(),
226 tenant_id,
227 name,
228 version,
229 projection_type,
230 status: ProjectionStatus::Created,
231 config: ProjectionConfig::default(),
232 stats: ProjectionStats::new(),
233 created_at: now,
234 updated_at: now,
235 started_at: None,
236 stopped_at: None,
237 description: None,
238 event_types: Vec::new(),
239 metadata: serde_json::json!({}),
240 })
241 }
242
243 pub fn new_v1(
245 tenant_id: TenantId,
246 name: String,
247 projection_type: ProjectionType,
248 ) -> Result<Self> {
249 Self::new(tenant_id, name, 1, projection_type)
250 }
251
252 #[allow(clippy::too_many_arguments)]
254 pub fn reconstruct(
255 id: Uuid,
256 tenant_id: TenantId,
257 name: String,
258 version: u32,
259 projection_type: ProjectionType,
260 status: ProjectionStatus,
261 config: ProjectionConfig,
262 stats: ProjectionStats,
263 created_at: DateTime<Utc>,
264 updated_at: DateTime<Utc>,
265 started_at: Option<DateTime<Utc>>,
266 stopped_at: Option<DateTime<Utc>>,
267 description: Option<String>,
268 event_types: Vec<EventType>,
269 metadata: serde_json::Value,
270 ) -> Self {
271 Self {
272 id,
273 tenant_id,
274 name,
275 version,
276 projection_type,
277 status,
278 config,
279 stats,
280 created_at,
281 updated_at,
282 started_at,
283 stopped_at,
284 description,
285 event_types,
286 metadata,
287 }
288 }
289
290 pub fn id(&self) -> Uuid {
293 self.id
294 }
295
296 pub fn tenant_id(&self) -> &TenantId {
297 &self.tenant_id
298 }
299
300 pub fn name(&self) -> &str {
301 &self.name
302 }
303
304 pub fn version(&self) -> u32 {
305 self.version
306 }
307
308 pub fn projection_type(&self) -> ProjectionType {
309 self.projection_type
310 }
311
312 pub fn status(&self) -> ProjectionStatus {
313 self.status
314 }
315
316 pub fn config(&self) -> &ProjectionConfig {
317 &self.config
318 }
319
320 pub fn stats(&self) -> &ProjectionStats {
321 &self.stats
322 }
323
324 pub fn created_at(&self) -> DateTime<Utc> {
325 self.created_at
326 }
327
328 pub fn updated_at(&self) -> DateTime<Utc> {
329 self.updated_at
330 }
331
332 pub fn description(&self) -> Option<&str> {
333 self.description.as_deref()
334 }
335
336 pub fn event_types(&self) -> &[EventType] {
337 &self.event_types
338 }
339
340 pub fn metadata(&self) -> &serde_json::Value {
341 &self.metadata
342 }
343
344 pub fn start(&mut self) -> Result<()> {
348 if !self.status.can_start() {
349 return Err(crate::error::AllSourceError::ValidationError(format!(
350 "Cannot start projection in status {:?}",
351 self.status
352 )));
353 }
354
355 self.status = ProjectionStatus::Running;
356 self.started_at = Some(Utc::now());
357 self.updated_at = Utc::now();
358 Ok(())
359 }
360
361 pub fn pause(&mut self) -> Result<()> {
363 if !self.status.can_pause() {
364 return Err(crate::error::AllSourceError::ValidationError(format!(
365 "Cannot pause projection in status {:?}",
366 self.status
367 )));
368 }
369
370 self.status = ProjectionStatus::Paused;
371 self.updated_at = Utc::now();
372 Ok(())
373 }
374
375 pub fn stop(&mut self) -> Result<()> {
377 if !self.status.can_stop() {
378 return Err(crate::error::AllSourceError::ValidationError(format!(
379 "Cannot stop projection in status {:?}",
380 self.status
381 )));
382 }
383
384 self.status = ProjectionStatus::Stopped;
385 self.stopped_at = Some(Utc::now());
386 self.updated_at = Utc::now();
387 Ok(())
388 }
389
390 pub fn mark_failed(&mut self) {
392 self.status = ProjectionStatus::Failed;
393 self.updated_at = Utc::now();
394 }
395
396 pub fn start_rebuild(&mut self) -> Result<()> {
398 self.status = ProjectionStatus::Rebuilding;
399 self.stats.reset();
400 self.updated_at = Utc::now();
401 Ok(())
402 }
403
404 pub fn update_config(&mut self, config: ProjectionConfig) {
406 self.config = config;
407 self.updated_at = Utc::now();
408 }
409
410 pub fn set_description(&mut self, description: String) -> Result<()> {
412 Self::validate_description(&description)?;
413 self.description = Some(description);
414 self.updated_at = Utc::now();
415 Ok(())
416 }
417
418 pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
420 if self.event_types.contains(&event_type) {
421 return Err(crate::error::AllSourceError::InvalidInput(format!(
422 "Event type '{}' already in filter",
423 event_type.as_str()
424 )));
425 }
426
427 self.event_types.push(event_type);
428 self.updated_at = Utc::now();
429 Ok(())
430 }
431
432 pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
434 let initial_len = self.event_types.len();
435 self.event_types.retain(|et| et != event_type);
436
437 if self.event_types.len() == initial_len {
438 return Err(crate::error::AllSourceError::InvalidInput(format!(
439 "Event type '{}' not in filter",
440 event_type.as_str()
441 )));
442 }
443
444 self.updated_at = Utc::now();
445 Ok(())
446 }
447
448 pub fn processes_event_type(&self, event_type: &EventType) -> bool {
450 self.event_types.is_empty() || self.event_types.contains(event_type)
452 }
453
454 pub fn update_metadata(&mut self, metadata: serde_json::Value) {
456 self.metadata = metadata;
457 self.updated_at = Utc::now();
458 }
459
460 pub fn stats_mut(&mut self) -> &mut ProjectionStats {
462 self.updated_at = Utc::now();
463 &mut self.stats
464 }
465
466 pub fn is_first_version(&self) -> bool {
468 self.version == 1
469 }
470
471 pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
473 &self.tenant_id == tenant_id
474 }
475
476 pub fn create_next_version(&self) -> Result<Projection> {
478 Projection::new(
479 self.tenant_id.clone(),
480 self.name.clone(),
481 self.version + 1,
482 self.projection_type,
483 )
484 }
485
486 fn validate_name(name: &str) -> Result<()> {
489 if name.is_empty() {
490 return Err(crate::error::AllSourceError::InvalidInput(
491 "Projection name cannot be empty".to_string(),
492 ));
493 }
494
495 if name.len() > 100 {
496 return Err(crate::error::AllSourceError::InvalidInput(format!(
497 "Projection name cannot exceed 100 characters, got {}",
498 name.len()
499 )));
500 }
501
502 if !name
504 .chars()
505 .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
506 {
507 return Err(crate::error::AllSourceError::InvalidInput(format!(
508 "Projection name '{}' must be alphanumeric with underscores or hyphens",
509 name
510 )));
511 }
512
513 Ok(())
514 }
515
516 fn validate_version(version: u32) -> Result<()> {
517 if version == 0 {
518 return Err(crate::error::AllSourceError::InvalidInput(
519 "Projection version must be >= 1".to_string(),
520 ));
521 }
522 Ok(())
523 }
524
525 fn validate_description(description: &str) -> Result<()> {
526 if description.len() > 1000 {
527 return Err(crate::error::AllSourceError::InvalidInput(format!(
528 "Projection description cannot exceed 1000 characters, got {}",
529 description.len()
530 )));
531 }
532 Ok(())
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    // Tenant id shared by most tests.
    fn test_tenant_id() -> TenantId {
        let raw = "test-tenant".to_string();
        TenantId::new(raw).unwrap()
    }

    // Event type shared by the filter tests.
    fn test_event_type() -> EventType {
        let raw = "test.event".to_string();
        EventType::new(raw).unwrap()
    }

    // Version-1 `Custom` projection named "test", owned by the shared tenant.
    fn custom_projection() -> Projection {
        Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom).unwrap()
    }

    #[test]
    fn test_create_projection() {
        let created = Projection::new(
            test_tenant_id(),
            "user_snapshot".to_string(),
            1,
            ProjectionType::EntitySnapshot,
        );
        assert!(created.is_ok());

        let projection = created.unwrap();
        assert_eq!(projection.name(), "user_snapshot");
        assert_eq!(projection.version(), 1);
        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
    }

    #[test]
    fn test_create_v1_projection() {
        let created = Projection::new_v1(
            test_tenant_id(),
            "event_counter".to_string(),
            ProjectionType::EventCounter,
        );
        assert!(created.is_ok());

        let projection = created.unwrap();
        assert_eq!(projection.version(), 1);
        assert!(projection.is_first_version());
    }

    #[test]
    fn test_reject_empty_name() {
        let empty_name = "".to_string();
        let outcome = Projection::new(test_tenant_id(), empty_name, 1, ProjectionType::Custom);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_reject_too_long_name() {
        // One character past the 100-character limit.
        let oversized = "a".repeat(101);
        let outcome = Projection::new(test_tenant_id(), oversized, 1, ProjectionType::Custom);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_reject_invalid_name_characters() {
        let bad_name = "invalid name!".to_string();
        let outcome = Projection::new(test_tenant_id(), bad_name, 1, ProjectionType::Custom);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_accept_valid_names() {
        const VALID_NAMES: [&str; 3] = ["user_snapshot", "event-counter", "projection123"];

        for name in VALID_NAMES.iter().copied() {
            let outcome = Projection::new(
                test_tenant_id(),
                name.to_string(),
                1,
                ProjectionType::Custom,
            );
            assert!(outcome.is_ok(), "Name '{}' should be valid", name);
        }
    }

    #[test]
    fn test_reject_zero_version() {
        let name = "test_projection".to_string();
        let outcome = Projection::new(test_tenant_id(), name, 0, ProjectionType::Custom);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_start_projection() {
        let mut projection = custom_projection();
        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert!(projection.started_at.is_none());

        assert!(projection.start().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Running);
        assert!(projection.started_at.is_some());
    }

    #[test]
    fn test_cannot_start_running_projection() {
        let mut projection = custom_projection();
        projection.start().unwrap();

        // A second start from `Running` must be refused.
        assert!(projection.start().is_err());
    }

    #[test]
    fn test_pause_projection() {
        let mut projection = custom_projection();
        projection.start().unwrap();

        assert!(projection.pause().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Paused);
    }

    #[test]
    fn test_cannot_pause_non_running_projection() {
        // Still in `Created`, so pausing must be refused.
        let mut projection = custom_projection();
        assert!(projection.pause().is_err());
    }

    #[test]
    fn test_stop_projection() {
        let mut projection = custom_projection();
        projection.start().unwrap();

        assert!(projection.stop().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Stopped);
        assert!(projection.stopped_at.is_some());
    }

    #[test]
    fn test_mark_failed() {
        let mut projection = custom_projection();
        projection.start().unwrap();
        projection.mark_failed();

        let status = projection.status();
        assert_eq!(status, ProjectionStatus::Failed);
        assert!(status.is_failed());
    }

    #[test]
    fn test_start_rebuild() {
        let mut projection = custom_projection();
        projection.stats_mut().record_event_processed(10);
        assert_eq!(projection.stats().events_processed(), 1);

        assert!(projection.start_rebuild().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
        // Rebuilding clears previously collected statistics.
        assert_eq!(projection.stats().events_processed(), 0);
    }

    #[test]
    fn test_set_description() {
        let mut projection = custom_projection();

        let outcome = projection.set_description("Test projection".to_string());
        assert!(outcome.is_ok());
        assert_eq!(projection.description(), Some("Test projection"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut projection = custom_projection();

        // One character past the 1000-character limit.
        let oversized = "a".repeat(1001);
        assert!(projection.set_description(oversized).is_err());
    }

    #[test]
    fn test_add_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();

        assert!(projection.add_event_type(event_type.clone()).is_ok());
        assert_eq!(projection.event_types().len(), 1);
        assert!(projection.processes_event_type(&event_type));
    }

    #[test]
    fn test_reject_duplicate_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();
        projection.add_event_type(event_type.clone()).unwrap();

        assert!(projection.add_event_type(event_type).is_err());
    }

    #[test]
    fn test_remove_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();
        projection.add_event_type(event_type.clone()).unwrap();

        assert!(projection.remove_event_type(&event_type).is_ok());
        assert_eq!(projection.event_types().len(), 0);
    }

    #[test]
    fn test_processes_all_events_when_no_filter() {
        // No event types registered: every event type is accepted.
        let projection = custom_projection();
        assert!(projection.processes_event_type(&test_event_type()));
    }

    #[test]
    fn test_projection_stats() {
        let mut projection = custom_projection();
        projection.stats_mut().record_event_processed(10);
        projection.stats_mut().record_event_processed(20);
        projection.stats_mut().record_event_processed(30);

        let stats = projection.stats();
        assert_eq!(stats.events_processed(), 3);
        assert_eq!(stats.processing_time_ms(), 60);
        assert_eq!(stats.avg_processing_time_ms(), 20.0);
    }

    #[test]
    fn test_stats_record_error() {
        let mut projection = custom_projection();
        projection.stats_mut().record_error();
        projection.stats_mut().record_error();

        assert_eq!(projection.stats().errors_count(), 2);
    }

    #[test]
    fn test_belongs_to_tenant() {
        let owner = TenantId::new("tenant1".to_string()).unwrap();
        let stranger = TenantId::new("tenant2".to_string()).unwrap();

        let projection =
            Projection::new_v1(owner.clone(), "test".to_string(), ProjectionType::Custom).unwrap();

        assert!(projection.belongs_to_tenant(&owner));
        assert!(!projection.belongs_to_tenant(&stranger));
    }

    #[test]
    fn test_create_next_version() {
        let first = Projection::new_v1(
            test_tenant_id(),
            "test_projection".to_string(),
            ProjectionType::Custom,
        )
        .unwrap();

        let next = first.create_next_version();
        assert!(next.is_ok());

        let second = next.unwrap();
        assert_eq!(second.version(), 2);
        assert_eq!(second.name(), "test_projection");
        assert_eq!(second.projection_type(), ProjectionType::Custom);
        assert!(!second.is_first_version());
    }

    #[test]
    fn test_projection_status_checks() {
        // is_active
        assert!(ProjectionStatus::Running.is_active());
        assert!(ProjectionStatus::Rebuilding.is_active());
        assert!(!ProjectionStatus::Paused.is_active());

        // can_start
        assert!(ProjectionStatus::Created.can_start());
        assert!(ProjectionStatus::Stopped.can_start());
        assert!(!ProjectionStatus::Running.can_start());

        // can_pause
        assert!(ProjectionStatus::Running.can_pause());
        assert!(!ProjectionStatus::Created.can_pause());

        // can_stop
        assert!(ProjectionStatus::Running.can_stop());
        assert!(!ProjectionStatus::Stopped.can_stop());

        // is_failed
        assert!(ProjectionStatus::Failed.is_failed());
        assert!(!ProjectionStatus::Running.is_failed());
    }

    #[test]
    fn test_update_config() {
        let mut projection = custom_projection();

        let custom_config = ProjectionConfig {
            batch_size: 500,
            parallel_processing: true,
            ..Default::default()
        };
        projection.update_config(custom_config);

        let stored = projection.config();
        assert_eq!(stored.batch_size, 500);
        assert!(stored.parallel_processing);
    }

    #[test]
    fn test_serde_serialization() {
        let projection = custom_projection();

        let encoded = serde_json::to_string(&projection);
        assert!(encoded.is_ok());

        let decoded = serde_json::from_str::<Projection>(&encoded.unwrap());
        assert!(decoded.is_ok());
    }
}