1use crate::{
11 entity::{
12 AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13 FolderEntity, FolderRecord,
14 },
15 Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21 pin_mut,
22 stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25 commit::{CommitHash, CommitProof, CommitTree, Comparison},
26 encoding::VERSION1,
27 events::{
28 patch::{CheckedPatch, Diff, Patch},
29 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
30 WriteEvent,
31 },
32 AccountId, VaultId,
33};
34
/// Identifies the database entity that owns the events of a log.
///
/// Converting a reference to an owner into `i64` yields the identifier
/// that is passed to the event queries in this file.
#[derive(Clone)]
#[doc(hidden)]
pub enum EventLogOwner {
    /// Log owned by an account; holds the row id of the account.
    Account(i64),
    /// Log owned by a folder; holds the folder record.
    Folder(FolderRecord),
}
44
45impl From<&EventLogOwner> for i64 {
46 fn from(value: &EventLogOwner) -> Self {
47 match value {
48 EventLogOwner::Account(id) => *id,
49 EventLogOwner::Folder(folder) => folder.row_id,
50 }
51 }
52}
53
54#[cfg(feature = "files")]
55use sos_core::events::FileEvent;
56use tokio_stream::wrappers::ReceiverStream;
57
/// Database event log that stores [`AccountEvent`] records.
pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;

/// Database event log that stores [`DeviceEvent`] records.
pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;

/// Database event log that stores [`WriteEvent`] records for a folder.
pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;

