nash_protocol/protocol/
state.rs1use std::collections::{HashMap, HashSet};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use async_recursion::async_recursion;
9use tracing::{info, trace};
10
11use super::signer::Signer;
12use crate::errors::{ProtocolError, Result};
13use crate::protocol::dh_fill_pool::DhFillPoolRequest;
14use crate::types::{Asset, Blockchain, Market};
15
16#[derive(Debug)]
22pub struct State {
23 pub signer: Option<Signer>,
28 pub asset_nonces: Option<HashMap<String, Vec<u32>>>,
31 pub markets: Option<HashMap<String, Market>>,
33 pub assets: Option<Vec<Asset>>,
35 pub remaining_orders: AtomicU64,
38 pub affiliate_code: Option<String>,
40 pub assets_nonces_refresh: bool,
41 pub dont_sign_states: bool, pub place_order_semaphore: Arc<tokio::sync::Semaphore>,
44 pub sign_all_states_semaphore: Arc<tokio::sync::Semaphore>,
45 pub k1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
46 pub r1_fill_pool_semaphore: Arc<tokio::sync::Semaphore>,
47}
48
49impl State {
50 pub fn new(signer: Option<Signer>) -> Self {
51 Self {
52 signer,
53 asset_nonces: None,
54 markets: None,
55 assets: None,
56 remaining_orders: AtomicU64::new(0),
57 affiliate_code: None,
58 assets_nonces_refresh: false,
59 dont_sign_states: false,
60 place_order_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
62 sign_all_states_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
63 k1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
64 r1_fill_pool_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
65 }
66 }
67
68 pub fn from_keys_path(keys_path: Option<&str>) -> Result<Self> {
69 let signer = match keys_path {
70 Some(path) => Some(Signer::new(path)?),
71 None => None,
72 };
73 Ok(Self::new(signer))
74 }
75
76 pub fn from_keys(secret: &str, session: &str) -> Result<Self> {
77 let signer = Some(Signer::from_data(secret, session)?);
78 Ok(Self::new(signer))
79 }
80
81 pub fn signer(&self) -> Result<&Signer> {
82 self.signer
83 .as_ref()
84 .ok_or(ProtocolError("Signer not initiated"))
85 }
86
87 pub fn get_market(&self, market_name: &str) -> Result<Market> {
88 let market_map = self
89 .markets
90 .as_ref()
91 .ok_or(ProtocolError("Market map does not exist"))?;
92 market_map
93 .get(market_name)
94 .ok_or(ProtocolError("Market name does not exist"))
95 .map(|m| m.clone())
96 }
97
98 pub fn get_remaining_orders(&self) -> u64 {
99 return self.remaining_orders.load(Ordering::Relaxed);
100 }
101
102 pub fn set_remaining_orders(&self, n: u64) {
103 return self.remaining_orders.store(n, Ordering::Relaxed);
104 }
105
106 pub fn decr_remaining_orders(&self) {
107 self.remaining_orders.fetch_sub(1, Ordering::Relaxed);
108 }
109
110 #[async_recursion]
112 pub async fn acquire_fill_pool_schedules(
113 &self,
114 chains: Option<&'async_recursion Vec<Blockchain>>,
115 r_val_fill_pool_threshold: Option<u32>,
116 ) -> Result<Vec<(DhFillPoolRequest, tokio::sync::OwnedSemaphorePermit)>> {
117 let mut schedules = Vec::new();
118 let mut schedules_pool_types = HashSet::new();
119 let threshold = r_val_fill_pool_threshold.unwrap_or(R_VAL_FILL_POOL_THRESHOLD);
120 for chain in chains.unwrap_or(Blockchain::all().as_ref()) {
121 let remaining = self.signer()?.get_remaining_r_vals(chain);
122 info!("{:?}: {}", chain, remaining);
123 if remaining < threshold {
124 let (semaphore, pool_type) = match chain {
125 Blockchain::Bitcoin => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
126 Blockchain::Ethereum => (&self.k1_fill_pool_semaphore, RValPoolTypes::K1),
127 Blockchain::NEO => (&self.r1_fill_pool_semaphore, RValPoolTypes::R1),
128 };
129 if !schedules_pool_types.contains(&pool_type) {
131 if let Ok(permit) = semaphore.clone().try_acquire_owned() {
132 let fill_size = MAX_R_VAL_POOL_SIZE - remaining;
133 schedules.push((DhFillPoolRequest::new(chain.clone(), fill_size)?, permit));
134 schedules_pool_types.insert(pool_type);
135 trace!(?chain, %remaining ,%fill_size, "created fill pool request");
136 } else {
137 let _ = semaphore
142 .acquire()
143 .await
144 .expect("Who closed the semaphore?");
145 return self
146 .acquire_fill_pool_schedules(chains, r_val_fill_pool_threshold)
147 .await;
148 }
149 }
150 }
151 }
152 Ok(schedules)
153 }
154}
155
156#[derive(Hash, PartialEq, Eq)]
157enum RValPoolTypes {
158 R1,
159 K1,
160}
161
162pub const MAX_R_VAL_POOL_SIZE: u32 = 100;
163pub const R_VAL_FILL_POOL_THRESHOLD: u32 = 60;