1use std::future::Future;
2use std::sync::Arc;
3use tokio::sync::{mpsc, Mutex};
4use tokio::task::{JoinSet, LocalSet};
5
6use crate::shard::InternalJoinSetResult;
7use crate::{ServiceData, UpstreamError};
8
9struct DataCommitGuard<'a, Key: Send + 'static, Data: ServiceData> {
10 key: Option<Key>,
11 internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
12}
13#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
14pub struct DataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
15 data: &'a mut Data,
16 commit_guard: DataCommitGuard<'a, Key, Data>,
17}
18
19impl<'a, Key: Send, Data: ServiceData> DataCommitRequest<'a, Key, Data> {
20 pub(crate) fn new(
21 key: Key,
22 data: &'a mut Data,
23 internal_join_set: &'a mut JoinSet<InternalJoinSetResult<Key, Data>>,
24 ) -> Self {
25 Self {
26 commit_guard: DataCommitGuard {
27 key: Some(key),
28 internal_join_set,
29 },
30 data,
31 }
32 }
33
34 pub fn data_mut(&mut self) -> &mut Data {
35 self.data
36 }
37
38 pub fn data(&self) -> &Data {
39 self.data
40 }
41
42 pub fn key(&self) -> &Key {
43 self.commit_guard.key()
44 }
45
46 pub fn into_processing(self) -> ProcessingDataCommitRequest<'a, Key, Data> {
47 ProcessingDataCommitRequest {
48 drop_guard: self.commit_guard,
49 }
50 }
51
52 pub fn resolve(self) {
53 self.into_processing().resolve();
54 }
55
56 pub fn reject<E: Into<UpstreamError>>(self, error: E) {
57 self.into_processing().reject(error);
58 }
59}
60
61impl<'a, Key: Send + 'static, Data: ServiceData> DataCommitRequest<'a, Key, Data> {
62 pub fn spawn<F: Future<Output = Result<(), UpstreamError>> + Send + 'static>(self, fut: F) {
63 self.into_processing().spawn(fut);
64 }
65}
66
67impl<'a, Key: Send, Data: ServiceData + Clone> DataCommitRequest<'a, Key, Data> {
68 pub fn into_owned(self) -> OwnedDataCommitRequest<'a, Key, Data> {
69 OwnedDataCommitRequest {
70 data: self.data.clone(),
71 commit_guard: self.commit_guard,
72 }
73 }
74}
75
76#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
77pub struct OwnedDataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
78 data: Data,
79 commit_guard: DataCommitGuard<'a, Key, Data>,
80}
81
82impl<'a, Key: Send, Data: ServiceData> OwnedDataCommitRequest<'a, Key, Data> {
83 pub fn into_inner(self) -> (Data, ProcessingDataCommitRequest<'a, Key, Data>) {
84 (
85 self.data,
86 ProcessingDataCommitRequest {
87 drop_guard: self.commit_guard,
88 },
89 )
90 }
91
92 pub fn data(&self) -> &Data {
93 &self.data
94 }
95
96 pub fn resolve(self) {
97 self.commit_guard.resolve();
98 }
99
100 pub fn reject<E: Into<UpstreamError>>(self, error: E) {
101 self.commit_guard.reject(error);
102 }
103}
104
105impl<'a, Key: Send + 'static, Data: ServiceData> OwnedDataCommitRequest<'a, Key, Data> {
106 pub fn spawn<
107 R: Future<Output = Result<(), UpstreamError>> + Send + 'static,
108 F: FnOnce(&Key, Data) -> R + Send + 'static,
109 >(
110 mut self,
111 func: F,
112 ) {
113 let key = self.commit_guard.take_key();
114 self.commit_guard.internal_join_set.spawn(async move {
115 let fut = (func)(&key, self.data);
116 match fut.await {
117 Ok(()) => InternalJoinSetResult::DataCommitResult(key, Ok(())),
118 Err(err) => InternalJoinSetResult::DataCommitResult(key, Err(err)),
119 }
120 });
121 }
122}
123
124#[must_use = "the data commit request must be resolved or rejected, otherwise the operation will be considered aborted."]
125pub struct ProcessingDataCommitRequest<'a, Key: Send + 'static, Data: ServiceData> {
126 drop_guard: DataCommitGuard<'a, Key, Data>,
127}
128
129impl<'a, Key: Send, Data: ServiceData> ProcessingDataCommitRequest<'a, Key, Data> {
130 pub fn resolve(self) {
131 self.drop_guard.resolve();
132 }
133
134 pub fn reject<E: Into<UpstreamError>>(self, error: E) {
135 self.drop_guard.reject(error);
136 }
137}
138
139impl<'a, Key: Send + 'static, Data: ServiceData> ProcessingDataCommitRequest<'a, Key, Data> {
140 pub fn spawn<F: Future<Output = Result<(), UpstreamError>> + Send + 'static>(mut self, fut: F) {
141 let key = self.drop_guard.take_key();
142 self.drop_guard.internal_join_set.spawn(async move {
143 match fut.await {
144 Ok(()) => InternalJoinSetResult::DataCommitResult(key, Ok(())),
145 Err(err) => InternalJoinSetResult::DataCommitResult(key, Err(err)),
146 }
147 });
148 }
149}
150
151impl<'a, Key: Send, Data: ServiceData> DataCommitGuard<'a, Key, Data> {
152 fn key(&self) -> &Key {
153 self.key.as_ref().unwrap()
154 }
155
156 fn take_key(&mut self) -> Key {
157 self.key
158 .take()
159 .expect("invariant: key must be present, unless dropped.")
160 }
161
162 fn emit_result_async(&mut self, result: InternalJoinSetResult<Key, Data>) {
163 self.internal_join_set.spawn(async move { result });
164 }
165
166 fn resolve(mut self) {
167 let key = self.take_key();
168 self.emit_result_async(InternalJoinSetResult::DataCommitResult(key, Ok(())));
169 }
170
171 fn reject<E: Into<UpstreamError>>(mut self, error: E) {
172 let key = self.take_key();
173 self.emit_result_async(InternalJoinSetResult::DataCommitResult(
174 key,
175 Err(error.into()),
176 ));
177 }
178}
179
180impl<Key: Send, Data: ServiceData> Drop for DataCommitGuard<'_, Key, Data> {
181 fn drop(&mut self) {
182 if let Some(key) = self.key.take() {
183 self.emit_result_async(InternalJoinSetResult::DataCommitResult(
184 key,
185 Err(UpstreamError::OperationAborted),
186 ));
187 }
188 }
189}