jsondb 0.4.0

JSON-based """embedded database""" library
Documentation
//! A quick-and-dirty “““embedded database””” library.
//!
//! This is just a library for storing data in JSON files, but with
//! a few added conveniences:
//!
//! * The saved data includes a schema version number, and will be
//!   automatically migrated to newer schema versions.
//! * The live data is guarded by a built-in read-write lock which can
//!   be used synchronously or from a [tokio] async environment.
//! * Data is saved to the backing JSON file, in a hopefully-atomic
//!   fashion, every time a write lock is released.
//!
//! Data can be represented in pretty much any form you can convince
//! [serde] to go along with, except for the following restrictions:
//!
//! * Your data type must be [`Debug`] + [`Send`] + [`Sync`] + `'static`.
//! * Your serialization format shouldn't include a top-level
//!   `version` field of its own, as this is reserved for our schema
//!   version tracking.
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
//!   with our use of `#[serde(flatten)]`.

use std::{
    cmp::Ordering,
    ffi::OsString,
    fmt::Debug,
    io::ErrorKind,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
    fs::{rename, File},
    io::{AsyncReadExt, AsyncWriteExt},
    sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
};

#[cfg(test)]
mod tests;

/// A JSON-backed “““database”””.
///
/// This wraps a value that is loaded from a JSON file, automatically
/// migrated forward from previous schema versions, and automatically
/// written to disk when it's updated (we attempt to make saves atomic
/// using the `rename(2)` function).
///
/// The value itself is not stored in this struct: it is owned by a
/// task spawned when the database is loaded, and this handle only
/// holds the channel used to ask that task for lock guards.
pub struct JsonDb<T: Schema> {
    /// Sending half of the request queue. The receiving half is owned
    /// by the task spawned in `load_or_else`, which answers each
    /// [`Request`] in the order it was sent.
    channel: mpsc::UnboundedSender<Request<T>>,
}

/// A message sent from a [`JsonDb`] handle to the task that owns the
/// data.
///
/// Every variant carries the one-shot sender on which the task sends
/// its reply.
#[derive(Debug)]
enum Request<T> {
    /// Ask for an owned read guard on the data.
    Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
    /// Ask for an owned write guard on the data. After handing the
    /// guard out, the task waits until it can lock the data again and
    /// then saves it to disk.
    Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
    /// Ask for a `()` reply. Requests are handled one at a time, so
    /// the reply is only sent once every earlier request (including
    /// the save that follows a write) has been dealt with.
    Flush(oneshot::Sender<()>),
}

/// Schema for a JSON-backed database.
///
/// This needs to be a (de)serializable type, with a previous schema
/// that can be migrated into the new schema, and a version number
/// which must be the previous schema's version +1, [unless this is
/// version 0][`SchemaV0`].  This can then be automatically parsed
/// from a JSON object containing a `version` field along with the
/// other fields of the corresponding schema version; earlier versions
/// will be migrated to the current version automatically.
pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
    /// Previous schema that can be migrated into the new schema.
    type Prev: Schema + Into<Self>;

    /// Schema version number.
    const VERSION: u32 = Self::Prev::VERSION + 1;

    /// Whether unversioned data should be parsed as V0, rather than
    /// rejected with an error.
    const UNVERSIONED_V0: bool = Self::Prev::UNVERSIONED_V0;

    fn parse(s: &str) -> Result<Self, Error> {
        let version = match serde_json::from_str::<Version>(s)?.version {
            Some(v) => v,
            None => {
                if Self::UNVERSIONED_V0 {
                    0
                } else {
                    return Err(Error::MissingVersion);
                }
            }
        };
        match version.cmp(&Self::VERSION) {
            Ordering::Less => Ok(Self::Prev::parse(s)?.into()),
            Ordering::Equal => Ok(serde_json::from_str::<VersionedData<Self>>(s)?.data),
            Ordering::Greater => Err(Error::UnknownVersion(version)),
        }
    }
}

/// Marker trait to indicate version 0 of a database schema.
///
/// Implementing this will automatically implement [`Schema`], with
/// version number `0` and `Self` as the previous version.
pub trait SchemaV0: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
    /// Set this to `true` if your version 0 data may be stored in a
    /// pre-`JsonDb` format that does not include a version number.
    /// Note that regardless of this setting, when data is written
    /// back to the JSON file, it will always include a version
    /// number.
    ///
    /// The blanket [`Schema`] implementation exposes this value as
    /// `Schema::UNVERSIONED_V0`, which later schema versions inherit
    /// by default.
    const VERSION_OPTIONAL: bool = false;
}

