1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// DISABLED: This test requires the deprecated loopback API and .close() method
// The loopback API (Relay::new()) is now internal and .close() is not publicly accessible.
// Closing is handled by dropping RelaySender in the new channel API.
/*
use pipedream_rs::{PipeExt, Relay};
use std::time::Duration;
use tokio::time::timeout;
#[ignore]
#[tokio::test]
async fn test_forward_is_awaitable_and_blocking() {
let source = Relay::new();
let target = Relay::new();
let mut sub = target.subscribe::<String>();
// Spawn the forwarder - it should block!
let source_clone = source.clone();
let target_clone = target.clone();
let forward_handle = tokio::spawn(async move {
source_clone.forward(&target_clone).await;
});
// Give forwarder time to subscribe (async fn is lazy)
tokio::time::sleep(Duration::from_millis(50)).await;
// Send a message
source.send("hello".to_string()).await.expect("send failed");
// partial wait to ensure forwarding happens
let msg = timeout(Duration::from_millis(100), sub.recv())
.await
.expect("timed out")
.expect("stream closed");
assert_eq!(*msg, "hello");
// Ensure forward_handle is still running (it returns () when done)
let is_done = forward_handle.is_finished();
assert!(!is_done, "Forward should still be running (blocked on source)");
// Close source - requires .close() method which is now private
source.close();
// Now forward should finish
timeout(Duration::from_millis(100), forward_handle)
.await
.expect("forward did not exit after source close")
.expect("forward task panic");
}
*/