Skip to main content

fakecloud_core/
dispatch.rs

1use axum::body::Body;
2use axum::extract::{ConnectInfo, Extension, Query};
3use axum::http::{Request, StatusCode};
4use axum::response::Response;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use crate::auth::{
11    is_root_bypass, ConditionContext, CredentialResolver, IamMode, IamPolicyEvaluator, Principal,
12    PrincipalType, ResourcePolicyProvider,
13};
14use crate::protocol::{self, AwsProtocol};
15use crate::registry::ServiceRegistry;
16use crate::service::{AwsRequest, ResponseBody};
17
18/// The main dispatch handler. All HTTP requests come through here.
19pub async fn dispatch(
20    ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
21    Extension(registry): Extension<Arc<ServiceRegistry>>,
22    Extension(config): Extension<Arc<DispatchConfig>>,
23    Query(query_params): Query<HashMap<String, String>>,
24    request: Request<Body>,
25) -> Response<Body> {
26    let remote_addr = Some(remote_addr);
27    let request_id = uuid::Uuid::new_v4().to_string();
28
29    let (parts, body) = request.into_parts();
30
31    // Streaming opt-in: if the route is a known large-body S3 / ECR
32    // upload, we skip the buffered `to_bytes` step entirely and hand
33    // the raw body to the service handler. The handler spills it to
34    // disk on the fly. Header-only detection covers every streaming
35    // candidate (none of them rely on form-body sniffing).
36    let stream_route = streaming_route(
37        &parts.method,
38        parts.uri.path(),
39        &parts.headers,
40        &query_params,
41    );
42    let header_only = protocol::detect_service_headers_only(&parts.headers, &query_params);
43    let stream_dispatch = match (&stream_route, &header_only) {
44        // Header-only detection agrees with the URL match — covers S3
45        // PUT object (SigV4 service=s3 in Authorization).
46        (Some(sr), Some(detected)) if sr.0 == detected.service => Some(detected.clone()),
47        // ECR OCI v2 blob upload has no AWS auth header; the path
48        // alone (`/v2/.../blobs/uploads/...`) tells us the route is
49        // ECR. Synthesize a DetectedRequest so dispatch picks the
50        // streaming path. Same special-case the buffered branch
51        // applies on detect_service None (see below).
52        (Some((service, _)), None) if *service == "ecr" => Some(protocol::DetectedRequest {
53            service: "ecr".to_string(),
54            action: String::new(),
55            protocol: AwsProtocol::Rest,
56        }),
57        _ => None,
58    };
59
60    let (body_bytes, body_stream) = if stream_dispatch.is_some() {
61        (Bytes::new(), Some(body))
62    } else {
63        // Buffered path: materialize the body into memory under the
64        // configured cap. `FAKECLOUD_MAX_REQUEST_BODY_BYTES` (default
65        // 1 GiB) caps non-streaming requests; streaming routes have no
66        // cap because nothing materializes the entire body in RAM.
67        let max_body_bytes = max_request_body_bytes();
68        match axum::body::to_bytes(body, max_body_bytes).await {
69            Ok(b) => (b, None),
70            Err(_) => {
71                return build_error_response(
72                    StatusCode::PAYLOAD_TOO_LARGE,
73                    "RequestEntityTooLarge",
74                    "Request body too large",
75                    &request_id,
76                    AwsProtocol::Query,
77                );
78            }
79        }
80    };
81
82    // Detect service and action
83    let detected = if let Some(d) = stream_dispatch {
84        d
85    } else {
86        match protocol::detect_service(&parts.headers, &query_params, &body_bytes) {
87            Some(d) => d,
88            None => {
89                // OPTIONS requests (CORS preflight) don't carry Authorization headers.
90                // Route them to S3 since S3 is the only REST service that handles CORS.
91                // Note: API Gateway CORS preflight is not fully supported in this emulator
92                // because we can't distinguish between S3 and API Gateway OPTIONS requests
93                // without additional context (in real AWS, they have different domains).
94                if parts.method == http::Method::OPTIONS {
95                    protocol::DetectedRequest {
96                        service: "s3".to_string(),
97                        action: String::new(),
98                        protocol: AwsProtocol::Rest,
99                    }
100                } else if parts.uri.path() == "/v2" || parts.uri.path().starts_with("/v2/") {
101                    // OCI Distribution v2 protocol. Docker CLI / OCI clients
102                    // use Basic auth (not SigV4) and GET /v2/ with no body,
103                    // so this must be matched before the apigateway fallback.
104                    protocol::DetectedRequest {
105                        service: "ecr".to_string(),
106                        action: String::new(),
107                        protocol: AwsProtocol::Rest,
108                    }
109                } else if let Some(bucket) = anonymous_s3_bucket(&parts.uri, &config) {
110                    // Unsigned request whose first path segment names an
111                    // existing S3 bucket: an anonymous path-style S3 access
112                    // (e.g. serving a public-read object to a browser). Without
113                    // this it would fall through to the apigateway catch-all
114                    // below and 404 with "Stage not found" (#1707). Authorization
115                    // for the anonymous caller still runs in the IAM block.
116                    tracing::debug!(bucket = %bucket, "routing unsigned request to S3 (existing bucket)");
117                    protocol::DetectedRequest {
118                        service: "s3".to_string(),
119                        action: String::new(),
120                        protocol: AwsProtocol::Rest,
121                    }
122                } else if !parts.uri.path().starts_with("/_") {
123                    // Requests without AWS auth that don't match any service might be
124                    // API Gateway execute API calls (plain HTTP without signatures).
125                    // Route them to apigateway service which will validate if a matching
126                    // API/stage exists. Skip special FakeCloud endpoints (/_*).
127                    protocol::DetectedRequest {
128                        service: "apigateway".to_string(),
129                        action: String::new(),
130                        protocol: AwsProtocol::RestJson,
131                    }
132                } else {
133                    return build_error_response(
134                        StatusCode::BAD_REQUEST,
135                        "MissingAction",
136                        "Could not determine target service or action from request",
137                        &request_id,
138                        AwsProtocol::Query,
139                    );
140                }
141            }
142        }
143    };
144
145    // Bedrock-agent and bedrock-runtime both send `bedrock` in the SigV4
146    // credential scope, but bedrock-agent has its own service handler.
147    // Disambiguate based on the request path.
148    let detected = if detected.service == "bedrock" {
149        let first_seg = parts.uri.path().split('/').nth(1);
150        if matches!(
151            first_seg,
152            Some(
153                "agents"
154                    | "knowledgebases"
155                    | "flows"
156                    | "prompts"
157                    | "tags"
158                    | "retrieveAndGenerate"
159                    | "retrieveAndGenerateStream"
160                    | "optimize-prompt"
161                    | "sessions"
162                    | "invocations"
163                    | "generate-query"
164                    | "rerank"
165            )
166        ) {
167            // Further disambiguate runtime vs control plane for agents/flows paths
168            let segs: Vec<&str> = parts.uri.path().split('/').collect();
169            let is_runtime = matches!(
170                segs.as_slice(),
171                ["", "agents", _, "agentAliases", _, ..]  // InvokeAgent
172                    | ["", "flows", _, "aliases", _]   // InvokeFlow
173                    | ["", "knowledgebases", _, "retrieve"] // Retrieve
174                    | ["", "retrieveAndGenerate"]
175                    | ["", "retrieveAndGenerateStream"]
176                    | ["", "optimize-prompt"]
177                    | ["", "sessions", ..]
178                    | ["", "invocations", ..]
179                    | ["", "generate-query"]
180                    | ["", "rerank"]
181            );
182            if is_runtime {
183                protocol::DetectedRequest {
184                    service: "bedrock-agent-runtime".to_string(),
185                    ..detected
186                }
187            } else {
188                protocol::DetectedRequest {
189                    service: "bedrock-agent".to_string(),
190                    ..detected
191                }
192            }
193        } else {
194            detected
195        }
196    } else {
197        detected
198    };
199
200    // Look up service
201    let service = match registry.get(&detected.service) {
202        Some(s) => s,
203        None => {
204            return build_error_response(
205                detected.protocol.error_status(),
206                "UnknownService",
207                &format!("Service '{}' is not available", detected.service),
208                &request_id,
209                detected.protocol,
210            );
211        }
212    };
213
214    // Extract region and access key from auth header (or presigned query).
215    let auth_header = parts
216        .headers
217        .get("authorization")
218        .and_then(|v| v.to_str().ok())
219        .unwrap_or("");
220    let header_info = fakecloud_aws::sigv4::parse_sigv4(auth_header);
221    let presigned_info = if header_info.is_none() {
222        // Presigned URL: credentials live in the query string.
223        fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params).map(|p| p.as_info())
224    } else {
225        None
226    };
227    let sigv4_info = header_info.or(presigned_info);
228    // SigV2 presigned URLs (`AWSAccessKeyId` + `Signature` + `Expires` query
229    // parameters) carry the access key outside the SigV4 grammar, so the SigV4
230    // parsers above return None. Recover the key here so a SigV2-presigned
231    // request is attributed to its caller instead of being treated as
232    // anonymous (which would deny it under the object read auth gate).
233    let access_key_id = sigv4_info
234        .as_ref()
235        .map(|info| info.access_key.clone())
236        .or_else(|| sigv2_presigned_access_key(&query_params));
237
238    // Host-header routing hint: LocalStack-shaped
239    // `<svc>.<region>.localhost.localstack.cloud[:port]`, real-AWS
240    // `<svc>.<region>.amazonaws.com`, and every S3 virtual-hosted variant
241    // of both. Secondary region source and carries the bucket for
242    // virtual-hosted S3 path rewrite.
243    let host_info = protocol::parse_routing_host_from_headers(&parts.headers);
244
245    let region = sigv4_info
246        .map(|info| info.region)
247        .or_else(|| host_info.as_ref().map(|h| h.region.clone()))
248        .or_else(|| extract_region_from_user_agent(&parts.headers))
249        .unwrap_or_else(|| config.region.clone());
250
251    // Resolve the caller's principal up front so both SigV4 verification
252    // (which needs the secret) and the service handler (which needs the
253    // identity for GetCallerIdentity and IAM enforcement) share a single
254    // lookup. The root-bypass AKID skips resolution entirely — `test`
255    // credentials have no backing identity and must always pass.
256    let caller_akid = access_key_id.as_deref().unwrap_or("");
257    let resolved = if !caller_akid.is_empty() && !is_root_bypass(caller_akid) {
258        config
259            .credential_resolver
260            .as_ref()
261            .and_then(|r| r.resolve(caller_akid))
262    } else {
263        None
264    };
265    let caller_principal = resolved.as_ref().map(|r| r.principal.clone());
266    let caller_session_policies = resolved
267        .as_ref()
268        .map(|r| r.session_policies.clone())
269        .unwrap_or_default();
270
271    // Opt-in SigV4 cryptographic verification. Runs before the service
272    // handler so a failing signature never reaches business logic. The
273    // reserved `test*` root identity short-circuits verification to keep
274    // local-dev workflows frictionless.
275    if config.verify_sigv4 && !is_root_bypass(caller_akid) && config.credential_resolver.is_some() {
276        let amz_date = parts
277            .headers
278            .get("x-amz-date")
279            .and_then(|v| v.to_str().ok());
280        let parsed = fakecloud_aws::sigv4::parse_sigv4_header(auth_header, amz_date)
281            .or_else(|| fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params));
282        let parsed = match parsed {
283            Some(p) => p,
284            None => {
285                return build_error_response(
286                    StatusCode::FORBIDDEN,
287                    "IncompleteSignature",
288                    "Request is missing or has a malformed AWS Signature",
289                    &request_id,
290                    detected.protocol,
291                );
292            }
293        };
294        let resolved_for_verify = match resolved.as_ref() {
295            Some(r) => r,
296            None => {
297                return build_error_response(
298                    StatusCode::FORBIDDEN,
299                    "InvalidClientTokenId",
300                    "The security token included in the request is invalid",
301                    &request_id,
302                    detected.protocol,
303                );
304            }
305        };
306        let headers_vec = fakecloud_aws::sigv4::headers_from_http(&parts.headers);
307        let raw_query_for_verify = parts.uri.query().unwrap_or("").to_string();
308        let verify_req = fakecloud_aws::sigv4::VerifyRequest {
309            method: parts.method.as_str(),
310            path: parts.uri.path(),
311            query: &raw_query_for_verify,
312            headers: &headers_vec,
313            body: &body_bytes,
314        };
315        match fakecloud_aws::sigv4::verify(
316            &parsed,
317            &verify_req,
318            &resolved_for_verify.secret_access_key,
319            chrono::Utc::now(),
320        ) {
321            Ok(()) => {}
322            Err(fakecloud_aws::sigv4::SigV4Error::RequestTimeTooSkewed { .. }) => {
323                return build_error_response(
324                    StatusCode::FORBIDDEN,
325                    "RequestTimeTooSkewed",
326                    "The difference between the request time and the current time is too large",
327                    &request_id,
328                    detected.protocol,
329                );
330            }
331            Err(fakecloud_aws::sigv4::SigV4Error::InvalidDate(msg)) => {
332                return build_error_response(
333                    StatusCode::FORBIDDEN,
334                    "IncompleteSignature",
335                    &format!("Invalid x-amz-date: {msg}"),
336                    &request_id,
337                    detected.protocol,
338                );
339            }
340            Err(fakecloud_aws::sigv4::SigV4Error::Malformed(msg)) => {
341                return build_error_response(
342                    StatusCode::FORBIDDEN,
343                    "IncompleteSignature",
344                    &format!("Malformed SigV4 signature: {msg}"),
345                    &request_id,
346                    detected.protocol,
347                );
348            }
349            Err(fakecloud_aws::sigv4::SigV4Error::SignatureMismatch) => {
350                return build_error_response(
351                    StatusCode::FORBIDDEN,
352                    "SignatureDoesNotMatch",
353                    "The request signature we calculated does not match the signature you provided",
354                    &request_id,
355                    detected.protocol,
356                );
357            }
358            Err(fakecloud_aws::sigv4::SigV4Error::PresignedUrlExpired { .. }) => {
359                return build_error_response(
360                    StatusCode::FORBIDDEN,
361                    "AccessDenied",
362                    "Request has expired",
363                    &request_id,
364                    detected.protocol,
365                );
366            }
367            Err(fakecloud_aws::sigv4::SigV4Error::InvalidPresignExpires(_)) => {
368                return build_error_response(
369                    StatusCode::BAD_REQUEST,
370                    "AuthorizationQueryParametersError",
371                    "X-Amz-Expires must be a number between 1 and 604800 seconds",
372                    &request_id,
373                    detected.protocol,
374                );
375            }
376        }
377    }
378
379    // Build path segments. For S3 virtual-hosted-style requests the bucket
380    // lives in the Host header, not the path — prepend it so the S3 handler
381    // sees a uniform path-style request. SigV4 verification above already
382    // ran against the wire path, so this rewrite is signature-safe.
383    let wire_path = parts.uri.path();
384    let path = if detected.service == "s3" {
385        if let Some(bucket) = host_info.as_ref().and_then(|h| h.bucket.as_deref()) {
386            let prefix_with_slash = format!("/{bucket}/");
387            let is_bucket_root = wire_path.trim_end_matches('/') == format!("/{bucket}");
388            if wire_path.starts_with(&prefix_with_slash) || is_bucket_root {
389                wire_path.to_string()
390            } else if wire_path == "/" || wire_path.is_empty() {
391                format!("/{bucket}")
392            } else {
393                format!("/{bucket}{wire_path}")
394            }
395        } else {
396            wire_path.to_string()
397        }
398    } else {
399        wire_path.to_string()
400    };
401    let raw_query = parts.uri.query().unwrap_or("").to_string();
402    let path_segments: Vec<String> = path
403        .split('/')
404        .filter(|s| !s.is_empty())
405        .map(|s| s.to_string())
406        .collect();
407
408    // For JSON protocol, validate that non-empty bodies are valid JSON
409    if detected.protocol == AwsProtocol::Json
410        && !body_bytes.is_empty()
411        && serde_json::from_slice::<serde_json::Value>(&body_bytes).is_err()
412    {
413        return build_error_response(
414            StatusCode::BAD_REQUEST,
415            "SerializationException",
416            "Start of structure or map found where not expected",
417            &request_id,
418            AwsProtocol::Json,
419        );
420    }
421
422    // Merge query params with form body params for Query protocol
423    let mut all_params = query_params;
424    if detected.protocol == AwsProtocol::Query {
425        let body_params = protocol::parse_query_body(&body_bytes);
426        for (k, v) in body_params {
427            all_params.entry(k).or_insert(v);
428        }
429    }
430
431    let aws_request = AwsRequest {
432        service: detected.service.clone(),
433        action: detected.action.clone(),
434        region,
435        account_id: caller_principal
436            .as_ref()
437            .map(|p| p.account_id.clone())
438            .unwrap_or_else(|| config.account_id.clone()),
439        request_id: request_id.clone(),
440        headers: parts.headers,
441        query_params: all_params,
442        body: body_bytes,
443        body_stream: parking_lot::Mutex::new(body_stream),
444        path_segments,
445        raw_path: path,
446        raw_query,
447        method: parts.method,
448        is_query_protocol: detected.protocol == AwsProtocol::Query,
449        access_key_id,
450        principal: caller_principal,
451    };
452
453    tracing::info!(
454        service = %aws_request.service,
455        action = %aws_request.action,
456        request_id = %aws_request.request_id,
457        "handling request"
458    );
459
460    // Opt-in IAM identity-policy enforcement. Runs before the service
461    // handler so a deny never reaches business logic. Root principals
462    // (both `test*` bypass AKIDs and the account's IAM root) are exempt,
463    // matching AWS behavior. Services that haven't opted in via
464    // `iam_enforceable()` are transparently skipped — the startup log
465    // lists which services are under enforcement so users always know.
466    if config.iam_mode.is_enabled()
467        && service.iam_enforceable()
468        && !is_root_bypass(aws_request.access_key_id.as_deref().unwrap_or(""))
469    {
470        if let Some(evaluator) = config.policy_evaluator.as_ref() {
471            if let Some(principal) = aws_request.principal.as_ref() {
472                if !principal.is_root() {
473                    if let Some(iam_action) = service.iam_action_for(&aws_request) {
474                        let mut condition_context = build_condition_context(
475                            principal,
476                            remote_addr,
477                            &aws_request.region,
478                            is_secure_transport(&aws_request.headers),
479                        );
480                        // F3 keys riding on the resolved credential. STS
481                        // populates these at mint time so subsequent
482                        // requests under the credential can be evaluated
483                        // against `aws:MultiFactorAuthPresent`,
484                        // `aws:MultiFactorAuthAge`, `aws:TokenIssueTime`,
485                        // and `aws:FederatedProvider`. IAM user access
486                        // keys carry none of these, matching AWS.
487                        if let Some(rc) = resolved.as_ref() {
488                            condition_context.aws_mfa_present = Some(rc.mfa_present);
489                            condition_context.aws_token_issue_time = rc.token_issued_at;
490                            condition_context.aws_federated_provider =
491                                rc.federated_provider.clone();
492                            // `aws:MultiFactorAuthAge` is "seconds since
493                            // MFA was asserted" — computed at evaluation
494                            // time from the token issue moment so the
495                            // value increases monotonically as the session
496                            // ages. Only set when the session was actually
497                            // minted with MFA; otherwise the key is
498                            // absent, matching AWS.
499                            if rc.mfa_present {
500                                if let Some(issued) = rc.token_issued_at {
501                                    let age = chrono::Utc::now()
502                                        .signed_duration_since(issued)
503                                        .num_seconds()
504                                        .max(0);
505                                    condition_context.aws_mfa_age_seconds = Some(age);
506                                }
507                            }
508                        }
509                        condition_context.service_keys =
510                            service.iam_condition_keys_for(&aws_request, &iam_action);
511
512                        // ABAC: populate tag-based condition keys.
513                        // aws:ResourceTag/*
514                        match service.resource_tags_for(&iam_action.resource) {
515                            Some(tags) => condition_context.resource_tags = Some(tags),
516                            None => tracing::debug!(
517                                target: "fakecloud::iam::audit",
518                                service = %detected.service,
519                                resource = %iam_action.resource,
520                                "service does not expose resource tags for ABAC; skipping aws:ResourceTag/* evaluation"
521                            ),
522                        }
523                        // aws:RequestTag/* + aws:TagKeys
524                        match service.request_tags_from(&aws_request, iam_action.action) {
525                            Some(tags) => condition_context.request_tags = Some(tags),
526                            None => tracing::debug!(
527                                target: "fakecloud::iam::audit",
528                                service = %detected.service,
529                                action = %iam_action.action_string(),
530                                "service does not expose request tags for ABAC; skipping aws:RequestTag/* / aws:TagKeys evaluation"
531                            ),
532                        }
533                        // aws:PrincipalTag/*
534                        condition_context.principal_tags = principal.tags.clone();
535
536                        // Phase 2: fetch the resource-based policy (if
537                        // any) attached to the target resource and
538                        // pass it to the evaluator alongside the
539                        // principal's identity policies. The resource's
540                        // owning account (#381 multi-account alignment)
541                        // comes from the provider, else from the ARN; S3
542                        // ARNs have an empty account field, so the final
543                        // fallback is the caller's account (see below).
544                        let resource_policy_json =
545                            config.resource_policy_provider.as_ref().and_then(|p| {
546                                p.resource_policy(&detected.service, &iam_action.resource)
547                            });
548                        // Derive the resource-owning account. Prefer a provider
549                        // lookup (S3 ARNs carry no account, so the bucket's
550                        // owner is resolved from state — without this, account
551                        // A reaching account B's bucket would be mis-read as
552                        // same-account and skip B's bucket-policy requirement,
553                        // bug-audit 2026-05-28, 5.3), then fall back to the
554                        // account embedded in the ARN (SQS/SNS/Lambda/…), then
555                        // to the caller's account for wildcard / unscoped
556                        // actions (ListQueues, GetCallerIdentity).
557                        let resource_account_id = config
558                            .resource_policy_provider
559                            .as_ref()
560                            .and_then(|p| {
561                                p.resource_owner_account(&detected.service, &iam_action.resource)
562                            })
563                            .or_else(|| parse_account_from_arn(&iam_action.resource))
564                            .unwrap_or_else(|| principal.account_id.clone());
565                        // SCP ceiling: resolve the inherited SCP chain
566                        // for this principal (management accounts and
567                        // service-linked roles come back as `None`, in
568                        // which case the evaluator treats the layer as
569                        // absent). Audit breadcrumbs emitted by the
570                        // resolver itself, not here.
571                        let scps = config
572                            .scp_resolver
573                            .as_ref()
574                            .and_then(|r| r.scps_for(principal));
575                        let decision = evaluator.evaluate_with_resource_policy(
576                            principal,
577                            &iam_action,
578                            &condition_context,
579                            resource_policy_json.as_deref(),
580                            &resource_account_id,
581                            &caller_session_policies,
582                            scps.as_deref(),
583                        );
584                        if !decision.is_allow() {
585                            tracing::warn!(
586                                target: "fakecloud::iam::audit",
587                                service = %detected.service,
588                                action = %iam_action.action_string(),
589                                resource = %iam_action.resource,
590                                principal = %principal.arn,
591                                resource_policy_present = resource_policy_json.is_some(),
592                                decision = ?decision,
593                                mode = %config.iam_mode,
594                                request_id = %request_id,
595                                "IAM policy evaluation denied request"
596                            );
597                            if config.iam_mode.is_strict() {
598                                // Real AWS includes an "Encoded
599                                // authorization failure message" suffix
600                                // on AccessDeniedException — an opaque
601                                // base64+zlib JSON blob that the caller
602                                // can pass to STS
603                                // `DecodeAuthorizationMessage` to
604                                // recover the structured deny reason
605                                // (action, principal, matched
606                                // statements, condition context). We
607                                // produce the same blob inline so
608                                // existing tooling that decodes deny
609                                // reasons works against fakecloud.
610                                let context_summary = serde_json::json!({
611                                    "aws:PrincipalArn": principal.arn,
612                                    "aws:PrincipalAccount": principal.account_id,
613                                    "aws:RequestedRegion": condition_context
614                                        .aws_requested_region
615                                        .clone()
616                                        .unwrap_or_default(),
617                                    "aws:SecureTransport": condition_context
618                                        .aws_secure_transport
619                                        .unwrap_or(false),
620                                    "aws:Action": iam_action.action_string(),
621                                    "aws:Resource": iam_action.resource,
622                                    "decision": format!("{:?}", decision),
623                                });
624                                let action_string = iam_action.action_string();
625                                let encoded = crate::auth_message::encode_deny(
626                                    matches!(decision, crate::auth::IamDecision::ExplicitDeny),
627                                    Some(&action_string),
628                                    Some(&principal.arn),
629                                    Vec::new(),
630                                    Some(context_summary),
631                                );
632                                return build_error_response(
633                                    StatusCode::FORBIDDEN,
634                                    "AccessDeniedException",
635                                    &format!(
636                                        "User: {} is not authorized to perform: {} on resource: {} Encoded authorization failure message: {}",
637                                        principal.arn,
638                                        iam_action.action_string(),
639                                        iam_action.resource,
640                                        encoded,
641                                    ),
642                                    &request_id,
643                                    detected.protocol,
644                                );
645                            }
646                            // Soft mode: audit log already emitted; fall
647                            // through to the handler.
648                        }
649                    } else {
650                        // Service opted in but didn't return an IamAction
651                        // for this specific operation — programming bug,
652                        // surface it loudly in soft/strict mode so it's
653                        // visible during rollout.
654                        tracing::warn!(
655                            target: "fakecloud::iam::audit",
656                            service = %detected.service,
657                            action = %aws_request.action,
658                            "service is iam_enforceable but has no IamAction mapping for this action; skipping evaluation"
659                        );
660                    }
661                }
662            } else if aws_request.access_key_id.is_none() {
663                // Truly anonymous (unsigned) caller — no Authorization header at
664                // all. No identity policies exist, so authorization rests
665                // entirely on the resource policy (a bucket policy granting
666                // `Principal:"*"`) and public-read ACLs — mirroring AWS, which
667                // denies anonymous requests unless the resource is explicitly
668                // made public. Without this an anonymous request that reached an
669                // iam_enforceable service in enforcement mode would bypass
670                // authorization entirely.
671                //
672                // A request that carried an Authorization header but whose
673                // credential did not resolve (principal `None` with
674                // `access_key_id` `Some`) is intentionally left alone here: with
675                // SigV4 verification off, fakecloud does not reject unverified
676                // signed requests, and turning them into anonymous denials would
677                // change long-standing behavior.
678                if let Some(iam_action) = service.iam_action_for(&aws_request) {
679                    let now = chrono::Utc::now();
680                    let mut condition_context = ConditionContext {
681                        aws_source_ip: remote_addr.map(|sa| sa.ip()),
682                        aws_current_time: Some(now),
683                        aws_epoch_time: Some(now.timestamp()),
684                        aws_secure_transport: Some(is_secure_transport(&aws_request.headers)),
685                        aws_requested_region: Some(aws_request.region.clone()),
686                        ..Default::default()
687                    };
688                    condition_context.service_keys =
689                        service.iam_condition_keys_for(&aws_request, &iam_action);
690                    let resource_policy_json = config
691                        .resource_policy_provider
692                        .as_ref()
693                        .and_then(|p| p.resource_policy(&detected.service, &iam_action.resource));
694                    let policy_allows = evaluator
695                        .evaluate_anonymous(
696                            &iam_action,
697                            &condition_context,
698                            resource_policy_json.as_deref(),
699                        )
700                        .is_allow();
701                    let acl_allows = config.resource_policy_provider.as_ref().is_some_and(|p| {
702                        p.public_acl_allows(
703                            &detected.service,
704                            &iam_action.resource,
705                            iam_action.action,
706                        )
707                    });
708                    if !policy_allows && !acl_allows {
709                        tracing::warn!(
710                            target: "fakecloud::iam::audit",
711                            service = %detected.service,
712                            action = %iam_action.action_string(),
713                            resource = %iam_action.resource,
714                            resource_policy_present = resource_policy_json.is_some(),
715                            mode = %config.iam_mode,
716                            request_id = %request_id,
717                            "anonymous request denied: no public bucket policy or ACL grants the action"
718                        );
719                        if config.iam_mode.is_strict() {
720                            return build_error_response(
721                                StatusCode::FORBIDDEN,
722                                "AccessDenied",
723                                "Access Denied",
724                                &request_id,
725                                detected.protocol,
726                            );
727                        }
728                        // Soft mode: audit log emitted; fall through to the handler.
729                    }
730                }
731            }
732        }
733    }
734
735    match service.handle(aws_request).await {
736        Ok(resp) => {
737            let mut builder = Response::builder()
738                .status(resp.status)
739                .header("x-amzn-requestid", &request_id)
740                .header("x-amz-request-id", &request_id);
741
742            if !resp.content_type.is_empty() {
743                builder = builder.header("content-type", &resp.content_type);
744            }
745
746            let has_content_length = resp
747                .headers
748                .iter()
749                .any(|(k, _)| k.as_str().eq_ignore_ascii_case("content-length"));
750
751            for (k, v) in &resp.headers {
752                builder = builder.header(k, v);
753            }
754
755            match resp.body {
756                ResponseBody::Bytes(b) => builder.body(Body::from(b)).unwrap(),
757                ResponseBody::File { file, size } => {
758                    let stream = tokio_util::io::ReaderStream::new(file);
759                    let body = Body::from_stream(stream);
760                    if !has_content_length {
761                        builder = builder.header("content-length", size.to_string());
762                    }
763                    builder.body(body).unwrap()
764                }
765            }
766        }
767        Err(err) => {
768            tracing::warn!(
769                service = %detected.service,
770                action = %detected.action,
771                error = %err,
772                "request failed"
773            );
774            let error_headers = err.response_headers().to_vec();
775            let mut resp = build_error_response_with_fields(
776                err.status(),
777                err.code(),
778                &err.message(),
779                &request_id,
780                detected.protocol,
781                err.extra_fields(),
782            );
783            for (k, v) in &error_headers {
784                if let (Ok(name), Ok(val)) = (
785                    k.parse::<http::header::HeaderName>(),
786                    v.parse::<http::header::HeaderValue>(),
787                ) {
788                    resp.headers_mut().insert(name, val);
789                }
790            }
791            resp
792        }
793    }
794}
795
/// Configuration passed to the dispatch handler.
///
/// Shared with `dispatch` as an `Arc<DispatchConfig>` request extension.
/// Every opt-in security feature is represented by an `Option`/mode field
/// so a bare [`DispatchConfig::new`] leaves all of them disabled.
#[derive(Clone)]
pub struct DispatchConfig {
    /// Region this server instance is configured with.
    pub region: String,
    /// Account ID this server instance is configured with.
    pub account_id: String,
    /// Whether to cryptographically verify SigV4 signatures on incoming
    /// requests. Wired through from `--verify-sigv4` /
    /// `FAKECLOUD_VERIFY_SIGV4`. Off by default.
    pub verify_sigv4: bool,
    /// IAM policy evaluation mode. Wired through from `--iam` /
    /// `FAKECLOUD_IAM`. Defaults to [`IamMode::Off`]. `dispatch` consults
    /// it when an authorization check denies a request: in strict mode the
    /// request is rejected with a 403, otherwise the denial is only
    /// audit-logged and the request falls through to the service handler.
    pub iam_mode: IamMode,
    /// Resolves access key IDs to their secrets and owning principals.
    /// Required when `verify_sigv4` or `iam_mode != Off`. When `None`, both
    /// features gracefully degrade to off-by-default behavior.
    pub credential_resolver: Option<Arc<dyn CredentialResolver>>,
    /// Evaluates IAM identity policies for a resolved principal + action.
    /// Required when `iam_mode != Off`. When `None`, enforcement silently
    /// degrades to off even if `iam_mode` is set.
    pub policy_evaluator: Option<Arc<dyn IamPolicyEvaluator>>,
    /// Resolves resource-based policies (S3 bucket policies in the
    /// initial rollout) to hand to the evaluator alongside the
    /// principal's identity policies. `None` means the server was
    /// started without any resource-policy-owning service registered;
    /// dispatch then behaves as if no resource policy is attached to
    /// any resource, identical to the Phase 1 behavior.
    pub resource_policy_provider: Option<Arc<dyn ResourcePolicyProvider>>,
    /// Resolves the ordered SCP chain that applies to a principal's
    /// account (root-OU first, account-direct last). `None` means no
    /// organizations resolver has been registered — SCPs never gate
    /// any request in that case. Off-by-default matches the Batch 4
    /// contract: zero behavior change until a user calls
    /// `CreateOrganization` and the resolver is wired.
    pub scp_resolver: Option<Arc<dyn crate::auth::ScpResolver>>,
}
833
834impl std::fmt::Debug for DispatchConfig {
835    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836        f.debug_struct("DispatchConfig")
837            .field("region", &self.region)
838            .field("account_id", &self.account_id)
839            .field("verify_sigv4", &self.verify_sigv4)
840            .field("iam_mode", &self.iam_mode)
841            .field(
842                "credential_resolver",
843                &self
844                    .credential_resolver
845                    .as_ref()
846                    .map(|_| "<CredentialResolver>"),
847            )
848            .field(
849                "policy_evaluator",
850                &self
851                    .policy_evaluator
852                    .as_ref()
853                    .map(|_| "<IamPolicyEvaluator>"),
854            )
855            .field(
856                "resource_policy_provider",
857                &self
858                    .resource_policy_provider
859                    .as_ref()
860                    .map(|_| "<ResourcePolicyProvider>"),
861            )
862            .field(
863                "scp_resolver",
864                &self.scp_resolver.as_ref().map(|_| "<ScpResolver>"),
865            )
866            .finish()
867    }
868}
869
870impl DispatchConfig {
871    /// Minimal constructor for tests and call sites that don't care about the
872    /// opt-in security features.
873    pub fn new(region: impl Into<String>, account_id: impl Into<String>) -> Self {
874        Self {
875            region: region.into(),
876            account_id: account_id.into(),
877            verify_sigv4: false,
878            iam_mode: IamMode::Off,
879            credential_resolver: None,
880            policy_evaluator: None,
881            resource_policy_provider: None,
882            scp_resolver: None,
883        }
884    }
885}
886
887/// Extract the 12-digit account ID segment from an AWS ARN.
888///
889/// ARNs follow `arn:<partition>:<service>:<region>:<account>:<resource>`.
890/// Identifies routes that opt into streaming request bodies. Returns
891/// `Some((service, action_hint))` when the dispatch path should hand
892/// the raw body to the service handler unbuffered, otherwise `None`
893/// for the default buffered path. The handler reads the stream via
894/// [`crate::service::AwsRequest::take_body_stream`].
895///
896/// Streaming-eligible routes today:
897///
898/// * `s3` PUT object — `PUT /<bucket>/<key>` with a SigV4 (or
899///   presigned) auth header. Covers PutObject, UploadPart, and
900///   UploadPartCopy. The S3 service spills to disk via
901///   [`fakecloud_persistence::BodySource::File`] when the stream is
902///   present.
903/// * `ecr` OCI Distribution v2 blob upload — `PATCH` and `PUT` on
904///   `/v2/{name}/blobs/uploads/{uuid}`. The ECR service spools the
905///   stream into a per-upload temp file before computing the digest.
906fn streaming_route(
907    method: &http::Method,
908    path: &str,
909    headers: &http::HeaderMap,
910    query_params: &HashMap<String, String>,
911) -> Option<(&'static str, &'static str)> {
912    // ECR OCI v2 blob upload (PATCH chunk + final PUT).
913    if (method == http::Method::PATCH || method == http::Method::PUT)
914        && path.starts_with("/v2/")
915        && path.contains("/blobs/uploads/")
916    {
917        return Some(("ecr", ""));
918    }
919
920    // S3 PutObject / UploadPart / UploadPartCopy. Detect either via
921    // SigV4 service field in the Authorization header OR via a SigV4
922    // presigned URL (X-Amz-Credential .../s3/...) OR a SigV2 presigned
923    // URL (AWSAccessKeyId + Signature + Expires query parameters).
924    if method == http::Method::PUT {
925        let after = path.trim_start_matches('/');
926        // Path-style PutObject is `PUT /<bucket>/<key>` (path contains a
927        // slash); virtual-hosted-style is `PUT /<key>` with the bucket
928        // in the Host header. For virtual-hosted, accept any non-empty
929        // path so the key flows through the streaming dispatch — the
930        // Host parser already routed this request to S3.
931        let virtual_hosted_s3 = protocol::parse_routing_host_from_headers(headers)
932            .filter(|h| h.service == "s3" && h.bucket.is_some())
933            .is_some();
934        if after.is_empty() || (!virtual_hosted_s3 && !after.contains('/')) {
935            return None;
936        }
937        let header_s3 = headers
938            .get("authorization")
939            .and_then(|v| v.to_str().ok())
940            .and_then(fakecloud_aws::sigv4::parse_sigv4)
941            .map(|info| info.service == "s3")
942            .unwrap_or(false);
943        let presigned_v4_s3 = query_params
944            .get("X-Amz-Credential")
945            .and_then(|c| c.split('/').nth(3).map(|s| s.to_string()))
946            .map(|service| service == "s3")
947            .unwrap_or(false);
948        let presigned_v2 = query_params.contains_key("AWSAccessKeyId")
949            && query_params.contains_key("Signature")
950            && query_params.contains_key("Expires");
951        if header_s3 || presigned_v4_s3 || presigned_v2 {
952            return Some(("s3", ""));
953        }
954    }
955
956    None
957}
958
/// Default request-body buffering cap. On the buffered dispatch path
/// fakecloud reads the entire request body into memory before handing
/// it to a service handler, so this ceiling caps RAM usage per
/// in-flight request.
///
/// Default 1 GiB — comfortably above legitimate single S3 PutObject
/// payloads (AWS recommends multipart above ~100 MiB) and each
/// multipart part dispatches through here separately. Override with
/// `FAKECLOUD_MAX_REQUEST_BODY_BYTES` (decimal bytes) when running
/// stress tests that push past the default.
const DEFAULT_MAX_REQUEST_BODY_BYTES: usize = 1024 * 1024 * 1024;

