versioned_file/
lib.rs

1#![forbid(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(unused_must_use)]
4#![deny(unused_mut)]
5// TODO: Ideally we could add some safety here to avoid errors where we are close to running out of
6// disk space.
7//
8// TODO: Should probably have some linting mechanism here to ensure that we only use read_exact and
9// write_all.
10//
11// TODO: Should add a switch to VersionedFile so that we can simulate a broken filesystem for ACID
12// testing. Probably should have that switch accept a mutexed object which will tell it when to
13// start failing. Then we can have many files at the same time coordinate various types of
14// failures.
15//
16// TODO: We should add random occasional checks to the code which verify that the vf cursor matches
17// the file handle cursor.
18//
19// TODO: Need to figure out how the versioned-file fits into a larger ACID ecosystem. Probably best
20// approach is to make it a base layer that doesn't provide any acid guarantees (beyond the header,
21// which is ACID compliant), and let larger transactional frameworks use versioned-file as a
22// building block.
23
// NOTE: The way that we handle UpgradeFunc is unwieldy and unfortunate. I was unable to figure
25// out a clean way to do async function pointers in rust, and the method that I did find started to
26// have issues as soon as one of the arguments was a pointer itself. If someone wants to take a
27// stab at cleaning up the UpgradeFunc / WrappedUpgradeFunc structs and related code, improvements
28// would be much welcomed.
29
30//! The VersionedFile crate provides a wrapper to async_std::File which adds an invisible 4096 byte
31//! header to the file which tracks things like the file version number and the file identifier.
32//! When using methods like `seek` and `set_len`, the header will be ignored. For example, calling
33//! set_len(0) will result in a file that has a logical size of 0 but the 4096 byte file header
34//! will still be intact.
35//!
36//! The most useful portion of the header is the version number, which allows the software to
37//! easily detect when a file is using an out-of-date version and complete an update on that file.
38//! The other part of the header is an identifier which may help to recover the file in the event
39//! that the file's name is changed or lost unexpectedly.
40//!
41//! When a VersionedFile is opened, the caller passes in the latest version of the file, along with
42//! a chain of upgrades that can be used to upgrade older versions of the file to the most recent
43//! version. Finally, the identifier is also supplied so that an error can be thrown if the file
44//! has the wrong identifier.
45//!
46//! ```
47//! // Basic file operations
48//! use async_std::io::SeekFrom;
49//! use std::path::PathBuf;
50//!
51//! use anyhow::{bail, Result, Error};
52//! use versioned_file::{open_file, wrap_upgrade_process, Upgrade, VersionedFile};
53//!
54//! #[async_std::main]
55//! async fn main() {
56//!     // Create a file with open_file:
57//!     let path = PathBuf::from("target/docs-example-file.txt");
58//!     let identifier = "VersionedFileDocs::example.txt";
59//!     let mut versioned_file = open_file(&path, identifier, 1, &Vec::new()).await.unwrap();
60//!
61//!     // Use write_all and read_exact for read/write operations:
62//!     versioned_file.write_all(b"hello, world!").await.unwrap();
63//!     versioned_file.seek(SeekFrom::Start(0)).await.unwrap();
64//!     let mut buf = vec![0u8; versioned_file.len().await.unwrap() as usize];
65//!     versioned_file.read_exact(&mut buf).await.unwrap();
66//!     if buf != b"hello, world!" {
67//!         panic!("example did not read correctly");
68//!     }
69//! }
70//! ```
71//! ```
72//! // Simple upgrade example
73//! use async_std::io::SeekFrom;
74//! use std::path::PathBuf;
75//!
76//! use anyhow::{bail, Result, Error};
77//! use versioned_file::{open_file, wrap_upgrade_process, Upgrade, VersionedFile};
78//!
79//! // An example of a function that upgrades a file from version 1 to version 2, while making
80//! // changes to the body of the file.
81//! async fn example_upgrade(
82//!     mut vf: VersionedFile,
83//!     initial_version: u8,
84//!     updated_version: u8,
85//! ) -> Result<(), Error> {
86//!     // Check that the version is okay.
87//!     if initial_version != 1 || updated_version != 2 {
88//!         bail!("wrong version");
89//!     }
90//!
91//!     // Truncate the file and replace the data
92//!     vf.set_len(0).await.unwrap();
93//!     let new_data = b"hello, update!";
94//!     vf.write_all(new_data).await.unwrap();
95//!     Ok(())
96//! }
97//!
98//! #[async_std::main]
99//! async fn main() {
100//!     // Open a file with an upgrade process:
101//!     let path = PathBuf::from("target/docs-example-file.txt");
102//!     let identifier = "VersionedFileDocs::example.txt";
103//!     let upgrade = Upgrade {
104//!         initial_version: 1,
105//!         updated_version: 2,
106//!         process: wrap_upgrade_process(example_upgrade),
107//!     };
108//!     let mut vf = open_file(&path, identifier, 2, &vec![upgrade]).await.unwrap();
109//!     let mut buf = vec![0u8; vf.len().await.unwrap() as usize];
110//!     vf.read_exact(&mut buf).await.unwrap();
111//!     if buf != b"hello, update!" {
112//!         panic!("example did not update correctly");
113//!     }
114//!
115//!     // Clean-up
116//!     std::fs::remove_file(PathBuf::from("target/docs-example-file.txt"));
//! }
//! ```
118
119use async_std::fs::{File, OpenOptions};
120use async_std::io::prelude::SeekExt;
121use async_std::io::{ReadExt, SeekFrom, WriteExt};
122use async_std::prelude::Future;
123use std::collections::HashMap;
124use std::path::PathBuf;
125use std::pin::Pin;
126use std::str::from_utf8;
127
128use anyhow::{bail, Context, Error, Result};
129
130/// UpgradeFunc is a pointer to a function that upgrades a file from one version to the next.
131/// The intended starting and ending versions are explicitly stated in the input. When the upgrade
132/// is complete, the file cursor will automatically be placed back at the start of the file.
133///
134/// It may seem rather redundant to explicitly declare the version transition in the input to the
135/// UpgradeFunc, as the version numbers are already stated multiple times elsewhere as well.
136/// Under normal circumstances, this redundancy would be seen as excessive, however a file upgrade
137/// has the potential to corrupt or destory data, so we want extra layers of protection to ensure
138/// that the wrong upgrade process is not called on a file.
139///
140/// This type is not usable until it has been wrapped with `wrap_upgrade_process`.
141
142/// UpgradeFunc defines the signature for a function that can be used to upgrade a
143/// VersionedFile. The UpgradeFunc function will receive the file that needs to be upgraded, and
144/// it will also receive the intended initial version and upgraded version. The version inputs
145/// allow the upgrade function to double check that the right upgrade is being used - if a bug in
146/// the library somehow causes the wrong upgrade to be used, the user may end up with corrupted
147/// data. For that reason, we place extra redundancy around the version checks.
148///
149/// UpgradeFunc functions cannot be used directly due to Rust's current inability to support
150/// async function pointers. To use an UpgradeFunc, one must call `wrap_upgrade_process` first.
151pub type UpgradeFunc =
152    fn(file: VersionedFile, initial_version: u8, upgraded_version: u8) -> Result<(), Error>;
153
154/// WrappedUpgradeFunc is a type that wraps an UpgradeFunc so that the UpgradeFunc can be
155/// used as a function pointer in the call to `open_file`.
156pub type WrappedUpgradeFunc =
157    Box<dyn Fn(VersionedFile, u8, u8) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>>;
158
159/// wrap_upgrade_process is a function that will convert an UpgradeFunc into a
160/// WrappedUpgradeFunc.
161pub fn wrap_upgrade_process<T>(f: fn(VersionedFile, u8, u8) -> T) -> WrappedUpgradeFunc
162where
163    T: Future<Output = Result<(), Error>> + 'static,
164{
165    Box::new(move |x, y, z| Box::pin(f(x, y, z)))
166}
167
168/// Upgrade defines an upgrade process for upgrading the data in a file from one version to
169/// another.
170pub struct Upgrade {
171    /// initial_version designates the version of the file that this upgrade should be applied to.
172    pub initial_version: u8,
173    /// updated_version designates the version of the file after the upgrade is complete.
174    pub updated_version: u8,
175    /// process defines the function that is used to upgrade the file.
176    pub process: WrappedUpgradeFunc,
177}
178
179/// VersionedFile defines the main type for the crate, and implements an API for safely
180/// manipulating versioned files. The API is based on the async_std::File interface, but with some
181/// adjustments that are designed to make it both safer and more ergonomic. For example, len() is
182/// exposed directly rather than having to first fetch the file metadata. Another example, all
183/// calls to write will automatically flush() the file.
184///
185/// If a function is not fully documented, it is safe to assume that the function follows the same
186/// convensions/rules as its equivalent function for async_std::File.
187#[derive(Debug)]
188pub struct VersionedFile {
189    /// file houses the underlying file handle of the VersionedFile.
190    file: File,
191
192    /// cursor tracks the read offset of the underlying file. Implementers need to take care
193    /// not to mix up the cursor as understood by the user with the cursor relative to
194    /// the operating system.
195    cursor: u64,
196
197    /// needs_seek is set if the file needs to seek to synchronize with the cursor.
198    needs_seek: bool,
199}
200
201// When working with the VersionedFile implementation, there are a few sharp corners to watch out
202// for:
203//
204// The `cursor` field tracks the position of the cursor in the underlying file handle,
205// and will therefore be 4096 larger than the values that the user is expecting when calling Seek.
206//
207// Every function must take care to properly update the cursor after completing its operation.
208// In the event of an error, the cursor position of the file may be unknown. Every function that
209// changes the file cursor position must ensure that in the event of a failure, `needs_seek` is set
210// to `true`.
211//
212// Every function that depends on the file cursor must check `needs_seek` before executing, so that
213// the cursor can be set to be equal to `cursor` in the event that the current position is
214// unknown.
215//
// Please test code carefully when making changes.
217impl VersionedFile {
218    /// fix_seek will check if the file cursor has potentially drifted from the vf cursor and
219    /// attempt to fix it if drift is possible.
220    async fn fix_seek(&mut self) -> Result<(), Error> {
221        // Seek to the correct location if the previous operation was not able to update the file
222        // cursor.
223        if self.needs_seek {
224            match self.file.seek(SeekFrom::Start(self.cursor)).await {
225                Ok(_) => self.needs_seek = false,
226                Err(e) => bail!(format!(
227                    "unable to set file cursor to correct position: {}",
228                    e
229                )),
230            };
231        }
232        Ok(())
233    }
234
235    /// len will return the size of the file, not including the versioned header.
236    pub async fn len(&mut self) -> Result<u64, Error> {
237        let md = self
238            .file
239            .metadata()
240            .await
241            .context("unable to get metadata for file")?;
242        Ok(md.len() - 4096)
243    }
244
245    /// read_exact will read from the data portion of a VersionedFile. If there is an error, the
246    /// contents of buf are unspecified, and the read offset will not be updated.
247    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error> {
248        self.fix_seek().await?;
249        self.needs_seek = true;
250
251        // Try reading the bytes. If that fails, try resetting the file cursor.
252        match self.file.read_exact(buf).await {
253            Ok(_) => {}
254            Err(e) => {
255                match self.file.seek(SeekFrom::Start(self.cursor)).await {
256                    // If resetting the cursor is successful, we can clear the needs_seek flag
257                    // before returning an error. Otherwise we leave the flag set so that future
258                    // function calls know the cursor is in the wrong spot.
259                    Ok(_) => self.needs_seek = false,
260                    Err(_) => {}
261                };
262                bail!(format!("{}", e));
263            }
264        };
265
266        // Update the cursor and clear the needs_seek flag.
267        self.cursor += buf.len() as u64;
268        self.needs_seek = false;
269        Ok(())
270    }
271
272    /// seek will seek to the provided offset within the file, ignoring the header.
273    pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
274        self.fix_seek().await?;
275        self.needs_seek = true;
276
277        // Special care needs to be taken with regards to the seeking boundaries. The udnerlying
278        // file has boundaries that are the whole file, but the caller has boundaries that exclude
279        // the first 4096 bytes.
280        match pos {
281            SeekFrom::Start(x) => {
282                // If seeking from the start, we need to add 4096 bytes to the caller offset to
283                // compensate for the header.
284                let new_pos = self
285                    .file
286                    .seek(SeekFrom::Start(x + 4096))
287                    .await
288                    .context("versioned file seek failed")?;
289                self.needs_seek = false;
290                self.cursor = new_pos;
291                return Ok(new_pos - 4096);
292            }
293            SeekFrom::End(x) => {
294                // If seeking from the end, we need to make sure that the caller does not seek the
295                // file into the header. We use self.len() because that will provide the length of
296                // the file excluding the header.
297                let size = self.len().await.context("unable to get file len")?;
298                if x + (size as i64) < 0 {
299                    self.needs_seek = false;
300                    bail!("cannot seek to a position before the start of the file");
301                }
302                let new_pos = self.file.seek(pos).await.context("seek failed")?;
303                self.needs_seek = false;
304                self.cursor = new_pos;
305                return Ok(new_pos - 4096);
306            }
307            SeekFrom::Current(x) => {
308                if x + (self.cursor as i64) < 4096 {
309                    self.needs_seek = false;
310                    bail!("cannot seek to a position before the start of the file");
311                }
312                let new_pos = self.file.seek(pos).await.context("seek failed")?;
313                self.needs_seek = false;
314                self.cursor = new_pos;
315                return Ok(new_pos - 4096);
316            }
317        }
318    }
319
320    /// set_len will truncate the file so that it has the provided length, excluding the header.
321    /// This operation can be used to make the file larger as well. This operation will put the
322    /// cursor at the end of the file after the length has been set.
323    pub async fn set_len(&mut self, new_len: u64) -> Result<(), Error> {
324        self.file
325            .set_len(new_len + 4096)
326            .await
327            .context("unable to adjust file length")?;
328        self.seek(SeekFrom::End(0))
329            .await
330            .context("unable to seek to new end of file")?;
331        Ok(())
332    }
333
334    /// write_all will write to the VersionedFile and then call flush(). Note that flush() is not
335    /// the same as fsync().
336    ///
337    /// If there is an error, the file cursor will not be updated.
338    pub async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
339        self.fix_seek().await?;
340        self.needs_seek = true;
341
342        // Try reading the bytes. If that fails, try resetting the file cursor.
343        match self.file.write_all(buf).await {
344            Ok(_) => {}
345            Err(e) => {
346                match self.file.seek(SeekFrom::Start(self.cursor)).await {
347                    // If resetting the cursor is successful, we can clear the needs_seek flag
348                    // before returning an error. Otherwise we leave the flag set so that future
349                    // function calls know the cursor is in the wrong spot.
350                    Ok(_) => self.needs_seek = false,
351                    Err(_) => {}
352                };
353                bail!(format!("{}", e));
354            }
355        };
356
357        // Flush the file. Note that this is not the same as fsync.
358        self.file.flush().await.context("unable to flush file")?;
359
360        // Update the cursor and clear the needs_seek flag.
361        self.cursor += buf.len() as u64;
362        self.needs_seek = false;
363        Ok(())
364    }
365}
366
367/// version_to_str will write out the version in ascii, adding leading zeroes if needed.
368fn version_to_str(version: u8) -> Result<String, Error> {
369    // 0 is not an allowed version, every other possible u8 is okay.
370    if version == 0 {
371        bail!("version is not allowed to be 0");
372    }
373
374    // Compute the 4 version bytes based on the latest version.
375    let mut version_string = format!("{}", version);
376    if version_string.len() == 1 {
377        version_string = format!("00{}", version);
378    } else if version_string.len() == 2 {
379        version_string = format!("0{}", version);
380    }
381    Ok(version_string)
382}
383
384/// new_file_header will write the header of the file using the expected idenfitier and the latest
385/// version.
386async fn new_file_header(
387    file: &mut File,
388    expected_identifier: &str,
389    latest_version: u8,
390) -> Result<(), Error> {
391    // Compute the full set of metadata bytes.
392    let version_string =
393        version_to_str(latest_version).context("unable to convert version to ascii string")?;
394    let header_str = format!("{}\n{}\n", version_string, expected_identifier);
395    let header_bytes = header_str.as_bytes();
396    if header_bytes.len() > 256 {
397        panic!("developer error: metadata_bytes should be guaranteed to have len below 256");
398    }
399
400    // Prepare the full header and write it to the file.
401    let mut full_header = [0u8; 4096];
402    full_header[..header_bytes.len()].copy_from_slice(header_bytes);
403    file.write_all(&full_header)
404        .await
405        .context("unable to write initial metadata")?;
406    file.flush()
407        .await
408        .context("unable to flush file after writing header")?;
409    let new_metadata = file
410        .metadata()
411        .await
412        .context("unable to get updated file metadata")?;
413    if new_metadata.len() != 4096 {
414        panic!(
415            "developer error: file did not initialize with 4096 bytes: {}",
416            new_metadata.len()
417        );
418    }
419
420    // Reset the offset to 0; after creating the file the startup routine will read the data to
421    // verify it matches.
422    file.seek(SeekFrom::Start(0))
423        .await
424        .context("unable to seek back to beginning of file")?;
425
426    Ok(())
427}
428
429/// verify_upgrade_paths verify that the set of paths provided for performing upgrades all lead to
430/// the latest version, and will return an error if some path doesn't lead to the latest version.
431/// It will also return an error if two possible paths exist for a given version.
432fn verify_upgrade_paths(upgrade_paths: &Vec<Upgrade>, latest_version: u8) -> Result<(), Error> {
433    // Enusre 0 was not used as the latest_version.
434    if latest_version == 0 {
435        bail!("version 0 is not allowed for a VersionedFile");
436    }
437
438    // Verify that an upgrade path exists for the file which carries it to the latest version.
439    let mut version_routes = HashMap::new();
440    // Verify basic properties of the graph (no cycles, no repeat sources).
441    for path in upgrade_paths {
442        if path.initial_version >= path.updated_version {
443            bail!("upgrade paths must always lead to a higher version number");
444        }
445        if version_routes.contains_key(&path.initial_version) {
446            bail!("upgrade paths can only have one upgrade for each version");
447        }
448        if path.updated_version > latest_version {
449            bail!("upgrade paths lead beyond the latest version");
450        }
451        if path.initial_version == 0 {
452            bail!("version 0 is not allowed for a VersionedFile");
453        }
454        version_routes.insert(path.initial_version, path.updated_version);
455    }
456    // Verify that all upgrades lead to the latest version. We iterate over the version_routes and mark every
457    // node that connects to a finished node.
458    let mut complete_paths = HashMap::new();
459    complete_paths.insert(latest_version, {});
460    loop {
461        let mut progress = false;
462        let mut finished = true;
463
464        for (key, value) in &version_routes {
465            if complete_paths.contains_key(key) {
466                continue;
467            }
468            if complete_paths.contains_key(value) {
469                progress = true;
470                complete_paths.insert(*key, {});
471            } else {
472                finished = false;
473            }
474        }
475
476        if finished {
477            break;
478        }
479        if progress == false {
480            bail!("update graph is incomplete, not all nodes lead to the latest version");
481        }
482    }
483
484    Ok(())
485}
486
487/// perform_file_upgrade takes a file and an upgrade, and then executes the upgrade against the
488/// file.
489async fn perform_file_upgrade(filepath: &PathBuf, u: &Upgrade) -> Result<(), Error> {
490    // Open the file and perform the upgrade.
491    let file = OpenOptions::new()
492        .read(true)
493        .write(true)
494        .open(filepath)
495        .await
496        .context("unable to open versioned file for update")?;
497    let mut versioned_file = VersionedFile {
498        file,
499        cursor: 4096,
500        needs_seek: false,
501    };
502    // Set the offset to right after the header, which is what the upgrade routines will expect.
503    versioned_file
504        .seek(SeekFrom::Start(0))
505        .await
506        .context("unable to seek in file after upgrade")?;
507    (u.process)(versioned_file, u.initial_version, u.updated_version)
508        .await
509        .context(format!(
510            "unable to complete file upgrade from version {} to {}",
511            u.initial_version, u.updated_version
512        ))?;
513        // file drops automatically because it is moved into the path.process call.
514
515    // Update the metadata to contain the new version string.
516    let file = OpenOptions::new()
517        .read(true)
518        .write(true)
519        .open(filepath)
520        .await
521        .context("unable to open versioned file for update")?;
522    let mut versioned_file = VersionedFile {
523        file,
524        cursor: 4096,
525        needs_seek: false,
526    };
527    let updated_version_str =
528        version_to_str(u.updated_version).context("upgrade path has bad version")?;
529    versioned_file
530        .file
531        .seek(SeekFrom::Start(0))
532        .await
533        .context("unable to seek to beginning of file")?;
534    versioned_file
535        .file
536        .write_all(updated_version_str.as_bytes())
537        .await
538        .context("unable to write updated version to file header")?;
539
540    Ok(())
541}
542
543/// open_file will open a versioned file.
544///
545/// If the file does not yet exist, a new VersionedFile will be created containing the
546/// latest_version and the provided identifier in the header. If the file exists but is an older
547/// version, the update_paths will be used to update the file to the latest version.
548///
549/// An error will be returned if the file does exist and has the wrong identifier, or if the file
550/// has a version that is higher than 'latest_version', or if the upgrades do not provide a valid
551/// path from the current version of the file to the latest version.
552pub async fn open_file(
553    filepath: &PathBuf,
554    expected_identifier: &str,
555    latest_version: u8,
556    upgrades: &Vec<Upgrade>,
557) -> Result<VersionedFile, Error> {
558    // Verify that the inputs match all requirements.
559    let path_str = filepath.to_str().context("could not stringify path")?;
560    if !path_str.is_ascii() {
561        bail!("path should be valid ascii");
562    }
563    if expected_identifier.len() > 251 {
564        bail!("the identifier of a versioned file cannot exceed 251 bytes");
565    }
566    if !expected_identifier.is_ascii() {
567        bail!("the identifier must be ascii");
568    }
569
570    // Open the file, creating a new file if one does not exist.
571    let mut file = OpenOptions::new()
572        .read(true)
573        .write(true)
574        .create(true)
575        .open(filepath)
576        .await
577        .context("unable to open versioned file")?;
578
579    // If the file length is zero, we assume that the file has not been created yet.
580    let file_metadata = file
581        .metadata()
582        .await
583        .context("unable to read versioned file metadata")?;
584    if file_metadata.len() == 0 {
585        new_file_header(&mut file, expected_identifier, latest_version)
586            .await
587            .context("unable to write new file header")?;
588    }
589
590    // Read the first 4096 bytes of the file to get the file header.
591    let mut header = vec![0; 4096];
592    file.read_exact(&mut header)
593        .await
594        .context("unable to read file header")?;
595
596    // Verify that the identifier bytes in the header match the expected identifier.
597    let header_identifier = from_utf8(&header[4..4 + expected_identifier.len()])
598        .context("the on-disk file identifier could not be parsed")?;
599    if header_identifier != expected_identifier {
600        bail!("the file does not have the correct identifier");
601    }
602
603    // Verify that the upgrade paths all lead to the latest version.
604    verify_upgrade_paths(&upgrades, latest_version).context("upgrade paths are invalid")?;
605
606    let version_str = from_utf8(&header[..3]).context("the on-disk version could not be parsed")?;
607    let mut version: u8 = version_str
608        .parse()
609        .context("unable to parse on-disk version")?;
610
611    // Had this weird issue with the async function pointer where I couldn't get it to work if I
612    // made the VersionedFile a pointer. As a result, we have to pass the file in, which transfers
613    // ownership, and means the file can't continue to be used in this function. I overcame that
614    // limitation by just opening and closing the file repeatedly. This is the first such place
615    // where we close the file for ownership reasons rather than because we don't need it anymore.
616    drop(file);
617
618    // Execute the upgrades.
619    while version != latest_version {
620        let mut found = false;
621        for upgrade in upgrades {
622            if upgrade.initial_version == version {
623                perform_file_upgrade(filepath, upgrade)
624                    .await
625                    .context("unable to complete file upgrade")?;
626                version = upgrade.updated_version;
627                found = true;
628                break;
629            }
630        }
631
632        // The upgrades verifier ensures that if an upgrade exists in the set of upgrades, then
633        // there also exists a path to the latest_version from that upgrade. Therefore, if this
634        // file doesn't have a path to the latest version, no other upgrades will be executed
635        // either.
636        if !found {
637            bail!("no viable upgrade path exists for file");
638        }
639    }
640
641    // Open the file and create the versioned_file.
642    let mut file = OpenOptions::new()
643        .read(true)
644        .write(true)
645        .open(filepath)
646        .await
647        .context("unable to open versioned file for update")?;
648    file.seek(SeekFrom::Start(4096))
649        .await
650        .context("unable to seek to beginning of file after header")?;
651    let versioned_file = VersionedFile {
652        file,
653        cursor: 4096,
654        needs_seek: false,
655    };
656
657    Ok(versioned_file)
658}
659
660#[cfg(test)]
661mod tests {
662    use super::*;
663
664    use testdir::testdir;
665
666    // Create a helper function which does a null upgrade so that we can do testing of the upgrade
667    // path verifier.
668    async fn stub_upgrade(_: VersionedFile, _: u8, _: u8) -> Result<(), Error> {
669        Ok(())
670    }
671
672    // This is a basic upgrade function that expects the current contents of the file to be
673    // "test_data". It will alter the contents so that they say "test".
674    async fn smoke_upgrade_1_2(
675        mut vf: VersionedFile,
676        initial_version: u8,
677        updated_version: u8,
678    ) -> Result<(), Error> {
679        // Verify that the correct version is being used.
680        if initial_version != 1 || updated_version != 2 {
681            bail!("this upgrade is intended to take the file from version 1 to version 2");
682        }
683        if vf.len().await.unwrap() != 9 {
684            bail!("file is wrong len");
685        }
686        // Read the file and verify that we are upgrading the correct data.
687        let mut buf = [0u8; 9];
688        vf.read_exact(&mut buf)
689            .await
690            .context("unable to read old file contents")?;
691        if &buf != b"test_data" {
692            bail!(format!("file appears corrupt: {:?}", buf));
693        }
694
695        // Truncate the file and write the new data into it.
696        let new_data = b"test";
697        vf.set_len(0).await.unwrap();
698        vf.write_all(new_data)
699            .await
700            .context("unable to write new data after deleting old data")?;
701        Ok(())
702    }
703
704    // smoke upgrade 2->3
705    async fn smoke_upgrade_2_3(
706        mut vf: VersionedFile,
707        initial_version: u8,
708        updated_version: u8,
709    ) -> Result<(), Error> {
710        // Verify that the correct version is being used.
711        if initial_version != 2 || updated_version != 3 {
712            bail!("this upgrade is intended to take the file from version 2 to version 3");
713        }
714        if vf.len().await.unwrap() != 4 {
715            bail!("file is wrong len");
716        }
717        // Read the file and verify that we are upgrading the correct data.
718        let mut buf = [0u8; 4];
719        vf.read_exact(&mut buf)
720            .await
721            .context("unable to read old file contents")?;
722        if &buf != b"test" {
723            bail!("file appears corrupt");
724        }
725
726        // Truncate the file and write the new data into it.
727        let new_data = b"testtest";
728        vf.set_len(0).await.unwrap();
729        vf.write_all(new_data)
730            .await
731            .context("unable to write new data after deleting old data")?;
732        Ok(())
733    }
734
735    // smoke upgrade 3->4
736    async fn smoke_upgrade_3_4(
737        mut vf: VersionedFile,
738        initial_version: u8,
739        updated_version: u8,
740    ) -> Result<(), Error> {
741        // Verify that the correct version is being used.
742        if initial_version != 3 || updated_version != 4 {
743            bail!("this upgrade is intended to take the file from version 1 to version 2");
744        }
745        if vf.len().await.unwrap() != 8 {
746            bail!("file is wrong len");
747        }
748        // Read the file and verify that we are upgrading the correct data.
749        let mut buf = [0u8; 8];
750        vf.read_exact(&mut buf)
751            .await
752            .context("unable to read old file contents")?;
753        if &buf != b"testtest" {
754            bail!("file appears corrupt");
755        }
756
757        // Truncate the file and write the new data into it.
758        let new_data = b"testtesttest";
759        vf.set_len(0).await.unwrap();
760        vf.write_all(new_data)
761            .await
762            .context("unable to write new data after deleting old data")?;
763        Ok(())
764    }
765
766    #[async_std::test]
767    // Do basic testing of all the major functions for VersionedFiles
768    async fn smoke_test() {
769        // Create a basic versioned file.
770        let dir = testdir!();
771        let test_dat = dir.join("test.dat");
772        open_file(&test_dat, "versioned_file::test.dat", 0, &Vec::new())
773            .await
774            .context("unable to create versioned file")
775            .unwrap_err();
776        open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
777            .await
778            .context("unable to create versioned file")
779            .unwrap();
780        // Try to open it again.
781        open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
782            .await
783            .context("unable to create versioned file")
784            .unwrap();
785        // Try to open it with the wrong specifier.
786        open_file(&test_dat, "bad_versioned_file::test.dat", 1, &Vec::new())
787            .await
788            .context("unable to create versioned file")
789            .unwrap_err();
790
791        // Try to make some invalid new files.
792        let invalid_name = dir.join("❄️"); // snowflake emoji in filename
793        open_file(&invalid_name, "versioned_file::test.dat", 1, &Vec::new())
794            .await
795            .context("unable to create versioned file")
796            .unwrap_err();
797        let invalid_id = dir.join("invalid_identifier.dat");
798        open_file(&invalid_id, "versioned_file::test.dat::❄️", 1, &Vec::new())
799            .await
800            .context("unable to create versioned file")
801            .unwrap_err();
802
803        // Perform a test where we open test.dat and write a small amount of data to it. Then we
804        // will open the file again and read back that data.
805        let mut file = open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
806            .await
807            .unwrap();
808        file.write_all(b"test_data").await.unwrap();
809        let mut file = open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
810            .await
811            .unwrap();
812        if file.len().await.unwrap() != 9 {
813            panic!("file has unexpected len");
814        }
815        let mut buf = [0u8; 9];
816        file.read_exact(&mut buf).await.unwrap();
817        if &buf != b"test_data" {
818            panic!("data read does not match data written");
819        }
820        // Try to open the file again and ensure the write happened in the correct spot.
821        open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
822            .await
823            .unwrap();
824
825        // Open the file again, this time with an upgrade for smoke_upgrade_1_2.
826        let mut upgrade_chain = vec![Upgrade {
827            initial_version: 1,
828            updated_version: 2,
829            process: wrap_upgrade_process(smoke_upgrade_1_2),
830        }];
831        let mut file = open_file(&test_dat, "versioned_file::test.dat", 2, &upgrade_chain)
832            .await
833            .unwrap();
834        if file.len().await.unwrap() != 4 {
835            panic!("file has wrong len");
836        }
837        let mut buf = [0u8; 4];
838        file.read_exact(&mut buf).await.unwrap();
839        if &buf != b"test" {
840            panic!("data read does not match data written");
841        }
842        // Try to open the file again to make sure everything still completes.
843        open_file(&test_dat, "versioned_file::test.dat", 2, &upgrade_chain)
844            .await
845            .unwrap();
846
847        // Attempt to do two upgrades at once, from 2 to 3  and 3 to 4.
848        upgrade_chain.push(Upgrade {
849            initial_version: 2,
850            updated_version: 3,
851            process: wrap_upgrade_process(smoke_upgrade_2_3),
852        });
853        upgrade_chain.push(Upgrade {
854            initial_version: 3,
855            updated_version: 4,
856            process: wrap_upgrade_process(smoke_upgrade_3_4),
857        });
858        let mut file = open_file(&test_dat, "versioned_file::test.dat", 4, &upgrade_chain)
859            .await
860            .unwrap();
861        if file.len().await.unwrap() != 12 {
862            panic!("file has wrong len");
863        }
864        let mut buf = [0u8; 12];
865        file.read_exact(&mut buf).await.unwrap();
866        if &buf != b"testtesttest" {
867            panic!("data read does not match data written");
868        }
869        // Try to open the file again to make sure everything still completes.
870        let mut file = open_file(&test_dat, "versioned_file::test.dat", 4, &upgrade_chain)
871            .await
872            .unwrap();
873
874        // Test that the seeking is implemented correctly.
875        file.seek(SeekFrom::End(-5)).await.unwrap();
876        file.write_all(b"NOVELLA").await.unwrap();
877        file.seek(SeekFrom::Current(-3)).await.unwrap();
878        file.seek(SeekFrom::Current(-4)).await.unwrap();
879        file.seek(SeekFrom::Current(-7)).await.unwrap();
880        let mut buf = [0u8; 14];
881        file.read_exact(&mut buf).await.unwrap();
882        if &buf != b"testtesNOVELLA" {
883            panic!(
884                "read data has unexpected result: {} || {}",
885                std::str::from_utf8(&buf).unwrap(),
886                buf[0]
887            );
888        }
889        file.seek(SeekFrom::Current(-2)).await.unwrap();
890        file.seek(SeekFrom::End(-15)).await.unwrap_err();
891        let mut buf = [0u8; 2];
892        file.read_exact(&mut buf).await.unwrap();
893        if &buf != b"LA" {
894            panic!("seek_end error changed file cursor");
895        }
896        file.seek(SeekFrom::Current(-2)).await.unwrap();
897        file.seek(SeekFrom::Current(-13)).await.unwrap_err();
898        file.read_exact(&mut buf).await.unwrap();
899        if &buf != b"LA" {
900            panic!("seek_end error changed file cursor");
901        }
902    }
903
#[test]
// Attempt to provide comprehensive test coverage of the upgrade path verifier by feeding it a
// series of upgrade sets and checking whether each one is accepted or rejected.
fn test_verify_upgrade_paths() {
    // Build a single upgrade step between two versions, backed by the stub upgrade process.
    fn step(from: u8, to: u8) -> Upgrade {
        Upgrade {
            initial_version: from,
            updated_version: to,
            process: wrap_upgrade_process(stub_upgrade),
        }
    }

    // An empty upgrade set is fine for any legal version; 0 is not a legal version.
    verify_upgrade_paths(&Vec::new(), 0).unwrap_err();
    verify_upgrade_paths(&Vec::new(), 1).unwrap();
    verify_upgrade_paths(&Vec::new(), 2).unwrap();
    verify_upgrade_paths(&Vec::new(), 255).unwrap();

    // A lone upgrade that reaches the latest version is accepted.
    verify_upgrade_paths(&vec![step(1, 2)], 2).unwrap();

    // An upgrade that does not increase the version is rejected.
    verify_upgrade_paths(&vec![step(2, 2)], 2).unwrap_err();

    // An upgrade set with no route to the latest version is rejected.
    verify_upgrade_paths(&vec![step(1, 2)], 3).unwrap_err();

    // A simple linear chain is accepted.
    verify_upgrade_paths(&vec![step(1, 2), step(2, 3)], 3).unwrap();

    // Two upgrades that start from the same version are rejected.
    verify_upgrade_paths(&vec![step(1, 2), step(2, 3), step(1, 3)], 3).unwrap_err();

    // Two upgrades that end at the same version are accepted.
    verify_upgrade_paths(&vec![step(1, 3), step(2, 3)], 3).unwrap();

    // The same pair is rejected when the upgrades go past the requested latest version.
    verify_upgrade_paths(&vec![step(1, 3), step(2, 3)], 2).unwrap_err();

    // A complex but valid structure is accepted.
    verify_upgrade_paths(
        &vec![step(1, 3), step(2, 3), step(3, 6), step(4, 6), step(5, 6)],
        6,
    )
    .unwrap();

    // The same complex structure is accepted when the upgrades are listed in a shuffled order.
    verify_upgrade_paths(
        &vec![step(5, 6), step(2, 3), step(3, 6), step(1, 3), step(4, 6)],
        6,
    )
    .unwrap();

    // A shuffled complex structure containing one orphan is rejected.
    verify_upgrade_paths(
        &vec![step(2, 5), step(6, 7), step(3, 6), step(1, 4), step(4, 6)],
        6,
    )
    .unwrap_err();
}
1122
#[test]
// Check that version_to_str rejects version 0 and produces the expected zero-padded string for a
// sample of one, two and three digit versions.
fn test_version_to_str() {
    // 0 is expected to produce an error rather than a string.
    version_to_str(0).unwrap_err();

    // Each entry pairs a version with the string it must be rendered as.
    let expectations = [(1, "001"), (2, "002"), (9, "009"), (39, "039"), (139, "139")];
    for &(version, expected) in expectations.iter() {
        if version_to_str(version).unwrap() != expected {
            panic!("{} failed", version);
        }
    }
}
1142}