1use crate::{
11 entity::{
12 AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13 FolderEntity, FolderRecord,
14 },
15 Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21 pin_mut,
22 stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25 commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
26 encoding::VERSION1,
27 events::{
28 changes_feed,
29 patch::{CheckedPatch, Diff, Patch},
30 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
31 LocalChangeEvent, WriteEvent,
32 },
33 AccountId, VaultId,
34};
35
36#[derive(Clone)]
38#[doc(hidden)]
39pub enum EventLogOwner {
40 Account(AccountId, i64),
42 Folder(AccountId, FolderRecord),
44}
45
46impl EventLogOwner {
47 pub fn account_id(&self) -> &AccountId {
49 match self {
50 EventLogOwner::Account(account_id, _) => account_id,
51 EventLogOwner::Folder(account_id, _) => account_id,
52 }
53 }
54}
55
56impl From<&EventLogOwner> for i64 {
57 fn from(value: &EventLogOwner) -> Self {
58 match value {
59 EventLogOwner::Account(_, id) => *id,
60 EventLogOwner::Folder(_, folder) => folder.row_id,
61 }
62 }
63}
64
65#[cfg(feature = "files")]
66use sos_core::events::FileEvent;
67use tokio_stream::wrappers::ReceiverStream;
68
69pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
71
72pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
74
75pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
77
78#[cfg(feature = "files")]
80pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
81
82pub struct DatabaseEventLog<T, E>
84where
85 T: Default + Encodable + Decodable + Send + Sync,
86 E: std::error::Error
87 + std::fmt::Debug
88 + From<sos_core::Error>
89 + From<crate::Error>
90 + From<std::io::Error>
91 + Send
92 + Sync
93 + 'static,
94{
95 owner: EventLogOwner,
96 client: Client,
97 log_type: EventLogType,
98 tree: CommitTree,
99 marker: std::marker::PhantomData<(T, E)>,
100}
101
102impl<T, E> DatabaseEventLog<T, E>
103where
104 T: Default + Encodable + Decodable + Send + Sync,
105 E: std::error::Error
106 + std::fmt::Debug
107 + From<sos_core::Error>
108 + From<crate::Error>
109 + From<std::io::Error>
110 + Send
111 + Sync
112 + 'static,
113{
114 pub fn with_new_client(
120 &self,
121 client: Client,
122 owner: Option<EventLogOwner>,
123 ) -> Self {
124 Self {
125 owner: owner.unwrap_or_else(|| self.owner.clone()),
126 client,
127 log_type: self.log_type,
128 tree: CommitTree::new(),
129 marker: std::marker::PhantomData,
130 }
131 }
132
133 async fn lookup_owner(
135 client: &Client,
136 account_id: &AccountId,
137 log_type: &EventLogType,
138 ) -> Result<EventLogOwner, Error> {
139 let account_id = *account_id;
140 let log_type = *log_type;
141 let result = client
142 .conn_and_then(move |conn| {
143 let account = AccountEntity::new(&conn);
144 let account_row = account.find_one(&account_id)?;
145 match log_type {
146 EventLogType::Folder(folder_id) => {
147 let folder = FolderEntity::new(&conn);
148 let folder_row = folder.find_one(&folder_id)?;
149 Ok::<_, Error>((account_row, Some(folder_row)))
150 }
151 _ => Ok::<_, Error>((account_row, None)),
152 }
153 })
154 .await?;
155
156 Ok(match result {
157 (account_row, None) => {
158 EventLogOwner::Account(account_id, account_row.row_id)
159 }
160 (_, Some(folder_row)) => EventLogOwner::Folder(
161 account_id,
162 FolderRecord::from_row(folder_row).await?,
163 ),
164 })
165 }
166
167 async fn insert_records(
168 &mut self,
169 records: &[EventRecord],
170 delete_before: bool,
171 ) -> Result<(), E> {
172 if records.is_empty() {
173 return Ok(());
174 }
175
176 let mut span = CommitSpan {
177 before: self.tree.last_commit(),
178 after: None,
179 };
180
181 let log_type = self.log_type;
182 let mut insert_rows = Vec::new();
183 let mut commits = Vec::new();
184 for record in records {
185 commits.push(*record.commit());
186 insert_rows.push(EventRecordRow::new(record)?);
187 }
188
189 let id = (&self.owner).into();
190
191 self.client
193 .conn_mut(move |conn| {
194 let tx = conn.transaction()?;
195 let events = EventEntity::new(&tx);
196 if delete_before {
197 events.delete_all_events(log_type, id)?;
198 }
199 let ids = events.insert_events(
200 log_type,
201 id,
202 insert_rows.as_slice(),
203 )?;
204 tx.commit()?;
205 Ok(ids)
206 })
207 .await
208 .map_err(Error::from)?;
209
210 if delete_before {
211 self.tree = CommitTree::new();
212 }
213
214 let mut hashes =
216 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
217 self.tree.append(&mut hashes);
218 self.tree.commit();
219
220 span.after = self.tree.last_commit();
221
222 changes_feed().send_replace(LocalChangeEvent::AccountModified {
223 account_id: *self.owner.account_id(),
224 log_type: self.log_type,
225 commit_span: span,
226 });
227
228 Ok(())
229 }
230}
231
232impl<E> DatabaseEventLog<AccountEvent, E>
233where
234 E: std::error::Error
235 + std::fmt::Debug
236 + From<sos_core::Error>
237 + From<crate::Error>
238 + From<std::io::Error>
239 + Send
240 + Sync
241 + 'static,
242{
243 pub async fn new_account(
245 client: Client,
246 account_id: AccountId,
247 ) -> Result<Self, E> {
248 let log_type = EventLogType::Account;
249 let owner =
250 Self::lookup_owner(&client, &account_id, &log_type).await?;
251 Ok(Self {
252 owner,
253 client,
254 log_type,
255 tree: CommitTree::new(),
256 marker: std::marker::PhantomData,
257 })
258 }
259}
260
261impl<E> DatabaseEventLog<WriteEvent, E>
262where
263 E: std::error::Error
264 + std::fmt::Debug
265 + From<sos_core::Error>
266 + From<crate::Error>
267 + From<std::io::Error>
268 + Send
269 + Sync
270 + 'static,
271{
272 pub async fn new_folder(
274 client: Client,
275 account_id: AccountId,
276 folder_id: VaultId,
277 ) -> Result<Self, E> {
278 let log_type = EventLogType::Folder(folder_id);
279 let owner =
280 Self::lookup_owner(&client, &account_id, &log_type).await?;
281
282 Ok(Self {
283 owner,
284 client,
285 log_type,
286 tree: CommitTree::new(),
287 marker: std::marker::PhantomData,
288 })
289 }
290}
291
292impl<E> DatabaseEventLog<DeviceEvent, E>
293where
294 E: std::error::Error
295 + std::fmt::Debug
296 + From<sos_core::Error>
297 + From<crate::Error>
298 + From<std::io::Error>
299 + Send
300 + Sync
301 + 'static,
302{
303 pub async fn new_device(
305 client: Client,
306 account_id: AccountId,
307 ) -> Result<Self, E> {
308 let log_type = EventLogType::Device;
309 let owner =
310 Self::lookup_owner(&client, &account_id, &log_type).await?;
311 Ok(Self {
312 owner,
313 client,
314 log_type,
315 tree: CommitTree::new(),
316 marker: std::marker::PhantomData,
317 })
318 }
319}
320
321#[cfg(feature = "files")]
322impl<E> DatabaseEventLog<FileEvent, E>
323where
324 E: std::error::Error
325 + std::fmt::Debug
326 + From<sos_core::Error>
327 + From<crate::Error>
328 + From<std::io::Error>
329 + Send
330 + Sync
331 + 'static,
332{
333 pub async fn new_file(
335 client: Client,
336 account_id: AccountId,
337 ) -> Result<Self, Error> {
338 let log_type = EventLogType::Files;
339 let owner =
340 Self::lookup_owner(&client, &account_id, &log_type).await?;
341 Ok(Self {
342 owner,
343 client,
344 log_type,
345 tree: CommitTree::new(),
346 marker: std::marker::PhantomData,
347 })
348 }
349}
350
351#[async_trait]
352impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
353where
354 T: Default + Encodable + Decodable + Send + Sync + 'static,
355 E: std::error::Error
356 + std::fmt::Debug
357 + From<sos_core::Error>
358 + From<crate::Error>
359 + From<std::io::Error>
360 + Send
361 + Sync
362 + 'static,
363{
364 type Error = E;
365
366 async fn record_stream(
367 &self,
368 reverse: bool,
369 ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
370 let (tx, rx) = tokio::sync::mpsc::channel(8);
371
372 let id: i64 = (&self.owner).into();
373 let log_type = self.log_type;
374 let client = self.client.clone();
375
376 tokio::spawn(async move {
377 client
378 .conn_and_then(move |conn| {
379 let query =
380 EventEntity::find_all_query(log_type, reverse);
381
382 let mut stmt = conn.prepare_cached(&query.as_string())?;
383
384 fn convert_row(
385 row: &Row<'_>,
386 ) -> Result<EventRecordRow, crate::Error>
387 {
388 Ok(row.try_into()?)
389 }
390
391 let rows = stmt.query_and_then([id], |row| {
392 convert_row(row)
393 })?;
394
395 for row in rows {
396 if tx.is_closed() {
397 break;
398 }
399 let row = row?;
400 let record: EventRecord = row.try_into()?;
401 let inner_tx = tx.clone();
402 let res = futures::executor::block_on(async move {
403 inner_tx.send(Ok(record)).await
404 });
405 if let Err(e) = res {
406 tracing::error!(error = %e);
407 break;
408 }
409 }
410
411 Ok::<_, Error>(())
412 })
413 .await?;
414 Ok::<_, Error>(())
415 });
416
417 ReceiverStream::new(rx).boxed()
418 }
419
420 async fn event_stream(
421 &self,
422 reverse: bool,
423 ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
424 self.record_stream(reverse)
425 .await
426 .try_filter_map(|record| async {
427 let event = record.decode_event::<T>().await?;
428 Ok(Some((record, event)))
429 })
430 .boxed()
431 }
432
433 async fn diff_checked(
434 &self,
435 commit: Option<CommitHash>,
436 checkpoint: CommitProof,
437 ) -> Result<Diff<T>, Self::Error> {
438 let patch = self.diff_events(commit.as_ref()).await?;
439 Ok(Diff::<T> {
440 last_commit: commit,
441 patch,
442 checkpoint,
443 })
444 }
445
446 async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
447 let patch = self.diff_events(None).await?;
448 Ok(Diff::<T> {
449 last_commit: None,
450 patch,
451 checkpoint: self.tree().head()?,
452 })
453 }
454
455 async fn diff_events(
456 &self,
457 commit: Option<&CommitHash>,
458 ) -> Result<Patch<T>, Self::Error> {
459 let records = self.diff_records(commit).await?;
460 Ok(Patch::new(records))
461 }
462
463 fn tree(&self) -> &CommitTree {
464 &self.tree
465 }
466
467 async fn rewind(
468 &mut self,
469 commit: &CommitHash,
470 ) -> Result<Vec<EventRecord>, Self::Error> {
471 let (records, tree) = {
472 let stream = self.record_stream(true).await;
473 pin_mut!(stream);
474
475 let mut records = Vec::new();
476 let mut tree = CommitTree::new();
477 let mut new_len = 0;
478
479 while let Some(record) = stream.next().await {
480 let record = record?;
481 if record.commit() == commit {
482 let mut leaves = self.tree().leaves().unwrap_or_default();
483 new_len = leaves.len() - records.len();
484 leaves.truncate(new_len);
485
486 tree.append(&mut leaves);
487 tree.commit();
488
489 break;
490 }
491 records.push(record);
492 }
493
494 if new_len == 0 {
495 return Err(Error::CommitNotFound(*commit).into());
496 }
497
498 (records, tree)
499 };
500
501 let delete_ids =
502 records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
503
504 let log_type = self.log_type;
506 self.client
507 .conn_mut(move |conn| {
508 let tx = conn.transaction()?;
509 let events = EventEntity::new(&tx);
510 for id in delete_ids {
511 events.delete_one(log_type, &id)?;
512 }
513 tx.commit()?;
514 Ok(())
515 })
516 .await
517 .map_err(Error::from)?;
518
519 self.tree = tree;
521
522 Ok(records)
523 }
524
525 async fn load_tree(&mut self) -> Result<(), Self::Error> {
526 let log_type = self.log_type;
527 let id = (&self.owner).into();
528 let commits = self
529 .client
530 .conn_and_then(move |conn| {
531 let events = EventEntity::new(&conn);
532 let commits = events.load_commits(log_type, id)?;
533 Ok::<_, Error>(commits)
534 })
535 .await?;
536 let mut tree = CommitTree::new();
537 for commit in commits {
538 let record: CommitRecord = commit.try_into()?;
539 tree.insert(*record.commit_hash.as_ref());
540 }
541 tree.commit();
542 self.tree = tree;
543 Ok(())
544 }
545
546 async fn clear(&mut self) -> Result<(), Self::Error> {
547 let log_type = self.log_type;
548 let id = (&self.owner).into();
549 self.client
550 .conn_mut(move |conn| {
551 let tx = conn.transaction()?;
552 let events = EventEntity::new(&tx);
553 events.delete_all_events(log_type, id)?;
554 tx.commit()?;
555 Ok(())
556 })
557 .await
558 .map_err(Error::from)?;
559 self.tree = CommitTree::new();
560 Ok(())
561 }
562
563 async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
564 let mut records = Vec::with_capacity(events.len());
565 for event in events {
566 records.push(EventRecord::encode_event(event).await?);
567 }
568 self.apply_records(records).await
569 }
570
571 async fn apply_records(
572 &mut self,
573 records: Vec<EventRecord>,
574 ) -> Result<(), Self::Error> {
575 self.insert_records(records.as_slice(), false).await
576 }
577
578 async fn patch_checked(
579 &mut self,
580 commit_proof: &CommitProof,
581 patch: &Patch<T>,
582 ) -> Result<CheckedPatch, Self::Error> {
583 let comparison = self.tree().compare(commit_proof)?;
584 match comparison {
585 Comparison::Equal => {
586 self.patch_unchecked(patch).await?;
587 let proof = self.tree().head()?;
588 Ok(CheckedPatch::Success(proof))
589 }
590 Comparison::Contains(indices) => {
591 let head = self.tree().head()?;
592 let contains = self.tree().proof(&indices)?;
593 Ok(CheckedPatch::Conflict {
594 head,
595 contains: Some(contains),
596 })
597 }
598 Comparison::Unknown => {
599 let head = self.tree().head()?;
600 Ok(CheckedPatch::Conflict {
601 head,
602 contains: None,
603 })
604 }
605 }
606 }
607
608 async fn replace_all_events(
609 &mut self,
610 diff: &Diff<T>,
611 ) -> Result<(), Self::Error> {
612 self.insert_records(diff.patch.records(), true).await?;
613
614 let computed = self.tree().head()?;
615 let verified = computed == diff.checkpoint;
616 if !verified {
617 return Err(Error::CheckpointVerification {
618 checkpoint: diff.checkpoint.root,
619 computed: computed.root,
620 }
621 .into());
622 }
623
624 Ok(())
625 }
626
627 async fn patch_unchecked(
628 &mut self,
629 patch: &Patch<T>,
630 ) -> Result<(), Self::Error> {
631 self.apply_records(patch.records().to_vec()).await
632 }
633
634 async fn diff_records(
635 &self,
636 commit: Option<&CommitHash>,
637 ) -> Result<Vec<EventRecord>, Self::Error> {
638 let mut events = Vec::new();
639
640 let stream = self.record_stream(true).await;
641 pin_mut!(stream);
642
643 while let Some(record) = stream.next().await {
644 let record = record?;
645 if let Some(commit) = commit {
646 if record.commit() == commit {
647 return Ok(events);
648 }
649 }
650 events.insert(0, record);
655 }
656
657 if let Some(commit) = commit {
661 return Err(Error::CommitNotFound(*commit).into());
662 }
663
664 Ok(events)
665 }
666
667 fn version(&self) -> u16 {
668 match &self.owner {
669 EventLogOwner::Folder(_, folder) => *folder.summary.version(),
670 EventLogOwner::Account(_, _) => VERSION1,
671 }
672 }
673}