indexedlog/log/
meta.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::collections::BTreeMap;
9use std::io;
10use std::io::Cursor;
11use std::io::Read;
12use std::io::Write;
13use std::path::Path;
14
15use byteorder::LittleEndian;
16use byteorder::ReadBytesExt;
17use byteorder::WriteBytesExt;
18use vlqencoding::VLQDecode;
19use vlqencoding::VLQEncode;
20
21use crate::errors::IoResultExt;
22use crate::utils;
23use crate::utils::atomic_read;
24use crate::utils::atomic_write;
25use crate::utils::xxhash;
26
/// Bookkeeping for a log: index names plus the logical lengths of the
/// [`Log`] and [`Index`] files.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogMetadata {
    /// Logical length of the primary log file.
    pub(crate) primary_len: u64,

    /// Logical length of each index file, keyed by index name.
    pub(crate) indexes: BTreeMap<String, u64>,

    /// Conceptually similar to a "create time"; used to detect
    /// changes that are not append-only.
    pub(crate) epoch: u64,
}
40
41impl LogMetadata {
42    /// Read metadata from a reader.
43    pub fn read<R: Read>(mut reader: R) -> io::Result<Self> {
44        let header = HeaderVersion::from_reader(&mut reader)?;
45        let hash: u64 = match header {
46            HeaderVersion::V0 => reader.read_vlq()?,
47            HeaderVersion::V1 => reader.read_u64::<LittleEndian>()?,
48        };
49        let buf_len = reader.read_vlq()?;
50
51        let mut buf = vec![0; buf_len];
52        reader.read_exact(&mut buf)?;
53
54        if xxhash(&buf) != hash {
55            let msg = "metadata integrity check failed";
56            return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
57        }
58
59        let mut reader = Cursor::new(buf);
60        let primary_len = reader.read_vlq()?;
61        let index_count: usize = reader.read_vlq()?;
62        let mut indexes = BTreeMap::new();
63        for _ in 0..index_count {
64            let name_len = reader.read_vlq()?;
65            let mut name = vec![0; name_len];
66            reader.read_exact(&mut name)?;
67            let name = String::from_utf8(name).map_err(|_e| {
68                let msg = "non-utf8 index name";
69                io::Error::new(io::ErrorKind::InvalidData, msg)
70            })?;
71            let len = reader.read_vlq()?;
72            indexes.insert(name, len);
73        }
74
75        // 'epoch' is optional - it does not exist in a previous serialization
76        // format. So not being able to read it (because EOF) is not fatal.
77        let epoch = reader.read_vlq().unwrap_or_default();
78
79        Ok(Self {
80            primary_len,
81            indexes,
82            epoch,
83        })
84    }
85
86    /// Write metadata to a writer.
87    pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
88        let header = if cfg!(test) {
89            HeaderVersion::V1
90        } else {
91            HeaderVersion::V0
92        };
93        self.write_using_header(writer, header)
94    }
95
96    /// Write using specified header. Used by tests.
97    fn write_using_header<W: Write>(
98        &self,
99        writer: &mut W,
100        header: HeaderVersion,
101    ) -> io::Result<()> {
102        let mut buf = Vec::new();
103        buf.write_vlq(self.primary_len)?;
104        buf.write_vlq(self.indexes.len())?;
105        for (name, len) in self.indexes.iter() {
106            let name = name.as_bytes();
107            buf.write_vlq(name.len())?;
108            buf.write_all(name)?;
109            buf.write_vlq(*len)?;
110        }
111        buf.write_vlq(self.epoch)?;
112        writer.write_all(header.to_bytes())?;
113        match header {
114            HeaderVersion::V1 => writer.write_u64::<LittleEndian>(xxhash(&buf))?,
115            HeaderVersion::V0 => writer.write_vlq(xxhash(&buf))?,
116        }
117        writer.write_vlq(buf.len())?;
118        writer.write_all(&buf)?;
119
120        Ok(())
121    }
122
123    /// Read metadata from a file.
124    pub fn read_file<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
125        let path = path.as_ref();
126        let buf = atomic_read(path).context(path, "when reading LogMetadata")?;
127        Self::read(&buf[..]).context(path, || {
128            format!("when parsing LogMetadata (content: {:?})", &buf)
129        })
130    }
131
132    /// Atomically write metadata to a file.
133    pub fn write_file<P: AsRef<Path>>(&self, path: P, fsync: bool) -> crate::Result<()> {
134        let mut buf = Vec::new();
135        self.write(&mut buf).infallible()?;
136        atomic_write(path, &buf, fsync)?;
137        Ok(())
138    }
139
140    /// Create a new LogMetadata that matches the primary length with
141    /// empty indexes.
142    /// The caller must make sure the primary log is consistent (exists,
143    /// and covered the length).
144    pub(crate) fn new_with_primary_len(len: u64) -> Self {
145        Self {
146            primary_len: len,
147            indexes: BTreeMap::new(),
148            epoch: utils::rand_u64(),
149        }
150    }
151
152    /// Test if two Metadata is compatible, aka. having the same length
153    /// and epoch.
154    pub(crate) fn is_compatible_with(&self, other: &Self) -> bool {
155        self.primary_len == other.primary_len && self.epoch == other.epoch
156    }
157}
158
/// Version marker stored at the very beginning of serialized metadata.
#[derive(Copy, Clone, Debug)]
enum HeaderVersion {
    /// Header bytes `b"meta\0"`.
    V0,

