allsource_core/application/use_cases/
manage_schema.rs1use crate::application::dto::{
2 RegisterSchemaRequest, RegisterSchemaResponse, UpdateSchemaRequest, ListSchemasResponse,
3 SchemaDto, CompatibilityModeDto,
4};
5use crate::domain::entities::{Schema, CompatibilityMode};
6use crate::error::Result;
7
8pub struct RegisterSchemaUseCase;
12
13impl RegisterSchemaUseCase {
14 pub fn execute(request: RegisterSchemaRequest) -> Result<RegisterSchemaResponse> {
15 let compatibility_mode = request
17 .compatibility_mode
18 .map(CompatibilityMode::from)
19 .unwrap_or(CompatibilityMode::None);
20
21 let mut schema = Schema::new_v1(
23 request.subject,
24 request.schema,
25 compatibility_mode,
26 )?;
27
28 if let Some(description) = request.description {
30 schema.set_description(description)?;
31 }
32
33 if let Some(tags) = request.tags {
35 for tag in tags {
36 schema.add_tag(tag)?;
37 }
38 }
39
40 Ok(RegisterSchemaResponse {
41 schema: SchemaDto::from(&schema),
42 })
43 }
44}
45
46pub struct CreateNextSchemaVersionUseCase;
50
51impl CreateNextSchemaVersionUseCase {
52 pub fn execute(
53 current_schema: &Schema,
54 new_schema_definition: serde_json::Value,
55 description: Option<String>,
56 ) -> Result<SchemaDto> {
57 let mut next_schema = current_schema.create_next_version(new_schema_definition)?;
59
60 if let Some(desc) = description {
62 next_schema.set_description(desc)?;
63 }
64
65 for tag in current_schema.tags() {
67 next_schema.add_tag(tag.clone())?;
68 }
69
70 Ok(SchemaDto::from(&next_schema))
71 }
72}
73
74pub struct UpdateSchemaMetadataUseCase;
78
79impl UpdateSchemaMetadataUseCase {
80 pub fn execute(mut schema: Schema, request: UpdateSchemaRequest) -> Result<SchemaDto> {
81 if let Some(description) = request.description {
83 if description.is_empty() {
84 schema.clear_description();
85 } else {
86 schema.set_description(description)?;
87 }
88 }
89
90 if let Some(tags) = request.tags {
92 for existing_tag in schema.tags().to_vec() {
94 schema.remove_tag(&existing_tag);
95 }
96 for tag in tags {
97 schema.add_tag(tag)?;
98 }
99 }
100
101 Ok(SchemaDto::from(&schema))
102 }
103}
104
105pub struct ListSchemasUseCase;
109
110impl ListSchemasUseCase {
111 pub fn execute(schemas: Vec<Schema>) -> ListSchemasResponse {
112 let schema_dtos: Vec<SchemaDto> = schemas.iter().map(SchemaDto::from).collect();
113 let count = schema_dtos.len();
114
115 ListSchemasResponse {
116 schemas: schema_dtos,
117 count,
118 }
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use serde_json::json;
126
127 #[test]
128 fn test_register_schema() {
129 let request = RegisterSchemaRequest {
130 subject: "user.created".to_string(),
131 schema: json!({
132 "type": "object",
133 "properties": {
134 "name": {"type": "string"},
135 "email": {"type": "string"}
136 }
137 }),
138 compatibility_mode: Some(CompatibilityModeDto::Backward),
139 description: Some("User creation event schema".to_string()),
140 tags: Some(vec!["user".to_string(), "core".to_string()]),
141 };
142
143 let response = RegisterSchemaUseCase::execute(request);
144 assert!(response.is_ok());
145
146 let response = response.unwrap();
147 assert_eq!(response.schema.subject, "user.created");
148 assert_eq!(response.schema.version, 1);
149 assert_eq!(response.schema.tags.len(), 2);
150 assert_eq!(response.schema.compatibility_mode, CompatibilityModeDto::Backward);
151 }
152
153 #[test]
154 fn test_create_next_version() {
155 let schema = Schema::new_v1(
156 "order.placed".to_string(),
157 json!({"type": "object"}),
158 CompatibilityMode::None,
159 )
160 .unwrap();
161
162 let new_schema = json!({
163 "type": "object",
164 "properties": {
165 "amount": {"type": "number"}
166 }
167 });
168
169 let result = CreateNextSchemaVersionUseCase::execute(
170 &schema,
171 new_schema,
172 Some("Version 2".to_string()),
173 );
174
175 assert!(result.is_ok());
176 let next = result.unwrap();
177 assert_eq!(next.version, 2);
178 assert_eq!(next.subject, "order.placed");
179 }
180
181 #[test]
182 fn test_update_schema_metadata() {
183 let mut schema = Schema::new_v1(
184 "test.event".to_string(),
185 json!({"type": "object"}),
186 CompatibilityMode::None,
187 )
188 .unwrap();
189
190 schema.add_tag("old-tag".to_string()).unwrap();
191
192 let request = UpdateSchemaRequest {
193 description: Some("Updated description".to_string()),
194 tags: Some(vec!["new-tag".to_string()]),
195 compatibility_mode: None,
196 };
197
198 let result = UpdateSchemaMetadataUseCase::execute(schema, request);
199 assert!(result.is_ok());
200
201 let updated = result.unwrap();
202 assert_eq!(updated.description, Some("Updated description".to_string()));
203 assert_eq!(updated.tags, vec!["new-tag".to_string()]);
204 }
205
206 #[test]
207 fn test_list_schemas() {
208 let schemas = vec![
209 Schema::new_v1(
210 "event.one".to_string(),
211 json!({"type": "object"}),
212 CompatibilityMode::None,
213 )
214 .unwrap(),
215 Schema::new_v1(
216 "event.two".to_string(),
217 json!({"type": "object"}),
218 CompatibilityMode::Backward,
219 )
220 .unwrap(),
221 ];
222
223 let response = ListSchemasUseCase::execute(schemas);
224 assert_eq!(response.count, 2);
225 assert_eq!(response.schemas.len(), 2);
226 }
227}