Skip to main content

fakecloud_core/
service.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use http::{HeaderMap, Method, StatusCode};
4use md5::{Digest, Md5};
5use parking_lot::Mutex;
6use std::collections::{BTreeMap, HashMap};
7use std::path::PathBuf;
8
9use crate::auth::Principal;
10
11/// Streaming request body kept alongside the buffered `body: Bytes`. Set
12/// by dispatch only for routes that opt into streaming (S3 PutObject /
13/// UploadPart, ECR OCI blob upload PATCH/PUT). Service handlers call
14/// [`AwsRequest::take_body_stream`] to consume the raw stream without
15/// buffering the entire payload into memory; non-streaming services
16/// keep using `req.body` (which is empty `Bytes` for streaming routes).
17pub type RequestBodyStream = axum::body::Body;
18
19/// A parsed AWS request.
20pub struct AwsRequest {
21    pub service: String,
22    pub action: String,
23    pub region: String,
24    pub account_id: String,
25    pub request_id: String,
26    pub headers: HeaderMap,
27    pub query_params: HashMap<String, String>,
28    /// Buffered request body. For streaming routes this is `Bytes::new()`
29    /// and the raw body is available via [`AwsRequest::take_body_stream`].
30    pub body: Bytes,
31    /// Raw streaming body, populated only for streaming routes. Wrapped
32    /// in a Mutex so the per-service handler can `.take()` ownership
33    /// behind the shared `&AwsRequest` reference threaded through the
34    /// call chain.
35    pub body_stream: Mutex<Option<RequestBodyStream>>,
36    pub path_segments: Vec<String>,
37    /// The raw URI path, before splitting into segments.
38    pub raw_path: String,
39    /// The raw URI query string (everything after `?`), preserving repeated keys.
40    pub raw_query: String,
41    pub method: Method,
42    /// Whether this request came via Query (form-encoded) or JSON protocol.
43    pub is_query_protocol: bool,
44    /// The access key ID from the SigV4 Authorization header, if present.
45    pub access_key_id: Option<String>,
46    /// The resolved caller identity. `None` when the credential is unknown
47    /// or the caller used the reserved root-bypass credentials. Populated
48    /// by dispatch via the configured [`crate::auth::CredentialResolver`]
49    /// so service handlers can make identity-based decisions (e.g.
50    /// `GetCallerIdentity`, IAM enforcement) without re-parsing the
51    /// Authorization header.
52    pub principal: Option<Principal>,
53}
54
55impl std::fmt::Debug for AwsRequest {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("AwsRequest")
58            .field("service", &self.service)
59            .field("action", &self.action)
60            .field("region", &self.region)
61            .field("account_id", &self.account_id)
62            .field("request_id", &self.request_id)
63            .field("headers", &self.headers)
64            .field("query_params", &self.query_params)
65            .field("body_len", &self.body.len())
66            .field(
67                "body_stream",
68                &self.body_stream.lock().as_ref().map(|_| "<stream>"),
69            )
70            .field("path_segments", &self.path_segments)
71            .field("raw_path", &self.raw_path)
72            .field("raw_query", &self.raw_query)
73            .field("method", &self.method)
74            .field("is_query_protocol", &self.is_query_protocol)
75            .field("access_key_id", &self.access_key_id)
76            .field("principal", &self.principal)
77            .finish()
78    }
79}
80
81impl AwsRequest {
82    /// Parse the request body as JSON, returning `Value::Null` on failure.
83    pub fn json_body(&self) -> serde_json::Value {
84        serde_json::from_slice(&self.body).unwrap_or(serde_json::Value::Null)
85    }
86
87    /// Consume the streaming body if this request was dispatched as
88    /// streaming. Returns `None` for buffered requests; the buffered
89    /// body is available via [`AwsRequest::body`]. Calling this twice
90    /// returns `None` on the second call.
91    pub fn take_body_stream(&self) -> Option<RequestBodyStream> {
92        self.body_stream.lock().take()
93    }
94}
95
96/// Drain a streaming request body into a single [`Bytes`] buffer with no
97/// upper bound. Used by handlers that legitimately need the whole payload
98/// in memory (small JSON-shaped requests that happened to land on a
99/// streaming route, e.g. ECR `mount` PUT with no body). Heavy uploads
100/// (S3 PutObject / UploadPart, ECR blob PATCH/PUT) take the streaming
101/// spool path via [`spool_request_stream`] instead. The dispatch-level
102/// cap (`FAKECLOUD_MAX_REQUEST_BODY_BYTES`) does not apply to streaming
103/// routes; this helper exists so a service handler that knows the
104/// payload is small can buffer without dragging in `axum` itself.
105pub async fn drain_request_stream(stream: RequestBodyStream) -> Result<Bytes, AwsServiceError> {
106    use http_body_util::BodyExt;
107    match stream.collect().await {
108        Ok(c) => Ok(c.to_bytes()),
109        Err(e) => Err(stream_error_to_aws(&e.to_string())),
110    }
111}
112
113fn stream_error_to_aws(msg: &str) -> AwsServiceError {
114    // Hyper / axum surface `body limit exceeded` with a
115    // payload-too-large variant. Everything else (connection
116    // reset, malformed chunked encoding, premature EOF) maps
117    // to a 400 BadRequest so callers can distinguish.
118    let too_large = msg.to_ascii_lowercase().contains("limit");
119    let (status, code, message) = if too_large {
120        (
121            StatusCode::PAYLOAD_TOO_LARGE,
122            "RequestEntityTooLarge",
123            "Streaming request body exceeded the configured limit",
124        )
125    } else {
126        (
127            StatusCode::BAD_REQUEST,
128            "MalformedRequestBody",
129            "Failed to read streaming request body",
130        )
131    };
132    AwsServiceError::aws_error(status, code, message)
133}
134
/// Result of writing a streaming request body to disk with
/// [`spool_request_stream`].
///
/// Ownership of the file on disk passes to the caller. The caller must either
/// hand the [`PathBuf`] on (e.g. inside a `BodySource::File` given to a
/// store) or unlink it. A path is returned instead of an open handle so
/// the downstream store can rename the file straight into place. That is
/// how disk-mode S3 stores a 1 GiB upload without copying the payload
/// through RAM.
#[derive(Debug)]
pub struct SpooledBody {
    /// Location of the spooled tempfile.
    pub path: PathBuf,
    /// Number of payload bytes written to `path`.
    pub size: u64,
    /// MD5 of the payload as lowercase hex, the form S3 uses for `ETag`.
    pub md5_hex: String,
}
151
/// Incremental decoder for the `aws-chunked` content-encoding that modern AWS
/// S3 clients (aws-cli, boto3 >= 1.36, aws-crt) apply by default to PutObject /
/// UploadPart bodies when they send `x-amz-content-sha256:
/// STREAMING-AWS4-HMAC-SHA256-PAYLOAD` (or `STREAMING-UNSIGNED-PAYLOAD-TRAILER`).
///
/// The wire format wraps the real payload in application-layer frames:
/// `<hex-size>[;chunk-signature=<hex>]\r\n<data>\r\n` repeated, terminated by a
/// `0`-size chunk, then optional `x-amz-trailer` lines, then a final `\r\n`.
/// hyper only strips HTTP `Transfer-Encoding: chunked`, NOT this
/// `Content-Encoding: aws-chunked` framing. Without decoding, the size /
/// signature lines and trailers would be stored as the object's bytes (silent
/// corruption + a wrong ETag). The decoder is fed incrementally because network
/// frames do not align to chunk boundaries. Chunk signatures and trailer
/// checksums are consumed but not re-validated (fakecloud computes its own
/// checksums over the decoded bytes).
///
/// Header and trailer lines are buffered until their terminating `\n`. A line
/// longer than roughly 16 KiB is rejected, so a client that never sends a
/// newline cannot grow that buffer without bound.
#[derive(Default)]
pub struct AwsChunkedDecoder {
    /// Which part of the framing the next input byte belongs to.
    state: ChunkState,
    /// Partial header/trailer line carried across `feed` calls (`\r` removed).
    line: Vec<u8>,
    /// Payload bytes still owed by the current data chunk.
    remaining: usize,
    /// Set once the blank line ending the trailer section has been seen;
    /// any later input is ignored.
    done: bool,
}

