graft_kernel/local/fjall_storage/
keys.rs

1use bytes::Bytes;
2use bytestring::ByteString;
3use culprit::{Result, ResultExt};
4use fjall::Slice;
5use graft_core::{
6    PageIdx, SegmentId, VolumeId, cbe::CBE64, lsn::LSN, volume_ref::VolumeRef,
7    zerocopy_ext::TryFromBytesExt,
8};
9use zerocopy::{BigEndian, Immutable, IntoBytes, KnownLayout, TryFromBytes, U32, Unaligned};
10
11use crate::{
12    local::fjall_storage::fjall_repr::{DecodeErr, FjallRepr, FjallReprRef},
13    proxy_to_fjall_repr,
14};
15
/// Associates a key type with the type of its key prefix.
///
/// The prefix must be viewable as raw bytes. In this file each implementor
/// names the type of the leading field of its serialized form
/// (`VolumeRef` -> `VolumeId`, `PageKey` -> `SegmentId`).
// NOTE(review): presumably consumed by prefix scans over a partition;
// confirm against the callers of this trait.
pub trait FjallKeyPrefix {
    /// Type whose bytes form the leading portion of the encoded key.
    type Prefix: AsRef<[u8]>;
}
19
20impl FjallReprRef for VolumeId {
21    #[inline]
22    fn as_slice(&self) -> impl AsRef<[u8]> {
23        self.as_bytes()
24    }
25}
26
27impl FjallRepr for VolumeId {
28    #[inline]
29    fn try_from_slice(slice: Slice) -> Result<Self, DecodeErr> {
30        VolumeId::try_from(Bytes::from(slice)).or_into_ctx()
31    }
32}
33
34impl FjallReprRef for ByteString {
35    #[inline]
36    fn as_slice(&self) -> impl AsRef<[u8]> {
37        self
38    }
39}
40
41impl FjallRepr for ByteString {
42    #[inline]
43    fn try_from_slice(slice: Slice) -> Result<Self, DecodeErr> {
44        let bytes: Bytes = slice.into();
45        ByteString::try_from(bytes).or_into_ctx()
46    }
47}
48
49#[derive(IntoBytes, TryFromBytes, KnownLayout, Immutable, Unaligned)]
50#[repr(C)]
51struct SerializedVolumeRef {
52    vid: VolumeId,
53    lsn: CBE64,
54}
55
56impl AsRef<[u8]> for SerializedVolumeRef {
57    #[inline]
58    fn as_ref(&self) -> &[u8] {
59        self.as_bytes()
60    }
61}
62
63impl FjallKeyPrefix for VolumeRef {
64    type Prefix = VolumeId;
65}
66
67proxy_to_fjall_repr!(
68    encode (VolumeRef) using proxy (SerializedVolumeRef)
69    into_proxy(me) {
70        SerializedVolumeRef {
71            vid: me.vid,
72            lsn: me.lsn.into(),
73        }
74    }
75    from_proxy(proxy) {
76        Ok(VolumeRef {
77            vid: proxy.vid.clone(),
78            lsn: LSN::try_from(proxy.lsn)?,
79        })
80    }
81);
82
83/// Key for the `pages` partition
84#[derive(Debug, Clone, PartialEq, Eq, Hash)]
85pub struct PageKey {
86    sid: SegmentId,
87    pageidx: PageIdx,
88}
89
90impl PageKey {
91    #[inline]
92    pub fn new(sid: SegmentId, pageidx: PageIdx) -> Self {
93        Self { sid, pageidx }
94    }
95
96    #[inline]
97    pub fn sid(&self) -> &SegmentId {
98        &self.sid
99    }
100
101    #[inline]
102    pub fn pageidx(&self) -> &PageIdx {
103        &self.pageidx
104    }
105}
106
107#[derive(IntoBytes, TryFromBytes, KnownLayout, Immutable, Unaligned)]
108#[repr(C)]
109struct SerializedPageKey {
110    sid: SegmentId,
111    pageidx: U32<BigEndian>,
112}
113
114impl AsRef<[u8]> for SerializedPageKey {
115    #[inline]
116    fn as_ref(&self) -> &[u8] {
117        self.as_bytes()
118    }
119}
120
121impl FjallKeyPrefix for PageKey {
122    type Prefix = SegmentId;
123}
124
125proxy_to_fjall_repr!(
126    encode (PageKey) using proxy (SerializedPageKey)
127    into_proxy(me) {
128        SerializedPageKey {
129            sid: me.sid,
130            pageidx: me.pageidx.into(),
131        }
132    }
133    from_proxy(proxy) {
134        Ok(PageKey {
135            sid: proxy.sid.clone(),
136            pageidx: PageIdx::try_from(proxy.pageidx)?,
137        })
138    }
139);
140
#[cfg(test)]
mod tests {
    use graft_core::{lsn, pageidx};

