segments 0.3.0

A Kafka-inspired in-memory commitlog used by rumqtt's MQTT broker
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
mod segment;

use fnv::FnvHashMap;
use segment::Segment;
use std::fmt::Debug;
use std::mem;

/// Log is an in-memory commitlog (per topic) which splits data into segments.
/// It drops the oldest segment when retention policies are crossed.
/// Each segment is identified by a monotonically increasing segment id and a
/// new segment is created whenever the current segment crosses the size limit.
#[derive(Debug)]
pub struct MemoryLog<T> {
    /// First (oldest retained) segment as (segment id, base offset)
    head: (u64, u64),
    /// Last (active) segment as (segment id, base offset)
    tail: (u64, u64),
    /// Maximum size of a segment in bytes; the active segment is rolled
    /// over once it crosses this size
    max_segment_size: usize,
    /// Maximum number of segments retained (backlog + active)
    max_segments: usize,
    /// Current active segment to append to
    active_segment: Segment<T>,
    /// Closed (backlog) segments, keyed by segment id
    segments: FnvHashMap<u64, Segment<T>>,
}

impl<T: Debug + Clone> MemoryLog<T> {
    /// Create a new log which rolls the active segment at `max_segment_size`
    /// bytes and retains at most `max_segments` segments (backlog + active).
    ///
    /// # Panics
    ///
    /// Panics if `max_segment_size` is less than 1KB (1024 bytes).
    pub fn new(max_segment_size: usize, max_segments: usize) -> MemoryLog<T> {
        if max_segment_size < 1024 {
            panic!("size should be at least 1KB")
        }

        MemoryLog {
            head: (0, 0),
            tail: (0, 0),
            max_segment_size,
            max_segments,
            segments: FnvHashMap::default(),
            active_segment: Segment::new(0),
        }
    }

    /// Returns (head segment id, tail segment id).
    pub fn head_and_tail(&self) -> (u64, u64) {
        (self.head.0, self.tail.0)
    }

    /// Appends this record to the tail and returns the (segment id, offset)
    /// of this append. When the current segment is full, this also creates
    /// a new segment and writes the record to it.
    /// This function also handles retention by removing the head segment.
    pub fn append(&mut self, size: usize, record: T) -> (u64, u64) {
        // Roll/trim segments *before* appending so the record always lands
        // in a segment with remaining capacity.
        self.apply_retention();
        let segment_id = self.tail.0;
        let offset = self.active_segment.append(record, size);
        (segment_id, offset)
    }

    /// When the active segment has crossed `max_segment_size`, freezes it
    /// into the backlog and starts a new active segment. If the total
    /// segment count then exceeds `max_segments`, drops the head segment.
    fn apply_retention(&mut self) {
        if self.active_segment.size() < self.max_segment_size {
            return;
        }

        // Start a new active segment whose base offset continues right
        // after the frozen one ends
        let next_offset = self.active_segment.base_offset() + self.active_segment.len() as u64;
        let last_active = mem::replace(&mut self.active_segment, Segment::new(next_offset));
        self.segments.insert(self.tail.0, last_active);

        // Next tail
        self.tail.0 += 1;
        self.tail.1 = next_offset;

        // if backlog + active segment count is greater than max segments,
        // delete first segment and update head
        if self.segments.len() + 1 > self.max_segments {
            if let Some(segment) = self.segments.remove(&self.head.0) {
                // Next head starts where the removed segment ended
                self.head.0 += 1;
                self.head.1 = segment.base_offset() + segment.len() as u64;
            }
        }
    }

    /// Returns the (segment id, offset) at which the next append will land.
    pub fn next_offset(&self) -> (u64, u64) {
        let segment_id = self.tail.0;
        let next_offset = self.active_segment.base_offset() + self.active_segment.len() as u64;
        (segment_id, next_offset)
    }

    /// Read one record at `cursor` = (segment id, offset) from the correct
    /// segment. Returns `None` when the segment or offset doesn't exist.
    pub fn read(&mut self, cursor: (u64, u64)) -> Option<T> {
        if cursor.0 == self.tail.0 {
            return self.active_segment.read(cursor.1);
        }

        self.segments
            .get(&cursor.0)
            .and_then(|segment| segment.read(cursor.1))
    }

