reflow_network 0.2.1

Network executor for Reflow — routes messages between actors, manages subgraphs, and emits runtime events.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
#[cfg(test)]
#[allow(clippy::module_inception)]
mod tests {
    use std::fs;
    use std::path::PathBuf;
    use tempfile::TempDir;

    use crate::script_discovery::{
        ComponentRegistry, ComponentType, DiscoveredScriptActor, RuntimeRequirements,
        ScriptActorDiscovery, ScriptActorMetadata, ScriptDiscoveryConfig, ScriptRuntime,
    };
    use std::collections::HashMap;

    #[tokio::test]
    async fn test_discover_python_actor() {
        // Create a temporary directory
        let temp_dir = TempDir::new().unwrap();
        let actor_file = temp_dir.path().join("test_actor.actor.py");

        // Write a Python actor file
        let python_code = r#"
from reflow import actor, ActorContext, Message
from typing import Dict

@actor(
    name="TestActor",
    inports=["input"],
    outports=["output"],
    version="1.0.0",
    description="Test actor for unit tests",
    tags=["test", "example"]
)
async def process(context: ActorContext) -> Dict[str, Message]:
    """Process test messages."""
    input_msg = context.payload.get("input")
    if input_msg:
        return {"output": Message.string("processed")}
    return {}
"#;

        fs::write(&actor_file, python_code).unwrap();

        // Create discovery config
        let config = ScriptDiscoveryConfig {
            root_path: temp_dir.path().to_path_buf(),
            patterns: vec!["**/*.actor.py".to_string()],
            excluded_paths: vec![],
            max_depth: Some(10),
            auto_register: false,
            validate_metadata: true,
        };

        // Discover actors
        let discovery = ScriptActorDiscovery::new(config);
        let result = discovery.discover_actors().await;

        // Check if Python is available
        match result {
            Ok(discovered) => {
                assert_eq!(discovered.actors.len(), 1);
                let actor = &discovered.actors[0];
                assert_eq!(actor.component, "TestActor");
                assert_eq!(actor.runtime, ScriptRuntime::Python);
                assert_eq!(actor.description, "Test actor for unit tests");
                assert_eq!(actor.inports.len(), 1);
                assert_eq!(actor.outports.len(), 1);
            }
            Err(e) => {
                // Python might not be installed
                println!("Skipping test: {}", e);
            }
        }
    }

    #[tokio::test]
    async fn test_discover_javascript_actor() {
        // Create a temporary directory
        let temp_dir = TempDir::new().unwrap();
        let actor_file = temp_dir.path().join("test_actor.actor.js");

        // Write a JavaScript actor file
        let js_code = r#"
const { actor, ActorContext, Message } = require('reflow');

/**
 * Test actor for JavaScript
 */
@actor({
    name: "TestJSActor",
    inports: ["data"],
    outports: ["result"],
    version: "1.0.0",
    tags: ["test", "javascript"]
})
async function processData(context) {
    const data = context.payload.get("data");
    if (data) {
        return { result: Message.object({ processed: true }) };
    }
    return {};
}

module.exports = processData;
"#;

        fs::write(&actor_file, js_code).unwrap();

        // Create discovery config
        let config = ScriptDiscoveryConfig {
            root_path: temp_dir.path().to_path_buf(),
            patterns: vec!["**/*.actor.js".to_string()],
            excluded_paths: vec![],
            max_depth: Some(10),
            auto_register: false,
            validate_metadata: true,
        };

        // Discover actors
        let discovery = ScriptActorDiscovery::new(config);
        let result = discovery.discover_actors().await;

        // Check if Node.js is available
        match result {
            Ok(discovered) => {
                // JavaScript discovery might not work yet, so just check it doesn't crash
                if !discovered.actors.is_empty() {
                    let actor = &discovered.actors[0];
                    assert_eq!(actor.component, "TestJSActor");
                    assert_eq!(actor.runtime, ScriptRuntime::JavaScript);
                    assert_eq!(actor.inports.len(), 1);
                    assert_eq!(actor.outports.len(), 1);
                } else {
                    // JavaScript metadata extraction not implemented yet
                    println!("JavaScript actor discovery not fully implemented yet");
                }
            }
            Err(e) => {
                // Node.js might not be installed
                println!("Skipping test: {}", e);
            }
        }
    }

    #[test]
    fn test_script_runtime_from_extension() {
        assert_eq!(
            ScriptRuntime::from_extension("py"),
            Some(ScriptRuntime::Python)
        );
        assert_eq!(
            ScriptRuntime::from_extension("js"),
            Some(ScriptRuntime::JavaScript)
        );
        assert_eq!(
            ScriptRuntime::from_extension("mjs"),
            Some(ScriptRuntime::JavaScript)
        );
        assert_eq!(ScriptRuntime::from_extension("txt"), None);
    }

