sos_filesystem/
event_log.rs

1//! Event log file.
2//!
//! Event logs consist of 4 identity bytes, optionally followed by a
//! 2-byte little-endian encoding version, and then zero or more rows
//! of log records; each row has its length both prepended and
//! appended so that rows can be iterated efficiently in both
//! directions.
7//!
8//! Row components with byte sizes:
9//!
10//! ```text
11//! | 4 row length | 12 timestamp | 32 last commit hash | 32 commit hash | 4 data length | data | 4 row length |
12//! ```
13//!
14//! The first row must always contain a last commit hash that is all zero.
15//!
16use crate::{
17    formats::{
18        read_file_identity_bytes, EventLogRecord, FileItem, FormatStream,
19        FormatStreamIterator,
20    },
21    Error, Result,
22};
23use async_fd_lock::{LockRead, LockWrite};
24use async_trait::async_trait;
25use binary_stream::futures::{BinaryReader, Decodable, Encodable};
26use futures::{stream::BoxStream, StreamExt, TryStreamExt};
27use sos_core::{
28    commit::{CommitHash, CommitProof, CommitSpan, CommitTree, Comparison},
29    encode,
30    encoding::{encoding_options, VERSION1},
31    events::{
32        changes_feed,
33        patch::{CheckedPatch, Diff, Patch},
34        AccountEvent, DeviceEvent, EventLogType, EventRecord,
35        LocalChangeEvent, WriteEvent,
36    },
37    AccountId,
38};
39use sos_vfs::{self as vfs, File, OpenOptions};
40use std::result::Result as StdResult;
41use std::{
42    io::{Cursor, SeekFrom},
43    path::{Path, PathBuf},
44};
45use tokio::{
46    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader},
47    sync::mpsc,
48};
49use tokio_stream::wrappers::ReceiverStream;
50
51#[cfg(feature = "files")]
52use sos_core::events::FileEvent;
53
54pub use sos_core::events::EventLog;
55
/// Event log for changes to an account.
///
/// `E` is the error type returned by the log operations.
pub type AccountEventLog<E> = FileSystemEventLog<AccountEvent, E>;

/// Event log for devices.
///
/// `E` is the error type returned by the log operations.
pub type DeviceEventLog<E> = FileSystemEventLog<DeviceEvent, E>;

/// Event log for changes to a folder.
///
/// `E` is the error type returned by the log operations.
pub type FolderEventLog<E> = FileSystemEventLog<WriteEvent, E>;

/// Event log for changes to external files.
///
/// `E` is the error type returned by the log operations.
#[cfg(feature = "files")]
pub type FileEventLog<E> = FileSystemEventLog<FileEvent, E>;

