devops_armory/logging/alerts/
log_parser.rs1use std::collections::HashSet;
2use std::time::Duration;
3
4use awc::*;
5use futures::stream::select_all;
6use futures::{StreamExt, TryStreamExt};
7use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
8use tuple_conv::*;
9
10use super::models::{
11 Pod,
12 Log,
13 Notification
14};
15
16pub async fn gke_log_parser(
19 token: String,
20 project_name: String,
21 gke_cluster_endpoint: String,
22 gke_cluster_namespace: String,
23 gke_cluster_region: String,
24 gke_k8s_hostname: String,
25 gke_log_message: Vec<String>,
26 slack_webhook_url: String,
27 slack_channel: String,
28 slack_username: String,
29 slack_message_text: String,
30 slack_notified_users: Vec<String>,
31 slack_icon_emoji: String
32) -> Result<(), Box<dyn std::error::Error>> {
33
34 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
35 builder.set_verify(SslVerifyMode::NONE);
36
37 let myconnector = builder.build();
38
39 let client = Client::builder()
40 .connector(Connector::new().openssl(myconnector))
41 .finish();
42
43 let pod_list = client
44 .get(format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods"))
45 .bearer_auth(&token)
46 .timeout(Duration::from_secs(30))
47 .send()
48 .await
49 .expect("Failed to get pods in current namespace")
50 .json::<Pod>()
51 .await;
52
53 let mut streams = Vec::new();
54
55 let filtered_k8s_hostname = gke_k8s_hostname.as_str();
56
57 for pod in &pod_list {
58 for hostname in &pod.hostname {
59 if hostname.contains(filtered_k8s_hostname) {
60 let res = client
61 .get(&format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods/{hostname}/log?&tailLines=10&follow×tamps=true")) .bearer_auth(&token.clone())
63 .insert_header(("Content-Type", "application/json"))
64 .send()
65 .await
66 .expect("Fail to connect to stream")
67 .into_stream();
68
69 streams.push(res);
70 }
71 }
72 }
73
74 let mut combined_stream = select_all(streams);
75
76 loop {
77 match combined_stream.next().await {
78 Some(chunk) => match chunk {
79 Ok(chunk_bytes) => {
80 let mut host_name = "".to_string();
81
82 let mut unique_hash: HashSet<Notification> = HashSet::new();
83
84 for pod in &pod_list {
85 for hostname in &pod.hostname {
86 if hostname.contains(filtered_k8s_hostname) {
87 let gcp_id = project_name.as_str();
88 let gcp_region = gke_cluster_region.as_str();
89 let project_name = project_name.as_str();
90 host_name = hostname.to_owned();
91 let chunk_string =
92 std::str::from_utf8(&chunk_bytes).expect("Non-UTF8 bytes");
93 let iter = chunk_string.split_once(char::is_whitespace);
94
95 let vec = iter.unwrap_or_default().to_vec();
96
97 let log = Log {
98 time: vec.get(0).map(|x| x.to_string()).unwrap_or_default(),
99 message: vec.get(1).map(|x| x.to_string()).unwrap_or_default(),
100 host: host_name.to_string(),
101 google_project_id: gcp_id.to_string(),
102 region: gcp_region.to_string(),
103 project_id: project_name.to_string(),
104 };
105
106 let log_message = log.message;
107
108 let msg_pattern = gke_log_message.iter().any(|s| log_message.contains(s));
109
110 match msg_pattern {
111 true => {
112 let not = Notification {
113 channel: slack_channel.clone(),
114 username: slack_username.clone(),
115 text: format!("{slack_message_text} {:?} {log_message}", slack_notified_users),
116 icon_emoji: slack_icon_emoji.to_string(),
117 };
118
119 unique_hash.insert(not.clone());
120
121 },
122 false => {
123 }
127 }
128
129 }
130
131 }
132
133 }
134
135 match !unique_hash.is_empty() {
136 true => {
137 let current_hash: Vec<Notification> = unique_hash.into_iter().collect();
138 let x = ¤t_hash[0];
139
140 let mut builder2 = SslConnector::builder(SslMethod::tls()).unwrap();
141 builder2.set_verify(SslVerifyMode::NONE);
142
143 let myconnector2 = builder2.build();
144
145 let client_not = Client::builder()
146 .connector(Connector::new().openssl(myconnector2))
147 .finish();
148
149 let slack_notification_request = client_not
150 .post(format!("{slack_webhook_url}"))
151 .timeout(Duration::from_secs(120))
152 .send_json(&x)
153 .await
154 .expect("Failed send request");
155 let req_status = slack_notification_request.status();
156 match req_status.as_u16() {
157 200 => {
158 println!("Request status OK")
159 },
160 _ => {
161 eprintln!("Request status unknown: {}", req_status)
162 }
163 };
164 },
165 false => {
166 }
171 }
172 }
173 Err(err) => {
174 eprintln!("Failed to read stream chunk: {}", err)
175 }
176 },
177 None => {
178 return Err("Reached end of stream".into());
179 }
180 }
181 }
182}
183
184pub async fn gke_log_parser_loop(
186 token: String,
187 project_name: String,
188 gke_cluster_endpoint: String,
189 gke_cluster_namespace: String,
190 gke_cluster_region: String,
191 gke_k8s_hostname: String,
192 gke_log_message: Vec<String>,
193 slack_webhook_url: String,
194 slack_channel: String,
195 slack_username: String,
196 slack_message_text: String,
197 slack_notified_users: Vec<String>,
198 slack_icon_emoji: String
199 ) {
200
201 loop {
202 match gke_log_parser(
203 token.clone(),
204 project_name.clone(),
205 gke_cluster_endpoint.clone(),
206 gke_cluster_namespace.clone(),
207 gke_cluster_region.clone(),
208 gke_k8s_hostname.clone(),
209 gke_log_message.clone(),
210 slack_webhook_url.clone(),
211 slack_channel.clone(),
212 slack_username.clone(),
213 slack_message_text.clone(),
214 slack_notified_users.clone(),
215 slack_icon_emoji.clone(),
216 ).await {
217 Ok(()) => {
218 println!("Stream working as expected. Proceeding...");
219 continue;
220 }
221 Err(e) => {
222 eprintln!("Unexpected end of stream - {e}. Retrying...");
223 continue;
224 }
225 }
226 }
227}
228