1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
use std::io;
use std::net;
use std::path::Path;
use std::sync::Arc;
use std::time::{self, SystemTime};

use crossbeam_channel as chan;

use nakamoto_chain::block::cache::BlockCache;
use nakamoto_chain::block::store;

use nakamoto_common::block::store::Store;
use nakamoto_common::block::time::AdjustedTime;
use nakamoto_common::block::tree::{self, BlockTree, ImportResult};
use nakamoto_common::block::{Block, BlockHash, BlockHeader, Height, Transaction};

use nakamoto_p2p as p2p;
use nakamoto_p2p::address_book::AddressBook;
use nakamoto_p2p::bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
use nakamoto_p2p::bitcoin::util::hash::BitcoinHash;
use nakamoto_p2p::protocol::bitcoin::Command;
use nakamoto_p2p::protocol::bitcoin::{self, Network};
use nakamoto_p2p::protocol::Link;
use nakamoto_p2p::reactor::poll::Waker;

pub use nakamoto_p2p::event::Event;

use crate::error::Error;
use crate::handle::{self, Handle};

/// Node configuration.
///
/// Use [`NodeConfig::default`] or [`NodeConfig::named`] to obtain a ready-made value
/// and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// NOTE(review): presumably toggles peer discovery; nothing visible in this file
    /// reads it, so confirm where it takes effect. Defaults to `true`.
    pub discovery: bool,
    /// Socket addresses handed to the reactor as listen addresses when the node runs.
    /// Defaults to a single `0.0.0.0:0` entry.
    pub listen: Vec<net::SocketAddr>,
    /// Network the node operates on; it supplies the genesis header, parameters and
    /// checkpoints used when the node starts.
    pub network: Network,
    /// Known peer addresses. Extended by [`Node::seed`] and passed to the protocol
    /// configuration when the node starts.
    pub address_book: AddressBook,
    /// Timeout copied into every [`NodeHandle`] for operations that wait on events.
    /// Defaults to 60 seconds.
    pub timeout: time::Duration,
    /// Name passed to the protocol configuration. Defaults to `"self"`.
    pub name: &'static str,
}

impl NodeConfig {
    /// Build the default configuration, replacing only its `name`.
    pub fn named(name: &'static str) -> Self {
        let mut config = Self::default();
        config.name = name;
        config
    }
}

impl Default for NodeConfig {
    fn default() -> Self {
        Self {
            discovery: true,
            listen: vec![([0, 0, 0, 0], 0).into()],
            network: Network::default(),
            address_book: AddressBook::default(),
            timeout: time::Duration::from_secs(60),
            name: "self",
        }
    }
}

/// A light-node process.
pub struct Node {
    /// Configuration the node was created with.
    pub config: NodeConfig,

    /// Receiving end of the command channel; handed to the reactor when the node runs.
    commands: chan::Receiver<Command>,
    /// Sending end of the command channel; cloned into every [`NodeHandle`].
    handle: chan::Sender<Command>,
    /// Receiving end of the event channel whose sender is given to the reactor at
    /// construction; cloned into every [`NodeHandle`].
    events: chan::Receiver<Event<NetworkMessage>>,
    /// Reactor that runs the protocol once the node is started.
    reactor: nakamoto_p2p::reactor::poll::Reactor<net::TcpStream, RawNetworkMessage, Command>,
}

impl Node {
    /// Construct a node from `config`.
    ///
    /// Sets up the unbounded command and event channels and a reactor that publishes
    /// to the event channel. Fails if the reactor cannot be created.
    pub fn new(config: NodeConfig) -> Result<Self, Error> {
        let (subscriber, events) = chan::unbounded::<Event<NetworkMessage>>();
        let (handle, commands) = chan::unbounded::<Command>();
        let reactor = p2p::reactor::poll::Reactor::new(subscriber)?;

        let node = Self {
            config,
            reactor,
            handle,
            commands,
            events,
        };

        Ok(node)
    }

    /// Seed the node's address book with peer addresses.
    pub fn seed<S: net::ToSocketAddrs>(&mut self, seeds: Vec<S>) -> Result<(), Error> {
        self.config.address_book.seed(seeds).map_err(Error::from)
    }

