1use core::future::Future;
2
3use anyhow::Context as _;
4use bytes::Bytes;
5use futures::{try_join, Stream, StreamExt as _};
6use tracing::instrument;
7use wrpc_transport::{Acceptor, IncomingInputStream, Value};
8
9pub trait Eventual: wrpc_transport::Client {
10 #[instrument(level = "trace", skip_all)]
11 fn invoke_delete(
12 &self,
13 bucket: &str,
14 key: &str,
15 ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
16 self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "delete", (bucket, key))
17 }
18
19 #[instrument(level = "trace", skip_all)]
20 fn serve_delete(
21 &self,
22 ) -> impl Future<
23 Output = anyhow::Result<
24 Self::InvocationStream<
25 Self::Context,
26 (String, String),
27 <Self::Acceptor as Acceptor>::Transmitter,
28 >,
29 >,
30 > + Send {
31 self.serve_static("wrpc:keyvalue/eventual@0.1.0", "delete")
32 }
33
34 #[instrument(level = "trace", skip_all)]
35 fn invoke_exists(
36 &self,
37 bucket: &str,
38 key: &str,
39 ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
40 {
41 self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "exists", (bucket, key))
42 }
43
44 #[instrument(level = "trace", skip_all)]
45 fn serve_exists(
46 &self,
47 ) -> impl Future<
48 Output = anyhow::Result<
49 Self::InvocationStream<
50 Self::Context,
51 (String, String),
52 <Self::Acceptor as Acceptor>::Transmitter,
53 >,
54 >,
55 > + Send {
56 self.serve_static("wrpc:keyvalue/eventual@0.1.0", "exists")
57 }
58
59 #[instrument(level = "trace", skip_all)]
60 fn invoke_get(
61 &self,
62 bucket: &str,
63 key: &str,
64 ) -> impl Future<
65 Output = anyhow::Result<(
66 Result<Option<IncomingInputStream>, String>,
67 Self::Transmission,
68 )>,
69 > + Send {
70 self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "get", (bucket, key))
71 }
72
73 #[instrument(level = "trace", skip_all)]
74 fn serve_get(
75 &self,
76 ) -> impl Future<
77 Output = anyhow::Result<
78 Self::InvocationStream<
79 Self::Context,
80 (String, String),
81 <Self::Acceptor as Acceptor>::Transmitter,
82 >,
83 >,
84 > + Send {
85 self.serve_static("wrpc:keyvalue/eventual@0.1.0", "get")
86 }
87
88 #[instrument(level = "trace", skip_all)]
89 fn invoke_set(
90 &self,
91 bucket: &str,
92 key: &str,
93 value: impl Stream<Item = Bytes> + Send + 'static,
94 ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
95 self.invoke_static(
96 "wrpc:keyvalue/eventual@0.1.0",
97 "set",
98 (
99 bucket,
100 key,
101 Value::Stream(Box::pin(
102 value.map(|buf| Ok(buf.into_iter().map(Value::U8).map(Some).collect())),
103 )),
104 ),
105 )
106 }
107
108 #[instrument(level = "trace", skip_all)]
109 fn serve_set(
110 &self,
111 ) -> impl Future<
112 Output = anyhow::Result<
113 Self::InvocationStream<
114 Self::Context,
115 (String, String, IncomingInputStream),
116 <Self::Acceptor as Acceptor>::Transmitter,
117 >,
118 >,
119 > + Send {
120 self.serve_static("wrpc:keyvalue/eventual@0.1.0", "set")
121 }
122}
123
124impl<T: wrpc_transport::Client> Eventual for T {}
125
126pub trait Atomic: wrpc_transport::Client {
127 #[instrument(level = "trace", skip_all)]
128 fn invoke_compare_and_swap(
129 &self,
130 bucket: &str,
131 key: &str,
132 old: u64,
133 new: u64,
134 ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
135 {
136 self.invoke_static(
137 "wrpc:keyvalue/atomic@0.1.0",
138 "compare-and-swap",
139 (bucket, key, old, new),
140 )
141 }
142
143 #[instrument(level = "trace", skip_all)]
144 fn serve_compare_and_swap(
145 &self,
146 ) -> impl Future<
147 Output = anyhow::Result<
148 Self::InvocationStream<
149 Self::Context,
150 (String, String, u64, u64),
151 <Self::Acceptor as Acceptor>::Transmitter,
152 >,
153 >,
154 > + Send {
155 self.serve_static("wrpc:keyvalue/atomic@0.1.0", "compare-and-swap")
156 }
157
158 #[instrument(level = "trace", skip_all)]
159 fn invoke_increment(
160 &self,
161 bucket: &str,
162 key: &str,
163 delta: u64,
164 ) -> impl Future<Output = anyhow::Result<(Result<u64, String>, Self::Transmission)>> + Send
165 {
166 self.invoke_static(
167 "wrpc:keyvalue/atomic@0.1.0",
168 "increment",
169 (bucket, key, delta),
170 )
171 }
172
173 #[instrument(level = "trace", skip_all)]
174 fn serve_increment(
175 &self,
176 ) -> impl Future<
177 Output = anyhow::Result<
178 Self::InvocationStream<
179 Self::Context,
180 (String, String, u64),
181 <Self::Acceptor as Acceptor>::Transmitter,
182 >,
183 >,
184 > + Send {
185 self.serve_static("wrpc:keyvalue/atomic@0.1.0", "increment")
186 }
187}
188
189impl<T: wrpc_transport::Client> Atomic for T {}
190
191pub struct AtomicInvocations<T>
193where
194 T: wrpc_transport::Client,
195{
196 pub compare_and_swap: T::InvocationStream<
197 T::Context,
198 (String, String, u64, u64),
199 <T::Acceptor as Acceptor>::Transmitter,
200 >,
201 pub increment: T::InvocationStream<
202 T::Context,
203 (String, String, u64),
204 <T::Acceptor as Acceptor>::Transmitter,
205 >,
206}
207
208#[instrument(level = "trace", skip_all)]
210pub async fn serve_atomic<T>(client: &T) -> anyhow::Result<AtomicInvocations<T>>
211where
212 T: Atomic,
213{
214 let (compare_and_swap, increment) = try_join!(
215 async {
216 client
217 .serve_compare_and_swap()
218 .await
219 .context("failed to serve `wrpc:keyvalue/atomic.compare-and-swap`")
220 },
221 async {
222 client
223 .serve_increment()
224 .await
225 .context("failed to serve `wrpc:keyvalue/atomic.increment`")
226 },
227 )?;
228 Ok(AtomicInvocations {
229 compare_and_swap,
230 increment,
231 })
232}
233
234pub struct EventualInvocations<T>
236where
237 T: wrpc_transport::Client,
238{
239 pub delete:
240 T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
241 pub exists:
242 T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
243 pub get:
244 T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
245 pub set: T::InvocationStream<
246 T::Context,
247 (String, String, IncomingInputStream),
248 <T::Acceptor as Acceptor>::Transmitter,
249 >,
250}
251
252#[instrument(level = "trace", skip_all)]
254pub async fn serve_eventual<T>(client: &T) -> anyhow::Result<EventualInvocations<T>>
255where
256 T: Eventual,
257{
258 let (delete, exists, get, set) = try_join!(
259 async {
260 client
261 .serve_delete()
262 .await
263 .context("failed to serve `wrpc:keyvalue/eventual.delete`")
264 },
265 async {
266 client
267 .serve_exists()
268 .await
269 .context("failed to serve `wrpc:keyvalue/eventual.exists`")
270 },
271 async {
272 client
273 .serve_get()
274 .await
275 .context("failed to serve `wrpc:keyvalue/eventual.get`")
276 },
277 async {
278 client
279 .serve_set()
280 .await
281 .context("failed to serve `wrpc:keyvalue/eventual.set`")
282 },
283 )?;
284 Ok(EventualInvocations {
285 delete,
286 exists,
287 get,
288 set,
289 })
290}