1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use serde_json::Value;
7use winterbaume_core::{
8 BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, StatefulService,
9 default_account_id, json_error_response,
10};
11
12use crate::state::{KeyspacesError, KeyspacesState};
13use crate::types;
14use crate::views::KeyspacesStateView;
15use crate::wire;
16
17pub struct KeyspacesService {
18 pub(crate) state: Arc<BackendState<KeyspacesState>>,
19 pub(crate) notifier: StateChangeNotifier<KeyspacesStateView>,
20}
21
22impl KeyspacesService {
23 pub fn new() -> Self {
24 Self {
25 state: Arc::new(BackendState::new()),
26 notifier: StateChangeNotifier::new(),
27 }
28 }
29}
30
31impl Default for KeyspacesService {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl MockService for KeyspacesService {
38 fn service_name(&self) -> &str {
39 "cassandra"
40 }
41
42 fn url_patterns(&self) -> Vec<&str> {
43 vec![
44 r"https?://cassandra\..*\.amazonaws\.com",
45 r"https?://cassandra\.amazonaws\.com",
46 ]
47 }
48
49 fn handle(
50 &self,
51 request: MockRequest,
52 ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
53 Box::pin(async move { self.dispatch(request).await })
54 }
55}
56
57impl KeyspacesService {
58 async fn dispatch(&self, request: MockRequest) -> MockResponse {
59 let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
60 let account_id = default_account_id();
61
62 let action = request
63 .headers
64 .get("x-amz-target")
65 .and_then(|v| v.to_str().ok())
66 .and_then(|v| v.split('.').next_back())
67 .map(|s| s.to_string());
68
69 let action = match action {
70 Some(a) => a,
71 None => {
72 return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
73 }
74 };
75
76 if !request.body.is_empty() && serde_json::from_slice::<Value>(&request.body).is_err() {
79 return json_error_response(400, "SerializationException", "Invalid JSON body");
80 }
81 let body_bytes: &[u8] = &request.body;
82
83 let state = self.state.get(account_id, ®ion);
84
85 let response = match action.as_str() {
86 "CreateKeyspace" => {
87 self.handle_create_keyspace(&state, body_bytes, account_id, ®ion)
88 .await
89 }
90 "GetKeyspace" => self.handle_get_keyspace(&state, body_bytes).await,
91 "DeleteKeyspace" => self.handle_delete_keyspace(&state, body_bytes).await,
92 "UpdateKeyspace" => self.handle_update_keyspace(&state, body_bytes).await,
93 "ListKeyspaces" => self.handle_list_keyspaces(&state).await,
94 "CreateTable" => {
95 self.handle_create_table(&state, body_bytes, account_id, ®ion)
96 .await
97 }
98 "GetTable" => self.handle_get_table(&state, body_bytes).await,
99 "DeleteTable" => self.handle_delete_table(&state, body_bytes).await,
100 "UpdateTable" => self.handle_update_table(&state, body_bytes).await,
101 "ListTables" => self.handle_list_tables(&state, body_bytes).await,
102 "RestoreTable" => {
103 self.handle_restore_table(&state, body_bytes, account_id, ®ion)
104 .await
105 }
106 "GetTableAutoScalingSettings" => {
107 self.handle_get_table_auto_scaling_settings(&state, body_bytes)
108 .await
109 }
110 "CreateType" => self.handle_create_type(&state, body_bytes).await,
111 "GetType" => {
112 self.handle_get_type(&state, body_bytes, account_id, ®ion)
113 .await
114 }
115 "DeleteType" => {
116 self.handle_delete_type(&state, body_bytes, account_id, ®ion)
117 .await
118 }
119 "ListTypes" => self.handle_list_types(&state, body_bytes).await,
120 "TagResource" => self.handle_tag_resource(&state, body_bytes).await,
121 "UntagResource" => self.handle_untag_resource(&state, body_bytes).await,
122 "ListTagsForResource" => self.handle_list_tags_for_resource(&state, body_bytes).await,
123 _ => json_error_response(400, "InvalidAction", &format!("Unknown operation {action}")),
124 };
125
126 if response.status / 100 == 2 {
127 self.notify_state_changed(account_id, ®ion).await;
128 }
129 response
130 }
131
132 async fn handle_create_keyspace(
135 &self,
136 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
137 body: &[u8],
138 account_id: &str,
139 region: &str,
140 ) -> MockResponse {
141 let input = match wire::deserialize_create_keyspace_request(body) {
142 Ok(v) => v,
143 Err(e) => return json_error_response(400, "ValidationException", &e),
144 };
145 if input.keyspace_name.is_empty() {
146 return json_error_response(400, "ValidationException", "keyspaceName is required");
147 }
148
149 let (replication_strategy, replication_regions) =
150 replication_spec_to_pair(input.replication_specification.as_ref());
151 let tags = tag_list_to_map(input.tags.as_deref());
152
153 let mut guard = state.write().await;
154 match guard.create_keyspace(
155 &input.keyspace_name,
156 &replication_strategy,
157 replication_regions,
158 tags,
159 account_id,
160 region,
161 ) {
162 Ok(arn) => wire::serialize_create_keyspace_response(&wire::CreateKeyspaceResponse {
163 resource_arn: Some(arn),
164 }),
165 Err(e) => keyspaces_error_response(&e),
166 }
167 }
168
169 async fn handle_get_keyspace(
170 &self,
171 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
172 body: &[u8],
173 ) -> MockResponse {
174 let input = match wire::deserialize_get_keyspace_request(body) {
175 Ok(v) => v,
176 Err(e) => return json_error_response(400, "ValidationException", &e),
177 };
178 if input.keyspace_name.is_empty() {
179 return json_error_response(400, "ValidationException", "keyspaceName is required");
180 }
181
182 let guard = state.read().await;
183 match guard.get_keyspace(&input.keyspace_name) {
184 Ok(ks) => wire::serialize_get_keyspace_response(&wire::GetKeyspaceResponse {
185 keyspace_name: Some(ks.name.clone()),
186 resource_arn: Some(ks.arn.clone()),
187 replication_strategy: Some(ks.replication_strategy.clone()),
188 replication_regions: if ks.replication_regions.is_empty() {
189 None
190 } else {
191 Some(ks.replication_regions.clone())
192 },
193 ..Default::default()
194 }),
195 Err(e) => keyspaces_error_response(&e),
196 }
197 }
198
199 async fn handle_delete_keyspace(
200 &self,
201 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
202 body: &[u8],
203 ) -> MockResponse {
204 let input = match wire::deserialize_delete_keyspace_request(body) {
205 Ok(v) => v,
206 Err(e) => return json_error_response(400, "ValidationException", &e),
207 };
208 if input.keyspace_name.is_empty() {
209 return json_error_response(400, "ValidationException", "keyspaceName is required");
210 }
211
212 let mut guard = state.write().await;
213 match guard.delete_keyspace(&input.keyspace_name) {
214 Ok(()) => wire::serialize_delete_keyspace_response(&wire::DeleteKeyspaceResponse {}),
215 Err(e) => keyspaces_error_response(&e),
216 }
217 }
218
219 async fn handle_update_keyspace(
220 &self,
221 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
222 body: &[u8],
223 ) -> MockResponse {
224 let input = match wire::deserialize_update_keyspace_request(body) {
225 Ok(v) => v,
226 Err(e) => return json_error_response(400, "ValidationException", &e),
227 };
228 if input.keyspace_name.is_empty() {
229 return json_error_response(400, "ValidationException", "keyspaceName is required");
230 }
231
232 let (replication_strategy, replication_regions) =
233 replication_spec_to_pair(Some(&input.replication_specification));
234
235 let mut guard = state.write().await;
236 match guard.update_keyspace(
237 &input.keyspace_name,
238 &replication_strategy,
239 replication_regions,
240 ) {
241 Ok(arn) => wire::serialize_update_keyspace_response(&wire::UpdateKeyspaceResponse {
242 resource_arn: Some(arn),
243 }),
244 Err(e) => keyspaces_error_response(&e),
245 }
246 }
247
248 async fn handle_list_keyspaces(
249 &self,
250 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
251 ) -> MockResponse {
252 let guard = state.read().await;
253 let keyspaces = guard.list_keyspaces();
254
255 wire::serialize_list_keyspaces_response(&wire::ListKeyspacesResponse {
256 keyspaces: Some(
257 keyspaces
258 .into_iter()
259 .map(|ks| wire::KeyspaceSummary {
260 keyspace_name: Some(ks.name.clone()),
261 resource_arn: Some(ks.arn.clone()),
262 replication_strategy: Some(ks.replication_strategy.clone()),
263 replication_regions: if ks.replication_regions.is_empty() {
264 None
265 } else {
266 Some(ks.replication_regions.clone())
267 },
268 })
269 .collect(),
270 ),
271 ..Default::default()
272 })
273 }
274
275 async fn handle_create_table(
278 &self,
279 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
280 body: &[u8],
281 account_id: &str,
282 region: &str,
283 ) -> MockResponse {
284 let input = match wire::deserialize_create_table_request(body) {
285 Ok(v) => v,
286 Err(e) => return json_error_response(400, "ValidationException", &e),
287 };
288 if input.keyspace_name.is_empty() {
289 return json_error_response(400, "ValidationException", "keyspaceName is required");
290 }
291 if input.table_name.is_empty() {
292 return json_error_response(400, "ValidationException", "tableName is required");
293 }
294
295 if input.schema_definition.all_columns.is_empty()
298 && input.schema_definition.partition_keys.is_empty()
299 {
300 return json_error_response(400, "ValidationException", "schemaDefinition is required");
301 }
302 let schema = wire_schema_to_types(&input.schema_definition);
303
304 let (capacity_mode, rcu, wcu) =
305 capacity_spec_to_tuple(input.capacity_specification.as_ref(), "PAY_PER_REQUEST");
306 let (encryption_type, kms_key) =
307 encryption_spec_to_tuple(input.encryption_specification.as_ref());
308 let pitr = input
309 .point_in_time_recovery
310 .as_ref()
311 .map(|p| p.status == "ENABLED")
312 .unwrap_or(false);
313 let ttl_status = input
314 .ttl
315 .as_ref()
316 .map(|t| t.status.clone())
317 .unwrap_or_else(|| "ENABLED".to_string());
318 let default_ttl = input.default_time_to_live;
319 let comment = input
320 .comment
321 .as_ref()
322 .map(|c| c.message.clone())
323 .unwrap_or_default();
324 let cst = input
325 .client_side_timestamps
326 .as_ref()
327 .map(|c| c.status == "ENABLED")
328 .unwrap_or(false);
329 let tags = tag_list_to_map(input.tags.as_deref());
330
331 let mut guard = state.write().await;
332 match guard.create_table(
333 &input.keyspace_name,
334 &input.table_name,
335 schema,
336 &capacity_mode,
337 rcu,
338 wcu,
339 &encryption_type,
340 kms_key,
341 pitr,
342 &ttl_status,
343 default_ttl,
344 &comment,
345 cst,
346 tags,
347 account_id,
348 region,
349 ) {
350 Ok(arn) => wire::serialize_create_table_response(&wire::CreateTableResponse {
351 resource_arn: Some(arn),
352 }),
353 Err(e) => keyspaces_error_response(&e),
354 }
355 }
356
357 async fn handle_get_table(
358 &self,
359 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
360 body: &[u8],
361 ) -> MockResponse {
362 let input = match wire::deserialize_get_table_request(body) {
363 Ok(v) => v,
364 Err(e) => return json_error_response(400, "ValidationException", &e),
365 };
366 if input.keyspace_name.is_empty() {
367 return json_error_response(400, "ValidationException", "keyspaceName is required");
368 }
369 if input.table_name.is_empty() {
370 return json_error_response(400, "ValidationException", "tableName is required");
371 }
372
373 let guard = state.read().await;
374 match guard.get_table(&input.keyspace_name, &input.table_name) {
375 Ok(table) => wire::serialize_get_table_response(&wire::GetTableResponse {
376 keyspace_name: Some(table.keyspace_name.clone()),
377 table_name: Some(table.table_name.clone()),
378 resource_arn: Some(table.arn.clone()),
379 creation_timestamp: Some(table.creation_timestamp.timestamp() as f64),
380 status: Some(table.status.clone()),
381 schema_definition: Some(table_schema_to_wire(&table.schema_definition)),
382 capacity_specification: Some(wire::CapacitySpecificationSummary {
383 throughput_mode: Some(table.capacity_mode.clone()),
384 read_capacity_units: table.read_capacity_units,
385 write_capacity_units: table.write_capacity_units,
386 ..Default::default()
387 }),
388 encryption_specification: Some(wire::EncryptionSpecification {
389 r#type: table.encryption_type.clone(),
390 kms_key_identifier: table.kms_key_identifier.clone(),
391 }),
392 point_in_time_recovery: Some(wire::PointInTimeRecoverySummary {
393 status: Some(if table.point_in_time_recovery_enabled {
394 "ENABLED".to_string()
395 } else {
396 "DISABLED".to_string()
397 }),
398 ..Default::default()
399 }),
400 ttl: Some(wire::TimeToLive {
401 status: table.ttl_status.clone(),
402 }),
403 default_time_to_live: table.default_time_to_live,
404 comment: if table.comment.is_empty() {
405 None
406 } else {
407 Some(wire::Comment {
408 message: table.comment.clone(),
409 })
410 },
411 client_side_timestamps: Some(wire::ClientSideTimestamps {
412 status: if table.client_side_timestamps_enabled {
413 "ENABLED".to_string()
414 } else {
415 "DISABLED".to_string()
416 },
417 }),
418 ..Default::default()
419 }),
420 Err(e) => keyspaces_error_response(&e),
421 }
422 }
423
424 async fn handle_delete_table(
425 &self,
426 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
427 body: &[u8],
428 ) -> MockResponse {
429 let input = match wire::deserialize_delete_table_request(body) {
430 Ok(v) => v,
431 Err(e) => return json_error_response(400, "ValidationException", &e),
432 };
433 if input.keyspace_name.is_empty() {
434 return json_error_response(400, "ValidationException", "keyspaceName is required");
435 }
436 if input.table_name.is_empty() {
437 return json_error_response(400, "ValidationException", "tableName is required");
438 }
439
440 let mut guard = state.write().await;
441 match guard.delete_table(&input.keyspace_name, &input.table_name) {
442 Ok(()) => wire::serialize_delete_table_response(&wire::DeleteTableResponse {}),
443 Err(e) => keyspaces_error_response(&e),
444 }
445 }
446
447 async fn handle_update_table(
448 &self,
449 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
450 body: &[u8],
451 ) -> MockResponse {
452 let input = match wire::deserialize_update_table_request(body) {
453 Ok(v) => v,
454 Err(e) => return json_error_response(400, "ValidationException", &e),
455 };
456 if input.keyspace_name.is_empty() {
457 return json_error_response(400, "ValidationException", "keyspaceName is required");
458 }
459 if input.table_name.is_empty() {
460 return json_error_response(400, "ValidationException", "tableName is required");
461 }
462
463 let capacity_mode = input
464 .capacity_specification
465 .as_ref()
466 .map(|c| c.throughput_mode.clone());
467 let rcu = input
468 .capacity_specification
469 .as_ref()
470 .and_then(|c| c.read_capacity_units);
471 let wcu = input
472 .capacity_specification
473 .as_ref()
474 .and_then(|c| c.write_capacity_units);
475 let encryption_type = input
476 .encryption_specification
477 .as_ref()
478 .map(|e| e.r#type.clone());
479 let kms_key = input
480 .encryption_specification
481 .as_ref()
482 .and_then(|e| e.kms_key_identifier.clone());
483 let pitr = input
484 .point_in_time_recovery
485 .as_ref()
486 .map(|p| p.status == "ENABLED");
487 let ttl_status = input.ttl.as_ref().map(|t| t.status.clone());
488 let default_ttl = input.default_time_to_live;
489 let cst = input
490 .client_side_timestamps
491 .as_ref()
492 .map(|c| c.status == "ENABLED");
493
494 let mut guard = state.write().await;
495
496 if let Some(add_cols) = input.add_columns.as_deref() {
498 let key = (input.keyspace_name.clone(), input.table_name.clone());
499 if let Some(table) = guard.tables.get_mut(&key) {
500 for col in add_cols {
501 table
502 .schema_definition
503 .all_columns
504 .push(types::ColumnDefinition {
505 name: col.name.clone(),
506 column_type: col.r#type.clone(),
507 });
508 }
509 }
510 }
511
512 match guard.update_table(
513 &input.keyspace_name,
514 &input.table_name,
515 capacity_mode.as_deref(),
516 rcu,
517 wcu,
518 encryption_type.as_deref(),
519 kms_key,
520 pitr,
521 ttl_status.as_deref(),
522 default_ttl,
523 cst,
524 ) {
525 Ok(arn) => wire::serialize_update_table_response(&wire::UpdateTableResponse {
526 resource_arn: Some(arn),
527 }),
528 Err(e) => keyspaces_error_response(&e),
529 }
530 }
531
532 async fn handle_list_tables(
533 &self,
534 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
535 body: &[u8],
536 ) -> MockResponse {
537 let input = match wire::deserialize_list_tables_request(body) {
538 Ok(v) => v,
539 Err(e) => return json_error_response(400, "ValidationException", &e),
540 };
541 if input.keyspace_name.is_empty() {
542 return json_error_response(400, "ValidationException", "keyspaceName is required");
543 }
544
545 let guard = state.read().await;
546 match guard.list_tables(&input.keyspace_name) {
547 Ok(tables) => wire::serialize_list_tables_response(&wire::ListTablesResponse {
548 tables: Some(
549 tables
550 .into_iter()
551 .map(|t| wire::TableSummary {
552 keyspace_name: Some(t.keyspace_name.clone()),
553 table_name: Some(t.table_name.clone()),
554 resource_arn: Some(t.arn.clone()),
555 })
556 .collect(),
557 ),
558 ..Default::default()
559 }),
560 Err(e) => keyspaces_error_response(&e),
561 }
562 }
563
564 async fn handle_restore_table(
565 &self,
566 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
567 body: &[u8],
568 account_id: &str,
569 region: &str,
570 ) -> MockResponse {
571 let input = match wire::deserialize_restore_table_request(body) {
572 Ok(v) => v,
573 Err(e) => return json_error_response(400, "ValidationException", &e),
574 };
575 if input.source_keyspace_name.is_empty() {
576 return json_error_response(
577 400,
578 "ValidationException",
579 "sourceKeyspaceName is required",
580 );
581 }
582 if input.source_table_name.is_empty() {
583 return json_error_response(400, "ValidationException", "sourceTableName is required");
584 }
585 if input.target_keyspace_name.is_empty() {
586 return json_error_response(
587 400,
588 "ValidationException",
589 "targetKeyspaceName is required",
590 );
591 }
592 if input.target_table_name.is_empty() {
593 return json_error_response(400, "ValidationException", "targetTableName is required");
594 }
595
596 let mut guard = state.write().await;
597 match guard.restore_table(
598 &input.source_keyspace_name,
599 &input.source_table_name,
600 &input.target_keyspace_name,
601 &input.target_table_name,
602 account_id,
603 region,
604 ) {
605 Ok(arn) => wire::serialize_restore_table_response(&wire::RestoreTableResponse {
606 restored_table_a_r_n: Some(arn),
607 }),
608 Err(e) => keyspaces_error_response(&e),
609 }
610 }
611
612 async fn handle_get_table_auto_scaling_settings(
613 &self,
614 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
615 body: &[u8],
616 ) -> MockResponse {
617 let input = match wire::deserialize_get_table_auto_scaling_settings_request(body) {
618 Ok(v) => v,
619 Err(e) => return json_error_response(400, "ValidationException", &e),
620 };
621 if input.keyspace_name.is_empty() {
622 return json_error_response(400, "ValidationException", "keyspaceName is required");
623 }
624 if input.table_name.is_empty() {
625 return json_error_response(400, "ValidationException", "tableName is required");
626 }
627
628 let guard = state.read().await;
629 match guard.get_table(&input.keyspace_name, &input.table_name) {
630 Ok(table) => wire::serialize_get_table_auto_scaling_settings_response(
631 &wire::GetTableAutoScalingSettingsResponse {
632 keyspace_name: Some(table.keyspace_name.clone()),
633 table_name: Some(table.table_name.clone()),
634 resource_arn: Some(table.arn.clone()),
635 ..Default::default()
636 },
637 ),
638 Err(e) => keyspaces_error_response(&e),
639 }
640 }
641
642 async fn handle_create_type(
645 &self,
646 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
647 body: &[u8],
648 ) -> MockResponse {
649 let input = match wire::deserialize_create_type_request(body) {
650 Ok(v) => v,
651 Err(e) => return json_error_response(400, "ValidationException", &e),
652 };
653 if input.keyspace_name.is_empty() {
654 return json_error_response(400, "ValidationException", "keyspaceName is required");
655 }
656 if input.type_name.is_empty() {
657 return json_error_response(400, "ValidationException", "typeName is required");
658 }
659
660 let field_defs: Vec<types::FieldDefinition> = input
661 .field_definitions
662 .into_iter()
663 .map(|f| types::FieldDefinition {
664 name: f.name,
665 field_type: f.r#type,
666 })
667 .collect();
668
669 let mut guard = state.write().await;
670 let keyspace_arn = guard
672 .get_keyspace(&input.keyspace_name)
673 .ok()
674 .map(|ks| ks.arn.clone());
675
676 match guard.create_type(&input.keyspace_name, &input.type_name, field_defs) {
677 Ok(_) => wire::serialize_create_type_response(&wire::CreateTypeResponse {
678 keyspace_arn,
679 type_name: Some(input.type_name.clone()),
680 }),
681 Err(e) => keyspaces_error_response(&e),
682 }
683 }
684
685 async fn handle_get_type(
686 &self,
687 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
688 body: &[u8],
689 account_id: &str,
690 region: &str,
691 ) -> MockResponse {
692 let input = match wire::deserialize_get_type_request(body) {
693 Ok(v) => v,
694 Err(e) => return json_error_response(400, "ValidationException", &e),
695 };
696 if input.keyspace_name.is_empty() {
697 return json_error_response(400, "ValidationException", "keyspaceName is required");
698 }
699 if input.type_name.is_empty() {
700 return json_error_response(400, "ValidationException", "typeName is required");
701 }
702
703 let guard = state.read().await;
704 match guard.get_type(&input.keyspace_name, &input.type_name) {
705 Ok(udt) => {
706 let keyspace_arn = format!(
707 "arn:aws:cassandra:{region}:{account_id}:/keyspace/{}/",
708 input.keyspace_name
709 );
710 wire::serialize_get_type_response(&wire::GetTypeResponse {
711 keyspace_name: Some(udt.keyspace_name.clone()),
712 type_name: Some(udt.type_name.clone()),
713 keyspace_arn: Some(keyspace_arn),
714 field_definitions: Some(
715 udt.field_definitions
716 .iter()
717 .map(|f| wire::FieldDefinition {
718 name: f.name.clone(),
719 r#type: f.field_type.clone(),
720 })
721 .collect(),
722 ),
723 last_modified_timestamp: Some(udt.creation_timestamp.timestamp() as f64),
724 status: Some(udt.status.clone()),
725 ..Default::default()
726 })
727 }
728 Err(e) => keyspaces_error_response(&e),
729 }
730 }
731
732 async fn handle_delete_type(
733 &self,
734 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
735 body: &[u8],
736 account_id: &str,
737 region: &str,
738 ) -> MockResponse {
739 let input = match wire::deserialize_delete_type_request(body) {
740 Ok(v) => v,
741 Err(e) => return json_error_response(400, "ValidationException", &e),
742 };
743 if input.keyspace_name.is_empty() {
744 return json_error_response(400, "ValidationException", "keyspaceName is required");
745 }
746 if input.type_name.is_empty() {
747 return json_error_response(400, "ValidationException", "typeName is required");
748 }
749
750 let keyspace_arn = format!(
751 "arn:aws:cassandra:{region}:{account_id}:/keyspace/{}/",
752 input.keyspace_name
753 );
754
755 let mut guard = state.write().await;
756 match guard.delete_type(&input.keyspace_name, &input.type_name) {
757 Ok(()) => wire::serialize_delete_type_response(&wire::DeleteTypeResponse {
758 keyspace_arn: Some(keyspace_arn),
759 type_name: Some(input.type_name.clone()),
760 }),
761 Err(e) => keyspaces_error_response(&e),
762 }
763 }
764
765 async fn handle_list_types(
766 &self,
767 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
768 body: &[u8],
769 ) -> MockResponse {
770 let input = match wire::deserialize_list_types_request(body) {
771 Ok(v) => v,
772 Err(e) => return json_error_response(400, "ValidationException", &e),
773 };
774 if input.keyspace_name.is_empty() {
775 return json_error_response(400, "ValidationException", "keyspaceName is required");
776 }
777
778 let guard = state.read().await;
779 match guard.list_types(&input.keyspace_name) {
780 Ok(types) => wire::serialize_list_types_response(&wire::ListTypesResponse {
781 types: Some(types.into_iter().map(|t| t.type_name.clone()).collect()),
782 ..Default::default()
783 }),
784 Err(e) => keyspaces_error_response(&e),
785 }
786 }
787
788 async fn handle_tag_resource(
791 &self,
792 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
793 body: &[u8],
794 ) -> MockResponse {
795 let input = match wire::deserialize_tag_resource_request(body) {
796 Ok(v) => v,
797 Err(e) => return json_error_response(400, "ValidationException", &e),
798 };
799 if input.resource_arn.is_empty() {
800 return json_error_response(400, "ValidationException", "resourceArn is required");
801 }
802
803 let tags = tag_list_to_map(Some(input.tags.as_slice()));
804
805 let mut guard = state.write().await;
806 match guard.tag_resource(&input.resource_arn, tags) {
807 Ok(()) => wire::serialize_tag_resource_response(&wire::TagResourceResponse {}),
808 Err(e) => keyspaces_error_response(&e),
809 }
810 }
811
812 async fn handle_untag_resource(
813 &self,
814 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
815 body: &[u8],
816 ) -> MockResponse {
817 let input = match wire::deserialize_untag_resource_request(body) {
818 Ok(v) => v,
819 Err(e) => return json_error_response(400, "ValidationException", &e),
820 };
821 if input.resource_arn.is_empty() {
822 return json_error_response(400, "ValidationException", "resourceArn is required");
823 }
824
825 let tag_keys: Vec<String> = input.tags.into_iter().map(|t| t.key).collect();
826
827 let mut guard = state.write().await;
828 match guard.untag_resource(&input.resource_arn, &tag_keys) {
829 Ok(()) => wire::serialize_untag_resource_response(&wire::UntagResourceResponse {}),
830 Err(e) => keyspaces_error_response(&e),
831 }
832 }
833
834 async fn handle_list_tags_for_resource(
835 &self,
836 state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
837 body: &[u8],
838 ) -> MockResponse {
839 let input = match wire::deserialize_list_tags_for_resource_request(body) {
840 Ok(v) => v,
841 Err(e) => return json_error_response(400, "ValidationException", &e),
842 };
843 if input.resource_arn.is_empty() {
844 return json_error_response(400, "ValidationException", "resourceArn is required");
845 }
846
847 let guard = state.read().await;
848 match guard.get_tags_for_resource(&input.resource_arn) {
849 Ok(tags) => wire::serialize_list_tags_for_resource_response(
850 &wire::ListTagsForResourceResponse {
851 tags: Some(
852 tags.into_iter()
853 .map(|(k, v)| wire::Tag { key: k, value: v })
854 .collect(),
855 ),
856 ..Default::default()
857 },
858 ),
859 Err(e) => keyspaces_error_response(&e),
860 }
861 }
862}
863
864fn keyspaces_error_response(err: &KeyspacesError) -> MockResponse {
867 let (status, error_type) = match err {
868 KeyspacesError::NotFound { .. } => (404, "ResourceNotFoundException"),
869 KeyspacesError::AlreadyExists { .. } => (409, "ConflictException"),
870 KeyspacesError::Validation { .. } => (400, "ValidationException"),
871 KeyspacesError::Conflict { .. } => (409, "ConflictException"),
872 };
873 json_error_response(status, error_type, &err.to_string())
874}
875
876fn replication_spec_to_pair(
879 spec: Option<&wire::ReplicationSpecification>,
880) -> (String, Vec<String>) {
881 match spec {
882 Some(s) => {
883 let strategy = if s.replication_strategy.is_empty() {
884 "SINGLE_REGION".to_string()
885 } else {
886 s.replication_strategy.clone()
887 };
888 let regions = s.region_list.clone().unwrap_or_default();
889 (strategy, regions)
890 }
891 None => ("SINGLE_REGION".to_string(), Vec::new()),
892 }
893}
894
895fn tag_list_to_map(tags: Option<&[wire::Tag]>) -> HashMap<String, String> {
896 let mut map = HashMap::new();
897 if let Some(list) = tags {
898 for t in list {
899 map.insert(t.key.clone(), t.value.clone());
900 }
901 }
902 map
903}
904
905fn capacity_spec_to_tuple(
906 spec: Option<&wire::CapacitySpecification>,
907 default_mode: &str,
908) -> (String, Option<i64>, Option<i64>) {
909 match spec {
910 Some(s) => {
911 let mode = if s.throughput_mode.is_empty() {
912 default_mode.to_string()
913 } else {
914 s.throughput_mode.clone()
915 };
916 (mode, s.read_capacity_units, s.write_capacity_units)
917 }
918 None => (default_mode.to_string(), None, None),
919 }
920}
921
922fn encryption_spec_to_tuple(
923 spec: Option<&wire::EncryptionSpecification>,
924) -> (String, Option<String>) {
925 match spec {
926 Some(s) => {
927 let enc_type = if s.r#type.is_empty() {
928 "AWS_OWNED_KMS_KEY".to_string()
929 } else {
930 s.r#type.clone()
931 };
932 (enc_type, s.kms_key_identifier.clone())
933 }
934 None => ("AWS_OWNED_KMS_KEY".to_string(), None),
935 }
936}
937
938fn wire_schema_to_types(schema: &wire::SchemaDefinition) -> types::SchemaDefinition {
939 types::SchemaDefinition {
940 all_columns: schema
941 .all_columns
942 .iter()
943 .map(|c| types::ColumnDefinition {
944 name: c.name.clone(),
945 column_type: c.r#type.clone(),
946 })
947 .collect(),
948 partition_keys: schema
949 .partition_keys
950 .iter()
951 .map(|p| p.name.clone())
952 .collect(),
953 clustering_keys: schema
954 .clustering_keys
955 .as_deref()
956 .map(|cs| {
957 cs.iter()
958 .map(|c| types::ClusteringKey {
959 name: c.name.clone(),
960 order_by: if c.order_by.is_empty() {
961 "ASC".to_string()
962 } else {
963 c.order_by.clone()
964 },
965 })
966 .collect()
967 })
968 .unwrap_or_default(),
969 static_columns: schema
970 .static_columns
971 .as_deref()
972 .map(|cols| cols.iter().map(|c| c.name.clone()).collect())
973 .unwrap_or_default(),
974 }
975}
976
977fn table_schema_to_wire(schema: &types::SchemaDefinition) -> wire::SchemaDefinition {
978 wire::SchemaDefinition {
979 all_columns: schema
980 .all_columns
981 .iter()
982 .map(|c| wire::ColumnDefinition {
983 name: c.name.clone(),
984 r#type: c.column_type.clone(),
985 })
986 .collect(),
987 partition_keys: schema
988 .partition_keys
989 .iter()
990 .map(|p| wire::PartitionKey { name: p.clone() })
991 .collect(),
992 clustering_keys: if schema.clustering_keys.is_empty() {
993 None
994 } else {
995 Some(
996 schema
997 .clustering_keys
998 .iter()
999 .map(|c| wire::ClusteringKey {
1000 name: c.name.clone(),
1001 order_by: c.order_by.clone(),
1002 })
1003 .collect(),
1004 )
1005 },
1006 static_columns: if schema.static_columns.is_empty() {
1007 None
1008 } else {
1009 Some(
1010 schema
1011 .static_columns
1012 .iter()
1013 .map(|s| wire::StaticColumn { name: s.clone() })
1014 .collect(),
1015 )
1016 },
1017 }
1018}