allsource_core/application/use_cases/
manage_projection.rs

1use crate::application::dto::{
2    CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse,
3    ProjectionConfigDto, ProjectionDto, ProjectionTypeDto, UpdateProjectionRequest,
4};
5use crate::domain::entities::{Projection, ProjectionConfig, ProjectionType};
6use crate::domain::value_objects::{EventType, TenantId};
7use crate::error::Result;
8
9/// Use Case: Create Projection
10///
11/// Creates a new projection for processing events.
12pub struct CreateProjectionUseCase;
13
14impl CreateProjectionUseCase {
15    pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
16        // Validate tenant ID
17        let tenant_id = TenantId::new(request.tenant_id)?;
18
19        // Convert projection type
20        let projection_type = ProjectionType::from(request.projection_type);
21
22        // Create projection
23        let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
24
25        // Set config if provided
26        if let Some(config_dto) = request.config {
27            let config = ProjectionConfig::from(config_dto);
28            projection.update_config(config);
29        }
30
31        // Set description if provided
32        if let Some(description) = request.description {
33            projection.set_description(description)?;
34        }
35
36        // Add event types
37        for event_type_str in request.event_types {
38            let event_type = EventType::new(event_type_str)?;
39            projection.add_event_type(event_type)?;
40        }
41
42        Ok(CreateProjectionResponse {
43            projection: ProjectionDto::from(&projection),
44        })
45    }
46}
47
48/// Use Case: Update Projection
49///
50/// Updates projection configuration and metadata.
51pub struct UpdateProjectionUseCase;
52
53impl UpdateProjectionUseCase {
54    pub fn execute(
55        mut projection: Projection,
56        request: UpdateProjectionRequest,
57    ) -> Result<ProjectionDto> {
58        // Update description if provided
59        if let Some(description) = request.description {
60            projection.set_description(description)?;
61        }
62
63        // Update config if provided
64        if let Some(config_dto) = request.config {
65            projection.update_config(ProjectionConfig::from(config_dto));
66        }
67
68        // Update event types if provided
69        if let Some(event_types) = request.event_types {
70            // Clear existing and add new ones
71            let existing = projection.event_types().to_vec();
72            for event_type in existing {
73                projection.remove_event_type(&event_type);
74            }
75            for event_type_str in event_types {
76                let event_type = EventType::new(event_type_str)?;
77                projection.add_event_type(event_type)?;
78            }
79        }
80
81        Ok(ProjectionDto::from(&projection))
82    }
83}
84
85/// Use Case: Start Projection
86///
87/// Starts a created or paused projection.
88pub struct StartProjectionUseCase;
89
90impl StartProjectionUseCase {
91    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
92        projection.start()?;
93        Ok(ProjectionDto::from(&projection))
94    }
95}
96
97/// Use Case: Pause Projection
98///
99/// Pauses a running projection.
100pub struct PauseProjectionUseCase;
101
102impl PauseProjectionUseCase {
103    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
104        projection.pause()?;
105        Ok(ProjectionDto::from(&projection))
106    }
107}
108
109/// Use Case: Stop Projection
110///
111/// Stops a projection completely.
112pub struct StopProjectionUseCase;
113
114impl StopProjectionUseCase {
115    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
116        projection.stop();
117        Ok(ProjectionDto::from(&projection))
118    }
119}
120
121/// Use Case: Rebuild Projection
122///
123/// Starts rebuilding a projection from scratch.
124pub struct RebuildProjectionUseCase;
125
126impl RebuildProjectionUseCase {
127    pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
128        projection.start_rebuild();
129        Ok(ProjectionDto::from(&projection))
130    }
131}
132
133/// Use Case: List Projections
134///
135/// Returns a list of all projections for a tenant.
136pub struct ListProjectionsUseCase;
137
138impl ListProjectionsUseCase {
139    pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
140        let projection_dtos: Vec<ProjectionDto> =
141            projections.iter().map(ProjectionDto::from).collect();
142        let count = projection_dtos.len();
143
144        ListProjectionsResponse {
145            projections: projection_dtos,
146            count,
147        }
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    #[test]
156    fn test_create_projection() {
157        let request = CreateProjectionRequest {
158            name: "user-snapshots".to_string(),
159            projection_type: ProjectionTypeDto::EntitySnapshot,
160            tenant_id: "tenant-1".to_string(),
161            event_types: vec!["user.created".to_string(), "user.updated".to_string()],
162            description: Some("User state snapshots".to_string()),
163            config: Some(ProjectionConfigDto {
164                batch_size: Some(500),
165                checkpoint_interval: Some(5000),
166            }),
167        };
168
169        let response = CreateProjectionUseCase::execute(request);
170        assert!(response.is_ok());
171
172        let response = response.unwrap();
173        assert_eq!(response.projection.name, "user-snapshots");
174        assert_eq!(response.projection.event_types.len(), 2);
175    }
176
177    #[test]
178    fn test_projection_lifecycle() {
179        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
180        let mut projection = Projection::new_v1(
181            tenant_id,
182            "test-projection".to_string(),
183            ProjectionType::EventCounter,
184        )
185        .unwrap();
186
187        projection
188            .add_event_type(
189                crate::domain::value_objects::EventType::new("test.event".to_string()).unwrap(),
190            )
191            .unwrap();
192
193        // Start
194        let result = StartProjectionUseCase::execute(projection.clone());
195        assert!(result.is_ok());
196        assert_eq!(
197            result.unwrap().status,
198            crate::application::dto::ProjectionStatusDto::Running
199        );
200
201        // Pause
202        projection.start().unwrap();
203        let result = PauseProjectionUseCase::execute(projection.clone());
204        assert!(result.is_ok());
205        assert_eq!(
206            result.unwrap().status,
207            crate::application::dto::ProjectionStatusDto::Paused
208        );
209
210        // Rebuild
211        let result = RebuildProjectionUseCase::execute(projection.clone());
212        assert!(result.is_ok());
213        assert_eq!(
214            result.unwrap().status,
215            crate::application::dto::ProjectionStatusDto::Rebuilding
216        );
217    }
218
219    #[test]
220    fn test_update_projection() {
221        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
222        let mut projection = Projection::new_v1(
223            tenant_id,
224            "test-projection".to_string(),
225            ProjectionType::Custom,
226        )
227        .unwrap();
228
229        projection
230            .add_event_type(
231                crate::domain::value_objects::EventType::new("old.event".to_string()).unwrap(),
232            )
233            .unwrap();
234
235        let request = UpdateProjectionRequest {
236            description: Some("Updated description".to_string()),
237            config: Some(ProjectionConfigDto {
238                batch_size: Some(1000),
239                checkpoint_interval: Some(10000),
240            }),
241            event_types: Some(vec!["new.event".to_string()]),
242        };
243
244        let result = UpdateProjectionUseCase::execute(projection, request);
245        assert!(result.is_ok());
246
247        let updated = result.unwrap();
248        assert_eq!(updated.description, Some("Updated description".to_string()));
249        assert_eq!(updated.event_types, vec!["new.event".to_string()]);
250        assert_eq!(updated.config.batch_size, Some(1000));
251    }
252
253    #[test]
254    fn test_list_projections() {
255        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
256        let projections = vec![
257            Projection::new_v1(
258                tenant_id.clone(),
259                "projection-1".to_string(),
260                ProjectionType::EntitySnapshot,
261            )
262            .unwrap(),
263            Projection::new_v1(
264                tenant_id,
265                "projection-2".to_string(),
266                ProjectionType::EventCounter,
267            )
268            .unwrap(),
269        ];
270
271        let response = ListProjectionsUseCase::execute(projections);
272        assert_eq!(response.count, 2);
273        assert_eq!(response.projections.len(), 2);
274    }
275}