pezsc_network_sync/service/
network.rs1use futures::{channel::oneshot, StreamExt};
20use pezsc_network_types::PeerId;
21
22use pezsc_network::{
23 request_responses::{IfDisconnected, RequestFailure},
24 types::ProtocolName,
25 NetworkPeers, NetworkRequest, ReputationChange,
26};
27use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
28
29use std::sync::Arc;
30
31pub trait Network: NetworkPeers + NetworkRequest {}
33
34impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
35
36pub struct NetworkServiceProvider {
41 rx: TracingUnboundedReceiver<ToServiceCommand>,
42 handle: NetworkServiceHandle,
43}
44
45#[derive(Debug)]
47pub enum ToServiceCommand {
48 DisconnectPeer(PeerId, ProtocolName),
50
51 ReportPeer(PeerId, ReputationChange),
53
54 StartRequest(
56 PeerId,
57 ProtocolName,
58 Vec<u8>,
59 oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
60 IfDisconnected,
61 ),
62}
63
64#[derive(Debug, Clone)]
67pub struct NetworkServiceHandle {
68 tx: TracingUnboundedSender<ToServiceCommand>,
69}
70
71impl NetworkServiceHandle {
72 pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
74 Self { tx }
75 }
76
77 pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
79 let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
80 }
81
82 pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
84 let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
85 }
86
87 pub fn start_request(
89 &self,
90 who: PeerId,
91 protocol: ProtocolName,
92 request: Vec<u8>,
93 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
94 connect: IfDisconnected,
95 ) {
96 let _ = self
97 .tx
98 .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
99 }
100}
101
102impl NetworkServiceProvider {
103 pub fn new() -> Self {
105 let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
106
107 Self { rx, handle: NetworkServiceHandle::new(tx) }
108 }
109
110 pub fn handle(&self) -> NetworkServiceHandle {
112 self.handle.clone()
113 }
114
115 pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
117 let Self { mut rx, handle } = self;
118 drop(handle);
119
120 while let Some(inner) = rx.next().await {
121 match inner {
122 ToServiceCommand::DisconnectPeer(peer, protocol_name) => {
123 service.disconnect_peer(peer, protocol_name)
124 },
125 ToServiceCommand::ReportPeer(peer, reputation_change) => {
126 service.report_peer(peer, reputation_change)
127 },
128 ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => {
129 service.start_request(peer, protocol, request, None, tx, connect)
130 },
131 }
132 }
133 }
134}
135
#[cfg(test)]
mod tests {
	use super::*;
	use crate::service::mock::MockNetwork;

	/// Commands submitted through the handle must reach the network service
	/// exactly once each, with the arguments that were passed in.
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let provider = NetworkServiceProvider::new();
		let handle = provider.handle();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		let proto_clone = proto.clone();
		let change = pezsc_network::ReputationChange::new_fatal("test-change");

		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		let provider_task = tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);

		// Previously nothing awaited the spawned task, so the test could
		// return before the provider had processed either command. Dropping
		// the last handle lets `run` finish after draining the queue, and
		// awaiting the task surfaces any panic raised inside it (including a
		// failed mock expectation) as a test failure.
		drop(handle);
		provider_task
			.await
			.expect("provider task should finish without panicking");
	}
}