// Minimal Server-Sent Events (`text/event-stream`) broadcaster built on blocking
// `std::net` sockets: accepted clients are kept in a shared list and every
// message is written to all of them.
use std::{
    io::Write,
    net::{
        Shutdown,
        TcpListener,
        TcpStream,
    },
    sync::{
        Arc,
        RwLock,
    },
};
/// Fan-out point for a `text/event-stream` endpoint: it keeps the sockets of
/// the connected clients and writes each outgoing message to all of them.
pub struct Streamer
{
    /// Sockets of the clients currently registered for broadcasts. Kept behind
    /// a lock so the list can be modified through a shared `&self`.
    clients: RwLock<Vec<TcpStream>>,
}
impl Streamer
{
    /// Creates a streamer with no connected clients.
    ///
    /// The value is returned inside an [`Arc`] because [`Streamer::start`]
    /// consumes an `Arc<Self>` to share the client list with its accept thread.
    pub fn new() -> Arc<Self> //{{{
    {
        Arc::new(Streamer {
            clients: RwLock::new(Vec::new()),
        })
    }

    //}}}
    /// Registers `s` as a broadcast target.
    ///
    /// The socket is stored as-is; nothing is written to it here.
    pub fn new_client(&self, s: TcpStream) //{{{
    {
        self.lock_clients().push(s);
    }

    //}}}
    /// Sends `data` to every connected client as an unnamed event.
    ///
    /// Each line of `data` is emitted as its own `data:` field, so payloads
    /// containing line breaks cannot terminate the event early. Clients whose
    /// socket write fails are dropped from the list.
    pub fn send(&self, data: &str) //{{{
    {
        self.broadcast(&Self::frame(None, data));
    }

    //}}}
    /// Serializes `data` to JSON and sends it as an event named `event`.
    ///
    /// If serialization fails, an empty payload is sent instead.
    pub fn send_json_with_event<T: serde::Serialize>(&self, event: &str, data: &T) //{{{
    {
        let payload = serde_json::to_string(data).unwrap_or_default();
        self.send_with_event(event, payload.as_str())
    }

    //}}}
    /// Sends `data` to every connected client as an event named `event`.
    ///
    /// Line breaks inside `event` are removed, and each line of `data` becomes
    /// its own `data:` field, so neither argument can inject extra fields into
    /// the stream. Clients whose socket write fails are dropped from the list.
    pub fn send_with_event(&self, event: &str, data: &str) //{{{
    {
        self.broadcast(&Self::frame(Some(event), data));
    }

    //}}}
    /// Serializes `data` to JSON and sends it as an unnamed event.
    ///
    /// If serialization fails, an empty payload is sent instead.
    pub fn send_json<T: serde::Serialize>(&self, data: &T) //{{{
    {
        let payload = serde_json::to_string(data).unwrap_or_default();
        self.send(payload.as_str())
    }

    //}}}
    /// Binds a listener on `addr` and spawns a detached thread that accepts
    /// clients forever.
    ///
    /// Every accepted connection immediately receives a fixed `200 OK`
    /// response head announcing `text/event-stream`; the incoming request is
    /// never read. A connection is registered for broadcasts only if the whole
    /// head was written successfully.
    ///
    /// # Errors
    ///
    /// Returns the error from [`TcpListener::bind`] if `addr` cannot be bound.
    /// Failures while accepting or greeting an individual client are logged to
    /// stderr and do not stop the accept loop.
    pub fn start(self: Arc<Self>, addr: &str) -> std::io::Result<()> //{{{
    {
        // Sent in one `write_all` call: plain `write` may accept only part of
        // the buffer, which would leave the client with a truncated head.
        const RESPONSE_HEAD: &str = concat!(
            "HTTP/1.1 200 OK\r\n",
            "Connection: keep-alive\r\n",
            "Content-Type: text/event-stream\r\n",
            "x-content-type-options: nosniff\r\n",
            "Access-Control-Allow-Origin: *\r\n\r\n",
        );

        let listener = TcpListener::bind(addr)?;
        std::thread::spawn(move || {
            //{{{
            loop
            {
                let mut sock = match listener.accept()
                {
                    Ok((sock, _addr)) => sock,
                    Err(e) =>
                    {
                        eprintln!("error accepting the client to streaming endpoint {}", e);
                        continue;
                    }
                };
                // NOTE(review): nothing in this impl reads from the socket, so
                // this timeout only matters to code elsewhere, if any; confirm.
                if let Err(e) = sock.set_read_timeout(Some(std::time::Duration::from_millis(200)))
                {
                    eprintln!("error setting timeout: {}", e);
                    let _ = sock.shutdown(Shutdown::Both);
                    continue;
                }
                if sock.write_all(RESPONSE_HEAD.as_bytes()).is_ok()
                {
                    self.new_client(sock);
                }
            }
        }); //}}}
        Ok(())
    } //}}}

    /// Takes the exclusive lock on the client list.
    ///
    /// A poisoned lock is recovered instead of panicking, so a panic in one
    /// thread that held the lock does not take down every later caller.
    fn lock_clients(&self) -> std::sync::RwLockWriteGuard<'_, Vec<TcpStream>> //{{{
    {
        self.clients
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    //}}}
    /// Builds the wire form of one event.
    ///
    /// The optional `event` name is written on a single line with any CR/LF
    /// characters removed. `data` is split on CRLF, LF and lone CR, and every
    /// resulting line gets its own `data:` field; an empty `data` still yields
    /// one empty `data:` field. Field lines end with CRLF and the event is
    /// closed by a bare LF, the same bytes the single-line case has always
    /// produced.
    fn frame(event: Option<&str>, data: &str) -> String //{{{
    {
        let mut out = String::with_capacity(data.len() + 32);
        if let Some(name) = event
        {
            out.push_str("event: ");
            out.extend(name.chars().filter(|c| *c != '\r' && *c != '\n'));
            out.push_str("\r\n");
        }
        for segment in data.split('\n')
        {
            // A CR directly before the LF belongs to the same line break.
            let segment = segment.strip_suffix('\r').unwrap_or(segment);
            for line in segment.split('\r')
            {
                out.push_str("data: ");
                out.push_str(line);
                out.push_str("\r\n");
            }
        }
        out.push('\n');
        out
    }

    //}}}
    /// Writes `frame` to every client and drops those whose write fails.
    ///
    /// The exclusive lock is held for the whole pass, so frames from
    /// concurrent callers are not interleaved on any socket.
    fn broadcast(&self, frame: &str) //{{{
    {
        let bytes = frame.as_bytes();
        self.lock_clients().retain(|sock| {
            // `Write` is implemented for `&TcpStream`, so the shared reference
            // handed out by `retain` is enough to write.
            let mut sock: &TcpStream = sock;
            sock.write_all(bytes).is_ok()
        });
    } //}}}
}