1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use core::future::Future;

use bytes::Bytes;
use futures::{Stream, StreamExt as _};
use tracing::instrument;
use wrpc_transport::{IncomingInputStream, Value};

/// Instance name shared by every function wrapped by [`Eventual`].
const EVENTUAL_INSTANCE: &str = "wrpc:keyvalue/eventual@0.1.0";

/// Typed convenience wrappers around the `wrpc:keyvalue/eventual@0.1.0`
/// functions (`delete`, `exists`, `get`, `set`).
///
/// Every `invoke_*` method forwards to `invoke_static` and every `serve_*`
/// method forwards to `serve_static` of the underlying
/// [`wrpc_transport::Client`]; the methods here only fix the instance name,
/// the function name and the parameter/result types.
pub trait Eventual: wrpc_transport::Client {
    /// Invokes `delete`, passing `(bucket, key)` as the parameters.
    ///
    /// The returned future yields the `Result<(), String>` produced by the
    /// call together with the `Self::Transmission` handed back by
    /// `invoke_static`.
    // NOTE(review): the `Err(String)` arm presumably carries an error reported
    // by the remote side — confirm against the interface definition.
    #[instrument(level = "trace", skip_all)]
    fn invoke_delete(
        &self,
        bucket: &str,
        key: &str,
    ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
        let params = (bucket, key);
        self.invoke_static(EVENTUAL_INSTANCE, "delete", params)
    }

    /// Starts serving `delete` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String)` parameters.
    // NOTE(review): presumably the tuple is `(bucket, key)`, mirroring
    // `invoke_delete` — confirm.
    #[instrument(level = "trace", skip_all)]
    fn serve_delete(
        &self,
    ) -> impl Future<Output = anyhow::Result<Self::InvocationStream<(String, String)>>> + Send {
        self.serve_static(EVENTUAL_INSTANCE, "delete")
    }

    /// Invokes `exists`, passing `(bucket, key)` as the parameters.
    ///
    /// The returned future yields the `Result<bool, String>` produced by the
    /// call together with the `Self::Transmission` handed back by
    /// `invoke_static`.
    #[instrument(level = "trace", skip_all)]
    fn invoke_exists(
        &self,
        bucket: &str,
        key: &str,
    ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
    {
        let params = (bucket, key);
        self.invoke_static(EVENTUAL_INSTANCE, "exists", params)
    }

    /// Starts serving `exists` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String)` parameters.
    #[instrument(level = "trace", skip_all)]
    fn serve_exists(
        &self,
    ) -> impl Future<Output = anyhow::Result<Self::InvocationStream<(String, String)>>> + Send {
        self.serve_static(EVENTUAL_INSTANCE, "exists")
    }

    /// Invokes `get`, passing `(bucket, key)` as the parameters.
    ///
    /// The returned future yields a
    /// `Result<Option<IncomingInputStream>, String>` together with the
    /// `Self::Transmission` handed back by `invoke_static`.
    // NOTE(review): `None` presumably means the key is absent — confirm
    // against the interface definition.
    #[instrument(level = "trace", skip_all)]
    fn invoke_get(
        &self,
        bucket: &str,
        key: &str,
    ) -> impl Future<
        Output = anyhow::Result<(
            Result<Option<IncomingInputStream>, String>,
            Self::Transmission,
        )>,
    > + Send {
        let params = (bucket, key);
        self.invoke_static(EVENTUAL_INSTANCE, "get", params)
    }

    /// Starts serving `get` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String)` parameters.
    #[instrument(level = "trace", skip_all)]
    fn serve_get(
        &self,
    ) -> impl Future<Output = anyhow::Result<Self::InvocationStream<(String, String)>>> + Send {
        self.serve_static(EVENTUAL_INSTANCE, "get")
    }

    /// Invokes `set`, passing `(bucket, key, value)` as the parameters.
    ///
    /// `value` is a stream of [`Bytes`] chunks. Each chunk is converted into a
    /// collection of `Some(Value::U8(byte))` elements wrapped in `Ok`, and the
    /// converted stream is pinned, boxed and sent as a [`Value::Stream`].
    ///
    /// The returned future yields the `Result<(), String>` produced by the
    /// call together with the `Self::Transmission` handed back by
    /// `invoke_static`.
    #[instrument(level = "trace", skip_all)]
    fn invoke_set(
        &self,
        bucket: &str,
        key: &str,
        value: impl Stream<Item = Bytes> + Send + 'static,
    ) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
        // Re-encode every chunk byte-by-byte; the conversion itself never fails,
        // so each item is unconditionally `Ok`.
        let payload = value.map(|chunk| {
            Ok(chunk
                .into_iter()
                .map(|byte| Some(Value::U8(byte)))
                .collect())
        });
        self.invoke_static(
            EVENTUAL_INSTANCE,
            "set",
            (bucket, key, Value::Stream(Box::pin(payload))),
        )
    }

