miden-client 0.15.0

Client library that facilitates interaction with the Miden network
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
pub mod errors;
pub mod generated;
#[cfg(feature = "tonic")]
pub mod grpc;

use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;

use futures::Stream;
use miden_protocol::address::Address;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{
    Note,
    NoteDetails,
    NoteDetailsCommitment,
    NoteFile,
    NoteHeader,
    NoteId,
    NoteTag,
};
use miden_protocol::utils::serde::Serializable;
use miden_tx::auth::TransactionAuthenticator;
use miden_tx::utils::serde::{
    ByteReader,
    ByteWriter,
    Deserializable,
    DeserializationError,
    SliceReader,
};

pub use self::errors::NoteTransportError;
use crate::{Client, ClientError};

/// URL of a note transport endpoint; going by the name, this is the testnet deployment.
pub const NOTE_TRANSPORT_TESTNET_ENDPOINT: &str = "https://transport.miden.io";
/// URL of a note transport endpoint; going by the name, this is the devnet deployment.
pub const NOTE_TRANSPORT_DEVNET_ENDPOINT: &str = "https://transport.devnet.miden.io";
/// Settings key named after the note transport cursor. It is not referenced by the code visible
/// in this module. NOTE(review): presumably the store persists the pagination cursor under this
/// key — confirm against the store implementation.
pub const NOTE_TRANSPORT_CURSOR_STORE_SETTING: &str = "note_transport_cursor";

/// Settings key for the durable relay outbox: a serialized `Vec<NoteInfo>` of
/// private notes whose transport delivery has not yet succeeded.
/// `send_private_note` appends (replacing any entry with the same note id)
/// before relaying; [`Client::flush_relay_outbox`] drains entries that re-send
/// successfully. Reusing the settings k/v avoids a Store-trait schema change
/// while surviving process restarts.
pub const NOTE_TRANSPORT_OUTBOX_KEY: &str = "note_transport_outbox";

/// Client note transport methods.
impl<AUTH> Client<AUTH> {
    /// Returns `true` when a note transport client is configured on this client.
    ///
    /// This only checks whether `note_transport_api` is set; it does not contact the transport
    /// endpoint.
    pub fn is_note_transport_enabled(&self) -> bool {
        self.note_transport_api.is_some()
    }

    /// Hands out a shared handle to the configured note transport client.
    ///
    /// # Errors
    ///
    /// Returns [`NoteTransportError::Disabled`] when no note transport client is configured.
    pub(crate) fn get_note_transport_api(
        &self,
    ) -> Result<Arc<dyn NoteTransportClient>, NoteTransportError> {
        match &self.note_transport_api {
            // Cloning the `Arc` only bumps the reference count.
            Some(api) => Ok(Arc::clone(api)),
            None => Err(NoteTransportError::Disabled),
        }
    }

    /// Send a note through the note transport network.
    ///
    /// The note will be end-to-end encrypted (unimplemented, currently plaintext)
    /// using the provided recipient's `address` details.
    /// The recipient will be able to retrieve this note through the note's [`NoteTag`].
    ///
    /// **Durability.** The relay payload is persisted to the outbox before the
    /// transport call. If the call fails or is interrupted, the entry stays in
    /// the outbox and is retried on the next [`Client::flush_relay_outbox`]
    /// (which [`Client::sync_note_transport`] runs), so a transient transport
    /// failure does not drop the note. The receiver dedupes by note id, so a
    /// re-send after a partial success is harmless.
    pub async fn send_private_note(
        &mut self,
        note: Note,
        _address: &Address,
    ) -> Result<(), ClientError> {
        let api = self.get_note_transport_api()?;

        let header = *note.header();
        let note_id = header.id();
        let details = NoteDetails::from(note);
        let details_bytes = details.to_bytes();
        // e2ee impl hint:
        // address.key().encrypt(details_bytes)

        // Persist the payload before the network call so a failed or
        // interrupted `send_note` leaves a recoverable record rather than
        // losing the only copy with the call frame.
        let entry = NoteInfo {
            header,
            details_bytes: details_bytes.clone(),
        };
        let mut outbox = self.load_relay_outbox().await?;
        // Replace any existing entry for this note id so the latest payload
        // wins when a still-pending note is re-sent.
        outbox.retain(|e| e.header.id() != note_id);
        outbox.push(entry);
        self.save_relay_outbox(outbox).await?;

        api.send_note(header, details_bytes).await?;

        // Relay succeeded — drop the entry. A failed store write here is
        // tolerable: the next flush re-sends and the receiver dedupes by note
        // id, so a stale entry never causes loss.
        let mut outbox = self.load_relay_outbox().await?;
        outbox.retain(|e| e.header.id() != note_id);
        self.save_relay_outbox(outbox).await?;

        Ok(())
    }

