Skip to main content

rta/
lib.rs

1use core::{marker::PhantomData, slice};
2use frozen_core::{
3    fe::FRes,
4    ff::{FFCfg, FF},
5    fm::{FMCfg, FM},
6};
7
/// Module identifier passed as `module_id` in both the file (`FFCfg`) and mmap (`FMCfg`)
/// configurations built by this crate.
const MOD_ID: u8 = 0x01;
/// Interval (250 ms) passed as `flush_duration` in both the file and mmap configurations.
const FLUSH_DURATION: std::time::Duration = std::time::Duration::from_millis(250);
10
11pub use rta_derive::RTA;
12
/// Contract for values that can be persisted through [`Rta`].
///
/// # Safety
///
/// `Rta` views a value of the implementing type as `size_of::<Self>()` raw bytes when it
/// computes checksums, and stores the value directly inside a memory-mapped file.
/// NOTE(review): implementors are therefore presumably required to be plain data (no padding
/// bytes, no pointers or other process-local state) — confirm against the `rta_derive` macro,
/// which is the intended way to implement this trait.
pub unsafe trait RTA: Default + Clone + Sized {
    /// Fingerprint of the implementing type.
    ///
    /// `Rta::new` stores it in the file header and `Rta::open` panics when the stored value
    /// differs from this constant.
    const HASH: u64;
}
16
17pub struct Rta<T: RTA> {
18    mmap: FM,
19    _file: FF,
20    _marker: PhantomData<T>,
21    lock: std::sync::Mutex<()>,
22}
23
24impl<T> Rta<T>
25where
26    T: RTA + Clone + Sized + Default,
27{
28    const FILE_SIZE: usize = core::mem::size_of::<DiskInterface<T>>();
29
30    pub fn new(path: std::path::PathBuf) -> FRes<Self> {
31        if path.exists() {
32            panic!("invalid path, path to already existing file");
33        }
34
35        if path.is_dir() {
36            panic!("path must be of a file, not dir");
37        }
38
39        let file_cfg = FFCfg {
40            path,
41            auto_flush: false,
42            module_id: MOD_ID,
43            flush_duration: FLUSH_DURATION,
44        };
45        let mmap_cfg = FMCfg {
46            module_id: MOD_ID,
47            auto_flush: true,
48            flush_duration: FLUSH_DURATION,
49        };
50
51        let _file = FF::new(file_cfg, Self::FILE_SIZE as u64)?;
52        let mmap = FM::new(_file.fd(), Self::FILE_SIZE, mmap_cfg)?;
53
54        {
55            let writer = mmap.writer::<DiskInterface<T>>(0)?;
56            writer.write(|di| {
57                di.hash = T::HASH;
58
59                di.obja.obj = T::default();
60                di.obja.ver = 1;
61                di.obja.crc = crc32(Self::to_bytes(&di.obja.obj));
62
63                di.objb = di.obja.clone();
64            })?;
65        }
66
67        Ok(Self {
68            _file,
69            mmap,
70            _marker: PhantomData,
71            lock: std::sync::Mutex::new(()),
72        })
73    }
74
75    pub fn open(path: std::path::PathBuf) -> FRes<Self> {
76        if !path.exists() {
77            panic!("Rta does not exists");
78        }
79
80        if !path.is_file() {
81            panic!("Path is not a file");
82        }
83
84        let file_cfg = FFCfg {
85            path,
86            auto_flush: false,
87            module_id: MOD_ID,
88            flush_duration: FLUSH_DURATION,
89        };
90        let mmap_cfg = FMCfg {
91            module_id: MOD_ID,
92            auto_flush: true,
93            flush_duration: FLUSH_DURATION,
94        };
95
96        let _file = FF::open(file_cfg)?;
97        let mmap = FM::new(_file.fd(), Self::FILE_SIZE, mmap_cfg)?;
98
99        {
100            let r = mmap.reader::<DiskInterface<T>>(0)?;
101            r.read(|di| {
102                if di.hash != T::HASH {
103                    panic!("metadata hash mismatch");
104                }
105
106                let a = Self::valid(&di.obja);
107                let b = Self::valid(&di.objb);
108
109                if !a && !b {
110                    panic!("both metadata copies corrupt");
111                }
112            });
113        }
114
115        Ok(Self {
116            _file,
117            mmap,
118            _marker: PhantomData,
119            lock: std::sync::Mutex::new(()),
120        })
121    }
122
123    pub fn size() -> usize {
124        core::mem::size_of::<T>()
125    }
126
127    pub fn hash() -> u64 {
128        T::HASH
129    }
130
131    #[inline(always)]
132    pub fn read(&self) -> FRes<T> {
133        let r = self.mmap.reader::<DiskInterface<T>>(0)?;
134        let val = r.read(|di| {
135            let a_valid = Self::valid(&di.obja);
136            let b_valid = Self::valid(&di.objb);
137
138            match (a_valid, b_valid) {
139                (true, true) => {
140                    if di.obja.ver >= di.objb.ver {
141                        di.obja.obj.clone()
142                    } else {
143                        di.objb.obj.clone()
144                    }
145                }
146                (true, false) => di.obja.obj.clone(),
147                (false, true) => di.objb.obj.clone(),
148                (false, false) => panic!("both metadata copies corrupt"),
149            }
150        });
151
152        Ok(val)
153    }
154
155    #[inline(always)]
156    pub fn write(&self, new_val: &T) -> FRes<()> {
157        let _g = self.lock.lock().unwrap();
158        let w = self.mmap.writer::<DiskInterface<T>>(0)?;
159
160        w.write(|di| {
161            let max_ver = di.obja.ver.max(di.objb.ver);
162            let target = Self::select_oldest_mut(di);
163
164            target.obj = new_val.clone();
165            target.ver = max_ver.wrapping_add(1);
166            target.crc = crc32(Self::to_bytes(&target.obj));
167        })?;
168
169        Ok(())
170    }
171
172    #[inline]
173    fn to_bytes(t: &T) -> &[u8] {
174        unsafe { slice::from_raw_parts(t as *const T as *const u8, Self::size()) }
175    }
176
177    #[inline]
178    fn select_oldest_mut(di: &mut DiskInterface<T>) -> &mut DiskObject<T> {
179        if di.obja.ver <= di.objb.ver {
180            &mut di.obja
181        } else {
182            &mut di.objb
183        }
184    }
185
186    #[inline]
187    fn valid(obj: &DiskObject<T>) -> bool {
188        crc32(Self::to_bytes(&obj.obj)) == obj.crc
189    }
190}
191
/// Computes the checksum used to validate stored objects.
///
/// The algorithm is CRC-32C (Castagnoli, reflected polynomial `0x82F63B78`) with an initial
/// value of `0` and no final XOR. It is therefore *not* interchangeable with the usual
/// "inverted" CRC-32C, and an all-zero input of any length yields `0`.
///
/// On x86_64 CPUs that report SSE4.2 at runtime the hardware `crc32` instruction is used;
/// everywhere else a bitwise software loop produces the same value.
#[inline]
fn crc32(bytes: &[u8]) -> u32 {
    // The detection macro and the intrinsics only exist on x86 targets; gating them keeps the
    // crate buildable on every other architecture.
    #[cfg(target_arch = "x86_64")]
    {
        if std::is_x86_feature_detected!("sse4.2") {
            // SAFETY: the runtime check above guarantees the CPU supports SSE4.2, which is the
            // only requirement of `crc32_sse42`.
            return unsafe { crc32_sse42(bytes) };
        }
    }

    crc32_soft(bytes)
}

