// blueprint_qos/servers/loki.rs

1use blueprint_core::{debug, info, warn};
2use std::collections::HashMap;
3use std::fs;
4use std::os::unix::fs::PermissionsExt;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tempfile::{TempDir, TempPath};
8
9use crate::error::{Error, Result};
10use crate::logging::LokiConfig;
11use crate::servers::ServerManager;
12use crate::servers::common::DockerManager;
13
/// Configuration for a Loki server container managed by `LokiServer`.
///
/// `LokiServerConfig::default()` publishes Loki on host port 3100, names the
/// container `blueprint-loki`, and supplies no custom config file.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LokiServerConfig {
    /// Host port that the container's `3100/tcp` port is published on.
    pub port: u16,

    /// Data directory.
    ///
    /// NOTE(review): `LokiServer::start` currently does not read this value; it
    /// always mounts a freshly created temporary directory at `/loki` instead.
    pub data_dir: String,

    /// Name given to the Docker container.
    pub container_name: String,

    /// Optional host path to a Loki configuration file.
    ///
    /// When set, `LokiServer::start` copies the file into a temporary file,
    /// bind-mounts it at `/etc/loki/config.yaml`, and passes
    /// `-config.file=/etc/loki/config.yaml` to Loki.
    pub config_path: Option<String>,
}
29
30impl Default for LokiServerConfig {
31    fn default() -> Self {
32        Self {
33            port: 3100,
34            data_dir: "/var/lib/loki".to_string(),
35            container_name: "blueprint-loki".to_string(),
36            config_path: None,
37        }
38    }
39}
40
/// Manages the lifecycle of a Loki server running in a Docker container.
///
/// Holds the Docker handle and static configuration, plus the runtime state
/// (container ID and temporary files) produced by `start`. The runtime state is
/// wrapped in `Arc<Mutex<..>>` so it can be updated from `&self` methods.
pub struct LokiServer {
    /// Docker manager used to run, network-attach, health-check, and remove the container.
    docker: DockerManager,

    /// Server configuration supplied to `LokiServer::new`.
    config: LokiServerConfig,

    /// ID of the launched container; set by `start` and reset to `None` by `stop`.
    container_id: Arc<Mutex<Option<String>>>,

    /// Temporary copy of the user-supplied config file that is bind-mounted into
    /// the container; held here so the file is not deleted while it is in use.
    temp_config_path: Arc<Mutex<Option<TempPath>>>,

