// miden_client/note_transport/mod.rs

1pub mod errors;
2pub mod generated;
3#[cfg(feature = "tonic")]
4pub mod grpc;
5
6use alloc::boxed::Box;
7use alloc::collections::BTreeMap;
8use alloc::string::String;
9use alloc::sync::Arc;
10use alloc::vec::Vec;
11
12use futures::Stream;
13use miden_protocol::address::Address;
14use miden_protocol::block::BlockNumber;
15use miden_protocol::note::{
16    Note,
17    NoteDetails,
18    NoteDetailsCommitment,
19    NoteFile,
20    NoteHeader,
21    NoteId,
22    NoteTag,
23};
24use miden_protocol::utils::serde::Serializable;
25use miden_tx::auth::TransactionAuthenticator;
26use miden_tx::utils::serde::{
27    ByteReader,
28    ByteWriter,
29    Deserializable,
30    DeserializationError,
31    SliceReader,
32};
33
34pub use self::errors::NoteTransportError;
35use crate::{Client, ClientError};
36
/// URL of the note transport endpoint for the testnet.
pub const NOTE_TRANSPORT_TESTNET_ENDPOINT: &str = "https://transport.miden.io";
/// URL of the note transport endpoint for the devnet.
pub const NOTE_TRANSPORT_DEVNET_ENDPOINT: &str = "https://transport.devnet.miden.io";
/// Settings key for the note transport pagination cursor.
///
/// NOTE(review): the key is not read in this module; presumably the store uses
/// it to persist the cursor — confirm against the store implementation.
pub const NOTE_TRANSPORT_CURSOR_STORE_SETTING: &str = "note_transport_cursor";

