Skip to main content

s3s/ops/
mod.rs

//! Internal S3 operation dispatch, HTTP serialization, and deserialization.
//!
//! This module converts incoming HTTP requests into typed operation inputs,
//! invokes the user-provided [`S3`](crate::S3) implementation, and converts
//! the resulting outputs or errors back into HTTP responses.
6
7cfg_if::cfg_if! {
8    if #[cfg(feature = "minio")] {
9        mod generated_minio;
10        use self::generated_minio as generated;
11    } else {
12        mod generated;
13    }
14}
15
16pub use self::generated::*;
17
18mod signature;
19use self::signature::SignatureContext;
20
21mod get_object;
22mod multipart;
23
24#[cfg(test)]
25mod tests;
26
27use crate::access::{S3Access, S3AccessContext};
28use crate::auth::{Credentials, S3Auth};
29use crate::config::S3ConfigProvider;
30use crate::error::*;
31use crate::header;
32use crate::host::S3Host;
33use crate::http::Body;
34use crate::http::{self, BodySizeLimitExceeded};
35use crate::http::{OrderedHeaders, OrderedQs};
36use crate::http::{Request, Response};
37use crate::path::{ParseS3PathError, S3Path};
38use crate::post_policy::PostPolicy;
39use crate::protocol::S3Request;
40use crate::route::S3Route;
41use crate::s3_trait::S3;
42use crate::validation::{AwsNameValidation, NameValidation};
43
44use std::mem;
45use std::net::{IpAddr, SocketAddr};
46use std::ops::Not;
47use std::sync::Arc;
48
49use bytes::Bytes;
50use hyper::HeaderMap;
51use hyper::Method;
52use hyper::StatusCode;
53use hyper::Uri;
54use mime::Mime;
55use tracing::{debug, error};
56
57#[async_trait::async_trait]
58pub trait Operation: Send + Sync + 'static {
59    fn name(&self) -> &'static str;
60
61    async fn call(&self, ccx: &CallContext<'_>, req: &mut Request) -> S3Result<Response>;
62}
63
64pub struct CallContext<'a> {
65    pub s3: &'a Arc<dyn S3>,
66    pub config: &'a Arc<dyn S3ConfigProvider>,
67    pub host: Option<&'a dyn S3Host>,
68    pub auth: Option<&'a dyn S3Auth>,
69    pub access: Option<&'a dyn S3Access>,
70    pub route: Option<&'a dyn S3Route>,
71    pub validation: Option<&'a dyn NameValidation>,
72}
73
74fn build_s3_request<T>(input: T, req: &mut Request) -> S3Request<T> {
75    let method = req.method.clone();
76    let uri = mem::take(&mut req.uri);
77    let headers = mem::take(&mut req.headers);
78    let extensions = mem::take(&mut req.extensions);
79    let credentials = req.s3ext.credentials.take();
80    let region = req.s3ext.region.take();
81    let service = req.s3ext.service.take();
82    let trailing_headers = req.s3ext.trailing_headers.take();
83
84    S3Request {
85        input,
86        method,
87        uri,
88        headers,
89        extensions,
90        credentials,
91        region,
92        service,
93        trailing_headers,
94    }
95}
96
97pub(crate) fn serialize_error(mut e: S3Error, no_decl: bool) -> S3Result<Response> {
98    let status = e.status_code().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
99    let mut res = Response::with_status(status);
100    if no_decl {
101        http::set_xml_body_no_decl(&mut res, &e)?;
102    } else {
103        http::set_xml_body(&mut res, &e)?;
104    }
105    if let Some(headers) = e.take_headers() {
106        res.headers = headers;
107    }
108    drop(e);
109    Ok(res)
110}
111
112fn unknown_operation() -> S3Error {
113    S3Error::with_message(S3ErrorCode::NotImplemented, "Unknown operation")
114}
115
116fn extract_http2_authority(req: &Request) -> Option<&str> {
117    if matches!(req.version, ::http::Version::HTTP_2 | ::http::Version::HTTP_3)
118        && let Some(authority) = req.uri.authority()
119    {
120        return Some(authority.as_str());
121    }
122    None
123}
124
125fn extract_host(req: &Request) -> S3Result<Option<String>> {
126    // First try to get from Host header.
127    if let Some(val) = req.headers.get(crate::header::HOST) {
128        let on_err = |e| s3_error!(e, InvalidRequest, "invalid header: Host: {val:?}");
129        let host = val.to_str().map_err(on_err)?;
130        return Ok(Some(host.into()));
131    }
132
133    // For HTTP/2 and HTTP/3, the Host header is replaced by :authority pseudo-header.
134    // https://github.com/hyperium/hyper/discussions/2435
135    if let Some(authority) = extract_http2_authority(req) {
136        return Ok(Some(authority.into()));
137    }
138
139    Ok(None)
140}
141
/// Reports whether `host` is an IP literal rather than a domain name.
///
/// Accepted forms:
/// - a socket address (`127.0.0.1:9000`, `[::1]:9000`),
/// - a bare IP address (`127.0.0.1`, `::1`),
/// - a bracketed IPv6 literal without a port (`[::1]`), which is how an
///   IPv6 host appears in a `Host` header on the default port.
///
/// Everything else (including `localhost` and bracketed IPv4) is `false`.
fn is_socket_addr_or_ip_addr(host: &str) -> bool {
    if host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok() {
        return true;
    }

    // Neither `SocketAddr` nor `IpAddr` parses "[v6]" without a port, so
    // unwrap the brackets and parse the inner part as IPv6 only.
    host.strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .is_some_and(|inner| inner.parse::<std::net::Ipv6Addr>().is_ok())
}
145
146fn convert_parse_s3_path_error(err: &ParseS3PathError) -> S3Error {
147    match err {
148        ParseS3PathError::InvalidPath => s3_error!(InvalidURI),
149        ParseS3PathError::InvalidBucketName => s3_error!(InvalidBucketName),
150        ParseS3PathError::KeyTooLong => s3_error!(KeyTooLongError),
151    }
152}
153
154fn extract_qs(req_uri: &Uri) -> S3Result<Option<OrderedQs>> {
155    let Some(query) = req_uri.query() else { return Ok(None) };
156    match OrderedQs::parse(query) {
157        Ok(ans) => Ok(Some(ans)),
158        Err(source) => Err(S3Error::with_source(S3ErrorCode::InvalidURI, Box::new(source))),
159    }
160}
161
162fn check_query_pattern(qs: &OrderedQs, name: &str, val: &str) -> bool {
163    match qs.get_unique(name) {
164        Some(v) => v == val,
165        None => false,
166    }
167}
168
169fn extract_headers(headers: &HeaderMap) -> S3Result<OrderedHeaders<'_>> {
170    OrderedHeaders::from_headers(headers).map_err(|source| invalid_request!(source, "invalid headers"))
171}
172
173fn extract_mime(hs: &OrderedHeaders<'_>) -> Option<Mime> {
174    let content_type = hs.get_unique(crate::header::CONTENT_TYPE)?;
175
176    // https://github.com/s3s-project/s3s/issues/361
177    if content_type.is_empty() {
178        return None;
179    }
180
181    content_type.parse::<Mime>().ok()
182}
183
184fn extract_content_length(req: &Request) -> Option<u64> {
185    req.headers
186        .get(hyper::header::CONTENT_LENGTH)
187        .and_then(|val| atoi::atoi::<u64>(val.as_bytes()))
188}
189
190fn extract_decoded_content_length(hs: &'_ OrderedHeaders<'_>) -> S3Result<Option<usize>> {
191    let Some(val) = hs.get_unique(crate::header::X_AMZ_DECODED_CONTENT_LENGTH) else { return Ok(None) };
192    match atoi::atoi::<usize>(val.as_bytes()) {
193        Some(x) => Ok(Some(x)),
194        None => Err(invalid_request!("invalid header: x-amz-decoded-content-length")),
195    }
196}
197
198async fn extract_full_body(content_length: Option<u64>, body: &mut Body, max_body_size: usize) -> S3Result<Bytes> {
199    if let Some(bytes) = body.bytes() {
200        return Ok(bytes);
201    }
202
203    let bytes = body.store_all_limited(max_body_size).await.map_err(|e| {
204        if e.is::<BodySizeLimitExceeded>() {
205            S3Error::with_source(S3ErrorCode::MaxMessageLengthExceeded, e)
206        } else {
207            S3Error::with_source(S3ErrorCode::InternalError, e)
208        }
209    })?;
210
211    if bytes.is_empty().not() {
212        let content_length = content_length.ok_or(S3ErrorCode::MissingContentLength)?;
213        if bytes.len() as u64 != content_length {
214            return Err(s3_error!(IncompleteBody));
215        }
216    }
217
218    Ok(bytes)
219}
220
221#[allow(clippy::declare_interior_mutable_const)]
222fn fmt_content_length(len: usize) -> http::HeaderValue {
223    const ZERO: http::HeaderValue = http::HeaderValue::from_static("0");
224    if len > 0 {
225        crate::utils::format::fmt_usize(len, |s| http::HeaderValue::try_from(s).unwrap())
226    } else {
227        ZERO
228    }
229}
230
231pub async fn call(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Response> {
232    let prep = match prepare(req, ccx).await {
233        Ok(op) => op,
234        Err(err) => {
235            error!(?err, "failed to prepare");
236            return serialize_error(err, false);
237        }
238    };
239
240    match prep {
241        Prepare::S3(op) => {
242            match op.call(ccx, req).await {
243                Ok(resp) => {
244                    Ok(resp) //
245                }
246                Err(err) => {
247                    error!(op = %op.name(), ?err, "op returns error");
248                    serialize_error(err, false)
249                }
250            }
251        }
252        Prepare::CustomRoute => {
253            let body = mem::take(&mut req.body);
254            let mut s3_req = build_s3_request(body, req);
255            let route = ccx.route.unwrap();
256
257            let result = async {
258                route.check_access(&mut s3_req).await?;
259                route.call(s3_req).await
260            }
261            .await;
262
263            match result {
264                Ok(s3_resp) => Ok(Response {
265                    status: s3_resp.status.unwrap_or_default(),
266                    headers: s3_resp.headers,
267                    body: s3_resp.output,
268                    extensions: s3_resp.extensions,
269                }),
270                Err(err) => {
271                    error!(?err, "custom route returns error");
272                    serialize_error(err, false)
273                }
274            }
275        }
276    }
277}
278
279enum Prepare {
280    S3(&'static dyn Operation),
281    CustomRoute,
282}
283
284#[allow(clippy::too_many_lines)]
285#[tracing::instrument(level = "debug", skip_all, err)]
286async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Prepare> {
287    let s3_path;
288    let mut content_length;
289    {
290        let decoded_uri_path = urlencoding::decode(req.uri.path())
291            .map_err(|_| S3ErrorCode::InvalidURI)?
292            .into_owned();
293
294        let host_header = extract_host(req)?;
295        let vh;
296        let vh_bucket;
297        let vh_region;
298        {
299            let default_validation = &const { AwsNameValidation::new() };
300            let validation = ccx.validation.unwrap_or(default_validation);
301
302            let result = 'parse: {
303                if let (Some(host_header), Some(s3_host)) = (host_header.as_deref(), ccx.host)
304                    && !is_socket_addr_or_ip_addr(host_header)
305                {
306                    debug!(?host_header, ?decoded_uri_path, "parsing virtual-hosted-style request");
307
308                    vh = s3_host.parse_host_header(host_header)?;
309                    debug!(?vh);
310
311                    vh_bucket = vh.bucket();
312                    vh_region = vh.region().map(str::to_owned);
313                    break 'parse crate::path::parse_virtual_hosted_style_with_validation(
314                        vh_bucket,
315                        &decoded_uri_path,
316                        validation,
317                    );
318                }
319
320                debug!(?decoded_uri_path, "parsing path-style request");
321                vh_bucket = None;
322                vh_region = None;
323                crate::path::parse_path_style_with_validation(&decoded_uri_path, validation)
324            };
325
326            req.s3ext.s3_path = Some(result.map_err(|err| convert_parse_s3_path_error(&err))?);
327            s3_path = req.s3ext.s3_path.as_ref().unwrap();
328        }
329
330        req.s3ext.qs = extract_qs(&req.uri)?;
331        content_length = extract_content_length(req);
332
333        let hs = extract_headers(&req.headers)?;
334        let mime = extract_mime(&hs);
335        let decoded_content_length = extract_decoded_content_length(&hs)?;
336
337        let body_changed;
338        let transformed_body;
339        {
340            let mut scx = SignatureContext {
341                auth: ccx.auth,
342                config: ccx.config,
343
344                req_version: req.version,
345                req_method: &req.method,
346                req_uri: &req.uri,
347                req_body: &mut req.body,
348
349                qs: req.s3ext.qs.as_ref(),
350                hs,
351
352                decoded_uri_path,
353                vh_bucket,
354
355                content_length,
356                decoded_content_length,
357                mime,
358
359                multipart: None,
360                transformed_body: None,
361                trailing_headers: None,
362            };
363
364            let credentials = scx.check().await?;
365
366            body_changed = scx.transformed_body.is_some() || scx.multipart.is_some();
367            transformed_body = scx.transformed_body;
368
369            req.s3ext.multipart = scx.multipart;
370            req.s3ext.trailing_headers = scx.trailing_headers;
371
372            match credentials {
373                Some(cred) => {
374                    req.s3ext.credentials = Some(Credentials {
375                        access_key: cred.access_key,
376                        secret_key: cred.secret_key,
377                    });
378
379                    let cred_region = cred
380                        .region
381                        .filter(|s| !s.is_empty())
382                        .map(|s| crate::region::Region::new(s.into()))
383                        .transpose()
384                        .map_err(|e| invalid_request!("invalid credential region: {e}"))?;
385
386                    // When both the signature credential and S3Host supply a region,
387                    // the credential region is authoritative (it was verified by the
388                    // signature check). Log a debug warning if they disagree so that
389                    // misconfigured clients or hosts are visible in traces.
390                    if let (Some(cred_region), Some(host_region)) = (&cred_region, &vh_region)
391                        && cred_region.as_str() != host_region.as_str()
392                    {
393                        debug!(
394                            cred_region = %cred_region,
395                            host_region = %host_region,
396                            "credential region and virtual-host region differ; \
397                             using credential region"
398                        );
399                    }
400
401                    req.s3ext.region = cred_region;
402                    req.s3ext.service = cred.service;
403                }
404                None => {
405                    req.s3ext.credentials = None;
406                    req.s3ext.region = None;
407                    req.s3ext.service = None;
408                }
409            }
410
411            // Fallback: if no region was determined from the signature credential
412            // (anonymous requests, SigV2), use the region provided by S3Host.
413            if req.s3ext.region.is_none() {
414                req.s3ext.region = vh_region
415                    .filter(|s| !s.is_empty())
416                    .map(|s| crate::region::Region::new(s.into()))
417                    .transpose()
418                    .map_err(|e| invalid_request!("invalid host region: {e}"))?;
419            }
420        }
421
422        if body_changed {
423            // invalidate the original content length
424            if let Some(val) = req.headers.get_mut(header::CONTENT_LENGTH) {
425                *val = fmt_content_length(decoded_content_length.unwrap_or(0));
426            }
427            if let Some(val) = &mut content_length {
428                *val = 0;
429            }
430        }
431        if let Some(body) = transformed_body {
432            req.body = body;
433        }
434
435        let has_multipart = req.s3ext.multipart.is_some();
436        debug!(?body_changed, ?decoded_content_length, ?has_multipart);
437    }
438
439    if let Some(route) = ccx.route
440        && route.is_match(&req.method, &req.uri, &req.headers, &mut req.extensions)
441    {
442        return Ok(Prepare::CustomRoute);
443    }
444
445    let (op, needs_full_body) = 'resolve: {
446        if let Some(multipart) = &mut req.s3ext.multipart
447            && req.method == Method::POST
448        {
449            match s3_path {
450                S3Path::Root => return Err(unknown_operation()),
451                S3Path::Bucket { bucket } => {
452                    // POST object
453                    debug!(?multipart);
454
455                    // Parse POST policy BEFORE reading file stream to prevent resource exhaustion
456                    // See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html
457                    let now = time::OffsetDateTime::now_utc();
458                    let policy = if let Some(policy_b64) = multipart.find_field_value("policy") {
459                        let policy = PostPolicy::from_base64(policy_b64)
460                            .map_err(|e| s3_error!(e, InvalidPolicyDocument, "failed to parse POST policy"))?;
461
462                        // Check policy expiration early to avoid reading file if policy is expired
463                        // Note: clone is necessary because Into<OffsetDateTime> consumes the Timestamp
464                        let expiration_time: time::OffsetDateTime = policy.expiration.clone().into();
465                        if now >= expiration_time {
466                            return Err(S3Error::with_message(S3ErrorCode::AccessDenied, "Request has expired"));
467                        }
468
469                        Some(policy)
470                    } else {
471                        None
472                    };
473
474                    // Determine file size limit: use stricter of policy max or config max
475                    let config = ccx.config.snapshot();
476                    let max_file_size = if let Some(ref pol) = policy {
477                        if let Some((_, max)) = pol.content_length_range() {
478                            // Use the minimum of policy max and config max to prevent resource exhaustion
479                            // Note: policy min is validated later in policy.validate()
480                            std::cmp::min(max, config.post_object_max_file_size)
481                        } else {
482                            config.post_object_max_file_size
483                        }
484                    } else {
485                        config.post_object_max_file_size
486                    };
487
488                    // Aggregate file stream with size limit to get known length
489                    // This is required because downstream handlers (like s3s-proxy) need content-length
490                    let file_stream = multipart.take_file_stream().expect("missing file stream");
491                    let vec_bytes = http::aggregate_file_stream_limited(file_stream, max_file_size)
492                        .await
493                        .map_err(|e| match e {
494                            http::MultipartError::FileTooLarge(..) => {
495                                s3_error!(EntityTooLarge, "Your proposed upload exceeds the maximum allowed object size.")
496                            }
497                            other => invalid_request!(other, "failed to read file stream"),
498                        })?;
499                    // Use saturating_add to prevent overflow in release builds (security-relevant for content-length-range validation)
500                    let file_size: u64 = vec_bytes.iter().map(|b| b.len() as u64).fold(0u64, u64::saturating_add);
501                    let vec_stream = crate::stream::VecByteStream::new(vec_bytes);
502                    req.s3ext.vec_stream = Some(vec_stream);
503
504                    // Validate the policy conditions (if policy exists)
505                    // Note: expiration was already checked above before reading the file
506                    // Pass the URL bucket so that the "bucket" condition can be validated
507                    // even when clients (like boto3) don't include it in form fields.
508                    if let Some(policy) = policy {
509                        policy.validate_conditions_only(multipart, file_size, Some(bucket))?;
510                        req.s3ext.post_policy = Some(policy);
511                    }
512
513                    break 'resolve (&PostObject as &'static dyn Operation, false);
514                }
515                // FIXME: POST /bucket/key hits this branch
516                S3Path::Object { .. } => return Err(s3_error!(MethodNotAllowed)),
517            }
518        }
519        resolve_route(req, s3_path, req.s3ext.qs.as_ref())?
520    };
521
522    // FIXME: hack for E2E tests (minio/mint)
523    if op.name() == "ListObjects"
524        && let Some(qs) = req.s3ext.qs.as_ref()
525        && qs.has("events")
526    {
527        return Err(s3_error!(NotImplemented, "listenBucketNotification only works on MinIO"));
528    }
529
530    debug!(op = %op.name(), ?s3_path, "resolved route");
531
532    if ccx.auth.is_some() {
533        let mut acx = S3AccessContext {
534            credentials: req.s3ext.credentials.as_ref(),
535            s3_path,
536            s3_op: &crate::S3Operation { name: op.name() },
537            method: &req.method,
538            uri: &req.uri,
539            headers: &req.headers,
540            extensions: &mut req.extensions,
541        };
542        match ccx.access {
543            Some(access) => access.check(&mut acx).await?,
544            None => crate::access::default_check(&mut acx)?,
545        }
546    }
547
548    debug!(op = %op.name(), ?s3_path, "checked access");
549
550    if needs_full_body {
551        let config = ccx.config.snapshot();
552        extract_full_body(content_length, &mut req.body, config.xml_max_body_size).await?;
553    }
554
555    Ok(Prepare::S3(op))
556}