sos_database/entity/
event.rs

1use crate::Error;
2use async_sqlite::rusqlite::{
3    CachedStatement, Connection, Error as SqlError, Row,
4};
5use sos_core::{
6    commit::CommitHash,
7    events::{EventLogType, EventRecord},
8    UtcDateTime,
9};
10use sql_query_builder as sql;
11use std::ops::Deref;
12
13fn event_select_columns(sql: sql::Select) -> sql::Select {
14    sql.select(
15        r#"
16            event_id, created_at, commit_hash, event
17        "#,
18    )
19}
20
21/// Enumeration of tables for events.
22#[derive(Debug, Copy, Clone)]
23enum EventTable {
24    /// Account events table.
25    AccountEvents,
26    /// Folder events table.
27    FolderEvents,
28    /// Device events table.
29    DeviceEvents,
30    /// File events table.
31    FileEvents,
32}
33
34impl From<EventLogType> for EventTable {
35    fn from(value: EventLogType) -> Self {
36        match value {
37            EventLogType::Account => Self::AccountEvents,
38            EventLogType::Identity => Self::FolderEvents,
39            EventLogType::Device => Self::DeviceEvents,
40            #[cfg(feature = "files")]
41            EventLogType::Files => Self::FileEvents,
42            EventLogType::Folder(_) => Self::FolderEvents,
43        }
44    }
45}
46
47impl EventTable {
48    /// Table name.
49    pub fn as_str(&self) -> &'static str {
50        match self {
51            EventTable::AccountEvents => "account_events",
52            EventTable::FolderEvents => "folder_events",
53            EventTable::DeviceEvents => "device_events",
54            EventTable::FileEvents => "file_events",
55        }
56    }
57
58    /// Identifier column name.
59    ///
60    /// Events for a folder belong to a folder, other event logs
61    /// belong to the account.
62    pub fn id_column(&self) -> &'static str {
63        match self {
64            EventTable::FolderEvents => "folder_id",
65            _ => "account_id",
66        }
67    }
68}
69
70/// Commit row.
71#[derive(Debug)]
72pub struct CommitRow {
73    /// Row identifier.
74    pub row_id: i64,
75    /// Commit hash.
76    pub commit_hash: Vec<u8>,
77}
78
79impl<'a> TryFrom<&Row<'a>> for CommitRow {
80    type Error = SqlError;
81    fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
82        Ok(CommitRow {
83            row_id: row.get(0)?,
84            commit_hash: row.get(1)?,
85        })
86    }
87}
88
89/// Commit record.
90pub struct CommitRecord {
91    /// Row identifier.
92    pub row_id: i64,
93    /// Commit hash.
94    pub commit_hash: CommitHash,
95}
96
97impl TryFrom<CommitRow> for CommitRecord {
98    type Error = Error;
99
100    fn try_from(value: CommitRow) -> Result<Self, Self::Error> {
101        Ok(CommitRecord {
102            row_id: value.row_id,
103            commit_hash: CommitHash(value.commit_hash.as_slice().try_into()?),
104        })
105    }
106}
107
108/// Commit record row.
109#[derive(Debug, Default)]
110pub struct EventRecordRow {
111    /// Row identifier.
112    pub row_id: i64,
113    /// Row created date and time.
114    created_at: String,
115    /// Commit hash.
116    commit_hash: Vec<u8>,
117    /// Event bytes.
118    event_bytes: Vec<u8>,
119}
120
121impl EventRecordRow {
122    /// Create a new event record row for insertion.
123    pub fn new(record: &EventRecord) -> Result<Self, Error> {
124        Ok(Self {
125            created_at: record.time().to_rfc3339()?,
126            commit_hash: record.commit().as_ref().to_vec(),
127            event_bytes: record.event_bytes().to_vec(),
128            ..Default::default()
129        })
130    }
131}
132
133impl<'a> TryFrom<&Row<'a>> for EventRecordRow {
134    type Error = SqlError;
135    fn try_from(row: &Row<'a>) -> Result<Self, Self::Error> {
136        Ok(EventRecordRow {
137            row_id: row.get(0)?,
138            created_at: row.get(1)?,
139            commit_hash: row.get(2)?,
140            event_bytes: row.get(3)?,
141        })
142    }
143}
144
145impl TryFrom<EventRecordRow> for EventRecord {
146    type Error = Error;
147
148    fn try_from(value: EventRecordRow) -> Result<Self, Self::Error> {
149        Ok(EventRecord::new(
150            UtcDateTime::parse_rfc3339(&value.created_at)?,
151            Default::default(),
152            CommitHash(value.commit_hash.as_slice().try_into()?),
153            value.event_bytes,
154        ))
155    }
156}
157
158/// Event entity.
159pub struct EventEntity<'conn, C>
160where
161    C: Deref<Target = Connection>,
162{
163    conn: &'conn C,
164}
165
166impl<'conn> EventEntity<'conn, Box<Connection>> {
167    /// Query to find all events.
168    pub fn find_all_query(
169        log_type: EventLogType,
170        reverse: bool,
171    ) -> sql::Select {
172        let table: EventTable = log_type.into();
173        let mut query = event_select_columns(sql::Select::new())
174            .from(table.as_str())
175            .where_clause(&format!("{}=?1", table.id_column()));
176        if reverse {
177            query = query.order_by("event_id DESC");
178        } else {
179            query = query.order_by("event_id ASC");
180        }
181        query
182    }
183}
184
185impl<'conn, C> EventEntity<'conn, C>
186where
187    C: Deref<Target = Connection>,
188{
189    /// Create a new event entity.
190    pub fn new(conn: &'conn C) -> Self {
191        Self { conn }
192    }
193
194    /// Find an event record in the database.
195    pub fn find_one(
196        &self,
197        log_type: EventLogType,
198        event_id: i64,
199    ) -> Result<EventRecordRow, SqlError> {
200        let table: EventTable = log_type.into();
201        let query = event_select_columns(sql::Select::new())
202            .from(table.as_str())
203            .where_clause("event_id=?1");
204        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
205        Ok(stmt.query_row([event_id], |row| Ok(row.try_into()?))?)
206    }
207
208    /// Delete an event from the database table.
209    pub fn delete_one(
210        &self,
211        log_type: EventLogType,
212        commit_hash: &CommitHash,
213    ) -> Result<(), SqlError> {
214        let table: EventTable = log_type.into();
215        let query = sql::Delete::new()
216            .delete_from(table.as_str())
217            .where_clause("commit_hash = ?1");
218        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
219        stmt.execute([commit_hash.as_ref()])?;
220        Ok(())
221    }
222
223    /// Insert events into an event log table.
224    pub fn insert_events(
225        &self,
226        log_type: EventLogType,
227        account_or_folder_id: i64,
228        events: &[EventRecordRow],
229    ) -> Result<Vec<i64>, SqlError> {
230        let table: EventTable = log_type.into();
231        let query = sql::Insert::new()
232            .insert_into(&format!(
233                "{} ({}, created_at, commit_hash, event)",
234                table.as_str(),
235                table.id_column()
236            ))
237            .values("(?1, ?2, ?3, ?4)");
238        let stmt = self.conn.prepare_cached(&query.as_string())?;
239        self.create_events(stmt, account_or_folder_id, events)
240    }
241
242    /// Create account events in the database.
243    pub fn insert_account_events(
244        &self,
245        account_id: i64,
246        events: &[EventRecordRow],
247    ) -> Result<Vec<i64>, SqlError> {
248        self.insert_events(EventLogType::Account, account_id, events)
249    }
250
251    /// Create folder events in the database.
252    pub fn insert_folder_events(
253        &self,
254        folder_id: i64,
255        events: &[EventRecordRow],
256    ) -> Result<Vec<i64>, SqlError> {
257        self.insert_events(EventLogType::Identity, folder_id, events)
258    }
259
260    /// Create device events in the database.
261    pub fn insert_device_events(
262        &self,
263        account_id: i64,
264        events: &[EventRecordRow],
265    ) -> Result<Vec<i64>, SqlError> {
266        self.insert_events(EventLogType::Device, account_id, events)
267    }
268
269    /// Create file events in the database.
270    #[cfg(feature = "files")]
271    pub fn insert_file_events(
272        &self,
273        account_id: i64,
274        events: &[EventRecordRow],
275    ) -> Result<Vec<i64>, SqlError> {
276        self.insert_events(EventLogType::Files, account_id, events)
277    }
278
279    /// Load event records for a folder.
280    pub fn load_events(
281        &self,
282        log_type: EventLogType,
283        account_id: i64,
284        folder_id: Option<i64>,
285    ) -> crate::Result<Vec<EventRecordRow>> {
286        let id = folder_id.unwrap_or(account_id);
287        let table: EventTable = log_type.into();
288        let query = event_select_columns(sql::Select::new())
289            .from(table.as_str())
290            .where_clause(&format!("{}=?1", table.id_column()))
291            .order_by("event_id ASC");
292
293        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
294
295        fn convert_row(
296            row: &Row<'_>,
297        ) -> Result<EventRecordRow, crate::Error> {
298            Ok(row.try_into()?)
299        }
300
301        let rows = stmt.query_and_then([id], |row| {
302            Ok::<_, crate::Error>(convert_row(row)?)
303        })?;
304
305        let mut events = Vec::new();
306        for row in rows {
307            events.push(row?);
308        }
309        Ok(events)
310    }
311
312    /// Load commits and identifiers for a folder.
313    pub fn load_commits(
314        &self,
315        log_type: EventLogType,
316        account_or_folder_id: i64,
317    ) -> crate::Result<Vec<CommitRow>> {
318        let table: EventTable = log_type.into();
319        let query = sql::Select::new()
320            .select("event_id, commit_hash")
321            .from(table.as_str())
322            .where_clause(&format!("{}=?1", table.id_column()))
323            .order_by("event_id ASC");
324
325        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
326
327        fn convert_row(row: &Row<'_>) -> Result<CommitRow, crate::Error> {
328            Ok(row.try_into()?)
329        }
330
331        let rows = stmt.query_and_then([account_or_folder_id], |row| {
332            Ok::<_, crate::Error>(convert_row(row)?)
333        })?;
334
335        let mut commits = Vec::new();
336        for row in rows {
337            commits.push(row?);
338        }
339        Ok(commits)
340    }
341
342    /// Delete all event logs.
343    pub fn delete_all_events(
344        &self,
345        log_type: EventLogType,
346        account_or_folder_id: i64,
347    ) -> Result<usize, SqlError> {
348        let table: EventTable = log_type.into();
349        let query = sql::Delete::new()
350            .delete_from(table.as_str())
351            .where_clause(&format!("{}=?1", table.id_column()));
352        let mut stmt = self.conn.prepare_cached(&query.as_string())?;
353        Ok(stmt.execute([account_or_folder_id])?)
354    }
355
356    fn create_events(
357        &self,
358        mut stmt: CachedStatement<'_>,
359        id: i64,
360        events: &[EventRecordRow],
361    ) -> Result<Vec<i64>, SqlError> {
362        let mut ids = Vec::new();
363        for record in events {
364            stmt.execute((
365                &id,
366                &record.created_at,
367                &record.commit_hash,
368                &record.event_bytes,
369            ))?;
370            ids.push(self.conn.last_insert_rowid());
371        }
372        Ok(ids)
373    }
374}