disk_utils/wal/
mod.rs

1pub mod entries;
2pub mod iterator;
3pub mod record;
4pub mod redo_log;
5pub mod serializable;
6pub mod undo_log;
7
8use self::iterator::{BlockError, WalIterator};
9use self::record::{BLOCK_SIZE, Record, RecordType};
10
11use std::collections::HashSet;
12use std::fmt::Debug;
13use std::fs::File;
14use std::hash::Hash;
15use std::io;
16use std::io::Write;
17use std::result;
18
19use super::Serializable;
20
/// Names the key and value types that a log and its backing store work with.
///
/// The trait declares no methods; implementors only choose the two
/// associated types below.
pub trait LogData: Clone + PartialEq + Debug {
    /// Key type: must be cloneable, comparable, hashable and `Serializable`.
    type Key: Clone + PartialEq + Eq + Debug + Hash + Serializable;
    /// Value type: must be cloneable, comparable and `Serializable`.
    type Value: Clone + PartialEq + Debug + Serializable;
}
25
26pub trait LogStore<Data: LogData> {
27    fn get(&self, key: &Data::Key) -> Option<Data::Value>;
28    fn remove(&mut self, key: &Data::Key);
29    fn update(&mut self, key: Data::Key, val: Data::Value);
30    fn flush(&mut self) -> io::Result<()>;
31    fn flush_change(&mut self, key: Data::Key, val: Data::Value) -> io::Result<()>;
32}
33
/// Top-level error type for log operations; each variant wraps one
/// lower-level failure kind so that `?` can be used on all of them.
#[derive(Debug)]
pub enum LogError {
    /// An underlying I/O operation failed.
    IoError(io::Error),
    /// A `BlockError` coming from the `iterator` submodule.
    BlockError(BlockError),
    /// A value could not be read back from its records; see `SerializeError`.
    SerializeError(SerializeError),
}
40
41impl From<io::Error> for LogError {
42    fn from(err: io::Error) -> LogError {
43        LogError::IoError(err)
44    }
45}
46
47impl From<BlockError> for LogError {
48    fn from(err: BlockError) -> LogError {
49        LogError::BlockError(err)
50    }
51}
52
53impl From<SerializeError> for LogError {
54    fn from(err: SerializeError) -> LogError {
55        LogError::SerializeError(err)
56    }
57}
58
/// Result alias for log operations, with `LogError` as the error type.
pub type Result<T> = result::Result<T, LogError>;
60
/// Checkpoint-tracking state kept while scanning the log.
///
/// NOTE(review): nothing in the visible part of this file uses this type;
/// presumably the recovery code in the log submodules drives it — confirm.
#[derive(PartialEq)]
enum RecoverState {
    /// No checkpoint entry found, read until end of log.
    None,
    /// Begin checkpoint entry found, read until the start entry
    /// of every transaction in the checkpoint is read.
    /// NOTE(review): the set presumably holds the ids of those
    /// transactions — confirm against the code that builds it.
    Begin(HashSet<u64>),
    /// End checkpoint entry found, read until a begin
    /// checkpoint entry is found.
    End,
}
72
/// Failure modes when reassembling a serialized value from log records.
#[derive(Debug)]
pub enum SerializeError {
    /// An I/O error, e.g. one propagated out of deserialization.
    IoError(io::Error),
    /// A record of the given type arrived at a point in the fragment
    /// sequence where that type is not allowed.
    InvalidTransfer(RecordType),
    /// The record iterator was exhausted before a complete value was read.
    OutOfRecords,
}
79
80impl From<io::Error> for SerializeError {
81    fn from(err: io::Error) -> SerializeError {
82        SerializeError::IoError(err)
83    }
84}
85
/// Progress marker used while stitching a multi-record value back together.
///
/// The same three states serve both read directions; "opening fragment"
/// means a `First` record when reading forwards and a `Last` record when
/// reading backwards.
#[derive(PartialEq)]
enum SerializeState {
    /// No fragment has been consumed yet.
    None,
    /// The opening fragment has been consumed.
    First,
    /// At least one `Middle` fragment has been consumed.
    Middle,
}
92
/// Result alias for record (de)serialization, with `SerializeError` as the error type.
pub type SerializeResult<T> = result::Result<T, SerializeError>;
94
95pub fn read_serializable<S: Serializable>(iter: &mut WalIterator) -> SerializeResult<S> {
96    let mut buf = Vec::new();
97    let mut state = SerializeState::None;
98    while let Some(mut record) = iter.next() {
99        match record.record_type {
100            RecordType::Zero | RecordType::Full => {
101                return Ok(S::deserialize(&mut &record.payload[..])?);
102            }
103            RecordType::First => {
104                if state != SerializeState::None {
105                    return Err(SerializeError::InvalidTransfer(RecordType::First));
106                }
107                state = SerializeState::First;
108                buf.append(&mut record.payload);
109            }
110            RecordType::Middle => {
111                if state != SerializeState::First && state != SerializeState::Middle {
112                    return Err(SerializeError::InvalidTransfer(RecordType::Middle));
113                }
114                state = SerializeState::Middle;
115                buf.append(&mut record.payload);
116            }
117            RecordType::Last => {
118                if state != SerializeState::Middle {
119                    return Err(SerializeError::InvalidTransfer(RecordType::Last));
120                }
121                buf.append(&mut record.payload);
122                return Ok(S::deserialize(&mut &buf[..])?);
123            }
124        }
125    }
126
127    Err(SerializeError::OutOfRecords)
128}
129
130pub fn read_serializable_backwards<S: Serializable>(iter: &mut WalIterator) -> SerializeResult<S> {
131    let mut buf = Vec::new();
132    let mut state = SerializeState::None;
133    while let Some(mut record) = iter.next_back() {
134        match record.record_type {
135            RecordType::Zero | RecordType::Full => {
136                return Ok(S::deserialize(&mut &record.payload[..])?);
137            }
138            RecordType::First => {
139                if state != SerializeState::Middle {
140                    return Err(SerializeError::InvalidTransfer(RecordType::First));
141                }
142                record.payload.reverse();
143                buf.append(&mut record.payload);
144                buf.reverse();
145                return Ok(S::deserialize(&mut &buf[..])?);
146            }
147            RecordType::Middle => {
148                if state != SerializeState::First && state != SerializeState::Middle {
149                    return Err(SerializeError::InvalidTransfer(RecordType::Middle));
150                }
151                state = SerializeState::Middle;
152                record.payload.reverse();
153                buf.append(&mut record.payload);
154            }
155            RecordType::Last => {
156                if state != SerializeState::None {
157                    return Err(SerializeError::InvalidTransfer(RecordType::Last));
158                }
159                state = SerializeState::First;
160                record.payload.reverse();
161                buf.append(&mut record.payload);
162            }
163        }
164    }
165
166    Err(SerializeError::OutOfRecords)
167}
168
169pub fn split_bytes_into_records(bytes: Vec<u8>, max_record_size: usize) -> io::Result<Vec<Record>> {
170    let mut records: Vec<_> = bytes.chunks(max_record_size)
171        .map(|bytes| Record::new(RecordType::Middle, bytes.to_vec()))
172        .collect();
173    if records.len() == 1 {
174        records.first_mut().unwrap().record_type = RecordType::Full;
175    } else if records.len() > 1 {
176        records.first_mut().unwrap().record_type = RecordType::First;
177        records.last_mut().unwrap().record_type = RecordType::Last;
178    } else {
179        records.push(Record::new(RecordType::Zero, vec![]));
180    }
181
182    Ok(records)
183}
184
185pub fn append_to_file(file: &mut File, record: &Record) -> io::Result<()> {
186    let file_len = file.metadata()?.len();
187    let curr_block_len = file_len - (file_len / BLOCK_SIZE as u64) * BLOCK_SIZE as u64;
188    if curr_block_len + record.payload.len() as u64 > BLOCK_SIZE as u64 {
189        let padding_len = BLOCK_SIZE as u64 - curr_block_len;
190        let padding = vec![0; padding_len as usize];
191        file.write(&padding[..])?;
192    }
193
194    record.write(file)?;
195    Ok(())
196}