// ssb_db/sqlite_ssb_db/mod.rs
use flumedb::offset_log::OffsetLog;
2use flumedb::{FlumeLog, IterAtOffset};
3
4use diesel::prelude::*;
5use diesel::sqlite::SqliteConnection;
6use diesel_migrations::any_pending_migrations;
7use itertools::Itertools;
8use snafu::{OptionExt, ResultExt};
9use ssb_legacy_msg_data;
10use ssb_legacy_msg_data::value::Value;
11use ssb_multiformats::multihash::Multihash;
12use ssb_multiformats::multikey::Multikey;
13use std::cell::RefCell;
14
15use crate::db;
16use crate::error::*;
17use crate::ssb_message::SsbMessage;
18use crate::SsbDb;
19
20use db::{
21 append_item, find_feed_flume_seqs_newer_than, find_feed_latest_seq,
22 find_message_flume_seq_by_author_and_sequence, find_message_flume_seq_by_key, get_latest,
23};
24
25pub struct SqliteSsbDb {
26 connection: RefCell<SqliteConnection>,
27 offset_log: RefCell<OffsetLog<u32>>,
28 db_path: String,
29}
30
31embed_migrations!();
32
33impl SqliteSsbDb {
34 pub fn new<S: AsRef<str>>(database_path: S, offset_log_path: S) -> SqliteSsbDb {
35 let connection = setup_connection(database_path.as_ref());
36
37 let offset_log = match OffsetLog::new(&offset_log_path.as_ref()) {
38 Ok(log) => log,
39 Err(_) => {
40 panic!("failed to open offset log at {}", offset_log_path.as_ref());
41 }
42 };
43 SqliteSsbDb {
44 connection: RefCell::new(connection),
45 offset_log: RefCell::new(offset_log),
46 db_path: database_path.as_ref().to_owned(),
47 }
48 }
49
50 pub fn update_indexes_from_offset_file(&self) -> Result<()> {
51 let connection = self.connection.borrow_mut();
57 let offset_log = self.offset_log.borrow();
58
59 let max_seq = get_latest(&connection)
60 .context(UnableToGetLatestSequence)?
61 .map(|val| val as u64);
62
63 let num_to_skip: usize = match max_seq {
64 None => 0,
65 _ => 1,
66 };
67
68 let starting_offset = max_seq.unwrap_or(0);
69
70 offset_log
71 .iter_at_offset(starting_offset)
72 .skip(num_to_skip)
73 .chunks(10000)
74 .into_iter()
75 .map(|chunk| {
76 connection
77 .transaction::<_, db::Error, _>(|| {
78 chunk
79 .map(|log_entry| {
80 append_item(&connection, log_entry.offset, &log_entry.data)?;
81
82 Ok(())
83 })
84 .collect::<std::result::Result<(), db::Error>>()
85 })
86 .map_err(|_| Error::SqliteAppendError {})
87 .and_then(|_| Ok(()))
88 })
89 .collect()
90 }
91}
92
93impl SsbDb for SqliteSsbDb {
94 fn append_batch<T: AsRef<[u8]>>(&self, _: &Multikey, messages: &[T]) -> Result<()> {
95 self.offset_log
97 .borrow_mut()
98 .append_batch(messages)
99 .map_err(|_| Error::OffsetAppendError {})?;
100
101 self.update_indexes_from_offset_file()
102 }
103 fn get_entry_by_key<'a>(&'a self, message_key: &Multihash) -> Result<Vec<u8>> {
104 let flume_seq = find_message_flume_seq_by_key(
105 &self.connection.borrow(),
106 &message_key.to_legacy_string(),
107 )
108 .context(MessageNotFound)?;
109 self.offset_log
110 .borrow()
111 .get(flume_seq)
112 .map_err(|_| Error::OffsetGetError {})
113 }
114
115 fn get_entry_by_seq(&self, feed_id: &Multikey, sequence: i32) -> Result<Option<Vec<u8>>> {
116 let flume_seq = find_message_flume_seq_by_author_and_sequence(
117 &self.connection.borrow(),
118 &feed_id.to_legacy_string(),
119 sequence,
120 )
121 .context(MessageNotFound)?;
122
123 flume_seq
124 .map(|flume_seq| {
125 self.offset_log
126 .borrow()
127 .get(flume_seq as u64)
128 .map_err(|_| Error::OffsetGetError {})
129 })
130 .transpose()
131 }
132 fn get_feed_latest_sequence(&self, feed_id: &Multikey) -> Result<Option<i32>> {
133 find_feed_latest_seq(&self.connection.borrow(), &feed_id.to_legacy_string())
134 .context(FeedNotFound)
135 }
136 fn get_entries_newer_than_sequence<'a>(
137 &'a self,
138 feed_id: &Multikey,
139 sequence: i32,
140 limit: Option<i64>,
141 include_keys: bool,
142 include_values: bool,
143 ) -> Result<Vec<Vec<u8>>> {
144 let seqs = find_feed_flume_seqs_newer_than(
145 &self.connection.borrow(),
146 &feed_id.to_legacy_string(),
147 sequence,
148 limit,
149 )
150 .context(FeedNotFound)?;
151
152 match (include_keys, include_values) {
153 (false, false) => Err(Error::IncludeKeysIncludeValuesBothFalse {}),
154 (true, false) => seqs
155 .iter()
156 .flat_map(|seq| {
157 self.offset_log
158 .borrow()
159 .get(*seq)
160 .map_err(|_| Error::OffsetGetError {})
161 })
162 .flat_map(|msg| serde_json::from_slice::<SsbMessage>(&msg))
163 .map(|msg| Ok(msg.key.into_bytes()))
164 .collect(),
165 (false, true) => {
166 seqs.iter()
167 .flat_map(|seq| {
168 self.offset_log
169 .borrow()
170 .get(*seq)
171 .map_err(|_| Error::OffsetGetError {})
172 })
173 .flat_map(|msg| {
174 ssb_legacy_msg_data::json::from_slice(&msg)
180 })
181 .map(|legacy_value| {
182 if let Value::Object(legacy_val) = legacy_value {
183 let val = legacy_val.get("value").context(ErrorParsingAsLegacyValue)?;
184 ssb_legacy_msg_data::json::to_vec(&val, false)
185 .map_err(|_| Error::EncodingValueAsVecError {})
186 } else {
187 Err(Error::ErrorParsingAsLegacyValue {})
188 }
189 })
190 .collect()
191 }
192 (true, true) => seqs
193 .iter()
194 .map(|seq| {
195 self.offset_log
196 .borrow()
197 .get(*seq)
198 .map_err(|_| Error::OffsetGetError {})
199 })
200 .collect(),
201 }
202 }
203 fn rebuild_indexes(&self) -> Result<()> {
204 std::fs::remove_file(&self.db_path).unwrap();
205 self.connection.replace(setup_connection(&self.db_path));
206 self.update_indexes_from_offset_file()
207 }
208}
209fn setup_connection(database_path: &str) -> SqliteConnection {
210 let database_url = to_sqlite_uri(database_path, "rwc");
211 let connection = SqliteConnection::establish(&database_url)
212 .expect(&format!("Error connecting to {}", database_url));
213
214 if let Err(_) = any_pending_migrations(&connection) {
215 embedded_migrations::run(&connection).unwrap();
216 }
217
218 if let Ok(true) = any_pending_migrations(&connection) {
219 std::fs::remove_file(&database_path).unwrap();
220 embedded_migrations::run(&connection).unwrap();
221 }
222
223 connection
224}
/// Builds a SQLite `file:` URI for `path` with the given `mode` query value
/// (for example `"rwc"`).
///
/// `?`, `#` and `%` in `path` are percent-encoded so that they stay part of
/// the file name rather than being read as the start of the query string, a
/// fragment, or an escape sequence. Paths without those characters produce
/// exactly `file:{path}?mode={rw_mode}`.
fn to_sqlite_uri(path: &str, rw_mode: &str) -> String {
    const SCHEME: &str = "file:";
    const MODE_KEY: &str = "?mode=";

    let mut uri =
        String::with_capacity(SCHEME.len() + path.len() + MODE_KEY.len() + rw_mode.len());
    uri.push_str(SCHEME);
    for ch in path.chars() {
        match ch {
            '%' => uri.push_str("%25"),
            '?' => uri.push_str("%3f"),
            '#' => uri.push_str("%23"),
            other => uri.push(other),
        }
    }
    uri.push_str(MODE_KEY);
    uri.push_str(rw_mode);
    uri
}