allsource_core/application/use_cases/
manage_projection.rs

1use crate::application::dto::{
2    CreateProjectionRequest, CreateProjectionResponse, UpdateProjectionRequest,
3    ListProjectionsResponse, ProjectionDto, ProjectionTypeDto, ProjectionConfigDto,
4};
5use crate::domain::entities::{Projection, ProjectionType, ProjectionConfig};
6use crate::domain::value_objects::{TenantId, EventType};
7use crate::error::Result;
8
9/// Use Case: Create Projection
10///
11/// Creates a new projection for processing events.
12pub struct CreateProjectionUseCase;
13
14impl CreateProjectionUseCase {
15    pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
16        // Validate tenant ID
17        let tenant_id = TenantId::new(request.tenant_id)?;
18
19        // Convert projection type
20        let projection_type = ProjectionType::from(request.projection_type);
21
22        // Create projection
23        let mut projection = Projection::new_v1(
24            tenant_id,
25            request.name,
26            projection_type,
27        )?;
28
29        // Set config if provided
30        if let Some(config_dto) = request.config {
31            let config = ProjectionConfig::from(config_dto);
32            projection.update_config(config);
33        }
34
35        // Set description if provided
36        if let Some(description) = request.description {
37            projection.set_description(description)?;
38        }
39
40        // Add event types
41        for event_type_str in request.event_types {
42            let event_type = EventType::new(event_type_str)?;
43            projection.add_event_type(event_type)?;
44        }
45
46        Ok(CreateProjectionResponse {
47            projection: ProjectionDto::from(&projection),
48        })
49    }
50}
51
52/// Use Case: Update Projection
53///
54/// Updates projection configuration and metadata.
55pub struct UpdateProjectionUseCase;
56
57impl UpdateProjectionUseCase {
58    pub fn execute(mut projection: Projection, request: UpdateProjectionRequest) -> Result<ProjectionDto> {
59        // Update description if provided
60        if let Some(description) = request.description {
61            projection.set_description(description)?;
62        }
63
64        // Update config if provided
65        if let Some(config_dto) = request.config {
66            projection.update_config(ProjectionConfig::from(config_dto));
67        }
68
69        // Update event types if provided
70        if let Some(event_types) = request.event_types {
71            // Clear existing and add new ones
72            let existing = projection.event_types().to_vec();
73            for event_type in existing {
74                projection.remove_event_type(&event_type);
75            }
76            for event_type_str in event_types {
77                let event_type = EventType::new(event_type_str)?;
78                projection.add_event_type(event_type)?;
79            }
80        }
81
82        Ok(ProjectionDto::from(&projection))
83    }
84}
85
86/// Use Case: Start Projection
87///
88/// Starts a created or paused projection.
89pub struct StartProjectionUseCase;
90
91impl StartProjectionUseCase {
92    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
93        projection.start()?;
94        Ok(ProjectionDto::from(&projection))
95    }
96}
97
98/// Use Case: Pause Projection
99///
100/// Pauses a running projection.
101pub struct PauseProjectionUseCase;
102
103impl PauseProjectionUseCase {
104    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
105        projection.pause()?;
106        Ok(ProjectionDto::from(&projection))
107    }
108}
109
110/// Use Case: Stop Projection
111///
112/// Stops a projection completely.
113pub struct StopProjectionUseCase;
114
115impl StopProjectionUseCase {
116    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
117        projection.stop();
118        Ok(ProjectionDto::from(&projection))
119    }
120}
121
122/// Use Case: Rebuild Projection
123///
124/// Starts rebuilding a projection from scratch.
125pub struct RebuildProjectionUseCase;
126
127impl RebuildProjectionUseCase {
128    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
129        projection.start_rebuild();
130        Ok(ProjectionDto::from(&projection))
131    }
132}
133
134/// Use Case: List Projections
135///
136/// Returns a list of all projections for a tenant.
137pub struct ListProjectionsUseCase;
138
139impl ListProjectionsUseCase {
140    pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
141        let projection_dtos: Vec<ProjectionDto> = projections.iter().map(ProjectionDto::from).collect();
142        let count = projection_dtos.len();
143
144        ListProjectionsResponse {
145            projections: projection_dtos,
146            count,
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::dto::ProjectionStatusDto;

    /// Builds a projection owned by `tenant-1` with the given name and type.
    fn sample_projection(name: &str, projection_type: ProjectionType) -> Projection {
        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
        Projection::new_v1(tenant_id, name.to_string(), projection_type).unwrap()
    }

    /// Builds a validated event type from a string literal.
    fn event_type(raw: &str) -> EventType {
        EventType::new(raw.to_string()).unwrap()
    }

    /// Creating a projection from a fully populated request succeeds and the
    /// DTO reflects the requested name and event types.
    #[test]
    fn test_create_projection() {
        let config = ProjectionConfigDto {
            batch_size: Some(500),
            checkpoint_interval: Some(5000),
        };
        let request = CreateProjectionRequest {
            name: "user-snapshots".to_string(),
            projection_type: ProjectionTypeDto::EntitySnapshot,
            tenant_id: "tenant-1".to_string(),
            event_types: vec!["user.created".to_string(), "user.updated".to_string()],
            description: Some("User state snapshots".to_string()),
            config: Some(config),
        };

        let outcome = CreateProjectionUseCase::execute(request);
        assert!(outcome.is_ok());

        let created = outcome.unwrap().projection;
        assert_eq!(created.name, "user-snapshots");
        assert_eq!(created.event_types.len(), 2);
    }

    /// Start, pause and rebuild each report the expected status in the DTO.
    #[test]
    fn test_projection_lifecycle() {
        let mut projection = sample_projection("test-projection", ProjectionType::EventCounter);
        projection.add_event_type(event_type("test.event")).unwrap();

        // Start: runs on a clone, so `projection` itself is still unstarted.
        let started = StartProjectionUseCase::execute(projection.clone());
        assert!(started.is_ok());
        assert_eq!(started.unwrap().status, ProjectionStatusDto::Running);

        // Pause: the original must be started first before it can be paused.
        projection.start().unwrap();
        let paused = PauseProjectionUseCase::execute(projection.clone());
        assert!(paused.is_ok());
        assert_eq!(paused.unwrap().status, ProjectionStatusDto::Paused);

        // Rebuild: triggered from the started projection.
        let rebuilding = RebuildProjectionUseCase::execute(projection.clone());
        assert!(rebuilding.is_ok());
        assert_eq!(rebuilding.unwrap().status, ProjectionStatusDto::Rebuilding);
    }

    /// Updating replaces description, config and the full event-type set.
    #[test]
    fn test_update_projection() {
        let mut projection = sample_projection("test-projection", ProjectionType::Custom);
        projection.add_event_type(event_type("old.event")).unwrap();

        let config = ProjectionConfigDto {
            batch_size: Some(1000),
            checkpoint_interval: Some(10000),
        };
        let request = UpdateProjectionRequest {
            description: Some("Updated description".to_string()),
            config: Some(config),
            event_types: Some(vec!["new.event".to_string()]),
        };

        let outcome = UpdateProjectionUseCase::execute(projection, request);
        assert!(outcome.is_ok());

        let updated = outcome.unwrap();
        assert_eq!(updated.description, Some("Updated description".to_string()));
        assert_eq!(updated.event_types, vec!["new.event".to_string()]);
        assert_eq!(updated.config.batch_size, Some(1000));
    }

    /// Listing returns one DTO per input projection and a matching count.
    #[test]
    fn test_list_projections() {
        let projections = vec![
            sample_projection("projection-1", ProjectionType::EntitySnapshot),
            sample_projection("projection-2", ProjectionType::EventCounter),
        ];

        let listing = ListProjectionsUseCase::execute(projections);
        assert_eq!(listing.count, 2);
        assert_eq!(listing.projections.len(), 2);
    }
}