use routing::{Authority, Client, Data, DataIdentifier, Event, FullId, MessageId, Response, XorName};
use rust_sodium::crypto;
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::{Duration, Instant};
/// Number of seconds `get` and `put` pass to `recv_with_timeout` for each event they wait on.
///
/// The timeout applies to every individual wait, not to the request as a whole: each unrelated
/// event that arrives starts a fresh wait of this length.
const RESPONSE_TIMEOUT_SECS: u64 = 10;
/// A blocking example client that wraps a routing `Client` and offers simple `get`/`put` calls.
pub struct ExampleClient {
/// The routing client used to send requests to the network.
routing_client: Client,
/// Receiving end of the channel whose sender was handed to the routing client; the `Event`s
/// read by `new`, `get` and `put` arrive here.
receiver: Receiver<Event>,
/// The identity (built from freshly generated keys in `new`) the routing client was created
/// with.
full_id: FullId,
}
impl ExampleClient {
/// Creates a client with freshly generated signing and encryption keys and blocks until it has
/// connected to the network.
///
/// A routing `Client` is created and events are read from the channel until either
/// `Event::Connected` arrives (the client is returned) or `Event::Terminate` arrives, in which
/// case the function sleeps for five seconds and creates a new routing client with the same
/// `FullId`. All other events are ignored while connecting.
///
/// The result of `Client::new` is passed through `unwrap!`, so a failure to create the routing
/// client is not reported to the caller.
pub fn new() -> ExampleClient {
    let (sender, receiver) = mpsc::channel::<Event>();

    // Generate a new identity for this client.
    let sign_keys = crypto::sign::gen_keypair();
    let encrypt_keys = crypto::box_::gen_keypair();
    let full_id = FullId::with_keys(encrypt_keys.clone(), sign_keys.clone());

    let mut routing_client;
    loop {
        routing_client = unwrap!(Client::new(sender.clone(), Some(full_id.clone())));

        // Drain events until this attempt either succeeds or is terminated.
        let mut connected = false;
        while let Ok(event) = receiver.recv() {
            match event {
                Event::Connected => {
                    println!("Client Connected to the network");
                    connected = true;
                    break;
                }
                Event::Terminate => {
                    println!("Client failed to connect to the network. Restarting.");
                    thread::sleep(Duration::from_secs(5));
                    break;
                }
                _ => (),
            }
        }

        if connected {
            break;
        }
    }

    ExampleClient {
        routing_client: routing_client,
        receiver: receiver,
        full_id: full_id,
    }
}
/// Sends a `Get` request for `request` to its `NaeManager` and blocks until it is answered.
///
/// Returns `Some(data)` when a `GetSuccess` response carrying this request's message ID is
/// received.
///
/// Returns `None` when:
///
/// * a `GetFailure` response is received (the reason is logged),
/// * a `GetSuccess` response arrives with a different message ID. Such a response belongs to
///   some other request (for example one that timed out earlier), so its data is not handed
///   back as the answer to this one. This mirrors how `put` rejects mismatched IDs.
/// * `recv_with_timeout` returns nothing, i.e. no event arrived within
///   `RESPONSE_TIMEOUT_SECS` seconds or the event channel was closed.
///
/// Any other event is ignored and the wait starts over.
///
/// # Panics
///
/// Calls `disconnected`, which panics, if an `Event::Terminate` or `Event::RestartRequired` is
/// received. The result of sending the request is passed through `unwrap!`.
pub fn get(&mut self, request: DataIdentifier) -> Option<Data> {
    let message_id = MessageId::new();
    unwrap!(self.routing_client.send_get_request(Authority::NaeManager(*request.name()),
                                                 request.clone(),
                                                 message_id));
    loop {
        match recv_with_timeout(&self.receiver, Duration::from_secs(RESPONSE_TIMEOUT_SECS)) {
            Some(Event::Response { response: Response::GetSuccess(data, id), .. }) => {
                if message_id != id {
                    error!("GetSuccess for {:?}, but with wrong message_id {:?} instead of \
                            {:?}.",
                           data.name(),
                           id,
                           message_id);
                    // Never return data that answers a different request.
                    return None;
                }
                return Some(data);
            }
            Some(Event::Response {
                response: Response::GetFailure { external_error_indicator, .. },
                ..
            }) => {
                // The indicator is raw bytes supplied by the network; decode it lossily so
                // that a non-UTF-8 payload cannot make the client panic while logging.
                error!("Failed to Get {:?}: {:?}",
                       request.name(),
                       String::from_utf8_lossy(&external_error_indicator));
                return None;
            }
            Some(Event::Terminate) |
            Some(Event::RestartRequired) => self.disconnected(),
            Some(_) => (),
            None => return None,
        }
    }
}
pub fn put(&self, data: Data) -> Result<(), ()> {
let data_id = data.identifier();
let message_id = MessageId::new();
unwrap!(self.routing_client.send_put_request(Authority::ClientManager(*self.name()),
data,
message_id));
loop {
match recv_with_timeout(&self.receiver, Duration::from_secs(RESPONSE_TIMEOUT_SECS)) {
Some(Event::Response { response: Response::PutSuccess(rec_data_id, id), .. }) => {
if message_id != id {
error!("Stored {:?}, but with wrong message_id {:?} instead of {:?}.",
data_id.name(),
id,
message_id);
return Err(());
} else if data_id == rec_data_id {
trace!("Successfully stored {:?}", data_id.name());
return Ok(());
} else {
error!("Stored {:?}, but with wrong name {:?}.",
data_id.name(),
rec_data_id.name());
return Err(());
}
}
Some(Event::Response { response: Response::PutFailure { .. }, .. }) => {
error!("Received PutFailure for {:?}.", data_id.name());
return Err(());
}
Some(Event::Terminate) |
Some(Event::RestartRequired) => self.disconnected(),
Some(_) => (),
None => return Err(()),
}
}
}
/// Invoked by `get` and `put` when an `Event::Terminate` or `Event::RestartRequired` is received
/// while waiting for a response.
///
/// # Panics
///
/// Always panics with "Disconnected from the network."; no reconnection is attempted.
fn disconnected(&self) {
panic!("Disconnected from the network.");
}
/// Placeholder for a `Post` request; not implemented.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
// NOTE(review): `allow(unused)` presumably silences the unused-item warning in binaries that
// never call this stub - confirm before removing.
#[allow(unused)]
pub fn post(&self) {
unimplemented!()
}
/// Placeholder for a `Delete` request; not implemented.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
// NOTE(review): `allow(unused)` presumably silences the unused-item warning in binaries that
// never call this stub - confirm before removing.
#[allow(unused)]
pub fn delete(&self) {
unimplemented!()
}
/// Returns the name of this client, taken from the public part of its `FullId`.
pub fn name(&self) -> &XorName {
    let public_id = self.full_id.public_id();
    public_id.name()
}
}
impl Default for ExampleClient {
fn default() -> ExampleClient {
ExampleClient::new()
}
}
/// Polls `rx` for a value, giving up once more than `timeout` has elapsed.
///
/// The channel is checked with `try_recv`; whenever it is empty the thread sleeps for 100 ms
/// and the elapsed time is compared against `timeout` before polling again. Because of this the
/// function always polls at least once and, on an empty channel, sleeps at least once even for
/// a zero `timeout`.
///
/// Returns `Some(value)` as soon as a value is available. Returns `None` if the sending side
/// has been disconnected, or if the timeout is exceeded (in which case a warning is logged).
fn recv_with_timeout<T>(rx: &Receiver<T>, timeout: Duration) -> Option<T> {
    let poll_interval = Duration::from_millis(100);
    let started = Instant::now();

    loop {
        match rx.try_recv() {
            Ok(value) => return Some(value),
            Err(TryRecvError::Disconnected) => return None,
            Err(TryRecvError::Empty) => {
                thread::sleep(poll_interval);
                if started.elapsed() > timeout {
                    warn!("Timed out.");
                    return None;
                }
            }
        }
    }
}