Skip to main content

orpc_client/
client.rs

1use futures_core::Stream;
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4
5use crate::error::ClientError;
6use crate::link::Link;
7use crate::rpc_link::RpcLink;
8
9/// oRPC client for calling remote procedures.
10///
11/// Wraps a [`Link`] implementation and provides typed convenience methods
12/// for queries, mutations, and subscriptions.
13///
14/// # Example
15/// ```ignore
16/// use orpc_client::Client;
17///
18/// let client = Client::new("http://localhost:3000/rpc");
19///
20/// // Query / Mutation
21/// let planet: Planet = client.call("planet.find", &FindInput { name: "Earth".into() }).await?;
22///
23/// // Subscription (SSE stream)
24/// use futures_util::StreamExt;
25/// let mut stream = client.subscribe::<Planet>("planet.stream", &()).await?;
26/// while let Some(result) = stream.next().await {
27///     let planet = result?;
28///     println!("New planet: {planet:?}");
29/// }
30/// ```
31pub struct Client<L = RpcLink> {
32    link: L,
33}
34
35impl Client<RpcLink> {
36    /// Create a new client with a default [`RpcLink`] targeting the given base URL.
37    pub fn new(base_url: impl Into<String>) -> Self {
38        Client {
39            link: RpcLink::new(base_url),
40        }
41    }
42}
43
44impl<L: Link> Client<L> {
45    /// Create a client with a custom [`Link`] implementation.
46    pub fn with_link(link: L) -> Self {
47        Client { link }
48    }
49
50    /// Call a procedure and deserialize the response.
51    ///
52    /// Works for both queries (read) and mutations (write).
53    pub async fn call<I, O>(&self, path: &str, input: &I) -> Result<O, ClientError>
54    where
55        I: Serialize,
56        O: DeserializeOwned,
57    {
58        let input_value = serde_json::to_value(input).map_err(ClientError::Serialize)?;
59        let output_value = self.link.call(path, input_value).await?;
60        serde_json::from_value(output_value).map_err(ClientError::Deserialize)
61    }
62
63    /// Subscribe to a streaming procedure.
64    ///
65    /// Returns a stream of deserialized values. The stream ends when the
66    /// server sends a `done` event or an `error` event.
67    pub async fn subscribe<O>(
68        &self,
69        path: &str,
70        input: &impl Serialize,
71    ) -> Result<impl Stream<Item = Result<O, ClientError>>, ClientError>
72    where
73        O: DeserializeOwned + 'static,
74    {
75        self.subscribe_from(path, input, None).await
76    }
77
78    /// Subscribe with a specific `last_event_id` for SSE reconnection.
79    ///
80    /// The server will resume from events after `last_event_id`.
81    pub async fn subscribe_from<O>(
82        &self,
83        path: &str,
84        input: &impl Serialize,
85        last_event_id: Option<u64>,
86    ) -> Result<impl Stream<Item = Result<O, ClientError>>, ClientError>
87    where
88        O: DeserializeOwned + 'static,
89    {
90        let input_value = serde_json::to_value(input).map_err(ClientError::Serialize)?;
91        let value_stream = self
92            .link
93            .subscribe(path, input_value, last_event_id)
94            .await?;
95
96        Ok(DeserializeStream {
97            inner: value_stream,
98            _phantom: std::marker::PhantomData,
99        })
100    }
101}
102
103use std::pin::Pin;
104use std::task::{Context, Poll};
105
106pin_project_lite::pin_project! {
107    struct DeserializeStream<O> {
108        #[pin]
109        inner: Pin<Box<dyn Stream<Item = Result<serde_json::Value, ClientError>> + Send>>,
110        _phantom: std::marker::PhantomData<O>,
111    }
112}
113
114impl<O: DeserializeOwned> Stream for DeserializeStream<O> {
115    type Item = Result<O, ClientError>;
116
117    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118        let this = self.project();
119        match this.inner.poll_next(cx) {
120            Poll::Ready(Some(Ok(value))) => {
121                let result = serde_json::from_value(value).map_err(ClientError::Deserialize);
122                Poll::Ready(Some(result))
123            }
124            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
125            Poll::Ready(None) => Poll::Ready(None),
126            Poll::Pending => Poll::Pending,
127        }
128    }
129}