use std::collections::HashMap;
use async_trait::async_trait;
use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use prost_types::Any;
use tonic::{transport::Channel as TonicChannel, Request};
use crate::dapr::*;
use crate::error::Error;
/// Thin wrapper around a transport value `T`.
///
/// When `T` implements [`DaprInterface`], the methods on `Client` build the
/// request messages from plain arguments and forward them to the wrapped value.
pub struct Client<T>(T);
impl<T: DaprInterface> Client<T> {
pub async fn connect(addr: String) -> Result<Self, Error> {
Ok(Client(T::connect(addr).await?))
}
pub async fn invoke_service<I, M>(
&mut self,
app_id: I,
method_name: M,
data: Option<Any>,
) -> Result<InvokeServiceResponse, Error>
where
I: Into<String>,
M: Into<String>,
{
self.0
.invoke_service(InvokeServiceRequest {
id: app_id.into(),
message: common_v1::InvokeRequest {
method: method_name.into(),
data,
..Default::default()
}
.into(),
})
.await
}
pub async fn invoke_binding<S>(
&mut self,
name: S,
data: Vec<u8>,
) -> Result<InvokeBindingResponse, Error>
where
S: Into<String>,
{
self.0
.invoke_binding(InvokeBindingRequest {
name: name.into(),
data,
..Default::default()
})
.await
}
pub async fn publish_event<S>(
&mut self,
pubsub_name: S,
topic: S,
data_content_type: S,
data: Vec<u8>,
metadata: Option<HashMap<String, String>>,
) -> Result<(), Error>
where
S: Into<String>,
{
let mut mdata = HashMap::<String, String>::new();
if let Some(m) = metadata {
mdata = m;
}
self.0
.publish_event(PublishEventRequest {
pubsub_name: pubsub_name.into(),
topic: topic.into(),
data_content_type: data_content_type.into(),
data,
metadata: mdata,
})
.await
}
pub async fn get_secret<S>(&mut self, store_name: S, key: S) -> Result<GetSecretResponse, Error>
where
S: Into<String>,
{
self.0
.get_secret(GetSecretRequest {
store_name: store_name.into(),
key: key.into(),
..Default::default()
})
.await
}
pub async fn get_state<S>(
&mut self,
store_name: S,
key: S,
metadata: Option<HashMap<String, String>>,
) -> Result<GetStateResponse, Error>
where
S: Into<String>,
{
let mut mdata = HashMap::<String, String>::new();
if let Some(m) = metadata {
mdata = m;
}
self.0
.get_state(GetStateRequest {
store_name: store_name.into(),
key: key.into(),
metadata: mdata,
..Default::default()
})
.await
}
pub async fn save_state<I, K>(&mut self, store_name: K, states: I) -> Result<(), Error>
where
I: IntoIterator<Item = (K, Vec<u8>)>,
K: Into<String>,
{
self.0
.save_state(SaveStateRequest {
store_name: store_name.into(),
states: states.into_iter().map(|pair| pair.into()).collect(),
})
.await
}
pub async fn delete_bulk_state<I, K>(&mut self, store_name: K, states: I) -> Result<(), Error>
where
I: IntoIterator<Item = (K, Vec<u8>)>,
K: Into<String>,
{
self.0
.delete_bulk_state(DeleteBulkStateRequest {
store_name: store_name.into(),
states: states.into_iter().map(|pair| pair.into()).collect(),
})
.await
}
pub async fn delete_state<S>(
&mut self,
store_name: S,
key: S,
metadata: Option<HashMap<String, String>>,
) -> Result<(), Error>
where
S: Into<String>,
{
let mut mdata = HashMap::<String, String>::new();
if let Some(m) = metadata {
mdata = m;
}
self.0
.delete_state(DeleteStateRequest {
store_name: store_name.into(),
key: key.into(),
metadata: mdata,
..Default::default()
})
.await
}
pub async fn set_metadata<S>(&mut self, key: S, value: S) -> Result<(), Error>
where
S: Into<String>,
{
self.0
.set_metadata(SetMetadataRequest {
key: key.into(),
value: value.into(),
..Default::default()
})
.await
}
pub async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
self.0.get_metadata().await
}
}
/// Abstraction over the transport that `Client<T>` forwards its requests to.
///
/// Each method takes a fully built request message (or nothing, for
/// `get_metadata`) and resolves to the response payload or an [`Error`].
/// The trait requires `Sized` because `connect` returns `Self` by value.
#[async_trait]
pub trait DaprInterface: Sized {
/// Creates a connected instance for the endpoint `addr`.
async fn connect(addr: String) -> Result<Self, Error>;
/// Issues a publish-event call with `request`; no payload is returned on success.
async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error>;
/// Issues a service-invocation call with `request` and yields its response.
async fn invoke_service(
&mut self,
request: InvokeServiceRequest,
) -> Result<InvokeServiceResponse, Error>;
/// Issues a binding-invocation call with `request` and yields its response.
async fn invoke_binding(
&mut self,
request: InvokeBindingRequest,
) -> Result<InvokeBindingResponse, Error>;
/// Issues a get-secret call with `request` and yields its response.
async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error>;
/// Issues a get-state call with `request` and yields its response.
async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error>;
/// Issues a save-state call with `request`; no payload is returned on success.
async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error>;
/// Issues a delete-state call with `request`; no payload is returned on success.
async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error>;
/// Issues a bulk delete-state call with `request`; no payload is returned on success.
async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
/// Issues a set-metadata call with `request`; no payload is returned on success.
async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
/// Issues a get-metadata call (no request body) and yields its response.
async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
}
/// [`DaprInterface`] backed by the `DaprClient` type over a tonic channel.
///
/// Every method wraps its request in a tonic [`Request`], awaits the call on
/// the client, and unwraps the response with `into_inner`.
///
/// NOTE(review): the `self.<method>(Request::new(..))` calls below are expected
/// to resolve to the client's own inherent methods (inherent methods take
/// precedence over trait methods) rather than recursing into this impl;
/// confirm against the `DaprClient` definition.
#[async_trait]
impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
    /// Connects a `DaprClient` to `addr`, converting a failure into [`Error`].
    async fn connect(addr: String) -> Result<Self, Error> {
        let client = dapr_v1::dapr_client::DaprClient::connect(addr).await?;
        Ok(client)
    }

    /// Sends `request` through the client's `invoke_service` call.
    async fn invoke_service(
        &mut self,
        request: InvokeServiceRequest,
    ) -> Result<InvokeServiceResponse, Error> {
        let reply = self.invoke_service(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `invoke_binding` call.
    async fn invoke_binding(
        &mut self,
        request: InvokeBindingRequest,
    ) -> Result<InvokeBindingResponse, Error> {
        let reply = self.invoke_binding(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `publish_event` call.
    async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
        let reply = self.publish_event(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `get_secret` call.
    async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error> {
        let reply = self.get_secret(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `get_state` call.
    async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error> {
        let reply = self.get_state(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `save_state` call.
    async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> {
        let reply = self.save_state(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `delete_state` call.
    async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error> {
        let reply = self.delete_state(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `delete_bulk_state` call.
    async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error> {
        let reply = self.delete_bulk_state(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Sends `request` through the client's `set_metadata` call.
    async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error> {
        let reply = self.set_metadata(Request::new(request)).await?;
        Ok(reply.into_inner())
    }

    /// Calls the client's `get_metadata` with an empty (`()`) request body.
    async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
        let reply = self.get_metadata(Request::new(())).await?;
        Ok(reply.into_inner())
    }
}
/// Alias for `dapr_v1::InvokeServiceRequest`; the request taken by [`DaprInterface::invoke_service`].
pub type InvokeServiceRequest = dapr_v1::InvokeServiceRequest;
/// Alias for `common_v1::InvokeResponse`; the response of [`DaprInterface::invoke_service`].
pub type InvokeServiceResponse = common_v1::InvokeResponse;
/// Alias for `dapr_v1::InvokeBindingRequest`; the request taken by [`DaprInterface::invoke_binding`].
pub type InvokeBindingRequest = dapr_v1::InvokeBindingRequest;
/// Alias for `dapr_v1::InvokeBindingResponse`; the response of [`DaprInterface::invoke_binding`].
pub type InvokeBindingResponse = dapr_v1::InvokeBindingResponse;
/// Alias for `dapr_v1::PublishEventRequest`; the request taken by [`DaprInterface::publish_event`].
pub type PublishEventRequest = dapr_v1::PublishEventRequest;
/// Alias for `dapr_v1::GetStateRequest`; the request taken by [`DaprInterface::get_state`].
pub type GetStateRequest = dapr_v1::GetStateRequest;
/// Alias for `dapr_v1::GetStateResponse`; the response of [`DaprInterface::get_state`].
pub type GetStateResponse = dapr_v1::GetStateResponse;
/// Alias for `dapr_v1::SaveStateRequest`; the request taken by [`DaprInterface::save_state`].
pub type SaveStateRequest = dapr_v1::SaveStateRequest;
/// Alias for `dapr_v1::DeleteStateRequest`; the request taken by [`DaprInterface::delete_state`].
pub type DeleteStateRequest = dapr_v1::DeleteStateRequest;
/// Alias for `dapr_v1::DeleteBulkStateRequest`; the request taken by [`DaprInterface::delete_bulk_state`].
pub type DeleteBulkStateRequest = dapr_v1::DeleteBulkStateRequest;
/// Alias for `dapr_v1::GetSecretRequest`; the request taken by [`DaprInterface::get_secret`].
pub type GetSecretRequest = dapr_v1::GetSecretRequest;
/// Alias for `dapr_v1::GetSecretResponse`; the response of [`DaprInterface::get_secret`].
pub type GetSecretResponse = dapr_v1::GetSecretResponse;
/// Alias for `dapr_v1::GetMetadataResponse`; the response of [`DaprInterface::get_metadata`].
pub type GetMetadataResponse = dapr_v1::GetMetadataResponse;
/// Alias for `dapr_v1::SetMetadataRequest`; the request taken by [`DaprInterface::set_metadata`].
pub type SetMetadataRequest = dapr_v1::SetMetadataRequest;
/// Alias for `DaprClient` over a tonic channel, the type that implements [`DaprInterface`] in this file.
pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where
K: Into<String>,
{
fn from((key, value): (K, Vec<u8>)) -> Self {
common_v1::StateItem {
key: key.into(),
value,
..Default::default()
}
}
}