1pub mod index;
2pub mod segment;
3
4use index::Index;
5use segment::Segment;
6
7use std::collections::HashMap;
8use std::fs;
9use std::io;
10use std::path::PathBuf;
11
/// One unit of the log: an index together with the segment it describes.
/// Both are created for the same base offset.
struct Chunk {
    /// Index for this chunk; lookups through it yield the `(position, len)`
    /// that is then used to read from `segment`.
    index: Index,
    /// Segment holding the record payloads of this chunk.
    segment: Segment,
}
16
/// A log stored on disk as a series of chunks, each identified by the base
/// offset it starts at.
pub struct DiskLog {
    /// Directory that holds the index and segment files.
    dir: PathBuf,
    /// Once the active segment reports at least this size, `append` starts a
    /// new chunk before writing.
    max_segment_size: u64,
    /// Passed to `Index::new` for every index that is opened or created.
    max_index_size: u64,
    /// Base offsets known to the log, sorted when the log is opened and
    /// appended to whenever a new chunk is started.
    base_offsets: Vec<u64>,
    /// When `base_offsets` grows beyond this length after a new chunk is
    /// started, the oldest chunk is removed.
    max_segments: usize,
    /// Base offset of the chunk that `append` writes to.
    active_chunk: u64,
    /// Open chunks keyed by their base offset.
    chunks: HashMap<u64, Chunk>,
}
26
27impl DiskLog {
28 pub fn new<P: Into<PathBuf>>(
29 dir: P,
30 max_index_size: u64,
31 max_segment_size: u64,
32 max_segments: usize,
33 ) -> io::Result<DiskLog> {
34 let dir = dir.into();
35 let _ = fs::create_dir_all(&dir);
36 if max_segment_size < 1024 || max_index_size < 100 {
37 panic!("size should be at least 1KB")
38 }
39
40 let files = fs::read_dir(&dir)?;
41 let mut base_offsets = Vec::new();
42 for file in files {
43 let path = file?.path();
44 let offset = path.file_stem().unwrap().to_str().unwrap();
45 let offset = offset.parse::<u64>().unwrap();
46 base_offsets.push(offset);
47 }
48
49 base_offsets.sort();
50 let mut chunks = HashMap::new();
51
52 let active_segment = if let Some((last_offset, offsets)) = base_offsets.split_last() {
53 for base_offset in offsets.iter() {
55 let index = Index::new(&dir, *base_offset, max_index_size, false)?;
56 let segment = Segment::new(&dir, *base_offset)?;
57 let chunk = Chunk { index, segment };
58 chunks.insert(*base_offset, chunk);
59 }
60
61 let index = Index::new(&dir, *last_offset, max_index_size, true)?;
63 let segment = Segment::new(&dir, *last_offset)?;
64 let mut chunk = Chunk { index, segment };
65
66 let next_offset = chunk.index.count();
69 chunk.segment.set_next_offset(next_offset);
70 chunks.insert(*last_offset, chunk);
71 *last_offset
72 } else {
73 let index = Index::new(&dir, 0, max_index_size, true)?;
74 let segment = Segment::new(&dir, 0)?;
75 let chunk = Chunk { index, segment };
76 chunks.insert(0, chunk);
77 base_offsets.push(0);
78 0
79 };
80
81 let log = DiskLog {
82 dir,
83 max_segment_size,
84 max_index_size,
85 max_segments,
86 base_offsets,
87 chunks,
88 active_chunk: active_segment,
89 };
90
91 Ok(log)
92 }
93
94 pub fn append(&mut self, record: &[u8]) -> io::Result<()> {
95 let active_chunk = if let Some(v) = self.chunks.get_mut(&self.active_chunk) {
96 v
97 } else {
98 return Err(io::Error::new(io::ErrorKind::Other, "No active segment"));
99 };
100
101 if active_chunk.segment.size() >= self.max_segment_size {
102 active_chunk.segment.close()?;
103 active_chunk.index.close()?;
104
105 let base_offset = active_chunk.index.base_offset() + active_chunk.index.count();
107 let index = Index::new(&self.dir, base_offset, self.max_index_size, true)?;
108 let segment = Segment::new(&self.dir, base_offset)?;
109 let chunk = Chunk { index, segment };
110 self.chunks.insert(base_offset, chunk);
111 self.base_offsets.push(base_offset);
112 self.active_chunk = base_offset;
113
114 if self.base_offsets.len() > self.max_segments {
115 let remove_offset = self.base_offsets.remove(0);
116 self.remove(remove_offset)?;
117 }
118 }
119
120 let active_chunk = self.chunks.get_mut(&self.active_chunk).unwrap();
122 let (_, position) = active_chunk.segment.append(record)?;
123 active_chunk.index.write(position, record.len() as u64)?;
124 Ok(())
125 }
126
127 pub fn read(&mut self, base_offset: u64, offset: u64) -> io::Result<Vec<u8>> {
130 let chunk = match self.chunks.get_mut(&base_offset) {
131 Some(segment) => segment,
132 None => {
133 return Err(io::Error::new(
134 io::ErrorKind::InvalidInput,
135 "Invalid segment",
136 ))
137 }
138 };
139
140 let (position, len) = chunk.index.read(offset)?;
141 let mut payload = vec![0; len as usize];
142 chunk.segment.read(position, &mut payload)?;
143 Ok(payload)
144 }
145
146 fn indexv(&self, base_offset: u64, relative_offset: u64, size: u64) -> io::Result<Chunks> {
153 let mut chunks = Chunks {
154 base_offset,
155 relative_offset,
156 count: 0,
157 size: 0,
158 chunks: Vec::new(),
159 };
160
161 loop {
162 let chunk = match self.chunks.get(&chunks.base_offset) {
164 Some(c) => c,
165 None if chunks.count == 0 => {
166 return Err(io::Error::new(
167 io::ErrorKind::InvalidInput,
168 "Invalid segment",
169 ))
170 }
171 None => break,
172 };
173
174 if chunks.relative_offset >= chunk.index.count() {
184 if chunks.base_offset == *self.base_offsets.last().unwrap() {
186 chunks.relative_offset -= 1;
187 break;
188 }
189
190 chunks.base_offset = chunk.index.base_offset() + chunk.index.count();
193 chunks.relative_offset = 0;
194 continue;
195 }
196
197 let read_size = size - chunks.size;
200 let (position, payload_size, count) =
201 chunk.index.readv(chunks.relative_offset, read_size)?;
202 chunks.relative_offset += count;
203 chunks.count += count;
204 chunks.size += payload_size;
205 chunks
206 .chunks
207 .push((chunks.base_offset, position, payload_size, count));
208 if chunks.size >= size {
209 chunks.relative_offset -= 1;
210 break;
211 }
212 }
213
214 Ok(chunks)
215 }
216
217 pub fn readv(
221 &mut self,
222 base_offset: u64,
223 relative_offset: u64,
224 size: u64,
225 ) -> io::Result<(u64, u64, u64, Vec<u8>)> {
226 let chunks = self.indexv(base_offset, relative_offset, size)?;
227
228 let mut out = vec![0; chunks.size as usize];
230 let mut start = 0;
231 for c in chunks.chunks {
232 let chunk = match self.chunks.get_mut(&c.0) {
233 Some(c) => c,
234 None => break,
235 };
236
237 let position = c.1;
238 let payload_size = c.2;
239 chunk
240 .segment
241 .read(position, &mut out[start..start + payload_size as usize])?;
242 start += payload_size as usize;
243 }
244
245 Ok((
246 chunks.base_offset,
247 chunks.relative_offset,
248 chunks.count,
249 out,
250 ))
251 }
252
253 pub fn close(&mut self, base_offset: u64) -> io::Result<()> {
254 if let Some(chunk) = self.chunks.get_mut(&base_offset) {
255 chunk.index.close()?;
256 chunk.segment.close()?;
257 }
258
259 Ok(())
260 }
261
262 pub fn remove(&mut self, base_offset: u64) -> io::Result<()> {
264 if let Some(mut chunk) = self.chunks.remove(&base_offset) {
265 chunk.segment.close()?;
266
267 let file: PathBuf = self.dir.clone();
268 let index_file_name = format!("{:020}.index", base_offset);
269 let segment_file_name = format!("{:020}.segment", base_offset);
270
271 fs::remove_file(file.join(index_file_name))?;
273 fs::remove_file(file.join(segment_file_name))?;
274 }
275
276 Ok(())
277 }
278
279 pub fn close_all(&mut self) -> io::Result<()> {
280 for (_, chunk) in self.chunks.iter_mut() {
281 chunk.index.close()?;
282 chunk.segment.close()?;
283 }
284
285 Ok(())
286 }
287
288 pub fn remove_all(&mut self) -> io::Result<()> {
289 self.close_all()?;
290 fs::remove_dir(&self.dir)?;
291
292 Ok(())
293 }
294}
295
/// Plan of a vectored read, built by `DiskLog::indexv` and consumed by
/// `DiskLog::readv`.
struct Chunks {
    /// Base offset of the chunk the read cursor is in.
    base_offset: u64,
    /// Cursor position inside that chunk.
    relative_offset: u64,
    /// Total number of records covered by the plan.
    count: u64,
    /// Total payload size covered by the plan.
    size: u64,
    /// One `(base_offset, position, payload_size, count)` entry per chunk
    /// that takes part in the read.
    chunks: Vec<(u64, u64, u64, u64)>,
}
306
#[cfg(test)]
mod test {
    use super::DiskLog;
    use pretty_assertions::assert_eq;
    use std::io;

