Skip to main content

stormchaser_model/logging/
mod.rs

1pub mod elasticsearch;
2pub mod loki;
3
4use crate::id::*;
5use anyhow::Result;
6use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
7use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
8use serde::{Deserialize, Serialize};
9use tokio::sync::mpsc;
10
11/// Represents the supported logging backends for step execution logs.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum LogBackend {
14    /// Grafana Loki backend.
15    Loki {
16        /// The URL of the Loki server.
17        url: String,
18    },
19    /// Elasticsearch backend.
20    Elasticsearch {
21        /// The URL of the Elasticsearch server.
22        url: String,
23        /// The Elasticsearch index to query.
24        index: String,
25    },
26}
27
28impl LogBackend {
29    pub(crate) fn create_client(&self) -> ClientWithMiddleware {
30        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
31        ClientBuilder::new(reqwest::Client::new())
32            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
33            .build()
34    }
35
36    /// Fetches historical logs for a specific step instance.
37    pub async fn fetch_step_logs(
38        &self,
39        step_name: &str,
40        step_id: StepInstanceId,
41        started_at: Option<chrono::DateTime<chrono::Utc>>,
42        finished_at: Option<chrono::DateTime<chrono::Utc>>,
43        limit: Option<usize>,
44    ) -> Result<Vec<String>> {
45        let job_name = format!(
46            "storm-{}-{}",
47            step_name.to_lowercase().replace('_', "-"),
48            &step_id.to_string()[..8]
49        );
50
51        match self {
52            LogBackend::Loki { url } => {
53                loki::fetch_loki_logs(self, url, &job_name, started_at, finished_at, limit).await
54            }
55            LogBackend::Elasticsearch { url, index } => {
56                elasticsearch::fetch_elasticsearch_logs(
57                    self,
58                    url,
59                    index,
60                    &job_name,
61                    started_at,
62                    finished_at,
63                    limit,
64                )
65                .await
66            }
67        }
68    }
69
70    /// Opens a real-time stream of log lines for a currently executing step instance.
71    pub async fn stream_step_logs(
72        &self,
73        step_name: &str,
74        step_id: StepInstanceId,
75    ) -> Result<mpsc::Receiver<Result<String>>> {
76        let job_name = format!(
77            "storm-{}-{}",
78            step_name.to_lowercase().replace('_', "-"),
79            &step_id.to_string()[..8]
80        );
81
82        match self {
83            LogBackend::Loki { url } => loki::stream_loki_logs(url, &job_name).await,
84            LogBackend::Elasticsearch { .. } => {
85                anyhow::bail!(
86                    "Log streaming is not currently supported for Elasticsearch backends"
87                );
88            }
89        }
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use super::*;
96
97    #[tokio::test]
98    async fn test_stream_step_logs_elasticsearch_unsupported() {
99        let backend = LogBackend::Elasticsearch {
100            url: "http://localhost:9200".to_string(),
101            index: "my-index".to_string(),
102        };
103        let step_id = StepInstanceId::new_v4();
104
105        let result = backend.stream_step_logs("test-step", step_id).await;
106        assert!(result.is_err());
107        assert!(result
108            .unwrap_err()
109            .to_string()
110            .contains("not currently supported"));
111    }
112}