Skip to main content

miden_client/note_transport/
mod.rs

1pub mod errors;
2pub mod generated;
3#[cfg(feature = "tonic")]
4pub mod grpc;
5
6use alloc::boxed::Box;
7use alloc::collections::BTreeMap;
8use alloc::string::String;
9use alloc::sync::Arc;
10use alloc::vec::Vec;
11
12use futures::Stream;
13use miden_protocol::address::Address;
14use miden_protocol::block::BlockNumber;
15use miden_protocol::note::{
16    Note,
17    NoteDetails,
18    NoteDetailsCommitment,
19    NoteFile,
20    NoteHeader,
21    NoteId,
22    NoteTag,
23};
24use miden_protocol::utils::serde::Serializable;
25use miden_tx::auth::TransactionAuthenticator;
26use miden_tx::utils::serde::{
27    ByteReader,
28    ByteWriter,
29    Deserializable,
30    DeserializationError,
31    SliceReader,
32};
33
34pub use self::errors::NoteTransportError;
35use crate::{Client, ClientError};
36
/// Note transport endpoint URL for the testnet (network taken from the constant's name).
37pub const NOTE_TRANSPORT_TESTNET_ENDPOINT: &str = "https://transport.miden.io";
/// Note transport endpoint URL for the devnet (network taken from the constant's name).
38pub const NOTE_TRANSPORT_DEVNET_ENDPOINT: &str = "https://transport.devnet.miden.io";
/// Settings key named for the persisted note transport cursor.
///
/// NOTE(review): not referenced by the code visible in this module; presumably consumed by the
/// store implementation backing the cursor getter/setter — confirm.
39pub const NOTE_TRANSPORT_CURSOR_STORE_SETTING: &str = "note_transport_cursor";
40
41/// Settings key for the durable relay outbox: a serialized `Vec<NoteInfo>` of
42/// private notes whose transport delivery has not yet succeeded.
43/// `send_private_note` appends (replacing any entry with the same note id)
44/// before relaying; [`Client::flush_relay_outbox`] drains entries that re-send
45/// successfully. Reusing the settings k/v avoids a Store-trait schema change
46/// while surviving process restarts.
47pub const NOTE_TRANSPORT_OUTBOX_KEY: &str = "note_transport_outbox";
48
49/// Client note transport methods.
50impl<AUTH> Client<AUTH> {
51    /// Check if note transport connection is configured
52    pub fn is_note_transport_enabled(&self) -> bool {
53        self.note_transport_api.is_some()
54    }
55
56    /// Returns the Note Transport client
57    ///
58    /// Errors if the note transport is not configured.
59    pub(crate) fn get_note_transport_api(
60        &self,
61    ) -> Result<Arc<dyn NoteTransportClient>, NoteTransportError> {
62        self.note_transport_api.clone().ok_or(NoteTransportError::Disabled)
63    }
64
65    /// Send a note through the note transport network.
66    ///
67    /// The note will be end-to-end encrypted (unimplemented, currently plaintext)
68    /// using the provided recipient's `address` details.
69    /// The recipient will be able to retrieve this note through the note's [`NoteTag`].
70    ///
71    /// **Durability.** The relay payload is persisted to the outbox before the
72    /// transport call. If the call fails or is interrupted, the entry stays in
73    /// the outbox and is retried on the next [`Client::flush_relay_outbox`]
74    /// (which [`Client::sync_note_transport`] runs), so a transient transport
75    /// failure does not drop the note. The receiver dedupes by note id, so a
76    /// re-send after a partial success is harmless.
77    ///
78    /// Prefer [`Client::send_private_note_with_block_hint`], which also relays a block hint so the
79    /// recipient gets deterministic delivery instead of relying on its lookback heuristic.
80    #[deprecated(
81        since = "0.15.2",
82        note = "use `Client::send_private_note_with_block_hint` to relay a block hint for deterministic delivery"
83    )]
84    pub async fn send_private_note(
85        &mut self,
86        note: Note,
87        address: &Address,
88    ) -> Result<(), ClientError> {
89        self.relay_private_note(note, address, None).await
90    }
91
92    /// Send a note through the note transport network, relaying a block hint to the recipient.
93    ///
94    /// `block_hint` is the block from which the recipient should start scanning for the note's
95    /// on-chain commitment, instead of relying on its lookback heuristic. Any block at or before
96    /// the commitment is correct, and the chain tip at send time is a safe choice. A tighter value
97    /// just means less for the recipient to scan.
98    ///
99    /// The same durability guarantees as [`Client::send_private_note`] apply: the hint is
100    /// persisted with the relay payload, so a retried send preserves it.
101    pub async fn send_private_note_with_block_hint(
102        &mut self,
103        note: Note,
104        address: &Address,
105        block_hint: BlockNumber,
106    ) -> Result<(), ClientError> {
107        self.relay_private_note(note, address, Some(block_hint)).await
108    }
109
110    /// Shared relay path for [`Client::send_private_note`] and
111    /// [`Client::send_private_note_with_block_hint`]. `block_hint` is the optional block from which
112    /// the recipient should start scanning for the note's commitment.
113    async fn relay_private_note(
114        &self,
115        note: Note,
116        _address: &Address,
117        block_hint: Option<BlockNumber>,
118    ) -> Result<(), ClientError> {
119        let api = self.get_note_transport_api()?;
120
121        let header = *note.header();
122        let note_id = header.id();
123        let details = NoteDetails::from(note);
124        let details_bytes = details.to_bytes();
125        // e2ee impl hint:
126        // address.key().encrypt(details_bytes)
127
128        // Persist the payload before the network call so a failed or
129        // interrupted `send_note` leaves a recoverable record rather than
130        // losing the only copy with the call frame. The hint travels with the
131        // entry so a retried send relays the same value.
132        let entry = NoteInfo {
133            header,
134            details_bytes: details_bytes.clone(),
135            block_hint,
136        };
137        let mut outbox = self.load_relay_outbox().await?;
138        // Replace any existing entry for this note id so the latest payload
139        // wins when a still-pending note is re-sent.
140        outbox.retain(|e| e.header.id() != note_id);
141        outbox.push(entry);
142        self.save_relay_outbox(outbox).await?;
143
144        // Dispatch to the hint-carrying API only when a hint is present, otherwise use the plain
145        // `send_note`. The transport exposes a separate method per scenario.
146        match block_hint {
147            Some(block_hint) => {
148                api.send_note_with_block_hint(header, details_bytes, block_hint).await?;
149            },
150            None => {
151                api.send_note(header, details_bytes).await?;
152            },
153        }
154
155        // Relay succeeded — drop the entry. A failed store write here is
156        // tolerable: the next flush re-sends and the receiver dedupes by note
157        // id, so a stale entry never causes loss.
158        let mut outbox = self.load_relay_outbox().await?;
159        outbox.retain(|e| e.header.id() != note_id);
160        self.save_relay_outbox(outbox).await?;
161
162        Ok(())
163    }
164
165    /// Re-attempt every relay payload in the durable outbox. Each entry is a
166    /// private note whose previous transport delivery failed. Successful
167    /// re-sends are dropped; failures are kept for the next call. Every entry
168    /// is attempted independently, so one persistently-failing note does not
169    /// block the others.
170    ///
171    /// [`Client::sync_note_transport`] runs this automatically and ignores its
172    /// error, so a relay failure can't block a sync. Callers driving retries
173    /// themselves can invoke it directly and inspect the returned error.
174    pub async fn flush_relay_outbox(&self) -> Result<(), ClientError> {
175        let api = self.get_note_transport_api()?;
176
177        let entries = self.load_relay_outbox().await?;
178        if entries.is_empty() {
179            return Ok(());
180        }
181
182        // Attempt every entry independently so a single persistently-failing
183        // note can't block the rest. The outbox holds only the caller's own
184        // failed sends, so it stays small and this is not a meaningful burst.
185        let mut remaining = Vec::new();
186        let mut last_err: Option<NoteTransportError> = None;
187
188        for entry in entries {
189            let relayed = match entry.block_hint {
190                Some(block_hint) => {
191                    api.send_note_with_block_hint(
192                        entry.header,
193                        entry.details_bytes.clone(),
194                        block_hint,
195                    )
196                    .await
197                },
198                None => api.send_note(entry.header, entry.details_bytes.clone()).await,
199            };
200            match relayed {
201                Ok(()) => {},
202                Err(err) => {
203                    tracing::warn!(?err, "relay-outbox entry retry failed; will retry next sync");
204                    remaining.push(entry);
205                    last_err = Some(err);
206                },
207            }
208        }
209
210        self.save_relay_outbox(remaining).await?;
211
212        if let Some(err) = last_err {
213            return Err(err.into());
214        }
215        Ok(())
216    }
217
218    /// Load the durable relay outbox.
219    ///
220    /// Returns an empty `Vec` if the outbox key is absent. On deserialization
221    /// failure (schema mismatch or storage corruption) the entry is dropped and
222    /// an empty `Vec` is returned — leaving unreadable bytes in place would
223    /// block every subsequent relay because each sync would re-read them.
224    async fn load_relay_outbox(&self) -> Result<Vec<NoteInfo>, ClientError> {
225        let bytes = self
226            .store
227            .get_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
228            .await
229            .map_err(ClientError::StoreError)?;
230        let Some(bytes) = bytes else {
231            return Ok(Vec::new());
232        };
233        match Vec::<NoteInfo>::read_from_bytes(&bytes) {
234            Ok(entries) => Ok(entries),
235            Err(err) => {
236                // TODO: remove once #2265 is ported to `next`. Recover a pre-`block_hint`
237                // outbox blob via the legacy (no-hint) layout so a pending relay survives upgrade.
238                if let Ok(legacy) = Vec::<LegacyNoteInfo>::read_from_bytes(&bytes) {
239                    return Ok(legacy
240                        .into_iter()
241                        .map(|note| NoteInfo::new(note.header, note.details_bytes))
242                        .collect());
243                }
244                tracing::warn!(?err, "dropping unreadable relay outbox; resetting to empty");
245                self.store
246                    .remove_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
247                    .await
248                    .map_err(ClientError::StoreError)?;
249                Ok(Vec::new())
250            },
251        }
252    }
253
254    /// Persist the relay outbox, removing the key entirely when empty so the
255    /// settings table doesn't accumulate empty-vec blobs.
256    async fn save_relay_outbox(&self, entries: Vec<NoteInfo>) -> Result<(), ClientError> {
257        let key = String::from(NOTE_TRANSPORT_OUTBOX_KEY);
258        if entries.is_empty() {
259            return self.store.remove_setting(key).await.map_err(ClientError::StoreError);
260        }
261        let bytes = entries.to_bytes();
262        self.store.set_setting(key, bytes).await.map_err(ClientError::StoreError)
263    }
264}
265
266impl<AUTH> Client<AUTH>
267where
268    AUTH: TransactionAuthenticator + Sync + 'static,
269{
270    /// Fetch notes for tracked note tags.
271    ///
272    /// The client will query the configured note transport node for all tracked note tags.
273    /// To list tracked tags please use [`Client::get_note_tags`]. To add a new note tag please use
274    /// [`Client::add_note_tag`].
275    /// Only notes directed at your addresses will be stored and readable given the use of
276    /// end-to-end encryption (unimplemented).
277    /// Fetched notes will be stored into the client's store.
278    ///
279    /// An internal pagination mechanism is employed to reduce the number of downloaded notes.
280    /// To fetch the full history of private notes for the tracked tags, use
281    /// [`Client::fetch_all_private_notes`].
282    pub async fn fetch_private_notes(&mut self) -> Result<(), ClientError> {
283        let note_tags: Vec<NoteTag> =
284            self.store.get_unique_note_tags().await?.into_iter().collect();
285        let cursor = self.store.get_note_transport_cursor().await?;
286
287        let (_, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
288        self.store.update_note_transport_cursor(new_cursor).await?;
289
290        Ok(())
291    }
292
293    /// Fetches all notes for tracked note tags, draining the server's paginated
294    /// response by looping until the cursor stops advancing.
295    ///
296    /// Similar to [`Client::fetch_private_notes`] but ignores the stored
297    /// pagination cursor and re-scans from the beginning. The server-side
298    /// transport caps each response at a fixed batch size; this method issues
299    /// repeated fetch calls until one returns the same cursor it was given
300    /// (i.e. no new notes), so the documented "fetches all notes" semantics
301    /// hold regardless of how large the backlog is. Prefer
302    /// [`Client::fetch_private_notes`] for steady-state syncing to avoid
303    /// re-downloading already-seen notes.
304    pub async fn fetch_all_private_notes(&mut self) -> Result<(), ClientError> {
305        // Safety cap on a misbehaving server. At 500 notes per batch, 1000
306        // iterations covers 500k notes — well beyond any plausible retention
307        // window — and bounds the worst-case wall-clock at ~50s at 50ms/req.
308        // Hitting this signals a server bug, not an honest backlog.
309        const MAX_ITERATIONS: usize = 1_000;
310
311        let note_tags: Vec<NoteTag> =
312            self.store.get_unique_note_tags().await?.into_iter().collect();
313        // Snapshot the stored cursor up front so we can advance (never regress)
314        // it after the drain. Without this guard, starting the drain at
315        // `init()` and persisting per-batch would clobber a previously
316        // advanced cursor with the small `rcursor` of the first batch.
317        let stored_cursor = self.store.get_note_transport_cursor().await?;
318
319        let mut cursor = NoteTransportCursor::init();
320        for _ in 0..MAX_ITERATIONS {
321            let (_, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
322            // Terminate on any lack of forward progress. A well-behaved server
323            // returns `new_cursor == cursor` when there are no new notes (since
324            // `rcursor = max(cursor, max_seq_returned)`); using `<=` here also
325            // handles implementations that return an `init()` cursor on empty
326            // batches (see the in-tree mock transport).
327            if new_cursor <= cursor {
328                let final_cursor = core::cmp::max(cursor, stored_cursor);
329                self.store.update_note_transport_cursor(final_cursor).await?;
330                return Ok(());
331            }
332            cursor = new_cursor;
333        }
334
335        Err(ClientError::NoteTransportError(NoteTransportError::PaginationDidNotTerminate(
336            MAX_ITERATIONS,
337        )))
338    }
339
340    /// Fetch one batch of notes from the note transport network for the provided tags.
341    ///
342    /// The server paginates; this method issues one RPC and returns the imported details
343    /// commitments together with the new cursor. The returned cursor equals the input cursor when
344    /// the batch was empty (i.e. no new notes). Callers that want to drain the full backlog should
345    /// loop until `new_cursor == cursor` (see [`Client::fetch_all_private_notes`]). Callers that do
346    /// steady-state polling (see [`Client::sync_state`] / [`Client::fetch_private_notes`]) should
347    /// call this once per tick with the stored cursor.
348    ///
349    /// Downloaded notes are imported into the local store. Persistence of the returned cursor is
350    /// left to the caller so that drain loops can guard against regression of an already-advanced
351    /// stored cursor.
352    pub(crate) async fn fetch_transport_notes(
353        &mut self,
354        cursor: NoteTransportCursor,
355        tags: &[NoteTag],
356    ) -> Result<(Vec<NoteId>, NoteTransportCursor), ClientError> {
357        // Fallback lookback window, in blocks, used only for notes the transport delivered
358        // without a sender-provided block hint. Scanning back from sync height handles
359        // the race where a note is committed on-chain just before the NTL delivers its data.
360        // Without it, check_expected_notes would scan from sync_height forward and miss the
361        // already-committed note. A sender-provided hint is deterministic and always preferred.
362        const NOTE_LOOKBACK_BLOCKS: u32 = 20;
363
364        let mut notes = Vec::new();
365        let (note_infos, rcursor) =
366            self.get_note_transport_api()?.fetch_notes(tags, cursor).await?;
367        for note_info in &note_infos {
368            // e2ee impl hint:
369            // for key in self.store.decryption_keys() try
370            // key.decrypt(details_bytes_encrypted)
371            let note = rejoin_note(&note_info.header, &note_info.details_bytes)?;
372            notes.push((note, note_info.block_hint));
373        }
374
375        let sync_height = self.get_sync_height().await?;
376        let fallback_after_block_num =
377            BlockNumber::from(sync_height.as_u32().saturating_sub(NOTE_LOOKBACK_BLOCKS));
378
379        let id_by_commitment: BTreeMap<NoteDetailsCommitment, NoteId> =
380            notes.iter().map(|(note, _)| (note.details_commitment(), note.id())).collect();
381
382        let mut note_requests = Vec::with_capacity(notes.len());
383        for (note, block_hint) in notes {
384            let tag = note.metadata().tag();
385            // Prefer the sender-provided hint, falling back to the lookback window when absent.
386            let after_block_num = block_hint.unwrap_or(fallback_after_block_num);
387            let note_file = NoteFile::NoteDetails {
388                details: note.into(),
389                after_block_num,
390                tag: Some(tag),
391            };
392            note_requests.push(note_file);
393        }
394        let imported_commitments = self.import_notes(&note_requests).await?;
395        let imported_ids = imported_commitments
396            .into_iter()
397            .filter_map(|commitment| id_by_commitment.get(&commitment).copied())
398            .collect();
399
400        Ok((imported_ids, rcursor))
401    }
402}
403
/// Pagination cursor for the note transport network.
///
/// A thin wrapper around a `u64` that is sent with fetch requests so that only notes past the
/// cursor are downloaded, avoiding duplicate downloads. Cursors compare by their inner value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct NoteTransportCursor(u64);
410
411/// Note Transport update
///
/// Pairs a pagination cursor with a list of notes.
///
/// NOTE(review): this type is not referenced by the code visible in this module — confirm
/// where it is produced and consumed.
412pub struct NoteTransportUpdate {
413    /// Pagination cursor for next fetch
414    pub cursor: NoteTransportCursor,
415    /// Fetched notes
416    pub notes: Vec<Note>,
417}
418
419impl NoteTransportCursor {
420    pub fn new(value: u64) -> Self {
421        Self(value)
422    }
423
424    pub fn init() -> Self {
425        Self::new(0)
426    }
427
428    pub fn value(&self) -> u64 {
429        self.0
430    }
431}
432
433impl From<u64> for NoteTransportCursor {
434    fn from(value: u64) -> Self {
435        Self::new(value)
436    }
437}
438
439/// The main transport client trait for sending and receiving encrypted notes
///
/// Implementors must be `Send + Sync`. The `async_trait` expansion is chosen per target: the
/// default form on non-`wasm32` targets and the `?Send` form on `wasm32`.
440#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
441#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
442pub trait NoteTransportClient: Send + Sync {
443    /// Send a note with optionally encrypted details
    ///
    /// `header` is the note's header and `details` the serialized note details.
444    async fn send_note(
445        &self,
446        header: NoteHeader,
447        details: Vec<u8>,
448    ) -> Result<(), NoteTransportError>;
449
450    /// Send a note, relaying a block hint for the recipient's commitment scan.
451    ///
452    /// `block_hint` is the block from which the recipient should start scanning for the
453    /// note's commitment. The default implementation ignores it and delegates to
454    /// [`NoteTransportClient::send_note`], so existing implementors keep compiling. Transports
455    /// that can carry the hint (e.g. the gRPC client) override this.
456    async fn send_note_with_block_hint(
457        &self,
458        header: NoteHeader,
459        details: Vec<u8>,
460        _block_hint: BlockNumber,
461    ) -> Result<(), NoteTransportError> {
462        self.send_note(header, details).await
463    }
464
465    /// Fetch notes for given tags
466    ///
467    /// Downloads notes for given tags.
468    /// Returns notes labelled after the provided cursor (pagination), and an updated cursor.
    ///
    /// Note that the parameter is named `tag` but carries a slice of tags.
469    async fn fetch_notes(
470        &self,
471        tag: &[NoteTag],
472        cursor: NoteTransportCursor,
473    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError>;
474
475    /// Stream notes for a given tag
    ///
    /// Takes a single `tag` and a `cursor`, and returns a boxed [`NoteStream`] whose items are
    /// batches of [`NoteInfo`].
476    async fn stream_notes(
477        &self,
478        tag: NoteTag,
479        cursor: NoteTransportCursor,
480    ) -> Result<Box<dyn NoteStream>, NoteTransportError>;
481}
482
483/// Stream trait for note streaming
///
/// A marker trait with no methods of its own: it names a `Send + Unpin` [`Stream`] whose items
/// are either a batch of [`NoteInfo`] or a [`NoteTransportError`].
484pub trait NoteStream:
485    Stream<Item = Result<Vec<NoteInfo>, NoteTransportError>> + Send + Unpin
486{
487}
488
489/// Information about a note fetched from the note transport network
///
/// The same type is used on the sending side as the element type of the durable relay outbox,
/// where it holds a payload whose transport delivery has not yet succeeded.
490#[derive(Debug, Clone)]
491pub struct NoteInfo {
492    /// Note header
493    pub header: NoteHeader,
494    /// Note details, can be encrypted
495    pub details_bytes: Vec<u8>,
496    /// Sender-provided block hint: the block from which the recipient should start scanning for
497    /// the note's on-chain commitment, instead of applying its default lookback window. `None`
498    /// when the sender did not provide a hint.
499    pub block_hint: Option<BlockNumber>,
500}
501
502impl NoteInfo {
503    /// Build a [`NoteInfo`] without a block hint (`block_hint` is `None`).
504    ///
505    /// Use the [`NoteInfo::block_hint`] field directly to attach a hint.
506    pub fn new(header: NoteHeader, details_bytes: Vec<u8>) -> Self {
507        Self { header, details_bytes, block_hint: None }
508    }
509}
510
511// SERIALIZATION
512// ================================================================================================
513
514impl Serializable for NoteInfo {
515    fn write_into<W: ByteWriter>(&self, target: &mut W) {
516        self.header.write_into(target);
517        self.details_bytes.write_into(target);
518        self.block_hint.write_into(target);
519    }
520}
521
522impl Deserializable for NoteInfo {
523    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
524        let header = NoteHeader::read_from(source)?;
525        let details_bytes = Vec::<u8>::read_from(source)?;
526        let block_hint = Option::<BlockNumber>::read_from(source)?;
527        Ok(NoteInfo { header, details_bytes, block_hint })
528    }
529}
530
531// TODO: remove once #2265 is ported to `next`. Pre-`block_hint` on-disk layout of [`NoteInfo`]
532// (header + details only); used only by `load_relay_outbox` to recover blobs written by a
533// pre-0.15.2 client.
534struct LegacyNoteInfo {
535    header: NoteHeader,
536    details_bytes: Vec<u8>,
537}
538
539impl Deserializable for LegacyNoteInfo {
540    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
541        let header = NoteHeader::read_from(source)?;
542        let details_bytes = Vec::<u8>::read_from(source)?;
543        Ok(LegacyNoteInfo { header, details_bytes })
544    }
545}
546
547impl Serializable for NoteTransportCursor {
548    fn write_into<W: ByteWriter>(&self, target: &mut W) {
549        self.0.write_into(target);
550    }
551}
552
553impl Deserializable for NoteTransportCursor {
554    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
555        let value = u64::read_from(source)?;
556        Ok(Self::new(value))
557    }
558}
559
560fn rejoin_note(header: &NoteHeader, details_bytes: &[u8]) -> Result<Note, DeserializationError> {
561    let mut reader = SliceReader::new(details_bytes);
562    let details = NoteDetails::read_from(&mut reader)?;
563    // The transport wire format only carries `NoteHeader` + serialized `NoteDetails`, not the
564    // attachments collection. We rejoin with empty attachments; this matches the original note
565    // only when it had no attachments in the first place.
566    let partial_metadata = *header.metadata().partial_metadata();
567    Ok(Note::new(
568        details.assets().clone(),
569        partial_metadata,
570        details.recipient().clone(),
571    ))
572}