1use config::Config;
2use segment::Segment;
3use std::ffi::OsStr;
4use std::fs;
5use std::io::{Error, ErrorKind, Result};
6use std::path::{Path, PathBuf};
7
8mod cursor;
9use self::cursor::Cursor;
10
11#[cfg(test)]
12mod tests;
13
14pub struct WAL {
16 cfg: Config,
17 dir: PathBuf,
18 cursor: Cursor,
19
20 next_sequence: u64,
21
22 segments: Vec<Segment>,
23}
24
25impl WAL {
26 pub fn open<S: AsRef<OsStr> + ?Sized>(dir: &S, cfg: Config) -> Result<WAL> {
28 let p = Path::new(dir);
29 if !p.exists() {
30 fs::create_dir_all(&p)?;
31 }
32
33 if !p.is_dir() {
34 return Err(Error::new(ErrorKind::Other, "expecting a directory"));
35 }
36
37 let dir = p.to_path_buf();
38
39 let mut cursor = Cursor::open(&dir)?;
40
41 let mut read_sequence = cursor.position.sequence;
42 let mut segments: Vec<Segment> = Vec::with_capacity(10);
43 loop {
44 match Segment::open(&dir, read_sequence, cfg.entry_per_segment, false) {
45 Ok(s) => segments.push(s),
46 Err(ref e) if e.kind() == ErrorKind::NotFound => break,
47 Err(e) => return Err(e),
48 }
49
50 read_sequence += 1;
51 }
52
53 match segments.first() {
54 Some(s) => if s.len() < cursor.position.read as usize {
55 cursor.position.read = 0;
56 },
57 None => {
58 cursor.position.sequence = 0;
59 cursor.position.read = 0;
60 }
61 }
62
63 Ok(WAL {
64 cfg: cfg,
65 dir: dir,
66 cursor: cursor,
67 next_sequence: read_sequence,
68 segments: segments,
69 })
70 }
71
72 pub fn write(&mut self, data: &[u8]) -> Result<()> {
74 self.try_allocate(1)?;
75 let segment = self.segments.last_mut().unwrap();
76 segment.write(data)?;
77
78 Ok(())
79 }
80
81 pub fn batch_write(&mut self, mut data: &[&[u8]]) -> Result<()> {
83 while !data.is_empty() {
84 let space = self.try_allocate(data.len())?;
85
86 let segment = self.segments.last_mut().unwrap();
87 let written = segment.batch_write(&data[0..space])?;
88 data = &data[written..];
89 }
90
91 Ok(())
92 }
93
94 fn try_allocate(&mut self, n: usize) -> Result<(usize)> {
95 match self.segments.last_mut() {
96 Some(ref s) if s.space() > 0 => {
97 let space = s.space();
98
99 return if space > n { Ok(n) } else { Ok(space) };
100 }
101 Some(ss) => {
102 let _ = ss.flush();
103 }
104 None => {}
105 }
106
107 let new_seg = Segment::open(
108 &self.dir,
109 self.next_sequence,
110 self.cfg.entry_per_segment,
111 true,
112 )?;
113 let space = new_seg.space();
114 self.next_sequence += 1;
115 self.segments.push(new_seg);
116
117 if space > n {
118 Ok(n)
119 } else {
120 Ok(space)
121 }
122 }
123
124 pub fn read(&mut self, mut n: usize) -> Result<Vec<Vec<u8>>> {
126 let mut result: Vec<Vec<u8>> = Vec::with_capacity(n);
127
128 let mut seg_read: usize = 0;
129 let mut seg_finished: usize = 0;
130 let start_pos = self.cursor.position.clone();
131 let mut start: usize = self.cursor.position.read as usize;
132
133 while n > 0 {
134 let segment = match self.segments.get(seg_read) {
135 Some(s) if s.len() > start => s,
136 Some(_) => {
137 seg_read += 1;
138 seg_finished += 1;
139 start = 0;
140 continue;
141 }
142 None => break,
143 };
144
145 let read = segment.read_into(start, n, &mut result, self.cfg.check_crc32)?;
146 start += read;
147 n -= read;
148 self.cursor.position.sequence = segment.sequence();
149 self.cursor.position.read = start as u64;
150
151 if n > 0 {
152 if segment.space() > 0 {
153 break;
154 }
155
156 start = 0;
157 seg_read += 1;
158 seg_finished += 1;
159 }
160 }
161
162 if seg_finished > 0 {
163 for _ in 0..seg_finished {
164 self.segments.remove(0).destory();
165 }
166 }
167
168 if self.cursor.position != start_pos {
169 let _ = self.cursor.save();
170 }
171
172 Ok(result)
173 }
174
175 pub fn len(&self) -> usize {
177 let mut size: usize = 0;
178
179 for segment in &self.segments {
180 let num = if segment.sequence() == self.cursor.position.sequence {
181 segment.len() - self.cursor.position.read as usize
182 } else {
183 segment.len()
184 };
185
186 size += num;
187 }
188
189 size
190 }
191}