use core::future::Future;
use bytes::Bytes;
use futures::{Stream, StreamExt as _};
use tracing::instrument;
use wrpc_transport::{AcceptedInvocation, Acceptor, IncomingInputStream, Value};
pub trait Eventual: wrpc_transport::Client {
#[instrument(level = "trace", skip_all)]
fn invoke_delete(
&self,
bucket: String,
key: String,
) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "delete", (bucket, key))
}
#[instrument(level = "trace", skip_all)]
fn serve_delete(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
<Self as wrpc_transport::Client>::Context,
(String, String),
<Self as wrpc_transport::Client>::Subject,
<<Self as wrpc_transport::Client>::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/eventual@0.1.0", "delete")
}
#[instrument(level = "trace", skip_all)]
fn invoke_exists(
&self,
bucket: String,
key: String,
) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
{
self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "exists", (bucket, key))
}
#[instrument(level = "trace", skip_all)]
fn serve_exists(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
<Self as wrpc_transport::Client>::Context,
(String, String),
<Self as wrpc_transport::Client>::Subject,
<<Self as wrpc_transport::Client>::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/eventual@0.1.0", "exists")
}
#[instrument(level = "trace", skip_all)]
fn invoke_get(
&self,
bucket: String,
key: String,
) -> impl Future<
Output = anyhow::Result<(
Result<Option<IncomingInputStream>, String>,
Self::Transmission,
)>,
> + Send {
self.invoke_static("wrpc:keyvalue/eventual@0.1.0", "get", (bucket, key))
}
#[instrument(level = "trace", skip_all)]
fn serve_get(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
<Self as wrpc_transport::Client>::Context,
(String, String),
<Self as wrpc_transport::Client>::Subject,
<<Self as wrpc_transport::Client>::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/eventual@0.1.0", "get")
}
#[instrument(level = "trace", skip_all)]
fn invoke_set(
&self,
bucket: String,
key: String,
value: impl Stream<Item = Bytes> + Send + 'static,
) -> impl Future<Output = anyhow::Result<(Result<(), String>, Self::Transmission)>> + Send {
self.invoke_static(
"wrpc:keyvalue/eventual@0.1.0",
"set",
(
bucket,
key,
Value::Stream(Box::pin(
value.map(|buf| Ok(buf.into_iter().map(Value::U8).map(Some).collect())),
)),
),
)
}
#[instrument(level = "trace", skip_all)]
fn serve_set(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
<Self as wrpc_transport::Client>::Context,
(String, String, IncomingInputStream),
<Self as wrpc_transport::Client>::Subject,
<<Self as wrpc_transport::Client>::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/eventual@0.1.0", "set")
}
}
impl<T: wrpc_transport::Client> Eventual for T {}
pub trait Atomic: wrpc_transport::Client {
#[instrument(level = "trace", skip_all)]
fn invoke_compare_and_swap(
&self,
bucket: String,
key: String,
old: u64,
new: u64,
) -> impl Future<Output = anyhow::Result<(Result<bool, String>, Self::Transmission)>> + Send
{
self.invoke_static(
"wrpc:keyvalue/atomic@0.1.0",
"compare-and-swap",
(bucket, key, old, new),
)
}
#[instrument(level = "trace", skip_all)]
fn serve_compare_and_swap(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
Self::Context,
(String, String, u64, u64),
Self::Subject,
<Self::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/atomic@0.1.0", "compare-and-swap")
}
#[instrument(level = "trace", skip_all)]
fn invoke_increment(
&self,
bucket: String,
key: String,
delta: u64,
) -> impl Future<Output = anyhow::Result<(Result<u64, String>, Self::Transmission)>> + Send
{
self.invoke_static(
"wrpc:keyvalue/atomic@0.1.0",
"increment",
(bucket, key, delta),
)
}
#[instrument(level = "trace", skip_all)]
fn serve_increment(
&self,
) -> impl Future<
Output = anyhow::Result<
impl Stream<
Item = anyhow::Result<
AcceptedInvocation<
Self::Context,
(String, String, u64),
Self::Subject,
<Self::Acceptor as Acceptor>::Transmitter,
>,
>,
> + Send,
>,
> + Send {
self.serve_static("wrpc:keyvalue/atomic@0.1.0", "increment")
}
}
impl<T: wrpc_transport::Client> Atomic for T {}