Skip to main content

dynoxide/
server.rs

1//! Axum-based HTTP server exposing the DynamoDB JSON API.
2//!
3//! Only compiled with the `http-server` feature flag.
4
5use crate::Database;
6use axum::{
7    Router,
8    body::Body,
9    extract::{DefaultBodyLimit, State},
10    http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, header::SERVER},
11    response::Response,
12    routing::any,
13};
14use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream};
15use std::time::Duration;
16use tower_http::set_header::SetResponseHeaderLayer;
17
18/// AWS region used in the health-check response body (mirrors DynamoDB Local behaviour).
19const AWS_REGION: &str = "us-east-1";
20
21const CONTENT_TYPE: &str = "application/x-amz-json-1.0";
22const TARGET_PREFIX: &str = "DynamoDB_20120810.";
23const STREAMS_TARGET_PREFIX: &str = "DynamoDBStreams_20120810.";
24
25/// Check whether the port is already in use by attempting a TCP connection.
26///
27/// Probes both the requested address and the cross-address (wildcard vs localhost)
28/// to detect conflicts like Docker binding `0.0.0.0` when dynoxide requests `127.0.0.1`.
29fn check_port_available(addr: SocketAddr) -> Result<(), String> {
30    let timeout = Duration::from_millis(100);
31    let port = addr.port();
32
33    // Cross-check: if binding to loopback, also probe the wildcard (and vice versa),
34    // within the same address family.
35    let cross = SocketAddr::new(
36        match addr.ip() {
37            IpAddr::V4(ip) if ip.is_loopback() => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
38            IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST),
39            IpAddr::V6(ip) if ip.is_loopback() => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
40            IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST),
41        },
42        port,
43    );
44
45    for probe in [addr, cross] {
46        if TcpStream::connect_timeout(&probe, timeout).is_ok() {
47            return Err(format!(
48                "port {port} is already in use (detected listener on {probe})"
49            ));
50        }
51    }
52    Ok(())
53}
54
55/// Bind a TCP listener.
56///
57/// `SO_REUSEADDR` on Unix lets us rebind past `TIME_WAIT` sockets from a
58/// previous clean shutdown. It doesn't let two live listeners share a port
59/// (that's `SO_REUSEPORT`), so port-conflict detection still works. Not set
60/// on Windows: the semantics there allow hijacking an active bind.
61fn bind_exclusive(addr: SocketAddr) -> Result<std::net::TcpListener, String> {
62    use socket2::{Domain, Protocol, Socket, Type};
63
64    let domain = if addr.is_ipv6() {
65        Domain::IPV6
66    } else {
67        Domain::IPV4
68    };
69
70    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
71        .map_err(|e| format!("failed to create socket: {e}"))?;
72
73    #[cfg(unix)]
74    socket
75        .set_reuse_address(true)
76        .map_err(|e| format!("failed to set SO_REUSEADDR: {e}"))?;
77
78    socket
79        .set_nonblocking(true)
80        .map_err(|e| format!("failed to set nonblocking: {e}"))?;
81    socket
82        .bind(&addr.into())
83        .map_err(|e| format!("failed to bind {addr}: {e}"))?;
84    socket
85        .listen(1024)
86        .map_err(|e| format!("failed to listen on {addr}: {e}"))?;
87
88    Ok(std::net::TcpListener::from(socket))
89}
90
91/// Start the HTTP server.
92pub async fn start(host: &str, port: u16, db: Database) -> Result<(), String> {
93    let addr: SocketAddr = format!("{host}:{port}")
94        .parse()
95        .map_err(|e| format!("invalid address {host}:{port}: {e}"))?;
96
97    // Runs before any async tasks are spawned, so blocking connect probes are safe.
98    check_port_available(addr)?;
99
100    let std_listener = bind_exclusive(addr)?;
101    let listener = tokio::net::TcpListener::from_std(std_listener)
102        .map_err(|e| format!("failed to create async listener: {e}"))?;
103
104    let app = build_router(db);
105
106    eprintln!("Dynoxide listening on http://{addr}");
107
108    axum::serve(listener, app)
109        .with_graceful_shutdown(shutdown_signal())
110        .await
111        .map_err(|e| format!("server failed: {e}"))
112}
113
114/// Start on a specific listener (for tests).
115pub async fn serve_on(listener: tokio::net::TcpListener, db: Database) {
116    let app = build_router(db);
117    axum::serve(listener, app).await.unwrap();
118}
119
120/// DynamoDB accepts bodies up to 16 MB.
121const MAX_BODY_SIZE: usize = 16 * 1024 * 1024;
122
123/// Build the shared axum router used by both `start` and `serve_on`.
124fn build_router(db: Database) -> Router {
125    Router::new()
126        .route("/", any(handle_root))
127        .fallback(handle_fallback)
128        .layer(DefaultBodyLimit::max(MAX_BODY_SIZE))
129        .layer(SetResponseHeaderLayer::overriding(
130            SERVER,
131            HeaderValue::from_static(concat!("Dynoxide/", env!("CARGO_PKG_VERSION"))),
132        ))
133        .layer(SetResponseHeaderLayer::overriding(
134            HeaderName::from_static("x-dynoxide-version"),
135            HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
136        ))
137        .with_state(db)
138}
139
140/// The 404 body DynamoDB returns for non-POST methods.
141const NOT_FOUND_BODY: &str = "<UnknownOperationException/>\n";
142
143/// Single handler for all methods on `/`. Dispatches based on method.
144///
145/// - GET: health check (200)
146/// - POST: DynamoDB API dispatch
147/// - OPTIONS with Origin: CORS preflight response (200)
148/// - OPTIONS without Origin: 404
149/// - All other methods (DELETE, PUT, etc.): 404
150async fn handle_root(
151    method: Method,
152    uri: Uri,
153    State(db): State<Database>,
154    headers: HeaderMap,
155    body: String,
156) -> Response {
157    let has_origin = headers.get("origin").is_some();
158
159    let mut resp = match method {
160        Method::GET => {
161            let body_str = format!("healthy: dynamodb.{AWS_REGION}.amazonaws.com ");
162            dynamo_response_raw(StatusCode::OK, &body_str)
163        }
164        Method::OPTIONS if has_origin => {
165            // CORS preflight: return 200 with CORS headers
166            let mut r = Response::builder()
167                .status(StatusCode::OK)
168                .body(Body::from(""))
169                .unwrap();
170            add_dynamo_headers(&mut r, b"");
171            // Set content-length to 0 explicitly
172            r.headers_mut().insert(
173                HeaderName::from_static("content-length"),
174                HeaderValue::from_static("0"),
175            );
176            r.headers_mut().insert(
177                HeaderName::from_static("access-control-allow-origin"),
178                HeaderValue::from_static("*"),
179            );
180            r.headers_mut().insert(
181                HeaderName::from_static("access-control-max-age"),
182                HeaderValue::from_static("172800"),
183            );
184            // Echo back request headers and method if present
185            if let Some(req_headers) = headers.get("access-control-request-headers") {
186                r.headers_mut().insert(
187                    HeaderName::from_static("access-control-allow-headers"),
188                    req_headers.clone(),
189                );
190            }
191            if let Some(req_method) = headers.get("access-control-request-method") {
192                r.headers_mut().insert(
193                    HeaderName::from_static("access-control-allow-methods"),
194                    req_method.clone(),
195                );
196            }
197            return r;
198        }
199        Method::POST => handle_request(uri, State(db), headers.clone(), body).await,
200        _ => {
201            // OPTIONS without Origin, DELETE, PUT, PATCH, etc. — all return 404.
202            dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY)
203        }
204    };
205
206    // Add CORS header to all responses if Origin is present
207    if has_origin {
208        resp.headers_mut().insert(
209            HeaderName::from_static("access-control-allow-origin"),
210            HeaderValue::from_static("*"),
211        );
212    }
213
214    resp
215}
216
217/// Fallback for all unmatched routes — returns 404.
218async fn handle_fallback() -> Response {
219    dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY)
220}
221
222async fn shutdown_signal() {
223    #[cfg(unix)]
224    {
225        use tokio::signal::unix::{SignalKind, signal};
226        let mut sigterm =
227            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
228        tokio::select! {
229            _ = tokio::signal::ctrl_c() => {},
230            _ = sigterm.recv() => {},
231        }
232    }
233    #[cfg(not(unix))]
234    {
235        tokio::signal::ctrl_c()
236            .await
237            .expect("failed to install CTRL+C handler");
238    }
239    eprintln!("\nShutting down...");
240}
241
242async fn handle_request(
243    uri: Uri,
244    State(db): State<Database>,
245    headers: HeaderMap,
246    body: String,
247) -> Response {
248    // Check Content-Type header — DynamoDB accepts both application/json and
249    // application/x-amz-json-1.0, with optional parameters (e.g. ;charset=utf-8).
250    // The response Content-Type echoes the base media type from the request.
251    let raw_ct = headers
252        .get("content-type")
253        .and_then(|v| v.to_str().ok())
254        .unwrap_or("");
255
256    // Strip parameters and whitespace: "  application/json  ; charset=utf-8" → "application/json"
257    let base_ct = raw_ct.split(';').next().unwrap_or("").trim();
258
259    let is_amz_json = base_ct.eq_ignore_ascii_case(CONTENT_TYPE);
260    let is_plain_json = base_ct.eq_ignore_ascii_case("application/json");
261
262    // If neither recognised Content-Type AND there is a body, return 404.
263    // A POST with no body (or empty body) and no Content-Type is still treated
264    // as a DynamoDB request (DynamoDB accepts it and parses the empty body as JSON).
265    if !is_amz_json && !is_plain_json && (!body.is_empty() || !raw_ct.is_empty()) {
266        return dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY);
267    }
268
269    // Determine response content-type: echo the request's base media type.
270    // application/x-amz-json-1.0 requests get that back; everything else gets application/json.
271    let response_ct = if is_amz_json {
272        CONTENT_TYPE
273    } else {
274        "application/json"
275    };
276
277    // Try to parse body as JSON. DynamoDB requires a valid JSON object.
278    // Non-JSON → SerializationException (no message).
279    if !body.is_empty() && serde_json::from_str::<serde_json::Value>(&body).is_err() {
280        return serialization_exception_bare(response_ct);
281    }
282
283    // Check x-amz-target header
284    // NOTE: empty body check happens after target resolution — DynamoDB returns
285    // UnknownOperationException if no target, even with empty body.
286    let target = match headers.get("x-amz-target").and_then(|v| v.to_str().ok()) {
287        Some(t) => t,
288        None => {
289            // No target header — UnknownOperationException (no message)
290            return unknown_operation_response(response_ct);
291        }
292    };
293
294    let operation = target
295        .strip_prefix(TARGET_PREFIX)
296        .or_else(|| target.strip_prefix(STREAMS_TARGET_PREFIX));
297
298    let operation = match operation {
299        Some(op) if is_known_operation(op) => op,
300        _ => {
301            // Unrecognised target prefix or unknown operation
302            return unknown_operation_response(response_ct);
303        }
304    };
305
306    // Validate authentication headers — DynamoDB checks auth after target resolution.
307    if let Some(auth_error) = validate_auth(&headers, &uri, response_ct) {
308        return auth_error;
309    }
310
311    // Empty body with a valid target → SerializationException (bare, no message).
312    // DynamoDB requires a JSON body for all operations.
313    if body.is_empty() {
314        return serialization_exception_bare(response_ct);
315    }
316
317    tracing::debug!(operation, body_len = body.len(), "request");
318    tracing::trace!(operation, body = %body, "request body");
319
320    match dispatch(&db, operation, &body) {
321        Ok(json) => {
322            tracing::debug!(operation, body_len = json.len(), "response");
323            tracing::trace!(operation, body = %json, "response body");
324            dynamo_response(StatusCode::OK, response_ct, json)
325        }
326        Err(e) => {
327            let status = StatusCode::from_u16(e.status_code()).unwrap_or(StatusCode::BAD_REQUEST);
328            let json = e.to_json();
329            tracing::warn!(operation, status = %status, "error response");
330            tracing::trace!(operation, body = %json, "error response body");
331            dynamo_response(status, response_ct, json)
332        }
333    }
334}
335
336/// Validate AWS authentication headers/query parameters.
337///
338/// DynamoDB checks auth after resolving the target operation. Returns `Some(Response)` if
339/// auth validation fails, `None` if auth is present (or if we choose to skip full
340/// signature verification).
341fn validate_auth(headers: &HeaderMap, uri: &Uri, response_ct: &str) -> Option<Response> {
342    let auth_header = headers.get("authorization").and_then(|v| v.to_str().ok());
343
344    // Check query string for X-Amz-Algorithm
345    let query = uri.query().unwrap_or("");
346    let has_algorithm_query = query.split('&').any(|p| {
347        let key = p.split('=').next().unwrap_or("");
348        key == "X-Amz-Algorithm"
349    });
350
351    // If both Authorization header AND X-Amz-Algorithm query → InvalidSignatureException
352    if auth_header.is_some() && has_algorithm_query {
353        let body = serde_json::json!({
354            "__type": "com.amazon.coral.service#InvalidSignatureException",
355            "message": "Found both 'X-Amz-Algorithm' as a query-string param and 'Authorization' as HTTP header."
356        })
357        .to_string();
358        return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
359    }
360
361    // Query-string auth (X-Amz-Algorithm present)
362    if has_algorithm_query {
363        let mut missing = Vec::new();
364        let query_params: Vec<&str> = query
365            .split('&')
366            .map(|p| p.split('=').next().unwrap_or(""))
367            .collect();
368
369        // Check if X-Amz-Algorithm has a non-empty value
370        let algo_has_value = query.split('&').any(|p| {
371            let mut parts = p.splitn(2, '=');
372            let key = parts.next().unwrap_or("");
373            let val = parts.next().unwrap_or("");
374            key == "X-Amz-Algorithm" && !val.is_empty()
375        });
376
377        if !algo_has_value {
378            missing.push("'X-Amz-Algorithm'");
379        }
380        for (param, label) in [
381            ("X-Amz-Credential", "'X-Amz-Credential'"),
382            ("X-Amz-Signature", "'X-Amz-Signature'"),
383            ("X-Amz-SignedHeaders", "'X-Amz-SignedHeaders'"),
384            ("X-Amz-Date", "'X-Amz-Date'"),
385        ] {
386            if !query_params.contains(&param) {
387                missing.push(label);
388            }
389        }
390
391        if !missing.is_empty() {
392            let parts: Vec<String> = missing
393                .iter()
394                .map(|p| format!("AWS query-string parameters must include {p}. "))
395                .collect();
396            let msg = format!("{}Re-examine the query-string parameters.", parts.join(""));
397            let body = serde_json::json!({
398                "__type": "com.amazon.coral.service#IncompleteSignatureException",
399                "message": msg
400            })
401            .to_string();
402            return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
403        }
404
405        // Query auth is present and complete — allow through
406        return None;
407    }
408
409    // Header-based auth
410    match auth_header {
411        None => {
412            // No Authorization header at all → MissingAuthenticationTokenException
413            let body = serde_json::json!({
414                "__type": "com.amazon.coral.service#MissingAuthenticationTokenException",
415                "message": "Request is missing Authentication Token"
416            })
417            .to_string();
418            Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body))
419        }
420        Some(auth) => {
421            if !auth.starts_with("AWS4-") {
422                // Authorization header doesn't start with AWS4- → MissingAuthenticationTokenException
423                let body = serde_json::json!({
424                    "__type": "com.amazon.coral.service#MissingAuthenticationTokenException",
425                    "message": "Request is missing Authentication Token"
426                })
427                .to_string();
428                return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
429            }
430
431            // AWS4- prefix present — check for required parameters
432            let has_date = headers.get("x-amz-date").is_some() || headers.get("date").is_some();
433
434            // Parse auth header for Credential, Signature, SignedHeaders
435            // These can be separated by spaces or commas
436            let has_credential = auth.contains("Credential=") || auth.contains("credential=");
437            let has_signature = auth.contains("Signature=") || auth.contains("signature=");
438            let has_signed_headers =
439                auth.contains("SignedHeaders=") || auth.contains("signedheaders=");
440
441            let mut missing = Vec::new();
442            if !has_credential {
443                missing.push("'Credential'");
444            }
445            if !has_signature {
446                missing.push("'Signature'");
447            }
448            if !has_signed_headers {
449                missing.push("'SignedHeaders'");
450            }
451            if !has_date {
452                missing.push("existence of either a 'X-Amz-Date' or a 'Date' header.");
453            }
454
455            if missing.is_empty() {
456                // All required parts present — allow through (we don't verify signatures)
457                return None;
458            }
459
460            // Build the IncompleteSignatureException message
461            let mut parts: Vec<String> = missing
462                .iter()
463                .map(|p| {
464                    if p.contains("existence of") {
465                        format!("Authorization header requires {p}")
466                    } else {
467                        format!("Authorization header requires {p} parameter.")
468                    }
469                })
470                .collect();
471            parts.push(format!("Authorization={auth}"));
472            let msg = parts.join(" ");
473            let body = serde_json::json!({
474                "__type": "com.amazon.coral.service#IncompleteSignatureException",
475                "message": msg
476            })
477            .to_string();
478            Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body))
479        }
480    }
481}
482
483/// Java ClassCastException message that DynamoDB leaks for certain type mismatches.
484const PARAMETERIZED_TYPE_CAST_ERROR: &str = "class sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl cannot be cast to class java.lang.Class (sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl and java.lang.Class are in module java.base of loader 'bootstrap')";
485
486/// Known DynamoDB operations — used to distinguish unknown targets from known ones.
487fn is_known_operation(op: &str) -> bool {
488    matches!(
489        op,
490        "CreateTable"
491            | "DeleteTable"
492            | "DescribeTable"
493            | "ListTables"
494            | "UpdateTable"
495            | "PutItem"
496            | "GetItem"
497            | "DeleteItem"
498            | "UpdateItem"
499            | "Query"
500            | "Scan"
501            | "BatchGetItem"
502            | "BatchWriteItem"
503            | "TransactWriteItems"
504            | "TransactGetItems"
505            | "ListStreams"
506            | "DescribeStream"
507            | "GetShardIterator"
508            | "GetRecords"
509            | "UpdateTimeToLive"
510            | "DescribeTimeToLive"
511            | "ExecuteStatement"
512            | "ExecuteTransaction"
513            | "BatchExecuteStatement"
514            | "TagResource"
515            | "UntagResource"
516            | "ListTagsOfResource"
517    )
518}
519
520/// SerializationException with no message (just `__type`).
521/// Used for JSON parse failures at the connection level.
522fn serialization_exception_bare(content_type: &str) -> Response {
523    let body = r#"{"__type":"com.amazon.coral.service#SerializationException"}"#.to_string();
524    dynamo_response(StatusCode::BAD_REQUEST, content_type, body)
525}
526
527/// UnknownOperationException with no message (just `__type`).
528fn unknown_operation_response(content_type: &str) -> Response {
529    let body = r#"{"__type":"com.amazon.coral.service#UnknownOperationException"}"#.to_string();
530    dynamo_response(StatusCode::BAD_REQUEST, content_type, body)
531}
532
533/// Pre-check JSON field types that are deserialized as `serde_json::Value`.
534///
535/// DynamoDB returns SerializationException for type mismatches on fields like
536/// AttributeDefinitions, KeySchema, etc. Because our raw request structs use
537/// `Option<serde_json::Value>` for these fields, serde accepts any JSON type.
538/// This function inspects the raw JSON and returns the appropriate
539/// SerializationException before serde gets involved.
540fn pre_check_serialization_types(operation: &str, body: &str) -> crate::Result<()> {
541    let json: serde_json::Value = serde_json::from_str(body)
542        .map_err(|e| crate::DynoxideError::SerializationException(e.to_string()))?;
543
544    let obj = match json.as_object() {
545        Some(o) => o,
546        None => return Ok(()),
547    };
548
549    match operation {
550        "CreateTable" => {
551            check_field_is_list(obj, "AttributeDefinitions")?;
552            check_field_is_list(obj, "KeySchema")?;
553            check_field_is_list(obj, "LocalSecondaryIndexes")?;
554            check_field_is_list(obj, "GlobalSecondaryIndexes")?;
555            check_list_elements_are_structs(obj, "AttributeDefinitions")?;
556            check_list_elements_are_structs(obj, "KeySchema")?;
557            check_list_elements_are_structs(obj, "LocalSecondaryIndexes")?;
558            check_list_elements_are_structs(obj, "GlobalSecondaryIndexes")?;
559
560            // Check struct fields and their inner scalar types
561            check_field_is_struct(obj, "ProvisionedThroughput")?;
562            check_nested_pt_fields(obj)?;
563
564            // Check nested fields inside KeySchema elements
565            check_nested_list_structs(obj, "KeySchema")?;
566            // Check nested fields inside AttributeDefinitions elements
567            check_nested_list_structs(obj, "AttributeDefinitions")?;
568
569            // Check nested list fields inside LocalSecondaryIndexes
570            if let Some(serde_json::Value::Array(arr)) = obj.get("LocalSecondaryIndexes") {
571                for item in arr {
572                    if let Some(inner) = item.as_object() {
573                        check_field_is_struct(inner, "Projection")?;
574                        check_field_is_list(inner, "KeySchema")?;
575                        check_list_elements_are_structs(inner, "KeySchema")?;
576                        check_field_is_string(inner, "IndexName")?;
577                        check_nested_list_structs(inner, "KeySchema")?;
578                        check_nested_projection_fields(inner)?;
579                        if let Some(proj) = inner.get("Projection").and_then(|p| p.as_object()) {
580                            check_field_is_list(proj, "NonKeyAttributes")?;
581                            check_nested_list_strings(proj, "NonKeyAttributes")?;
582                        }
583                    }
584                }
585            }
586
587            // Check nested list fields inside GlobalSecondaryIndexes
588            if let Some(serde_json::Value::Array(arr)) = obj.get("GlobalSecondaryIndexes") {
589                for item in arr {
590                    if let Some(inner) = item.as_object() {
591                        check_field_is_struct(inner, "Projection")?;
592                        check_field_is_struct(inner, "ProvisionedThroughput")?;
593                        check_field_is_list(inner, "KeySchema")?;
594                        check_list_elements_are_structs(inner, "KeySchema")?;
595                        check_field_is_string(inner, "IndexName")?;
596                        check_nested_list_structs(inner, "KeySchema")?;
597                        check_nested_projection_fields(inner)?;
598                        check_nested_pt_fields(inner)?;
599                        if let Some(proj) = inner.get("Projection").and_then(|p| p.as_object()) {
600                            check_field_is_list(proj, "NonKeyAttributes")?;
601                            check_nested_list_strings(proj, "NonKeyAttributes")?;
602                        }
603                    }
604                }
605            }
606        }
607        "UpdateTable" => {
608            check_field_is_list(obj, "GlobalSecondaryIndexUpdates")?;
609            check_list_elements_are_structs(obj, "GlobalSecondaryIndexUpdates")?;
610            check_field_is_struct(obj, "ProvisionedThroughput")?;
611            check_nested_pt_fields(obj)?;
612            // Check inside GlobalSecondaryIndexUpdates
613            if let Some(serde_json::Value::Array(arr)) = obj.get("GlobalSecondaryIndexUpdates") {
614                for item in arr {
615                    if let Some(inner) = item.as_object() {
616                        check_field_is_struct(inner, "Create")?;
617                        check_field_is_struct(inner, "Update")?;
618                        check_field_is_struct(inner, "Delete")?;
619                        if let Some(create) = inner.get("Create").and_then(|v| v.as_object()) {
620                            check_field_is_struct(create, "Projection")?;
621                            check_field_is_struct(create, "ProvisionedThroughput")?;
622                            check_field_is_list(create, "KeySchema")?;
623                            check_list_elements_are_structs(create, "KeySchema")?;
624                            check_nested_list_structs(create, "KeySchema")?;
625                            check_nested_projection_fields(create)?;
626                            check_nested_pt_fields(create)?;
627                        }
628                        if let Some(update) = inner.get("Update").and_then(|v| v.as_object()) {
629                            check_field_is_struct(update, "ProvisionedThroughput")?;
630                            check_nested_pt_fields(update)?;
631                        }
632                    }
633                }
634            }
635        }
636        "PutItem" | "DeleteItem" | "UpdateItem" => {
637            check_field_is_map(
638                obj,
639                "AttributeUpdates",
640                "com.amazonaws.dynamodb.v20120810.AttributeValueUpdate",
641            )?;
642            check_map_values_are_structs(obj, "AttributeUpdates")?;
643        }
644        "Query" => {
645            check_field_is_map(
646                obj,
647                "KeyConditions",
648                "com.amazonaws.dynamodb.v20120810.Condition",
649            )?;
650            check_field_is_map(
651                obj,
652                "QueryFilter",
653                "com.amazonaws.dynamodb.v20120810.Condition",
654            )?;
655            check_map_values_are_structs(obj, "QueryFilter")?;
656            check_map_values_are_structs(obj, "KeyConditions")?;
657            check_filter_inner_fields(obj, "QueryFilter")?;
658            check_filter_inner_fields(obj, "KeyConditions")?;
659            check_filter_attribute_value_lists(obj, "QueryFilter")?;
660            check_field_is_map(
661                obj,
662                "ExclusiveStartKey",
663                "com.amazonaws.dynamodb.v20120810.AttributeValue",
664            )?;
665        }
666        "Scan" => {
667            check_field_is_map(
668                obj,
669                "ScanFilter",
670                "com.amazonaws.dynamodb.v20120810.Condition",
671            )?;
672            check_map_values_are_structs(obj, "ScanFilter")?;
673            check_filter_inner_fields(obj, "ScanFilter")?;
674            check_filter_attribute_value_lists(obj, "ScanFilter")?;
675            check_field_is_map(
676                obj,
677                "ExclusiveStartKey",
678                "com.amazonaws.dynamodb.v20120810.AttributeValue",
679            )?;
680        }
681        "BatchGetItem" => {
682            check_field_is_map(
683                obj,
684                "RequestItems",
685                "com.amazonaws.dynamodb.v20120810.KeysAndAttributes",
686            )?;
687            check_map_values_are_structs(obj, "RequestItems")?;
688            // Check nested fields inside RequestItems
689            if let Some(serde_json::Value::Object(ri)) = obj.get("RequestItems") {
690                for (_table, val) in ri {
691                    if let Some(inner) = val.as_object() {
692                        check_field_is_map(inner, "ExpressionAttributeNames", "java.lang.String")?;
693                        // Check Keys array elements are maps, and their values are AV structs
694                        if let Some(serde_json::Value::Array(keys)) = inner.get("Keys") {
695                            for key in keys {
696                                if !key.is_object() && !key.is_null() {
697                                    return Err(crate::DynoxideError::SerializationException(
698                                        PARAMETERIZED_TYPE_CAST_ERROR.to_string(),
699                                    ));
700                                }
701                                if let Some(key_map) = key.as_object() {
702                                    for (_k, v) in key_map {
703                                        if !v.is_object() && !v.is_null() {
704                                            return Err(
705                                                crate::DynoxideError::SerializationException(
706                                                    "Unexpected value type in payload".to_string(),
707                                                ),
708                                            );
709                                        }
710                                    }
711                                }
712                            }
713                        }
714                    }
715                }
716            }
717        }
718        "BatchWriteItem" => {
719            check_field_is_map(
720                obj,
721                "RequestItems",
722                "java.util.List<com.amazonaws.dynamodb.v20120810.WriteRequest>",
723            )?;
724            // Check nested fields inside RequestItems
725            if let Some(serde_json::Value::Object(ri)) = obj.get("RequestItems") {
726                for (_table, val) in ri {
727                    // Each value must be an array of WriteRequests
728                    if !val.is_array() && !val.is_null() {
729                        return Err(crate::DynoxideError::SerializationException(
730                            PARAMETERIZED_TYPE_CAST_ERROR.to_string(),
731                        ));
732                    }
733                    if let Some(items) = val.as_array() {
734                        // Check array elements are structs (WriteRequest)
735                        for item in items {
736                            if !item.is_object() && !item.is_null() {
737                                let msg = if item.is_array() {
738                                    "Unrecognized collection type class com.amazonaws.dynamodb.v20120810.WriteRequest".to_string()
739                                } else {
740                                    "Unexpected value type in payload".to_string()
741                                };
742                                return Err(crate::DynoxideError::SerializationException(msg));
743                            }
744                        }
745                        for item in items {
746                            if let Some(inner) = item.as_object() {
747                                check_field_is_struct(inner, "DeleteRequest")?;
748                                check_field_is_struct(inner, "PutRequest")?;
749                                if let Some(dr) =
750                                    inner.get("DeleteRequest").and_then(|v| v.as_object())
751                                {
752                                    check_field_is_map(
753                                        dr,
754                                        "Key",
755                                        "com.amazonaws.dynamodb.v20120810.AttributeValue",
756                                    )?;
757                                    check_map_values_are_structs(dr, "Key")?;
758                                }
759                                if let Some(pr) =
760                                    inner.get("PutRequest").and_then(|v| v.as_object())
761                                {
762                                    check_field_is_map(
763                                        pr,
764                                        "Item",
765                                        "com.amazonaws.dynamodb.v20120810.AttributeValue",
766                                    )?;
767                                    check_map_values_are_structs(pr, "Item")?;
768                                }
769                            }
770                        }
771                    }
772                }
773            }
774        }
775        "TagResource" => {
776            check_field_is_list(obj, "Tags")?;
777            check_list_elements_are_structs(obj, "Tags")?;
778        }
779        _ => {}
780    }
781
782    // Common map fields — checked AFTER operation-specific nested fields
783    check_field_is_map(
784        obj,
785        "Key",
786        "com.amazonaws.dynamodb.v20120810.AttributeValue",
787    )?;
788    check_field_is_map(
789        obj,
790        "Item",
791        "com.amazonaws.dynamodb.v20120810.AttributeValue",
792    )?;
793    check_field_is_map(obj, "ExpressionAttributeNames", "java.lang.String")?;
794    check_field_is_map(
795        obj,
796        "ExpressionAttributeValues",
797        "com.amazonaws.dynamodb.v20120810.AttributeValue",
798    )?;
799    check_field_is_map(
800        obj,
801        "Expected",
802        "com.amazonaws.dynamodb.v20120810.ExpectedAttributeValue",
803    )?;
804
805    // Check that attribute value map entries are structs (not scalars)
806    check_map_values_are_structs(obj, "Key")?;
807    check_map_values_are_structs(obj, "Item")?;
808    check_map_values_are_structs(obj, "ExpressionAttributeValues")?;
809    check_map_values_are_structs(obj, "ExclusiveStartKey")?;
810    check_map_values_are_structs(obj, "Expected")?;
811
812    // Check Expected.Attr inner fields
813    if let Some(serde_json::Value::Object(expected)) = obj.get("Expected") {
814        for (_attr, cond) in expected {
815            if let Some(cond_obj) = cond.as_object() {
816                check_field_is_bool(cond_obj, "Exists")?;
817            }
818        }
819    }
820
821    // Common scalar fields — checked AFTER nested fields to match DynamoDB ordering
822    check_field_is_string(obj, "TableName")?;
823    check_field_is_string(obj, "IndexName")?;
824    check_field_is_string(obj, "ReturnConsumedCapacity")?;
825    check_field_is_string(obj, "ReturnValues")?;
826    check_field_is_string(obj, "ReturnItemCollectionMetrics")?;
827    check_field_is_string(obj, "ConditionalOperator")?;
828    check_field_is_string(obj, "Select")?;
829    check_field_is_string(obj, "ConditionExpression")?;
830    check_field_is_string(obj, "FilterExpression")?;
831    check_field_is_string(obj, "KeyConditionExpression")?;
832    check_field_is_string(obj, "ProjectionExpression")?;
833    check_field_is_string(obj, "UpdateExpression")?;
834    check_field_is_int(obj, "Limit")?;
835    check_field_is_int(obj, "Segment")?;
836    check_field_is_int(obj, "TotalSegments")?;
837    check_field_is_bool(obj, "ScanIndexForward")?;
838    check_field_is_bool(obj, "ConsistentRead")?;
839
840    Ok(())
841}
842
843/// Check that a field, if present, is a JSON number (integer).
844/// `java_type` is "Long" for PT fields, "Integer" for Limit/Segment/etc.
845fn check_field_is_integer_typed(
846    obj: &serde_json::Map<String, serde_json::Value>,
847    field: &str,
848    java_type: &str,
849) -> crate::Result<()> {
850    let val = match obj.get(field) {
851        Some(v) if !v.is_null() => v,
852        _ => return Ok(()),
853    };
854
855    if val.is_number() {
856        return Ok(());
857    }
858
859    let msg = if val.is_array() {
860        format!("Unrecognized collection type class java.lang.{java_type}")
861    } else if val.is_object() {
862        "Start of structure or map found where not expected".to_string()
863    } else if val.is_boolean() {
864        if val.as_bool() == Some(true) {
865            format!("TRUE_VALUE cannot be converted to {java_type}")
866        } else {
867            format!("FALSE_VALUE cannot be converted to {java_type}")
868        }
869    } else if val.is_string() {
870        format!("STRING_VALUE cannot be converted to {java_type}")
871    } else {
872        "Unexpected field type".to_string()
873    };
874
875    Err(crate::DynoxideError::SerializationException(msg))
876}
877
878/// Check integer field using "Long" type (for PT fields).
879fn check_field_is_integer(
880    obj: &serde_json::Map<String, serde_json::Value>,
881    field: &str,
882) -> crate::Result<()> {
883    check_field_is_integer_typed(obj, field, "Long")
884}
885
886/// Check integer field using "Integer" type (for Limit, Segment, etc.).
887fn check_field_is_int(
888    obj: &serde_json::Map<String, serde_json::Value>,
889    field: &str,
890) -> crate::Result<()> {
891    check_field_is_integer_typed(obj, field, "Integer")
892}
893
894/// Check that a field, if present and not null, is a JSON string.
895/// Returns SerializationException for wrong types.
896fn check_field_is_string(
897    obj: &serde_json::Map<String, serde_json::Value>,
898    field: &str,
899) -> crate::Result<()> {
900    let val = match obj.get(field) {
901        Some(v) if !v.is_null() => v,
902        _ => return Ok(()),
903    };
904
905    if val.is_string() {
906        return Ok(());
907    }
908
909    let msg = if val.is_array() {
910        "Unrecognized collection type class java.lang.String".to_string()
911    } else if val.is_object() {
912        "Start of structure or map found where not expected".to_string()
913    } else if val.as_bool() == Some(true) {
914        "TRUE_VALUE cannot be converted to String".to_string()
915    } else if val.as_bool() == Some(false) {
916        "FALSE_VALUE cannot be converted to String".to_string()
917    } else if val.is_number() {
918        // DynamoDB distinguishes DECIMAL_VALUE (float) from NUMBER_VALUE (int)
919        if val.is_f64() && !val.is_i64() && !val.is_u64() {
920            "DECIMAL_VALUE cannot be converted to String".to_string()
921        } else {
922            "NUMBER_VALUE cannot be converted to String".to_string()
923        }
924    } else {
925        "Unexpected field type".to_string()
926    };
927
928    Err(crate::DynoxideError::SerializationException(msg))
929}
930
931/// Check that a field, if present and not null, is a JSON boolean.
932/// Returns SerializationException for wrong types.
933fn check_field_is_bool(
934    obj: &serde_json::Map<String, serde_json::Value>,
935    field: &str,
936) -> crate::Result<()> {
937    let val = match obj.get(field) {
938        Some(v) if !v.is_null() => v,
939        _ => return Ok(()),
940    };
941
942    if val.is_boolean() {
943        return Ok(());
944    }
945
946    let msg = if val.is_array() {
947        "Unrecognized collection type class java.lang.Boolean".to_string()
948    } else if val.is_object() {
949        "Start of structure or map found where not expected".to_string()
950    } else if val.is_string() {
951        "Unexpected token received from parser".to_string()
952    } else if val.is_number() {
953        if val.is_f64() && !val.is_i64() && !val.is_u64() {
954            "DECIMAL_VALUE cannot be converted to Boolean".to_string()
955        } else {
956            "NUMBER_VALUE cannot be converted to Boolean".to_string()
957        }
958    } else {
959        "Unexpected field type".to_string()
960    };
961
962    Err(crate::DynoxideError::SerializationException(msg))
963}
964
965/// Check that all elements in a list field are JSON objects (structs).
966/// Returns "Unexpected value type in payload" for non-struct elements.
967fn check_list_elements_are_structs(
968    obj: &serde_json::Map<String, serde_json::Value>,
969    field: &str,
970) -> crate::Result<()> {
971    let java_class = match field {
972        "KeySchema" => "com.amazonaws.dynamodb.v20120810.KeySchemaElement",
973        "AttributeDefinitions" => "com.amazonaws.dynamodb.v20120810.AttributeDefinition",
974        "LocalSecondaryIndexes" => "com.amazonaws.dynamodb.v20120810.LocalSecondaryIndex",
975        "GlobalSecondaryIndexes" => "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndex",
976        "GlobalSecondaryIndexUpdates" => {
977            "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndexUpdate"
978        }
979        "Tags" => "com.amazonaws.dynamodb.v20120810.Tag",
980        _ => "Unknown",
981    };
982    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
983        for item in arr {
984            if !item.is_object() && !item.is_null() {
985                let msg = if item.is_array() {
986                    format!("Unrecognized collection type class {java_class}")
987                } else {
988                    "Unexpected value type in payload".to_string()
989                };
990                return Err(crate::DynoxideError::SerializationException(msg));
991            }
992        }
993    }
994    Ok(())
995}
996
997/// Check scalar fields inside a ProvisionedThroughput struct.
998fn check_nested_pt_fields(obj: &serde_json::Map<String, serde_json::Value>) -> crate::Result<()> {
999    if let Some(pt) = obj.get("ProvisionedThroughput").and_then(|v| v.as_object()) {
1000        check_field_is_integer(pt, "WriteCapacityUnits")?;
1001        check_field_is_integer(pt, "ReadCapacityUnits")?;
1002    }
1003    Ok(())
1004}
1005
1006/// Check scalar fields inside a Projection struct.
1007fn check_nested_projection_fields(
1008    obj: &serde_json::Map<String, serde_json::Value>,
1009) -> crate::Result<()> {
1010    if let Some(proj) = obj.get("Projection").and_then(|v| v.as_object()) {
1011        check_field_is_string(proj, "ProjectionType")?;
1012    }
1013    Ok(())
1014}
1015
1016/// Check that elements inside a list field are structs, and check their scalar fields.
1017fn check_nested_list_structs(
1018    obj: &serde_json::Map<String, serde_json::Value>,
1019    field: &str,
1020) -> crate::Result<()> {
1021    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
1022        for item in arr {
1023            if let Some(inner) = item.as_object() {
1024                // Common struct fields in KeySchema/AttributeDefinitions elements
1025                check_field_is_string(inner, "KeyType")?;
1026                check_field_is_string(inner, "AttributeName")?;
1027                check_field_is_string(inner, "AttributeType")?;
1028                check_field_is_string(inner, "IndexName")?;
1029            }
1030        }
1031    }
1032    Ok(())
1033}
1034
1035/// Check that elements inside a string list field are actually strings.
1036fn check_nested_list_strings(
1037    obj: &serde_json::Map<String, serde_json::Value>,
1038    field: &str,
1039) -> crate::Result<()> {
1040    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
1041        for item in arr {
1042            if !item.is_string() && !item.is_null() {
1043                if item.is_boolean() {
1044                    let val = if item.as_bool() == Some(true) {
1045                        "TRUE_VALUE"
1046                    } else {
1047                        "FALSE_VALUE"
1048                    };
1049                    return Err(crate::DynoxideError::SerializationException(format!(
1050                        "{val} cannot be converted to String"
1051                    )));
1052                } else if item.is_number() {
1053                    return Err(crate::DynoxideError::SerializationException(
1054                        "NUMBER_VALUE cannot be converted to String".to_string(),
1055                    ));
1056                }
1057            }
1058        }
1059    }
1060    Ok(())
1061}
1062
1063/// Check that all values in a map field (if present) are JSON objects (attribute value structs).
1064fn check_map_values_are_structs(
1065    obj: &serde_json::Map<String, serde_json::Value>,
1066    field: &str,
1067) -> crate::Result<()> {
1068    let java_class = match field {
1069        "Key" | "Item" | "ExpressionAttributeValues" | "ExclusiveStartKey" => {
1070            "com.amazonaws.dynamodb.v20120810.AttributeValue"
1071        }
1072        "Expected" => "com.amazonaws.dynamodb.v20120810.ExpectedAttributeValue",
1073        "AttributeUpdates" => "com.amazonaws.dynamodb.v20120810.AttributeValueUpdate",
1074        "RequestItems" => "com.amazonaws.dynamodb.v20120810.KeysAndAttributes",
1075        "KeyConditions" | "QueryFilter" | "ScanFilter" => {
1076            "com.amazonaws.dynamodb.v20120810.Condition"
1077        }
1078        _ => "Unknown",
1079    };
1080    if let Some(serde_json::Value::Object(map)) = obj.get(field) {
1081        for (_key, val) in map {
1082            if !val.is_object() && !val.is_null() {
1083                let msg = if val.is_array() {
1084                    format!("Unrecognized collection type class {java_class}")
1085                } else {
1086                    "Unexpected value type in payload".to_string()
1087                };
1088                return Err(crate::DynoxideError::SerializationException(msg));
1089            }
1090        }
1091    }
1092    Ok(())
1093}
1094
1095/// Check that a field, if present and not null, is a JSON object (map).
1096/// Returns SerializationException with the DynamoDB Java type in the message.
1097fn check_field_is_map(
1098    obj: &serde_json::Map<String, serde_json::Value>,
1099    field: &str,
1100    java_value_type: &str,
1101) -> crate::Result<()> {
1102    let val = match obj.get(field) {
1103        Some(v) if !v.is_null() => v,
1104        _ => return Ok(()),
1105    };
1106
1107    if val.is_object() {
1108        return Ok(());
1109    }
1110
1111    let msg = if val.is_array() {
1112        format!("Unrecognized collection type java.util.Map<java.lang.String, {java_value_type}>")
1113    } else {
1114        // Scalar value where map expected → DynamoDB returns "Unexpected field type"
1115        "Unexpected field type".to_string()
1116    };
1117
1118    Err(crate::DynoxideError::SerializationException(msg))
1119}
1120
1121/// Check that a field, if present and not null, is a JSON object (struct).
1122/// Returns SerializationException with the appropriate message for the wrong type.
1123fn check_field_is_struct(
1124    obj: &serde_json::Map<String, serde_json::Value>,
1125    field: &str,
1126) -> crate::Result<()> {
1127    let val = match obj.get(field) {
1128        Some(v) if !v.is_null() => v,
1129        _ => return Ok(()),
1130    };
1131
1132    if val.is_object() {
1133        return Ok(());
1134    }
1135
1136    let msg = if val.is_array() {
1137        // Try to map field name to DynamoDB Java class
1138        let dynamo_class = match field {
1139            "ProvisionedThroughput" => {
1140                Some("com.amazonaws.dynamodb.v20120810.ProvisionedThroughput")
1141            }
1142            "Projection" => Some("com.amazonaws.dynamodb.v20120810.Projection"),
1143            "DeleteRequest" => Some("com.amazonaws.dynamodb.v20120810.DeleteRequest"),
1144            "PutRequest" => Some("com.amazonaws.dynamodb.v20120810.PutRequest"),
1145            "Create" => Some("com.amazonaws.dynamodb.v20120810.CreateGlobalSecondaryIndexAction"),
1146            "Update" => Some("com.amazonaws.dynamodb.v20120810.UpdateGlobalSecondaryIndexAction"),
1147            "Delete" => Some("com.amazonaws.dynamodb.v20120810.DeleteGlobalSecondaryIndexAction"),
1148            _ => None,
1149        };
1150        if let Some(cls) = dynamo_class {
1151            format!("Unrecognized collection type class {cls}")
1152        } else {
1153            "Start of structure or map found where not expected".to_string()
1154        }
1155    } else {
1156        // Scalar value where struct expected
1157        "Unexpected field type".to_string()
1158    };
1159
1160    Err(crate::DynoxideError::SerializationException(msg))
1161}
1162
1163/// Check that a field, if present and not null, is a JSON array.
1164/// Returns the appropriate SerializationException message for the wrong type.
1165fn check_field_is_list(
1166    obj: &serde_json::Map<String, serde_json::Value>,
1167    field: &str,
1168) -> crate::Result<()> {
1169    let val = match obj.get(field) {
1170        Some(v) if !v.is_null() => v,
1171        _ => return Ok(()),
1172    };
1173
1174    if val.is_array() {
1175        return Ok(());
1176    }
1177
1178    let msg = if val.is_object() {
1179        "Start of structure or map found where not expected".to_string()
1180    } else {
1181        "Unexpected field type".to_string()
1182    };
1183
1184    Err(crate::DynoxideError::SerializationException(msg))
1185}
1186
1187/// Check scalar fields inside filter condition map entries (QueryFilter/ScanFilter/KeyConditions).
1188fn check_filter_inner_fields(
1189    obj: &serde_json::Map<String, serde_json::Value>,
1190    filter_field: &str,
1191) -> crate::Result<()> {
1192    let filter = match obj.get(filter_field) {
1193        Some(v) if v.is_object() => v.as_object().unwrap(),
1194        _ => return Ok(()),
1195    };
1196
1197    for (_attr_name, condition) in filter {
1198        if let Some(cond_obj) = condition.as_object() {
1199            check_field_is_string(cond_obj, "ComparisonOperator")?;
1200            check_field_is_list(cond_obj, "AttributeValueList")?;
1201            // Check AVL elements are attr structs
1202            if let Some(serde_json::Value::Array(avl)) = cond_obj.get("AttributeValueList") {
1203                for item in avl {
1204                    if !item.is_object() && !item.is_null() {
1205                        let msg = if item.is_array() {
1206                            "Unrecognized collection type class com.amazonaws.dynamodb.v20120810.AttributeValue"
1207                                .to_string()
1208                        } else {
1209                            "Unexpected value type in payload".to_string()
1210                        };
1211                        return Err(crate::DynoxideError::SerializationException(msg));
1212                    }
1213                }
1214            }
1215        }
1216    }
1217    Ok(())
1218}
1219
1220/// Check AttributeValueList fields inside a filter map (QueryFilter/ScanFilter).
1221///
1222/// The filter is a map of attribute names to condition objects, each of which
1223/// may contain an AttributeValueList that must be an array.
1224fn check_filter_attribute_value_lists(
1225    obj: &serde_json::Map<String, serde_json::Value>,
1226    filter_field: &str,
1227) -> crate::Result<()> {
1228    let filter = match obj.get(filter_field) {
1229        Some(v) if v.is_object() => v.as_object().unwrap(),
1230        _ => return Ok(()),
1231    };
1232
1233    for (_attr_name, condition) in filter {
1234        if let Some(cond_obj) = condition.as_object() {
1235            check_field_is_list(cond_obj, "AttributeValueList")?;
1236        }
1237    }
1238
1239    Ok(())
1240}
1241
1242fn dispatch(db: &Database, operation: &str, body: &str) -> crate::Result<String> {
1243    // Pre-check JSON field types for operations that use serde_json::Value internally.
1244    // These checks must run before serde deserialisation because serde_json::Value accepts
1245    // any JSON type, so type mismatches on list/struct fields would silently pass through.
1246    pre_check_serialization_types(operation, body)?;
1247
1248    match operation {
1249        "CreateTable" => {
1250            let req = deserialize(body)?;
1251            let resp = db.create_table(req)?;
1252            serialize(&resp)
1253        }
1254        "DeleteTable" => {
1255            let req = deserialize(body)?;
1256            let resp = db.delete_table(req)?;
1257            serialize(&resp)
1258        }
1259        "DescribeTable" => {
1260            let req = deserialize(body)?;
1261            let resp = db.describe_table(req)?;
1262            serialize(&resp)
1263        }
1264        "ListTables" => {
1265            let req = deserialize(body)?;
1266            let resp = db.list_tables(req)?;
1267            serialize(&resp)
1268        }
1269        "UpdateTable" => {
1270            let req = deserialize(body)?;
1271            let resp = db.update_table(req)?;
1272            serialize(&resp)
1273        }
1274        "PutItem" => {
1275            let req = deserialize(body)?;
1276            let resp = db.put_item(req)?;
1277            serialize(&resp)
1278        }
1279        "GetItem" => {
1280            let req = deserialize(body)?;
1281            let resp = db.get_item(req)?;
1282            serialize(&resp)
1283        }
1284        "DeleteItem" => {
1285            let req = deserialize(body)?;
1286            let resp = db.delete_item(req)?;
1287            serialize(&resp)
1288        }
1289        "UpdateItem" => {
1290            let req = deserialize(body)?;
1291            let resp = db.update_item(req)?;
1292            serialize(&resp)
1293        }
1294        "Query" => {
1295            let req = deserialize(body)?;
1296            let resp = db.query(req)?;
1297            serialize(&resp)
1298        }
1299        "Scan" => {
1300            let req = deserialize(body)?;
1301            let resp = db.scan(req)?;
1302            serialize(&resp)
1303        }
1304        "BatchGetItem" => {
1305            let req = deserialize(body)?;
1306            let resp = db.batch_get_item(req)?;
1307            serialize(&resp)
1308        }
1309        "BatchWriteItem" => {
1310            let req = deserialize(body)?;
1311            let resp = db.batch_write_item(req)?;
1312            serialize(&resp)
1313        }
1314        "TransactWriteItems" => {
1315            let req = deserialize(body)?;
1316            let resp = db.transact_write_items(req)?;
1317            serialize(&resp)
1318        }
1319        "TransactGetItems" => {
1320            let req = deserialize(body)?;
1321            let resp = db.transact_get_items(req)?;
1322            serialize(&resp)
1323        }
1324        "ListStreams" => {
1325            let req = deserialize(body)?;
1326            let resp = db.list_streams(req)?;
1327            serialize(&resp)
1328        }
1329        "DescribeStream" => {
1330            let req = deserialize(body)?;
1331            let resp = db.describe_stream(req)?;
1332            serialize(&resp)
1333        }
1334        "GetShardIterator" => {
1335            let req = deserialize(body)?;
1336            let resp = db.get_shard_iterator(req)?;
1337            serialize(&resp)
1338        }
1339        "GetRecords" => {
1340            let req = deserialize(body)?;
1341            let resp = db.get_records(req)?;
1342            serialize(&resp)
1343        }
1344        "UpdateTimeToLive" => {
1345            let req = deserialize(body)?;
1346            let resp = db.update_time_to_live(req)?;
1347            serialize(&resp)
1348        }
1349        "DescribeTimeToLive" => {
1350            let req = deserialize(body)?;
1351            let resp = db.describe_time_to_live(req)?;
1352            serialize(&resp)
1353        }
1354        "ExecuteStatement" => {
1355            let req = deserialize(body)?;
1356            let resp = db.execute_statement(req)?;
1357            serialize(&resp)
1358        }
1359        "ExecuteTransaction" => {
1360            let req = deserialize(body)?;
1361            let resp = db.execute_transaction(req)?;
1362            serialize(&resp)
1363        }
1364        "BatchExecuteStatement" => {
1365            let req = deserialize(body)?;
1366            let resp = db.batch_execute_statement(req)?;
1367            serialize(&resp)
1368        }
1369        "TagResource" => {
1370            let req = deserialize(body)?;
1371            let resp = db.tag_resource(req)?;
1372            serialize(&resp)
1373        }
1374        "UntagResource" => {
1375            let req = deserialize(body)?;
1376            let resp = db.untag_resource(req)?;
1377            serialize(&resp)
1378        }
1379        "ListTagsOfResource" => {
1380            let req = deserialize(body)?;
1381            let resp = db.list_tags_of_resource(req)?;
1382            serialize(&resp)
1383        }
1384        _ => {
1385            // This should not be reachable because is_known_operation() filters first,
1386            // but handle it defensively.
1387            Err(crate::DynoxideError::SerializationException(
1388                "UnknownOperationException".to_string(),
1389            ))
1390        }
1391    }
1392}
1393
1394fn deserialize<T: serde::de::DeserializeOwned>(body: &str) -> crate::Result<T> {
1395    serde_json::from_str(body).map_err(|e| {
1396        let msg = e.to_string();
1397        // Custom validation errors from our Deserialize impls use a "VALIDATION:" prefix
1398        // to signal that these should be ValidationException, not SerializationException.
1399        if let Some(stripped) = msg.strip_prefix("VALIDATION:") {
1400            // serde_json appends " at line N column N" to custom errors — strip it
1401            let clean = strip_serde_position(stripped);
1402            return crate::DynoxideError::ValidationException(clean.to_string());
1403        }
1404        // DynamoDB returns ValidationException for missing required fields,
1405        // null values, and unrecognised enum variants. Only true JSON type
1406        // mismatches (e.g. number where string is expected) produce a
1407        // SerializationException.
1408        if msg.contains("missing field")
1409            || msg.contains("unknown variant")
1410            || msg.contains("invalid type: null")
1411        {
1412            crate::DynoxideError::ValidationException(msg)
1413        } else if msg.contains("empty AttributeValue") {
1414            crate::DynoxideError::ValidationException(
1415                "Supplied AttributeValue is empty, must contain exactly one of the supported datatypes".to_string(),
1416            )
1417        } else if msg.contains("Supplied AttributeValue") {
1418            // Multi-datatype or empty AV error — strip position info and return as-is
1419            let clean = strip_serde_position(&msg);
1420            crate::DynoxideError::ValidationException(clean)
1421        } else {
1422            crate::DynoxideError::SerializationException(map_serde_to_dynamodb_message(&msg, body))
1423        }
1424    })
1425}
1426
1427/// Strip serde_json's " at line N column N" suffix from error messages.
1428fn strip_serde_position(msg: &str) -> String {
1429    if let Some(idx) = msg.rfind(" at line ") {
1430        // Verify the suffix looks like " at line N column N"
1431        let suffix = &msg[idx..];
1432        if suffix.contains("column") {
1433            return msg[..idx].to_string();
1434        }
1435    }
1436    msg.to_string()
1437}
1438
1439/// Map serde deserialisation error messages to DynamoDB-style SerializationException messages.
1440///
1441/// DynamoDB returns specific messages like "NUMBER_VALUE cannot be converted to String"
1442/// whereas serde returns "invalid type: integer `23`, expected a string at line 1 column 42".
1443fn map_serde_to_dynamodb_message(msg: &str, body: &str) -> String {
1444    // "invalid type: <type>, expected <target>"
1445    if let Some(rest) = msg.strip_prefix("invalid type: ") {
1446        // Extract the source type and target type
1447        let (source_part, target_part) = match rest.split_once(", expected ") {
1448            Some((s, t)) => (s, t),
1449            None => return msg.to_string(),
1450        };
1451        // Strip " at line N column N" from target
1452        let target = target_part
1453            .split(" at line ")
1454            .next()
1455            .unwrap_or(target_part)
1456            .trim();
1457
1458        return map_type_mismatch(source_part.trim(), target);
1459    }
1460
1461    // "invalid length N, expected struct X ..." → struct-level errors
1462    if msg.contains("expected struct") && msg.starts_with("invalid length ") {
1463        // Extract struct name from "invalid length N, expected struct X with M elements"
1464        if let Some(rest) = msg.split("expected struct ").nth(1) {
1465            let struct_name = rest.split(' ').next().unwrap_or("Unknown");
1466            if let Some(dynamo_class) = map_struct_to_dynamo_class(struct_name) {
1467                return format!("Unrecognized collection type class {dynamo_class}");
1468            }
1469        }
1470        return "Start of structure or map found where not expected".to_string();
1471    }
1472
1473    // "expected string for X at line N column N" → wrong type inside AttributeValue
1474    if msg.starts_with("expected string for ") {
1475        return infer_type_conversion_error(msg, body, "String");
1476    }
1477
1478    // "expected value at line N column N" → wrong value type at position
1479    if msg.starts_with("expected value at line ") {
1480        return infer_type_conversion_error(msg, body, "String");
1481    }
1482
1483    msg.to_string()
1484}
1485
1486/// Map a serde type mismatch to DynamoDB's SerializationException message.
1487fn map_type_mismatch(source: &str, target: &str) -> String {
1488    // Determine target type category
1489    let target_is_string = target == "a string";
1490    let target_is_bool = target == "a boolean";
1491    let target_is_sequence = target == "a sequence";
1492    let target_is_integer = target == "i64" || target == "u64";
1493    let target_is_struct = target.starts_with("struct ");
1494    let target_is_map = target.starts_with("a map") || target.starts_with("map");
1495
1496    // Determine source type
1497    let is_integer = source.starts_with("integer ");
1498    let is_float = source.starts_with("floating point ");
1499    let is_bool_true = source == "boolean `true`";
1500    let is_bool_false = source == "boolean `false`";
1501    let _is_bool = is_bool_true || is_bool_false;
1502    let is_string = source.starts_with("string ");
1503    let is_sequence = source == "sequence";
1504    let is_map = source == "map";
1505
1506    // Map to DynamoDB message based on (source_type, target_type) combination
1507    if target_is_sequence {
1508        // List/array fields
1509        if is_map {
1510            return "Start of structure or map found where not expected".to_string();
1511        }
1512        return "Unexpected field type".to_string();
1513    }
1514
1515    if target_is_string {
1516        if is_bool_true {
1517            return "TRUE_VALUE cannot be converted to String".to_string();
1518        }
1519        if is_bool_false {
1520            return "FALSE_VALUE cannot be converted to String".to_string();
1521        }
1522        if is_float {
1523            return "DECIMAL_VALUE cannot be converted to String".to_string();
1524        }
1525        if is_integer {
1526            return "NUMBER_VALUE cannot be converted to String".to_string();
1527        }
1528        if is_sequence {
1529            return "Unrecognized collection type class java.lang.String".to_string();
1530        }
1531        if is_map {
1532            return "Start of structure or map found where not expected".to_string();
1533        }
1534    }
1535
1536    if target_is_bool {
1537        if is_string {
1538            return "Unexpected token received from parser".to_string();
1539        }
1540        if is_float {
1541            return "DECIMAL_VALUE cannot be converted to Boolean".to_string();
1542        }
1543        if is_integer {
1544            return "NUMBER_VALUE cannot be converted to Boolean".to_string();
1545        }
1546        if is_sequence {
1547            return "Unrecognized collection type class java.lang.Boolean".to_string();
1548        }
1549        if is_map {
1550            return "Start of structure or map found where not expected".to_string();
1551        }
1552    }
1553
1554    if target_is_integer {
1555        if is_string {
1556            return "STRING_VALUE cannot be converted to Long".to_string();
1557        }
1558        if is_bool_true {
1559            return "TRUE_VALUE cannot be converted to Long".to_string();
1560        }
1561        if is_bool_false {
1562            return "FALSE_VALUE cannot be converted to Long".to_string();
1563        }
1564        if is_sequence {
1565            return "Unrecognized collection type class java.lang.Long".to_string();
1566        }
1567        if is_map {
1568            return "Start of structure or map found where not expected".to_string();
1569        }
1570    }
1571
1572    if target_is_struct || target_is_map {
1573        if is_sequence {
1574            // Need to figure out the class from target
1575            if let Some(struct_name) = target.strip_prefix("struct ") {
1576                let name = struct_name.split(' ').next().unwrap_or("Unknown");
1577                if let Some(dynamo_class) = map_struct_to_dynamo_class(name) {
1578                    return format!("Unrecognized collection type class {dynamo_class}");
1579                }
1580            }
1581        }
1582        if is_map && target_is_struct {
1583            return "Start of structure or map found where not expected".to_string();
1584        }
1585        if !is_map && !is_sequence {
1586            return "Unexpected field type".to_string();
1587        }
1588    }
1589
1590    // Fallback: return the original message
1591    source
1592        .split(" at line ")
1593        .next()
1594        .unwrap_or(source)
1595        .to_string()
1596}
1597
1598/// Infer the DynamoDB type conversion error from a serde error message.
1599/// Uses the column position to inspect the actual JSON value in the body.
1600fn infer_type_conversion_error(msg: &str, body: &str, target_type: &str) -> String {
1601    // Try to extract column number from "at line N column N"
1602    if let Some(col_str) = msg.rsplit("column ").next() {
1603        if let Ok(col) = col_str.trim().parse::<usize>() {
1604            // Column is 1-based. Look at the character just before the column
1605            // to determine what type of value serde encountered.
1606            if col > 0 && col <= body.len() {
1607                let ch = body.as_bytes()[col - 1];
1608                return match ch {
1609                    b't' => format!("TRUE_VALUE cannot be converted to {target_type}"),
1610                    b'f' => format!("FALSE_VALUE cannot be converted to {target_type}"),
1611                    b'0'..=b'9' | b'-' => {
1612                        format!("NUMBER_VALUE cannot be converted to {target_type}")
1613                    }
1614                    _ => format!("TRUE_VALUE cannot be converted to {target_type}"),
1615                };
1616            }
1617        }
1618    }
1619    format!("TRUE_VALUE cannot be converted to {target_type}")
1620}
1621
1622/// Map Rust struct names to DynamoDB Java class names for SerializationException messages.
1623fn map_struct_to_dynamo_class(struct_name: &str) -> Option<&'static str> {
1624    match struct_name {
1625        "ProvisionedThroughput" | "ProvisionedThroughputRaw" => {
1626            Some("com.amazonaws.dynamodb.v20120810.ProvisionedThroughput")
1627        }
1628        "Projection" | "ProjectionRaw" => Some("com.amazonaws.dynamodb.v20120810.Projection"),
1629        "KeySchemaElement" | "KeySchemaElementRaw" => {
1630            Some("com.amazonaws.dynamodb.v20120810.KeySchemaElement")
1631        }
1632        "AttributeDefinition" | "AttributeDefinitionRaw" => {
1633            Some("com.amazonaws.dynamodb.v20120810.AttributeDefinition")
1634        }
1635        "LocalSecondaryIndex" | "LocalSecondaryIndexRaw" => {
1636            Some("com.amazonaws.dynamodb.v20120810.LocalSecondaryIndex")
1637        }
1638        "GlobalSecondaryIndex" | "GlobalSecondaryIndexRaw" => {
1639            Some("com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndex")
1640        }
1641        "DeleteGsiAction" | "DeleteGsiActionRaw" => {
1642            Some("com.amazonaws.dynamodb.v20120810.DeleteGlobalSecondaryIndexAction")
1643        }
1644        "CreateGsiAction" | "CreateGsiActionRaw" => {
1645            Some("com.amazonaws.dynamodb.v20120810.CreateGlobalSecondaryIndexAction")
1646        }
1647        "UpdateGsiAction" | "UpdateGsiActionRaw" => {
1648            Some("com.amazonaws.dynamodb.v20120810.UpdateGlobalSecondaryIndexAction")
1649        }
1650        "GlobalSecondaryIndexUpdate" | "GlobalSecondaryIndexUpdateRaw" => {
1651            Some("com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndexUpdate")
1652        }
1653        "Tag" | "TagRaw" => Some("com.amazonaws.dynamodb.v20120810.Tag"),
1654        _ => None,
1655    }
1656}
1657
1658fn serialize<T: serde::Serialize>(val: &T) -> crate::Result<String> {
1659    serde_json::to_string(val).map_err(|e| crate::DynoxideError::InternalServerError(e.to_string()))
1660}
1661
1662/// Generate a DynamoDB-style request ID: 52 uppercase hex characters.
1663/// Real DynamoDB uses `[0-9A-Z]{52}`.
1664fn generate_request_id() -> String {
1665    use uuid::Uuid;
1666    // Generate two UUIDs and concat their uppercase hex (32 chars each → 64 chars, take 52)
1667    let u1 = Uuid::now_v7();
1668    let u2 = Uuid::now_v7();
1669    let hex = format!(
1670        "{}{}",
1671        u1.as_simple().to_string().to_ascii_uppercase(),
1672        u2.as_simple().to_string().to_ascii_uppercase()
1673    );
1674    hex[..52].to_string()
1675}
1676
1677/// Compute CRC32 of response body and return as string.
1678fn compute_crc32(body: &[u8]) -> String {
1679    crc32fast::hash(body).to_string()
1680}
1681
1682/// Add standard DynamoDB headers to a response: x-amzn-requestid, x-amz-crc32, content-length.
1683fn add_dynamo_headers(response: &mut Response, body_bytes: &[u8]) {
1684    let headers = response.headers_mut();
1685    headers.insert(
1686        HeaderName::from_static("x-amzn-requestid"),
1687        HeaderValue::from_str(&generate_request_id()).unwrap(),
1688    );
1689    headers.insert(
1690        HeaderName::from_static("x-amz-crc32"),
1691        HeaderValue::from_str(&compute_crc32(body_bytes)).unwrap(),
1692    );
1693    headers.insert(
1694        HeaderName::from_static("content-length"),
1695        HeaderValue::from_str(&body_bytes.len().to_string()).unwrap(),
1696    );
1697}
1698
1699/// Build a response with proper DynamoDB headers (requestid, crc32, content-length).
1700fn dynamo_response(status: StatusCode, content_type: &str, body_str: String) -> Response {
1701    let body_bytes = body_str.as_bytes();
1702    let mut resp = Response::builder()
1703        .status(status)
1704        .header("content-type", content_type)
1705        .body(Body::from(body_str.clone()))
1706        .unwrap();
1707    add_dynamo_headers(&mut resp, body_bytes);
1708    resp
1709}
1710
1711/// Build a response with proper DynamoDB headers for a raw byte body (e.g. HTML 404).
1712fn dynamo_response_raw(status: StatusCode, body_str: &str) -> Response {
1713    let body_bytes = body_str.as_bytes();
1714    let mut resp = Response::builder()
1715        .status(status)
1716        .body(Body::from(body_str.to_string()))
1717        .unwrap();
1718    add_dynamo_headers(&mut resp, body_bytes);
1719    resp
1720}