use super::*;
use crate::protocol::notifications::handler::{
NotificationsSinkMessage, ASYNC_NOTIFICATIONS_BUFFER_SIZE,
};
use std::future::Future;
/// An inbound substream reported by the protocol side is surfaced to the service for
/// validation, and accepting it resolves the pending validation result with `Accept`.
#[tokio::test]
async fn validate_and_accept_substream() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Report the inbound substream; the test requires the "wait for validation" outcome.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };

    // The service must see a validation request carrying the same peer and handshake.
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();

    // The verdict sent by the service is what the reporting side receives.
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);
}
/// Reporting an opened substream delivers `NotificationStreamOpened` to the service with the
/// reported peer, handshake and direction, and no negotiated fallback.
#[tokio::test]
async fn substream_opened() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, _, _) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();

    // The next event must mirror exactly what was reported above.
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);
}
/// After an inbound substream has been validated and opened, a synchronous notification sent
/// through the service shows up on the sink's synchronous receiver.
#[tokio::test]
async fn send_sync_notification() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, _, mut sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: the synchronous notification must arrive on the synchronous channel.
    service.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
    let received = sync_rx.next().await;
    assert_eq!(received, Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }));
}
/// After an inbound substream has been validated and opened, an asynchronous notification sent
/// through the service shows up on the sink's asynchronous receiver.
#[tokio::test]
async fn send_async_notification() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, mut async_rx, _) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: the asynchronous notification must arrive on the asynchronous channel.
    service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
    let received = async_rx.next().await;
    assert_eq!(received, Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }));
}
/// Sending a synchronous notification to a peer for which no substream was ever reported
/// must complete without panicking; there is nothing else to assert on.
#[tokio::test]
async fn send_sync_notification_to_non_existent_peer() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
    let (_handle, _stream) = protocol.split();
    let unknown_peer = PeerId::random();

    service.send_sync_notification(&unknown_peer.into(), vec![1, 3, 3, 7]);
}
#[tokio::test]
async fn send_async_notification_to_non_existent_peer() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (_handle, _stream) = proto.split();
let peer = PeerId::random();
if let Err(error::Error::PeerDoesntExist(peer_id)) =
notif.send_async_notification(&peer.into(), vec![1, 3, 3, 7]).await
{
assert_eq!(peer, peer_id.into());
} else {
panic!("invalid error received from `send_async_notification()`");
}
}
/// Once a substream is validated and opened, a notification reported by the protocol side is
/// delivered to the service as `NotificationReceived` with the same peer and payload.
#[tokio::test]
async fn receive_notification() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: a notification reported by the protocol is forwarded to the service.
    handle.report_notification_received(peer_id, vec![1, 3, 3, 8]).unwrap();
    let Some(NotificationEvent::NotificationReceived { peer, notification }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(notification, vec![1, 3, 3, 8]);
}
/// Asynchronous sends apply backpressure: after `ASYNC_NOTIFICATIONS_BUFFER_SIZE + 1` sends
/// that complete immediately, the next one is pending until a message is read from the
/// asynchronous receiver.
#[tokio::test]
async fn backpressure_works() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, mut async_rx, _) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: every send in this inclusive range must complete on the first poll.
    for i in 0..=ASYNC_NOTIFICATIONS_BUFFER_SIZE {
        let first_poll =
            futures::poll!(service.send_async_notification(&peer_id.into(), vec![1, 3, 3, i as u8]));
        assert!(first_poll.is_ready());
    }

    // Step 4: one more send cannot make progress while nothing has been read.
    let blocked = futures::poll!(service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]));
    assert!(blocked.is_pending());

    // Step 5: reading the oldest message lets the next send complete.
    assert_eq!(
        async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 0] })
    );
    let unblocked =
        futures::poll!(service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]));
    assert!(unblocked.is_ready());
}
/// A synchronous send issued after the substream was reported closed (and the receiver
/// dropped), but before the service polled for the close event, must not panic.
#[tokio::test]
async fn peer_disconnects_then_sync_notification_is_sent() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, _, sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: close the substream and drop the receiver without polling the service again.
    handle.report_substream_closed(peer_id).unwrap();
    drop(sync_rx);

    // Step 4: the send is only expected to return without panicking.
    service.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 7]);
}
#[tokio::test]
async fn peer_disconnects_then_async_notification_is_sent() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, async_rx, _) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
handle.report_substream_closed(peer_id).unwrap();
drop(async_rx);
if let Err(error::Error::ConnectionClosed) =
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 7]).await
{
} else {
panic!("invalid state after calling `send_async_notification()` on closed connection")
}
}
/// With a cloned service, the validation result stays unresolved after only the first
/// service accepts, and resolves to `Accept` once the second one accepts as well.
#[tokio::test]
async fn cloned_service_opening_substream_works() {
    let (protocol, mut first) = notification_service("/proto/1".into());
    let (_sink, _async_rx, _) = NotificationsSink::new(PeerId::random());
    let (handle, _stream) = protocol.split();
    let mut second = first.clone().unwrap();
    let peer_id = PeerId::random();

    let ValidationCallResult::WaitForValidation(mut validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };

    // The original service accepts the substream.
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        first.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();

    // No verdict is available yet: the clone has not answered.
    assert!(validation_rx.try_recv().is_err());

    // The cloned service accepts too.
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        second.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();

    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);
}
/// With three services (the original and two clones), the validation result stays unresolved
/// after two of them accept, and resolves to `Reject` once the third one rejects.
#[tokio::test]
async fn cloned_service_one_service_rejects_substream() {
    let (proto, mut notif1) = notification_service("/proto/1".into());
    let (_sink, _async_rx, _) = NotificationsSink::new(PeerId::random());
    let (handle, _stream) = proto.split();
    let mut notif2 = notif1.clone().unwrap();
    let mut notif3 = notif2.clone().unwrap();
    let peer_id = PeerId::random();

    let ValidationCallResult::WaitForValidation(mut result_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };

    // The first two services accept. A fixed-size array is iterated by value here, so no
    // heap-allocated `Vec` is needed just to loop over the services.
    for notif in [&mut notif1, &mut notif2] {
        let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
            notif.next_event().await
        else {
            panic!("invalid event received");
        };
        assert_eq!(peer_id, peer.into());
        assert_eq!(handshake, vec![1, 3, 3, 7]);
        result_tx.send(ValidationResult::Accept).unwrap();
    }

    // No verdict is available yet: the third service has not answered.
    assert!(result_rx.try_recv().is_err());

    // The third service rejects the substream.
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        notif3.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Reject).unwrap();

    assert_eq!(result_rx.await.unwrap(), ValidationResult::Reject);
}
/// Full substream lifecycle with three services (the original and two clones): all of them
/// validate the substream, observe it being opened, receive the same inbound notification,
/// can each send a synchronous notification, and observe the substream being closed.
#[tokio::test]
async fn cloned_service_opening_substream_sending_and_receiving_notifications_work() {
    let (proto, mut notif1) = notification_service("/proto/1".into());
    let (sink, _, mut sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = proto.split();
    let mut notif2 = notif1.clone().unwrap();
    let mut notif3 = notif1.clone().unwrap();
    let peer_id = PeerId::random();

    let ValidationCallResult::WaitForValidation(result_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };

    // Every service accepts the inbound substream. The loops in this test iterate
    // fixed-size arrays instead of allocating a `Vec` per loop.
    for notif in [&mut notif1, &mut notif2, &mut notif3] {
        let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
            notif.next_event().await
        else {
            panic!("invalid event received");
        };
        assert_eq!(peer_id, peer.into());
        assert_eq!(handshake, vec![1, 3, 3, 7]);
        result_tx.send(ValidationResult::Accept).unwrap();
    }
    assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);

    // Every service observes the opened substream.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    for notif in [&mut notif1, &mut notif2, &mut notif3] {
        let Some(NotificationEvent::NotificationStreamOpened {
            peer,
            negotiated_fallback,
            handshake,
            direction,
        }) = notif.next_event().await
        else {
            panic!("invalid event received");
        };
        assert_eq!(peer_id, peer.into());
        assert_eq!(negotiated_fallback, None);
        assert_eq!(handshake, vec![1, 3, 3, 7]);
        assert_eq!(direction, Direction::Inbound);
    }

    // Every service receives the notification reported by the protocol.
    handle.report_notification_received(peer_id, vec![1, 3, 3, 8]).unwrap();
    for notif in [&mut notif1, &mut notif2, &mut notif3] {
        let Some(NotificationEvent::NotificationReceived { peer, notification }) =
            notif.next_event().await
        else {
            panic!("invalid event received");
        };
        assert_eq!(peer_id, peer.into());
        assert_eq!(notification, vec![1, 3, 3, 8]);
    }

    // Each service sends a distinct payload; all of them arrive on the same receiver.
    // The index is at most 2 here, so the `as u8` cast cannot truncate.
    for (i, notif) in [&mut notif1, &mut notif2, &mut notif3].iter_mut().enumerate() {
        notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, i as u8]);
        assert_eq!(
            sync_rx.next().await,
            Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, i as u8] })
        );
    }

    // Every service observes the substream being closed.
    handle.report_substream_closed(peer_id).unwrap();
    for notif in [&mut notif1, &mut notif2, &mut notif3] {
        let Some(NotificationEvent::NotificationStreamClosed { peer }) = notif.next_event().await
        else {
            panic!("invalid event received");
        };
        assert_eq!(peer_id, peer.into());
    }
}
/// Notifications sent through the message sink obtained from the service, and notifications
/// sent through the service itself, both reach the matching (sync/async) receiver.
#[tokio::test]
async fn sending_notifications_using_notifications_sink_works() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, mut async_rx, mut sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: send one sync and one async notification through the acquired message sink.
    let message_sink = service.message_sink(&peer_id.into()).unwrap();
    message_sink.send_sync_notification(vec![1, 3, 3, 6]);
    let _ = message_sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
    assert_eq!(
        sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
    );
    assert_eq!(
        async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
    );

    // Step 4: the same works when sending through the service directly.
    service.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
    service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
    assert_eq!(
        sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
    );
    assert_eq!(
        async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
    );
}
/// Asking the service for the message sink of a random peer, with no substream ever
/// reported, yields `None`.
#[test]
fn try_to_get_notifications_sink_for_non_existent_peer() {
    let (_proto, service) = notification_service("/proto/1".into());
    let unknown_peer = sc_network_types::PeerId::random();

    assert!(service.message_sink(&unknown_peer).is_none());
}
/// After the protocol reports a replaced notification sink and the service has been polled
/// once, notifications sent through the service and through a previously acquired message
/// sink both arrive on the new sink's receivers.
#[tokio::test]
async fn notification_sink_replaced() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (sink, mut async_rx, mut sync_rx) = NotificationsSink::new(PeerId::random());
    let (mut handle, _stream) = protocol.split();
    let peer_id = PeerId::random();

    // Step 1: report the inbound substream and accept it from the service side.
    let ValidationCallResult::WaitForValidation(validation_rx) =
        handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
    else {
        panic!("peerset not enabled");
    };
    let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
        service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    result_tx.send(ValidationResult::Accept).unwrap();
    assert_eq!(validation_rx.await.unwrap(), ValidationResult::Accept);

    // Step 2: report the substream as opened and let the service observe it.
    handle
        .report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
        .unwrap();
    let Some(NotificationEvent::NotificationStreamOpened {
        peer,
        negotiated_fallback,
        handshake,
        direction,
    }) = service.next_event().await
    else {
        panic!("invalid event received");
    };
    assert_eq!(peer_id, peer.into());
    assert_eq!(negotiated_fallback, None);
    assert_eq!(handshake, vec![1, 3, 3, 7]);
    assert_eq!(direction, Direction::Inbound);

    // Step 3: with the original sink, sends via the message sink reach the old receivers.
    let message_sink = service.message_sink(&peer_id.into()).unwrap();
    message_sink.send_sync_notification(vec![1, 3, 3, 6]);
    let _ = message_sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
    assert_eq!(
        sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
    );
    assert_eq!(
        async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
    );

    // Step 4: so do sends issued through the service directly.
    service.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
    service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
    assert_eq!(
        sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
    );
    assert_eq!(
        async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
    );

    // Step 5: report a replacement sink and drop the old receivers.
    let (new_sink, mut new_async_rx, mut new_sync_rx) = NotificationsSink::new(PeerId::random());
    handle.report_notification_sink_replaced(peer_id, new_sink).unwrap();
    drop(sync_rx);
    drop(async_rx);

    // Step 6: poll the service exactly once, ignoring the outcome, so it gets a chance to
    // process the replacement without the test blocking on an event.
    futures::future::poll_fn(|cx| {
        let mut pending_event = service.next_event();
        let _ = std::pin::Pin::new(&mut pending_event).poll(cx);
        std::task::Poll::Ready(())
    })
    .await;

    // Step 7: sends through the service now reach the new receivers.
    service.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
    service.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
    assert_eq!(
        new_sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
    );
    assert_eq!(
        new_async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
    );

    // Step 8: the message sink acquired before the replacement reaches them as well.
    message_sink.send_sync_notification(vec![1, 3, 3, 6]);
    let _ = message_sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
    assert_eq!(
        new_sync_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
    );
    assert_eq!(
        new_async_rx.next().await,
        Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
    );
}
/// `try_set_handshake` forwards a `SetHandshake` command to the protocol's command stream,
/// and starts failing once `COMMAND_QUEUE_SIZE` commands are queued without being read.
#[tokio::test]
async fn set_handshake() {
    let (protocol, mut service) = notification_service("/proto/1".into());
    let (_handle, mut commands) = protocol.split();

    // A single handshake update is delivered to the command stream unchanged.
    assert!(service.try_set_handshake(vec![1, 3, 3, 7]).is_ok());
    let Some(NotificationCommand::SetHandshake(handshake)) = commands.next().await else {
        panic!("invalid event received");
    };
    assert_eq!(handshake, vec![1, 3, 3, 7]);

    // With nothing reading the stream, exactly `COMMAND_QUEUE_SIZE` further updates succeed.
    for _ in 0..COMMAND_QUEUE_SIZE {
        assert!(service.try_set_handshake(vec![1, 3, 3, 7]).is_ok());
    }

    // The next update is refused.
    assert!(service.try_set_handshake(vec![1, 3, 3, 7]).is_err());
}