Skip to main content

winterbaume_keyspaces/
handlers.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use serde_json::Value;
7use winterbaume_core::{
8    BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, StatefulService,
9    default_account_id, json_error_response,
10};
11
12use crate::state::{KeyspacesError, KeyspacesState};
13use crate::types;
14use crate::views::KeyspacesStateView;
15use crate::wire;
16
17pub struct KeyspacesService {
18    pub(crate) state: Arc<BackendState<KeyspacesState>>,
19    pub(crate) notifier: StateChangeNotifier<KeyspacesStateView>,
20}
21
22impl KeyspacesService {
23    pub fn new() -> Self {
24        Self {
25            state: Arc::new(BackendState::new()),
26            notifier: StateChangeNotifier::new(),
27        }
28    }
29}
30
31impl Default for KeyspacesService {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl MockService for KeyspacesService {
38    fn service_name(&self) -> &str {
39        "cassandra"
40    }
41
42    fn url_patterns(&self) -> Vec<&str> {
43        vec![
44            r"https?://cassandra\..*\.amazonaws\.com",
45            r"https?://cassandra\.amazonaws\.com",
46        ]
47    }
48
49    fn handle(
50        &self,
51        request: MockRequest,
52    ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
53        Box::pin(async move { self.dispatch(request).await })
54    }
55}
56
57impl KeyspacesService {
58    async fn dispatch(&self, request: MockRequest) -> MockResponse {
59        let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
60        let account_id = default_account_id();
61
62        let action = request
63            .headers
64            .get("x-amz-target")
65            .and_then(|v| v.to_str().ok())
66            .and_then(|v| v.split('.').next_back())
67            .map(|s| s.to_string());
68
69        let action = match action {
70            Some(a) => a,
71            None => {
72                return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
73            }
74        };
75
76        // Validate the body is well-formed JSON up-front; the typed deserialisers in
77        // `wire` re-parse the bytes per operation.
78        if !request.body.is_empty() && serde_json::from_slice::<Value>(&request.body).is_err() {
79            return json_error_response(400, "SerializationException", "Invalid JSON body");
80        }
81        let body_bytes: &[u8] = &request.body;
82
83        let state = self.state.get(account_id, &region);
84
85        let response = match action.as_str() {
86            "CreateKeyspace" => {
87                self.handle_create_keyspace(&state, body_bytes, account_id, &region)
88                    .await
89            }
90            "GetKeyspace" => self.handle_get_keyspace(&state, body_bytes).await,
91            "DeleteKeyspace" => self.handle_delete_keyspace(&state, body_bytes).await,
92            "UpdateKeyspace" => self.handle_update_keyspace(&state, body_bytes).await,
93            "ListKeyspaces" => self.handle_list_keyspaces(&state).await,
94            "CreateTable" => {
95                self.handle_create_table(&state, body_bytes, account_id, &region)
96                    .await
97            }
98            "GetTable" => self.handle_get_table(&state, body_bytes).await,
99            "DeleteTable" => self.handle_delete_table(&state, body_bytes).await,
100            "UpdateTable" => self.handle_update_table(&state, body_bytes).await,
101            "ListTables" => self.handle_list_tables(&state, body_bytes).await,
102            "RestoreTable" => {
103                self.handle_restore_table(&state, body_bytes, account_id, &region)
104                    .await
105            }
106            "GetTableAutoScalingSettings" => {
107                self.handle_get_table_auto_scaling_settings(&state, body_bytes)
108                    .await
109            }
110            "CreateType" => self.handle_create_type(&state, body_bytes).await,
111            "GetType" => {
112                self.handle_get_type(&state, body_bytes, account_id, &region)
113                    .await
114            }
115            "DeleteType" => {
116                self.handle_delete_type(&state, body_bytes, account_id, &region)
117                    .await
118            }
119            "ListTypes" => self.handle_list_types(&state, body_bytes).await,
120            "TagResource" => self.handle_tag_resource(&state, body_bytes).await,
121            "UntagResource" => self.handle_untag_resource(&state, body_bytes).await,
122            "ListTagsForResource" => self.handle_list_tags_for_resource(&state, body_bytes).await,
123            _ => json_error_response(400, "InvalidAction", &format!("Unknown operation {action}")),
124        };
125
126        if response.status / 100 == 2 {
127            self.notify_state_changed(account_id, &region).await;
128        }
129        response
130    }
131
132    // ---- Keyspace handlers ----
133
134    async fn handle_create_keyspace(
135        &self,
136        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
137        body: &[u8],
138        account_id: &str,
139        region: &str,
140    ) -> MockResponse {
141        let input = match wire::deserialize_create_keyspace_request(body) {
142            Ok(v) => v,
143            Err(e) => return json_error_response(400, "ValidationException", &e),
144        };
145        if input.keyspace_name.is_empty() {
146            return json_error_response(400, "ValidationException", "keyspaceName is required");
147        }
148
149        let (replication_strategy, replication_regions) =
150            replication_spec_to_pair(input.replication_specification.as_ref());
151        let tags = tag_list_to_map(input.tags.as_deref());
152
153        let mut guard = state.write().await;
154        match guard.create_keyspace(
155            &input.keyspace_name,
156            &replication_strategy,
157            replication_regions,
158            tags,
159            account_id,
160            region,
161        ) {
162            Ok(arn) => wire::serialize_create_keyspace_response(&wire::CreateKeyspaceResponse {
163                resource_arn: Some(arn),
164            }),
165            Err(e) => keyspaces_error_response(&e),
166        }
167    }
168
169    async fn handle_get_keyspace(
170        &self,
171        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
172        body: &[u8],
173    ) -> MockResponse {
174        let input = match wire::deserialize_get_keyspace_request(body) {
175            Ok(v) => v,
176            Err(e) => return json_error_response(400, "ValidationException", &e),
177        };
178        if input.keyspace_name.is_empty() {
179            return json_error_response(400, "ValidationException", "keyspaceName is required");
180        }
181
182        let guard = state.read().await;
183        match guard.get_keyspace(&input.keyspace_name) {
184            Ok(ks) => wire::serialize_get_keyspace_response(&wire::GetKeyspaceResponse {
185                keyspace_name: Some(ks.name.clone()),
186                resource_arn: Some(ks.arn.clone()),
187                replication_strategy: Some(ks.replication_strategy.clone()),
188                replication_regions: if ks.replication_regions.is_empty() {
189                    None
190                } else {
191                    Some(ks.replication_regions.clone())
192                },
193                ..Default::default()
194            }),
195            Err(e) => keyspaces_error_response(&e),
196        }
197    }
198
199    async fn handle_delete_keyspace(
200        &self,
201        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
202        body: &[u8],
203    ) -> MockResponse {
204        let input = match wire::deserialize_delete_keyspace_request(body) {
205            Ok(v) => v,
206            Err(e) => return json_error_response(400, "ValidationException", &e),
207        };
208        if input.keyspace_name.is_empty() {
209            return json_error_response(400, "ValidationException", "keyspaceName is required");
210        }
211
212        let mut guard = state.write().await;
213        match guard.delete_keyspace(&input.keyspace_name) {
214            Ok(()) => wire::serialize_delete_keyspace_response(&wire::DeleteKeyspaceResponse {}),
215            Err(e) => keyspaces_error_response(&e),
216        }
217    }
218
219    async fn handle_update_keyspace(
220        &self,
221        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
222        body: &[u8],
223    ) -> MockResponse {
224        let input = match wire::deserialize_update_keyspace_request(body) {
225            Ok(v) => v,
226            Err(e) => return json_error_response(400, "ValidationException", &e),
227        };
228        if input.keyspace_name.is_empty() {
229            return json_error_response(400, "ValidationException", "keyspaceName is required");
230        }
231
232        let (replication_strategy, replication_regions) =
233            replication_spec_to_pair(Some(&input.replication_specification));
234
235        let mut guard = state.write().await;
236        match guard.update_keyspace(
237            &input.keyspace_name,
238            &replication_strategy,
239            replication_regions,
240        ) {
241            Ok(arn) => wire::serialize_update_keyspace_response(&wire::UpdateKeyspaceResponse {
242                resource_arn: Some(arn),
243            }),
244            Err(e) => keyspaces_error_response(&e),
245        }
246    }
247
248    async fn handle_list_keyspaces(
249        &self,
250        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
251    ) -> MockResponse {
252        let guard = state.read().await;
253        let keyspaces = guard.list_keyspaces();
254
255        wire::serialize_list_keyspaces_response(&wire::ListKeyspacesResponse {
256            keyspaces: Some(
257                keyspaces
258                    .into_iter()
259                    .map(|ks| wire::KeyspaceSummary {
260                        keyspace_name: Some(ks.name.clone()),
261                        resource_arn: Some(ks.arn.clone()),
262                        replication_strategy: Some(ks.replication_strategy.clone()),
263                        replication_regions: if ks.replication_regions.is_empty() {
264                            None
265                        } else {
266                            Some(ks.replication_regions.clone())
267                        },
268                    })
269                    .collect(),
270            ),
271            ..Default::default()
272        })
273    }
274
275    // ---- Table handlers ----
276
277    async fn handle_create_table(
278        &self,
279        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
280        body: &[u8],
281        account_id: &str,
282        region: &str,
283    ) -> MockResponse {
284        let input = match wire::deserialize_create_table_request(body) {
285            Ok(v) => v,
286            Err(e) => return json_error_response(400, "ValidationException", &e),
287        };
288        if input.keyspace_name.is_empty() {
289            return json_error_response(400, "ValidationException", "keyspaceName is required");
290        }
291        if input.table_name.is_empty() {
292            return json_error_response(400, "ValidationException", "tableName is required");
293        }
294
295        // The Smithy schemaDefinition member is required, but defaults to empty when absent.
296        // Treat an empty allColumns/partitionKeys as missing to preserve existing behaviour.
297        if input.schema_definition.all_columns.is_empty()
298            && input.schema_definition.partition_keys.is_empty()
299        {
300            return json_error_response(400, "ValidationException", "schemaDefinition is required");
301        }
302        let schema = wire_schema_to_types(&input.schema_definition);
303
304        let (capacity_mode, rcu, wcu) =
305            capacity_spec_to_tuple(input.capacity_specification.as_ref(), "PAY_PER_REQUEST");
306        let (encryption_type, kms_key) =
307            encryption_spec_to_tuple(input.encryption_specification.as_ref());
308        let pitr = input
309            .point_in_time_recovery
310            .as_ref()
311            .map(|p| p.status == "ENABLED")
312            .unwrap_or(false);
313        let ttl_status = input
314            .ttl
315            .as_ref()
316            .map(|t| t.status.clone())
317            .unwrap_or_else(|| "ENABLED".to_string());
318        let default_ttl = input.default_time_to_live;
319        let comment = input
320            .comment
321            .as_ref()
322            .map(|c| c.message.clone())
323            .unwrap_or_default();
324        let cst = input
325            .client_side_timestamps
326            .as_ref()
327            .map(|c| c.status == "ENABLED")
328            .unwrap_or(false);
329        let tags = tag_list_to_map(input.tags.as_deref());
330
331        let mut guard = state.write().await;
332        match guard.create_table(
333            &input.keyspace_name,
334            &input.table_name,
335            schema,
336            &capacity_mode,
337            rcu,
338            wcu,
339            &encryption_type,
340            kms_key,
341            pitr,
342            &ttl_status,
343            default_ttl,
344            &comment,
345            cst,
346            tags,
347            account_id,
348            region,
349        ) {
350            Ok(arn) => wire::serialize_create_table_response(&wire::CreateTableResponse {
351                resource_arn: Some(arn),
352            }),
353            Err(e) => keyspaces_error_response(&e),
354        }
355    }
356
357    async fn handle_get_table(
358        &self,
359        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
360        body: &[u8],
361    ) -> MockResponse {
362        let input = match wire::deserialize_get_table_request(body) {
363            Ok(v) => v,
364            Err(e) => return json_error_response(400, "ValidationException", &e),
365        };
366        if input.keyspace_name.is_empty() {
367            return json_error_response(400, "ValidationException", "keyspaceName is required");
368        }
369        if input.table_name.is_empty() {
370            return json_error_response(400, "ValidationException", "tableName is required");
371        }
372
373        let guard = state.read().await;
374        match guard.get_table(&input.keyspace_name, &input.table_name) {
375            Ok(table) => wire::serialize_get_table_response(&wire::GetTableResponse {
376                keyspace_name: Some(table.keyspace_name.clone()),
377                table_name: Some(table.table_name.clone()),
378                resource_arn: Some(table.arn.clone()),
379                creation_timestamp: Some(table.creation_timestamp.timestamp() as f64),
380                status: Some(table.status.clone()),
381                schema_definition: Some(table_schema_to_wire(&table.schema_definition)),
382                capacity_specification: Some(wire::CapacitySpecificationSummary {
383                    throughput_mode: Some(table.capacity_mode.clone()),
384                    read_capacity_units: table.read_capacity_units,
385                    write_capacity_units: table.write_capacity_units,
386                    ..Default::default()
387                }),
388                encryption_specification: Some(wire::EncryptionSpecification {
389                    r#type: table.encryption_type.clone(),
390                    kms_key_identifier: table.kms_key_identifier.clone(),
391                }),
392                point_in_time_recovery: Some(wire::PointInTimeRecoverySummary {
393                    status: Some(if table.point_in_time_recovery_enabled {
394                        "ENABLED".to_string()
395                    } else {
396                        "DISABLED".to_string()
397                    }),
398                    ..Default::default()
399                }),
400                ttl: Some(wire::TimeToLive {
401                    status: table.ttl_status.clone(),
402                }),
403                default_time_to_live: table.default_time_to_live,
404                comment: if table.comment.is_empty() {
405                    None
406                } else {
407                    Some(wire::Comment {
408                        message: table.comment.clone(),
409                    })
410                },
411                client_side_timestamps: Some(wire::ClientSideTimestamps {
412                    status: if table.client_side_timestamps_enabled {
413                        "ENABLED".to_string()
414                    } else {
415                        "DISABLED".to_string()
416                    },
417                }),
418                ..Default::default()
419            }),
420            Err(e) => keyspaces_error_response(&e),
421        }
422    }
423
424    async fn handle_delete_table(
425        &self,
426        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
427        body: &[u8],
428    ) -> MockResponse {
429        let input = match wire::deserialize_delete_table_request(body) {
430            Ok(v) => v,
431            Err(e) => return json_error_response(400, "ValidationException", &e),
432        };
433        if input.keyspace_name.is_empty() {
434            return json_error_response(400, "ValidationException", "keyspaceName is required");
435        }
436        if input.table_name.is_empty() {
437            return json_error_response(400, "ValidationException", "tableName is required");
438        }
439
440        let mut guard = state.write().await;
441        match guard.delete_table(&input.keyspace_name, &input.table_name) {
442            Ok(()) => wire::serialize_delete_table_response(&wire::DeleteTableResponse {}),
443            Err(e) => keyspaces_error_response(&e),
444        }
445    }
446
447    async fn handle_update_table(
448        &self,
449        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
450        body: &[u8],
451    ) -> MockResponse {
452        let input = match wire::deserialize_update_table_request(body) {
453            Ok(v) => v,
454            Err(e) => return json_error_response(400, "ValidationException", &e),
455        };
456        if input.keyspace_name.is_empty() {
457            return json_error_response(400, "ValidationException", "keyspaceName is required");
458        }
459        if input.table_name.is_empty() {
460            return json_error_response(400, "ValidationException", "tableName is required");
461        }
462
463        let capacity_mode = input
464            .capacity_specification
465            .as_ref()
466            .map(|c| c.throughput_mode.clone());
467        let rcu = input
468            .capacity_specification
469            .as_ref()
470            .and_then(|c| c.read_capacity_units);
471        let wcu = input
472            .capacity_specification
473            .as_ref()
474            .and_then(|c| c.write_capacity_units);
475        let encryption_type = input
476            .encryption_specification
477            .as_ref()
478            .map(|e| e.r#type.clone());
479        let kms_key = input
480            .encryption_specification
481            .as_ref()
482            .and_then(|e| e.kms_key_identifier.clone());
483        let pitr = input
484            .point_in_time_recovery
485            .as_ref()
486            .map(|p| p.status == "ENABLED");
487        let ttl_status = input.ttl.as_ref().map(|t| t.status.clone());
488        let default_ttl = input.default_time_to_live;
489        let cst = input
490            .client_side_timestamps
491            .as_ref()
492            .map(|c| c.status == "ENABLED");
493
494        let mut guard = state.write().await;
495
496        // Handle addColumns first.
497        if let Some(add_cols) = input.add_columns.as_deref() {
498            let key = (input.keyspace_name.clone(), input.table_name.clone());
499            if let Some(table) = guard.tables.get_mut(&key) {
500                for col in add_cols {
501                    table
502                        .schema_definition
503                        .all_columns
504                        .push(types::ColumnDefinition {
505                            name: col.name.clone(),
506                            column_type: col.r#type.clone(),
507                        });
508                }
509            }
510        }
511
512        match guard.update_table(
513            &input.keyspace_name,
514            &input.table_name,
515            capacity_mode.as_deref(),
516            rcu,
517            wcu,
518            encryption_type.as_deref(),
519            kms_key,
520            pitr,
521            ttl_status.as_deref(),
522            default_ttl,
523            cst,
524        ) {
525            Ok(arn) => wire::serialize_update_table_response(&wire::UpdateTableResponse {
526                resource_arn: Some(arn),
527            }),
528            Err(e) => keyspaces_error_response(&e),
529        }
530    }
531
532    async fn handle_list_tables(
533        &self,
534        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
535        body: &[u8],
536    ) -> MockResponse {
537        let input = match wire::deserialize_list_tables_request(body) {
538            Ok(v) => v,
539            Err(e) => return json_error_response(400, "ValidationException", &e),
540        };
541        if input.keyspace_name.is_empty() {
542            return json_error_response(400, "ValidationException", "keyspaceName is required");
543        }
544
545        let guard = state.read().await;
546        match guard.list_tables(&input.keyspace_name) {
547            Ok(tables) => wire::serialize_list_tables_response(&wire::ListTablesResponse {
548                tables: Some(
549                    tables
550                        .into_iter()
551                        .map(|t| wire::TableSummary {
552                            keyspace_name: Some(t.keyspace_name.clone()),
553                            table_name: Some(t.table_name.clone()),
554                            resource_arn: Some(t.arn.clone()),
555                        })
556                        .collect(),
557                ),
558                ..Default::default()
559            }),
560            Err(e) => keyspaces_error_response(&e),
561        }
562    }
563
564    async fn handle_restore_table(
565        &self,
566        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
567        body: &[u8],
568        account_id: &str,
569        region: &str,
570    ) -> MockResponse {
571        let input = match wire::deserialize_restore_table_request(body) {
572            Ok(v) => v,
573            Err(e) => return json_error_response(400, "ValidationException", &e),
574        };
575        if input.source_keyspace_name.is_empty() {
576            return json_error_response(
577                400,
578                "ValidationException",
579                "sourceKeyspaceName is required",
580            );
581        }
582        if input.source_table_name.is_empty() {
583            return json_error_response(400, "ValidationException", "sourceTableName is required");
584        }
585        if input.target_keyspace_name.is_empty() {
586            return json_error_response(
587                400,
588                "ValidationException",
589                "targetKeyspaceName is required",
590            );
591        }
592        if input.target_table_name.is_empty() {
593            return json_error_response(400, "ValidationException", "targetTableName is required");
594        }
595
596        let mut guard = state.write().await;
597        match guard.restore_table(
598            &input.source_keyspace_name,
599            &input.source_table_name,
600            &input.target_keyspace_name,
601            &input.target_table_name,
602            account_id,
603            region,
604        ) {
605            Ok(arn) => wire::serialize_restore_table_response(&wire::RestoreTableResponse {
606                restored_table_a_r_n: Some(arn),
607            }),
608            Err(e) => keyspaces_error_response(&e),
609        }
610    }
611
612    async fn handle_get_table_auto_scaling_settings(
613        &self,
614        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
615        body: &[u8],
616    ) -> MockResponse {
617        let input = match wire::deserialize_get_table_auto_scaling_settings_request(body) {
618            Ok(v) => v,
619            Err(e) => return json_error_response(400, "ValidationException", &e),
620        };
621        if input.keyspace_name.is_empty() {
622            return json_error_response(400, "ValidationException", "keyspaceName is required");
623        }
624        if input.table_name.is_empty() {
625            return json_error_response(400, "ValidationException", "tableName is required");
626        }
627
628        let guard = state.read().await;
629        match guard.get_table(&input.keyspace_name, &input.table_name) {
630            Ok(table) => wire::serialize_get_table_auto_scaling_settings_response(
631                &wire::GetTableAutoScalingSettingsResponse {
632                    keyspace_name: Some(table.keyspace_name.clone()),
633                    table_name: Some(table.table_name.clone()),
634                    resource_arn: Some(table.arn.clone()),
635                    ..Default::default()
636                },
637            ),
638            Err(e) => keyspaces_error_response(&e),
639        }
640    }
641
642    // ---- Type handlers ----
643
644    async fn handle_create_type(
645        &self,
646        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
647        body: &[u8],
648    ) -> MockResponse {
649        let input = match wire::deserialize_create_type_request(body) {
650            Ok(v) => v,
651            Err(e) => return json_error_response(400, "ValidationException", &e),
652        };
653        if input.keyspace_name.is_empty() {
654            return json_error_response(400, "ValidationException", "keyspaceName is required");
655        }
656        if input.type_name.is_empty() {
657            return json_error_response(400, "ValidationException", "typeName is required");
658        }
659
660        let field_defs: Vec<types::FieldDefinition> = input
661            .field_definitions
662            .into_iter()
663            .map(|f| types::FieldDefinition {
664                name: f.name,
665                field_type: f.r#type,
666            })
667            .collect();
668
669        let mut guard = state.write().await;
670        // Get keyspace ARN for the response.
671        let keyspace_arn = guard
672            .get_keyspace(&input.keyspace_name)
673            .ok()
674            .map(|ks| ks.arn.clone());
675
676        match guard.create_type(&input.keyspace_name, &input.type_name, field_defs) {
677            Ok(_) => wire::serialize_create_type_response(&wire::CreateTypeResponse {
678                keyspace_arn,
679                type_name: Some(input.type_name.clone()),
680            }),
681            Err(e) => keyspaces_error_response(&e),
682        }
683    }
684
685    async fn handle_get_type(
686        &self,
687        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
688        body: &[u8],
689        account_id: &str,
690        region: &str,
691    ) -> MockResponse {
692        let input = match wire::deserialize_get_type_request(body) {
693            Ok(v) => v,
694            Err(e) => return json_error_response(400, "ValidationException", &e),
695        };
696        if input.keyspace_name.is_empty() {
697            return json_error_response(400, "ValidationException", "keyspaceName is required");
698        }
699        if input.type_name.is_empty() {
700            return json_error_response(400, "ValidationException", "typeName is required");
701        }
702
703        let guard = state.read().await;
704        match guard.get_type(&input.keyspace_name, &input.type_name) {
705            Ok(udt) => {
706                let keyspace_arn = format!(
707                    "arn:aws:cassandra:{region}:{account_id}:/keyspace/{}/",
708                    input.keyspace_name
709                );
710                wire::serialize_get_type_response(&wire::GetTypeResponse {
711                    keyspace_name: Some(udt.keyspace_name.clone()),
712                    type_name: Some(udt.type_name.clone()),
713                    keyspace_arn: Some(keyspace_arn),
714                    field_definitions: Some(
715                        udt.field_definitions
716                            .iter()
717                            .map(|f| wire::FieldDefinition {
718                                name: f.name.clone(),
719                                r#type: f.field_type.clone(),
720                            })
721                            .collect(),
722                    ),
723                    last_modified_timestamp: Some(udt.creation_timestamp.timestamp() as f64),
724                    status: Some(udt.status.clone()),
725                    ..Default::default()
726                })
727            }
728            Err(e) => keyspaces_error_response(&e),
729        }
730    }
731
732    async fn handle_delete_type(
733        &self,
734        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
735        body: &[u8],
736        account_id: &str,
737        region: &str,
738    ) -> MockResponse {
739        let input = match wire::deserialize_delete_type_request(body) {
740            Ok(v) => v,
741            Err(e) => return json_error_response(400, "ValidationException", &e),
742        };
743        if input.keyspace_name.is_empty() {
744            return json_error_response(400, "ValidationException", "keyspaceName is required");
745        }
746        if input.type_name.is_empty() {
747            return json_error_response(400, "ValidationException", "typeName is required");
748        }
749
750        let keyspace_arn = format!(
751            "arn:aws:cassandra:{region}:{account_id}:/keyspace/{}/",
752            input.keyspace_name
753        );
754
755        let mut guard = state.write().await;
756        match guard.delete_type(&input.keyspace_name, &input.type_name) {
757            Ok(()) => wire::serialize_delete_type_response(&wire::DeleteTypeResponse {
758                keyspace_arn: Some(keyspace_arn),
759                type_name: Some(input.type_name.clone()),
760            }),
761            Err(e) => keyspaces_error_response(&e),
762        }
763    }
764
765    async fn handle_list_types(
766        &self,
767        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
768        body: &[u8],
769    ) -> MockResponse {
770        let input = match wire::deserialize_list_types_request(body) {
771            Ok(v) => v,
772            Err(e) => return json_error_response(400, "ValidationException", &e),
773        };
774        if input.keyspace_name.is_empty() {
775            return json_error_response(400, "ValidationException", "keyspaceName is required");
776        }
777
778        let guard = state.read().await;
779        match guard.list_types(&input.keyspace_name) {
780            Ok(types) => wire::serialize_list_types_response(&wire::ListTypesResponse {
781                types: Some(types.into_iter().map(|t| t.type_name.clone()).collect()),
782                ..Default::default()
783            }),
784            Err(e) => keyspaces_error_response(&e),
785        }
786    }
787
788    // ---- Tag handlers ----
789
790    async fn handle_tag_resource(
791        &self,
792        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
793        body: &[u8],
794    ) -> MockResponse {
795        let input = match wire::deserialize_tag_resource_request(body) {
796            Ok(v) => v,
797            Err(e) => return json_error_response(400, "ValidationException", &e),
798        };
799        if input.resource_arn.is_empty() {
800            return json_error_response(400, "ValidationException", "resourceArn is required");
801        }
802
803        let tags = tag_list_to_map(Some(input.tags.as_slice()));
804
805        let mut guard = state.write().await;
806        match guard.tag_resource(&input.resource_arn, tags) {
807            Ok(()) => wire::serialize_tag_resource_response(&wire::TagResourceResponse {}),
808            Err(e) => keyspaces_error_response(&e),
809        }
810    }
811
812    async fn handle_untag_resource(
813        &self,
814        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
815        body: &[u8],
816    ) -> MockResponse {
817        let input = match wire::deserialize_untag_resource_request(body) {
818            Ok(v) => v,
819            Err(e) => return json_error_response(400, "ValidationException", &e),
820        };
821        if input.resource_arn.is_empty() {
822            return json_error_response(400, "ValidationException", "resourceArn is required");
823        }
824
825        let tag_keys: Vec<String> = input.tags.into_iter().map(|t| t.key).collect();
826
827        let mut guard = state.write().await;
828        match guard.untag_resource(&input.resource_arn, &tag_keys) {
829            Ok(()) => wire::serialize_untag_resource_response(&wire::UntagResourceResponse {}),
830            Err(e) => keyspaces_error_response(&e),
831        }
832    }
833
834    async fn handle_list_tags_for_resource(
835        &self,
836        state: &Arc<tokio::sync::RwLock<KeyspacesState>>,
837        body: &[u8],
838    ) -> MockResponse {
839        let input = match wire::deserialize_list_tags_for_resource_request(body) {
840            Ok(v) => v,
841            Err(e) => return json_error_response(400, "ValidationException", &e),
842        };
843        if input.resource_arn.is_empty() {
844            return json_error_response(400, "ValidationException", "resourceArn is required");
845        }
846
847        let guard = state.read().await;
848        match guard.get_tags_for_resource(&input.resource_arn) {
849            Ok(tags) => wire::serialize_list_tags_for_resource_response(
850                &wire::ListTagsForResourceResponse {
851                    tags: Some(
852                        tags.into_iter()
853                            .map(|(k, v)| wire::Tag { key: k, value: v })
854                            .collect(),
855                    ),
856                    ..Default::default()
857                },
858            ),
859            Err(e) => keyspaces_error_response(&e),
860        }
861    }
862}
863
864// ---- Error shaping ----
865
866fn keyspaces_error_response(err: &KeyspacesError) -> MockResponse {
867    let (status, error_type) = match err {
868        KeyspacesError::NotFound { .. } => (404, "ResourceNotFoundException"),
869        KeyspacesError::AlreadyExists { .. } => (409, "ConflictException"),
870        KeyspacesError::Validation { .. } => (400, "ValidationException"),
871        KeyspacesError::Conflict { .. } => (409, "ConflictException"),
872    };
873    json_error_response(status, error_type, &err.to_string())
874}
875
876// ---- Wire to domain helpers ----
877
878fn replication_spec_to_pair(
879    spec: Option<&wire::ReplicationSpecification>,
880) -> (String, Vec<String>) {
881    match spec {
882        Some(s) => {
883            let strategy = if s.replication_strategy.is_empty() {
884                "SINGLE_REGION".to_string()
885            } else {
886                s.replication_strategy.clone()
887            };
888            let regions = s.region_list.clone().unwrap_or_default();
889            (strategy, regions)
890        }
891        None => ("SINGLE_REGION".to_string(), Vec::new()),
892    }
893}
894
895fn tag_list_to_map(tags: Option<&[wire::Tag]>) -> HashMap<String, String> {
896    let mut map = HashMap::new();
897    if let Some(list) = tags {
898        for t in list {
899            map.insert(t.key.clone(), t.value.clone());
900        }
901    }
902    map
903}
904
905fn capacity_spec_to_tuple(
906    spec: Option<&wire::CapacitySpecification>,
907    default_mode: &str,
908) -> (String, Option<i64>, Option<i64>) {
909    match spec {
910        Some(s) => {
911            let mode = if s.throughput_mode.is_empty() {
912                default_mode.to_string()
913            } else {
914                s.throughput_mode.clone()
915            };
916            (mode, s.read_capacity_units, s.write_capacity_units)
917        }
918        None => (default_mode.to_string(), None, None),
919    }
920}
921
922fn encryption_spec_to_tuple(
923    spec: Option<&wire::EncryptionSpecification>,
924) -> (String, Option<String>) {
925    match spec {
926        Some(s) => {
927            let enc_type = if s.r#type.is_empty() {
928                "AWS_OWNED_KMS_KEY".to_string()
929            } else {
930                s.r#type.clone()
931            };
932            (enc_type, s.kms_key_identifier.clone())
933        }
934        None => ("AWS_OWNED_KMS_KEY".to_string(), None),
935    }
936}
937
938fn wire_schema_to_types(schema: &wire::SchemaDefinition) -> types::SchemaDefinition {
939    types::SchemaDefinition {
940        all_columns: schema
941            .all_columns
942            .iter()
943            .map(|c| types::ColumnDefinition {
944                name: c.name.clone(),
945                column_type: c.r#type.clone(),
946            })
947            .collect(),
948        partition_keys: schema
949            .partition_keys
950            .iter()
951            .map(|p| p.name.clone())
952            .collect(),
953        clustering_keys: schema
954            .clustering_keys
955            .as_deref()
956            .map(|cs| {
957                cs.iter()
958                    .map(|c| types::ClusteringKey {
959                        name: c.name.clone(),
960                        order_by: if c.order_by.is_empty() {
961                            "ASC".to_string()
962                        } else {
963                            c.order_by.clone()
964                        },
965                    })
966                    .collect()
967            })
968            .unwrap_or_default(),
969        static_columns: schema
970            .static_columns
971            .as_deref()
972            .map(|cols| cols.iter().map(|c| c.name.clone()).collect())
973            .unwrap_or_default(),
974    }
975}
976
977fn table_schema_to_wire(schema: &types::SchemaDefinition) -> wire::SchemaDefinition {
978    wire::SchemaDefinition {
979        all_columns: schema
980            .all_columns
981            .iter()
982            .map(|c| wire::ColumnDefinition {
983                name: c.name.clone(),
984                r#type: c.column_type.clone(),
985            })
986            .collect(),
987        partition_keys: schema
988            .partition_keys
989            .iter()
990            .map(|p| wire::PartitionKey { name: p.clone() })
991            .collect(),
992        clustering_keys: if schema.clustering_keys.is_empty() {
993            None
994        } else {
995            Some(
996                schema
997                    .clustering_keys
998                    .iter()
999                    .map(|c| wire::ClusteringKey {
1000                        name: c.name.clone(),
1001                        order_by: c.order_by.clone(),
1002                    })
1003                    .collect(),
1004            )
1005        },
1006        static_columns: if schema.static_columns.is_empty() {
1007            None
1008        } else {
1009            Some(
1010                schema
1011                    .static_columns
1012                    .iter()
1013                    .map(|s| wire::StaticColumn { name: s.clone() })
1014                    .collect(),
1015            )
1016        },
1017    }
1018}