1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
use crate::errors::Errors;
use crate::options::DharmaOpts;
use crate::traits::{ResourceKey, ResourceValue};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fs::File;
use std::io::Write;

/// A key together with the value stored under it.
///
/// The pair derives serde's `Serialize`/`Deserialize`, which is what allows
/// it to be encoded into the byte payload of on-disk records.
#[derive(Serialize, Deserialize, Clone)]
pub struct Value<K, V> {
    /// Key identifying this entry.
    pub key: K,
    /// Payload associated with `key`.
    pub value: V,
}

/// Equality between two `Value`s is decided by their keys alone; the stored
/// payloads do not take part in the comparison.
impl<K, V> PartialEq for Value<K, V>
where
    K: ResourceKey,
    V: ResourceValue,
{
    /// Returns `true` when both values carry the same key.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.key, &other.key)
    }
}

impl<K, V> Value<K, V>
where
    K: ResourceKey,
    V: ResourceValue,
{
    /// Bundles `key` and `value` into a single `Value`.
    pub fn new(key: K, value: V) -> Self {
        Self { key, value }
    }
}

/// Tag describing what a record holds. The numeric discriminant is the value
/// used when the tag is converted to or from a single byte.
#[derive(Copy, Clone)]
pub enum RecordType {
    /// Filler record that carries no data.
    PADDING = 0,
    /// A whole value stored in a single record.
    COMPLETE = 1,
    /// First chunk of a value that is split across several records.
    START = 2,
    /// Intermediate chunk of a split value.
    MIDDLE = 3,
    /// Last chunk of a split value.
    END = 4,
    /// Fallback for a byte that does not correspond to any known type.
    UNKNOWN = 5,
}

/// Decodes a raw type byte into its [`RecordType`].
///
/// Bytes `0..=4` map to the variant with the same discriminant; every other
/// byte yields [`RecordType::UNKNOWN`].
pub fn to_record_type(val: u8) -> RecordType {
    // Indexed by discriminant, so the order must mirror the enum declaration.
    const KNOWN_TYPES: [RecordType; 5] = [
        RecordType::PADDING,
        RecordType::COMPLETE,
        RecordType::START,
        RecordType::MIDDLE,
        RecordType::END,
    ];
    KNOWN_TYPES
        .get(usize::from(val))
        .copied()
        .unwrap_or(RecordType::UNKNOWN)
}

/// A Record represents the key, value and some metadata persisted to disk.
/// Records are written to disk as
///
/// | type (1 byte) | size (2 bytes) | data - array of u8 of length size |
///
/// The maximum size of a record is specified in `option.block_size_in_bytes`.
/// Independently of the block size, the size field is a `u16`, so a single
/// record can describe at most 65,535 bytes of data.
///
/// NOTE(review): this comment previously stated a 32KB limit "addressable with
/// 2 bytes"; an unsigned 2-byte size reaches 65,535. Confirm whether a stricter
/// 32KB limit is intended and enforced elsewhere.
pub struct Record {
    /// Kind of record; occupies 1 byte on disk.
    pub record_type: RecordType,
    /// Length of the data in bytes; occupies 2 bytes on disk.
    pub data_size_in_bytes: u16,
    /// Raw data bytes. The `u16` size field limits what can be described to
    /// 65,535 bytes.
    pub data: Vec<u8>,
}

impl Record {
    /// Bytes of metadata that accompany every record: 1 byte for the record
    /// type plus 2 bytes for the data size.
    pub const RECORD_BASE_SIZE_IN_BYTES: usize = 3;

    /// Builds a `PADDING` record used to fill leftover space in a block.
    ///
    /// `size` is stored as the record's data size, while the data vector
    /// itself stays empty: padding records carry no data.
    pub fn with_padding(size: u16) -> Self {
        let record_type = RecordType::PADDING;
        let data = Vec::new();
        Self {
            record_type,
            data_size_in_bytes: size,
            data,
        }
    }
}

/// A Block is the smallest unit of memory that is read from disk.
/// Blocks are packed together to form SSTables which
/// contain data stored in the database.
/// Each block is composed of as many records as can fit in the block. If a record doesn't
/// fit into a block then it is split across multiple blocks.
pub struct Block {
    /// Records held by this block, in the order they were added.
    pub records: Vec<Record>,
}

impl Block {
    pub fn new() -> Block {
        Block {
            records: Vec::new(),
        }
    }

    pub fn add(&mut self, record: Record) {
        self.records.push(record);
    }
}

