use std::path::PathBuf;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::error::RoplatError;
use crate::node::Node;
use crate::resource::backed::Backend;
use crate::resource::{Interface, ResHandle, backed};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum NodeStatus {
Idle,
Running,
Completed,
Failed(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphNode {
pub id: String,
pub name: String,
pub status: NodeStatus,
pub node_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphEdge {
pub from: String,
pub to: String,
pub label: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphData {
pub nodes: Vec<GraphNode>,
pub edges: Vec<GraphEdge>,
}
#[derive(Debug)]
pub enum MonitorCmd {
GetGraph(tokio::sync::oneshot::Sender<GraphData>),
UpdateNodeStatus { node_id: String, status: NodeStatus },
RegisterNode(GraphNode),
RegisterEdge(GraphEdge),
Clear,
}
unsafe impl Send for MonitorCmd {}
#[derive(Clone)]
pub struct SystemMonitorResource {
graph: GraphData,
}
impl SystemMonitorResource {
pub fn new() -> Self {
Self { graph: GraphData { nodes: Vec::new(), edges: Vec::new() } }
}
fn update_node_status(&mut self, node_id: String, status: NodeStatus) {
if let Some(node) = self.graph.nodes.iter_mut().find(|n| n.id == node_id) {
node.status = status;
}
}
fn register_node(&mut self, node: GraphNode) {
self.graph.nodes.retain(|n| n.id != node.id);
self.graph.nodes.push(node);
}
fn register_edge(&mut self, edge: GraphEdge) {
self.graph
.edges
.retain(|e| e.from != edge.from || e.to != edge.to);
self.graph.edges.push(edge);
}
fn clear(&mut self) {
self.graph.nodes.clear();
self.graph.edges.clear();
}
}
impl Default for SystemMonitorResource {
fn default() -> Self {
Self::new()
}
}
impl Interface for SystemMonitorResource {
type Cmd = MonitorCmd;
}
#[async_trait::async_trait]
impl crate::resource::backed::Dispatch<MonitorCmd> for SystemMonitorResource {
async fn dispatch(&mut self, cmd: MonitorCmd) {
match cmd {
MonitorCmd::GetGraph(tx) => {
let _ = tx.send(self.graph.clone());
}
MonitorCmd::UpdateNodeStatus { node_id, status } => {
self.update_node_status(node_id, status);
}
MonitorCmd::RegisterNode(node) => {
self.register_node(node);
}
MonitorCmd::RegisterEdge(edge) => {
self.register_edge(edge);
}
MonitorCmd::Clear => {
self.clear();
}
}
}
}
pub struct GraphVisualizer {
monitor_handle: ResHandle<SystemMonitorResource>,
output_path: PathBuf,
}
impl GraphVisualizer {
pub fn new(monitor_handle: ResHandle<SystemMonitorResource>, output_path: PathBuf) -> Self {
Self { monitor_handle, output_path }
}
}
impl Node for GraphVisualizer {
type Input = ();
type Output = ();
type Error = RoplatError;
async fn process(&mut self, _input: ()) -> Result<(), RoplatError> {
let (tx, rx) = tokio::sync::oneshot::channel();
let cmd = MonitorCmd::GetGraph(tx);
self.monitor_handle.backend().send(cmd).await;
let graph_data = rx
.await
.map_err(|e| RoplatError::Other(format!("获取图数据失败: {}", e)))?;
let dot_content = generate_dot(&graph_data);
tokio::fs::write(&self.output_path, dot_content).await?;
Ok(())
}
}
fn generate_dot(graph: &GraphData) -> String {
let mut dot = String::from("digraph RoplatGraph {\n");
dot.push_str(" rankdir=TB;\n");
dot.push_str(" node [shape=box, style=rounded];\n");
dot.push_str(" edge [fontsize=10];\n\n");
for node in &graph.nodes {
let color = match &node.status {
NodeStatus::Idle => "lightgray",
NodeStatus::Running => "lightblue",
NodeStatus::Completed => "lightgreen",
NodeStatus::Failed(_) => "lightcoral",
};
let label = match &node.status {
NodeStatus::Failed(msg) => format!("{}\\n(Failed: {})", node.name, msg),
_ => node.name.clone(),
};
dot.push_str(&format!(
" \"{}\" [label=\"{}\", fillcolor={}, style=filled, rounded];\n",
node.id, label, color
));
}
dot.push('\n');
for edge in &graph.edges {
if let Some(label) = &edge.label {
dot.push_str(&format!(
" \"{}\" -> \"{}\" [label=\"{}\"];\n",
edge.from, edge.to, label
));
} else {
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", edge.from, edge.to));
}
}
dot.push_str("}\n");
dot
}
pub struct NodeStatusTracker {
node_id: String,
monitor_backend: backed::Local<SystemMonitorResource>,
}
impl NodeStatusTracker {
pub fn new(node_id: String, monitor_backend: backed::Local<SystemMonitorResource>) -> Self {
Self { node_id, monitor_backend }
}
pub async fn set_running(&self) {
let cmd = MonitorCmd::UpdateNodeStatus {
node_id: self.node_id.clone(),
status: NodeStatus::Running,
};
self.monitor_backend.send(cmd).await;
}
pub async fn set_completed(&self) {
let cmd = MonitorCmd::UpdateNodeStatus {
node_id: self.node_id.clone(),
status: NodeStatus::Completed,
};
self.monitor_backend.send(cmd).await;
}
pub async fn set_failed(&self, error: String) {
let cmd = MonitorCmd::UpdateNodeStatus {
node_id: self.node_id.clone(),
status: NodeStatus::Failed(error),
};
self.monitor_backend.send(cmd).await;
}
pub async fn set_idle(&self) {
let cmd = MonitorCmd::UpdateNodeStatus {
node_id: self.node_id.clone(),
status: NodeStatus::Idle,
};
self.monitor_backend.send(cmd).await;
}
}
pub struct GraphBuilder {
monitor_handle: ResHandle<SystemMonitorResource>,
}
impl GraphBuilder {
pub fn new(monitor_handle: ResHandle<SystemMonitorResource>) -> Self {
Self { monitor_handle }
}
pub async fn register_node(&self, node: GraphNode) {
let cmd = MonitorCmd::RegisterNode(node);
self.monitor_handle.backend().send(cmd).await;
}
pub async fn register_edge(&self, edge: GraphEdge) {
let cmd = MonitorCmd::RegisterEdge(edge);
self.monitor_handle.backend().send(cmd).await;
}
pub fn create_tracker(&self, node_id: String) -> NodeStatusTracker {
let backend = self.monitor_handle.backend().clone();
NodeStatusTracker::new(node_id, backend)
}
pub async fn clear(&self) {
let cmd = MonitorCmd::Clear;
self.monitor_handle.backend().send(cmd).await;
}
}
pub fn create_system_monitor() -> (
Arc<RwLock<SystemMonitorResource>>,
ResHandle<SystemMonitorResource>,
) {
let resource = Arc::new(RwLock::new(SystemMonitorResource::new()));
let backend = backed::Local::new(resource.clone());
let handle = ResHandle::from_backend(backend);
(resource, handle)
}
pub fn create_graph_visualizer(
monitor_handle: ResHandle<SystemMonitorResource>,
output_path: PathBuf,
) -> GraphVisualizer {
GraphVisualizer::new(monitor_handle, output_path)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_node_status_equality() {
let status1 = NodeStatus::Idle;
let status2 = NodeStatus::Idle;
let status3 = NodeStatus::Running;
assert_eq!(status1, status2);
assert_ne!(status1, status3);
}
#[test]
fn test_node_status_failed() {
let status1 = NodeStatus::Failed("Error 1".to_string());
let status2 = NodeStatus::Failed("Error 1".to_string());
let status3 = NodeStatus::Failed("Error 2".to_string());
assert_eq!(status1, status2);
assert_ne!(status1, status3);
}
#[test]
fn test_graph_node_creation() {
let node = GraphNode {
id: "node1".to_string(),
name: "Test Node".to_string(),
status: NodeStatus::Idle,
node_type: "TestType".to_string(),
};
assert_eq!(node.id, "node1");
assert_eq!(node.name, "Test Node");
assert_eq!(node.status, NodeStatus::Idle);
assert_eq!(node.node_type, "TestType");
}
#[test]
fn test_graph_edge_creation() {
let edge = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("connection".to_string()),
};
assert_eq!(edge.from, "node1");
assert_eq!(edge.to, "node2");
assert_eq!(edge.label, Some("connection".to_string()));
}
#[test]
fn test_graph_edge_without_label() {
let edge = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: None,
};
assert_eq!(edge.from, "node1");
assert_eq!(edge.to, "node2");
assert!(edge.label.is_none());
}
#[test]
fn test_graph_data_creation() {
let graph = GraphData { nodes: vec![], edges: vec![] };
assert!(graph.nodes.is_empty());
assert!(graph.edges.is_empty());
}
#[test]
fn test_graph_data_with_nodes_and_edges() {
let nodes = vec![
GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
},
GraphNode {
id: "node2".to_string(),
name: "Node 2".to_string(),
status: NodeStatus::Running,
node_type: "Type2".to_string(),
},
];
let edges = vec![GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("edge".to_string()),
}];
let graph = GraphData { nodes, edges };
assert_eq!(graph.nodes.len(), 2);
assert_eq!(graph.edges.len(), 1);
}
#[test]
fn test_system_monitor_resource_new() {
let monitor = SystemMonitorResource::new();
assert!(monitor.graph.nodes.is_empty());
assert!(monitor.graph.edges.is_empty());
}
#[test]
fn test_system_monitor_resource_default() {
let monitor = SystemMonitorResource::default();
assert!(monitor.graph.nodes.is_empty());
assert!(monitor.graph.edges.is_empty());
}
#[tokio::test]
async fn test_system_monitor_update_node_status() {
let mut monitor = SystemMonitorResource::new();
let node = GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
};
monitor.register_node(node);
monitor.update_node_status("node1".to_string(), NodeStatus::Running);
assert_eq!(monitor.graph.nodes[0].status, NodeStatus::Running);
}
#[tokio::test]
async fn test_system_monitor_register_node() {
let mut monitor = SystemMonitorResource::new();
let node = GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
};
monitor.register_node(node.clone());
assert_eq!(monitor.graph.nodes.len(), 1);
assert_eq!(monitor.graph.nodes[0].id, "node1");
}
#[tokio::test]
async fn test_system_monitor_register_node_replace() {
let mut monitor = SystemMonitorResource::new();
let node1 = GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
};
let node2 = GraphNode {
id: "node1".to_string(),
name: "Node 1 Updated".to_string(),
status: NodeStatus::Running,
node_type: "Type2".to_string(),
};
monitor.register_node(node1);
monitor.register_node(node2);
assert_eq!(monitor.graph.nodes.len(), 1);
assert_eq!(monitor.graph.nodes[0].name, "Node 1 Updated");
assert_eq!(monitor.graph.nodes[0].status, NodeStatus::Running);
}
#[tokio::test]
async fn test_system_monitor_register_edge() {
let mut monitor = SystemMonitorResource::new();
let edge = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("edge".to_string()),
};
monitor.register_edge(edge.clone());
assert_eq!(monitor.graph.edges.len(), 1);
assert_eq!(monitor.graph.edges[0].from, "node1");
}
#[tokio::test]
async fn test_system_monitor_register_edge_replace() {
let mut monitor = SystemMonitorResource::new();
let edge1 = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("old label".to_string()),
};
let edge2 = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("new label".to_string()),
};
monitor.register_edge(edge1);
monitor.register_edge(edge2);
assert_eq!(monitor.graph.edges.len(), 1);
assert_eq!(monitor.graph.edges[0].label, Some("new label".to_string()));
}
#[tokio::test]
async fn test_system_monitor_clear() {
let mut monitor = SystemMonitorResource::new();
let node = GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
};
let edge = GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("edge".to_string()),
};
monitor.register_node(node);
monitor.register_edge(edge);
assert_eq!(monitor.graph.nodes.len(), 1);
assert_eq!(monitor.graph.edges.len(), 1);
monitor.clear();
assert!(monitor.graph.nodes.is_empty());
assert!(monitor.graph.edges.is_empty());
}
#[test]
fn test_monitor_cmd_get_graph() {
let (tx, _rx) = tokio::sync::oneshot::channel();
let cmd = MonitorCmd::GetGraph(tx);
match cmd {
MonitorCmd::GetGraph(_) => {}
_ => panic!("Expected GetGraph command"),
}
}
#[test]
fn test_monitor_cmd_update_node_status() {
let cmd = MonitorCmd::UpdateNodeStatus {
node_id: "node1".to_string(),
status: NodeStatus::Running,
};
match cmd {
MonitorCmd::UpdateNodeStatus { node_id, status } => {
assert_eq!(node_id, "node1");
assert_eq!(status, NodeStatus::Running);
}
_ => panic!("Expected UpdateNodeStatus command"),
}
}
#[test]
fn test_monitor_cmd_register_node() {
let node = GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
};
let cmd = MonitorCmd::RegisterNode(node.clone());
match cmd {
MonitorCmd::RegisterNode(n) => {
assert_eq!(n.id, "node1");
}
_ => panic!("Expected RegisterNode command"),
}
}
#[test]
fn test_monitor_cmd_clear() {
let cmd = MonitorCmd::Clear;
match cmd {
MonitorCmd::Clear => {}
_ => panic!("Expected Clear command"),
}
}
#[test]
fn test_generate_dot_empty_graph() {
let graph = GraphData { nodes: vec![], edges: vec![] };
let dot = generate_dot(&graph);
assert!(dot.contains("digraph RoplatGraph"));
assert!(dot.contains("rankdir=TB"));
}
#[test]
fn test_generate_dot_with_nodes() {
let graph = GraphData {
nodes: vec![
GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
},
GraphNode {
id: "node2".to_string(),
name: "Node 2".to_string(),
status: NodeStatus::Running,
node_type: "Type2".to_string(),
},
],
edges: vec![],
};
let dot = generate_dot(&graph);
assert!(dot.contains("node1"));
assert!(dot.contains("node2"));
assert!(dot.contains("Node 1"));
assert!(dot.contains("Node 2"));
assert!(dot.contains("fillcolor=lightgray"));
assert!(dot.contains("fillcolor=lightblue"));
}
#[test]
fn test_generate_dot_with_edges() {
let graph = GraphData {
nodes: vec![
GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Idle,
node_type: "Type1".to_string(),
},
GraphNode {
id: "node2".to_string(),
name: "Node 2".to_string(),
status: NodeStatus::Idle,
node_type: "Type2".to_string(),
},
],
edges: vec![GraphEdge {
from: "node1".to_string(),
to: "node2".to_string(),
label: Some("connection".to_string()),
}],
};
let dot = generate_dot(&graph);
assert!(dot.contains("\"node1\" -> \"node2\""));
assert!(dot.contains("connection"));
}
#[test]
fn test_generate_dot_with_failed_node() {
let graph = GraphData {
nodes: vec![GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Failed("Error message".to_string()),
node_type: "Type1".to_string(),
}],
edges: vec![],
};
let dot = generate_dot(&graph);
assert!(dot.contains("Failed: Error message"));
assert!(dot.contains("fillcolor=lightcoral"));
}
#[test]
fn test_generate_dot_with_completed_node() {
let graph = GraphData {
nodes: vec![GraphNode {
id: "node1".to_string(),
name: "Node 1".to_string(),
status: NodeStatus::Completed,
node_type: "Type1".to_string(),
}],
edges: vec![],
};
let dot = generate_dot(&graph);
assert!(dot.contains("fillcolor=lightgreen"));
}
#[tokio::test]
async fn test_create_system_monitor() {
let (_resource, _handle) = create_system_monitor();
assert!(true);
}
#[test]
fn test_create_graph_visualizer() {
let (_resource, handle) = create_system_monitor();
let output_path = PathBuf::from("/tmp/test.dot");
let _visualizer = create_graph_visualizer(handle, output_path);
assert!(true);
}
}