condow_core/
condow_client.rs

1//! Adapter for [crate::Condow] to access BLOBs to be downloaded
2//!
3//! There are also implementation of a client mostly for testing
4//!
5//! * [InMemoryClient]: A client which keeps data in memory and never fails
6//! * [failing_client_simulator]: A module containing a client with data kept in memory
7//! which can fail and cause panics.
8use std::{convert::Infallible, str::FromStr};
9
10use futures::{future::BoxFuture, FutureExt};
11
12use crate::{errors::CondowError, streams::BytesStream, InclusiveRange};
13
14pub use in_memory::InMemoryClient;
15
16/// A client to some service or other resource which supports
17/// partial downloads
18///
19/// This is an adapter trait
20///
21/// Implementors of this trait may not panic on calling any of the methods nor
22/// within any of the futures returned.
23pub trait CondowClient: Clone + Send + Sync + 'static {
24    type Location: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static;
25
26    /// Returns the size of the BLOB at the given location
27    fn get_size(&self, location: Self::Location) -> BoxFuture<'static, Result<u64, CondowError>>;
28
29    /// Download a BLOB or part of a BLOB from the given location as specified by the [InclusiveRange]
30    fn download(
31        &self,
32        location: Self::Location,
33        range: InclusiveRange,
34    ) -> BoxFuture<'static, Result<BytesStream, CondowError>>;
35
36    /// Download a complete BLOB
37    fn download_full(
38        &self,
39        location: Self::Location,
40    ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
41        let me = self.clone();
42        async move {
43            let len = me.get_size(location.clone()).await?;
44            me.download(location, InclusiveRange(0, len - 1)).await
45        }
46        .boxed()
47    }
48}
49
50/// A location usable for testing.
51#[derive(Debug, Clone, Copy)]
52pub struct IgnoreLocation;
53
54impl std::fmt::Display for IgnoreLocation {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        write!(f, "<no location>")
57    }
58}
59
60impl FromStr for IgnoreLocation {
61    type Err = Infallible;
62
63    fn from_str(_: &str) -> Result<Self, Self::Err> {
64        Ok(IgnoreLocation)
65    }
66}
67
68impl<T> From<T> for IgnoreLocation
69where
70    T: AsRef<str>,
71{
72    fn from(_: T) -> Self {
73        IgnoreLocation
74    }
75}
76
77mod in_memory {
78    use std::{marker::PhantomData, sync::Arc};
79
80    use crate::{
81        config::{Config, Mebi},
82        errors::CondowError,
83        streams::{BytesHint, BytesStream},
84        Condow, InclusiveRange,
85    };
86    use anyhow::Error as AnyError;
87    use bytes::Bytes;
88    use futures::{
89        future::{self, BoxFuture, FutureExt},
90        stream,
91    };
92    use tracing::trace;
93
94    use super::{CondowClient, IgnoreLocation};
95
96    /// Holds the BLOB in memory as owned or static data.
97    ///
98    /// Use for testing.
99    #[derive(Clone)]
100    pub struct InMemoryClient<L = IgnoreLocation> {
101        blob: Blob,
102        chunk_size: usize,
103        _location: PhantomData<L>,
104    }
105
106    impl<L> InMemoryClient<L> {
107        /// BLOB from owned bytes
108        pub fn new(blob: Vec<u8>) -> Self {
109            Self::new_shared(Arc::new(blob))
110        }
111
112        /// BLOB from shared bytes
113        pub fn new_shared(blob: Arc<Vec<u8>>) -> Self {
114            Self {
115                blob: Blob::Owned(blob),
116                chunk_size: Mebi(4).value() as usize,
117                _location: PhantomData,
118            }
119        }
120
121        /// BLOB copied from slice
122        pub fn new_from_slice(blob: &[u8]) -> Self {
123            Self::new(blob.to_vec())
124        }
125
126        /// BLOB with static byte slice
127        pub fn new_static(blob: &'static [u8]) -> Self {
128            Self {
129                blob: Blob::Static(blob),
130                chunk_size: Mebi(4).value() as usize,
131                _location: PhantomData,
132            }
133        }
134
135        pub fn chunk_size(mut self, chunk_size: usize) -> Self {
136            self.chunk_size = chunk_size;
137            self
138        }
139    }
140
141    impl<L> InMemoryClient<L>
142    where
143        L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
144    {
145        pub fn condow(&self, config: Config) -> Result<Condow<Self, ()>, AnyError> {
146            Condow::new(self.clone(), config)
147        }
148    }
149
150    impl<L> CondowClient for InMemoryClient<L>
151    where
152        L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
153    {
154        type Location = L;
155
156        fn get_size(
157            &self,
158            _location: Self::Location,
159        ) -> BoxFuture<'static, Result<u64, CondowError>> {
160            trace!("in-memory-client: get_size");
161
162            futures::future::ready(Ok(self.blob.len() as u64)).boxed()
163        }
164
165        fn download(
166            &self,
167            _location: Self::Location,
168            range: InclusiveRange,
169        ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
170            trace!("in-memory-client: download");
171
172            download(self.blob.as_slice(), self.chunk_size, range)
173        }
174    }
175
176    fn download(
177        blob: &[u8],
178        chunk_size: usize,
179        range: InclusiveRange,
180    ) -> BoxFuture<'static, Result<BytesStream, CondowError>> {
181        let range = {
182            let r = range.to_std_range_excl();
183            r.start as usize..r.end as usize
184        };
185
186        if range.end > blob.len() {
187            return Box::pin(future::ready(Err(CondowError::new_invalid_range(format!(
188                "max upper bound is {} but {} was requested",
189                blob.len() - 1,
190                range.end - 1
191            )))));
192        }
193
194        let slice = &blob[range];
195
196        let bytes_hint = BytesHint::new_exact(slice.len() as u64);
197
198        let iter = slice.chunks(chunk_size).map(Bytes::copy_from_slice).map(Ok);
199
200        let owned_bytes: Vec<_> = iter.collect();
201
202        let stream = stream::iter(owned_bytes);
203
204        let stream = BytesStream::new(stream, bytes_hint);
205
206        let f = future::ready(Ok(stream));
207
208        Box::pin(f)
209    }
210
211    #[derive(Clone)]
212    enum Blob {
213        Static(&'static [u8]),
214        Owned(Arc<Vec<u8>>),
215    }
216
217    impl Blob {
218        pub fn len(&self) -> usize {
219            match self {
220                Blob::Static(b) => b.len(),
221                Blob::Owned(b) => b.len(),
222            }
223        }
224
225        pub fn as_slice(&self) -> &[u8] {
226            match self {
227                Blob::Static(b) => b,
228                Blob::Owned(b) => b,
229            }
230        }
231    }
232
233    #[cfg(test)]
234    mod test {
235        use futures::{pin_mut, StreamExt};
236
237        use crate::{errors::CondowError, streams::BytesHint, InclusiveRange};
238
239        const BLOB: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
240
241        async fn download_to_vec(
242            blob: &[u8],
243            chunk_size: usize,
244            range: InclusiveRange,
245        ) -> Result<(Vec<u8>, BytesHint), CondowError> {
246            let stream = super::download(blob, chunk_size, range).await?;
247
248            let bytes_hint = stream.bytes_hint();
249            let mut buf = Vec::with_capacity(bytes_hint.lower_bound() as usize);
250            pin_mut!(stream);
251            while let Some(next) = stream.next().await {
252                let bytes = next?;
253                buf.extend_from_slice(bytes.as_ref())
254            }
255            Ok((buf, bytes_hint))
256        }
257
258        #[tokio::test]
259        async fn download_all() {
260            for chunk_size in 1..30 {
261                let (bytes, bytes_hint) =
262                    download_to_vec(BLOB, chunk_size, (0..=BLOB.len() as u64 - 1).into())
263                        .await
264                        .unwrap();
265
266                assert_eq!(&bytes, BLOB);
267                assert_eq!(bytes_hint, BytesHint::new_exact(bytes.len() as u64));
268            }
269        }
270
271        #[tokio::test]
272        async fn download_range_begin() {
273            for chunk_size in 1..30 {
274                let range = InclusiveRange(0, 9);
275                let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
276
277                let expected = b"abcdefghij";
278
279                assert_eq!(&bytes, expected);
280                assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
281            }
282        }
283
284        #[tokio::test]
285        async fn download_range_middle() {
286            for chunk_size in 1..30 {
287                let range = InclusiveRange(10, 19);
288                let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
289
290                let expected = b"klmnopqrst";
291
292                assert_eq!(&bytes, expected);
293                assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
294            }
295        }
296
297        #[tokio::test]
298        async fn download_range_end() {
299            for chunk_size in 1..30 {
300                let range = InclusiveRange(16, 25);
301                let (bytes, bytes_hint) = download_to_vec(BLOB, chunk_size, range).await.unwrap();
302
303                let expected = b"qrstuvwxyz";
304
305                assert_eq!(&bytes, expected);
306                assert_eq!(bytes_hint, BytesHint::new_exact(expected.len() as u64));
307            }
308        }
309    }
310}
311
312pub mod failing_client_simulator {
313    //! Simulate failing requests and streams
314    use bytes::Bytes;
315    use futures::{future, lock::Mutex, task, FutureExt, Stream};
316    use std::{fmt::Display, marker::PhantomData, sync::Arc, vec};
317    use tracing::trace;
318
319    use crate::{
320        condow_client::CondowClient,
321        config::Config,
322        errors::CondowError,
323        streams::{BytesHint, BytesStream, BytesStreamItem},
324        Condow, InclusiveRange,
325    };
326
327    pub use super::IgnoreLocation;
328
329    /// A builder for a [FailingClientSimulator]
330    pub struct FailingClientSimulatorBuilder {
331        /// The BLOB that would be streamed on a successful request without any errors
332        blob: Blob,
333        /// Offsets at which an IO-Error occurs while streaming bytes
334        response_player: ResponsePlayer,
335        /// Size of the streamed chunks
336        chunk_size: usize,
337    }
338
339    impl FailingClientSimulatorBuilder {
340        /// Blob from owned bytes
341        pub fn blob(mut self, blob: Vec<u8>) -> Self {
342            self.blob = Blob::Owned(Arc::new(blob));
343            self
344        }
345
346        /// Blob from shared bytes
347        pub fn blob_arc(mut self, blob: Arc<Vec<u8>>) -> Self {
348            self.blob = Blob::Owned(blob);
349            self
350        }
351
352        /// Blob copied from slice
353        pub fn blob_from_slice(self, blob: &[u8]) -> Self {
354            self.blob(blob.to_vec())
355        }
356
357        /// Blob with static byte slice
358        pub fn blob_static(mut self, blob: &'static [u8]) -> Self {
359            self.blob = Blob::Static(blob);
360            self
361        }
362
363        /// Set the given [ResponsePlayer]
364        pub fn response_player(mut self, player: ResponsePlayer) -> Self {
365            self.response_player = player;
366            self
367        }
368
369        /// Add to the current response player with a [ResponsesBuilder]
370        pub fn responses(self) -> ResponsesBuilder {
371            ResponsesBuilder(self)
372        }
373
374        /// Set the chunk size
375        ///
376        /// # Panics
377        ///
378        /// If `chunk_size` is 0.
379        pub fn chunk_size(mut self, chunk_size: usize) -> Self {
380            if chunk_size == 0 {
381                panic!("chunk size must be greater than 0")
382            }
383
384            self.chunk_size = chunk_size;
385            self
386        }
387
388        /// Create the [FailingClientSimulator]
389        pub fn finish(self) -> FailingClientSimulator {
390            FailingClientSimulator::new(self.blob, self.response_player, self.chunk_size)
391        }
392    }
393
394    impl Default for FailingClientSimulatorBuilder {
395        fn default() -> Self {
396            Self {
397                blob: Blob::Static(&[]),
398                response_player: Default::default(),
399                chunk_size: 3,
400            }
401        }
402    }
403
404    /// Simulates a failing client.
405    ///
406    /// Has limited capabilities to simulate failure scenarios (mostly) for testing.
407    ///
408    /// `get_size` will always succeed.
409    ///
410    /// Clones will share the responses to be played back
411    #[derive(Clone)]
412    pub struct FailingClientSimulator<L = IgnoreLocation> {
413        blob: Blob,
414        responses: Arc<Mutex<vec::IntoIter<ResponseBehaviour>>>,
415        chunk_size: usize,
416        _phantom: PhantomData<L>,
417    }
418
419    impl<L> FailingClientSimulator<L>
420    where
421        L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
422    {
423        /// Create a new instance
424        fn new(blob: Blob, response_player: ResponsePlayer, chunk_size: usize) -> Self {
425            Self {
426                blob,
427                responses: Arc::new(Mutex::new(response_player.into_iter())),
428                chunk_size,
429                _phantom: PhantomData,
430            }
431        }
432
433        pub fn condow(&self, config: Config) -> Result<Condow<Self>, anyhow::Error> {
434            Condow::new(self.clone(), config)
435        }
436    }
437
438    impl<L> CondowClient for FailingClientSimulator<L>
439    where
440        L: std::fmt::Debug + std::fmt::Display + Clone + Send + Sync + 'static,
441    {
442        type Location = L;
443
444        fn get_size(
445            &self,
446            _location: Self::Location,
447        ) -> futures::future::BoxFuture<'static, Result<u64, CondowError>> {
448            trace!("failing-client-simulator: get_size");
449            future::ready(Ok(self.blob.len() as u64)).boxed()
450        }
451
452        fn download(
453            &self,
454            _location: Self::Location,
455            range: InclusiveRange,
456        ) -> futures::future::BoxFuture<'static, Result<BytesStream, CondowError>> {
457            trace!("failing-client-simulator: download");
458            let me = self.clone();
459
460            if range.end_incl() >= me.blob.len() as u64 {
461                let msg = format!(
462                    "end of range incl. {} is behind slice end (len = {})",
463                    range,
464                    me.blob.len()
465                );
466                return futures::future::ready(Err(CondowError::new(
467                    &msg,
468                    crate::errors::CondowErrorKind::InvalidRange,
469                )))
470                .boxed();
471            }
472
473            let bytes_hint = BytesHint::new_exact(range.len());
474
475            async move {
476                let next_response = me
477                    .responses
478                    .lock()
479                    .await
480                    .next()
481                    .unwrap_or(ResponseBehaviour::Success);
482
483                match next_response {
484                    ResponseBehaviour::Success => {
485                        let stream = BytesStreamWithError {
486                            blob: me.blob,
487                            next: range.start() as usize,
488                            end_excl: range.end_incl() as usize + 1,
489                            error: None,
490                            chunk_size: me.chunk_size,
491                        };
492                        Ok(BytesStream::new(stream, bytes_hint))
493                    }
494                    ResponseBehaviour::SuccessWithFailungStream(error_offset) => {
495                        let start = range.start() as usize;
496                        let end_excl = (start + error_offset).min(range.end_incl() as usize + 1);
497                        if start > end_excl {
498                            panic!(
499                                "start ({}) > end_excl ({}) with range {:?} and error offset {}",
500                                start, end_excl, range, error_offset
501                            );
502                        }
503
504                        let stream = BytesStreamWithError {
505                            blob: me.blob,
506                            next: start,
507                            end_excl,
508                            error: Some(ErrorAction::Err(format!(
509                                "stream error at {}",
510                                error_offset
511                            ))),
512                            chunk_size: me.chunk_size,
513                        };
514                        Ok(BytesStream::new(stream, bytes_hint))
515                    }
516                    ResponseBehaviour::Error(error) => Err(error),
517                    ResponseBehaviour::Panic(msg) => {
518                        panic!("{}", msg)
519                    }
520                    ResponseBehaviour::SuccessWithStreamPanic(panic_offset) => {
521                        let start = range.start() as usize;
522                        let end_excl = (start + panic_offset).min(range.end_incl() as usize + 1);
523                        if start > end_excl {
524                            panic!(
525                                "start ({}) > end_excl ({}) with range {:?} and error offset {}",
526                                start, end_excl, range, panic_offset
527                            );
528                        }
529
530                        let stream = BytesStreamWithError {
531                            blob: me.blob,
532                            next: start,
533                            end_excl,
534                            error: Some(ErrorAction::Panic(format!(
535                                "panic at byte {} of range {}",
536                                panic_offset, range
537                            ))),
538                            chunk_size: me.chunk_size,
539                        };
540                        Ok(BytesStream::new(stream, bytes_hint))
541                    }
542                }
543            }
544            .boxed()
545        }
546    }
547
548    #[derive(Clone)]
549    enum Blob {
550        Static(&'static [u8]),
551        Owned(Arc<Vec<u8>>),
552    }
553
554    impl Blob {
555        pub fn len(&self) -> usize {
556            match self {
557                Blob::Static(b) => b.len(),
558                Blob::Owned(b) => b.len(),
559            }
560        }
561
562        pub fn as_slice(&self) -> &[u8] {
563            match self {
564                Blob::Static(b) => b,
565                Blob::Owned(b) => b,
566            }
567        }
568    }
569
570    /// A builder to add responses to a [FailingClientSimulator]
571    ///
572    /// This is simply a seperated an API for the [FailingClientSimulatorBuilder]
573    pub struct ResponsesBuilder(FailingClientSimulatorBuilder);
574
575    impl ResponsesBuilder {
576        /// Add a successful response with a successful stream
577        pub fn success(mut self) -> Self {
578            self.0.response_player = self.0.response_player.success();
579            self
580        }
581
582        /// Add multiple successful responses with successful streams
583        pub fn successes(mut self, count: usize) -> Self {
584            self.0.response_player = self.0.response_player.successes(count);
585            self
586        }
587
588        /// Add a successful response with the stream failing at the given offset
589        ///
590        /// The failure will occur at the given offset of the queried range
591        pub fn success_with_stream_failure(mut self, failure_offset: usize) -> Self {
592            self.0.response_player = self
593                .0
594                .response_player
595                .success_with_stream_failure(failure_offset);
596            self
597        }
598
599        /// Add multiple successful responses with the streams each failing at a given offset
600        ///
601        /// The failures will occur at the given offsets of the queried ranges
602        pub fn successes_with_stream_failure<I>(mut self, failure_offsets: I) -> Self
603        where
604            I: IntoIterator<Item = usize>,
605        {
606            self.0.response_player = self
607                .0
608                .response_player
609                .successes_with_stream_failure(failure_offsets);
610            self
611        }
612
613        /// Add a successful response with the stream panicking at the given offset
614        ///
615        /// The panic will occur at the given offset of the queried range
616        pub fn success_with_stream_panic(mut self, panic_offset: usize) -> Self {
617            self.0.response_player = self
618                .0
619                .response_player
620                .success_with_stream_panic(panic_offset);
621            self
622        }
623
624        /// Add multiple successful responses with the streams each panicking at a given offset
625        ///
626        /// The panics will occur at the given offsets of the queried ranges
627        pub fn successes_with_stream_panic<I>(mut self, panic_offsets: I) -> Self
628        where
629            I: IntoIterator<Item = usize>,
630        {
631            self.0.response_player = self
632                .0
633                .response_player
634                .successes_with_stream_panic(panic_offsets);
635            self
636        }
637
638        /// Add a single failing response
639        pub fn failure<E: Into<CondowError>>(mut self, error: E) -> Self {
640            self.0.response_player = self.0.response_player.failure(error);
641            self
642        }
643
644        /// Add a chain of failing responses
645        pub fn failures<I, E>(mut self, errors: I) -> Self
646        where
647            I: IntoIterator<Item = E>,
648            E: Into<CondowError>,
649        {
650            self.0.response_player = self.0.response_player.failures(errors);
651            self
652        }
653
654        /// Causes a panic once the request is made
655        pub fn panic<M: Display + Send + 'static>(mut self, message: M) -> Self {
656            self.0.response_player = self.0.response_player.panic(message);
657            self
658        }
659
660        /// Causes a panic once the request is made
661        ///
662        /// The panic will contain a message containing the request number
663        pub fn never(mut self) -> Self {
664            self.0.response_player = self.0.response_player.never();
665            self
666        }
667
668        /// Get back to the [FailingClientSimulatorBuilder]s API.
669        pub fn done(self) -> FailingClientSimulatorBuilder {
670            self.0
671        }
672
673        /// Create the [FailingClientSimulator]
674        pub fn finish(self) -> FailingClientSimulator {
675            self.0.finish()
676        }
677    }
678
679    impl From<ResponsesBuilder> for FailingClientSimulatorBuilder {
680        fn from(rb: ResponsesBuilder) -> Self {
681            rb.0
682        }
683    }
684
685    /// Plays back responses for each request made
686    ///
687    /// Responses are delivered in the ordering the requests are made.
688    #[derive(Default)]
689    pub struct ResponsePlayer {
690        responses: Vec<ResponseBehaviour>,
691        counter: usize,
692    }
693
694    impl ResponsePlayer {
695        /// Add a successful response with a successful stream
696        pub fn success(self) -> Self {
697            self.successes(1)
698        }
699
700        /// Add multiple successful responses with successful streams
701        pub fn successes(mut self, count: usize) -> Self {
702            (0..count).for_each(|_| {
703                self.counter += 1;
704                self.responses.push(ResponseBehaviour::Success)
705            });
706            self
707        }
708
709        /// Add a successful response with the stream failing at the given offset
710        ///
711        /// The failure will occur at the given offset of the queried range
712        pub fn success_with_stream_failure(self, failure_offset: usize) -> Self {
713            self.successes_with_stream_failure([failure_offset])
714        }
715
716        /// Add multiple successful responses with the streams each failing at a given offset
717        ///
718        /// The failures will occur at the given offsets of the queried ranges
719        pub fn successes_with_stream_failure<I>(mut self, failure_offsets: I) -> Self
720        where
721            I: IntoIterator<Item = usize>,
722        {
723            failure_offsets.into_iter().for_each(|offset| {
724                self.counter += 1;
725                self.responses
726                    .push(ResponseBehaviour::SuccessWithFailungStream(offset))
727            });
728            self
729        }
730
731        /// Add a successful response with the stream panicking at the given offset
732        ///
733        /// The panics will occur at the given offset of the queried range
734        pub fn success_with_stream_panic(self, panic_offset: usize) -> Self {
735            self.successes_with_stream_panic([panic_offset])
736        }
737
738        /// Add multiple successful responses with the streams each panicking at a given offset
739        ///
740        /// The panics will occur at the given offsets of the queried ranges
741        pub fn successes_with_stream_panic<I>(mut self, panic_offset: I) -> Self
742        where
743            I: IntoIterator<Item = usize>,
744        {
745            panic_offset.into_iter().for_each(|offset| {
746                self.counter += 1;
747                self.responses
748                    .push(ResponseBehaviour::SuccessWithStreamPanic(offset))
749            });
750            self
751        }
752
753        /// Add a single failing response
754        pub fn failure<E: Into<CondowError>>(self, error: E) -> Self {
755            self.failures([error])
756        }
757
758        /// Add a chain of failing responses
759        pub fn failures<I, E>(mut self, errors: I) -> Self
760        where
761            I: IntoIterator<Item = E>,
762            E: Into<CondowError>,
763        {
764            errors.into_iter().for_each(|e| {
765                self.counter += 1;
766                self.responses.push(ResponseBehaviour::Error(e.into()))
767            });
768            self
769        }
770
771        /// Causes a panic once the request is made
772        pub fn panic<M: Display + Send + 'static>(mut self, message: M) -> Self {
773            self.counter += 1;
774            self.responses
775                .push(ResponseBehaviour::Panic(Box::new(message)));
776            self
777        }
778
779        /// Causes a panic once the request is made
780        ///
781        /// The panic will contain a message containing the request number
782        pub fn never(mut self) -> Self {
783            self.counter += 1;
784            let message = format!("request {} should have never happened", self.counter);
785            self.responses
786                .push(ResponseBehaviour::Panic(Box::new(message)));
787            self
788        }
789    }
790
791    impl IntoIterator for ResponsePlayer {
792        type Item = ResponseBehaviour;
793
794        type IntoIter = vec::IntoIter<ResponseBehaviour>;
795
796        fn into_iter(self) -> Self::IntoIter {
797            self.responses.into_iter()
798        }
799    }
800
801    /// The behaviour to respond to a request
802    pub enum ResponseBehaviour {
803        /// Respond with a success and also a non failing stream
804        Success,
805        /// Respond with a success but the stream will fail at the given offset
806        ///
807        /// A stream is defined by the queried range.
808        SuccessWithFailungStream(usize),
809        /// The response will be an error
810        Error(CondowError),
811        /// The request will cause a panic
812        Panic(Box<dyn Display + Send + 'static>),
813        /// Respond with a success but the stream will panic at the given offset
814        ///
815        /// A stream is defined by the queried range.
816        SuccessWithStreamPanic(usize),
817    }
818
819    pub enum ErrorAction {
820        Err(String),
821        Panic(String),
822    }
823
824    struct BytesStreamWithError {
825        blob: Blob,
826        next: usize,
827        end_excl: usize,
828        error: Option<ErrorAction>,
829        chunk_size: usize,
830    }
831
832    impl Stream for BytesStreamWithError {
833        type Item = BytesStreamItem;
834
835        fn poll_next(
836            mut self: std::pin::Pin<&mut Self>,
837            _cx: &mut std::task::Context<'_>,
838        ) -> task::Poll<Option<Self::Item>> {
839            if self.next == self.end_excl || self.chunk_size == 0 {
840                if let Some(error_action) = self.error.take() {
841                    match error_action {
842                        ErrorAction::Err(msg) => {
843                            let err = CondowError::new_io(msg);
844                            return task::Poll::Ready(Some(Err(err)));
845                        }
846                        ErrorAction::Panic(msg) => panic!("{}", msg),
847                    }
848                } else {
849                    return task::Poll::Ready(None);
850                }
851            }
852
853            if self.end_excl < self.next {
854                panic!(
855                    "invalid state in BytesStreamWithError! end_excl ({}) < next ({})",
856                    self.end_excl, self.next
857                );
858            }
859
860            let effective_chunk_size = self.chunk_size.min(self.end_excl - self.next);
861            let start = self.next;
862            self.next += effective_chunk_size;
863            let slice: &[u8] = &self.blob.as_slice()[start..self.next];
864            let bytes = Bytes::copy_from_slice(slice);
865
866            task::Poll::Ready(Some(Ok(bytes)))
867        }
868    }
869
870    #[cfg(test)]
871    mod test_client {
872        use futures::StreamExt;
873
874        use crate::errors::CondowErrorKind;
875
876        use super::*;
877
878        const BLOB: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
879
880        #[tokio::test]
881        async fn all_ok() {
882            let client = get_builder().finish();
883            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
884
885            let result = download(&client, range).await.unwrap().unwrap();
886
887            assert_eq!(result, BLOB);
888        }
889
890        #[tokio::test]
891        #[should_panic(expected = "request 1 should have never happened")]
892        async fn never_1() {
893            let client = get_builder().responses().never().finish();
894            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
895
896            let _result = download(&client, range).await;
897        }
898
899        #[tokio::test]
900        #[should_panic(expected = "request 2 should have never happened")]
901        async fn never_2() {
902            let client = get_builder().responses().success().never().finish();
903            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
904
905            let _result = download(&client, range).await.unwrap().unwrap();
906            let _result = download(&client, range).await;
907        }
908
909        #[tokio::test]
910        async fn failed_request_1() {
911            let client = get_builder()
912                .responses()
913                .failure(CondowErrorKind::NotFound)
914                .finish();
915
916            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
917
918            let result = download(&client, range).await.unwrap_err();
919
920            assert_eq!(result.kind(), CondowErrorKind::NotFound);
921        }
922
923        #[tokio::test]
924        async fn failed_request_2() {
925            let client = get_builder()
926                .responses()
927                .failure(CondowErrorKind::InvalidRange)
928                .failure(CondowErrorKind::NotFound)
929                .finish();
930
931            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
932
933            let result = download(&client, range).await.unwrap_err();
934            assert_eq!(result.kind(), CondowErrorKind::InvalidRange);
935            let result = download(&client, range).await.unwrap_err();
936            assert_eq!(result.kind(), CondowErrorKind::NotFound);
937        }
938
939        #[tokio::test]
940        async fn failed_request_3() {
941            let client = get_builder()
942                .responses()
943                .failures([CondowErrorKind::InvalidRange, CondowErrorKind::Io])
944                .success()
945                .failure(CondowErrorKind::NotFound)
946                .finish();
947
948            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
949
950            let result = download(&client, range).await.unwrap_err();
951            assert_eq!(result.kind(), CondowErrorKind::InvalidRange);
952            let result = download(&client, range).await.unwrap_err();
953            assert_eq!(result.kind(), CondowErrorKind::Io);
954            let result = download(&client, range).await.unwrap().unwrap();
955            assert_eq!(result, BLOB);
956            let result = download(&client, range).await.unwrap_err();
957            assert_eq!(result.kind(), CondowErrorKind::NotFound);
958        }
959
960        #[tokio::test]
961        async fn fail_and_success() {
962            let client = get_builder()
963                .responses()
964                .success()
965                .failure(CondowErrorKind::NotFound)
966                .success()
967                .failures([CondowErrorKind::InvalidRange, CondowErrorKind::Io])
968                .success()
969                .success()
970                .failure(CondowErrorKind::Remote)
971                .success()
972                .never()
973                .finish();
974
975            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
976
977            let result = download(&client, range).await.unwrap().unwrap();
978            assert_eq!(result, BLOB, "1");
979            let result = download(&client, range).await.unwrap_err();
980            assert_eq!(result.kind(), CondowErrorKind::NotFound, "2");
981            let result = download(&client, range).await.unwrap().unwrap();
982            assert_eq!(result, BLOB, "3");
983            let result = download(&client, range).await.unwrap_err();
984            assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
985            let result = download(&client, range).await.unwrap_err();
986            assert_eq!(result.kind(), CondowErrorKind::Io, "5");
987            let result = download(&client, range).await.unwrap().unwrap();
988            assert_eq!(result, BLOB, "6");
989            let result = download(&client, range).await.unwrap().unwrap();
990            assert_eq!(result, BLOB, "7");
991            let result = download(&client, range).await.unwrap_err();
992            assert_eq!(result.kind(), CondowErrorKind::Remote, "8");
993            let result = download(&client, range).await.unwrap().unwrap();
994            assert_eq!(result, BLOB, "9");
995        }
996
997        #[tokio::test]
998        async fn failed_stream_start_1() {
999            let client = get_builder()
1000                .responses()
1001                .success_with_stream_failure(0)
1002                .finish();
1003
1004            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1005
1006            let result = download(&client, range).await.unwrap().unwrap_err();
1007            assert_eq!(result, &[], "err");
1008            let result = download(&client, range).await.unwrap().unwrap();
1009            assert_eq!(result, BLOB, "ok");
1010        }
1011
1012        #[tokio::test]
1013        async fn failed_stream_start_2() {
1014            let client = get_builder()
1015                .responses()
1016                .successes_with_stream_failure([0, 0])
1017                .finish();
1018
1019            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1020
1021            let result = download(&client, range).await.unwrap().unwrap_err();
1022            assert_eq!(result, &[], "err");
1023            let result = download(&client, range).await.unwrap().unwrap_err();
1024            assert_eq!(result, &[], "err");
1025            let result = download(&client, range).await.unwrap().unwrap();
1026            assert_eq!(result, BLOB, "ok");
1027        }
1028
1029        #[tokio::test]
1030        async fn failed_stream_1() {
1031            let client = get_builder()
1032                .responses()
1033                .successes_with_stream_failure([5])
1034                .finish();
1035
1036            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1037
1038            let result = download(&client, range).await.unwrap().unwrap_err();
1039            assert_eq!(result, &BLOB[0..5], "err");
1040            let result = download(&client, range).await.unwrap().unwrap();
1041            assert_eq!(result, BLOB, "ok");
1042        }
1043
1044        #[tokio::test]
1045        async fn failed_stream_2() {
1046            let client = get_builder()
1047                .responses()
1048                .successes_with_stream_failure([5, 10])
1049                .finish();
1050
1051            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1052
1053            let result = download(&client, range).await.unwrap().unwrap_err();
1054            assert_eq!(result, &BLOB[0..5], "err");
1055            let result = download(&client, range).await.unwrap().unwrap_err();
1056            assert_eq!(result, &BLOB[0..10], "err");
1057            let result = download(&client, range).await.unwrap().unwrap();
1058            assert_eq!(result, BLOB, "ok");
1059        }
1060
1061        #[tokio::test]
1062        async fn failed_stream_3() {
1063            let client = get_builder()
1064                .responses()
1065                .successes_with_stream_failure([5, 5, 5])
1066                .finish();
1067
1068            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1069
1070            let result = download(&client, range).await.unwrap().unwrap_err();
1071            assert_eq!(result, &BLOB[0..5], "err");
1072            let result = download(&client, range).await.unwrap().unwrap_err();
1073            assert_eq!(result, &BLOB[0..5], "err");
1074            let result = download(&client, range).await.unwrap().unwrap_err();
1075            assert_eq!(result, &BLOB[0..5], "err");
1076            let result = download(&client, range).await.unwrap().unwrap();
1077            assert_eq!(result, BLOB, "ok");
1078        }
1079
1080        #[tokio::test]
1081        async fn failed_stream_end_1() {
1082            let client = get_builder()
1083                .responses()
1084                .success_with_stream_failure(BLOB.len() - 1)
1085                .finish();
1086
1087            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1088
1089            let result = download(&client, range).await.unwrap().unwrap_err();
1090            assert_eq!(result, &BLOB[0..BLOB.len() - 1], "err");
1091            let result = download(&client, range).await.unwrap().unwrap();
1092            assert_eq!(result, BLOB, "ok");
1093        }
1094
1095        #[tokio::test]
1096        async fn failed_stream_end_2() {
1097            let client = get_builder()
1098                .responses()
1099                .success_with_stream_failure(BLOB.len())
1100                .finish();
1101
1102            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1103
1104            let result = download(&client, range).await.unwrap().unwrap_err();
1105            assert_eq!(result, BLOB, "err");
1106            let result = download(&client, range).await.unwrap().unwrap();
1107            assert_eq!(result, BLOB, "ok");
1108        }
1109
1110        #[tokio::test]
1111        async fn combined_errors() {
1112            let client = get_builder()
1113                .responses()
1114                .failure(CondowErrorKind::Io)
1115                .success_with_stream_failure(0)
1116                .failures([CondowErrorKind::Remote, CondowErrorKind::InvalidRange])
1117                .successes_with_stream_failure([5, 9])
1118                .success()
1119                .never()
1120                .finish();
1121            let range = InclusiveRange(0u64, BLOB.len() as u64 - 1);
1122
1123            let result = download(&client, range).await.unwrap_err();
1124            assert_eq!(result.kind(), CondowErrorKind::Io, "1");
1125            let result = download(&client, range).await.unwrap().unwrap_err();
1126            assert_eq!(result, &[], "2");
1127            let result = download(&client, range).await.unwrap_err();
1128            assert_eq!(result.kind(), CondowErrorKind::Remote, "3");
1129            let result = download(&client, range).await.unwrap_err();
1130            assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
1131            let result = download(&client, range).await.unwrap().unwrap_err();
1132            assert_eq!(result, &BLOB[0..5], "5");
1133            let result = download(&client, range).await.unwrap().unwrap_err();
1134            assert_eq!(result, &BLOB[0..9], "6");
1135            let result = download(&client, range).await.unwrap().unwrap();
1136            assert_eq!(result, BLOB, "ok");
1137        }
1138
1139        #[tokio::test]
1140        async fn combined_errors_with_range() {
1141            let client = get_builder()
1142                .responses()
1143                .failure(CondowErrorKind::Io)
1144                .success_with_stream_failure(0)
1145                .failures([CondowErrorKind::Remote, CondowErrorKind::InvalidRange])
1146                .successes_with_stream_failure([3, 4])
1147                .success()
1148                .never()
1149                .finish();
1150
1151            let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1152                .await
1153                .unwrap_err();
1154            assert_eq!(result.kind(), CondowErrorKind::Io, "1");
1155            let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1156                .await
1157                .unwrap()
1158                .unwrap_err();
1159            assert_eq!(result, &[], "2");
1160            let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1161                .await
1162                .unwrap_err();
1163            assert_eq!(result.kind(), CondowErrorKind::Remote, "3");
1164            let result = download(&client, InclusiveRange(0u64, BLOB.len() as u64 - 1))
1165                .await
1166                .unwrap_err();
1167            assert_eq!(result.kind(), CondowErrorKind::InvalidRange, "4");
1168            let result = download(&client, 2..=9).await.unwrap().unwrap_err();
1169            assert_eq!(result, &BLOB[2..5], "5");
1170            let result = download(&client, 5..=BLOB.len() as u64 - 1)
1171                .await
1172                .unwrap()
1173                .unwrap_err();
1174            assert_eq!(result, &BLOB[5..9], "6");
1175
1176            let result = download(&client, 3..=8).await.unwrap().unwrap();
1177            assert_eq!(result, &BLOB[3..=8], "ok");
1178        }
1179
1180        fn get_builder() -> FailingClientSimulatorBuilder {
1181            FailingClientSimulatorBuilder::default()
1182                .blob_static(BLOB)
1183                .chunk_size(3)
1184        }
1185
1186        async fn download<R: Into<InclusiveRange>>(
1187            client: &FailingClientSimulator,
1188            range: R,
1189        ) -> Result<Result<Vec<u8>, Vec<u8>>, CondowError> {
1190            let mut stream = client.download(IgnoreLocation, range.into()).await?;
1191
1192            let mut received = Vec::new();
1193
1194            while let Some(next) = stream.next().await {
1195                if let Ok(bytes) = next {
1196                    received.extend_from_slice(&bytes);
1197                } else {
1198                    return Ok(Err(received));
1199                }
1200            }
1201
1202            Ok(Ok(received))
1203        }
1204    }
1205
1206    #[cfg(test)]
1207    mod test_stream {
1208        use std::ops::Range;
1209
1210        use futures::StreamExt;
1211
1212        use super::*;
1213
1214        const BLOB: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
1215
1216        #[tokio::test]
1217        async fn empty_ok() {
1218            for start in 0..BLOB.len() {
1219                for chunk_size in 1..BLOB.len() + 1 {
1220                    let result = consume(start..start, chunk_size, false).await.unwrap();
1221                    assert!(
1222                        result.is_empty(),
1223                        "chunk_size: {}, start: {}",
1224                        chunk_size,
1225                        start
1226                    )
1227                }
1228            }
1229        }
1230
1231        #[tokio::test]
1232        async fn range_ok() {
1233            let result = consume(5..BLOB.len(), 3, false).await.unwrap();
1234            assert_eq!(result, &BLOB[5..BLOB.len()])
1235        }
1236
1237        #[tokio::test]
1238        async fn empty_err() {
1239            for start in 0..BLOB.len() {
1240                for chunk_size in 1..BLOB.len() + 1 {
1241                    let result = consume(start..start, chunk_size, true).await.unwrap_err();
1242                    assert!(
1243                        result.is_empty(),
1244                        "chunk_size: {}, start: {}",
1245                        chunk_size,
1246                        start
1247                    )
1248                }
1249            }
1250        }
1251
1252        #[tokio::test]
1253        async fn to_end_ok() {
1254            for start in 0..BLOB.len() {
1255                for chunk_size in 1..BLOB.len() + 1 {
1256                    let result = consume(start..BLOB.len(), chunk_size, false).await.unwrap();
1257                    assert_eq!(
1258                        result,
1259                        BLOB[start..BLOB.len()],
1260                        "chunk_size: {}, start: {}",
1261                        chunk_size,
1262                        start
1263                    )
1264                }
1265            }
1266        }
1267
1268        #[tokio::test]
1269        async fn to_end_err() {
1270            for start in 0..BLOB.len() {
1271                for chunk_size in 1..BLOB.len() + 1 {
1272                    let result = consume(start..BLOB.len(), chunk_size, true)
1273                        .await
1274                        .unwrap_err();
1275                    assert_eq!(
1276                        result,
1277                        BLOB[start..BLOB.len()],
1278                        "chunk_size: {}, start: {}",
1279                        chunk_size,
1280                        start
1281                    )
1282                }
1283            }
1284        }
1285
1286        #[tokio::test]
1287        async fn from_start_ok() {
1288            for end in 0..BLOB.len() {
1289                for chunk_size in 1..BLOB.len() + 1 {
1290                    let result = consume(0..end, chunk_size, false).await.unwrap();
1291                    assert_eq!(
1292                        result,
1293                        BLOB[0..end],
1294                        "chunk_size: {}, end: {}",
1295                        chunk_size,
1296                        end
1297                    )
1298                }
1299            }
1300        }
1301
1302        #[tokio::test]
1303        async fn from_start_err() {
1304            for end in 0..BLOB.len() {
1305                for chunk_size in 1..BLOB.len() + 1 {
1306                    let result = consume(0..end, chunk_size, true).await.unwrap_err();
1307                    assert_eq!(
1308                        result,
1309                        BLOB[0..end],
1310                        "chunk_size: {}, end: {}",
1311                        chunk_size,
1312                        end
1313                    )
1314                }
1315            }
1316        }
1317
1318        #[tokio::test]
1319        async fn get_a_slice_ok() {
1320            let start = 3;
1321            let end_excl = 7;
1322            for chunk_size in 1..BLOB.len() + 1 {
1323                let result = consume(start..end_excl, chunk_size, false).await.unwrap();
1324                assert_eq!(
1325                    result,
1326                    BLOB[start..end_excl],
1327                    "chunk_size: {}, start: {}, end_excl: {}",
1328                    chunk_size,
1329                    start,
1330                    end_excl,
1331                )
1332            }
1333        }
1334
1335        #[tokio::test]
1336        async fn get_a_slice_err() {
1337            let start = 3;
1338            let end_excl = 7;
1339            for chunk_size in 1..BLOB.len() + 1 {
1340                let result = consume(start..end_excl, chunk_size, true)
1341                    .await
1342                    .unwrap_err();
1343                assert_eq!(
1344                    result,
1345                    BLOB[start..end_excl],
1346                    "chunk_size: {}, start: {}, end_excl: {}",
1347                    chunk_size,
1348                    start,
1349                    end_excl,
1350                )
1351            }
1352        }
1353
1354        async fn consume(
1355            range: Range<usize>,
1356            chunk_size: usize,
1357            err: bool,
1358        ) -> Result<Vec<u8>, Vec<u8>> {
1359            let mut stream = BytesStreamWithError {
1360                blob: Blob::Static(BLOB),
1361                next: range.start,
1362                end_excl: range.end,
1363                error: if err {
1364                    Some(ErrorAction::Err("bang!".to_string()))
1365                } else {
1366                    None
1367                },
1368                chunk_size,
1369            };
1370
1371            let mut collected = Vec::new();
1372            while let Some(next) = stream.next().await {
1373                match next {
1374                    Ok(bytes) => collected.extend_from_slice(&bytes),
1375                    Err(_err) => return Err(collected),
1376                }
1377            }
1378
1379            Ok(collected)
1380        }
1381    }
1382}