sos_database/
event_log.rs

1//! Event log backed by a database table.
2//!
3//! Event logs can belong to an account or to a folder
4//! so we keep track of the owner of the event log for
5//! database queries.
6//!
7//! If you were to move a folder between accounts or otherwise
8//! re-owner an event log you must create a new event log so
9//! the owner reference is updated.
10use crate::{
11    entity::{
12        AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13        FolderEntity, FolderRecord,
14    },
15    Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21    pin_mut,
22    stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25    commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
26    encoding::VERSION1,
27    events::{
28        changes_feed,
29        patch::{CheckedPatch, Diff, Patch},
30        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
31        LocalChangeEvent, WriteEvent,
32    },
33    AccountId, VaultId,
34};
35
36/// Owner of an event log.
37#[derive(Clone)]
38#[doc(hidden)]
39pub enum EventLogOwner {
40    /// Event log owned by an account.
41    Account(AccountId, i64),
42    /// Event log owned by a folder.
43    Folder(AccountId, FolderRecord),
44}
45
46impl EventLogOwner {
47    /// Account idenifier.
48    pub fn account_id(&self) -> &AccountId {
49        match self {
50            EventLogOwner::Account(account_id, _) => account_id,
51            EventLogOwner::Folder(account_id, _) => account_id,
52        }
53    }
54}
55
56impl From<&EventLogOwner> for i64 {
57    fn from(value: &EventLogOwner) -> Self {
58        match value {
59            EventLogOwner::Account(_, id) => *id,
60            EventLogOwner::Folder(_, folder) => folder.row_id,
61        }
62    }
63}
64
65#[cfg(feature = "files")]
66use sos_core::events::FileEvent;
67use tokio_stream::wrappers::ReceiverStream;
68
69/// Event log for changes to an account.
70pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
71
72/// Event log for devices.
73pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
74
75/// Event log for changes to a folder.
76pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
77
78/// Event log for changes to external files.
79#[cfg(feature = "files")]
80pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
81
82/// Database event log.
83pub struct DatabaseEventLog<T, E>
84where
85    T: Default + Encodable + Decodable + Send + Sync,
86    E: std::error::Error
87        + std::fmt::Debug
88        + From<sos_core::Error>
89        + From<crate::Error>
90        + From<std::io::Error>
91        + Send
92        + Sync
93        + 'static,
94{
95    owner: EventLogOwner,
96    client: Client,
97    log_type: EventLogType,
98    tree: CommitTree,
99    marker: std::marker::PhantomData<(T, E)>,
100}
101
102impl<T, E> DatabaseEventLog<T, E>
103where
104    T: Default + Encodable + Decodable + Send + Sync,
105    E: std::error::Error
106        + std::fmt::Debug
107        + From<sos_core::Error>
108        + From<crate::Error>
109        + From<std::io::Error>
110        + Send
111        + Sync
112        + 'static,
113{
114    /// Create a copy of this event log using a fresh
115    /// commit tree and a different client.
116    ///
117    /// Typically used to create a clone using
118    /// a temporary in-memory database.
119    pub fn with_new_client(
120        &self,
121        client: Client,
122        owner: Option<EventLogOwner>,
123    ) -> Self {
124        Self {
125            owner: owner.unwrap_or_else(|| self.owner.clone()),
126            client,
127            log_type: self.log_type,
128            tree: CommitTree::new(),
129            marker: std::marker::PhantomData,
130        }
131    }
132
133    /// Lookup an owner for the event log.
134    async fn lookup_owner(
135        client: &Client,
136        account_id: &AccountId,
137        log_type: &EventLogType,
138    ) -> Result<EventLogOwner, Error> {
139        let account_id = *account_id;
140        let log_type = *log_type;
141        let result = client
142            .conn_and_then(move |conn| {
143                let account = AccountEntity::new(&conn);
144                let account_row = account.find_one(&account_id)?;
145                match log_type {
146                    EventLogType::Folder(folder_id) => {
147                        let folder = FolderEntity::new(&conn);
148                        let folder_row = folder.find_one(&folder_id)?;
149                        Ok::<_, Error>((account_row, Some(folder_row)))
150                    }
151                    _ => Ok::<_, Error>((account_row, None)),
152                }
153            })
154            .await?;
155
156        Ok(match result {
157            (account_row, None) => {
158                EventLogOwner::Account(account_id, account_row.row_id)
159            }
160            (_, Some(folder_row)) => EventLogOwner::Folder(
161                account_id,
162                FolderRecord::from_row(folder_row).await?,
163            ),
164        })
165    }
166
167    async fn insert_records(
168        &mut self,
169        records: &[EventRecord],
170        delete_before: bool,
171    ) -> Result<(), E> {
172        if records.is_empty() {
173            return Ok(());
174        }
175
176        let mut span = CommitSpan {
177            before: self.tree.last_commit(),
178            after: None,
179        };
180
181        let log_type = self.log_type;
182        let mut insert_rows = Vec::new();
183        let mut commits = Vec::new();
184        for record in records {
185            commits.push(*record.commit());
186            insert_rows.push(EventRecordRow::new(record)?);
187        }
188
189        let id = (&self.owner).into();
190
191        // Insert into the database.
192        self.client
193            .conn_mut(move |conn| {
194                let tx = conn.transaction()?;
195                let events = EventEntity::new(&tx);
196                if delete_before {
197                    events.delete_all_events(log_type, id)?;
198                }
199                let ids = events.insert_events(
200                    log_type,
201                    id,
202                    insert_rows.as_slice(),
203                )?;
204                tx.commit()?;
205                Ok(ids)
206            })
207            .await
208            .map_err(Error::from)?;
209
210        if delete_before {
211            self.tree = CommitTree::new();
212        }
213
214        // Update the in-memory merkle tree
215        let mut hashes =
216            commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
217        self.tree.append(&mut hashes);
218        self.tree.commit();
219
220        span.after = self.tree.last_commit();
221
222        changes_feed().send_replace(LocalChangeEvent::AccountModified {
223            account_id: *self.owner.account_id(),
224            log_type: self.log_type,
225            commit_span: span,
226        });
227
228        Ok(())
229    }
230}
231
232impl<E> DatabaseEventLog<AccountEvent, E>
233where
234    E: std::error::Error
235        + std::fmt::Debug
236        + From<sos_core::Error>
237        + From<crate::Error>
238        + From<std::io::Error>
239        + Send
240        + Sync
241        + 'static,
242{
243    /// Create a new account event log.
244    pub async fn new_account(
245        client: Client,
246        account_id: AccountId,
247    ) -> Result<Self, E> {
248        let log_type = EventLogType::Account;
249        let owner =
250            Self::lookup_owner(&client, &account_id, &log_type).await?;
251        Ok(Self {
252            owner,
253            client,
254            log_type,
255            tree: CommitTree::new(),
256            marker: std::marker::PhantomData,
257        })
258    }
259}
260
261impl<E> DatabaseEventLog<WriteEvent, E>
262where
263    E: std::error::Error
264        + std::fmt::Debug
265        + From<sos_core::Error>
266        + From<crate::Error>
267        + From<std::io::Error>
268        + Send
269        + Sync
270        + 'static,
271{
272    /// Create a new folder event log.
273    pub async fn new_folder(
274        client: Client,
275        account_id: AccountId,
276        folder_id: VaultId,
277    ) -> Result<Self, E> {
278        let log_type = EventLogType::Folder(folder_id);
279        let owner =
280            Self::lookup_owner(&client, &account_id, &log_type).await?;
281
282        Ok(Self {
283            owner,
284            client,
285            log_type,
286            tree: CommitTree::new(),
287            marker: std::marker::PhantomData,
288        })
289    }
290}
291
292impl<E> DatabaseEventLog<DeviceEvent, E>
293where
294    E: std::error::Error
295        + std::fmt::Debug
296        + From<sos_core::Error>
297        + From<crate::Error>
298        + From<std::io::Error>
299        + Send
300        + Sync
301        + 'static,
302{
303    /// Create a new device event log.
304    pub async fn new_device(
305        client: Client,
306        account_id: AccountId,
307    ) -> Result<Self, E> {
308        let log_type = EventLogType::Device;
309        let owner =
310            Self::lookup_owner(&client, &account_id, &log_type).await?;
311        Ok(Self {
312            owner,
313            client,
314            log_type,
315            tree: CommitTree::new(),
316            marker: std::marker::PhantomData,
317        })
318    }
319}
320
321#[cfg(feature = "files")]
322impl<E> DatabaseEventLog<FileEvent, E>
323where
324    E: std::error::Error
325        + std::fmt::Debug
326        + From<sos_core::Error>
327        + From<crate::Error>
328        + From<std::io::Error>
329        + Send
330        + Sync
331        + 'static,
332{
333    /// Create a new file event log.
334    pub async fn new_file(
335        client: Client,
336        account_id: AccountId,
337    ) -> Result<Self, Error> {
338        let log_type = EventLogType::Files;
339        let owner =
340            Self::lookup_owner(&client, &account_id, &log_type).await?;
341        Ok(Self {
342            owner,
343            client,
344            log_type,
345            tree: CommitTree::new(),
346            marker: std::marker::PhantomData,
347        })
348    }
349}
350
351#[async_trait]
352impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
353where
354    T: Default + Encodable + Decodable + Send + Sync + 'static,
355    E: std::error::Error
356        + std::fmt::Debug
357        + From<sos_core::Error>
358        + From<crate::Error>
359        + From<std::io::Error>
360        + Send
361        + Sync
362        + 'static,
363{
364    type Error = E;
365
366    async fn record_stream(
367        &self,
368        reverse: bool,
369    ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
370        let (tx, rx) = tokio::sync::mpsc::channel(8);
371
372        let id: i64 = (&self.owner).into();
373        let log_type = self.log_type;
374        let client = self.client.clone();
375
376        tokio::spawn(async move {
377            client
378                .conn_and_then(move |conn| {
379                    let query =
380                        EventEntity::find_all_query(log_type, reverse);
381
382                    let mut stmt = conn.prepare_cached(&query.as_string())?;
383
384                    fn convert_row(
385                        row: &Row<'_>,
386                    ) -> Result<EventRecordRow, crate::Error>
387                    {
388                        Ok(row.try_into()?)
389                    }
390
391                    let rows = stmt.query_and_then([id], |row| {
392                        convert_row(row)
393                    })?;
394
395                    for row in rows {
396                        if tx.is_closed() {
397                            break;
398                        }
399                        let row = row?;
400                        let record: EventRecord = row.try_into()?;
401                        let inner_tx = tx.clone();
402                        let res = futures::executor::block_on(async move {
403                            inner_tx.send(Ok(record)).await
404                        });
405                        if let Err(e) = res {
406                            tracing::error!(error = %e);
407                            break;
408                        }
409                    }
410
411                    Ok::<_, Error>(())
412                })
413                .await?;
414            Ok::<_, Error>(())
415        });
416
417        ReceiverStream::new(rx).boxed()
418    }
419
420    async fn event_stream(
421        &self,
422        reverse: bool,
423    ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
424        self.record_stream(reverse)
425            .await
426            .try_filter_map(|record| async {
427                let event = record.decode_event::<T>().await?;
428                Ok(Some((record, event)))
429            })
430            .boxed()
431    }
432
433    async fn diff_checked(
434        &self,
435        commit: Option<CommitHash>,
436        checkpoint: CommitProof,
437    ) -> Result<Diff<T>, Self::Error> {
438        let patch = self.diff_events(commit.as_ref()).await?;
439        Ok(Diff::<T> {
440            last_commit: commit,
441            patch,
442            checkpoint,
443        })
444    }
445
446    async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
447        let patch = self.diff_events(None).await?;
448        Ok(Diff::<T> {
449            last_commit: None,
450            patch,
451            checkpoint: self.tree().head()?,
452        })
453    }
454
455    async fn diff_events(
456        &self,
457        commit: Option<&CommitHash>,
458    ) -> Result<Patch<T>, Self::Error> {
459        let records = self.diff_records(commit).await?;
460        Ok(Patch::new(records))
461    }
462
463    fn tree(&self) -> &CommitTree {
464        &self.tree
465    }
466
467    async fn rewind(
468        &mut self,
469        commit: &CommitHash,
470    ) -> Result<Vec<EventRecord>, Self::Error> {
471        let (records, tree) = {
472            let stream = self.record_stream(true).await;
473            pin_mut!(stream);
474
475            let mut records = Vec::new();
476            let mut tree = CommitTree::new();
477            let mut new_len = 0;
478
479            while let Some(record) = stream.next().await {
480                let record = record?;
481                if record.commit() == commit {
482                    let mut leaves = self.tree().leaves().unwrap_or_default();
483                    new_len = leaves.len() - records.len();
484                    leaves.truncate(new_len);
485
486                    tree.append(&mut leaves);
487                    tree.commit();
488
489                    break;
490                }
491                records.push(record);
492            }
493
494            if new_len == 0 {
495                return Err(Error::CommitNotFound(*commit).into());
496            }
497
498            (records, tree)
499        };
500
501        let delete_ids =
502            records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
503
504        // Delete from the database
505        let log_type = self.log_type;
506        self.client
507            .conn_mut(move |conn| {
508                let tx = conn.transaction()?;
509                let events = EventEntity::new(&tx);
510                for id in delete_ids {
511                    events.delete_one(log_type, &id)?;
512                }
513                tx.commit()?;
514                Ok(())
515            })
516            .await
517            .map_err(Error::from)?;
518
519        // Update merkle tree
520        self.tree = tree;
521
522        Ok(records)
523    }
524
525    async fn load_tree(&mut self) -> Result<(), Self::Error> {
526        let log_type = self.log_type;
527        let id = (&self.owner).into();
528        let commits = self
529            .client
530            .conn_and_then(move |conn| {
531                let events = EventEntity::new(&conn);
532                let commits = events.load_commits(log_type, id)?;
533                Ok::<_, Error>(commits)
534            })
535            .await?;
536        let mut tree = CommitTree::new();
537        for commit in commits {
538            let record: CommitRecord = commit.try_into()?;
539            tree.insert(*record.commit_hash.as_ref());
540        }
541        tree.commit();
542        self.tree = tree;
543        Ok(())
544    }
545
546    async fn clear(&mut self) -> Result<(), Self::Error> {
547        let log_type = self.log_type;
548        let id = (&self.owner).into();
549        self.client
550            .conn_mut(move |conn| {
551                let tx = conn.transaction()?;
552                let events = EventEntity::new(&tx);
553                events.delete_all_events(log_type, id)?;
554                tx.commit()?;
555                Ok(())
556            })
557            .await
558            .map_err(Error::from)?;
559        self.tree = CommitTree::new();
560        Ok(())
561    }
562
563    async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
564        let mut records = Vec::with_capacity(events.len());
565        for event in events {
566            records.push(EventRecord::encode_event(event).await?);
567        }
568        self.apply_records(records).await
569    }
570
571    async fn apply_records(
572        &mut self,
573        records: Vec<EventRecord>,
574    ) -> Result<(), Self::Error> {
575        self.insert_records(records.as_slice(), false).await
576    }
577
578    async fn patch_checked(
579        &mut self,
580        commit_proof: &CommitProof,
581        patch: &Patch<T>,
582    ) -> Result<CheckedPatch, Self::Error> {
583        let comparison = self.tree().compare(commit_proof)?;
584        match comparison {
585            Comparison::Equal => {
586                self.patch_unchecked(patch).await?;
587                let proof = self.tree().head()?;
588                Ok(CheckedPatch::Success(proof))
589            }
590            Comparison::Contains(indices) => {
591                let head = self.tree().head()?;
592                let contains = self.tree().proof(&indices)?;
593                Ok(CheckedPatch::Conflict {
594                    head,
595                    contains: Some(contains),
596                })
597            }
598            Comparison::Unknown => {
599                let head = self.tree().head()?;
600                Ok(CheckedPatch::Conflict {
601                    head,
602                    contains: None,
603                })
604            }
605        }
606    }
607
608    async fn replace_all_events(
609        &mut self,
610        diff: &Diff<T>,
611    ) -> Result<(), Self::Error> {
612        self.insert_records(diff.patch.records(), true).await?;
613
614        let computed = self.tree().head()?;
615        let verified = computed == diff.checkpoint;
616        if !verified {
617            return Err(Error::CheckpointVerification {
618                checkpoint: diff.checkpoint.root,
619                computed: computed.root,
620            }
621            .into());
622        }
623
624        Ok(())
625    }
626
627    async fn patch_unchecked(
628        &mut self,
629        patch: &Patch<T>,
630    ) -> Result<(), Self::Error> {
631        self.apply_records(patch.records().to_vec()).await
632    }
633
634    async fn diff_records(
635        &self,
636        commit: Option<&CommitHash>,
637    ) -> Result<Vec<EventRecord>, Self::Error> {
638        let mut events = Vec::new();
639
640        let stream = self.record_stream(true).await;
641        pin_mut!(stream);
642
643        while let Some(record) = stream.next().await {
644            let record = record?;
645            if let Some(commit) = commit {
646                if record.commit() == commit {
647                    return Ok(events);
648                }
649            }
650            // Iterating in reverse order as we would typically
651            // be looking for commits near the end of the event log
652            // but we want the patch events in the order they were
653            // appended so insert at the beginning to reverse the list
654            events.insert(0, record);
655        }
656
657        // If the caller wanted to patch until a particular commit
658        // but it doesn't exist we error otherwise we would return
659        // all the events
660        if let Some(commit) = commit {
661            return Err(Error::CommitNotFound(*commit).into());
662        }
663
664        Ok(events)
665    }
666
667    fn version(&self) -> u16 {
668        match &self.owner {
669            EventLogOwner::Folder(_, folder) => *folder.summary.version(),
670            EventLogOwner::Account(_, _) => VERSION1,
671        }
672    }
673}