miden_client/note_transport/mod.rs
1pub mod errors;
2pub mod generated;
3#[cfg(feature = "tonic")]
4pub mod grpc;
5
6use alloc::boxed::Box;
7use alloc::collections::BTreeMap;
8use alloc::string::String;
9use alloc::sync::Arc;
10use alloc::vec::Vec;
11
12use futures::Stream;
13use miden_protocol::address::Address;
14use miden_protocol::block::BlockNumber;
15use miden_protocol::note::{
16 Note,
17 NoteDetails,
18 NoteDetailsCommitment,
19 NoteFile,
20 NoteHeader,
21 NoteId,
22 NoteTag,
23};
24use miden_protocol::utils::serde::Serializable;
25use miden_tx::auth::TransactionAuthenticator;
26use miden_tx::utils::serde::{
27 ByteReader,
28 ByteWriter,
29 Deserializable,
30 DeserializationError,
31 SliceReader,
32};
33
34pub use self::errors::NoteTransportError;
35use crate::{Client, ClientError};
36
/// Note transport endpoint URL for the testnet deployment.
pub const NOTE_TRANSPORT_TESTNET_ENDPOINT: &str = "https://transport.miden.io";
/// Note transport endpoint URL for the devnet deployment.
pub const NOTE_TRANSPORT_DEVNET_ENDPOINT: &str = "https://transport.devnet.miden.io";
/// Settings key name associated with the note transport pagination cursor.
///
/// NOTE(review): this constant is not referenced in this module; presumably the store uses it to
/// persist the cursor — confirm against the store implementations.
pub const NOTE_TRANSPORT_CURSOR_STORE_SETTING: &str = "note_transport_cursor";

