http_stream/
stream.rs

1use std::{
2    io::{Read, Write},
3    net::{Shutdown, TcpListener, TcpStream},
4    sync::{Arc, RwLock},
5};
/// Broadcasts text messages to a set of connected TCP clients.
///
/// Sockets are added with `new_client` and written to by the `send*`
/// methods; a socket is dropped from the set when a write to it fails.
#[derive(Debug)]
pub struct Streamer {
    /// Registered client sockets, guarded by a lock so the accept thread and
    /// broadcasting callers can share one `Streamer`.
    clients: RwLock<Vec<TcpStream>>,
}
9impl Streamer {
10    pub fn new() -> Arc<Self> //{{{
11    {
12        Arc::new(Streamer {
13            clients: (RwLock::new(Vec::new())),
14        })
15    }
16
17    //}}}
18    pub fn connected_clients(&self) -> usize {
19        self.clients
20            .read()
21            .map_err(|_| 0)
22            .map(|e| e.len())
23            .unwrap_or_default()
24    }
25
26    pub fn new_client(&self, s: TcpStream) //{{{
27    {
28        let _ = self.clients.write().and_then(|mut c| Ok(c.push(s)));
29    }
30
31    //}}}
32    pub fn send(&self, data: &str) //{{{
33    {
34        let _ = self.clients.write().and_then(|mut clients| {
35            let mut i = 0;
36            while i < clients.len() {
37                let mut sock = &mut clients[i];
38                if write!(&mut sock, "data: {}\r\n\n", data).is_err() {
39                    clients.remove(i);
40                    continue;
41                }
42                i += 1
43            }
44            Ok(())
45        });
46    }
47
48    //}}}
49    pub fn send_json_with_event<T: serde::Serialize>(&self, event: &str, data: &T) //{{{
50    {
51        self.send_with_event(
52            event,
53            serde_json::to_string(data).unwrap_or_default().as_str(),
54        )
55    }
56
57    //}}}
58    pub fn send_with_event(&self, event: &str, data: &str) //{{{
59    {
60        let _ = self.clients.write().and_then(|mut clients| {
61            let mut i = 0;
62            while i < clients.len() {
63                let mut sock = &mut clients[i];
64                if write!(&mut sock, "event: {}\r\ndata: {}\r\n\n", event, data).is_err() {
65                    clients.remove(i);
66                    continue;
67                }
68                i += 1
69            }
70            Ok(())
71        });
72    }
73
74    //}}}
75    pub fn send_json<T: serde::Serialize>(&self, data: &T) //{{{
76    {
77        self.send(serde_json::to_string(data).unwrap_or_default().as_str())
78    }
79
80    //}}}
81    pub fn start<F: FnOnce(&str) -> bool + Send + 'static + Clone>(
82        self: Arc<Self>,
83        addr: &str,
84        control_fn: F,
85    ) -> std::io::Result<()> //{{{
86    {
87        let re: regex::Regex = regex::Regex::new(r"GET /[^ ]+").unwrap();
88        let listener = TcpListener::bind(addr)?;
89        std::thread::spawn(move || {
90            //{{{
91            loop {
92                let (sock, _addr) = match listener.accept() {
93                    Ok((sock, _addr)) => {
94                        match sock.set_read_timeout(Some(std::time::Duration::from_millis(200))) {
95                            Err(e) => {
96                                eprintln!("error setting timeout{}", e);
97                                let _ = sock.shutdown(Shutdown::Both);
98                                continue;
99                            }
100                            Ok(_) => (sock, _addr),
101                        }
102                    }
103                    Err(e) => {
104                        eprintln!("error accepting the client to streaming endpoint {}", e);
105                        continue;
106                    }
107                };
108                if sock
109                    .set_write_timeout(Some(std::time::Duration::from_millis(50)))
110                    .is_err()
111                {
112                    eprintln!("error setting up socket");
113                    continue;
114                }
115                if sock
116                    .set_read_timeout(Some(std::time::Duration::from_millis(50)))
117                    .is_err()
118                {
119                    eprintln!("error setting up socket");
120                    continue;
121                }
122                let mut buf = [0u8; 500];
123                let mut sock = sock;
124                if sock.read(&mut buf).is_err() {
125                    eprintln!("error setting up socket");
126                    continue;
127                }
128                let req = String::from_utf8_lossy(&buf);
129                let tkn = re.find(&req);
130                if tkn.is_none() {
131                    continue;
132                }
133                // let tkn: Vec<&str> =
134                let decoded = percent_encoding::percent_decode(tkn.unwrap().as_str().as_bytes())
135                    .decode_utf8();
136                if decoded.is_err() {
137                    continue;
138                }
139                let tkn = decoded.unwrap();
140                let tkn = tkn.split("/").collect::<Vec<&str>>();
141                if tkn.len() != 2 {
142                    continue;
143                }
144                let tkn = tkn[1];
145                if !control_fn.clone()(tkn) {
146                    let _ = sock.write(b"invalid token\r\n");
147                    continue;
148                }
149                let _ = sock.write(b"HTTP/1.1 200 OK\r\n");
150                let _ = sock.write(b"Connection: keep-alive\r\n");
151                let _ = sock.write(b"Content-Type: text/event-stream\r\n");
152                let _ = sock.write(b"x-content-type-options: nosniff\r\n");
153                if sock
154                    .write(b"Access-Control-Allow-Origin: *\r\n\r\n")
155                    .is_ok()
156                {
157                    self.new_client(sock);
158                };
159            }
160        }); //}}}
161        Ok(())
162    } //}}}
163}