1use crate::{
17 formats::{
18 read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19 FormatStreamIterator,
20 },
21 Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_trait::async_trait;
25use binary_stream::futures::{BinaryReader, Decodable, Encodable};
26use futures::{stream::BoxStream, StreamExt, TryStreamExt};
27use sos_core::{
28 commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
29 encode,
30 encoding::{encoding_options, VERSION1},
31 events::{
32 changes_feed,
33 patch::{CheckedPatch, Diff, Patch},
34 AccountEvent, DeviceEvent, EventLogType, EventRecord,
35 LocalChangeEvent, WriteEvent,
36 },
37 AccountId,
38};
39use sos_vfs::{self as vfs, File, OpenOptions};
40use std::result::Result as StdResult;
41use std::{
42 io::{Cursor, SeekFrom},
43 path::{Path, PathBuf},
44};
45use tokio::{
46 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
47 sync::mpsc,
48};
49use tokio_stream::wrappers::ReceiverStream;
50
51#[cfg(feature = "files")]
52use sos_core::events::FileEvent;
53
54pub use sos_core::events::EventLog;
55
/// File-backed event log whose events are [`AccountEvent`] values.
pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;

/// File-backed event log whose events are [`DeviceEvent`] values.
pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;

/// File-backed event log whose events are [`WriteEvent`] values.
pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;

/// File-backed event log whose events are [`FileEvent`] values.
///
/// Only available when the `files` feature is enabled.
#[cfg(feature = "files")]
pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;
68
/// Boxed iterator over the [`EventLogRecord`] entries of a log file, as
/// returned by `FileSystemEventLog::iter`.
type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
71
72async fn read_event_buffer(
75 file_path: impl AsRef<Path>,
76 record: &EventLogRecord,
77) -> Result<Vec<u8>> {
78 let file = File::open(file_path.as_ref()).await?;
79 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
80
81 let offset = record.value();
82 let row_len = offset.end - offset.start;
83
84 guard.seek(SeekFrom::Start(offset.start)).await?;
85
86 let mut buf = vec![0u8; row_len as usize];
87 guard.read_exact(&mut buf).await?;
88
89 Ok(buf)
90}
91
/// Event log that stores its records in a single file on disc and keeps
/// a commit tree of the record hashes in memory.
///
/// `T` is the event type stored in the log and `E` is the error type
/// returned by its operations.
pub struct FileSystemEventLog<T, E>
where
    T: Default + Encodable + Decodable + Send + Sync,
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Account identifier included in change notifications.
    account_id: AccountId,
    /// Log type included in change notifications.
    log_type: EventLogType,
    /// In-memory commit tree built from the record commit hashes.
    tree: CommitTree,
    /// Path to the log file.
    data: PathBuf,
    /// Identity bytes written at the start of the log file.
    identity: &'static [u8],
    /// Optional encoding version written after the identity bytes.
    version: Option<u16>,
    /// Ties the event and error type parameters to the struct.
    phantom: std::marker::PhantomData<(T, E)>,
}
117
118#[async_trait]
119impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
120where
121 T: Default + Encodable + Decodable + Send + Sync + 'static,
122 E: std::error::Error
123 + std::fmt::Debug
124 + From<sos_core::Error>
125 + From<crate::Error>
126 + From<std::io::Error>
127 + Send
128 + Sync
129 + 'static,
130{
131 type Error = E;
132
133 async fn record_stream(
134 &self,
135 reverse: bool,
136 ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
137 let (tx, rx) =
138 mpsc::channel::<StdResult<EventRecord, Self::Error>>(8);
139
140 let mut it =
141 self.iter(reverse).await.expect("to initialize iterator");
142 let file_path = self.data.clone();
143 tokio::task::spawn(async move {
144 while let Some(record) = it.next().await? {
145 if tx.is_closed() {
146 break;
147 }
148 let event_buffer =
149 read_event_buffer(file_path.clone(), &record).await?;
150 let event_record = record.into_event_record(event_buffer);
151 if let Err(e) = tx.send(Ok(event_record)).await {
152 tracing::error!(error = %e);
153 }
154 }
155 Ok::<_, Self::Error>(())
156 });
157
158 ReceiverStream::new(rx).boxed()
159 }
160
161 async fn event_stream(
162 &self,
163 reverse: bool,
164 ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
165 {
166 self.record_stream(reverse)
167 .await
168 .try_filter_map(|record| async {
169 let event = record.decode_event::<T>().await?;
170 Ok(Some((record, event)))
171 })
172 .boxed()
173 }
174
175 async fn diff_checked(
176 &self,
177 commit: Option<CommitHash>,
178 checkpoint: CommitProof,
179 ) -> StdResult<Diff<T>, Self::Error> {
180 let patch = self.diff_events(commit.as_ref()).await?;
181 Ok(Diff::<T> {
182 last_commit: commit,
183 patch,
184 checkpoint,
185 })
186 }
187
188 async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
189 let patch = self.diff_events(None).await?;
190 Ok(Diff::<T> {
191 last_commit: None,
192 patch,
193 checkpoint: self.tree().head()?,
194 })
195 }
196
197 async fn diff_events(
198 &self,
199 commit: Option<&CommitHash>,
200 ) -> StdResult<Patch<T>, Self::Error> {
201 let records = self.diff_records(commit).await?;
202 Ok(Patch::new(records))
203 }
204
    /// Borrow the in-memory commit tree of this log.
    fn tree(&self) -> &CommitTree {
        &self.tree
    }
208
209 async fn rewind(
210 &mut self,
211 commit: &CommitHash,
212 ) -> StdResult<Vec<EventRecord>, Self::Error> {
213 let mut length = vfs::metadata(&self.data).await?.len();
214 let mut it = self.iter(true).await?;
216
217 tracing::trace!(length = %length, "event_log::rewind");
218
219 let mut records = Vec::new();
220
221 while let Some(record) = it.next().await? {
222 if &record.commit() == commit.as_ref() {
224 let mut leaves = self.tree().leaves().unwrap_or_default();
226 if leaves.len() > records.len() {
227 let new_len = leaves.len() - records.len();
228 leaves.truncate(new_len);
229 let mut tree = CommitTree::new();
230 tree.append(&mut leaves);
231 tree.commit();
232 self.tree = tree;
233 } else {
234 return Err(Error::RewindLeavesLength.into());
235 }
236
237 let file =
239 OpenOptions::new().write(true).open(&self.data).await?;
240 let mut guard =
241 file.lock_write().await.map_err(|e| e.error)?;
242 guard.inner_mut().set_len(length).await?;
243
244 return Ok(records);
245 }
246
247 let byte_length = record.byte_length();
249
250 if byte_length < length {
251 length -= byte_length;
252 }
253
254 let event_buffer = read_event_buffer(&self.data, &record).await?;
255
256 let event_record = record.into_event_record(event_buffer);
257 records.push(event_record);
258
259 tracing::trace!(
260 length = %length,
261 byte_length = %byte_length,
262 num_pruned = %records.len(),
263 "event_log::rewind",
264 );
265 }
266
267 Err(Error::CommitNotFound(*commit).into())
268 }
269
270 async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
271 let mut commits = Vec::new();
272
273 let mut it = self.iter(false).await?;
274 while let Some(record) = it.next().await? {
275 commits.push(record.commit());
276 }
277
278 self.tree = CommitTree::new();
279 self.tree.append(&mut commits);
280 self.tree.commit();
281 Ok(())
282 }
283
    /// Remove all events from the log.
    ///
    /// Truncates the log file (see `truncate`) and then replaces the
    /// in-memory commit tree with an empty one.
    async fn clear(&mut self) -> StdResult<(), Self::Error> {
        self.truncate().await?;
        self.tree = CommitTree::new();
        Ok(())
    }