/// Position of an [`AwsChunkedDecoder`] within the `aws-chunked` framing.
#[derive(Default, PartialEq)]
enum ChunkState {
    /// Reading a `<hex-size>[;ext]` chunk header line.
    #[default]
    Header,
    /// Copying the `remaining` payload bytes of the current chunk.
    Data,
    /// Skipping the CRLF that follows a chunk's data.
    AfterData,
    /// Past the zero-size chunk: skipping trailer lines until a blank line.
    Trailer,
}

/// A malformed `aws-chunked` frame: a chunk-size line that is not a plain hex
/// length, or a header/trailer line exceeding the decoder's line-length cap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MalformedChunk;

impl AwsChunkedDecoder {
    /// Longest header or trailer line accepted. Real lines are ~100 bytes
    /// (`<hex>;chunk-signature=<64 hex>` or an `x-amz-checksum-*:<base64>`
    /// trailer), so this is generous while still bounding memory.
    const MAX_LINE_LEN: usize = 16 * 1024;

    /// Feed a network frame; returns the decoded payload bytes it yielded.
    ///
    /// Input arriving after the terminating blank line is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`MalformedChunk`] when a chunk-size line is not a hex length,
    /// or when a header/trailer line grows past the line-length cap. The
    /// decoder's state is unspecified after an error; discard it.
    pub fn feed(&mut self, input: &[u8]) -> Result<Vec<u8>, MalformedChunk> {
        let mut out = Vec::new();
        let mut rest = input;
        while !rest.is_empty() && !self.done {
            match self.state {
                ChunkState::Data => {
                    let take = self.remaining.min(rest.len());
                    let (data, tail) = rest.split_at(take);
                    out.extend_from_slice(data);
                    rest = tail;
                    self.remaining -= take;
                    if self.remaining == 0 {
                        self.state = ChunkState::AfterData;
                    }
                }
                ChunkState::AfterData => {
                    // Skip the inter-chunk CRLF; the byte after `\n` starts the
                    // next header line.
                    match rest.iter().position(|&b| b == b'\n') {
                        Some(nl) => {
                            rest = &rest[nl + 1..];
                            self.state = ChunkState::Header;
                        }
                        None => rest = &[],
                    }
                }
                ChunkState::Header | ChunkState::Trailer => {
                    let (segment, complete) = match rest.iter().position(|&b| b == b'\n') {
                        Some(nl) => {
                            let segment = &rest[..nl];
                            rest = &rest[nl + 1..];
                            (segment, true)
                        }
                        None => {
                            let segment = rest;
                            rest = &[];
                            (segment, false)
                        }
                    };
                    // Enforce the cap before growing the buffer. Counting the
                    // `\r` bytes still in `segment` only makes it marginally
                    // stricter.
                    if self.line.len() + segment.len() > Self::MAX_LINE_LEN {
                        return Err(MalformedChunk);
                    }
                    self.line
                        .extend(segment.iter().copied().filter(|&b| b != b'\r'));
                    if complete {
                        self.finish_line()?;
                    }
                }
            }
        }
        Ok(out)
    }

