tycho_simulation/rfq/protocols/hashflow/
decoder.rs

1use std::collections::{HashMap, HashSet};
2
3use tycho_client::feed::synchronizer::ComponentWithState;
4use tycho_common::{models::token::Token, Bytes};
5
6use super::{
7    client_builder::HashflowClientBuilder,
8    models::{HashflowMarketMakerLevels, HashflowPair, HashflowPriceLevel},
9    state::HashflowState,
10};
11use crate::{
12    protocol::{
13        errors::InvalidSnapshotError,
14        models::{DecoderContext, TryFromWithBlock},
15    },
16    rfq::{constants::get_hashflow_auth, models::TimestampHeader},
17};
18
19impl TryFromWithBlock<ComponentWithState, TimestampHeader> for HashflowState {
20    type Error = InvalidSnapshotError;
21
22    async fn try_from_with_header(
23        snapshot: ComponentWithState,
24        _timestamp_header: TimestampHeader,
25        _account_balances: &HashMap<Bytes, HashMap<Bytes, Bytes>>,
26        all_tokens: &HashMap<Bytes, Token>,
27        _decoder_context: &DecoderContext,
28    ) -> Result<Self, Self::Error> {
29        let state_attrs = snapshot.state.attributes;
30
31        if snapshot.component.tokens.len() != 2 {
32            return Err(InvalidSnapshotError::ValueError(
33                "Component must have 2 tokens (base and quote)".to_string(),
34            ));
35        }
36
37        let base_token_address = &snapshot.component.tokens[0];
38        let quote_token_address = &snapshot.component.tokens[1];
39
40        let base_token = all_tokens
41            .get(base_token_address)
42            .ok_or_else(|| {
43                InvalidSnapshotError::ValueError(format!(
44                    "Base token not found: {base_token_address}"
45                ))
46            })?
47            .clone();
48
49        let quote_token = all_tokens
50            .get(quote_token_address)
51            .ok_or_else(|| {
52                InvalidSnapshotError::ValueError(format!(
53                    "Quote token not found: {quote_token_address}"
54                ))
55            })?
56            .clone();
57
58        // Parse the price levels array from the component attributes.
59        // A missing levels key indicates no levels for this market maker.
60        let empty_levels_array: Bytes = "[]".as_bytes().to_vec().into();
61        let levels_data = state_attrs
62            .get("levels")
63            .unwrap_or(&empty_levels_array);
64
65        let price_levels: Vec<HashflowPriceLevel> = serde_json::from_slice(levels_data)
66            .map_err(|e| InvalidSnapshotError::ValueError(format!("Invalid levels JSON: {e}")))?;
67
68        let market_maker = state_attrs
69            .get("mm")
70            .ok_or_else(|| {
71                InvalidSnapshotError::MissingAttribute("mm attribute not found".to_string())
72            })
73            .and_then(|mm_bytes| {
74                String::from_utf8(mm_bytes.to_vec()).map_err(|_| {
75                    InvalidSnapshotError::ValueError("Invalid mm encoding".to_string())
76                })
77            })?;
78
79        // Create the HashflowMarketMakerLevels using token pair from component
80        let levels = HashflowMarketMakerLevels {
81            pair: HashflowPair {
82                base_token: base_token_address.clone(),
83                quote_token: quote_token_address.clone(),
84            },
85            levels: price_levels,
86        };
87
88        // Create HashFlow client with authentication from environment variables
89        let auth = get_hashflow_auth().map_err(|e| {
90            InvalidSnapshotError::ValueError(format!("Failed to get Hashflow authentication: {e}"))
91        })?;
92
93        let client =
94            HashflowClientBuilder::new(snapshot.component.chain.into(), auth.user, auth.key)
95                .tokens(HashSet::from([base_token_address.clone(), quote_token_address.clone()]))
96                .build()
97                .map_err(|e| {
98                    InvalidSnapshotError::MissingAttribute(format!(
99                        "Couldn't create HashflowClient: {e}"
100                    ))
101                })?;
102
103        Ok(HashflowState::new(base_token, quote_token, levels, market_maker, client))
104    }
105}
106
#[cfg(test)]
mod tests {
    // Unit tests for decoding Hashflow component snapshots into `HashflowState`.
    //
    // Shared setup (credentials, token fixtures, the decoder call) lives in helpers so each
    // test only spells out what is specific to the scenario it covers.

