use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::ToString;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use chrono::Utc;
use futures::Stream;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::{NoteHeader, NoteTag};
use miden_tx::utils::serde::{
ByteReader,
ByteWriter,
Deserializable,
DeserializationError,
Serializable,
};
use miden_tx::utils::sync::RwLock;
use crate::note_transport::{
NoteInfo,
NoteStream,
NoteTransportClient,
NoteTransportCursor,
NoteTransportError,
};
/// In-memory mock of a note transport node.
///
/// Notes are grouped by their [`NoteTag`]; every stored note is paired with the
/// [`NoteTransportCursor`] it was assigned when it was inserted.
#[derive(Clone)]
pub struct MockNoteTransportNode {
/// Stored notes per tag, each with the cursor assigned at insertion time.
notes: BTreeMap<NoteTag, Vec<(NoteInfo, NoteTransportCursor)>>,
/// Optional cap applied to the number of notes returned by a single `get_notes` call.
/// `None` means no cap.
max_batch: Option<usize>,
}
impl MockNoteTransportNode {
    /// Creates an empty node with no limit on the number of notes returned per fetch.
    pub fn new() -> Self {
        Self {
            notes: BTreeMap::default(),
            max_batch: None,
        }
    }

    /// Creates an empty node whose [`get_notes`](Self::get_notes) responses are capped at
    /// `max_batch` notes (see that method for the one case in which a batch may be larger).
    pub fn with_max_batch(max_batch: usize) -> Self {
        Self {
            notes: BTreeMap::default(),
            max_batch: Some(max_batch),
        }
    }

    /// Stores a note without a block hint.
    ///
    /// Shorthand for [`add_note_after`](Self::add_note_after) with `block_hint = None`.
    pub fn add_note(&mut self, header: NoteHeader, details_bytes: Vec<u8>) {
        self.add_note_after(header, details_bytes, None);
    }

    /// Stores a note under the tag found in its header metadata.
    ///
    /// The note is assigned a cursor equal to the current UTC time in microseconds and is
    /// appended to the list kept for its tag. `block_hint` is stored unchanged in the resulting
    /// [`NoteInfo`].
    ///
    /// # Panics
    ///
    /// Panics if the current timestamp is negative, i.e. the system clock reports a time before
    /// the Unix epoch.
    pub fn add_note_after(
        &mut self,
        header: NoteHeader,
        details_bytes: Vec<u8>,
        block_hint: Option<BlockNumber>,
    ) {
        let tag = header.metadata().tag();
        let info = NoteInfo { header, details_bytes, block_hint };
        let cursor = u64::try_from(Utc::now().timestamp_micros())
            .expect("system clock reports a time before the Unix epoch");
        self.notes.entry(tag).or_default().push((info, cursor.into()));
    }

    /// Returns the notes stored under any of `tags` whose cursor is strictly greater than
    /// `cursor`, ordered by cursor, together with the cursor to use for the next fetch.
    ///
    /// The returned cursor is the largest cursor among the returned notes, or the input `cursor`
    /// when no note is returned.
    ///
    /// When a batch limit is configured, at most `max_batch` notes are returned, with one
    /// exception: if the note at the cut-off shares its cursor with the notes that follow it,
    /// those notes are included as well. Dropping them would lose them for good, because the
    /// next fetch only returns notes with a cursor strictly greater than the returned one.
    pub fn get_notes(
        &self,
        tags: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> (Vec<NoteInfo>, NoteTransportCursor) {
        let mut collected: Vec<(NoteInfo, NoteTransportCursor)> = Vec::new();
        for tag in tags {
            if let Some(stored) = self.notes.get(tag) {
                // Check every entry on its own instead of slicing from the first newer entry:
                // cursors come from the wall clock, so the per-tag list is not guaranteed to be
                // strictly ascending, and slicing could hand out already-delivered notes again.
                collected.extend(stored.iter().filter(|(_, c)| *c > cursor).cloned());
            }
        }

        // Stable sort: notes with equal cursors keep their collection order.
        collected.sort_by_key(|(_, c)| *c);

        if let Some(max) = self.max_batch {
            if max < collected.len() {
                let cut = match max.checked_sub(1) {
                    Some(last_kept) => {
                        // Never split a run of notes that share the cursor of the last kept one.
                        let boundary = collected[last_kept].1;
                        let same_cursor =
                            collected[max..].iter().take_while(|(_, c)| *c == boundary).count();
                        max + same_cursor
                    },
                    None => 0,
                };
                collected.truncate(cut);
            }
        }

        let rcursor = collected.iter().map(|(_, c)| *c).max().unwrap_or(cursor);
        let notes = collected.into_iter().map(|(n, _)| n).collect();
        (notes, rcursor)
    }
}
impl Default for MockNoteTransportNode {
fn default() -> Self {
Self::new()
}
}
/// Client-side handle to a shared [`MockNoteTransportNode`].
///
/// Cloning the handle clones the `Arc`, so all clones operate on the same node.
#[derive(Clone, Default)]
pub struct MockNoteTransportApi {
/// The shared mock node every operation of this handle is forwarded to.
pub mock_node: Arc<RwLock<MockNoteTransportNode>>,
}
impl MockNoteTransportApi {
    /// Builds a handle around an already shared mock node.
    pub fn new(mock_node: Arc<RwLock<MockNoteTransportNode>>) -> Self {
        Self { mock_node }
    }

