1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use futures::TryStreamExt;
use reqwest::{Client, RequestBuilder};
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;
use crate::{ClientError, NomadClient};
use crate::models::event::{Event, EventCollection};
/// Executes the prepared event-stream request and forwards every decoded [`Event`] to `sender`.
///
/// The response body is read line by line. Each line is parsed as an [`EventCollection`] and
/// its events are sent individually, in order. Lines equal to `{}` are skipped (presumably
/// keep-alive heartbeats — confirm against the server's event stream documentation).
///
/// The function returns `Ok(())` when the response body ends, or as soon as `sender.send`
/// fails (the send error is printed to stdout before stopping).
///
/// # Errors
///
/// * [`ClientError::RequestError`] if `builder` cannot be turned into a request.
/// * [`ClientError::NetworkError`] if executing the request fails, or if reading the response
///   body fails part-way through the stream.
/// * [`ClientError::DeserializationError`] if a line is not a valid [`EventCollection`].
async fn process_events(client: Client, builder: RequestBuilder, sender: UnboundedSender<Event>) -> Result<(), ClientError> {
    let req = builder
        .build()
        .map_err(|err| ClientError::RequestError(err.to_string()))?;
    let response = client
        .execute(req)
        .await
        .map_err(|err| ClientError::NetworkError(err.to_string()))?;

    // `StreamReader` needs `std::io::Error` items, so the reqwest errors are wrapped.
    let stream = response.bytes_stream();
    let reader = StreamReader::new(stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
    let mut lines = reader.lines();

    // A read failure is reported to the caller instead of being treated as a clean
    // end of stream, so a dropped connection is distinguishable from a finished one.
    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|err| ClientError::NetworkError(err.to_string()))?
    {
        if line == "{}" {
            continue;
        }

        let collection = serde_json::from_str::<EventCollection>(&line)
            .map_err(|err| ClientError::DeserializationError(err.to_string()))?;

        for event in collection.events {
            if let Err(err) = sender.send(event) {
                // Nobody can receive events any more; stop without reporting an error.
                println!("stopping event stream: '{}'", err);
                return Ok(());
            }
        }
    }

    Ok(())
}
impl NomadClient {
    /// Subscribes to the `/event/stream` endpoint on a background task.
    ///
    /// A `GET` request for `/event/stream` is prepared via `self.request`, and a spawned
    /// task runs `process_events` with a clone of this client's HTTP client.
    ///
    /// Returns a pair of:
    /// * the [`JoinHandle`] of the spawned task, which resolves to the result of
    ///   `process_events`;
    /// * the [`UnboundedReceiver`] on which the decoded [`Event`]s arrive.
    pub fn events_subscribe(&self) -> (JoinHandle<Result<(), ClientError>>, UnboundedReceiver<Event>) {
        let (event_tx, event_rx) = unbounded_channel();
        let http_client = self.client.clone();
        let request = self.request(reqwest::Method::GET, "/event/stream");

        // The future returned by `process_events` is handed to the runtime directly.
        let worker = task::spawn(process_events(http_client, request, event_tx));

        (worker, event_rx)
    }
}
/// Public `models` module; it is declared inline and contains no items here.
// NOTE(review): the imports in this file reference `crate::models::event`, which this empty
// inline module does not define — confirm whether this declaration is a placeholder.
pub mod models {}