nash_protocol/protocol/place_orders/
types.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::{Mutex, RwLock};
5use tracing::error;
6
7use crate::errors::{Result, ProtocolError};
8use crate::protocol::ErrorResponse;
9use crate::protocol::{
10    asset_nonces::AssetNoncesRequest, list_markets::ListMarketsRequest, serializable_to_json,
11    sign_all_states::SignAllStates, NashProtocol, NashProtocolRequest,
12    ProtocolHook, ResponseOrError, State,
13};
14use crate::utils::current_time_as_i64;
15use crate::protocol::place_order::{LimitOrderRequest, PlaceOrderResponse, MarketOrderRequest};
16
17/// Request to place limit orders on Nash exchange. On an A/B market
18/// price amount will always be in terms of A and price in terms of B.
19pub type LimitOrdersRequest = MultiRequest<LimitOrderRequest>;
20pub type PlaceOrdersResponse = MultiResponse<PlaceOrderResponse>;
21
22/// Request to place market orders on Nash exchange. On an A/B market
23/// price amount will always be in terms of A and price in terms of B.
24pub type MarketOrdersRequest = MultiRequest<MarketOrderRequest>;
25
26use crate::protocol::place_order::types::{LimitOrderConstructor, MarketOrderConstructor};
27use crate::protocol::multi_request::{MultiRequest, MultiRequestConstructor, MultiResponse};
28use std::convert::TryInto;
29
30pub type LimitOrdersConstructor = MultiRequestConstructor<LimitOrderConstructor>;
31pub type MarketOrdersConstructor = MultiRequestConstructor<MarketOrderConstructor>;
32
33async fn get_required_hooks(state: Arc<RwLock<State>>, market: &str) -> Result<Vec<ProtocolHook>> {
34    let state = state.read().await;
35
36    let mut hooks = Vec::new();
37    // If we need assets or markets list, pull them
38    match (&state.assets, &state.markets) {
39        (None, _) | (_, None) => {
40            hooks.push(ProtocolHook::Protocol(NashProtocolRequest::ListMarkets(
41                ListMarketsRequest,
42            )));
43        }
44        _ => {}
45    }
46    // If have run out of r values, get more before running this pipeline
47    let chains = state.get_market(market)?.blockchains();
48    let fill_pool_schedules = state
49        .acquire_fill_pool_schedules(Some(&chains), Some(10))
50        .await?;
51    for (request, permit) in fill_pool_schedules {
52        // A bit too complicated but we have to satisfy the compiler
53        let permit = Some(Arc::new(Mutex::new(Some(permit))));
54        hooks.push(ProtocolHook::Protocol(NashProtocolRequest::DhFill(
55            request, permit,
56        )));
57    }
58    // Retrieve asset nonces if we don't have them or an error triggered need to refresh
59    match (state.asset_nonces.as_ref(), state.assets_nonces_refresh) {
60        (None, _) | (_, true) => {
61            hooks.push(ProtocolHook::Protocol(NashProtocolRequest::AssetNonces(
62                AssetNoncesRequest::new(),
63            )));
64        }
65        _ => {}
66    }
67    // If we are about to run out of orders...
68    if !state.dont_sign_states && state.get_remaining_orders() < 10 {
69        // Need to sign states
70        hooks.push(ProtocolHook::SignAllState(SignAllStates::new()));
71        // After signing states, need to update nonces again
72        hooks.push(ProtocolHook::Protocol(NashProtocolRequest::AssetNonces(
73            AssetNoncesRequest::new(),
74        )));
75    }
76    Ok(hooks)
77}
78
79#[async_trait]
80impl NashProtocol for LimitOrdersRequest {
81    type Response = PlaceOrdersResponse;
82
83    async fn acquire_permit(
84        &self,
85        state: Arc<RwLock<State>>,
86    ) -> Option<tokio::sync::OwnedSemaphorePermit> {
87        state
88            .read()
89            .await
90            .place_order_semaphore
91            .clone()
92            .acquire_owned()
93            .await
94            .ok()
95    }
96
97    async fn graphql(&self, state: Arc<RwLock<State>>) -> Result<serde_json::Value> {
98        if let Some(request) = self.requests.first() {
99            let builder = self.make_constructor(state.clone()).await?;
100            let time = current_time_as_i64();
101            let affiliate = state.read().await.affiliate_code.clone();
102            let market = state.read().await.get_market(&request.market)?;
103            let order_precision = 8;
104            let fee_precision = market.min_trade_size_b.asset.precision;
105            let query = builder.signed_graphql_request(time, affiliate, state, order_precision, fee_precision).await?;
106            serializable_to_json(&query)
107        } else {
108            Err(ProtocolError("Empty request."))
109        }
110    }
111
112    async fn response_from_json(
113        &self,
114        response: serde_json::Value,
115        _state: Arc<RwLock<State>>,
116    ) -> Result<ResponseOrError<Self::Response>> {
117        Ok(ResponseOrError::from_data(response.try_into()?))
118    }
119
120    /// Update the number of orders remaining before state sync
121    async fn process_response(
122        &self,
123        response: &Self::Response,
124        state: Arc<RwLock<State>>,
125    ) -> Result<()> {
126        if let Some(Ok(response)) = response.responses.iter().rfind(|response| response.is_ok()) {
127            let state = state.read().await;
128            state.set_remaining_orders(response.remaining_orders);
129        }
130        Ok(())
131    }
132
133    async fn process_error(
134        &self,
135        response: &ErrorResponse,
136        state: Arc<RwLock<State>>,
137    ) -> Result<()> {
138        // TODO: Do we need to decrement for errors?
139        state.read().await.decr_remaining_orders();
140        for err in &response.errors {
141            if err.message.find("invalid blockchain signature").is_some() {
142                error!(err = %err.message, request = ?self, "invalid blockchain signature");
143            }
144        }
145        Ok(())
146    }
147
148    /// Potentially get more r values or sign states before placing an order
149    async fn run_before(&self, state: Arc<RwLock<State>>) -> Result<Option<Vec<ProtocolHook>>> {
150        let request = &self.requests[0];
151        get_required_hooks(state, &request.market).await.map(Some)
152    }
153}
154
155#[async_trait]
156impl NashProtocol for MarketOrdersRequest {
157    type Response = PlaceOrdersResponse;
158
159    async fn acquire_permit(
160        &self,
161        state: Arc<RwLock<State>>,
162    ) -> Option<tokio::sync::OwnedSemaphorePermit> {
163        state
164            .read()
165            .await
166            .place_order_semaphore
167            .clone()
168            .acquire_owned()
169            .await
170            .ok()
171    }
172
173    async fn graphql(&self, state: Arc<RwLock<State>>) -> Result<serde_json::Value> {
174        if let Some(request) = self.requests.first() {
175            let builder = self.make_constructor(state.clone()).await?;
176            let time = current_time_as_i64();
177            let (affiliate, market) = {
178                let state_read = state.read().await;
179                let affiliate = state_read.affiliate_code.clone();
180                let market = state_read.get_market(&request.market)?;
181                (affiliate, market)
182            };
183            let order_precision = 8;
184            let fee_precision = market.min_trade_size_b.asset.precision;
185            let query = builder.signed_graphql_request(time, affiliate, state, order_precision, fee_precision).await?;
186            serializable_to_json(&query)
187        } else {
188            Err(ProtocolError("Empty request."))
189        }
190    }
191
192    async fn response_from_json(
193        &self,
194        response: serde_json::Value,
195        _state: Arc<RwLock<State>>,
196    ) -> Result<ResponseOrError<Self::Response>> {
197        Ok(ResponseOrError::from_data(response.try_into()?))
198    }
199
200    /// Update the number of orders remaining before state sync
201    async fn process_response(
202        &self,
203        response: &Self::Response,
204        state: Arc<RwLock<State>>,
205    ) -> Result<()> {
206        if let Some(Ok(response)) = response.responses.iter().rfind(|response| response.is_ok()) {
207            let state = state.read().await;
208            // TODO: Incorporate error into process response
209            state.set_remaining_orders(response.remaining_orders);
210        }
211        Ok(())
212    }
213
214    async fn process_error(
215        &self,
216        _response: &    ErrorResponse,
217        state: Arc<RwLock<State>>,
218    ) -> Result<()> {
219        // TODO: Do we need to decrement for errors?
220        state.read().await.decr_remaining_orders();
221        Ok(())
222    }
223
224    /// Potentially get more r values or sign states before placing an order
225    async fn run_before(&self, state: Arc<RwLock<State>>) -> Result<Option<Vec<ProtocolHook>>> {
226        let request = &self.requests[0];
227        get_required_hooks(state, &request.market).await.map(Some)
228    }
229}