1use futures_core::Stream;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::error::ClientError;
6use crate::link::Link;
7use crate::rpc_link::RpcLink;
8
9pub struct Client<L = RpcLink> {
32 link: L,
33}
34
35impl Client<RpcLink> {
36 pub fn new(base_url: impl Into<String>) -> Self {
38 Client {
39 link: RpcLink::new(base_url),
40 }
41 }
42}
43
44impl<L: Link> Client<L> {
45 pub fn with_link(link: L) -> Self {
47 Client { link }
48 }
49
50 pub async fn call<I, O>(&self, path: &str, input: &I) -> Result<O, ClientError>
54 where
55 I: Serialize,
56 O: DeserializeOwned,
57 {
58 let input_value = serde_json::to_value(input).map_err(ClientError::Serialize)?;
59 let output_value = self.link.call(path, input_value).await?;
60 serde_json::from_value(output_value).map_err(ClientError::Deserialize)
61 }
62
63 pub async fn subscribe<O>(
68 &self,
69 path: &str,
70 input: &impl Serialize,
71 ) -> Result<impl Stream<Item = Result<O, ClientError>>, ClientError>
72 where
73 O: DeserializeOwned + 'static,
74 {
75 self.subscribe_from(path, input, None).await
76 }
77
78 pub async fn subscribe_from<O>(
82 &self,
83 path: &str,
84 input: &impl Serialize,
85 last_event_id: Option<u64>,
86 ) -> Result<impl Stream<Item = Result<O, ClientError>>, ClientError>
87 where
88 O: DeserializeOwned + 'static,
89 {
90 let input_value = serde_json::to_value(input).map_err(ClientError::Serialize)?;
91 let value_stream = self
92 .link
93 .subscribe(path, input_value, last_event_id)
94 .await?;
95
96 Ok(DeserializeStream {
97 inner: value_stream,
98 _phantom: std::marker::PhantomData,
99 })
100 }
101}
102
103use std::pin::Pin;
104use std::task::{Context, Poll};
105
106pin_project_lite::pin_project! {
107 struct DeserializeStream<O> {
108 #[pin]
109 inner: Pin<Box<dyn Stream<Item = Result<serde_json::Value, ClientError>> + Send>>,
110 _phantom: std::marker::PhantomData<O>,
111 }
112}
113
114impl<O: DeserializeOwned> Stream for DeserializeStream<O> {
115 type Item = Result<O, ClientError>;
116
117 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118 let this = self.project();
119 match this.inner.poll_next(cx) {
120 Poll::Ready(Some(Ok(value))) => {
121 let result = serde_json::from_value(value).map_err(ClientError::Deserialize);
122 Poll::Ready(Some(result))
123 }
124 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
125 Poll::Ready(None) => Poll::Ready(None),
126 Poll::Pending => Poll::Pending,
127 }
128 }
129}