289
290 async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
291 let mut records = Vec::with_capacity(events.len());
292 for event in events {
293 records.push(EventRecord::encode_event(event).await?);
294 }
295 self.apply_records(records).await
296 }
297
298 async fn apply_records(
299 &mut self,
300 records: Vec<EventRecord>,
301 ) -> StdResult<(), Self::Error> {
302 if records.is_empty() {
303 return Ok(());
304 }
305
306 let mut span = CommitSpan {
307 before: self.tree.last_commit(),
308 after: None,
309 };
310
311 let mut buffer: Vec<u8> = Vec::new();
312 let mut commits = Vec::new();
313 let mut last_commit_hash = self.tree().last_commit();
314
315 for mut record in records {
316 record.set_last_commit(last_commit_hash);
317 let mut buf = encode(&record).await?;
318 buffer.append(&mut buf);
319 last_commit_hash = Some(*record.commit());
320 commits.push(*record.commit());
321 }
322
323 #[allow(unused_mut)]
324 let mut file = OpenOptions::new()
325 .read(true)
329 .write(true)
330 .append(true)
331 .open(&self.data)
332 .await?;
333
334 #[cfg(target_arch = "wasm32")]
335 {
336 use tokio::io::AsyncSeekExt;
337 file.seek(SeekFrom::End(0)).await?;
338 }
339
340 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
341 match guard.write_all(&buffer).await {
342 Ok(_) => {
343 guard.flush().await?;
344 let mut hashes =
345 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
346 self.tree.append(&mut hashes);
347 self.tree.commit();
348
349 span.after = self.tree.last_commit();
350
351 changes_feed().send_replace(
352 LocalChangeEvent::AccountModified {
353 account_id: self.account_id,
354 log_type: self.log_type,
355 commit_span: span,
356 },
357 );
358
359 Ok(())
360 }
361 Err(e) => Err(e.into()),
362 }
363 }
364
365 async fn patch_checked(
366 &mut self,
367 commit_proof: &CommitProof,
368 patch: &Patch<T>,
369 ) -> StdResult<CheckedPatch, Self::Error> {
370 let comparison = self.tree().compare(commit_proof)?;
371
372 match comparison {
373 Comparison::Equal => {
374 self.patch_unchecked(patch).await?;
375 let proof = self.tree().head()?;
376 Ok(CheckedPatch::Success(proof))
377 }
378 Comparison::Contains(indices) => {
379 let head = self.tree().head()?;
380 let contains = self.tree().proof(&indices)?;
381 Ok(CheckedPatch::Conflict {
382 head,
383 contains: Some(contains),
384 })
385 }
386 Comparison::Unknown => {
387 let head = self.tree().head()?;
388 Ok(CheckedPatch::Conflict {
389 head,
390 contains: None,
391 })
392 }
393 }
394 }
395
396 async fn replace_all_events(
397 &mut self,
398 diff: &Diff<T>,
399 ) -> StdResult<(), Self::Error> {
400 let snapshot = self.try_create_snapshot().await?;
402
403 self.clear().await?;
405
406 self.patch_unchecked(&diff.patch).await?;
408
409 let computed = self.tree().head()?;
411 let verified = computed == diff.checkpoint;
412
413 let mut rollback_completed = false;
414 match (verified, &snapshot) {
415 (false, Some(snapshot_path)) => {
417 rollback_completed =
418 self.try_rollback_snapshot(snapshot_path).await.is_ok();
419 }
420 (true, Some(snapshot_path)) => {
422 vfs::remove_file(snapshot_path).await?;
423 }
424 _ => {}
425 }
426
427 if !verified {
428 return Err(Error::CheckpointVerification {
429 checkpoint: diff.checkpoint.root,
430 computed: computed.root,
431 snapshot,
432 rollback_completed,
433 }
434 .into());
435 }
436
437 Ok(())
438 }
439
440 async fn patch_unchecked(
441 &mut self,
442 patch: &Patch<T>,
443 ) -> StdResult<(), Self::Error> {
444 self.apply_records(patch.records().to_vec()).await
450 }
451
452 async fn diff_records(
453 &self,
454 commit: Option<&CommitHash>,
455 ) -> StdResult<Vec<EventRecord>, Self::Error> {
456 let mut events = Vec::new();
457 let mut it = self.iter(true).await?;
459 while let Some(record) = it.next().await? {
460 if let Some(commit) = commit {
461 if &record.commit() == commit.as_ref() {
462 return Ok(events);
463 }
464 }
465 let buffer = read_event_buffer(&self.data, &record).await?;
466 let event_record = record.into_event_record(buffer);
471 events.insert(0, event_record);
472 }
473
474 if let Some(commit) = commit {
478 return Err(Error::CommitNotFound(*commit).into());
479 }
480
481 Ok(events)
482 }
483
    /// Encoding version of this log; falls back to `VERSION1` when no
    /// version is set.
    fn version(&self) -> u16 {
        self.version.unwrap_or(VERSION1)
    }
