segments/disk/
mod.rs

1pub mod index;
2pub mod segment;
3
4use index::Index;
5use segment::Segment;
6
7use std::collections::HashMap;
8use std::fs;
9use std::io;
10use std::path::PathBuf;
11
12struct Chunk {
13    index: Index,
14    segment: Segment,
15}
16
17pub struct DiskLog {
18    dir: PathBuf,
19    max_segment_size: u64,
20    max_index_size: u64,
21    base_offsets: Vec<u64>,
22    max_segments: usize,
23    active_chunk: u64,
24    chunks: HashMap<u64, Chunk>,
25}
26
27impl DiskLog {
28    pub fn new<P: Into<PathBuf>>(
29        dir: P,
30        max_index_size: u64,
31        max_segment_size: u64,
32        max_segments: usize,
33    ) -> io::Result<DiskLog> {
34        let dir = dir.into();
35        let _ = fs::create_dir_all(&dir);
36        if max_segment_size < 1024 || max_index_size < 100 {
37            panic!("size should be at least 1KB")
38        }
39
40        let files = fs::read_dir(&dir)?;
41        let mut base_offsets = Vec::new();
42        for file in files {
43            let path = file?.path();
44            let offset = path.file_stem().unwrap().to_str().unwrap();
45            let offset = offset.parse::<u64>().unwrap();
46            base_offsets.push(offset);
47        }
48
49        base_offsets.sort();
50        let mut chunks = HashMap::new();
51
52        let active_segment = if let Some((last_offset, offsets)) = base_offsets.split_last() {
53            // Initialized filled segments
54            for base_offset in offsets.iter() {
55                let index = Index::new(&dir, *base_offset, max_index_size, false)?;
56                let segment = Segment::new(&dir, *base_offset)?;
57                let chunk = Chunk { index, segment };
58                chunks.insert(*base_offset, chunk);
59            }
60
61            // Initialize active segment
62            let index = Index::new(&dir, *last_offset, max_index_size, true)?;
63            let segment = Segment::new(&dir, *last_offset)?;
64            let mut chunk = Chunk { index, segment };
65
66            // Wrong counts due to unclosed segments are handled during initialization. We can just assume
67            // count is always right from here on
68            let next_offset = chunk.index.count();
69            chunk.segment.set_next_offset(next_offset);
70            chunks.insert(*last_offset, chunk);
71            *last_offset
72        } else {
73            let index = Index::new(&dir, 0, max_index_size, true)?;
74            let segment = Segment::new(&dir, 0)?;
75            let chunk = Chunk { index, segment };
76            chunks.insert(0, chunk);
77            base_offsets.push(0);
78            0
79        };
80
81        let log = DiskLog {
82            dir,
83            max_segment_size,
84            max_index_size,
85            max_segments,
86            base_offsets,
87            chunks,
88            active_chunk: active_segment,
89        };
90
91        Ok(log)
92    }
93
94    pub fn append(&mut self, record: &[u8]) -> io::Result<()> {
95        let active_chunk = if let Some(v) = self.chunks.get_mut(&self.active_chunk) {
96            v
97        } else {
98            return Err(io::Error::new(io::ErrorKind::Other, "No active segment"));
99        };
100
101        if active_chunk.segment.size() >= self.max_segment_size {
102            active_chunk.segment.close()?;
103            active_chunk.index.close()?;
104
105            // update active chunk
106            let base_offset = active_chunk.index.base_offset() + active_chunk.index.count();
107            let index = Index::new(&self.dir, base_offset, self.max_index_size, true)?;
108            let segment = Segment::new(&self.dir, base_offset)?;
109            let chunk = Chunk { index, segment };
110            self.chunks.insert(base_offset, chunk);
111            self.base_offsets.push(base_offset);
112            self.active_chunk = base_offset;
113
114            if self.base_offsets.len() > self.max_segments {
115                let remove_offset = self.base_offsets.remove(0);
116                self.remove(remove_offset)?;
117            }
118        }
119
120        // write record to segment and index
121        let active_chunk = self.chunks.get_mut(&self.active_chunk).unwrap();
122        let (_, position) = active_chunk.segment.append(record)?;
123        active_chunk.index.write(position, record.len() as u64)?;
124        Ok(())
125    }
126
127    /// Read a record from correct segment
128    /// Returns data, next base offset and relative offset
129    pub fn read(&mut self, base_offset: u64, offset: u64) -> io::Result<Vec<u8>> {
130        let chunk = match self.chunks.get_mut(&base_offset) {
131            Some(segment) => segment,
132            None => {
133                return Err(io::Error::new(
134                    io::ErrorKind::InvalidInput,
135                    "Invalid segment",
136                ))
137            }
138        };
139
140        let (position, len) = chunk.index.read(offset)?;
141        let mut payload = vec![0; len as usize];
142        chunk.segment.read(position, &mut payload)?;
143        Ok(payload)
144    }
145
146    /// Goes through index and returns chunks which tell how to sweep segments to collect
147    /// necessary amount on data asked by the user
148    /// Corner cases:
149    /// When there is more data (in other segments) current eof should move to next segment
150    /// Empty segments are possible after moving to next segment
151    /// EOFs after some data is collected are not errors
152    fn indexv(&self, base_offset: u64, relative_offset: u64, size: u64) -> io::Result<Chunks> {
153        let mut chunks = Chunks {
154            base_offset,
155            relative_offset,
156            count: 0,
157            size: 0,
158            chunks: Vec::new(),
159        };
160
161        loop {
162            // Get the chunk with given base offset
163            let chunk = match self.chunks.get(&chunks.base_offset) {
164                Some(c) => c,
165                None if chunks.count == 0 => {
166                    return Err(io::Error::new(
167                        io::ErrorKind::InvalidInput,
168                        "Invalid segment",
169                    ))
170                }
171                None => break,
172            };
173
174            // If next relative offset is equal to index count => We've crossed the boundary
175            // NOTE: We are assuming the index file was closed properly. `index.count()` will
176            // count `unfilled zeros` due to mmap `set_len` if it was not closed properly
177            // FIXME for chunks with index which isn't closed properly, relative offset will
178            // FIXME be less than count but `readv` is going to return EOF
179            // Reads on indexes which aren't closed properly result in `EOF` when they encounter 0 length record as the mmaped
180            // segment isn't truncated. Index read goes past the actual size as the size calculation of the next boot is wrong.
181            // This block covers both usual EOFs during normal operations as well as EOFs due to unclosed index
182            // EOF due to unclosed index is a warning though
183            if chunks.relative_offset >= chunk.index.count() {
184                // break if we are already at the tail segment
185                if chunks.base_offset == *self.base_offsets.last().unwrap() {
186                    chunks.relative_offset -= 1;
187                    break;
188                }
189
190                // we use 'total offsets' to go next segment. this remains same during subsequent
191                // tail reads if there are no appends. hence the above early return
192                chunks.base_offset = chunk.index.base_offset() + chunk.index.count();
193                chunks.relative_offset = 0;
194                continue;
195            }
196
197            // Get what to read from the segment and fill the buffer. Covers the case where the logic has just moved to next
198            // segment and the segment is empty
199            let read_size = size - chunks.size;
200            let (position, payload_size, count) =
201                chunk.index.readv(chunks.relative_offset, read_size)?;
202            chunks.relative_offset += count;
203            chunks.count += count;
204            chunks.size += payload_size;
205            chunks
206                .chunks
207                .push((chunks.base_offset, position, payload_size, count));
208            if chunks.size >= size {
209                chunks.relative_offset -= 1;
210                break;
211            }
212        }
213
214        Ok(chunks)
215    }
216
217    /// Reads multiple packets from the disk and return base offset and relative offset of the
218    /// Returns base offset, relative offset of the last record along with number of messages and count
219    /// Goes to next segment when relative off set crosses boundary
220    pub fn readv(
221        &mut self,
222        base_offset: u64,
223        relative_offset: u64,
224        size: u64,
225    ) -> io::Result<(u64, u64, u64, Vec<u8>)> {
226        let chunks = self.indexv(base_offset, relative_offset, size)?;
227
228        // Fill the pre-allocated buffer
229        let mut out = vec![0; chunks.size as usize];
230        let mut start = 0;
231        for c in chunks.chunks {
232            let chunk = match self.chunks.get_mut(&c.0) {
233                Some(c) => c,
234                None => break,
235            };
236
237            let position = c.1;
238            let payload_size = c.2;
239            chunk
240                .segment
241                .read(position, &mut out[start..start + payload_size as usize])?;
242            start += payload_size as usize;
243        }
244
245        Ok((
246            chunks.base_offset,
247            chunks.relative_offset,
248            chunks.count,
249            out,
250        ))
251    }
252
253    pub fn close(&mut self, base_offset: u64) -> io::Result<()> {
254        if let Some(chunk) = self.chunks.get_mut(&base_offset) {
255            chunk.index.close()?;
256            chunk.segment.close()?;
257        }
258
259        Ok(())
260    }
261
262    // Removes segment with given base offset from the disk and the system
263    pub fn remove(&mut self, base_offset: u64) -> io::Result<()> {
264        if let Some(mut chunk) = self.chunks.remove(&base_offset) {
265            chunk.segment.close()?;
266
267            let file: PathBuf = self.dir.clone();
268            let index_file_name = format!("{:020}.index", base_offset);
269            let segment_file_name = format!("{:020}.segment", base_offset);
270
271            // dbg!(file.join(&index_file_name));
272            fs::remove_file(file.join(index_file_name))?;
273            fs::remove_file(file.join(segment_file_name))?;
274        }
275
276        Ok(())
277    }
278
279    pub fn close_all(&mut self) -> io::Result<()> {
280        for (_, chunk) in self.chunks.iter_mut() {
281            chunk.index.close()?;
282            chunk.segment.close()?;
283        }
284
285        Ok(())
286    }
287
288    pub fn remove_all(&mut self) -> io::Result<()> {
289        self.close_all()?;
290        fs::remove_dir(&self.dir)?;
291
292        Ok(())
293    }
294}
295
/// State captured while sweeping indexes to collect a bulk of records
/// from one or more segments
/// TODO: Replace the tuple in 'chunks' with a named type
struct Chunks {
    /// Base offset of the chunk the sweep is currently at
    base_offset: u64,
    /// Relative offset of the sweep inside that chunk
    relative_offset: u64,
    /// Number of records collected so far
    count: u64,
    /// Total payload size collected so far
    size: u64,
    /// Planned segment reads: (base offset, position, payload size, count)
    chunks: Vec<(u64, u64, u64, u64)>,
}
306
307#[cfg(test)]
308mod test {
309    use super::DiskLog;
310    use pretty_assertions::assert_eq;
311    use std::io;
312
313    #[test]
314    fn append_creates_and_deletes_segments_correctly() {
315        let dir = tempfile::tempdir().unwrap();
316        let dir = dir.path();
317
318        let record_count = 100;
319        let max_index_size = record_count * 16;
320        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
321        let mut payload = vec![0u8; 1024];
322
323        // 200 1K iterations. 20 files ignoring deletes. 0.segment, 10.segment .... 199.segment
324        // considering deletes -> 110.segment .. 200.segment
325        for i in 0..200 {
326            payload[0] = i;
327            log.append(&payload).unwrap();
328        }
329
330        // Semi fill 200.segment
331        for i in 200..205 {
332            payload[0] = i;
333            log.append(&payload).unwrap();
334        }
335
336        let data = log.read(10, 0);
337        match data {
338            Err(e) if e.kind() == io::ErrorKind::InvalidInput => (),
339            _ => panic!("Expecting an invalid input error"),
340        };
341
342        // read segment with base offset 110
343        let base_offset = 110;
344        for i in 0..10 {
345            let data = log.read(base_offset, i).unwrap();
346            let d = (base_offset + i) as u8;
347            assert_eq!(data[0], d);
348        }
349
350        // read segment with base offset 190
351        let base_offset = 110;
352        for i in 0..10 {
353            let data = log.read(base_offset, i).unwrap();
354            let d = (base_offset + i) as u8;
355            assert_eq!(data[0], d);
356        }
357
358        // read 200.segment which is semi filled with 5 records
359        let base_offset = 200;
360        for i in 0..5 {
361            let data = log.read(base_offset, i).unwrap();
362            let d = (base_offset + i) as u8;
363            assert_eq!(data[0], d);
364        }
365
366        let data = log.read(base_offset, 5);
367        match data {
368            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (),
369            _ => panic!("Expecting end of file error"),
370        };
371    }
372
373    #[test]
374    fn multi_segment_reads_work_as_expected() {
375        let dir = tempfile::tempdir().unwrap();
376        let dir = dir.path();
377
378        // 100K bytes
379        let record_count = 100;
380        let record_size = 1 * 1024;
381
382        // 10 records per segment. 10 segments (0.segment - 90.segment)
383        let max_segment_size = 10 * 1024;
384        let max_index_size = record_count * 16;
385        let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap();
386
387        // 100 1K iterations. 10 files ignoring deletes.
388        // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (0 size)
389        // 10K per file
390        let mut payload = vec![0u8; record_size];
391        for i in 0..record_count {
392            payload[0] = i as u8;
393            log.append(&payload).unwrap();
394        }
395
396        // Read all the segments
397        let base_offset = 0;
398        for i in 0..10 {
399            let data = log.read(base_offset, i).unwrap();
400            let d = (base_offset + i) as u8;
401            assert_eq!(data[0], d);
402        }
403    }
404
405    #[test]
406    fn vectored_read_works_as_expected() {
407        let dir = tempfile::tempdir().unwrap();
408        let dir = dir.path();
409
410        let record_count = 100;
411        let max_index_size = record_count * 16;
412        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
413
414        // 90 1K iterations. 10 files ignoring deletes.
415        // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (0 size)
416        // 10K per file
417        let mut payload = vec![0u8; 1024];
418        for i in 0..90 {
419            payload[0] = i;
420            log.append(&payload).unwrap();
421        }
422
423        // Read 50K. Reads 0.segment - 4.segment
424        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap();
425        assert_eq!(base_offset, 40);
426        assert_eq!(relative_offset, 9);
427        assert_eq!(count, 50);
428
429        let total_size = data.len();
430        assert_eq!(total_size, 50 * 1024);
431
432        // Read 50.segment offset 0
433        let data = log.read(50, 0).unwrap();
434        assert_eq!(data[0], 50);
435    }
436
437    #[test]
438    fn vectored_reads_in_different_boots_works_as_expected() {
439        let dir = tempfile::tempdir().unwrap();
440        let dir = dir.path();
441
442        let record_count = 100;
443        let max_index_size = record_count * 16;
444        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
445
446        // 100 1K iterations. 10 files
447        // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (90 - 99)
448        // 10K per file
449        let mut payload: Vec<u8> = vec![0u8; 1024];
450        for i in 0..100 {
451            payload[0] = i;
452            log.append(&payload).unwrap();
453        }
454
455        log.close_all().unwrap();
456
457        // Boot 2. Read 50K. Reads 0.segment - 4.segment
458        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
459        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap();
460        assert_eq!(base_offset, 40);
461        assert_eq!(relative_offset, 9);
462        assert_eq!(count, 50);
463
464        for i in 0..count {
465            let start = i as usize * 1024;
466            let end = start + 1024;
467            let record = &data[start..end];
468            assert_eq!(record[0], i as u8);
469        }
470
471        let total_size = data.len();
472        assert_eq!(total_size, 50 * 1024);
473
474        // Read 50.segment offset 0
475        let data = log.read(50, 0).unwrap();
476        assert_eq!(data[0], 50);
477    }
478
479    #[test]
480    fn vectored_reads_on_unclosed_index_and_segment_works_as_expected() {
481        let dir = tempfile::tempdir().unwrap();
482        let dir = dir.path();
483
484        // 15K bytes
485        let record_count = 15;
486        let record_size = 1 * 1024;
487
488        let max_segment_size = 10 * 1024;
489        let max_index_size = record_count * 16;
490        let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap();
491
492        // 10 records per segment. 2 segments. 0.segment, 10.segment (partially filled and unclosed)
493        let mut payload = vec![0u8; record_size];
494        for i in 0..record_count {
495            payload[0] = i as u8;
496            log.append(&payload).unwrap();
497        }
498
499        // Last disk not closed. Index will be filled with zeros and segment entries in index are not flushed from buffer yet
500        // Trailing zero indexes are considered as corrupted indexes
501        if let Ok(_l) = DiskLog::new(dir, max_index_size, max_segment_size, 100) {
502            panic!("Expecting a corrupted index error due to trailing zeros in the index")
503        }
504    }
505
506    #[test]
507    fn vectored_reads_crosses_boundary_correctly() {
508        let dir = tempfile::tempdir().unwrap();
509        let dir = dir.path();
510
511        let record_count = 100;
512        let max_index_size = record_count * 16;
513        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
514
515        // 25 1K iterations. 3 segments
516        // 0.segment (10K, data with 0 - 9), 10.segment (5K, data with 10 - 14)
517        let mut payload = vec![0u8; 1024];
518        for i in 0..25 {
519            payload[0] = i;
520            log.append(&payload).unwrap();
521        }
522
523        // Read 15K. Crosses boundaries of the segment and offset will be in the middle of 2nd segment
524        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 15 * 1024).unwrap();
525        assert_eq!(base_offset, 10);
526        assert_eq!(relative_offset, 4);
527        assert_eq!(count, 15);
528        assert_eq!(data.len(), 15 * 1024);
529
530        // Read 15K. Crosses boundaries of the segment and offset will be at last record of 3rd segment
531        let (base_offset, relative_offset, count, data) = log
532            .readv(base_offset, relative_offset + 1, 15 * 1024)
533            .unwrap();
534        assert_eq!(base_offset, 20);
535        assert_eq!(relative_offset, 4);
536        assert_eq!(count, 10);
537        assert_eq!(data.len(), 10 * 1024);
538    }
539
540    #[test]
541    fn vectored_read_more_than_full_chomp_works_as_expected() {
542        let dir = tempfile::tempdir().unwrap();
543        let dir = dir.path();
544
545        let record_count = 100;
546        let max_index_size = record_count * 16;
547        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
548
549        // 90 1K iterations. 10 files
550        // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 80.segment
551        // 10K per file. 90K in total
552        let mut payload = vec![0u8; 1024];
553        for i in 0..90 {
554            payload[0] = i;
555            log.append(&payload).unwrap();
556        }
557
558        // Read 200K. Crosses boundaries of all the segments
559        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 200 * 1024).unwrap();
560        assert_eq!(base_offset, 80);
561        assert_eq!(relative_offset, 9);
562        assert_eq!(count, 90);
563        assert_eq!(data.len(), 90 * 1024);
564    }
565}