    /// Start the node process. This function is meant to be run in its own thread.
    ///
    /// Creates the header store at `headers.db` (a path relative to the current working
    /// directory), or opens it if it already exists. If the store's consistency check
    /// fails, the store is healed before use. The store is then loaded into a block
    /// cache and control is handed to the reactor.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be created, opened, healed or queried, if
    /// the block cache cannot be built from it, or if the reactor returns an error.
    pub fn run(mut self) -> Result<(), Error> {
        let cfg = bitcoin::Config::from(
            self.config.name,
            self.config.network,
            self.config.address_book,
        );
        let genesis = cfg.network.genesis();
        let params = cfg.network.params();

        log::info!("Initializing node ({:?})..", cfg.network);
        log::info!("Genesis block hash is {}", cfg.network.genesis_hash());

        // FIXME: Get path from `NodeConfig`.
        let path = Path::new("headers.db");
        let mut store = match store::File::create(path, genesis) {
            Ok(store) => {
                log::info!("Initializing new block store {:?}", path);
                store
            }
            // The file is already there: reuse it instead of creating a fresh one.
            Err(store::Error::Io(e)) if e.kind() == io::ErrorKind::AlreadyExists => {
                log::info!("Found existing store {:?}", path);
                store::File::open(path, genesis)?
            }
            // Any other failure is reported to the caller rather than panicking,
            // consistent with how every other store error in this function is handled.
            Err(err) => return Err(err.into()),
        };
        if store.check().is_err() {
            log::warn!("Corruption detected in store, healing..");
            store.heal()?; // Rollback store to the last valid header.
        }
        log::info!("Store height = {}", store.height()?);
        log::info!("Loading blocks from store..");

        let local_time = SystemTime::now().into();
        let checkpoints = cfg.network.checkpoints().collect::<Vec<_>>();
        let clock = AdjustedTime::<net::SocketAddr>::new(local_time);
        let cache = BlockCache::from(store, params, &checkpoints)?;
        let rng = fastrand::Rng::new();

        log::info!("{} peer(s) found..", cfg.address_book.len());
        log::debug!("{:?}", cfg.address_book);
        let protocol = p2p::protocol::Bitcoin::new(cache, clock, rng, cfg);

        self.reactor
            .run(protocol, self.commands, &self.config.listen)?;

        Ok(())
    }

    /// Start the node process using a caller-supplied block tree instead of loading one
    /// from disk. This function is meant to be run in its own thread.
    ///
    /// Returns an error if the reactor returns one.
    pub fn run_with<T: BlockTree>(mut self, cache: T) -> Result<(), Error> {
        let name = self.config.name;
        let network = self.config.network;
        let cfg = bitcoin::Config::from(name, network, self.config.address_book);

        log::info!("Initializing node ({:?})..", cfg.network);
        log::info!("Genesis block hash is {}", cfg.network.genesis_hash());
        log::info!("Chain height is {}", cache.height());

        // Clock seeded with the current system time.
        let clock = AdjustedTime::<net::SocketAddr>::new(SystemTime::now().into());
        let rng = fastrand::Rng::new();

        log::info!("{} peer(s) found..", cfg.address_book.len());
        log::debug!("{:?}", cfg.address_book);

        let protocol = p2p::protocol::Bitcoin::new(cache, clock, rng, cfg);
        let listen = &self.config.listen;

        self.reactor.run(protocol, self.commands, listen)?;

        Ok(())
    }

    /// Produce a new [`NodeHandle`] for talking to this node.
    ///
    /// The handle gets the reactor's waker, clones of the command sender and event
    /// receiver, and the configured timeout.
    pub fn handle(&mut self) -> NodeHandle {
        let waker = self.reactor.waker();
        let commands = self.handle.clone();
        let events = self.events.clone();
        let timeout = self.config.timeout;

        NodeHandle {
            commands,
            events,
            waker,
            timeout,
        }
    }
}

/// An instance of [`Handle`] for [`Node`].
pub struct NodeHandle {
    /// Sender used to queue commands for the node.
    commands: chan::Sender<Command>,
    /// Receiver for the node's event feed.
    events: chan::Receiver<Event<NetworkMessage>>,
    /// Waker obtained from the node's reactor; triggered after each queued command.
    waker: Arc<Waker>,
    /// Maximum total time an event wait may take before it reports a timeout.
    timeout: time::Duration,
}

impl NodeHandle {
    /// Override how long operations that wait on the network may take before timing out.
    pub fn set_timeout(&mut self, timeout: time::Duration) {
        self.timeout = timeout;
    }

    /// Queue `cmd` on the command channel, then wake up the event loop.
    ///
    /// Fails if either the send or the wake-up fails.
    pub fn command(&self, cmd: Command) -> Result<(), handle::Error> {
        let Self {
            commands, waker, ..
        } = self;

        commands.send(cmd)?;
        waker.wake()?;

        Ok(())
    }
}

impl Handle for NodeHandle {
    /// Network message type used by this handle (for example by `receive`).
    type Message = NetworkMessage;
    /// Event type delivered on this handle's event feed.
    type Event = Event<NetworkMessage>;

    /// Ask the node for its tip header and block until the reply arrives.
    ///
    /// The reply travels over a one-slot channel whose sender is carried by the command.
    fn get_tip(&self) -> Result<BlockHeader, handle::Error> {
        let (reply, response) = chan::bounded::<BlockHeader>(1);
        let request = Command::GetTip(reply);

        self.command(request)?;
        let tip = response.recv()?;

        Ok(tip)
    }

