miden_client/note_transport/mod.rs
1pub mod errors;
2pub mod generated;
3#[cfg(feature = "tonic")]
4pub mod grpc;
5
6use alloc::boxed::Box;
7use alloc::collections::BTreeMap;
8use alloc::string::String;
9use alloc::sync::Arc;
10use alloc::vec::Vec;
11
12use futures::Stream;
13use miden_protocol::address::Address;
14use miden_protocol::block::BlockNumber;
15use miden_protocol::note::{
16 Note,
17 NoteDetails,
18 NoteDetailsCommitment,
19 NoteFile,
20 NoteHeader,
21 NoteId,
22 NoteTag,
23};
24use miden_protocol::utils::serde::Serializable;
25use miden_tx::auth::TransactionAuthenticator;
26use miden_tx::utils::serde::{
27 ByteReader,
28 ByteWriter,
29 Deserializable,
30 DeserializationError,
31 SliceReader,
32};
33
34pub use self::errors::NoteTransportError;
35use crate::{Client, ClientError};
36
/// Note transport endpoint URL for the testnet.
pub const NOTE_TRANSPORT_TESTNET_ENDPOINT: &str = "https://transport.miden.io";
/// Note transport endpoint URL for the devnet.
pub const NOTE_TRANSPORT_DEVNET_ENDPOINT: &str = "https://transport.devnet.miden.io";
/// Settings key associated with the note transport cursor.
///
/// NOTE(review): this constant is not referenced elsewhere in this file; presumably the store
/// uses it to persist the [`NoteTransportCursor`] — confirm against the store implementation.
pub const NOTE_TRANSPORT_CURSOR_STORE_SETTING: &str = "note_transport_cursor";

