allsource_core/application/use_cases/
manage_projection.rs1use crate::{
2 application::dto::{
3 CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse, ProjectionDto,
4 UpdateProjectionRequest,
5 },
6 domain::{
7 entities::{Projection, ProjectionConfig, ProjectionType},
8 value_objects::{EventType, TenantId},
9 },
10 error::Result,
11};
12
13pub struct CreateProjectionUseCase;
17
18impl CreateProjectionUseCase {
19 pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
20 let tenant_id = TenantId::new(request.tenant_id)?;
22
23 let projection_type = ProjectionType::from(request.projection_type);
25
26 let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
28
29 if let Some(config_dto) = request.config {
31 let config = ProjectionConfig::from(config_dto);
32 projection.update_config(config);
33 }
34
35 if let Some(description) = request.description {
37 projection.set_description(description)?;
38 }
39
40 for event_type_str in request.event_types {
42 let event_type = EventType::new(event_type_str)?;
43 projection.add_event_type(event_type)?;
44 }
45
46 Ok(CreateProjectionResponse {
47 projection: ProjectionDto::from(&projection),
48 })
49 }
50}
51
52pub struct UpdateProjectionUseCase;
56
57impl UpdateProjectionUseCase {
58 pub fn execute(
59 mut projection: Projection,
60 request: UpdateProjectionRequest,
61 ) -> Result<ProjectionDto> {
62 if let Some(description) = request.description {
64 projection.set_description(description)?;
65 }
66
67 if let Some(config_dto) = request.config {
69 projection.update_config(ProjectionConfig::from(config_dto));
70 }
71
72 if let Some(event_types) = request.event_types {
74 let existing = projection.event_types().to_vec();
76 for event_type in existing {
77 projection.remove_event_type(&event_type)?;
78 }
79 for event_type_str in event_types {
80 let event_type = EventType::new(event_type_str)?;
81 projection.add_event_type(event_type)?;
82 }
83 }
84
85 Ok(ProjectionDto::from(&projection))
86 }
87}
88
89pub struct StartProjectionUseCase;
93
94impl StartProjectionUseCase {
95 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
96 projection.start()?;
97 Ok(ProjectionDto::from(&projection))
98 }
99}
100
101pub struct PauseProjectionUseCase;
105
106impl PauseProjectionUseCase {
107 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
108 projection.pause()?;
109 Ok(ProjectionDto::from(&projection))
110 }
111}
112
113pub struct StopProjectionUseCase;
117
118impl StopProjectionUseCase {
119 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
120 projection.stop()?;
121 Ok(ProjectionDto::from(&projection))
122 }
123}
124
125pub struct RebuildProjectionUseCase;
129
130impl RebuildProjectionUseCase {
131 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
132 projection.start_rebuild()?;
133 Ok(ProjectionDto::from(&projection))
134 }
135}
136
137pub struct ListProjectionsUseCase;
141
142impl ListProjectionsUseCase {
143 pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
144 let projection_dtos: Vec<ProjectionDto> =
145 projections.iter().map(ProjectionDto::from).collect();
146 let count = projection_dtos.len();
147
148 ListProjectionsResponse {
149 projections: projection_dtos,
150 count,
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::application::dto::{ProjectionConfigDto, ProjectionStatusDto, ProjectionTypeDto};

    /// Builds a projection owned by tenant `tenant-1`; panics if the domain
    /// constructors reject the fixture values.
    fn projection_for_tenant_1(name: &str, projection_type: ProjectionType) -> Projection {
        let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
        Projection::new_v1(tenant_id, name.to_string(), projection_type).unwrap()
    }

    /// Parses `raw` into an `EventType`; panics if it is rejected.
    fn event_type(raw: &str) -> EventType {
        EventType::new(raw.to_string()).unwrap()
    }

    /// Creating a projection keeps the requested name and registers both
    /// event types.
    #[test]
    fn test_create_projection() {
        let config = ProjectionConfigDto {
            batch_size: Some(500),
            checkpoint_interval: Some(5000),
        };
        let request = CreateProjectionRequest {
            name: "user-snapshots".to_string(),
            projection_type: ProjectionTypeDto::EntitySnapshot,
            tenant_id: "tenant-1".to_string(),
            event_types: vec!["user.created".to_string(), "user.updated".to_string()],
            description: Some("User state snapshots".to_string()),
            config: Some(config),
        };

        let response =
            CreateProjectionUseCase::execute(request).expect("creating the projection failed");

        assert_eq!(response.projection.name, "user-snapshots");
        assert_eq!(response.projection.event_types.len(), 2);
    }

    /// Start, pause and rebuild each report the expected status in the DTO.
    #[test]
    fn test_projection_lifecycle() {
        let mut projection =
            projection_for_tenant_1("test-projection", ProjectionType::EventCounter);
        projection.add_event_type(event_type("test.event")).unwrap();

        // Start: runs on a clone, so `projection` itself is not yet started.
        let started = StartProjectionUseCase::execute(projection.clone())
            .expect("starting the projection failed");
        assert_eq!(started.status, ProjectionStatusDto::Running);

        // Pause: the original is started first, then a clone of it is paused.
        projection.start().unwrap();
        let paused = PauseProjectionUseCase::execute(projection.clone())
            .expect("pausing the projection failed");
        assert_eq!(paused.status, ProjectionStatusDto::Paused);

        // Rebuild: again on a clone of the started projection.
        let rebuilding = RebuildProjectionUseCase::execute(projection.clone())
            .expect("rebuilding the projection failed");
        assert_eq!(rebuilding.status, ProjectionStatusDto::Rebuilding);
    }

    /// Updating replaces the description, config and the whole event-type list.
    #[test]
    fn test_update_projection() {
        let mut projection = projection_for_tenant_1("test-projection", ProjectionType::Custom);
        projection.add_event_type(event_type("old.event")).unwrap();

        let request = UpdateProjectionRequest {
            description: Some("Updated description".to_string()),
            config: Some(ProjectionConfigDto {
                batch_size: Some(1000),
                checkpoint_interval: Some(10000),
            }),
            event_types: Some(vec!["new.event".to_string()]),
        };

        let updated = UpdateProjectionUseCase::execute(projection, request)
            .expect("updating the projection failed");

        assert_eq!(updated.description, Some("Updated description".to_string()));
        assert_eq!(updated.event_types, vec!["new.event".to_string()]);
        assert_eq!(updated.config.batch_size, Some(1000));
    }

    /// Listing reports one DTO per projection and a matching count.
    #[test]
    fn test_list_projections() {
        let projections = vec![
            projection_for_tenant_1("projection-1", ProjectionType::EntitySnapshot),
            projection_for_tenant_1("projection-2", ProjectionType::EventCounter),
        ];

        let response = ListProjectionsUseCase::execute(projections);

        assert_eq!(response.count, 2);
        assert_eq!(response.projections.len(), 2);
    }
}