sos_filesystem/
event_log.rs

1//! Event log file.
2//!
//! Event logs consist of 4 identity bytes, optionally followed by
//! a 2-byte encoding version, and then zero or more rows of log
//! records; each row contains the row length prepended and appended
//! so that rows can be efficiently iterated in both directions.
7//!
8//! Row components with byte sizes:
9//!
10//! ```text
11//! | 4 row length | 12 timestamp | 32 last commit hash | 32 commit hash | 4 data length | data | 4 row length |
12//! ```
13//!
14//! The first row must always contain a last commit hash that is all zero.
15//!
16use crate::{
17    formats::{
18        read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19        FormatStreamIterator,
20    },
21    Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_stream::try_stream;
25use async_trait::async_trait;
26use binary_stream::futures::{BinaryReader, Decodable, Encodable};
27use futures::stream::BoxStream;
28use sos_core::{
29    commit::{CommitHash, CommitProof, CommitTree, Comparison},
30    encode,
31    encoding::{encoding_options, VERSION1},
32    events::{
33        patch::{CheckedPatch, Diff, Patch},
34        AccountEvent, DeviceEvent, EventRecord, WriteEvent,
35    },
36};
37use sos_vfs::{self as vfs, File, OpenOptions};
38use std::io::Cursor;
39use std::result::Result as StdResult;
40use std::{
41    io::SeekFrom,
42    path::{Path, PathBuf},
43};
44use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
45
46#[cfg(feature = "files")]
47use sos_core::events::FileEvent;
48
49pub use sos_core::events::EventLog;
50
/// Event log for changes to an account.
///
/// `E` is the error type returned by the event log operations.
pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;

/// Event log for devices.
///
/// `E` is the error type returned by the event log operations.
pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;

/// Event log for changes to a folder.
///
/// `E` is the error type returned by the event log operations.
pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;

/// Event log for changes to external files.
///
/// `E` is the error type returned by the event log operations.
#[cfg(feature = "files")]
pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;

/// Type of an event log file iterator.
///
/// Boxed so that callers do not depend on the concrete stream type
/// created by [`FileSystemEventLog::iter`].
type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
66
67/// Read the bytes for the encoded event
68/// inside the log record.
69async fn read_event_buffer(
70    file_path: impl AsRef<Path>,
71    record: &EventLogRecord,
72) -> Result<Vec<u8>> {
73    let file = File::open(file_path.as_ref()).await?;
74    let mut guard = file.lock_read().await.map_err(|e| e.error)?;
75
76    let offset = record.value();
77    let row_len = offset.end - offset.start;
78
79    guard.seek(SeekFrom::Start(offset.start)).await?;
80
81    let mut buf = vec![0u8; row_len as usize];
82    guard.read_exact(&mut buf).await?;
83
84    Ok(buf)
85}
86
/// Filesystem event log.
///
/// Appends events to an append-only writer and reads events
/// via a reader whilst managing an in-memory merkle tree
/// of event hashes.
///
/// `T` is the event type stored in the log and `E` is the error
/// type returned by the log operations.
pub struct FileSystemEventLog<T, E>
where
    T: Default + Encodable + Decodable + Send + Sync,
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// In-memory merkle tree of the commit hashes in the log.
    tree: CommitTree,
    /// Path to the event log file on disc.
    data: PathBuf,
    /// Identity bytes written at the start of the file.
    identity: &'static [u8],
    /// Optional encoding version written directly after the
    /// identity bytes; `None` when the file has no version header.
    version: Option<u16>,
    /// Ties the event and error type parameters to the struct.
    phantom: std::marker::PhantomData<(T, E)>,
}
110
111#[async_trait]
112impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
113where
114    T: Default + Encodable + Decodable + Send + Sync + 'static,
115    E: std::error::Error
116        + std::fmt::Debug
117        + From<sos_core::Error>
118        + From<crate::Error>
119        + From<std::io::Error>
120        + Send
121        + Sync
122        + 'static,
123{
124    type Error = E;
125
126    async fn record_stream(
127        &self,
128        reverse: bool,
129    ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
130        let mut it =
131            self.iter(reverse).await.expect("to initialize iterator");
132        let file_path = self.data.clone();
133        Box::pin(try_stream! {
134            while let Some(record) = it.next().await? {
135                let event_buffer = read_event_buffer(
136                    file_path.clone(), &record).await?;
137                let event_record = record.into_event_record(event_buffer);
138                yield event_record;
139            }
140        })
141    }
142
143    async fn event_stream(
144        &self,
145        reverse: bool,
146    ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
147    {
148        let mut it =
149            self.iter(reverse).await.expect("to initialize iterator");
150
151        let file_path = self.data.clone();
152        Box::pin(try_stream! {
153            while let Some(record) = it.next().await? {
154                let event_buffer = read_event_buffer(
155                    file_path.clone(), &record).await?;
156                let event_record = record.into_event_record(event_buffer);
157                let event = event_record.decode_event::<T>().await?;
158                yield (event_record, event);
159            }
160        })
161    }
162
163    async fn diff_checked(
164        &self,
165        commit: Option<CommitHash>,
166        checkpoint: CommitProof,
167    ) -> StdResult<Diff<T>, Self::Error> {
168        let patch = self.diff_events(commit.as_ref()).await?;
169        Ok(Diff::<T> {
170            last_commit: commit,
171            patch,
172            checkpoint,
173        })
174    }
175
176    async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
177        let patch = self.diff_events(None).await?;
178        Ok(Diff::<T> {
179            last_commit: None,
180            patch,
181            checkpoint: self.tree().head()?,
182        })
183    }
184
185    async fn diff_events(
186        &self,
187        commit: Option<&CommitHash>,
188    ) -> StdResult<Patch<T>, Self::Error> {
189        let records = self.diff_records(commit).await?;
190        Ok(Patch::new(records))
191    }
192
193    fn tree(&self) -> &CommitTree {
194        &self.tree
195    }
196
197    async fn rewind(
198        &mut self,
199        commit: &CommitHash,
200    ) -> StdResult<Vec<EventRecord>, Self::Error> {
201        let mut length = vfs::metadata(&self.data).await?.len();
202        // Iterate backwards and track how many commits are pruned
203        let mut it = self.iter(true).await?;
204
205        tracing::trace!(length = %length, "event_log::rewind");
206
207        let mut records = Vec::new();
208
209        while let Some(record) = it.next().await? {
210            // Found the target commit
211            if &record.commit() == commit.as_ref() {
212                // Rewrite the in-memory tree
213                let mut leaves = self.tree().leaves().unwrap_or_default();
214                if leaves.len() > records.len() {
215                    let new_len = leaves.len() - records.len();
216                    leaves.truncate(new_len);
217                    let mut tree = CommitTree::new();
218                    tree.append(&mut leaves);
219                    tree.commit();
220                    self.tree = tree;
221                } else {
222                    return Err(Error::RewindLeavesLength.into());
223                }
224
225                // Truncate the file to the new length
226                let file =
227                    OpenOptions::new().write(true).open(&self.data).await?;
228                let mut guard =
229                    file.lock_write().await.map_err(|e| e.error)?;
230                guard.inner_mut().set_len(length).await?;
231
232                return Ok(records);
233            }
234
235            // Compute new length and number of pruned commits
236            let byte_length = record.byte_length();
237
238            if byte_length < length {
239                length -= byte_length;
240            }
241
242            let event_buffer = read_event_buffer(&self.data, &record).await?;
243
244            let event_record = record.into_event_record(event_buffer);
245            records.push(event_record);
246
247            tracing::trace!(
248                length = %length,
249                byte_length = %byte_length,
250                num_pruned = %records.len(),
251                "event_log::rewind",
252            );
253        }
254
255        Err(Error::CommitNotFound(*commit).into())
256    }
257
258    async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
259        let mut commits = Vec::new();
260
261        let mut it = self.iter(false).await?;
262        while let Some(record) = it.next().await? {
263            commits.push(record.commit());
264        }
265
266        self.tree = CommitTree::new();
267        self.tree.append(&mut commits);
268        self.tree.commit();
269        Ok(())
270    }
271
272    async fn clear(&mut self) -> StdResult<(), Self::Error> {
273        self.truncate().await?;
274        self.tree = CommitTree::new();
275        Ok(())
276    }
277
278    async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
279        let mut records = Vec::with_capacity(events.len());
280        for event in events {
281            records.push(EventRecord::encode_event(event).await?);
282        }
283        self.apply_records(records).await
284    }
285
286    async fn apply_records(
287        &mut self,
288        records: Vec<EventRecord>,
289    ) -> StdResult<(), Self::Error> {
290        let mut buffer: Vec<u8> = Vec::new();
291        let mut commits = Vec::new();
292        let mut last_commit_hash = self.tree().last_commit();
293
294        for mut record in records {
295            record.set_last_commit(last_commit_hash);
296            let mut buf = encode(&record).await?;
297            buffer.append(&mut buf);
298            last_commit_hash = Some(*record.commit());
299            commits.push(*record.commit());
300        }
301
302        #[allow(unused_mut)]
303        let mut file = OpenOptions::new()
304            // NOTE: must also set read() for Windows advisory locks
305            // NOTE: otherwise we will get "Access denied (OS error 5)"
306            // SEE: https://github.com/rust-lang/rust/issues/54118
307            .read(true)
308            .write(true)
309            .append(true)
310            .open(&self.data)
311            .await?;
312
313        #[cfg(target_arch = "wasm32")]
314        {
315            use tokio::io::AsyncSeekExt;
316            file.seek(SeekFrom::End(0)).await?;
317        }
318
319        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
320        match guard.write_all(&buffer).await {
321            Ok(_) => {
322                guard.flush().await?;
323                let mut hashes =
324                    commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
325                self.tree.append(&mut hashes);
326                self.tree.commit();
327                Ok(())
328            }
329            Err(e) => Err(e.into()),
330        }
331    }
332
333    async fn patch_checked(
334        &mut self,
335        commit_proof: &CommitProof,
336        patch: &Patch<T>,
337    ) -> StdResult<CheckedPatch, Self::Error> {
338        let comparison = self.tree().compare(commit_proof)?;
339
340        match comparison {
341            Comparison::Equal => {
342                self.patch_unchecked(patch).await?;
343                let proof = self.tree().head()?;
344                Ok(CheckedPatch::Success(proof))
345            }
346            Comparison::Contains(indices) => {
347                let head = self.tree().head()?;
348                let contains = self.tree().proof(&indices)?;
349                Ok(CheckedPatch::Conflict {
350                    head,
351                    contains: Some(contains),
352                })
353            }
354            Comparison::Unknown => {
355                let head = self.tree().head()?;
356                Ok(CheckedPatch::Conflict {
357                    head,
358                    contains: None,
359                })
360            }
361        }
362    }
363
364    async fn replace_all_events(
365        &mut self,
366        diff: &Diff<T>,
367    ) -> StdResult<(), Self::Error> {
368        // Create a snapshot for disc-based implementations
369        let snapshot = self.try_create_snapshot().await?;
370
371        // Erase the file content and in-memory merkle tree
372        self.clear().await?;
373
374        // Apply the new events
375        self.patch_unchecked(&diff.patch).await?;
376
377        // Verify against the checkpoint
378        let computed = self.tree().head()?;
379        let verified = computed == diff.checkpoint;
380
381        let mut rollback_completed = false;
382        match (verified, &snapshot) {
383            // Try to rollback if verification failed
384            (false, Some(snapshot_path)) => {
385                rollback_completed =
386                    self.try_rollback_snapshot(snapshot_path).await.is_ok();
387            }
388            // Delete the snapshot if verified
389            (true, Some(snapshot_path)) => {
390                vfs::remove_file(snapshot_path).await?;
391            }
392            _ => {}
393        }
394
395        if !verified {
396            return Err(Error::CheckpointVerification {
397                checkpoint: diff.checkpoint.root,
398                computed: computed.root,
399                snapshot,
400                rollback_completed,
401            }
402            .into());
403        }
404
405        Ok(())
406    }
407
408    async fn patch_unchecked(
409        &mut self,
410        patch: &Patch<T>,
411    ) -> StdResult<(), Self::Error> {
412        /*
413        if let Some(record) = patch.records().first() {
414            self.check_event_time_ahead(record).await?;
415        }
416        */
417        self.apply_records(patch.records().to_vec()).await
418    }
419
420    async fn diff_records(
421        &self,
422        commit: Option<&CommitHash>,
423    ) -> StdResult<Vec<EventRecord>, Self::Error> {
424        let mut events = Vec::new();
425        // let file = self.file();
426        let mut it = self.iter(true).await?;
427        while let Some(record) = it.next().await? {
428            if let Some(commit) = commit {
429                if &record.commit() == commit.as_ref() {
430                    return Ok(events);
431                }
432            }
433            let buffer = read_event_buffer(&self.data, &record).await?;
434            // Iterating in reverse order as we would typically
435            // be looking for commits near the end of the event log
436            // but we want the patch events in the order they were
437            // appended so insert at the beginning to reverse the list
438            let event_record = record.into_event_record(buffer);
439            events.insert(0, event_record);
440        }
441
442        // If the caller wanted to patch until a particular commit
443        // but it doesn't exist we error otherwise we would return
444        // all the events
445        if let Some(commit) = commit {
446            return Err(Error::CommitNotFound(*commit).into());
447        }
448
449        Ok(events)
450    }
451
452    fn version(&self) -> u16 {
453        self.version.unwrap_or(VERSION1)
454    }
455}
456
457impl<T, E> FileSystemEventLog<T, E>
458where
459    T: Default + Encodable + Decodable + Send + Sync + 'static,
460    E: std::error::Error
461        + std::fmt::Debug
462        + From<sos_core::Error>
463        + From<crate::Error>
464        + From<std::io::Error>
465        + Send
466        + Sync
467        + 'static,
468{
469    /// Path to the event log file.
470    pub fn file_path(&self) -> &PathBuf {
471        &self.data
472    }
473
474    async fn truncate(&mut self) -> StdResult<(), E> {
475        use tokio::io::{
476            AsyncSeekExt as TokioAsyncSeekExt,
477            AsyncWriteExt as TokioAsyncWriteExt,
478        };
479
480        // Workaround for set_len(0) failing with "Access Denied" on Windows
481        // SEE: https://github.com/rust-lang/rust/issues/105437
482        let mut file = OpenOptions::new()
483            .write(true)
484            .truncate(true)
485            .open(&self.data)
486            .await?;
487
488        file.seek(SeekFrom::Start(0)).await?;
489
490        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
491        guard.write_all(self.identity).await?;
492        if let Some(version) = self.version {
493            guard.write_all(&version.to_le_bytes()).await?;
494        }
495        guard.flush().await?;
496
497        Ok(())
498    }
499
500    /// Read the event data from an item.
501    #[doc(hidden)]
502    pub async fn decode_event(
503        &self,
504        item: &EventLogRecord,
505    ) -> StdResult<T, E> {
506        let value = item.value();
507
508        let file = File::open(&self.data).await?;
509        let mut guard = file.lock_read().await.map_err(|e| e.error)?;
510
511        guard.seek(SeekFrom::Start(value.start)).await?;
512        let mut buffer = vec![0; (value.end - value.start) as usize];
513        guard.read_exact(buffer.as_mut_slice()).await?;
514
515        let mut stream = BufReader::new(Cursor::new(&mut buffer));
516        let mut reader = BinaryReader::new(&mut stream, encoding_options());
517        let mut event: T = Default::default();
518        event.decode(&mut reader).await?;
519        Ok(event)
520    }
521
522    /// Iterate the event records.
523    pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
524        let content_offset = self.header_len() as u64;
525        let read_stream = File::open(&self.data).await?;
526        let it: Iter = Box::new(
527            FormatStream::<EventLogRecord, File>::new_file(
528                read_stream,
529                self.identity,
530                true,
531                Some(content_offset),
532                reverse,
533            )
534            .await?,
535        );
536        Ok(it)
537    }
538
539    /// Length of the file magic bytes and optional
540    /// encoding version.
541    #[doc(hidden)]
542    fn header_len(&self) -> usize {
543        let mut len = self.identity.len();
544        if self.version.is_some() {
545            len += (u16::BITS / 8) as usize;
546        }
547        len
548    }
549
550    /*
551    /// Find the last log record using a reverse iterator.
552    async fn head_record(&self) -> Result<Option<EventLogRecord>> {
553        let mut it = self.iter(true).await?;
554        it.next().await
555    }
556    */
557
558    /*
559    #[doc(hidden)]
560    async fn check_event_time_ahead(
561        &self,
562        record: &EventRecord,
563    ) -> Result<()> {
564        if let Some(head_record) = self.head_record().await? {
565            if record.time().0 < head_record.time().0 {
566                println!("record: {:#?}", record.time().0);
567                println!("head: {:#?}", head_record.time().0);
568                return Err(Error::EventTimeBehind);
569            }
570        }
571        Ok(())
572    }
573    */
574
575    /// Create an event log file if it does not exist.
576    ///
577    /// Ensure the identity bytes are written when the file
578    /// length is zero.
579    async fn initialize_event_log<P: AsRef<Path>>(
580        path: P,
581        identity: &'static [u8],
582        encoding_version: Option<u16>,
583    ) -> StdResult<(), E> {
584        let file = OpenOptions::new()
585            .create(true)
586            .write(true)
587            .open(path.as_ref())
588            .await?;
589
590        let size = vfs::metadata(path.as_ref()).await?.len();
591        if size == 0 {
592            let mut guard = file.lock_write().await.map_err(|e| e.error)?;
593            let mut header = identity.to_vec();
594            if let Some(version) = encoding_version {
595                header.extend_from_slice(&version.to_le_bytes());
596            }
597            guard.write_all(&header).await?;
598            guard.flush().await?;
599        }
600
601        Ok(())
602    }
603
604    #[doc(hidden)]
605    async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
606        if let Some(root) = self.tree().root() {
607            let mut snapshot_path = self.data.clone();
608            snapshot_path.set_extension(&format!("snapshot-{}", root));
609
610            let metadata = vfs::metadata(&self.data).await?;
611            tracing::debug!(
612                num_events = %self.tree().len(),
613                file_size = %metadata.len(),
614                source = %self.data.display(),
615                snapshot = %snapshot_path.display(),
616                "event_log::snapshot::create"
617            );
618
619            vfs::copy(&self.data, &snapshot_path).await?;
620            Ok(Some(snapshot_path))
621        } else {
622            Ok(None)
623        }
624    }
625
626    #[doc(hidden)]
627    async fn try_rollback_snapshot(
628        &mut self,
629        snapshot_path: &PathBuf,
630    ) -> StdResult<(), E> {
631        let source_path = self.data.clone();
632
633        let metadata = vfs::metadata(snapshot_path).await?;
634        tracing::debug!(
635            file_size = %metadata.len(),
636            source = %source_path.display(),
637            snapshot = %snapshot_path.display(),
638            "event_log::snapshot::rollback"
639        );
640
641        vfs::remove_file(&source_path).await?;
642        vfs::rename(snapshot_path, &source_path).await?;
643        self.load_tree().await?;
644
645        Ok(())
646    }
647}
648
649impl<E> FileSystemEventLog<WriteEvent, E>
650where
651    E: std::error::Error
652        + std::fmt::Debug
653        + From<sos_core::Error>
654        + From<crate::Error>
655        + From<std::io::Error>
656        + Send
657        + Sync
658        + 'static,
659{
660    /// Create a new folder event log file.
661    pub async fn new_folder<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
662        use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
663        // Note that for backwards compatibility we don't
664        // encode a version, later we will need to upgrade
665        // the encoding to include a version
666        Self::initialize_event_log(
667            path.as_ref(),
668            &FOLDER_EVENT_LOG_IDENTITY,
669            None,
670        )
671        .await?;
672
673        read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
674            .await?;
675
676        Ok(Self {
677            data: path.as_ref().to_path_buf(),
678            tree: Default::default(),
679            identity: &FOLDER_EVENT_LOG_IDENTITY,
680            version: None,
681            phantom: std::marker::PhantomData,
682        })
683    }
684}
685
686impl<E> FileSystemEventLog<AccountEvent, E>
687where
688    E: std::error::Error
689        + std::fmt::Debug
690        + From<sos_core::Error>
691        + From<crate::Error>
692        + From<std::io::Error>
693        + Send
694        + Sync
695        + 'static,
696{
697    /// Create a new account event log file.
698    pub async fn new_account<P: AsRef<Path>>(path: P) -> StdResult<Self, E> {
699        use sos_core::{
700            constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
701        };
702        Self::initialize_event_log(
703            path.as_ref(),
704            &ACCOUNT_EVENT_LOG_IDENTITY,
705            Some(VERSION),
706        )
707        .await?;
708
709        read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
710            .await?;
711
712        Ok(Self {
713            data: path.as_ref().to_path_buf(),
714            tree: Default::default(),
715            identity: &ACCOUNT_EVENT_LOG_IDENTITY,
716            version: Some(VERSION),
717            phantom: std::marker::PhantomData,
718        })
719    }
720}
721
722impl<E> FileSystemEventLog<DeviceEvent, E>
723where
724    E: std::error::Error
725        + std::fmt::Debug
726        + From<sos_core::Error>
727        + From<crate::Error>
728        + From<std::io::Error>
729        + Send
730        + Sync
731        + 'static,
732{
733    /// Create a new device event log file.
734    pub async fn new_device(path: impl AsRef<Path>) -> StdResult<Self, E> {
735        use sos_core::{
736            constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
737        };
738
739        Self::initialize_event_log(
740            path.as_ref(),
741            &DEVICE_EVENT_LOG_IDENTITY,
742            Some(VERSION),
743        )
744        .await?;
745
746        read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
747            .await?;
748
749        Ok(Self {
750            data: path.as_ref().to_path_buf(),
751            tree: Default::default(),
752            identity: &DEVICE_EVENT_LOG_IDENTITY,
753            version: Some(VERSION),
754            phantom: std::marker::PhantomData,
755        })
756    }
757}
758
#[cfg(feature = "files")]
impl<E> FileSystemEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Open a file event log at `path`, creating the file
    /// when it does not exist.
    ///
    /// A new file is written with the file event identity bytes and
    /// the current encoding version; the identity bytes are checked
    /// afterwards and the commit tree starts out empty.
    pub async fn new_file(path: impl AsRef<Path>) -> StdResult<Self, E> {
        use sos_core::{
            constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
        };

        let path = path.as_ref();
        let version = Some(VERSION);

        Self::initialize_event_log(path, &FILE_EVENT_LOG_IDENTITY, version)
            .await?;
        read_file_identity_bytes(path, &FILE_EVENT_LOG_IDENTITY).await?;

        let log = Self {
            tree: Default::default(),
            data: path.to_path_buf(),
            identity: &FILE_EVENT_LOG_IDENTITY,
            version,
            phantom: std::marker::PhantomData,
        };
        Ok(log)
    }
}