versioned_file/lib.rs
1#![forbid(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(unused_must_use)]
4#![deny(unused_mut)]
5// TODO: Ideally we could add some safety here to avoid errors where we are close to running out of
6// disk space.
7//
8// TODO: Should probably have some linting mechanism here to ensure that we only use read_exact and
9// write_all.
10//
11// TODO: Should add a switch to VersionedFile so that we can simulate a broken filesystem for ACID
12// testing. Probably should have that switch accept a mutexed object which will tell it when to
13// start failing. Then we can have many files at the same time coordinate various types of
14// failures.
15//
16// TODO: We should add random occasional checks to the code which verify that the vf cursor matches
17// the file handle cursor.
18//
19// TODO: Need to figure out how the versioned-file fits into a larger ACID ecosystem. Probably best
20// approach is to make it a base layer that doesn't provide any acid guarantees (beyond the header,
21// which is ACID compliant), and let larger transactional frameworks use versioned-file as a
22// building block.
23
// NOTE: The way that we handle UpgradeFunc is unwieldy and unfortunate. I was unable to figure
25// out a clean way to do async function pointers in rust, and the method that I did find started to
26// have issues as soon as one of the arguments was a pointer itself. If someone wants to take a
27// stab at cleaning up the UpgradeFunc / WrappedUpgradeFunc structs and related code, improvements
28// would be much welcomed.
29
30//! The VersionedFile crate provides a wrapper to async_std::File which adds an invisible 4096 byte
31//! header to the file which tracks things like the file version number and the file identifier.
32//! When using methods like `seek` and `set_len`, the header will be ignored. For example, calling
33//! set_len(0) will result in a file that has a logical size of 0 but the 4096 byte file header
34//! will still be intact.
35//!
36//! The most useful portion of the header is the version number, which allows the software to
37//! easily detect when a file is using an out-of-date version and complete an update on that file.
38//! The other part of the header is an identifier which may help to recover the file in the event
39//! that the file's name is changed or lost unexpectedly.
40//!
41//! When a VersionedFile is opened, the caller passes in the latest version of the file, along with
42//! a chain of upgrades that can be used to upgrade older versions of the file to the most recent
43//! version. Finally, the identifier is also supplied so that an error can be thrown if the file
44//! has the wrong identifier.
45//!
46//! ```
47//! // Basic file operations
48//! use async_std::io::SeekFrom;
49//! use std::path::PathBuf;
50//!
51//! use anyhow::{bail, Result, Error};
52//! use versioned_file::{open_file, wrap_upgrade_process, Upgrade, VersionedFile};
53//!
54//! #[async_std::main]
55//! async fn main() {
56//! // Create a file with open_file:
57//! let path = PathBuf::from("target/docs-example-file.txt");
58//! let identifier = "VersionedFileDocs::example.txt";
59//! let mut versioned_file = open_file(&path, identifier, 1, &Vec::new()).await.unwrap();
60//!
61//! // Use write_all and read_exact for read/write operations:
62//! versioned_file.write_all(b"hello, world!").await.unwrap();
63//! versioned_file.seek(SeekFrom::Start(0)).await.unwrap();
64//! let mut buf = vec![0u8; versioned_file.len().await.unwrap() as usize];
65//! versioned_file.read_exact(&mut buf).await.unwrap();
66//! if buf != b"hello, world!" {
67//! panic!("example did not read correctly");
68//! }
69//! }
70//! ```
71//! ```
72//! // Simple upgrade example
73//! use async_std::io::SeekFrom;
74//! use std::path::PathBuf;
75//!
76//! use anyhow::{bail, Result, Error};
77//! use versioned_file::{open_file, wrap_upgrade_process, Upgrade, VersionedFile};
78//!
79//! // An example of a function that upgrades a file from version 1 to version 2, while making
80//! // changes to the body of the file.
81//! async fn example_upgrade(
82//! mut vf: VersionedFile,
83//! initial_version: u8,
84//! updated_version: u8,
85//! ) -> Result<(), Error> {
86//! // Check that the version is okay.
87//! if initial_version != 1 || updated_version != 2 {
88//! bail!("wrong version");
89//! }
90//!
91//! // Truncate the file and replace the data
92//! vf.set_len(0).await.unwrap();
93//! let new_data = b"hello, update!";
94//! vf.write_all(new_data).await.unwrap();
95//! Ok(())
96//! }
97//!
98//! #[async_std::main]
99//! async fn main() {
100//! // Open a file with an upgrade process:
101//! let path = PathBuf::from("target/docs-example-file.txt");
102//! let identifier = "VersionedFileDocs::example.txt";
103//! let upgrade = Upgrade {
104//! initial_version: 1,
105//! updated_version: 2,
106//! process: wrap_upgrade_process(example_upgrade),
107//! };
108//! let mut vf = open_file(&path, identifier, 2, &vec![upgrade]).await.unwrap();
109//! let mut buf = vec![0u8; vf.len().await.unwrap() as usize];
110//! vf.read_exact(&mut buf).await.unwrap();
111//! if buf != b"hello, update!" {
112//! panic!("example did not update correctly");
113//! }
114//!
115//! // Clean-up
116//! std::fs::remove_file(PathBuf::from("target/docs-example-file.txt"));
//! }
//! ```
118
119use async_std::fs::{File, OpenOptions};
120use async_std::io::prelude::SeekExt;
121use async_std::io::{ReadExt, SeekFrom, WriteExt};
122use async_std::prelude::Future;
123use std::collections::HashMap;
124use std::path::PathBuf;
125use std::pin::Pin;
126use std::str::from_utf8;
127
128use anyhow::{bail, Context, Error, Result};
129
/// UpgradeFunc defines the signature for a function that can be used to upgrade a
/// VersionedFile from one version to the next. The UpgradeFunc function will receive the file
/// that needs to be upgraded, and it will also receive the intended initial version and upgraded
/// version.
///
/// It may seem rather redundant to explicitly declare the version transition in the input to the
/// UpgradeFunc, as the version numbers are already stated in the `Upgrade` that carries the
/// function. Under normal circumstances, this redundancy would be seen as excessive, however a
/// file upgrade has the potential to corrupt or destroy data, so we want extra layers of
/// protection to ensure that the wrong upgrade process is not called on a file. The version
/// inputs allow the upgrade function to double check that the right upgrade is being used.
///
/// The upgrade function takes ownership of the file. Once all upgrades are complete, `open_file`
/// opens a fresh handle whose cursor is placed at the start of the file data.
///
/// UpgradeFunc functions cannot be used directly due to Rust's current inability to support
/// async function pointers. To use an UpgradeFunc, one must call `wrap_upgrade_process` first.
///
/// NOTE: this alias is written with a synchronous return type, while `wrap_upgrade_process`
/// accepts a function with the same parameters whose return value is a future resolving to
/// `Result<(), Error>` (for example an `async fn`).
pub type UpgradeFunc =
    fn(file: VersionedFile, initial_version: u8, upgraded_version: u8) -> Result<(), Error>;
