binary_options_tools_core/general/
send.rs

1use std::time::Duration;
2
3use async_channel::{bounded, Receiver, RecvError, Sender};
4use tokio_tungstenite::tungstenite::Message;
5use tracing::error;
6
7use crate::{
8    error::{BinaryOptionsResult, BinaryOptionsToolsError},
9    general::validate::validate,
10    utils::time::timeout,
11};
12
13use super::{
14    traits::{DataHandler, MessageTransfer},
15    types::Data,
16};
17
18#[derive(Clone)]
19pub struct SenderMessage {
20    sender: Sender<Message>,
21    sender_priority: Sender<Message>,
22}
23
24impl SenderMessage {
25    pub fn new(cap: usize) -> (Self, (Receiver<Message>, Receiver<Message>)) {
26        let (s, r) = bounded(cap);
27        let (sp, rp) = bounded(cap);
28
29        (
30            Self {
31                sender: s,
32                sender_priority: sp,
33            },
34            (r, rp),
35        )
36    }
37    // pub fn new(sender: Sender<Transfer>) -> Self {
38    //     Self { sender }
39    // }
40    async fn reciever<Transfer: MessageTransfer, T: DataHandler<Transfer = Transfer>>(
41        &self,
42        data: &Data<T, Transfer>,
43        msg: Transfer,
44        response_type: Transfer::Info,
45    ) -> BinaryOptionsResult<Receiver<Transfer>> {
46        let reciever = data.add_request(response_type).await;
47
48        self.send(msg)
49            .await
50            .map_err(|e| BinaryOptionsToolsError::ThreadMessageSendingErrorMPCS(e.to_string()))?;
51        Ok(reciever)
52    }
53
54    pub async fn send<Transfer: MessageTransfer>(&self, msg: Transfer) -> BinaryOptionsResult<()> {
55        self.sender
56            .send(msg.into())
57            .await
58            .map_err(|e| BinaryOptionsToolsError::ChannelRequestSendingError(e.to_string()))?;
59        Ok(())
60    }
61
62    pub async fn priority_send(&self, msg: Message) -> BinaryOptionsResult<()> {
63        self.sender_priority
64            .send(msg)
65            .await
66            .map_err(|e| BinaryOptionsToolsError::ChannelRequestSendingError(e.to_string()))?;
67        Ok(())
68    }
69
70    pub async fn send_message<Transfer: MessageTransfer, T: DataHandler<Transfer = Transfer>>(
71        &self,
72        data: &Data<T, Transfer>,
73        msg: Transfer,
74        response_type: Transfer::Info,
75        validator: impl Fn(&Transfer) -> bool + Send + Sync,
76    ) -> BinaryOptionsResult<Transfer> {
77        let reciever = self.reciever(data, msg, response_type).await?;
78
79        while let Ok(msg) = reciever.recv().await {
80            if let Some(msg) =
81                validate(&validator, msg).inspect_err(|e| error!("Failed to place trade {e}"))?
82            {
83                return Ok(msg);
84            }
85        }
86        Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
87            RecvError,
88        ))
89    }
90
91    pub async fn send_message_with_timout<
92        Transfer: MessageTransfer,
93        T: DataHandler<Transfer = Transfer>,
94    >(
95        &self,
96        time: Duration,
97        task: impl ToString,
98        data: &Data<T, Transfer>,
99        msg: Transfer,
100        response_type: Transfer::Info,
101        validator: impl Fn(&Transfer) -> bool + Send + Sync,
102    ) -> BinaryOptionsResult<Transfer> {
103        let reciever = self.reciever(data, msg, response_type).await?;
104
105        timeout(
106            time,
107            async {
108                while let Ok(msg) = reciever.recv().await {
109                    if let Some(msg) = validate(&validator, msg)
110                        .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
111                    {
112                        return Ok(msg);
113                    }
114                }
115                Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
116                    RecvError,
117                ))
118            },
119            task.to_string(),
120        )
121        .await
122    }
123
124    pub async fn send_message_with_timeout_and_retry<
125        Transfer: MessageTransfer,
126        T: DataHandler<Transfer = Transfer>,
127    >(
128        &self,
129        time: Duration,
130        task: impl ToString,
131        data: &Data<T, Transfer>,
132        msg: Transfer,
133        response_type: Transfer::Info,
134        validator: impl Fn(&Transfer) -> bool + Send + Sync,
135    ) -> BinaryOptionsResult<Transfer> {
136        let reciever = self
137            .reciever(data, msg.clone(), response_type.clone())
138            .await?;
139
140        let call1 = timeout(
141            time,
142            async {
143                while let Ok(msg) = reciever.recv().await {
144                    if let Some(msg) = validate(&validator, msg)
145                        .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
146                    {
147                        return Ok(msg);
148                    }
149                }
150                Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
151                    RecvError,
152                ))
153            },
154            task.to_string(),
155        )
156        .await;
157        match call1 {
158            Ok(res) => Ok(res),
159            Err(_) => {
160                println!("Failded once trying again");
161                let reciever = self.reciever(data, msg, response_type).await?;
162                timeout(
163                    time,
164                    async {
165                        while let Ok(msg) = reciever.recv().await {
166                            if let Some(msg) = validate(&validator, msg)
167                                .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
168                            {
169                                return Ok(msg);
170                            }
171                        }
172                        Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
173                            RecvError,
174                        ))
175                    },
176                    task.to_string(),
177                )
178                .await
179            }
180        }
181    }
182}