allsource_core/application/use_cases/
manage_projection.rs

1use crate::application::dto::{
2    CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse, ProjectionDto,
3    UpdateProjectionRequest,
4};
5use crate::domain::entities::{Projection, ProjectionConfig, ProjectionType};
6use crate::domain::value_objects::{EventType, TenantId};
7use crate::error::Result;
8
9/// Use Case: Create Projection
10///
11/// Creates a new projection for processing events.
12pub struct CreateProjectionUseCase;
13
14impl CreateProjectionUseCase {
15    pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
16        // Validate tenant ID
17        let tenant_id = TenantId::new(request.tenant_id)?;
18
19        // Convert projection type
20        let projection_type = ProjectionType::from(request.projection_type);
21
22        // Create projection
23        let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
24
25        // Set config if provided
26        if let Some(config_dto) = request.config {
27            let config = ProjectionConfig::from(config_dto);
28            projection.update_config(config);
29        }
30
31        // Set description if provided
32        if let Some(description) = request.description {
33            projection.set_description(description)?;
34        }
35
36        // Add event types
37        for event_type_str in request.event_types {
38            let event_type = EventType::new(event_type_str)?;
39            projection.add_event_type(event_type)?;
40        }
41
42        Ok(CreateProjectionResponse {
43            projection: ProjectionDto::from(&projection),
44        })
45    }
46}
47
48/// Use Case: Update Projection
49///
50/// Updates projection configuration and metadata.
51pub struct UpdateProjectionUseCase;
52
53impl UpdateProjectionUseCase {
54    pub fn execute(
55        mut projection: Projection,
56        request: UpdateProjectionRequest,
57    ) -> Result<ProjectionDto> {
58        // Update description if provided
59        if let Some(description) = request.description {
60            projection.set_description(description)?;
61        }
62
63        // Update config if provided
64        if let Some(config_dto) = request.config {
65            projection.update_config(ProjectionConfig::from(config_dto));
66        }
67
68        // Update event types if provided
69        if let Some(event_types) = request.event_types {
70            // Clear existing and add new ones
71            let existing = projection.event_types().to_vec();
72            for event_type in existing {
73                projection.remove_event_type(&event_type);
74            }
75            for event_type_str in event_types {
76                let event_type = EventType::new(event_type_str)?;
77                projection.add_event_type(event_type)?;
78            }
79        }
80
81        Ok(ProjectionDto::from(&projection))
82    }
83}
84
85/// Use Case: Start Projection
86///
87/// Starts a created or paused projection.
88pub struct StartProjectionUseCase;
89
90impl StartProjectionUseCase {
91    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
92        projection.start()?;
93        Ok(ProjectionDto::from(&projection))
94    }
95}
96
97/// Use Case: Pause Projection
98///
99/// Pauses a running projection.
100pub struct PauseProjectionUseCase;
101
102impl PauseProjectionUseCase {
103    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
104        projection.pause()?;
105        Ok(ProjectionDto::from(&projection))
106    }
107}
108
109/// Use Case: Stop Projection
110///
111/// Stops a projection completely.
112pub struct StopProjectionUseCase;
113
114impl StopProjectionUseCase {
115    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
116        projection.stop();
117        Ok(ProjectionDto::from(&projection))
118    }
119}
120
121/// Use Case: Rebuild Projection
122///
123/// Starts rebuilding a projection from scratch.
124pub struct RebuildProjectionUseCase;
125
126impl RebuildProjectionUseCase {
127    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
128        projection.start_rebuild();
129        Ok(ProjectionDto::from(&projection))
130    }
131}
132
133/// Use Case: List Projections
134///
135/// Returns a list of all projections for a tenant.
136pub struct ListProjectionsUseCase;
137
138impl ListProjectionsUseCase {
139    pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
140        let projection_dtos: Vec<ProjectionDto> =
141            projections.iter().map(ProjectionDto::from).collect();
142        let count = projection_dtos.len();
143
144        ListProjectionsResponse {
145            projections: projection_dtos,
146            count,
147        }
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154    use crate::application::dto::{ProjectionConfigDto, ProjectionTypeDto};
155
156    #[test]
157    fn test_create_projection() {
158        let request = CreateProjectionRequest {
159            name: "user-snapshots".to_string(),
160            projection_type: ProjectionTypeDto::EntitySnapshot,
161            tenant_id: "tenant-1".to_string(),
162            event_types: vec!["user.created".to_string(), "user.updated".to_string()],
163            description: Some("User state snapshots".to_string()),
164            config: Some(ProjectionConfigDto {
165                batch_size: Some(500),
166                checkpoint_interval: Some(5000),
167            }),
168        };
169
170        let response = CreateProjectionUseCase::execute(request);
171        assert!(response.is_ok());
172
173        let response = response.unwrap();
174        assert_eq!(response.projection.name, "user-snapshots");
175        assert_eq!(response.projection.event_types.len(), 2);
176    }
177
178    #[test]
179    fn test_projection_lifecycle() {
180        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
181        let mut projection = Projection::new_v1(
182            tenant_id,
183            "test-projection".to_string(),
184            ProjectionType::EventCounter,
185        )
186        .unwrap();
187
188        projection
189            .add_event_type(
190                crate::domain::value_objects::EventType::new("test.event".to_string()).unwrap(),
191            )
192            .unwrap();
193
194        // Start
195        let result = StartProjectionUseCase::execute(projection.clone());
196        assert!(result.is_ok());
197        assert_eq!(
198            result.unwrap().status,
199            crate::application::dto::ProjectionStatusDto::Running
200        );
201
202        // Pause
203        projection.start().unwrap();
204        let result = PauseProjectionUseCase::execute(projection.clone());
205        assert!(result.is_ok());
206        assert_eq!(
207            result.unwrap().status,
208            crate::application::dto::ProjectionStatusDto::Paused
209        );
210
211        // Rebuild
212        let result = RebuildProjectionUseCase::execute(projection.clone());
213        assert!(result.is_ok());
214        assert_eq!(
215            result.unwrap().status,
216            crate::application::dto::ProjectionStatusDto::Rebuilding
217        );
218    }
219
220    #[test]
221    fn test_update_projection() {
222        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
223        let mut projection = Projection::new_v1(
224            tenant_id,
225            "test-projection".to_string(),
226            ProjectionType::Custom,
227        )
228        .unwrap();
229
230        projection
231            .add_event_type(
232                crate::domain::value_objects::EventType::new("old.event".to_string()).unwrap(),
233            )
234            .unwrap();
235
236        let request = UpdateProjectionRequest {
237            description: Some("Updated description".to_string()),
238            config: Some(ProjectionConfigDto {
239                batch_size: Some(1000),
240                checkpoint_interval: Some(10000),
241            }),
242            event_types: Some(vec!["new.event".to_string()]),
243        };
244
245        let result = UpdateProjectionUseCase::execute(projection, request);
246        assert!(result.is_ok());
247
248        let updated = result.unwrap();
249        assert_eq!(updated.description, Some("Updated description".to_string()));
250        assert_eq!(updated.event_types, vec!["new.event".to_string()]);
251        assert_eq!(updated.config.batch_size, Some(1000));
252    }
253
254    #[test]
255    fn test_list_projections() {
256        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
257        let projections = vec![
258            Projection::new_v1(
259                tenant_id.clone(),
260                "projection-1".to_string(),
261                ProjectionType::EntitySnapshot,
262            )
263            .unwrap(),
264            Projection::new_v1(
265                tenant_id,
266                "projection-2".to_string(),
267                ProjectionType::EventCounter,
268            )
269            .unwrap(),
270        ];
271
272        let response = ListProjectionsUseCase::execute(projections);
273        assert_eq!(response.count, 2);
274        assert_eq!(response.projections.len(), 2);
275    }
276}