use std::collections::BTreeMap;
use std::sync::Arc;
use serde_json::Value as JsonValue;
use super::context::{CommandId, EmissionContext, EnvelopePlan, Publisher, RequestCtx};
use super::node::Node;
use super::resource::{DynFuture, Resource, ResourceError, TransitionOutcome};
use super::transition::TransitionInput;
use crate::events::{NodeId, PublishError, TraceContext};
/// Context value passed by value to [`Actor::transition`].
///
/// Bundles the command's generated [`CommandId`], the originating [`RequestCtx`],
/// the identifier of the node the command targets and, optionally, a handle to the
/// `Node` itself and the identity of the actor handling the command.
#[derive(Clone)]
pub struct TransitionCtx {
/// Minted with `CommandId::new()` by the constructors; sent as the `causation`
/// of every event published through this context.
command_id: CommandId,
/// Request metadata; supplies the request id and the optional trace headers
/// used when publishing.
request: RequestCtx,
/// Node identifier, either given directly to `new` or copied from `Node::id()`
/// by `with_node`.
node_id: String,
/// `Some` only when the context was built with `with_node`; required by
/// `register_actor`.
node: Option<Arc<Node>>,
/// Actor identity attached through `with_actor`; required by `publish` and
/// `publish_for_resource`.
actor: Option<ActorCtx>,
}
/// Manual `Debug` that reports only whether a `Node` handle is attached
/// (`node_attached`) instead of formatting the `Node` itself.
impl std::fmt::Debug for TransitionCtx {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited.
        let Self {
            command_id,
            request,
            node_id,
            node,
            actor,
        } = self;
        let node_attached = node.is_some();

        let mut out = f.debug_struct("TransitionCtx");
        out.field("command_id", command_id);
        out.field("request", request);
        out.field("node_id", node_id);
        out.field("node_attached", &node_attached);
        out.field("actor", actor);
        out.finish()
    }
}
impl TransitionCtx {
    /// Creates a context for `request` aimed at the node identified by `node`.
    ///
    /// A fresh [`CommandId`] is generated. Neither a `Node` handle nor an actor
    /// identity is attached, so `register_actor` and `publish` will fail with
    /// [`TransitionError::Internal`] until those are provided.
    pub fn new(request: RequestCtx, node: impl Into<String>) -> Self {
        let command_id = CommandId::new();
        let node_id = node.into();
        Self {
            command_id,
            request,
            node_id,
            node: None,
            actor: None,
        }
    }

    /// Creates a context bound to a live `Node`.
    ///
    /// The node identifier is copied from `node.id()` and the handle is kept so
    /// that [`TransitionCtx::register_actor`] can be used.
    pub fn with_node(request: RequestCtx, node: Arc<Node>) -> Self {
        let mut ctx = Self::new(request, node.id().to_string());
        ctx.node = Some(node);
        ctx
    }

    /// Creates a context from a default [`RequestCtx`] and the node name `"test"`.
    pub fn new_test() -> Self {
        let request = RequestCtx::default();
        Self::new(request, "test")
    }

    /// Returns this context with `actor` attached, replacing any previous identity.
    pub(crate) fn with_actor(self, actor: ActorCtx) -> Self {
        let mut ctx = self;
        ctx.actor = Some(actor);
        ctx
    }

    /// Identifier generated for this command when the context was constructed.
    pub fn command_id(&self) -> &CommandId {
        let Self { command_id, .. } = self;
        command_id
    }

    /// Request metadata this context was created with.
    pub fn request(&self) -> &RequestCtx {
        let Self { request, .. } = self;
        request
    }

    /// Identifier of the node this context targets.
    pub fn node(&self) -> &str {
        self.node_id.as_str()
    }

    /// Resource id of the attached actor, or `None` when no actor is attached.
    pub fn resource_id(&self) -> Option<&str> {
        let actor = self.actor.as_ref()?;
        Some(actor.resource_id())
    }

    /// Registers `actor` on the attached `Node` and returns the string produced
    /// by `Node::register_actor`.
    ///
    /// # Errors
    ///
    /// * [`TransitionError::Internal`] when the context carries no `Node`
    ///   (it was not built with [`TransitionCtx::with_node`]).
    /// * Any registration failure, converted with `TransitionError::from`.
    pub async fn register_actor<A: Actor>(&self, actor: A) -> Result<String, TransitionError> {
        let node = self.node.as_ref().ok_or_else(|| {
            TransitionError::Internal(
                "TransitionCtx has no Node attached; build with TransitionCtx::with_node".into(),
            )
        })?;
        match node.register_actor(actor).await {
            Ok(registered) => Ok(registered),
            Err(err) => Err(TransitionError::from(err)),
        }
    }

