1use std::borrow::{Borrow, Cow};
3use std::collections::HashMap;
4use std::net;
5use std::sync::Arc;
6
7use nakamoto_chain::BlockTree;
8use nakamoto_common::bitcoin::consensus::Encodable;
9use nakamoto_common::block::time::{AdjustedClock, LocalTime};
10use nakamoto_net::{Disconnect, Io, Link, StateMachine};
11use nakamoto_p2p as p2p;
12
13use crate::client::Config;
14use crate::peer;
15use nakamoto_common::block::filter;
16
17pub struct Service<T, F, P, C> {
19 inboxes: HashMap<net::SocketAddr, p2p::stream::Decoder>,
20 machine: p2p::StateMachine<T, F, P, C>,
21}
22
23impl<T: BlockTree, F: filter::Filters, P: peer::Store, C: AdjustedClock<net::SocketAddr>>
24 Service<T, F, P, C>
25{
26 pub fn new(
28 tree: T,
29 filters: F,
30 peers: P,
31 clock: C,
32 rng: fastrand::Rng,
33 config: Config,
34 ) -> Self {
35 Self {
36 inboxes: HashMap::new(),
37 machine: p2p::StateMachine::new(
38 tree,
39 filters,
40 peers,
41 clock,
42 rng,
43 p2p::Config {
44 network: config.network,
45 domains: config.domains,
46 connect: config.connect,
47 user_agent: config.user_agent,
48 hooks: config.hooks,
49 limits: config.limits,
50 services: config.services,
51
52 ..p2p::Config::default()
53 },
54 ),
55 }
56 }
57}
58
59impl<T, F, P, C> nakamoto_net::Service for Service<T, F, P, C>
60where
61 T: BlockTree,
62 F: filter::Filters,
63 P: peer::Store,
64 C: AdjustedClock<net::SocketAddr>,
65{
66 type Command = p2p::Command;
67
68 fn command_received(&mut self, cmd: Self::Command) {
69 self.machine.command(cmd)
71 }
72}
73
74impl<T, F, P, C> StateMachine for Service<T, F, P, C>
75where
76 T: BlockTree,
77 F: filter::Filters,
78 P: peer::Store,
79 C: AdjustedClock<net::SocketAddr>,
80{
81 type Message = [u8];
82 type Event = p2p::Event;
83 type DisconnectReason = p2p::DisconnectReason;
84
85 fn initialize(&mut self, time: LocalTime) {
86 self.machine.initialize(time);
87 }
88
89 fn tick(&mut self, local_time: LocalTime) {
90 self.machine.tick(local_time);
91 }
92
93 fn timer_expired(&mut self) {
94 self.machine.timer_expired();
95 }
96
97 fn message_received(&mut self, addr: &net::SocketAddr, bytes: Cow<[u8]>) {
98 if let Some(inbox) = self.inboxes.get_mut(addr) {
99 inbox.input(bytes.borrow());
100
101 loop {
102 match inbox.decode_next() {
103 Ok(Some(msg)) => self.machine.message_received(addr, Cow::Owned(msg)),
104 Ok(None) => break,
105
106 Err(err) => {
107 log::error!("Invalid message received from {}: {}", addr, err);
108
109 self.machine
110 .disconnect(*addr, p2p::DisconnectReason::DecodeError(Arc::new(err)));
111
112 return;
113 }
114 }
115 }
116 } else {
117 log::debug!("Received message from unknown peer {}", addr);
118 }
119 }
120
121 fn attempted(&mut self, addr: &net::SocketAddr) {
122 self.machine.attempted(addr)
123 }
124
125 fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
126 self.inboxes.insert(addr, p2p::stream::Decoder::new(1024));
127 self.machine.connected(addr, local_addr, link)
128 }
129
130 fn disconnected(&mut self, addr: &net::SocketAddr, reason: Disconnect<Self::DisconnectReason>) {
131 self.inboxes.remove(addr);
132 self.machine.disconnected(addr, reason)
133 }
134}
135
136impl<T, F, P, C> Iterator for Service<T, F, P, C> {
137 type Item = Io<Vec<u8>, p2p::Event, p2p::DisconnectReason>;
138
139 fn next(&mut self) -> Option<Self::Item> {
140 match self.machine.next() {
141 Some(Io::Write(addr, msg)) => {
142 log::debug!("Write {:?} to {}", &msg, addr.ip());
143 let mut buf = Vec::new();
144
145 msg.consensus_encode(&mut buf)
146 .expect("writing to an in-memory buffer doesn't fail");
147
148 Some(Io::Write(addr, buf))
149 }
150 Some(Io::Event(e)) => Some(Io::Event(e)),
151 Some(Io::Connect(a)) => Some(Io::Connect(a)),
152 Some(Io::Disconnect(a, r)) => Some(Io::Disconnect(a, r)),
153 Some(Io::SetTimer(d)) => Some(Io::SetTimer(d)),
154
155 None => None,
156 }
157 }
158}