/// Packs `values` into fixed-size [`Block`]s and appends the blocks to `block_vec`.
///
/// Every value is serialized with `bincode` and stored as one or more [`Record`]s. A record
/// costs `Record::RECORD_BASE_SIZE_IN_BYTES` of header plus its payload, and a block holds at
/// most `options.block_size_in_bytes` bytes:
///
/// * a value that fits in the space left in the current block becomes one `COMPLETE` record;
/// * a value that does not fit is split into a `START` chunk, zero or more `MIDDLE` chunks and
///   an `END` chunk, where every non-final chunk fills its block completely;
/// * when the space left cannot hold a header plus at least one payload byte, the block is
///   closed with a `PADDING` record and the value is retried in a fresh block.
///
/// A block is appended as soon as it is full; a trailing, partially filled block is appended
/// at the end. Nothing is appended when `values` is empty.
///
/// # Panics
///
/// * If a value cannot be serialized by `bincode`.
/// * If a value has to be stored but even an empty block cannot hold a record header plus one
///   payload byte (`options.block_size_in_bytes` is not larger than
///   `Record::RECORD_BASE_SIZE_IN_BYTES`). No progress is possible in that configuration, so
///   the function fails fast instead of producing padding-only blocks forever.
pub fn create_blocks<K: ResourceKey, V: ResourceValue>(
    options: &DharmaOpts,
    values: &Vec<Value<K, V>>,
    block_vec: &mut Vec<Block>,
) {
    let block_size = options.block_size_in_bytes;
    let mut current_block = Block::new();
    let mut available_memory_in_bytes = block_size;
    // An index is used instead of an iterator because a value is retried (the index is not
    // advanced) after the current block had to be closed with padding.
    let mut i = 0;
    while i < values.len() {
        // TODO: add logging and handle encoding error
        let encoded = bincode::serialize(&values[i]).unwrap();
        // `encoded` is a byte vector, so its length is the payload size in bytes.
        let record_size = encoded.len();
        // Every record additionally needs room for its type and size header.
        let required_record_size = Record::RECORD_BASE_SIZE_IN_BYTES + record_size;
        match available_memory_in_bytes.cmp(&required_record_size) {
            // The value does not fit, but there is room for a header and at least one payload
            // byte: split the value into chunks, one chunk per loop iteration.
            Ordering::Less if available_memory_in_bytes > Record::RECORD_BASE_SIZE_IN_BYTES => {
                let mut record_offset: usize = 0;
                let mut is_first_chunk = true;
                while available_memory_in_bytes > Record::RECORD_BASE_SIZE_IN_BYTES {
                    // Reserve the header; what remains is the payload capacity of this chunk.
                    available_memory_in_bytes -= Record::RECORD_BASE_SIZE_IN_BYTES;
                    let record_offset_end =
                        (record_offset + available_memory_in_bytes).min(record_size);
                    // A chunk that reaches the end of the payload is the final one.
                    let record_type = if record_offset_end == record_size {
                        RecordType::END
                    } else if is_first_chunk {
                        RecordType::START
                    } else {
                        RecordType::MIDDLE
                    };
                    let data_chunk = encoded[record_offset..record_offset_end].to_vec();
                    let processed_memory_in_bytes = data_chunk.len();
                    record_offset = record_offset_end;
                    current_block.add(Record {
                        record_type,
                        // NOTE(review): silently truncates when a chunk is longer than
                        // `u16::MAX` bytes, which is possible for block sizes above 64KB.
                        data_size_in_bytes: processed_memory_in_bytes as u16,
                        data: data_chunk,
                    });
                    if let RecordType::END = record_type {
                        // The final chunk may leave free space in the block.
                        available_memory_in_bytes -= processed_memory_in_bytes;
                        if available_memory_in_bytes == 0 {
                            block_vec.push(current_block);
                            current_block = Block::new();
                            available_memory_in_bytes = block_size;
                        }
                        // This value is fully stored; move on to the next one.
                        i += 1;
                        break;
                    }
                    // START and MIDDLE chunks always exhaust their block.
                    block_vec.push(current_block);
                    current_block = Block::new();
                    available_memory_in_bytes = block_size;
                    is_first_chunk = false;
                }
            }
            // Too little space is left for a header plus payload: pad the block, start a new
            // one and retry the same value (`i` is intentionally not advanced).
            Ordering::Less => {
                // If the block is still completely empty, a fresh block would be in exactly
                // the same state and this branch would repeat forever.
                assert!(
                    available_memory_in_bytes < block_size,
                    "block_size_in_bytes ({}) is too small to hold a record header and payload",
                    block_size
                );
                // Bytes to pad is the available space minus 1 byte for the record type.
                // NOTE(review): this assumes a 1-byte padding header; confirm it matches the
                // on-disk padding layout expected by the writer and the decoder.
                let bytes_to_pad = available_memory_in_bytes - 1;
                current_block.add(Record::with_padding(bytes_to_pad as u16));
                block_vec.push(current_block);
                current_block = Block::new();
                available_memory_in_bytes = block_size;
            }
            // The value fills the remaining space exactly: store it and close the block.
            Ordering::Equal => {
                current_block.add(Record {
                    record_type: RecordType::COMPLETE,
                    // NOTE(review): truncates for payloads longer than `u16::MAX` bytes.
                    data_size_in_bytes: record_size as u16,
                    data: encoded,
                });
                block_vec.push(current_block);
                current_block = Block::new();
                available_memory_in_bytes = block_size;
                i += 1;
            }
            // The value fits with room to spare: store it and keep filling this block.
            Ordering::Greater => {
                current_block.add(Record {
                    record_type: RecordType::COMPLETE,
                    // NOTE(review): truncates for payloads longer than `u16::MAX` bytes.
                    data_size_in_bytes: record_size as u16,
                    data: encoded,
                });
                available_memory_in_bytes -= required_record_size;
                i += 1;
            }
        }
    }
    // Full blocks were pushed as they filled up; a block that still holds records at this
    // point is only partially filled and has not been pushed yet.
    if !current_block.records.is_empty() {
        block_vec.push(current_block);
    }
}

