1use super::{indexes::Size, page::Page};
2use crate::indexes::max_rows_in_page;
3use crate::{page::PageHeader, MemoryUsage};
4use core::sync::atomic::{AtomicUsize, Ordering};
5use crossbeam_queue::ArrayQueue;
6use spacetimedb_sats::bsatn::{self, DecodeError};
7use spacetimedb_sats::de::{
8 DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess,
9};
10use std::sync::Arc;
11
12#[derive(Clone)]
14pub struct PagePool {
15 inner: Arc<PagePoolInner>,
16}
17
18impl MemoryUsage for PagePool {
19 fn heap_usage(&self) -> usize {
20 let Self { inner } = self;
21 inner.heap_usage()
22 }
23}
24
25impl PagePool {
26 pub fn new_for_test() -> Self {
27 Self::new(Some(100 * size_of::<Page>()))
28 }
29
30 pub fn new(max_size: Option<usize>) -> Self {
34 const PAGE_SIZE: usize = size_of::<Page>();
35 const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE; let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
46 let inner = Arc::new(PagePoolInner::new(queue_size));
47 Self { inner }
48 }
49
50 pub fn put(&self, page: Box<Page>) {
52 self.inner.put(page);
53 }
54
55 pub fn put_many(&self, pages: impl Iterator<Item = Box<Page>>) {
57 for page in pages {
58 self.put(page);
59 }
60 }
61
62 pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
66 self.inner.take_with_fixed_row_size(fixed_row_size)
67 }
68
69 fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
73 self.inner.take_with_max_row_count(max_rows_in_page)
74 }
75
76 pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> {
78 self.deserialize(bsatn::Deserializer::new(&mut &*buf))
79 }
80
81 pub fn dropped_pages_count(&self) -> usize {
83 self.inner.dropped_pages_count.load(Ordering::Relaxed)
84 }
85
86 pub fn new_pages_allocated_count(&self) -> usize {
88 self.inner.new_pages_allocated_count.load(Ordering::Relaxed)
89 }
90
91 pub fn pages_reused_count(&self) -> usize {
93 self.inner.pages_reused_count.load(Ordering::Relaxed)
94 }
95
96 pub fn pages_returned_count(&self) -> usize {
98 self.inner.pages_returned_count.load(Ordering::Relaxed)
99 }
100}
101
102impl<'de> DeserializeSeed<'de> for &PagePool {
103 type Output = Box<Page>;
104
105 fn deserialize<D: Deserializer<'de>>(self, de: D) -> Result<Self::Output, D::Error> {
106 de.deserialize_product(self)
107 }
108}
109
110impl<'de> ProductVisitor<'de> for &PagePool {
111 type Output = Box<Page>;
112
113 fn product_name(&self) -> Option<&str> {
114 Some("Page")
115 }
116
117 fn product_len(&self) -> usize {
118 2
119 }
120
121 fn visit_seq_product<A: SeqProductAccess<'de>>(self, mut prod: A) -> Result<Self::Output, A::Error> {
122 let header = prod
123 .next_element::<PageHeader>()?
124 .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
125 let row_data = prod
126 .next_element()?
127 .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
128
129 let mut page = self.take_with_max_row_count(header.max_rows_in_page());
131 unsafe { page.set_raw(header, row_data) };
133
134 Ok(page)
135 }
136
137 fn visit_named_product<A: NamedProductAccess<'de>>(self, _: A) -> Result<Self::Output, A::Error> {
138 unreachable!()
139 }
140}
141
142struct PagePoolInner {
144 pages: ArrayQueue<Box<Page>>,
145 dropped_pages_count: AtomicUsize,
146 new_pages_allocated_count: AtomicUsize,
147 pages_reused_count: AtomicUsize,
148 pages_returned_count: AtomicUsize,
149}
150
151impl MemoryUsage for PagePoolInner {
152 fn heap_usage(&self) -> usize {
153 let Self {
154 pages,
155 dropped_pages_count,
156 new_pages_allocated_count,
157 pages_reused_count,
158 pages_returned_count,
159 } = self;
160 dropped_pages_count.heap_usage() +
161 new_pages_allocated_count.heap_usage() +
162 pages_reused_count.heap_usage() +
163 pages_returned_count.heap_usage() +
164 pages.capacity() * size_of::<(AtomicUsize, Box<Page>)>() +
166 pages.len() * size_of::<Page>()
168 }
169}
170
/// Adds one to `atomic` using relaxed ordering.
///
/// The value before the increment is discarded.
#[inline]
fn inc(atomic: &AtomicUsize) {
    let _previous = atomic.fetch_add(1, Ordering::Relaxed);
}
175
176impl PagePoolInner {
177 fn new(cap: usize) -> Self {
179 let pages = ArrayQueue::new(cap);
180 Self {
181 pages,
182 dropped_pages_count: <_>::default(),
183 new_pages_allocated_count: <_>::default(),
184 pages_reused_count: <_>::default(),
185 pages_returned_count: <_>::default(),
186 }
187 }
188
189 fn put(&self, page: Box<Page>) {
191 if self.pages.push(page).is_ok() {
193 inc(&self.pages_returned_count);
194 } else {
195 inc(&self.dropped_pages_count);
196 }
197 }
198
199 fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
203 self.pages
204 .pop()
205 .map(|mut page| {
206 inc(&self.pages_reused_count);
207 page.reset_for(max_rows_in_page);
208 page
209 })
210 .unwrap_or_else(|| {
211 inc(&self.new_pages_allocated_count);
212 Page::new_with_max_row_count(max_rows_in_page)
213 })
214 }
215
216 fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
220 self.take_with_max_row_count(max_rows_in_page(fixed_row_size))
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use core::{iter, ptr::addr_eq};

    /// Returns the address of the page allocation itself.
    fn page_addr(page: &Page) -> *const Page {
        page
    }

    /// Returns the address of the storage behind the page's present-rows data,
    /// as exposed by the header's test-only accessor.
    fn present_rows_ptr(page: &Page) -> *const () {
        let header = page.page_header_for_test();
        header.present_rows_storage_ptr_for_test()
    }

    /// Asserts the four pool counters, in the order: dropped, newly allocated, reused, returned.
    fn assert_metrics(pool: &PagePool, dropped: usize, new: usize, reused: usize, returned: usize) {
        assert_eq!(pool.dropped_pages_count(), dropped);
        assert_eq!(pool.new_pages_allocated_count(), new);
        assert_eq!(pool.pages_reused_count(), reused);
        assert_eq!(pool.pages_returned_count(), returned);
    }

    #[test]
    fn page_pool_returns_same_page() {
        let pool = PagePool::new_for_test();
        assert_metrics(&pool, 0, 0, 0, 0);

        // The pool starts out empty, so the first take allocates. Then hand the page back.
        let first = pool.take_with_max_row_count(10);
        assert_metrics(&pool, 0, 1, 0, 0);
        let first_addr = page_addr(&first);
        let first_rows_addr = present_rows_ptr(&first);
        pool.put(first);
        assert_metrics(&pool, 0, 1, 0, 1);

        // Taking again must hand out the very same page allocation.
        let second = pool.take_with_max_row_count(64);
        assert_metrics(&pool, 0, 1, 1, 1);
        let second_addr = page_addr(&second);
        let second_rows_addr = present_rows_ptr(&second);
        assert!(addr_eq(first_addr, second_addr));
        // Going from 10 to 64 rows is expected to keep the present-rows storage in place.
        assert!(addr_eq(first_rows_addr, second_rows_addr));
        pool.put(second);
        assert_metrics(&pool, 0, 1, 1, 2);

        // With 65 rows the page is still reused...
        let third = pool.take_with_max_row_count(64 + 1);
        assert_metrics(&pool, 0, 1, 2, 2);
        let third_addr = page_addr(&third);
        let third_rows_addr = present_rows_ptr(&third);
        assert!(addr_eq(first_addr, third_addr));
        // ...but its present-rows storage is expected to have moved.
        assert!(!addr_eq(first_rows_addr, third_rows_addr));

        // A page created outside the pool can be put in as well.
        let outsider = Page::new_with_max_row_count(10);
        let outsider_addr = page_addr(&outsider);
        pool.put(outsider);
        pool.put(third);
        assert_metrics(&pool, 0, 1, 2, 4);
        // The page put in first is the one that comes out first.
        let fifth = pool.take_with_max_row_count(10);
        assert_metrics(&pool, 0, 1, 3, 4);
        let fifth_addr = page_addr(&fifth);
        assert!(!addr_eq(fifth_addr, first_addr));
        assert!(addr_eq(fifth_addr, outsider_addr));
    }

    #[test]
    fn page_pool_drops_past_max_size() {
        const N: usize = 3;
        let pool = PagePool::new(Some(size_of::<Page>() * N));

        // Take one page more than the pool has room for; all of them are fresh allocations.
        let pages: Vec<Box<Page>> = iter::repeat_with(|| pool.take_with_max_row_count(42))
            .take(N + 1)
            .collect();
        assert_metrics(&pool, 0, N + 1, 0, 0);

        // Returning them all fills the pool with `N` pages and drops the one left over.
        pool.put_many(pages.into_iter());
        assert_metrics(&pool, 1, N + 1, 0, N);
        assert_eq!(pool.inner.pages.len(), N);
    }
}