use dactor::node::{ActorId, NodeId};
use dactor::system_actors::*;
use dactor::type_registry::TypeRegistry;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use coerce::actor::context::ActorContext;
use coerce::actor::message::{Handler, Message};
use coerce::actor::Actor;
pub enum SpawnOutcome {
Success {
actor_id: ActorId,
actor: Mutex<Option<Box<dyn std::any::Any + Send>>>,
},
Failure(SpawnResponse),
}
impl SpawnOutcome {
pub fn success(actor_id: ActorId, actor: Box<dyn std::any::Any + Send>) -> Self {
Self::Success {
actor_id,
actor: Mutex::new(Some(actor)),
}
}
pub fn take_actor(&self) -> Option<Box<dyn std::any::Any + Send>> {
match self {
Self::Success { actor, .. } => actor.lock().unwrap_or_else(|e| e.into_inner()).take(),
Self::Failure(_) => None,
}
}
}
pub struct CancelOutcome(pub CancelResponse);
pub type FactoryFn = Box<
dyn Fn(&[u8]) -> Result<Box<dyn std::any::Any + Send>, dactor::remote::SerializationError>
+ Send
+ Sync,
>;
pub struct SpawnManagerActor {
manager: SpawnManager,
node_id: NodeId,
next_local: Arc<AtomicU64>,
}
impl SpawnManagerActor {
pub fn new(node_id: NodeId, registry: TypeRegistry, next_local: Arc<AtomicU64>) -> Self {
Self {
manager: SpawnManager::new(registry),
node_id,
next_local,
}
}
}
#[async_trait::async_trait]
impl Actor for SpawnManagerActor {}
pub struct HandleSpawnRequest(pub SpawnRequest);
impl Message for HandleSpawnRequest {
type Result = SpawnOutcome;
}
#[async_trait::async_trait]
impl Handler<HandleSpawnRequest> for SpawnManagerActor {
async fn handle(&mut self, msg: HandleSpawnRequest, _ctx: &mut ActorContext) -> SpawnOutcome {
match self.manager.create_actor(&msg.0) {
Ok(actor) => {
let local = self.next_local.fetch_add(1, Ordering::SeqCst);
let actor_id = ActorId {
node: self.node_id.clone(),
local,
};
self.manager.record_spawn(actor_id.clone());
SpawnOutcome::success(actor_id, actor)
}
Err(e) => SpawnOutcome::Failure(SpawnResponse::Failure {
request_id: msg.0.request_id.clone(),
error: e.to_string(),
}),
}
}
}
pub struct RegisterFactory {
pub type_name: String,
pub factory: FactoryFn,
}
impl Message for RegisterFactory {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<RegisterFactory> for SpawnManagerActor {
async fn handle(&mut self, msg: RegisterFactory, _ctx: &mut ActorContext) {
self.manager
.type_registry_mut()
.register_factory(msg.type_name, msg.factory);
}
}
pub struct GetSpawnedActors;
impl Message for GetSpawnedActors {
type Result = Vec<ActorId>;
}
#[async_trait::async_trait]
impl Handler<GetSpawnedActors> for SpawnManagerActor {
async fn handle(&mut self, _msg: GetSpawnedActors, _ctx: &mut ActorContext) -> Vec<ActorId> {
self.manager.spawned_actors().to_vec()
}
}
pub struct WatchManagerActor {
manager: WatchManager,
}
impl WatchManagerActor {
pub fn new() -> Self {
Self {
manager: WatchManager::new(),
}
}
}
impl Default for WatchManagerActor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl Actor for WatchManagerActor {}
pub struct RemoteWatch {
pub target: ActorId,
pub watcher: ActorId,
}
impl Message for RemoteWatch {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<RemoteWatch> for WatchManagerActor {
async fn handle(&mut self, msg: RemoteWatch, _ctx: &mut ActorContext) {
self.manager.watch(msg.target, msg.watcher);
}
}
pub struct RemoteUnwatch {
pub target: ActorId,
pub watcher: ActorId,
}
impl Message for RemoteUnwatch {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<RemoteUnwatch> for WatchManagerActor {
async fn handle(&mut self, msg: RemoteUnwatch, _ctx: &mut ActorContext) {
self.manager.unwatch(&msg.target, &msg.watcher);
}
}
pub struct OnTerminated(pub ActorId);
impl Message for OnTerminated {
type Result = Vec<WatchNotification>;
}
#[async_trait::async_trait]
impl Handler<OnTerminated> for WatchManagerActor {
async fn handle(
&mut self,
msg: OnTerminated,
_ctx: &mut ActorContext,
) -> Vec<WatchNotification> {
self.manager.on_terminated(&msg.0)
}
}
pub struct GetWatchedCount;
impl Message for GetWatchedCount {
type Result = usize;
}
#[async_trait::async_trait]
impl Handler<GetWatchedCount> for WatchManagerActor {
async fn handle(&mut self, _msg: GetWatchedCount, _ctx: &mut ActorContext) -> usize {
self.manager.watched_count()
}
}
pub struct CancelManagerActor {
manager: CancelManager,
}
impl CancelManagerActor {
pub fn new() -> Self {
Self {
manager: CancelManager::new(),
}
}
}
impl Default for CancelManagerActor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl Actor for CancelManagerActor {}
pub struct RegisterCancel {
pub request_id: String,
pub token: tokio_util::sync::CancellationToken,
}
impl Message for RegisterCancel {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<RegisterCancel> for CancelManagerActor {
async fn handle(&mut self, msg: RegisterCancel, _ctx: &mut ActorContext) {
self.manager.register(msg.request_id, msg.token);
}
}
pub struct CancelById(pub String);
impl Message for CancelById {
type Result = CancelOutcome;
}
#[async_trait::async_trait]
impl Handler<CancelById> for CancelManagerActor {
async fn handle(&mut self, msg: CancelById, _ctx: &mut ActorContext) -> CancelOutcome {
CancelOutcome(self.manager.cancel(&msg.0))
}
}
pub struct CompleteRequest(pub String);
impl Message for CompleteRequest {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<CompleteRequest> for CancelManagerActor {
async fn handle(&mut self, msg: CompleteRequest, _ctx: &mut ActorContext) {
self.manager.remove(&msg.0);
}
}
pub struct GetActiveCount;
impl Message for GetActiveCount {
type Result = usize;
}
#[async_trait::async_trait]
impl Handler<GetActiveCount> for CancelManagerActor {
async fn handle(&mut self, _msg: GetActiveCount, _ctx: &mut ActorContext) -> usize {
self.manager.active_count()
}
}
pub struct NodeDirectoryActor {
directory: NodeDirectory,
}
impl NodeDirectoryActor {
pub fn new() -> Self {
Self {
directory: NodeDirectory::new(),
}
}
}
impl Default for NodeDirectoryActor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl Actor for NodeDirectoryActor {}
pub struct ConnectPeer {
pub peer_id: NodeId,
pub address: Option<String>,
}
impl Message for ConnectPeer {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<ConnectPeer> for NodeDirectoryActor {
async fn handle(&mut self, msg: ConnectPeer, _ctx: &mut ActorContext) {
if let Some(existing) = self.directory.get_peer(&msg.peer_id) {
let resolved = msg.address.or_else(|| existing.address.clone());
self.directory.remove_peer(&msg.peer_id);
self.directory.add_peer(msg.peer_id.clone(), resolved);
} else {
self.directory.add_peer(msg.peer_id.clone(), msg.address);
}
self.directory
.set_status(&msg.peer_id, PeerStatus::Connected);
}
}
pub struct DisconnectPeer(pub NodeId);
impl Message for DisconnectPeer {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<DisconnectPeer> for NodeDirectoryActor {
async fn handle(&mut self, msg: DisconnectPeer, _ctx: &mut ActorContext) {
self.directory
.set_status(&msg.0, PeerStatus::Disconnected);
}
}
pub struct IsConnected(pub NodeId);
impl Message for IsConnected {
type Result = bool;
}
#[async_trait::async_trait]
impl Handler<IsConnected> for NodeDirectoryActor {
async fn handle(&mut self, msg: IsConnected, _ctx: &mut ActorContext) -> bool {
self.directory.is_connected(&msg.0)
}
}
pub struct GetPeerCount;
impl Message for GetPeerCount {
type Result = usize;
}
#[async_trait::async_trait]
impl Handler<GetPeerCount> for NodeDirectoryActor {
async fn handle(&mut self, _msg: GetPeerCount, _ctx: &mut ActorContext) -> usize {
self.directory.peer_count()
}
}
pub struct GetConnectedCount;
impl Message for GetConnectedCount {
type Result = usize;
}
#[async_trait::async_trait]
impl Handler<GetConnectedCount> for NodeDirectoryActor {
async fn handle(&mut self, _msg: GetConnectedCount, _ctx: &mut ActorContext) -> usize {
self.directory.connected_count()
}
}
pub struct GetPeerInfo(pub NodeId);
impl Message for GetPeerInfo {
type Result = Option<PeerInfo>;
}
#[async_trait::async_trait]
impl Handler<GetPeerInfo> for NodeDirectoryActor {
async fn handle(&mut self, msg: GetPeerInfo, _ctx: &mut ActorContext) -> Option<PeerInfo> {
self.directory.get_peer(&msg.0).cloned()
}
}