use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use uuid::Uuid;
use super::actor::{Actor, ActorCtx};
use super::context::Publisher;
use super::directory::ResourceDirectory;
use super::executor::{ActorHandle, ActorSlot};
use super::resource::{ResourceCtx, ResourceError, ResourceSnapshot};
use super::transition::{ActorSpec, ResourceSpec};
use crate::events::{EventBus, StreamRegistry};
/// Collects a node id, an actor queue capacity and a list of pending actor
/// registrations, and turns them into a [`Node`] via `build` / `try_build`.
pub struct NodeBuilder {
    /// Identifier handed to the resulting [`Node`] and to every registered actor.
    id: String,
    /// Capacity value forwarded to each actor spawn; the setter keeps it at 1 or more.
    actor_queue_capacity: usize,
    /// Registrations recorded so far, executed in insertion order at build time.
    pending_actors: Vec<PendingActor>,
}
impl NodeBuilder {
    /// Creates a builder for a node called `id`.
    ///
    /// The actor queue capacity starts at 32 and no actors are pending.
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        Self {
            id,
            actor_queue_capacity: 32,
            pending_actors: Vec::new(),
        }
    }

    /// Sets the capacity passed to every actor spawned by this builder and by
    /// the node it produces. A value of 0 is raised to 1.
    pub fn actor_queue_capacity(mut self, capacity: usize) -> Self {
        self.actor_queue_capacity = if capacity == 0 { 1 } else { capacity };
        self
    }

    /// Queues `actor` under a freshly generated UUID v4 id.
    ///
    /// # Panics
    ///
    /// Panics if the generated id is rejected by [`Self::register_with_id`].
    pub fn register_actor<A: Actor>(self, actor: A) -> Self {
        let generated_id = Uuid::new_v4().to_string();
        match self.register_with_id(generated_id, actor) {
            Ok(builder) => builder,
            Err(err) => panic!("generated actor id must be unique: {err:?}"),
        }
    }

    /// Queues `actor` under the caller-supplied `id`.
    ///
    /// # Errors
    ///
    /// Returns `ResourceError::Internal` when an actor with the same id has
    /// already been queued on this builder.
    pub fn register_with_id<A: Actor>(
        mut self,
        id: impl Into<String>,
        actor: A,
    ) -> Result<Self, ResourceError> {
        let id = id.into();
        let already_pending = self
            .pending_actors
            .iter()
            .any(|pending| pending.id == id);
        if already_pending {
            let message = format!("duplicate resource id: {id}");
            return Err(ResourceError::Internal(message));
        }
        self.pending_actors.push(PendingActor::new(id, actor));
        Ok(self)
    }

    /// Builds the node, panicking instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics with the debug rendering of the error if [`Self::try_build`] fails.
    pub fn build(self) -> Node {
        match self.try_build() {
            Ok(node) => node,
            Err(err) => panic!("NodeBuilder::build failed: {err:?}"),
        }
    }

    /// Builds the node.
    ///
    /// Creates a new stream registry, an event bus tied to that registry and
    /// an empty resource directory, then runs every pending registration in
    /// the order it was queued.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a pending registration; later
    /// registrations are not run.
    pub fn try_build(self) -> Result<Node, ResourceError> {
        let Self {
            id,
            actor_queue_capacity,
            pending_actors,
        } = self;

        let stream_registry = StreamRegistry::new();
        let bus = EventBus::with_registry(stream_registry.clone());
        let mut directory = ResourceDirectory::new();

        pending_actors.into_iter().try_for_each(|pending| {
            (pending.register)(
                &mut directory,
                id.as_str(),
                &bus,
                &stream_registry,
                actor_queue_capacity,
            )
        })?;

        Ok(Node {
            id,
            bus,
            stream_registry,
            directory: Arc::new(RwLock::new(directory)),
            actor_queue_capacity,
        })
    }
}
/// Boxed, sendable, call-once registration callback stored in a `PendingActor`.
///
/// `NodeBuilder::try_build` invokes it with, in order: the directory being
/// populated, the node id, the event bus, the stream registry and the actor
/// queue capacity.
type RegisterPendingActor = Box<
    dyn FnOnce(
            &mut ResourceDirectory,
            &str,
            &EventBus,
            &StreamRegistry,
            usize,
        ) -> Result<(), ResourceError>
        + Send,
