sos_database/
event_log.rs

1//! Event log backed by a database table.
2//!
3//! Event logs can belong to an account or to a folder
4//! so we keep track of the owner of the event log for
5//! database queries.
6//!
7//! If you were to move a folder between accounts or otherwise
8//! re-owner an event log you must create a new event log so
9//! the owner reference is updated.
10use crate::{
11    entity::{
12        AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13        FolderEntity, FolderRecord,
14    },
15    Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21    pin_mut,
22    stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25    commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
26    encoding::VERSION1,
27    events::{
28        changes_feed,
29        patch::{CheckedPatch, Diff, Patch},
30        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
31        LocalChangeEvent, WriteEvent,
32    },
33    AccountId, VaultId,
34};
35
36/// Owner of an event log.
37#[derive(Clone)]
38#[doc(hidden)]
39pub enum EventLogOwner {
40    /// Event log owned by an account.
41    Account(AccountId, i64),
42    /// Event log owned by a folder.
43    Folder(AccountId, FolderRecord),
44}
45
46impl EventLogOwner {
47    /// Account idenifier.
48    pub fn account_id(&self) -> &AccountId {
49        match self {
50            EventLogOwner::Account(account_id, _) => account_id,
51            EventLogOwner::Folder(account_id, _) => account_id,
52        }
53    }
54}
55
56impl From<&EventLogOwner> for i64 {
57    fn from(value: &EventLogOwner) -> Self {
58        match value {
59            EventLogOwner::Account(_, id) => *id,
60            EventLogOwner::Folder(_, folder) => folder.row_id,
61        }
62    }
63}
64
65#[cfg(feature = "files")]
66use sos_core::events::FileEvent;
67use tokio_stream::wrappers::ReceiverStream;
68
69/// Event log for changes to an account.
70pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
71
72/// Event log for devices.
73pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
74
75/// Event log for changes to a folder.
76pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
77
78/// Event log for changes to external files.
79#[cfg(feature = "files")]
80pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
81
82/// Database event log.
83pub struct DatabaseEventLog<T, E>
84where
85    T: Default + Encodable + Decodable + Send + Sync,
86    E: std::error::Error
87        + std::fmt::Debug
88        + From<sos_core::Error>
89        + From<crate::Error>
90        + From<std::io::Error>
91        + Send
92        + Sync
93        + 'static,
94{
95    owner: EventLogOwner,
96    client: Client,
97    log_type: EventLogType,
98    tree: CommitTree,
99    marker: std::marker::PhantomData<(T, E)>,
100}
101
102impl<T, E> DatabaseEventLog<T, E>
103where
104    T: Default + Encodable + Decodable + Send + Sync,
105    E: std::error::Error
106        + std::fmt::Debug
107        + From<sos_core::Error>
108        + From<crate::Error>
109        + From<std::io::Error>
110        + Send
111        + Sync
112        + 'static,
113{
114    /// Create a copy of this event log using a fresh
115    /// commit tree and a different client.
116    ///
117    /// Typically used to create a clone using
118    /// a temporary in-memory database.
119    pub fn with_new_client(
120        &self,
121        client: Client,
122        owner: Option<EventLogOwner>,
123    ) -> Self {
124        Self {
125            owner: owner.unwrap_or_else(|| self.owner.clone()),
126            client,
127            log_type: self.log_type,
128            tree: CommitTree::new(),
129            marker: std::marker::PhantomData,
130        }
131    }
132
133    /// Lookup an owner for the event log.
134    async fn lookup_owner(
135        client: &Client,
136        account_id: &AccountId,
137        log_type: &EventLogType,
138    ) -> Result<EventLogOwner, Error> {
139        let account_id = *account_id;
140        let log_type = *log_type;
141        let result = client
142            .conn_and_then(move |conn| {
143                let account = AccountEntity::new(&conn);
144                let account_row = account.find_one(&account_id)?;
145                match log_type {
146                    EventLogType::Folder(folder_id) => {
147                        let folder = FolderEntity::new(&conn);
148                        let folder_row = folder.find_one(&folder_id)?;
149                        Ok::<_, Error>((account_row, Some(folder_row)))
150                    }
151                    _ => Ok::<_, Error>((account_row, None)),
152                }
153            })
154            .await?;
155
156        Ok(match result {
157            (account_row, None) => {
158                EventLogOwner::Account(account_id, account_row.row_id)
159            }
160            (_, Some(folder_row)) => EventLogOwner::Folder(
161                account_id,
162                FolderRecord::from_row(folder_row).await?,
163            ),
164        })
165    }
166
167    async fn insert_records(
168        &mut self,
169        records: &[EventRecord],
170        delete_before: bool,
171    ) -> Result<(), E> {
172        if records.is_empty() {
173            return Ok(());
174        }
175
176        let mut span = CommitSpan {
177            before: self.tree.last_commit(),
178            after: None,
179        };
180
181        let log_type = self.log_type;
182        let mut insert_rows = Vec::new();
183        let mut commits = Vec::new();
184        for record in records {
185            commits.push(*record.commit());
186            insert_rows.push(EventRecordRow::new(record)?);
187        }
188
189        let id = (&self.owner).into();
190
191        // Insert into the database.
192        self.client
193            .conn_mut(move |conn| {
194                let tx = conn.transaction()?;
195                let events = EventEntity::new(&tx);
196                if delete_before {
197                    events.delete_all_events(log_type, id)?;
198                }
199                let ids = events.insert_events(
200                    log_type,
201                    id,
202                    insert_rows.as_slice(),
203                )?;
204                tx.commit()?;
205                Ok(ids)
206            })
207            .await
208            .map_err(Error::from)?;
209
210        if delete_before {
211            self.tree = CommitTree::new();
212        }
213
214        // Update the in-memory merkle tree
215        let mut hashes =
216            commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
217        self.tree.append(&mut hashes);
218        self.tree.commit();
219
220        span.after = self.tree.last_commit();
221
222        changes_feed().send_replace(LocalChangeEvent::AccountModified {
223            account_id: *self.owner.account_id(),
224            log_type: self.log_type,
225            commit_span: span,
226        });
227
228        Ok(())
229    }
230}
231
232impl<E> DatabaseEventLog<AccountEvent, E>
233where
234    E: std::error::Error
235        + std::fmt::Debug
236        + From<sos_core::Error>
237        + From<crate::Error>
238        + From<std::io::Error>
239        + Send
240        + Sync
241        + 'static,
242{
243    /// Create a new account event log.
244    pub async fn new_account(
245        client: Client,
246        account_id: AccountId,
247    ) -> Result<Self, E> {
248        let log_type = EventLogType::Account;
249        let owner =
250            Self::lookup_owner(&client, &account_id, &log_type).await?;
251        Ok(Self {
252            owner,
253            client,
254            log_type,
255            tree: CommitTree::new(),
256            marker: std::marker::PhantomData,
257        })
258    }
259}
260
261impl<E> DatabaseEventLog<WriteEvent, E>
262where
263    E: std::error::Error
264        + std::fmt::Debug
265        + From<sos_core::Error>
266        + From<crate::Error>
267        + From<std::io::Error>
268        + Send
269        + Sync
270        + 'static,
271{
272    /// Create a new folder event log.
273    pub async fn new_folder(
274        client: Client,
275        account_id: AccountId,
276        folder_id: VaultId,
277    ) -> Result<Self, E> {
278        let log_type = EventLogType::Folder(folder_id);
279        let owner =
280            Self::lookup_owner(&client, &account_id, &log_type).await?;
281
282        Ok(Self {
283            owner,
284            client,
285            log_type,
286            tree: CommitTree::new(),
287            marker: std::marker::PhantomData,
288        })
289    }
290}
291
292impl<E> DatabaseEventLog<DeviceEvent, E>
293where
294    E: std::error::Error
295        + std::fmt::Debug
296        + From<sos_core::Error>
297        + From<crate::Error>
298        + From<std::io::Error>
299        + Send
300        + Sync
301        + 'static,
302{
303    /// Create a new device event log.
304    pub async fn new_device(
305        client: Client,
306        account_id: AccountId,
307    ) -> Result<Self, E> {
308        let log_type = EventLogType::Device;
309        let owner =
310            Self::lookup_owner(&client, &account_id, &log_type).await?;
311        Ok(Self {
312            owner,
313            client,
314            log_type,
315            tree: CommitTree::new(),
316            marker: std::marker::PhantomData,
317        })
318    }
319}
320
321#[cfg(feature = "files")]
322impl<E> DatabaseEventLog<FileEvent, E>
323where
324    E: std::error::Error
325        + std::fmt::Debug
326        + From<sos_core::Error>
327        + From<crate::Error>
328        + From<std::io::Error>
329        + Send
330        + Sync
331        + 'static,
332{
333    /// Create a new file event log.
334    pub async fn new_file(
335        client: Client,
336        account_id: AccountId,
337    ) -> Result<Self, Error> {
338        let log_type = EventLogType::Files;
339        let owner =
340            Self::lookup_owner(&client, &account_id, &log_type).await?;
341        Ok(Self {
342            owner,
343            client,
344            log_type,
345            tree: CommitTree::new(),
346            marker: std::marker::PhantomData,
347        })
348    }
349}
350
351#[async_trait]
352impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
353where
354    T: Default + Encodable + Decodable + Send + Sync + 'static,
355    E: std::error::Error
356        + std::fmt::Debug
357        + From<sos_core::Error>
358        + From<crate::Error>
359        + From<std::io::Error>
360        + Send
361        + Sync
362        + 'static,
363{
364    type Error = E;
365
366    async fn record_stream(
367        &self,
368        reverse: bool,
369    ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
370        let (tx, rx) = tokio::sync::mpsc::channel(8);
371
372        let id: i64 = (&self.owner).into();
373        let log_type = self.log_type;
374        let client = self.client.clone();
375
376        tokio::spawn(async move {
377            client
378                .conn_and_then(move |conn| {
379                    let query =
380                        EventEntity::find_all_query(log_type, reverse);
381
382                    let mut stmt = conn.prepare_cached(&query.as_string())?;
383
384                    fn convert_row(
385                        row: &Row<'_>,
386                    ) -> Result<EventRecordRow, crate::Error>
387                    {
388                        Ok(row.try_into()?)
389                    }
390
391                    let rows = stmt.query_and_then([id], convert_row)?;
392
393                    for row in rows {
394                        if tx.is_closed() {
395                            break;
396                        }
397                        let row = row?;
398                        let record: EventRecord = row.try_into()?;
399                        let inner_tx = tx.clone();
400                        let res = futures::executor::block_on(async move {
401                            inner_tx.send(Ok(record)).await
402                        });
403                        if let Err(e) = res {
404                            tracing::error!(error = %e);
405                            break;
406                        }
407                    }
408
409                    Ok::<_, Error>(())
410                })
411                .await?;
412            Ok::<_, Error>(())
413        });
414
415        ReceiverStream::new(rx).boxed()
416    }
417
418    async fn event_stream(
419        &self,
420        reverse: bool,
421    ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
422        self.record_stream(reverse)
423            .await
424            .try_filter_map(|record| async {
425                let event = record.decode_event::<T>().await?;
426                Ok(Some((record, event)))
427            })
428            .boxed()
429    }
430
431    async fn diff_checked(
432        &self,
433        commit: Option<CommitHash>,
434        checkpoint: CommitProof,
435    ) -> Result<Diff<T>, Self::Error> {
436        let patch = self.diff_events(commit.as_ref()).await?;
437        Ok(Diff::<T> {
438            last_commit: commit,
439            patch,
440            checkpoint,
441        })
442    }
443
444    async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
445        let patch = self.diff_events(None).await?;
446        Ok(Diff::<T> {
447            last_commit: None,
448            patch,
449            checkpoint: self.tree().head()?,
450        })
451    }
452
453    async fn diff_events(
454        &self,
455        commit: Option<&CommitHash>,
456    ) -> Result<Patch<T>, Self::Error> {
457        let records = self.diff_records(commit).await?;
458        Ok(Patch::new(records))
459    }
460
461    fn tree(&self) -> &CommitTree {
462        &self.tree
463    }
464
465    async fn rewind(
466        &mut self,
467        commit: &CommitHash,
468    ) -> Result<Vec<EventRecord>, Self::Error> {
469        let (records, tree) = {
470            let stream = self.record_stream(true).await;
471            pin_mut!(stream);
472
473            let mut records = Vec::new();
474            let mut tree = CommitTree::new();
475            let mut new_len = 0;
476
477            while let Some(record) = stream.next().await {
478                let record = record?;
479                if record.commit() == commit {
480                    let mut leaves = self.tree().leaves().unwrap_or_default();
481                    new_len = leaves.len() - records.len();
482                    leaves.truncate(new_len);
483
484                    tree.append(&mut leaves);
485                    tree.commit();
486
487                    break;
488                }
489                records.push(record);
490            }
491
492            if new_len == 0 {
493                return Err(Error::CommitNotFound(*commit).into());
494            }
495
496            (records, tree)
497        };
498
499        let delete_ids =
500            records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
501
502        // Delete from the database
503        let log_type = self.log_type;
504        self.client
505            .conn_mut(move |conn| {
506                let tx = conn.transaction()?;
507                let events = EventEntity::new(&tx);
508                for id in delete_ids {
509                    events.delete_one(log_type, &id)?;
510                }
511                tx.commit()?;
512                Ok(())
513            })
514            .await
515            .map_err(Error::from)?;
516
517        // Update merkle tree
518        self.tree = tree;
519
520        Ok(records)
521    }
522
523    async fn load_tree(&mut self) -> Result<(), Self::Error> {
524        let log_type = self.log_type;
525        let id = (&self.owner).into();
526        let commits = self
527            .client
528            .conn_and_then(move |conn| {
529                let events = EventEntity::new(&conn);
530                let commits = events.load_commits(log_type, id)?;
531                Ok::<_, Error>(commits)
532            })
533            .await?;
534        let mut tree = CommitTree::new();
535        for commit in commits {
536            let record: CommitRecord = commit.try_into()?;
537            tree.insert(*record.commit_hash.as_ref());
538        }
539        tree.commit();
540        self.tree = tree;
541        Ok(())
542    }
543
544    async fn clear(&mut self) -> Result<(), Self::Error> {
545        let log_type = self.log_type;
546        let id = (&self.owner).into();
547        self.client
548            .conn_mut(move |conn| {
549                let tx = conn.transaction()?;
550                let events = EventEntity::new(&tx);
551                events.delete_all_events(log_type, id)?;
552                tx.commit()?;
553                Ok(())
554            })
555            .await
556            .map_err(Error::from)?;
557        self.tree = CommitTree::new();
558        Ok(())
559    }
560
561    async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
562        let mut records = Vec::with_capacity(events.len());
563        for event in events {
564            records.push(EventRecord::encode_event(event).await?);
565        }
566        self.apply_records(records).await
567    }
568
569    async fn apply_records(
570        &mut self,
571        records: Vec<EventRecord>,
572    ) -> Result<(), Self::Error> {
573        self.insert_records(records.as_slice(), false).await
574    }
575
576    async fn patch_checked(
577        &mut self,
578        commit_proof: &CommitProof,
579        patch: &Patch<T>,
580    ) -> Result<CheckedPatch, Self::Error> {
581        let comparison = self.tree().compare(commit_proof)?;
582        match comparison {
583            Comparison::Equal => {
584                self.patch_unchecked(patch).await?;
585                let proof = self.tree().head()?;
586                Ok(CheckedPatch::Success(proof))
587            }
588            Comparison::Contains(indices) => {
589                let head = self.tree().head()?;
590                let contains = self.tree().proof(&indices)?;
591                Ok(CheckedPatch::Conflict {
592                    head,
593                    contains: Some(contains),
594                })
595            }
596            Comparison::Unknown => {
597                let head = self.tree().head()?;
598                Ok(CheckedPatch::Conflict {
599                    head,
600                    contains: None,
601                })
602            }
603        }
604    }
605
606    async fn replace_all_events(
607        &mut self,
608        diff: &Diff<T>,
609    ) -> Result<(), Self::Error> {
610        self.insert_records(diff.patch.records(), true).await?;
611
612        let computed = self.tree().head()?;
613        let verified = computed == diff.checkpoint;
614        if !verified {
615            return Err(Error::CheckpointVerification {
616                checkpoint: diff.checkpoint.root,
617                computed: computed.root,
618            }
619            .into());
620        }
621
622        Ok(())
623    }
624
625    async fn patch_unchecked(
626        &mut self,
627        patch: &Patch<T>,
628    ) -> Result<(), Self::Error> {
629        self.apply_records(patch.records().to_vec()).await
630    }
631
632    async fn diff_records(
633        &self,
634        commit: Option<&CommitHash>,
635    ) -> Result<Vec<EventRecord>, Self::Error> {
636        let mut events = Vec::new();
637
638        let stream = self.record_stream(true).await;
639        pin_mut!(stream);
640
641        while let Some(record) = stream.next().await {
642            let record = record?;
643            if let Some(commit) = commit {
644                if record.commit() == commit {
645                    return Ok(events);
646                }
647            }
648            // Iterating in reverse order as we would typically
649            // be looking for commits near the end of the event log
650            // but we want the patch events in the order they were
651            // appended so insert at the beginning to reverse the list
652            events.insert(0, record);
653        }
654
655        // If the caller wanted to patch until a particular commit
656        // but it doesn't exist we error otherwise we would return
657        // all the events
658        if let Some(commit) = commit {
659            return Err(Error::CommitNotFound(*commit).into());
660        }
661
662        Ok(events)
663    }
664
665    fn version(&self) -> u16 {
666        match &self.owner {
667            EventLogOwner::Folder(_, folder) => *folder.summary.version(),
668            EventLogOwner::Account(_, _) => VERSION1,
669        }
670    }
671}