Skip to main content

dynoxide/
server.rs

1//! Axum-based HTTP server exposing the DynamoDB JSON API.
2//!
3//! Only compiled with the `http-server` feature flag.
4
5use crate::Database;
6use axum::{
7    Router,
8    body::Body,
9    extract::{DefaultBodyLimit, State},
10    http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, header::SERVER},
11    response::Response,
12    routing::any,
13};
14use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream};
15use std::time::Duration;
16use tower_http::set_header::SetResponseHeaderLayer;
17
/// Region name interpolated into the health-check body served on `GET /`
/// (mirrors DynamoDB Local behaviour).
const AWS_REGION: &str = "us-east-1";

/// The `application/x-amz-json-1.0` media type: accepted on requests and echoed
/// back on responses to requests that used it.
const CONTENT_TYPE: &str = "application/x-amz-json-1.0";
/// Prefix of `X-Amz-Target` values that address the DynamoDB API.
const TARGET_PREFIX: &str = "DynamoDB_20120810.";
/// Prefix of `X-Amz-Target` values that address the DynamoDB Streams API.
const STREAMS_TARGET_PREFIX: &str = "DynamoDBStreams_20120810.";
24
/// Detect an existing listener on `addr`'s port by trying to connect to it.
///
/// Two addresses are probed: `addr` itself and its counterpart in the same
/// address family (loopback <-> wildcard). The counterpart probe catches cases
/// such as Docker holding `0.0.0.0` while dynoxide was asked for `127.0.0.1`.
///
/// Returns `Err` with a human-readable message naming the first probe that
/// accepted a connection, or `Ok(())` when neither probe connected within the
/// 100 ms timeout.
fn check_port_available(addr: SocketAddr) -> Result<(), String> {
    let port = addr.port();

    // Same family as the request: loopback maps to the wildcard address,
    // anything else maps to loopback.
    let counterpart_ip = match addr.ip() {
        IpAddr::V4(v4) => IpAddr::V4(if v4.is_loopback() {
            Ipv4Addr::UNSPECIFIED
        } else {
            Ipv4Addr::LOCALHOST
        }),
        IpAddr::V6(v6) => IpAddr::V6(if v6.is_loopback() {
            Ipv6Addr::UNSPECIFIED
        } else {
            Ipv6Addr::LOCALHOST
        }),
    };

    let candidates = [addr, SocketAddr::new(counterpart_ip, port)];
    let wait = Duration::from_millis(100);

    // A successful connect means something is already accepting on that address.
    let occupied = candidates
        .iter()
        .find(|candidate| TcpStream::connect_timeout(candidate, wait).is_ok());

    match occupied {
        Some(probe) => Err(format!(
            "port {port} is already in use (detected listener on {probe})"
        )),
        None => Ok(()),
    }
}
54
55/// Bind a TCP listener.
56///
57/// `SO_REUSEADDR` on Unix lets us rebind past `TIME_WAIT` sockets from a
58/// previous clean shutdown. It doesn't let two live listeners share a port
59/// (that's `SO_REUSEPORT`), so port-conflict detection still works. Not set
60/// on Windows: the semantics there allow hijacking an active bind.
61fn bind_exclusive(addr: SocketAddr) -> Result<std::net::TcpListener, String> {
62    use socket2::{Domain, Protocol, Socket, Type};
63
64    let domain = if addr.is_ipv6() {
65        Domain::IPV6
66    } else {
67        Domain::IPV4
68    };
69
70    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
71        .map_err(|e| format!("failed to create socket: {e}"))?;
72
73    #[cfg(unix)]
74    socket
75        .set_reuse_address(true)
76        .map_err(|e| format!("failed to set SO_REUSEADDR: {e}"))?;
77
78    socket
79        .set_nonblocking(true)
80        .map_err(|e| format!("failed to set nonblocking: {e}"))?;
81    socket
82        .bind(&addr.into())
83        .map_err(|e| format!("failed to bind {addr}: {e}"))?;
84    socket
85        .listen(1024)
86        .map_err(|e| format!("failed to listen on {addr}: {e}"))?;
87
88    Ok(std::net::TcpListener::from(socket))
89}
90
91/// Start the HTTP server.
92pub async fn start(host: &str, port: u16, db: Database) -> Result<(), String> {
93    let addr: SocketAddr = format!("{host}:{port}")
94        .parse()
95        .map_err(|e| format!("invalid address {host}:{port}: {e}"))?;
96
97    // Runs before any async tasks are spawned, so blocking connect probes are safe.
98    check_port_available(addr)?;
99
100    let std_listener = bind_exclusive(addr)?;
101    let listener = tokio::net::TcpListener::from_std(std_listener)
102        .map_err(|e| format!("failed to create async listener: {e}"))?;
103
104    let app = build_router(db);
105
106    eprintln!("Dynoxide listening on http://{addr}");
107
108    axum::serve(listener, app)
109        .with_graceful_shutdown(shutdown_signal())
110        .await
111        .map_err(|e| format!("server failed: {e}"))
112}
113
114/// Start on a specific listener (for tests).
115pub async fn serve_on(listener: tokio::net::TcpListener, db: Database) {
116    let app = build_router(db);
117    axum::serve(listener, app).await.unwrap();
118}
119
/// Upper bound on request body size in bytes (16 MiB); DynamoDB accepts bodies
/// up to 16 MB.
const MAX_BODY_SIZE: usize = 16 << 20;
122
123/// Build the shared axum router used by both `start` and `serve_on`.
124fn build_router(db: Database) -> Router {
125    Router::new()
126        .route("/", any(handle_root))
127        .fallback(handle_fallback)
128        .layer(DefaultBodyLimit::max(MAX_BODY_SIZE))
129        .layer(SetResponseHeaderLayer::overriding(
130            SERVER,
131            HeaderValue::from_static(concat!("Dynoxide/", env!("CARGO_PKG_VERSION"))),
132        ))
133        .layer(SetResponseHeaderLayer::overriding(
134            HeaderName::from_static("x-dynoxide-version"),
135            HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
136        ))
137        .with_state(db)
138}
139
140/// The 404 body DynamoDB returns for non-POST methods.
141const NOT_FOUND_BODY: &str = "<UnknownOperationException/>\n";
142
143/// Single handler for all methods on `/`. Dispatches based on method.
144///
145/// - GET: health check (200)
146/// - POST: DynamoDB API dispatch
147/// - OPTIONS with Origin: CORS preflight response (200)
148/// - OPTIONS without Origin: 404
149/// - All other methods (DELETE, PUT, etc.): 404
150async fn handle_root(
151    method: Method,
152    uri: Uri,
153    State(db): State<Database>,
154    headers: HeaderMap,
155    body: String,
156) -> Response {
157    let has_origin = headers.get("origin").is_some();
158
159    let mut resp = match method {
160        Method::GET => {
161            let body_str = format!("healthy: dynamodb.{AWS_REGION}.amazonaws.com ");
162            dynamo_response_raw(StatusCode::OK, &body_str)
163        }
164        Method::OPTIONS if has_origin => {
165            // CORS preflight: return 200 with CORS headers
166            let mut r = Response::builder()
167                .status(StatusCode::OK)
168                .body(Body::from(""))
169                .unwrap();
170            add_dynamo_headers(&mut r, b"");
171            // Set content-length to 0 explicitly
172            r.headers_mut().insert(
173                HeaderName::from_static("content-length"),
174                HeaderValue::from_static("0"),
175            );
176            r.headers_mut().insert(
177                HeaderName::from_static("access-control-allow-origin"),
178                HeaderValue::from_static("*"),
179            );
180            r.headers_mut().insert(
181                HeaderName::from_static("access-control-max-age"),
182                HeaderValue::from_static("172800"),
183            );
184            // Echo back request headers and method if present
185            if let Some(req_headers) = headers.get("access-control-request-headers") {
186                r.headers_mut().insert(
187                    HeaderName::from_static("access-control-allow-headers"),
188                    req_headers.clone(),
189                );
190            }
191            if let Some(req_method) = headers.get("access-control-request-method") {
192                r.headers_mut().insert(
193                    HeaderName::from_static("access-control-allow-methods"),
194                    req_method.clone(),
195                );
196            }
197            return r;
198        }
199        Method::POST => handle_request(uri, State(db), headers.clone(), body).await,
200        _ => {
201            // OPTIONS without Origin, DELETE, PUT, PATCH, etc. — all return 404.
202            dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY)
203        }
204    };
205
206    // Add CORS header to all responses if Origin is present
207    if has_origin {
208        resp.headers_mut().insert(
209            HeaderName::from_static("access-control-allow-origin"),
210            HeaderValue::from_static("*"),
211        );
212    }
213
214    resp
215}
216
217/// Fallback for all unmatched routes — returns 404.
218async fn handle_fallback() -> Response {
219    dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY)
220}
221
222async fn shutdown_signal() {
223    #[cfg(unix)]
224    {
225        use tokio::signal::unix::{SignalKind, signal};
226        let mut sigterm =
227            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
228        tokio::select! {
229            _ = tokio::signal::ctrl_c() => {},
230            _ = sigterm.recv() => {},
231        }
232    }
233    #[cfg(not(unix))]
234    {
235        tokio::signal::ctrl_c()
236            .await
237            .expect("failed to install CTRL+C handler");
238    }
239    eprintln!("\nShutting down...");
240}
241
242async fn handle_request(
243    uri: Uri,
244    State(db): State<Database>,
245    headers: HeaderMap,
246    body: String,
247) -> Response {
248    // Check Content-Type header — DynamoDB accepts both application/json and
249    // application/x-amz-json-1.0, with optional parameters (e.g. ;charset=utf-8).
250    // The response Content-Type echoes the base media type from the request.
251    let raw_ct = headers
252        .get("content-type")
253        .and_then(|v| v.to_str().ok())
254        .unwrap_or("");
255
256    // Strip parameters and whitespace: "  application/json  ; charset=utf-8" → "application/json"
257    let base_ct = raw_ct.split(';').next().unwrap_or("").trim();
258
259    let is_amz_json = base_ct.eq_ignore_ascii_case(CONTENT_TYPE);
260    let is_plain_json = base_ct.eq_ignore_ascii_case("application/json");
261
262    // If neither recognised Content-Type AND there is a body, return 404.
263    // A POST with no body (or empty body) and no Content-Type is still treated
264    // as a DynamoDB request (DynamoDB accepts it and parses the empty body as JSON).
265    if !is_amz_json && !is_plain_json && (!body.is_empty() || !raw_ct.is_empty()) {
266        return dynamo_response_raw(StatusCode::NOT_FOUND, NOT_FOUND_BODY);
267    }
268
269    // Determine response content-type: echo the request's base media type.
270    // application/x-amz-json-1.0 requests get that back; everything else gets application/json.
271    let response_ct = if is_amz_json {
272        CONTENT_TYPE
273    } else {
274        "application/json"
275    };
276
277    // Try to parse body as JSON. DynamoDB requires a valid JSON object.
278    // Non-JSON → SerializationException (no message).
279    if !body.is_empty() && serde_json::from_str::<serde_json::Value>(&body).is_err() {
280        return serialization_exception_bare(response_ct);
281    }
282
283    // Check x-amz-target header
284    // NOTE: empty body check happens after target resolution — DynamoDB returns
285    // UnknownOperationException if no target, even with empty body.
286    let target = match headers.get("x-amz-target").and_then(|v| v.to_str().ok()) {
287        Some(t) => t,
288        None => {
289            // No target header — UnknownOperationException (no message)
290            return unknown_operation_response(response_ct);
291        }
292    };
293
294    let operation = target
295        .strip_prefix(TARGET_PREFIX)
296        .or_else(|| target.strip_prefix(STREAMS_TARGET_PREFIX));
297
298    let operation = match operation {
299        Some(op) if crate::dynamo_ops::is_known_operation(op) => op,
300        _ => {
301            // Unrecognised target prefix or unknown operation
302            return unknown_operation_response(response_ct);
303        }
304    };
305
306    // Validate authentication headers — DynamoDB checks auth after target resolution.
307    if let Some(auth_error) = validate_auth(&headers, &uri, response_ct) {
308        return auth_error;
309    }
310
311    // Empty body with a valid target → SerializationException (bare, no message).
312    // DynamoDB requires a JSON body for all operations.
313    if body.is_empty() {
314        return serialization_exception_bare(response_ct);
315    }
316
317    tracing::debug!(operation, body_len = body.len(), "request");
318    tracing::trace!(operation, body = %body, "request body");
319
320    match dispatch(&db, operation, &body) {
321        Ok(json) => {
322            tracing::debug!(operation, body_len = json.len(), "response");
323            tracing::trace!(operation, body = %json, "response body");
324            dynamo_response(StatusCode::OK, response_ct, json)
325        }
326        Err(e) => {
327            let status = StatusCode::from_u16(e.status_code()).unwrap_or(StatusCode::BAD_REQUEST);
328            let json = e.to_json();
329            tracing::warn!(operation, status = %status, "error response");
330            tracing::trace!(operation, body = %json, "error response body");
331            dynamo_response(status, response_ct, json)
332        }
333    }
334}
335
336/// Validate AWS authentication headers/query parameters.
337///
338/// DynamoDB checks auth after resolving the target operation. Returns `Some(Response)` if
339/// auth validation fails, `None` if auth is present (or if we choose to skip full
340/// signature verification).
341fn validate_auth(headers: &HeaderMap, uri: &Uri, response_ct: &str) -> Option<Response> {
342    let auth_header = headers.get("authorization").and_then(|v| v.to_str().ok());
343
344    // Check query string for X-Amz-Algorithm
345    let query = uri.query().unwrap_or("");
346    let has_algorithm_query = query.split('&').any(|p| {
347        let key = p.split('=').next().unwrap_or("");
348        key == "X-Amz-Algorithm"
349    });
350
351    // If both Authorization header AND X-Amz-Algorithm query → InvalidSignatureException
352    if auth_header.is_some() && has_algorithm_query {
353        let body = serde_json::json!({
354            "__type": "com.amazon.coral.service#InvalidSignatureException",
355            "message": "Found both 'X-Amz-Algorithm' as a query-string param and 'Authorization' as HTTP header."
356        })
357        .to_string();
358        return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
359    }
360
361    // Query-string auth (X-Amz-Algorithm present)
362    if has_algorithm_query {
363        let mut missing = Vec::new();
364        let query_params: Vec<&str> = query
365            .split('&')
366            .map(|p| p.split('=').next().unwrap_or(""))
367            .collect();
368
369        // Check if X-Amz-Algorithm has a non-empty value
370        let algo_has_value = query.split('&').any(|p| {
371            let mut parts = p.splitn(2, '=');
372            let key = parts.next().unwrap_or("");
373            let val = parts.next().unwrap_or("");
374            key == "X-Amz-Algorithm" && !val.is_empty()
375        });
376
377        if !algo_has_value {
378            missing.push("'X-Amz-Algorithm'");
379        }
380        for (param, label) in [
381            ("X-Amz-Credential", "'X-Amz-Credential'"),
382            ("X-Amz-Signature", "'X-Amz-Signature'"),
383            ("X-Amz-SignedHeaders", "'X-Amz-SignedHeaders'"),
384            ("X-Amz-Date", "'X-Amz-Date'"),
385        ] {
386            if !query_params.contains(&param) {
387                missing.push(label);
388            }
389        }
390
391        if !missing.is_empty() {
392            let parts: Vec<String> = missing
393                .iter()
394                .map(|p| format!("AWS query-string parameters must include {p}. "))
395                .collect();
396            let msg = format!("{}Re-examine the query-string parameters.", parts.join(""));
397            let body = serde_json::json!({
398                "__type": "com.amazon.coral.service#IncompleteSignatureException",
399                "message": msg
400            })
401            .to_string();
402            return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
403        }
404
405        // Query auth is present and complete — allow through
406        return None;
407    }
408
409    // Header-based auth
410    match auth_header {
411        None => {
412            // No Authorization header at all → MissingAuthenticationTokenException
413            let body = serde_json::json!({
414                "__type": "com.amazon.coral.service#MissingAuthenticationTokenException",
415                "message": "Request is missing Authentication Token"
416            })
417            .to_string();
418            Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body))
419        }
420        Some(auth) => {
421            if !auth.starts_with("AWS4-") {
422                // Authorization header doesn't start with AWS4- → MissingAuthenticationTokenException
423                let body = serde_json::json!({
424                    "__type": "com.amazon.coral.service#MissingAuthenticationTokenException",
425                    "message": "Request is missing Authentication Token"
426                })
427                .to_string();
428                return Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body));
429            }
430
431            // AWS4- prefix present — check for required parameters
432            let has_date = headers.get("x-amz-date").is_some() || headers.get("date").is_some();
433
434            // Parse auth header for Credential, Signature, SignedHeaders
435            // These can be separated by spaces or commas
436            let has_credential = auth.contains("Credential=") || auth.contains("credential=");
437            let has_signature = auth.contains("Signature=") || auth.contains("signature=");
438            let has_signed_headers =
439                auth.contains("SignedHeaders=") || auth.contains("signedheaders=");
440
441            let mut missing = Vec::new();
442            if !has_credential {
443                missing.push("'Credential'");
444            }
445            if !has_signature {
446                missing.push("'Signature'");
447            }
448            if !has_signed_headers {
449                missing.push("'SignedHeaders'");
450            }
451            if !has_date {
452                missing.push("existence of either a 'X-Amz-Date' or a 'Date' header.");
453            }
454
455            if missing.is_empty() {
456                // All required parts present — allow through (we don't verify signatures)
457                return None;
458            }
459
460            // Build the IncompleteSignatureException message
461            let mut parts: Vec<String> = missing
462                .iter()
463                .map(|p| {
464                    if p.contains("existence of") {
465                        format!("Authorization header requires {p}")
466                    } else {
467                        format!("Authorization header requires {p} parameter.")
468                    }
469                })
470                .collect();
471            parts.push(format!("Authorization={auth}"));
472            let msg = parts.join(" ");
473            let body = serde_json::json!({
474                "__type": "com.amazon.coral.service#IncompleteSignatureException",
475                "message": msg
476            })
477            .to_string();
478            Some(dynamo_response(StatusCode::BAD_REQUEST, response_ct, body))
479        }
480    }
481}
482
/// Text of the Java `ClassCastException` that DynamoDB leaks for certain type
/// mismatches. Reproduced verbatim.
const PARAMETERIZED_TYPE_CAST_ERROR: &str = "class sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl \
    cannot be cast to class java.lang.Class \
    (sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl and java.lang.Class \
    are in module java.base of loader 'bootstrap')";
