1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::{
io::Write,
net::{
Shutdown,
TcpListener,
TcpStream,
},
sync::{
Arc,
RwLock,
},
};
pub struct Streamer
{
clients: RwLock<Vec<TcpStream>>,
}
impl Streamer
{
pub fn new() -> Arc<Self>
{
Arc::new(Streamer {
clients: (RwLock::new(Vec::new())),
})
}
pub fn new_client(&self, s: TcpStream)
{
let mut clients = self.clients.write().unwrap();
clients.push(s);
}
pub fn send(&self, data: &str)
{
let mut clients = self.clients.write().unwrap();
let mut i = 0;
while i < clients.len()
{
let mut sock = &mut clients[i];
if write!(&mut sock, "data: {}\r\n\n", data).is_err()
{
clients.remove(i);
continue;
}
i += 1
}
}
pub fn send_json_with_event<T: serde::Serialize>(&self, event: &str, data: &T)
{
self.send_with_event(
event,
serde_json::to_string(data).unwrap_or_default().as_str(),
)
}
pub fn send_with_event(&self, event: &str, data: &str)
{
let mut clients = self.clients.write().unwrap();
let mut i = 0;
while i < clients.len()
{
let mut sock = &mut clients[i];
if write!(&mut sock, "event: {}\r\ndata: {}\r\n\n", event, data).is_err()
{
clients.remove(i);
continue;
}
i += 1
}
}
pub fn send_json<T: serde::Serialize>(&self, data: &T)
{
self.send(serde_json::to_string(data).unwrap_or_default().as_str())
}
pub fn start(self: Arc<Self>, addr: &str) -> std::io::Result<()>
{
let listener = TcpListener::bind(addr)?;
std::thread::spawn(move || {
loop
{
let (sock, _addr) = match listener.accept()
{
Ok((sock, _addr)) =>
{
match sock.set_read_timeout(Some(std::time::Duration::from_millis(200)))
{
Err(e) =>
{
eprintln!("error setting timeout{}", e);
let _ = sock.shutdown(Shutdown::Both);
continue;
}
Ok(_) => (sock, _addr),
}
}
Err(e) =>
{
eprintln!("error accepting the client to streaming endpoint {}", e);
continue;
}
};
let mut sock = sock;
let _ = sock.write(b"HTTP/1.1 200 OK\r\n");
let _ = sock.write(b"Connection: keep-alive\r\n");
let _ = sock.write(b"Content-Type: text/event-stream\r\n");
let _ = sock.write(b"x-content-type-options: nosniff\r\n");
if sock
.write(b"Access-Control-Allow-Origin: *\r\n\r\n")
.is_ok()
{
self.new_client(sock);
};
}
});
Ok(())
}
}