icinga2_api/api/
async_client.rs

1//! Main API object (async version)
2
3use std::{path::Path, str::from_utf8};
4
5use futures::stream::TryStreamExt;
6use futures::Stream;
7use futures::StreamExt;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::io::AsyncBufReadExt;
10use tokio_stream::wrappers::LinesStream;
11use tokio_util::io::StreamReader;
12
13use crate::config::Icinga2Instance;
14use crate::types::{
15    enums::event_stream_type::IcingaEventStreamType,
16    event_stream::IcingaEvent,
17    filter::IcingaFilter,
18    rest::{RestApiEndpoint, RestApiResponse},
19};
20
21/// the runtime object for an Icinga2 instance (blocking variant)
22#[derive(Debug, Clone)]
23pub struct Icinga2Async {
24    /// the HTTP client to use
25    client: reqwest::Client,
26    /// the base URL for the Icinga API
27    pub url: url::Url,
28    /// username
29    pub username: String,
30    /// password
31    password: String,
32}
33
34impl Icinga2Async {
35    /// create a new Icinga2 instance from a config that was
36    /// either manually created or previously loaded via [Icinga2Instance::from_config_file]
37    ///
38    /// # Errors
39    /// this fails if the CA certificate file mentioned in the configuration
40    /// can not be found or parsed
41    pub fn from_instance_config(config: &Icinga2Instance) -> Result<Self, crate::error::Error> {
42        let client_builder = reqwest::ClientBuilder::new();
43        let client_builder = client_builder.user_agent(concat!(
44            env!("CARGO_PKG_NAME"),
45            "/",
46            env!("CARGO_PKG_VERSION")
47        ));
48        let mut headers = reqwest::header::HeaderMap::new();
49        headers.insert(
50            "Content-Type",
51            reqwest::header::HeaderValue::from_static("application/json"),
52        );
53        headers.insert(
54            "Accept",
55            reqwest::header::HeaderValue::from_static("application/json"),
56        );
57        let client_builder = client_builder.default_headers(headers);
58        let client_builder = if let Some(ca_certificate) = &config.ca_certificate {
59            let ca_cert_content = std::fs::read(ca_certificate)
60                .map_err(crate::error::Error::CouldNotReadCACertFile)?;
61            let ca_cert = reqwest::Certificate::from_pem(&ca_cert_content)
62                .map_err(crate::error::Error::CouldNotParsePEMCACertificate)?;
63            client_builder.tls_certs_only([ca_cert])
64        } else {
65            client_builder
66        };
67        let client = client_builder
68            .build()
69            .map_err(crate::error::Error::CouldNotBuildReqwestClientFromSuppliedInformation)?;
70        let url =
71            url::Url::parse(&config.url).map_err(crate::error::Error::CouldNotParseUrlInConfig)?;
72        let username = config.username.clone();
73        let password = config.password.clone();
74        Ok(Icinga2Async {
75            client,
76            url,
77            username,
78            password,
79        })
80    }
81
82    /// create a new Icinga2 instance from a TOML config file
83    ///
84    /// # Errors
85    /// this fails if the configuration file can not be found or parsed
86    /// or the CA certificate file mentioned in the configuration file
87    /// can not be found or parsed
88    pub fn from_config_file(path: &Path) -> Result<Self, crate::error::Error> {
89        let icinga_instance = Icinga2Instance::from_config_file(path)?;
90        Self::from_instance_config(&icinga_instance)
91    }
92
93    /// common code for the REST API calls
94    ///
95    /// # Errors
96    ///
97    /// this returns an error if encoding, the actual request, or decoding of the response fail
98    pub async fn rest<ApiEndpoint, Res>(
99        &self,
100        api_endpoint: ApiEndpoint,
101    ) -> Result<Res, crate::error::Error>
102    where
103        ApiEndpoint: RestApiEndpoint,
104        <ApiEndpoint as RestApiEndpoint>::RequestBody: Clone + Serialize + std::fmt::Debug,
105        Res: DeserializeOwned + std::fmt::Debug + RestApiResponse<ApiEndpoint>,
106    {
107        let method = api_endpoint.method()?;
108        let url = api_endpoint.url(&self.url)?;
109        let request_body: Option<std::borrow::Cow<<ApiEndpoint as RestApiEndpoint>::RequestBody>> =
110            api_endpoint.request_body()?;
111        let actual_method = if method == reqwest::Method::GET && request_body.is_some() {
112            reqwest::Method::POST
113        } else {
114            method.to_owned()
115        };
116        let mut req = self.client.request(actual_method, url.to_owned());
117        if method == reqwest::Method::GET && request_body.is_some() {
118            tracing::trace!("Sending GET request with body as POST via X-HTTP-Method-Override");
119            req = req.header(
120                "X-HTTP-Method-Override",
121                reqwest::header::HeaderValue::from_static("GET"),
122            );
123        }
124        req = req.basic_auth(&self.username, Some(&self.password));
125        if let Some(request_body) = request_body {
126            tracing::trace!("Request body:\n{:#?}", request_body);
127            req = req.json(&request_body);
128        }
129        let result = req.send().await;
130        if let Err(ref e) = result {
131            tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
132        }
133        let result = result?;
134        let status = result.status();
135        let response_body = result.bytes().await?;
136        match from_utf8(&response_body) {
137            Ok(response_body) => {
138                tracing::trace!("Response body:\n{}", &response_body);
139            }
140            Err(e) => {
141                tracing::trace!(
142                    "Response body that could not be parsed as utf8 because of {}:\n{:?}",
143                    &e,
144                    &response_body
145                );
146            }
147        }
148        if status.is_client_error() {
149            tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
150        } else if status.is_server_error() {
151            tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
152        }
153        if response_body.is_empty() {
154            Err(crate::error::Error::EmptyResponseBody(status))
155        } else {
156            let jd = &mut serde_json::Deserializer::from_slice(&response_body);
157            match serde_path_to_error::deserialize(jd) {
158                Ok(response_body) => {
159                    tracing::trace!("Parsed response body:\n{:#?}", response_body);
160                    Ok(response_body)
161                }
162                Err(e) => {
163                    let path = e.path();
164                    tracing::error!("Parsing failed at path {}: {}", path.to_string(), e.inner());
165                    if let Ok(response_body) = serde_json::from_slice(&response_body) {
166                        let mut response_body: serde_json::Value = response_body;
167                        for segment in path {
168                            match (response_body, segment) {
169                                (
170                                    serde_json::Value::Array(vs),
171                                    serde_path_to_error::Segment::Seq { index },
172                                ) => {
173                                    if let Some(v) = vs.get(*index) {
174                                        response_body = v.to_owned();
175                                    } else {
176                                        // if we can not find the element serde_path_to_error references fall back to just returning the error
177                                        return Err(e.into());
178                                    }
179                                }
180                                (
181                                    serde_json::Value::Object(m),
182                                    serde_path_to_error::Segment::Map { key },
183                                ) => {
184                                    if let Some(v) = m.get(key) {
185                                        response_body = v.to_owned();
186                                    } else {
187                                        // if we can not find the element serde_path_to_error references fall back to just returning the error
188                                        return Err(e.into());
189                                    }
190                                }
191                                _ => {
192                                    // if we can not find the element serde_path_to_error references fall back to just returning the error
193                                    return Err(e.into());
194                                }
195                            }
196                        }
197                        tracing::error!("Value in location path references is: {}", response_body);
198                    }
199                    Err(e.into())
200                }
201            }
202        }
203    }
204
205    /// Long-polling on an event stream
206    ///
207    /// # Errors
208    ///
209    /// this returns an error if encoding or the actual request fail
210    pub async fn event_stream(
211        &self,
212        types: &[IcingaEventStreamType],
213        queue: &str,
214        filter: Option<IcingaFilter>,
215    ) -> Result<impl Stream<Item = Result<IcingaEvent, std::io::Error>>, crate::error::Error> {
216        let method = reqwest::Method::POST;
217        let mut url = self
218            .url
219            .join("v1/events")
220            .map_err(crate::error::Error::CouldNotParseUrlFragment)?;
221        for t in types {
222            url.query_pairs_mut().append_pair("types", &t.to_string());
223        }
224        url.query_pairs_mut().append_pair("queue", queue);
225        let request_body = filter;
226        let mut req = self.client.request(method.to_owned(), url.to_owned());
227        req = req.basic_auth(&self.username, Some(&self.password));
228        if let Some(request_body) = request_body {
229            tracing::trace!("Request body:\n{:#?}", request_body);
230            req = req.json(&request_body);
231        }
232        let result = req.send().await;
233        if let Err(ref e) = result {
234            tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
235        }
236        let result = result?;
237        let status = result.status();
238        if status.is_client_error() {
239            tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
240        } else if status.is_server_error() {
241            tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
242        }
243        let byte_chunk_stream = result.bytes_stream().map_err(std::io::Error::other);
244        let stream_reader = StreamReader::new(byte_chunk_stream);
245        let line_reader = LinesStream::new(stream_reader.lines());
246        let event_reader = line_reader.map(|l| match l {
247            Ok(l) => {
248                tracing::trace!("Icinga2 received raw event:\n{}", &l);
249                let jd = &mut serde_json::Deserializer::from_str(&l);
250                match serde_path_to_error::deserialize(jd) {
251                    Ok(event) => {
252                        tracing::trace!("Icinga2 received event:\n{:#?}", &event);
253                        Ok(event)
254                    }
255                    Err(e) => Err(std::io::Error::other(e)),
256                }
257            }
258            Err(e) => Err(e),
259        });
260        Ok(event_reader)
261    }
262}
263
264#[cfg(test)]
265mod test {
266    use super::*;
267    use std::error::Error;
268    use tracing_test::traced_test;
269
270    #[traced_test]
271    #[tokio::test]
272    async fn test_event_stream_async() -> Result<(), Box<dyn Error>> {
273        dotenvy::dotenv()?;
274        let icinga2 = Icinga2Async::from_config_file(std::path::Path::new(&std::env::var(
275            "ICINGA_TEST_INSTANCE_CONFIG",
276        )?))?;
277        let mut stream = icinga2
278            .event_stream(
279                &[
280                    IcingaEventStreamType::CheckResult,
281                    IcingaEventStreamType::StateChange,
282                    IcingaEventStreamType::Notification,
283                    IcingaEventStreamType::AcknowledgementSet,
284                    IcingaEventStreamType::AcknowledgementCleared,
285                    IcingaEventStreamType::CommentAdded,
286                    IcingaEventStreamType::CommentRemove,
287                    IcingaEventStreamType::DowntimeAdded,
288                    IcingaEventStreamType::DowntimeRemoved,
289                    IcingaEventStreamType::DowntimeStarted,
290                    IcingaEventStreamType::DowntimeTriggered,
291                    IcingaEventStreamType::ObjectCreated,
292                    IcingaEventStreamType::ObjectDeleted,
293                    IcingaEventStreamType::ObjectModified,
294                    IcingaEventStreamType::Flapping,
295                ],
296                "test",
297                None,
298            )
299            .await?;
300        for _ in 0..100 {
301            let event = stream.next().await;
302            tracing::trace!("Got event:\n{:#?}", event);
303            if let Some(event) = event {
304                assert!(event.is_ok());
305            }
306        }
307        Ok(())
308    }
309}