1use std::{path::Path, str::from_utf8};
4
5use futures::stream::TryStreamExt;
6use futures::Stream;
7use futures::StreamExt;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::io::AsyncBufReadExt;
10use tokio_stream::wrappers::LinesStream;
11use tokio_util::io::StreamReader;
12
13use crate::config::Icinga2Instance;
14use crate::types::{
15 enums::event_stream_type::IcingaEventStreamType,
16 event_stream::IcingaEvent,
17 filter::IcingaFilter,
18 rest::{RestApiEndpoint, RestApiResponse},
19};
20
21#[derive(Debug, Clone)]
23pub struct Icinga2Async {
24 client: reqwest::Client,
26 pub url: url::Url,
28 pub username: String,
30 password: String,
32}
33
34impl Icinga2Async {
35 pub fn from_instance_config(config: &Icinga2Instance) -> Result<Self, crate::error::Error> {
42 let client_builder = reqwest::ClientBuilder::new();
43 let client_builder = client_builder.user_agent(concat!(
44 env!("CARGO_PKG_NAME"),
45 "/",
46 env!("CARGO_PKG_VERSION")
47 ));
48 let mut headers = reqwest::header::HeaderMap::new();
49 headers.insert(
50 "Content-Type",
51 reqwest::header::HeaderValue::from_static("application/json"),
52 );
53 headers.insert(
54 "Accept",
55 reqwest::header::HeaderValue::from_static("application/json"),
56 );
57 let client_builder = client_builder.default_headers(headers);
58 let client_builder = if let Some(ca_certificate) = &config.ca_certificate {
59 let ca_cert_content = std::fs::read(ca_certificate)
60 .map_err(crate::error::Error::CouldNotReadCACertFile)?;
61 let ca_cert = reqwest::Certificate::from_pem(&ca_cert_content)
62 .map_err(crate::error::Error::CouldNotParsePEMCACertificate)?;
63 let client_builder = client_builder.add_root_certificate(ca_cert);
64 client_builder.tls_built_in_root_certs(false)
65 } else {
66 client_builder
67 };
68 let client = client_builder
69 .build()
70 .map_err(crate::error::Error::CouldNotBuildReqwestClientFromSuppliedInformation)?;
71 let url =
72 url::Url::parse(&config.url).map_err(crate::error::Error::CouldNotParseUrlInConfig)?;
73 let username = config.username.clone();
74 let password = config.password.clone();
75 Ok(Icinga2Async {
76 client,
77 url,
78 username,
79 password,
80 })
81 }
82
83 pub fn from_config_file(path: &Path) -> Result<Self, crate::error::Error> {
90 let icinga_instance = Icinga2Instance::from_config_file(path)?;
91 Self::from_instance_config(&icinga_instance)
92 }
93
94 pub async fn rest<ApiEndpoint, Res>(
100 &self,
101 api_endpoint: ApiEndpoint,
102 ) -> Result<Res, crate::error::Error>
103 where
104 ApiEndpoint: RestApiEndpoint,
105 <ApiEndpoint as RestApiEndpoint>::RequestBody: Clone + Serialize + std::fmt::Debug,
106 Res: DeserializeOwned + std::fmt::Debug + RestApiResponse<ApiEndpoint>,
107 {
108 let method = api_endpoint.method()?;
109 let url = api_endpoint.url(&self.url)?;
110 let request_body: Option<std::borrow::Cow<<ApiEndpoint as RestApiEndpoint>::RequestBody>> =
111 api_endpoint.request_body()?;
112 let actual_method = if method == reqwest::Method::GET && request_body.is_some() {
113 reqwest::Method::POST
114 } else {
115 method.to_owned()
116 };
117 let mut req = self.client.request(actual_method, url.to_owned());
118 if method == reqwest::Method::GET && request_body.is_some() {
119 tracing::trace!("Sending GET request with body as POST via X-HTTP-Method-Override");
120 req = req.header(
121 "X-HTTP-Method-Override",
122 reqwest::header::HeaderValue::from_static("GET"),
123 );
124 }
125 req = req.basic_auth(&self.username, Some(&self.password));
126 if let Some(request_body) = request_body {
127 tracing::trace!("Request body:\n{:#?}", request_body);
128 req = req.json(&request_body);
129 }
130 let result = req.send().await;
131 if let Err(ref e) = result {
132 tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
133 }
134 let result = result?;
135 let status = result.status();
136 let response_body = result.bytes().await?;
137 match from_utf8(&response_body) {
138 Ok(response_body) => {
139 tracing::trace!("Response body:\n{}", &response_body);
140 }
141 Err(e) => {
142 tracing::trace!(
143 "Response body that could not be parsed as utf8 because of {}:\n{:?}",
144 &e,
145 &response_body
146 );
147 }
148 }
149 if status.is_client_error() {
150 tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
151 } else if status.is_server_error() {
152 tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
153 }
154 if response_body.is_empty() {
155 Err(crate::error::Error::EmptyResponseBody(status))
156 } else {
157 let jd = &mut serde_json::Deserializer::from_slice(&response_body);
158 match serde_path_to_error::deserialize(jd) {
159 Ok(response_body) => {
160 tracing::trace!("Parsed response body:\n{:#?}", response_body);
161 Ok(response_body)
162 }
163 Err(e) => {
164 let path = e.path();
165 tracing::error!("Parsing failed at path {}: {}", path.to_string(), e.inner());
166 if let Ok(response_body) = serde_json::from_slice(&response_body) {
167 let mut response_body: serde_json::Value = response_body;
168 for segment in path {
169 match (response_body, segment) {
170 (
171 serde_json::Value::Array(vs),
172 serde_path_to_error::Segment::Seq { index },
173 ) => {
174 if let Some(v) = vs.get(*index) {
175 response_body = v.to_owned();
176 } else {
177 return Err(e.into());
179 }
180 }
181 (
182 serde_json::Value::Object(m),
183 serde_path_to_error::Segment::Map { key },
184 ) => {
185 if let Some(v) = m.get(key) {
186 response_body = v.to_owned();
187 } else {
188 return Err(e.into());
190 }
191 }
192 _ => {
193 return Err(e.into());
195 }
196 }
197 }
198 tracing::error!("Value in location path references is: {}", response_body);
199 }
200 Err(e.into())
201 }
202 }
203 }
204 }
205
206 pub async fn event_stream(
212 &self,
213 types: &[IcingaEventStreamType],
214 queue: &str,
215 filter: Option<IcingaFilter>,
216 ) -> Result<impl Stream<Item = Result<IcingaEvent, std::io::Error>>, crate::error::Error> {
217 let method = reqwest::Method::POST;
218 let mut url = self
219 .url
220 .join("v1/events")
221 .map_err(crate::error::Error::CouldNotParseUrlFragment)?;
222 for t in types {
223 url.query_pairs_mut().append_pair("types", &t.to_string());
224 }
225 url.query_pairs_mut().append_pair("queue", queue);
226 let request_body = filter;
227 let mut req = self.client.request(method.to_owned(), url.to_owned());
228 req = req.basic_auth(&self.username, Some(&self.password));
229 if let Some(request_body) = request_body {
230 tracing::trace!("Request body:\n{:#?}", request_body);
231 req = req.json(&request_body);
232 }
233 let result = req.send().await;
234 if let Err(ref e) = result {
235 tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
236 }
237 let result = result?;
238 let status = result.status();
239 if status.is_client_error() {
240 tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
241 } else if status.is_server_error() {
242 tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
243 }
244 let byte_chunk_stream = result
245 .bytes_stream()
246 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
247 let stream_reader = StreamReader::new(byte_chunk_stream);
248 let line_reader = LinesStream::new(stream_reader.lines());
249 let event_reader = line_reader.map(|l| match l {
250 Ok(l) => {
251 tracing::trace!("Icinga2 received raw event:\n{}", &l);
252 let jd = &mut serde_json::Deserializer::from_str(&l);
253 match serde_path_to_error::deserialize(jd) {
254 Ok(event) => {
255 tracing::trace!("Icinga2 received event:\n{:#?}", &event);
256 Ok(event)
257 }
258 Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
259 }
260 }
261 Err(e) => Err(e),
262 });
263 Ok(event_reader)
264 }
265}
266
267#[cfg(test)]
268mod test {
269 use super::*;
270 use std::error::Error;
271 use tracing_test::traced_test;
272
273 #[traced_test]
274 #[tokio::test]
275 async fn test_event_stream_async() -> Result<(), Box<dyn Error>> {
276 dotenvy::dotenv()?;
277 let icinga2 = Icinga2Async::from_config_file(std::path::Path::new(&std::env::var(
278 "ICINGA_TEST_INSTANCE_CONFIG",
279 )?))?;
280 let mut stream = icinga2
281 .event_stream(
282 &[
283 IcingaEventStreamType::CheckResult,
284 IcingaEventStreamType::StateChange,
285 IcingaEventStreamType::Notification,
286 IcingaEventStreamType::AcknowledgementSet,
287 IcingaEventStreamType::AcknowledgementCleared,
288 IcingaEventStreamType::CommentAdded,
289 IcingaEventStreamType::CommentRemove,
290 IcingaEventStreamType::DowntimeAdded,
291 IcingaEventStreamType::DowntimeRemoved,
292 IcingaEventStreamType::DowntimeStarted,
293 IcingaEventStreamType::DowntimeTriggered,
294 IcingaEventStreamType::ObjectCreated,
295 IcingaEventStreamType::ObjectDeleted,
296 IcingaEventStreamType::ObjectModified,
297 IcingaEventStreamType::Flapping,
298 ],
299 "test",
300 None,
301 )
302 .await?;
303 for _ in 0..100 {
304 let event = stream.next().await;
305 tracing::trace!("Got event:\n{:#?}", event);
306 if let Some(event) = event {
307 assert!(event.is_ok());
308 }
309 }
310 Ok(())
311 }
312}