use action::Action;
use cache::NullCache;
use data::{AppendWrapper, Data, DataIdentifier};
use error::{InterfaceError, RoutingError};
use event::Event;
use id::FullId;
#[cfg(not(feature = "use-mock-crust"))]
use maidsafe_utilities::thread::{self, Joiner};
use messages::{CLIENT_GET_PRIORITY, DEFAULT_PRIORITY, Request};
use outbox::{EventBox, EventBuf};
use routing_table::Authority;
#[cfg(not(feature = "use-mock-crust"))]
use rust_sodium;
use state_machine::{State, StateMachine};
use states;
#[cfg(feature = "use-mock-crust")]
use std::cell::RefCell;
use std::sync::mpsc::{Receiver, Sender, channel};
#[cfg(feature = "use-mock-crust")]
use std::sync::mpsc::TryRecvError;
use types::MessageId;
use types::RoutingActionSender;
use xor_name::XorName;
/// Handle through which requests are submitted, as `Action`s, to a routing `StateMachine`.
///
/// Without the `use-mock-crust` feature the state machine is stepped on a thread spawned by
/// `Client::new`; with the feature enabled the client owns the state machine and steps it itself.
pub struct Client {
/// Sending half of the result channel; a clone is attached to every
/// `Action::ClientSendRequest` so the outcome of that request can be reported back.
interface_result_tx: Sender<Result<(), InterfaceError>>,
/// Receiving half of the result channel, read after each request has been submitted.
interface_result_rx: Receiver<Result<(), InterfaceError>>,
/// Channel used to submit `Action`s (requests, `Name`, `Terminate`) to the state machine.
action_sender: RoutingActionSender,
/// Mock-crust builds only: the state machine, stepped manually via `try_step`.
#[cfg(feature = "use-mock-crust")]
machine: RefCell<StateMachine>,
/// Mock-crust builds only: events produced while stepping `machine`, not yet handed out.
#[cfg(feature = "use-mock-crust")]
event_buffer: RefCell<EventBuf>,
/// Handle of the "Client thread" spawned in `new`, kept for the lifetime of the client.
/// NOTE(review): presumably joins that thread when dropped - confirm against
/// `maidsafe_utilities::thread::Joiner`.
#[cfg(not(feature = "use-mock-crust"))]
_raii_joiner: Joiner,
}
impl Client {
    /// Creates a new `Client` and spawns the "Client thread" that drives its state machine.
    ///
    /// Every event produced by the state machine is forwarded over `event_sender`. If `keys` is
    /// `None`, a fresh `FullId` is created. The minimum section size is fixed at 8.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffered while building the state machine cannot be sent
    /// over `event_sender`.
    #[cfg(not(feature = "use-mock-crust"))]
    pub fn new(event_sender: Sender<Event>, keys: Option<FullId>) -> Result<Client, RoutingError> {
        // NOTE(review): presumably has to run before `FullId::new` generates keys - confirm.
        rust_sodium::init();

        let min_section_size = 8;
        let mut events = EventBuf::new();
        let (action_sender, mut state_machine) =
            Self::make_state_machine(keys, min_section_size, &mut events);

        // Hand over whatever was emitted while the state machine was being constructed.
        for event in events.take_all() {
            event_sender.send(event)?;
        }

        let (result_tx, result_rx) = channel();

        let joiner = thread::named("Client thread", move || {
            // Keep stepping until the state machine reports an error; after each step proxy
            // the freshly produced events over `event_sender`.
            while state_machine.step(&mut events).is_ok() {
                let send_failed = events
                    .take_all()
                    .into_iter()
                    .any(|event| event_sender.send(event).is_err());
                // An event that cannot be delivered ends this thread.
                if send_failed {
                    return;
                }
            }
        });

        Ok(Client {
            interface_result_tx: result_tx,
            interface_result_rx: result_rx,
            action_sender: action_sender,
            _raii_joiner: joiner,
        })
    }

