1use crate::{
2 domain::value_objects::{EventType, TenantId},
3 error::Result,
4};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum ProjectionStatus {
15 Created,
17 Running,
19 Paused,
21 Failed,
23 Stopped,
25 Rebuilding,
27}
28
29impl ProjectionStatus {
30 pub fn is_active(&self) -> bool {
32 matches!(self, Self::Running | Self::Rebuilding)
33 }
34
35 pub fn can_start(&self) -> bool {
37 matches!(self, Self::Created | Self::Stopped | Self::Paused)
38 }
39
40 pub fn can_pause(&self) -> bool {
42 matches!(self, Self::Running)
43 }
44
45 pub fn can_stop(&self) -> bool {
47 !matches!(self, Self::Stopped)
48 }
49
50 pub fn is_failed(&self) -> bool {
52 matches!(self, Self::Failed)
53 }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub enum ProjectionType {
62 EntitySnapshot,
64 EventCounter,
66 Custom,
68 TimeSeries,
70 Funnel,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ProjectionConfig {
79 pub batch_size: usize,
81 pub enable_checkpoints: bool,
83 pub checkpoint_interval: usize,
85 pub parallel_processing: bool,
87 pub max_concurrency: usize,
89}
90
91impl Default for ProjectionConfig {
92 fn default() -> Self {
93 Self {
94 batch_size: 100,
95 enable_checkpoints: true,
96 checkpoint_interval: 1000,
97 parallel_processing: false,
98 max_concurrency: 4,
99 }
100 }
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ProjectionStats {
108 events_processed: u64,
109 last_processed_at: Option<DateTime<Utc>>,
110 last_checkpoint_at: Option<DateTime<Utc>>,
111 errors_count: u64,
112 last_error_at: Option<DateTime<Utc>>,
113 processing_time_ms: u64,
114}
115
116impl ProjectionStats {
117 pub fn new() -> Self {
118 Self {
119 events_processed: 0,
120 last_processed_at: None,
121 last_checkpoint_at: None,
122 errors_count: 0,
123 last_error_at: None,
124 processing_time_ms: 0,
125 }
126 }
127
128 pub fn events_processed(&self) -> u64 {
130 self.events_processed
131 }
132
133 pub fn errors_count(&self) -> u64 {
134 self.errors_count
135 }
136
137 pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
138 self.last_processed_at
139 }
140
141 pub fn processing_time_ms(&self) -> u64 {
142 self.processing_time_ms
143 }
144
145 pub fn record_event_processed(&mut self, processing_time_ms: u64) {
147 self.events_processed += 1;
148 self.last_processed_at = Some(Utc::now());
149 self.processing_time_ms += processing_time_ms;
150 }
151
152 pub fn record_error(&mut self) {
153 self.errors_count += 1;
154 self.last_error_at = Some(Utc::now());
155 }
156
157 pub fn record_checkpoint(&mut self) {
158 self.last_checkpoint_at = Some(Utc::now());
159 }
160
161 pub fn reset(&mut self) {
162 *self = Self::new();
163 }
164
165 pub fn avg_processing_time_ms(&self) -> f64 {
167 if self.events_processed == 0 {
168 0.0
169 } else {
170 self.processing_time_ms as f64 / self.events_processed as f64
171 }
172 }
173}
174
175impl Default for ProjectionStats {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct Projection {
195 id: Uuid,
196 tenant_id: TenantId,
197 name: String,
198 version: u32,
199 projection_type: ProjectionType,
200 status: ProjectionStatus,
201 config: ProjectionConfig,
202 stats: ProjectionStats,
203 created_at: DateTime<Utc>,
204 updated_at: DateTime<Utc>,
205 started_at: Option<DateTime<Utc>>,
206 stopped_at: Option<DateTime<Utc>>,
207 description: Option<String>,
208 event_types: Vec<EventType>,
210 metadata: serde_json::Value,
212}
213
214impl Projection {
215 pub fn new(
217 tenant_id: TenantId,
218 name: String,
219 version: u32,
220 projection_type: ProjectionType,
221 ) -> Result<Self> {
222 Self::validate_name(&name)?;
223 Self::validate_version(version)?;
224
225 let now = Utc::now();
226 Ok(Self {
227 id: Uuid::new_v4(),
228 tenant_id,
229 name,
230 version,
231 projection_type,
232 status: ProjectionStatus::Created,
233 config: ProjectionConfig::default(),
234 stats: ProjectionStats::new(),
235 created_at: now,
236 updated_at: now,
237 started_at: None,
238 stopped_at: None,
239 description: None,
240 event_types: Vec::new(),
241 metadata: serde_json::json!({}),
242 })
243 }
244
245 pub fn new_v1(
247 tenant_id: TenantId,
248 name: String,
249 projection_type: ProjectionType,
250 ) -> Result<Self> {
251 Self::new(tenant_id, name, 1, projection_type)
252 }
253
254 #[allow(clippy::too_many_arguments)]
256 pub fn reconstruct(
257 id: Uuid,
258 tenant_id: TenantId,
259 name: String,
260 version: u32,
261 projection_type: ProjectionType,
262 status: ProjectionStatus,
263 config: ProjectionConfig,
264 stats: ProjectionStats,
265 created_at: DateTime<Utc>,
266 updated_at: DateTime<Utc>,
267 started_at: Option<DateTime<Utc>>,
268 stopped_at: Option<DateTime<Utc>>,
269 description: Option<String>,
270 event_types: Vec<EventType>,
271 metadata: serde_json::Value,
272 ) -> Self {
273 Self {
274 id,
275 tenant_id,
276 name,
277 version,
278 projection_type,
279 status,
280 config,
281 stats,
282 created_at,
283 updated_at,
284 started_at,
285 stopped_at,
286 description,
287 event_types,
288 metadata,
289 }
290 }
291
292 pub fn id(&self) -> Uuid {
295 self.id
296 }
297
298 pub fn tenant_id(&self) -> &TenantId {
299 &self.tenant_id
300 }
301
302 pub fn name(&self) -> &str {
303 &self.name
304 }
305
306 pub fn version(&self) -> u32 {
307 self.version
308 }
309
310 pub fn projection_type(&self) -> ProjectionType {
311 self.projection_type
312 }
313
314 pub fn status(&self) -> ProjectionStatus {
315 self.status
316 }
317
318 pub fn config(&self) -> &ProjectionConfig {
319 &self.config
320 }
321
322 pub fn stats(&self) -> &ProjectionStats {
323 &self.stats
324 }
325
326 pub fn created_at(&self) -> DateTime<Utc> {
327 self.created_at
328 }
329
330 pub fn updated_at(&self) -> DateTime<Utc> {
331 self.updated_at
332 }
333
334 pub fn description(&self) -> Option<&str> {
335 self.description.as_deref()
336 }
337
338 pub fn event_types(&self) -> &[EventType] {
339 &self.event_types
340 }
341
342 pub fn metadata(&self) -> &serde_json::Value {
343 &self.metadata
344 }
345
346 pub fn start(&mut self) -> Result<()> {
350 if !self.status.can_start() {
351 return Err(crate::error::AllSourceError::ValidationError(format!(
352 "Cannot start projection in status {:?}",
353 self.status
354 )));
355 }
356
357 self.status = ProjectionStatus::Running;
358 self.started_at = Some(Utc::now());
359 self.updated_at = Utc::now();
360 Ok(())
361 }
362
363 pub fn pause(&mut self) -> Result<()> {
365 if !self.status.can_pause() {
366 return Err(crate::error::AllSourceError::ValidationError(format!(
367 "Cannot pause projection in status {:?}",
368 self.status
369 )));
370 }
371
372 self.status = ProjectionStatus::Paused;
373 self.updated_at = Utc::now();
374 Ok(())
375 }
376
377 pub fn stop(&mut self) -> Result<()> {
379 if !self.status.can_stop() {
380 return Err(crate::error::AllSourceError::ValidationError(format!(
381 "Cannot stop projection in status {:?}",
382 self.status
383 )));
384 }
385
386 self.status = ProjectionStatus::Stopped;
387 self.stopped_at = Some(Utc::now());
388 self.updated_at = Utc::now();
389 Ok(())
390 }
391
392 pub fn mark_failed(&mut self) {
394 self.status = ProjectionStatus::Failed;
395 self.updated_at = Utc::now();
396 }
397
398 pub fn start_rebuild(&mut self) -> Result<()> {
400 self.status = ProjectionStatus::Rebuilding;
401 self.stats.reset();
402 self.updated_at = Utc::now();
403 Ok(())
404 }
405
406 pub fn update_config(&mut self, config: ProjectionConfig) {
408 self.config = config;
409 self.updated_at = Utc::now();
410 }
411
412 pub fn set_description(&mut self, description: String) -> Result<()> {
414 Self::validate_description(&description)?;
415 self.description = Some(description);
416 self.updated_at = Utc::now();
417 Ok(())
418 }
419
420 pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
422 if self.event_types.contains(&event_type) {
423 return Err(crate::error::AllSourceError::InvalidInput(format!(
424 "Event type '{}' already in filter",
425 event_type.as_str()
426 )));
427 }
428
429 self.event_types.push(event_type);
430 self.updated_at = Utc::now();
431 Ok(())
432 }
433
434 pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
436 let initial_len = self.event_types.len();
437 self.event_types.retain(|et| et != event_type);
438
439 if self.event_types.len() == initial_len {
440 return Err(crate::error::AllSourceError::InvalidInput(format!(
441 "Event type '{}' not in filter",
442 event_type.as_str()
443 )));
444 }
445
446 self.updated_at = Utc::now();
447 Ok(())
448 }
449
450 pub fn processes_event_type(&self, event_type: &EventType) -> bool {
452 self.event_types.is_empty() || self.event_types.contains(event_type)
454 }
455
456 pub fn update_metadata(&mut self, metadata: serde_json::Value) {
458 self.metadata = metadata;
459 self.updated_at = Utc::now();
460 }
461
462 pub fn stats_mut(&mut self) -> &mut ProjectionStats {
464 self.updated_at = Utc::now();
465 &mut self.stats
466 }
467
468 pub fn is_first_version(&self) -> bool {
470 self.version == 1
471 }
472
473 pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
475 &self.tenant_id == tenant_id
476 }
477
478 pub fn create_next_version(&self) -> Result<Projection> {
480 Projection::new(
481 self.tenant_id.clone(),
482 self.name.clone(),
483 self.version + 1,
484 self.projection_type,
485 )
486 }
487
488 fn validate_name(name: &str) -> Result<()> {
491 if name.is_empty() {
492 return Err(crate::error::AllSourceError::InvalidInput(
493 "Projection name cannot be empty".to_string(),
494 ));
495 }
496
497 if name.len() > 100 {
498 return Err(crate::error::AllSourceError::InvalidInput(format!(
499 "Projection name cannot exceed 100 characters, got {}",
500 name.len()
501 )));
502 }
503
504 if !name
506 .chars()
507 .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
508 {
509 return Err(crate::error::AllSourceError::InvalidInput(format!(
510 "Projection name '{}' must be alphanumeric with underscores or hyphens",
511 name
512 )));
513 }
514
515 Ok(())
516 }
517
518 fn validate_version(version: u32) -> Result<()> {
519 if version == 0 {
520 return Err(crate::error::AllSourceError::InvalidInput(
521 "Projection version must be >= 1".to_string(),
522 ));
523 }
524 Ok(())
525 }
526
527 fn validate_description(description: &str) -> Result<()> {
528 if description.len() > 1000 {
529 return Err(crate::error::AllSourceError::InvalidInput(format!(
530 "Projection description cannot exceed 1000 characters, got {}",
531 description.len()
532 )));
533 }
534 Ok(())
535 }
536}
537
538#[cfg(test)]
539mod tests {
540 use super::*;
541
542 fn test_tenant_id() -> TenantId {
543 TenantId::new("test-tenant".to_string()).unwrap()
544 }
545
546 fn test_event_type() -> EventType {
547 EventType::new("test.event".to_string()).unwrap()
548 }
549
550 #[test]
551 fn test_create_projection() {
552 let projection = Projection::new(
553 test_tenant_id(),
554 "user_snapshot".to_string(),
555 1,
556 ProjectionType::EntitySnapshot,
557 );
558
559 assert!(projection.is_ok());
560 let projection = projection.unwrap();
561 assert_eq!(projection.name(), "user_snapshot");
562 assert_eq!(projection.version(), 1);
563 assert_eq!(projection.status(), ProjectionStatus::Created);
564 assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
565 }
566
567 #[test]
568 fn test_create_v1_projection() {
569 let projection = Projection::new_v1(
570 test_tenant_id(),
571 "event_counter".to_string(),
572 ProjectionType::EventCounter,
573 );
574
575 assert!(projection.is_ok());
576 let projection = projection.unwrap();
577 assert_eq!(projection.version(), 1);
578 assert!(projection.is_first_version());
579 }
580
581 #[test]
582 fn test_reject_empty_name() {
583 let result = Projection::new(test_tenant_id(), "".to_string(), 1, ProjectionType::Custom);
584
585 assert!(result.is_err());
586 }
587
588 #[test]
589 fn test_reject_too_long_name() {
590 let long_name = "a".repeat(101);
591 let result = Projection::new(test_tenant_id(), long_name, 1, ProjectionType::Custom);
592
593 assert!(result.is_err());
594 }
595
596 #[test]
597 fn test_reject_invalid_name_characters() {
598 let result = Projection::new(
599 test_tenant_id(),
600 "invalid name!".to_string(),
601 1,
602 ProjectionType::Custom,
603 );
604
605 assert!(result.is_err());
606 }
607
608 #[test]
609 fn test_accept_valid_names() {
610 let names = vec!["user_snapshot", "event-counter", "projection123"];
611
612 for name in names {
613 let result = Projection::new(
614 test_tenant_id(),
615 name.to_string(),
616 1,
617 ProjectionType::Custom,
618 );
619 assert!(result.is_ok(), "Name '{}' should be valid", name);
620 }
621 }
622
623 #[test]
624 fn test_reject_zero_version() {
625 let result = Projection::new(
626 test_tenant_id(),
627 "test_projection".to_string(),
628 0,
629 ProjectionType::Custom,
630 );
631
632 assert!(result.is_err());
633 }
634
635 #[test]
636 fn test_start_projection() {
637 let mut projection =
638 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
639 .unwrap();
640
641 assert_eq!(projection.status(), ProjectionStatus::Created);
642 assert!(projection.started_at.is_none());
643
644 let result = projection.start();
645 assert!(result.is_ok());
646 assert_eq!(projection.status(), ProjectionStatus::Running);
647 assert!(projection.started_at.is_some());
648 }
649
650 #[test]
651 fn test_cannot_start_running_projection() {
652 let mut projection =
653 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
654 .unwrap();
655
656 projection.start().unwrap();
657 let result = projection.start();
658 assert!(result.is_err());
659 }
660
661 #[test]
662 fn test_pause_projection() {
663 let mut projection =
664 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
665 .unwrap();
666
667 projection.start().unwrap();
668 let result = projection.pause();
669 assert!(result.is_ok());
670 assert_eq!(projection.status(), ProjectionStatus::Paused);
671 }
672
673 #[test]
674 fn test_cannot_pause_non_running_projection() {
675 let mut projection =
676 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
677 .unwrap();
678
679 let result = projection.pause();
680 assert!(result.is_err());
681 }
682
683 #[test]
684 fn test_stop_projection() {
685 let mut projection =
686 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
687 .unwrap();
688
689 projection.start().unwrap();
690 let result = projection.stop();
691 assert!(result.is_ok());
692 assert_eq!(projection.status(), ProjectionStatus::Stopped);
693 assert!(projection.stopped_at.is_some());
694 }
695
696 #[test]
697 fn test_mark_failed() {
698 let mut projection =
699 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
700 .unwrap();
701
702 projection.start().unwrap();
703 projection.mark_failed();
704 assert_eq!(projection.status(), ProjectionStatus::Failed);
705 assert!(projection.status().is_failed());
706 }
707
708 #[test]
709 fn test_start_rebuild() {
710 let mut projection =
711 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
712 .unwrap();
713
714 projection.stats_mut().record_event_processed(10);
716 assert_eq!(projection.stats().events_processed(), 1);
717
718 let result = projection.start_rebuild();
719 assert!(result.is_ok());
720 assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
721 assert_eq!(projection.stats().events_processed(), 0); }
723
724 #[test]
725 fn test_set_description() {
726 let mut projection =
727 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
728 .unwrap();
729
730 let result = projection.set_description("Test projection".to_string());
731 assert!(result.is_ok());
732 assert_eq!(projection.description(), Some("Test projection"));
733 }
734
735 #[test]
736 fn test_reject_too_long_description() {
737 let mut projection =
738 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
739 .unwrap();
740
741 let long_desc = "a".repeat(1001);
742 let result = projection.set_description(long_desc);
743 assert!(result.is_err());
744 }
745
746 #[test]
747 fn test_add_event_type() {
748 let mut projection =
749 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
750 .unwrap();
751
752 let event_type = test_event_type();
753 let result = projection.add_event_type(event_type.clone());
754 assert!(result.is_ok());
755 assert_eq!(projection.event_types().len(), 1);
756 assert!(projection.processes_event_type(&event_type));
757 }
758
759 #[test]
760 fn test_reject_duplicate_event_type() {
761 let mut projection =
762 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
763 .unwrap();
764
765 let event_type = test_event_type();
766 projection.add_event_type(event_type.clone()).unwrap();
767 let result = projection.add_event_type(event_type);
768 assert!(result.is_err());
769 }
770
771 #[test]
772 fn test_remove_event_type() {
773 let mut projection =
774 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
775 .unwrap();
776
777 let event_type = test_event_type();
778 projection.add_event_type(event_type.clone()).unwrap();
779
780 let result = projection.remove_event_type(&event_type);
781 assert!(result.is_ok());
782 assert_eq!(projection.event_types().len(), 0);
783 }
784
785 #[test]
786 fn test_processes_all_events_when_no_filter() {
787 let projection =
788 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
789 .unwrap();
790
791 let event_type = test_event_type();
792 assert!(projection.processes_event_type(&event_type));
793 }
794
795 #[test]
796 fn test_projection_stats() {
797 let mut projection =
798 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
799 .unwrap();
800
801 projection.stats_mut().record_event_processed(10);
803 projection.stats_mut().record_event_processed(20);
804 projection.stats_mut().record_event_processed(30);
805
806 assert_eq!(projection.stats().events_processed(), 3);
807 assert_eq!(projection.stats().processing_time_ms(), 60);
808 assert_eq!(projection.stats().avg_processing_time_ms(), 20.0);
809 }
810
811 #[test]
812 fn test_stats_record_error() {
813 let mut projection =
814 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
815 .unwrap();
816
817 projection.stats_mut().record_error();
818 projection.stats_mut().record_error();
819
820 assert_eq!(projection.stats().errors_count(), 2);
821 }
822
823 #[test]
824 fn test_belongs_to_tenant() {
825 let tenant1 = TenantId::new("tenant1".to_string()).unwrap();
826 let tenant2 = TenantId::new("tenant2".to_string()).unwrap();
827
828 let projection =
829 Projection::new_v1(tenant1.clone(), "test".to_string(), ProjectionType::Custom)
830 .unwrap();
831
832 assert!(projection.belongs_to_tenant(&tenant1));
833 assert!(!projection.belongs_to_tenant(&tenant2));
834 }
835
836 #[test]
837 fn test_create_next_version() {
838 let projection_v1 = Projection::new_v1(
839 test_tenant_id(),
840 "test_projection".to_string(),
841 ProjectionType::Custom,
842 )
843 .unwrap();
844
845 let projection_v2 = projection_v1.create_next_version();
846 assert!(projection_v2.is_ok());
847
848 let projection_v2 = projection_v2.unwrap();
849 assert_eq!(projection_v2.version(), 2);
850 assert_eq!(projection_v2.name(), "test_projection");
851 assert_eq!(projection_v2.projection_type(), ProjectionType::Custom);
852 assert!(!projection_v2.is_first_version());
853 }
854
855 #[test]
856 fn test_projection_status_checks() {
857 assert!(ProjectionStatus::Running.is_active());
858 assert!(ProjectionStatus::Rebuilding.is_active());
859 assert!(!ProjectionStatus::Paused.is_active());
860
861 assert!(ProjectionStatus::Created.can_start());
862 assert!(ProjectionStatus::Stopped.can_start());
863 assert!(!ProjectionStatus::Running.can_start());
864
865 assert!(ProjectionStatus::Running.can_pause());
866 assert!(!ProjectionStatus::Created.can_pause());
867
868 assert!(ProjectionStatus::Running.can_stop());
869 assert!(!ProjectionStatus::Stopped.can_stop());
870
871 assert!(ProjectionStatus::Failed.is_failed());
872 assert!(!ProjectionStatus::Running.is_failed());
873 }
874
875 #[test]
876 fn test_update_config() {
877 let mut projection =
878 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
879 .unwrap();
880
881 let config = ProjectionConfig {
882 batch_size: 500,
883 parallel_processing: true,
884 ..Default::default()
885 };
886
887 projection.update_config(config);
888 assert_eq!(projection.config().batch_size, 500);
889 assert!(projection.config().parallel_processing);
890 }
891
892 #[test]
893 fn test_serde_serialization() {
894 let projection =
895 Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
896 .unwrap();
897
898 let json = serde_json::to_string(&projection);
900 assert!(json.is_ok());
901
902 let deserialized = serde_json::from_str::<Projection>(&json.unwrap());
904 assert!(deserialized.is_ok());
905 }
906}