// ssb_db/lib.rs

//! # ssb-db
//!
//! The most basic db that you need to do (legacy) replication on ssb.
//!
//! ## Legacy Replication
//!
//! "Legacy Replication" is how ssb used to do replication before
//! [ebt](https://github.com/dominictarr/epidemic-broadcast-trees).
//!
//! It's simpler than ebt, but uses more bandwidth.
//!
//! To do legacy replication a client calls
//! [createHistoryStream](https://scuttlebot.io/apis/scuttlebot/ssb.html#createhistorystream-source)
//! for each feed it wants to replicate, passing the largest sequence number it knows about.
//!
//! ## SsbDb
//!
//! `ssb-db` defines a trait [SsbDb] that provides all the functionality you should need
//! to make and handle legacy replication requests.
//!
//! ## Architecture
//!
//! [SqliteSsbDb] implements the [SsbDb] trait.
//!
//! The underlying architecture is based on [flume-db](https://github.com/sunrise-choir/flumedb-rs).
//!
//! `ssb-db` stores data in an append-only log. It maintains indexes for querying the log in sqlite.
//! The append-only log is the source of truth and the indexes are derived from the log. If the
//! indexes break or need to be migrated, the sqlite db can be deleted and rebuilt from the log.
//!
//! ## Validation
//!
//! `ssb-db` **does not validate any messages before appending them.** The caller must validate them first.
//! See [ssb-validate](https://github.com/sunrise-choir/ssb-validate) and [ssb-verify-signatures](https://github.com/sunrise-choir/ssb-verify-signatures).
//!
36#[macro_use]
37extern crate diesel_migrations;
38#[macro_use]
39extern crate diesel;
40
41pub use flumedb::flume_view::Sequence as FlumeSequence;
42
43mod db;
44pub mod error;
45pub mod sqlite_ssb_db;
46mod ssb_message;
47
48pub use error::Error;
49pub use sqlite_ssb_db::SqliteSsbDb;
50
51use error::Result;
52use ssb_multiformats::multihash::Multihash;
53use ssb_multiformats::multikey::Multikey;
54
55pub trait SsbDb {
56    /// Append a batch of valid ssb messages authored by the `feed_id`.
57    fn append_batch<T: 'static + AsRef<[u8]>>(&self, feed_id: &Multikey, messages: &[T]) -> Result<()>;
58    /// Get an entry by its ssb message key.
59    fn get_entry_by_key(&self, message_key: &Multihash) -> Result<Vec<u8>>;
60    /// Get an entry by its sequence key + author.
61    fn get_entry_by_seq(&self, feed_id: &Multikey, sequence: i32) -> Result<Option<Vec<u8>>>;
62    /// Get the latest sequence number for the given feed.
63    fn get_feed_latest_sequence(&self, feed_id: &Multikey) -> Result<Option<i32>>;
64    /// Get all the entries for the given `feed_id`, with a sequence larger than `sequence`.
65    ///
66    /// You may `limit` the maximum number of entries to get.
67    ///
68    /// You can control whether to `include_keys`, `include_values`, or both.
69    ///
70    /// If `include_values` and `include_values` are both `false`
71    /// `get_entries_newer_than_sequence` will return an `Error`.
72    fn get_entries_newer_than_sequence(
73        &self,
74        feed_id: &Multikey,
75        sequence: i32,
76        limit: Option<i64>,
77        include_keys: bool,
78        include_values: bool,
79    ) -> Result<Vec<Vec<u8>>>;
80    /// You can rebuild the indexes in sqlite db (but not the offset file) if they become
81    /// corrupted.
82    fn rebuild_indexes(&self) -> Result<()>;
83}
84
#[cfg(test)]
mod tests {
    use crate::ssb_message::{SsbMessage, SsbValue};
    use crate::{SqliteSsbDb, SsbDb};
    use flumedb::offset_log::OffsetLog;
    use ssb_multiformats::multihash::Multihash;
    use ssb_multiformats::multikey::Multikey;

    /// Offset log fixture that every test reads from.
    const FIXTURE_OFFSET_LOG: &str = "./test_vecs/piet.offset";
    /// Legacy-encoded id of the feed the tests query.
    const FIXTURE_AUTHOR: &str = "@U5GvOKP/YUza9k53DSXxT0mk3PIrnyAmessvNfZl5E0=.ed25519";
    /// Latest sequence number the tests expect for `FIXTURE_AUTHOR` in the fixture.
    const FIXTURE_LATEST_SEQ: i32 = 6006;

    /// Owns a scratch file path and deletes the file when dropped.
    ///
    /// Declare the guard *before* the db that uses the path: locals are dropped in reverse
    /// declaration order, so the db is dropped first and the file is removed afterwards.
    /// Because the removal happens in `Drop`, it also runs when an assertion fails.
    struct TempFile {
        path: &'static str,
    }