    #[test]
    fn test_component_registry() {
        let mut registry = ComponentRegistry::new();

        // Create test metadata
        let metadata = DiscoveredScriptActor {
            component: "TestComponent".to_string(),
            description: "Test description".to_string(),
            file_path: PathBuf::from("/test/path.py"),
            runtime: ScriptRuntime::Python,
            inports: vec![],
            outports: vec![],
            workspace_metadata: ScriptActorMetadata {
                namespace: "test.namespace".to_string(),
                version: "1.0.0".to_string(),
                author: None,
                dependencies: vec![],
                runtime_requirements: RuntimeRequirements {
                    runtime_version: "3.9".to_string(),
                    memory_limit: "512MB".to_string(),
                    cpu_limit: Some(0.5),
                    timeout: 30,
                    env_vars: HashMap::new(),
                },
                config_schema: None,
                tags: vec![],
                category: None,
                source_hash: "test_hash".to_string(),
                last_modified: chrono::Utc::now(),
            },
        };

        // Register script actor
        registry
            .register_script_actor("TestComponent", metadata)
            .unwrap();

        // Check registration
        assert!(registry.has_component("TestComponent"));
        assert!(matches!(
            registry.get_component_type("TestComponent"),
            Some(ComponentType::Script(ScriptRuntime::Python))
        ));

        // Check counts
        assert_eq!(registry.total_count(), 1);
        let counts = registry.count_by_type();
        assert_eq!(counts.get("python"), Some(&1));
    }

    #[tokio::test]
    async fn test_websocket_rpc_types() {
        use crate::websocket_rpc::*;

        // Test RPC request serialization
        let request = RpcRequest::new(
            "test-id".to_string(),
            "process".to_string(),
            serde_json::json!({"test": "data"}),
        );

        let json = serde_json::to_string(&request).unwrap();
        assert!(json.contains("\"jsonrpc\":\"2.0\""));
        assert!(json.contains("\"id\":\"test-id\""));
        assert!(json.contains("\"method\":\"process\""));

        // Test RPC response deserialization
        let response_json = r#"{
            "jsonrpc": "2.0",
            "id": "test-id",
            "result": {"status": "ok"}
        }"#;

        let response: RpcResponse = serde_json::from_str(response_json).unwrap();
        assert_eq!(response.id, "test-id");
        assert!(response.result.is_some());
        assert!(response.error.is_none());

        // Test RPC notification
        let notification = RpcNotification {
            jsonrpc: "2.0".to_string(),
            method: "output".to_string(),
            params: serde_json::json!({
                "actor_id": "test",
                "port": "out",
                "data": {"value": 42}
            }),
        };

        let json = serde_json::to_string(&notification).unwrap();
        assert!(json.contains("\"jsonrpc\":\"2.0\""));
        assert!(json.contains("\"method\":\"output\""));
        assert!(!json.contains("\"id\"")); // Notifications don't have IDs

