//! xbp 10.14.1
//!
//! XBP is a zero-config build pack that can also interact with proxies, Kafka,
//! sockets, and synthetic monitors.
//!
//! # Documentation
use crate::logging::{log_error, log_info, log_warn};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use tokio::time::{sleep, Duration};

#[cfg(feature = "kafka")]
use rdkafka::config::ClientConfig;
#[cfg(feature = "kafka")]
use rdkafka::producer::{FutureProducer, FutureRecord};

/// Log-shipping configuration extracted from the project's xbp config.
///
/// The Kafka fields are optional: when brokers or topic are `None`, logs are
/// tailed locally to stdout instead of being shipped to Kafka.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
    /// Paths of the log files to tail.
    pub log_files: Vec<String>,
    /// Kafka bootstrap servers, passed verbatim as `bootstrap.servers`.
    pub kafka_brokers: Option<String>,
    /// Topic that log entries are produced to / consumed from.
    pub kafka_topic: Option<String>,
    /// NOTE(review): not read anywhere in this file — presumably an externally
    /// reachable Kafka endpoint consumed elsewhere; confirm before removing.
    pub kafka_public_url: Option<String>,
}

/// One structured log line; serialized to JSON when shipped to Kafka.
#[derive(Debug, Serialize, Deserialize)]
pub struct LogEntry {
    /// The log line with surrounding whitespace trimmed.
    pub message: String,
    /// Unix timestamp (seconds) taken when the line was read.
    pub timestamp: i64,
    /// Path of the file the line was read from.
    pub file_path: String,
    /// Detected level: ERROR/WARN/INFO/DEBUG ("INFO" when nothing matches).
    pub level: String,
    /// Service name, derived from the log file's stem (see extract_service_name).
    pub service: String,
}

impl LogConfig {
    /// Builds a [`LogConfig`] from the project's xbp config file.
    ///
    /// Returns `Ok(None)` when the xbp config cannot be loaded or when it
    /// declares no `log_files`; loader errors are deliberately swallowed.
    pub async fn from_xbp_config() -> Result<Option<Self>> {
        // A missing/unreadable config is not an error for this caller.
        let Ok(config) = crate::commands::service::load_xbp_config().await else {
            return Ok(None);
        };

        // Without any log files there is nothing to tail or ship.
        let Some(log_files) = config.log_files else {
            return Ok(None);
        };

        Ok(Some(LogConfig {
            log_files,
            kafka_brokers: config.kafka_brokers,
            kafka_topic: config.kafka_topic,
            kafka_public_url: config.kafka_public_url,
        }))
    }
}

pub async fn tail_log_file<P: AsRef<Path>>(path: P, service_name: String) -> Result<()> {
    let file = File::open(&path).context(format!(
        "Failed to open log file: {}",
        path.as_ref().display()
    ))?;

    let mut reader = BufReader::new(file);
    reader.seek(SeekFrom::End(0))?;

    let _ = log_info(
        "logs",
        &format!("Tailing {}", path.as_ref().display()),
        None,
    )
    .await;

    loop {
        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => {
                sleep(Duration::from_millis(100)).await;
            }
            Ok(_) => {
                let entry = LogEntry {
                    message: line.trim().to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                    file_path: path.as_ref().display().to_string(),
                    level: detect_log_level(&line),
                    service: service_name.clone(),
                };

                println!("[{}] {}: {}", entry.service, entry.level, entry.message);
            }
            Err(e) => {
                let _ = log_error("logs", "Error reading log file", Some(&e.to_string())).await;
                break;
            }
        }
    }

    Ok(())
}

/// Heuristically classifies a log line by the first level keyword it contains.
///
/// Checks ERROR, WARN, INFO, then DEBUG (case-insensitive, substring match);
/// lines matching none of them default to "INFO".
fn detect_log_level(line: &str) -> String {
    const LEVELS: [&str; 4] = ["ERROR", "WARN", "INFO", "DEBUG"];

    let haystack = line.to_uppercase();
    LEVELS
        .iter()
        .find(|level| haystack.contains(**level))
        .copied()
        .unwrap_or("INFO")
        .to_string()
}

