ssb_db/sqlite_ssb_db/
mod.rs

1use flumedb::offset_log::OffsetLog;
2use flumedb::{FlumeLog, IterAtOffset};
3
4use diesel::prelude::*;
5use diesel::sqlite::SqliteConnection;
6use diesel_migrations::any_pending_migrations;
7use itertools::Itertools;
8use snafu::{OptionExt, ResultExt};
9use ssb_legacy_msg_data;
10use ssb_legacy_msg_data::value::Value;
11use ssb_multiformats::multihash::Multihash;
12use ssb_multiformats::multikey::Multikey;
13use std::cell::RefCell;
14
15use crate::db;
16use crate::error::*;
17use crate::ssb_message::SsbMessage;
18use crate::SsbDb;
19
20use db::{
21    append_item, find_feed_flume_seqs_newer_than, find_feed_latest_seq,
22    find_message_flume_seq_by_author_and_sequence, find_message_flume_seq_by_key, get_latest,
23};
24
25pub struct SqliteSsbDb {
26    connection: RefCell<SqliteConnection>,
27    offset_log: RefCell<OffsetLog<u32>>,
28    db_path: String,
29}
30
31embed_migrations!();
32
33impl SqliteSsbDb {
34    pub fn new<S: AsRef<str>>(database_path: S, offset_log_path: S) -> SqliteSsbDb {
35        let connection = setup_connection(database_path.as_ref());
36
37        let offset_log = match OffsetLog::new(&offset_log_path.as_ref()) {
38            Ok(log) => log,
39            Err(_) => {
40                panic!("failed to open offset log at {}", offset_log_path.as_ref());
41            }
42        };
43        SqliteSsbDb {
44            connection: RefCell::new(connection),
45            offset_log: RefCell::new(offset_log),
46            db_path: database_path.as_ref().to_owned(),
47        }
48    }
49
50    pub fn update_indexes_from_offset_file(&self) -> Result<()> {
51        //We're using Max of flume_seq.
52        //When the db is empty, we'll get None.
53        //When there is one item in the db, we'll get 0 (it's the first seq number you get)
54        //When there's more than one you'll get some >0 number
55
56        let connection = self.connection.borrow_mut();
57        let offset_log = self.offset_log.borrow();
58
59        let max_seq = get_latest(&connection)
60            .context(UnableToGetLatestSequence)?
61            .map(|val| val as u64);
62
63        let num_to_skip: usize = match max_seq {
64            None => 0,
65            _ => 1,
66        };
67
68        let starting_offset = max_seq.unwrap_or(0);
69
70        offset_log
71            .iter_at_offset(starting_offset)
72            .skip(num_to_skip)
73            .chunks(10000)
74            .into_iter()
75            .map(|chunk| {
76                connection
77                    .transaction::<_, db::Error, _>(|| {
78                        chunk
79                            .map(|log_entry| {
80                                append_item(&connection, log_entry.offset, &log_entry.data)?;
81
82                                Ok(())
83                            })
84                            .collect::<std::result::Result<(), db::Error>>()
85                    })
86                    .map_err(|_| Error::SqliteAppendError {})
87                    .and_then(|_| Ok(()))
88            })
89            .collect()
90    }
91}
92
93impl SsbDb for SqliteSsbDb {
94    fn append_batch<T: AsRef<[u8]>>(&self, _: &Multikey, messages: &[T]) -> Result<()> {
95        // First, append the messages to flume
96        self.offset_log
97            .borrow_mut()
98            .append_batch(messages)
99            .map_err(|_| Error::OffsetAppendError {})?;
100
101        self.update_indexes_from_offset_file()
102    }
103    fn get_entry_by_key<'a>(&'a self, message_key: &Multihash) -> Result<Vec<u8>> {
104        let flume_seq = find_message_flume_seq_by_key(
105            &self.connection.borrow(),
106            &message_key.to_legacy_string(),
107        )
108        .context(MessageNotFound)?;
109        self.offset_log
110            .borrow()
111            .get(flume_seq)
112            .map_err(|_| Error::OffsetGetError {})
113    }
114
115    fn get_entry_by_seq(&self, feed_id: &Multikey, sequence: i32) -> Result<Option<Vec<u8>>> {
116        let flume_seq = find_message_flume_seq_by_author_and_sequence(
117            &self.connection.borrow(),
118            &feed_id.to_legacy_string(),
119            sequence,
120        )
121        .context(MessageNotFound)?;
122
123        flume_seq
124            .map(|flume_seq| {
125                self.offset_log
126                    .borrow()
127                    .get(flume_seq as u64)
128                    .map_err(|_| Error::OffsetGetError {})
129            })
130            .transpose()
131    }
132    fn get_feed_latest_sequence(&self, feed_id: &Multikey) -> Result<Option<i32>> {
133        find_feed_latest_seq(&self.connection.borrow(), &feed_id.to_legacy_string())
134            .context(FeedNotFound)
135    }
136    fn get_entries_newer_than_sequence<'a>(
137        &'a self,
138        feed_id: &Multikey,
139        sequence: i32,
140        limit: Option<i64>,
141        include_keys: bool,
142        include_values: bool,
143    ) -> Result<Vec<Vec<u8>>> {
144        let seqs = find_feed_flume_seqs_newer_than(
145            &self.connection.borrow(),
146            &feed_id.to_legacy_string(),
147            sequence,
148            limit,
149        )
150        .context(FeedNotFound)?;
151
152        match (include_keys, include_values) {
153            (false, false) => Err(Error::IncludeKeysIncludeValuesBothFalse {}),
154            (true, false) => seqs
155                .iter()
156                .flat_map(|seq| {
157                    self.offset_log
158                        .borrow()
159                        .get(*seq)
160                        .map_err(|_| Error::OffsetGetError {})
161                })
162                .flat_map(|msg| serde_json::from_slice::<SsbMessage>(&msg))
163                .map(|msg| Ok(msg.key.into_bytes()))
164                .collect(),
165            (false, true) => {
166                seqs.iter()
167                    .flat_map(|seq| {
168                        self.offset_log
169                            .borrow()
170                            .get(*seq)
171                            .map_err(|_| Error::OffsetGetError {})
172                    })
173                    .flat_map(|msg| {
174                        //If we're going to use Serde to pluck out the value we have to use
175                        //ssb-legacy-data Value so that when we convert it back to a string, the
176                        //ordering is still intact.
177                        //If we don't do that then we would return a message that would fail
178                        //verification
179                        ssb_legacy_msg_data::json::from_slice(&msg)
180                    })
181                    .map(|legacy_value| {
182                        if let Value::Object(legacy_val) = legacy_value {
183                            let val = legacy_val.get("value").context(ErrorParsingAsLegacyValue)?;
184                            ssb_legacy_msg_data::json::to_vec(&val, false)
185                                .map_err(|_| Error::EncodingValueAsVecError {})
186                        } else {
187                            Err(Error::ErrorParsingAsLegacyValue {})
188                        }
189                    })
190                    .collect()
191            }
192            (true, true) => seqs
193                .iter()
194                .map(|seq| {
195                    self.offset_log
196                        .borrow()
197                        .get(*seq)
198                        .map_err(|_| Error::OffsetGetError {})
199                })
200                .collect(),
201        }
202    }
203    fn rebuild_indexes(&self) -> Result<()> {
204        std::fs::remove_file(&self.db_path).unwrap();
205        self.connection.replace(setup_connection(&self.db_path));
206        self.update_indexes_from_offset_file()
207    }
208}
209fn setup_connection(database_path: &str) -> SqliteConnection {
210    let database_url = to_sqlite_uri(database_path, "rwc");
211    let connection = SqliteConnection::establish(&database_url)
212        .expect(&format!("Error connecting to {}", database_url));
213
214    if let Err(_) = any_pending_migrations(&connection) {
215        embedded_migrations::run(&connection).unwrap();
216    }
217
218    if let Ok(true) = any_pending_migrations(&connection) {
219        std::fs::remove_file(&database_path).unwrap();
220        embedded_migrations::run(&connection).unwrap();
221    }
222
223    connection
224}
225fn to_sqlite_uri(path: &str, rw_mode: &str) -> String {
226    format!("file:{}?mode={}", path, rw_mode)
227}