1use core::{
2 borrow::{Borrow, BorrowMut},
3 ops::{Deref, DerefMut},
4};
5use alloc::borrow::ToOwned;
6
7use anchored_pool::{PooledResource, ResetNothing, ResourcePoolEmpty, SharedBoundedPool};
8
9use crate::{comparator::Comparator, lending_iterator_support::LentItem, seekable::Seekable};
10use crate::{
11 pooled::{OutOfBuffers, PooledIterator},
12 cursor::{CursorLendingIterator, CursorPooledIterator},
13};
14
15
16#[derive(Debug)]
24#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
25pub struct ThreadsafePooledIter<I, BorrowedItem: ToOwned> {
26 iter: I,
27 pool: SharedBoundedPool<BorrowedItem::Owned, ResetNothing>,
28}
29
30impl<I, BorrowedItem> ThreadsafePooledIter<I, BorrowedItem>
31where
32 BorrowedItem: ToOwned,
33 BorrowedItem::Owned: Default,
34{
35 #[must_use]
37 pub fn new(iter: I, num_buffers: usize) -> Self {
38 let pool = SharedBoundedPool::new_default_without_reset(num_buffers);
39
40 Self { iter, pool }
41 }
42}
43
44impl<I, BorrowedItem> ThreadsafePooledIter<I, BorrowedItem>
45where
46 I: CursorLendingIterator,
47 BorrowedItem: ToOwned,
48 for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
49{
50 #[expect(clippy::needless_pass_by_value, reason = "lent item usually consists of references")]
55 #[inline]
56 fn fill_buffer(
57 pool: &SharedBoundedPool<BorrowedItem::Owned, ResetNothing>,
58 item: LentItem<'_, I>,
59 ) -> ThreadsafePoolItem<BorrowedItem::Owned> {
60 let mut pool_item = pool.get();
61 item.borrow().clone_into(&mut pool_item);
62 ThreadsafePoolItem(pool_item)
63 }
64}
65
66impl<I, BorrowedItem> PooledIterator for ThreadsafePooledIter<I, BorrowedItem>
67where
68 I: CursorLendingIterator,
69 BorrowedItem: ToOwned,
70 for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
71{
72 type Item = ThreadsafePoolItem<BorrowedItem::Owned>;
73
74 fn next(&mut self) -> Option<Self::Item> {
84 self.iter.next().map(|item| Self::fill_buffer(&self.pool, item))
85 }
86
87 fn try_next(&mut self) -> Result<Option<Self::Item>, OutOfBuffers> {
88 let mut buffer = self.pool.try_get()
89 .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
90
91 if let Some(item) = self.iter.next() {
92 item.borrow().clone_into(&mut buffer);
93 Ok(Some(ThreadsafePoolItem(buffer)))
94 } else {
95 Ok(None)
96 }
97 }
98
99 #[inline]
100 fn buffer_pool_size(&self) -> usize {
101 self.pool.pool_size()
102 }
103
104 fn available_buffers(&self) -> usize {
105 self.pool.available_resources()
106 }
107}
108
109impl<I, BorrowedItem> CursorPooledIterator for ThreadsafePooledIter<I, BorrowedItem>
110where
111 I: CursorLendingIterator,
112 BorrowedItem: ToOwned,
113 for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
114{
115 #[inline]
116 fn valid(&self) -> bool {
117 self.iter.valid()
118 }
119
120 #[inline]
131 fn current(&self) -> Option<Self::Item> {
132 self.iter.current().map(|item| Self::fill_buffer(&self.pool, item))
133 }
134
135 fn try_current(&self) -> Result<Option<Self::Item>, OutOfBuffers> {
136 let mut buffer = self.pool.try_get()
137 .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
138
139 if let Some(item) = self.iter.current() {
140 item.borrow().clone_into(&mut buffer);
141 Ok(Some(ThreadsafePoolItem(buffer)))
142 } else {
143 Ok(None)
144 }
145 }
146
147 fn prev(&mut self) -> Option<Self::Item> {
157 self.iter.prev().map(|item| Self::fill_buffer(&self.pool, item))
158 }
159
160 fn try_prev(&mut self) -> Result<Option<Self::Item>, OutOfBuffers> {
161 let mut buffer = self.pool.try_get()
162 .map_err(|ResourcePoolEmpty| OutOfBuffers)?;
163
164 if let Some(item) = self.iter.prev() {
165 item.borrow().clone_into(&mut buffer);
166 Ok(Some(ThreadsafePoolItem(buffer)))
167 } else {
168 Ok(None)
169 }
170 }
171}
172
173impl<I, BorrowedItem, Key, Cmp> Seekable<Key, Cmp> for ThreadsafePooledIter<I, BorrowedItem>
174where
175 I: CursorLendingIterator + Seekable<Key, Cmp>,
176 BorrowedItem: ToOwned,
177 Key: ?Sized,
178 Cmp: Comparator<Key>,
179 for<'lend> LentItem<'lend, I>: Borrow<BorrowedItem>,
180{
181 #[inline]
182 fn reset(&mut self) {
183 self.iter.reset();
184 }
185
186 fn seek(&mut self, min_bound: &Key) {
187 self.iter.seek(min_bound);
188 }
189
190 fn seek_before(&mut self, strict_upper_bound: &Key) {
191 self.iter.seek_before(strict_upper_bound);
192 }
193
194 #[inline]
195 fn seek_to_first(&mut self) {
196 self.iter.seek_to_first();
197 }
198
199 fn seek_to_last(&mut self) {
200 self.iter.seek_to_last();
201 }
202}
203
/// An owned item handed out by [`ThreadsafePooledIter`].
///
/// This is a thin wrapper around a [`PooledResource`] obtained from a
/// [`SharedBoundedPool`]; the trait impls in this file (`Deref`, `DerefMut`, `Borrow`,
/// `BorrowMut`, `AsRef`, `AsMut`) all expose the contained `OwnedItem`.
///
/// NOTE(review): the buffer is presumably returned to the pool when this value is dropped
/// (the tests rely on dropping items to free buffers); the mechanism lives in
/// `anchored_pool`, so confirm there.
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub struct ThreadsafePoolItem<OwnedItem>(
    PooledResource<SharedBoundedPool<OwnedItem, ResetNothing>, OwnedItem>,
);
213
214impl<OwnedItem> Deref for ThreadsafePoolItem<OwnedItem> {
215 type Target = OwnedItem;
216
217 #[inline]
218 fn deref(&self) -> &Self::Target {
219 &self.0
220 }
221}
222
223impl<OwnedItem> DerefMut for ThreadsafePoolItem<OwnedItem> {
224 #[inline]
225 fn deref_mut(&mut self) -> &mut Self::Target {
226 &mut self.0
227 }
228}
229
230impl<OwnedItem> Borrow<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
231 #[inline]
232 fn borrow(&self) -> &OwnedItem {
233 self
234 }
235}
236
237impl<OwnedItem> BorrowMut<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
238 #[inline]
239 fn borrow_mut(&mut self) -> &mut OwnedItem {
240 self
241 }
242}
243
244impl<OwnedItem> AsRef<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
245 #[inline]
246 fn as_ref(&self) -> &OwnedItem {
247 self
248 }
249}
250
251impl<OwnedItem> AsMut<OwnedItem> for ThreadsafePoolItem<OwnedItem> {
252 #[inline]
253 fn as_mut(&mut self) -> &mut OwnedItem {
254 self
255 }
256}
257
258
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_iter::TestIter;

    /// Walks the iterator forwards and then backwards with a two-buffer pool, checking the
    /// yielded values and that `try_next` fails whenever both buffers are held.
    #[test]
    fn threadsafe_pooled_test_iter() {
        let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        let inner = TestIter::new(data).unwrap();
        let mut iter = ThreadsafePooledIter::<_, u8>::new(inner, 2);

        // `first` keeps one of the two buffers for the whole forward pass.
        let first = iter.next().unwrap();
        assert_eq!(*first, 0);

        for expected in 1..=9 {
            assert!(iter.valid());
            let item = iter.next().unwrap();
            // `first` and `item` together hold both buffers.
            assert!(iter.try_next().is_err());
            assert_eq!(*item, expected);
        }
        drop(first);

        assert!(iter.next().is_none());
        let _unused = iter.current();

        for expected in (0..=9).rev() {
            let before = iter.current();
            let stepped_back = iter.prev().unwrap();

            if before.is_some() {
                // `before` and `stepped_back` together hold both buffers.
                assert!(iter.try_next().is_err());
            }
            assert!(iter.valid());

            // Consumes `before`, so its buffer (if any) is released by this check.
            assert!(!before.is_some_and(|curr| *curr == *stepped_back));

            let after = iter.current().unwrap();

            assert_eq!(*stepped_back, expected);
            assert_eq!(*after, expected);
        }
    }

    /// Exercises every `Seekable` method through the adapter with a single-buffer pool.
    #[test]
    fn seek_test() {
        let data: &[u8] = &[0, 1, 2, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 6, 7, 8, 9, 99];
        let inner = TestIter::new(data).unwrap();
        let mut iter = ThreadsafePooledIter::<_, u8>::new(inner, 1);

        // Each `current()` result below is a temporary, so the single buffer is free again
        // by the next statement.
        iter.seek_to_first();
        assert_eq!(*iter.current().unwrap(), 0);

        // (`seek` argument, value expected at the cursor afterwards)
        for (min_bound, expected) in [(0, 0), (1, 1), (9, 9), (8, 8), (10, 99)] {
            iter.seek(&min_bound);
            assert_eq!(*iter.current().unwrap(), expected);
        }

        // (`seek_before` argument, value expected at the cursor afterwards)
        for (strict_upper_bound, expected) in [(92, 9), (99, 9), (100, 99), (1, 0)] {
            iter.seek_before(&strict_upper_bound);
            assert_eq!(*iter.current().unwrap(), expected);
        }

        // Nothing is strictly below the smallest element.
        iter.seek_before(&0);
        assert!(!iter.valid());

        // Nothing is at or above a bound larger than the largest element.
        iter.seek(&100);
        assert!(!iter.valid());

        iter.seek(&99);
        assert_eq!(*iter.current().unwrap(), 99);

        iter.seek_to_last();
        assert_eq!(*iter.current().unwrap(), 99);

        // With a run of duplicate 4s, `seek_before(&4)` lands on the 3 preceding the run.
        iter.seek_before(&4);
        assert_eq!(*iter.current().unwrap(), 3);
    }
}