nomad_client_rs/api/
event.rsuse futures::TryStreamExt;
use reqwest::{Client, RequestBuilder};
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio_util::io::StreamReader;
use crate::api::event::models::EventsSubscribeParams;
use crate::models::event::{Event, EventCollection};
use crate::{ClientError, NomadClient};
async fn process_events(
client: Client,
builder: RequestBuilder,
sender: UnboundedSender<Event>,
) -> Result<(), ClientError> {
let req = builder.build();
if req.is_err() {
return Err(ClientError::RequestError(req.err().unwrap().to_string()));
}
let req = req.unwrap();
match client.execute(req).await {
Ok(response) => {
let stream = response.bytes_stream();
let reader = StreamReader::new(
stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
);
let mut lines = reader.lines();
'stream: while let Ok(Some(line)) = lines.next_line().await {
if line == "{}" {
continue;
}
match serde_json::from_str::<EventCollection>(&line) {
Ok(collection) => {
for event in collection.events {
match sender.send(event) {
Ok(_) => {}
Err(err) => {
println!("stopping event stream: '{}'", err);
break 'stream;
}
}
}
}
Err(err) => return Err(ClientError::DeserializationError(err.to_string())),
}
}
Ok(())
}
Err(err) => Err(ClientError::NetworkError(err.to_string())),
}
}
impl NomadClient {
pub fn events_subscribe(
&self,
params: &EventsSubscribeParams,
) -> (
JoinHandle<Result<(), ClientError>>,
UnboundedReceiver<Event>,
) {
let mut builder = self
.request(reqwest::Method::GET, "/event/stream")
.query(params);
if !params.topic.is_empty() {
let params: Vec<(&str, String)> = params
.topic
.iter()
.map(|topic| ("topic", format!("{topic:?}")))
.collect();
builder = builder.query(params.as_slice());
}
let (sender, receiver) = unbounded_channel();
let client = self.client.clone();
let handle = task::spawn(async move { process_events(client, builder, sender).await });
(handle, receiver)
}
}
pub mod models {
use serde::Serialize;
use crate::models::event::EventTopic;
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct EventsSubscribeParams {
pub namespace: Option<String>,
#[serde(skip_serializing)]
pub topic: Vec<EventTopic>,
}
impl EventsSubscribeParams {
pub fn add_topic(&mut self, topic: EventTopic) {
self.topic.push(topic);
}
}
impl Default for EventsSubscribeParams {
fn default() -> Self {
EventsSubscribeParams {
namespace: "*".to_string().into(),
topic: Vec::default(),
}
}
}
}
#[cfg(test)]
mod tests {
use crate::api::event::models::EventsSubscribeParams;
use crate::models::event::EventTopic;
use crate::NomadClient;
#[tokio::test]
#[ignore]
async fn job_parse_should_return_valid_job() {
let client = NomadClient::default();
let mut params = EventsSubscribeParams::default();
params.add_topic(EventTopic::Allocation);
let (handle, mut receiver) = client.events_subscribe(¶ms);
if let Some(event) = receiver.recv().await {
println!("{event:?}");
assert!(matches!(event.topic, Some(EventTopic::Allocation)))
}
}
}