Skip to main content

tycho_simulation/rfq/protocols/hashflow/
decoder.rs

1use std::collections::{HashMap, HashSet};
2
3use tycho_client::feed::synchronizer::ComponentWithState;
4use tycho_common::{models::token::Token, Bytes};
5
6use super::{
7    client_builder::HashflowClientBuilder,
8    models::{HashflowMarketMakerLevels, HashflowPair, HashflowPriceLevel},
9    state::HashflowState,
10};
11use crate::{
12    protocol::{
13        errors::InvalidSnapshotError,
14        models::{DecoderContext, TryFromWithBlock},
15    },
16    rfq::{constants::get_hashflow_auth, models::TimestampHeader},
17};
18
19impl TryFromWithBlock<ComponentWithState, TimestampHeader> for HashflowState {
20    type Error = InvalidSnapshotError;
21
22    async fn try_from_with_header(
23        snapshot: ComponentWithState,
24        _timestamp_header: TimestampHeader,
25        _account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
26        all_tokens: &HashMap<Bytes, Token>,
27        _decoder_context: &DecoderContext,
28    ) -> Result<Self, Self::Error> {
29        let state_attrs = snapshot.state.attributes;
30
31        if snapshot.component.tokens.len() != 2 {
32            return Err(InvalidSnapshotError::ValueError(
33                "Component must have 2 tokens (base and quote)".to_string(),
34            ));
35        }
36
37        let base_token_address = &snapshot.component.tokens[0];
38        let quote_token_address = &snapshot.component.tokens[1];
39
40        let base_token = all_tokens
41            .get(base_token_address)
42            .ok_or_else(|| {
43                InvalidSnapshotError::ValueError(format!(
44                    "Base token not found: {base_token_address}"
45                ))
46            })?
47            .clone();
48
49        let quote_token = all_tokens
50            .get(quote_token_address)
51            .ok_or_else(|| {
52                InvalidSnapshotError::ValueError(format!(
53                    "Quote token not found: {quote_token_address}"
54                ))
55            })?
56            .clone();
57
58        // Parse the price levels array from the component attributes.
59        // A missing levels key indicates no levels for this market maker.
60        let empty_levels_array: Bytes = "[]".as_bytes().to_vec().into();
61        let levels_data = state_attrs
62            .get("levels")
63            .unwrap_or(&empty_levels_array);
64
65        let price_levels: Vec<HashflowPriceLevel> = serde_json::from_slice(levels_data)
66            .map_err(|e| InvalidSnapshotError::ValueError(format!("Invalid levels JSON: {e}")))?;
67
68        let market_maker = state_attrs
69            .get("mm")
70            .ok_or_else(|| {
71                InvalidSnapshotError::MissingAttribute("mm attribute not found".to_string())
72            })
73            .and_then(|mm_bytes| {
74                String::from_utf8(mm_bytes.to_vec()).map_err(|_| {
75                    InvalidSnapshotError::ValueError("Invalid mm encoding".to_string())
76                })
77            })?;
78
79        // Create the HashflowMarketMakerLevels using token pair from component
80        let levels = HashflowMarketMakerLevels {
81            pair: HashflowPair {
82                base_token: base_token_address.clone(),
83                quote_token: quote_token_address.clone(),
84            },
85            levels: price_levels,
86        };
87
88        // Create HashFlow client with authentication from environment variables
89        let auth = get_hashflow_auth().map_err(|e| {
90            InvalidSnapshotError::ValueError(format!("Failed to get Hashflow authentication: {e}"))
91        })?;
92
93        let client = HashflowClientBuilder::new(snapshot.component.chain, auth.user, auth.key)
94            .tokens(HashSet::from([base_token_address.clone(), quote_token_address.clone()]))
95            .build()
96            .map_err(|e| {
97                InvalidSnapshotError::MissingAttribute(format!(
98                    "Couldn't create HashflowClient: {e}"
99                ))
100            })?;
101
102        Ok(HashflowState::new(base_token, quote_token, levels, market_maker, client))
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::{env, sync::Once};

    use tycho_common::models::{
        protocol::{ProtocolComponent, ProtocolComponentState},
        Chain, ChangeType,
    };

    use super::*;

    /// Guards the one-time installation of the dummy Hashflow credentials.
    static CREDENTIALS: Once = Once::new();

    /// Sets the `HASHFLOW_USER` / `HASHFLOW_KEY` environment variables exactly once.
    ///
    /// The test harness runs tests on parallel threads, and modifying the process environment
    /// while another thread reads it is not safe on every platform. `Once::call_once` makes
    /// every caller wait until the single write has finished, so the decoder only ever reads
    /// the environment after it has stopped changing.
    fn set_test_credentials() {
        CREDENTIALS.call_once(|| {
            env::set_var("HASHFLOW_USER", "test_user");
            env::set_var("HASHFLOW_KEY", "test_key");
        });
    }

    /// WBTC token fixture (8 decimals).
    fn wbtc() -> Token {
        Token::new(
            &hex::decode("2260fac5e5542a773aa44fbcfedf7c193bc2c599")
                .unwrap()
                .into(),
            "WBTC",
            8,
            0,
            &[Some(10_000)],
            Chain::Ethereum,
            100,
        )
    }

    /// USDC token fixture (6 decimals).
    fn usdc() -> Token {
        Token::new(
            &hex::decode("a0b86991c6218a76c1d19d4a2e9eb0ce3606eb48")
                .unwrap()
                .into(),
            "USDC",
            6,
            0,
            &[Some(10_000)],
            Chain::Ethereum,
            100,
        )
    }

    /// DAI token fixture (18 decimals), used as the surplus third token.
    fn dai() -> Token {
        Token::new(
            &hex::decode("6b175474e89094c44da98b954eedeac495271d0f")
                .unwrap()
                .into(),
            "DAI",
            18,
            0,
            &[Some(10_000)],
            Chain::Ethereum,
            100,
        )
    }

    /// Three price levels in the JSON shape stored under the `levels` attribute.
    fn create_test_levels() -> serde_json::Value {
        serde_json::json!([
            { "q": "1.5", "p": "65000.0" },
            { "q": "2.0", "p": "64950.0" },
            { "q": "0.5", "p": "65100.0" }
        ])
    }

    /// Builds a valid WBTC/USDC snapshot together with the token map it refers to.
    fn create_test_snapshot() -> (ComponentWithState, HashMap<Bytes, Token>) {
        let base = wbtc();
        let quote = usdc();
        let component_tokens = vec![base.address.clone(), quote.address.clone()];

        let tokens = HashMap::from([
            (base.address.clone(), base.clone()),
            (quote.address.clone(), quote.clone()),
        ]);

        let mut state_attributes = HashMap::new();

        // The levels are stored as serialized JSON bytes.
        let levels_json =
            serde_json::to_vec(&create_test_levels()).expect("Failed to serialize levels");
        state_attributes.insert("levels".to_string(), levels_json.into());

        // The market maker name is stored as raw UTF-8 bytes.
        state_attributes.insert(
            "mm".to_string(),
            "test_market_maker"
                .as_bytes()
                .to_vec()
                .into(),
        );

        let snapshot = ComponentWithState {
            state: ProtocolComponentState {
                attributes: state_attributes,
                component_id: "hashflow_wbtc_usdc".to_string(),
                balances: HashMap::new(),
            },
            component: ProtocolComponent {
                id: "hashflow_wbtc_usdc".to_string(),
                protocol_system: "hashflow".to_string(),
                protocol_type_name: "hashflow".to_string(),
                chain: Chain::Ethereum,
                tokens: component_tokens,
                contract_addresses: Vec::new(),
                static_attributes: HashMap::new(),
                change: ChangeType::Creation,
                creation_tx: Bytes::default(),
                created_at: chrono::NaiveDateTime::default(),
            },
            component_tvl: None,
            entrypoints: Vec::new(),
        };

        (snapshot, tokens)
    }

    /// Runs the decoder on `snapshot` with the dummy credentials in place, empty account
    /// balances and a fresh decoder context.
    async fn decode_snapshot(
        snapshot: ComponentWithState,
        tokens: &HashMap<Bytes, Token>,
        header: TimestampHeader,
    ) -> Result<HashflowState, InvalidSnapshotError> {
        set_test_credentials();

        HashflowState::try_from_with_header(
            snapshot,
            header,
            &HashMap::new(),
            tokens,
            &DecoderContext::new(),
        )
        .await
    }

    #[tokio::test]
    async fn test_try_from_with_header() {
        let (snapshot, tokens) = create_test_snapshot();

        let state =
            decode_snapshot(snapshot, &tokens, TimestampHeader { timestamp: 1703097600u64 })
                .await
                .expect("create state from snapshot");

        assert_eq!(state.base_token.symbol, "WBTC");
        assert_eq!(state.quote_token.symbol, "USDC");
        assert_eq!(state.market_maker, "test_market_maker");

        // (quantity, price) pairs in the order they appear in the fixture.
        let expected = [(1.5, 65000.0), (2.0, 64950.0), (0.5, 65100.0)];
        assert_eq!(state.levels.levels.len(), expected.len());
        for (level, (quantity, price)) in state.levels.levels.iter().zip(expected) {
            assert_eq!(level.quantity, quantity);
            assert_eq!(level.price, price);
        }
    }

    #[tokio::test]
    async fn test_try_from_missing_levels() {
        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot
            .state
            .attributes
            .remove("levels");

        let state = decode_snapshot(snapshot, &tokens, TimestampHeader::default())
            .await
            .expect("create state with missing levels should default to empty levels");

        // A missing `levels` attribute decodes to a state without any levels.
        assert_eq!(state.base_token.symbol, "WBTC");
        assert_eq!(state.quote_token.symbol, "USDC");
        assert_eq!(state.levels.levels.len(), 0);
    }

    #[tokio::test]
    async fn test_try_from_missing_token() {
        // Only one token is left in the component after removing the second one.
        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot.component.tokens.pop();

        let result = decode_snapshot(snapshot, &tokens, TimestampHeader::default()).await;

        assert!(matches!(result, Err(InvalidSnapshotError::ValueError(_))));
    }

    #[tokio::test]
    async fn test_try_from_too_many_tokens() {
        // Three tokens instead of two.
        let (mut snapshot, mut tokens) = create_test_snapshot();

        let dai_token = dai();
        tokens.insert(dai_token.address.clone(), dai_token.clone());
        snapshot
            .component
            .tokens
            .push(dai_token.address);

        let result = decode_snapshot(snapshot, &tokens, TimestampHeader::default()).await;

        assert!(matches!(result, Err(InvalidSnapshotError::ValueError(_))));
    }

    #[tokio::test]
    async fn test_try_from_invalid_levels_json() {
        let (mut snapshot, tokens) = create_test_snapshot();

        // Overwrite the levels with bytes that are not JSON.
        snapshot.state.attributes.insert(
            "levels".to_string(),
            "invalid json"
                .as_bytes()
                .to_vec()
                .into(),
        );

        let result = decode_snapshot(snapshot, &tokens, TimestampHeader::default()).await;

        assert!(matches!(result, Err(InvalidSnapshotError::ValueError(_))));
    }

    #[tokio::test]
    async fn test_try_from_missing_mm() {
        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot.state.attributes.remove("mm");

        let result = decode_snapshot(snapshot, &tokens, TimestampHeader::default()).await;

        assert!(matches!(result, Err(InvalidSnapshotError::MissingAttribute(_))));
    }
}