booru_rs/stream.rs
1//! Async stream utilities for paginated results.
2//!
3//! This module provides utilities for iterating through paginated
4//! booru results using async streams.
5
6use crate::client::{Client, ClientBuilder};
7use crate::error::Result;
8
9/// An async stream that yields pages of posts.
10///
11/// Created by [`ClientBuilder::into_page_stream`] or [`ClientBuilder::into_post_stream`].
12///
13/// # Example
14///
15/// ```no_run
16/// use booru_rs::prelude::*;
17/// use booru_rs::stream::PageStream;
18///
19/// # async fn example() -> Result<()> {
20/// let mut stream = SafebooruClient::builder()
21/// .tag("landscape")?
22/// .limit(100)
23/// .into_page_stream();
24///
25/// // Manually poll pages
26/// while let Some(page_result) = stream.next().await {
27/// let posts = page_result?;
28/// if posts.is_empty() {
29/// break;
30/// }
31/// println!("Got {} posts", posts.len());
32/// }
33/// # Ok(())
34/// # }
35/// ```
36pub struct PageStream<T: Client> {
37 builder: ClientBuilder<T>,
38 current_page: u32,
39 exhausted: bool,
40 max_pages: Option<u32>,
41}
42
43impl<T: Client> PageStream<T> {
44 /// Creates a new page stream from a client builder.
45 pub fn new(builder: ClientBuilder<T>) -> Self {
46 let current_page = builder.page;
47 Self {
48 builder,
49 current_page,
50 exhausted: false,
51 max_pages: None,
52 }
53 }
54
55 /// Sets the maximum number of pages to fetch.
56 #[must_use]
57 pub fn max_pages(mut self, max: u32) -> Self {
58 self.max_pages = Some(max);
59 self
60 }
61
62 /// Returns the current page number.
63 pub fn current_page(&self) -> u32 {
64 self.current_page
65 }
66
67 /// Fetches the next page of results.
68 ///
69 /// Returns `None` when there are no more pages or the max page limit is reached.
70 pub async fn next(&mut self) -> Option<Result<Vec<T::Post>>> {
71 if self.exhausted {
72 return None;
73 }
74
75 // Check max pages limit
76 if let Some(max) = self.max_pages {
77 let pages_fetched = self.current_page.saturating_sub(self.builder.page);
78 if pages_fetched >= max {
79 self.exhausted = true;
80 return None;
81 }
82 }
83
84 // Build client for current page
85 let mut page_builder = self.builder.clone();
86 page_builder.page = self.current_page;
87 let client = page_builder.build();
88
89 match client.get().await {
90 Ok(posts) => {
91 if posts.is_empty() {
92 self.exhausted = true;
93 return Some(Ok(posts));
94 }
95 self.current_page += 1;
96 Some(Ok(posts))
97 }
98 Err(e) => {
99 self.exhausted = true;
100 Some(Err(e))
101 }
102 }
103 }
104}
105
106/// An async stream that yields individual posts across pages.
107///
108/// This stream automatically handles pagination, fetching new pages
109/// as needed while yielding posts one at a time.
110///
111/// # Example
112///
113/// ```no_run
114/// use booru_rs::prelude::*;
115///
116/// # async fn example() -> Result<()> {
117/// let mut stream = SafebooruClient::builder()
118/// .tag("landscape")?
119/// .limit(100)
120/// .into_post_stream()
121/// .max_posts(500); // Limit to 500 posts total
122///
123/// let mut count = 0;
124/// while let Some(post_result) = stream.next().await {
125/// let post = post_result?;
126/// println!("Post #{}", post.id);
127/// count += 1;
128/// }
129/// println!("Fetched {} posts", count);
130/// # Ok(())
131/// # }
132/// ```
133pub struct PostStream<T: Client> {
134 page_stream: PageStream<T>,
135 buffer: Vec<T::Post>,
136 buffer_index: usize,
137 posts_yielded: u32,
138 max_posts: Option<u32>,
139}
140
141impl<T: Client> PostStream<T> {
142 /// Creates a new post stream from a client builder.
143 pub fn new(builder: ClientBuilder<T>) -> Self {
144 Self {
145 page_stream: PageStream::new(builder),
146 buffer: Vec::new(),
147 buffer_index: 0,
148 posts_yielded: 0,
149 max_posts: None,
150 }
151 }
152
153 /// Sets the maximum number of posts to yield.
154 #[must_use]
155 pub fn max_posts(mut self, max: u32) -> Self {
156 self.max_posts = Some(max);
157 self
158 }
159
160 /// Sets the maximum number of pages to fetch.
161 #[must_use]
162 pub fn max_pages(mut self, max: u32) -> Self {
163 self.page_stream = self.page_stream.max_pages(max);
164 self
165 }
166
167 /// Returns the number of posts yielded so far.
168 pub fn posts_yielded(&self) -> u32 {
169 self.posts_yielded
170 }
171
172 /// Returns the current page number.
173 pub fn current_page(&self) -> u32 {
174 self.page_stream.current_page()
175 }
176
177 /// Fetches the next post.
178 ///
179 /// Returns `None` when there are no more posts.
180 pub async fn next(&mut self) -> Option<Result<T::Post>> {
181 // Check max posts limit
182 if let Some(max) = self.max_posts
183 && self.posts_yielded >= max
184 {
185 return None;
186 }
187
188 // If we have posts in the buffer, return the next one
189 if self.buffer_index < self.buffer.len() {
190 let post = self.buffer.swap_remove(self.buffer_index);
191 // Note: swap_remove changes order but we're consuming, so OK
192 self.buffer_index = 0; // Reset since swap_remove moves last to current
193 self.posts_yielded += 1;
194 return Some(Ok(post));
195 }
196
197 // Need to fetch more posts
198 match self.page_stream.next().await? {
199 Ok(posts) => {
200 if posts.is_empty() {
201 return None;
202 }
203 self.buffer = posts;
204 self.buffer_index = 1; // Will return index 0
205 self.posts_yielded += 1;
206
207 // Pop the first post
208 if self.buffer.is_empty() {
209 None
210 } else {
211 Some(Ok(self.buffer.swap_remove(0)))
212 }
213 }
214 Err(e) => Some(Err(e)),
215 }
216 }
217
218 /// Collects all remaining posts into a vector.
219 ///
220 /// This is useful when you want all posts at once. Respects `max_posts` if set.
221 ///
222 /// # Errors
223 ///
224 /// Returns the first error encountered during pagination.
225 pub async fn collect(mut self) -> Result<Vec<T::Post>> {
226 let mut all_posts = Vec::new();
227
228 while let Some(result) = self.next().await {
229 all_posts.push(result?);
230 }
231
232 Ok(all_posts)
233 }
234}
235
236// Extend ClientBuilder with stream methods
237impl<T: Client> ClientBuilder<T> {
238 /// Creates an async stream that yields pages of posts.
239 ///
240 /// Each call to `next()` fetches and returns a full page of posts.
241 ///
242 /// # Example
243 ///
244 /// ```no_run
245 /// use booru_rs::prelude::*;
246 ///
247 /// # async fn example() -> Result<()> {
248 /// let mut stream = SafebooruClient::builder()
249 /// .tag("landscape")?
250 /// .limit(100)
251 /// .into_page_stream();
252 ///
253 /// while let Some(page_result) = stream.next().await {
254 /// let posts = page_result?;
255 /// if posts.is_empty() { break; }
256 /// println!("Page with {} posts", posts.len());
257 /// }
258 /// # Ok(())
259 /// # }
260 /// ```
261 #[must_use]
262 pub fn into_page_stream(self) -> PageStream<T> {
263 PageStream::new(self)
264 }
265
266 /// Creates an async stream that yields individual posts.
267 ///
268 /// Automatically handles pagination, fetching new pages as needed.
269 ///
270 /// # Example
271 ///
272 /// ```no_run
273 /// use booru_rs::prelude::*;
274 ///
275 /// # async fn example() -> Result<()> {
276 /// let mut stream = SafebooruClient::builder()
277 /// .tag("landscape")?
278 /// .limit(100)
279 /// .into_post_stream()
280 /// .max_posts(250);
281 ///
282 /// while let Some(post_result) = stream.next().await {
283 /// let post = post_result?;
284 /// println!("Post #{}", post.id);
285 /// }
286 /// # Ok(())
287 /// # }
288 /// ```
289 #[must_use]
290 pub fn into_post_stream(self) -> PostStream<T> {
291 PostStream::new(self)
292 }
293}