Skip to main content

devops_armory/logging/alerts/
log_parser.rs

1use std::collections::HashSet;
2use std::thread::sleep;
3use std::time::{self, Duration};
4
5use awc::*;
6use futures::stream::select_all;
7use futures::{StreamExt, TryStreamExt};
8use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
9use tuple_conv::*;
10
11use super::models::{
12    Pod,
13    Log,
14    Notification
15};
16
17/// Function which parse logs from k8s and send notifications to Slack channel on every match
18/// Token, project, k8s data and slack data need to be provided
19pub async fn gke_log_parser(
20    token: String,
21    project_name: String,
22    gke_cluster_endpoint: String,
23    gke_cluster_namespace: String,
24    gke_cluster_region: String,
25    gke_k8s_hostname: String,
26    gke_log_message: Vec<String>,
27    slack_webhook_url: String,
28    slack_channel: String,
29    slack_username: String,
30    slack_message_text: String,
31    slack_notified_users: Vec<String>,
32    slack_icon_emoji: String
33) -> Result<(), Box<dyn std::error::Error>> {
34
35    let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
36    builder.set_verify(SslVerifyMode::NONE);
37
38    let myconnector = builder.build();
39
40    let client = Client::builder()
41        .connector(Connector::new().openssl(myconnector))
42        .finish();
43
44    let pod_list = client
45        .get(format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods"))
46        .bearer_auth(&token)
47        .timeout(Duration::from_secs(30))
48        .send()
49        .await
50        .expect("Failed to get pods in current namespace")
51        .json::<Pod>()
52        .await;
53
54    let mut streams = Vec::new();
55
56    let filtered_k8s_hostname = gke_k8s_hostname.as_str();
57
58    for pod in &pod_list {
59        for hostname in &pod.hostname {
60            if hostname.contains(filtered_k8s_hostname) {
61                let res = client
62                    .get(&format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods/{hostname}/log?&tailLines=10&follow&timestamps=true"))   // <- Create request builder
63                    .bearer_auth(&token.clone())
64                    .insert_header(("Content-Type", "application/json"))
65                    .send()
66                    .await
67                    .expect("Fail to connect to stream")
68                    .into_stream();
69
70                streams.push(res);
71            }
72        }
73    }
74
75    let mut combined_stream = select_all(streams);
76
77    loop {
78        match combined_stream.next().await {
79            Some(chunk) => match chunk {
80                Ok(chunk_bytes) => {
81                    let mut host_name = "".to_string();
82
83                    let mut unique_hash: HashSet<Notification> = HashSet::new();
84
85                    for pod in &pod_list {
86                        for hostname in &pod.hostname {
87                            if hostname.contains(filtered_k8s_hostname) {
88                                let gcp_id = project_name.as_str();
89                                let gcp_region = gke_cluster_region.as_str();
90                                let project_name = project_name.as_str();
91                                host_name = hostname.to_owned();
92                                let chunk_string =
93                                    std::str::from_utf8(&chunk_bytes).expect("Non-UTF8 bytes");
94                                let iter = chunk_string.split_once(char::is_whitespace);
95
96                                let vec = iter.unwrap_or_default().to_vec();
97
98                                let log = Log {
99                                    time: vec.get(0).map(|x| x.to_string()).unwrap_or_default(),
100                                    message: vec.get(1).map(|x| x.to_string()).unwrap_or_default(),
101                                    host: host_name.to_string(),
102                                    google_project_id: gcp_id.to_string(),
103                                    region: gcp_region.to_string(),
104                                    project_id: project_name.to_string(),
105                                };
106
107                                let log_message = log.message;
108
109                                let msg_pattern = gke_log_message.iter().any(|s| log_message.contains(s));
110
111                                match msg_pattern {
112                                    true => {
113                                        let not = Notification {
114                                            channel: slack_channel.clone(),
115                                            username: slack_username.clone(),
116                                            text: format!("{slack_message_text} {:?} {log_message}", slack_notified_users),
117                                            icon_emoji: slack_icon_emoji.to_string(),
118                                        };
119
120                                        unique_hash.insert(not.clone());
121
122                                    },
123                                    false => {
124                                        // If there is no match on message, action below. 
125                                        // At the momment, there is no point of putting missed match on STDOUT
126                                        //println!("No message pattern found: {:?}", gke_log_message);
127                                    }
128                                }
129
130                            }
131
132                        }
133
134                    }
135
136                    match !unique_hash.is_empty() {
137                        true => {
138                            let current_hash: Vec<Notification> = unique_hash.into_iter().collect();
139                            let x = &current_hash[0];
140
141                            let mut builder2 = SslConnector::builder(SslMethod::tls()).unwrap();
142                            builder2.set_verify(SslVerifyMode::NONE);
143                        
144                            let myconnector2 = builder2.build();
145                        
146                            let client_not = Client::builder()
147                                .connector(Connector::new().openssl(myconnector2))
148                                .finish();
149                        
150                            let slack_notification_request = client_not
151                                .post(format!("{slack_webhook_url}"))
152                                .timeout(Duration::from_secs(120))
153                                .send_json(&x)
154                                .await
155                                .expect("Failed send request");
156                            let req_status = slack_notification_request.status();
157                            match req_status.as_u16() {
158                                200 => {
159                                    println!("Request status OK")
160                                },
161                                _ => {
162                                    eprintln!("Request status unknown: {}", req_status)
163                                }
164                            };
165                        },
166                        false => {
167                            // If there is empty hash - no match on message, then print below
168                            // In high traffic environments, there's no need to print below.
169                            // Can be improved in further development
170                            //println!("Unique hash is empty. Nothing to alert");
171                        }
172                    }
173                }
174                Err(err) => {
175                    eprintln!("Failed to read stream chunk: {}", err)
176                }
177            },
178            None => {
179                break Err("Reached end of stream".into());
180            }
181        }
182    }
183}
184
185/// Log parser loop with error handling and reconnecting
186pub async fn gke_log_parser_loop(
187    token: String,
188    project_name: String,
189    gke_cluster_endpoint: String,
190    gke_cluster_namespace: String,
191    gke_cluster_region: String,
192    gke_k8s_hostname: String,
193    gke_log_message: Vec<String>,
194    slack_webhook_url: String,
195    slack_channel: String,
196    slack_username: String,
197    slack_message_text: String,
198    slack_notified_users: Vec<String>,
199    slack_icon_emoji: String
200    ) {
201    let dur = time::Duration::from_secs(60);
202    loop {
203        match gke_log_parser(
204            token.clone(),
205            project_name.clone(),
206            gke_cluster_endpoint.clone(),
207            gke_cluster_namespace.clone(),
208            gke_cluster_region.clone(),
209            gke_k8s_hostname.clone(),
210            gke_log_message.clone(),
211            slack_webhook_url.clone(),
212            slack_channel.clone(),
213            slack_username.clone(),
214            slack_message_text.clone(),
215            slack_notified_users.clone(),
216            slack_icon_emoji.clone(),
217            ).await {
218                Ok(()) => {
219                    println!("Stream working as expected. Proceeding...");
220                }
221                Err(e) => {
222                    eprintln!("Unexpected end of stream - {e}. Retrying...");
223                    sleep(dur);
224                    break;
225                }
226            }
227    }
228}
229