esthri_server/
http_server.rs

/*
 * Copyright (C) 2020 Swift Navigation Inc.
 * Contact: Swift Navigation <dev@swiftnav.com>
 *
 * This source is subject to the license found in the file 'LICENSE' which must
 * be distributed together with this source. All other rights reserved.
 *
 * THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND,
 * EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
 */

use std::{
    convert::Infallible,
    io::{self, ErrorKind},
    ops::Deref,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
};

use anyhow::anyhow;
use async_stream::stream;
use async_zip::{base::write::ZipFileWriter, Compression, ZipEntryBuilder};
use bytes::{Bytes, BytesMut};
use esthri::aws_sdk::Client as S3Client;
use futures::stream::{Stream, StreamExt, TryStreamExt};
use hyper::header::{CONTENT_ENCODING, LOCATION};
use log::*;
use maud::{html, Markup, DOCTYPE};
use mime_guess::mime::{APPLICATION_OCTET_STREAM, TEXT_HTML_UTF_8};
use serde::{Deserialize, Serialize};
use tokio::io::DuplexStream;
use tokio_util::{
    codec::{BytesCodec, FramedRead},
    compat::{Compat, FuturesAsyncWriteCompatExt},
};
use warp::{
    http::{
        self,
        header::{
            CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE, ETAG, LAST_MODIFIED,
        },
        response,
        status::StatusCode,
        Response,
    },
    hyper::Body,
    Filter,
};

use esthri::{
    download_streaming, head_object, list_directory_stream, list_objects_stream, HeadObjectInfo,
    S3ListingItem,
};

const LAST_MODIFIED_TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S GMT";
const MAX_BUF_SIZE: usize = 1024 * 1024;

#[derive(Deserialize)]
struct DownloadParams {
    archive: Option<bool>,
    archive_name: Option<String>,
    prefixes: Option<S3PrefixList>,
}

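/// A list of S3 key prefixes parsed from the `prefixes` query parameter.
///
/// The wire format is a pipe-separated string, e.g. `prefixes=logs/|data/` (example values);
/// empty segments and an empty list are rejected during deserialization.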
#[derive(Debug)]
struct S3PrefixList {
    prefixes: Vec<String>,
}

impl<'de> serde::Deserialize<'de> for S3PrefixList {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        let s = s.split('|').map(|x| {
            if !x.is_empty() {
                Ok(String::from(x))
            } else {
                Err("empty prefix")
            }
        });
        let s: Result<Vec<String>, _> = s.collect();
        let prefixes = s.map_err(serde::de::Error::custom)?;
        if prefixes.is_empty() {
            return Err(serde::de::Error::custom("empty prefix list"));
        }
        Ok(S3PrefixList { prefixes })
    }
}

struct ErrorTracker {
    has_error: AtomicBool,
    the_error: Mutex<Option<Box<anyhow::Error>>>,
}

impl ErrorTracker {
    fn new() -> Self {
        ErrorTracker {
            has_error: AtomicBool::new(false),
            the_error: Mutex::new(None),
        }
    }

    /// Stores the provided error in the error tracker for later retrieval.
    ///
    /// # Arguments
    ///
    /// * `error_tracker` - the error tracker pointer, see [ErrorTrackerArc]
    /// * `err` - the error to record
    fn record_error(error_tracker: ErrorTrackerArc, err: anyhow::Error) {
        error_tracker.has_error.store(true, Ordering::Release);
        let mut the_error = error_tracker.the_error.lock().expect("locking error field");
        *the_error = Some(Box::new(err));
    }
}

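/// A cheaply clonable, shared handle to an [ErrorTracker], used to signal failures from the
/// background archive task to the response stream being served.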
#[derive(Clone)]
struct ErrorTrackerArc(Arc<ErrorTracker>);

impl ErrorTrackerArc {
    fn new() -> ErrorTrackerArc {
        ErrorTrackerArc(Arc::new(ErrorTracker::new()))
    }
    fn has_error(&self) -> bool {
        self.0.has_error.load(Ordering::Acquire)
    }
}

impl Deref for ErrorTrackerArc {
    type Target = ErrorTracker;
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

impl From<ErrorTrackerArc> for Option<io::Error> {
    fn from(s: ErrorTrackerArc) -> Option<io::Error> {
        let the_error = s.0.the_error.lock().unwrap();
        let the_error = {
            if let Some(the_error) = &*the_error {
                the_error
            } else {
                return None;
            }
        };
        if let Some(the_error) = the_error.downcast_ref::<io::Error>() {
            Some(io::Error::new(the_error.kind(), format!("{}", the_error)))
        } else {
            Some(io::Error::new(ErrorKind::Other, format!("{}", the_error)))
        }
    }
}

#[derive(Debug, PartialEq)]
struct EsthriRejection {
    message: String,
}

impl warp::reject::Reject for EsthriRejection {}

impl EsthriRejection {
    fn warp_rejection<MsgT: AsRef<str>>(message: MsgT) -> warp::Rejection {
        warp::reject::custom(EsthriRejection {
            message: message.as_ref().to_owned(),
        })
    }
    fn warp_result<T, MsgT: AsRef<str>>(message: MsgT) -> Result<T, warp::Rejection> {
        Err(EsthriRejection::warp_rejection(message))
    }
}

#[derive(Serialize)]
struct ErrorMessage {
    code: u16,
    message: String,
}

fn with_bucket(bucket: String) -> impl Filter<Extract = (String,), Error = Infallible> + Clone {
    warp::any().map(move || bucket.clone())
}

/// Lifts the allowed-prefixes parameter into a warp filter. If the list of allowed prefixes is
/// empty then `None` is passed through and all paths are allowed. Each prefix is normalized so
/// that it ends in a slash.
///
fn with_allowed_prefixes(
    allowed_prefixes: &[String],
) -> impl Filter<Extract = (Option<Vec<String>>,), Error = Infallible> + Clone {
    let allowed_prefixes = if !allowed_prefixes.is_empty() {
        let allowed_prefixes = allowed_prefixes.to_vec();
        Some(
            allowed_prefixes
                .iter()
                .map(|prefix| {
                    if prefix.ends_with('/') {
                        prefix.to_owned()
                    } else {
                        prefix.to_owned() + "/"
                    }
                })
                .collect(),
        )
    } else {
        None
    };
    info!("allowed prefixes: {:?}", allowed_prefixes);
    warp::any().map(move || allowed_prefixes.clone())
}

fn with_s3_client(
    s3_client: S3Client,
) -> impl Filter<Extract = (S3Client,), Error = Infallible> + Clone {
    warp::any().map(move || s3_client.clone())
}

/// The main warp "filter" for the HTTP server module. This lifts all of the information we need
/// out of the HTTP request and allows the [download] function to do its work. See [warp::Filter]
/// for more details. In a sense this function prepares the data that we need and [download] is
/// the function that acts on that data.
///
/// # Arguments
///
/// * `s3_client` - The S3 client object, see [esthri::aws_sdk::Client].
/// * `bucket` - The bucket to serve over HTTP.
/// * `index_html` - Whether to serve "index.html" in place of directory listings.
/// * `allowed_prefixes` - A list of prefixes which are allowed for access; all other prefixes
/// will be rejected with HTTP status 404 (not found).
///
/// # Errors
///
/// This follows [warp]'s error model, see [warp::Rejection].
///
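/// # Examples
///
/// A minimal usage sketch (the crate re-export path, bucket name, and address are assumed here
/// and may need adjusting to your setup):
///
/// ```ignore
/// let filter = esthri_server::esthri_filter(s3_client, "my-bucket", false, &[]);
/// warp::serve(filter).run(([127, 0, 0, 1], 3000)).await;
/// ```
///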
pub fn esthri_filter(
    s3_client: S3Client,
    bucket: &str,
    index_html: bool,
    allowed_prefixes: &[String],
) -> impl Filter<Extract = (http::Response<Body>,), Error = warp::Rejection> + Clone {
    warp::path::full()
        .and(with_s3_client(s3_client))
        .and(with_bucket(bucket.to_owned()))
        .and(warp::any().map(move || index_html))
        .and(with_allowed_prefixes(allowed_prefixes))
        .and(warp::query::<DownloadParams>())
        .and(warp::header::optional::<String>("if-none-match"))
        .and_then(download)
}

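/// Serves `bucket` over HTTP at `address` until SIGTERM is received, exposing a
/// `/.esthri_health_check` endpoint alongside the bucket routes handled by [esthri_filter].
///
/// A minimal invocation sketch (client construction elided; the crate re-export path is assumed):
///
/// ```ignore
/// let addr: std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
/// esthri_server::run(s3_client, "my-bucket", &addr, false, &[]).await.unwrap();
/// ```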
#[cfg(not(windows))]
pub async fn run(
    s3_client: S3Client,
    bucket: &str,
    address: &std::net::SocketAddr,
    index_html: bool,
    allowed_prefixes: &[String],
) -> Result<(), Infallible> {
    use tokio::signal::unix::{signal as unix_signal, SignalKind};

    let mut terminate_signal_stream =
        unix_signal(SignalKind::terminate()).expect("could not setup tokio signal handler");

    let still_alive = || "still alive";
    let health_check = warp::path(".esthri_health_check").map(still_alive);

    let routes = health_check
        .or(esthri_filter(
            s3_client,
            bucket,
            index_html,
            allowed_prefixes,
        ))
        .recover(handle_rejection);

    let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(*address, async move {
        terminate_signal_stream.recv().await;
        debug!("got shutdown signal, waiting for all open connections to complete...");
    });

    info!("listening on: http://{}...", addr);
    let _ = tokio::task::spawn(server).await;

    info!("shutting down...");

    Ok(())
}

async fn abort_with_error(error_tracker: ErrorTrackerArc, err: anyhow::Error) {
    ErrorTracker::record_error(error_tracker, err);
}

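/// Downloads a single object from S3 and appends it as a deflate-compressed entry to the open
/// zip archive, recording any failure in `error_tracker`. Returns `true` if the archive can keep
/// being populated, i.e. no error has been recorded.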
async fn stream_object_to_archive(
    s3: &S3Client,
    bucket: &str,
    path: &str,
    archive: &mut ZipFileWriter<Compat<DuplexStream>>,
    error_tracker: ErrorTrackerArc,
) -> bool {
    let stream = match download_streaming(s3, bucket, path, true).await {
        Ok(byte_stream) => (byte_stream).map_err(into_io_error),
        Err(err) => {
            abort_with_error(
                error_tracker.clone(),
                anyhow!(err).context("s3 download failed"),
            )
            .await;
            return !error_tracker.has_error();
        }
    };

    let stream_reader = stream.into_async_read();
    let mut stream_reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(stream_reader);

    let options = ZipEntryBuilder::new(path.into(), Compression::Deflate);

    match archive
        .write_entry_stream(options)
        .await
        .map(FuturesAsyncWriteCompatExt::compat_write)
    {
        Ok(mut writer) => {
            match tokio::io::copy(&mut stream_reader, &mut writer).await {
                Ok(_) => {}
                Err(err) => {
                    abort_with_error(
                        error_tracker.clone(),
                        anyhow!(err).context("failed to write to archive"),
                    )
                    .await;
                }
            }
            match writer.into_inner().close().await {
                Ok(_) => {}
                Err(err) => {
                    abort_with_error(
                        error_tracker.clone(),
                        anyhow!(err).context("failed to close archive"),
                    )
                    .await;
                }
            }
        }
        Err(err) => {
            abort_with_error(
                error_tracker.clone(),
                anyhow!(err).context("zip append failed"),
            )
            .await;
        }
    }
    !error_tracker.has_error()
}

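/// Wraps `source_stream` so that, once the wrapped stream is exhausted, any error recorded in
/// `error_tracker` is surfaced to the consumer as a final `Err` item rather than being silently
/// dropped.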
async fn create_error_monitor_stream<T: Stream<Item = io::Result<BytesMut>> + Unpin>(
    error_tracker: ErrorTrackerArc,
    mut source_stream: T,
) -> impl Stream<Item = io::Result<BytesMut>> {
    stream! {
        let error_tracker = error_tracker.clone();
        loop {
            let item: Option<io::Result<BytesMut>> = source_stream.next().await;
            if let Some(item) = item {
                yield item;
            } else {
                if error_tracker.has_error() {
                    let the_error: Option<io::Error> = error_tracker.into();
                    if let Some(the_error) = the_error {
                        error!("stream error: {}", the_error);
                        yield Err(the_error);
                    } else {
                        error!("no error even though one was signaled");
                    }
                } else {
                    debug!("wrapped stream done, no error signaled");
                }
                return;
            }
        }
    }
}

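/// Builds a streaming zip archive of every object under the given `prefixes`.
///
/// A background task lists and downloads the objects, writing zip entries into one end of an
/// in-memory duplex pipe, while the returned stream reads from the other end. The archive is
/// therefore produced and served incrementally without buffering it in memory or on disk.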
async fn create_archive_stream(
    s3: S3Client,
    bucket: String,
    prefixes: Vec<String>,
    error_tracker: ErrorTrackerArc,
) -> impl Stream<Item = io::Result<BytesMut>> {
    let (zip_reader, zip_writer) = tokio::io::duplex(MAX_BUF_SIZE);
    let mut writer = ZipFileWriter::with_tokio(zip_writer);

    let error_tracker_reader = error_tracker.clone();
    tokio::spawn(async move {
        for prefix in prefixes {
            let mut object_list_stream = list_objects_stream(&s3, &bucket, &prefix);
            let error_tracker = error_tracker.clone();
            loop {
                match object_list_stream.try_next().await {
                    Ok(None) => break,
                    Ok(Some(items)) => {
                        for s3obj in items {
                            let s3obj = s3obj
                                .as_object()
                                .expect("list_objects_stream only returns objects");
                            if !stream_object_to_archive(
                                &s3,
                                &bucket,
                                &s3obj.key,
                                &mut writer,
                                error_tracker.clone(),
                            )
                            .await
                            {
                                break;
                            }
                        }
                        if error_tracker.has_error() {
                            break;
                        }
                    }
                    Err(err) => {
                        let err = anyhow!(err).context("listing objects");
                        abort_with_error(error_tracker.clone(), err).await;
                        break;
                    }
                }
            }
        }
        match writer.close().await {
            Ok(_) => {}
            Err(err) => {
                let err = anyhow!(err).context("closing zip writer");
                abort_with_error(error_tracker.clone(), err).await;
            }
        }
    });
    let framed_reader = FramedRead::new(zip_reader, BytesCodec::new());
    create_error_monitor_stream(error_tracker_reader, framed_reader).await
}

fn into_io_error<E: std::error::Error>(err: E) -> io::Error {
    io::Error::new(ErrorKind::Other, format!("{}", err))
}

fn get_stripped_path<'a>(base: &str, path: &'a str) -> &'a str {
    path.strip_prefix(base).unwrap()
}

fn format_archive_download_button(path: &str, tooltip: &str) -> Markup {
    html! {
        a class="btn btn-primary btn-sm" role="button" href=(path)
            title=(tooltip) {
            span class="fa fa-file-archive fa-lg" { }
        }
    }
}

fn format_prefix_link(prefix: &str) -> Markup {
    html! {
        div {
            i class="far fa-folder fa-fw pe-4" { }
            a href=(prefix) {
                (prefix)
            }
        }
        (format_archive_download_button(&format!("{}?archive=true", prefix), &format!("Download zip archive of {}", prefix)))
    }
}

#[allow(clippy::branches_sharing_code)]
fn format_object_link(path: &str) -> Markup {
    html! {
        div {
            i class="far fa-file fa-fw pe-4" { }
            a href=(path) {
                (path)
            }
        }
    }
}

fn format_bucket_item(path: &str, archive: bool) -> Markup {
    html! {
        @if archive {
            (format_prefix_link(path))
        } @else {
            (format_object_link(path))
        }
    }
}

fn format_title(bucket: &str, path: &str) -> Markup {
    let path_components: Vec<&str> = path
        .strip_suffix('/')
        .unwrap_or(path)
        .split_terminator('/')
        .collect();

    let path_length = path_components.len();

    html! {
        h5 {
            a href=("../".repeat(path_length)) {
                (bucket)
            }

            @for (i, component) in path_components.iter().enumerate() {
                " > "
                @if i == path_length - 1 {
                    a href="." class="pe-1" {
                        (component)
                    }

                    (format_archive_download_button(".?archive=true", &format!("Download zip archive of {}", component)))
                } @else {
                    a href=("../".repeat(path_length - 1 - i)) {
                        (component)
                    }
                }
            }
        }
    }
}

async fn is_object(s3: &S3Client, bucket: &str, path: &str) -> Result<bool, warp::Rejection> {
    if path.is_empty() {
        Ok(false)
    } else {
        Ok(get_obj_info(s3, bucket, path).await.is_ok())
    }
}

async fn is_directory(s3: &S3Client, bucket: &str, path: &str) -> Result<bool, warp::Rejection> {
    if is_object(s3, bucket, path).await? {
        return Ok(false);
    }
    let mut directory_list_stream = list_directory_stream(s3, bucket, path);
    match directory_list_stream.try_next().await {
        Ok(None) => Ok(false),
        Ok(Some(_)) => Ok(true),
        Err(err) => {
            let message = format!("error listing item: {}", err);
            EsthriRejection::warp_result(message)
        }
    }
}

async fn create_listing_page(s3: S3Client, bucket: String, path: String) -> io::Result<Bytes> {
    let mut directory_list_stream = list_directory_stream(&s3, &bucket, &path);
    let mut elements = vec![];
    loop {
        match directory_list_stream.try_next().await {
            Ok(None) => break,
            Ok(Some(items)) => {
                for s3obj in items {
                    match s3obj {
                        S3ListingItem::S3Object(o) => elements
                            .push(format_bucket_item(get_stripped_path(&path, &o.key), false)),
                        S3ListingItem::S3CommonPrefix(cp) => {
                            elements.push(format_bucket_item(get_stripped_path(&path, &cp), true))
                        }
                    }
                }
            }
            Err(err) => {
                return Err(into_io_error(err));
            }
        }
    }
    let document = html! {
        (DOCTYPE)
        meta charset="utf-8";
        title {
            "Esthri " (bucket) " - " (path)
        }
        link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.1/css/bootstrap.min.css";
        link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/5.15.4/css/all.min.css";

        div."container p-2" {
            (format_title(&bucket, &path))

            ul class="list-group" {
                @for element in elements {
                    li class="list-group-item d-flex justify-content-between align-items-center" {
                        (element)
                    }
                }
            }
        }
    };
    Ok(Bytes::from(document.into_string()))
}

async fn create_item_stream(
    s3: S3Client,
    bucket: String,
    path: String,
) -> impl Stream<Item = esthri::Result<Bytes>> {
    stream! {
        // We don't decompress the file here as it will be served with the
        // correct content-encoding and be decompressed by the browser.
        let mut stream = match download_streaming(&s3, &bucket, &path, false).await {
            Ok(byte_stream) => byte_stream,
            Err(err) => {
                yield Err(err);
                return;
            }
        };
        loop {
            if let Some(data) = stream.next().await {
                yield data;
            } else {
                return;
            }
        }
    }
}

async fn get_obj_info(
    s3: &S3Client,
    bucket: &str,
    path: &str,
) -> Result<HeadObjectInfo, warp::Rejection> {
    match head_object(s3, &bucket, &path).await {
        Ok(obj_info) => {
            if let Some(obj_info) = obj_info {
                Ok(obj_info)
            } else {
                Err(warp::reject::not_found())
            }
        }
        Err(err) => {
            let message = format!("error listing item: {}", err);
            EsthriRejection::warp_result(message)
        }
    }
}

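/// Builds the response headers for a single-object request and decides whether a body is needed.
///
/// Returns the updated builder plus `Some((bucket, path))` when the object body should be
/// streamed to the client, or `None` when the client's `If-None-Match` matched the object's ETag
/// and a `304 Not Modified` response will be sent without a body.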
async fn item_pre_response(
    s3: &S3Client,
    bucket: String,
    path: String,
    if_none_match: Option<String>,
    mut resp_builder: response::Builder,
) -> Result<(response::Builder, Option<(String, String)>), warp::Rejection> {
    let obj_info = get_obj_info(s3, &bucket, &path).await?;
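    // Accept both strong and weak validators from the client: strip a leading "W/" so a weak
    // `If-None-Match` value still matches the stored ETag.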
    let not_modified = if_none_match
        .map(|etag| etag.strip_prefix("W/").map(String::from).unwrap_or(etag))
        .map(|etag| etag == obj_info.e_tag)
        .unwrap_or(false);
    if not_modified {
        resp_builder = resp_builder.status(StatusCode::NOT_MODIFIED);
        Ok((resp_builder, None))
    } else {
        resp_builder = resp_builder
            .header(CONTENT_LENGTH, obj_info.size)
            .header(ETAG, &obj_info.e_tag)
            .header(CACHE_CONTROL, "private,max-age=0")
            .header(
                LAST_MODIFIED,
                obj_info
                    .last_modified
                    .format(LAST_MODIFIED_TIME_FMT)
                    .to_string(),
            );
        if obj_info.is_esthri_compressed() {
            resp_builder = resp_builder.header(CONTENT_ENCODING, "gzip");
        }
        let mime = mime_guess::from_path(&path);
        let mime = mime.first();
        if let Some(mime) = mime {
            resp_builder = resp_builder.header(CONTENT_TYPE, mime.essence_str());
        } else {
            resp_builder =
                resp_builder.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM.essence_str());
        }
        Ok((resp_builder, Some((bucket, path))))
    }
}

#[cfg(not(windows))]
/// Converts rejections into JSON [ErrorMessage] replies with an appropriate HTTP status code.
async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
    let code;
    let message;
    if err.is_not_found() {
        code = StatusCode::NOT_FOUND;
        message = "not found".to_owned();
    } else if let Some(EsthriRejection {
        message: message_inner,
    }) = err.find()
    {
        code = StatusCode::BAD_REQUEST;
        message = message_inner.to_owned();
    } else if err
        .find::<warp::filters::body::BodyDeserializeError>()
        .is_some()
    {
        message = "deserialization error".to_owned();
        code = StatusCode::BAD_REQUEST;
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        code = StatusCode::METHOD_NOT_ALLOWED;
        message = "not allowed".to_owned();
    } else if err.find::<warp::reject::InvalidQuery>().is_some() {
        code = StatusCode::BAD_REQUEST;
        message = "invalid query string".to_owned();
    } else {
        code = StatusCode::INTERNAL_SERVER_ERROR;
        message = format!("internal error: {:?}", err);
    }
    let json = warp::reply::json(&ErrorMessage {
        code: code.as_u16(),
        message,
    });
    Ok(warp::reply::with_status(json, code))
}

fn sanitize_filename(filename: String) -> String {
    let options = sanitize_filename::Options {
        windows: true,
        replacement: "_",
        ..Default::default()
    };
    sanitize_filename::sanitize_with_options(
        filename.strip_suffix('/').unwrap_or(&filename),
        options,
    )
}

/// If a path is supplied that is actually a "directory" (in S3 terms, a prefix which lists
/// multiple objects) then we redirect to a version of the path with a trailing slash. This makes
/// the downstream logic around what is and isn't a directory much easier to implement.
///
async fn redirect_on_dir_without_slash(
    s3: &S3Client,
    bucket: &str,
    path: &str,
) -> Result<(bool, Option<Result<http::Response<Body>, warp::Rejection>>), warp::Rejection> {
    let is_dir_listing = path.is_empty() || path.ends_with('/');
    if !is_dir_listing && is_directory(s3, bucket, path).await? {
        let path = String::from("/") + path + "/";
        debug!("redirect for index-html: {}", path);
        let response = Response::builder()
            .header(LOCATION, path)
            .status(StatusCode::FOUND)
            .body(Body::empty())
            .map_err(|err| EsthriRejection::warp_rejection(format!("{}", err)));
        Ok((is_dir_listing, Some(response)))
    } else {
        Ok((is_dir_listing, None))
    }
}

/// If the "--index-html" option is enabled on the HTTP server and an "index.html" exists in the
/// directory that's currently being requested, then we serve that file in place of the usual
/// directory listing. This allows the server to behave more like a "real" HTTP server.
///
async fn maybe_serve_index_html(
    s3: &S3Client,
    bucket: &str,
    path: String,
    index_html: bool,
    is_dir_listing: bool,
) -> Result<(bool, String), warp::Rejection> {
    let index_html_path = if path.is_empty() {
        "index.html".to_owned()
    } else {
        path.clone() + "index.html"
    };
    if is_dir_listing && index_html && is_object(s3, bucket, &index_html_path).await? {
        Ok((false, index_html_path))
    } else {
        Ok((is_dir_listing, path))
    }
}

/// Processes the list of allowed prefixes and rejects the path if it is not allowed.
fn should_reject(path: &str, allowed_prefixes: &[String], params: &DownloadParams) -> bool {
    // The with_allowed_prefixes filter should ensure that allowed_prefixes is not
    //   empty by the time we get here.
    assert!(!allowed_prefixes.is_empty());
    if allowed_prefixes.iter().any(|p| path.starts_with(p)) {
        false
    } else if path.is_empty() && params.prefixes.is_some() {
        let prefixes = params.prefixes.as_ref().unwrap();
        !prefixes.prefixes.iter().all(|archive_prefix| {
            allowed_prefixes
                .iter()
                .any(|p| archive_prefix.starts_with(p))
        })
    } else {
        true
    }
}

/// The main entrypoint for fulfilling requests to the server.
///
/// Path patterns that are requested and handled here:
///
/// * HTTP GET request to `/<path/to/object/foo.bin>` => results in a fetch of `s3://<bucket>/path/to/object/foo.bin`
///
/// * HTTP GET request to `/<path/to/object/>` => results in a listing page showing all objects that match the prefix
/// `path/to/object`
///
/// * HTTP GET request to `/<path/to/object/>?archive=true` => results in a zip archive being served with the contents
/// of the directory `path/to/object` recursively populating said archive
///
/// * HTTP GET request to `/?archive=true&prefixes=prefix1|prefix2` => results in a zip archive being served with the
/// contents of all directories listed in the `prefixes` parameter populating the archive
///
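/// For example, with the server bound to `localhost:3000` (the address and object paths below
/// are illustrative):
///
/// ```text
/// curl -O 'http://localhost:3000/path/to/object/foo.bin'
/// curl -o object.zip 'http://localhost:3000/path/to/object/?archive=true'
/// curl -o bundle.zip 'http://localhost:3000/?archive=true&prefixes=prefix1|prefix2'
/// ```
///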
/// # Arguments
///
/// * `path` - the full request path
/// * `s3` - the S3 client implementation, see [esthri::aws_sdk::Client]
/// * `bucket` - the bucket that objects are served from
/// * `index_html` - if `index.html` from directory roots should be served instead of a directory listing
/// * `allowed_prefixes` - the list of prefixes allowed for access
/// * `params` - optional download parameters that can customize request behavior (see above)
/// * `if_none_match` - ETag from the client to match against for client-side cache processing
///
async fn download(
    path: warp::path::FullPath,
    s3: S3Client,
    bucket: String,
    index_html: bool,
    allowed_prefixes: Option<Vec<String>>,
    params: DownloadParams,
    if_none_match: Option<String>,
) -> Result<http::Response<Body>, warp::Rejection> {
    let path = path
        .as_str()
        .to_owned()
        .get(1..)
        .map(Into::<String>::into)
        .unwrap_or_default();
    debug!(
        "path: {}, params: archive: {:?}, prefixes: {:?}",
        path, params.archive, params.prefixes
    );

    if let Some(allowed_prefixes) = allowed_prefixes {
        if should_reject(&path, &allowed_prefixes[..], &params) {
            return Err(warp::reject::not_found());
        }
    }

    let (is_dir_listing, maybe_redirect) =
        redirect_on_dir_without_slash(&s3, &bucket, &path).await?;

    if let Some(redirect) = maybe_redirect {
        return redirect;
    }

    let (is_dir_listing, path) =
        maybe_serve_index_html(&s3, &bucket, path, index_html, is_dir_listing).await?;

    let resp_builder = Response::builder();
    let error_tracker = ErrorTrackerArc::new();

    let is_archive = params.archive.unwrap_or(false);

    let (body, resp_builder) = if is_archive {
        let (archive_filename, prefixes) = if !path.is_empty() && params.prefixes.is_none() {
            let archive_filename = sanitize_filename(path.clone());
            (format!("{}.zip", archive_filename), Some(vec![path]))
        } else if let Some(prefixes) = params.prefixes {
            if path.is_empty() {
                let archive_filename = params.archive_name.unwrap_or_else(|| "archive.zip".into());
                (sanitize_filename(archive_filename), Some(prefixes.prefixes))
            } else {
                return Err(EsthriRejection::warp_rejection(
                    "path must be empty with prefixes",
                ));
            }
        } else {
            return Err(EsthriRejection::warp_rejection(
                "path and prefixes were empty",
            ));
        };

        if let Some(prefixes) = prefixes {
            let stream = create_archive_stream(s3.clone(), bucket, prefixes, error_tracker).await;
            (
                Some(Body::wrap_stream(stream)),
                resp_builder
                    .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM.essence_str())
                    .header(
                        CONTENT_DISPOSITION,
                        format!("attachment; filename=\"{}\"", archive_filename),
                    ),
            )
        } else {
            (None, resp_builder)
        }
    } else if is_dir_listing {
        let listing_page = create_listing_page(s3.clone(), bucket, path).await;
        let header = resp_builder.header(CONTENT_TYPE, TEXT_HTML_UTF_8.essence_str());
        match listing_page {
            Ok(page) => (Some(Body::from(page)), header),
            Err(e) => (Some(Body::from(e.to_string())), header),
        }
    } else {
        let (resp_builder, create_stream) =
            item_pre_response(&s3, bucket, path, if_none_match, resp_builder).await?;
        if let Some((bucket, path)) = create_stream {
            let stream = create_item_stream(s3.clone(), bucket, path).await;
            (Some(Body::wrap_stream(stream)), resp_builder)
        } else {
            (None, resp_builder)
        }
    };

    resp_builder
        .body(body.unwrap_or_else(Body::empty))
        .map_err(|err| EsthriRejection::warp_rejection(format!("{}", err)))
}
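
// A small sanity check of the allow-list logic in `should_reject` above; these tests are
// illustrative only and assume prefixes have already been normalized to end with '/'
// (as `with_allowed_prefixes` guarantees).
#[cfg(test)]
mod allowed_prefix_tests {
    use super::*;

    #[test]
    fn rejects_paths_outside_allowed_prefixes() {
        let allowed = vec!["public/".to_string()];
        let params = DownloadParams {
            archive: None,
            archive_name: None,
            prefixes: None,
        };
        // Paths under an allowed prefix are served...
        assert!(!should_reject("public/data.bin", &allowed, &params));
        // ...anything else is rejected (and ultimately reported as 404).
        assert!(should_reject("private/data.bin", &allowed, &params));
    }

    #[test]
    fn archive_prefixes_must_all_be_allowed() {
        let allowed = vec!["public/".to_string()];
        let params = DownloadParams {
            archive: Some(true),
            archive_name: None,
            prefixes: Some(S3PrefixList {
                prefixes: vec!["public/logs/".to_string(), "private/".to_string()],
            }),
        };
        // An archive request at the root is rejected if any requested prefix falls outside
        // the allow list.
        assert!(should_reject("", &allowed, &params));
    }
}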