use std::future::Future;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::task::{JoinSet, LocalSet};
use crate::shard::InternalJoinSetResult;
use crate::{ServiceData, UpstreamError};
/// Guard that links a pending data commit to the join set collecting results.
///
/// While `key` is still `Some`, dropping the guard queues a
/// `DataCommitResult` carrying `UpstreamError::OperationAborted` for that key
/// (see the `Drop` implementation of this type).
struct DataCommitGuard<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Key of the commit; becomes `None` once it has been taken to build a
    /// result (or moved into a spawned task).
    key: Option<Key>,
    /// Join set onto which the commit result is spawned.
    internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
}
/// A commit request that borrows the data to be committed.
///
/// Finish it with `resolve`, `reject` or `spawn`. If it is dropped without
/// any of those, the contained guard reports the commit as
/// `UpstreamError::OperationAborted`.
#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
pub struct DataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Mutable borrow of the data this request refers to.
    data: &'a mut Data,
    /// Guard that emits the commit result for the key.
    commit_guard: DataCommitGuard<'a, Key, Data>,
}
impl<'a, Key, Data> DataCommitRequest<'a, Key, Data>
where
    Key: Send,
    Data: ServiceData,
{
    /// Builds a request for `key` over `data`, reporting its outcome through
    /// `internal_join_set`.
    pub(crate) fn new(
        key: Key,
        data: &'a mut Data,
        internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
    ) -> Self {
        let commit_guard = DataCommitGuard {
            key: Some(key),
            internal_join_set,
        };
        Self { commit_guard, data }
    }

    /// Returns a mutable reference to the borrowed data.
    pub fn data_mut(&mut self) -> &mut Data {
        &mut *self.data
    }

    /// Returns a shared reference to the borrowed data.
    pub fn data(&self) -> &Data {
        &*self.data
    }

    /// Returns the key this request belongs to.
    pub fn key(&self) -> &Key {
        self.commit_guard.key()
    }

    /// Releases the borrow on the data and keeps only the pending commit,
    /// which still has to be resolved, rejected or spawned.
    pub fn into_processing(self) -> ProcessingDataCommitRequest<'a, Key, Data> {
        let Self { commit_guard, .. } = self;
        ProcessingDataCommitRequest {
            drop_guard: commit_guard,
        }
    }

    /// Reports the commit as successful.
    pub fn resolve(self) {
        // Same guard call that `into_processing().resolve()` ends up making.
        self.commit_guard.resolve();
    }

    /// Reports the commit as failed with `error`.
    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
        // Same guard call that `into_processing().reject(error)` ends up making.
        self.commit_guard.reject(error);
    }
}
impl<'a, Key, Data> DataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Spawns `fut` on the internal join set; the value it resolves to is
    /// reported as the result of this commit.
    pub fn spawn<F>(self, fut: F)
    where
        F: Future<Output = Result<(), UpstreamError>> + Send + 'static,
    {
        let processing = self.into_processing();
        processing.spawn(fut);
    }
}
impl<'a, Key, Data> DataCommitRequest<'a, Key, Data>
where
    Key: Send,
    Data: ServiceData + Clone,
{
    /// Clones the borrowed data and returns a request that owns the clone,
    /// carrying over the same commit guard.
    pub fn into_owned(self) -> OwnedDataCommitRequest<'a, Key, Data> {
        let Self { data, commit_guard } = self;
        let owned: Data = Clone::clone(&*data);
        OwnedDataCommitRequest {
            data: owned,
            commit_guard,
        }
    }
}
/// A commit request that owns its copy of the data.
///
/// Finish it with `resolve`, `reject` or `spawn`, or split it with
/// `into_inner`. If it is dropped without being finished, the contained guard
/// reports the commit as `UpstreamError::OperationAborted`.
#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
pub struct OwnedDataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Owned data belonging to this request.
    data: Data,
    /// Guard that emits the commit result for the key.
    commit_guard: DataCommitGuard<'a, Key, Data>,
}
impl<'a, Key, Data> OwnedDataCommitRequest<'a, Key, Data>
where
    Key: Send,
    Data: ServiceData,
{
    /// Splits the request into its owned data and the still-pending commit.
    pub fn into_inner(self) -> (Data, ProcessingDataCommitRequest<'a, Key, Data>) {
        let Self { data, commit_guard } = self;
        let processing = ProcessingDataCommitRequest {
            drop_guard: commit_guard,
        };
        (data, processing)
    }

    /// Returns a shared reference to the owned data.
    pub fn data(&self) -> &Data {
        let Self { data, .. } = self;
        data
    }

    /// Reports the commit as successful.
    pub fn resolve(self) {
        let Self { commit_guard, .. } = self;
        commit_guard.resolve();
    }

    /// Reports the commit as failed with `error`.
    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
        let Self { commit_guard, .. } = self;
        commit_guard.reject(error);
    }
}
impl<'a, Key, Data> OwnedDataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Spawns a task on the internal join set that calls `func` with the key
    /// and the owned data, awaits the future it returns, and reports that
    /// future's output as the result of this commit.
    ///
    /// `func` is invoked inside the spawned task, not by the caller of
    /// `spawn`.
    pub fn spawn<R, F>(self, func: F)
    where
        R: Future<Output = Result<(), UpstreamError>> + Send + 'static,
        F: FnOnce(&Key, Data) -> R + Send + 'static,
    {
        let Self {
            data,
            mut commit_guard,
        } = self;
        // Taking the key disarms the guard's abort-on-drop; from here on the
        // spawned task owns the key and is responsible for reporting it.
        let key = commit_guard.take_key();
        commit_guard.internal_join_set.spawn(async move {
            // Build the future in its own statement so the `&key` borrow ends
            // before the await point.
            let fut = func(&key, data);
            let outcome = fut.await;
            InternalJoinSetResult::DataCommitResult(key, outcome)
        });
    }
}
/// A pending commit that no longer holds the data, only the guard.
///
/// Finish it with `resolve`, `reject` or `spawn`. If it is dropped without
/// any of those, the guard reports the commit as
/// `UpstreamError::OperationAborted`.
#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
pub struct ProcessingDataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Guard that emits the commit result for the key.
    drop_guard: DataCommitGuard<'a, Key, Data>,
}
impl<'a, Key, Data> ProcessingDataCommitRequest<'a, Key, Data>
where
    Key: Send,
    Data: ServiceData,
{
    /// Reports the commit as successful.
    pub fn resolve(self) {
        let Self { drop_guard } = self;
        drop_guard.resolve();
    }

    /// Reports the commit as failed with `error`.
    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
        let Self { drop_guard } = self;
        drop_guard.reject(error);
    }
}
impl<'a, Key, Data> ProcessingDataCommitRequest<'a, Key, Data>
where
    Key: Send + 'static,
    Data: ServiceData,
{
    /// Spawns `fut` on the internal join set; the value it resolves to is
    /// reported, together with the key, as the result of this commit.
    pub fn spawn<F>(self, fut: F)
    where
        F: Future<Output = Result<(), UpstreamError>> + Send + 'static,
    {
        let Self { mut drop_guard } = self;
        // Taking the key disarms the guard's abort-on-drop; from here on the
        // spawned task owns the key and is responsible for reporting it.
        let key = drop_guard.take_key();
        drop_guard.internal_join_set.spawn(async move {
            let outcome = fut.await;
            InternalJoinSetResult::DataCommitResult(key, outcome)
        });
    }
}
impl<'a, Key: Send, Data: ServiceData> DataCommitGuard<'a, Key, Data> {
fn key(&self) -> &Key {
self.key.as_ref().unwrap()
}
fn take_key(&mut self) -> Key {
self.key
.take()
.expect("invariant: key must be present, unless dropped.")
}
fn emit_result_async(&mut self, result: InternalJoinSetResult<Key, Data>) {
self.internal_join_set.spawn(async move { result });
}
fn resolve(mut self) {
let key = self.take_key();
self.emit_result_async(InternalJoinSetResult::DataCommitResult(key, Ok(())));
}
fn reject<E: Into<UpstreamError>>(mut self, error: E) {
let key = self.take_key();
self.emit_result_async(InternalJoinSetResult::DataCommitResult(
key,
Err(error.into()),
));
}
}
impl<Key, Data> Drop for DataCommitGuard<'_, Key, Data>
where
    Key: Send,
    Data: ServiceData,
{
    /// Reports the commit as `UpstreamError::OperationAborted` when the guard
    /// is dropped while it still holds its key; does nothing otherwise.
    fn drop(&mut self) {
        let pending_key = match self.key.take() {
            Some(key) => key,
            // The key was already taken, so a result is being produced elsewhere.
            None => return,
        };
        let aborted = InternalJoinSetResult::DataCommitResult(
            pending_key,
            Err(UpstreamError::OperationAborted),
        );
        self.emit_result_async(aborted);
    }
}