sos_database/
event_log.rs

1//! Event log backed by a database table.
2//!
3//! Event logs can belong to an account or to a folder
4//! so we keep track of the owner of the event log for
5//! database queries.
6//!
7//! If you were to move a folder between accounts or otherwise
8//! re-owner an event log you must create a new event log so
9//! the owner reference is updated.
10use crate::{
11    entity::{
12        AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13        FolderEntity, FolderRecord,
14    },
15    Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21    pin_mut,
22    stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25    commit::{CommitHash, CommitProof, CommitTree, Comparison},
26    encoding::VERSION1,
27    events::{
28        patch::{CheckedPatch, Diff, Patch},
29        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
30        WriteEvent,
31    },
32    AccountId, VaultId,
33};
34
35/// Owner of an event log.
36#[derive(Clone)]
37#[doc(hidden)]
38pub enum EventLogOwner {
39    /// Event log owned by an account.
40    Account(i64),
41    /// Event log owned by a folder.
42    Folder(FolderRecord),
43}
44
45impl From<&EventLogOwner> for i64 {
46    fn from(value: &EventLogOwner) -> Self {
47        match value {
48            EventLogOwner::Account(id) => *id,
49            EventLogOwner::Folder(folder) => folder.row_id,
50        }
51    }
52}
53
54#[cfg(feature = "files")]
55use sos_core::events::FileEvent;
56use tokio_stream::wrappers::ReceiverStream;
57
58/// Event log for changes to an account.
59pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
60
61/// Event log for devices.
62pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
63
64/// Event log for changes to a folder.
65pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
66
67/// Event log for changes to external files.
68#[cfg(feature = "files")]
69pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
70
71/// Database event log.
72pub struct DatabaseEventLog<T, E>
73where
74    T: Default + Encodable + Decodable + Send + Sync,
75    E: std::error::Error
76        + std::fmt::Debug
77        + From<sos_core::Error>
78        + From<crate::Error>
79        + From<std::io::Error>
80        + Send
81        + Sync
82        + 'static,
83{
84    owner: EventLogOwner,
85    client: Client,
86    log_type: EventLogType,
87    tree: CommitTree,
88    marker: std::marker::PhantomData<(T, E)>,
89}
90
91impl<T, E> DatabaseEventLog<T, E>
92where
93    T: Default + Encodable + Decodable + Send + Sync,
94    E: std::error::Error
95        + std::fmt::Debug
96        + From<sos_core::Error>
97        + From<crate::Error>
98        + From<std::io::Error>
99        + Send
100        + Sync
101        + 'static,
102{
103    /// Create a copy of this event log using a fresh
104    /// commit tree and a different client.
105    ///
106    /// Typically used to create a clone using
107    /// a temporary in-memory database.
108    pub fn with_new_client(
109        &self,
110        client: Client,
111        owner: Option<EventLogOwner>,
112    ) -> Self {
113        Self {
114            owner: owner.unwrap_or_else(|| self.owner.clone()),
115            client,
116            log_type: self.log_type,
117            tree: CommitTree::new(),
118            marker: std::marker::PhantomData,
119        }
120    }
121
122    /// Lookup an owner for the event log.
123    async fn lookup_owner(
124        client: &Client,
125        account_id: &AccountId,
126        log_type: &EventLogType,
127    ) -> Result<EventLogOwner, Error> {
128        let account_id = *account_id;
129        let log_type = *log_type;
130        let result = client
131            .conn_and_then(move |conn| {
132                let account = AccountEntity::new(&conn);
133                let account_row = account.find_one(&account_id)?;
134                match log_type {
135                    EventLogType::Folder(folder_id) => {
136                        let folder = FolderEntity::new(&conn);
137                        let folder_row = folder.find_one(&folder_id)?;
138                        Ok::<_, Error>((account_row, Some(folder_row)))
139                    }
140                    _ => Ok::<_, Error>((account_row, None)),
141                }
142            })
143            .await?;
144
145        Ok(match result {
146            (account_row, None) => EventLogOwner::Account(account_row.row_id),
147            (_, Some(folder_row)) => EventLogOwner::Folder(
148                FolderRecord::from_row(folder_row).await?,
149            ),
150        })
151    }
152
153    async fn insert_records(
154        &mut self,
155        records: &[EventRecord],
156        delete_before: bool,
157    ) -> Result<(), E> {
158        let log_type = self.log_type.clone();
159        let mut insert_rows = Vec::new();
160        let mut commits = Vec::new();
161        for record in records {
162            commits.push(*record.commit());
163            insert_rows.push(EventRecordRow::new(&record)?);
164        }
165
166        let id = (&self.owner).into();
167
168        // Insert into the database.
169        self.client
170            .conn_mut(move |conn| {
171                let tx = conn.transaction()?;
172                let events = EventEntity::new(&tx);
173                if delete_before {
174                    events.delete_all_events(log_type, id)?;
175                }
176                let ids = events.insert_events(
177                    log_type,
178                    id,
179                    insert_rows.as_slice(),
180                )?;
181                tx.commit()?;
182                Ok(ids)
183            })
184            .await
185            .map_err(Error::from)?;
186
187        if delete_before {
188            self.tree = CommitTree::new();
189        }
190
191        // Update the in-memory merkle tree
192        let mut hashes =
193            commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
194        self.tree.append(&mut hashes);
195        self.tree.commit();
196
197        Ok(())
198    }
199}
200
201impl<E> DatabaseEventLog<AccountEvent, E>
202where
203    E: std::error::Error
204        + std::fmt::Debug
205        + From<sos_core::Error>
206        + From<crate::Error>
207        + From<std::io::Error>
208        + Send
209        + Sync
210        + 'static,
211{
212    /// Create a new account event log.
213    pub async fn new_account(
214        client: Client,
215        account_id: AccountId,
216    ) -> Result<Self, E> {
217        let log_type = EventLogType::Account;
218        let owner =
219            Self::lookup_owner(&client, &account_id, &log_type).await?;
220        Ok(Self {
221            owner,
222            client,
223            log_type,
224            tree: CommitTree::new(),
225            marker: std::marker::PhantomData,
226        })
227    }
228}
229
230impl<E> DatabaseEventLog<WriteEvent, E>
231where
232    E: std::error::Error
233        + std::fmt::Debug
234        + From<sos_core::Error>
235        + From<crate::Error>
236        + From<std::io::Error>
237        + Send
238        + Sync
239        + 'static,
240{
241    /// Create a new folder event log.
242    pub async fn new_folder(
243        client: Client,
244        account_id: AccountId,
245        folder_id: VaultId,
246    ) -> Result<Self, E> {
247        let log_type = EventLogType::Folder(folder_id);
248        let owner =
249            Self::lookup_owner(&client, &account_id, &log_type).await?;
250
251        Ok(Self {
252            owner,
253            client,
254            log_type,
255            tree: CommitTree::new(),
256            marker: std::marker::PhantomData,
257        })
258    }
259}
260
261impl<E> DatabaseEventLog<DeviceEvent, E>
262where
263    E: std::error::Error
264        + std::fmt::Debug
265        + From<sos_core::Error>
266        + From<crate::Error>
267        + From<std::io::Error>
268        + Send
269        + Sync
270        + 'static,
271{
272    /// Create a new device event log.
273    pub async fn new_device(
274        client: Client,
275        account_id: AccountId,
276    ) -> Result<Self, E> {
277        let log_type = EventLogType::Device;
278        let owner =
279            Self::lookup_owner(&client, &account_id, &log_type).await?;
280        Ok(Self {
281            owner,
282            client,
283            log_type,
284            tree: CommitTree::new(),
285            marker: std::marker::PhantomData,
286        })
287    }
288}
289
290#[cfg(feature = "files")]
291impl<E> DatabaseEventLog<FileEvent, E>
292where
293    E: std::error::Error
294        + std::fmt::Debug
295        + From<sos_core::Error>
296        + From<crate::Error>
297        + From<std::io::Error>
298        + Send
299        + Sync
300        + 'static,
301{
302    /// Create a new file event log.
303    pub async fn new_file(
304        client: Client,
305        account_id: AccountId,
306    ) -> Result<Self, Error> {
307        let log_type = EventLogType::Files;
308        let owner =
309            Self::lookup_owner(&client, &account_id, &log_type).await?;
310        Ok(Self {
311            owner,
312            client,
313            log_type,
314            tree: CommitTree::new(),
315            marker: std::marker::PhantomData,
316        })
317    }
318}
319
320#[async_trait]
321impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
322where
323    T: Default + Encodable + Decodable + Send + Sync + 'static,
324    E: std::error::Error
325        + std::fmt::Debug
326        + From<sos_core::Error>
327        + From<crate::Error>
328        + From<std::io::Error>
329        + Send
330        + Sync
331        + 'static,
332{
333    type Error = E;
334
335    async fn record_stream(
336        &self,
337        reverse: bool,
338    ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
339        let (tx, rx) = tokio::sync::mpsc::channel(8);
340
341        let id: i64 = (&self.owner).into();
342        let log_type = self.log_type.clone();
343        let client = self.client.clone();
344
345        tokio::spawn(async move {
346            client
347                .conn_and_then(move |conn| {
348                    let query =
349                        EventEntity::find_all_query(log_type, reverse);
350
351                    let mut stmt = conn.prepare_cached(&query.as_string())?;
352
353                    fn convert_row(
354                        row: &Row<'_>,
355                    ) -> Result<EventRecordRow, crate::Error>
356                    {
357                        Ok(row.try_into()?)
358                    }
359
360                    let rows = stmt.query_and_then([id], |row| {
361                        Ok::<_, crate::Error>(convert_row(row)?)
362                    })?;
363
364                    for row in rows {
365                        let row = row?;
366                        let record: EventRecord = row.try_into()?;
367                        let sender = tx.clone();
368                        futures::executor::block_on(async move {
369                            if let Err(err) = sender.send(Ok(record)).await {
370                                tracing::error!(error = %err);
371                            }
372                        });
373                    }
374
375                    Ok::<_, Error>(())
376                })
377                .await?;
378            Ok::<_, Error>(())
379        });
380
381        ReceiverStream::new(rx).boxed()
382    }
383
384    async fn event_stream(
385        &self,
386        reverse: bool,
387    ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
388        self.record_stream(reverse)
389            .await
390            .try_filter_map(|record| async {
391                let event = record.decode_event::<T>().await?;
392                Ok(Some((record, event)))
393            })
394            .boxed()
395    }
396
397    async fn diff_checked(
398        &self,
399        commit: Option<CommitHash>,
400        checkpoint: CommitProof,
401    ) -> Result<Diff<T>, Self::Error> {
402        let patch = self.diff_events(commit.as_ref()).await?;
403        Ok(Diff::<T> {
404            last_commit: commit,
405            patch,
406            checkpoint,
407        })
408    }
409
410    async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
411        let patch = self.diff_events(None).await?;
412        Ok(Diff::<T> {
413            last_commit: None,
414            patch,
415            checkpoint: self.tree().head()?,
416        })
417    }
418
419    async fn diff_events(
420        &self,
421        commit: Option<&CommitHash>,
422    ) -> Result<Patch<T>, Self::Error> {
423        let records = self.diff_records(commit).await?;
424        Ok(Patch::new(records))
425    }
426
427    fn tree(&self) -> &CommitTree {
428        &self.tree
429    }
430
431    async fn rewind(
432        &mut self,
433        commit: &CommitHash,
434    ) -> Result<Vec<EventRecord>, Self::Error> {
435        let (records, tree) = {
436            let stream = self.record_stream(true).await;
437            pin_mut!(stream);
438
439            let mut records = Vec::new();
440            let mut tree = CommitTree::new();
441            let mut new_len = 0;
442
443            while let Some(record) = stream.next().await {
444                let record = record?;
445                if record.commit() == commit {
446                    let mut leaves = self.tree().leaves().unwrap_or_default();
447                    new_len = leaves.len() - records.len();
448                    leaves.truncate(new_len);
449
450                    tree.append(&mut leaves);
451                    tree.commit();
452
453                    break;
454                }
455                records.push(record);
456            }
457
458            if new_len == 0 {
459                return Err(Error::CommitNotFound(*commit).into());
460            }
461
462            (records, tree)
463        };
464
465        let delete_ids =
466            records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
467
468        // Delete from the database
469        let log_type = self.log_type.clone();
470        self.client
471            .conn_mut(move |conn| {
472                let tx = conn.transaction()?;
473                let events = EventEntity::new(&tx);
474                for id in delete_ids {
475                    events.delete_one(log_type, &id)?;
476                }
477                tx.commit()?;
478                Ok(())
479            })
480            .await
481            .map_err(Error::from)?;
482
483        // Update merkle tree
484        self.tree = tree;
485
486        Ok(records)
487    }
488
489    async fn load_tree(&mut self) -> Result<(), Self::Error> {
490        let log_type = self.log_type.clone();
491        let id = (&self.owner).into();
492        let commits = self
493            .client
494            .conn_and_then(move |conn| {
495                let events = EventEntity::new(&conn);
496                let commits = events.load_commits(log_type, id)?;
497                Ok::<_, Error>(commits)
498            })
499            .await?;
500        let mut tree = CommitTree::new();
501        for commit in commits {
502            let record: CommitRecord = commit.try_into()?;
503            tree.insert(*record.commit_hash.as_ref());
504        }
505        tree.commit();
506        self.tree = tree;
507        Ok(())
508    }
509
510    async fn clear(&mut self) -> Result<(), Self::Error> {
511        let log_type = self.log_type.clone();
512        let id = (&self.owner).into();
513        self.client
514            .conn_mut(move |conn| {
515                let tx = conn.transaction()?;
516                let events = EventEntity::new(&tx);
517                events.delete_all_events(log_type, id)?;
518                tx.commit()?;
519                Ok(())
520            })
521            .await
522            .map_err(Error::from)?;
523        self.tree = CommitTree::new();
524        Ok(())
525    }
526
527    async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
528        let mut records = Vec::with_capacity(events.len());
529        for event in events {
530            records.push(EventRecord::encode_event(event).await?);
531        }
532        self.apply_records(records).await
533    }
534
535    async fn apply_records(
536        &mut self,
537        records: Vec<EventRecord>,
538    ) -> Result<(), Self::Error> {
539        self.insert_records(records.as_slice(), false).await
540    }
541
542    async fn patch_checked(
543        &mut self,
544        commit_proof: &CommitProof,
545        patch: &Patch<T>,
546    ) -> Result<CheckedPatch, Self::Error> {
547        let comparison = self.tree().compare(commit_proof)?;
548        match comparison {
549            Comparison::Equal => {
550                self.patch_unchecked(patch).await?;
551                let proof = self.tree().head()?;
552                Ok(CheckedPatch::Success(proof))
553            }
554            Comparison::Contains(indices) => {
555                let head = self.tree().head()?;
556                let contains = self.tree().proof(&indices)?;
557                Ok(CheckedPatch::Conflict {
558                    head,
559                    contains: Some(contains),
560                })
561            }
562            Comparison::Unknown => {
563                let head = self.tree().head()?;
564                Ok(CheckedPatch::Conflict {
565                    head,
566                    contains: None,
567                })
568            }
569        }
570    }
571
572    async fn replace_all_events(
573        &mut self,
574        diff: &Diff<T>,
575    ) -> Result<(), Self::Error> {
576        self.insert_records(diff.patch.records(), true).await?;
577
578        let computed = self.tree().head()?;
579        let verified = computed == diff.checkpoint;
580        if !verified {
581            return Err(Error::CheckpointVerification {
582                checkpoint: diff.checkpoint.root,
583                computed: computed.root,
584            }
585            .into());
586        }
587
588        Ok(())
589    }
590
591    async fn patch_unchecked(
592        &mut self,
593        patch: &Patch<T>,
594    ) -> Result<(), Self::Error> {
595        self.apply_records(patch.records().to_vec()).await
596    }
597
598    async fn diff_records(
599        &self,
600        commit: Option<&CommitHash>,
601    ) -> Result<Vec<EventRecord>, Self::Error> {
602        let mut events = Vec::new();
603
604        let stream = self.record_stream(true).await;
605        pin_mut!(stream);
606
607        while let Some(record) = stream.next().await {
608            let record = record?;
609            if let Some(commit) = commit {
610                if record.commit() == commit {
611                    return Ok(events);
612                }
613            }
614            // Iterating in reverse order as we would typically
615            // be looking for commits near the end of the event log
616            // but we want the patch events in the order they were
617            // appended so insert at the beginning to reverse the list
618            events.insert(0, record);
619        }
620
621        // If the caller wanted to patch until a particular commit
622        // but it doesn't exist we error otherwise we would return
623        // all the events
624        if let Some(commit) = commit {
625            return Err(Error::CommitNotFound(*commit).into());
626        }
627
628        Ok(events)
629    }
630
631    fn version(&self) -> u16 {
632        match &self.owner {
633            EventLogOwner::Folder(folder) => *folder.summary.version(),
634            EventLogOwner::Account(_) => VERSION1,
635        }
636    }
637}