testing_echo_client/
testing_echo_client.rs1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::connector::ConnectorResult;
4use binary_options_tools_core_pre::connector::{Connector, WsStream};
5use binary_options_tools_core_pre::error::{CoreError, CoreResult};
6use binary_options_tools_core_pre::testing::{TestingWrapper, TestingWrapperBuilder};
7use binary_options_tools_core_pre::traits::{ApiModule, Rule, RunnerCommand};
8use kanal::{AsyncReceiver, AsyncSender};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12use tokio_tungstenite::connect_async;
13use tokio_tungstenite::tungstenite::Message;
14
/// Minimal connector used by the echo test client.
///
/// It only remembers the WebSocket endpoint it should dial.
struct DummyConnector {
    /// Endpoint passed to `connect_async` when a connection is requested.
    url: String,
}
18
19impl DummyConnector {
20 pub fn new(url: String) -> Self {
21 Self { url }
22 }
23}
24
25#[async_trait::async_trait]
26impl Connector<()> for DummyConnector {
27 async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
28 println!("Connecting to {}", self.url);
29 let wsstream = connect_async(&self.url).await.unwrap();
30 Ok(wsstream.0)
31 }
32
33 async fn disconnect(&self) -> ConnectorResult<()> {
34 println!("Disconnecting from {}", self.url);
35 Ok(())
36 }
37}
38
39pub struct EchoModule {
41 to_ws: AsyncSender<Message>,
42 cmd_rx: AsyncReceiver<String>,
43 cmd_tx: AsyncSender<String>,
44 msg_rx: AsyncReceiver<Arc<Message>>,
45 echo: AtomicBool,
46}
47
48#[async_trait]
49impl ApiModule<()> for EchoModule {
50 type Command = String;
51 type CommandResponse = String;
52 type Handle = EchoHandle;
53
54 fn new(
55 _state: Arc<()>,
56 cmd_rx: AsyncReceiver<Self::Command>,
57 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
58 msg_rx: AsyncReceiver<Arc<Message>>,
59 to_ws: AsyncSender<Message>,
60 _: AsyncSender<RunnerCommand>,
61 ) -> Self {
62 Self {
63 to_ws,
64 cmd_rx,
65 cmd_tx: cmd_ret_tx,
66 msg_rx,
67 echo: AtomicBool::new(false),
68 }
69 }
70
71 fn create_handle(
72 sender: AsyncSender<Self::Command>,
73 receiver: AsyncReceiver<Self::CommandResponse>,
74 ) -> Self::Handle {
75 EchoHandle { sender, receiver }
76 }
77
78 async fn run(&mut self) -> CoreResult<()> {
79 loop {
80 tokio::select! {
81 Ok(cmd) = self.cmd_rx.recv() => {
82 let _ = self.to_ws.send(Message::text(cmd)).await;
83 self.echo.store(true, Ordering::SeqCst);
84 }
85 Ok(msg) = self.msg_rx.recv() => {
86 if let Message::Text(txt) = &*msg {
87 if self.echo.load(Ordering::SeqCst) {
88 let _ = self.cmd_tx.send(txt.to_string()).await;
89 self.echo.store(false, Ordering::SeqCst);
90 }
91 }
92 }
93 }
94 }
95 }
96
97 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
98 Box::new(move |msg: &Message| {
99 println!("Routing rule for EchoModule: {msg:?}");
100 msg.is_text()
101 })
102 }
103}
104
105#[derive(Clone)]
106pub struct EchoHandle {
107 sender: AsyncSender<String>,
108 receiver: AsyncReceiver<String>,
109}
110
111impl EchoHandle {
112 pub async fn echo(&self, msg: String) -> CoreResult<String> {
113 let _ = self.sender.send(msg).await;
114 println!("In side echo handle, waiting for response...");
115 Ok(self.receiver.recv().await?)
116 }
117}
118pub struct TestingEchoPlatform {
120 testing_wrapper: TestingWrapper<()>,
121}
122
123impl TestingEchoPlatform {
124 pub async fn new(url: String) -> CoreResult<Self> {
125 let connector = DummyConnector::new(url);
126
127 let builder = ClientBuilder::new(connector, ()).with_module::<EchoModule>();
128
129 let testing_wrapper = TestingWrapperBuilder::new()
141 .with_stats_interval(Duration::from_secs(10))
142 .with_log_stats(true)
143 .with_track_events(true)
144 .with_max_reconnect_attempts(Some(3))
145 .with_reconnect_delay(Duration::from_secs(5))
146 .with_connection_timeout(Duration::from_secs(30))
147 .with_auto_reconnect(true)
148 .build_with_middleware(builder)
149 .await?;
150
151 Ok(Self { testing_wrapper })
152 }
153
154 pub async fn start(&mut self) -> CoreResult<()> {
155 self.testing_wrapper.start().await
156 }
157
158 pub async fn stop(self) -> CoreResult<()> {
159 self.testing_wrapper.stop().await?;
160 Ok(())
161 }
162
163 pub async fn echo(&self, msg: String) -> CoreResult<String> {
164 match self
165 .testing_wrapper
166 .client()
167 .get_handle::<EchoModule>()
168 .await
169 {
170 Some(echo_handle) => echo_handle.echo(msg).await,
171 None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
172 }
173 }
174
175 pub async fn get_stats(&self) -> binary_options_tools_core_pre::statistics::ConnectionStats {
176 self.testing_wrapper.get_stats().await
177 }
178
179 pub async fn export_stats_json(&self) -> CoreResult<String> {
180 self.testing_wrapper.export_stats_json().await
181 }
182
183 pub async fn export_stats_csv(&self) -> CoreResult<String> {
184 self.testing_wrapper.export_stats_csv().await
185 }
186
187 pub async fn run_performance_test(&self, num_messages: usize, delay_ms: u64) -> CoreResult<()> {
188 println!("Starting performance test with {num_messages} messages");
189
190 let start_time = std::time::Instant::now();
191
192 for i in 0..num_messages {
193 let msg = format!("Test message {i}");
194 match self.echo(msg.clone()).await {
195 Ok(response) => {
196 println!("Message {i}: sent '{msg}', received '{response}'");
197 }
198 Err(e) => {
199 println!("Message {i} failed: {e}");
200 }
201 }
202
203 if delay_ms > 0 {
204 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
205 }
206 }
207
208 let elapsed = start_time.elapsed();
209 println!("Performance test completed in {elapsed:?}");
210
211 let stats = self.get_stats().await;
213 println!("=== Performance Test Results ===");
214 println!("Total messages sent: {}", stats.messages_sent);
215 println!("Total messages received: {}", stats.messages_received);
216 println!(
217 "Average messages per second: {:.2}",
218 stats.avg_messages_sent_per_second
219 );
220 println!("Total bytes sent: {}", stats.bytes_sent);
221 println!("Total bytes received: {}", stats.bytes_received);
222 println!("================================");
223
224 Ok(())
225 }
226}
227
228#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
237async fn main() -> CoreResult<()> {
238 tracing_subscriber::fmt::init();
240
241 let mut platform = TestingEchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
242
243 platform.start().await?;
245
246 println!("Platform started! Running tests...");
247
248 tokio::time::sleep(Duration::from_secs(2)).await;
250
251 println!("Testing basic echo functionality...");
253 let response = platform.echo("Hello, Testing World!".to_string()).await?;
254 println!("Echo response: {response}");
255
256 println!("Running performance test...");
258 platform.run_performance_test(10, 1000).await?; tokio::time::sleep(Duration::from_secs(5)).await;
262
263 println!("Exporting statistics...");
265 let csv_stats = platform.export_stats_csv().await?;
269 println!("CSV Stats:\n{csv_stats}");
270
271 platform.stop().await?;
273
274 println!("Testing complete!");
275 Ok(())
276}