    use crate::local::fjall_storage::fjall_repr::testutil::{
        test_invalid, test_roundtrip, test_serialized_order,
    };

    use super::*;

    /// `VolumeId` round-trips, and byte strings that are not volume ids are
    /// rejected.
    #[graft_test::test]
    fn test_volume_id() {
        let random_vid = VolumeId::random();
        test_roundtrip(random_vid);
        test_roundtrip(VolumeId::ZERO);

        // Empty input, arbitrary text, and the bytes of a segment id must all
        // fail to decode as a volume id.
        test_invalid::<VolumeId>(b"");
        test_invalid::<VolumeId>(b"asdf");
        let foreign_id = SegmentId::random();
        test_invalid::<VolumeId>(foreign_id.as_bytes());
    }

    /// `VolumeRef` keys round-trip, reject malformed input, and serialize in
    /// the expected order.
    #[graft_test::test]
    fn test_commit_key() {
        let sample = VolumeRef::new(VolumeId::random(), lsn!(123));
        test_roundtrip(sample);

        // zero LSN is invalid
        let zero_lsn = SerializedVolumeRef {
            vid: VolumeId::random(),
            lsn: CBE64::new(0),
        };
        test_invalid::<VolumeRef>(zero_lsn.as_bytes());

        // Truncated and empty inputs are rejected as well.
        test_invalid::<VolumeRef>(b"short");
        test_invalid::<VolumeRef>(b"");

        // Within one volume the serialized keys must sort by descending LSN;
        // the two volumes themselves are listed in vid1, vid2 order.
        let vid1 = "5rMJhdYXJ3-2e64STQSCVT8X".parse::<VolumeId>().unwrap();
        let vid2 = "5rMJhdYYXB-2e2iX9AHva3xQ".parse::<VolumeId>().unwrap();
        let expected_order = [
            VolumeRef::new(vid1.clone(), lsn!(4)),
            VolumeRef::new(vid1.clone(), lsn!(3)),
            VolumeRef::new(vid1.clone(), lsn!(2)),
            VolumeRef::new(vid1, lsn!(1)),
            VolumeRef::new(vid2.clone(), lsn!(2)),
            VolumeRef::new(vid2, lsn!(1)),
        ];
        test_serialized_order(&expected_order);
    }

    /// `PageKey` round-trips, rejects malformed input, and serializes in the
    /// expected order.
    #[graft_test::test]
    fn test_page_key() {
        let sample = PageKey::new(SegmentId::random(), pageidx!(42));
        test_roundtrip(sample);

        // zero page index is invalid
        let zero_index = SerializedPageKey {
            sid: SegmentId::random(),
            pageidx: 0.into(),
        };
        test_invalid::<PageKey>(zero_index.as_bytes());

        // Truncated and empty inputs are rejected as well.
        test_invalid::<PageKey>(b"short");
        test_invalid::<PageKey>(b"");

        // Within one segment the serialized keys must sort by ascending page
        // index; the two segments themselves are listed in sid1, sid2 order.
        let sid1 = "74ggYyz4aX-33cEC1Bm7Gekh".parse::<SegmentId>().unwrap();
        let sid2 = "74ggYyz7mA-33d6VHh4ENsxq".parse::<SegmentId>().unwrap();
        let expected_order = [
            PageKey::new(sid1.clone(), pageidx!(1)),
            PageKey::new(sid1.clone(), pageidx!(2)),
            PageKey::new(sid1.clone(), pageidx!(3)),
            PageKey::new(sid1, pageidx!(4)),
            PageKey::new(sid2.clone(), pageidx!(1)),
            PageKey::new(sid2, pageidx!(2)),
        ];
        test_serialized_order(&expected_order);
    }
}