1use crate::{flatfile::FlatFile, seqno::SeqNoIndex, Error, SeqNoIter, SharedMmap};
2use std::{
3 path::{Path, PathBuf},
4 sync::{Arc, Mutex},
5};
6
/// Handle to a record store whose records are addressed by sequence number.
///
/// Every field is an `Arc`, so the derived `Clone` produces another handle to
/// the *same* underlying stores and the same write lock rather than a copy of
/// the data.
#[derive(Clone)]
pub struct Database {
    /// Store holding the raw record bytes; records are appended to it and read
    /// back by `(offset, length)`.
    flatfile: Arc<FlatFile>,
    /// Index queried by sequence number for the record's starting offset in
    /// `flatfile`.
    seqno_index: Arc<SeqNoIndex>,
    /// Held for the duration of `append` so that writers run one at a time.
    /// It guards no data of its own (`Mutex<()>`).
    write_lock: Arc<Mutex<()>>,
}
14
15impl Database {
16 pub fn file<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
18 let path = path.as_ref();
19
20 if !path.exists() {
21 std::fs::create_dir(path).map_err(|err| Error::FileOpen(path.to_path_buf(), err))?;
22 }
23
24 if !path.is_dir() {
25 return Err(Error::PathNotDir);
26 }
27
28 let flatfile_path = path.join("data");
29 let seqno_index_path = path.join("seqno");
30
31 Self::new(Some(flatfile_path), Some(seqno_index_path))
32 }
33
34 pub fn memory() -> Result<Self, Error> {
36 Self::new(None, None)
37 }
38
39 pub(crate) fn new(
40 flatfile_path: Option<PathBuf>,
41 seqno_index_path: Option<PathBuf>,
42 ) -> Result<Self, Error> {
43 let flatfile = Arc::new(FlatFile::new(flatfile_path)?);
44 let seqno_index = Arc::new(SeqNoIndex::new(seqno_index_path)?);
45
46 let write_lock = Arc::new(Mutex::new(()));
47
48 Ok(Database {
49 flatfile,
50 seqno_index,
51 write_lock,
52 })
53 }
54
55 pub fn append(&self, records: &[&[u8]]) -> Result<(), Error> {
58 let _write_guard = self.write_lock.lock().unwrap();
59
60 let initial_size = self.flatfile.len();
61
62 let mut seqno_index_update = Vec::with_capacity(records.len());
63 let mut offset = initial_size;
64
65 for record in records.iter() {
66 seqno_index_update.push(offset as u64);
67 offset += record.len();
68 }
69
70 self.seqno_index.append(&seqno_index_update)?;
71 self.flatfile.append(records)?;
72
73 Ok(())
74 }
75
76 pub fn put(&self, record: &[u8]) -> Result<(), Error> {
78 self.append(&[record])
79 }
80
81 pub fn get_by_seqno(&self, seqno: usize) -> Option<SharedMmap> {
83 let offset = self.seqno_index.get_pointer_to_value(seqno)? as usize;
84 let next_offset = self
85 .seqno_index
86 .get_pointer_to_value(seqno + 1)
87 .map(|value| value as usize)
88 .unwrap_or_else(|| self.flatfile.len());
89 let length = next_offset - offset;
90 self.flatfile.get_record_at_offset(offset, length)
91 }
92
93 pub fn iter_from_seqno(&self, seqno: usize) -> Option<SeqNoIter> {
96 Some(SeqNoIter::new(
97 self.flatfile.clone(),
98 self.seqno_index.clone(),
99 seqno,
100 ))
101 }
102}
103
#[cfg(test)]
mod tests {
    use super::Database;

    /// Appends two batches sequentially and checks that every record can be
    /// read back by sequence number, and that iteration from 0 after the
    /// first batch yields exactly that batch. Empty records are dropped from
    /// each batch before writing.
    fn read_write(db: Database, data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        // The guard looks at the unfiltered inputs.
        if data1.is_empty() || data2.is_empty() {
            return;
        }

        let records1: Vec<&[u8]> = data1
            .iter()
            .filter(|bytes| !bytes.is_empty())
            .map(|bytes| bytes.as_slice())
            .collect();
        let records2: Vec<&[u8]> = data2
            .iter()
            .filter(|bytes| !bytes.is_empty())
            .map(|bytes| bytes.as_slice())
            .collect();

        db.append(&records1).unwrap();

        for (seqno, expected) in records1.iter().copied().enumerate() {
            let record = db.get_by_seqno(seqno).unwrap();
            assert_eq!(expected, record.as_ref());
        }

        let mut iter = db.iter_from_seqno(0).unwrap();
        let mut seen = 0;

        while let Some(record) = iter.next() {
            assert_eq!(records1[seen], record.as_ref());
            seen += 1;
        }
        assert_eq!(seen, records1.len());

        db.append(&records2).unwrap();

        // The second batch continues the sequence numbers after the first.
        let first_batch_len = records1.len();
        for (idx, expected) in records2.iter().copied().enumerate() {
            let record = db.get_by_seqno(first_batch_len + idx).unwrap();
            assert_eq!(expected, record.as_ref());
        }
    }

    #[quickcheck]
    fn read_write_memory(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        read_write(Database::memory().unwrap(), data1, data2);
    }

    #[quickcheck]
    fn read_write_storage(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let tmp = tempfile::tempdir().unwrap();
        read_write(Database::file(tmp.path()).unwrap(), data1, data2);
    }

    /// Writes the first batch, then appends the second batch from another
    /// thread while the first batch is read back on this one. Once the writer
    /// has been joined, the second batch is verified as well.
    fn parallel_read_write(db: Database, data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let data1: Vec<Vec<u8>> = data1
            .into_iter()
            .filter(|bytes| !bytes.is_empty())
            .collect();
        let data2: Vec<Vec<u8>> = data2
            .into_iter()
            .filter(|bytes| !bytes.is_empty())
            .collect();

        if data1.is_empty() || data2.is_empty() {
            return;
        }

        let records1: Vec<&[u8]> = data1.iter().map(|bytes| bytes.as_slice()).collect();

        db.append(&records1).unwrap();

        let write_db = db.clone();

        // The writer owns `data2` and hands it back when it finishes.
        let write_thread = std::thread::spawn(move || {
            let records2: Vec<&[u8]> = data2.iter().map(|bytes| bytes.as_slice()).collect();

            write_db.append(&records2).unwrap();

            data2
        });

        for (seqno, expected) in records1.iter().copied().enumerate() {
            let record = db.get_by_seqno(seqno).unwrap();
            assert_eq!(expected, record.as_ref());
        }

        let data2 = write_thread.join().unwrap();

        let first_batch_len = data1.len();
        for (idx, expected) in data2.iter().enumerate() {
            let record = db.get_by_seqno(first_batch_len + idx).unwrap();
            assert_eq!(*expected, record.as_ref());
        }
    }

    #[quickcheck]
    fn parallel_read_write_memory(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        parallel_read_write(Database::memory().unwrap(), data1, data2);
    }

    #[quickcheck]
    fn parallel_read_write_storage(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let tmp = tempfile::tempdir().unwrap();
        parallel_read_write(Database::file(tmp.path()).unwrap(), data1, data2);
    }
}