1use async_trait::async_trait;
2use binary_options_tools_core_pre::builder::ClientBuilder;
3use binary_options_tools_core_pre::client::Client;
4use binary_options_tools_core_pre::connector::ConnectorResult;
5use binary_options_tools_core_pre::connector::{Connector, WsStream};
6use binary_options_tools_core_pre::error::{CoreError, CoreResult};
7use binary_options_tools_core_pre::traits::{ApiModule, Rule, RunnerCommand};
8use futures_util::stream::unfold;
9use futures_util::{Stream, StreamExt};
10use kanal::{AsyncReceiver, AsyncSender};
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
/// Minimal connector that only remembers the URL it was created with.
struct DummyConnector {
    /// Endpoint address supplied at construction time.
    url: String,
}

impl DummyConnector {
    /// Builds a connector for the given `url`; nothing is validated or dialed here.
    pub fn new(url: String) -> Self {
        DummyConnector { url }
    }
}
26
27#[async_trait::async_trait]
28impl Connector<()> for DummyConnector {
29 async fn connect(&self, _: Arc<()>) -> ConnectorResult<WsStream> {
30 println!("Connecting to {}", self.url);
32 let wsstream = connect_async(&self.url).await.unwrap();
33 Ok(wsstream.0)
34 }
35
36 async fn disconnect(&self) -> ConnectorResult<()> {
37 println!("Disconnecting from {}", self.url);
39 Ok(())
40 }
41}
42
43async fn print_handler(msg: Arc<Message>, _state: Arc<()>) -> CoreResult<()> {
45 println!("[Lightweight] Received: {msg:?}");
46 Ok(())
47}
48
/// Module state for the echo round trip: a command string is written to the
/// WebSocket and the next text message is handed back as the response.
pub struct EchoModule {
    /// Outgoing messages destined for the WebSocket.
    to_ws: AsyncSender<Message>,
    /// Commands (strings to echo) arriving from the handle.
    cmd_rx: AsyncReceiver<String>,
    /// Responses sent back to the handle.
    cmd_tx: AsyncSender<String>,
    /// Incoming messages routed to this module.
    msg_rx: AsyncReceiver<Arc<Message>>,
    /// Set after a command was forwarded; cleared once a reply was relayed.
    echo: AtomicBool,
}
57
58#[async_trait]
59impl ApiModule<()> for EchoModule {
60 type Command = String;
61 type CommandResponse = String;
62 type Handle = EchoHandle;
63
64 fn new(
65 _state: Arc<()>,
66 cmd_rx: AsyncReceiver<Self::Command>,
67 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
68 msg_rx: AsyncReceiver<Arc<Message>>,
69 to_ws: AsyncSender<Message>,
70 _: AsyncSender<RunnerCommand>,
71 ) -> Self {
72 Self {
73 to_ws,
74 cmd_rx,
75 cmd_tx: cmd_ret_tx,
76 msg_rx,
77 echo: AtomicBool::new(false),
78 }
79 }
80
81 fn create_handle(
82 sender: AsyncSender<Self::Command>,
83 receiver: AsyncReceiver<Self::CommandResponse>,
84 ) -> Self::Handle {
85 EchoHandle { sender, receiver }
86 }
87
88 async fn run(&mut self) -> CoreResult<()> {
89 loop {
90 tokio::select! {
91 Ok(cmd) = self.cmd_rx.recv() => {
92 let _ = self.to_ws.send(Message::text(cmd)).await;
93 self.echo.store(true, Ordering::SeqCst);
94 }
95 Ok(msg) = self.msg_rx.recv() => {
96 if let Message::Text(txt) = &*msg {
97 if self.echo.load(Ordering::SeqCst) {
98 let _ = self.cmd_tx.send(txt.to_string()).await;
99 self.echo.store(false, Ordering::SeqCst);
100 }
101 }
102 }
103 }
104 }
105 }
106
107 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108 Box::new(move |msg: &Message| msg.is_text())
109 }
110}
111
/// Cloneable handle used to talk to the [`EchoModule`].
#[derive(Clone)]
pub struct EchoHandle {
    /// Sends the string that should be echoed.
    sender: AsyncSender<String>,
    /// Receives the echoed reply.
    receiver: AsyncReceiver<String>,
}
117
118impl EchoHandle {
119 pub async fn echo(&self, msg: String) -> CoreResult<String> {
120 let _ = self.sender.send(msg).await;
121 println!("In side echo handle, waiting for response...");
122 Ok(self.receiver.recv().await?)
123 }
124}
125
/// Module state for streaming incoming text messages to a handle while
/// streaming is switched on.
pub struct StreamModule {
    /// Incoming messages routed to this module.
    msg_rx: AsyncReceiver<Arc<Message>>,
    /// Commands from the handle: the received `bool` becomes the new `send` flag.
    cmd_rx: AsyncReceiver<bool>,
    /// Text payloads forwarded to the handle.
    cmd_tx: AsyncSender<String>,
    /// Whether incoming text messages are currently forwarded.
    send: AtomicBool,
}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136 type Command = bool;
137 type CommandResponse = String;
138 type Handle = StreamHandle;
139
140 fn new(
141 _state: Arc<()>,
142 cmd_rx: AsyncReceiver<Self::Command>,
143 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144 msg_rx: AsyncReceiver<Arc<Message>>,
145 _to_ws: AsyncSender<Message>,
146 _: AsyncSender<RunnerCommand>,
147 ) -> Self {
148 Self {
149 msg_rx,
150 cmd_tx: cmd_ret_tx,
151 cmd_rx,
152 send: AtomicBool::new(false),
153 }
154 }
155
156 fn create_handle(
157 sender: AsyncSender<Self::Command>,
158 receiver: AsyncReceiver<Self::CommandResponse>,
159 ) -> Self::Handle {
160 StreamHandle { sender, receiver }
161 }
162
163 async fn run(&mut self) -> CoreResult<()> {
164 loop {
165 tokio::select! {
166 Ok(cmd) = self.cmd_rx.recv() => {
167 self.send.store(cmd, Ordering::SeqCst);
169 }
170 Ok(msg) = self.msg_rx.recv() => {
171 if let Message::Text(txt) = &*msg {
172 if self.send.load(Ordering::SeqCst) {
173 println!("[StreamModule] Received: {txt}");
175 let _ = self.cmd_tx.send(txt.to_string()).await;
176 }
177 }
178 }
179 else => {
180 println!("[Error] StreamModule: Channel closed");
181 },
182 }
183 }
184 }
185
186 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187 Box::new(move |_msg: &Message| {
188 true
190 })
191 }
192}
193
/// Cloneable handle used to talk to the [`StreamModule`].
#[derive(Clone)]
pub struct StreamHandle {
    /// Receives the forwarded text payloads.
    receiver: AsyncReceiver<String>,
    /// Sends the on/off command for forwarding.
    sender: AsyncSender<bool>,
}
199
200impl StreamHandle {
201 pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202 self.sender.send(true).await?;
203 println!("StreamHandle: Waiting for messages...");
204 Ok(Box::pin(unfold(self.receiver, |state| async move {
205 let item = state.recv().await.map_err(CoreError::from);
206 Some((item, state))
207 })))
208 }
209}
210
/// Module state for periodically writing a fixed text message to the
/// WebSocket while it is switched on.
pub struct PeriodicSenderModule {
    /// Commands from the handle: the received `bool` becomes the new `running` flag.
    cmd_rx: AsyncReceiver<bool>,
    /// Outgoing messages destined for the WebSocket.
    to_ws: AsyncSender<Message>,
    /// Whether a message is sent on each timer tick.
    running: AtomicBool,
}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220 type Command = bool; type CommandResponse = ();
222 type Handle = PeriodicSenderHandle;
223
224 fn new(
225 _state: Arc<()>,
226 cmd_rx: AsyncReceiver<Self::Command>,
227 _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228 _msg_rx: AsyncReceiver<Arc<Message>>,
229 to_ws: AsyncSender<Message>,
230 _: AsyncSender<RunnerCommand>,
231 ) -> Self {
232 Self {
233 cmd_rx,
234 to_ws,
235 running: AtomicBool::new(false),
236 }
237 }
238
239 fn create_handle(
240 sender: AsyncSender<Self::Command>,
241 _receiver: AsyncReceiver<Self::CommandResponse>,
242 ) -> Self::Handle {
243 PeriodicSenderHandle { sender }
244 }
245
246 async fn run(&mut self) -> CoreResult<()> {
247 let to_ws = self.to_ws.clone();
248 let mut interval = tokio::time::interval(Duration::from_secs(5));
249 loop {
250 tokio::select! {
251 Ok(cmd) = self.cmd_rx.recv() => {
252 self.running.store(cmd, Ordering::SeqCst);
253 }
254 _ = interval.tick() => {
255 if self.running.load(Ordering::SeqCst) {
256 let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257 }
258 }
259 }
260 }
261 }
262
263 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
264 Box::new(move |_msg: &Message| {
265 false
267 })
268 }
269}
270
/// Cloneable handle used to switch the [`PeriodicSenderModule`] on and off.
#[derive(Clone)]
pub struct PeriodicSenderHandle {
    /// Sends the desired running state (`true` = start, `false` = stop).
    sender: AsyncSender<bool>,
}
275
276impl PeriodicSenderHandle {
277 pub async fn start(&self) {
279 let _ = self.sender.send(true).await;
280 }
281 pub async fn stop(&self) {
283 let _ = self.sender.send(false).await;
284 }
285}
286
/// Facade bundling the client with the task that drives its runner.
pub struct EchoPlatform {
    /// Client used to look up module handles.
    client: Client<()>,
    /// Join handle of the spawned runner task; stored but never read in this file.
    _runner: tokio::task::JoinHandle<()>,
}
292
293impl EchoPlatform {
294 pub async fn new(url: String) -> CoreResult<Self> {
295 let connector = DummyConnector::new(url);
297
298 let mut builder = ClientBuilder::new(connector, ());
299 builder =
300 builder.with_lightweight_handler(|msg, state, _| Box::pin(print_handler(msg, state)));
301 let (client, mut runner) = builder
302 .with_module::<EchoModule>()
303 .with_module::<StreamModule>()
304 .with_module::<PeriodicSenderModule>()
305 .build()
306 .await?;
307
308 let _runner = tokio::spawn(async move { runner.run().await });
313
314 Ok(Self { client, _runner })
315 }
316
317 pub async fn echo(&self, msg: String) -> CoreResult<String> {
318 match self.client.get_handle::<EchoModule>().await {
319 Some(echo_handle) => echo_handle.echo(msg).await,
320 None => Err(CoreError::ModuleNotFound("EchoModule".to_string())),
321 }
322 }
323
324 pub async fn stream(&self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
325 let stream_handle = self.client.get_handle::<StreamModule>().await.unwrap();
326 println!("Starting stream...");
327 stream_handle.stream().await
328 }
329
330 pub async fn start(&self) -> CoreResult<()> {
331 match self.client.get_handle::<PeriodicSenderModule>().await {
332 Some(handle) => {
333 handle.start().await;
334 Ok(())
335 }
336 None => Err(CoreError::ModuleNotFound(
337 stringify!(PeriodicSenderModule).to_string(),
338 )),
339 }
340 }
341}
342
343#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
345async fn main() -> CoreResult<()> {
346 let platform = EchoPlatform::new("wss://echo.websocket.org".to_string()).await?;
347 platform.start().await?;
348 println!("Platform started, ready to echo!");
349 println!("{}", platform.echo("Hello, Echo!".to_string()).await?);
350
351 tokio::time::sleep(Duration::from_secs(2)).await;
353 let mut stream = platform.stream().await?;
354 while let Some(Ok(msg)) = stream.next().await {
355 println!("Streamed message: {msg}");
356 }
357 Ok(())
358}
359