nexus-acto-rs 0.4.2

A Rust crate for Actors
Documentation
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use dashmap::DashMap;
use futures::future::BoxFuture;
use siphasher::sip::SipHasher;
use tokio::sync::Mutex;

use crate::actor::actor::actor_process::ActorProcess;
use crate::actor::actor::pid::ExtendedPid;
use crate::actor::actor::Pid;
use crate::actor::actor_system::ActorSystem;
use crate::actor::process::{Process, ProcessHandle};

// Address given to a freshly created `ProcessRegistry`; `get_process` treats
// any PID carrying this address as local and never forwards it to a resolver.
const LOCAL_ADDRESS: &str = "nonhost";

/// Registry mapping process ids to their `ProcessHandle`s.
///
/// Cloning is shallow for the `Arc`-backed parts: clones share the id counter,
/// the local process table and the resolver list.
#[derive(Debug, Clone)]
pub struct ProcessRegistry {
  // Monotonic counter consumed by `next_id`.
  sequence_id: Arc<AtomicU64>,
  // Used to build `ExtendedPid`s and to obtain the dead-letter process.
  actor_system: ActorSystem,
  // Address stamped on PIDs created by `add_process`; starts as `LOCAL_ADDRESS`.
  address: String,
  // Sharded table of locally registered processes, keyed by id.
  local_pids: SliceMap,
  // Resolvers consulted, in registration order, for PIDs with a non-local address.
  remote_handlers: Arc<Mutex<Vec<AddressResolver>>>,
}

/// A fixed collection of `DashMap` shards holding process handles by id.
///
/// The shard vector lives behind an `Arc`, so clones refer to the same maps.
#[derive(Debug, Clone)]
pub struct SliceMap {
  // One map per shard; `get_bucket` picks the shard from a hash of the key.
  local_pids: Arc<Vec<DashMap<String, ProcessHandle>>>,
}

impl SliceMap {
  /// Builds the map with 1024 empty shards.
  fn new() -> Self {
    let shards: Vec<DashMap<String, ProcessHandle>> = std::iter::repeat_with(DashMap::new).take(1024).collect();
    Self {
      local_pids: Arc::new(shards),
    }
  }

  /// Returns the shard responsible for `key`.
  ///
  /// The shard index is the SipHash of the key reduced modulo the shard count,
  /// so a given key always lands in the same shard.
  fn get_bucket(&self, key: &str) -> &DashMap<String, ProcessHandle> {
    let digest = {
      let mut state = SipHasher::new();
      key.hash(&mut state);
      state.finish()
    };
    let slot = (digest % 1024) as usize;
    &self.local_pids[slot]
  }
}

/// A shareable async callback that tries to resolve an `ExtendedPid` to a
/// `ProcessHandle`, yielding `None` when it cannot.
///
/// The callback is stored behind an `Arc`, so clones invoke the same closure.
#[derive(Clone)]
pub struct AddressResolver(Arc<dyn Fn(&ExtendedPid) -> BoxFuture<'static, Option<ProcessHandle>> + Send + Sync>);

impl Debug for AddressResolver {
  /// Prints only the type name; the wrapped closure has no `Debug` form.
  fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
    f.write_str("AddressResolver")
  }
}

impl PartialEq for AddressResolver {
  fn eq(&self, other: &Self) -> bool {
    Arc::ptr_eq(&self.0, &other.0)
  }
}

// Marker impl: the `PartialEq` above compares `Arc` pointers, which is a total
// equivalence relation, so `Eq` holds.
impl Eq for AddressResolver {}

impl Hash for AddressResolver {
  fn hash<H: Hasher>(&self, state: &mut H) {
    (self.0.as_ref() as *const dyn Fn(&ExtendedPid) -> BoxFuture<'static, Option<ProcessHandle>>).hash(state);
  }
}

impl AddressResolver {
  pub fn new<F, Fut>(f: F) -> Self
  where
    F: Fn(&ExtendedPid) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Option<ProcessHandle>> + Send + 'static, {
    AddressResolver(Arc::new(move |p| Box::pin(f(p))))
  }

  pub async fn run(&self, pid: &ExtendedPid) -> Option<ProcessHandle> {
    (self.0)(pid).await
  }
}

impl ProcessRegistry {
  pub fn new(actor_system: ActorSystem) -> Self {
    ProcessRegistry {
      sequence_id: Arc::new(AtomicU64::new(0)),
      actor_system,
      address: LOCAL_ADDRESS.to_string(),
      local_pids: SliceMap::new(),
      remote_handlers: Arc::new(Mutex::new(Vec::new())),
    }
  }

  pub async fn register_address_resolver(&mut self, handler: AddressResolver) {
    let mut mg = self.remote_handlers.lock().await;
    mg.push(handler);
  }

  pub fn get_address(&self) -> String {
    self.address.clone()
  }

  pub fn next_id(&self) -> String {
    let counter = self.sequence_id.fetch_add(1, Ordering::SeqCst);
    uint64_to_id(counter)
  }

  pub fn add_process(&self, process: ProcessHandle, id: &str) -> (ExtendedPid, bool) {
    let bucket = self.local_pids.get_bucket(&id);
    let pid = Pid {
      address: self.address.clone(),
      id: id.to_string(),
      request_id: 0,
    };
    let pid = ExtendedPid::new(pid, self.actor_system.clone());
    let inserted = bucket.insert(id.to_string(), process).is_none();
    (pid, inserted)
  }

  pub fn remove_process(&self, pid: &ExtendedPid) {
    let bucket = self.local_pids.get_bucket(pid.id());
    if let Some((_, process)) = bucket.remove(pid.id()) {
      if let Some(actor_process) = process.as_any().downcast_ref::<ActorProcess>() {
        actor_process.set_dead();
      }
    }
  }

  pub async fn get_process(&self, pid: &ExtendedPid) -> Option<ProcessHandle> {
    if pid.address() != LOCAL_ADDRESS && pid.address() != self.address {
      {
        let mg = self.remote_handlers.lock().await;
        for handler in mg.iter() {
          if let Some(process) = handler.run(pid).await {
            return Some(process);
          }
        }
      }
      return Some(self.actor_system.get_dead_letter().await);
    }

    self.get_local_process(pid.id()).await
  }

  pub async fn get_local_process(&self, id: &str) -> Option<ProcessHandle> {
    let bucket = self.local_pids.get_bucket(id);
    let result = bucket.get(id);
    match result {
      Some(r) => Some(r.clone()),
      None => Some(self.actor_system.get_dead_letter().await),
    }
  }
}

/// Encodes `u` as a `$`-prefixed identifier in base 64.
///
/// Digits are drawn, most significant first, from the alphabet
/// `0-9a-zA-Z~+`; zero is rendered as `"$0"`.
fn uint64_to_id(u: u64) -> String {
  const ALPHABET: &[u8; 64] = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~+";

  // A u64 needs at most 11 six-bit digits, plus one byte for the prefix.
  let mut reversed: Vec<u8> = Vec::with_capacity(12);
  let mut rest = u;
  // Emit the least significant digit first; the base is a power of two, so a
  // mask and a shift stand in for `%` and `/`.
  loop {
    reversed.push(ALPHABET[(rest & 0x3f) as usize]);
    rest >>= 6;
    if rest == 0 {
      break;
    }
  }
  reversed.push(b'$');

  // Every byte is ASCII, so a byte-to-char mapping is lossless.
  reversed.iter().rev().map(|&byte| char::from(byte)).collect()
}