/// Serializes `block` and writes it to `file_handle`, padded with zero bytes up to
/// `options.block_size_in_bytes`.
///
/// Layout of the written bytes:
///
/// * every non-padding record is written as 1 type byte, 2 big-endian size bytes and its data;
/// * a `PADDING` record is written as a header (type `0` plus big-endian size) followed by
///   `data_size_in_bytes` zero bytes. If that would run past the end of the block, the rest of
///   the block is zero-filled instead, so padding can never make a block larger than
///   `block_size_in_bytes`;
/// * free space after the last record is filled with a padding header plus zeros when more
///   than `Record::RECORD_BASE_SIZE_IN_BYTES` bytes are free, and with plain zero bytes
///   otherwise.
///
/// Non-padding records are written as given; if they alone exceed the block size, the output
/// is longer than one block.
///
/// The whole block is assembled in memory and handed to the file with a single `write_all`,
/// so a short write cannot silently drop part of the block.
///
/// # Errors
///
/// Currently always returns `Ok(())`; see the note at the write call.
pub fn write_block_to_disk(
    options: &DharmaOpts,
    file_handle: &mut File,
    block: &Block,
) -> Result<(), Errors> {
    let block_size = options.block_size_in_bytes as usize;
    let header_size = Record::RECORD_BASE_SIZE_IN_BYTES;
    let mut buffer: Vec<u8> = Vec::with_capacity(block_size);

    for record in &block.records {
        let size_bytes: [u8; 2] = record.data_size_in_bytes.to_be_bytes();
        match record.record_type {
            RecordType::PADDING => {
                let pad_len = record.data_size_in_bytes as usize;
                let remaining = block_size.saturating_sub(buffer.len());
                if header_size + pad_len > remaining {
                    // The padding record does not fit in what is left of the block: fill the
                    // remainder with zeros rather than overrunning the block boundary, which
                    // would shift every block written after this one.
                    buffer.resize(buffer.len() + remaining, 0u8);
                } else {
                    buffer.push(RecordType::PADDING as u8);
                    buffer.extend_from_slice(&size_bytes);
                    buffer.resize(buffer.len() + pad_len, 0u8);
                }
            }
            _ => {
                buffer.push(record.record_type as u8);
                buffer.extend_from_slice(&size_bytes);
                buffer.extend_from_slice(&record.data);
            }
        }
    }

    // Fill whatever space is left so the block occupies its full size on disk.
    if buffer.len() < block_size {
        let free_space_in_bytes = block_size - buffer.len();
        if free_space_in_bytes > header_size {
            // Enough room for a real padding record: header first, zeros after.
            // NOTE(review): the size truncates when more than `u16::MAX` bytes are padded.
            let pad_len = free_space_in_bytes - header_size;
            buffer.push(RecordType::PADDING as u8);
            buffer.extend_from_slice(&(pad_len as u16).to_be_bytes());
        }
        // Zero-fill up to the block boundary (the whole remainder when no header fits).
        buffer.resize(block_size, 0u8);
    }

    // NOTE(review): the I/O result is still discarded, as before, because the variants of
    // `Errors` are not visible from this function. Map the `io::Error` to the appropriate
    // `Errors` variant and return it instead of reporting success unconditionally.
    let _ = file_handle.write_all(&buffer);
    Ok(())
}