blueprint_qos/servers/
loki.rs1use blueprint_core::{debug, info, warn};
2use std::collections::HashMap;
3use std::fs;
4use std::os::unix::fs::PermissionsExt;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tempfile::{TempDir, TempPath};
8
9use crate::error::{Error, Result};
10use crate::logging::LokiConfig;
11use crate::servers::ServerManager;
12use crate::servers::common::DockerManager;
13
/// Settings for the Docker-hosted Loki instance managed by `LokiServer`.
#[derive(Debug, Clone)]
pub struct LokiServerConfig {
    /// Host port on which Loki's HTTP API is published (the container side
    /// is always `3100/tcp`).
    pub port: u16,

    /// Directory intended for Loki's data.
    ///
    /// NOTE(review): nothing in the code visible in this file reads this
    /// field; `start` mounts a freshly created temporary directory at
    /// `/loki` instead. Confirm whether it is consumed elsewhere.
    pub data_dir: String,

    /// Name given to the Docker container.
    pub container_name: String,

    /// Optional path to a Loki configuration file on the host. When set, its
    /// contents are copied to a temporary file that is mounted into the
    /// container at `/etc/loki/config.yaml`.
    pub config_path: Option<String>,
}
29
30impl Default for LokiServerConfig {
31 fn default() -> Self {
32 Self {
33 port: 3100,
34 data_dir: "/var/lib/loki".to_string(),
35 container_name: "blueprint-loki".to_string(),
36 config_path: None,
37 }
38 }
39}
40
41pub struct LokiServer {
43 docker: DockerManager,
45
46 config: LokiServerConfig,
48
49 container_id: Arc<Mutex<Option<String>>>,
51
52 temp_config_path: Arc<Mutex<Option<TempPath>>>,
54
55 temp_data_dir: Arc<Mutex<Option<TempDir>>>,
57}
58
59impl LokiServer {
60 pub fn new(config: LokiServerConfig) -> Result<Self> {
65 Ok(Self {
66 docker: DockerManager::new().map_err(|e| Error::DockerConnection(e.to_string()))?,
67 config,
68 container_id: Arc::new(Mutex::new(None)),
69 temp_config_path: Arc::new(Mutex::new(None)),
70 temp_data_dir: Arc::new(Mutex::new(None)),
71 })
72 }
73
74 #[must_use]
76 pub fn client_config(&self) -> LokiConfig {
77 LokiConfig {
78 url: format!("{}/loki/api/v1/push", self.url()),
79 username: None,
80 password: None,
81 batch_size: 1024,
82 labels: HashMap::new(),
83 timeout_secs: 5,
85 otel_config: None,
86 }
87 }
88}
89
90impl ServerManager for LokiServer {
91 async fn start(&self, network: Option<&str>, bind_ip: Option<String>) -> Result<()> {
92 info!("Starting Loki server on port {}", self.config.port);
93
94 let env_vars = HashMap::new();
95 let mut args = Vec::new();
96
97 let mut ports = HashMap::new();
98 ports.insert("3100/tcp".to_string(), self.config.port.to_string());
99
100 let mut volumes = HashMap::new();
101 let temp_data_dir = tempfile::Builder::new().prefix("loki-data-").tempdir()?;
102 fs::set_permissions(temp_data_dir.path(), fs::Permissions::from_mode(0o777))?;
103 volumes.insert(
104 temp_data_dir.path().to_str().unwrap().to_string(),
105 "/loki".to_string(),
106 );
107 self.temp_data_dir.lock().unwrap().replace(temp_data_dir);
108
109 if let Some(config_path) = &self.config.config_path {
110 let config_content = std::fs::read_to_string(config_path)?;
111 let mut temp_file = tempfile::NamedTempFile::new()?;
112 std::io::Write::write_all(&mut temp_file, config_content.as_bytes())?;
113
114 fs::set_permissions(temp_file.path(), fs::Permissions::from_mode(0o644))?;
116
117 let temp_path = temp_file.into_temp_path();
118 let temp_path_str = temp_path.to_str().unwrap().to_string();
119 volumes.insert(
120 temp_path_str.to_string(),
121 "/etc/loki/config.yaml".to_string(),
122 );
123 self.temp_config_path.lock().unwrap().replace(temp_path);
125 args.push("-config.file=/etc/loki/config.yaml".to_string());
126 }
127
128 let health_check_cmd = Some(vec![
129 "CMD-SHELL".to_string(),
130 "wget -q -O /dev/null http://localhost:3100/ready".to_string(),
131 ]);
132
133 let container_id = self
134 .docker
135 .run_container(
136 "grafana/loki:latest",
137 &self.config.container_name,
138 env_vars,
139 ports,
140 volumes,
141 Some(args),
142 None,
143 health_check_cmd,
144 bind_ip,
145 )
146 .await?;
147
148 if let Some(net) = network {
149 info!(
150 "Connecting Loki container {} to network {}",
151 &self.config.container_name, net
152 );
153 self.docker.connect_to_network(&container_id, net).await?;
154 }
155
156 {
157 let mut id = self.container_id.lock().unwrap();
158 *id = Some(container_id.clone());
159 }
160
161 self.wait_until_ready(30).await?;
163
164 info!("Loki server started successfully");
165 Ok(())
166 }
167
168 async fn stop(&self) -> Result<()> {
169 let container_id = {
170 let id = self.container_id.lock().unwrap();
171 match id.as_ref() {
172 Some(id) => id.clone(),
173 None => {
174 info!("Loki server is not running, nothing to stop.");
175 return Ok(());
176 }
177 }
178 };
179
180 info!("Stopping Loki server: {}", &self.config.container_name);
181 self.docker
182 .stop_and_remove_container(&container_id, &self.config.container_name)
183 .await?;
184
185 let mut id = self.container_id.lock().unwrap();
186 *id = None;
187
188 info!("Loki server stopped successfully.");
189 Ok(())
190 }
191
192 fn url(&self) -> String {
193 format!("http://localhost:{}", self.config.port)
194 }
195
196 async fn is_running(&self) -> Result<bool> {
197 let container_id = {
198 let id = self.container_id.lock().unwrap();
199 match id.as_ref() {
200 Some(id) => id.clone(),
201 None => return Ok(false),
202 }
203 };
204
205 self.docker.is_container_running(&container_id).await
206 }
207
208 async fn wait_until_ready(&self, timeout_secs: u64) -> Result<()> {
209 let container_id = {
210 let id = self.container_id.lock().unwrap();
211 id.as_ref()
212 .map(String::clone)
213 .ok_or_else(|| Error::Generic("Loki server is not running".to_string()))?
214 };
215
216 info!("Waiting for Loki container to be healthy...");
217 if let Err(e) = self
218 .docker
219 .wait_for_container_health(&container_id, timeout_secs)
220 .await
221 {
222 warn!(
223 "Loki container health check failed: {}. Proceeding with API check.",
224 e
225 );
226 } else {
227 info!("Loki container health check passed.");
228 }
229
230 info!("Waiting for Loki API to be responsive...");
231 let client = reqwest::Client::new();
232 let urls = [
233 format!("{}/ready", self.url()),
234 format!("{}/metrics", self.url()),
235 ];
236 let start_time = tokio::time::Instant::now();
237 let timeout = Duration::from_secs(timeout_secs);
238
239 loop {
240 if start_time.elapsed() > timeout {
241 return Err(Error::Generic(format!(
242 "Loki API did not become responsive within {} seconds.",
243 timeout_secs
244 )));
245 }
246
247 for url in &urls {
248 match client.get(url).send().await {
249 Ok(response) if response.status().is_success() => {
250 info!("Loki API is responsive at {}.", url);
251 return Ok(());
252 }
253 _ => {}
254 }
255 }
256
257 debug!("Loki API not yet responsive. Retrying...");
258 tokio::time::sleep(Duration::from_secs(1)).await;
259 }
260 }
261}