openstack/common/
resourceiterator.rs

1// Copyright 2017 Dmitry Tantsur <divius.inside@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Generic API bits for implementing new services.
16
17use std::vec;
18
19use async_stream::try_stream;
20use async_trait::async_trait;
21use futures::pin_mut;
22use futures::stream::{Stream, TryStreamExt};
23
24use super::super::{Error, ErrorKind, Result};
25
26/// A query for resources.
27///
28/// This is a low-level trait that should not be used directly.
29#[async_trait]
30pub trait ResourceQuery {
31    /// Item type.
32    type Item;
33
34    /// Default limit to use with this query.
35    const DEFAULT_LIMIT: usize;
36
37    /// Whether pagination is supported for this query.
38    async fn can_paginate(&self) -> Result<bool>;
39
40    /// Extract a marker from a resource.
41    fn extract_marker(&self, resource: &Self::Item) -> String;
42
43    /// Get a chunk of resources.
44    async fn fetch_chunk(
45        &self,
46        limit: Option<usize>,
47        marker: Option<String>,
48    ) -> Result<Vec<Self::Item>>;
49
50    /// Validate the query before the first execution.
51    ///
52    /// This call may modify internal representation of the query, so changing
53    /// the query after calling it may cause undesired side effects.
54    async fn validate(&mut self) -> Result<()> {
55        Ok(())
56    }
57}
58
59/// Generic iterator over resources.
60#[derive(Debug, Clone)]
61pub struct ResourceIterator<Q: ResourceQuery> {
62    query: Q,
63    cache: Option<vec::IntoIter<Q::Item>>,
64    marker: Option<String>,
65    can_paginate: Option<bool>,
66    validated: bool,
67}
68
69impl<Q> ResourceIterator<Q>
70where
71    Q: ResourceQuery,
72{
73    #[allow(dead_code)] // unused with --no-default-features
74    pub(crate) fn new(query: Q) -> ResourceIterator<Q> {
75        ResourceIterator {
76            query,
77            cache: None,
78            marker: None,
79            can_paginate: None, // ask the service later
80            validated: false,
81        }
82    }
83}
84
85impl<Q> ResourceIterator<Q>
86where
87    Q: ResourceQuery + Send,
88{
89    /// Assert that only one item is left and fetch it.
90    ///
91    /// Fails with `ResourceNotFound` if no items are left and with
92    /// `TooManyItems` if there is more than one item left.
93    pub async fn one(self) -> Result<Q::Item> {
94        let stream = self.into_stream();
95        pin_mut!(stream);
96        match stream.try_next().await? {
97            Some(result) => {
98                if stream.try_next().await?.is_some() {
99                    Err(Error::new(
100                        ErrorKind::TooManyItems,
101                        "Query returned more than one result",
102                    ))
103                } else {
104                    Ok(result)
105                }
106            }
107            None => Err(Error::new(
108                ErrorKind::ResourceNotFound,
109                "Query returned no results",
110            )),
111        }
112    }
113
114    /// Convert this iterator into a proper implementor of the `Stream` trait.
115    ///
116    /// This stream yields `Result<Q::Item>` items and is therefore also an
117    /// implementor of the `TryStream` trait.
118    ///
119    /// Note that no requests are done until you start iterating.
120    pub fn into_stream(mut self) -> impl Stream<Item = Result<Q::Item>> {
121        try_stream! {
122            if !self.validated {
123                self.query.validate().await?;
124                self.validated = true;
125            }
126
127            if self.can_paginate.is_none() {
128                self.can_paginate = Some(self.query.can_paginate().await?);
129            }
130
131            loop {
132                let maybe_next = self.cache.as_mut().and_then(|cache| cache.next());
133                if let Some(next) = maybe_next {
134                    self.marker = Some(self.query.extract_marker(&next));
135                    yield next;
136                } else if self.cache.is_some() && self.can_paginate == Some(false) {
137                    // We have exhausted the results and pagination is not possible
138                    break;
139                } else {
140                    let (marker, limit) = if self.can_paginate == Some(true) {
141                        // can_paginate=true implies no limit was provided
142                        (self.marker.clone(), Some(Q::DEFAULT_LIMIT))
143                    } else {
144                        (None, None)
145                    };
146
147                    let mut iter = self.query.fetch_chunk(limit, marker).await?.into_iter();
148                    let maybe_next = iter.next();
149                    self.cache = Some(iter);
150                    if let Some(next) = maybe_next {
151                        self.marker = Some(self.query.extract_marker(&next));
152                        yield next;
153                    } else {
154                        break;
155                    }
156                }
157            }
158        }
159    }
160}
161
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use futures::stream::TryStreamExt;

    use super::super::super::Result;
    use super::{ResourceIterator, ResourceQuery};

    /// Minimal resource identified by a single number.
    #[derive(Debug, PartialEq, Eq)]
    struct Test(u8);

    /// Paginated query serving items 0 through 3 in chunks of two.
    #[derive(Debug)]
    struct TestQuery;

    #[async_trait]
    impl ResourceQuery for TestQuery {
        type Item = Test;

        const DEFAULT_LIMIT: usize = 2;

        async fn can_paginate(&self) -> Result<bool> {
            Ok(true)
        }

        fn extract_marker(&self, resource: &Test) -> String {
            resource.0.to_string()
        }

        async fn fetch_chunk(
            &self,
            limit: Option<usize>,
            marker: Option<String>,
        ) -> Result<Vec<Self::Item>> {
            assert_eq!(limit, Some(2));
            // The marker is the number of the last item handed out so far.
            let last_seen = marker.map(|raw| raw.parse::<u8>().unwrap());
            let chunk = match last_seen {
                None => vec![Test(0), Test(1)],
                Some(1) => vec![Test(2), Test(3)],
                Some(3) => Vec::new(),
                Some(other) => panic!("unexpected marker {:?}", other),
            };
            Ok(chunk)
        }
    }

    /// Query without pagination: everything arrives in one chunk.
    #[derive(Debug)]
    struct NoPagination;

    #[async_trait]
    impl ResourceQuery for NoPagination {
        type Item = Test;

        const DEFAULT_LIMIT: usize = 2;

        async fn can_paginate(&self) -> Result<bool> {
            Ok(false)
        }

        fn extract_marker(&self, resource: &Test) -> String {
            resource.0.to_string()
        }

        async fn fetch_chunk(
            &self,
            limit: Option<usize>,
            marker: Option<String>,
        ) -> Result<Vec<Self::Item>> {
            // Neither argument may be set when pagination is unsupported.
            assert!(limit.is_none());
            assert!(marker.is_none());
            Ok(vec![Test(0), Test(1), Test(2)])
        }
    }

    #[tokio::test]
    async fn test_resource_iterator() {
        let stream = ResourceIterator::new(TestQuery).into_stream();
        let items: Vec<Test> = stream.try_collect().await.unwrap();
        assert_eq!(items, vec![Test(0), Test(1), Test(2), Test(3)]);
    }

    #[tokio::test]
    async fn test_resource_iterator_no_pagination() {
        let stream = ResourceIterator::new(NoPagination).into_stream();
        let items: Vec<Test> = stream.try_collect().await.unwrap();
        assert_eq!(items, vec![Test(0), Test(1), Test(2)]);
    }
}