use std::cell::RefCell;
use std::io::Read;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Weak;
use std::sync::Mutex;
use std::time::Duration;
use mio::net::UnixStream;
use super::core::*;
pub trait UdsClientHandler {
fn handle_connect(&self, entry: &UdsClient) -> Result<(), EventError>;
fn handle_disconnect(&self, entry: &UdsClient) -> Result<(), EventError>;
fn handle_message(&self, entry: &UdsClient) -> Result<(), EventError>;
}
pub struct UdsClient {
inner: RefCell<Option<Arc<UdsClientInner>>>,
}
impl UdsClient {
pub fn new() -> UdsClient {
UdsClient {
inner: RefCell::new(None),
}
}
pub fn release(&self) {
self.inner.replace(None);
}
pub fn get_inner(&self) -> Arc<UdsClientInner> {
if let Some(ref mut inner) = *self.inner.borrow_mut() {
return inner.clone();
}
panic!("No inner exists");
}
pub fn start(
event_manager: Arc<Mutex<EventManager>>,
handler: Arc<dyn UdsClientHandler>,
path: &PathBuf,
) -> Arc<Mutex<UdsClient>> {
let client = Arc::new(Mutex::new(UdsClient::new()));
let inner = Arc::new(UdsClientInner::new(
client.clone(),
event_manager.clone(),
handler.clone(),
path,
));
client.lock().unwrap().inner.borrow_mut().replace(inner);
client
}
pub fn connect(&self) {
let inner = self.get_inner();
let event_manager = inner.get_event_manager();
match inner.connect(self) {
Ok(_) => {
if let Some(ref mut stream) = *inner.stream.borrow_mut() {
if let Err(_) = event_manager
.lock()
.unwrap()
.register_read_write(stream, inner.clone())
{
self.connect_timer();
}
}
}
Err(_) => self.connect_timer(),
}
}
pub fn connect_timer(&self) {
let inner = self.get_inner();
let event_manager = inner.get_event_manager();
event_manager
.lock()
.unwrap()
.register_timer(Duration::from_secs(5), inner.clone());
}
pub fn stream_send(&self, message: &str) -> Result<(), EventError> {
self.get_inner().stream_send(message)
}
pub fn stream_read(&self) -> Result<String, EventError> {
self.get_inner().stream_read()
}
}
impl Drop for UdsClient {
fn drop(&mut self) {
println!("Drop UdsClient");
let inner = self.inner.replace(None);
drop(inner);
}
}
unsafe impl Send for UdsClientInner {}
unsafe impl Sync for UdsClientInner {}
pub struct UdsClientInner {
path: PathBuf,
client: Weak<Mutex<UdsClient>>,
event_manager: Arc<Mutex<EventManager>>,
handler: RefCell<Arc<dyn UdsClientHandler>>,
stream: RefCell<Option<UnixStream>>,
}
impl UdsClientInner {
pub fn new(
client: Arc<Mutex<UdsClient>>,
event_manager: Arc<Mutex<EventManager>>,
handler: Arc<dyn UdsClientHandler>,
path: &PathBuf,
) -> UdsClientInner {
UdsClientInner {
path: path.clone(),
client: Arc::downgrade(&client),
event_manager: event_manager,
handler: RefCell::new(handler),
stream: RefCell::new(None),
}
}
pub fn connect(&self, client: &UdsClient) -> Result<(), EventError> {
match UnixStream::connect(&self.path) {
Ok(stream) => {
self.stream.borrow_mut().replace(stream);
let _ = self.handler.borrow_mut().handle_connect(client);
Ok(())
}
Err(_) => Err(EventError::ConnectError("UDS".to_string())),
}
}
pub fn get_event_manager(&self) -> Arc<Mutex<EventManager>> {
self.event_manager.clone()
}
pub fn stream_send(&self, message: &str) -> Result<(), EventError> {
match *self.stream.borrow_mut() {
Some(ref mut stream) => {
if let Err(_err) = stream.write_all(message.as_bytes()) {
return Err(EventError::WriteError("UDS".to_string()));
}
}
None => return Err(EventError::WriteError("UDS".to_string())),
}
Ok(())
}
pub fn stream_read(&self) -> Result<String, EventError> {
match *self.stream.borrow_mut() {
Some(ref mut stream) => {
let mut buffer = Vec::new();
if let Err(err) = stream.read_to_end(&mut buffer) {
if err.kind() != std::io::ErrorKind::WouldBlock {
return Err(EventError::ReadError(err.to_string()));
}
}
let str = std::str::from_utf8(&buffer).unwrap();
if str.len() > 0 {
let message = String::from(str);
Ok(message)
} else {
Err(EventError::ReadError(
"Empty string from stream".to_string(),
))
}
}
None => Err(EventError::NoStream),
}
}
}
impl Drop for UdsClientInner {
fn drop(&mut self) {
println!("Drop UdsClientInner");
}
}
impl EventHandler for UdsClientInner {
fn handle(&self, e: EventType) -> Result<(), EventError> {
match e {
EventType::TimerEvent => {
let client = self.client.upgrade().unwrap();
let client = client.lock().unwrap();
client.connect();
Ok(())
}
EventType::ReadEvent => {
let handler = self.handler.borrow_mut();
let client = self.client.upgrade().unwrap();
let client = client.lock().unwrap();
if let Err(_) = handler.handle_message(&*client) {
client.connect_timer();
}
Ok(())
}
EventType::ErrorEvent => {
self.stream.borrow_mut().take();
let client = self.client.upgrade().unwrap();
let client = client.lock().unwrap();
client.connect_timer();
let handler = self.handler.borrow_mut();
handler.handle_disconnect(&*client)
}
_ => Err(EventError::InvalidEvent),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
struct TestClientHandler {
}
impl UdsClientHandler for TestClientHandler {
fn handle_connect(&self, _entry: &UdsClient) -> Result<(), EventError> {
Ok(())
}
fn handle_disconnect(&self, _entry: &UdsClient) -> Result<(), EventError> {
Ok(())
}
fn handle_message(&self, _entry: &UdsClient) -> Result<(), EventError> {
Ok(())
}
}
#[test]
pub fn test_uds_client() {
let mut path = PathBuf::new();
path.push("/tmp/test_uds.sock");
let em = EventManager::new();
let handler = TestClientHandler { };
{
let uds_client = UdsClient::start(Arc::new(Mutex::new(em)),
Arc::new(handler), &path);
drop(uds_client);
}
}
}