    /// Starts serving `set` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String, IncomingInputStream)` parameters.
    #[instrument(level = "trace", skip_all)]
    fn serve_set(
        &self,
    ) -> impl Future<
        Output = anyhow::Result<Self::InvocationStream<(String, String, IncomingInputStream)>>,
    > + Send {
        self.serve_static(EVENTUAL_INSTANCE, "set")
    }
}

/// Blanket implementation: every [`wrpc_transport::Client`] gets the
/// [`Eventual`] helpers through the trait's default methods.
impl<T> Eventual for T where T: wrpc_transport::Client {}

/// Instance name shared by every function wrapped by [`Atomic`].
const ATOMIC_INSTANCE: &str = "wrpc:keyvalue/atomic@0.1.0";

/// Typed convenience wrappers around the `wrpc:keyvalue/atomic@0.1.0`
/// functions (`compare-and-swap`, `increment`).
///
/// Every `invoke_*` method forwards to `invoke_static` and every `serve_*`
/// method forwards to `serve_static` of the underlying
/// [`wrpc_transport::Client`]; the methods here only fix the instance name,
/// the function name and the parameter/result types.
pub trait Atomic: wrpc_transport::Client {
    /// Invokes `compare-and-swap`, passing `(bucket, key, old, new)` as the
    /// parameters.
    ///
    /// The returned future yields the `Result<bool, String>` produced by the
    /// call together with the `Self::Transmission` handed back by
    /// `invoke_static`.
    // NOTE(review): the `bool` presumably reports whether the swap took
    // place — confirm against the interface definition.
    #[instrument(level = "trace", skip_all)]
    fn invoke_compare_and_swap(
        &self,
        bucket: &str,
        key: &str,
        old: u64,
        new: u64,
    ) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
    {
        let params = (bucket, key, old, new);
        self.invoke_static(ATOMIC_INSTANCE, "compare-and-swap", params)
    }

    /// Starts serving `compare-and-swap` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String, u64, u64)` parameters.
    // NOTE(review): presumably the tuple is `(bucket, key, old, new)`,
    // mirroring `invoke_compare_and_swap` — confirm.
    #[instrument(level = "trace", skip_all)]
    fn serve_compare_and_swap(
        &self,
    ) -> impl Future<Output = anyhow::Result<Self::InvocationStream<(String, String, u64, u64)>>> + Send
    {
        self.serve_static(ATOMIC_INSTANCE, "compare-and-swap")
    }

    /// Invokes `increment`, passing `(bucket, key, delta)` as the parameters.
    ///
    /// The returned future yields the `Result<u64, String>` produced by the
    /// call together with the `Self::Transmission` handed back by
    /// `invoke_static`.
    // NOTE(review): the `u64` is presumably the value after incrementing —
    // confirm against the interface definition.
    #[instrument(level = "trace", skip_all)]
    fn invoke_increment(
        &self,
        bucket: &str,
        key: &str,
        delta: u64,
    ) -> impl Future<Output = anyhow::Result<(Result<u64, String>, Self::Transmission)>> + Send
    {
        let params = (bucket, key, delta);
        self.invoke_static(ATOMIC_INSTANCE, "increment", params)
    }

    /// Starts serving `increment` through `serve_static`.
    ///
    /// The returned future yields an invocation stream typed with
    /// `(String, String, u64)` parameters.
    #[instrument(level = "trace", skip_all)]
    fn serve_increment(
        &self,
    ) -> impl Future<Output = anyhow::Result<Self::InvocationStream<(String, String, u64)>>> + Send
    {
        self.serve_static(ATOMIC_INSTANCE, "increment")
    }
}

/// Blanket implementation: every [`wrpc_transport::Client`] gets the
/// [`Atomic`] helpers through the trait's default methods.
impl<T> Atomic for T where T: wrpc_transport::Client {}