use crate::errors::Result;
use crate::storage::{StorageTxn, TaskMap};
use std::collections::HashSet;
pub(crate) async fn rebuild<F>(
txn: &mut dyn StorageTxn,
in_working_set: F,
renumber: bool,
) -> Result<()>
where
F: Fn(&TaskMap) -> bool,
{
let old_ws = txn.get_working_set().await?;
let mut new_ws = vec![None]; let mut seen = HashSet::new();
for elt in &old_ws[1..] {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(*uuid).await? {
if in_working_set(&task) {
new_ws.push(Some(*uuid));
seen.insert(*uuid);
} else {
if !renumber {
new_ws.push(None);
}
}
continue;
}
} else {
new_ws.push(None);
}
}
for (uuid, task) in txn.all_tasks().await? {
if !seen.contains(&uuid) && in_working_set(&task) {
new_ws.push(Some(uuid));
}
}
for (i, (old, new)) in old_ws.iter().zip(new_ws.iter()).enumerate() {
if old != new {
txn.set_working_set_item(i, *new).await?;
}
}
match new_ws.len().cmp(&old_ws.len()) {
std::cmp::Ordering::Less => {
for (i, item) in old_ws.iter().enumerate().skip(new_ws.len()) {
if item.is_some() {
txn.set_working_set_item(i, None).await?;
}
}
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
for uuid in &new_ws[old_ws.len()..] {
txn.add_to_working_set(uuid.expect("new ws items should not be None"))
.await?;
}
}
}
txn.commit().await?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::inmemory::InMemoryStorage;
use crate::storage::Storage;
use crate::taskdb::TaskDb;
use crate::{Operation, Operations};
use chrono::Utc;
use uuid::Uuid;
#[tokio::test]
async fn rebuild_working_set_renumber() -> Result<()> {
rebuild_working_set(true).await
}
#[tokio::test]
async fn rebuild_working_set_no_renumber() -> Result<()> {
rebuild_working_set(false).await
}
async fn rebuild_working_set(renumber: bool) -> Result<()> {
let mut db = TaskDb::new(InMemoryStorage::new());
let mut uuids = vec![];
uuids.push(Uuid::new_v4());
println!("uuids[0]: {:?} - pending, not in working set", uuids[0]);
uuids.push(Uuid::new_v4());
println!("uuids[1]: {:?} - pending, in working set", uuids[1]);
uuids.push(Uuid::new_v4());
println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]);
uuids.push(Uuid::new_v4());
println!("uuids[3]: {:?} - not pending, in working set", uuids[3]);
uuids.push(Uuid::new_v4());
println!("uuids[4]: {:?} - pending, in working set", uuids[4]);
let mut ops = Operations::new();
for uuid in &uuids {
ops.push(Operation::Create { uuid: *uuid });
}
for i in &[0usize, 1, 4] {
ops.push(Operation::Update {
uuid: uuids[*i],
property: String::from("status"),
value: Some("pending".into()),
old_value: None,
timestamp: Utc::now(),
});
}
db.commit_operations(ops, |_| false).await?;
{
let mut txn = db.storage.txn().await?;
txn.clear_working_set().await?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(uuids[*i]).await?;
}
txn.commit().await?;
}
assert_eq!(
db.working_set().await?,
vec![None, Some(uuids[1]), Some(uuids[3]), Some(uuids[4])]
);
rebuild(
db.storage.txn().await?.as_mut(),
|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
},
renumber,
)
.await?;
let exp = if renumber {
vec![None, Some(uuids[1]), Some(uuids[4]), Some(uuids[0])]
} else {
vec![None, Some(uuids[1]), None, Some(uuids[4]), Some(uuids[0])]
};
assert_eq!(db.working_set().await?, exp);
Ok(())
}
#[tokio::test]
async fn rebuild_working_set_no_change() -> Result<()> {
let mut db = TaskDb::new(InMemoryStorage::new());
let mut uuids = vec![];
uuids.push(Uuid::new_v4());
println!("uuids[0]: {:?} - pending, in working set", uuids[0]);
uuids.push(Uuid::new_v4());
println!("uuids[1]: {:?} - pending, in working set", uuids[1]);
uuids.push(Uuid::new_v4());
println!("uuids[2]: {:?} - pending, not in working set", uuids[2]);
let mut ops = Operations::new();
for uuid in &uuids {
ops.push(Operation::Create { uuid: *uuid });
ops.push(Operation::Update {
uuid: *uuid,
property: String::from("status"),
value: Some("pending".into()),
old_value: None,
timestamp: Utc::now(),
});
}
db.commit_operations(ops, |_| false).await?;
{
let mut txn = db.storage.txn().await?;
txn.clear_working_set().await?;
for i in &[0, 1] {
txn.add_to_working_set(uuids[*i]).await?;
}
txn.commit().await?;
}
rebuild(
db.storage.txn().await?.as_mut(),
|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
},
true,
)
.await?;
assert_eq!(
db.working_set().await?,
vec![None, Some(uuids[0]), Some(uuids[1]), Some(uuids[2])]
);
Ok(())
}
#[tokio::test]
async fn rebuild_working_set_shrinks() -> Result<()> {
let mut db = TaskDb::new(InMemoryStorage::new());
let mut uuids = vec![];
uuids.push(Uuid::new_v4());
println!("uuids[0]: {:?} - pending, in working set", uuids[0]);
uuids.push(Uuid::new_v4());
println!("uuids[1]: {:?} - not pending, in working set", uuids[1]);
uuids.push(Uuid::new_v4());
println!("uuids[2]: {:?} - not pending, in working set", uuids[2]);
let mut ops = Operations::new();
for uuid in &uuids {
ops.push(Operation::Create { uuid: *uuid });
}
ops.push(Operation::Update {
uuid: uuids[0],
property: String::from("status"),
value: Some("pending".into()),
old_value: None,
timestamp: Utc::now(),
});
db.commit_operations(ops, |_| false).await?;
{
let mut txn = db.storage.txn().await?;
txn.clear_working_set().await?;
for uuid in &uuids {
txn.add_to_working_set(*uuid).await?;
}
txn.commit().await?;
}
rebuild(
db.storage.txn().await?.as_mut(),
|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
},
true,
)
.await?;
assert_eq!(db.working_set().await?, vec![None, Some(uuids[0])]);
Ok(())
}
}