    use std::env;

    use tycho_common::{
        dto::{Chain, ChangeType, ProtocolComponent, ResponseProtocolState},
        models::Chain as ModelChain,
    };

    use super::*;

    /// WBTC token fixture (8 decimals).
    fn wbtc() -> Token {
        Token::new(
            &hex::decode("2260fac5e5542a773aa44fbcfedf7c193bc2c599")
                .unwrap()
                .into(),
            "WBTC",
            8,
            0,
            &[Some(10_000)],
            ModelChain::Ethereum,
            100,
        )
    }

    /// USDC token fixture (6 decimals).
    fn usdc() -> Token {
        Token::new(
            &hex::decode("a0b86991c6218a76c1d19d4a2e9eb0ce3606eb48")
                .unwrap()
                .into(),
            "USDC",
            6,
            0,
            &[Some(10_000)],
            ModelChain::Ethereum,
            100,
        )
    }

    /// DAI token fixture (18 decimals), used as the surplus third token.
    fn dai() -> Token {
        Token::new(
            &hex::decode("6b175474e89094c44da98b954eedeac495271d0f")
                .unwrap()
                .into(),
            "DAI",
            18,
            0,
            &[Some(10_000)],
            ModelChain::Ethereum,
            100,
        )
    }

    /// Three price levels in the JSON shape stored under the `levels` attribute.
    fn create_test_levels() -> serde_json::Value {
        serde_json::json!([
            {
                "q": "1.5",
                "p": "65000.0"
            },
            {
                "q": "2.0",
                "p": "64950.0"
            },
            {
                "q": "0.5",
                "p": "65100.0"
            }
        ])
    }

    /// Sets placeholder Hashflow credentials in the process environment.
    ///
    /// Presumably these are the variables `get_hashflow_auth` reads; every test writes the
    /// same values.
    fn set_test_credentials() {
        env::set_var("HASHFLOW_USER", "test_user");
        env::set_var("HASHFLOW_KEY", "test_key");
    }

    /// Returns a valid WBTC/USDC snapshot together with the token map needed to decode it.
    fn create_test_snapshot() -> (ComponentWithState, HashMap<Bytes, Token>) {
        let base = wbtc();
        let quote = usdc();

        let levels_json =
            serde_json::to_vec(&create_test_levels()).expect("Failed to serialize levels");

        let mut state_attributes = HashMap::new();
        state_attributes.insert("levels".to_string(), levels_json.into());
        state_attributes.insert(
            "mm".to_string(),
            "test_market_maker"
                .as_bytes()
                .to_vec()
                .into(),
        );

        let snapshot = ComponentWithState {
            state: ResponseProtocolState {
                attributes: state_attributes,
                component_id: "hashflow_wbtc_usdc".to_string(),
                balances: HashMap::new(),
            },
            component: ProtocolComponent {
                id: "hashflow_wbtc_usdc".to_string(),
                protocol_system: "hashflow".to_string(),
                protocol_type_name: "hashflow".to_string(),
                chain: Chain::Ethereum,
                tokens: vec![base.address.clone(), quote.address.clone()],
                contract_ids: Vec::new(),
                static_attributes: HashMap::new(),
                change: ChangeType::Creation,
                creation_tx: Bytes::default(),
                created_at: chrono::NaiveDateTime::default(),
            },
            component_tvl: None,
            entrypoints: Vec::new(),
        };

        let mut tokens = HashMap::new();
        tokens.insert(base.address.clone(), base);
        tokens.insert(quote.address.clone(), quote);

        (snapshot, tokens)
    }

    /// Runs the decoder with empty account balances and a fresh decoder context.
    async fn decode(
        snapshot: ComponentWithState,
        header: TimestampHeader,
        tokens: &HashMap<Bytes, Token>,
    ) -> Result<HashflowState, InvalidSnapshotError> {
        let account_balances = HashMap::new();
        let decoder_context = DecoderContext::new();
        HashflowState::try_from_with_header(
            snapshot,
            header,
            &account_balances,
            tokens,
            &decoder_context,
        )
        .await
    }

