1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::future::poll_fn;
use std::task::Poll;

use tokio::io::*;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;

use super::commands::stream_mapper::CommandToByteMapper;
use super::commands::Command;
use super::events::stream_mapper::*;
use super::events::Event;

type EventClosure = dyn FnMut(&Event) + Sync + Send + 'static;
type EventClosureMutex = Box<EventClosure>;

pub fn event_handler<F>(f: F) -> EventClosureMutex
where
    F: FnMut(&Event) + Sync + Send + 'static,
{
    Box::new(f)
}

pub struct FlicClient {
    reader: Mutex<OwnedReadHalf>,
    writer: Mutex<OwnedWriteHalf>,
    is_running: Mutex<bool>,
    command_mapper: Mutex<CommandToByteMapper>,
    event_mapper: Mutex<ByteToEventMapper>,
    map: Mutex<Vec<EventClosureMutex>>,
}

impl FlicClient {
    pub async fn new(conn: &str) -> Result<FlicClient> {
        TcpStream::connect(conn)
            .await
            .map(|s| s.into_split())
            .map(|(reader, writer)| FlicClient {
                reader: Mutex::new(reader),
                writer: Mutex::new(writer),
                is_running: Mutex::new(true),
                command_mapper: Mutex::new(CommandToByteMapper::new()),
                event_mapper: Mutex::new(ByteToEventMapper::new()),
                map: Mutex::new(vec![]),
            })
    }
    pub async fn register_event_handler(self, event: EventClosureMutex) -> Self {
        self.map.lock().await.push(event);
        self
    }

    // pub async fn listen_old(&self) {
    //     while *self.is_running.lock().await {
    //         let mut reader = self.reader.lock().await;
    //         if let Ok(size) = poll_fn(|cx| {
    //             let mut buf = [0; 1];
    //             let mut read_buf = ReadBuf::new(&mut buf);
    //             match reader.poll_peek(cx, &mut read_buf) {
    //                 Poll::Pending => Poll::Ready(Ok(0_usize)),
    //                 Poll::Ready(all) => Poll::Ready(all),
    //             }
    //         })
    //         .await
    //         {
    //             if size > 0 {
    //                 let mut buffer = vec![];
    //                 if let Some(_) = reader.read_buf(&mut buffer).await.ok() {
    //                     for b in buffer.iter() {
    //                         match self.event_mapper.lock().await.map(*b) {
    //                             EventResult::Some(Event::NoOp) => {}
    //                             EventResult::Some(event) => {
    //                                 let mut map = self.map.lock().await;
    //                                 for ref mut f in &mut *map {
    //                                     f(&event);
    //                                 }
    //                             }
    //                             _ => {}
    //                         }
    //                     }
    //                 }
    //             }
    //         }
    //     }
    // }

    pub async fn listen(&self) {
        while *self.is_running.lock().await {
            let mut reader = self.reader.lock().await;
            let mut buffer = vec![0; 2048];

            let size = reader.read(&mut buffer).await;

            match size {
                Ok(size) if size > 0 => {
                    buffer.truncate(size);
                    for b in buffer.iter() {
                        match self.event_mapper.lock().await.map(*b) {
                            EventResult::Some(Event::NoOp) => {}
                            EventResult::Some(event) => {
                                let mut map = self.map.lock().await;
                                for ref mut f in &mut *map {
                                    f(&event);
                                }
                            }
                            _ => {}
                        }
                    }
                }
                Ok(_) => {
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }
                Err(e) => {
                    eprintln!("Error reading from reader: {}", e);
                    break;
                }
            }
        }
    }

    pub async fn stop(&self) {
        *self.is_running.lock().await = false;
    }

    pub async fn submit(&self, cmd: Command) {
        let mut writer = self.writer.lock().await;
        for b in self.command_mapper.lock().await.map(cmd) {
            let _ = writer.write_u8(b).await;
        }
    }
}