Skip to main content

fakecloud_core/
dispatch.rs

1use axum::body::Body;
2use axum::extract::{ConnectInfo, Extension, Query};
3use axum::http::{Request, StatusCode};
4use axum::response::Response;
5use bytes::Bytes;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use crate::auth::{
11    is_root_bypass, ConditionContext, CredentialResolver, IamMode, IamPolicyEvaluator, Principal,
12    PrincipalType, ResourcePolicyProvider,
13};
14use crate::protocol::{self, AwsProtocol};
15use crate::registry::ServiceRegistry;
16use crate::service::{AwsRequest, ResponseBody};
17
18/// The main dispatch handler. All HTTP requests come through here.
19pub async fn dispatch(
20    ConnectInfo(remote_addr): ConnectInfo<SocketAddr>,
21    Extension(registry): Extension<Arc<ServiceRegistry>>,
22    Extension(config): Extension<Arc<DispatchConfig>>,
23    Query(query_params): Query<HashMap<String, String>>,
24    request: Request<Body>,
25) -> Response<Body> {
26    let remote_addr = Some(remote_addr);
27    let request_id = uuid::Uuid::new_v4().to_string();
28
29    let (parts, body) = request.into_parts();
30
31    // Streaming opt-in: if the route is a known large-body S3 / ECR
32    // upload, we skip the buffered `to_bytes` step entirely and hand
33    // the raw body to the service handler. The handler spills it to
34    // disk on the fly. Header-only detection covers every streaming
35    // candidate (none of them rely on form-body sniffing).
36    let stream_route = streaming_route(
37        &parts.method,
38        parts.uri.path(),
39        &parts.headers,
40        &query_params,
41    );
42    let header_only = protocol::detect_service_headers_only(&parts.headers, &query_params);
43    let stream_dispatch = match (&stream_route, &header_only) {
44        // Header-only detection agrees with the URL match — covers S3
45        // PUT object (SigV4 service=s3 in Authorization).
46        (Some(sr), Some(detected)) if sr.0 == detected.service => Some(detected.clone()),
47        // ECR OCI v2 blob upload has no AWS auth header; the path
48        // alone (`/v2/.../blobs/uploads/...`) tells us the route is
49        // ECR. Synthesize a DetectedRequest so dispatch picks the
50        // streaming path. Same special-case the buffered branch
51        // applies on detect_service None (see below).
52        (Some((service, _)), None) if *service == "ecr" => Some(protocol::DetectedRequest {
53            service: "ecr".to_string(),
54            action: String::new(),
55            protocol: AwsProtocol::Rest,
56        }),
57        _ => None,
58    };
59
60    let (body_bytes, body_stream) = if stream_dispatch.is_some() {
61        (Bytes::new(), Some(body))
62    } else {
63        // Buffered path: materialize the body into memory under the
64        // configured cap. `FAKECLOUD_MAX_REQUEST_BODY_BYTES` (default
65        // 1 GiB) caps non-streaming requests; streaming routes have no
66        // cap because nothing materializes the entire body in RAM.
67        let max_body_bytes = max_request_body_bytes();
68        match axum::body::to_bytes(body, max_body_bytes).await {
69            Ok(b) => (b, None),
70            Err(_) => {
71                return build_error_response(
72                    StatusCode::PAYLOAD_TOO_LARGE,
73                    "RequestEntityTooLarge",
74                    "Request body too large",
75                    &request_id,
76                    AwsProtocol::Query,
77                );
78            }
79        }
80    };
81
82    // Detect service and action
83    let detected = if let Some(d) = stream_dispatch {
84        d
85    } else {
86        match protocol::detect_service(&parts.headers, &query_params, &body_bytes) {
87            Some(d) => d,
88            None => {
89                // OPTIONS requests (CORS preflight) don't carry Authorization headers.
90                // Route them to S3 since S3 is the only REST service that handles CORS.
91                // Note: API Gateway CORS preflight is not fully supported in this emulator
92                // because we can't distinguish between S3 and API Gateway OPTIONS requests
93                // without additional context (in real AWS, they have different domains).
94                if parts.method == http::Method::OPTIONS {
95                    protocol::DetectedRequest {
96                        service: "s3".to_string(),
97                        action: String::new(),
98                        protocol: AwsProtocol::Rest,
99                    }
100                } else if parts.uri.path() == "/v2" || parts.uri.path().starts_with("/v2/") {
101                    // OCI Distribution v2 protocol. Docker CLI / OCI clients
102                    // use Basic auth (not SigV4) and GET /v2/ with no body,
103                    // so this must be matched before the apigateway fallback.
104                    protocol::DetectedRequest {
105                        service: "ecr".to_string(),
106                        action: String::new(),
107                        protocol: AwsProtocol::Rest,
108                    }
109                } else if !parts.uri.path().starts_with("/_") {
110                    // Requests without AWS auth that don't match any service might be
111                    // API Gateway execute API calls (plain HTTP without signatures).
112                    // Route them to apigateway service which will validate if a matching
113                    // API/stage exists. Skip special FakeCloud endpoints (/_*).
114                    protocol::DetectedRequest {
115                        service: "apigateway".to_string(),
116                        action: String::new(),
117                        protocol: AwsProtocol::RestJson,
118                    }
119                } else {
120                    return build_error_response(
121                        StatusCode::BAD_REQUEST,
122                        "MissingAction",
123                        "Could not determine target service or action from request",
124                        &request_id,
125                        AwsProtocol::Query,
126                    );
127                }
128            }
129        }
130    };
131
132    // Bedrock-agent and bedrock-runtime both send `bedrock` in the SigV4
133    // credential scope, but bedrock-agent has its own service handler.
134    // Disambiguate based on the request path.
135    let detected = if detected.service == "bedrock" {
136        let first_seg = parts.uri.path().split('/').nth(1);
137        if matches!(
138            first_seg,
139            Some(
140                "agents"
141                    | "knowledgebases"
142                    | "flows"
143                    | "prompts"
144                    | "tags"
145                    | "retrieveAndGenerate"
146                    | "retrieveAndGenerateStream"
147                    | "optimize-prompt"
148                    | "sessions"
149                    | "invocations"
150                    | "generate-query"
151                    | "rerank"
152            )
153        ) {
154            // Further disambiguate runtime vs control plane for agents/flows paths
155            let segs: Vec<&str> = parts.uri.path().split('/').collect();
156            let is_runtime = matches!(
157                segs.as_slice(),
158                ["", "agents", _, "agentAliases", _, ..]  // InvokeAgent
159                    | ["", "flows", _, "aliases", _]   // InvokeFlow
160                    | ["", "knowledgebases", _, "retrieve"] // Retrieve
161                    | ["", "retrieveAndGenerate"]
162                    | ["", "retrieveAndGenerateStream"]
163                    | ["", "optimize-prompt"]
164                    | ["", "sessions", ..]
165                    | ["", "invocations", ..]
166                    | ["", "generate-query"]
167                    | ["", "rerank"]
168            );
169            if is_runtime {
170                protocol::DetectedRequest {
171                    service: "bedrock-agent-runtime".to_string(),
172                    ..detected
173                }
174            } else {
175                protocol::DetectedRequest {
176                    service: "bedrock-agent".to_string(),
177                    ..detected
178                }
179            }
180        } else {
181            detected
182        }
183    } else {
184        detected
185    };
186
187    // Look up service
188    let service = match registry.get(&detected.service) {
189        Some(s) => s,
190        None => {
191            return build_error_response(
192                detected.protocol.error_status(),
193                "UnknownService",
194                &format!("Service '{}' is not available", detected.service),
195                &request_id,
196                detected.protocol,
197            );
198        }
199    };
200
201    // Extract region and access key from auth header (or presigned query).
202    let auth_header = parts
203        .headers
204        .get("authorization")
205        .and_then(|v| v.to_str().ok())
206        .unwrap_or("");
207    let header_info = fakecloud_aws::sigv4::parse_sigv4(auth_header);
208    let presigned_info = if header_info.is_none() {
209        // Presigned URL: credentials live in the query string.
210        fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params).map(|p| p.as_info())
211    } else {
212        None
213    };
214    let sigv4_info = header_info.or(presigned_info);
215    let access_key_id = sigv4_info.as_ref().map(|info| info.access_key.clone());
216
217    // Host-header routing hint: LocalStack-shaped
218    // `<svc>.<region>.localhost.localstack.cloud[:port]`, real-AWS
219    // `<svc>.<region>.amazonaws.com`, and every S3 virtual-hosted variant
220    // of both. Secondary region source and carries the bucket for
221    // virtual-hosted S3 path rewrite.
222    let host_info = protocol::parse_routing_host_from_headers(&parts.headers);
223
224    let region = sigv4_info
225        .map(|info| info.region)
226        .or_else(|| host_info.as_ref().map(|h| h.region.clone()))
227        .or_else(|| extract_region_from_user_agent(&parts.headers))
228        .unwrap_or_else(|| config.region.clone());
229
230    // Resolve the caller's principal up front so both SigV4 verification
231    // (which needs the secret) and the service handler (which needs the
232    // identity for GetCallerIdentity and IAM enforcement) share a single
233    // lookup. The root-bypass AKID skips resolution entirely — `test`
234    // credentials have no backing identity and must always pass.
235    let caller_akid = access_key_id.as_deref().unwrap_or("");
236    let resolved = if !caller_akid.is_empty() && !is_root_bypass(caller_akid) {
237        config
238            .credential_resolver
239            .as_ref()
240            .and_then(|r| r.resolve(caller_akid))
241    } else {
242        None
243    };
244    let caller_principal = resolved.as_ref().map(|r| r.principal.clone());
245    let caller_session_policies = resolved
246        .as_ref()
247        .map(|r| r.session_policies.clone())
248        .unwrap_or_default();
249
250    // Opt-in SigV4 cryptographic verification. Runs before the service
251    // handler so a failing signature never reaches business logic. The
252    // reserved `test*` root identity short-circuits verification to keep
253    // local-dev workflows frictionless.
254    if config.verify_sigv4 && !is_root_bypass(caller_akid) && config.credential_resolver.is_some() {
255        let amz_date = parts
256            .headers
257            .get("x-amz-date")
258            .and_then(|v| v.to_str().ok());
259        let parsed = fakecloud_aws::sigv4::parse_sigv4_header(auth_header, amz_date)
260            .or_else(|| fakecloud_aws::sigv4::parse_sigv4_presigned(&query_params));
261        let parsed = match parsed {
262            Some(p) => p,
263            None => {
264                return build_error_response(
265                    StatusCode::FORBIDDEN,
266                    "IncompleteSignature",
267                    "Request is missing or has a malformed AWS Signature",
268                    &request_id,
269                    detected.protocol,
270                );
271            }
272        };
273        let resolved_for_verify = match resolved.as_ref() {
274            Some(r) => r,
275            None => {
276                return build_error_response(
277                    StatusCode::FORBIDDEN,
278                    "InvalidClientTokenId",
279                    "The security token included in the request is invalid",
280                    &request_id,
281                    detected.protocol,
282                );
283            }
284        };
285        let headers_vec = fakecloud_aws::sigv4::headers_from_http(&parts.headers);
286        let raw_query_for_verify = parts.uri.query().unwrap_or("").to_string();
287        let verify_req = fakecloud_aws::sigv4::VerifyRequest {
288            method: parts.method.as_str(),
289            path: parts.uri.path(),
290            query: &raw_query_for_verify,
291            headers: &headers_vec,
292            body: &body_bytes,
293        };
294        match fakecloud_aws::sigv4::verify(
295            &parsed,
296            &verify_req,
297            &resolved_for_verify.secret_access_key,
298            chrono::Utc::now(),
299        ) {
300            Ok(()) => {}
301            Err(fakecloud_aws::sigv4::SigV4Error::RequestTimeTooSkewed { .. }) => {
302                return build_error_response(
303                    StatusCode::FORBIDDEN,
304                    "RequestTimeTooSkewed",
305                    "The difference between the request time and the current time is too large",
306                    &request_id,
307                    detected.protocol,
308                );
309            }
310            Err(fakecloud_aws::sigv4::SigV4Error::InvalidDate(msg)) => {
311                return build_error_response(
312                    StatusCode::FORBIDDEN,
313                    "IncompleteSignature",
314                    &format!("Invalid x-amz-date: {msg}"),
315                    &request_id,
316                    detected.protocol,
317                );
318            }
319            Err(fakecloud_aws::sigv4::SigV4Error::Malformed(msg)) => {
320                return build_error_response(
321                    StatusCode::FORBIDDEN,
322                    "IncompleteSignature",
323                    &format!("Malformed SigV4 signature: {msg}"),
324                    &request_id,
325                    detected.protocol,
326                );
327            }
328            Err(fakecloud_aws::sigv4::SigV4Error::SignatureMismatch) => {
329                return build_error_response(
330                    StatusCode::FORBIDDEN,
331                    "SignatureDoesNotMatch",
332                    "The request signature we calculated does not match the signature you provided",
333                    &request_id,
334                    detected.protocol,
335                );
336            }
337        }
338    }
339
340    // Build path segments. For S3 virtual-hosted-style requests the bucket
341    // lives in the Host header, not the path — prepend it so the S3 handler
342    // sees a uniform path-style request. SigV4 verification above already
343    // ran against the wire path, so this rewrite is signature-safe.
344    let wire_path = parts.uri.path();
345    let path = if detected.service == "s3" {
346        if let Some(bucket) = host_info.as_ref().and_then(|h| h.bucket.as_deref()) {
347            let prefix_with_slash = format!("/{bucket}/");
348            let is_bucket_root = wire_path.trim_end_matches('/') == format!("/{bucket}");
349            if wire_path.starts_with(&prefix_with_slash) || is_bucket_root {
350                wire_path.to_string()
351            } else if wire_path == "/" || wire_path.is_empty() {
352                format!("/{bucket}")
353            } else {
354                format!("/{bucket}{wire_path}")
355            }
356        } else {
357            wire_path.to_string()
358        }
359    } else {
360        wire_path.to_string()
361    };
362    let raw_query = parts.uri.query().unwrap_or("").to_string();
363    let path_segments: Vec<String> = path
364        .split('/')
365        .filter(|s| !s.is_empty())
366        .map(|s| s.to_string())
367        .collect();
368
369    // For JSON protocol, validate that non-empty bodies are valid JSON
370    if detected.protocol == AwsProtocol::Json
371        && !body_bytes.is_empty()
372        && serde_json::from_slice::<serde_json::Value>(&body_bytes).is_err()
373    {
374        return build_error_response(
375            StatusCode::BAD_REQUEST,
376            "SerializationException",
377            "Start of structure or map found where not expected",
378            &request_id,
379            AwsProtocol::Json,
380        );
381    }
382
383    // Merge query params with form body params for Query protocol
384    let mut all_params = query_params;
385    if detected.protocol == AwsProtocol::Query {
386        let body_params = protocol::parse_query_body(&body_bytes);
387        for (k, v) in body_params {
388            all_params.entry(k).or_insert(v);
389        }
390    }
391
392    let aws_request = AwsRequest {
393        service: detected.service.clone(),
394        action: detected.action.clone(),
395        region,
396        account_id: caller_principal
397            .as_ref()
398            .map(|p| p.account_id.clone())
399            .unwrap_or_else(|| config.account_id.clone()),
400        request_id: request_id.clone(),
401        headers: parts.headers,
402        query_params: all_params,
403        body: body_bytes,
404        body_stream: parking_lot::Mutex::new(body_stream),
405        path_segments,
406        raw_path: path,
407        raw_query,
408        method: parts.method,
409        is_query_protocol: detected.protocol == AwsProtocol::Query,
410        access_key_id,
411        principal: caller_principal,
412    };
413
414    tracing::info!(
415        service = %aws_request.service,
416        action = %aws_request.action,
417        request_id = %aws_request.request_id,
418        "handling request"
419    );
420
421    // Opt-in IAM identity-policy enforcement. Runs before the service
422    // handler so a deny never reaches business logic. Root principals
423    // (both `test*` bypass AKIDs and the account's IAM root) are exempt,
424    // matching AWS behavior. Services that haven't opted in via
425    // `iam_enforceable()` are transparently skipped — the startup log
426    // lists which services are under enforcement so users always know.
427    if config.iam_mode.is_enabled()
428        && service.iam_enforceable()
429        && !is_root_bypass(aws_request.access_key_id.as_deref().unwrap_or(""))
430    {
431        if let Some(evaluator) = config.policy_evaluator.as_ref() {
432            if let Some(principal) = aws_request.principal.as_ref() {
433                if !principal.is_root() {
434                    if let Some(iam_action) = service.iam_action_for(&aws_request) {
435                        let mut condition_context = build_condition_context(
436                            principal,
437                            remote_addr,
438                            &aws_request.region,
439                            is_secure_transport(&aws_request.headers),
440                        );
441                        // F3 keys riding on the resolved credential. STS
442                        // populates these at mint time so subsequent
443                        // requests under the credential can be evaluated
444                        // against `aws:MultiFactorAuthPresent`,
445                        // `aws:MultiFactorAuthAge`, `aws:TokenIssueTime`,
446                        // and `aws:FederatedProvider`. IAM user access
447                        // keys carry none of these, matching AWS.
448                        if let Some(rc) = resolved.as_ref() {
449                            condition_context.aws_mfa_present = Some(rc.mfa_present);
450                            condition_context.aws_token_issue_time = rc.token_issued_at;
451                            condition_context.aws_federated_provider =
452                                rc.federated_provider.clone();
453                            // `aws:MultiFactorAuthAge` is "seconds since
454                            // MFA was asserted" — computed at evaluation
455                            // time from the token issue moment so the
456                            // value increases monotonically as the session
457                            // ages. Only set when the session was actually
458                            // minted with MFA; otherwise the key is
459                            // absent, matching AWS.
460                            if rc.mfa_present {
461                                if let Some(issued) = rc.token_issued_at {
462                                    let age = chrono::Utc::now()
463                                        .signed_duration_since(issued)
464                                        .num_seconds()
465                                        .max(0);
466                                    condition_context.aws_mfa_age_seconds = Some(age);
467                                }
468                            }
469                        }
470                        condition_context.service_keys =
471                            service.iam_condition_keys_for(&aws_request, &iam_action);
472
473                        // ABAC: populate tag-based condition keys.
474                        // aws:ResourceTag/*
475                        match service.resource_tags_for(&iam_action.resource) {
476                            Some(tags) => condition_context.resource_tags = Some(tags),
477                            None => tracing::debug!(
478                                target: "fakecloud::iam::audit",
479                                service = %detected.service,
480                                resource = %iam_action.resource,
481                                "service does not expose resource tags for ABAC; skipping aws:ResourceTag/* evaluation"
482                            ),
483                        }
484                        // aws:RequestTag/* + aws:TagKeys
485                        match service.request_tags_from(&aws_request, iam_action.action) {
486                            Some(tags) => condition_context.request_tags = Some(tags),
487                            None => tracing::debug!(
488                                target: "fakecloud::iam::audit",
489                                service = %detected.service,
490                                action = %iam_action.action_string(),
491                                "service does not expose request tags for ABAC; skipping aws:RequestTag/* / aws:TagKeys evaluation"
492                            ),
493                        }
494                        // aws:PrincipalTag/*
495                        condition_context.principal_tags = principal.tags.clone();
496
497                        // Phase 2: fetch the resource-based policy (if
498                        // any) attached to the target resource and
499                        // pass it to the evaluator alongside the
500                        // principal's identity policies. The resource's
501                        // owning account is parsed from the ARN (#381
502                        // multi-account alignment); S3 ARNs have an
503                        // empty account field, so we fall back to the
504                        // server's configured account ID in that case.
505                        let resource_policy_json =
506                            config.resource_policy_provider.as_ref().and_then(|p| {
507                                p.resource_policy(&detected.service, &iam_action.resource)
508                            });
509                        // Derive the resource-owning account from the
510                        // resource ARN. Wildcard (`*`) means the action
511                        // isn't scoped to a specific resource (e.g.
512                        // ListQueues, GetCallerIdentity) — treat it as
513                        // same-account by using the caller's account.
514                        let resource_account_id = parse_account_from_arn(&iam_action.resource)
515                            .unwrap_or_else(|| principal.account_id.clone());
516                        // SCP ceiling: resolve the inherited SCP chain
517                        // for this principal (management accounts and
518                        // service-linked roles come back as `None`, in
519                        // which case the evaluator treats the layer as
520                        // absent). Audit breadcrumbs emitted by the
521                        // resolver itself, not here.
522                        let scps = config
523                            .scp_resolver
524                            .as_ref()
525                            .and_then(|r| r.scps_for(principal));
526                        let decision = evaluator.evaluate_with_resource_policy(
527                            principal,
528                            &iam_action,
529                            &condition_context,
530                            resource_policy_json.as_deref(),
531                            &resource_account_id,
532                            &caller_session_policies,
533                            scps.as_deref(),
534                        );
535                        if !decision.is_allow() {
536                            tracing::warn!(
537                                target: "fakecloud::iam::audit",
538                                service = %detected.service,
539                                action = %iam_action.action_string(),
540                                resource = %iam_action.resource,
541                                principal = %principal.arn,
542                                resource_policy_present = resource_policy_json.is_some(),
543                                decision = ?decision,
544                                mode = %config.iam_mode,
545                                request_id = %request_id,
546                                "IAM policy evaluation denied request"
547                            );
548                            if config.iam_mode.is_strict() {
549                                // Real AWS includes an "Encoded
550                                // authorization failure message" suffix
551                                // on AccessDeniedException — an opaque
552                                // base64+zlib JSON blob that the caller
553                                // can pass to STS
554                                // `DecodeAuthorizationMessage` to
555                                // recover the structured deny reason
556                                // (action, principal, matched
557                                // statements, condition context). We
558                                // produce the same blob inline so
559                                // existing tooling that decodes deny
560                                // reasons works against fakecloud.
561                                let context_summary = serde_json::json!({
562                                    "aws:PrincipalArn": principal.arn,
563                                    "aws:PrincipalAccount": principal.account_id,
564                                    "aws:RequestedRegion": condition_context
565                                        .aws_requested_region
566                                        .clone()
567                                        .unwrap_or_default(),
568                                    "aws:SecureTransport": condition_context
569                                        .aws_secure_transport
570                                        .unwrap_or(false),
571                                    "aws:Action": iam_action.action_string(),
572                                    "aws:Resource": iam_action.resource,
573                                    "decision": format!("{:?}", decision),
574                                });
575                                let action_string = iam_action.action_string();
576                                let encoded = crate::auth_message::encode_deny(
577                                    matches!(decision, crate::auth::IamDecision::ExplicitDeny),
578                                    Some(&action_string),
579                                    Some(&principal.arn),
580                                    Vec::new(),
581                                    Some(context_summary),
582                                );
583                                return build_error_response(
584                                    StatusCode::FORBIDDEN,
585                                    "AccessDeniedException",
586                                    &format!(
587                                        "User: {} is not authorized to perform: {} on resource: {} Encoded authorization failure message: {}",
588                                        principal.arn,
589                                        iam_action.action_string(),
590                                        iam_action.resource,
591                                        encoded,
592                                    ),
593                                    &request_id,
594                                    detected.protocol,
595                                );
596                            }
597                            // Soft mode: audit log already emitted; fall
598                            // through to the handler.
599                        }
600                    } else {
601                        // Service opted in but didn't return an IamAction
602                        // for this specific operation — programming bug,
603                        // surface it loudly in soft/strict mode so it's
604                        // visible during rollout.
605                        tracing::warn!(
606                            target: "fakecloud::iam::audit",
607                            service = %detected.service,
608                            action = %aws_request.action,
609                            "service is iam_enforceable but has no IamAction mapping for this action; skipping evaluation"
610                        );
611                    }
612                }
613            }
614        }
615    }
616
617    match service.handle(aws_request).await {
618        Ok(resp) => {
619            let mut builder = Response::builder()
620                .status(resp.status)
621                .header("x-amzn-requestid", &request_id)
622                .header("x-amz-request-id", &request_id);
623
624            if !resp.content_type.is_empty() {
625                builder = builder.header("content-type", &resp.content_type);
626            }
627
628            let has_content_length = resp
629                .headers
630                .iter()
631                .any(|(k, _)| k.as_str().eq_ignore_ascii_case("content-length"));
632
633            for (k, v) in &resp.headers {
634                builder = builder.header(k, v);
635            }
636
637            match resp.body {
638                ResponseBody::Bytes(b) => builder.body(Body::from(b)).unwrap(),
639                ResponseBody::File { file, size } => {
640                    let stream = tokio_util::io::ReaderStream::new(file);
641                    let body = Body::from_stream(stream);
642                    if !has_content_length {
643                        builder = builder.header("content-length", size.to_string());
644                    }
645                    builder.body(body).unwrap()
646                }
647            }
648        }
649        Err(err) => {
650            tracing::warn!(
651                service = %detected.service,
652                action = %detected.action,
653                error = %err,
654                "request failed"
655            );
656            let error_headers = err.response_headers().to_vec();
657            let mut resp = build_error_response_with_fields(
658                err.status(),
659                err.code(),
660                &err.message(),
661                &request_id,
662                detected.protocol,
663                err.extra_fields(),
664            );
665            for (k, v) in &error_headers {
666                if let (Ok(name), Ok(val)) = (
667                    k.parse::<http::header::HeaderName>(),
668                    v.parse::<http::header::HeaderValue>(),
669                ) {
670                    resp.headers_mut().insert(name, val);
671                }
672            }
673            resp
674        }
675    }
676}
677
678/// Configuration passed to the dispatch handler.
679#[derive(Clone)]
680pub struct DispatchConfig {
681    pub region: String,
682    pub account_id: String,
683    /// Whether to cryptographically verify SigV4 signatures on incoming
684    /// requests. Wired through from `--verify-sigv4` /
685    /// `FAKECLOUD_VERIFY_SIGV4`. Off by default.
686    pub verify_sigv4: bool,
687    /// IAM policy evaluation mode. Wired through from `--iam` /
688    /// `FAKECLOUD_IAM`. Defaults to [`IamMode::Off`]. Actual evaluation is
689    /// added in a later batch; today this field is plumbed but never
690    /// consulted.
691    pub iam_mode: IamMode,
692    /// Resolves access key IDs to their secrets and owning principals.
693    /// Required when `verify_sigv4` or `iam_mode != Off`. When `None`, both
694    /// features gracefully degrade to off-by-default behavior.
695    pub credential_resolver: Option<Arc<dyn CredentialResolver>>,
696    /// Evaluates IAM identity policies for a resolved principal + action.
697    /// Required when `iam_mode != Off`. When `None`, enforcement silently
698    /// degrades to off even if `iam_mode` is set.
699    pub policy_evaluator: Option<Arc<dyn IamPolicyEvaluator>>,
700    /// Resolves resource-based policies (S3 bucket policies in the
701    /// initial rollout) to hand to the evaluator alongside the
702    /// principal's identity policies. `None` means the server was
703    /// started without any resource-policy-owning service registered;
704    /// dispatch then behaves as if no resource policy is attached to
705    /// any resource, identical to the Phase 1 behavior.
706    pub resource_policy_provider: Option<Arc<dyn ResourcePolicyProvider>>,
707    /// Resolves the ordered SCP chain that applies to a principal's
708    /// account (root-OU first, account-direct last). `None` means no
709    /// organizations resolver has been registered — SCPs never gate
710    /// any request in that case. Off-by-default matches the Batch 4
711    /// contract: zero behavior change until a user calls
712    /// `CreateOrganization` and the resolver is wired.
713    pub scp_resolver: Option<Arc<dyn crate::auth::ScpResolver>>,
714}
715
716impl std::fmt::Debug for DispatchConfig {
717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718        f.debug_struct("DispatchConfig")
719            .field("region", &self.region)
720            .field("account_id", &self.account_id)
721            .field("verify_sigv4", &self.verify_sigv4)
722            .field("iam_mode", &self.iam_mode)
723            .field(
724                "credential_resolver",
725                &self
726                    .credential_resolver
727                    .as_ref()
728                    .map(|_| "<CredentialResolver>"),
729            )
730            .field(
731                "policy_evaluator",
732                &self
733                    .policy_evaluator
734                    .as_ref()
735                    .map(|_| "<IamPolicyEvaluator>"),
736            )
737            .field(
738                "resource_policy_provider",
739                &self
740                    .resource_policy_provider
741                    .as_ref()
742                    .map(|_| "<ResourcePolicyProvider>"),
743            )
744            .field(
745                "scp_resolver",
746                &self.scp_resolver.as_ref().map(|_| "<ScpResolver>"),
747            )
748            .finish()
749    }
750}
751
752impl DispatchConfig {
753    /// Minimal constructor for tests and call sites that don't care about the
754    /// opt-in security features.
755    pub fn new(region: impl Into<String>, account_id: impl Into<String>) -> Self {
756        Self {
757            region: region.into(),
758            account_id: account_id.into(),
759            verify_sigv4: false,
760            iam_mode: IamMode::Off,
761            credential_resolver: None,
762            policy_evaluator: None,
763            resource_policy_provider: None,
764            scp_resolver: None,
765        }
766    }
767}
768
769/// Extract the 12-digit account ID segment from an AWS ARN.
770///
771/// ARNs follow `arn:<partition>:<service>:<region>:<account>:<resource>`.
772/// Identifies routes that opt into streaming request bodies. Returns
773/// `Some((service, action_hint))` when the dispatch path should hand
774/// the raw body to the service handler unbuffered, otherwise `None`
775/// for the default buffered path. The handler reads the stream via
776/// [`crate::service::AwsRequest::take_body_stream`].
777///
778/// Streaming-eligible routes today:
779///
780/// * `s3` PUT object — `PUT /<bucket>/<key>` with a SigV4 (or
781///   presigned) auth header. Covers PutObject, UploadPart, and
782///   UploadPartCopy. The S3 service spills to disk via
783///   [`fakecloud_persistence::BodySource::File`] when the stream is
784///   present.
785/// * `ecr` OCI Distribution v2 blob upload — `PATCH` and `PUT` on
786///   `/v2/{name}/blobs/uploads/{uuid}`. The ECR service spools the
787///   stream into a per-upload temp file before computing the digest.
788fn streaming_route(
789    method: &http::Method,
790    path: &str,
791    headers: &http::HeaderMap,
792    query_params: &HashMap<String, String>,
793) -> Option<(&'static str, &'static str)> {
794    // ECR OCI v2 blob upload (PATCH chunk + final PUT).
795    if (method == http::Method::PATCH || method == http::Method::PUT)
796        && path.starts_with("/v2/")
797        && path.contains("/blobs/uploads/")
798    {
799        return Some(("ecr", ""));
800    }
801
802    // S3 PutObject / UploadPart / UploadPartCopy. Detect either via
803    // SigV4 service field in the Authorization header OR via a SigV4
804    // presigned URL (X-Amz-Credential .../s3/...) OR a SigV2 presigned
805    // URL (AWSAccessKeyId + Signature + Expires query parameters).
806    if method == http::Method::PUT {
807        let after = path.trim_start_matches('/');
808        // Path-style PutObject is `PUT /<bucket>/<key>` (path contains a
809        // slash); virtual-hosted-style is `PUT /<key>` with the bucket
810        // in the Host header. For virtual-hosted, accept any non-empty
811        // path so the key flows through the streaming dispatch — the
812        // Host parser already routed this request to S3.
813        let virtual_hosted_s3 = protocol::parse_routing_host_from_headers(headers)
814            .filter(|h| h.service == "s3" && h.bucket.is_some())
815            .is_some();
816        if after.is_empty() || (!virtual_hosted_s3 && !after.contains('/')) {
817            return None;
818        }
819        let header_s3 = headers
820            .get("authorization")
821            .and_then(|v| v.to_str().ok())
822            .and_then(fakecloud_aws::sigv4::parse_sigv4)
823            .map(|info| info.service == "s3")
824            .unwrap_or(false);
825        let presigned_v4_s3 = query_params
826            .get("X-Amz-Credential")
827            .and_then(|c| c.split('/').nth(3).map(|s| s.to_string()))
828            .map(|service| service == "s3")
829            .unwrap_or(false);
830        let presigned_v2 = query_params.contains_key("AWSAccessKeyId")
831            && query_params.contains_key("Signature")
832            && query_params.contains_key("Expires");
833        if header_s3 || presigned_v4_s3 || presigned_v2 {
834            return Some(("s3", ""));
835        }
836    }
837
838    None
839}
840
841/// Default request-body buffering cap. fakecloud reads the entire
842/// request body into memory before handing it to a service handler,
843/// so this ceiling caps RAM usage per in-flight request.
844///
845/// Default 1 GiB — comfortably above legitimate single S3 PutObject
846/// payloads (AWS recommends multipart above ~100 MiB) and each
847/// multipart part dispatches through here separately. Override with
848/// `FAKECLOUD_MAX_REQUEST_BODY_BYTES` (decimal bytes) when running
849/// stress tests that push past the default.
850const DEFAULT_MAX_REQUEST_BODY_BYTES: usize = 1024 * 1024 * 1024;
851
852fn max_request_body_bytes() -> usize {
853    static CACHED: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
854    *CACHED.get_or_init(|| {
855        std::env::var("FAKECLOUD_MAX_REQUEST_BODY_BYTES")
856            .ok()
857            .and_then(|s| s.parse::<usize>().ok())
858            .filter(|&n| n > 0)
859            .unwrap_or(DEFAULT_MAX_REQUEST_BODY_BYTES)
860    })
861}
862
863/// For the cross-account decision in IAM enforcement, the "resource
864/// account" is the ARN's account segment. Some services (notably S3)
865/// produce ARNs with an empty account field — for those we return
866/// `None` and let the caller fall back to the server's configured
867/// account ID. Malformed or non-ARN strings also return `None`.
868fn parse_account_from_arn(arn: &str) -> Option<String> {
869    let mut parts = arn.splitn(6, ':');
870    if parts.next()? != "arn" {
871        return None;
872    }
873    let _partition = parts.next()?;
874    let _service = parts.next()?;
875    let _region = parts.next()?;
876    let account = parts.next()?;
877    // Resource segment must exist (parts.next().is_some()) for the ARN
878    // to be well-formed, but we don't consume its value here.
879    parts.next()?;
880    if account.is_empty() {
881        None
882    } else {
883        Some(account.to_string())
884    }
885}
886
887/// Extract region from User-Agent header suffix `region/<region>`.
888fn extract_region_from_user_agent(headers: &http::HeaderMap) -> Option<String> {
889    let ua = headers.get("user-agent")?.to_str().ok()?;
890    for part in ua.split_whitespace() {
891        if let Some(region) = part.strip_prefix("region/") {
892            if !region.is_empty() {
893                return Some(region.to_string());
894            }
895        }
896    }
897    None
898}
899
900fn build_error_response(
901    status: StatusCode,
902    code: &str,
903    message: &str,
904    request_id: &str,
905    protocol: AwsProtocol,
906) -> Response<Body> {
907    build_error_response_with_fields(status, code, message, request_id, protocol, &[])
908}
909
910fn build_error_response_with_fields(
911    status: StatusCode,
912    code: &str,
913    message: &str,
914    request_id: &str,
915    protocol: AwsProtocol,
916    extra_fields: &[(String, String)],
917) -> Response<Body> {
918    let (status, content_type, body) = match protocol {
919        AwsProtocol::Query => {
920            fakecloud_aws::error::xml_error_response(status, code, message, request_id)
921        }
922        AwsProtocol::Rest => fakecloud_aws::error::s3_xml_error_response_with_fields(
923            status,
924            code,
925            message,
926            request_id,
927            extra_fields,
928        ),
929        AwsProtocol::Json | AwsProtocol::RestJson => {
930            fakecloud_aws::error::json_error_response(status, code, message)
931        }
932    };
933
934    // S3 (and other REST-XML services) place the error code in
935    // `x-amz-error-code` so HEAD responses — which HTTP forbids from
936    // carrying a body — still surface the code. AWS SDKs read this header
937    // when the body is empty. Emit it on every error response so HEAD,
938    // OPTIONS, and any client that strips the body still see the code.
939    Response::builder()
940        .status(status)
941        .header("content-type", content_type)
942        .header("x-amzn-requestid", request_id)
943        .header("x-amz-request-id", request_id)
944        .header("x-amz-error-code", code)
945        .header("x-amz-error-message", message)
946        .body(Body::from(body))
947        .unwrap()
948}
949
950/// Build the [`ConditionContext`] passed to the IAM evaluator for one
951/// request. Populates the 10 global condition keys from the resolved
952/// principal + the HTTP request. Service-specific keys are deferred to
953/// a follow-up batch and left empty.
954fn build_condition_context(
955    principal: &Principal,
956    remote_addr: Option<SocketAddr>,
957    region: &str,
958    secure_transport: bool,
959) -> ConditionContext {
960    let now = chrono::Utc::now();
961    ConditionContext {
962        aws_username: aws_username_from_principal(principal),
963        aws_userid: Some(principal.user_id.clone()),
964        aws_principal_arn: Some(principal.arn.clone()),
965        aws_principal_account: Some(principal.account_id.clone()),
966        aws_principal_type: Some(principal_type_label(principal.principal_type).to_string()),
967        aws_source_ip: remote_addr.map(|sa| sa.ip()),
968        aws_current_time: Some(now),
969        aws_epoch_time: Some(now.timestamp()),
970        aws_secure_transport: Some(secure_transport),
971        aws_requested_region: Some(region.to_string()),
972        // F3 keys: populated from the caller's session context when STS
973        // mints credentials with MFA / SAML / OIDC / VPC-endpoint hints.
974        // Default-None here so tests/dispatch sites that don't set them
975        // safe-fail any policy referencing them — matching AWS for keys
976        // that aren't asserted.
977        aws_mfa_present: None,
978        aws_mfa_age_seconds: None,
979        aws_called_via: Vec::new(),
980        aws_source_vpce: None,
981        aws_source_vpc: None,
982        aws_vpc_source_ip: None,
983        aws_federated_provider: None,
984        aws_token_issue_time: None,
985        service_keys: Default::default(),
986        resource_tags: None,
987        request_tags: None,
988        principal_tags: None,
989    }
990}
991
992/// `aws:username` is only set for IAM users, matching AWS. For assumed
993/// roles, federated users, root, and unknown principals the key is
994/// absent — operators that reference it without `IfExists` safe-fail.
995fn aws_username_from_principal(principal: &Principal) -> Option<String> {
996    if principal.principal_type != PrincipalType::User {
997        return None;
998    }
999    let after = principal.arn.rsplit_once(":user/").map(|(_, s)| s)?;
1000    // Strip any IAM path prefix; bare username is the last segment.
1001    Some(after.rsplit('/').next().unwrap_or(after).to_string())
1002}
1003
1004/// AWS's `aws:PrincipalType` uses PascalCase identifiers, distinct from
1005/// the lowercase ones [`PrincipalType::as_str`] returns for ARNs.
1006fn principal_type_label(t: PrincipalType) -> &'static str {
1007    match t {
1008        PrincipalType::User => "User",
1009        PrincipalType::AssumedRole => "AssumedRole",
1010        PrincipalType::FederatedUser => "FederatedUser",
1011        PrincipalType::Root => "Account",
1012        PrincipalType::Unknown => "Unknown",
1013    }
1014}
1015
1016/// Best-effort detection of TLS-terminated requests. Direct HTTPS
1017/// connections are not yet supported by the fakecloud server (it speaks
1018/// plain HTTP), so the only signal is an `x-forwarded-proto: https`
1019/// header set by an upstream proxy. Anything else evaluates to `false`,
1020/// which matches the typical local-dev setup.
1021fn is_secure_transport(headers: &http::HeaderMap) -> bool {
1022    headers
1023        .get("x-forwarded-proto")
1024        .and_then(|v| v.to_str().ok())
1025        .map(|s| s.eq_ignore_ascii_case("https"))
1026        .unwrap_or(false)
1027}
1028
1029trait ProtocolExt {
1030    fn error_status(&self) -> StatusCode;
1031}
1032
1033impl ProtocolExt for AwsProtocol {
1034    fn error_status(&self) -> StatusCode {
1035        StatusCode::BAD_REQUEST
1036    }
1037}
1038
1039#[cfg(test)]
1040mod tests {
1041    use super::*;
1042
1043    #[test]
1044    fn default_max_request_body_bytes_is_one_gib() {
1045        // Without the env override, the cap defaults to 1 GiB. The
1046        // public function caches via OnceLock so only the first call
1047        // in the process matters; we assert the constant directly.
1048        assert_eq!(DEFAULT_MAX_REQUEST_BODY_BYTES, 1024 * 1024 * 1024);
1049    }
1050
1051    #[test]
1052    fn dispatch_config_new_defaults_to_off() {
1053        let cfg = DispatchConfig::new("us-east-1", "123456789012");
1054        assert_eq!(cfg.region, "us-east-1");
1055        assert_eq!(cfg.account_id, "123456789012");
1056        assert!(!cfg.verify_sigv4);
1057        assert_eq!(cfg.iam_mode, IamMode::Off);
1058    }
1059
1060    #[test]
1061    fn aws_username_strips_iam_path_for_users() {
1062        let p = Principal {
1063            arn: "arn:aws:iam::123456789012:user/engineering/alice".into(),
1064            user_id: "AIDAALICE".into(),
1065            account_id: "123456789012".into(),
1066            principal_type: PrincipalType::User,
1067            source_identity: None,
1068            tags: None,
1069        };
1070        assert_eq!(aws_username_from_principal(&p), Some("alice".into()));
1071    }
1072
1073    #[test]
1074    fn aws_username_unset_for_assumed_role() {
1075        let p = Principal {
1076            arn: "arn:aws:sts::123456789012:assumed-role/ops/session".into(),
1077            user_id: "AROAOPS:session".into(),
1078            account_id: "123456789012".into(),
1079            principal_type: PrincipalType::AssumedRole,
1080            source_identity: None,
1081            tags: None,
1082        };
1083        assert_eq!(aws_username_from_principal(&p), None);
1084    }
1085
1086    #[test]
1087    fn principal_type_label_matches_aws_casing() {
1088        assert_eq!(principal_type_label(PrincipalType::User), "User");
1089        assert_eq!(
1090            principal_type_label(PrincipalType::AssumedRole),
1091            "AssumedRole"
1092        );
1093        assert_eq!(principal_type_label(PrincipalType::Root), "Account");
1094    }
1095
1096    #[test]
1097    fn build_condition_context_populates_global_keys() {
1098        let p = Principal {
1099            arn: "arn:aws:iam::123456789012:user/alice".into(),
1100            user_id: "AIDAALICE".into(),
1101            account_id: "123456789012".into(),
1102            principal_type: PrincipalType::User,
1103            source_identity: None,
1104            tags: None,
1105        };
1106        let addr: SocketAddr = "10.0.0.1:54321".parse().unwrap();
1107        let ctx = build_condition_context(&p, Some(addr), "us-east-1", false);
1108        assert_eq!(ctx.aws_username.as_deref(), Some("alice"));
1109        assert_eq!(ctx.aws_userid.as_deref(), Some("AIDAALICE"));
1110        assert_eq!(
1111            ctx.aws_principal_arn.as_deref(),
1112            Some("arn:aws:iam::123456789012:user/alice")
1113        );
1114        assert_eq!(ctx.aws_principal_account.as_deref(), Some("123456789012"));
1115        assert_eq!(ctx.aws_principal_type.as_deref(), Some("User"));
1116        assert_eq!(
1117            ctx.aws_source_ip.map(|i| i.to_string()).as_deref(),
1118            Some("10.0.0.1")
1119        );
1120        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-east-1"));
1121        assert_eq!(ctx.aws_secure_transport, Some(false));
1122        assert!(ctx.aws_current_time.is_some());
1123        assert!(ctx.aws_epoch_time.is_some());
1124    }
1125
1126    #[test]
1127    fn is_secure_transport_reads_x_forwarded_proto() {
1128        let mut headers = http::HeaderMap::new();
1129        headers.insert("x-forwarded-proto", "https".parse().unwrap());
1130        assert!(is_secure_transport(&headers));
1131        headers.insert("x-forwarded-proto", "http".parse().unwrap());
1132        assert!(!is_secure_transport(&headers));
1133        let empty = http::HeaderMap::new();
1134        assert!(!is_secure_transport(&empty));
1135    }
1136
1137    #[test]
1138    fn parse_account_from_arn_extracts_standard_shapes() {
1139        assert_eq!(
1140            parse_account_from_arn("arn:aws:sqs:us-east-1:123456789012:queue"),
1141            Some("123456789012".to_string())
1142        );
1143        assert_eq!(
1144            parse_account_from_arn("arn:aws:iam::123456789012:user/alice"),
1145            Some("123456789012".to_string())
1146        );
1147    }
1148
1149    #[test]
1150    fn parse_account_from_arn_returns_none_for_s3_empty_account() {
1151        // S3 ARNs have both region and account empty.
1152        assert_eq!(parse_account_from_arn("arn:aws:s3:::my-bucket"), None);
1153        assert_eq!(
1154            parse_account_from_arn("arn:aws:s3:::my-bucket/path/to/key"),
1155            None
1156        );
1157    }
1158
1159    #[test]
1160    fn parse_account_from_arn_returns_none_for_malformed() {
1161        assert_eq!(parse_account_from_arn(""), None);
1162        assert_eq!(parse_account_from_arn("not-an-arn"), None);
1163        assert_eq!(parse_account_from_arn("arn:aws:sqs:us-east-1"), None);
1164        assert_eq!(parse_account_from_arn("arn:aws:sqs"), None);
1165    }
1166
1167    #[test]
1168    fn extract_region_from_user_agent_finds_region_segment() {
1169        let mut headers = http::HeaderMap::new();
1170        headers.insert(
1171            "user-agent",
1172            "aws-sdk-rust/1.0 os/linux region/eu-central-1"
1173                .parse()
1174                .unwrap(),
1175        );
1176        assert_eq!(
1177            extract_region_from_user_agent(&headers),
1178            Some("eu-central-1".to_string())
1179        );
1180    }
1181
1182    #[test]
1183    fn extract_region_from_user_agent_none_without_header() {
1184        let headers = http::HeaderMap::new();
1185        assert_eq!(extract_region_from_user_agent(&headers), None);
1186    }
1187
1188    #[test]
1189    fn extract_region_from_user_agent_ignores_empty_region() {
1190        let mut headers = http::HeaderMap::new();
1191        headers.insert("user-agent", "aws-sdk-java region/".parse().unwrap());
1192        assert_eq!(extract_region_from_user_agent(&headers), None);
1193    }
1194
1195    #[test]
1196    fn extract_region_from_user_agent_none_when_no_region_marker() {
1197        let mut headers = http::HeaderMap::new();
1198        headers.insert("user-agent", "curl/7.79.1".parse().unwrap());
1199        assert_eq!(extract_region_from_user_agent(&headers), None);
1200    }
1201
1202    #[test]
1203    fn aws_username_none_for_root() {
1204        let p = Principal {
1205            arn: "arn:aws:iam::123456789012:root".into(),
1206            user_id: "123456789012".into(),
1207            account_id: "123456789012".into(),
1208            principal_type: PrincipalType::Root,
1209            source_identity: None,
1210            tags: None,
1211        };
1212        assert_eq!(aws_username_from_principal(&p), None);
1213    }
1214
1215    #[test]
1216    fn aws_username_bare_no_path() {
1217        let p = Principal {
1218            arn: "arn:aws:iam::123456789012:user/bob".into(),
1219            user_id: "AIDABOB".into(),
1220            account_id: "123456789012".into(),
1221            principal_type: PrincipalType::User,
1222            source_identity: None,
1223            tags: None,
1224        };
1225        assert_eq!(aws_username_from_principal(&p), Some("bob".into()));
1226    }
1227
1228    #[test]
1229    fn principal_type_label_covers_federated_and_unknown() {
1230        assert_eq!(
1231            principal_type_label(PrincipalType::FederatedUser),
1232            "FederatedUser"
1233        );
1234        assert_eq!(principal_type_label(PrincipalType::Unknown), "Unknown");
1235    }
1236
1237    #[test]
1238    fn build_condition_context_marks_secure_when_flag_set() {
1239        let p = Principal {
1240            arn: "arn:aws:iam::123456789012:user/alice".into(),
1241            user_id: "AIDAALICE".into(),
1242            account_id: "123456789012".into(),
1243            principal_type: PrincipalType::User,
1244            source_identity: None,
1245            tags: None,
1246        };
1247        let ctx = build_condition_context(&p, None, "us-west-2", true);
1248        assert_eq!(ctx.aws_secure_transport, Some(true));
1249        assert!(ctx.aws_source_ip.is_none());
1250        assert_eq!(ctx.aws_requested_region.as_deref(), Some("us-west-2"));
1251    }
1252
1253    #[test]
1254    fn is_secure_transport_case_insensitive() {
1255        let mut headers = http::HeaderMap::new();
1256        headers.insert("x-forwarded-proto", "HTTPS".parse().unwrap());
1257        assert!(is_secure_transport(&headers));
1258    }
1259
1260    #[test]
1261    fn is_secure_transport_non_ascii_bytes_false() {
1262        let mut headers = http::HeaderMap::new();
1263        headers.insert(
1264            "x-forwarded-proto",
1265            http::HeaderValue::from_bytes(&[0xFF, 0xFE]).unwrap(),
1266        );
1267        assert!(!is_secure_transport(&headers));
1268    }
1269
1270    #[test]
1271    fn protocol_ext_error_status_is_bad_request() {
1272        assert_eq!(AwsProtocol::Query.error_status(), StatusCode::BAD_REQUEST);
1273        assert_eq!(AwsProtocol::Json.error_status(), StatusCode::BAD_REQUEST);
1274        assert_eq!(AwsProtocol::Rest.error_status(), StatusCode::BAD_REQUEST);
1275        assert_eq!(
1276            AwsProtocol::RestJson.error_status(),
1277            StatusCode::BAD_REQUEST
1278        );
1279    }
1280
1281    #[test]
1282    fn build_error_response_json_has_json_content_type() {
1283        let resp = build_error_response(
1284            StatusCode::BAD_REQUEST,
1285            "TestCode",
1286            "test msg",
1287            "req-1",
1288            AwsProtocol::Json,
1289        );
1290        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1291        let ct = resp
1292            .headers()
1293            .get("content-type")
1294            .unwrap()
1295            .to_str()
1296            .unwrap();
1297        assert!(ct.contains("json"));
1298        let rid = resp
1299            .headers()
1300            .get("x-amzn-requestid")
1301            .unwrap()
1302            .to_str()
1303            .unwrap();
1304        assert_eq!(rid, "req-1");
1305    }
1306
1307    #[test]
1308    fn build_error_response_rest_returns_xml_content_type() {
1309        let resp = build_error_response(
1310            StatusCode::NOT_FOUND,
1311            "NoSuchBucket",
1312            "bucket missing",
1313            "req-2",
1314            AwsProtocol::Rest,
1315        );
1316        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1317        let ct = resp
1318            .headers()
1319            .get("content-type")
1320            .unwrap()
1321            .to_str()
1322            .unwrap();
1323        assert!(ct.contains("xml"));
1324    }
1325
1326    #[test]
1327    fn build_error_response_query_returns_xml() {
1328        let resp = build_error_response(
1329            StatusCode::BAD_REQUEST,
1330            "InvalidParameter",
1331            "bad param",
1332            "req-3",
1333            AwsProtocol::Query,
1334        );
1335        let ct = resp
1336            .headers()
1337            .get("content-type")
1338            .unwrap()
1339            .to_str()
1340            .unwrap();
1341        assert!(ct.contains("xml"));
1342    }
1343
1344    #[test]
1345    fn dispatch_config_carries_opt_in_flags() {
1346        let cfg = DispatchConfig {
1347            region: "eu-west-1".to_string(),
1348            account_id: "000000000000".to_string(),
1349            verify_sigv4: true,
1350            iam_mode: IamMode::Strict,
1351            credential_resolver: None,
1352            policy_evaluator: None,
1353            resource_policy_provider: None,
1354            scp_resolver: None,
1355        };
1356        assert!(cfg.verify_sigv4);
1357        assert!(cfg.iam_mode.is_strict());
1358        assert!(cfg.resource_policy_provider.is_none());
1359        assert!(cfg.scp_resolver.is_none());
1360    }
1361
1362    fn s3_sigv4_headers() -> http::HeaderMap {
1363        let mut headers = http::HeaderMap::new();
1364        headers.insert(
1365            "authorization",
1366            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/s3/aws4_request, \
1367             SignedHeaders=host, Signature=fake"
1368                .parse()
1369                .unwrap(),
1370        );
1371        headers
1372    }
1373
1374    #[test]
1375    fn streaming_route_path_style_s3_put_object() {
1376        let headers = s3_sigv4_headers();
1377        assert_eq!(
1378            streaming_route(
1379                &http::Method::PUT,
1380                "/my-bucket/key.txt",
1381                &headers,
1382                &HashMap::new(),
1383            ),
1384            Some(("s3", "")),
1385        );
1386    }
1387
1388    #[test]
1389    fn streaming_route_path_style_create_bucket_skipped() {
1390        // `PUT /bucket` (no trailing key) is CreateBucket — must NOT
1391        // hit the streaming path.
1392        let headers = s3_sigv4_headers();
1393        assert_eq!(
1394            streaming_route(&http::Method::PUT, "/my-bucket", &headers, &HashMap::new(),),
1395            None,
1396        );
1397    }
1398
1399    #[test]
1400    fn streaming_route_virtual_hosted_s3_put_object() {
1401        let mut headers = s3_sigv4_headers();
1402        headers.insert(
1403            "host",
1404            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1405                .parse()
1406                .unwrap(),
1407        );
1408        // Virtual-hosted PUT has no bucket in the URL path (`/<key>`),
1409        // so the slash check used for path-style would reject it. The
1410        // Host parser confirms this is virtual-hosted S3 and the key
1411        // flows through the streaming dispatch.
1412        assert_eq!(
1413            streaming_route(&http::Method::PUT, "/hello.txt", &headers, &HashMap::new(),),
1414            Some(("s3", "")),
1415        );
1416    }
1417
1418    #[test]
1419    fn streaming_route_virtual_hosted_s3_root_skipped() {
1420        // `PUT /` against a virtual-hosted Host = CreateBucket, which
1421        // is handled buffered. Empty path-after-slash must short-circuit.
1422        let mut headers = s3_sigv4_headers();
1423        headers.insert(
1424            "host",
1425            "vhost-bucket.s3.us-east-1.localhost.localstack.cloud:4566"
1426                .parse()
1427                .unwrap(),
1428        );
1429        assert_eq!(
1430            streaming_route(&http::Method::PUT, "/", &headers, &HashMap::new()),
1431            None,
1432        );
1433    }
1434
1435    #[test]
1436    fn streaming_route_ecr_blob_upload() {
1437        let headers = http::HeaderMap::new();
1438        assert_eq!(
1439            streaming_route(
1440                &http::Method::PATCH,
1441                "/v2/my-repo/blobs/uploads/abcd1234",
1442                &headers,
1443                &HashMap::new(),
1444            ),
1445            Some(("ecr", "")),
1446        );
1447        assert_eq!(
1448            streaming_route(
1449                &http::Method::PUT,
1450                "/v2/my-repo/blobs/uploads/abcd1234",
1451                &headers,
1452                &HashMap::new(),
1453            ),
1454            Some(("ecr", "")),
1455        );
1456    }
1457
1458    #[test]
1459    fn streaming_route_presigned_v4_s3_put() {
1460        let headers = http::HeaderMap::new();
1461        let mut query_params = HashMap::new();
1462        query_params.insert(
1463            "X-Amz-Credential".to_string(),
1464            "test/20240101/us-east-1/s3/aws4_request".to_string(),
1465        );
1466        assert_eq!(
1467            streaming_route(
1468                &http::Method::PUT,
1469                "/my-bucket/key.txt",
1470                &headers,
1471                &query_params,
1472            ),
1473            Some(("s3", "")),
1474        );
1475    }
1476
1477    #[test]
1478    fn streaming_route_non_s3_auth_header_skipped() {
1479        // Same path shape but the SigV4 service is lambda — must not
1480        // wire the streaming dispatch (Lambda has its own buffered path).
1481        let mut headers = http::HeaderMap::new();
1482        headers.insert(
1483            "authorization",
1484            "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/lambda/aws4_request, \
1485             SignedHeaders=host, Signature=fake"
1486                .parse()
1487                .unwrap(),
1488        );
1489        assert_eq!(
1490            streaming_route(
1491                &http::Method::PUT,
1492                "/my-bucket/key.txt",
1493                &headers,
1494                &HashMap::new(),
1495            ),
1496            None,
1497        );
1498    }
1499
1500    #[test]
1501    fn streaming_route_get_skipped() {
1502        let headers = s3_sigv4_headers();
1503        assert_eq!(
1504            streaming_route(
1505                &http::Method::GET,
1506                "/my-bucket/key.txt",
1507                &headers,
1508                &HashMap::new(),
1509            ),
1510            None,
1511        );
1512    }
1513}