xerv_core/arena/writer.rs

//! Arena read/write implementation.

use super::header::{ArenaHeader, HEADER_SIZE};
use crate::error::{Result, XervError};
use crate::types::{ArenaOffset, RelPtr, TraceId};
use fs2::FileExt;
use memmap2::{MmapMut, MmapOptions};
use parking_lot::RwLock;
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Default arena size: 16 MB.
pub const DEFAULT_ARENA_SIZE: u64 = 16 * 1024 * 1024;

/// Maximum arena size: 4 GB.
pub const MAX_ARENA_SIZE: u64 = 4 * 1024 * 1024 * 1024;

/// Minimum entry alignment (8 bytes for rkyv).
pub const ENTRY_ALIGNMENT: usize = 8;
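//
// A worked example of the rounding used by the write paths below: sizes are
// rounded up with the power-of-two mask `(size + ENTRY_ALIGNMENT - 1) &
// !(ENTRY_ALIGNMENT - 1)`, so with 8-byte alignment 12 -> 16, 16 -> 16, and
// 17 -> 24.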

/// Configuration for arena creation.
#[derive(Debug, Clone)]
pub struct ArenaConfig {
    /// Initial capacity in bytes.
    pub capacity: u64,
    /// Directory for arena files.
    pub directory: PathBuf,
    /// Whether to sync writes to disk immediately.
    pub sync_on_write: bool,
}

impl Default for ArenaConfig {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_ARENA_SIZE,
            directory: PathBuf::from("/tmp/xerv"),
            sync_on_write: false,
        }
    }
}

impl ArenaConfig {
    /// Create an in-memory configuration for testing.
    ///
    /// Uses a temporary directory with a unique name per invocation.
    pub fn in_memory() -> Self {
        Self {
            capacity: 4 * 1024 * 1024, // 4 MB for tests
            directory: std::env::temp_dir().join(format!("xerv_arena_{}", uuid::Uuid::new_v4())),
            sync_on_write: false,
        }
    }

    /// Set the capacity in bytes, clamped to `MAX_ARENA_SIZE`.
    pub fn with_capacity(mut self, capacity: u64) -> Self {
        self.capacity = capacity.min(MAX_ARENA_SIZE);
        self
    }

    /// Set the directory in which arena files are created.
    pub fn with_directory(mut self, directory: impl Into<PathBuf>) -> Self {
        self.directory = directory.into();
        self
    }

    /// Set whether every write is synced to disk immediately (durability at the
    /// cost of write latency).
    pub fn with_sync(mut self, sync: bool) -> Self {
        self.sync_on_write = sync;
        self
    }
}
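
// A typical builder chain, as a sketch (the directory is illustrative; the tests
// at the bottom of this file use `tempfile::tempdir()` instead):
//
//     let config = ArenaConfig::default()
//         .with_capacity(1024 * 1024)
//         .with_directory("/var/lib/xerv")
//         .with_sync(true);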

/// Shared arena state.
struct ArenaInner {
    /// The memory-mapped file.
    mmap: MmapMut,
    /// The underlying file handle.
    file: File,
    /// Path to the arena file.
    path: PathBuf,
    /// Current write position (atomic so it can be read without a write lock).
    write_pos: AtomicU64,
    /// Header information.
    header: ArenaHeader,
    /// Whether to sync on write.
    sync_on_write: bool,
    /// Total capacity in bytes.
    capacity: u64,
}

/// A memory-mapped arena for zero-copy data storage.
///
/// The `Arena` owns the mmap and provides both read and write access.
/// For concurrent access patterns, use `ArenaReader` and `ArenaWriter`.
pub struct Arena {
    inner: Arc<RwLock<ArenaInner>>,
    trace_id: TraceId,
}
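
// On-disk layout, as the code below maintains it: a fixed-size header followed by
// 8-byte-aligned entries, with `write_pos` pointing at the next free byte:
//
//   [ ArenaHeader (HEADER_SIZE bytes) | entry | padding | entry | padding | ... | free ]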

impl Arena {
    /// Create a new arena for the given trace.
    pub fn create(trace_id: TraceId, config: &ArenaConfig) -> Result<Self> {
        // Ensure directory exists
        std::fs::create_dir_all(&config.directory).map_err(|e| XervError::ArenaCreate {
            path: config.directory.clone(),
            cause: e.to_string(),
        })?;

        let filename = format!("trace_{}.bin", trace_id.as_uuid());
        let path = config.directory.join(&filename);

        // Create and size the file
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // Acquire an exclusive advisory lock so no other process maps this file
        file.try_lock_exclusive()
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: format!("Failed to lock file: {}", e),
            })?;

        // Set file size
        file.set_len(config.capacity)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // Memory map the file.
        //
        // SAFETY: `map_mut` is unsafe because another process mutating the file
        // underneath us would be undefined behavior; the exclusive lock above
        // guards against that, and the map lives alongside its file in `ArenaInner`.
        let mut mmap = unsafe {
            MmapOptions::new()
                .len(config.capacity as usize)
                .map_mut(&file)
                .map_err(|e| XervError::ArenaMmap {
                    path: path.clone(),
                    cause: e.to_string(),
                })?
        };

        // Write header
        let header = ArenaHeader::new(trace_id, config.capacity);
        let header_bytes = header.to_bytes().map_err(|e| XervError::ArenaCreate {
            path: path.clone(),
            cause: e.to_string(),
        })?;

        mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);

        if config.sync_on_write {
            mmap.flush().map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;
        }

        let inner = ArenaInner {
            mmap,
            file,
            path,
            write_pos: AtomicU64::new(header.write_pos.as_u64()),
            header,
            sync_on_write: config.sync_on_write,
            capacity: config.capacity,
        };

        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            trace_id,
        })
    }

    /// Open an existing arena file.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: e.to_string(),
            })?;

        // Acquire exclusive lock
        file.try_lock_exclusive()
            .map_err(|e| XervError::ArenaCreate {
                path: path.clone(),
                cause: format!("Failed to lock file: {}", e),
            })?;

        let metadata = file.metadata().map_err(|e| XervError::ArenaCreate {
            path: path.clone(),
            cause: e.to_string(),
        })?;

        let capacity = metadata.len();

        // Memory map the file.
        //
        // SAFETY: as in `create`, the exclusive lock guards against external mutation.
        let mmap = unsafe {
            MmapOptions::new()
                .len(capacity as usize)
                .map_mut(&file)
                .map_err(|e| XervError::ArenaMmap {
                    path: path.clone(),
                    cause: e.to_string(),
                })?
        };

        // Read and validate header
        let header = ArenaHeader::from_bytes(&mmap[..HEADER_SIZE]).map_err(|e| {
            XervError::ArenaCorruption {
                offset: ArenaOffset::new(0),
                cause: e.to_string(),
            }
        })?;

        header.validate().map_err(|e| XervError::ArenaCorruption {
            offset: ArenaOffset::new(0),
            cause: e.to_string(),
        })?;

        let trace_id = header.trace_id;

        let inner = ArenaInner {
            mmap,
            file,
            path,
            write_pos: AtomicU64::new(header.write_pos.as_u64()),
            header,
            sync_on_write: false, // reopened arenas default to lazy (no) sync
            capacity,
        };

        Ok(Self {
            inner: Arc::new(RwLock::new(inner)),
            trace_id,
        })
    }
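
    // Reopen sketch (assuming a `trace_id` and `config` as above): a dropped
    // arena persists its final write position via `update_header`, so a later
    // `open` resumes where writes left off:
    //
    //     let path = { Arena::create(trace_id, &config)?.path() };
    //     let arena = Arena::open(&path)?; // write_pos restored from the header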

    /// Get the trace ID for this arena.
    pub fn trace_id(&self) -> TraceId {
        self.trace_id
    }

    /// Get the current write position.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }

    /// Get available space in bytes.
    pub fn available_space(&self) -> u64 {
        let inner = self.inner.read();
        let write_pos = inner.write_pos.load(Ordering::Acquire);
        inner.capacity.saturating_sub(write_pos)
    }

    /// Get the path to the arena file.
    pub fn path(&self) -> PathBuf {
        self.inner.read().path.clone()
    }

    /// Write serialized bytes to the arena and return a relative pointer.
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        let mut inner = self.inner.write();

        // Calculate aligned size
        let size = bytes.len();
        let aligned_size = (size + ENTRY_ALIGNMENT - 1) & !(ENTRY_ALIGNMENT - 1);

        // Check capacity
        let write_pos = inner.write_pos.load(Ordering::Acquire);
        let new_pos = write_pos + aligned_size as u64;

        if new_pos > inner.capacity {
            return Err(XervError::ArenaCapacity {
                requested: aligned_size as u64,
                available: inner.capacity - write_pos,
            });
        }

        // Write the data
        let offset = ArenaOffset::new(write_pos);
        inner.mmap[write_pos as usize..write_pos as usize + size].copy_from_slice(bytes);

        // Zero padding for alignment
        if aligned_size > size {
            inner.mmap[write_pos as usize + size..write_pos as usize + aligned_size].fill(0);
        }

        // Update write position
        inner.write_pos.store(new_pos, Ordering::Release);

        // Sync if configured
        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset,
                cause: e.to_string(),
            })?;
        }

        Ok(RelPtr::new(offset, size as u32))
    }

    /// Read bytes from the arena.
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        let inner = self.inner.read();

        let off = offset.as_u64() as usize;
        let write_pos = inner.write_pos.load(Ordering::Acquire) as usize;

        // Validate offset and size
        if off + size > write_pos {
            return Err(XervError::ArenaInvalidOffset {
                offset,
                cause: format!(
                    "Offset {} + size {} exceeds write position {}",
                    off, size, write_pos
                ),
            });
        }

        Ok(inner.mmap[off..off + size].to_vec())
    }

    /// Flush all pending writes to disk.
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            // Read the position through the guard we already hold instead of
            // calling `self.write_position()`, which would re-lock `inner`.
            offset: ArenaOffset::new(inner.write_pos.load(Ordering::Acquire)),
            cause: e.to_string(),
        })
    }

    /// Persist the current write position into the on-disk header (called before
    /// the arena is closed).
    fn update_header(&self) -> Result<()> {
        let mut inner = self.inner.write();
        inner.header.write_pos = ArenaOffset::new(inner.write_pos.load(Ordering::Acquire));

        let header_bytes = inner.header.to_bytes().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset: ArenaOffset::new(0),
            cause: e.to_string(),
        })?;

        inner.mmap[..HEADER_SIZE].copy_from_slice(&header_bytes);

        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset: ArenaOffset::new(0),
                cause: e.to_string(),
            })?;
        }

        Ok(())
    }

    /// Create a reader handle for this arena.
    pub fn reader(&self) -> ArenaReader {
        ArenaReader {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Create a writer handle for this arena.
    pub fn writer(&self) -> ArenaWriter {
        ArenaWriter {
            inner: Arc::clone(&self.inner),
            trace_id: self.trace_id,
        }
    }
}

impl Drop for Arena {
    fn drop(&mut self) {
        // Update header with final write position before closing
        let _ = self.update_header();

        // Unlock the file. This only succeeds when no reader/writer handles are
        // still alive; otherwise the OS releases the advisory lock once the last
        // handle drops the file.
        if let Some(inner) = Arc::get_mut(&mut self.inner) {
            let inner = inner.get_mut();
            let _ = inner.file.unlock();
        }
    }
}

/// A read-only handle to an arena.
///
/// Multiple readers can exist concurrently.
#[derive(Clone)]
pub struct ArenaReader {
    inner: Arc<RwLock<ArenaInner>>,
}

impl ArenaReader {
    /// Read bytes from the arena.
    pub fn read_bytes(&self, offset: ArenaOffset, size: usize) -> Result<Vec<u8>> {
        let inner = self.inner.read();

        let off = offset.as_u64() as usize;
        let write_pos = inner.write_pos.load(Ordering::Acquire) as usize;

        if off + size > write_pos {
            return Err(XervError::ArenaInvalidOffset {
                offset,
                cause: format!(
                    "Offset {} + size {} exceeds write position {}",
                    off, size, write_pos
                ),
            });
        }

        Ok(inner.mmap[off..off + size].to_vec())
    }

    /// Get the current write position.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}
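
// Sketch of the intended concurrency pattern: `ArenaReader` is `Clone`, so read
// handles can be moved to other threads while the single writer appends. This
// assumes the handle is `Send` (its only field is an `Arc` over the mmap, file,
// and atomics):
//
//     let reader = arena.reader();
//     let t = std::thread::spawn(move || reader.write_position());
//     let _pos = t.join().unwrap();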

/// A write handle to an arena.
///
/// Only one writer should be active at a time for correctness.
pub struct ArenaWriter {
    inner: Arc<RwLock<ArenaInner>>,
    trace_id: TraceId,
}

impl ArenaWriter {
    /// Write bytes to the arena.
    pub fn write_bytes<T>(&self, bytes: &[u8]) -> Result<RelPtr<T>> {
        let mut inner = self.inner.write();

        let size = bytes.len();
        let aligned_size = (size + ENTRY_ALIGNMENT - 1) & !(ENTRY_ALIGNMENT - 1);

        let write_pos = inner.write_pos.load(Ordering::Acquire);
        let new_pos = write_pos + aligned_size as u64;

        if new_pos > inner.capacity {
            return Err(XervError::ArenaCapacity {
                requested: aligned_size as u64,
                available: inner.capacity - write_pos,
            });
        }

        let offset = ArenaOffset::new(write_pos);
        inner.mmap[write_pos as usize..write_pos as usize + size].copy_from_slice(bytes);

        if aligned_size > size {
            inner.mmap[write_pos as usize + size..write_pos as usize + aligned_size].fill(0);
        }

        inner.write_pos.store(new_pos, Ordering::Release);

        if inner.sync_on_write {
            inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
                trace_id: self.trace_id,
                offset,
                cause: e.to_string(),
            })?;
        }

        Ok(RelPtr::new(offset, size as u32))
    }

    /// Flush all pending writes to disk.
    pub fn flush(&self) -> Result<()> {
        let inner = self.inner.read();
        inner.mmap.flush().map_err(|e| XervError::ArenaWrite {
            trace_id: self.trace_id,
            offset: ArenaOffset::new(inner.write_pos.load(Ordering::Acquire)),
            cause: e.to_string(),
        })
    }

    /// Get the current write position.
    pub fn write_position(&self) -> ArenaOffset {
        let inner = self.inner.read();
        ArenaOffset::new(inner.write_pos.load(Ordering::Acquire))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn arena_create() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let trace_id = TraceId::new();
        let arena = Arena::create(trace_id, &config).unwrap();

        assert!(arena.path().exists());
        assert_eq!(arena.trace_id(), trace_id);
    }

    #[test]
    fn arena_write_and_read_bytes() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        let data = b"Hello, XERV!";
        let ptr: RelPtr<()> = arena.write_bytes(data).unwrap();
        assert!(!ptr.is_null());

        let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
        assert_eq!(&read_back, data);
    }

    #[test]
    fn arena_multiple_writes() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        let mut ptrs = Vec::new();
        for i in 0..100 {
            let data = format!("item_{}", i);
            let ptr: RelPtr<()> = arena.write_bytes(data.as_bytes()).unwrap();
            ptrs.push((ptr, data));
        }

        // Verify all data can be read back
        for (ptr, expected) in &ptrs {
            let read_back = arena.read_bytes(ptr.offset(), ptr.size() as usize).unwrap();
            assert_eq!(String::from_utf8(read_back).unwrap(), *expected);
        }
    }

    #[test]
    fn arena_capacity_check() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(256) // Deliberately tiny
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        // Writes should start failing once the 256-byte capacity is exhausted
        let data = "a".repeat(100);

        // Keep writing until we run out of space
        let mut writes = 0;
        while arena.write_bytes::<()>(data.as_bytes()).is_ok() {
            writes += 1;
            if writes > 10 {
                break; // Prevent an infinite loop in case the capacity check is broken
            }
        }

        // The capacity check must reject a write well before ten of them succeed
        assert!(writes < 10);
    }

    #[test]
    fn arena_reader_writer() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();
        let writer = arena.writer();
        let reader = arena.reader();

        let data = b"shared data";
        let ptr: RelPtr<()> = writer.write_bytes(data).unwrap();

        let read_back = reader
            .read_bytes(ptr.offset(), ptr.size() as usize)
            .unwrap();
        assert_eq!(&read_back, data);
    }
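
    // An additional check, sketched under the assumption that `HEADER_SIZE` (the
    // initial write position) is itself 8-byte aligned: every pointer the arena
    // returns should land on an `ENTRY_ALIGNMENT` boundary.
    #[test]
    fn arena_offsets_are_aligned() {
        let dir = tempdir().unwrap();
        let config = ArenaConfig::default()
            .with_capacity(1024 * 1024)
            .with_directory(dir.path());

        let arena = Arena::create(TraceId::new(), &config).unwrap();

        // Lengths 1..=16 exercise every possible padding amount at least once
        for len in 1..=16usize {
            let data = vec![0xAB_u8; len];
            let ptr: RelPtr<()> = arena.write_bytes(&data).unwrap();
            assert_eq!(ptr.offset().as_u64() % ENTRY_ALIGNMENT as u64, 0);
            assert_eq!(ptr.size() as usize, len);
        }
    }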
}