    /// Retries delivery of every payload currently held in the durable relay outbox.
    ///
    /// Each outbox entry is a private note whose earlier transport delivery did not
    /// succeed. Entries are attempted one by one and independently of each other:
    /// those that go through are removed, those that fail again are written back for
    /// the next call. A single persistently-failing note therefore never blocks the
    /// rest.
    ///
    /// [`Client::sync_note_transport`] runs this automatically and ignores its
    /// error, so a relay failure can't block a sync. Callers driving retries
    /// themselves can invoke it directly and inspect the returned error.
    ///
    /// # Errors
    ///
    /// Returns an error if the note transport is not configured, if the outbox cannot
    /// be read or written, or — after the outbox has been saved — the error of the
    /// last entry that failed to send.
    pub async fn flush_relay_outbox(&self) -> Result<(), ClientError> {
        let transport = self.get_note_transport_api()?;

        let pending = self.load_relay_outbox().await?;
        if pending.is_empty() {
            return Ok(());
        }

        // Every entry gets its own attempt. The outbox only holds this client's own
        // failed sends, so it stays small and the loop is not a meaningful burst.
        let mut undelivered = Vec::new();
        let mut latest_failure: Option<NoteTransportError> = None;

        for entry in pending {
            let outcome = transport.send_note(entry.header, entry.details_bytes.clone()).await;
            if let Err(err) = outcome {
                tracing::warn!(?err, "relay-outbox entry retry failed; will retry next sync");
                undelivered.push(entry);
                latest_failure = Some(err);
            }
        }

        // Persist what is still pending before surfacing any send failure.
        self.save_relay_outbox(undelivered).await?;

        match latest_failure {
            Some(err) => Err(err.into()),
            None => Ok(()),
        }
    }

    /// Reads the durable relay outbox from the settings store.
    ///
    /// An absent key yields an empty `Vec`. If the stored bytes cannot be
    /// deserialized (schema mismatch or storage corruption), the key is removed and
    /// an empty `Vec` is returned: keeping unreadable bytes around would make every
    /// later read hit the same failure.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::StoreError`] when reading the setting, or removing an
    /// unreadable one, fails.
    async fn load_relay_outbox(&self) -> Result<Vec<NoteInfo>, ClientError> {
        let stored = self
            .store
            .get_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
            .await
            .map_err(ClientError::StoreError)?;

        let raw = match stored {
            Some(raw) => raw,
            None => return Ok(Vec::new()),
        };

        let err = match Vec::<NoteInfo>::read_from_bytes(&raw) {
            Ok(entries) => return Ok(entries),
            Err(err) => err,
        };

        // Undecodable outbox: discard it so the next save starts from a clean slate.
        tracing::warn!(?err, "dropping unreadable relay outbox; resetting to empty");
        self.store
            .remove_setting(String::from(NOTE_TRANSPORT_OUTBOX_KEY))
            .await
            .map_err(ClientError::StoreError)?;
        Ok(Vec::new())
    }

    /// Writes the relay outbox back to the settings store.
    ///
    /// An empty outbox is stored by removing the key altogether, so the settings
    /// table doesn't accumulate empty-vec blobs.
    ///
    /// # Errors
    ///
    /// Returns [`ClientError::StoreError`] when the store write or removal fails.
    async fn save_relay_outbox(&self, entries: Vec<NoteInfo>) -> Result<(), ClientError> {
        let key = String::from(NOTE_TRANSPORT_OUTBOX_KEY);
        let outcome = if entries.is_empty() {
            self.store.remove_setting(key).await
        } else {
            self.store.set_setting(key, entries.to_bytes()).await
        };
        outcome.map_err(ClientError::StoreError)
    }
}

