Skip to main content

devops_armory/logging/alerts/
log_parser.rs

1use std::collections::HashSet;
2use std::time::Duration;
3
4use awc::*;
5use futures::stream::select_all;
6use futures::{StreamExt, TryStreamExt};
7use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
8use tuple_conv::*;
9
10use super::models::{
11    Pod,
12    Log,
13    Notification
14};
15
16/// Function which parse logs from k8s and send notifications to Slack channel on every match
17/// Token, project, k8s data and slack data need to be provided
18pub async fn gke_log_parser(
19    token: String,
20    project_name: String,
21    gke_cluster_endpoint: String,
22    gke_cluster_namespace: String,
23    gke_cluster_region: String,
24    gke_k8s_hostname: String,
25    gke_log_message: Vec<String>,
26    slack_webhook_url: String,
27    slack_channel: String,
28    slack_username: String,
29    slack_message_text: String,
30    slack_notified_users: Vec<String>,
31    slack_icon_emoji: String
32) -> Result<(), Box<dyn std::error::Error>> {
33
34    let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
35    builder.set_verify(SslVerifyMode::NONE);
36
37    let myconnector = builder.build();
38
39    let client = Client::builder()
40        .connector(Connector::new().openssl(myconnector))
41        .finish();
42
43    let pod_list = client
44        .get(format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods"))
45        .bearer_auth(&token)
46        .timeout(Duration::from_secs(30))
47        .send()
48        .await
49        .expect("Failed to get pods in current namespace")
50        .json::<Pod>()
51        .await;
52
53    let mut streams = Vec::new();
54
55    let filtered_k8s_hostname = gke_k8s_hostname.as_str();
56
57    for pod in &pod_list {
58        for hostname in &pod.hostname {
59            if hostname.contains(filtered_k8s_hostname) {
60                let res = client
61                    .get(&format!("https://{gke_cluster_endpoint}/api/v1/namespaces/{gke_cluster_namespace}/pods/{hostname}/log?&tailLines=10&follow&timestamps=true"))   // <- Create request builder
62                    .bearer_auth(&token.clone())
63                    .insert_header(("Content-Type", "application/json"))
64                    .send()
65                    .await
66                    .expect("Fail to connect to stream")
67                    .into_stream();
68
69                streams.push(res);
70            }
71        }
72    }
73
74    let mut combined_stream = select_all(streams);
75
76    loop {
77        match combined_stream.next().await {
78            Some(chunk) => match chunk {
79                Ok(chunk_bytes) => {
80                    let mut host_name = "".to_string();
81
82                    let mut unique_hash: HashSet<Notification> = HashSet::new();
83
84                    for pod in &pod_list {
85                        for hostname in &pod.hostname {
86                            if hostname.contains(filtered_k8s_hostname) {
87                                let gcp_id = project_name.as_str();
88                                let gcp_region = gke_cluster_region.as_str();
89                                let project_name = project_name.as_str();
90                                host_name = hostname.to_owned();
91                                let chunk_string =
92                                    std::str::from_utf8(&chunk_bytes).expect("Non-UTF8 bytes");
93                                let iter = chunk_string.split_once(char::is_whitespace);
94
95                                let vec = iter.unwrap_or_default().to_vec();
96
97                                let log = Log {
98                                    time: vec.get(0).map(|x| x.to_string()).unwrap_or_default(),
99                                    message: vec.get(1).map(|x| x.to_string()).unwrap_or_default(),
100                                    host: host_name.to_string(),
101                                    google_project_id: gcp_id.to_string(),
102                                    region: gcp_region.to_string(),
103                                    project_id: project_name.to_string(),
104                                };
105
106                                let log_message = log.message;
107
108                                let msg_pattern = gke_log_message.iter().any(|s| log_message.contains(s));
109
110                                match msg_pattern {
111                                    true => {
112                                        let not = Notification {
113                                            channel: slack_channel.clone(),
114                                            username: slack_username.clone(),
115                                            text: format!("{slack_message_text} {:?} {log_message}", slack_notified_users),
116                                            icon_emoji: slack_icon_emoji.to_string(),
117                                        };
118
119                                        unique_hash.insert(not.clone());
120
121                                    },
122                                    false => {
123                                        // If there is no match on message, action below. 
124                                        // At the momment, there is no point of putting missed match on STDOUT
125                                        //println!("No message pattern found: {:?}", gke_log_message);
126                                    }
127                                }
128
129                            }
130
131                        }
132
133                    }
134
135                    match !unique_hash.is_empty() {
136                        true => {
137                            let current_hash: Vec<Notification> = unique_hash.into_iter().collect();
138                            let x = &current_hash[0];
139
140                            let mut builder2 = SslConnector::builder(SslMethod::tls()).unwrap();
141                            builder2.set_verify(SslVerifyMode::NONE);
142                        
143                            let myconnector2 = builder2.build();
144                        
145                            let client_not = Client::builder()
146                                .connector(Connector::new().openssl(myconnector2))
147                                .finish();
148                        
149                            let slack_notification_request = client_not
150                                .post(format!("{slack_webhook_url}"))
151                                .timeout(Duration::from_secs(120))
152                                .send_json(&x)
153                                .await
154                                .expect("Failed send request");
155                            let req_status = slack_notification_request.status();
156                            match req_status.as_u16() {
157                                200 => {
158                                    println!("Request status OK")
159                                },
160                                _ => {
161                                    eprintln!("Request status unknown: {}", req_status)
162                                }
163                            };
164                        },
165                        false => {
166                            // If there is empty hash - no match on message, then print below
167                            // In high traffic environments, there's no need to print below.
168                            // Can be improved in further development
169                            //println!("Unique hash is empty. Nothing to alert");
170                        }
171                    }
172                }
173                Err(err) => {
174                    eprintln!("Failed to read stream chunk: {}", err)
175                }
176            },
177            None => {
178                return Err("Reached end of stream".into());
179            }
180        }
181    }
182}
183
184/// Log parser loop with error handling and reconnecting
185pub async fn gke_log_parser_loop(
186    token: String,
187    project_name: String,
188    gke_cluster_endpoint: String,
189    gke_cluster_namespace: String,
190    gke_cluster_region: String,
191    gke_k8s_hostname: String,
192    gke_log_message: Vec<String>,
193    slack_webhook_url: String,
194    slack_channel: String,
195    slack_username: String,
196    slack_message_text: String,
197    slack_notified_users: Vec<String>,
198    slack_icon_emoji: String
199    ) {
200
201    loop {
202        match gke_log_parser(
203            token.clone(),
204            project_name.clone(),
205            gke_cluster_endpoint.clone(),
206            gke_cluster_namespace.clone(),
207            gke_cluster_region.clone(),
208            gke_k8s_hostname.clone(),
209            gke_log_message.clone(),
210            slack_webhook_url.clone(),
211            slack_channel.clone(),
212            slack_username.clone(),
213            slack_message_text.clone(),
214            slack_notified_users.clone(),
215            slack_icon_emoji.clone(),
216            ).await {
217                Ok(()) => {
218                    println!("Stream working as expected. Proceeding...");
219                    continue;
220                }
221                Err(e) => {
222                    eprintln!("Unexpected end of stream - {e}. Retrying...");
223                    continue;
224                }
225            }
226    }
227}
228