    /// Act on the complete header or trailer line held in `self.line`, then
    /// clear the buffer (keeping its allocation for the next line).
    fn finish_line(&mut self) -> Result<(), MalformedChunk> {
        if self.state == ChunkState::Header {
            let size = Self::parse_chunk_size(&self.line)?;
            if size == 0 {
                // Zero-size chunk: the payload is over; trailers (if any) follow.
                self.state = ChunkState::Trailer;
            } else {
                self.remaining = size;
                self.state = ChunkState::Data;
            }
        } else if self.line.is_empty() {
            // Blank line ends the trailer section.
            self.done = true;
        }
        // (a non-empty trailer line is consumed and ignored)
        self.line.clear();
        Ok(())
    }

    /// Parse the hex size from a chunk header line, ignoring any `;`
    /// extension (e.g. `chunk-signature=…`) and surrounding whitespace.
    /// Only ASCII hex digits are accepted, because `from_str_radix` alone
    /// would also take a leading `+`. A value overflowing `usize` is also
    /// malformed.
    fn parse_chunk_size(line: &[u8]) -> Result<usize, MalformedChunk> {
        let hex_part = match line.iter().position(|&c| c == b';') {
            Some(semi) => &line[..semi],
            None => line,
        };
        let hex = std::str::from_utf8(hex_part)
            .map_err(|_| MalformedChunk)?
            .trim();
        if hex.is_empty() || !hex.bytes().all(|c| c.is_ascii_hexdigit()) {
            return Err(MalformedChunk);
        }
        usize::from_str_radix(hex, 16).map_err(|_| MalformedChunk)
    }
}
253
254/// Whether a request body carries the `aws-chunked` content-encoding (so the
255/// spool path must decode the framing). True when `Content-Encoding` lists
256/// `aws-chunked`, or `x-amz-content-sha256` is a `STREAMING-…` marker — both of
257/// which default modern S3 clients (aws-cli, boto3 >= 1.36, aws-crt) set.
258pub fn is_aws_chunked(headers: &http::HeaderMap) -> bool {
259    headers
260        .get("content-encoding")
261        .and_then(|v| v.to_str().ok())
262        .is_some_and(|v| {
263            v.split(',')
264                .any(|t| t.trim().eq_ignore_ascii_case("aws-chunked"))
265        })
266        || headers
267            .get("x-amz-content-sha256")
268            .and_then(|v| v.to_str().ok())
269            .is_some_and(|v| v.starts_with("STREAMING-"))
270}
271
/// Remove the `aws-chunked` token from a client's `Content-Encoding` value.
///
/// AWS treats `aws-chunked` as a transfer detail and does not keep it in
/// object metadata, while real encodings such as `gzip` are preserved. The
/// remaining tokens are trimmed and re-joined with `", "`, and blank tokens
/// are dropped. Yields `None` when the input is `None` or nothing is left.
pub fn strip_aws_chunked_encoding(content_encoding: Option<&str>) -> Option<String> {
    let mut survivors = content_encoding?
        .split(',')
        .map(str::trim)
        .filter(|token| !token.is_empty() && !token.eq_ignore_ascii_case("aws-chunked"))
        .peekable();
    // Bail out with `None` when no real encoding survives the filter.
    survivors.peek()?;
    Some(survivors.collect::<Vec<&str>>().join(", "))
}
289
290/// Stream a request body to a tempfile on disk while computing its MD5
291/// and length on the fly. The body is **never** materialized into a
292/// single `Bytes` buffer; chunks flow from hyper -> Tokio file in
293/// constant memory. A 1 GiB PutObject moves through this function with
294/// peak resident memory bounded by hyper's per-frame buffer.
295///
296/// `dir` controls where the tempfile lands. S3 callers point this at
297/// the S3 object root so the eventual rename into the final storage
298/// path stays on the same filesystem and is a metadata-only move.
299/// Memory-mode callers can pass `None` for the system temp dir; the
300/// memory store reads the file back into bytes and unlinks it.
301///
302/// `aws_chunked` decodes the `Content-Encoding: aws-chunked` application-layer
303/// framing that default modern S3 clients apply (see [`AwsChunkedDecoder`]), so
304/// the spooled bytes, MD5/ETag, and size reflect the real payload — not the
305/// chunk-size/signature framing. Non-S3 callers (and raw `UNSIGNED-PAYLOAD`
306/// uploads) pass `false` and stream verbatim.
307pub async fn spool_request_stream(
308    stream: RequestBodyStream,
309    dir: Option<&std::path::Path>,
310    aws_chunked: bool,
311) -> Result<SpooledBody, AwsServiceError> {
312    use http_body_util::BodyExt;
313    use tokio::io::AsyncWriteExt;
314
315    let dir = dir.map(|d| d.to_path_buf());
316    if let Some(d) = dir.as_ref() {
317        // Best-effort create; an existing dir is fine.
318        let _ = tokio::fs::create_dir_all(d).await;
319    }
320
321    let mut builder = tempfile::Builder::new();
322    builder.prefix("fc-spool-");
323    let named = match dir.as_ref() {
324        Some(d) => builder.tempfile_in(d),
325        None => builder.tempfile(),
326    }
327    .map_err(|e| {
328        AwsServiceError::aws_error(
329            StatusCode::INTERNAL_SERVER_ERROR,
330            "InternalError",
331            format!("failed to create spool tempfile: {e}"),
332        )
333    })?;
334
335    // `into_temp_path` would auto-delete on drop. We keep the path and
336    // assume responsibility for either persisting or unlinking it.
337    let (std_file, temp_path) = named.into_parts();
338    // Persist to a stable PathBuf — `keep()` releases the
339    // delete-on-drop guard so the file outlives this function.
340    let path: PathBuf = temp_path.keep().map_err(|e| {
341        AwsServiceError::aws_error(
342            StatusCode::INTERNAL_SERVER_ERROR,
343            "InternalError",
344            format!("failed to persist spool tempfile: {e}"),
345        )
346    })?;
347
348    let mut file = tokio::fs::File::from_std(std_file);
349    let mut hasher = Md5::new();
350    let mut size: u64 = 0;
351    let mut body = stream;
352    let mut decoder = aws_chunked.then(AwsChunkedDecoder::default);
353
354    // Cleanup helper: drop the file handle before unlinking so
355    // platforms that disallow removing an open file (Windows) still
356    // collect the partial spool. `drop(file)` closes the underlying
357    // OS handle synchronously.
358    async fn cleanup(file: tokio::fs::File, path: &std::path::Path) {
359        drop(file);
360        let _ = tokio::fs::remove_file(path).await;
361    }
362
363    loop {
364        match body.frame().await {
365            Some(Ok(frame)) => {
366                if let Ok(raw) = frame.into_data() {
367                    if !raw.is_empty() {
368                        // Decode aws-chunked framing into the real payload when
369                        // the client used it; otherwise the frame IS the payload.
370                        let payload = match decoder.as_mut() {
371                            Some(d) => match d.feed(&raw) {
372                                Ok(decoded) => decoded,
373                                Err(_) => {
374                                    cleanup(file, &path).await;
375                                    return Err(AwsServiceError::aws_error(
376                                        StatusCode::BAD_REQUEST,
377                                        "InvalidChunkSizeError",
378                                        "Malformed aws-chunked request body",
379                                    ));
380                                }
381                            },
382                            None => raw.to_vec(),
383                        };
384                        if !payload.is_empty() {
385                            hasher.update(&payload);
386                            size += payload.len() as u64;
387                            if let Err(e) = file.write_all(&payload).await {
388                                cleanup(file, &path).await;
389                                return Err(AwsServiceError::aws_error(
390                                    StatusCode::INTERNAL_SERVER_ERROR,
391                                    "InternalError",
392                                    format!("failed to spool request body: {e}"),
393                                ));
394                            }
395                        }
396                    }
397                }
398                // HTTP trailers are ignored; aws-chunked trailers are consumed
399                // inside the decoder.
400            }
401            Some(Err(e)) => {
402                cleanup(file, &path).await;
403                return Err(stream_error_to_aws(&e.to_string()));
404            }
405            None => break,
406        }
407    }
408
409    if let Err(e) = file.flush().await {
410        cleanup(file, &path).await;
411        return Err(AwsServiceError::aws_error(
412            StatusCode::INTERNAL_SERVER_ERROR,
413            "InternalError",
414            format!("failed to flush spool tempfile: {e}"),
415        ));
416    }
417    drop(file);
418
419    let md5_hex = hex_lower(&hasher.finalize());
420    Ok(SpooledBody {
421        path,
422        size,
423        md5_hex,
424    })
425}
426
/// Render `bytes` as lowercase hexadecimal, two digits per byte, with the
/// high nibble first (the form S3 uses for an `ETag`).
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| char::from(DIGITS[usize::from(nibble)]))
        .collect()
}
436
437/// A response body. Most handlers return [`ResponseBody::Bytes`] built from
438/// an in-memory [`Bytes`] buffer; the [`File`](ResponseBody::File) variant
439/// exists so large disk-backed objects can be streamed straight from the
440/// filesystem to the HTTP body without being materialized into RAM. The file
441/// handle is opened by the service handler while it still holds the
442/// per-bucket read guard, so the reader sees a consistent inode even if a
443/// concurrent PUT/DELETE renames or unlinks the path before dispatch streams
444/// the body.
445#[derive(Debug)]
446pub enum ResponseBody {
447    Bytes(Bytes),
448    File { file: tokio::fs::File, size: u64 },
449}
450
451impl ResponseBody {
452    pub fn len(&self) -> u64 {
453        match self {
454            ResponseBody::Bytes(b) => b.len() as u64,
455            ResponseBody::File { size, .. } => *size,
456        }
457    }
458
459    pub fn is_empty(&self) -> bool {
460        self.len() == 0
461    }
462
463    /// Accessor that returns the bytes of a `Bytes` variant and panics for
464    /// `File`. Used by tests and by callers that know the response was built
465    /// from an in-memory buffer (JSON handlers, cross-service glue).
466    pub fn expect_bytes(&self) -> &[u8] {
467        match self {
468            ResponseBody::Bytes(b) => b,
469            ResponseBody::File { .. } => {
470                panic!("expect_bytes called on ResponseBody::File")
471            }
472        }
473    }
474}
475
476impl Default for ResponseBody {
477    fn default() -> Self {
478        ResponseBody::Bytes(Bytes::new())
479    }
480}
481
482impl From<Bytes> for ResponseBody {
483    fn from(b: Bytes) -> Self {
484        ResponseBody::Bytes(b)
485    }
486}
487
488impl From<Vec<u8>> for ResponseBody {
489    fn from(v: Vec<u8>) -> Self {
490        ResponseBody::Bytes(Bytes::from(v))
491    }
492}
493
494impl From<&'static [u8]> for ResponseBody {
495    fn from(s: &'static [u8]) -> Self {
496        ResponseBody::Bytes(Bytes::from_static(s))
497    }
498}
499
500impl From<String> for ResponseBody {
501    fn from(s: String) -> Self {
502        ResponseBody::Bytes(Bytes::from(s))
503    }
504}
505
506impl From<&'static str> for ResponseBody {
507    fn from(s: &'static str) -> Self {
508        ResponseBody::Bytes(Bytes::from_static(s.as_bytes()))
509    }
510}
511
512impl PartialEq<Bytes> for ResponseBody {
513    fn eq(&self, other: &Bytes) -> bool {
514        match self {
515            ResponseBody::Bytes(b) => b == other,
516            ResponseBody::File { .. } => false,
517        }
518    }
519}
520
521/// A response from a service handler.
522pub struct AwsResponse {
523    pub status: StatusCode,
524    pub content_type: String,
525    pub body: ResponseBody,
526    pub headers: HeaderMap,
527}
528
529impl AwsResponse {
530    pub fn xml(status: StatusCode, body: impl Into<Bytes>) -> Self {
531        Self {
532            status,
533            content_type: "text/xml".to_string(),
534            body: ResponseBody::Bytes(body.into()),
535            headers: HeaderMap::new(),
536        }
537    }
538
539    pub fn json(status: StatusCode, body: impl Into<Bytes>) -> Self {
540        Self {
541            status,
542            content_type: "application/x-amz-json-1.1".to_string(),
543            body: ResponseBody::Bytes(body.into()),
544            headers: HeaderMap::new(),
545        }
546    }
547
548    /// Build a JSON response from a `serde_json::Value` with an explicit status.
549    ///
550    /// Serialization of an in-memory `Value` cannot fail — it has no cycles and
551    /// no custom serializers — so the inner `to_vec` is documented as infallible
552    /// rather than left as a bare `unwrap()`.
553    pub fn json_value(status: StatusCode, value: serde_json::Value) -> Self {
554        Self::json(
555            status,
556            serde_json::to_vec(&value).expect("serde_json::Value serialization is infallible"),
557        )
558    }
559
560    /// Convenience constructor for a 200 OK JSON response from a `serde_json::Value`.
561    pub fn ok_json(value: serde_json::Value) -> Self {
562        Self::json_value(StatusCode::OK, value)
563    }
564}
565
566/// Error returned by service handlers.
567#[derive(Debug, thiserror::Error)]
568pub enum AwsServiceError {
569    #[error("service not found: {service}")]
570    ServiceNotFound { service: String },
571
572    #[error("action {action} not implemented for service {service}")]
573    ActionNotImplemented { service: String, action: String },
574
575    #[error("{code}: {message}")]
576    AwsError {
577        status: StatusCode,
578        code: String,
579        message: String,
580        /// Additional key-value pairs to include in the error XML (e.g., BucketName, Key, Condition).
581        extra_fields: Vec<(String, String)>,
582        /// Additional HTTP headers to include in the error response.
583        headers: Vec<(String, String)>,
584    },
585}
586
587impl AwsServiceError {
588    pub fn action_not_implemented(service: &str, action: &str) -> Self {
589        Self::ActionNotImplemented {
590            service: service.to_string(),
591            action: action.to_string(),
592        }
593    }
594
595    pub fn aws_error(
596        status: StatusCode,
597        code: impl Into<String>,
598        message: impl Into<String>,
599    ) -> Self {
600        Self::AwsError {
601            status,
602            code: code.into(),
603            message: message.into(),
604            extra_fields: Vec::new(),
605            headers: Vec::new(),
606        }
607    }
608
609    pub fn aws_error_with_fields(
610        status: StatusCode,
611        code: impl Into<String>,
612        message: impl Into<String>,
613        extra_fields: Vec<(String, String)>,
614    ) -> Self {
615        Self::AwsError {
616            status,
617            code: code.into(),
618            message: message.into(),
619            extra_fields,
620            headers: Vec::new(),
621        }
622    }
623
624    pub fn aws_error_with_headers(
625        status: StatusCode,
626        code: impl Into<String>,
627        message: impl Into<String>,
628        headers: Vec<(String, String)>,
629    ) -> Self {
630        Self::AwsError {
631            status,
632            code: code.into(),
633            message: message.into(),
634            extra_fields: Vec::new(),
635            headers,
636        }
637    }
638
639    pub fn extra_fields(&self) -> &[(String, String)] {
640        match self {
641            Self::AwsError { extra_fields, .. } => extra_fields,
642            _ => &[],
643        }
644    }
645
646    pub fn status(&self) -> StatusCode {
647        match self {
648            Self::ServiceNotFound { .. } => StatusCode::BAD_REQUEST,
649            Self::ActionNotImplemented { .. } => StatusCode::NOT_IMPLEMENTED,
650            Self::AwsError { status, .. } => *status,
651        }
652    }
653
654    pub fn code(&self) -> &str {
655        match self {
656            Self::ServiceNotFound { .. } => "UnknownService",
657            Self::ActionNotImplemented { .. } => "InvalidAction",
658            Self::AwsError { code, .. } => code,
659        }
660    }
661
662    pub fn message(&self) -> String {
663        match self {
664            Self::ServiceNotFound { service } => format!("service not found: {service}"),
665            Self::ActionNotImplemented { service, action } => {
666                format!("action {action} not implemented for service {service}")
667            }
668            Self::AwsError { message, .. } => message.clone(),
669        }
670    }
671
672    pub fn response_headers(&self) -> &[(String, String)] {
673        match self {
674            Self::AwsError { headers, .. } => headers,
675            _ => &[],
676        }
677    }
678}
679
680/// Trait that every AWS service implements.
681#[async_trait]
682pub trait AwsService: Send + Sync {
683    /// The AWS service identifier (e.g., "sqs", "sns", "sts", "events", "ssm").
684    fn service_name(&self) -> &str;
685
686    /// Handle an incoming request.
687    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError>;
688
689    /// List of actions this service supports (for introspection).
690    fn supported_actions(&self) -> &[&str];
691
692    /// Whether this service participates in opt-in IAM enforcement
693    /// (`FAKECLOUD_IAM=soft|strict`).
694    ///
695    /// Defaults to `false`: unless a service has a full
696    /// `iam_action_for` implementation covering every operation it
697    /// supports plus resource-ARN extractors, it's silently skipped when
698    /// IAM enforcement is on. The startup log enumerates which services
699    /// are enforced and which are not so users always know the current
700    /// enforcement surface.
701    ///
702    /// Phase 1 contract: a service that returns `true` here MUST also
703    /// provide a fully populated [`AwsService::iam_action_for`]
704    /// implementation covering every action it advertises. Returning
705    /// `true` without the action mapping is a programming bug.
706    fn iam_enforceable(&self) -> bool {
707        false
708    }
709
710    /// Derive the IAM action + resource ARN for an incoming request.
711    ///
712    /// Only called when [`AwsService::iam_enforceable`] returns `true`
713    /// and IAM enforcement is enabled. Services must map every action
714    /// they implement; returning `None` for a covered action causes the
715    /// evaluator to skip the request and flag it via the
716    /// `fakecloud::iam::audit` tracing target so gaps are visible in
717    /// soft mode.
718    ///
719    /// The `IamAction.resource` is built from `request.principal`'s
720    /// account id (not global config) so multi-account isolation
721    /// (#381) works once per-account state partitioning lands.
722    fn iam_action_for(&self, _request: &AwsRequest) -> Option<crate::auth::IamAction> {
723        None
724    }
725
726    /// Derive service-specific IAM condition keys for an incoming request.
727    ///
728    /// Called right after [`AwsService::iam_action_for`] when IAM
729    /// enforcement is enabled. The returned map is merged into the
730    /// [`crate::auth::ConditionContext::service_keys`] before the
731    /// evaluator runs, so policies can reference keys like `s3:prefix`
732    /// or `sns:Protocol` the same way they reference global keys.
733    ///
734    /// Keys MUST be in the full `"service:key"` form, lowercased
735    /// (e.g. `"s3:prefix"`), matching the case-insensitive lookup in
736    /// [`crate::auth::ConditionContext::lookup`]. Extractors should
737    /// only emit keys they can populate with confidence; anything
738    /// ambiguous or unimplemented should be skipped with a
739    /// `tracing::debug!(target: "fakecloud::iam::audit", ...)` so
740    /// condition evaluation safe-fails to "doesn't apply" rather than
741    /// "matches".
742    ///
743    /// Default impl returns an empty map: services that haven't been
744    /// plumbed yet behave exactly as before.
745    fn iam_condition_keys_for(
746        &self,
747        _request: &AwsRequest,
748        _action: &crate::auth::IamAction,
749    ) -> BTreeMap<String, Vec<String>> {
750        BTreeMap::new()
751    }
752
753    /// Return the tags on the resource identified by `resource_arn`.
754    ///
755    /// Called at dispatch time when IAM enforcement is enabled, right
756    /// after [`AwsService::iam_action_for`]. The returned map populates
757    /// `aws:ResourceTag/<key>` condition keys so policies can gate
758    /// access based on the target resource's tags.
759    ///
760    /// Return `None` to signal that this service does not (yet) support
761    /// resource-tag ABAC — dispatch will emit a debug audit log and
762    /// skip `aws:ResourceTag/*` evaluation. Return `Some(empty map)`
763    /// when the resource exists but has no tags.
764    fn resource_tags_for(
765        &self,
766        _resource_arn: &str,
767    ) -> Option<std::collections::HashMap<String, String>> {
768        None
769    }
770
771    /// Extract tags being sent in the request (e.g. on CreateQueue,
772    /// PutObject with `x-amz-tagging`, TagResource).
773    ///
774    /// The returned map populates `aws:RequestTag/<key>` and
775    /// `aws:TagKeys` condition keys. Return `None` when the service
776    /// does not (yet) support request-tag extraction — dispatch skips
777    /// `aws:RequestTag/*` / `aws:TagKeys` evaluation with a debug log.
778    /// Return `Some(empty map)` when the request legitimately carries
779    /// no tags.
780    fn request_tags_from(
781        &self,
782        _request: &AwsRequest,
783        _action: &str,
784    ) -> Option<std::collections::HashMap<String, String>> {
785        None
786    }
787}
788
#[cfg(test)]
mod tests {
    // Unit tests for the aws-chunked decoding helpers, the `AwsService`
    // default hooks, and the request/response/error value types.
    use super::*;
    use crate::auth::IamAction;
    use async_trait::async_trait;