153
/// WrappedUpgradeFunc is a type that wraps an UpgradeFunc so that the UpgradeFunc can be
/// used as a function pointer in the call to `open_file`.
///
/// It is a boxed closure that takes the file together with the initial and updated versions and
/// returns a pinned, boxed future resolving to the result of the upgrade. Values of this type
/// are produced by `wrap_upgrade_process` and stored in the `process` field of an `Upgrade`.
pub type WrappedUpgradeFunc =
    Box<dyn Fn(VersionedFile, u8, u8) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>>;
158
159/// wrap_upgrade_process is a function that will convert an UpgradeFunc into a
160/// WrappedUpgradeFunc.
161pub fn wrap_upgrade_process<T>(f: fn(VersionedFile, u8, u8) -> T) -> WrappedUpgradeFunc
162where
163 T: Future<Output = Result<(), Error>> + 'static,
164{
165 Box::new(move |x, y, z| Box::pin(f(x, y, z)))
166}
167
/// Upgrade defines an upgrade process for upgrading the data in a file from one version to
/// another. A list of upgrades is passed to `open_file`, which validates the list and then
/// applies the upgrade whose `initial_version` matches the on-disk version, repeating until the
/// file reaches the latest version.
pub struct Upgrade {
    /// initial_version designates the version of the file that this upgrade should be applied to.
    /// It must be non-zero and strictly lower than `updated_version`.
    pub initial_version: u8,
    /// updated_version designates the version of the file after the upgrade is complete. This is
    /// the version that gets written into the file header once `process` succeeds.
    pub updated_version: u8,
    /// process defines the function that is used to upgrade the file. It is created by passing
    /// an upgrade function to `wrap_upgrade_process`.
    pub process: WrappedUpgradeFunc,
}
178
/// VersionedFile defines the main type for the crate, and implements an API for safely
/// manipulating versioned files. The API is based on the async_std::File interface, but with some
/// adjustments that are designed to make it both safer and more ergonomic. For example, len() is
/// exposed directly rather than having to first fetch the file metadata. Another example, all
/// calls to write will automatically flush() the file.
///
/// If a function is not fully documented, it is safe to assume that the function follows the same
/// conventions/rules as its equivalent function for async_std::File.
#[derive(Debug)]
pub struct VersionedFile {
    /// file houses the underlying file handle of the VersionedFile.
    file: File,

    /// cursor tracks the offset of the underlying file handle, measured from the physical start
    /// of the file. It therefore includes the 4096 byte header and is 4096 larger than the
    /// logical offset that the caller sees. Implementers need to take care not to mix up the
    /// cursor as understood by the user with the cursor relative to the operating system.
    cursor: u64,

