backproof-sdk 0.1.1

Rust SDK to communicate with the backproof service
Documentation
/// Generated gRPC/protobuf bindings for the `backproof` proto package.
///
/// The module is private; the message types meant for SDK users are
/// re-exported from the `api` module.
mod backproof_proto {
    // Includes the code that `tonic` generated for the `backproof` package.
    tonic::include_proto!("backproof");
}

pub mod api {
    pub use super::backproof_proto::Candle;
    pub use super::backproof_proto::Order;
}

use async_stream::stream;

use backproof_proto::{
    run_strategy_input::Event as InputEvent, run_strategy_output::Event as OutputEvent,
    strategy_service_client::StrategyServiceClient, Candle, Order, RegisterStrategyRequest,
    RunStrategyInput,
};
use futures_util::{pin_mut, StreamExt};
use tonic::{codegen::StdError, transport::Channel};
use tracing::{debug, error, info, instrument, span};

use thiserror::Error as ThisError;

/// Handle produced by `BackproofStrategy::register` and consumed by
/// `BackproofStrategy::run`.
///
/// Wraps the generated `Strategy` message returned by the backend; `run` reads
/// its `id` and `name` fields.
#[derive(Debug)]
pub struct Session(backproof_proto::Strategy);

/// Errors returned by the SDK.
#[derive(ThisError, Debug)]
pub enum BackproofSdkError {
    /// A `tonic::transport::Error` occurred (converted automatically via `?`),
    /// e.g. while connecting to the endpoint in the builder's `build`.
    #[error("cannot connect to the backend")]
    ConnectionFailed(#[from] tonic::transport::Error),
    /// A gRPC call returned a `tonic::Status` (converted automatically via `?`).
    #[error("internal server error")]
    InternalServerError(#[from] tonic::Status),
    /// `run` was called on a strategy whose `registered` flag is still `false`.
    #[error("you need registered the strategy before running it")]
    NotRegistered,
    /// A failure inside the SDK itself; the boxed value is the underlying error.
    ///
    /// NOTE(review): the boxed error is not `Send + Sync`, so values of this
    /// enum cannot be moved across threads.
    #[error("critical error")]
    CriticalError(Box<dyn std::error::Error>),
    /// Catch-all variant.
    #[error("unknown")]
    Unknown,
}

#[derive(Default, Debug)]
/// Builder for a `BackproofStrategy`.
///
/// Both settings start out unset (`None`, through the derived `Default`) and
/// are filled in by the builder methods before calling `build`.
pub struct BackProofStrategyBuilder {
    /// API key for the backend; set with `api_key`.
    api_key: Option<String>,
    /// Named strategy logic; set with `api_strategy`.
    strategy: Option<Strategy>,
}

/// A single strategy that has to be registered in the backend.
pub struct Strategy {
    /// Name sent to the backend when the strategy is registered.
    pub name: String,
    /// Decision function: takes a candle and returns the order to send back.
    /// It must be `Send` because it is moved into a spawned task when the
    /// strategy runs.
    pub logic: Box<dyn Fn(Candle) -> Order + Send + 'static>,
}

/// Manual `Debug`: only `name` is printed, because the boxed `logic` closure
/// has no `Debug` representation.
impl std::fmt::Debug for Strategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Strategy");
        out.field("name", &self.name);
        out.finish()
    }
}

/// Lets you build your strategy step by step.
impl BackProofStrategyBuilder {
    pub fn new() -> Self {
        Default::default()
    }

    #[instrument(skip(self, api_key))]
    pub fn api_key(mut self, api_key: String) -> Self {
        self.api_key = Some(api_key);
        self
    }

    #[instrument(skip(self, logic))]
    pub fn api_strategy(
        mut self,
        name: String,
        logic: impl Fn(Candle) -> Order + 'static + Send,
    ) -> Self {
        let logic = Box::new(logic);
        self.strategy = Some(Strategy { name, logic });
        self
    }

    #[instrument(skip(self))]
    pub async fn build<D>(self, endpoint: D) -> Result<BackproofStrategy, BackproofSdkError>
    where
        D: std::convert::TryInto<tonic::transport::Endpoint> + std::fmt::Debug,
        D::Error: Into<StdError>,
    {
        let client = StrategyServiceClient::connect(endpoint).await?;

        Ok(BackproofStrategy {
            client,
            api_key: self
                .api_key
                .expect("You need to set your Backproof Api Key."),
            strategy: self
                .strategy
                .expect("You need to setup your backproof strategy."),
            registered: false,
        })
    }
}

