extern crate alloc;
extern crate multiwii_serial_protocol_v2;
extern crate serialport;
extern crate packed_struct;
use multiwii_serial_protocol_v2::{MspPacket, MspParser};
use async_std::sync::{channel, Arc, Condvar, Mutex, Sender, Receiver};
use async_std::task;
use async_std::io::{Error, ErrorKind};
use std::time::{Duration, Instant};
use std::collections::VecDeque;
#[derive(Clone)]
pub struct Core {
parser_locked: Arc<Mutex<MspParser>>,
verbose: bool,
buff_size: usize,
msp_write_delay: Duration,
msp_reader_recv: Receiver<MspPacket>,
msp_writer_send: Sender<MspPacket>,
msp_error_recv: Receiver<Error>,
}
pub struct MspTaskHandle {
input_handle: Option<async_std::task::JoinHandle<()>>,
output_handle: async_std::task::JoinHandle<()>,
}
impl MspTaskHandle {
pub async fn cancel(self) {
self.output_handle.cancel().await;
match self.input_handle {
Some(h) => h.cancel().await,
None => None,
};
}
}
impl Core {
pub fn open(stream: impl Send + std::io::Read + std::io::Write + Clone + 'static,
buff_size: usize,
msp_write_delay: Duration,
verbose: bool,
) -> (Core, MspTaskHandle) {
let (msp_reader_send, msp_reader_recv) = channel::<MspPacket>(4096);
let (msp_writer_send, msp_writer_recv) = channel::<MspPacket>(1024);
let (msp_error_send, msp_error_recv) = channel::<Error>(1);
let parser = MspParser::new();
let parser_locked = Arc::new(Mutex::new(parser));
let serial_write_lock = Arc::new((Mutex::new(buff_size.clone()), Condvar::new()));
let serial_write_lock_clone = serial_write_lock.clone();
let elapsed_queue_lock = Arc::new(Mutex::new(VecDeque::with_capacity(buff_size.clone())));
let elapsed_queue_lock_clone = elapsed_queue_lock.clone();
let input_handle = if buff_size > 0 {
let reader = stream.clone();
Some(Core::process_input(reader, parser_locked.clone(), msp_reader_send, serial_write_lock, msp_error_send.clone(), elapsed_queue_lock, verbose.clone()))
} else {
None
};
let output_handle = Core::process_output(stream, msp_writer_recv, serial_write_lock_clone, msp_write_delay.clone(), msp_error_send, elapsed_queue_lock_clone, verbose.clone());
return (Core {
buff_size,
msp_write_delay,
verbose,
parser_locked,
msp_reader_recv,
msp_writer_send,
msp_error_recv,
}, MspTaskHandle {
input_handle,
output_handle
});
}
pub async fn read(&self) -> MspPacket {
return self.msp_reader_recv.recv().await.unwrap();
}
pub async fn write(&self, packet: MspPacket) {
self.msp_writer_send.send(packet).await;
}
pub async fn error(&self) -> Error {
return self.msp_error_recv.recv().await.unwrap();
}
fn process_input(
serial: impl Send + std::io::Read + 'static,
parser_locked: Arc<Mutex<MspParser>>,
msp_reader_send: Sender<MspPacket>,
serial_write_lock: Arc<(Mutex<usize>, Condvar)>,
msp_error_send: Sender<Error>,
elapsed_queue_lock: Arc<Mutex<VecDeque<Instant>>>,
verbose: bool,
) -> async_std::task::JoinHandle<()> {
task::spawn(async move {
let (lock, cvar) = &*serial_write_lock;
let initial_lock = lock.lock().await;
let initial_buffer_size = *initial_lock;
drop(initial_lock);
for byte in serial.bytes() {
match byte {
Ok(byte) => {
let mut parser = parser_locked.lock().await;
let res = parser.parse(byte);
match res {
Ok(Some(p)) => {
if verbose {
println!("receive new msp packet {}", p.cmd);
match (*elapsed_queue_lock.lock().await).pop_front() {
Some(instant) => println!("elapsed time since send {}", instant.elapsed().subsec_millis()),
None => (),
};
}
msp_reader_send.send(p).await;
let mut received_lock = lock.lock().await;
if *received_lock < initial_buffer_size {
*received_lock += 1;
cvar.notify_one();
}
},
Err(e) => eprintln!("bad crc {:?}", e),
Ok(None) => ()
}
}
Err(ref e) if e.kind() == ErrorKind::TimedOut => {
if verbose {
println!("read timeout");
}
}
Err(e) => msp_error_send.send(e).await
};
task::yield_now().await;
}
})
}
fn process_output(
mut serial: impl Send + std::io::Write + 'static,
msp_writer_recv: Receiver<MspPacket>,
serial_write_lock: Arc<(Mutex<usize>, Condvar)>,
write_delay: Duration,
msp_error_send: Sender<Error>,
elapsed_queue_lock: Arc<Mutex<VecDeque<Instant>>>,
verbose: bool,
) -> async_std::task::JoinHandle<()> {
task::spawn(async move {
let (lock, cvar) = &*serial_write_lock;
let mut should_wait_for_lock = false;
let temp_lock_guard = lock.lock().await;
if *temp_lock_guard > 0 {
should_wait_for_lock = true;
}
drop(temp_lock_guard);
loop {
if should_wait_for_lock {
let guard = cvar.wait_until(lock.lock().await, |send_count| {
if *send_count > 0 {
*send_count -=1;
return true;
}
return false;
}).await;
drop(guard);
}
let packet = match msp_writer_recv.recv().await {
Err(_) => break,
Ok(packet) => packet,
};
let size = packet.packet_size_bytes_v2();
let mut output = vec![0; size];
packet
.serialize_v2(&mut output)
.expect("Failed to serialize");
if verbose {
println!("writing {}", packet.cmd);
}
loop {
match serial.write(&output) {
Ok(_) => {
if verbose && should_wait_for_lock {
(*elapsed_queue_lock.lock().await).push_back(Instant::now());
}
break;
},
Err(ref e) if e.kind() == ErrorKind::TimedOut => {
task::yield_now().await;
},
Err(e) => {
msp_error_send.send(e).await;
*(lock.lock().await) += 1;
}
}
}
if write_delay > Duration::from_millis(0) {
task::sleep(write_delay).await;
}
task::yield_now().await;
}
})
}
pub async fn reset_parser(&self) {
(*self.parser_locked.lock().await).reset();
}
pub fn buff_size(&self) -> usize {
self.buff_size
}
}