#[cfg(test)]
#[allow(clippy::module_inception)]
mod tests {
use std::fs;
use std::path::PathBuf;
use tempfile::TempDir;
use crate::script_discovery::{
ComponentRegistry, ComponentType, DiscoveredScriptActor, RuntimeRequirements,
ScriptActorDiscovery, ScriptActorMetadata, ScriptDiscoveryConfig, ScriptRuntime,
};
use std::collections::HashMap;
#[tokio::test]
async fn test_discover_python_actor() {
let temp_dir = TempDir::new().unwrap();
let actor_file = temp_dir.path().join("test_actor.actor.py");
let python_code = r#"
from reflow import actor, ActorContext, Message
from typing import Dict
@actor(
name="TestActor",
inports=["input"],
outports=["output"],
version="1.0.0",
description="Test actor for unit tests",
tags=["test", "example"]
)
async def process(context: ActorContext) -> Dict[str, Message]:
"""Process test messages."""
input_msg = context.payload.get("input")
if input_msg:
return {"output": Message.string("processed")}
return {}
"#;
fs::write(&actor_file, python_code).unwrap();
let config = ScriptDiscoveryConfig {
root_path: temp_dir.path().to_path_buf(),
patterns: vec!["**/*.actor.py".to_string()],
excluded_paths: vec![],
max_depth: Some(10),
auto_register: false,
validate_metadata: true,
};
let discovery = ScriptActorDiscovery::new(config);
let result = discovery.discover_actors().await;
match result {
Ok(discovered) => {
assert_eq!(discovered.actors.len(), 1);
let actor = &discovered.actors[0];
assert_eq!(actor.component, "TestActor");
assert_eq!(actor.runtime, ScriptRuntime::Python);
assert_eq!(actor.description, "Test actor for unit tests");
assert_eq!(actor.inports.len(), 1);
assert_eq!(actor.outports.len(), 1);
}
Err(e) => {
println!("Skipping test: {}", e);
}
}
}
#[tokio::test]
async fn test_discover_javascript_actor() {
let temp_dir = TempDir::new().unwrap();
let actor_file = temp_dir.path().join("test_actor.actor.js");
let js_code = r#"
const { actor, ActorContext, Message } = require('reflow');
/**
* Test actor for JavaScript
*/
@actor({
name: "TestJSActor",
inports: ["data"],
outports: ["result"],
version: "1.0.0",
tags: ["test", "javascript"]
})
async function processData(context) {
const data = context.payload.get("data");
if (data) {
return { result: Message.object({ processed: true }) };
}
return {};
}
module.exports = processData;
"#;
fs::write(&actor_file, js_code).unwrap();
let config = ScriptDiscoveryConfig {
root_path: temp_dir.path().to_path_buf(),
patterns: vec!["**/*.actor.js".to_string()],
excluded_paths: vec![],
max_depth: Some(10),
auto_register: false,
validate_metadata: true,
};
let discovery = ScriptActorDiscovery::new(config);
let result = discovery.discover_actors().await;
match result {
Ok(discovered) => {
if !discovered.actors.is_empty() {
let actor = &discovered.actors[0];
assert_eq!(actor.component, "TestJSActor");
assert_eq!(actor.runtime, ScriptRuntime::JavaScript);
assert_eq!(actor.inports.len(), 1);
assert_eq!(actor.outports.len(), 1);
} else {
println!("JavaScript actor discovery not fully implemented yet");
}
}
Err(e) => {
println!("Skipping test: {}", e);
}
}
}
#[test]
fn test_script_runtime_from_extension() {
assert_eq!(
ScriptRuntime::from_extension("py"),
Some(ScriptRuntime::Python)
);
assert_eq!(
ScriptRuntime::from_extension("js"),
Some(ScriptRuntime::JavaScript)
);
assert_eq!(
ScriptRuntime::from_extension("mjs"),
Some(ScriptRuntime::JavaScript)
);
assert_eq!(ScriptRuntime::from_extension("txt"), None);
}
#[test]
fn test_component_registry() {
let mut registry = ComponentRegistry::new();
let metadata = DiscoveredScriptActor {
component: "TestComponent".to_string(),
description: "Test description".to_string(),
file_path: PathBuf::from("/test/path.py"),
runtime: ScriptRuntime::Python,
inports: vec![],
outports: vec![],
workspace_metadata: ScriptActorMetadata {
namespace: "test.namespace".to_string(),
version: "1.0.0".to_string(),
author: None,
dependencies: vec![],
runtime_requirements: RuntimeRequirements {
runtime_version: "3.9".to_string(),
memory_limit: "512MB".to_string(),
cpu_limit: Some(0.5),
timeout: 30,
env_vars: HashMap::new(),
},
config_schema: None,
tags: vec![],
category: None,
source_hash: "test_hash".to_string(),
last_modified: chrono::Utc::now(),
},
};
registry
.register_script_actor("TestComponent", metadata)
.unwrap();
assert!(registry.has_component("TestComponent"));
assert!(matches!(
registry.get_component_type("TestComponent"),
Some(ComponentType::Script(ScriptRuntime::Python))
));
assert_eq!(registry.total_count(), 1);
let counts = registry.count_by_type();
assert_eq!(counts.get("python"), Some(&1));
}
#[tokio::test]
async fn test_websocket_rpc_types() {
use crate::websocket_rpc::*;
let request = RpcRequest::new(
"test-id".to_string(),
"process".to_string(),
serde_json::json!({"test": "data"}),
);
let json = serde_json::to_string(&request).unwrap();
assert!(json.contains("\"jsonrpc\":\"2.0\""));
assert!(json.contains("\"id\":\"test-id\""));
assert!(json.contains("\"method\":\"process\""));
let response_json = r#"{
"jsonrpc": "2.0",
"id": "test-id",
"result": {"status": "ok"}
}"#;
let response: RpcResponse = serde_json::from_str(response_json).unwrap();
assert_eq!(response.id, "test-id");
assert!(response.result.is_some());
assert!(response.error.is_none());
let notification = RpcNotification {
jsonrpc: "2.0".to_string(),
method: "output".to_string(),
params: serde_json::json!({
"actor_id": "test",
"port": "out",
"data": {"value": 42}
}),
};
let json = serde_json::to_string(¬ification).unwrap();
assert!(json.contains("\"jsonrpc\":\"2.0\""));
assert!(json.contains("\"method\":\"output\""));
assert!(!json.contains("\"id\""));
let output_json = r#"{
"actor_id": "test_actor",
"port": "output_port",
"data": {"type": "integer", "value": 123},
"timestamp": 1234567890
}"#;
let output: ScriptOutput = serde_json::from_str(output_json).unwrap();
assert_eq!(output.actor_id, "test_actor");
assert_eq!(output.port, "output_port");
assert_eq!(output.timestamp, 1234567890);
}
#[tokio::test]
async fn test_websocket_server_basic() {
use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;
println!("Starting basic WebSocket test...");
let server = TestWebSocketServer::start().await;
println!("Server started on port: {}", server.port);
assert!(server.port > 0);
assert!(server.url.starts_with("ws://"));
println!("Shutting down server...");
server.shutdown().await;
println!("Server shutdown complete");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_websocket_script_actor_integration() {
use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;
use crate::script_discovery::*;
use crate::websocket_rpc::*;
use reflow_actor::message::Message;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{Duration, sleep};
println!("Starting test WebSocket server...");
let server = TestWebSocketServer::start().await;
let ws_url = server.url.clone();
println!("Test server started at: {}", ws_url);
let metadata = DiscoveredScriptActor {
component: "test_actor".to_string(),
description: "Test WebSocket actor".to_string(),
file_path: PathBuf::from("/test/test_actor.py"),
runtime: ScriptRuntime::Python,
inports: vec![],
outports: vec![],
workspace_metadata: ScriptActorMetadata {
namespace: "test".to_string(),
version: "1.0.0".to_string(),
author: None,
dependencies: vec![],
runtime_requirements: RuntimeRequirements {
runtime_version: "3.9".to_string(),
memory_limit: "512MB".to_string(),
cpu_limit: Some(0.5),
timeout: 30,
env_vars: HashMap::new(),
},
config_schema: None,
tags: vec![],
category: None,
source_hash: "test_hash".to_string(),
last_modified: chrono::Utc::now(),
},
};
println!("Creating WebSocket RPC client for URL: {}", ws_url);
let rpc_client = Arc::new(WebSocketRpcClient::new(ws_url));
println!("Creating WebSocketScriptActor...");
let mut actor =
WebSocketScriptActor::new(metadata, rpc_client, "redis://localhost:6379".to_string())
.await;
println!("Connecting to WebSocket server...");
let connect_result =
tokio::time::timeout(Duration::from_secs(2), actor.rpc_client.connect()).await;
match connect_result {
Ok(Ok(())) => println!("Connected to test server"),
Ok(Err(e)) => panic!("Failed to connect: {}", e),
Err(_) => panic!("Connection timed out"),
}
println!("Preparing to process message...");
let mut inputs = HashMap::new();
inputs.insert(
"input".to_string(),
Message::string("test data".to_string()),
);
println!("Calling process_message...");
let outputs = actor.process_message(inputs).await.unwrap();
println!("Got outputs: {:?}", outputs.keys().collect::<Vec<_>>());
assert_eq!(outputs.len(), 1);
assert!(outputs.contains_key("output"));
if let Some(Message::String(s)) = outputs.get("output") {
assert_eq!(s.as_ref(), "processed");
} else {
panic!("Expected string output");
}
sleep(Duration::from_millis(50)).await;
actor.rpc_client.disconnect().await.unwrap();
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_script_actor_with_streaming_outputs() {
use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;
use crate::script_discovery::*;
use crate::websocket_rpc::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::Duration;
let server = TestWebSocketServer::start().await;
let ws_url = server.url.clone();
let metadata = DiscoveredScriptActor {
component: "streaming_actor".to_string(),
description: "Test streaming WebSocket actor".to_string(),
file_path: PathBuf::from("/test/streaming_actor.py"),
runtime: ScriptRuntime::Python,
inports: vec![],
outports: vec![],
workspace_metadata: ScriptActorMetadata {
namespace: "test".to_string(),
version: "1.0.0".to_string(),
author: None,
dependencies: vec![],
runtime_requirements: RuntimeRequirements {
runtime_version: "3.9".to_string(),
memory_limit: "512MB".to_string(),
cpu_limit: Some(0.5),
timeout: 30,
env_vars: HashMap::new(),
},
config_schema: None,
tags: vec![],
category: None,
source_hash: "test_hash".to_string(),
last_modified: chrono::Utc::now(),
},
};
let rpc_client = Arc::new(WebSocketRpcClient::new(ws_url));
let _actor = WebSocketScriptActor::new(
metadata,
rpc_client.clone(),
"redis://localhost:6379".to_string(),
)
.await;
let (output_tx, output_rx) = flume::unbounded::<ScriptOutput>();
rpc_client.set_output_channel(output_tx);
let connect_result =
tokio::time::timeout(std::time::Duration::from_secs(2), rpc_client.connect()).await;
match connect_result {
Ok(Ok(())) => println!("Connected to streaming test server"),
Ok(Err(e)) => panic!("Failed to connect to streaming server: {}", e),
Err(_) => panic!("Connection to streaming server timed out"),
}
let inputs: HashMap<String, serde_json::Value> = HashMap::new();
let _result = rpc_client
.call(
"stream",
serde_json::json!({
"payload": inputs,
"config": {},
"actor_id": "streaming_actor",
"timestamp": 0
}),
)
.await
.unwrap();
let mut streamed_values = Vec::new();
for i in 0..3 {
println!("Waiting for output {}...", i);
match tokio::time::timeout(Duration::from_secs(1), output_rx.recv_async()).await {
Ok(Ok(output)) => {
println!("Received output: {:?}", output);
assert_eq!(output.actor_id, "streaming_actor");
assert_eq!(output.port, "stream");
if let Some(val) = output.data.get("value").and_then(|v| v.as_i64()) {
streamed_values.push(val);
}
}
Ok(Err(e)) => {
println!("Failed to receive output: {}", e);
break;
}
Err(_) => {
println!("Timeout waiting for output {}", i);
break;
}
}
}
assert_eq!(streamed_values, vec![0, 1, 2]);
rpc_client.disconnect().await.unwrap();
server.shutdown().await;
}
}