/// Type of an event log file iterator.
///
/// Boxed so that callers do not depend on the concrete stream type
/// created in `FileSystemEventLog::iter`.
type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
71
72/// Read the bytes for the encoded event
73/// inside the log record.
74async fn read_event_buffer(
75    file_path: impl AsRef<Path>,
76    record: &EventLogRecord,
77) -> Result<Vec<u8>> {
78    let file = File::open(file_path.as_ref()).await?;
79    let mut guard = file.lock_read().await.map_err(|e| e.error)?;
80
81    let offset = record.value();
82    let row_len = offset.end - offset.start;
83
84    guard.seek(SeekFrom::Start(offset.start)).await?;
85
86    let mut buf = vec![0u8; row_len as usize];
87    guard.read_exact(&mut buf).await?;
88
89    Ok(buf)
90}
91
/// Filesystem event log.
///
/// Appends events to an append-only writer and reads events
/// via a reader whilst managing an in-memory merkle tree
/// of event hashes.
///
/// `T` is the event type stored in the log and `E` is the error
/// type returned by the log operations.
pub struct FileSystemEventLog<T, E>
where
    T: Default + Encodable + Decodable + Send + Sync,
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Account identifier included in change feed notifications.
    account_id: AccountId,
    /// Kind of event log, included in change feed notifications.
    log_type: EventLogType,
    /// In-memory merkle tree of the commit hashes in the log.
    tree: CommitTree,
    /// Path to the event log file on disc.
    data: PathBuf,
    /// Identity (magic) bytes written at the start of the file.
    identity: &'static [u8],
    /// Encoding version written after the identity bytes; `None`
    /// when the file header does not include a version.
    version: Option<u16>,
    /// Marker for the otherwise unused type parameters.
    phantom: std::marker::PhantomData<(T, E)>,
}
117
118#[async_trait]
119impl<T, E> EventLog<T> for FileSystemEventLog<T, E>
120where
121    T: Default + Encodable + Decodable + Send + Sync + 'static,
122    E: std::error::Error
123        + std::fmt::Debug
124        + From<sos_core::Error>
125        + From<crate::Error>
126        + From<std::io::Error>
127        + Send
128        + Sync
129        + 'static,
130{
131    type Error = E;
132
133    async fn record_stream(
134        &self,
135        reverse: bool,
136    ) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
137        let (tx, rx) =
138            mpsc::channel::<StdResult<EventRecord, Self::Error>>(8);
139
140        let mut it =
141            self.iter(reverse).await.expect("to initialize iterator");
142        let file_path = self.data.clone();
143        tokio::task::spawn(async move {
144            while let Some(record) = it.next().await? {
145                if tx.is_closed() {
146                    break;
147                }
148                let event_buffer =
149                    read_event_buffer(file_path.clone(), &record).await?;
150                let event_record = record.into_event_record(event_buffer);
151                if let Err(e) = tx.send(Ok(event_record)).await {
152                    tracing::error!(error = %e);
153                }
154            }
155            Ok::<_, Self::Error>(())
156        });
157
158        ReceiverStream::new(rx).boxed()
159    }
160
161    async fn event_stream(
162        &self,
163        reverse: bool,
164    ) -> BoxStream<'async_trait, StdResult<(EventRecord, T), Self::Error>>
165    {
166        self.record_stream(reverse)
167            .await
168            .try_filter_map(|record| async {
169                let event = record.decode_event::<T>().await?;
170                Ok(Some((record, event)))
171            })
172            .boxed()
173    }
174
175    async fn diff_checked(
176        &self,
177        commit: Option<CommitHash>,
178        checkpoint: CommitProof,
179    ) -> StdResult<Diff<T>, Self::Error> {
180        let patch = self.diff_events(commit.as_ref()).await?;
181        Ok(Diff::<T> {
182            last_commit: commit,
183            patch,
184            checkpoint,
185        })
186    }
187
188    async fn diff_unchecked(&self) -> StdResult<Diff<T>, Self::Error> {
189        let patch = self.diff_events(None).await?;
190        Ok(Diff::<T> {
191            last_commit: None,
192            patch,
193            checkpoint: self.tree().head()?,
194        })
195    }
196
197    async fn diff_events(
198        &self,
199        commit: Option<&CommitHash>,
200    ) -> StdResult<Patch<T>, Self::Error> {
201        let records = self.diff_records(commit).await?;
202        Ok(Patch::new(records))
203    }
204
    /// In-memory merkle tree of the commit hashes for this log.
    fn tree(&self) -> &CommitTree {
        &self.tree
    }
208
209    async fn rewind(
210        &mut self,
211        commit: &CommitHash,
212    ) -> StdResult<Vec<EventRecord>, Self::Error> {
213        let mut length = vfs::metadata(&self.data).await?.len();
214        // Iterate backwards and track how many commits are pruned
215        let mut it = self.iter(true).await?;
216
217        tracing::trace!(length = %length, "event_log::rewind");
218
219        let mut records = Vec::new();
220
221        while let Some(record) = it.next().await? {
222            // Found the target commit
223            if &record.commit() == commit.as_ref() {
224                // Rewrite the in-memory tree
225                let mut leaves = self.tree().leaves().unwrap_or_default();
226                if leaves.len() > records.len() {
227                    let new_len = leaves.len() - records.len();
228                    leaves.truncate(new_len);
229                    let mut tree = CommitTree::new();
230                    tree.append(&mut leaves);
231                    tree.commit();
232                    self.tree = tree;
233                } else {
234                    return Err(Error::RewindLeavesLength.into());
235                }
236
237                // Truncate the file to the new length
238                let file =
239                    OpenOptions::new().write(true).open(&self.data).await?;
240                let mut guard =
241                    file.lock_write().await.map_err(|e| e.error)?;
242                guard.inner_mut().set_len(length).await?;
243
244                return Ok(records);
245            }
246
247            // Compute new length and number of pruned commits
248            let byte_length = record.byte_length();
249
250            if byte_length < length {
251                length -= byte_length;
252            }
253
254            let event_buffer = read_event_buffer(&self.data, &record).await?;
255
256            let event_record = record.into_event_record(event_buffer);
257            records.push(event_record);
258
259            tracing::trace!(
260                length = %length,
261                byte_length = %byte_length,
262                num_pruned = %records.len(),
263                "event_log::rewind",
264            );
265        }
266
267        Err(Error::CommitNotFound(*commit).into())
268    }
269
270    async fn load_tree(&mut self) -> StdResult<(), Self::Error> {
271        let mut commits = Vec::new();
272
273        let mut it = self.iter(false).await?;
274        while let Some(record) = it.next().await? {
275            commits.push(record.commit());
276        }
277
278        self.tree = CommitTree::new();
279        self.tree.append(&mut commits);
280        self.tree.commit();
281        Ok(())
282    }
283
    /// Remove all events from the log.
    ///
    /// Truncates the file so it holds only the header and resets
    /// the in-memory merkle tree.
    async fn clear(&mut self) -> StdResult<(), Self::Error> {
        self.truncate().await?;
        self.tree = CommitTree::new();
        Ok(())
    }