    /// Reads multiple records from memory into `out` and returns the
    /// (segment id, offset) of the next read.
    /// When data of a deleted segment is asked, returns data of the current head.
    /// **Note**: segment id is used to be able to pull directly from the correct segment
    /// **Note**: This method also returns full segment data when requested
    /// data is not of the active segment. Set your max_segment_size keeping tail
    /// latencies of all the concurrent connections in mind
    /// (some runtimes support internal preemption using await points)
    pub fn readv(&mut self, cursor: (u64, u64), out: &mut Vec<T>) -> Option<(u64, u64)> {
        let mut progress = cursor;

        // jump to head if the caller is trying to read a deleted segment
        if progress.0 < self.head.0 {
            warn!("Trying to read a deleted segment. Jumping");
            progress = self.head;
        }

        // Return if user is trying to read beyond the tail.
        if progress.0 > self.tail.0 {
            warn!("Trying to read outside tail");
            return None;
        }

        // read from active segment if cursor points at the active segment
        if progress.0 == self.tail.0 {
            let count = self.active_segment.readv(progress.1, out);
            if count == 0 {
                return None;
            }

            progress.1 += count as u64;
            return Some(progress);
        }

        loop {
            // Read from backlog segments
            match self.segments.get(&progress.0) {
                Some(segment) => {
                    let count = segment.readv(progress.1, out);
                    if count > 0 {
                        progress.1 += count as u64;
                        return Some(progress);
                    }
                    // Jump to the next segment if the above readv returns 0 elements
                    // because of just being at the edge before the next segment got added.
                    // We are also assuming that user is not calling with a random cursor.
                    // E.g if segment 1's read (1, 10) returns (1, 20) as next offset,
                    // (1, 19) being the edge of segment 1, we are assuming next segment
                    // starts at (2, 20). If user asks for (1, 100) for random reasons,
                    // we are screwed
                    progress.0 += 1;

                    // This jump is necessary because readv should always return data
                    // if there is data. Or else router registers this for notification
                    // even though there is data (which might cause a block)
                    continue;
                }
                // Reached the active segment; fall through to read it below
                None if progress.0 == self.tail.0 => break,
                None => {
                    // Same as above, we are assuming user is either asking for a valid
                    // cursor of the segment or the edge cursor. If segment 1 has offsets
                    // 10 to 19, valid cursors are (1, 10) or (1, 20), (1, 20) being
                    // the non-existent edge to jump to next segment to read from (2, 20).
                    // User reading from an invalid cursor like (1, 21) is invalid
                    // behavior as the jump points to (2, 21)
                    progress.0 += 1;
                    continue;
                }
            };
        }

        // Cursor landed on the active segment after walking the backlog
        let count = self.active_segment.readv(progress.1, out);
        if count == 0 {
            return None;
        }

        progress.1 += count as u64;
        Some(progress)
    }
}

#[cfg(test)]
mod test {
    use super::MemoryLog;
    use pretty_assertions::assert_eq;

    /// Appending past the retention limit must roll segments and drop the
    /// oldest ones; reads of dropped segments return `None`.
    #[test]
    fn append_creates_and_deletes_segments_correctly() {
        let mut log = MemoryLog::new(10 * 1024, 10);

        // 200 1K iterations. 10 1K records per segment. 20 segments ignoring deletes.
        // segments: 0.segment (0 - 9), 1.segment (10 - 19) .... 19.segment (190 - 199)
        // considering deletes: 10.segment, 11.segment .. 19.segment
        for i in 0..200 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // Semi fill 20.segment (200 - 204). Deletes 9.segment
        // considering deletes: 10.segment .. 20.segment
        for i in 200..205 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // 9.segment was dropped by retention
        let data = log.read((9, 0));
        assert!(data.is_none());

        // considering: 10.segment (100-109) .. 19.segment (190-199)
        // read segment with segment id 11
        let segment_id = 11;
        let base_offset = 110;
        for i in 0..10 {
            let data = log.read((segment_id, base_offset + i)).unwrap();
            let d = base_offset as u8 + i as u8;
            assert_eq!(data[0], d);
        }

        // read segment with base offset 190 (1 last segment before
        // semi filled segment)
        let segment_id = 19;
        let base_offset = 190;
        for i in 0..10 {
            let data = log.read((segment_id, base_offset + i)).unwrap();
            let d = base_offset as u8 + i as u8;
            assert_eq!(data[0], d);
        }

        // read 20.segment which is semi filled with 5 records
        let segment_id = 20;
        let base_offset = 200;
        for i in 0..5 {
            let data = log.read((segment_id, base_offset + i)).unwrap();
            let d = base_offset as u8 + i as u8;
            assert_eq!(data[0], d);
        }

        // reading past the last written offset returns None
        let data = log.read((20, base_offset + 5));
        assert!(data.is_none());
    }

    /// `readv` returns the full remaining segment and the correct next cursor.
    #[test]
    fn vectored_read_works_as_expected() {
        let mut log = MemoryLog::new(10 * 1024, 10);

        // 90 1K iterations. 9 segments
        // 0.segment (data with 0 - 9), 1.segment (10 - 19) .... 8.segment (80 - 89)
        // 10K per segment = 10 records per segment
        for i in 0..90 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        let mut data = Vec::new();
        // Read a segment from start. This returns full segment
        let next = log.readv((0, 0), &mut data).unwrap();
        assert_eq!(data.len(), 10);
        assert_eq!(next, (0, 10));
        assert_eq!(data[0][0], 0);
        assert_eq!(data[data.len() - 1][0], 9);

        // Read 5.segment
        let data = log.read((5, 50)).unwrap();
        assert_eq!(data[0], 50);

        // Read a segment from the middle. This returns all the remaining elements
        let mut data = Vec::new();
        let next = log.readv((1, 15), &mut data).unwrap();
        assert_eq!(data.len(), 5);
        assert_eq!(next, (1, 20));
        assert_eq!(data[0][0], 15);
        assert_eq!(data[data.len() - 1][0], 19);

        // Read a segment from scratch. gets full segment
        let mut data = Vec::new();
        let next = log.readv((1, 10), &mut data).unwrap();
        assert_eq!(data.len(), 10);
        assert_eq!(next, (1, 20));
    }

