#![allow(
dead_code,
missing_docs,
reason = "shared test utilities — not all items used in every test binary"
)]
use polaris_graph::dev::{DevToolsPlugin, SystemInfo};
use polaris_graph::graph::Graph;
use polaris_graph::hooks::HooksAPI;
use polaris_graph::node::NodeId;
use polaris_graph::{CaughtError, ErrorKind};
use polaris_system::param::{ParamError, SystemContext};
use polaris_system::plugin::Plugin;
use polaris_system::resource::LocalResource;
use polaris_system::server::Server;
use polaris_system::system::{BoxFuture, System, SystemError};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
/// Builds a fresh [`Server`] with a default [`DevToolsPlugin`] applied to it.
///
/// The plugin's `build` method is invoked directly on the new server before
/// the server is handed back to the caller.
pub fn create_test_server() -> Server {
    let mut test_server = Server::new();
    DevToolsPlugin::default().build(&mut test_server);
    test_server
}
/// Looks up the [`HooksAPI`] registered on `server`.
///
/// Returns whatever `Server::api::<HooksAPI>()` yields, i.e. `None` when the
/// lookup finds nothing.
pub fn get_hooks(server: &Server) -> Option<&HooksAPI> {
    let hooks: Option<&HooksAPI> = server.api::<HooksAPI>();
    hooks
}
/// Boxes a graph-building closure as a `Box<dyn FnOnce(&mut Graph)>`.
///
/// Convenience for call sites that need the type-erased form of a closure
/// operating on a [`Graph`].
pub fn branch<F>(f: F) -> Box<dyn FnOnce(&mut Graph)>
where
    F: FnOnce(&mut Graph) + 'static,
{
    let erased: Box<dyn FnOnce(&mut Graph)> = Box::new(f);
    erased
}
/// Shared boolean flag recording that a handler system has run.
///
/// Clones share the same flag because the state lives behind an `Arc`.
/// The default value is "not invoked" (`false`).
#[derive(Clone, Default)]
pub struct HandlerLog {
    invoked: Arc<Mutex<bool>>,
}

impl LocalResource for HandlerLog {}

impl HandlerLog {
    /// Reads the current value of the flag.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn was_invoked(&self) -> bool {
        let flag = self.invoked.lock().unwrap();
        *flag
    }

    /// Sets the flag to `true`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn mark_invoked(&self) {
        let mut flag = self.invoked.lock().unwrap();
        *flag = true;
    }
}
/// Shared, append-only list of the node ids that have been recorded.
///
/// Clones share the same list because the state lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct ExecutionLog {
    executed: Arc<Mutex<Vec<NodeId>>>,
}

impl LocalResource for ExecutionLog {}

impl ExecutionLog {
    /// Appends a clone of `node_id` to the end of the log.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn record(&self, node_id: &NodeId) {
        let mut entries = self.executed.lock().unwrap();
        entries.push(node_id.clone());
    }

    /// Returns a snapshot of every recorded id, in insertion order.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn executed(&self) -> Vec<NodeId> {
        let entries = self.executed.lock().unwrap();
        entries.clone()
    }

    /// Counts how many recorded entries are equal to `node_id`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn count(&self, node_id: &NodeId) -> usize {
        let entries = self.executed.lock().unwrap();
        let mut hits = 0;
        for recorded in entries.iter() {
            if recorded == node_id {
                hits += 1;
            }
        }
        hits
    }
}
/// System that does nothing and always resolves to `Ok(())`.
pub struct SuccessSystem;

impl System for SuccessSystem {
    type Output = ();

    /// Ignores the context and immediately succeeds.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async { Ok(()) })
    }

    fn name(&self) -> &'static str {
        "success_system"
    }
}
/// System that always fails with `SystemError::ExecutionError`.
pub struct FailingSystem;

impl System for FailingSystem {
    type Output = ();

    /// Ignores the context and resolves to an execution error whose message
    /// is `"intentional failure"`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async {
            let message = String::from("intentional failure");
            Err(SystemError::ExecutionError(message))
        })
    }

    fn name(&self) -> &'static str {
        "failing_system"
    }

    /// Reports this system as fallible.
    fn is_fallible(&self) -> bool {
        true
    }
}
/// System that sleeps for a configured duration and then succeeds.
pub struct SlowSystem {
    /// How long `run` sleeps (via `tokio::time::sleep`) before returning.
    pub duration: std::time::Duration,
}

impl System for SlowSystem {
    type Output = ();

