1pub mod entries;
2pub mod iterator;
3pub mod record;
4pub mod redo_log;
5pub mod serializable;
6pub mod undo_log;
7
8use self::iterator::{BlockError, WalIterator};
9use self::record::{BLOCK_SIZE, Record, RecordType};
10
11use std::collections::HashSet;
12use std::fmt::Debug;
13use std::fs::File;
14use std::hash::Hash;
15use std::io;
16use std::io::Write;
17use std::result;
18
19use super::Serializable;
20
21pub trait LogData: Clone + PartialEq + Debug {
22 type Key: Clone + PartialEq + Eq + Debug + Hash + Serializable;
23 type Value: Clone + PartialEq + Debug + Serializable;
24}
25
26pub trait LogStore<Data: LogData> {
27 fn get(&self, key: &Data::Key) -> Option<Data::Value>;
28 fn remove(&mut self, key: &Data::Key);
29 fn update(&mut self, key: Data::Key, val: Data::Value);
30 fn flush(&mut self) -> io::Result<()>;
31 fn flush_change(&mut self, key: Data::Key, val: Data::Value) -> io::Result<()>;
32}
33
34#[derive(Debug)]
35pub enum LogError {
36 IoError(io::Error),
37 BlockError(BlockError),
38 SerializeError(SerializeError),
39}
40
41impl From<io::Error> for LogError {
42 fn from(err: io::Error) -> LogError {
43 LogError::IoError(err)
44 }
45}
46
47impl From<BlockError> for LogError {
48 fn from(err: BlockError) -> LogError {
49 LogError::BlockError(err)
50 }
51}
52
53impl From<SerializeError> for LogError {
54 fn from(err: SerializeError) -> LogError {
55 LogError::SerializeError(err)
56 }
57}
58
59pub type Result<T> = result::Result<T, LogError>;
60
61#[derive(PartialEq)]
62enum RecoverState {
63 None,
65 Begin(HashSet<u64>),
68 End,
71}
72
73#[derive(Debug)]
74pub enum SerializeError {
75 IoError(io::Error),
76 InvalidTransfer(RecordType),
77 OutOfRecords,
78}
79
80impl From<io::Error> for SerializeError {
81 fn from(err: io::Error) -> SerializeError {
82 SerializeError::IoError(err)
83 }
84}
85
86#[derive(PartialEq)]
87enum SerializeState {
88 None,
89 First,
90 Middle,
91}
92
93pub type SerializeResult<T> = result::Result<T, SerializeError>;
94
95pub fn read_serializable<S: Serializable>(iter: &mut WalIterator) -> SerializeResult<S> {
96 let mut buf = Vec::new();
97 let mut state = SerializeState::None;
98 while let Some(mut record) = iter.next() {
99 match record.record_type {
100 RecordType::Zero | RecordType::Full => {
101 return Ok(S::deserialize(&mut &record.payload[..])?);
102 }
103 RecordType::First => {
104 if state != SerializeState::None {
105 return Err(SerializeError::InvalidTransfer(RecordType::First));
106 }
107 state = SerializeState::First;
108 buf.append(&mut record.payload);
109 }
110 RecordType::Middle => {
111 if state != SerializeState::First && state != SerializeState::Middle {
112 return Err(SerializeError::InvalidTransfer(RecordType::Middle));
113 }
114 state = SerializeState::Middle;
115 buf.append(&mut record.payload);
116 }
117 RecordType::Last => {
118 if state != SerializeState::Middle {
119 return Err(SerializeError::InvalidTransfer(RecordType::Last));
120 }
121 buf.append(&mut record.payload);
122 return Ok(S::deserialize(&mut &buf[..])?);
123 }
124 }
125 }
126
127 Err(SerializeError::OutOfRecords)
128}
129
130pub fn read_serializable_backwards<S: Serializable>(iter: &mut WalIterator) -> SerializeResult<S> {
131 let mut buf = Vec::new();
132 let mut state = SerializeState::None;
133 while let Some(mut record) = iter.next_back() {
134 match record.record_type {
135 RecordType::Zero | RecordType::Full => {
136 return Ok(S::deserialize(&mut &record.payload[..])?);
137 }
138 RecordType::First => {
139 if state != SerializeState::Middle {
140 return Err(SerializeError::InvalidTransfer(RecordType::First));
141 }
142 record.payload.reverse();
143 buf.append(&mut record.payload);
144 buf.reverse();
145 return Ok(S::deserialize(&mut &buf[..])?);
146 }
147 RecordType::Middle => {
148 if state != SerializeState::First && state != SerializeState::Middle {
149 return Err(SerializeError::InvalidTransfer(RecordType::Middle));
150 }
151 state = SerializeState::Middle;
152 record.payload.reverse();
153 buf.append(&mut record.payload);
154 }
155 RecordType::Last => {
156 if state != SerializeState::None {
157 return Err(SerializeError::InvalidTransfer(RecordType::Last));
158 }
159 state = SerializeState::First;
160 record.payload.reverse();
161 buf.append(&mut record.payload);
162 }
163 }
164 }
165
166 Err(SerializeError::OutOfRecords)
167}
168
169pub fn split_bytes_into_records(bytes: Vec<u8>, max_record_size: usize) -> io::Result<Vec<Record>> {
170 let mut records: Vec<_> = bytes.chunks(max_record_size)
171 .map(|bytes| Record::new(RecordType::Middle, bytes.to_vec()))
172 .collect();
173 if records.len() == 1 {
174 records.first_mut().unwrap().record_type = RecordType::Full;
175 } else if records.len() > 1 {
176 records.first_mut().unwrap().record_type = RecordType::First;
177 records.last_mut().unwrap().record_type = RecordType::Last;
178 } else {
179 records.push(Record::new(RecordType::Zero, vec![]));
180 }
181
182 Ok(records)
183}
184
185pub fn append_to_file(file: &mut File, record: &Record) -> io::Result<()> {
186 let file_len = file.metadata()?.len();
187 let curr_block_len = file_len - (file_len / BLOCK_SIZE as u64) * BLOCK_SIZE as u64;
188 if curr_block_len + record.payload.len() as u64 > BLOCK_SIZE as u64 {
189 let padding_len = BLOCK_SIZE as u64 - curr_block_len;
190 let padding = vec![0; padding_len as usize];
191 file.write(&padding[..])?;
192 }
193
194 record.write(file)?;
195 Ok(())
196}