1use crate::{DEFAULT_BATCH_SIZE, LanceError, Result, SortKey};
2use bytes::Bytes;
3use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
/// One batch of ingested data plus the metadata that travels with it.
///
/// `#[repr(C, align(8))]` keeps the fields in declaration order and gives the
/// struct at least 8-byte alignment.
#[repr(C, align(8))]
pub struct IngestionBatch {
    /// Identifier of this batch, as supplied by the caller of [`IngestionBatch::new`].
    pub batch_id: u64,
    /// Key attached to the batch; presumably used to order or route batches — TODO confirm.
    pub sort_key: SortKey,
    /// Batch contents as an opaque byte buffer; its encoding is not defined in this file.
    pub payload: Bytes,
    /// Number of records the producer reports for `payload` (stored as given, not validated here).
    pub record_count: u32,
    /// Checksum supplied by the producer; presumably a CRC32 of `payload` — it is
    /// neither computed nor verified in this file.
    pub crc: u32,
}
14
15impl IngestionBatch {
16 #[inline]
17 #[must_use]
18 pub fn new(
19 batch_id: u64,
20 sort_key: SortKey,
21 payload: Bytes,
22 record_count: u32,
23 crc: u32,
24 ) -> Self {
25 Self {
26 batch_id,
27 sort_key,
28 payload,
29 record_count,
30 crc,
31 }
32 }
33
34 #[inline]
35 #[must_use]
36 pub fn payload_len(&self) -> usize {
37 self.payload.len()
38 }
39}
40
/// A fixed-capacity, reusable byte buffer that can be loaned out by [`BatchPool`].
///
/// NOTE(review): `align(4096)` applies to this struct value, not to the heap
/// buffer behind `data`. Because a type's size is rounded up to its alignment,
/// `size_of::<LoanableBatch>()` is 4096 bytes, so moving a batch by value (as
/// `BatchPool::acquire`/`release` do) may copy a 4 KiB value. Meanwhile the
/// pointer from `as_ptr` carries no alignment guarantee beyond that of `u8`.
/// If a page-aligned buffer is actually required (e.g. for direct I/O), the
/// buffer itself must be allocated with that alignment — TODO confirm intent.
#[repr(C, align(4096))]
pub struct LoanableBatch {
    /// Identifier fixed at construction; `BatchPool::new` uses the batch's index in the pool.
    pub batch_id: u64,
    /// Backing storage, allocated once; its length always equals `capacity`.
    data: Box<[u8]>,
    /// Number of bytes at the front of `data` that hold valid content.
    len: usize,
    /// Total size of `data` in bytes.
    capacity: usize,
}
48
49impl LoanableBatch {
50 #[must_use]
52 pub fn new(batch_id: u64, capacity: usize) -> Self {
53 let data = vec![0u8; capacity].into_boxed_slice();
54 Self {
55 batch_id,
56 data,
57 len: 0,
58 capacity,
59 }
60 }
61
62 #[inline]
63 pub fn reset(&mut self) {
64 self.len = 0;
65 }
66
67 #[inline]
68 #[must_use]
69 pub fn as_slice(&self) -> &[u8] {
70 &self.data[..self.len]
71 }
72
73 #[inline]
74 pub fn as_mut_slice(&mut self) -> &mut [u8] {
75 &mut self.data[..self.len]
76 }
77
78 #[inline]
79 #[must_use]
80 pub fn remaining_capacity(&self) -> usize {
81 self.capacity - self.len
82 }
83
84 #[inline]
85 #[must_use]
86 pub fn len(&self) -> usize {
87 self.len
88 }
89
90 #[inline]
91 #[must_use]
92 pub fn is_empty(&self) -> bool {
93 self.len == 0
94 }
95
96 #[inline]
97 #[must_use]
98 pub fn capacity(&self) -> usize {
99 self.capacity
100 }
101
102 #[inline]
103 #[must_use]
104 pub fn as_ptr(&self) -> *const u8 {
105 self.data.as_ptr()
106 }
107
108 #[inline]
109 pub fn as_mut_ptr(&mut self) -> *mut u8 {
110 self.data.as_mut_ptr()
111 }
112
113 pub fn append(&mut self, data: &[u8]) -> Result<()> {
118 if data.len() > self.remaining_capacity() {
119 return Err(LanceError::BufferPoolExhausted);
120 }
121 self.data[self.len..self.len + data.len()].copy_from_slice(data);
122 self.len += data.len();
123 Ok(())
124 }
125
126 #[inline]
127 pub fn set_len(&mut self, len: usize) {
128 debug_assert!(len <= self.capacity);
129 self.len = len.min(self.capacity);
130 }
131}
132
/// A fixed-size pool of reusable [`LoanableBatch`] buffers.
///
/// The free list is held behind an `Arc`, and cloning a `BatchPool` clones
/// that `Arc`, so all clones acquire from and release into the same batches.
pub struct BatchPool {
    /// Batches not currently on loan; a bounded MPMC queue whose capacity is the pool size.
    free_list: Arc<ArrayQueue<LoanableBatch>>,
    /// Capacity in bytes of every batch this pool created.
    batch_capacity: usize,
}
137
138impl BatchPool {
139 pub fn new(pool_size: usize, batch_capacity: usize) -> Result<Self> {
144 let free_list = Arc::new(ArrayQueue::new(pool_size));
145
146 for i in 0..pool_size {
147 let batch = LoanableBatch::new(i as u64, batch_capacity);
148 if free_list.push(batch).is_err() {
149 return Err(LanceError::BufferPoolExhausted);
150 }
151 }
152
153 Ok(Self {
154 free_list,
155 batch_capacity,
156 })
157 }
158
159 pub fn with_default_size(pool_size: usize) -> Result<Self> {
164 Self::new(pool_size, DEFAULT_BATCH_SIZE)
165 }
166
167 #[inline]
172 pub fn acquire(&self) -> Result<LoanableBatch> {
173 self.free_list.pop().ok_or(LanceError::BufferPoolExhausted)
174 }
175
176 #[inline]
177 pub fn release(&self, mut batch: LoanableBatch) {
178 batch.reset();
179 let _ = self.free_list.push(batch);
180 }
181
182 #[inline]
183 #[must_use]
184 pub fn available(&self) -> usize {
185 self.free_list.len()
186 }
187
188 #[inline]
189 #[must_use]
190 pub fn batch_capacity(&self) -> usize {
191 self.batch_capacity
192 }
193}
194
195impl Clone for BatchPool {
196 fn clone(&self) -> Self {
197 Self {
198 free_list: Arc::clone(&self.free_list),
199 batch_capacity: self.batch_capacity,
200 }
201 }
202}
203
#[cfg(test)]
// Tests assert on values known to be present; a failed unwrap is a test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_pool_acquire_release() {
        let pool = BatchPool::new(4, 1024).unwrap();
        assert_eq!(pool.available(), 4);

        let batch1 = pool.acquire().unwrap();
        assert_eq!(pool.available(), 3);

        let batch2 = pool.acquire().unwrap();
        assert_eq!(pool.available(), 2);

        pool.release(batch1);
        assert_eq!(pool.available(), 3);

        pool.release(batch2);
        assert_eq!(pool.available(), 4);
    }

    #[test]
    fn test_batch_pool_exhaustion() {
        let pool = BatchPool::new(2, 1024).unwrap();

        let _b1 = pool.acquire().unwrap();
        let _b2 = pool.acquire().unwrap();

        let result = pool.acquire();
        assert!(result.is_err());
    }

    #[test]
    fn test_loanable_batch_append() {
        let mut batch = LoanableBatch::new(0, 1024);
        assert!(batch.is_empty());

        batch.append(b"hello").unwrap();
        assert_eq!(batch.len(), 5);
        assert_eq!(batch.as_slice(), b"hello");

        batch.append(b" world").unwrap();
        assert_eq!(batch.len(), 11);
        assert_eq!(batch.as_slice(), b"hello world");
    }

    #[test]
    fn test_loanable_batch_reset() {
        let mut batch = LoanableBatch::new(0, 1024);
        batch.append(b"test data").unwrap();
        assert!(!batch.is_empty());

        batch.reset();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn test_loanable_batch_append_overflow_is_rejected() {
        let mut batch = LoanableBatch::new(0, 4);
        batch.append(b"abcd").unwrap();
        assert_eq!(batch.remaining_capacity(), 0);

        // A full batch rejects more bytes and keeps its existing contents.
        assert!(batch.append(b"e").is_err());
        assert_eq!(batch.as_slice(), b"abcd");

        // An empty append always fits.
        batch.append(b"").unwrap();
        assert_eq!(batch.len(), 4);
    }

    #[test]
    fn test_release_resets_batch() {
        // A single-batch pool guarantees the next acquire returns the released batch.
        let pool = BatchPool::new(1, 64).unwrap();
        assert_eq!(pool.batch_capacity(), 64);

        let mut batch = pool.acquire().unwrap();
        batch.append(b"payload").unwrap();
        pool.release(batch);

        let batch = pool.acquire().unwrap();
        assert!(batch.is_empty());
        assert_eq!(batch.capacity(), 64);
    }

    #[test]
    fn test_cloned_pool_shares_free_list() {
        let pool = BatchPool::new(2, 64).unwrap();
        let other = pool.clone();

        let batch = other.acquire().unwrap();
        assert_eq!(pool.available(), 1);

        pool.release(batch);
        assert_eq!(other.available(), 2);
    }

    #[test]
    fn test_release_into_full_pool_drops_batch() {
        let pool = BatchPool::new(1, 64).unwrap();
        pool.release(LoanableBatch::new(99, 64));
        assert_eq!(pool.available(), 1);
    }
}