/// Private module holding the code that `tonic::include_proto!` pulls in for
/// the `backproof` protobuf package (messages and the gRPC client used below).
mod backproof_proto {
tonic::include_proto!("backproof");
}
pub mod api {
pub use super::backproof_proto::Candle;
pub use super::backproof_proto::Order;
}
use async_stream::stream;
use backproof_proto::{
run_strategy_input::Event as InputEvent, run_strategy_output::Event as OutputEvent,
strategy_service_client::StrategyServiceClient, Candle, Order, RegisterStrategyRequest,
RunStrategyInput,
};
use futures_util::{pin_mut, StreamExt};
use tonic::{codegen::StdError, transport::Channel};
use tracing::{debug, error, info, instrument, span};
use thiserror::Error as ThisError;
/// Handle produced by `BackproofStrategy::register`.
///
/// Wraps the strategy message returned by the server; `BackproofStrategy::run`
/// reads its `id` and `name` to open the session.
#[derive(Debug)]
pub struct Session(backproof_proto::Strategy);
#[derive(ThisError, Debug)]
pub enum BackproofSdkError {
#[error("cannot connect to the backend")]
ConnectionFailed(#[from] tonic::transport::Error),
#[error("internal server error")]
InternalServerError(#[from] tonic::Status),
#[error("you need registered the strategy before running it")]
NotRegistered,
#[error("critical error")]
CriticalError(Box<dyn std::error::Error>),
#[error("unknown")]
Unknown,
}
/// Builder that collects what is needed to create a `BackproofStrategy`.
///
/// Both fields start as `None` (see `Default`) and are filled in through the
/// `api_key` and `api_strategy` methods before calling `build`.
// NOTE(review): the derived `Debug` prints `api_key` in clear text.
#[derive(Default, Debug)]
pub struct BackProofStrategyBuilder {
/// API key sent to the backend when registering and starting a session.
api_key: Option<String>,
/// Named strategy logic to run.
strategy: Option<Strategy>,
}
/// A named trading strategy.
pub struct Strategy {
/// Name sent to the backend as `strategy_name` when registering.
pub name: String,
/// Callback invoked with each `Candle` received from the server; the `Order`
/// it returns is sent back to the backend.
pub logic: Box<dyn Fn(Candle) -> Order + Send + 'static>,
}
impl std::fmt::Debug for Strategy {
    /// Prints the strategy with its `name` only; the `logic` closure has no
    /// `Debug` representation and is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut printer = f.debug_struct("Strategy");
        printer.field("name", &self.name);
        printer.finish()
    }
}
impl BackProofStrategyBuilder {
pub fn new() -> Self {
Default::default()
}
#[instrument(skip(self, api_key))]
pub fn api_key(mut self, api_key: String) -> Self {
self.api_key = Some(api_key);
self
}
#[instrument(skip(self, logic))]
pub fn api_strategy(
mut self,
name: String,
logic: impl Fn(Candle) -> Order + 'static + Send,
) -> Self {
let logic = Box::new(logic);
self.strategy = Some(Strategy { name, logic });
self
}
#[instrument(skip(self))]
pub async fn build<D>(self, endpoint: D) -> Result<BackproofStrategy, BackproofSdkError>
where
D: std::convert::TryInto<tonic::transport::Endpoint> + std::fmt::Debug,
D::Error: Into<StdError>,
{
let client = StrategyServiceClient::connect(endpoint).await?;
Ok(BackproofStrategy {
client,
api_key: self
.api_key
.expect("You need to set your Backproof Api Key."),
strategy: self
.strategy
.expect("You need to setup your backproof strategy."),
registered: false,
})
}
}
/// A strategy bound to a connected backend client.
///
/// Created through `BackProofStrategyBuilder::build`; it must be registered
/// with `register` before `run` accepts it.
pub struct BackproofStrategy {
/// gRPC client for the strategy service.
client: StrategyServiceClient<Channel>,
/// API key sent with the register and start-session requests.
api_key: String,
/// Name and logic of the strategy being run.
strategy: Strategy,
/// Set to `true` once `register` has succeeded; checked by `run`.
registered: bool,
}
/// Options accepted by `BackproofStrategy::run`.
///
/// `PartialEq`/`Eq` are derived alongside `Hash` so values can be compared and
/// used as keys in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Options {
    // NOTE(review): `tu` is not read anywhere in the visible code and has no
    // setter; its meaning should be documented by its owner.
    tu: usize,
}
impl BackproofStrategy {
#[instrument]
pub fn builder() -> BackProofStrategyBuilder {
BackProofStrategyBuilder::new()
}
#[instrument(skip(self))]
pub async fn register(&mut self) -> Result<Session, BackproofSdkError> {
let response = self
.client
.register_strategy(RegisterStrategyRequest {
api_key: self.api_key.clone(),
strategy_name: self.strategy.name.clone(),
})
.await?;
let content = response.into_inner();
info!(?content, "RegisterReply");
self.registered = true;
info!(?self.strategy.name, ?self.api_key, "Successfully registered.");
Ok(Session(content.output.unwrap()))
}
#[instrument(skip(self))]
pub async fn run(
mut self,
_opts: Options,
session: Session,
) -> Result<tokio::sync::oneshot::Sender<()>, BackproofSdkError> {
if !self.registered {
error!(
"You need to register your strategy before running it.\nCall `strategy.register()`."
);
return Err(BackproofSdkError::NotRegistered);
}
let (send, mut recv) = tokio::sync::oneshot::channel();
let (order_send, mut order_recv) = tokio::sync::mpsc::channel::<RunStrategyInput>(100);
let (candle_send, mut candle_recv) =
tokio::sync::mpsc::channel::<backproof_proto::RunStrategyOutput>(100);
debug!("channel created");
let order_stream = stream! {
while let Some(data) = order_recv.recv().await {
debug!(?data, "received");
yield data;
}
};
debug!("order stream created");
let response = self.client.run_strategy(order_stream).await?;
debug!(?response, "first server response");
let candle_stream = response.into_inner();
tokio::spawn(async move {
pin_mut!(candle_stream);
while let Some(Ok(input)) = candle_stream.next().await {
candle_send.send(input).await.unwrap();
}
});
let start_session_request = RunStrategyInput {
strategy_id: session.0.id.clone(),
event: Some(backproof_proto::run_strategy_input::Event::StartSession(
backproof_proto::StartSession {
api_key: self.api_key.clone(),
session_name: session.0.name,
},
)),
};
order_send
.send(start_session_request)
.await
.map_err(|err| {
error!(?err, "while sending start_session_request");
BackproofSdkError::CriticalError(Box::new(err))
})?;
tokio::spawn(async move {
let span = span!(tracing::Level::INFO, "Stream");
let _enter = span.enter();
loop {
tokio::select! {
Some(input) = candle_recv.recv() => {
info!(?input, "received from server");
if let Some(input_event) = input.event {
match input_event {
OutputEvent::Candle(candle) => {
let order = (self.strategy.logic)(candle);
debug!(?order, "sending to backend");
order_send
.send(RunStrategyInput { event: Some(InputEvent::Order(order)), strategy_id: session.0.id.clone() })
.await
.unwrap();
},
OutputEvent::EndSession(end_session) => {
info!(?end_session, "Server asked to end session properly. Closing.");
break;
},
};
} else {
error!("This should not happen, please report an issue to your backproof service provider.");
};
},
_ = &mut recv => {
info!("User killed the service.");
break;
}
}
}
});
Ok(send)
}
}