Skip to main content

allsource_core/domain/entities/
projection.rs

1use crate::{
2    domain::value_objects::{EventType, TenantId},
3    error::Result,
4};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9/// Projection status
10///
11/// Tracks the current state of a projection's lifecycle.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum ProjectionStatus {
15    /// Projection is defined but not yet started
16    Created,
17    /// Projection is actively processing events
18    Running,
19    /// Projection is temporarily paused
20    Paused,
21    /// Projection has encountered an error
22    Failed,
23    /// Projection has been stopped
24    Stopped,
25    /// Projection is being rebuilt from scratch
26    Rebuilding,
27}
28
29impl ProjectionStatus {
30    /// Check if projection is actively processing
31    pub fn is_active(&self) -> bool {
32        matches!(self, Self::Running | Self::Rebuilding)
33    }
34
35    /// Check if projection can be started
36    pub fn can_start(&self) -> bool {
37        matches!(self, Self::Created | Self::Stopped | Self::Paused)
38    }
39
40    /// Check if projection can be paused
41    pub fn can_pause(&self) -> bool {
42        matches!(self, Self::Running)
43    }
44
45    /// Check if projection can be stopped
46    pub fn can_stop(&self) -> bool {
47        !matches!(self, Self::Stopped)
48    }
49
50    /// Check if projection is in error state
51    pub fn is_failed(&self) -> bool {
52        matches!(self, Self::Failed)
53    }
54}
55
56/// Projection type
57///
58/// Defines how the projection processes events.
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub enum ProjectionType {
62    /// Maintains current state by entity ID
63    EntitySnapshot,
64    /// Counts events by type
65    EventCounter,
66    /// Custom user-defined projection logic
67    Custom,
68    /// Time-series aggregation
69    TimeSeries,
70    /// Funnel analysis
71    Funnel,
72}
73
74/// Projection configuration
75///
76/// Optional settings that control projection behavior.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ProjectionConfig {
79    /// Maximum number of events to process in a batch
80    pub batch_size: usize,
81    /// Whether to checkpoint state periodically
82    pub enable_checkpoints: bool,
83    /// Checkpoint interval in number of events
84    pub checkpoint_interval: usize,
85    /// Whether to process events in parallel
86    pub parallel_processing: bool,
87    /// Maximum concurrent event handlers
88    pub max_concurrency: usize,
89}
90
91impl Default for ProjectionConfig {
92    fn default() -> Self {
93        Self {
94            batch_size: 100,
95            enable_checkpoints: true,
96            checkpoint_interval: 1000,
97            parallel_processing: false,
98            max_concurrency: 4,
99        }
100    }
101}
102
103/// Projection statistics
104///
105/// Tracks operational metrics for a projection.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ProjectionStats {
108    events_processed: u64,
109    last_processed_at: Option<DateTime<Utc>>,
110    last_checkpoint_at: Option<DateTime<Utc>>,
111    errors_count: u64,
112    last_error_at: Option<DateTime<Utc>>,
113    processing_time_ms: u64,
114}
115
116impl ProjectionStats {
117    pub fn new() -> Self {
118        Self {
119            events_processed: 0,
120            last_processed_at: None,
121            last_checkpoint_at: None,
122            errors_count: 0,
123            last_error_at: None,
124            processing_time_ms: 0,
125        }
126    }
127
128    // Getters
129    pub fn events_processed(&self) -> u64 {
130        self.events_processed
131    }
132
133    pub fn errors_count(&self) -> u64 {
134        self.errors_count
135    }
136
137    pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
138        self.last_processed_at
139    }
140
141    pub fn processing_time_ms(&self) -> u64 {
142        self.processing_time_ms
143    }
144
145    // Mutation methods
146    pub fn record_event_processed(&mut self, processing_time_ms: u64) {
147        self.events_processed += 1;
148        self.last_processed_at = Some(Utc::now());
149        self.processing_time_ms += processing_time_ms;
150    }
151
152    pub fn record_error(&mut self) {
153        self.errors_count += 1;
154        self.last_error_at = Some(Utc::now());
155    }
156
157    pub fn record_checkpoint(&mut self) {
158        self.last_checkpoint_at = Some(Utc::now());
159    }
160
161    pub fn reset(&mut self) {
162        *self = Self::new();
163    }
164
165    /// Calculate average processing time per event in milliseconds
166    pub fn avg_processing_time_ms(&self) -> f64 {
167        if self.events_processed == 0 {
168            0.0
169        } else {
170            self.processing_time_ms as f64 / self.events_processed as f64
171        }
172    }
173}
174
175impl Default for ProjectionStats {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181/// Domain Entity: Projection
182///
183/// Represents a projection that aggregates events into a queryable view.
184/// Projections are materialized views derived from the event stream.
185///
186/// Domain Rules:
187/// - Name must be unique within a tenant
188/// - Name cannot be empty
189/// - Version starts at 1 and increments
190/// - Cannot change projection type after creation
191/// - Status transitions must follow lifecycle rules
192/// - Stats are accurate and updated on each event
193#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct Projection {
195    id: Uuid,
196    tenant_id: TenantId,
197    name: String,
198    version: u32,
199    projection_type: ProjectionType,
200    status: ProjectionStatus,
201    config: ProjectionConfig,
202    stats: ProjectionStats,
203    created_at: DateTime<Utc>,
204    updated_at: DateTime<Utc>,
205    started_at: Option<DateTime<Utc>>,
206    stopped_at: Option<DateTime<Utc>>,
207    description: Option<String>,
208    /// Event types this projection is interested in (empty = all events)
209    event_types: Vec<EventType>,
210    /// Custom metadata
211    metadata: serde_json::Value,
212}
213
214impl Projection {
215    /// Create a new projection with validation
216    pub fn new(
217        tenant_id: TenantId,
218        name: String,
219        version: u32,
220        projection_type: ProjectionType,
221    ) -> Result<Self> {
222        Self::validate_name(&name)?;
223        Self::validate_version(version)?;
224
225        let now = Utc::now();
226        Ok(Self {
227            id: Uuid::new_v4(),
228            tenant_id,
229            name,
230            version,
231            projection_type,
232            status: ProjectionStatus::Created,
233            config: ProjectionConfig::default(),
234            stats: ProjectionStats::new(),
235            created_at: now,
236            updated_at: now,
237            started_at: None,
238            stopped_at: None,
239            description: None,
240            event_types: Vec::new(),
241            metadata: serde_json::json!({}),
242        })
243    }
244
245    /// Create first version of a projection
246    pub fn new_v1(
247        tenant_id: TenantId,
248        name: String,
249        projection_type: ProjectionType,
250    ) -> Result<Self> {
251        Self::new(tenant_id, name, 1, projection_type)
252    }
253
254    /// Reconstruct projection from storage (bypasses validation)
255    #[allow(clippy::too_many_arguments)]
256    pub fn reconstruct(
257        id: Uuid,
258        tenant_id: TenantId,
259        name: String,
260        version: u32,
261        projection_type: ProjectionType,
262        status: ProjectionStatus,
263        config: ProjectionConfig,
264        stats: ProjectionStats,
265        created_at: DateTime<Utc>,
266        updated_at: DateTime<Utc>,
267        started_at: Option<DateTime<Utc>>,
268        stopped_at: Option<DateTime<Utc>>,
269        description: Option<String>,
270        event_types: Vec<EventType>,
271        metadata: serde_json::Value,
272    ) -> Self {
273        Self {
274            id,
275            tenant_id,
276            name,
277            version,
278            projection_type,
279            status,
280            config,
281            stats,
282            created_at,
283            updated_at,
284            started_at,
285            stopped_at,
286            description,
287            event_types,
288            metadata,
289        }
290    }
291
292    // Getters
293
294    pub fn id(&self) -> Uuid {
295        self.id
296    }
297
298    pub fn tenant_id(&self) -> &TenantId {
299        &self.tenant_id
300    }
301
302    pub fn name(&self) -> &str {
303        &self.name
304    }
305
306    pub fn version(&self) -> u32 {
307        self.version
308    }
309
310    pub fn projection_type(&self) -> ProjectionType {
311        self.projection_type
312    }
313
314    pub fn status(&self) -> ProjectionStatus {
315        self.status
316    }
317
318    pub fn config(&self) -> &ProjectionConfig {
319        &self.config
320    }
321
322    pub fn stats(&self) -> &ProjectionStats {
323        &self.stats
324    }
325
326    pub fn created_at(&self) -> DateTime<Utc> {
327        self.created_at
328    }
329
330    pub fn updated_at(&self) -> DateTime<Utc> {
331        self.updated_at
332    }
333
334    pub fn description(&self) -> Option<&str> {
335        self.description.as_deref()
336    }
337
338    pub fn event_types(&self) -> &[EventType] {
339        &self.event_types
340    }
341
342    pub fn metadata(&self) -> &serde_json::Value {
343        &self.metadata
344    }
345
346    // Domain behavior methods
347
348    /// Start the projection
349    pub fn start(&mut self) -> Result<()> {
350        if !self.status.can_start() {
351            return Err(crate::error::AllSourceError::ValidationError(format!(
352                "Cannot start projection in status {:?}",
353                self.status
354            )));
355        }
356
357        self.status = ProjectionStatus::Running;
358        self.started_at = Some(Utc::now());
359        self.updated_at = Utc::now();
360        Ok(())
361    }
362
363    /// Pause the projection
364    pub fn pause(&mut self) -> Result<()> {
365        if !self.status.can_pause() {
366            return Err(crate::error::AllSourceError::ValidationError(format!(
367                "Cannot pause projection in status {:?}",
368                self.status
369            )));
370        }
371
372        self.status = ProjectionStatus::Paused;
373        self.updated_at = Utc::now();
374        Ok(())
375    }
376
377    /// Stop the projection
378    pub fn stop(&mut self) -> Result<()> {
379        if !self.status.can_stop() {
380            return Err(crate::error::AllSourceError::ValidationError(format!(
381                "Cannot stop projection in status {:?}",
382                self.status
383            )));
384        }
385
386        self.status = ProjectionStatus::Stopped;
387        self.stopped_at = Some(Utc::now());
388        self.updated_at = Utc::now();
389        Ok(())
390    }
391
392    /// Mark projection as failed
393    pub fn mark_failed(&mut self) {
394        self.status = ProjectionStatus::Failed;
395        self.updated_at = Utc::now();
396    }
397
398    /// Start rebuilding the projection
399    pub fn start_rebuild(&mut self) -> Result<()> {
400        self.status = ProjectionStatus::Rebuilding;
401        self.stats.reset();
402        self.updated_at = Utc::now();
403        Ok(())
404    }
405
406    /// Update configuration
407    pub fn update_config(&mut self, config: ProjectionConfig) {
408        self.config = config;
409        self.updated_at = Utc::now();
410    }
411
412    /// Set description
413    pub fn set_description(&mut self, description: String) -> Result<()> {
414        Self::validate_description(&description)?;
415        self.description = Some(description);
416        self.updated_at = Utc::now();
417        Ok(())
418    }
419
420    /// Add event type filter
421    pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
422        if self.event_types.contains(&event_type) {
423            return Err(crate::error::AllSourceError::InvalidInput(format!(
424                "Event type '{}' already in filter",
425                event_type.as_str()
426            )));
427        }
428
429        self.event_types.push(event_type);
430        self.updated_at = Utc::now();
431        Ok(())
432    }
433
434    /// Remove event type filter
435    pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
436        let initial_len = self.event_types.len();
437        self.event_types.retain(|et| et != event_type);
438
439        if self.event_types.len() == initial_len {
440            return Err(crate::error::AllSourceError::InvalidInput(format!(
441                "Event type '{}' not in filter",
442                event_type.as_str()
443            )));
444        }
445
446        self.updated_at = Utc::now();
447        Ok(())
448    }
449
450    /// Check if projection processes this event type
451    pub fn processes_event_type(&self, event_type: &EventType) -> bool {
452        // Empty filter means process all events
453        self.event_types.is_empty() || self.event_types.contains(event_type)
454    }
455
456    /// Update metadata
457    pub fn update_metadata(&mut self, metadata: serde_json::Value) {
458        self.metadata = metadata;
459        self.updated_at = Utc::now();
460    }
461
462    /// Get mutable access to stats (for recording events)
463    pub fn stats_mut(&mut self) -> &mut ProjectionStats {
464        self.updated_at = Utc::now();
465        &mut self.stats
466    }
467
468    /// Check if projection is first version
469    pub fn is_first_version(&self) -> bool {
470        self.version == 1
471    }
472
473    /// Check if projection belongs to tenant
474    pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
475        &self.tenant_id == tenant_id
476    }
477
478    /// Create next version
479    pub fn create_next_version(&self) -> Result<Projection> {
480        Projection::new(
481            self.tenant_id.clone(),
482            self.name.clone(),
483            self.version + 1,
484            self.projection_type,
485        )
486    }
487
488    // Validation methods
489
490    fn validate_name(name: &str) -> Result<()> {
491        if name.is_empty() {
492            return Err(crate::error::AllSourceError::InvalidInput(
493                "Projection name cannot be empty".to_string(),
494            ));
495        }
496
497        if name.len() > 100 {
498            return Err(crate::error::AllSourceError::InvalidInput(format!(
499                "Projection name cannot exceed 100 characters, got {}",
500                name.len()
501            )));
502        }
503
504        // Name should be alphanumeric with underscores/hyphens
505        if !name
506            .chars()
507            .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
508        {
509            return Err(crate::error::AllSourceError::InvalidInput(format!(
510                "Projection name '{}' must be alphanumeric with underscores or hyphens",
511                name
512            )));
513        }
514
515        Ok(())
516    }
517
518    fn validate_version(version: u32) -> Result<()> {
519        if version == 0 {
520            return Err(crate::error::AllSourceError::InvalidInput(
521                "Projection version must be >= 1".to_string(),
522            ));
523        }
524        Ok(())
525    }
526
527    fn validate_description(description: &str) -> Result<()> {
528        if description.len() > 1000 {
529            return Err(crate::error::AllSourceError::InvalidInput(format!(
530                "Projection description cannot exceed 1000 characters, got {}",
531                description.len()
532            )));
533        }
534        Ok(())
535    }
536}
537
538#[cfg(test)]
539mod tests {
540    use super::*;
541
542    fn test_tenant_id() -> TenantId {
543        TenantId::new("test-tenant".to_string()).unwrap()
544    }
545
546    fn test_event_type() -> EventType {
547        EventType::new("test.event".to_string()).unwrap()
548    }
549
550    #[test]
551    fn test_create_projection() {
552        let projection = Projection::new(
553            test_tenant_id(),
554            "user_snapshot".to_string(),
555            1,
556            ProjectionType::EntitySnapshot,
557        );
558
559        assert!(projection.is_ok());
560        let projection = projection.unwrap();
561        assert_eq!(projection.name(), "user_snapshot");
562        assert_eq!(projection.version(), 1);
563        assert_eq!(projection.status(), ProjectionStatus::Created);
564        assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
565    }
566
567    #[test]
568    fn test_create_v1_projection() {
569        let projection = Projection::new_v1(
570            test_tenant_id(),
571            "event_counter".to_string(),
572            ProjectionType::EventCounter,
573        );
574
575        assert!(projection.is_ok());
576        let projection = projection.unwrap();
577        assert_eq!(projection.version(), 1);
578        assert!(projection.is_first_version());
579    }
580
581    #[test]
582    fn test_reject_empty_name() {
583        let result = Projection::new(test_tenant_id(), "".to_string(), 1, ProjectionType::Custom);
584
585        assert!(result.is_err());
586    }
587
588    #[test]
589    fn test_reject_too_long_name() {
590        let long_name = "a".repeat(101);
591        let result = Projection::new(test_tenant_id(), long_name, 1, ProjectionType::Custom);
592
593        assert!(result.is_err());
594    }
595
596    #[test]
597    fn test_reject_invalid_name_characters() {
598        let result = Projection::new(
599            test_tenant_id(),
600            "invalid name!".to_string(),
601            1,
602            ProjectionType::Custom,
603        );
604
605        assert!(result.is_err());
606    }
607
608    #[test]
609    fn test_accept_valid_names() {
610        let names = vec!["user_snapshot", "event-counter", "projection123"];
611
612        for name in names {
613            let result = Projection::new(
614                test_tenant_id(),
615                name.to_string(),
616                1,
617                ProjectionType::Custom,
618            );
619            assert!(result.is_ok(), "Name '{}' should be valid", name);
620        }
621    }
622
623    #[test]
624    fn test_reject_zero_version() {
625        let result = Projection::new(
626            test_tenant_id(),
627            "test_projection".to_string(),
628            0,
629            ProjectionType::Custom,
630        );
631
632        assert!(result.is_err());
633    }
634
635    #[test]
636    fn test_start_projection() {
637        let mut projection =
638            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
639                .unwrap();
640
641        assert_eq!(projection.status(), ProjectionStatus::Created);
642        assert!(projection.started_at.is_none());
643
644        let result = projection.start();
645        assert!(result.is_ok());
646        assert_eq!(projection.status(), ProjectionStatus::Running);
647        assert!(projection.started_at.is_some());
648    }
649
650    #[test]
651    fn test_cannot_start_running_projection() {
652        let mut projection =
653            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
654                .unwrap();
655
656        projection.start().unwrap();
657        let result = projection.start();
658        assert!(result.is_err());
659    }
660
661    #[test]
662    fn test_pause_projection() {
663        let mut projection =
664            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
665                .unwrap();
666
667        projection.start().unwrap();
668        let result = projection.pause();
669        assert!(result.is_ok());
670        assert_eq!(projection.status(), ProjectionStatus::Paused);
671    }
672
673    #[test]
674    fn test_cannot_pause_non_running_projection() {
675        let mut projection =
676            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
677                .unwrap();
678
679        let result = projection.pause();
680        assert!(result.is_err());
681    }
682
683    #[test]
684    fn test_stop_projection() {
685        let mut projection =
686            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
687                .unwrap();
688
689        projection.start().unwrap();
690        let result = projection.stop();
691        assert!(result.is_ok());
692        assert_eq!(projection.status(), ProjectionStatus::Stopped);
693        assert!(projection.stopped_at.is_some());
694    }
695
696    #[test]
697    fn test_mark_failed() {
698        let mut projection =
699            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
700                .unwrap();
701
702        projection.start().unwrap();
703        projection.mark_failed();
704        assert_eq!(projection.status(), ProjectionStatus::Failed);
705        assert!(projection.status().is_failed());
706    }
707
708    #[test]
709    fn test_start_rebuild() {
710        let mut projection =
711            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
712                .unwrap();
713
714        // Process some events first
715        projection.stats_mut().record_event_processed(10);
716        assert_eq!(projection.stats().events_processed(), 1);
717
718        let result = projection.start_rebuild();
719        assert!(result.is_ok());
720        assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
721        assert_eq!(projection.stats().events_processed(), 0); // Stats reset
722    }
723
724    #[test]
725    fn test_set_description() {
726        let mut projection =
727            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
728                .unwrap();
729
730        let result = projection.set_description("Test projection".to_string());
731        assert!(result.is_ok());
732        assert_eq!(projection.description(), Some("Test projection"));
733    }
734
735    #[test]
736    fn test_reject_too_long_description() {
737        let mut projection =
738            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
739                .unwrap();
740
741        let long_desc = "a".repeat(1001);
742        let result = projection.set_description(long_desc);
743        assert!(result.is_err());
744    }
745
746    #[test]
747    fn test_add_event_type() {
748        let mut projection =
749            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
750                .unwrap();
751
752        let event_type = test_event_type();
753        let result = projection.add_event_type(event_type.clone());
754        assert!(result.is_ok());
755        assert_eq!(projection.event_types().len(), 1);
756        assert!(projection.processes_event_type(&event_type));
757    }
758
759    #[test]
760    fn test_reject_duplicate_event_type() {
761        let mut projection =
762            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
763                .unwrap();
764
765        let event_type = test_event_type();
766        projection.add_event_type(event_type.clone()).unwrap();
767        let result = projection.add_event_type(event_type);
768        assert!(result.is_err());
769    }
770
771    #[test]
772    fn test_remove_event_type() {
773        let mut projection =
774            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
775                .unwrap();
776
777        let event_type = test_event_type();
778        projection.add_event_type(event_type.clone()).unwrap();
779
780        let result = projection.remove_event_type(&event_type);
781        assert!(result.is_ok());
782        assert_eq!(projection.event_types().len(), 0);
783    }
784
785    #[test]
786    fn test_processes_all_events_when_no_filter() {
787        let projection =
788            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
789                .unwrap();
790
791        let event_type = test_event_type();
792        assert!(projection.processes_event_type(&event_type));
793    }
794
795    #[test]
796    fn test_projection_stats() {
797        let mut projection =
798            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
799                .unwrap();
800
801        // Record some events
802        projection.stats_mut().record_event_processed(10);
803        projection.stats_mut().record_event_processed(20);
804        projection.stats_mut().record_event_processed(30);
805
806        assert_eq!(projection.stats().events_processed(), 3);
807        assert_eq!(projection.stats().processing_time_ms(), 60);
808        assert_eq!(projection.stats().avg_processing_time_ms(), 20.0);
809    }
810
811    #[test]
812    fn test_stats_record_error() {
813        let mut projection =
814            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
815                .unwrap();
816
817        projection.stats_mut().record_error();
818        projection.stats_mut().record_error();
819
820        assert_eq!(projection.stats().errors_count(), 2);
821    }
822
823    #[test]
824    fn test_belongs_to_tenant() {
825        let tenant1 = TenantId::new("tenant1".to_string()).unwrap();
826        let tenant2 = TenantId::new("tenant2".to_string()).unwrap();
827
828        let projection =
829            Projection::new_v1(tenant1.clone(), "test".to_string(), ProjectionType::Custom)
830                .unwrap();
831
832        assert!(projection.belongs_to_tenant(&tenant1));
833        assert!(!projection.belongs_to_tenant(&tenant2));
834    }
835
836    #[test]
837    fn test_create_next_version() {
838        let projection_v1 = Projection::new_v1(
839            test_tenant_id(),
840            "test_projection".to_string(),
841            ProjectionType::Custom,
842        )
843        .unwrap();
844
845        let projection_v2 = projection_v1.create_next_version();
846        assert!(projection_v2.is_ok());
847
848        let projection_v2 = projection_v2.unwrap();
849        assert_eq!(projection_v2.version(), 2);
850        assert_eq!(projection_v2.name(), "test_projection");
851        assert_eq!(projection_v2.projection_type(), ProjectionType::Custom);
852        assert!(!projection_v2.is_first_version());
853    }
854
855    #[test]
856    fn test_projection_status_checks() {
857        assert!(ProjectionStatus::Running.is_active());
858        assert!(ProjectionStatus::Rebuilding.is_active());
859        assert!(!ProjectionStatus::Paused.is_active());
860
861        assert!(ProjectionStatus::Created.can_start());
862        assert!(ProjectionStatus::Stopped.can_start());
863        assert!(!ProjectionStatus::Running.can_start());
864
865        assert!(ProjectionStatus::Running.can_pause());
866        assert!(!ProjectionStatus::Created.can_pause());
867
868        assert!(ProjectionStatus::Running.can_stop());
869        assert!(!ProjectionStatus::Stopped.can_stop());
870
871        assert!(ProjectionStatus::Failed.is_failed());
872        assert!(!ProjectionStatus::Running.is_failed());
873    }
874
875    #[test]
876    fn test_update_config() {
877        let mut projection =
878            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
879                .unwrap();
880
881        let config = ProjectionConfig {
882            batch_size: 500,
883            parallel_processing: true,
884            ..Default::default()
885        };
886
887        projection.update_config(config);
888        assert_eq!(projection.config().batch_size, 500);
889        assert!(projection.config().parallel_processing);
890    }
891
892    #[test]
893    fn test_serde_serialization() {
894        let projection =
895            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
896                .unwrap();
897
898        // Should be able to serialize
899        let json = serde_json::to_string(&projection);
900        assert!(json.is_ok());
901
902        // Should be able to deserialize
903        let deserialized = serde_json::from_str::<Projection>(&json.unwrap());
904        assert!(deserialized.is_ok());
905    }
906}