// hickory_server/store/sqlite/persistence.rs
use std::iter::Iterator;
11use std::path::Path;
12use std::sync::{Mutex, MutexGuard};
13
14use rusqlite::types::ToSql;
15use rusqlite::{self, Connection};
16use time;
17use tracing::error;
18
19use crate::error::{PersistenceError, PersistenceErrorKind};
20use crate::proto::rr::Record;
21use crate::proto::serialize::binary::{BinDecodable, BinDecoder, BinEncodable, BinEncoder};
22
/// Schema version this code operates on.
///
/// [`Journal::schema_up`] migrates a database step by step until it reaches this version, and
/// the record read/write methods assert that the journal is at exactly this version.
pub const CURRENT_VERSION: i64 = 1;
25
26pub struct Journal {
28 conn: Mutex<Connection>,
29 version: i64,
30}
31
32impl Journal {
33 pub fn new(conn: Connection) -> Result<Self, PersistenceError> {
35 let version = Self::select_schema_version(&conn)?;
36 Ok(Self {
37 conn: Mutex::new(conn),
38 version,
39 })
40 }
41
42 pub fn from_file(journal_file: &Path) -> Result<Self, PersistenceError> {
44 let result = Self::new(Connection::open(journal_file)?);
45 let mut journal = result?;
46 journal.schema_up()?;
47 Ok(journal)
48 }
49
50 pub fn conn(&self) -> MutexGuard<'_, Connection> {
52 self.conn.lock().expect("conn poisoned")
53 }
54
55 pub fn schema_version(&self) -> i64 {
57 self.version
58 }
59
60 pub fn iter(&self) -> JournalIter<'_> {
62 JournalIter::new(self)
63 }
64
65 pub fn insert_record(&self, soa_serial: u32, record: &Record) -> Result<(), PersistenceError> {
75 assert!(
76 self.version == CURRENT_VERSION,
77 "schema version mismatch, schema_up() resolves this"
78 );
79
80 let mut serial_record: Vec<u8> = Vec::with_capacity(512);
81 {
82 let mut encoder = BinEncoder::new(&mut serial_record);
83 record.emit(&mut encoder)?;
84 }
85
86 let timestamp = time::OffsetDateTime::now_utc();
87 let client_id: i64 = 0; let soa_serial: i64 = i64::from(soa_serial);
89
90 let count = self.conn.lock().expect("conn poisoned").execute(
91 "INSERT
92 \
93 INTO records (client_id, soa_serial, timestamp, \
94 record)
95 \
96 VALUES ($1, $2, $3, $4)",
97 [
98 &client_id as &dyn ToSql,
99 &soa_serial,
100 ×tamp,
101 &serial_record,
102 ],
103 )?;
104 if count != 1 {
106 return Err(PersistenceErrorKind::WrongInsertCount {
107 got: count,
108 expect: 1,
109 }
110 .into());
111 };
112
113 Ok(())
114 }
115
116 pub fn insert_records(
118 &self,
119 soa_serial: u32,
120 records: &[Record],
121 ) -> Result<(), PersistenceError> {
122 for record in records {
124 self.insert_record(soa_serial, record)?;
125 }
126
127 Ok(())
128 }
129
130 pub fn select_record(&self, row_id: i64) -> Result<Option<(i64, Record)>, PersistenceError> {
140 assert!(
141 self.version == CURRENT_VERSION,
142 "schema version mismatch, schema_up() resolves this"
143 );
144
145 let conn = self.conn.lock().expect("conn poisoned");
146 let mut stmt = conn.prepare(
147 "SELECT _rowid_, record
148 \
149 FROM records
150 \
151 WHERE _rowid_ >= $1
152 \
153 LIMIT 1",
154 )?;
155
156 let record_opt: Option<Result<(i64, Record), rusqlite::Error>> = stmt
157 .query_and_then([&row_id], |row| -> Result<(i64, Record), rusqlite::Error> {
158 let row_id: i64 = row.get(0)?;
159 let record_bytes: Vec<u8> = row.get(1)?;
160 let mut decoder = BinDecoder::new(&record_bytes);
161
162 match Record::read(&mut decoder) {
164 Ok(record) => Ok((row_id, record)),
165 Err(decode_error) => Err(rusqlite::Error::InvalidParameterName(format!(
166 "could not decode: {decode_error}"
167 ))),
168 }
169 })?
170 .next();
171
172 match record_opt {
174 Some(Ok((row_id, record))) => Ok(Some((row_id, record))),
175 Some(Err(err)) => Err(err.into()),
176 None => Ok(None),
177 }
178 }
179
180 pub fn select_schema_version(conn: &Connection) -> Result<i64, PersistenceError> {
187 let mut stmt = conn.prepare(
189 "SELECT name
190 \
191 FROM sqlite_master
192 \
193 WHERE type='table'
194 \
195 AND name='tdns_schema'",
196 )?;
197
198 let tdns_schema_opt: Option<Result<String, _>> =
199 stmt.query_map([], |row| row.get(0))?.next();
200
201 let tdns_schema = match tdns_schema_opt {
202 Some(Ok(string)) => string,
203 Some(Err(err)) => return Err(err.into()),
204 None => return Ok(-1),
205 };
206
207 assert_eq!(&tdns_schema, "tdns_schema");
208
209 let version: i64 = conn.query_row(
210 "SELECT version
211 \
212 FROM tdns_schema",
213 [],
214 |row| row.get(0),
215 )?;
216
217 Ok(version)
218 }
219
220 fn update_schema_version(&self, new_version: i64) -> Result<(), PersistenceError> {
222 assert!(new_version <= CURRENT_VERSION);
224
225 let count = self
226 .conn
227 .lock()
228 .expect("conn poisoned")
229 .execute("UPDATE tdns_schema SET version = $1", [&new_version])?;
230
231 assert_eq!(count, 1);
233 Ok(())
234 }
235
236 pub fn schema_up(&mut self) -> Result<i64, PersistenceError> {
238 while self.version < CURRENT_VERSION {
239 match self.version + 1 {
240 0 => self.version = self.init_up()?,
241 1 => self.version = self.records_up()?,
242 _ => panic!("incorrect version somewhere"), }
244
245 self.update_schema_version(self.version)?;
246 }
247
248 Ok(self.version)
249 }
250
251 fn init_up(&self) -> Result<i64, PersistenceError> {
253 let count = self.conn.lock().expect("conn poisoned").execute(
254 "CREATE TABLE tdns_schema (
255 \
256 version INTEGER NOT NULL
257 \
258 )",
259 [],
260 )?;
261 assert_eq!(count, 0);
263
264 let count = self
265 .conn
266 .lock()
267 .expect("conn poisoned")
268 .execute("INSERT INTO tdns_schema (version) VALUES (0)", [])?;
269 assert_eq!(count, 1);
271
272 Ok(0)
273 }
274
275 fn records_up(&self) -> Result<i64, PersistenceError> {
278 let count = self.conn.lock().expect("conn poisoned").execute(
280 "CREATE TABLE records (
281 \
282 client_id INTEGER NOT NULL,
283 \
284 soa_serial INTEGER NOT NULL,
285 \
286 timestamp TEXT NOT NULL,
287 \
288 record BLOB NOT NULL
289 \
290 )",
291 [],
292 )?;
293 assert_eq!(count, 1);
295
296 Ok(1)
297 }
298}
299
300pub struct JournalIter<'j> {
304 current_row_id: i64,
305 journal: &'j Journal,
306}
307
308impl<'j> JournalIter<'j> {
309 fn new(journal: &'j Journal) -> Self {
310 JournalIter {
311 current_row_id: 0,
312 journal,
313 }
314 }
315}
316
317impl Iterator for JournalIter<'_> {
318 type Item = Record;
319
320 fn next(&mut self) -> Option<Self::Item> {
321 match self.journal.select_record(self.current_row_id + 1) {
322 Ok(Some((row_id, record))) => {
323 self.current_row_id = row_id;
324 Some(record)
325 }
326 Ok(None) => None,
327 Err(err) => {
328 error!("persistence error while iterating over journal: {}", err);
329 None
330 }
331 }
332 }
333}