use std::{
    borrow::Cow,
    convert::TryFrom,
    ffi::{OsStr, OsString},
    path::{Path, PathBuf},
    str::FromStr,
    sync::Arc,
};

use flume::Receiver;
use pliantdb_core::{
    document::{Document, Header, Revision},
    schema::{CollectionName, Key},
    transaction::Executed,
};
use structopt::StructOpt;
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncWriteExt},
};

use crate::{
    config::Configuration,
    database::{document_tree_name, transaction_tree_name},
    Storage,
};
// Folder name, inside each database's backup folder, that holds the executed
// transaction log (as opposed to a collection's documents). `save` writes it
// and `load` matches directory names against it.
const TRANSACTIONS_FOLDER_NAME: &str = "_transactions";
// Command-line interface for backing up / restoring a local pliantdb database.
// NOTE: `//` comments (not `///`) are used on purpose — structopt turns doc
// comments into help text, which would change the generated CLI output.
#[derive(StructOpt, Debug)]
pub struct Cli {
    // Path to the database folder being saved from or restored into.
    pub database_path: PathBuf,
    // Which operation to run; see `Command`.
    #[structopt(subcommand)]
    pub subcommand: Command,
}
// The backup operation to perform.
// NOTE: `//` comments (not `///`) are used on purpose — structopt turns doc
// comments into help text, which would change the generated CLI output.
#[derive(StructOpt, Debug)]
pub enum Command {
    // Exports the database into a folder of CBOR files.
    Save {
        // Folder the backup is created in; defaults to the database's parent.
        output_directory: Option<PathBuf>,
        // Name for the backup folder; defaults to `<database name>.backup`.
        output_name: Option<String>,
    },
    // Imports a previously saved backup into the database.
    Load {
        // Path to a backup folder produced by `Save`.
        backup: PathBuf,
    },
}
impl Command {
    /// Runs the selected subcommand against the database at `database_path`.
    pub async fn execute(&self, database_path: PathBuf) -> anyhow::Result<()> {
        match self {
            Self::Save {
                output_directory,
                output_name,
            } => {
                self.save(database_path, output_directory, output_name)
                    .await
            }
            Self::Load { backup } => self.load(&database_path, backup).await,
        }
    }

    /// Exports every collection and each database's executed-transaction log
    /// into a folder of CBOR files.
    ///
    /// The backup is written to `output_directory` (defaulting to the parent
    /// of `database_path`) under `output_name` (defaulting to
    /// `<database file name>.backup`).
    async fn save(
        &self,
        database_path: PathBuf,
        output_directory: &Option<PathBuf>,
        output_name: &Option<String>,
    ) -> anyhow::Result<()> {
        if !database_path.exists() {
            anyhow::bail!("database_path does not exist");
        }

        let db = Storage::open_local(&database_path, &Configuration::default()).await?;

        let output_directory = if let Some(output_directory) = output_directory {
            output_directory.clone()
        } else {
            // A bare root path has no parent; report that rather than panic.
            database_path
                .parent()
                .map(ToOwned::to_owned)
                .ok_or_else(|| anyhow::anyhow!("database_path has no parent directory"))?
        };
        let output_name = if let Some(output_name) = output_name.clone() {
            PathBuf::from_str(&output_name)?
        } else {
            // Paths ending in `..` have no file name; report that rather than panic.
            let mut name = database_path
                .file_name()
                .ok_or_else(|| anyhow::anyhow!("database_path has no file name"))?
                .to_owned();
            name.push(".backup");
            PathBuf::from(name)
        };
        let backup_directory = output_directory.join(output_name);

        // The blocking sled iteration feeds entries through a bounded channel
        // to an async writer task, which creates the files on disk.
        let (sender, receiver) = flume::bounded(100);
        let document_writer = tokio::spawn(write_documents(receiver, backup_directory));

        tokio::task::spawn_blocking::<_, anyhow::Result<()>>(move || {
            // Collection trees are named `<database>::collection::<name>`;
            // skip any tree that doesn't match that pattern.
            for (database, collection_tree) in
                db.sled().tree_names().into_iter().filter_map(|tree| {
                    let database_end = tree.windows(2).position(|t| t.starts_with(b"::"))?;
                    let database = String::from_utf8(tree[0..database_end].to_vec()).ok()?;
                    // Checked `get` avoids a panic on tree names shorter than
                    // the `::collection::` separator being probed for.
                    if tree.get(database_end..database_end + 14)? == &b"::collection::"[..] {
                        Some((database, tree))
                    } else {
                        None
                    }
                })
            {
                // Lossy conversion: the suffix after `::collection::` is not
                // guaranteed to be UTF-8, and this is only a progress message.
                println!(
                    "Exporting {}",
                    String::from_utf8_lossy(&collection_tree)
                );
                let database = Arc::new(database);
                let tree = db.sled().open_tree(&collection_tree)?;
                for result in tree.iter() {
                    let (_, document) = result?;
                    let document = bincode::deserialize::<Document<'_>>(&document)?;
                    sender.send(BackupEntry::Document {
                        database: database.clone(),
                        document: document.to_owned(),
                    })?;
                }

                // Export the database's executed-transaction log, if present.
                if let Ok(tree) = db
                    .sled()
                    .open_tree(transaction_tree_name(&database).as_bytes())
                {
                    for row in tree.iter() {
                        let (_, executed) = row?;
                        let transaction = bincode::deserialize::<Executed<'static>>(&executed)?;
                        sender.send(BackupEntry::Transaction {
                            database: database.clone(),
                            transaction,
                        })?;
                    }
                }
            }
            Ok(())
        })
        .await??; // `?` on the JoinError, then `?` on the task's own Result.

        // Dropping `sender` inside the blocking task ends the writer's loop.
        document_writer.await?
    }

