nostr_sqlite/lib.rs

// Copyright (c) 2022-2023 Yuki Kishimoto
// Copyright (c) 2023-2024 Rust Nostr Developers
// Distributed under the MIT software license

//! SQLite Storage backend for Nostr SDK
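//!
//! A minimal usage sketch (the `events.db` path and the filter are hypothetical,
//! and this backend is deprecated since 0.35.0 in favor of LMDB or another backend):
//!
//! ```ignore
//! use nostr_sqlite::database::prelude::*;
//! use nostr_sqlite::SQLiteDatabase;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Open (or create) the database file with unlimited in-memory capacity
//!     let db = SQLiteDatabase::open("events.db").await?;
//!
//!     // Query through the `NostrDatabase` trait
//!     let events = db.query(vec![Filter::new().kind(Kind::TextNote).limit(10)]).await?;
//!     println!("got {} text notes", events.len());
//!
//!     Ok(())
//! }
//! ```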

#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![warn(rustdoc::bare_urls)]
#![allow(clippy::mutable_key_type)] // TODO: remove when possible. Needed to suppress false positive for `BTreeSet<Event>`

use std::collections::{BTreeSet, HashSet};
use std::path::Path;
use std::sync::Arc;

pub extern crate nostr;
pub extern crate nostr_database as database;

use async_trait::async_trait;
use nostr_database::prelude::*;
use rusqlite::config::DbConfig;
use rusqlite::Connection;
use tokio::sync::RwLock;

mod error;
mod migration;
mod pool;

use self::error::Error;
use self::migration::STARTUP_SQL;
use self::pool::Pool;

/// SQLite Nostr Database
#[deprecated(since = "0.35.0", note = "Use LMDB or other backend instead")]
#[derive(Debug, Clone)]
pub struct SQLiteDatabase {
    pool: Pool,
    helper: DatabaseHelper,
    fbb: Arc<RwLock<FlatBufferBuilder<'static>>>,
}

#[allow(deprecated)]
impl SQLiteDatabase {
    async fn new<P>(path: P, helper: DatabaseHelper) -> Result<Self, DatabaseError>
    where
        P: AsRef<Path>,
    {
        let conn = Connection::open(path).map_err(DatabaseError::backend)?;
        let pool: Pool = Pool::new(conn);

        // Execute migrations
        migration::run(&pool).await?;

        let this = Self {
            pool,
            helper,
            fbb: Arc::new(RwLock::new(FlatBufferBuilder::with_capacity(70_000))),
        };

        this.bulk_load().await?;

        Ok(this)
    }

    /// Open database with **unlimited** capacity
    #[inline]
    pub async fn open<P>(path: P) -> Result<Self, DatabaseError>
    where
        P: AsRef<Path>,
    {
        Self::new(path, DatabaseHelper::unbounded()).await
    }

    /// Open database with **limited** capacity
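    ///
    /// A minimal sketch (hypothetical path and capacity; `max_capacity` is
    /// assumed to bound how many events the in-memory helper keeps indexed):
    ///
    /// ```ignore
    /// let db = SQLiteDatabase::open_bounded("events.db", 100_000).await?;
    /// ```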
    #[inline]
    pub async fn open_bounded<P>(path: P, max_capacity: usize) -> Result<Self, DatabaseError>
    where
        P: AsRef<Path>,
    {
        Self::new(path, DatabaseHelper::bounded(max_capacity)).await
    }

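    // Load every event stored on disk into the in-memory helper indexes.
    // Events rejected by the helper during indexing are also deleted from the
    // SQLite file, so that disk and memory stay consistent.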
    #[tracing::instrument(skip_all)]
    async fn bulk_load(&self) -> Result<(), DatabaseError> {
        let events = self
            .pool
            .interact(move |conn| {
                // Query
                let mut stmt = conn.prepare("SELECT event FROM events;")?;
                let mut rows = stmt.query([])?;

                // Decode
                let mut events = BTreeSet::new();
                while let Ok(Some(row)) = rows.next() {
                    let buf: &[u8] = row.get_ref(0)?.as_bytes()?;
                    let event = Event::decode(buf)?;
                    events.insert(event);
                }
                Ok::<BTreeSet<Event>, Error>(events)
            })
            .await??;

        // Build indexes
        let to_discard: HashSet<EventId> = self.helper.bulk_load(events).await;

        // Discard events
        if !to_discard.is_empty() {
            self.pool
                .interact(move |conn| {
                    let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
                    for id in to_discard.into_iter() {
                        stmt.execute([id.to_hex()])?;
                    }
                    Ok::<(), Error>(())
                })
                .await??;
        }
        Ok(())
    }
}

#[async_trait]
#[allow(deprecated)]
impl NostrDatabase for SQLiteDatabase {
    fn backend(&self) -> Backend {
        Backend::SQLite
    }

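    // Saving is two-phase: the event is first indexed by the in-memory helper,
    // which decides whether it should be stored at all and which already
    // stored events should be discarded; only then is the SQLite file updated.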
    #[tracing::instrument(skip_all, level = "trace")]
    async fn save_event(&self, event: &Event) -> Result<bool, DatabaseError> {
        // Index event
        let DatabaseEventResult {
            to_store,
            to_discard,
        } = self.helper.index_event(event).await;

        if !to_discard.is_empty() {
            self.pool
                .interact(move |conn| {
                    let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
                    for id in to_discard.into_iter() {
                        stmt.execute([id.to_hex()])?;
                    }
                    Ok::<(), Error>(())
                })
                .await??;
        }

        if to_store {
            // Acquire FlatBuffers Builder
            let mut fbb = self.fbb.write().await;

            // Encode
            let event_id: EventId = event.id;
            let value: Vec<u8> = event.encode(&mut fbb).to_vec();

            // Save event
            self.pool
                .interact(move |conn| {
                    let mut stmt = conn.prepare_cached(
                        "INSERT OR IGNORE INTO events (event_id, event) VALUES (?, ?);",
                    )?;
                    stmt.execute((event_id.to_hex(), value))
                })
                .await?
                .map_err(DatabaseError::backend)?;

            Ok(true)
        } else {
            Ok(false)
        }
    }

    async fn check_id(&self, event_id: &EventId) -> Result<DatabaseEventStatus, DatabaseError> {
        if self.helper.has_event_id_been_deleted(event_id).await {
            Ok(DatabaseEventStatus::Deleted)
        } else {
            let event_id: String = event_id.to_hex();
            self.pool
                .interact(move |conn| {
                    let mut stmt = conn
                        .prepare_cached(
                            "SELECT EXISTS(SELECT 1 FROM events WHERE event_id = ? LIMIT 1);",
                        )
                        .map_err(DatabaseError::backend)?;
                    let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
                    let exists: u8 = match rows.next().map_err(DatabaseError::backend)? {
                        Some(row) => row.get(0).map_err(DatabaseError::backend)?,
                        None => 0,
                    };
                    Ok(if exists == 1 {
                        DatabaseEventStatus::Saved
                    } else {
                        DatabaseEventStatus::NotExistent
                    })
                })
                .await?
        }
    }

    async fn has_coordinate_been_deleted(
        &self,
        coordinate: &Coordinate,
        timestamp: &Timestamp,
    ) -> Result<bool, DatabaseError> {
        Ok(self
            .helper
            .has_coordinate_been_deleted(coordinate, timestamp)
            .await)
    }

    async fn event_id_seen(
        &self,
        event_id: EventId,
        relay_url: Url,
    ) -> std::result::Result<(), DatabaseError> {
        self.pool
            .interact(move |conn| {
                let mut stmt = conn.prepare_cached(
                    "INSERT OR IGNORE INTO event_seen_by_relays (event_id, relay_url) VALUES (?, ?);",
                )?;
                stmt.execute((event_id.to_hex(), relay_url.to_string()))
            })
            .await?
            .map_err(DatabaseError::backend)?;
        Ok(())
    }

    async fn event_seen_on_relays(
        &self,
        event_id: &EventId,
    ) -> Result<Option<HashSet<Url>>, DatabaseError> {
        let event_id: String = event_id.to_hex();
        self.pool
            .interact(move |conn| {
                let mut stmt = conn
                    .prepare_cached(
                        "SELECT relay_url FROM event_seen_by_relays WHERE event_id = ?;",
                    )
                    .map_err(DatabaseError::backend)?;
                let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
                let mut relays = HashSet::new();
                while let Ok(Some(row)) = rows.next() {
                    let url: &str = row
                        .get_ref(0)
                        .map_err(DatabaseError::backend)?
                        .as_str()
                        .map_err(DatabaseError::backend)?;
                    relays.insert(Url::parse(url).map_err(DatabaseError::backend)?);
                }
                Ok(Some(relays))
            })
            .await?
    }

    #[tracing::instrument(skip_all, level = "trace")]
    async fn event_by_id(&self, event_id: &EventId) -> Result<Option<Event>, DatabaseError> {
        let event_id: String = event_id.to_hex();
        self.pool
            .interact(move |conn| {
                let mut stmt = conn
                    .prepare_cached("SELECT event FROM events WHERE event_id = ?;")
                    .map_err(DatabaseError::backend)?;
                let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
                match rows.next().map_err(DatabaseError::backend)? {
                    Some(row) => {
                        let buf: &[u8] = row
                            .get_ref(0)
                            .map_err(DatabaseError::backend)?
                            .as_bytes()
                            .map_err(DatabaseError::backend)?;
                        Ok(Some(Event::decode(buf).map_err(DatabaseError::backend)?))
                    }
                    None => Ok(None),
                }
            })
            .await?
    }

    #[inline]
    #[tracing::instrument(skip_all, level = "trace")]
    async fn count(&self, filters: Vec<Filter>) -> Result<usize, DatabaseError> {
        Ok(self.helper.count(filters).await)
    }

    #[inline]
    #[tracing::instrument(skip_all)]
    async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, DatabaseError> {
        Ok(self.helper.query(filters).await)
    }

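    // (event id, timestamp) pairs for negentropy reconciliation are served
    // straight from the in-memory helper, without touching SQLite.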
    #[inline]
    async fn negentropy_items(
        &self,
        filter: Filter,
    ) -> Result<Vec<(EventId, Timestamp)>, DatabaseError> {
        Ok(self.helper.negentropy_items(filter).await)
    }

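    // When the in-memory helper can report exactly which events match the
    // filter, only those rows are removed; otherwise the whole table is cleared.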
    async fn delete(&self, filter: Filter) -> Result<(), DatabaseError> {
        match self.helper.delete(filter).await {
            Some(ids) => {
                self.pool
                    .interact(move |conn| {
                        let mut stmt =
                            conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
                        for id in ids.into_iter() {
                            stmt.execute([id.to_hex()])?;
                        }
                        Ok::<(), Error>(())
                    })
                    .await??;
            }
            None => {
                self.pool
                    .interact(move |conn| conn.execute("DELETE FROM events;", []))
                    .await?
                    .map_err(DatabaseError::backend)?;
            }
        };

        Ok(())
    }

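    // `SQLITE_DBCONFIG_RESET_DATABASE` makes the following VACUUM rewrite the
    // file as an empty database; the schema is then recreated by re-running
    // the startup SQL and the migrations.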
    async fn wipe(&self) -> Result<(), DatabaseError> {
        self.pool
            .interact(|conn| {
                // Reset DB
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)?;
                conn.execute("VACUUM;", [])?;
                conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)?;

                // Execute migrations
                conn.execute_batch(STARTUP_SQL)?;

                Ok::<(), Error>(())
            })
            .await??;

        migration::run(&self.pool).await?;

        self.helper.clear().await;

        Ok(())
    }
}