    #[tokio::test]
    async fn test_try_from_with_header() {
        set_test_credentials();

        let (snapshot, tokens) = create_test_snapshot();

        let result = decode(snapshot, TimestampHeader { timestamp: 1703097600u64 }, &tokens)
            .await
            .expect("create state from snapshot");

        assert_eq!(result.base_token.symbol, "WBTC");
        assert_eq!(result.quote_token.symbol, "USDC");
        assert_eq!(result.market_maker, "test_market_maker");

        // (quantity, price) pairs in the order they appear in `create_test_levels`.
        let expected = [(1.5, 65000.0), (2.0, 64950.0), (0.5, 65100.0)];
        assert_eq!(result.levels.levels.len(), expected.len());
        for (level, (quantity, price)) in result.levels.levels.iter().zip(expected) {
            assert_eq!(level.quantity, quantity);
            assert_eq!(level.price, price);
        }
    }

    #[tokio::test]
    async fn test_try_from_missing_levels() {
        set_test_credentials();

        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot
            .state
            .attributes
            .remove("levels");

        let result = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect("create state with missing levels should default to empty levels");

        // Should succeed with empty levels
        assert_eq!(result.base_token.symbol, "WBTC");
        assert_eq!(result.quote_token.symbol, "USDC");
        assert_eq!(result.levels.levels.len(), 0);
    }

    #[tokio::test]
    async fn test_try_from_missing_token() {
        set_test_credentials();

        // Only one token left in the component's token list.
        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot.component.tokens.pop();

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("a single-token component must be rejected");

        assert!(matches!(err, InvalidSnapshotError::ValueError(_)), "unexpected error: {err:?}");
    }

    #[tokio::test]
    async fn test_try_from_too_many_tokens() {
        set_test_credentials();

        // Three tokens instead of two.
        let (mut snapshot, mut tokens) = create_test_snapshot();
        let dai_token = dai();
        tokens.insert(dai_token.address.clone(), dai_token.clone());
        snapshot
            .component
            .tokens
            .push(dai_token.address);

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("a three-token component must be rejected");

        assert!(matches!(err, InvalidSnapshotError::ValueError(_)), "unexpected error: {err:?}");
    }

    #[tokio::test]
    async fn test_try_from_unknown_token() {
        set_test_credentials();

        // The component references USDC, but the token map does not know it.
        let (snapshot, mut tokens) = create_test_snapshot();
        tokens.remove(&usdc().address);

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("a token missing from the token map must be rejected");

        assert!(matches!(err, InvalidSnapshotError::ValueError(_)), "unexpected error: {err:?}");
    }

    #[tokio::test]
    async fn test_try_from_invalid_levels_json() {
        set_test_credentials();

        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot.state.attributes.insert(
            "levels".to_string(),
            "invalid json"
                .as_bytes()
                .to_vec()
                .into(),
        );

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("malformed levels JSON must be rejected");

        assert!(matches!(err, InvalidSnapshotError::ValueError(_)), "unexpected error: {err:?}");
    }

    #[tokio::test]
    async fn test_try_from_missing_mm() {
        set_test_credentials();

        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot.state.attributes.remove("mm");

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("a snapshot without mm must be rejected");

        assert!(
            matches!(err, InvalidSnapshotError::MissingAttribute(_)),
            "unexpected error: {err:?}"
        );
    }

    #[tokio::test]
    async fn test_try_from_invalid_mm_encoding() {
        set_test_credentials();

        // 0xff can never start a UTF-8 sequence, so the name cannot be decoded.
        let (mut snapshot, tokens) = create_test_snapshot();
        snapshot
            .state
            .attributes
            .insert("mm".to_string(), vec![0xff_u8, 0xfe].into());

        let err = decode(snapshot, TimestampHeader::default(), &tokens)
            .await
            .expect_err("a non-UTF-8 mm attribute must be rejected");

        assert!(matches!(err, InvalidSnapshotError::ValueError(_)), "unexpected error: {err:?}");
    }
}