Skip to main content

allsource_core/domain/entities/
projection.rs

1use crate::domain::value_objects::{EventType, TenantId};
2use crate::error::Result;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
/// Projection status
///
/// Tracks the current state of a projection's lifecycle.
///
/// Variants are serialized by serde in `SCREAMING_SNAKE_CASE`
/// (for example `Running` is written as `"RUNNING"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ProjectionStatus {
    /// Projection is defined but not yet started.
    ///
    /// This is the status assigned by `Projection::new`.
    Created,
    /// Projection is actively processing events (set by `Projection::start`).
    Running,
    /// Projection is temporarily paused (set by `Projection::pause`); it can
    /// be started again.
    Paused,
    /// Projection has encountered an error (set by `Projection::mark_failed`).
    Failed,
    /// Projection has been stopped (set by `Projection::stop`); it can be
    /// started again.
    Stopped,
    /// Projection is being rebuilt from scratch (set by
    /// `Projection::start_rebuild`, which also resets its statistics).
    Rebuilding,
}
26
27impl ProjectionStatus {
28    /// Check if projection is actively processing
29    pub fn is_active(&self) -> bool {
30        matches!(self, Self::Running | Self::Rebuilding)
31    }
32
33    /// Check if projection can be started
34    pub fn can_start(&self) -> bool {
35        matches!(self, Self::Created | Self::Stopped | Self::Paused)
36    }
37
38    /// Check if projection can be paused
39    pub fn can_pause(&self) -> bool {
40        matches!(self, Self::Running)
41    }
42
43    /// Check if projection can be stopped
44    pub fn can_stop(&self) -> bool {
45        !matches!(self, Self::Stopped)
46    }
47
48    /// Check if projection is in error state
49    pub fn is_failed(&self) -> bool {
50        matches!(self, Self::Failed)
51    }
52}
53
/// Projection type
///
/// Defines how the projection processes events.
///
/// Variants are serialized by serde in `snake_case` (for example
/// `EntitySnapshot` is written as `"entity_snapshot"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProjectionType {
    /// Maintains current state by entity ID
    EntitySnapshot,
    /// Counts events by type
    EventCounter,
    /// Custom user-defined projection logic
    Custom,
    /// Time-series aggregation
    TimeSeries,
    /// Funnel analysis
    Funnel,
}
71
/// Projection configuration
///
/// Optional settings that control projection behavior.
///
/// All fields are public plain data; the values produced by
/// `ProjectionConfig::default()` are noted on each field.
// NOTE(review): nothing in this file reads these settings or validates them
// (e.g. a zero `batch_size`); presumably the projection runner does — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectionConfig {
    /// Maximum number of events to process in a batch (default: 100)
    pub batch_size: usize,
    /// Whether to checkpoint state periodically (default: `true`)
    pub enable_checkpoints: bool,
    /// Checkpoint interval in number of events (default: 1000)
    pub checkpoint_interval: usize,
    /// Whether to process events in parallel (default: `false`)
    pub parallel_processing: bool,
    /// Maximum concurrent event handlers (default: 4)
    pub max_concurrency: usize,
}
88
89impl Default for ProjectionConfig {
90    fn default() -> Self {
91        Self {
92            batch_size: 100,
93            enable_checkpoints: true,
94            checkpoint_interval: 1000,
95            parallel_processing: false,
96            max_concurrency: 4,
97        }
98    }
99}
100
/// Projection statistics
///
/// Tracks operational metrics for a projection.
///
/// Fields are private; they are changed only through the `record_*` methods
/// and `reset`, and read through the accessor methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectionStats {
    /// Number of events recorded via `record_event_processed`.
    events_processed: u64,
    /// Time of the most recent `record_event_processed` call, if any.
    last_processed_at: Option<DateTime<Utc>>,
    /// Time of the most recent `record_checkpoint` call, if any.
    last_checkpoint_at: Option<DateTime<Utc>>,
    /// Number of errors recorded via `record_error`.
    errors_count: u64,
    /// Time of the most recent `record_error` call, if any.
    last_error_at: Option<DateTime<Utc>>,
    /// Sum of the per-event processing times passed to
    /// `record_event_processed`, in milliseconds.
    processing_time_ms: u64,
}
113
114impl ProjectionStats {
115    pub fn new() -> Self {
116        Self {
117            events_processed: 0,
118            last_processed_at: None,
119            last_checkpoint_at: None,
120            errors_count: 0,
121            last_error_at: None,
122            processing_time_ms: 0,
123        }
124    }
125
126    // Getters
127    pub fn events_processed(&self) -> u64 {
128        self.events_processed
129    }
130
131    pub fn errors_count(&self) -> u64 {
132        self.errors_count
133    }
134
135    pub fn last_processed_at(&self) -> Option<DateTime<Utc>> {
136        self.last_processed_at
137    }
138
139    pub fn processing_time_ms(&self) -> u64 {
140        self.processing_time_ms
141    }
142
143    // Mutation methods
144    pub fn record_event_processed(&mut self, processing_time_ms: u64) {
145        self.events_processed += 1;
146        self.last_processed_at = Some(Utc::now());
147        self.processing_time_ms += processing_time_ms;
148    }
149
150    pub fn record_error(&mut self) {
151        self.errors_count += 1;
152        self.last_error_at = Some(Utc::now());
153    }
154
155    pub fn record_checkpoint(&mut self) {
156        self.last_checkpoint_at = Some(Utc::now());
157    }
158
159    pub fn reset(&mut self) {
160        *self = Self::new();
161    }
162
163    /// Calculate average processing time per event in milliseconds
164    pub fn avg_processing_time_ms(&self) -> f64 {
165        if self.events_processed == 0 {
166            0.0
167        } else {
168            self.processing_time_ms as f64 / self.events_processed as f64
169        }
170    }
171}
172
173impl Default for ProjectionStats {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
/// Domain Entity: Projection
///
/// Represents a projection that aggregates events into a queryable view.
/// Projections are materialized views derived from the event stream.
///
/// Domain Rules:
/// - Name must be unique within a tenant
/// - Name cannot be empty
/// - Version starts at 1 and increments
/// - Cannot change projection type after creation
/// - Status transitions must follow lifecycle rules
/// - Stats are accurate and updated on each event
///
/// All fields are private; state changes go through the methods on
/// `Projection`, each of which also refreshes `updated_at`.
// NOTE(review): name uniqueness within a tenant is not checked anywhere in
// this type; presumably it is enforced by the repository layer — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Unique identifier; `Projection::new` generates a random v4 UUID.
    id: Uuid,
    /// Tenant that owns this projection.
    tenant_id: TenantId,
    /// Projection name, validated by `Projection::new`.
    name: String,
    /// Version number; `Projection::new` rejects 0.
    version: u32,
    /// How the projection processes events; no method changes it after
    /// construction.
    projection_type: ProjectionType,
    /// Current lifecycle status; starts as `ProjectionStatus::Created`.
    status: ProjectionStatus,
    /// Behavior settings; starts as `ProjectionConfig::default()`.
    config: ProjectionConfig,
    /// Operational metrics; cleared by `start_rebuild`.
    stats: ProjectionStats,
    /// Creation time, set once by `Projection::new`.
    created_at: DateTime<Utc>,
    /// Time of the most recent mutation.
    updated_at: DateTime<Utc>,
    /// Time of the most recent successful `start`, if any.
    started_at: Option<DateTime<Utc>>,
    /// Time of the most recent successful `stop`, if any.
    stopped_at: Option<DateTime<Utc>>,
    /// Optional human-readable description, set via `set_description`.
    description: Option<String>,
    /// Event types this projection is interested in (empty = all events)
    event_types: Vec<EventType>,
    /// Custom metadata; starts as an empty JSON object.
    metadata: serde_json::Value,
}
211
212impl Projection {
213    /// Create a new projection with validation
214    pub fn new(
215        tenant_id: TenantId,
216        name: String,
217        version: u32,
218        projection_type: ProjectionType,
219    ) -> Result<Self> {
220        Self::validate_name(&name)?;
221        Self::validate_version(version)?;
222
223        let now = Utc::now();
224        Ok(Self {
225            id: Uuid::new_v4(),
226            tenant_id,
227            name,
228            version,
229            projection_type,
230            status: ProjectionStatus::Created,
231            config: ProjectionConfig::default(),
232            stats: ProjectionStats::new(),
233            created_at: now,
234            updated_at: now,
235            started_at: None,
236            stopped_at: None,
237            description: None,
238            event_types: Vec::new(),
239            metadata: serde_json::json!({}),
240        })
241    }
242
243    /// Create first version of a projection
244    pub fn new_v1(
245        tenant_id: TenantId,
246        name: String,
247        projection_type: ProjectionType,
248    ) -> Result<Self> {
249        Self::new(tenant_id, name, 1, projection_type)
250    }
251
252    /// Reconstruct projection from storage (bypasses validation)
253    #[allow(clippy::too_many_arguments)]
254    pub fn reconstruct(
255        id: Uuid,
256        tenant_id: TenantId,
257        name: String,
258        version: u32,
259        projection_type: ProjectionType,
260        status: ProjectionStatus,
261        config: ProjectionConfig,
262        stats: ProjectionStats,
263        created_at: DateTime<Utc>,
264        updated_at: DateTime<Utc>,
265        started_at: Option<DateTime<Utc>>,
266        stopped_at: Option<DateTime<Utc>>,
267        description: Option<String>,
268        event_types: Vec<EventType>,
269        metadata: serde_json::Value,
270    ) -> Self {
271        Self {
272            id,
273            tenant_id,
274            name,
275            version,
276            projection_type,
277            status,
278            config,
279            stats,
280            created_at,
281            updated_at,
282            started_at,
283            stopped_at,
284            description,
285            event_types,
286            metadata,
287        }
288    }
289
290    // Getters
291
292    pub fn id(&self) -> Uuid {
293        self.id
294    }
295
296    pub fn tenant_id(&self) -> &TenantId {
297        &self.tenant_id
298    }
299
300    pub fn name(&self) -> &str {
301        &self.name
302    }
303
304    pub fn version(&self) -> u32 {
305        self.version
306    }
307
308    pub fn projection_type(&self) -> ProjectionType {
309        self.projection_type
310    }
311
312    pub fn status(&self) -> ProjectionStatus {
313        self.status
314    }
315
316    pub fn config(&self) -> &ProjectionConfig {
317        &self.config
318    }
319
320    pub fn stats(&self) -> &ProjectionStats {
321        &self.stats
322    }
323
324    pub fn created_at(&self) -> DateTime<Utc> {
325        self.created_at
326    }
327
328    pub fn updated_at(&self) -> DateTime<Utc> {
329        self.updated_at
330    }
331
332    pub fn description(&self) -> Option<&str> {
333        self.description.as_deref()
334    }
335
336    pub fn event_types(&self) -> &[EventType] {
337        &self.event_types
338    }
339
340    pub fn metadata(&self) -> &serde_json::Value {
341        &self.metadata
342    }
343
344    // Domain behavior methods
345
346    /// Start the projection
347    pub fn start(&mut self) -> Result<()> {
348        if !self.status.can_start() {
349            return Err(crate::error::AllSourceError::ValidationError(format!(
350                "Cannot start projection in status {:?}",
351                self.status
352            )));
353        }
354
355        self.status = ProjectionStatus::Running;
356        self.started_at = Some(Utc::now());
357        self.updated_at = Utc::now();
358        Ok(())
359    }
360
361    /// Pause the projection
362    pub fn pause(&mut self) -> Result<()> {
363        if !self.status.can_pause() {
364            return Err(crate::error::AllSourceError::ValidationError(format!(
365                "Cannot pause projection in status {:?}",
366                self.status
367            )));
368        }
369
370        self.status = ProjectionStatus::Paused;
371        self.updated_at = Utc::now();
372        Ok(())
373    }
374
375    /// Stop the projection
376    pub fn stop(&mut self) -> Result<()> {
377        if !self.status.can_stop() {
378            return Err(crate::error::AllSourceError::ValidationError(format!(
379                "Cannot stop projection in status {:?}",
380                self.status
381            )));
382        }
383
384        self.status = ProjectionStatus::Stopped;
385        self.stopped_at = Some(Utc::now());
386        self.updated_at = Utc::now();
387        Ok(())
388    }
389
390    /// Mark projection as failed
391    pub fn mark_failed(&mut self) {
392        self.status = ProjectionStatus::Failed;
393        self.updated_at = Utc::now();
394    }
395
396    /// Start rebuilding the projection
397    pub fn start_rebuild(&mut self) -> Result<()> {
398        self.status = ProjectionStatus::Rebuilding;
399        self.stats.reset();
400        self.updated_at = Utc::now();
401        Ok(())
402    }
403
404    /// Update configuration
405    pub fn update_config(&mut self, config: ProjectionConfig) {
406        self.config = config;
407        self.updated_at = Utc::now();
408    }
409
410    /// Set description
411    pub fn set_description(&mut self, description: String) -> Result<()> {
412        Self::validate_description(&description)?;
413        self.description = Some(description);
414        self.updated_at = Utc::now();
415        Ok(())
416    }
417
418    /// Add event type filter
419    pub fn add_event_type(&mut self, event_type: EventType) -> Result<()> {
420        if self.event_types.contains(&event_type) {
421            return Err(crate::error::AllSourceError::InvalidInput(format!(
422                "Event type '{}' already in filter",
423                event_type.as_str()
424            )));
425        }
426
427        self.event_types.push(event_type);
428        self.updated_at = Utc::now();
429        Ok(())
430    }
431
432    /// Remove event type filter
433    pub fn remove_event_type(&mut self, event_type: &EventType) -> Result<()> {
434        let initial_len = self.event_types.len();
435        self.event_types.retain(|et| et != event_type);
436
437        if self.event_types.len() == initial_len {
438            return Err(crate::error::AllSourceError::InvalidInput(format!(
439                "Event type '{}' not in filter",
440                event_type.as_str()
441            )));
442        }
443
444        self.updated_at = Utc::now();
445        Ok(())
446    }
447
448    /// Check if projection processes this event type
449    pub fn processes_event_type(&self, event_type: &EventType) -> bool {
450        // Empty filter means process all events
451        self.event_types.is_empty() || self.event_types.contains(event_type)
452    }
453
454    /// Update metadata
455    pub fn update_metadata(&mut self, metadata: serde_json::Value) {
456        self.metadata = metadata;
457        self.updated_at = Utc::now();
458    }
459
460    /// Get mutable access to stats (for recording events)
461    pub fn stats_mut(&mut self) -> &mut ProjectionStats {
462        self.updated_at = Utc::now();
463        &mut self.stats
464    }
465
466    /// Check if projection is first version
467    pub fn is_first_version(&self) -> bool {
468        self.version == 1
469    }
470
471    /// Check if projection belongs to tenant
472    pub fn belongs_to_tenant(&self, tenant_id: &TenantId) -> bool {
473        &self.tenant_id == tenant_id
474    }
475
476    /// Create next version
477    pub fn create_next_version(&self) -> Result<Projection> {
478        Projection::new(
479            self.tenant_id.clone(),
480            self.name.clone(),
481            self.version + 1,
482            self.projection_type,
483        )
484    }
485
486    // Validation methods
487
488    fn validate_name(name: &str) -> Result<()> {
489        if name.is_empty() {
490            return Err(crate::error::AllSourceError::InvalidInput(
491                "Projection name cannot be empty".to_string(),
492            ));
493        }
494
495        if name.len() > 100 {
496            return Err(crate::error::AllSourceError::InvalidInput(format!(
497                "Projection name cannot exceed 100 characters, got {}",
498                name.len()
499            )));
500        }
501
502        // Name should be alphanumeric with underscores/hyphens
503        if !name
504            .chars()
505            .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
506        {
507            return Err(crate::error::AllSourceError::InvalidInput(format!(
508                "Projection name '{}' must be alphanumeric with underscores or hyphens",
509                name
510            )));
511        }
512
513        Ok(())
514    }
515
516    fn validate_version(version: u32) -> Result<()> {
517        if version == 0 {
518            return Err(crate::error::AllSourceError::InvalidInput(
519                "Projection version must be >= 1".to_string(),
520            ));
521        }
522        Ok(())
523    }
524
525    fn validate_description(description: &str) -> Result<()> {
526        if description.len() > 1000 {
527            return Err(crate::error::AllSourceError::InvalidInput(format!(
528                "Projection description cannot exceed 1000 characters, got {}",
529                description.len()
530            )));
531        }
532        Ok(())
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    fn test_tenant_id() -> TenantId {
        TenantId::new("test-tenant".to_string()).unwrap()
    }

    fn test_event_type() -> EventType {
        EventType::new("test.event".to_string()).unwrap()
    }

    /// Shared fixture: a fresh v1 `Custom` projection named "test" owned by
    /// the test tenant.
    fn new_test_projection() -> Projection {
        Projection::new_v1(test_tenant_id(), "test".to_string(), ProjectionType::Custom)
            .expect("fixture projection should be valid")
    }

    // Construction and validation

    #[test]
    fn test_create_projection() {
        let projection = Projection::new(
            test_tenant_id(),
            "user_snapshot".to_string(),
            1,
            ProjectionType::EntitySnapshot,
        )
        .expect("valid projection should be created");

        assert_eq!(projection.name(), "user_snapshot");
        assert_eq!(projection.version(), 1);
        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert_eq!(projection.projection_type(), ProjectionType::EntitySnapshot);
    }

    #[test]
    fn test_create_v1_projection() {
        let projection = Projection::new_v1(
            test_tenant_id(),
            "event_counter".to_string(),
            ProjectionType::EventCounter,
        )
        .expect("valid v1 projection should be created");

        assert_eq!(projection.version(), 1);
        assert!(projection.is_first_version());
    }

    #[test]
    fn test_reject_empty_name() {
        let result = Projection::new(test_tenant_id(), "".to_string(), 1, ProjectionType::Custom);

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_too_long_name() {
        let long_name = "a".repeat(101);
        let result = Projection::new(test_tenant_id(), long_name, 1, ProjectionType::Custom);

        assert!(result.is_err());
    }

    #[test]
    fn test_reject_invalid_name_characters() {
        let result = Projection::new(
            test_tenant_id(),
            "invalid name!".to_string(),
            1,
            ProjectionType::Custom,
        );

        assert!(result.is_err());
    }

    #[test]
    fn test_accept_valid_names() {
        for name in ["user_snapshot", "event-counter", "projection123"] {
            let result = Projection::new(
                test_tenant_id(),
                name.to_string(),
                1,
                ProjectionType::Custom,
            );
            assert!(result.is_ok(), "Name '{}' should be valid", name);
        }
    }

    #[test]
    fn test_reject_zero_version() {
        let result = Projection::new(
            test_tenant_id(),
            "test_projection".to_string(),
            0,
            ProjectionType::Custom,
        );

        assert!(result.is_err());
    }

    // Lifecycle transitions

    #[test]
    fn test_start_projection() {
        let mut projection = new_test_projection();

        assert_eq!(projection.status(), ProjectionStatus::Created);
        assert!(projection.started_at.is_none());

        assert!(projection.start().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Running);
        assert!(projection.started_at.is_some());
    }

    #[test]
    fn test_cannot_start_running_projection() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        assert!(projection.start().is_err());
    }

    #[test]
    fn test_pause_projection() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        assert!(projection.pause().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Paused);
    }

    #[test]
    fn test_cannot_pause_non_running_projection() {
        let mut projection = new_test_projection();

        assert!(projection.pause().is_err());
    }

    #[test]
    fn test_resume_paused_projection() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        projection.pause().unwrap();

        // A paused projection is resumed through `start`.
        assert!(projection.start().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Running);
    }

    #[test]
    fn test_stop_projection() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        assert!(projection.stop().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Stopped);
        assert!(projection.stopped_at.is_some());
    }

    #[test]
    fn test_cannot_stop_stopped_projection() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        projection.stop().unwrap();

        assert!(projection.stop().is_err());
        assert_eq!(projection.status(), ProjectionStatus::Stopped);
    }

    #[test]
    fn test_mark_failed() {
        let mut projection = new_test_projection();

        projection.start().unwrap();
        projection.mark_failed();
        assert_eq!(projection.status(), ProjectionStatus::Failed);
        assert!(projection.status().is_failed());
    }

    #[test]
    fn test_start_rebuild() {
        let mut projection = new_test_projection();

        // Process some events first
        projection.stats_mut().record_event_processed(10);
        assert_eq!(projection.stats().events_processed(), 1);

        assert!(projection.start_rebuild().is_ok());
        assert_eq!(projection.status(), ProjectionStatus::Rebuilding);
        assert_eq!(projection.stats().events_processed(), 0); // Stats reset
    }

    #[test]
    fn test_projection_status_checks() {
        assert!(ProjectionStatus::Running.is_active());
        assert!(ProjectionStatus::Rebuilding.is_active());
        assert!(!ProjectionStatus::Paused.is_active());

        assert!(ProjectionStatus::Created.can_start());
        assert!(ProjectionStatus::Stopped.can_start());
        assert!(ProjectionStatus::Paused.can_start());
        assert!(!ProjectionStatus::Running.can_start());
        assert!(!ProjectionStatus::Failed.can_start());

        assert!(ProjectionStatus::Running.can_pause());
        assert!(!ProjectionStatus::Created.can_pause());

        assert!(ProjectionStatus::Running.can_stop());
        assert!(!ProjectionStatus::Stopped.can_stop());

        assert!(ProjectionStatus::Failed.is_failed());
        assert!(!ProjectionStatus::Running.is_failed());
    }

    // Description, filters, config

    #[test]
    fn test_set_description() {
        let mut projection = new_test_projection();

        assert!(projection
            .set_description("Test projection".to_string())
            .is_ok());
        assert_eq!(projection.description(), Some("Test projection"));
    }

    #[test]
    fn test_reject_too_long_description() {
        let mut projection = new_test_projection();

        let long_desc = "a".repeat(1001);
        assert!(projection.set_description(long_desc).is_err());
        // A rejected description must not be stored.
        assert_eq!(projection.description(), None);
    }

    #[test]
    fn test_add_event_type() {
        let mut projection = new_test_projection();

        let event_type = test_event_type();
        assert!(projection.add_event_type(event_type.clone()).is_ok());
        assert_eq!(projection.event_types().len(), 1);
        assert!(projection.processes_event_type(&event_type));
    }

    #[test]
    fn test_reject_duplicate_event_type() {
        let mut projection = new_test_projection();

        let event_type = test_event_type();
        projection.add_event_type(event_type.clone()).unwrap();
        assert!(projection.add_event_type(event_type).is_err());
        assert_eq!(projection.event_types().len(), 1);
    }

    #[test]
    fn test_remove_event_type() {
        let mut projection = new_test_projection();

        let event_type = test_event_type();
        projection.add_event_type(event_type.clone()).unwrap();

        assert!(projection.remove_event_type(&event_type).is_ok());
        assert_eq!(projection.event_types().len(), 0);
    }

    #[test]
    fn test_remove_missing_event_type() {
        let mut projection = new_test_projection();

        assert!(projection.remove_event_type(&test_event_type()).is_err());
    }

    #[test]
    fn test_processes_all_events_when_no_filter() {
        let projection = new_test_projection();

        assert!(projection.processes_event_type(&test_event_type()));
    }

    #[test]
    fn test_filter_excludes_other_event_types() {
        let mut projection = new_test_projection();
        projection.add_event_type(test_event_type()).unwrap();

        let other = EventType::new("other.event".to_string()).unwrap();
        assert!(!projection.processes_event_type(&other));
    }

    #[test]
    fn test_update_config() {
        let mut projection = new_test_projection();

        let config = ProjectionConfig {
            batch_size: 500,
            parallel_processing: true,
            ..Default::default()
        };

        projection.update_config(config);
        assert_eq!(projection.config().batch_size, 500);
        assert!(projection.config().parallel_processing);
    }

    // Statistics

    #[test]
    fn test_projection_stats() {
        let mut projection = new_test_projection();

        // Record some events
        for elapsed_ms in [10, 20, 30] {
            projection.stats_mut().record_event_processed(elapsed_ms);
        }

        assert_eq!(projection.stats().events_processed(), 3);
        assert_eq!(projection.stats().processing_time_ms(), 60);
        assert_eq!(projection.stats().avg_processing_time_ms(), 20.0);
    }

    #[test]
    fn test_stats_average_without_events() {
        let stats = ProjectionStats::new();

        assert_eq!(stats.events_processed(), 0);
        assert_eq!(stats.avg_processing_time_ms(), 0.0);
    }

    #[test]
    fn test_stats_record_error() {
        let mut projection = new_test_projection();

        projection.stats_mut().record_error();
        projection.stats_mut().record_error();

        assert_eq!(projection.stats().errors_count(), 2);
    }

    // Tenancy and versioning

    #[test]
    fn test_belongs_to_tenant() {
        let tenant1 = TenantId::new("tenant1".to_string()).unwrap();
        let tenant2 = TenantId::new("tenant2".to_string()).unwrap();

        let projection =
            Projection::new_v1(tenant1.clone(), "test".to_string(), ProjectionType::Custom)
                .unwrap();

        assert!(projection.belongs_to_tenant(&tenant1));
        assert!(!projection.belongs_to_tenant(&tenant2));
    }

    #[test]
    fn test_create_next_version() {
        let projection_v1 = Projection::new_v1(
            test_tenant_id(),
            "test_projection".to_string(),
            ProjectionType::Custom,
        )
        .unwrap();

        let projection_v2 = projection_v1
            .create_next_version()
            .expect("next version should be created");

        assert_eq!(projection_v2.version(), 2);
        assert_eq!(projection_v2.name(), "test_projection");
        assert_eq!(projection_v2.projection_type(), ProjectionType::Custom);
        assert!(!projection_v2.is_first_version());
    }

    // Serialization

    #[test]
    fn test_serde_serialization() {
        let projection = new_test_projection();

        let json = serde_json::to_string(&projection).expect("projection should serialize");

        // Enum variants are renamed by their serde attributes.
        assert!(json.contains("\"status\":\"CREATED\""));
        assert!(json.contains("\"projection_type\":\"custom\""));

        let restored: Projection =
            serde_json::from_str(&json).expect("projection should deserialize");

        // The round trip must preserve the entity's identity and state.
        assert_eq!(restored.id(), projection.id());
        assert!(restored.belongs_to_tenant(projection.tenant_id()));
        assert_eq!(restored.name(), projection.name());
        assert_eq!(restored.version(), projection.version());
        assert_eq!(restored.projection_type(), projection.projection_type());
        assert_eq!(restored.status(), projection.status());
    }
}