    /// Build a signed aws-chunked body for `payload`.
    ///
    /// The payload is split into data chunks of at most `chunk_size` bytes
    /// (a `chunk_size` of 0 is treated as 1), each tagged with a dummy
    /// all-zero `chunk-signature`. The body ends with a zero-length chunk,
    /// then an `x-amz-checksum-crc32` trailer line only when `with_trailer`
    /// is set, then the final CRLF.
    fn aws_chunked_body(payload: &[u8], chunk_size: usize, with_trailer: bool) -> Vec<u8> {
        let sig = "0".repeat(64);
        let mut out = Vec::new();
        for c in payload.chunks(chunk_size.max(1)) {
            out.extend_from_slice(format!("{:x};chunk-signature={sig}\r\n", c.len()).as_bytes());
            out.extend_from_slice(c);
            out.extend_from_slice(b"\r\n");
        }
        out.extend_from_slice(format!("0;chunk-signature={sig}\r\n").as_bytes());
        if with_trailer {
            out.extend_from_slice(b"x-amz-checksum-crc32:AAAAAA==\r\n");
        }
        out.extend_from_slice(b"\r\n");
        out
    }

    /// Feed `body` to a fresh `AwsChunkedDecoder` in frames of `feed_size`
    /// bytes (0 is treated as 1) and concatenate everything it decodes.
    ///
    /// Panics if the decoder rejects any frame.
    fn decode_all(body: &[u8], feed_size: usize) -> Vec<u8> {
        let mut d = AwsChunkedDecoder::default();
        let mut out = Vec::new();
        for frame in body.chunks(feed_size.max(1)) {
            out.extend(d.feed(frame).expect("valid chunked body"));
        }
        out
    }