    /// Awaits a tokio sleep of `self.duration`, then resolves to `Ok(())`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        // Copy the duration out so the future does not need to borrow `self`.
        let pause = self.duration;
        Box::pin(async move {
            tokio::time::sleep(pause).await;
            Ok(())
        })
    }

    fn name(&self) -> &'static str {
        "slow_system"
    }
}
/// System that marks the [`HandlerLog`] resource as invoked when it runs.
pub struct HandlerSystem;

impl System for HandlerSystem {
    type Output = ();

    /// Sets the `HandlerLog` flag if that resource can be fetched from the
    /// context; a failed lookup is ignored. Always resolves to `Ok(())`.
    fn run<'a>(
        &'a self,
        ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async move {
            let lookup = ctx.get_resource::<HandlerLog>();
            if let Ok(handler_log) = lookup {
                handler_log.mark_invoked();
            }
            Ok(())
        })
    }

    fn name(&self) -> &'static str {
        "handler_system"
    }
}
/// System that sets a caller-supplied shared flag to `true` when it runs.
pub struct FlagSystem {
    /// Flag written by `run`; the caller keeps its own handle to observe it.
    pub flag: Arc<Mutex<bool>>,
}

impl System for FlagSystem {
    type Output = ();

    /// Stores `true` into the shared flag and resolves to `Ok(())`.
    ///
    /// # Panics
    ///
    /// The returned future panics if the flag's mutex is poisoned.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let shared = Arc::clone(&self.flag);
        Box::pin(async move {
            let mut raised = shared.lock().unwrap();
            *raised = true;
            Ok(())
        })
    }

    fn name(&self) -> &'static str {
        "flag_system"
    }
}
/// System that records its own node id in the [`ExecutionLog`] resource and
/// returns that id as its output.
pub struct LoggingSystem;

impl System for LoggingSystem {
    type Output = NodeId;

    /// Reads the node id from the `SystemInfo` resource, appends it to the
    /// `ExecutionLog` resource, and resolves to the id.
    ///
    /// # Panics
    ///
    /// The returned future panics if either `SystemInfo` or `ExecutionLog`
    /// cannot be fetched from the context (`SystemInfo` is checked first).
    fn run<'a>(
        &'a self,
        ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async move {
            let system_info = ctx
                .get_resource::<SystemInfo>()
                .expect("SystemInfo not injected by DevToolsPlugin");
            let current = system_info.node_id();

            let execution_log = ctx
                .get_resource::<ExecutionLog>()
                .expect("ExecutionLog resource not found");
            execution_log.record(&current);

            Ok(current)
        })
    }

    fn name(&self) -> &'static str {
        "logging_system"
    }

    /// Declares read access to `SystemInfo` and `ExecutionLog`.
    fn access(&self) -> polaris_system::param::SystemAccess {
        let declared = polaris_system::param::SystemAccess::new();
        let declared = declared.with_read::<SystemInfo>();
        declared.with_read::<ExecutionLog>()
    }
}
/// Adds a boxed [`LoggingSystem`] to `g` and returns the id that
/// `Graph::add_boxed_system` reports for it.
pub fn add_tracker(g: &mut Graph) -> NodeId {
    let tracker = Box::new(LoggingSystem);
    g.add_boxed_system(tracker)
}
/// Output emitted by [`ProducerSystem`].
#[derive(Debug, Clone)]
pub struct ProducerOutput {
    /// The value copied from the producing system.
    pub value: i32,
}

/// System that outputs a [`ProducerOutput`] carrying a fixed value.
pub struct ProducerSystem {
    /// Value placed into the emitted `ProducerOutput`.
    pub value: i32,
}

impl System for ProducerSystem {
    type Output = ProducerOutput;

    /// Resolves to `ProducerOutput { value: self.value }`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let produced = ProducerOutput { value: self.value };
        Box::pin(async move { Ok(produced) })
    }

    fn name(&self) -> &'static str {
        "producer_system"
    }
}
/// System that reads a `ProducerOutput` from the context and stores its
/// value in a shared slot.
pub struct ConsumerSystem {
    /// Slot that receives `Some(value)` once the system has run.
    pub received: Arc<Mutex<Option<i32>>>,
}

impl System for ConsumerSystem {
    type Output = ();

    /// Fetches the `ProducerOutput` via `get_output` and writes its `value`
    /// into `received`.
    ///
    /// # Panics
    ///
    /// The returned future panics if the output lookup fails or if the
    /// slot's mutex is poisoned.
    fn run<'a>(
        &'a self,
        ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let slot = Arc::clone(&self.received);
        Box::pin(async move {
            let upstream = ctx
                .get_output::<ProducerOutput>()
                .expect("ProducerOutput should be available");
            let mut stored = slot.lock().unwrap();
            *stored = Some(upstream.value);
            Ok(())
        })
    }

    fn name(&self) -> &'static str {
        "consumer_system"
    }
}
/// Output emitted by [`DecisionSystem`].
#[derive(Debug)]
pub struct DecisionOutput {
    /// Boolean copied from the producing system.
    pub take_true: bool,
}

