1#![forbid(unsafe_code)]
8#![warn(missing_docs)]
9#![warn(rustdoc::bare_urls)]
10#![allow(clippy::mutable_key_type)] use std::collections::{BTreeSet, HashSet};
13use std::path::Path;
14use std::sync::Arc;
15
16pub extern crate nostr;
17pub extern crate nostr_database as database;
18
19use async_trait::async_trait;
20use nostr_database::prelude::*;
21use rusqlite::config::DbConfig;
22use rusqlite::Connection;
23use tokio::sync::RwLock;
24
25mod error;
26mod migration;
27mod pool;
28
29use self::error::Error;
30use self::migration::STARTUP_SQL;
31use self::pool::Pool;
32
/// SQLite-backed Nostr database.
///
/// Events are persisted in SQLite through `pool`, while `helper` is what the
/// query/count/index calls of this type are delegated to.
#[deprecated(since = "0.35.0", note = "Use LMDB or other backend instead")]
#[derive(Debug, Clone)]
pub struct SQLiteDatabase {
    // Wraps the `rusqlite::Connection`; all SQL runs through `pool.interact(..)`.
    pool: Pool,
    // Indexes events and answers `count`/`query`/`negentropy_items`/deletion checks.
    helper: DatabaseHelper,
    // Shared FlatBuffers builder used to encode events before they are inserted.
    fbb: Arc<RwLock<FlatBufferBuilder<'static>>>,
}
41
42#[allow(deprecated)]
43impl SQLiteDatabase {
44 async fn new<P>(path: P, helper: DatabaseHelper) -> Result<Self, DatabaseError>
45 where
46 P: AsRef<Path>,
47 {
48 let conn = Connection::open(path).map_err(DatabaseError::backend)?;
49 let pool: Pool = Pool::new(conn);
50
51 migration::run(&pool).await?;
53
54 let this = Self {
55 pool,
56 helper,
57 fbb: Arc::new(RwLock::new(FlatBufferBuilder::with_capacity(70_000))),
58 };
59
60 this.bulk_load().await?;
61
62 Ok(this)
63 }
64
65 #[inline]
67 pub async fn open<P>(path: P) -> Result<Self, DatabaseError>
68 where
69 P: AsRef<Path>,
70 {
71 Self::new(path, DatabaseHelper::unbounded()).await
72 }
73
74 #[inline]
76 pub async fn open_bounded<P>(path: P, max_capacity: usize) -> Result<Self, DatabaseError>
77 where
78 P: AsRef<Path>,
79 {
80 Self::new(path, DatabaseHelper::bounded(max_capacity)).await
81 }
82
83 #[tracing::instrument(skip_all)]
84 async fn bulk_load(&self) -> Result<(), DatabaseError> {
85 let events = self
86 .pool
87 .interact(move |conn| {
88 let mut stmt = conn.prepare("SELECT event FROM events;")?;
90 let mut rows = stmt.query([])?;
91
92 let mut events = BTreeSet::new();
94 while let Ok(Some(row)) = rows.next() {
95 let buf: &[u8] = row.get_ref(0)?.as_bytes()?;
96 let event = Event::decode(buf)?;
97 events.insert(event);
98 }
99 Ok::<BTreeSet<Event>, Error>(events)
100 })
101 .await??;
102
103 let to_discard: HashSet<EventId> = self.helper.bulk_load(events).await;
105
106 if !to_discard.is_empty() {
108 self.pool
109 .interact(move |conn| {
110 let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
111 for id in to_discard.into_iter() {
112 stmt.execute([id.to_hex()])?;
113 }
114 Ok::<(), Error>(())
115 })
116 .await??;
117 }
118 Ok(())
119 }
120}
121
122#[async_trait]
123#[allow(deprecated)]
124impl NostrDatabase for SQLiteDatabase {
125 fn backend(&self) -> Backend {
126 Backend::SQLite
127 }
128
129 #[tracing::instrument(skip_all, level = "trace")]
130 async fn save_event(&self, event: &Event) -> Result<bool, DatabaseError> {
131 let DatabaseEventResult {
133 to_store,
134 to_discard,
135 } = self.helper.index_event(event).await;
136
137 if !to_discard.is_empty() {
138 self.pool
139 .interact(move |conn| {
140 let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
141 for id in to_discard.into_iter() {
142 stmt.execute([id.to_hex()])?;
143 }
144 Ok::<(), Error>(())
145 })
146 .await??;
147 }
148
149 if to_store {
150 let mut fbb = self.fbb.write().await;
152
153 let event_id: EventId = event.id;
155 let value: Vec<u8> = event.encode(&mut fbb).to_vec();
156
157 self.pool
159 .interact(move |conn| {
160 let mut stmt = conn.prepare_cached(
161 "INSERT OR IGNORE INTO events (event_id, event) VALUES (?, ?);",
162 )?;
163 stmt.execute((event_id.to_hex(), value))
164 })
165 .await?
166 .map_err(DatabaseError::backend)?;
167
168 Ok(true)
169 } else {
170 Ok(false)
171 }
172 }
173
174 async fn check_id(&self, event_id: &EventId) -> Result<DatabaseEventStatus, DatabaseError> {
175 if self.helper.has_event_id_been_deleted(event_id).await {
176 Ok(DatabaseEventStatus::Deleted)
177 } else {
178 let event_id: String = event_id.to_hex();
179 self.pool
180 .interact(move |conn| {
181 let mut stmt = conn
182 .prepare_cached(
183 "SELECT EXISTS(SELECT 1 FROM events WHERE event_id = ? LIMIT 1);",
184 )
185 .map_err(DatabaseError::backend)?;
186 let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
187 let exists: u8 = match rows.next().map_err(DatabaseError::backend)? {
188 Some(row) => row.get(0).map_err(DatabaseError::backend)?,
189 None => 0,
190 };
191 Ok(if exists == 1 {
192 DatabaseEventStatus::Saved
193 } else {
194 DatabaseEventStatus::NotExistent
195 })
196 })
197 .await?
198 }
199 }
200
201 async fn has_coordinate_been_deleted(
202 &self,
203 coordinate: &Coordinate,
204 timestamp: &Timestamp,
205 ) -> Result<bool, DatabaseError> {
206 Ok(self
207 .helper
208 .has_coordinate_been_deleted(coordinate, timestamp)
209 .await)
210 }
211
212 async fn event_id_seen(
213 &self,
214 event_id: EventId,
215 relay_url: Url,
216 ) -> std::result::Result<(), DatabaseError> {
217 self.pool
218 .interact(move |conn| {
219 let mut stmt = conn.prepare_cached(
220 "INSERT OR IGNORE INTO event_seen_by_relays (event_id, relay_url) VALUES (?, ?);",
221 )?;
222 stmt.execute((event_id.to_hex(), relay_url.to_string()))
223 })
224 .await?.map_err(DatabaseError::backend)?;
225 Ok(())
226 }
227
228 async fn event_seen_on_relays(
229 &self,
230 event_id: &EventId,
231 ) -> Result<Option<HashSet<Url>>, DatabaseError> {
232 let event_id: String = event_id.to_hex();
233 self.pool
234 .interact(move |conn| {
235 let mut stmt = conn
236 .prepare_cached(
237 "SELECT relay_url FROM event_seen_by_relays WHERE event_id = ?;",
238 )
239 .map_err(DatabaseError::backend)?;
240 let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
241 let mut relays = HashSet::new();
242 while let Ok(Some(row)) = rows.next() {
243 let url: &str = row
244 .get_ref(0)
245 .map_err(DatabaseError::backend)?
246 .as_str()
247 .map_err(DatabaseError::backend)?;
248 relays.insert(Url::parse(url).map_err(DatabaseError::backend)?);
249 }
250 Ok(Some(relays))
251 })
252 .await?
253 }
254
255 #[tracing::instrument(skip_all, level = "trace")]
256 async fn event_by_id(&self, event_id: &EventId) -> Result<Option<Event>, DatabaseError> {
257 let event_id: String = event_id.to_hex();
258 self.pool
259 .interact(move |conn| {
260 let mut stmt = conn
261 .prepare_cached("SELECT event FROM events WHERE event_id = ?;")
262 .map_err(DatabaseError::backend)?;
263 let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
264 match rows.next().map_err(DatabaseError::backend)? {
265 Some(row) => {
266 let buf: &[u8] = row
267 .get_ref(0)
268 .map_err(DatabaseError::backend)?
269 .as_bytes()
270 .map_err(DatabaseError::backend)?;
271 Ok(Some(Event::decode(buf).map_err(DatabaseError::backend)?))
272 }
273 None => Ok(None),
274 }
275 })
276 .await?
277 }
278
279 #[inline]
280 #[tracing::instrument(skip_all, level = "trace")]
281 async fn count(&self, filters: Vec<Filter>) -> Result<usize, DatabaseError> {
282 Ok(self.helper.count(filters).await)
283 }
284
285 #[inline]
286 #[tracing::instrument(skip_all)]
287 async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, DatabaseError> {
288 Ok(self.helper.query(filters).await)
289 }
290
291 #[inline]
292 async fn negentropy_items(
293 &self,
294 filter: Filter,
295 ) -> Result<Vec<(EventId, Timestamp)>, DatabaseError> {
296 Ok(self.helper.negentropy_items(filter).await)
297 }
298
299 async fn delete(&self, filter: Filter) -> Result<(), DatabaseError> {
300 match self.helper.delete(filter).await {
301 Some(ids) => {
302 self.pool
303 .interact(move |conn| {
304 let mut stmt =
305 conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
306 for id in ids.into_iter() {
307 stmt.execute([id.to_hex()])?;
308 }
309 Ok::<(), Error>(())
310 })
311 .await??;
312 }
313 None => {
314 self.pool
315 .interact(move |conn| conn.execute("DELETE FROM events;", []))
316 .await?
317 .map_err(DatabaseError::backend)?;
318 }
319 };
320
321 Ok(())
322 }
323
324 async fn wipe(&self) -> Result<(), DatabaseError> {
325 self.pool
326 .interact(|conn| {
327 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)?;
329 conn.execute("VACUUM;", [])?;
330 conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)?;
331
332 conn.execute_batch(STARTUP_SQL)?;
334
335 Ok::<(), Error>(())
336 })
337 .await??;
338
339 migration::run(&self.pool).await?;
340
341 self.helper.clear().await;
342
343 Ok(())
344 }
345}