/// Every [`SchemaV0`] type is the root of a schema chain: it is
/// version `0` and names itself as its own predecessor.
impl<T> Schema for T
where
    T: SchemaV0,
{
    // There is nothing older than version 0, so the chain ends by
    // pointing back at itself.
    type Prev = Self;
    const VERSION: u32 = 0;
    // Whether unversioned files are accepted is decided by the
    // `SchemaV0` implementor.
    const UNVERSIONED_V0: bool = Self::VERSION_OPTIONAL;
}

/// Just the schema version tag of a stored document.
///
/// Deserialized on its own to find out which schema version a file
/// holds before the rest of the document is parsed.
#[derive(Deserialize)]
struct Version {
    /// The `version` field of the document, or `None` if it is absent.
    version: Option<u32>,
}

/// On-disk shape of a document: the schema's own fields plus a
/// top-level `version` field in the same JSON object.
#[derive(Deserialize, Serialize)]
struct VersionedData<T> {
    /// Schema version of `data`. Optional so that documents without a
    /// version tag can still be deserialized.
    version: Option<u32>,
    /// The schema value itself; `flatten` places its fields alongside
    /// `version` rather than in a nested object.
    #[serde(flatten)]
    data: T,
}

/// Errors that can occur while working with [`JsonDb`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An underlying I/O operation on the backing file failed.
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// The data could not be serialized to, or deserialized from, JSON.
    #[error("Failed to parse JSON")]
    Json(#[from] serde_json::Error),
    /// The stored data carries a schema version newer than the schema
    /// it was parsed as; the payload is that stored version number.
    #[error("Unknown schema version {0}")]
    UnknownVersion(u32),
    /// The stored data has no `version` field and the schema does not
    /// accept unversioned data.
    #[error("Missing schema version")]
    MissingVersion,
}

impl<T> JsonDb<T>
where
    T: Schema + Default,
{
    /// Open the database stored at `path`.
    ///
    /// If the file does not exist yet, the database starts out as
    /// `T::default()`. This is [`load_or_else`][Self::load_or_else]
    /// with [`Default::default`] as the initializer.
    pub async fn load(path: PathBuf) -> Result<JsonDb<T>, Error> {
        let db = Self::load_or_else(path, <T as Default>::default).await?;
        Ok(db)
    }
}

async fn save<T: Schema>(data: &T, path: &Path) -> Result<(), Error> {
    let mut temp_file_name = OsString::from(".");
    temp_file_name.push(path.file_name().unwrap());
    temp_file_name.push(".tmp");
    let temp_file_path = path.parent().unwrap().join(temp_file_name);
    {
        let mut temp_file = File::create(&temp_file_path).await?;
        temp_file
            .write_all(&serde_json::to_vec_pretty(&VersionedData {
                version: Some(T::VERSION),
                data,
            })?)
            .await?;
        temp_file.sync_all().await?;
    }
    // Atomically update the actual file
    rename(&temp_file_path, &path).await?;

    Ok(())
}

impl<T: Schema> JsonDb<T> {
    /// Load a [`JsonDb`] from a given file, creating it and
    /// initializing it with the provided default value if it does not
    /// exist.
    ///
    /// # Errors
    ///
    /// See [`load_or_else`][Self::load_or_else].
    pub async fn load_or(path: PathBuf, default: T) -> Result<JsonDb<T>, Error> {
        Self::load_or_else(path, || default).await
    }