    /// Header bytes `b"meta\x01"`.
    /// V1: xxhash uses fixed 8 bytes instead of vlq.
    V1,
}

impl HeaderVersion {
    const HEADER_V0: &'static [u8] = b"meta\0";
    const HEADER_V1: &'static [u8] = b"meta\x01";

    /// Consume the header bytes from `reader` and identify the version.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` if the bytes match neither known header.
    /// Errors from `read_exact` (for example `UnexpectedEof` on a short
    /// input) are propagated.
    fn from_reader(reader: &mut dyn Read) -> io::Result<Self> {
        // A fixed number of bytes is read before the version is known, so
        // every header must have the same length.
        assert_eq!(Self::HEADER_V0.len(), Self::HEADER_V1.len());
        let mut header = vec![0; Self::HEADER_V0.len()];
        reader.read_exact(&mut header)?;
        if header == Self::HEADER_V1 {
            Ok(Self::V1)
        } else if header == Self::HEADER_V0 {
            Ok(Self::V0)
        } else {
            let msg = "invalid metadata header";
            Err(io::Error::new(io::ErrorKind::InvalidData, msg))
        }
    }

    /// Return the header bytes that identify this version.
    fn to_bytes(&self) -> &'static [u8] {
        match self {
            Self::V0 => Self::HEADER_V0,
            Self::V1 => Self::HEADER_V1,
        }
    }
}
192
#[cfg(test)]
mod tests {
    use quickcheck::quickcheck;
    use tempfile::tempdir;

    use super::*;

    quickcheck! {
        // Round trip through `write`, which chooses the header version itself.
        fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            let original = LogMetadata { primary_len, indexes, epoch };
            let mut bytes = Vec::new();
            original.write(&mut bytes).expect("write");
            let decoded = LogMetadata::read(Cursor::new(bytes)).expect("read");
            decoded == original
        }

        // Round trip with the V0 header requested explicitly.
        fn test_roundtrip_meta_v0(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            let original = LogMetadata { primary_len, indexes, epoch };
            let mut bytes = Vec::new();
            original.write_using_header(&mut bytes, HeaderVersion::V0).expect("write");
            let decoded = LogMetadata::read(Cursor::new(bytes)).expect("read");
            decoded == original
        }

        // Round trip through a file on disk.
        fn test_roundtrip_meta_file(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            let tmp = tempdir().unwrap();
            let meta_path = tmp.path().join("meta");
            let original = LogMetadata { primary_len, indexes, epoch };
            original.write_file(&meta_path, false).expect("write_file");
            let decoded = LogMetadata::read_file(&meta_path).expect("read_file");
            decoded == original
        }
    }

    #[test]
    fn test_read_file_includes_file_content_on_error() {
        let tmp = tempdir().unwrap();
        let meta_path = tmp.path().join("meta");
        let meta = LogMetadata {
            primary_len: 1,
            indexes: Default::default(),
            epoch: 42,
        };
        let mut bytes: Vec<u8> = Vec::new();
        meta.write(&mut bytes).unwrap();
        // Flip one bit in the last byte so that parsing fails.
        *bytes.last_mut().unwrap() ^= 1;
        std::fs::write(&meta_path, &bytes).unwrap();
        let err = LogMetadata::read_file(&meta_path).unwrap_err();
        let expected = format!("{:?}", &bytes);
        assert!(err.to_string().contains(&expected));
    }
}