    /// Takes the write lock on the node and stores the note without a block hint.
    pub fn send_note(&self, header: NoteHeader, details_bytes: Vec<u8>) {
        let mut node = self.mock_node.write();
        node.add_note(header, details_bytes);
    }

    /// Takes the write lock on the node and stores the note together with `block_hint`.
    pub fn send_note_with_block_hint(
        &self,
        header: NoteHeader,
        details_bytes: Vec<u8>,
        block_hint: BlockNumber,
    ) {
        let hint = Some(block_hint);
        let mut node = self.mock_node.write();
        node.add_note_after(header, details_bytes, hint);
    }

    /// Takes the read lock on the node and returns whatever its `get_notes` yields for
    /// `tags` and `cursor`: the matching notes and the cursor for the next fetch.
    pub fn fetch_notes(
        &self,
        tags: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> (Vec<NoteInfo>, NoteTransportCursor) {
        let node = self.mock_node.read();
        node.get_notes(tags, cursor)
    }
}
/// Note stream that never produces an item: it reports end-of-stream on the first poll.
pub struct DummyNoteStream {}

impl Stream for DummyNoteStream {
    type Item = Result<Vec<NoteInfo>, NoteTransportError>;

    /// Always completes immediately with `None`; the waker in `_cx` is never registered.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let exhausted: Option<Self::Item> = None;
        Poll::Ready(exhausted)
    }
}

impl NoteStream for DummyNoteStream {}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl NoteTransportClient for MockNoteTransportApi {
    /// Stores the note on the mock node; never fails.
    async fn send_note(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
    ) -> Result<(), NoteTransportError> {
        // Method-call syntax picks the inherent (synchronous) `send_note`, not this trait method.
        let sent: () = self.send_note(header, details);
        Ok(sent)
    }

    /// Stores the note together with its block hint on the mock node; never fails.
    async fn send_note_with_block_hint(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
        block_hint: BlockNumber,
    ) -> Result<(), NoteTransportError> {
        // Resolves to the inherent (synchronous) method of the same name.
        let sent: () = self.send_note_with_block_hint(header, details, block_hint);
        Ok(sent)
    }

    /// Returns the mock node's answer for `tags` and `cursor`; never fails.
    async fn fetch_notes(
        &self,
        tags: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
        // Resolves to the inherent (synchronous) `fetch_notes`.
        let batch = self.fetch_notes(tags, cursor);
        Ok(batch)
    }