    /// Builds the state machine together with the sender used to submit actions to it.
    ///
    /// The initial state is `State::Bootstrapping`, or `State::Terminated` when
    /// `states::Bootstrapping::new` yields nothing. A `NullCache` is used as the cache and a new
    /// `FullId` is created when `keys` is `None`.
    fn make_state_machine(keys: Option<FullId>,
                          min_section_size: usize,
                          outbox: &mut EventBox)
                          -> (RoutingActionSender, StateMachine) {
        let null_cache = Box::new(NullCache);
        let identity = match keys {
            Some(full_id) => full_id,
            None => FullId::new(),
        };

        StateMachine::new(move |crust_service, timer, _outbox2| {
            // NOTE(review): the `true` flag presumably restricts this node to the client
            // role - confirm against `states::Bootstrapping::new`.
            let bootstrapping = states::Bootstrapping::new(null_cache,
                                                           true,
                                                           crust_service,
                                                           identity,
                                                           min_section_size,
                                                           timer);
            bootstrapping.map_or(State::Terminated, State::Bootstrapping)
        },
                          outbox)
    }

    /// Sends a `Get` request for `data_id` to `dst` with `CLIENT_GET_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_get_request(&self,
                            dst: Authority<XorName>,
                            data_id: DataIdentifier,
                            message_id: MessageId)
                            -> Result<(), InterfaceError> {
        let request = Request::Get(data_id, message_id);
        self.send_action(request, dst, CLIENT_GET_PRIORITY)
    }

    /// Sends a `Put` request carrying `data` to `dst` with `DEFAULT_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_put_request(&self,
                            dst: Authority<XorName>,
                            data: Data,
                            message_id: MessageId)
                            -> Result<(), InterfaceError> {
        let request = Request::Put(data, message_id);
        self.send_action(request, dst, DEFAULT_PRIORITY)
    }

    /// Sends a `Post` request carrying `data` to `dst` with `DEFAULT_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_post_request(&self,
                             dst: Authority<XorName>,
                             data: Data,
                             message_id: MessageId)
                             -> Result<(), InterfaceError> {
        let request = Request::Post(data, message_id);
        self.send_action(request, dst, DEFAULT_PRIORITY)
    }

    /// Sends a `Delete` request carrying `data` to `dst` with `DEFAULT_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_delete_request(&self,
                               dst: Authority<XorName>,
                               data: Data,
                               message_id: MessageId)
                               -> Result<(), InterfaceError> {
        let request = Request::Delete(data, message_id);
        self.send_action(request, dst, DEFAULT_PRIORITY)
    }

    /// Sends an `Append` request carrying `wrapper` to `dst` with `DEFAULT_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_append_request(&self,
                               dst: Authority<XorName>,
                               wrapper: AppendWrapper,
                               message_id: MessageId)
                               -> Result<(), InterfaceError> {
        let request = Request::Append(wrapper, message_id);
        self.send_action(request, dst, DEFAULT_PRIORITY)
    }

    /// Sends a `GetAccountInfo` request to `dst` with `CLIENT_GET_PRIORITY`.
    ///
    /// Returns the result reported back for the submitted action.
    pub fn send_get_account_info_request(&self,
                                         dst: Authority<XorName>,
                                         message_id: MessageId)
                                         -> Result<(), InterfaceError> {
        let request = Request::GetAccountInfo(message_id);
        self.send_action(request, dst, CLIENT_GET_PRIORITY)
    }

    /// Submits an `Action::Name` and waits for the `XorName` sent back in response.
    ///
    /// A dedicated channel is created for this single reply.
    pub fn name(&self) -> Result<XorName, InterfaceError> {
        let (name_tx, name_rx) = channel();
        let action = Action::Name { result_tx: name_tx };
        self.action_sender.send(action)?;
        self.receive_action_result(&name_rx)
    }

    /// Wraps `content` in an `Action::ClientSendRequest`, submits it and waits for its result.
    ///
    /// Fails if the action cannot be submitted, if the reply cannot be received, or if the
    /// reply itself is an error.
    fn send_action(&self,
                   content: Request,
                   dst: Authority<XorName>,
                   priority: u8)
                   -> Result<(), InterfaceError> {
        let reply_tx = self.interface_result_tx.clone();
        let action = Action::ClientSendRequest {
            content: content,
            dst: dst,
            priority: priority,
            result_tx: reply_tx,
        };
        self.action_sender.send(action)?;

        // Outer `Result`: receiving the reply. Inner `Result`: outcome of the request itself.
        let outcome = self.receive_action_result(&self.interface_result_rx)?;
        outcome
    }

    /// Blocks on `rx` until a value arrives and returns it.
    ///
    /// With `use-mock-crust` enabled the state machine is polled first, and this panics if
    /// that poll did not perform any step.
    fn receive_action_result<T>(&self, rx: &Receiver<T>) -> Result<T, InterfaceError> {
        #[cfg(feature = "use-mock-crust")]
        assert!(self.poll());

        let received = rx.recv()?;
        Ok(received)
    }
}
#[cfg(feature = "use-mock-crust")]
impl Client {
    /// Creates a new `Client` that owns its state machine and steps it manually.
    ///
    /// Events produced while building the state machine stay in the client's internal buffer.
    /// If `keys` is `None`, a fresh `FullId` is created. Currently always returns `Ok`.
    pub fn new(keys: Option<FullId>, min_section_size: usize) -> Result<Client, RoutingError> {
        let mut events = EventBuf::new();
        let (action_sender, state_machine) =
            Self::make_state_machine(keys, min_section_size, &mut events);
        let (result_tx, result_rx) = channel();

        Ok(Client {
            interface_result_tx: result_tx,
            interface_result_rx: result_rx,
            action_sender: action_sender,
            machine: RefCell::new(state_machine),
            event_buffer: RefCell::new(events),
        })
    }

