openstack/common/
resourceiterator.rs1use std::vec;
18
19use async_stream::try_stream;
20use async_trait::async_trait;
21use futures::pin_mut;
22use futures::stream::{Stream, TryStreamExt};
23
24use super::super::{Error, ErrorKind, Result};
25
26#[async_trait]
30pub trait ResourceQuery {
31 type Item;
33
34 const DEFAULT_LIMIT: usize;
36
37 async fn can_paginate(&self) -> Result<bool>;
39
40 fn extract_marker(&self, resource: &Self::Item) -> String;
42
43 async fn fetch_chunk(
45 &self,
46 limit: Option<usize>,
47 marker: Option<String>,
48 ) -> Result<Vec<Self::Item>>;
49
50 async fn validate(&mut self) -> Result<()> {
55 Ok(())
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct ResourceIterator<Q: ResourceQuery> {
62 query: Q,
63 cache: Option<vec::IntoIter<Q::Item>>,
64 marker: Option<String>,
65 can_paginate: Option<bool>,
66 validated: bool,
67}
68
69impl<Q> ResourceIterator<Q>
70where
71 Q: ResourceQuery,
72{
73 #[allow(dead_code)] pub(crate) fn new(query: Q) -> ResourceIterator<Q> {
75 ResourceIterator {
76 query,
77 cache: None,
78 marker: None,
79 can_paginate: None, validated: false,
81 }
82 }
83}
84
85impl<Q> ResourceIterator<Q>
86where
87 Q: ResourceQuery + Send,
88{
89 pub async fn one(self) -> Result<Q::Item> {
94 let stream = self.into_stream();
95 pin_mut!(stream);
96 match stream.try_next().await? {
97 Some(result) => {
98 if stream.try_next().await?.is_some() {
99 Err(Error::new(
100 ErrorKind::TooManyItems,
101 "Query returned more than one result",
102 ))
103 } else {
104 Ok(result)
105 }
106 }
107 None => Err(Error::new(
108 ErrorKind::ResourceNotFound,
109 "Query returned no results",
110 )),
111 }
112 }
113
114 pub fn into_stream(mut self) -> impl Stream<Item = Result<Q::Item>> {
121 try_stream! {
122 if !self.validated {
123 self.query.validate().await?;
124 self.validated = true;
125 }
126
127 if self.can_paginate.is_none() {
128 self.can_paginate = Some(self.query.can_paginate().await?);
129 }
130
131 loop {
132 let maybe_next = self.cache.as_mut().and_then(|cache| cache.next());
133 if let Some(next) = maybe_next {
134 self.marker = Some(self.query.extract_marker(&next));
135 yield next;
136 } else if self.cache.is_some() && self.can_paginate == Some(false) {
137 break;
139 } else {
140 let (marker, limit) = if self.can_paginate == Some(true) {
141 (self.marker.clone(), Some(Q::DEFAULT_LIMIT))
143 } else {
144 (None, None)
145 };
146
147 let mut iter = self.query.fetch_chunk(limit, marker).await?.into_iter();
148 let maybe_next = iter.next();
149 self.cache = Some(iter);
150 if let Some(next) = maybe_next {
151 self.marker = Some(self.query.extract_marker(&next));
152 yield next;
153 } else {
154 break;
155 }
156 }
157 }
158 }
159 }
160}
161
162#[cfg(test)]
163mod test {
164 use async_trait::async_trait;
165 use futures::stream::TryStreamExt;
166
167 use super::super::super::Result;
168 use super::{ResourceIterator, ResourceQuery};
169
170 #[derive(Debug, PartialEq, Eq)]
171 struct Test(u8);
172
173 #[derive(Debug)]
174 struct TestQuery;
175
176 #[async_trait]
177 impl ResourceQuery for TestQuery {
178 type Item = Test;
179
180 const DEFAULT_LIMIT: usize = 2;
181
182 async fn can_paginate(&self) -> Result<bool> {
183 Ok(true)
184 }
185
186 fn extract_marker(&self, resource: &Test) -> String {
187 resource.0.to_string()
188 }
189
190 async fn fetch_chunk(
191 &self,
192 limit: Option<usize>,
193 marker: Option<String>,
194 ) -> Result<Vec<Self::Item>> {
195 assert_eq!(limit, Some(2));
196 Ok(match marker.map(|s| s.parse::<u8>().unwrap()) {
197 Some(1) => vec![Test(2), Test(3)],
198 Some(3) => Vec::new(),
199 None => vec![Test(0), Test(1)],
200 Some(x) => panic!("unexpected marker {:?}", x),
201 })
202 }
203 }
204
205 #[derive(Debug)]
206 struct NoPagination;
207
208 #[async_trait]
209 impl ResourceQuery for NoPagination {
210 type Item = Test;
211
212 const DEFAULT_LIMIT: usize = 2;
213
214 async fn can_paginate(&self) -> Result<bool> {
215 Ok(false)
216 }
217
218 fn extract_marker(&self, resource: &Test) -> String {
219 resource.0.to_string()
220 }
221
222 async fn fetch_chunk(
223 &self,
224 limit: Option<usize>,
225 marker: Option<String>,
226 ) -> Result<Vec<Self::Item>> {
227 assert!(limit.is_none());
228 assert!(marker.is_none());
229 Ok(vec![Test(0), Test(1), Test(2)])
230 }
231 }
232
233 #[tokio::test]
234 async fn test_resource_iterator() {
235 let it: ResourceIterator<TestQuery> = ResourceIterator::new(TestQuery);
236 assert_eq!(
237 it.into_stream().try_collect::<Vec<Test>>().await.unwrap(),
238 vec![Test(0), Test(1), Test(2), Test(3)]
239 );
240 }
241
242 #[tokio::test]
243 async fn test_resource_iterator_no_pagination() {
244 let it: ResourceIterator<NoPagination> = ResourceIterator::new(NoPagination);
245 assert_eq!(
246 it.into_stream().try_collect::<Vec<Test>>().await.unwrap(),
247 vec![Test(0), Test(1), Test(2)]
248 );
249 }
250}