    /// Publishes `data` on `stream` under the attached actor's resource id and kind.
    ///
    /// # Errors
    ///
    /// [`TransitionError::Internal`] when no actor is attached, when the actor has
    /// no publisher, or when the publisher reports a failure.
    pub async fn publish(
        &self,
        stream: &str,
        payload_kind: &str,
        payload_version: u32,
        data: JsonValue,
    ) -> Result<(), TransitionError> {
        let Some(actor) = self.actor.as_ref() else {
            return Err(TransitionError::Internal(
                "TransitionCtx has no actor identity".into(),
            ));
        };
        self.publish_for_resource(
            actor.resource_id(),
            actor.resource_kind(),
            stream,
            payload_kind,
            payload_version,
            data,
        )
        .await
    }

    /// Publishes `data` on `stream` on behalf of an explicitly named resource.
    ///
    /// The attached actor still supplies the publisher and the node id placed in
    /// the envelope. The emission carries the request id as correlation, this
    /// command's id as causation, and a trace context when the request has a
    /// `traceparent`.
    ///
    /// # Errors
    ///
    /// [`TransitionError::Internal`] when no actor is attached, when the actor has
    /// no publisher, or when the publisher reports a failure.
    pub(crate) async fn publish_for_resource(
        &self,
        resource_id: &str,
        resource_kind: &str,
        stream: &str,
        payload_kind: &str,
        payload_version: u32,
        data: JsonValue,
    ) -> Result<(), TransitionError> {
        let Some(actor) = self.actor.as_ref() else {
            return Err(TransitionError::Internal(
                "TransitionCtx has no actor identity".into(),
            ));
        };
        let Some(publisher) = actor.publisher.as_ref() else {
            return Err(TransitionError::Internal(
                "ActorCtx has no publisher attached".into(),
            ));
        };

        // A trace context is only emitted when the request carries a traceparent;
        // tracestate is optional on top of that.
        let trace = self.request.traceparent().map(|parent| {
            let traceparent = parent.to_string();
            let tracestate = self.request.tracestate().map(String::from);
            TraceContext {
                traceparent,
                tracestate,
            }
        });

        // The envelope's node id comes from the actor identity, not from `self.node_id`.
        let node_id = NodeId::new(actor.node.clone());
        let plan = EnvelopePlan {
            node_id: &node_id,
            resource_id,
            resource_kind,
            stream,
            payload_kind,
            payload_version,
            data,
        };
        let emission = EmissionContext {
            correlation: self.request.request_id(),
            causation: Some(self.command_id.as_str()),
            trace,
        };

        let outcome = publisher.publish(plan, emission).await;
        outcome.map_err(transition_publish_error)
    }
}
/// Wraps a publisher failure in [`TransitionError::Internal`], prefixing the
/// error's `Display` text with `"publish failed: "`.
fn transition_publish_error(err: PublishError) -> TransitionError {
    let message = format!("publish failed: {err}");
    TransitionError::Internal(message)
}
/// Maps resource-layer failures onto transition errors.
///
/// `NotFound` keeps its identifier as `ResourceNotFound`; both `Unavailable` and
/// `Internal` are carried over as `TransitionError::Internal` with their message.
impl From<ResourceError> for TransitionError {
    fn from(err: ResourceError) -> Self {
        match err {
            ResourceError::NotFound(id) => Self::ResourceNotFound(id),
            ResourceError::Unavailable(msg) | ResourceError::Internal(msg) => Self::Internal(msg),
        }
    }
}
/// Identity of an actor plus an optional publisher, passed by value to
/// [`Actor::on_start`] and [`Actor::on_stop`] and attachable to a [`TransitionCtx`].
///
/// `Default` yields empty strings, an empty label map and no publisher.
#[derive(Clone, Default)]
pub struct ActorCtx {
/// Node identifier; wrapped in a `NodeId` for every published envelope.
pub(crate) node: String,
/// Resource id placed in envelopes published through this context.
pub(crate) resource_id: String,
/// Resource kind placed in envelopes published through this context.
pub(crate) resource_kind: String,
/// Key/value labels supplied at construction; only exposed via `labels()` in this file.
pub(crate) labels: BTreeMap<String, String>,
/// `None` until `with_publisher` is called; publishing fails while it is `None`.
pub(crate) publisher: Option<Publisher>,
}
/// Manual `Debug` that reports only whether a publisher is attached
/// (`publisher_attached`) instead of formatting the `Publisher` itself.
impl std::fmt::Debug for ActorCtx {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited.
        let Self {
            node,
            resource_id,
            resource_kind,
            labels,
            publisher,
        } = self;
        let publisher_attached = publisher.is_some();

        let mut out = f.debug_struct("ActorCtx");
        out.field("node", node);
        out.field("resource_id", resource_id);
        out.field("resource_kind", resource_kind);
        out.field("labels", labels);
        out.field("publisher_attached", &publisher_attached);
        out.finish()
    }
}
impl ActorCtx {
    /// Builds an actor identity from its node, resource id, resource kind and labels.
    ///
    /// No publisher is attached; [`ActorCtx::publish`] fails until one is added
    /// with `with_publisher`.
    pub fn new(
        node: impl Into<String>,
        resource_id: impl Into<String>,
        resource_kind: impl Into<String>,
        labels: BTreeMap<String, String>,
    ) -> Self {
        let node = node.into();
        let resource_id = resource_id.into();
        let resource_kind = resource_kind.into();
        Self {
            node,
            resource_id,
            resource_kind,
            labels,
            publisher: None,
        }
    }

