1cfg_if::cfg_if! {
8 if #[cfg(feature = "minio")] {
9 mod generated_minio;
10 use self::generated_minio as generated;
11 } else {
12 mod generated;
13 }
14}
15
16pub use self::generated::*;
17
18mod signature;
19use self::signature::SignatureContext;
20
21mod get_object;
22mod multipart;
23
24#[cfg(test)]
25mod tests;
26
27use crate::access::{S3Access, S3AccessContext};
28use crate::auth::{Credentials, S3Auth};
29use crate::config::S3ConfigProvider;
30use crate::error::*;
31use crate::header;
32use crate::host::S3Host;
33use crate::http::Body;
34use crate::http::{self, BodySizeLimitExceeded};
35use crate::http::{OrderedHeaders, OrderedQs};
36use crate::http::{Request, Response};
37use crate::path::{ParseS3PathError, S3Path};
38use crate::post_policy::PostPolicy;
39use crate::protocol::S3Request;
40use crate::route::S3Route;
41use crate::s3_trait::S3;
42use crate::validation::{AwsNameValidation, NameValidation};
43
44use std::mem;
45use std::net::{IpAddr, SocketAddr};
46use std::ops::Not;
47use std::sync::Arc;
48
49use bytes::Bytes;
50use hyper::HeaderMap;
51use hyper::Method;
52use hyper::StatusCode;
53use hyper::Uri;
54use mime::Mime;
55use tracing::{debug, error};
56
/// A single dispatchable operation.
///
/// Implementations are held as `&'static dyn Operation` (see `Prepare::S3`) and are
/// invoked by `call` once `prepare` has resolved which operation a request maps to.
#[async_trait::async_trait]
pub trait Operation: Send + Sync + 'static {
    /// Returns the static name of the operation.
    ///
    /// The name is used for logging and is handed to the access check as the
    /// operation identifier.
    fn name(&self) -> &'static str;

    /// Executes the operation for `req` using the services available in `ccx`.
    ///
    /// Returns the HTTP response on success, or an `S3Error` that the caller
    /// serializes into an error response.
    async fn call(&self, ccx: &CallContext<'_>, req: &mut Request) -> S3Result<Response>;
}
63
/// Borrowed view of the service components needed to handle one request.
///
/// Only `s3` and `config` are mandatory; every other component is optional and
/// the request pipeline falls back to a default behavior when it is `None`.
pub struct CallContext<'a> {
    /// The `S3` implementation that operations are executed against.
    pub s3: &'a Arc<dyn S3>,
    /// Configuration provider; `prepare` takes a `snapshot()` from it when it needs limits.
    pub config: &'a Arc<dyn S3ConfigProvider>,
    /// Optional host parser. When present (and the host is not an IP literal),
    /// the `Host` value is parsed for virtual-hosted-style addressing.
    pub host: Option<&'a dyn S3Host>,
    /// Optional authentication provider. It is passed to the signature check, and
    /// the access check in `prepare` only runs when this is `Some`.
    pub auth: Option<&'a dyn S3Auth>,
    /// Optional custom access check; `crate::access::default_check` is used when `None`.
    pub access: Option<&'a dyn S3Access>,
    /// Optional custom route, consulted before the request is resolved to an S3 operation.
    pub route: Option<&'a dyn S3Route>,
    /// Optional name validation; `AwsNameValidation` is used when `None`.
    pub validation: Option<&'a dyn NameValidation>,
}
73
74fn build_s3_request<T>(input: T, req: &mut Request) -> S3Request<T> {
75 let method = req.method.clone();
76 let uri = mem::take(&mut req.uri);
77 let headers = mem::take(&mut req.headers);
78 let extensions = mem::take(&mut req.extensions);
79 let credentials = req.s3ext.credentials.take();
80 let region = req.s3ext.region.take();
81 let service = req.s3ext.service.take();
82 let trailing_headers = req.s3ext.trailing_headers.take();
83
84 S3Request {
85 input,
86 method,
87 uri,
88 headers,
89 extensions,
90 credentials,
91 region,
92 service,
93 trailing_headers,
94 }
95}
96
97pub(crate) fn serialize_error(mut e: S3Error, no_decl: bool) -> S3Result<Response> {
98 let status = e.status_code().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
99 let mut res = Response::with_status(status);
100 if no_decl {
101 http::set_xml_body_no_decl(&mut res, &e)?;
102 } else {
103 http::set_xml_body(&mut res, &e)?;
104 }
105 if let Some(headers) = e.take_headers() {
106 res.headers = headers;
107 }
108 drop(e);
109 Ok(res)
110}
111
112fn unknown_operation() -> S3Error {
113 S3Error::with_message(S3ErrorCode::NotImplemented, "Unknown operation")
114}
115
116fn extract_http2_authority(req: &Request) -> Option<&str> {
117 if matches!(req.version, ::http::Version::HTTP_2 | ::http::Version::HTTP_3)
118 && let Some(authority) = req.uri.authority()
119 {
120 return Some(authority.as_str());
121 }
122 None
123}
124
125fn extract_host(req: &Request) -> S3Result<Option<String>> {
126 if let Some(val) = req.headers.get(crate::header::HOST) {
128 let on_err = |e| s3_error!(e, InvalidRequest, "invalid header: Host: {val:?}");
129 let host = val.to_str().map_err(on_err)?;
130 return Ok(Some(host.into()));
131 }
132
133 if let Some(authority) = extract_http2_authority(req) {
136 return Ok(Some(authority.into()));
137 }
138
139 Ok(None)
140}
141
/// Reports whether `host` is an IP literal rather than a domain name.
///
/// Accepted forms:
/// - a socket address (`127.0.0.1:9000`, `[::1]:9000`),
/// - a bare IP address (`127.0.0.1`, `::1`),
/// - a bracketed IPv6 literal without a port (`[::1]`), which is how an IPv6
///   host is written in a URI authority or `Host` header when the port is omitted.
///
/// Anything else, including domain names with or without a port, returns `false`.
fn is_socket_addr_or_ip_addr(host: &str) -> bool {
    if host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok() {
        return true;
    }

    // `[v6]` without a port is neither a `SocketAddr` nor a bare `IpAddr`,
    // so unwrap the brackets and parse the inside as IPv6 only.
    host.strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .is_some_and(|inner| inner.parse::<std::net::Ipv6Addr>().is_ok())
}
145
146fn convert_parse_s3_path_error(err: &ParseS3PathError) -> S3Error {
147 match err {
148 ParseS3PathError::InvalidPath => s3_error!(InvalidURI),
149 ParseS3PathError::InvalidBucketName => s3_error!(InvalidBucketName),
150 ParseS3PathError::KeyTooLong => s3_error!(KeyTooLongError),
151 }
152}
153
154fn extract_qs(req_uri: &Uri) -> S3Result<Option<OrderedQs>> {
155 let Some(query) = req_uri.query() else { return Ok(None) };
156 match OrderedQs::parse(query) {
157 Ok(ans) => Ok(Some(ans)),
158 Err(source) => Err(S3Error::with_source(S3ErrorCode::InvalidURI, Box::new(source))),
159 }
160}
161
162fn check_query_pattern(qs: &OrderedQs, name: &str, val: &str) -> bool {
163 match qs.get_unique(name) {
164 Some(v) => v == val,
165 None => false,
166 }
167}
168
169fn extract_headers(headers: &HeaderMap) -> S3Result<OrderedHeaders<'_>> {
170 OrderedHeaders::from_headers(headers).map_err(|source| invalid_request!(source, "invalid headers"))
171}
172
173fn extract_mime(hs: &OrderedHeaders<'_>) -> Option<Mime> {
174 let content_type = hs.get_unique(crate::header::CONTENT_TYPE)?;
175
176 if content_type.is_empty() {
178 return None;
179 }
180
181 content_type.parse::<Mime>().ok()
182}
183
184fn extract_content_length(req: &Request) -> Option<u64> {
185 req.headers
186 .get(hyper::header::CONTENT_LENGTH)
187 .and_then(|val| atoi::atoi::<u64>(val.as_bytes()))
188}
189
190fn extract_decoded_content_length(hs: &'_ OrderedHeaders<'_>) -> S3Result<Option<usize>> {
191 let Some(val) = hs.get_unique(crate::header::X_AMZ_DECODED_CONTENT_LENGTH) else { return Ok(None) };
192 match atoi::atoi::<usize>(val.as_bytes()) {
193 Some(x) => Ok(Some(x)),
194 None => Err(invalid_request!("invalid header: x-amz-decoded-content-length")),
195 }
196}
197
198async fn extract_full_body(content_length: Option<u64>, body: &mut Body, max_body_size: usize) -> S3Result<Bytes> {
199 if let Some(bytes) = body.bytes() {
200 return Ok(bytes);
201 }
202
203 let bytes = body.store_all_limited(max_body_size).await.map_err(|e| {
204 if e.is::<BodySizeLimitExceeded>() {
205 S3Error::with_source(S3ErrorCode::MaxMessageLengthExceeded, e)
206 } else {
207 S3Error::with_source(S3ErrorCode::InternalError, e)
208 }
209 })?;
210
211 if bytes.is_empty().not() {
212 let content_length = content_length.ok_or(S3ErrorCode::MissingContentLength)?;
213 if bytes.len() as u64 != content_length {
214 return Err(s3_error!(IncompleteBody));
215 }
216 }
217
218 Ok(bytes)
219}
220
221#[allow(clippy::declare_interior_mutable_const)]
222fn fmt_content_length(len: usize) -> http::HeaderValue {
223 const ZERO: http::HeaderValue = http::HeaderValue::from_static("0");
224 if len > 0 {
225 crate::utils::format::fmt_usize(len, |s| http::HeaderValue::try_from(s).unwrap())
226 } else {
227 ZERO
228 }
229}
230
231pub async fn call(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Response> {
232 let prep = match prepare(req, ccx).await {
233 Ok(op) => op,
234 Err(err) => {
235 error!(?err, "failed to prepare");
236 return serialize_error(err, false);
237 }
238 };
239
240 match prep {
241 Prepare::S3(op) => {
242 match op.call(ccx, req).await {
243 Ok(resp) => {
244 Ok(resp) }
246 Err(err) => {
247 error!(op = %op.name(), ?err, "op returns error");
248 serialize_error(err, false)
249 }
250 }
251 }
252 Prepare::CustomRoute => {
253 let body = mem::take(&mut req.body);
254 let mut s3_req = build_s3_request(body, req);
255 let route = ccx.route.unwrap();
256
257 let result = async {
258 route.check_access(&mut s3_req).await?;
259 route.call(s3_req).await
260 }
261 .await;
262
263 match result {
264 Ok(s3_resp) => Ok(Response {
265 status: s3_resp.status.unwrap_or_default(),
266 headers: s3_resp.headers,
267 body: s3_resp.output,
268 extensions: s3_resp.extensions,
269 }),
270 Err(err) => {
271 error!(?err, "custom route returns error");
272 serialize_error(err, false)
273 }
274 }
275 }
276 }
277}
278
/// Outcome of `prepare`: how the request should be dispatched by `call`.
enum Prepare {
    /// The request resolved to an S3 operation, which `call` then invokes.
    S3(&'static dyn Operation),
    /// The request matched the custom route in `CallContext::route`.
    CustomRoute,
}
283
/// Pre-processes a request and decides how it will be dispatched.
///
/// Steps, in order:
/// 1. Percent-decode the URI path and parse it into an `S3Path`, using
///    virtual-hosted-style parsing when a host parser is configured and the host
///    is not an IP literal, and path-style parsing otherwise.
/// 2. Extract the query string, content length, ordered headers, MIME type and
///    decoded content length.
/// 3. Run `SignatureContext::check` and record the resulting credentials, region,
///    service, multipart data and trailing headers in `req.s3ext`.
/// 4. If the body was transformed or consumed, adjust `Content-Length` and swap in
///    the transformed body.
/// 5. Return `Prepare::CustomRoute` if the custom route matches.
/// 6. Otherwise resolve the operation (with special handling for multipart `POST`
///    uploads), run the access check, and buffer the full body when the
///    operation requires it.
///
/// # Errors
///
/// Returns an `S3Error` as soon as any step fails; `call` turns it into an error response.
#[allow(clippy::too_many_lines)]
#[tracing::instrument(level = "debug", skip_all, err)]
async fn prepare(req: &mut Request, ccx: &CallContext<'_>) -> S3Result<Prepare> {
    // Both are assigned inside the block below but are needed after it ends.
    let s3_path;
    let mut content_length;
    {
        // A path that is not valid percent-encoded UTF-8 is rejected as `InvalidURI`.
        let decoded_uri_path = urlencoding::decode(req.uri.path())
            .map_err(|_| S3ErrorCode::InvalidURI)?
            .into_owned();

        let host_header = extract_host(req)?;
        // Declared without initializers so that the values assigned inside the
        // `'parse` block stay usable for the signature check further down.
        let vh;
        let vh_bucket;
        let vh_region;
        {
            // Fall back to the AWS naming rules when no custom validation is configured.
            let default_validation = &const { AwsNameValidation::new() };
            let validation = ccx.validation.unwrap_or(default_validation);

            let result = 'parse: {
                // Virtual-hosted-style applies only when there is a host value, a host
                // parser is configured, and the host is not an IP/socket address.
                if let (Some(host_header), Some(s3_host)) = (host_header.as_deref(), ccx.host)
                    && !is_socket_addr_or_ip_addr(host_header)
                {
                    debug!(?host_header, ?decoded_uri_path, "parsing virtual-hosted-style request");

                    vh = s3_host.parse_host_header(host_header)?;
                    debug!(?vh);

                    vh_bucket = vh.bucket();
                    vh_region = vh.region().map(str::to_owned);
                    break 'parse crate::path::parse_virtual_hosted_style_with_validation(
                        vh_bucket,
                        &decoded_uri_path,
                        validation,
                    );
                }

                // Everything else is treated as a path-style request.
                debug!(?decoded_uri_path, "parsing path-style request");
                vh_bucket = None;
                vh_region = None;
                crate::path::parse_path_style_with_validation(&decoded_uri_path, validation)
            };

            // Store the parsed path on the request and keep a borrow of it for the
            // rest of this function. The `unwrap` cannot fail: the field was set to
            // `Some` on the previous line.
            req.s3ext.s3_path = Some(result.map_err(|err| convert_parse_s3_path_error(&err))?);
            s3_path = req.s3ext.s3_path.as_ref().unwrap();
        }

        req.s3ext.qs = extract_qs(&req.uri)?;
        content_length = extract_content_length(req);

        let hs = extract_headers(&req.headers)?;
        let mime = extract_mime(&hs);
        let decoded_content_length = extract_decoded_content_length(&hs)?;

        let body_changed;
        let transformed_body;
        {
            // Everything the signature check needs, gathered in one place.
            let mut scx = SignatureContext {
                auth: ccx.auth,
                config: ccx.config,

                req_version: req.version,
                req_method: &req.method,
                req_uri: &req.uri,
                req_body: &mut req.body,

                qs: req.s3ext.qs.as_ref(),
                hs,

                decoded_uri_path,
                vh_bucket,

                content_length,
                decoded_content_length,
                mime,

                // Outputs, filled in by `check` when applicable.
                multipart: None,
                transformed_body: None,
                trailing_headers: None,
            };

            // Presumably verifies the request signature (see the `signature` module);
            // yields the credentials associated with the request, if any.
            let credentials = scx.check().await?;

            // The body counts as changed when the check produced a replacement body
            // or parsed the body as multipart.
            body_changed = scx.transformed_body.is_some() || scx.multipart.is_some();
            transformed_body = scx.transformed_body;

            req.s3ext.multipart = scx.multipart;
            req.s3ext.trailing_headers = scx.trailing_headers;

            match credentials {
                Some(cred) => {
                    req.s3ext.credentials = Some(Credentials {
                        access_key: cred.access_key,
                        secret_key: cred.secret_key,
                    });

                    // An empty credential region is treated as absent.
                    let cred_region = cred
                        .region
                        .filter(|s| !s.is_empty())
                        .map(|s| crate::region::Region::new(s.into()))
                        .transpose()
                        .map_err(|e| invalid_request!("invalid credential region: {e}"))?;

                    // A mismatch with the virtual-host region is only logged; the
                    // credential region is the one stored below.
                    if let (Some(cred_region), Some(host_region)) = (&cred_region, &vh_region)
                        && cred_region.as_str() != host_region.as_str()
                    {
                        debug!(
                            cred_region = %cred_region,
                            host_region = %host_region,
                            "credential region and virtual-host region differ; \
                             using credential region"
                        );
                    }

                    req.s3ext.region = cred_region;
                    req.s3ext.service = cred.service;
                }
                None => {
                    // No credentials: make sure no stale values remain on the request.
                    req.s3ext.credentials = None;
                    req.s3ext.region = None;
                    req.s3ext.service = None;
                }
            }

            // Without a credential region, fall back to the region parsed from the host.
            if req.s3ext.region.is_none() {
                req.s3ext.region = vh_region
                    .filter(|s| !s.is_empty())
                    .map(|s| crate::region::Region::new(s.into()))
                    .transpose()
                    .map_err(|e| invalid_request!("invalid host region: {e}"))?;
            }
        }

        if body_changed {
            // The original Content-Length no longer describes the body: rewrite the
            // header (only if it was present) to the decoded length, or 0 if unknown.
            if let Some(val) = req.headers.get_mut(header::CONTENT_LENGTH) {
                *val = fmt_content_length(decoded_content_length.unwrap_or(0));
            }
            // NOTE(review): the header is rewritten to the decoded length while the
            // local `content_length` becomes 0; `extract_full_body` below compares the
            // buffered size against this local value. Confirm this asymmetry is intended.
            if let Some(val) = &mut content_length {
                *val = 0;
            }
        }
        if let Some(body) = transformed_body {
            req.body = body;
        }

        let has_multipart = req.s3ext.multipart.is_some();
        debug!(?body_changed, ?decoded_content_length, ?has_multipart);
    }

    // The custom route is consulted after the signature check but before the
    // request is resolved to an S3 operation.
    if let Some(route) = ccx.route
        && route.is_match(&req.method, &req.uri, &req.headers, &mut req.extensions)
    {
        return Ok(Prepare::CustomRoute);
    }

    let (op, needs_full_body) = 'resolve: {
        // A POST whose body was parsed as multipart is handled here as a POST object upload.
        if let Some(multipart) = &mut req.s3ext.multipart
            && req.method == Method::POST
        {
            match s3_path {
                S3Path::Root => return Err(unknown_operation()),
                S3Path::Bucket { bucket } => {
                    debug!(?multipart);

                    // Parse the optional POST policy and reject an expired one
                    // before any of the file content is read.
                    let now = time::OffsetDateTime::now_utc();
                    let policy = if let Some(policy_b64) = multipart.find_field_value("policy") {
                        let policy = PostPolicy::from_base64(policy_b64)
                            .map_err(|e| s3_error!(e, InvalidPolicyDocument, "failed to parse POST policy"))?;

                        let expiration_time: time::OffsetDateTime = policy.expiration.clone().into();
                        if now >= expiration_time {
                            return Err(S3Error::with_message(S3ErrorCode::AccessDenied, "Request has expired"));
                        }

                        Some(policy)
                    } else {
                        None
                    };

                    let config = ccx.config.snapshot();
                    // The upload limit is the configured maximum, tightened by the
                    // policy's content-length-range upper bound when one is given.
                    let max_file_size = if let Some(ref pol) = policy {
                        if let Some((_, max)) = pol.content_length_range() {
                            std::cmp::min(max, config.post_object_max_file_size)
                        } else {
                            config.post_object_max_file_size
                        }
                    } else {
                        config.post_object_max_file_size
                    };

                    // NOTE(review): panics if the file stream is absent; presumably the
                    // multipart parser guarantees it at this point, confirm.
                    let file_stream = multipart.take_file_stream().expect("missing file stream");
                    // Read the file, enforcing `max_file_size`.
                    let vec_bytes = http::aggregate_file_stream_limited(file_stream, max_file_size)
                        .await
                        .map_err(|e| match e {
                            http::MultipartError::FileTooLarge(..) => {
                                s3_error!(EntityTooLarge, "Your proposed upload exceeds the maximum allowed object size.")
                            }
                            other => invalid_request!(other, "failed to read file stream"),
                        })?;
                    // Total size of all chunks; saturating so the sum cannot overflow.
                    let file_size: u64 = vec_bytes.iter().map(|b| b.len() as u64).fold(0u64, u64::saturating_add);
                    let vec_stream = crate::stream::VecByteStream::new(vec_bytes);
                    req.s3ext.vec_stream = Some(vec_stream);

                    // With the actual file size known, validate the remaining policy
                    // conditions and keep the policy on the request.
                    if let Some(policy) = policy {
                        policy.validate_conditions_only(multipart, file_size, Some(bucket))?;
                        req.s3ext.post_policy = Some(policy);
                    }

                    // The upload has already been read, so no full-body buffering is needed.
                    break 'resolve (&PostObject as &'static dyn Operation, false);
                }
                S3Path::Object { .. } => return Err(s3_error!(MethodNotAllowed)),
            }
        }
        // Every other request goes through `resolve_route`, which is defined outside this view.
        resolve_route(req, s3_path, req.s3ext.qs.as_ref())?
    };

    // A `ListObjects` request carrying an `events` query parameter is rejected.
    if op.name() == "ListObjects"
        && let Some(qs) = req.s3ext.qs.as_ref()
        && qs.has("events")
    {
        return Err(s3_error!(NotImplemented, "listenBucketNotification only works on MinIO"));
    }

    debug!(op = %op.name(), ?s3_path, "resolved route");

    // The access check runs only when an auth provider is configured. It uses the
    // custom access check if there is one and the default check otherwise.
    if ccx.auth.is_some() {
        let mut acx = S3AccessContext {
            credentials: req.s3ext.credentials.as_ref(),
            s3_path,
            s3_op: &crate::S3Operation { name: op.name() },
            method: &req.method,
            uri: &req.uri,
            headers: &req.headers,
            extensions: &mut req.extensions,
        };
        match ccx.access {
            Some(access) => access.check(&mut acx).await?,
            None => crate::access::default_check(&mut acx)?,
        }
    }

    debug!(op = %op.name(), ?s3_path, "checked access");

    // Buffer the body up front for operations that need all of it, bounded by the
    // configured XML body limit. The returned bytes are discarded here; presumably
    // the body keeps them for the operation to read later.
    if needs_full_body {
        let config = ccx.config.snapshot();
        extract_full_body(content_length, &mut req.body, config.xml_max_body_size).await?;
    }

    Ok(Prepare::S3(op))
}