use crate::core::error::{Error, Result};
use crate::monitoring::logs::{LogEntry, LogLevel};
use blueprint_core::{debug, error, info};
use blueprint_std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// HTTP client for pushing log entries to, and querying them from, a Loki server.
pub struct LokiClient {
/// Prefix to which the `/loki/api/v1/...` endpoint paths are appended.
base_url: String,
/// Reusable HTTP client; `new` builds it with a 30-second timeout.
client: reqwest::Client,
/// Labels copied into the label set of every pushed entry
/// (`new` seeds this with `job` and `environment`).
labels: HashMap<String, String>,
}
impl LokiClient {
/// Creates a client for the Loki server reachable at `base_url`.
///
/// Trailing `/` characters are stripped from `base_url`, because the endpoint
/// paths appended later all start with `/` and would otherwise produce a
/// `//` in the request URL. The HTTP client uses a 30-second timeout, and
/// every pushed entry is labelled `job="blueprint"` and
/// `environment="production"`.
///
/// # Errors
///
/// Returns [`Error::Other`] if the underlying HTTP client cannot be built.
pub fn new(base_url: String) -> Result<Self> {
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .build()
        .map_err(|e| Error::Other(format!("Failed to create HTTP client: {e}")))?;

    // Normalise in place: "http://host:3100/" and "http://host:3100" must
    // both yield "http://host:3100/loki/api/v1/...".
    let mut base_url = base_url;
    let trimmed_len = base_url.trim_end_matches('/').len();
    base_url.truncate(trimmed_len);

    let mut labels = HashMap::new();
    labels.insert("job".to_string(), "blueprint".to_string());
    labels.insert("environment".to_string(), "production".to_string());

    Ok(Self {
        base_url,
        client,
        labels,
    })
}
pub async fn push_logs(&self, entries: Vec<LogEntry>) -> Result<()> {
if entries.is_empty() {
return Ok(());
}
let num_entries = entries.len();
let streams = self.entries_to_streams(entries);
let push_request = PushRequest { streams };
let url = format!("{}/loki/api/v1/push", self.base_url);
let response = self
.client
.post(&url)
.json(&push_request)
.send()
.await
.map_err(|e| Error::Other(format!("Failed to push logs to Loki: {e}")))?;
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(Error::Other(format!("Loki push failed: {error_text}")));
}
debug!("Successfully pushed {} log entries to Loki", num_entries);
Ok(())
}
pub async fn query_logs(
&self,
query: &str,
start: Option<i64>,
end: Option<i64>,
limit: Option<u32>,
) -> Result<Vec<LogEntry>> {
let url = format!("{}/loki/api/v1/query_range", self.base_url);
let mut params = vec![
("query".to_string(), query.to_string()),
("limit".to_string(), limit.unwrap_or(1000).to_string()),
];
if let Some(start) = start {
params.push(("start".to_string(), start.to_string()));
}
if let Some(end) = end {
params.push(("end".to_string(), end.to_string()));
}
let response = self
.client
.get(&url)
.query(¶ms)
.send()
.await
.map_err(|e| Error::Other(format!("Failed to query Loki: {e}")))?;
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(Error::Other(format!("Loki query failed: {error_text}")));
}
let query_response: QueryResponse = response
.json()
.await
.map_err(|e| Error::Other(format!("Failed to parse Loki response: {e}")))?;
Ok(self.parse_query_response(query_response))
}
/// Groups log entries into streams keyed by their complete label set.
///
/// The label set of an entry is built, in this order, from:
///
/// 1. the client-wide labels,
/// 2. `service_id`,
/// 3. `level` (the lower-cased `Debug` rendering of the entry's level),
/// 4. `container_id`, when the entry has one,
/// 5. at most five metadata pairs.
///
/// Later insertions win, so a metadata key equal to an earlier label name
/// replaces that label.
///
/// The five metadata pairs are the ones with the smallest keys. Map iteration
/// order is unspecified, so taking "the first five" straight from the map
/// could pick different keys for entries that carry identical metadata and
/// scatter them across different streams.
///
/// Each entry contributes one `[timestamp, message]` pair, where the timestamp
/// is nanoseconds since the Unix epoch as a decimal string (`"0"` for times
/// before the epoch). The order of the returned streams is unspecified.
fn entries_to_streams(&self, entries: Vec<LogEntry>) -> Vec<Stream> {
    let mut streams_map: HashMap<String, Vec<[String; 2]>> = HashMap::new();

    for entry in entries {
        let mut labels = self.labels.clone();
        labels.insert("service_id".to_string(), entry.service_id);
        labels.insert(
            "level".to_string(),
            format!("{:?}", entry.level).to_lowercase(),
        );
        if let Some(container_id) = &entry.container_id {
            labels.insert("container_id".to_string(), container_id.clone());
        }

        // Sort by key so the five-pair cap always keeps the same keys.
        let mut metadata: Vec<_> = entry.metadata.iter().collect();
        metadata.sort_unstable_by(|a, b| a.0.cmp(b.0));
        for (key, value) in metadata.into_iter().take(5) {
            labels.insert(key.clone(), value.clone());
        }

        let labels_str = format_labels(&labels);
        let timestamp = entry
            .timestamp
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos()
            .to_string();

        streams_map
            .entry(labels_str)
            .or_default()
            .push([timestamp, entry.message]);
    }

    streams_map
        .into_iter()
        .map(|(stream, values)| Stream { stream, values })
        .collect()
}
/// Converts a deserialised query response into log entries.
///
/// Every stream in the response is processed, not only the first one: a
/// query that matches several label sets yields one result item per label
/// set, and each of them carries log lines. Entries are returned grouped by
/// stream, in the order the response lists them.
///
/// For each `[timestamp, message, ...]` value of a stream:
///
/// * values with fewer than two elements are skipped;
/// * the timestamp is parsed as unsigned nanoseconds since the Unix epoch,
///   falling back to `0` (the epoch) when it does not parse;
/// * `service_id` comes from the stream label of that name, or `"unknown"`;
/// * `container_id` comes from the stream label of that name, if present;
/// * `level` is converted from the `level` label, or `LogLevel::Info` when
///   the label is absent;
/// * all remaining stream labels become the entry's metadata.
fn parse_query_response(&self, response: QueryResponse) -> Vec<LogEntry> {
    let mut entries = Vec::new();

    for result in &response.data.result {
        // The non-reserved labels are identical for every line of a stream,
        // so collect them once and clone per entry.
        let mut stream_metadata = HashMap::new();
        for (key, value) in &result.stream {
            if !matches!(key.as_str(), "service_id" | "level" | "container_id") {
                stream_metadata.insert(key.clone(), value.clone());
            }
        }

        for value in &result.values {
            let (raw_timestamp, message) = match (value.first(), value.get(1)) {
                (Some(raw_timestamp), Some(message)) => (raw_timestamp, message),
                _ => continue,
            };

            // Parse straight to u64: a signed parse followed by an `as u64`
            // cast would turn a negative value into a far-future timestamp.
            let timestamp_ns: u64 = raw_timestamp.parse().unwrap_or(0);
            let timestamp =
                std::time::UNIX_EPOCH + std::time::Duration::from_nanos(timestamp_ns);

            entries.push(LogEntry {
                timestamp,
                service_id: result
                    .stream
                    .get("service_id")
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string()),
                container_id: result.stream.get("container_id").cloned(),
                level: result
                    .stream
                    .get("level")
                    .map(|s| LogLevel::from(s.as_str()))
                    .unwrap_or(LogLevel::Info),
                message: message.clone(),
                metadata: stream_metadata.clone(),
            });
        }
    }

    entries
}
pub async fn setup_local_loki() -> Result<()> {
info!("Setting up local Loki instance");
let output = tokio::process::Command::new("docker")
.args(["ps", "--filter", "name=loki", "--format", "{{.Names}}"])
.output()
.await
.map_err(|e| Error::Other(format!("Failed to check Docker: {e}")))?;
if String::from_utf8_lossy(&output.stdout).contains("loki") {
info!("Loki is already running");
return Ok(());
}
let output = tokio::process::Command::new("docker")
.args([
"run",
"-d",
"--name",
"loki",
"-p",
"3100:3100",
"-v",
"/tmp/loki:/loki",
"grafana/loki:latest",
"-config.file=/etc/loki/local-config.yaml",
])
.output()
.await
.map_err(|e| Error::Other(format!("Failed to start Loki: {e}")))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if !stderr.contains("already in use") {
return Err(Error::Other(format!("Failed to start Loki: {stderr}")));
}
}
info!("Loki started successfully on port 3100");
let _ = tokio::process::Command::new("docker")
.args([
"run",
"-d",
"--name",
"grafana",
"-p",
"3000:3000",
"--link",
"loki:loki",
"grafana/grafana:latest",
])
.output()
.await;
info!("Grafana started on port 3000 (admin/admin)");
Ok(())
}
}
/// Renders a label map as a `{key="value",...}` string.
///
/// Pairs are sorted, so equal maps always produce the same string regardless
/// of map iteration order. Inside each value, backslashes, double quotes and
/// newlines are escaped (`\\`, `\"`, `\n`) so the quoted value stays a single
/// well-formed string literal. An empty map yields `{}`.
fn format_labels(labels: &HashMap<String, String>) -> String {
    let mut parts: Vec<String> = labels
        .iter()
        .map(|(k, v)| {
            // Backslashes first, otherwise the escapes added for quotes and
            // newlines would themselves be escaped again.
            let escaped = v
                .replace('\\', "\\\\")
                .replace('"', "\\\"")
                .replace('\n', "\\n");
            format!("{k}=\"{escaped}\"")
        })
        .collect();
    parts.sort_unstable();
    format!("{{{}}}", parts.join(","))
}
/// JSON body that `push_logs` sends to the push endpoint.
#[derive(Debug, Serialize)]
struct PushRequest {
/// One element per distinct label set in the batch.
streams: Vec<Stream>,
}
/// One element of the push body's `streams` array: a label set and its log lines.
///
/// NOTE(review): Loki's HTTP API reference shows `stream` as a JSON object of
/// label name/value pairs for `/loki/api/v1/push`. Here it serialises as one
/// string such as `{job="blueprint"}` - confirm the server accepts this form.
#[derive(Debug, Serialize)]
struct Stream {
/// Label set rendered by `format_labels`, e.g. `{job="blueprint",level="info"}`.
stream: String,
/// `[timestamp, message]` pairs; the timestamp is nanoseconds since the
/// Unix epoch written as a decimal string.
values: Vec<[String; 2]>,
}
/// Top-level shape that `query_logs` deserialises the query response into.
/// Only the `data` field is declared here.
#[derive(Debug, Deserialize)]
struct QueryResponse {
/// Payload holding the per-stream results.
data: QueryData,
}
/// The `data` object of a query response. Only the `result` field is declared here.
#[derive(Debug, Deserialize)]
struct QueryData {
/// Result items, each pairing a label set with its log values.
result: Vec<QueryResult>,
}
/// One result item of a query response: a label set and the values recorded under it.
#[derive(Debug, Deserialize)]
struct QueryResult {
/// Label name to label value. `parse_query_response` reads `service_id`,
/// `level` and `container_id` from here and treats the rest as metadata.
stream: HashMap<String, String>,
/// Log values; `parse_query_response` reads element 0 as a nanosecond
/// timestamp string and element 1 as the message, and skips shorter values.
values: Vec<Vec<String>>,
}
/// Buffers log entries in memory and pushes them to Loki in batches.
pub struct LogAggregationPipeline {
/// Client used to push each flushed batch.
loki_client: LokiClient,
/// Entries accepted by `add_entry` and not yet flushed.
buffer: Vec<LogEntry>,
/// Number of buffered entries at which `add_entry` triggers a flush.
buffer_size: usize,
/// Period of the background flush loop; `new` sets it to 10 seconds.
flush_interval: std::time::Duration,
}
impl LogAggregationPipeline {
/// Builds a pipeline that pushes to the Loki server at `loki_url`.
///
/// The in-memory buffer is pre-allocated for `buffer_size` entries, which is
/// also the fill level at which `add_entry` flushes. The background flush
/// period is fixed at 10 seconds.
///
/// # Errors
///
/// Propagates the error from `LokiClient::new` if the client cannot be created.
pub fn new(loki_url: String, buffer_size: usize) -> Result<Self> {
    let loki_client = LokiClient::new(loki_url)?;
    let buffer = Vec::with_capacity(buffer_size);
    let flush_interval = std::time::Duration::from_secs(10);

    let pipeline = Self {
        loki_client,
        buffer,
        buffer_size,
        flush_interval,
    };
    Ok(pipeline)
}
/// Appends `entry` to the buffer and flushes once the buffer holds at least
/// `buffer_size` entries.
///
/// # Errors
///
/// Returns the error from `flush` when a flush is triggered and fails;
/// adding without triggering a flush cannot fail.
pub async fn add_entry(&mut self, entry: LogEntry) -> Result<()> {
    self.buffer.push(entry);

    let below_threshold = self.buffer.len() < self.buffer_size;
    if below_threshold {
        return Ok(());
    }
    self.flush().await
}
/// Pushes every buffered entry to Loki and leaves the buffer empty.
///
/// Does nothing when the buffer is already empty. The buffer is emptied
/// before the push is attempted, so if the push fails the entries of that
/// batch are not put back.
///
/// # Errors
///
/// Returns the error from `LokiClient::push_logs` if the push fails.
pub async fn flush(&mut self) -> Result<()> {
    match self.buffer.len() {
        0 => Ok(()),
        _ => {
            let pending: Vec<LogEntry> = std::mem::take(&mut self.buffer);
            self.loki_client.push_logs(pending).await
        }
    }
}
/// Spawns a Tokio task that calls `flush` once per `flush_interval`, forever.
///
/// A failed flush is logged at error level and the loop continues. The
/// returned handle refers to a task that never completes on its own.
///
/// NOTE(review): the pipeline is taken by value and moved into the task, so
/// the caller cannot call `add_entry` on it afterwards - confirm this is the
/// intended usage.
pub fn start_background_flush(mut self) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(self.flush_interval);
        loop {
            ticker.tick().await;
            match self.flush().await {
                Ok(()) => {}
                Err(e) => {
                    error!("Failed to flush logs to Loki: {}", e);
                }
            }
        }
    })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The formatted label string is brace-delimited and contains each pair
    /// rendered as `key="value"`.
    #[test]
    fn test_format_labels() {
        let mut labels = HashMap::new();
        for (key, value) in [("job", "test"), ("env", "prod")] {
            labels.insert(key.to_string(), value.to_string());
        }

        let formatted = format_labels(&labels);

        for expected_pair in ["env=\"prod\"", "job=\"test\""] {
            assert!(formatted.contains(expected_pair));
        }
        assert!(formatted.starts_with('{'));
        assert!(formatted.ends_with('}'));
    }

    /// Constructing a client from a plain URL succeeds.
    #[tokio::test]
    async fn test_loki_client_creation() {
        let url = "http://localhost:3100".to_string();
        assert!(LokiClient::new(url).is_ok());
    }
}