reifydb-runtime 0.4.12

Runtime infrastructure for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Native mailbox implementation using crossbeam-channel.

use std::{fmt, sync, sync::Arc, time::Duration};

use crossbeam_channel::{
	Receiver, RecvTimeoutError as CcRecvTimeoutError, SendError as CcSendError, Sender,
	TryRecvError as CcTryRecvError, TrySendError as CcTrySendError, bounded, unbounded,
};

use super::{ActorRef, RecvError, RecvTimeoutError, SendError, TryRecvError};

/// Native implementation of ActorRef inner.
///
/// Uses `crossbeam-channel` for lock-free message passing.
/// The notify callback is shared (via Arc) so that all clones of an ActorRef
/// see the callback once it is set — even clones created before `set_notify`.
pub struct ActorRefInner<M> {
	/// Sending half of the mailbox channel; every clone of this struct holds
	/// its own clone of the sender.
	pub(crate) tx: Sender<M>,
	/// Write-once slot for the wake-up callback. The outer `Arc` is what makes
	/// the slot common to all clones; the inner `Arc` is the callback itself,
	/// invoked after each successful send.
	notify: Arc<sync::OnceLock<Arc<dyn Fn() + Send + Sync>>>,
}

// Implemented by hand rather than derived so that cloning does not require
// `M: Clone` — only the channel handle and the shared notify slot are cloned.
impl<M> Clone for ActorRefInner<M> {
	/// Produce another handle to the same channel that shares the same
	/// notify slot as `self`.
	fn clone(&self) -> Self {
		let Self {
			tx,
			notify,
		} = self;

		Self {
			tx: tx.clone(),
			notify: Arc::clone(notify),
		}
	}
}

impl<M> fmt::Debug for ActorRefInner<M> {
	/// Render as `ActorRefInner { capacity: .. }`; only the channel capacity
	/// is shown, the notify callback is omitted.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let capacity = self.tx.capacity();
		let mut out = f.debug_struct("ActorRefInner");
		out.field("capacity", &capacity);
		out.finish()
	}
}

impl<M: Send> ActorRefInner<M> {
	/// Wrap a channel sender, starting with an empty (unset) notify slot.
	pub(crate) fn new(tx: Sender<M>) -> Self {
		let notify = Arc::new(sync::OnceLock::new());
		Self {
			tx,
			notify,
		}
	}

	/// Install the notify callback that is invoked after every successful send.
	///
	/// The slot is write-once: if a callback has already been installed, this
	/// call leaves it in place and the new callback is dropped without error.
	pub(crate) fn set_notify(&self, f: Arc<dyn Fn() + Send + Sync>) {
		// `OnceLock::set` hands the value back on failure; discarding it is
		// deliberate, so the first callback wins.
		let _ = self.notify.set(f);
	}

	/// Try to enqueue a message without blocking.
	///
	/// On success the notify callback (if one is set) is invoked.
	///
	/// # Errors
	///
	/// Returns `SendError::Full` when the channel has no free slot and
	/// `SendError::Closed` when the channel is disconnected. The message is
	/// handed back inside the error in both cases.
	pub fn send(&self, msg: M) -> Result<(), SendError<M>> {
		self.tx.try_send(msg).map_err(|err| match err {
			CcTrySendError::Full(rejected) => SendError::Full(rejected),
			CcTrySendError::Disconnected(rejected) => SendError::Closed(rejected),
		})?;

		if let Some(wake) = self.notify.get() {
			wake();
		}
		Ok(())
	}

	/// Enqueue a message, waiting for space if the channel is full.
	///
	/// On success the notify callback (if one is set) is invoked.
	///
	/// # Errors
	///
	/// Returns `SendError::Closed` carrying the message when the channel is
	/// disconnected.
	pub fn send_blocking(&self, msg: M) -> Result<(), SendError<M>> {
		self.tx.send(msg).map_err(|CcSendError(rejected)| SendError::Closed(rejected))?;

		if let Some(wake) = self.notify.get() {
			wake();
		}
		Ok(())
	}

	/// Heuristic liveness check for the actor behind this reference.
	///
	/// Returns `true` when the channel currently holds at least one message,
	/// or when the channel reports a capacity (i.e. it is bounded).
	///
	/// NOTE(review): this never inspects whether the receiving side has been
	/// dropped. As written it is always `true` for bounded mailboxes and, for
	/// unbounded ones, `true` only while messages are queued — confirm that
	/// callers do not rely on it to detect a stopped actor.
	pub fn is_alive(&self) -> bool {
		if !self.tx.is_empty() {
			return true;
		}
		self.tx.capacity().is_some()
	}
}

/// Internal receiver for the actor's mailbox.
///
/// Thin wrapper around the receiving half of the channel whose sending half
/// lives in `ActorRefInner`.
pub(crate) struct Mailbox<M> {
	/// Receiving half of the mailbox channel.
	pub(crate) rx: Receiver<M>,
}

impl<M> Mailbox<M> {
	/// Take the next message if one is immediately available.
	///
	/// # Errors
	///
	/// `TryRecvError::Empty` when no message is queued, `TryRecvError::Closed`
	/// when the channel is disconnected.
	pub fn try_recv(&self) -> Result<M, TryRecvError> {
		self.rx.try_recv().map_err(|err| match err {
			CcTryRecvError::Empty => TryRecvError::Empty,
			CcTryRecvError::Disconnected => TryRecvError::Closed,
		})
	}

	/// Wait until a message arrives and return it.
	///
	/// # Errors
	///
	/// `RecvError::Closed` when the underlying receive fails.
	pub fn recv(&self) -> Result<M, RecvError> {
		self.rx.recv().map_err(|_| RecvError::Closed)
	}

	/// Wait up to `timeout` for a message.
	///
	/// # Errors
	///
	/// `RecvTimeoutError::Timeout` when the wait elapses without a message,
	/// `RecvTimeoutError::Closed` when the channel is disconnected.
	pub fn recv_timeout(&self, timeout: Duration) -> Result<M, RecvTimeoutError> {
		self.rx.recv_timeout(timeout).map_err(|err| match err {
			CcRecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
			CcRecvTimeoutError::Disconnected => RecvTimeoutError::Closed,
		})
	}
}

pub(crate) fn create_mailbox<M: Send>(capacity: Option<usize>) -> (ActorRef<M>, Mailbox<M>) {
	let (tx, rx) = match capacity {
		None => unbounded(),
		Some(n) => bounded(n),
	};

	(
		ActorRef::from_inner(ActorRefInner::new(tx)),
		Mailbox {
			rx,
		},
	)
}