use std::collections::HashSet;
use crate::errors::Result;
use crate::operation::Operation;
use crate::server::Server;
use crate::storage::{Storage, TaskMap};
use crate::Operations;
use uuid::Uuid;
// Submodules backing `TaskDb`. The notes below list only what this file
// visibly uses from each of them.
// `apply`: provides `apply_operations`, called by `TaskDb::commit_operations`.
mod apply;
// `snapshot`: not referenced directly by the code in this file.
mod snapshot;
// `sync`: provides `sync`, called by `TaskDb::sync`.
mod sync;
// `undo`: provides `get_undo_operations` and `commit_reversed_operations`;
// the only submodule here that is visible to the rest of the crate.
pub(crate) mod undo;
// `working_set`: provides `rebuild`, called by `TaskDb::rebuild_working_set`.
mod working_set;
/// A task database layered over a [`Storage`] backend.
///
/// Every method on this type opens a transaction on `storage` and performs
/// its work through that transaction.
pub(crate) struct TaskDb<S>
where
    S: Storage,
{
    /// The backend from which transactions are obtained.
    storage: S,
}
impl<S: Storage> TaskDb<S> {
pub(crate) fn new(storage: S) -> TaskDb<S> {
TaskDb { storage }
}
pub(crate) async fn commit_operations<F>(
&mut self,
operations: Operations,
add_to_working_set: F,
) -> Result<()>
where
F: Fn(&Operation) -> bool,
{
let mut txn = self.storage.txn().await?;
apply::apply_operations(txn.as_mut(), &operations).await?;
let mut to_add = Vec::new();
for operation in &operations {
if add_to_working_set(operation) {
match operation {
Operation::Create { uuid }
| Operation::Update { uuid, .. }
| Operation::Delete { uuid, .. } => to_add.push(*uuid),
_ => {}
}
}
}
let mut working_set: HashSet<Uuid> = txn
.get_working_set()
.await?
.iter()
.filter_map(|u| *u)
.collect();
for uuid in to_add {
if !working_set.contains(&uuid) {
txn.add_to_working_set(uuid).await?;
working_set.insert(uuid);
}
}
for operation in operations {
txn.add_operation(operation).await?;
}
txn.commit().await
}
pub(crate) async fn all_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
let mut txn = self.storage.txn().await?;
txn.all_tasks().await
}
pub(crate) async fn all_task_uuids(&mut self) -> Result<Vec<Uuid>> {
let mut txn = self.storage.txn().await?;
txn.all_task_uuids().await
}
pub(crate) async fn working_set(&mut self) -> Result<Vec<Option<Uuid>>> {
let mut txn = self.storage.txn().await?;
txn.get_working_set().await
}
pub(crate) async fn get_task(&mut self, uuid: Uuid) -> Result<Option<TaskMap>> {
let mut txn = self.storage.txn().await?;
txn.get_task(uuid).await
}
pub(crate) async fn get_pending_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
let mut txn = self.storage.txn().await?;
txn.get_pending_tasks().await
}
pub(crate) async fn get_task_operations(&mut self, uuid: Uuid) -> Result<Operations> {
let mut txn = self.storage.txn().await?;
txn.get_task_operations(uuid).await
}
pub(crate) async fn rebuild_working_set<F>(
&mut self,
in_working_set: F,
renumber: bool,
) -> Result<()>
where
F: Fn(&TaskMap) -> bool,
{
working_set::rebuild(self.storage.txn().await?.as_mut(), in_working_set, renumber).await
}
pub(crate) async fn sync(
&mut self,
server: &mut Box<dyn Server>,
avoid_snapshots: bool,
) -> Result<()> {
let mut txn = self.storage.txn().await?;
sync::sync(server, txn.as_mut(), avoid_snapshots).await
}
pub(crate) async fn get_undo_operations(&mut self) -> Result<Operations> {
let mut txn = self.storage.txn().await?;
undo::get_undo_operations(txn.as_mut()).await
}
pub(crate) async fn commit_reversed_operations(
&mut self,
undo_ops: Operations,
) -> Result<bool> {
let mut txn = self.storage.txn().await?;
undo::commit_reversed_operations(txn.as_mut(), undo_ops).await
}
pub(crate) async fn num_operations(&mut self) -> Result<usize> {
let mut txn = self.storage.txn().await?;
Ok(txn
.unsynced_operations()
.await?
.iter()
.filter(|o| !o.is_undo_point())
.count())
}
pub(crate) async fn num_undo_points(&mut self) -> Result<usize> {
let mut txn = self.storage.txn().await?;
Ok(txn
.unsynced_operations()
.await?
.iter()
.filter(|o| o.is_undo_point())
.count())
}
#[cfg(test)]
pub(crate) async fn sorted_tasks(&mut self) -> Vec<(Uuid, Vec<(String, String)>)> {
let mut res: Vec<(Uuid, Vec<(String, String)>)> = self
.all_tasks()
.await
.unwrap()
.iter()
.map(|(u, t)| {
let mut t = t
.iter()
.map(|(p, v)| (p.clone(), v.clone()))
.collect::<Vec<(String, String)>>();
t.sort();
(*u, t)
})
.collect();
res.sort();
res
}
#[cfg(test)]
pub(crate) async fn operations(&mut self) -> Vec<Operation> {
let mut txn = self.storage.txn().await.unwrap();
txn.unsynced_operations().await.unwrap().to_vec()
}
}
#[cfg(test)]
mod tests {
    use crate::storage::inmemory::InMemoryStorage;
    use super::*;
    use chrono::Utc;
    use pretty_assertions::assert_eq;
    use uuid::Uuid;

