nash_protocol/protocol/
state.rs

//! The `State` struct captures all mutable state within the protocol, such as asset nonces,
//! r-values, blockchain keys, and so on.
3
4use std::collections::{HashMap, HashSet};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use async_recursion::async_recursion;
9use tracing::{info, trace};
10
11use super::signer::Signer;
12use crate::errors::{ProtocolError, Result};
13use crate::protocol::dh_fill_pool::DhFillPoolRequest;
14use crate::types::{Asset, Blockchain, Market};
15
16//****************************************//
17//  Protocol state representation         //
18//****************************************//
19
/// Client state shared across the protocol.
///
/// Every piece of mutable protocol state is declared explicitly in this struct. To see
/// how a particular protocol request modifies the state, look at that request's impl of
/// `process_response`.
#[derive(Debug)]
pub struct State {
    // Inside here we will have an explicit definition of all mutable
    // protocol state. To see how any particular protocol request modifies
    // state, can look at the impl of `process_response`.
    /// Wrapper around the keys used by the client for signing. `None` when the state
    /// was created without keys, in which case `State::signer` returns an error.
    pub signer: Option<Signer>,
    /// Incrementing asset nonces are used to invalidate old state in the channel;
    /// here we keep track of the latest nonce(s) for each asset.
    /// NOTE(review): the map key is presumably the asset name/symbol — confirm.
    pub asset_nonces: Option<HashMap<String, Vec<u32>>>,
    /// Markets pulled from Nash, looked up by market name (see `State::get_market`).
    pub markets: Option<HashMap<String, Market>>,
    /// Assets supported for trading in Nash.
    pub assets: Option<Vec<Asset>>,
    /// Remaining orders before state signing is required. Starts at 0.
    // FIXME: move r-pool from global indexmap here
    pub remaining_orders: AtomicU64,
    /// Optional affiliate code; the affiliate will receive a share of fees generated.
    pub affiliate_code: Option<String>,
    /// NOTE(review): only initialized (to `false`) in this file; presumably signals that
    /// `asset_nonces` must be refreshed — confirm against the code that reads it.
    pub assets_nonces_refresh: bool,
    /// Flag only for market maker users.
    pub dont_sign_states: bool,

