nakamoto_client/
service.rs

1//! TODO
2use std::borrow::{Borrow, Cow};
3use std::collections::HashMap;
4use std::net;
5use std::sync::Arc;
6
7use nakamoto_chain::BlockTree;
8use nakamoto_common::bitcoin::consensus::Encodable;
9use nakamoto_common::block::time::{AdjustedClock, LocalTime};
10use nakamoto_net::{Disconnect, Io, Link, StateMachine};
11use nakamoto_p2p as p2p;
12
13use crate::client::Config;
14use crate::peer;
15use nakamoto_common::block::filter;
16
17/// Client service. Wraps a state machine and handles decoding and encoding of network messages.
18pub struct Service<T, F, P, C> {
19    inboxes: HashMap<net::SocketAddr, p2p::stream::Decoder>,
20    machine: p2p::StateMachine<T, F, P, C>,
21}
22
23impl<T: BlockTree, F: filter::Filters, P: peer::Store, C: AdjustedClock<net::SocketAddr>>
24    Service<T, F, P, C>
25{
26    /// Create a new client service.
27    pub fn new(
28        tree: T,
29        filters: F,
30        peers: P,
31        clock: C,
32        rng: fastrand::Rng,
33        config: Config,
34    ) -> Self {
35        Self {
36            inboxes: HashMap::new(),
37            machine: p2p::StateMachine::new(
38                tree,
39                filters,
40                peers,
41                clock,
42                rng,
43                p2p::Config {
44                    network: config.network,
45                    domains: config.domains,
46                    connect: config.connect,
47                    user_agent: config.user_agent,
48                    hooks: config.hooks,
49                    limits: config.limits,
50                    services: config.services,
51
52                    ..p2p::Config::default()
53                },
54            ),
55        }
56    }
57}
58
59impl<T, F, P, C> nakamoto_net::Service for Service<T, F, P, C>
60where
61    T: BlockTree,
62    F: filter::Filters,
63    P: peer::Store,
64    C: AdjustedClock<net::SocketAddr>,
65{
66    type Command = p2p::Command;
67
68    fn command_received(&mut self, cmd: Self::Command) {
69        // TODO: Commands shouldn't be handled by the inner state machine.
70        self.machine.command(cmd)
71    }
72}
73
74impl<T, F, P, C> StateMachine for Service<T, F, P, C>
75where
76    T: BlockTree,
77    F: filter::Filters,
78    P: peer::Store,
79    C: AdjustedClock<net::SocketAddr>,
80{
81    type Message = [u8];
82    type Event = p2p::Event;
83    type DisconnectReason = p2p::DisconnectReason;
84
85    fn initialize(&mut self, time: LocalTime) {
86        self.machine.initialize(time);
87    }
88
89    fn tick(&mut self, local_time: LocalTime) {
90        self.machine.tick(local_time);
91    }
92
93    fn timer_expired(&mut self) {
94        self.machine.timer_expired();
95    }
96
97    fn message_received(&mut self, addr: &net::SocketAddr, bytes: Cow<[u8]>) {
98        if let Some(inbox) = self.inboxes.get_mut(addr) {
99            inbox.input(bytes.borrow());
100
101            loop {
102                match inbox.decode_next() {
103                    Ok(Some(msg)) => self.machine.message_received(addr, Cow::Owned(msg)),
104                    Ok(None) => break,
105
106                    Err(err) => {
107                        log::error!("Invalid message received from {}: {}", addr, err);
108
109                        self.machine
110                            .disconnect(*addr, p2p::DisconnectReason::DecodeError(Arc::new(err)));
111
112                        return;
113                    }
114                }
115            }
116        } else {
117            log::debug!("Received message from unknown peer {}", addr);
118        }
119    }
120
121    fn attempted(&mut self, addr: &net::SocketAddr) {
122        self.machine.attempted(addr)
123    }
124
125    fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
126        self.inboxes.insert(addr, p2p::stream::Decoder::new(1024));
127        self.machine.connected(addr, local_addr, link)
128    }
129
130    fn disconnected(&mut self, addr: &net::SocketAddr, reason: Disconnect<Self::DisconnectReason>) {
131        self.inboxes.remove(addr);
132        self.machine.disconnected(addr, reason)
133    }
134}
135
136impl<T, F, P, C> Iterator for Service<T, F, P, C> {
137    type Item = Io<Vec<u8>, p2p::Event, p2p::DisconnectReason>;
138
139    fn next(&mut self) -> Option<Self::Item> {
140        match self.machine.next() {
141            Some(Io::Write(addr, msg)) => {
142                log::debug!("Write {:?} to {}", &msg, addr.ip());
143                let mut buf = Vec::new();
144
145                msg.consensus_encode(&mut buf)
146                    .expect("writing to an in-memory buffer doesn't fail");
147
148                Some(Io::Write(addr, buf))
149            }
150            Some(Io::Event(e)) => Some(Io::Event(e)),
151            Some(Io::Connect(a)) => Some(Io::Connect(a)),
152            Some(Io::Disconnect(a, r)) => Some(Io::Disconnect(a, r)),
153            Some(Io::SetTimer(d)) => Some(Io::SetTimer(d)),
154
155            None => None,
156        }
157    }
158}