    /// Temporary host directory bind-mounted at `/loki` as Loki's data directory;
    /// held here so it is not deleted while the container is using it.
    temp_data_dir: Arc<Mutex<Option<TempDir>>>,
}
58
59impl LokiServer {
60    /// Create a new Loki server manager
61    ///
62    /// # Errors
63    /// Returns an error if the Docker manager fails to create a new container
64    pub fn new(config: LokiServerConfig) -> Result<Self> {
65        Ok(Self {
66            docker: DockerManager::new().map_err(|e| Error::DockerConnection(e.to_string()))?,
67            config,
68            container_id: Arc::new(Mutex::new(None)),
69            temp_config_path: Arc::new(Mutex::new(None)),
70            temp_data_dir: Arc::new(Mutex::new(None)),
71        })
72    }
73
74    /// Get the Loki client configuration
75    #[must_use]
76    pub fn client_config(&self) -> LokiConfig {
77        LokiConfig {
78            url: format!("{}/loki/api/v1/push", self.url()),
79            username: None,
80            password: None,
81            batch_size: 1024,
82            labels: HashMap::new(),
83            // TODO: Update once Loki is fixed
84            timeout_secs: 5,
85            otel_config: None,
86        }
87    }
88}
89
90impl ServerManager for LokiServer {
91    async fn start(&self, network: Option<&str>, bind_ip: Option<String>) -> Result<()> {
92        info!("Starting Loki server on port {}", self.config.port);
93
94        let env_vars = HashMap::new();
95        let mut args = Vec::new();
96
97        let mut ports = HashMap::new();
98        ports.insert("3100/tcp".to_string(), self.config.port.to_string());
99
100        let mut volumes = HashMap::new();
101        let temp_data_dir = tempfile::Builder::new().prefix("loki-data-").tempdir()?;
102        fs::set_permissions(temp_data_dir.path(), fs::Permissions::from_mode(0o777))?;
103        volumes.insert(
104            temp_data_dir.path().to_str().unwrap().to_string(),
105            "/loki".to_string(),
106        );
107        self.temp_data_dir.lock().unwrap().replace(temp_data_dir);
108
109        if let Some(config_path) = &self.config.config_path {
110            let config_content = std::fs::read_to_string(config_path)?;
111            let mut temp_file = tempfile::NamedTempFile::new()?;
112            std::io::Write::write_all(&mut temp_file, config_content.as_bytes())?;
113
114            // Set permissions to be world-readable for the Docker container
115            fs::set_permissions(temp_file.path(), fs::Permissions::from_mode(0o644))?;
116
117            let temp_path = temp_file.into_temp_path();
118            let temp_path_str = temp_path.to_str().unwrap().to_string();
119            volumes.insert(
120                temp_path_str.to_string(),
121                "/etc/loki/config.yaml".to_string(),
122            );
123            // Keep the temp file alive until the container is started
124            self.temp_config_path.lock().unwrap().replace(temp_path);
125            args.push("-config.file=/etc/loki/config.yaml".to_string());
126        }
127
128        let health_check_cmd = Some(vec![
129            "CMD-SHELL".to_string(),
130            "wget -q -O /dev/null http://localhost:3100/ready".to_string(),
131        ]);
132
133        let container_id = self
134            .docker
135            .run_container(
136                "grafana/loki:latest",
137                &self.config.container_name,
138                env_vars,
139                ports,
140                volumes,
141                Some(args),
142                None,
143                health_check_cmd,
144                bind_ip,
145            )
146            .await?;
147
148        if let Some(net) = network {
149            info!(
150                "Connecting Loki container {} to network {}",
151                &self.config.container_name, net
152            );
153            self.docker.connect_to_network(&container_id, net).await?;
154        }
155
156        {
157            let mut id = self.container_id.lock().unwrap();
158            *id = Some(container_id.clone());
159        }
160
161        // TODO: Update once Loki is fixed
162        self.wait_until_ready(30).await?;
163
164        info!("Loki server started successfully");
165        Ok(())
166    }
167
168    async fn stop(&self) -> Result<()> {
169        let container_id = {
170            let id = self.container_id.lock().unwrap();
171            match id.as_ref() {
172                Some(id) => id.clone(),
173                None => {
174                    info!("Loki server is not running, nothing to stop.");
175                    return Ok(());
176                }
177            }
178        };
179
180        info!("Stopping Loki server: {}", &self.config.container_name);
181        self.docker
182            .stop_and_remove_container(&container_id, &self.config.container_name)
183            .await?;
184
185        let mut id = self.container_id.lock().unwrap();
186        *id = None;
187
188        info!("Loki server stopped successfully.");
189        Ok(())
190    }
191
192    fn url(&self) -> String {
193        format!("http://localhost:{}", self.config.port)
194    }
195
196    async fn is_running(&self) -> Result<bool> {
197        let container_id = {
198            let id = self.container_id.lock().unwrap();
199            match id.as_ref() {
200                Some(id) => id.clone(),
201                None => return Ok(false),
202            }
203        };
204
205        self.docker.is_container_running(&container_id).await
206    }
207
208    async fn wait_until_ready(&self, timeout_secs: u64) -> Result<()> {
209        let container_id = {
210            let id = self.container_id.lock().unwrap();
211            id.as_ref()
212                .map(String::clone)
213                .ok_or_else(|| Error::Generic("Loki server is not running".to_string()))?
214        };
215
216        info!("Waiting for Loki container to be healthy...");
217        if let Err(e) = self
218            .docker
219            .wait_for_container_health(&container_id, timeout_secs)
220            .await
221        {
222            warn!(
223                "Loki container health check failed: {}. Proceeding with API check.",
224                e
225            );
226        } else {
227            info!("Loki container health check passed.");
228        }
229
230        info!("Waiting for Loki API to be responsive...");
231        let client = reqwest::Client::new();
232        let urls = [
233            format!("{}/ready", self.url()),
234            format!("{}/metrics", self.url()),
235        ];
236        let start_time = tokio::time::Instant::now();
237        let timeout = Duration::from_secs(timeout_secs);
238
239        loop {
240            if start_time.elapsed() > timeout {
241                return Err(Error::Generic(format!(
242                    "Loki API did not become responsive within {} seconds.",
243                    timeout_secs
244                )));
245            }
246
247            for url in &urls {
248                match client.get(url).send().await {
249                    Ok(response) if response.status().is_success() => {
250                        info!("Loki API is responsive at {}.", url);
251                        return Ok(());
252                    }
253                    _ => {}
254                }
255            }
256
257            debug!("Loki API not yet responsive. Retrying...");
258            tokio::time::sleep(Duration::from_secs(1)).await;
259        }
260    }
261}