    /// Size in bytes of every record appended by these tests.
    const RECORD_SIZE: usize = 1024;

    /// Appends one `RECORD_SIZE` byte record per id. The first byte of each
    /// record is its id, all remaining bytes are zero.
    fn append_records(log: &mut DiskLog, ids: std::ops::Range<u8>) {
        let mut payload = vec![0u8; RECORD_SIZE];
        for id in ids {
            payload[0] = id;
            log.append(&payload).unwrap();
        }
    }

    /// Reads the first `count` records of the chunk at `base_offset` one by
    /// one and checks that each starts with its absolute offset.
    fn assert_chunk_records(log: &mut DiskLog, base_offset: u64, count: u64) {
        for i in 0..count {
            let data = log.read(base_offset, i).unwrap();
            assert_eq!(data[0], (base_offset + i) as u8);
        }
    }

    #[test]
    fn append_creates_and_deletes_segments_correctly() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();

        // 10 records fit into a segment, so 205 records create the chunks
        // 0, 10, ..., 200 of which only the 10 newest are retained.
        append_records(&mut log, 0..205);

        // Chunk 10 fell out of retention.
        match log.read(10, 0) {
            Err(e) if e.kind() == io::ErrorKind::InvalidInput => (),
            _ => panic!("Expecting an invalid input error"),
        };

