1use std::collections::BTreeMap;
9use std::io;
10use std::io::Cursor;
11use std::io::Read;
12use std::io::Write;
13use std::path::Path;
14
15use byteorder::LittleEndian;
16use byteorder::ReadBytesExt;
17use byteorder::WriteBytesExt;
18use vlqencoding::VLQDecode;
19use vlqencoding::VLQEncode;
20
21use crate::errors::IoResultExt;
22use crate::utils;
23use crate::utils::atomic_read;
24use crate::utils::atomic_write;
25use crate::utils::xxhash;
26
/// In-memory form of the metadata that [`LogMetadata::read`] parses and
/// [`LogMetadata::write`] serializes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogMetadata {
    /// First integer of the serialized payload.
    /// Presumably the byte length of the primary log file - confirm against
    /// the code that consumes it.
    pub(crate) primary_len: u64,

    /// Per-index `u64` value keyed by index name. The parser calls the value
    /// `len`, so it is presumably the index file length - confirm with callers.
    pub(crate) indexes: BTreeMap<String, u64>,

    /// Last integer of the serialized payload; parsed as 0 when it cannot be
    /// read. Compared by `is_compatible_with` and randomly initialized by
    /// `new_with_primary_len`.
    pub(crate) epoch: u64,
}
40
41impl LogMetadata {
42 pub fn read<R: Read>(mut reader: R) -> io::Result<Self> {
44 let header = HeaderVersion::from_reader(&mut reader)?;
45 let hash: u64 = match header {
46 HeaderVersion::V0 => reader.read_vlq()?,
47 HeaderVersion::V1 => reader.read_u64::<LittleEndian>()?,
48 };
49 let buf_len = reader.read_vlq()?;
50
51 let mut buf = vec![0; buf_len];
52 reader.read_exact(&mut buf)?;
53
54 if xxhash(&buf) != hash {
55 let msg = "metadata integrity check failed";
56 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
57 }
58
59 let mut reader = Cursor::new(buf);
60 let primary_len = reader.read_vlq()?;
61 let index_count: usize = reader.read_vlq()?;
62 let mut indexes = BTreeMap::new();
63 for _ in 0..index_count {
64 let name_len = reader.read_vlq()?;
65 let mut name = vec![0; name_len];
66 reader.read_exact(&mut name)?;
67 let name = String::from_utf8(name).map_err(|_e| {
68 let msg = "non-utf8 index name";
69 io::Error::new(io::ErrorKind::InvalidData, msg)
70 })?;
71 let len = reader.read_vlq()?;
72 indexes.insert(name, len);
73 }
74
75 let epoch = reader.read_vlq().unwrap_or_default();
78
79 Ok(Self {
80 primary_len,
81 indexes,
82 epoch,
83 })
84 }
85
86 pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
88 let header = if cfg!(test) {
89 HeaderVersion::V1
90 } else {
91 HeaderVersion::V0
92 };
93 self.write_using_header(writer, header)
94 }
95
96 fn write_using_header<W: Write>(
98 &self,
99 writer: &mut W,
100 header: HeaderVersion,
101 ) -> io::Result<()> {
102 let mut buf = Vec::new();
103 buf.write_vlq(self.primary_len)?;
104 buf.write_vlq(self.indexes.len())?;
105 for (name, len) in self.indexes.iter() {
106 let name = name.as_bytes();
107 buf.write_vlq(name.len())?;
108 buf.write_all(name)?;
109 buf.write_vlq(*len)?;
110 }
111 buf.write_vlq(self.epoch)?;
112 writer.write_all(header.to_bytes())?;
113 match header {
114 HeaderVersion::V1 => writer.write_u64::<LittleEndian>(xxhash(&buf))?,
115 HeaderVersion::V0 => writer.write_vlq(xxhash(&buf))?,
116 }
117 writer.write_vlq(buf.len())?;
118 writer.write_all(&buf)?;
119
120 Ok(())
121 }
122
123 pub fn read_file<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
125 let path = path.as_ref();
126 let buf = atomic_read(path).context(path, "when reading LogMetadata")?;
127 Self::read(&buf[..]).context(path, || {
128 format!("when parsing LogMetadata (content: {:?})", &buf)
129 })
130 }
131
132 pub fn write_file<P: AsRef<Path>>(&self, path: P, fsync: bool) -> crate::Result<()> {
134 let mut buf = Vec::new();
135 self.write(&mut buf).infallible()?;
136 atomic_write(path, &buf, fsync)?;
137 Ok(())
138 }
139
140 pub(crate) fn new_with_primary_len(len: u64) -> Self {
145 Self {
146 primary_len: len,
147 indexes: BTreeMap::new(),
148 epoch: utils::rand_u64(),
149 }
150 }
151
152 pub(crate) fn is_compatible_with(&self, other: &Self) -> bool {
155 self.primary_len == other.primary_len && self.epoch == other.epoch
156 }
157}
158
/// Format version of serialized metadata, identified by the 5-byte header at
/// the start of the data.
#[derive(Copy, Clone, Debug)]
enum HeaderVersion {
    /// Header `meta\0`.
    V0,

