wrpc_interface_keyvalue/
lib.rs

1use core::future::Future;
2
3use anyhow::Context as _;
4use bytes::Bytes;
5use futures::{try_join, Stream, StreamExt as _};
6use tracing::instrument;
7use wrpc_transport::{Acceptor, IncomingInputStream, Value};
8
9pub trait Eventual: wrpc_transport::Client {
10    #[instrument(level = "trace", skip_all)]
11    fn invoke_delete(
12        &self,
13        bucket: &str,
14        key: &str,
15    ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
16        self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "delete", (bucket, key))
17    }
18
19    #[instrument(level = "trace", skip_all)]
20    fn serve_delete(
21        &self,
22    ) -> impl Future<
23        Output = anyhow::Result<
24            Self::InvocationStream<
25                Self::Context,
26                (String, String),
27                <Self::Acceptor as Acceptor>::Transmitter,
28            >,
29        >,
30    > + Send {
31        self.serve_static("wrpc:keyvalue/eventual@0.1.0", "delete")
32    }
33
34    #[instrument(level = "trace", skip_all)]
35    fn invoke_exists(
36        &self,
37        bucket: &str,
38        key: &str,
39    ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
40    {
41        self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "exists", (bucket, key))
42    }
43
44    #[instrument(level = "trace", skip_all)]
45    fn serve_exists(
46        &self,
47    ) -> impl Future<
48        Output = anyhow::Result<
49            Self::InvocationStream<
50                Self::Context,
51                (String, String),
52                <Self::Acceptor as Acceptor>::Transmitter,
53            >,
54        >,
55    > + Send {
56        self.serve_static("wrpc:keyvalue/eventual@0.1.0", "exists")
57    }
58
59    #[instrument(level = "trace", skip_all)]
60    fn invoke_get(
61        &self,
62        bucket: &str,
63        key: &str,
64    ) -> impl Future<
65        Output = anyhow::Result<(
66            Result<Option<IncomingInputStream>, String>,
67            Self::Transmission,
68        )>,
69    > + Send {
70        self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "get", (bucket, key))
71    }
72
73    #[instrument(level = "trace", skip_all)]
74    fn serve_get(
75        &self,
76    ) -> impl Future<
77        Output = anyhow::Result<
78            Self::InvocationStream<
79                Self::Context,
80                (String, String),
81                <Self::Acceptor as Acceptor>::Transmitter,
82            >,
83        >,
84    > + Send {
85        self.serve_static("wrpc:keyvalue/eventual@0.1.0", "get")
86    }
87
88    #[instrument(level = "trace", skip_all)]
89    fn invoke_set(
90        &self,
91        bucket: &str,
92        key: &str,
93        value: impl Stream<Item = Bytes> + Send + 'static,
94    ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
95        self.invoke_static(
96            "wrpc:keyvalue/eventual@0.1.0",
97            "set",
98            (
99                bucket,
100                key,
101                Value::Stream(Box::pin(
102                    value.map(|buf| Ok(buf.into_iter().map(Value::U8).map(Some).collect())),
103                )),
104            ),
105        )
106    }
107
108    #[instrument(level = "trace", skip_all)]
109    fn serve_set(
110        &self,
111    ) -> impl Future<
112        Output = anyhow::Result<
113            Self::InvocationStream<
114                Self::Context,
115                (String, String, IncomingInputStream),
116                <Self::Acceptor as Acceptor>::Transmitter,
117            >,
118        >,
119    > + Send {
120        self.serve_static("wrpc:keyvalue/eventual@0.1.0", "set")
121    }
122}
123
124impl<T: wrpc_transport::Client> Eventual for T {}
125
126pub trait Atomic: wrpc_transport::Client {
127    #[instrument(level = "trace", skip_all)]
128    fn invoke_compare_and_swap(
129        &self,
130        bucket: &str,
131        key: &str,
132        old: u64,
133        new: u64,
134    ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
135    {
136        self.invoke_static(
137            "wrpc:keyvalue/atomic@0.1.0",
138            "compare-and-swap",
139            (bucket, key, old, new),
140        )
141    }
142
143    #[instrument(level = "trace", skip_all)]
144    fn serve_compare_and_swap(
145        &self,
146    ) -> impl Future<
147        Output = anyhow::Result<
148            Self::InvocationStream<
149                Self::Context,
150                (String, String, u64, u64),
151                <Self::Acceptor as Acceptor>::Transmitter,
152            >,
153        >,
154    > + Send {
155        self.serve_static("wrpc:keyvalue/atomic@0.1.0", "compare-and-swap")
156    }
157
158    #[instrument(level = "trace", skip_all)]
159    fn invoke_increment(
160        &self,
161        bucket: &str,
162        key: &str,
163        delta: u64,
164    ) -> impl Future<Output = anyhow::Result<(Result<u64, String>, Self::Transmission)>> + Send
165    {
166        self.invoke_static(
167            "wrpc:keyvalue/atomic@0.1.0",
168            "increment",
169            (bucket, key, delta),
170        )
171    }
172
173    #[instrument(level = "trace", skip_all)]
174    fn serve_increment(
175        &self,
176    ) -> impl Future<
177        Output = anyhow::Result<
178            Self::InvocationStream<
179                Self::Context,
180                (String, String, u64),
181                <Self::Acceptor as Acceptor>::Transmitter,
182            >,
183        >,
184    > + Send {
185        self.serve_static("wrpc:keyvalue/atomic@0.1.0", "increment")
186    }
187}
188
189impl<T: wrpc_transport::Client> Atomic for T {}
190
191/// `wrpc:keyvalue/atomic` invocation streams
192pub struct AtomicInvocations<T>
193where
194    T: wrpc_transport::Client,
195{
196    pub compare_and_swap: T::InvocationStream<
197        T::Context,
198        (String, String, u64, u64),
199        <T::Acceptor as Acceptor>::Transmitter,
200    >,
201    pub increment: T::InvocationStream<
202        T::Context,
203        (String, String, u64),
204        <T::Acceptor as Acceptor>::Transmitter,
205    >,
206}
207
208/// Serve `wrpc:keyvalue/atomic` invocations
209#[instrument(level = "trace", skip_all)]
210pub async fn serve_atomic<T>(client: &T) -> anyhow::Result<AtomicInvocations<T>>
211where
212    T: Atomic,
213{
214    let (compare_and_swap, increment) = try_join!(
215        async {
216            client
217                .serve_compare_and_swap()
218                .await
219                .context("failed to serve `wrpc:keyvalue/atomic.compare-and-swap`")
220        },
221        async {
222            client
223                .serve_increment()
224                .await
225                .context("failed to serve `wrpc:keyvalue/atomic.increment`")
226        },
227    )?;
228    Ok(AtomicInvocations {
229        compare_and_swap,
230        increment,
231    })
232}
233
234/// `wrpc:keyvalue/eventual` invocation streams
235pub struct EventualInvocations<T>
236where
237    T: wrpc_transport::Client,
238{
239    pub delete:
240        T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
241    pub exists:
242        T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
243    pub get:
244        T::InvocationStream<T::Context, (String, String), <T::Acceptor as Acceptor>::Transmitter>,
245    pub set: T::InvocationStream<
246        T::Context,
247        (String, String, IncomingInputStream),
248        <T::Acceptor as Acceptor>::Transmitter,
249    >,
250}
251
252/// Serve `wrpc:keyvalue/eventual` invocations
253#[instrument(level = "trace", skip_all)]
254pub async fn serve_eventual<T>(client: &T) -> anyhow::Result<EventualInvocations<T>>
255where
256    T: Eventual,
257{
258    let (delete, exists, get, set) = try_join!(
259        async {
260            client
261                .serve_delete()
262                .await
263                .context("failed to serve `wrpc:keyvalue/eventual.delete`")
264        },
265        async {
266            client
267                .serve_exists()
268                .await
269                .context("failed to serve `wrpc:keyvalue/eventual.exists`")
270        },
271        async {
272            client
273                .serve_get()
274                .await
275                .context("failed to serve `wrpc:keyvalue/eventual.get`")
276        },
277        async {
278            client
279                .serve_set()
280                .await
281                .context("failed to serve `wrpc:keyvalue/eventual.set`")
282        },
283    )?;
284    Ok(EventualInvocations {
285        delete,
286        exists,
287        get,
288        set,
289    })
290}