impl<AUTH> Client<AUTH>
where
    AUTH: TransactionAuthenticator + Sync + 'static,
{
    /// Fetch notes for tracked note tags.
    ///
    /// The client will query the configured note transport node for all tracked note tags.
    /// To list tracked tags please use [`Client::get_note_tags`]. To add a new note tag please use
    /// [`Client::add_note_tag`].
    /// Only notes directed at your addresses will be stored and readable given the use of
    /// end-to-end encryption (unimplemented).
    /// Fetched notes will be stored into the client's store.
    ///
    /// An internal pagination mechanism is employed to reduce the number of downloaded notes.
    /// To fetch the full history of private notes for the tracked tags, use
    /// [`Client::fetch_all_private_notes`].
    ///
    /// The stored pagination cursor only ever moves forward: if the transport answers with a
    /// cursor smaller than the stored one, the stored value is kept.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the tracked tags or the cursor from the store fails, if
    /// fetching or importing the notes fails, or if the cursor cannot be persisted.
    pub async fn fetch_private_notes(&mut self) -> Result<(), ClientError> {
        let note_tags: Vec<NoteTag> =
            self.store.get_unique_note_tags().await?.into_iter().collect();
        let cursor = self.store.get_note_transport_cursor().await?;

        let (_, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
        // Never regress the stored cursor. Some transport implementations answer an empty batch
        // with an `init()` cursor; persisting that verbatim would make the next call re-download
        // everything that was already seen.
        let next_cursor = core::cmp::max(cursor, new_cursor);
        self.store.update_note_transport_cursor(next_cursor).await?;

        Ok(())
    }

    /// Fetches every note for the tracked note tags by draining the server's paginated
    /// response.
    ///
    /// Unlike [`Client::fetch_private_notes`], the scan ignores the stored pagination cursor
    /// and starts again from [`NoteTransportCursor::init`]. The server-side transport caps
    /// each response at a fixed batch size, so fetches are repeated until a call fails to
    /// advance the cursor (i.e. no new notes). When the drain finishes, the stored cursor is
    /// set to the larger of the drained position and the previously stored cursor, so it
    /// never moves backwards. Prefer [`Client::fetch_private_notes`] for steady-state syncing
    /// to avoid re-downloading already-seen notes.
    ///
    /// # Errors
    ///
    /// Returns [`NoteTransportError::PaginationDidNotTerminate`] (wrapped in
    /// [`ClientError::NoteTransportError`]) when the cursor is still advancing after the
    /// iteration cap, and propagates any store, transport or import error.
    pub async fn fetch_all_private_notes(&mut self) -> Result<(), ClientError> {
        // Safety cap on a misbehaving server. At 500 notes per batch, 1000
        // iterations covers 500k notes — well beyond any plausible retention
        // window — and bounds the worst-case wall-clock at ~50s at 50ms/req.
        // Hitting this signals a server bug, not an honest backlog.
        const MAX_ITERATIONS: usize = 1_000;

        let tracked_tags: Vec<NoteTag> =
            self.store.get_unique_note_tags().await?.into_iter().collect();
        // Remember where the stored cursor was before the drain. The drain restarts at
        // `init()`, so the final write must take the maximum of both positions to avoid
        // clobbering an already-advanced cursor.
        let persisted = self.store.get_note_transport_cursor().await?;

        let mut position = NoteTransportCursor::init();
        for _ in 0..MAX_ITERATIONS {
            let (_, next) = self.fetch_transport_notes(position, &tracked_tags).await?;
            if next > position {
                // Forward progress: keep draining from the new position.
                position = next;
                continue;
            }

            // No forward progress. This covers both a server echoing the cursor back and
            // implementations that return an `init()` cursor on empty batches.
            let resting = position.max(persisted);
            self.store.update_note_transport_cursor(resting).await?;
            return Ok(());
        }

        Err(ClientError::NoteTransportError(NoteTransportError::PaginationDidNotTerminate(
            MAX_ITERATIONS,
        )))
    }

    /// Fetch one batch of notes from the note transport network for the provided tags.
    ///
    /// The server paginates; this method issues one RPC and returns the imported details
    /// commitments together with the new cursor. The returned cursor equals the input cursor when
    /// the batch was empty (i.e. no new notes). Callers that want to drain the full backlog should
    /// loop until `new_cursor == cursor` (see [`Client::fetch_all_private_notes`]). Callers that do
    /// steady-state polling (see [`Client::sync_state`] / [`Client::fetch_private_notes`]) should
    /// call this once per tick with the stored cursor.
    ///
    /// Downloaded notes are imported into the local store. Persistence of the returned cursor is
    /// left to the caller so that drain loops can guard against regression of an already-advanced
    /// stored cursor.
    pub(crate) async fn fetch_transport_notes(
        &mut self,
        cursor: NoteTransportCursor,
        tags: &[NoteTag],
    ) -> Result<(Vec<NoteId>, NoteTransportCursor), ClientError> {
        // Number of blocks to look back from sync height when scanning for committed notes.
        // Handles the race where a note is committed on-chain just before the NTL delivers
        // its data — without this, check_expected_notes would scan from sync_height forward
        // and miss the already-committed note.
        const NOTE_LOOKBACK_BLOCKS: u32 = 20;

        let mut notes = Vec::new();
        let (note_infos, rcursor) =
            self.get_note_transport_api()?.fetch_notes(tags, cursor).await?;
        for note_info in &note_infos {
            // e2ee impl hint:
            // for key in self.store.decryption_keys() try
            // key.decrypt(details_bytes_encrypted)
            let note = rejoin_note(&note_info.header, &note_info.details_bytes)?;
            notes.push(note);
        }

        let sync_height = self.get_sync_height().await?;
        let after_block_num =
            BlockNumber::from(sync_height.as_u32().saturating_sub(NOTE_LOOKBACK_BLOCKS));

        let id_by_commitment: BTreeMap<NoteDetailsCommitment, NoteId> =
            notes.iter().map(|note| (note.details_commitment(), note.id())).collect();

        let mut note_requests = Vec::with_capacity(notes.len());
        for note in notes {
            let tag = note.metadata().tag();
            let note_file = NoteFile::NoteDetails {
                details: note.into(),
                after_block_num,
                tag: Some(tag),
            };
            note_requests.push(note_file);
        }
        let imported_commitments = self.import_notes(&note_requests).await?;
        let imported_ids = imported_commitments
            .into_iter()
            .filter_map(|commitment| id_by_commitment.get(&commitment).copied())
            .collect();

        Ok((imported_ids, rcursor))
    }
}

/// Note transport cursor
///
/// Pagination integer used to reduce the number of fetched notes from the note transport network,
/// avoiding duplicate downloads.
///
/// The cursor is a thin wrapper around a `u64`. The derived ordering compares that raw value,
/// which is what the fetch loops in this module use to detect whether a fetch made forward
/// progress.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct NoteTransportCursor(u64);

/// Note Transport update
///
/// Bundles a batch of fetched notes with the cursor to use for the following fetch. The code
/// visible in this module does not construct it. NOTE(review): presumably consumed by the sync
/// path elsewhere in the crate — confirm.
pub struct NoteTransportUpdate {
    /// Pagination cursor for next fetch
    pub cursor: NoteTransportCursor,
    /// Fetched notes
    pub notes: Vec<Note>,
}

impl NoteTransportCursor {
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    pub fn init() -> Self {
        Self::new(0)
    }

    pub fn value(&self) -> u64 {
        self.0
    }
}

impl From<u64> for NoteTransportCursor {
    fn from(value: u64) -> Self {
        Self::new(value)
    }
}

/// The main transport client trait for sending and receiving encrypted notes
///
/// Implementors must be `Send + Sync`. The async methods are provided through `async_trait`;
/// on `wasm32` targets the `?Send` variant of the attribute is used.
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait NoteTransportClient: Send + Sync {
    /// Send a note with optionally encrypted details
    ///
    /// `header` is the note's header and `details` the serialized (and possibly encrypted) note
    /// details. Returns a [`NoteTransportError`] when the note could not be sent.
    async fn send_note(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
    ) -> Result<(), NoteTransportError>;

    /// Fetch notes for given tags
    ///
    /// Downloads notes for given tags.
    /// Returns notes labelled after the provided cursor (pagination), and an updated cursor.
    ///
    /// Note that `tag` is a slice: several tags can be queried in one call.
    async fn fetch_notes(
        &self,
        tag: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError>;

    /// Stream notes for a given tag
    ///
    /// Takes a single tag and a starting cursor, and returns a boxed [`NoteStream`] that yields
    /// batches of [`NoteInfo`].
    async fn stream_notes(
        &self,
        tag: NoteTag,
        cursor: NoteTransportCursor,
    ) -> Result<Box<dyn NoteStream>, NoteTransportError>;
}

/// Stream trait for note streaming
///
/// A marker trait over a [`Stream`] whose items are either a batch of [`NoteInfo`] or a
/// [`NoteTransportError`]. The stream must additionally be `Send` and `Unpin`. The trait adds no
/// methods of its own.
pub trait NoteStream:
    Stream<Item = Result<Vec<NoteInfo>, NoteTransportError>> + Send + Unpin
{
}

/// Information about a note fetched from the note transport network
///
/// The same shape is used for entries of the durable relay outbox, where it holds the header and
/// serialized details of a note that still has to be relayed.
#[derive(Debug, Clone)]
pub struct NoteInfo {
    /// Note header
    pub header: NoteHeader,
    /// Note details, can be encrypted
    ///
    /// Currently these are the plain serialized `NoteDetails` bytes.
    pub details_bytes: Vec<u8>,
}

// SERIALIZATION
// ================================================================================================

impl Serializable for NoteInfo {
    /// Writes the header first, followed by the details bytes.
    fn write_into<W: ByteWriter>(&self, target: &mut W) {
        let Self { header, details_bytes } = self;
        header.write_into(target);
        details_bytes.write_into(target);
    }
}

impl Deserializable for NoteInfo {
    /// Reads the header and then the details bytes, mirroring the order used by
    /// `write_into`.
    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
        // Field initializers run in the order written, so the header is read first.
        Ok(Self {
            header: NoteHeader::read_from(source)?,
            details_bytes: Vec::<u8>::read_from(source)?,
        })
    }
}

impl Serializable for NoteTransportCursor {
    /// Writes the cursor as its raw `u64` value.
    fn write_into<W: ByteWriter>(&self, target: &mut W) {
        let raw: u64 = self.value();
        raw.write_into(target);
    }
}

impl Deserializable for NoteTransportCursor {
    /// Reads a raw `u64` and wraps it in a cursor.
    fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
        u64::read_from(source).map(Self::new)
    }
}

/// Rebuilds a [`Note`] from a transported header and its serialized details.
///
/// `details_bytes` is deserialized as [`NoteDetails`]; the note is then assembled from the
/// details' assets and recipient together with the partial metadata taken from `header`.
///
/// # Errors
///
/// Returns a [`DeserializationError`] when `details_bytes` is not a valid `NoteDetails`
/// encoding.
fn rejoin_note(header: &NoteHeader, details_bytes: &[u8]) -> Result<Note, DeserializationError> {
    let details = NoteDetails::read_from(&mut SliceReader::new(details_bytes))?;

    // The wire format carries only `NoteHeader` + serialized `NoteDetails`; the attachments
    // collection is not transported. The note is therefore rebuilt with empty attachments, which
    // matches the original only when it had none to begin with.
    let assets = details.assets().clone();
    let recipient = details.recipient().clone();
    let partial_metadata = *header.metadata().partial_metadata();

    Ok(Note::new(assets, partial_metadata, recipient))
}