1use crate::{
2 domain::value_objects::{EventType, TenantId},
3 error::Result,
4};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum ProjectionStatus {
15 Created,
17 Running,
19 Paused,
21 Failed,
23 Stopped,
25 Rebuilding,
27}
28
29impl ProjectionStatus {
30 pub fn is_active(&self) -> bool {
32 matches!(self, Self::Running | Self::Rebuilding)
33 }
34
35 pub fn can_start(&self) -> bool {
37 matches!(self, Self::Created | Self::Stopped | Self::Paused)
38 }
39
40 pub fn can_pause(&self) -> bool {
42 matches!(self, Self::Running)
43 }
44
45 pub fn can_stop(&self) -> bool {
47 !matches!(self, Self::Stopped)
48 }
49
50 pub fn is_failed(&self) -> bool {
52 matches!(self, Self::Failed)
53 }
54}
55
/// Kind of projection.
///
/// Serialized by serde in `snake_case` (for example `entity_snapshot`).
/// This file attaches no behavior to the individual variants; the value is
/// stored on a `Projection` and copied to the next version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionType {
    // NOTE(review): variant meanings below are inferred from names only — confirm.
    /// Presumably a per-entity state snapshot.
    EntitySnapshot,
    /// Presumably a count of events.
    EventCounter,
    /// Presumably a user-defined projection.
    Custom,
    /// Presumably a time-bucketed aggregation.
    TimeSeries,
    /// Presumably a funnel analysis.
    Funnel,
}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ProjectionConfig {
79 pub batch_size: usize,
81 pub enable_checkpoints: bool,
83 pub checkpoint_interval: usize,
85 pub parallel_processing: bool,
87 pub max_concurrency: usize,
89}
90
91impl Default for ProjectionConfig {
92 fn default() -> Self {
93 Self {
94 batch_size: 100,
95 enable_checkpoints: true,
96 checkpoint_interval: 1000,
97 parallel_processing: false,
98 max_concurrency: 4,
99 }
100 }
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ProjectionStats {
108 events_processed: u64,
109 last_processed_at: Option<DateTime<Utc>>,
110 last_checkpoint_at: Option<DateTime<Utc>>,
111 errors_count: u64,
112 last_error_at: Option<DateTime<Utc>>,
113 processing_time_ms: u64,
114}
115
116impl ProjectionStats {
117 pub fn new() -> Self {
118 Self {
119 events_processed: 0,
120 last_processed_at: None,
121 last_checkpoint_at: None,
122 errors_count: 0,
123 last_error_at: None,
124 processing_time_ms: 0,
125 }
126 }
127
128 pub fn events_processed(&self) -> u64 {
130 self.events_processed
131 }
132
133 pub fn errors_count(&self) -> u64 {
134 self.errors_count
135 }
136
137 pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
138 self.last_processed_at
139 }
140
141 pub fn processing_time_ms(&self) -> u64 {
142 self.processing_time_ms
143 }
144
145 pub fn record_event_processed(&mut self, processing_time_ms: u64) {
147 self.events_processed += 1;
148 self.last_processed_at = Some(Utc::now());
149 self.processing_time_ms += processing_time_ms;
150 }
151
152 pub fn record_error(&mut self) {
153 self.errors_count += 1;
154 self.last_error_at = Some(Utc::now());
155 }
156
157 pub fn record_checkpoint(&mut self) {
158 self.last_checkpoint_at = Some(Utc::now());
159 }
160
161 pub fn reset(&mut self) {
162 *self = Self::new();
163 }
164
165 pub fn avg_processing_time_ms(&self) -> f64 {
167 if self.events_processed == 0 {
168 0.0
169 } else {
170 self.processing_time_ms as f64 / self.events_processed as f64
171 }
172 }
173}
174
175impl Default for ProjectionStats {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
/// A named, versioned, tenant-scoped projection together with its lifecycle
/// status, configuration and processing statistics.
///
/// Fields are private; state changes go through the methods on `Projection`,
/// most of which also refresh `updated_at`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Unique identifier; `new` assigns a random v4 UUID.
    id: Uuid,
    /// Tenant this projection belongs to.
    tenant_id: TenantId,
    /// Projection name, validated by `validate_name` when created through `new`.
    name: String,
    /// Version number; `new` rejects 0.
    version: u32,
    /// Kind of projection.
    projection_type: ProjectionType,
    /// Current lifecycle status; starts as `Created`.
    status: ProjectionStatus,
    /// Processing settings; starts as `ProjectionConfig::default()`.
    config: ProjectionConfig,
    /// Processing counters; cleared by `start_rebuild`.
    stats: ProjectionStats,
    /// Creation time, set once in `new`.
    created_at: DateTime<Utc>,
    /// Time of the last change made through a mutating method.
    updated_at: DateTime<Utc>,
    /// Time of the most recent successful `start`, if any.
    started_at: Option<DateTime<Utc>>,
    /// Time of the most recent successful `stop`, if any.
    stopped_at: Option<DateTime<Utc>>,
    /// Optional description, validated by `validate_description`.
    description: Option<String>,
    /// Event-type filter; an empty list means every event type is processed.
    event_types: Vec<EventType>,
    /// Free-form JSON metadata; starts as an empty object.
    metadata: serde_json::Value,
}
213
214impl Projection {
215 pub fn new(
217 tenant_id: TenantId,
218 name: String,
219 version: u32,
220 projection_type: ProjectionType,
221 ) -> Result<Self> {
222 Self::validate_name(&name)?;
223 Self::validate_version(version)?;
224
225 let now = Utc::now();
226 Ok(Self {
227 id: Uuid::new_v4(),
228 tenant_id,
229 name,
230 version,
231 projection_type,
232 status: ProjectionStatus::Created,
233 config: ProjectionConfig::default(),
234 stats: ProjectionStats::new(),
235 created_at: now,
236 updated_at: now,
237 started_at: None,
238 stopped_at: None,
239 description: None,
240 event_types: Vec::new(),
241 metadata: serde_json::json!({}),
242 })
243 }
244
245 pub fn new_v1(
247 tenant_id: TenantId,
248 name: String,
249 projection_type: ProjectionType,
250 ) -> Result<Self> {
251 Self::new(tenant_id, name, 1, projection_type)
252 }
253
    /// Assembles a projection directly from the supplied field values.
    ///
    /// No validation is performed: unlike [`Projection::new`], the name and
    /// version are stored exactly as given.
    /// NOTE(review): presumably intended for rehydrating persisted projections —
    /// confirm against callers.
    // One parameter per struct field, hence the long argument list.
    #[allow(clippy::too_many_arguments)]
    pub fn reconstruct(
        id: Uuid,
        tenant_id: TenantId,
        name: String,
        version: u32,
        projection_type: ProjectionType,
        status: ProjectionStatus,
        config: ProjectionConfig,
        stats: ProjectionStats,
        created_at: DateTime<Utc>,
        updated_at: DateTime<Utc>,
        started_at: Option<DateTime<Utc>>,
        stopped_at: Option<DateTime<Utc>>,
        description: Option<String>,
        event_types: Vec<EventType>,
        metadata: serde_json::Value,
    ) -> Self {
        Self {
            id,
            tenant_id,
            name,
            version,
            projection_type,
            status,
            config,
            stats,
            created_at,
            updated_at,
            started_at,
            stopped_at,
            description,
            event_types,
            metadata,
        }
    }
291
292 pub fn id(&self) -> Uuid {
295 self.id
296 }
297
298 pub fn tenant_id(&self) -> &TenantId {
299 &self.tenant_id
300 }
301
302 pub fn name(&self) -> &str {
303 &self.name
304 }
305
306 pub fn version(&self) -> u32 {
307 self.version
308 }
309
310 pub fn projection_type(&self) -> ProjectionType {
311 self.projection_type
312 }
313
314 pub fn status(&self) -> ProjectionStatus {
315 self.status
316 }
317
318 pub fn config(&self) -> &ProjectionConfig {
319 &self.config
320 }
321
322 pub fn stats(&self) -> &ProjectionStats {
323 &self.stats
324 }
325
326 pub fn created_at(&self) -> DateTime<Utc> {
327 self.created_at
328 }
329
330 pub fn updated_at(&self) -> DateTime<Utc> {
331 self.updated_at
332 }
333
334 pub fn description(&self) -> Option<&str> {
335 self.description.as_deref()
336 }
337
338 pub fn event_types(&self) -> &[EventType] {
339 &self.event_types
340 }
341
342 pub fn metadata(&self) -> &serde_json::Value {
343 &self.metadata
344 }
345
346 pub fn start(&mut self) -> Result<()> {
350 if !self.status.can_start() {
351 return Err(crate::error::AllSourceError::ValidationError(format!(
352 "Cannot start projection in status {:?}",
353 self.status
354 )));
355 }
356
357 self.status = ProjectionStatus::Running;
358 self.started_at = Some(Utc::now());
359 self.updated_at = Utc::now();
360 Ok(())
361 }
362
363 pub fn pause(&mut self) -> Result<()> {
365 if !self.status.can_pause() {
366 return Err(crate::error::AllSourceError::ValidationError(format!(
367 "Cannot pause projection in status {:?}",
368 self.status
369 )));
370 }
371
372 self.status = ProjectionStatus::Paused;
373 self.updated_at = Utc::now();
374 Ok(())
375 }
376
377 pub fn stop(&mut self) -> Result<()> {
379 if !self.status.can_stop() {
380 return Err(crate::error::AllSourceError::ValidationError(format!(
381 "Cannot stop projection in status {:?}",
382 self.status
383 )));
384 }
385
386 self.status = ProjectionStatus::Stopped;
387 self.stopped_at = Some(Utc::now());
388 self.updated_at = Utc::now();
389 Ok(())
390 }
391
392 pub fn mark_failed(&mut self) {
394 self.status = ProjectionStatus::Failed;
395 self.updated_at = Utc::now();
396 }
397
398 pub fn start_rebuild(&mut self) -> Result<()> {
400 self.status = ProjectionStatus::Rebuilding;
401 self.stats.reset();
402 self.updated_at = Utc::now();
403 Ok(())
404 }
405
406 pub fn update_config(&mut self, config: ProjectionConfig) {
408 self.config = config;
409 self.updated_at = Utc::now();
410 }
411
412 pub fn set_description(&mut self, description: String) -> Result<()> {
414 Self::validate_description(&description)?;
415 self.description = Some(description);
416 self.updated_at = Utc::now();
417 Ok(())
418 }
419
420 pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
422 if self.event_types.contains(&event_type) {
423 return Err(crate::error::AllSourceError::InvalidInput(format!(
424 "Event type '{}' already in filter",
425 event_type.as_str()
426 )));
427 }
428
429 self.event_types.push(event_type);
430 self.updated_at = Utc::now();
431 Ok(())
432 }
433
434 pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
436 let initial_len = self.event_types.len();
437 self.event_types.retain(|et| et != event_type);
438
439 if self.event_types.len() == initial_len {
440 return Err(crate::error::AllSourceError::InvalidInput(format!(
441 "Event type '{}' not in filter",
442 event_type.as_str()
443 )));
444 }
445
446 self.updated_at = Utc::now();
447 Ok(())
448 }
449
450 pub fn processes_event_type(&self, event_type: &EventType) -> bool {
452 self.event_types.is_empty() || self.event_types.contains(event_type)
454 }
455
456 pub fn update_metadata(&mut self, metadata: serde_json::Value) {
458 self.metadata = metadata;
459 self.updated_at = Utc::now();
460 }
461
462 pub fn stats_mut(&mut self) -> &mut ProjectionStats {
464 self.updated_at = Utc::now();
465 &mut self.stats
466 }
467
468 pub fn is_first_version(&self) -> bool {
470 self.version == 1
471 }
472
473 pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
475 &self.tenant_id == tenant_id
476 }
477
478 pub fn create_next_version(&self) -> Result<Projection> {
480 Projection::new(
481 self.tenant_id.clone(),
482 self.name.clone(),
483 self.version + 1,
484 self.projection_type,
485 )
486 }
487
488 fn validate_name(name: &str) -> Result<()> {
491 if name.is_empty() {
492 return Err(crate::error::AllSourceError::InvalidInput(
493 "Projection name cannot be empty".to_string(),
494 ));
495 }
496
497 if name.len() > 100 {
498 return Err(crate::error::AllSourceError::InvalidInput(format!(
499 "Projection name cannot exceed 100 characters, got {}",
500 name.len()
501 )));
502 }
503
504 if !name
506 .chars()
507 .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
508 {
509 return Err(crate::error::AllSourceError::InvalidInput(format!(
510 "Projection name '{name}' must be alphanumeric with underscores or hyphens"
511 )));
512 }
513
514 Ok(())
515 }
516
517 fn validate_version(version: u32) -> Result<()> {
518 if version == 0 {
519 return Err(crate::error::AllSourceError::InvalidInput(
520 "Projection version must be >= 1".to_string(),
521 ));
522 }
523 Ok(())
524 }
525
526 fn validate_description(description: &str) -> Result<()> {
527 if description.len() > 1000 {
528 return Err(crate::error::AllSourceError::InvalidInput(format!(
529 "Projection description cannot exceed 1000 characters, got {}",
530 description.len()
531 )));
532 }
533 Ok(())
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// Tenant id shared by most tests.
    fn test_tenant_id() -> TenantId {
        TenantId::new(String::from("test-tenant")).unwrap()
    }

    /// Event type shared by the filter tests.
    fn test_event_type() -> EventType {
        EventType::new(String::from("test.event")).unwrap()
    }

    /// Version-1 `Custom` projection named "test" that most tests start from.
    fn custom_projection() -> Projection {
        Projection::new_v1(test_tenant_id(), String::from("test"), ProjectionType::Custom)
            .expect("fixture projection should be valid")
    }

    #[test]
    fn test_create_projection() {
        let projection = Projection::new(
            test_tenant_id(),
            String::from("user_snapshot"),
            1,
            ProjectionType::EntitySnapshot,
        )
        .expect("projection should be created");

        assert_eq!(projection.name(), "user_snapshot");
        assert_eq!(projection.version(), 1);
        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
    }

    #[test]
    fn test_create_v1_projection() {
        let projection = Projection::new_v1(
            test_tenant_id(),
            String::from("event_counter"),
            ProjectionType::EventCounter,
        )
        .expect("projection should be created");

        assert_eq!(projection.version(), 1);
        assert!(projection.is_first_version());
    }

    #[test]
    fn test_reject_empty_name() {
        let outcome = Projection::new(test_tenant_id(), String::new(), 1, ProjectionType::Custom);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_reject_too_long_name() {
        let oversized = "a".repeat(101);
        let outcome = Projection::new(test_tenant_id(), oversized, 1, ProjectionType::Custom);

        assert!(outcome.is_err());
    }

    #[test]
    fn test_reject_invalid_name_characters() {
        let outcome = Projection::new(
            test_tenant_id(),
            String::from("invalid name!"),
            1,
            ProjectionType::Custom,
        );

        assert!(outcome.is_err());
    }

    #[test]
    fn test_accept_valid_names() {
        for name in ["user_snapshot", "event-counter", "projection123"] {
            let outcome = Projection::new(
                test_tenant_id(),
                name.to_string(),
                1,
                ProjectionType::Custom,
            );
            assert!(outcome.is_ok(), "Name '{name}' should be valid");
        }
    }

    #[test]
    fn test_reject_zero_version() {
        let outcome = Projection::new(
            test_tenant_id(),
            String::from("test_projection"),
            0,
            ProjectionType::Custom,
        );

        assert!(outcome.is_err());
    }

    #[test]
    fn test_start_projection() {
        let mut projection = custom_projection();

        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert!(projection.started_at.is_none());

        assert!(projection.start().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Running);
        assert!(projection.started_at.is_some());
    }

    #[test]
    fn test_cannot_start_running_projection() {
        let mut projection = custom_projection();

        projection.start().unwrap();
        assert!(projection.start().is_err());
    }

    #[test]
    fn test_pause_projection() {
        let mut projection = custom_projection();

        projection.start().unwrap();
        assert!(projection.pause().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Paused);
    }

    #[test]
    fn test_cannot_pause_non_running_projection() {
        let mut projection = custom_projection();

        assert!(projection.pause().is_err());
    }

    #[test]
    fn test_stop_projection() {
        let mut projection = custom_projection();

        projection.start().unwrap();
        assert!(projection.stop().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Stopped);
        assert!(projection.stopped_at.is_some());
    }

    #[test]
    fn test_mark_failed() {
        let mut projection = custom_projection();

        projection.start().unwrap();
        projection.mark_failed();
        assert_eq!(projection.status(), ProjectionStatus::Failed);
        assert!(projection.status().is_failed());
    }

    #[test]
    fn test_start_rebuild() {
        let mut projection = custom_projection();

        projection.stats_mut().record_event_processed(10);
        assert_eq!(projection.stats().events_processed(), 1);

        assert!(projection.start_rebuild().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
        // Rebuilding clears the statistics.
        assert_eq!(projection.stats().events_processed(), 0);
    }

    #[test]
    fn test_set_description() {
        let mut projection = custom_projection();

        let outcome = projection.set_description(String::from("Test projection"));
        assert!(outcome.is_ok());
        assert_eq!(projection.description(), Some("Test projection"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut projection = custom_projection();

        let oversized = "a".repeat(1001);
        assert!(projection.set_description(oversized).is_err());
    }

    #[test]
    fn test_add_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();

        assert!(projection.add_event_type(event_type.clone()).is_ok());
        assert_eq!(projection.event_types().len(), 1);
        assert!(projection.processes_event_type(&event_type));
    }

    #[test]
    fn test_reject_duplicate_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();

        projection.add_event_type(event_type.clone()).unwrap();
        assert!(projection.add_event_type(event_type).is_err());
    }

    #[test]
    fn test_remove_event_type() {
        let mut projection = custom_projection();
        let event_type = test_event_type();
        projection.add_event_type(event_type.clone()).unwrap();

        assert!(projection.remove_event_type(&event_type).is_ok());
        assert_eq!(projection.event_types().len(), 0);
    }

    #[test]
    fn test_processes_all_events_when_no_filter() {
        let projection = custom_projection();

        assert!(projection.processes_event_type(&test_event_type()));
    }

    #[test]
    fn test_projection_stats() {
        let mut projection = custom_projection();

        for elapsed_ms in [10, 20, 30] {
            projection.stats_mut().record_event_processed(elapsed_ms);
        }

        let stats = projection.stats();
        assert_eq!(stats.events_processed(), 3);
        assert_eq!(stats.processing_time_ms(), 60);
        assert_eq!(stats.avg_processing_time_ms(), 20.0);
    }

    #[test]
    fn test_stats_record_error() {
        let mut projection = custom_projection();

        projection.stats_mut().record_error();
        projection.stats_mut().record_error();

        assert_eq!(projection.stats().errors_count(), 2);
    }

    #[test]
    fn test_belongs_to_tenant() {
        let owner = TenantId::new(String::from("tenant1")).unwrap();
        let stranger = TenantId::new(String::from("tenant2")).unwrap();

        let projection =
            Projection::new_v1(owner.clone(), String::from("test"), ProjectionType::Custom)
                .unwrap();

        assert!(projection.belongs_to_tenant(&owner));
        assert!(!projection.belongs_to_tenant(&stranger));
    }

    #[test]
    fn test_create_next_version() {
        let first = Projection::new_v1(
            test_tenant_id(),
            String::from("test_projection"),
            ProjectionType::Custom,
        )
        .unwrap();

        let second = first
            .create_next_version()
            .expect("next version should be created");

        assert_eq!(second.version(), 2);
        assert_eq!(second.name(), "test_projection");
        assert_eq!(second.projection_type(), ProjectionType::Custom);
        assert!(!second.is_first_version());
    }

    #[test]
    fn test_projection_status_checks() {
        // is_active
        assert!(ProjectionStatus::Running.is_active());
        assert!(ProjectionStatus::Rebuilding.is_active());
        assert!(!ProjectionStatus::Paused.is_active());

        // can_start
        assert!(ProjectionStatus::Created.can_start());
        assert!(ProjectionStatus::Stopped.can_start());
        assert!(!ProjectionStatus::Running.can_start());

        // can_pause
        assert!(ProjectionStatus::Running.can_pause());
        assert!(!ProjectionStatus::Created.can_pause());

        // can_stop
        assert!(ProjectionStatus::Running.can_stop());
        assert!(!ProjectionStatus::Stopped.can_stop());

        // is_failed
        assert!(ProjectionStatus::Failed.is_failed());
        assert!(!ProjectionStatus::Running.is_failed());
    }

    #[test]
    fn test_update_config() {
        let mut projection = custom_projection();

        projection.update_config(ProjectionConfig {
            batch_size: 500,
            parallel_processing: true,
            ..Default::default()
        });

        let config = projection.config();
        assert_eq!(config.batch_size, 500);
        assert!(config.parallel_processing);
    }

    #[test]
    fn test_serde_serialization() {
        let projection = custom_projection();

        let json = serde_json::to_string(&projection).expect("projection should serialize");

        let round_tripped = serde_json::from_str::<Projection>(&json);
        assert!(round_tripped.is_ok());
    }
}