tycho_simulation/rfq/protocols/hashflow/
decoder.rs1use std::collections::{HashMap, HashSet};
2
3use tycho_client::feed::synchronizer::ComponentWithState;
4use tycho_common::{models::token::Token, Bytes};
5
6use super::{
7 client_builder::HashflowClientBuilder,
8 models::{HashflowMarketMakerLevels, HashflowPair, HashflowPriceLevel},
9 state::HashflowState,
10};
11use crate::{
12 protocol::{
13 errors::InvalidSnapshotError,
14 models::{DecoderContext, TryFromWithBlock},
15 },
16 rfq::{constants::get_hashflow_auth, models::TimestampHeader},
17};
18
19impl TryFromWithBlock<ComponentWithState, TimestampHeader> for HashflowState {
20 type Error = InvalidSnapshotError;
21
22 async fn try_from_with_header(
23 snapshot: ComponentWithState,
24 _timestamp_header: TimestampHeader,
25 _account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
26 all_tokens: &HashMap<Bytes, Token>,
27 _decoder_context: &DecoderContext,
28 ) -> Result<Self, Self::Error> {
29 let state_attrs = snapshot.state.attributes;
30
31 if snapshot.component.tokens.len() != 2 {
32 return Err(InvalidSnapshotError::ValueError(
33 "Component must have 2 tokens (base and quote)".to_string(),
34 ));
35 }
36
37 let base_token_address = &snapshot.component.tokens[0];
38 let quote_token_address = &snapshot.component.tokens[1];
39
40 let base_token = all_tokens
41 .get(base_token_address)
42 .ok_or_else(|| {
43 InvalidSnapshotError::ValueError(format!(
44 "Base token not found: {base_token_address}"
45 ))
46 })?
47 .clone();
48
49 let quote_token = all_tokens
50 .get(quote_token_address)
51 .ok_or_else(|| {
52 InvalidSnapshotError::ValueError(format!(
53 "Quote token not found: {quote_token_address}"
54 ))
55 })?
56 .clone();
57
58 let empty_levels_array: Bytes = "[]".as_bytes().to_vec().into();
61 let levels_data = state_attrs
62 .get("levels")
63 .unwrap_or(&empty_levels_array);
64
65 let price_levels: Vec<HashflowPriceLevel> = serde_json::from_slice(levels_data)
66 .map_err(|e| InvalidSnapshotError::ValueError(format!("Invalid levels JSON: {e}")))?;
67
68 let market_maker = state_attrs
69 .get("mm")
70 .ok_or_else(|| {
71 InvalidSnapshotError::MissingAttribute("mm attribute not found".to_string())
72 })
73 .and_then(|mm_bytes| {
74 String::from_utf8(mm_bytes.to_vec()).map_err(|_| {
75 InvalidSnapshotError::ValueError("Invalid mm encoding".to_string())
76 })
77 })?;
78
79 let levels = HashflowMarketMakerLevels {
81 pair: HashflowPair {
82 base_token: base_token_address.clone(),
83 quote_token: quote_token_address.clone(),
84 },
85 levels: price_levels,
86 };
87
88 let auth = get_hashflow_auth().map_err(|e| {
90 InvalidSnapshotError::ValueError(format!("Failed to get Hashflow authentication: {e}"))
91 })?;
92
93 let client = HashflowClientBuilder::new(snapshot.component.chain, auth.user, auth.key)
94 .tokens(HashSet::from([base_token_address.clone(), quote_token_address.clone()]))
95 .build()
96 .map_err(|e| {
97 InvalidSnapshotError::MissingAttribute(format!(
98 "Couldn't create HashflowClient: {e}"
99 ))
100 })?;
101
102 Ok(HashflowState::new(base_token, quote_token, levels, market_maker, client))
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use std::env;
109
110 use tycho_common::models::{
111 protocol::{ProtocolComponent, ProtocolComponentState},
112 Chain, ChangeType,
113 };
114
115 use super::*;
116
117 fn wbtc() -> Token {
118 Token::new(
119 &hex::decode("2260fac5e5542a773aa44fbcfedf7c193bc2c599")
120 .unwrap()
121 .into(),
122 "WBTC",
123 8,
124 0,
125 &[Some(10_000)],
126 Chain::Ethereum,
127 100,
128 )
129 }
130
131 fn usdc() -> Token {
132 Token::new(
133 &hex::decode("a0b86991c6218a76c1d19d4a2e9eb0ce3606eb48")
134 .unwrap()
135 .into(),
136 "USDC",
137 6,
138 0,
139 &[Some(10_000)],
140 Chain::Ethereum,
141 100,
142 )
143 }
144
145 fn create_test_levels() -> serde_json::Value {
146 serde_json::json!([
147 {
148 "q": "1.5",
149 "p": "65000.0"
150 },
151 {
152 "q": "2.0",
153 "p": "64950.0"
154 },
155 {
156 "q": "0.5",
157 "p": "65100.0"
158 }
159 ])
160 }
161
162 fn create_test_snapshot() -> (ComponentWithState, HashMap<Bytes, Token>) {
163 let wbtc_token = wbtc();
164 let usdc_token = usdc();
165 let levels = create_test_levels();
166
167 let mut tokens = HashMap::new();
168 tokens.insert(wbtc_token.address.clone(), wbtc_token.clone());
169 tokens.insert(usdc_token.address.clone(), usdc_token.clone());
170
171 let mut state_attributes = HashMap::new();
172
173 let levels_json = serde_json::to_vec(&levels).expect("Failed to serialize levels");
175 state_attributes.insert("levels".to_string(), levels_json.into());
176
177 state_attributes.insert(
179 "mm".to_string(),
180 "test_market_maker"
181 .as_bytes()
182 .to_vec()
183 .into(),
184 );
185
186 let snapshot = ComponentWithState {
187 state: ProtocolComponentState {
188 attributes: state_attributes,
189 component_id: "hashflow_wbtc_usdc".to_string(),
190 balances: HashMap::new(),
191 },
192 component: ProtocolComponent {
193 id: "hashflow_wbtc_usdc".to_string(),
194 protocol_system: "hashflow".to_string(),
195 protocol_type_name: "hashflow".to_string(),
196 chain: Chain::Ethereum,
197 tokens: vec![wbtc_token.address.clone(), usdc_token.address.clone()],
198 contract_addresses: Vec::new(),
199 static_attributes: HashMap::new(),
200 change: ChangeType::Creation,
201 creation_tx: Bytes::default(),
202 created_at: chrono::NaiveDateTime::default(),
203 },
204 component_tvl: None,
205 entrypoints: Vec::new(),
206 };
207
208 (snapshot, tokens)
209 }
210
211 #[tokio::test]
212 async fn test_try_from_with_header() {
213 env::set_var("HASHFLOW_USER", "test_user");
214 env::set_var("HASHFLOW_KEY", "test_key");
215
216 let (snapshot, tokens) = create_test_snapshot();
217
218 let result = HashflowState::try_from_with_header(
219 snapshot,
220 TimestampHeader { timestamp: 1703097600u64 },
221 &HashMap::new(),
222 &tokens,
223 &DecoderContext::new(),
224 )
225 .await
226 .expect("create state from snapshot");
227
228 assert_eq!(result.base_token.symbol, "WBTC");
229 assert_eq!(result.quote_token.symbol, "USDC");
230 assert_eq!(result.market_maker, "test_market_maker");
231 assert_eq!(result.levels.levels.len(), 3);
232 assert_eq!(result.levels.levels[0].quantity, 1.5);
233 assert_eq!(result.levels.levels[0].price, 65000.0);
234 assert_eq!(result.levels.levels[1].quantity, 2.0);
235 assert_eq!(result.levels.levels[1].price, 64950.0);
236 assert_eq!(result.levels.levels[2].quantity, 0.5);
237 assert_eq!(result.levels.levels[2].price, 65100.0);
238 }
239
240 #[tokio::test]
241 async fn test_try_from_missing_levels() {
242 env::set_var("HASHFLOW_USER", "test_user");
243 env::set_var("HASHFLOW_KEY", "test_key");
244
245 let (mut snapshot, tokens) = create_test_snapshot();
246 snapshot
247 .state
248 .attributes
249 .remove("levels");
250
251 let result = HashflowState::try_from_with_header(
252 snapshot,
253 TimestampHeader::default(),
254 &HashMap::new(),
255 &tokens,
256 &DecoderContext::new(),
257 )
258 .await
259 .expect("create state with missing levels should default to empty levels");
260
261 assert_eq!(result.base_token.symbol, "WBTC");
263 assert_eq!(result.quote_token.symbol, "USDC");
264 assert_eq!(result.levels.levels.len(), 0);
265 }
266
267 #[tokio::test]
268 async fn test_try_from_missing_token() {
269 env::set_var("HASHFLOW_USER", "test_user");
270 env::set_var("HASHFLOW_KEY", "test_key");
271
272 let (mut snapshot, tokens) = create_test_snapshot();
274 snapshot.component.tokens.pop(); let result = HashflowState::try_from_with_header(
277 snapshot,
278 TimestampHeader::default(),
279 &HashMap::new(),
280 &tokens,
281 &DecoderContext::new(),
282 )
283 .await;
284
285 assert!(result.is_err());
286 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
287 }
288
289 #[tokio::test]
290 async fn test_try_from_too_many_tokens() {
291 env::set_var("HASHFLOW_USER", "test_user");
292 env::set_var("HASHFLOW_KEY", "test_key");
293
294 let (mut snapshot, mut tokens) = create_test_snapshot();
296
297 let dai_token = Token::new(
298 &hex::decode("6b175474e89094c44da98b954eedeac495271d0f")
299 .unwrap()
300 .into(),
301 "DAI",
302 18,
303 0,
304 &[Some(10_000)],
305 Chain::Ethereum,
306 100,
307 );
308
309 tokens.insert(dai_token.address.clone(), dai_token.clone());
310 snapshot
311 .component
312 .tokens
313 .push(dai_token.address);
314
315 let result = HashflowState::try_from_with_header(
316 snapshot,
317 TimestampHeader::default(),
318 &HashMap::new(),
319 &tokens,
320 &DecoderContext::new(),
321 )
322 .await;
323
324 assert!(result.is_err());
325 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
326 }
327
328 #[tokio::test]
329 async fn test_try_from_invalid_levels_json() {
330 env::set_var("HASHFLOW_USER", "test_user");
331 env::set_var("HASHFLOW_KEY", "test_key");
332
333 let (mut snapshot, tokens) = create_test_snapshot();
334
335 snapshot.state.attributes.insert(
337 "levels".to_string(),
338 "invalid json"
339 .as_bytes()
340 .to_vec()
341 .into(),
342 );
343
344 let result = HashflowState::try_from_with_header(
345 snapshot,
346 TimestampHeader::default(),
347 &HashMap::new(),
348 &tokens,
349 &DecoderContext::new(),
350 )
351 .await;
352
353 assert!(result.is_err());
354 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::ValueError(_)));
355 }
356
357 #[tokio::test]
358 async fn test_try_from_missing_mm() {
359 env::set_var("HASHFLOW_USER", "test_user");
360 env::set_var("HASHFLOW_KEY", "test_key");
361
362 let (mut snapshot, tokens) = create_test_snapshot();
363 snapshot.state.attributes.remove("mm");
364
365 let result = HashflowState::try_from_with_header(
366 snapshot,
367 TimestampHeader::default(),
368 &HashMap::new(),
369 &tokens,
370 &DecoderContext::new(),
371 )
372 .await;
373
374 assert!(result.is_err());
375 assert!(matches!(result.unwrap_err(), InvalidSnapshotError::MissingAttribute(_)));
376 }
377}