1use std::{
8 fmt, ops,
9 str::{from_utf8, FromStr},
10 time::Duration,
11};
12
13use anyhow::{bail, Error as AnyError};
14use tracing::{debug, info};
15
16pub use crate::retry::*;
17
18#[derive(Debug, Clone, PartialEq)]
20#[non_exhaustive]
21pub struct Config {
22 pub part_size_bytes: PartSizeBytes,
26 pub max_concurrency: MaxConcurrency,
32 pub min_parts_for_concurrent_download: MinPartsForConcurrentDownload,
38 pub min_bytes_for_concurrent_download: MinBytesForConcurrentDownload,
45 pub sequential_download_mode: SequentialDownloadMode,
53 pub ensure_active_pull: EnsureActivePull,
60 pub buffer_size: BufferSize,
67 pub max_buffers_full_delay_ms: MaxBuffersFullDelayMs,
72 pub log_download_messages_as_debug: LogDownloadMessagesAsDebug,
79 pub retries: Option<RetryConfig>,
85}
86
87impl Config {
88 env_ctors!(no_fill);
89
90 pub fn part_size_bytes<T: Into<PartSizeBytes>>(mut self, part_size_bytes: T) -> Self {
92 self.part_size_bytes = part_size_bytes.into();
93 self
94 }
95
96 pub fn max_concurrency<T: Into<MaxConcurrency>>(mut self, max_concurrency: T) -> Self {
98 self.max_concurrency = max_concurrency.into();
99 self
100 }
101
102 pub fn min_parts_for_concurrent_download<T: Into<MinPartsForConcurrentDownload>>(
108 mut self,
109 min_parts_for_concurrent_download: T,
110 ) -> Self {
111 self.min_parts_for_concurrent_download = min_parts_for_concurrent_download.into();
112 self
113 }
114
115 pub fn min_bytes_for_concurrent_download<T: Into<MinBytesForConcurrentDownload>>(
122 mut self,
123 min_bytes_for_concurrent_download: T,
124 ) -> Self {
125 self.min_bytes_for_concurrent_download = min_bytes_for_concurrent_download.into();
126 self
127 }
128
129 pub fn sequential_download_mode<T: Into<SequentialDownloadMode>>(
136 mut self,
137 sequential_download_mode: T,
138 ) -> Self {
139 self.sequential_download_mode = sequential_download_mode.into();
140 self
141 }
142
143 pub fn ensure_active_pull<T: Into<EnsureActivePull>>(mut self, ensure_active_pull: T) -> Self {
150 self.ensure_active_pull = ensure_active_pull.into();
151 self
152 }
153
154 pub fn buffer_size<T: Into<BufferSize>>(mut self, buffer_size: T) -> Self {
156 self.buffer_size = buffer_size.into();
157 self
158 }
159
160 pub fn max_buffers_full_delay_ms<T: Into<MaxBuffersFullDelayMs>>(
163 mut self,
164 max_buffers_full_delay_ms: T,
165 ) -> Self {
166 self.max_buffers_full_delay_ms = max_buffers_full_delay_ms.into();
167 self
168 }
169
170 pub fn log_download_messages_as_debug<T: Into<LogDownloadMessagesAsDebug>>(
177 mut self,
178 log_download_messages_as_debug: T,
179 ) -> Self {
180 self.log_download_messages_as_debug = log_download_messages_as_debug.into();
181 self
182 }
183
184 pub fn retries(mut self, config: RetryConfig) -> Self {
186 self.retries = Some(config);
187 self
188 }
189
190 pub fn configure_retries<F>(mut self, mut f: F) -> Self
195 where
196 F: FnMut(RetryConfig) -> RetryConfig,
197 {
198 let retries = self.retries.take().unwrap_or_default();
199 self.retries(f(retries))
200 }
201
202 pub fn configure_retries_from_default<F>(self, mut f: F) -> Self
204 where
205 F: FnMut(RetryConfig) -> RetryConfig,
206 {
207 self.retries(f(RetryConfig::default()))
208 }
209
210 pub fn disable_retries(mut self) -> Self {
214 self.retries = None;
215 self
216 }
217
218 pub fn validated(self) -> Result<Self, AnyError> {
220 self.validate()?;
221 Ok(self)
222 }
223
224 pub fn validate(&self) -> Result<(), AnyError> {
226 if self.max_concurrency.0 == 0 {
227 bail!("'max_concurrency' must not be 0");
228 }
229
230 if self.part_size_bytes.0 == 0 {
231 bail!("'part_size_bytes' must not be 0");
232 }
233
234 if let Some(retries) = &self.retries {
235 retries.validate()?;
236 }
237
238 Ok(())
239 }
240
241 fn fill_from_env_prefixed_internal<T: AsRef<str>>(
242 &mut self,
243 prefix: T,
244 ) -> Result<bool, AnyError> {
245 let mut found_any = false;
246
247 if let Some(part_size_bytes) = PartSizeBytes::try_from_env_prefixed(prefix.as_ref())? {
248 found_any = true;
249 self.part_size_bytes = part_size_bytes;
250 }
251 if let Some(max_concurrency) = MaxConcurrency::try_from_env_prefixed(prefix.as_ref())? {
252 found_any = true;
253 self.max_concurrency = max_concurrency;
254 }
255 if let Some(min_parts_for_concurrent_download) =
256 MinPartsForConcurrentDownload::try_from_env_prefixed(prefix.as_ref())?
257 {
258 found_any = true;
259 self.min_parts_for_concurrent_download = min_parts_for_concurrent_download;
260 }
261 if let Some(min_bytes_for_concurrent_download) =
262 MinBytesForConcurrentDownload::try_from_env_prefixed(prefix.as_ref())?
263 {
264 found_any = true;
265 self.min_bytes_for_concurrent_download = min_bytes_for_concurrent_download;
266 }
267 if let Some(sequential_download_mode) =
268 SequentialDownloadMode::try_from_env_prefixed(prefix.as_ref())?
269 {
270 found_any = true;
271 self.sequential_download_mode = sequential_download_mode;
272 }
273 if let Some(ensure_active_pull) = EnsureActivePull::try_from_env_prefixed(prefix.as_ref())?
274 {
275 found_any = true;
276 self.ensure_active_pull = ensure_active_pull;
277 }
278 if let Some(buffer_size) = BufferSize::try_from_env_prefixed(prefix.as_ref())? {
279 found_any = true;
280 self.buffer_size = buffer_size;
281 }
282 if let Some(max_buffers_full_delay_ms) =
283 MaxBuffersFullDelayMs::try_from_env_prefixed(prefix.as_ref())?
284 {
285 found_any = true;
286 self.max_buffers_full_delay_ms = max_buffers_full_delay_ms;
287 }
288
289 if let Some(log_download_messages_as_debug) =
290 LogDownloadMessagesAsDebug::try_from_env_prefixed(prefix.as_ref())?
291 {
292 found_any = true;
293 self.log_download_messages_as_debug = log_download_messages_as_debug;
294 }
295
296 if let Some(retries) = RetryConfig::from_env_prefixed(prefix.as_ref())? {
297 found_any = true;
298 self.retries = Some(retries);
299 }
300
301 Ok(found_any)
302 }
303}
304
305impl Default for Config {
306 fn default() -> Self {
307 Self {
308 part_size_bytes: Default::default(),
309 max_concurrency: Default::default(),
310 min_bytes_for_concurrent_download: Default::default(),
311 min_parts_for_concurrent_download: Default::default(),
312 sequential_download_mode: Default::default(),
313 ensure_active_pull: Default::default(),
314 buffer_size: Default::default(),
315 max_buffers_full_delay_ms: Default::default(),
316 log_download_messages_as_debug: Default::default(),
317 retries: Some(Default::default()),
318 }
319 }
320}
321
322#[derive(Debug, Copy, Clone, PartialEq, Eq)]
365pub struct PartSizeBytes(u64);
366
367impl PartSizeBytes {
368 pub fn new<T: Into<u64>>(part_size_bytes: T) -> Self {
369 Self(part_size_bytes.into())
370 }
371
372 pub const fn from_u64(part_size_bytes: u64) -> Self {
373 Self(part_size_bytes)
374 }
375
376 pub const fn into_inner(self) -> u64 {
377 self.0
378 }
379
380 env_funs!("PART_SIZE_BYTES");
381}
382
383impl fmt::Display for PartSizeBytes {
384 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385 write!(f, "{}", self.0)
386 }
387}
388
389impl ops::Deref for PartSizeBytes {
390 type Target = u64;
391
392 fn deref(&self) -> &Self::Target {
393 &self.0
394 }
395}
396
397impl ops::DerefMut for PartSizeBytes {
398 fn deref_mut(&mut self) -> &mut Self::Target {
399 &mut self.0
400 }
401}
402
403impl Default for PartSizeBytes {
404 fn default() -> Self {
405 PartSizeBytes::new(Mebi(4))
406 }
407}
408
409impl From<u64> for PartSizeBytes {
410 fn from(v: u64) -> Self {
411 PartSizeBytes(v)
412 }
413}
414
415impl From<PartSizeBytes> for u64 {
416 fn from(v: PartSizeBytes) -> Self {
417 v.0
418 }
419}
420
421impl From<Kilo> for PartSizeBytes {
422 fn from(v: Kilo) -> Self {
423 PartSizeBytes::new(v)
424 }
425}
426
427impl From<Mega> for PartSizeBytes {
428 fn from(v: Mega) -> Self {
429 PartSizeBytes::new(v)
430 }
431}
432
433impl From<Giga> for PartSizeBytes {
434 fn from(v: Giga) -> Self {
435 PartSizeBytes::new(v)
436 }
437}
438
439impl From<Kibi> for PartSizeBytes {
440 fn from(v: Kibi) -> Self {
441 PartSizeBytes::new(v)
442 }
443}
444
445impl From<Mebi> for PartSizeBytes {
446 fn from(v: Mebi) -> Self {
447 PartSizeBytes::new(v)
448 }
449}
450
451impl From<Gibi> for PartSizeBytes {
452 fn from(v: Gibi) -> Self {
453 PartSizeBytes::new(v)
454 }
455}
456
457impl FromStr for PartSizeBytes {
458 type Err = anyhow::Error;
459
460 fn from_str(s: &str) -> Result<Self, Self::Err> {
461 s.parse::<UnitPrefix>().map(|up| PartSizeBytes(up.value()))
462 }
463}
464new_type! {
465 #[doc="Maximum concurrency of a single download"]
466 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
467 pub copy struct MaxConcurrency(usize, env="MAX_CONCURRENCY");
468}
469
470impl MaxConcurrency {
471 pub const fn from_usize(max: usize) -> Self {
472 Self(max)
473 }
474}
475
476impl Default for MaxConcurrency {
477 fn default() -> Self {
478 MaxConcurrency(64)
479 }
480}
481
482new_type! {
483 #[doc="Buffer size of a concurrent download task"]
484 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
485 pub copy struct BufferSize(usize, env="BUFFER_SIZE");
486}
487
488impl Default for BufferSize {
489 fn default() -> Self {
490 BufferSize(2)
491 }
492}
493
494new_type! {
495 #[doc="Make sure that the network stream is always actively pulled into an intermediate buffer."]
496 #[doc="This is not always the case since some low concurrency downloads require the strem to be actively pulled."]
497 #[doc="This also allows for detection of panics."]
498 #[doc="The default is `false` which means this feature is turned off."]
499 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
500 pub copy struct EnsureActivePull(bool, env="ENSURE_ACTIVE_PULL");
501}
502
503new_type! {
504 #[doc="The minimum number of parts a download must consist of for the parts to be downloaded concurrently"]
505 #[doc="Depending on the part sizes it might be more efficient to set a number higer than 2. Downloading concurrently has an overhead."]
506 #[doc="This setting plays together with `MinBytesForConcurrentDownload`."]
507 #[doc="Setting this value to 0 or 1 makes no sense and has no effect."]
508 #[doc="The default is 2."]
509 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
510 pub copy struct MinPartsForConcurrentDownload(u64, env="MIN_PARTS_FOR_CONCURRENT_DOWNLOAD");
511}
512
513impl Default for MinPartsForConcurrentDownload {
514 fn default() -> Self {
515 MinPartsForConcurrentDownload(2)
516 }
517}
518
519#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
567pub struct MinBytesForConcurrentDownload(u64);
568
569impl MinBytesForConcurrentDownload {
570 pub fn new<T: Into<u64>>(part_size_bytes: T) -> Self {
571 Self(part_size_bytes.into())
572 }
573
574 pub const fn from_u64(part_size_bytes: u64) -> Self {
575 Self(part_size_bytes)
576 }
577
578 pub const fn into_inner(self) -> u64 {
579 self.0
580 }
581
582 env_funs!("MIN_BYTES_FOR_CONCURRENT_DOWNLOAD");
583}
584
585impl ops::Deref for MinBytesForConcurrentDownload {
586 type Target = u64;
587
588 fn deref(&self) -> &Self::Target {
589 &self.0
590 }
591}
592
593impl ops::DerefMut for MinBytesForConcurrentDownload {
594 fn deref_mut(&mut self) -> &mut Self::Target {
595 &mut self.0
596 }
597}
598
599impl Default for MinBytesForConcurrentDownload {
600 fn default() -> Self {
601 MinBytesForConcurrentDownload::new(0u64)
602 }
603}
604
605impl From<u64> for MinBytesForConcurrentDownload {
606 fn from(v: u64) -> Self {
607 MinBytesForConcurrentDownload(v)
608 }
609}
610
611impl From<MinBytesForConcurrentDownload> for u64 {
612 fn from(v: MinBytesForConcurrentDownload) -> Self {
613 v.0
614 }
615}
616
617impl From<Kilo> for MinBytesForConcurrentDownload {
618 fn from(v: Kilo) -> Self {
619 MinBytesForConcurrentDownload::new(v)
620 }
621}
622
623impl From<Mega> for MinBytesForConcurrentDownload {
624 fn from(v: Mega) -> Self {
625 MinBytesForConcurrentDownload::new(v)
626 }
627}
628
629impl From<Giga> for MinBytesForConcurrentDownload {
630 fn from(v: Giga) -> Self {
631 MinBytesForConcurrentDownload::new(v)
632 }
633}
634
635impl From<Kibi> for MinBytesForConcurrentDownload {
636 fn from(v: Kibi) -> Self {
637 MinBytesForConcurrentDownload::new(v)
638 }
639}
640
641impl From<Mebi> for MinBytesForConcurrentDownload {
642 fn from(v: Mebi) -> Self {
643 MinBytesForConcurrentDownload::new(v)
644 }
645}
646
647impl From<Gibi> for MinBytesForConcurrentDownload {
648 fn from(v: Gibi) -> Self {
649 MinBytesForConcurrentDownload::new(v)
650 }
651}
652
653impl FromStr for MinBytesForConcurrentDownload {
654 type Err = anyhow::Error;
655
656 fn from_str(s: &str) -> Result<Self, Self::Err> {
657 s.parse::<UnitPrefix>()
658 .map(|up| MinBytesForConcurrentDownload(up.value()))
659 }
660}
661
662new_type! {
663 #[doc="Maximum time to wait for download buffers when all were full in ms"]
664 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
665 pub copy struct MaxBuffersFullDelayMs(u64, env="MAX_BUFFERS_FULL_DELAY_MS");
666}
667
668impl Default for MaxBuffersFullDelayMs {
669 fn default() -> Self {
670 Self(10)
671 }
672}
673
674impl From<MaxBuffersFullDelayMs> for Duration {
675 fn from(m: MaxBuffersFullDelayMs) -> Self {
676 Duration::from_millis(m.0)
677 }
678}
679
680new_type! {
681 #[doc="If set to `true` download related messages are logged at `DEBUG` level. Otherwise at `INFO` level."]
682 #[doc="The default is `true` (log on `DEBUG` level)."]
683 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
684 pub copy struct LogDownloadMessagesAsDebug(bool, env="LOG_DOWNLOAD_MESSAGES_AS_DEBUG");
685}
686
687impl LogDownloadMessagesAsDebug {
688 pub(crate) fn log<T: AsRef<str>>(self, msg: T) {
689 if *self {
690 debug!("{}", msg.as_ref());
691 } else {
692 info!("{}", msg.as_ref());
693 }
694 }
695
696 pub fn value(self) -> bool {
698 self.0
699 }
700}
701
702impl Default for LogDownloadMessagesAsDebug {
703 fn default() -> Self {
704 LogDownloadMessagesAsDebug(true)
705 }
706}
707
708#[derive(Debug, Clone, Copy, PartialEq, Eq)]
716pub enum SequentialDownloadMode {
717 KeepParts,
719 MergeParts,
727 Repartition { part_size: PartSizeBytes },
730}
731
732impl SequentialDownloadMode {
733 pub fn new<T: Into<SequentialDownloadMode>>(mode: T) -> Self {
734 mode.into()
735 }
736
737 env_funs!("SEQUENTIAL_DOWNLOAD_MODE");
738}
739
740impl Default for SequentialDownloadMode {
741 fn default() -> Self {
742 Self::MergeParts
743 }
744}
745
746impl From<PartSizeBytes> for SequentialDownloadMode {
747 fn from(part_size: PartSizeBytes) -> Self {
748 Self::Repartition { part_size }
749 }
750}
751
752impl FromStr for SequentialDownloadMode {
753 type Err = AnyError;
754
755 fn from_str(s: &str) -> Result<Self, Self::Err> {
756 let s = s.trim();
757 match s {
758 "KEEP_PARTS" => Ok(Self::KeepParts),
759 "SINGLE_DOWNLOAD" => Ok(Self::MergeParts),
760 probably_a_number => {
761 let part_size = probably_a_number.parse()?;
762 Ok(Self::Repartition { part_size })
763 }
764 }
765 }
766}
767
/// A plain count without any multiplier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Unit(pub u64);