/// Ships log lines from every configured file to Kafka as JSON [`LogEntry`]s.
///
/// Spawns one tokio task per file in `config.log_files` and then awaits all
/// of them; since each task tails its file forever, this normally does not
/// return. Per-task failures are printed, not propagated.
///
/// # Errors
/// Fails if `kafka_brokers` or `kafka_topic` is unset, or the producer
/// cannot be created.
#[cfg(feature = "kafka")]
pub async fn ship_logs_to_kafka(config: &LogConfig) -> Result<()> {
    let kafka_brokers = config
        .kafka_brokers
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Kafka brokers not configured"))?;

    let kafka_topic = config
        .kafka_topic
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Kafka topic not configured"))?;

    // 5 s delivery timeout per message; sends happen in tail_and_ship_to_kafka.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", kafka_brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .context("Failed to create Kafka producer")?;

    let _ = log_info(
        "kafka",
        &format!("Connected to Kafka: {}", kafka_brokers),
        None,
    )
    .await;
    let _ = log_info(
        "kafka",
        &format!("Shipping logs to topic: {}", kafka_topic),
        None,
    )
    .await;

    let mut tasks = Vec::new();

    for log_file in &config.log_files {
        // Each task owns its file path, topic, and a cheap producer handle clone.
        let log_file = log_file.clone();
        let topic = kafka_topic.clone();
        let producer_clone = producer.clone();

        let task = tokio::spawn(async move {
            if let Err(e) = tail_and_ship_to_kafka(log_file, topic, producer_clone).await {
                eprintln!("Error tailing log file: {}", e);
            }
        });

        tasks.push(task);
    }

    // Join everything; task panics/errors are swallowed deliberately.
    for task in tasks {
        let _ = task.await;
    }

    Ok(())
}

/// Tails `log_file` forever and produces each new line to `topic`.
///
/// Starts at the current end of the file, polls every 100 ms at EOF, and
/// sends each line as a JSON [`LogEntry`] keyed by the derived service name.
/// Send failures are printed and the line is skipped; a read failure ends the
/// loop and returns `Ok(())` (best effort).
///
/// # Errors
/// Fails if the file cannot be opened, the initial seek fails, or an entry
/// cannot be serialized to JSON.
#[cfg(feature = "kafka")]
async fn tail_and_ship_to_kafka(
    log_file: String,
    topic: String,
    producer: FutureProducer,
) -> Result<()> {
    let file = File::open(&log_file).context(format!("Failed to open log file: {}", log_file))?;

    let mut reader = BufReader::new(file);
    // Only ship lines appended after startup.
    reader.seek(SeekFrom::End(0))?;

    loop {
        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => {
                // EOF: wait for the writer to append more data.
                sleep(Duration::from_millis(100)).await;
            }
            Ok(_) => {
                let entry = LogEntry {
                    message: line.trim().to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                    file_path: log_file.clone(),
                    level: detect_log_level(&line),
                    service: extract_service_name(&log_file),
                };

                let payload = serde_json::to_string(&entry)?;

                // Keyed by service so one service's lines hash to one partition.
                let record = FutureRecord::to(&topic)
                    .payload(&payload)
                    .key(&entry.service);

                // Zero queue timeout: fail fast if the local send queue is full.
                match producer.send(record, Duration::from_secs(0)).await {
                    Ok(_) => {}
                    Err((e, _)) => {
                        eprintln!("Failed to send log to Kafka: {:?}", e);
                    }
                }
            }
            Err(e) => {
                eprintln!("Error reading log file: {}", e);
                break;
            }
        }
    }

    Ok(())
}

/// Derives a service name from a log file path: the file stem (final path
/// component without its extension), or "unknown" when the path has none.
fn extract_service_name(log_file: &str) -> String {
    let stem = Path::new(log_file).file_stem().and_then(|s| s.to_str());
    match stem {
        Some(name) => name.to_string(),
        None => "unknown".to_string(),
    }
}