>;
/// An actor registration recorded on a `NodeBuilder` but not yet executed.
struct PendingActor {
    /// Id the actor will be registered under; used for duplicate detection on the builder.
    id: String,
    /// Deferred work that spawns the actor and inserts it into the directory.
    register: RegisterPendingActor,
}
impl PendingActor {
fn new<A: Actor>(id: String, actor: A) -> Self {
let register_id = id.clone();
let register = Box::new(
move |directory: &mut ResourceDirectory,
node_id: &str,
bus: &EventBus,
stream_registry: &StreamRegistry,
actor_queue_capacity: usize| {
if directory.contains_id(®ister_id) {
return Err(ResourceError::Internal(format!(
"duplicate resource id: {register_id}"
)));
}
let (kind, slot, spec) = spawn_actor_entry(
actor,
node_id,
register_id.clone(),
bus,
stream_registry,
actor_queue_capacity,
);
directory.insert(register_id, kind, slot, spec)
},
);
Self { id, register }
}
}
/// A built node: an id, an event bus with its stream registry, and a shared
/// directory of registered actors.
pub struct Node {
    /// Identifier of this node.
    id: String,
    /// Event bus created together with `stream_registry` at build time.
    bus: EventBus,
    /// Stream registry the bus was constructed with.
    stream_registry: StreamRegistry,
    /// Directory of registered resources behind an async read/write lock.
    directory: Arc<RwLock<ResourceDirectory>>,
    /// Capacity value forwarded to each actor spawned on this node.
    actor_queue_capacity: usize,
}
impl Node {
    /// Returns this node's id.
    pub fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the node's event bus.
    pub fn events(&self) -> &EventBus {
        &self.bus
    }

    /// Returns the node's stream registry.
    pub fn stream_registry(&self) -> &StreamRegistry {
        &self.stream_registry
    }