    /// Resuming `readv` from a previously returned cursor keeps working
    /// across newly appended segments.
    #[test]
    fn vectored_read_works_as_expected_2() {
        let mut log = MemoryLog::new(10 * 1024, 100);

        // 15 1K iterations. 1.5 segments
        // 0.segment (data with 0 - 9), 1.segment (10 - 14)
        for i in 0..15 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        let mut data = Vec::new();

        // Read a segment from start. This returns full segment
        let next = log.readv((0, 0), &mut data).unwrap();
        assert_eq!(next, (0, 10));

        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(next, (1, 15));

        // Write again
        for i in 15..50 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(next, (1, 20));

        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(next, (2, 30));

        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(next, (3, 40));

        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(next, (4, 50));

        // No more data: readv returns None
        let next = log.readv(next, &mut data);
        assert!(next.is_none());
    }

    /// `readv` on the active (tail) segment must not jump to a (non-existent)
    /// next segment.
    #[test]
    fn vectored_reads_from_active_segment_works_as_expected() {
        let mut log = MemoryLog::new(10 * 1024, 10);

        // 200 1K iterations. 10 1K records per segment. 20 segments ignoring deletes.
        // segments: 0.segment, 1.segment .... 19.segment
        // considering deletes: 10.segment .. 19.segment
        for i in 0..200 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // Read active segment. Next shouldn't jump to next segment
        let mut data = Vec::new();
        let next = log.readv((19, 190), &mut data);
        assert_eq!(data.len(), 10);
        assert_eq!(next, Some((19, 200)));
    }

    /// After an empty read at the active segment's edge, a later `readv`
    /// from the same cursor must pick up newly appended records.
    #[test]
    fn vectored_reads_from_active_segment_resumes_after_empty_reads_correctly() {
        let mut log = MemoryLog::new(10 * 1024, 10);

        // 85 1K iterations. 9 segments
        // 0.segment (data with 0 - 9), 1.segment (10 - 19) .... 8.segment (80 - 84)
        // 10 records per segment (1K each)
        // 8.segment is semi filled
        for i in 0..85 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // read active segment
        let mut data = Vec::new();
        let next = log.readv((8, 80), &mut data);
        assert_eq!(data.len(), 5);
        assert_eq!(next, Some((8, 85)));

        // fill active segment more
        for i in 85..90 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // read active segment
        let mut data = Vec::new();
        let next = log.readv(next.unwrap(), &mut data);
        assert_eq!(data.len(), 5);
        assert_eq!(next, Some((8, 90)));

        // nothing left beyond offset 90
        let mut data = Vec::new();
        let next = log.readv(next.unwrap(), &mut data);
        assert_eq!(data.len(), 0);
        assert!(next.is_none());
    }

    /// A cursor at the edge of what used to be the active segment must jump
    /// into the next segment once new segments are created.
    #[test]
    fn last_active_segment_read_jumps_to_next_segment_read_correctly() {
        let mut log = MemoryLog::new(10 * 1024, 10);

        // 90 1K iterations. 9 segments ignoring deletes.
        // 0.segment (data with 0 - 9), 1.segment .... 8.segment (80 - 89)
        // 10K per segment = 10 records per segment
        for i in 0..90 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // read active segment. there's no next segment. so active segment
        // is not done yet
        let mut data = Vec::new();
        let next = log.readv((8, 80), &mut data).unwrap();
        assert_eq!(data.len(), 10);
        assert_eq!(next, (8, 90));

        // Append more which also changes active segment to 10.segment
        for i in 90..110 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        // Read from the next offset of previous active segment
        let mut data = Vec::new();
        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(data.len(), 10);
        assert_eq!(next, (9, 100));

        // read active segment again
        let mut data = Vec::new();
        let next = log.readv(next, &mut data).unwrap();
        assert_eq!(data.len(), 10);
        assert_eq!(next, (10, 110));

        // read again when there is no more data
        let mut data = Vec::new();
        let next = log.readv(next, &mut data);
        assert_eq!(data.len(), 0);
        assert!(next.is_none());
    }

    /// Repeated `readv` calls must walk every segment exactly once, in order.
    #[test]
    fn vectored_read_iterate_through_all_the_segments() {
        let mut log = MemoryLog::new(10 * 1024, 100);

        for i in 0..100 {
            let payload = vec![i; 1024];
            log.append(payload.len(), payload);
        }

        let mut data = Vec::new();
        let mut cursor = (0, 0);
        let mut i = 0;
        while let Some(c) = log.readv(cursor, &mut data) {
            assert_eq!(c.0, i);
            println!("progress = {:?}", c);
            cursor = c;
            i += 1;
        }
    }
}