allsource_core/application/use_cases/
manage_projection.rs1use crate::application::dto::{
2 CreateProjectionRequest, CreateProjectionResponse, ListProjectionsResponse, ProjectionDto,
3 UpdateProjectionRequest,
4};
5use crate::domain::entities::{Projection, ProjectionConfig, ProjectionType};
6use crate::domain::value_objects::{EventType, TenantId};
7use crate::error::Result;
8
9pub struct CreateProjectionUseCase;
13
14impl CreateProjectionUseCase {
15 pub fn execute(request: CreateProjectionRequest) -> Result<CreateProjectionResponse> {
16 let tenant_id = TenantId::new(request.tenant_id)?;
18
19 let projection_type = ProjectionType::from(request.projection_type);
21
22 let mut projection = Projection::new_v1(tenant_id, request.name, projection_type)?;
24
25 if let Some(config_dto) = request.config {
27 let config = ProjectionConfig::from(config_dto);
28 projection.update_config(config);
29 }
30
31 if let Some(description) = request.description {
33 projection.set_description(description)?;
34 }
35
36 for event_type_str in request.event_types {
38 let event_type = EventType::new(event_type_str)?;
39 projection.add_event_type(event_type)?;
40 }
41
42 Ok(CreateProjectionResponse {
43 projection: ProjectionDto::from(&projection),
44 })
45 }
46}
47
48pub struct UpdateProjectionUseCase;
52
53impl UpdateProjectionUseCase {
54 pub fn execute(
55 mut projection: Projection,
56 request: UpdateProjectionRequest,
57 ) -> Result<ProjectionDto> {
58 if let Some(description) = request.description {
60 projection.set_description(description)?;
61 }
62
63 if let Some(config_dto) = request.config {
65 projection.update_config(ProjectionConfig::from(config_dto));
66 }
67
68 if let Some(event_types) = request.event_types {
70 let existing = projection.event_types().to_vec();
72 for event_type in existing {
73 projection.remove_event_type(&event_type);
74 }
75 for event_type_str in event_types {
76 let event_type = EventType::new(event_type_str)?;
77 projection.add_event_type(event_type)?;
78 }
79 }
80
81 Ok(ProjectionDto::from(&projection))
82 }
83}
84
85pub struct StartProjectionUseCase;
89
90impl StartProjectionUseCase {
91 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
92 projection.start()?;
93 Ok(ProjectionDto::from(&projection))
94 }
95}
96
97pub struct PauseProjectionUseCase;
101
102impl PauseProjectionUseCase {
103 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
104 projection.pause()?;
105 Ok(ProjectionDto::from(&projection))
106 }
107}
108
109pub struct StopProjectionUseCase;
113
114impl StopProjectionUseCase {
115 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
116 projection.stop();
117 Ok(ProjectionDto::from(&projection))
118 }
119}
120
121pub struct RebuildProjectionUseCase;
125
126impl RebuildProjectionUseCase {
127 pub fn execute(mut projection: Projection) -> Result<ProjectionDto> {
128 projection.start_rebuild();
129 Ok(ProjectionDto::from(&projection))
130 }
131}
132
133pub struct ListProjectionsUseCase;
137
138impl ListProjectionsUseCase {
139 pub fn execute(projections: Vec<Projection>) -> ListProjectionsResponse {
140 let projection_dtos: Vec<ProjectionDto> =
141 projections.iter().map(ProjectionDto::from).collect();
142 let count = projection_dtos.len();
143
144 ListProjectionsResponse {
145 projections: projection_dtos,
146 count,
147 }
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use crate::application::dto::{ProjectionConfigDto, ProjectionTypeDto};
155
156 #[test]
157 fn test_create_projection() {
158 let request = CreateProjectionRequest {
159 name: "user-snapshots".to_string(),
160 projection_type: ProjectionTypeDto::EntitySnapshot,
161 tenant_id: "tenant-1".to_string(),
162 event_types: vec!["user.created".to_string(), "user.updated".to_string()],
163 description: Some("User state snapshots".to_string()),
164 config: Some(ProjectionConfigDto {
165 batch_size: Some(500),
166 checkpoint_interval: Some(5000),
167 }),
168 };
169
170 let response = CreateProjectionUseCase::execute(request);
171 assert!(response.is_ok());
172
173 let response = response.unwrap();
174 assert_eq!(response.projection.name, "user-snapshots");
175 assert_eq!(response.projection.event_types.len(), 2);
176 }
177
178 #[test]
179 fn test_projection_lifecycle() {
180 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
181 let mut projection = Projection::new_v1(
182 tenant_id,
183 "test-projection".to_string(),
184 ProjectionType::EventCounter,
185 )
186 .unwrap();
187
188 projection
189 .add_event_type(
190 crate::domain::value_objects::EventType::new("test.event".to_string()).unwrap(),
191 )
192 .unwrap();
193
194 let result = StartProjectionUseCase::execute(projection.clone());
196 assert!(result.is_ok());
197 assert_eq!(
198 result.unwrap().status,
199 crate::application::dto::ProjectionStatusDto::Running
200 );
201
202 projection.start().unwrap();
204 let result = PauseProjectionUseCase::execute(projection.clone());
205 assert!(result.is_ok());
206 assert_eq!(
207 result.unwrap().status,
208 crate::application::dto::ProjectionStatusDto::Paused
209 );
210
211 let result = RebuildProjectionUseCase::execute(projection.clone());
213 assert!(result.is_ok());
214 assert_eq!(
215 result.unwrap().status,
216 crate::application::dto::ProjectionStatusDto::Rebuilding
217 );
218 }
219
220 #[test]
221 fn test_update_projection() {
222 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
223 let mut projection = Projection::new_v1(
224 tenant_id,
225 "test-projection".to_string(),
226 ProjectionType::Custom,
227 )
228 .unwrap();
229
230 projection
231 .add_event_type(
232 crate::domain::value_objects::EventType::new("old.event".to_string()).unwrap(),
233 )
234 .unwrap();
235
236 let request = UpdateProjectionRequest {
237 description: Some("Updated description".to_string()),
238 config: Some(ProjectionConfigDto {
239 batch_size: Some(1000),
240 checkpoint_interval: Some(10000),
241 }),
242 event_types: Some(vec!["new.event".to_string()]),
243 };
244
245 let result = UpdateProjectionUseCase::execute(projection, request);
246 assert!(result.is_ok());
247
248 let updated = result.unwrap();
249 assert_eq!(updated.description, Some("Updated description".to_string()));
250 assert_eq!(updated.event_types, vec!["new.event".to_string()]);
251 assert_eq!(updated.config.batch_size, Some(1000));
252 }
253
254 #[test]
255 fn test_list_projections() {
256 let tenant_id = TenantId::new("tenant-1".to_string()).unwrap();
257 let projections = vec![
258 Projection::new_v1(
259 tenant_id.clone(),
260 "projection-1".to_string(),
261 ProjectionType::EntitySnapshot,
262 )
263 .unwrap(),
264 Projection::new_v1(
265 tenant_id,
266 "projection-2".to_string(),
267 ProjectionType::EventCounter,
268 )
269 .unwrap(),
270 ];
271
272 let response = ListProjectionsUseCase::execute(projections);
273 assert_eq!(response.count, 2);
274 assert_eq!(response.projections.len(), 2);
275 }
276}