nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio::sync::RwLock;

use crate::log::log::LogLevel;
use crate::log::log_event::LogEvent;
use crate::log::log_event_handler::LogEventHandler;
use crate::log::log_subscription::LogSubscription;

pub static LOG_EVENT_STREAM: Lazy<Arc<LogEventStream>> = Lazy::new(|| LogEventStream::new());

#[derive(Debug, Clone)]
pub struct LogEventStream {
  pub(crate) subscriptions: Arc<RwLock<Vec<Arc<LogSubscription>>>>,
}

impl LogEventStream {
  pub fn new() -> Arc<Self> {
    Arc::new(Self {
      subscriptions: Arc::new(RwLock::new(Vec::new())),
    })
  }

  pub async fn subscribe<F, Fut>(self: &Arc<Self>, f: F) -> Arc<LogSubscription>
  where
    F: Fn(LogEvent) -> Fut + Send + Sync + 'static,
    Fut: futures::Future<Output = ()> + Send + 'static, {
    let mut subscriptions = self.subscriptions.write().await;
    let sub = Arc::new(LogSubscription {
      event_stream: Arc::downgrade(&self.clone()),
      index: Arc::new(AtomicUsize::new(subscriptions.len())),
      func: LogEventHandler::new(f),
      min_level: Arc::new(AtomicI32::new(LogLevel::Min as i32)),
    });
    subscriptions.push(Arc::clone(&sub));
    sub
  }

  pub async fn unsubscribe(&self, sub: &Arc<LogSubscription>) {
    let mut subscriptions = self.subscriptions.write().await;
    if let Some(index) = subscriptions.iter().position(|s| Arc::ptr_eq(s, sub)) {
      let last = subscriptions.len() - 1;
      subscriptions.swap(index, last);
      if let Some(swapped) = subscriptions.get(index) {
        swapped.index.store(index, Ordering::Relaxed);
      }
      subscriptions.pop();
    }
  }

  pub async fn publish(&self, evt: LogEvent) {
    let subscriptions = self.subscriptions.read().await;
    for sub in subscriptions.iter() {
      if evt.level >= LogLevel::try_from(sub.min_level.load(Ordering::Relaxed)).unwrap() {
        sub.func.clone().run(evt.clone()).await;
      }
    }
  }

  pub async fn clear(&self) {
    let mut subscriptions = self.subscriptions.write().await;
    subscriptions.clear();
  }
}

pub async fn subscribe_stream<F, Fut>(event_stream: &Arc<LogEventStream>, f: F) -> Arc<LogSubscription>
where
  F: Fn(LogEvent) -> Fut + Send + Sync + 'static,
  Fut: futures::Future<Output = ()> + Send + 'static, {
  event_stream.subscribe(f).await
}

pub async fn unsubscribe_stream(sub: &Arc<LogSubscription>) {
  if let Some(event_stream) = sub.event_stream.upgrade() {
    event_stream.unsubscribe(sub).await;
  }
}

pub async fn publish_to_stream(event_stream: &Arc<LogEventStream>, evt: LogEvent) {
  event_stream.publish(evt).await;
}

pub async fn reset_event_stream() {
  LOG_EVENT_STREAM.clear().await;
}