/// Settings key for the durable relay outbox: a serialized `Vec<NoteInfo>` of
/// private notes whose transport delivery has not yet succeeded.
///
/// The send path (`send_private_note` / `send_private_note_with_block_hint`) appends an entry,
/// replacing any existing entry with the same note id, before relaying;
/// [`Client::flush_relay_outbox`] drops the entries it manages to re-send. Reusing the settings
/// k/v avoids a Store-trait schema change while surviving process restarts.
pub const NOTE_TRANSPORT_OUTBOX_KEY: &str = "note_transport_outbox";
48
49/// Client note transport methods.
50impl<AUTH> Client<AUTH> {
51 /// Check if note transport connection is configured
52 pub fn is_note_transport_enabled(&self) -> bool {
53 self.note_transport_api.is_some()
54 }
55
56 /// Returns the Note Transport client
57 ///
58 /// Errors if the note transport is not configured.
59 pub(crate) fn get_note_transport_api(
60 &self,
61 ) -> Result<Arc<dyn NoteTransportClient>, NoteTransportError> {
62 self.note_transport_api.clone().ok_or(NoteTransportError::Disabled)
63 }
64
65 /// Send a note through the note transport network.
66 ///
67 /// The note will be end-to-end encrypted (unimplemented, currently plaintext)
68 /// using the provided recipient's `address` details.
69 /// The recipient will be able to retrieve this note through the note's [`NoteTag`].
70 ///
71 /// **Durability.** The relay payload is persisted to the outbox before the
72 /// transport call. If the call fails or is interrupted, the entry stays in
73 /// the outbox and is retried on the next [`Client::flush_relay_outbox`]
74 /// (which [`Client::sync_note_transport`] runs), so a transient transport
75 /// failure does not drop the note. The receiver dedupes by note id, so a
76 /// re-send after a partial success is harmless.
77 ///
78 /// Prefer [`Client::send_private_note_with_block_hint`], which also relays a block hint so the
79 /// recipient gets deterministic delivery instead of relying on its lookback heuristic.
80 #[deprecated(
81 since = "0.15.2",
82 note = "use `Client::send_private_note_with_block_hint` to relay a block hint for deterministic delivery"
83 )]
84 pub async fn send_private_note(
85 &mut self,
86 note: Note,
87 address: &Address,
88 ) -> Result<(), ClientError> {
89 self.relay_private_note(note, address, None).await
90 }
91
92 /// Send a note through the note transport network, relaying a block hint to the recipient.
93 ///
94 /// `block_hint` is the block from which the recipient should start scanning for the note's
95 /// on-chain commitment, instead of relying on its lookback heuristic. Any block at or before
96 /// the commitment is correct, and the chain tip at send time is a safe choice. A tighter value
97 /// just means less for the recipient to scan.
98 ///
99 /// The same durability guarantees as [`Client::send_private_note`] apply: the hint is
100 /// persisted with the relay payload, so a retried send preserves it.
101 pub async fn send_private_note_with_block_hint(
102 &mut self,
103 note: Note,
104 address: &Address,
105 block_hint: BlockNumber,
106 ) -> Result<(), ClientError> {
107 self.relay_private_note(note, address, Some(block_hint)).await
108 }
109
110 /// Shared relay path for [`Client::send_private_note`] and
111 /// [`Client::send_private_note_with_block_hint`]. `block_hint` is the optional block from which
112 /// the recipient should start scanning for the note's commitment.
113 async fn relay_private_note(
114 &self,
115 note: Note,
116 _address: &Address,
117 block_hint: Option<BlockNumber>,
118 ) -> Result<(), ClientError> {
119 let api = self.get_note_transport_api()?;
120
121 let header = *note.header();
122 let note_id = header.id();
123 let details = NoteDetails::from(note);
124 let details_bytes = details.to_bytes();
125 // e2ee impl hint:
126 // address.key().encrypt(details_bytes)
127
128 // Persist the payload before the network call so a failed or
129 // interrupted `send_note` leaves a recoverable record rather than
130 // losing the only copy with the call frame. The hint travels with the
131 // entry so a retried send relays the same value.
132 let entry = NoteInfo {
133 header,
134 details_bytes: details_bytes.clone(),
135 block_hint,
136 };
137 let mut outbox = self.load_relay_outbox().await?;
138 // Replace any existing entry for this note id so the latest payload
139 // wins when a still-pending note is re-sent.
140 outbox.retain(|e| e.header.id() != note_id);
141 outbox.push(entry);
142 self.save_relay_outbox(outbox).await?;
143
144 // Dispatch to the hint-carrying API only when a hint is present, otherwise use the plain
145 // `send_note`. The transport exposes a separate method per scenario.
146 match block_hint {
147 Some(block_hint) => {
148 api.send_note_with_block_hint(header, details_bytes, block_hint).await?;
149 },
150 None => {
151 api.send_note(header, details_bytes).await?;
152 },
153 }
154
155 // Relay succeeded — drop the entry. A failed store write here is
156 // tolerable: the next flush re-sends and the receiver dedupes by note
157 // id, so a stale entry never causes loss.
158 let mut outbox = self.load_relay_outbox().await?;
159 outbox.retain(|e| e.header.id() != note_id);
160 self.save_relay_outbox(outbox).await?;
161
162 Ok(())
163 }
164
165 /// Re-attempt every relay payload in the durable outbox. Each entry is a
166 /// private note whose previous transport delivery failed. Successful
167 /// re-sends are dropped; failures are kept for the next call. Every entry
168 /// is attempted independently, so one persistently-failing note does not
169 /// block the others.
170 ///
171 /// [`Client::sync_note_transport`] runs this automatically and ignores its
172 /// error, so a relay failure can't block a sync. Callers driving retries
173 /// themselves can invoke it directly and inspect the returned error.
174 pub async fn flush_relay_outbox(&self) -> Result<(), ClientError> {
175 let api = self.get_note_transport_api()?;
176
177 let entries = self.load_relay_outbox().await?;
178 if entries.is_empty() {
179 return Ok(());
180 }
181
182 // Attempt every entry independently so a single persistently-failing
183 // note can't block the rest. The outbox holds only the caller's own
184 // failed sends, so it stays small and this is not a meaningful burst.
185 let mut remaining = Vec::new();
186 let mut last_err: Option<NoteTransportError> = None;
187
188 for entry in entries {
189 let relayed = match entry.block_hint {
190 Some(block_hint) => {
191 api.send_note_with_block_hint(
192 entry.header,
193 entry.details_bytes.clone(),
194 block_hint,
195 )
196 .await
197 },
198 None => api.send_note(entry.header, entry.details_bytes.clone()).await,
199 };
200 match relayed {
201 Ok(()) => {},
202 Err(err) => {
203 tracing::warn!(?err, "relay-outbox entry retry failed; will retry next sync");
204 remaining.push(entry);
205 last_err = Some(err);
206 },
207 }
208 }
209
210 self.save_relay_outbox(remaining).await?;
211
212 if let Some(err) = last_err {
213 return Err(err.into());
214 }
215 Ok(())
216 }
217
218 /// Load the durable relay outbox.
219 ///
220 /// Returns an empty `Vec` if the outbox key is absent. On deserialization
221 /// failure (schema mismatch or storage corruption) the entry is dropped and
222 /// an empty `Vec` is returned — leaving unreadable bytes in place would
223 /// block every subsequent relay because each sync would re-read them.
224 async fn load_relay_outbox(&self) -> Result<Vec<NoteInfo>, ClientError> {
225 let bytes = self
226 .store
227 .get_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
228 .await
229 .map_err(ClientError::StoreError)?;
230 let Some(bytes) = bytes else {
231 return Ok(Vec::new());
232 };
233 match Vec::<NoteInfo>::read_from_bytes(&bytes) {
234 Ok(entries) => Ok(entries),
235 Err(err) => {
236 // TODO: remove once #2265 is ported to `next`. Recover a pre-`block_hint`
237 // outbox blob via the legacy (no-hint) layout so a pending relay survives upgrade.
238 if let Ok(legacy) = Vec::<LegacyNoteInfo>::read_from_bytes(&bytes) {
239 return Ok(legacy
240 .into_iter()
241 .map(|note| NoteInfo::new(note.header, note.details_bytes))
242 .collect());
243 }
244 tracing::warn!(?err, "dropping unreadable relay outbox; resetting to empty");
245 self.store
246 .remove_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
247 .await
248 .map_err(ClientError::StoreError)?;
249 Ok(Vec::new())
250 },
251 }
252 }
253
254 /// Persist the relay outbox, removing the key entirely when empty so the
255 /// settings table doesn't accumulate empty-vec blobs.
256 async fn save_relay_outbox(&self, entries: Vec<NoteInfo>) -> Result<(), ClientError> {
257 let key = String::from(NOTE_TRANSPORT_OUTBOX_KEY);
258 if entries.is_empty() {
259 return self.store.remove_setting(key).await.map_err(ClientError::StoreError);
260 }
261 let bytes = entries.to_bytes();
262 self.store.set_setting(key, bytes).await.map_err(ClientError::StoreError)
263 }
264}
265
266impl<AUTH> Client<AUTH>
267where
268 AUTH: TransactionAuthenticator + Sync + 'static,
269{
270 /// Fetch notes for tracked note tags.
271 ///
272 /// The client will query the configured note transport node for all tracked note tags.
273 /// To list tracked tags please use [`Client::get_note_tags`]. To add a new note tag please use
274 /// [`Client::add_note_tag`].
275 /// Only notes directed at your addresses will be stored and readable given the use of
276 /// end-to-end encryption (unimplemented).
277 /// Fetched notes will be stored into the client's store.
278 ///
279 /// An internal pagination mechanism is employed to reduce the number of downloaded notes.
280 /// To fetch the full history of private notes for the tracked tags, use
281 /// [`Client::fetch_all_private_notes`].
282 pub async fn fetch_private_notes(&mut self) -> Result<(), ClientError> {
283 let note_tags: Vec<NoteTag> =
284 self.store.get_unique_note_tags().await?.into_iter().collect();
285 let cursor = self.store.get_note_transport_cursor().await?;
286
287 let (_, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
288 self.store.update_note_transport_cursor(new_cursor).await?;
289
290 Ok(())
291 }
292
293 /// Fetches all notes for tracked note tags, draining the server's paginated
294 /// response by looping until the cursor stops advancing.
295 ///
296 /// Similar to [`Client::fetch_private_notes`] but ignores the stored
297 /// pagination cursor and re-scans from the beginning. The server-side
298 /// transport caps each response at a fixed batch size; this method issues
299 /// repeated fetch calls until one returns the same cursor it was given
300 /// (i.e. no new notes), so the documented "fetches all notes" semantics
301 /// hold regardless of how large the backlog is. Prefer
302 /// [`Client::fetch_private_notes`] for steady-state syncing to avoid
303 /// re-downloading already-seen notes.
304 pub async fn fetch_all_private_notes(&mut self) -> Result<(), ClientError> {
305 // Safety cap on a misbehaving server. At 500 notes per batch, 1000
306 // iterations covers 500k notes — well beyond any plausible retention
307 // window — and bounds the worst-case wall-clock at ~50s at 50ms/req.
308 // Hitting this signals a server bug, not an honest backlog.
309 const MAX_ITERATIONS: usize = 1_000;
310
311 let note_tags: Vec<NoteTag> =
312 self.store.get_unique_note_tags().await?.into_iter().collect();
313 // Snapshot the stored cursor up front so we can advance (never regress)
314 // it after the drain. Without this guard, starting the drain at
315 // `init()` and persisting per-batch would clobber a previously
316 // advanced cursor with the small `rcursor` of the first batch.
317 let stored_cursor = self.store.get_note_transport_cursor().await?;
318
319 let mut cursor = NoteTransportCursor::init();
320 for _ in 0..MAX_ITERATIONS {
321 let (_, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
322 // Terminate on any lack of forward progress. A well-behaved server
323 // returns `new_cursor == cursor` when there are no new notes (since
324 // `rcursor = max(cursor, max_seq_returned)`); using `<=` here also
325 // handles implementations that return an `init()` cursor on empty
326 // batches (see the in-tree mock transport).
327 if new_cursor <= cursor {
328 let final_cursor = core::cmp::max(cursor, stored_cursor);
329 self.store.update_note_transport_cursor(final_cursor).await?;
330 return Ok(());
331 }
332 cursor = new_cursor;
333 }
334
335 Err(ClientError::NoteTransportError(NoteTransportError::PaginationDidNotTerminate(
336 MAX_ITERATIONS,
337 )))
338 }
339
340 /// Fetch one batch of notes from the note transport network for the provided tags.
341 ///
342 /// The server paginates; this method issues one RPC and returns the imported details
343 /// commitments together with the new cursor. The returned cursor equals the input cursor when
344 /// the batch was empty (i.e. no new notes). Callers that want to drain the full backlog should
345 /// loop until `new_cursor == cursor` (see [`Client::fetch_all_private_notes`]). Callers that do
346 /// steady-state polling (see [`Client::sync_state`] / [`Client::fetch_private_notes`]) should
347 /// call this once per tick with the stored cursor.
348 ///
349 /// Downloaded notes are imported into the local store. Persistence of the returned cursor is
350 /// left to the caller so that drain loops can guard against regression of an already-advanced
351 /// stored cursor.
352 pub(crate) async fn fetch_transport_notes(
353 &mut self,
354 cursor: NoteTransportCursor,
355 tags: &[NoteTag],
356 ) -> Result<(Vec<NoteId>, NoteTransportCursor), ClientError> {
357 // Fallback lookback window, in blocks, used only for notes the transport delivered
358 // without a sender-provided block hint. Scanning back from sync height handles
359 // the race where a note is committed on-chain just before the NTL delivers its data.
360 // Without it, check_expected_notes would scan from sync_height forward and miss the
361 // already-committed note. A sender-provided hint is deterministic and always preferred.
362 const NOTE_LOOKBACK_BLOCKS: u32 = 20;
363
364 let mut notes = Vec::new();
365 let (note_infos, rcursor) =
366 self.get_note_transport_api()?.fetch_notes(tags, cursor).await?;
367 for note_info in ¬e_infos {
368 // e2ee impl hint:
369 // for key in self.store.decryption_keys() try
370 // key.decrypt(details_bytes_encrypted)
371 let note = rejoin_note(¬e_info.header, ¬e_info.details_bytes)?;
372 notes.push((note, note_info.block_hint));
373 }
374
375 let sync_height = self.get_sync_height().await?;
376 let fallback_after_block_num =
377 BlockNumber::from(sync_height.as_u32().saturating_sub(NOTE_LOOKBACK_BLOCKS));
378
379 let id_by_commitment: BTreeMap<NoteDetailsCommitment, NoteId> =
380 notes.iter().map(|(note, _)| (note.details_commitment(), note.id())).collect();
381
382 let mut note_requests = Vec::with_capacity(notes.len());
383 for (note, block_hint) in notes {
384 let tag = note.metadata().tag();
385 // Prefer the sender-provided hint, falling back to the lookback window when absent.
386 let after_block_num = block_hint.unwrap_or(fallback_after_block_num);
387 let note_file = NoteFile::NoteDetails {
388 details: note.into(),
389 after_block_num,
390 tag: Some(tag),
391 };
392 note_requests.push(note_file);
393 }
394 let imported_commitments = self.import_notes(¬e_requests).await?;
395 let imported_ids = imported_commitments
396 .into_iter()
397 .filter_map(|commitment| id_by_commitment.get(&commitment).copied())
398 .collect();
399
400 Ok((imported_ids, rcursor))
401 }
402}
403
/// Pagination cursor for the note transport network.
///
/// Wraps an integer that is handed to the transport on each fetch so that only notes after
/// the cursor are returned, avoiding duplicate downloads. Cursors compare and order by their
/// wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct NoteTransportCursor(u64);
410
/// Result of a note transport fetch: the notes that were retrieved together with the cursor
/// to use for the following fetch.
pub struct NoteTransportUpdate {
    /// Pagination cursor for next fetch
    pub cursor: NoteTransportCursor,
    /// Fetched notes
    pub notes: Vec<Note>,
}
418
419impl NoteTransportCursor {
420 pub fn new(value: u64) -> Self {
421 Self(value)
422 }
423
424 pub fn init() -> Self {
425 Self::new(0)
426 }
427
428 pub fn value(&self) -> u64 {
429 self.0
430 }
431}
432
433impl From<u64> for NoteTransportCursor {
434 fn from(value: u64) -> Self {
435 Self::new(value)
436 }
437}
438
/// The main transport client trait for sending and receiving notes whose details may be
/// encrypted.
///
/// The trait is expanded through `async_trait`; on `wasm32` targets the `?Send` variant is
/// used, so the returned futures are not required to be `Send` there.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait NoteTransportClient: Send + Sync {
    /// Send a note with optionally encrypted details.
    ///
    /// `header` is the note's header and `details` its serialized (and possibly encrypted)
    /// details.
    async fn send_note(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
    ) -> Result<(), NoteTransportError>;

    /// Send a note, relaying a block hint for the recipient's commitment scan.
    ///
    /// `block_hint` is the block from which the recipient should start scanning for the
    /// note's commitment. The default implementation ignores it and delegates to
    /// [`NoteTransportClient::send_note`], so existing implementors keep compiling. Transports
    /// that can carry the hint (e.g. the gRPC client) override this.
    async fn send_note_with_block_hint(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
        _block_hint: BlockNumber,
    ) -> Result<(), NoteTransportError> {
        // Default: drop the hint and fall back to the plain send.
        self.send_note(header, details).await
    }

    /// Fetch notes for given tags
    ///
    /// Downloads notes for given tags (`tag` is a slice and may hold several tags).
    /// Returns notes labelled after the provided cursor (pagination), and an updated cursor.
    async fn fetch_notes(
        &self,
        tag: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError>;

    /// Stream notes for a given tag
    ///
    /// Returns a boxed [`NoteStream`] yielding batches of [`NoteInfo`].
    /// NOTE(review): `cursor` presumably marks the position after which notes are streamed,
    /// as in [`NoteTransportClient::fetch_notes`] — confirm against the implementors.
    async fn stream_notes(
        &self,
        tag: NoteTag,
        cursor: NoteTransportCursor,
    ) -> Result<Box<dyn NoteStream>, NoteTransportError>;
}
482
/// Stream trait for note streaming
///
/// A marker trait with no methods of its own: it names a [`Stream`] whose items are
/// `Result<Vec<NoteInfo>, NoteTransportError>` and which is additionally `Send + Unpin`.
pub trait NoteStream:
    Stream<Item = Result<Vec<NoteInfo>, NoteTransportError>> + Send + Unpin
{
}
488
/// Information about a note exchanged over the note transport network.
///
/// Returned by the transport when fetching notes, and also used as the entry type of the
/// durable relay outbox for notes that still have to be sent.
#[derive(Debug, Clone)]
pub struct NoteInfo {
    /// Note header
    pub header: NoteHeader,
    /// Note details, can be encrypted
    pub details_bytes: Vec<u8>,
    /// Sender-provided block hint: the block from which the recipient should start scanning for
    /// the note's on-chain commitment, instead of applying its default lookback window. `None`
    /// when the sender did not provide a hint.
    pub block_hint: Option<BlockNumber>,
}
501
502impl NoteInfo {
503 /// Build a [`NoteInfo`] without a block hint (`block_hint` is `None`).
504 ///
505 /// Use the [`NoteInfo::block_hint`] field directly to attach a hint.
506 pub fn new(header: NoteHeader, details_bytes: Vec<u8>) -> Self {
507 Self { header, details_bytes, block_hint: None }
508 }
509}
510
511// SERIALIZATION
512// ================================================================================================
513
514impl Serializable for NoteInfo {
515 fn write_into<W: ByteWriter>(&self, target: &mut W) {
516 self.header.write_into(target);
517 self.details_bytes.write_into(target);
518 self.block_hint.write_into(target);
519 }
520}
521
522impl Deserializable for NoteInfo {
523 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
524 let header = NoteHeader::read_from(source)?;
525 let details_bytes = Vec::<u8>::read_from(source)?;
526 let block_hint = Option::<BlockNumber>::read_from(source)?;
527 Ok(NoteInfo { header, details_bytes, block_hint })
528 }
529}
530
// TODO: remove once #2265 is ported to `next`. Pre-`block_hint` on-disk layout of [`NoteInfo`]
// (header + details only); used only by `load_relay_outbox` to recover blobs written by a
// pre-0.15.2 client.
//
// The type is read-only: it implements `Deserializable` but is never serialized.
struct LegacyNoteInfo {
    // Note header, as in `NoteInfo::header`.
    header: NoteHeader,
    // Serialized note details, as in `NoteInfo::details_bytes`.
    details_bytes: Vec<u8>,
}
538
539impl Deserializable for LegacyNoteInfo {
540 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
541 let header = NoteHeader::read_from(source)?;
542 let details_bytes = Vec::<u8>::read_from(source)?;
543 Ok(LegacyNoteInfo { header, details_bytes })
544 }
545}
546
547impl Serializable for NoteTransportCursor {
548 fn write_into<W: ByteWriter>(&self, target: &mut W) {
549 self.0.write_into(target);
550 }
551}
552
553impl Deserializable for NoteTransportCursor {
554 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
555 let value = u64::read_from(source)?;
556 Ok(Self::new(value))
557 }
558}
559
560fn rejoin_note(header: &NoteHeader, details_bytes: &[u8]) -> Result<Note, DeserializationError> {
561 let mut reader = SliceReader::new(details_bytes);
562 let details = NoteDetails::read_from(&mut reader)?;
563 // The transport wire format only carries `NoteHeader` + serialized `NoteDetails`, not the
564 // attachments collection. We rejoin with empty attachments; this matches the original note
565 // only when it had no attachments in the first place.
566 let partial_metadata = *header.metadata().partial_metadata();
567 Ok(Note::new(
568 details.assets().clone(),
569 partial_metadata,
570 details.recipient().clone(),
571 ))
572}