use cbor::{Decoder, Encoder, CborError};
use rand;
use rustc_serialize::{Decodable, Encodable};
use sodiumoxide;
use std::sync::mpsc;
use std::boxed::Box;
use std::thread;
use std::marker::PhantomData;
use crust;
use NameType;
use node_interface::{Interface, CreatePersonas};
use routing_membrane::RoutingMembrane;
use types;
use types::{MessageId, Bytes};
use authority::{Authority};
use messages::connect_request::ConnectRequest;
use messages::put_public_id::PutPublicId;
use messages::put_public_id_response::PutPublicIdResponse;
use messages::{RoutingMessage, MessageTypeTag};
use message_header::MessageHeader;
use error::{RoutingError};
use std::thread::spawn;
type ConnectionManager = crust::ConnectionManager;
type Event = crust::Event;
pub type Endpoint = crust::Endpoint;
type PortAndProtocol = crust::Port;
type RoutingResult = Result<(), RoutingError>;
pub struct RoutingNode<F, G> where F : Interface + 'static,
G : CreatePersonas<F> {
genesis: Box<G>,
phantom_data: PhantomData<F>,
id: types::Id,
own_name: NameType,
next_message_id: MessageId,
bootstrap_endpoint: Option<Endpoint>,
bootstrap_node_id: Option<NameType>,
}
impl<F, G> RoutingNode<F, G> where F : Interface + 'static,
G : CreatePersonas<F> {
pub fn new(genesis: G) -> RoutingNode<F, G> {
sodiumoxide::init(); let id = types::Id::new();
let own_name = id.get_name();
RoutingNode { genesis: Box::new(genesis),
phantom_data: PhantomData,
id : id,
own_name : own_name.clone(),
next_message_id: rand::random::<MessageId>(),
bootstrap_endpoint: None,
bootstrap_node_id: None,
}
}
pub fn run_zero_membrane(&mut self) {
let (event_output, event_input) = mpsc::channel();
let mut cm = crust::ConnectionManager::new(event_output);
let ports_and_protocols : Vec<PortAndProtocol> = Vec::new();
let beacon_port = Some(5483u16);
let listeners = match cm.start_listening2(ports_and_protocols, beacon_port) {
Err(reason) => {
println!("Failed to start listening: {:?}", reason);
(vec![], None)
}
Ok(listeners_and_beacon) => listeners_and_beacon
};
let original_name = self.id.get_name();
let self_relocated_name = types::calculate_self_relocated_name(
&self.id.get_crypto_public_sign_key(),
&self.id.get_crypto_public_key(),
&self.id.get_validation_token());
println!("ZERO listening on {:?}, named {:?}", listeners.0.first(),
self_relocated_name);
self.id.assign_relocated_name(self_relocated_name);
let mut membrane = RoutingMembrane::<F>::new(
cm, event_input, None,
listeners.0, self.id.clone(),
self.genesis.create_personas());
spawn(move || membrane.run());
}
pub fn bootstrap(&mut self,
bootstrap_list: Option<Vec<Endpoint>>,
beacon_port: Option<u16>) -> Result<(), RoutingError> {
let (event_output, event_input) = mpsc::channel();
let mut cm = crust::ConnectionManager::new(event_output);
let ports_and_protocols : Vec<PortAndProtocol> = Vec::new();
let beacon_port = Some(5483u16);
let listeners = match cm.start_listening2(ports_and_protocols, beacon_port) {
Err(reason) => {
println!("Failed to start listening: {:?}", reason);
(vec![], None)
}
Ok(listeners_and_beacon) => listeners_and_beacon
};
let bootstrapped_to = try!(cm.bootstrap(bootstrap_list, beacon_port)
.map_err(|_|RoutingError::FailedToBootstrap));
println!("BOOTSTRAP to {:?}", bootstrapped_to);
println!("NODE listening on {:?}", listeners.0.first());
self.bootstrap_endpoint = Some(bootstrapped_to.clone());
cm.connect(vec![bootstrapped_to.clone()]);
thread::sleep_ms(100);
let unrelocated_id = self.id.clone();
let mut relocated_name : Option<NameType>;
let connect_msg = self.construct_connect_request_msg(&unrelocated_id.get_name(),
listeners.0.clone());
let serialised_message = try!(encode(&connect_msg));
ignore(cm.send(bootstrapped_to.clone(), serialised_message));
println!("Waiting for responses from network");
loop {
match event_input.recv() {
Err(_) => {},
Ok(crust::Event::NewMessage(endpoint, bytes)) => {
match decode::<RoutingMessage>(&bytes) {
Ok(message) => {
match message.message_type {
MessageTypeTag::ConnectResponse => {
println!("Received connect response");
let put_public_id_msg
= self.construct_put_public_id_msg(
&types::PublicId::new(&unrelocated_id));
let serialised_message = try!(encode(&put_public_id_msg));
ignore(cm.send(bootstrapped_to.clone(), serialised_message));
},
MessageTypeTag::PutPublicIdResponse => {
let put_public_id_response =
try!(decode::<PutPublicIdResponse>(&message.serialised_body));
relocated_name = Some(put_public_id_response.public_id.name());
debug_assert!(put_public_id_response.public_id.is_relocated());
if put_public_id_response.public_id.validation_token
!= self.id.get_validation_token() {
return Err(RoutingError::FailedToBootstrap); }
println!("Received PutPublicId relocated name {:?} from {:?}",
relocated_name, self.id.get_name());
break;
},
_ => {
println!("Received unexpected message");
}
}
},
Err(_) => {
}
};
},
Ok(crust::Event::NewConnection(endpoint)) => {
println!("NewConnection on {:?} while waiting on network.", endpoint);
},
Ok(crust::Event::LostConnection(endpoint)) => {
return Err(RoutingError::FailedToBootstrap);
}
}
};
match relocated_name {
Some(relocated_name) => {
self.id.assign_relocated_name(relocated_name);
debug_assert!(self.id.is_relocated());
let mut membrane = RoutingMembrane::<F>::new(
cm, event_input, Some(bootstrapped_to.clone()),
listeners.0, self.id.clone(),
self.genesis.create_personas());
spawn(move || membrane.run());
},
_ => panic!("DEBUG: did not relocate the publicId.") };
Ok(())
}
fn construct_connect_request_msg(&mut self, destination: &NameType,
accepting_on: Vec<Endpoint>) -> RoutingMessage {
let header = MessageHeader::new(self.get_next_message_id(),
types::DestinationAddress {dest: destination.clone(), relay_to: None },
self.our_source_address(), Authority::ManagedNode);
let connect_request = ConnectRequest {
local_endpoints: accepting_on,
external_endpoints: vec![],
requester_id: self.own_name.clone(),
receiver_id: destination.clone(),
requester_fob: types::PublicId::new(&self.id),
};
RoutingMessage::new(MessageTypeTag::ConnectRequest, header, connect_request,
&self.id.get_crypto_secret_sign_key())
}
fn construct_put_public_id_msg(&mut self,
our_unrelocated_id: &types::PublicId) -> RoutingMessage {
let header = MessageHeader::new(self.get_next_message_id(),
types::DestinationAddress{dest: our_unrelocated_id.name(), relay_to: None},
self.our_source_address(), Authority::ManagedNode);
let put_public_id = PutPublicId { public_id : our_unrelocated_id.clone() };
RoutingMessage::new(MessageTypeTag::PutPublicId, header, put_public_id,
&self.id.get_crypto_secret_sign_key())
}
fn our_source_address(&self) -> types::SourceAddress {
types::SourceAddress{ from_node: self.id.get_name(),
from_group: None,
reply_to: None,
relayed_for: Some(self.id.get_name()) }
}
fn get_next_message_id(&mut self) -> MessageId {
let temp = self.next_message_id;
self.next_message_id = self.next_message_id.wrapping_add(1);
return temp;
}
}
fn encode<T>(value: &T) -> Result<Bytes, CborError> where T: Encodable {
let mut enc = Encoder::from_memory();
try!(enc.encode(&[value]));
Ok(enc.into_bytes())
}
fn decode<T>(bytes: &Bytes) -> Result<T, CborError> where T: Decodable {
let mut dec = Decoder::from_bytes(&bytes[..]);
match dec.decode().next() {
Some(result) => result,
None => Err(CborError::UnexpectedEOF)
}
}
fn ignore<R,E>(_: Result<R,E>) {}
#[cfg(test)]
mod test {
}