use super::{WrappedStorage, WrappedStorageTxn};
use crate::errors::Result;
use crate::operation::Operation;
use crate::storage::{TaskMap, VersionId};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
/// Top-level request accepted by the actor's main loop (`ActorImpl::run`).
pub(super) enum ActorMessage {
/// Ask the actor to open a storage transaction. The reply carries either the
/// sending half of a fresh unbounded channel on which `TxnMessage`s for that
/// transaction can be sent, or the error returned when opening it failed.
BeginTxn(oneshot::Sender<Result<mpsc::UnboundedSender<TxnMessage>>>),
}
pub(super) enum TxnMessage {
Commit(oneshot::Sender<Result<()>>),
Rollback,
GetTask(Uuid, oneshot::Sender<Result<Option<TaskMap>>>),
GetPendingTasks(oneshot::Sender<Result<Vec<(Uuid, TaskMap)>>>),
CreateTask(Uuid, oneshot::Sender<Result<bool>>),
SetTask(Uuid, TaskMap, oneshot::Sender<Result<()>>),
DeleteTask(Uuid, oneshot::Sender<Result<bool>>),
AllTasks(oneshot::Sender<Result<Vec<(Uuid, TaskMap)>>>),
AllTaskUuids(oneshot::Sender<Result<Vec<Uuid>>>),
BaseVersion(oneshot::Sender<Result<VersionId>>),
SetBaseVersion(VersionId, oneshot::Sender<Result<()>>),
GetTaskOperations(Uuid, oneshot::Sender<Result<Vec<Operation>>>),
UnsyncedOperations(oneshot::Sender<Result<Vec<Operation>>>),
NumUnsyncedOperations(oneshot::Sender<Result<usize>>),
AddOperation(Operation, oneshot::Sender<Result<()>>),
RemoveOperation(Operation, oneshot::Sender<Result<()>>),
SyncComplete(oneshot::Sender<Result<()>>),
GetWorkingSet(oneshot::Sender<Result<Vec<Option<Uuid>>>>),
AddToWorkingSet(Uuid, oneshot::Sender<Result<usize>>),
SetWorkingSetItem(usize, Option<Uuid>, oneshot::Sender<Result<()>>),
ClearWorkingSet(oneshot::Sender<Result<()>>),
IsEmpty(oneshot::Sender<Result<bool>>),
}
/// State of the storage actor: the wrapped storage together with the inbox
/// of requests it serves.
pub(super) struct ActorImpl<S>
where
    S: WrappedStorage,
{
    /// Storage on which transactions are opened.
    storage: S,
    /// Incoming `ActorMessage` requests.
    receiver: mpsc::UnboundedReceiver<ActorMessage>,
}
impl<S: WrappedStorage> ActorImpl<S> {
pub(super) fn new(storage: S, receiver: mpsc::UnboundedReceiver<ActorMessage>) -> Self {
Self { storage, receiver }
}
pub(super) async fn run(&mut self) {
while let Some(ActorMessage::BeginTxn(reply_sender)) = self.receiver.recv().await {
let (txn_sender, mut txn_receiver) = mpsc::unbounded_channel::<TxnMessage>();
match self.storage.txn().await {
Ok(mut txn) => {
if reply_sender.send(Ok(txn_sender)).is_err() {
continue; }
Self::handle_transaction(&mut txn_receiver, &mut txn).await;
}
Err(e) => {
let _ = reply_sender.send(Err(e));
}
}
}
}
async fn handle_transaction(
receiver: &mut mpsc::UnboundedReceiver<TxnMessage>,
txn: &mut Box<dyn WrappedStorageTxn + '_>,
) {
while let Some(msg) = receiver.recv().await {
match msg {
TxnMessage::Commit(resp) => {
let _ = resp.send(txn.commit().await);
return; }
TxnMessage::Rollback => {
return; }
TxnMessage::GetTask(uuid, resp) => {
let _ = resp.send(txn.get_task(uuid).await);
}
TxnMessage::GetPendingTasks(resp) => {
let _ = resp.send(txn.get_pending_tasks().await);
}
TxnMessage::CreateTask(uuid, resp) => {
let _ = resp.send(txn.create_task(uuid).await);
}
TxnMessage::SetTask(uuid, t, resp) => {
let _ = resp.send(txn.set_task(uuid, t).await);
}
TxnMessage::DeleteTask(uuid, resp) => {
let _ = resp.send(txn.delete_task(uuid).await);
}
TxnMessage::AllTasks(resp) => {
let _ = resp.send(txn.all_tasks().await);
}
TxnMessage::AllTaskUuids(resp) => {
let _ = resp.send(txn.all_task_uuids().await);
}
TxnMessage::BaseVersion(resp) => {
let _ = resp.send(txn.base_version().await);
}
TxnMessage::SetBaseVersion(v, resp) => {
let _ = resp.send(txn.set_base_version(v).await);
}
TxnMessage::GetTaskOperations(u, resp) => {
let _ = resp.send(txn.get_task_operations(u).await);
}
TxnMessage::UnsyncedOperations(resp) => {
let _ = resp.send(txn.unsynced_operations().await);
}
TxnMessage::NumUnsyncedOperations(resp) => {
let _ = resp.send(txn.num_unsynced_operations().await);
}
TxnMessage::AddOperation(o, resp) => {
let _ = resp.send(txn.add_operation(o).await);
}
TxnMessage::RemoveOperation(o, resp) => {
let _ = resp.send(txn.remove_operation(o).await);
}
TxnMessage::SyncComplete(resp) => {
let _ = resp.send(txn.sync_complete().await);
}
TxnMessage::GetWorkingSet(resp) => {
let _ = resp.send(txn.get_working_set().await);
}
TxnMessage::AddToWorkingSet(u, resp) => {
let _ = resp.send(txn.add_to_working_set(u).await);
}
TxnMessage::SetWorkingSetItem(i, u, resp) => {
let _ = resp.send(txn.set_working_set_item(i, u).await);
}
TxnMessage::ClearWorkingSet(resp) => {
let _ = resp.send(txn.clear_working_set().await);
}
TxnMessage::IsEmpty(resp) => {
let _ = resp.send(txn.is_empty().await);
}
};
}
}
}