    #[test]
    fn aws_chunked_decoder_roundtrips_across_frame_boundaries() {
        let payload: Vec<u8> = (0..5000u32).map(|i| (i % 251) as u8).collect();
        // Chunked with 1 KiB data chunks; with and without a trailer.
        for with_trailer in [false, true] {
            let body = aws_chunked_body(&payload, 1024, with_trailer);
            // Network frames don't align to chunk boundaries: try several sizes,
            // including 1 byte at a time and a single whole-body frame.
            for feed in [1usize, 7, 64, 1000, body.len()] {
                let decoded = decode_all(&body, feed);
                assert_eq!(decoded, payload, "feed={feed} trailer={with_trailer}");
            }
        }
    }

    #[test]
    fn aws_chunked_decoder_handles_empty_payload() {
        let body = aws_chunked_body(b"", 1024, false);
        assert_eq!(decode_all(&body, 3), Vec::<u8>::new());
    }

    #[test]
    fn aws_chunked_decoder_rejects_bad_size_line() {
        let mut d = AwsChunkedDecoder::default();
        assert!(d.feed(b"zz;chunk-signature=x\r\n").is_err());
    }

    #[test]
    fn is_aws_chunked_detects_streaming_markers() {
        let mut h = http::HeaderMap::new();
        assert!(!is_aws_chunked(&h));
        h.insert("content-encoding", "aws-chunked".parse().unwrap());
        assert!(is_aws_chunked(&h));
        let mut h2 = http::HeaderMap::new();
        h2.insert(
            "x-amz-content-sha256",
            "STREAMING-AWS4-HMAC-SHA256-PAYLOAD".parse().unwrap(),
        );
        assert!(is_aws_chunked(&h2));
        // gzip without aws-chunked must NOT trigger decoding.
        let mut h3 = http::HeaderMap::new();
        h3.insert("content-encoding", "gzip".parse().unwrap());
        assert!(!is_aws_chunked(&h3));
    }

