thingvellir/upstream/
data_commit_request.rs

1use std::future::Future;
2use std::sync::Arc;
3use tokio::sync::{mpsc, Mutex};
4use tokio::task::{JoinSet, LocalSet};
5
6use crate::shard::InternalJoinSetResult;
7use crate::{ServiceData, UpstreamError};
8
9struct DataCommitGuard<'a, Key: Send + 'static, Data: ServiceData> {
10    key: Option<Key>,
11    internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
12}
13#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
14pub struct DataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
15    data: &'a mut Data,
16    commit_guard: DataCommitGuard<'a, Key, Data>,
17}
18
19impl<'a, Key: Send, Data: ServiceData> DataCommitRequest<'a, Key, Data> {
20    pub(crate) fn new(
21        key: Key,
22        data: &'a mut Data,
23        internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
24    ) -> Self {
25        Self {
26            commit_guard: DataCommitGuard {
27                key: Some(key),
28                internal_join_set,
29            },
30            data,
31        }
32    }
33
34    pub fn data_mut(&mut self) -> &mut Data {
35        self.data
36    }
37
38    pub fn data(&self) -> &Data {
39        self.data
40    }
41
42    pub fn key(&self) -> &Key {
43        self.commit_guard.key()
44    }
45
46    pub fn into_processing(self) -> ProcessingDataCommitRequest<'a, Key, Data> {
47        ProcessingDataCommitRequest {
48            drop_guard: self.commit_guard,
49        }
50    }
51
52    pub fn resolve(self) {
53        self.into_processing().resolve();
54    }
55
56    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
57        self.into_processing().reject(error);
58    }
59}
60
61impl<'a, Key: Send + 'static, Data: ServiceData> DataCommitRequest<'a, Key, Data> {
62    pub fn spawn<F: Future<Output = Result<(), UpstreamError>> + Send + 'static>(self, fut: F) {
63        self.into_processing().spawn(fut);
64    }
65}
66
67impl<'a, Key: Send, Data: ServiceData + Clone> DataCommitRequest<'a, Key, Data> {
68    pub fn into_owned(self) -> OwnedDataCommitRequest<'a, Key, Data> {
69        OwnedDataCommitRequest {
70            data: self.data.clone(),
71            commit_guard: self.commit_guard,
72        }
73    }
74}
75
76#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
77pub struct OwnedDataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
78    data: Data,
79    commit_guard: DataCommitGuard<'a, Key, Data>,
80}
81
82impl<'a, Key: Send, Data: ServiceData> OwnedDataCommitRequest<'a, Key, Data> {
83    pub fn into_inner(self) -> (Data, ProcessingDataCommitRequest<'a, Key, Data>) {
84        (
85            self.data,
86            ProcessingDataCommitRequest {
87                drop_guard: self.commit_guard,
88            },
89        )
90    }
91
92    pub fn data(&self) -> &Data {
93        &self.data
94    }
95
96    pub fn resolve(self) {
97        self.commit_guard.resolve();
98    }
99
100    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
101        self.commit_guard.reject(error);
102    }
103}
104
105impl<'a, Key: Send + 'static, Data: ServiceData> OwnedDataCommitRequest<'a, Key, Data> {
106    pub fn spawn<
107        R: Future<Output = Result<(), UpstreamError>> + Send + 'static,
108        F: FnOnce(&Key, Data) -> R + Send + 'static,
109    >(
110        mut self,
111        func: F,
112    ) {
113        let key = self.commit_guard.take_key();
114        self.commit_guard.internal_join_set.spawn(async move {
115            let fut = (func)(&key, self.data);
116            match fut.await {
117                Ok(()) => InternalJoinSetResult::DataCommitResult(key, Ok(())),
118                Err(err) => InternalJoinSetResult::DataCommitResult(key, Err(err)),
119            }
120        });
121    }
122}
123
124#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
125pub struct ProcessingDataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
126    drop_guard: DataCommitGuard<'a, Key, Data>,
127}
128
129impl<'a, Key: Send, Data: ServiceData> ProcessingDataCommitRequest<'a, Key, Data> {
130    pub fn resolve(self) {
131        self.drop_guard.resolve();
132    }
133
134    pub fn reject<E: Into<UpstreamError>>(self, error: E) {
135        self.drop_guard.reject(error);
136    }
137}
138
139impl<'a, Key: Send + 'static, Data: ServiceData> ProcessingDataCommitRequest<'a, Key, Data> {
140    pub fn spawn<F: Future<Output = Result<(), UpstreamError>> + Send + 'static>(mut self, fut: F) {
141        let key = self.drop_guard.take_key();
142        self.drop_guard.internal_join_set.spawn(async move {
143            match fut.await {
144                Ok(()) => InternalJoinSetResult::DataCommitResult(key, Ok(())),
145                Err(err) => InternalJoinSetResult::DataCommitResult(key, Err(err)),
146            }
147        });
148    }
149}
150
151impl<'a, Key: Send, Data: ServiceData> DataCommitGuard<'a, Key, Data> {
152    fn key(&self) -> &Key {
153        self.key.as_ref().unwrap()
154    }
155
156    fn take_key(&mut self) -> Key {
157        self.key
158            .take()
159            .expect("invariant: key must be present, unless dropped.")
160    }
161
162    fn emit_result_async(&mut self, result: InternalJoinSetResult<Key, Data>) {
163        self.internal_join_set.spawn(async move { result });
164    }
165
166    fn resolve(mut self) {
167        let key = self.take_key();
168        self.emit_result_async(InternalJoinSetResult::DataCommitResult(key, Ok(())));
169    }
170
171    fn reject<E: Into<UpstreamError>>(mut self, error: E) {
172        let key = self.take_key();
173        self.emit_result_async(InternalJoinSetResult::DataCommitResult(
174            key,
175            Err(error.into()),
176        ));
177    }
178}
179
180impl<Key: Send, Data: ServiceData> Drop for DataCommitGuard<'_, Key, Data> {
181    fn drop(&mut self) {
182        if let Some(key) = self.key.take() {
183            self.emit_result_async(InternalJoinSetResult::DataCommitResult(
184                key,
185                Err(UpstreamError::OperationAborted),
186            ));
187        }
188    }
189}