1use std::{
14 convert::Infallible,
15 io::{self, ErrorKind},
16 ops::Deref,
17 sync::{
18 atomic::{AtomicBool, Ordering},
19 Arc, Mutex,
20 },
21};
22
23use anyhow::anyhow;
24use async_stream::stream;
25use async_zip::{base::write::ZipFileWriter, Compression, ZipEntryBuilder};
26use bytes::{Bytes, BytesMut};
27use esthri::aws_sdk::Client as S3Client;
28use futures::stream::{Stream, StreamExt, TryStreamExt};
29use hyper::header::{CONTENT_ENCODING, LOCATION};
30use log::*;
31use maud::{html, Markup, DOCTYPE};
32use mime_guess::mime::{APPLICATION_OCTET_STREAM, TEXT_HTML_UTF_8};
33use serde::{Deserialize, Serialize};
34use tokio::io::DuplexStream;
35use tokio_util::{
36 codec::{BytesCodec, FramedRead},
37 compat::{Compat, FuturesAsyncWriteCompatExt},
38};
39use warp::{
40 http::{
41 self,
42 header::{
43 CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE, ETAG, LAST_MODIFIED,
44 },
45 response,
46 status::StatusCode,
47 Response,
48 },
49 hyper::Body,
50 Filter,
51};
52
53use esthri::{
54 download_streaming, head_object, list_directory_stream, list_objects_stream, HeadObjectInfo,
55 S3ListingItem,
56};
57
58const LAST_MODIFIED_TIME_FMT: &str = "%a, %d %b %Y %H:%M:%S GMT";
59const MAX_BUF_SIZE: usize = 1024 * 1024;
60
61#[derive(Deserialize)]
62struct DownloadParams {
63 archive: Option<bool>,
64 archive_name: Option<String>,
65 prefixes: Option<S3PrefixList>,
66}
67
68#[derive(Debug)]
69struct S3PrefixList {
70 prefixes: Vec<String>,
71}
72
73impl<'de> serde::Deserialize<'de> for S3PrefixList {
74 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
75 where
76 D: serde::Deserializer<'de>,
77 {
78 let s = String::deserialize(deserializer)?;
79 let s = s.split('|').map(|x| {
80 if !x.is_empty() {
81 Ok(String::from(x))
82 } else {
83 Err("empty prefix")
84 }
85 });
86 let s: Result<Vec<String>, _> = s.collect();
87 let prefixes = s.map_err(serde::de::Error::custom)?;
88 if prefixes.is_empty() {
89 return Err(serde::de::Error::custom("empty prefix list"));
90 }
91 Ok(S3PrefixList { prefixes })
92 }
93}
94
95struct ErrorTracker {
96 has_error: AtomicBool,
97 the_error: Mutex<Option<Box<anyhow::Error>>>,
98}
99
100impl ErrorTracker {
101 fn new() -> Self {
102 ErrorTracker {
103 has_error: AtomicBool::new(false),
104 the_error: Mutex::new(None),
105 }
106 }
107
108 fn record_error(error_tracker: ErrorTrackerArc, err: anyhow::Error) {
115 error_tracker.has_error.store(true, Ordering::Release);
116 let mut the_error = error_tracker.the_error.lock().expect("locking error field");
117 *the_error = Some(Box::new(err));
118 }
119}
120
121#[derive(Clone)]
122struct ErrorTrackerArc(Arc<ErrorTracker>);
123
124impl ErrorTrackerArc {
125 fn new() -> ErrorTrackerArc {
126 ErrorTrackerArc(Arc::new(ErrorTracker::new()))
127 }
128 fn has_error(&self) -> bool {
129 self.0.has_error.load(Ordering::Acquire)
130 }
131}
132
133impl Deref for ErrorTrackerArc {
134 type Target = ErrorTracker;
135 fn deref(&self) -> &Self::Target {
136 self.0.as_ref()
137 }
138}
139
140impl From<ErrorTrackerArc> for Option<io::Error> {
141 fn from(s: ErrorTrackerArc) -> Option<io::Error> {
142 let the_error = s.0.the_error.lock().unwrap();
143 let the_error = {
144 if let Some(the_error) = &*the_error {
145 the_error
146 } else {
147 return None;
148 }
149 };
150 if let Some(the_error) = the_error.downcast_ref::<io::Error>() {
151 Some(io::Error::new(the_error.kind(), format!("{}", the_error)))
152 } else {
153 Some(io::Error::new(ErrorKind::Other, format!("{}", the_error)))
154 }
155 }
156}
157
158#[derive(Debug, PartialEq)]
159struct EsthriRejection {
160 message: String,
161}
162
163impl warp::reject::Reject for EsthriRejection {}
164
165impl EsthriRejection {
166 fn warp_rejection<MsgT: AsRef<str>>(message: MsgT) -> warp::Rejection {
167 warp::reject::custom(EsthriRejection {
168 message: message.as_ref().to_owned(),
169 })
170 }
171 fn warp_result<T, MsgT: AsRef<str>>(message: MsgT) -> Result<T, warp::Rejection> {
172 Err(EsthriRejection::warp_rejection(message))
173 }
174}
175
176#[derive(Serialize)]
177struct ErrorMessage {
178 code: u16,
179 message: String,
180}
181
182fn with_bucket(bucket: String) -> impl Filter<Extract = (String,), Error = Infallible> + Clone {
183 warp::any().map(move || bucket.clone())
184}
185
186fn with_allowed_prefixes(
190 allowed_prefixes: &[String],
191) -> impl Filter<Extract = (Option<Vec<String>>,), Error = Infallible> + Clone {
192 let allowed_prefixes = if !allowed_prefixes.is_empty() {
193 let allowed_prefixes = allowed_prefixes.to_vec();
194 Some(
195 allowed_prefixes
196 .iter()
197 .map(|prefix| {
198 if prefix.ends_with('/') {
199 prefix.to_owned()
200 } else {
201 prefix.to_owned() + "/"
202 }
203 })
204 .collect(),
205 )
206 } else {
207 None
208 };
209 info!("allowed prefixes: {:?}", allowed_prefixes);
210 warp::any().map(move || allowed_prefixes.clone())
211}
212
213fn with_s3_client(
214 s3_client: S3Client,
215) -> impl Filter<Extract = (S3Client,), Error = Infallible> + Clone {
216 warp::any().map(move || s3_client.clone())
217}
218
219pub fn esthri_filter(
236 s3_client: S3Client,
237 bucket: &str,
238 index_html: bool,
239 allowed_prefixes: &[String],
240) -> impl Filter<Extract = (http::Response<Body>,), Error = warp::Rejection> + Clone {
241 warp::path::full()
242 .and(with_s3_client(s3_client))
243 .and(with_bucket(bucket.to_owned()))
244 .and(warp::any().map(move || index_html))
245 .and(with_allowed_prefixes(allowed_prefixes))
246 .and(warp::query::<DownloadParams>())
247 .and(warp::header::optional::<String>("if-none-match"))
248 .and_then(download)
249}
250
251#[cfg(not(windows))]
252pub async fn run(
253 s3_client: S3Client,
254 bucket: &str,
255 address: &std::net::SocketAddr,
256 index_html: bool,
257 allowed_prefixes: &[String],
258) -> Result<(), Infallible> {
259 use tokio::signal::unix::{signal as unix_signal, SignalKind};
260
261 let mut terminate_signal_stream =
262 unix_signal(SignalKind::terminate()).expect("could not setup tokio signal handler");
263
264 let still_alive = || "still alive";
265 let health_check = warp::path(".esthri_health_check").map(still_alive);
266
267 let routes = health_check
268 .or(esthri_filter(
269 s3_client,
270 bucket,
271 index_html,
272 allowed_prefixes,
273 ))
274 .recover(handle_rejection);
275
276 let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(*address, async move {
277 terminate_signal_stream.recv().await;
278 debug!("got shutdown signal, waiting for all open connections to complete...");
279 });
280
281 info!("listening on: http://{}...", addr);
282 let _ = tokio::task::spawn(server).await;
283
284 info!("shutting down...");
285
286 Ok(())
287}
288
289async fn abort_with_error(error_tracker: ErrorTrackerArc, err: anyhow::Error) {
290 ErrorTracker::record_error(error_tracker, err);
291}
292
293async fn stream_object_to_archive(
294 s3: &S3Client,
295 bucket: &str,
296 path: &str,
297 archive: &mut ZipFileWriter<Compat<DuplexStream>>,
298 error_tracker: ErrorTrackerArc,
299) -> bool {
300 let stream = match download_streaming(s3, bucket, path, true).await {
301 Ok(byte_stream) => (byte_stream).map_err(into_io_error),
302 Err(err) => {
303 abort_with_error(
304 error_tracker.clone(),
305 anyhow!(err).context("s3 download failed"),
306 )
307 .await;
308 return !error_tracker.has_error();
309 }
310 };
311
312 let stream_reader = stream.into_async_read();
313 let mut stream_reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(stream_reader);
314
315 let options = ZipEntryBuilder::new(path.into(), Compression::Deflate);
316
317 match archive
318 .write_entry_stream(options)
319 .await
320 .map(FuturesAsyncWriteCompatExt::compat_write)
321 {
322 Ok(mut writer) => {
323 match tokio::io::copy(&mut stream_reader, &mut writer).await {
324 Ok(_) => {}
325 Err(err) => {
326 abort_with_error(
327 error_tracker.clone(),
328 anyhow!(err).context("failed to write to archive"),
329 )
330 .await;
331 }
332 }
333 match writer.into_inner().close().await {
334 Ok(_) => {}
335 Err(err) => {
336 abort_with_error(
337 error_tracker.clone(),
338 anyhow!(err).context("failed to close archive"),
339 )
340 .await;
341 }
342 }
343 }
344 Err(err) => {
345 abort_with_error(
346 error_tracker.clone(),
347 anyhow!(err).context("zip append failed"),
348 )
349 .await;
350 }
351 }
352 !error_tracker.has_error()
353}
354
355async fn create_error_monitor_stream<T: Stream<Item = io::Result<BytesMut>> + Unpin>(
356 error_tracker: ErrorTrackerArc,
357 mut source_stream: T,
358) -> impl Stream<Item = io::Result<BytesMut>> {
359 stream! {
360 let error_tracker = error_tracker.clone();
361 loop {
362 let item: Option<io::Result<BytesMut>> = source_stream.next().await;
363 if let Some(item) = item {
364 yield item;
365 } else {
366 if error_tracker.has_error() {
367 let the_error: Option<io::Error> = error_tracker.into();
368 if let Some(the_error) = the_error {
369 error!("stream error: {}", the_error);
370 yield Err(the_error);
371 } else {
372 error!("no error even though one was signaled");
373 }
374 } else {
375 debug!("wrapped stream done, no error signaled");
376 }
377 return;
378 }
379 }
380 }
381}
382
383async fn create_archive_stream(
384 s3: S3Client,
385 bucket: String,
386 prefixes: Vec<String>,
387 error_tracker: ErrorTrackerArc,
388) -> impl Stream<Item = io::Result<BytesMut>> {
389 let (zip_reader, zip_writer) = tokio::io::duplex(MAX_BUF_SIZE);
390 let mut writer = ZipFileWriter::with_tokio(zip_writer);
391
392 let error_tracker_reader = error_tracker.clone();
393 tokio::spawn(async move {
394 for prefix in prefixes {
395 let mut object_list_stream = list_objects_stream(&s3, &bucket, &prefix);
396 let error_tracker = error_tracker.clone();
397 loop {
398 match object_list_stream.try_next().await {
399 Ok(None) => break,
400 Ok(Some(items)) => {
401 for s3obj in items {
402 let s3obj = s3obj
403 .as_object()
404 .expect("list_objects_stream only returns objects");
405 if !stream_object_to_archive(
406 &s3,
407 &bucket,
408 &s3obj.key,
409 &mut writer,
410 error_tracker.clone(),
411 )
412 .await
413 {
414 break;
415 }
416 }
417 if error_tracker.has_error() {
418 break;
419 }
420 }
421 Err(err) => {
422 let err = anyhow!(err).context("listing objects");
423 abort_with_error(error_tracker.clone(), err).await;
424 break;
425 }
426 }
427 }
428 }
429 match writer.close().await {
430 Ok(_) => {}
431 Err(err) => {
432 let err = anyhow!(err).context("closing zip writer");
433 abort_with_error(error_tracker.clone(), err).await;
434 }
435 }
436 });
437 let framed_reader = FramedRead::new(zip_reader, BytesCodec::new());
438 create_error_monitor_stream(error_tracker_reader, framed_reader).await
439}
440
441fn into_io_error<E: std::error::Error>(err: E) -> io::Error {
442 io::Error::new(ErrorKind::Other, format!("{}", err))
443}
444
445fn get_stripped_path<'a>(base: &str, path: &'a str) -> &'a str {
446 path.strip_prefix(base).unwrap()
447}
448
449fn format_archive_download_button(path: &str, tooltip: &str) -> Markup {
450 html! {
451 a class="btn btn-primary btn-sm" role="button" href=(path)
452 title=(tooltip) {
453 span class="fa fa-file-archive fa-lg" { }
454 }
455 }
456}
457
458fn format_prefix_link(prefix: &str) -> Markup {
459 html! {
460 div {
461 i class="far fa-folder fa-fw pe-4" { }
462 a href=(prefix) {
463 (prefix)
464 }
465 }
466 (format_archive_download_button(&format!("{}?archive=true", prefix), &format!("Download tgz archive of {}", prefix)))
467 }
468}
469
470#[allow(clippy::branches_sharing_code)]
471fn format_object_link(path: &str) -> Markup {
472 html! {
473 div {
474 i class="far fa-file fa-fw pe-4" { }
475 a href=(path) {
476 (path)
477 }
478 }
479 }
480}
481
482fn format_bucket_item(path: &str, archive: bool) -> Markup {
483 html! {
484 @if archive {
485 (format_prefix_link(path))
486 } @else {
487 (format_object_link(path))
488 }
489 }
490}
491
492fn format_title(bucket: &str, path: &str) -> Markup {
493 let path_components: Vec<&str> = path
494 .strip_suffix('/')
495 .unwrap_or(path)
496 .split_terminator('/')
497 .collect();
498
499 let path_length = path_components.len();
500
501 html! {
502 h5 {
503 a href=("../".repeat(path_length)) {
504 (bucket)
505 }
506
507 @for (i, component) in path_components.iter().enumerate() {
508 " > "
509 @if i == path_length - 1 {
510 a href="." class="pe-1" {
511 (component)
512 }
513
514 (format_archive_download_button(".?archive=true",&format!("Download tgz archive of {}", component)))
515 } @else {
516 a href=("../".repeat(path_length - 1 - i)) {
517 (component)
518 }
519 }
520 }
521 }
522 }
523}
524
525async fn is_object(s3: &S3Client, bucket: &str, path: &str) -> Result<bool, warp::Rejection> {
526 if path.is_empty() {
527 Ok(false)
528 } else {
529 Ok(get_obj_info(s3, bucket, path).await.is_ok())
530 }
531}
532
533async fn is_directory(s3: &S3Client, bucket: &str, path: &str) -> Result<bool, warp::Rejection> {
534 if is_object(s3, bucket, path).await? {
535 return Ok(false);
536 }
537 let mut directory_list_stream = list_directory_stream(s3, bucket, path);
538 match directory_list_stream.try_next().await {
539 Ok(None) => Ok(false),
540 Ok(Some(_)) => Ok(true),
541 Err(err) => {
542 let message = format!("error listing item: {}", err);
543 EsthriRejection::warp_result(message)
544 }
545 }
546}
547
548async fn create_listing_page(s3: S3Client, bucket: String, path: String) -> io::Result<Bytes> {
549 let mut directory_list_stream = list_directory_stream(&s3, &bucket, &path);
550 let mut elements = vec![];
551 loop {
552 match directory_list_stream.try_next().await {
553 Ok(None) => break,
554 Ok(Some(items)) => {
555 for s3obj in items {
556 match s3obj {
557 S3ListingItem::S3Object(o) => elements
558 .push(format_bucket_item(get_stripped_path(&path, &o.key), false)),
559 S3ListingItem::S3CommonPrefix(cp) => {
560 elements.push(format_bucket_item(get_stripped_path(&path, &cp), true))
561 }
562 }
563 }
564 }
565 Err(err) => {
566 return Err(into_io_error(err));
567 }
568 }
569 }
570 let document = html! {
571 (DOCTYPE)
572 meta charset="utf-8";
573 title {
574 "Esthri " (bucket) " - " (path)
575 }
576 link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.1/css/bootstrap.min.css";
577 link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/5.15.4/css/all.min.css";
578
579 div."container p-2" {
580 (format_title(&bucket, &path))
581
582 ul class="list-group" {
583 @for element in elements {
584 li class="list-group-item d-flex justify-content-between align-items-center" {
585 (element)
586 }
587 }
588 }
589 }
590 };
591 Ok(Bytes::from(document.into_string()))
592}
593
594async fn create_item_stream(
595 s3: S3Client,
596 bucket: String,
597 path: String,
598) -> impl Stream<Item = esthri::Result<Bytes>> {
599 stream! {
600 let mut stream = match download_streaming(&s3, &bucket, &path, false).await {
603 Ok(byte_stream) => byte_stream,
604 Err(err) => {
605 yield Err(err);
606 return;
607 }
608 };
609 loop {
610 if let Some(data) = stream.next().await {
611 yield data;
612 } else {
613 return;
614 }
615 }
616 }
617}
618
619async fn get_obj_info(
620 s3: &S3Client,
621 bucket: &str,
622 path: &str,
623) -> Result<HeadObjectInfo, warp::Rejection> {
624 match head_object(s3, &bucket, &path).await {
625 Ok(obj_info) => {
626 if let Some(obj_info) = obj_info {
627 Ok(obj_info)
628 } else {
629 Err(warp::reject::not_found())
630 }
631 }
632 Err(err) => {
633 let message = format!("error listing item: {}", err);
634 EsthriRejection::warp_result(message)
635 }
636 }
637}
638
639async fn item_pre_response(
640 s3: &S3Client,
641 bucket: String,
642 path: String,
643 if_none_match: Option<String>,
644 mut resp_builder: response::Builder,
645) -> Result<(response::Builder, Option<(String, String)>), warp::Rejection> {
646 let obj_info = get_obj_info(s3, &bucket, &path).await?;
647 let not_modified = if_none_match
648 .map(|etag| etag.strip_prefix("W/").map(String::from).unwrap_or(etag))
649 .map(|etag| etag == obj_info.e_tag)
650 .unwrap_or(false);
651 if not_modified {
652 resp_builder = resp_builder.status(StatusCode::NOT_MODIFIED);
653 Ok((resp_builder, None))
654 } else {
655 resp_builder = resp_builder
656 .header(CONTENT_LENGTH, obj_info.size)
657 .header(ETAG, &obj_info.e_tag)
658 .header(CACHE_CONTROL, "private,max-age=0")
659 .header(
660 LAST_MODIFIED,
661 obj_info
662 .last_modified
663 .format(LAST_MODIFIED_TIME_FMT)
664 .to_string(),
665 );
666 if obj_info.is_esthri_compressed() {
667 resp_builder = resp_builder.header(CONTENT_ENCODING, "gzip");
668 }
669 let mime = mime_guess::from_path(&path);
670 let mime = mime.first();
671 if let Some(mime) = mime {
672 resp_builder = resp_builder.header(CONTENT_TYPE, mime.essence_str());
673 } else {
674 resp_builder =
675 resp_builder.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM.essence_str());
676 }
677 Ok((resp_builder, Some((bucket, path))))
678 }
679}
680
681#[cfg(not(windows))]
682async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
684 let code;
685 let message;
686 if err.is_not_found() {
687 code = StatusCode::NOT_FOUND;
688 message = "not found".to_owned();
689 } else if let Some(EsthriRejection {
690 message: message_inner,
691 }) = err.find()
692 {
693 code = StatusCode::BAD_REQUEST;
694 message = message_inner.to_owned();
695 } else if err
696 .find::<warp::filters::body::BodyDeserializeError>()
697 .is_some()
698 {
699 message = "deserialization error".to_owned();
700 code = StatusCode::BAD_REQUEST;
701 } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
702 code = StatusCode::METHOD_NOT_ALLOWED;
703 message = "not allowed".to_owned();
704 } else if err.find::<warp::reject::InvalidQuery>().is_some() {
705 code = StatusCode::METHOD_NOT_ALLOWED;
706 message = "invalid query string".to_owned();
707 } else {
708 code = StatusCode::INTERNAL_SERVER_ERROR;
709 message = format!("internal error: {:?}", err);
710 }
711 let json = warp::reply::json(&ErrorMessage {
712 code: code.as_u16(),
713 message,
714 });
715 Ok(warp::reply::with_status(json, code))
716}
717
718fn sanitize_filename(filename: String) -> String {
719 let options = sanitize_filename::Options {
720 windows: true,
721 replacement: "_",
722 ..Default::default()
723 };
724 sanitize_filename::sanitize_with_options(
725 filename.strip_suffix('/').unwrap_or(&filename),
726 options,
727 )
728}
729
730async fn redirect_on_dir_without_slash(
735 s3: &S3Client,
736 bucket: &str,
737 path: &str,
738) -> Result<(bool, Option<Result<http::Response<Body>, warp::Rejection>>), warp::Rejection> {
739 let is_dir_listing = path.is_empty() || path.ends_with('/');
740 if !is_dir_listing && is_directory(s3, bucket, path).await? {
741 let path = String::from("/") + path + "/";
742 debug!("redirect for index-html: {}", path);
743 let response = Response::builder()
744 .header(LOCATION, path)
745 .status(StatusCode::FOUND)
746 .body(Body::empty())
747 .map_err(|err| EsthriRejection::warp_rejection(format!("{}", err)));
748 Ok((is_dir_listing, Some(response)))
749 } else {
750 Ok((is_dir_listing, None))
751 }
752}
753
754async fn maybe_serve_index_html(
759 s3: &S3Client,
760 bucket: &str,
761 path: String,
762 index_html: bool,
763 is_dir_listing: bool,
764) -> Result<(bool, String), warp::Rejection> {
765 let index_html_path = if path.is_empty() {
766 "index.html".to_owned()
767 } else {
768 path.clone() + "index.html"
769 };
770 if is_dir_listing && index_html && is_object(s3, bucket, &index_html_path).await? {
771 Ok((false, index_html_path))
772 } else {
773 Ok((is_dir_listing, path))
774 }
775}
776
777fn should_reject(path: &str, allowed_prefixes: &[String], params: &DownloadParams) -> bool {
779 assert!(!allowed_prefixes.is_empty());
782 if allowed_prefixes.iter().any(|p| path.starts_with(p)) {
783 false
784 } else if path.is_empty() && params.prefixes.is_some() {
785 let prefixes = params.prefixes.as_ref().unwrap();
786 !prefixes.prefixes.iter().all(|archive_prefix| {
787 allowed_prefixes
788 .iter()
789 .any(|p| archive_prefix.starts_with(p))
790 })
791 } else {
792 true
793 }
794}
795
796async fn download(
822 path: warp::path::FullPath,
823 s3: S3Client,
824 bucket: String,
825 index_html: bool,
826 allowed_prefixes: Option<Vec<String>>,
827 params: DownloadParams,
828 if_none_match: Option<String>,
829) -> Result<http::Response<Body>, warp::Rejection> {
830 let path = path
831 .as_str()
832 .to_owned()
833 .get(1..)
834 .map(Into::<String>::into)
835 .unwrap_or_default();
836 debug!(
837 "path: {}, params: archive: {:?}, prefixes: {:?}",
838 path, params.archive, params.prefixes
839 );
840
841 if let Some(allowed_prefixes) = allowed_prefixes {
842 if should_reject(&path, &allowed_prefixes[..], ¶ms) {
843 return Err(warp::reject::not_found());
844 }
845 }
846
847 let (is_dir_listing, maybe_redirect) =
848 redirect_on_dir_without_slash(&s3, &bucket, &path).await?;
849
850 if let Some(redirect) = maybe_redirect {
851 return redirect;
852 }
853
854 let (is_dir_listing, path) =
855 maybe_serve_index_html(&s3, &bucket, path, index_html, is_dir_listing).await?;
856
857 let resp_builder = Response::builder();
858 let error_tracker = ErrorTrackerArc::new();
859
860 let is_archive = params.archive.unwrap_or(false);
861
862 let (body, resp_builder) = if is_archive {
863 let (archive_filename, prefixes) = if !path.is_empty() && params.prefixes.is_none() {
864 let archive_filename = sanitize_filename(path.clone());
865 (format!("{}.zip", archive_filename), Some(vec![path]))
866 } else if let Some(prefixes) = params.prefixes {
867 if path.is_empty() {
868 let archive_filename = params.archive_name.unwrap_or_else(|| "archive.zip".into());
869 (sanitize_filename(archive_filename), Some(prefixes.prefixes))
870 } else {
871 return Err(EsthriRejection::warp_rejection(
872 "path must be empty with prefixes",
873 ));
874 }
875 } else {
876 return Err(EsthriRejection::warp_rejection(
877 "path and prefixes were empty",
878 ));
879 };
880
881 if let Some(prefixes) = prefixes {
882 let stream = create_archive_stream(s3.clone(), bucket, prefixes, error_tracker).await;
883 (
884 Some(Body::wrap_stream(stream)),
885 resp_builder
886 .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM.essence_str())
887 .header(
888 CONTENT_DISPOSITION,
889 format!("attachment; filename=\"{}\"", archive_filename),
890 ),
891 )
892 } else {
893 (None, resp_builder)
894 }
895 } else if is_dir_listing {
896 let listing_page = create_listing_page(s3.clone(), bucket, path).await;
897 let header = resp_builder.header(CONTENT_TYPE, TEXT_HTML_UTF_8.essence_str());
898 match listing_page {
899 Ok(page) => (Some(Body::from(page)), header),
900 Err(e) => (Some(Body::from(e.to_string())), header),
901 }
902 } else {
903 let (resp_builder, create_stream) =
904 item_pre_response(&s3, bucket, path, if_none_match, resp_builder).await?;
905 if let Some((bucket, path)) = create_stream {
906 let stream = create_item_stream(s3.clone(), bucket, path).await;
907 (Some(Body::wrap_stream(stream)), resp_builder)
908 } else {
909 (None, resp_builder)
910 }
911 };
912
913 resp_builder
914 .body(body.unwrap_or_else(Body::empty))
915 .map_err(|err| EsthriRejection::warp_rejection(format!("{}", err)))
916}