1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use futures::TryStreamExt;
use reqwest::{Client, RequestBuilder};
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;
use crate::models::event::{Event, EventCollection};
use crate::{ClientError, NomadClient};
async fn process_events(
client: Client,
builder: RequestBuilder,
sender: UnboundedSender<Event>,
) -> Result<(), ClientError> {
let req = builder.build();
if req.is_err() {
return Err(ClientError::RequestError(req.err().unwrap().to_string()));
}
let req = req.unwrap();
match client.execute(req).await {
Ok(response) => {
let stream = response.bytes_stream();
let reader = StreamReader::new(
stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
);
let mut lines = reader.lines();
'stream: while let Ok(Some(line)) = lines.next_line().await {
if line == "{}" {
continue;
}
match serde_json::from_str::<EventCollection>(&line) {
Ok(collection) => {
for event in collection.events {
match sender.send(event) {
Ok(_) => {}
Err(err) => {
println!("stopping event stream: '{}'", err);
break 'stream;
}
}
}
}
Err(err) => return Err(ClientError::DeserializationError(err.to_string())),
}
}
Ok(())
}
Err(err) => Err(ClientError::NetworkError(err.to_string())),
}
}
impl NomadClient {
pub fn events_subscribe(
&self,
) -> (
JoinHandle<Result<(), ClientError>>,
UnboundedReceiver<Event>,
) {
let builder = self.request(reqwest::Method::GET, "/event/stream");
let (sender, receiver) = unbounded_channel();
let client = self.client.clone();
let handle = task::spawn(async move { process_events(client, builder, sender).await });
(handle, receiver)
}
}
pub mod models {}