1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2019 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod connection_group;

use crate::{
    client::SafeKey, network_event::NetworkEvent, network_event::NetworkTx, CoreError, CoreFuture,
};
use crate::{fry, ok};
use connection_group::ConnectionGroup;
use futures::{future, Future};
use log::{error, trace};
use quic_p2p::Config as QuicP2pConfig;
use safe_nd::{Message, PublicId, Response};
use std::{
    cell::RefCell,
    collections::{hash_map::Entry, HashMap},
    rc::Rc,
    time::Duration,
};
use tokio::util::FutureExt;

/// Maximum time, in seconds, that `Inner::bootstrap` waits for a newly created connection
/// group to report its connection result before failing with `CoreError::RequestTimeout`.
const CONNECTION_TIMEOUT_SECS: u64 = 30;

/// Establishes and tracks connections to the network on behalf of a client.
///
/// The manager keeps the `QuicP2p` configuration used for new connections, one
/// `ConnectionGroup` per bootstrapped `PublicId`, and the `NetworkTx` sender used to report
/// network events.
///
/// All clones of a `ConnectionManager` share the same state through `Rc<RefCell<_>>`, so a
/// manager and its clones cannot be sent to another thread.
///
/// NOTE(review): earlier documentation stated that this type contains a reference to the
/// crossbeam channel provided by quic-p2p for capturing events. Nothing in this file shows
/// that channel; presumably it is owned by `ConnectionGroup` - confirm.
#[derive(Clone)]
pub struct ConnectionManager {
    // Shared, interior-mutable state; every public method delegates to it.
    inner: Rc<RefCell<Inner>>,
}

impl ConnectionManager {
    /// Creates a new connection manager that is not yet connected to any group.
    ///
    /// The `port` of the supplied `config` is overwritten with `Some(0)` so that client
    /// connections always use a random port. `net_tx` is cloned and kept for reporting
    /// network events.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `Result` return type is part of the
    /// existing interface.
    pub fn new(mut config: QuicP2pConfig, net_tx: &NetworkTx) -> Result<Self, CoreError> {
        config.port = Some(0); // Make sure we always use a random port for client connections.

        let inner = Inner {
            config,
            groups: HashMap::default(),
            net_tx: net_tx.clone(),
        };

        Ok(Self {
            inner: Rc::new(RefCell::new(inner)),
        })
    }

    /// Returns `true` if this connection manager already holds a connection group for the
    /// Client Handlers serving the provided public ID.
    ///
    /// A group is registered as soon as `bootstrap` creates it, so `true` does not by itself
    /// prove that the connection has been fully established.
    pub fn has_connection_to(&self, pub_id: &PublicId) -> bool {
        // `pub_id` is already a reference; pass it on directly instead of borrowing it again.
        self.inner.borrow().groups.contains_key(pub_id)
    }

    /// Sends `msg` via the `ConnectionGroup` registered for `pub_id`.
    ///
    /// # Errors
    ///
    /// The returned future fails with `CoreError::Unexpected` if `msg` is not a
    /// `Message::Request` or if no group has been registered for `pub_id` (i.e. `bootstrap`
    /// was not called for it). Other errors come from the connection group itself.
    ///
    /// # Panics
    ///
    /// Panics if the shared state is already borrowed (`RefCell` borrow rules).
    pub fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Box<CoreFuture<Response>> {
        self.inner.borrow_mut().send(pub_id, msg)
    }

    /// Connects to the Client Handlers that manage the provided ID.
    ///
    /// If a group for this ID is already registered the returned future resolves
    /// immediately. Otherwise it resolves with the connection result, or fails with
    /// `CoreError::RequestTimeout` after `CONNECTION_TIMEOUT_SECS` seconds.
    ///
    /// # Panics
    ///
    /// Panics if the shared state is already borrowed (`RefCell` borrow rules).
    pub fn bootstrap(&mut self, full_id: SafeKey) -> Box<CoreFuture<()>> {
        self.inner.borrow_mut().bootstrap(full_id)
    }

    /// Reconnects to the network.
    ///
    /// # Panics
    ///
    /// Always panics: this operation is not implemented yet.
    pub fn restart_network(&mut self) {
        unimplemented!();
    }

    /// Disconnects from the group registered for `pub_id` and forgets it.
    ///
    /// If no such group exists an error is logged and the returned future resolves
    /// successfully.
    ///
    /// # Panics
    ///
    /// Panics if the shared state is already borrowed (`RefCell` borrow rules).
    pub fn disconnect(&mut self, pub_id: &PublicId) -> Box<CoreFuture<()>> {
        self.inner.borrow_mut().disconnect(pub_id)
    }
}

