backproof_sdk/
lib.rs

1mod backproof_proto {
2    tonic::include_proto!("backproof");
3}
4
5pub mod api {
6    pub use super::backproof_proto::Candle;
7    pub use super::backproof_proto::Order;
8}
9
10use async_stream::stream;
11
12use backproof_proto::{
13    run_strategy_input::Event as InputEvent, run_strategy_output::Event as OutputEvent,
14    strategy_service_client::StrategyServiceClient, Candle, Order, RegisterStrategyRequest,
15    RunStrategyInput,
16};
17use futures_util::{pin_mut, StreamExt};
18use tonic::{codegen::StdError, transport::Channel};
19use tracing::{debug, error, info, instrument, span};
20
21use thiserror::Error as ThisError;
22
23#[derive(Debug)]
24pub struct Session(backproof_proto::Strategy);
25
26#[derive(ThisError, Debug)]
27pub enum BackproofSdkError {
28    #[error("cannot connect to the backend")]
29    ConnectionFailed(#[from] tonic::transport::Error),
30    #[error("internal server error")]
31    InternalServerError(#[from] tonic::Status),
32    #[error("you need registered the strategy before running it")]
33    NotRegistered,
34    #[error("critical error")]
35    CriticalError(Box<dyn std::error::Error>),
36    #[error("unknown")]
37    Unknown,
38}
39
40#[derive(Default, Debug)]
41/// The strategy backproof strategy builder.
42pub struct BackProofStrategyBuilder {
43    api_key: Option<String>,
44    strategy: Option<Strategy>,
45}
46
47/// A single straegy that have to be registered in the backend.
48pub struct Strategy {
49    pub name: String,
50    pub logic: Box<dyn Fn(Candle) -> Order + Send + 'static>,
51}
52
53impl std::fmt::Debug for Strategy {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("Strategy")
56            .field("name", &self.name)
57            .finish()
58    }
59}
60
61/// Lets you build your strategy step by step.
62impl BackProofStrategyBuilder {
63    pub fn new() -> Self {
64        Default::default()
65    }
66
67    #[instrument(skip(self, api_key))]
68    pub fn api_key(mut self, api_key: String) -> Self {
69        self.api_key = Some(api_key);
70        self
71    }
72
73    #[instrument(skip(self, logic))]
74    pub fn api_strategy(
75        mut self,
76        name: String,
77        logic: impl Fn(Candle) -> Order + 'static + Send,
78    ) -> Self {
79        let logic = Box::new(logic);
80        self.strategy = Some(Strategy { name, logic });
81        self
82    }
83
84    #[instrument(skip(self))]
85    pub async fn build<D>(self, endpoint: D) -> Result<BackproofStrategy, BackproofSdkError>
86    where
87        D: std::convert::TryInto<tonic::transport::Endpoint> + std::fmt::Debug,
88        D::Error: Into<StdError>,
89    {
90        let client = StrategyServiceClient::connect(endpoint).await?;
91
92        Ok(BackproofStrategy {
93            client,
94            api_key: self
95                .api_key
96                .expect("You need to set your Backproof Api Key."),
97            strategy: self
98                .strategy
99                .expect("You need to setup your backproof strategy."),
100            registered: false,
101        })
102    }
103}
104
105/// Represents the strategy that you can run.
106pub struct BackproofStrategy {
107    client: StrategyServiceClient<Channel>,
108    api_key: String,
109    strategy: Strategy,
110    registered: bool,
111}
112
113#[derive(Debug, Clone, Copy, Hash, Default)]
114/// Launch options.
115/// You can define some parameters when running your strategy.
116pub struct Options {
117    /// Time unit of candles
118    tu: usize,
119}
120
121impl BackproofStrategy {
122    /// Get a builder to configure the strategy.
123    #[instrument]
124    pub fn builder() -> BackProofStrategyBuilder {
125        BackProofStrategyBuilder::new()
126    }
127
128    /// register the strategy and setup the backend.
129    #[instrument(skip(self))]
130    pub async fn register(&mut self) -> Result<Session, BackproofSdkError> {
131        let response = self
132            .client
133            .register_strategy(RegisterStrategyRequest {
134                api_key: self.api_key.clone(),
135                strategy_name: self.strategy.name.clone(),
136            })
137            .await?;
138        let content = response.into_inner();
139        info!(?content, "RegisterReply");
140        self.registered = true;
141        info!(?self.strategy.name, ?self.api_key, "Successfully registered.");
142        Ok(Session(content.output.unwrap()))
143    }
144
145    /// Starts the strategy with given options.
146    #[instrument(skip(self))]
147    pub async fn run(
148        mut self,
149        _opts: Options,
150        session: Session,
151    ) -> Result<tokio::sync::oneshot::Sender<()>, BackproofSdkError> {
152        if !self.registered {
153            error!(
154                "You need to register your strategy before running it.\nCall `strategy.register()`."
155            );
156            return Err(BackproofSdkError::NotRegistered);
157        }
158        // Create close channel
159        let (send, mut recv) = tokio::sync::oneshot::channel();
160
161        // Input
162        let (order_send, mut order_recv) = tokio::sync::mpsc::channel::<RunStrategyInput>(100);
163
164        // Output
165        let (candle_send, mut candle_recv) =
166            tokio::sync::mpsc::channel::<backproof_proto::RunStrategyOutput>(100);
167
168        debug!("channel created");
169        let order_stream = stream! {
170            while let Some(data) = order_recv.recv().await {
171                debug!(?data, "received");
172                yield data;
173            }
174        };
175        debug!("order stream created");
176
177        let response = self.client.run_strategy(order_stream).await?;
178        debug!(?response, "first server response");
179
180        let candle_stream = response.into_inner();
181
182        tokio::spawn(async move {
183            pin_mut!(candle_stream);
184
185            while let Some(Ok(input)) = candle_stream.next().await {
186                candle_send.send(input).await.unwrap();
187            }
188        });
189
190        let start_session_request = RunStrategyInput {
191            strategy_id: session.0.id.clone(),
192            event: Some(backproof_proto::run_strategy_input::Event::StartSession(
193                backproof_proto::StartSession {
194                    api_key: self.api_key.clone(),
195                    session_name: session.0.name,
196                },
197            )),
198        };
199
200        order_send
201            .send(start_session_request)
202            .await
203            .map_err(|err| {
204                error!(?err, "while sending start_session_request");
205                BackproofSdkError::CriticalError(Box::new(err))
206            })?;
207
208        tokio::spawn(async move {
209            let span = span!(tracing::Level::INFO, "Stream");
210            let _enter = span.enter();
211
212            loop {
213                tokio::select! {
214                    Some(input) = candle_recv.recv() => {
215                        info!(?input, "received from server");
216
217                        if let Some(input_event) = input.event {
218                            match input_event {
219                                OutputEvent::Candle(candle) => {
220                                    let order = (self.strategy.logic)(candle);
221
222                                    debug!(?order, "sending to backend");
223                                    order_send
224                                        .send(RunStrategyInput { event: Some(InputEvent::Order(order)), strategy_id: session.0.id.clone() })
225                                        .await
226                                        .unwrap();
227                                },
228                                OutputEvent::EndSession(end_session) => {
229                                    info!(?end_session, "Server asked to end session properly. Closing.");
230                                    break;
231                                },
232                            };
233                        } else {
234                            error!("This should not happen, please report an issue to your backproof service provider.");
235                        };
236                    },
237                    _ = &mut recv => {
238                        info!("User killed the service.");
239                        break;
240                    }
241                }
242            }
243        });
244
245        Ok(send)
246    }
247}