    #[test]
    fn strip_aws_chunked_keeps_real_encoding() {
        assert_eq!(strip_aws_chunked_encoding(Some("aws-chunked")), None);
        assert_eq!(
            strip_aws_chunked_encoding(Some("aws-chunked, gzip")).as_deref(),
            Some("gzip")
        );
        assert_eq!(
            strip_aws_chunked_encoding(Some("gzip")).as_deref(),
            Some("gzip")
        );
        assert_eq!(strip_aws_chunked_encoding(None), None);
    }

    /// Fixture service that relies on every default `AwsService` hook.
    /// `handle` is never invoked by these tests.
    struct DefaultService;

    #[async_trait]
    impl AwsService for DefaultService {
        fn service_name(&self) -> &str {
            "default"
        }
        async fn handle(&self, _request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
            unreachable!()
        }
        fn supported_actions(&self) -> &[&str] {
            &[]
        }
    }

    /// Fixture service whose `iam_condition_keys_for` override always
    /// returns a single fixed `s3:prefix` key.
    struct PopulatedService;

    #[async_trait]
    impl AwsService for PopulatedService {
        fn service_name(&self) -> &str {
            "populated"
        }
        async fn handle(&self, _request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
            unreachable!()
        }
        fn supported_actions(&self) -> &[&str] {
            &[]
        }
        fn iam_condition_keys_for(
            &self,
            _request: &AwsRequest,
            _action: &IamAction,
        ) -> BTreeMap<String, Vec<String>> {
            let mut m = BTreeMap::new();
            m.insert("s3:prefix".to_string(), vec!["logs/".to_string()]);
            m
        }
    }

