Skip to main content

fakecloud_core/
dispatch.rs

1use axum::body::Body;
2use axum::extract::{ConnectInfo, Extension, Query};
3use axum::http::{Request, StatusCode};
4use axum::response::Response;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use crate::auth::{
11    is_root_bypass, ConditionContext, CredentialResolver, IamMode, IamPolicyEvaluator, Principal,
12    PrincipalType, ResourcePolicyProvider,
13};
14use crate::protocol::{self, AwsProtocol};
15use crate::registry::ServiceRegistry;
16use crate::service::{AwsRequest, ResponseBody};
17
18/// The main dispatch handler. All HTTP requests come through here.
19pub async fn dispatch(
20    ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
21    Extension(registry): Extension<Arc<ServiceRegistry>>,
22    Extension(config): Extension<Arc<DispatchConfig>>,
23    Query(query_params): Query<HashMap<String, String>>,
24    request: Request<Body>,
25) -> Response<Body> {
26    let remote_addr = Some(remote_addr);
27    let request_id = uuid::Uuid::new_v4().to_string();
28
29    let (parts, body) = request.into_parts();
30
31    // Streaming opt-in: if the route is a known large-body S3 / ECR
32    // upload, we skip the buffered `to_bytes` step entirely and hand
33    // the raw body to the service handler. The handler spills it to
34    // disk on the fly. Header-only detection covers every streaming
35    // candidate (none of them rely on form-body sniffing).
36    let stream_route = streaming_route(
37        &parts.method,
38        parts.uri.path(),
39        &parts.headers,
40        &query_params,
41    );
42    let header_only = protocol::detect_service_headers_only(&parts.headers, &query_params);
43    let stream_dispatch = match (&stream_route, &header_only) {
44        // Header-only detection agrees with the URL match — covers S3
45        // PUT object (SigV4 service=s3 in Authorization).
46        (Some(sr), Some(detected)) if sr.0 == detected.service => Some(detected.clone()),
47        // ECR OCI v2 blob upload has no AWS auth header; the path
48        // alone (`/v2/.../blobs/uploads/...`) tells us the route is
49        // ECR. Synthesize a DetectedRequest so dispatch picks the
50        // streaming path. Same special-case the buffered branch
51        // applies on detect_service None (see below).
52        (Some((service, _)), None) if *service == "ecr" => Some(protocol::DetectedRequest {
53            service: "ecr".to_string(),
54            action: String::new(),
55            protocol: AwsProtocol::Rest,
56        }),
57        _ => None,
58    };
59
60    let (body_bytes, body_stream) = if stream_dispatch.is_some() {
61        (Bytes::new(), Some(body))
62    } else {
63        // Buffered path: materialize the body into memory under the
64        // configured cap. `FAKECLOUD_MAX_REQUEST_BODY_BYTES` (default
65        // 1 GiB) caps non-streaming requests; streaming routes have no
66        // cap because nothing materializes the entire body in RAM.
67        let max_body_bytes = max_request_body_bytes();
68        match axum::body::to_bytes(body, max_body_bytes).await {
69            Ok(b) => (b, None),
70            Err(_) => {
71                return build_error_response(
72                    StatusCode::PAYLOAD_TOO_LARGE,
73                    "RequestEntityTooLarge",
74                    "Request body too large",
75                    &request_id,
76                    AwsProtocol::Query,
77                );
78            }
79        }
80    };
81
82    // Detect service and action
83    let detected = if let Some(d) = stream_dispatch {
84        d
85    } else {
86        match protocol::detect_service(&parts.headers, &query_params, &body_bytes) {
87            Some(d) => d,
88            None => {
89                // OPTIONS requests (CORS preflight) don't carry Authorization headers.
90                // Route them to S3 since S3 is the only REST service that handles CORS.
91                // Note: API Gateway CORS preflight is not fully supported in this emulator
92                // because we can't distinguish between S3 and API Gateway OPTIONS requests
93                // without additional context (in real AWS, they have different domains).
94                if parts.method == http::Method::OPTIONS {
95                    protocol::DetectedRequest {
96                        service: "s3".to_string(),
97                        action: String::new(),
98                        protocol: AwsProtocol::Rest,
99                    }
100                } else if parts.uri.path() == "/v2" || parts.uri.path().starts_with("/v2/") {
101                    // OCI Distribution v2 protocol. Docker CLI / OCI clients
102                    // use Basic auth (not SigV4) and GET /v2/ with no body,
103                    // so this must be matched before the apigateway fallback.
104                    protocol::DetectedRequest {
105                        service: "ecr".to_string(),
106                        action: String::new(),
107                        protocol: AwsProtocol::Rest,
108                    }
109                } else if let Some(bucket) = anonymous_s3_bucket(&parts.uri, &config) {
110                    // Unsigned request whose first path segment names an
111                    // existing S3 bucket: an anonymous path-style S3 access
112                    // (e.g. serving a public-read object to a browser). Without
113                    // this it would fall through to the apigateway catch-all
114                    // below and 404 with "Stage not found" (#1707). Authorization
115                    // for the anonymous caller still runs in the IAM block.
116                    tracing::debug!(bucket = %bucket, "routing unsigned request to S3 (existing bucket)");
117                    protocol::DetectedRequest {
118                        service: "s3".to_string(),
119                        action: String::new(),
120                        protocol: AwsProtocol::Rest,
121                    }
122                } else if !parts.uri.path().starts_with("/_") {
123                    // Requests without AWS auth that don't match any service might be
124                    // API Gateway execute API calls (plain HTTP without signatures).
125                    // Route them to apigateway service which will validate if a matching
126                    // API/stage exists. Skip special FakeCloud endpoints (/_*).
127                    protocol::DetectedRequest {
128                        service: "apigateway".to_string(),
129                        action: String::new(),
130                        protocol: AwsProtocol::RestJson,
131                    }
132                } else {
133                    return build_error_response(
134                        StatusCode::BAD_REQUEST,
135                        "MissingAction",
136                        "Could not determine target service or action from request",
137                        &request_id,
138                        AwsProtocol::Query,
139                    );
140                }
141            }
142        }
143    };
144
145    // Bedrock-agent and bedrock-runtime both send `bedrock` in the SigV4
146    // credential scope, but bedrock-agent has its own service handler.
147    // Disambiguate based on the request path.
148    let detected = if detected.service == "bedrock" {
149        let first_seg = parts.uri.path().split('/').nth(1);
150        if matches!(
151            first_seg,
152            Some(
153                "agents"
154                    | "knowledgebases"
155                    | "flows"
156                    | "prompts"
157                    | "tags"
158                    | "retrieveAndGenerate"
159                    | "retrieveAndGenerateStream"
160                    | "optimize-prompt"
161                    | "sessions"
162                    | "invocations"
163                    | "generate-query"
164                    | "rerank"
165            )
166        ) {
167            // Further disambiguate runtime vs control plane for agents/flows paths
168            let segs: Vec<&str> = parts.uri.path().split('/').collect();
169            let is_runtime = matches!(
170                segs.as_slice(),
171                ["", "agents", _, "agentAliases", _, ..]  // InvokeAgent
172                    | ["", "flows", _, "aliases", _]   // InvokeFlow
173                    | ["", "knowledgebases", _, "retrieve"] // Retrieve
174                    | ["", "retrieveAndGenerate"]
175                    | ["", "retrieveAndGenerateStream"]
176                    | ["", "optimize-prompt"]
177                    | ["", "sessions", ..]
178                    | ["", "invocations", ..]
179                    | ["", "generate-query"]
180                    | ["", "rerank"]
181            );
182            if is_runtime {
183                protocol::DetectedRequest {
184                    service: "bedrock-agent-runtime".to_string(),
185                    ..detected
186                }
187            } else {
188                protocol::DetectedRequest {
189                    service: "bedrock-agent".to_string(),
190                    ..detected
191                }
192            }
193        } else {
194            detected
195        }
196    } else {
197        detected
198    };
199
200    // Look up service
201    let service = match registry.get(&detected.service) {
202        Some(s) => s,
203        None => {
204            return build_error_response(
205                detected.protocol.error_status(),
206                "UnknownService",
207                &format!("Service '{}' is not available", detected.service),
208                &request_id,
209                detected.protocol,
210            );
211        }
212    };
213
214    // Extract region and access key from auth header (or presigned query).
215    let auth_header = parts
216        .headers
217        .get("authorization")
218        .and_then(|v| v.to_str().ok())
219        .unwrap_or("");
220    let header_info = fakecloud_aws::sigv4::parse_sigv4(auth_header);
221    let presigned_info = if header_info.is_none() {
222        // Presigned URL: credentials live in the query string.
223        fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params).map(|p| p.as_info())
224    } else {
225        None
226    };
227    let sigv4_info = header_info.or(presigned_info);
228    // SigV2 presigned URLs (`AWSAccessKeyId` + `Signature` + `Expires` query
229    // parameters) carry the access key outside the SigV4 grammar, so the SigV4
230    // parsers above return None. Recover the key here so a SigV2-presigned
231    // request is attributed to its caller instead of being treated as
232    // anonymous (which would deny it under the object read auth gate).
233    let access_key_id = sigv4_info
234        .as_ref()
235        .map(|info| info.access_key.clone())
236        .or_else(|| sigv2_presigned_access_key(&query_params));
237
238    // Host-header routing hint: LocalStack-shaped
239    // `<svc>.<region>.localhost.localstack.cloud[:port]`, real-AWS
240    // `<svc>.<region>.amazonaws.com`, and every S3 virtual-hosted variant
241    // of both. Secondary region source and carries the bucket for
242    // virtual-hosted S3 path rewrite.
243    let host_info = protocol::parse_routing_host_from_headers(&parts.headers);
244
245    let region = sigv4_info
246        .map(|info| info.region)
247        .or_else(|| host_info.as_ref().map(|h| h.region.clone()))
248        .or_else(|| extract_region_from_user_agent(&parts.headers))
249        .unwrap_or_else(|| config.region.clone());
250
251    // Resolve the caller's principal up front so both SigV4 verification
252    // (which needs the secret) and the service handler (which needs the
253    // identity for GetCallerIdentity and IAM enforcement) share a single
254    // lookup. The root-bypass AKID skips resolution entirely — `test`
255    // credentials have no backing identity and must always pass.
256    let caller_akid = access_key_id.as_deref().unwrap_or("");
257    let resolved = if !caller_akid.is_empty() && !is_root_bypass(caller_akid) {
258        config
259            .credential_resolver
260            .as_ref()
261            .and_then(|r| r.resolve(caller_akid))
262    } else {
263        None
264    };
265    let caller_principal = resolved.as_ref().map(|r| r.principal.clone());
266    let caller_session_policies = resolved
267        .as_ref()
268        .map(|r| r.session_policies.clone())
269        .unwrap_or_default();
270
271    // Opt-in SigV4 cryptographic verification. Runs before the service
272    // handler so a failing signature never reaches business logic. The
273    // reserved `test*` root identity short-circuits verification to keep
274    // local-dev workflows frictionless.
275    if config.verify_sigv4 && !is_root_bypass(caller_akid) && config.credential_resolver.is_some() {
276        let amz_date = parts
277            .headers
278            .get("x-amz-date")
279            .and_then(|v| v.to_str().ok());
280        let parsed = fakecloud_aws::sigv4::parse_sigv4_header(auth_header, amz_date)
281            .or_else(|| fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params));
282        let parsed = match parsed {
283            Some(p) => p,
284            None => {
285                return build_error_response(
286                    StatusCode::FORBIDDEN,
287                    "IncompleteSignature",
288                    "Request is missing or has a malformed AWS Signature",
289                    &request_id,
290                    detected.protocol,
291                );
292            }
293        };
294        let resolved_for_verify = match resolved.as_ref() {
295            Some(r) => r,
296            None => {
297                return build_error_response(
298                    StatusCode::FORBIDDEN,
299                    "InvalidClientTokenId",
300                    "The security token included in the request is invalid",
301                    &request_id,
302                    detected.protocol,
303                );
304            }
305        };
306        let headers_vec = fakecloud_aws::sigv4::headers_from_http(&parts.headers);
307        let raw_query_for_verify = parts.uri.query().unwrap_or("").to_string();
308        let verify_req = fakecloud_aws::sigv4::VerifyRequest {
309            method: parts.method.as_str(),
310            path: parts.uri.path(),
311            query: &raw_query_for_verify,
312            headers: &headers_vec,
313            body: &body_bytes,
314        };
315        match fakecloud_aws::sigv4::verify(
316            &parsed,
317            &verify_req,
318            &resolved_for_verify.secret_access_key,
319            chrono::Utc::now(),
320        ) {
321            Ok(()) => {}
322            Err(fakecloud_aws::sigv4::SigV4Error::RequestTimeTooSkewed { .. }) => {
323                return build_error_response(
324                    StatusCode::FORBIDDEN,
325                    "RequestTimeTooSkewed",
326                    "The difference between the request time and the current time is too large",
327                    &request_id,
328                    detected.protocol,
329                );
330            }
331            Err(fakecloud_aws::sigv4::SigV4Error::InvalidDate(msg)) => {
332                return build_error_response(
333                    StatusCode::FORBIDDEN,
334                    "IncompleteSignature",
335                    &format!("Invalid x-amz-date: {msg}"),
336                    &request_id,
337                    detected.protocol,
338                );
339            }
340            Err(fakecloud_aws::sigv4::SigV4Error::Malformed(msg)) => {
341                return build_error_response(
342                    StatusCode::FORBIDDEN,
343                    "IncompleteSignature",
344                    &format!("Malformed SigV4 signature: {msg}"),
345                    &request_id,
346                    detected.protocol,
347                );
348            }
349            Err(fakecloud_aws::sigv4::SigV4Error::SignatureMismatch) => {
350                return build_error_response(
351                    StatusCode::FORBIDDEN,
352                    "SignatureDoesNotMatch",
353                    "The request signature we calculated does not match the signature you provided",
354                    &request_id,
355                    detected.protocol,
356                );
357            }
358            Err(fakecloud_aws::sigv4::SigV4Error::PresignedUrlExpired { .. }) => {
359                return build_error_response(
360                    StatusCode::FORBIDDEN,
361                    "AccessDenied",
362                    "Request has expired",
363                    &request_id,
364                    detected.protocol,
365                );
366            }
367            Err(fakecloud_aws::sigv4::SigV4Error::InvalidPresignExpires(_)) => {
368                return build_error_response(
369                    StatusCode::BAD_REQUEST,
370                    "AuthorizationQueryParametersError",
371                    "X-Amz-Expires must be a number between 1 and 604800 seconds",
372                    &request_id,
373                    detected.protocol,
374                );
375            }
376        }
377    }
378
379    // Build path segments. For S3 virtual-hosted-style requests the bucket
380    // lives in the Host header, not the path — prepend it so the S3 handler
381    // sees a uniform path-style request. SigV4 verification above already
382    // ran against the wire path, so this rewrite is signature-safe.
383    let wire_path = parts.uri.path();
384    let path = if detected.service == "s3" {
385        if let Some(bucket) = host_info.as_ref().and_then(|h| h.bucket.as_deref()) {
386            let prefix_with_slash = format!("/{bucket}/");
387            let is_bucket_root = wire_path.trim_end_matches('/') == format!("/{bucket}");
388            if wire_path.starts_with(&prefix_with_slash) || is_bucket_root {
389                wire_path.to_string()
390            } else if wire_path == "/" || wire_path.is_empty() {
391                format!("/{bucket}")
392            } else {
393                format!("/{bucket}{wire_path}")
394            }
395        } else {
396            wire_path.to_string()
397        }
398    } else {
399        wire_path.to_string()
400    };
401    let raw_query = parts.uri.query().unwrap_or("").to_string();
402    let path_segments: Vec<String> = path
403        .split('/')
404        .filter(|s| !s.is_empty())
405        .map(|s| s.to_string())
406        .collect();
407
408    // For JSON protocol, validate that non-empty bodies are valid JSON
409    if detected.protocol == AwsProtocol::Json
410        && !body_bytes.is_empty()
411        && serde_json::from_slice::<serde_json::Value>(&body_bytes).is_err()
412    {
413        return build_error_response(
414            StatusCode::BAD_REQUEST,
415            "SerializationException",
416            "Start of structure or map found where not expected",
417            &request_id,
418            AwsProtocol::Json,
419        );
420    }
421
422    // Merge query params with form body params for Query protocol
423    let mut all_params = query_params;
424    if detected.protocol == AwsProtocol::Query {
425        let body_params = protocol::parse_query_body(&body_bytes);
426        for (k, v) in body_params {
427            all_params.entry(k).or_insert(v);
428        }
429    }
430
431    // CloudWatch (`monitoring`) advertises awsJson1_0 alongside awsQuery. Its
432    // handlers all read the flat awsQuery param map, so when a client uses the
433    // JSON protocol we flatten the JSON body into that same map, leaving the
434    // handlers unchanged. The handler emits a JSON response for JSON callers.
435    if detected.protocol == AwsProtocol::Json && detected.service == "monitoring" {
436        let body_params = protocol::flatten_json_to_query(&body_bytes);
437        for (k, v) in body_params {
438            all_params.entry(k).or_insert(v);
439        }
440    }
441
442    let aws_request = AwsRequest {
443        service: detected.service.clone(),
444        action: detected.action.clone(),
445        region,
446        account_id: caller_principal
447            .as_ref()
448            .map(|p| p.account_id.clone())
449            .unwrap_or_else(|| config.account_id.clone()),
450        request_id: request_id.clone(),
451        headers: parts.headers,
452        query_params: all_params,
453        body: body_bytes,
454        body_stream: parking_lot::Mutex::new(body_stream),
455        path_segments,
456        raw_path: path,
457        raw_query,
458        method: parts.method,
459        is_query_protocol: detected.protocol == AwsProtocol::Query,
460        access_key_id,
461        principal: caller_principal,
462    };
463
464    tracing::info!(
465        service = %aws_request.service,
466        action = %aws_request.action,
467        request_id = %aws_request.request_id,
468        "handling request"
469    );
470
471    // Opt-in IAM identity-policy enforcement. Runs before the service
472    // handler so a deny never reaches business logic. Root principals
473    // (both `test*` bypass AKIDs and the account's IAM root) are exempt,
474    // matching AWS behavior. Services that haven't opted in via
475    // `iam_enforceable()` are transparently skipped — the startup log
476    // lists which services are under enforcement so users always know.
477    if config.iam_mode.is_enabled()
478        && service.iam_enforceable()
479        && !is_root_bypass(aws_request.access_key_id.as_deref().unwrap_or(""))
480    {
481        if let Some(evaluator) = config.policy_evaluator.as_ref() {
482            if let Some(principal) = aws_request.principal.as_ref() {
483                if !principal.is_root() {
484                    if let Some(iam_action) = service.iam_action_for(&aws_request) {
485                        let mut condition_context = build_condition_context(
486                            principal,
487                            remote_addr,
488                            &aws_request.region,
489                            is_secure_transport(&aws_request.headers),
490                        );
491                        // F3 keys riding on the resolved credential. STS
492                        // populates these at mint time so subsequent
493                        // requests under the credential can be evaluated
494                        // against `aws:MultiFactorAuthPresent`,
495                        // `aws:MultiFactorAuthAge`, `aws:TokenIssueTime`,
496                        // and `aws:FederatedProvider`. IAM user access
497                        // keys carry none of these, matching AWS.
498                        if let Some(rc) = resolved.as_ref() {
499                            condition_context.aws_mfa_present = Some(rc.mfa_present);
500                            condition_context.aws_token_issue_time = rc.token_issued_at;
501                            condition_context.aws_federated_provider =
502                                rc.federated_provider.clone();
503                            // `aws:MultiFactorAuthAge` is "seconds since
504                            // MFA was asserted" — computed at evaluation
505                            // time from the token issue moment so the
506                            // value increases monotonically as the session
507                            // ages. Only set when the session was actually
508                            // minted with MFA; otherwise the key is
509                            // absent, matching AWS.
510                            if rc.mfa_present {
511                                if let Some(issued) = rc.token_issued_at {
512                                    let age = chrono::Utc::now()
513                                        .signed_duration_since(issued)
514                                        .num_seconds()
515                                        .max(0);
516                                    condition_context.aws_mfa_age_seconds = Some(age);
517                                }
518                            }
519                        }
520                        condition_context.service_keys =
521                            service.iam_condition_keys_for(&aws_request, &iam_action);
522
523                        // ABAC: populate tag-based condition keys.
524                        // aws:ResourceTag/*
525                        match service.resource_tags_for(&iam_action.resource) {
526                            Some(tags) => condition_context.resource_tags = Some(tags),
527                            None => tracing::debug!(
528                                target: "fakecloud::iam::audit",
529                                service = %detected.service,
530                                resource = %iam_action.resource,
531                                "service does not expose resource tags for ABAC; skipping aws:ResourceTag/* evaluation"
532                            ),
533                        }
534                        // aws:RequestTag/* + aws:TagKeys
535                        match service.request_tags_from(&aws_request, iam_action.action) {
536                            Some(tags) => condition_context.request_tags = Some(tags),
537                            None => tracing::debug!(
538                                target: "fakecloud::iam::audit",
539                                service = %detected.service,
540                                action = %iam_action.action_string(),
541                                "service does not expose request tags for ABAC; skipping aws:RequestTag/* / aws:TagKeys evaluation"
542                            ),
543                        }
544                        // aws:PrincipalTag/*
545                        condition_context.principal_tags = principal.tags.clone();
546
547                        // Phase 2: fetch the resource-based policy (if
548                        // any) attached to the target resource and
549                        // pass it to the evaluator alongside the
550                        // principal's identity policies. The resource's
551                        // owning account is resolved separately below
552                        // (#381 multi-account alignment): provider lookup
553                        // first, then the account embedded in the ARN,
554                        // then the caller's account.
555                        let resource_policy_json =
556                            config.resource_policy_provider.as_ref().and_then(|p| {
557                                p.resource_policy(&detected.service, &iam_action.resource)
558                            });
559                        // Derive the resource-owning account. Prefer a provider
560                        // lookup (S3 ARNs carry no account, so the bucket's
561                        // owner is resolved from state — without this, account
562                        // A reaching account B's bucket would be mis-read as
563                        // same-account and skip B's bucket-policy requirement,
564                        // bug-audit 2026-05-28, 5.3), then fall back to the
565                        // account embedded in the ARN (SQS/SNS/Lambda/…), then
566                        // to the caller's account for wildcard / unscoped
567                        // actions (ListQueues, GetCallerIdentity).
568                        let resource_account_id = config
569                            .resource_policy_provider
570                            .as_ref()
571                            .and_then(|p| {
572                                p.resource_owner_account(&detected.service, &iam_action.resource)
573                            })
574                            .or_else(|| parse_account_from_arn(&iam_action.resource))
575                            .unwrap_or_else(|| principal.account_id.clone());
576                        // SCP ceiling: resolve the inherited SCP chain
577                        // for this principal (management accounts and
578                        // service-linked roles come back as `None`, in
579                        // which case the evaluator treats the layer as
580                        // absent). Audit breadcrumbs emitted by the
581                        // resolver itself, not here.
582                        let scps = config
583                            .scp_resolver
584                            .as_ref()
585                            .and_then(|r| r.scps_for(principal));
586                        let decision = evaluator.evaluate_with_resource_policy(
587                            principal,
588                            &iam_action,
589                            &condition_context,
590                            resource_policy_json.as_deref(),
591                            &resource_account_id,
592                            &caller_session_policies,
593                            scps.as_deref(),
594                        );
595                        if !decision.is_allow() {
596                            tracing::warn!(
597                                target: "fakecloud::iam::audit",
598                                service = %detected.service,
599                                action = %iam_action.action_string(),
600                                resource = %iam_action.resource,
601                                principal = %principal.arn,
602                                resource_policy_present = resource_policy_json.is_some(),
603                                decision = ?decision,
604                                mode = %config.iam_mode,
605                                request_id = %request_id,
606                                "IAM policy evaluation denied request"
607                            );
608                            if config.iam_mode.is_strict() {
609                                // Real AWS includes an "Encoded
610                                // authorization failure message" suffix
611                                // on AccessDeniedException — an opaque
612                                // base64+zlib JSON blob that the caller
613                                // can pass to STS
614                                // `DecodeAuthorizationMessage` to
615                                // recover the structured deny reason
616                                // (action, principal, matched
617                                // statements, condition context). We
618                                // produce the same blob inline so
619                                // existing tooling that decodes deny
620                                // reasons works against fakecloud.
621                                let context_summary = serde_json::json!({
622                                    "aws:PrincipalArn": principal.arn,
623                                    "aws:PrincipalAccount": principal.account_id,
624                                    "aws:RequestedRegion": condition_context
625                                        .aws_requested_region
626                                        .clone()
627                                        .unwrap_or_default(),
628                                    "aws:SecureTransport": condition_context
629                                        .aws_secure_transport
630                                        .unwrap_or(false),
631                                    "aws:Action": iam_action.action_string(),
632                                    "aws:Resource": iam_action.resource,
633                                    "decision": format!("{:?}", decision),
634                                });
635                                let action_string = iam_action.action_string();
636                                let encoded = crate::auth_message::encode_deny(
637                                    matches!(decision, crate::auth::IamDecision::ExplicitDeny),
638                                    Some(&action_string),
639                                    Some(&principal.arn),
640                                    Vec::new(),
641                                    Some(context_summary),
642                                );
643                                return build_error_response(
644                                    StatusCode::FORBIDDEN,
645                                    "AccessDeniedException",
646                                    &format!(
647                                        "User: {} is not authorized to perform: {} on resource: {} Encoded authorization failure message: {}",
648                                        principal.arn,
649                                        iam_action.action_string(),
650                                        iam_action.resource,
651                                        encoded,
652                                    ),
653                                    &request_id,
654                                    detected.protocol,
655                                );
656                            }
657                            // Soft mode: audit log already emitted; fall
658                            // through to the handler.
659                        }
660                    } else {
661                        // Service opted in but didn't return an IamAction
662                        // for this specific operation — programming bug,
663                        // surface it loudly in soft/strict mode so it's
664                        // visible during rollout.
665                        tracing::warn!(
666                            target: "fakecloud::iam::audit",
667                            service = %detected.service,
668                            action = %aws_request.action,
669                            "service is iam_enforceable but has no IamAction mapping for this action; skipping evaluation"
670                        );
671                    }
672                }
673            } else if aws_request.access_key_id.is_none() {
674                // Truly anonymous (unsigned) caller — no Authorization header at
675                // all. No identity policies exist, so authorization rests
676                // entirely on the resource policy (a bucket policy granting
677                // `Principal:"*"`) and public-read ACLs — mirroring AWS, which
678                // denies anonymous requests unless the resource is explicitly
679                // made public. Without this an anonymous request that reached an
680                // iam_enforceable service in enforcement mode would bypass
681                // authorization entirely.
682                //
683                // A request that carried an Authorization header but whose
684                // credential did not resolve (principal `None` with
685                // `access_key_id` `Some`) is intentionally left alone here: with
686                // SigV4 verification off, fakecloud does not reject unverified
687                // signed requests, and turning them into anonymous denials would
688                // change long-standing behavior.
689                if let Some(iam_action) = service.iam_action_for(&aws_request) {
690                    let now = chrono::Utc::now();
691                    let mut condition_context = ConditionContext {
692                        aws_source_ip: remote_addr.map(|sa| sa.ip()),
693                        aws_current_time: Some(now),
694                        aws_epoch_time: Some(now.timestamp()),
695                        aws_secure_transport: Some(is_secure_transport(&aws_request.headers)),
696                        aws_requested_region: Some(aws_request.region.clone()),
697                        ..Default::default()
698                    };
699                    condition_context.service_keys =
700                        service.iam_condition_keys_for(&aws_request, &iam_action);
701                    let resource_policy_json = config
702                        .resource_policy_provider
703                        .as_ref()
704                        .and_then(|p| p.resource_policy(&detected.service, &iam_action.resource));
705                    let policy_allows = evaluator
706                        .evaluate_anonymous(
707                            &iam_action,
708                            &condition_context,
709                            resource_policy_json.as_deref(),
710                        )
711                        .is_allow();
712                    let acl_allows = config.resource_policy_provider.as_ref().is_some_and(|p| {
713                        p.public_acl_allows(
714                            &detected.service,
715                            &iam_action.resource,
716                            iam_action.action,
717                        )
718                    });
719                    if !policy_allows && !acl_allows {
720                        tracing::warn!(
721                            target: "fakecloud::iam::audit",
722                            service = %detected.service,
723                            action = %iam_action.action_string(),
724                            resource = %iam_action.resource,
725                            resource_policy_present = resource_policy_json.is_some(),
726                            mode = %config.iam_mode,
727                            request_id = %request_id,
728                            "anonymous request denied: no public bucket policy or ACL grants the action"
729                        );
730                        if config.iam_mode.is_strict() {
731                            return build_error_response(
732                                StatusCode::FORBIDDEN,
733                                "AccessDenied",
734                                "Access Denied",
735                                &request_id,
736                                detected.protocol,
737                            );
738                        }
739                        // Soft mode: audit log emitted; fall through to the handler.
740                    }
741                }
742            }
743        }
744    }
745
746    match service.handle(aws_request).await {
747        Ok(resp) => {
748            let mut builder = Response::builder()
749                .status(resp.status)
750                .header("x-amzn-requestid", &request_id)
751                .header("x-amz-request-id", &request_id);
752
753            if !resp.content_type.is_empty() {
754                builder = builder.header("content-type", &resp.content_type);
755            }
756
757            let has_content_length = resp
758                .headers
759                .iter()
760                .any(|(k, _)| k.as_str().eq_ignore_ascii_case("content-length"));
761
762            for (k, v) in &resp.headers {
763                builder = builder.header(k, v);
764            }
765
766            match resp.body {
767                ResponseBody::Bytes(b) => builder.body(Body::from(b)).unwrap(),
768                ResponseBody::File { file, size } => {
769                    let stream = tokio_util::io::ReaderStream::new(file);
770                    let body = Body::from_stream(stream);
771                    if !has_content_length {
772                        builder = builder.header("content-length", size.to_string());
773                    }
774                    builder.body(body).unwrap()
775                }
776            }
777        }
778        Err(err) => {
779            tracing::warn!(
780                service = %detected.service,
781                action = %detected.action,
782                error = %err,
783                "request failed"
784            );
785            let error_headers = err.response_headers().to_vec();
786            let mut resp = build_error_response_with_fields(
787                err.status(),
788                err.code(),
789                &err.message(),
790                &request_id,
791                detected.protocol,
792                err.extra_fields(),
793            );
794            for (k, v) in &error_headers {
795                if let (Ok(name), Ok(val)) = (
796                    k.parse::<http::header::HeaderName>(),
797                    v.parse::<http::header::HeaderValue>(),
798                ) {
799                    resp.headers_mut().insert(name, val);
800                }
801            }
802            resp
803        }
804    }
805}
806
/// Configuration passed to the dispatch handler.
///
/// The handler receives it as an `Extension<Arc<DispatchConfig>>`. Every
/// security feature below is opt-in: [`DispatchConfig::new`] starts with
/// signature verification off, [`IamMode::Off`], and no resolvers wired.
#[derive(Clone)]
pub struct DispatchConfig {
    /// Region the server was configured with (e.g. `us-east-1`).
    pub region: String,
    /// Account ID the server was configured with. IAM enforcement falls
    /// back to it as the resource account when a resource ARN has an empty
    /// account field (S3 ARNs, for example).
    pub account_id: String,
    /// Whether to cryptographically verify SigV4 signatures on incoming
    /// requests. Wired through from `--verify-sigv4` /
    /// `FAKECLOUD_VERIFY_SIGV4`. Off by default.
    pub verify_sigv4: bool,
    /// IAM policy evaluation mode. Wired through from `--iam` /
    /// `FAKECLOUD_IAM`. Defaults to [`IamMode::Off`]. Dispatch consults it
    /// when an evaluation denies a request: if `is_strict()` the request is
    /// rejected with a 403, otherwise the denial is only audit-logged and
    /// the request still reaches the service handler.
    pub iam_mode: IamMode,
    /// Resolves access key IDs to their secrets and owning principals.
    /// Required when `verify_sigv4` or `iam_mode != Off`. When `None`, both
    /// features gracefully degrade to off-by-default behavior.
    pub credential_resolver: Option<Arc<dyn CredentialResolver>>,
    /// Evaluates IAM identity policies for a resolved principal + action.
    /// Required when `iam_mode != Off`. When `None`, enforcement silently
    /// degrades to off even if `iam_mode` is set.
    pub policy_evaluator: Option<Arc<dyn IamPolicyEvaluator>>,
    /// Resolves resource-based policies (S3 bucket policies in the
    /// initial rollout) to hand to the evaluator alongside the
    /// principal's identity policies. `None` means the server was
    /// started without any resource-policy-owning service registered;
    /// dispatch then behaves as if no resource policy is attached to
    /// any resource, identical to the Phase 1 behavior.
    pub resource_policy_provider: Option<Arc<dyn ResourcePolicyProvider>>,
    /// Resolves the ordered SCP chain that applies to a principal's
    /// account (root-OU first, account-direct last). `None` means no
    /// organizations resolver has been registered — SCPs never gate
    /// any request in that case. Off-by-default matches the Batch 4
    /// contract: zero behavior change until a user calls
    /// `CreateOrganization` and the resolver is wired.
    pub scp_resolver: Option<Arc<dyn crate::auth::ScpResolver>>,
}
844
845impl std::fmt::Debug for DispatchConfig {
846    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
847        f.debug_struct("DispatchConfig")
848            .field("region", &self.region)
849            .field("account_id", &self.account_id)
850            .field("verify_sigv4", &self.verify_sigv4)
851            .field("iam_mode", &self.iam_mode)
852            .field(
853                "credential_resolver",
854                &self
855                    .credential_resolver
856                    .as_ref()
857                    .map(|_| "<CredentialResolver>"),
858            )
859            .field(
860                "policy_evaluator",
861                &self
862                    .policy_evaluator
863                    .as_ref()
864                    .map(|_| "<IamPolicyEvaluator>"),
865            )
866            .field(
867                "resource_policy_provider",
868                &self
869                    .resource_policy_provider
870                    .as_ref()
871                    .map(|_| "<ResourcePolicyProvider>"),
872            )
873            .field(
874                "scp_resolver",
875                &self.scp_resolver.as_ref().map(|_| "<ScpResolver>"),
876            )
877            .finish()
878    }
879}
880
881impl DispatchConfig {
882    /// Minimal constructor for tests and call sites that don't care about the
883    /// opt-in security features.
884    pub fn new(region: impl Into<String>, account_id: impl Into<String>) -> Self {
885        Self {
886            region: region.into(),
887            account_id: account_id.into(),
888            verify_sigv4: false,
889            iam_mode: IamMode::Off,
890            credential_resolver: None,
891            policy_evaluator: None,
892            resource_policy_provider: None,
893            scp_resolver: None,
894        }
895    }
896}
897
898/// Extract the 12-digit account ID segment from an AWS ARN.
899///
900/// ARNs follow `arn:<partition>:<service>:<region>:<account>:<resource>`.
901/// Identifies routes that opt into streaming request bodies. Returns
902/// `Some((service, action_hint))` when the dispatch path should hand
903/// the raw body to the service handler unbuffered, otherwise `None`
904/// for the default buffered path. The handler reads the stream via
905/// [`crate::service::AwsRequest::take_body_stream`].
906///
907/// Streaming-eligible routes today:
908///
909/// * `s3` PUT object — `PUT /<bucket>/<key>` with a SigV4 (or
910///   presigned) auth header. Covers PutObject, UploadPart, and
911///   UploadPartCopy. The S3 service spills to disk via
912///   [`fakecloud_persistence::BodySource::File`] when the stream is
913///   present.
914/// * `ecr` OCI Distribution v2 blob upload — `PATCH` and `PUT` on
915///   `/v2/{name}/blobs/uploads/{uuid}`. The ECR service spools the
916///   stream into a per-upload temp file before computing the digest.
917fn streaming_route(
918    method: &http::Method,
919    path: &str,
920    headers: &http::HeaderMap,
921    query_params: &HashMap<String, String>,
922) -> Option<(&'static str, &'static str)> {
923    // ECR OCI v2 blob upload (PATCH chunk + final PUT).
924    if (method == http::Method::PATCH || method == http::Method::PUT)
925        && path.starts_with("/v2/")
926        && path.contains("/blobs/uploads/")
927    {
928        return Some(("ecr", ""));
929    }
930
931    // S3 PutObject / UploadPart / UploadPartCopy. Detect either via
932    // SigV4 service field in the Authorization header OR via a SigV4
933    // presigned URL (X-Amz-Credential .../s3/...) OR a SigV2 presigned
934    // URL (AWSAccessKeyId + Signature + Expires query parameters).
935    if method == http::Method::PUT {
936        let after = path.trim_start_matches('/');
937        // Path-style PutObject is `PUT /<bucket>/<key>` (path contains a
938        // slash); virtual-hosted-style is `PUT /<key>` with the bucket
939        // in the Host header. For virtual-hosted, accept any non-empty
940        // path so the key flows through the streaming dispatch — the
941        // Host parser already routed this request to S3.
942        let virtual_hosted_s3 = protocol::parse_routing_host_from_headers(headers)
943            .filter(|h| h.service == "s3" && h.bucket.is_some())
944            .is_some();
945        if after.is_empty() || (!virtual_hosted_s3 && !after.contains('/')) {
946            return None;
947        }
948        let header_s3 = headers
949            .get("authorization")
950            .and_then(|v| v.to_str().ok())
951            .and_then(fakecloud_aws::sigv4::parse_sigv4)
952            .map(|info| info.service == "s3")
953            .unwrap_or(false);
954        let presigned_v4_s3 = query_params
955            .get("X-Amz-Credential")
956            .and_then(|c| c.split('/').nth(3).map(|s| s.to_string()))
957            .map(|service| service == "s3")
958            .unwrap_or(false);
959        let presigned_v2 = query_params.contains_key("AWSAccessKeyId")
960            && query_params.contains_key("Signature")
961            && query_params.contains_key("Expires");
962        if header_s3 || presigned_v4_s3 || presigned_v2 {
963            return Some(("s3", ""));
964        }
965    }
966
967    None
968}
969
/// Default cap on how many request-body bytes are buffered.
///
/// fakecloud reads the entire request body into memory before handing it
/// to a service handler, so this ceiling bounds RAM usage per in-flight
/// request.
///
/// The default is 1 GiB: comfortably above legitimate single S3 PutObject
/// payloads (AWS recommends multipart above ~100 MiB), and each multipart
/// part is dispatched separately. Set `FAKECLOUD_MAX_REQUEST_BODY_BYTES`
/// (decimal bytes) to override it, e.g. for stress tests that push past
/// the default.
const DEFAULT_MAX_REQUEST_BODY_BYTES: usize = 1 << 30;

