#![allow(clippy::incompatible_msrv)]
extern crate ractor;
use std::sync::Arc;
use ractor::Actor;
use ractor::ActorProcessingErr;
use ractor::ActorRef;
use ractor::OutputPort;
use tokio::time::timeout;
use tokio::time::Duration;
/// Messages accepted by the `Publisher` actor.
enum PublisherMessage {
/// Request to publish the contained text.
Publish(String),
}
// Explicit `Message` impl, compiled only when the `cluster` feature is enabled.
// The body is empty, so every trait item falls back to its default.
#[cfg(feature = "cluster")]
impl ractor::Message for PublisherMessage {}
/// Payload type carried by the `OutputPort`: a newtype around the published text.
// NOTE(review): `Clone` is presumably required so the port can hand a copy to
// each subscriber — confirm against the `OutputPort` documentation.
#[derive(Clone)]
struct Output(String);
// Explicit `Message` impl, compiled only when the `cluster` feature is enabled.
// The body is empty, so every trait item falls back to its default.
#[cfg(feature = "cluster")]
impl ractor::Message for Output {}
struct Publisher;
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Publisher {
type Msg = PublisherMessage;
type State = Arc<OutputPort<Output>>;
type Arguments = Arc<OutputPort<Output>>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
port: Arc<OutputPort<Output>>,
) -> Result<Self::State, ActorProcessingErr> {
Ok(port)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::Publish(msg) => {
tracing::info!("Publishing {msg}");
state.send(Output(format!("Published: {msg}")));
}
}
Ok(())
}
}
/// Actor that logs each `SubscriberMessage::Published` message it receives.
struct Subscriber;
/// Messages accepted by the `Subscriber` actor.
enum SubscriberMessage {
/// Text that was published and forwarded to this subscriber.
Published(String),
}
// Explicit `Message` impl, compiled only when the `cluster` feature is enabled.
// The body is empty, so every trait item falls back to its default.
#[cfg(feature = "cluster")]
impl ractor::Message for SubscriberMessage {}
#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for Subscriber {
type Msg = SubscriberMessage;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::Published(msg) => {
tracing::info!("Subscriber ({myself:?}) received published message '{msg}'");
}
}
Ok(())
}
}
/// Installs the global `tracing` subscriber used by this example.
///
/// Events are written to stderr in glog format with a local-time timestamp;
/// ANSI colouring is enabled only when stderr is a terminal. Filtering starts
/// from `EnvFilter::from_default_env()` and then adds a `DEBUG` level directive.
///
/// # Panics
///
/// Panics if `tracing::subscriber::set_global_default` returns an error.
fn init_logging() {
    use std::io::stderr;
    use std::io::IsTerminal;
    use tracing_glog::Glog;
    use tracing_glog::GlogFields;
    use tracing_subscriber::filter::Directive;
    use tracing_subscriber::filter::EnvFilter;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::Registry;

    let debug_directive = Directive::from(tracing::Level::DEBUG);

    let glog_layer = tracing_subscriber::fmt::Layer::default()
        .with_ansi(stderr().is_terminal())
        .with_writer(std::io::stderr)
        .event_format(Glog::default().with_timer(tracing_glog::LocalTime::default()))
        .fmt_fields(GlogFields::default().compact());

    // Environment-derived filter first, then the DEBUG directive on top of it.
    let env_filter = EnvFilter::from_default_env().add_directive(debug_directive);

    tracing::subscriber::set_global_default(Registry::default().with(env_filter).with(glog_layer))
        .expect("to set global subscriber");
}
#[ractor_example_entry_proc::ractor_example_entry]
async fn main() {
init_logging();
let port = Arc::new(OutputPort::default());
let (publisher_ref, publisher_handle) = Actor::spawn(None, Publisher, port.clone())
.await
.expect("Failed to start publisher");
let mut subscriber_refs = vec![];
let mut subscriber_handles = vec![];
for _ in 0..10 {
let (actor_ref, actor_handle) = Actor::spawn(None, Subscriber, ())
.await
.expect("Failed to start subscriber");
port.subscribe(actor_ref.clone(), |msg| {
Some(SubscriberMessage::Published(msg.0))
});
subscriber_refs.push(actor_ref);
subscriber_handles.push(actor_handle);
}
for i in 0..3 {
publisher_ref
.cast(PublisherMessage::Publish(format!("Something {i}")))
.expect("Send failed");
tokio::time::sleep(Duration::from_millis(500)).await;
}
publisher_ref.stop(None);
for actor in subscriber_refs {
actor.stop(None);
}
timeout(Duration::from_millis(50), publisher_handle)
.await
.expect("Actor failed to exit cleanly")
.unwrap();
for s in subscriber_handles.into_iter() {
timeout(Duration::from_millis(50), s)
.await
.expect("Actor failed to exit cleanly")
.unwrap();
}
}