    /// Committed operations are applied to the task data and recorded, in
    /// order, as unsynced operations.
    #[tokio::test]
    async fn commit_operations() -> Result<()> {
        let uuid = Uuid::new_v4();
        let timestamp = Utc::now();
        // The same update is needed both as input and as expected output.
        let title_update = || Operation::Update {
            uuid,
            property: String::from("title"),
            value: Some("my task".into()),
            timestamp,
            old_value: Some("old".into()),
        };

        let mut db = TaskDb::new(InMemoryStorage::new());
        let mut ops = Operations::new();
        ops.push(Operation::Create { uuid });
        ops.push(title_update());
        db.commit_operations(ops, |_| false).await?;

        let expected_tasks: Vec<(Uuid, Vec<(String, String)>)> =
            vec![(uuid, vec![("title".into(), "my task".into())])];
        assert_eq!(db.sorted_tasks().await, expected_tasks);

        let expected_ops = vec![Operation::Create { uuid }, title_update()];
        assert_eq!(db.operations().await, expected_ops);
        Ok(())
    }

    /// Only tasks accepted by the predicate join the working set, and a task
    /// that is already present (or repeated in the operations) is not added a
    /// second time.
    #[tokio::test]
    async fn commit_operations_update_working_set() -> Result<()> {
        let mut db = TaskDb::new(InMemoryStorage::new());

        let mut sorted_uuids = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
        sorted_uuids.sort();
        let [uuid1, uuid2, uuid3] = sorted_uuids;

        // Seed the working set with uuid1 before committing any operations.
        {
            let mut txn = db.storage.txn().await?;
            txn.add_to_working_set(uuid1).await?;
            txn.commit().await?;
        }

        // uuid2 and uuid3 are each created twice.
        let creation_order = [uuid1, uuid2, uuid3, uuid2, uuid3];
        let mut ops = Operations::new();
        for &uuid in &creation_order {
            ops.push(Operation::Create { uuid });
        }

        let wants_working_set = |op: &Operation| {
            matches!(op, Operation::Create { uuid } if *uuid == uuid1 || *uuid == uuid2)
        };
        db.commit_operations(ops, wants_working_set).await?;

        let expected_tasks: Vec<(Uuid, Vec<(String, String)>)> =
            vec![(uuid1, vec![]), (uuid2, vec![]), (uuid3, vec![])];
        assert_eq!(db.sorted_tasks().await, expected_tasks);

        let expected_ops: Vec<Operation> = creation_order
            .iter()
            .map(|&uuid| Operation::Create { uuid })
            .collect();
        assert_eq!(db.operations().await, expected_ops);

        let expected_working_set = vec![None, Some(uuid1), Some(uuid2)];
        assert_eq!(db.working_set().await?, expected_working_set);
        Ok(())
    }

    /// Undo points are excluded from the operation count.
    #[tokio::test]
    async fn test_num_operations() {
        let mut db = TaskDb::new(InMemoryStorage::new());

        let mut ops = Operations::new();
        let first = Uuid::new_v4();
        ops.push(Operation::Create { uuid: first });
        ops.push(Operation::UndoPoint);
        let second = Uuid::new_v4();
        ops.push(Operation::Create { uuid: second });
        db.commit_operations(ops, |_| false).await.unwrap();

        let counted = db.num_operations().await.unwrap();
        assert_eq!(counted, 2);
    }

    /// Each committed undo point increases the undo-point count by one.
    #[tokio::test]
    async fn test_num_undo_points() {
        let mut db = TaskDb::new(InMemoryStorage::new());

        for expected in 1..=2 {
            let mut ops = Operations::new();
            ops.push(Operation::UndoPoint);
            db.commit_operations(ops, |_| false).await.unwrap();
            assert_eq!(db.num_undo_points().await.unwrap(), expected);
        }
    }
}