    /// Returns this context with `publisher` attached, replacing any previous one.
    pub(crate) fn with_publisher(self, publisher: Publisher) -> Self {
        let mut ctx = self;
        ctx.publisher = Some(publisher);
        ctx
    }

    /// Creates the `Default` context: empty identity, no labels, no publisher.
    pub fn new_test() -> Self {
        <Self as Default>::default()
    }

    /// Identifier of the node this actor belongs to.
    pub fn node(&self) -> &str {
        self.node.as_str()
    }

    /// Resource id of this actor.
    pub fn resource_id(&self) -> &str {
        self.resource_id.as_str()
    }

    /// Resource kind of this actor.
    pub fn resource_kind(&self) -> &str {
        self.resource_kind.as_str()
    }

    /// Labels supplied when the context was constructed.
    pub fn labels(&self) -> &BTreeMap<String, String> {
        let Self { labels, .. } = self;
        labels
    }

    /// Publishes `data` on `stream` under this actor's node, resource id and kind.
    ///
    /// The emission uses `EmissionContext::default()`, so no request-derived
    /// correlation, causation or trace values are supplied here.
    ///
    /// # Errors
    ///
    /// [`ActorError::Internal`] when no publisher is attached or when the
    /// publisher reports a failure.
    pub async fn publish(
        &self,
        stream: &str,
        payload_kind: &str,
        payload_version: u32,
        data: JsonValue,
    ) -> Result<(), ActorError> {
        let Some(publisher) = self.publisher.as_ref() else {
            return Err(ActorError::Internal(
                "ActorCtx has no publisher attached".into(),
            ));
        };

        let node_id = NodeId::new(self.node.clone());
        let plan = EnvelopePlan {
            node_id: &node_id,
            resource_id: &self.resource_id,
            resource_kind: &self.resource_kind,
            stream,
            payload_kind,
            payload_version,
            data,
        };

        let outcome = publisher.publish(plan, EmissionContext::default()).await;
        outcome.map_err(|err| ActorError::Internal(format!("publish failed: {err}")))
    }
}
/// Failure returned by [`Actor::transition`] and by the fallible [`TransitionCtx`] methods.
///
/// Only `ResourceNotFound` and `Internal` are constructed in this file; the other
/// variants are presumably produced by `Actor` implementations or the dispatch
/// layer (NOTE(review): confirm their exact meaning with those callers).
#[derive(Debug)]
pub enum TransitionError {
/// Presumably: the transition input was rejected — confirm with callers.
InvalidInput(String),
/// Presumably: the transition is not permitted — confirm with callers.
NotAllowed(String),
/// Presumably: the transition conflicts with current state — confirm with callers.
Conflict(String),
/// Presumably: the actor cannot accept the command right now — confirm with callers.
Busy,
/// Presumably: the caller must apply backpressure before retrying — confirm with callers.
BackpressureRequired,
/// Presumably: the transition did not complete in time — confirm with callers.
Timeout,
/// Produced from `ResourceError::NotFound`; carries the identifier from that error.
ResourceNotFound(String),
/// Catch-all with a message: missing node/actor/publisher, publish failures, and
/// `ResourceError::Unavailable` / `ResourceError::Internal`.
Internal(String),
}
/// Failure returned by the actor lifecycle hooks and by [`ActorCtx::publish`].
#[derive(Debug)]
pub enum ActorError {
/// Not constructed in this file; presumably reported from `Actor::on_start` — confirm.
StartFailed(String),
/// Not constructed in this file; presumably reported from `Actor::on_stop` — confirm.
StopFailed(String),
/// Used by `ActorCtx::publish` for a missing publisher or a failed publish.
Internal(String),
}
/// A [`Resource`] that handles named transitions and has optional lifecycle hooks.
pub trait Actor: Resource {
    /// Handles the transition called `name` with the given `input`.
    ///
    /// `ctx` is passed by value. The returned future may borrow `self` and `name`
    /// for `'a` and resolves to the transition's outcome or a [`TransitionError`].
    fn transition<'a>(
        &'a mut self,
        ctx: TransitionCtx,
        name: &'a str,
        input: TransitionInput,
    ) -> DynFuture<'a, Result<TransitionOutcome, TransitionError>>;

    /// Start hook; the default implementation does nothing and resolves to `Ok(())`.
    fn on_start<'a>(&'a mut self, _ctx: ActorCtx) -> DynFuture<'a, Result<(), ActorError>> {
        let started: Result<(), ActorError> = Ok(());
        Box::pin(async move { started })
    }

    /// Stop hook; the default implementation does nothing and resolves to `Ok(())`.
    fn on_stop<'a>(&'a mut self, _ctx: ActorCtx) -> DynFuture<'a, Result<(), ActorError>> {
        let stopped: Result<(), ActorError> = Ok(());
        Box::pin(async move { stopped })
    }
}