    /// Restores a backup folder created by [`Command::Save`] into the
    /// database at `database_path`.
    ///
    /// Expected layout: `<backup>/<database>/<collection>/<id>.<revision>.cbor`
    /// for documents and `<backup>/<database>/_transactions/<id>.cbor` for the
    /// transaction log.
    async fn load(&self, database_path: &Path, backup: &Path) -> anyhow::Result<()> {
        let storage = Storage::open_local(database_path, &Configuration::default()).await?;
        // Sled writes happen on a blocking thread, fed over a bounded channel.
        let (sender, receiver) = flume::bounded(100);
        let document_restorer =
            tokio::task::spawn_blocking(|| restore_documents(receiver, storage));

        let mut databases = tokio::fs::read_dir(&backup).await?;
        while let Some(database_folder) = databases.next_entry().await? {
            let database = match database_folder.file_name().to_str() {
                Some(name) => Arc::new(name.to_owned()),
                // Skip folders whose names aren't valid UTF-8.
                None => continue,
            };
            let mut collections = tokio::fs::read_dir(&database_folder.path()).await?;
            while let Some(collection_folder) = collections.next_entry().await? {
                let collection_folder = collection_folder.path();
                let collection = collection_folder
                    .file_name()
                    .unwrap()
                    .to_str()
                    .expect("invalid collection name encountered");
                if collection == TRANSACTIONS_FOLDER_NAME {
                    println!("Restoring executed transactions");
                    let mut entries = tokio::fs::read_dir(&collection_folder).await?;
                    while let Some(entry) = entries.next_entry().await? {
                        let path = entry.path();
                        // `OsStr::new` avoids allocating an OsString per entry.
                        if path.extension() == Some(OsStr::new("cbor")) {
                            let mut file = File::open(&path).await?;
                            let mut contents = Vec::new();
                            file.read_to_end(&mut contents).await?;
                            let transaction = serde_cbor::from_slice(&contents)?;
                            // `send_async` keeps channel backpressure from
                            // blocking the async executor thread.
                            sender
                                .send_async(BackupEntry::Transaction {
                                    database: database.clone(),
                                    transaction,
                                })
                                .await?;
                        }
                    }
                } else {
                    let collection = CollectionName::try_from(collection)?;
                    println!("Restoring {}", collection);
                    let mut entries = tokio::fs::read_dir(&collection_folder).await?;
                    while let Some(entry) = entries.next_entry().await? {
                        let path = entry.path();
                        if path.extension() == Some(OsStr::new("cbor")) {
                            let file_name = path
                                .file_name()
                                .unwrap()
                                .to_str()
                                .expect("invalid file name encountered");
                            // File names are `<id>.<revision>.cbor`; a `.cbor`
                            // extension guarantees at least two parts, and a
                            // malformed name surfaces as a parse error.
                            let parts = file_name.split('.').collect::<Vec<_>>();
                            let id = parts[0].parse::<u64>()?;
                            let revision = parts[1].parse::<u32>()?;
                            let mut file = File::open(&path).await?;
                            let mut contents = Vec::new();
                            file.read_to_end(&mut contents).await?;
                            let doc = Document {
                                header: Cow::Owned(Header {
                                    id,
                                    revision: Revision::with_id(revision, &contents),
                                }),
                                collection: collection.clone(),
                                contents: Cow::Owned(contents),
                            };
                            sender
                                .send_async(BackupEntry::Document {
                                    database: database.clone(),
                                    document: doc,
                                })
                                .await?;
                        }
                    }
                }
            }
        }

        // Dropping the last sender lets the restorer's receive loop finish.
        drop(sender);
        document_restorer.await?
    }
}
/// A unit of backup data passed over the channel between the task that walks
/// the source (sled trees or backup files) and the task that writes the
/// destination (backup files or sled trees).
enum BackupEntry {
    /// One document from a collection belonging to `database`
    /// (the collection is carried inside `document`).
    Document {
        database: Arc<String>,
        document: Document<'static>,
    },
    /// One executed transaction from `database`'s transaction log.
    Transaction {
        database: Arc<String>,
        transaction: Executed<'static>,
    },
}
async fn write_documents(receiver: Receiver<BackupEntry>, backup: PathBuf) -> anyhow::Result<()> {
if !backup.exists() {
tokio::fs::create_dir(&backup).await?;
}
while let Ok(entry) = receiver.recv_async().await {
match entry {
BackupEntry::Document { database, document } => {
let collection_directory = backup
.join(database.as_ref())
.join(document.collection.to_string());
if !collection_directory.exists() {
tokio::fs::create_dir_all(&collection_directory).await?;
}
let document_path = collection_directory.join(format!(
"{}.{}.cbor",
document.header.id, document.header.revision.id
));
let mut file = File::create(&document_path).await?;
file.write_all(&document.contents).await?;
file.shutdown().await?;
}
BackupEntry::Transaction {
database,
transaction,
} => {
let transactions_directory = backup.join(database.as_ref()).join("_transactions");
if !transactions_directory.exists() {
tokio::fs::create_dir_all(&transactions_directory).await?;
}
let document_path = transactions_directory.join(format!("{}.cbor", transaction.id));
let mut file = File::create(&document_path).await?;
file.write_all(&serde_cbor::to_vec(&transaction)?).await?;
file.shutdown().await?;
}
}
}
Ok(())
}
/// Receives backup entries from `receiver` and inserts them into the
/// corresponding sled trees of `storage`, then flushes sled so the restored
/// data is durable before returning.
///
/// Runs on a blocking thread; the loop ends once every sender is dropped.
// Fixed lint path: was `clippy::clippy::needless_pass_by_value`, which names a
// nonexistent lint and silences nothing. `storage` is intentionally taken by
// value — it is moved onto the worker thread.
#[allow(clippy::needless_pass_by_value)]
fn restore_documents(receiver: Receiver<BackupEntry>, storage: Storage) -> anyhow::Result<()> {
    while let Ok(entry) = receiver.recv() {
        match entry {
            BackupEntry::Document { database, document } => {
                let tree = storage
                    .sled()
                    .open_tree(document_tree_name(&database, &document.collection))?;
                // Big-endian keys keep sled's byte ordering equal to id order.
                tree.insert(
                    document.header.id.as_big_endian_bytes()?,
                    bincode::serialize(&document)?,
                )?;
            }
            BackupEntry::Transaction {
                database,
                transaction,
            } => {
                let tree = storage.sled().open_tree(transaction_tree_name(&database))?;
                tree.insert(
                    transaction.id.as_big_endian_bytes()?,
                    bincode::serialize(&transaction)?,
                )?;
            }
        }
    }

    storage.sled().flush()?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use pliantdb_core::{
        connection::Connection as _,
        test_util::{Basic, TestDirectory},
    };

    use super::*;
    use crate::Database;

    // Round-trip test: save a database containing one document, restore the
    // backup into a fresh directory, and verify the document survives intact.
    #[tokio::test]
    async fn backup_restore() -> anyhow::Result<()> {
        let backup_destination = TestDirectory::new("backup-restore.pliantdb.backup");
        let test_doc = {
            // Scope the source database so it is dropped (closed) before the
            // restore below creates a TestDirectory with the same name —
            // presumably TestDirectory cleans up on drop; verify in test_util.
            let database_directory = TestDirectory::new("backup-restore.pliantdb");
            let db = Database::<Basic>::open_local(&database_directory, &Configuration::default())
                .await?;
            let test_doc = db
                .collection::<Basic>()
                .push(&Basic::new("somevalue"))
                .await?;
            drop(db);
            // Save using only an output name (derived from the destination
            // TestDirectory) so the default output-directory logic runs.
            Command::Save {
                output_directory: None,
                output_name: Some(
                    backup_destination
                        .0
                        .file_name()
                        .unwrap()
                        .to_str()
                        .unwrap()
                        .to_owned(),
                ),
            }
            .execute(database_directory.0.clone())
            .await?;
            test_doc
        };
        // Restore into a brand-new database directory.
        let database_directory = TestDirectory::new("backup-restore.pliantdb");
        Command::Load {
            backup: backup_destination.0.clone(),
        }
        .execute(database_directory.0.clone())
        .await?;
        // Verify the restored document matches what was originally pushed.
        let db =
            Database::<Basic>::open_local(&database_directory, &Configuration::default()).await?;
        let doc = db
            .get::<Basic>(test_doc.id)
            .await?
            .expect("Backed up document.not found");
        let contents = doc.contents::<Basic>()?;
        assert_eq!(contents.value, "somevalue");
        Ok(())
    }
}