use crate::logging::{log_error, log_info, log_warn};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use tokio::time::{sleep, Duration};
#[cfg(feature = "kafka")]
use rdkafka::config::ClientConfig;
#[cfg(feature = "kafka")]
use rdkafka::producer::{FutureProducer, FutureRecord};
/// Log-shipping settings used by the tailing and Kafka functions in this module.
///
/// Built by `LogConfig::from_xbp_config`, which copies each field from the
/// loaded xbp configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
/// Paths of the log files to tail.
pub log_files: Vec<String>,
/// Broker list handed to the Kafka client as `bootstrap.servers`.
/// `start_log_shipping` falls back to local tailing when this is `None`.
pub kafka_brokers: Option<String>,
/// Kafka topic that log entries are produced to and consumed from.
/// `start_log_shipping` falls back to local tailing when this is `None`.
pub kafka_topic: Option<String>,
/// NOTE(review): not read by any code visible in this file — presumably a
/// public-facing Kafka endpoint; confirm against the callers that use it.
pub kafka_public_url: Option<String>,
}
/// One log line plus the metadata attached to it when it was read.
///
/// Serialized to JSON when shipped to Kafka and deserialized again by the
/// Kafka topic tailer.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogEntry {
/// The log line with surrounding whitespace trimmed.
pub message: String,
/// `chrono::Utc::now().timestamp()` taken when the line was read, i.e. the
/// read time rather than any timestamp embedded in the line itself.
pub timestamp: i64,
/// Path of the file the line was read from.
pub file_path: String,
/// Level guessed from the line text by `detect_log_level`.
pub level: String,
/// Name of the service the line is attributed to.
pub service: String,
}
impl LogConfig {
    /// Builds a [`LogConfig`] from the project's xbp configuration.
    ///
    /// Returns `Ok(None)` when the configuration cannot be loaded (the load
    /// error is intentionally discarded) or when it has no `log_files` entry.
    /// As written, this function never returns `Err`.
    pub async fn from_xbp_config() -> Result<Option<Self>> {
        // A missing or unreadable config is treated as "no log configuration".
        let xbp = match crate::commands::service::load_xbp_config().await {
            Err(_) => return Ok(None),
            Ok(loaded) => loaded,
        };

        // Without a list of log files there is nothing to configure.
        let log_files = match xbp.log_files {
            None => return Ok(None),
            Some(files) => files,
        };

        Ok(Some(Self {
            log_files,
            kafka_brokers: xbp.kafka_brokers,
            kafka_topic: xbp.kafka_topic,
            kafka_public_url: xbp.kafka_public_url,
        }))
    }
}
pub async fn tail_log_file<P: AsRef<Path>>(path: P, service_name: String) -> Result<()> {
let file = File::open(&path).context(format!(
"Failed to open log file: {}",
path.as_ref().display()
))?;
let mut reader = BufReader::new(file);
reader.seek(SeekFrom::End(0))?;
let _ = log_info(
"logs",
&format!("Tailing {}", path.as_ref().display()),
None,
)
.await;
loop {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => {
sleep(Duration::from_millis(100)).await;
}
Ok(_) => {
let entry = LogEntry {
message: line.trim().to_string(),
timestamp: chrono::Utc::now().timestamp(),
file_path: path.as_ref().display().to_string(),
level: detect_log_level(&line),
service: service_name.clone(),
};
println!("[{}] {}: {}", entry.service, entry.level, entry.message);
}
Err(e) => {
let _ = log_error("logs", "Error reading log file", Some(&e.to_string())).await;
break;
}
}
}
Ok(())
}
/// Guesses the severity of a log line from the keywords it contains.
///
/// The line is upper-cased and searched for `ERROR`, `WARN`, `INFO` and
/// `DEBUG`, in that priority order; the first keyword found is returned.
/// Lines containing none of them are reported as `INFO`.
fn detect_log_level(line: &str) -> String {
    // Ordered from highest to lowest priority.
    const KEYWORDS: [&str; 4] = ["ERROR", "WARN", "INFO", "DEBUG"];

    let haystack = line.to_uppercase();
    let level = KEYWORDS
        .iter()
        .copied()
        .find(|keyword| haystack.contains(*keyword))
        .unwrap_or("INFO");
    String::from(level)
}
/// Tails every configured log file and ships new lines to Kafka.
///
/// Creates one producer, spawns one tailing task per entry in
/// `config.log_files`, and then waits for all of those tasks to end.
///
/// # Errors
/// Fails when the brokers or the topic are not configured, or when the Kafka
/// producer cannot be created. Failures inside the tailing tasks are printed
/// to stderr and do not affect the returned value.
#[cfg(feature = "kafka")]
pub async fn ship_logs_to_kafka(config: &LogConfig) -> Result<()> {
    let brokers = match config.kafka_brokers.as_ref() {
        Some(brokers) => brokers,
        None => return Err(anyhow::anyhow!("Kafka brokers not configured")),
    };
    let topic_name = match config.kafka_topic.as_ref() {
        Some(topic) => topic,
        None => return Err(anyhow::anyhow!("Kafka topic not configured")),
    };

    let mut client_config = ClientConfig::new();
    client_config
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000");
    let producer: FutureProducer = client_config
        .create()
        .context("Failed to create Kafka producer")?;

    let connected_msg = format!("Connected to Kafka: {}", brokers);
    let _ = log_info("kafka", &connected_msg, None).await;
    let shipping_msg = format!("Shipping logs to topic: {}", topic_name);
    let _ = log_info("kafka", &shipping_msg, None).await;

    // One independent tailing task per log file; each gets its own handle to
    // the shared producer.
    let handles: Vec<_> = config
        .log_files
        .iter()
        .cloned()
        .map(|path| {
            let topic = topic_name.clone();
            let producer = producer.clone();
            tokio::spawn(async move {
                if let Err(err) = tail_and_ship_to_kafka(path, topic, producer).await {
                    eprintln!("Error tailing log file: {}", err);
                }
            })
        })
        .collect();

    // Join results (including task panics) are intentionally ignored.
    for handle in handles {
        let _ = handle.await;
    }
    Ok(())
}
/// Follows one log file and publishes every newly appended line to Kafka.
///
/// The read position starts at the end of the file, so only content written
/// after this call is shipped. Each complete line becomes a JSON-encoded
/// [`LogEntry`] sent to `topic`, keyed by the service name derived from the
/// file name.
///
/// # Parameters
/// * `log_file` - path of the file to follow.
/// * `topic` - Kafka topic to publish to.
/// * `producer` - producer used for sending.
///
/// # Errors
/// Returns an error if the file cannot be opened, the initial seek fails, or
/// an entry cannot be serialized. Send failures are printed to stderr and the
/// loop continues; a read error is printed and ends the loop with `Ok(())`.
///
/// # Notes
/// A line is only shipped once its terminating `\n` has been read. When the
/// writer has flushed only part of a line, the fragment stays in the buffer
/// and is completed by a later read, so one log line is never published as
/// several messages.
#[cfg(feature = "kafka")]
async fn tail_and_ship_to_kafka(
    log_file: String,
    topic: String,
    producer: FutureProducer,
) -> Result<()> {
    let file = File::open(&log_file)
        .with_context(|| format!("Failed to open log file: {}", log_file))?;
    let mut reader = BufReader::new(file);
    // Skip existing content: only lines appended from now on are shipped.
    reader.seek(SeekFrom::End(0))?;

    // The service name depends only on the path, so derive it once.
    let service = extract_service_name(&log_file);

    // `read_line` appends, so a partially written line accumulates here
    // across reads until its newline arrives.
    let mut line = String::new();
    loop {
        match reader.read_line(&mut line) {
            Ok(0) => {
                // At end of file: wait for the writer to append more data.
                sleep(Duration::from_millis(100)).await;
            }
            // Hit EOF in the middle of a line: keep the fragment and retry.
            // The next read returns 0 if nothing new was written, which
            // triggers the sleep above.
            Ok(_) if !line.ends_with('\n') => continue,
            Ok(_) => {
                let entry = LogEntry {
                    message: line.trim().to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                    file_path: log_file.clone(),
                    level: detect_log_level(&line),
                    service: service.clone(),
                };
                line.clear();

                let payload = serde_json::to_string(&entry)?;
                let record = FutureRecord::to(&topic)
                    .payload(&payload)
                    .key(&entry.service);
                // A zero queue timeout means the send is not retried when the
                // producer queue is full; the failure is only reported.
                if let Err((e, _)) = producer.send(record, Duration::from_secs(0)).await {
                    eprintln!("Failed to send log to Kafka: {:?}", e);
                }
            }
            Err(e) => {
                eprintln!("Error reading log file: {}", e);
                break;
            }
        }
    }
    Ok(())
}
/// Derives a service name from a log file path.
///
/// The name is the file stem (file name without its last extension), e.g.
/// `/var/log/api.log` yields `api`. Paths without a usable file name yield
/// `"unknown"`.
fn extract_service_name(log_file: &str) -> String {
    let stem = Path::new(log_file)
        .file_stem()
        .and_then(|os_stem| os_stem.to_str());

    match stem {
        Some(name) => name.to_owned(),
        None => String::from("unknown"),
    }
}
/// Subscribes to the configured Kafka topic and prints incoming log messages.
///
/// Payloads that parse as a JSON [`LogEntry`] are printed as
/// `[service] LEVEL: message`; other UTF-8 payloads are printed verbatim.
/// Messages with no payload or a non-UTF-8 payload are skipped silently.
/// The consumer uses the group id `xbp-log-consumer` and starts from the
/// latest offset when no committed offset exists.
///
/// # Errors
/// Fails when the brokers or the topic are not configured, when the consumer
/// cannot be created, or when subscribing fails. Errors on individual
/// messages are logged and consumption continues.
#[cfg(feature = "kafka")]
pub async fn tail_kafka_topic(config: &LogConfig) -> Result<()> {
    use futures::StreamExt;
    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::Message;

    let brokers = match config.kafka_brokers.as_ref() {
        Some(brokers) => brokers,
        None => return Err(anyhow::anyhow!("Kafka brokers not configured")),
    };
    let topic = match config.kafka_topic.as_ref() {
        Some(topic) => topic,
        None => return Err(anyhow::anyhow!("Kafka topic not configured")),
    };

    let mut client_config = ClientConfig::new();
    client_config
        .set("group.id", "xbp-log-consumer")
        .set("bootstrap.servers", brokers)
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "latest");
    let consumer: StreamConsumer = client_config
        .create()
        .context("Failed to create Kafka consumer")?;

    consumer
        .subscribe(&[topic])
        .context("Failed to subscribe to Kafka topic")?;

    let tailing_msg = format!("Tailing Kafka topic: {}", topic);
    let _ = log_info("kafka", &tailing_msg, None).await;

    let mut incoming = consumer.stream();
    while let Some(received) = incoming.next().await {
        let msg = match received {
            Ok(msg) => msg,
            Err(err) => {
                let _ =
                    log_error("kafka", "Error consuming message", Some(&err.to_string())).await;
                continue;
            }
        };

        // Skip messages without a payload or whose payload is not UTF-8.
        let text = match msg.payload().map(|bytes| std::str::from_utf8(bytes)) {
            Some(Ok(text)) => text,
            _ => continue,
        };

        match serde_json::from_str::<LogEntry>(text) {
            Ok(entry) => println!("[{}] {}: {}", entry.service, entry.level, entry.message),
            // Not a serialized LogEntry: show the raw text instead.
            Err(_) => println!("{}", text),
        }
    }
    Ok(())
}
/// Stand-in for Kafka log shipping in builds without the `kafka` feature.
///
/// Logs that the feature is disabled and always returns an error telling the
/// user to rebuild with `--features kafka`.
#[cfg(not(feature = "kafka"))]
pub async fn ship_logs_to_kafka(_config: &LogConfig) -> Result<()> {
    // The logging result is deliberately ignored; the error below is what
    // callers act on.
    let _ = log_error(
        "kafka",
        "Kafka feature not enabled",
        Some("Rebuild with --features kafka"),
    )
    .await;

    anyhow::bail!("Kafka feature not enabled. Rebuild with --features kafka")
}
/// Stand-in for Kafka topic tailing in builds without the `kafka` feature.
///
/// Logs that the feature is disabled and always returns an error telling the
/// user to rebuild with `--features kafka`.
#[cfg(not(feature = "kafka"))]
pub async fn tail_kafka_topic(_config: &LogConfig) -> Result<()> {
    // The logging result is deliberately ignored; the returned error carries
    // the same information.
    let _ = log_error(
        "kafka",
        "Kafka feature not enabled",
        Some("Rebuild with --features kafka"),
    )
    .await;

    let disabled = anyhow::anyhow!("Kafka feature not enabled. Rebuild with --features kafka");
    Err(disabled)
}
/// Entry point for log shipping.
///
/// Loads the log configuration and then either ships logs to Kafka (when
/// both brokers and topic are set) or tails every configured file locally,
/// printing new lines to stdout.
///
/// # Errors
/// Fails when no log configuration is available, when local tailing is
/// selected but no log files are configured, or when Kafka shipping fails.
/// Failures of individual local tailing tasks are printed to stderr only.
pub async fn start_log_shipping() -> Result<()> {
    let config = match LogConfig::from_xbp_config().await? {
        Some(config) => config,
        None => {
            let _ = log_error("logs", "No log configuration found in xbp.json", None).await;
            return Err(anyhow::anyhow!("Log configuration required"));
        }
    };

    // Kafka shipping needs both the brokers and the topic.
    let kafka_configured = config.kafka_brokers.is_some() && config.kafka_topic.is_some();
    if kafka_configured {
        return ship_logs_to_kafka(&config).await;
    }

    let _ = log_warn("logs", "Kafka not configured, starting local tail", None).await;
    if config.log_files.is_empty() {
        let _ = log_error("logs", "No log files configured", None).await;
        return Err(anyhow::anyhow!("No log files configured in xbp.json"));
    }

    // NOTE(review): per the tokio docs `spawn_local` must run inside a
    // `LocalSet`; whether the caller provides one is not visible here — confirm.
    let handles: Vec<_> = config
        .log_files
        .into_iter()
        .map(|path| {
            let service = extract_service_name(&path);
            tokio::task::spawn_local(async move {
                if let Err(err) = tail_log_file(path, service).await {
                    eprintln!("Error tailing log: {}", err);
                }
            })
        })
        .collect();

    // Join results (including task panics) are intentionally ignored.
    for handle in handles {
        let _ = handle.await;
    }
    Ok(())
}