sos_backend/
event_log.rs

1use crate::{BackendTarget, Error};
2use async_trait::async_trait;
3use binary_stream::futures::{Decodable, Encodable};
4use futures::stream::BoxStream;
5use sos_core::{
6    commit::{CommitHash, CommitProof, CommitTree},
7    events::{
8        patch::{CheckedPatch, Diff, Patch},
9        AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
10        WriteEvent,
11    },
12    AccountId, VaultId,
13};
14use sos_database::{
15    entity::{AccountEntity, FolderEntity, FolderRecord},
16    DatabaseEventLog,
17};
18use sos_filesystem::FileSystemEventLog;
19
20/// Event log for account events.
21pub type AccountEventLog = BackendEventLog<AccountEvent>;
22/// Event log for device events.
23pub type DeviceEventLog = BackendEventLog<DeviceEvent>;
24/// Event log for folder events.
25pub type FolderEventLog = BackendEventLog<WriteEvent>;
26/// Event log for file events.
27#[cfg(feature = "files")]
28pub type FileEventLog = BackendEventLog<sos_core::events::FileEvent>;
29
30#[cfg(feature = "files")]
31use sos_core::events::FileEvent;
32
33/// Generic event log.
34pub enum BackendEventLog<T>
35where
36    T: Default + Encodable + Decodable + Send + Sync,
37{
38    /// Database event log.
39    Database(DatabaseEventLog<T, Error>),
40    /// File system event log.
41    FileSystem(FileSystemEventLog<T, Error>),
42}
43
44impl BackendEventLog<AccountEvent> {
45    /// Create a new account event log.
46    pub async fn new_account(
47        target: BackendTarget,
48        account_id: &AccountId,
49    ) -> Result<Self, Error> {
50        Ok(match target {
51            BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
52                FileSystemEventLog::<AccountEvent, Error>::new_account(
53                    paths.with_account_id(account_id).account_events(),
54                    *account_id,
55                )
56                .await?,
57            ),
58            BackendTarget::Database(_, client) => BackendEventLog::Database(
59                DatabaseEventLog::<AccountEvent, Error>::new_account(
60                    client,
61                    *account_id,
62                )
63                .await?,
64            ),
65        })
66    }
67}
68
69impl BackendEventLog<WriteEvent> {
70    /// Create a new folder event log.
71    pub async fn new_folder(
72        target: BackendTarget,
73        account_id: &AccountId,
74        folder_id: &VaultId,
75    ) -> Result<Self, Error> {
76        Ok(match target {
77            BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
78                FileSystemEventLog::<WriteEvent, Error>::new_folder(
79                    paths
80                        .with_account_id(account_id)
81                        .event_log_path(folder_id),
82                    *account_id,
83                    EventLogType::Folder(*folder_id),
84                )
85                .await?,
86            ),
87            BackendTarget::Database(_, client) => BackendEventLog::Database(
88                DatabaseEventLog::<WriteEvent, Error>::new_folder(
89                    client,
90                    *account_id,
91                    *folder_id,
92                )
93                .await?,
94            ),
95        })
96    }
97
98    /// Create a new login event log.
99    pub async fn new_login_folder(
100        target: BackendTarget,
101        account_id: &AccountId,
102    ) -> Result<Self, Error> {
103        Ok(match target {
104            BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
105                FileSystemEventLog::<WriteEvent, Error>::new_folder(
106                    paths.with_account_id(account_id).identity_events(),
107                    *account_id,
108                    EventLogType::Identity,
109                )
110                .await?,
111            ),
112            BackendTarget::Database(_, client) => {
113                let account_id = *account_id;
114                let folder_row = client
115                    .conn_and_then(move |conn| {
116                        let account_entity = AccountEntity::new(&conn);
117                        let account_row =
118                            account_entity.find_one(&account_id)?;
119                        let folder_entity = FolderEntity::new(&conn);
120                        folder_entity.find_login_folder(account_row.row_id)
121                    })
122                    .await?;
123                let folder_record =
124                    FolderRecord::from_row(folder_row).await?;
125                BackendEventLog::Database(
126                    DatabaseEventLog::<WriteEvent, Error>::new_folder(
127                        client,
128                        account_id,
129                        *folder_record.summary.id(),
130                    )
131                    .await?,
132                )
133            }
134        })
135    }
136}
137
138impl BackendEventLog<DeviceEvent> {
139    /// Create a new device event log.
140    pub async fn new_device(
141        target: BackendTarget,
142        account_id: &AccountId,
143    ) -> Result<Self, Error> {
144        Ok(match target {
145            BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
146                FileSystemEventLog::<DeviceEvent, Error>::new_device(
147                    paths.with_account_id(account_id).device_events(),
148                    *account_id,
149                )
150                .await?,
151            ),
152            BackendTarget::Database(_, client) => BackendEventLog::Database(
153                DatabaseEventLog::<DeviceEvent, Error>::new_device(
154                    client,
155                    *account_id,
156                )
157                .await?,
158            ),
159        })
160    }
161}
162
163#[cfg(feature = "files")]
164impl BackendEventLog<FileEvent> {
165    /// Create a new file event log.
166    pub async fn new_file(
167        target: BackendTarget,
168        account_id: &AccountId,
169    ) -> Result<Self, Error> {
170        Ok(match target {
171            BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
172                FileSystemEventLog::<FileEvent, Error>::new_file(
173                    paths.with_account_id(account_id).file_events(),
174                    *account_id,
175                )
176                .await?,
177            ),
178            BackendTarget::Database(_, client) => BackendEventLog::Database(
179                DatabaseEventLog::<FileEvent, Error>::new_file(
180                    client,
181                    *account_id,
182                )
183                .await?,
184            ),
185        })
186    }
187}
188
189#[async_trait]
190impl<T> EventLog<T> for BackendEventLog<T>
191where
192    T: Default + Encodable + Decodable + Send + Sync + 'static,
193{
194    type Error = Error;
195
196    async fn record_stream(
197        &self,
198        reverse: bool,
199    ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
200        match self {
201            Self::Database(inner) => inner.record_stream(reverse).await,
202            Self::FileSystem(inner) => inner.record_stream(reverse).await,
203        }
204    }
205
206    async fn event_stream(
207        &self,
208        reverse: bool,
209    ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
210        match self {
211            Self::Database(inner) => inner.event_stream(reverse).await,
212            Self::FileSystem(inner) => inner.event_stream(reverse).await,
213        }
214    }
215
216    async fn diff_checked(
217        &self,
218        commit: Option<CommitHash>,
219        checkpoint: CommitProof,
220    ) -> Result<Diff<T>, Self::Error> {
221        match self {
222            Self::Database(inner) => {
223                inner.diff_checked(commit, checkpoint).await
224            }
225            Self::FileSystem(inner) => {
226                inner.diff_checked(commit, checkpoint).await
227            }
228        }
229    }
230
231    async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
232        match self {
233            Self::Database(inner) => inner.diff_unchecked().await,
234            Self::FileSystem(inner) => inner.diff_unchecked().await,
235        }
236    }
237
238    async fn diff_events(
239        &self,
240        commit: Option<&CommitHash>,
241    ) -> Result<Patch<T>, Self::Error> {
242        match self {
243            Self::Database(inner) => inner.diff_events(commit).await,
244            Self::FileSystem(inner) => inner.diff_events(commit).await,
245        }
246    }
247
248    fn tree(&self) -> &CommitTree {
249        match self {
250            Self::Database(inner) => inner.tree(),
251            Self::FileSystem(inner) => inner.tree(),
252        }
253    }
254
255    /*
256    fn identity(&self) -> &'static [u8] {
257        match self {
258            Self::Database(inner) => inner.identity(),
259            Self::FileSystem(inner) => inner.identity(),
260        }
261    }
262
263    fn version(&self) -> Option<u16> {
264        match self {
265            Self::Database(inner) => inner.version(),
266            Self::FileSystem(inner) => inner.version(),
267        }
268    }
269    */
270
271    /*
272    async fn truncate(&mut self) -> Result<(), Self::Error> {
273        match self {
274            Self::Database(inner) => inner.truncate().await,
275            Self::FileSystem(inner) => inner.truncate().await,
276        }
277    }
278    */
279
280    async fn rewind(
281        &mut self,
282        commit: &CommitHash,
283    ) -> Result<Vec<EventRecord>, Self::Error> {
284        match self {
285            Self::Database(inner) => inner.rewind(commit).await,
286            Self::FileSystem(inner) => inner.rewind(commit).await,
287        }
288    }
289
290    async fn load_tree(&mut self) -> Result<(), Self::Error> {
291        match self {
292            Self::Database(inner) => inner.load_tree().await,
293            Self::FileSystem(inner) => inner.load_tree().await,
294        }
295    }
296
297    async fn clear(&mut self) -> Result<(), Self::Error> {
298        match self {
299            Self::Database(inner) => inner.clear().await,
300            Self::FileSystem(inner) => inner.clear().await,
301        }
302    }
303
304    async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
305        match self {
306            Self::Database(inner) => inner.apply(events).await,
307            Self::FileSystem(inner) => inner.apply(events).await,
308        }
309    }
310
311    async fn apply_records(
312        &mut self,
313        records: Vec<EventRecord>,
314    ) -> Result<(), Self::Error> {
315        match self {
316            Self::Database(inner) => inner.apply_records(records).await,
317            Self::FileSystem(inner) => inner.apply_records(records).await,
318        }
319    }
320
321    async fn patch_checked(
322        &mut self,
323        commit_proof: &CommitProof,
324        patch: &Patch<T>,
325    ) -> Result<CheckedPatch, Self::Error> {
326        match self {
327            Self::Database(inner) => {
328                inner.patch_checked(commit_proof, patch).await
329            }
330            Self::FileSystem(inner) => {
331                inner.patch_checked(commit_proof, patch).await
332            }
333        }
334    }
335
336    async fn replace_all_events(
337        &mut self,
338        diff: &Diff<T>,
339    ) -> Result<(), Self::Error> {
340        match self {
341            Self::Database(inner) => inner.replace_all_events(diff).await,
342            Self::FileSystem(inner) => inner.replace_all_events(diff).await,
343        }
344    }
345
346    async fn patch_unchecked(
347        &mut self,
348        patch: &Patch<T>,
349    ) -> Result<(), Self::Error> {
350        match self {
351            Self::Database(inner) => inner.patch_unchecked(patch).await,
352            Self::FileSystem(inner) => inner.patch_unchecked(patch).await,
353        }
354    }
355
356    async fn diff_records(
357        &self,
358        commit: Option<&CommitHash>,
359    ) -> Result<Vec<EventRecord>, Self::Error> {
360        match self {
361            Self::Database(inner) => inner.diff_records(commit).await,
362            Self::FileSystem(inner) => inner.diff_records(commit).await,
363        }
364    }
365
366    fn version(&self) -> u16 {
367        match self {
368            Self::Database(inner) => inner.version(),
369            Self::FileSystem(inner) => inner.version(),
370        }
371    }
372}