1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
use futures::future::poll_fn; use futures::task::Poll; use tokio::io::*; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; use tokio::sync::Mutex; use super::commands::stream_mapper::CommandToByteMapper; use super::commands::Command; use super::events::stream_mapper::*; use super::events::Event; type EventClosure = dyn FnMut(&Event) + Sync + Send + 'static; type EventClosureMutex = Box<EventClosure>; pub fn event_handler<F>(f: F) -> EventClosureMutex where F: FnMut(&Event) + Sync + Send + 'static, { Box::new(f) } pub struct FlicClient { reader: Mutex<OwnedReadHalf>, writer: Mutex<OwnedWriteHalf>, is_running: Mutex<bool>, command_mapper: Mutex<CommandToByteMapper>, event_mapper: Mutex<ByteToEventMapper>, map: Mutex<Vec<EventClosureMutex>>, } impl FlicClient { pub async fn new(conn: &str) -> Result<FlicClient> { TcpStream::connect(conn) .await .map(|s| s.into_split()) .map(|(reader, writer)| FlicClient { reader: Mutex::new(reader), writer: Mutex::new(writer), is_running: Mutex::new(true), command_mapper: Mutex::new(CommandToByteMapper::new()), event_mapper: Mutex::new(ByteToEventMapper::new()), map: Mutex::new(vec![]), }) } pub async fn register_event_handler(self, event: EventClosureMutex) -> Self { self.map.lock().await.push(event); self } pub async fn listen(&self) { while *self.is_running.lock().await { let mut reader = self.reader.lock().await; if let Ok(size) = poll_fn(|cx| { let mut buf = [0; 1]; match reader.poll_peek(cx, &mut buf) { Poll::Pending => Poll::Ready(Ok(0_usize)), Poll::Ready(all) => Poll::Ready(all), } }) .await { if size > 0 { let mut buffer = vec![]; if let Some(_) = reader.read_buf(&mut buffer).await.ok() { for b in buffer.iter() { match self.event_mapper.lock().await.map(*b) { EventResult::Some(Event::NoOp) => {} EventResult::Some(event) => { let mut map = self.map.lock().await; for ref mut f in &mut *map { f(&event); } } _ => {} } } } } } } } pub async fn stop(&self) { *self.is_running.lock().await = false; } pub async fn submit(&self, cmd: Command) { let mut writer = self.writer.lock().await; for b in self.command_mapper.lock().await.map(cmd) { let _ = writer.write_u8(b).await; } } }