impl Unit {
    /// Returns the wrapped number unchanged.
    pub const fn value(self) -> u64 {
        let Unit(count) = self;
        count
    }
}

impl From<Unit> for u64 {
    fn from(m: Unit) -> Self {
        let Unit(count) = m;
        count
    }
}
/// A count of thousands (multiplier 1,000).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Kilo(pub u64);

impl Kilo {
    /// Returns the wrapped count multiplied by 1,000.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_000)
    }
}

impl From<Kilo> for u64 {
    /// Same as [`Kilo::value`].
    fn from(m: Kilo) -> Self {
        m.value()
    }
}
814
/// A count of millions (multiplier 1,000,000).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Mega(pub u64);

impl Mega {
    /// Returns the wrapped count multiplied by 1,000,000.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_000_000)
    }
}

impl From<Mega> for u64 {
    /// Same as [`Mega::value`].
    fn from(m: Mega) -> Self {
        m.value()
    }
}
838
/// A count of billions (multiplier 1,000,000,000).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Giga(pub u64);

impl Giga {
    /// Returns the wrapped count multiplied by 1,000,000,000.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_000_000_000)
    }
}

impl From<Giga> for u64 {
    /// Same as [`Giga::value`].
    fn from(m: Giga) -> Self {
        m.value()
    }
}
862
/// A count with the binary multiplier 1,024.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Kibi(pub u64);

