nomad_client_rs/api/
event.rs

1use futures::TryStreamExt;
2use reqwest::{Client, RequestBuilder};
3use tokio::io::AsyncBufReadExt;
4use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
5use tokio::task;
6use tokio::task::JoinHandle;
7use tokio_util::io::StreamReader;
8
9use crate::api::event::models::EventsSubscribeParams;
10use crate::models::event::{Event, EventCollection};
11use crate::{ClientError, NomadClient};
12
13async fn process_events(
14    client: Client,
15    builder: RequestBuilder,
16    sender: UnboundedSender<Event>,
17) -> Result<(), ClientError> {
18    let req = builder.build();
19    if req.is_err() {
20        return Err(ClientError::RequestError(req.err().unwrap().to_string()));
21    }
22
23    let req = req.unwrap();
24    match client.execute(req).await {
25        Ok(response) => {
26            let stream = response.bytes_stream();
27            let reader = StreamReader::new(
28                stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
29            );
30            let mut lines = reader.lines();
31
32            'stream: while let Ok(Some(line)) = lines.next_line().await {
33                // if line is '{}', then it's a heartbeat and we can ignore it
34                if line == "{}" {
35                    continue;
36                }
37
38                match serde_json::from_str::<EventCollection>(&line) {
39                    Ok(collection) => {
40                        for event in collection.events {
41                            match sender.send(event) {
42                                Ok(_) => {}
43                                Err(err) => {
44                                    println!("stopping event stream: '{}'", err);
45                                    break 'stream;
46                                }
47                            }
48                        }
49                    }
50                    Err(err) => return Err(ClientError::DeserializationError(err.to_string())),
51                }
52            }
53            Ok(())
54        }
55        Err(err) => Err(ClientError::NetworkError(err.to_string())),
56    }
57}
58
59impl NomadClient {
60    pub fn events_subscribe(
61        &self,
62        params: &EventsSubscribeParams,
63    ) -> (
64        JoinHandle<Result<(), ClientError>>,
65        UnboundedReceiver<Event>,
66    ) {
67        let mut builder = self
68            .request(reqwest::Method::GET, "/event/stream")
69            .query(params);
70
71        if !params.topic.is_empty() {
72            let params: Vec<(&str, String)> = params
73                .topic
74                .iter()
75                .map(|topic| ("topic", format!("{topic:?}")))
76                .collect();
77
78            builder = builder.query(params.as_slice());
79        }
80
81        let (sender, receiver) = unbounded_channel();
82        let client = self.client.clone();
83
84        let handle = task::spawn(async move { process_events(client, builder, sender).await });
85
86        (handle, receiver)
87    }
88}
89
90pub mod models {
91    use serde::Serialize;
92
93    use crate::models::event::EventTopic;
94
95    #[derive(Debug, Serialize)]
96    #[serde(rename_all = "camelCase")]
97    pub struct EventsSubscribeParams {
98        pub namespace: Option<String>,
99        #[serde(skip_serializing)]
100        pub topic: Vec<EventTopic>,
101    }
102
103    impl EventsSubscribeParams {
104        pub fn add_topic(&mut self, topic: EventTopic) {
105            self.topic.push(topic);
106        }
107    }
108
109    impl Default for EventsSubscribeParams {
110        fn default() -> Self {
111            EventsSubscribeParams {
112                namespace: "*".to_string().into(),
113                topic: Vec::default(),
114            }
115        }
116    }
117}
118
#[cfg(test)]
mod tests {
    use crate::api::event::models::EventsSubscribeParams;
    use crate::models::event::EventTopic;
    use crate::NomadClient;

    /// Subscribes to allocation events and checks the topic of the first event received.
    ///
    /// Ignored by default; NOTE(review): presumably because it needs a reachable
    /// Nomad agent — confirm.
    #[tokio::test]
    #[ignore]
    async fn events_subscribe_should_return_allocation_event() {
        let client = NomadClient::default();

        let mut params = EventsSubscribeParams::default();
        params.add_topic(EventTopic::Allocation);

        let (handle, mut receiver) = client.events_subscribe(&params);

        match receiver.recv().await {
            Some(event) => {
                println!("{event:?}");
                assert!(matches!(event.topic, Some(EventTopic::Allocation)));
                // One event is enough; stop the background stream task.
                handle.abort();
            }
            None => {
                // The channel only closes once the background task has finished, so
                // report how it ended instead of passing silently.
                let outcome = handle.await.expect("event stream task panicked");
                panic!(
                    "event stream closed before any event was received (task returned Ok: {})",
                    outcome.is_ok()
                );
            }
        }
    }
}