    /// needs_seek is set if the file needs to seek to synchronize with the cursor. It is raised
    /// before any operation that moves the file handle and cleared once that operation (or a
    /// recovery seek) has succeeded.
    needs_seek: bool,
}
200
201// When working with the VersionedFile implementation, there are a few sharp corners to watch out
202// for:
203//
204// The `cursor` field tracks the position of the cursor in the underlying file handle,
205// and will therefore be 4096 larger than the values that the user is expecting when calling Seek.
206//
207// Every function must take care to properly update the cursor after completing its operation.
208// In the event of an error, the cursor position of the file may be unknown. Every function that
209// changes the file cursor position must ensure that in the event of a failure, `needs_seek` is set
210// to `true`.
211//
212// Every function that depends on the file cursor must check `needs_seek` before executing, so that
213// the cursor can be set to be equal to `cursor` in the event that the current position is
214// unknown.
215//
// Please test code carefully when making changes.
217impl VersionedFile {
218 /// fix_seek will check if the file cursor has potentially drifted from the vf cursor and
219 /// attempt to fix it if drift is possible.
220 async fn fix_seek(&mut self) -> Result<(), Error> {
221 // Seek to the correct location if the previous operation was not able to update the file
222 // cursor.
223 if self.needs_seek {
224 match self.file.seek(SeekFrom::Start(self.cursor)).await {
225 Ok(_) => self.needs_seek = false,
226 Err(e) => bail!(format!(
227 "unable to set file cursor to correct position: {}",
228 e
229 )),
230 };
231 }
232 Ok(())
233 }
234
235 /// len will return the size of the file, not including the versioned header.
236 pub async fn len(&mut self) -> Result<u64, Error> {
237 let md = self
238 .file
239 .metadata()
240 .await
241 .context("unable to get metadata for file")?;
242 Ok(md.len() - 4096)
243 }
244
245 /// read_exact will read from the data portion of a VersionedFile. If there is an error, the
246 /// contents of buf are unspecified, and the read offset will not be updated.
247 pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error> {
248 self.fix_seek().await?;
249 self.needs_seek = true;
250
251 // Try reading the bytes. If that fails, try resetting the file cursor.
252 match self.file.read_exact(buf).await {
253 Ok(_) => {}
254 Err(e) => {
255 match self.file.seek(SeekFrom::Start(self.cursor)).await {
256 // If resetting the cursor is successful, we can clear the needs_seek flag
257 // before returning an error. Otherwise we leave the flag set so that future
258 // function calls know the cursor is in the wrong spot.
259 Ok(_) => self.needs_seek = false,
260 Err(_) => {}
261 };
262 bail!(format!("{}", e));
263 }
264 };
265
266 // Update the cursor and clear the needs_seek flag.
267 self.cursor += buf.len() as u64;
268 self.needs_seek = false;
269 Ok(())
270 }
271
272 /// seek will seek to the provided offset within the file, ignoring the header.
273 pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
274 self.fix_seek().await?;
275 self.needs_seek = true;
276
277 // Special care needs to be taken with regards to the seeking boundaries. The udnerlying
278 // file has boundaries that are the whole file, but the caller has boundaries that exclude
279 // the first 4096 bytes.
280 match pos {
281 SeekFrom::Start(x) => {
282 // If seeking from the start, we need to add 4096 bytes to the caller offset to
283 // compensate for the header.
284 let new_pos = self
285 .file
286 .seek(SeekFrom::Start(x + 4096))
287 .await
288 .context("versioned file seek failed")?;
289 self.needs_seek = false;
290 self.cursor = new_pos;
291 return Ok(new_pos - 4096);
292 }
293 SeekFrom::End(x) => {
294 // If seeking from the end, we need to make sure that the caller does not seek the
295 // file into the header. We use self.len() because that will provide the length of
296 // the file excluding the header.
297 let size = self.len().await.context("unable to get file len")?;
298 if x + (size as i64) < 0 {
299 self.needs_seek = false;
300 bail!("cannot seek to a position before the start of the file");
301 }
302 let new_pos = self.file.seek(pos).await.context("seek failed")?;
303 self.needs_seek = false;
304 self.cursor = new_pos;
305 return Ok(new_pos - 4096);
306 }
307 SeekFrom::Current(x) => {
308 if x + (self.cursor as i64) < 4096 {
309 self.needs_seek = false;
310 bail!("cannot seek to a position before the start of the file");
311 }
312 let new_pos = self.file.seek(pos).await.context("seek failed")?;
313 self.needs_seek = false;
314 self.cursor = new_pos;
315 return Ok(new_pos - 4096);
316 }
317 }
318 }
319
320 /// set_len will truncate the file so that it has the provided length, excluding the header.
321 /// This operation can be used to make the file larger as well. This operation will put the
322 /// cursor at the end of the file after the length has been set.
323 pub async fn set_len(&mut self, new_len: u64) -> Result<(), Error> {
324 self.file
325 .set_len(new_len + 4096)
326 .await
327 .context("unable to adjust file length")?;
328 self.seek(SeekFrom::End(0))
329 .await
330 .context("unable to seek to new end of file")?;
331 Ok(())
332 }
333
334 /// write_all will write to the VersionedFile and then call flush(). Note that flush() is not
335 /// the same as fsync().
336 ///
337 /// If there is an error, the file cursor will not be updated.
338 pub async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
339 self.fix_seek().await?;
340 self.needs_seek = true;
341
342 // Try reading the bytes. If that fails, try resetting the file cursor.
343 match self.file.write_all(buf).await {
344 Ok(_) => {}
345 Err(e) => {
346 match self.file.seek(SeekFrom::Start(self.cursor)).await {
347 // If resetting the cursor is successful, we can clear the needs_seek flag
348 // before returning an error. Otherwise we leave the flag set so that future
349 // function calls know the cursor is in the wrong spot.
350 Ok(_) => self.needs_seek = false,
351 Err(_) => {}
352 };
353 bail!(format!("{}", e));
354 }
355 };
356
357 // Flush the file. Note that this is not the same as fsync.
358 self.file.flush().await.context("unable to flush file")?;
359
360 // Update the cursor and clear the needs_seek flag.
361 self.cursor += buf.len() as u64;
362 self.needs_seek = false;
363 Ok(())
364 }
365}
366
367/// version_to_str will write out the version in ascii, adding leading zeroes if needed.
368fn version_to_str(version: u8) -> Result<String, Error> {
369 // 0 is not an allowed version, every other possible u8 is okay.
370 if version == 0 {
371 bail!("version is not allowed to be 0");
372 }
373
374 // Compute the 4 version bytes based on the latest version.
375 let mut version_string = format!("{}", version);
376 if version_string.len() == 1 {
377 version_string = format!("00{}", version);
378 } else if version_string.len() == 2 {
379 version_string = format!("0{}", version);
380 }
381 Ok(version_string)
382}
383
384/// new_file_header will write the header of the file using the expected idenfitier and the latest
385/// version.
386async fn new_file_header(
387 file: &mut File,
388 expected_identifier: &str,
389 latest_version: u8,
390) -> Result<(), Error> {
391 // Compute the full set of metadata bytes.
392 let version_string =
393 version_to_str(latest_version).context("unable to convert version to ascii string")?;
394 let header_str = format!("{}\n{}\n", version_string, expected_identifier);
395 let header_bytes = header_str.as_bytes();
396 if header_bytes.len() > 256 {
397 panic!("developer error: metadata_bytes should be guaranteed to have len below 256");
398 }
399
400 // Prepare the full header and write it to the file.
401 let mut full_header = [0u8; 4096];
402 full_header[..header_bytes.len()].copy_from_slice(header_bytes);
403 file.write_all(&full_header)
404 .await
405 .context("unable to write initial metadata")?;
406 file.flush()
407 .await
408 .context("unable to flush file after writing header")?;
409 let new_metadata = file
410 .metadata()
411 .await
412 .context("unable to get updated file metadata")?;
413 if new_metadata.len() != 4096 {
414 panic!(
415 "developer error: file did not initialize with 4096 bytes: {}",
416 new_metadata.len()
417 );
418 }
419
420 // Reset the offset to 0; after creating the file the startup routine will read the data to
421 // verify it matches.
422 file.seek(SeekFrom::Start(0))
423 .await
424 .context("unable to seek back to beginning of file")?;
425
426 Ok(())
427}
428
429/// verify_upgrade_paths verify that the set of paths provided for performing upgrades all lead to
430/// the latest version, and will return an error if some path doesn't lead to the latest version.
431/// It will also return an error if two possible paths exist for a given version.
432fn verify_upgrade_paths(upgrade_paths: &Vec<Upgrade>, latest_version: u8) -> Result<(), Error> {
433 // Enusre 0 was not used as the latest_version.
434 if latest_version == 0 {
435 bail!("version 0 is not allowed for a VersionedFile");
436 }
437
438 // Verify that an upgrade path exists for the file which carries it to the latest version.
439 let mut version_routes = HashMap::new();
440 // Verify basic properties of the graph (no cycles, no repeat sources).
441 for path in upgrade_paths {
442 if path.initial_version >= path.updated_version {
443 bail!("upgrade paths must always lead to a higher version number");
444 }
445 if version_routes.contains_key(&path.initial_version) {
446 bail!("upgrade paths can only have one upgrade for each version");
447 }
448 if path.updated_version > latest_version {
449 bail!("upgrade paths lead beyond the latest version");
450 }
451 if path.initial_version == 0 {
452 bail!("version 0 is not allowed for a VersionedFile");
453 }
454 version_routes.insert(path.initial_version, path.updated_version);
455 }
456 // Verify that all upgrades lead to the latest version. We iterate over the version_routes and mark every
457 // node that connects to a finished node.
458 let mut complete_paths = HashMap::new();
459 complete_paths.insert(latest_version, {});
460 loop {
461 let mut progress = false;
462 let mut finished = true;
463
464 for (key, value) in &version_routes {
465 if complete_paths.contains_key(key) {
466 continue;
467 }
468 if complete_paths.contains_key(value) {
469 progress = true;
470 complete_paths.insert(*key, {});
471 } else {
472 finished = false;
473 }
474 }
475
476 if finished {
477 break;
478 }
479 if progress == false {
480 bail!("update graph is incomplete, not all nodes lead to the latest version");
481 }
482 }
483
484 Ok(())
485}
486
487/// perform_file_upgrade takes a file and an upgrade, and then executes the upgrade against the
488/// file.
489async fn perform_file_upgrade(filepath: &PathBuf, u: &Upgrade) -> Result<(), Error> {
490 // Open the file and perform the upgrade.
491 let file = OpenOptions::new()
492 .read(true)
493 .write(true)
494 .open(filepath)
495 .await
496 .context("unable to open versioned file for update")?;
497 let mut versioned_file = VersionedFile {
498 file,
499 cursor: 4096,
500 needs_seek: false,
501 };
502 // Set the offset to right after the header, which is what the upgrade routines will expect.
503 versioned_file
504 .seek(SeekFrom::Start(0))
505 .await
506 .context("unable to seek in file after upgrade")?;
507 (u.process)(versioned_file, u.initial_version, u.updated_version)
508 .await
509 .context(format!(
510 "unable to complete file upgrade from version {} to {}",
511 u.initial_version, u.updated_version
512 ))?;
513 // file drops automatically because it is moved into the path.process call.
514
515 // Update the metadata to contain the new version string.
516 let file = OpenOptions::new()
517 .read(true)
518 .write(true)
519 .open(filepath)
520 .await
521 .context("unable to open versioned file for update")?;
522 let mut versioned_file = VersionedFile {
523 file,
524 cursor: 4096,
525 needs_seek: false,
526 };
527 let updated_version_str =
528 version_to_str(u.updated_version).context("upgrade path has bad version")?;
529 versioned_file
530 .file
531 .seek(SeekFrom::Start(0))
532 .await
533 .context("unable to seek to beginning of file")?;
534 versioned_file
535 .file
536 .write_all(updated_version_str.as_bytes())
537 .await
538 .context("unable to write updated version to file header")?;
539
540 Ok(())
541}
542
543/// open_file will open a versioned file.
544///
545/// If the file does not yet exist, a new VersionedFile will be created containing the
546/// latest_version and the provided identifier in the header. If the file exists but is an older
547/// version, the update_paths will be used to update the file to the latest version.
548///
549/// An error will be returned if the file does exist and has the wrong identifier, or if the file
550/// has a version that is higher than 'latest_version', or if the upgrades do not provide a valid
551/// path from the current version of the file to the latest version.
552pub async fn open_file(
553 filepath: &PathBuf,
554 expected_identifier: &str,
555 latest_version: u8,
556 upgrades: &Vec<Upgrade>,
557) -> Result<VersionedFile, Error> {
558 // Verify that the inputs match all requirements.
559 let path_str = filepath.to_str().context("could not stringify path")?;
560 if !path_str.is_ascii() {
561 bail!("path should be valid ascii");
562 }
563 if expected_identifier.len() > 251 {
564 bail!("the identifier of a versioned file cannot exceed 251 bytes");
565 }
566 if !expected_identifier.is_ascii() {
567 bail!("the identifier must be ascii");
568 }
569
570 // Open the file, creating a new file if one does not exist.
571 let mut file = OpenOptions::new()
572 .read(true)
573 .write(true)
574 .create(true)
575 .open(filepath)
576 .await
577 .context("unable to open versioned file")?;
578
579 // If the file length is zero, we assume that the file has not been created yet.
580 let file_metadata = file
581 .metadata()
582 .await
583 .context("unable to read versioned file metadata")?;
584 if file_metadata.len() == 0 {
585 new_file_header(&mut file, expected_identifier, latest_version)
586 .await
587 .context("unable to write new file header")?;
588 }
589
590 // Read the first 4096 bytes of the file to get the file header.
591 let mut header = vec![0; 4096];
592 file.read_exact(&mut header)
593 .await
594 .context("unable to read file header")?;
595
596 // Verify that the identifier bytes in the header match the expected identifier.
597 let header_identifier = from_utf8(&header[4..4 + expected_identifier.len()])
598 .context("the on-disk file identifier could not be parsed")?;
599 if header_identifier != expected_identifier {
600 bail!("the file does not have the correct identifier");
601 }
602
603 // Verify that the upgrade paths all lead to the latest version.
604 verify_upgrade_paths(&upgrades, latest_version).context("upgrade paths are invalid")?;
605
606 let version_str = from_utf8(&header[..3]).context("the on-disk version could not be parsed")?;
607 let mut version: u8 = version_str
608 .parse()
609 .context("unable to parse on-disk version")?;
610
611 // Had this weird issue with the async function pointer where I couldn't get it to work if I
612 // made the VersionedFile a pointer. As a result, we have to pass the file in, which transfers
613 // ownership, and means the file can't continue to be used in this function. I overcame that
614 // limitation by just opening and closing the file repeatedly. This is the first such place
615 // where we close the file for ownership reasons rather than because we don't need it anymore.
616 drop(file);
617
618 // Execute the upgrades.
619 while version != latest_version {
620 let mut found = false;
621 for upgrade in upgrades {
622 if upgrade.initial_version == version {
623 perform_file_upgrade(filepath, upgrade)
624 .await
625 .context("unable to complete file upgrade")?;
626 version = upgrade.updated_version;
627 found = true;
628 break;
629 }
630 }
631
632 // The upgrades verifier ensures that if an upgrade exists in the set of upgrades, then
633 // there also exists a path to the latest_version from that upgrade. Therefore, if this
634 // file doesn't have a path to the latest version, no other upgrades will be executed
635 // either.
636 if !found {
637 bail!("no viable upgrade path exists for file");
638 }
639 }
640
641 // Open the file and create the versioned_file.
642 let mut file = OpenOptions::new()
643 .read(true)
644 .write(true)
645 .open(filepath)
646 .await
647 .context("unable to open versioned file for update")?;
648 file.seek(SeekFrom::Start(4096))
649 .await
650 .context("unable to seek to beginning of file after header")?;
651 let versioned_file = VersionedFile {
652 file,
653 cursor: 4096,
654 needs_seek: false,
655 };
656
657 Ok(versioned_file)
658}
659
660#[cfg(test)]
661mod tests {
662 use super::*;
663
664 use testdir::testdir;
665
666 // Create a helper function which does a null upgrade so that we can do testing of the upgrade
667 // path verifier.
668 async fn stub_upgrade(_: VersionedFile, _: u8, _: u8) -> Result<(), Error> {
669 Ok(())
670 }
671
672 // This is a basic upgrade function that expects the current contents of the file to be
673 // "test_data". It will alter the contents so that they say "test".
674 async fn smoke_upgrade_1_2(
675 mut vf: VersionedFile,
676 initial_version: u8,
677 updated_version: u8,
678 ) -> Result<(), Error> {
679 // Verify that the correct version is being used.
680 if initial_version != 1 || updated_version != 2 {
681 bail!("this upgrade is intended to take the file from version 1 to version 2");
682 }
683 if vf.len().await.unwrap() != 9 {
684 bail!("file is wrong len");
685 }
686 // Read the file and verify that we are upgrading the correct data.
687 let mut buf = [0u8; 9];
688 vf.read_exact(&mut buf)
689 .await
690 .context("unable to read old file contents")?;
691 if &buf != b"test_data" {
692 bail!(format!("file appears corrupt: {:?}", buf));
693 }
694
695 // Truncate the file and write the new data into it.
696 let new_data = b"test";
697 vf.set_len(0).await.unwrap();
698 vf.write_all(new_data)
699 .await
700 .context("unable to write new data after deleting old data")?;
701 Ok(())
702 }
703
704 // smoke upgrade 2->3
705 async fn smoke_upgrade_2_3(
706 mut vf: VersionedFile,
707 initial_version: u8,
708 updated_version: u8,
709 ) -> Result<(), Error> {
710 // Verify that the correct version is being used.
711 if initial_version != 2 || updated_version != 3 {
712 bail!("this upgrade is intended to take the file from version 2 to version 3");
713 }
714 if vf.len().await.unwrap() != 4 {
715 bail!("file is wrong len");
716 }
717 // Read the file and verify that we are upgrading the correct data.
718 let mut buf = [0u8; 4];
719 vf.read_exact(&mut buf)
720 .await
721 .context("unable to read old file contents")?;
722 if &buf != b"test" {
723 bail!("file appears corrupt");
724 }
725
726 // Truncate the file and write the new data into it.
727 let new_data = b"testtest";
728 vf.set_len(0).await.unwrap();
729 vf.write_all(new_data)
730 .await
731 .context("unable to write new data after deleting old data")?;
732 Ok(())
733 }
734
735 // smoke upgrade 3->4
736 async fn smoke_upgrade_3_4(
737 mut vf: VersionedFile,
738 initial_version: u8,
739 updated_version: u8,
740 ) -> Result<(), Error> {
741 // Verify that the correct version is being used.
742 if initial_version != 3 || updated_version != 4 {
743 bail!("this upgrade is intended to take the file from version 1 to version 2");
744 }
745 if vf.len().await.unwrap() != 8 {
746 bail!("file is wrong len");
747 }
748 // Read the file and verify that we are upgrading the correct data.
749 let mut buf = [0u8; 8];
750 vf.read_exact(&mut buf)
751 .await
752 .context("unable to read old file contents")?;
753 if &buf != b"testtest" {
754 bail!("file appears corrupt");
755 }
756
757 // Truncate the file and write the new data into it.
758 let new_data = b"testtesttest";
759 vf.set_len(0).await.unwrap();
760 vf.write_all(new_data)
761 .await
762 .context("unable to write new data after deleting old data")?;
763 Ok(())
764 }
765
766 #[async_std::test]
767 // Do basic testing of all the major functions for VersionedFiles
768 async fn smoke_test() {
769 // Create a basic versioned file.
770 let dir = testdir!();
771 let test_dat = dir.join("test.dat");
772 open_file(&test_dat, "versioned_file::test.dat", 0, &Vec::new())
773 .await
774 .context("unable to create versioned file")
775 .unwrap_err();
776 open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
777 .await
778 .context("unable to create versioned file")
779 .unwrap();
780 // Try to open it again.
781 open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
782 .await
783 .context("unable to create versioned file")
784 .unwrap();
785 // Try to open it with the wrong specifier.
786 open_file(&test_dat, "bad_versioned_file::test.dat", 1, &Vec::new())
787 .await
788 .context("unable to create versioned file")
789 .unwrap_err();
790
791 // Try to make some invalid new files.
792 let invalid_name = dir.join("❄️"); // snowflake emoji in filename
793 open_file(&invalid_name, "versioned_file::test.dat", 1, &Vec::new())
794 .await
795 .context("unable to create versioned file")
796 .unwrap_err();
797 let invalid_id = dir.join("invalid_identifier.dat");
798 open_file(&invalid_id, "versioned_file::test.dat::❄️", 1, &Vec::new())
799 .await
800 .context("unable to create versioned file")
801 .unwrap_err();
802
803 // Perform a test where we open test.dat and write a small amount of data to it. Then we
804 // will open the file again and read back that data.
805 let mut file = open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
806 .await
807 .unwrap();
808 file.write_all(b"test_data").await.unwrap();
809 let mut file = open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
810 .await
811 .unwrap();
812 if file.len().await.unwrap() != 9 {
813 panic!("file has unexpected len");
814 }
815 let mut buf = [0u8; 9];
816 file.read_exact(&mut buf).await.unwrap();
817 if &buf != b"test_data" {
818 panic!("data read does not match data written");
819 }
820 // Try to open the file again and ensure the write happened in the correct spot.
821 open_file(&test_dat, "versioned_file::test.dat", 1, &Vec::new())
822 .await
823 .unwrap();
824
825 // Open the file again, this time with an upgrade for smoke_upgrade_1_2.
826 let mut upgrade_chain = vec![Upgrade {
827 initial_version: 1,
828 updated_version: 2,
829 process: wrap_upgrade_process(smoke_upgrade_1_2),
830 }];
831 let mut file = open_file(&test_dat, "versioned_file::test.dat", 2, &upgrade_chain)
832 .await
833 .unwrap();
834 if file.len().await.unwrap() != 4 {
835 panic!("file has wrong len");
836 }
837 let mut buf = [0u8; 4];
838 file.read_exact(&mut buf).await.unwrap();
839 if &buf != b"test" {
840 panic!("data read does not match data written");
841 }
842 // Try to open the file again to make sure everything still completes.
843 open_file(&test_dat, "versioned_file::test.dat", 2, &upgrade_chain)
844 .await
845 .unwrap();
846
847 // Attempt to do two upgrades at once, from 2 to 3 and 3 to 4.
848 upgrade_chain.push(Upgrade {
849 initial_version: 2,
850 updated_version: 3,
851 process: wrap_upgrade_process(smoke_upgrade_2_3),
852 });
853 upgrade_chain.push(Upgrade {
854 initial_version: 3,
855 updated_version: 4,
856 process: wrap_upgrade_process(smoke_upgrade_3_4),
857 });
858 let mut file = open_file(&test_dat, "versioned_file::test.dat", 4, &upgrade_chain)
859 .await
860 .unwrap();
861 if file.len().await.unwrap() != 12 {
862 panic!("file has wrong len");
863 }
864 let mut buf = [0u8; 12];
865 file.read_exact(&mut buf).await.unwrap();
866 if &buf != b"testtesttest" {
867 panic!("data read does not match data written");
868 }
869 // Try to open the file again to make sure everything still completes.
870 let mut file = open_file(&test_dat, "versioned_file::test.dat", 4, &upgrade_chain)
871 .await
872 .unwrap();
873
874 // Test that the seeking is implemented correctly.
875 file.seek(SeekFrom::End(-5)).await.unwrap();
876 file.write_all(b"NOVELLA").await.unwrap();
877 file.seek(SeekFrom::Current(-3)).await.unwrap();
878 file.seek(SeekFrom::Current(-4)).await.unwrap();
879 file.seek(SeekFrom::Current(-7)).await.unwrap();
880 let mut buf = [0u8; 14];
881 file.read_exact(&mut buf).await.unwrap();
882 if &buf != b"testtesNOVELLA" {
883 panic!(
884 "read data has unexpected result: {} || {}",
885 std::str::from_utf8(&buf).unwrap(),
886 buf[0]
887 );
888 }
889 file.seek(SeekFrom::Current(-2)).await.unwrap();
890 file.seek(SeekFrom::End(-15)).await.unwrap_err();
891 let mut buf = [0u8; 2];
892 file.read_exact(&mut buf).await.unwrap();
893 if &buf != b"LA" {
894 panic!("seek_end error changed file cursor");
895 }
896 file.seek(SeekFrom::Current(-2)).await.unwrap();
897 file.seek(SeekFrom::Current(-13)).await.unwrap_err();
898 file.read_exact(&mut buf).await.unwrap();
899 if &buf != b"LA" {
900 panic!("seek_end error changed file cursor");
901 }
902 }
903
#[test]
// Exercises the upgrade path verifier with a series of accepted and rejected upgrade sets.
fn test_verify_upgrade_paths() {
    // Shorthand for one upgrade step between two versions; every step uses the stub process
    // because only the version numbers matter to the verifier calls below.
    let step = |from, to| Upgrade {
        initial_version: from,
        updated_version: to,
        process: wrap_upgrade_process(stub_upgrade),
    };

    // An empty upgrade set is accepted for any latest version except 0, which is not a legal
    // version.
    let no_upgrades = Vec::new();
    verify_upgrade_paths(&no_upgrades, 0).unwrap_err();
    verify_upgrade_paths(&no_upgrades, 1).unwrap();
    verify_upgrade_paths(&no_upgrades, 2).unwrap();
    verify_upgrade_paths(&no_upgrades, 255).unwrap();

    // One upgrade that lands on the latest version is accepted.
    verify_upgrade_paths(&vec![step(1, 2)], 2).unwrap();

    // An upgrade that does not increase the version is rejected.
    verify_upgrade_paths(&vec![step(2, 2)], 2).unwrap_err();

    // An upgrade set that never reaches the latest version is rejected.
    verify_upgrade_paths(&vec![step(1, 2)], 3).unwrap_err();

    // A straightforward two-step chain is accepted.
    verify_upgrade_paths(&vec![step(1, 2), step(2, 3)], 3).unwrap();

    // Two upgrades starting from the same version are rejected.
    verify_upgrade_paths(&vec![step(1, 2), step(2, 3), step(1, 3)], 3).unwrap_err();

    // Two upgrades ending at the same version are accepted.
    verify_upgrade_paths(&vec![step(1, 3), step(2, 3)], 3).unwrap();

    // The same pair is rejected when the upgrades go past the latest version.
    verify_upgrade_paths(&vec![step(1, 3), step(2, 3)], 2).unwrap_err();

    // A larger valid structure is accepted.
    verify_upgrade_paths(
        &vec![
            step(1, 3),
            step(2, 3),
            step(3, 6),
            step(4, 6),
            step(5, 6),
        ],
        6,
    )
    .unwrap();

    // The same valid structure is accepted regardless of the order of the upgrades.
    verify_upgrade_paths(
        &vec![
            step(5, 6),
            step(2, 3),
            step(3, 6),
            step(1, 3),
            step(4, 6),
        ],
        6,
    )
    .unwrap();

    // A shuffled structure that contains an orphan is rejected.
    verify_upgrade_paths(
        &vec![
            step(2, 5),
            step(6, 7),
            step(3, 6),
            step(1, 4),
            step(4, 6),
        ],
        6,
    )
    .unwrap_err();
}
1122
#[test]
// Checks that version 0 is rejected and that the sampled versions render as three digits,
// zero-padded on the left.
fn test_version_to_str() {
    // 0 must produce an error rather than a string.
    version_to_str(0).unwrap_err();

    // Each pair is (version, expected string form).
    let expectations = [
        (1, "001"),
        (2, "002"),
        (9, "009"),
        (39, "039"),
        (139, "139"),
    ];
    for &(version, expected) in expectations.iter() {
        if version_to_str(version).unwrap() != expected {
            panic!("{} failed", version);
        }
    }
}
1142}