289
290    async fn apply(&mut self, events: &[T]) -> StdResult<(), Self::Error> {
291        let mut records = Vec::with_capacity(events.len());
292        for event in events {
293            records.push(EventRecord::encode_event(event).await?);
294        }
295        self.apply_records(records).await
296    }
297
298    async fn apply_records(
299        &mut self,
300        records: Vec<EventRecord>,
301    ) -> StdResult<(), Self::Error> {
302        if records.is_empty() {
303            return Ok(());
304        }
305
306        let mut span = CommitSpan {
307            before: self.tree.last_commit(),
308            after: None,
309        };
310
311        let mut buffer: Vec<u8> = Vec::new();
312        let mut commits = Vec::new();
313        let mut last_commit_hash = self.tree().last_commit();
314
315        for mut record in records {
316            record.set_last_commit(last_commit_hash);
317            let mut buf = encode(&record).await?;
318            buffer.append(&mut buf);
319            last_commit_hash = Some(*record.commit());
320            commits.push(*record.commit());
321        }
322
323        #[allow(unused_mut)]
324        let mut file = OpenOptions::new()
325            // NOTE: must also set read() for Windows advisory locks
326            // NOTE: otherwise we will get "Access denied (OS error 5)"
327            // SEE: https://github.com/rust-lang/rust/issues/54118
328            .read(true)
329            .write(true)
330            .append(true)
331            .open(&self.data)
332            .await?;
333
334        #[cfg(target_arch = "wasm32")]
335        {
336            use tokio::io::AsyncSeekExt;
337            file.seek(SeekFrom::End(0)).await?;
338        }
339
340        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
341        match guard.write_all(&buffer).await {
342            Ok(_) => {
343                guard.flush().await?;
344                let mut hashes =
345                    commits.iter().map(|c| *c.as_ref()).collect::<Vec<_>>();
346                self.tree.append(&mut hashes);
347                self.tree.commit();
348
349                span.after = self.tree.last_commit();
350
351                changes_feed().send_replace(
352                    LocalChangeEvent::AccountModified {
353                        account_id: self.account_id,
354                        log_type: self.log_type,
355                        commit_span: span,
356                    },
357                );
358
359                Ok(())
360            }
361            Err(e) => Err(e.into()),
362        }
363    }
364
365    async fn patch_checked(
366        &mut self,
367        commit_proof: &CommitProof,
368        patch: &Patch<T>,
369    ) -> StdResult<CheckedPatch, Self::Error> {
370        let comparison = self.tree().compare(commit_proof)?;
371
372        match comparison {
373            Comparison::Equal => {
374                self.patch_unchecked(patch).await?;
375                let proof = self.tree().head()?;
376                Ok(CheckedPatch::Success(proof))
377            }
378            Comparison::Contains(indices) => {
379                let head = self.tree().head()?;
380                let contains = self.tree().proof(&indices)?;
381                Ok(CheckedPatch::Conflict {
382                    head,
383                    contains: Some(contains),
384                })
385            }
386            Comparison::Unknown => {
387                let head = self.tree().head()?;
388                Ok(CheckedPatch::Conflict {
389                    head,
390                    contains: None,
391                })
392            }
393        }
394    }
395
396    async fn replace_all_events(
397        &mut self,
398        diff: &Diff<T>,
399    ) -> StdResult<(), Self::Error> {
400        // Create a snapshot for disc-based implementations
401        let snapshot = self.try_create_snapshot().await?;
402
403        // Erase the file content and in-memory merkle tree
404        self.clear().await?;
405
406        // Apply the new events
407        self.patch_unchecked(&diff.patch).await?;
408
409        // Verify against the checkpoint
410        let computed = self.tree().head()?;
411        let verified = computed == diff.checkpoint;
412
413        let mut rollback_completed = false;
414        match (verified, &snapshot) {
415            // Try to rollback if verification failed
416            (false, Some(snapshot_path)) => {
417                rollback_completed =
418                    self.try_rollback_snapshot(snapshot_path).await.is_ok();
419            }
420            // Delete the snapshot if verified
421            (true, Some(snapshot_path)) => {
422                vfs::remove_file(snapshot_path).await?;
423            }
424            _ => {}
425        }
426
427        if !verified {
428            return Err(Error::CheckpointVerification {
429                checkpoint: diff.checkpoint.root,
430                computed: computed.root,
431                snapshot,
432                rollback_completed,
433            }
434            .into());
435        }
436
437        Ok(())
438    }
439
    /// Append the records of a patch to the log without comparing
    /// commit proofs first.
    async fn patch_unchecked(
        &mut self,
        patch: &Patch<T>,
    ) -> StdResult<(), Self::Error> {
        // NOTE: the check that the first record is not older than
        // NOTE: the head record is currently disabled
        /*
        if let Some(record) = patch.records().first() {
            self.check_event_time_ahead(record).await?;
        }
        */
        self.apply_records(patch.records().to_vec()).await
    }
