use std::{fs::File, mem, sync::Arc};

use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};

use crate::{Database, Error, Reader, RegionMetadata, Result, WeakDatabase};

#[derive(Debug, Clone)]
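/// A cheaply cloneable handle to a single region of the database; clones share
/// the same underlying [`RegionInner`].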
#[must_use = "Region should be stored to access the data"]
pub struct Region(Arc<RegionInner>);

#[derive(Debug)]
pub struct RegionInner {
    db: WeakDatabase,
    index: usize,
    meta: RwLock<RegionMetadata>,
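    /// Byte ranges within this region (relative to its start) that have been
    /// written but not yet flushed to disk.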
    dirty_ranges: Mutex<Vec<(usize, usize)>>,
}

impl Region {
    pub fn new(
        db: &Database,
        id: String,
        index: usize,
        start: usize,
        len: usize,
        reserved: usize,
    ) -> Self {
        Self(Arc::new(RegionInner {
            db: db.weak_clone(),
            index,
            meta: RwLock::new(RegionMetadata::new(id, start, len, reserved)),
            dirty_ranges: Mutex::new(Vec::new()),
        }))
    }

    pub fn from(db: &Database, index: usize, meta: RegionMetadata) -> Self {
        Self(Arc::new(RegionInner {
            db: db.weak_clone(),
            index,
            meta: RwLock::new(meta),
            dirty_ranges: Mutex::new(Vec::new()),
        }))
    }

    #[inline]
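    /// Creates a new [`Reader`] over this region's data.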
    pub fn create_reader(&self) -> Reader {
        Reader::new(self)
    }

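    /// Opens a read-only [`File`] handle to the underlying database file.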
    pub fn open_db_read_only_file(&self) -> Result<File> {
        self.db().open_read_only_file()
    }

    #[inline]
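    /// Appends `data` at the end of the region, growing it as needed.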
    pub fn write(&self, data: &[u8]) -> Result<()> {
        self.write_with(data, None, false)
    }

    #[inline]
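    /// Writes `data` starting at byte offset `at` within the region.
    ///
    /// Fails if `at` is past the current end of the region; the region grows
    /// if the write extends beyond its current length.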
    pub fn write_at(&self, data: &[u8], at: usize) -> Result<()> {
        self.write_with(data, Some(at), false)
    }

    #[inline]
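    /// Writes a fixed-size value for each `(offset, value)` pair yielded by `iter`,
    /// directly through the memory map, and marks the touched ranges dirty.
    ///
    /// Note: the writes go through a raw pointer with no bounds checks, so every
    /// `offset + value_len` must stay within this region's reserved space.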
    pub fn batch_write_each<T, F>(
        &self,
        iter: impl Iterator<Item = (usize, T)>,
        value_len: usize,
        mut write_fn: F,
    ) where
        F: FnMut(&T, &mut [u8]),
    {
        let region_start = self.meta().start();
        let db = self.db();
        let mmap = db.mmap();
        let ptr = mmap.as_ptr() as *mut u8;

        let mut ranges = self.0.dirty_ranges.lock();

        for (offset, value) in iter {
            let abs_offset = region_start + offset;
            let slice = unsafe { std::slice::from_raw_parts_mut(ptr.add(abs_offset), value_len) };
            write_fn(&value, slice);
            ranges.push((offset, offset + value_len));
        }
    }

    pub fn truncate(&self, from: usize) -> Result<()> {
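        // Truncating to the current length is a no-op; truncating past it is invalid.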
        let len = self.meta().len();
        if from == len {
            return Ok(());
        } else if from > len {
            return Err(Error::TruncateInvalid {
                from,
                current_len: len,
            });
        }

        let db = self.db();
        let regions = db.regions();
        let mut meta = self.meta_mut();
        meta.set_len(from);
        meta.write_if_dirty(self.index(), &regions);
        Ok(())
    }

    #[inline]
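    /// Writes `data` at offset `at` and truncates the region so it ends exactly
    /// at `at + data.len()`, even if it was longer before.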
    pub fn truncate_write(&self, at: usize, data: &[u8]) -> Result<()> {
        self.write_with(data, Some(at), true)
    }

    fn write_with(&self, data: &[u8], at: Option<usize>, truncate: bool) -> Result<()> {
        let db = self.db();
        let index = self.index();
        let meta = self.meta();
        let start = meta.start();
        let reserved = meta.reserved();
        let len = meta.len();
        drop(meta);

        let data_len = data.len();

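        // An explicit position must not be past the current end of the region.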
        if let Some(at_val) = at
            && at_val > len
        {
            return Err(Error::WriteOutOfBounds {
                position: at_val,
                region_len: len,
            });
        }

        let new_len = at.map_or(len + data_len, |at| {
            let new_len = at + data_len;
            if truncate { new_len } else { new_len.max(len) }
        });
        let write_start = start + at.unwrap_or(len);

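        // Fast path: the new length still fits inside the space already reserved
        // for this region, so write in place and update the metadata.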
        if new_len <= reserved {
            db.write(write_start, data);

            let regions = db.regions();
            let mut meta = self.meta_mut();

            self.mark_dirty_abs(start, write_start, data_len);
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

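        // Slow path: the region has to grow beyond its current reservation.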
        assert!(new_len > reserved);
        if reserved == 0 {
            panic!(
                "reserved is 0 which would cause infinite loop! start={start}, len={len}, index={index}, new_len={new_len}"
            );
        }
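        // Double the reservation until the new length fits.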
        let mut new_reserved = reserved;
        while new_len > new_reserved {
            new_reserved = new_reserved
                .checked_mul(2)
                .expect("Region size would overflow usize");
        }
        assert!(new_len <= new_reserved);
        let added_reserve = new_reserved - reserved;

        let mut layout = db.layout_mut();

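        // Case 1: this region is the last thing in the file, so it can grow in
        // place by extending the file itself.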
        if layout.is_last_anything(self) {
            let target_len = start + new_reserved;
            drop(layout);

            db.set_min_len(target_len)?;

            let layout = db.layout();
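            // The layout lock was released while the file grew; if this region is
            // no longer the last thing in the file, start over.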
            if !layout.is_last_anything(self) {
                drop(layout);
                return self.write_with(data, at, truncate);
            }
            drop(layout);

            let mut meta = self.meta_mut();
            meta.set_reserved(new_reserved);
            drop(meta);

            db.write(write_start, data);

            self.mark_dirty_abs(start, write_start, data_len);
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

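        // Case 2: the hole immediately after this region is large enough to absorb
        // the extra reservation, so the region can grow in place.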
        let hole_start = start + reserved;
        if layout
            .get_hole(hole_start)
            .is_some_and(|gap| gap >= added_reserve)
        {
            layout.remove_or_compress_hole(hole_start, added_reserve)?;
            let mut meta = self.meta_mut();
            meta.set_reserved(new_reserved);
            drop(meta);
            drop(layout);

            db.write(write_start, data);

            self.mark_dirty_abs(start, write_start, data_len);
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

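        // Case 3: relocate the region into the smallest existing hole that can
        // hold the entire new reservation, copying the old contents over.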
        if let Some(hole_start) = layout.find_smallest_adequate_hole(new_reserved) {
            layout.remove_or_compress_hole(hole_start, new_reserved)?;
            layout.reserve(hole_start, new_reserved);
            drop(layout);

            db.copy(start, hole_start, write_start - start);
            db.write(hole_start + at.unwrap_or(len), data);

            let mut layout = db.layout_mut();
            layout.move_region(hole_start, self)?;
            assert!(layout.take_reserved(hole_start) == Some(new_reserved));

            self.mark_dirty(0, new_len);
            let regions = db.regions();
            let mut meta = self.meta_mut();
            meta.set_start(hole_start);
            meta.set_reserved(new_reserved);
            meta.set_len(new_len);
            meta.write_if_dirty(index, &regions);

            return Ok(());
        }

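        // Case 4: no suitable hole exists, so extend the file and move the region
        // to the end.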
        let new_start = layout.len();
        let target_len = new_start + new_reserved;
        drop(layout);

        db.set_min_len(target_len)?;

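        // The layout lock was released while the file grew; if something else was
        // appended in the meantime, the chosen position is stale, so start over.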
        let mut layout = db.layout_mut();
        let current_len = layout.len();
        if current_len != new_start {
            drop(layout);
            return self.write_with(data, at, truncate);
        }
        layout.reserve(new_start, new_reserved);
        drop(layout);

        db.copy(start, new_start, write_start - start);
        db.write(new_start + at.unwrap_or(len), data);

        let mut layout = db.layout_mut();
        layout.move_region(new_start, self)?;
        assert!(layout.take_reserved(new_start) == Some(new_reserved));

        self.mark_dirty(0, new_len);
        let regions = db.regions();
        let mut meta = self.meta_mut();
        meta.set_start(new_start);
        meta.set_reserved(new_reserved);
        meta.set_len(new_len);
        meta.write_if_dirty(index, &regions);

        Ok(())
    }

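    /// Renames the region, updating both the region registry and this region's
    /// metadata.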
    pub fn rename(&self, new_id: &str) -> Result<()> {
        let old_id = self.meta().id().to_string();
        let db = self.db();
        let mut regions = db.regions_mut();
        let mut meta = self.meta_mut();
        let index = self.index();
        regions.rename(&old_id, new_id)?;
        meta.set_id(new_id.to_string());
        meta.write_if_dirty(index, &regions);
        Ok(())
    }

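    /// Removes this region from the database layout and the region registry,
    /// consuming the handle.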
    pub fn remove(self) -> Result<()> {
        let db = self.db();
        let mut layout = db.layout_mut();
        let mut regions = db.regions_mut();
        layout.remove_region(&self)?;
        regions.remove(&self)?;
        Ok(())
    }

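    /// Flushes any dirty data ranges and dirty metadata for this region to disk.
    ///
    /// Returns `true` if anything was actually flushed.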
    pub fn flush(&self) -> Result<bool> {
        let db = self.db();
        let dirty_ranges = self.take_dirty_ranges();

        let data_flushed = if !dirty_ranges.is_empty() {
            let region_start = self.meta().start();
            let mmap = db.mmap();
            for (dirty_start, dirty_end) in &dirty_ranges {
                mmap.flush_range(region_start + dirty_start, dirty_end - dirty_start)?;
            }
            true
        } else {
            false
        };

        let regions = db.regions();
        let meta = self.meta();
        let meta_flushed = meta.flush(self.index(), &regions)?;

        Ok(data_flushed || meta_flushed)
    }

    #[inline(always)]
    pub fn arc(&self) -> &Arc<RegionInner> {
        &self.0
    }

    #[inline(always)]
    pub fn index(&self) -> usize {
        self.0.index
    }

    #[inline(always)]
    pub fn meta(&self) -> RwLockReadGuard<'_, RegionMetadata> {
        self.0.meta.read()
    }

    #[inline(always)]
    fn meta_mut(&self) -> RwLockWriteGuard<'_, RegionMetadata> {
        self.0.meta.write()
    }

    #[inline(always)]
    pub fn db(&self) -> Database {
        self.0.db.upgrade()
    }

    #[inline]
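    /// Records that `len` bytes starting at `offset` (relative to the region start)
    /// have been modified and need flushing.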
    pub fn mark_dirty(&self, offset: usize, len: usize) {
        self.0.dirty_ranges.lock().push((offset, offset + len));
    }

    #[inline]
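    /// Like [`mark_dirty`](Self::mark_dirty), but takes an absolute file offset and
    /// converts it to a region-relative one.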
    fn mark_dirty_abs(&self, region_start: usize, abs_start: usize, len: usize) {
        let offset = abs_start - region_start;
        self.mark_dirty(offset, len);
    }

    #[inline]
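    /// Takes and clears the accumulated dirty ranges, leaving the list empty.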
    pub(crate) fn take_dirty_ranges(&self) -> Vec<(usize, usize)> {
        mem::take(&mut *self.0.dirty_ranges.lock())
    }
}