Skip to main content

lnc_core/
batch.rs

1use crate::{DEFAULT_BATCH_SIZE, LanceError, Result, SortKey};
2use bytes::Bytes;
3use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
/// An ingestion batch: an opaque byte payload together with its identifier,
/// sort key, record count and checksum.
///
/// `#[repr(C, align(8))]` fixes the declared field order in memory and raises the
/// struct's alignment to at least 8 bytes.
#[repr(C, align(8))]
pub struct IngestionBatch {
    /// Identifier of this batch, as supplied by the creator.
    pub batch_id: u64,
    /// Key associated with this batch (presumably used to order batches — TODO confirm).
    pub sort_key: SortKey,
    /// Raw payload bytes of the batch.
    pub payload: Bytes,
    /// Record count as supplied by the creator; it is not validated against `payload`.
    pub record_count: u32,
    /// Checksum value as supplied by the creator. NOTE(review): the algorithm and what it
    /// covers are not visible here (presumably a CRC32 over `payload` — confirm).
    pub crc: u32,
}
14
15impl IngestionBatch {
16    #[inline]
17    #[must_use]
18    pub fn new(
19        batch_id: u64,
20        sort_key: SortKey,
21        payload: Bytes,
22        record_count: u32,
23        crc: u32,
24    ) -> Self {
25        Self {
26            batch_id,
27            sort_key,
28            payload,
29            record_count,
30            crc,
31        }
32    }
33
34    #[inline]
35    #[must_use]
36    pub fn payload_len(&self) -> usize {
37        self.payload.len()
38    }
39}
40
/// A reusable, fixed-capacity byte buffer that a [`BatchPool`] loans out and takes back.
///
/// The first `len` bytes of `data` are the batch's contents. `capacity` records the
/// size of `data`.
///
/// NOTE(review): `align(4096)` applies to this struct value itself (`batch_id`, the
/// boxed-slice pointer and the two `usize`s), not to the heap buffer behind `data`.
/// That buffer comes from `vec![0u8; capacity]` and only has `u8` alignment.
/// A type's size is always a multiple of its alignment, so every `LoanableBatch`
/// occupies at least 4096 bytes, and moving one (e.g. into or out of the pool's queue)
/// copies that much. If a page-aligned *buffer* was intended (e.g. for direct I/O), the
/// allocation itself would need a 4096-aligned layout — confirm intent.
#[repr(C, align(4096))]
pub struct LoanableBatch {
    /// Identifier chosen by the creator; `BatchPool::new` numbers its batches `0..pool_size`.
    pub batch_id: u64,
    /// Backing storage, allocated once with exactly `capacity` bytes.
    data: Box<[u8]>,
    /// Number of leading bytes of `data` that are considered filled; kept `<= capacity`.
    len: usize,
    /// Size of `data` in bytes.
    capacity: usize,
}
48
49impl LoanableBatch {
50    /// Create a new loanable batch with the given capacity.
51    #[must_use]
52    pub fn new(batch_id: u64, capacity: usize) -> Self {
53        let data = vec![0u8; capacity].into_boxed_slice();
54        Self {
55            batch_id,
56            data,
57            len: 0,
58            capacity,
59        }
60    }
61
62    #[inline]
63    pub fn reset(&mut self) {
64        self.len = 0;
65    }
66
67    #[inline]
68    #[must_use]
69    pub fn as_slice(&self) -> &[u8] {
70        &self.data[..self.len]
71    }
72
73    #[inline]
74    pub fn as_mut_slice(&mut self) -> &mut [u8] {
75        &mut self.data[..self.len]
76    }
77
78    #[inline]
79    #[must_use]
80    pub fn remaining_capacity(&self) -> usize {
81        self.capacity - self.len
82    }
83
84    #[inline]
85    #[must_use]
86    pub fn len(&self) -> usize {
87        self.len
88    }
89
90    #[inline]
91    #[must_use]
92    pub fn is_empty(&self) -> bool {
93        self.len == 0
94    }
95
96    #[inline]
97    #[must_use]
98    pub fn capacity(&self) -> usize {
99        self.capacity
100    }
101
102    #[inline]
103    #[must_use]
104    pub fn as_ptr(&self) -> *const u8 {
105        self.data.as_ptr()
106    }
107
108    #[inline]
109    pub fn as_mut_ptr(&mut self) -> *mut u8 {
110        self.data.as_mut_ptr()
111    }
112
113    /// Append data to the batch.
114    ///
115    /// # Errors
116    /// Returns an error if there is insufficient capacity.
117    pub fn append(&mut self, data: &[u8]) -> Result<()> {
118        if data.len() > self.remaining_capacity() {
119            return Err(LanceError::BufferPoolExhausted);
120        }
121        self.data[self.len..self.len + data.len()].copy_from_slice(data);
122        self.len += data.len();
123        Ok(())
124    }
125
126    #[inline]
127    pub fn set_len(&mut self, len: usize) {
128        debug_assert!(len <= self.capacity);
129        self.len = len.min(self.capacity);
130    }
131}
132
133pub struct BatchPool {
134    free_list: Arc<ArrayQueue<LoanableBatch>>,
135    batch_capacity: usize,
136}
137
138impl BatchPool {
139    /// Create a new batch pool.
140    ///
141    /// # Errors
142    /// Returns an error if pool creation fails.
143    pub fn new(pool_size: usize, batch_capacity: usize) -> Result<Self> {
144        let free_list = Arc::new(ArrayQueue::new(pool_size));
145
146        for i in 0..pool_size {
147            let batch = LoanableBatch::new(i as u64, batch_capacity);
148            if free_list.push(batch).is_err() {
149                return Err(LanceError::BufferPoolExhausted);
150            }
151        }
152
153        Ok(Self {
154            free_list,
155            batch_capacity,
156        })
157    }
158
159    /// Create a new batch pool with default batch size.
160    ///
161    /// # Errors
162    /// Returns an error if pool creation fails.
163    pub fn with_default_size(pool_size: usize) -> Result<Self> {
164        Self::new(pool_size, DEFAULT_BATCH_SIZE)
165    }
166
167    /// Acquire a batch from the pool.
168    ///
169    /// # Errors
170    /// Returns an error if the pool is exhausted.
171    #[inline]
172    pub fn acquire(&self) -> Result<LoanableBatch> {
173        self.free_list.pop().ok_or(LanceError::BufferPoolExhausted)
174    }
175
176    #[inline]
177    pub fn release(&self, mut batch: LoanableBatch) {
178        batch.reset();
179        let _ = self.free_list.push(batch);
180    }
181
182    #[inline]
183    #[must_use]
184    pub fn available(&self) -> usize {
185        self.free_list.len()
186    }
187
188    #[inline]
189    #[must_use]
190    pub fn batch_capacity(&self) -> usize {
191        self.batch_capacity
192    }
193}
194
195impl Clone for BatchPool {
196    fn clone(&self) -> Self {
197        Self {
198            free_list: Arc::clone(&self.free_list),
199            batch_capacity: self.batch_capacity,
200        }
201    }
202}
203
#[cfg(test)]
#[allow(clippy::unwrap_used)] // unwrap is the idiomatic way to fail a test on an unexpected Err
mod tests {
    use super::*;

