1use crate::{
17 formats::{
18 read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19 FormatStreamIterator,
20 },
21 Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_stream::try_stream;
25use async_trait::async_trait;
26use binary_stream::futures::{BinaryReader, Decodable, Encodable};
27use futures::stream::BoxStream;
28use sos_core::{
29 commit::{CommitHash, CommitProof, CommitTree, Comparison},
30 encode,
31 encoding::{encoding_options, VERSION1},
32 events::{
33 patch::{CheckedPatch, Diff, Patch},
34 AccountEvent, DeviceEvent, EventRecord, WriteEvent,
35 },
36};
37use sos_vfs::{self as vfs, File, OpenOptions};
38use std::io::Cursor;
39use std::result::Result as StdResult;
40use std::{
41 io::SeekFrom,
42 path::{Path, PathBuf},
43};
44use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
45
46#[cfg(feature = "files")]
47use sos_core::events::FileEvent;
48
49pub use sos_core::events::EventLog;
50
51pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;
53
54pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;
56
57pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;
59
60#[cfg(feature = "files")]
62pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;
63
64type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
66
67async fn read_event_buffer(
70 file_path: impl AsRef<Path>,
71 record: &EventLogRecord,
72) -> Result<Vec<u8>> {
73 let file = File::open(file_path.as_ref()).await?;
74 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
75
76 let offset = record.value();
77 let row_len = offset.end - offset.start;
78
79 guard.seek(SeekFrom::Start(offset.start)).await?;
80
81 let mut buf = vec![0u8; row_len as usize];
82 guard.read_exact(&mut buf).await?;
83
84 Ok(buf)
85}
86
87pub struct FileSystemEventLog<T, E>
93where
94 T: Default + Encodable + Decodable + Send + Sync,
95 E: std::error::Error
96 + std::fmt::Debug
97 + From<sos_core::Error>
98 + From<crate::Error>
99 + From<std::io::Error>
100 + Send
101 + Sync
102 + 'static,
103{
104 tree: CommitTree,
105 data: PathBuf,
106 identity: &'static [u8],
107 version: Option<u16>,
108 phantom: std::marker::PhantomData<(T, E)>,
109}
110
111#[async_trait]
112impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
113where
114 T: Default + Encodable + Decodable + Send + Sync + 'static,
115 E: std::error::Error
116 + std::fmt::Debug
117 + From<sos_core::Error>
118 + From<crate::Error>
119 + From<std::io::Error>
120 + Send
121 + Sync
122 + 'static,
123{
124 type Error = E;
125
126 async fn record_stream(
127 &self,
128 reverse: bool,
129 ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
130 let mut it =
131 self.iter(reverse).await.expect("to initialize iterator");
132 let file_path = self.data.clone();
133 Box::pin(try_stream! {
134 while let Some(record) = it.next().await? {
135 let event_buffer = read_event_buffer(
136 file_path.clone(), &record).await?;
137 let event_record = record.into_event_record(event_buffer);
138 yield event_record;
139 }
140 })
141 }
142
143 async fn event_stream(
144 &self,
145 reverse: bool,
146 ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
147 {
148 let mut it =
149 self.iter(reverse).await.expect("to initialize iterator");
150
151 let file_path = self.data.clone();
152 Box::pin(try_stream! {
153 while let Some(record) = it.next().await? {
154 let event_buffer = read_event_buffer(
155 file_path.clone(), &record).await?;
156 let event_record = record.into_event_record(event_buffer);
157 let event = event_record.decode_event::<T>().await?;
158 yield (event_record, event);
159 }
160 })
161 }
162
163 async fn diff_checked(
164 &self,
165 commit: Option<CommitHash>,
166 checkpoint: CommitProof,
167 ) -> StdResult<Diff<T>, Self::Error> {
168 let patch = self.diff_events(commit.as_ref()).await?;
169 Ok(Diff::<T> {
170 last_commit: commit,
171 patch,
172 checkpoint,
173 })
174 }
175
176 async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
177 let patch = self.diff_events(None).await?;
178 Ok(Diff::<T> {
179 last_commit: None,
180 patch,
181 checkpoint: self.tree().head()?,
182 })
183 }
184
185 async fn diff_events(
186 &self,
187 commit: Option<&CommitHash>,
188 ) -> StdResult<Patch<T>, Self::Error> {
189 let records = self.diff_records(commit).await?;
190 Ok(Patch::new(records))
191 }
192
193 fn tree(&self) -> &CommitTree {
194 &self.tree
195 }
196
197 async fn rewind(
198 &mut self,
199 commit: &CommitHash,
200 ) -> StdResult<Vec<EventRecord>, Self::Error> {
201 let mut length = vfs::metadata(&self.data).await?.len();
202 let mut it = self.iter(true).await?;
204
205 tracing::trace!(length = %length, "event_log::rewind");
206
207 let mut records = Vec::new();
208
209 while let Some(record) = it.next().await? {
210 if &record.commit() == commit.as_ref() {
212 let mut leaves = self.tree().leaves().unwrap_or_default();
214 if leaves.len() > records.len() {
215 let new_len = leaves.len() - records.len();
216 leaves.truncate(new_len);
217 let mut tree = CommitTree::new();
218 tree.append(&mut leaves);
219 tree.commit();
220 self.tree = tree;
221 } else {
222 return Err(Error::RewindLeavesLength.into());
223 }
224
225 let file =
227 OpenOptions::new().write(true).open(&self.data).await?;
228 let mut guard =
229 file.lock_write().await.map_err(|e| e.error)?;
230 guard.inner_mut().set_len(length).await?;
231
232 return Ok(records);
233 }
234
235 let byte_length = record.byte_length();
237
238 if byte_length < length {
239 length -= byte_length;
240 }
241
242 let event_buffer = read_event_buffer(&self.data, &record).await?;
243
244 let event_record = record.into_event_record(event_buffer);
245 records.push(event_record);
246
247 tracing::trace!(
248 length = %length,
249 byte_length = %byte_length,
250 num_pruned = %records.len(),
251 "event_log::rewind",
252 );
253 }
254
255 Err(Error::CommitNotFound(*commit).into())
256 }
257
258 async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
259 let mut commits = Vec::new();
260
261 let mut it = self.iter(false).await?;
262 while let Some(record) = it.next().await? {
263 commits.push(record.commit());
264 }
265
266 self.tree = CommitTree::new();
267 self.tree.append(&mut commits);
268 self.tree.commit();
269 Ok(())
270 }
271
272 async fn clear(&mut self) -> StdResult<(), Self::Error> {
273 self.truncate().await?;
274 self.tree = CommitTree::new();
275 Ok(())
276 }
277
278 async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
279 let mut records = Vec::with_capacity(events.len());
280 for event in events {
281 records.push(EventRecord::encode_event(event).await?);
282 }
283 self.apply_records(records).await
284 }
285
286 async fn apply_records(
287 &mut self,
288 records: Vec<EventRecord>,
289 ) -> StdResult<(), Self::Error> {
290 let mut buffer: Vec<u8> = Vec::new();
291 let mut commits = Vec::new();
292 let mut last_commit_hash = self.tree().last_commit();
293
294 for mut record in records {
295 record.set_last_commit(last_commit_hash);
296 let mut buf = encode(&record).await?;
297 buffer.append(&mut buf);
298 last_commit_hash = Some(*record.commit());
299 commits.push(*record.commit());
300 }
301
302 #[allow(unused_mut)]
303 let mut file = OpenOptions::new()
304 .read(true)
308 .write(true)
309 .append(true)
310 .open(&self.data)
311 .await?;
312
313 #[cfg(target_arch = "wasm32")]
314 {
315 use tokio::io::AsyncSeekExt;
316 file.seek(SeekFrom::End(0)).await?;
317 }
318
319 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
320 match guard.write_all(&buffer).await {
321 Ok(_) => {
322 guard.flush().await?;
323 let mut hashes =
324 commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
325 self.tree.append(&mut hashes);
326 self.tree.commit();
327 Ok(())
328 }
329 Err(e) => Err(e.into()),
330 }
331 }
332
333 async fn patch_checked(
334 &mut self,
335 commit_proof: &CommitProof,
336 patch: &Patch<T>,
337 ) -> StdResult<CheckedPatch, Self::Error> {
338 let comparison = self.tree().compare(commit_proof)?;
339
340 match comparison {
341 Comparison::Equal => {
342 self.patch_unchecked(patch).await?;
343 let proof = self.tree().head()?;
344 Ok(CheckedPatch::Success(proof))
345 }
346 Comparison::Contains(indices) => {
347 let head = self.tree().head()?;
348 let contains = self.tree().proof(&indices)?;
349 Ok(CheckedPatch::Conflict {
350 head,
351 contains: Some(contains),
352 })
353 }
354 Comparison::Unknown => {
355 let head = self.tree().head()?;
356 Ok(CheckedPatch::Conflict {
357 head,
358 contains: None,
359 })
360 }
361 }
362 }
363
364 async fn replace_all_events(
365 &mut self,
366 diff: &Diff<T>,
367 ) -> StdResult<(), Self::Error> {
368 let snapshot = self.try_create_snapshot().await?;
370
371 self.clear().await?;
373
374 self.patch_unchecked(&diff.patch).await?;
376
377 let computed = self.tree().head()?;
379 let verified = computed == diff.checkpoint;
380
381 let mut rollback_completed = false;
382 match (verified, &snapshot) {
383 (false, Some(snapshot_path)) => {
385 rollback_completed =
386 self.try_rollback_snapshot(snapshot_path).await.is_ok();
387 }
388 (true, Some(snapshot_path)) => {
390 vfs::remove_file(snapshot_path).await?;
391 }
392 _ => {}
393 }
394
395 if !verified {
396 return Err(Error::CheckpointVerification {
397 checkpoint: diff.checkpoint.root,
398 computed: computed.root,
399 snapshot,
400 rollback_completed,
401 }
402 .into());
403 }
404
405 Ok(())
406 }
407
408 async fn patch_unchecked(
409 &mut self,
410 patch: &Patch<T>,
411 ) -> StdResult<(), Self::Error> {
412 self.apply_records(patch.records().to_vec()).await
418 }
419
420 async fn diff_records(
421 &self,
422 commit: Option<&CommitHash>,
423 ) -> StdResult<Vec<EventRecord>, Self::Error> {
424 let mut events = Vec::new();
425 let mut it = self.iter(true).await?;
427 while let Some(record) = it.next().await? {
428 if let Some(commit) = commit {
429 if &record.commit() == commit.as_ref() {
430 return Ok(events);
431 }
432 }
433 let buffer = read_event_buffer(&self.data, &record).await?;
434 let event_record = record.into_event_record(buffer);
439 events.insert(0, event_record);
440 }
441
442 if let Some(commit) = commit {
446 return Err(Error::CommitNotFound(*commit).into());
447 }
448
449 Ok(events)
450 }
451
452 fn version(&self) -> u16 {
453 self.version.unwrap_or(VERSION1)
454 }
455}
456
457impl<T, E> FileSystemEventLog<T, E>
458where
459 T: Default + Encodable + Decodable + Send + Sync + 'static,
460 E: std::error::Error
461 + std::fmt::Debug
462 + From<sos_core::Error>
463 + From<crate::Error>
464 + From<std::io::Error>
465 + Send
466 + Sync
467 + 'static,
468{
469 pub fn file_path(&self) -> &PathBuf {
471 &self.data
472 }
473
474 async fn truncate(&mut self) -> StdResult<(), E> {
475 use tokio::io::{
476 AsyncSeekExt as TokioAsyncSeekExt,
477 AsyncWriteExt as TokioAsyncWriteExt,
478 };
479
480 let mut file = OpenOptions::new()
483 .write(true)
484 .truncate(true)
485 .open(&self.data)
486 .await?;
487
488 file.seek(SeekFrom::Start(0)).await?;
489
490 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
491 guard.write_all(self.identity).await?;
492 if let Some(version) = self.version {
493 guard.write_all(&version.to_le_bytes()).await?;
494 }
495 guard.flush().await?;
496
497 Ok(())
498 }
499
500 #[doc(hidden)]
502 pub async fn decode_event(
503 &self,
504 item: &EventLogRecord,
505 ) -> StdResult<T, E> {
506 let value = item.value();
507
508 let file = File::open(&self.data).await?;
509 let mut guard = file.lock_read().await.map_err(|e| e.error)?;
510
511 guard.seek(SeekFrom::Start(value.start)).await?;
512 let mut buffer = vec![0; (value.end - value.start) as usize];
513 guard.read_exact(buffer.as_mut_slice()).await?;
514
515 let mut stream = BufReader::new(Cursor::new(&mut buffer));
516 let mut reader = BinaryReader::new(&mut stream, encoding_options());
517 let mut event: T = Default::default();
518 event.decode(&mut reader).await?;
519 Ok(event)
520 }
521
522 pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
524 let content_offset = self.header_len() as u64;
525 let read_stream = File::open(&self.data).await?;
526 let it: Iter = Box::new(
527 FormatStream::<EventLogRecord, File>::new_file(
528 read_stream,
529 self.identity,
530 true,
531 Some(content_offset),
532 reverse,
533 )
534 .await?,
535 );
536 Ok(it)
537 }
538
539 #[doc(hidden)]
542 fn header_len(&self) -> usize {
543 let mut len = self.identity.len();
544 if self.version.is_some() {
545 len += (u16::BITS / 8) as usize;
546 }
547 len
548 }
549
550 async fn initialize_event_log<P: AsRef<Path>>(
580 path: P,
581 identity: &'static [u8],
582 encoding_version: Option<u16>,
583 ) -> StdResult<(), E> {
584 let file = OpenOptions::new()
585 .create(true)
586 .write(true)
587 .open(path.as_ref())
588 .await?;
589
590 let size = vfs::metadata(path.as_ref()).await?.len();
591 if size == 0 {
592 let mut guard = file.lock_write().await.map_err(|e| e.error)?;
593 let mut header = identity.to_vec();
594 if let Some(version) = encoding_version {
595 header.extend_from_slice(&version.to_le_bytes());
596 }
597 guard.write_all(&header).await?;
598 guard.flush().await?;
599 }
600
601 Ok(())
602 }
603
604 #[doc(hidden)]
605 async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
606 if let Some(root) = self.tree().root() {
607 let mut snapshot_path = self.data.clone();
608 snapshot_path.set_extension(&format!("snapshot-{}", root));
609
610 let metadata = vfs::metadata(&self.data).await?;
611 tracing::debug!(
612 num_events = %self.tree().len(),
613 file_size = %metadata.len(),
614 source = %self.data.display(),
615 snapshot = %snapshot_path.display(),
616 "event_log::snapshot::create"
617 );
618
619 vfs::copy(&self.data, &snapshot_path).await?;
620 Ok(Some(snapshot_path))
621 } else {
622 Ok(None)
623 }
624 }
625
626 #[doc(hidden)]
627 async fn try_rollback_snapshot(
628 &mut self,
629 snapshot_path: &PathBuf,
630 ) -> StdResult<(), E> {
631 let source_path = self.data.clone();
632
633 let metadata = vfs::metadata(snapshot_path).await?;
634 tracing::debug!(
635 file_size = %metadata.len(),
636 source = %source_path.display(),
637 snapshot = %snapshot_path.display(),
638 "event_log::snapshot::rollback"
639 );
640
641 vfs::remove_file(&source_path).await?;
642 vfs::rename(snapshot_path, &source_path).await?;
643 self.load_tree().await?;
644
645 Ok(())
646 }
647}
648
649impl<E> FileSystemEventLog<WriteEvent, E>
650where
651 E: std::error::Error
652 + std::fmt::Debug
653 + From<sos_core::Error>
654 + From<crate::Error>
655 + From<std::io::Error>
656 + Send
657 + Sync
658 + 'static,
659{
660 pub async fn new_folder<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
662 use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
663 Self::initialize_event_log(
667 path.as_ref(),
668 &FOLDER_EVENT_LOG_IDENTITY,
669 None,
670 )
671 .await?;
672
673 read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
674 .await?;
675
676 Ok(Self {
677 data: path.as_ref().to_path_buf(),
678 tree: Default::default(),
679 identity: &FOLDER_EVENT_LOG_IDENTITY,
680 version: None,
681 phantom: std::marker::PhantomData,
682 })
683 }
684}
685
686impl<E> FileSystemEventLog<AccountEvent, E>
687where
688 E: std::error::Error
689 + std::fmt::Debug
690 + From<sos_core::Error>
691 + From<crate::Error>
692 + From<std::io::Error>
693 + Send
694 + Sync
695 + 'static,
696{
697 pub async fn new_account<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
699 use sos_core::{
700 constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
701 };
702 Self::initialize_event_log(
703 path.as_ref(),
704 &ACCOUNT_EVENT_LOG_IDENTITY,
705 Some(VERSION),
706 )
707 .await?;
708
709 read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
710 .await?;
711
712 Ok(Self {
713 data: path.as_ref().to_path_buf(),
714 tree: Default::default(),
715 identity: &ACCOUNT_EVENT_LOG_IDENTITY,
716 version: Some(VERSION),
717 phantom: std::marker::PhantomData,
718 })
719 }
720}
721
722impl<E> FileSystemEventLog<DeviceEvent, E>
723where
724 E: std::error::Error
725 + std::fmt::Debug
726 + From<sos_core::Error>
727 + From<crate::Error>
728 + From<std::io::Error>
729 + Send
730 + Sync
731 + 'static,
732{
733 pub async fn new_device(path: impl AsRef<Path>) -> StdResult<Self, E> {
735 use sos_core::{
736 constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
737 };
738
739 Self::initialize_event_log(
740 path.as_ref(),
741 &DEVICE_EVENT_LOG_IDENTITY,
742 Some(VERSION),
743 )
744 .await?;
745
746 read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
747 .await?;
748
749 Ok(Self {
750 data: path.as_ref().to_path_buf(),
751 tree: Default::default(),
752 identity: &DEVICE_EVENT_LOG_IDENTITY,
753 version: Some(VERSION),
754 phantom: std::marker::PhantomData,
755 })
756 }
757}
758
759#[cfg(feature = "files")]
760impl<E> FileSystemEventLog<FileEvent, E>
761where
762 E: std::error::Error
763 + std::fmt::Debug
764 + From<sos_core::Error>
765 + From<crate::Error>
766 + From<std::io::Error>
767 + Send
768 + Sync
769 + 'static,
770{
771 pub async fn new_file(path: impl AsRef<Path>) -> StdResult<Self, E> {
773 use sos_core::{
774 constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
775 };
776
777 Self::initialize_event_log(
778 path.as_ref(),
779 &FILE_EVENT_LOG_IDENTITY,
780 Some(VERSION),
781 )
782 .await?;
783
784 read_file_identity_bytes(path.as_ref(), &FILE_EVENT_LOG_IDENTITY)
785 .await?;
786
787 Ok(Self {
788 data: path.as_ref().to_path_buf(),
789 tree: Default::default(),
790 identity: &FILE_EVENT_LOG_IDENTITY,
791 version: Some(VERSION),
792 phantom: std::marker::PhantomData,
793 })
794 }
795}