testing_echo_client/
testing_echo_client.rs1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::connector::ConnectorResult;
4use binary_options_tools_core_pre::connector::{Connector, WsStream};
5use binary_options_tools_core_pre::error::{CoreError, CoreResult};
6use binary_options_tools_core_pre::testing::{TestingWrapper, TestingWrapperBuilder};
7use binary_options_tools_core_pre::traits::{ApiModule, Rule};
8use kanal::{AsyncReceiver, AsyncSender};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12use tokio_tungstenite::connect_async;
13use tokio_tungstenite::tungstenite::Message;
14
15struct DummyConnector {
16 url: String,
17}
18
19impl DummyConnector {
20 pub fn new(url: String) -> Self {
21 Self { url }
22 }
23}
24
25#[async_trait::async_trait]
26impl Connector<()> for DummyConnector {
27 async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
28 println!("Connecting to {}", self.url);
29 let wsstream = connect_async(&self.url).await.unwrap();
30 Ok(wsstream.0)
31 }
32
33 async fn disconnect(&self) -> ConnectorResult<()> {
34 println!("Disconnecting from {}", self.url);
35 Ok(())
36 }
37}
38
39pub struct EchoModule {
41 to_ws: AsyncSender<Message>,
42 cmd_rx: AsyncReceiver<String>,
43 cmd_tx: AsyncSender<String>,
44 msg_rx: AsyncReceiver<Arc<Message>>,
45 echo: AtomicBool,
46}
47
48#[async_trait]
49impl ApiModule<()> for EchoModule {
50 type Command = String;
51 type CommandResponse = String;
52 type Handle = EchoHandle;
53
54 fn new(
55 _state: Arc<()>,
56 cmd_rx: AsyncReceiver<Self::Command>,
57 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
58 msg_rx: AsyncReceiver<Arc<Message>>,
59 to_ws: AsyncSender<Message>,
60 ) -> Self {
61 Self {
62 to_ws,
63 cmd_rx,
64 cmd_tx: cmd_ret_tx,
65 msg_rx,
66 echo: AtomicBool::new(false),
67 }
68 }
69
70 fn create_handle(
71 sender: AsyncSender<Self::Command>,
72 receiver: AsyncReceiver<Self::CommandResponse>,
73 ) -> Self::Handle {
74 EchoHandle { sender, receiver }
75 }
76
77 async fn run(&mut self) -> CoreResult<()> {
78 loop {
79 tokio::select! {
80 Ok(cmd) = self.cmd_rx.recv() => {
81 let _ = self.to_ws.send(Message::text(cmd)).await;
82 self.echo.store(true, Ordering::SeqCst);
83 }
84 Ok(msg) = self.msg_rx.recv() => {
85 if let Message::Text(txt) = &*msg && self.echo.load(Ordering::SeqCst) {
86 let _ = self.cmd_tx.send(txt.to_string()).await;
87 self.echo.store(false, Ordering::SeqCst);
88 }
89 }
90 }
91 }
92 }
93
94 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
95 Box::new(move |msg: &Message| {
96 println!("Routing rule for EchoModule: {msg:?}");
97 msg.is_text()
98 })
99 }
100}
101
102#[derive(Clone)]
103pub struct EchoHandle {
104 sender: AsyncSender<String>,
105 receiver: AsyncReceiver<String>,
106}
107
108impl EchoHandle {
109 pub async fn echo(&self, msg: String) -> CoreResult<String> {
110 let _ = self.sender.send(msg).await;
111 println!("In side echo handle, waiting for response...");
112 Ok(self.receiver.recv().await?)
113 }
114}
115pub struct TestingEchoPlatform {
117 testing_wrapper: TestingWrapper<()>,
118}
119
120impl TestingEchoPlatform {
121 pub async fn new(url: String) -> CoreResult<Self> {
122 let connector = DummyConnector::new(url);
123
124 let builder = ClientBuilder::new(connector, ()).with_module::<EchoModule>();
125
126 let testing_wrapper = TestingWrapperBuilder::new()
138 .with_stats_interval(Duration::from_secs(10))
139 .with_log_stats(true)
140 .with_track_events(true)
141 .with_max_reconnect_attempts(Some(3))
142 .with_reconnect_delay(Duration::from_secs(5))
143 .with_connection_timeout(Duration::from_secs(30))
144 .with_auto_reconnect(true)
145 .build_with_middleware(builder)
146 .await?;
147
148 Ok(Self { testing_wrapper })
149 }
150
151 pub async fn start(&mut self) -> CoreResult<()> {
152 self.testing_wrapper.start().await
153 }
154
155 pub async fn stop(self) -> CoreResult<()> {
156 self.testing_wrapper.stop().await?;
157 Ok(())
158 }
159
160 pub async fn echo(&self, msg: String) -> CoreResult<String> {
161 match self
162 .testing_wrapper
163 .client()
164 .get_handle::<EchoModule>()
165 .await
166 {
167 Some(echo_handle) => echo_handle.echo(msg).await,
168 None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
169 }
170 }
171
172 pub async fn get_stats(&self) -> binary_options_tools_core_pre::statistics::ConnectionStats {
173 self.testing_wrapper.get_stats().await
174 }
175
176 pub async fn export_stats_json(&self) -> CoreResult<String> {
177 self.testing_wrapper.export_stats_json().await
178 }
179
180 pub async fn export_stats_csv(&self) -> CoreResult<String> {
181 self.testing_wrapper.export_stats_csv().await
182 }
183
184 pub async fn run_performance_test(&self, num_messages: usize, delay_ms: u64) -> CoreResult<()> {
185 println!("Starting performance test with {num_messages} messages");
186
187 let start_time = std::time::Instant::now();
188
189 for i in 0..num_messages {
190 let msg = format!("Test message {i}");
191 match self.echo(msg.clone()).await {
192 Ok(response) => {
193 println!("Message {i}: sent '{msg}', received '{response}'");
194 }
195 Err(e) => {
196 println!("Message {i} failed: {e}");
197 }
198 }
199
200 if delay_ms > 0 {
201 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
202 }
203 }
204
205 let elapsed = start_time.elapsed();
206 println!("Performance test completed in {elapsed:?}");
207
208 let stats = self.get_stats().await;
210 println!("=== Performance Test Results ===");
211 println!("Total messages sent: {}", stats.messages_sent);
212 println!("Total messages received: {}", stats.messages_received);
213 println!(
214 "Average messages per second: {:.2}",
215 stats.avg_messages_sent_per_second
216 );
217 println!("Total bytes sent: {}", stats.bytes_sent);
218 println!("Total bytes received: {}", stats.bytes_received);
219 println!("================================");
220
221 Ok(())
222 }
223}
224
225#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
234async fn main() -> CoreResult<()> {
235 tracing_subscriber::fmt::init();
237
238 let mut platform = TestingEchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
239
240 platform.start().await?;
242
243 println!("Platform started! Running tests...");
244
245 tokio::time::sleep(Duration::from_secs(2)).await;
247
248 println!("Testing basic echo functionality...");
250 let response = platform.echo("Hello, Testing World!".to_string()).await?;
251 println!("Echo response: {response}");
252
253 println!("Running performance test...");
255 platform.run_performance_test(10, 1000).await?; tokio::time::sleep(Duration::from_secs(5)).await;
259
260 println!("Exporting statistics...");
262 let csv_stats = platform.export_stats_csv().await?;
266 println!("CSV Stats:\n{csv_stats}");
267
268 platform.stop().await?;
270
271 println!("Testing complete!");
272 Ok(())
273}