use crate::graph::Graph;
use crate::predicate::BoxedPredicate;
use core::any::Any;
use core::hash::{Hash, Hasher};
use polaris_system::plugin::{IntoScheduleIds, ScheduleId};
use polaris_system::resource::LocalResource;
use polaris_system::system::{BoxedSystem, ErasedSystem, IntoSystem};
use std::any::TypeId;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeId(Arc<str>);
impl LocalResource for NodeId {}
impl NodeId {
#[must_use]
pub fn new() -> Self {
Self(nanoid::nanoid!(8).into())
}
#[must_use]
pub fn from_string(id: impl Into<Arc<str>>) -> Self {
Self(id.into())
}
#[must_use]
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for NodeId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "node_{}", self.0)
}
}
impl IntoIterator for NodeId {
type Item = NodeId;
type IntoIter = std::iter::Once<NodeId>;
fn into_iter(self) -> Self::IntoIter {
std::iter::once(self)
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum Node {
System(SystemNode),
Decision(DecisionNode),
Switch(SwitchNode),
Parallel(ParallelNode),
Loop(LoopNode),
Scope(ScopeNode),
}
impl Node {
#[must_use]
pub fn id(&self) -> NodeId {
match self {
Node::System(n) => n.id.clone(),
Node::Decision(n) => n.id.clone(),
Node::Switch(n) => n.id.clone(),
Node::Parallel(n) => n.id.clone(),
Node::Loop(n) => n.id.clone(),
Node::Scope(n) => n.id.clone(),
}
}
#[must_use]
pub fn name(&self) -> &'static str {
match self {
Node::System(n) => n.name(),
Node::Decision(n) => n.name,
Node::Switch(n) => n.name,
Node::Parallel(n) => n.name,
Node::Loop(n) => n.name,
Node::Scope(n) => n.name,
}
}
}
#[derive(Debug, Clone)]
pub enum RetryPolicy {
Fixed {
max_retries: usize,
delay: Duration,
},
Exponential {
max_retries: usize,
initial_delay: Duration,
max_delay: Option<Duration>,
},
}
impl RetryPolicy {
#[must_use]
pub fn fixed(max_retries: usize, delay: Duration) -> Self {
RetryPolicy::Fixed { max_retries, delay }
}
#[must_use]
pub fn exponential(max_retries: usize, initial_delay: Duration) -> Self {
RetryPolicy::Exponential {
max_retries,
initial_delay,
max_delay: None,
}
}
#[must_use]
pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
if let RetryPolicy::Exponential {
max_delay: ref mut md,
..
} = self
{
*md = Some(max_delay);
}
self
}
#[must_use]
pub fn max_retries(&self) -> usize {
match self {
RetryPolicy::Fixed { max_retries, .. }
| RetryPolicy::Exponential { max_retries, .. } => *max_retries,
}
}
#[must_use]
pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
match self {
RetryPolicy::Fixed { delay, .. } => *delay,
RetryPolicy::Exponential {
initial_delay,
max_delay,
..
} => {
let multiplier = 1u32.checked_shl(attempt as u32);
let delay = if let Some(m) = multiplier {
initial_delay.saturating_mul(m)
} else {
max_delay.unwrap_or(Duration::MAX)
};
if let Some(cap) = max_delay {
delay.min(*cap)
} else {
delay
}
}
}
}
}
pub struct SystemNode {
pub id: NodeId,
pub system: BoxedSystem,
pub timeout: Option<Duration>,
pub retry_policy: Option<RetryPolicy>,
pub schedules: Vec<ScheduleId>,
}
impl SystemNode {
#[must_use]
pub fn new<S: ErasedSystem>(system: S) -> Self {
Self {
id: NodeId::new(),
system: Box::new(system),
timeout: None,
retry_policy: None,
schedules: Vec::new(),
}
}
#[must_use]
pub fn new_boxed(system: BoxedSystem) -> Self {
Self {
id: NodeId::new(),
system,
timeout: None,
retry_policy: None,
schedules: Vec::new(),
}
}
#[must_use]
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
#[must_use]
pub fn with_schedules(mut self, schedules: Vec<ScheduleId>) -> Self {
self.schedules = schedules;
self
}
#[must_use]
pub fn name(&self) -> &'static str {
self.system.name()
}
#[must_use]
pub fn output_type_id(&self) -> TypeId {
self.system.output_type_id()
}
#[must_use]
pub fn output_type_name(&self) -> &'static str {
self.system.output_type_name()
}
}
impl fmt::Debug for SystemNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SystemNode")
.field("id", &self.id)
.field("name", &self.name())
.field("output_type", &self.output_type_name())
.field("schedules", &self.schedules)
.finish()
}
}
pub struct DecisionNode {
pub id: NodeId,
pub name: &'static str,
pub predicate: Option<BoxedPredicate>,
pub true_branch: Option<NodeId>,
pub false_branch: Option<NodeId>,
}
impl DecisionNode {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self {
id: NodeId::new(),
name,
predicate: None,
true_branch: None,
false_branch: None,
}
}
#[must_use]
pub fn with_predicate(name: &'static str, predicate: BoxedPredicate) -> Self {
Self {
id: NodeId::new(),
name,
predicate: Some(predicate),
true_branch: None,
false_branch: None,
}
}
}
impl fmt::Debug for DecisionNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DecisionNode")
.field("id", &self.id)
.field("name", &self.name)
.field("has_predicate", &self.predicate.is_some())
.field("true_branch", &self.true_branch)
.field("false_branch", &self.false_branch)
.finish()
}
}
pub struct SwitchNode {
pub id: NodeId,
pub name: &'static str,
pub discriminator: Option<crate::predicate::BoxedDiscriminator>,
pub cases: Vec<(&'static str, NodeId)>,
pub default: Option<NodeId>,
}
impl SwitchNode {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self {
id: NodeId::new(),
name,
discriminator: None,
cases: Vec::new(),
default: None,
}
}
#[must_use]
pub fn with_discriminator(
name: &'static str,
discriminator: crate::predicate::BoxedDiscriminator,
) -> Self {
Self {
id: NodeId::new(),
name,
discriminator: Some(discriminator),
cases: Vec::new(),
default: None,
}
}
}
impl fmt::Debug for SwitchNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SwitchNode")
.field("id", &self.id)
.field("name", &self.name)
.field("has_discriminator", &self.discriminator.is_some())
.field("cases", &self.cases)
.field("default", &self.default)
.finish()
}
}
#[derive(Debug)]
pub struct ParallelNode {
pub id: NodeId,
pub name: &'static str,
pub branches: Vec<NodeId>,
}
impl ParallelNode {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self {
id: NodeId::new(),
name,
branches: Vec::new(),
}
}
}
pub struct LoopNode {
pub id: NodeId,
pub name: &'static str,
pub termination: Option<BoxedPredicate>,
pub max_iterations: Option<usize>,
pub body_entry: Option<NodeId>,
}
impl LoopNode {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self {
id: NodeId::new(),
name,
termination: None,
max_iterations: None,
body_entry: None,
}
}
#[must_use]
pub fn with_termination(name: &'static str, termination: BoxedPredicate) -> Self {
Self {
id: NodeId::new(),
name,
termination: Some(termination),
max_iterations: None,
body_entry: None,
}
}
#[must_use]
pub fn with_max_iterations(name: &'static str, max_iterations: usize) -> Self {
Self {
id: NodeId::new(),
name,
termination: None,
max_iterations: Some(max_iterations),
body_entry: None,
}
}
}
impl fmt::Debug for LoopNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LoopNode")
.field("id", &self.id)
.field("name", &self.name)
.field("has_termination", &self.termination.is_some())
.field("max_iterations", &self.max_iterations)
.field("body_entry", &self.body_entry)
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ContextMode {
Shared,
Inherit,
Isolated,
}
impl fmt::Display for ContextMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Shared => f.write_str("Shared"),
Self::Inherit => f.write_str("Inherit"),
Self::Isolated => f.write_str("Isolated"),
}
}
}
#[derive(Clone)]
pub struct ResourceForward {
pub(crate) type_id: TypeId,
pub(crate) type_name: &'static str,
pub(crate) clone_fn: fn(&dyn Any) -> Option<Box<dyn Any + Send + Sync>>,
}
impl fmt::Debug for ResourceForward {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResourceForward")
.field("type_id", &self.type_id)
.field("type_name", &self.type_name)
.finish()
}
}
impl PartialEq for ResourceForward {
fn eq(&self, other: &Self) -> bool {
self.type_id == other.type_id
}
}
impl Eq for ResourceForward {}
impl Hash for ResourceForward {
fn hash<H: Hasher>(&self, state: &mut H) {
self.type_id.hash(state);
}
}
impl ResourceForward {
#[must_use]
pub fn type_id(&self) -> TypeId {
self.type_id
}
#[must_use]
pub fn type_name(&self) -> &'static str {
self.type_name
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContextPolicy {
pub(crate) mode: ContextMode,
pub(crate) forward_resources: Vec<ResourceForward>,
}
impl ContextPolicy {
#[must_use]
pub fn shared() -> Self {
Self {
mode: ContextMode::Shared,
forward_resources: Vec::new(),
}
}
#[must_use]
pub fn inherit() -> Self {
Self {
mode: ContextMode::Inherit,
forward_resources: Vec::new(),
}
}
#[must_use]
pub fn isolated() -> Self {
Self {
mode: ContextMode::Isolated,
forward_resources: Vec::new(),
}
}
#[must_use]
pub fn forward<T: LocalResource + Clone>(mut self) -> Self {
if self.mode == ContextMode::Shared {
tracing::debug!(
resource = core::any::type_name::<T>(),
"forward() has no effect on ContextPolicy::shared() — resources are already accessible",
);
}
self.forward_resources.push(ResourceForward {
type_id: TypeId::of::<T>(),
type_name: core::any::type_name::<T>(),
clone_fn: |any| Some(Box::new(any.downcast_ref::<T>()?.clone())),
});
self
}
#[must_use]
pub fn mode(&self) -> ContextMode {
self.mode
}
#[must_use]
pub fn forward_resources(&self) -> &[ResourceForward] {
&self.forward_resources
}
}
#[derive(Debug)]
pub struct ScopeNode {
pub id: NodeId,
pub name: &'static str,
pub(crate) graph: Graph,
pub(crate) context_policy: ContextPolicy,
}
impl ScopeNode {
#[must_use]
pub fn new(name: &'static str, graph: Graph, context_policy: ContextPolicy) -> Self {
Self {
id: NodeId::new(),
name,
graph,
context_policy,
}
}
#[must_use]
pub fn graph(&self) -> &Graph {
&self.graph
}
#[must_use]
pub fn context_policy(&self) -> &ContextPolicy {
&self.context_policy
}
}
pub trait IntoSystemNode<Marker> {
fn into_system_node(self) -> (BoxedSystem, Vec<ScheduleId>);
}
pub struct NodeMarker<M>(PhantomData<M>);
pub struct ScheduledNodeMarker<M>(PhantomData<M>);
impl<S, M> IntoSystemNode<NodeMarker<M>> for S
where
S: IntoSystem<M>,
S::System: 'static,
{
fn into_system_node(self) -> (BoxedSystem, Vec<ScheduleId>) {
(Box::new(self.into_system()), Vec::new())
}
}
impl<Sch, S, M> IntoSystemNode<ScheduledNodeMarker<M>> for (Sch, S)
where
Sch: IntoScheduleIds,
S: IntoSystem<M>,
S::System: 'static,
{
fn into_system_node(self) -> (BoxedSystem, Vec<ScheduleId>) {
(Box::new(self.1.into_system()), Sch::schedule_ids())
}
}
#[cfg(test)]
mod tests {
use super::*;
use polaris_system::plugin::Schedule;
use polaris_system::system::IntoSystem;
async fn test_system() -> String {
"hello".to_string()
}
async fn sys_fn() -> i32 {
42
}
#[test]
fn node_id_uniqueness() {
let id1 = NodeId::new();
let id2 = NodeId::new();
assert_ne!(id1, id2);
}
#[test]
fn system_node_creation() {
let system = test_system.into_system();
let node = SystemNode::new(system);
assert!(!node.id.as_str().is_empty());
assert!(node.name().contains("test_system"));
}
#[test]
fn node_enum_accessors() {
let system = Node::System(SystemNode::new(sys_fn.into_system()));
assert!(!system.id().as_str().is_empty());
assert!(system.name().contains("sys_fn"));
let decision = Node::Decision(DecisionNode::new("dec"));
assert!(!decision.id().as_str().is_empty());
assert_eq!(decision.name(), "dec");
}
#[test]
fn system_node_preserves_type_info() {
let system = sys_fn.into_system();
let node = SystemNode::new(system);
assert_eq!(node.output_type_id(), TypeId::of::<i32>());
assert!(node.output_type_name().contains("i32"));
}
#[test]
fn retry_policy_fixed_delay() {
let policy = RetryPolicy::fixed(3, Duration::from_millis(100));
assert_eq!(policy.max_retries(), 3);
assert_eq!(policy.delay_for_attempt(0), Duration::from_millis(100));
assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(100));
assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(100));
}
#[test]
fn retry_policy_exponential_delay() {
let policy = RetryPolicy::exponential(4, Duration::from_millis(100));
assert_eq!(policy.max_retries(), 4);
assert_eq!(policy.delay_for_attempt(0), Duration::from_millis(100));
assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(200));
assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(400));
assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(800));
}
#[test]
fn retry_policy_exponential_with_max_delay() {
let policy = RetryPolicy::exponential(4, Duration::from_millis(100))
.with_max_delay(Duration::from_millis(300));
assert_eq!(policy.delay_for_attempt(0), Duration::from_millis(100));
assert_eq!(policy.delay_for_attempt(1), Duration::from_millis(200));
assert_eq!(policy.delay_for_attempt(2), Duration::from_millis(300));
assert_eq!(policy.delay_for_attempt(3), Duration::from_millis(300));
}
#[test]
fn retry_policy_with_max_delay_no_effect_on_fixed() {
let policy = RetryPolicy::fixed(2, Duration::from_millis(100))
.with_max_delay(Duration::from_millis(50));
assert_eq!(policy.delay_for_attempt(0), Duration::from_millis(100));
}
struct MarkerA;
impl Schedule for MarkerA {}
struct MarkerB;
impl Schedule for MarkerB {}
#[test]
fn into_system_node_bare() {
let (_, schedules) = sys_fn.into_system_node();
assert!(schedules.is_empty());
}
#[test]
fn into_system_node_single_schedule() {
let (_, schedules) = (MarkerA, sys_fn).into_system_node();
assert_eq!(schedules.len(), 1);
assert_eq!(schedules[0], ScheduleId::of::<MarkerA>());
}
#[test]
fn into_system_node_multi_schedules() {
let (_, schedules) = ((MarkerA, MarkerB), sys_fn).into_system_node();
assert_eq!(schedules.len(), 2);
assert_eq!(schedules[0], ScheduleId::of::<MarkerA>());
assert_eq!(schedules[1], ScheduleId::of::<MarkerB>());
}
#[test]
fn system_node_with_schedules() {
let node = SystemNode::new(sys_fn.into_system()).with_schedules(vec![
ScheduleId::of::<MarkerA>(),
ScheduleId::of::<MarkerB>(),
]);
assert_eq!(node.schedules.len(), 2);
assert_eq!(node.schedules[0], ScheduleId::of::<MarkerA>());
assert_eq!(node.schedules[1], ScheduleId::of::<MarkerB>());
}
#[test]
fn context_policy_shared() {
let policy = ContextPolicy::shared();
assert!(matches!(policy.mode, ContextMode::Shared));
assert!(policy.forward_resources.is_empty());
}
#[test]
fn context_policy_inherit() {
let policy = ContextPolicy::inherit();
assert!(matches!(policy.mode, ContextMode::Inherit));
}
#[test]
fn context_policy_isolated() {
let policy = ContextPolicy::isolated();
assert!(matches!(policy.mode, ContextMode::Isolated));
}
use polaris_system::resource::LocalResource;
#[derive(Clone)]
struct TestRes;
impl LocalResource for TestRes {}
#[test]
fn context_policy_forward() {
let policy = ContextPolicy::inherit().forward::<TestRes>();
assert_eq!(policy.forward_resources.len(), 1);
assert_eq!(policy.forward_resources[0].type_id, TypeId::of::<TestRes>());
}
#[test]
fn scope_node_accessors() {
let inner = Graph::new();
let scope = ScopeNode {
id: NodeId::new(),
name: "test_scope",
graph: inner,
context_policy: ContextPolicy::shared(),
};
let node = Node::Scope(scope);
assert_eq!(node.name(), "test_scope");
assert!(!node.id().as_str().is_empty());
}
}