    /// Single-permit semaphore; presumably serializes order placement — it is only
    /// created in this file, confirm against its users.
    pub place_order_semaphore: Arc<tokio::sync::Semaphore>,
    /// Single-permit semaphore; presumably serializes state signing — it is only
    /// created in this file, confirm against its users.
    pub sign_all_states_semaphore: Arc<tokio::sync::Semaphore>,
    /// Single-permit semaphore guarding fill requests for the K1 r-value pool
    /// (Bitcoin and Ethereum in `acquire_fill_pool_schedules`).
    pub k1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
    /// Single-permit semaphore guarding fill requests for the R1 r-value pool
    /// (NEO in `acquire_fill_pool_schedules`).
    pub r1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
}
48
49impl State {
50    pub fn new(signer: Option<Signer>) -> Self {
51        Self {
52            signer,
53            asset_nonces: None,
54            markets: None,
55            assets: None,
56            remaining_orders: AtomicU64::new(0),
57            affiliate_code: None,
58            assets_nonces_refresh: false,
59            dont_sign_states: false,
60            // Set these here for now
61            place_order_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
62            sign_all_states_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
63            k1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
64            r1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
65        }
66    }
67
68    pub fn from_keys_path(keys_path: Option<&str>) -> Result<Self> {
69        let signer = match keys_path {
70            Some(path) => Some(Signer::new(path)?),
71            None => None,
72        };
73        Ok(Self::new(signer))
74    }
75
76    pub fn from_keys(secret: &str, session: &str) -> Result<Self> {
77        let signer = Some(Signer::from_data(secret, session)?);
78        Ok(Self::new(signer))
79    }
80
81    pub fn signer(&self) -> Result<&Signer> {
82        self.signer
83            .as_ref()
84            .ok_or(ProtocolError("Signer not initiated"))
85    }
86
87    pub fn get_market(&self, market_name: &str) -> Result<Market> {
88        let market_map = self
89            .markets
90            .as_ref()
91            .ok_or(ProtocolError("Market map does not exist"))?;
92        market_map
93            .get(market_name)
94            .ok_or(ProtocolError("Market name does not exist"))
95            .map(|m| m.clone())
96    }
97
98    pub fn get_remaining_orders(&self) -> u64 {
99        return self.remaining_orders.load(Ordering::Relaxed);
100    }
101
102    pub fn set_remaining_orders(&self, n: u64) {
103        return self.remaining_orders.store(n, Ordering::Relaxed);
104    }
105
106    pub fn decr_remaining_orders(&self) {
107        self.remaining_orders.fetch_sub(1, Ordering::Relaxed);
108    }
109
110    /// Check if pools need a refill
111    #[async_recursion]
112    pub async fn acquire_fill_pool_schedules(
113        &self,
114        chains: Option<&'async_recursion Vec<Blockchain>>,
115        r_val_fill_pool_threshold: Option<u32>,
116    ) -> Result<Vec<(DhFillPoolRequest, tokio::sync::OwnedSemaphorePermit)>> {
117        let mut schedules = Vec::new();
118        let mut schedules_pool_types = HashSet::new();
119        let threshold = r_val_fill_pool_threshold.unwrap_or(R_VAL_FILL_POOL_THRESHOLD);
120        for chain in chains.unwrap_or(Blockchain::all().as_ref()) {
121            let remaining = self.signer()?.get_remaining_r_vals(chain);
122            info!("{:?}: {}", chain, remaining);
123            if remaining < threshold {
124                let (semaphore, pool_type) = match chain {
125                    Blockchain::Bitcoin => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
126                    Blockchain::Ethereum => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
127                    Blockchain::NEO => (&self.r1_fill_pool_semaphore, RValPoolTypes::R1),
128                };
129                // Don't schedule for the same pool twice
130                if !schedules_pool_types.contains(&pool_type) {
131                    if let Ok(permit) = semaphore.clone().try_acquire_owned() {
132                        let fill_size = MAX_R_VAL_POOL_SIZE - remaining;
133                        schedules.push((DhFillPoolRequest::new(chain.clone(), fill_size)?, permit));
134                        schedules_pool_types.insert(pool_type);
135                        trace!(?chain, %remaining ,%fill_size, "created fill pool request");
136                    } else {
137                        // A bit hacky but we usually only run into this when we try to place
138                        // orders immediately after starting the fill-loop. If the fill-loop
139                        // is running the fill-request and we are in the place-order protocol
140                        // we have to wait here for the keys to arrive.
141                        let _ = semaphore
142                            .acquire()
143                            .await
144                            .expect("Who closed the semaphore?");
145                        return self
146                            .acquire_fill_pool_schedules(chains, r_val_fill_pool_threshold)
147                            .await;
148                    }
149                }
150            }
151        }
152        Ok(schedules)
153    }
154}
155
/// Identifies the r-value pool a blockchain draws from. Used by
/// `State::acquire_fill_pool_schedules` to schedule at most one fill request per pool.
#[derive(Hash, PartialEq, Eq)]
enum RValPoolTypes {
    /// Pool used for NEO.
    R1,
    /// Pool shared by Bitcoin and Ethereum.
    K1,
}
161
/// Pool size that fill requests top up to: the requested fill size is this value minus
/// the number of r-values currently remaining.
pub const MAX_R_VAL_POOL_SIZE: u32 = 100;
/// Default refill threshold: a pool is refilled once its remaining r-values drop below
/// this value, unless the caller supplies its own threshold.
pub const R_VAL_FILL_POOL_THRESHOLD: u32 = 60;