1use std::{
2 io::{Read, Write},
3 net::{Shutdown, TcpListener, TcpStream},
4 sync::{Arc, RwLock},
5};
/// Broadcaster that fans text events out to a set of connected TCP clients.
pub struct Streamer {
    /// Open client sockets that events are written to.
    ///
    /// Kept behind a lock so the list can be read and modified through `&self`.
    clients: RwLock<Vec<TcpStream>>,
}
9impl Streamer {
10 pub fn new() -> Arc<Self> {
12 Arc::new(Streamer {
13 clients: (RwLock::new(Vec::new())),
14 })
15 }
16
17 pub fn connected_clients(&self) -> usize {
19 self.clients
20 .read()
21 .map_err(|_| 0)
22 .map(|e| e.len())
23 .unwrap_or_default()
24 }
25
26 pub fn new_client(&self, s: TcpStream) {
28 let _ = self.clients.write().and_then(|mut c| Ok(c.push(s)));
29 }
30
31 pub fn send(&self, data: &str) {
34 let _ = self.clients.write().and_then(|mut clients| {
35 let mut i = 0;
36 while i < clients.len() {
37 let mut sock = &mut clients[i];
38 if write!(&mut sock, "data: {}\r\n\n", data).is_err() {
39 clients.remove(i);
40 continue;
41 }
42 i += 1
43 }
44 Ok(())
45 });
46 }
47
48 pub fn send_json_with_event<T: serde::Serialize>(&self, event: &str, data: &T) {
51 self.send_with_event(
52 event,
53 serde_json::to_string(data).unwrap_or_default().as_str(),
54 )
55 }
56
57 pub fn send_with_event(&self, event: &str, data: &str) {
60 let _ = self.clients.write().and_then(|mut clients| {
61 let mut i = 0;
62 while i < clients.len() {
63 let mut sock = &mut clients[i];
64 if write!(&mut sock, "event: {}\r\ndata: {}\r\n\n", event, data).is_err() {
65 clients.remove(i);
66 continue;
67 }
68 i += 1
69 }
70 Ok(())
71 });
72 }
73
74 pub fn send_json<T: serde::Serialize>(&self, data: &T) {
77 self.send(serde_json::to_string(data).unwrap_or_default().as_str())
78 }
79
80 pub fn start<F: FnOnce(&str) -> bool + Send + 'static + Clone>(
82 self: Arc<Self>,
83 addr: &str,
84 control_fn: F,
85 ) -> std::io::Result<()> {
87 let re: regex::Regex = regex::Regex::new(r"GET /[^ ]+").unwrap();
88 let listener = TcpListener::bind(addr)?;
89 std::thread::spawn(move || {
90 loop {
92 let (sock, _addr) = match listener.accept() {
93 Ok((sock, _addr)) => {
94 match sock.set_read_timeout(Some(std::time::Duration::from_millis(200))) {
95 Err(e) => {
96 eprintln!("error setting timeout{}", e);
97 let _ = sock.shutdown(Shutdown::Both);
98 continue;
99 }
100 Ok(_) => (sock, _addr),
101 }
102 }
103 Err(e) => {
104 eprintln!("error accepting the client to streaming endpoint {}", e);
105 continue;
106 }
107 };
108 if sock
109 .set_write_timeout(Some(std::time::Duration::from_millis(50)))
110 .is_err()
111 {
112 eprintln!("error setting up socket");
113 continue;
114 }
115 if sock
116 .set_read_timeout(Some(std::time::Duration::from_millis(50)))
117 .is_err()
118 {
119 eprintln!("error setting up socket");
120 continue;
121 }
122 let mut buf = [0u8; 500];
123 let mut sock = sock;
124 if sock.read(&mut buf).is_err() {
125 eprintln!("error setting up socket");
126 continue;
127 }
128 let req = String::from_utf8_lossy(&buf);
129 let tkn = re.find(&req);
130 if tkn.is_none() {
131 continue;
132 }
133 let decoded = percent_encoding::percent_decode(tkn.unwrap().as_str().as_bytes())
135 .decode_utf8();
136 if decoded.is_err() {
137 continue;
138 }
139 let tkn = decoded.unwrap();
140 let tkn = tkn.split("/").collect::<Vec<&str>>();
141 if tkn.len() != 2 {
142 continue;
143 }
144 let tkn = tkn[1];
145 if !control_fn.clone()(tkn) {
146 let _ = sock.write(b"invalid token\r\n");
147 continue;
148 }
149 let _ = sock.write(b"HTTP/1.1 200 OK\r\n");
150 let _ = sock.write(b"Connection: keep-alive\r\n");
151 let _ = sock.write(b"Content-Type: text/event-stream\r\n");
152 let _ = sock.write(b"x-content-type-options: nosniff\r\n");
153 if sock
154 .write(b"Access-Control-Allow-Origin: *\r\n\r\n")
155 .is_ok()
156 {
157 self.new_client(sock);
158 };
159 }
160 }); Ok(())
162 } }