rawdb/region.rs

use std::{fs::File, mem, sync::Arc};

use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};

use crate::{Database, Error, Reader, RegionMetadata, Result, WeakDatabase};

/// Named region within a database providing isolated storage space.
///
/// Regions grow dynamically as data is written and can be moved within the
/// database file to optimize space usage. Each region has a unique ID for lookup.
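///
/// # Examples
///
/// A minimal sketch, assuming `db` is an already-open [`Database`] handle
/// (obtaining one is not shown in this file); the id, offsets, and reserved
/// size are purely illustrative:
///
/// ```ignore
/// let region = Region::new(&db, "users".to_string(), 0, 0, 0, 4096);
/// region.write(b"hello")?;
/// region.flush()?;
/// ```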
#[derive(Debug, Clone)]
#[must_use = "Region should be stored to access the data"]
pub struct Region(Arc<RegionInner>);

#[derive(Debug)]
pub struct RegionInner {
    db: WeakDatabase,
    index: usize,
    meta: RwLock<RegionMetadata>,
    /// Dirty ranges (start_offset, end_offset) relative to region start.
    /// Merged at flush time to reduce syscalls.
    /// Separate from meta to allow flush without blocking iterators.
    dirty_ranges: Mutex<Vec<(usize, usize)>>,
}

impl Region {
    pub fn new(
        db: &Database,
        id: String,
        index: usize,
        start: usize,
        len: usize,
        reserved: usize,
    ) -> Self {
        Self(Arc::new(RegionInner {
            db: db.weak_clone(),
            index,
            meta: RwLock::new(RegionMetadata::new(id, start, len, reserved)),
            dirty_ranges: Mutex::new(Vec::new()),
        }))
    }

    pub fn from(db: &Database, index: usize, meta: RegionMetadata) -> Self {
        Self(Arc::new(RegionInner {
            db: db.weak_clone(),
            index,
            meta: RwLock::new(meta),
            dirty_ranges: Mutex::new(Vec::new()),
        }))
    }

    /// Creates a reader for zero-copy access to this region's data.
    ///
    /// The Reader holds read locks on both the memory map and region metadata,
    /// blocking writes until dropped. Drop the reader as soon as you're done
    /// reading to avoid blocking other operations.
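    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` is an existing [`Region`]; scoping
    /// the reader keeps its read locks as short-lived as possible:
    ///
    /// ```ignore
    /// {
    ///     let reader = region.create_reader();
    ///     // ... zero-copy reads through `reader` ...
    /// } // reader dropped here, releasing the locks
    /// ```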
    #[inline]
    pub fn create_reader(&self) -> Reader {
        Reader::new(self)
    }

    pub fn open_db_read_only_file(&self) -> Result<File> {
        self.db().open_read_only_file()
    }

    /// Appends data to the end of the region.
    ///
    /// The region will automatically grow and relocate if needed.
    /// Data is written to the mmap but not durable until `flush()` is called.
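    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` is an existing [`Region`]:
    ///
    /// ```ignore
    /// region.write(b"first")?;
    /// region.write(b"second")?; // appended after "first"
    /// region.flush()?;          // make both writes durable
    /// ```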
    #[inline]
    pub fn write(&self, data: &[u8]) -> Result<()> {
        self.write_with(data, None, false)
    }

    /// Writes data at a specific offset within the region.
    ///
    /// The offset must be within the current region length.
    /// Data written past the current end will extend the length.
    /// Data is written to the mmap but not durable until `flush()` is called.
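    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` already holds at least 5 bytes, so
    /// this overwrites them in place:
    ///
    /// ```ignore
    /// region.write_at(b"patch", 0)?;
    /// region.flush()?;
    /// ```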
    #[inline]
    pub fn write_at(&self, data: &[u8], at: usize) -> Result<()> {
        self.write_with(data, Some(at), false)
    }

    /// Writes values directly to the mmap with dirty range tracking.
    ///
    /// All writes must be within the current region length (no extension).
    /// Tracks dirty ranges to avoid flushing unchanged data.
    ///
    /// - `iter`: Iterator yielding (offset, value) pairs where offset is relative to region start
    /// - `value_len`: The byte size of each value
    /// - `write_fn`: Called for each (value, slice) to serialize the value into the slice
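    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` already contains at least 16 bytes
    /// of fixed-width `u64` records; offsets and values are illustrative:
    ///
    /// ```ignore
    /// let updates = vec![(0usize, 42u64), (8, 7u64)];
    /// region.batch_write_each(updates.into_iter(), 8, |value, slice| {
    ///     slice.copy_from_slice(&value.to_le_bytes());
    /// });
    /// region.flush()?;
    /// ```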
    #[inline]
    pub fn batch_write_each<T, F>(
        &self,
        iter: impl Iterator<Item = (usize, T)>,
        value_len: usize,
        mut write_fn: F,
    ) where
        F: FnMut(&T, &mut [u8]),
    {
        let region_start = self.meta().start();
        let db = self.db();
        let mmap = db.mmap();
        let ptr = mmap.as_ptr() as *mut u8;

        let mut ranges = self.0.dirty_ranges.lock();

        for (offset, value) in iter {
            let abs_offset = region_start + offset;
            let slice = unsafe { std::slice::from_raw_parts_mut(ptr.add(abs_offset), value_len) };
            write_fn(&value, slice);
            ranges.push((offset, offset + value_len));
        }
    }

    /// Truncates the region to the specified length.
    ///
    /// This reduces the logical length but doesn't modify existing data bytes.
    /// The truncated data becomes inaccessible even though the bytes remain in the mmap.
    /// Changes are not durable until `flush()` is called.
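    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` currently holds at least 128 bytes:
    ///
    /// ```ignore
    /// region.truncate(128)?; // keep only the first 128 bytes
    /// region.flush()?;
    /// ```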
    pub fn truncate(&self, from: usize) -> Result<()> {
        // Check current length first (quick read, guard dropped immediately)
        let len = self.meta().len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }

        let db = self.db();
        // Lock order: regions → meta (top-to-bottom)
        let regions = db.regions();
        let mut meta = self.meta_mut();
        meta.set_len(from);
        meta.write_if_dirty(self.index(), &regions);
        Ok(())
    }

    /// Truncates the region to a specific offset and writes data there.
    ///
    /// This is an atomic truncate + write operation. The final length will be
    /// exactly `at + data.len()` regardless of the previous length.
    /// Changes are not durable until `flush()` is called.
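    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` already holds at least 4 bytes; the
    /// final length is exactly `4 + 6 = 10` bytes:
    ///
    /// ```ignore
    /// region.truncate_write(4, b"header")?;
    /// region.flush()?;
    /// ```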
    #[inline]
    pub fn truncate_write(&self, at: usize, data: &[u8]) -> Result<()> {
        self.write_with(data, Some(at), true)
    }

    fn write_with(&self, data: &[u8], at: Option<usize>, truncate: bool) -> Result<()> {
        let db = self.db();
        let index = self.index();
        let meta = self.meta();
        let start = meta.start();
        let reserved = meta.reserved();
        let len = meta.len();
        drop(meta);

        let data_len = data.len();

        // Validate write position if specified
        // Note: checking `at > len` is sufficient since `len <= reserved` is always true
        // Therefore if `at <= len`, then `at <= reserved` must also be true
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);

        // Write to reserved space if possible
        if new_len <= reserved {
            // Write before acquiring meta to avoid deadlock with punch_holes.
            // Lock order: mmap (via db.write) must come before meta.
            db.write(write_start, data);

            // Lock order: regions → meta
            let regions = db.regions();
            let mut meta = self.meta_mut();

            self.mark_dirty_abs(start, write_start, data_len);
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

        assert!(new_len > reserved);
        if reserved == 0 {
            panic!(
                "reserved is 0 which would cause infinite loop! start={start}, len={len}, index={index}, new_len={new_len}"
            );
        }
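        // Grow the reservation geometrically: double it until `new_len` fits.
        // Worked example (illustrative numbers): reserved = 4096 and new_len = 9000
        // doubles 4096 → 8192 → 16384, so new_reserved ends up as 16384.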
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved = new_reserved
                .checked_mul(2)
                .expect("Region size would overflow usize");
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

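        // From here there are four placement strategies, tried in order:
        //   1. this region is the last thing in the file → grow the file and write in place
        //   2. the hole immediately to its right is wide enough → expand into it
        //   3. some other hole fits the whole new reservation → move the region there
        //   4. otherwise → move the region to the end of the file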
        let mut layout = db.layout_mut();

        // If this region is the last thing in the file, keep writing in place
        if layout.is_last_anything(self) {
            // Release layout BEFORE calling set_min_len to avoid deadlock.
            // set_min_len needs mmap_mut, and another thread may hold mmap read
            // while waiting for layout_mut, causing deadlock if we hold layout here.
            let target_len = start + new_reserved;
            drop(layout);

            db.set_min_len(target_len)?;

            // Re-acquire layout and verify we're still last
            let layout = db.layout();
            if !layout.is_last_anything(self) {
                // Another region was appended while we didn't hold the lock.
                // Fall through to the other code paths by restarting.
                drop(layout);
                return self.write_with(data, at, truncate);
            }
            drop(layout);

            let mut meta = self.meta_mut();
            meta.set_reserved(new_reserved);
            drop(meta);

            db.write(write_start, data);

            self.mark_dirty_abs(start, write_start, data_len);
            // Acquire regions READ lock BEFORE metadata WRITE lock to prevent deadlock.
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

        // Expand region to the right if gap is wide enough
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            layout.remove_or_compress_hole(hole_start, added_reserve)?;
            let mut meta = self.meta_mut();
            meta.set_reserved(new_reserved);
            drop(meta);
            drop(layout);

            db.write(write_start, data);

            self.mark_dirty_abs(start, write_start, data_len);
            // Acquire regions READ lock BEFORE metadata WRITE lock to prevent deadlock.
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

        // Find hole big enough to move the region
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            layout.remove_or_compress_hole(hole_start, new_reserved)?;
            layout.reserve(hole_start, new_reserved);
            drop(layout);

            db.copy(start, hole_start, write_start - start);
            db.write(hole_start + at.unwrap_or(len), data);

            let mut layout = db.layout_mut();
            layout.move_region(hole_start, self)?;
            assert!(layout.take_reserved(hole_start) == Some(new_reserved));

            // Region moved, mark all data as dirty (relative to new start)
            self.mark_dirty(0, new_len);
            // Lock order: layout (held) → regions → meta
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_start(hole_start);
            meta.set_reserved(new_reserved);
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

        // Allocate at end of file
        let new_start = layout.len();
        let target_len = new_start + new_reserved;
        // Release layout BEFORE calling set_min_len to avoid deadlock.
        drop(layout);

        db.set_min_len(target_len)?;

        // Re-acquire layout and reserve space
        let mut layout = db.layout_mut();
        // Verify new_start is still valid (another thread may have appended)
        let current_len = layout.len();
        if current_len != new_start {
            // State changed, restart to pick the right path
            drop(layout);
            return self.write_with(data, at, truncate);
        }
        layout.reserve(new_start, new_reserved);
        drop(layout);

        db.copy(start, new_start, write_start - start);
        db.write(new_start + at.unwrap_or(len), data);

        let mut layout = db.layout_mut();
        layout.move_region(new_start, self)?;
        assert!(layout.take_reserved(new_start) == Some(new_reserved));

        // Region moved, mark all data as dirty (relative to new start)
        self.mark_dirty(0, new_len);
        // Lock order: layout (held) → regions → meta
        let regions = db.regions();
        let mut meta = self.meta_mut();
        meta.set_start(new_start);
        meta.set_reserved(new_reserved);
        meta.set_len(new_len);
        meta.write_if_dirty(index, &regions);

        Ok(())
    }

    /// Renames the region to a new ID.
    ///
    /// The new ID must not already be in use.
    /// Changes are not durable until `flush()` is called.
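    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming no other region already uses the new ID:
    ///
    /// ```ignore
    /// region.rename("users_v2")?;
    /// region.flush()?;
    /// ```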
    pub fn rename(&self, new_id: &str) -> Result<()> {
        let old_id = self.meta().id().to_string();
        let db = self.db();
        let mut regions = db.regions_mut();
        let mut meta = self.meta_mut();
        let index = self.index();
        regions.rename(&old_id, new_id)?;
        meta.set_id(new_id.to_string());
        meta.write_if_dirty(index, &regions);
        Ok(())
    }

    /// Removes the region from the database.
    ///
    /// The space is marked as a pending hole that will become reusable after
    /// the next `flush()`. This consumes the region to prevent use-after-free.
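    ///
    /// # Examples
    ///
    /// A minimal sketch; `remove` takes the region by value, so it cannot be
    /// used afterwards:
    ///
    /// ```ignore
    /// region.remove()?;
    /// // `region` has been moved and can no longer be accessed.
    /// ```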
    pub fn remove(self) -> Result<()> {
        let db = self.db();
        // Lock order: layout → regions
        let mut layout = db.layout_mut();
        let mut regions = db.regions_mut();
        layout.remove_region(&self)?;
        regions.remove(&self)?;
        Ok(())
    }

    /// Flushes this region's dirty data and metadata to disk.
    ///
    /// Flushes if any data writes or metadata-only changes (truncate, rename) were made.
    /// Returns `Ok(true)` if anything was flushed, `Ok(false)` if nothing was dirty.
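    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `region` is an existing [`Region`] and no
    /// other thread is writing concurrently:
    ///
    /// ```ignore
    /// region.write(b"payload")?;
    /// assert!(region.flush()?);   // dirty data was flushed
    /// assert!(!region.flush()?);  // nothing left to flush
    /// ```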
    pub fn flush(&self) -> Result<bool> {
        let db = self.db();
        let dirty_ranges = self.take_dirty_ranges();

        let data_flushed = if !dirty_ranges.is_empty() {
            // Lock order: mmap before meta, so release meta before acquiring mmap
            let region_start = self.meta().start();
            let mmap = db.mmap();
            for (dirty_start, dirty_end) in &dirty_ranges {
                mmap.flush_range(region_start + dirty_start, dirty_end - dirty_start)?;
            }
            true
        } else {
            false
        };

        // Lock order: regions → meta
        let regions = db.regions();
        let meta = self.meta();
        let meta_flushed = meta.flush(self.index(), &regions)?;

        Ok(data_flushed || meta_flushed)
    }

    #[inline(always)]
    pub fn arc(&self) -> &Arc<RegionInner> {
        &self.0
    }

    #[inline(always)]
    pub fn index(&self) -> usize {
        self.0.index
    }

    #[inline(always)]
    pub fn meta(&self) -> RwLockReadGuard<'_, RegionMetadata> {
        self.0.meta.read()
    }

    #[inline(always)]
    fn meta_mut(&self) -> RwLockWriteGuard<'_, RegionMetadata> {
        self.0.meta.write()
    }

    #[inline(always)]
    pub fn db(&self) -> Database {
        self.0.db.upgrade()
    }

    /// Marks a range as dirty (needing flush).
    /// `offset` is relative to region start.
    #[inline]
    pub fn mark_dirty(&self, offset: usize, len: usize) {
        self.0.dirty_ranges.lock().push((offset, offset + len));
    }

    /// Marks a range as dirty using absolute file positions.
    /// Converts to relative offsets internally.
    #[inline]
    fn mark_dirty_abs(&self, region_start: usize, abs_start: usize, len: usize) {
        let offset = abs_start - region_start;
        self.mark_dirty(offset, len);
    }

    /// Takes and returns the dirty ranges, leaving an empty Vec behind so the
    /// old allocation is freed once the caller drops the result.
    /// Concurrent writes after this call will create new ranges for the next flush.
    #[inline]
    pub(crate) fn take_dirty_ranges(&self) -> Vec<(usize, usize)> {
        mem::take(&mut *self.0.dirty_ranges.lock())
    }
}