use crate::Error;
use async_sqlite::rusqlite::{
CachedStatement, Connection, Error as SqlError, Row,
};
use sos_core::{
commit::CommitHash,
events::{EventLogType, EventRecord},
UtcDateTime,
};
use sql_query_builder as sql;
use std::ops::Deref;
fn event_select_columns(sql: sql::Select) -> sql::Select {
sql.select(
r#"
event_id, created_at, commit_hash, event
"#,
)
}
#[derive(Debug, Copy, Clone)]
#[allow(clippy::enum_variant_names)]
enum EventTable {
AccountEvents,
FolderEvents,
DeviceEvents,
#[cfg(feature = "files")]
FileEvents,
}
impl From<EventLogType> for EventTable {
fn from(value: EventLogType) -> Self {
match value {
EventLogType::Account => Self::AccountEvents,
EventLogType::Identity => Self::FolderEvents,
EventLogType::Device => Self::DeviceEvents,
#[cfg(feature = "files")]
EventLogType::Files => Self::FileEvents,
EventLogType::Folder(_) => Self::FolderEvents,
}
}
}
impl EventTable {
pub fn as_str(&self) -> &'static str {
match self {
EventTable::AccountEvents => "account_events",
EventTable::FolderEvents => "folder_events",
EventTable::DeviceEvents => "device_events",
#[cfg(feature = "files")]
EventTable::FileEvents => "file_events",
}
}
pub fn id_column(&self) -> &'static str {
match self {
EventTable::FolderEvents => "folder_id",
_ => "account_id",
}
}
}
#[derive(Debug)]
pub struct CommitRow {
pub row_id: i64,
pub commit_hash: Vec<u8>,
}
impl<'a> TryFrom<&Row<'a>> for CommitRow {
type Error = SqlError;
fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
Ok(CommitRow {
row_id: row.get(0)?,
commit_hash: row.get(1)?,
})
}
}
pub struct CommitRecord {
pub row_id: i64,
pub commit_hash: CommitHash,
}
impl TryFrom<CommitRow> for CommitRecord {
type Error = Error;
fn try_from(value: CommitRow) -> Result<Self, Self::Error> {
Ok(CommitRecord {
row_id: value.row_id,
commit_hash: CommitHash(value.commit_hash.as_slice().try_into()?),
})
}
}
#[derive(Debug, Default)]
pub struct EventRecordRow {
pub row_id: i64,
created_at: String,
commit_hash: Vec<u8>,
event_bytes: Vec<u8>,
}
impl EventRecordRow {
pub fn new(record: &EventRecord) -> Result<Self, Error> {
Ok(Self {
created_at: record.time().to_rfc3339()?,
commit_hash: record.commit().as_ref().to_vec(),
event_bytes: record.event_bytes().to_vec(),
..Default::default()
})
}
}
impl<'a> TryFrom<&Row<'a>> for EventRecordRow {
type Error = SqlError;
fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
Ok(EventRecordRow {
row_id: row.get(0)?,
created_at: row.get(1)?,
commit_hash: row.get(2)?,
event_bytes: row.get(3)?,
})
}
}
impl TryFrom<EventRecordRow> for EventRecord {
type Error = Error;
fn try_from(value: EventRecordRow) -> Result<Self, Self::Error> {
Ok(EventRecord::new(
UtcDateTime::parse_rfc3339(&value.created_at)?,
Default::default(),
CommitHash(value.commit_hash.as_slice().try_into()?),
value.event_bytes,
))
}
}
pub struct EventEntity<'conn, C>
where
C: Deref<Target = Connection>,
{
conn: &'conn C,
}
impl<'conn> EventEntity<'conn, Box<Connection>> {
pub fn find_all_query(
log_type: EventLogType,
reverse: bool,
) -> sql::Select {
let table: EventTable = log_type.into();
let mut query = event_select_columns(sql::Select::new())
.from(table.as_str())
.where_clause(&format!("{}=?1", table.id_column()));
if reverse {
query = query.order_by("event_id DESC");
} else {
query = query.order_by("event_id ASC");
}
query
}
}
impl<'conn, C> EventEntity<'conn, C>
where
C: Deref<Target = Connection>,
{
pub fn new(conn: &'conn C) -> Self {
Self { conn }
}
pub fn find_one(
&self,
log_type: EventLogType,
event_id: i64,
) -> Result<EventRecordRow, SqlError> {
let table: EventTable = log_type.into();
let query = event_select_columns(sql::Select::new())
.from(table.as_str())
.where_clause("event_id=?1");
let mut stmt = self.conn.prepare_cached(&query.as_string())?;
stmt.query_row([event_id], |row| row.try_into())
}
pub fn delete_one(
&self,
log_type: EventLogType,
commit_hash: &CommitHash,
) -> Result<(), SqlError> {
let table: EventTable = log_type.into();
let query = sql::Delete::new()
.delete_from(table.as_str())
.where_clause("commit_hash = ?1");
let mut stmt = self.conn.prepare_cached(&query.as_string())?;
stmt.execute([commit_hash.as_ref()])?;
Ok(())
}
pub fn insert_events(
&self,
log_type: EventLogType,
account_or_folder_id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
let table: EventTable = log_type.into();
let query = sql::Insert::new()
.insert_into(&format!(
"{} ({}, created_at, commit_hash, event)",
table.as_str(),
table.id_column()
))
.values("(?1, ?2, ?3, ?4)");
let stmt = self.conn.prepare_cached(&query.as_string())?;
self.create_events(stmt, account_or_folder_id, events)
}
pub fn insert_account_events(
&self,
account_id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
self.insert_events(EventLogType::Account, account_id, events)
}
pub fn insert_folder_events(
&self,
folder_id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
self.insert_events(EventLogType::Identity, folder_id, events)
}
pub fn insert_device_events(
&self,
account_id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
self.insert_events(EventLogType::Device, account_id, events)
}
#[cfg(feature = "files")]
pub fn insert_file_events(
&self,
account_id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
self.insert_events(EventLogType::Files, account_id, events)
}
pub fn load_events(
&self,
log_type: EventLogType,
account_id: i64,
folder_id: Option<i64>,
) -> crate::Result<Vec<EventRecordRow>> {
let id = folder_id.unwrap_or(account_id);
let table: EventTable = log_type.into();
let query = event_select_columns(sql::Select::new())
.from(table.as_str())
.where_clause(&format!("{}=?1", table.id_column()))
.order_by("event_id ASC");
let mut stmt = self.conn.prepare_cached(&query.as_string())?;
fn convert_row(
row: &Row<'_>,
) -> Result<EventRecordRow, crate::Error> {
Ok(row.try_into()?)
}
let rows = stmt.query_and_then([id], convert_row)?;
let mut events = Vec::new();
for row in rows {
events.push(row?);
}
Ok(events)
}
pub fn load_commits(
&self,
log_type: EventLogType,
account_or_folder_id: i64,
) -> crate::Result<Vec<CommitRow>> {
let table: EventTable = log_type.into();
let query = sql::Select::new()
.select("event_id, commit_hash")
.from(table.as_str())
.where_clause(&format!("{}=?1", table.id_column()))
.order_by("event_id ASC");
let mut stmt = self.conn.prepare_cached(&query.as_string())?;
fn convert_row(row: &Row<'_>) -> Result<CommitRow, crate::Error> {
Ok(row.try_into()?)
}
let rows =
stmt.query_and_then([account_or_folder_id], convert_row)?;
let mut commits = Vec::new();
for row in rows {
commits.push(row?);
}
Ok(commits)
}
pub fn delete_all_events(
&self,
log_type: EventLogType,
account_or_folder_id: i64,
) -> Result<usize, SqlError> {
let table: EventTable = log_type.into();
let query = sql::Delete::new()
.delete_from(table.as_str())
.where_clause(&format!("{}=?1", table.id_column()));
let mut stmt = self.conn.prepare_cached(&query.as_string())?;
stmt.execute([account_or_folder_id])
}
fn create_events(
&self,
mut stmt: CachedStatement<'_>,
id: i64,
events: &[EventRecordRow],
) -> Result<Vec<i64>, SqlError> {
let mut ids = Vec::new();
for record in events {
stmt.execute((
&id,
&record.created_at,
&record.commit_hash,
&record.event_bytes,
))?;
ids.push(self.conn.last_insert_rowid());
}
Ok(ids)
}
}