/// Settings key for the durable relay outbox.
///
/// The value stored under this key is a serialized `Vec<NoteInfo>` holding the
/// private notes whose transport delivery has not yet succeeded.
/// `send_private_note` appends to it before relaying (replacing any entry with
/// the same note id), and [`Client::flush_relay_outbox`] drains the entries
/// that re-send successfully. Reusing the settings key/value table avoids a
/// Store-trait schema change while still surviving process restarts.
pub const NOTE_TRANSPORT_OUTBOX_KEY: &str = "note_transport_outbox";
48
49/// Client note transport methods.
50impl<AUTH> Client<AUTH> {
51    /// Check if note transport connection is configured
52    pub fn is_note_transport_enabled(&self) -> bool {
53        self.note_transport_api.is_some()
54    }
55
56    /// Returns the Note Transport client
57    ///
58    /// Errors if the note transport is not configured.
59    pub(crate) fn get_note_transport_api(
60        &self,
61    ) -> Result<Arc<dyn NoteTransportClient>, NoteTransportError> {
62        self.note_transport_api.clone().ok_or(NoteTransportError::Disabled)
63    }
64
65    /// Send a note through the note transport network.
66    ///
67    /// The note will be end-to-end encrypted (unimplemented, currently plaintext)
68    /// using the provided recipient's `address` details.
69    /// The recipient will be able to retrieve this note through the note's [`NoteTag`].
70    ///
71    /// **Durability.** The relay payload is persisted to the outbox before the
72    /// transport call. If the call fails or is interrupted, the entry stays in
73    /// the outbox and is retried on the next [`Client::flush_relay_outbox`]
74    /// (which [`Client::sync_note_transport`] runs), so a transient transport
75    /// failure does not drop the note. The receiver dedupes by note id, so a
76    /// re-send after a partial success is harmless.
77    pub async fn send_private_note(
78        &mut self,
79        note: Note,
80        _address: &Address,
81    ) -> Result<(), ClientError> {
82        let api = self.get_note_transport_api()?;
83
84        let header = *note.header();
85        let note_id = header.id();
86        let details = NoteDetails::from(note);
87        let details_bytes = details.to_bytes();
88        // e2ee impl hint:
89        // address.key().encrypt(details_bytes)
90
91        // Persist the payload before the network call so a failed or
92        // interrupted `send_note` leaves a recoverable record rather than
93        // losing the only copy with the call frame.
94        let entry = NoteInfo {
95            header,
96            details_bytes: details_bytes.clone(),
97        };
98        let mut outbox = self.load_relay_outbox().await?;
99        // Replace any existing entry for this note id so the latest payload
100        // wins when a still-pending note is re-sent.
101        outbox.retain(|e| e.header.id() != note_id);
102        outbox.push(entry);
103        self.save_relay_outbox(outbox).await?;
104
105        api.send_note(header, details_bytes).await?;
106
107        // Relay succeeded — drop the entry. A failed store write here is
108        // tolerable: the next flush re-sends and the receiver dedupes by note
109        // id, so a stale entry never causes loss.
110        let mut outbox = self.load_relay_outbox().await?;
111        outbox.retain(|e| e.header.id() != note_id);
112        self.save_relay_outbox(outbox).await?;
113
114        Ok(())
115    }
116
117    /// Re-attempt every relay payload in the durable outbox. Each entry is a
118    /// private note whose previous transport delivery failed. Successful
119    /// re-sends are dropped; failures are kept for the next call. Every entry
120    /// is attempted independently, so one persistently-failing note does not
121    /// block the others.
122    ///
123    /// [`Client::sync_note_transport`] runs this automatically and ignores its
124    /// error, so a relay failure can't block a sync. Callers driving retries
125    /// themselves can invoke it directly and inspect the returned error.
126    pub async fn flush_relay_outbox(&self) -> Result<(), ClientError> {
127        let api = self.get_note_transport_api()?;
128
129        let entries = self.load_relay_outbox().await?;
130        if entries.is_empty() {
131            return Ok(());
132        }
133
134        // Attempt every entry independently so a single persistently-failing
135        // note can't block the rest. The outbox holds only the caller's own
136        // failed sends, so it stays small and this is not a meaningful burst.
137        let mut remaining = Vec::new();
138        let mut last_err: Option<NoteTransportError> = None;
139
140        for entry in entries {
141            match api.send_note(entry.header, entry.details_bytes.clone()).await {
142                Ok(()) => {},
143                Err(err) => {
144                    tracing::warn!(?err, "relay-outbox entry retry failed; will retry next sync");
145                    remaining.push(entry);
146                    last_err = Some(err);
147                },
148            }
149        }
150
151        self.save_relay_outbox(remaining).await?;
152
153        if let Some(err) = last_err {
154            return Err(err.into());
155        }
156        Ok(())
157    }
158
159    /// Load the durable relay outbox.
160    ///
161    /// Returns an empty `Vec` if the outbox key is absent. On deserialization
162    /// failure (schema mismatch or storage corruption) the entry is dropped and
163    /// an empty `Vec` is returned — leaving unreadable bytes in place would
164    /// block every subsequent relay because each sync would re-read them.
165    async fn load_relay_outbox(&self) -> Result<Vec<NoteInfo>, ClientError> {
166        let bytes = self
167            .store
168            .get_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
169            .await
170            .map_err(ClientError::StoreError)?;
171        let Some(bytes) = bytes else {
172            return Ok(Vec::new());
173        };
174        match Vec::<NoteInfo>::read_from_bytes(&bytes) {
175            Ok(entries) => Ok(entries),
176            Err(err) => {
177                tracing::warn!(?err, "dropping unreadable relay outbox; resetting to empty");
178                self.store
179                    .remove_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
180                    .await
181                    .map_err(ClientError::StoreError)?;
182                Ok(Vec::new())
183            },
184        }
185    }
186
187    /// Persist the relay outbox, removing the key entirely when empty so the
188    /// settings table doesn't accumulate empty-vec blobs.
189    async fn save_relay_outbox(&self, entries: Vec<NoteInfo>) -> Result<(), ClientError> {
190        let key = String::from(NOTE_TRANSPORT_OUTBOX_KEY);
191        if entries.is_empty() {
192            return self.store.remove_setting(key).await.map_err(ClientError::StoreError);
193        }
194        let bytes = entries.to_bytes();
195        self.store.set_setting(key, bytes).await.map_err(ClientError::StoreError)
196    }
197}
198
199impl<AUTH> Client<AUTH>
200where
201    AUTH: TransactionAuthenticator + Sync + 'static,
202{
203    /// Fetch notes for tracked note tags.
204    ///
205    /// The client will query the configured note transport node for all tracked note tags.
206    /// To list tracked tags please use [`Client::get_note_tags`]. To add a new note tag please use
207    /// [`Client::add_note_tag`].
208    /// Only notes directed at your addresses will be stored and readable given the use of
209    /// end-to-end encryption (unimplemented).
210    /// Fetched notes will be stored into the client's store.
211    ///
212    /// An internal pagination mechanism is employed to reduce the number of downloaded notes.
213    /// To fetch the full history of private notes for the tracked tags, use
214    /// [`Client::fetch_all_private_notes`].
215    pub async fn fetch_private_notes(&mut self) -> Result<(), ClientError> {
216        let note_tags: Vec<NoteTag> =
217            self.store.get_unique_note_tags().await?.into_iter().collect();
218        let cursor = self.store.get_note_transport_cursor().await?;
219
220        let (_, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
221        self.store.update_note_transport_cursor(new_cursor).await?;
222
223        Ok(())
224    }
225
226    /// Fetches all notes for tracked note tags, draining the server's paginated
227    /// response by looping until the cursor stops advancing.
228    ///
229    /// Similar to [`Client::fetch_private_notes`] but ignores the stored
230    /// pagination cursor and re-scans from the beginning. The server-side
231    /// transport caps each response at a fixed batch size; this method issues
232    /// repeated fetch calls until one returns the same cursor it was given
233    /// (i.e. no new notes), so the documented "fetches all notes" semantics
234    /// hold regardless of how large the backlog is. Prefer
235    /// [`Client::fetch_private_notes`] for steady-state syncing to avoid
236    /// re-downloading already-seen notes.
237    pub async fn fetch_all_private_notes(&mut self) -> Result<(), ClientError> {
238        // Safety cap on a misbehaving server. At 500 notes per batch, 1000
239        // iterations covers 500k notes — well beyond any plausible retention
240        // window — and bounds the worst-case wall-clock at ~50s at 50ms/req.
241        // Hitting this signals a server bug, not an honest backlog.
242        const MAX_ITERATIONS: usize = 1_000;
243
244        let note_tags: Vec<NoteTag> =
245            self.store.get_unique_note_tags().await?.into_iter().collect();
246        // Snapshot the stored cursor up front so we can advance (never regress)
247        // it after the drain. Without this guard, starting the drain at
248        // `init()` and persisting per-batch would clobber a previously
249        // advanced cursor with the small `rcursor` of the first batch.
250        let stored_cursor = self.store.get_note_transport_cursor().await?;
251
252        let mut cursor = NoteTransportCursor::init();
253        for _ in 0..MAX_ITERATIONS {
254            let (_, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
255            // Terminate on any lack of forward progress. A well-behaved server
256            // returns `new_cursor == cursor` when there are no new notes (since
257            // `rcursor = max(cursor, max_seq_returned)`); using `<=` here also
258            // handles implementations that return an `init()` cursor on empty
259            // batches (see the in-tree mock transport).
260            if new_cursor <= cursor {
261                let final_cursor = core::cmp::max(cursor, stored_cursor);
262                self.store.update_note_transport_cursor(final_cursor).await?;
263                return Ok(());
264            }
265            cursor = new_cursor;
266        }
267
268        Err(ClientError::NoteTransportError(NoteTransportError::PaginationDidNotTerminate(
269            MAX_ITERATIONS,
270        )))
271    }
272
273    /// Fetch one batch of notes from the note transport network for the provided tags.
274    ///
275    /// The server paginates; this method issues one RPC and returns the imported details
276    /// commitments together with the new cursor. The returned cursor equals the input cursor when
277    /// the batch was empty (i.e. no new notes). Callers that want to drain the full backlog should
278    /// loop until `new_cursor == cursor` (see [`Client::fetch_all_private_notes`]). Callers that do
279    /// steady-state polling (see [`Client::sync_state`] / [`Client::fetch_private_notes`]) should
280    /// call this once per tick with the stored cursor.
281    ///
282    /// Downloaded notes are imported into the local store. Persistence of the returned cursor is
283    /// left to the caller so that drain loops can guard against regression of an already-advanced
284    /// stored cursor.
285    pub(crate) async fn fetch_transport_notes(
286        &mut self,
287        cursor: NoteTransportCursor,
288        tags: &[NoteTag],
289    ) -> Result<(Vec<NoteId>, NoteTransportCursor), ClientError> {
290        // Number of blocks to look back from sync height when scanning for committed notes.
291        // Handles the race where a note is committed on-chain just before the NTL delivers
292        // its data — without this, check_expected_notes would scan from sync_height forward
293        // and miss the already-committed note.
294        const NOTE_LOOKBACK_BLOCKS: u32 = 20;
295
296        let mut notes = Vec::new();
297        let (note_infos, rcursor) =
298            self.get_note_transport_api()?.fetch_notes(tags, cursor).await?;
299        for note_info in &note_infos {
300            // e2ee impl hint:
301            // for key in self.store.decryption_keys() try
302            // key.decrypt(details_bytes_encrypted)
303            let note = rejoin_note(&note_info.header, &note_info.details_bytes)?;
304            notes.push(note);
305        }
306
307        let sync_height = self.get_sync_height().await?;
308        let after_block_num =
309            BlockNumber::from(sync_height.as_u32().saturating_sub(NOTE_LOOKBACK_BLOCKS));
310
311        let id_by_commitment: BTreeMap<NoteDetailsCommitment, NoteId> =
312            notes.iter().map(|note| (note.details_commitment(), note.id())).collect();
313
314        let mut note_requests = Vec::with_capacity(notes.len());
315        for note in notes {
316            let tag = note.metadata().tag();
317            let note_file = NoteFile::NoteDetails {
318                details: note.into(),
319                after_block_num,
320                tag: Some(tag),
321            };
322            note_requests.push(note_file);
323        }
324        let imported_commitments = self.import_notes(&note_requests).await?;
325        let imported_ids = imported_commitments
326            .into_iter()
327            .filter_map(|commitment| id_by_commitment.get(&commitment).copied())
328            .collect();
329
330        Ok((imported_ids, rcursor))
331    }
332}
333
/// Pagination cursor for the note transport network.
///
/// Wraps the integer that is handed to the transport when fetching so that
/// notes which were already downloaded are not fetched again. Cursors compare
/// and order by their wrapped value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct NoteTransportCursor(u64);
340
341/// Note Transport update
342pub struct NoteTransportUpdate {
343    /// Pagination cursor for next fetch
344    pub cursor: NoteTransportCursor,
345    /// Fetched notes
346    pub notes: Vec<Note>,
347}
348
349impl NoteTransportCursor {
350    pub fn new(value: u64) -> Self {
351        Self(value)
352    }
353
354    pub fn init() -> Self {
355        Self::new(0)
356    }
357
358    pub fn value(&self) -> u64 {
359        self.0
360    }
361}
362
363impl From<u64> for NoteTransportCursor {
364    fn from(value: u64) -> Self {
365        Self::new(value)
366    }
367}
368
369/// The main transport client trait for sending and receiving encrypted notes
370#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
371#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
372pub trait NoteTransportClient: Send + Sync {
373    /// Send a note with optionally encrypted details
374    async fn send_note(
375        &self,
376        header: NoteHeader,
377        details: Vec<u8>,
378    ) -> Result<(), NoteTransportError>;
379
380    /// Fetch notes for given tags
381    ///
382    /// Downloads notes for given tags.
383    /// Returns notes labelled after the provided cursor (pagination), and an updated cursor.
384    async fn fetch_notes(
385        &self,
386        tag: &[NoteTag],
387        cursor: NoteTransportCursor,
388    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError>;
389
390    /// Stream notes for a given tag
391    async fn stream_notes(
392        &self,
393        tag: NoteTag,
394        cursor: NoteTransportCursor,
395    ) -> Result<Box<dyn NoteStream>, NoteTransportError>;
396}
397
398/// Stream trait for note streaming
399pub trait NoteStream:
400    Stream<Item = Result<Vec<NoteInfo>, NoteTransportError>> + Send + Unpin
401{
402}
403
404/// Information about a note fetched from the note transport network
405#[derive(Debug, Clone)]
406pub struct NoteInfo {
407    /// Note header
408    pub header: NoteHeader,
409    /// Note details, can be encrypted
410    pub details_bytes: Vec<u8>,
411}
412
413// SERIALIZATION
414// ================================================================================================
415
416impl Serializable for NoteInfo {
417    fn write_into<W: ByteWriter>(&self, target: &mut W) {
418        self.header.write_into(target);
419        self.details_bytes.write_into(target);
420    }
421}
422
423impl Deserializable for NoteInfo {
424    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
425        let header = NoteHeader::read_from(source)?;
426        let details_bytes = Vec::<u8>::read_from(source)?;
427        Ok(NoteInfo { header, details_bytes })
428    }
429}
430
431impl Serializable for NoteTransportCursor {
432    fn write_into<W: ByteWriter>(&self, target: &mut W) {
433        self.0.write_into(target);
434    }
435}
436
437impl Deserializable for NoteTransportCursor {
438    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
439        let value = u64::read_from(source)?;
440        Ok(Self::new(value))
441    }
442}
443
444fn rejoin_note(header: &NoteHeader, details_bytes: &[u8]) -> Result<Note, DeserializationError> {
445    let mut reader = SliceReader::new(details_bytes);
446    let details = NoteDetails::read_from(&mut reader)?;
447    // The transport wire format only carries `NoteHeader` + serialized `NoteDetails`, not the
448    // attachments collection. We rejoin with empty attachments; this matches the original note
449    // only when it had no attachments in the first place.
450    let partial_metadata = *header.metadata().partial_metadata();
451    Ok(Note::new(
452        details.assets().clone(),
453        partial_metadata,
454        details.recipient().clone(),
455    ))
456}