virustotal_rs/iterator_utils/
traits.rs1use std::time::Duration;
4
5#[async_trait::async_trait]
7pub trait PaginatedIterator<T: Send> {
8 type Error: std::error::Error + Send + Sync + 'static;
9
10 async fn next_batch(&mut self) -> std::result::Result<Vec<T>, Self::Error>;
12
13 fn has_more(&self) -> bool;
15
16 async fn collect_all(mut self) -> std::result::Result<Vec<T>, Self::Error>
18 where
19 Self: Sized,
20 {
21 let mut all_items = Vec::new();
22
23 while self.has_more() {
24 let batch = self.next_batch().await?;
25 if batch.is_empty() {
26 break;
27 }
28 all_items.extend(batch);
29 }
30
31 Ok(all_items)
32 }
33
34 fn hint_remaining(&self) -> Option<usize> {
36 None
37 }
38
39 fn stats(&self) -> IteratorStats {
41 IteratorStats::default()
42 }
43}
44
/// A source that can produce a single page of items on request.
pub trait Pageable<T: Send> {
    /// Requests the page identified by `page`, optionally asking for `page_size` items.
    ///
    /// The returned future is `Send` and resolves to the page's items or to a boxed error.
    ///
    /// NOTE(review): this trait does not fix whether `page` is zero- or one-based, nor what a
    /// `page_size` of `None` means — confirm against the implementors.
    fn get_page(
        &self,
        page: u32,
        page_size: Option<u32>,
    ) -> impl std::future::Future<
        Output = std::result::Result<Vec<T>, Box<dyn std::error::Error + Send + Sync>>,
    > + Send;
}
59
60#[async_trait::async_trait]
62pub trait IteratorExt<T: Send>: PaginatedIterator<T> {
63 fn map<U, F>(self, mapper: F) -> super::adapters::MappedIterator<Self, T, U, F>
65 where
66 Self: Sized,
67 U: Send,
68 F: Fn(T) -> U + Send + Sync,
69 {
70 super::adapters::MappedIterator::new(self, mapper)
71 }
72
73 fn filter<F>(self, predicate: F) -> super::adapters::FilteredIterator<Self, T, F>
75 where
76 Self: Sized,
77 F: Fn(&T) -> bool + Send + Sync,
78 {
79 super::adapters::FilteredIterator::new(self, predicate)
80 }
81
82 fn take_until<F>(self, condition: F) -> super::adapters::TakeUntilIterator<Self, T, F>
84 where
85 Self: Sized,
86 F: Fn(&T) -> bool + Send + Sync,
87 {
88 super::adapters::TakeUntilIterator::new(self, condition)
89 }
90
91 fn batch(self, size: usize) -> super::adapters::BatchIterator<Self, T>
93 where
94 Self: Sized,
95 {
96 super::adapters::BatchIterator::new(self, size)
97 }
98
99 fn throttle(self, delay: Duration) -> super::adapters::ThrottledIterator<Self, T>
101 where
102 Self: Sized,
103 {
104 super::adapters::ThrottledIterator::new(self, delay)
105 }
106
107 fn retry(
109 self,
110 max_retries: u32,
111 base_delay: Duration,
112 ) -> super::adapters::RetryIterator<Self, T>
113 where
114 Self: Sized,
115 {
116 super::adapters::RetryIterator::new(self, max_retries, base_delay)
117 }
118
119 fn with_progress(
121 self,
122 tracker: super::progress::ProgressTracker,
123 ) -> super::adapters::ProgressIterator<Self, T>
124 where
125 Self: Sized,
126 {
127 super::adapters::ProgressIterator::new(self, tracker)
128 }
129
130 fn cached(self) -> super::adapters::CachedIterator<Self, T>
132 where
133 Self: Sized,
134 {
135 super::adapters::CachedIterator::new(self)
136 }
137
138 fn skip(self, count: usize) -> super::adapters::SkippedIterator<Self, T>
140 where
141 Self: Sized,
142 {
143 super::adapters::SkippedIterator::new(self, count)
144 }
145}
146
147impl<I, T: Send> IteratorExt<T> for I where I: PaginatedIterator<T> {}
149
/// Counters describing the work a paginated iterator has performed.
///
/// The `Default` value has every counter at zero and an empty duration. The type is plain
/// data, so it also derives `PartialEq`/`Eq`, which lets snapshots be compared directly
/// (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IteratorStats {
    /// Number of batches fetched, as recorded by the iterator that reports these stats.
    pub batches_fetched: u64,
    /// Number of items fetched, as recorded by the iterator that reports these stats.
    pub items_fetched: u64,
    /// Time attributed to fetching, as recorded by the iterator that reports these stats.
    pub fetch_duration: Duration,
    /// Number of retries, as recorded by the iterator that reports these stats.
    pub retries: u32,
}
162
163#[async_trait::async_trait]
165pub trait Collectable<T: Send> {
166 type Error: std::error::Error + Send + Sync + 'static;
167
168 async fn take(self, n: usize) -> std::result::Result<Vec<T>, Self::Error>
170 where
171 Self: Sized;
172
173 async fn take_while<F>(self, predicate: F) -> std::result::Result<Vec<T>, Self::Error>
175 where
176 Self: Sized,
177 F: Fn(&T) -> bool + Send;
178
179 async fn collect_all(self) -> std::result::Result<Vec<T>, Self::Error>
181 where
182 Self: Sized;
183}
184
185#[async_trait::async_trait]
187impl<I, T> Collectable<T> for I
188where
189 I: PaginatedIterator<T> + Send,
190 T: Send,
191{
192 type Error = I::Error;
193
194 async fn take(mut self, n: usize) -> std::result::Result<Vec<T>, Self::Error> {
195 let mut result = Vec::with_capacity(n.min(1000));
196
197 while result.len() < n && self.has_more() {
198 let batch = self.next_batch().await?;
199 if batch.is_empty() {
200 break;
201 }
202
203 for item in batch {
204 if result.len() >= n {
205 break;
206 }
207 result.push(item);
208 }
209 }
210
211 Ok(result)
212 }
213
214 async fn take_while<F>(mut self, predicate: F) -> std::result::Result<Vec<T>, Self::Error>
215 where
216 Self: Sized,
217 F: Fn(&T) -> bool + Send,
218 {
219 let mut result = Vec::new();
220
221 while self.has_more() {
222 let batch = self.next_batch().await?;
223 if batch.is_empty() {
224 break;
225 }
226
227 for item in batch {
228 if !predicate(&item) {
229 return Ok(result);
230 }
231 result.push(item);
232 }
233 }
234
235 Ok(result)
236 }
237
238 async fn collect_all(self) -> std::result::Result<Vec<T>, Self::Error>
239 where
240 Self: Sized,
241 {
242 PaginatedIterator::collect_all(self).await
243 }
244}