use std::{marker::PhantomData, time::Duration};
use crate::{
Addr, StreamHandler,
actor::service::Service,
addr::OwningAddr,
channel::Channel,
event_loop::{EventLoop, EventLoopConfig},
};
use super::{
Actor, RestartableActor,
handle::ActorHandle,
restart_strategy::{NonRestartable, RecreateFromDefault, RestartOnly, RestartStrategy},
};
pub fn setup_actor<A: Actor>(actor: A) -> ActorBuilder<A> {
ActorBuilder::new(actor)
}
/// Method-call form of the free function `setup_actor`.
///
/// This module provides a blanket implementation for every type that
/// implements [`Actor`], so `actor.setup_actor()` can be used instead of
/// `setup_actor(actor)`.
pub trait Configurable<A>
where
    A: Actor,
{
    /// Consumes `self` and returns an [`ActorBuilder`] for an actor of type `A`.
    fn setup_actor(self) -> ActorBuilder<A>;
}
impl<A: Actor> Configurable<A> for A {
fn setup_actor(self) -> ActorBuilder<A> {
setup_actor(self)
}
}
/// Collects everything needed to spawn an actor of type `A`.
///
/// `R` selects the restart strategy at the type level and defaults to
/// [`RestartOnly`]; it is only carried as a marker and stores no data.
pub struct ActorBuilder<A, R = RestartOnly>
where
    A: Actor,
    R: RestartStrategy<A>,
{
    /// The actor instance that will be handed to the event loop.
    actor: A,
    /// Event-loop settings adjusted via `timeout` / `fail_on_timeout`.
    config: EventLoopConfig,
    /// Explicitly chosen channel; while `None`, an unbounded channel is
    /// created when the actor is spawned.
    channel: Option<Channel<A>>,
    /// Type-level marker for the restart strategy `R`.
    _restart: PhantomData<R>,
}
impl<A: Actor> ActorBuilder<A, RestartOnly> {
fn new(actor: A) -> Self {
Self {
actor,
config: EventLoopConfig::default(),
channel: None,
_restart: PhantomData,
}
}
}
impl<A, R> ActorBuilder<A, R>
where
    A: Actor,
    R: RestartStrategy<A> + 'static,
{
    /// Selects a channel created with `Channel::bounded(capacity)`.
    ///
    /// Replaces any channel chosen by an earlier `bounded`/`unbounded` call.
    #[must_use]
    pub fn bounded(self, capacity: usize) -> Self {
        Self {
            channel: Some(Channel::bounded(capacity)),
            ..self
        }
    }

    /// Selects a channel created with `Channel::unbounded()`.
    ///
    /// Replaces any channel chosen by an earlier `bounded`/`unbounded` call.
    #[must_use]
    pub fn unbounded(self) -> Self {
        Self {
            channel: Some(Channel::unbounded()),
            ..self
        }
    }

    /// Stores `timeout` in the event-loop configuration.
    ///
    /// How the timeout is applied is decided by the event loop, not by the builder.
    #[must_use]
    pub const fn timeout(self, timeout: Duration) -> Self {
        // Rebind instead of partially moving so the function stays `const`-compatible.
        let mut builder = self;
        builder.config.timeout = Some(timeout);
        builder
    }

    /// Sets the `fail_on_timeout` flag of the event-loop configuration.
    #[must_use]
    pub const fn fail_on_timeout(self, fail: bool) -> Self {
        let mut builder = self;
        builder.config.fail_on_timeout = fail;
        builder
    }

    /// Splits the builder into its parts, falling back to an unbounded
    /// channel when none was selected.
    fn resolve_channel(self) -> (A, EventLoopConfig, Channel<A>) {
        let Self {
            actor,
            config,
            channel,
            ..
        } = self;
        let channel = match channel {
            Some(chosen) => chosen,
            None => Channel::unbounded(),
        };
        (actor, config, channel)
    }

    /// Spawns the actor and returns the [`Addr`] obtained by detaching the
    /// [`OwningAddr`] produced by [`Self::spawn_owning`].
    pub fn spawn(self) -> Addr<A> {
        let owning = self.spawn_owning();
        owning.detach()
    }

    /// Builds the event loop with restart strategy `R`, spawns it, and
    /// returns the address together with the spawned actor's handle.
    pub fn spawn_owning(self) -> OwningAddr<A> {
        let (actor, config, channel) = self.resolve_channel();
        let configured = EventLoop::<A, R>::from_channel(channel).with_config(config);
        let (event_loop, addr) = configured.create(actor);
        OwningAddr {
            addr,
            handle: ActorHandle::spawn(event_loop),
        }
    }
}
impl<A: Actor> ActorBuilder<A, RestartOnly> {
    /// Attaches `stream` to the actor being built and switches to a
    /// [`StreamActorBuilder`].
    ///
    /// The actor, the event-loop configuration and any channel already
    /// selected are carried over unchanged. This method is only available
    /// while the builder still has the default [`RestartOnly`] parameter;
    /// the resulting stream builder spawns its event loop with
    /// `NonRestartable`.
    ///
    /// The returned builder must be used (e.g. spawned): dropping it discards
    /// both the actor and the stream without starting anything.
    #[must_use]
    pub fn on_stream<S>(self, stream: S) -> StreamActorBuilder<A, S>
    where
        S: futures::Stream + Unpin + Send + 'static,
        S::Item: 'static + Send,
        A: StreamHandler<S::Item>,
    {
        StreamActorBuilder {
            actor: self.actor,
            config: self.config,
            channel: self.channel,
            stream,
        }
    }
}
impl<A, R> ActorBuilder<A, R>
where
    A: RestartableActor + Default,
    R: RestartStrategy<A> + 'static,
{
    /// Switches the builder's restart-strategy parameter to
    /// [`RecreateFromDefault`].
    ///
    /// The actor, configuration and channel selection are kept as they are;
    /// only the type-level strategy marker changes.
    #[must_use]
    pub fn recreate_from_default(self) -> ActorBuilder<A, RecreateFromDefault> {
        let Self {
            actor,
            config,
            channel,
            ..
        } = self;
        ActorBuilder::<A, RecreateFromDefault> {
            actor,
            config,
            channel,
            _restart: PhantomData,
        }
    }
}
impl<A, R> ActorBuilder<A, R>
where
    A: Actor + Service,
    R: RestartStrategy<A> + 'static,
{
    /// Spawns the actor (detached) and then calls `register()` on the
    /// resulting address.
    ///
    /// NOTE(review): the returned pair presumably holds the newly registered
    /// address and a previously registered one, if any — confirm against
    /// `Addr::register`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the address's `register()` call returns.
    pub async fn register(self) -> crate::error::Result<(Addr<A>, Option<Addr<A>>)> {
        let addr = self.spawn();
        addr.register().await
    }
}
/// Builder for an actor of type `A` that is spawned together with a stream `S`.
///
/// Created by `ActorBuilder::on_stream`; it offers the same channel and
/// timeout settings as [`ActorBuilder`] and additionally carries the stream.
pub struct StreamActorBuilder<A, S>
where
    S: futures::Stream + Unpin + Send + 'static,
    S::Item: Send + 'static,
    A: Actor + StreamHandler<S::Item>,
{
    /// The actor instance that will be handed to the event loop.
    actor: A,
    /// Event-loop settings adjusted via `timeout` / `fail_on_timeout`.
    config: EventLoopConfig,
    /// Explicitly chosen channel; while `None`, an unbounded channel is
    /// created when the actor is spawned.
    channel: Option<Channel<A>>,
    /// The stream passed to the event loop alongside the actor at spawn time.
    stream: S,
}
impl<A, S> StreamActorBuilder<A, S>
where
    S: futures::Stream + Unpin + Send + 'static,
    S::Item: Send + 'static,
    A: Actor + StreamHandler<S::Item>,
{
    /// Selects a channel created with `Channel::bounded(capacity)`.
    ///
    /// Replaces any channel chosen earlier.
    #[must_use]
    pub fn bounded(self, capacity: usize) -> Self {
        Self {
            channel: Some(Channel::bounded(capacity)),
            ..self
        }
    }

    /// Selects a channel created with `Channel::unbounded()`.
    ///
    /// Replaces any channel chosen earlier.
    #[must_use]
    pub fn unbounded(self) -> Self {
        Self {
            channel: Some(Channel::unbounded()),
            ..self
        }
    }

    /// Stores `timeout` in the event-loop configuration.
    ///
    /// How the timeout is applied is decided by the event loop, not by the builder.
    #[must_use]
    pub const fn timeout(self, timeout: Duration) -> Self {
        // Rebind instead of partially moving so the function stays `const`-compatible.
        let mut builder = self;
        builder.config.timeout = Some(timeout);
        builder
    }

    /// Sets the `fail_on_timeout` flag of the event-loop configuration.
    #[must_use]
    pub const fn fail_on_timeout(self, fail: bool) -> Self {
        let mut builder = self;
        builder.config.fail_on_timeout = fail;
        builder
    }

    /// Splits the builder into its parts, falling back to an unbounded
    /// channel when none was selected.
    fn resolve_channel(self) -> (A, EventLoopConfig, Channel<A>, S) {
        let Self {
            actor,
            config,
            channel,
            stream,
        } = self;
        let channel = match channel {
            Some(chosen) => chosen,
            None => Channel::unbounded(),
        };
        (actor, config, channel, stream)
    }

    /// Spawns the actor and returns the [`Addr`] obtained by detaching the
    /// [`OwningAddr`] produced by [`Self::spawn_owning`].
    pub fn spawn(self) -> Addr<A> {
        let owning = self.spawn_owning();
        owning.detach()
    }

    /// Builds a `NonRestartable` event loop over the actor and its stream,
    /// spawns it, and returns the address together with the actor's handle.
    pub fn spawn_owning(self) -> OwningAddr<A> {
        let (actor, config, channel, stream) = self.resolve_channel();
        let configured =
            EventLoop::<A, NonRestartable>::from_channel(channel).with_config(config);
        let (event_loop, addr) = configured.create_on_stream(actor, stream);
        OwningAddr {
            addr,
            handle: ActorHandle::spawn(event_loop),
        }
    }
}