Skip to main content

allsource_core/application/use_cases/
manage_projection.rs

1use crate::{
2    application::dto::{
3        CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse, ProjectionDto,
4        UpdateProjectionRequest,
5    },
6    domain::{
7        entities::{Projection, ProjectionConfig, ProjectionType},
8        value_objects::{EventType, TenantId},
9    },
10    error::Result,
11};
12
13/// Use Case: Create Projection
14///
15/// Creates a new projection for processing events.
16pub struct CreateProjectionUseCase;
17
18impl CreateProjectionUseCase {
19    pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
20        // Validate tenant ID
21        let tenant_id = TenantId::new(request.tenant_id)?;
22
23        // Convert projection type
24        let projection_type = ProjectionType::from(request.projection_type);
25
26        // Create projection
27        let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
28
29        // Set config if provided
30        if let Some(config_dto) = request.config {
31            let config = ProjectionConfig::from(config_dto);
32            projection.update_config(config);
33        }
34
35        // Set description if provided
36        if let Some(description) = request.description {
37            projection.set_description(description)?;
38        }
39
40        // Add event types
41        for event_type_str in request.event_types {
42            let event_type = EventType::new(event_type_str)?;
43            projection.add_event_type(event_type)?;
44        }
45
46        Ok(CreateProjectionResponse {
47            projection: ProjectionDto::from(&projection),
48        })
49    }
50}
51
52/// Use Case: Update Projection
53///
54/// Updates projection configuration and metadata.
55pub struct UpdateProjectionUseCase;
56
57impl UpdateProjectionUseCase {
58    pub fn execute(
59        mut projection: Projection,
60        request: UpdateProjectionRequest,
61    ) -> Result<ProjectionDto> {
62        // Update description if provided
63        if let Some(description) = request.description {
64            projection.set_description(description)?;
65        }
66
67        // Update config if provided
68        if let Some(config_dto) = request.config {
69            projection.update_config(ProjectionConfig::from(config_dto));
70        }
71
72        // Update event types if provided
73        if let Some(event_types) = request.event_types {
74            // Clear existing and add new ones
75            let existing = projection.event_types().to_vec();
76            for event_type in existing {
77                projection.remove_event_type(&event_type)?;
78            }
79            for event_type_str in event_types {
80                let event_type = EventType::new(event_type_str)?;
81                projection.add_event_type(event_type)?;
82            }
83        }
84
85        Ok(ProjectionDto::from(&projection))
86    }
87}
88
89/// Use Case: Start Projection
90///
91/// Starts a created or paused projection.
92pub struct StartProjectionUseCase;
93
94impl StartProjectionUseCase {
95    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
96        projection.start()?;
97        Ok(ProjectionDto::from(&projection))
98    }
99}
100
101/// Use Case: Pause Projection
102///
103/// Pauses a running projection.
104pub struct PauseProjectionUseCase;
105
106impl PauseProjectionUseCase {
107    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
108        projection.pause()?;
109        Ok(ProjectionDto::from(&projection))
110    }
111}
112
113/// Use Case: Stop Projection
114///
115/// Stops a projection completely.
116pub struct StopProjectionUseCase;
117
118impl StopProjectionUseCase {
119    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
120        projection.stop()?;
121        Ok(ProjectionDto::from(&projection))
122    }
123}
124
125/// Use Case: Rebuild Projection
126///
127/// Starts rebuilding a projection from scratch.
128pub struct RebuildProjectionUseCase;
129
130impl RebuildProjectionUseCase {
131    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
132        projection.start_rebuild()?;
133        Ok(ProjectionDto::from(&projection))
134    }
135}
136
137/// Use Case: List Projections
138///
139/// Returns a list of all projections for a tenant.
140pub struct ListProjectionsUseCase;
141
142impl ListProjectionsUseCase {
143    pub fn execute(projections: &[Projection]) -> ListProjectionsResponse {
144        let projection_dtos: Vec<ProjectionDto> =
145            projections.iter().map(ProjectionDto::from).collect();
146        let count = projection_dtos.len();
147
148        ListProjectionsResponse {
149            projections: projection_dtos,
150            count,
151        }
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use crate::application::dto::{ProjectionConfigDto, ProjectionTypeDto};
159
160    #[test]
161    fn test_create_projection() {
162        let request = CreateProjectionRequest {
163            name: "user-snapshots".to_string(),
164            projection_type: ProjectionTypeDto::EntitySnapshot,
165            tenant_id: "tenant-1".to_string(),
166            event_types: vec!["user.created".to_string(), "user.updated".to_string()],
167            description: Some("User state snapshots".to_string()),
168            config: Some(ProjectionConfigDto {
169                batch_size: Some(500),
170                checkpoint_interval: Some(5000),
171            }),
172        };
173
174        let response = CreateProjectionUseCase::execute(request);
175        assert!(response.is_ok());
176
177        let response = response.unwrap();
178        assert_eq!(response.projection.name, "user-snapshots");
179        assert_eq!(response.projection.event_types.len(), 2);
180    }
181
182    #[test]
183    fn test_projection_lifecycle() {
184        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
185        let mut projection = Projection::new_v1(
186            tenant_id,
187            "test-projection".to_string(),
188            ProjectionType::EventCounter,
189        )
190        .unwrap();
191
192        projection
193            .add_event_type(
194                crate::domain::value_objects::EventType::new("test.event".to_string()).unwrap(),
195            )
196            .unwrap();
197
198        // Start
199        let result = StartProjectionUseCase::execute(projection.clone());
200        assert!(result.is_ok());
201        assert_eq!(
202            result.unwrap().status,
203            crate::application::dto::ProjectionStatusDto::Running
204        );
205
206        // Pause
207        projection.start().unwrap();
208        let result = PauseProjectionUseCase::execute(projection.clone());
209        assert!(result.is_ok());
210        assert_eq!(
211            result.unwrap().status,
212            crate::application::dto::ProjectionStatusDto::Paused
213        );
214
215        // Rebuild
216        let result = RebuildProjectionUseCase::execute(projection.clone());
217        assert!(result.is_ok());
218        assert_eq!(
219            result.unwrap().status,
220            crate::application::dto::ProjectionStatusDto::Rebuilding
221        );
222    }
223
224    #[test]
225    fn test_update_projection() {
226        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
227        let mut projection = Projection::new_v1(
228            tenant_id,
229            "test-projection".to_string(),
230            ProjectionType::Custom,
231        )
232        .unwrap();
233
234        projection
235            .add_event_type(
236                crate::domain::value_objects::EventType::new("old.event".to_string()).unwrap(),
237            )
238            .unwrap();
239
240        let request = UpdateProjectionRequest {
241            description: Some("Updated description".to_string()),
242            config: Some(ProjectionConfigDto {
243                batch_size: Some(1000),
244                checkpoint_interval: Some(10000),
245            }),
246            event_types: Some(vec!["new.event".to_string()]),
247        };
248
249        let result = UpdateProjectionUseCase::execute(projection, request);
250        assert!(result.is_ok());
251
252        let updated = result.unwrap();
253        assert_eq!(updated.description, Some("Updated description".to_string()));
254        assert_eq!(updated.event_types, vec!["new.event".to_string()]);
255        assert_eq!(updated.config.batch_size, Some(1000));
256    }
257
258    #[test]
259    fn test_list_projections() {
260        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
261        let projections = vec![
262            Projection::new_v1(
263                tenant_id.clone(),
264                "projection-1".to_string(),
265                ProjectionType::EntitySnapshot,
266            )
267            .unwrap(),
268            Projection::new_v1(
269                tenant_id,
270                "projection-2".to_string(),
271                ProjectionType::EventCounter,
272            )
273            .unwrap(),
274        ];
275
276        let response = ListProjectionsUseCase::execute(&projections);
277        assert_eq!(response.count, 2);
278        assert_eq!(response.projections.len(), 2);
279    }
280}