    /// Header `meta\x01`.
    V1,
}

impl HeaderVersion {
    const HEADER_V0: &'static [u8] = b"meta\0";
    const HEADER_V1: &'static [u8] = b"meta\x01";

    /// Consume the header bytes from `reader` and identify the version.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` if the bytes match neither known header, or the
    /// underlying error (`UnexpectedEof` for short input) if the header cannot
    /// be read in full.
    fn from_reader(reader: &mut dyn Read) -> io::Result<Self> {
        // A single fixed-size read is only valid while every header has the
        // same length, so compare V0 against V1 (not against itself).
        assert_eq!(Self::HEADER_V0.len(), Self::HEADER_V1.len());
        let mut header = vec![0; Self::HEADER_V0.len()];
        reader.read_exact(&mut header)?;
        if header == Self::HEADER_V1 {
            Ok(Self::V1)
        } else if header == Self::HEADER_V0 {
            Ok(Self::V0)
        } else {
            let msg = "invalid metadata header";
            Err(io::Error::new(io::ErrorKind::InvalidData, msg))
        }
    }

    /// Return the header bytes that identify this version on disk.
    fn to_bytes(&self) -> &'static [u8] {
        match self {
            Self::V0 => Self::HEADER_V0,
            Self::V1 => Self::HEADER_V1,
        }
    }
}
192
#[cfg(test)]
mod tests {
    use quickcheck::quickcheck;
    use tempfile::tempdir;

    use super::*;

    quickcheck! {
        // Serializing with the default header and parsing the bytes back must
        // reproduce the original value.
        fn test_roundtrip_meta(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            let expected = LogMetadata { primary_len, indexes, epoch };
            let mut bytes = Vec::new();
            expected.write(&mut bytes).expect("write");
            let mut cursor = Cursor::new(bytes);
            let actual = LogMetadata::read(&mut cursor).expect("read");
            actual == expected
        }

        // Same roundtrip, but forcing the V0 header.
        fn test_roundtrip_meta_v0(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            let expected = LogMetadata { primary_len, indexes, epoch };
            let mut bytes = Vec::new();
            expected
                .write_using_header(&mut bytes, HeaderVersion::V0)
                .expect("write");
            let mut cursor = Cursor::new(bytes);
            let actual = LogMetadata::read(&mut cursor).expect("read");
            actual == expected
        }

        // Roundtrip through a real file in a temporary directory.
        fn test_roundtrip_meta_file(primary_len: u64, indexes: BTreeMap<String, u64>, epoch: u64) -> bool {
            // `tmp` must stay alive until the file has been read back.
            let tmp = tempdir().unwrap();
            let file = tmp.path().join("meta");
            let expected = LogMetadata { primary_len, indexes, epoch };
            expected.write_file(&file, false).expect("write_file");
            let actual = LogMetadata::read_file(&file).expect("read_file");
            actual == expected
        }
    }

    /// A corrupted file must fail to parse, and the error text must embed the
    /// debug-formatted file content.
    #[test]
    fn test_read_file_includes_file_content_on_error() {
        let tmp = tempdir().unwrap();
        let file = tmp.path().join("meta");

        let mut bytes: Vec<u8> = Vec::new();
        LogMetadata {
            primary_len: 1,
            indexes: Default::default(),
            epoch: 42,
        }
        .write(&mut bytes)
        .unwrap();

        // Flip one bit of the final payload byte so the checksum no longer matches.
        let tail = bytes.last_mut().unwrap();
        *tail ^= 1;
        std::fs::write(&file, &bytes).unwrap();

        let message = LogMetadata::read_file(&file).unwrap_err().to_string();
        let dumped = format!("{:?}", &bytes);
        assert!(message.contains(&dumped));
    }
}