// sos_database/entity/event.rs

use crate::Error;
use async_sqlite::rusqlite::{
    CachedStatement, Connection, Error as SqlError, Row,
};
use sos_core::{
    commit::CommitHash,
    events::{EventLogType, EventRecord},
    UtcDateTime,
};
use sql_query_builder as sql;
use std::ops::Deref;
12
13fn event_select_columns(sql: sql::Select) -> sql::Select {
14    sql.select(
15        r#"
16            event_id, created_at, commit_hash, event
17        "#,
18    )
19}
20
/// Enumeration of the database tables that store event logs.
///
/// Each variant corresponds to one table; see `EventTable::as_str`
/// for the table names.
#[derive(Debug, Copy, Clone)]
// Every variant intentionally ends in `Events` so that the variant
// names mirror the table names.
#[allow(clippy::enum_variant_names)]
enum EventTable {
    /// Account events table.
    AccountEvents,
    /// Folder events table.
    FolderEvents,
    /// Device events table.
    DeviceEvents,
    /// File events table (only with the `files` feature).
    #[cfg(feature = "files")]
    FileEvents,
}
35
36impl From<EventLogType> for EventTable {
37    fn from(value: EventLogType) -> Self {
38        match value {
39            EventLogType::Account => Self::AccountEvents,
40            EventLogType::Identity => Self::FolderEvents,
41            EventLogType::Device => Self::DeviceEvents,
42            #[cfg(feature = "files")]
43            EventLogType::Files => Self::FileEvents,
44            EventLogType::Folder(_) => Self::FolderEvents,
45        }
46    }
47}
48
49impl EventTable {
50    /// Table name.
51    pub fn as_str(&self) -> &'static str {
52        match self {
53            EventTable::AccountEvents => "account_events",
54            EventTable::FolderEvents => "folder_events",
55            EventTable::DeviceEvents => "device_events",
56            #[cfg(feature = "files")]
57            EventTable::FileEvents => "file_events",
58        }
59    }
60
61    /// Identifier column name.
62    ///
63    /// Events for a folder belong to a folder, other event logs
64    /// belong to the account.
65    pub fn id_column(&self) -> &'static str {
66        match self {
67            EventTable::FolderEvents => "folder_id",
68            _ => "account_id",
69        }
70    }
71}
72
/// Commit row.
///
/// Raw database representation of a commit: the row identifier
/// together with the commit hash as an untyped byte vector.
#[derive(Debug)]
pub struct CommitRow {
    /// Row identifier.
    pub row_id: i64,
    /// Commit hash bytes as stored in the database.
    pub commit_hash: Vec<u8>,
}
81
82impl<'a> TryFrom<&Row<'a>> for CommitRow {
83    type Error = SqlError;
84    fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
85        Ok(CommitRow {
86            row_id: row.get(0)?,
87            commit_hash: row.get(1)?,
88        })
89    }
90}
91
92/// Commit record.
93pub struct CommitRecord {
94    /// Row identifier.
95    pub row_id: i64,
96    /// Commit hash.
97    pub commit_hash: CommitHash,
98}
99
100impl TryFrom<CommitRow> for CommitRecord {
101    type Error = Error;
102
103    fn try_from(value: CommitRow) -> Result<Self, Self::Error> {
104        Ok(CommitRecord {
105            row_id: value.row_id,
106            commit_hash: CommitHash(value.commit_hash.as_slice().try_into()?),
107        })
108    }
109}
110
/// Event record row.
///
/// Raw database representation of a single event log entry.
#[derive(Debug, Default)]
pub struct EventRecordRow {
    /// Row identifier.
    pub row_id: i64,
    /// Row created date and time.
    created_at: String,
    /// Commit hash.
    commit_hash: Vec<u8>,
    /// Event bytes.
    event_bytes: Vec<u8>,
}
123
124impl EventRecordRow {
125    /// Create a new event record row for insertion.
126    pub fn new(record: &EventRecord) -> Result<Self, Error> {
127        Ok(Self {
128            created_at: record.time().to_rfc3339()?,
129            commit_hash: record.commit().as_ref().to_vec(),
130            event_bytes: record.event_bytes().to_vec(),
131            ..Default::default()
132        })
133    }
134}
135
136impl<'a> TryFrom<&Row<'a>> for EventRecordRow {
137    type Error = SqlError;
138    fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
139        Ok(EventRecordRow {
140            row_id: row.get(0)?,
141            created_at: row.get(1)?,
142            commit_hash: row.get(2)?,
143            event_bytes: row.get(3)?,
144        })
145    }
146}
147
148impl TryFrom<EventRecordRow> for EventRecord {
149    type Error = Error;
150
151    fn try_from(value: EventRecordRow) -> Result<Self, Self::Error> {
152        Ok(EventRecord::new(
153            UtcDateTime::parse_rfc3339(&value.created_at)?,
154            Default::default(),
155            CommitHash(value.commit_hash.as_slice().try_into()?),
156            value.event_bytes,
157        ))
158    }
159}
160
161/// Event entity.
162pub struct EventEntity<'conn, C>
163where
164    C: Deref<Target = Connection>,
165{
166    conn: &'conn C,
167}
168
169impl<'conn> EventEntity<'conn, Box<Connection>> {
170    /// Query to find all events.
171    pub fn find_all_query(
172        log_type: EventLogType,
173        reverse: bool,
174    ) -> sql::Select {
175        let table: EventTable = log_type.into();
176        let mut query = event_select_columns(sql::Select::new())
177            .from(table.as_str())
178            .where_clause(&format!("{}=?1", table.id_column()));
179        if reverse {
180            query = query.order_by("event_id DESC");
181        } else {
182            query = query.order_by("event_id ASC");
183        }
184        query
185    }
186}
187
188impl<'conn, C> EventEntity<'conn, C>
189where
190    C: Deref<Target = Connection>,
191{
192    /// Create a new event entity.
193    pub fn new(conn: &'conn C) -> Self {
194        Self { conn }
195    }
196
197    /// Find an event record in the database.
198    pub fn find_one(
199        &self,
200        log_type: EventLogType,
201        event_id: i64,
202    ) -> Result<EventRecordRow, SqlError> {
203        let table: EventTable = log_type.into();
204        let query = event_select_columns(sql::Select::new())
205            .from(table.as_str())
206            .where_clause("event_id=?1");
207        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
208        stmt.query_row([event_id], |row| row.try_into())
209    }
210
211    /// Delete an event from the database table.
212    pub fn delete_one(
213        &self,
214        log_type: EventLogType,
215        commit_hash: &CommitHash,
216    ) -> Result<(), SqlError> {
217        let table: EventTable = log_type.into();
218        let query = sql::Delete::new()
219            .delete_from(table.as_str())
220            .where_clause("commit_hash = ?1");
221        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
222        stmt.execute([commit_hash.as_ref()])?;
223        Ok(())
224    }
225
226    /// Insert events into an event log table.
227    pub fn insert_events(
228        &self,
229        log_type: EventLogType,
230        account_or_folder_id: i64,
231        events: &[EventRecordRow],
232    ) -> Result<Vec<i64>, SqlError> {
233        let table: EventTable = log_type.into();
234        let query = sql::Insert::new()
235            .insert_into(&format!(
236                "{} ({}, created_at, commit_hash, event)",
237                table.as_str(),
238                table.id_column()
239            ))
240            .values("(?1, ?2, ?3, ?4)");
241        let stmt = self.conn.prepare_cached(&query.as_string())?;
242        self.create_events(stmt, account_or_folder_id, events)
243    }
244
245    /// Create account events in the database.
246    pub fn insert_account_events(
247        &self,
248        account_id: i64,
249        events: &[EventRecordRow],
250    ) -> Result<Vec<i64>, SqlError> {
251        self.insert_events(EventLogType::Account, account_id, events)
252    }
253
254    /// Create folder events in the database.
255    pub fn insert_folder_events(
256        &self,
257        folder_id: i64,
258        events: &[EventRecordRow],
259    ) -> Result<Vec<i64>, SqlError> {
260        self.insert_events(EventLogType::Identity, folder_id, events)
261    }
262
263    /// Create device events in the database.
264    pub fn insert_device_events(
265        &self,
266        account_id: i64,
267        events: &[EventRecordRow],
268    ) -> Result<Vec<i64>, SqlError> {
269        self.insert_events(EventLogType::Device, account_id, events)
270    }
271
272    /// Create file events in the database.
273    #[cfg(feature = "files")]
274    pub fn insert_file_events(
275        &self,
276        account_id: i64,
277        events: &[EventRecordRow],
278    ) -> Result<Vec<i64>, SqlError> {
279        self.insert_events(EventLogType::Files, account_id, events)
280    }
281
282    /// Load event records for a folder.
283    pub fn load_events(
284        &self,
285        log_type: EventLogType,
286        account_id: i64,
287        folder_id: Option<i64>,
288    ) -> crate::Result<Vec<EventRecordRow>> {
289        let id = folder_id.unwrap_or(account_id);
290        let table: EventTable = log_type.into();
291        let query = event_select_columns(sql::Select::new())
292            .from(table.as_str())
293            .where_clause(&format!("{}=?1", table.id_column()))
294            .order_by("event_id ASC");
295
296        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
297
298        fn convert_row(
299            row: &Row<'_>,
300        ) -> Result<EventRecordRow, crate::Error> {
301            Ok(row.try_into()?)
302        }
303
304        let rows = stmt.query_and_then([id], convert_row)?;
305
306        let mut events = Vec::new();
307        for row in rows {
308            events.push(row?);
309        }
310        Ok(events)
311    }
312
313    /// Load commits and identifiers for a folder.
314    pub fn load_commits(
315        &self,
316        log_type: EventLogType,
317        account_or_folder_id: i64,
318    ) -> crate::Result<Vec<CommitRow>> {
319        let table: EventTable = log_type.into();
320        let query = sql::Select::new()
321            .select("event_id, commit_hash")
322            .from(table.as_str())
323            .where_clause(&format!("{}=?1", table.id_column()))
324            .order_by("event_id ASC");
325
326        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
327
328        fn convert_row(row: &Row<'_>) -> Result<CommitRow, crate::Error> {
329            Ok(row.try_into()?)
330        }
331
332        let rows =
333            stmt.query_and_then([account_or_folder_id], convert_row)?;
334
335        let mut commits = Vec::new();
336        for row in rows {
337            commits.push(row?);
338        }
339        Ok(commits)
340    }
341
342    /// Delete all event logs.
343    pub fn delete_all_events(
344        &self,
345        log_type: EventLogType,
346        account_or_folder_id: i64,
347    ) -> Result<usize, SqlError> {
348        let table: EventTable = log_type.into();
349        let query = sql::Delete::new()
350            .delete_from(table.as_str())
351            .where_clause(&format!("{}=?1", table.id_column()));
352        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
353        stmt.execute([account_or_folder_id])
354    }
355
356    fn create_events(
357        &self,
358        mut stmt: CachedStatement<'_>,
359        id: i64,
360        events: &[EventRecordRow],
361    ) -> Result<Vec<i64>, SqlError> {
362        let mut ids = Vec::new();
363        for record in events {
364            stmt.execute((
365                &id,
366                &record.created_at,
367                &record.commit_hash,
368                &record.event_bytes,
369            ))?;
370            ids.push(self.conn.last_insert_rowid());
371        }
372        Ok(ids)
373    }
374}