risq 0.4.1

Re-implementation of Bisq (https://github.com/bisq-network/bisq) in rust
use super::{
    connection::{Connection, ConnectionId, Request},
    dispatch::SendableDispatcher,
    peers::{Peers, SeedConnection},
    server::event::ServerStarted,
};
use crate::{
    bisq::{
        constants::{seed_nodes, BaseCurrencyNetwork, LOCAL_CAPABILITIES},
        payload::*,
    },
    error::Error,
    prelude::{sync::oneshot, *},
};
use rand::{seq::SliceRandom, thread_rng};
use std::{
    fmt,
    sync::{Arc, RwLock},
};

#[derive(Clone, Copy, PartialEq)]
pub enum BootstrapState {
    PreBootstrap,
    InitialBootstrapInProgress,
    Bootstrapped,
}
impl BootstrapState {
    pub fn init() -> Arc<RwLock<BootstrapState>> {
        Arc::new(RwLock::new(BootstrapState::PreBootstrap))
    }
}
impl fmt::Display for BootstrapState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::PreBootstrap => write!(f, "PreBootstrap"),
            Self::InitialBootstrapInProgress => write!(f, "InitialBootstrapInProgress"),
            Self::Bootstrapped => write!(f, "Running"),
        }
    }
}

pub struct Bootstrap<D: SendableDispatcher> {
    network: BaseCurrencyNetwork,
    state: Arc<RwLock<BootstrapState>>,
    proxy_port: Option<u16>,
    addr_notify: Option<oneshot::Sender<NodeAddress>>,
    addr_rec: Option<oneshot::Receiver<NodeAddress>>,
    seed_nodes: Vec<NodeAddress>,
    peers: Addr<Peers<D>>,
    dispatcher: D,
}
impl<D: SendableDispatcher> Actor for Bootstrap<D> {
    type Context = Context<Bootstrap<D>>;
    fn started(&mut self, ctx: &mut Self::Context) {
        *self.state.write().expect("Corrupted lock in bootstrap") =
            BootstrapState::InitialBootstrapInProgress;

        let addr = self.seed_nodes.pop().expect("No seed nodes defined");
        ctx.spawn(
            fut::wrap_future(bootstrap_from_seed(
                addr.clone(),
                self.addr_rec.take().expect("Receiver already removed"),
                self.network,
                self.dispatcher.clone(),
                self.proxy_port,
            ))
            .map_err(|_, _, _| ())
            .and_then(move |seed_result, bootstrap: &mut Bootstrap<D>, _ctx| {
                *bootstrap
                    .state
                    .write()
                    .expect("Corrupted lock in bootstrap") = BootstrapState::Bootstrapped;

                fut::wrap_future(
                    bootstrap
                        .peers
                        .send(SeedConnection(
                            addr,
                            seed_result.connection_id,
                            seed_result.connection,
                        ))
                        .map_err(|_| ()),
                )
            })
            .map(|_, _, ctx| ctx.stop()),
        );
    }
}
impl<D: SendableDispatcher> Handler<ServerStarted> for Bootstrap<D> {
    type Result = ();
    fn handle(&mut self, ServerStarted(local_addr): ServerStarted, _ctx: &mut Self::Context) {
        self.addr_notify
            .take()
            .expect("Local addr notifier already used")
            .send(local_addr)
            .map_err(|e| error!("ERR: {:?}", e))
            .expect("Couldn't send local address");
    }
}
impl<D: SendableDispatcher> Bootstrap<D> {
    pub fn start(
        network: BaseCurrencyNetwork,
        state: Arc<RwLock<BootstrapState>>,
        peers: Addr<Peers<D>>,
        dispatcher: D,
        proxy_port: Option<u16>,
        force_seed: Option<NodeAddress>,
    ) -> Addr<Bootstrap<D>> {
        let mut seed_nodes = match force_seed {
            Some(addr) => vec![addr],
            None => seed_nodes(network),
        };
        seed_nodes.shuffle(&mut thread_rng());
        let (addr_notify, addr_rec) = oneshot::channel();
        Self {
            network,
            addr_notify: Some(addr_notify),
            addr_rec: Some(addr_rec),
            proxy_port,
            seed_nodes,
            peers,
            dispatcher,
            state,
        }
        .start()
    }
}
struct SeedResult {
    connection: Addr<Connection>,
    connection_id: ConnectionId,
}
fn bootstrap_from_seed<D: SendableDispatcher>(
    seed_addr: NodeAddress,
    local_addr: oneshot::Receiver<NodeAddress>,
    network: BaseCurrencyNetwork,
    dispatcher: D,
    proxy_port: Option<u16>,
) -> impl Future<Item = SeedResult, Error = Error> {
    let preliminary_get_data_request = PreliminaryGetDataRequest {
        nonce: gen_nonce(),
        excluded_keys: Vec::new(),
        supported_capabilities: LOCAL_CAPABILITIES.clone(),
    };
    info!("Bootstrapping from seed: {:?}", seed_addr);
    Connection::open(seed_addr, network.into(), dispatcher.clone(), proxy_port)
        .and_then(|(id, conn)| {
            debug!("Sending PreliminaryGetDataRequest to seed.");
            conn.send(Request(preliminary_get_data_request))
                .flatten()
                .map(move |response| (id, conn, response))
        })
        .and_then(move |(id, conn, preliminary_data_response)| {
            debug!(
                "Preliminary data response has {} items",
                preliminary_data_response.data_set.len()
                    + preliminary_data_response
                        .persistable_network_payload_items
                        .len()
            );
            let excluded_keys = get_excluded_keys(&preliminary_data_response);
            dispatcher.dispatch(id, preliminary_data_response.into());

            local_addr
                .map(move |addr| {
                    (
                        GetUpdatedDataRequest {
                            sender_node_address: addr.into(),
                            nonce: gen_nonce(),
                            excluded_keys,
                        },
                        id,
                        conn,
                        dispatcher,
                    )
                })
                .map_err(|e| e.into())
        })
        .and_then(|(request, id, conn, dispatcher)| {
            debug!("Sending GetUpdatedDataRequest to seed.");
            conn.send(Request(request))
                .flatten()
                .map(move |get_updated_data_response| {
                    debug!(
                        "Update data response has {} items",
                        get_updated_data_response.data_set.len()
                            + get_updated_data_response
                                .persistable_network_payload_items
                                .len()
                    );
                    dispatcher.dispatch(id, get_updated_data_response.into());
                    SeedResult {
                        connection_id: id,
                        connection: conn,
                    }
                })
        })
}
fn get_excluded_keys(preliminary_data_response: &GetDataResponse) -> Vec<Vec<u8>> {
    preliminary_data_response
        .data_set
        .iter()
        .map(|w| w.message.as_ref().expect("Couldn't unwrap message"))
        .map(|m| match m {
            storage_entry_wrapper::Message::ProtectedStorageEntry(entry) => entry,
            storage_entry_wrapper::Message::ProtectedMailboxStorageEntry(mailbox_entry) => {
                mailbox_entry
                    .entry
                    .as_ref()
                    .expect("Couldn't unwrap StorageEntry")
            }
        })
        .map(|entry| {
            entry
                .storage_payload
                .as_ref()
                .expect("Couldn't unwrap storage_payload")
                .bisq_hash()
                .into()
        })
        .chain(
            preliminary_data_response
                .persistable_network_payload_items
                .iter()
                .map(PersistableNetworkPayload::bisq_hash)
                .map(Vec::<u8>::from),
        )
        .collect()
}