1mod backproof_proto {
2 tonic::include_proto!("backproof");
3}
4
5pub mod api {
6 pub use super::backproof_proto::Candle;
7 pub use super::backproof_proto::Order;
8}
9
10use async_stream::stream;
11
12use backproof_proto::{
13 run_strategy_input::Event as InputEvent, run_strategy_output::Event as OutputEvent,
14 strategy_service_client::StrategyServiceClient, Candle, Order, RegisterStrategyRequest,
15 RunStrategyInput,
16};
17use futures_util::{pin_mut, StreamExt};
18use tonic::{codegen::StdError, transport::Channel};
19use tracing::{debug, error, info, instrument, span};
20
21use thiserror::Error as ThisError;
22
23#[derive(Debug)]
24pub struct Session(backproof_proto::Strategy);
25
26#[derive(ThisError, Debug)]
27pub enum BackproofSdkError {
28 #[error("cannot connect to the backend")]
29 ConnectionFailed(#[from] tonic::transport::Error),
30 #[error("internal server error")]
31 InternalServerError(#[from] tonic::Status),
32 #[error("you need registered the strategy before running it")]
33 NotRegistered,
34 #[error("critical error")]
35 CriticalError(Box<dyn std::error::Error>),
36 #[error("unknown")]
37 Unknown,
38}
39
40#[derive(Default, Debug)]
41pub struct BackProofStrategyBuilder {
43 api_key: Option<String>,
44 strategy: Option<Strategy>,
45}
46
47pub struct Strategy {
49 pub name: String,
50 pub logic: Box<dyn Fn(Candle) -> Order + Send + 'static>,
51}
52
53impl std::fmt::Debug for Strategy {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("Strategy")
56 .field("name", &self.name)
57 .finish()
58 }
59}
60
61impl BackProofStrategyBuilder {
63 pub fn new() -> Self {
64 Default::default()
65 }
66
67 #[instrument(skip(self, api_key))]
68 pub fn api_key(mut self, api_key: String) -> Self {
69 self.api_key = Some(api_key);
70 self
71 }
72
73 #[instrument(skip(self, logic))]
74 pub fn api_strategy(
75 mut self,
76 name: String,
77 logic: impl Fn(Candle) -> Order + 'static + Send,
78 ) -> Self {
79 let logic = Box::new(logic);
80 self.strategy = Some(Strategy { name, logic });
81 self
82 }
83
84 #[instrument(skip(self))]
85 pub async fn build<D>(self, endpoint: D) -> Result<BackproofStrategy, BackproofSdkError>
86 where
87 D: std::convert::TryInto<tonic::transport::Endpoint> + std::fmt::Debug,
88 D::Error: Into<StdError>,
89 {
90 let client = StrategyServiceClient::connect(endpoint).await?;
91
92 Ok(BackproofStrategy {
93 client,
94 api_key: self
95 .api_key
96 .expect("You need to set your Backproof Api Key."),
97 strategy: self
98 .strategy
99 .expect("You need to setup your backproof strategy."),
100 registered: false,
101 })
102 }
103}
104
105pub struct BackproofStrategy {
107 client: StrategyServiceClient<Channel>,
108 api_key: String,
109 strategy: Strategy,
110 registered: bool,
111}
112
/// Run options for a strategy.
///
/// The only field is private, so outside this module a value can only be
/// obtained through `Options::default()`.
///
/// `PartialEq` and `Eq` are derived alongside `Hash` so that values can be
/// compared and used as keys in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Options {
    // NOTE(review): the meaning of `tu` is not visible in this file
    // (presumably a time-unit setting — confirm with the author).
    tu: usize,
}
120
121impl BackproofStrategy {
122 #[instrument]
124 pub fn builder() -> BackProofStrategyBuilder {
125 BackProofStrategyBuilder::new()
126 }
127
128 #[instrument(skip(self))]
130 pub async fn register(&mut self) -> Result<Session, BackproofSdkError> {
131 let response = self
132 .client
133 .register_strategy(RegisterStrategyRequest {
134 api_key: self.api_key.clone(),
135 strategy_name: self.strategy.name.clone(),
136 })
137 .await?;
138 let content = response.into_inner();
139 info!(?content, "RegisterReply");
140 self.registered = true;
141 info!(?self.strategy.name, ?self.api_key, "Successfully registered.");
142 Ok(Session(content.output.unwrap()))
143 }
144
145 #[instrument(skip(self))]
147 pub async fn run(
148 mut self,
149 _opts: Options,
150 session: Session,
151 ) -> Result<tokio::sync::oneshot::Sender<()>, BackproofSdkError> {
152 if !self.registered {
153 error!(
154 "You need to register your strategy before running it.\nCall `strategy.register()`."
155 );
156 return Err(BackproofSdkError::NotRegistered);
157 }
158 let (send, mut recv) = tokio::sync::oneshot::channel();
160
161 let (order_send, mut order_recv) = tokio::sync::mpsc::channel::<RunStrategyInput>(100);
163
164 let (candle_send, mut candle_recv) =
166 tokio::sync::mpsc::channel::<backproof_proto::RunStrategyOutput>(100);
167
168 debug!("channel created");
169 let order_stream = stream! {
170 while let Some(data) = order_recv.recv().await {
171 debug!(?data, "received");
172 yield data;
173 }
174 };
175 debug!("order stream created");
176
177 let response = self.client.run_strategy(order_stream).await?;
178 debug!(?response, "first server response");
179
180 let candle_stream = response.into_inner();
181
182 tokio::spawn(async move {
183 pin_mut!(candle_stream);
184
185 while let Some(Ok(input)) = candle_stream.next().await {
186 candle_send.send(input).await.unwrap();
187 }
188 });
189
190 let start_session_request = RunStrategyInput {
191 strategy_id: session.0.id.clone(),
192 event: Some(backproof_proto::run_strategy_input::Event::StartSession(
193 backproof_proto::StartSession {
194 api_key: self.api_key.clone(),
195 session_name: session.0.name,
196 },
197 )),
198 };
199
200 order_send
201 .send(start_session_request)
202 .await
203 .map_err(|err| {
204 error!(?err, "while sending start_session_request");
205 BackproofSdkError::CriticalError(Box::new(err))
206 })?;
207
208 tokio::spawn(async move {
209 let span = span!(tracing::Level::INFO, "Stream");
210 let _enter = span.enter();
211
212 loop {
213 tokio::select! {
214 Some(input) = candle_recv.recv() => {
215 info!(?input, "received from server");
216
217 if let Some(input_event) = input.event {
218 match input_event {
219 OutputEvent::Candle(candle) => {
220 let order = (self.strategy.logic)(candle);
221
222 debug!(?order, "sending to backend");
223 order_send
224 .send(RunStrategyInput { event: Some(InputEvent::Order(order)), strategy_id: session.0.id.clone() })
225 .await
226 .unwrap();
227 },
228 OutputEvent::EndSession(end_session) => {
229 info!(?end_session, "Server asked to end session properly. Closing.");
230 break;
231 },
232 };
233 } else {
234 error!("This should not happen, please report an issue to your backproof service provider.");
235 };
236 },
237 _ = &mut recv => {
238 info!("User killed the service.");
239 break;
240 }
241 }
242 }
243 });
244
245 Ok(send)
246 }
247}