    /// Returns a [`DummyNoteStream`]; both arguments are ignored.
    async fn stream_notes(
        &self,
        _tag: NoteTag,
        _cursor: NoteTransportCursor,
    ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
        let stream: Box<dyn NoteStream> = Box::new(DummyNoteStream {});
        Ok(stream)
    }
}
/// Wrapper around [`MockNoteTransportApi`] that can make a configurable number of upcoming
/// send calls fail, and that counts every send attempt.
pub struct FaultyNoteTransportApi {
/// The mock client that successful calls are forwarded to.
inner: MockNoteTransportApi,
/// Number of upcoming send calls that will still fail.
fail_next: AtomicUsize,
/// Total number of send calls made so far, whether they failed or not.
send_attempts: AtomicUsize,
}
impl FaultyNoteTransportApi {
    /// Creates a client over `mock_node` whose first `fail_next` send calls fail.
    ///
    /// The send-attempt counter starts at zero.
    pub fn new(mock_node: Arc<RwLock<MockNoteTransportNode>>, fail_next: usize) -> Self {
        let inner = MockNoteTransportApi::new(mock_node);
        Self {
            inner,
            fail_next: AtomicUsize::new(fail_next),
            send_attempts: AtomicUsize::new(0),
        }
    }

    /// Overwrites the number of upcoming send calls that will fail with `n`.
    pub fn fail_next_n(&self, n: usize) {
        let remaining_failures = &self.fail_next;
        remaining_failures.store(n, Ordering::SeqCst);
    }

    /// Returns how many send calls have been made so far, including the failed ones.
    pub fn send_attempts(&self) -> usize {
        let attempts = &self.send_attempts;
        attempts.load(Ordering::SeqCst)
    }
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl NoteTransportClient for FaultyNoteTransportApi {
    /// Counts the attempt, then either fails with a simulated network error (while failures are
    /// pending) or stores the note through the wrapped mock client.
    async fn send_note(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
    ) -> Result<(), NoteTransportError> {
        self.register_send_attempt()?;
        self.inner.send_note(header, details);
        Ok(())
    }

    /// Counts the attempt, then either fails with a simulated network error (while failures are
    /// pending) or stores the note and its block hint through the wrapped mock client.
    async fn send_note_with_block_hint(
        &self,
        header: NoteHeader,
        details: Vec<u8>,
        block_hint: BlockNumber,
    ) -> Result<(), NoteTransportError> {
        self.register_send_attempt()?;
        self.inner.send_note_with_block_hint(header, details, block_hint);
        Ok(())
    }

    /// Forwards to the wrapped mock client. Fetching is never made to fail and does not touch
    /// the failure or attempt counters.
    async fn fetch_notes(
        &self,
        tags: &[NoteTag],
        cursor: NoteTransportCursor,
    ) -> Result<(Vec<NoteInfo>, NoteTransportCursor), NoteTransportError> {
        Ok(self.inner.fetch_notes(tags, cursor))
    }

    /// Returns a [`DummyNoteStream`]; both arguments are ignored.
    async fn stream_notes(
        &self,
        _tag: NoteTag,
        _cursor: NoteTransportCursor,
    ) -> Result<Box<dyn NoteStream>, NoteTransportError> {
        Ok(Box::new(DummyNoteStream {}))
    }
}

impl FaultyNoteTransportApi {
    /// Shared bookkeeping for both send methods: records one send attempt and, if any simulated
    /// failures are still pending, consumes one and returns the simulated network error.
    fn register_send_attempt(&self) -> Result<(), NoteTransportError> {
        self.send_attempts.fetch_add(1, Ordering::SeqCst);

        // `checked_sub` yields `None` at zero, which makes `fetch_update` return `Err` and leave
        // the counter untouched; `Ok` therefore means one pending failure was consumed.
        let failure_consumed = self
            .fail_next
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |pending| pending.checked_sub(1))
            .is_ok();

        if failure_consumed {
            return Err(NoteTransportError::Network(
                "FaultyNoteTransportApi: simulated send_note failure".to_string(),
            ));
        }
        Ok(())
    }
}
impl Serializable for MockNoteTransportNode {
    /// Writes the stored notes to `target`.
    ///
    /// Only the note map is serialized; the `max_batch` setting is not written.
    fn write_into<W: ByteWriter>(&self, target: &mut W) {
        let Self { notes, max_batch: _ } = self;
        notes.write_into(target);
    }
}
impl Deserializable for MockNoteTransportNode {
fn read_from<R: ByteReader>(source: &mut R) -> Result<Self, DeserializationError> {
let notes = BTreeMap::<NoteTag, Vec<(NoteInfo, NoteTransportCursor)>>::read_from(source)?;
Ok(Self { notes, max_batch: None })
}
}