1use crate::{BackendTarget, Error};
2use async_trait::async_trait;
3use binary_stream::futures::{Decodable, Encodable};
4use futures::stream::BoxStream;
5use sos_core::{
6 commit::{CommitHash, CommitProof, CommitTree},
7 events::{
8 patch::{CheckedPatch, Diff, Patch},
9 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
10 WriteEvent,
11 },
12 AccountId, VaultId,
13};
14use sos_database::{
15 entity::{AccountEntity, FolderEntity, FolderRecord},
16 DatabaseEventLog,
17};
18use sos_filesystem::FileSystemEventLog;
19
20pub type AccountEventLog = BackendEventLog<AccountEvent>;
22pub type DeviceEventLog = BackendEventLog<DeviceEvent>;
24pub type FolderEventLog = BackendEventLog<WriteEvent>;
26#[cfg(feature = "files")]
28pub type FileEventLog = BackendEventLog<sos_core::events::FileEvent>;
29
30#[cfg(feature = "files")]
31use sos_core::events::FileEvent;
32
33pub enum BackendEventLog<T>
35where
36 T: Default + Encodable + Decodable + Send + Sync,
37{
38 Database(DatabaseEventLog<T, Error>),
40 FileSystem(FileSystemEventLog<T, Error>),
42}
43
44impl BackendEventLog<AccountEvent> {
45 pub async fn new_account(
47 target: BackendTarget,
48 account_id: &AccountId,
49 ) -> Result<Self, Error> {
50 Ok(match target {
51 BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
52 FileSystemEventLog::<AccountEvent, Error>::new_account(
53 paths.with_account_id(account_id).account_events(),
54 *account_id,
55 )
56 .await?,
57 ),
58 BackendTarget::Database(_, client) => BackendEventLog::Database(
59 DatabaseEventLog::<AccountEvent, Error>::new_account(
60 client,
61 *account_id,
62 )
63 .await?,
64 ),
65 })
66 }
67}
68
69impl BackendEventLog<WriteEvent> {
70 pub async fn new_folder(
72 target: BackendTarget,
73 account_id: &AccountId,
74 folder_id: &VaultId,
75 ) -> Result<Self, Error> {
76 Ok(match target {
77 BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
78 FileSystemEventLog::<WriteEvent, Error>::new_folder(
79 paths
80 .with_account_id(account_id)
81 .event_log_path(folder_id),
82 *account_id,
83 EventLogType::Folder(*folder_id),
84 )
85 .await?,
86 ),
87 BackendTarget::Database(_, client) => BackendEventLog::Database(
88 DatabaseEventLog::<WriteEvent, Error>::new_folder(
89 client,
90 *account_id,
91 *folder_id,
92 )
93 .await?,
94 ),
95 })
96 }
97
98 pub async fn new_login_folder(
100 target: BackendTarget,
101 account_id: &AccountId,
102 ) -> Result<Self, Error> {
103 Ok(match target {
104 BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
105 FileSystemEventLog::<WriteEvent, Error>::new_folder(
106 paths.with_account_id(account_id).identity_events(),
107 *account_id,
108 EventLogType::Identity,
109 )
110 .await?,
111 ),
112 BackendTarget::Database(_, client) => {
113 let account_id = *account_id;
114 let folder_row = client
115 .conn_and_then(move |conn| {
116 let account_entity = AccountEntity::new(&conn);
117 let account_row =
118 account_entity.find_one(&account_id)?;
119 let folder_entity = FolderEntity::new(&conn);
120 folder_entity.find_login_folder(account_row.row_id)
121 })
122 .await?;
123 let folder_record =
124 FolderRecord::from_row(folder_row).await?;
125 BackendEventLog::Database(
126 DatabaseEventLog::<WriteEvent, Error>::new_folder(
127 client,
128 account_id,
129 *folder_record.summary.id(),
130 )
131 .await?,
132 )
133 }
134 })
135 }
136}
137
138impl BackendEventLog<DeviceEvent> {
139 pub async fn new_device(
141 target: BackendTarget,
142 account_id: &AccountId,
143 ) -> Result<Self, Error> {
144 Ok(match target {
145 BackendTarget::FileSystem(paths) => BackendEventLog::FileSystem(
146 FileSystemEventLog::<DeviceEvent, Error>::new_device(
147 paths.with_account_id(account_id).device_events(),
148 *account_id,
149 )
150 .await?,
151 ),
152 BackendTarget::Database(_, client) => BackendEventLog::Database(
153 DatabaseEventLog::<DeviceEvent, Error>::new_device(
154 client,
155 *account_id,
156 )
157 .await?,
158 ),
159 })
160 }
161}
162
#[cfg(feature = "files")]
impl BackendEventLog<FileEvent> {
    /// Create a file event log using the storage selected by `target`.
    ///
    /// A file system target uses the file events path resolved for
    /// `account_id`; a database target uses the client carried by the
    /// target.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying event log constructor.
    pub async fn new_file(
        target: BackendTarget,
        account_id: &AccountId,
    ) -> Result<Self, Error> {
        let event_log = match target {
            BackendTarget::FileSystem(paths) => {
                let fs_log =
                    FileSystemEventLog::<FileEvent, Error>::new_file(
                        paths.with_account_id(account_id).file_events(),
                        *account_id,
                    )
                    .await?;
                Self::FileSystem(fs_log)
            }
            BackendTarget::Database(_, client) => {
                let db_log = DatabaseEventLog::<FileEvent, Error>::new_file(
                    client,
                    *account_id,
                )
                .await?;
                Self::Database(db_log)
            }
        };
        Ok(event_log)
    }
}
188
189#[async_trait]
190impl<T> EventLog<T> for BackendEventLog<T>
191where
192 T: Default + Encodable + Decodable + Send + Sync + 'static,
193{
194 type Error = Error;
195
196 async fn record_stream(
197 &self,
198 reverse: bool,
199 ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
200 match self {
201 Self::Database(inner) => inner.record_stream(reverse).await,
202 Self::FileSystem(inner) => inner.record_stream(reverse).await,
203 }
204 }
205
206 async fn event_stream(
207 &self,
208 reverse: bool,
209 ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
210 match self {
211 Self::Database(inner) => inner.event_stream(reverse).await,
212 Self::FileSystem(inner) => inner.event_stream(reverse).await,
213 }
214 }
215
216 async fn diff_checked(
217 &self,
218 commit: Option<CommitHash>,
219 checkpoint: CommitProof,
220 ) -> Result<Diff<T>, Self::Error> {
221 match self {
222 Self::Database(inner) => {
223 inner.diff_checked(commit, checkpoint).await
224 }
225 Self::FileSystem(inner) => {
226 inner.diff_checked(commit, checkpoint).await
227 }
228 }
229 }
230
231 async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
232 match self {
233 Self::Database(inner) => inner.diff_unchecked().await,
234 Self::FileSystem(inner) => inner.diff_unchecked().await,
235 }
236 }
237
238 async fn diff_events(
239 &self,
240 commit: Option<&CommitHash>,
241 ) -> Result<Patch<T>, Self::Error> {
242 match self {
243 Self::Database(inner) => inner.diff_events(commit).await,
244 Self::FileSystem(inner) => inner.diff_events(commit).await,
245 }
246 }
247
248 fn tree(&self) -> &CommitTree {
249 match self {
250 Self::Database(inner) => inner.tree(),
251 Self::FileSystem(inner) => inner.tree(),
252 }
253 }
254
255 async fn rewind(
281 &mut self,
282 commit: &CommitHash,
283 ) -> Result<Vec<EventRecord>, Self::Error> {
284 match self {
285 Self::Database(inner) => inner.rewind(commit).await,
286 Self::FileSystem(inner) => inner.rewind(commit).await,
287 }
288 }
289
290 async fn load_tree(&mut self) -> Result<(), Self::Error> {
291 match self {
292 Self::Database(inner) => inner.load_tree().await,
293 Self::FileSystem(inner) => inner.load_tree().await,
294 }
295 }
296
297 async fn clear(&mut self) -> Result<(), Self::Error> {
298 match self {
299 Self::Database(inner) => inner.clear().await,
300 Self::FileSystem(inner) => inner.clear().await,
301 }
302 }
303
304 async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
305 match self {
306 Self::Database(inner) => inner.apply(events).await,
307 Self::FileSystem(inner) => inner.apply(events).await,
308 }
309 }
310
311 async fn apply_records(
312 &mut self,
313 records: Vec<EventRecord>,
314 ) -> Result<(), Self::Error> {
315 match self {
316 Self::Database(inner) => inner.apply_records(records).await,
317 Self::FileSystem(inner) => inner.apply_records(records).await,
318 }
319 }
320
321 async fn patch_checked(
322 &mut self,
323 commit_proof: &CommitProof,
324 patch: &Patch<T>,
325 ) -> Result<CheckedPatch, Self::Error> {
326 match self {
327 Self::Database(inner) => {
328 inner.patch_checked(commit_proof, patch).await
329 }
330 Self::FileSystem(inner) => {
331 inner.patch_checked(commit_proof, patch).await
332 }
333 }
334 }
335
336 async fn replace_all_events(
337 &mut self,
338 diff: &Diff<T>,
339 ) -> Result<(), Self::Error> {
340 match self {
341 Self::Database(inner) => inner.replace_all_events(diff).await,
342 Self::FileSystem(inner) => inner.replace_all_events(diff).await,
343 }
344 }
345
346 async fn patch_unchecked(
347 &mut self,
348 patch: &Patch<T>,
349 ) -> Result<(), Self::Error> {
350 match self {
351 Self::Database(inner) => inner.patch_unchecked(patch).await,
352 Self::FileSystem(inner) => inner.patch_unchecked(patch).await,
353 }
354 }
355
356 async fn diff_records(
357 &self,
358 commit: Option<&CommitHash>,
359 ) -> Result<Vec<EventRecord>, Self::Error> {
360 match self {
361 Self::Database(inner) => inner.diff_records(commit).await,
362 Self::FileSystem(inner) => inner.diff_records(commit).await,
363 }
364 }
365
366 fn version(&self) -> u16 {
367 match self {
368 Self::Database(inner) => inner.version(),
369 Self::FileSystem(inner) => inner.version(),
370 }
371 }
372}