use std::{
    fs::{File, OpenOptions},
    io, mem,
    ops::{Deref, DerefMut},
    os::fd::AsRawFd,
    path::Path,
    ptr, slice,
    sync::atomic::Ordering,
};

use crate::{
    stats::{COUNT_ACTIVE_SEGMENT, COUNT_FTRUNCATE_FAILED, COUNT_MMAP_FAILED, COUNT_MUNMAP_FAILED},
    utils::{check_zst, page_size},
};

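/// A fixed-capacity, file-backed buffer of `T` created through `mmap(2)`.
///
/// The mapping is shared and read-write, so items written into the segment are
/// backed by the file on disk rather than by the heap. `len` counts the
/// initialized items and `capacity` is the number of items the mapping can
/// hold.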
#[derive(Debug)]
pub struct Segment<T> {
    pub(crate) addr: *mut T,
    len: usize,
    capacity: usize,
}

impl<T> Segment<T> {
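    /// Creates an empty segment that owns no mapping.
    ///
    /// The address is null and both the length and the capacity are zero, so
    /// no syscall is made and nothing is unmapped on drop.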
    #[inline(always)]
    pub const fn null() -> Self {
        check_zst::<T>();
        Self {
            addr: std::ptr::null_mut(),
            len: 0,
            capacity: 0,
        }
    }

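    /// Opens (or creates) the file at `path`, grows it with `ftruncate` so it
    /// can hold `capacity` items of `T`, and maps it read-write with `mmap`.
    ///
    /// A `capacity` of zero returns [`Segment::null`] without touching the
    /// filesystem.
    ///
    /// A minimal usage sketch (assuming the crate's `Segment` is in scope and
    /// the path is writable):
    ///
    /// ```ignore
    /// let mut segment = Segment::<u64>::open_rw("/tmp/segment.bin", 8).unwrap();
    /// assert_eq!(segment.capacity(), 8);
    /// segment.push_within_capacity(42).unwrap();
    /// assert_eq!(segment.pop(), Some(42));
    /// ```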
    pub fn open_rw<P: AsRef<Path>>(path: P, capacity: usize) -> io::Result<Self> {
        check_zst::<T>();
        if capacity == 0 {
            return Ok(Self::null());
        }

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;

        unsafe { ftruncate::<T>(&file, capacity) }?;

        let addr = unsafe { mmap(&file, capacity) }?;

        Ok(Self {
            addr,
            len: 0,
            capacity,
        })
    }

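    /// Returns the maximum number of items the segment can hold.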
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

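    /// Shortens the segment to `new_len` items, dropping the items at the end.
    ///
    /// Does nothing if `new_len` is greater than the current length.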
    pub fn truncate(&mut self, new_len: usize) {
        if new_len > self.len {
            return;
        }

        unsafe {
            let remaining_len = self.len - new_len;
            let items = ptr::slice_from_raw_parts_mut(self.addr.add(new_len), remaining_len);
            self.set_len(new_len);
            ptr::drop_in_place(items);
        }
    }

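    /// Drops the first `delete_count` items and shifts the remaining items to
    /// the front of the segment.
    ///
    /// If `delete_count` is greater than or equal to the current length, this
    /// is equivalent to [`Segment::clear`].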
    pub fn truncate_first(&mut self, delete_count: usize) {
        let new_len = self.len.saturating_sub(delete_count);
        if new_len == 0 {
            self.clear()
        } else {
            unsafe {
                let items = slice::from_raw_parts_mut(self.addr, delete_count);
                ptr::drop_in_place(items);
                ptr::copy(self.addr.add(delete_count), self.addr, new_len);
                self.set_len(new_len);
            }
        }
    }

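    /// Drops every item and resets the length to zero. The capacity and the
    /// mapping are left untouched.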
    #[inline]
    pub fn clear(&mut self) {
        // Nothing to drop on an empty (possibly null) segment, and forming a
        // slice from a null pointer would be undefined behavior.
        if self.len == 0 {
            return;
        }

        unsafe {
            let items = slice::from_raw_parts_mut(self.addr, self.len);
            self.set_len(0);
            ptr::drop_in_place(items);
        }
    }

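    /// Forces the length of the segment to `new_len`.
    ///
    /// # Safety
    ///
    /// `new_len` must be less than or equal to [`Segment::capacity`] and the
    /// first `new_len` items must be initialized.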
    #[allow(clippy::missing_safety_doc)]
    #[inline(always)]
    pub unsafe fn set_len(&mut self, new_len: usize) {
        debug_assert!(new_len <= self.capacity());
        self.len = new_len;
    }

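    /// Returns the number of bytes reserved on disk for this segment.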
    #[inline(always)]
    pub fn disk_size(&self) -> usize {
        self.capacity * mem::size_of::<T>()
    }

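    /// Appends `value` to the segment if there is spare capacity, returning
    /// `Err(value)` when the segment is full.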
    #[inline]
    pub fn push_within_capacity(&mut self, value: T) -> Result<(), T> {
        if self.len == self.capacity {
            return Err(value);
        }

        unsafe {
            let dst = self.addr.add(self.len);
            ptr::write(dst, value);
        }

        self.len += 1;
        Ok(())
    }

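    /// Removes the last item and returns it, or `None` if the segment is
    /// empty.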
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }

        self.len -= 1;
        unsafe {
            let src = self.addr.add(self.len);
            Some(ptr::read(src))
        }
    }

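    /// Moves every item of `other` to the end of this segment.
    ///
    /// `other` is left empty (but still mapped) so its items are not dropped
    /// twice.
    ///
    /// # Panics
    ///
    /// Panics if the combined length exceeds this segment's capacity.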
    pub fn extend_from_segment(&mut self, mut other: Segment<T>) {
        let new_len = other.len + self.len;
        assert!(
            new_len <= self.capacity,
            "New segment is too small: new_len={}, capacity={}",
            new_len,
            self.capacity
        );

        unsafe {
            ptr::copy_nonoverlapping(other.addr, self.addr.add(self.len), other.len);
            self.set_len(new_len);
            other.set_len(0);
        };
    }

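    /// Hints the kernel with `madvise(MADV_WILLNEED)` that the whole
    /// initialized range will be needed soon. Does nothing for empty or null
    /// segments.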
    pub fn advice_prefetch_all_pages(&self) {
        if self.addr.is_null() || self.len == 0 {
            return;
        }

        let madvise_code = unsafe {
            libc::madvise(
                self.addr.cast(),
                self.len * mem::size_of::<T>(),
                libc::MADV_WILLNEED,
            )
        };
        assert_eq!(
            madvise_code,
            0,
            "madvise error: {}",
            io::Error::last_os_error()
        );
    }

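    /// Hints the kernel with `madvise(MADV_WILLNEED)` that the page containing
    /// the item at `index` will be needed soon. Does nothing if `index` is out
    /// of bounds or the segment is null.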
    pub fn advice_prefetch_page_at(&self, index: usize) {
        if self.addr.is_null() || index >= self.len {
            return;
        }

        let page_size = page_size();
        // Mask used to round an address down to the start of its page.
        let page_mask = !(page_size - 1);

        let madvise_code = unsafe {
            libc::madvise(
                (self.addr.add(index) as usize & page_mask) as *mut libc::c_void,
                page_size,
                libc::MADV_WILLNEED,
            )
        };
        assert_eq!(
            madvise_code,
            0,
            "madvise error: {}",
            io::Error::last_os_error()
        );
    }
}

impl<T> Deref for Segment<T> {
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        // A null segment has no mapping; `slice::from_raw_parts` requires a
        // non-null pointer even for an empty slice.
        if self.addr.is_null() {
            return &[];
        }
        unsafe { slice::from_raw_parts(self.addr, self.len) }
    }
}

impl<T> DerefMut for Segment<T> {
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Same null check as `deref`: never build a slice from a null pointer.
        if self.addr.is_null() {
            return &mut [];
        }
        unsafe { slice::from_raw_parts_mut(self.addr, self.len) }
    }
}

impl<T> Drop for Segment<T> {
    fn drop(&mut self) {
        if self.len > 0 {
            unsafe { ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.addr, self.len)) }
        }

        if !self.addr.is_null() {
            let _ = unsafe { munmap(self.addr, self.capacity) };
        }
    }
}

// The segment owns its items and hands out references to them, so it can only
// be sent or shared across threads when `T` itself can be.
unsafe impl<T: Send> Send for Segment<T> {}
unsafe impl<T: Sync> Sync for Segment<T> {}

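/// Resizes `file` with `ftruncate(2)` so it can hold `capacity` items of `T`.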
unsafe fn ftruncate<T>(file: &File, capacity: usize) -> io::Result<()> {
    check_zst::<T>();
    let segment_size = capacity * mem::size_of::<T>();
    let fd = file.as_raw_fd();

    if libc::ftruncate(fd, segment_size as libc::off_t) != 0 {
        COUNT_FTRUNCATE_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}

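/// Maps `capacity * size_of::<T>()` bytes of `file` into memory as a shared,
/// read-write mapping and returns the base address.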
unsafe fn mmap<T>(file: &File, capacity: usize) -> io::Result<*mut T> {
    check_zst::<T>();
    let segment_size = capacity * mem::size_of::<T>();

    let fd = file.as_raw_fd();

    let addr = libc::mmap(
        std::ptr::null_mut(),
        segment_size as libc::size_t,
        libc::PROT_READ | libc::PROT_WRITE,
        libc::MAP_SHARED,
        fd,
        0,
    );

    if addr == libc::MAP_FAILED {
        COUNT_MMAP_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        COUNT_ACTIVE_SEGMENT.fetch_add(1, Ordering::Relaxed);
        Ok(addr.cast())
    }
}

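/// Releases a mapping previously created by [`mmap`]. `addr` must be the base
/// address returned by [`mmap`] and `capacity` the value it was created with.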
unsafe fn munmap<T>(addr: *mut T, capacity: usize) -> io::Result<()> {
    check_zst::<T>();
    debug_assert!(!addr.is_null());
    debug_assert!(capacity > 0);

    let unmap_code = libc::munmap(addr.cast(), capacity * mem::size_of::<T>());

    if unmap_code != 0 {
        COUNT_MUNMAP_FAILED.fetch_add(1, Ordering::Relaxed);
        Err(io::Error::last_os_error())
    } else {
        COUNT_ACTIVE_SEGMENT.fetch_sub(1, Ordering::Relaxed);
        Ok(())
    }
}