    /// Minimal GET request for `/` with no headers, query, body, or
    /// credentials.
    fn sample_request() -> AwsRequest {
        AwsRequest {
            service: "default".into(),
            action: "Noop".into(),
            region: "us-east-1".into(),
            account_id: "123456789012".into(),
            request_id: "req-1".into(),
            headers: HeaderMap::new(),
            query_params: HashMap::new(),
            body: Bytes::new(),
            body_stream: parking_lot::Mutex::new(None),
            path_segments: vec![],
            raw_path: "/".into(),
            raw_query: String::new(),
            method: Method::GET,
            is_query_protocol: false,
            access_key_id: None,
            principal: None,
        }
    }

    /// S3 `ListBucket` against `arn:aws:s3:::my-bucket`.
    fn sample_action() -> IamAction {
        IamAction {
            service: "s3",
            action: "ListBucket",
            resource: "arn:aws:s3:::my-bucket".to_string(),
        }
    }

    #[test]
    fn iam_condition_keys_for_default_is_empty() {
        let svc = DefaultService;
        let keys = svc.iam_condition_keys_for(&sample_request(), &sample_action());
        assert!(keys.is_empty());
    }

    #[test]
    fn iam_condition_keys_for_override_returns_map() {
        let svc = PopulatedService;
        let keys = svc.iam_condition_keys_for(&sample_request(), &sample_action());
        assert_eq!(keys.get("s3:prefix"), Some(&vec!["logs/".to_string()]));
    }

