nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::io::{self, Write};
use std::sync::Arc;

use once_cell::sync::Lazy;
use time::OffsetDateTime;
use tokio::sync::{mpsc, Mutex};

use crate::log::io_encoder::IoEncoder;
use crate::log::log::LogLevel;
use crate::log::log_caller::LogCallerInfo;
use crate::log::log_event::LogEvent;
use crate::log::log_event_stream::{unsubscribe_stream, LOG_EVENT_STREAM};
use crate::log::log_subscription::LogSubscription;

pub struct IoLogger {
  sender: mpsc::Sender<LogEvent>,
  out: Arc<Mutex<Box<dyn Write + Send>>>,
}

static GLOBAL_LOGGER: Lazy<Mutex<Option<Arc<IoLogger>>>> = Lazy::new(|| Mutex::new(None));
static NO_STD_ERR_LOGS: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
static SUB: Lazy<Mutex<Option<Arc<LogSubscription>>>> = Lazy::new(|| Mutex::new(None));

pub async fn set_no_std_err_logs() {
  let subscriptions_count = LOG_EVENT_STREAM.subscriptions.read().await.len();
  if subscriptions_count >= 2 {
    let mut no_std_err_logs = NO_STD_ERR_LOGS.lock().await;
    *no_std_err_logs = true;
  }
}

pub async fn init() {
  let (sender, receiver) = mpsc::channel(100);
  let logger = Arc::new(IoLogger {
    sender,
    out: Arc::new(Mutex::new(Box::new(io::stderr()))),
  });

  {
    let mut global_logger = GLOBAL_LOGGER.lock().await;
    *global_logger = Some(logger.clone());
  }

  let logger_for_subscriber = logger.clone();
  reset_subscription_with(move |evt: LogEvent| {
    let logger_for_subscriber = logger_for_subscriber.clone();
    async move {
      let _ = logger_for_subscriber.sender.try_send(evt);
    }
  })
  .await;

  tokio::spawn(async move {
    listen_event(receiver, logger.out.clone()).await;
  });
}

pub async fn reset_no_std_err_logs() {
  let mut no_std_err_logs = NO_STD_ERR_LOGS.lock().await;
  *no_std_err_logs = false;
}

pub async fn reset_global_logger() {
  let mut logger = GLOBAL_LOGGER.lock().await;
  *logger = None;
}

pub async fn reset_subscription_with<F, Fut>(f: F)
where
  F: Fn(LogEvent) -> Fut + Send + Sync + 'static,
  Fut: futures::Future<Output = ()> + Send + 'static, {
  let mut sub = SUB.lock().await;
  if let Some(old_sub) = sub.take() {
    unsubscribe_stream(&old_sub).await;
  }
  *sub = Some(LOG_EVENT_STREAM.subscribe(f).await);
}

pub async fn reset_subscription() {
  let mut sub = SUB.lock().await;
  if let Some(old_sub) = sub.take() {
    unsubscribe_stream(&old_sub).await;
  }
  *sub = None;
}

async fn listen_event(mut receiver: mpsc::Receiver<LogEvent>, out: Arc<Mutex<Box<dyn Write + Send>>>) {
  while let Some(event) = receiver.recv().await {
    if *NO_STD_ERR_LOGS.lock().await {
      if let Some(sub) = SUB.lock().await.take() {
        LOG_EVENT_STREAM.unsubscribe(&sub).await;
      }
      break;
    }
    let buf = write_event(&event).await;
    let mut out = out.lock().await;
    out.write_all(&buf).unwrap();
    out.flush().unwrap();
  }
}

async fn write_event(event: &LogEvent) -> Vec<u8> {
  let mut buf = Vec::new();
  format_header(&mut buf, &event.prefix, event.time, event.level);
  if let Some(caller) = &event.caller {
    if caller.line > 0 {
      format_caller(&mut buf, caller);
    }
  }
  if !event.message.is_empty() {
    buf.extend_from_slice(event.message.as_bytes());
    buf.push(b' ');
  }
  let mut encoder = IoEncoder::new(&mut buf);
  for field in &event.context {
    field.encode(&mut encoder);
    encoder.write_space();
  }
  for field in &event.fields {
    field.encode(&mut encoder);
    encoder.write_space();
  }
  encoder.write_newline();
  buf
}

fn format_header(buf: &mut Vec<u8>, prefix: &str, time: OffsetDateTime, level: LogLevel) {
  write!(buf, "{} {} ", time, level).unwrap();
  if !prefix.is_empty() {
    buf.extend_from_slice(prefix.as_bytes());
  }
  buf.push(b'\t');
}

fn format_caller(buf: &mut Vec<u8>, caller: &LogCallerInfo) {
  let fname = caller.short_file_name();
  write!(buf, "{}:{}", fname, caller.line).unwrap();
  let v = 32 - fname.len();
  if v > 16 {
    buf.extend_from_slice(&[b'\t', b'\t', b'\t']);
  } else if v > 8 {
    buf.extend_from_slice(&[b'\t', b'\t']);
  } else {
    buf.push(b'\t');
  }
}

pub async fn log(event: LogEvent) {
  if let Some(logger) = GLOBAL_LOGGER.lock().await.as_ref() {
    let _ = logger.sender.try_send(event);
  }
}