nomad_client_rs/api/
event.rs1use futures::TryStreamExt;
2use reqwest::{Client, RequestBuilder};
3use tokio::io::AsyncBufReadExt;
4use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
5use tokio::task;
6use tokio::task::JoinHandle;
7use tokio_util::io::StreamReader;
8
9use crate::api::event::models::EventsSubscribeParams;
10use crate::models::event::{Event, EventCollection};
11use crate::{ClientError, NomadClient};
12
13async fn process_events(
14 client: Client,
15 builder: RequestBuilder,
16 sender: UnboundedSender<Event>,
17) -> Result<(), ClientError> {
18 let req = builder.build();
19 if req.is_err() {
20 return Err(ClientError::RequestError(req.err().unwrap().to_string()));
21 }
22
23 let req = req.unwrap();
24 match client.execute(req).await {
25 Ok(response) => {
26 let stream = response.bytes_stream();
27 let reader = StreamReader::new(
28 stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
29 );
30 let mut lines = reader.lines();
31
32 'stream: while let Ok(Some(line)) = lines.next_line().await {
33 if line == "{}" {
35 continue;
36 }
37
38 match serde_json::from_str::<EventCollection>(&line) {
39 Ok(collection) => {
40 for event in collection.events {
41 match sender.send(event) {
42 Ok(_) => {}
43 Err(err) => {
44 println!("stopping event stream: '{}'", err);
45 break 'stream;
46 }
47 }
48 }
49 }
50 Err(err) => return Err(ClientError::DeserializationError(err.to_string())),
51 }
52 }
53 Ok(())
54 }
55 Err(err) => Err(ClientError::NetworkError(err.to_string())),
56 }
57}
58
59impl NomadClient {
60 pub fn events_subscribe(
61 &self,
62 params: &EventsSubscribeParams,
63 ) -> (
64 JoinHandle<Result<(), ClientError>>,
65 UnboundedReceiver<Event>,
66 ) {
67 let mut builder = self
68 .request(reqwest::Method::GET, "/event/stream")
69 .query(params);
70
71 if !params.topic.is_empty() {
72 let params: Vec<(&str, String)> = params
73 .topic
74 .iter()
75 .map(|topic| ("topic", format!("{topic:?}")))
76 .collect();
77
78 builder = builder.query(params.as_slice());
79 }
80
81 let (sender, receiver) = unbounded_channel();
82 let client = self.client.clone();
83
84 let handle = task::spawn(async move { process_events(client, builder, sender).await });
85
86 (handle, receiver)
87 }
88}
89
90pub mod models {
91 use serde::Serialize;
92
93 use crate::models::event::EventTopic;
94
95 #[derive(Debug, Serialize)]
96 #[serde(rename_all = "camelCase")]
97 pub struct EventsSubscribeParams {
98 pub namespace: Option<String>,
99 #[serde(skip_serializing)]
100 pub topic: Vec<EventTopic>,
101 }
102
103 impl EventsSubscribeParams {
104 pub fn add_topic(&mut self, topic: EventTopic) {
105 self.topic.push(topic);
106 }
107 }
108
109 impl Default for EventsSubscribeParams {
110 fn default() -> Self {
111 EventsSubscribeParams {
112 namespace: "*".to_string().into(),
113 topic: Vec::default(),
114 }
115 }
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use crate::api::event::models::EventsSubscribeParams;
122 use crate::models::event::EventTopic;
123 use crate::NomadClient;
124
125 #[tokio::test]
126 #[ignore]
127 async fn job_parse_should_return_valid_job() {
128 let client = NomadClient::default();
129
130 let mut params = EventsSubscribeParams::default();
131 params.add_topic(EventTopic::Allocation);
132
133 let (handle, mut receiver) = client.events_subscribe(¶ms);
134
135 if let Some(event) = receiver.recv().await {
136 println!("{event:?}");
137 assert!(matches!(event.topic, Some(EventTopic::Allocation)))
138 }
139 }
140}