use crate::ActorRef;
use crate::Message;
#[cfg(test)]
mod tests;
/// Marker trait for values that can be published through an [`OutputPort`].
///
/// A type qualifies when it is both a [`Message`] and [`Clone`]. The blanket
/// implementation below covers every such type, so this trait never has to be
/// implemented by hand.
pub trait OutputMessage: Message + Clone {}

impl<T> OutputMessage for T where T: Message + Clone {}
#[cfg(not(feature = "output-port-v2"))]
pub use v1::OutputPort;
#[cfg(feature = "output-port-v2")]
pub use v2::OutputPort;
#[cfg(not(feature = "output-port-v2"))]
mod v1 {
use std::fmt::Debug;
use std::sync::RwLock;
use tokio::sync::broadcast as pubsub;
use crate::concurrency::JoinHandle;
use crate::{ActorRef, Message, OutputMessage};
/// Publish/subscribe output port for messages of type `TMsg`, built on a
/// broadcast channel.
pub struct OutputPort<TMsg: OutputMessage> {
    /// Sending half of the broadcast channel; every subscription gets its own
    /// receiver from it.
    tx: pubsub::Sender<Option<TMsg>>,
    /// Handles to the forwarding tasks created by `subscribe`; finished ones
    /// are pruned on the next `subscribe` call.
    subscriptions: RwLock<Vec<OutputPortSubscription>>,
}
impl<TMsg> Debug for OutputPort<TMsg>
where
    TMsg: OutputMessage,
{
    /// Formats the port as `OutputPort(<type name of TMsg>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message_type = std::any::type_name::<TMsg>();
        write!(f, "OutputPort({})", message_type)
    }
}
impl<TMsg: OutputMessage> Default for OutputPort<TMsg> {
    /// Creates a port without subscribers, backed by a broadcast channel of
    /// capacity 10.
    fn default() -> Self {
        // The receiver returned here is never read from; subscriptions obtain
        // their own receivers from the sender.
        let (sender, _initial_rx) = pubsub::channel(10);
        Self {
            subscriptions: RwLock::new(Vec::new()),
            tx: sender,
        }
    }
}
impl<TMsg> OutputPort<TMsg>
where
    TMsg: OutputMessage,
{
    /// Subscribes `receiver` to this port.
    ///
    /// A forwarding task is spawned for the subscription. Every published
    /// message is passed through `converter`; `Some(msg)` is cast to
    /// `receiver`, `None` skips the message for this subscriber.
    ///
    /// Subscriptions whose forwarding task has already finished are pruned as
    /// a side effect of this call.
    pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
    where
        F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
        TReceiverMsg: Message,
    {
        // Create the subscription (which spawns a task) BEFORE taking the lock:
        // if spawning panics, the guard is not held and the lock is not poisoned.
        let subscription = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
            self.tx.subscribe(),
            converter,
            receiver,
        );
        // The protected value is just a list of task handles, which a panic in
        // another thread cannot leave half-updated, so recover from poisoning
        // instead of cascading the panic to every later subscriber.
        let mut subs = self
            .subscriptions
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        subs.retain(|existing| !existing.is_dead());
        subs.push(subscription);
    }

    /// Publishes `msg` to all current subscribers.
    ///
    /// The message is dropped when the channel has no receivers.
    pub fn send(&self, msg: TMsg) {
        if self.tx.receiver_count() > 0 {
            // Best effort: the result of the broadcast send is intentionally ignored.
            let _ = self.tx.send(Some(msg));
        }
    }
}
struct OutputPortSubscription {
handle: JoinHandle<()>,
}
impl OutputPortSubscription {
pub(crate) fn is_dead(&self) -> bool {
self.handle.is_finished()
}
pub(crate) fn new<TMsg, F, TReceiverMsg>(
mut port: pubsub::Receiver<Option<TMsg>>,
converter: F,
receiver: ActorRef<TReceiverMsg>,
) -> Self
where
TMsg: OutputMessage,
F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
TReceiverMsg: Message,
{
let handle = crate::concurrency::spawn(async move {
while let Ok(Some(msg)) = port.recv().await {
if let Some(new_msg) = converter(msg) {
if receiver.cast(new_msg).is_err() {
return;
}
}
}
});
Self { handle }
}
}
}
#[cfg(feature = "output-port-v2")]
mod v2 {
use crate::{ActorId, ActorRef, Message, OutputMessage};
use std::fmt::Debug;
/// Publish/subscribe output port for messages of type `TMsg`.
///
/// Thin public wrapper around the generic `inner::OutputPort`, with
/// subscribers identified by [`ActorId`].
pub struct OutputPort<TMsg: OutputMessage> {
    /// The port implementation that all calls are delegated to.
    inner: inner::OutputPort<ActorId, TMsg>,
}
impl<TMsg> Debug for OutputPort<TMsg>
where
    TMsg: OutputMessage,
{
    /// Formats the port as `OutputPort(<type name of TMsg>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message_type = std::any::type_name::<TMsg>();
        write!(f, "OutputPort({})", message_type)
    }
}
impl<TMsg: OutputMessage> Default for OutputPort<TMsg> {
    /// Creates a port that wraps the default inner port.
    fn default() -> Self {
        let inner = inner::OutputPort::default();
        Self { inner }
    }
}
impl<TMsg: OutputMessage> OutputPort<TMsg> {
    /// Subscribes `receiver` to this port.
    ///
    /// `converter` maps each published message to the receiver's message
    /// type; returning `None` skips the message for this subscriber.
    pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
    where
        TReceiverMsg: Message,
        F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
    {
        self.inner.subscribe(receiver, converter);
    }