485
486/// SerializationException with no message (just `__type`).
487/// Used for JSON parse failures at the connection level.
488fn serialization_exception_bare(content_type: &str) -> Response {
489    let body = r#"{"__type":"com.amazon.coral.service#SerializationException"}"#.to_string();
490    dynamo_response(StatusCode::BAD_REQUEST, content_type, body)
491}
492
493/// UnknownOperationException with no message (just `__type`).
494fn unknown_operation_response(content_type: &str) -> Response {
495    let body = r#"{"__type":"com.amazon.coral.service#UnknownOperationException"}"#.to_string();
496    dynamo_response(StatusCode::BAD_REQUEST, content_type, body)
497}
498
499/// Pre-check JSON field types that are deserialized as `serde_json::Value`.
500///
501/// DynamoDB returns SerializationException for type mismatches on fields like
502/// AttributeDefinitions, KeySchema, etc. Because our raw request structs use
503/// `Option<serde_json::Value>` for these fields, serde accepts any JSON type.
504/// This function inspects the raw JSON and returns the appropriate
505/// SerializationException before serde gets involved.
506fn pre_check_serialization_types(operation: &str, body: &str) -> crate::Result<()> {
507    let json: serde_json::Value = serde_json::from_str(body)
508        .map_err(|e| crate::DynoxideError::SerializationException(e.to_string()))?;
509
510    let obj = match json.as_object() {
511        Some(o) => o,
512        None => return Ok(()),
513    };
514
515    match operation {
516        "CreateTable" => {
517            check_field_is_list(obj, "AttributeDefinitions")?;
518            check_field_is_list(obj, "KeySchema")?;
519            check_field_is_list(obj, "LocalSecondaryIndexes")?;
520            check_field_is_list(obj, "GlobalSecondaryIndexes")?;
521            check_list_elements_are_structs(obj, "AttributeDefinitions")?;
522            check_list_elements_are_structs(obj, "KeySchema")?;
523            check_list_elements_are_structs(obj, "LocalSecondaryIndexes")?;
524            check_list_elements_are_structs(obj, "GlobalSecondaryIndexes")?;
525
526            // Check struct fields and their inner scalar types
527            check_field_is_struct(obj, "ProvisionedThroughput")?;
528            check_nested_pt_fields(obj)?;
529
530            // Check nested fields inside KeySchema elements
531            check_nested_list_structs(obj, "KeySchema")?;
532            // Check nested fields inside AttributeDefinitions elements
533            check_nested_list_structs(obj, "AttributeDefinitions")?;
534
535            // Check nested list fields inside LocalSecondaryIndexes
536            if let Some(serde_json::Value::Array(arr)) = obj.get("LocalSecondaryIndexes") {
537                for item in arr {
538                    if let Some(inner) = item.as_object() {
539                        check_field_is_struct(inner, "Projection")?;
540                        check_field_is_list(inner, "KeySchema")?;
541                        check_list_elements_are_structs(inner, "KeySchema")?;
542                        check_field_is_string(inner, "IndexName")?;
543                        check_nested_list_structs(inner, "KeySchema")?;
544                        check_nested_projection_fields(inner)?;
545                        if let Some(proj) = inner.get("Projection").and_then(|p| p.as_object()) {
546                            check_field_is_list(proj, "NonKeyAttributes")?;
547                            check_nested_list_strings(proj, "NonKeyAttributes")?;
548                        }
549                    }
550                }
551            }
552
553            // Check nested list fields inside GlobalSecondaryIndexes
554            if let Some(serde_json::Value::Array(arr)) = obj.get("GlobalSecondaryIndexes") {
555                for item in arr {
556                    if let Some(inner) = item.as_object() {
557                        check_field_is_struct(inner, "Projection")?;
558                        check_field_is_struct(inner, "ProvisionedThroughput")?;
559                        check_field_is_list(inner, "KeySchema")?;
560                        check_list_elements_are_structs(inner, "KeySchema")?;
561                        check_field_is_string(inner, "IndexName")?;
562                        check_nested_list_structs(inner, "KeySchema")?;
563                        check_nested_projection_fields(inner)?;
564                        check_nested_pt_fields(inner)?;
565                        if let Some(proj) = inner.get("Projection").and_then(|p| p.as_object()) {
566                            check_field_is_list(proj, "NonKeyAttributes")?;
567                            check_nested_list_strings(proj, "NonKeyAttributes")?;
568                        }
569                    }
570                }
571            }
572        }
573        "UpdateTable" => {
574            check_field_is_list(obj, "GlobalSecondaryIndexUpdates")?;
575            check_list_elements_are_structs(obj, "GlobalSecondaryIndexUpdates")?;
576            check_field_is_struct(obj, "ProvisionedThroughput")?;
577            check_nested_pt_fields(obj)?;
578            // Check inside GlobalSecondaryIndexUpdates
579            if let Some(serde_json::Value::Array(arr)) = obj.get("GlobalSecondaryIndexUpdates") {
580                for item in arr {
581                    if let Some(inner) = item.as_object() {
582                        check_field_is_struct(inner, "Create")?;
583                        check_field_is_struct(inner, "Update")?;
584                        check_field_is_struct(inner, "Delete")?;
585                        if let Some(create) = inner.get("Create").and_then(|v| v.as_object()) {
586                            check_field_is_struct(create, "Projection")?;
587                            check_field_is_struct(create, "ProvisionedThroughput")?;
588                            check_field_is_list(create, "KeySchema")?;
589                            check_list_elements_are_structs(create, "KeySchema")?;
590                            check_nested_list_structs(create, "KeySchema")?;
591                            check_nested_projection_fields(create)?;
592                            check_nested_pt_fields(create)?;
593                        }
594                        if let Some(update) = inner.get("Update").and_then(|v| v.as_object()) {
595                            check_field_is_struct(update, "ProvisionedThroughput")?;
596                            check_nested_pt_fields(update)?;
597                        }
598                    }
599                }
600            }
601        }
602        "PutItem" | "DeleteItem" | "UpdateItem" => {
603            check_field_is_map(
604                obj,
605                "AttributeUpdates",
606                "com.amazonaws.dynamodb.v20120810.AttributeValueUpdate",
607            )?;
608            check_map_values_are_structs(obj, "AttributeUpdates")?;
609        }
610        "Query" => {
611            check_field_is_map(
612                obj,
613                "KeyConditions",
614                "com.amazonaws.dynamodb.v20120810.Condition",
615            )?;
616            check_field_is_map(
617                obj,
618                "QueryFilter",
619                "com.amazonaws.dynamodb.v20120810.Condition",
620            )?;
621            check_map_values_are_structs(obj, "QueryFilter")?;
622            check_map_values_are_structs(obj, "KeyConditions")?;
623            check_filter_inner_fields(obj, "QueryFilter")?;
624            check_filter_inner_fields(obj, "KeyConditions")?;
625            check_filter_attribute_value_lists(obj, "QueryFilter")?;
626            check_field_is_map(
627                obj,
628                "ExclusiveStartKey",
629                "com.amazonaws.dynamodb.v20120810.AttributeValue",
630            )?;
631        }
632        "Scan" => {
633            check_field_is_map(
634                obj,
635                "ScanFilter",
636                "com.amazonaws.dynamodb.v20120810.Condition",
637            )?;
638            check_map_values_are_structs(obj, "ScanFilter")?;
639            check_filter_inner_fields(obj, "ScanFilter")?;
640            check_filter_attribute_value_lists(obj, "ScanFilter")?;
641            check_field_is_map(
642                obj,
643                "ExclusiveStartKey",
644                "com.amazonaws.dynamodb.v20120810.AttributeValue",
645            )?;
646        }
647        "BatchGetItem" => {
648            check_field_is_map(
649                obj,
650                "RequestItems",
651                "com.amazonaws.dynamodb.v20120810.KeysAndAttributes",
652            )?;
653            check_map_values_are_structs(obj, "RequestItems")?;
654            // Check nested fields inside RequestItems
655            if let Some(serde_json::Value::Object(ri)) = obj.get("RequestItems") {
656                for (_table, val) in ri {
657                    if let Some(inner) = val.as_object() {
658                        check_field_is_map(inner, "ExpressionAttributeNames", "java.lang.String")?;
659                        // Check Keys array elements are maps, and their values are AV structs
660                        if let Some(serde_json::Value::Array(keys)) = inner.get("Keys") {
661                            for key in keys {
662                                if !key.is_object() && !key.is_null() {
663                                    return Err(crate::DynoxideError::SerializationException(
664                                        PARAMETERIZED_TYPE_CAST_ERROR.to_string(),
665                                    ));
666                                }
667                                if let Some(key_map) = key.as_object() {
668                                    for (_k, v) in key_map {
669                                        if !v.is_object() && !v.is_null() {
670                                            return Err(
671                                                crate::DynoxideError::SerializationException(
672                                                    "Unexpected value type in payload".to_string(),
673                                                ),
674                                            );
675                                        }
676                                    }
677                                }
678                            }
679                        }
680                    }
681                }
682            }
683        }
684        "BatchWriteItem" => {
685            check_field_is_map(
686                obj,
687                "RequestItems",
688                "java.util.List<com.amazonaws.dynamodb.v20120810.WriteRequest>",
689            )?;
690            // Check nested fields inside RequestItems
691            if let Some(serde_json::Value::Object(ri)) = obj.get("RequestItems") {
692                for (_table, val) in ri {
693                    // Each value must be an array of WriteRequests
694                    if !val.is_array() && !val.is_null() {
695                        return Err(crate::DynoxideError::SerializationException(
696                            PARAMETERIZED_TYPE_CAST_ERROR.to_string(),
697                        ));
698                    }
699                    if let Some(items) = val.as_array() {
700                        // Check array elements are structs (WriteRequest)
701                        for item in items {
702                            if !item.is_object() && !item.is_null() {
703                                let msg = if item.is_array() {
704                                    "Unrecognized collection type class com.amazonaws.dynamodb.v20120810.WriteRequest".to_string()
705                                } else {
706                                    "Unexpected value type in payload".to_string()
707                                };
708                                return Err(crate::DynoxideError::SerializationException(msg));
709                            }
710                        }
711                        for item in items {
712                            if let Some(inner) = item.as_object() {
713                                check_field_is_struct(inner, "DeleteRequest")?;
714                                check_field_is_struct(inner, "PutRequest")?;
715                                if let Some(dr) =
716                                    inner.get("DeleteRequest").and_then(|v| v.as_object())
717                                {
718                                    check_field_is_map(
719                                        dr,
720                                        "Key",
721                                        "com.amazonaws.dynamodb.v20120810.AttributeValue",
722                                    )?;
723                                    check_map_values_are_structs(dr, "Key")?;
724                                }
725                                if let Some(pr) =
726                                    inner.get("PutRequest").and_then(|v| v.as_object())
727                                {
728                                    check_field_is_map(
729                                        pr,
730                                        "Item",
731                                        "com.amazonaws.dynamodb.v20120810.AttributeValue",
732                                    )?;
733                                    check_map_values_are_structs(pr, "Item")?;
734                                }
735                            }
736                        }
737                    }
738                }
739            }
740        }
741        "TagResource" => {
742            check_field_is_list(obj, "Tags")?;
743            check_list_elements_are_structs(obj, "Tags")?;
744        }
745        _ => {}
746    }
747
748    // Common map fields — checked AFTER operation-specific nested fields
749    check_field_is_map(
750        obj,
751        "Key",
752        "com.amazonaws.dynamodb.v20120810.AttributeValue",
753    )?;
754    check_field_is_map(
755        obj,
756        "Item",
757        "com.amazonaws.dynamodb.v20120810.AttributeValue",
758    )?;
759    check_field_is_map(obj, "ExpressionAttributeNames", "java.lang.String")?;
760    check_field_is_map(
761        obj,
762        "ExpressionAttributeValues",
763        "com.amazonaws.dynamodb.v20120810.AttributeValue",
764    )?;
765    check_field_is_map(
766        obj,
767        "Expected",
768        "com.amazonaws.dynamodb.v20120810.ExpectedAttributeValue",
769    )?;
770
771    // Check that attribute value map entries are structs (not scalars)
772    check_map_values_are_structs(obj, "Key")?;
773    check_map_values_are_structs(obj, "Item")?;
774    check_map_values_are_structs(obj, "ExpressionAttributeValues")?;
775    check_map_values_are_structs(obj, "ExclusiveStartKey")?;
776    check_map_values_are_structs(obj, "Expected")?;
777
778    // Check Expected.Attr inner fields
779    if let Some(serde_json::Value::Object(expected)) = obj.get("Expected") {
780        for (_attr, cond) in expected {
781            if let Some(cond_obj) = cond.as_object() {
782                check_field_is_bool(cond_obj, "Exists")?;
783            }
784        }
785    }
786
787    // Common scalar fields — checked AFTER nested fields to match DynamoDB ordering
788    check_field_is_string(obj, "TableName")?;
789    check_field_is_string(obj, "IndexName")?;
790    check_field_is_string(obj, "ReturnConsumedCapacity")?;
791    check_field_is_string(obj, "ReturnValues")?;
792    check_field_is_string(obj, "ReturnItemCollectionMetrics")?;
793    check_field_is_string(obj, "ConditionalOperator")?;
794    check_field_is_string(obj, "Select")?;
795    check_field_is_string(obj, "ConditionExpression")?;
796    check_field_is_string(obj, "FilterExpression")?;
797    check_field_is_string(obj, "KeyConditionExpression")?;
798    check_field_is_string(obj, "ProjectionExpression")?;
799    check_field_is_string(obj, "UpdateExpression")?;
800    check_field_is_int(obj, "Limit")?;
801    check_field_is_int(obj, "Segment")?;
802    check_field_is_int(obj, "TotalSegments")?;
803    check_field_is_bool(obj, "ScanIndexForward")?;
804    check_field_is_bool(obj, "ConsistentRead")?;
805
806    Ok(())
807}
808
809/// Check that a field, if present, is a JSON number (integer).
810/// `java_type` is "Long" for PT fields, "Integer" for Limit/Segment/etc.
811fn check_field_is_integer_typed(
812    obj: &serde_json::Map<String, serde_json::Value>,
813    field: &str,
814    java_type: &str,
815) -> crate::Result<()> {
816    let val = match obj.get(field) {
817        Some(v) if !v.is_null() => v,
818        _ => return Ok(()),
819    };
820
821    if val.is_number() {
822        return Ok(());
823    }
824
825    let msg = if val.is_array() {
826        format!("Unrecognized collection type class java.lang.{java_type}")
827    } else if val.is_object() {
828        "Start of structure or map found where not expected".to_string()
829    } else if val.is_boolean() {
830        if val.as_bool() == Some(true) {
831            format!("TRUE_VALUE cannot be converted to {java_type}")
832        } else {
833            format!("FALSE_VALUE cannot be converted to {java_type}")
834        }
835    } else if val.is_string() {
836        format!("STRING_VALUE cannot be converted to {java_type}")
837    } else {
838        "Unexpected field type".to_string()
839    };
840
841    Err(crate::DynoxideError::SerializationException(msg))
842}
843
844/// Check integer field using "Long" type (for PT fields).
845fn check_field_is_integer(
846    obj: &serde_json::Map<String, serde_json::Value>,
847    field: &str,
848) -> crate::Result<()> {
849    check_field_is_integer_typed(obj, field, "Long")
850}
851
852/// Check integer field using "Integer" type (for Limit, Segment, etc.).
853fn check_field_is_int(
854    obj: &serde_json::Map<String, serde_json::Value>,
855    field: &str,
856) -> crate::Result<()> {
857    check_field_is_integer_typed(obj, field, "Integer")
858}
859
860/// Check that a field, if present and not null, is a JSON string.
861/// Returns SerializationException for wrong types.
862fn check_field_is_string(
863    obj: &serde_json::Map<String, serde_json::Value>,
864    field: &str,
865) -> crate::Result<()> {
866    let val = match obj.get(field) {
867        Some(v) if !v.is_null() => v,
868        _ => return Ok(()),
869    };
870
871    if val.is_string() {
872        return Ok(());
873    }
874
875    let msg = if val.is_array() {
876        "Unrecognized collection type class java.lang.String".to_string()
877    } else if val.is_object() {
878        "Start of structure or map found where not expected".to_string()
879    } else if val.as_bool() == Some(true) {
880        "TRUE_VALUE cannot be converted to String".to_string()
881    } else if val.as_bool() == Some(false) {
882        "FALSE_VALUE cannot be converted to String".to_string()
883    } else if val.is_number() {
884        // DynamoDB distinguishes DECIMAL_VALUE (float) from NUMBER_VALUE (int)
885        if val.is_f64() && !val.is_i64() && !val.is_u64() {
886            "DECIMAL_VALUE cannot be converted to String".to_string()
887        } else {
888            "NUMBER_VALUE cannot be converted to String".to_string()
889        }
890    } else {
891        "Unexpected field type".to_string()
892    };
893
894    Err(crate::DynoxideError::SerializationException(msg))
895}
896
897/// Check that a field, if present and not null, is a JSON boolean.
898/// Returns SerializationException for wrong types.
899fn check_field_is_bool(
900    obj: &serde_json::Map<String, serde_json::Value>,
901    field: &str,
902) -> crate::Result<()> {
903    let val = match obj.get(field) {
904        Some(v) if !v.is_null() => v,
905        _ => return Ok(()),
906    };
907
908    if val.is_boolean() {
909        return Ok(());
910    }
911
912    let msg = if val.is_array() {
913        "Unrecognized collection type class java.lang.Boolean".to_string()
914    } else if val.is_object() {
915        "Start of structure or map found where not expected".to_string()
916    } else if val.is_string() {
917        "Unexpected token received from parser".to_string()
918    } else if val.is_number() {
919        if val.is_f64() && !val.is_i64() && !val.is_u64() {
920            "DECIMAL_VALUE cannot be converted to Boolean".to_string()
921        } else {
922            "NUMBER_VALUE cannot be converted to Boolean".to_string()
923        }
924    } else {
925        "Unexpected field type".to_string()
926    };
927
928    Err(crate::DynoxideError::SerializationException(msg))
929}
930
931/// Check that all elements in a list field are JSON objects (structs).
932/// Returns "Unexpected value type in payload" for non-struct elements.
933fn check_list_elements_are_structs(
934    obj: &serde_json::Map<String, serde_json::Value>,
935    field: &str,
936) -> crate::Result<()> {
937    let java_class = match field {
938        "KeySchema" => "com.amazonaws.dynamodb.v20120810.KeySchemaElement",
939        "AttributeDefinitions" => "com.amazonaws.dynamodb.v20120810.AttributeDefinition",
940        "LocalSecondaryIndexes" => "com.amazonaws.dynamodb.v20120810.LocalSecondaryIndex",
941        "GlobalSecondaryIndexes" => "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndex",
942        "GlobalSecondaryIndexUpdates" => {
943            "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndexUpdate"
944        }
945        "Tags" => "com.amazonaws.dynamodb.v20120810.Tag",
946        _ => "Unknown",
947    };
948    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
949        for item in arr {
950            if !item.is_object() && !item.is_null() {
951                let msg = if item.is_array() {
952                    format!("Unrecognized collection type class {java_class}")
953                } else {
954                    "Unexpected value type in payload".to_string()
955                };
956                return Err(crate::DynoxideError::SerializationException(msg));
957            }
958        }
959    }
960    Ok(())
961}
962
963/// Check scalar fields inside a ProvisionedThroughput struct.
964fn check_nested_pt_fields(obj: &serde_json::Map<String, serde_json::Value>) -> crate::Result<()> {
965    if let Some(pt) = obj.get("ProvisionedThroughput").and_then(|v| v.as_object()) {
966        check_field_is_integer(pt, "WriteCapacityUnits")?;
967        check_field_is_integer(pt, "ReadCapacityUnits")?;
968    }
969    Ok(())
970}
971
972/// Check scalar fields inside a Projection struct.
973fn check_nested_projection_fields(
974    obj: &serde_json::Map<String, serde_json::Value>,
975) -> crate::Result<()> {
976    if let Some(proj) = obj.get("Projection").and_then(|v| v.as_object()) {
977        check_field_is_string(proj, "ProjectionType")?;
978    }
979    Ok(())
980}
981
982/// Check that elements inside a list field are structs, and check their scalar fields.
983fn check_nested_list_structs(
984    obj: &serde_json::Map<String, serde_json::Value>,
985    field: &str,
986) -> crate::Result<()> {
987    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
988        for item in arr {
989            if let Some(inner) = item.as_object() {
990                // Common struct fields in KeySchema/AttributeDefinitions elements
991                check_field_is_string(inner, "KeyType")?;
992                check_field_is_string(inner, "AttributeName")?;
993                check_field_is_string(inner, "AttributeType")?;
994                check_field_is_string(inner, "IndexName")?;
995            }
996        }
997    }
998    Ok(())
999}
1000
1001/// Check that elements inside a string list field are actually strings.
1002fn check_nested_list_strings(
1003    obj: &serde_json::Map<String, serde_json::Value>,
1004    field: &str,
1005) -> crate::Result<()> {
1006    if let Some(serde_json::Value::Array(arr)) = obj.get(field) {
1007        for item in arr {
1008            if !item.is_string() && !item.is_null() {
1009                if item.is_boolean() {
1010                    let val = if item.as_bool() == Some(true) {
1011                        "TRUE_VALUE"
1012                    } else {
1013                        "FALSE_VALUE"
1014                    };
1015                    return Err(crate::DynoxideError::SerializationException(format!(
1016                        "{val} cannot be converted to String"
1017                    )));
1018                } else if item.is_number() {
1019                    return Err(crate::DynoxideError::SerializationException(
1020                        "NUMBER_VALUE cannot be converted to String".to_string(),
1021                    ));
1022                }
1023            }
1024        }
1025    }
1026    Ok(())
1027}
1028
1029/// Check that all values in a map field (if present) are JSON objects (attribute value structs).
1030fn check_map_values_are_structs(
1031    obj: &serde_json::Map<String, serde_json::Value>,
1032    field: &str,
1033) -> crate::Result<()> {
1034    let java_class = match field {
1035        "Key" | "Item" | "ExpressionAttributeValues" | "ExclusiveStartKey" => {
1036            "com.amazonaws.dynamodb.v20120810.AttributeValue"
1037        }
1038        "Expected" => "com.amazonaws.dynamodb.v20120810.ExpectedAttributeValue",
1039        "AttributeUpdates" => "com.amazonaws.dynamodb.v20120810.AttributeValueUpdate",
1040        "RequestItems" => "com.amazonaws.dynamodb.v20120810.KeysAndAttributes",
1041        "KeyConditions" | "QueryFilter" | "ScanFilter" => {
1042            "com.amazonaws.dynamodb.v20120810.Condition"
1043        }
1044        _ => "Unknown",
1045    };
1046    if let Some(serde_json::Value::Object(map)) = obj.get(field) {
1047        for (_key, val) in map {
1048            if !val.is_object() && !val.is_null() {
1049                let msg = if val.is_array() {
1050                    format!("Unrecognized collection type class {java_class}")
1051                } else {
1052                    "Unexpected value type in payload".to_string()
1053                };
1054                return Err(crate::DynoxideError::SerializationException(msg));
1055            }
1056        }
1057    }
1058    Ok(())
1059}
1060
1061/// Check that a field, if present and not null, is a JSON object (map).
1062/// Returns SerializationException with the DynamoDB Java type in the message.
1063fn check_field_is_map(
1064    obj: &serde_json::Map<String, serde_json::Value>,
1065    field: &str,
1066    java_value_type: &str,
1067) -> crate::Result<()> {
1068    let val = match obj.get(field) {
1069        Some(v) if !v.is_null() => v,
1070        _ => return Ok(()),
1071    };
1072
1073    if val.is_object() {
1074        return Ok(());
1075    }
1076
1077    let msg = if val.is_array() {
1078        format!("Unrecognized collection type java.util.Map<java.lang.String, {java_value_type}>")
1079    } else {
1080        // Scalar value where map expected → DynamoDB returns "Unexpected field type"
1081        "Unexpected field type".to_string()
1082    };
1083
1084    Err(crate::DynoxideError::SerializationException(msg))
1085}
1086
1087/// Check that a field, if present and not null, is a JSON object (struct).
1088/// Returns SerializationException with the appropriate message for the wrong type.
1089fn check_field_is_struct(
1090    obj: &serde_json::Map<String, serde_json::Value>,
1091    field: &str,
1092) -> crate::Result<()> {
1093    let val = match obj.get(field) {
1094        Some(v) if !v.is_null() => v,
1095        _ => return Ok(()),
1096    };
1097
1098    if val.is_object() {
1099        return Ok(());
1100    }
1101
1102    let msg = if val.is_array() {
1103        // Try to map field name to DynamoDB Java class
1104        let dynamo_class = match field {
1105            "ProvisionedThroughput" => {
1106                Some("com.amazonaws.dynamodb.v20120810.ProvisionedThroughput")
1107            }
1108            "Projection" => Some("com.amazonaws.dynamodb.v20120810.Projection"),
1109            "DeleteRequest" => Some("com.amazonaws.dynamodb.v20120810.DeleteRequest"),
1110            "PutRequest" => Some("com.amazonaws.dynamodb.v20120810.PutRequest"),
1111            "Create" => Some("com.amazonaws.dynamodb.v20120810.CreateGlobalSecondaryIndexAction"),
1112            "Update" => Some("com.amazonaws.dynamodb.v20120810.UpdateGlobalSecondaryIndexAction"),
1113            "Delete" => Some("com.amazonaws.dynamodb.v20120810.DeleteGlobalSecondaryIndexAction"),
1114            _ => None,
1115        };
1116        if let Some(cls) = dynamo_class {
1117            format!("Unrecognized collection type class {cls}")
1118        } else {
1119            "Start of structure or map found where not expected".to_string()
1120        }
1121    } else {
1122        // Scalar value where struct expected
1123        "Unexpected field type".to_string()
1124    };
1125
1126    Err(crate::DynoxideError::SerializationException(msg))
1127}
1128
1129/// Check that a field, if present and not null, is a JSON array.
1130/// Returns the appropriate SerializationException message for the wrong type.
1131fn check_field_is_list(
1132    obj: &serde_json::Map<String, serde_json::Value>,
1133    field: &str,
1134) -> crate::Result<()> {
1135    let val = match obj.get(field) {
1136        Some(v) if !v.is_null() => v,
1137        _ => return Ok(()),
1138    };
1139
1140    if val.is_array() {
1141        return Ok(());
1142    }
1143
1144    let msg = if val.is_object() {
1145        "Start of structure or map found where not expected".to_string()
1146    } else {
1147        "Unexpected field type".to_string()
1148    };
1149
1150    Err(crate::DynoxideError::SerializationException(msg))
1151}
1152
1153/// Check scalar fields inside filter condition map entries (QueryFilter/ScanFilter/KeyConditions).
1154fn check_filter_inner_fields(
1155    obj: &serde_json::Map<String, serde_json::Value>,
1156    filter_field: &str,
1157) -> crate::Result<()> {
1158    let filter = match obj.get(filter_field) {
1159        Some(v) if v.is_object() => v.as_object().unwrap(),
1160        _ => return Ok(()),
1161    };
1162
1163    for (_attr_name, condition) in filter {
1164        if let Some(cond_obj) = condition.as_object() {
1165            check_field_is_string(cond_obj, "ComparisonOperator")?;
1166            check_field_is_list(cond_obj, "AttributeValueList")?;
1167            // Check AVL elements are attr structs
1168            if let Some(serde_json::Value::Array(avl)) = cond_obj.get("AttributeValueList") {
1169                for item in avl {
1170                    if !item.is_object() && !item.is_null() {
1171                        let msg = if item.is_array() {
1172                            "Unrecognized collection type class com.amazonaws.dynamodb.v20120810.AttributeValue"
1173                                .to_string()
1174                        } else {
1175                            "Unexpected value type in payload".to_string()
1176                        };
1177                        return Err(crate::DynoxideError::SerializationException(msg));
1178                    }
1179                }
1180            }
1181        }
1182    }
1183    Ok(())
1184}
1185
1186/// Check AttributeValueList fields inside a filter map (QueryFilter/ScanFilter).
1187///
1188/// The filter is a map of attribute names to condition objects, each of which
1189/// may contain an AttributeValueList that must be an array.
1190fn check_filter_attribute_value_lists(
1191    obj: &serde_json::Map<String, serde_json::Value>,
1192    filter_field: &str,
1193) -> crate::Result<()> {
1194    let filter = match obj.get(filter_field) {
1195        Some(v) if v.is_object() => v.as_object().unwrap(),
1196        _ => return Ok(()),
1197    };
1198
1199    for (_attr_name, condition) in filter {
1200        if let Some(cond_obj) = condition.as_object() {
1201            check_field_is_list(cond_obj, "AttributeValueList")?;
1202        }
1203    }
1204
1205    Ok(())
1206}
1207
1208fn dispatch(db: &Database, operation: &str, body: &str) -> crate::Result<String> {
1209    // Pre-check JSON field types for operations that use serde_json::Value internally.
1210    // These checks must run before serde deserialisation because serde_json::Value accepts
1211    // any JSON type, so type mismatches on list/struct fields would silently pass through.
1212    pre_check_serialization_types(operation, body)?;
1213
1214    match operation {
1215        "CreateTable" => {
1216            let req = deserialize(body)?;
1217            let resp = db.create_table(req)?;
1218            serialize(&resp)
1219        }
1220        "DeleteTable" => {
1221            let req = deserialize(body)?;
1222            let resp = db.delete_table(req)?;
1223            serialize(&resp)
1224        }
1225        "DescribeTable" => {
1226            let req = deserialize(body)?;
1227            let resp = db.describe_table(req)?;
1228            serialize(&resp)
1229        }
1230        "ListTables" => {
1231            let req = deserialize(body)?;
1232            let resp = db.list_tables(req)?;
1233            serialize(&resp)
1234        }
1235        "UpdateTable" => {
1236            let req = deserialize(body)?;
1237            let resp = db.update_table(req)?;
1238            serialize(&resp)
1239        }
1240        "PutItem" => {
1241            let req = deserialize(body)?;
1242            let resp = db.put_item(req)?;
1243            serialize(&resp)
1244        }
1245        "GetItem" => {
1246            let req = deserialize(body)?;
1247            let resp = db.get_item(req)?;
1248            serialize(&resp)
1249        }
1250        "DeleteItem" => {
1251            let req = deserialize(body)?;
1252            let resp = db.delete_item(req)?;
1253            serialize(&resp)
1254        }
1255        "UpdateItem" => {
1256            let req = deserialize(body)?;
1257            let resp = db.update_item(req)?;
1258            serialize(&resp)
1259        }
1260        "Query" => {
1261            let req = deserialize(body)?;
1262            let resp = db.query(req)?;
1263            serialize(&resp)
1264        }
1265        "Scan" => {
1266            let req = deserialize(body)?;
1267            let resp = db.scan(req)?;
1268            serialize(&resp)
1269        }
1270        "BatchGetItem" => {
1271            let req = deserialize(body)?;
1272            let resp = db.batch_get_item(req)?;
1273            serialize(&resp)
1274        }
1275        "BatchWriteItem" => {
1276            let req = deserialize(body)?;
1277            let resp = db.batch_write_item(req)?;
1278            serialize(&resp)
1279        }
1280        "TransactWriteItems" => {
1281            let req = deserialize(body)?;
1282            let resp = db.transact_write_items(req)?;
1283            serialize(&resp)
1284        }
1285        "TransactGetItems" => {
1286            let req = deserialize(body)?;
1287            let resp = db.transact_get_items(req)?;
1288            serialize(&resp)
1289        }
1290        "ListStreams" => {
1291            let req = deserialize(body)?;
1292            let resp = db.list_streams(req)?;
1293            serialize(&resp)
1294        }
1295        "DescribeStream" => {
1296            let req = deserialize(body)?;
1297            let resp = db.describe_stream(req)?;
1298            serialize(&resp)
1299        }
1300        "GetShardIterator" => {
1301            let req = deserialize(body)?;
1302            let resp = db.get_shard_iterator(req)?;
1303            serialize(&resp)
1304        }
1305        "GetRecords" => {
1306            let req = deserialize(body)?;
1307            let resp = db.get_records(req)?;
1308            serialize(&resp)
1309        }
1310        "UpdateTimeToLive" => {
1311            let req = deserialize(body)?;
1312            let resp = db.update_time_to_live(req)?;
1313            serialize(&resp)
1314        }
1315        "DescribeTimeToLive" => {
1316            let req = deserialize(body)?;
1317            let resp = db.describe_time_to_live(req)?;
1318            serialize(&resp)
1319        }
1320        "ExecuteStatement" => {
1321            let req = deserialize(body)?;
1322            let resp = db.execute_statement(req)?;
1323            serialize(&resp)
1324        }
1325        "ExecuteTransaction" => {
1326            let req = deserialize(body)?;
1327            let resp = db.execute_transaction(req)?;
1328            serialize(&resp)
1329        }
1330        "BatchExecuteStatement" => {
1331            let req = deserialize(body)?;
1332            let resp = db.batch_execute_statement(req)?;
1333            serialize(&resp)
1334        }
1335        "TagResource" => {
1336            let req = deserialize(body)?;
1337            let resp = db.tag_resource(req)?;
1338            serialize(&resp)
1339        }
1340        "UntagResource" => {
1341            let req = deserialize(body)?;
1342            let resp = db.untag_resource(req)?;
1343            serialize(&resp)
1344        }
1345        "ListTagsOfResource" => {
1346            let req = deserialize(body)?;
1347            let resp = db.list_tags_of_resource(req)?;
1348            serialize(&resp)
1349        }
1350        _ => {
1351            // This should not be reachable because the dynamo_ops::is_known_operation
1352            // gate on the target match filters first, but handle it defensively.
1353            Err(crate::DynoxideError::SerializationException(
1354                "UnknownOperationException".to_string(),
1355            ))
1356        }
1357    }
1358}
1359
1360fn deserialize<T: serde::de::DeserializeOwned>(body: &str) -> crate::Result<T> {
1361    serde_json::from_str(body).map_err(|e| {
1362        let msg = e.to_string();
1363        // Custom validation errors from our Deserialize impls use a "VALIDATION:" prefix
1364        // to signal that these should be ValidationException, not SerializationException.
1365        if let Some(stripped) = msg.strip_prefix("VALIDATION:") {
1366            // serde_json appends " at line N column N" to custom errors — strip it
1367            let clean = strip_serde_position(stripped);
1368            return crate::DynoxideError::ValidationException(clean.to_string());
1369        }
1370        // DynamoDB returns ValidationException for missing required fields,
1371        // null values, and unrecognised enum variants. Only true JSON type
1372        // mismatches (e.g. number where string is expected) produce a
1373        // SerializationException.
1374        if msg.contains("missing field")
1375            || msg.contains("unknown variant")
1376            || msg.contains("invalid type: null")
1377        {
1378            crate::DynoxideError::ValidationException(msg)
1379        } else if msg.contains("empty AttributeValue") {
1380            crate::DynoxideError::ValidationException(
1381                "Supplied AttributeValue is empty, must contain exactly one of the supported datatypes".to_string(),
1382            )
1383        } else if msg.contains("Supplied AttributeValue") {
1384            // Multi-datatype or empty AV error — strip position info and return as-is
1385            let clean = strip_serde_position(&msg);
1386            crate::DynoxideError::ValidationException(clean)
1387        } else {
1388            crate::DynoxideError::SerializationException(map_serde_to_dynamodb_message(&msg, body))
1389        }
1390    })
1391}
1392
/// Strip serde_json's " at line N column N" suffix from error messages.
///
/// The suffix is removed only when it is the final part of `msg` and both `N`
/// values consist solely of ASCII digits. A message that merely contains the
/// words " at line " and "column" (for example inside user-supplied text) is
/// returned unchanged, so real content is never truncated.
///
/// Only the last such suffix is removed; the result is always an owned copy.
fn strip_serde_position(msg: &str) -> String {
    /// True when `s` is a non-empty run of ASCII digits.
    fn is_decimal(s: &str) -> bool {
        !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit())
    }

    // Split at the LAST " at line " so earlier occurrences stay in the message.
    if let Some((head, position)) = msg.rsplit_once(" at line ") {
        if let Some((line, column)) = position.split_once(" column ") {
            if is_decimal(line) && is_decimal(column) {
                return head.to_string();
            }
        }
    }
    msg.to_string()
}
1404
1405/// Map serde deserialisation error messages to DynamoDB-style SerializationException messages.
1406///
1407/// DynamoDB returns specific messages like "NUMBER_VALUE cannot be converted to String"
1408/// whereas serde returns "invalid type: integer `23`, expected a string at line 1 column 42".
1409fn map_serde_to_dynamodb_message(msg: &str, body: &str) -> String {
1410    // "invalid type: <type>, expected <target>"
1411    if let Some(rest) = msg.strip_prefix("invalid type: ") {
1412        // Extract the source type and target type
1413        let (source_part, target_part) = match rest.split_once(", expected ") {
1414            Some((s, t)) => (s, t),
1415            None => return msg.to_string(),
1416        };
1417        // Strip " at line N column N" from target
1418        let target = target_part
1419            .split(" at line ")
1420            .next()
1421            .unwrap_or(target_part)
1422            .trim();
1423
1424        return map_type_mismatch(source_part.trim(), target);
1425    }
1426
1427    // "invalid length N, expected struct X ..." → struct-level errors
1428    if msg.contains("expected struct") && msg.starts_with("invalid length ") {
1429        // Extract struct name from "invalid length N, expected struct X with M elements"
1430        if let Some(rest) = msg.split("expected struct ").nth(1) {
1431            let struct_name = rest.split(' ').next().unwrap_or("Unknown");
1432            if let Some(dynamo_class) = map_struct_to_dynamo_class(struct_name) {
1433                return format!("Unrecognized collection type class {dynamo_class}");
1434            }
1435        }
1436        return "Start of structure or map found where not expected".to_string();
1437    }
1438
1439    // "expected string for X at line N column N" → wrong type inside AttributeValue
1440    if msg.starts_with("expected string for ") {
1441        return infer_type_conversion_error(msg, body, "String");
1442    }
1443
1444    // "expected value at line N column N" → wrong value type at position
1445    if msg.starts_with("expected value at line ") {
1446        return infer_type_conversion_error(msg, body, "String");
1447    }
1448
1449    msg.to_string()
1450}
1451
1452/// Map a serde type mismatch to DynamoDB's SerializationException message.
1453fn map_type_mismatch(source: &str, target: &str) -> String {
1454    // Determine target type category
1455    let target_is_string = target == "a string";
1456    let target_is_bool = target == "a boolean";
1457    let target_is_sequence = target == "a sequence";
1458    let target_is_integer = target == "i64" || target == "u64";
1459    let target_is_struct = target.starts_with("struct ");
1460    let target_is_map = target.starts_with("a map") || target.starts_with("map");
1461
1462    // Determine source type
1463    let is_integer = source.starts_with("integer ");
1464    let is_float = source.starts_with("floating point ");
1465    let is_bool_true = source == "boolean `true`";
1466    let is_bool_false = source == "boolean `false`";
1467    let _is_bool = is_bool_true || is_bool_false;
1468    let is_string = source.starts_with("string ");
1469    let is_sequence = source == "sequence";
1470    let is_map = source == "map";
1471
1472    // Map to DynamoDB message based on (source_type, target_type) combination
1473    if target_is_sequence {
1474        // List/array fields
1475        if is_map {
1476            return "Start of structure or map found where not expected".to_string();
1477        }
1478        return "Unexpected field type".to_string();
1479    }
1480
1481    if target_is_string {
1482        if is_bool_true {
1483            return "TRUE_VALUE cannot be converted to String".to_string();
1484        }
1485        if is_bool_false {
1486            return "FALSE_VALUE cannot be converted to String".to_string();
1487        }
1488        if is_float {
1489            return "DECIMAL_VALUE cannot be converted to String".to_string();
1490        }
1491        if is_integer {
1492            return "NUMBER_VALUE cannot be converted to String".to_string();
1493        }
1494        if is_sequence {
1495            return "Unrecognized collection type class java.lang.String".to_string();
1496        }
1497        if is_map {
1498            return "Start of structure or map found where not expected".to_string();
1499        }
1500    }
1501
1502    if target_is_bool {
1503        if is_string {
1504            return "Unexpected token received from parser".to_string();
1505        }
1506        if is_float {
1507            return "DECIMAL_VALUE cannot be converted to Boolean".to_string();
1508        }
1509        if is_integer {
1510            return "NUMBER_VALUE cannot be converted to Boolean".to_string();
1511        }
1512        if is_sequence {
1513            return "Unrecognized collection type class java.lang.Boolean".to_string();
1514        }
1515        if is_map {
1516            return "Start of structure or map found where not expected".to_string();
1517        }
1518    }
1519
1520    if target_is_integer {
1521        if is_string {
1522            return "STRING_VALUE cannot be converted to Long".to_string();
1523        }
1524        if is_bool_true {
1525            return "TRUE_VALUE cannot be converted to Long".to_string();
1526        }
1527        if is_bool_false {
1528            return "FALSE_VALUE cannot be converted to Long".to_string();
1529        }
1530        if is_sequence {
1531            return "Unrecognized collection type class java.lang.Long".to_string();
1532        }
1533        if is_map {
1534            return "Start of structure or map found where not expected".to_string();
1535        }
1536    }
1537
1538    if target_is_struct || target_is_map {
1539        if is_sequence {
1540            // Need to figure out the class from target
1541            if let Some(struct_name) = target.strip_prefix("struct ") {
1542                let name = struct_name.split(' ').next().unwrap_or("Unknown");
1543                if let Some(dynamo_class) = map_struct_to_dynamo_class(name) {
1544                    return format!("Unrecognized collection type class {dynamo_class}");
1545                }
1546            }
1547        }
1548        if is_map && target_is_struct {
1549            return "Start of structure or map found where not expected".to_string();
1550        }
1551        if !is_map && !is_sequence {
1552            return "Unexpected field type".to_string();
1553        }
1554    }
1555
1556    // Fallback: return the original message
1557    source
1558        .split(" at line ")
1559        .next()
1560        .unwrap_or(source)
1561        .to_string()
1562}
1563
/// Infer the DynamoDB type conversion error from a serde error message.
///
/// `msg` is expected to end in serde_json's ` at line N column M` suffix. The
/// byte at that position in `body` decides which DynamoDB token is reported:
/// `t` → `TRUE_VALUE`, `f` → `FALSE_VALUE`, a digit or `-` → `NUMBER_VALUE`.
/// Anything else, including a missing or out-of-range position, defaults to
/// `TRUE_VALUE`.
///
/// `target_type` is the Java-style type name appended to the message
/// (e.g. `"String"`).
fn infer_type_conversion_error(msg: &str, body: &str, target_type: &str) -> String {
    let token = match byte_at_error_position(msg, body) {
        Some(b'f') => "FALSE_VALUE",
        Some(b'0'..=b'9' | b'-') => "NUMBER_VALUE",
        // `t`, any other byte, or no usable position.
        _ => "TRUE_VALUE",
    };
    format!("{token} cannot be converted to {target_type}")
}