/// Settings key under which the durable relay outbox is stored.
///
/// The value is a serialized `Vec<NoteInfo>` holding the private notes whose transport delivery
/// has not succeeded yet. `send_private_note` writes an entry (replacing any existing entry with
/// the same note id) before relaying, and [`Client::flush_relay_outbox`] removes the entries it
/// manages to re-send. Storing the outbox in the settings key/value table keeps it across process
/// restarts without requiring a schema change to the store trait.
pub const NOTE_TRANSPORT_OUTBOX_KEY: &str = "note_transport_outbox";
48
49/// Client note transport methods.
50impl<AUTH> Client<AUTH> {
51 /// Check if note transport connection is configured
52 pub fn is_note_transport_enabled(&self) -> bool {
53 self.note_transport_api.is_some()
54 }
55
56 /// Returns the Note Transport client
57 ///
58 /// Errors if the note transport is not configured.
59 pub(crate) fn get_note_transport_api(
60 &self,
61 ) -> Result<Arc<dyn NoteTransportClient>, NoteTransportError> {
62 self.note_transport_api.clone().ok_or(NoteTransportError::Disabled)
63 }
64
65 /// Send a note through the note transport network.
66 ///
67 /// The note will be end-to-end encrypted (unimplemented, currently plaintext)
68 /// using the provided recipient's `address` details.
69 /// The recipient will be able to retrieve this note through the note's [`NoteTag`].
70 ///
71 /// **Durability.** The relay payload is persisted to the outbox before the
72 /// transport call. If the call fails or is interrupted, the entry stays in
73 /// the outbox and is retried on the next [`Client::flush_relay_outbox`]
74 /// (which [`Client::sync_note_transport`] runs), so a transient transport
75 /// failure does not drop the note. The receiver dedupes by note id, so a
76 /// re-send after a partial success is harmless.
77 pub async fn send_private_note(
78 &mut self,
79 note: Note,
80 _address: &Address,
81 ) -> Result<(), ClientError> {
82 let api = self.get_note_transport_api()?;
83
84 let header = *note.header();
85 let note_id = header.id();
86 let details = NoteDetails::from(note);
87 let details_bytes = details.to_bytes();
88 // e2ee impl hint:
89 // address.key().encrypt(details_bytes)
90
91 // Persist the payload before the network call so a failed or
92 // interrupted `send_note` leaves a recoverable record rather than
93 // losing the only copy with the call frame.
94 let entry = NoteInfo {
95 header,
96 details_bytes: details_bytes.clone(),
97 };
98 let mut outbox = self.load_relay_outbox().await?;
99 // Replace any existing entry for this note id so the latest payload
100 // wins when a still-pending note is re-sent.
101 outbox.retain(|e| e.header.id() != note_id);
102 outbox.push(entry);
103 self.save_relay_outbox(outbox).await?;
104
105 api.send_note(header, details_bytes).await?;
106
107 // Relay succeeded — drop the entry. A failed store write here is
108 // tolerable: the next flush re-sends and the receiver dedupes by note
109 // id, so a stale entry never causes loss.
110 let mut outbox = self.load_relay_outbox().await?;
111 outbox.retain(|e| e.header.id() != note_id);
112 self.save_relay_outbox(outbox).await?;
113
114 Ok(())
115 }
116
117 /// Re-attempt every relay payload in the durable outbox. Each entry is a
118 /// private note whose previous transport delivery failed. Successful
119 /// re-sends are dropped; failures are kept for the next call. Every entry
120 /// is attempted independently, so one persistently-failing note does not
121 /// block the others.
122 ///
123 /// [`Client::sync_note_transport`] runs this automatically and ignores its
124 /// error, so a relay failure can't block a sync. Callers driving retries
125 /// themselves can invoke it directly and inspect the returned error.
126 pub async fn flush_relay_outbox(&self) -> Result<(), ClientError> {
127 let api = self.get_note_transport_api()?;
128
129 let entries = self.load_relay_outbox().await?;
130 if entries.is_empty() {
131 return Ok(());
132 }
133
134 // Attempt every entry independently so a single persistently-failing
135 // note can't block the rest. The outbox holds only the caller's own
136 // failed sends, so it stays small and this is not a meaningful burst.
137 let mut remaining = Vec::new();
138 let mut last_err: Option<NoteTransportError> = None;
139
140 for entry in entries {
141 match api.send_note(entry.header, entry.details_bytes.clone()).await {
142 Ok(()) => {},
143 Err(err) => {
144 tracing::warn!(?err, "relay-outbox entry retry failed; will retry next sync");
145 remaining.push(entry);
146 last_err = Some(err);
147 },
148 }
149 }
150
151 self.save_relay_outbox(remaining).await?;
152
153 if let Some(err) = last_err {
154 return Err(err.into());
155 }
156 Ok(())
157 }
158
159 /// Load the durable relay outbox.
160 ///
161 /// Returns an empty `Vec` if the outbox key is absent. On deserialization
162 /// failure (schema mismatch or storage corruption) the entry is dropped and
163 /// an empty `Vec` is returned — leaving unreadable bytes in place would
164 /// block every subsequent relay because each sync would re-read them.
165 async fn load_relay_outbox(&self) -> Result<Vec<NoteInfo>, ClientError> {
166 let bytes = self
167 .store
168 .get_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
169 .await
170 .map_err(ClientError::StoreError)?;
171 let Some(bytes) = bytes else {
172 return Ok(Vec::new());
173 };
174 match Vec::<NoteInfo>::read_from_bytes(&bytes) {
175 Ok(entries) => Ok(entries),
176 Err(err) => {
177 tracing::warn!(?err, "dropping unreadable relay outbox; resetting to empty");
178 self.store
179 .remove_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
180 .await
181 .map_err(ClientError::StoreError)?;
182 Ok(Vec::new())
183 },
184 }
185 }
186
187 /// Persist the relay outbox, removing the key entirely when empty so the
188 /// settings table doesn't accumulate empty-vec blobs.
189 async fn save_relay_outbox(&self, entries: Vec<NoteInfo>) -> Result<(), ClientError> {
190 let key = String::from(NOTE_TRANSPORT_OUTBOX_KEY);
191 if entries.is_empty() {
192 return self.store.remove_setting(key).await.map_err(ClientError::StoreError);
193 }
194 let bytes = entries.to_bytes();
195 self.store.set_setting(key, bytes).await.map_err(ClientError::StoreError)
196 }
197}
198
199impl<AUTH> Client<AUTH>
200where
201 AUTH: TransactionAuthenticator + Sync + 'static,
202{
203 /// Fetch notes for tracked note tags.
204 ///
205 /// The client will query the configured note transport node for all tracked note tags.
206 /// To list tracked tags please use [`Client::get_note_tags`]. To add a new note tag please use
207 /// [`Client::add_note_tag`].
208 /// Only notes directed at your addresses will be stored and readable given the use of
209 /// end-to-end encryption (unimplemented).
210 /// Fetched notes will be stored into the client's store.
211 ///
212 /// An internal pagination mechanism is employed to reduce the number of downloaded notes.
213 /// To fetch the full history of private notes for the tracked tags, use
214 /// [`Client::fetch_all_private_notes`].
215 pub async fn fetch_private_notes(&mut self) -> Result<(), ClientError> {
216 let note_tags: Vec<NoteTag> =
217 self.store.get_unique_note_tags().await?.into_iter().collect();
218 let cursor = self.store.get_note_transport_cursor().await?;
219
220 let (_, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
221 self.store.update_note_transport_cursor(new_cursor).await?;
222
223 Ok(())
224 }
225
226 /// Fetches all notes for tracked note tags, draining the server's paginated
227 /// response by looping until the cursor stops advancing.
228 ///
229 /// Similar to [`Client::fetch_private_notes`] but ignores the stored
230 /// pagination cursor and re-scans from the beginning. The server-side
231 /// transport caps each response at a fixed batch size; this method issues
232 /// repeated fetch calls until one returns the same cursor it was given
233 /// (i.e. no new notes), so the documented "fetches all notes" semantics
234 /// hold regardless of how large the backlog is. Prefer
235 /// [`Client::fetch_private_notes`] for steady-state syncing to avoid
236 /// re-downloading already-seen notes.
237 pub async fn fetch_all_private_notes(&mut self) -> Result<(), ClientError> {
238 // Safety cap on a misbehaving server. At 500 notes per batch, 1000
239 // iterations covers 500k notes — well beyond any plausible retention
240 // window — and bounds the worst-case wall-clock at ~50s at 50ms/req.
241 // Hitting this signals a server bug, not an honest backlog.
242 const MAX_ITERATIONS: usize = 1_000;
243
244 let note_tags: Vec<NoteTag> =
245 self.store.get_unique_note_tags().await?.into_iter().collect();
246 // Snapshot the stored cursor up front so we can advance (never regress)
247 // it after the drain. Without this guard, starting the drain at
248 // `init()` and persisting per-batch would clobber a previously
249 // advanced cursor with the small `rcursor` of the first batch.
250 let stored_cursor = self.store.get_note_transport_cursor().await?;
251
252 let mut cursor = NoteTransportCursor::init();
253 for _ in 0..MAX_ITERATIONS {
254 let (_, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
255 // Terminate on any lack of forward progress. A well-behaved server
256 // returns `new_cursor == cursor` when there are no new notes (since
257 // `rcursor = max(cursor, max_seq_returned)`); using `<=` here also
258 // handles implementations that return an `init()` cursor on empty
259 // batches (see the in-tree mock transport).
260 if new_cursor <= cursor {
261 let final_cursor = core::cmp::max(cursor, stored_cursor);
262 self.store.update_note_transport_cursor(final_cursor).await?;
263 return Ok(());
264 }
265 cursor = new_cursor;
266 }
267
268 Err(ClientError::NoteTransportError(NoteTransportError::PaginationDidNotTerminate(
269 MAX_ITERATIONS,
270 )))
271 }
272
273 /// Fetch one batch of notes from the note transport network for the provided tags.
274 ///
275 /// The server paginates; this method issues one RPC and returns the imported details
276 /// commitments together with the new cursor. The returned cursor equals the input cursor when
277 /// the batch was empty (i.e. no new notes). Callers that want to drain the full backlog should
278 /// loop until `new_cursor == cursor` (see [`Client::fetch_all_private_notes`]). Callers that do
279 /// steady-state polling (see [`Client::sync_state`] / [`Client::fetch_private_notes`]) should
280 /// call this once per tick with the stored cursor.
281 ///
282 /// Downloaded notes are imported into the local store. Persistence of the returned cursor is
283 /// left to the caller so that drain loops can guard against regression of an already-advanced
284 /// stored cursor.
285 pub(crate) async fn fetch_transport_notes(
286 &mut self,
287 cursor: NoteTransportCursor,
288 tags: &[NoteTag],
289 ) -> Result<(Vec<NoteId>, NoteTransportCursor), ClientError> {
290 // Number of blocks to look back from sync height when scanning for committed notes.
291 // Handles the race where a note is committed on-chain just before the NTL delivers
292 // its data — without this, check_expected_notes would scan from sync_height forward
293 // and miss the already-committed note.
294 const NOTE_LOOKBACK_BLOCKS: u32 = 20;
295
296 let mut notes = Vec::new();
297 let (note_infos, rcursor) =
298 self.get_note_transport_api()?.fetch_notes(tags, cursor).await?;
299 for note_info in ¬e_infos {
300 // e2ee impl hint:
301 // for key in self.store.decryption_keys() try
302 // key.decrypt(details_bytes_encrypted)
303 let note = rejoin_note(¬e_info.header, ¬e_info.details_bytes)?;
304 notes.push(note);
305 }
306
307 let sync_height = self.get_sync_height().await?;
308 let after_block_num =
309 BlockNumber::from(sync_height.as_u32().saturating_sub(NOTE_LOOKBACK_BLOCKS));
310
311 let id_by_commitment: BTreeMap<NoteDetailsCommitment, NoteId> =
312 notes.iter().map(|note| (note.details_commitment(), note.id())).collect();
313
314 let mut note_requests = Vec::with_capacity(notes.len());
315 for note in notes {
316 let tag = note.metadata().tag();
317 let note_file = NoteFile::NoteDetails {
318 details: note.into(),
319 after_block_num,
320 tag: Some(tag),
321 };
322 note_requests.push(note_file);
323 }
324 let imported_commitments = self.import_notes(¬e_requests).await?;
325 let imported_ids = imported_commitments
326 .into_iter()
327 .filter_map(|commitment| id_by_commitment.get(&commitment).copied())
328 .collect();
329
330 Ok((imported_ids, rcursor))
331 }
332}
333
/// Pagination cursor for the note transport network.
///
/// Wraps an integer that tells the transport where a fetch should resume, so that notes which
/// were already downloaded are not requested again. Cursors are ordered by their integer value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct NoteTransportCursor(u64);
340
341/// Note Transport update
342pub struct NoteTransportUpdate {
343 /// Pagination cursor for next fetch
344 pub cursor: NoteTransportCursor,
345 /// Fetched notes
346 pub notes: Vec<Note>,
347}
348
349impl NoteTransportCursor {
350 pub fn new(value: u64) -> Self {
351 Self(value)
352 }
353
354 pub fn init() -> Self {
355 Self::new(0)
356 }
357
358 pub fn value(&self) -> u64 {
359 self.0
360 }
361}
362
363impl From<u64> for NoteTransportCursor {
364 fn from(value: u64) -> Self {
365 Self::new(value)
366 }
367}
368
369/// The main transport client trait for sending and receiving encrypted notes
370#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
371#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
372pub trait NoteTransportClient: Send + Sync {
373 /// Send a note with optionally encrypted details
374 async fn send_note(
375 &self,
376 header: NoteHeader,
377 details: Vec<u8>,
378 ) -> Result<(), NoteTransportError>;
379
380 /// Fetch notes for given tags
381 ///
382 /// Downloads notes for given tags.
383 /// Returns notes labelled after the provided cursor (pagination), and an updated cursor.
384 async fn fetch_notes(
385 &self,
386 tag: &[NoteTag],
387 cursor: NoteTransportCursor,
388 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError>;
389
390 /// Stream notes for a given tag
391 async fn stream_notes(
392 &self,
393 tag: NoteTag,
394 cursor: NoteTransportCursor,
395 ) -> Result<Box<dyn NoteStream>, NoteTransportError>;
396}
397
398/// Stream trait for note streaming
399pub trait NoteStream:
400 Stream<Item = Result<Vec<NoteInfo>, NoteTransportError>> + Send + Unpin
401{
402}
403
404/// Information about a note fetched from the note transport network
405#[derive(Debug, Clone)]
406pub struct NoteInfo {
407 /// Note header
408 pub header: NoteHeader,
409 /// Note details, can be encrypted
410 pub details_bytes: Vec<u8>,
411}
412
413// SERIALIZATION
414// ================================================================================================
415
416impl Serializable for NoteInfo {
417 fn write_into<W: ByteWriter>(&self, target: &mut W) {
418 self.header.write_into(target);
419 self.details_bytes.write_into(target);
420 }
421}
422
423impl Deserializable for NoteInfo {
424 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
425 let header = NoteHeader::read_from(source)?;
426 let details_bytes = Vec::<u8>::read_from(source)?;
427 Ok(NoteInfo { header, details_bytes })
428 }
429}
430
431impl Serializable for NoteTransportCursor {
432 fn write_into<W: ByteWriter>(&self, target: &mut W) {
433 self.0.write_into(target);
434 }
435}
436
437impl Deserializable for NoteTransportCursor {
438 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
439 let value = u64::read_from(source)?;
440 Ok(Self::new(value))
441 }
442}
443
444fn rejoin_note(header: &NoteHeader, details_bytes: &[u8]) -> Result<Note, DeserializationError> {
445 let mut reader = SliceReader::new(details_bytes);
446 let details = NoteDetails::read_from(&mut reader)?;
447 // The transport wire format only carries `NoteHeader` + serialized `NoteDetails`, not the
448 // attachments collection. We rejoin with empty attachments; this matches the original note
449 // only when it had no attachments in the first place.
450 let partial_metadata = *header.metadata().partial_metadata();
451 Ok(Note::new(
452 details.assets().clone(),
453 partial_metadata,
454 details.recipient().clone(),
455 ))
456}