    /// Request the block with the given hash, then wait for a received `Block` message
    /// whose hash matches. Subject to the handle's timeout.
    fn get_block(&self, hash: &BlockHash) -> Result<Block, handle::Error> {
        self.command(Command::GetBlock(*hash))?;

        self.wait(|event| {
            if let Event::Received(_, NetworkMessage::Block(block)) = event {
                if &block.bitcoin_hash() == hash {
                    return Some(block);
                }
            }
            None
        })
    }

    /// Ask the node to connect to `addr`, then wait for the matching `Connected` event
    /// and return its link.
    ///
    /// An event matches when its address equals `addr`, or when `addr` has an
    /// unspecified IP and the ports are equal.
    fn connect(&self, addr: net::SocketAddr) -> Result<Link, handle::Error> {
        self.command(Command::Connect(addr))?;

        self.wait(|event| {
            if let Event::Connected(peer, link) = event {
                let same_port_on_any_ip =
                    addr.ip().is_unspecified() && peer.port() == addr.port();

                if peer == addr || same_port_on_any_ip {
                    return Some(link);
                }
            }
            None
        })
    }

    fn import_headers(
        &self,
        headers: Vec<BlockHeader>,
    ) -> Result<Result<ImportResult, tree::Error>, handle::Error> {
        let (transmit, receive) = chan::bounded::<Result<ImportResult, tree::Error>>(1);
        self.command(Command::ImportHeaders(headers, transmit))?;

        Ok(receive.recv()?)
    }

    /// Queue a `Receive` command carrying `msg` and the address `from`.
    fn receive(&self, from: net::SocketAddr, msg: NetworkMessage) -> Result<(), handle::Error> {
        let request = Command::Receive(from, msg);

        self.command(request)
    }

    /// Not implemented yet: the body is `todo!()`, so calling this panics.
    fn submit_transaction(&self, _tx: Transaction) -> Result<(), handle::Error> {
        todo!()
    }

    /// Subscribe to the event feed, and wait for the given function to return something,
    /// or timeout if the specified amount of time has elapsed.
    fn wait<F, T>(&self, f: F) -> Result<T, handle::Error>
    where
        F: Fn(Event<NetworkMessage>) -> Option<T>,
    {
        let start = time::Instant::now();
        let events = self.events.clone();

        loop {
            if let Some(timeout) = self.timeout.checked_sub(start.elapsed()) {
                match events.recv_timeout(timeout) {
                    Ok(event) => {
                        if let Some(t) = f(event) {
                            return Ok(t);
                        }
                    }
                    Err(chan::RecvTimeoutError::Disconnected) => {
                        return Err(handle::Error::Disconnected);
                    }
                    Err(chan::RecvTimeoutError::Timeout) => {
                        // Keep trying until our timeout reaches zero.
                        continue;
                    }
                }
            } else {
                return Err(handle::Error::Timeout);
            }
        }
    }

    /// Wait until at least `count` distinct peer addresses have appeared in `Connected`
    /// events, or the handle's timeout elapses.
    ///
    /// Returns immediately when `count` is zero. Only `Connected` events are counted;
    /// peers that disconnect again are not removed from the tally.
    fn wait_for_peers(&self, count: usize) -> Result<(), handle::Error> {
        use std::cell::RefCell;
        use std::collections::HashSet;

        if count == 0 {
            return Ok(());
        }

        // The set must outlive individual events, so it lives outside the closure.
        // `wait` only accepts an `Fn` closure, hence the `RefCell` for mutation.
        let connected = RefCell::new(HashSet::new());

        self.wait(|e| match e {
            Event::Connected(addr, _) => {
                let mut connected = connected.borrow_mut();
                connected.insert(addr);

                if connected.len() >= count {
                    Some(())
                } else {
                    None
                }
            }
            _ => None,
        })
    }

    /// Block until a `Synced` event is observed, or the handle's timeout elapses.
    fn wait_for_ready(&self) -> Result<(), handle::Error> {
        self.wait(|event| {
            if let Event::Synced = event {
                Some(())
            } else {
                None
            }
        })
    }

    /// Block until a `HeadersImported` event reports a tip change at exactly height `h`,
    /// and return the hash carried by that event. Subject to the handle's timeout.
    fn wait_for_height(&self, h: Height) -> Result<BlockHash, handle::Error> {
        self.wait(|event| {
            if let Event::HeadersImported(ImportResult::TipChanged(hash, height, _)) = event {
                if height == h {
                    return Some(hash);
                }
            }
            None
        })
    }

    /// Consume the handle and queue a `Shutdown` command for the node.
    fn shutdown(self) -> Result<(), handle::Error> {
        let request = Command::Shutdown;

        self.command(request)
    }
}