/// System that outputs a [`DecisionOutput`] carrying a fixed boolean.
pub struct DecisionSystem {
    /// Value placed into the emitted `DecisionOutput`.
    pub take_true: bool,
}

impl System for DecisionSystem {
    type Output = DecisionOutput;

    /// Resolves to `DecisionOutput { take_true: self.take_true }`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let decision = DecisionOutput {
            take_true: self.take_true,
        };
        Box::pin(async move { Ok(decision) })
    }

    fn name(&self) -> &'static str {
        "decision_system"
    }
}
/// Output emitted by [`SwitchKeySystem`].
#[derive(Debug)]
pub struct SwitchOutput {
    /// Key copied from the producing system.
    pub key: &'static str,
}

/// System that outputs a [`SwitchOutput`] carrying a fixed key.
pub struct SwitchKeySystem {
    /// Key placed into the emitted `SwitchOutput`.
    pub key: &'static str,
}

impl System for SwitchKeySystem {
    type Output = SwitchOutput;

    /// Resolves to `SwitchOutput { key: self.key }`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let selected = SwitchOutput { key: self.key };
        Box::pin(async move { Ok(selected) })
    }

    fn name(&self) -> &'static str {
        "switch_key_system"
    }
}
/// Output carrying an iteration number.
#[derive(Debug)]
pub struct LoopState {
    /// Iteration number reported by the producing system.
    pub iteration: usize,
}

/// System that bumps a shared counter on every run and reports the new value.
pub struct LoopIterationSystem {
    /// Counter incremented by one each time the returned future runs.
    pub counter: Arc<Mutex<usize>>,
}

impl System for LoopIterationSystem {
    type Output = LoopState;

    /// Increments the shared counter and resolves to a `LoopState` whose
    /// `iteration` is the post-increment value.
    ///
    /// # Panics
    ///
    /// The returned future panics if the counter's mutex is poisoned.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let shared = Arc::clone(&self.counter);
        Box::pin(async move {
            let mut tally = shared.lock().unwrap();
            *tally += 1;
            let iteration = *tally;
            Ok(LoopState { iteration })
        })
    }

    fn name(&self) -> &'static str {
        "loop_iteration_system"
    }
}
/// System that outputs a `LoopState` with `iteration` set to zero.
pub struct InitialStateSystem;

impl System for InitialStateSystem {
    type Output = LoopState;

    /// Ignores the context and resolves to `LoopState { iteration: 0 }`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async {
            let initial = LoopState { iteration: 0 };
            Ok(initial)
        })
    }

    fn name(&self) -> &'static str {
        "initial_state_system"
    }
}
/// System that always fails with a `SystemError::ParamError`.
pub struct ParamFailingSystem;

impl System for ParamFailingSystem {
    type Output = ();

    /// Ignores the context and resolves to
    /// `SystemError::ParamError(ParamError::ResourceNotFound("MissingType"))`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async {
            let missing = ParamError::ResourceNotFound("MissingType");
            Err(SystemError::ParamError(missing))
        })
    }

    fn name(&self) -> &'static str {
        "param_failing_system"
    }
}
pub struct KindCheckingHandler;
impl System for KindCheckingHandler {
type Output = ();
fn run<'a>(
&'a self,
ctx: &'a SystemContext<'_>,
) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
Box::pin(async move {
if let Ok(log) = ctx.get_resource::<HandlerLog>() {
log.mark_invoked();
}
if let Ok(caught) = ctx.get_output::<CaughtError>()
&& let Ok(kind_log) = ctx.get_resource::<ErrorKindLog>()
{
kind_log.record(caught.kind);
}
Ok(())
})
}
fn name(&self) -> &'static str {
"kind_checking_handler"
}
}
/// Shared slot holding the most recently recorded [`ErrorKind`], if any.
///
/// Clones share the same slot because the state lives behind an `Arc`.
/// The default value is `None`.
#[derive(Clone, Default)]
pub struct ErrorKindLog {
    kind: Arc<Mutex<Option<ErrorKind>>>,
}

impl LocalResource for ErrorKindLog {}