    #[test]
    fn response_body_len_and_is_empty_for_bytes() {
        let body: ResponseBody = Bytes::from_static(b"hello").into();
        assert_eq!(body.len(), 5);
        assert!(!body.is_empty());
        let empty: ResponseBody = ResponseBody::default();
        assert!(empty.is_empty());
    }

    #[test]
    fn response_body_from_vec_and_string_and_str() {
        let from_vec: ResponseBody = vec![1u8, 2, 3].into();
        assert_eq!(from_vec.expect_bytes(), &[1, 2, 3][..]);
        let from_string: ResponseBody = String::from("hi").into();
        assert_eq!(from_string.expect_bytes(), b"hi");
        let from_str: ResponseBody = "hey".into();
        assert_eq!(from_str.expect_bytes(), b"hey");
        let from_static: ResponseBody = (b"123" as &'static [u8]).into();
        assert_eq!(from_static.expect_bytes(), b"123");
    }

    #[test]
    fn response_body_partial_eq_bytes() {
        let body: ResponseBody = Bytes::from_static(b"x").into();
        assert!(body == Bytes::from_static(b"x"));
        assert!(body != Bytes::from_static(b"y"));
    }

    #[test]
    fn aws_request_json_body_empty_returns_null() {
        let req = sample_request();
        assert_eq!(req.json_body(), serde_json::Value::Null);
    }

    #[test]
    fn aws_request_json_body_parses_valid() {
        let mut req = sample_request();
        req.body = Bytes::from_static(br#"{"a":1}"#);
        assert_eq!(req.json_body(), serde_json::json!({"a": 1}));
    }

    #[test]
    fn aws_response_xml_constructor() {
        let resp = AwsResponse::xml(StatusCode::OK, Bytes::from_static(b"<ok/>"));
        assert_eq!(resp.status, StatusCode::OK);
        assert_eq!(resp.content_type, "text/xml");
    }

    #[test]
    fn aws_response_json_constructor() {
        let resp = AwsResponse::json(StatusCode::CREATED, "{}");
        assert_eq!(resp.status, StatusCode::CREATED);
        assert_eq!(resp.content_type, "application/x-amz-json-1.1");
    }

    #[test]
    fn aws_response_ok_json_helper() {
        let resp = AwsResponse::ok_json(serde_json::json!({"ok": true}));
        assert_eq!(resp.status, StatusCode::OK);
        assert!(resp.body.expect_bytes().starts_with(b"{"));
    }

    #[test]
    fn aws_error_service_not_found_fields() {
        let err = AwsServiceError::ServiceNotFound {
            service: "sqs".to_string(),
        };
        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
        assert_eq!(err.code(), "UnknownService");
        assert!(err.message().contains("sqs"));
        assert!(err.extra_fields().is_empty());
        assert!(err.response_headers().is_empty());
    }

    #[test]
    fn aws_error_action_not_implemented_fields() {
        let err = AwsServiceError::action_not_implemented("sns", "FutureAction");
        assert_eq!(err.status(), StatusCode::NOT_IMPLEMENTED);
        assert_eq!(err.code(), "InvalidAction");
        assert!(err.message().contains("FutureAction"));
        assert!(err.message().contains("sns"));
    }

    #[test]
    fn aws_error_aws_error_helpers() {
        let e = AwsServiceError::aws_error(StatusCode::FORBIDDEN, "Denied", "no");
        assert_eq!(e.status(), StatusCode::FORBIDDEN);
        assert_eq!(e.code(), "Denied");
        assert_eq!(e.message(), "no");

        let fields = vec![("Bucket".to_string(), "b".to_string())];
        let ef = AwsServiceError::aws_error_with_fields(
            StatusCode::NOT_FOUND,
            "Missing",
            "gone",
            fields.clone(),
        );
        assert_eq!(ef.extra_fields(), fields.as_slice());

        let hdrs = vec![("X-Retry".to_string(), "1".to_string())];
        let eh = AwsServiceError::aws_error_with_headers(
            StatusCode::TOO_MANY_REQUESTS,
            "Throttled",
            "slow",
            hdrs.clone(),
        );
        assert_eq!(eh.response_headers(), hdrs.as_slice());
    }

    #[test]
    #[should_panic(expected = "expect_bytes called on ResponseBody::File")]
    fn response_body_expect_bytes_panics_on_file() {
        // Per-process name so concurrently running test binaries don't
        // collide on the same path.
        let name = format!("fc-test-expect-file-{}", std::process::id());
        let path = std::env::temp_dir().join(name);
        let f = std::fs::File::create(&path).unwrap();
        // Unlink immediately: the open handle stays usable, and because this
        // test ends in a panic no later cleanup would run, which would leave
        // the file behind in the temp dir. Best-effort, so errors are ignored.
        let _ = std::fs::remove_file(&path);
        let async_f = tokio::fs::File::from_std(f);
        let body = ResponseBody::File {
            file: async_f,
            size: 0,
        };
        let _ = body.expect_bytes();
    }
}