451
452    async fn diff_records(
453        &self,
454        commit: Option<&CommitHash>,
455    ) -> StdResult<Vec<EventRecord>, Self::Error> {
456        let mut events = Vec::new();
457        // let file = self.file();
458        let mut it = self.iter(true).await?;
459        while let Some(record) = it.next().await? {
460            if let Some(commit) = commit {
461                if &record.commit() == commit.as_ref() {
462                    return Ok(events);
463                }
464            }
465            let buffer = read_event_buffer(&self.data, &record).await?;
466            // Iterating in reverse order as we would typically
467            // be looking for commits near the end of the event log
468            // but we want the patch events in the order they were
469            // appended so insert at the beginning to reverse the list
470            let event_record = record.into_event_record(buffer);
471            events.insert(0, event_record);
472        }
473
474        // If the caller wanted to patch until a particular commit
475        // but it doesn't exist we error otherwise we would return
476        // all the events
477        if let Some(commit) = commit {
478            return Err(Error::CommitNotFound(*commit).into());
479        }
480
481        Ok(events)
482    }
483
484    fn version(&self) -> u16 {
485        self.version.unwrap_or(VERSION1)
486    }
487}
488
489impl<T, E> FileSystemEventLog<T, E>
490where
491    T: Default + Encodable + Decodable + Send + Sync + 'static,
492    E: std::error::Error
493        + std::fmt::Debug
494        + From<sos_core::Error>
495        + From<crate::Error>
496        + From<std::io::Error>
497        + Send
498        + Sync
499        + 'static,
500{
    /// Path to the event log file.
    ///
    /// Returns a reference to the path given when the log was created.
    pub fn file_path(&self) -> &PathBuf {
        &self.data
    }
505
506    async fn truncate(&mut self) -> StdResult<(), E> {
507        use tokio::io::{
508            AsyncSeekExt as TokioAsyncSeekExt,
509            AsyncWriteExt as TokioAsyncWriteExt,
510        };
511
512        // Workaround for set_len(0) failing with "Access Denied" on Windows
513        // SEE: https://github.com/rust-lang/rust/issues/105437
514        let mut file = OpenOptions::new()
515            .write(true)
516            .truncate(true)
517            .open(&self.data)
518            .await?;
519
520        file.seek(SeekFrom::Start(0)).await?;
521
522        let mut guard = file.lock_write().await.map_err(|e| e.error)?;
523        guard.write_all(self.identity).await?;
524        if let Some(version) = self.version {
525            guard.write_all(&version.to_le_bytes()).await?;
526        }
527        guard.flush().await?;
528
529        Ok(())
530    }
531
532    /// Read the event data from an item.
533    #[doc(hidden)]
534    pub async fn decode_event(
535        &self,
536        item: &EventLogRecord,
537    ) -> StdResult<T, E> {
538        let value = item.value();
539
540        let file = File::open(&self.data).await?;
541        let mut guard = file.lock_read().await.map_err(|e| e.error)?;
542
543        guard.seek(SeekFrom::Start(value.start)).await?;
544        let mut buffer = vec![0; (value.end - value.start) as usize];
545        guard.read_exact(buffer.as_mut_slice()).await?;
546
547        let mut stream = BufReader::new(Cursor::new(&mut buffer));
548        let mut reader = BinaryReader::new(&mut stream, encoding_options());
549        let mut event: T = Default::default();
550        event.decode(&mut reader).await?;
551        Ok(event)
552    }
553
554    /// Iterate the event records.
555    pub async fn iter(&self, reverse: bool) -> StdResult<Iter, E> {
556        let content_offset = self.header_len() as u64;
557        let read_stream = File::open(&self.data).await?;
558        let it: Iter = Box::new(
559            FormatStream::<EventLogRecord, File>::new_file(
560                read_stream,
561                self.identity,
562                true,
563                Some(content_offset),
564                reverse,
565            )
566            .await?,
567        );
568        Ok(it)
569    }
570
571    /// Length of the file magic bytes and optional
572    /// encoding version.
573    #[doc(hidden)]
574    fn header_len(&self) -> usize {
575        let mut len = self.identity.len();
576        if self.version.is_some() {
577            len += (u16::BITS / 8) as usize;
578        }
579        len
580    }
581
582    /*
583    /// Find the last log record using a reverse iterator.
584    async fn head_record(&self) -> Result<Option<EventLogRecord>> {
585        let mut it = self.iter(true).await?;
586        it.next().await
587    }
588    */
589
590    /*
591    #[doc(hidden)]
592    async fn check_event_time_ahead(
593        &self,
594        record: &EventRecord,
595    ) -> Result<()> {
596        if let Some(head_record) = self.head_record().await? {
597            if record.time().0 < head_record.time().0 {
598                println!("record: {:#?}", record.time().0);
599                println!("head: {:#?}", head_record.time().0);
600                return Err(Error::EventTimeBehind);
601            }
602        }
603        Ok(())
604    }
605    */
606
607    /// Create an event log file if it does not exist.
608    ///
609    /// Ensure the identity bytes are written when the file
610    /// length is zero.
611    async fn initialize_event_log<P: AsRef<Path>>(
612        path: P,
613        identity: &'static [u8],
614        encoding_version: Option<u16>,
615    ) -> StdResult<(), E> {
616        let file = OpenOptions::new()
617            .create(true)
618            .write(true)
619            .open(path.as_ref())
620            .await?;
621
622        let size = vfs::metadata(path.as_ref()).await?.len();
623        if size == 0 {
624            let mut guard = file.lock_write().await.map_err(|e| e.error)?;
625            let mut header = identity.to_vec();
626            if let Some(version) = encoding_version {
627                header.extend_from_slice(&version.to_le_bytes());
628            }
629            guard.write_all(&header).await?;
630            guard.flush().await?;
631        }
632
633        Ok(())
634    }
635
636    #[doc(hidden)]
637    async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
638        if let Some(root) = self.tree().root() {
639            let mut snapshot_path = self.data.clone();
640            snapshot_path.set_extension(&format!("snapshot-{}", root));
641
642            let metadata = vfs::metadata(&self.data).await?;
643            tracing::debug!(
644                num_events = %self.tree().len(),
645                file_size = %metadata.len(),
646                source = %self.data.display(),
647                snapshot = %snapshot_path.display(),
648                "event_log::snapshot::create"
649            );
650
651            vfs::copy(&self.data, &snapshot_path).await?;
652            Ok(Some(snapshot_path))
653        } else {
654            Ok(None)
655        }
656    }
657
658    #[doc(hidden)]
659    async fn try_rollback_snapshot(
660        &mut self,
661        snapshot_path: &PathBuf,
662    ) -> StdResult<(), E> {
663        let source_path = self.data.clone();
664
665        let metadata = vfs::metadata(snapshot_path).await?;
666        tracing::debug!(
667            file_size = %metadata.len(),
668            source = %source_path.display(),
669            snapshot = %snapshot_path.display(),
670            "event_log::snapshot::rollback"
671        );
672
673        vfs::remove_file(&source_path).await?;
674        vfs::rename(snapshot_path, &source_path).await?;
675        self.load_tree().await?;
676
677        Ok(())
678    }
679}
680
681impl<E> FileSystemEventLog<WriteEvent, E>
682where
683    E: std::error::Error
684        + std::fmt::Debug
685        + From<sos_core::Error>
686        + From<crate::Error>
687        + From<std::io::Error>
688        + Send
689        + Sync
690        + 'static,
691{
692    /// Create a new folder event log file.
693    pub async fn new_folder<P: AsRef<Path>>(
694        path: P,
695        account_id: AccountId,
696        log_type: EventLogType,
697    ) -> StdResult<Self, E> {
698        use sos_core::constants::FOLDER_EVENT_LOG_IDENTITY;
699        // Note that for backwards compatibility we don't
700        // encode a version, later we will need to upgrade
701        // the encoding to include a version
702        Self::initialize_event_log(
703            path.as_ref(),
704            &FOLDER_EVENT_LOG_IDENTITY,
705            None,
706        )
707        .await?;
708
709        read_file_identity_bytes(path.as_ref(), &FOLDER_EVENT_LOG_IDENTITY)
710            .await?;
711
712        Ok(Self {
713            data: path.as_ref().to_path_buf(),
714            tree: Default::default(),
715            log_type,
716            account_id,
717            identity: &FOLDER_EVENT_LOG_IDENTITY,
718            version: None,
719            phantom: std::marker::PhantomData,
720        })
721    }
722}
723
724impl<E> FileSystemEventLog<AccountEvent, E>
725where
726    E: std::error::Error
727        + std::fmt::Debug
728        + From<sos_core::Error>
729        + From<crate::Error>
730        + From<std::io::Error>
731        + Send
732        + Sync
733        + 'static,
734{
735    /// Create a new account event log file.
736    pub async fn new_account<P: AsRef<Path>>(
737        path: P,
738        account_id: AccountId,
739    ) -> StdResult<Self, E> {
740        use sos_core::{
741            constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
742        };
743        Self::initialize_event_log(
744            path.as_ref(),
745            &ACCOUNT_EVENT_LOG_IDENTITY,
746            Some(VERSION),
747        )
748        .await?;
749
750        read_file_identity_bytes(path.as_ref(), &ACCOUNT_EVENT_LOG_IDENTITY)
751            .await?;
752
753        Ok(Self {
754            account_id,
755            log_type: EventLogType::Account,
756            data: path.as_ref().to_path_buf(),
757            tree: Default::default(),
758            identity: &ACCOUNT_EVENT_LOG_IDENTITY,
759            version: Some(VERSION),
760            phantom: std::marker::PhantomData,
761        })
762    }
763}
764
765impl<E> FileSystemEventLog<DeviceEvent, E>
766where
767    E: std::error::Error
768        + std::fmt::Debug
769        + From<sos_core::Error>
770        + From<crate::Error>
771        + From<std::io::Error>
772        + Send
773        + Sync
774        + 'static,
775{
776    /// Create a new device event log file.
777    pub async fn new_device(
778        path: impl AsRef<Path>,
779        account_id: AccountId,
780    ) -> StdResult<Self, E> {
781        use sos_core::{
782            constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
783        };
784
785        Self::initialize_event_log(
786            path.as_ref(),
787            &DEVICE_EVENT_LOG_IDENTITY,
788            Some(VERSION),
789        )
790        .await?;
791
792        read_file_identity_bytes(path.as_ref(), &DEVICE_EVENT_LOG_IDENTITY)
793            .await?;
794
795        Ok(Self {
796            log_type: EventLogType::Device,
797            account_id,
798            data: path.as_ref().to_path_buf(),
799            tree: Default::default(),
800            identity: &DEVICE_EVENT_LOG_IDENTITY,
801            version: Some(VERSION),
802            phantom: std::marker::PhantomData,
803        })
804    }
805}
806
#[cfg(feature = "files")]
impl<E> FileSystemEventLog<FileEvent, E>
where
    E: std::error::Error
        + std::fmt::Debug
        + From<sos_core::Error>
        + From<crate::Error>
        + From<std::io::Error>
        + Send
        + Sync
        + 'static,
{
    /// Create a new file event log file.
    ///
    /// The file is created when it does not exist, with a header
    /// that includes the encoding version, and its identity bytes
    /// are checked. The returned log has an empty in-memory tree.
    pub async fn new_file(
        path: impl AsRef<Path>,
        account_id: AccountId,
    ) -> StdResult<Self, E> {
        use sos_core::{
            constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
        };

        let target: &Path = path.as_ref();

        Self::initialize_event_log(
            target,
            &FILE_EVENT_LOG_IDENTITY,
            Some(VERSION),
        )
        .await?;
        read_file_identity_bytes(target, &FILE_EVENT_LOG_IDENTITY).await?;

        Ok(Self {
            account_id,
            log_type: EventLogType::Files,
            tree: Default::default(),
            data: target.to_path_buf(),
            identity: &FILE_EVENT_LOG_IDENTITY,
            version: Some(VERSION),
            phantom: std::marker::PhantomData,
        })
    }
}