    /// Publishes `msg` by handing it to the inner port.
    pub fn send(&self, msg: TMsg) {
        self.inner.send(msg);
    }
}
mod inner {
use super::OutputMessage;
use crate::concurrency::{mpsc_unbounded, MpscUnboundedSender};
use crate::{ActorId, ActorRef, DerivedActorRef, Message};
/// The dispatch task calls `tokio::task::coop::consume_budget` whenever its
/// delivery counter is a multiple of this value (only with `tokio_runtime`).
#[cfg(feature = "tokio_runtime")]
const CONSUME_BUDGET_FACTOR: u32 = 32;
/// Upper bound on the number of queued messages the dispatch task requests
/// from the channel in one `recv_many` call.
const MAX_BATCH_SIZE: usize = 32;
/// Items sent over the port's channel to its background dispatch task.
enum OutportMessage<Id, TMsg> {
/// A published value to deliver to the subscribers.
Data(TMsg),
/// A subscriber to register. The dispatch task `take()`s the subscriber out
/// of the option when it handles the message early; a `None` is ignored.
SetSubscriber(Option<Box<dyn Subscriber<Id, TMsg>>>),
}
/// A destination for published values, identified by an `Id`.
pub(super) trait Subscriber<Id, TMsg: OutputMessage>: Send + 'static {
/// Delivers `value` to the subscriber. The dispatch task removes a
/// subscriber from its list as soon as this returns `false`.
fn send(&self, value: &TMsg) -> bool;
/// Identifier of this subscriber; the dispatch task compares ids to find
/// subscriptions that refer to an already-registered subscriber.
fn id(&self) -> Id;
}
/// Generic output port: a cloneable handle wrapping the sending half of the
/// unbounded channel that feeds the background dispatch task.
#[derive(Debug, Clone)]
pub(super) struct OutputPort<Id, TMsg>(MpscUnboundedSender<OutportMessage<Id, TMsg>>);
impl<Id, TMsg> Default for OutputPort<Id, TMsg>
where
    Id: Send + 'static + PartialEq + Clone + Sync,
    TMsg: OutputMessage,
{
    /// Creates a port that allows several subscriptions with the same id.
    fn default() -> Self {
        let allow_duplicate_subscription = true;
        Self::new(allow_duplicate_subscription)
    }
}
/// Construction of the port (together with its background dispatch task) and
/// message publishing.
impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> OutputPort<Id, TMsg> {
/// Creates the port and spawns the background task that owns the subscriber list.
///
/// The task receives queued [`OutportMessage`]s in batches of at most
/// [`MAX_BATCH_SIZE`] and stops once `recv_many` reports 0 received messages.
///
/// `allow_duplicate_subscription`: when `false`, a new subscriber whose id
/// matches an existing entry replaces that entry; when `true`, every new
/// subscriber is appended to the list.
pub(super) fn new(allow_duplicate_subscription: bool) -> Self {
let (tx, mut rx) = mpsc_unbounded::<OutportMessage<Id, TMsg>>();
crate::concurrency::spawn(async move {
// Registered subscribers, each paired with the id it reported when added.
let mut subscribers = Vec::<(Id, Box<dyn Subscriber<Id, TMsg>>)>::new();
// Receive buffer, reused across iterations; it is fully drained at the end
// of every loop iteration.
let mut batch = Vec::new();
loop {
// Request what is already queued, but at least 1 and at most MAX_BATCH_SIZE.
let l = rx.len().clamp(1, MAX_BATCH_SIZE);
if rx.recv_many(&mut batch, l).await == 0 {
// Presumably the channel is closed and empty (all senders dropped) —
// TODO confirm against the `crate::concurrency` channel implementation.
break;
}
let mut i = 0;
let mut coop_count = 0u32;
// Phase 1: for each subscriber that is already registered, walk the whole
// batch in order, so each subscriber sees the batch's `Data` in send order.
'subs: while i < subscribers.len() {
for msg in batch.iter_mut() {
match msg {
OutportMessage::Data(v) => {
if !subscribers[i].1.send(v) {
// Delivery failed: drop this subscriber. `i` is not advanced because
// `remove` shifts the next entry into slot `i`.
subscribers.remove(i);
continue 'subs;
} else {
coop_count = coop_count.wrapping_add(1);
#[cfg(feature = "tokio_runtime")]
if coop_count % CONSUME_BUDGET_FACTOR == 0 {
tokio::task::coop::consume_budget().await
}
}
}
OutportMessage::SetSubscriber(opt_subscriber) => {
// Only a subscription whose id equals the current subscriber's id is
// handled here; anything else stays in the batch for phase 2.
let sid = if let Some(subscriber) = opt_subscriber {
let sid = subscriber.id();
if sid != subscribers[i].0 {
continue;
}
sid
} else {
continue;
};
// Cannot fail: the `if let Some(..)` above already matched. `take` leaves
// `None` in the batch, which phase 2 ignores.
let subscriber = opt_subscriber.take().unwrap();
if !allow_duplicate_subscription {
// Replace in place, so the remaining `Data` of this batch goes to the
// new subscriber object.
if let Some((_, prev_subscriber)) =
subscribers.iter_mut().find(|(id, _)| id == &sid)
{
*prev_subscriber = subscriber;
} else {
subscribers.push((subscriber.id(), subscriber));
}
} else {
// NOTE(review): the new entry is appended while the `'subs` loop is still
// running, so the loop later visits it and runs the whole batch against
// it, including `Data` queued before this `SetSubscriber`. Subscribers
// with a previously unseen id (phase 2) only get later `Data`. Confirm
// this difference is intended.
subscribers.push((subscriber.id(), subscriber));
}
}
}
}
i += 1;
}
// After phase 1, `i == subscribers.len()`; subscribers added below start at `i0`.
let i0 = i;
// Phase 2: consume the batch. Subscribers added here receive only the `Data`
// that follows their `SetSubscriber` within the batch.
for msg in batch.drain(..) {
match msg {
OutportMessage::Data(v) => {
let mut i = i0;
while i < subscribers.len() {
if !subscribers[i].1.send(&v) {
subscribers.remove(i);
} else {
i += 1;
// NOTE(review): unlike phase 1, the budget check runs before the counter
// is incremented.
#[cfg(feature = "tokio_runtime")]
if coop_count % CONSUME_BUDGET_FACTOR == 0 {
tokio::task::coop::consume_budget().await
}
coop_count = coop_count.wrapping_add(1);
}
}
}
OutportMessage::SetSubscriber(Some(subscriber)) => {
let sid = subscriber.id();
if !allow_duplicate_subscription {
if let Some((_, prev_subscriber)) =
subscribers.iter_mut().find(|(id, _)| id == &sid)
{
*prev_subscriber = subscriber;
} else {
subscribers.push((subscriber.id(), subscriber));
}
} else {
subscribers.push((subscriber.id(), subscriber));
}
}
// Already handled (taken) in phase 1.
OutportMessage::SetSubscriber(None) => (),
}
}
}
});
Self(tx)
}
/// Queues `value` for delivery by the dispatch task. The result of the
/// channel send is discarded.
pub(super) fn send(&self, value: TMsg) {
_ = self.0.send(OutportMessage::Data(value));
}
}
impl<TMsg: OutputMessage> OutputPort<ActorId, TMsg> {
pub(super) fn subscribe<TReceiverMsg, F>(
&self,
receiver: ActorRef<TReceiverMsg>,
converter: F,
) where
F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
TReceiverMsg: Message,
{
self.set_subscriber_with_filter(receiver, move |msg| converter(msg.clone()))
}
pub(super) fn set_subscriber_with_filter<R: ActorReference>(
&self,
actor_ref: R,
filter: impl Fn(&TMsg) -> Option<R::Msg> + Send + 'static,
) {
_ = self
.0
.send(OutportMessage::SetSubscriber(Some(Box::new(Filtering {
actor_ref,
filter,
}))));
}
}
impl<T, U> Subscriber<ActorId, T> for ActorRef<U>
where
    T: OutputMessage,
    U: Message + TryFrom<T>,
{
    /// Converts a clone of `value` into `U` and sends it to the actor.
    ///
    /// A failed conversion is not a delivery failure: the message is skipped
    /// and `true` is returned. `false` is returned only when sending fails.
    fn send(&self, value: &T) -> bool {
        match <U as TryFrom<T>>::try_from(value.clone()) {
            Ok(converted) => self.send_message(converted).is_ok(),
            Err(_) => true,
        }
    }

    /// The id of the underlying actor.
    fn id(&self) -> ActorId {
        self.get_id()
    }
}
impl<T> Subscriber<ActorId, T> for DerivedActorRef<T>
where
    T: OutputMessage,
{
    /// Sends a clone of `value` to the actor; `false` when sending fails.
    fn send(&self, value: &T) -> bool {
        let owned = value.clone();
        self.send_message(owned).is_ok()
    }

    /// The id of the underlying actor.
    fn id(&self) -> ActorId {
        self.get_id()
    }
}
/// Subscriber that passes each message through `filter` before sending the
/// result to `actor_ref`.
struct Filtering<T, F> {
/// Destination of the messages that pass the filter.
pub actor_ref: T,
/// Maps a published message to the destination's message type; `None`
/// means the message is not delivered to this subscriber.
pub filter: F,
}
impl<T, U, F> Subscriber<ActorId, U> for Filtering<T, F>
where
    T: ActorReference,
    U: OutputMessage,
    F: Fn(&U) -> Option<T::Msg> + Send + 'static,
{
    /// Runs the filter on `value` and forwards the result, if any.
    ///
    /// A message rejected by the filter counts as success (`true`).
    fn send(&self, value: &U) -> bool {
        match (self.filter)(value) {
            Some(accepted) => self.actor_ref.send_message(accepted),
            None => true,
        }
    }

    /// The id reported by the wrapped actor reference.
    fn id(&self) -> ActorId {
        self.actor_ref.id()
    }
}
/// Common interface over the actor reference types that a [`Filtering`]
/// subscriber can deliver to.
pub(super) trait ActorReference: Send + Sync + 'static {
/// Message type accepted by the referenced actor.
type Msg: Message;
/// Sends `value` to the actor; the implementations in this module return
/// whether the underlying send succeeded.
fn send_message(&self, value: Self::Msg) -> bool;
/// The id of the referenced actor.
fn id(&self) -> ActorId;
}
impl<T> ActorReference for ActorRef<T>
where
    T: Message,
{
    type Msg = T;

    /// Sends `value` to the actor, reporting success as a `bool`.
    fn send_message(&self, value: T) -> bool {
        let outcome = self.send_message(value);
        outcome.is_ok()
    }

    /// The id of the underlying actor.
    fn id(&self) -> ActorId {
        self.get_id()
    }
}
impl<T> ActorReference for DerivedActorRef<T>
where
    T: Message,
{
    type Msg = T;

    /// Sends `value` to the actor, reporting success as a `bool`.
    fn send_message(&self, value: T) -> bool {
        let outcome = self.send_message(value);
        outcome.is_ok()
    }

    /// The id of the underlying actor.
    fn id(&self) -> ActorId {
        self.get_id()
    }
}
}
}
/// Boxed, type-erased [`OutputPortSubscriberTrait`] object for ports that emit
/// `InputMessage`.
pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
/// Something that can subscribe itself to an [`OutputPort`] carrying messages
/// of type `I`.
pub trait OutputPortSubscriberTrait<I>: Send
where
I: Message + Clone,
{
/// Subscribes `self` to `port`.
fn subscribe_to_port(&self, port: &OutputPort<I>);
}
impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
where
    I: Message + Clone,
    O: Message + From<I>,
{
    /// Subscribes a clone of this actor reference to `port`, converting every
    /// published `I` into `O` via `From`.
    fn subscribe_to_port(&self, port: &OutputPort<I>) {
        let receiver = self.clone();
        port.subscribe(receiver, |msg: I| Some(O::from(msg)));
    }
}