1use std::{convert::Infallible, str::FromStr};
9
10use futures::{future::BoxFuture, FutureExt};
11
12use crate::{errors::CondowError, streams::BytesStream, InclusiveRange};
13
14pub use in_memory::InMemoryClient;
15
16pub trait CondowClient: Clone + Send + Sync + 'static {
24 type Location: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static;
25
26 fn get_size(&self, location: Self::Location) -> BoxFuture<'static, Result<u64, CondowError>>;
28
29 fn download(
31 &self,
32 location: Self::Location,
33 range: InclusiveRange,
34 ) -> BoxFuture<'static, Result<BytesStream, CondowError>>;
35
36 fn download_full(
38 &self,
39 location: Self::Location,
40 ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
41 let me = self.clone();
42 async move {
43 let len = me.get_size(location.clone()).await?;
44 me.download(location, InclusiveRange(0, len - 1)).await
45 }
46 .boxed()
47 }
48}
49
/// A location that carries no information.
///
/// Useful for clients which serve exactly one BLOB and therefore
/// do not need to be told where to look.
#[derive(Clone, Copy, Debug)]
pub struct IgnoreLocation;

impl std::fmt::Display for IgnoreLocation {
    /// Always renders the fixed placeholder `<no location>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("<no location>")
    }
}
59
60impl FromStr for IgnoreLocation {
61 type Err = Infallible;
62
63 fn from_str(_: &str) -> Result<Self, Self::Err> {
64 Ok(IgnoreLocation)
65 }
66}
67
68impl<T> From<T> for IgnoreLocation
69where
70 T: AsRef<str>,
71{
72 fn from(_: T) -> Self {
73 IgnoreLocation
74 }
75}
76
77mod in_memory {
78 use std::{marker::PhantomData, sync::Arc};
79
80 use crate::{
81 config::{Config, Mebi},
82 errors::CondowError,
83 streams::{BytesHint, BytesStream},
84 Condow, InclusiveRange,
85 };
86 use anyhow::Error as AnyError;
87 use bytes::Bytes;
88 use futures::{
89 future::{self, BoxFuture, FutureExt},
90 stream,
91 };
92 use tracing::trace;
93
94 use super::{CondowClient, IgnoreLocation};
95
96 #[derive(Clone)]
100 pub struct InMemoryClient<L = IgnoreLocation> {
101 blob: Blob,
102 chunk_size: usize,
103 _location: PhantomData<L>,
104 }
105
106 impl<L> InMemoryClient<L> {
107 pub fn new(blob: Vec<u8>) -> Self {
109 Self::new_shared(Arc::new(blob))
110 }
111
112 pub fn new_shared(blob: Arc<Vec<u8>>) -> Self {
114 Self {
115 blob: Blob::Owned(blob),
116 chunk_size: Mebi(4).value() as usize,
117 _location: PhantomData,
118 }
119 }
120
121 pub fn new_from_slice(blob: &[u8]) -> Self {
123 Self::new(blob.to_vec())
124 }
125
126 pub fn new_static(blob: &'static [u8]) -> Self {
128 Self {
129 blob: Blob::Static(blob),
130 chunk_size: Mebi(4).value() as usize,
131 _location: PhantomData,
132 }
133 }
134
135 pub fn chunk_size(mut self, chunk_size: usize) -> Self {
136 self.chunk_size = chunk_size;
137 self
138 }
139 }
140
141 impl<L> InMemoryClient<L>
142 where
143 L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
144 {
145 pub fn condow(&self, config: Config) -> Result<Condow<Self, ()>, AnyError> {
146 Condow::new(self.clone(), config)
147 }
148 }
149
150 impl<L> CondowClient for InMemoryClient<L>
151 where
152 L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
153 {
154 type Location = L;
155
156 fn get_size(
157 &self,
158 _location: Self::Location,
159 ) -> BoxFuture<'static, Result<u64, CondowError>> {
160 trace!("in-memory-client: get_size");
161
162 futures::future::ready(Ok(self.blob.len() as u64)).boxed()
163 }
164
165 fn download(
166 &self,
167 _location: Self::Location,
168 range: InclusiveRange,
169 ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
170 trace!("in-memory-client: download");
171
172 download(self.blob.as_slice(), self.chunk_size, range)
173 }
174 }
175
176 fn download(
177 blob: &[u8],
178 chunk_size: usize,
179 range: InclusiveRange,
180 ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
181 let range = {
182 let r = range.to_std_range_excl();
183 r.start as usize..r.end as usize
184 };
185
186 if range.end > blob.len() {
187 return Box::pin(future::ready(Err(CondowError::new_invalid_range(format!(
188 "max upper bound is {} but {} was requested",
189 blob.len() - 1,
190 range.end - 1
191 )))));
192 }
193
194 let slice = &blob[range];
195
196 let bytes_hint = BytesHint::new_exact(slice.len() as u64);
197
198 let iter = slice.chunks(chunk_size).map(Bytes::copy_from_slice).map(Ok);
199
200 let owned_bytes: Vec<_> = iter.collect();
201
202 let stream = stream::iter(owned_bytes);
203
204 let stream = BytesStream::new(stream, bytes_hint);
205
206 let f = future::ready(Ok(stream));
207
208 Box::pin(f)
209 }
210
211 #[derive(Clone)]
212 enum Blob {
213 Static(&'static [u8]),
214 Owned(Arc<Vec<u8>>),
215 }
216
217 impl Blob {
218 pub fn len(&self) -> usize {
219 match self {
220 Blob::Static(b) => b.len(),
221 Blob::Owned(b) => b.len(),
222 }
223 }
224
225 pub fn as_slice(&self) -> &[u8] {
226 match self {
227 Blob::Static(b) => b,
228 Blob::Owned(b) => b,
229 }
230 }
231 }
232
233 #[cfg(test)]
234 mod test {
235 use futures::{pin_mut, StreamExt};
236
237 use crate::{errors::CondowError, streams::BytesHint, InclusiveRange};
238
239 const BLOB: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
240
241 async fn download_to_vec(
242 blob: &[u8],
243 chunk_size: usize,
244 range: InclusiveRange,
245 ) -> Result<(Vec<u8>, BytesHint), CondowError> {
246 let stream = super::download(blob, chunk_size, range).await?;
247
248 let bytes_hint = stream.bytes_hint();
249 let mut buf = Vec::with_capacity(bytes_hint.lower_bound() as usize);
250 pin_mut!(stream);
251 while let Some(next) = stream.next().await {
252 let bytes = next?;
253 buf.extend_from_slice(bytes.as_ref())
254 }
255 Ok((buf, bytes_hint))
256 }
257
258 #[tokio::test]
259 async fn download_all() {
260 for chunk_size in 1..30 {
261 let (bytes, bytes_hint) =
262 download_to_vec(BLOB, chunk_size, (0..=BLOB.len() as u64 - 1).into())
263 .await
264 .unwrap();
265
266 assert_eq!(&bytes, BLOB);
267 assert_eq!(bytes_hint, BytesHint::new_exact(bytes.len() as u64));
268 }
269 }
270
271 #[tokio::test]
272 async fn download_range_begin() {
273 for chunk_size in 1..30 {
274 let range = InclusiveRange(0, 9);
275 let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
276
277 let expected = b"abcdefghij";
278
279 assert_eq!(&bytes, expected);
280 assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
281 }
282 }
283
284 #[tokio::test]
285 async fn download_range_middle() {
286 for chunk_size in 1..30 {
287 let range = InclusiveRange(10, 19);
288 let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
289
290 let expected = b"klmnopqrst";
291
292 assert_eq!(&bytes, expected);
293 assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
294 }
295 }
296
297 #[tokio::test]
298 async fn download_range_end() {
299 for chunk_size in 1..30 {
300 let range = InclusiveRange(16, 25);
301 let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
302
303 let expected = b"qrstuvwxyz";
304
305 assert_eq!(&bytes, expected);
306 assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
307 }
308 }
309 }
310}
311
312pub mod failing_client_simulator {
313 use bytes::Bytes;
315 use futures::{future, lock::Mutex, task, FutureExt, Stream};
316 use std::{fmt::Display, marker::PhantomData, sync::Arc, vec};
317 use tracing::trace;
318
319 use crate::{
320 condow_client::CondowClient,
321 config::Config,
322 errors::CondowError,
323 streams::{BytesHint, BytesStream, BytesStreamItem},
324 Condow, InclusiveRange,
325 };
326
327 pub use super::IgnoreLocation;
328
329 pub struct FailingClientSimulatorBuilder {
331 blob: Blob,
333 response_player: ResponsePlayer,
335 chunk_size: usize,
337 }
338
339 impl FailingClientSimulatorBuilder {
340 pub fn blob(mut self, blob: Vec<u8>) -> Self {
342 self.blob = Blob::Owned(Arc::new(blob));
343 self
344 }
345
346 pub fn blob_arc(mut self, blob: Arc<Vec<u8>>) -> Self {
348 self.blob = Blob::Owned(blob);
349 self
350 }
351
352 pub fn blob_from_slice(self, blob: &[u8]) -> Self {
354 self.blob(blob.to_vec())
355 }
356
357 pub fn blob_static(mut self, blob: &'static [u8]) -> Self {
359 self.blob = Blob::Static(blob);
360 self
361 }
362
363 pub fn response_player(mut self, player: ResponsePlayer) -> Self {
365 self.response_player = player;
366 self
367 }
368
369 pub fn responses(self) -> ResponsesBuilder {
371 ResponsesBuilder(self)
372 }
373
374 pub fn chunk_size(mut self, chunk_size: usize) -> Self {
380 if chunk_size == 0 {
381 panic!("chunk size must be greater than 0")
382 }
383
384 self.chunk_size = chunk_size;
385 self
386 }
387
388 pub fn finish(self) -> FailingClientSimulator {
390 FailingClientSimulator::new(self.blob, self.response_player, self.chunk_size)
391 }
392 }
393
394 impl Default for FailingClientSimulatorBuilder {
395 fn default() -> Self {
396 Self {
397 blob: Blob::Static(&[]),
398 response_player: Default::default(),
399 chunk_size: 3,
400 }
401 }
402 }
403
404 #[derive(Clone)]
412 pub struct FailingClientSimulator<L = IgnoreLocation> {
413 blob: Blob,
414 responses: Arc<Mutex<vec::IntoIter<ResponseBehaviour>>>,
415 chunk_size: usize,
416 _phantom: PhantomData<L>,
417 }
418
419 impl<L> FailingClientSimulator<L>
420 where
421 L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
422 {
423 fn new(blob: Blob, response_player: ResponsePlayer, chunk_size: usize) -> Self {
425 Self {
426 blob,
427 responses: Arc::new(Mutex::new(response_player.into_iter())),
428 chunk_size,
429 _phantom: PhantomData,
430 }
431 }
432
433 pub fn condow(&self, config: Config) -> Result<Condow<Self>, anyhow::Error> {
434 Condow::new(self.clone(), config)
435 }
436 }
437
438 impl<L> CondowClient for FailingClientSimulator<L>
439 where
440 L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
441 {
442 type Location = L;
443
444 fn get_size(
445 &self,
446 _location: Self::Location,
447 ) -> futures::future::BoxFuture<'static, Result<u64, CondowError>> {
448 trace!("failing-client-simulator: get_size");
449 future::ready(Ok(self.blob.len() as u64)).boxed()
450 }
451
452 fn download(
453 &self,
454 _location: Self::Location,
455 range: InclusiveRange,
456 ) -> futures::future::BoxFuture<'static, Result<BytesStream, CondowError>> {
457 trace!("failing-client-simulator: download");
458 let me = self.clone();
459
460 if range.end_incl() >= me.blob.len() as u64 {
461 let msg = format!(
462 "end of range incl. {} is behind slice end (len = {})",
463 range,
464 me.blob.len()
465 );
466 return futures::future::ready(Err(CondowError::new(
467 &msg,
468 crate::errors::CondowErrorKind::InvalidRange,
469 )))
470 .boxed();
471 }
472
473 let bytes_hint = BytesHint::new_exact(range.len());
474
475 async move {
476 let next_response = me
477 .responses
478 .lock()
479 .await
480 .next()
481 .unwrap_or(ResponseBehaviour::Success);
482
483 match next_response {
484 ResponseBehaviour::Success => {
485 let stream = BytesStreamWithError {
486 blob: me.blob,
487 next: range.start() as usize,
488 end_excl: range.end_incl() as usize + 1,
489 error: None,
490 chunk_size: me.chunk_size,
491 };
492 Ok(BytesStream::new(stream, bytes_hint))
493 }
494 ResponseBehaviour::SuccessWithFailungStream(error_offset) => {
495 let start = range.start() as usize;
496 let end_excl = (start + error_offset).min(range.end_incl() as usize + 1);
497 if start > end_excl {
498 panic!(
499 "start ({}) > end_excl ({}) with range {:?} and error offset {}",
500 start, end_excl, range, error_offset
501 );
502 }
503
504 let stream = BytesStreamWithError {
505 blob: me.blob,
506 next: start,
507 end_excl,
508 error: Some(ErrorAction::Err(format!(
509 "stream error at {}",
510 error_offset
511 ))),
512 chunk_size: me.chunk_size,
513 };
514 Ok(BytesStream::new(stream, bytes_hint))
515 }
516 ResponseBehaviour::Error(error) => Err(error),
517 ResponseBehaviour::Panic(msg) => {
518 panic!("{}", msg)
519 }
520 ResponseBehaviour::SuccessWithStreamPanic(panic_offset) => {
521 let start = range.start() as usize;
522 let end_excl = (start + panic_offset).min(range.end_incl() as usize + 1);
523 if start > end_excl {
524 panic!(
525 "start ({}) > end_excl ({}) with range {:?} and error offset {}",
526 start, end_excl, range, panic_offset
527 );
528 }
529
530 let stream = BytesStreamWithError {
531 blob: me.blob,
532 next: start,
533 end_excl,
534 error: Some(ErrorAction::Panic(format!(
535 "panic at byte {} of range {}",
536 panic_offset, range
537 ))),
538 chunk_size: me.chunk_size,
539 };
540 Ok(BytesStream::new(stream, bytes_hint))
541 }
542 }
543 }
544 .boxed()
545 }
546 }
547
548 #[derive(Clone)]
549 enum Blob {
550 Static(&'static [u8]),
551 Owned(Arc<Vec<u8>>),
552 }
553
554 impl Blob {
555 pub fn len(&self) -> usize {
556 match self {
557 Blob::Static(b) => b.len(),
558 Blob::Owned(b) => b.len(),
559 }
560 }
561
562 pub fn as_slice(&self) -> &[u8] {
563 match self {
564 Blob::Static(b) => b,
565 Blob::Owned(b) => b,
566 }
567 }
568 }
569
570 pub struct ResponsesBuilder(FailingClientSimulatorBuilder);
574
575 impl ResponsesBuilder {
576 pub fn success(mut self) -> Self {
578 self.0.response_player = self.0.response_player.success();
579 self
580 }
581
582 pub fn successes(mut self, count: usize) -> Self {
584 self.0.response_player = self.0.response_player.successes(count);
585 self
586 }
587
588 pub fn success_with_stream_failure(mut self, failure_offset: usize) -> Self {
592 self.0.response_player = self
593 .0
594 .response_player
595 .success_with_stream_failure(failure_offset);
596 self
597 }
598
599 pub fn successes_with_stream_failure<I>(mut self, failure_offsets: I) -> Self
603 where
604 I: IntoIterator<Item = usize>,
605 {
606 self.0.response_player = self
607 .0
608 .response_player
609 .successes_with_stream_failure(failure_offsets);
610 self
611 }
612
613 pub fn success_with_stream_panic(mut self, panic_offset: usize) -> Self {
617 self.0.response_player = self
618 .0
619 .response_player
620 .success_with_stream_panic(panic_offset);
621 self
622 }
623
624 pub fn successes_with_stream_panic<I>(mut self, panic_offsets: I) -> Self
628 where
629 I: IntoIterator<Item = usize>,
630 {
631 self.0.response_player = self
632 .0
633 .response_player
634 .successes_with_stream_panic(panic_offsets);
635 self
636 }
637
638 pub fn failure<E: Into<CondowError>>(mut self, error: E) -> Self {
640 self.0.response_player = self.0.response_player.failure(error);
641 self
642 }
643
644 pub fn failures<I, E>(mut self, errors: I) -> Self
646 where
647 I: IntoIterator<Item = E>,
648 E: Into<CondowError>,
649 {
650 self.0.response_player = self.0.response_player.failures(errors);
651 self
652 }
653
654 pub fn panic<M: Display + Send + 'static>(mut self, message: M) -> Self {
656 self.0.response_player = self.0.response_player.panic(message);
657 self
658 }
659
660 pub fn never(mut self) -> Self {
664 self.0.response_player = self.0.response_player.never();
665 self
666 }
667
668 pub fn done(self) -> FailingClientSimulatorBuilder {
670 self.0
671 }
672
673 pub fn finish(self) -> FailingClientSimulator {
675 self.0.finish()
676 }
677 }
678
679 impl From<ResponsesBuilder> for FailingClientSimulatorBuilder {
680 fn from(rb: ResponsesBuilder) -> Self {
681 rb.0
682 }
683 }
684
685 #[derive(Default)]
689 pub struct ResponsePlayer {
690 responses: Vec<ResponseBehaviour>,
691 counter: usize,
692 }
693
694 impl ResponsePlayer {
695 pub fn success(self) -> Self {
697 self.successes(1)
698 }
699
700 pub fn successes(mut self, count: usize) -> Self {
702 (0..count).for_each(|_| {
703 self.counter += 1;
704 self.responses.push(ResponseBehaviour::Success)
705 });
706 self
707 }
708
709 pub fn success_with_stream_failure(self, failure_offset: usize) -> Self {
713 self.successes_with_stream_failure([failure_offset])
714 }
715
716 pub fn successes_with_stream_failure<I>(mut self, failure_offsets: I) -> Self
720 where
721 I: IntoIterator<Item = usize>,
722 {
723 failure_offsets.into_iter().for_each(|offset| {
724 self.counter += 1;
725 self.responses
726 .push(ResponseBehaviour::SuccessWithFailungStream(offset))
727 });
728 self
729 }
730
731 pub fn success_with_stream_panic(self, panic_offset: usize) -> Self {
735 self.successes_with_stream_panic([panic_offset])
736 }
737
738 pub fn successes_with_stream_panic<I>(mut self, panic_offset: I) -> Self
742 where
743 I: IntoIterator<Item = usize>,
744 {
745 panic_offset.into_iter().for_each(|offset| {
746 self.counter += 1;
747 self.responses
748 .push(ResponseBehaviour::SuccessWithStreamPanic(offset))
749 });
750 self
751 }
752
753 pub fn failure<E: Into<CondowError>>(self, error: E) -> Self {
755 self.failures([error])
756 }
757
758 pub fn failures<I, E>(mut self, errors: I) -> Self
760 where
761 I: IntoIterator<Item = E>,
762 E: Into<CondowError>,
763 {
764 errors.into_iter().for_each(|e| {
765 self.counter += 1;
766 self.responses.push(ResponseBehaviour::Error(e.into()))
767 });
768 self
769 }
770
771 pub fn panic<M: Display + Send + 'static>(mut self, message: M) -> Self {
773 self.counter += 1;
774 self.responses
775 .push(ResponseBehaviour::Panic(Box::new(message)));
776 self
777 }
778
779 pub fn never(mut self) -> Self {
783 self.counter += 1;
784 let message = format!("request {} should have never happened", self.counter);
785 self.responses
786 .push(ResponseBehaviour::Panic(Box::new(message)));
787 self
788 }
789 }
790
791 impl IntoIterator for ResponsePlayer {
792 type Item = ResponseBehaviour;
793
794 type IntoIter = vec::IntoIter<ResponseBehaviour>;
795
796 fn into_iter(self) -> Self::IntoIter {
797 self.responses.into_iter()
798 }
799 }
800
    /// How the simulated client answers a single download request.
    pub enum ResponseBehaviour {
        /// The request succeeds and the stream delivers the complete requested range.
        Success,
        /// The request succeeds, but the stream yields an error after the
        /// given number of bytes (counted from the start of the requested
        /// range and capped at the end of the range).
        SuccessWithFailungStream(usize),
        /// The request itself fails with the given error.
        Error(CondowError),
        /// The request panics with the given message.
        Panic(Box<dyn Display + Send + 'static>),
        /// The request succeeds, but the stream panics after the given
        /// number of bytes (counted from the start of the requested range
        /// and capped at the end of the range).
        SuccessWithStreamPanic(usize),
    }
818
    /// What a `BytesStreamWithError` does once it has delivered all of its bytes.
    pub enum ErrorAction {
        /// Yield an IO error carrying the given message.
        Err(String),
        /// Panic with the given message.
        Panic(String),
    }
823
824 struct BytesStreamWithError {
825 blob: Blob,
826 next: usize,
827 end_excl: usize,
828 error: Option<ErrorAction>,
829 chunk_size: usize,
830 }
831
832 impl Stream for BytesStreamWithError {
833 type Item = BytesStreamItem;
834
835 fn poll_next(
836 mut self: std::pin::Pin<&mut Self>,
837 _cx: &mut std::task::Context<'_>,
838 ) -> task::Poll<Option<Self::Item>> {
839 if self.next == self.end_excl || self.chunk_size == 0 {
840 if let Some(error_action) = self.error.take() {
841 match error_action {
842 ErrorAction::Err(msg) => {
843 let err = CondowError::new_io(msg);
844 return task::Poll::Ready(Some(Err(err)));
845 }
846 ErrorAction::Panic(msg) => panic!("{}", msg),
847 }
848 } else {
849 return task::Poll::Ready(None);
850 }
851 }
852
853 if self.end_excl < self.next {
854 panic!(
855 "invalid state in BytesStreamWithError! end_excl ({}) < next ({})",
856 self.end_excl, self.next
857 );
858 }
859
860 let effective_chunk_size = self.chunk_size.min(self.end_excl - self.next);
861 let start = self.next;
862 self.next += effective_chunk_size;
863 let slice: &[u8] = &self.blob.as_slice()[start..self.next];
864 let bytes = Bytes::copy_from_slice(slice);
865
866 task::Poll::Ready(Some(Ok(bytes)))
867 }
868 }
869
870 #[cfg(test)]
871 mod test_client {
872 use futures::StreamExt;
873
874 use crate::errors::CondowErrorKind;
875
876 use super::*;
877
878 const BLOB: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
879
880 #[tokio::test]
881 async fn all_ok() {
882 let client = get_builder().finish();
883 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
884
885 let result = download(&client, range).await.unwrap().unwrap();
886
887 assert_eq!(result, BLOB);
888 }
889
890 #[tokio::test]
891 #[should_panic(expected = "request 1 should have never happened")]
892 async fn never_1() {
893 let client = get_builder().responses().never().finish();
894 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
895
896 let _result = download(&client, range).await;
897 }
898
899 #[tokio::test]
900 #[should_panic(expected = "request 2 should have never happened")]
901 async fn never_2() {
902 let client = get_builder().responses().success().never().finish();
903 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
904
905 let _result = download(&client, range).await.unwrap().unwrap();
906 let _result = download(&client, range).await;
907 }
908
909 #[tokio::test]
910 async fn failed_request_1() {
911 let client = get_builder()
912 .responses()
913 .failure(CondowErrorKind::NotFound)
914 .finish();
915
916 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
917
918 let result = download(&client, range).await.unwrap_err();
919
920 assert_eq!(result.kind(), CondowErrorKind::NotFound);
921 }
922
923 #[tokio::test]
924 async fn failed_request_2() {
925 let client = get_builder()
926 .responses()
927 .failure(CondowErrorKind::InvalidRange)
928 .failure(CondowErrorKind::NotFound)
929 .finish();
930
931 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
932
933 let result = download(&client, range).await.unwrap_err();
934 assert_eq!(result.kind(), CondowErrorKind::InvalidRange);
935 let result = download(&client, range).await.unwrap_err();
936 assert_eq!(result.kind(), CondowErrorKind::NotFound);
937 }
938
939 #[tokio::test]
940 async fn failed_request_3() {
941 let client = get_builder()
942 .responses()
943 .failures([CondowErrorKind::InvalidRange, CondowErrorKind::Io])
944 .success()
945 .failure(CondowErrorKind::NotFound)
946 .finish();
947
948 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
949
950 let result = download(&client, range).await.unwrap_err();
951 assert_eq!(result.kind(), CondowErrorKind::InvalidRange);
952 let result = download(&client, range).await.unwrap_err();
953 assert_eq!(result.kind(), CondowErrorKind::Io);
954 let result = download(&client, range).await.unwrap().unwrap();
955 assert_eq!(result, BLOB);
956 let result = download(&client, range).await.unwrap_err();
957 assert_eq!(result.kind(), CondowErrorKind::NotFound);
958 }
959
960 #[tokio::test]
961 async fn fail_and_success() {
962 let client = get_builder()
963 .responses()
964 .success()
965 .failure(CondowErrorKind::NotFound)
966 .success()
967 .failures([CondowErrorKind::InvalidRange, CondowErrorKind::Io])
968 .success()
969 .success()
970 .failure(CondowErrorKind::Remote)
971 .success()
972 .never()
973 .finish();
974
975 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
976
977 let result = download(&client, range).await.unwrap().unwrap();
978 assert_eq!(result, BLOB, "1");
979 let result = download(&client, range).await.unwrap_err();
980 assert_eq!(result.kind(), CondowErrorKind::NotFound, "2");
981 let result = download(&client, range).await.unwrap().unwrap();
982 assert_eq!(result, BLOB, "3");
983 let result = download(&client, range).await.unwrap_err();
984 assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
985 let result = download(&client, range).await.unwrap_err();
986 assert_eq!(result.kind(), CondowErrorKind::Io, "5");
987 let result = download(&client, range).await.unwrap().unwrap();
988 assert_eq!(result, BLOB, "6");
989 let result = download(&client, range).await.unwrap().unwrap();
990 assert_eq!(result, BLOB, "7");
991 let result = download(&client, range).await.unwrap_err();
992 assert_eq!(result.kind(), CondowErrorKind::Remote, "8");
993 let result = download(&client, range).await.unwrap().unwrap();
994 assert_eq!(result, BLOB, "9");
995 }
996
997 #[tokio::test]
998 async fn failed_stream_start_1() {
999 let client = get_builder()
1000 .responses()
1001 .success_with_stream_failure(0)
1002 .finish();
1003
1004 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1005
1006 let result = download(&client, range).await.unwrap().unwrap_err();
1007 assert_eq!(result, &[], "err");
1008 let result = download(&client, range).await.unwrap().unwrap();
1009 assert_eq!(result, BLOB, "ok");
1010 }
1011
1012 #[tokio::test]
1013 async fn failed_stream_start_2() {
1014 let client = get_builder()
1015 .responses()
1016 .successes_with_stream_failure([0, 0])
1017 .finish();
1018
1019 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1020
1021 let result = download(&client, range).await.unwrap().unwrap_err();
1022 assert_eq!(result, &[], "err");
1023 let result = download(&client, range).await.unwrap().unwrap_err();
1024 assert_eq!(result, &[], "err");
1025 let result = download(&client, range).await.unwrap().unwrap();
1026 assert_eq!(result, BLOB, "ok");
1027 }
1028
1029 #[tokio::test]
1030 async fn failed_stream_1() {
1031 let client = get_builder()
1032 .responses()
1033 .successes_with_stream_failure([5])
1034 .finish();
1035
1036 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1037
1038 let result = download(&client, range).await.unwrap().unwrap_err();
1039 assert_eq!(result, &BLOB[0..5], "err");
1040 let result = download(&client, range).await.unwrap().unwrap();
1041 assert_eq!(result, BLOB, "ok");
1042 }
1043
1044 #[tokio::test]
1045 async fn failed_stream_2() {
1046 let client = get_builder()
1047 .responses()
1048 .successes_with_stream_failure([5, 10])
1049 .finish();
1050
1051 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1052
1053 let result = download(&client, range).await.unwrap().unwrap_err();
1054 assert_eq!(result, &BLOB[0..5], "err");
1055 let result = download(&client, range).await.unwrap().unwrap_err();
1056 assert_eq!(result, &BLOB[0..10], "err");
1057 let result = download(&client, range).await.unwrap().unwrap();
1058 assert_eq!(result, BLOB, "ok");
1059 }
1060
1061 #[tokio::test]
1062 async fn failed_stream_3() {
1063 let client = get_builder()
1064 .responses()
1065 .successes_with_stream_failure([5, 5, 5])
1066 .finish();
1067
1068 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1069
1070 let result = download(&client, range).await.unwrap().unwrap_err();
1071 assert_eq!(result, &BLOB[0..5], "err");
1072 let result = download(&client, range).await.unwrap().unwrap_err();
1073 assert_eq!(result, &BLOB[0..5], "err");
1074 let result = download(&client, range).await.unwrap().unwrap_err();
1075 assert_eq!(result, &BLOB[0..5], "err");
1076 let result = download(&client, range).await.unwrap().unwrap();
1077 assert_eq!(result, BLOB, "ok");
1078 }
1079
1080 #[tokio::test]
1081 async fn failed_stream_end_1() {
1082 let client = get_builder()
1083 .responses()
1084 .success_with_stream_failure(BLOB.len() - 1)
1085 .finish();
1086
1087 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1088
1089 let result = download(&client, range).await.unwrap().unwrap_err();
1090 assert_eq!(result, &BLOB[0..BLOB.len() - 1], "err");
1091 let result = download(&client, range).await.unwrap().unwrap();
1092 assert_eq!(result, BLOB, "ok");
1093 }
1094
1095 #[tokio::test]
1096 async fn failed_stream_end_2() {
1097 let client = get_builder()
1098 .responses()
1099 .success_with_stream_failure(BLOB.len())
1100 .finish();
1101
1102 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1103
1104 let result = download(&client, range).await.unwrap().unwrap_err();
1105 assert_eq!(result, BLOB, "err");
1106 let result = download(&client, range).await.unwrap().unwrap();
1107 assert_eq!(result, BLOB, "ok");
1108 }
1109
1110 #[tokio::test]
1111 async fn combined_errors() {
1112 let client = get_builder()
1113 .responses()
1114 .failure(CondowErrorKind::Io)
1115 .success_with_stream_failure(0)
1116 .failures([CondowErrorKind::Remote, CondowErrorKind::InvalidRange])
1117 .successes_with_stream_failure([5, 9])
1118 .success()
1119 .never()
1120 .finish();
1121 let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1122
1123 let result = download(&client, range).await.unwrap_err();
1124 assert_eq!(result.kind(), CondowErrorKind::Io, "1");
1125 let result = download(&client, range).await.unwrap().unwrap_err();
1126 assert_eq!(result, &[], "2");
1127 let result = download(&client, range).await.unwrap_err();
1128 assert_eq!(result.kind(), CondowErrorKind::Remote, "3");
1129 let result = download(&client, range).await.unwrap_err();
1130 assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
1131 let result = download(&client, range).await.unwrap().unwrap_err();
1132 assert_eq!(result, &BLOB[0..5], "5");
1133 let result = download(&client, range).await.unwrap().unwrap_err();
1134 assert_eq!(result, &BLOB[0..9], "6");
1135 let result = download(&client, range).await.unwrap().unwrap();
1136 assert_eq!(result, BLOB, "ok");
1137 }
1138
1139 #[tokio::test]
1140 async fn combined_errors_with_range() {
1141 let client = get_builder()
1142 .responses()
1143 .failure(CondowErrorKind::Io)
1144 .success_with_stream_failure(0)
1145 .failures([CondowErrorKind::Remote, CondowErrorKind::InvalidRange])
1146 .successes_with_stream_failure([3, 4])
1147 .success()
1148 .never()
1149 .finish();
1150
1151 let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1152 .await
1153 .unwrap_err();
1154 assert_eq!(result.kind(), CondowErrorKind::Io, "1");
1155 let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1156 .await
1157 .unwrap()
1158 .unwrap_err();
1159 assert_eq!(result, &[], "2");
1160 let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1161 .await
1162 .unwrap_err();
1163 assert_eq!(result.kind(), CondowErrorKind::Remote, "3");
1164 let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1165 .await
1166 .unwrap_err();
1167 assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
1168 let result = download(&client, 2..=9).await.unwrap().unwrap_err();
1169 assert_eq!(result, &BLOB[2..5], "5");
1170 let result = download(&client, 5..=BLOB.len() as u64 - 1)
1171 .await
1172 .unwrap()
1173 .unwrap_err();
1174 assert_eq!(result, &BLOB[5..9], "6");
1175
1176 let result = download(&client, 3..=8).await.unwrap().unwrap();
1177 assert_eq!(result, &BLOB[3..=8], "ok");
1178 }
1179
1180 fn get_builder() -> FailingClientSimulatorBuilder {
1181 FailingClientSimulatorBuilder::default()
1182 .blob_static(BLOB)
1183 .chunk_size(3)
1184 }
1185
1186 async fn download<R: Into<InclusiveRange>>(
1187 client: &FailingClientSimulator,
1188 range: R,
1189 ) -> Result<Result<Vec<u8>, Vec<u8>>, CondowError> {
1190 let mut stream = client.download(IgnoreLocation, range.into()).await?;
1191
1192 let mut received = Vec::new();
1193
1194 while let Some(next) = stream.next().await {
1195 if let Ok(bytes) = next {
1196 received.extend_from_slice(&bytes);
1197 } else {
1198 return Ok(Err(received));
1199 }
1200 }
1201
1202 Ok(Ok(received))
1203 }
1204 }
1205
1206 #[cfg(test)]
1207 mod test_stream {
1208 use std::ops::Range;
1209
1210 use futures::StreamExt;
1211
1212 use super::*;
1213
1214 const BLOB: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1215
1216 #[tokio::test]
1217 async fn empty_ok() {
1218 for start in 0..BLOB.len() {
1219 for chunk_size in 1..BLOB.len() + 1 {
1220 let result = consume(start..start, chunk_size, false).await.unwrap();
1221 assert!(
1222 result.is_empty(),
1223 "chunk_size: {}, start: {}",
1224 chunk_size,
1225 start
1226 )
1227 }
1228 }
1229 }
1230
1231 #[tokio::test]
1232 async fn range_ok() {
1233 let result = consume(5..BLOB.len(), 3, false).await.unwrap();
1234 assert_eq!(result, &BLOB[5..BLOB.len()])
1235 }
1236
1237 #[tokio::test]
1238 async fn empty_err() {
1239 for start in 0..BLOB.len() {
1240 for chunk_size in 1..BLOB.len() + 1 {
1241 let result = consume(start..start, chunk_size, true).await.unwrap_err();
1242 assert!(
1243 result.is_empty(),
1244 "chunk_size: {}, start: {}",
1245 chunk_size,
1246 start
1247 )
1248 }
1249 }
1250 }
1251
1252 #[tokio::test]
1253 async fn to_end_ok() {
1254 for start in 0..BLOB.len() {
1255 for chunk_size in 1..BLOB.len() + 1 {
1256 let result = consume(start..BLOB.len(), chunk_size, false).await.unwrap();
1257 assert_eq!(
1258 result,
1259 BLOB[start..BLOB.len()],
1260 "chunk_size: {}, start: {}",
1261 chunk_size,
1262 start
1263 )
1264 }
1265 }
1266 }
1267
1268 #[tokio::test]
1269 async fn to_end_err() {
1270 for start in 0..BLOB.len() {
1271 for chunk_size in 1..BLOB.len() + 1 {
1272 let result = consume(start..BLOB.len(), chunk_size, true)
1273 .await
1274 .unwrap_err();
1275 assert_eq!(
1276 result,
1277 BLOB[start..BLOB.len()],
1278 "chunk_size: {}, start: {}",
1279 chunk_size,
1280 start
1281 )
1282 }
1283 }
1284 }
1285
1286 #[tokio::test]
1287 async fn from_start_ok() {
1288 for end in 0..BLOB.len() {
1289 for chunk_size in 1..BLOB.len() + 1 {
1290 let result = consume(0..end, chunk_size, false).await.unwrap();
1291 assert_eq!(
1292 result,
1293 BLOB[0..end],
1294 "chunk_size: {}, end: {}",
1295 chunk_size,
1296 end
1297 )
1298 }
1299 }
1300 }
1301
1302 #[tokio::test]
1303 async fn from_start_err() {
1304 for end in 0..BLOB.len() {
1305 for chunk_size in 1..BLOB.len() + 1 {
1306 let result = consume(0..end, chunk_size, true).await.unwrap_err();
1307 assert_eq!(
1308 result,
1309 BLOB[0..end],
1310 "chunk_size: {}, end: {}",
1311 chunk_size,
1312 end
1313 )
1314 }
1315 }
1316 }
1317
1318 #[tokio::test]
1319 async fn get_a_slice_ok() {
1320 let start = 3;
1321 let end_excl = 7;
1322 for chunk_size in 1..BLOB.len() + 1 {
1323 let result = consume(start..end_excl, chunk_size, false).await.unwrap();
1324 assert_eq!(
1325 result,
1326 BLOB[start..end_excl],
1327 "chunk_size: {}, start: {}, end_excl: {}",
1328 chunk_size,
1329 start,
1330 end_excl,
1331 )
1332 }
1333 }
1334
1335 #[tokio::test]
1336 async fn get_a_slice_err() {
1337 let start = 3;
1338 let end_excl = 7;
1339 for chunk_size in 1..BLOB.len() + 1 {
1340 let result = consume(start..end_excl, chunk_size, true)
1341 .await
1342 .unwrap_err();
1343 assert_eq!(
1344 result,
1345 BLOB[start..end_excl],
1346 "chunk_size: {}, start: {}, end_excl: {}",
1347 chunk_size,
1348 start,
1349 end_excl,
1350 )
1351 }
1352 }
1353
1354 async fn consume(
1355 range: Range<usize>,
1356 chunk_size: usize,
1357 err: bool,
1358 ) -> Result<Vec<u8>, Vec<u8>> {
1359 let mut stream = BytesStreamWithError {
1360 blob: Blob::Static(BLOB),
1361 next: range.start,
1362 end_excl: range.end,
1363 error: if err {
1364 Some(ErrorAction::Err("bang!".to_string()))
1365 } else {
1366 None
1367 },
1368 chunk_size,
1369 };
1370
1371 let mut collected = Vec::new();
1372 while let Some(next) = stream.next().await {
1373 match next {
1374 Ok(bytes) => collected.extend_from_slice(&bytes),
1375 Err(_err) => return Err(collected),
1376 }
1377 }
1378
1379 Ok(collected)
1380 }
1381 }
1382}