impl Kibi {
    /// Returns the wrapped count multiplied by 1,024.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_024)
    }
}

impl From<Kibi> for u64 {
    /// Same as [`Kibi::value`].
    fn from(m: Kibi) -> Self {
        m.value()
    }
}
886
/// A count with the binary multiplier 1,048,576 (1,024 * 1,024).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Mebi(pub u64);

impl Mebi {
    /// Returns the wrapped count multiplied by 1,048,576.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_048_576)
    }
}

impl From<Mebi> for u64 {
    /// Same as [`Mebi::value`].
    fn from(m: Mebi) -> Self {
        m.value()
    }
}
910
/// A count with the binary multiplier 1,073,741,824 (1,024 * 1,024 * 1,024).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Gibi(pub u64);

impl Gibi {
    /// Returns the wrapped count multiplied by 1,073,741,824.
    ///
    /// The multiplication saturates at `u64::MAX` instead of panicking
    /// (debug builds) or silently wrapping around (release builds) when the
    /// count is too large.
    pub const fn value(self) -> u64 {
        self.0.saturating_mul(1_073_741_824)
    }
}

impl From<Gibi> for u64 {
    /// Same as [`Gibi::value`].
    fn from(m: Gibi) -> Self {
        m.value()
    }
}
934
935#[derive(Debug, Clone, Copy, Eq)]
978pub enum UnitPrefix {
979 Unit(u64),
982 Kilo(u64),
983 Mega(u64),
984 Giga(u64),
985 Kibi(u64),
986 Mebi(u64),
987 Gibi(u64),
988}
989
990impl UnitPrefix {
991 pub fn value(self) -> u64 {
1003 match self {
1004 UnitPrefix::Unit(v) => v,
1005 UnitPrefix::Kilo(v) => Kilo(v).value(),
1006 UnitPrefix::Mega(v) => Mega(v).value(),
1007 UnitPrefix::Giga(v) => Giga(v).value(),
1008 UnitPrefix::Kibi(v) => Kibi(v).value(),
1009 UnitPrefix::Mebi(v) => Mebi(v).value(),
1010 UnitPrefix::Gibi(v) => Gibi(v).value(),
1011 }
1012 }
1013}
1014
1015impl From<UnitPrefix> for u64 {
1016 fn from(m: UnitPrefix) -> Self {
1017 m.value()
1018 }
1019}
1020
1021impl From<u64> for UnitPrefix {
1022 fn from(v: u64) -> Self {
1023 UnitPrefix::Unit(v)
1024 }
1025}
1026
1027impl FromStr for UnitPrefix {
1028 type Err = AnyError;
1029
1030 fn from_str(s: &str) -> Result<Self, Self::Err> {
1031 let s = s.trim();
1032 if let Some(idx) = s.find(|c: char| c.is_alphabetic()) {
1033 if idx == 0 {
1034 bail!("'{}' needs digits", s)
1035 }
1036
1037 let digits = from_utf8(&s.as_bytes()[..idx])?.trim();
1038 let unit = from_utf8(&s.as_bytes()[idx..])?.trim();
1039
1040 let bytes = digits.parse::<u64>()?;
1041
1042 match unit {
1043 "k" => Ok(UnitPrefix::Kilo(bytes)),
1044 "M" => Ok(UnitPrefix::Mega(bytes)),
1045 "G" => Ok(UnitPrefix::Giga(bytes)),
1046 "Ki" => Ok(UnitPrefix::Kibi(bytes)),
1047 "Mi" => Ok(UnitPrefix::Mebi(bytes)),
1048 "Gi" => Ok(UnitPrefix::Gibi(bytes)),
1049 s => bail!("invalid unit: '{}'", s),
1050 }
1051 } else {
1052 Ok(s.parse::<u64>().map(UnitPrefix::Unit)?)
1053 }
1054 }
1055}
1056
1057impl PartialEq for UnitPrefix {
1058 fn eq(&self, other: &Self) -> bool {
1059 self.value() == other.value()
1060 }
1061}
1062
1063impl PartialOrd for UnitPrefix {
1064 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1065 self.value().partial_cmp(&other.value())
1066 }
1067}
1068
1069impl Ord for UnitPrefix {
1070 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1071 self.value().cmp(&other.value())
1072 }
1073}