miden_client/note_transport/
grpc.rs1use alloc::boxed::Box;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use futures::Stream;
13use miden_protocol::note::{NoteHeader, NoteTag};
14use miden_protocol::utils::serde::{Deserializable, Serializable};
15use miden_tx::utils::sync::RwLock;
16use tonic::{Request, Streaming};
17use tonic_health::pb::HealthCheckRequest;
18use tonic_health::pb::health_client::HealthClient;
19#[cfg(not(target_arch = "wasm32"))]
20use {
21 std::time::Duration,
22 tonic::transport::{Channel, ClientTlsConfig},
23};
24
25use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
26use super::generated::miden_note_transport::{
27 FetchNotesRequest,
28 SendNoteRequest,
29 StreamNotesRequest,
30 StreamNotesUpdate,
31 TransportNote,
32};
33use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
34
35#[cfg(not(target_arch = "wasm32"))]
36type Service = Channel;
37#[cfg(target_arch = "wasm32")]
38type Service = tonic_web_wasm_client::Client;
39
40#[cfg(not(target_arch = "wasm32"))]
42async fn connect_channel(
43 endpoint: &str,
44 timeout_ms: u64,
45) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
46 let endpoint = tonic::transport::Endpoint::try_from(String::from(endpoint))
47 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
48 .timeout(Duration::from_millis(timeout_ms));
49 let tls = ClientTlsConfig::new().with_native_roots();
50 let channel = endpoint
51 .tls_config(tls)
52 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
53 .connect()
54 .await
55 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
56 Ok((MidenNoteTransportClient::new(channel.clone()), HealthClient::new(channel)))
57}
58
/// Builds both gRPC clients on top of a `tonic_web_wasm_client` client for `endpoint`.
///
/// The timeout argument is accepted but not used by this variant, and nothing in this function
/// can fail: it always returns `Ok`.
#[cfg(target_arch = "wasm32")]
// Kept `async` with the same signature as the non-wasm variant so call sites are identical on
// both targets, even though nothing is awaited here.
#[allow(clippy::unused_async)]
async fn connect_channel(
    endpoint: &str,
    _timeout_ms: u64,
) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
    let web_client = tonic_web_wasm_client::Client::new(endpoint.to_owned());

    let transport = MidenNoteTransportClient::new(web_client.clone());
    let health = HealthClient::new(web_client);
    Ok((transport, health))
}
76
77#[derive(Clone)]
79struct ConnectedClient {
80 client: MidenNoteTransportClient<Service>,
81 health_client: HealthClient<Service>,
82}
83
84pub struct GrpcNoteTransportClient {
88 inner: RwLock<Option<ConnectedClient>>,
89 endpoint: String,
90 timeout_ms: u64,
91}
92
93impl GrpcNoteTransportClient {
94 pub fn new(endpoint: String, timeout_ms: u64) -> Self {
97 Self {
98 inner: RwLock::new(None),
99 endpoint,
100 timeout_ms,
101 }
102 }
103
104 async fn ensure_connected(&self) -> Result<ConnectedClient, NoteTransportError> {
106 if let Some(connected) = self.inner.read().as_ref() {
107 return Ok(connected.clone());
108 }
109
110 let (client, health_client) = connect_channel(&self.endpoint, self.timeout_ms).await?;
111 let connected = ConnectedClient { client, health_client };
112 *self.inner.write() = Some(connected.clone());
113 Ok(connected)
114 }
115
116 async fn api(&self) -> Result<MidenNoteTransportClient<Service>, NoteTransportError> {
118 Ok(self.ensure_connected().await?.client)
119 }
120
121 async fn health_api(&self) -> Result<HealthClient<Service>, NoteTransportError> {
123 Ok(self.ensure_connected().await?.health_client)
124 }
125
126 pub async fn send_note(
130 &self,
131 header: NoteHeader,
132 details: Vec<u8>,
133 ) -> Result<(), NoteTransportError> {
134 let request = SendNoteRequest {
135 note: Some(TransportNote { header: header.to_bytes(), details }),
136 };
137
138 self.api()
139 .await?
140 .send_note(Request::new(request))
141 .await
142 .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
143
144 Ok(())
145 }
146
147 pub async fn fetch_notes(
151 &self,
152 tags: &[NoteTag],
153 cursor: NoteTransportCursor,
154 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
155 let tags_int = tags.iter().map(NoteTag::as_u32).collect();
156 let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
157
158 let response = self
159 .api()
160 .await?
161 .fetch_notes(Request::new(request))
162 .await
163 .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
164
165 let response = response.into_inner();
166
167 let mut notes = Vec::new();
169
170 for pnote in response.notes {
171 let header = NoteHeader::read_from_bytes(&pnote.header)?;
172
173 notes.push(NoteInfo { header, details_bytes: pnote.details });
174 }
175
176 Ok((notes, response.cursor.into()))
177 }
178
179 pub async fn stream_notes(
184 &self,
185 tag: NoteTag,
186 cursor: NoteTransportCursor,
187 ) -> Result<NoteStreamAdapter, NoteTransportError> {
188 let request = StreamNotesRequest {
189 tag: tag.as_u32(),
190 cursor: cursor.value(),
191 };
192
193 let response = self
194 .api()
195 .await?
196 .stream_notes(request)
197 .await
198 .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
199 Ok(NoteStreamAdapter::new(response.into_inner()))
200 }
201
202 pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
207 let request = tonic::Request::new(HealthCheckRequest {
208 service: String::new(), });
210
211 let response = self
212 .health_api()
213 .await?
214 .check(request)
215 .await
216 .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
217 .into_inner();
218
219 let serving = matches!(
220 response.status(),
221 tonic_health::pb::health_check_response::ServingStatus::Serving
222 );
223
224 serving
225 .then_some(())
226 .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
227 }
228}
229
230#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
231#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
232impl super::NoteTransportClient for GrpcNoteTransportClient {
233 async fn send_note(
234 &self,
235 header: NoteHeader,
236 details: Vec<u8>,
237 ) -> Result<(), NoteTransportError> {
238 self.send_note(header, details).await
239 }
240
241 async fn fetch_notes(
242 &self,
243 tags: &[NoteTag],
244 cursor: NoteTransportCursor,
245 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
246 self.fetch_notes(tags, cursor).await
247 }
248
249 async fn stream_notes(
250 &self,
251 tag: NoteTag,
252 cursor: NoteTransportCursor,
253 ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
254 let stream = self.stream_notes(tag, cursor).await?;
255 Ok(Box::new(stream))
256 }
257}
258
259pub struct NoteStreamAdapter {
261 inner: Streaming<StreamNotesUpdate>,
262}
263
264impl NoteStreamAdapter {
265 pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
267 Self { inner: stream }
268 }
269}
270
271impl Stream for NoteStreamAdapter {
272 type Item = Result<Vec<NoteInfo>, NoteTransportError>;
273
274 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
275 match Pin::new(&mut self.inner).poll_next(cx) {
276 Poll::Ready(Some(Ok(update))) => {
277 let mut notes = Vec::new();
279 for pnote in update.notes {
280 let header = NoteHeader::read_from_bytes(&pnote.header)?;
281
282 notes.push(NoteInfo { header, details_bytes: pnote.details });
283 }
284 Poll::Ready(Some(Ok(notes)))
285 },
286 Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
287 format!("tonic status: {status}"),
288 )))),
289 Poll::Ready(None) => Poll::Ready(None),
290 Poll::Pending => Poll::Pending,
291 }
292 }
293}
294
295impl NoteStream for NoteStreamAdapter {}