1use anyhow::Result;
2use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
3use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use tokio::sync::mpsc;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub enum LogBackend {
12 Loki {
14 url: String,
16 },
17 Elasticsearch {
19 url: String,
21 index: String,
23 },
24}
25
26impl LogBackend {
27 fn create_client(&self) -> ClientWithMiddleware {
28 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
29 ClientBuilder::new(reqwest::Client::new())
30 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
31 .build()
32 }
33
34 pub async fn fetch_step_logs(
36 &self,
37 step_name: &str,
38 step_id: Uuid,
39 started_at: Option<chrono::DateTime<chrono::Utc>>,
40 finished_at: Option<chrono::DateTime<chrono::Utc>>,
41 ) -> Result<Vec<String>> {
42 let job_name = format!(
43 "storm-{}-{}",
44 step_name.to_lowercase().replace('_', "-"),
45 &step_id.to_string()[..8]
46 );
47
48 match self {
49 LogBackend::Loki { url } => {
50 self.fetch_loki_logs(url, &job_name, started_at, finished_at)
51 .await
52 }
53 LogBackend::Elasticsearch { url, index } => {
54 self.fetch_elasticsearch_logs(url, index, &job_name, started_at, finished_at)
55 .await
56 }
57 }
58 }
59
60 pub async fn stream_step_logs(
62 &self,
63 step_name: &str,
64 step_id: Uuid,
65 ) -> Result<mpsc::Receiver<Result<String>>> {
66 let job_name = format!(
67 "storm-{}-{}",
68 step_name.to_lowercase().replace('_', "-"),
69 &step_id.to_string()[..8]
70 );
71
72 match self {
73 LogBackend::Loki { url } => self.stream_loki_logs(url, &job_name).await,
74 LogBackend::Elasticsearch { .. } => {
75 anyhow::bail!(
76 "Log streaming is not currently supported for Elasticsearch backends"
77 );
78 }
79 }
80 }
81
82 async fn stream_loki_logs(
83 &self,
84 loki_url: &str,
85 job_name: &str,
86 ) -> Result<mpsc::Receiver<Result<String>>> {
87 use futures::StreamExt;
88
89 let query = format!("{{job_name=\"{}\"}}", job_name);
90
91 let base_url = loki_url.trim_end_matches('/');
92 let ws_url = if base_url.starts_with("http://") {
93 base_url.replace("http://", "ws://")
94 } else if base_url.starts_with("https://") {
95 base_url.replace("https://", "wss://")
96 } else {
97 base_url.to_string()
98 };
99
100 let ws_url = format!(
101 "{}/loki/api/v1/tail?query={}",
102 ws_url,
103 urlencoding::encode(&query)
104 );
105
106 tracing::debug!("Connecting to Loki WebSocket: {}", ws_url);
107 let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?;
108 tracing::debug!("Connected to Loki WebSocket");
109 let (tx, rx) = mpsc::channel(100);
110
111 tokio::spawn(async move {
112 let mut read_stream = ws_stream;
113 while let Some(msg) = read_stream.next().await {
114 match msg {
115 Ok(tungstenite::Message::Text(text)) => {
116 tracing::trace!("Received Loki WebSocket text: {}", text);
117 if let Ok(data) = serde_json::from_str::<Value>(&text) {
118 if let Some(streams) = data.get("streams").and_then(|s| s.as_array()) {
119 for stream in streams {
120 if let Some(values) =
121 stream.get("values").and_then(|v| v.as_array())
122 {
123 for entry in values {
124 if let Some(log_line) =
125 entry.get(1).and_then(|v| v.as_str())
126 {
127 if tx.send(Ok(log_line.to_string())).await.is_err()
128 {
129 return; }
131 }
132 }
133 }
134 }
135 }
136 }
137 }
138 Ok(tungstenite::Message::Close(_)) => {
139 break;
140 }
141 Err(e) => {
142 let _ = tx
143 .send(Err(anyhow::anyhow!("WebSocket error: {}", e)))
144 .await;
145 break;
146 }
147 _ => {}
148 }
149 }
150 });
151
152 Ok(rx)
153 }
154
155 async fn fetch_loki_logs(
156 &self,
157 loki_url: &str,
158 job_name: &str,
159 started_at: Option<chrono::DateTime<chrono::Utc>>,
160 finished_at: Option<chrono::DateTime<chrono::Utc>>,
161 ) -> Result<Vec<String>> {
162 let query = format!("{{job_name=\"{}\"}}", job_name);
163 let url = format!("{}/loki/api/v1/query_range", loki_url.trim_end_matches('/'));
164
165 let client = self.create_client();
166 let forward = "forward".to_string();
167 let limit = "5000".to_string();
168
169 let start_time = started_at
170 .map(|t| t - chrono::Duration::minutes(1))
171 .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::days(30))
172 .timestamp_nanos_opt()
173 .unwrap_or(0)
174 .to_string();
175
176 let end_time = finished_at
177 .unwrap_or_else(chrono::Utc::now)
178 .checked_add_signed(chrono::Duration::minutes(5))
180 .unwrap_or_else(chrono::Utc::now)
181 .timestamp_nanos_opt()
182 .unwrap_or(0)
183 .to_string();
184
185 let resp = client
186 .get(&url)
187 .query(&[
188 ("query", &query),
189 ("direction", &forward),
190 ("limit", &limit),
191 ("start", &start_time),
192 ("end", &end_time),
193 ])
194 .send()
195 .await?;
196
197 if !resp.status().is_success() {
198 return Err(anyhow::anyhow!("Loki returned status {}", resp.status()));
199 }
200
201 let data: Value = resp.json().await?;
202 let mut logs = Vec::new();
203
204 if let Some(streams) = data["data"]["result"].as_array() {
205 for stream in streams {
206 if let Some(values) = stream["values"].as_array() {
207 for entry in values {
208 if let Some(log_line) = entry.get(1).and_then(|v| v.as_str()) {
209 logs.push(log_line.to_string());
210 }
211 }
212 }
213 }
214 }
215
216 Ok(logs)
217 }
218
219 async fn fetch_elasticsearch_logs(
220 &self,
221 es_url: &str,
222 index: &str,
223 job_name: &str,
224 started_at: Option<chrono::DateTime<chrono::Utc>>,
225 finished_at: Option<chrono::DateTime<chrono::Utc>>,
226 ) -> Result<Vec<String>> {
227 let url = format!("{}/{}/_search", es_url.trim_end_matches('/'), index);
228
229 let gte = started_at
230 .map(|t| t - chrono::Duration::minutes(1))
231 .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::days(30))
232 .to_rfc3339();
233
234 let lte = finished_at
235 .unwrap_or_else(chrono::Utc::now)
236 .checked_add_signed(chrono::Duration::seconds(5))
237 .unwrap_or_else(chrono::Utc::now)
238 .to_rfc3339();
239
240 let query = serde_json::json!({
241 "query": {
242 "bool": {
243 "must": [
244 { "term": { "job_name.keyword": job_name } }
245 ],
246 "filter": [
247 {
248 "range": {
249 "@timestamp": {
250 "gte": gte,
251 "lte": lte
252 }
253 }
254 }
255 ]
256 }
257 },
258 "sort": [
259 { "@timestamp": "asc" }
260 ],
261 "size": 5000
262 });
263
264 let client = self.create_client();
265 let resp = client.post(&url).json(&query).send().await?;
266
267 if !resp.status().is_success() {
268 return Err(anyhow::anyhow!(
269 "Elasticsearch returned status {}",
270 resp.status()
271 ));
272 }
273
274 let data: Value = resp.json().await?;
275 let mut logs = Vec::new();
276
277 if let Some(hits) = data["hits"]["hits"].as_array() {
278 for hit in hits {
279 if let Some(message) = hit["_source"]["message"].as_str() {
280 logs.push(message.to_string());
281 }
282 }
283 }
284
285 Ok(logs)
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock server that answers Loki `query_range` GETs with `reply`.
    async fn loki_server(reply: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/loki/api/v1/query_range"))
            .respond_with(reply)
            .mount(&server)
            .await;
        server
    }

    /// Starts a mock server that answers `my-index` search POSTs with `reply`.
    async fn es_server(reply: ResponseTemplate) -> MockServer {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/my-index/_search"))
            .respond_with(reply)
            .mount(&server)
            .await;
        server
    }

    /// Loki backend pointing at the given base URL.
    fn loki_backend(url: String) -> LogBackend {
        LogBackend::Loki { url }
    }

    /// Elasticsearch backend for `my-index` pointing at the given base URL.
    fn es_backend(url: String) -> LogBackend {
        LogBackend::Elasticsearch {
            url,
            index: "my-index".to_string(),
        }
    }

    /// Asserts that `result` is an error whose message contains `needle`.
    fn assert_err_contains<T: std::fmt::Debug>(result: anyhow::Result<T>, needle: &str) {
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains(needle));
    }

    /// A successful Loki response yields its log lines in order.
    #[tokio::test]
    async fn test_fetch_loki_logs_success() {
        let body = serde_json::json!({
            "status": "success",
            "data": {
                "resultType": "streams",
                "result": [
                    {
                        "stream": {
                            "job_name": "storm-test-step-12345678"
                        },
                        "values": [
                            [ "1610000000000000000", "log line 1" ],
                            [ "1610000001000000000", "log line 2" ]
                        ]
                    }
                ]
            }
        });
        let server = loki_server(ResponseTemplate::new(200).set_body_json(body)).await;
        let backend = loki_backend(server.uri());

        let logs = backend
            .fetch_step_logs("test-step", Uuid::new_v4(), None, None)
            .await
            .expect("Failed to fetch loki logs");

        assert_eq!(logs, ["log line 1", "log line 2"]);
    }

    /// A successful Elasticsearch response yields each hit's `message`.
    #[tokio::test]
    async fn test_fetch_elasticsearch_logs_success() {
        let body = serde_json::json!({
            "took": 1,
            "timed_out": false,
            "_shards": {
                "total": 1,
                "successful": 1,
                "skipped": 0,
                "failed": 0
            },
            "hits": {
                "total": {
                    "value": 2,
                    "relation": "eq"
                },
                "max_score": null,
                "hits": [
                    {
                        "_index": "logs",
                        "_id": "1",
                        "_score": null,
                        "_source": {
                            "message": "es log line 1"
                        },
                        "sort": [1610000000000_u64]
                    },
                    {
                        "_index": "logs",
                        "_id": "2",
                        "_score": null,
                        "_source": {
                            "message": "es log line 2"
                        },
                        "sort": [1610000001000_u64]
                    }
                ]
            }
        });
        let server = es_server(ResponseTemplate::new(200).set_body_json(body)).await;
        let backend = es_backend(server.uri());

        let logs = backend
            .fetch_step_logs("test-step", Uuid::new_v4(), None, None)
            .await
            .expect("Failed to fetch es logs");

        assert_eq!(logs, ["es log line 1", "es log line 2"]);
    }

    /// A 500 from Loki surfaces as an error naming the backend.
    // NOTE(review): the client is built with a retry middleware (five
    // retries); a 500 is presumably treated as transient, so this test may
    // wait through the backoff schedule — confirm.
    #[tokio::test]
    async fn test_fetch_loki_logs_error() {
        let server = loki_server(ResponseTemplate::new(500)).await;
        let backend = loki_backend(server.uri());

        let result = backend
            .fetch_step_logs("test-step", Uuid::new_v4(), None, None)
            .await;

        assert_err_contains(result, "Loki returned status");
    }

    /// A 500 from Elasticsearch surfaces as an error naming the backend.
    #[tokio::test]
    async fn test_fetch_elasticsearch_logs_error() {
        let server = es_server(ResponseTemplate::new(500)).await;
        let backend = es_backend(server.uri());

        let result = backend
            .fetch_step_logs("test-step", Uuid::new_v4(), None, None)
            .await;

        assert_err_contains(result, "Elasticsearch returned status");
    }

    /// Streaming from an Elasticsearch backend is rejected up front.
    #[tokio::test]
    async fn test_stream_step_logs_elasticsearch_unsupported() {
        let backend = es_backend("http://localhost:9200".to_string());

        let result = backend.stream_step_logs("test-step", Uuid::new_v4()).await;

        assert_err_contains(result, "not currently supported");
    }

    /// A failed WebSocket connection is reported as an error.
    #[tokio::test]
    async fn test_stream_step_logs_loki_connection_refused() {
        // Port 1 is expected to have no listener, so the connect should fail.
        let backend = loki_backend("http://127.0.0.1:1".to_string());

        let result = backend.stream_step_logs("test-step", Uuid::new_v4()).await;

        assert!(result.is_err());
    }
}