testing_echo_client/
testing_echo_client.rs

1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::connector::ConnectorResult;
4use binary_options_tools_core_pre::connector::{Connector, WsStream};
5use binary_options_tools_core_pre::error::{CoreError, CoreResult};
6use binary_options_tools_core_pre::testing::{TestingWrapper, TestingWrapperBuilder};
7use binary_options_tools_core_pre::traits::{ApiModule, Rule};
8use kanal::{AsyncReceiver, AsyncSender};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12use tokio_tungstenite::connect_async;
13use tokio_tungstenite::tungstenite::Message;
14
/// Minimal connector for the echo example: it only remembers the WebSocket
/// endpoint that should be dialed.
struct DummyConnector {
    /// Endpoint URL handed to the WebSocket client when connecting.
    url: String,
}

impl DummyConnector {
    /// Creates a connector that targets `url`.
    ///
    /// Accepts anything convertible into a `String` (an owned `String`, a
    /// `&str`, ...), so callers that already pass a `String` keep compiling
    /// unchanged while string literals no longer need `.to_string()`.
    pub fn new(url: impl Into<String>) -> Self {
        Self { url: url.into() }
    }
}
24
25#[async_trait::async_trait]
26impl Connector<()> for DummyConnector {
27    async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
28        println!("Connecting to {}", self.url);
29        let wsstream = connect_async(&self.url).await.unwrap();
30        Ok(wsstream.0)
31    }
32
33    async fn disconnect(&self) -> ConnectorResult<()> {
34        println!("Disconnecting from {}", self.url);
35        Ok(())
36    }
37}
38
39// --- ApiModule 1: EchoModule ---
40pub struct EchoModule {
41    to_ws: AsyncSender<Message>,
42    cmd_rx: AsyncReceiver<String>,
43    cmd_tx: AsyncSender<String>,
44    msg_rx: AsyncReceiver<Arc<Message>>,
45    echo: AtomicBool,
46}
47
48#[async_trait]
49impl ApiModule<()> for EchoModule {
50    type Command = String;
51    type CommandResponse = String;
52    type Handle = EchoHandle;
53
54    fn new(
55        _state: Arc<()>,
56        cmd_rx: AsyncReceiver<Self::Command>,
57        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
58        msg_rx: AsyncReceiver<Arc<Message>>,
59        to_ws: AsyncSender<Message>,
60    ) -> Self {
61        Self {
62            to_ws,
63            cmd_rx,
64            cmd_tx: cmd_ret_tx,
65            msg_rx,
66            echo: AtomicBool::new(false),
67        }
68    }
69
70    fn create_handle(
71        sender: AsyncSender<Self::Command>,
72        receiver: AsyncReceiver<Self::CommandResponse>,
73    ) -> Self::Handle {
74        EchoHandle { sender, receiver }
75    }
76
77    async fn run(&mut self) -> CoreResult<()> {
78        loop {
79            tokio::select! {
80                Ok(cmd) = self.cmd_rx.recv() => {
81                    let _ = self.to_ws.send(Message::text(cmd)).await;
82                    self.echo.store(true, Ordering::SeqCst);
83                }
84                Ok(msg) = self.msg_rx.recv() => {
85                    if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
86                        let _ = self.cmd_tx.send(txt.to_string()).await;
87                        self.echo.store(false, Ordering::SeqCst);
88                    }
89                }
90            }
91        }
92    }
93
94    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
95        Box::new(move |msg: &Message| {
96            println!("Routing rule for EchoModule: {msg:?}");
97            msg.is_text()
98        })
99    }
100}
101
102#[derive(Clone)]
103pub struct EchoHandle {
104    sender: AsyncSender<String>,
105    receiver: AsyncReceiver<String>,
106}
107
108impl EchoHandle {
109    pub async fn echo(&self, msg: String) -> CoreResult<String> {
110        let _ = self.sender.send(msg).await;
111        println!("In side echo handle, waiting for response...");
112        Ok(self.receiver.recv().await?)
113    }
114}
115// Testing Platform with integrated testing wrapper
116pub struct TestingEchoPlatform {
117    testing_wrapper: TestingWrapper<()>,
118}
119
120impl TestingEchoPlatform {
121    pub async fn new(url: String) -> CoreResult<Self> {
122        let connector = DummyConnector::new(url);
123
124        let builder = ClientBuilder::new(connector, ()).with_module::<EchoModule>();
125
126        // // Create testing wrapper with custom configuration
127        // let testing_config = TestingConfig {
128        //     stats_interval: Duration::from_secs(10), // Log stats every 10 seconds
129        //     log_stats: true,
130        //     track_events: true,
131        //     max_reconnect_attempts: Some(3),
132        //     reconnect_delay: Duration::from_secs(5),
133        //     connection_timeout: Duration::from_secs(30),
134        //     auto_reconnect: true,
135        // };
136
137        let testing_wrapper = TestingWrapperBuilder::new()
138            .with_stats_interval(Duration::from_secs(10))
139            .with_log_stats(true)
140            .with_track_events(true)
141            .with_max_reconnect_attempts(Some(3))
142            .with_reconnect_delay(Duration::from_secs(5))
143            .with_connection_timeout(Duration::from_secs(30))
144            .with_auto_reconnect(true)
145            .build_with_middleware(builder)
146            .await?;
147
148        Ok(Self { testing_wrapper })
149    }
150
151    pub async fn start(&mut self) -> CoreResult<()> {
152        self.testing_wrapper.start().await
153    }
154
155    pub async fn stop(self) -> CoreResult<()> {
156        self.testing_wrapper.stop().await?;
157        Ok(())
158    }
159
160    pub async fn echo(&self, msg: String) -> CoreResult<String> {
161        match self
162            .testing_wrapper
163            .client()
164            .get_handle::<EchoModule>()
165            .await
166        {
167            Some(echo_handle) => echo_handle.echo(msg).await,
168            None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
169        }
170    }
171
172    pub async fn get_stats(&self) -> binary_options_tools_core_pre::statistics::ConnectionStats {
173        self.testing_wrapper.get_stats().await
174    }
175
176    pub async fn export_stats_json(&self) -> CoreResult<String> {
177        self.testing_wrapper.export_stats_json().await
178    }
179
180    pub async fn export_stats_csv(&self) -> CoreResult<String> {
181        self.testing_wrapper.export_stats_csv().await
182    }
183
184    pub async fn run_performance_test(&self, num_messages: usize, delay_ms: u64) -> CoreResult<()> {
185        println!("Starting performance test with {num_messages} messages");
186
187        let start_time = std::time::Instant::now();
188
189        for i in 0..num_messages {
190            let msg = format!("Test message {i}");
191            match self.echo(msg.clone()).await {
192                Ok(response) => {
193                    println!("Message {i}: sent '{msg}', received '{response}'");
194                }
195                Err(e) => {
196                    println!("Message {i} failed: {e}");
197                }
198            }
199
200            if delay_ms > 0 {
201                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
202            }
203        }
204
205        let elapsed = start_time.elapsed();
206        println!("Performance test completed in {elapsed:?}");
207
208        // Print final statistics
209        let stats = self.get_stats().await;
210        println!("=== Performance Test Results ===");
211        println!("Total messages sent: {}", stats.messages_sent);
212        println!("Total messages received: {}", stats.messages_received);
213        println!(
214            "Average messages per second: {:.2}",
215            stats.avg_messages_sent_per_second
216        );
217        println!("Total bytes sent: {}", stats.bytes_sent);
218        println!("Total bytes received: {}", stats.bytes_received);
219        println!("================================");
220
221        Ok(())
222    }
223}
224
225// fn test(msg: Message) -> bool {
226//     if let Message::Binary(bin) = msg {
227//         return bin.as_ref().starts_with(b"needle")
228//     }
229//     false
230// }
231
232// Demonstration of usage
233#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
234async fn main() -> CoreResult<()> {
235    // Initialize tracing
236    tracing_subscriber::fmt::init();
237
238    let mut platform = TestingEchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
239
240    // Start the platform (this will begin collecting statistics)
241    platform.start().await?;
242
243    println!("Platform started! Running tests...");
244
245    // Give some time for the connection to establish
246    tokio::time::sleep(Duration::from_secs(2)).await;
247
248    // Run a simple echo test
249    println!("Testing basic echo functionality...");
250    let response = platform.echo("Hello, Testing World!".to_string()).await?;
251    println!("Echo response: {response}");
252
253    // Run a performance test
254    println!("Running performance test...");
255    platform.run_performance_test(10, 1000).await?; // 10 messages, 1 second delay
256
257    // Wait a bit more to collect statistics
258    tokio::time::sleep(Duration::from_secs(5)).await;
259
260    // Export statistics
261    println!("Exporting statistics...");
262    // let json_stats = platform.export_stats_json().await?;
263    // println!("JSON Stats:\n{json_stats}");
264
265    let csv_stats = platform.export_stats_csv().await?;
266    println!("CSV Stats:\n{csv_stats}");
267
268    // Stop the platform using the new shutdown method
269    platform.stop().await?;
270
271    println!("Testing complete!");
272    Ok(())
273}