Skip to main content

miden_client/note_transport/
grpc.rs

1use alloc::boxed::Box;
2use alloc::string::String;
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use futures::Stream;
8use miden_protocol::note::{NoteHeader, NoteTag};
9use miden_protocol::utils::{Deserializable, Serializable};
10use miden_tx::utils::sync::RwLock;
11use tonic::{Request, Streaming};
12use tonic_health::pb::HealthCheckRequest;
13use tonic_health::pb::health_client::HealthClient;
14#[cfg(not(target_arch = "wasm32"))]
15use {
16    std::time::Duration,
17    tonic::transport::{Channel, ClientTlsConfig},
18};
19
20use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
21use super::generated::miden_note_transport::{
22    FetchNotesRequest,
23    SendNoteRequest,
24    StreamNotesRequest,
25    StreamNotesUpdate,
26    TransportNote,
27};
28use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
29
30#[cfg(not(target_arch = "wasm32"))]
31type Service = Channel;
32#[cfg(target_arch = "wasm32")]
33type Service = tonic_web_wasm_client::Client;
34
35/// gRPC client
36pub struct GrpcNoteTransportClient {
37    client: RwLock<MidenNoteTransportClient<Service>>,
38    health_client: RwLock<HealthClient<Service>>,
39}
40
41impl GrpcNoteTransportClient {
42    /// gRPC client constructor
43    #[cfg(not(target_arch = "wasm32"))]
44    pub async fn connect(endpoint: String, timeout_ms: u64) -> Result<Self, NoteTransportError> {
45        let endpoint = tonic::transport::Endpoint::try_from(endpoint)
46            .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
47            .timeout(Duration::from_millis(timeout_ms));
48        let tls = ClientTlsConfig::new().with_native_roots();
49        let channel = endpoint
50            .tls_config(tls)
51            .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
52            .connect()
53            .await
54            .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
55        let health_client = HealthClient::new(channel.clone());
56        let client = MidenNoteTransportClient::new(channel);
57
58        Ok(Self {
59            client: RwLock::new(client),
60            health_client: RwLock::new(health_client),
61        })
62    }
63
64    /// gRPC client (WASM) constructor
65    #[cfg(target_arch = "wasm32")]
66    pub fn new(endpoint: String) -> Self {
67        let wasm_client = tonic_web_wasm_client::Client::new(endpoint);
68        let health_client = HealthClient::new(wasm_client.clone());
69        let client = MidenNoteTransportClient::new(wasm_client);
70
71        Self {
72            client: RwLock::new(client),
73            health_client: RwLock::new(health_client),
74        }
75    }
76
77    /// Get a lock to the main client
78    fn api(&self) -> MidenNoteTransportClient<Service> {
79        self.client.read().clone()
80    }
81
82    /// Get a lock to the health client
83    fn health_api(&self) -> HealthClient<Service> {
84        self.health_client.read().clone()
85    }
86
87    /// Pushes a note to the note transport network.
88    ///
89    /// While the note header goes in plaintext, the provided note details can be encrypted.
90    pub async fn send_note(
91        &self,
92        header: NoteHeader,
93        details: Vec<u8>,
94    ) -> Result<(), NoteTransportError> {
95        let request = SendNoteRequest {
96            note: Some(TransportNote { header: header.to_bytes(), details }),
97        };
98
99        self.api()
100            .send_note(Request::new(request))
101            .await
102            .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
103
104        Ok(())
105    }
106
107    /// Downloads notes for given tags from the note transport network.
108    ///
109    /// Returns notes labeled after the provided cursor (pagination), and an updated cursor.
110    pub async fn fetch_notes(
111        &self,
112        tags: &[NoteTag],
113        cursor: NoteTransportCursor,
114    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
115        let tags_int = tags.iter().map(NoteTag::as_u32).collect();
116        let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
117
118        let response = self
119            .api()
120            .fetch_notes(Request::new(request))
121            .await
122            .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
123
124        let response = response.into_inner();
125
126        // Convert protobuf notes to internal format and track the most recent received timestamp
127        let mut notes = Vec::new();
128
129        for pnote in response.notes {
130            let header = NoteHeader::read_from_bytes(&pnote.header)?;
131
132            notes.push(NoteInfo { header, details_bytes: pnote.details });
133        }
134
135        Ok((notes, response.cursor.into()))
136    }
137
138    /// Stream notes from the note transport network.
139    ///
140    /// Subscribes to a given tag.
141    /// New notes are received periodically.
142    pub async fn stream_notes(
143        &self,
144        tag: NoteTag,
145        cursor: NoteTransportCursor,
146    ) -> Result<NoteStreamAdapter, NoteTransportError> {
147        let request = StreamNotesRequest {
148            tag: tag.as_u32(),
149            cursor: cursor.value(),
150        };
151
152        let response = self
153            .api()
154            .stream_notes(request)
155            .await
156            .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
157        Ok(NoteStreamAdapter::new(response.into_inner()))
158    }
159
160    /// gRPC-standardized server health-check.
161    ///
162    /// Checks if the note transport node and respective gRPC services are serving requests.
163    /// Currently the grPC server operates only one service labelled `MidenNoteTransport`.
164    pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
165        let request = tonic::Request::new(HealthCheckRequest {
166            service: String::new(), // empty string -> whole server
167        });
168
169        let response = self
170            .health_api()
171            .check(request)
172            .await
173            .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
174            .into_inner();
175
176        let serving = matches!(
177            response.status(),
178            tonic_health::pb::health_check_response::ServingStatus::Serving
179        );
180
181        serving
182            .then_some(())
183            .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
184    }
185}
186
187#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
188#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
189impl super::NoteTransportClient for GrpcNoteTransportClient {
190    async fn send_note(
191        &self,
192        header: NoteHeader,
193        details: Vec<u8>,
194    ) -> Result<(), NoteTransportError> {
195        self.send_note(header, details).await
196    }
197
198    async fn fetch_notes(
199        &self,
200        tags: &[NoteTag],
201        cursor: NoteTransportCursor,
202    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
203        self.fetch_notes(tags, cursor).await
204    }
205
206    async fn stream_notes(
207        &self,
208        tag: NoteTag,
209        cursor: NoteTransportCursor,
210    ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
211        let stream = self.stream_notes(tag, cursor).await?;
212        Ok(Box::new(stream))
213    }
214}
215
216/// Convert from `tonic::Streaming<StreamNotesUpdate>` to [`NoteStream`]
217pub struct NoteStreamAdapter {
218    inner: Streaming<StreamNotesUpdate>,
219}
220
221impl NoteStreamAdapter {
222    /// Create a new [`NoteStreamAdapter`]
223    pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
224        Self { inner: stream }
225    }
226}
227
228impl Stream for NoteStreamAdapter {
229    type Item = Result<Vec<NoteInfo>, NoteTransportError>;
230
231    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232        match Pin::new(&mut self.inner).poll_next(cx) {
233            Poll::Ready(Some(Ok(update))) => {
234                // Convert StreamNotesUpdate to Vec<NoteInfo>
235                let mut notes = Vec::new();
236                for pnote in update.notes {
237                    let header = NoteHeader::read_from_bytes(&pnote.header)?;
238
239                    notes.push(NoteInfo { header, details_bytes: pnote.details });
240                }
241                Poll::Ready(Some(Ok(notes)))
242            },
243            Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
244                format!("tonic status: {status}"),
245            )))),
246            Poll::Ready(None) => Poll::Ready(None),
247            Poll::Pending => Poll::Pending,
248        }
249    }
250}
251
252impl NoteStream for NoteStreamAdapter {}