seekable_iterator/
threadsafe_pooled_iter.rs

1use core::{
2    borrow::{Borrow, BorrowMut},
3    ops::{Deref, DerefMut},
4};
5use alloc::borrow::ToOwned;
6
7use anchored_pool::{PooledResource, ResetNothing, ResourcePoolEmpty, SharedBoundedPool};
8
9use crate::{comparator::Comparator, lending_iterator_support::LentItem, seekable::Seekable};
10use crate::{
11    pooled::{OutOfBuffers, PooledIterator},
12    cursor::{CursorLendingIterator, CursorPooledIterator},
13};
14
15
16/// Convert a [`CursorLendingIterator`] into a [`CursorPooledIterator`] by storing recently
17/// accessed items in reusable buffers.
18///
19/// This effectively allows the iterator to lend out multiple items at once, unlike a lending
20/// iterator which can only lend out one. This comes primarily at the cost of extra copying
21/// into buffers, and in memory usage. The costs of allocating buffers is likely amortized by
22/// their reuse.
23#[derive(Debug)]
24#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
25pub struct ThreadsafePooledIter<I, BorrowedItem: ToOwned> {
26    iter: I,
27    pool: SharedBoundedPool<BorrowedItem::Owned, ResetNothing>,
28}
29
30impl<I, BorrowedItem> ThreadsafePooledIter<I, BorrowedItem>
31where
32    BorrowedItem:        ToOwned,
33    BorrowedItem::Owned: Default,
34{
35    /// Create a `ThreadsafePooledIter` that can lend out up to `num_buffers` items at a time.
36    #[must_use]
37    pub fn new(iter: I, num_buffers: usize) -> Self {
38        let pool = SharedBoundedPool::new_default_without_reset(num_buffers);
39
40        Self { iter, pool }
41    }
42}
43
44impl<I, BorrowedItem> ThreadsafePooledIter<I, BorrowedItem>
45where
46    I:                             CursorLendingIterator,
47    BorrowedItem:                  ToOwned,
48    for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
49{
50    /// # Potential Panics or Deadlocks
51    /// If `self.buffer_pool_size() == 0`, then this method panics.
52    /// This method may also cause a deadlock if no buffers are currently available, and the
53    /// current thread needs to make progress in order to release a buffer.
54    #[expect(clippy::needless_pass_by_value, reason = "lent item usually consists of references")]
55    #[inline]
56    fn fill_buffer(
57        pool: &SharedBoundedPool<BorrowedItem::Owned, ResetNothing>,
58        item: LentItem<'_, I>,
59    ) -> ThreadsafePoolItem<BorrowedItem::Owned> {
60        let mut pool_item = pool.get();
61        item.borrow().clone_into(&mut pool_item);
62        ThreadsafePoolItem(pool_item)
63    }
64}
65
66impl<I, BorrowedItem> PooledIterator for ThreadsafePooledIter<I, BorrowedItem>
67where
68    I:                             CursorLendingIterator,
69    BorrowedItem:                  ToOwned,
70    for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
71{
72    type Item = ThreadsafePoolItem<BorrowedItem::Owned>;
73
74    /// Move the iterator one position forwards, and return the entry at that position.
75    /// Returns `None` if the iterator was at the last entry.
76    ///
77    /// May need to wait for a buffer to become available.
78    ///
79    /// # Potential Panics or Deadlocks
80    /// If `self.buffer_pool_size() == 0`, then this method panics.
81    /// This method may also cause a deadlock if no buffers are currently available, and the
82    /// current thread needs to make progress in order to release a buffer.
83    fn next(&mut self) -> Option<Self::Item> {
84        self.iter.next().map(|item| Self::fill_buffer(&self.pool, item))
85    }
86
87    fn try_next(&mut self) -> Result<Option<Self::Item>, OutOfBuffers> {
88        let mut buffer = self.pool.try_get()
89            .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
90
91        if let Some(item) = self.iter.next() {
92            item.borrow().clone_into(&mut buffer);
93            Ok(Some(ThreadsafePoolItem(buffer)))
94        } else {
95            Ok(None)
96        }
97    }
98
99    #[inline]
100    fn buffer_pool_size(&self) -> usize {
101        self.pool.pool_size()
102    }
103
104    fn available_buffers(&self) -> usize {
105        self.pool.available_resources()
106    }
107}
108
109impl<I, BorrowedItem> CursorPooledIterator for ThreadsafePooledIter<I, BorrowedItem>
110where
111    I:                             CursorLendingIterator,
112    BorrowedItem:                  ToOwned,
113    for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
114{
115    #[inline]
116    fn valid(&self) -> bool {
117        self.iter.valid()
118    }
119
120    /// Get the current value the iterator is at, if the iterator is [valid].
121    ///
122    /// May need to wait for a buffer to become available.
123    ///
124    /// # Potential Panics or Deadlocks
125    /// If `self.buffer_pool_size() == 0`, then this method panics.
126    /// This method may also cause a deadlock if no buffers are currently available, and the
127    /// current thread needs to make progress in order to release a buffer.
128    ///
129    /// [valid]: CursorPooledIterator::valid
130    #[inline]
131    fn current(&self) -> Option<Self::Item> {
132        self.iter.current().map(|item| Self::fill_buffer(&self.pool, item))
133    }
134
135    fn try_current(&self) -> Result<Option<Self::Item>, OutOfBuffers> {
136        let mut buffer = self.pool.try_get()
137            .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
138
139        if let Some(item) = self.iter.current() {
140            item.borrow().clone_into(&mut buffer);
141            Ok(Some(ThreadsafePoolItem(buffer)))
142        } else {
143            Ok(None)
144        }
145    }
146
147    /// Move the iterator one position back, and return the entry at that position.
148    /// Returns `None` if the iterator was at the last entry.
149    ///
150    /// May need to wait for a buffer to become available.
151    ///
152    /// # Potential Panics or Deadlocks
153    /// If `self.buffer_pool_size() == 0`, then this method panics.
154    /// This method may also cause a deadlock if no buffers are currently available, and the
155    /// current thread needs to make progress in order to release a buffer.
156    fn prev(&mut self) -> Option<Self::Item> {
157        self.iter.prev().map(|item| Self::fill_buffer(&self.pool, item))
158    }
159
160    fn try_prev(&mut self) -> Result<Option<Self::Item>, OutOfBuffers> {
161        let mut buffer = self.pool.try_get()
162            .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
163
164        if let Some(item) = self.iter.prev() {
165            item.borrow().clone_into(&mut buffer);
166            Ok(Some(ThreadsafePoolItem(buffer)))
167        } else {
168            Ok(None)
169        }
170    }
171}
172
173impl<I, BorrowedItem, Key, Cmp> Seekable<Key, Cmp> for ThreadsafePooledIter<I, BorrowedItem>
174where
175    I:                             CursorLendingIterator + Seekable<Key, Cmp>,
176    BorrowedItem:                  ToOwned,
177    Key:                           ?Sized,
178    Cmp:                           Comparator<Key>,
179    for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
180{
181    #[inline]
182    fn reset(&mut self) {
183        self.iter.reset();
184    }
185
186    fn seek(&mut self, min_bound: &Key) {
187        self.iter.seek(min_bound);
188    }
189
190    fn seek_before(&mut self, strict_upper_bound: &Key) {
191        self.iter.seek_before(strict_upper_bound);
192    }
193
194    #[inline]
195    fn seek_to_first(&mut self) {
196        self.iter.seek_to_first();
197    }
198
199    fn seek_to_last(&mut self) {
200        self.iter.seek_to_last();
201    }
202}
203
204/// The type of an item returned by [`ThreadsafePooledIter`].
205///
206/// The owned item buffer is returned to the [`ThreadsafePooledIter`] when the
207/// `ThreadsafePoolItem` is dropped.
208#[derive(Debug)]
209#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
210pub struct ThreadsafePoolItem<OwnedItem>(
211    PooledResource<SharedBoundedPool<OwnedItem, ResetNothing>, OwnedItem>,
212);
213
214impl<OwnedItem> Deref for ThreadsafePoolItem<OwnedItem> {
215    type Target = OwnedItem;
216
217    #[inline]
218    fn deref(&self) -> &Self::Target {
219        &self.0
220    }
221}
222
223impl<OwnedItem> DerefMut for ThreadsafePoolItem<OwnedItem> {
224    #[inline]
225    fn deref_mut(&mut self) -> &mut Self::Target {
226        &mut self.0
227    }
228}
229
230impl<OwnedItem> Borrow<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
231    #[inline]
232    fn borrow(&self) -> &OwnedItem {
233        self
234    }
235}
236
237impl<OwnedItem> BorrowMut<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
238    #[inline]
239    fn borrow_mut(&mut self) -> &mut OwnedItem {
240        self
241    }
242}
243
244impl<OwnedItem> AsRef<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
245    #[inline]
246    fn as_ref(&self) -> &OwnedItem {
247        self
248    }
249}
250
251impl<OwnedItem> AsMut<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
252    #[inline]
253    fn as_mut(&mut self) -> &mut OwnedItem {
254        self
255    }
256}
257
258
259#[cfg(test)]
260mod tests {
261    use crate::test_iter::TestIter;
262    use super::*;
263
264
265    #[test]
266    fn threadsafe_pooled_test_iter() {
267        let data: &[u8] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].as_slice();
268        let mut iter = ThreadsafePooledIter::<_, u8>::new(TestIter::new(data).unwrap(), 2);
269
270        // Hold one buffer the entire time
271        let first = iter.next().unwrap();
272        assert_eq!(*first, 0);
273
274        for i in 1..=9 {
275            assert!(iter.valid());
276            let next = iter.next().unwrap();
277            // Both of the two buffers are in use
278            assert!(iter.try_next().is_err());
279            assert_eq!(*next, i);
280        }
281        drop(first);
282
283        assert!(iter.next().is_none());
284        let _unused = iter.current();
285
286        for i in (0..=9).rev() {
287            let current = iter.current();
288            let prev = iter.prev().unwrap();
289
290            if current.is_some() {
291                // Both of the two buffers are in use
292                assert!(iter.try_next().is_err());
293            }
294            assert!(iter.valid());
295
296            // This drops `current`
297            assert!(!current.is_some_and(|curr| *curr == *prev));
298
299            let new_current = iter.current().unwrap();
300
301            assert_eq!(*prev, i);
302            assert_eq!(*new_current, i);
303        }
304    }
305
306    #[test]
307    fn seek_test() {
308        let data: &[u8] = [0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 6, 7, 8, 9, 99].as_slice();
309        let mut iter = ThreadsafePooledIter::<_, u8>::new(TestIter::new(data).unwrap(), 1);
310
311        iter.seek_to_first();
312        assert_eq!(*iter.current().unwrap(), 0);
313
314        iter.seek(&0);
315        assert_eq!(*iter.current().unwrap(), 0);
316
317        iter.seek(&1);
318        assert_eq!(*iter.current().unwrap(), 1);
319
320        iter.seek(&9);
321        assert_eq!(*iter.current().unwrap(), 9);
322
323        iter.seek(&8);
324        assert_eq!(*iter.current().unwrap(), 8);
325
326        iter.seek(&10);
327        assert_eq!(*iter.current().unwrap(), 99);
328
329        iter.seek_before(&92);
330        assert_eq!(*iter.current().unwrap(), 9);
331
332        iter.seek_before(&99);
333        assert_eq!(*iter.current().unwrap(), 9);
334
335        iter.seek_before(&100);
336        assert_eq!(*iter.current().unwrap(), 99);
337
338        iter.seek_before(&1);
339        assert_eq!(*iter.current().unwrap(), 0);
340
341        iter.seek_before(&0);
342        assert!(!iter.valid());
343
344        iter.seek(&100);
345        assert!(!iter.valid());
346
347        iter.seek(&99);
348        assert_eq!(*iter.current().unwrap(), 99);
349
350        iter.seek_to_last();
351        assert_eq!(*iter.current().unwrap(), 99);
352
353        iter.seek_before(&4);
354        assert_eq!(*iter.current().unwrap(), 3);
355    }
356}