    /// Load a [`JsonDb`] from a given file, creating it and
    /// initializing it with the provided function if it does not
    /// exist.
    ///
    /// The loaded (and possibly migrated) or freshly initialized data
    /// is written back to `path` before this function returns. A task
    /// is then spawned on the current tokio runtime to own the data
    /// and serve lock requests from the returned handle.
    ///
    /// # Errors
    ///
    /// Returns an error if the file exists but cannot be read or
    /// parsed, if opening it fails for any reason other than the file
    /// not existing, or if the initial write-back to `path` fails.
    pub async fn load_or_else<F>(path: PathBuf, default: F) -> Result<JsonDb<T>, Error>
    where
        F: FnOnce() -> T,
    {
        let open_result = File::open(&path).await;
        let data = match open_result {
            Ok(mut f) => {
                let mut buf = String::new();
                f.read_to_string(&mut buf).await?;
                T::parse(&buf)?
            }
            // Only a missing file means "start from the default"; any
            // other failure to open is reported.
            Err(e) if e.kind() == ErrorKind::NotFound => default(),
            Err(e) => return Err(e.into()),
        };

        // Do the initial write-back here rather than in the spawned
        // task, so that a failure is returned to the caller instead of
        // panicking a detached task.
        save(&data, &path).await?;

        let (request_send, mut request_recv) = mpsc::unbounded_channel::<Request<T>>();
        tokio::spawn(async move {
            let lock = Arc::new(RwLock::new(data));
            // Requests are served strictly one at a time, in the order
            // they were sent. The loop ends when every sender is gone.
            while let Some(request) = request_recv.recv().await {
                match request {
                    Request::Read(response) => {
                        let guard = lock.clone().read_owned().await;
                        // The requester may have gone away (for
                        // example its `read()` future was dropped).
                        // The guard then comes back in the `Err` and
                        // is released right here; that must not take
                        // the whole database down.
                        let _ = response.send(guard);
                    }
                    Request::Write(response) => {
                        let guard = lock.clone().write_owned().await;
                        // As above: an undelivered guard is dropped at
                        // the end of this statement.
                        let delivered = response.send(guard).is_ok();
                        if delivered {
                            // Taking a read lock waits for the writer
                            // to drop its guard; the data is then
                            // saved while holding the read lock.
                            save(lock.read().await.deref(), &path)
                                .await
                                .expect("Failed to save data");
                        }
                    }
                    Request::Flush(response) => {
                        // Once we get around to handling this
                        // request, we've already flushed data from
                        // any previously-issued write requests
                        let _ = response.send(());
                    }
                }
            }
        });
        Ok(JsonDb {
            channel: request_send,
        })
    }

    /// Queue a read-lock request and return the receiver on which the
    /// guard will arrive.
    fn request_read(&self) -> oneshot::Receiver<OwnedRwLockReadGuard<T>> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Read(send))
            .expect("Failed to send read lock request");
        recv
    }

    /// Take a read lock on the wrapped data.
    ///
    /// # Panics
    ///
    /// Panics if the task that owns the data is no longer running
    /// (for instance because a save failed).
    pub async fn read(&self) -> OwnedRwLockReadGuard<T> {
        self.request_read()
            .await
            .expect("Failed to receive read lock")
    }

    /// Synchronous version of [`read`][Self::read].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`read`][Self::read], and
    /// also if called from within an asynchronous execution context.
    pub fn blocking_read(&self) -> OwnedRwLockReadGuard<T> {
        self.request_read()
            .blocking_recv()
            .expect("Failed to receive read lock")
    }

    /// Queue a write-lock request and return the receiver on which
    /// the guard will arrive.
    fn request_write(&self) -> oneshot::Receiver<OwnedRwLockWriteGuard<T>> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Write(send))
            .expect("Failed to send write lock request");
        recv
    }

    /// Take a write lock on the wrapped data. When the write guard is
    /// dropped, it triggers an atomic write of the updated data back
    /// to disk.
    ///
    /// # Panics
    ///
    /// Panics if the task that owns the data is no longer running
    /// (for instance because a save failed).
    pub async fn write(&self) -> OwnedRwLockWriteGuard<T> {
        self.request_write()
            .await
            .expect("Failed to receive write lock")
    }

    /// Synchronous version of [`write`][Self::write].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`write`][Self::write],
    /// and also if called from within an asynchronous execution
    /// context.
    pub fn blocking_write(&self) -> OwnedRwLockWriteGuard<T> {
        self.request_write()
            .blocking_recv()
            .expect("Failed to receive write lock")
    }

    /// Queue a flush request and return the receiver on which the
    /// confirmation will arrive.
    fn request_flush(&self) -> oneshot::Receiver<()> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Flush(send))
            .expect("Failed to send flush request");
        recv
    }

    /// Wait for data to finish flushing to disk. Every call to
    /// [`read`][Self::read] or [`write`][Self::write], or their
    /// blocking equivalents, also waits for data to be flushed before
    /// returning a guard.
    ///
    /// # Panics
    ///
    /// Panics if the task that owns the data is no longer running
    /// (for instance because a save failed).
    pub async fn flush(&self) {
        self.request_flush()
            .await
            .expect("Failed to receive flush confirmation");
    }

    /// Synchronous version of [`flush`][Self::flush].
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`flush`][Self::flush],
    /// and also if called from within an asynchronous execution
    /// context.
    pub fn blocking_flush(&self) {
        self.request_flush()
            .blocking_recv()
            .expect("Failed to receive flush confirmation");
    }
}