1mod auth;
2mod connector;
3mod endpoint;
4mod middleware;
5mod receiver;
6mod sender;
7
8pub mod traits;
9
10use crate::client::{receiver::on_message, sender::send_heartbeat};
11use crate::openapi::{
12 ProtoMessage, ProtoOaAccountAuthReq, ProtoOaAccountLogoutReq, ProtoOaApplicationAuthReq,
13 ProtoOaAssetClassListReq, ProtoOaAssetListReq, ProtoOaCancelOrderReq, ProtoOaClosePositionReq,
14 ProtoOaDealOffsetListReq, ProtoOaGetAccountListByAccessTokenReq,
15 ProtoOaGetPositionUnrealizedPnLReq, ProtoOaGetTickDataReq, ProtoOaGetTrendbarsReq,
16 ProtoOaNewOrderReq, ProtoOaOrderDetailsReq, ProtoOaOrderListByPositionIdReq, ProtoOaQuoteType,
17 ProtoOaReconcileReq, ProtoOaRefreshTokenReq, ProtoOaSubscribeSpotsReq,
18 ProtoOaSymbolCategoryListReq, ProtoOaSymbolsListReq, ProtoOaTraderReq,
19 ProtoOaUnsubscribeSpotsReq,
20};
21use crate::openapi::{ProtoOaOrderType, ProtoOaTradeSide};
22use endpoint::Endpoints;
23use prost::Message;
24
25use crate::types::{Auth, CTraderClient};
26
27use futures_util::{SinkExt, StreamExt};
28
29use std::time::Duration;
30use std::{collections::HashMap, sync::Arc};
31use tokio::sync::Mutex;
32use tokio::task::JoinHandle;
33use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
34
35#[allow(dead_code)]
36impl CTraderClient {
37 pub async fn new(
44 is_demo: bool,
45 app_client_id: String,
46 app_access_token: String,
47 app_client_secret: String,
48 app_redirect_url: String,
49 refresh_token: String,
50 ) -> Result<(Self, JoinHandle<()>, JoinHandle<()>), anyhow::Error> {
51 let host = if is_demo {
52 Endpoints::DEMO_HOST_WS_URI
53 } else {
54 Endpoints::LIVE_HOST_URI
55 };
56
57 let url = format!("{}:{}", host, Endpoints::PROTOBUF_PORT);
58
59 tracing::info!(
60 "Connecting Async Client to CTrader Endpoint {} OpenAPI",
61 url
62 );
63
64 let web_socket_stream = loop {
66 let web_socket_stream = if let Ok((ws_stream, _)) = connect_async(url.as_str()).await {
67 Some(ws_stream)
68 } else {
69 tracing::warn!("Error connecting to {} retrying in 5s ", url);
70 None
71 };
72
73 if web_socket_stream.is_some() {
74 tracing::info!("Successfully Connected Async Client to CTrader OpenAPI");
75 break web_socket_stream.unwrap();
76 }
77
78 tokio::time::sleep(Duration::from_secs(5)).await;
79 };
80
81 let auth = Auth::new(
82 app_client_id,
83 app_access_token,
84 app_client_secret,
85 app_redirect_url,
86 refresh_token,
87 );
88
89 let (ws_write, ws_read) = web_socket_stream.split();
90 let incoming = Arc::new(Mutex::new(ws_read));
91
92 let outgoing = Arc::new(Mutex::new(ws_write));
93
94 let message_handle = tokio::spawn(on_message(incoming));
95 let heartbeat_handle = tokio::spawn(send_heartbeat(outgoing.clone()));
96
97 Ok((
98 Self {
99 auth,
100 ws_write: outgoing.clone(),
101 },
102 message_handle,
103 heartbeat_handle,
104 ))
105 }
106
107 async fn send_heartbeat(&self) {}
109
110 pub async fn send_new_limit_order(
112 &self,
113 account_id: i64,
114 symbol_id: i64,
115 trade_side: ProtoOaTradeSide,
116 volume: i64,
117 price: f64,
118 ) -> Result<(), anyhow::Error> {
119 let order_type = ProtoOaOrderType::Limit;
120
121 self.send_new_order_request(
122 account_id,
123 symbol_id,
124 order_type,
125 trade_side,
126 volume,
127 Some(price),
128 )
129 .await?;
130 Ok(())
131 }
132
133 pub async fn send_new_market_order(
135 &self,
136 account_id: i64,
137 symbol_id: i64,
138 trade_side: ProtoOaTradeSide,
139 volume: i64,
140 ) -> Result<(), anyhow::Error> {
141 let order_type = ProtoOaOrderType::Market;
142
143 self.send_new_order_request(account_id, symbol_id, order_type, trade_side, volume, None)
144 .await?;
145 Ok(())
146 }
147
148 pub async fn send_new_stop_order(
150 &self,
151 account_id: i64,
152 symbol_id: i64,
153 trade_side: ProtoOaTradeSide,
154 volume: i64,
155 price: f64,
156 ) -> Result<(), anyhow::Error> {
157 let order_type = ProtoOaOrderType::Stop;
158
159 self.send_new_order_request(
160 account_id,
161 symbol_id,
162 order_type,
163 trade_side,
164 volume,
165 Some(price),
166 )
167 .await?;
168 Ok(())
169 }
170
171 pub async fn send_refresh_token_request(&self) -> Result<(), anyhow::Error> {
172 tracing::info!("Refreshing Application Token");
173
174 let req = ProtoOaRefreshTokenReq {
175 refresh_token: self.auth.ctrader_refresh_token.to_string(),
176 payload_type: Some(2173),
177 };
178
179 let mut buf = Vec::new();
181 req.encode(&mut buf)?;
182
183 let msg_id: u16 = 2173; let mut message = Vec::with_capacity(2 + buf.len());
186 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
191
192 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
193
194 Ok(())
195 }
196
197 pub async fn send_application_auth_request(&mut self) -> Result<(), anyhow::Error> {
199 tracing::info!("Authenticating Async Client to CTrader OpenAPI");
200
201 let req = ProtoOaApplicationAuthReq {
202 client_id: self.auth.ctrader_client_id.to_string(),
203 client_secret: self.auth.ctrader_client_secret.to_string(),
204 payload_type: Some(2100),
205 };
206
207 let mut buf = Vec::new();
209 req.encode(&mut buf)?;
210
211 let msg_id: u16 = 2100; let mut message = Vec::with_capacity(2 + buf.len());
214 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
219
220 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
221
222 Ok(())
223 }
224
225 pub async fn send_set_account_request(&mut self, account_id: i64) -> Result<(), anyhow::Error> {
226 tracing::info!("Setting Active account to {}", account_id);
227
228 let req = ProtoOaAccountAuthReq {
229 ctid_trader_account_id: account_id,
230 access_token: self.auth.ctrader_access_token.to_string(),
231 payload_type: Some(2102),
232 };
233
234 let mut buf = Vec::new();
236 req.encode(&mut buf)?;
237
238 let msg_id: u16 = 2102; let mut message = Vec::with_capacity(2 + buf.len());
241 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
246
247 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
248
249 Ok(())
250 }
251
252 pub async fn send_get_account_list_by_access_token_request(&self) -> Result<(), anyhow::Error> {
253 let req = ProtoOaGetAccountListByAccessTokenReq {
254 access_token: self.auth.ctrader_access_token.to_string(),
255 payload_type: Some(2149),
256 };
257
258 let mut buf = Vec::new();
260 req.encode(&mut buf)?;
261
262 let msg_id: u16 = 2149; let mut message = Vec::with_capacity(2 + buf.len());
265 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
270
271 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
272
273 Ok(())
274 }
275
276 pub async fn send_account_logout_request(&self, account_id: i64) -> Result<(), anyhow::Error> {
277 let req = ProtoOaAccountLogoutReq {
278 ctid_trader_account_id: account_id,
279 payload_type: Some(2162),
280 };
281
282 let mut buf = Vec::new();
284 req.encode(&mut buf)?;
285
286 let msg_id: u16 = 2162; let mut message = Vec::with_capacity(2 + buf.len());
289 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
294
295 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
296
297 Ok(())
298 }
299
300 pub async fn send_asset_list_request(&self, account_id: i64) -> Result<(), anyhow::Error> {
301 let req = ProtoOaAssetListReq {
302 ctid_trader_account_id: account_id,
303 payload_type: Some(2112),
304 };
305
306 let mut buf = Vec::new();
308 req.encode(&mut buf)?;
309
310 let msg_id: u16 = 2112; let mut message = Vec::with_capacity(2 + buf.len());
313 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
318
319 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
320
321 Ok(())
322 }
323
324 pub async fn send_asset_class_list_request(
325 &self,
326 account_id: i64,
327 ) -> Result<(), anyhow::Error> {
328 let req = ProtoOaAssetClassListReq {
329 ctid_trader_account_id: account_id,
330 payload_type: Some(2153),
331 };
332
333 let mut buf = Vec::new();
335 req.encode(&mut buf)?;
336
337 let msg_id: u16 = 2153; let mut message = Vec::with_capacity(2 + buf.len());
340 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
345
346 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
347
348 Ok(())
349 }
350
351 pub async fn send_symbol_category_list_request(
352 &self,
353 account_id: i64,
354 ) -> Result<(), anyhow::Error> {
355 let req = ProtoOaSymbolCategoryListReq {
356 ctid_trader_account_id: account_id,
357 payload_type: Some(2160),
358 };
359
360 let mut buf = Vec::new();
362 req.encode(&mut buf)?;
363
364 let msg_id: u16 = 2160; let mut message = Vec::with_capacity(2 + buf.len());
367 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
372
373 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
374
375 Ok(())
376 }
377
378 pub async fn send_symbols_list_request(
379 &self,
380 account_id: i64,
381 include_archived_symbols: bool,
382 ) -> Result<(), anyhow::Error> {
383 let req = ProtoOaSymbolsListReq {
384 ctid_trader_account_id: account_id,
385 include_archived_symbols: Some(include_archived_symbols),
386 payload_type: Some(2114),
387 };
388
389 let mut buf = Vec::new();
391 req.encode(&mut buf)?;
392
393 let msg_id: u16 = 2114; let mut message = Vec::with_capacity(2 + buf.len());
396 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
401
402 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
403
404 Ok(())
405 }
406
407 pub async fn send_trader_request(&self, account_id: i64) -> Result<(), anyhow::Error> {
408 let req = ProtoOaTraderReq {
409 ctid_trader_account_id: account_id,
410 payload_type: Some(2121),
411 };
412
413 let mut buf = Vec::new();
415 req.encode(&mut buf)?;
416
417 let msg_id: u16 = 2121; let mut message = Vec::with_capacity(2 + buf.len());
420 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
425
426 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
427
428 Ok(())
429 }
430
431 pub async fn send_unsubscribe_spots_request(
432 &self,
433 account_id: i64,
434 symbol_id: Vec<i64>,
435 ) -> Result<(), anyhow::Error> {
436 let req = ProtoOaUnsubscribeSpotsReq {
437 ctid_trader_account_id: account_id,
438 symbol_id: symbol_id,
439 payload_type: Some(2129),
440 };
441
442 let mut buf = Vec::new();
444 req.encode(&mut buf)?;
445
446 let msg_id: u16 = 2129; let mut message = Vec::with_capacity(2 + buf.len());
449 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
454
455 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
456
457 Ok(())
458 }
459
460 pub async fn send_subscribe_spots_request(
461 &self,
462 account_id: i64,
463 symbol_id: Vec<i64>,
464 time_in_seconds: usize,
465 subscribe_to_spot_timestamp: bool,
466 ) -> Result<(), anyhow::Error> {
467 let req = ProtoOaSubscribeSpotsReq {
468 ctid_trader_account_id: account_id,
469 symbol_id: symbol_id,
470 subscribe_to_spot_timestamp: Some(subscribe_to_spot_timestamp),
471 payload_type: Some(2127),
472 };
473
474 let mut buf = Vec::new();
476 req.encode(&mut buf)?;
477
478 let msg_id: u16 = 2127; let mut message = Vec::with_capacity(2 + buf.len());
481 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
486
487 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
488
489 Ok(())
490 }
491
492 pub async fn send_get_tick_data_request(
493 &self,
494 account_id: i64,
495 days: i64,
496 quote_type: ProtoOaQuoteType,
497 symbol_id: i64,
498 from_timestamp: Option<i64>,
499 to_timestamp: Option<i64>,
500 ) -> Result<(), anyhow::Error> {
501 let req = ProtoOaGetTickDataReq {
502 ctid_trader_account_id: account_id,
503 symbol_id: symbol_id,
504 payload_type: Some(2145),
505 r#type: quote_type as i32,
506 from_timestamp: from_timestamp,
507 to_timestamp: to_timestamp,
508 };
509
510 let mut buf = Vec::new();
512 req.encode(&mut buf)?;
513
514 let msg_id: u16 = 2145; let mut message = Vec::with_capacity(2 + buf.len());
517 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
522
523 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
524
525 Ok(())
526 }
527
528 pub async fn send_get_trendbars_request(
529 &self,
530 account_id: i64,
531 period: i32,
532 symbol_id: i64,
533 count: u32,
534 from_timestamp: i64,
535 to_timestamp: i64,
536 ) -> Result<(), anyhow::Error> {
537 let req = ProtoOaGetTrendbarsReq {
538 ctid_trader_account_id: account_id,
539 from_timestamp: Some(from_timestamp),
540 to_timestamp: Some(to_timestamp),
541 payload_type: Some(2137),
542 period: period,
543 count: Some(count),
544 symbol_id: symbol_id,
545 };
546
547 let mut buf = Vec::new();
549 req.encode(&mut buf)?;
550
551 let msg_id: u16 = 2137; let mut message = Vec::with_capacity(2 + buf.len());
554 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
559
560 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
561
562 Ok(())
563 }
564
565 pub async fn send_new_order_request(
566 &self,
567 account_id: i64,
568 symbol_id: i64,
569 order_type: ProtoOaOrderType,
570 trade_side: ProtoOaTradeSide,
571 volume: i64,
572 price: Option<f64>,
573 ) -> Result<(), anyhow::Error> {
574 let mut req = ProtoOaNewOrderReq {
575 ctid_trader_account_id: account_id,
576 symbol_id: symbol_id,
577 order_type: order_type as i32,
578 trade_side: trade_side as i32,
579 volume: volume,
580 payload_type: Some(2106),
581 ..Default::default()
582 };
583
584 match order_type {
585 ProtoOaOrderType::Limit => {
586 req.limit_price = price;
587 }
588
589 ProtoOaOrderType::Stop => req.stop_price = price,
590
591 ProtoOaOrderType::Market => {}
592
593 ProtoOaOrderType::StopLossTakeProfit => {}
594
595 _ => {}
596 };
597
598 let mut buf = Vec::new();
600 req.encode(&mut buf)?;
601
602 let msg_id: u16 = 2137; let mut message = Vec::with_capacity(2 + buf.len());
605 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
610
611 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
612
613 Ok(())
614 }
615
616 pub async fn send_reconcile_request(&self, account_id: i64) -> Result<(), anyhow::Error> {
617 let req = ProtoOaReconcileReq {
618 ctid_trader_account_id: account_id,
619 payload_type: Some(2124),
620 ..Default::default()
621 };
622
623 let mut buf = Vec::new();
625 req.encode(&mut buf)?;
626
627 let msg_id: u16 = 2124; let mut message = Vec::with_capacity(2 + buf.len());
630 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
635
636 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
637
638 Ok(())
639 }
640
641 pub async fn send_close_position_request(
642 &self,
643 account_id: i64,
644 position_id: i64,
645 volume: i64,
646 ) -> Result<(), anyhow::Error> {
647 let req = ProtoOaClosePositionReq {
648 ctid_trader_account_id: account_id,
649 position_id: position_id,
650 volume: volume,
651 payload_type: Some(2111),
652 };
653
654 let mut buf = Vec::new();
656 req.encode(&mut buf)?;
657
658 let msg_id: u16 = 2111; let mut message = Vec::with_capacity(2 + buf.len());
661 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
666
667 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
668
669 Ok(())
670 }
671
672 pub async fn send_cancel_order_request(
673 &self,
674 account_id: i64,
675 order_id: i64,
676 ) -> Result<(), anyhow::Error> {
677 let req = ProtoOaCancelOrderReq {
678 ctid_trader_account_id: account_id,
679 order_id: order_id,
680 payload_type: Some(2108),
681 };
682
683 let mut buf = Vec::new();
685 req.encode(&mut buf)?;
686
687 let msg_id: u16 = 2108; let mut message = Vec::with_capacity(2 + buf.len());
690 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
695
696 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
697
698 Ok(())
699 }
700
701 pub async fn send_deal_offset_list_request(
702 &self,
703 account_id: i64,
704 deal_id: i64,
705 ) -> Result<(), anyhow::Error> {
706 let req = ProtoOaDealOffsetListReq {
707 ctid_trader_account_id: account_id,
708 deal_id: deal_id,
709 payload_type: Some(2185),
710 };
711
712 let mut buf = Vec::new();
714 req.encode(&mut buf)?;
715
716 let msg_id: u16 = 2185; let mut message = Vec::with_capacity(2 + buf.len());
719 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
724
725 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
726
727 Ok(())
728 }
729
730 pub async fn send_get_position_unrealized_pnl_equest(
731 &self,
732 account_id: i64,
733 ) -> Result<(), anyhow::Error> {
734 let req = ProtoOaGetPositionUnrealizedPnLReq {
735 ctid_trader_account_id: account_id,
736 payload_type: Some(2137),
737 };
738
739 let mut buf = Vec::new();
741 req.encode(&mut buf)?;
742
743 let msg_id: u16 = 2137; let mut message = Vec::with_capacity(2 + buf.len());
746 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
751
752 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
753
754 Ok(())
755 }
756
757 pub async fn send_order_details_request(
758 &self,
759 account_id: i64,
760 order_id: i64,
761 ) -> Result<(), anyhow::Error> {
762 let req = ProtoOaOrderDetailsReq {
763 ctid_trader_account_id: account_id,
764 order_id: order_id,
765 payload_type: Some(2181),
766 };
767
768 let mut buf = Vec::new();
770 req.encode(&mut buf)?;
771
772 let msg_id: u16 = 2181; let mut message = Vec::with_capacity(2 + buf.len());
775 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
780
781 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
782
783 Ok(())
784 }
785
786 pub async fn send_order_list_by_position_id_request(
787 &self,
788 account_id: i64,
789 position_id: i64,
790 from_timestamp: i64,
791 to_timestamp: i64,
792 ) -> Result<(), anyhow::Error> {
793 let req = ProtoOaOrderListByPositionIdReq {
794 ctid_trader_account_id: account_id,
795 position_id: position_id,
796 payload_type: Some(2183),
797 from_timestamp: Some(from_timestamp),
798 to_timestamp: Some(to_timestamp),
799 };
800
801 let mut buf = Vec::new();
803 req.encode(&mut buf)?;
804
805 let msg_id: u16 = 2183; let mut message = Vec::with_capacity(2 + buf.len());
808 message.extend_from_slice(&msg_id.to_le_bytes()); message.extend_from_slice(&buf); let ws_msg = WsMessage::Binary(message.into());
813
814 let _ = &mut self.ws_write.lock().await.send(ws_msg).await?;
815
816 Ok(())
817 }
818}