miden_client/note_transport/
grpc.rs1use alloc::boxed::Box;
2use alloc::string::String;
3use alloc::vec::Vec;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use futures::Stream;
8use miden_protocol::note::{NoteHeader, NoteTag};
9use miden_protocol::utils::{Deserializable, Serializable};
10use miden_tx::utils::sync::RwLock;
11use tonic::{Request, Streaming};
12use tonic_health::pb::HealthCheckRequest;
13use tonic_health::pb::health_client::HealthClient;
14#[cfg(not(target_arch = "wasm32"))]
15use {
16 std::time::Duration,
17 tonic::transport::{Channel, ClientTlsConfig},
18};
19
20use super::generated::miden_note_transport::miden_note_transport_client::MidenNoteTransportClient;
21use super::generated::miden_note_transport::{
22 FetchNotesRequest,
23 SendNoteRequest,
24 StreamNotesRequest,
25 StreamNotesUpdate,
26 TransportNote,
27};
28use super::{NoteInfo, NoteStream, NoteTransportCursor, NoteTransportError};
29
30#[cfg(not(target_arch = "wasm32"))]
31type Service = Channel;
32#[cfg(target_arch = "wasm32")]
33type Service = tonic_web_wasm_client::Client;
34
35pub struct GrpcNoteTransportClient {
37 client: RwLock<MidenNoteTransportClient<Service>>,
38 health_client: RwLock<HealthClient<Service>>,
39}
40
41impl GrpcNoteTransportClient {
42 #[cfg(not(target_arch = "wasm32"))]
44 pub async fn connect(endpoint: String, timeout_ms: u64) -> Result<Self, NoteTransportError> {
45 let endpoint = tonic::transport::Endpoint::try_from(endpoint)
46 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
47 .timeout(Duration::from_millis(timeout_ms));
48 let tls = ClientTlsConfig::new().with_native_roots();
49 let channel = endpoint
50 .tls_config(tls)
51 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?
52 .connect()
53 .await
54 .map_err(|e| NoteTransportError::Connection(Box::new(e)))?;
55 let health_client = HealthClient::new(channel.clone());
56 let client = MidenNoteTransportClient::new(channel);
57
58 Ok(Self {
59 client: RwLock::new(client),
60 health_client: RwLock::new(health_client),
61 })
62 }
63
64 #[cfg(target_arch = "wasm32")]
66 pub fn new(endpoint: String) -> Self {
67 let wasm_client = tonic_web_wasm_client::Client::new(endpoint);
68 let health_client = HealthClient::new(wasm_client.clone());
69 let client = MidenNoteTransportClient::new(wasm_client);
70
71 Self {
72 client: RwLock::new(client),
73 health_client: RwLock::new(health_client),
74 }
75 }
76
77 fn api(&self) -> MidenNoteTransportClient<Service> {
79 self.client.read().clone()
80 }
81
82 fn health_api(&self) -> HealthClient<Service> {
84 self.health_client.read().clone()
85 }
86
87 pub async fn send_note(
91 &self,
92 header: NoteHeader,
93 details: Vec<u8>,
94 ) -> Result<(), NoteTransportError> {
95 let request = SendNoteRequest {
96 note: Some(TransportNote { header: header.to_bytes(), details }),
97 };
98
99 self.api()
100 .send_note(Request::new(request))
101 .await
102 .map_err(|e| NoteTransportError::Network(format!("Send note failed: {e:?}")))?;
103
104 Ok(())
105 }
106
107 pub async fn fetch_notes(
111 &self,
112 tags: &[NoteTag],
113 cursor: NoteTransportCursor,
114 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
115 let tags_int = tags.iter().map(NoteTag::as_u32).collect();
116 let request = FetchNotesRequest { tags: tags_int, cursor: cursor.value() };
117
118 let response = self
119 .api()
120 .fetch_notes(Request::new(request))
121 .await
122 .map_err(|e| NoteTransportError::Network(format!("Fetch notes failed: {e:?}")))?;
123
124 let response = response.into_inner();
125
126 let mut notes = Vec::new();
128
129 for pnote in response.notes {
130 let header = NoteHeader::read_from_bytes(&pnote.header)?;
131
132 notes.push(NoteInfo { header, details_bytes: pnote.details });
133 }
134
135 Ok((notes, response.cursor.into()))
136 }
137
138 pub async fn stream_notes(
143 &self,
144 tag: NoteTag,
145 cursor: NoteTransportCursor,
146 ) -> Result<NoteStreamAdapter, NoteTransportError> {
147 let request = StreamNotesRequest {
148 tag: tag.as_u32(),
149 cursor: cursor.value(),
150 };
151
152 let response = self
153 .api()
154 .stream_notes(request)
155 .await
156 .map_err(|e| NoteTransportError::Network(format!("Stream notes failed: {e:?}")))?;
157 Ok(NoteStreamAdapter::new(response.into_inner()))
158 }
159
160 pub async fn health_check(&mut self) -> Result<(), NoteTransportError> {
165 let request = tonic::Request::new(HealthCheckRequest {
166 service: String::new(), });
168
169 let response = self
170 .health_api()
171 .check(request)
172 .await
173 .map_err(|e| NoteTransportError::Network(format!("Health check failed: {e}")))?
174 .into_inner();
175
176 let serving = matches!(
177 response.status(),
178 tonic_health::pb::health_check_response::ServingStatus::Serving
179 );
180
181 serving
182 .then_some(())
183 .ok_or_else(|| NoteTransportError::Network("Service is not serving".into()))
184 }
185}
186
187#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
188#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
189impl super::NoteTransportClient for GrpcNoteTransportClient {
190 async fn send_note(
191 &self,
192 header: NoteHeader,
193 details: Vec<u8>,
194 ) -> Result<(), NoteTransportError> {
195 self.send_note(header, details).await
196 }
197
198 async fn fetch_notes(
199 &self,
200 tags: &[NoteTag],
201 cursor: NoteTransportCursor,
202 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
203 self.fetch_notes(tags, cursor).await
204 }
205
206 async fn stream_notes(
207 &self,
208 tag: NoteTag,
209 cursor: NoteTransportCursor,
210 ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
211 let stream = self.stream_notes(tag, cursor).await?;
212 Ok(Box::new(stream))
213 }
214}
215
216pub struct NoteStreamAdapter {
218 inner: Streaming<StreamNotesUpdate>,
219}
220
221impl NoteStreamAdapter {
222 pub fn new(stream: Streaming<StreamNotesUpdate>) -> Self {
224 Self { inner: stream }
225 }
226}
227
228impl Stream for NoteStreamAdapter {
229 type Item = Result<Vec<NoteInfo>, NoteTransportError>;
230
231 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232 match Pin::new(&mut self.inner).poll_next(cx) {
233 Poll::Ready(Some(Ok(update))) => {
234 let mut notes = Vec::new();
236 for pnote in update.notes {
237 let header = NoteHeader::read_from_bytes(&pnote.header)?;
238
239 notes.push(NoteInfo { header, details_bytes: pnote.details });
240 }
241 Poll::Ready(Some(Ok(notes)))
242 },
243 Poll::Ready(Some(Err(status))) => Poll::Ready(Some(Err(NoteTransportError::Network(
244 format!("tonic status: {status}"),
245 )))),
246 Poll::Ready(None) => Poll::Ready(None),
247 Poll::Pending => Poll::Pending,
248 }
249 }
250}
251
252impl NoteStream for NoteStreamAdapter {}