icinga2_api/api/
async_client.rs

1//! Main API object (async version)
2
3use std::{path::Path, str::from_utf8};
4
5use futures::stream::TryStreamExt;
6use futures::Stream;
7use futures::StreamExt;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::io::AsyncBufReadExt;
10use tokio_stream::wrappers::LinesStream;
11use tokio_util::io::StreamReader;
12
13use crate::config::Icinga2Instance;
14use crate::types::{
15    enums::event_stream_type::IcingaEventStreamType,
16    event_stream::IcingaEvent,
17    filter::IcingaFilter,
18    rest::{RestApiEndpoint, RestApiResponse},
19};
20
21/// the runtime object for an Icinga2 instance (blocking variant)
22#[derive(Debug, Clone)]
23pub struct Icinga2Async {
24    /// the HTTP client to use
25    client: reqwest::Client,
26    /// the base URL for the Icinga API
27    pub url: url::Url,
28    /// username
29    pub username: String,
30    /// password
31    password: String,
32}
33
34impl Icinga2Async {
35    /// create a new Icinga2 instance from a config that was
36    /// either manually created or previously loaded via [Icinga2Instance::from_config_file]
37    ///
38    /// # Errors
39    /// this fails if the CA certificate file mentioned in the configuration
40    /// can not be found or parsed
41    pub fn from_instance_config(config: &Icinga2Instance) -> Result<Self, crate::error::Error> {
42        let client_builder = reqwest::ClientBuilder::new();
43        let client_builder = client_builder.user_agent(concat!(
44            env!("CARGO_PKG_NAME"),
45            "/",
46            env!("CARGO_PKG_VERSION")
47        ));
48        let mut headers = reqwest::header::HeaderMap::new();
49        headers.insert(
50            "Content-Type",
51            reqwest::header::HeaderValue::from_static("application/json"),
52        );
53        headers.insert(
54            "Accept",
55            reqwest::header::HeaderValue::from_static("application/json"),
56        );
57        let client_builder = client_builder.default_headers(headers);
58        let client_builder = if let Some(ca_certificate) = &config.ca_certificate {
59            let ca_cert_content = std::fs::read(ca_certificate)
60                .map_err(crate::error::Error::CouldNotReadCACertFile)?;
61            let ca_cert = reqwest::Certificate::from_pem(&ca_cert_content)
62                .map_err(crate::error::Error::CouldNotParsePEMCACertificate)?;
63            let client_builder = client_builder.add_root_certificate(ca_cert);
64            client_builder.tls_built_in_root_certs(false)
65        } else {
66            client_builder
67        };
68        let client = client_builder
69            .build()
70            .map_err(crate::error::Error::CouldNotBuildReqwestClientFromSuppliedInformation)?;
71        let url =
72            url::Url::parse(&config.url).map_err(crate::error::Error::CouldNotParseUrlInConfig)?;
73        let username = config.username.clone();
74        let password = config.password.clone();
75        Ok(Icinga2Async {
76            client,
77            url,
78            username,
79            password,
80        })
81    }
82
83    /// create a new Icinga2 instance from a TOML config file
84    ///
85    /// # Errors
86    /// this fails if the configuration file can not be found or parsed
87    /// or the CA certificate file mentioned in the configuration file
88    /// can not be found or parsed
89    pub fn from_config_file(path: &Path) -> Result<Self, crate::error::Error> {
90        let icinga_instance = Icinga2Instance::from_config_file(path)?;
91        Self::from_instance_config(&icinga_instance)
92    }
93
94    /// common code for the REST API calls
95    ///
96    /// # Errors
97    ///
98    /// this returns an error if encoding, the actual request, or decoding of the response fail
99    pub async fn rest<ApiEndpoint, Res>(
100        &self,
101        api_endpoint: ApiEndpoint,
102    ) -> Result<Res, crate::error::Error>
103    where
104        ApiEndpoint: RestApiEndpoint,
105        <ApiEndpoint as RestApiEndpoint>::RequestBody: Clone + Serialize + std::fmt::Debug,
106        Res: DeserializeOwned + std::fmt::Debug + RestApiResponse<ApiEndpoint>,
107    {
108        let method = api_endpoint.method()?;
109        let url = api_endpoint.url(&self.url)?;
110        let request_body: Option<std::borrow::Cow<<ApiEndpoint as RestApiEndpoint>::RequestBody>> =
111            api_endpoint.request_body()?;
112        let actual_method = if method == reqwest::Method::GET && request_body.is_some() {
113            reqwest::Method::POST
114        } else {
115            method.to_owned()
116        };
117        let mut req = self.client.request(actual_method, url.to_owned());
118        if method == reqwest::Method::GET && request_body.is_some() {
119            tracing::trace!("Sending GET request with body as POST via X-HTTP-Method-Override");
120            req = req.header(
121                "X-HTTP-Method-Override",
122                reqwest::header::HeaderValue::from_static("GET"),
123            );
124        }
125        req = req.basic_auth(&self.username, Some(&self.password));
126        if let Some(request_body) = request_body {
127            tracing::trace!("Request body:\n{:#?}", request_body);
128            req = req.json(&request_body);
129        }
130        let result = req.send().await;
131        if let Err(ref e) = result {
132            tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
133        }
134        let result = result?;
135        let status = result.status();
136        let response_body = result.bytes().await?;
137        match from_utf8(&response_body) {
138            Ok(response_body) => {
139                tracing::trace!("Response body:\n{}", &response_body);
140            }
141            Err(e) => {
142                tracing::trace!(
143                    "Response body that could not be parsed as utf8 because of {}:\n{:?}",
144                    &e,
145                    &response_body
146                );
147            }
148        }
149        if status.is_client_error() {
150            tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
151        } else if status.is_server_error() {
152            tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
153        }
154        if response_body.is_empty() {
155            Err(crate::error::Error::EmptyResponseBody(status))
156        } else {
157            let jd = &mut serde_json::Deserializer::from_slice(&response_body);
158            match serde_path_to_error::deserialize(jd) {
159                Ok(response_body) => {
160                    tracing::trace!("Parsed response body:\n{:#?}", response_body);
161                    Ok(response_body)
162                }
163                Err(e) => {
164                    let path = e.path();
165                    tracing::error!("Parsing failed at path {}: {}", path.to_string(), e.inner());
166                    if let Ok(response_body) = serde_json::from_slice(&response_body) {
167                        let mut response_body: serde_json::Value = response_body;
168                        for segment in path {
169                            match (response_body, segment) {
170                                (
171                                    serde_json::Value::Array(vs),
172                                    serde_path_to_error::Segment::Seq { index },
173                                ) => {
174                                    if let Some(v) = vs.get(*index) {
175                                        response_body = v.to_owned();
176                                    } else {
177                                        // if we can not find the element serde_path_to_error references fall back to just returning the error
178                                        return Err(e.into());
179                                    }
180                                }
181                                (
182                                    serde_json::Value::Object(m),
183                                    serde_path_to_error::Segment::Map { key },
184                                ) => {
185                                    if let Some(v) = m.get(key) {
186                                        response_body = v.to_owned();
187                                    } else {
188                                        // if we can not find the element serde_path_to_error references fall back to just returning the error
189                                        return Err(e.into());
190                                    }
191                                }
192                                _ => {
193                                    // if we can not find the element serde_path_to_error references fall back to just returning the error
194                                    return Err(e.into());
195                                }
196                            }
197                        }
198                        tracing::error!("Value in location path references is: {}", response_body);
199                    }
200                    Err(e.into())
201                }
202            }
203        }
204    }
205
206    /// Long-polling on an event stream
207    ///
208    /// # Errors
209    ///
210    /// this returns an error if encoding or the actual request fail
211    pub async fn event_stream(
212        &self,
213        types: &[IcingaEventStreamType],
214        queue: &str,
215        filter: Option<IcingaFilter>,
216    ) -> Result<impl Stream<Item = Result<IcingaEvent, std::io::Error>>, crate::error::Error> {
217        let method = reqwest::Method::POST;
218        let mut url = self
219            .url
220            .join("v1/events")
221            .map_err(crate::error::Error::CouldNotParseUrlFragment)?;
222        for t in types {
223            url.query_pairs_mut().append_pair("types", &t.to_string());
224        }
225        url.query_pairs_mut().append_pair("queue", queue);
226        let request_body = filter;
227        let mut req = self.client.request(method.to_owned(), url.to_owned());
228        req = req.basic_auth(&self.username, Some(&self.password));
229        if let Some(request_body) = request_body {
230            tracing::trace!("Request body:\n{:#?}", request_body);
231            req = req.json(&request_body);
232        }
233        let result = req.send().await;
234        if let Err(ref e) = result {
235            tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
236        }
237        let result = result?;
238        let status = result.status();
239        if status.is_client_error() {
240            tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
241        } else if status.is_server_error() {
242            tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
243        }
244        let byte_chunk_stream = result
245            .bytes_stream()
246            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
247        let stream_reader = StreamReader::new(byte_chunk_stream);
248        let line_reader = LinesStream::new(stream_reader.lines());
249        let event_reader = line_reader.map(|l| match l {
250            Ok(l) => {
251                tracing::trace!("Icinga2 received raw event:\n{}", &l);
252                let jd = &mut serde_json::Deserializer::from_str(&l);
253                match serde_path_to_error::deserialize(jd) {
254                    Ok(event) => {
255                        tracing::trace!("Icinga2 received event:\n{:#?}", &event);
256                        Ok(event)
257                    }
258                    Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
259                }
260            }
261            Err(e) => Err(e),
262        });
263        Ok(event_reader)
264    }
265}
266
267#[cfg(test)]
268mod test {
269    use super::*;
270    use std::error::Error;
271    use tracing_test::traced_test;
272
273    #[traced_test]
274    #[tokio::test]
275    async fn test_event_stream_async() -> Result<(), Box<dyn Error>> {
276        dotenvy::dotenv()?;
277        let icinga2 = Icinga2Async::from_config_file(std::path::Path::new(&std::env::var(
278            "ICINGA_TEST_INSTANCE_CONFIG",
279        )?))?;
280        let mut stream = icinga2
281            .event_stream(
282                &[
283                    IcingaEventStreamType::CheckResult,
284                    IcingaEventStreamType::StateChange,
285                    IcingaEventStreamType::Notification,
286                    IcingaEventStreamType::AcknowledgementSet,
287                    IcingaEventStreamType::AcknowledgementCleared,
288                    IcingaEventStreamType::CommentAdded,
289                    IcingaEventStreamType::CommentRemove,
290                    IcingaEventStreamType::DowntimeAdded,
291                    IcingaEventStreamType::DowntimeRemoved,
292                    IcingaEventStreamType::DowntimeStarted,
293                    IcingaEventStreamType::DowntimeTriggered,
294                    IcingaEventStreamType::ObjectCreated,
295                    IcingaEventStreamType::ObjectDeleted,
296                    IcingaEventStreamType::ObjectModified,
297                    IcingaEventStreamType::Flapping,
298                ],
299                "test",
300                None,
301            )
302            .await?;
303        for _ in 0..100 {
304            let event = stream.next().await;
305            tracing::trace!("Got event:\n{:#?}", event);
306            if let Some(event) = event {
307                assert!(event.is_ok());
308            }
309        }
310        Ok(())
311    }
312}