sos_database/
event_log.rs

1//! Event log backed by a database table.
2//!
3//! Event logs can belong to an account or to a folder
4//! so we keep track of the owner of the event log for
5//! database queries.
6//!
7//! If you were to move a folder between accounts or otherwise
8//! re-owner an event log you must create a new event log so
9//! the owner reference is updated.
10use crate::{
11    entity::{
12        AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13        FolderEntity, FolderRecord,
14    },
15    Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21    pin_mut,
22    stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25    commit::{CommitHash, CommitProof, CommitTree, Comparison},
26    encoding::VERSION1,
27    events::{
28        patch::{CheckedPatch, Diff, Patch},
29        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
30        WriteEvent,
31    },
32    AccountId, VaultId,
33};
34
35/// Owner of an event log.
36#[derive(Clone)]
37#[doc(hidden)]
38pub enum EventLogOwner {
39    /// Event log owned by an account.
40    Account(i64),
41    /// Event log owned by a folder.
42    Folder(FolderRecord),
43}
44
45impl From<&EventLogOwner> for i64 {
46    fn from(value: &EventLogOwner) -> Self {
47        match value {
48            EventLogOwner::Account(id) => *id,
49            EventLogOwner::Folder(folder) => folder.row_id,
50        }
51    }
52}
53
54#[cfg(feature = "files")]
55use sos_core::events::FileEvent;
56use tokio_stream::wrappers::ReceiverStream;
57
58/// Event log for changes to an account.
59pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
60
61/// Event log for devices.
62pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
63
64/// Event log for changes to a folder.
65pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
66
67/// Event log for changes to external files.
68#[cfg(feature = "files")]
69pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
70
71/// Database event log.
72pub struct DatabaseEventLog<T, E>
73where
74    T: Default + Encodable + Decodable + Send + Sync,
75    E: std::error::Error
76        + std::fmt::Debug
77        + From<sos_core::Error>
78        + From<crate::Error>
79        + From<std::io::Error>
80        + Send
81        + Sync
82        + 'static,
83{
84    owner: EventLogOwner,
85    client: Client,
86    log_type: EventLogType,
87    tree: CommitTree,
88    marker: std::marker::PhantomData<(T, E)>,
89}
90
91impl<T, E> DatabaseEventLog<T, E>
92where
93    T: Default + Encodable + Decodable + Send + Sync,
94    E: std::error::Error
95        + std::fmt::Debug
96        + From<sos_core::Error>
97        + From<crate::Error>
98        + From<std::io::Error>
99        + Send
100        + Sync
101        + 'static,
102{
103    /// Create a copy of this event log using a fresh
104    /// commit tree and a different client.
105    ///
106    /// Typically used to create a clone using
107    /// a temporary in-memory database.
108    pub fn with_new_client(
109        &self,
110        client: Client,
111        owner: Option<EventLogOwner>,
112    ) -> Self {
113        Self {
114            owner: owner.unwrap_or_else(|| self.owner.clone()),
115            client,
116            log_type: self.log_type,
117            tree: CommitTree::new(),
118            marker: std::marker::PhantomData,
119        }
120    }
121
122    /// Lookup an owner for the event log.
123    async fn lookup_owner(
124        client: &Client,
125        account_id: &AccountId,
126        log_type: &EventLogType,
127    ) -> Result<EventLogOwner, Error> {
128        let account_id = *account_id;
129        let log_type = *log_type;
130        let result = client
131            .conn_and_then(move |conn| {
132                let account = AccountEntity::new(&conn);
133                let account_row = account.find_one(&account_id)?;
134                match log_type {
135                    EventLogType::Folder(folder_id) => {
136                        let folder = FolderEntity::new(&conn);
137                        let folder_row = folder.find_one(&folder_id)?;
138                        Ok::<_, Error>((account_row, Some(folder_row)))
139                    }
140                    _ => Ok::<_, Error>((account_row, None)),
141                }
142            })
143            .await?;
144
145        Ok(match result {
146            (account_row, None) => EventLogOwner::Account(account_row.row_id),
147            (_, Some(folder_row)) => EventLogOwner::Folder(
148                FolderRecord::from_row(folder_row).await?,
149            ),
150        })
151    }
152
153    async fn insert_records(
154        &mut self,
155        records: &[EventRecord],
156        delete_before: bool,
157    ) -> Result<(), E> {
158        let log_type = self.log_type.clone();
159        let mut insert_rows = Vec::new();
160        let mut commits = Vec::new();
161        for record in records {
162            commits.push(*record.commit());
163            insert_rows.push(EventRecordRow::new(&record)?);
164        }
165
166        let id = (&self.owner).into();
167
168        // Insert into the database.
169        self.client
170            .conn_mut(move |conn| {
171                let tx = conn.transaction()?;
172                let events = EventEntity::new(&tx);
173                if delete_before {
174                    events.delete_all_events(log_type, id)?;
175                }
176                let ids = events.insert_events(
177                    log_type,
178                    id,
179                    insert_rows.as_slice(),
180                )?;
181                tx.commit()?;
182                Ok(ids)
183            })
184            .await
185            .map_err(Error::from)?;
186
187        if delete_before {
188            self.tree = CommitTree::new();
189        }
190
191        // Update the in-memory merkle tree
192        let mut hashes =
193            commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
194        self.tree.append(&mut hashes);
195        self.tree.commit();
196
197        Ok(())
198    }
199}
200
201impl<E> DatabaseEventLog<AccountEvent, E>
202where
203    E: std::error::Error
204        + std::fmt::Debug
205        + From<sos_core::Error>
206        + From<crate::Error>
207        + From<std::io::Error>
208        + Send
209        + Sync
210        + 'static,
211{
212    /// Create a new account event log.
213    pub async fn new_account(
214        client: Client,
215        account_id: AccountId,
216    ) -> Result<Self, E> {
217        let log_type = EventLogType::Account;
218        let owner =
219            Self::lookup_owner(&client, &account_id, &log_type).await?;
220        Ok(Self {
221            owner,
222            client,
223            log_type,
224            tree: CommitTree::new(),
225            marker: std::marker::PhantomData,
226        })
227    }
228}
229
230impl<E> DatabaseEventLog<WriteEvent, E>
231where
232    E: std::error::Error
233        + std::fmt::Debug
234        + From<sos_core::Error>
235        + From<crate::Error>
236        + From<std::io::Error>
237        + Send
238        + Sync
239        + 'static,
240{
241    /// Create a new folder event log.
242    pub async fn new_folder(
243        client: Client,
244        account_id: AccountId,
245        folder_id: VaultId,
246    ) -> Result<Self, E> {
247        let log_type = EventLogType::Folder(folder_id);
248        let owner =
249            Self::lookup_owner(&client, &account_id, &log_type).await?;
250
251        Ok(Self {
252            owner,
253            client,
254            log_type,
255            tree: CommitTree::new(),
256            marker: std::marker::PhantomData,
257        })
258    }
259}
260
261impl<E> DatabaseEventLog<DeviceEvent, E>
262where
263    E: std::error::Error
264        + std::fmt::Debug
265        + From<sos_core::Error>
266        + From<crate::Error>
267        + From<std::io::Error>
268        + Send
269        + Sync
270        + 'static,
271{
272    /// Create a new device event log.
273    pub async fn new_device(
274        client: Client,
275        account_id: AccountId,
276    ) -> Result<Self, E> {
277        let log_type = EventLogType::Device;
278        let owner =
279            Self::lookup_owner(&client, &account_id, &log_type).await?;
280        Ok(Self {
281            owner,
282            client,
283            log_type,
284            tree: CommitTree::new(),
285            marker: std::marker::PhantomData,
286        })
287    }
288}
289
#[cfg(feature = "files")]
impl<E> DatabaseEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Create a file event log for `account_id`.
    ///
    /// The owner is looked up in the database and the log starts
    /// with an empty commit tree.
    // NOTE(review): unlike the other constructors this one reports
    // the crate `Error` rather than `E`; confirm whether that is
    // intentional before changing the signature.
    pub async fn new_file(
        client: Client,
        account_id: AccountId,
    ) -> Result<Self, Error> {
        let log_type = EventLogType::Files;
        let lookup =
            Self::lookup_owner(&client, &account_id, &log_type).await;
        lookup.map(|owner| Self {
            owner,
            client,
            log_type,
            tree: CommitTree::new(),
            marker: std::marker::PhantomData,
        })
    }
}
319
320#[async_trait]
321impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
322where
323    T: Default + Encodable + Decodable + Send + Sync + 'static,
324    E: std::error::Error
325        + std::fmt::Debug
326        + From<sos_core::Error>
327        + From<crate::Error>
328        + From<std::io::Error>
329        + Send
330        + Sync
331        + 'static,
332{
333    type Error = E;
334
335    async fn record_stream(
336        &self,
337        reverse: bool,
338    ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
339        let (tx, rx) = tokio::sync::mpsc::channel(8);
340
341        let id: i64 = (&self.owner).into();
342        let log_type = self.log_type.clone();
343        let client = self.client.clone();
344
345        tokio::spawn(async move {
346            client
347                .conn_and_then(move |conn| {
348                    let query =
349                        EventEntity::find_all_query(log_type, reverse);
350
351                    let mut stmt = conn.prepare_cached(&query.as_string())?;
352
353                    fn convert_row(
354                        row: &Row<'_>,
355                    ) -> Result<EventRecordRow, crate::Error>
356                    {
357                        Ok(row.try_into()?)
358                    }
359
360                    let rows = stmt.query_and_then([id], |row| {
361                        Ok::<_, crate::Error>(convert_row(row)?)
362                    })?;
363
364                    for row in rows {
365                        let row = row?;
366                        let record: EventRecord = row.try_into()?;
367                        let sender = tx.clone();
368                        futures::executor::block_on(async move {
369                            if let Err(err) = sender.send(Ok(record)).await {
370                                tracing::error!(error = %err);
371                            }
372                        });
373                    }
374
375                    Ok::<_, Error>(())
376                })
377                .await?;
378            Ok::<_, Error>(())
379        });
380
381        ReceiverStream::new(rx).boxed()
382    }
383
384    async fn event_stream(
385        &self,
386        reverse: bool,
387    ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
388        self.record_stream(reverse)
389            .await
390            .try_filter_map(|record| async {
391                let event = record.decode_event::<T>().await?;
392                Ok(Some((record, event)))
393            })
394            .boxed()
395    }
396
397    async fn diff_checked(
398        &self,
399        commit: Option<CommitHash>,
400        checkpoint: CommitProof,
401    ) -> Result<Diff<T>, Self::Error> {
402        let patch = self.diff_events(commit.as_ref()).await?;
403        Ok(Diff::<T> {
404            last_commit: commit,
405            patch,
406            checkpoint,
407        })
408    }
409
410    async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
411        let patch = self.diff_events(None).await?;
412        Ok(Diff::<T> {
413            last_commit: None,
414            patch,
415            checkpoint: self.tree().head()?,
416        })
417    }
418
419    async fn diff_events(
420        &self,
421        commit: Option<&CommitHash>,
422    ) -> Result<Patch<T>, Self::Error> {
423        let records = self.diff_records(commit).await?;
424        Ok(Patch::new(records))
425    }
426
427    fn tree(&self) -> &CommitTree {
428        &self.tree
429    }
430
431    async fn rewind(
432        &mut self,
433        commit: &CommitHash,
434    ) -> Result<Vec<EventRecord>, Self::Error> {
435        let (records, tree) = {
436            let stream = self.record_stream(true).await;
437            pin_mut!(stream);
438
439            let mut records = Vec::new();
440            let mut tree = CommitTree::new();
441            let mut new_len = 0;
442
443            while let Some(record) = stream.next().await {
444                let record = record?;
445                if record.commit() == commit {
446                    let mut leaves = self.tree().leaves().unwrap_or_default();
447                    new_len = leaves.len() - records.len();
448                    leaves.truncate(new_len);
449
450                    tree.append(&mut leaves);
451                    tree.commit();
452
453                    break;
454                }
455                records.push(record);
456            }
457
458            if new_len == 0 {
459                return Err(Error::CommitNotFound(*commit).into());
460            }
461
462            (records, tree)
463        };
464
465        let delete_ids =
466            records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
467
468        // Delete from the database
469        let log_type = self.log_type.clone();
470        self.client
471            .conn_mut(move |conn| {
472                let tx = conn.transaction()?;
473                let events = EventEntity::new(&tx);
474                for id in delete_ids {
475                    events.delete_one(log_type, &id)?;
476                }
477                tx.commit()?;
478                Ok(())
479            })
480            .await
481            .map_err(Error::from)?;
482
483        // Update merkle tree
484        self.tree = tree;
485
486        Ok(records)
487    }
488
489    async fn load_tree(&mut self) -> Result<(), Self::Error> {
490        let log_type = self.log_type.clone();
491        let id = (&self.owner).into();
492        let commits = self
493            .client
494            .conn_and_then(move |conn| {
495                let events = EventEntity::new(&conn);
496                let commits = events.load_commits(log_type, id)?;
497                Ok::<_, Error>(commits)
498            })
499            .await?;
500        for commit in commits {
501            let record: CommitRecord = commit.try_into()?;
502            self.tree.insert(*record.commit_hash.as_ref());
503        }
504        self.tree.commit();
505        Ok(())
506    }
507
508    async fn clear(&mut self) -> Result<(), Self::Error> {
509        let log_type = self.log_type.clone();
510        let id = (&self.owner).into();
511        self.client
512            .conn_mut(move |conn| {
513                let tx = conn.transaction()?;
514                let events = EventEntity::new(&tx);
515                events.delete_all_events(log_type, id)?;
516                tx.commit()?;
517                Ok(())
518            })
519            .await
520            .map_err(Error::from)?;
521        self.tree = CommitTree::new();
522        Ok(())
523    }
524
525    async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
526        let mut records = Vec::with_capacity(events.len());
527        for event in events {
528            records.push(EventRecord::encode_event(event).await?);
529        }
530        self.apply_records(records).await
531    }
532
533    async fn apply_records(
534        &mut self,
535        records: Vec<EventRecord>,
536    ) -> Result<(), Self::Error> {
537        self.insert_records(records.as_slice(), false).await
538    }
539
540    async fn patch_checked(
541        &mut self,
542        commit_proof: &CommitProof,
543        patch: &Patch<T>,
544    ) -> Result<CheckedPatch, Self::Error> {
545        let comparison = self.tree().compare(commit_proof)?;
546        match comparison {
547            Comparison::Equal => {
548                self.patch_unchecked(patch).await?;
549                let proof = self.tree().head()?;
550                Ok(CheckedPatch::Success(proof))
551            }
552            Comparison::Contains(indices) => {
553                let head = self.tree().head()?;
554                let contains = self.tree().proof(&indices)?;
555                Ok(CheckedPatch::Conflict {
556                    head,
557                    contains: Some(contains),
558                })
559            }
560            Comparison::Unknown => {
561                let head = self.tree().head()?;
562                Ok(CheckedPatch::Conflict {
563                    head,
564                    contains: None,
565                })
566            }
567        }
568    }
569
570    async fn replace_all_events(
571        &mut self,
572        diff: &Diff<T>,
573    ) -> Result<(), Self::Error> {
574        self.insert_records(diff.patch.records(), true).await?;
575
576        let computed = self.tree().head()?;
577        let verified = computed == diff.checkpoint;
578        if !verified {
579            return Err(Error::CheckpointVerification {
580                checkpoint: diff.checkpoint.root,
581                computed: computed.root,
582            }
583            .into());
584        }
585
586        Ok(())
587    }
588
589    async fn patch_unchecked(
590        &mut self,
591        patch: &Patch<T>,
592    ) -> Result<(), Self::Error> {
593        self.apply_records(patch.records().to_vec()).await
594    }
595
596    async fn diff_records(
597        &self,
598        commit: Option<&CommitHash>,
599    ) -> Result<Vec<EventRecord>, Self::Error> {
600        let mut events = Vec::new();
601
602        let stream = self.record_stream(true).await;
603        pin_mut!(stream);
604
605        while let Some(record) = stream.next().await {
606            let record = record?;
607            if let Some(commit) = commit {
608                if record.commit() == commit {
609                    return Ok(events);
610                }
611            }
612            // Iterating in reverse order as we would typically
613            // be looking for commits near the end of the event log
614            // but we want the patch events in the order they were
615            // appended so insert at the beginning to reverse the list
616            events.insert(0, record);
617        }
618
619        // If the caller wanted to patch until a particular commit
620        // but it doesn't exist we error otherwise we would return
621        // all the events
622        if let Some(commit) = commit {
623            return Err(Error::CommitNotFound(*commit).into());
624        }
625
626        Ok(events)
627    }
628
629    fn version(&self) -> u16 {
630        match &self.owner {
631            EventLogOwner::Folder(folder) => *folder.summary.version(),
632            EventLogOwner::Account(_) => VERSION1,
633        }
634    }
635}