binary_options_tools_core/general/
send.rs1use std::time::Duration;
2
3use async_channel::{bounded, Receiver, RecvError, Sender};
4use tokio_tungstenite::tungstenite::Message;
5use tracing::error;
6
7use crate::{
8 error::{BinaryOptionsResult, BinaryOptionsToolsError},
9 general::validate::validate,
10 utils::time::timeout,
11};
12
13use super::{
14 traits::{DataHandler, MessageTransfer},
15 types::Data,
16};
17
18#[derive(Clone)]
19pub struct SenderMessage {
20 sender: Sender<Message>,
21 sender_priority: Sender<Message>,
22}
23
24impl SenderMessage {
25 pub fn new(cap: usize) -> (Self, (Receiver<Message>, Receiver<Message>)) {
26 let (s, r) = bounded(cap);
27 let (sp, rp) = bounded(cap);
28
29 (
30 Self {
31 sender: s,
32 sender_priority: sp,
33 },
34 (r, rp),
35 )
36 }
37 async fn reciever<Transfer: MessageTransfer, T: DataHandler<Transfer = Transfer>>(
41 &self,
42 data: &Data<T, Transfer>,
43 msg: Transfer,
44 response_type: Transfer::Info,
45 ) -> BinaryOptionsResult<Receiver<Transfer>> {
46 let reciever = data.add_request(response_type).await;
47
48 self.send(msg)
49 .await
50 .map_err(|e| BinaryOptionsToolsError::ThreadMessageSendingErrorMPCS(e.to_string()))?;
51 Ok(reciever)
52 }
53
54 pub async fn send<Transfer: MessageTransfer>(&self, msg: Transfer) -> BinaryOptionsResult<()> {
55 self.sender
56 .send(msg.into())
57 .await
58 .map_err(|e| BinaryOptionsToolsError::ChannelRequestSendingError(e.to_string()))?;
59 Ok(())
60 }
61
62 pub async fn priority_send(&self, msg: Message) -> BinaryOptionsResult<()> {
63 self.sender_priority
64 .send(msg)
65 .await
66 .map_err(|e| BinaryOptionsToolsError::ChannelRequestSendingError(e.to_string()))?;
67 Ok(())
68 }
69
70 pub async fn send_message<Transfer: MessageTransfer, T: DataHandler<Transfer = Transfer>>(
71 &self,
72 data: &Data<T, Transfer>,
73 msg: Transfer,
74 response_type: Transfer::Info,
75 validator: impl Fn(&Transfer) -> bool + Send + Sync,
76 ) -> BinaryOptionsResult<Transfer> {
77 let reciever = self.reciever(data, msg, response_type).await?;
78
79 while let Ok(msg) = reciever.recv().await {
80 if let Some(msg) =
81 validate(&validator, msg).inspect_err(|e| error!("Failed to place trade {e}"))?
82 {
83 return Ok(msg);
84 }
85 }
86 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
87 RecvError,
88 ))
89 }
90
91 pub async fn send_message_with_timout<
92 Transfer: MessageTransfer,
93 T: DataHandler<Transfer = Transfer>,
94 >(
95 &self,
96 time: Duration,
97 task: impl ToString,
98 data: &Data<T, Transfer>,
99 msg: Transfer,
100 response_type: Transfer::Info,
101 validator: impl Fn(&Transfer) -> bool + Send + Sync,
102 ) -> BinaryOptionsResult<Transfer> {
103 let reciever = self.reciever(data, msg, response_type).await?;
104
105 timeout(
106 time,
107 async {
108 while let Ok(msg) = reciever.recv().await {
109 if let Some(msg) = validate(&validator, msg)
110 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
111 {
112 return Ok(msg);
113 }
114 }
115 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
116 RecvError,
117 ))
118 },
119 task.to_string(),
120 )
121 .await
122 }
123
124 pub async fn send_message_with_timeout_and_retry<
125 Transfer: MessageTransfer,
126 T: DataHandler<Transfer = Transfer>,
127 >(
128 &self,
129 time: Duration,
130 task: impl ToString,
131 data: &Data<T, Transfer>,
132 msg: Transfer,
133 response_type: Transfer::Info,
134 validator: impl Fn(&Transfer) -> bool + Send + Sync,
135 ) -> BinaryOptionsResult<Transfer> {
136 let reciever = self
137 .reciever(data, msg.clone(), response_type.clone())
138 .await?;
139
140 let call1 = timeout(
141 time,
142 async {
143 while let Ok(msg) = reciever.recv().await {
144 if let Some(msg) = validate(&validator, msg)
145 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
146 {
147 return Ok(msg);
148 }
149 }
150 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
151 RecvError,
152 ))
153 },
154 task.to_string(),
155 )
156 .await;
157 match call1 {
158 Ok(res) => Ok(res),
159 Err(_) => {
160 println!("Failded once trying again");
161 let reciever = self.reciever(data, msg, response_type).await?;
162 timeout(
163 time,
164 async {
165 while let Ok(msg) = reciever.recv().await {
166 if let Some(msg) = validate(&validator, msg)
167 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
168 {
169 return Ok(msg);
170 }
171 }
172 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
173 RecvError,
174 ))
175 },
176 task.to_string(),
177 )
178 .await
179 }
180 }
181 }
182}