seshat/database/
mod.rs

1// Copyright 2019 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod connection;
16mod recovery;
17mod searcher;
18mod static_methods;
19mod writer;
20
21use fs_extra::dir;
22use r2d2::{Pool, PooledConnection};
23use r2d2_sqlite::SqliteConnectionManager;
24use rusqlite::ToSql;
25use std::{
26    fs,
27    path::{Path, PathBuf},
28    sync::{
29        mpsc::{channel, Receiver, Sender},
30        Arc, Mutex,
31    },
32    thread,
33    thread::JoinHandle,
34};
35
36pub use crate::database::{
37    connection::{Connection, DatabaseStats},
38    recovery::{RecoveryDatabase, RecoveryInfo},
39    searcher::{SearchBatch, SearchResult, Searcher},
40};
41use crate::{
42    config::{Config, SearchConfig},
43    database::writer::Writer,
44    error::{Error, Result},
45    events::{CrawlerCheckpoint, Event, EventId, HistoricEventsT, Profile},
46    index::{Index, Writer as IndexWriter},
47};
48
49#[cfg(test)]
50use fake::{Fake, Faker};
51#[cfg(test)]
52use std::time;
53#[cfg(test)]
54use tempfile::tempdir;
55
56#[cfg(test)]
57use crate::events::CheckpointDirection;
58#[cfg(test)]
59use crate::{EVENT, TOPIC_EVENT};
60
61const DATABASE_VERSION: i64 = 4;
62const EVENTS_DB_NAME: &str = "events.db";
63
64pub(crate) enum ThreadMessage {
65    Event((Event, Profile)),
66    HistoricEvents(HistoricEventsT),
67    Write(Sender<Result<()>>, bool),
68    Delete(Sender<Result<bool>>, EventId),
69    ShutDown(Sender<Result<()>>),
70}
71
72/// The Seshat database.
73pub struct Database {
74    path: PathBuf,
75    connection: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
76    pool: r2d2::Pool<SqliteConnectionManager>,
77    _write_thread: JoinHandle<()>,
78    tx: Sender<ThreadMessage>,
79    index: Index,
80    config: Config,
81}
82
83type WriterRet = (JoinHandle<()>, Sender<ThreadMessage>);
84
85impl Database {
86    /// Create a new Seshat database or open an existing one.
87    /// # Arguments
88    ///
89    /// * `path` - The directory where the database will be stored in. This
90    ///   should be an empty directory if a new database should be created.
91    pub fn new<P: AsRef<Path>>(path: P) -> Result<Database>
92    where
93        PathBuf: std::convert::From<P>,
94    {
95        Database::new_with_config(path, &Config::new())
96    }
97
98    /// Create a new Seshat database or open an existing one with the given
99    /// configuration.
100    /// # Arguments
101    ///
102    /// * `path` - The directory where the database will be stored in. This
103    ///   should be an empty directory if a new database should be created.
104    /// * `config` - Configuration that changes the behaviour of the database.
105    pub fn new_with_config<P: AsRef<Path>>(path: P, config: &Config) -> Result<Database>
106    where
107        PathBuf: std::convert::From<P>,
108    {
109        let db_path = path.as_ref().join(EVENTS_DB_NAME);
110        let pool = Self::get_pool(&db_path, config)?;
111
112        let mut connection = pool.get()?;
113
114        Database::unlock(&connection, config)?;
115        Database::set_pragmas(&connection)?;
116
117        let (version, reindex_needed) = match Database::get_version(&mut connection) {
118            Ok(ret) => ret,
119            Err(e) => return Err(Error::DatabaseOpenError(e.to_string())),
120        };
121
122        Database::create_tables(&connection)?;
123
124        if version != DATABASE_VERSION {
125            return Err(Error::DatabaseVersionError);
126        }
127
128        if reindex_needed {
129            return Err(Error::ReindexError);
130        }
131
132        let index = Database::create_index(&path, config)?;
133        let writer = index.get_writer()?;
134
135        // Warning: Do not open a new db connection before we write the tables
136        // to the DB, otherwise sqlcipher might think that we are initializing
137        // a new database and we'll end up with two connections using differing
138        // keys and writes/reads to one of the connections might fail.
139        let writer_connection = pool.get()?;
140        Database::unlock(&writer_connection, config)?;
141        Database::set_pragmas(&writer_connection)?;
142
143        let (t_handle, tx) = Database::spawn_writer(writer_connection, writer);
144
145        Ok(Database {
146            path: path.into(),
147            connection: Arc::new(Mutex::new(connection)),
148            pool,
149            _write_thread: t_handle,
150            tx,
151            index,
152            config: config.clone(),
153        })
154    }
155
156    fn get_pool(db_path: &PathBuf, config: &Config) -> Result<Pool<SqliteConnectionManager>> {
157        let manager = SqliteConnectionManager::file(db_path);
158        let pool = r2d2::Pool::new(manager)?;
159        let connection = pool.get()?;
160
161        // Try to unlock a single connection.
162        match Database::unlock(&connection, config) {
163            // We're fine, the connection was returned successfully, we can return the pool.
164            Ok(_) => Ok(pool),
165            Err(_) => {
166                let Some(passphrase) = &config.passphrase else {
167                    // No passphrase was provided, and we failed to unlock a connection, return an
168                    // error.
169                    return Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned()));
170                };
171
172                // Ok, let's see if the unlock of the connection failed because of new default
173                // settings for the cipher settings, let's see if we can migrate the cipher settings.
174                // Take a look at the documentation of cipher_migrate[1] for more info.
175                //
176                // [1]: https://www.zetetic.net/sqlcipher/sqlcipher-api/#cipher_migrate
177                let connection = pool.get()?;
178                connection.pragma_update(None, "key", &passphrase.as_str() as &dyn ToSql)?;
179
180                let mut statement = connection.prepare("PRAGMA cipher_migrate")?;
181                let result = statement.query_row([], |row| row.get::<usize, String>(0))?;
182
183                // The cipher_migrate pragma returns a single row/column with the value set to `0`
184                // if we succeeded.
185                if result == "0" {
186                    // In this case the migration was successful and we can now recreate the pool
187                    // so the new settings come into play.
188                    let manager = SqliteConnectionManager::file(db_path);
189                    let pool = r2d2::Pool::new(manager)?;
190
191                    Ok(pool)
192                } else {
193                    Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned()))
194                }
195            }
196        }
197    }
198
199    fn set_pragmas(connection: &rusqlite::Connection) -> Result<()> {
200        connection.pragma_update(None, "foreign_keys", &1 as &dyn ToSql)?;
201        connection.pragma_update(None, "journal_mode", "WAL")?;
202        connection.pragma_update(None, "synchronous", "NORMAL")?;
203        connection.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
204        Ok(())
205    }
206
207    /// Change the passphrase of the Seshat database.
208    ///
209    /// Note that this consumes the database object and any searcher objects
210    ///   can't be used anymore. A new database will have to be opened and new
211    ///   searcher objects as well.
212    ///
213    /// # Arguments
214    ///
215    /// * `path` - The directory where the database will be stored in. This
216    ///   should be an empty directory if a new database should be created.
217    /// * `new_passphrase` - The passphrase that should be used instead of the
218    ///   current one.
219    #[cfg(feature = "encryption")]
220    pub fn change_passphrase(self, new_passphrase: &str) -> Result<()> {
221        match &self.config.passphrase {
222            Some(p) => {
223                Index::change_passphrase(&self.path, p, new_passphrase)?;
224                self.connection.lock().unwrap().pragma_update(
225                    None,
226                    "rekey",
227                    &new_passphrase as &dyn ToSql,
228                )?;
229            }
230            None => panic!("Database isn't encrypted"),
231        }
232
233        let receiver = self.shutdown();
234        receiver.recv().unwrap()?;
235
236        Ok(())
237    }
238
239    #[cfg(feature = "encryption")]
240    fn unlock(connection: &rusqlite::Connection, config: &Config) -> Result<()> {
241        let passphrase: &String = if let Some(ref p) = config.passphrase {
242            p
243        } else {
244            return Ok(());
245        };
246
247        let mut statement = connection.prepare("PRAGMA cipher_version")?;
248        let results = statement.query_map([], |row| row.get::<usize, String>(0))?;
249
250        if results.count() != 1 {
251            return Err(Error::SqlCipherError(
252                "Sqlcipher support is missing".to_string(),
253            ));
254        }
255
256        connection.pragma_update(None, "key", passphrase as &dyn ToSql)?;
257
258        let count: std::result::Result<i64, rusqlite::Error> =
259            connection.query_row("SELECT COUNT(*) FROM sqlite_master", [], |row| row.get(0));
260
261        match count {
262            Ok(_) => Ok(()),
263            Err(_) => Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned())),
264        }
265    }
266
267    #[cfg(not(feature = "encryption"))]
268    fn unlock(_: &rusqlite::Connection, _: &Config) -> Result<()> {
269        Ok(())
270    }
271
272    /// Get the size of the database.
273    /// This returns the number of bytes the database is using on disk.
274    pub fn get_size(&self) -> Result<u64> {
275        Ok(dir::get_size(self.get_path())?)
276    }
277
278    /// Get the path of the directory where the Seshat database lives in.
279    pub fn get_path(&self) -> &Path {
280        self.path.as_path()
281    }
282
283    fn create_index<P: AsRef<Path>>(path: &P, config: &Config) -> Result<Index> {
284        Ok(Index::new(path, config)?)
285    }
286
287    fn spawn_writer(
288        connection: PooledConnection<SqliteConnectionManager>,
289        index_writer: IndexWriter,
290    ) -> WriterRet {
291        let (tx, rx): (_, Receiver<ThreadMessage>) = channel();
292
293        let t_handle = thread::spawn(move || {
294            let mut writer = Writer::new(connection, index_writer);
295            let mut loaded_unprocessed = false;
296
297            while let Ok(message) = rx.recv() {
298                match message {
299                    ThreadMessage::Event((event, profile)) => writer.add_event(event, profile),
300                    ThreadMessage::Write(sender, force_commit) => {
301                        // We may have events that aren't deleted or committed
302                        // to the index but are stored in the db, let us load
303                        // them from the db and commit them to the index now.
304                        // They will later be marked as committed in the
305                        // database as part of a normal write.
306                        if !loaded_unprocessed {
307                            let ret = writer.load_unprocessed_events();
308
309                            loaded_unprocessed = true;
310
311                            if ret.is_err() {
312                                sender.send(ret).unwrap_or(());
313                                continue;
314                            }
315                        }
316                        let ret = writer.write_queued_events(force_commit);
317                        // Notify that we are done with the write.
318                        sender.send(ret).unwrap_or(());
319                    }
320                    ThreadMessage::HistoricEvents(m) => {
321                        let (check, old_check, events, sender) = m;
322                        let ret = writer.write_historic_events(check, old_check, events, true);
323                        sender.send(ret).unwrap_or(());
324                    }
325                    ThreadMessage::Delete(sender, event_id) => {
326                        let ret = writer.delete_event(event_id);
327                        sender.send(ret).unwrap_or(());
328                    }
329                    ThreadMessage::ShutDown(sender) => {
330                        let ret = writer.shutdown();
331                        sender.send(ret).unwrap_or(());
332                        return;
333                    }
334                };
335            }
336        });
337
338        (t_handle, tx)
339    }
340
341    /// Add an event with the given profile to the database.
342    /// # Arguments
343    ///
344    /// * `event` - The directory where the database will be stored in. This
345    /// * `profile` - The directory where the database will be stored in. This
346    ///
347    /// This is a fast non-blocking operation, it only queues up the event to be
348    /// added to the database. The events will be committed to the database
349    /// only when the user calls the `commit()` method.
350    pub fn add_event(&self, event: Event, profile: Profile) {
351        let message = ThreadMessage::Event((event, profile));
352        self.tx.send(message).unwrap();
353    }
354
355    /// Delete an event from the database.
356    ///
357    /// # Arguments
358    /// * `event_id` - The event id of the event that will be deleted.
359    ///
360    /// Note for the event to be completely removed a commit needs to be done.
361    ///
362    /// Returns a receiver that will receive an boolean once the event has
363    /// been deleted. The boolean indicates if the event was deleted or if a
364    /// commit will be needed.
365    pub fn delete_event(&self, event_id: &str) -> Receiver<Result<bool>> {
366        let (sender, receiver): (_, Receiver<Result<bool>>) = channel();
367        let message = ThreadMessage::Delete(sender, event_id.to_owned());
368        self.tx.send(message).unwrap();
369        receiver
370    }
371
372    fn commit_helper(&mut self, force: bool) -> Receiver<Result<()>> {
373        let (sender, receiver): (_, Receiver<Result<()>>) = channel();
374        self.tx.send(ThreadMessage::Write(sender, force)).unwrap();
375        receiver
376    }
377
378    /// Commit the currently queued up events. This method will block. A
379    /// non-blocking version of this method exists in the `commit_no_wait()`
380    /// method.
381    pub fn commit(&mut self) -> Result<()> {
382        self.commit_helper(false).recv().unwrap()
383    }
384
385    /// Commit the currently queued up events forcing the commit to the index.
386    ///
387    /// Commits are usually rate limited. This gets around the limit and forces
388    /// the documents to be added to the index.
389    ///
390    /// This method will block. A non-blocking version of this method exists in
391    /// the `force_commit_no_wait()` method.
392    ///
393    /// This should only be used for testing purposes.
394    pub fn force_commit(&mut self) -> Result<()> {
395        self.commit_helper(true).recv().unwrap()
396    }
397
398    /// Reload the database so that a search reflects the state of the last
399    /// commit. Note that this happens automatically and this method should be
400    /// used only in unit tests.
401    pub fn reload(&mut self) -> Result<()> {
402        self.index.reload()?;
403        Ok(())
404    }
405
406    /// Commit the currently queued up events without waiting for confirmation
407    /// that the operation is done.
408    ///
409    /// Returns a receiver that will receive an empty message once the commit is
410    /// done.
411    pub fn commit_no_wait(&mut self) -> Receiver<Result<()>> {
412        self.commit_helper(false)
413    }
414
415    /// Commit the currently queued up events forcing the commit to the index.
416    ///
417    /// Commits are usually rate limited. This gets around the limit and forces
418    /// the documents to be added to the index.
419    ///
420    /// This should only be used for testing purposes.
421    ///
422    /// Returns a receiver that will receive an empty message once the commit is
423    /// done.
424    pub fn force_commit_no_wait(&mut self) -> Receiver<Result<()>> {
425        self.commit_helper(true)
426    }
427
428    /// Add the given events from the room history to the database.
429    /// # Arguments
430    ///
431    /// * `events` - The events that will be added.
432    /// * `new_checkpoint` - A checkpoint that states where we need to continue
433    ///   fetching events from the room history. This checkpoint will be
434    ///   persisted in the database.
435    /// * `old_checkpoint` - The checkpoint that was used to fetch the given
436    ///   events. This checkpoint will be removed from the database.
437    pub fn add_historic_events(
438        &self,
439        events: Vec<(Event, Profile)>,
440        new_checkpoint: Option<CrawlerCheckpoint>,
441        old_checkpoint: Option<CrawlerCheckpoint>,
442    ) -> Receiver<Result<bool>> {
443        let (sender, receiver): (_, Receiver<Result<bool>>) = channel();
444        let payload = (new_checkpoint, old_checkpoint, events, sender);
445        let message = ThreadMessage::HistoricEvents(payload);
446        self.tx.send(message).unwrap();
447
448        receiver
449    }
450
451    /// Search the index and return events matching a search term.
452    /// This is just a helper function that gets a searcher and performs a
453    /// search on it immediately.
454    /// # Arguments
455    ///
456    /// * `term` - The search term that should be used to search the index.
457    pub fn search(&self, term: &str, config: &SearchConfig) -> Result<SearchBatch> {
458        let searcher = self.get_searcher();
459        searcher.search(term, config)
460    }
461
462    /// Get a searcher that can be used to perform a search.
463    pub fn get_searcher(&self) -> Searcher {
464        let index_searcher = self.index.get_searcher();
465        Searcher {
466            inner: index_searcher,
467            database: self.connection.clone(),
468        }
469    }
470
471    /// Get a database connection.
472    /// Note that this connection should only be used for reading.
473    pub fn get_connection(&self) -> Result<Connection> {
474        let connection = self.pool.get()?;
475        Database::unlock(&connection, &self.config)?;
476        Database::set_pragmas(&connection)?;
477
478        Ok(Connection {
479            inner: connection,
480            path: self.path.clone(),
481        })
482    }
483
484    /// Shut the database down.
485    ///
486    /// This will terminate the writer thread making sure that no writes will
487    /// happen after this operation.
488    pub fn shutdown(self) -> Receiver<Result<()>> {
489        let (sender, receiver): (_, Receiver<Result<()>>) = channel();
490        let message = ThreadMessage::ShutDown(sender);
491        self.tx.send(message).unwrap();
492        receiver
493    }
494
495    /// Delete the database.
496    /// Warning: This will delete the whole path that was provided at the
497    /// database creation time.
498    pub fn delete(self) -> Result<()> {
499        fs::remove_dir_all(self.path)?;
500        Ok(())
501    }
502}
503
504#[test]
505fn create_event_db() {
506    let tmpdir = tempdir().unwrap();
507    let _db = Database::new(tmpdir.path()).unwrap();
508}
509
510#[test]
511fn store_profile() {
512    let tmpdir = tempdir().unwrap();
513    let db = Database::new(tmpdir.path()).unwrap();
514
515    let profile = Profile::new("Alice", "");
516
517    let id = Database::save_profile(
518        &db.connection.lock().unwrap(),
519        "@alice.example.org",
520        &profile,
521    );
522    assert_eq!(id.unwrap(), 1);
523
524    let id = Database::save_profile(
525        &db.connection.lock().unwrap(),
526        "@alice.example.org",
527        &profile,
528    );
529    assert_eq!(id.unwrap(), 1);
530
531    let profile_new = Profile::new("Alice", "mxc://some_url");
532
533    let id = Database::save_profile(
534        &db.connection.lock().unwrap(),
535        "@alice.example.org",
536        &profile_new,
537    );
538    assert_eq!(id.unwrap(), 2);
539}
540
541#[test]
542fn store_empty_profile() {
543    let tmpdir = tempdir().unwrap();
544    let db = Database::new(tmpdir.path()).unwrap();
545
546    let profile = Profile {
547        displayname: None,
548        avatar_url: None,
549    };
550    let id = Database::save_profile(
551        &db.connection.lock().unwrap(),
552        "@alice.example.org",
553        &profile,
554    );
555    assert_eq!(id.unwrap(), 1);
556}
557
558#[test]
559fn store_event() {
560    let tmpdir = tempdir().unwrap();
561    let db = Database::new(tmpdir.path()).unwrap();
562    let profile = Profile::new("Alice", "");
563    let id = Database::save_profile(
564        &db.connection.lock().unwrap(),
565        "@alice.example.org",
566        &profile,
567    )
568    .unwrap();
569
570    let mut event = EVENT.clone();
571    let id = Database::save_event_helper(&db.connection.lock().unwrap(), &mut event, id).unwrap();
572    assert_eq!(id, 1);
573}
574
575#[test]
576fn store_event_and_profile() {
577    let tmpdir = tempdir().unwrap();
578    let db = Database::new(tmpdir.path()).unwrap();
579    let mut profile = Profile::new("Alice", "");
580    let mut event = EVENT.clone();
581    Database::save_event(&db.connection.lock().unwrap(), &mut event, &mut profile).unwrap();
582}
583
584#[test]
585fn load_event() {
586    let tmpdir = tempdir().unwrap();
587    let db = Database::new(tmpdir.path()).unwrap();
588    let mut profile = Profile::new("Alice", "");
589
590    let mut event = EVENT.clone();
591    Database::save_event(&db.connection.lock().unwrap(), &mut event, &mut profile).unwrap();
592    let events = Database::load_events(
593        &db.connection.lock().unwrap(),
594        &[
595            (1.0, "$15163622445EBvZJ:localhost".to_string()),
596            (0.3, "$FAKE".to_string()),
597        ],
598        0,
599        0,
600        false,
601    )
602    .unwrap();
603
604    assert_eq!(*EVENT.source, events[0].event_source)
605}
606
607#[test]
608fn commit_a_write() {
609    let tmpdir = tempdir().unwrap();
610    let mut db = Database::new(tmpdir.path()).unwrap();
611    db.commit().unwrap();
612}
613
614#[test]
615fn save_the_event_multithreaded() {
616    let tmpdir = tempdir().unwrap();
617    let mut db = Database::new(tmpdir.path()).unwrap();
618    let profile = Profile::new("Alice", "");
619
620    db.add_event(EVENT.clone(), profile);
621    db.commit().unwrap();
622    db.reload().unwrap();
623
624    let events = Database::load_events(
625        &db.connection.lock().unwrap(),
626        &[
627            (1.0, "$15163622445EBvZJ:localhost".to_string()),
628            (0.3, "$FAKE".to_string()),
629        ],
630        0,
631        0,
632        false,
633    )
634    .unwrap();
635
636    assert_eq!(*EVENT.source, events[0].event_source)
637}
638
639#[test]
640fn load_a_profile() {
641    let tmpdir = tempdir().unwrap();
642    let db = Database::new(tmpdir.path()).unwrap();
643
644    let profile = Profile::new("Alice", "");
645    let user_id = "@alice.example.org";
646    let profile_id =
647        Database::save_profile(&db.connection.lock().unwrap(), user_id, &profile).unwrap();
648
649    let loaded_profile =
650        Database::load_profile(&db.connection.lock().unwrap(), profile_id).unwrap();
651
652    assert_eq!(profile, loaded_profile);
653}
654
655#[test]
656fn load_event_context() {
657    let tmpdir = tempdir().unwrap();
658    let mut db = Database::new(tmpdir.path()).unwrap();
659    let profile = Profile::new("Alice", "");
660
661    db.add_event(EVENT.clone(), profile.clone());
662
663    let mut before_event = None;
664
665    for i in 1..6 {
666        let mut event: Event = Faker.fake();
667        event.server_ts = EVENT.server_ts - i;
668        event.source = format!("Hello before event {}", i);
669
670        if before_event.is_none() {
671            before_event = Some(event.clone());
672        }
673
674        db.add_event(event, profile.clone());
675    }
676
677    let mut after_event = None;
678
679    for i in 1..6 {
680        let mut event: Event = Faker.fake();
681        event.server_ts = EVENT.server_ts + i;
682        event.source = format!("Hello after event {}", i);
683
684        if after_event.is_none() {
685            after_event = Some(event.clone());
686        }
687
688        db.add_event(event, profile.clone());
689    }
690
691    db.commit().unwrap();
692
693    for i in 1..5 {
694        let (before, after, _) =
695            Database::load_event_context(&db.connection.lock().unwrap(), &EVENT, 1, 1).unwrap();
696
697        if (before.len() != 1
698            || after.len() != 1
699            || before[0] != before_event.as_ref().unwrap().source
700            || after[0] != after_event.as_ref().unwrap().source)
701            && i != 10
702        {
703            thread::sleep(time::Duration::from_millis(10));
704            continue;
705        }
706
707        assert_eq!(before.len(), 1);
708        assert_eq!(before[0], before_event.as_ref().unwrap().source);
709        assert_eq!(after.len(), 1);
710        assert_eq!(after[0], after_event.as_ref().unwrap().source);
711
712        return;
713    }
714}
715
716#[test]
717fn save_and_load_checkpoints() {
718    let tmpdir = tempdir().unwrap();
719    let db = Database::new(tmpdir.path()).unwrap();
720
721    let checkpoint = CrawlerCheckpoint {
722        room_id: "!test:room".to_string(),
723        token: "1234".to_string(),
724        full_crawl: false,
725        direction: CheckpointDirection::Backwards,
726    };
727
728    let mut connection = db.get_connection().unwrap();
729    let transaction = connection.transaction().unwrap();
730
731    Database::replace_crawler_checkpoint(&transaction, Some(&checkpoint), None).unwrap();
732    transaction.commit().unwrap();
733
734    let checkpoints = connection.load_checkpoints().unwrap();
735
736    println!("{:?}", checkpoints);
737
738    assert!(checkpoints.contains(&checkpoint));
739
740    let new_checkpoint = CrawlerCheckpoint {
741        room_id: "!test:room".to_string(),
742        token: "12345".to_string(),
743        full_crawl: false,
744        direction: CheckpointDirection::Backwards,
745    };
746
747    Database::replace_crawler_checkpoint(&connection, Some(&new_checkpoint), Some(&checkpoint))
748        .unwrap();
749
750    let checkpoints = connection.load_checkpoints().unwrap();
751
752    assert!(!checkpoints.contains(&checkpoint));
753    assert!(checkpoints.contains(&new_checkpoint));
754}
755
756#[test]
757fn duplicate_empty_profiles() {
758    let tmpdir = tempdir().unwrap();
759    let db = Database::new(tmpdir.path()).unwrap();
760    let profile = Profile {
761        displayname: None,
762        avatar_url: None,
763    };
764    let user_id = "@alice.example.org";
765
766    let first_id =
767        Database::save_profile(&db.connection.lock().unwrap(), user_id, &profile).unwrap();
768    let second_id =
769        Database::save_profile(&db.connection.lock().unwrap(), user_id, &profile).unwrap();
770
771    assert_eq!(first_id, second_id);
772
773    let connection = db.connection.lock().unwrap();
774
775    let mut stmt = connection
776        .prepare("SELECT id FROM profile WHERE user_id=?1")
777        .unwrap();
778
779    let profile_ids = stmt.query_map([user_id], |row| row.get(0)).unwrap();
780
781    let mut id_count = 0;
782
783    for row in profile_ids {
784        let _profile_id: i64 = row.unwrap();
785        id_count += 1;
786    }
787
788    assert_eq!(id_count, 1);
789}
790
791#[test]
792fn is_empty() {
793    let tmpdir = tempdir().unwrap();
794    let mut db = Database::new(tmpdir.path()).unwrap();
795    let connection = db.get_connection().unwrap();
796    assert!(connection.is_empty().unwrap());
797
798    let profile = Profile::new("Alice", "");
799    db.add_event(EVENT.clone(), profile);
800    db.commit().unwrap();
801    assert!(!connection.is_empty().unwrap());
802}
803
804#[cfg(feature = "encryption")]
805#[test]
806fn encrypted_db() {
807    let tmpdir = tempdir().unwrap();
808    let db_config = Config::new().set_passphrase("test");
809    let mut db = match Database::new_with_config(tmpdir.path(), &db_config) {
810        Ok(db) => db,
811        Err(e) => panic!("Coulnd't open encrypted database {}", e),
812    };
813
814    let connection = match db.get_connection() {
815        Ok(c) => c,
816        Err(e) => panic!("Could not get database connection {}", e),
817    };
818
819    assert!(
820        connection.is_empty().unwrap(),
821        "New database should be empty"
822    );
823
824    let profile = Profile::new("Alice", "");
825    db.add_event(EVENT.clone(), profile);
826
827    match db.commit() {
828        Ok(_) => (),
829        Err(e) => panic!("Could not commit events to database {}", e),
830    }
831    assert!(
832        !connection.is_empty().unwrap(),
833        "Database shouldn't be empty anymore"
834    );
835
836    drop(db);
837
838    let db = Database::new(tmpdir.path());
839    assert!(
840        db.is_err(),
841        "opening the database without a passphrase should fail"
842    );
843}
844
845#[cfg(feature = "encryption")]
846#[test]
847fn change_passphrase() {
848    let tmpdir = tempdir().unwrap();
849    let db_config = Config::new().set_passphrase("test");
850    let mut db = match Database::new_with_config(tmpdir.path(), &db_config) {
851        Ok(db) => db,
852        Err(e) => panic!("Coulnd't open encrypted database {}", e),
853    };
854
855    let connection = db
856        .get_connection()
857        .expect("Could not get database connection");
858    assert!(
859        connection.is_empty().unwrap(),
860        "New database should be empty"
861    );
862
863    let profile = Profile::new("Alice", "");
864    db.add_event(EVENT.clone(), profile);
865
866    db.commit().expect("Could not commit events to database");
867    db.change_passphrase("wordpass")
868        .expect("Could not change the database passphrase");
869
870    let db_config = Config::new().set_passphrase("wordpass");
871    let db = Database::new_with_config(tmpdir.path(), &db_config)
872        .expect("Could not open database with the new passphrase");
873    let connection = db
874        .get_connection()
875        .expect("Could not get database connection");
876    assert!(
877        !connection.is_empty().unwrap(),
878        "Database shouldn't be empty anymore"
879    );
880    drop(db);
881
882    let db_config = Config::new().set_passphrase("test");
883    let db = Database::new_with_config(tmpdir.path(), &db_config);
884    assert!(
885        db.is_err(),
886        "opening the database without a passphrase should fail"
887    );
888}
889
890#[test]
891fn resume_committing() {
892    let tmpdir = tempdir().unwrap();
893    let mut db = Database::new(tmpdir.path()).unwrap();
894    let profile = Profile::new("Alice", "");
895
896    // Check that we don't have any uncommitted events.
897    assert!(
898        Database::load_uncommitted_events(&db.connection.lock().unwrap())
899            .unwrap()
900            .is_empty()
901    );
902
903    db.add_event(EVENT.clone(), profile);
904    db.commit().unwrap();
905    db.reload().unwrap();
906
907    // Now we do have uncommitted events.
908    assert!(
909        !Database::load_uncommitted_events(&db.connection.lock().unwrap())
910            .unwrap()
911            .is_empty()
912    );
913
914    // Since the event wasn't committed to the index the search should fail.
915    assert!(db
916        .search("test", &SearchConfig::new())
917        .unwrap()
918        .results
919        .is_empty());
920
921    // Let us drop the DB to check if we're loading the uncommitted events
922    // correctly.
923    drop(db);
924    let mut counter = 0;
925    let mut db = Database::new(tmpdir.path());
926
927    // Tantivy might still be in the process of being shut down
928    // and hold on to the write lock. Meaning that opening the database might
929    // not succeed immediately. Retry a couple of times before giving up.
930    while db.is_err() {
931        counter += 1;
932        if counter > 10 {
933            break;
934        }
935        thread::sleep(time::Duration::from_millis(100));
936        db = Database::new(tmpdir.path())
937    }
938
939    let mut db = db.unwrap();
940
941    // We still have uncommitted events.
942    assert_eq!(
943        Database::load_uncommitted_events(&db.connection.lock().unwrap()).unwrap()[0].1,
944        *EVENT
945    );
946
947    db.force_commit().unwrap();
948    db.reload().unwrap();
949
950    // A forced commit gets rid of our uncommitted events.
951    assert!(
952        Database::load_uncommitted_events(&db.connection.lock().unwrap())
953            .unwrap()
954            .is_empty()
955    );
956
957    let result = db.search("test", &SearchConfig::new()).unwrap().results;
958
959    // The search is now successful.
960    assert!(!result.is_empty());
961    assert_eq!(result.len(), 1);
962    assert_eq!(result[0].event_source, EVENT.source);
963}
964
965#[test]
966fn delete_uncommitted() {
967    let tmpdir = tempdir().unwrap();
968    let mut db = Database::new(tmpdir.path()).unwrap();
969    let profile = Profile::new("Alice", "");
970
971    for i in 1..1000 {
972        let mut event: Event = Faker.fake();
973        event.server_ts += i;
974        db.add_event(event, profile.clone());
975
976        if i % 100 == 0 {
977            db.commit().unwrap();
978        }
979    }
980
981    db.force_commit().unwrap();
982    assert!(
983        Database::load_uncommitted_events(&db.connection.lock().unwrap())
984            .unwrap()
985            .is_empty()
986    );
987}
988
989#[test]
990fn stats_getting() {
991    let tmpdir = tempdir().unwrap();
992    let mut db = Database::new(tmpdir.path()).unwrap();
993    let profile = Profile::new("Alice", "");
994
995    for i in 0..1000 {
996        let mut event: Event = Faker.fake();
997        event.server_ts += i;
998        db.add_event(event, profile.clone());
999    }
1000
1001    db.commit().unwrap();
1002
1003    let connection = db.get_connection().unwrap();
1004
1005    let stats = connection.get_stats().unwrap();
1006
1007    assert_eq!(stats.event_count, 1000);
1008    assert_eq!(stats.room_count, 1);
1009    assert!(stats.size > 0);
1010}
1011
1012#[test]
1013fn database_upgrade_v1() {
1014    let mut path = PathBuf::from(file!());
1015    path.pop();
1016    path.pop();
1017    path.pop();
1018    path.push("data/database/v1");
1019    let db = Database::new(path);
1020
1021    // Sadly the v1 database has invalid json in the source field, reindexing it
1022    // won't be possible. Let's check that it's marked for a reindex.
1023    match db {
1024        Ok(_) => panic!("Database doesn't need a reindex."),
1025        Err(e) => match e {
1026            Error::ReindexError => (),
1027            e => panic!("Database doesn't need a reindex: {}", e),
1028        },
1029    }
1030}
1031
1032#[cfg(test)]
1033use crate::database::recovery::test::reindex_loop;
1034
1035#[test]
1036fn database_upgrade_v1_2() {
1037    let mut path = PathBuf::from(file!());
1038    path.pop();
1039    path.pop();
1040    path.pop();
1041    path.push("data/database/v1_2");
1042    let db = Database::new(&path);
1043    match db {
1044        Ok(_) => panic!("Database doesn't need a reindex."),
1045        Err(e) => match e {
1046            Error::ReindexError => (),
1047            e => panic!("Database doesn't need a reindex: {}", e),
1048        },
1049    }
1050
1051    let mut recovery_db = RecoveryDatabase::new(&path).expect("Can't open recovery db");
1052
1053    recovery_db.delete_the_index().unwrap();
1054    recovery_db.open_index().unwrap();
1055
1056    let events = recovery_db.load_events_deserialized(100, None).unwrap();
1057
1058    recovery_db.index_events(&events).unwrap();
1059    reindex_loop(&mut recovery_db, events).unwrap();
1060    recovery_db.commit_and_close().unwrap();
1061
1062    let db = Database::new(&path).expect("Can't open the db event after a reindex");
1063
1064    let mut connection = db.get_connection().unwrap();
1065    let (version, _) = Database::get_version(&mut connection).unwrap();
1066    assert_eq!(version, DATABASE_VERSION);
1067
1068    let result = db.search("Hello", &SearchConfig::new()).unwrap().results;
1069    assert!(!result.is_empty())
1070}
1071
1072#[test]
1073fn delete_an_event() {
1074    let tmpdir = tempdir().unwrap();
1075    let mut db = Database::new(tmpdir.path()).unwrap();
1076    let profile = Profile::new("Alice", "");
1077
1078    db.add_event(EVENT.clone(), profile.clone());
1079    db.add_event(TOPIC_EVENT.clone(), profile);
1080
1081    db.force_commit().unwrap();
1082
1083    assert!(
1084        Database::load_pending_deletion_events(&db.connection.lock().unwrap())
1085            .unwrap()
1086            .is_empty()
1087    );
1088
1089    let recv = db.delete_event(&EVENT.event_id);
1090    recv.recv().unwrap().unwrap();
1091
1092    assert_eq!(
1093        Database::load_pending_deletion_events(&db.connection.lock().unwrap())
1094            .unwrap()
1095            .len(),
1096        1
1097    );
1098
1099    drop(db);
1100
1101    let mut db = Database::new(tmpdir.path()).unwrap();
1102    assert_eq!(
1103        Database::load_pending_deletion_events(&db.connection.lock().unwrap())
1104            .unwrap()
1105            .len(),
1106        1
1107    );
1108
1109    db.force_commit().unwrap();
1110    assert_eq!(
1111        Database::load_pending_deletion_events(&db.connection.lock().unwrap())
1112            .unwrap()
1113            .len(),
1114        0
1115    );
1116}
1117
1118#[test]
1119fn add_events_with_null_byte() {
1120    let event_source: &str = r#"{
1121        "content": {
1122            "body": "\u00000",
1123            "msgtype": "m.text"
1124        },
1125        "event_id": "$15163622448EBvZJ:localhost",
1126        "origin_server_ts": 1516362244050,
1127        "sender": "@example2:localhost",
1128        "type": "m.room.message",
1129        "unsigned": {"age": 43289803098},
1130        "user_id": "@example2:localhost",
1131        "age": 43289803098,
1132        "room_id": "!test:example.org"
1133    }"#;
1134
1135    let event = RecoveryDatabase::event_from_json(event_source).unwrap();
1136
1137    let tmpdir = tempdir().unwrap();
1138    let db = Database::new(tmpdir.path()).unwrap();
1139    let profile = Profile::new("Alice", &event.content_value);
1140
1141    let events = vec![(event, profile)];
1142    db.add_historic_events(events, None, None)
1143        .recv()
1144        .unwrap()
1145        .expect("Event should be added");
1146}
1147
1148#[test]
1149fn is_room_indexed() {
1150    let tmpdir = tempdir().unwrap();
1151    let mut db = Database::new(tmpdir.path()).unwrap();
1152
1153    let connection = db.get_connection().unwrap();
1154
1155    assert!(connection.is_empty().unwrap());
1156    assert!(!connection.is_room_indexed("!test_room:localhost").unwrap());
1157
1158    let profile = Profile::new("Alice", "");
1159    db.add_event(EVENT.clone(), profile);
1160    db.force_commit().unwrap();
1161
1162    assert!(connection.is_room_indexed("!test_room:localhost").unwrap());
1163    assert!(!connection.is_room_indexed("!test_room2:localhost").unwrap());
1164}
1165
1166#[test]
1167fn user_version() {
1168    let tmpdir = tempdir().unwrap();
1169    let db = Database::new(tmpdir.path()).unwrap();
1170    let connection = db.get_connection().unwrap();
1171
1172    assert_eq!(connection.get_user_version().unwrap(), 0);
1173    connection.set_user_version(10).unwrap();
1174    assert_eq!(connection.get_user_version().unwrap(), 10);
1175}
1176
1177#[test]
1178#[cfg(feature = "encryption")]
1179fn sqlcipher_cipher_settings_update() {
1180    let mut path = PathBuf::from(file!());
1181    path.pop();
1182    path.pop();
1183    path.pop();
1184    path.push("data/database/sqlcipher-v3");
1185
1186    let config = Config::new().set_passphrase("qR17RdpWurSh2pQRSc/EnsaO9V041kOwsZk0iSdUY/g");
1187    let _db =
1188        Database::new_with_config(&path, &config).expect("We should be able to open the database");
1189}