hickory_server/store/sqlite/
persistence.rs

1// Copyright 2015-2016 Benjamin Fry <benjaminfry -@- me.com>
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// https://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// https://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8//! All zone persistence related types
9
10use std::iter::Iterator;
11use std::path::Path;
12use std::sync::{Mutex, MutexGuard};
13
14use rusqlite::types::ToSql;
15use rusqlite::{self, Connection};
16use time;
17use tracing::error;
18
19use crate::error::{PersistenceError, PersistenceErrorKind};
20use crate::proto::rr::Record;
21use crate::proto::serialize::binary::{BinDecodable, BinDecoder, BinEncodable, BinEncoder};
22
/// The current Journal version of the application
///
/// `Journal::schema_up` migrates a journal one step at a time until its stored version equals
/// this value, and the record insert/select methods assert that the journal is at this version
/// before they touch the database.
pub const CURRENT_VERSION: i64 = 1;
25
/// The Journal is the audit log of all changes to a zone after initial creation.
///
/// It owns a single Sqlite connection and remembers the schema version that was read from (or
/// migrated into) the database behind that connection.
pub struct Journal {
    /// Sqlite connection backing the journal, behind a mutex so `&self` methods can use it
    conn: Mutex<Connection>,
    /// Schema version of the database; `-1` means the schema has not been created yet
    version: i64,
}
31
32impl Journal {
33    /// Constructs a new Journal, attaching to the specified Sqlite Connection
34    pub fn new(conn: Connection) -> Result<Self, PersistenceError> {
35        let version = Self::select_schema_version(&conn)?;
36        Ok(Self {
37            conn: Mutex::new(conn),
38            version,
39        })
40    }
41
42    /// Constructs a new Journal opening a Sqlite connection to the file at the specified path
43    pub fn from_file(journal_file: &Path) -> Result<Self, PersistenceError> {
44        let result = Self::new(Connection::open(journal_file)?);
45        let mut journal = result?;
46        journal.schema_up()?;
47        Ok(journal)
48    }
49
50    /// Returns a reference to the Sqlite Connection
51    pub fn conn(&self) -> MutexGuard<'_, Connection> {
52        self.conn.lock().expect("conn poisoned")
53    }
54
55    /// Returns the current schema version of the journal
56    pub fn schema_version(&self) -> i64 {
57        self.version
58    }
59
60    /// this returns an iterator from the beginning of time, to be used to recreate an authority
61    pub fn iter(&self) -> JournalIter<'_> {
62        JournalIter::new(self)
63    }
64
65    /// Inserts a record, this is an append only operation.
66    ///
67    /// Records should never be posthumously modified. The message will be serialized into the.
68    ///  the first message serialized to the journal, should be a single AXFR of the entire zone,
69    ///  this will be used as a starting point to reconstruct the zone.
70    ///
71    /// # Argument
72    ///
73    /// * `record` - will be serialized into the journal
74    pub fn insert_record(&self, soa_serial: u32, record: &Record) -> Result<(), PersistenceError> {
75        assert!(
76            self.version == CURRENT_VERSION,
77            "schema version mismatch, schema_up() resolves this"
78        );
79
80        let mut serial_record: Vec<u8> = Vec::with_capacity(512);
81        {
82            let mut encoder = BinEncoder::new(&mut serial_record);
83            record.emit(&mut encoder)?;
84        }
85
86        let timestamp = time::OffsetDateTime::now_utc();
87        let client_id: i64 = 0; // TODO: we need better id information about the client, like pub_key
88        let soa_serial: i64 = i64::from(soa_serial);
89
90        let count = self.conn.lock().expect("conn poisoned").execute(
91            "INSERT
92                                          \
93                                            INTO records (client_id, soa_serial, timestamp, \
94                                            record)
95                                          \
96                                            VALUES ($1, $2, $3, $4)",
97            [
98                &client_id as &dyn ToSql,
99                &soa_serial,
100                &timestamp,
101                &serial_record,
102            ],
103        )?;
104        //
105        if count != 1 {
106            return Err(PersistenceErrorKind::WrongInsertCount {
107                got: count,
108                expect: 1,
109            }
110            .into());
111        };
112
113        Ok(())
114    }
115
116    /// Inserts a set of records into the Journal, a convenience method for insert_record
117    pub fn insert_records(
118        &self,
119        soa_serial: u32,
120        records: &[Record],
121    ) -> Result<(), PersistenceError> {
122        // TODO: NEED TRANSACTION HERE
123        for record in records {
124            self.insert_record(soa_serial, record)?;
125        }
126
127        Ok(())
128    }
129
130    /// Selects a record from the given row_id.
131    ///
132    /// This allows for the entire set of records to be iterated through, by starting at 0, and
133    ///  incrementing each subsequent row.
134    ///
135    /// # Arguments
136    ///
137    /// * `row_id` - the row_id can either be exact, or start at 0 to get the earliest row in the
138    ///   list.
139    pub fn select_record(&self, row_id: i64) -> Result<Option<(i64, Record)>, PersistenceError> {
140        assert!(
141            self.version == CURRENT_VERSION,
142            "schema version mismatch, schema_up() resolves this"
143        );
144
145        let conn = self.conn.lock().expect("conn poisoned");
146        let mut stmt = conn.prepare(
147            "SELECT _rowid_, record
148                                            \
149                                               FROM records
150                                            \
151                                               WHERE _rowid_ >= $1
152                                            \
153                                               LIMIT 1",
154        )?;
155
156        let record_opt: Option<Result<(i64, Record), rusqlite::Error>> = stmt
157            .query_and_then([&row_id], |row| -> Result<(i64, Record), rusqlite::Error> {
158                let row_id: i64 = row.get(0)?;
159                let record_bytes: Vec<u8> = row.get(1)?;
160                let mut decoder = BinDecoder::new(&record_bytes);
161
162                // todo add location to this...
163                match Record::read(&mut decoder) {
164                    Ok(record) => Ok((row_id, record)),
165                    Err(decode_error) => Err(rusqlite::Error::InvalidParameterName(format!(
166                        "could not decode: {decode_error}"
167                    ))),
168                }
169            })?
170            .next();
171
172        //
173        match record_opt {
174            Some(Ok((row_id, record))) => Ok(Some((row_id, record))),
175            Some(Err(err)) => Err(err.into()),
176            None => Ok(None),
177        }
178    }
179
180    /// selects the current schema version of the journal DB, returns -1 if there is no schema
181    ///
182    ///
183    /// # Arguments
184    ///
185    /// * `conn` - db connection to use
186    pub fn select_schema_version(conn: &Connection) -> Result<i64, PersistenceError> {
187        // first see if our schema is there
188        let mut stmt = conn.prepare(
189            "SELECT name
190                                        \
191                                          FROM sqlite_master
192                                        \
193                                          WHERE type='table'
194                                        \
195                                          AND name='tdns_schema'",
196        )?;
197
198        let tdns_schema_opt: Option<Result<String, _>> =
199            stmt.query_map([], |row| row.get(0))?.next();
200
201        let tdns_schema = match tdns_schema_opt {
202            Some(Ok(string)) => string,
203            Some(Err(err)) => return Err(err.into()),
204            None => return Ok(-1),
205        };
206
207        assert_eq!(&tdns_schema, "tdns_schema");
208
209        let version: i64 = conn.query_row(
210            "SELECT version
211                                            \
212                                                FROM tdns_schema",
213            [],
214            |row| row.get(0),
215        )?;
216
217        Ok(version)
218    }
219
220    /// update the schema version
221    fn update_schema_version(&self, new_version: i64) -> Result<(), PersistenceError> {
222        // validate the versions of all the schemas...
223        assert!(new_version <= CURRENT_VERSION);
224
225        let count = self
226            .conn
227            .lock()
228            .expect("conn poisoned")
229            .execute("UPDATE tdns_schema SET version = $1", [&new_version])?;
230
231        //
232        assert_eq!(count, 1);
233        Ok(())
234    }
235
236    /// initializes the schema for the Journal
237    pub fn schema_up(&mut self) -> Result<i64, PersistenceError> {
238        while self.version < CURRENT_VERSION {
239            match self.version + 1 {
240                0 => self.version = self.init_up()?,
241                1 => self.version = self.records_up()?,
242                _ => panic!("incorrect version somewhere"), // valid panic, non-recoverable state
243            }
244
245            self.update_schema_version(self.version)?;
246        }
247
248        Ok(self.version)
249    }
250
251    /// initial schema, include the tdns_schema table for tracking the Journal version
252    fn init_up(&self) -> Result<i64, PersistenceError> {
253        let count = self.conn.lock().expect("conn poisoned").execute(
254            "CREATE TABLE tdns_schema (
255                                          \
256                                            version INTEGER NOT NULL
257                                        \
258                                            )",
259            [],
260        )?;
261        //
262        assert_eq!(count, 0);
263
264        let count = self
265            .conn
266            .lock()
267            .expect("conn poisoned")
268            .execute("INSERT INTO tdns_schema (version) VALUES (0)", [])?;
269        //
270        assert_eq!(count, 1);
271
272        Ok(0)
273    }
274
275    /// adds the records table, this is the main and single table for the history of changes to an
276    ///  authority. Each record is expected to be in the format of an update record
277    fn records_up(&self) -> Result<i64, PersistenceError> {
278        // we'll be using rowid for our primary key, basically: `rowid INTEGER PRIMARY KEY ASC`
279        let count = self.conn.lock().expect("conn poisoned").execute(
280            "CREATE TABLE records (
281                                          \
282                                            client_id      INTEGER NOT NULL,
283                                          \
284                                            soa_serial     INTEGER NOT NULL,
285                                          \
286                                            timestamp      TEXT NOT NULL,
287                                          \
288                                            record         BLOB NOT NULL
289                                        \
290                                            )",
291            [],
292        )?;
293        //
294        assert_eq!(count, 1);
295
296        Ok(1)
297    }
298}
299
/// Returns an iterator over all items in a Journal
///
/// Useful for replaying an entire journal into memory to reconstruct a zone from disk
pub struct JournalIter<'j> {
    /// Row id of the record most recently yielded; starts at 0 before anything is yielded
    current_row_id: i64,
    /// The journal the records are read from
    journal: &'j Journal,
}
307
308impl<'j> JournalIter<'j> {
309    fn new(journal: &'j Journal) -> Self {
310        JournalIter {
311            current_row_id: 0,
312            journal,
313        }
314    }
315}
316
317impl Iterator for JournalIter<'_> {
318    type Item = Record;
319
320    fn next(&mut self) -> Option<Self::Item> {
321        match self.journal.select_record(self.current_row_id + 1) {
322            Ok(Some((row_id, record))) => {
323                self.current_row_id = row_id;
324                Some(record)
325            }
326            Ok(None) => None,
327            Err(err) => {
328                error!("persistence error while iterating over journal: {}", err);
329                None
330            }
331        }
332    }
333}