use std::{
cmp::Ordering,
ffi::OsString,
fmt::Debug,
io::ErrorKind,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
fs::{rename, File},
io::{AsyncReadExt, AsyncWriteExt},
sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
};
#[cfg(test)]
mod tests;
/// Handle to a JSON-file-backed value of type `T`.
///
/// The handle owns only the sending half of an unbounded channel. Reads,
/// writes and flushes are submitted as `Request` messages over that channel
/// and answered by whoever holds the receiving half.
pub struct JsonDb<T>
where
    T: Schema,
{
    /// Sending half of the request channel.
    channel: mpsc::UnboundedSender<Request<T>>,
}
/// A message sent from a `JsonDb` handle to the task that owns the data.
///
/// Every variant carries the sending half of a oneshot channel on which the
/// reply to that request is delivered.
#[derive(Debug)]
enum Request<T> {
/// Asks for an owned read guard over the data.
Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
/// Asks for an owned write guard over the data.
Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
/// Asks for a `()` acknowledgement once this request is reached in the queue.
Flush(oneshot::Sender<()>),
}
pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
type Prev: Schema + Into<Self>;
const VERSION: u32 = Self::Prev::VERSION + 1;
const UNVERSIONED_V0: bool = Self::Prev::UNVERSIONED_V0;
fn parse(s: &str) -> Result<Self, Error> {
let version = match serde_json::from_str::<Version>(s)?.version {
Some(v) => v,
None => {
if Self::UNVERSIONED_V0 {
0
} else {
return Err(Error::MissingVersion);
}
}
};
match version.cmp(&Self::VERSION) {
Ordering::Less => Ok(Self::Prev::parse(s)?.into()),
Ordering::Equal => Ok(serde_json::from_str::<VersionedData<Self>>(s)?.data),
Ordering::Greater => Err(Error::UnknownVersion(version)),
}
}
}
/// Marker trait for the first (version 0) schema in a chain.
///
/// Any type implementing this trait receives a `Schema` implementation with
/// `VERSION = 0` through the blanket impl in this file.
pub trait SchemaV0: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
/// When `true`, the blanket `Schema` impl sets `UNVERSIONED_V0`, so a document
/// without a `version` field is parsed as version 0. Defaults to `false`.
const VERSION_OPTIONAL: bool = false;
}
/// Every [`SchemaV0`] is a [`Schema`] at version 0.
///
/// `Prev` points back at the type itself, which terminates the chain: a `u32`
/// version can never be below 0, so `parse` never takes the migration path.
impl<T> Schema for T
where
    T: SchemaV0,
{
    type Prev = T;
    const VERSION: u32 = 0;
    const UNVERSIONED_V0: bool = <T as SchemaV0>::VERSION_OPTIONAL;
}
/// Minimal view of a document used to read only its `version` field.
#[derive(Deserialize)]
struct Version {
/// The document's schema version, or `None` when the field is absent.
version: Option<u32>,
}
/// On-disk envelope: a `version` field alongside the payload's own fields.
///
/// Used both for writing (in `save`) and for reading (in `Schema::parse`).
#[derive(Deserialize, Serialize)]
struct VersionedData<T> {
/// Schema version of `data`; optional so unversioned documents still deserialize.
version: Option<u32>,
/// The payload; `flatten` puts its fields at the same level as `version`.
#[serde(flatten)]
data: T,
}
/// Errors produced while loading, parsing or saving a database file.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// An underlying I/O operation failed; converted from `std::io::Error` via `?`.
#[error("I/O error")]
Io(#[from] std::io::Error),
/// JSON serialization or deserialization failed; converted from `serde_json::Error` via `?`.
#[error("Failed to parse JSON")]
Json(#[from] serde_json::Error),
/// The document's version is greater than the schema's `VERSION`.
#[error("Unknown schema version {0}")]
UnknownVersion(u32),
/// The document has no `version` field and the schema does not allow that.
#[error("Missing schema version")]
MissingVersion,
}
impl<T> JsonDb<T>
where
    T: Schema + Default,
{
    /// Opens the database at `path`, falling back to `T::default()` when the
    /// file does not exist.
    ///
    /// This is a thin wrapper around `load_or_else`; see that method for the
    /// error conditions.
    pub async fn load(path: PathBuf) -> Result<JsonDb<T>, Error> {
        let fallback = <T as Default>::default;
        Self::load_or_else(path, fallback).await
    }
}
async fn save<T: Schema>(data: &T, path: &Path) -> Result<(), Error> {
let mut temp_file_name = OsString::from(".");
temp_file_name.push(path.file_name().unwrap());
temp_file_name.push(".tmp");
let temp_file_path = path.parent().unwrap().join(temp_file_name);
{
let mut temp_file = File::create(&temp_file_path).await?;
temp_file
.write_all(&serde_json::to_vec_pretty(&VersionedData {
version: Some(T::VERSION),
data,
})?)
.await?;
temp_file.sync_all().await?;
}
rename(&temp_file_path, &path).await?;
Ok(())
}
impl<T: Schema> JsonDb<T> {
pub async fn load_or(path: PathBuf, default: T) -> Result<JsonDb<T>, Error> {
Self::load_or_else(path, || default).await
}
pub async fn load_or_else<F>(path: PathBuf, default: F) -> Result<JsonDb<T>, Error>
where
F: FnOnce() -> T,
{
let open_result = File::open(&path).await;
let data = match open_result {
Ok(mut f) => {
let mut buf = String::new();
f.read_to_string(&mut buf).await?;
T::parse(&buf)?
}
Err(e) => {
if let ErrorKind::NotFound = e.kind() {
default()
} else {
return Err(e.into());
}
}
};
let (request_send, mut request_recv) = mpsc::unbounded_channel::<Request<T>>();
tokio::spawn(async move {
save(&data, &path).await.expect("Failed to save data");
let lock = Arc::new(RwLock::new(data));
while let Some(request) = request_recv.recv().await {
match request {
Request::Read(response) => {
response
.send(lock.clone().read_owned().await)
.expect("Failed to send read guard");
}
Request::Write(response) => {
response
.send(lock.clone().write_owned().await)
.expect("Failed to send write guard");
save(lock.read().await.deref(), &path)
.await
.expect("Failed to save data");
}
Request::Flush(response) => {
response
.send(())
.expect("Failed to send flush confirmation");
}
}
}
});
Ok(JsonDb {
channel: request_send,
})
}
fn request_read(&self) -> oneshot::Receiver<OwnedRwLockReadGuard<T>> {
let (send, recv) = oneshot::channel();
self.channel
.send(Request::Read(send))
.expect("Failed to send read lock request");
recv
}
pub async fn read(&self) -> OwnedRwLockReadGuard<T> {
self.request_read()
.await
.expect("Failed to receive read lock")
}
pub fn blocking_read(&self) -> OwnedRwLockReadGuard<T> {
self.request_read()
.blocking_recv()
.expect("Failed to receive read lock")
}
fn request_write(&self) -> oneshot::Receiver<OwnedRwLockWriteGuard<T>> {
let (send, recv) = oneshot::channel();
self.channel
.send(Request::Write(send))
.expect("Failed to send write lock request");
recv
}
pub async fn write(&self) -> OwnedRwLockWriteGuard<T> {
self.request_write()
.await
.expect("Failed to receive write lock")
}
pub fn blocking_write(&self) -> OwnedRwLockWriteGuard<T> {
self.request_write()
.blocking_recv()
.expect("Failed to receive write lock")
}
fn request_flush(&self) -> oneshot::Receiver<()> {
let (send, recv) = oneshot::channel();
self.channel
.send(Request::Flush(send))
.expect("Failed to send flush request");
recv
}
pub async fn flush(&self) {
self.request_flush()
.await
.expect("Failed to receive flush confirmation");
}
pub fn blocking_flush(&self) {
self.request_flush()
.blocking_recv()
.expect("Failed to receive flush confirmation");
}
}