/// Return the byte of `body` that a serde `line N column M` position points at.
fn byte_at_error_position(msg: &str, body: &str) -> Option<u8> {
    // Everything after the last "column " must be the 1-based column number.
    let (head, col_text) = msg.rsplit_once("column ").unwrap_or(("", msg));
    let col: usize = col_text.trim().parse().ok()?;

    // The column is relative to the reported line, so for multi-line bodies
    // pick that line first instead of indexing the whole body.
    let line_no = head
        .trim_end()
        .rsplit_once("line ")
        .and_then(|(_, text)| text.trim().parse::<usize>().ok());
    let line = match line_no {
        // serde_json only treats '\n' as a line break, so split the same way.
        Some(n) => body.split('\n').nth(n.checked_sub(1)?)?,
        // No line information: keep treating the body as a single line.
        None => body,
    };

    // Both line and column are 1-based; `get` keeps bad positions from panicking.
    line.as_bytes().get(col.checked_sub(1)?).copied()
}
1587
/// Look up the DynamoDB Java class name that corresponds to a Rust request
/// struct, for use in SerializationException messages.
///
/// Each struct is accepted under its plain name and under its `…Raw` variant.
/// Returns `None` for names with no known counterpart.
fn map_struct_to_dynamo_class(struct_name: &str) -> Option<&'static str> {
    /// (Rust struct name without the `Raw` suffix, DynamoDB Java class).
    const CLASSES: &[(&str, &str)] = &[
        (
            "ProvisionedThroughput",
            "com.amazonaws.dynamodb.v20120810.ProvisionedThroughput",
        ),
        ("Projection", "com.amazonaws.dynamodb.v20120810.Projection"),
        (
            "KeySchemaElement",
            "com.amazonaws.dynamodb.v20120810.KeySchemaElement",
        ),
        (
            "AttributeDefinition",
            "com.amazonaws.dynamodb.v20120810.AttributeDefinition",
        ),
        (
            "LocalSecondaryIndex",
            "com.amazonaws.dynamodb.v20120810.LocalSecondaryIndex",
        ),
        (
            "GlobalSecondaryIndex",
            "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndex",
        ),
        (
            "DeleteGsiAction",
            "com.amazonaws.dynamodb.v20120810.DeleteGlobalSecondaryIndexAction",
        ),
        (
            "CreateGsiAction",
            "com.amazonaws.dynamodb.v20120810.CreateGlobalSecondaryIndexAction",
        ),
        (
            "UpdateGsiAction",
            "com.amazonaws.dynamodb.v20120810.UpdateGlobalSecondaryIndexAction",
        ),
        (
            "GlobalSecondaryIndexUpdate",
            "com.amazonaws.dynamodb.v20120810.GlobalSecondaryIndexUpdate",
        ),
        ("Tag", "com.amazonaws.dynamodb.v20120810.Tag"),
    ];

    // `FooRaw` and `Foo` share one table entry; no base name itself ends in "Raw".
    let base_name = struct_name.strip_suffix("Raw").unwrap_or(struct_name);
    CLASSES
        .iter()
        .find(|(rust_name, _)| *rust_name == base_name)
        .map(|&(_, java_class)| java_class)
}
1623
1624fn serialize<T: serde::Serialize>(val: &T) -> crate::Result<String> {
1625    serde_json::to_string(val).map_err(|e| crate::DynoxideError::InternalServerError(e.to_string()))
1626}
1627
1628/// Generate a DynamoDB-style request ID: 52 uppercase hex characters.
1629/// Real DynamoDB uses `[0-9A-Z]{52}`.
1630fn generate_request_id() -> String {
1631    use uuid::Uuid;
1632    // Generate two UUIDs and concat their uppercase hex (32 chars each → 64 chars, take 52)
1633    let u1 = Uuid::now_v7();
1634    let u2 = Uuid::now_v7();
1635    let hex = format!(
1636        "{}{}",
1637        u1.as_simple().to_string().to_ascii_uppercase(),
1638        u2.as_simple().to_string().to_ascii_uppercase()
1639    );
1640    hex[..52].to_string()
1641}
1642
1643/// Compute CRC32 of response body and return as string.
1644fn compute_crc32(body: &[u8]) -> String {
1645    crc32fast::hash(body).to_string()
1646}
1647
1648/// Add standard DynamoDB headers to a response: x-amzn-requestid, x-amz-crc32, content-length.
1649fn add_dynamo_headers(response: &mut Response, body_bytes: &[u8]) {
1650    let headers = response.headers_mut();
1651    headers.insert(
1652        HeaderName::from_static("x-amzn-requestid"),
1653        HeaderValue::from_str(&generate_request_id()).unwrap(),
1654    );
1655    headers.insert(
1656        HeaderName::from_static("x-amz-crc32"),
1657        HeaderValue::from_str(&compute_crc32(body_bytes)).unwrap(),
1658    );
1659    headers.insert(
1660        HeaderName::from_static("content-length"),
1661        HeaderValue::from_str(&body_bytes.len().to_string()).unwrap(),
1662    );
1663}
1664
1665/// Build a response with proper DynamoDB headers (requestid, crc32, content-length).
1666fn dynamo_response(status: StatusCode, content_type: &str, body_str: String) -> Response {
1667    let body_bytes = body_str.as_bytes();
1668    let mut resp = Response::builder()
1669        .status(status)
1670        .header("content-type", content_type)
1671        .body(Body::from(body_str.clone()))
1672        .unwrap();
1673    add_dynamo_headers(&mut resp, body_bytes);
1674    resp
1675}
1676
1677/// Build a response with proper DynamoDB headers for a raw byte body (e.g. HTML 404).
1678fn dynamo_response_raw(status: StatusCode, body_str: &str) -> Response {
1679    let body_bytes = body_str.as_bytes();
1680    let mut resp = Response::builder()
1681        .status(status)
1682        .body(Body::from(body_str.to_string()))
1683        .unwrap();
1684    add_dynamo_headers(&mut resp, body_bytes);
1685    resp
1686}