/// Returns the effective request-body cap in bytes.
///
/// The value is computed once per process and cached. It comes from
/// `FAKECLOUD_MAX_REQUEST_BODY_BYTES` when that variable parses as a
/// positive `usize`; an unset, unparsable, or zero value falls back to
/// [`DEFAULT_MAX_REQUEST_BODY_BYTES`].
fn max_request_body_bytes() -> usize {
    static LIMIT: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *LIMIT.get_or_init(|| {
        match std::env::var("FAKECLOUD_MAX_REQUEST_BODY_BYTES") {
            Ok(raw) => match raw.parse::<usize>() {
                Ok(bytes) if bytes > 0 => bytes,
                // Zero or not a number: ignore the override.
                _ => DEFAULT_MAX_REQUEST_BODY_BYTES,
            },
            Err(_) => DEFAULT_MAX_REQUEST_BODY_BYTES,
        }
    })
}
991
/// Extracts the account segment from an AWS ARN.
///
/// ARNs follow `arn:<partition>:<service>:<region>:<account>:<resource>`.
/// For the cross-account decision in IAM enforcement, the "resource
/// account" is that account segment. The segment is returned as-is; it is
/// not checked for being a 12-digit ID.
///
/// Returns `None` when:
/// * the account field is empty (some services, notably S3, produce such
///   ARNs) — the caller then falls back to the server's configured account;
/// * the string does not start with `arn:` or has fewer than six
///   `:`-separated fields (the resource field must exist, though it may be
///   empty and may itself contain `:`).
fn parse_account_from_arn(arn: &str) -> Option<String> {
    // `splitn(6, ..)` keeps any further `:` inside the resource field.
    let mut fields = arn.splitn(6, ':');
    let shape = (
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
        fields.next(),
    );
    match shape {
        (Some("arn"), Some(_partition), Some(_service), Some(_region), Some(account), Some(_))
            if !account.is_empty() =>
        {
            Some(account.to_owned())
        }
        _ => None,
    }
}
1015
1016/// Extract region from User-Agent header suffix `region/<region>`.
1017fn extract_region_from_user_agent(headers: &http::HeaderMap) -> Option<String> {
1018    let ua = headers.get("user-agent")?.to_str().ok()?;
1019    for part in ua.split_whitespace() {
1020        if let Some(region) = part.strip_prefix("region/") {
1021            if !region.is_empty() {
1022                return Some(region.to_string());
1023            }
1024        }
1025    }
1026    None
1027}
1028
1029fn build_error_response(
1030    status: StatusCode,
1031    code: &str,
1032    message: &str,
1033    request_id: &str,
1034    protocol: AwsProtocol,
1035) -> Response<Body> {
1036    build_error_response_with_fields(status, code, message, request_id, protocol, &[])
1037}
1038
1039fn build_error_response_with_fields(
1040    status: StatusCode,
1041    code: &str,
1042    message: &str,
1043    request_id: &str,
1044    protocol: AwsProtocol,
1045    extra_fields: &[(String, String)],
1046) -> Response<Body> {
1047    let (status, content_type, body) = match protocol {
1048        AwsProtocol::Query => {
1049            fakecloud_aws::error::xml_error_response(status, code, message, request_id)
1050        }
1051        AwsProtocol::Rest => fakecloud_aws::error::s3_xml_error_response_with_fields(
1052            status,
1053            code,
1054            message,
1055            request_id,
1056            extra_fields,
1057        ),
1058        AwsProtocol::Json | AwsProtocol::RestJson => {
1059            fakecloud_aws::error::json_error_response_with_fields(
1060                status,
1061                code,
1062                message,
1063                extra_fields,
1064            )
1065        }
1066    };
1067
1068    // S3 (and other REST-XML services) place the error code in
1069    // `x-amz-error-code` so HEAD responses — which HTTP forbids from
1070    // carrying a body — still surface the code. AWS SDKs read this header
1071    // when the body is empty. Emit it on every error response so HEAD,
1072    // OPTIONS, and any client that strips the body still see the code.
1073    // Backend errors regularly include newlines (multi-line stderr from
1074    // docker/podman/etc.); HTTP header values reject control characters,
1075    // so sanitize before insertion or the builder rejects the response
1076    // and the connection drops.
1077    let safe_code = sanitize_header_value(code);
1078    let safe_message = sanitize_header_value(message);
1079    let mut builder = Response::builder()
1080        .status(status)
1081        .header("content-type", content_type)
1082        .header("x-amzn-requestid", request_id)
1083        .header("x-amz-request-id", request_id);
1084    if let Ok(v) = http::HeaderValue::from_str(&safe_code) {
1085        builder = builder.header("x-amz-error-code", v);
1086    }
1087    if let Ok(v) = http::HeaderValue::from_str(&safe_message) {
1088        builder = builder.header("x-amz-error-message", v);
1089    }
1090    builder.body(Body::from(body)).unwrap_or_else(|_| {
1091        // Builder only fails if a header is invalid; we sanitized the two
1092        // we control, so the remaining ones (content-type, request id) are
1093        // ASCII and safe. This fallback exists purely so we never panic.
1094        Response::new(Body::empty())
1095    })
1096}
1097
/// Strip characters that HTTP header values reject (control bytes, CR/LF/TAB)
/// and truncate to a length that AWS SDKs handle cleanly. Backend tools
/// (docker, podman, kubectl, …) emit multi-line stderr, and forwarding that
/// raw into `x-amz-error-message` previously panicked the dispatcher.
///
/// Each run of control characters becomes a single space, and leading and
/// trailing whitespace is trimmed. The result is at most 1024 bytes long
/// and is always cut on a character boundary: a multi-byte character that
/// would cross the cap is dropped rather than pushed past it.
fn sanitize_header_value(s: &str) -> String {
    const MAX_LEN: usize = 1024;
    let mut out = String::with_capacity(s.len().min(MAX_LEN));
    for ch in s.chars() {
        let is_control = ch.is_control();
        // A control character is written as one space (1 byte); anything
        // else costs its full UTF-8 width. Checking the width *before*
        // pushing keeps the byte cap strict for multi-byte input.
        let width = if is_control { 1 } else { ch.len_utf8() };
        if out.len() + width > MAX_LEN {
            break;
        }
        // Header values forbid CR, LF, and other control bytes (RFC 9110).
        // Replace with a single space so multi-line messages stay readable.
        if is_control {
            if !out.ends_with(' ') {
                out.push(' ');
            }
        } else {
            out.push(ch);
        }
    }
    out.trim().to_string()
}
1121
/// Recovers the access key from a SigV2 presigned URL.
///
/// AWS SigV2 presigning puts the key in the `AWSAccessKeyId` query
/// parameter alongside `Signature` and `Expires`; all three must be present
/// for the URL to count as a SigV2 presign. Returns `None` for SigV4
/// presigns (which use `X-Amz-Credential`) and for unsigned requests, so a
/// stray `AWSAccessKeyId` parameter alone is never treated as a credential.
fn sigv2_presigned_access_key(query_params: &HashMap<String, String>) -> Option<String> {
    let has_signature_and_expiry = ["Signature", "Expires"]
        .iter()
        .all(|name| query_params.contains_key(*name));
    if !has_signature_and_expiry {
        return None;
    }
    query_params.get("AWSAccessKeyId").cloned()
}
1148
1149fn anonymous_s3_bucket(uri: &http::Uri, config: &DispatchConfig) -> Option<String> {
1150    let provider = config.resource_policy_provider.as_ref()?;
1151    let segment = uri.path().split('/').find(|s| !s.is_empty())?.to_string();
1152    let arn = format!("arn:aws:s3:::{segment}");
1153    provider.resource_owner_account("s3", &arn).map(|_| segment)
1154}
1155
1156fn build_condition_context(
1157    principal: &Principal,
1158    remote_addr: Option<SocketAddr>,
1159    region: &str,
1160    secure_transport: bool,
1161) -> ConditionContext {
1162    let now = chrono::Utc::now();
1163    ConditionContext {
1164        aws_username: aws_username_from_principal(principal),
1165        aws_userid: Some(principal.user_id.clone()),
1166        aws_principal_arn: Some(principal.arn.clone()),
1167        aws_principal_account: Some(principal.account_id.clone()),
1168        aws_principal_type: Some(principal_type_label(principal.principal_type).to_string()),
1169        aws_source_ip: remote_addr.map(|sa| sa.ip()),
1170        aws_current_time: Some(now),
1171        aws_epoch_time: Some(now.timestamp()),
1172        aws_secure_transport: Some(secure_transport),
1173        aws_requested_region: Some(region.to_string()),
1174        // F3 keys: populated from the caller's session context when STS
1175        // mints credentials with MFA / SAML / OIDC / VPC-endpoint hints.
1176        // Default-None here so tests/dispatch sites that don't set them
1177        // safe-fail any policy referencing them — matching AWS for keys
1178        // that aren't asserted.
1179        aws_mfa_present: None,
1180        aws_mfa_age_seconds: None,
1181        aws_called_via: Vec::new(),
1182        aws_source_vpce: None,
1183        aws_source_vpc: None,
1184        aws_vpc_source_ip: None,
1185        aws_federated_provider: None,
1186        aws_token_issue_time: None,
1187        service_keys: Default::default(),
1188        resource_tags: None,
1189        request_tags: None,
1190        principal_tags: None,
1191    }
1192}
1193
1194/// `aws:username` is only set for IAM users, matching AWS. For assumed
1195/// roles, federated users, root, and unknown principals the key is
1196/// absent — operators that reference it without `IfExists` safe-fail.
1197fn aws_username_from_principal(principal: &Principal) -> Option<String> {
1198    if principal.principal_type != PrincipalType::User {
1199        return None;
1200    }
1201    let after = principal.arn.rsplit_once(":user/").map(|(_, s)| s)?;
1202    // Strip any IAM path prefix; bare username is the last segment.
1203    Some(after.rsplit('/').next().unwrap_or(after).to_string())
1204}
1205
1206/// AWS's `aws:PrincipalType` uses PascalCase identifiers, distinct from
1207/// the lowercase ones [`PrincipalType::as_str`] returns for ARNs.
1208fn principal_type_label(t: PrincipalType) -> &'static str {
1209    match t {
1210        PrincipalType::User => "User",
1211        PrincipalType::AssumedRole => "AssumedRole",
1212        PrincipalType::FederatedUser => "FederatedUser",
1213        PrincipalType::Root => "Account",
1214        PrincipalType::Unknown => "Unknown",
1215    }
1216}
1217
1218/// Best-effort detection of TLS-terminated requests. Direct HTTPS
1219/// connections are not yet supported by the fakecloud server (it speaks
1220/// plain HTTP), so the only signal is an `x-forwarded-proto: https`
1221/// header set by an upstream proxy. Anything else evaluates to `false`,
1222/// which matches the typical local-dev setup.
1223fn is_secure_transport(headers: &http::HeaderMap) -> bool {
1224    headers
1225        .get("x-forwarded-proto")
1226        .and_then(|v| v.to_str().ok())
1227        .map(|s| s.eq_ignore_ascii_case("https"))
1228        .unwrap_or(false)
1229}
1230
/// Module-private extension trait that attaches an HTTP error status to a
/// protocol value.
trait ProtocolExt {
    /// Returns the HTTP status to use for an error under this protocol.
    // NOTE(review): no caller is visible in this part of the file — confirm
    // where (and whether) this is still used.
    fn error_status(&self) -> StatusCode;
}
1234
impl ProtocolExt for AwsProtocol {
    /// Returns `400 Bad Request` for every protocol; the variant of `self`
    /// is not inspected.
    fn error_status(&self) -> StatusCode {
        StatusCode::BAD_REQUEST
    }
}
1240
1241#[cfg(test)]
1242mod tests {
1243    use super::*;
1244
1245    #[test]
1246    fn default_max_request_body_bytes_is_one_gib() {
1247        // Without the env override, the cap defaults to 1 GiB. The
1248        // public function caches via OnceLock so only the first call
1249        // in the process matters; we assert the constant directly.
1250        assert_eq!(DEFAULT_MAX_REQUEST_BODY_BYTES, 1024 * 1024 * 1024);
1251    }
1252
1253    #[test]
1254    fn sigv2_presigned_access_key_extracted_with_signature_and_expires() {
1255        let mut q = HashMap::new();
1256        q.insert("AWSAccessKeyId".to_string(), "AKIAEXAMPLE".to_string());
1257        q.insert("Signature".to_string(), "abc%2Bdef".to_string());
1258        q.insert("Expires".to_string(), "1700000000".to_string());
1259        assert_eq!(
1260            sigv2_presigned_access_key(&q).as_deref(),
1261            Some("AKIAEXAMPLE")
1262        );
1263    }
1264
1265    #[test]
1266    fn sigv2_presigned_access_key_none_without_signature_or_expires() {
1267        // AWSAccessKeyId alone (e.g. a stray query param) is not a SigV2
1268        // presign and must not be treated as a credential.
1269        let mut q = HashMap::new();
1270        q.insert("AWSAccessKeyId".to_string(), "AKIAEXAMPLE".to_string());
1271        assert_eq!(sigv2_presigned_access_key(&q), None);
1272
1273        q.insert("Expires".to_string(), "1700000000".to_string());
1274        assert_eq!(
1275            sigv2_presigned_access_key(&q),
1276            None,
1277            "missing Signature must not qualify"
1278        );
1279    }
1280
1281    #[test]
1282    fn sigv2_presigned_access_key_none_for_unsigned_request() {
1283        assert_eq!(sigv2_presigned_access_key(&HashMap::new()), None);
1284    }
1285
1286    #[test]
1287    fn dispatch_config_new_defaults_to_off() {
1288        let cfg = DispatchConfig::new("us-east-1", "123456789012");
1289        assert_eq!(cfg.region, "us-east-1");
1290        assert_eq!(cfg.account_id, "123456789012");
1291        assert!(!cfg.verify_sigv4);
1292        assert_eq!(cfg.iam_mode, IamMode::Off);
1293    }
1294
1295    #[test]
1296    fn aws_username_strips_iam_path_for_users() {
1297        let p = Principal {
1298            arn: "arn:aws:iam::123456789012:user/engineering/alice".into(),
1299            user_id: "AIDAALICE".into(),
1300            account_id: "123456789012".into(),
1301            principal_type: PrincipalType::User,
1302            source_identity: None,
1303            tags: None,
1304        };
1305        assert_eq!(aws_username_from_principal(&p), Some("alice".into()));
1306    }
1307
1308    #[test]
1309    fn aws_username_unset_for_assumed_role() {
1310        let p = Principal {
1311            arn: "arn:aws:sts::123456789012:assumed-role/ops/session".into(),
1312            user_id: "AROAOPS:session".into(),
1313            account_id: "123456789012".into(),
1314            principal_type: PrincipalType::AssumedRole,
1315            source_identity: None,
1316            tags: None,
1317        };
1318        assert_eq!(aws_username_from_principal(&p), None);
1319    }
1320
1321    #[test]
1322    fn principal_type_label_matches_aws_casing() {
1323        assert_eq!(principal_type_label(PrincipalType::User), "User");
1324        assert_eq!(
1325            principal_type_label(PrincipalType::AssumedRole),
1326            "AssumedRole"
1327        );
1328        assert_eq!(principal_type_label(PrincipalType::Root), "Account");
1329    }
1330
1331    #[test]
1332    fn build_condition_context_populates_global_keys() {
1333        let p = Principal {
1334            arn: "arn:aws:iam::123456789012:user/alice".into(),
1335            user_id: "AIDAALICE".into(),
1336            account_id: "123456789012".into(),
1337            principal_type: PrincipalType::User,
1338            source_identity: None,
1339            tags: None,
1340        };
1341        let addr: SocketAddr = "10.0.0.1:54321".parse().unwrap();
1342        let ctx = build_condition_context(&p, Some(addr), "us-east-1", false);
1343        assert_eq!(ctx.aws_username.as_deref(), Some("alice"));
1344        assert_eq!(ctx.aws_userid.as_deref(), Some("AIDAALICE"));
1345        assert_eq!(
1346            ctx.aws_principal_arn.as_deref(),
1347            Some("arn:aws:iam::123456789012:user/alice")
1348        );
1349        assert_eq!(ctx.aws_principal_account.as_deref(), Some("123456789012"));
1350        assert_eq!(ctx.aws_principal_type.as_deref(), Some("User"));
1351        assert_eq!(
1352            ctx.aws_source_ip.map(|i| i.to_string()).as_deref(),
1353            Some("10.0.0.1")
1354        );
1355        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-east-1"));
1356        assert_eq!(ctx.aws_secure_transport, Some(false));
1357        assert!(ctx.aws_current_time.is_some());
1358        assert!(ctx.aws_epoch_time.is_some());
1359    }
1360
1361    #[test]
1362    fn is_secure_transport_reads_x_forwarded_proto() {
1363        let mut headers = http::HeaderMap::new();
1364        headers.insert("x-forwarded-proto", "https".parse().unwrap());
1365        assert!(is_secure_transport(&headers));
1366        headers.insert("x-forwarded-proto", "http".parse().unwrap());
1367        assert!(!is_secure_transport(&headers));
1368        let empty = http::HeaderMap::new();
1369        assert!(!is_secure_transport(&empty));
1370    }
1371
1372    #[test]
1373    fn parse_account_from_arn_extracts_standard_shapes() {
1374        assert_eq!(
1375            parse_account_from_arn("arn:aws:sqs:us-east-1:123456789012:queue"),
1376            Some("123456789012".to_string())
1377        );
1378        assert_eq!(
1379            parse_account_from_arn("arn:aws:iam::123456789012:user/alice"),
1380            Some("123456789012".to_string())
1381        );
1382    }
1383
1384    #[test]
1385    fn parse_account_from_arn_returns_none_for_s3_empty_account() {
1386        // S3 ARNs have both region and account empty.
1387        assert_eq!(parse_account_from_arn("arn:aws:s3:::my-bucket"), None);
1388        assert_eq!(
1389            parse_account_from_arn("arn:aws:s3:::my-bucket/path/to/key"),
1390            None
1391        );
1392    }
1393
1394    #[test]
1395    fn parse_account_from_arn_returns_none_for_malformed() {
1396        assert_eq!(parse_account_from_arn(""), None);
1397        assert_eq!(parse_account_from_arn("not-an-arn"), None);
1398        assert_eq!(parse_account_from_arn("arn:aws:sqs:us-east-1"), None);
1399        assert_eq!(parse_account_from_arn("arn:aws:sqs"), None);
1400    }
1401
1402    #[test]
1403    fn extract_region_from_user_agent_finds_region_segment() {
1404        let mut headers = http::HeaderMap::new();
1405        headers.insert(
1406            "user-agent",
1407            "aws-sdk-rust/1.0 os/linux region/eu-central-1"
1408                .parse()
1409                .unwrap(),
1410        );
1411        assert_eq!(
1412            extract_region_from_user_agent(&headers),
1413            Some("eu-central-1".to_string())
1414        );
1415    }
1416
1417    #[test]
1418    fn extract_region_from_user_agent_none_without_header() {
1419        let headers = http::HeaderMap::new();
1420        assert_eq!(extract_region_from_user_agent(&headers), None);
1421    }
1422
1423    #[test]
1424    fn extract_region_from_user_agent_ignores_empty_region() {
1425        let mut headers = http::HeaderMap::new();
1426        headers.insert("user-agent", "aws-sdk-java region/".parse().unwrap());
1427        assert_eq!(extract_region_from_user_agent(&headers), None);
1428    }
1429
1430    #[test]
1431    fn extract_region_from_user_agent_none_when_no_region_marker() {
1432        let mut headers = http::HeaderMap::new();
1433        headers.insert("user-agent", "curl/7.79.1".parse().unwrap());
1434        assert_eq!(extract_region_from_user_agent(&headers), None);
1435    }
1436
1437    #[test]
1438    fn aws_username_none_for_root() {
1439        let p = Principal {
1440            arn: "arn:aws:iam::123456789012:root".into(),
1441            user_id: "123456789012".into(),
1442            account_id: "123456789012".into(),
1443            principal_type: PrincipalType::Root,
1444            source_identity: None,
1445            tags: None,
1446        };
1447        assert_eq!(aws_username_from_principal(&p), None);
1448    }
1449
1450    #[test]
1451    fn aws_username_bare_no_path() {
1452        let p = Principal {
1453            arn: "arn:aws:iam::123456789012:user/bob".into(),
1454            user_id: "AIDABOB".into(),
1455            account_id: "123456789012".into(),
1456            principal_type: PrincipalType::User,
1457            source_identity: None,
1458            tags: None,
1459        };
1460        assert_eq!(aws_username_from_principal(&p), Some("bob".into()));
1461    }
1462
1463    #[test]
1464    fn principal_type_label_covers_federated_and_unknown() {
1465        assert_eq!(
1466            principal_type_label(PrincipalType::FederatedUser),
1467            "FederatedUser"
1468        );
1469        assert_eq!(principal_type_label(PrincipalType::Unknown), "Unknown");
1470    }
1471
1472    #[test]
1473    fn build_condition_context_marks_secure_when_flag_set() {
1474        let p = Principal {
1475            arn: "arn:aws:iam::123456789012:user/alice".into(),
1476            user_id: "AIDAALICE".into(),
1477            account_id: "123456789012".into(),
1478            principal_type: PrincipalType::User,
1479            source_identity: None,
1480            tags: None,
1481        };
1482        let ctx = build_condition_context(&p, None, "us-west-2", true);
1483        assert_eq!(ctx.aws_secure_transport, Some(true));
1484        assert!(ctx.aws_source_ip.is_none());
1485        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-west-2"));
1486    }
1487
1488    #[test]
1489    fn is_secure_transport_case_insensitive() {
1490        let mut headers = http::HeaderMap::new();
1491        headers.insert("x-forwarded-proto", "HTTPS".parse().unwrap());
1492        assert!(is_secure_transport(&headers));
1493    }
1494
1495    #[test]
1496    fn is_secure_transport_non_ascii_bytes_false() {
1497        let mut headers = http::HeaderMap::new();
1498        headers.insert(
1499            "x-forwarded-proto",
1500            http::HeaderValue::from_bytes(&[0xFF, 0xFE]).unwrap(),
1501        );
1502        assert!(!is_secure_transport(&headers));
1503    }
1504
1505    #[test]
1506    fn protocol_ext_error_status_is_bad_request() {
1507        assert_eq!(AwsProtocol::Query.error_status(), StatusCode::BAD_REQUEST);
1508        assert_eq!(AwsProtocol::Json.error_status(), StatusCode::BAD_REQUEST);
1509        assert_eq!(AwsProtocol::Rest.error_status(), StatusCode::BAD_REQUEST);
1510        assert_eq!(
1511            AwsProtocol::RestJson.error_status(),
1512            StatusCode::BAD_REQUEST
1513        );
1514    }
1515
1516    #[test]
1517    fn build_error_response_json_has_json_content_type() {
1518        let resp = build_error_response(
1519            StatusCode::BAD_REQUEST,
1520            "TestCode",
1521            "test msg",
1522            "req-1",
1523            AwsProtocol::Json,
1524        );
1525        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1526        let ct = resp
1527            .headers()
1528            .get("content-type")
1529            .unwrap()
1530            .to_str()
1531            .unwrap();
1532        assert!(ct.contains("json"));
1533        let rid = resp
1534            .headers()
1535            .get("x-amzn-requestid")
1536            .unwrap()
1537            .to_str()
1538            .unwrap();
1539        assert_eq!(rid, "req-1");
1540    }
1541
1542    #[test]
1543    fn build_error_response_rest_returns_xml_content_type() {
1544        let resp = build_error_response(
1545            StatusCode::NOT_FOUND,
1546            "NoSuchBucket",
1547            "bucket missing",
1548            "req-2",
1549            AwsProtocol::Rest,
1550        );
1551        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1552        let ct = resp
1553            .headers()
1554            .get("content-type")
1555            .unwrap()
1556            .to_str()
1557            .unwrap();
1558        assert!(ct.contains("xml"));
1559    }
1560
1561    #[test]
1562    fn build_error_response_query_returns_xml() {
1563        let resp = build_error_response(
1564            StatusCode::BAD_REQUEST,
1565            "InvalidParameter",
1566            "bad param",
1567            "req-3",
1568            AwsProtocol::Query,
1569        );
1570        let ct = resp
1571            .headers()
1572            .get("content-type")
1573            .unwrap()
1574            .to_str()
1575            .unwrap();
1576        assert!(ct.contains("xml"));
1577    }
1578
1579    /// Regression for issue #1539: multi-line backend errors (e.g. podman
1580    /// stderr) used to panic the dispatcher when stuffed into the
1581    /// `x-amz-error-message` HTTP header. The response must build cleanly
1582    /// and the header value must not contain control characters.
1583    #[test]
1584    fn build_error_response_with_multiline_message_does_not_panic() {
1585        let resp = build_error_response(
1586            StatusCode::INTERNAL_SERVER_ERROR,
1587            "ServiceException",
1588            "Lambda execution failed: container failed to start: docker start failed: \
1589             Error: unable to start container \"abc\": \
1590             failed to create new hosts file:\nhost-gateway is empty\n",
1591            "req-multi",
1592            AwsProtocol::Json,
1593        );
1594        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
1595        let msg = resp
1596            .headers()
1597            .get("x-amz-error-message")
1598            .expect("x-amz-error-message must be set even when input contains newlines")
1599            .to_str()
1600            .unwrap();
1601        assert!(!msg.contains('\n'));
1602        assert!(!msg.contains('\r'));
1603        assert!(msg.contains("Lambda execution failed"));
1604        assert!(msg.contains("host-gateway is empty"));
1605    }
1606
1607    #[test]
1608    fn build_error_response_with_control_chars_strips_them() {
1609        let resp = build_error_response(
1610            StatusCode::BAD_REQUEST,
1611            "Code\twith\ttabs",
1612            "msg\x00with\x01nulls",
1613            "req-ctrl",
1614            AwsProtocol::Json,
1615        );
1616        let code = resp
1617            .headers()
1618            .get("x-amz-error-code")
1619            .unwrap()
1620            .to_str()
1621            .unwrap();
1622        let msg = resp
1623            .headers()
1624            .get("x-amz-error-message")
1625            .unwrap()
1626            .to_str()
1627            .unwrap();
1628        assert!(!code.contains('\t'));
1629        assert!(!msg.contains('\x00'));
1630        assert!(!msg.contains('\x01'));
1631    }
1632
1633    #[test]
1634    fn sanitize_header_value_truncates_long_input() {
1635        let huge = "x".repeat(5_000);
1636        let out = sanitize_header_value(&huge);
1637        assert!(out.len() <= 1024);
1638    }
1639
1640    #[test]
1641    fn sanitize_header_value_collapses_consecutive_control_runs() {
1642        let out = sanitize_header_value("a\n\n\n\rb");
1643        assert_eq!(out, "a b");
1644    }
1645
1646    #[test]
1647    fn dispatch_config_carries_opt_in_flags() {
1648        let cfg = DispatchConfig {
1649            region: "eu-west-1".to_string(),
1650            account_id: "000000000000".to_string(),
1651            verify_sigv4: true,
1652            iam_mode: IamMode::Strict,
1653            credential_resolver: None,
1654            policy_evaluator: None,
1655            resource_policy_provider: None,
1656            scp_resolver: None,
1657        };
1658        assert!(cfg.verify_sigv4);
1659        assert!(cfg.iam_mode.is_strict());
1660        assert!(cfg.resource_policy_provider.is_none());
1661        assert!(cfg.scp_resolver.is_none());
1662    }
1663
1664    fn s3_sigv4_headers() -> http::HeaderMap {
1665        let mut headers = http::HeaderMap::new();
1666        headers.insert(
1667            "authorization",
1668            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/s3/aws4_request, \
1669             SignedHeaders=host, Signature=fake"
1670                .parse()
1671                .unwrap(),
1672        );
1673        headers
1674    }
1675
1676    #[test]
1677    fn streaming_route_path_style_s3_put_object() {
1678        let headers = s3_sigv4_headers();
1679        assert_eq!(
1680            streaming_route(
1681                &http::Method::PUT,
1682                "/my-bucket/key.txt",
1683                &headers,
1684                &HashMap::new(),
1685            ),
1686            Some(("s3", "")),
1687        );
1688    }
1689
1690    #[test]
1691    fn streaming_route_path_style_create_bucket_skipped() {
1692        // `PUT /bucket` (no trailing key) is CreateBucket — must NOT
1693        // hit the streaming path.
1694        let headers = s3_sigv4_headers();
1695        assert_eq!(
1696            streaming_route(&http::Method::PUT, "/my-bucket", &headers, &HashMap::new(),),
1697            None,
1698        );
1699    }
1700
1701    #[test]
1702    fn streaming_route_virtual_hosted_s3_put_object() {
1703        let mut headers = s3_sigv4_headers();
1704        headers.insert(
1705            "host",
1706            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1707                .parse()
1708                .unwrap(),
1709        );
1710        // Virtual-hosted PUT has no bucket in the URL path (`/<key>`),
1711        // so the slash check used for path-style would reject it. The
1712        // Host parser confirms this is virtual-hosted S3 and the key
1713        // flows through the streaming dispatch.
1714        assert_eq!(
1715            streaming_route(&http::Method::PUT, "/hello.txt", &headers, &HashMap::new(),),
1716            Some(("s3", "")),
1717        );
1718    }
1719
1720    #[test]
1721    fn streaming_route_virtual_hosted_s3_root_skipped() {
1722        // `PUT /` against a virtual-hosted Host = CreateBucket, which
1723        // is handled buffered. Empty path-after-slash must short-circuit.
1724        let mut headers = s3_sigv4_headers();
1725        headers.insert(
1726            "host",
1727            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1728                .parse()
1729                .unwrap(),
1730        );
1731        assert_eq!(
1732            streaming_route(&http::Method::PUT, "/", &headers, &HashMap::new()),
1733            None,
1734        );
1735    }
1736
1737    #[test]
1738    fn streaming_route_ecr_blob_upload() {
1739        let headers = http::HeaderMap::new();
1740        assert_eq!(
1741            streaming_route(
1742                &http::Method::PATCH,
1743                "/v2/my-repo/blobs/uploads/abcd1234",
1744                &headers,
1745                &HashMap::new(),
1746            ),
1747            Some(("ecr", "")),
1748        );
1749        assert_eq!(
1750            streaming_route(
1751                &http::Method::PUT,
1752                "/v2/my-repo/blobs/uploads/abcd1234",
1753                &headers,
1754                &HashMap::new(),
1755            ),
1756            Some(("ecr", "")),
1757        );
1758    }
1759
1760    #[test]
1761    fn streaming_route_presigned_v4_s3_put() {
1762        let headers = http::HeaderMap::new();
1763        let mut query_params = HashMap::new();
1764        query_params.insert(
1765            "X-Amz-Credential".to_string(),
1766            "test/20240101/us-east-1/s3/aws4_request".to_string(),
1767        );
1768        assert_eq!(
1769            streaming_route(
1770                &http::Method::PUT,
1771                "/my-bucket/key.txt",
1772                &headers,
1773                &query_params,
1774            ),
1775            Some(("s3", "")),
1776        );
1777    }
1778
1779    #[test]
1780    fn streaming_route_non_s3_auth_header_skipped() {
1781        // Same path shape but the SigV4 service is lambda — must not
1782        // wire the streaming dispatch (Lambda has its own buffered path).
1783        let mut headers = http::HeaderMap::new();
1784        headers.insert(
1785            "authorization",
1786            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/lambda/aws4_request, \
1787             SignedHeaders=host, Signature=fake"
1788                .parse()
1789                .unwrap(),
1790        );
1791        assert_eq!(
1792            streaming_route(
1793                &http::Method::PUT,
1794                "/my-bucket/key.txt",
1795                &headers,
1796                &HashMap::new(),
1797            ),
1798            None,
1799        );
1800    }
1801
1802    #[test]
1803    fn streaming_route_get_skipped() {
1804        let headers = s3_sigv4_headers();
1805        assert_eq!(
1806            streaming_route(
1807                &http::Method::GET,
1808                "/my-bucket/key.txt",
1809                &headers,
1810                &HashMap::new(),
1811            ),
1812            None,
1813        );
1814    }
1815}