        // Oldest retained chunk, a later full chunk and the partially filled
        // active chunk.
        assert_chunk_records(&mut log, 110, 10);
        assert_chunk_records(&mut log, 190, 10);
        assert_chunk_records(&mut log, 200, 5);

        // Reading past the last record of the active chunk.
        match log.read(200, 5) {
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (),
            _ => panic!("Expecting end of file error"),
        };
    }

    #[test]
    fn multi_segment_reads_work_as_expected() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_segment_size = 10 * 1024;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap();

        append_records(&mut log, 0..100);

        // All ten chunks are retained; every one of them must be readable.
        for base_offset in (0..record_count).step_by(10) {
            assert_chunk_records(&mut log, base_offset, 10);
        }
    }

    #[test]
    fn vectored_read_works_as_expected() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();

        append_records(&mut log, 0..90);

        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap();
        assert_eq!(base_offset, 40);
        assert_eq!(relative_offset, 9);
        assert_eq!(count, 50);
        assert_eq!(data.len(), 50 * 1024);

        let data = log.read(50, 0).unwrap();
        assert_eq!(data[0], 50);
    }

    #[test]
    fn vectored_reads_in_different_boots_works_as_expected() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();

        append_records(&mut log, 0..100);
        log.close_all().unwrap();

        // Second boot on the same directory.
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();
        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap();
        assert_eq!(base_offset, 40);
        assert_eq!(relative_offset, 9);
        assert_eq!(count, 50);

        for i in 0..count {
            let start = i as usize * RECORD_SIZE;
            let record = &data[start..start + RECORD_SIZE];
            assert_eq!(record[0], i as u8);
        }

        assert_eq!(data.len(), 50 * 1024);

        let data = log.read(50, 0).unwrap();
        assert_eq!(data[0], 50);
    }

    #[test]
    fn vectored_reads_on_unclosed_index_and_segment_works_as_expected() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 15;
        let max_segment_size = 10 * 1024;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap();

        append_records(&mut log, 0..15);

        // The first log is still open (nothing was closed) when the same
        // directory is opened again.
        if DiskLog::new(dir, max_index_size, max_segment_size, 100).is_ok() {
            panic!("Expecting a corrupted index error due to trailing zeros in the index")
        }
    }

    #[test]
    fn vectored_reads_crosses_boundary_correctly() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();

        append_records(&mut log, 0..25);

        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 15 * 1024).unwrap();
        assert_eq!(base_offset, 10);
        assert_eq!(relative_offset, 4);
        assert_eq!(count, 15);
        assert_eq!(data.len(), 15 * 1024);

        // Resume right behind the last record returned; only 10 are left.
        let (base_offset, relative_offset, count, data) = log
            .readv(base_offset, relative_offset + 1, 15 * 1024)
            .unwrap();
        assert_eq!(base_offset, 20);
        assert_eq!(relative_offset, 4);
        assert_eq!(count, 10);
        assert_eq!(data.len(), 10 * 1024);
    }

    #[test]
    fn vectored_read_more_than_full_chomp_works_as_expected() {
        let dir = tempfile::tempdir().unwrap();
        let dir = dir.path();

        let record_count = 100;
        let max_index_size = record_count * 16;
        let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap();

        append_records(&mut log, 0..90);

        // Asking for more than the log holds returns everything there is.
        let (base_offset, relative_offset, count, data) = log.readv(0, 0, 200 * 1024).unwrap();
        assert_eq!(base_offset, 80);
        assert_eq!(relative_offset, 9);
        assert_eq!(count, 90);
        assert_eq!(data.len(), 90 * 1024);
    }
}