wal_rs/wal/
mod.rs

1use config::Config;
2use segment::Segment;
3use std::ffi::OsStr;
4use std::fs;
5use std::io::{Error, ErrorKind, Result};
6use std::path::{Path, PathBuf};
7
8mod cursor;
9use self::cursor::Cursor;
10
11#[cfg(test)]
12mod tests;
13
14/// WAL write-ahead-log implementation
15pub struct WAL {
16    cfg: Config,
17    dir: PathBuf,
18    cursor: Cursor,
19
20    next_sequence: u64,
21
22    segments: Vec<Segment>,
23}
24
25impl WAL {
26    /// Opens a wal with given dir.
27    pub fn open<S: AsRef<OsStr> + ?Sized>(dir: &S, cfg: Config) -> Result<WAL> {
28        let p = Path::new(dir);
29        if !p.exists() {
30            fs::create_dir_all(&p)?;
31        }
32
33        if !p.is_dir() {
34            return Err(Error::new(ErrorKind::Other, "expecting a directory"));
35        }
36
37        let dir = p.to_path_buf();
38
39        let mut cursor = Cursor::open(&dir)?;
40
41        let mut read_sequence = cursor.position.sequence;
42        let mut segments: Vec<Segment> = Vec::with_capacity(10);
43        loop {
44            match Segment::open(&dir, read_sequence, cfg.entry_per_segment, false) {
45                Ok(s) => segments.push(s),
46                Err(ref e) if e.kind() == ErrorKind::NotFound => break,
47                Err(e) => return Err(e),
48            }
49
50            read_sequence += 1;
51        }
52
53        match segments.first() {
54            Some(s) => if s.len() < cursor.position.read as usize {
55                cursor.position.read = 0;
56            },
57            None => {
58                cursor.position.sequence = 0;
59                cursor.position.read = 0;
60            }
61        }
62
63        Ok(WAL {
64            cfg: cfg,
65            dir: dir,
66            cursor: cursor,
67            next_sequence: read_sequence,
68            segments: segments,
69        })
70    }
71
72    /// Write bytes to wal
73    pub fn write(&mut self, data: &[u8]) -> Result<()> {
74        self.try_allocate(1)?;
75        let segment = self.segments.last_mut().unwrap();
76        segment.write(data)?;
77
78        Ok(())
79    }
80
81    /// Writes multiple entries to wal.
82    pub fn batch_write(&mut self, mut data: &[&[u8]]) -> Result<()> {
83        while !data.is_empty() {
84            let space = self.try_allocate(data.len())?;
85
86            let segment = self.segments.last_mut().unwrap();
87            let written = segment.batch_write(&data[0..space])?;
88            data = &data[written..];
89        }
90
91        Ok(())
92    }
93
94    fn try_allocate(&mut self, n: usize) -> Result<(usize)> {
95        match self.segments.last_mut() {
96            Some(ref s) if s.space() > 0 => {
97                let space = s.space();
98
99                return if space > n { Ok(n) } else { Ok(space) };
100            }
101            Some(ss) => {
102                let _ = ss.flush();
103            }
104            None => {}
105        }
106
107        let new_seg = Segment::open(
108            &self.dir,
109            self.next_sequence,
110            self.cfg.entry_per_segment,
111            true,
112        )?;
113        let space = new_seg.space();
114        self.next_sequence += 1;
115        self.segments.push(new_seg);
116
117        if space > n {
118            Ok(n)
119        } else {
120            Ok(space)
121        }
122    }
123
124    /// Read N entries from wal.
125    pub fn read(&mut self, mut n: usize) -> Result<Vec<Vec<u8>>> {
126        let mut result: Vec<Vec<u8>> = Vec::with_capacity(n);
127
128        let mut seg_read: usize = 0;
129        let mut seg_finished: usize = 0;
130        let start_pos = self.cursor.position.clone();
131        let mut start: usize = self.cursor.position.read as usize;
132
133        while n > 0 {
134            let segment = match self.segments.get(seg_read) {
135                Some(s) if s.len() > start => s,
136                Some(_) => {
137                    seg_read += 1;
138                    seg_finished += 1;
139                    start = 0;
140                    continue;
141                }
142                None => break,
143            };
144
145            let read = segment.read_into(start, n, &mut result, self.cfg.check_crc32)?;
146            start += read;
147            n -= read;
148            self.cursor.position.sequence = segment.sequence();
149            self.cursor.position.read = start as u64;
150
151            if n > 0 {
152                if segment.space() > 0 {
153                    break;
154                }
155
156                start = 0;
157                seg_read += 1;
158                seg_finished += 1;
159            }
160        }
161
162        if seg_finished > 0 {
163            for _ in 0..seg_finished {
164                self.segments.remove(0).destory();
165            }
166        }
167
168        if self.cursor.position != start_pos {
169            let _ = self.cursor.save();
170        }
171
172        Ok(result)
173    }
174
175    /// Returns entry number in the wal.
176    pub fn len(&self) -> usize {
177        let mut size: usize = 0;
178
179        for segment in &self.segments {
180            let num = if segment.sequence() == self.cursor.position.sequence {
181                segment.len() - self.cursor.position.read as usize
182            } else {
183                segment.len()
184            };
185
186            size += num;
187        }
188
189        size
190    }
191}