stormchaser_model/logging/
mod.rs1pub mod elasticsearch;
2pub mod loki;
3
4use crate::id::*;
5use anyhow::Result;
6use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
7use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
8use serde::{Deserialize, Serialize};
9use tokio::sync::mpsc;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum LogBackend {
14 Loki {
16 url: String,
18 },
19 Elasticsearch {
21 url: String,
23 index: String,
25 },
26}
27
28impl LogBackend {
29 pub(crate) fn create_client(&self) -> ClientWithMiddleware {
30 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
31 ClientBuilder::new(reqwest::Client::new())
32 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
33 .build()
34 }
35
36 pub async fn fetch_step_logs(
38 &self,
39 step_name: &str,
40 step_id: StepInstanceId,
41 started_at: Option<chrono::DateTime<chrono::Utc>>,
42 finished_at: Option<chrono::DateTime<chrono::Utc>>,
43 limit: Option<usize>,
44 ) -> Result<Vec<String>> {
45 let job_name = format!(
46 "storm-{}-{}",
47 step_name.to_lowercase().replace('_', "-"),
48 &step_id.to_string()[..8]
49 );
50
51 match self {
52 LogBackend::Loki { url } => {
53 loki::fetch_loki_logs(self, url, &job_name, started_at, finished_at, limit).await
54 }
55 LogBackend::Elasticsearch { url, index } => {
56 elasticsearch::fetch_elasticsearch_logs(
57 self,
58 url,
59 index,
60 &job_name,
61 started_at,
62 finished_at,
63 limit,
64 )
65 .await
66 }
67 }
68 }
69
70 pub async fn stream_step_logs(
72 &self,
73 step_name: &str,
74 step_id: StepInstanceId,
75 ) -> Result<mpsc::Receiver<Result<String>>> {
76 let job_name = format!(
77 "storm-{}-{}",
78 step_name.to_lowercase().replace('_', "-"),
79 &step_id.to_string()[..8]
80 );
81
82 match self {
83 LogBackend::Loki { url } => loki::stream_loki_logs(url, &job_name).await,
84 LogBackend::Elasticsearch { .. } => {
85 anyhow::bail!(
86 "Log streaming is not currently supported for Elasticsearch backends"
87 );
88 }
89 }
90 }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// Streaming from an Elasticsearch backend must be rejected with the
    /// "not currently supported" error.
    #[tokio::test]
    async fn test_stream_step_logs_elasticsearch_unsupported() {
        let es_backend = LogBackend::Elasticsearch {
            url: String::from("http://localhost:9200"),
            index: String::from("my-index"),
        };
        let instance_id = StepInstanceId::new_v4();

        let outcome = es_backend
            .stream_step_logs("test-step", instance_id)
            .await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not currently supported"));
    }
}