impl ErrorKindLog {
    /// Returns a copy of the stored kind, or `None` if nothing was recorded.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn kind(&self) -> Option<ErrorKind> {
        let stored = self.kind.lock().unwrap();
        *stored
    }

    /// Overwrites the stored kind with `Some(kind)`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn record(&self, kind: ErrorKind) {
        let mut stored = self.kind.lock().unwrap();
        *stored = Some(kind);
    }
}
/// System that fails for its first `fail_count` runs and succeeds afterwards.
pub struct EventuallySucceedsSystem {
    /// Number of initial runs that resolve to an error.
    pub fail_count: u32,
    /// Shared run counter, incremented once per executed future.
    pub attempts: Arc<AtomicU32>,
}

impl System for EventuallySucceedsSystem {
    type Output = ();

    /// Atomically increments `attempts` and compares the pre-increment value
    /// with `fail_count`: values below it yield an `ExecutionError` naming
    /// the zero-based attempt, anything else yields `Ok(())`.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let attempts = &self.attempts;
        let threshold = self.fail_count;
        Box::pin(async move {
            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
            if attempt >= threshold {
                Ok(())
            } else {
                Err(SystemError::ExecutionError(format!(
                    "transient failure (attempt {attempt})"
                )))
            }
        })
    }

    fn name(&self) -> &'static str {
        "eventually_succeeds_system"
    }
}
/// Simple configuration value used as a local resource by the config-related
/// test systems in this file.
#[derive(Debug, Clone)]
pub struct TestConfig {
/// Integer payload; `ReadConfigCapture` copies this out when it runs.
pub value: i32,
}
// Marker impl: registers `TestConfig` as a `LocalResource`.
impl LocalResource for TestConfig {}
/// System that reads the `TestConfig` resource and stores its value in a
/// shared slot.
pub struct ReadConfigCapture {
    /// Slot that receives `Some(config.value)` after a successful run.
    pub captured: Arc<Mutex<Option<i32>>>,
}

impl System for ReadConfigCapture {
    type Output = ();

    /// Fetches `TestConfig` from the context and writes its `value` into
    /// `captured`.
    ///
    /// # Errors
    ///
    /// A failed resource lookup is converted into
    /// `SystemError::ExecutionError` carrying the lookup error's text.
    ///
    /// # Panics
    ///
    /// The returned future panics if the slot's mutex is poisoned.
    fn run<'a>(
        &'a self,
        ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        let slot = Arc::clone(&self.captured);
        Box::pin(async move {
            let lookup = ctx.get_resource::<TestConfig>();
            let config =
                lookup.map_err(|cause| SystemError::ExecutionError(cause.to_string()))?;
            let mut stored = slot.lock().unwrap();
            *stored = Some(config.value);
            Ok(())
        })
    }

    fn name(&self) -> &'static str {
        "read_config_capture"
    }
}
/// No-op system that declares read access to `TestConfig`.
///
/// `run` never touches the context; only the declared access matters.
pub struct ReadConfigSystem;

impl System for ReadConfigSystem {
    type Output = ();

    /// Ignores the context and immediately succeeds.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async move { Ok(()) })
    }

    fn name(&self) -> &'static str {
        "read_config_system"
    }

    /// Declares a single read of `TestConfig`.
    fn access(&self) -> polaris_system::param::SystemAccess {
        let declared = polaris_system::param::SystemAccess::new();
        declared.with_read::<TestConfig>()
    }
}
/// No-op system that declares write access to `TestConfig`.
///
/// `run` never touches the context; only the declared access matters.
pub struct WriteConfigSystem;

impl System for WriteConfigSystem {
    type Output = ();

    /// Ignores the context and immediately succeeds.
    fn run<'a>(
        &'a self,
        _ctx: &'a SystemContext<'_>,
    ) -> BoxFuture<'a, Result<Self::Output, SystemError>> {
        Box::pin(async move { Ok(()) })
    }

    fn name(&self) -> &'static str {
        "write_config_system"
    }

    /// Declares a single write of `TestConfig`.
    fn access(&self) -> polaris_system::param::SystemAccess {
        let declared = polaris_system::param::SystemAccess::new();
        declared.with_write::<TestConfig>()
    }
}
/// Shared, growable list of node ids.
///
/// Clones share the same list because the state lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct TrackerNodes(Arc<Mutex<Vec<NodeId>>>);

impl TrackerNodes {
    /// Appends `id` to the end of the list.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn add(&self, id: NodeId) {
        let mut nodes = self.0.lock().unwrap();
        nodes.push(id);
    }

    /// Consumes this handle and returns the collected ids.
    ///
    /// When this is the last handle the vector is moved out without copying;
    /// otherwise the shared contents are cloned.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn into_vec(self) -> Vec<NodeId> {
        match Arc::try_unwrap(self.0) {
            Ok(sole_owner) => sole_owner.into_inner().unwrap(),
            Err(still_shared) => {
                let nodes = still_shared.lock().unwrap();
                nodes.clone()
            }
        }
    }
}