use crate::config::{setup_logging, LogConfig, LogOutput};
use futures::Stream;
use futures::StreamExt;
use std::pin::Pin;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tonic_web::GrpcWebLayer;
use tracing::{info, trace, warn};
/// Message and service types for the `logging` protobuf package
/// (`LogMessage`, `SubscribeRequest`, and the `log_service_server` items
/// are imported from here further down in this file).
pub mod logging {
// Splices in the Rust code generated at build time for the `logging` package.
tonic::include_proto!("logging");
}
use logging::log_service_server::LogService;
use logging::log_service_server::LogServiceServer;
use logging::{LogMessage, SubscribeRequest};
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Server;
use tonic_reflection::server::Builder;
use tower_http::cors::{Any, CorsLayer};
#[derive(Debug, Clone)]
pub struct LoggingService {
clients: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<LogMessage>>>>,
server_handle: Arc<Mutex<Option<tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>>>>,
log_all_messages: Arc<Mutex<bool>>,
}
impl Default for LoggingService {
fn default() -> Self {
Self::new()
}
}
impl LoggingService {
pub fn new() -> Self {
Self {
clients: Arc::new(Mutex::new(HashMap::new())),
server_handle: Arc::new(Mutex::new(None)),
log_all_messages: Arc::new(Mutex::new(false)), }
}
pub async fn init(&self, config: &LogConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
{
let mut log_all = self.log_all_messages.lock().await;
*log_all = config.log_all_messages;
}
let service_clone = self.clone();
let _guard = setup_logging(config, Some(service_clone))?;
info!("Logger initialized with output: {:?}", config.output);
match &config.output {
LogOutput::File => {
info!(
"File logging enabled - path: {}, filename: {}",
config.file_path.as_deref().unwrap_or("default"),
config.file_name.as_deref().unwrap_or("app.log")
);
}
LogOutput::Grpc => {
if let Some(grpc_config) = &config.grpc {
info!(
"GRPC logging enabled - server running on {}:{}",
grpc_config.address, grpc_config.port
);
}
}
LogOutput::Console => {
info!("Console logging enabled");
}
}
info!("Log level set to: {}", config.level);
if config.debug_mode.enabled {
let interval_secs = config.debug_mode.test_interval_secs.max(1);
tokio::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;
trace!("Test log message from server");
}
});
info!(
"Debug mode enabled: sending test messages every {} seconds",
interval_secs
);
}
self.start_server(config).await
}
async fn start_server(&self, config: &LogConfig) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let addr = match &config.grpc {
Some(grpc_config) => format!("{}:{}", grpc_config.address, grpc_config.port),
None => "0.0.0.0:50052".to_string(),
}
.parse()?;
let descriptor_set = include_bytes!(concat!(env!("OUT_DIR"), "/logging_descriptor.bin"));
let reflection_service = Builder::configure()
.register_encoded_file_descriptor_set(descriptor_set)
.build_v1()?;
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_headers(Any)
.allow_methods(Any)
.expose_headers(Any);
let service = self.clone();
let handle = tokio::spawn(async move {
match Server::builder()
.accept_http1(true)
.max_concurrent_streams(128) .tcp_keepalive(Some(std::time::Duration::from_secs(60)))
.tcp_nodelay(true)
.layer(cors) .layer(GrpcWebLayer::new())
.add_service(LogServiceServer::new(service))
.add_service(reflection_service) .serve_with_shutdown(addr, async {
tokio::signal::ctrl_c().await.ok();
info!("Shutting down server...");
})
.await
{
Ok(_) => Ok(()),
Err(e) => {
if e.to_string().contains("Address already in use") {
tracing::error!("Port already in use. Please stop other instances first.");
}
Err(e.into())
}
}
});
let mut server_handle = self.server_handle.lock().await;
*server_handle = Some(handle);
Ok(())
}
pub async fn broadcast_log(&self, log: LogMessage) {
let clients = self.clients.lock().await;
let log_all = self.log_all_messages.lock().await;
let mut dead_clients = Vec::new();
for (client_id, sender) in clients.iter() {
if *log_all || !is_internal_message(&log) {
if let Err(_) = sender.send(log.clone()) {
dead_clients.push(client_id.clone());
}
}
}
drop(clients);
if !dead_clients.is_empty() {
let mut clients = self.clients.lock().await;
for client_id in dead_clients {
clients.remove(&client_id);
warn!("Removed disconnected client: {}", client_id);
}
}
}
}
#[tonic::async_trait]
impl LogService for LoggingService {
    /// Boxed, sendable stream of log messages (or a terminating status)
    /// returned to a subscriber.
    type SubscribeToLogsStream = Pin<Box<dyn Stream<Item = Result<LogMessage, Status>> + Send>>;

    /// Registers the caller as a log subscriber and returns its message stream.
    ///
    /// The client id from the request becomes the key in the subscriber
    /// table; an existing entry under the same id is replaced. A test
    /// message naming the new client is then sent through `broadcast_log`,
    /// i.e. to every current subscriber. The returned stream yields each
    /// queued message as `Ok` and, once the channel is closed, one final
    /// `Err(Status::ok("Stream complete"))`.
    async fn subscribe_to_logs(
        &self,
        request: Request<SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeToLogsStream>, Status> {
        info!("📝 Request headers: {:?}", request.metadata());

        let client_id = request.into_inner().client_id;
        info!("🔌 New client connected: {}", client_id);

        let (sender, receiver) = mpsc::unbounded_channel();
        // The lock guard is a temporary and is released after the insert.
        self.clients.lock().await.insert(client_id.clone(), sender);

        let incoming = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver);

        let greeting = LogMessage {
            whoami: None,
            timestamp: Some(chrono::Utc::now().to_rfc3339()),
            level: Some("INFO".to_string()),
            message: format!("Test message for client {}", client_id),
            target: None,
            thread_id: None,
            file: None,
            line: None,
        };

        // Separate owned copies for the two `move` closures below.
        let id_for_end = client_id.clone();
        let id_for_log = client_id.clone();

        self.broadcast_log(greeting).await;

        let forwarded = incoming.map(move |entry| {
            // Only entries whose target is exactly "grpc_logger" are echoed
            // to the server log; that equality already excludes the
            // h2/tonic/tonic_web prefixes.
            if matches!(&entry.target, Some(target) if target == "grpc_logger") {
                info!("📤 Sending log to client {}: {:?}", id_for_log, entry);
            }
            Ok::<LogMessage, Status>(entry)
        });

        let closing = futures::stream::once(async move {
            info!("🏁 Stream ending for client {}", id_for_end);
            Err::<LogMessage, Status>(Status::ok("Stream complete"))
        });

        let outbound = Box::pin(forwarded.chain(closing));

        info!("✅ Stream setup complete for client: {}", client_id);
        Ok(Response::new(outbound))
    }
}
/// Reports whether `log` has a target that begins with one of the
/// prefixes listed in `INTERNAL_PREFIXES`.
///
/// A message without a target is never considered internal.
fn is_internal_message(log: &LogMessage) -> bool {
    const INTERNAL_PREFIXES: [&str; 6] = [
        "h2::",
        "tonic::",
        "hyper::",
        "tower::",
        "runtime::",
        "http::",
    ];

    log.target.as_ref().map_or(false, |target| {
        INTERNAL_PREFIXES
            .iter()
            .any(|prefix| target.starts_with(*prefix))
    })
}