Skip to main content

echo_client/
echo_client.rs

1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::client::Client;
4use binary_options_tools_core_pre::connector::ConnectorResult;
5use binary_options_tools_core_pre::connector::{Connector, WsStream};
6use binary_options_tools_core_pre::error::{CoreError, CoreResult};
7use binary_options_tools_core_pre::traits::{ApiModule, Rule, RunnerCommand};
8use futures_util::stream::unfold;
9use futures_util::{Stream, StreamExt};
10use kanal::{AsyncReceiver, AsyncSender};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
17struct DummyConnector {
18    url: String,
19}
20
21impl DummyConnector {
22    pub fn new(url: String) -> Self {
23        Self { url }
24    }
25}
26
27#[async_trait::async_trait]
28impl Connector<()> for DummyConnector {
29    async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
30        // Simulate a WebSocket connection
31        println!("Connecting to {}", self.url);
32        let wsstream = connect_async(&self.url).await.unwrap();
33        Ok(wsstream.0)
34    }
35
36    async fn disconnect(&self) -> ConnectorResult<()> {
37        // Simulate disconnection
38        println!("Disconnecting from {}", self.url);
39        Ok(())
40    }
41}
42
43// --- Lightweight Handlers ---
44async fn print_handler(msg: Arc<Message>, _state: Arc<()>) -> CoreResult<()> {
45    println!("[Lightweight] Received: {msg:?}");
46    Ok(())
47}
48
49// --- ApiModule 1: EchoModule ---
50pub struct EchoModule {
51    to_ws: AsyncSender<Message>,
52    cmd_rx: AsyncReceiver<String>,
53    cmd_tx: AsyncSender<String>,
54    msg_rx: AsyncReceiver<Arc<Message>>,
55    echo: AtomicBool,
56}
57
58#[async_trait]
59impl ApiModule<()> for EchoModule {
60    type Command = String;
61    type CommandResponse = String;
62    type Handle = EchoHandle;
63
64    fn new(
65        _state: Arc<()>,
66        cmd_rx: AsyncReceiver<Self::Command>,
67        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
68        msg_rx: AsyncReceiver<Arc<Message>>,
69        to_ws: AsyncSender<Message>,
70        _: AsyncSender<RunnerCommand>,
71    ) -> Self {
72        Self {
73            to_ws,
74            cmd_rx,
75            cmd_tx: cmd_ret_tx,
76            msg_rx,
77            echo: AtomicBool::new(false),
78        }
79    }
80
81    fn create_handle(
82        sender: AsyncSender<Self::Command>,
83        receiver: AsyncReceiver<Self::CommandResponse>,
84    ) -> Self::Handle {
85        EchoHandle { sender, receiver }
86    }
87
88    async fn run(&mut self) -> CoreResult<()> {
89        loop {
90            tokio::select! {
91                Ok(cmd) = self.cmd_rx.recv() => {
92                    let _ = self.to_ws.send(Message::text(cmd)).await;
93                    self.echo.store(true, Ordering::SeqCst);
94                }
95                Ok(msg) = self.msg_rx.recv() => {
96                    if let Message::Text(txt) = &*msg {
97                        if self.echo.load(Ordering::SeqCst) {
98                            let _ = self.cmd_tx.send(txt.to_string()).await;
99                            self.echo.store(false, Ordering::SeqCst);
100                        }
101                    }
102                }
103            }
104        }
105    }
106
107    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108        Box::new(move |msg: &Message| msg.is_text())
109    }
110}
111
112#[derive(Clone)]
113pub struct EchoHandle {
114    sender: AsyncSender<String>,
115    receiver: AsyncReceiver<String>,
116}
117
118impl EchoHandle {
119    pub async fn echo(&self, msg: String) -> CoreResult<String> {
120        let _ = self.sender.send(msg).await;
121        println!("In side echo handle, waiting for response...");
122        Ok(self.receiver.recv().await?)
123    }
124}
125
126// --- ApiModule 2: StreamModule ---
127pub struct StreamModule {
128    msg_rx: AsyncReceiver<Arc<Message>>,
129    cmd_rx: AsyncReceiver<bool>,
130    cmd_tx: AsyncSender<String>,
131    send: AtomicBool,
132}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136    type Command = bool;
137    type CommandResponse = String;
138    type Handle = StreamHandle;
139
140    fn new(
141        _state: Arc<()>,
142        cmd_rx: AsyncReceiver<Self::Command>,
143        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144        msg_rx: AsyncReceiver<Arc<Message>>,
145        _to_ws: AsyncSender<Message>,
146        _: AsyncSender<RunnerCommand>,
147    ) -> Self {
148        Self {
149            msg_rx,
150            cmd_tx: cmd_ret_tx,
151            cmd_rx,
152            send: AtomicBool::new(false),
153        }
154    }
155
156    fn create_handle(
157        sender: AsyncSender<Self::Command>,
158        receiver: AsyncReceiver<Self::CommandResponse>,
159    ) -> Self::Handle {
160        StreamHandle { sender, receiver }
161    }
162
163    async fn run(&mut self) -> CoreResult<()> {
164        loop {
165            tokio::select! {
166                Ok(cmd) = self.cmd_rx.recv() => {
167                    // Update the send flag based on the received command
168                    self.send.store(cmd, Ordering::SeqCst);
169                }
170                Ok(msg) = self.msg_rx.recv() => {
171                    if let Message::Text(txt) = &*msg {
172                        if self.send.load(Ordering::SeqCst) {
173                            // Process the message if send is true
174                            println!("[StreamModule] Received: {txt}");
175                            let _ = self.cmd_tx.send(txt.to_string()).await;
176                        }
177                    }
178                }
179                else => {
180                    println!("[Error] StreamModule: Channel closed");
181                },
182            }
183        }
184    }
185
186    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187        Box::new(move |_msg: &Message| {
188            // Accept all messages
189            true
190        })
191    }
192}
193
194#[derive(Clone)]
195pub struct StreamHandle {
196    receiver: AsyncReceiver<String>,
197    sender: AsyncSender<bool>,
198}
199
200impl StreamHandle {
201    pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202        self.sender.send(true).await?;
203        println!("StreamHandle: Waiting for messages...");
204        Ok(Box::pin(unfold(self.receiver, |state| async move {
205            let item = state.recv().await.map_err(CoreError::from);
206            Some((item, state))
207        })))
208    }
209}
210
211// --- ApiModule 3: PeriodicSenderModule ---
212pub struct PeriodicSenderModule {
213    cmd_rx: AsyncReceiver<bool>,
214    to_ws: AsyncSender<Message>,
215    running: AtomicBool,
216}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220    type Command = bool; // true = start, false = stop
221    type CommandResponse = ();
222    type Handle = PeriodicSenderHandle;
223
224    fn new(
225        _state: Arc<()>,
226        cmd_rx: AsyncReceiver<Self::Command>,
227        _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228        _msg_rx: AsyncReceiver<Arc<Message>>,
229        to_ws: AsyncSender<Message>,
230        _: AsyncSender<RunnerCommand>,
231    ) -> Self {
232        Self {
233            cmd_rx,
234            to_ws,
235            running: AtomicBool::new(false),
236        }
237    }
238
239    fn create_handle(
240        sender: AsyncSender<Self::Command>,
241        _receiver: AsyncReceiver<Self::CommandResponse>,
242    ) -> Self::Handle {
243        PeriodicSenderHandle { sender }
244    }
245
246    async fn run(&mut self) -> CoreResult<()> {
247        let to_ws = self.to_ws.clone();
248        let mut interval = tokio::time::interval(Duration::from_secs(5));
249        loop {
250            tokio::select! {
251                Ok(cmd) = self.cmd_rx.recv() => {
252                    self.running.store(cmd, Ordering::SeqCst);
253                }
254                _ = interval.tick() => {
255                    if self.running.load(Ordering::SeqCst) {
256                        let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257                    }
258                }
259            }
260        }
261    }
262
263    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
264        Box::new(move |_msg: &Message| {
265            // This module does not process incoming messages
266            false
267        })
268    }
269}
270
271#[derive(Clone)]
272pub struct PeriodicSenderHandle {
273    sender: AsyncSender<bool>,
274}
275
276impl PeriodicSenderHandle {
277    /// Start periodic sending
278    pub async fn start(&self) {
279        let _ = self.sender.send(true).await;
280    }
281    /// Stop periodic sending
282    pub async fn stop(&self) {
283        let _ = self.sender.send(false).await;
284    }
285}
286
287// --- EchoPlatform Struct ---
288pub struct EchoPlatform {
289    client: Client<()>,
290    _runner: tokio::task::JoinHandle<()>,
291}
292
293impl EchoPlatform {
294    pub async fn new(url: String) -> CoreResult<Self> {
295        // Use a simple connector (implement your own if needed)
296        let connector = DummyConnector::new(url);
297
298        let mut builder = ClientBuilder::new(connector, ());
299        builder =
300            builder.with_lightweight_handler(|msg, state, _| Box::pin(print_handler(msg, state)));
301        let (client, mut runner) = builder
302            .with_module::<EchoModule>()
303            .with_module::<StreamModule>()
304            .with_module::<PeriodicSenderModule>()
305            .build()
306            .await?;
307
308        // let echo_handle = client.get_handle::<EchoModule>().await.unwrap();
309        // let stream_handle = client.get_handle::<StreamModule>().await.unwrap();
310
311        // Start runner in background
312        let _runner = tokio::spawn(async move { runner.run().await });
313
314        Ok(Self { client, _runner })
315    }
316
317    pub async fn echo(&self, msg: String) -> CoreResult<String> {
318        match self.client.get_handle::<EchoModule>().await {
319            Some(echo_handle) => echo_handle.echo(msg).await,
320            None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
321        }
322    }
323
324    pub async fn stream(&self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
325        let stream_handle = self.client.get_handle::<StreamModule>().await.unwrap();
326        println!("Starting stream...");
327        stream_handle.stream().await
328    }
329
330    pub async fn start(&self) -> CoreResult<()> {
331        match self.client.get_handle::<PeriodicSenderModule>().await {
332            Some(handle) => {
333                handle.start().await;
334                Ok(())
335            }
336            None => Err(CoreError::ModuleNotFound(
337                stringify!(PeriodicSenderModule).to_string(),
338            )),
339        }
340    }
341}
342
343// --- Main Example ---
344#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
345async fn main() -> CoreResult<()> {
346    let platform = EchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
347    platform.start().await?;
348    println!("Platform started, ready to echo!");
349    println!("{}", platform.echo("Hello, Echo!".to_string()).await?);
350
351    // Wait to receive the echo
352    tokio::time::sleep(Duration::from_secs(2)).await;
353    let mut stream = platform.stream().await?;
354    while let Some(Ok(msg)) = stream.next().await {
355        println!("Streamed message: {msg}");
356    }
357    Ok(())
358}
359// can you make some kind of new implementation / wrapper around a client / runner that tests it a lot like check the connection lattency, checks the time since las disconnection, the time the system kept connected before calling the connect or reconnect functions, also i want it to work like for structs like the EchoPlatform like with a cupple of lines i pass the configuration of the struct (like functions to call espected return )