use super::header::{ArenaHeader, HEADER_SIZE};
use crate::error::{Result, XervError};
use crate::types::{ArenaOffset, RelPtr, TraceId};
use fs2::FileExt;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::RwLock;
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

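/// Default capacity of a newly created arena file: 16 MiB.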
pub const DEFAULT_ARENA_SIZE: u64 = 16 * 1024 * 1024;

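/// Hard upper bound on an arena's capacity: 4 GiB.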
pub const MAX_ARENA_SIZE: u64 = 4 * 1024 * 1024 * 1024;

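/// Each entry written to the arena is padded to this alignment, in bytes.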
pub const ENTRY_ALIGNMENT: usize = 8;

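/// Configuration for creating an [`Arena`].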
#[derive(Debug, Clone)]
pub struct ArenaConfig {
    /// Size of the backing file in bytes.
    pub capacity: u64,
    /// Directory in which the arena file is created.
    pub directory: PathBuf,
    /// Flush the memory map to disk after every write.
    pub sync_on_write: bool,
}

impl Default for ArenaConfig {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_ARENA_SIZE,
            directory: PathBuf::from("/tmp/xerv"),
            sync_on_write: false,
        }
    }
}

impl ArenaConfig {
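    /// A small (4 MiB) scratch arena. Despite the name it is still
    /// file-backed, using a uniquely named file in the system temp directory.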
    pub fn in_memory() -> Self {
        Self {
            capacity: 4 * 1024 * 1024,
            directory: std::env::temp_dir().join(format!("xerv_arena_{}", uuid::Uuid::new_v4())),
            sync_on_write: false,
        }
    }

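    /// Set the capacity, silently clamped to `MAX_ARENA_SIZE`.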
    pub fn with_capacity(mut self, capacity: u64) -> Self {
        self.capacity = capacity.min(MAX_ARENA_SIZE);
        self
    }

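    /// Set the directory in which the arena file is created.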
    pub fn with_directory(mut self, directory: impl Into<PathBuf>) -> Self {
        self.directory = directory.into();
        self
    }

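    /// Enable or disable flushing the mapping after every write.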
    pub fn with_sync(mut self, sync: bool) -> Self {
        self.sync_on_write = sync;
        self
    }
}

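/// State shared by an [`Arena`] and its [`ArenaReader`]/[`ArenaWriter`] handles.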
struct ArenaInner {
    /// Writable memory map over the arena file.
    mmap: MmapMut,
    /// Backing file handle; holds the exclusive lock.
    file: File,
    /// Path of the arena file on disk.
    path: PathBuf,
    /// Offset of the next write, in bytes from the start of the file.
    write_pos: AtomicU64,
    /// In-memory copy of the on-disk header.
    header: ArenaHeader,
    /// Flush the mapping to disk after every write.
    sync_on_write: bool,
    /// Total size of the mapping, in bytes.
    capacity: u64,
}

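/// A memory-mapped, file-backed arena scoped to a single trace.
///
/// Illustrative usage, mirroring the tests at the bottom of this file
/// (error handling elided; adjust paths and imports to your crate layout):
///
/// ```ignore
/// let config = ArenaConfig::default()
///     .with_capacity(1024 * 1024)
///     .with_directory("/tmp/xerv");
/// let arena = Arena::create(TraceId::new(), &config)?;
/// let ptr: RelPtr<()> = arena.write_bytes(b"hello")?;
/// let bytes = arena.read_bytes(ptr.offset(), ptr.size() as usize)?;
/// assert_eq!(&bytes, b"hello");
/// ```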
pub struct Arena {
    inner: Arc<RwLock<ArenaInner>>,
    trace_id: TraceId,
}

impl Arena {
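    /// Create a new arena file for `trace_id` under `config.directory`,
    /// taking an exclusive lock and writing the initial header.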
    pub fn create(trace_id: TraceId, config: &ArenaConfig) -> Result<Self> {
        // Make sure the target directory exists before creating the file.
        std::fs::create_dir_all(&config.directory).map_err(|e| XervError::ArenaCreate {
            path: config.directory.clone(),
            cause: e.to_string(),
        })?;

        let filename = format!("trace_{}.bin", trace_id.as_uuid());
        let path = config.directory.join(&filename);

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // Take an exclusive advisory lock so no other process maps this arena.
        file.try_lock_exclusive()
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: format!("Failed to lock file: {}", e),
            })?;

        // Size the file up front so the whole capacity can be mapped.
        file.set_len(config.capacity)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // SAFETY: the file is exclusively locked and sized to `capacity`.
        let mut mmap = unsafe {
            MmapOptions::new()
                .len(config.capacity as usize)
                .map_mut(&file)
                .map_err(|e| XervError::ArenaMmap {
                    path: path.clone(),
                    cause: e.to_string(),
                })?
        };

        // Write the initial header at offset zero.
        let header = ArenaHeader::new(trace_id, config.capacity);
        let header_bytes = header.to_bytes().map_err(|e| XervError::ArenaCreate {
            path: path.clone(),
            cause: e.to_string(),
        })?;

        mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);

        if config.sync_on_write {
            mmap.flush().map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;
        }

        let inner = ArenaInner {
            mmap,
            file,
            path,
            write_pos: AtomicU64::new(header.write_pos.as_u64()),
            header,
            sync_on_write: config.sync_on_write,
            capacity: config.capacity,
        };

        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            trace_id,
        })
    }

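    /// Open and validate an existing arena file, resuming at its saved
    /// write position.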
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // Take an exclusive advisory lock so no other process maps this arena.
        file.try_lock_exclusive()
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: format!("Failed to lock file: {}", e),
            })?;

        let metadata = file.metadata().map_err(|e| XervError::ArenaCreate {
            path: path.clone(),
            cause: e.to_string(),
        })?;

        let capacity = metadata.len();

        // SAFETY: the file is exclusively locked and mapped at its full length.
        let mmap = unsafe {
            MmapOptions::new()
                .len(capacity as usize)
                .map_mut(&file)
                .map_err(|e| XervError::ArenaMmap {
                    path: path.clone(),
                    cause: e.to_string(),
                })?
        };

        // Parse and validate the header before trusting any contents.
        let header = ArenaHeader::from_bytes(&mmap[..HEADER_SIZE]).map_err(|e| {
            XervError::ArenaCorruption {
                offset: ArenaOffset::new(0),
                cause: e.to_string(),
            }
        })?;

        header.validate().map_err(|e| XervError::ArenaCorruption {
            offset: ArenaOffset::new(0),
            cause: e.to_string(),
        })?;

        let trace_id = header.trace_id;

        let inner = ArenaInner {
            mmap,
            file,
            path,
            write_pos: AtomicU64::new(header.write_pos.as_u64()),
            header,
            sync_on_write: false,
            capacity,
        };

        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            trace_id,
        })
    }

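    /// The trace this arena belongs to.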
    pub fn trace_id(&self) -> TraceId {
        self.trace_id
    }

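    /// Current write position, i.e. the offset at which the next write lands.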
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }

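    /// Bytes remaining between the write position and the capacity.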
    pub fn available_space(&self) -> u64 {
        let inner = self.inner.read();
        let write_pos = inner.write_pos.load(Ordering::Acquire);
        inner.capacity.saturating_sub(write_pos)
    }

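    /// Path of the backing file on disk.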
    pub fn path(&self) -> PathBuf {
        self.inner.read().path.clone()
    }

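    /// Append `bytes` to the arena, padded to `ENTRY_ALIGNMENT`, and return
    /// a typed relative pointer to the written data.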
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        let mut inner = self.inner.write();

        // Round the size up to the next multiple of ENTRY_ALIGNMENT.
        let size = bytes.len();
        let aligned_size = (size + ENTRY_ALIGNMENT - 1) & !(ENTRY_ALIGNMENT - 1);

        let write_pos = inner.write_pos.load(Ordering::Acquire);
        let new_pos = write_pos + aligned_size as u64;

        if new_pos > inner.capacity {
            return Err(XervError::ArenaCapacity {
                requested: aligned_size as u64,
                available: inner.capacity - write_pos,
            });
        }

        let offset = ArenaOffset::new(write_pos);
        inner.mmap[write_pos as usize..write_pos as usize + size].copy_from_slice(bytes);

        // Zero the padding so the file never leaks stale bytes.
        if aligned_size > size {
            inner.mmap[write_pos as usize + size..write_pos as usize + aligned_size].fill(0);
        }

        inner.write_pos.store(new_pos, Ordering::Release);

        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset,
                cause: e.to_string(),
            })?;
        }

        Ok(RelPtr::new(offset, size as u32))
    }

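    /// Copy `size` bytes starting at `offset` out of the arena.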
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        let inner = self.inner.read();

        let off = offset.as_u64() as usize;
        let write_pos = inner.write_pos.load(Ordering::Acquire) as usize;

        // Only data below the write position has been written.
        if off + size > write_pos {
            return Err(XervError::ArenaInvalidOffset {
                offset,
                cause: format!(
                    "Offset {} + size {} exceeds write position {}",
                    off, size, write_pos
                ),
            });
        }

        Ok(inner.mmap[off..off + size].to_vec())
    }

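    /// Flush the memory map to disk.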
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        // Compute the offset from the guard we already hold; calling
        // `self.write_position()` here would re-acquire the lock and can
        // deadlock under parking_lot's write-preferring RwLock.
        let offset = ArenaOffset::new(inner.write_pos.load(Ordering::Acquire));
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset,
            cause: e.to_string(),
        })
    }

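    /// Write the current write position back into the on-disk header.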
    fn update_header(&self) -> Result<()> {
        let mut inner = self.inner.write();
        inner.header.write_pos = ArenaOffset::new(inner.write_pos.load(Ordering::Acquire));

        let header_bytes = inner.header.to_bytes().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset: ArenaOffset::new(0),
            cause: e.to_string(),
        })?;

        inner.mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);

        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset: ArenaOffset::new(0),
                cause: e.to_string(),
            })?;
        }

        Ok(())
    }

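    /// A cloneable read-only handle sharing this arena's mapping.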
    pub fn reader(&self) -> ArenaReader {
        ArenaReader {
            inner: Arc::clone(&self.inner),
        }
    }

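    /// A write handle sharing this arena's mapping.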
    pub fn writer(&self) -> ArenaWriter {
        ArenaWriter {
            inner: Arc::clone(&self.inner),
            trace_id: self.trace_id,
        }
    }
}

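/// Persist the header on drop, and release the file lock once no other
/// handle shares the mapping.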
impl Drop for Arena {
    fn drop(&mut self) {
        // Best-effort: persist the final write position.
        let _ = self.update_header();

        // Only unlock if this is the last handle to the inner state;
        // readers and writers may still be alive otherwise.
        if let Some(inner) = Arc::get_mut(&mut self.inner) {
            let inner = inner.get_mut();
            let _ = inner.file.unlock();
        }
    }
}

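/// Read-only view of an arena, cheap to clone and safe to hand to other
/// threads.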
#[derive(Clone)]
pub struct ArenaReader {
    inner: Arc<RwLock<ArenaInner>>,
}

impl ArenaReader {
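    /// Copy `size` bytes starting at `offset` out of the arena.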
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        let inner = self.inner.read();

        let off = offset.as_u64() as usize;
        let write_pos = inner.write_pos.load(Ordering::Acquire) as usize;

        if off + size > write_pos {
            return Err(XervError::ArenaInvalidOffset {
                offset,
                cause: format!(
                    "Offset {} + size {} exceeds write position {}",
                    off, size, write_pos
                ),
            });
        }

        Ok(inner.mmap[off..off + size].to_vec())
    }

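    /// Current write position of the underlying arena.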
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}

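/// Write handle for an arena; writes are serialized through the shared lock.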
pub struct ArenaWriter {
    inner: Arc<RwLock<ArenaInner>>,
    trace_id: TraceId,
}

impl ArenaWriter {
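    /// Append `bytes` to the arena; same semantics as [`Arena::write_bytes`].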
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        let mut inner = self.inner.write();

        // Round the size up to the next multiple of ENTRY_ALIGNMENT.
        let size = bytes.len();
        let aligned_size = (size + ENTRY_ALIGNMENT - 1) & !(ENTRY_ALIGNMENT - 1);

        let write_pos = inner.write_pos.load(Ordering::Acquire);
        let new_pos = write_pos + aligned_size as u64;

        if new_pos > inner.capacity {
            return Err(XervError::ArenaCapacity {
                requested: aligned_size as u64,
                available: inner.capacity - write_pos,
            });
        }

        let offset = ArenaOffset::new(write_pos);
        inner.mmap[write_pos as usize..write_pos as usize + size].copy_from_slice(bytes);

        // Zero the padding so the file never leaks stale bytes.
        if aligned_size > size {
            inner.mmap[write_pos as usize + size..write_pos as usize + aligned_size].fill(0);
        }

        inner.write_pos.store(new_pos, Ordering::Release);

        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset,
                cause: e.to_string(),
            })?;
        }

        Ok(RelPtr::new(offset, size as u32))
    }

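    /// Flush the memory map to disk.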
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset: ArenaOffset::new(inner.write_pos.load(Ordering::Acquire)),
            cause: e.to_string(),
        })
    }

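    /// Current write position of the underlying arena.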
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn arena_create_and_write() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let trace_id = TraceId::new();
        let arena = Arena::create(trace_id, &config).unwrap();

        assert!(arena.path().exists());
        assert_eq!(arena.trace_id(), trace_id);
    }

    #[test]
    fn arena_write_and_read_bytes() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        let data = b"Hello, XERV!";
        let ptr: RelPtr<()> = arena.write_bytes(data).unwrap();
        assert!(!ptr.is_null());

        let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
        assert_eq!(&read_back, data);
    }

    #[test]
    fn arena_multiple_writes() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        let mut ptrs = Vec::new();
        for i in 0..100 {
            let data = format!("item_{}", i);
            let ptr: RelPtr<()> = arena.write_bytes(data.as_bytes()).unwrap();
            ptrs.push((ptr, data));
        }

        for (ptr, expected) in &ptrs {
            let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
            assert_eq!(String::from_utf8(read_back).unwrap(), *expected);
        }
    }

    #[test]
    fn arena_capacity_check() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(256)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        let data = "a".repeat(100);

        // Keep writing until the arena reports it is out of capacity;
        // the guard keeps a regression from looping forever.
        let mut writes = 0;
        while arena.write_bytes::<()>(data.as_bytes()).is_ok() {
            writes += 1;
            if writes > 10 {
                break;
            }
        }

        assert!(writes < 10);
    }

    #[test]
    fn arena_reader_writer() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();
        let writer = arena.writer();
        let reader = arena.reader();

        let data = b"shared data";
        let ptr: RelPtr<()> = writer.write_bytes(data).unwrap();

        let read_back = reader
            .read_bytes(ptr.offset(), ptr.size() as usize)
            .unwrap();
        assert_eq!(&read_back, data);
    }
}