soil_network/sync/service/
network.rs1use futures::{channel::oneshot, StreamExt};
8use soil_network::types::PeerId;
9
10use soil_client::utils::mpsc::{
11 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
12};
13use soil_network::{
14 request_responses::{IfDisconnected, RequestFailure},
15 types::ProtocolName,
16 NetworkPeers, NetworkRequest, ReputationChange,
17};
18
19use std::sync::Arc;
20
21pub trait Network: NetworkPeers + NetworkRequest {}
23
24impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
25
26pub struct NetworkServiceProvider {
31 rx: TracingUnboundedReceiver<ToServiceCommand>,
32 handle: NetworkServiceHandle,
33}
34
35#[derive(Debug)]
37pub enum ToServiceCommand {
38 DisconnectPeer(PeerId, ProtocolName),
40
41 ReportPeer(PeerId, ReputationChange),
43
44 StartRequest(
46 PeerId,
47 ProtocolName,
48 Vec<u8>,
49 oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
50 IfDisconnected,
51 ),
52}
53
54#[derive(Debug, Clone)]
57pub struct NetworkServiceHandle {
58 tx: TracingUnboundedSender<ToServiceCommand>,
59}
60
61impl NetworkServiceHandle {
62 pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
64 Self { tx }
65 }
66
67 pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
69 let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
70 }
71
72 pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
74 let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
75 }
76
77 pub fn start_request(
79 &self,
80 who: PeerId,
81 protocol: ProtocolName,
82 request: Vec<u8>,
83 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
84 connect: IfDisconnected,
85 ) {
86 let _ = self
87 .tx
88 .unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
89 }
90}
91
92impl NetworkServiceProvider {
93 pub fn new() -> Self {
95 let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
96
97 Self { rx, handle: NetworkServiceHandle::new(tx) }
98 }
99
100 pub fn handle(&self) -> NetworkServiceHandle {
102 self.handle.clone()
103 }
104
105 pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
107 let Self { mut rx, handle } = self;
108 drop(handle);
109
110 while let Some(inner) = rx.next().await {
111 match inner {
112 ToServiceCommand::DisconnectPeer(peer, protocol_name) => {
113 service.disconnect_peer(peer, protocol_name)
114 },
115 ToServiceCommand::ReportPeer(peer, reputation_change) => {
116 service.report_peer(peer, reputation_change)
117 },
118 ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => {
119 service.start_request(peer, protocol, request, None, tx, connect)
120 },
121 }
122 }
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129 use crate::sync::service::mock::MockNetwork;
130
131 #[tokio::test]
134 async fn disconnect_and_report_peer() {
135 let provider = NetworkServiceProvider::new();
136 let handle = provider.handle();
137
138 let peer = PeerId::random();
139 let proto = ProtocolName::from("test-protocol");
140 let proto_clone = proto.clone();
141 let change = soil_network::ReputationChange::new_fatal("test-change");
142
143 let mut mock_network = MockNetwork::new();
144 mock_network
145 .expect_disconnect_peer()
146 .withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
147 .once()
148 .returning(|_, _| ());
149 mock_network
150 .expect_report_peer()
151 .withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
152 .once()
153 .returning(|_, _| ());
154
155 tokio::spawn(async move {
156 provider.run(Arc::new(mock_network)).await;
157 });
158
159 handle.disconnect_peer(peer, proto_clone);
160 handle.report_peer(peer, change);
161 }
162}