1use memmap2::Mmap;
2use serde::{Serialize, de::DeserializeOwned};
3use std::{
4 fs::File,
5 io::{BufWriter, Write},
6 marker::PhantomData,
7 path::Path,
8};
9use tempfile::TempPath;
10
11use super::OwnedPayloadStore;
12use super::traits::{PayloadBuilderOps, open_mmap};
13use crate::error::{Result, RingDbError};
14use crate::persist::{move_file, read_u64_file, write_u64_file};
15
16pub struct SerdeStoreBuilder<T> {
19 writer: BufWriter<File>,
20 temp_path: TempPath,
21 offsets: Vec<u64>,
22 cursor: u64,
23 _marker: PhantomData<T>,
24}
25
26impl<T: Serialize> SerdeStoreBuilder<T> {
27 pub fn new() -> Result<Self> {
28 let named = tempfile::NamedTempFile::new()?;
29 let (file, temp_path) = named.into_parts();
30 Ok(Self {
31 writer: BufWriter::new(file),
32 temp_path,
33 offsets: vec![0u64],
34 cursor: 0,
35 _marker: PhantomData,
36 })
37 }
38
39 fn push_inner(&mut self, payload: T) -> Result<()> {
40 let bytes =
41 bincode::serialize(&payload).map_err(|e| RingDbError::Payload(e.to_string()))?;
42 self.writer.write_all(&bytes)?;
43 self.cursor += bytes.len() as u64;
44 self.offsets.push(self.cursor);
45 Ok(())
46 }
47
48 fn finish_inner(self) -> Result<SerdeStore<T>> {
49 let Self {
50 writer,
51 temp_path,
52 offsets,
53 cursor,
54 _marker,
55 } = self;
56 writer.into_inner().map_err(|e| e.into_error())?;
57 let mmap = open_mmap(temp_path.as_ref(), cursor)?;
58 Ok(SerdeStore {
59 mmap,
60 offsets,
61 _marker: PhantomData,
62 })
63 }
64
65 fn finish_persisted_inner(
66 self,
67 payloads_path: &Path,
68 offsets_path: &Path,
69 ) -> Result<SerdeStore<T>> {
70 let Self {
71 writer,
72 temp_path,
73 offsets,
74 cursor,
75 _marker,
76 } = self;
77 writer.into_inner().map_err(|e| e.into_error())?;
78 write_u64_file(offsets_path, &offsets)?;
79 move_file(temp_path.as_ref(), payloads_path)?;
82 let mmap = open_mmap(payloads_path, cursor)?;
83 Ok(SerdeStore {
84 mmap,
85 offsets,
86 _marker: PhantomData,
87 })
88 }
89}
90
91impl<T: Serialize> PayloadBuilderOps<T> for SerdeStoreBuilder<T> {
92 type Store = SerdeStore<T>;
93
94 fn push(&mut self, payload: T) -> Result<()> {
95 self.push_inner(payload)
96 }
97
98 fn finish(self) -> Result<SerdeStore<T>> {
99 self.finish_inner()
100 }
101
102 fn finish_persisted(self, payloads_path: &Path, offsets_path: &Path) -> Result<SerdeStore<T>> {
103 self.finish_persisted_inner(payloads_path, offsets_path)
104 }
105}
106
107pub struct SerdeStore<T> {
110 mmap: Option<Mmap>,
111 offsets: Vec<u64>,
112 _marker: PhantomData<T>,
113}
114
115impl<T: DeserializeOwned> SerdeStore<T> {
116 pub fn load(payloads_path: &Path, offsets_path: &Path) -> Result<Self> {
117 let offsets = read_u64_file(offsets_path)?;
118 let total_bytes = offsets.last().copied().unwrap_or(0);
119 let mmap = open_mmap(payloads_path, total_bytes)?;
120 Ok(SerdeStore {
121 mmap,
122 offsets,
123 _marker: PhantomData,
124 })
125 }
126
127 fn fetch_inner(&self, id: u32) -> Result<T> {
128 let idx = id as usize;
129 let start = self.offsets[idx] as usize;
130 let end = self.offsets[idx + 1] as usize;
131 let bytes = match &self.mmap {
132 Some(mmap) => &mmap[start..end],
133 None => &[],
134 };
135 bincode::deserialize(bytes).map_err(|e| RingDbError::Payload(e.to_string()))
136 }
137}
138
139impl<T: DeserializeOwned> OwnedPayloadStore<T> for SerdeStore<T> {
140 fn fetch_owned(&self, id: u32) -> Result<T> {
141 self.fetch_inner(id)
142 }
143}