Skip to main content

stormchaser_model/
logging.rs

1use anyhow::Result;
2use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
3use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use tokio::sync::mpsc;
7use uuid::Uuid;
8
9/// Represents the supported logging backends for step execution logs.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub enum LogBackend {
12    /// Grafana Loki backend.
13    Loki {
14        /// The URL of the Loki server.
15        url: String,
16    },
17    /// Elasticsearch backend.
18    Elasticsearch {
19        /// The URL of the Elasticsearch server.
20        url: String,
21        /// The Elasticsearch index to query.
22        index: String,
23    },
24}
25
26impl LogBackend {
27    fn create_client(&self) -> ClientWithMiddleware {
28        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
29        ClientBuilder::new(reqwest::Client::new())
30            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
31            .build()
32    }
33
34    /// Fetches historical logs for a specific step instance.
35    pub async fn fetch_step_logs(
36        &self,
37        step_name: &str,
38        step_id: Uuid,
39        started_at: Option<chrono::DateTime<chrono::Utc>>,
40        finished_at: Option<chrono::DateTime<chrono::Utc>>,
41    ) -> Result<Vec<String>> {
42        let job_name = format!(
43            "storm-{}-{}",
44            step_name.to_lowercase().replace('_', "-"),
45            &step_id.to_string()[..8]
46        );
47
48        match self {
49            LogBackend::Loki { url } => {
50                self.fetch_loki_logs(url, &job_name, started_at, finished_at)
51                    .await
52            }
53            LogBackend::Elasticsearch { url, index } => {
54                self.fetch_elasticsearch_logs(url, index, &job_name, started_at, finished_at)
55                    .await
56            }
57        }
58    }
59
60    /// Opens a real-time stream of log lines for a currently executing step instance.
61    pub async fn stream_step_logs(
62        &self,
63        step_name: &str,
64        step_id: Uuid,
65    ) -> Result<mpsc::Receiver<Result<String>>> {
66        let job_name = format!(
67            "storm-{}-{}",
68            step_name.to_lowercase().replace('_', "-"),
69            &step_id.to_string()[..8]
70        );
71
72        match self {
73            LogBackend::Loki { url } => self.stream_loki_logs(url, &job_name).await,
74            LogBackend::Elasticsearch { .. } => {
75                anyhow::bail!(
76                    "Log streaming is not currently supported for Elasticsearch backends"
77                );
78            }
79        }
80    }
81
82    async fn stream_loki_logs(
83        &self,
84        loki_url: &str,
85        job_name: &str,
86    ) -> Result<mpsc::Receiver<Result<String>>> {
87        use futures::StreamExt;
88
89        let query = format!("{{job_name=\"{}\"}}", job_name);
90
91        let base_url = loki_url.trim_end_matches('/');
92        let ws_url = if base_url.starts_with("http://") {
93            base_url.replace("http://", "ws://")
94        } else if base_url.starts_with("https://") {
95            base_url.replace("https://", "wss://")
96        } else {
97            base_url.to_string()
98        };
99
100        let ws_url = format!(
101            "{}/loki/api/v1/tail?query={}",
102            ws_url,
103            urlencoding::encode(&query)
104        );
105
106        tracing::debug!("Connecting to Loki WebSocket: {}", ws_url);
107        let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?;
108        tracing::debug!("Connected to Loki WebSocket");
109        let (tx, rx) = mpsc::channel(100);
110
111        tokio::spawn(async move {
112            let mut read_stream = ws_stream;
113            while let Some(msg) = read_stream.next().await {
114                match msg {
115                    Ok(tungstenite::Message::Text(text)) => {
116                        tracing::trace!("Received Loki WebSocket text: {}", text);
117                        if let Ok(data) = serde_json::from_str::<Value>(&text) {
118                            if let Some(streams) = data.get("streams").and_then(|s| s.as_array()) {
119                                for stream in streams {
120                                    if let Some(values) =
121                                        stream.get("values").and_then(|v| v.as_array())
122                                    {
123                                        for entry in values {
124                                            if let Some(log_line) =
125                                                entry.get(1).and_then(|v| v.as_str())
126                                            {
127                                                if tx.send(Ok(log_line.to_string())).await.is_err()
128                                                {
129                                                    return; // Receiver dropped
130                                                }
131                                            }
132                                        }
133                                    }
134                                }
135                            }
136                        }
137                    }
138                    Ok(tungstenite::Message::Close(_)) => {
139                        break;
140                    }
141                    Err(e) => {
142                        let _ = tx
143                            .send(Err(anyhow::anyhow!("WebSocket error: {}", e)))
144                            .await;
145                        break;
146                    }
147                    _ => {}
148                }
149            }
150        });
151
152        Ok(rx)
153    }
154
155    async fn fetch_loki_logs(
156        &self,
157        loki_url: &str,
158        job_name: &str,
159        started_at: Option<chrono::DateTime<chrono::Utc>>,
160        finished_at: Option<chrono::DateTime<chrono::Utc>>,
161    ) -> Result<Vec<String>> {
162        let query = format!("{{job_name=\"{}\"}}", job_name);
163        let url = format!("{}/loki/api/v1/query_range", loki_url.trim_end_matches('/'));
164
165        let client = self.create_client();
166        let forward = "forward".to_string();
167        let limit = "5000".to_string();
168
169        let start_time = started_at
170            .map(|t| t - chrono::Duration::minutes(1))
171            .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::days(30))
172            .timestamp_nanos_opt()
173            .unwrap_or(0)
174            .to_string();
175
176        let end_time = finished_at
177            .unwrap_or_else(chrono::Utc::now)
178            // Add a small buffer to end time to ensure we get the final logs
179            .checked_add_signed(chrono::Duration::minutes(5))
180            .unwrap_or_else(chrono::Utc::now)
181            .timestamp_nanos_opt()
182            .unwrap_or(0)
183            .to_string();
184
185        let resp = client
186            .get(&url)
187            .query(&[
188                ("query", &query),
189                ("direction", &forward),
190                ("limit", &limit),
191                ("start", &start_time),
192                ("end", &end_time),
193            ])
194            .send()
195            .await?;
196
197        if !resp.status().is_success() {
198            return Err(anyhow::anyhow!("Loki returned status {}", resp.status()));
199        }
200
201        let data: Value = resp.json().await?;
202        let mut logs = Vec::new();
203
204        if let Some(streams) = data["data"]["result"].as_array() {
205            for stream in streams {
206                if let Some(values) = stream["values"].as_array() {
207                    for entry in values {
208                        if let Some(log_line) = entry.get(1).and_then(|v| v.as_str()) {
209                            logs.push(log_line.to_string());
210                        }
211                    }
212                }
213            }
214        }
215
216        Ok(logs)
217    }
218
219    async fn fetch_elasticsearch_logs(
220        &self,
221        es_url: &str,
222        index: &str,
223        job_name: &str,
224        started_at: Option<chrono::DateTime<chrono::Utc>>,
225        finished_at: Option<chrono::DateTime<chrono::Utc>>,
226    ) -> Result<Vec<String>> {
227        let url = format!("{}/{}/_search", es_url.trim_end_matches('/'), index);
228
229        let gte = started_at
230            .map(|t| t - chrono::Duration::minutes(1))
231            .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::days(30))
232            .to_rfc3339();
233
234        let lte = finished_at
235            .unwrap_or_else(chrono::Utc::now)
236            .checked_add_signed(chrono::Duration::seconds(5))
237            .unwrap_or_else(chrono::Utc::now)
238            .to_rfc3339();
239
240        let query = serde_json::json!({
241            "query": {
242                "bool": {
243                    "must": [
244                        { "term": { "job_name.keyword": job_name } }
245                    ],
246                    "filter": [
247                        {
248                            "range": {
249                                "@timestamp": {
250                                    "gte": gte,
251                                    "lte": lte
252                                }
253                            }
254                        }
255                    ]
256                }
257            },
258            "sort": [
259                { "@timestamp": "asc" }
260            ],
261            "size": 5000
262        });
263
264        let client = self.create_client();
265        let resp = client.post(&url).json(&query).send().await?;
266
267        if !resp.status().is_success() {
268            return Err(anyhow::anyhow!(
269                "Elasticsearch returned status {}",
270                resp.status()
271            ));
272        }
273
274        let data: Value = resp.json().await?;
275        let mut logs = Vec::new();
276
277        if let Some(hits) = data["hits"]["hits"].as_array() {
278            for hit in hits {
279                if let Some(message) = hit["_source"]["message"].as_str() {
280                    logs.push(message.to_string());
281                }
282            }
283        }
284
285        Ok(logs)
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;
    use wiremock::matchers::{body_partial_json, method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Job name the backends are expected to derive for step "test-step" and `step_id`.
    fn expected_job_name(step_id: Uuid) -> String {
        format!("storm-test-step-{}", &step_id.to_string()[..8])
    }

    #[tokio::test]
    async fn test_fetch_loki_logs_success() {
        let mock_server = MockServer::start().await;
        let step_id = Uuid::new_v4();

        let loki_response = serde_json::json!({
            "status": "success",
            "data": {
                "resultType": "streams",
                "result": [
                    {
                        "stream": {
                            "job_name": "storm-test-step-12345678"
                        },
                        "values": [
                            [ "1610000000000000000", "log line 1" ],
                            [ "1610000001000000000", "log line 2" ]
                        ]
                    }
                ]
            }
        });

        // Pin the request shape as well, so a malformed selector or a missing parameter
        // fails the test (wiremock answers unmatched requests with 404).
        Mock::given(method("GET"))
            .and(path("/loki/api/v1/query_range"))
            .and(query_param(
                "query",
                format!("{{job_name=\"{}\"}}", expected_job_name(step_id)),
            ))
            .and(query_param("direction", "forward"))
            .and(query_param("limit", "5000"))
            .respond_with(ResponseTemplate::new(200).set_body_json(loki_response))
            .expect(1)
            .mount(&mock_server)
            .await;

        let backend = LogBackend::Loki {
            url: mock_server.uri(),
        };

        let logs = backend
            .fetch_step_logs("test-step", step_id, None, None)
            .await
            .expect("Failed to fetch loki logs");

        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0], "log line 1");
        assert_eq!(logs[1], "log line 2");
    }

    #[tokio::test]
    async fn test_fetch_elasticsearch_logs_success() {
        let mock_server = MockServer::start().await;
        let step_id = Uuid::new_v4();

        let es_response = serde_json::json!({
            "took": 1,
            "timed_out": false,
            "_shards": {
                "total": 1,
                "successful": 1,
                "skipped": 0,
                "failed": 0
            },
            "hits": {
                "total": {
                    "value": 2,
                    "relation": "eq"
                },
                "max_score": null,
                "hits": [
                    {
                        "_index": "logs",
                        "_id": "1",
                        "_score": null,
                        "_source": {
                            "message": "es log line 1"
                        },
                        "sort": [1610000000000_u64]
                    },
                    {
                        "_index": "logs",
                        "_id": "2",
                        "_score": null,
                        "_source": {
                            "message": "es log line 2"
                        },
                        "sort": [1610000001000_u64]
                    }
                ]
            }
        });

        // Verify the search actually filters on this step's job name.
        Mock::given(method("POST"))
            .and(path("/my-index/_search"))
            .and(body_partial_json(serde_json::json!({
                "query": {
                    "bool": {
                        "must": [
                            { "term": { "job_name.keyword": expected_job_name(step_id) } }
                        ]
                    }
                },
                "size": 5000
            })))
            .respond_with(ResponseTemplate::new(200).set_body_json(es_response))
            .expect(1)
            .mount(&mock_server)
            .await;

        let backend = LogBackend::Elasticsearch {
            url: mock_server.uri(),
            index: "my-index".to_string(),
        };

        let logs = backend
            .fetch_step_logs("test-step", step_id, None, None)
            .await
            .expect("Failed to fetch es logs");

        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0], "es log line 1");
        assert_eq!(logs[1], "es log line 2");
    }

    #[tokio::test]
    async fn test_fetch_loki_logs_error() {
        let mock_server = MockServer::start().await;

        // A 4xx is used rather than a 5xx: reqwest-retry classifies 5xx as transient and
        // would retry it five times with exponential backoff, making this test slow. A 4xx
        // is fatal, so exactly one request is made.
        Mock::given(method("GET"))
            .and(path("/loki/api/v1/query_range"))
            .respond_with(ResponseTemplate::new(400))
            .expect(1)
            .mount(&mock_server)
            .await;

        let backend = LogBackend::Loki {
            url: mock_server.uri(),
        };
        let step_id = Uuid::new_v4();

        let result = backend
            .fetch_step_logs("test-step", step_id, None, None)
            .await;

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Loki returned status 400"));
    }

    #[tokio::test]
    async fn test_fetch_elasticsearch_logs_error() {
        let mock_server = MockServer::start().await;

        // 4xx instead of 5xx to avoid reqwest-retry's transient-error backoff (see above).
        Mock::given(method("POST"))
            .and(path("/my-index/_search"))
            .respond_with(ResponseTemplate::new(400))
            .expect(1)
            .mount(&mock_server)
            .await;

        let backend = LogBackend::Elasticsearch {
            url: mock_server.uri(),
            index: "my-index".to_string(),
        };
        let step_id = Uuid::new_v4();

        let result = backend
            .fetch_step_logs("test-step", step_id, None, None)
            .await;

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Elasticsearch returned status 400"));
    }

    #[tokio::test]
    async fn test_stream_step_logs_elasticsearch_unsupported() {
        let backend = LogBackend::Elasticsearch {
            url: "http://localhost:9200".to_string(),
            index: "my-index".to_string(),
        };
        let step_id = Uuid::new_v4();

        let result = backend.stream_step_logs("test-step", step_id).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not currently supported"));
    }

    #[tokio::test]
    async fn test_stream_step_logs_loki_connection_refused() {
        // Test that it correctly attempts to connect to the websocket and fails gracefully
        // if the server is not listening (connection refused).
        let backend = LogBackend::Loki {
            url: "http://127.0.0.1:1".to_string(),
        };
        let step_id = Uuid::new_v4();

        let result = backend.stream_step_logs("test-step", step_id).await;
        // The connection should fail
        assert!(result.is_err());
    }
}