use rand;
use sodiumoxide;
use std::io::Error as IoError;
use std::sync::{Mutex, Arc, mpsc};
use std::sync::mpsc::Receiver;
use client_interface::Interface;
use crust;
use messages;
use message_header;
use name_type::NameType;
use sendable::Sendable;
use types;
use error::{RoutingError};
use messages::connect_request::ConnectRequest;
use messages::connect_response::ConnectResponse;
use messages::get_data_response::GetDataResponse;
use messages::put_data_response::PutDataResponse;
use messages::put_data::PutData;
use messages::get_data::GetData;
use message_header::MessageHeader;
use messages::{RoutingMessage, MessageTypeTag};
use types::{MessageId, Id, PublicId};
use authority::Authority;
use utils::*;
pub use crust::Endpoint;
/// Raw serialised bytes; used in this file for message bodies handed to the
/// `handle_*` methods.
type Bytes = Vec<u8>;
/// Local shorthand for crust's connection manager.
type ConnectionManager = crust::ConnectionManager;
/// Local shorthand for the events crust delivers over the channel created in
/// `RoutingClient::new`.
type Event = crust::Event;
/// Local shorthand for `crust::Port`.
/// NOTE(review): the alias name suggests the port also carries its protocol —
/// confirm against crust.
type PortAndProtocol = crust::Port;
/// Presumably the error type for cryptographic failures (inferred from the
/// name only).
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// returns it — confirm it is still needed.
pub enum CryptoError {
/// Unspecified failure; the variant carries no further detail.
Unknown
}
/// A client that talks to the network through a single bootstrap node.
///
/// Outgoing requests are signed with the client's own `Id` and sent to the
/// bootstrap endpoint; responses arriving from that endpoint are decoded in
/// `run` and forwarded to the user-supplied `Interface`.
pub struct RoutingClient<F: Interface> {
/// Shared, mutex-protected handle to the user's `Interface`; locked whenever
/// a data response is forwarded to it.
interface: Arc<Mutex<F>>,
/// Receiving half of the channel whose sender was given to the connection
/// manager in `new`; polled by `run`.
event_input: Receiver<Event>,
/// crust connection manager used to listen, bootstrap and send bytes.
connection_manager: ConnectionManager,
/// Full identity of this client; supplies the secret signing key for
/// outgoing messages.
id: Id,
/// Public identity derived from `id` in `new`; its name is used as the
/// source address of outgoing messages.
public_id: PublicId,
/// `(name, endpoint)` of the bootstrap node. The endpoint is filled in by
/// `bootstrap`; the name is filled in once a connect response is handled.
bootstrap_address: (Option<NameType>, Option<Endpoint>),
/// Last message id handed out; seeded randomly in `new` and advanced with
/// wrapping addition by `get_next_message_id`.
next_message_id: MessageId
}
/// Explicit destructor for `RoutingClient`.
///
/// The body is empty: no extra teardown is performed here, and the fields are
/// dropped in the usual way once `drop` returns.
impl<F: Interface> Drop for RoutingClient<F> {
    fn drop(&mut self) {
        // Nothing to release by hand.
    }
}
impl<F> RoutingClient<F> where F: Interface {
/// Builds a client for the given identity.
///
/// Initialises sodiumoxide, creates the event channel and hands its sending
/// half to a new crust connection manager. The client starts without a
/// bootstrap node and with a randomly chosen starting message id.
///
/// * `my_interface` - shared handle that data responses are forwarded to.
/// * `id` - identity of this client; its public part is derived here.
pub fn new(my_interface: Arc<Mutex<F>>, id: Id) -> RoutingClient<F> {
    sodiumoxide::init();
    let (event_sender, event_receiver) = mpsc::channel::<Event>();
    let connection_manager = crust::ConnectionManager::new(event_sender);
    // The public id has to be derived before `id` is moved into the struct.
    let public_id = PublicId::new(&id);
    let first_message_id = rand::random::<MessageId>();
    RoutingClient {
        interface: my_interface,
        event_input: event_receiver,
        connection_manager: connection_manager,
        id: id,
        public_id: public_id,
        bootstrap_address: (None, None),
        next_message_id: first_message_id,
    }
}
pub fn get(&mut self, type_id: u64, name: NameType) -> Result<MessageId, IoError> {
let requester = types::SourceAddress {
from_node: self.public_id.name(),
from_group: None,
reply_to: None,
relayed_for: Some(self.public_id.name())
};
let message_id = self.get_next_message_id();
let message = messages::RoutingMessage::new(
messages::MessageTypeTag::GetData,
message_header::MessageHeader::new(
message_id,
types::DestinationAddress {
dest: name.clone(),
relay_to: None
},
requester.clone(),
Authority::Client
),
GetData {requester: requester.clone(), name_and_type_id: types::NameAndTypeId {
name: name.clone(), type_id: type_id }},
&self.id.get_crypto_secret_sign_key()
);
let _ = encode(&message).map(|msg| self.send_to_bootstrap_node(&msg));
Ok(message_id)
}
pub fn put<T>(&mut self, content: T) -> Result<MessageId, IoError> where T: Sendable {
let message_id = self.get_next_message_id();
let message = messages::RoutingMessage::new(
messages::MessageTypeTag::PutData,
MessageHeader::new(
message_id,
types::DestinationAddress {dest: self.public_id.name(), relay_to: None },
types::SourceAddress {
from_node: self.public_id.name(),
from_group: None,
reply_to: None,
relayed_for: Some(self.public_id.name()),
},
Authority::Client
),
PutData {name: content.name(), data: content.serialised_contents()},
&self.id.get_crypto_secret_sign_key()
);
let _ = encode(&message).map(|msg| self.send_to_bootstrap_node(&msg));
Ok(message_id)
}
/// Sends a signed `UnauthorisedPut` message carrying `content` to
/// `destination` via the bootstrap node.
///
/// The message carries this client's name as source node and `relayed_for`,
/// and claims `Authority::Unknown`. Nothing is returned: the message id is
/// not exposed, and an encoding failure or missing bootstrap endpoint means
/// the message is silently not sent.
pub fn unauthorised_put(&mut self, destination: NameType, content: Box<Sendable>) {
    let request_id = self.get_next_message_id();
    let target = types::DestinationAddress { dest: destination, relay_to: None };
    let source = types::SourceAddress {
        from_node: self.public_id.name(),
        from_group: None,
        reply_to: None,
        relayed_for: Some(self.public_id.name()),
    };
    let header = MessageHeader::new(request_id, target, source, Authority::Unknown);
    let body = PutData { name: content.name(), data: content.serialised_contents() };
    let message = RoutingMessage::new(MessageTypeTag::UnauthorisedPut,
                                      header,
                                      body,
                                      &self.id.get_crypto_secret_sign_key());
    if let Ok(serialised) = encode(&message) {
        self.send_to_bootstrap_node(&serialised);
    }
}
/// Processes at most one pending event from the connection manager without
/// blocking.
///
/// Only `NewMessage` events are acted on. The payload is decoded as a
/// `RoutingMessage`; undecodable payloads are dropped. Messages are dispatched
/// only when they come from the bootstrap endpoint, and only connect, get-data
/// and put-data responses are handled — everything else is ignored.
pub fn run(&mut self) {
    // Anything other than a freshly received message (including "no event
    // available") needs no work.
    let (sender, payload) = match self.event_input.try_recv() {
        Ok(crust::connection_manager::Event::NewMessage(sender, payload)) => (sender, payload),
        _ => return,
    };
    let incoming = match decode::<RoutingMessage>(&payload) {
        Ok(incoming) => incoming,
        Err(_) => return,
    };
    println!("received a {:?} from {:?}", incoming.message_type,
             sender );
    let from_bootstrap_node = match self.bootstrap_address.1 {
        Some(ref bootstrap_endpoint) => bootstrap_endpoint == &sender,
        None => {
            println!("Client is not connected to a node.");
            return;
        }
    };
    if !from_bootstrap_node {
        return;
    }
    match incoming.message_type {
        MessageTypeTag::ConnectResponse => {
            self.handle_connect_response(sender, incoming.serialised_body);
        },
        MessageTypeTag::GetDataResponse => {
            self.handle_get_data_response(incoming.message_header,
                                          incoming.serialised_body);
        },
        MessageTypeTag::PutDataResponse => {
            self.handle_put_data_response(incoming.message_header,
                                          incoming.serialised_body);
        },
        _ => {}
    }
}
/// Starts listening, bootstraps to the network and sends a connect request to
/// the node that was bootstrapped to.
///
/// * `bootstrap_list` - optional list of endpoints passed on to the connection
///   manager's `bootstrap`.
/// * `beacon_port` - beacon port to use for both listening and bootstrapping.
///   When `None`, the default port 5483 is used. (Previously this argument
///   was shadowed by the hard-coded default and therefore always ignored.)
///
/// A failure to start listening is only logged; bootstrapping is still
/// attempted and the connect request then advertises no local endpoints.
///
/// # Errors
///
/// Returns `RoutingError::FailedToBootstrap` when the connection manager could
/// not bootstrap; in that case no bootstrap endpoint is recorded.
pub fn bootstrap(&mut self, bootstrap_list: Option<Vec<Endpoint>>,
                 beacon_port: Option<u16>) -> Result<(), RoutingError> {
    const DEFAULT_BEACON_PORT: u16 = 5483;
    // Honour the caller's choice and fall back to the default only when no
    // port was supplied.
    let beacon_port = beacon_port.or(Some(DEFAULT_BEACON_PORT));
    let ports_and_protocols : Vec<PortAndProtocol> = Vec::new();
    let listeners = match self.connection_manager
                              .start_listening2(ports_and_protocols, beacon_port) {
        Ok(listeners_and_beacon) => listeners_and_beacon,
        Err(reason) => {
            println!("Failed to start listening: {:?}", reason);
            (vec![], None)
        }
    };
    println!("trying to bootstrapped client");
    let bootstrapped_to = try!(self.connection_manager.bootstrap(bootstrap_list, beacon_port)
                                   .map_err(|_|RoutingError::FailedToBootstrap));
    self.bootstrap_address.1 = Some(bootstrapped_to);
    println!("bootstrapped client");
    self.send_bootstrap_connect_request(listeners.0);
    Ok(())
}
/// Sends a signed `ConnectRequest` to the bootstrap node, advertising
/// `accepting_on` as this client's local endpoints.
///
/// Does nothing when no bootstrap endpoint is known. The request is addressed
/// to this client's own name, uses that name for both requester and receiver
/// ids, lists no external endpoints and claims `Authority::Client`.
fn send_bootstrap_connect_request(&mut self, accepting_on: Vec<Endpoint>) {
    if self.bootstrap_address.1.is_none() {
        return;
    }
    println!("Sending connect request");
    let request_id = self.get_next_message_id();
    let destination = types::DestinationAddress {
        dest: self.public_id.name(),
        relay_to: None,
    };
    let source = types::SourceAddress {
        from_node: self.public_id.name(),
        from_group: None,
        reply_to: None,
        relayed_for: Some(self.public_id.name()),
    };
    let header = MessageHeader::new(request_id, destination, source, Authority::Client);
    let body = ConnectRequest {
        local_endpoints: accepting_on,
        external_endpoints: vec![],
        requester_id: self.public_id.name(),
        receiver_id: self.public_id.name(),
        requester_fob: self.public_id.clone(),
    };
    let message = RoutingMessage::new(MessageTypeTag::ConnectRequest,
                                      header,
                                      body,
                                      &self.id.get_crypto_secret_sign_key());
    if let Ok(serialised) = encode(&message) {
        self.send_to_bootstrap_node(&serialised);
    }
}
/// Records the bootstrap node's name from a `ConnectResponse`.
///
/// * `peer_endpoint` - endpoint the response arrived from.
/// * `bytes` - serialised `ConnectResponse`; undecodable input is ignored.
///
/// A response that arrives after the name has already been recorded is logged
/// and ignored, so a peer cannot crash the client by repeating its response.
///
/// # Panics
///
/// Panics if `peer_endpoint` is not the recorded bootstrap endpoint. `run`
/// only dispatches messages from that endpoint, so this indicates an internal
/// logic error rather than bad network input.
fn handle_connect_response(&mut self, peer_endpoint: Endpoint, bytes: Bytes) {
    let connect_response_msg = match decode::<ConnectResponse>(&bytes) {
        Ok(connect_response_msg) => connect_response_msg,
        Err(_) => return,
    };
    if self.bootstrap_address.0.is_some() {
        // The first response wins; repeated ones are network input and must
        // not bring the client down.
        println!("Ignoring repeated connect response from {:?}", peer_endpoint);
        return;
    }
    assert_eq!(self.bootstrap_address.1, Some(peer_endpoint));
    self.bootstrap_address.0 = Some(connect_response_msg.receiver_fob.name());
}
/// Sends a copy of `serialised_message` to the bootstrap endpoint.
///
/// Best effort: nothing happens when no bootstrap endpoint is known, and the
/// result of the connection manager's `send` is deliberately discarded.
fn send_to_bootstrap_node(&mut self, serialised_message: &Vec<u8>) {
    if let Some(ref bootstrap_endpoint) = self.bootstrap_address.1 {
        let target = bootstrap_endpoint.clone();
        let payload = serialised_message.clone();
        let _ = self.connection_manager.send(target, payload);
    }
}
/// Advances the message id counter by one, wrapping on overflow, and returns
/// the new value.
fn get_next_message_id(&mut self) -> MessageId {
    let advanced = self.next_message_id.wrapping_add(1);
    self.next_message_id = advanced;
    advanced
}
/// Decodes a `GetDataResponse` from `body` and passes its data, together with
/// the message id from `header`, to the interface's `handle_get_response`.
///
/// Undecodable bodies are ignored.
///
/// # Panics
///
/// Panics if the interface mutex is poisoned.
fn handle_get_data_response(&self, header: MessageHeader, body: Bytes) {
    if let Ok(response) = decode::<GetDataResponse>(&body) {
        // The lock guard is a temporary and is released once the call returns.
        self.interface.lock().unwrap().handle_get_response(header.message_id,
                                                           response.data);
    }
}
/// Decodes a `PutDataResponse` from `body` and passes its data, together with
/// the message id from `header`, to the interface's `handle_put_response`.
///
/// Previously put responses were forwarded to `handle_get_response`, so the
/// interface could not tell them apart from get responses.
/// NOTE(review): assumes `Interface` declares `handle_put_response` with the
/// same parameters as `handle_get_response` — confirm against the trait.
///
/// Undecodable bodies are ignored.
///
/// # Panics
///
/// Panics if the interface mutex is poisoned.
fn handle_put_data_response(&self, header: MessageHeader, body: Bytes) {
    match decode::<PutDataResponse>(&body) {
        Ok(put_data_response) => {
            let mut interface = self.interface.lock().unwrap();
            interface.handle_put_response(header.message_id,
                                          put_data_response.data);
        },
        Err(_) => {}
    };
}
}