// nomad_client_rs/api/event.rs

use futures::TryStreamExt;
use reqwest::{Client, RequestBuilder};
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;

use crate::api::event::models::EventsSubscribeParams;
use crate::models::event::{Event, EventCollection};
use crate::{ClientError, NomadClient};

/// Executes the event-stream request and forwards every decoded [`Event`] to `sender`.
///
/// The response body is read line by line. Each non-heartbeat line is parsed as an
/// [`EventCollection`] and its events are pushed onto the channel one at a time.
///
/// The function returns `Ok(())` when the server closes the stream or when the
/// receiving half of the channel has been dropped (nobody is listening any more).
///
/// # Errors
///
/// * [`ClientError::RequestError`] if the request cannot be built from `builder`.
/// * [`ClientError::NetworkError`] if executing the request fails, or if reading the
///   response body fails part-way through the stream.
/// * [`ClientError::DeserializationError`] if a line is not a valid `EventCollection`.
async fn process_events(
    client: Client,
    builder: RequestBuilder,
    sender: UnboundedSender<Event>,
) -> Result<(), ClientError> {
    let req = builder
        .build()
        .map_err(|err| ClientError::RequestError(err.to_string()))?;

    let response = client
        .execute(req)
        .await
        .map_err(|err| ClientError::NetworkError(err.to_string()))?;

    // `StreamReader` needs `io::Error` items, so the transport error is wrapped.
    let stream = response
        .bytes_stream()
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
    let mut lines = StreamReader::new(stream).lines();

    loop {
        let line = match lines.next_line().await {
            Ok(Some(line)) => line,
            // The server closed the stream cleanly.
            Ok(None) => return Ok(()),
            // A read failure must be reported; silently ending the loop would make a
            // dropped connection indistinguishable from a normal end of stream.
            Err(err) => return Err(ClientError::NetworkError(err.to_string())),
        };

        // if line is '{}', then it's a heartbeat and we can ignore it; blank lines
        // carry no payload either and would only fail to deserialize.
        let line = line.trim();
        if line.is_empty() || line == "{}" {
            continue;
        }

        let collection = serde_json::from_str::<EventCollection>(line)
            .map_err(|err| ClientError::DeserializationError(err.to_string()))?;

        for event in collection.events {
            // `send` only fails once the receiver is gone; stop streaming in that case.
            if let Err(err) = sender.send(event) {
                println!("stopping event stream: '{}'", err);
                return Ok(());
            }
        }
    }
}

impl NomadClient {
    /// Subscribes to the `/event/stream` endpoint and streams events in the background.
    ///
    /// `params` is serialized into the query string; every entry of `params.topic` is
    /// additionally appended as its own `topic` query pair. A task is spawned that
    /// reads the stream and pushes each event onto an unbounded channel.
    ///
    /// Returns the handle of the spawned task (resolving to the stream's final
    /// result) together with the receiving end of the channel.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime, because the reader task is
    /// started with `tokio::task::spawn`.
    pub fn events_subscribe(
        &self,
        params: &EventsSubscribeParams,
    ) -> (
        JoinHandle<Result<(), ClientError>>,
        UnboundedReceiver<Event>,
    ) {
        let base = self
            .request(reqwest::Method::GET, "/event/stream")
            .query(params);

        // Topics are skipped by the struct's serializer, so they are added here as
        // repeated `topic` pairs, using the `Debug` rendering of each topic as value.
        let builder = if params.topic.is_empty() {
            base
        } else {
            let topic_pairs: Vec<(&str, String)> = params
                .topic
                .iter()
                .map(|topic| ("topic", format!("{topic:?}")))
                .collect();
            base.query(topic_pairs.as_slice())
        };

        let http = self.client.clone();
        let (tx, rx) = unbounded_channel();
        let worker = task::spawn(process_events(http, builder, tx));

        (worker, rx)
    }
}

/// Request parameter types for the event API.
pub mod models {
    use serde::Serialize;

    use crate::models::event::EventTopic;

    /// Query parameters accepted by `NomadClient::events_subscribe`.
    #[derive(Debug, Serialize)]
    #[serde(rename_all = "camelCase")]
    pub struct EventsSubscribeParams {
        /// Namespace to subscribe to, serialized as the `namespace` query parameter.
        pub namespace: Option<String>,
        /// Topics to subscribe to. Excluded from serde serialization; the subscribe
        /// call appends each entry as a separate `topic` query pair instead.
        #[serde(skip_serializing)]
        pub topic: Vec<EventTopic>,
    }

    impl Default for EventsSubscribeParams {
        /// Creates parameters with namespace `"*"` and an empty topic list.
        // NOTE(review): `"*"` presumably selects all namespaces — confirm against the Nomad API.
        fn default() -> Self {
            Self {
                namespace: Some(String::from("*")),
                topic: Vec::new(),
            }
        }
    }

    impl EventsSubscribeParams {
        /// Appends `topic` to the list of subscribed topics.
        pub fn add_topic(&mut self, topic: EventTopic) {
            self.topic.push(topic);
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::api::event::models::EventsSubscribeParams;
    use crate::models::event::EventTopic;
    use crate::NomadClient;

    /// Subscribes to allocation events and checks the topic of the first event received.
    ///
    /// Ignored by default; presumably it needs a reachable Nomad agent that emits
    /// allocation events — confirm before enabling.
    #[tokio::test]
    #[ignore]
    async fn events_subscribe_should_return_allocation_event() {
        let client = NomadClient::default();

        let mut params = EventsSubscribeParams::default();
        params.add_topic(EventTopic::Allocation);

        let (handle, mut receiver) = client.events_subscribe(&params);

        let received = receiver.recv().await;
        // One event is enough; stop the background reader instead of leaving it running.
        handle.abort();

        // A closed stream without any event must fail the test rather than pass vacuously.
        let event = received.expect("event stream closed before delivering an event");
        println!("{event:?}");
        assert!(matches!(event.topic, Some(EventTopic::Allocation)));
    }
}