/// State shared by all clones of a `ConnectionManager`.
struct Inner {
    // Configuration cloned into every newly created `ConnectionGroup`.
    config: QuicP2pConfig,
    // Connection groups created by `bootstrap`, keyed by the public ID they serve.
    groups: HashMap<PublicId, ConnectionGroup>,
    // Sender for network events; `NetworkEvent::Disconnected` is sent through it on drop.
    net_tx: NetworkTx,
}

impl Drop for Inner {
    /// Reports `NetworkEvent::Disconnected` when the shared manager state is dropped.
    ///
    /// The connection groups are not closed explicitly here; they are dropped together with
    /// the `groups` map once this method returns.
    fn drop(&mut self) {
        trace!("Dropped ConnectionManager - terminating gracefully");

        // Best effort only: a failure to deliver the event is deliberately ignored.
        let disconnected = NetworkEvent::Disconnected;
        let _ = self.net_tx.unbounded_send(disconnected);
    }
}

impl Inner {
    /// Creates and registers a `ConnectionGroup` for the public ID of `full_id`.
    ///
    /// If a group for that ID is already registered, nothing is created and the returned
    /// future resolves immediately. Otherwise the returned future resolves with the result
    /// received on the oneshot channel whose sender is handed to `ConnectionGroup::new`
    /// (presumably completed by the group once the connection attempt finishes), or fails
    /// with `CoreError::RequestTimeout` after `CONNECTION_TIMEOUT_SECS` seconds.
    ///
    /// If `ConnectionGroup::new` itself fails, no group is registered and the error is
    /// returned through the future.
    ///
    /// NOTE(review): the group is stored before the connection is confirmed and is not
    /// removed here when the future fails or times out, so a later `bootstrap` for the same
    /// ID reports "already connected" - confirm whether callers are expected to call
    /// `disconnect` in that case.
    fn bootstrap(&mut self, full_id: SafeKey) -> Box<CoreFuture<()>> {
        let pub_id = full_id.public_id();
        trace!("Trying to bootstrap with group {:?}", pub_id);

        let vacant = match self.groups.entry(pub_id) {
            Entry::Vacant(vacant) => vacant,
            Entry::Occupied(occupied) => {
                trace!("Group {} is already connected", occupied.key());
                return ok!(());
            }
        };

        // Only create the notification channel once we know a new group is needed.
        let (connected_tx, connected_rx) = futures::oneshot();
        let group = fry!(ConnectionGroup::new(
            self.config.clone(),
            full_id,
            connected_tx
        ));
        let _ = vacant.insert(group);

        Box::new(
            connected_rx
                .map_err(|err| CoreError::from(err.to_string()))
                // The channel carries a `Result`; flatten it into the future's outcome.
                .and_then(|res| res)
                .timeout(Duration::from_secs(CONNECTION_TIMEOUT_SECS))
                // Do not swallow the original error if it is not a timeout.
                .map_err(|e| e.into_inner().unwrap_or(CoreError::RequestTimeout)),
        )
    }

    /// Sends `msg` through the connection group registered for `pub_id`.
    ///
    /// The returned future fails with `CoreError::Unexpected` if `msg` is not a
    /// `Message::Request` (only requests carry the message ID needed by the group) or if
    /// no group is registered for `pub_id`.
    fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Box<CoreFuture<Response>> {
        let msg_id = match msg {
            Message::Request { message_id, .. } => *message_id,
            _ => {
                return Box::new(future::err(CoreError::Unexpected(
                    "Not a Request".to_string(),
                )));
            }
        };

        let conn_group = fry!(self.groups.get_mut(pub_id).ok_or_else(|| {
            CoreError::Unexpected(
                "No connection group found - did you call `bootstrap`?".to_string(),
            )
        }));

        conn_group.send(msg_id, msg)
    }

    /// Disconnect from a group.
    ///
    /// Removes the group registered for `pub_id` and closes it; the group is kept alive
    /// until the close future completes. If no group is registered, an error is logged and
    /// the returned future resolves successfully.
    pub fn disconnect(&mut self, pub_id: &PublicId) -> Box<CoreFuture<()>> {
        trace!("Disconnecting group {:?}", pub_id);

        match self.groups.remove(pub_id) {
            Some(mut group) => Box::new(group.close().map(move |res| {
                // Explicitly drop the group once it's disconnected. `drop` (unlike
                // `let _ = group;`) really moves the value, so the closure keeps owning
                // the group until this point regardless of closure capture rules.
                drop(group);
                res
            })),
            None => {
                error!("No group found for {}", pub_id); // FIXME: handle properly
                ok!(())
            }
        }
    }
}