use crate::errors::{Error, Result};
use crate::operation::Operation;
use crate::storage::config::AccessMode;
use crate::storage::send_wrapper::{WrappedStorage, WrappedStorageTxn};
use crate::storage::sqlite::{schema, SqliteError, StoredUuid};
use crate::storage::{TaskMap, VersionId, DEFAULT_BASE_VERSION};
use anyhow::Context;
use async_trait::async_trait;
use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OpenFlags, OptionalExtension, TransactionBehavior};
use std::path::Path;
use uuid::Uuid;
/// Newtype wrapper around [`TaskMap`] used to move task data in and out of SQLite.
///
/// The `FromSql`/`ToSql` implementations in this file convert it to and from a
/// JSON string via `serde_json`.
struct StoredTaskMap(TaskMap);
impl FromSql for StoredTaskMap {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let o: TaskMap = serde_json::from_str(value.as_str()?)
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
Ok(StoredTaskMap(o))
}
}
impl ToSql for StoredTaskMap {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = serde_json::to_string(&self.0)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(s.into())
}
}
/// Reads an [`Operation`] from a SQLite column containing JSON text.
impl FromSql for Operation {
    /// Parse the column's text value as a JSON-encoded `Operation`.
    ///
    /// # Errors
    ///
    /// Returns the error from `ValueRef::as_str` if the column is not text, or
    /// `FromSqlError::Other` wrapping the `serde_json` error if the text is not a
    /// valid `Operation`.
    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
        let text = value.as_str()?;
        // Keep the underlying parse error (with its line/column details) rather
        // than collapsing it into an uninformative `InvalidType`.
        serde_json::from_str(text).map_err(|e| rusqlite::types::FromSqlError::Other(Box::new(e)))
    }
}
impl ToSql for Operation {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = serde_json::to_string(&self)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(s.into())
}
}
/// SQLite-backed storage: an open connection plus the access mode that was
/// requested when the storage was created.
pub(super) struct SqliteStorageInner {
/// The open SQLite connection; transactions are started on it.
con: Connection,
/// The access mode requested by the caller. This is copied into every
/// transaction and consulted before each write operation.
access_mode: AccessMode,
}
impl SqliteStorageInner {
pub(super) fn new<P: AsRef<Path>>(
directory: P,
access_mode: AccessMode,
create_if_missing: bool,
) -> Result<SqliteStorageInner> {
let directory = directory.as_ref();
if create_if_missing {
std::fs::create_dir_all(directory).map_err(|e| {
Error::Database(format!("Cannot create directory {directory:?}: {e}"))
})?;
}
let db_file = directory.join("taskchampion.sqlite3");
let mut flags = OpenFlags::default();
let mut open_access_mode = access_mode;
if create_if_missing && access_mode == AccessMode::ReadOnly && !db_file.exists() {
open_access_mode = AccessMode::ReadWrite;
}
if !create_if_missing {
flags.remove(OpenFlags::SQLITE_OPEN_CREATE);
}
if open_access_mode == AccessMode::ReadOnly {
flags.remove(OpenFlags::SQLITE_OPEN_READ_WRITE);
flags.insert(OpenFlags::SQLITE_OPEN_READ_ONLY);
flags.remove(OpenFlags::SQLITE_OPEN_CREATE);
}
let mut con = Connection::open_with_flags(db_file, flags)?;
con.query_row("PRAGMA journal_mode=WAL", [], |_row| Ok(()))
.context("Setting journal_mode=WAL")?;
if open_access_mode == AccessMode::ReadWrite {
schema::upgrade_db(&mut con)?;
}
Ok(Self { access_mode, con })
}
}
#[async_trait(?Send)]
impl WrappedStorage for SqliteStorageInner {
    /// Begin a new transaction on the underlying connection.
    ///
    /// The transaction is started with `TransactionBehavior::Immediate` and
    /// carries a copy of the storage's access mode so that write operations can
    /// be rejected for read-only storage.
    async fn txn<'a>(&'a mut self) -> Result<Box<dyn WrappedStorageTxn + 'a>> {
        let access_mode = self.access_mode;
        let inner = self
            .con
            .transaction_with_behavior(TransactionBehavior::Immediate)?;
        let txn = Txn {
            txn: Some(inner),
            access_mode,
        };
        Ok(Box::new(txn))
    }
}
/// A storage transaction wrapping a `rusqlite::Transaction`.
pub(super) struct Txn<'t> {
/// The underlying transaction. `commit` takes it out, leaving `None`, after
/// which further use fails with `SqliteError::TransactionAlreadyCommitted`.
txn: Option<rusqlite::Transaction<'t>>,
/// Access mode copied from the storage; anything other than `ReadWrite` makes
/// the write operations fail with `SqliteError::ReadOnlyStorage`.
access_mode: AccessMode,
}
impl<'t> Txn<'t> {
    /// Succeed only if this transaction belongs to read-write storage; otherwise
    /// return `SqliteError::ReadOnlyStorage`.
    fn check_write_access(&self) -> std::result::Result<(), SqliteError> {
        if self.access_mode == AccessMode::ReadWrite {
            Ok(())
        } else {
            Err(SqliteError::ReadOnlyStorage)
        }
    }

