1use async_trait::async_trait;
2use bytes::Bytes;
3use http::{HeaderMap, Method, StatusCode};
4use md5::{Digest, Md5};
5use parking_lot::Mutex;
6use std::collections::{BTreeMap, HashMap};
7use std::path::PathBuf;
8
9use crate::auth::Principal;
10
11pub type RequestBodyStream = axum::body::Body;
18
19pub struct AwsRequest {
21 pub service: String,
22 pub action: String,
23 pub region: String,
24 pub account_id: String,
25 pub request_id: String,
26 pub headers: HeaderMap,
27 pub query_params: HashMap<String, String>,
28 pub body: Bytes,
31 pub body_stream: Mutex<Option<RequestBodyStream>>,
36 pub path_segments: Vec<String>,
37 pub raw_path: String,
39 pub raw_query: String,
41 pub method: Method,
42 pub is_query_protocol: bool,
44 pub access_key_id: Option<String>,
46 pub principal: Option<Principal>,
53}
54
55impl std::fmt::Debug for AwsRequest {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("AwsRequest")
58 .field("service", &self.service)
59 .field("action", &self.action)
60 .field("region", &self.region)
61 .field("account_id", &self.account_id)
62 .field("request_id", &self.request_id)
63 .field("headers", &self.headers)
64 .field("query_params", &self.query_params)
65 .field("body_len", &self.body.len())
66 .field(
67 "body_stream",
68 &self.body_stream.lock().as_ref().map(|_| "<stream>"),
69 )
70 .field("path_segments", &self.path_segments)
71 .field("raw_path", &self.raw_path)
72 .field("raw_query", &self.raw_query)
73 .field("method", &self.method)
74 .field("is_query_protocol", &self.is_query_protocol)
75 .field("access_key_id", &self.access_key_id)
76 .field("principal", &self.principal)
77 .finish()
78 }
79}
80
81impl AwsRequest {
82 pub fn json_body(&self) -> serde_json::Value {
84 serde_json::from_slice(&self.body).unwrap_or(serde_json::Value::Null)
85 }
86
87 pub fn take_body_stream(&self) -> Option<RequestBodyStream> {
92 self.body_stream.lock().take()
93 }
94}
95
96pub async fn drain_request_stream(stream: RequestBodyStream) -> Result<Bytes, AwsServiceError> {
106 use http_body_util::BodyExt;
107 match stream.collect().await {
108 Ok(c) => Ok(c.to_bytes()),
109 Err(e) => Err(stream_error_to_aws(&e.to_string())),
110 }
111}
112
113fn stream_error_to_aws(msg: &str) -> AwsServiceError {
114 let too_large = msg.to_ascii_lowercase().contains("limit");
119 let (status, code, message) = if too_large {
120 (
121 StatusCode::PAYLOAD_TOO_LARGE,
122 "RequestEntityTooLarge",
123 "Streaming request body exceeded the configured limit",
124 )
125 } else {
126 (
127 StatusCode::BAD_REQUEST,
128 "MalformedRequestBody",
129 "Failed to read streaming request body",
130 )
131 };
132 AwsServiceError::aws_error(status, code, message)
133}
134
135#[derive(Debug)]
146pub struct SpooledBody {
147 pub path: PathBuf,
148 pub size: u64,
149 pub md5_hex: String,
150}
151
152#[derive(Default)]
167pub struct AwsChunkedDecoder {
168 state: ChunkState,
169 line: Vec<u8>,
170 remaining: usize,
171 done: bool,
172}
173
174#[derive(Default, PartialEq)]
175enum ChunkState {
176 #[default]
177 Header,
178 Data,
179 AfterData,
180 Trailer,
181}
182
183#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub struct MalformedChunk;
186
187impl AwsChunkedDecoder {
188 pub fn feed(&mut self, input: &[u8]) -> Result<Vec<u8>, MalformedChunk> {
191 let mut out = Vec::new();
192 let mut i = 0;
193 while i < input.len() && !self.done {
194 match self.state {
195 ChunkState::Data => {
196 let take = self.remaining.min(input.len() - i);
197 out.extend_from_slice(&input[i..i + take]);
198 i += take;
199 self.remaining -= take;
200 if self.remaining == 0 {
201 self.state = ChunkState::AfterData;
202 }
203 }
204 ChunkState::AfterData => {
205 while i < input.len() {
207 let b = input[i];
208 i += 1;
209 if b == b'\n' {
210 self.state = ChunkState::Header;
211 break;
212 }
213 }
214 }
215 ChunkState::Header | ChunkState::Trailer => {
216 let is_header = self.state == ChunkState::Header;
217 while i < input.len() {
218 let b = input[i];
219 i += 1;
220 if b == b'\n' {
221 let line = std::mem::take(&mut self.line);
222 if is_header {
223 let hex_part: &[u8] =
225 line.split(|&c| c == b';').next().unwrap_or(&[]);
226 let hex = std::str::from_utf8(hex_part)
227 .map_err(|_| MalformedChunk)?
228 .trim();
229 let size =
230 usize::from_str_radix(hex, 16).map_err(|_| MalformedChunk)?;
231 if size == 0 {
232 self.state = ChunkState::Trailer;
233 } else {
234 self.remaining = size;
235 self.state = ChunkState::Data;
236 }
237 } else if line.is_empty() {
238 self.done = true;
240 }
241 break;
243 } else if b != b'\r' {
244 self.line.push(b);
245 }
246 }
247 }
248 }
249 }
250 Ok(out)
251 }
252}
253
254pub fn is_aws_chunked(headers: &http::HeaderMap) -> bool {
259 headers
260 .get("content-encoding")
261 .and_then(|v| v.to_str().ok())
262 .is_some_and(|v| {
263 v.split(',')
264 .any(|t| t.trim().eq_ignore_ascii_case("aws-chunked"))
265 })
266 || headers
267 .get("x-amz-content-sha256")
268 .and_then(|v| v.to_str().ok())
269 .is_some_and(|v| v.starts_with("STREAMING-"))
270}
271
272pub fn strip_aws_chunked_encoding(content_encoding: Option<&str>) -> Option<String> {
277 let ce = content_encoding?;
278 let kept: Vec<&str> = ce
279 .split(',')
280 .map(|t| t.trim())
281 .filter(|t| !t.is_empty() && !t.eq_ignore_ascii_case("aws-chunked"))
282 .collect();
283 if kept.is_empty() {
284 None
285 } else {
286 Some(kept.join(", "))
287 }
288}
289
290pub async fn spool_request_stream(
308 stream: RequestBodyStream,
309 dir: Option<&std::path::Path>,
310 aws_chunked: bool,
311) -> Result<SpooledBody, AwsServiceError> {
312 use http_body_util::BodyExt;
313 use tokio::io::AsyncWriteExt;
314
315 let dir = dir.map(|d| d.to_path_buf());
316 if let Some(d) = dir.as_ref() {
317 let _ = tokio::fs::create_dir_all(d).await;
319 }
320
321 let mut builder = tempfile::Builder::new();
322 builder.prefix("fc-spool-");
323 let named = match dir.as_ref() {
324 Some(d) => builder.tempfile_in(d),
325 None => builder.tempfile(),
326 }
327 .map_err(|e| {
328 AwsServiceError::aws_error(
329 StatusCode::INTERNAL_SERVER_ERROR,
330 "InternalError",
331 format!("failed to create spool tempfile: {e}"),
332 )
333 })?;
334
335 let (std_file, temp_path) = named.into_parts();
338 let path: PathBuf = temp_path.keep().map_err(|e| {
341 AwsServiceError::aws_error(
342 StatusCode::INTERNAL_SERVER_ERROR,
343 "InternalError",
344 format!("failed to persist spool tempfile: {e}"),
345 )
346 })?;
347
348 let mut file = tokio::fs::File::from_std(std_file);
349 let mut hasher = Md5::new();
350 let mut size: u64 = 0;
351 let mut body = stream;
352 let mut decoder = aws_chunked.then(AwsChunkedDecoder::default);
353
354 async fn cleanup(file: tokio::fs::File, path: &std::path::Path) {
359 drop(file);
360 let _ = tokio::fs::remove_file(path).await;
361 }
362
363 loop {
364 match body.frame().await {
365 Some(Ok(frame)) => {
366 if let Ok(raw) = frame.into_data() {
367 if !raw.is_empty() {
368 let payload = match decoder.as_mut() {
371 Some(d) => match d.feed(&raw) {
372 Ok(decoded) => decoded,
373 Err(_) => {
374 cleanup(file, &path).await;
375 return Err(AwsServiceError::aws_error(
376 StatusCode::BAD_REQUEST,
377 "InvalidChunkSizeError",
378 "Malformed aws-chunked request body",
379 ));
380 }
381 },
382 None => raw.to_vec(),
383 };
384 if !payload.is_empty() {
385 hasher.update(&payload);
386 size += payload.len() as u64;
387 if let Err(e) = file.write_all(&payload).await {
388 cleanup(file, &path).await;
389 return Err(AwsServiceError::aws_error(
390 StatusCode::INTERNAL_SERVER_ERROR,
391 "InternalError",
392 format!("failed to spool request body: {e}"),
393 ));
394 }
395 }
396 }
397 }
398 }
401 Some(Err(e)) => {
402 cleanup(file, &path).await;
403 return Err(stream_error_to_aws(&e.to_string()));
404 }
405 None => break,
406 }
407 }
408
409 if let Err(e) = file.flush().await {
410 cleanup(file, &path).await;
411 return Err(AwsServiceError::aws_error(
412 StatusCode::INTERNAL_SERVER_ERROR,
413 "InternalError",
414 format!("failed to flush spool tempfile: {e}"),
415 ));
416 }
417 drop(file);
418
419 let md5_hex = hex_lower(&hasher.finalize());
420 Ok(SpooledBody {
421 path,
422 size,
423 md5_hex,
424 })
425}
426
427fn hex_lower(bytes: &[u8]) -> String {
428 const HEX: &[u8] = b"0123456789abcdef";
429 let mut out = String::with_capacity(bytes.len() * 2);
430 for b in bytes {
431 out.push(HEX[(b >> 4) as usize] as char);
432 out.push(HEX[(b & 0x0f) as usize] as char);
433 }
434 out
435}
436
437#[derive(Debug)]
446pub enum ResponseBody {
447 Bytes(Bytes),
448 File { file: tokio::fs::File, size: u64 },
449}
450
451impl ResponseBody {
452 pub fn len(&self) -> u64 {
453 match self {
454 ResponseBody::Bytes(b) => b.len() as u64,
455 ResponseBody::File { size, .. } => *size,
456 }
457 }
458
459 pub fn is_empty(&self) -> bool {
460 self.len() == 0
461 }
462
463 pub fn expect_bytes(&self) -> &[u8] {
467 match self {
468 ResponseBody::Bytes(b) => b,
469 ResponseBody::File { .. } => {
470 panic!("expect_bytes called on ResponseBody::File")
471 }
472 }
473 }
474}
475
476impl Default for ResponseBody {
477 fn default() -> Self {
478 ResponseBody::Bytes(Bytes::new())
479 }
480}
481
482impl From<Bytes> for ResponseBody {
483 fn from(b: Bytes) -> Self {
484 ResponseBody::Bytes(b)
485 }
486}
487
488impl From<Vec<u8>> for ResponseBody {
489 fn from(v: Vec<u8>) -> Self {
490 ResponseBody::Bytes(Bytes::from(v))
491 }
492}
493
494impl From<&'static [u8]> for ResponseBody {
495 fn from(s: &'static [u8]) -> Self {
496 ResponseBody::Bytes(Bytes::from_static(s))
497 }
498}
499
500impl From<String> for ResponseBody {
501 fn from(s: String) -> Self {
502 ResponseBody::Bytes(Bytes::from(s))
503 }
504}
505
506impl From<&'static str> for ResponseBody {
507 fn from(s: &'static str) -> Self {
508 ResponseBody::Bytes(Bytes::from_static(s.as_bytes()))
509 }
510}
511
512impl PartialEq<Bytes> for ResponseBody {
513 fn eq(&self, other: &Bytes) -> bool {
514 match self {
515 ResponseBody::Bytes(b) => b == other,
516 ResponseBody::File { .. } => false,
517 }
518 }
519}
520
521pub struct AwsResponse {
523 pub status: StatusCode,
524 pub content_type: String,
525 pub body: ResponseBody,
526 pub headers: HeaderMap,
527}
528
529impl AwsResponse {
530 pub fn xml(status: StatusCode, body: impl Into<Bytes>) -> Self {
531 Self {
532 status,
533 content_type: "text/xml".to_string(),
534 body: ResponseBody::Bytes(body.into()),
535 headers: HeaderMap::new(),
536 }
537 }
538
539 pub fn json(status: StatusCode, body: impl Into<Bytes>) -> Self {
540 Self {
541 status,
542 content_type: "application/x-amz-json-1.1".to_string(),
543 body: ResponseBody::Bytes(body.into()),
544 headers: HeaderMap::new(),
545 }
546 }
547
548 pub fn json_value(status: StatusCode, value: serde_json::Value) -> Self {
554 Self::json(
555 status,
556 serde_json::to_vec(&value).expect("serde_json::Value serialization is infallible"),
557 )
558 }
559
560 pub fn ok_json(value: serde_json::Value) -> Self {
562 Self::json_value(StatusCode::OK, value)
563 }
564}
565
566#[derive(Debug, thiserror::Error)]
568pub enum AwsServiceError {
569 #[error("service not found: {service}")]
570 ServiceNotFound { service: String },
571
572 #[error("action {action} not implemented for service {service}")]
573 ActionNotImplemented { service: String, action: String },
574
575 #[error("{code}: {message}")]
576 AwsError {
577 status: StatusCode,
578 code: String,
579 message: String,
580 extra_fields: Vec<(String, String)>,
582 headers: Vec<(String, String)>,
584 },
585}
586
587impl AwsServiceError {
588 pub fn action_not_implemented(service: &str, action: &str) -> Self {
589 Self::ActionNotImplemented {
590 service: service.to_string(),
591 action: action.to_string(),
592 }
593 }
594
595 pub fn aws_error(
596 status: StatusCode,
597 code: impl Into<String>,
598 message: impl Into<String>,
599 ) -> Self {
600 Self::AwsError {
601 status,
602 code: code.into(),
603 message: message.into(),
604 extra_fields: Vec::new(),
605 headers: Vec::new(),
606 }
607 }
608
609 pub fn aws_error_with_fields(
610 status: StatusCode,
611 code: impl Into<String>,
612 message: impl Into<String>,
613 extra_fields: Vec<(String, String)>,
614 ) -> Self {
615 Self::AwsError {
616 status,
617 code: code.into(),
618 message: message.into(),
619 extra_fields,
620 headers: Vec::new(),
621 }
622 }
623
624 pub fn aws_error_with_headers(
625 status: StatusCode,
626 code: impl Into<String>,
627 message: impl Into<String>,
628 headers: Vec<(String, String)>,
629 ) -> Self {
630 Self::AwsError {
631 status,
632 code: code.into(),
633 message: message.into(),
634 extra_fields: Vec::new(),
635 headers,
636 }
637 }
638
639 pub fn extra_fields(&self) -> &[(String, String)] {
640 match self {
641 Self::AwsError { extra_fields, .. } => extra_fields,
642 _ => &[],
643 }
644 }
645
646 pub fn status(&self) -> StatusCode {
647 match self {
648 Self::ServiceNotFound { .. } => StatusCode::BAD_REQUEST,
649 Self::ActionNotImplemented { .. } => StatusCode::NOT_IMPLEMENTED,
650 Self::AwsError { status, .. } => *status,
651 }
652 }
653
654 pub fn code(&self) -> &str {
655 match self {
656 Self::ServiceNotFound { .. } => "UnknownService",
657 Self::ActionNotImplemented { .. } => "InvalidAction",
658 Self::AwsError { code, .. } => code,
659 }
660 }
661
662 pub fn message(&self) -> String {
663 match self {
664 Self::ServiceNotFound { service } => format!("service not found: {service}"),
665 Self::ActionNotImplemented { service, action } => {
666 format!("action {action} not implemented for service {service}")
667 }
668 Self::AwsError { message, .. } => message.clone(),
669 }
670 }
671
672 pub fn response_headers(&self) -> &[(String, String)] {
673 match self {
674 Self::AwsError { headers, .. } => headers,
675 _ => &[],
676 }
677 }
678}
679
680#[async_trait]
682pub trait AwsService: Send + Sync {
683 fn service_name(&self) -> &str;
685
686 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError>;
688
689 fn supported_actions(&self) -> &[&str];
691
692 fn iam_enforceable(&self) -> bool {
707 false
708 }
709
710 fn iam_action_for(&self, _request: &AwsRequest) -> Option<crate::auth::IamAction> {
723 None
724 }
725
726 fn iam_condition_keys_for(
746 &self,
747 _request: &AwsRequest,
748 _action: &crate::auth::IamAction,
749 ) -> BTreeMap<String, Vec<String>> {
750 BTreeMap::new()
751 }
752
753 fn resource_tags_for(
765 &self,
766 _resource_arn: &str,
767 ) -> Option<std::collections::HashMap<String, String>> {
768 None
769 }
770
771 fn request_tags_from(
781 &self,
782 _request: &AwsRequest,
783 _action: &str,
784 ) -> Option<std::collections::HashMap<String, String>> {
785 None
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use crate::auth::IamAction;
793 use async_trait::async_trait;
794
795 fn aws_chunked_body(payload: &[u8], chunk_size: usize, with_trailer: bool) -> Vec<u8> {
798 let sig = "0".repeat(64);
799 let mut out = Vec::new();
800 for c in payload.chunks(chunk_size.max(1)) {
801 out.extend_from_slice(format!("{:x};chunk-signature={sig}\r\n", c.len()).as_bytes());
802 out.extend_from_slice(c);
803 out.extend_from_slice(b"\r\n");
804 }
805 out.extend_from_slice(format!("0;chunk-signature={sig}\r\n").as_bytes());
806 if with_trailer {
807 out.extend_from_slice(b"x-amz-checksum-crc32:AAAAAA==\r\n");
808 }
809 out.extend_from_slice(b"\r\n");
810 out
811 }
812
813 fn decode_all(body: &[u8], feed_size: usize) -> Vec<u8> {
814 let mut d = AwsChunkedDecoder::default();
815 let mut out = Vec::new();
816 for frame in body.chunks(feed_size.max(1)) {
817 out.extend(d.feed(frame).expect("valid chunked body"));
818 }
819 out
820 }
821
822 #[test]
823 fn aws_chunked_decoder_roundtrips_across_frame_boundaries() {
824 let payload: Vec<u8> = (0..5000u32).map(|i| (i % 251) as u8).collect();
825 for with_trailer in [false, true] {
827 let body = aws_chunked_body(&payload, 1024, with_trailer);
828 for feed in [1usize, 7, 64, 1000, body.len()] {
831 let decoded = decode_all(&body, feed);
832 assert_eq!(decoded, payload, "feed={feed} trailer={with_trailer}");
833 }
834 }
835 }
836
837 #[test]
838 fn aws_chunked_decoder_handles_empty_payload() {
839 let body = aws_chunked_body(b"", 1024, false);
840 assert_eq!(decode_all(&body, 3), Vec::<u8>::new());
841 }
842
843 #[test]
844 fn aws_chunked_decoder_rejects_bad_size_line() {
845 let mut d = AwsChunkedDecoder::default();
846 assert!(d.feed(b"zz;chunk-signature=x\r\n").is_err());
847 }
848
849 #[test]
850 fn is_aws_chunked_detects_streaming_markers() {
851 let mut h = http::HeaderMap::new();
852 assert!(!is_aws_chunked(&h));
853 h.insert("content-encoding", "aws-chunked".parse().unwrap());
854 assert!(is_aws_chunked(&h));
855 let mut h2 = http::HeaderMap::new();
856 h2.insert(
857 "x-amz-content-sha256",
858 "STREAMING-AWS4-HMAC-SHA256-PAYLOAD".parse().unwrap(),
859 );
860 assert!(is_aws_chunked(&h2));
861 let mut h3 = http::HeaderMap::new();
863 h3.insert("content-encoding", "gzip".parse().unwrap());
864 assert!(!is_aws_chunked(&h3));
865 }
866
867 #[test]
868 fn strip_aws_chunked_keeps_real_encoding() {
869 assert_eq!(strip_aws_chunked_encoding(Some("aws-chunked")), None);
870 assert_eq!(
871 strip_aws_chunked_encoding(Some("aws-chunked, gzip")).as_deref(),
872 Some("gzip")
873 );
874 assert_eq!(
875 strip_aws_chunked_encoding(Some("gzip")).as_deref(),
876 Some("gzip")
877 );
878 assert_eq!(strip_aws_chunked_encoding(None), None);
879 }
880
881 struct DefaultService;
882
883 #[async_trait]
884 impl AwsService for DefaultService {
885 fn service_name(&self) -> &str {
886 "default"
887 }
888 async fn handle(&self, _request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
889 unreachable!()
890 }
891 fn supported_actions(&self) -> &[&str] {
892 &[]
893 }
894 }
895
896 struct PopulatedService;
897
898 #[async_trait]
899 impl AwsService for PopulatedService {
900 fn service_name(&self) -> &str {
901 "populated"
902 }
903 async fn handle(&self, _request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
904 unreachable!()
905 }
906 fn supported_actions(&self) -> &[&str] {
907 &[]
908 }
909 fn iam_condition_keys_for(
910 &self,
911 _request: &AwsRequest,
912 _action: &IamAction,
913 ) -> BTreeMap<String, Vec<String>> {
914 let mut m = BTreeMap::new();
915 m.insert("s3:prefix".to_string(), vec!["logs/".to_string()]);
916 m
917 }
918 }
919
920 fn sample_request() -> AwsRequest {
921 AwsRequest {
922 service: "default".into(),
923 action: "Noop".into(),
924 region: "us-east-1".into(),
925 account_id: "123456789012".into(),
926 request_id: "req-1".into(),
927 headers: HeaderMap::new(),
928 query_params: HashMap::new(),
929 body: Bytes::new(),
930 body_stream: parking_lot::Mutex::new(None),
931 path_segments: vec![],
932 raw_path: "/".into(),
933 raw_query: String::new(),
934 method: Method::GET,
935 is_query_protocol: false,
936 access_key_id: None,
937 principal: None,
938 }
939 }
940
941 fn sample_action() -> IamAction {
942 IamAction {
943 service: "s3",
944 action: "ListBucket",
945 resource: "arn:aws:s3:::my-bucket".to_string(),
946 }
947 }
948
949 #[test]
950 fn iam_condition_keys_for_default_is_empty() {
951 let svc = DefaultService;
952 let keys = svc.iam_condition_keys_for(&sample_request(), &sample_action());
953 assert!(keys.is_empty());
954 }
955
956 #[test]
957 fn iam_condition_keys_for_override_returns_map() {
958 let svc = PopulatedService;
959 let keys = svc.iam_condition_keys_for(&sample_request(), &sample_action());
960 assert_eq!(keys.get("s3:prefix"), Some(&vec!["logs/".to_string()]));
961 }
962
963 #[test]
964 fn response_body_len_and_is_empty_for_bytes() {
965 let body: ResponseBody = Bytes::from_static(b"hello").into();
966 assert_eq!(body.len(), 5);
967 assert!(!body.is_empty());
968 let empty: ResponseBody = ResponseBody::default();
969 assert!(empty.is_empty());
970 }
971
972 #[test]
973 fn response_body_from_vec_and_string_and_str() {
974 let from_vec: ResponseBody = vec![1u8, 2, 3].into();
975 assert_eq!(from_vec.expect_bytes(), &[1, 2, 3][..]);
976 let from_string: ResponseBody = String::from("hi").into();
977 assert_eq!(from_string.expect_bytes(), b"hi");
978 let from_str: ResponseBody = "hey".into();
979 assert_eq!(from_str.expect_bytes(), b"hey");
980 let from_static: ResponseBody = (b"123" as &'static [u8]).into();
981 assert_eq!(from_static.expect_bytes(), b"123");
982 }
983
984 #[test]
985 fn response_body_partial_eq_bytes() {
986 let body: ResponseBody = Bytes::from_static(b"x").into();
987 assert!(body == Bytes::from_static(b"x"));
988 assert!(!(body == Bytes::from_static(b"y")));
989 }
990
991 #[test]
992 fn aws_request_json_body_empty_returns_null() {
993 let req = sample_request();
994 assert_eq!(req.json_body(), serde_json::Value::Null);
995 }
996
997 #[test]
998 fn aws_request_json_body_parses_valid() {
999 let mut req = sample_request();
1000 req.body = Bytes::from_static(br#"{"a":1}"#);
1001 assert_eq!(req.json_body(), serde_json::json!({"a": 1}));
1002 }
1003
1004 #[test]
1005 fn aws_response_xml_constructor() {
1006 let resp = AwsResponse::xml(StatusCode::OK, Bytes::from_static(b"<ok/>"));
1007 assert_eq!(resp.status, StatusCode::OK);
1008 assert_eq!(resp.content_type, "text/xml");
1009 }
1010
1011 #[test]
1012 fn aws_response_json_constructor() {
1013 let resp = AwsResponse::json(StatusCode::CREATED, "{}");
1014 assert_eq!(resp.status, StatusCode::CREATED);
1015 assert_eq!(resp.content_type, "application/x-amz-json-1.1");
1016 }
1017
1018 #[test]
1019 fn aws_response_ok_json_helper() {
1020 let resp = AwsResponse::ok_json(serde_json::json!({"ok": true}));
1021 assert_eq!(resp.status, StatusCode::OK);
1022 assert!(resp.body.expect_bytes().starts_with(b"{"));
1023 }
1024
1025 #[test]
1026 fn aws_error_service_not_found_fields() {
1027 let err = AwsServiceError::ServiceNotFound {
1028 service: "sqs".to_string(),
1029 };
1030 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1031 assert_eq!(err.code(), "UnknownService");
1032 assert!(err.message().contains("sqs"));
1033 assert!(err.extra_fields().is_empty());
1034 assert!(err.response_headers().is_empty());
1035 }
1036
1037 #[test]
1038 fn aws_error_action_not_implemented_fields() {
1039 let err = AwsServiceError::action_not_implemented("sns", "FutureAction");
1040 assert_eq!(err.status(), StatusCode::NOT_IMPLEMENTED);
1041 assert_eq!(err.code(), "InvalidAction");
1042 assert!(err.message().contains("FutureAction"));
1043 assert!(err.message().contains("sns"));
1044 }
1045
1046 #[test]
1047 fn aws_error_aws_error_helpers() {
1048 let e = AwsServiceError::aws_error(StatusCode::FORBIDDEN, "Denied", "no");
1049 assert_eq!(e.status(), StatusCode::FORBIDDEN);
1050 assert_eq!(e.code(), "Denied");
1051 assert_eq!(e.message(), "no");
1052
1053 let fields = vec![("Bucket".to_string(), "b".to_string())];
1054 let ef = AwsServiceError::aws_error_with_fields(
1055 StatusCode::NOT_FOUND,
1056 "Missing",
1057 "gone",
1058 fields.clone(),
1059 );
1060 assert_eq!(ef.extra_fields(), fields.as_slice());
1061
1062 let hdrs = vec![("X-Retry".to_string(), "1".to_string())];
1063 let eh = AwsServiceError::aws_error_with_headers(
1064 StatusCode::TOO_MANY_REQUESTS,
1065 "Throttled",
1066 "slow",
1067 hdrs.clone(),
1068 );
1069 assert_eq!(eh.response_headers(), hdrs.as_slice());
1070 }
1071
1072 #[test]
1073 #[should_panic(expected = "expect_bytes called on ResponseBody::File")]
1074 fn response_body_expect_bytes_panics_on_file() {
1075 let f = std::fs::File::create(std::env::temp_dir().join("fc-test-expect-file")).unwrap();
1076 let async_f = tokio::fs::File::from_std(f);
1077 let body = ResponseBody::File {
1078 file: async_f,
1079 size: 0,
1080 };
1081 let _ = body.expect_bytes();
1082 }
1083}