1use crate::{
11 entity::{
12 AccountEntity, CommitRecord, EventEntity, EventRecordRow,
13 FolderEntity, FolderRecord,
14 },
15 Error,
16};
17use async_sqlite::{rusqlite::Row, Client};
18use async_trait::async_trait;
19use binary_stream::futures::{Decodable, Encodable};
20use futures::{
21 pin_mut,
22 stream::{BoxStream, StreamExt, TryStreamExt},
23};
24use sos_core::{
25 commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
26 encoding::VERSION1,
27 events::{
28 changes_feed,
29 patch::{CheckedPatch, Diff, Patch},
30 AccountEvent, DeviceEvent, EventLog, EventLogType, EventRecord,
31 LocalChangeEvent, WriteEvent,
32 },
33 AccountId, VaultId,
34};
35
36#[derive(Clone)]
38#[doc(hidden)]
39pub enum EventLogOwner {
40 Account(AccountId, i64),
42 Folder(AccountId, FolderRecord),
44}
45
46impl EventLogOwner {
47 pub fn account_id(&self) -> &AccountId {
49 match self {
50 EventLogOwner::Account(account_id, _) => account_id,
51 EventLogOwner::Folder(account_id, _) => account_id,
52 }
53 }
54}
55
56impl From<&EventLogOwner> for i64 {
57 fn from(value: &EventLogOwner) -> Self {
58 match value {
59 EventLogOwner::Account(_, id) => *id,
60 EventLogOwner::Folder(_, folder) => folder.row_id,
61 }
62 }
63}
64
65#[cfg(feature = "files")]
66use sos_core::events::FileEvent;
67use tokio_stream::wrappers::ReceiverStream;
68
69pub type AccountEventLog<E> = DatabaseEventLog<AccountEvent, E>;
71
72pub type DeviceEventLog<E> = DatabaseEventLog<DeviceEvent, E>;
74
75pub type FolderEventLog<E> = DatabaseEventLog<WriteEvent, E>;
77
78#[cfg(feature = "files")]
80pub type FileEventLog<E> = DatabaseEventLog<FileEvent, E>;
81
82pub struct DatabaseEventLog<T, E>
84where
85 T: Default + Encodable + Decodable + Send + Sync,
86 E: std::error::Error
87 + std::fmt::Debug
88 + From<sos_core::Error>
89 + From<crate::Error>
90 + From<std::io::Error>
91 + Send
92 + Sync
93 + 'static,
94{
95 owner: EventLogOwner,
96 client: Client,
97 log_type: EventLogType,
98 tree: CommitTree,
99 marker: std::marker::PhantomData<(T, E)>,
100}
101
102impl<T, E> DatabaseEventLog<T, E>
103where
104 T: Default + Encodable + Decodable + Send + Sync,
105 E: std::error::Error
106 + std::fmt::Debug
107 + From<sos_core::Error>
108 + From<crate::Error>
109 + From<std::io::Error>
110 + Send
111 + Sync
112 + 'static,
113{
114 pub fn with_new_client(
120 &self,
121 client: Client,
122 owner: Option<EventLogOwner>,
123 ) -> Self {
124 Self {
125 owner: owner.unwrap_or_else(|| self.owner.clone()),
126 client,
127 log_type: self.log_type,
128 tree: CommitTree::new(),
129 marker: std::marker::PhantomData,
130 }
131 }
132
133 async fn lookup_owner(
135 client: &Client,
136 account_id: &AccountId,
137 log_type: &EventLogType,
138 ) -> Result<EventLogOwner, Error> {
139 let account_id = *account_id;
140 let log_type = *log_type;
141 let result = client
142 .conn_and_then(move |conn| {
143 let account = AccountEntity::new(&conn);
144 let account_row = account.find_one(&account_id)?;
145 match log_type {
146 EventLogType::Folder(folder_id) => {
147 let folder = FolderEntity::new(&conn);
148 let folder_row = folder.find_one(&folder_id)?;
149 Ok::<_, Error>((account_row, Some(folder_row)))
150 }
151 _ => Ok::<_, Error>((account_row, None)),
152 }
153 })
154 .await?;
155
156 Ok(match result {
157 (account_row, None) => {
158 EventLogOwner::Account(account_id, account_row.row_id)
159 }
160 (_, Some(folder_row)) => EventLogOwner::Folder(
161 account_id,
162 FolderRecord::from_row(folder_row).await?,
163 ),
164 })
165 }
166
167 async fn insert_records(
168 &mut self,
169 records: &[EventRecord],
170 delete_before: bool,
171 ) -> Result<(), E> {
172 if records.is_empty() {
173 return Ok(());
174 }
175
176 let mut span = CommitSpan {
177 before: self.tree.last_commit(),
178 after: None,
179 };
180
181 let log_type = self.log_type;
182 let mut insert_rows = Vec::new();
183 let mut commits = Vec::new();
184 for record in records {
185 commits.push(*record.commit());
186 insert_rows.push(EventRecordRow::new(record)?);
187 }
188
189 let id = (&self.owner).into();
190
191 self.client
193 .conn_mut(move |conn| {
194 let tx = conn.transaction()?;
195 let events = EventEntity::new(&tx);
196 if delete_before {
197 events.delete_all_events(log_type, id)?;
198 }
199 let ids = events.insert_events(
200 log_type,
201 id,
202 insert_rows.as_slice(),
203 )?;
204 tx.commit()?;
205 Ok(ids)
206 })
207 .await
208 .map_err(Error::from)?;
209
210 if delete_before {
211 self.tree = CommitTree::new();
212 }
213
214 let mut hashes =
216 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
217 self.tree.append(&mut hashes);
218 self.tree.commit();
219
220 span.after = self.tree.last_commit();
221
222 changes_feed().send_replace(LocalChangeEvent::AccountModified {
223 account_id: *self.owner.account_id(),
224 log_type: self.log_type,
225 commit_span: span,
226 });
227
228 Ok(())
229 }
230}
231
232impl<E> DatabaseEventLog<AccountEvent, E>
233where
234 E: std::error::Error
235 + std::fmt::Debug
236 + From<sos_core::Error>
237 + From<crate::Error>
238 + From<std::io::Error>
239 + Send
240 + Sync
241 + 'static,
242{
243 pub async fn new_account(
245 client: Client,
246 account_id: AccountId,
247 ) -> Result<Self, E> {
248 let log_type = EventLogType::Account;
249 let owner =
250 Self::lookup_owner(&client, &account_id, &log_type).await?;
251 Ok(Self {
252 owner,
253 client,
254 log_type,
255 tree: CommitTree::new(),
256 marker: std::marker::PhantomData,
257 })
258 }
259}
260
261impl<E> DatabaseEventLog<WriteEvent, E>
262where
263 E: std::error::Error
264 + std::fmt::Debug
265 + From<sos_core::Error>
266 + From<crate::Error>
267 + From<std::io::Error>
268 + Send
269 + Sync
270 + 'static,
271{
272 pub async fn new_folder(
274 client: Client,
275 account_id: AccountId,
276 folder_id: VaultId,
277 ) -> Result<Self, E> {
278 let log_type = EventLogType::Folder(folder_id);
279 let owner =
280 Self::lookup_owner(&client, &account_id, &log_type).await?;
281
282 Ok(Self {
283 owner,
284 client,
285 log_type,
286 tree: CommitTree::new(),
287 marker: std::marker::PhantomData,
288 })
289 }
290}
291
292impl<E> DatabaseEventLog<DeviceEvent, E>
293where
294 E: std::error::Error
295 + std::fmt::Debug
296 + From<sos_core::Error>
297 + From<crate::Error>
298 + From<std::io::Error>
299 + Send
300 + Sync
301 + 'static,
302{
303 pub async fn new_device(
305 client: Client,
306 account_id: AccountId,
307 ) -> Result<Self, E> {
308 let log_type = EventLogType::Device;
309 let owner =
310 Self::lookup_owner(&client, &account_id, &log_type).await?;
311 Ok(Self {
312 owner,
313 client,
314 log_type,
315 tree: CommitTree::new(),
316 marker: std::marker::PhantomData,
317 })
318 }
319}
320
321#[cfg(feature = "files")]
322impl<E> DatabaseEventLog<FileEvent, E>
323where
324 E: std::error::Error
325 + std::fmt::Debug
326 + From<sos_core::Error>
327 + From<crate::Error>
328 + From<std::io::Error>
329 + Send
330 + Sync
331 + 'static,
332{
333 pub async fn new_file(
335 client: Client,
336 account_id: AccountId,
337 ) -> Result<Self, Error> {
338 let log_type = EventLogType::Files;
339 let owner =
340 Self::lookup_owner(&client, &account_id, &log_type).await?;
341 Ok(Self {
342 owner,
343 client,
344 log_type,
345 tree: CommitTree::new(),
346 marker: std::marker::PhantomData,
347 })
348 }
349}
350
351#[async_trait]
352impl<T, E> EventLog<T> for DatabaseEventLog<T, E>
353where
354 T: Default + Encodable + Decodable + Send + Sync + 'static,
355 E: std::error::Error
356 + std::fmt::Debug
357 + From<sos_core::Error>
358 + From<crate::Error>
359 + From<std::io::Error>
360 + Send
361 + Sync
362 + 'static,
363{
364 type Error = E;
365
366 async fn record_stream(
367 &self,
368 reverse: bool,
369 ) -> BoxStream<'async_trait, Result<EventRecord, Self::Error>> {
370 let (tx, rx) = tokio::sync::mpsc::channel(8);
371
372 let id: i64 = (&self.owner).into();
373 let log_type = self.log_type;
374 let client = self.client.clone();
375
376 tokio::spawn(async move {
377 client
378 .conn_and_then(move |conn| {
379 let query =
380 EventEntity::find_all_query(log_type, reverse);
381
382 let mut stmt = conn.prepare_cached(&query.as_string())?;
383
384 fn convert_row(
385 row: &Row<'_>,
386 ) -> Result<EventRecordRow, crate::Error>
387 {
388 Ok(row.try_into()?)
389 }
390
391 let rows = stmt.query_and_then([id], convert_row)?;
392
393 for row in rows {
394 if tx.is_closed() {
395 break;
396 }
397 let row = row?;
398 let record: EventRecord = row.try_into()?;
399 let inner_tx = tx.clone();
400 let res = futures::executor::block_on(async move {
401 inner_tx.send(Ok(record)).await
402 });
403 if let Err(e) = res {
404 tracing::error!(error = %e);
405 break;
406 }
407 }
408
409 Ok::<_, Error>(())
410 })
411 .await?;
412 Ok::<_, Error>(())
413 });
414
415 ReceiverStream::new(rx).boxed()
416 }
417
418 async fn event_stream(
419 &self,
420 reverse: bool,
421 ) -> BoxStream<'async_trait, Result<(EventRecord, T), Self::Error>> {
422 self.record_stream(reverse)
423 .await
424 .try_filter_map(|record| async {
425 let event = record.decode_event::<T>().await?;
426 Ok(Some((record, event)))
427 })
428 .boxed()
429 }
430
431 async fn diff_checked(
432 &self,
433 commit: Option<CommitHash>,
434 checkpoint: CommitProof,
435 ) -> Result<Diff<T>, Self::Error> {
436 let patch = self.diff_events(commit.as_ref()).await?;
437 Ok(Diff::<T> {
438 last_commit: commit,
439 patch,
440 checkpoint,
441 })
442 }
443
444 async fn diff_unchecked(&self) -> Result<Diff<T>, Self::Error> {
445 let patch = self.diff_events(None).await?;
446 Ok(Diff::<T> {
447 last_commit: None,
448 patch,
449 checkpoint: self.tree().head()?,
450 })
451 }
452
453 async fn diff_events(
454 &self,
455 commit: Option<&CommitHash>,
456 ) -> Result<Patch<T>, Self::Error> {
457 let records = self.diff_records(commit).await?;
458 Ok(Patch::new(records))
459 }
460
461 fn tree(&self) -> &CommitTree {
462 &self.tree
463 }
464
465 async fn rewind(
466 &mut self,
467 commit: &CommitHash,
468 ) -> Result<Vec<EventRecord>, Self::Error> {
469 let (records, tree) = {
470 let stream = self.record_stream(true).await;
471 pin_mut!(stream);
472
473 let mut records = Vec::new();
474 let mut tree = CommitTree::new();
475 let mut new_len = 0;
476
477 while let Some(record) = stream.next().await {
478 let record = record?;
479 if record.commit() == commit {
480 let mut leaves = self.tree().leaves().unwrap_or_default();
481 new_len = leaves.len() - records.len();
482 leaves.truncate(new_len);
483
484 tree.append(&mut leaves);
485 tree.commit();
486
487 break;
488 }
489 records.push(record);
490 }
491
492 if new_len == 0 {
493 return Err(Error::CommitNotFound(*commit).into());
494 }
495
496 (records, tree)
497 };
498
499 let delete_ids =
500 records.iter().map(|r| *r.commit()).collect::<Vec<_>>();
501
502 let log_type = self.log_type;
504 self.client
505 .conn_mut(move |conn| {
506 let tx = conn.transaction()?;
507 let events = EventEntity::new(&tx);
508 for id in delete_ids {
509 events.delete_one(log_type, &id)?;
510 }
511 tx.commit()?;
512 Ok(())
513 })
514 .await
515 .map_err(Error::from)?;
516
517 self.tree = tree;
519
520 Ok(records)
521 }
522
523 async fn load_tree(&mut self) -> Result<(), Self::Error> {
524 let log_type = self.log_type;
525 let id = (&self.owner).into();
526 let commits = self
527 .client
528 .conn_and_then(move |conn| {
529 let events = EventEntity::new(&conn);
530 let commits = events.load_commits(log_type, id)?;
531 Ok::<_, Error>(commits)
532 })
533 .await?;
534 let mut tree = CommitTree::new();
535 for commit in commits {
536 let record: CommitRecord = commit.try_into()?;
537 tree.insert(*record.commit_hash.as_ref());
538 }
539 tree.commit();
540 self.tree = tree;
541 Ok(())
542 }
543
544 async fn clear(&mut self) -> Result<(), Self::Error> {
545 let log_type = self.log_type;
546 let id = (&self.owner).into();
547 self.client
548 .conn_mut(move |conn| {
549 let tx = conn.transaction()?;
550 let events = EventEntity::new(&tx);
551 events.delete_all_events(log_type, id)?;
552 tx.commit()?;
553 Ok(())
554 })
555 .await
556 .map_err(Error::from)?;
557 self.tree = CommitTree::new();
558 Ok(())
559 }
560
561 async fn apply(&mut self, events: &[T]) -> Result<(), Self::Error> {
562 let mut records = Vec::with_capacity(events.len());
563 for event in events {
564 records.push(EventRecord::encode_event(event).await?);
565 }
566 self.apply_records(records).await
567 }
568
569 async fn apply_records(
570 &mut self,
571 records: Vec<EventRecord>,
572 ) -> Result<(), Self::Error> {
573 self.insert_records(records.as_slice(), false).await
574 }
575
576 async fn patch_checked(
577 &mut self,
578 commit_proof: &CommitProof,
579 patch: &Patch<T>,
580 ) -> Result<CheckedPatch, Self::Error> {
581 let comparison = self.tree().compare(commit_proof)?;
582 match comparison {
583 Comparison::Equal => {
584 self.patch_unchecked(patch).await?;
585 let proof = self.tree().head()?;
586 Ok(CheckedPatch::Success(proof))
587 }
588 Comparison::Contains(indices) => {
589 let head = self.tree().head()?;
590 let contains = self.tree().proof(&indices)?;
591 Ok(CheckedPatch::Conflict {
592 head,
593 contains: Some(contains),
594 })
595 }
596 Comparison::Unknown => {
597 let head = self.tree().head()?;
598 Ok(CheckedPatch::Conflict {
599 head,
600 contains: None,
601 })
602 }
603 }
604 }
605
606 async fn replace_all_events(
607 &mut self,
608 diff: &Diff<T>,
609 ) -> Result<(), Self::Error> {
610 self.insert_records(diff.patch.records(), true).await?;
611
612 let computed = self.tree().head()?;
613 let verified = computed == diff.checkpoint;
614 if !verified {
615 return Err(Error::CheckpointVerification {
616 checkpoint: diff.checkpoint.root,
617 computed: computed.root,
618 }
619 .into());
620 }
621
622 Ok(())
623 }
624
625 async fn patch_unchecked(
626 &mut self,
627 patch: &Patch<T>,
628 ) -> Result<(), Self::Error> {
629 self.apply_records(patch.records().to_vec()).await
630 }
631
632 async fn diff_records(
633 &self,
634 commit: Option<&CommitHash>,
635 ) -> Result<Vec<EventRecord>, Self::Error> {
636 let mut events = Vec::new();
637
638 let stream = self.record_stream(true).await;
639 pin_mut!(stream);
640
641 while let Some(record) = stream.next().await {
642 let record = record?;
643 if let Some(commit) = commit {
644 if record.commit() == commit {
645 return Ok(events);
646 }
647 }
648 events.insert(0, record);
653 }
654
655 if let Some(commit) = commit {
659 return Err(Error::CommitNotFound(*commit).into());
660 }
661
662 Ok(events)
663 }
664
665 fn version(&self) -> u16 {
666 match &self.owner {
667 EventLogOwner::Folder(_, folder) => *folder.summary.version(),
668 EventLogOwner::Account(_, _) => VERSION1,
669 }
670 }
671}