tycho_simulation/rfq/protocols/hashflow/
decoder.rs1use std::collections::{HashMap, HashSet};
2
3use tycho_client::feed::synchronizer::ComponentWithState;
4use tycho_common::{models::token::Token, Bytes};
5
6use super::{
7 client_builder::HashflowClientBuilder,
8 models::{HashflowMarketMakerLevels, HashflowPair, HashflowPriceLevel},
9 state::HashflowState,
10};
11use crate::{
12 protocol::{
13 errors::InvalidSnapshotError,
14 models::{DecoderContext, TryFromWithBlock},
15 },
16 rfq::{constants::get_hashflow_auth, models::TimestampHeader},
17};
18
19impl TryFromWithBlock<ComponentWithState, TimestampHeader> for HashflowState {
20 type Error = InvalidSnapshotError;
21
22 async fn try_from_with_header(
23 snapshot: ComponentWithState,
24 _timestamp_header: TimestampHeader,
25 _account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
26 all_tokens: &HashMap<Bytes, Token>,
27 _decoder_context: &DecoderContext,
28 ) -> Result<Self, Self::Error> {
29 let state_attrs = snapshot.state.attributes;
30
31 if snapshot.component.tokens.len() != 2 {
32 return Err(InvalidSnapshotError::ValueError(
33 "Component must have 2 tokens (base and quote)".to_string(),
34 ));
35 }
36
37 let base_token_address = &snapshot.component.tokens[0];
38 let quote_token_address = &snapshot.component.tokens[1];
39
40 let base_token = all_tokens
41 .get(base_token_address)
42 .ok_or_else(|| {
43 InvalidSnapshotError::ValueError(format!(
44 "Base token not found: {base_token_address}"
45 ))
46 })?
47 .clone();
48
49 let quote_token = all_tokens
50 .get(quote_token_address)
51 .ok_or_else(|| {
52 InvalidSnapshotError::ValueError(format!(
53 "Quote token not found: {quote_token_address}"
54 ))
55 })?
56 .clone();
57
58 let empty_levels_array: Bytes = "[]".as_bytes().to_vec().into();
61 let levels_data = state_attrs
62 .get("levels")
63 .unwrap_or(&empty_levels_array);
64
65 let price_levels: Vec<HashflowPriceLevel> = serde_json::from_slice(levels_data)
66 .map_err(|e| InvalidSnapshotError::ValueError(format!("Invalid levels JSON: {e}")))?;
67
68 let market_maker = state_attrs
69 .get("mm")
70 .ok_or_else(|| {
71 InvalidSnapshotError::MissingAttribute("mm attribute not found".to_string())
72 })
73 .and_then(|mm_bytes| {
74 String::from_utf8(mm_bytes.to_vec()).map_err(|_| {
75 InvalidSnapshotError::ValueError("Invalid mm encoding".to_string())
76 })
77 })?;
78
79 let levels = HashflowMarketMakerLevels {
81 pair: HashflowPair {
82 base_token: base_token_address.clone(),
83 quote_token: quote_token_address.clone(),
84 },
85 levels: price_levels,
86 };
87
88 let auth = get_hashflow_auth().map_err(|e| {
90 InvalidSnapshotError::ValueError(format!("Failed to get Hashflow authentication: {e}"))
91 })?;
92
93 let client =
94 HashflowClientBuilder::new(snapshot.component.chain.into(), auth.user, auth.key)
95 .tokens(HashSet::from([base_token_address.clone(), quote_token_address.clone()]))
96 .build()
97 .map_err(|e| {
98 InvalidSnapshotError::MissingAttribute(format!(
99 "Couldn't create HashflowClient: {e}"
100 ))
101 })?;
102
103 Ok(HashflowState::new(base_token, quote_token, levels, market_maker, client))
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 use std::env;
110
111 use tycho_common::{
112 dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState},
113 models::Chain as ModelChain,
114 };
115
116 use super::*;
117
118 fn wbtc() -> Token {
119 Token::new(
120 &hex::decode("2260fac5e5542a773aa44fbcfedf7c193bc2c599")
121 .unwrap()
122 .into(),
123 "WBTC",
124 8,
125 0,
126 &[Some(10_000)],
127 ModelChain::Ethereum,
128 100,
129 )
130 }
131
132 fn usdc() -> Token {
133 Token::new(
134 &hex::decode("a0b86991c6218a76c1d19d4a2e9eb0ce3606eb48")
135 .unwrap()
136 .into(),
137 "USDC",
138 6,
139 0,
140 &[Some(10_000)],
141 ModelChain::Ethereum,
142 100,
143 )
144 }
145
146 fn create_test_levels() -> serde_json::Value {
147 serde_json::json!([
148 {
149 "q": "1.5",
150 "p": "65000.0"
151 },
152 {
153 "q": "2.0",
154 "p": "64950.0"
155 },
156 {
157 "q": "0.5",
158 "p": "65100.0"
159 }
160 ])
161 }
162
163 fn create_test_snapshot() -> (ComponentWithState, HashMap<Bytes, Token>) {
164 let wbtc_token = wbtc();
165 let usdc_token = usdc();
166 let levels = create_test_levels();
167
168 let mut tokens = HashMap::new();
169 tokens.insert(wbtc_token.address.clone(), wbtc_token.clone());
170 tokens.insert(usdc_token.address.clone(), usdc_token.clone());
171
172 let mut state_attributes = HashMap::new();
173
174 let levels_json = serde_json::to_vec(&levels).expect("Failed to serialize levels");
176 state_attributes.insert("levels".to_string(), levels_json.into());
177
178 state_attributes.insert(
180 "mm".to_string(),
181 "test_market_maker"
182 .as_bytes()
183 .to_vec()
184 .into(),
185 );
186
187 let snapshot = ComponentWithState {
188 state: ResponseProtocolState {
189 attributes: state_attributes,
190 component_id: "hashflow_wbtc_usdc".to_string(),
191 balances: HashMap::new(),
192 },
193 component: ProtocolComponent {
194 id: "hashflow_wbtc_usdc".to_string(),
195 protocol_system: "hashflow".to_string(),
196 protocol_type_name: "hashflow".to_string(),
197 chain: Chain::Ethereum,
198 tokens: vec![wbtc_token.address.clone(), usdc_token.address.clone()],
199 contract_ids: Vec::new(),
200 static_attributes: HashMap::new(),
201 change: ChangeType::Creation,
202 creation_tx: Bytes::default(),
203 created_at: chrono::NaiveDateTime::default(),
204 },
205 component_tvl: None,
206 entrypoints: Vec::new(),
207 };
208
209 (snapshot, tokens)
210 }
211
212 #[tokio::test]
213 async fn test_try_from_with_header() {
214 env::set_var("HASHFLOW_USER", "test_user");
215 env::set_var("HASHFLOW_KEY", "test_key");
216
217 let (snapshot, tokens) = create_test_snapshot();
218
219 let result = HashflowState::try_from_with_header(
220 snapshot,
221 TimestampHeader { timestamp: 1703097600u64 },
222 &HashMap::new(),
223 &tokens,
224 &DecoderContext::new(),
225 )
226 .await
227 .expect("create state from snapshot");
228
229 assert_eq!(result.base_token.symbol, "WBTC");
230 assert_eq!(result.quote_token.symbol, "USDC");
231 assert_eq!(result.market_maker, "test_market_maker");
232 assert_eq!(result.levels.levels.len(), 3);
233 assert_eq!(result.levels.levels[0].quantity, 1.5);
234 assert_eq!(result.levels.levels[0].price, 65000.0);
235 assert_eq!(result.levels.levels[1].quantity, 2.0);
236 assert_eq!(result.levels.levels[1].price, 64950.0);
237 assert_eq!(result.levels.levels[2].quantity, 0.5);
238 assert_eq!(result.levels.levels[2].price, 65100.0);
239 }
240
241 #[tokio::test]
242 async fn test_try_from_missing_levels() {
243 env::set_var("HASHFLOW_USER", "test_user");
244 env::set_var("HASHFLOW_KEY", "test_key");
245
246 let (mut snapshot, tokens) = create_test_snapshot();
247 snapshot
248 .state
249 .attributes
250 .remove("levels");
251
252 let result = HashflowState::try_from_with_header(
253 snapshot,
254 TimestampHeader::default(),
255 &HashMap::new(),
256 &tokens,
257 &DecoderContext::new(),
258 )
259 .await
260 .expect("create state with missing levels should default to empty levels");
261
262 assert_eq!(result.base_token.symbol, "WBTC");
264 assert_eq!(result.quote_token.symbol, "USDC");
265 assert_eq!(result.levels.levels.len(), 0);
266 }
267
268 #[tokio::test]
269 async fn test_try_from_missing_token() {
270 env::set_var("HASHFLOW_USER", "test_user");
271 env::set_var("HASHFLOW_KEY", "test_key");
272
273 let (mut snapshot, tokens) = create_test_snapshot();
275 snapshot.component.tokens.pop(); let result = HashflowState::try_from_with_header(
278 snapshot,
279 TimestampHeader::default(),
280 &HashMap::new(),
281 &tokens,
282 &DecoderContext::new(),
283 )
284 .await;
285
286 assert!(result.is_err());
287 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
288 }
289
290 #[tokio::test]
291 async fn test_try_from_too_many_tokens() {
292 env::set_var("HASHFLOW_USER", "test_user");
293 env::set_var("HASHFLOW_KEY", "test_key");
294
295 let (mut snapshot, mut tokens) = create_test_snapshot();
297
298 let dai_token = Token::new(
299 &hex::decode("6b175474e89094c44da98b954eedeac495271d0f")
300 .unwrap()
301 .into(),
302 "DAI",
303 18,
304 0,
305 &[Some(10_000)],
306 ModelChain::Ethereum,
307 100,
308 );
309
310 tokens.insert(dai_token.address.clone(), dai_token.clone());
311 snapshot
312 .component
313 .tokens
314 .push(dai_token.address);
315
316 let result = HashflowState::try_from_with_header(
317 snapshot,
318 TimestampHeader::default(),
319 &HashMap::new(),
320 &tokens,
321 &DecoderContext::new(),
322 )
323 .await;
324
325 assert!(result.is_err());
326 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
327 }
328
329 #[tokio::test]
330 async fn test_try_from_invalid_levels_json() {
331 env::set_var("HASHFLOW_USER", "test_user");
332 env::set_var("HASHFLOW_KEY", "test_key");
333
334 let (mut snapshot, tokens) = create_test_snapshot();
335
336 snapshot.state.attributes.insert(
338 "levels".to_string(),
339 "invalid json"
340 .as_bytes()
341 .to_vec()
342 .into(),
343 );
344
345 let result = HashflowState::try_from_with_header(
346 snapshot,
347 TimestampHeader::default(),
348 &HashMap::new(),
349 &tokens,
350 &DecoderContext::new(),
351 )
352 .await;
353
354 assert!(result.is_err());
355 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
356 }
357
358 #[tokio::test]
359 async fn test_try_from_missing_mm() {
360 env::set_var("HASHFLOW_USER", "test_user");
361 env::set_var("HASHFLOW_KEY", "test_key");
362
363 let (mut snapshot, tokens) = create_test_snapshot();
364 snapshot.state.attributes.remove("mm");
365
366 let result = HashflowState::try_from_with_header(
367 snapshot,
368 TimestampHeader::default(),
369 &HashMap::new(),
370 &tokens,
371 &DecoderContext::new(),
372 )
373 .await;
374
375 assert!(result.is_err());
376 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::MissingAttribute(_)));
377 }
378}