allsource_core/application/use_cases/
manage_projection.rs1use crate::application::dto::{
2 CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse,
3 ProjectionConfigDto, ProjectionDto, ProjectionTypeDto, UpdateProjectionRequest,
4};
5use crate::domain::entities::{Projection, ProjectionConfig, ProjectionType};
6use crate::domain::value_objects::{EventType, TenantId};
7use crate::error::Result;
8
9pub struct CreateProjectionUseCase;
13
14impl CreateProjectionUseCase {
15 pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
16 let tenant_id = TenantId::new(request.tenant_id)?;
18
19 let projection_type = ProjectionType::from(request.projection_type);
21
22 let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
24
25 if let Some(config_dto) = request.config {
27 let config = ProjectionConfig::from(config_dto);
28 projection.update_config(config);
29 }
30
31 if let Some(description) = request.description {
33 projection.set_description(description)?;
34 }
35
36 for event_type_str in request.event_types {
38 let event_type = EventType::new(event_type_str)?;
39 projection.add_event_type(event_type)?;
40 }
41
42 Ok(CreateProjectionResponse {
43 projection: ProjectionDto::from(&projection),
44 })
45 }
46}
47
48pub struct UpdateProjectionUseCase;
52
53impl UpdateProjectionUseCase {
54 pub fn execute(
55 mut projection: Projection,
56 request: UpdateProjectionRequest,
57 ) -> Result<ProjectionDto> {
58 if let Some(description) = request.description {
60 projection.set_description(description)?;
61 }
62
63 if let Some(config_dto) = request.config {
65 projection.update_config(ProjectionConfig::from(config_dto));
66 }
67
68 if let Some(event_types) = request.event_types {
70 let existing = projection.event_types().to_vec();
72 for event_type in existing {
73 projection.remove_event_type(&event_type);
74 }
75 for event_type_str in event_types {
76 let event_type = EventType::new(event_type_str)?;
77 projection.add_event_type(event_type)?;
78 }
79 }
80
81 Ok(ProjectionDto::from(&projection))
82 }
83}
84
85pub struct StartProjectionUseCase;
89
90impl StartProjectionUseCase {
91 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
92 projection.start()?;
93 Ok(ProjectionDto::from(&projection))
94 }
95}
96
97pub struct PauseProjectionUseCase;
101
102impl PauseProjectionUseCase {
103 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
104 projection.pause()?;
105 Ok(ProjectionDto::from(&projection))
106 }
107}
108
109pub struct StopProjectionUseCase;
113
114impl StopProjectionUseCase {
115 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
116 projection.stop();
117 Ok(ProjectionDto::from(&projection))
118 }
119}
120
121pub struct RebuildProjectionUseCase;
125
126impl RebuildProjectionUseCase {
127 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
128 projection.start_rebuild();
129 Ok(ProjectionDto::from(&projection))
130 }
131}
132
133pub struct ListProjectionsUseCase;
137
138impl ListProjectionsUseCase {
139 pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
140 let projection_dtos: Vec<ProjectionDto> =
141 projections.iter().map(ProjectionDto::from).collect();
142 let count = projection_dtos.len();
143
144 ListProjectionsResponse {
145 projections: projection_dtos,
146 count,
147 }
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154
155 #[test]
156 fn test_create_projection() {
157 let request = CreateProjectionRequest {
158 name: "user-snapshots".to_string(),
159 projection_type: ProjectionTypeDto::EntitySnapshot,
160 tenant_id: "tenant-1".to_string(),
161 event_types: vec!["user.created".to_string(), "user.updated".to_string()],
162 description: Some("User state snapshots".to_string()),
163 config: Some(ProjectionConfigDto {
164 batch_size: Some(500),
165 checkpoint_interval: Some(5000),
166 }),
167 };
168
169 let response = CreateProjectionUseCase::execute(request);
170 assert!(response.is_ok());
171
172 let response = response.unwrap();
173 assert_eq!(response.projection.name, "user-snapshots");
174 assert_eq!(response.projection.event_types.len(), 2);
175 }
176
177 #[test]
178 fn test_projection_lifecycle() {
179 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
180 let mut projection = Projection::new_v1(
181 tenant_id,
182 "test-projection".to_string(),
183 ProjectionType::EventCounter,
184 )
185 .unwrap();
186
187 projection
188 .add_event_type(
189 crate::domain::value_objects::EventType::new("test.event".to_string()).unwrap(),
190 )
191 .unwrap();
192
193 let result = StartProjectionUseCase::execute(projection.clone());
195 assert!(result.is_ok());
196 assert_eq!(
197 result.unwrap().status,
198 crate::application::dto::ProjectionStatusDto::Running
199 );
200
201 projection.start().unwrap();
203 let result = PauseProjectionUseCase::execute(projection.clone());
204 assert!(result.is_ok());
205 assert_eq!(
206 result.unwrap().status,
207 crate::application::dto::ProjectionStatusDto::Paused
208 );
209
210 let result = RebuildProjectionUseCase::execute(projection.clone());
212 assert!(result.is_ok());
213 assert_eq!(
214 result.unwrap().status,
215 crate::application::dto::ProjectionStatusDto::Rebuilding
216 );
217 }
218
219 #[test]
220 fn test_update_projection() {
221 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
222 let mut projection = Projection::new_v1(
223 tenant_id,
224 "test-projection".to_string(),
225 ProjectionType::Custom,
226 )
227 .unwrap();
228
229 projection
230 .add_event_type(
231 crate::domain::value_objects::EventType::new("old.event".to_string()).unwrap(),
232 )
233 .unwrap();
234
235 let request = UpdateProjectionRequest {
236 description: Some("Updated description".to_string()),
237 config: Some(ProjectionConfigDto {
238 batch_size: Some(1000),
239 checkpoint_interval: Some(10000),
240 }),
241 event_types: Some(vec!["new.event".to_string()]),
242 };
243
244 let result = UpdateProjectionUseCase::execute(projection, request);
245 assert!(result.is_ok());
246
247 let updated = result.unwrap();
248 assert_eq!(updated.description, Some("Updated description".to_string()));
249 assert_eq!(updated.event_types, vec!["new.event".to_string()]);
250 assert_eq!(updated.config.batch_size, Some(1000));
251 }
252
253 #[test]
254 fn test_list_projections() {
255 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
256 let projections = vec![
257 Projection::new_v1(
258 tenant_id.clone(),
259 "projection-1".to_string(),
260 ProjectionType::EntitySnapshot,
261 )
262 .unwrap(),
263 Projection::new_v1(
264 tenant_id,
265 "projection-2".to_string(),
266 ProjectionType::EventCounter,
267 )
268 .unwrap(),
269 ];
270
271 let response = ListProjectionsUseCase::execute(projections);
272 assert_eq!(response.count, 2);
273 assert_eq!(response.projections.len(), 2);
274 }
275}