/// Consumes the configured Kafka topic and prints each message to stdout.
///
/// Joins consumer group "xbp-log-consumer" starting at the latest offset with
/// auto-commit on, so only messages produced after startup are shown. JSON
/// payloads that parse as [`LogEntry`] are pretty-printed; anything else is
/// printed raw. Runs until the message stream ends.
///
/// # Errors
/// Fails if brokers/topic are missing from `config`, or the consumer cannot
/// be created or subscribed.
#[cfg(feature = "kafka")]
pub async fn tail_kafka_topic(config: &LogConfig) -> Result<()> {
    use futures::StreamExt;
    use rdkafka::consumer::{Consumer, StreamConsumer};
    use rdkafka::Message;

    let kafka_brokers = config
        .kafka_brokers
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Kafka brokers not configured"))?;

    let kafka_topic = config
        .kafka_topic
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Kafka topic not configured"))?;

    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "xbp-log-consumer")
        .set("bootstrap.servers", kafka_brokers)
        .set("enable.auto.commit", "true")
        .set("auto.offset.reset", "latest")
        .create()
        .context("Failed to create Kafka consumer")?;

    consumer
        .subscribe(&[kafka_topic])
        .context("Failed to subscribe to Kafka topic")?;

    let _ = log_info(
        "kafka",
        &format!("Tailing Kafka topic: {}", kafka_topic),
        None,
    )
    .await;

    let mut stream = consumer.stream();

    while let Some(message) = stream.next().await {
        match message {
            Ok(msg) => {
                // Non-UTF-8 payloads are silently skipped.
                if let Some(payload) = msg.payload() {
                    if let Ok(payload_str) = std::str::from_utf8(payload) {
                        if let Ok(entry) = serde_json::from_str::<LogEntry>(payload_str) {
                            println!("[{}] {}: {}", entry.service, entry.level, entry.message);
                        } else {
                            // Not a LogEntry: show it verbatim.
                            println!("{}", payload_str);
                        }
                    }
                }
            }
            Err(e) => {
                // Consume errors are logged but do not stop the tail.
                let _ = log_error("kafka", "Error consuming message", Some(&e.to_string())).await;
            }
        }
    }

    Ok(())
}

/// Stub compiled when the `kafka` feature is off: logs the problem and fails.
#[cfg(not(feature = "kafka"))]
pub async fn ship_logs_to_kafka(_config: &LogConfig) -> Result<()> {
    let hint = "Rebuild with --features kafka";
    let _ = log_error("kafka", "Kafka feature not enabled", Some(hint)).await;
    anyhow::bail!("Kafka feature not enabled. Rebuild with --features kafka")
}

/// Stub compiled when the `kafka` feature is off: logs the problem and fails.
#[cfg(not(feature = "kafka"))]
pub async fn tail_kafka_topic(_config: &LogConfig) -> Result<()> {
    let hint = "Rebuild with --features kafka";
    let _ = log_error("kafka", "Kafka feature not enabled", Some(hint)).await;
    anyhow::bail!("Kafka feature not enabled. Rebuild with --features kafka")
}
/// Entry point for log shipping: ships to Kafka when both brokers and topic
/// are configured, otherwise tails the configured files locally to stdout.
///
/// # Errors
/// Fails when xbp.json has no log configuration or no log files, or when
/// Kafka shipping fails.
pub async fn start_log_shipping() -> Result<()> {
    let Some(config) = LogConfig::from_xbp_config().await? else {
        let _ = log_error("logs", "No log configuration found in xbp.json", None).await;
        return Err(anyhow::anyhow!("Log configuration required"));
    };

    if config.kafka_brokers.is_some() && config.kafka_topic.is_some() {
        return ship_logs_to_kafka(&config).await;
    }

    let _ = log_warn("logs", "Kafka not configured, starting local tail", None).await;

    if config.log_files.is_empty() {
        let _ = log_error("logs", "No log files configured", None).await;
        return Err(anyhow::anyhow!("No log files configured in xbp.json"));
    }

    let mut tasks = Vec::new();
    for log_file in config.log_files {
        let service_name = extract_service_name(&log_file);
        // BUG FIX: `tokio::task::spawn_local` panics unless it runs inside a
        // `LocalSet`, which nothing here sets up. The spawned future owns all
        // its data, so use `tokio::spawn` (as the Kafka path already does).
        let task = tokio::spawn(async move {
            if let Err(e) = tail_log_file(log_file, service_name).await {
                eprintln!("Error tailing log: {}", e);
            }
        });
        tasks.push(task);
    }

    // Tailing tasks run until a read error; join them all.
    for task in tasks {
        let _ = task.await;
    }

    Ok(())
}