Skip to main content

miden_client/note_transport/
grpc.rs

1//! gRPC-based note transport client.
2//!
3//! On native targets, the connection is established lazily on the first request using a
4//! TLS-enabled `tonic` channel. On WASM, a `tonic_web_wasm_client` is created on demand.
5
6use alloc::boxed::Box;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use futures::Stream;
13use miden_protocol::note::{NoteHeader, NoteTag};
14use miden_protocol::utils::serde::{Deserializable, Serializable};
15use miden_tx::utils::sync::RwLock;
16use tonic::{Request, Streaming};
17use tonic_health::pb::HealthCheckRequest;
18use tonic_health::pb::health_client::HealthClient;
19#[cfg(not(target_arch = "wasm32"))]
20use {
21    std::time::Duration,
22    tonic::transport::{Channel, ClientTlsConfig},
23};
24
25use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
26use super::generated::miden_note_transport::{
27    FetchNotesRequest,
28    SendNoteRequest,
29    StreamNotesRequest,
30    StreamNotesUpdate,
31    TransportNote,
32};
33use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
34
35#[cfg(not(target_arch = "wasm32"))]
36type Service = Channel;
37#[cfg(target_arch = "wasm32")]
38type Service = tonic_web_wasm_client::Client;
39
40/// Establishes a connection to the note transport service, returning the gRPC clients.
41#[cfg(not(target_arch = "wasm32"))]
42async fn connect_channel(
43    endpoint: &str,
44    timeout_ms: u64,
45) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
46    let endpoint = tonic::transport::Endpoint::try_from(String::from(endpoint))
47        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
48        .timeout(Duration::from_millis(timeout_ms));
49    let tls = ClientTlsConfig::new().with_native_roots();
50    let channel = endpoint
51        .tls_config(tls)
52        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
53        .connect()
54        .await
55        .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
56    Ok((MidenNoteTransportClient::new(channel.clone()), HealthClient::new(channel)))
57}
58
/// Builds the note transport client and the health-check client on top of a single
/// `tonic_web_wasm_client::Client` created for `endpoint`.
///
/// Note: `timeout_ms` is currently ignored on WASM as `tonic_web_wasm_client::Client` does not
/// support timeout configuration.
// TODO: refactor `connect_channel` so that WASM doesn't accept a timeout parameter.
#[cfg(target_arch = "wasm32")]
// Nothing is awaited here; the function stays `async` so both target variants are called alike.
#[allow(clippy::unused_async)]
async fn connect_channel(
    endpoint: &str,
    _timeout_ms: u64,
) -> Result<(MidenNoteTransportClient<Service>, HealthClient<Service>), NoteTransportError> {
    let transport = tonic_web_wasm_client::Client::new(String::from(endpoint));
    let health_client = HealthClient::new(transport.clone());
    let client = MidenNoteTransportClient::new(transport);
    Ok((client, health_client))
}
76
77/// Inner state holding the connected gRPC clients.
78#[derive(Clone)]
79struct ConnectedClient {
80    client: MidenNoteTransportClient<Service>,
81    health_client: HealthClient<Service>,
82}
83
84/// gRPC client for the note transport network.
85///
86/// The connection is established lazily on first use.
87pub struct GrpcNoteTransportClient {
88    inner: RwLock<Option<ConnectedClient>>,
89    endpoint: String,
90    timeout_ms: u64,
91}
92
93impl GrpcNoteTransportClient {
94    /// Creates a new [`GrpcNoteTransportClient`] without establishing a connection.
95    /// The connection will be established lazily on the first request.
96    pub fn new(endpoint: String, timeout_ms: u64) -> Self {
97        Self {
98            inner: RwLock::new(None),
99            endpoint,
100            timeout_ms,
101        }
102    }
103
104    /// Ensures the client is connected and returns the connected state.
105    async fn ensure_connected(&self) -> Result<ConnectedClient, NoteTransportError> {
106        if let Some(connected) = self.inner.read().as_ref() {
107            return Ok(connected.clone());
108        }
109
110        let (client, health_client) = connect_channel(&self.endpoint, self.timeout_ms).await?;
111        let connected = ConnectedClient { client, health_client };
112        *self.inner.write() = Some(connected.clone());
113        Ok(connected)
114    }
115
116    /// Get a clone of the main client, connecting if needed.
117    async fn api(&self) -> Result<MidenNoteTransportClient<Service>, NoteTransportError> {
118        Ok(self.ensure_connected().await?.client)
119    }
120
121    /// Get a clone of the health client, connecting if needed.
122    async fn health_api(&self) -> Result<HealthClient<Service>, NoteTransportError> {
123        Ok(self.ensure_connected().await?.health_client)
124    }
125
126    /// Pushes a note to the note transport network.
127    ///
128    /// While the note header goes in plaintext, the provided note details can be encrypted.
129    pub async fn send_note(
130        &self,
131        header: NoteHeader,
132        details: Vec<u8>,
133    ) -> Result<(), NoteTransportError> {
134        let request = SendNoteRequest {
135            note: Some(TransportNote { header: header.to_bytes(), details }),
136        };
137
138        self.api()
139            .await?
140            .send_note(Request::new(request))
141            .await
142            .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
143
144        Ok(())
145    }
146
147    /// Downloads notes for given tags from the note transport network.
148    ///
149    /// Returns notes labeled after the provided cursor (pagination), and an updated cursor.
150    pub async fn fetch_notes(
151        &self,
152        tags: &[NoteTag],
153        cursor: NoteTransportCursor,
154    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
155        let tags_int = tags.iter().map(NoteTag::as_u32).collect();
156        let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
157
158        let response = self
159            .api()
160            .await?
161            .fetch_notes(Request::new(request))
162            .await
163            .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
164
165        let response = response.into_inner();
166
167        // Convert protobuf notes to internal format and track the most recent received timestamp
168        let mut notes = Vec::new();
169
170        for pnote in response.notes {
171            let header = NoteHeader::read_from_bytes(&pnote.header)?;
172
173            notes.push(NoteInfo { header, details_bytes: pnote.details });
174        }
175
176        Ok((notes, response.cursor.into()))
177    }
178
179    /// Stream notes from the note transport network.
180    ///
181    /// Subscribes to a given tag.
182    /// New notes are received periodically.
183    pub async fn stream_notes(
184        &self,
185        tag: NoteTag,
186        cursor: NoteTransportCursor,
187    ) -> Result<NoteStreamAdapter, NoteTransportError> {
188        let request = StreamNotesRequest {
189            tag: tag.as_u32(),
190            cursor: cursor.value(),
191        };
192
193        let response = self
194            .api()
195            .await?
196            .stream_notes(request)
197            .await
198            .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
199        Ok(NoteStreamAdapter::new(response.into_inner()))
200    }
201
202    /// gRPC-standardized server health-check.
203    ///
204    /// Checks if the note transport node and respective gRPC services are serving requests.
205    /// Currently the grPC server operates only one service labelled `MidenNoteTransport`.
206    pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
207        let request = tonic::Request::new(HealthCheckRequest {
208            service: String::new(), // empty string -> whole server
209        });
210
211        let response = self
212            .health_api()
213            .await?
214            .check(request)
215            .await
216            .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
217            .into_inner();
218
219        let serving = matches!(
220            response.status(),
221            tonic_health::pb::health_check_response::ServingStatus::Serving
222        );
223
224        serving
225            .then_some(())
226            .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
227    }
228}
229
230#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
231#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
232impl super::NoteTransportClient for GrpcNoteTransportClient {
233    async fn send_note(
234        &self,
235        header: NoteHeader,
236        details: Vec<u8>,
237    ) -> Result<(), NoteTransportError> {
238        self.send_note(header, details).await
239    }
240
241    async fn fetch_notes(
242        &self,
243        tags: &[NoteTag],
244        cursor: NoteTransportCursor,
245    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
246        self.fetch_notes(tags, cursor).await
247    }
248
249    async fn stream_notes(
250        &self,
251        tag: NoteTag,
252        cursor: NoteTransportCursor,
253    ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
254        let stream = self.stream_notes(tag, cursor).await?;
255        Ok(Box::new(stream))
256    }
257}
258
259/// Convert from `tonic::Streaming<StreamNotesUpdate>` to [`NoteStream`]
260pub struct NoteStreamAdapter {
261    inner: Streaming<StreamNotesUpdate>,
262}
263
264impl NoteStreamAdapter {
265    /// Create a new [`NoteStreamAdapter`]
266    pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
267        Self { inner: stream }
268    }
269}
270
271impl Stream for NoteStreamAdapter {
272    type Item = Result<Vec<NoteInfo>, NoteTransportError>;
273
274    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
275        match Pin::new(&mut self.inner).poll_next(cx) {
276            Poll::Ready(Some(Ok(update))) => {
277                // Convert StreamNotesUpdate to Vec<NoteInfo>
278                let mut notes = Vec::new();
279                for pnote in update.notes {
280                    let header = NoteHeader::read_from_bytes(&pnote.header)?;
281
282                    notes.push(NoteInfo { header, details_bytes: pnote.details });
283                }
284                Poll::Ready(Some(Ok(notes)))
285            },
286            Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
287                format!("tonic status: {status}"),
288            )))),
289            Poll::Ready(None) => Poll::Ready(None),
290            Poll::Pending => Poll::Pending,
291        }
292    }
293}
294
295impl NoteStream for NoteStreamAdapter {}