1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright (C) 2019-2021 Aleo Systems Inc.
// This file is part of the snarkOS library.

// The snarkOS library is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// The snarkOS library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use std::{
    io::{Error as IoError, ErrorKind},
    time::Duration,
};

use tokio::{net::TcpStream, time::timeout};

use snarkos_metrics::{self as metrics, connections::*, wrapped_mpsc};

use crate::{NetworkError, Node, Peer, PeerEvent, PeerEventData, PeerHandle, Version};

use super::{network::PeerIOHandle, PeerAction};

/// Maximum time, in seconds, that `Peer::inner_connect` waits for the outbound TCP connection
/// to be established. The limit wraps only the `TcpStream::connect` call, not the handshake
/// that follows it.
const CONNECTION_TIMEOUT_SECS: u64 = 3;

impl Peer {
    /// Starts an outbound connection attempt to this peer on a newly spawned tokio task.
    ///
    /// The task marks the peer as connecting and calls `inner_connect` with the node's version:
    ///
    /// * On failure it records the failed attempt on `quality`, emits
    ///   `PeerEventData::FailHandshake`, calls `fail()` and logs the error (`warn!` when the
    ///   error reports itself as trivial, `error!` otherwise).
    /// * On success it marks the peer as connected, increments the `CONNECTED` gauge, emits
    ///   `PeerEventData::Connected` carrying a `PeerHandle`, and awaits `run`. Once `run`
    ///   returns the gauge is decremented again; `fail()` is only called for non-trivial errors.
    ///
    /// In both cases the peer is finally marked as disconnected and moved into a
    /// `PeerEventData::Disconnect` event. Failures to deliver any event to `event_target` are
    /// deliberately ignored.
    pub fn connect(mut self, node: Node, event_target: wrapped_mpsc::Sender<PeerEvent>) {
        let (action_tx, action_rx) = wrapped_mpsc::channel::<PeerAction>(snarkos_metrics::queues::PEER_EVENTS, 64);
        tokio::spawn(async move {
            self.set_connecting();
            match self.inner_connect(node.version()).await {
                Ok(io_handle) => {
                    self.set_connected();
                    metrics::increment_gauge!(CONNECTED, 1.0);

                    // Hand a clone of the action sender to whoever listens for peer events.
                    let handle = PeerHandle {
                        sender: action_tx.clone(),
                    };
                    let connected = PeerEvent {
                        address: self.address,
                        data: PeerEventData::Connected(handle),
                    };
                    event_target.send(connected).await.ok();

                    match self.run(node, io_handle, action_rx).await {
                        Ok(_) => {}
                        Err(err) if err.is_trivial() => {
                            warn!(
                                "unrecoverable failure communicating to outbound peer '{}': '{:?}'",
                                self.address, err
                            );
                        }
                        Err(err) => {
                            // Only non-trivial errors count as a failure of the peer.
                            self.fail();
                            error!(
                                "unrecoverable failure communicating to outbound peer '{}': '{:?}'",
                                self.address, err
                            );
                        }
                    }
                    metrics::decrement_gauge!(CONNECTED, 1.0);
                }
                Err(err) => {
                    self.quality.connect_failed();
                    let handshake_failed = PeerEvent {
                        address: self.address,
                        data: PeerEventData::FailHandshake,
                    };
                    event_target.send(handshake_failed).await.ok();
                    self.fail();
                    if err.is_trivial() {
                        warn!(
                            "failed to send outgoing connection to peer '{}': '{:?}'",
                            self.address, err
                        );
                    } else {
                        error!(
                            "failed to send outgoing connection to peer '{}': '{:?}'",
                            self.address, err
                        );
                    }
                }
            }

            // Whatever happened above, report the disconnect and give the peer back.
            self.set_disconnected();
            let peer_address = self.address;
            let disconnected = PeerEvent {
                address: peer_address,
                data: PeerEventData::Disconnect(Box::new(self)),
            };
            event_target.send(disconnected).await.ok();
        });
    }

    /// Opens a TCP connection to `self.address` and performs the initiator side of the handshake.
    ///
    /// The `CONNECTING` gauge is incremented on entry, and a `defer::defer` guard held for the
    /// rest of the function decrements it again.
    ///
    /// # Errors
    ///
    /// * `NetworkError::Io` with `ErrorKind::TimedOut` if the TCP connection is not established
    ///   within `CONNECTION_TIMEOUT_SECS` seconds.
    /// * The I/O error from `TcpStream::connect`, converted by `?`, if connecting fails.
    /// * Whatever error `inner_handshake_initiator` returns.
    async fn inner_connect(&mut self, our_version: Version) -> Result<PeerIOHandle, NetworkError> {
        metrics::increment_gauge!(CONNECTING, 1.0);
        let _connecting_guard = defer::defer(|| metrics::decrement_gauge!(CONNECTING, 1.0));

        let deadline = Duration::from_secs(CONNECTION_TIMEOUT_SECS);
        let stream = match timeout(deadline, TcpStream::connect(self.address)).await {
            Ok(connect_result) => connect_result?,
            Err(_) => {
                return Err(NetworkError::Io(IoError::new(
                    ErrorKind::TimedOut,
                    "connection timed out",
                )));
            }
        };

        self.inner_handshake_initiator(stream, our_version).await
    }
}