487}
488
489impl<T, E> FileSystemEventLog<T, E>
490where
491 T: Default + Encodable + Decodable + Send + Sync + 'static,
492 E: std::error::Error
493 + std::fmt::Debug
494 + From<sos_core::Error>
495 + From<crate::Error>
496 + From<std::io::Error>
497 + Send
498 + Sync
499 + 'static,
500{
    /// Path to the log file.
    pub fn file_path(&self) -> &PathBuf {
        &self.data
    }
505
506 async fn truncate(&mut self) -> StdResult<(), E> {
507 use tokio::io::{
508 AsyncSeekExt as TokioAsyncSeekExt,
509 AsyncWriteExt as TokioAsyncWriteExt,
510 };
511
512 let mut file = OpenOptions::new()
515 .write(true)
516 .truncate(true)
517 .open(&self.data)
518 .await?;
519
520 file.seek(SeekFrom::Start(0)).await?;
521
522 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
523 guard.write_all(self.identity).await?;
524 if let Some(version) = self.version {
525 guard.write_all(&version.to_le_bytes()).await?;
526 }
527 guard.flush().await?;
528
529 Ok(())
530 }
531
532 #[doc(hidden)]
534 pub async fn decode_event(
535 &self,
536 item: &EventLogRecord,
537 ) -> StdResult<T, E> {
538 let value = item.value();
539
540 let file = File::open(&self.data).await?;
541 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
542
543 guard.seek(SeekFrom::Start(value.start)).await?;
544 let mut buffer = vec![0; (value.end - value.start) as usize];
545 guard.read_exact(buffer.as_mut_slice()).await?;
546
547 let mut stream = BufReader::new(Cursor::new(&mut buffer));
548 let mut reader = BinaryReader::new(&mut stream, encoding_options());
549 let mut event: T = Default::default();
550 event.decode(&mut reader).await?;
551 Ok(event)
552 }
553
554 pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
556 let content_offset = self.header_len() as u64;
557 let read_stream = File::open(&self.data).await?;
558 let it: Iter = Box::new(
559 FormatStream::<EventLogRecord, File>::new_file(
560 read_stream,
561 self.identity,
562 true,
563 Some(content_offset),
564 reverse,
565 )
566 .await?,
567 );
568 Ok(it)
569 }
570
571 #[doc(hidden)]
574 fn header_len(&self) -> usize {
575 let mut len = self.identity.len();
576 if self.version.is_some() {
577 len += (u16::BITS / 8) as usize;
578 }
579 len
580 }
581
582 async fn initialize_event_log<P: AsRef<Path>>(
612 path: P,
613 identity: &'static [u8],
614 encoding_version: Option<u16>,
615 ) -> StdResult<(), E> {
616 let file = OpenOptions::new()
617 .create(true)
618 .write(true)
619 .open(path.as_ref())
620 .await?;
621
622 let size = vfs::metadata(path.as_ref()).await?.len();
623 if size == 0 {
624 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
625 let mut header = identity.to_vec();
626 if let Some(version) = encoding_version {
627 header.extend_from_slice(&version.to_le_bytes());
628 }
629 guard.write_all(&header).await?;
630 guard.flush().await?;
631 }
632
633 Ok(())
634 }
635
636 #[doc(hidden)]
637 async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
638 if let Some(root) = self.tree().root() {
639 let mut snapshot_path = self.data.clone();
640 snapshot_path.set_extension(&format!("snapshot-{}", root));
641
642 let metadata = vfs::metadata(&self.data).await?;
643 tracing::debug!(
644 num_events = %self.tree().len(),
645 file_size = %metadata.len(),
646 source = %self.data.display(),
647 snapshot = %snapshot_path.display(),
648 "event_log::snapshot::create"
649 );
650
651 vfs::copy(&self.data, &snapshot_path).await?;
652 Ok(Some(snapshot_path))
653 } else {
654 Ok(None)
655 }
656 }
657
658 #[doc(hidden)]
659 async fn try_rollback_snapshot(
660 &mut self,
661 snapshot_path: &PathBuf,
662 ) -> StdResult<(), E> {
663 let source_path = self.data.clone();
664
665 let metadata = vfs::metadata(snapshot_path).await?;
666 tracing::debug!(
667 file_size = %metadata.len(),
668 source = %source_path.display(),
669 snapshot = %snapshot_path.display(),
670 "event_log::snapshot::rollback"
671 );
672
673 vfs::remove_file(&source_path).await?;
674 vfs::rename(snapshot_path, &source_path).await?;
675 self.load_tree().await?;
676
677 Ok(())
678 }
679}
680
681impl<E> FileSystemEventLog<WriteEvent, E>
682where
683 E: std::error::Error
684 + std::fmt::Debug
685 + From<sos_core::Error>
686 + From<crate::Error>
687 + From<std::io::Error>
688 + Send
689 + Sync
690 + 'static,
691{
692 pub async fn new_folder<P: AsRef<Path>>(
694 path: P,
695 account_id: AccountId,
696 log_type: EventLogType,
697 ) -> StdResult<Self, E> {
698 use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
699 Self::initialize_event_log(
703 path.as_ref(),
704 &FOLDER_EVENT_LOG_IDENTITY,
705 None,
706 )
707 .await?;
708
709 read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
710 .await?;
711
712 Ok(Self {
713 data: path.as_ref().to_path_buf(),
714 tree: Default::default(),
715 log_type,
716 account_id,
717 identity: &FOLDER_EVENT_LOG_IDENTITY,
718 version: None,
719 phantom: std::marker::PhantomData,
720 })
721 }
722}
723
724impl<E> FileSystemEventLog<AccountEvent, E>
725where
726 E: std::error::Error
727 + std::fmt::Debug
728 + From<sos_core::Error>
729 + From<crate::Error>
730 + From<std::io::Error>
731 + Send
732 + Sync
733 + 'static,
734{
735 pub async fn new_account<P: AsRef<Path>>(
737 path: P,
738 account_id: AccountId,
739 ) -> StdResult<Self, E> {
740 use sos_core::{
741 constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
742 };
743 Self::initialize_event_log(
744 path.as_ref(),
745 &ACCOUNT_EVENT_LOG_IDENTITY,
746 Some(VERSION),
747 )
748 .await?;
749
750 read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
751 .await?;
752
753 Ok(Self {
754 account_id,
755 log_type: EventLogType::Account,
756 data: path.as_ref().to_path_buf(),
757 tree: Default::default(),
758 identity: &ACCOUNT_EVENT_LOG_IDENTITY,
759 version: Some(VERSION),
760 phantom: std::marker::PhantomData,
761 })
762 }
763}
764
765impl<E> FileSystemEventLog<DeviceEvent, E>
766where
767 E: std::error::Error
768 + std::fmt::Debug
769 + From<sos_core::Error>
770 + From<crate::Error>
771 + From<std::io::Error>
772 + Send
773 + Sync
774 + 'static,
775{
776 pub async fn new_device(
778 path: impl AsRef<Path>,
779 account_id: AccountId,
780 ) -> StdResult<Self, E> {
781 use sos_core::{
782 constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
783 };
784
785 Self::initialize_event_log(
786 path.as_ref(),
787 &DEVICE_EVENT_LOG_IDENTITY,
788 Some(VERSION),
789 )
790 .await?;
791
792 read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
793 .await?;
794
795 Ok(Self {
796 log_type: EventLogType::Device,
797 account_id,
798 data: path.as_ref().to_path_buf(),
799 tree: Default::default(),
800 identity: &DEVICE_EVENT_LOG_IDENTITY,
801 version: Some(VERSION),
802 phantom: std::marker::PhantomData,
803 })
804 }
805}
806
#[cfg(feature = "files")]
impl<E> FileSystemEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Create a file event log backed by the file at `path`.
    ///
    /// The file is created with the file-event identity bytes and the
    /// current encoding version when it does not exist yet, and its
    /// identity bytes are checked afterwards. The returned log starts
    /// with a default commit tree; the records in the file are not
    /// loaded here.
    pub async fn new_file(
        path: impl AsRef<Path>,
        account_id: AccountId,
    ) -> StdResult<Self, E> {
        use sos_core::{
            constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
        };

        let path = path.as_ref();
        let version = Some(VERSION);

        Self::initialize_event_log(path, &FILE_EVENT_LOG_IDENTITY, version)
            .await?;
        read_file_identity_bytes(path, &FILE_EVENT_LOG_IDENTITY).await?;

        let log = Self {
            account_id,
            log_type: EventLogType::Files,
            tree: CommitTree::default(),
            data: path.to_path_buf(),
            identity: &FILE_EVENT_LOG_IDENTITY,
            version,
            phantom: std::marker::PhantomData,
        };
        Ok(log)
    }
}