    /// Returns the next event without blocking.
    ///
    /// A buffered event is returned first. Otherwise the state machine is stepped once and the
    /// first event it produced is returned; `TryRecvError::Empty` is returned if the step
    /// produced none, and an error from the step itself is propagated.
    pub fn try_next_ev(&self) -> Result<Event, TryRecvError> {
        // Bind the result so the buffer's borrow is released before `try_step` borrows it again.
        let buffered = self.event_buffer.borrow_mut().take_first();
        match buffered {
            Some(event) => Ok(event),
            None => {
                self.try_step()?;
                let produced = self.event_buffer.borrow_mut().take_first();
                produced.ok_or(TryRecvError::Empty)
            }
        }
    }

    /// Steps the state machine until a step fails.
    ///
    /// Returns `true` if at least one step succeeded.
    pub fn poll(&self) -> bool {
        let mut stepped = false;
        while self.try_step().is_ok() {
            stepped = true;
        }
        stepped
    }

    /// Performs a single `try_step` on the state machine, collecting events in the buffer.
    fn try_step(&self) -> Result<(), TryRecvError> {
        let mut machine = self.machine.borrow_mut();
        let mut events = self.event_buffer.borrow_mut();
        machine.try_step(&mut *events)
    }

    /// Calls `resend_unacknowledged` on the current state and returns its result.
    pub fn resend_unacknowledged(&self) -> bool {
        let mut machine = self.machine.borrow_mut();
        machine.current_mut().resend_unacknowledged()
    }

    /// Calls `has_unacknowledged` on the current state and returns its result.
    pub fn has_unacknowledged(&self) -> bool {
        let machine = self.machine.borrow();
        machine.current().has_unacknowledged()
    }
}
impl Drop for Client {
    /// Submits `Action::Terminate` when the client goes away.
    ///
    /// Failure to submit the action is logged at debug level and otherwise ignored.
    fn drop(&mut self) {
        match self.action_sender.send(Action::Terminate) {
            Ok(_) => (),
            Err(err) => {
                debug!("Error {:?} sending event to Core", err);
            }
        }
    }
}