    /// Borrow the underlying transaction, or fail with
    /// `SqliteError::TransactionAlreadyCommitted` if it has already been taken.
    fn get_txn(&self) -> std::result::Result<&rusqlite::Transaction<'t>, SqliteError> {
        match &self.txn {
            Some(txn) => Ok(txn),
            None => Err(SqliteError::TransactionAlreadyCommitted),
        }
    }

    /// Return one more than the highest `id` in `working_set`, which is 1 when
    /// the table is empty.
    fn get_next_working_set_number(&self) -> Result<usize> {
        let txn = self.get_txn()?;
        let highest_plus_one: Option<usize> = txn
            .query_row(
                "SELECT COALESCE(MAX(id), 0) + 1 FROM working_set",
                [],
                |row| row.get(0),
            )
            .optional()
            .context("Getting highest working set ID")?;
        match highest_plus_one {
            Some(n) => Ok(n),
            None => Ok(0),
        }
    }
}
#[async_trait(?Send)]
impl WrappedStorageTxn for Txn<'_> {
async fn get_task(&mut self, uuid: Uuid) -> Result<Option<TaskMap>> {
let t = self.get_txn()?;
let result: Option<StoredTaskMap> = t
.query_row(
"SELECT data FROM tasks WHERE uuid = ? LIMIT 1",
[&StoredUuid(uuid)],
|r| r.get("data"),
)
.optional()?;
Ok(result.map(|t| t.0))
}
async fn get_pending_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
let t = self.get_txn()?;
let mut q = t.prepare(
"SELECT tasks.* FROM tasks JOIN working_set ON tasks.uuid = working_set.uuid",
)?;
let rows = q.query_map([], |r| {
let uuid: StoredUuid = r.get("uuid")?;
let data: StoredTaskMap = r.get("data")?;
Ok((uuid.0, data.0))
})?;
let mut res = Vec::new();
for row in rows {
res.push(row?)
}
Ok(res)
}
async fn create_task(&mut self, uuid: Uuid) -> Result<bool> {
self.check_write_access()?;
let t = self.get_txn()?;
let count: usize = t.query_row(
"SELECT count(uuid) FROM tasks WHERE uuid = ?",
[&StoredUuid(uuid)],
|x| x.get(0),
)?;
if count > 0 {
return Ok(false);
}
let data = TaskMap::default();
t.execute(
"INSERT INTO tasks (uuid, data) VALUES (?, ?)",
params![&StoredUuid(uuid), &StoredTaskMap(data)],
)
.context("Create task query")?;
Ok(true)
}
async fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Result<()> {
self.check_write_access()?;
let t = self.get_txn()?;
t.execute(
"INSERT OR REPLACE INTO tasks (uuid, data) VALUES (?, ?)",
params![&StoredUuid(uuid), &StoredTaskMap(task)],
)
.context("Update task query")?;
Ok(())
}
/// Delete the task with the given UUID.
///
/// Returns `true` if a row was removed and `false` if no such task existed.
/// Fails on read-only storage.
async fn delete_task(&mut self, uuid: Uuid) -> Result<bool> {
    self.check_write_access()?;
    let txn = self.get_txn()?;
    let removed_rows = txn
        .execute("DELETE FROM tasks WHERE uuid = ?", [&StoredUuid(uuid)])
        .context("Delete task query")?;
    Ok(removed_rows != 0)
}
async fn all_tasks(&mut self) -> Result<Vec<(Uuid, TaskMap)>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT uuid, data FROM tasks")?;
let rows = q.query_map([], |r| {
let uuid: StoredUuid = r.get("uuid")?;
let data: StoredTaskMap = r.get("data")?;
Ok((uuid.0, data.0))
})?;
let mut ret = vec![];
for r in rows {
ret.push(r?);
}
Ok(ret)
}
async fn all_task_uuids(&mut self) -> Result<Vec<Uuid>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT uuid FROM tasks")?;
let rows = q.query_map([], |r| {
let uuid: StoredUuid = r.get("uuid")?;
Ok(uuid.0)
})?;
let mut ret = vec![];
for r in rows {
ret.push(r?);
}
Ok(ret)
}
async fn base_version(&mut self) -> Result<VersionId> {
let t = self.get_txn()?;
let version: Option<StoredUuid> = t
.query_row(
"SELECT value FROM sync_meta WHERE key = 'base_version'",
[],
|r| r.get("value"),
)
.optional()?;
Ok(version.map(|u| u.0).unwrap_or(DEFAULT_BASE_VERSION))
}
async fn set_base_version(&mut self, version: VersionId) -> Result<()> {
self.check_write_access()?;
let t = self.get_txn()?;
t.execute(
"INSERT OR REPLACE INTO sync_meta (key, value) VALUES (?, ?)",
params!["base_version", &StoredUuid(version)],
)
.context("Set base version")?;
Ok(())
}
async fn get_task_operations(&mut self, uuid: Uuid) -> Result<Vec<Operation>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT data FROM operations where uuid=? ORDER BY id ASC")?;
let rows = q.query_map([&StoredUuid(uuid)], |r| {
let data: Operation = r.get("data")?;
Ok(data)
})?;
let mut ret = vec![];
for r in rows {
ret.push(r?);
}
Ok(ret)
}
async fn unsynced_operations(&mut self) -> Result<Vec<Operation>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT data FROM operations WHERE NOT synced ORDER BY id ASC")?;
let rows = q.query_map([], |r| {
let data: Operation = r.get("data")?;
Ok(data)
})?;
let mut ret = vec![];
for r in rows {
ret.push(r?);
}
Ok(ret)
}
/// Count the operations not yet marked as synced.
async fn num_unsynced_operations(&mut self) -> Result<usize> {
    let txn = self.get_txn()?;
    let pending = txn.query_row(
        "SELECT count(*) FROM operations WHERE NOT synced",
        [],
        |row| row.get::<_, usize>(0),
    )?;
    Ok(pending)
}
/// Append `op` to the `operations` table. Fails on read-only storage.
async fn add_operation(&mut self, op: Operation) -> Result<()> {
    self.check_write_access()?;
    let txn = self.get_txn()?;
    let inserted = txn.execute("INSERT INTO operations (data) VALUES (?)", params![&op]);
    inserted.context("Add operation query")?;
    Ok(())
}
async fn remove_operation(&mut self, op: Operation) -> Result<()> {
self.check_write_access()?;
let t = self.get_txn()?;
let last: Option<(u32, Operation)> = t
.query_row(
"SELECT id, data FROM operations WHERE NOT synced ORDER BY id DESC LIMIT 1",
[],
|x| Ok((x.get(0)?, x.get(1)?)),
)
.optional()?;
if let Some((last_id, last_op)) = last {
if last_op == op {
t.execute("DELETE FROM operations where id = ?", [last_id])
.context("Removing operation")?;
return Ok(());
}
}
Err(Error::Database(
"Last operation does not match -- cannot remove".to_string(),
))
}
/// Finish a sync: mark every unsynced operation as synced, then delete
/// operations whose UUID has no matching row in `tasks`.
/// Fails on read-only storage.
async fn sync_complete(&mut self) -> Result<()> {
    self.check_write_access()?;
    let txn = self.get_txn()?;

    let mark_synced = "UPDATE operations SET synced = true WHERE synced = false";
    txn.execute(mark_synced, [])
        .context("Marking operations as synced")?;

    // The statement text is kept exactly as-is, including its line breaks.
    let delete_orphans = r#"DELETE from operations
WHERE uuid IN (
SELECT operations.uuid FROM operations LEFT JOIN tasks ON operations.uuid = tasks.uuid WHERE tasks.uuid IS NULL
)"#;
    txn.execute(delete_orphans, [])
        .context("Deleting orphaned operations")?;

    Ok(())
}
async fn get_working_set(&mut self) -> Result<Vec<Option<Uuid>>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT id, uuid FROM working_set ORDER BY id ASC")?;
let rows = q
.query_map([], |r| {
let id: usize = r.get("id")?;
let uuid: StoredUuid = r.get("uuid")?;
Ok((id, uuid.0))
})
.context("Get working set query")?;
let rows: Vec<std::result::Result<(usize, Uuid), _>> = rows.collect();
let mut res = Vec::with_capacity(rows.len());
for _ in 0..self
.get_next_working_set_number()
.context("Getting working set number")?
{
res.push(None);
}
for r in rows {
let (id, uuid) = r?;
res[id] = Some(uuid);
}
Ok(res)
}
/// Append `uuid` to the working set and return the id it was assigned.
///
/// The id is the next free working-set number, i.e. one more than the highest
/// id currently in `working_set`. Fails on read-only storage.
async fn add_to_working_set(&mut self, uuid: Uuid) -> Result<usize> {
    self.check_write_access()?;
    let t = self.get_txn()?;
    let next_working_id = self.get_next_working_set_number()?;
    t.execute(
        "INSERT INTO working_set (id, uuid) VALUES (?, ?)",
        params![next_working_id, &StoredUuid(uuid)],
    )
    // Label the failure with this operation so it is not mistaken for a
    // failure while creating a task.
    .context("Add to working set query")?;
    Ok(next_working_id)
}
/// Set or clear the working-set entry at `index`.
///
/// `Some(uuid)` inserts or replaces the row for `index`; `None` deletes it.
/// Fails on read-only storage.
async fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Result<()> {
    self.check_write_access()?;
    let txn = self.get_txn()?;
    let outcome = if let Some(uuid) = uuid {
        txn.execute(
            "INSERT OR REPLACE INTO working_set (id, uuid) VALUES (?, ?)",
            params![index, &StoredUuid(uuid)],
        )
    } else {
        txn.execute("DELETE FROM working_set WHERE id = ?", [index])
    };
    outcome.context("Set working set item query")?;
    Ok(())
}
/// Delete every row from the working set. Fails on read-only storage.
async fn clear_working_set(&mut self) -> Result<()> {
    self.check_write_access()?;
    let txn = self.get_txn()?;
    let cleared = txn.execute("DELETE FROM working_set", []);
    cleared.context("Clear working set query")?;
    Ok(())
}
async fn commit(&mut self) -> Result<()> {
self.check_write_access()?;
let t = self
.txn
.take()
.ok_or(SqliteError::TransactionAlreadyCommitted)?;
t.commit().context("Committing transaction")?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::taskmap_with;
use chrono::Utc;
use pretty_assertions::assert_eq;
use rstest::rstest;
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
/// Build a fixture database at `path/taskchampion.sqlite3` by executing a fixed
/// list of SQL statements: four tables (`operations`, `sync_meta`, `tasks`,
/// `working_set`), fourteen operations, two tasks and two working-set entries.
///
/// NOTE(review): judging by the function name, this presumably reproduces the
/// on-disk format written by the 0.8.0 release -- confirm. The `operations`
/// table created here has only `id` and `data` columns.
fn create_0_8_0_db(path: &Path) -> Result<()> {
let db_file = path.join("taskchampion.sqlite3");
let con = Connection::open(db_file)?;
con.query_row("PRAGMA journal_mode=WAL", [], |_row| Ok(()))
.context("Setting journal_mode=WAL")?;
// Statements are executed one at a time, in the order listed.
let queries = vec![
r#"CREATE TABLE operations (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);"#,
// Operations 1-6: an undo point, then creation and updates of task e2956511-....
r#"INSERT INTO operations VALUES(1,'"UndoPoint"');"#,
r#"INSERT INTO operations VALUES(2,
'{"Create":{"uuid":"e2956511-fd47-4e40-926a-52616229c2fa"}}');"#,
r#"INSERT INTO operations VALUES(3,
'{"Update":{"uuid":"e2956511-fd47-4e40-926a-52616229c2fa",
"property":"description",
"old_value":null,
"value":"one",
"timestamp":"2024-08-25T19:06:11.840482523Z"}}');"#,
r#"INSERT INTO operations VALUES(4,
'{"Update":{"uuid":"e2956511-fd47-4e40-926a-52616229c2fa",
"property":"entry",
"old_value":null,
"value":"1724612771",
"timestamp":"2024-08-25T19:06:11.840497662Z"}}');"#,
r#"INSERT INTO operations VALUES(5,
'{"Update":{"uuid":"e2956511-fd47-4e40-926a-52616229c2fa",
"property":"modified",
"old_value":null,
"value":"1724612771",
"timestamp":"2024-08-25T19:06:11.840498973Z"}}');"#,
r#"INSERT INTO operations VALUES(6,
'{"Update":{"uuid":"e2956511-fd47-4e40-926a-52616229c2fa",
"property":"status",
"old_value":null,
"value":"pending",
"timestamp":"2024-08-25T19:06:11.840505346Z"}}');"#,
// Operations 7-14: an undo point, then creation and updates of task 1d125b41-....
r#"INSERT INTO operations VALUES(7,'"UndoPoint"');"#,
r#"INSERT INTO operations VALUES(8,
'{"Create":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8"}}');"#,
r#"INSERT INTO operations VALUES(9,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"dep_e2956511-fd47-4e40-926a-52616229c2fa",
"old_value":null,
"value":"x",
"timestamp":"2024-08-25T19:06:15.880952492Z"}}');"#,
r#"INSERT INTO operations VALUES(10,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"depends",
"old_value":null,
"value":"e2956511-fd47-4e40-926a-52616229c2fa",
"timestamp":"2024-08-25T19:06:15.880969429Z"}}');"#,
r#"INSERT INTO operations VALUES(11,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"description",
"old_value":null,
"value":"two",
"timestamp":"2024-08-25T19:06:15.880970972Z"}}');"#,
r#"INSERT INTO operations VALUES(12,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"entry",
"old_value":null,
"value":"1724612775",
"timestamp":"2024-08-25T19:06:15.880974948Z"}}');"#,
r#"INSERT INTO operations VALUES(13,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"modified",
"old_value":null,
"value":"1724612775",
"timestamp":"2024-08-25T19:06:15.880976160Z"}}');"#,
r#"INSERT INTO operations VALUES(14,
'{"Update":{"uuid":"1d125b41-ee1d-49a7-9319-0506dee414f8",
"property":"status",
"old_value":null,
"value":"pending",
"timestamp":"2024-08-25T19:06:15.880977255Z"}}');"#,
// Remaining tables, the two task rows and their working-set entries.
r#"CREATE TABLE sync_meta (key STRING PRIMARY KEY, value STRING);"#,
r#"CREATE TABLE tasks (uuid STRING PRIMARY KEY, data STRING);"#,
r#"INSERT INTO tasks VALUES('e2956511-fd47-4e40-926a-52616229c2fa',
'{"status":"pending",
"entry":"1724612771",
"modified":"1724612771",
"description":"one"}');"#,
r#"INSERT INTO tasks VALUES('1d125b41-ee1d-49a7-9319-0506dee414f8',
'{"modified":"1724612775",
"status":"pending",
"description":"two",
"dep_e2956511-fd47-4e40-926a-52616229c2fa":"x",
"entry":"1724612775",
"depends":"e2956511-fd47-4e40-926a-52616229c2fa"}');"#,
r#"CREATE TABLE working_set (id INTEGER PRIMARY KEY, uuid STRING);"#,
r#"INSERT INTO working_set VALUES(1,'e2956511-fd47-4e40-926a-52616229c2fa');"#,
r#"INSERT INTO working_set VALUES(2,'1d125b41-ee1d-49a7-9319-0506dee414f8');"#,
// Reset the AUTOINCREMENT bookkeeping so it records 14 for `operations`.
r#"DELETE FROM sqlite_sequence;"#,
r#"INSERT INTO sqlite_sequence VALUES('operations',14);"#,
];
for q in queries {
// Attach the failing statement's text to any error.
con.execute(q, [])
.with_context(|| format!("executing {q}"))?;
}
Ok(())
}
/// Storage can be created in a directory that does not exist yet, and data
/// committed there is still visible after the storage is opened a second time.
#[tokio::test]
async fn test_empty_dir() -> Result<()> {
    let tmp_dir = TempDir::new()?;
    let db_dir = tmp_dir.path().join("subdir");
    let uuid = Uuid::new_v4();

    let mut storage = SqliteStorageInner::new(db_dir.clone(), AccessMode::ReadWrite, true)?;

    // Create the task and commit it.
    {
        let mut txn = storage.txn().await?;
        let created = txn.create_task(uuid).await?;
        assert!(created);
        txn.commit().await?;
    }

    // The task is visible through a new transaction on the same storage.
    {
        let mut txn = storage.txn().await?;
        let found = txn.get_task(uuid).await?;
        assert_eq!(found, Some(taskmap_with(vec![])));
    }

    // The task is also visible after opening the storage again.
    let mut storage = SqliteStorageInner::new(db_dir, AccessMode::ReadWrite, true)?;
    {
        let mut txn = storage.txn().await?;
        let found = txn.get_task(uuid).await?;
        assert_eq!(found, Some(taskmap_with(vec![])));
    }

    Ok(())
}
/// Opening the fixture database upgrades it to the latest schema version while
/// keeping its tasks and operations readable and writable.
#[tokio::test]
async fn test_0_8_0_db() -> Result<()> {
    let tmp_dir = TempDir::new()?;
    create_0_8_0_db(tmp_dir.path())?;

    let mut storage = SqliteStorageInner::new(tmp_dir.path(), AccessMode::ReadWrite, true)?;
    let db_version = schema::get_db_version(&mut storage.con)?;
    assert_eq!(db_version, schema::LATEST_VERSION,);

    let one = Uuid::parse_str("e2956511-fd47-4e40-926a-52616229c2fa").unwrap();
    let two = Uuid::parse_str("1d125b41-ee1d-49a7-9319-0506dee414f8").unwrap();

    // Existing data is readable, and new data can be written.
    {
        let mut txn = storage.txn().await?;

        let mut task_one = txn.get_task(one).await?.unwrap();
        assert_eq!(task_one.get("description").unwrap(), "one");

        let task_two = txn.get_task(two).await?.unwrap();
        assert_eq!(task_two.get("description").unwrap(), "two");

        let ops = txn.unsynced_operations().await?;
        assert_eq!(ops.len(), 14);
        assert_eq!(ops[0], Operation::UndoPoint);

        task_one.insert("description".into(), "updated".into());
        txn.set_task(one, task_one).await?;
        let update = Operation::Update {
            uuid: one,
            property: "description".into(),
            old_value: Some("one".into()),
            value: Some("updated".into()),
            timestamp: Utc::now(),
        };
        txn.add_operation(update).await?;
        txn.commit().await?;
    }

    // The written data is visible in a later transaction.
    {
        let mut txn = storage.txn().await?;
        let task_one = txn.get_task(one).await?.unwrap();
        assert_eq!(task_one.get("description").unwrap(), "updated");
        let ops = txn.unsynced_operations().await?;
        assert_eq!(ops.len(), 15);
    }

    // Every operation row's `uuid` column agrees with the UUID in its data.
    {
        let t = storage
            .con
            .transaction_with_behavior(TransactionBehavior::Immediate)?;
        let mut stmt = t.prepare("SELECT data, uuid FROM operations ORDER BY id ASC")?;
        let rows = stmt
            .query_map([], |r| {
                let uuid: Option<StoredUuid> = r.get("uuid")?;
                let operation: Operation = r.get("data")?;
                Ok((uuid.map(|su| su.0), operation))
            })
            .context("Get all operations")?;
        let mut num_ops = 0;
        for row in rows {
            let (uuid, operation) = row?;
            assert_eq!(uuid, operation.get_uuid());
            num_ops += 1;
        }
        assert_eq!(num_ops, 15);
    }

    Ok(())
}
/// Two threads, each with its own storage instance on the same directory, can
/// both commit a write even though their transactions overlap in time.
#[test]
fn test_concurrent_access() -> Result<()> {
    /// Build the single-threaded, timer-enabled runtime each thread runs on.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("Failed to create current-thread runtime")
    }

    let tmp_dir = TempDir::new()?;
    let path = tmp_dir.path();

    // Open the storage once before the threads start.
    SqliteStorageInner::new(path, AccessMode::ReadWrite, true).unwrap();

    thread::scope(|scope| {
        // First writer: begin a transaction, write, then sleep 100ms before
        // committing.
        scope.spawn(|| {
            let rt = current_thread_runtime();
            rt.block_on(async move {
                let mut storage =
                    SqliteStorageInner::new(path, AccessMode::ReadWrite, true).unwrap();
                let version = Uuid::new_v4();
                let mut txn = storage.txn().await.unwrap();
                txn.set_base_version(version).await.unwrap();
                tokio::time::sleep(Duration::from_millis(100)).await;
                txn.commit().await.unwrap();
            });
        });

        // Second writer: sleep 50ms first, then open the storage, write and
        // commit.
        scope.spawn(|| {
            let rt = current_thread_runtime();
            rt.block_on(async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                let mut storage =
                    SqliteStorageInner::new(path, AccessMode::ReadWrite, true).unwrap();
                let version = Uuid::new_v4();
                let mut txn = storage.txn().await.unwrap();
                txn.set_base_version(version).await.unwrap();
                txn.commit().await.unwrap();
            });
        });
    });

    Ok(())
}
/// Read-only storage rejects every write operation (including `commit`) with
/// the read-only error, while read operations succeed on an empty database.
///
/// `exists` controls whether the database is created before the read-only
/// open; `create` is passed as `create_if_missing` to that open.
#[rstest]
#[case::create_non_existent(false, true)]
#[case::create_exists(true, true)]
#[case::exists_dont_create(true, false)]
#[tokio::test]
async fn test_read_only(#[case] exists: bool, #[case] create: bool) -> Result<()> {
    /// True when `res` is an error whose message is the read-only message;
    /// panics if `res` is `Ok`.
    fn is_read_only_err<T: std::fmt::Debug>(res: Result<T>) -> bool {
        let message = res.unwrap_err().to_string();
        message == "Task storage was opened in read-only mode"
    }

    let tmp_dir = TempDir::new()?;
    if exists {
        SqliteStorageInner::new(tmp_dir.path(), AccessMode::ReadWrite, true)?;
    }
    let mut storage = SqliteStorageInner::new(tmp_dir.path(), AccessMode::ReadOnly, create)?;

    let mut txn = storage.txn().await?;
    let taskmap = TaskMap::new();
    let op = Operation::UndoPoint;

    // Every mutating operation must be refused.
    assert!(is_read_only_err(txn.create_task(Uuid::new_v4()).await));
    assert!(is_read_only_err(
        txn.set_task(Uuid::new_v4(), taskmap).await
    ));
    assert!(is_read_only_err(txn.delete_task(Uuid::new_v4()).await));
    assert!(is_read_only_err(txn.set_base_version(Uuid::new_v4()).await));
    assert!(is_read_only_err(txn.add_operation(op.clone()).await));
    assert!(is_read_only_err(txn.remove_operation(op).await));
    assert!(is_read_only_err(txn.sync_complete().await));
    assert!(is_read_only_err(
        txn.add_to_working_set(Uuid::new_v4()).await
    ));
    assert!(is_read_only_err(txn.set_working_set_item(1, None).await));
    assert!(is_read_only_err(txn.clear_working_set().await));
    assert!(is_read_only_err(txn.commit().await));

    // Read operations still work and report an empty database.
    assert_eq!(txn.get_task(Uuid::new_v4()).await?, None);
    assert_eq!(txn.get_pending_tasks().await?.len(), 0);
    assert_eq!(txn.all_tasks().await?.len(), 0);
    assert_eq!(txn.base_version().await?, Uuid::nil());
    assert_eq!(txn.get_task_operations(Uuid::new_v4()).await?.len(), 0);
    assert_eq!(txn.unsynced_operations().await?.len(), 0);
    assert_eq!(txn.num_unsynced_operations().await?, 0);
    assert_eq!(txn.get_working_set().await?.len(), 1);

    Ok(())
}
}