miden_client/note_transport/
grpc.rs1use alloc::boxed::Box;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use futures::Stream;
13use miden_protocol::block::BlockNumber;
14use miden_protocol::note::{NoteHeader, NoteTag};
15use miden_protocol::utils::serde::{Deserializable, Serializable};
16use miden_tx::utils::sync::RwLock;
17use tonic::{Request, Streaming};
18use tonic_health::pb::HealthCheckRequest;
19use tonic_health::pb::health_client::HealthClient;
20#[cfg(not(target_arch = "wasm32"))]
21use {
22 std::time::Duration,
23 tonic::transport::{Channel, ClientTlsConfig},
24};
25
26use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
27use super::generated::miden_note_transport::{
28 FetchNotesRequest,
29 SendNoteRequest,
30 StreamNotesRequest,
31 StreamNotesUpdate,
32 TransportNote,
33};
34use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
35
36#[cfg(not(target_arch = "wasm32"))]
37type Service = Channel;
38#[cfg(target_arch = "wasm32")]
39type Service = tonic_web_wasm_client::Client;
40
41#[cfg(not(target_arch = "wasm32"))]
43async fn connect_channel(
44 endpoint: &str,
45 timeout_ms: u64,
46) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
47 let endpoint = tonic::transport::Endpoint::try_from(String::from(endpoint))
48 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
49 .timeout(Duration::from_millis(timeout_ms));
50 let tls = ClientTlsConfig::new().with_native_roots();
51 let channel = endpoint
52 .tls_config(tls)
53 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
54 .connect()
55 .await
56 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
57 Ok((MidenNoteTransportClient::new(channel.clone()), HealthClient::new(channel)))
58}
59
60#[cfg(target_arch = "wasm32")]
66#[allow(clippy::unused_async)]
67async fn connect_channel(
68 endpoint: &str,
69 _timeout_ms: u64,
70) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
71 let wasm_client = tonic_web_wasm_client::Client::new(String::from(endpoint));
72 Ok((
73 MidenNoteTransportClient::new(wasm_client.clone()),
74 HealthClient::new(wasm_client),
75 ))
76}
77
78#[derive(Clone)]
80struct ConnectedClient {
81 client: MidenNoteTransportClient<Service>,
82 health_client: HealthClient<Service>,
83}
84
85pub struct GrpcNoteTransportClient {
89 inner: RwLock<Option<ConnectedClient>>,
90 endpoint: String,
91 timeout_ms: u64,
92}
93
94impl GrpcNoteTransportClient {
95 pub fn new(endpoint: String, timeout_ms: u64) -> Self {
98 Self {
99 inner: RwLock::new(None),
100 endpoint,
101 timeout_ms,
102 }
103 }
104
105 async fn ensure_connected(&self) -> Result<ConnectedClient, NoteTransportError> {
107 if let Some(connected) = self.inner.read().as_ref() {
108 return Ok(connected.clone());
109 }
110
111 let (client, health_client) = connect_channel(&self.endpoint, self.timeout_ms).await?;
112 let connected = ConnectedClient { client, health_client };
113 *self.inner.write() = Some(connected.clone());
114 Ok(connected)
115 }
116
117 async fn api(&self) -> Result<MidenNoteTransportClient<Service>, NoteTransportError> {
119 Ok(self.ensure_connected().await?.client)
120 }
121
122 async fn health_api(&self) -> Result<HealthClient<Service>, NoteTransportError> {
124 Ok(self.ensure_connected().await?.health_client)
125 }
126
127 pub async fn send_note(
131 &self,
132 header: NoteHeader,
133 details: Vec<u8>,
134 ) -> Result<(), NoteTransportError> {
135 self.send_note_inner(header, details, None).await
136 }
137
138 pub async fn send_note_with_block_hint(
143 &self,
144 header: NoteHeader,
145 details: Vec<u8>,
146 block_hint: BlockNumber,
147 ) -> Result<(), NoteTransportError> {
148 self.send_note_inner(header, details, Some(block_hint.as_u32())).await
149 }
150
151 async fn send_note_inner(
153 &self,
154 header: NoteHeader,
155 details: Vec<u8>,
156 after_block_num: Option<u32>,
157 ) -> Result<(), NoteTransportError> {
158 let request = SendNoteRequest {
159 note: Some(TransportNote {
160 header: header.to_bytes(),
161 details,
162 after_block_num,
163 }),
164 };
165
166 self.api()
167 .await?
168 .send_note(Request::new(request))
169 .await
170 .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
171
172 Ok(())
173 }
174
175 pub async fn fetch_notes(
179 &self,
180 tags: &[NoteTag],
181 cursor: NoteTransportCursor,
182 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
183 let tags_int = tags.iter().map(NoteTag::as_u32).collect();
184 let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
185
186 let response = self
187 .api()
188 .await?
189 .fetch_notes(Request::new(request))
190 .await
191 .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
192
193 let response = response.into_inner();
194
195 let mut notes = Vec::new();
197
198 for pnote in response.notes {
199 let header = NoteHeader::read_from_bytes(&pnote.header)?;
200
201 notes.push(NoteInfo {
202 header,
203 details_bytes: pnote.details,
204 block_hint: pnote.after_block_num.map(BlockNumber::from),
205 });
206 }
207
208 Ok((notes, response.cursor.into()))
209 }
210
211 pub async fn stream_notes(
216 &self,
217 tag: NoteTag,
218 cursor: NoteTransportCursor,
219 ) -> Result<NoteStreamAdapter, NoteTransportError> {
220 let request = StreamNotesRequest {
221 tag: tag.as_u32(),
222 cursor: cursor.value(),
223 };
224
225 let response = self
226 .api()
227 .await?
228 .stream_notes(request)
229 .await
230 .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
231 Ok(NoteStreamAdapter::new(response.into_inner()))
232 }
233
234 pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
239 let request = tonic::Request::new(HealthCheckRequest {
240 service: String::new(), });
242
243 let response = self
244 .health_api()
245 .await?
246 .check(request)
247 .await
248 .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
249 .into_inner();
250
251 let serving = matches!(
252 response.status(),
253 tonic_health::pb::health_check_response::ServingStatus::Serving
254 );
255
256 serving
257 .then_some(())
258 .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
259 }
260}
261
262#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
263#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
264impl super::NoteTransportClient for GrpcNoteTransportClient {
265 async fn send_note(
266 &self,
267 header: NoteHeader,
268 details: Vec<u8>,
269 ) -> Result<(), NoteTransportError> {
270 self.send_note(header, details).await
271 }
272
273 async fn send_note_with_block_hint(
274 &self,
275 header: NoteHeader,
276 details: Vec<u8>,
277 block_hint: BlockNumber,
278 ) -> Result<(), NoteTransportError> {
279 self.send_note_with_block_hint(header, details, block_hint).await
280 }
281
282 async fn fetch_notes(
283 &self,
284 tags: &[NoteTag],
285 cursor: NoteTransportCursor,
286 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
287 self.fetch_notes(tags, cursor).await
288 }
289
290 async fn stream_notes(
291 &self,
292 tag: NoteTag,
293 cursor: NoteTransportCursor,
294 ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
295 let stream = self.stream_notes(tag, cursor).await?;
296 Ok(Box::new(stream))
297 }
298}
299
300pub struct NoteStreamAdapter {
302 inner: Streaming<StreamNotesUpdate>,
303}
304
305impl NoteStreamAdapter {
306 pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
308 Self { inner: stream }
309 }
310}
311
312impl Stream for NoteStreamAdapter {
313 type Item = Result<Vec<NoteInfo>, NoteTransportError>;
314
315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 match Pin::new(&mut self.inner).poll_next(cx) {
317 Poll::Ready(Some(Ok(update))) => {
318 let mut notes = Vec::new();
320 for pnote in update.notes {
321 let header = NoteHeader::read_from_bytes(&pnote.header)?;
322
323 notes.push(NoteInfo {
324 header,
325 details_bytes: pnote.details,
326 block_hint: pnote.after_block_num.map(BlockNumber::from),
327 });
328 }
329 Poll::Ready(Some(Ok(notes)))
330 },
331 Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
332 format!("tonic status: {status}"),
333 )))),
334 Poll::Ready(None) => Poll::Ready(None),
335 Poll::Pending => Poll::Pending,
336 }
337 }
338}
339
340impl NoteStream for NoteStreamAdapter {}