    /// Acquires a read guard on the resource directory.
    pub(crate) async fn directory_read(
        &self,
    ) -> tokio::sync::RwLockReadGuard<'_, ResourceDirectory> {
        self.directory.read().await
    }

    /// Spawns `actor` under a freshly generated UUID v4 id and returns that id.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::register_with_id`].
    pub async fn register_actor<A: Actor>(&self, actor: A) -> Result<String, ResourceError> {
        let generated_id = Uuid::new_v4().to_string();
        self.register_with_id(generated_id.clone(), actor)
            .await
            .map(|()| generated_id)
    }

    /// Spawns `actor` under `id` and inserts it into the directory.
    ///
    /// The directory write lock is held across the duplicate check, the spawn
    /// and the insert, so two concurrent registrations of the same id cannot
    /// both pass the check.
    ///
    /// # Errors
    ///
    /// Returns `ResourceError::Internal` if `id` is already present, or the
    /// error returned by the directory insert.
    pub async fn register_with_id<A: Actor>(
        &self,
        id: String,
        actor: A,
    ) -> Result<(), ResourceError> {
        let mut directory = self.directory.write().await;
        if directory.contains_id(&id) {
            let message = format!("duplicate resource id: {id}");
            return Err(ResourceError::Internal(message));
        }

        let (kind, slot, spec) = spawn_actor_entry(
            actor,
            self.id.as_str(),
            id.clone(),
            &self.bus,
            &self.stream_registry,
            self.actor_queue_capacity,
        );
        directory.insert(id, kind, slot, spec)
    }

    /// Collects a snapshot for each directory entry.
    ///
    /// The entry list is copied under the read lock and the lock is released
    /// before any entry is queried. An entry that answers with
    /// `ResourceError::Unavailable` contributes its `unavailable_snapshot`;
    /// an entry that fails with any other error is left out of the result.
    ///
    /// NOTE(review): each query uses `ResourceCtx::new_test()`; the name
    /// suggests a test-only context, so confirm it is intended on this path.
    pub async fn resources(&self) -> Vec<ResourceSnapshot> {
        // The guard is a temporary and is released at the end of this statement.
        let entries = self.directory_read().await.entries().to_vec();

        let mut snapshots = Vec::with_capacity(entries.len());
        for entry in entries {
            match entry.snapshot(ResourceCtx::new_test(), &self.id).await {
                Ok(snapshot) => snapshots.push(snapshot),
                Err(ResourceError::Unavailable(_)) => {
                    snapshots.push(entry.unavailable_snapshot(&self.id));
                }
                Err(_) => {}
            }
        }
        snapshots
    }

    /// Collects the actor spec of every entry that returns one successfully.
    ///
    /// The entry list is copied under the read lock and the lock is released
    /// before any entry is queried. Entries whose query fails are skipped.
    ///
    /// NOTE(review): each query uses `ResourceCtx::new_test()`; confirm that
    /// is intended outside tests.
    pub(crate) async fn actor_specs(&self) -> Vec<ActorSpec> {
        // The guard is a temporary and is released at the end of this statement.
        let entries = self.directory_read().await.entries().to_vec();

        let mut specs = Vec::with_capacity(entries.len());
        for entry in entries {
            let queried = entry.actor_spec(ResourceCtx::new_test(), &self.id).await;
            // `ok()` yields zero or one item, so failed queries add nothing.
            specs.extend(queried.ok());
        }
        specs
    }

    /// Shuts down every registered actor, one after another.
    ///
    /// For each entry, the handle's shutdown is awaited with `within` and its
    /// result is ignored. Then, if the entry still has a task, the task is
    /// awaited for up to `within` and aborted when that timeout elapses.
    ///
    /// NOTE(review): `within` is applied separately to both steps of every
    /// entry and entries are processed sequentially, so the total wall time
    /// can exceed `within`. Confirm this matches caller expectations.
    pub async fn shutdown(&self, within: Duration) {
        // The guard is a temporary and is released at the end of this statement.
        let entries = self.directory_read().await.entries().to_vec();

        for entry in entries {
            // Best effort: a failed shutdown request still falls through to the join below.
            let _ = entry.handle.shutdown(within).await;

            // The slot guard stays held while the task is being joined.
            let mut slot_guard = entry.task.lock().await;
            let task = match slot_guard.take() {
                Some(task) => task,
                None => continue,
            };

            // Keep an abort handle, because `timeout` takes ownership of the task.
            let aborter = task.abort_handle();
            let joined = tokio::time::timeout(within, task).await;
            if joined.is_err() {
                aborter.abort();
            }
        }
    }
}
/// Spawns `actor` and returns the pieces needed to record it in a directory.
///
/// Reads the actor's spec, builds an `ActorCtx` from the node id, actor id,
/// and the spec's kind and labels, attaches a `Publisher` built from clones of
/// `bus` and `stream_registry`, and starts the actor through
/// `ActorHandle::spawn_with_task` with `actor_queue_capacity`.
///
/// Returns `(kind, slot, spec)`, where `slot` holds the spawned handle and task.
fn spawn_actor_entry<A: Actor>(
    actor: A,
    node_id: &str,
    id: String,
    bus: &EventBus,
    stream_registry: &StreamRegistry,
    actor_queue_capacity: usize,
) -> (String, ActorSlot, ResourceSpec) {
    let spec = actor.spec();
    let kind = spec.kind.clone();

    let ctx = ActorCtx::new(
        node_id.to_string(),
        id,
        kind.clone(),
        spec.labels.clone(),
    )
    .with_publisher(Publisher::new(bus.clone(), stream_registry.clone()));

    let (handle, task) = ActorHandle::spawn_with_task(actor, actor_queue_capacity, ctx);
    (kind, ActorSlot { handle, task }, spec)
}