1use alloc::boxed::Box;
2use alloc::collections::BTreeMap;
3use alloc::string::ToString;
4use alloc::sync::Arc;
5use alloc::vec::Vec;
6use core::pin::Pin;
7use core::sync::atomic::{AtomicUsize, Ordering};
8use core::task::{Context, Poll};
9
10use chrono::Utc;
11use futures::Stream;
12use miden_protocol::block::BlockNumber;
13use miden_protocol::note::{NoteHeader, NoteTag};
14use miden_tx::utils::serde::{
15 ByteReader,
16 ByteWriter,
17 Deserializable,
18 DeserializationError,
19 Serializable,
20};
21use miden_tx::utils::sync::RwLock;
22
23use crate::note_transport::{
24 NoteInfo,
25 NoteStream,
26 NoteTransportClient,
27 NoteTransportCursor,
28 NoteTransportError,
29};
30
31#[derive(Clone)]
35pub struct MockNoteTransportNode {
36 notes: BTreeMap<NoteTag, Vec<(NoteInfo, NoteTransportCursor)>>,
37 max_batch: Option<usize>,
41}
42
43impl MockNoteTransportNode {
44 pub fn new() -> Self {
45 Self {
46 notes: BTreeMap::default(),
47 max_batch: None,
48 }
49 }
50
51 pub fn with_max_batch(max_batch: usize) -> Self {
53 Self {
54 notes: BTreeMap::default(),
55 max_batch: Some(max_batch),
56 }
57 }
58
59 pub fn add_note(&mut self, header: NoteHeader, details_bytes: Vec<u8>) {
60 self.add_note_after(header, details_bytes, None);
61 }
62
63 pub fn add_note_after(
66 &mut self,
67 header: NoteHeader,
68 details_bytes: Vec<u8>,
69 block_hint: Option<BlockNumber>,
70 ) {
71 let tag = header.metadata().tag();
72 let info = NoteInfo { header, details_bytes, block_hint };
73 let cursor = u64::try_from(Utc::now().timestamp_micros()).unwrap();
74 self.notes.entry(tag).or_default().push((info, cursor.into()));
75 }
76
77 pub fn get_notes(
78 &self,
79 tags: &[NoteTag],
80 cursor: NoteTransportCursor,
81 ) -> (Vec<NoteInfo>, NoteTransportCursor) {
82 let mut collected: Vec<(NoteInfo, NoteTransportCursor)> = vec![];
86 for tag in tags {
87 let tnotes = self
89 .notes
90 .get(tag)
91 .map(|pg_notes| {
92 if let Some(pos) = pg_notes.iter().position(|(_, tcursor)| *tcursor > cursor) {
94 &pg_notes[pos..]
95 } else {
96 &[]
97 }
98 })
99 .map(Vec::from)
100 .unwrap_or_default();
101 collected.extend(tnotes);
102 }
103
104 collected.sort_by_key(|(_, c)| *c);
108
109 if let Some(max) = self.max_batch {
111 collected.truncate(max);
112 }
113
114 let rcursor = collected.iter().map(|(_, c)| *c).max().unwrap_or(cursor);
115 let notes = collected.into_iter().map(|(n, _)| n).collect();
116 (notes, rcursor)
117 }
118}
119
120impl Default for MockNoteTransportNode {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126#[derive(Clone, Default)]
130pub struct MockNoteTransportApi {
131 pub mock_node: Arc<RwLock<MockNoteTransportNode>>,
132}
133
134impl MockNoteTransportApi {
135 pub fn new(mock_node: Arc<RwLock<MockNoteTransportNode>>) -> Self {
136 Self { mock_node }
137 }
138}
139
140impl MockNoteTransportApi {
141 pub fn send_note(&self, header: NoteHeader, details_bytes: Vec<u8>) {
142 self.mock_node.write().add_note(header, details_bytes);
143 }
144
145 pub fn send_note_with_block_hint(
146 &self,
147 header: NoteHeader,
148 details_bytes: Vec<u8>,
149 block_hint: BlockNumber,
150 ) {
151 self.mock_node.write().add_note_after(header, details_bytes, Some(block_hint));
152 }
153
154 pub fn fetch_notes(
155 &self,
156 tags: &[NoteTag],
157 cursor: NoteTransportCursor,
158 ) -> (Vec<NoteInfo>, NoteTransportCursor) {
159 self.mock_node.read().get_notes(tags, cursor)
160 }
161}
162
163pub struct DummyNoteStream {}
164impl Stream for DummyNoteStream {
165 type Item = Result<Vec<NoteInfo>, NoteTransportError>;
166
167 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
168 Poll::Ready(None)
169 }
170}
171impl NoteStream for DummyNoteStream {}
172
173#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
174#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
175impl NoteTransportClient for MockNoteTransportApi {
176 async fn send_note(
177 &self,
178 header: NoteHeader,
179 details: Vec<u8>,
180 ) -> Result<(), NoteTransportError> {
181 self.send_note(header, details);
182 Ok(())
183 }
184
185 async fn send_note_with_block_hint(
186 &self,
187 header: NoteHeader,
188 details: Vec<u8>,
189 block_hint: BlockNumber,
190 ) -> Result<(), NoteTransportError> {
191 self.send_note_with_block_hint(header, details, block_hint);
192 Ok(())
193 }
194
195 async fn fetch_notes(
196 &self,
197 tags: &[NoteTag],
198 cursor: NoteTransportCursor,
199 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
200 Ok(self.fetch_notes(tags, cursor))
201 }
202
203 async fn stream_notes(
204 &self,
205 _tag: NoteTag,
206 _cursor: NoteTransportCursor,
207 ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
208 Ok(Box::new(DummyNoteStream {}))
209 }
210}
211
212pub struct FaultyNoteTransportApi {
229 inner: MockNoteTransportApi,
230 fail_next: AtomicUsize,
231 send_attempts: AtomicUsize,
232}
233
234impl FaultyNoteTransportApi {
235 pub fn new(mock_node: Arc<RwLock<MockNoteTransportNode>>, fail_next: usize) -> Self {
238 Self {
239 inner: MockNoteTransportApi::new(mock_node),
240 fail_next: AtomicUsize::new(fail_next),
241 send_attempts: AtomicUsize::new(0),
242 }
243 }
244
245 pub fn fail_next_n(&self, n: usize) {
248 self.fail_next.store(n, Ordering::SeqCst);
249 }
250
251 pub fn send_attempts(&self) -> usize {
253 self.send_attempts.load(Ordering::SeqCst)
254 }
255}
256
257#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
258#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
259impl NoteTransportClient for FaultyNoteTransportApi {
260 async fn send_note(
261 &self,
262 header: NoteHeader,
263 details: Vec<u8>,
264 ) -> Result<(), NoteTransportError> {
265 self.send_attempts.fetch_add(1, Ordering::SeqCst);
266 let should_fail = self
267 .fail_next
268 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
269 .is_ok();
270 if should_fail {
271 return Err(NoteTransportError::Network(
272 "FaultyNoteTransportApi: simulated send_note failure".to_string(),
273 ));
274 }
275 self.inner.send_note(header, details);
276 Ok(())
277 }
278
279 async fn send_note_with_block_hint(
280 &self,
281 header: NoteHeader,
282 details: Vec<u8>,
283 block_hint: BlockNumber,
284 ) -> Result<(), NoteTransportError> {
285 self.send_attempts.fetch_add(1, Ordering::SeqCst);
286 let should_fail = self
287 .fail_next
288 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
289 .is_ok();
290 if should_fail {
291 return Err(NoteTransportError::Network(
292 "FaultyNoteTransportApi: simulated send_note failure".to_string(),
293 ));
294 }
295 self.inner.send_note_with_block_hint(header, details, block_hint);
296 Ok(())
297 }
298
299 async fn fetch_notes(
300 &self,
301 tags: &[NoteTag],
302 cursor: NoteTransportCursor,
303 ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
304 Ok(self.inner.fetch_notes(tags, cursor))
305 }
306
307 async fn stream_notes(
308 &self,
309 _tag: NoteTag,
310 _cursor: NoteTransportCursor,
311 ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
312 Ok(Box::new(DummyNoteStream {}))
313 }
314}
315
316impl Serializable for MockNoteTransportNode {
320 fn write_into<W: ByteWriter>(&self, target: &mut W) {
321 self.notes.write_into(target);
322 }
323}
324
325impl Deserializable for MockNoteTransportNode {
326 fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
327 let notes = BTreeMap::<NoteTag, Vec<(NoteInfo, NoteTransportCursor)>>::read_from(source)?;
328
329 Ok(Self { notes, max_batch: None })
330 }
331}