    impl TempFile {
        /// Claim `path` for the current test, deleting any file already there.
        fn new(path: &'static str) -> Self {
            // A run that was killed outright may have left a stale file behind.
            let _ = std::fs::remove_file(path);
            TempFile { path }
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Errors are ignored on purpose: panicking while a failed test is already
            // unwinding would abort the whole test process.
            let _ = std::fs::remove_file(self.path);
        }
    }

    /// Decode `FIXTURE_AUTHOR` into a `Multikey`.
    fn fixture_author() -> Multikey {
        Multikey::from_legacy(FIXTURE_AUTHOR.as_bytes()).unwrap().0
    }

    /// Open a db at `db_path` on top of the fixture log and bring its indexes up to date.
    fn indexed_fixture_db(db_path: &'static str) -> SqliteSsbDb {
        let db = SqliteSsbDb::new(db_path, FIXTURE_OFFSET_LOG);
        db.update_indexes_from_offset_file().unwrap();
        db
    }

    #[test]
    fn get_entry_by_key_works() {
        let key_str = "%/v5mCnV/kmnVtnF3zXtD4tbzoEQo4kRq/0d/bgxP1WI=.sha256";
        let key = Multihash::from_legacy(key_str.as_bytes()).unwrap().0;

        let db_file = TempFile::new("/tmp/test_get_entry_by_key.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let entry = db.get_entry_by_key(&key).unwrap();
        let value: serde_json::Value = serde_json::from_slice(&entry).unwrap();

        assert_eq!(value["key"].as_str().unwrap(), key_str);
    }

    #[test]
    fn get_feed_latest_sequence_works() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_latest_seq.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let seq = db.get_feed_latest_sequence(&author).unwrap();

        assert_eq!(seq.unwrap(), FIXTURE_LATEST_SEQ);
    }

    #[test]
    fn get_entries_kv_newer_than_sequence_works() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_entries_kv.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let entries = db
            .get_entries_newer_than_sequence(&author, 6000, None, true, true)
            .unwrap();
        // Collect into a `Result` so an undecodable entry fails the test with its parse
        // error instead of being dropped silently.
        let messages = entries
            .iter()
            .map(|entry| serde_json::from_slice::<SsbMessage>(entry))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(messages.len(), 6);
    }

    #[test]
    fn get_entries_newer_than_sequence_works_with_limit() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_entries_kv_limit.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let entries = db
            .get_entries_newer_than_sequence(&author, 6000, Some(2), true, true)
            .unwrap();
        let messages = entries
            .iter()
            .map(|entry| serde_json::from_slice::<SsbMessage>(entry))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(messages.len(), 2);
    }

    #[test]
    fn get_entries_k_newer_than_sequence_works() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_entries_k.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let entries = db
            .get_entries_newer_than_sequence(&author, 6000, None, true, false)
            .unwrap();
        // With only keys requested, each entry must decode as a legacy message key.
        let keys = entries
            .iter()
            .map(|entry| Multihash::from_legacy(entry).map(|(key, _)| key))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(keys.len(), 6);
    }

    #[test]
    fn get_entries_v_newer_than_sequence_works() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_entries_v.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        let entries = db
            .get_entries_newer_than_sequence(&author, 6000, None, false, true)
            .unwrap();
        // With only values requested, each entry must decode as a bare message value.
        let values = entries
            .iter()
            .map(|entry| serde_json::from_slice::<SsbValue>(entry))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(values.len(), 6);
    }

    #[test]
    fn get_entries_no_kv_newer_than_sequence_errors() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_get_entries_no_kv.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        // Asking for neither keys nor values is rejected.
        let res = db.get_entries_newer_than_sequence(&author, 6000, None, false, false);

        assert!(res.is_err());
    }

    #[test]
    fn append_batch_works() {
        let author = fixture_author();

        // Read every raw entry out of the fixture log so it can be appended to a fresh db.
        let log = OffsetLog::<u32>::new(FIXTURE_OFFSET_LOG).unwrap();
        let entries = log.iter().map(|entry| entry.data).collect::<Vec<_>>();

        let db_file = TempFile::new("/tmp/test_append_batch.sqlite3");
        let offset_file = TempFile::new("/tmp/test_append_batch.offset");
        let db = SqliteSsbDb::new(db_file.path, offset_file.path);

        db.append_batch(&author, &entries).unwrap();

        let seq = db.get_feed_latest_sequence(&author).unwrap();
        assert_eq!(seq.unwrap(), FIXTURE_LATEST_SEQ);
    }

    #[test]
    fn rebuild_indexes_works() {
        let author = fixture_author();

        let db_file = TempFile::new("/tmp/test_rebuild_indexes.sqlite3");
        let db = indexed_fixture_db(db_file.path);

        db.rebuild_indexes().unwrap();

        let seq = db.get_feed_latest_sequence(&author).unwrap();

        assert_eq!(seq.unwrap(), FIXTURE_LATEST_SEQ);
    }
}