Skip to main content

miden_client/note_transport/
grpc.rs

1//! gRPC-based note transport client.
2//!
3//! On native targets, the connection is established lazily on the first request using a
4//! TLS-enabled `tonic` channel. On WASM, a `tonic_web_wasm_client` is created on demand.
5
6use alloc::boxed::Box;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use futures::Stream;
13use miden_protocol::block::BlockNumber;
14use miden_protocol::note::{NoteHeader, NoteTag};
15use miden_protocol::utils::serde::{Deserializable, Serializable};
16use miden_tx::utils::sync::RwLock;
17use tonic::{Request, Streaming};
18use tonic_health::pb::HealthCheckRequest;
19use tonic_health::pb::health_client::HealthClient;
20#[cfg(not(target_arch = "wasm32"))]
21use {
22    std::time::Duration,
23    tonic::transport::{Channel, ClientTlsConfig},
24};
25
26use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
27use super::generated::miden_note_transport::{
28    FetchNotesRequest,
29    SendNoteRequest,
30    StreamNotesRequest,
31    StreamNotesUpdate,
32    TransportNote,
33};
34use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
35
36#[cfg(not(target_arch = "wasm32"))]
37type Service = Channel;
38#[cfg(target_arch = "wasm32")]
39type Service = tonic_web_wasm_client::Client;
40
41/// Establishes a connection to the note transport service, returning the gRPC clients.
42#[cfg(not(target_arch = "wasm32"))]
43async fn connect_channel(
44    endpoint: &str,
45    timeout_ms: u64,
46) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
47    let endpoint = tonic::transport::Endpoint::try_from(String::from(endpoint))
48        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
49        .timeout(Duration::from_millis(timeout_ms));
50    let tls = ClientTlsConfig::new().with_native_roots();
51    let channel = endpoint
52        .tls_config(tls)
53        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
54        .connect()
55        .await
56        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
57    Ok((MidenNoteTransportClient::new(channel.clone()), HealthClient::new(channel)))
58}
59
60/// Establishes a connection to the note transport service, returning the gRPC clients.
61///
62/// Note: `timeout_ms` is currently ignored on WASM as `tonic_web_wasm_client::Client` does not
63/// support timeout configuration.
64// TODO: refactor `connect_channel` so that WASM doesn't accept a timeout parameter.
65#[cfg(target_arch = "wasm32")]
66#[allow(clippy::unused_async)]
67async fn connect_channel(
68    endpoint: &str,
69    _timeout_ms: u64,
70) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
71    let wasm_client = tonic_web_wasm_client::Client::new(String::from(endpoint));
72    Ok((
73        MidenNoteTransportClient::new(wasm_client.clone()),
74        HealthClient::new(wasm_client),
75    ))
76}
77
78/// Inner state holding the connected gRPC clients.
79#[derive(Clone)]
80struct ConnectedClient {
81    client: MidenNoteTransportClient<Service>,
82    health_client: HealthClient<Service>,
83}
84
85/// gRPC client for the note transport network.
86///
87/// The connection is established lazily on first use.
88pub struct GrpcNoteTransportClient {
89    inner: RwLock<Option<ConnectedClient>>,
90    endpoint: String,
91    timeout_ms: u64,
92}
93
94impl GrpcNoteTransportClient {
95    /// Creates a new [`GrpcNoteTransportClient`] without establishing a connection.
96    /// The connection will be established lazily on the first request.
97    pub fn new(endpoint: String, timeout_ms: u64) -> Self {
98        Self {
99            inner: RwLock::new(None),
100            endpoint,
101            timeout_ms,
102        }
103    }
104
105    /// Ensures the client is connected and returns the connected state.
106    async fn ensure_connected(&self) -> Result<ConnectedClient, NoteTransportError> {
107        if let Some(connected) = self.inner.read().as_ref() {
108            return Ok(connected.clone());
109        }
110
111        let (client, health_client) = connect_channel(&self.endpoint, self.timeout_ms).await?;
112        let connected = ConnectedClient { client, health_client };
113        *self.inner.write() = Some(connected.clone());
114        Ok(connected)
115    }
116
117    /// Get a clone of the main client, connecting if needed.
118    async fn api(&self) -> Result<MidenNoteTransportClient<Service>, NoteTransportError> {
119        Ok(self.ensure_connected().await?.client)
120    }
121
122    /// Get a clone of the health client, connecting if needed.
123    async fn health_api(&self) -> Result<HealthClient<Service>, NoteTransportError> {
124        Ok(self.ensure_connected().await?.health_client)
125    }
126
127    /// Pushes a note to the note transport network.
128    ///
129    /// While the note header goes in plaintext, the provided note details can be encrypted.
130    pub async fn send_note(
131        &self,
132        header: NoteHeader,
133        details: Vec<u8>,
134    ) -> Result<(), NoteTransportError> {
135        self.send_note_inner(header, details, None).await
136    }
137
138    /// Pushes a note to the note transport network, relaying a block hint for the recipient.
139    ///
140    /// `block_hint` is forwarded to the server (as the `TransportNote`'s `after_block_num`) as the
141    /// block from which the recipient should start scanning for the note's commitment.
142    pub async fn send_note_with_block_hint(
143        &self,
144        header: NoteHeader,
145        details: Vec<u8>,
146        block_hint: BlockNumber,
147    ) -> Result<(), NoteTransportError> {
148        self.send_note_inner(header, details, Some(block_hint.as_u32())).await
149    }
150
151    /// Sends a note, passing the optional block hint straight through to the wire `TransportNote`.
152    async fn send_note_inner(
153        &self,
154        header: NoteHeader,
155        details: Vec<u8>,
156        after_block_num: Option<u32>,
157    ) -> Result<(), NoteTransportError> {
158        let request = SendNoteRequest {
159            note: Some(TransportNote {
160                header: header.to_bytes(),
161                details,
162                after_block_num,
163            }),
164        };
165
166        self.api()
167            .await?
168            .send_note(Request::new(request))
169            .await
170            .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
171
172        Ok(())
173    }
174
175    /// Downloads notes for given tags from the note transport network.
176    ///
177    /// Returns notes labeled after the provided cursor (pagination), and an updated cursor.
178    pub async fn fetch_notes(
179        &self,
180        tags: &[NoteTag],
181        cursor: NoteTransportCursor,
182    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
183        let tags_int = tags.iter().map(NoteTag::as_u32).collect();
184        let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
185
186        let response = self
187            .api()
188            .await?
189            .fetch_notes(Request::new(request))
190            .await
191            .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
192
193        let response = response.into_inner();
194
195        // Convert protobuf notes to internal format and track the most recent received timestamp
196        let mut notes = Vec::new();
197
198        for pnote in response.notes {
199            let header = NoteHeader::read_from_bytes(&pnote.header)?;
200
201            notes.push(NoteInfo {
202                header,
203                details_bytes: pnote.details,
204                block_hint: pnote.after_block_num.map(BlockNumber::from),
205            });
206        }
207
208        Ok((notes, response.cursor.into()))
209    }
210
211    /// Stream notes from the note transport network.
212    ///
213    /// Subscribes to a given tag.
214    /// New notes are received periodically.
215    pub async fn stream_notes(
216        &self,
217        tag: NoteTag,
218        cursor: NoteTransportCursor,
219    ) -> Result<NoteStreamAdapter, NoteTransportError> {
220        let request = StreamNotesRequest {
221            tag: tag.as_u32(),
222            cursor: cursor.value(),
223        };
224
225        let response = self
226            .api()
227            .await?
228            .stream_notes(request)
229            .await
230            .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
231        Ok(NoteStreamAdapter::new(response.into_inner()))
232    }
233
234    /// gRPC-standardized server health-check.
235    ///
236    /// Checks if the note transport node and respective gRPC services are serving requests.
237    /// Currently the grPC server operates only one service labelled `MidenNoteTransport`.
238    pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
239        let request = tonic::Request::new(HealthCheckRequest {
240            service: String::new(), // empty string -> whole server
241        });
242
243        let response = self
244            .health_api()
245            .await?
246            .check(request)
247            .await
248            .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
249            .into_inner();
250
251        let serving = matches!(
252            response.status(),
253            tonic_health::pb::health_check_response::ServingStatus::Serving
254        );
255
256        serving
257            .then_some(())
258            .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
259    }
260}
261
262#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
263#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
264impl super::NoteTransportClient for GrpcNoteTransportClient {
265    async fn send_note(
266        &self,
267        header: NoteHeader,
268        details: Vec<u8>,
269    ) -> Result<(), NoteTransportError> {
270        self.send_note(header, details).await
271    }
272
273    async fn send_note_with_block_hint(
274        &self,
275        header: NoteHeader,
276        details: Vec<u8>,
277        block_hint: BlockNumber,
278    ) -> Result<(), NoteTransportError> {
279        self.send_note_with_block_hint(header, details, block_hint).await
280    }
281
282    async fn fetch_notes(
283        &self,
284        tags: &[NoteTag],
285        cursor: NoteTransportCursor,
286    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
287        self.fetch_notes(tags, cursor).await
288    }
289
290    async fn stream_notes(
291        &self,
292        tag: NoteTag,
293        cursor: NoteTransportCursor,
294    ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
295        let stream = self.stream_notes(tag, cursor).await?;
296        Ok(Box::new(stream))
297    }
298}
299
300/// Convert from `tonic::Streaming<StreamNotesUpdate>` to [`NoteStream`]
301pub struct NoteStreamAdapter {
302    inner: Streaming<StreamNotesUpdate>,
303}
304
305impl NoteStreamAdapter {
306    /// Create a new [`NoteStreamAdapter`]
307    pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
308        Self { inner: stream }
309    }
310}
311
312impl Stream for NoteStreamAdapter {
313    type Item = Result<Vec<NoteInfo>, NoteTransportError>;
314
315    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316        match Pin::new(&mut self.inner).poll_next(cx) {
317            Poll::Ready(Some(Ok(update))) => {
318                // Convert StreamNotesUpdate to Vec<NoteInfo>
319                let mut notes = Vec::new();
320                for pnote in update.notes {
321                    let header = NoteHeader::read_from_bytes(&pnote.header)?;
322
323                    notes.push(NoteInfo {
324                        header,
325                        details_bytes: pnote.details,
326                        block_hint: pnote.after_block_num.map(BlockNumber::from),
327                    });
328                }
329                Poll::Ready(Some(Ok(notes)))
330            },
331            Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
332                format!("tonic status: {status}"),
333            )))),
334            Poll::Ready(None) => Poll::Ready(None),
335            Poll::Pending => Poll::Pending,
336        }
337    }
338}
339
340impl NoteStream for NoteStreamAdapter {}