1use crate::{
17 formats::{
18 read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19 FormatStreamIterator,
20 },
21 Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_trait::async_trait;
25use binary_stream::futures::{BinaryReader, Decodable, Encodable};
26use futures::{stream::BoxStream, StreamExt, TryStreamExt};
27use sos_core::{
28 commit::{CommitHash, CommitProof, CommitTree, Comparison},
29 encode,
30 encoding::{encoding_options, VERSION1},
31 events::{
32 patch::{CheckedPatch, Diff, Patch},
33 AccountEvent, DeviceEvent, EventRecord, WriteEvent,
34 },
35};
36use sos_vfs::{self as vfs, File, OpenOptions};
37use std::result::Result as StdResult;
38use std::{
39 io::{Cursor, SeekFrom},
40 path::{Path, PathBuf},
41};
42use tokio::{
43 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
44 sync::mpsc,
45};
46use tokio_stream::wrappers::ReceiverStream;
47
48#[cfg(feature = "files")]
49use sos_core::events::FileEvent;
50
51pub use sos_core::events::EventLog;
52
53pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;
55
56pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;
58
59pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;
61
62#[cfg(feature = "files")]
64pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;
65
66type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
68
69async fn read_event_buffer(
72 file_path: impl AsRef<Path>,
73 record: &EventLogRecord,
74) -> Result<Vec<u8>> {
75 let file = File::open(file_path.as_ref()).await?;
76 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
77
78 let offset = record.value();
79 let row_len = offset.end - offset.start;
80
81 guard.seek(SeekFrom::Start(offset.start)).await?;
82
83 let mut buf = vec![0u8; row_len as usize];
84 guard.read_exact(&mut buf).await?;
85
86 Ok(buf)
87}
88
89pub struct FileSystemEventLog<T, E>
95where
96 T: Default + Encodable + Decodable + Send + Sync,
97 E: std::error::Error
98 + std::fmt::Debug
99 + From<sos_core::Error>
100 + From<crate::Error>
101 + From<std::io::Error>
102 + Send
103 + Sync
104 + 'static,
105{
106 tree: CommitTree,
107 data: PathBuf,
108 identity: &'static [u8],
109 version: Option<u16>,
110 phantom: std::marker::PhantomData<(T, E)>,
111}
112
113#[async_trait]
114impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
115where
116 T: Default + Encodable + Decodable + Send + Sync + 'static,
117 E: std::error::Error
118 + std::fmt::Debug
119 + From<sos_core::Error>
120 + From<crate::Error>
121 + From<std::io::Error>
122 + Send
123 + Sync
124 + 'static,
125{
126 type Error = E;
127
128 async fn record_stream(
129 &self,
130 reverse: bool,
131 ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
132 let (tx, rx) =
133 mpsc::channel::<StdResult<EventRecord, Self::Error>>(8);
134
135 let mut it =
136 self.iter(reverse).await.expect("to initialize iterator");
137 let file_path = self.data.clone();
138 tokio::task::spawn(async move {
139 while let Some(record) = it.next().await? {
140 let event_buffer =
141 read_event_buffer(file_path.clone(), &record).await?;
142 let event_record = record.into_event_record(event_buffer);
143 if let Err(e) = tx.send(Ok(event_record)).await {
144 tracing::error!(error = %e);
145 }
146 }
147 Ok::<_, Self::Error>(())
148 });
149
150 ReceiverStream::new(rx).boxed()
151 }
152
153 async fn event_stream(
154 &self,
155 reverse: bool,
156 ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
157 {
158 self.record_stream(reverse)
159 .await
160 .try_filter_map(|record| async {
161 let event = record.decode_event::<T>().await?;
162 Ok(Some((record, event)))
163 })
164 .boxed()
165 }
166
167 async fn diff_checked(
168 &self,
169 commit: Option<CommitHash>,
170 checkpoint: CommitProof,
171 ) -> StdResult<Diff<T>, Self::Error> {
172 let patch = self.diff_events(commit.as_ref()).await?;
173 Ok(Diff::<T> {
174 last_commit: commit,
175 patch,
176 checkpoint,
177 })
178 }
179
180 async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
181 let patch = self.diff_events(None).await?;
182 Ok(Diff::<T> {
183 last_commit: None,
184 patch,
185 checkpoint: self.tree().head()?,
186 })
187 }
188
189 async fn diff_events(
190 &self,
191 commit: Option<&CommitHash>,
192 ) -> StdResult<Patch<T>, Self::Error> {
193 let records = self.diff_records(commit).await?;
194 Ok(Patch::new(records))
195 }
196
197 fn tree(&self) -> &CommitTree {
198 &self.tree
199 }
200
201 async fn rewind(
202 &mut self,
203 commit: &CommitHash,
204 ) -> StdResult<Vec<EventRecord>, Self::Error> {
205 let mut length = vfs::metadata(&self.data).await?.len();
206 let mut it = self.iter(true).await?;
208
209 tracing::trace!(length = %length, "event_log::rewind");
210
211 let mut records = Vec::new();
212
213 while let Some(record) = it.next().await? {
214 if &record.commit() == commit.as_ref() {
216 let mut leaves = self.tree().leaves().unwrap_or_default();
218 if leaves.len() > records.len() {
219 let new_len = leaves.len() - records.len();
220 leaves.truncate(new_len);
221 let mut tree = CommitTree::new();
222 tree.append(&mut leaves);
223 tree.commit();
224 self.tree = tree;
225 } else {
226 return Err(Error::RewindLeavesLength.into());
227 }
228
229 let file =
231 OpenOptions::new().write(true).open(&self.data).await?;
232 let mut guard =
233 file.lock_write().await.map_err(|e| e.error)?;
234 guard.inner_mut().set_len(length).await?;
235
236 return Ok(records);
237 }
238
239 let byte_length = record.byte_length();
241
242 if byte_length < length {
243 length -= byte_length;
244 }
245
246 let event_buffer = read_event_buffer(&self.data, &record).await?;
247
248 let event_record = record.into_event_record(event_buffer);
249 records.push(event_record);
250
251 tracing::trace!(
252 length = %length,
253 byte_length = %byte_length,
254 num_pruned = %records.len(),
255 "event_log::rewind",
256 );
257 }
258
259 Err(Error::CommitNotFound(*commit).into())
260 }
261
262 async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
263 let mut commits = Vec::new();
264
265 let mut it = self.iter(false).await?;
266 while let Some(record) = it.next().await? {
267 commits.push(record.commit());
268 }
269
270 self.tree = CommitTree::new();
271 self.tree.append(&mut commits);
272 self.tree.commit();
273 Ok(())
274 }
275
276 async fn clear(&mut self) -> StdResult<(), Self::Error> {
277 self.truncate().await?;
278 self.tree = CommitTree::new();
279 Ok(())
280 }
281
282 async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
283 let mut records = Vec::with_capacity(events.len());
284 for event in events {
285 records.push(EventRecord::encode_event(event).await?);
286 }
287 self.apply_records(records).await
288 }
289
290 async fn apply_records(
291 &mut self,
292 records: Vec<EventRecord>,
293 ) -> StdResult<(), Self::Error> {
294 let mut buffer: Vec<u8> = Vec::new();
295 let mut commits = Vec::new();
296 let mut last_commit_hash = self.tree().last_commit();
297
298 for mut record in records {
299 record.set_last_commit(last_commit_hash);
300 let mut buf = encode(&record).await?;
301 buffer.append(&mut buf);
302 last_commit_hash = Some(*record.commit());
303 commits.push(*record.commit());
304 }
305
306 #[allow(unused_mut)]
307 let mut file = OpenOptions::new()
308 .read(true)
312 .write(true)
313 .append(true)
314 .open(&self.data)
315 .await?;
316
317 #[cfg(target_arch = "wasm32")]
318 {
319 use tokio::io::AsyncSeekExt;
320 file.seek(SeekFrom::End(0)).await?;
321 }
322
323 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
324 match guard.write_all(&buffer).await {
325 Ok(_) => {
326 guard.flush().await?;
327 let mut hashes =
328 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
329 self.tree.append(&mut hashes);
330 self.tree.commit();
331 Ok(())
332 }
333 Err(e) => Err(e.into()),
334 }
335 }
336
337 async fn patch_checked(
338 &mut self,
339 commit_proof: &CommitProof,
340 patch: &Patch<T>,
341 ) -> StdResult<CheckedPatch, Self::Error> {
342 let comparison = self.tree().compare(commit_proof)?;
343
344 match comparison {
345 Comparison::Equal => {
346 self.patch_unchecked(patch).await?;
347 let proof = self.tree().head()?;
348 Ok(CheckedPatch::Success(proof))
349 }
350 Comparison::Contains(indices) => {
351 let head = self.tree().head()?;
352 let contains = self.tree().proof(&indices)?;
353 Ok(CheckedPatch::Conflict {
354 head,
355 contains: Some(contains),
356 })
357 }
358 Comparison::Unknown => {
359 let head = self.tree().head()?;
360 Ok(CheckedPatch::Conflict {
361 head,
362 contains: None,
363 })
364 }
365 }
366 }
367
368 async fn replace_all_events(
369 &mut self,
370 diff: &Diff<T>,
371 ) -> StdResult<(), Self::Error> {
372 let snapshot = self.try_create_snapshot().await?;
374
375 self.clear().await?;
377
378 self.patch_unchecked(&diff.patch).await?;
380
381 let computed = self.tree().head()?;
383 let verified = computed == diff.checkpoint;
384
385 let mut rollback_completed = false;
386 match (verified, &snapshot) {
387 (false, Some(snapshot_path)) => {
389 rollback_completed =
390 self.try_rollback_snapshot(snapshot_path).await.is_ok();
391 }
392 (true, Some(snapshot_path)) => {
394 vfs::remove_file(snapshot_path).await?;
395 }
396 _ => {}
397 }
398
399 if !verified {
400 return Err(Error::CheckpointVerification {
401 checkpoint: diff.checkpoint.root,
402 computed: computed.root,
403 snapshot,
404 rollback_completed,
405 }
406 .into());
407 }
408
409 Ok(())
410 }
411
412 async fn patch_unchecked(
413 &mut self,
414 patch: &Patch<T>,
415 ) -> StdResult<(), Self::Error> {
416 self.apply_records(patch.records().to_vec()).await
422 }
423
424 async fn diff_records(
425 &self,
426 commit: Option<&CommitHash>,
427 ) -> StdResult<Vec<EventRecord>, Self::Error> {
428 let mut events = Vec::new();
429 let mut it = self.iter(true).await?;
431 while let Some(record) = it.next().await? {
432 if let Some(commit) = commit {
433 if &record.commit() == commit.as_ref() {
434 return Ok(events);
435 }
436 }
437 let buffer = read_event_buffer(&self.data, &record).await?;
438 let event_record = record.into_event_record(buffer);
443 events.insert(0, event_record);
444 }
445
446 if let Some(commit) = commit {
450 return Err(Error::CommitNotFound(*commit).into());
451 }
452
453 Ok(events)
454 }
455
456 fn version(&self) -> u16 {
457 self.version.unwrap_or(VERSION1)
458 }
459}
460
461impl<T, E> FileSystemEventLog<T, E>
462where
463 T: Default + Encodable + Decodable + Send + Sync + 'static,
464 E: std::error::Error
465 + std::fmt::Debug
466 + From<sos_core::Error>
467 + From<crate::Error>
468 + From<std::io::Error>
469 + Send
470 + Sync
471 + 'static,
472{
473 pub fn file_path(&self) -> &PathBuf {
475 &self.data
476 }
477
478 async fn truncate(&mut self) -> StdResult<(), E> {
479 use tokio::io::{
480 AsyncSeekExt as TokioAsyncSeekExt,
481 AsyncWriteExt as TokioAsyncWriteExt,
482 };
483
484 let mut file = OpenOptions::new()
487 .write(true)
488 .truncate(true)
489 .open(&self.data)
490 .await?;
491
492 file.seek(SeekFrom::Start(0)).await?;
493
494 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
495 guard.write_all(self.identity).await?;
496 if let Some(version) = self.version {
497 guard.write_all(&version.to_le_bytes()).await?;
498 }
499 guard.flush().await?;
500
501 Ok(())
502 }
503
504 #[doc(hidden)]
506 pub async fn decode_event(
507 &self,
508 item: &EventLogRecord,
509 ) -> StdResult<T, E> {
510 let value = item.value();
511
512 let file = File::open(&self.data).await?;
513 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
514
515 guard.seek(SeekFrom::Start(value.start)).await?;
516 let mut buffer = vec![0; (value.end - value.start) as usize];
517 guard.read_exact(buffer.as_mut_slice()).await?;
518
519 let mut stream = BufReader::new(Cursor::new(&mut buffer));
520 let mut reader = BinaryReader::new(&mut stream, encoding_options());
521 let mut event: T = Default::default();
522 event.decode(&mut reader).await?;
523 Ok(event)
524 }
525
526 pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
528 let content_offset = self.header_len() as u64;
529 let read_stream = File::open(&self.data).await?;
530 let it: Iter = Box::new(
531 FormatStream::<EventLogRecord, File>::new_file(
532 read_stream,
533 self.identity,
534 true,
535 Some(content_offset),
536 reverse,
537 )
538 .await?,
539 );
540 Ok(it)
541 }
542
543 #[doc(hidden)]
546 fn header_len(&self) -> usize {
547 let mut len = self.identity.len();
548 if self.version.is_some() {
549 len += (u16::BITS / 8) as usize;
550 }
551 len
552 }
553
554 async fn initialize_event_log<P: AsRef<Path>>(
584 path: P,
585 identity: &'static [u8],
586 encoding_version: Option<u16>,
587 ) -> StdResult<(), E> {
588 let file = OpenOptions::new()
589 .create(true)
590 .write(true)
591 .open(path.as_ref())
592 .await?;
593
594 let size = vfs::metadata(path.as_ref()).await?.len();
595 if size == 0 {
596 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
597 let mut header = identity.to_vec();
598 if let Some(version) = encoding_version {
599 header.extend_from_slice(&version.to_le_bytes());
600 }
601 guard.write_all(&header).await?;
602 guard.flush().await?;
603 }
604
605 Ok(())
606 }
607
608 #[doc(hidden)]
609 async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
610 if let Some(root) = self.tree().root() {
611 let mut snapshot_path = self.data.clone();
612 snapshot_path.set_extension(&format!("snapshot-{}", root));
613
614 let metadata = vfs::metadata(&self.data).await?;
615 tracing::debug!(
616 num_events = %self.tree().len(),
617 file_size = %metadata.len(),
618 source = %self.data.display(),
619 snapshot = %snapshot_path.display(),
620 "event_log::snapshot::create"
621 );
622
623 vfs::copy(&self.data, &snapshot_path).await?;
624 Ok(Some(snapshot_path))
625 } else {
626 Ok(None)
627 }
628 }
629
630 #[doc(hidden)]
631 async fn try_rollback_snapshot(
632 &mut self,
633 snapshot_path: &PathBuf,
634 ) -> StdResult<(), E> {
635 let source_path = self.data.clone();
636
637 let metadata = vfs::metadata(snapshot_path).await?;
638 tracing::debug!(
639 file_size = %metadata.len(),
640 source = %source_path.display(),
641 snapshot = %snapshot_path.display(),
642 "event_log::snapshot::rollback"
643 );
644
645 vfs::remove_file(&source_path).await?;
646 vfs::rename(snapshot_path, &source_path).await?;
647 self.load_tree().await?;
648
649 Ok(())
650 }
651}
652
653impl<E> FileSystemEventLog<WriteEvent, E>
654where
655 E: std::error::Error
656 + std::fmt::Debug
657 + From<sos_core::Error>
658 + From<crate::Error>
659 + From<std::io::Error>
660 + Send
661 + Sync
662 + 'static,
663{
664 pub async fn new_folder<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
666 use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
667 Self::initialize_event_log(
671 path.as_ref(),
672 &FOLDER_EVENT_LOG_IDENTITY,
673 None,
674 )
675 .await?;
676
677 read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
678 .await?;
679
680 Ok(Self {
681 data: path.as_ref().to_path_buf(),
682 tree: Default::default(),
683 identity: &FOLDER_EVENT_LOG_IDENTITY,
684 version: None,
685 phantom: std::marker::PhantomData,
686 })
687 }
688}
689
690impl<E> FileSystemEventLog<AccountEvent, E>
691where
692 E: std::error::Error
693 + std::fmt::Debug
694 + From<sos_core::Error>
695 + From<crate::Error>
696 + From<std::io::Error>
697 + Send
698 + Sync
699 + 'static,
700{
701 pub async fn new_account<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
703 use sos_core::{
704 constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
705 };
706 Self::initialize_event_log(
707 path.as_ref(),
708 &ACCOUNT_EVENT_LOG_IDENTITY,
709 Some(VERSION),
710 )
711 .await?;
712
713 read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
714 .await?;
715
716 Ok(Self {
717 data: path.as_ref().to_path_buf(),
718 tree: Default::default(),
719 identity: &ACCOUNT_EVENT_LOG_IDENTITY,
720 version: Some(VERSION),
721 phantom: std::marker::PhantomData,
722 })
723 }
724}
725
726impl<E> FileSystemEventLog<DeviceEvent, E>
727where
728 E: std::error::Error
729 + std::fmt::Debug
730 + From<sos_core::Error>
731 + From<crate::Error>
732 + From<std::io::Error>
733 + Send
734 + Sync
735 + 'static,
736{
737 pub async fn new_device(path: impl AsRef<Path>) -> StdResult<Self, E> {
739 use sos_core::{
740 constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
741 };
742
743 Self::initialize_event_log(
744 path.as_ref(),
745 &DEVICE_EVENT_LOG_IDENTITY,
746 Some(VERSION),
747 )
748 .await?;
749
750 read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
751 .await?;
752
753 Ok(Self {
754 data: path.as_ref().to_path_buf(),
755 tree: Default::default(),
756 identity: &DEVICE_EVENT_LOG_IDENTITY,
757 version: Some(VERSION),
758 phantom: std::marker::PhantomData,
759 })
760 }
761}
762
763#[cfg(feature = "files")]
764impl<E> FileSystemEventLog<FileEvent, E>
765where
766 E: std::error::Error
767 + std::fmt::Debug
768 + From<sos_core::Error>
769 + From<crate::Error>
770 + From<std::io::Error>
771 + Send
772 + Sync
773 + 'static,
774{
775 pub async fn new_file(path: impl AsRef<Path>) -> StdResult<Self, E> {
777 use sos_core::{
778 constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
779 };
780
781 Self::initialize_event_log(
782 path.as_ref(),
783 &FILE_EVENT_LOG_IDENTITY,
784 Some(VERSION),
785 )
786 .await?;
787
788 read_file_identity_bytes(path.as_ref(), &FILE_EVENT_LOG_IDENTITY)
789 .await?;
790
791 Ok(Self {
792 data: path.as_ref().to_path_buf(),
793 tree: Default::default(),
794 identity: &FILE_EVENT_LOG_IDENTITY,
795 version: Some(VERSION),
796 phantom: std::marker::PhantomData,
797 })
798 }
799}