1use std::{path::Path, str::from_utf8};
4
5use futures::stream::TryStreamExt;
6use futures::Stream;
7use futures::StreamExt;
8use serde::{de::DeserializeOwned, Serialize};
9use tokio::io::AsyncBufReadExt;
10use tokio_stream::wrappers::LinesStream;
11use tokio_util::io::StreamReader;
12
13use crate::config::Icinga2Instance;
14use crate::types::{
15 enums::event_stream_type::IcingaEventStreamType,
16 event_stream::IcingaEvent,
17 filter::IcingaFilter,
18 rest::{RestApiEndpoint, RestApiResponse},
19};
20
21#[derive(Debug, Clone)]
23pub struct Icinga2Async {
24 client: reqwest::Client,
26 pub url: url::Url,
28 pub username: String,
30 password: String,
32}
33
34impl Icinga2Async {
35 pub fn from_instance_config(config: &Icinga2Instance) -> Result<Self, crate::error::Error> {
42 let client_builder = reqwest::ClientBuilder::new();
43 let client_builder = client_builder.user_agent(concat!(
44 env!("CARGO_PKG_NAME"),
45 "/",
46 env!("CARGO_PKG_VERSION")
47 ));
48 let mut headers = reqwest::header::HeaderMap::new();
49 headers.insert(
50 "Content-Type",
51 reqwest::header::HeaderValue::from_static("application/json"),
52 );
53 headers.insert(
54 "Accept",
55 reqwest::header::HeaderValue::from_static("application/json"),
56 );
57 let client_builder = client_builder.default_headers(headers);
58 let client_builder = if let Some(ca_certificate) = &config.ca_certificate {
59 let ca_cert_content = std::fs::read(ca_certificate)
60 .map_err(crate::error::Error::CouldNotReadCACertFile)?;
61 let ca_cert = reqwest::Certificate::from_pem(&ca_cert_content)
62 .map_err(crate::error::Error::CouldNotParsePEMCACertificate)?;
63 client_builder.tls_certs_only([ca_cert])
64 } else {
65 client_builder
66 };
67 let client = client_builder
68 .build()
69 .map_err(crate::error::Error::CouldNotBuildReqwestClientFromSuppliedInformation)?;
70 let url =
71 url::Url::parse(&config.url).map_err(crate::error::Error::CouldNotParseUrlInConfig)?;
72 let username = config.username.clone();
73 let password = config.password.clone();
74 Ok(Icinga2Async {
75 client,
76 url,
77 username,
78 password,
79 })
80 }
81
82 pub fn from_config_file(path: &Path) -> Result<Self, crate::error::Error> {
89 let icinga_instance = Icinga2Instance::from_config_file(path)?;
90 Self::from_instance_config(&icinga_instance)
91 }
92
93 pub async fn rest<ApiEndpoint, Res>(
99 &self,
100 api_endpoint: ApiEndpoint,
101 ) -> Result<Res, crate::error::Error>
102 where
103 ApiEndpoint: RestApiEndpoint,
104 <ApiEndpoint as RestApiEndpoint>::RequestBody: Clone + Serialize + std::fmt::Debug,
105 Res: DeserializeOwned + std::fmt::Debug + RestApiResponse<ApiEndpoint>,
106 {
107 let method = api_endpoint.method()?;
108 let url = api_endpoint.url(&self.url)?;
109 let request_body: Option<std::borrow::Cow<<ApiEndpoint as RestApiEndpoint>::RequestBody>> =
110 api_endpoint.request_body()?;
111 let actual_method = if method == reqwest::Method::GET && request_body.is_some() {
112 reqwest::Method::POST
113 } else {
114 method.to_owned()
115 };
116 let mut req = self.client.request(actual_method, url.to_owned());
117 if method == reqwest::Method::GET && request_body.is_some() {
118 tracing::trace!("Sending GET request with body as POST via X-HTTP-Method-Override");
119 req = req.header(
120 "X-HTTP-Method-Override",
121 reqwest::header::HeaderValue::from_static("GET"),
122 );
123 }
124 req = req.basic_auth(&self.username, Some(&self.password));
125 if let Some(request_body) = request_body {
126 tracing::trace!("Request body:\n{:#?}", request_body);
127 req = req.json(&request_body);
128 }
129 let result = req.send().await;
130 if let Err(ref e) = result {
131 tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
132 }
133 let result = result?;
134 let status = result.status();
135 let response_body = result.bytes().await?;
136 match from_utf8(&response_body) {
137 Ok(response_body) => {
138 tracing::trace!("Response body:\n{}", &response_body);
139 }
140 Err(e) => {
141 tracing::trace!(
142 "Response body that could not be parsed as utf8 because of {}:\n{:?}",
143 &e,
144 &response_body
145 );
146 }
147 }
148 if status.is_client_error() {
149 tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
150 } else if status.is_server_error() {
151 tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
152 }
153 if response_body.is_empty() {
154 Err(crate::error::Error::EmptyResponseBody(status))
155 } else {
156 let jd = &mut serde_json::Deserializer::from_slice(&response_body);
157 match serde_path_to_error::deserialize(jd) {
158 Ok(response_body) => {
159 tracing::trace!("Parsed response body:\n{:#?}", response_body);
160 Ok(response_body)
161 }
162 Err(e) => {
163 let path = e.path();
164 tracing::error!("Parsing failed at path {}: {}", path.to_string(), e.inner());
165 if let Ok(response_body) = serde_json::from_slice(&response_body) {
166 let mut response_body: serde_json::Value = response_body;
167 for segment in path {
168 match (response_body, segment) {
169 (
170 serde_json::Value::Array(vs),
171 serde_path_to_error::Segment::Seq { index },
172 ) => {
173 if let Some(v) = vs.get(*index) {
174 response_body = v.to_owned();
175 } else {
176 return Err(e.into());
178 }
179 }
180 (
181 serde_json::Value::Object(m),
182 serde_path_to_error::Segment::Map { key },
183 ) => {
184 if let Some(v) = m.get(key) {
185 response_body = v.to_owned();
186 } else {
187 return Err(e.into());
189 }
190 }
191 _ => {
192 return Err(e.into());
194 }
195 }
196 }
197 tracing::error!("Value in location path references is: {}", response_body);
198 }
199 Err(e.into())
200 }
201 }
202 }
203 }
204
205 pub async fn event_stream(
211 &self,
212 types: &[IcingaEventStreamType],
213 queue: &str,
214 filter: Option<IcingaFilter>,
215 ) -> Result<impl Stream<Item = Result<IcingaEvent, std::io::Error>>, crate::error::Error> {
216 let method = reqwest::Method::POST;
217 let mut url = self
218 .url
219 .join("v1/events")
220 .map_err(crate::error::Error::CouldNotParseUrlFragment)?;
221 for t in types {
222 url.query_pairs_mut().append_pair("types", &t.to_string());
223 }
224 url.query_pairs_mut().append_pair("queue", queue);
225 let request_body = filter;
226 let mut req = self.client.request(method.to_owned(), url.to_owned());
227 req = req.basic_auth(&self.username, Some(&self.password));
228 if let Some(request_body) = request_body {
229 tracing::trace!("Request body:\n{:#?}", request_body);
230 req = req.json(&request_body);
231 }
232 let result = req.send().await;
233 if let Err(ref e) = result {
234 tracing::error!(%url, %method, "Icinga2 send error: {:?}", e);
235 }
236 let result = result?;
237 let status = result.status();
238 if status.is_client_error() {
239 tracing::error!(%url, %method, "Icinga2 status error (client error): {:?}", status);
240 } else if status.is_server_error() {
241 tracing::error!(%url, %method, "Icinga2 status error (server error): {:?}", status);
242 }
243 let byte_chunk_stream = result.bytes_stream().map_err(std::io::Error::other);
244 let stream_reader = StreamReader::new(byte_chunk_stream);
245 let line_reader = LinesStream::new(stream_reader.lines());
246 let event_reader = line_reader.map(|l| match l {
247 Ok(l) => {
248 tracing::trace!("Icinga2 received raw event:\n{}", &l);
249 let jd = &mut serde_json::Deserializer::from_str(&l);
250 match serde_path_to_error::deserialize(jd) {
251 Ok(event) => {
252 tracing::trace!("Icinga2 received event:\n{:#?}", &event);
253 Ok(event)
254 }
255 Err(e) => Err(std::io::Error::other(e)),
256 }
257 }
258 Err(e) => Err(e),
259 });
260 Ok(event_reader)
261 }
262}
263
264#[cfg(test)]
265mod test {
266 use super::*;
267 use std::error::Error;
268 use tracing_test::traced_test;
269
270 #[traced_test]
271 #[tokio::test]
272 async fn test_event_stream_async() -> Result<(), Box<dyn Error>> {
273 dotenvy::dotenv()?;
274 let icinga2 = Icinga2Async::from_config_file(std::path::Path::new(&std::env::var(
275 "ICINGA_TEST_INSTANCE_CONFIG",
276 )?))?;
277 let mut stream = icinga2
278 .event_stream(
279 &[
280 IcingaEventStreamType::CheckResult,
281 IcingaEventStreamType::StateChange,
282 IcingaEventStreamType::Notification,
283 IcingaEventStreamType::AcknowledgementSet,
284 IcingaEventStreamType::AcknowledgementCleared,
285 IcingaEventStreamType::CommentAdded,
286 IcingaEventStreamType::CommentRemove,
287 IcingaEventStreamType::DowntimeAdded,
288 IcingaEventStreamType::DowntimeRemoved,
289 IcingaEventStreamType::DowntimeStarted,
290 IcingaEventStreamType::DowntimeTriggered,
291 IcingaEventStreamType::ObjectCreated,
292 IcingaEventStreamType::ObjectDeleted,
293 IcingaEventStreamType::ObjectModified,
294 IcingaEventStreamType::Flapping,
295 ],
296 "test",
297 None,
298 )
299 .await?;
300 for _ in 0..100 {
301 let event = stream.next().await;
302 tracing::trace!("Got event:\n{:#?}", event);
303 if let Some(event) = event {
304 assert!(event.is_ok());
305 }
306 }
307 Ok(())
308 }
309}