    #[test]
    fn test_batch_pool_acquire_release() {
        let pool = BatchPool::new(4, 1024).unwrap();
        assert_eq!(pool.available(), 4);

        let batch1 = pool.acquire().unwrap();
        assert_eq!(pool.available(), 3);

        let batch2 = pool.acquire().unwrap();
        assert_eq!(pool.available(), 2);

        pool.release(batch1);
        assert_eq!(pool.available(), 3);

        pool.release(batch2);
        assert_eq!(pool.available(), 4);
    }

    #[test]
    fn test_batch_pool_exhaustion() {
        let pool = BatchPool::new(2, 1024).unwrap();

        let _b1 = pool.acquire().unwrap();
        let _b2 = pool.acquire().unwrap();

        let result = pool.acquire();
        assert!(result.is_err());
    }

    #[test]
    fn test_batch_pool_assigns_sequential_ids() {
        let pool = BatchPool::new(2, 64).unwrap();
        // ArrayQueue is FIFO, so batches come back in creation order.
        assert_eq!(pool.acquire().unwrap().batch_id, 0);
        assert_eq!(pool.acquire().unwrap().batch_id, 1);
    }

    #[test]
    fn test_batch_pool_release_resets_batch() {
        let pool = BatchPool::new(1, 64).unwrap();
        let mut batch = pool.acquire().unwrap();
        batch.append(b"payload").unwrap();
        pool.release(batch);

        let batch = pool.acquire().unwrap();
        assert!(batch.is_empty());
        assert_eq!(batch.capacity(), 64);
    }

    #[test]
    fn test_batch_pool_clone_shares_free_list() {
        let pool = BatchPool::new(2, 64).unwrap();
        let handle = pool.clone();

        let batch = handle.acquire().unwrap();
        assert_eq!(pool.available(), 1);

        pool.release(batch);
        assert_eq!(handle.available(), 2);
        assert_eq!(handle.batch_capacity(), pool.batch_capacity());
    }

    #[test]
    fn test_loanable_batch_append() {
        let mut batch = LoanableBatch::new(0, 1024);
        assert!(batch.is_empty());

        batch.append(b"hello").unwrap();
        assert_eq!(batch.len(), 5);
        assert_eq!(batch.as_slice(), b"hello");

        batch.append(b" world").unwrap();
        assert_eq!(batch.len(), 11);
        assert_eq!(batch.as_slice(), b"hello world");
    }

    #[test]
    fn test_loanable_batch_append_overflow() {
        let mut batch = LoanableBatch::new(0, 4);
        batch.append(b"abc").unwrap();

        assert!(batch.append(b"de").is_err());
        // A rejected append must leave the existing contents untouched.
        assert_eq!(batch.as_slice(), b"abc");
        assert_eq!(batch.remaining_capacity(), 1);

        batch.append(b"d").unwrap();
        assert_eq!(batch.remaining_capacity(), 0);
        assert_eq!(batch.as_slice(), b"abcd");
    }

    #[test]
    fn test_loanable_batch_reset() {
        let mut batch = LoanableBatch::new(0, 1024);
        batch.append(b"test data").unwrap();
        assert!(!batch.is_empty());

        batch.reset();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn test_loanable_batch_set_len() {
        let mut batch = LoanableBatch::new(0, 8);
        // A fresh buffer is zero-filled.
        batch.set_len(3);
        assert_eq!(batch.as_slice(), &[0u8, 0, 0]);

        // `reset` only rewinds the length; the old bytes are still in the buffer.
        batch.reset();
        batch.append(b"abcd").unwrap();
        batch.reset();
        batch.set_len(4);
        assert_eq!(batch.as_slice(), b"abcd");
    }
}