booru_rs/
stream.rs

1//! Async stream utilities for paginated results.
2//!
3//! This module provides utilities for iterating through paginated
4//! booru results using async streams.
5
6use crate::client::{Client, ClientBuilder};
7use crate::error::Result;
8
/// An async stream that yields pages of posts.
///
/// Created by [`ClientBuilder::into_page_stream`]. A [`PostStream`] (see
/// [`ClientBuilder::into_post_stream`]) wraps one of these internally to
/// obtain its pages.
///
/// Pages are requested one at a time, starting at the page configured on the
/// builder. The stream ends after it has handed out an empty page or an
/// error, or once the optional page limit has been reached.
///
/// # Example
///
/// ```no_run
/// use booru_rs::prelude::*;
/// use booru_rs::stream::PageStream;
///
/// # async fn example() -> Result<()> {
/// let mut stream = SafebooruClient::builder()
///     .tag("landscape")?
///     .limit(100)
///     .into_page_stream();
///
/// // Manually poll pages
/// while let Some(page_result) = stream.next().await {
///     let posts = page_result?;
///     if posts.is_empty() {
///         break;
///     }
///     println!("Got {} posts", posts.len());
/// }
/// # Ok(())
/// # }
/// ```
pub struct PageStream<T: Client> {
    /// Base query configuration; cloned for every page request. Its `page`
    /// field is the starting page used when counting fetched pages.
    builder: ClientBuilder<T>,
    /// Page number that the next call to `next` will request.
    current_page: u32,
    /// Set once an empty page or an error has been returned, or the page
    /// limit was hit; afterwards `next` always returns `None`.
    exhausted: bool,
    /// Optional cap on the number of pages fetched, counted from the
    /// builder's starting page.
    max_pages: Option<u32>,
}
42
43impl<T: Client> PageStream<T> {
44    /// Creates a new page stream from a client builder.
45    pub fn new(builder: ClientBuilder<T>) -> Self {
46        let current_page = builder.page;
47        Self {
48            builder,
49            current_page,
50            exhausted: false,
51            max_pages: None,
52        }
53    }
54
55    /// Sets the maximum number of pages to fetch.
56    #[must_use]
57    pub fn max_pages(mut self, max: u32) -> Self {
58        self.max_pages = Some(max);
59        self
60    }
61
62    /// Returns the current page number.
63    pub fn current_page(&self) -> u32 {
64        self.current_page
65    }
66
67    /// Fetches the next page of results.
68    ///
69    /// Returns `None` when there are no more pages or the max page limit is reached.
70    pub async fn next(&mut self) -> Option<Result<Vec<T::Post>>> {
71        if self.exhausted {
72            return None;
73        }
74
75        // Check max pages limit
76        if let Some(max) = self.max_pages {
77            let pages_fetched = self.current_page.saturating_sub(self.builder.page);
78            if pages_fetched >= max {
79                self.exhausted = true;
80                return None;
81            }
82        }
83
84        // Build client for current page
85        let mut page_builder = self.builder.clone();
86        page_builder.page = self.current_page;
87        let client = page_builder.build();
88
89        match client.get().await {
90            Ok(posts) => {
91                if posts.is_empty() {
92                    self.exhausted = true;
93                    return Some(Ok(posts));
94                }
95                self.current_page += 1;
96                Some(Ok(posts))
97            }
98            Err(e) => {
99                self.exhausted = true;
100                Some(Err(e))
101            }
102        }
103    }
104}
105
106/// An async stream that yields individual posts across pages.
107///
108/// This stream automatically handles pagination, fetching new pages
109/// as needed while yielding posts one at a time.
110///
111/// # Example
112///
113/// ```no_run
114/// use booru_rs::prelude::*;
115///
116/// # async fn example() -> Result<()> {
117/// let mut stream = SafebooruClient::builder()
118///     .tag("landscape")?
119///     .limit(100)
120///     .into_post_stream()
121///     .max_posts(500); // Limit to 500 posts total
122///
123/// let mut count = 0;
124/// while let Some(post_result) = stream.next().await {
125///     let post = post_result?;
126///     println!("Post #{}", post.id);
127///     count += 1;
128/// }
129/// println!("Fetched {} posts", count);
130/// # Ok(())
131/// # }
132/// ```
133pub struct PostStream<T: Client> {
134    page_stream: PageStream<T>,
135    buffer: Vec<T::Post>,
136    buffer_index: usize,
137    posts_yielded: u32,
138    max_posts: Option<u32>,
139}
140
141impl<T: Client> PostStream<T> {
142    /// Creates a new post stream from a client builder.
143    pub fn new(builder: ClientBuilder<T>) -> Self {
144        Self {
145            page_stream: PageStream::new(builder),
146            buffer: Vec::new(),
147            buffer_index: 0,
148            posts_yielded: 0,
149            max_posts: None,
150        }
151    }
152
153    /// Sets the maximum number of posts to yield.
154    #[must_use]
155    pub fn max_posts(mut self, max: u32) -> Self {
156        self.max_posts = Some(max);
157        self
158    }
159
160    /// Sets the maximum number of pages to fetch.
161    #[must_use]
162    pub fn max_pages(mut self, max: u32) -> Self {
163        self.page_stream = self.page_stream.max_pages(max);
164        self
165    }
166
167    /// Returns the number of posts yielded so far.
168    pub fn posts_yielded(&self) -> u32 {
169        self.posts_yielded
170    }
171
172    /// Returns the current page number.
173    pub fn current_page(&self) -> u32 {
174        self.page_stream.current_page()
175    }
176
177    /// Fetches the next post.
178    ///
179    /// Returns `None` when there are no more posts.
180    pub async fn next(&mut self) -> Option<Result<T::Post>> {
181        // Check max posts limit
182        if let Some(max) = self.max_posts
183            && self.posts_yielded >= max
184        {
185            return None;
186        }
187
188        // If we have posts in the buffer, return the next one
189        if self.buffer_index < self.buffer.len() {
190            let post = self.buffer.swap_remove(self.buffer_index);
191            // Note: swap_remove changes order but we're consuming, so OK
192            self.buffer_index = 0; // Reset since swap_remove moves last to current
193            self.posts_yielded += 1;
194            return Some(Ok(post));
195        }
196
197        // Need to fetch more posts
198        match self.page_stream.next().await? {
199            Ok(posts) => {
200                if posts.is_empty() {
201                    return None;
202                }
203                self.buffer = posts;
204                self.buffer_index = 1; // Will return index 0
205                self.posts_yielded += 1;
206
207                // Pop the first post
208                if self.buffer.is_empty() {
209                    None
210                } else {
211                    Some(Ok(self.buffer.swap_remove(0)))
212                }
213            }
214            Err(e) => Some(Err(e)),
215        }
216    }
217
218    /// Collects all remaining posts into a vector.
219    ///
220    /// This is useful when you want all posts at once. Respects `max_posts` if set.
221    ///
222    /// # Errors
223    ///
224    /// Returns the first error encountered during pagination.
225    pub async fn collect(mut self) -> Result<Vec<T::Post>> {
226        let mut all_posts = Vec::new();
227
228        while let Some(result) = self.next().await {
229            all_posts.push(result?);
230        }
231
232        Ok(all_posts)
233    }
234}
235
236// Extend ClientBuilder with stream methods
237impl<T: Client> ClientBuilder<T> {
238    /// Creates an async stream that yields pages of posts.
239    ///
240    /// Each call to `next()` fetches and returns a full page of posts.
241    ///
242    /// # Example
243    ///
244    /// ```no_run
245    /// use booru_rs::prelude::*;
246    ///
247    /// # async fn example() -> Result<()> {
248    /// let mut stream = SafebooruClient::builder()
249    ///     .tag("landscape")?
250    ///     .limit(100)
251    ///     .into_page_stream();
252    ///
253    /// while let Some(page_result) = stream.next().await {
254    ///     let posts = page_result?;
255    ///     if posts.is_empty() { break; }
256    ///     println!("Page with {} posts", posts.len());
257    /// }
258    /// # Ok(())
259    /// # }
260    /// ```
261    #[must_use]
262    pub fn into_page_stream(self) -> PageStream<T> {
263        PageStream::new(self)
264    }
265
266    /// Creates an async stream that yields individual posts.
267    ///
268    /// Automatically handles pagination, fetching new pages as needed.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// use booru_rs::prelude::*;
274    ///
275    /// # async fn example() -> Result<()> {
276    /// let mut stream = SafebooruClient::builder()
277    ///     .tag("landscape")?
278    ///     .limit(100)
279    ///     .into_post_stream()
280    ///     .max_posts(250);
281    ///
282    /// while let Some(post_result) = stream.next().await {
283    ///     let post = post_result?;
284    ///     println!("Post #{}", post.id);
285    /// }
286    /// # Ok(())
287    /// # }
288    /// ```
289    #[must_use]
290    pub fn into_post_stream(self) -> PostStream<T> {
291        PostStream::new(self)
292    }
293}