// testing_echo_client/testing_echo_client.rs

use async_trait::async_trait;
use binary_options_tools_core_pre::builder::ClientBuilder;
use binary_options_tools_core_pre::connector::ConnectorResult;
use binary_options_tools_core_pre::connector::{Connector, WsStream};
use binary_options_tools_core_pre::error::{CoreError, CoreResult};
use binary_options_tools_core_pre::testing::{TestingWrapper, TestingWrapperBuilder};
use binary_options_tools_core_pre::traits::{ApiModule, Rule, RunnerCommand};
use kanal::{AsyncReceiver, AsyncSender};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;

/// Minimal connector that remembers a single WebSocket endpoint.
///
/// The stored URL is the address dialled when connecting and the value
/// printed in the connect/disconnect log lines.
struct DummyConnector {
    /// Address of the WebSocket server to dial.
    url: String,
}

impl DummyConnector {
    /// Builds a connector that targets `url`.
    ///
    /// The string is stored as given; nothing is validated or opened here.
    pub fn new(url: String) -> Self {
        DummyConnector { url }
    }
}
25#[async_trait::async_trait]
26impl Connector<()> for DummyConnector {
27    async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
28        println!("Connecting to {}", self.url);
29        let wsstream = connect_async(&self.url).await.unwrap();
30        Ok(wsstream.0)
31    }
32
33    async fn disconnect(&self) -> ConnectorResult<()> {
34        println!("Disconnecting from {}", self.url);
35        Ok(())
36    }
37}
39// --- ApiModule 1: EchoModule ---
40pub struct EchoModule {
41    to_ws: AsyncSender<Message>,
42    cmd_rx: AsyncReceiver<String>,
43    cmd_tx: AsyncSender<String>,
44    msg_rx: AsyncReceiver<Arc<Message>>,
45    echo: AtomicBool,
46}
48#[async_trait]
49impl ApiModule<()> for EchoModule {
50    type Command = String;
51    type CommandResponse = String;
52    type Handle = EchoHandle;
53
54    fn new(
55        _state: Arc<()>,
56        cmd_rx: AsyncReceiver<Self::Command>,
57        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
58        msg_rx: AsyncReceiver<Arc<Message>>,
59        to_ws: AsyncSender<Message>,
60        _: AsyncSender<RunnerCommand>,
61    ) -> Self {
62        Self {
63            to_ws,
64            cmd_rx,
65            cmd_tx: cmd_ret_tx,
66            msg_rx,
67            echo: AtomicBool::new(false),
68        }
69    }
70
71    fn create_handle(
72        sender: AsyncSender<Self::Command>,
73        receiver: AsyncReceiver<Self::CommandResponse>,
74    ) -> Self::Handle {
75        EchoHandle { sender, receiver }
76    }
77
78    async fn run(&mut self) -> CoreResult<()> {
79        loop {
80            tokio::select! {
81                Ok(cmd) = self.cmd_rx.recv() => {
82                    let _ = self.to_ws.send(Message::text(cmd)).await;
83                    self.echo.store(true, Ordering::SeqCst);
84                }
85                Ok(msg) = self.msg_rx.recv() => {
86                    if let Message::Text(txt) = &*msg {
87                        if self.echo.load(Ordering::SeqCst) {
88                            let _ = self.cmd_tx.send(txt.to_string()).await;
89                            self.echo.store(false, Ordering::SeqCst);
90                        }
91                    }
92                }
93            }
94        }
95    }
96
97    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
98        Box::new(move |msg: &Message| {
99            println!("Routing rule for EchoModule: {msg:?}");
100            msg.is_text()
101        })
102    }
103}
105#[derive(Clone)]
106pub struct EchoHandle {
107    sender: AsyncSender<String>,
108    receiver: AsyncReceiver<String>,
109}
110
111impl EchoHandle {
112    pub async fn echo(&self, msg: String) -> CoreResult<String> {
113        let _ = self.sender.send(msg).await;
114        println!("In side echo handle, waiting for response...");
115        Ok(self.receiver.recv().await?)
116    }
117}
118// Testing Platform with integrated testing wrapper
119pub struct TestingEchoPlatform {
120    testing_wrapper: TestingWrapper<()>,
121}
122
123impl TestingEchoPlatform {
124    pub async fn new(url: String) -> CoreResult<Self> {
125        let connector = DummyConnector::new(url);
126
127        let builder = ClientBuilder::new(connector, ()).with_module::<EchoModule>();
128
129        // // Create testing wrapper with custom configuration
130        // let testing_config = TestingConfig {
131        //     stats_interval: Duration::from_secs(10), // Log stats every 10 seconds
132        //     log_stats: true,
133        //     track_events: true,
134        //     max_reconnect_attempts: Some(3),
135        //     reconnect_delay: Duration::from_secs(5),
136        //     connection_timeout: Duration::from_secs(30),
137        //     auto_reconnect: true,
138        // };
139
140        let testing_wrapper = TestingWrapperBuilder::new()
141            .with_stats_interval(Duration::from_secs(10))
142            .with_log_stats(true)
143            .with_track_events(true)
144            .with_max_reconnect_attempts(Some(3))
145            .with_reconnect_delay(Duration::from_secs(5))
146            .with_connection_timeout(Duration::from_secs(30))
147            .with_auto_reconnect(true)
148            .build_with_middleware(builder)
149            .await?;
150
151        Ok(Self { testing_wrapper })
152    }
153
154    pub async fn start(&mut self) -> CoreResult<()> {
155        self.testing_wrapper.start().await
156    }
157
158    pub async fn stop(self) -> CoreResult<()> {
159        self.testing_wrapper.stop().await?;
160        Ok(())
161    }
162
163    pub async fn echo(&self, msg: String) -> CoreResult<String> {
164        match self
165            .testing_wrapper
166            .client()
167            .get_handle::<EchoModule>()
168            .await
169        {
170            Some(echo_handle) => echo_handle.echo(msg).await,
171            None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
172        }
173    }
174
175    pub async fn get_stats(&self) -> binary_options_tools_core_pre::statistics::ConnectionStats {
176        self.testing_wrapper.get_stats().await
177    }
178
179    pub async fn export_stats_json(&self) -> CoreResult<String> {
180        self.testing_wrapper.export_stats_json().await
181    }
182
183    pub async fn export_stats_csv(&self) -> CoreResult<String> {
184        self.testing_wrapper.export_stats_csv().await
185    }
186
187    pub async fn run_performance_test(&self, num_messages: usize, delay_ms: u64) -> CoreResult<()> {
188        println!("Starting performance test with {num_messages} messages");
189
190        let start_time = std::time::Instant::now();
191
192        for i in 0..num_messages {
193            let msg = format!("Test message {i}");
194            match self.echo(msg.clone()).await {
195                Ok(response) => {
196                    println!("Message {i}: sent '{msg}', received '{response}'");
197                }
198                Err(e) => {
199                    println!("Message {i} failed: {e}");
200                }
201            }
202
203            if delay_ms > 0 {
204                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
205            }
206        }
207
208        let elapsed = start_time.elapsed();
209        println!("Performance test completed in {elapsed:?}");
210
211        // Print final statistics
212        let stats = self.get_stats().await;
213        println!("=== Performance Test Results ===");
214        println!("Total messages sent: {}", stats.messages_sent);
215        println!("Total messages received: {}", stats.messages_received);
216        println!(
217            "Average messages per second: {:.2}",
218            stats.avg_messages_sent_per_second
219        );
220        println!("Total bytes sent: {}", stats.bytes_sent);
221        println!("Total bytes received: {}", stats.bytes_received);
222        println!("================================");
223
224        Ok(())
225    }
226}

