use crate::actor::handle::ActorHandle;
use crate::actor::ActorError;
use crate::actor::ActorRuntimeError;
use crate::chain::ChainEvent;
use crate::id::TheaterId;
use crate::metrics::ActorMetrics;
use crate::pack_bridge::{InterfaceHash, Value};
use crate::store::ContentStore;
use crate::ManifestConfig;
use crate::Result;
use crate::TheaterRuntimeError;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
/// Message type enumerating actor-lifecycle requests, queries, and
/// notifications.
///
/// Variants that expect an answer carry a `response_tx` one-shot sender typed
/// with the reply; the remaining variants carry no reply channel.
///
/// NOTE(review): the consumer of these commands is not visible in this part of
/// the file — presumably the theater runtime's command loop; confirm.
#[derive(Debug)]
// NOTE(review): this lint is suppressed without a recorded reason; presumably
// the size gap between variants is accepted instead of boxing the large ones
// — confirm and document.
#[allow(clippy::large_enum_variant)]
pub enum TheaterCommand {
/// Request to start an actor. The reply carries a `TheaterId` on success
/// (presumably the id assigned to the new actor — confirm).
SpawnActor {
/// Raw bytes for the actor (a WebAssembly binary, judging by the name).
wasm_bytes: Vec<u8>,
/// Optional name; [`TheaterCommand::to_log`] prints `<unnamed>` when absent.
name: Option<String>,
/// Optional manifest configuration.
manifest: Option<ManifestConfig>,
/// Optional initialisation payload.
init_bytes: Option<Vec<u8>>,
/// One-shot channel for the reply.
response_tx: oneshot::Sender<Result<TheaterId>>,
/// Optional channel accepting `ActorResult` values
/// (presumably the actor's final outcome, for a supervisor — confirm).
supervisor_tx: Option<Sender<ActorResult>>,
/// Optional channel accepting chain events or actor errors.
subscription_tx: Option<Sender<Result<ChainEvent, ActorError>>>,
},
/// Request to resume an actor identified by a manifest path. The WASM bytes
/// are optional here, unlike in `SpawnActor`.
ResumeActor {
/// Manifest location as a string; this is what `to_log` prints.
manifest_path: String,
/// Optional raw bytes for the actor.
wasm_bytes: Option<Vec<u8>>,
/// One-shot channel for the reply.
response_tx: oneshot::Sender<Result<TheaterId>>,
/// Optional channel accepting `ActorResult` values.
supervisor_tx: Option<Sender<ActorResult>>,
/// Optional channel accepting chain events or actor errors.
subscription_tx: Option<Sender<Result<ChainEvent, ActorError>>>,
},
/// Request to stop the given actor; the reply is `Ok(())` or an error.
StopActor {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<()>>,
},
/// Request to terminate the given actor; the reply is `Ok(())` or an error.
///
/// NOTE(review): how this differs from `StopActor` is not visible here —
/// presumably graceful vs. forced shutdown; confirm.
TerminateActor {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<()>>,
},
/// Notification that an actor is shutting down, with an optional byte
/// payload. No reply channel.
ShuttingDown {
actor_id: TheaterId,
/// Optional payload; `to_log` renders it as lossy UTF-8.
data: Option<Vec<u8>>,
},
/// Notification that an actor's shutdown has completed. No reply channel.
ActorShutdownComplete {
actor_id: TheaterId,
},
/// Payload-free command (presumably asks the whole runtime to shut down —
/// confirm against the handler).
ShutdownRuntime,
/// Notification carrying a chain event associated with an actor.
NewEvent {
actor_id: TheaterId,
event: ChainEvent,
},
/// Notification carrying an `ActorError` associated with an actor.
ActorError {
actor_id: TheaterId,
error: ActorError,
},
/// Notification carrying an `ActorRuntimeError`; no actor id is attached
/// at this level.
ActorRuntimeError {
error: ActorRuntimeError,
},
/// Query answered with a list of `(TheaterId, String)` pairs
/// (the string is presumably the actor's name — confirm).
GetActors {
response_tx: oneshot::Sender<Result<Vec<(TheaterId, String)>>>,
},
/// Query answered with the `ManifestConfig` of the given actor.
GetActorManifest {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<ManifestConfig>>,
},
/// Query answered with the `ActorStatus` of the given actor.
GetActorStatus {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<ActorStatus>>,
},
/// Query answered with the ids of the children of `parent_id`. The reply is
/// a plain `Vec`, not a `Result`.
ListChildren {
parent_id: TheaterId,
response_tx: oneshot::Sender<Vec<TheaterId>>,
},
/// Request to restart the given actor; the reply is `Ok(())` or an error.
RestartActor {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<()>>,
},
/// Query answered with a `Value` for the given actor
/// (presumably its current state — confirm).
GetActorState {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<Value>>,
},
/// Query answered with the chain events of the given actor. The error type
/// is spelled out as `TheaterRuntimeError` instead of using the crate's
/// `Result` alias.
GetActorEvents {
actor_id: TheaterId,
response_tx: oneshot::Sender<std::result::Result<Vec<ChainEvent>, TheaterRuntimeError>>,
},
/// Query answered with the `ActorMetrics` of the given actor.
GetActorMetrics {
actor_id: TheaterId,
response_tx: oneshot::Sender<Result<ActorMetrics>>,
},
/// Registers `event_tx` as a receiver of chain events or actor errors for
/// the given actor. There is no separate acknowledgement channel.
SubscribeToActor {
actor_id: TheaterId,
event_tx: Sender<Result<ChainEvent, ActorError>>,
},
/// Request answered with a `ContentStore`
/// (presumably a newly created one — confirm).
NewStore {
response_tx: oneshot::Sender<Result<ContentStore>>,
},
/// Query answered with an optional `ActorHandle`
/// (presumably `None` when the actor is unknown — confirm).
GetActorHandle {
actor_id: TheaterId,
response_tx: oneshot::Sender<Option<ActorHandle>>,
},
/// Query answered with an optional list of `InterfaceHash` values for the
/// given actor's exports.
GetActorExportHashes {
actor_id: TheaterId,
response_tx: oneshot::Sender<Option<Vec<InterfaceHash>>>,
},
}
impl TheaterCommand {
pub fn to_log(&self) -> String {
match self {
TheaterCommand::SpawnActor { name, .. } => {
format!("SpawnActor: {}", name.as_deref().unwrap_or("<unnamed>"))
}
TheaterCommand::ResumeActor { manifest_path, .. } => {
format!("ResumeActor: {}", manifest_path)
}
TheaterCommand::StopActor { actor_id, .. } => {
format!("StopActor: {:?}", actor_id)
}
TheaterCommand::TerminateActor { actor_id, .. } => {
format!("TerminateActor: {:?}", actor_id)
}
TheaterCommand::ShuttingDown { actor_id, data } => {
format!(
"ShuttingDown: {:?} (data: {:?})",
actor_id,
data.as_ref().map(|d| String::from_utf8_lossy(d))
)
}
TheaterCommand::ActorShutdownComplete { actor_id } => {
format!("ActorShutdownComplete: {:?}", actor_id)
}
TheaterCommand::NewEvent { actor_id, .. } => {
format!("NewEvent: {:?}", actor_id)
}
TheaterCommand::ActorError { actor_id, .. } => {
format!("ActorError: {:?}", actor_id)
}
TheaterCommand::ActorRuntimeError { .. } => "ActorRuntimeError".to_string(),
TheaterCommand::GetActors { .. } => "GetActors".to_string(),
TheaterCommand::GetActorManifest { actor_id, .. } => {
format!("GetActorManifest: {:?}", actor_id)
}
TheaterCommand::GetActorStatus { actor_id, .. } => {
format!("GetActorStatus: {:?}", actor_id)
}
TheaterCommand::ListChildren { parent_id, .. } => {
format!("ListChildren: {:?}", parent_id)
}
TheaterCommand::RestartActor { actor_id, .. } => {
format!("RestartActor: {:?}", actor_id)
}
TheaterCommand::GetActorState { actor_id, .. } => {
format!("GetActorState: {:?}", actor_id)
}
TheaterCommand::GetActorEvents { actor_id, .. } => {
format!("GetActorEvents: {:?}", actor_id)
}
TheaterCommand::GetActorMetrics { actor_id, .. } => {
format!("GetActorMetrics: {:?}", actor_id)
}
TheaterCommand::SubscribeToActor { actor_id, .. } => {
format!("SubscribeToActor: {:?}", actor_id)
}
TheaterCommand::NewStore { .. } => "NewStore".to_string(),
TheaterCommand::GetActorHandle { actor_id, .. } => {
format!("GetActorHandle: {:?}", actor_id)
}
TheaterCommand::GetActorExportHashes { actor_id, .. } => {
format!("GetActorExportHashes: {:?}", actor_id)
}
TheaterCommand::ShutdownRuntime => "ShutdownRuntime".to_string(),
}
}
}
/// Newtype around a `String` that identifies a channel.
///
/// Identifiers produced by `ChannelId::new` have the form `ch_` followed by 16
/// lowercase hexadecimal digits. `ChannelId::parse` is more lenient: it accepts
/// any non-empty string beginning with `ch_`. The inner field is public, so
/// values can also be constructed directly without any validation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ChannelId(pub String);
/// Renders the identifier as its raw inner string, with no decoration.
impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written straight to the sink: width/fill flags on the caller's
        // format spec are not applied to the identifier.
        f.write_str(&self.0)
    }
}
impl ChannelId {
pub fn new(initiator: &ChannelParticipant, target: &ChannelParticipant) -> Self {
let mut hasher = DefaultHasher::new();
let timestamp = chrono::Utc::now().timestamp_millis();
let rand_value: u64 = rand::random();
initiator.hash(&mut hasher);
target.hash(&mut hasher);
timestamp.hash(&mut hasher);
rand_value.hash(&mut hasher);
let hash = hasher.finish();
ChannelId(format!("ch_{:016x}", hash))
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn parse(s: &str) -> Result<Self> {
if s.is_empty() {
anyhow::bail!("Channel ID cannot be empty");
}
if !s.starts_with("ch_") {
anyhow::bail!("Channel ID must start with 'ch_' prefix");
}
Ok(ChannelId(s.to_string()))
}
}
/// One endpoint of a channel.
///
/// Its `Display` form is `Actor(<id>)` or `External`. The derived `Hash` is
/// what `ChannelId::new` feeds into its hasher.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ChannelParticipant {
/// An actor, identified by its `TheaterId`.
Actor(TheaterId),
/// A participant with no actor id
/// (presumably a client outside the actor system — confirm).
External,
}
impl std::fmt::Display for ChannelParticipant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChannelParticipant::Actor(actor_id) => write!(f, "Actor({})", actor_id),
ChannelParticipant::External => write!(f, "External"),
}
}
}
/// Something that happened on a channel: a message or a close.
///
/// NOTE(review): producers and consumers of this type are not visible in this
/// part of the file.
#[derive(Debug)]
pub enum ChannelEvent {
/// A message sent on `channel_id` by `sender_id`, with its payload bytes.
Message {
channel_id: ChannelId,
sender_id: ChannelParticipant,
message: Vec<u8>,
},
/// The channel identified by `channel_id` was closed.
Close { channel_id: ChannelId },
}
/// The parent of an actor: another actor, or an external holder of a result
/// channel.
///
/// NOTE(review): presumably the destination for the actor's `ActorResult` —
/// confirm against the code that consumes this type.
#[derive(Debug, Clone)]
pub enum ActorParent {
/// The parent is the actor with this id.
Actor(TheaterId),
/// The parent is reachable only through this `ActorResult` sender.
External(Sender<ActorResult>),
}
/// A request message: a byte payload plus a one-shot channel for the byte
/// reply.
#[derive(Debug)]
pub struct ActorRequest {
/// One-shot channel on which the reply bytes are sent back.
pub response_tx: oneshot::Sender<Vec<u8>>,
/// Request payload; its encoding is not defined at this level.
pub data: Vec<u8>,
}
/// A one-way message: a byte payload with no reply channel.
#[derive(Debug)]
pub struct ActorSend {
/// Message payload; its encoding is not defined at this level.
pub data: Vec<u8>,
}
/// A channel-open message: identifies the channel and its initiator, carries
/// an initial payload, and provides a one-shot channel for a boolean reply.
#[derive(Debug)]
pub struct ActorChannelOpen {
/// Identifier of the channel being opened.
pub channel_id: ChannelId,
/// The participant that initiated the channel.
pub initiator_id: ChannelParticipant,
/// One-shot channel for the reply
/// (the `bool` presumably means accepted/rejected — confirm).
pub response_tx: oneshot::Sender<Result<bool>>,
/// Payload sent along with the open request.
pub initial_msg: Vec<u8>,
}
/// A payload sent on an existing channel.
#[derive(Debug)]
pub struct ActorChannelMessage {
/// Channel the payload belongs to.
pub channel_id: ChannelId,
/// Message payload bytes.
pub msg: Vec<u8>,
}
/// A close message for a channel; it carries only the channel id.
#[derive(Debug)]
pub struct ActorChannelClose {
/// Channel being closed.
pub channel_id: ChannelId,
}
/// A channel-initiated message: names the channel, its target participant,
/// and the initial payload.
///
/// NOTE(review): presumably delivered to the initiating actor once the channel
/// exists — confirm against the sender of this message.
#[derive(Debug)]
pub struct ActorChannelInitiated {
/// Identifier of the initiated channel.
pub channel_id: ChannelId,
/// The participant the channel was opened towards.
pub target_id: ChannelParticipant,
/// Payload that accompanied the open request.
pub initial_msg: Vec<u8>,
}
/// Union of the message payloads declared in this file; each variant wraps
/// exactly one of the `Actor*` message structs.
#[derive(Debug)]
pub enum ActorMessage {
/// Request expecting a byte reply (see `ActorRequest`).
Request(ActorRequest),
/// One-way payload (see `ActorSend`).
Send(ActorSend),
/// Channel-open request (see `ActorChannelOpen`).
ChannelOpen(ActorChannelOpen),
/// Payload on an existing channel (see `ActorChannelMessage`).
ChannelMessage(ActorChannelMessage),
/// Channel close (see `ActorChannelClose`).
ChannelClose(ActorChannelClose),
/// Channel-initiated notice (see `ActorChannelInitiated`).
ChannelInitiated(ActorChannelInitiated),
}
/// Messaging commands; every variant carries a one-shot `response_tx` typed
/// with its reply.
///
/// NOTE(review): the consumer is not visible in this part of the file —
/// presumably a message router/server; confirm.
#[derive(Debug)]
pub enum MessageCommand {
/// Deliver `message` to the actor `target_id`; the reply is `Ok(())` or an
/// error.
SendMessage {
target_id: TheaterId,
message: ActorMessage,
response_tx: oneshot::Sender<Result<()>>,
},
/// Open the channel `channel_id` from `initiator_id` to `target_id` with an
/// initial payload. The reply is a `bool` on success
/// (presumably whether the target accepted — confirm).
OpenChannel {
initiator_id: ChannelParticipant,
target_id: ChannelParticipant,
channel_id: ChannelId,
initial_message: Vec<u8>,
response_tx: oneshot::Sender<Result<bool>>,
},
/// Send `message` on the channel `channel_id` on behalf of `sender_id`.
ChannelMessage {
channel_id: ChannelId,
sender_id: ChannelParticipant,
message: Vec<u8>,
response_tx: oneshot::Sender<Result<()>>,
},
/// Close the channel `channel_id` on behalf of `sender_id`.
ChannelClose {
channel_id: ChannelId,
sender_id: ChannelParticipant,
response_tx: oneshot::Sender<Result<()>>,
},
}
/// Coarse lifecycle state of an actor, as returned in reply to
/// `TheaterCommand::GetActorStatus`.
///
/// The enum has no payload, so equality is total: `Eq` and `Hash` are derived
/// alongside `PartialEq`, matching the other result types in this module and
/// allowing the status to be used as a map/set key.
///
/// NOTE(review): the exact conditions under which the runtime reports each
/// state are not visible in this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ActorStatus {
    /// The actor is running.
    Running,
    /// The actor has stopped.
    Stopped,
    /// The actor has failed.
    Failed,
}
/// An `ActorError` paired with the id of the actor it belongs to.
///
/// Displayed as `[<actor_id>] child-error: <error>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ChildError {
/// Id of the actor the error is attributed to.
pub actor_id: TheaterId,
/// The error itself.
pub error: ActorError,
}
/// Renders as `[<actor_id>] child-error: <error>`, using the `Display` form of
/// both fields.
impl std::fmt::Display for ChildError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { actor_id, error } = self;
        write!(f, "[{}] child-error: {}", actor_id, error)
    }
}
/// An optional byte result paired with the id of the actor that produced it.
///
/// Displayed as `[<actor_id>] child-result: <text>`, where `<text>` is the
/// bytes decoded as lossy UTF-8, or the literal `None` when there is no result.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ChildResult {
/// Id of the actor the result is attributed to.
pub actor_id: TheaterId,
/// Result payload, if any; its encoding is not defined at this level.
pub result: Option<Vec<u8>>,
}
/// Renders as `[<actor_id>] child-result: <text>`.
///
/// `<text>` is the result bytes decoded as UTF-8, with invalid sequences
/// replaced by U+FFFD, or the literal word `None` when no result is present.
impl std::fmt::Display for ChildResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self.result.as_deref() {
            Some(bytes) => String::from_utf8_lossy(bytes),
            None => std::borrow::Cow::Borrowed("None"),
        };
        write!(f, "[{}] child-result: {}", self.actor_id, text)
    }
}
/// Marker carrying only the id of an actor; displayed as
/// `[<actor_id>] child-external-stop`.
///
/// NOTE(review): presumably reported when the actor was stopped from outside
/// rather than finishing or failing on its own — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ChildExternalStop {
/// Id of the actor the stop is attributed to.
pub actor_id: TheaterId,
}
/// Renders as `[<actor_id>] child-external-stop`.
impl std::fmt::Display for ChildExternalStop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { actor_id } = self;
        write!(f, "[{}] child-external-stop", actor_id)
    }
}
/// Outcome reported for an actor: an error, a (possibly empty) result, or an
/// external stop.
///
/// This is the item type of the `supervisor_tx` channels on
/// `TheaterCommand::SpawnActor` / `ResumeActor` and of `ActorParent::External`.
/// Its `Display` output is that of the wrapped value.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ActorResult {
/// The actor produced an error (see `ChildError`).
Error(ChildError),
/// The actor produced an optional byte result (see `ChildResult`).
Success(ChildResult),
/// See `ChildExternalStop`.
ExternalStop(ChildExternalStop),
}
impl std::fmt::Display for ActorResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ActorResult::Error(err) => write!(f, "{}", err),
ActorResult::Success(res) => write!(f, "{}", res),
ActorResult::ExternalStop(stop) => write!(f, "{}", stop),
}
}
}