/// Effective request-body cap in bytes.
///
/// Reads `FAKECLOUD_MAX_REQUEST_BODY_BYTES` once and caches the result
/// for the lifetime of the process; later changes to the environment
/// are not observed. The value is trimmed before parsing so an override
/// that picked up stray whitespace or a trailing newline (e.g. from an
/// env file) is honoured rather than silently discarded. Unset,
/// unparsable, or zero values fall back to
/// [`DEFAULT_MAX_REQUEST_BODY_BYTES`].
fn max_request_body_bytes() -> usize {
    static CACHED: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| {
        std::env::var("FAKECLOUD_MAX_REQUEST_BODY_BYTES")
            .ok()
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            // A zero cap would reject every request; treat it as "unset".
            .filter(|&n| n > 0)
            .unwrap_or(DEFAULT_MAX_REQUEST_BODY_BYTES)
    })
}
980
/// Extract the account ID segment from an AWS ARN.
///
/// ARNs follow `arn:<partition>:<service>:<region>:<account>:<resource>`.
/// The resource segment may itself contain colons, so the input is split
/// into at most six pieces.
///
/// For the cross-account decision in IAM enforcement, the "resource
/// account" is the ARN's account segment. Some services (notably S3)
/// produce ARNs with an empty account field — for those we return
/// `None` and let the caller fall back to the server's configured
/// account ID. Malformed or non-ARN strings also return `None`.
fn parse_account_from_arn(arn: &str) -> Option<String> {
    let segments: Vec<&str> = arn.splitn(6, ':').collect();
    match segments.as_slice() {
        // All six segments must be present; the resource value itself is
        // not inspected, only required to exist.
        ["arn", _partition, _service, _region, account, _resource] if !account.is_empty() => {
            Some((*account).to_owned())
        }
        _ => None,
    }
}
1004
1005/// Extract region from User-Agent header suffix `region/<region>`.
1006fn extract_region_from_user_agent(headers: &http::HeaderMap) -> Option<String> {
1007    let ua = headers.get("user-agent")?.to_str().ok()?;
1008    for part in ua.split_whitespace() {
1009        if let Some(region) = part.strip_prefix("region/") {
1010            if !region.is_empty() {
1011                return Some(region.to_string());
1012            }
1013        }
1014    }
1015    None
1016}
1017
1018fn build_error_response(
1019    status: StatusCode,
1020    code: &str,
1021    message: &str,
1022    request_id: &str,
1023    protocol: AwsProtocol,
1024) -> Response<Body> {
1025    build_error_response_with_fields(status, code, message, request_id, protocol, &[])
1026}
1027
1028fn build_error_response_with_fields(
1029    status: StatusCode,
1030    code: &str,
1031    message: &str,
1032    request_id: &str,
1033    protocol: AwsProtocol,
1034    extra_fields: &[(String, String)],
1035) -> Response<Body> {
1036    let (status, content_type, body) = match protocol {
1037        AwsProtocol::Query => {
1038            fakecloud_aws::error::xml_error_response(status, code, message, request_id)
1039        }
1040        AwsProtocol::Rest => fakecloud_aws::error::s3_xml_error_response_with_fields(
1041            status,
1042            code,
1043            message,
1044            request_id,
1045            extra_fields,
1046        ),
1047        AwsProtocol::Json | AwsProtocol::RestJson => {
1048            fakecloud_aws::error::json_error_response(status, code, message)
1049        }
1050    };
1051
1052    // S3 (and other REST-XML services) place the error code in
1053    // `x-amz-error-code` so HEAD responses — which HTTP forbids from
1054    // carrying a body — still surface the code. AWS SDKs read this header
1055    // when the body is empty. Emit it on every error response so HEAD,
1056    // OPTIONS, and any client that strips the body still see the code.
1057    // Backend errors regularly include newlines (multi-line stderr from
1058    // docker/podman/etc.); HTTP header values reject control characters,
1059    // so sanitize before insertion or the builder rejects the response
1060    // and the connection drops.
1061    let safe_code = sanitize_header_value(code);
1062    let safe_message = sanitize_header_value(message);
1063    let mut builder = Response::builder()
1064        .status(status)
1065        .header("content-type", content_type)
1066        .header("x-amzn-requestid", request_id)
1067        .header("x-amz-request-id", request_id);
1068    if let Ok(v) = http::HeaderValue::from_str(&safe_code) {
1069        builder = builder.header("x-amz-error-code", v);
1070    }
1071    if let Ok(v) = http::HeaderValue::from_str(&safe_message) {
1072        builder = builder.header("x-amz-error-message", v);
1073    }
1074    builder.body(Body::from(body)).unwrap_or_else(|_| {
1075        // Builder only fails if a header is invalid; we sanitized the two
1076        // we control, so the remaining ones (content-type, request id) are
1077        // ASCII and safe. This fallback exists purely so we never panic.
1078        Response::new(Body::empty())
1079    })
1080}
1081
/// Strip characters that HTTP header values reject (control bytes, CR/LF/TAB)
/// and truncate to a length that AWS SDKs handle cleanly. Backend tools
/// (docker, podman, kubectl, …) emit multi-line stderr, and forwarding that
/// raw into `x-amz-error-message` previously panicked the dispatcher.
///
/// Each run of control characters collapses to a single space, and
/// leading/trailing whitespace is trimmed from the result. The output is
/// never longer than 1024 bytes: a character that would cross the limit
/// ends the copy, so multi-byte characters cannot push the result past it.
fn sanitize_header_value(s: &str) -> String {
    const MAX_LEN: usize = 1024;
    let mut out = String::with_capacity(s.len().min(MAX_LEN));
    for ch in s.chars() {
        // Header values forbid CR, LF, and other control bytes (RFC 9110).
        // Replace with a single space so multi-line messages stay readable.
        let emitted = if ch.is_control() {
            if out.ends_with(' ') {
                // Already separated; collapse the run of control chars.
                continue;
            }
            ' '
        } else {
            ch
        };
        // Measure what is about to be written, in bytes, so the cap holds
        // for multi-byte characters too.
        if out.len() + emitted.len_utf8() > MAX_LEN {
            break;
        }
        out.push(emitted);
    }
    out.trim().to_string()
}
1105
/// Recover the access key from a SigV2 presigned URL. AWS SigV2 presigning
/// puts the key in the `AWSAccessKeyId` query parameter alongside `Signature`
/// and `Expires`; all three must be present for the URL to be a SigV2 presign.
/// Returns `None` for SigV4 presigns (which use `X-Amz-Credential`) or unsigned
/// requests.
fn sigv2_presigned_access_key(query_params: &HashMap<String, String>) -> Option<String> {
    // `AWSAccessKeyId` on its own is not a credential; it only counts when
    // the other two SigV2 presign parameters accompany it.
    let has_companions = ["Signature", "Expires"]
        .iter()
        .all(|name| query_params.contains_key(*name));
    if !has_companions {
        return None;
    }
    query_params.get("AWSAccessKeyId").cloned()
}
1132
1133fn anonymous_s3_bucket(uri: &http::Uri, config: &DispatchConfig) -> Option<String> {
1134    let provider = config.resource_policy_provider.as_ref()?;
1135    let segment = uri.path().split('/').find(|s| !s.is_empty())?.to_string();
1136    let arn = format!("arn:aws:s3:::{segment}");
1137    provider.resource_owner_account("s3", &arn).map(|_| segment)
1138}
1139
1140fn build_condition_context(
1141    principal: &Principal,
1142    remote_addr: Option<SocketAddr>,
1143    region: &str,
1144    secure_transport: bool,
1145) -> ConditionContext {
1146    let now = chrono::Utc::now();
1147    ConditionContext {
1148        aws_username: aws_username_from_principal(principal),
1149        aws_userid: Some(principal.user_id.clone()),
1150        aws_principal_arn: Some(principal.arn.clone()),
1151        aws_principal_account: Some(principal.account_id.clone()),
1152        aws_principal_type: Some(principal_type_label(principal.principal_type).to_string()),
1153        aws_source_ip: remote_addr.map(|sa| sa.ip()),
1154        aws_current_time: Some(now),
1155        aws_epoch_time: Some(now.timestamp()),
1156        aws_secure_transport: Some(secure_transport),
1157        aws_requested_region: Some(region.to_string()),
1158        // F3 keys: populated from the caller's session context when STS
1159        // mints credentials with MFA / SAML / OIDC / VPC-endpoint hints.
1160        // Default-None here so tests/dispatch sites that don't set them
1161        // safe-fail any policy referencing them — matching AWS for keys
1162        // that aren't asserted.
1163        aws_mfa_present: None,
1164        aws_mfa_age_seconds: None,
1165        aws_called_via: Vec::new(),
1166        aws_source_vpce: None,
1167        aws_source_vpc: None,
1168        aws_vpc_source_ip: None,
1169        aws_federated_provider: None,
1170        aws_token_issue_time: None,
1171        service_keys: Default::default(),
1172        resource_tags: None,
1173        request_tags: None,
1174        principal_tags: None,
1175    }
1176}
1177
1178/// `aws:username` is only set for IAM users, matching AWS. For assumed
1179/// roles, federated users, root, and unknown principals the key is
1180/// absent — operators that reference it without `IfExists` safe-fail.
1181fn aws_username_from_principal(principal: &Principal) -> Option<String> {
1182    if principal.principal_type != PrincipalType::User {
1183        return None;
1184    }
1185    let after = principal.arn.rsplit_once(":user/").map(|(_, s)| s)?;
1186    // Strip any IAM path prefix; bare username is the last segment.
1187    Some(after.rsplit('/').next().unwrap_or(after).to_string())
1188}
1189
1190/// AWS's `aws:PrincipalType` uses PascalCase identifiers, distinct from
1191/// the lowercase ones [`PrincipalType::as_str`] returns for ARNs.
1192fn principal_type_label(t: PrincipalType) -> &'static str {
1193    match t {
1194        PrincipalType::User => "User",
1195        PrincipalType::AssumedRole => "AssumedRole",
1196        PrincipalType::FederatedUser => "FederatedUser",
1197        PrincipalType::Root => "Account",
1198        PrincipalType::Unknown => "Unknown",
1199    }
1200}
1201
1202/// Best-effort detection of TLS-terminated requests. Direct HTTPS
1203/// connections are not yet supported by the fakecloud server (it speaks
1204/// plain HTTP), so the only signal is an `x-forwarded-proto: https`
1205/// header set by an upstream proxy. Anything else evaluates to `false`,
1206/// which matches the typical local-dev setup.
1207fn is_secure_transport(headers: &http::HeaderMap) -> bool {
1208    headers
1209        .get("x-forwarded-proto")
1210        .and_then(|v| v.to_str().ok())
1211        .map(|s| s.eq_ignore_ascii_case("https"))
1212        .unwrap_or(false)
1213}
1214
/// Private extension trait that associates an HTTP error status with a
/// protocol value.
// NOTE(review): no caller is visible in this part of the file — confirm
// the trait is still in use before extending it.
trait ProtocolExt {
    /// HTTP status code to report for an error under this protocol.
    fn error_status(&self) -> StatusCode;
}
1218
impl ProtocolExt for AwsProtocol {
    /// Returns `400 Bad Request` for every protocol; the receiver is not
    /// inspected.
    fn error_status(&self) -> StatusCode {
        StatusCode::BAD_REQUEST
    }
}
1224
1225#[cfg(test)]
1226mod tests {
1227    use super::*;
1228
1229    #[test]
1230    fn default_max_request_body_bytes_is_one_gib() {
1231        // Without the env override, the cap defaults to 1 GiB. The
1232        // public function caches via OnceLock so only the first call
1233        // in the process matters; we assert the constant directly.
1234        assert_eq!(DEFAULT_MAX_REQUEST_BODY_BYTES, 1024 * 1024 * 1024);
1235    }
1236
1237    #[test]
1238    fn sigv2_presigned_access_key_extracted_with_signature_and_expires() {
1239        let mut q = HashMap::new();
1240        q.insert("AWSAccessKeyId".to_string(), "AKIAEXAMPLE".to_string());
1241        q.insert("Signature".to_string(), "abc%2Bdef".to_string());
1242        q.insert("Expires".to_string(), "1700000000".to_string());
1243        assert_eq!(
1244            sigv2_presigned_access_key(&q).as_deref(),
1245            Some("AKIAEXAMPLE")
1246        );
1247    }
1248
1249    #[test]
1250    fn sigv2_presigned_access_key_none_without_signature_or_expires() {
1251        // AWSAccessKeyId alone (e.g. a stray query param) is not a SigV2
1252        // presign and must not be treated as a credential.
1253        let mut q = HashMap::new();
1254        q.insert("AWSAccessKeyId".to_string(), "AKIAEXAMPLE".to_string());
1255        assert_eq!(sigv2_presigned_access_key(&q), None);
1256
1257        q.insert("Expires".to_string(), "1700000000".to_string());
1258        assert_eq!(
1259            sigv2_presigned_access_key(&q),
1260            None,
1261            "missing Signature must not qualify"
1262        );
1263    }
1264
1265    #[test]
1266    fn sigv2_presigned_access_key_none_for_unsigned_request() {
1267        assert_eq!(sigv2_presigned_access_key(&HashMap::new()), None);
1268    }
1269
1270    #[test]
1271    fn dispatch_config_new_defaults_to_off() {
1272        let cfg = DispatchConfig::new("us-east-1", "123456789012");
1273        assert_eq!(cfg.region, "us-east-1");
1274        assert_eq!(cfg.account_id, "123456789012");
1275        assert!(!cfg.verify_sigv4);
1276        assert_eq!(cfg.iam_mode, IamMode::Off);
1277    }
1278
1279    #[test]
1280    fn aws_username_strips_iam_path_for_users() {
1281        let p = Principal {
1282            arn: "arn:aws:iam::123456789012:user/engineering/alice".into(),
1283            user_id: "AIDAALICE".into(),
1284            account_id: "123456789012".into(),
1285            principal_type: PrincipalType::User,
1286            source_identity: None,
1287            tags: None,
1288        };
1289        assert_eq!(aws_username_from_principal(&p), Some("alice".into()));
1290    }
1291
1292    #[test]
1293    fn aws_username_unset_for_assumed_role() {
1294        let p = Principal {
1295            arn: "arn:aws:sts::123456789012:assumed-role/ops/session".into(),
1296            user_id: "AROAOPS:session".into(),
1297            account_id: "123456789012".into(),
1298            principal_type: PrincipalType::AssumedRole,
1299            source_identity: None,
1300            tags: None,
1301        };
1302        assert_eq!(aws_username_from_principal(&p), None);
1303    }
1304
1305    #[test]
1306    fn principal_type_label_matches_aws_casing() {
1307        assert_eq!(principal_type_label(PrincipalType::User), "User");
1308        assert_eq!(
1309            principal_type_label(PrincipalType::AssumedRole),
1310            "AssumedRole"
1311        );
1312        assert_eq!(principal_type_label(PrincipalType::Root), "Account");
1313    }
1314
1315    #[test]
1316    fn build_condition_context_populates_global_keys() {
1317        let p = Principal {
1318            arn: "arn:aws:iam::123456789012:user/alice".into(),
1319            user_id: "AIDAALICE".into(),
1320            account_id: "123456789012".into(),
1321            principal_type: PrincipalType::User,
1322            source_identity: None,
1323            tags: None,
1324        };
1325        let addr: SocketAddr = "10.0.0.1:54321".parse().unwrap();
1326        let ctx = build_condition_context(&p, Some(addr), "us-east-1", false);
1327        assert_eq!(ctx.aws_username.as_deref(), Some("alice"));
1328        assert_eq!(ctx.aws_userid.as_deref(), Some("AIDAALICE"));
1329        assert_eq!(
1330            ctx.aws_principal_arn.as_deref(),
1331            Some("arn:aws:iam::123456789012:user/alice")
1332        );
1333        assert_eq!(ctx.aws_principal_account.as_deref(), Some("123456789012"));
1334        assert_eq!(ctx.aws_principal_type.as_deref(), Some("User"));
1335        assert_eq!(
1336            ctx.aws_source_ip.map(|i| i.to_string()).as_deref(),
1337            Some("10.0.0.1")
1338        );
1339        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-east-1"));
1340        assert_eq!(ctx.aws_secure_transport, Some(false));
1341        assert!(ctx.aws_current_time.is_some());
1342        assert!(ctx.aws_epoch_time.is_some());
1343    }
1344
1345    #[test]
1346    fn is_secure_transport_reads_x_forwarded_proto() {
1347        let mut headers = http::HeaderMap::new();
1348        headers.insert("x-forwarded-proto", "https".parse().unwrap());
1349        assert!(is_secure_transport(&headers));
1350        headers.insert("x-forwarded-proto", "http".parse().unwrap());
1351        assert!(!is_secure_transport(&headers));
1352        let empty = http::HeaderMap::new();
1353        assert!(!is_secure_transport(&empty));
1354    }
1355
1356    #[test]
1357    fn parse_account_from_arn_extracts_standard_shapes() {
1358        assert_eq!(
1359            parse_account_from_arn("arn:aws:sqs:us-east-1:123456789012:queue"),
1360            Some("123456789012".to_string())
1361        );
1362        assert_eq!(
1363            parse_account_from_arn("arn:aws:iam::123456789012:user/alice"),
1364            Some("123456789012".to_string())
1365        );
1366    }
1367
1368    #[test]
1369    fn parse_account_from_arn_returns_none_for_s3_empty_account() {
1370        // S3 ARNs have both region and account empty.
1371        assert_eq!(parse_account_from_arn("arn:aws:s3:::my-bucket"), None);
1372        assert_eq!(
1373            parse_account_from_arn("arn:aws:s3:::my-bucket/path/to/key"),
1374            None
1375        );
1376    }
1377
1378    #[test]
1379    fn parse_account_from_arn_returns_none_for_malformed() {
1380        assert_eq!(parse_account_from_arn(""), None);
1381        assert_eq!(parse_account_from_arn("not-an-arn"), None);
1382        assert_eq!(parse_account_from_arn("arn:aws:sqs:us-east-1"), None);
1383        assert_eq!(parse_account_from_arn("arn:aws:sqs"), None);
1384    }
1385
1386    #[test]
1387    fn extract_region_from_user_agent_finds_region_segment() {
1388        let mut headers = http::HeaderMap::new();
1389        headers.insert(
1390            "user-agent",
1391            "aws-sdk-rust/1.0 os/linux region/eu-central-1"
1392                .parse()
1393                .unwrap(),
1394        );
1395        assert_eq!(
1396            extract_region_from_user_agent(&headers),
1397            Some("eu-central-1".to_string())
1398        );
1399    }
1400
1401    #[test]
1402    fn extract_region_from_user_agent_none_without_header() {
1403        let headers = http::HeaderMap::new();
1404        assert_eq!(extract_region_from_user_agent(&headers), None);
1405    }
1406
1407    #[test]
1408    fn extract_region_from_user_agent_ignores_empty_region() {
1409        let mut headers = http::HeaderMap::new();
1410        headers.insert("user-agent", "aws-sdk-java region/".parse().unwrap());
1411        assert_eq!(extract_region_from_user_agent(&headers), None);
1412    }
1413
1414    #[test]
1415    fn extract_region_from_user_agent_none_when_no_region_marker() {
1416        let mut headers = http::HeaderMap::new();
1417        headers.insert("user-agent", "curl/7.79.1".parse().unwrap());
1418        assert_eq!(extract_region_from_user_agent(&headers), None);
1419    }
1420
1421    #[test]
1422    fn aws_username_none_for_root() {
1423        let p = Principal {
1424            arn: "arn:aws:iam::123456789012:root".into(),
1425            user_id: "123456789012".into(),
1426            account_id: "123456789012".into(),
1427            principal_type: PrincipalType::Root,
1428            source_identity: None,
1429            tags: None,
1430        };
1431        assert_eq!(aws_username_from_principal(&p), None);
1432    }
1433
1434    #[test]
1435    fn aws_username_bare_no_path() {
1436        let p = Principal {
1437            arn: "arn:aws:iam::123456789012:user/bob".into(),
1438            user_id: "AIDABOB".into(),
1439            account_id: "123456789012".into(),
1440            principal_type: PrincipalType::User,
1441            source_identity: None,
1442            tags: None,
1443        };
1444        assert_eq!(aws_username_from_principal(&p), Some("bob".into()));
1445    }
1446
1447    #[test]
1448    fn principal_type_label_covers_federated_and_unknown() {
1449        assert_eq!(
1450            principal_type_label(PrincipalType::FederatedUser),
1451            "FederatedUser"
1452        );
1453        assert_eq!(principal_type_label(PrincipalType::Unknown), "Unknown");
1454    }
1455
1456    #[test]
1457    fn build_condition_context_marks_secure_when_flag_set() {
1458        let p = Principal {
1459            arn: "arn:aws:iam::123456789012:user/alice".into(),
1460            user_id: "AIDAALICE".into(),
1461            account_id: "123456789012".into(),
1462            principal_type: PrincipalType::User,
1463            source_identity: None,
1464            tags: None,
1465        };
1466        let ctx = build_condition_context(&p, None, "us-west-2", true);
1467        assert_eq!(ctx.aws_secure_transport, Some(true));
1468        assert!(ctx.aws_source_ip.is_none());
1469        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-west-2"));
1470    }
1471
1472    #[test]
1473    fn is_secure_transport_case_insensitive() {
1474        let mut headers = http::HeaderMap::new();
1475        headers.insert("x-forwarded-proto", "HTTPS".parse().unwrap());
1476        assert!(is_secure_transport(&headers));
1477    }
1478
1479    #[test]
1480    fn is_secure_transport_non_ascii_bytes_false() {
1481        let mut headers = http::HeaderMap::new();
1482        headers.insert(
1483            "x-forwarded-proto",
1484            http::HeaderValue::from_bytes(&[0xFF, 0xFE]).unwrap(),
1485        );
1486        assert!(!is_secure_transport(&headers));
1487    }
1488
1489    #[test]
1490    fn protocol_ext_error_status_is_bad_request() {
1491        assert_eq!(AwsProtocol::Query.error_status(), StatusCode::BAD_REQUEST);
1492        assert_eq!(AwsProtocol::Json.error_status(), StatusCode::BAD_REQUEST);
1493        assert_eq!(AwsProtocol::Rest.error_status(), StatusCode::BAD_REQUEST);
1494        assert_eq!(
1495            AwsProtocol::RestJson.error_status(),
1496            StatusCode::BAD_REQUEST
1497        );
1498    }
1499
1500    #[test]
1501    fn build_error_response_json_has_json_content_type() {
1502        let resp = build_error_response(
1503            StatusCode::BAD_REQUEST,
1504            "TestCode",
1505            "test msg",
1506            "req-1",
1507            AwsProtocol::Json,
1508        );
1509        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1510        let ct = resp
1511            .headers()
1512            .get("content-type")
1513            .unwrap()
1514            .to_str()
1515            .unwrap();
1516        assert!(ct.contains("json"));
1517        let rid = resp
1518            .headers()
1519            .get("x-amzn-requestid")
1520            .unwrap()
1521            .to_str()
1522            .unwrap();
1523        assert_eq!(rid, "req-1");
1524    }
1525
1526    #[test]
1527    fn build_error_response_rest_returns_xml_content_type() {
1528        let resp = build_error_response(
1529            StatusCode::NOT_FOUND,
1530            "NoSuchBucket",
1531            "bucket missing",
1532            "req-2",
1533            AwsProtocol::Rest,
1534        );
1535        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1536        let ct = resp
1537            .headers()
1538            .get("content-type")
1539            .unwrap()
1540            .to_str()
1541            .unwrap();
1542        assert!(ct.contains("xml"));
1543    }
1544
1545    #[test]
1546    fn build_error_response_query_returns_xml() {
1547        let resp = build_error_response(
1548            StatusCode::BAD_REQUEST,
1549            "InvalidParameter",
1550            "bad param",
1551            "req-3",
1552            AwsProtocol::Query,
1553        );
1554        let ct = resp
1555            .headers()
1556            .get("content-type")
1557            .unwrap()
1558            .to_str()
1559            .unwrap();
1560        assert!(ct.contains("xml"));
1561    }
1562
1563    /// Regression for issue #1539: multi-line backend errors (e.g. podman
1564    /// stderr) used to panic the dispatcher when stuffed into the
1565    /// `x-amz-error-message` HTTP header. The response must build cleanly
1566    /// and the header value must not contain control characters.
1567    #[test]
1568    fn build_error_response_with_multiline_message_does_not_panic() {
1569        let resp = build_error_response(
1570            StatusCode::INTERNAL_SERVER_ERROR,
1571            "ServiceException",
1572            "Lambda execution failed: container failed to start: docker start failed: \
1573             Error: unable to start container \"abc\": \
1574             failed to create new hosts file:\nhost-gateway is empty\n",
1575            "req-multi",
1576            AwsProtocol::Json,
1577        );
1578        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
1579        let msg = resp
1580            .headers()
1581            .get("x-amz-error-message")
1582            .expect("x-amz-error-message must be set even when input contains newlines")
1583            .to_str()
1584            .unwrap();
1585        assert!(!msg.contains('\n'));
1586        assert!(!msg.contains('\r'));
1587        assert!(msg.contains("Lambda execution failed"));
1588        assert!(msg.contains("host-gateway is empty"));
1589    }
1590
1591    #[test]
1592    fn build_error_response_with_control_chars_strips_them() {
1593        let resp = build_error_response(
1594            StatusCode::BAD_REQUEST,
1595            "Code\twith\ttabs",
1596            "msg\x00with\x01nulls",
1597            "req-ctrl",
1598            AwsProtocol::Json,
1599        );
1600        let code = resp
1601            .headers()
1602            .get("x-amz-error-code")
1603            .unwrap()
1604            .to_str()
1605            .unwrap();
1606        let msg = resp
1607            .headers()
1608            .get("x-amz-error-message")
1609            .unwrap()
1610            .to_str()
1611            .unwrap();
1612        assert!(!code.contains('\t'));
1613        assert!(!msg.contains('\x00'));
1614        assert!(!msg.contains('\x01'));
1615    }
1616
1617    #[test]
1618    fn sanitize_header_value_truncates_long_input() {
1619        let huge = "x".repeat(5_000);
1620        let out = sanitize_header_value(&huge);
1621        assert!(out.len() <= 1024);
1622    }
1623
1624    #[test]
1625    fn sanitize_header_value_collapses_consecutive_control_runs() {
1626        let out = sanitize_header_value("a\n\n\n\rb");
1627        assert_eq!(out, "a b");
1628    }
1629
1630    #[test]
1631    fn dispatch_config_carries_opt_in_flags() {
1632        let cfg = DispatchConfig {
1633            region: "eu-west-1".to_string(),
1634            account_id: "000000000000".to_string(),
1635            verify_sigv4: true,
1636            iam_mode: IamMode::Strict,
1637            credential_resolver: None,
1638            policy_evaluator: None,
1639            resource_policy_provider: None,
1640            scp_resolver: None,
1641        };
1642        assert!(cfg.verify_sigv4);
1643        assert!(cfg.iam_mode.is_strict());
1644        assert!(cfg.resource_policy_provider.is_none());
1645        assert!(cfg.scp_resolver.is_none());
1646    }
1647
1648    fn s3_sigv4_headers() -> http::HeaderMap {
1649        let mut headers = http::HeaderMap::new();
1650        headers.insert(
1651            "authorization",
1652            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/s3/aws4_request, \
1653             SignedHeaders=host, Signature=fake"
1654                .parse()
1655                .unwrap(),
1656        );
1657        headers
1658    }
1659
1660    #[test]
1661    fn streaming_route_path_style_s3_put_object() {
1662        let headers = s3_sigv4_headers();
1663        assert_eq!(
1664            streaming_route(
1665                &http::Method::PUT,
1666                "/my-bucket/key.txt",
1667                &headers,
1668                &HashMap::new(),
1669            ),
1670            Some(("s3", "")),
1671        );
1672    }
1673
1674    #[test]
1675    fn streaming_route_path_style_create_bucket_skipped() {
1676        // `PUT /bucket` (no trailing key) is CreateBucket — must NOT
1677        // hit the streaming path.
1678        let headers = s3_sigv4_headers();
1679        assert_eq!(
1680            streaming_route(&http::Method::PUT, "/my-bucket", &headers, &HashMap::new(),),
1681            None,
1682        );
1683    }
1684
1685    #[test]
1686    fn streaming_route_virtual_hosted_s3_put_object() {
1687        let mut headers = s3_sigv4_headers();
1688        headers.insert(
1689            "host",
1690            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1691                .parse()
1692                .unwrap(),
1693        );
1694        // Virtual-hosted PUT has no bucket in the URL path (`/<key>`),
1695        // so the slash check used for path-style would reject it. The
1696        // Host parser confirms this is virtual-hosted S3 and the key
1697        // flows through the streaming dispatch.
1698        assert_eq!(
1699            streaming_route(&http::Method::PUT, "/hello.txt", &headers, &HashMap::new(),),
1700            Some(("s3", "")),
1701        );
1702    }
1703
1704    #[test]
1705    fn streaming_route_virtual_hosted_s3_root_skipped() {
1706        // `PUT /` against a virtual-hosted Host = CreateBucket, which
1707        // is handled buffered. Empty path-after-slash must short-circuit.
1708        let mut headers = s3_sigv4_headers();
1709        headers.insert(
1710            "host",
1711            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1712                .parse()
1713                .unwrap(),
1714        );
1715        assert_eq!(
1716            streaming_route(&http::Method::PUT, "/", &headers, &HashMap::new()),
1717            None,
1718        );
1719    }
1720
1721    #[test]
1722    fn streaming_route_ecr_blob_upload() {
1723        let headers = http::HeaderMap::new();
1724        assert_eq!(
1725            streaming_route(
1726                &http::Method::PATCH,
1727                "/v2/my-repo/blobs/uploads/abcd1234",
1728                &headers,
1729                &HashMap::new(),
1730            ),
1731            Some(("ecr", "")),
1732        );
1733        assert_eq!(
1734            streaming_route(
1735                &http::Method::PUT,
1736                "/v2/my-repo/blobs/uploads/abcd1234",
1737                &headers,
1738                &HashMap::new(),
1739            ),
1740            Some(("ecr", "")),
1741        );
1742    }
1743
1744    #[test]
1745    fn streaming_route_presigned_v4_s3_put() {
1746        let headers = http::HeaderMap::new();
1747        let mut query_params = HashMap::new();
1748        query_params.insert(
1749            "X-Amz-Credential".to_string(),
1750            "test/20240101/us-east-1/s3/aws4_request".to_string(),
1751        );
1752        assert_eq!(
1753            streaming_route(
1754                &http::Method::PUT,
1755                "/my-bucket/key.txt",
1756                &headers,
1757                &query_params,
1758            ),
1759            Some(("s3", "")),
1760        );
1761    }
1762
1763    #[test]
1764    fn streaming_route_non_s3_auth_header_skipped() {
1765        // Same path shape but the SigV4 service is lambda — must not
1766        // wire the streaming dispatch (Lambda has its own buffered path).
1767        let mut headers = http::HeaderMap::new();
1768        headers.insert(
1769            "authorization",
1770            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/lambda/aws4_request, \
1771             SignedHeaders=host, Signature=fake"
1772                .parse()
1773                .unwrap(),
1774        );
1775        assert_eq!(
1776            streaming_route(
1777                &http::Method::PUT,
1778                "/my-bucket/key.txt",
1779                &headers,
1780                &HashMap::new(),
1781            ),
1782            None,
1783        );
1784    }
1785
1786    #[test]
1787    fn streaming_route_get_skipped() {
1788        let headers = s3_sigv4_headers();
1789        assert_eq!(
1790            streaming_route(
1791                &http::Method::GET,
1792                "/my-bucket/key.txt",
1793                &headers,
1794                &HashMap::new(),
1795            ),
1796            None,
1797        );
1798    }
1799}