// fn test(msg: Message) -> bool {
//     if let Message::Binary(bin) = msg {
//         return bin.as_ref().starts_with(b"needle")
//     }
//     false
// }

235// Demonstration of usage
236#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
237async fn main() -> CoreResult<()> {
238    // Initialize tracing
239    tracing_subscriber::fmt::init();
240
241    let mut platform = TestingEchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
242
243    // Start the platform (this will begin collecting statistics)
244    platform.start().await?;
245
246    println!("Platform started! Running tests...");
247
248    // Give some time for the connection to establish
249    tokio::time::sleep(Duration::from_secs(2)).await;
250
251    // Run a simple echo test
252    println!("Testing basic echo functionality...");
253    let response = platform.echo("Hello, Testing World!".to_string()).await?;
254    println!("Echo response: {response}");
255
256    // Run a performance test
257    println!("Running performance test...");
258    platform.run_performance_test(10, 1000).await?; // 10 messages, 1 second delay
259
260    // Wait a bit more to collect statistics
261    tokio::time::sleep(Duration::from_secs(5)).await;
262
263    // Export statistics
264    println!("Exporting statistics...");
265    // let json_stats = platform.export_stats_json().await?;
266    // println!("JSON Stats:\n{json_stats}");
267
268    let csv_stats = platform.export_stats_csv().await?;
269    println!("CSV Stats:\n{csv_stats}");
270
271    // Stop the platform using the new shutdown method
272    platform.stop().await?;
273
274    println!("Testing complete!");
275    Ok(())
276}