/// Database event log that stores [`FileEvent`] records.
#[cfg(feature = "files")]
pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
70
/// Event log whose records are stored in a database.
///
/// `T` is the event type held by the log and `E` is the error type
/// reported by its operations.
pub struct DatabaseEventLog<T, E>
where
    T: Default + Encodable + Decodable + Send + Sync,
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Account or folder that the events belong to.
    owner: EventLogOwner,
    /// Database client used for every query.
    client: Client,
    /// Kind of event log (account, device, folder, files).
    log_type: EventLogType,
    /// In-memory commit tree built from the commit hashes of the events.
    tree: CommitTree,
    /// Ties the otherwise unused `T` and `E` parameters to the type.
    marker: std::marker::PhantomData<(T, E)>,
}
90
91impl<T, E> DatabaseEventLog<T, E>
92where
93 T: Default + Encodable + Decodable + Send + Sync,
94 E: std::error::Error
95 + std::fmt::Debug
96 + From<sos_core::Error>
97 + From<crate::Error>
98 + From<std::io::Error>
99 + Send
100 + Sync
101 + 'static,
102{
103 pub fn with_new_client(
109 &self,
110 client: Client,
111 owner: Option<EventLogOwner>,
112 ) -> Self {
113 Self {
114 owner: owner.unwrap_or_else(|| self.owner.clone()),
115 client,
116 log_type: self.log_type,
117 tree: CommitTree::new(),
118 marker: std::marker::PhantomData,
119 }
120 }
121
122 async fn lookup_owner(
124 client: &Client,
125 account_id: &AccountId,
126 log_type: &EventLogType,
127 ) -> Result<EventLogOwner, Error> {
128 let account_id = *account_id;
129 let log_type = *log_type;
130 let result = client
131 .conn_and_then(move |conn| {
132 let account = AccountEntity::new(&conn);
133 let account_row = account.find_one(&account_id)?;
134 match log_type {
135 EventLogType::Folder(folder_id) => {
136 let folder = FolderEntity::new(&conn);
137 let folder_row = folder.find_one(&folder_id)?;
138 Ok::<_, Error>((account_row, Some(folder_row)))
139 }
140 _ => Ok::<_, Error>((account_row, None)),
141 }
142 })
143 .await?;
144
145 Ok(match result {
146 (account_row, None) => EventLogOwner::Account(account_row.row_id),
147 (_, Some(folder_row)) => EventLogOwner::Folder(
148 FolderRecord::from_row(folder_row).await?,
149 ),
150 })
151 }
152
153 async fn insert_records(
154 &mut self,
155 records: &[EventRecord],
156 delete_before: bool,
157 ) -> Result<(), E> {
158 let log_type = self.log_type.clone();
159 let mut insert_rows = Vec::new();
160 let mut commits = Vec::new();
161 for record in records {
162 commits.push(*record.commit());
163 insert_rows.push(EventRecordRow::new(&record)?);
164 }
165
166 let id = (&self.owner).into();
167
168 self.client
170 .conn_mut(move |conn| {
171 let tx = conn.transaction()?;
172 let events = EventEntity::new(&tx);
173 if delete_before {
174 events.delete_all_events(log_type, id)?;
175 }
176 let ids = events.insert_events(
177 log_type,
178 id,
179 insert_rows.as_slice(),
180 )?;
181 tx.commit()?;
182 Ok(ids)
183 })
184 .await
185 .map_err(Error::from)?;
186
187 if delete_before {
188 self.tree = CommitTree::new();
189 }
190
191 let mut hashes =
193 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
194 self.tree.append(&mut hashes);
195 self.tree.commit();
196
197 Ok(())
198 }
199}
200
201impl<E> DatabaseEventLog<AccountEvent, E>
202where
203 E: std::error::Error
204 + std::fmt::Debug
205 + From<sos_core::Error>
206 + From<crate::Error>
207 + From<std::io::Error>
208 + Send
209 + Sync
210 + 'static,
211{
212 pub async fn new_account(
214 client: Client,
215 account_id: AccountId,
216 ) -> Result<Self, E> {
217 let log_type = EventLogType::Account;
218 let owner =
219 Self::lookup_owner(&client, &account_id, &log_type).await?;
220 Ok(Self {
221 owner,
222 client,
223 log_type,
224 tree: CommitTree::new(),
225 marker: std::marker::PhantomData,
226 })
227 }
228}
229
230impl<E> DatabaseEventLog<WriteEvent, E>
231where
232 E: std::error::Error
233 + std::fmt::Debug
234 + From<sos_core::Error>
235 + From<crate::Error>
236 + From<std::io::Error>
237 + Send
238 + Sync
239 + 'static,
240{
241 pub async fn new_folder(
243 client: Client,
244 account_id: AccountId,
245 folder_id: VaultId,
246 ) -> Result<Self, E> {
247 let log_type = EventLogType::Folder(folder_id);
248 let owner =
249 Self::lookup_owner(&client, &account_id, &log_type).await?;
250
251 Ok(Self {
252 owner,
253 client,
254 log_type,
255 tree: CommitTree::new(),
256 marker: std::marker::PhantomData,
257 })
258 }
259}
260
261impl<E> DatabaseEventLog<DeviceEvent, E>
262where
263 E: std::error::Error
264 + std::fmt::Debug
265 + From<sos_core::Error>
266 + From<crate::Error>
267 + From<std::io::Error>
268 + Send
269 + Sync
270 + 'static,
271{
272 pub async fn new_device(
274 client: Client,
275 account_id: AccountId,
276 ) -> Result<Self, E> {
277 let log_type = EventLogType::Device;
278 let owner =
279 Self::lookup_owner(&client, &account_id, &log_type).await?;
280 Ok(Self {
281 owner,
282 client,
283 log_type,
284 tree: CommitTree::new(),
285 marker: std::marker::PhantomData,
286 })
287 }
288}
289
#[cfg(feature = "files")]
impl<E> DatabaseEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Creates a file event log for `account_id`.
    ///
    /// The owner is resolved from the database; the commit tree starts
    /// empty.
    ///
    /// NOTE(review): this constructor reports the crate `Error` while
    /// `new_account`, `new_folder` and `new_device` report `E`; confirm
    /// whether the difference is intended.
    ///
    /// # Errors
    ///
    /// Fails when the owner lookup fails.
    pub async fn new_file(
        client: Client,
        account_id: AccountId,
    ) -> Result<Self, Error> {
        let log_type = EventLogType::Files;
        let owner =
            Self::lookup_owner(&client, &account_id, &log_type).await?;

        let log = Self {
            tree: CommitTree::new(),
            marker: std::marker::PhantomData,
            owner,
            client,
            log_type,
        };
        Ok(log)
    }
}
319
320#[async_trait]
321impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
322where
323 T: Default + Encodable + Decodable + Send + Sync + 'static,
324 E: std::error::Error
325 + std::fmt::Debug
326 + From<sos_core::Error>
327 + From<crate::Error>
328 + From<std::io::Error>
329 + Send
330 + Sync
331 + 'static,
332{
333 type Error = E;
334
335 async fn record_stream(
336 &self,
337 reverse: bool,
338 ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
339 let (tx, rx) = tokio::sync::mpsc::channel(8);
340
341 let id: i64 = (&self.owner).into();
342 let log_type = self.log_type.clone();
343 let client = self.client.clone();
344
345 tokio::spawn(async move {
346 client
347 .conn_and_then(move |conn| {
348 let query =
349 EventEntity::find_all_query(log_type, reverse);
350
351 let mut stmt = conn.prepare_cached(&query.as_string())?;
352
353 fn convert_row(
354 row: &Row<'_>,
355 ) -> Result<EventRecordRow, crate::Error>
356 {
357 Ok(row.try_into()?)
358 }
359
360 let rows = stmt.query_and_then([id], |row| {
361 Ok::<_, crate::Error>(convert_row(row)?)
362 })?;
363
364 for row in rows {
365 let row = row?;
366 let record: EventRecord = row.try_into()?;
367 let sender = tx.clone();
368 futures::executor::block_on(async move {
369 if let Err(err) = sender.send(Ok(record)).await {
370 tracing::error!(error = %err);
371 }
372 });
373 }
374
375 Ok::<_, Error>(())
376 })
377 .await?;
378 Ok::<_, Error>(())
379 });
380
381 ReceiverStream::new(rx).boxed()
382 }
383
384 async fn event_stream(
385 &self,
386 reverse: bool,
387 ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
388 self.record_stream(reverse)
389 .await
390 .try_filter_map(|record| async {
391 let event = record.decode_event::<T>().await?;
392 Ok(Some((record, event)))
393 })
394 .boxed()
395 }
396
397 async fn diff_checked(
398 &self,
399 commit: Option<CommitHash>,
400 checkpoint: CommitProof,
401 ) -> Result<Diff<T>, Self::Error> {
402 let patch = self.diff_events(commit.as_ref()).await?;
403 Ok(Diff::<T> {
404 last_commit: commit,
405 patch,
406 checkpoint,
407 })
408 }
409
410 async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
411 let patch = self.diff_events(None).await?;
412 Ok(Diff::<T> {
413 last_commit: None,
414 patch,
415 checkpoint: self.tree().head()?,
416 })
417 }
418
419 async fn diff_events(
420 &self,
421 commit: Option<&CommitHash>,
422 ) -> Result<Patch<T>, Self::Error> {
423 let records = self.diff_records(commit).await?;
424 Ok(Patch::new(records))
425 }
426
427 fn tree(&self) -> &CommitTree {
428 &self.tree
429 }
430
431 async fn rewind(
432 &mut self,
433 commit: &CommitHash,
434 ) -> Result<Vec<EventRecord>, Self::Error> {
435 let (records, tree) = {
436 let stream = self.record_stream(true).await;
437 pin_mut!(stream);
438
439 let mut records = Vec::new();
440 let mut tree = CommitTree::new();
441 let mut new_len = 0;
442
443 while let Some(record) = stream.next().await {
444 let record = record?;
445 if record.commit() == commit {
446 let mut leaves = self.tree().leaves().unwrap_or_default();
447 new_len = leaves.len() - records.len();
448 leaves.truncate(new_len);
449
450 tree.append(&mut leaves);
451 tree.commit();
452
453 break;
454 }
455 records.push(record);
456 }
457
458 if new_len == 0 {
459 return Err(Error::CommitNotFound(*commit).into());
460 }
461
462 (records, tree)
463 };
464
465 let delete_ids =
466 records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
467
468 let log_type = self.log_type.clone();
470 self.client
471 .conn_mut(move |conn| {
472 let tx = conn.transaction()?;
473 let events = EventEntity::new(&tx);
474 for id in delete_ids {
475 events.delete_one(log_type, &id)?;
476 }
477 tx.commit()?;
478 Ok(())
479 })
480 .await
481 .map_err(Error::from)?;
482
483 self.tree = tree;
485
486 Ok(records)
487 }
488
489 async fn load_tree(&mut self) -> Result<(), Self::Error> {
490 let log_type = self.log_type.clone();
491 let id = (&self.owner).into();
492 let commits = self
493 .client
494 .conn_and_then(move |conn| {
495 let events = EventEntity::new(&conn);
496 let commits = events.load_commits(log_type, id)?;
497 Ok::<_, Error>(commits)
498 })
499 .await?;
500 let mut tree = CommitTree::new();
501 for commit in commits {
502 let record: CommitRecord = commit.try_into()?;
503 tree.insert(*record.commit_hash.as_ref());
504 }
505 tree.commit();
506 self.tree = tree;
507 Ok(())
508 }
509
510 async fn clear(&mut self) -> Result<(), Self::Error> {
511 let log_type = self.log_type.clone();
512 let id = (&self.owner).into();
513 self.client
514 .conn_mut(move |conn| {
515 let tx = conn.transaction()?;
516 let events = EventEntity::new(&tx);
517 events.delete_all_events(log_type, id)?;
518 tx.commit()?;
519 Ok(())
520 })
521 .await
522 .map_err(Error::from)?;
523 self.tree = CommitTree::new();
524 Ok(())
525 }
526
527 async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
528 let mut records = Vec::with_capacity(events.len());
529 for event in events {
530 records.push(EventRecord::encode_event(event).await?);
531 }
532 self.apply_records(records).await
533 }
534
535 async fn apply_records(
536 &mut self,
537 records: Vec<EventRecord>,
538 ) -> Result<(), Self::Error> {
539 self.insert_records(records.as_slice(), false).await
540 }
541
542 async fn patch_checked(
543 &mut self,
544 commit_proof: &CommitProof,
545 patch: &Patch<T>,
546 ) -> Result<CheckedPatch, Self::Error> {
547 let comparison = self.tree().compare(commit_proof)?;
548 match comparison {
549 Comparison::Equal => {
550 self.patch_unchecked(patch).await?;
551 let proof = self.tree().head()?;
552 Ok(CheckedPatch::Success(proof))
553 }
554 Comparison::Contains(indices) => {
555 let head = self.tree().head()?;
556 let contains = self.tree().proof(&indices)?;
557 Ok(CheckedPatch::Conflict {
558 head,
559 contains: Some(contains),
560 })
561 }
562 Comparison::Unknown => {
563 let head = self.tree().head()?;
564 Ok(CheckedPatch::Conflict {
565 head,
566 contains: None,
567 })
568 }
569 }
570 }
571
572 async fn replace_all_events(
573 &mut self,
574 diff: &Diff<T>,
575 ) -> Result<(), Self::Error> {
576 self.insert_records(diff.patch.records(), true).await?;
577
578 let computed = self.tree().head()?;
579 let verified = computed == diff.checkpoint;
580 if !verified {
581 return Err(Error::CheckpointVerification {
582 checkpoint: diff.checkpoint.root,
583 computed: computed.root,
584 }
585 .into());
586 }
587
588 Ok(())
589 }
590
591 async fn patch_unchecked(
592 &mut self,
593 patch: &Patch<T>,
594 ) -> Result<(), Self::Error> {
595 self.apply_records(patch.records().to_vec()).await
596 }
597
598 async fn diff_records(
599 &self,
600 commit: Option<&CommitHash>,
601 ) -> Result<Vec<EventRecord>, Self::Error> {
602 let mut events = Vec::new();
603
604 let stream = self.record_stream(true).await;
605 pin_mut!(stream);
606
607 while let Some(record) = stream.next().await {
608 let record = record?;
609 if let Some(commit) = commit {
610 if record.commit() == commit {
611 return Ok(events);
612 }
613 }
614 events.insert(0, record);
619 }
620
621 if let Some(commit) = commit {
625 return Err(Error::CommitNotFound(*commit).into());
626 }
627
628 Ok(events)
629 }
630
631 fn version(&self) -> u16 {
632 match &self.owner {
633 EventLogOwner::Folder(folder) => *folder.summary.version(),
634 EventLogOwner::Account(_) => VERSION1,
635 }
636 }
637}