Skip to main content

ringdb/payload/
serde.rs

1use memmap2::Mmap;
2use serde::{Serialize, de::DeserializeOwned};
3use std::{
4    fs::File,
5    io::{BufWriter, Write},
6    marker::PhantomData,
7    path::Path,
8};
9use tempfile::TempPath;
10
11use super::OwnedPayloadStore;
12use super::traits::{PayloadBuilderOps, open_mmap};
13use crate::error::{Result, RingDbError};
14use crate::persist::{move_file, read_u64_file, write_u64_file};
15
16// ─── SerdeStoreBuilder ────────────────────────────────────────────────────────
17
18pub struct SerdeStoreBuilder<T> {
19    writer: BufWriter<File>,
20    temp_path: TempPath,
21    offsets: Vec<u64>,
22    cursor: u64,
23    _marker: PhantomData<T>,
24}
25
26impl<T: Serialize> SerdeStoreBuilder<T> {
27    pub fn new() -> Result<Self> {
28        let named = tempfile::NamedTempFile::new()?;
29        let (file, temp_path) = named.into_parts();
30        Ok(Self {
31            writer: BufWriter::new(file),
32            temp_path,
33            offsets: vec![0u64],
34            cursor: 0,
35            _marker: PhantomData,
36        })
37    }
38
39    fn push_inner(&mut self, payload: T) -> Result<()> {
40        let bytes =
41            bincode::serialize(&payload).map_err(|e| RingDbError::Payload(e.to_string()))?;
42        self.writer.write_all(&bytes)?;
43        self.cursor += bytes.len() as u64;
44        self.offsets.push(self.cursor);
45        Ok(())
46    }
47
48    fn finish_inner(self) -> Result<SerdeStore<T>> {
49        let Self {
50            writer,
51            temp_path,
52            offsets,
53            cursor,
54            _marker,
55        } = self;
56        writer.into_inner().map_err(|e| e.into_error())?;
57        let mmap = open_mmap(temp_path.as_ref(), cursor)?;
58        Ok(SerdeStore {
59            mmap,
60            offsets,
61            _marker: PhantomData,
62        })
63    }
64
65    fn finish_persisted_inner(
66        self,
67        payloads_path: &Path,
68        offsets_path: &Path,
69    ) -> Result<SerdeStore<T>> {
70        let Self {
71            writer,
72            temp_path,
73            offsets,
74            cursor,
75            _marker,
76        } = self;
77        writer.into_inner().map_err(|e| e.into_error())?;
78        write_u64_file(offsets_path, &offsets)?;
79        // move_file handles cross-filesystem moves with a copy fallback.
80        // If it fails, temp_path is still alive and its Drop will clean up.
81        move_file(temp_path.as_ref(), payloads_path)?;
82        let mmap = open_mmap(payloads_path, cursor)?;
83        Ok(SerdeStore {
84            mmap,
85            offsets,
86            _marker: PhantomData,
87        })
88    }
89}
90
91impl<T: Serialize> PayloadBuilderOps<T> for SerdeStoreBuilder<T> {
92    type Store = SerdeStore<T>;
93
94    fn push(&mut self, payload: T) -> Result<()> {
95        self.push_inner(payload)
96    }
97
98    fn finish(self) -> Result<SerdeStore<T>> {
99        self.finish_inner()
100    }
101
102    fn finish_persisted(self, payloads_path: &Path, offsets_path: &Path) -> Result<SerdeStore<T>> {
103        self.finish_persisted_inner(payloads_path, offsets_path)
104    }
105}
106
107// ─── SerdeStore ───────────────────────────────────────────────────────────────
108
109pub struct SerdeStore<T> {
110    mmap: Option<Mmap>,
111    offsets: Vec<u64>,
112    _marker: PhantomData<T>,
113}
114
115impl<T: DeserializeOwned> SerdeStore<T> {
116    pub fn load(payloads_path: &Path, offsets_path: &Path) -> Result<Self> {
117        let offsets = read_u64_file(offsets_path)?;
118        let total_bytes = offsets.last().copied().unwrap_or(0);
119        let mmap = open_mmap(payloads_path, total_bytes)?;
120        Ok(SerdeStore {
121            mmap,
122            offsets,
123            _marker: PhantomData,
124        })
125    }
126
127    fn fetch_inner(&self, id: u32) -> Result<T> {
128        let idx = id as usize;
129        let start = self.offsets[idx] as usize;
130        let end = self.offsets[idx + 1] as usize;
131        let bytes = match &self.mmap {
132            Some(mmap) => &mmap[start..end],
133            None => &[],
134        };
135        bincode::deserialize(bytes).map_err(|e| RingDbError::Payload(e.to_string()))
136    }
137}
138
139impl<T: DeserializeOwned> OwnedPayloadStore<T> for SerdeStore<T> {
140    fn fetch_owned(&self, id: u32) -> Result<T> {
141        self.fetch_inner(id)
142    }
143}