Skip to main content

allsource_core/domain/entities/
projection.rs

1use crate::{
2    domain::value_objects::{EventType, TenantId},
3    error::Result,
4};
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use uuid::Uuid;
8
9/// Projection status
10///
11/// Tracks the current state of a projection's lifecycle.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum ProjectionStatus {
15    /// Projection is defined but not yet started
16    Created,
17    /// Projection is actively processing events
18    Running,
19    /// Projection is temporarily paused
20    Paused,
21    /// Projection has encountered an error
22    Failed,
23    /// Projection has been stopped
24    Stopped,
25    /// Projection is being rebuilt from scratch
26    Rebuilding,
27}
28
29impl ProjectionStatus {
30    /// Check if projection is actively processing
31    pub fn is_active(&self) -> bool {
32        matches!(self, Self::Running | Self::Rebuilding)
33    }
34
35    /// Check if projection can be started
36    pub fn can_start(&self) -> bool {
37        matches!(self, Self::Created | Self::Stopped | Self::Paused)
38    }
39
40    /// Check if projection can be paused
41    pub fn can_pause(&self) -> bool {
42        matches!(self, Self::Running)
43    }
44
45    /// Check if projection can be stopped
46    pub fn can_stop(&self) -> bool {
47        !matches!(self, Self::Stopped)
48    }
49
50    /// Check if projection is in error state
51    pub fn is_failed(&self) -> bool {
52        matches!(self, Self::Failed)
53    }
54}
55
56/// Projection type
57///
58/// Defines how the projection processes events.
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub enum ProjectionType {
62    /// Maintains current state by entity ID
63    EntitySnapshot,
64    /// Counts events by type
65    EventCounter,
66    /// Custom user-defined projection logic
67    Custom,
68    /// Time-series aggregation
69    TimeSeries,
70    /// Funnel analysis
71    Funnel,
72}
73
74/// Projection configuration
75///
76/// Optional settings that control projection behavior.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ProjectionConfig {
79    /// Maximum number of events to process in a batch
80    pub batch_size: usize,
81    /// Whether to checkpoint state periodically
82    pub enable_checkpoints: bool,
83    /// Checkpoint interval in number of events
84    pub checkpoint_interval: usize,
85    /// Whether to process events in parallel
86    pub parallel_processing: bool,
87    /// Maximum concurrent event handlers
88    pub max_concurrency: usize,
89}
90
91impl Default for ProjectionConfig {
92    fn default() -> Self {
93        Self {
94            batch_size: 100,
95            enable_checkpoints: true,
96            checkpoint_interval: 1000,
97            parallel_processing: false,
98            max_concurrency: 4,
99        }
100    }
101}
102
103/// Projection statistics
104///
105/// Tracks operational metrics for a projection.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ProjectionStats {
108    events_processed: u64,
109    last_processed_at: Option<DateTime<Utc>>,
110    last_checkpoint_at: Option<DateTime<Utc>>,
111    errors_count: u64,
112    last_error_at: Option<DateTime<Utc>>,
113    processing_time_ms: u64,
114}
115
116impl ProjectionStats {
117    pub fn new() -> Self {
118        Self {
119            events_processed: 0,
120            last_processed_at: None,
121            last_checkpoint_at: None,
122            errors_count: 0,
123            last_error_at: None,
124            processing_time_ms: 0,
125        }
126    }
127
128    // Getters
129    pub fn events_processed(&self) -> u64 {
130        self.events_processed
131    }
132
133    pub fn errors_count(&self) -> u64 {
134        self.errors_count
135    }
136
137    pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
138        self.last_processed_at
139    }
140
141    pub fn processing_time_ms(&self) -> u64 {
142        self.processing_time_ms
143    }
144
145    // Mutation methods
146    pub fn record_event_processed(&mut self, processing_time_ms: u64) {
147        self.events_processed += 1;
148        self.last_processed_at = Some(Utc::now());
149        self.processing_time_ms += processing_time_ms;
150    }
151
152    pub fn record_error(&mut self) {
153        self.errors_count += 1;
154        self.last_error_at = Some(Utc::now());
155    }
156
157    pub fn record_checkpoint(&mut self) {
158        self.last_checkpoint_at = Some(Utc::now());
159    }
160
161    pub fn reset(&mut self) {
162        *self = Self::new();
163    }
164
165    /// Calculate average processing time per event in milliseconds
166    pub fn avg_processing_time_ms(&self) -> f64 {
167        if self.events_processed == 0 {
168            0.0
169        } else {
170            self.processing_time_ms as f64 / self.events_processed as f64
171        }
172    }
173}
174
175impl Default for ProjectionStats {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181/// Domain Entity: Projection
182///
183/// Represents a projection that aggregates events into a queryable view.
184/// Projections are materialized views derived from the event stream.
185///
186/// Domain Rules:
187/// - Name must be unique within a tenant
188/// - Name cannot be empty
189/// - Version starts at 1 and increments
190/// - Cannot change projection type after creation
191/// - Status transitions must follow lifecycle rules
192/// - Stats are accurate and updated on each event
193#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct Projection {
195    id: Uuid,
196    tenant_id: TenantId,
197    name: String,
198    version: u32,
199    projection_type: ProjectionType,
200    status: ProjectionStatus,
201    config: ProjectionConfig,
202    stats: ProjectionStats,
203    created_at: DateTime<Utc>,
204    updated_at: DateTime<Utc>,
205    started_at: Option<DateTime<Utc>>,
206    stopped_at: Option<DateTime<Utc>>,
207    description: Option<String>,
208    /// Event types this projection is interested in (empty = all events)
209    event_types: Vec<EventType>,
210    /// Custom metadata
211    metadata: serde_json::Value,
212}
213
214impl Projection {
215    /// Create a new projection with validation
216    pub fn new(
217        tenant_id: TenantId,
218        name: String,
219        version: u32,
220        projection_type: ProjectionType,
221    ) -> Result<Self> {
222        Self::validate_name(&name)?;
223        Self::validate_version(version)?;
224
225        let now = Utc::now();
226        Ok(Self {
227            id: Uuid::new_v4(),
228            tenant_id,
229            name,
230            version,
231            projection_type,
232            status: ProjectionStatus::Created,
233            config: ProjectionConfig::default(),
234            stats: ProjectionStats::new(),
235            created_at: now,
236            updated_at: now,
237            started_at: None,
238            stopped_at: None,
239            description: None,
240            event_types: Vec::new(),
241            metadata: serde_json::json!({}),
242        })
243    }
244
245    /// Create first version of a projection
246    pub fn new_v1(
247        tenant_id: TenantId,
248        name: String,
249        projection_type: ProjectionType,
250    ) -> Result<Self> {
251        Self::new(tenant_id, name, 1, projection_type)
252    }
253
254    /// Reconstruct projection from storage (bypasses validation)
255    #[allow(clippy::too_many_arguments)]
256    pub fn reconstruct(
257        id: Uuid,
258        tenant_id: TenantId,
259        name: String,
260        version: u32,
261        projection_type: ProjectionType,
262        status: ProjectionStatus,
263        config: ProjectionConfig,
264        stats: ProjectionStats,
265        created_at: DateTime<Utc>,
266        updated_at: DateTime<Utc>,
267        started_at: Option<DateTime<Utc>>,
268        stopped_at: Option<DateTime<Utc>>,
269        description: Option<String>,
270        event_types: Vec<EventType>,
271        metadata: serde_json::Value,
272    ) -> Self {
273        Self {
274            id,
275            tenant_id,
276            name,
277            version,
278            projection_type,
279            status,
280            config,
281            stats,
282            created_at,
283            updated_at,
284            started_at,
285            stopped_at,
286            description,
287            event_types,
288            metadata,
289        }
290    }
291
292    // Getters
293
294    pub fn id(&self) -> Uuid {
295        self.id
296    }
297
298    pub fn tenant_id(&self) -> &TenantId {
299        &self.tenant_id
300    }
301
302    pub fn name(&self) -> &str {
303        &self.name
304    }
305
306    pub fn version(&self) -> u32 {
307        self.version
308    }
309
310    pub fn projection_type(&self) -> ProjectionType {
311        self.projection_type
312    }
313
314    pub fn status(&self) -> ProjectionStatus {
315        self.status
316    }
317
318    pub fn config(&self) -> &ProjectionConfig {
319        &self.config
320    }
321
322    pub fn stats(&self) -> &ProjectionStats {
323        &self.stats
324    }
325
326    pub fn created_at(&self) -> DateTime<Utc> {
327        self.created_at
328    }
329
330    pub fn updated_at(&self) -> DateTime<Utc> {
331        self.updated_at
332    }
333
334    pub fn description(&self) -> Option<&str> {
335        self.description.as_deref()
336    }
337
338    pub fn event_types(&self) -> &[EventType] {
339        &self.event_types
340    }
341
342    pub fn metadata(&self) -> &serde_json::Value {
343        &self.metadata
344    }
345
346    // Domain behavior methods
347
348    /// Start the projection
349    pub fn start(&mut self) -> Result<()> {
350        if !self.status.can_start() {
351            return Err(crate::error::AllSourceError::ValidationError(format!(
352                "Cannot start projection in status {:?}",
353                self.status
354            )));
355        }
356
357        self.status = ProjectionStatus::Running;
358        self.started_at = Some(Utc::now());
359        self.updated_at = Utc::now();
360        Ok(())
361    }
362
363    /// Pause the projection
364    pub fn pause(&mut self) -> Result<()> {
365        if !self.status.can_pause() {
366            return Err(crate::error::AllSourceError::ValidationError(format!(
367                "Cannot pause projection in status {:?}",
368                self.status
369            )));
370        }
371
372        self.status = ProjectionStatus::Paused;
373        self.updated_at = Utc::now();
374        Ok(())
375    }
376
377    /// Stop the projection
378    pub fn stop(&mut self) -> Result<()> {
379        if !self.status.can_stop() {
380            return Err(crate::error::AllSourceError::ValidationError(format!(
381                "Cannot stop projection in status {:?}",
382                self.status
383            )));
384        }
385
386        self.status = ProjectionStatus::Stopped;
387        self.stopped_at = Some(Utc::now());
388        self.updated_at = Utc::now();
389        Ok(())
390    }
391
392    /// Mark projection as failed
393    pub fn mark_failed(&mut self) {
394        self.status = ProjectionStatus::Failed;
395        self.updated_at = Utc::now();
396    }
397
398    /// Start rebuilding the projection
399    pub fn start_rebuild(&mut self) -> Result<()> {
400        self.status = ProjectionStatus::Rebuilding;
401        self.stats.reset();
402        self.updated_at = Utc::now();
403        Ok(())
404    }
405
406    /// Update configuration
407    pub fn update_config(&mut self, config: ProjectionConfig) {
408        self.config = config;
409        self.updated_at = Utc::now();
410    }
411
412    /// Set description
413    pub fn set_description(&mut self, description: String) -> Result<()> {
414        Self::validate_description(&description)?;
415        self.description = Some(description);
416        self.updated_at = Utc::now();
417        Ok(())
418    }
419
420    /// Add event type filter
421    pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
422        if self.event_types.contains(&event_type) {
423            return Err(crate::error::AllSourceError::InvalidInput(format!(
424                "Event type '{}' already in filter",
425                event_type.as_str()
426            )));
427        }
428
429        self.event_types.push(event_type);
430        self.updated_at = Utc::now();
431        Ok(())
432    }
433
434    /// Remove event type filter
435    pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
436        let initial_len = self.event_types.len();
437        self.event_types.retain(|et| et != event_type);
438
439        if self.event_types.len() == initial_len {
440            return Err(crate::error::AllSourceError::InvalidInput(format!(
441                "Event type '{}' not in filter",
442                event_type.as_str()
443            )));
444        }
445
446        self.updated_at = Utc::now();
447        Ok(())
448    }
449
450    /// Check if projection processes this event type
451    pub fn processes_event_type(&self, event_type: &EventType) -> bool {
452        // Empty filter means process all events
453        self.event_types.is_empty() || self.event_types.contains(event_type)
454    }
455
456    /// Update metadata
457    pub fn update_metadata(&mut self, metadata: serde_json::Value) {
458        self.metadata = metadata;
459        self.updated_at = Utc::now();
460    }
461
462    /// Get mutable access to stats (for recording events)
463    pub fn stats_mut(&mut self) -> &mut ProjectionStats {
464        self.updated_at = Utc::now();
465        &mut self.stats
466    }
467
468    /// Check if projection is first version
469    pub fn is_first_version(&self) -> bool {
470        self.version == 1
471    }
472
473    /// Check if projection belongs to tenant
474    pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
475        &self.tenant_id == tenant_id
476    }
477
478    /// Create next version
479    pub fn create_next_version(&self) -> Result<Projection> {
480        Projection::new(
481            self.tenant_id.clone(),
482            self.name.clone(),
483            self.version + 1,
484            self.projection_type,
485        )
486    }
487
488    // Validation methods
489
490    fn validate_name(name: &str) -> Result<()> {
491        if name.is_empty() {
492            return Err(crate::error::AllSourceError::InvalidInput(
493                "Projection name cannot be empty".to_string(),
494            ));
495        }
496
497        if name.len() > 100 {
498            return Err(crate::error::AllSourceError::InvalidInput(format!(
499                "Projection name cannot exceed 100 characters, got {}",
500                name.len()
501            )));
502        }
503
504        // Name should be alphanumeric with underscores/hyphens
505        if !name
506            .chars()
507            .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
508        {
509            return Err(crate::error::AllSourceError::InvalidInput(format!(
510                "Projection name '{name}' must be alphanumeric with underscores or hyphens"
511            )));
512        }
513
514        Ok(())
515    }
516
517    fn validate_version(version: u32) -> Result<()> {
518        if version == 0 {
519            return Err(crate::error::AllSourceError::InvalidInput(
520                "Projection version must be >= 1".to_string(),
521            ));
522        }
523        Ok(())
524    }
525
526    fn validate_description(description: &str) -> Result<()> {
527        if description.len() > 1000 {
528            return Err(crate::error::AllSourceError::InvalidInput(format!(
529                "Projection description cannot exceed 1000 characters, got {}",
530                description.len()
531            )));
532        }
533        Ok(())
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540
541    fn test_tenant_id() -> TenantId {
542        TenantId::new("test-tenant".to_string()).unwrap()
543    }
544
545    fn test_event_type() -> EventType {
546        EventType::new("test.event".to_string()).unwrap()
547    }
548
549    #[test]
550    fn test_create_projection() {
551        let projection = Projection::new(
552            test_tenant_id(),
553            "user_snapshot".to_string(),
554            1,
555            ProjectionType::EntitySnapshot,
556        );
557
558        assert!(projection.is_ok());
559        let projection = projection.unwrap();
560        assert_eq!(projection.name(), "user_snapshot");
561        assert_eq!(projection.version(), 1);
562        assert_eq!(projection.status(), ProjectionStatus::Created);
563        assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
564    }
565
566    #[test]
567    fn test_create_v1_projection() {
568        let projection = Projection::new_v1(
569            test_tenant_id(),
570            "event_counter".to_string(),
571            ProjectionType::EventCounter,
572        );
573
574        assert!(projection.is_ok());
575        let projection = projection.unwrap();
576        assert_eq!(projection.version(), 1);
577        assert!(projection.is_first_version());
578    }
579
580    #[test]
581    fn test_reject_empty_name() {
582        let result = Projection::new(test_tenant_id(), String::new(), 1, ProjectionType::Custom);
583
584        assert!(result.is_err());
585    }
586
587    #[test]
588    fn test_reject_too_long_name() {
589        let long_name = "a".repeat(101);
590        let result = Projection::new(test_tenant_id(), long_name, 1, ProjectionType::Custom);
591
592        assert!(result.is_err());
593    }
594
595    #[test]
596    fn test_reject_invalid_name_characters() {
597        let result = Projection::new(
598            test_tenant_id(),
599            "invalid name!".to_string(),
600            1,
601            ProjectionType::Custom,
602        );
603
604        assert!(result.is_err());
605    }
606
607    #[test]
608    fn test_accept_valid_names() {
609        let names = vec!["user_snapshot", "event-counter", "projection123"];
610
611        for name in names {
612            let result = Projection::new(
613                test_tenant_id(),
614                name.to_string(),
615                1,
616                ProjectionType::Custom,
617            );
618            assert!(result.is_ok(), "Name '{name}' should be valid");
619        }
620    }
621
622    #[test]
623    fn test_reject_zero_version() {
624        let result = Projection::new(
625            test_tenant_id(),
626            "test_projection".to_string(),
627            0,
628            ProjectionType::Custom,
629        );
630
631        assert!(result.is_err());
632    }
633
634    #[test]
635    fn test_start_projection() {
636        let mut projection =
637            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
638                .unwrap();
639
640        assert_eq!(projection.status(), ProjectionStatus::Created);
641        assert!(projection.started_at.is_none());
642
643        let result = projection.start();
644        assert!(result.is_ok());
645        assert_eq!(projection.status(), ProjectionStatus::Running);
646        assert!(projection.started_at.is_some());
647    }
648
649    #[test]
650    fn test_cannot_start_running_projection() {
651        let mut projection =
652            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
653                .unwrap();
654
655        projection.start().unwrap();
656        let result = projection.start();
657        assert!(result.is_err());
658    }
659
660    #[test]
661    fn test_pause_projection() {
662        let mut projection =
663            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
664                .unwrap();
665
666        projection.start().unwrap();
667        let result = projection.pause();
668        assert!(result.is_ok());
669        assert_eq!(projection.status(), ProjectionStatus::Paused);
670    }
671
672    #[test]
673    fn test_cannot_pause_non_running_projection() {
674        let mut projection =
675            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
676                .unwrap();
677
678        let result = projection.pause();
679        assert!(result.is_err());
680    }
681
682    #[test]
683    fn test_stop_projection() {
684        let mut projection =
685            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
686                .unwrap();
687
688        projection.start().unwrap();
689        let result = projection.stop();
690        assert!(result.is_ok());
691        assert_eq!(projection.status(), ProjectionStatus::Stopped);
692        assert!(projection.stopped_at.is_some());
693    }
694
695    #[test]
696    fn test_mark_failed() {
697        let mut projection =
698            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
699                .unwrap();
700
701        projection.start().unwrap();
702        projection.mark_failed();
703        assert_eq!(projection.status(), ProjectionStatus::Failed);
704        assert!(projection.status().is_failed());
705    }
706
707    #[test]
708    fn test_start_rebuild() {
709        let mut projection =
710            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
711                .unwrap();
712
713        // Process some events first
714        projection.stats_mut().record_event_processed(10);
715        assert_eq!(projection.stats().events_processed(), 1);
716
717        let result = projection.start_rebuild();
718        assert!(result.is_ok());
719        assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
720        assert_eq!(projection.stats().events_processed(), 0); // Stats reset
721    }
722
723    #[test]
724    fn test_set_description() {
725        let mut projection =
726            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
727                .unwrap();
728
729        let result = projection.set_description("Test projection".to_string());
730        assert!(result.is_ok());
731        assert_eq!(projection.description(), Some("Test projection"));
732    }
733
734    #[test]
735    fn test_reject_too_long_description() {
736        let mut projection =
737            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
738                .unwrap();
739
740        let long_desc = "a".repeat(1001);
741        let result = projection.set_description(long_desc);
742        assert!(result.is_err());
743    }
744
745    #[test]
746    fn test_add_event_type() {
747        let mut projection =
748            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
749                .unwrap();
750
751        let event_type = test_event_type();
752        let result = projection.add_event_type(event_type.clone());
753        assert!(result.is_ok());
754        assert_eq!(projection.event_types().len(), 1);
755        assert!(projection.processes_event_type(&event_type));
756    }
757
758    #[test]
759    fn test_reject_duplicate_event_type() {
760        let mut projection =
761            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
762                .unwrap();
763
764        let event_type = test_event_type();
765        projection.add_event_type(event_type.clone()).unwrap();
766        let result = projection.add_event_type(event_type);
767        assert!(result.is_err());
768    }
769
770    #[test]
771    fn test_remove_event_type() {
772        let mut projection =
773            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
774                .unwrap();
775
776        let event_type = test_event_type();
777        projection.add_event_type(event_type.clone()).unwrap();
778
779        let result = projection.remove_event_type(&event_type);
780        assert!(result.is_ok());
781        assert_eq!(projection.event_types().len(), 0);
782    }
783
784    #[test]
785    fn test_processes_all_events_when_no_filter() {
786        let projection =
787            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
788                .unwrap();
789
790        let event_type = test_event_type();
791        assert!(projection.processes_event_type(&event_type));
792    }
793
794    #[test]
795    fn test_projection_stats() {
796        let mut projection =
797            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
798                .unwrap();
799
800        // Record some events
801        projection.stats_mut().record_event_processed(10);
802        projection.stats_mut().record_event_processed(20);
803        projection.stats_mut().record_event_processed(30);
804
805        assert_eq!(projection.stats().events_processed(), 3);
806        assert_eq!(projection.stats().processing_time_ms(), 60);
807        assert_eq!(projection.stats().avg_processing_time_ms(), 20.0);
808    }
809
810    #[test]
811    fn test_stats_record_error() {
812        let mut projection =
813            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
814                .unwrap();
815
816        projection.stats_mut().record_error();
817        projection.stats_mut().record_error();
818
819        assert_eq!(projection.stats().errors_count(), 2);
820    }
821
822    #[test]
823    fn test_belongs_to_tenant() {
824        let tenant1 = TenantId::new("tenant1".to_string()).unwrap();
825        let tenant2 = TenantId::new("tenant2".to_string()).unwrap();
826
827        let projection =
828            Projection::new_v1(tenant1.clone(), "test".to_string(), ProjectionType::Custom)
829                .unwrap();
830
831        assert!(projection.belongs_to_tenant(&tenant1));
832        assert!(!projection.belongs_to_tenant(&tenant2));
833    }
834
835    #[test]
836    fn test_create_next_version() {
837        let projection_v1 = Projection::new_v1(
838            test_tenant_id(),
839            "test_projection".to_string(),
840            ProjectionType::Custom,
841        )
842        .unwrap();
843
844        let projection_v2 = projection_v1.create_next_version();
845        assert!(projection_v2.is_ok());
846
847        let projection_v2 = projection_v2.unwrap();
848        assert_eq!(projection_v2.version(), 2);
849        assert_eq!(projection_v2.name(), "test_projection");
850        assert_eq!(projection_v2.projection_type(), ProjectionType::Custom);
851        assert!(!projection_v2.is_first_version());
852    }
853
854    #[test]
855    fn test_projection_status_checks() {
856        assert!(ProjectionStatus::Running.is_active());
857        assert!(ProjectionStatus::Rebuilding.is_active());
858        assert!(!ProjectionStatus::Paused.is_active());
859
860        assert!(ProjectionStatus::Created.can_start());
861        assert!(ProjectionStatus::Stopped.can_start());
862        assert!(!ProjectionStatus::Running.can_start());
863
864        assert!(ProjectionStatus::Running.can_pause());
865        assert!(!ProjectionStatus::Created.can_pause());
866
867        assert!(ProjectionStatus::Running.can_stop());
868        assert!(!ProjectionStatus::Stopped.can_stop());
869
870        assert!(ProjectionStatus::Failed.is_failed());
871        assert!(!ProjectionStatus::Running.is_failed());
872    }
873
874    #[test]
875    fn test_update_config() {
876        let mut projection =
877            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
878                .unwrap();
879
880        let config = ProjectionConfig {
881            batch_size: 500,
882            parallel_processing: true,
883            ..Default::default()
884        };
885
886        projection.update_config(config);
887        assert_eq!(projection.config().batch_size, 500);
888        assert!(projection.config().parallel_processing);
889    }
890
891    #[test]
892    fn test_serde_serialization() {
893        let projection =
894            Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
895                .unwrap();
896
897        // Should be able to serialize
898        let json = serde_json::to_string(&projection);
899        assert!(json.is_ok());
900
901        // Should be able to deserialize
902        let deserialized = serde_json::from_str::<Projection>(&json.unwrap());
903        assert!(deserialized.is_ok());
904    }
905}