/// Hardware CRC-32C over `bytes`, eight bytes at a time with a byte-wise tail.
///
/// # Safety
///
/// The caller must ensure the running CPU supports SSE4.2.
#[cfg(target_arch = "x86_64")]
#[target_feature(enable = "sse4.2")]
unsafe fn crc32_sse42(bytes: &[u8]) -> u32 {
    use std::arch::x86_64::{_mm_crc32_u64, _mm_crc32_u8};

    let mut chunks = bytes.chunks_exact(8);
    let mut wide: u64 = 0;

    for chunk in chunks.by_ref() {
        let mut word = [0u8; 8];
        word.copy_from_slice(chunk);
        // Little-endian assembly matches feeding the same bytes one at a time.
        wide = _mm_crc32_u64(wide, u64::from_le_bytes(word));
    }

    // The instruction only ever populates the low 32 bits, so the cast loses nothing.
    let mut crc = wide as u32;
    for &byte in chunks.remainder() {
        crc = _mm_crc32_u8(crc, byte);
    }

    crc
}

/// Portable bitwise CRC-32C producing the same value as the hardware path.
fn crc32_soft(bytes: &[u8]) -> u32 {
    const POLY: u32 = 0x82F6_3B78;

    bytes.iter().fold(0u32, |crc, &byte| {
        (0..8).fold(crc ^ u32::from(byte), |acc, _| {
            if acc & 1 != 0 {
                (acc >> 1) ^ POLY
            } else {
                acc >> 1
            }
        })
    })
}
236
237#[repr(C)]
238struct DiskInterface<T: RTA> {
239    hash: u64,
240    obja: DiskObject<T>,
241    objb: DiskObject<T>,
242}
243
244#[repr(C)]
245#[derive(Clone)]
246struct DiskObject<T: RTA> {
247    obj: T,
248    ver: u32,
249    crc: u32,
250}