1mod connection;
16mod recovery;
17mod searcher;
18mod static_methods;
19mod writer;
20
21use fs_extra::dir;
22use r2d2::{Pool, PooledConnection};
23use r2d2_sqlite::SqliteConnectionManager;
24use rusqlite::ToSql;
25use std::{
26 fs,
27 path::{Path, PathBuf},
28 sync::{
29 mpsc::{channel, Receiver, Sender},
30 Arc, Mutex,
31 },
32 thread,
33 thread::JoinHandle,
34};
35
36pub use crate::database::{
37 connection::{Connection, DatabaseStats},
38 recovery::{RecoveryDatabase, RecoveryInfo},
39 searcher::{SearchBatch, SearchResult, Searcher},
40};
41use crate::{
42 config::{Config, SearchConfig},
43 database::writer::Writer,
44 error::{Error, Result},
45 events::{CrawlerCheckpoint, Event, EventId, HistoricEventsT, Profile},
46 index::{Index, Writer as IndexWriter},
47};
48
49#[cfg(test)]
50use fake::{Fake, Faker};
51#[cfg(test)]
52use std::time;
53#[cfg(test)]
54use tempfile::tempdir;
55
56#[cfg(test)]
57use crate::events::CheckpointDirection;
58#[cfg(test)]
59use crate::{EVENT, TOPIC_EVENT};
60
/// Database version this build expects. `Database::new_with_config` returns
/// `Error::DatabaseVersionError` when the version read from disk differs.
const DATABASE_VERSION: i64 = 4;
/// File name of the SQLite database, joined onto the directory passed to
/// `Database::new_with_config`.
const EVENTS_DB_NAME: &str = "events.db";
63
/// Messages consumed by the writer thread that `Database::spawn_writer` starts.
///
/// Variants that carry a `Sender` get the outcome of the operation sent back
/// over that channel.
pub(crate) enum ThreadMessage {
    /// An event and a profile, handed to `Writer::add_event`.
    Event((Event, Profile)),
    /// A batch of historic events; the payload is unpacked into new
    /// checkpoint, old checkpoint, events and reply sender.
    HistoricEvents(HistoricEventsT),
    /// Write the queued events. The `bool` is forwarded as the `force_commit`
    /// argument of `Writer::write_queued_events`.
    Write(Sender<Result<()>>, bool),
    /// Delete the event with the given id via `Writer::delete_event`.
    Delete(Sender<Result<bool>>, EventId),
    /// Shut the writer down; the writer thread returns after replying.
    ShutDown(Sender<Result<()>>),
}
71
/// Handle to an event database: a SQLite file accessed through a connection
/// pool, a search index, and a background writer thread fed over a channel.
pub struct Database {
    /// Directory that was passed to the constructor.
    path: PathBuf,
    /// Pooled connection shared behind a mutex; cloned into every `Searcher`.
    connection: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
    /// Pool from which `get_connection` hands out further connections.
    pool: r2d2::Pool<SqliteConnectionManager>,
    /// Join handle of the writer thread; stored but never joined in this file.
    _write_thread: JoinHandle<()>,
    /// Sending half of the channel the writer thread listens on.
    tx: Sender<ThreadMessage>,
    /// The search index created for the same directory.
    index: Index,
    /// Copy of the configuration the database was opened with.
    config: Config,
}
82
/// What `Database::spawn_writer` returns: the writer thread's join handle and
/// the sender used to post `ThreadMessage`s to it.
type WriterRet = (JoinHandle<()>, Sender<ThreadMessage>);
84
85impl Database {
86 pub fn new<P: AsRef<Path>>(path: P) -> Result<Database>
92 where
93 PathBuf: std::convert::From<P>,
94 {
95 Database::new_with_config(path, &Config::new())
96 }
97
98 pub fn new_with_config<P: AsRef<Path>>(path: P, config: &Config) -> Result<Database>
106 where
107 PathBuf: std::convert::From<P>,
108 {
109 let db_path = path.as_ref().join(EVENTS_DB_NAME);
110 let pool = Self::get_pool(&db_path, config)?;
111
112 let mut connection = pool.get()?;
113
114 Database::unlock(&connection, config)?;
115 Database::set_pragmas(&connection)?;
116
117 let (version, reindex_needed) = match Database::get_version(&mut connection) {
118 Ok(ret) => ret,
119 Err(e) => return Err(Error::DatabaseOpenError(e.to_string())),
120 };
121
122 Database::create_tables(&connection)?;
123
124 if version != DATABASE_VERSION {
125 return Err(Error::DatabaseVersionError);
126 }
127
128 if reindex_needed {
129 return Err(Error::ReindexError);
130 }
131
132 let index = Database::create_index(&path, config)?;
133 let writer = index.get_writer()?;
134
135 let writer_connection = pool.get()?;
140 Database::unlock(&writer_connection, config)?;
141 Database::set_pragmas(&writer_connection)?;
142
143 let (t_handle, tx) = Database::spawn_writer(writer_connection, writer);
144
145 Ok(Database {
146 path: path.into(),
147 connection: Arc::new(Mutex::new(connection)),
148 pool,
149 _write_thread: t_handle,
150 tx,
151 index,
152 config: config.clone(),
153 })
154 }
155
156 fn get_pool(db_path: &PathBuf, config: &Config) -> Result<Pool<SqliteConnectionManager>> {
157 let manager = SqliteConnectionManager::file(db_path);
158 let pool = r2d2::Pool::new(manager)?;
159 let connection = pool.get()?;
160
161 match Database::unlock(&connection, config) {
163 Ok(_) => Ok(pool),
165 Err(_) => {
166 let Some(passphrase) = &config.passphrase else {
167 return Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned()));
170 };
171
172 let connection = pool.get()?;
178 connection.pragma_update(None, "key", &passphrase.as_str() as &dyn ToSql)?;
179
180 let mut statement = connection.prepare("PRAGMA cipher_migrate")?;
181 let result = statement.query_row([], |row| row.get::<usize, String>(0))?;
182
183 if result == "0" {
186 let manager = SqliteConnectionManager::file(db_path);
189 let pool = r2d2::Pool::new(manager)?;
190
191 Ok(pool)
192 } else {
193 Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned()))
194 }
195 }
196 }
197 }
198
199 fn set_pragmas(connection: &rusqlite::Connection) -> Result<()> {
200 connection.pragma_update(None, "foreign_keys", &1 as &dyn ToSql)?;
201 connection.pragma_update(None, "journal_mode", "WAL")?;
202 connection.pragma_update(None, "synchronous", "NORMAL")?;
203 connection.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
204 Ok(())
205 }
206
207 #[cfg(feature = "encryption")]
220 pub fn change_passphrase(self, new_passphrase: &str) -> Result<()> {
221 match &self.config.passphrase {
222 Some(p) => {
223 Index::change_passphrase(&self.path, p, new_passphrase)?;
224 self.connection.lock().unwrap().pragma_update(
225 None,
226 "rekey",
227 &new_passphrase as &dyn ToSql,
228 )?;
229 }
230 None => panic!("Database isn't encrypted"),
231 }
232
233 let receiver = self.shutdown();
234 receiver.recv().unwrap()?;
235
236 Ok(())
237 }
238
239 #[cfg(feature = "encryption")]
240 fn unlock(connection: &rusqlite::Connection, config: &Config) -> Result<()> {
241 let passphrase: &String = if let Some(ref p) = config.passphrase {
242 p
243 } else {
244 return Ok(());
245 };
246
247 let mut statement = connection.prepare("PRAGMA cipher_version")?;
248 let results = statement.query_map([], |row| row.get::<usize, String>(0))?;
249
250 if results.count() != 1 {
251 return Err(Error::SqlCipherError(
252 "Sqlcipher support is missing".to_string(),
253 ));
254 }
255
256 connection.pragma_update(None, "key", passphrase as &dyn ToSql)?;
257
258 let count: std::result::Result<i64, rusqlite::Error> =
259 connection.query_row("SELECT COUNT(*) FROM sqlite_master", [], |row| row.get(0));
260
261 match count {
262 Ok(_) => Ok(()),
263 Err(_) => Err(Error::DatabaseUnlockError("Invalid passphrase".to_owned())),
264 }
265 }
266
267 #[cfg(not(feature = "encryption"))]
268 fn unlock(_: &rusqlite::Connection, _: &Config) -> Result<()> {
269 Ok(())
270 }
271
272 pub fn get_size(&self) -> Result<u64> {
275 Ok(dir::get_size(self.get_path())?)
276 }
277
278 pub fn get_path(&self) -> &Path {
280 self.path.as_path()
281 }
282
283 fn create_index<P: AsRef<Path>>(path: &P, config: &Config) -> Result<Index> {
284 Ok(Index::new(path, config)?)
285 }
286
287 fn spawn_writer(
288 connection: PooledConnection<SqliteConnectionManager>,
289 index_writer: IndexWriter,
290 ) -> WriterRet {
291 let (tx, rx): (_, Receiver<ThreadMessage>) = channel();
292
293 let t_handle = thread::spawn(move || {
294 let mut writer = Writer::new(connection, index_writer);
295 let mut loaded_unprocessed = false;
296
297 while let Ok(message) = rx.recv() {
298 match message {
299 ThreadMessage::Event((event, profile)) => writer.add_event(event, profile),
300 ThreadMessage::Write(sender, force_commit) => {
301 if !loaded_unprocessed {
307 let ret = writer.load_unprocessed_events();
308
309 loaded_unprocessed = true;
310
311 if ret.is_err() {
312 sender.send(ret).unwrap_or(());
313 continue;
314 }
315 }
316 let ret = writer.write_queued_events(force_commit);
317 sender.send(ret).unwrap_or(());
319 }
320 ThreadMessage::HistoricEvents(m) => {
321 let (check, old_check, events, sender) = m;
322 let ret = writer.write_historic_events(check, old_check, events, true);
323 sender.send(ret).unwrap_or(());
324 }
325 ThreadMessage::Delete(sender, event_id) => {
326 let ret = writer.delete_event(event_id);
327 sender.send(ret).unwrap_or(());
328 }
329 ThreadMessage::ShutDown(sender) => {
330 let ret = writer.shutdown();
331 sender.send(ret).unwrap_or(());
332 return;
333 }
334 };
335 }
336 });
337
338 (t_handle, tx)
339 }
340
341 pub fn add_event(&self, event: Event, profile: Profile) {
351 let message = ThreadMessage::Event((event, profile));
352 self.tx.send(message).unwrap();
353 }
354
355 pub fn delete_event(&self, event_id: &str) -> Receiver<Result<bool>> {
366 let (sender, receiver): (_, Receiver<Result<bool>>) = channel();
367 let message = ThreadMessage::Delete(sender, event_id.to_owned());
368 self.tx.send(message).unwrap();
369 receiver
370 }
371
372 fn commit_helper(&mut self, force: bool) -> Receiver<Result<()>> {
373 let (sender, receiver): (_, Receiver<Result<()>>) = channel();
374 self.tx.send(ThreadMessage::Write(sender, force)).unwrap();
375 receiver
376 }
377
378 pub fn commit(&mut self) -> Result<()> {
382 self.commit_helper(false).recv().unwrap()
383 }
384
385 pub fn force_commit(&mut self) -> Result<()> {
395 self.commit_helper(true).recv().unwrap()
396 }
397
398 pub fn reload(&mut self) -> Result<()> {
402 self.index.reload()?;
403 Ok(())
404 }
405
406 pub fn commit_no_wait(&mut self) -> Receiver<Result<()>> {
412 self.commit_helper(false)
413 }
414
415 pub fn force_commit_no_wait(&mut self) -> Receiver<Result<()>> {
425 self.commit_helper(true)
426 }
427
428 pub fn add_historic_events(
438 &self,
439 events: Vec<(Event, Profile)>,
440 new_checkpoint: Option<CrawlerCheckpoint>,
441 old_checkpoint: Option<CrawlerCheckpoint>,
442 ) -> Receiver<Result<bool>> {
443 let (sender, receiver): (_, Receiver<Result<bool>>) = channel();
444 let payload = (new_checkpoint, old_checkpoint, events, sender);
445 let message = ThreadMessage::HistoricEvents(payload);
446 self.tx.send(message).unwrap();
447
448 receiver
449 }
450
451 pub fn search(&self, term: &str, config: &SearchConfig) -> Result<SearchBatch> {
458 let searcher = self.get_searcher();
459 searcher.search(term, config)
460 }
461
462 pub fn get_searcher(&self) -> Searcher {
464 let index_searcher = self.index.get_searcher();
465 Searcher {
466 inner: index_searcher,
467 database: self.connection.clone(),
468 }
469 }
470
471 pub fn get_connection(&self) -> Result<Connection> {
474 let connection = self.pool.get()?;
475 Database::unlock(&connection, &self.config)?;
476 Database::set_pragmas(&connection)?;
477
478 Ok(Connection {
479 inner: connection,
480 path: self.path.clone(),
481 })
482 }
483
484 pub fn shutdown(self) -> Receiver<Result<()>> {
489 let (sender, receiver): (_, Receiver<Result<()>>) = channel();
490 let message = ThreadMessage::ShutDown(sender);
491 self.tx.send(message).unwrap();
492 receiver
493 }
494
495 pub fn delete(self) -> Result<()> {
499 fs::remove_dir_all(self.path)?;
500 Ok(())
501 }
502}
503
/// A database can be created inside an empty temporary directory.
#[test]
fn create_event_db() {
    let dir = tempdir().unwrap();
    let _database = Database::new(dir.path()).unwrap();
}
509
/// Saving the same profile twice yields the same id; a changed profile for
/// the same user gets a new id.
#[test]
fn store_profile() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();
    let user = "@alice.example.org";

    let original = Profile::new("Alice", "");

    let first = Database::save_profile(&db.connection.lock().unwrap(), user, &original);
    assert_eq!(first.unwrap(), 1);

    // Storing the identical profile again must not create a new row.
    let repeated = Database::save_profile(&db.connection.lock().unwrap(), user, &original);
    assert_eq!(repeated.unwrap(), 1);

    let with_avatar = Profile::new("Alice", "mxc://some_url");

    let changed = Database::save_profile(&db.connection.lock().unwrap(), user, &with_avatar);
    assert_eq!(changed.unwrap(), 2);
}
540
/// A profile without display name and avatar can be stored.
#[test]
fn store_empty_profile() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    let empty = Profile {
        displayname: None,
        avatar_url: None,
    };

    let saved = Database::save_profile(&db.connection.lock().unwrap(), "@alice.example.org", &empty);
    assert_eq!(saved.unwrap(), 1);
}
557
/// An event can be stored against a previously saved profile.
#[test]
fn store_event() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    let alice = Profile::new("Alice", "");
    let profile_id =
        Database::save_profile(&db.connection.lock().unwrap(), "@alice.example.org", &alice)
            .unwrap();

    let mut event = EVENT.clone();
    let event_id =
        Database::save_event_helper(&db.connection.lock().unwrap(), &mut event, profile_id)
            .unwrap();
    assert_eq!(event_id, 1);
}
574
/// `save_event` stores an event together with its profile in one call.
#[test]
fn store_event_and_profile() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    let mut event = EVENT.clone();
    let mut alice = Profile::new("Alice", "");

    Database::save_event(&db.connection.lock().unwrap(), &mut event, &mut alice).unwrap();
}
583
/// A stored event can be loaded back by id; unknown ids are tolerated.
#[test]
fn load_event() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    let mut alice = Profile::new("Alice", "");
    let mut event = EVENT.clone();
    Database::save_event(&db.connection.lock().unwrap(), &mut event, &mut alice).unwrap();

    // One id that exists and one that does not.
    let wanted = [
        (1.0, "$15163622445EBvZJ:localhost".to_string()),
        (0.3, "$FAKE".to_string()),
    ];

    let loaded =
        Database::load_events(&db.connection.lock().unwrap(), &wanted, 0, 0, false).unwrap();

    assert_eq!(*EVENT.source, loaded[0].event_source)
}
606
/// Committing with nothing queued succeeds.
#[test]
fn commit_a_write() {
    let dir = tempdir().unwrap();
    let mut database = Database::new(dir.path()).unwrap();

    database.commit().unwrap();
}
613
/// An event queued through the writer thread is readable after a commit.
#[test]
fn save_the_event_multithreaded() {
    let dir = tempdir().unwrap();
    let mut db = Database::new(dir.path()).unwrap();

    db.add_event(EVENT.clone(), Profile::new("Alice", ""));
    db.commit().unwrap();
    db.reload().unwrap();

    // One id that exists and one that does not.
    let wanted = [
        (1.0, "$15163622445EBvZJ:localhost".to_string()),
        (0.3, "$FAKE".to_string()),
    ];

    let loaded =
        Database::load_events(&db.connection.lock().unwrap(), &wanted, 0, 0, false).unwrap();

    assert_eq!(*EVENT.source, loaded[0].event_source)
}
638
/// A saved profile round-trips through `load_profile` unchanged.
#[test]
fn load_a_profile() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    let expected = Profile::new("Alice", "");

    let stored_id =
        Database::save_profile(&db.connection.lock().unwrap(), "@alice.example.org", &expected)
            .unwrap();
    let actual = Database::load_profile(&db.connection.lock().unwrap(), stored_id).unwrap();

    assert_eq!(expected, actual);
}
654
/// The context of an event consists of the closest event before and after it.
///
/// Five events are added on each side of `EVENT`; asking for one event of
/// context in each direction must return the nearest neighbours. The check
/// is retried a bounded number of times, and the final attempt always runs
/// the assertions so the test cannot pass without verifying anything.
#[test]
fn load_event_context() {
    // Upper bound for the retry loop; the last attempt asserts unconditionally.
    const MAX_ATTEMPTS: usize = 10;

    let tmpdir = tempdir().unwrap();
    let mut db = Database::new(tmpdir.path()).unwrap();
    let profile = Profile::new("Alice", "");

    db.add_event(EVENT.clone(), profile.clone());

    let mut before_event = None;

    for i in 1..6 {
        let mut event: Event = Faker.fake();
        event.server_ts = EVENT.server_ts - i;
        event.source = format!("Hello before event {}", i);

        // Remember the first one: it is the closest to `EVENT`.
        if before_event.is_none() {
            before_event = Some(event.clone());
        }

        db.add_event(event, profile.clone());
    }

    let mut after_event = None;

    for i in 1..6 {
        let mut event: Event = Faker.fake();
        event.server_ts = EVENT.server_ts + i;
        event.source = format!("Hello after event {}", i);

        // Remember the first one: it is the closest to `EVENT`.
        if after_event.is_none() {
            after_event = Some(event.clone());
        }

        db.add_event(event, profile.clone());
    }

    db.commit().unwrap();

    for attempt in 1..=MAX_ATTEMPTS {
        let (before, after, _) =
            Database::load_event_context(&db.connection.lock().unwrap(), &EVENT, 1, 1).unwrap();

        let mismatch = before.len() != 1
            || after.len() != 1
            || before[0] != before_event.as_ref().unwrap().source
            || after[0] != after_event.as_ref().unwrap().source;

        // Retry while attempts remain; on the last one fall through to the
        // assertions so a persistent mismatch fails the test.
        if mismatch && attempt != MAX_ATTEMPTS {
            thread::sleep(time::Duration::from_millis(10));
            continue;
        }

        assert_eq!(before.len(), 1);
        assert_eq!(before[0], before_event.as_ref().unwrap().source);
        assert_eq!(after.len(), 1);
        assert_eq!(after[0], after_event.as_ref().unwrap().source);

        return;
    }
}
715
/// A crawler checkpoint can be stored, loaded and replaced by a newer one.
#[test]
fn save_and_load_checkpoints() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    // Both checkpoints only differ in their token.
    let checkpoint_with = |token: &str| CrawlerCheckpoint {
        room_id: "!test:room".to_string(),
        token: token.to_string(),
        full_crawl: false,
        direction: CheckpointDirection::Backwards,
    };

    let checkpoint = checkpoint_with("1234");

    let mut connection = db.get_connection().unwrap();

    let transaction = connection.transaction().unwrap();
    Database::replace_crawler_checkpoint(&transaction, Some(&checkpoint), None).unwrap();
    transaction.commit().unwrap();

    let checkpoints = connection.load_checkpoints().unwrap();

    println!("{:?}", checkpoints);

    assert!(checkpoints.contains(&checkpoint));

    let new_checkpoint = checkpoint_with("12345");

    Database::replace_crawler_checkpoint(&connection, Some(&new_checkpoint), Some(&checkpoint))
        .unwrap();

    let checkpoints = connection.load_checkpoints().unwrap();

    assert!(!checkpoints.contains(&checkpoint));
    assert!(checkpoints.contains(&new_checkpoint));
}
755
/// Saving an empty profile twice must not create a second profile row.
#[test]
fn duplicate_empty_profiles() {
    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();
    let user_id = "@alice.example.org";
    let empty = Profile {
        displayname: None,
        avatar_url: None,
    };

    let first_id =
        Database::save_profile(&db.connection.lock().unwrap(), user_id, &empty).unwrap();
    let second_id =
        Database::save_profile(&db.connection.lock().unwrap(), user_id, &empty).unwrap();

    assert_eq!(first_id, second_id);

    let connection = db.connection.lock().unwrap();

    let mut stmt = connection
        .prepare("SELECT id FROM profile WHERE user_id=?1")
        .unwrap();

    // Count the rows, unwrapping each one so that a row error fails the test.
    let id_count = stmt
        .query_map([user_id], |row| row.get::<_, i64>(0))
        .unwrap()
        .map(|row| row.unwrap())
        .count();

    assert_eq!(id_count, 1);
}
790
/// A new database is empty and stops being empty once an event is committed.
#[test]
fn is_empty() {
    let dir = tempdir().unwrap();
    let mut db = Database::new(dir.path()).unwrap();

    let connection = db.get_connection().unwrap();
    assert!(connection.is_empty().unwrap());

    db.add_event(EVENT.clone(), Profile::new("Alice", ""));
    db.commit().unwrap();

    assert!(!connection.is_empty().unwrap());
}
803
/// An encrypted database accepts events and cannot be reopened without a
/// passphrase.
#[cfg(feature = "encryption")]
#[test]
fn encrypted_db() {
    let tmpdir = tempdir().unwrap();
    let db_config = Config::new().set_passphrase("test");

    let mut db = Database::new_with_config(tmpdir.path(), &db_config)
        .unwrap_or_else(|e| panic!("Coulnd't open encrypted database {}", e));

    let connection = db
        .get_connection()
        .unwrap_or_else(|e| panic!("Could not get database connection {}", e));

    assert!(
        connection.is_empty().unwrap(),
        "New database should be empty"
    );

    db.add_event(EVENT.clone(), Profile::new("Alice", ""));

    if let Err(e) = db.commit() {
        panic!("Could not commit events to database {}", e);
    }

    assert!(
        !connection.is_empty().unwrap(),
        "Database shouldn't be empty anymore"
    );

    drop(db);

    // Reopening with the default configuration, i.e. without a passphrase.
    let reopened = Database::new(tmpdir.path());
    assert!(
        reopened.is_err(),
        "opening the database without a passphrase should fail"
    );
}
844
/// After changing the passphrase the database opens with the new passphrase
/// and is rejected with the old one.
#[cfg(feature = "encryption")]
#[test]
fn change_passphrase() {
    let tmpdir = tempdir().unwrap();
    let db_config = Config::new().set_passphrase("test");
    let mut db = match Database::new_with_config(tmpdir.path(), &db_config) {
        Ok(db) => db,
        Err(e) => panic!("Coulnd't open encrypted database {}", e),
    };

    let connection = db
        .get_connection()
        .expect("Could not get database connection");
    assert!(
        connection.is_empty().unwrap(),
        "New database should be empty"
    );

    let profile = Profile::new("Alice", "");
    db.add_event(EVENT.clone(), profile);

    db.commit().expect("Could not commit events to database");
    // Consumes `db`: the database is shut down as part of the change.
    db.change_passphrase("wordpass")
        .expect("Could not change the database passphrase");

    let db_config = Config::new().set_passphrase("wordpass");
    let db = Database::new_with_config(tmpdir.path(), &db_config)
        .expect("Could not open database with the new passphrase");
    let connection = db
        .get_connection()
        .expect("Could not get database connection");
    assert!(
        !connection.is_empty().unwrap(),
        "Database shouldn't be empty anymore"
    );
    drop(db);

    // The old passphrase must no longer unlock the database.
    let db_config = Config::new().set_passphrase("test");
    let db = Database::new_with_config(tmpdir.path(), &db_config);
    assert!(
        db.is_err(),
        "opening the database with the old passphrase should fail"
    );
}
889
/// Events that were written but not yet indexed survive a restart and are
/// indexed by a forced commit after reopening.
#[test]
fn resume_committing() {
    let tmpdir = tempdir().unwrap();
    let mut db = Database::new(tmpdir.path()).unwrap();

    // Nothing is pending in a fresh database.
    assert!(
        Database::load_uncommitted_events(&db.connection.lock().unwrap())
            .unwrap()
            .is_empty()
    );

    db.add_event(EVENT.clone(), Profile::new("Alice", ""));
    db.commit().unwrap();
    db.reload().unwrap();

    // The event is stored but still marked as uncommitted ...
    assert!(
        !Database::load_uncommitted_events(&db.connection.lock().unwrap())
            .unwrap()
            .is_empty()
    );

    // ... and therefore not searchable yet.
    let early_hits = db.search("test", &SearchConfig::new()).unwrap().results;
    assert!(early_hits.is_empty());

    drop(db);

    // Reopen the database, retrying up to ten more times with a short pause
    // while opening still fails.
    let mut reopened = Database::new(tmpdir.path());

    for _ in 0..10 {
        if reopened.is_ok() {
            break;
        }
        thread::sleep(time::Duration::from_millis(100));
        reopened = Database::new(tmpdir.path());
    }

    let mut db = reopened.unwrap();

    assert_eq!(
        Database::load_uncommitted_events(&db.connection.lock().unwrap()).unwrap()[0].1,
        *EVENT
    );

    db.force_commit().unwrap();
    db.reload().unwrap();

    assert!(
        Database::load_uncommitted_events(&db.connection.lock().unwrap())
            .unwrap()
            .is_empty()
    );

    let hits = db.search("test", &SearchConfig::new()).unwrap().results;

    assert!(!hits.is_empty());
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].event_source, EVENT.source);
}
964
/// After a forced commit no uncommitted events remain, even when many
/// intermediate commits happened.
#[test]
fn delete_uncommitted() {
    let dir = tempdir().unwrap();
    let mut db = Database::new(dir.path()).unwrap();
    let alice = Profile::new("Alice", "");

    for offset in 1..1000 {
        let mut fake_event: Event = Faker.fake();
        fake_event.server_ts += offset;
        db.add_event(fake_event, alice.clone());

        // Commit after every hundredth event.
        if offset % 100 == 0 {
            db.commit().unwrap();
        }
    }

    db.force_commit().unwrap();

    let leftover = Database::load_uncommitted_events(&db.connection.lock().unwrap()).unwrap();
    assert!(leftover.is_empty());
}
988
/// The statistics reflect the number of stored events and rooms.
#[test]
fn stats_getting() {
    let dir = tempdir().unwrap();
    let mut db = Database::new(dir.path()).unwrap();
    let alice = Profile::new("Alice", "");

    for offset in 0..1000 {
        let mut fake_event: Event = Faker.fake();
        fake_event.server_ts += offset;
        db.add_event(fake_event, alice.clone());
    }

    db.commit().unwrap();

    let stats = db.get_connection().unwrap().get_stats().unwrap();

    assert_eq!(stats.event_count, 1000);
    assert_eq!(stats.room_count, 1);
    assert!(stats.size > 0);
}
1011
/// Opening the bundled v1 database must fail with `Error::ReindexError`.
#[test]
fn database_upgrade_v1() {
    // Walk up three components from this source file, then into the fixture.
    let mut path = PathBuf::from(file!());
    for _ in 0..3 {
        path.pop();
    }
    path.push("data/database/v1");

    match Database::new(path) {
        Err(Error::ReindexError) => (),
        Err(e) => panic!("Database doesn't need a reindex: {}", e),
        Ok(_) => panic!("Database doesn't need a reindex."),
    }
}
1031
1032#[cfg(test)]
1033use crate::database::recovery::test::reindex_loop;
1034
/// The bundled v1_2 database demands a reindex; after reindexing through the
/// recovery database it opens normally and is searchable.
#[test]
fn database_upgrade_v1_2() {
    // Walk up three components from this source file, then into the fixture.
    let mut path = PathBuf::from(file!());
    for _ in 0..3 {
        path.pop();
    }
    path.push("data/database/v1_2");

    match Database::new(&path) {
        Err(Error::ReindexError) => (),
        Err(e) => panic!("Database doesn't need a reindex: {}", e),
        Ok(_) => panic!("Database doesn't need a reindex."),
    }

    let mut recovery_db = RecoveryDatabase::new(&path).expect("Can't open recovery db");

    recovery_db.delete_the_index().unwrap();
    recovery_db.open_index().unwrap();

    let first_batch = recovery_db.load_events_deserialized(100, None).unwrap();

    recovery_db.index_events(&first_batch).unwrap();
    reindex_loop(&mut recovery_db, first_batch).unwrap();
    recovery_db.commit_and_close().unwrap();

    let db = Database::new(&path).expect("Can't open the db event after a reindex");

    let mut connection = db.get_connection().unwrap();
    let (version, _) = Database::get_version(&mut connection).unwrap();
    assert_eq!(version, DATABASE_VERSION);

    let hits = db.search("Hello", &SearchConfig::new()).unwrap().results;
    assert!(!hits.is_empty())
}
1071
/// A deleted event stays pending across a restart until the next forced
/// commit.
#[test]
fn delete_an_event() {
    /// Number of events currently waiting for deletion.
    fn pending_deletions(db: &Database) -> usize {
        Database::load_pending_deletion_events(&db.connection.lock().unwrap())
            .unwrap()
            .len()
    }

    let tmpdir = tempdir().unwrap();
    let mut db = Database::new(tmpdir.path()).unwrap();
    let alice = Profile::new("Alice", "");

    db.add_event(EVENT.clone(), alice.clone());
    db.add_event(TOPIC_EVENT.clone(), alice);

    db.force_commit().unwrap();

    assert!(pending_deletions(&db) == 0);

    db.delete_event(&EVENT.event_id).recv().unwrap().unwrap();

    assert_eq!(pending_deletions(&db), 1);

    drop(db);

    // The pending deletion must still be there after reopening.
    let mut db = Database::new(tmpdir.path()).unwrap();
    assert_eq!(pending_deletions(&db), 1);

    db.force_commit().unwrap();
    assert_eq!(pending_deletions(&db), 0);
}
1117
/// An event whose body contains a NUL character can be added as a historic
/// event.
#[test]
fn add_events_with_null_byte() {
    let raw_event: &str = r#"{
 "content": {
 "body": "\u00000",
 "msgtype": "m.text"
 },
 "event_id": "$15163622448EBvZJ:localhost",
 "origin_server_ts": 1516362244050,
 "sender": "@example2:localhost",
 "type": "m.room.message",
 "unsigned": {"age": 43289803098},
 "user_id": "@example2:localhost",
 "age": 43289803098,
 "room_id": "!test:example.org"
 }"#;

    let event = RecoveryDatabase::event_from_json(raw_event).unwrap();

    let dir = tempdir().unwrap();
    let db = Database::new(dir.path()).unwrap();

    // The profile reuses the event's content value as its second argument.
    let profile = Profile::new("Alice", &event.content_value);

    let receiver = db.add_historic_events(vec![(event, profile)], None, None);
    receiver.recv().unwrap().expect("Event should be added");
}
1147
/// A room counts as indexed only after one of its events was committed.
#[test]
fn is_room_indexed() {
    let dir = tempdir().unwrap();
    let mut db = Database::new(dir.path()).unwrap();
    let connection = db.get_connection().unwrap();

    assert!(connection.is_empty().unwrap());
    assert!(!connection.is_room_indexed("!test_room:localhost").unwrap());

    db.add_event(EVENT.clone(), Profile::new("Alice", ""));
    db.force_commit().unwrap();

    assert!(connection.is_room_indexed("!test_room:localhost").unwrap());
    assert!(!connection.is_room_indexed("!test_room2:localhost").unwrap());
}
1165
/// The user version starts at zero and can be changed.
#[test]
fn user_version() {
    let dir = tempdir().unwrap();
    let connection = Database::new(dir.path())
        .unwrap()
        .get_connection()
        .unwrap();

    assert_eq!(connection.get_user_version().unwrap(), 0);

    connection.set_user_version(10).unwrap();
    assert_eq!(connection.get_user_version().unwrap(), 10);
}
1176
/// The bundled `sqlcipher-v3` fixture can be opened with its passphrase.
#[test]
#[cfg(feature = "encryption")]
fn sqlcipher_cipher_settings_update() {
    // Walk up three components from this source file, then into the fixture.
    let mut fixture = PathBuf::from(file!());
    for _ in 0..3 {
        fixture.pop();
    }
    fixture.push("data/database/sqlcipher-v3");

    let config = Config::new().set_passphrase("qR17RdpWurSh2pQRSc/EnsaO9V041kOwsZk0iSdUY/g");

    let _db = Database::new_with_config(&fixture, &config)
        .expect("We should be able to open the database");
}
1189}