#![forbid(unsafe_code)]
#![warn(missing_docs)]
#![warn(rustdoc::bare_urls)]
#![allow(clippy::mutable_key_type)]
use std::collections::{BTreeSet, HashSet};
use std::path::Path;
use std::sync::Arc;
pub extern crate nostr;
pub extern crate nostr_database as database;
use async_trait::async_trait;
use nostr_database::prelude::*;
use rusqlite::config::DbConfig;
use rusqlite::Connection;
use tokio::sync::RwLock;
mod error;
mod migration;
mod pool;
use self::error::Error;
use self::migration::STARTUP_SQL;
use self::pool::Pool;
#[deprecated(since = "0.35.0", note = "Use LMDB or other backend instead")]
#[derive(Debug, Clone)]
pub struct SQLiteDatabase {
pool: Pool,
helper: DatabaseHelper,
fbb: Arc<RwLock<FlatBufferBuilder<'static>>>,
}
#[allow(deprecated)]
impl SQLiteDatabase {
async fn new<P>(path: P, helper: DatabaseHelper) -> Result<Self, DatabaseError>
where
P: AsRef<Path>,
{
let conn = Connection::open(path).map_err(DatabaseError::backend)?;
let pool: Pool = Pool::new(conn);
migration::run(&pool).await?;
let this = Self {
pool,
helper,
fbb: Arc::new(RwLock::new(FlatBufferBuilder::with_capacity(70_000))),
};
this.bulk_load().await?;
Ok(this)
}
#[inline]
pub async fn open<P>(path: P) -> Result<Self, DatabaseError>
where
P: AsRef<Path>,
{
Self::new(path, DatabaseHelper::unbounded()).await
}
#[inline]
pub async fn open_bounded<P>(path: P, max_capacity: usize) -> Result<Self, DatabaseError>
where
P: AsRef<Path>,
{
Self::new(path, DatabaseHelper::bounded(max_capacity)).await
}
#[tracing::instrument(skip_all)]
async fn bulk_load(&self) -> Result<(), DatabaseError> {
let events = self
.pool
.interact(move |conn| {
let mut stmt = conn.prepare("SELECT event FROM events;")?;
let mut rows = stmt.query([])?;
let mut events = BTreeSet::new();
while let Ok(Some(row)) = rows.next() {
let buf: &[u8] = row.get_ref(0)?.as_bytes()?;
let event = Event::decode(buf)?;
events.insert(event);
}
Ok::<BTreeSet<Event>, Error>(events)
})
.await??;
let to_discard: HashSet<EventId> = self.helper.bulk_load(events).await;
if !to_discard.is_empty() {
self.pool
.interact(move |conn| {
let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
for id in to_discard.into_iter() {
stmt.execute([id.to_hex()])?;
}
Ok::<(), Error>(())
})
.await??;
}
Ok(())
}
}
#[async_trait]
#[allow(deprecated)]
impl NostrDatabase for SQLiteDatabase {
fn backend(&self) -> Backend {
Backend::SQLite
}
#[tracing::instrument(skip_all, level = "trace")]
async fn save_event(&self, event: &Event) -> Result<bool, DatabaseError> {
let DatabaseEventResult {
to_store,
to_discard,
} = self.helper.index_event(event).await;
if !to_discard.is_empty() {
self.pool
.interact(move |conn| {
let mut stmt = conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
for id in to_discard.into_iter() {
stmt.execute([id.to_hex()])?;
}
Ok::<(), Error>(())
})
.await??;
}
if to_store {
let mut fbb = self.fbb.write().await;
let event_id: EventId = event.id;
let value: Vec<u8> = event.encode(&mut fbb).to_vec();
self.pool
.interact(move |conn| {
let mut stmt = conn.prepare_cached(
"INSERT OR IGNORE INTO events (event_id, event) VALUES (?, ?);",
)?;
stmt.execute((event_id.to_hex(), value))
})
.await?
.map_err(DatabaseError::backend)?;
Ok(true)
} else {
Ok(false)
}
}
async fn check_id(&self, event_id: &EventId) -> Result<DatabaseEventStatus, DatabaseError> {
if self.helper.has_event_id_been_deleted(event_id).await {
Ok(DatabaseEventStatus::Deleted)
} else {
let event_id: String = event_id.to_hex();
self.pool
.interact(move |conn| {
let mut stmt = conn
.prepare_cached(
"SELECT EXISTS(SELECT 1 FROM events WHERE event_id = ? LIMIT 1);",
)
.map_err(DatabaseError::backend)?;
let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
let exists: u8 = match rows.next().map_err(DatabaseError::backend)? {
Some(row) => row.get(0).map_err(DatabaseError::backend)?,
None => 0,
};
Ok(if exists == 1 {
DatabaseEventStatus::Saved
} else {
DatabaseEventStatus::NotExistent
})
})
.await?
}
}
async fn has_coordinate_been_deleted(
&self,
coordinate: &Coordinate,
timestamp: &Timestamp,
) -> Result<bool, DatabaseError> {
Ok(self
.helper
.has_coordinate_been_deleted(coordinate, timestamp)
.await)
}
async fn event_id_seen(
&self,
event_id: EventId,
relay_url: Url,
) -> std::result::Result<(), DatabaseError> {
self.pool
.interact(move |conn| {
let mut stmt = conn.prepare_cached(
"INSERT OR IGNORE INTO event_seen_by_relays (event_id, relay_url) VALUES (?, ?);",
)?;
stmt.execute((event_id.to_hex(), relay_url.to_string()))
})
.await?.map_err(DatabaseError::backend)?;
Ok(())
}
async fn event_seen_on_relays(
&self,
event_id: &EventId,
) -> Result<Option<HashSet<Url>>, DatabaseError> {
let event_id: String = event_id.to_hex();
self.pool
.interact(move |conn| {
let mut stmt = conn
.prepare_cached(
"SELECT relay_url FROM event_seen_by_relays WHERE event_id = ?;",
)
.map_err(DatabaseError::backend)?;
let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
let mut relays = HashSet::new();
while let Ok(Some(row)) = rows.next() {
let url: &str = row
.get_ref(0)
.map_err(DatabaseError::backend)?
.as_str()
.map_err(DatabaseError::backend)?;
relays.insert(Url::parse(url).map_err(DatabaseError::backend)?);
}
Ok(Some(relays))
})
.await?
}
#[tracing::instrument(skip_all, level = "trace")]
async fn event_by_id(&self, event_id: &EventId) -> Result<Option<Event>, DatabaseError> {
let event_id: String = event_id.to_hex();
self.pool
.interact(move |conn| {
let mut stmt = conn
.prepare_cached("SELECT event FROM events WHERE event_id = ?;")
.map_err(DatabaseError::backend)?;
let mut rows = stmt.query([event_id]).map_err(DatabaseError::backend)?;
match rows.next().map_err(DatabaseError::backend)? {
Some(row) => {
let buf: &[u8] = row
.get_ref(0)
.map_err(DatabaseError::backend)?
.as_bytes()
.map_err(DatabaseError::backend)?;
Ok(Some(Event::decode(buf).map_err(DatabaseError::backend)?))
}
None => Ok(None),
}
})
.await?
}
#[inline]
#[tracing::instrument(skip_all, level = "trace")]
async fn count(&self, filters: Vec<Filter>) -> Result<usize, DatabaseError> {
Ok(self.helper.count(filters).await)
}
#[inline]
#[tracing::instrument(skip_all)]
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, DatabaseError> {
Ok(self.helper.query(filters).await)
}
#[inline]
async fn negentropy_items(
&self,
filter: Filter,
) -> Result<Vec<(EventId, Timestamp)>, DatabaseError> {
Ok(self.helper.negentropy_items(filter).await)
}
async fn delete(&self, filter: Filter) -> Result<(), DatabaseError> {
match self.helper.delete(filter).await {
Some(ids) => {
self.pool
.interact(move |conn| {
let mut stmt =
conn.prepare_cached("DELETE FROM events WHERE event_id = ?;")?;
for id in ids.into_iter() {
stmt.execute([id.to_hex()])?;
}
Ok::<(), Error>(())
})
.await??;
}
None => {
self.pool
.interact(move |conn| conn.execute("DELETE FROM events;", []))
.await?
.map_err(DatabaseError::backend)?;
}
};
Ok(())
}
async fn wipe(&self) -> Result<(), DatabaseError> {
self.pool
.interact(|conn| {
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, true)?;
conn.execute("VACUUM;", [])?;
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_RESET_DATABASE, false)?;
conn.execute_batch(STARTUP_SQL)?;
Ok::<(), Error>(())
})
.await??;
migration::run(&self.pool).await?;
self.helper.clear().await;
Ok(())
}
}