/// Represents the strategy that you can run.
///
/// Obtained from `BackProofStrategyBuilder::build`; it has to be registered
/// with `register` before `run` accepts it.
pub struct BackproofStrategy {
    /// gRPC client connected to the backend.
    client: StrategyServiceClient<Channel>,
    /// API key sent when registering and when starting a session.
    api_key: String,
    /// The user strategy (name and logic).
    strategy: Strategy,
    /// Set to `true` by `register`; `run` returns `NotRegistered` while it is `false`.
    registered: bool,
}

#[derive(Debug, Clone, Copy, Hash, Default)]
/// Launch options.
/// You can define some parameters when running your strategy.
///
/// NOTE(review): `tu` is private and no setter is visible here, so
/// `Options::default()` appears to be the only way to create a value; `run`
/// currently ignores the options it receives.
pub struct Options {
    /// Time unit of candles
    tu: usize,
}

impl BackproofStrategy {
    /// Get a builder to configure the strategy.
    #[instrument]
    pub fn builder() -> BackProofStrategyBuilder {
        BackProofStrategyBuilder::new()
    }

    /// register the strategy and setup the backend.
    #[instrument(skip(self))]
    pub async fn register(&mut self) -> Result<Session, BackproofSdkError> {
        let response = self
            .client
            .register_strategy(RegisterStrategyRequest {
                api_key: self.api_key.clone(),
                strategy_name: self.strategy.name.clone(),
            })
            .await?;
        let content = response.into_inner();
        info!(?content, "RegisterReply");
        self.registered = true;
        info!(?self.strategy.name, ?self.api_key, "Successfully registered.");
        Ok(Session(content.output.unwrap()))
    }

    /// Starts the strategy with given options.
    #[instrument(skip(self))]
    pub async fn run(
        mut self,
        _opts: Options,
        session: Session,
    ) -> Result<tokio::sync::oneshot::Sender<()>, BackproofSdkError> {
        if !self.registered {
            error!(
                "You need to register your strategy before running it.\nCall `strategy.register()`."
            );
            return Err(BackproofSdkError::NotRegistered);
        }
        // Create close channel
        let (send, mut recv) = tokio::sync::oneshot::channel();

        // Input
        let (order_send, mut order_recv) = tokio::sync::mpsc::channel::<RunStrategyInput>(100);

        // Output
        let (candle_send, mut candle_recv) =
            tokio::sync::mpsc::channel::<backproof_proto::RunStrategyOutput>(100);

        debug!("channel created");
        let order_stream = stream! {
            while let Some(data) = order_recv.recv().await {
                debug!(?data, "received");
                yield data;
            }
        };
        debug!("order stream created");

        let response = self.client.run_strategy(order_stream).await?;
        debug!(?response, "first server response");

        let candle_stream = response.into_inner();

        tokio::spawn(async move {
            pin_mut!(candle_stream);

            while let Some(Ok(input)) = candle_stream.next().await {
                candle_send.send(input).await.unwrap();
            }
        });

        let start_session_request = RunStrategyInput {
            strategy_id: session.0.id.clone(),
            event: Some(backproof_proto::run_strategy_input::Event::StartSession(
                backproof_proto::StartSession {
                    api_key: self.api_key.clone(),
                    session_name: session.0.name,
                },
            )),
        };

        order_send
            .send(start_session_request)
            .await
            .map_err(|err| {
                error!(?err, "while sending start_session_request");
                BackproofSdkError::CriticalError(Box::new(err))
            })?;

        tokio::spawn(async move {
            let span = span!(tracing::Level::INFO, "Stream");
            let _enter = span.enter();

            loop {
                tokio::select! {
                    Some(input) = candle_recv.recv() => {
                        info!(?input, "received from server");

                        if let Some(input_event) = input.event {
                            match input_event {
                                OutputEvent::Candle(candle) => {
                                    let order = (self.strategy.logic)(candle);

                                    debug!(?order, "sending to backend");
                                    order_send
                                        .send(RunStrategyInput { event: Some(InputEvent::Order(order)), strategy_id: session.0.id.clone() })
                                        .await
                                        .unwrap();
                                },
                                OutputEvent::EndSession(end_session) => {
                                    info!(?end_session, "Server asked to end session properly. Closing.");
                                    break;
                                },
                            };
                        } else {
                            error!("This should not happen, please report an issue to your backproof service provider.");
                        };
                    },
                    _ = &mut recv => {
                        info!("User killed the service.");
                        break;
                    }
                }
            }
        });

        Ok(send)
    }
}