use std::sync::Arc;
use async_trait::async_trait;
use awaken_server_contract::contract::commit_coordinator::{
CommitCoordinator, CommitError, ThreadCommit, ThreadCommitOutcome, TransactionScopeId,
};
use awaken_server_contract::contract::storage::{
RuntimeCheckpointStore, ThreadRunCheckpointStore, ThreadRunStore,
};
use super::RunDispatchExecutor;
/// Conversion into a shared, type-erased [`RunDispatchExecutor`] handle.
pub trait IntoDispatchExecutor {
/// Consumes `self` and returns it as an `Arc<dyn RunDispatchExecutor>`.
fn into_dispatch_executor(self) -> Arc<dyn RunDispatchExecutor>;
}
/// Allows an `Arc` of any concrete executor type to be passed where a
/// type-erased executor handle is expected.
impl<R> IntoDispatchExecutor for Arc<R>
where
    R: RunDispatchExecutor + 'static,
{
    /// Returns the same `Arc`, coerced to `Arc<dyn RunDispatchExecutor>`.
    fn into_dispatch_executor(self) -> Arc<dyn RunDispatchExecutor> {
        // Unsized coercion at the return site: only the pointer type changes.
        self
    }
}
/// Identity conversion for handles that are already type-erased.
impl IntoDispatchExecutor for Arc<dyn RunDispatchExecutor> {
/// Returns the handle unchanged; it already has the target type.
fn into_dispatch_executor(self) -> Arc<dyn RunDispatchExecutor> {
self
}
}
/// [`CommitCoordinator`] implementation backed by a shared [`ThreadRunStore`].
pub(super) struct MailboxRunStoreCoordinator {
/// Store that receives the checkpoint appends.
store: Arc<dyn ThreadRunStore>,
/// Scope identifier computed once in `new` and handed out by `scope()`.
scope: TransactionScopeId,
}
impl MailboxRunStoreCoordinator {
    /// Wraps `store` in a coordinator whose scope id is derived from the
    /// address of the store allocation.
    ///
    /// The id text is `mailbox-implicit::` followed by the pointer-formatted
    /// address returned by [`Arc::as_ptr`], so clones of the same `Arc` yield
    /// the same text.
    ///
    /// # Panics
    ///
    /// Panics if `TransactionScopeId::new` rejects the generated text. The
    /// text always carries the fixed prefix, so it is never empty.
    /// NOTE(review): presumably emptiness is the only rejection reason —
    /// confirm against `TransactionScopeId::new`.
    pub(super) fn new(store: Arc<dyn ThreadRunStore>) -> Self {
        // Read the address before `store` is moved into the struct.
        let store_addr = Arc::as_ptr(&store);
        let scope_label = format!("mailbox-implicit::{store_addr:p}");
        let scope =
            TransactionScopeId::new(scope_label).expect("mailbox scope id is non-empty");

        Self { store, scope }
    }
}
#[async_trait]
impl CommitCoordinator for MailboxRunStoreCoordinator {
fn scope(&self) -> TransactionScopeId {
self.scope.clone()
}
fn reader(&self) -> Arc<dyn RuntimeCheckpointStore> {
Arc::new(ThreadRunCheckpointStore::new(Arc::clone(&self.store)))
}
async fn commit_checkpoint(
&self,
plan: ThreadCommit,
) -> Result<ThreadCommitOutcome, CommitError> {
plan.validate()?;
self.store
.checkpoint_append(
&plan.thread_id,
&plan.message_delta,
plan.expected_message_count,
&plan.run_projection,
)
.await
.map_err(|error| {
CommitError::StoreWrite(error).reclassify_append_conflict(&plan.thread_id)
})?;
Ok(ThreadCommitOutcome)
}
}