sos_filesystem/
event_log.rs

1//! Event log file.
2//!
//! Event logs consist of 4 identity bytes, optionally followed by a
//! 2-byte little-endian encoding version, and then zero or more
//! rows of log records; each row contains the row length prepended
//! and appended so that rows can be efficiently iterated in
//! both directions.
7//!
8//! Row components with byte sizes:
9//!
10//! ```text
11//! | 4 row length | 12 timestamp | 32 last commit hash | 32 commit hash | 4 data length | data | 4 row length |
12//! ```
13//!
14//! The first row must always contain a last commit hash that is all zero.
15//!
16use crate::{
17    formats::{
18        read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19        FormatStreamIterator,
20    },
21    Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_trait::async_trait;
25use binary_stream::futures::{BinaryReader, Decodable, Encodable};
26use futures::{stream::BoxStream, StreamExt, TryStreamExt};
27use sos_core::{
28    commit::{CommitHash, CommitProof, CommitTree, Comparison},
29    encode,
30    encoding::{encoding_options, VERSION1},
31    events::{
32        patch::{CheckedPatch, Diff, Patch},
33        AccountEvent, DeviceEvent, EventRecord, WriteEvent,
34    },
35};
36use sos_vfs::{self as vfs, File, OpenOptions};
37use std::result::Result as StdResult;
38use std::{
39    io::{Cursor, SeekFrom},
40    path::{Path, PathBuf},
41};
42use tokio::{
43    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
44    sync::mpsc,
45};
46use tokio_stream::wrappers::ReceiverStream;
47
48#[cfg(feature = "files")]
49use sos_core::events::FileEvent;
50
51pub use sos_core::events::EventLog;
52
53/// Event log for changes to an account.
54pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;
55
56/// Event log for devices.
57pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;
58
59/// Event log for changes to a folder.
60pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;
61
62/// Event log for changes to external files.
63#[cfg(feature = "files")]
64pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;
65
66/// Type of an event log file iterator.
67type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
68
69/// Read the bytes for the encoded event
70/// inside the log record.
71async fn read_event_buffer(
72    file_path: impl AsRef<Path>,
73    record: &EventLogRecord,
74) -> Result<Vec<u8>> {
75    let file = File::open(file_path.as_ref()).await?;
76    let mut guard = file.lock_read().await.map_err(|e| e.error)?;
77
78    let offset = record.value();
79    let row_len = offset.end - offset.start;
80
81    guard.seek(SeekFrom::Start(offset.start)).await?;
82
83    let mut buf = vec![0u8; row_len as usize];
84    guard.read_exact(&mut buf).await?;
85
86    Ok(buf)
87}
88
89/// Filesystem event log.
90///
91/// Appends events to an append-only writer and reads events
92/// via a reader whilst managing an in-memory merkle tree
93/// of event hashes.
94pub struct FileSystemEventLog<T, E>
95where
96    T: Default + Encodable + Decodable + Send + Sync,
97    E: std::error::Error
98        + std::fmt::Debug
99        + From<sos_core::Error>
100        + From<crate::Error>
101        + From<std::io::Error>
102        + Send
103        + Sync
104        + 'static,
105{
106    tree: CommitTree,
107    data: PathBuf,
108    identity: &'static [u8],
109    version: Option<u16>,
110    phantom: std::marker::PhantomData<(T, E)>,
111}
112
113#[async_trait]
114impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
115where
116    T: Default + Encodable + Decodable + Send + Sync + 'static,
117    E: std::error::Error
118        + std::fmt::Debug
119        + From<sos_core::Error>
120        + From<crate::Error>
121        + From<std::io::Error>
122        + Send
123        + Sync
124        + 'static,
125{
126    type Error = E;
127
128    async fn record_stream(
129        &self,
130        reverse: bool,
131    ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
132        let (tx, rx) =
133            mpsc::channel::<StdResult<EventRecord, Self::Error>>(8);
134
135        let mut it =
136            self.iter(reverse).await.expect("to initialize iterator");
137        let file_path = self.data.clone();
138        tokio::task::spawn(async move {
139            while let Some(record) = it.next().await? {
140                let event_buffer =
141                    read_event_buffer(file_path.clone(), &record).await?;
142                let event_record = record.into_event_record(event_buffer);
143                if let Err(e) = tx.send(Ok(event_record)).await {
144                    tracing::error!(error = %e);
145                }
146            }
147            Ok::<_, Self::Error>(())
148        });
149
150        ReceiverStream::new(rx).boxed()
151    }
152
153    async fn event_stream(
154        &self,
155        reverse: bool,
156    ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
157    {
158        self.record_stream(reverse)
159            .await
160            .try_filter_map(|record| async {
161                let event = record.decode_event::<T>().await?;
162                Ok(Some((record, event)))
163            })
164            .boxed()
165    }
166
167    async fn diff_checked(
168        &self,
169        commit: Option<CommitHash>,
170        checkpoint: CommitProof,
171    ) -> StdResult<Diff<T>, Self::Error> {
172        let patch = self.diff_events(commit.as_ref()).await?;
173        Ok(Diff::<T> {
174            last_commit: commit,
175            patch,
176            checkpoint,
177        })
178    }
179
180    async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
181        let patch = self.diff_events(None).await?;
182        Ok(Diff::<T> {
183            last_commit: None,
184            patch,
185            checkpoint: self.tree().head()?,
186        })
187    }
188
189    async fn diff_events(
190        &self,
191        commit: Option<&CommitHash>,
192    ) -> StdResult<Patch<T>, Self::Error> {
193        let records = self.diff_records(commit).await?;
194        Ok(Patch::new(records))
195    }
196
197    fn tree(&self) -> &CommitTree {
198        &self.tree
199    }
200
201    async fn rewind(
202        &mut self,
203        commit: &CommitHash,
204    ) -> StdResult<Vec<EventRecord>, Self::Error> {
205        let mut length = vfs::metadata(&self.data).await?.len();
206        // Iterate backwards and track how many commits are pruned
207        let mut it = self.iter(true).await?;
208
209        tracing::trace!(length = %length, "event_log::rewind");
210
211        let mut records = Vec::new();
212
213        while let Some(record) = it.next().await? {
214            // Found the target commit
215            if &record.commit() == commit.as_ref() {
216                // Rewrite the in-memory tree
217                let mut leaves = self.tree().leaves().unwrap_or_default();
218                if leaves.len() > records.len() {
219                    let new_len = leaves.len() - records.len();
220                    leaves.truncate(new_len);
221                    let mut tree = CommitTree::new();
222                    tree.append(&mut leaves);
223                    tree.commit();
224                    self.tree = tree;
225                } else {
226                    return Err(Error::RewindLeavesLength.into());
227                }
228
229                // Truncate the file to the new length
230                let file =
231                    OpenOptions::new().write(true).open(&self.data).await?;
232                let mut guard =
233                    file.lock_write().await.map_err(|e| e.error)?;
234                guard.inner_mut().set_len(length).await?;
235
236                return Ok(records);
237            }
238
239            // Compute new length and number of pruned commits
240            let byte_length = record.byte_length();
241
242            if byte_length < length {
243                length -= byte_length;
244            }
245
246            let event_buffer = read_event_buffer(&self.data, &record).await?;
247
248            let event_record = record.into_event_record(event_buffer);
249            records.push(event_record);
250
251            tracing::trace!(
252                length = %length,
253                byte_length = %byte_length,
254                num_pruned = %records.len(),
255                "event_log::rewind",
256            );
257        }
258
259        Err(Error::CommitNotFound(*commit).into())
260    }
261
262    async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
263        let mut commits = Vec::new();
264
265        let mut it = self.iter(false).await?;
266        while let Some(record) = it.next().await? {
267            commits.push(record.commit());
268        }
269
270        self.tree = CommitTree::new();
271        self.tree.append(&mut commits);
272        self.tree.commit();
273        Ok(())
274    }
275
276    async fn clear(&mut self) -> StdResult<(), Self::Error> {
277        self.truncate().await?;
278        self.tree = CommitTree::new();
279        Ok(())
280    }
281
282    async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
283        let mut records = Vec::with_capacity(events.len());
284        for event in events {
285            records.push(EventRecord::encode_event(event).await?);
286        }
287        self.apply_records(records).await
288    }
289
290    async fn apply_records(
291        &mut self,
292        records: Vec<EventRecord>,
293    ) -> StdResult<(), Self::Error> {
294        let mut buffer: Vec<u8> = Vec::new();
295        let mut commits = Vec::new();
296        let mut last_commit_hash = self.tree().last_commit();
297
298        for mut record in records {
299            record.set_last_commit(last_commit_hash);
300            let mut buf = encode(&record).await?;
301            buffer.append(&mut buf);
302            last_commit_hash = Some(*record.commit());
303            commits.push(*record.commit());
304        }
305
306        #[allow(unused_mut)]
307        let mut file = OpenOptions::new()
308            // NOTE: must also set read() for Windows advisory locks
309            // NOTE: otherwise we will get "Access denied (OS error 5)"
310            // SEE: https://github.com/rust-lang/rust/issues/54118
311            .read(true)
312            .write(true)
313            .append(true)
314            .open(&self.data)
315            .await?;
316
317        #[cfg(target_arch = "wasm32")]
318        {
319            use tokio::io::AsyncSeekExt;
320            file.seek(SeekFrom::End(0)).await?;
321        }
322
323        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
324        match guard.write_all(&buffer).await {
325            Ok(_) => {
326                guard.flush().await?;
327                let mut hashes =
328                    commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
329                self.tree.append(&mut hashes);
330                self.tree.commit();
331                Ok(())
332            }
333            Err(e) => Err(e.into()),
334        }
335    }
336
337    async fn patch_checked(
338        &mut self,
339        commit_proof: &CommitProof,
340        patch: &Patch<T>,
341    ) -> StdResult<CheckedPatch, Self::Error> {
342        let comparison = self.tree().compare(commit_proof)?;
343
344        match comparison {
345            Comparison::Equal => {
346                self.patch_unchecked(patch).await?;
347                let proof = self.tree().head()?;
348                Ok(CheckedPatch::Success(proof))
349            }
350            Comparison::Contains(indices) => {
351                let head = self.tree().head()?;
352                let contains = self.tree().proof(&indices)?;
353                Ok(CheckedPatch::Conflict {
354                    head,
355                    contains: Some(contains),
356                })
357            }
358            Comparison::Unknown => {
359                let head = self.tree().head()?;
360                Ok(CheckedPatch::Conflict {
361                    head,
362                    contains: None,
363                })
364            }
365        }
366    }
367
368    async fn replace_all_events(
369        &mut self,
370        diff: &Diff<T>,
371    ) -> StdResult<(), Self::Error> {
372        // Create a snapshot for disc-based implementations
373        let snapshot = self.try_create_snapshot().await?;
374
375        // Erase the file content and in-memory merkle tree
376        self.clear().await?;
377
378        // Apply the new events
379        self.patch_unchecked(&diff.patch).await?;
380
381        // Verify against the checkpoint
382        let computed = self.tree().head()?;
383        let verified = computed == diff.checkpoint;
384
385        let mut rollback_completed = false;
386        match (verified, &snapshot) {
387            // Try to rollback if verification failed
388            (false, Some(snapshot_path)) => {
389                rollback_completed =
390                    self.try_rollback_snapshot(snapshot_path).await.is_ok();
391            }
392            // Delete the snapshot if verified
393            (true, Some(snapshot_path)) => {
394                vfs::remove_file(snapshot_path).await?;
395            }
396            _ => {}
397        }
398
399        if !verified {
400            return Err(Error::CheckpointVerification {
401                checkpoint: diff.checkpoint.root,
402                computed: computed.root,
403                snapshot,
404                rollback_completed,
405            }
406            .into());
407        }
408
409        Ok(())
410    }
411
412    async fn patch_unchecked(
413        &mut self,
414        patch: &Patch<T>,
415    ) -> StdResult<(), Self::Error> {
416        /*
417        if let Some(record) = patch.records().first() {
418            self.check_event_time_ahead(record).await?;
419        }
420        */
421        self.apply_records(patch.records().to_vec()).await
422    }
423
424    async fn diff_records(
425        &self,
426        commit: Option<&CommitHash>,
427    ) -> StdResult<Vec<EventRecord>, Self::Error> {
428        let mut events = Vec::new();
429        // let file = self.file();
430        let mut it = self.iter(true).await?;
431        while let Some(record) = it.next().await? {
432            if let Some(commit) = commit {
433                if &record.commit() == commit.as_ref() {
434                    return Ok(events);
435                }
436            }
437            let buffer = read_event_buffer(&self.data, &record).await?;
438            // Iterating in reverse order as we would typically
439            // be looking for commits near the end of the event log
440            // but we want the patch events in the order they were
441            // appended so insert at the beginning to reverse the list
442            let event_record = record.into_event_record(buffer);
443            events.insert(0, event_record);
444        }
445
446        // If the caller wanted to patch until a particular commit
447        // but it doesn't exist we error otherwise we would return
448        // all the events
449        if let Some(commit) = commit {
450            return Err(Error::CommitNotFound(*commit).into());
451        }
452
453        Ok(events)
454    }
455
456    fn version(&self) -> u16 {
457        self.version.unwrap_or(VERSION1)
458    }
459}
460
461impl<T, E> FileSystemEventLog<T, E>
462where
463    T: Default + Encodable + Decodable + Send + Sync + 'static,
464    E: std::error::Error
465        + std::fmt::Debug
466        + From<sos_core::Error>
467        + From<crate::Error>
468        + From<std::io::Error>
469        + Send
470        + Sync
471        + 'static,
472{
473    /// Path to the event log file.
474    pub fn file_path(&self) -> &PathBuf {
475        &self.data
476    }
477
478    async fn truncate(&mut self) -> StdResult<(), E> {
479        use tokio::io::{
480            AsyncSeekExt as TokioAsyncSeekExt,
481            AsyncWriteExt as TokioAsyncWriteExt,
482        };
483
484        // Workaround for set_len(0) failing with "Access Denied" on Windows
485        // SEE: https://github.com/rust-lang/rust/issues/105437
486        let mut file = OpenOptions::new()
487            .write(true)
488            .truncate(true)
489            .open(&self.data)
490            .await?;
491
492        file.seek(SeekFrom::Start(0)).await?;
493
494        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
495        guard.write_all(self.identity).await?;
496        if let Some(version) = self.version {
497            guard.write_all(&version.to_le_bytes()).await?;
498        }
499        guard.flush().await?;
500
501        Ok(())
502    }
503
504    /// Read the event data from an item.
505    #[doc(hidden)]
506    pub async fn decode_event(
507        &self,
508        item: &EventLogRecord,
509    ) -> StdResult<T, E> {
510        let value = item.value();
511
512        let file = File::open(&self.data).await?;
513        let mut guard = file.lock_read().await.map_err(|e| e.error)?;
514
515        guard.seek(SeekFrom::Start(value.start)).await?;
516        let mut buffer = vec![0; (value.end - value.start) as usize];
517        guard.read_exact(buffer.as_mut_slice()).await?;
518
519        let mut stream = BufReader::new(Cursor::new(&mut buffer));
520        let mut reader = BinaryReader::new(&mut stream, encoding_options());
521        let mut event: T = Default::default();
522        event.decode(&mut reader).await?;
523        Ok(event)
524    }
525
526    /// Iterate the event records.
527    pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
528        let content_offset = self.header_len() as u64;
529        let read_stream = File::open(&self.data).await?;
530        let it: Iter = Box::new(
531            FormatStream::<EventLogRecord, File>::new_file(
532                read_stream,
533                self.identity,
534                true,
535                Some(content_offset),
536                reverse,
537            )
538            .await?,
539        );
540        Ok(it)
541    }
542
543    /// Length of the file magic bytes and optional
544    /// encoding version.
545    #[doc(hidden)]
546    fn header_len(&self) -> usize {
547        let mut len = self.identity.len();
548        if self.version.is_some() {
549            len += (u16::BITS / 8) as usize;
550        }
551        len
552    }
553
554    /*
555    /// Find the last log record using a reverse iterator.
556    async fn head_record(&self) -> Result<Option<EventLogRecord>> {
557        let mut it = self.iter(true).await?;
558        it.next().await
559    }
560    */
561
562    /*
563    #[doc(hidden)]
564    async fn check_event_time_ahead(
565        &self,
566        record: &EventRecord,
567    ) -> Result<()> {
568        if let Some(head_record) = self.head_record().await? {
569            if record.time().0 < head_record.time().0 {
570                println!("record: {:#?}", record.time().0);
571                println!("head: {:#?}", head_record.time().0);
572                return Err(Error::EventTimeBehind);
573            }
574        }
575        Ok(())
576    }
577    */
578
579    /// Create an event log file if it does not exist.
580    ///
581    /// Ensure the identity bytes are written when the file
582    /// length is zero.
583    async fn initialize_event_log<P: AsRef<Path>>(
584        path: P,
585        identity: &'static [u8],
586        encoding_version: Option<u16>,
587    ) -> StdResult<(), E> {
588        let file = OpenOptions::new()
589            .create(true)
590            .write(true)
591            .open(path.as_ref())
592            .await?;
593
594        let size = vfs::metadata(path.as_ref()).await?.len();
595        if size == 0 {
596            let mut guard = file.lock_write().await.map_err(|e| e.error)?;
597            let mut header = identity.to_vec();
598            if let Some(version) = encoding_version {
599                header.extend_from_slice(&version.to_le_bytes());
600            }
601            guard.write_all(&header).await?;
602            guard.flush().await?;
603        }
604
605        Ok(())
606    }
607
608    #[doc(hidden)]
609    async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
610        if let Some(root) = self.tree().root() {
611            let mut snapshot_path = self.data.clone();
612            snapshot_path.set_extension(&format!("snapshot-{}", root));
613
614            let metadata = vfs::metadata(&self.data).await?;
615            tracing::debug!(
616                num_events = %self.tree().len(),
617                file_size = %metadata.len(),
618                source = %self.data.display(),
619                snapshot = %snapshot_path.display(),
620                "event_log::snapshot::create"
621            );
622
623            vfs::copy(&self.data, &snapshot_path).await?;
624            Ok(Some(snapshot_path))
625        } else {
626            Ok(None)
627        }
628    }
629
630    #[doc(hidden)]
631    async fn try_rollback_snapshot(
632        &mut self,
633        snapshot_path: &PathBuf,
634    ) -> StdResult<(), E> {
635        let source_path = self.data.clone();
636
637        let metadata = vfs::metadata(snapshot_path).await?;
638        tracing::debug!(
639            file_size = %metadata.len(),
640            source = %source_path.display(),
641            snapshot = %snapshot_path.display(),
642            "event_log::snapshot::rollback"
643        );
644
645        vfs::remove_file(&source_path).await?;
646        vfs::rename(snapshot_path, &source_path).await?;
647        self.load_tree().await?;
648
649        Ok(())
650    }
651}
652
653impl<E> FileSystemEventLog<WriteEvent, E>
654where
655    E: std::error::Error
656        + std::fmt::Debug
657        + From<sos_core::Error>
658        + From<crate::Error>
659        + From<std::io::Error>
660        + Send
661        + Sync
662        + 'static,
663{
664    /// Create a new folder event log file.
665    pub async fn new_folder<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
666        use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
667        // Note that for backwards compatibility we don't
668        // encode a version, later we will need to upgrade
669        // the encoding to include a version
670        Self::initialize_event_log(
671            path.as_ref(),
672            &FOLDER_EVENT_LOG_IDENTITY,
673            None,
674        )
675        .await?;
676
677        read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
678            .await?;
679
680        Ok(Self {
681            data: path.as_ref().to_path_buf(),
682            tree: Default::default(),
683            identity: &FOLDER_EVENT_LOG_IDENTITY,
684            version: None,
685            phantom: std::marker::PhantomData,
686        })
687    }
688}
689
690impl<E> FileSystemEventLog<AccountEvent, E>
691where
692    E: std::error::Error
693        + std::fmt::Debug
694        + From<sos_core::Error>
695        + From<crate::Error>
696        + From<std::io::Error>
697        + Send
698        + Sync
699        + 'static,
700{
701    /// Create a new account event log file.
702    pub async fn new_account<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
703        use sos_core::{
704            constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
705        };
706        Self::initialize_event_log(
707            path.as_ref(),
708            &ACCOUNT_EVENT_LOG_IDENTITY,
709            Some(VERSION),
710        )
711        .await?;
712
713        read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
714            .await?;
715
716        Ok(Self {
717            data: path.as_ref().to_path_buf(),
718            tree: Default::default(),
719            identity: &ACCOUNT_EVENT_LOG_IDENTITY,
720            version: Some(VERSION),
721            phantom: std::marker::PhantomData,
722        })
723    }
724}
725
726impl<E> FileSystemEventLog<DeviceEvent, E>
727where
728    E: std::error::Error
729        + std::fmt::Debug
730        + From<sos_core::Error>
731        + From<crate::Error>
732        + From<std::io::Error>
733        + Send
734        + Sync
735        + 'static,
736{
737    /// Create a new device event log file.
738    pub async fn new_device(path: impl AsRef<Path>) -> StdResult<Self, E> {
739        use sos_core::{
740            constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
741        };
742
743        Self::initialize_event_log(
744            path.as_ref(),
745            &DEVICE_EVENT_LOG_IDENTITY,
746            Some(VERSION),
747        )
748        .await?;
749
750        read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
751            .await?;
752
753        Ok(Self {
754            data: path.as_ref().to_path_buf(),
755            tree: Default::default(),
756            identity: &DEVICE_EVENT_LOG_IDENTITY,
757            version: Some(VERSION),
758            phantom: std::marker::PhantomData,
759        })
760    }
761}
762
#[cfg(feature = "files")]
impl<E> FileSystemEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Open the external file event log at `path`.
    ///
    /// The file is created when missing and, when empty, given a header
    /// of the file-log identity bytes followed by the `VERSION` encoding
    /// version; the identity bytes are then checked with
    /// `read_file_identity_bytes`.
    pub async fn new_file(path: impl AsRef<Path>) -> StdResult<Self, E> {
        use sos_core::{
            constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
        };

        let path = path.as_ref();
        let version = Some(VERSION);

        Self::initialize_event_log(path, &FILE_EVENT_LOG_IDENTITY, version)
            .await?;
        read_file_identity_bytes(path, &FILE_EVENT_LOG_IDENTITY).await?;

        Ok(Self {
            tree: Default::default(),
            version,
            identity: &FILE_EVENT_LOG_IDENTITY,
            data: path.to_path_buf(),
            phantom: std::marker::PhantomData,
        })
    }
}