1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use futures::TryStreamExt;
use reqwest::{Client, RequestBuilder};
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;

use crate::{ClientError, NomadClient};
use crate::models::event::{Event, EventCollection};

/// Executes a streaming request and forwards every decoded [`Event`] to `sender`.
///
/// The response body is read line by line. Each line is expected to be a JSON
/// document that deserializes into an [`EventCollection`]; its events are sent
/// over the channel one at a time, in order.
///
/// The function returns `Ok(())` when the response body ends, or when sending
/// on the channel fails (in which case a notice is printed and the stream is
/// abandoned).
///
/// # Errors
///
/// * [`ClientError::RequestError`] if the request cannot be built.
/// * [`ClientError::NetworkError`] if the request cannot be executed, or if
///   reading the response body fails part-way through the stream.
/// * [`ClientError::DeserializationError`] if a non-heartbeat line is not a
///   valid `EventCollection`.
async fn process_events(client: Client, builder: RequestBuilder, sender: UnboundedSender<Event>) -> Result<(), ClientError> {
    let req = builder
        .build()
        .map_err(|err| ClientError::RequestError(err.to_string()))?;

    let response = client
        .execute(req)
        .await
        .map_err(|err| ClientError::NetworkError(err.to_string()))?;

    // `StreamReader` needs `io::Error` items, so wrap the transport errors.
    let stream = response
        .bytes_stream()
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
    let mut lines = StreamReader::new(stream).lines();

    // A read failure is surfaced to the caller rather than being treated as a
    // clean end of stream; otherwise a dropped connection would look like success.
    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|err| ClientError::NetworkError(err.to_string()))?
    {
        // if line is '{}', then it's a heartbeat and we can ignore it
        if line == "{}" {
            continue;
        }

        let collection = serde_json::from_str::<EventCollection>(&line)
            .map_err(|err| ClientError::DeserializationError(err.to_string()))?;

        for event in collection.events {
            if let Err(err) = sender.send(event) {
                // Nobody can receive further events, so stop consuming the stream.
                println!("stopping event stream: '{}'", err);
                return Ok(());
            }
        }
    }

    Ok(())
}


impl NomadClient {
    /// Starts a background task that streams events from `/event/stream`.
    ///
    /// A `GET` request for the event stream is prepared and handed, together
    /// with a clone of the underlying HTTP client and the sending half of a
    /// fresh unbounded channel, to `process_events` running on a spawned task.
    ///
    /// # Returns
    ///
    /// A tuple of:
    /// * the [`JoinHandle`] of the spawned task, which resolves to the result
    ///   of `process_events`;
    /// * the [`UnboundedReceiver`] on which the decoded [`Event`]s arrive.
    pub fn events_subscribe(&self) -> (JoinHandle<Result<(), ClientError>>, UnboundedReceiver<Event>) {
        let (tx, rx) = unbounded_channel();
        let request = self.request(reqwest::Method::GET, "/event/stream");
        let http = self.client.clone();

        // The future owns everything it needs, so it can be spawned directly.
        let worker = task::spawn(process_events(http, request, tx));

        (worker, rx)
    }
}

// Empty public inline module; it declares no items.
// NOTE(review): the imports in this file use `crate::models::event`, which this
// empty module does not provide — confirm whether this declaration is intentional
// or leftover.
pub mod models {}