        // Test ScriptOutput deserialization
        let output_json = r#"{
            "actor_id": "test_actor",
            "port": "output_port",
            "data": {"type": "integer", "value": 123},
            "timestamp": 1234567890
        }"#;

        let output: ScriptOutput = serde_json::from_str(output_json).unwrap();
        assert_eq!(output.actor_id, "test_actor");
        assert_eq!(output.port, "output_port");
        assert_eq!(output.timestamp, 1234567890);
    }

    #[tokio::test]
    async fn test_websocket_server_basic() {
        use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;

        println!("Starting basic WebSocket test...");
        // Just test that the server starts and stops
        let server = TestWebSocketServer::start().await;
        println!("Server started on port: {}", server.port);
        assert!(server.port > 0);
        assert!(server.url.starts_with("ws://"));
        println!("Shutting down server...");
        server.shutdown().await;
        println!("Server shutdown complete");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_websocket_script_actor_integration() {
        use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;
        use crate::script_discovery::*;
        use crate::websocket_rpc::*;
        use reflow_actor::message::Message;
        use std::collections::HashMap;
        use std::sync::Arc;
        use tokio::time::{Duration, sleep};

        // Start the test WebSocket server
        println!("Starting test WebSocket server...");
        let server = TestWebSocketServer::start().await;
        let ws_url = server.url.clone();
        println!("Test server started at: {}", ws_url);

        // Create test metadata
        let metadata = DiscoveredScriptActor {
            component: "test_actor".to_string(),
            description: "Test WebSocket actor".to_string(),
            file_path: PathBuf::from("/test/test_actor.py"),
            runtime: ScriptRuntime::Python,
            inports: vec![],
            outports: vec![],
            workspace_metadata: ScriptActorMetadata {
                namespace: "test".to_string(),
                version: "1.0.0".to_string(),
                author: None,
                dependencies: vec![],
                runtime_requirements: RuntimeRequirements {
                    runtime_version: "3.9".to_string(),
                    memory_limit: "512MB".to_string(),
                    cpu_limit: Some(0.5),
                    timeout: 30,
                    env_vars: HashMap::new(),
                },
                config_schema: None,
                tags: vec![],
                category: None,
                source_hash: "test_hash".to_string(),
                last_modified: chrono::Utc::now(),
            },
        };

        // Create WebSocket RPC client
        println!("Creating WebSocket RPC client for URL: {}", ws_url);
        let rpc_client = Arc::new(WebSocketRpcClient::new(ws_url));

        // Create WebSocketScriptActor
        println!("Creating WebSocketScriptActor...");
        let mut actor =
            WebSocketScriptActor::new(metadata, rpc_client, "redis://localhost:6379".to_string())
                .await;

        // Connect to the mock server with timeout
        println!("Connecting to WebSocket server...");
        let connect_result =
            tokio::time::timeout(Duration::from_secs(2), actor.rpc_client.connect()).await;

        match connect_result {
            Ok(Ok(())) => println!("Connected to test server"),
            Ok(Err(e)) => panic!("Failed to connect: {}", e),
            Err(_) => panic!("Connection timed out"),
        }

        // Test processing a message
        println!("Preparing to process message...");
        let mut inputs = HashMap::new();
        inputs.insert(
            "input".to_string(),
            Message::string("test data".to_string()),
        );

        println!("Calling process_message...");
        let outputs = actor.process_message(inputs).await.unwrap();
        println!("Got outputs: {:?}", outputs.keys().collect::<Vec<_>>());

        // Check the synchronous output
        assert_eq!(outputs.len(), 1);
        assert!(outputs.contains_key("output"));
        if let Some(Message::String(s)) = outputs.get("output") {
            assert_eq!(s.as_ref(), "processed");
        } else {
            panic!("Expected string output");
        }

        // Give time for async output to arrive
        sleep(Duration::from_millis(50)).await;

        // Cleanup
        actor.rpc_client.disconnect().await.unwrap();
        server.shutdown().await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_script_actor_with_streaming_outputs() {
        use crate::script_discovery::test_helpers::test_server::TestWebSocketServer;
        use crate::script_discovery::*;
        use crate::websocket_rpc::*;
        use std::collections::HashMap;
        use std::sync::Arc;
        use tokio::time::Duration;

        // Start the test WebSocket server
        let server = TestWebSocketServer::start().await;
        let ws_url = server.url.clone();

        // Create test metadata for streaming actor
        let metadata = DiscoveredScriptActor {
            component: "streaming_actor".to_string(),
            description: "Test streaming WebSocket actor".to_string(),
            file_path: PathBuf::from("/test/streaming_actor.py"),
            runtime: ScriptRuntime::Python,
            inports: vec![],
            outports: vec![],
            workspace_metadata: ScriptActorMetadata {
                namespace: "test".to_string(),
                version: "1.0.0".to_string(),
                author: None,
                dependencies: vec![],
                runtime_requirements: RuntimeRequirements {
                    runtime_version: "3.9".to_string(),
                    memory_limit: "512MB".to_string(),
                    cpu_limit: Some(0.5),
                    timeout: 30,
                    env_vars: HashMap::new(),
                },
                config_schema: None,
                tags: vec![],
                category: None,
                source_hash: "test_hash".to_string(),
                last_modified: chrono::Utc::now(),
            },
        };

        // Create components
        let rpc_client = Arc::new(WebSocketRpcClient::new(ws_url));

        let _actor = WebSocketScriptActor::new(
            metadata,
            rpc_client.clone(),
            "redis://localhost:6379".to_string(),
        )
        .await;

        // Set output channel AFTER new() which creates its own internal channel
        let (output_tx, output_rx) = flume::unbounded::<ScriptOutput>();
        rpc_client.set_output_channel(output_tx);

        // Connect with timeout
        let connect_result =
            tokio::time::timeout(std::time::Duration::from_secs(2), rpc_client.connect()).await;

        match connect_result {
            Ok(Ok(())) => println!("Connected to streaming test server"),
            Ok(Err(e)) => panic!("Failed to connect to streaming server: {}", e),
            Err(_) => panic!("Connection to streaming server timed out"),
        }

        // Process a message that triggers streaming
        let inputs: HashMap<String, serde_json::Value> = HashMap::new();
        let _result = rpc_client
            .call(
                "stream",
                serde_json::json!({
                    "payload": inputs,
                    "config": {},
                    "actor_id": "streaming_actor",
                    "timestamp": 0
                }),
            )
            .await
            .unwrap();

        // Collect streamed outputs with timeout
        let mut streamed_values = Vec::new();
        for i in 0..3 {
            println!("Waiting for output {}...", i);
            match tokio::time::timeout(Duration::from_secs(1), output_rx.recv_async()).await {
                Ok(Ok(output)) => {
                    println!("Received output: {:?}", output);
                    assert_eq!(output.actor_id, "streaming_actor");
                    assert_eq!(output.port, "stream");
                    if let Some(val) = output.data.get("value").and_then(|v| v.as_i64()) {
                        streamed_values.push(val);
                    }
                }
                Ok(Err(e)) => {
                    println!("Failed to receive output: {}", e);
                    break;
                }
                Err(_) => {
                    println!("Timeout waiting for output {}", i);
                    break;
                }
            }
        }

        // Verify we received all streamed values
        assert_eq!(streamed_values, vec![0, 1, 2]);

        // Cleanup
        rpc_client.disconnect().await.unwrap();
        server.shutdown().await;
    }
}