devops_armory/logging/alerts/
log_parser.rs1use std::collections::HashSet;
2use std::thread::sleep;
3use std::time::{self, Duration};
4
5use awc::*;
6use futures::stream::select_all;
7use futures::{StreamExt, TryStreamExt};
8use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
9use tuple_conv::*;
10
11use super::models::{
12 Pod,
13 Log,
14 Notification
15};
16
17pub async fn gke_log_parser(
20 token: String,
21 project_name: String,
22 gke_cluster_endpoint: String,
23 gke_cluster_namespace: String,
24 gke_cluster_region: String,
25 gke_k8s_hostname: String,
26 gke_log_message: Vec<String>,
27 slack_webhook_url: String,
28 slack_channel: String,
29 slack_username: String,
30 slack_message_text: String,
31 slack_notified_users: Vec<String>,
32 slack_icon_emoji: String
33) -> Result<(), Box<dyn std::error::Error>> {
34
35 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
36 builder.set_verify(SslVerifyMode::NONE);
37
38 let myconnector = builder.build();
39
40 let client = Client::builder()
41 .connector(Connector::new().openssl(myconnector))
42 .finish();
43
44 let pod_list = client
45 .get(format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods"))
46 .bearer_auth(&token)
47 .timeout(Duration::from_secs(30))
48 .send()
49 .await
50 .expect("Failed to get pods in current namespace")
51 .json::<Pod>()
52 .await;
53
54 let mut streams = Vec::new();
55
56 let filtered_k8s_hostname = gke_k8s_hostname.as_str();
57
58 for pod in &pod_list {
59 for hostname in &pod.hostname {
60 if hostname.contains(filtered_k8s_hostname) {
61 let res = client
62 .get(&format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods/{hostname}/log?&tailLines=10&follow×tamps=true")) .bearer_auth(&token.clone())
64 .insert_header(("Content-Type", "application/json"))
65 .send()
66 .await
67 .expect("Fail to connect to stream")
68 .into_stream();
69
70 streams.push(res);
71 }
72 }
73 }
74
75 let mut combined_stream = select_all(streams);
76
77 loop {
78 match combined_stream.next().await {
79 Some(chunk) => match chunk {
80 Ok(chunk_bytes) => {
81 let mut host_name = "".to_string();
82
83 let mut unique_hash: HashSet<Notification> = HashSet::new();
84
85 for pod in &pod_list {
86 for hostname in &pod.hostname {
87 if hostname.contains(filtered_k8s_hostname) {
88 let gcp_id = project_name.as_str();
89 let gcp_region = gke_cluster_region.as_str();
90 let project_name = project_name.as_str();
91 host_name = hostname.to_owned();
92 let chunk_string =
93 std::str::from_utf8(&chunk_bytes).expect("Non-UTF8 bytes");
94 let iter = chunk_string.split_once(char::is_whitespace);
95
96 let vec = iter.unwrap_or_default().to_vec();
97
98 let log = Log {
99 time: vec.get(0).map(|x| x.to_string()).unwrap_or_default(),
100 message: vec.get(1).map(|x| x.to_string()).unwrap_or_default(),
101 host: host_name.to_string(),
102 google_project_id: gcp_id.to_string(),
103 region: gcp_region.to_string(),
104 project_id: project_name.to_string(),
105 };
106
107 let log_message = log.message;
108
109 let msg_pattern = gke_log_message.iter().any(|s| log_message.contains(s));
110
111 match msg_pattern {
112 true => {
113 let not = Notification {
114 channel: slack_channel.clone(),
115 username: slack_username.clone(),
116 text: format!("{slack_message_text} {:?} {log_message}", slack_notified_users),
117 icon_emoji: slack_icon_emoji.to_string(),
118 };
119
120 unique_hash.insert(not.clone());
121
122 },
123 false => {
124 }
128 }
129
130 }
131
132 }
133
134 }
135
136 match !unique_hash.is_empty() {
137 true => {
138 let current_hash: Vec<Notification> = unique_hash.into_iter().collect();
139 let x = ¤t_hash[0];
140
141 let mut builder2 = SslConnector::builder(SslMethod::tls()).unwrap();
142 builder2.set_verify(SslVerifyMode::NONE);
143
144 let myconnector2 = builder2.build();
145
146 let client_not = Client::builder()
147 .connector(Connector::new().openssl(myconnector2))
148 .finish();
149
150 let slack_notification_request = client_not
151 .post(format!("{slack_webhook_url}"))
152 .timeout(Duration::from_secs(120))
153 .send_json(&x)
154 .await
155 .expect("Failed send request");
156 let req_status = slack_notification_request.status();
157 match req_status.as_u16() {
158 200 => {
159 println!("Request status OK")
160 },
161 _ => {
162 eprintln!("Request status unknown: {}", req_status)
163 }
164 };
165 },
166 false => {
167 }
172 }
173 }
174 Err(err) => {
175 eprintln!("Failed to read stream chunk: {}", err)
176 }
177 },
178 None => {
179 break Err("Reached end of stream".into());
180 }
181 }
182 }
183}
184
185pub async fn gke_log_parser_loop(
187 token: String,
188 project_name: String,
189 gke_cluster_endpoint: String,
190 gke_cluster_namespace: String,
191 gke_cluster_region: String,
192 gke_k8s_hostname: String,
193 gke_log_message: Vec<String>,
194 slack_webhook_url: String,
195 slack_channel: String,
196 slack_username: String,
197 slack_message_text: String,
198 slack_notified_users: Vec<String>,
199 slack_icon_emoji: String
200 ) {
201 let dur = time::Duration::from_secs(60);
202 loop {
203 match gke_log_parser(
204 token.clone(),
205 project_name.clone(),
206 gke_cluster_endpoint.clone(),
207 gke_cluster_namespace.clone(),
208 gke_cluster_region.clone(),
209 gke_k8s_hostname.clone(),
210 gke_log_message.clone(),
211 slack_webhook_url.clone(),
212 slack_channel.clone(),
213 slack_username.clone(),
214 slack_message_text.clone(),
215 slack_notified_users.clone(),
216 slack_icon_emoji.clone(),
217 ).await {
218 Ok(()) => {
219 println!("Stream working as expected. Proceeding...");
220 }
221 Err(e) => {
222 eprintln!("Unexpected end of stream - {e}. Retrying...");
223 sleep(dur);
224 break;
225 }
226 }
227 }
228}
229