data_pile/
database.rs

1use crate::{flatfile::FlatFile, seqno::SeqNoIndex, Error, SeqNoIter, SharedMmap};
2use std::{
3    path::{Path, PathBuf},
4    sync::{Arc, Mutex},
5};
6
/// Append-only database. Can be safely cloned and used from different threads.
///
/// Every field is behind an `Arc`, so a clone is a new handle to the same
/// underlying storage rather than a copy of the data.
#[derive(Clone)]
pub struct Database {
    /// Storage that holds the raw bytes of all records, one after another.
    flatfile: Arc<FlatFile>,
    /// Index from a record's sequential number to its byte offset in `flatfile`.
    seqno_index: Arc<SeqNoIndex>,
    /// Held for the duration of `append` so that only one write runs at a time.
    write_lock: Arc<Mutex<()>>,
}
14
15impl Database {
16    /// Open the database. Will create one if not exists.
17    pub fn file<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
18        let path = path.as_ref();
19
20        if !path.exists() {
21            std::fs::create_dir(path).map_err(|err| Error::FileOpen(path.to_path_buf(), err))?;
22        }
23
24        if !path.is_dir() {
25            return Err(Error::PathNotDir);
26        }
27
28        let flatfile_path = path.join("data");
29        let seqno_index_path = path.join("seqno");
30
31        Self::new(Some(flatfile_path), Some(seqno_index_path))
32    }
33
34    /// Open an in-memory database.
35    pub fn memory() -> Result<Self, Error> {
36        Self::new(None, None)
37    }
38
39    pub(crate) fn new(
40        flatfile_path: Option<PathBuf>,
41        seqno_index_path: Option<PathBuf>,
42    ) -> Result<Self, Error> {
43        let flatfile = Arc::new(FlatFile::new(flatfile_path)?);
44        let seqno_index = Arc::new(SeqNoIndex::new(seqno_index_path)?);
45
46        let write_lock = Arc::new(Mutex::new(()));
47
48        Ok(Database {
49            flatfile,
50            seqno_index,
51            write_lock,
52        })
53    }
54
55    /// Write an array of records to the database. This function will block if
56    /// another write is still in progress.
57    pub fn append(&self, records: &[&[u8]]) -> Result<(), Error> {
58        let _write_guard = self.write_lock.lock().unwrap();
59
60        let initial_size = self.flatfile.len();
61
62        let mut seqno_index_update = Vec::with_capacity(records.len());
63        let mut offset = initial_size;
64
65        for record in records.iter() {
66            seqno_index_update.push(offset as u64);
67            offset += record.len();
68        }
69
70        self.seqno_index.append(&seqno_index_update)?;
71        self.flatfile.append(records)?;
72
73        Ok(())
74    }
75
76    /// Put a single record (not recommended).
77    pub fn put(&self, record: &[u8]) -> Result<(), Error> {
78        self.append(&[record])
79    }
80
81    /// Get a record by its sequential number.
82    pub fn get_by_seqno(&self, seqno: usize) -> Option<SharedMmap> {
83        let offset = self.seqno_index.get_pointer_to_value(seqno)? as usize;
84        let next_offset = self
85            .seqno_index
86            .get_pointer_to_value(seqno + 1)
87            .map(|value| value as usize)
88            .unwrap_or_else(|| self.flatfile.len());
89        let length = next_offset - offset;
90        self.flatfile.get_record_at_offset(offset, length)
91    }
92
93    /// Iterate records in the order they were added starting form the given
94    /// sequential number.
95    pub fn iter_from_seqno(&self, seqno: usize) -> Option<SeqNoIter> {
96        Some(SeqNoIter::new(
97            self.flatfile.clone(),
98            self.seqno_index.clone(),
99            seqno,
100        ))
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::Database;

    /// Appends two batches one after the other and checks that every record
    /// can be read back by sequential number, and that iterating from zero
    /// after the first batch yields exactly that batch.
    fn read_write(db: Database, data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        // Nothing to exercise when either generated input is empty.
        if data1.is_empty() || data2.is_empty() {
            return;
        }

        // Empty byte strings are dropped from both batches.
        let first_batch: Vec<&[u8]> = data1
            .iter()
            .filter(|bytes| !bytes.is_empty())
            .map(|bytes| bytes.as_slice())
            .collect();
        let second_batch: Vec<&[u8]> = data2
            .iter()
            .filter(|bytes| !bytes.is_empty())
            .map(|bytes| bytes.as_slice())
            .collect();

        db.append(&first_batch).unwrap();

        // Random access to the first batch.
        for (seqno, expected) in first_batch.iter().enumerate() {
            let record = db.get_by_seqno(seqno).unwrap();
            assert_eq!(*expected, record.as_ref());
        }

        // Sequential access to the first batch.
        let mut records = db.iter_from_seqno(0).unwrap();
        let mut seen = 0;

        while let Some(record) = records.next() {
            assert_eq!(first_batch[seen], record.as_ref());
            seen += 1;
        }
        assert_eq!(seen, first_batch.len());

        db.append(&second_batch).unwrap();

        // The second batch continues the numbering right after the first one.
        let base = first_batch.len();
        for (position, expected) in second_batch.iter().enumerate() {
            let record = db.get_by_seqno(base + position).unwrap();
            assert_eq!(*expected, record.as_ref());
        }
    }

    #[quickcheck]
    fn read_write_memory(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        read_write(Database::memory().unwrap(), data1, data2);
    }

    #[quickcheck]
    fn read_write_storage(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let tmp = tempfile::tempdir().unwrap();
        read_write(Database::file(tmp.path()).unwrap(), data1, data2);
    }

    /// Appends the second batch from another thread through a cloned handle
    /// while this thread reads the first batch, then checks the second batch
    /// once the writer has finished.
    fn parallel_read_write(db: Database, data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let data1: Vec<Vec<u8>> = data1
            .into_iter()
            .filter(|bytes| !bytes.is_empty())
            .collect();
        let data2: Vec<Vec<u8>> = data2
            .into_iter()
            .filter(|bytes| !bytes.is_empty())
            .collect();

        if data1.is_empty() || data2.is_empty() {
            return;
        }

        let first_batch: Vec<&[u8]> = data1.iter().map(|bytes| bytes.as_slice()).collect();

        db.append(&first_batch).unwrap();

        let writer_db = db.clone();

        // The writer takes ownership of `data2` and hands it back on join so
        // that it can be compared against what was stored.
        let writer = std::thread::spawn(move || {
            let second_batch: Vec<&[u8]> = data2.iter().map(|bytes| bytes.as_slice()).collect();

            writer_db.append(&second_batch).unwrap();

            data2
        });

        for (seqno, expected) in first_batch.iter().enumerate() {
            let record = db.get_by_seqno(seqno).unwrap();
            assert_eq!(*expected, record.as_ref());
        }

        let data2 = writer.join().unwrap();

        let base = data1.len();
        for position in 0..data2.len() {
            let record = db.get_by_seqno(base + position).unwrap();
            assert_eq!(data2[position], record.as_ref());
        }
    }

    #[quickcheck]
    fn parallel_read_write_memory(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        parallel_read_write(Database::memory().unwrap(), data1, data2);
    }

    #[quickcheck]
    fn parallel_read_write_storage(data1: Vec<Vec<u8>>, data2: Vec<Vec<u8>>) {
        let tmp = tempfile::tempdir().unwrap();
        parallel_read_write(Database::file(tmp.path()).unwrap(), data1, data2);
    }
}