use std::io::{Read, Write};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use log::*;
use parking_lot::{Condvar, Mutex, MutexGuard};
use secop_core::errors::{Error, Result};
use secop_core::types::{Status, StatusConst};
pub type Connector<R, W> = Box<dyn FnMut() -> Result<(R, W)> + Send + 'static>;
struct CommShared<W> {
writer: Mutex<W>,
buffer: Mutex<Vec<u8>>,
seen_eol: Condvar,
connected: AtomicBool,
stop: AtomicBool,
}
pub struct CommThread<R, W> {
connect: Connector<R, W>,
reader: R,
eol: Vec<u8>,
shared: Arc<CommShared<W>>,
}
pub struct CommClient<W> {
sol: Vec<u8>,
eol: Vec<u8>,
shared: Arc<CommShared<W>>,
timeout: Duration,
}
impl<W: Write> CommClient<W> {
fn is_connected(&self) -> bool {
self.shared.connected.load(Ordering::Relaxed)
}
fn get_line(&self, mut buffer: MutexGuard<Vec<u8>>) -> Result<Vec<u8>> {
let neol = self.eol.len();
let end = buffer
.windows(neol)
.position(|ch| ch == &self.eol[..])
.unwrap();
Ok(buffer.drain(0..end + neol).take(end).collect())
}
pub fn read(&self, max: u32) -> Result<Vec<u8>> {
let mut buffer = self.shared.buffer.lock();
let cur = buffer.len();
let drain = buffer.drain(0..cur.min(max as usize));
Ok(drain.collect())
}
pub fn write(&self, input: &[u8], line: bool) -> Result<u32> {
let mut writer = self.shared.writer.lock();
if line {
writer.write_all(&self.sol)?;
writer.write_all(input)?;
writer.write_all(&self.eol)?;
Ok((input.len() + self.sol.len() + self.eol.len()) as u32)
} else {
writer.write_all(input)?;
Ok(input.len() as u32)
}
}
pub fn readline(&self) -> Result<Vec<u8>> {
let mut buffer = self.shared.buffer.lock();
self.wait_eol(&mut buffer)?;
self.get_line(buffer)
}
pub fn communicate(&self, input: &[u8]) -> Result<Vec<u8>> {
let mut buffer = self.shared.buffer.lock();
buffer.clear();
self.write(input, true)?;
self.wait_eol(&mut buffer)?;
self.get_line(buffer)
}
fn wait_eol<T>(&self, lock: &mut MutexGuard<T>) -> Result<()> {
if self.shared.seen_eol.wait_for(lock, self.timeout).timed_out() {
Err(Error::comm_failed("no response"))
} else {
Ok(())
}
}
}
impl<W> Drop for CommClient<W> {
fn drop(&mut self) {
self.shared.stop.store(true, Ordering::SeqCst);
}
}
impl<R: Read + Send + 'static, W: Write + Send + 'static> CommThread<R, W> {
pub fn spawn(mut connect: Connector<R, W>, sol: &[u8], eol: &[u8], timeout: Duration)
-> Result<CommClient<W>> {
let (reader, writer) = connect()?;
let shared = Arc::new(CommShared {
writer: Mutex::new(writer),
buffer: Mutex::new(Vec::with_capacity(2048)),
seen_eol: Condvar::new(),
connected: AtomicBool::new(true),
stop: AtomicBool::new(false),
});
let mut comm = CommThread {
connect,
reader,
eol: eol.into(),
shared: Arc::clone(&shared),
};
thread::spawn(move || comm.thread());
Ok(CommClient {
shared,
timeout,
sol: sol.into(),
eol: eol.into(),
})
}
fn stopped(&self) -> bool {
self.shared.stop.load(Ordering::Relaxed)
}
fn thread(&mut self) {
debug!("reader thread started");
let mut buf = [0; 1024];
while !self.stopped() {
while !self.stopped() {
match self.reader.read(&mut buf) {
Ok(n) if n == 0 => {
warn!("reader connection closed");
break;
}
Ok(n) => {
let mut buffer = self.shared.buffer.lock();
debug!("reader got: {:?}", &buf[..n]);
buffer.extend(&buf[..n]);
if buffer.windows(self.eol.len()).any(|ch| ch == &self.eol[..]) {
self.shared.seen_eol.notify_one();
}
}
Err(e) => {
error!("reader error: {}", e);
break;
}
}
}
self.shared.connected.store(false, Ordering::SeqCst);
while !self.stopped() {
match (self.connect)() {
Ok((reader, writer)) => {
self.reader = reader;
*self.shared.writer.lock() = writer;
self.shared.connected.store(true, Ordering::SeqCst);
break;
}
Err(e) => {
warn!("error during reconnect: {}", e);
thread::sleep(Duration::from_millis(100));
}
}
}
}
debug!("reader thread exited normally");
}
}
pub trait HasComm {
type IO: Read + Write;
fn get_comm(&self) -> Result<&CommClient<Self::IO>>;
fn read_status(&mut self) -> Result<Status> {
if self.get_comm().ok().map_or(false, |c| c.is_connected()) {
Ok((StatusConst::Idle, "idle".into()))
} else {
Ok((StatusConst::Warn, "not connected".into()))
}
}
fn convert(&self, bytes: Result<Vec<u8>>) -> Result<String> {
bytes.map(|v| {
String::from_utf8(v)
.unwrap_or_else(|e| String::from_utf8_lossy(&e.into_bytes()).into_owned())
})
}
fn do_communicate(&self, arg: String) -> Result<String> {
self.convert(self.get_comm()?.communicate(arg.as_bytes()))
}
fn do_writeline(&self, arg: String) -> Result<()> {
self.get_comm()?.write(arg.as_bytes(), true).map(|_| ())
}
fn do_readline(&self, _: ()) -> Result<String> {
self.convert(self.get_comm()?.readline())
}
fn do_write(&self, arg: String) -> Result<()> {
self.get_comm()?.write(arg.as_bytes(), false).map(|_| ())
}
fn do_read(&self, _: ()) -> Result<String> {
self.convert(self.get_comm()?.read(u32::max_value()))
}
fn do_multi_communicate(&self, _req: Vec<(String, f64)>) -> Result<Vec<String>> {
panic!("multi_communicate is not yet implemented");
}
}