Skip to main content

chainstream_sdk/stream/
client.rs

1//! Centrifuge WebSocket client implementation for ChainStream Stream API
2//!
3//! This module provides real-time data streaming capabilities using the Centrifuge protocol.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use futures_util::{SinkExt, StreamExt};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use tokio::sync::mpsc;
14use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
15
16use crate::error::ChainStreamError;
17use crate::openapi::types::Resolution;
18
19use super::fields::replace_filter_fields;
20use super::models::*;
21
/// Callback type for stream subscriptions.
///
/// A boxed closure that receives one value of type `T` per invocation. The
/// `Send + Sync + 'static` bounds let the closure be stored and invoked from
/// another thread or task.
// NOTE(review): no use of this alias is visible in this file — confirm it is
// still needed by callers elsewhere.
pub type StreamCallback<T> = Box<dyn Fn(T) + Send + Sync + 'static>;
24
/// Unsubscribe handle returned by [`StreamApi::subscribe`].
///
/// Identifies one registered callback on one channel. Calling
/// [`Unsubscribe::unsubscribe`] removes that callback. No `Drop` impl is
/// visible in this file, so simply dropping the handle leaves the callback
/// registered.
pub struct Unsubscribe {
    /// Channel the callback was registered on.
    channel: String,
    /// Id of the callback, as returned by `StreamApiInner::add_listener`.
    callback_id: u64,
    /// Shared client state used to perform the removal.
    api: Arc<StreamApiInner>,
}
31
32impl Unsubscribe {
33    /// Unsubscribe from the channel
34    pub fn unsubscribe(self) {
35        self.api.unsubscribe(&self.channel, self.callback_id);
36    }
37}
38
/// Centrifuge protocol command sent from this client to the server.
///
/// Serialized to JSON with `serde_json`. Every construction site in this file
/// sets exactly one of `connect`, `subscribe` or `unsubscribe` and leaves the
/// others `None`; `None` fields are omitted from the serialized output.
#[derive(Debug, Serialize)]
struct CentrifugeCommand {
    /// Command id, taken from `StreamApiInner::next_command_id`.
    id: u64,
    /// Set for the initial connect command.
    #[serde(skip_serializing_if = "Option::is_none")]
    connect: Option<ConnectRequest>,
    /// Set when subscribing to a channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    subscribe: Option<SubscribeRequest>,
    /// Set when unsubscribing from a channel.
    #[serde(skip_serializing_if = "Option::is_none")]
    unsubscribe: Option<UnsubscribeRequest>,
}
50
/// Payload of the `connect` command.
#[derive(Debug, Serialize)]
struct ConnectRequest {
    /// Access token presented to the server.
    token: String,
}
55
/// Payload of the `subscribe` command.
#[derive(Debug, Serialize)]
struct SubscribeRequest {
    /// Channel to subscribe to.
    channel: String,
    /// Requested delta mode; omitted from the JSON when `None`.
    /// `StreamApiInner::send_subscribe` always sets it to `"fossil"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    delta: Option<String>,
    /// Optional filter expression; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    filter: Option<String>,
}
64
/// Payload of the `unsubscribe` command.
#[derive(Debug, Serialize)]
struct UnsubscribeRequest {
    /// Channel to unsubscribe from.
    channel: String,
}
69
/// Centrifuge protocol response received from the server.
///
/// Every field is optional on the wire: a missing field deserializes to its
/// default (`0` for `id`, `None` for the rest).
#[derive(Debug, Deserialize)]
// `id`, `connect` and `subscribe` are deserialized but never read in this
// file; only `push` and `error` are consumed by the read task.
#[allow(dead_code)]
struct CentrifugeResponse {
    /// Id of the command this reply answers; `0` when absent.
    #[serde(default)]
    id: u64,
    /// Raw body of a connect reply, kept undecoded.
    #[serde(default)]
    connect: Option<Value>,
    /// Raw body of a subscribe reply, kept undecoded.
    #[serde(default)]
    subscribe: Option<Value>,
    /// Server push; its publication (if any) is dispatched to listeners.
    #[serde(default)]
    push: Option<PushData>,
    /// Error reported by the server; logged by the read task.
    #[serde(default)]
    error: Option<ErrorData>,
}
85
/// Body of a server push.
#[derive(Debug, Deserialize)]
struct PushData {
    /// Channel the push belongs to; used to look up listeners.
    channel: String,
    /// Publication carried by the push, read from the JSON key `pub`.
    /// `None` when the push carries no publication.
    #[serde(rename = "pub")]
    pub_data: Option<PublicationData>,
}
92
/// A single publication delivered on a channel.
#[derive(Debug, Deserialize)]
struct PublicationData {
    /// Publication payload, kept as raw JSON and handed to listener callbacks.
    data: Value,
}
97
/// Error object attached to a server reply.
#[derive(Debug, Deserialize)]
struct ErrorData {
    /// Numeric error code reported by the server.
    code: i32,
    /// Human-readable error description reported by the server.
    message: String,
}
103
/// Internal callback wrapper: a listener callback paired with the id that
/// identifies it for later removal.
struct CallbackWrapper {
    /// Id assigned by `StreamApiInner::next_callback_id`.
    id: u64,
    /// Callback invoked with the raw JSON payload of each publication.
    callback: Box<dyn Fn(Value) + Send + Sync>,
}
109
/// Internal state of the StreamApi, shared (via `Arc`) between the public
/// client, the `Unsubscribe` handles and the background read/write tasks.
struct StreamApiInner {
    /// WebSocket URL, already including the `token` query parameter appended
    /// by `StreamApi::new`.
    url: String,
    /// Access token sent in the connect command.
    access_token: String,
    /// Set once the connect command has been written; cleared on disconnect,
    /// on a close frame, and on read or write errors.
    connected: AtomicBool,
    /// Counter for outgoing command ids; starts at 1.
    command_id: AtomicU64,
    /// Counter for listener callback ids; starts at 1.
    callback_id: AtomicU64,
    /// Registered callbacks, keyed by channel name.
    listeners: RwLock<HashMap<String, Vec<CallbackWrapper>>>,
    /// Channels for which a subscribe command has been issued. The stored
    /// value is a command id allocated when the channel is recorded; nothing
    /// in this file reads it back.
    subscriptions: RwLock<HashMap<String, u64>>,
    /// Sender feeding the write task; `None` until `connect` installs it and
    /// again after `disconnect` takes it.
    command_tx: RwLock<Option<mpsc::UnboundedSender<Message>>>,
}
121
122impl StreamApiInner {
123    fn new(url: String, access_token: String) -> Self {
124        Self {
125            url,
126            access_token,
127            connected: AtomicBool::new(false),
128            command_id: AtomicU64::new(1),
129            callback_id: AtomicU64::new(1),
130            listeners: RwLock::new(HashMap::new()),
131            subscriptions: RwLock::new(HashMap::new()),
132            command_tx: RwLock::new(None),
133        }
134    }
135
136    fn next_command_id(&self) -> u64 {
137        self.command_id.fetch_add(1, Ordering::SeqCst)
138    }
139
140    fn next_callback_id(&self) -> u64 {
141        self.callback_id.fetch_add(1, Ordering::SeqCst)
142    }
143
144    fn add_listener<F>(&self, channel: &str, callback: F) -> u64
145    where
146        F: Fn(Value) + Send + Sync + 'static,
147    {
148        let callback_id = self.next_callback_id();
149        let wrapper = CallbackWrapper {
150            id: callback_id,
151            callback: Box::new(callback),
152        };
153
154        let mut listeners = self.listeners.write();
155        listeners
156            .entry(channel.to_string())
157            .or_default()
158            .push(wrapper);
159
160        callback_id
161    }
162
163    fn unsubscribe(&self, channel: &str, callback_id: u64) {
164        let mut listeners = self.listeners.write();
165        if let Some(callbacks) = listeners.get_mut(channel) {
166            callbacks.retain(|c| c.id != callback_id);
167
168            // If no more listeners, unsubscribe from channel
169            if callbacks.is_empty() {
170                listeners.remove(channel);
171                drop(listeners);
172
173                // Send unsubscribe command
174                if let Some(tx) = self.command_tx.read().as_ref() {
175                    let cmd = CentrifugeCommand {
176                        id: self.next_command_id(),
177                        connect: None,
178                        subscribe: None,
179                        unsubscribe: Some(UnsubscribeRequest {
180                            channel: channel.to_string(),
181                        }),
182                    };
183                    if let Ok(json) = serde_json::to_string(&cmd) {
184                        let _ = tx.send(Message::Text(json.into()));
185                    }
186                }
187
188                self.subscriptions.write().remove(channel);
189                log::info!("[streaming] unsubscribed from channel: {}", channel);
190            }
191        }
192    }
193
194    fn dispatch_message(&self, channel: &str, data: Value) {
195        let listeners = self.listeners.read();
196        if let Some(callbacks) = listeners.get(channel) {
197            for callback in callbacks {
198                (callback.callback)(data.clone());
199            }
200        }
201    }
202
203    fn send_subscribe(&self, channel: &str, filter: Option<&str>) {
204        if let Some(tx) = self.command_tx.read().as_ref() {
205            let cmd = CentrifugeCommand {
206                id: self.next_command_id(),
207                connect: None,
208                subscribe: Some(SubscribeRequest {
209                    channel: channel.to_string(),
210                    delta: Some("fossil".to_string()),
211                    filter: filter.map(|f| f.to_string()),
212                }),
213                unsubscribe: None,
214            };
215            if let Ok(json) = serde_json::to_string(&cmd) {
216                let _ = tx.send(Message::Text(json.into()));
217            }
218        }
219    }
220}
221
/// Stream API client for real-time data subscriptions.
///
/// A thin handle around the shared [`StreamApiInner`] state; the same state is
/// also held by the background tasks spawned in `connect` and by every
/// [`Unsubscribe`] handle.
pub struct StreamApi {
    /// Shared connection state, listeners and command sender.
    inner: Arc<StreamApiInner>,
}
226
227impl StreamApi {
228    /// Create a new StreamApi instance
229    pub fn new(url: &str, access_token: &str) -> Self {
230        // Build URL with token
231        let url_with_token = if url.contains('?') {
232            format!("{}&token={}", url, access_token)
233        } else {
234            format!("{}?token={}", url, access_token)
235        };
236
237        Self {
238            inner: Arc::new(StreamApiInner::new(
239                url_with_token,
240                access_token.to_string(),
241            )),
242        }
243    }
244
245    /// Check if connected
246    pub fn is_connected(&self) -> bool {
247        self.inner.connected.load(Ordering::SeqCst)
248    }
249
250    /// Connect to the WebSocket server
251    pub async fn connect(&self) -> Result<(), ChainStreamError> {
252        if self.is_connected() {
253            return Ok(());
254        }
255
256        let url = &self.inner.url;
257        log::info!("[streaming] connecting to {}", url);
258
259        let (ws_stream, _) = connect_async(url)
260            .await
261            .map_err(|e| ChainStreamError::WebSocket(format!("Failed to connect: {}", e)))?;
262
263        let (mut write, mut read) = ws_stream.split();
264
265        // Create command channel
266        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
267        *self.inner.command_tx.write() = Some(tx.clone());
268
269        // Send connect command with token
270        let connect_cmd = CentrifugeCommand {
271            id: self.inner.next_command_id(),
272            connect: Some(ConnectRequest {
273                token: self.inner.access_token.clone(),
274            }),
275            subscribe: None,
276            unsubscribe: None,
277        };
278        let connect_json = serde_json::to_string(&connect_cmd)
279            .map_err(|e| ChainStreamError::Serialization(e.to_string()))?;
280        write
281            .send(Message::Text(connect_json.into()))
282            .await
283            .map_err(|e| ChainStreamError::WebSocket(format!("Failed to send connect: {}", e)))?;
284
285        self.inner.connected.store(true, Ordering::SeqCst);
286
287        // Spawn write task
288        let inner_write = self.inner.clone();
289        tokio::spawn(async move {
290            while let Some(msg) = rx.recv().await {
291                if write.send(msg).await.is_err() {
292                    inner_write.connected.store(false, Ordering::SeqCst);
293                    break;
294                }
295            }
296        });
297
298        // Spawn read task
299        let inner_read = self.inner.clone();
300        tokio::spawn(async move {
301            while let Some(msg) = read.next().await {
302                match msg {
303                    Ok(Message::Text(text)) => {
304                        if let Ok(response) = serde_json::from_str::<CentrifugeResponse>(&text) {
305                            // Handle push messages (publications)
306                            if let Some(push) = response.push {
307                                if let Some(pub_data) = push.pub_data {
308                                    inner_read.dispatch_message(&push.channel, pub_data.data);
309                                }
310                            }
311                            // Handle errors
312                            if let Some(err) = response.error {
313                                log::error!(
314                                    "[streaming] error: code={}, message={}",
315                                    err.code,
316                                    err.message
317                                );
318                            }
319                        }
320                    }
321                    Ok(Message::Close(_)) => {
322                        log::info!("[streaming] connection closed");
323                        inner_read.connected.store(false, Ordering::SeqCst);
324                        break;
325                    }
326                    Ok(Message::Ping(data)) => {
327                        if let Some(tx) = inner_read.command_tx.read().as_ref() {
328                            let _ = tx.send(Message::Pong(data));
329                        }
330                    }
331                    Err(e) => {
332                        log::error!("[streaming] read error: {}", e);
333                        inner_read.connected.store(false, Ordering::SeqCst);
334                        break;
335                    }
336                    _ => {}
337                }
338            }
339        });
340
341        Ok(())
342    }
343
344    /// Disconnect from the WebSocket server
345    pub async fn disconnect(&self) {
346        if let Some(tx) = self.inner.command_tx.write().take() {
347            let _ = tx.send(Message::Close(None));
348        }
349        self.inner.connected.store(false, Ordering::SeqCst);
350        log::info!("[streaming] disconnected");
351    }
352
353    /// Subscribe to a channel with a raw callback
354    pub async fn subscribe<F>(
355        &self,
356        channel: &str,
357        callback: F,
358        filter: Option<&str>,
359        method_name: Option<&str>,
360    ) -> Result<Unsubscribe, ChainStreamError>
361    where
362        F: Fn(Value) + Send + Sync + 'static,
363    {
364        // Ensure connected
365        if !self.is_connected() {
366            self.connect().await?;
367        }
368
369        // Process filter if method name is provided
370        let processed_filter = match (filter, method_name) {
371            (Some(f), Some(m)) if !f.is_empty() => Some(replace_filter_fields(f, m)),
372            (Some(f), _) if !f.is_empty() => Some(f.to_string()),
373            _ => None,
374        };
375
376        // Check if already subscribed to this channel
377        let needs_subscribe = {
378            let subs = self.inner.subscriptions.read();
379            !subs.contains_key(channel)
380        };
381
382        // Add callback
383        let callback_id = self.inner.add_listener(channel, callback);
384
385        // Subscribe to channel if not already subscribed
386        if needs_subscribe {
387            self.inner
388                .send_subscribe(channel, processed_filter.as_deref());
389            self.inner
390                .subscriptions
391                .write()
392                .insert(channel.to_string(), self.inner.next_command_id());
393            log::info!("[streaming] subscribed to channel: {}", channel);
394        }
395
396        Ok(Unsubscribe {
397            channel: channel.to_string(),
398            callback_id,
399            api: self.inner.clone(),
400        })
401    }
402
403    // ==================== Subscription Methods ====================
404
405    /// Subscribe to token candle data
406    /// price_type: PriceType::Usd (default) or PriceType::Native
407    pub async fn subscribe_token_candles<F>(
408        &self,
409        chain: &str,
410        token_address: &str,
411        resolution: Resolution,
412        callback: F,
413        filter: Option<&str>,
414        price_type: Option<PriceType>,
415    ) -> Result<Unsubscribe, ChainStreamError>
416    where
417        F: Fn(Candle) + Send + Sync + 'static,
418    {
419        let prefix = match price_type.unwrap_or_default() {
420            PriceType::Native => "dex-candle-in-native",
421            PriceType::Usd => "dex-candle",
422        };
423        let channel = format!("{}:{}_{}_{}", prefix, chain, token_address, resolution);
424
425        self.subscribe(
426            &channel,
427            move |data| {
428                if let Ok(candle) = parse_candle(&data) {
429                    callback(candle);
430                }
431            },
432            filter,
433            Some("subscribe_token_candles"),
434        )
435        .await
436    }
437
438    /// Subscribe to pool candle data
439    /// price_type: PriceType::Usd (default) or PriceType::Native
440    pub async fn subscribe_pool_candles<F>(
441        &self,
442        chain: &str,
443        pool_address: &str,
444        resolution: Resolution,
445        callback: F,
446        filter: Option<&str>,
447        price_type: Option<PriceType>,
448    ) -> Result<Unsubscribe, ChainStreamError>
449    where
450        F: Fn(Candle) + Send + Sync + 'static,
451    {
452        let prefix = match price_type.unwrap_or_default() {
453            PriceType::Native => "dex-pool-candle-in-native",
454            PriceType::Usd => "dex-pool-candle",
455        };
456        let channel = format!("{}:{}_{}_{}", prefix, chain, pool_address, resolution);
457
458        self.subscribe(
459            &channel,
460            move |data| {
461                if let Ok(candle) = parse_candle(&data) {
462                    callback(candle);
463                }
464            },
465            filter,
466            Some("subscribe_pool_candles"),
467        )
468        .await
469    }
470
471    /// Subscribe to pair candle data
472    /// pair_address format: {tokenA}-{tokenB}
473    /// price_type: PriceType::Usd (default) or PriceType::Native
474    pub async fn subscribe_pair_candles<F>(
475        &self,
476        chain: &str,
477        pair_address: &str,
478        resolution: Resolution,
479        callback: F,
480        filter: Option<&str>,
481        price_type: Option<PriceType>,
482    ) -> Result<Unsubscribe, ChainStreamError>
483    where
484        F: Fn(Candle) + Send + Sync + 'static,
485    {
486        let prefix = match price_type.unwrap_or_default() {
487            PriceType::Native => "dex-pair-candle-in-native",
488            PriceType::Usd => "dex-pair-candle",
489        };
490        let channel = format!("{}:{}_{}_{}", prefix, chain, pair_address, resolution);
491
492        self.subscribe(
493            &channel,
494            move |data| {
495                if let Ok(candle) = parse_candle(&data) {
496                    callback(candle);
497                }
498            },
499            filter,
500            Some("subscribe_pair_candles"),
501        )
502        .await
503    }
504
505    /// Subscribe to token statistics
506    pub async fn subscribe_token_stats<F>(
507        &self,
508        chain: &str,
509        token_address: &str,
510        callback: F,
511        filter: Option<&str>,
512    ) -> Result<Unsubscribe, ChainStreamError>
513    where
514        F: Fn(TokenStat) + Send + Sync + 'static,
515    {
516        let channel = format!("dex-token-stats:{}_{}", chain, token_address);
517
518        self.subscribe(
519            &channel,
520            move |data| {
521                if let Ok(stat) = serde_json::from_value::<TokenStat>(data) {
522                    callback(stat);
523                }
524            },
525            filter,
526            Some("subscribe_token_stats"),
527        )
528        .await
529    }
530
531    /// Subscribe to new tokens
532    pub async fn subscribe_new_token<F>(
533        &self,
534        chain: &str,
535        callback: F,
536        filter: Option<&str>,
537    ) -> Result<Unsubscribe, ChainStreamError>
538    where
539        F: Fn(NewToken) + Send + Sync + 'static,
540    {
541        let channel = format!("dex-new-token:{}", chain);
542
543        self.subscribe(
544            &channel,
545            move |data| {
546                if let Ok(token) = parse_new_token(&data) {
547                    callback(token);
548                }
549            },
550            filter,
551            Some("subscribe_new_token"),
552        )
553        .await
554    }
555
556    /// Subscribe to token trades
557    pub async fn subscribe_token_trade<F>(
558        &self,
559        chain: &str,
560        token_address: &str,
561        callback: F,
562        filter: Option<&str>,
563    ) -> Result<Unsubscribe, ChainStreamError>
564    where
565        F: Fn(TradeActivity) + Send + Sync + 'static,
566    {
567        let channel = format!("dex-trade:{}_{}", chain, token_address);
568
569        self.subscribe(
570            &channel,
571            move |data| {
572                if let Ok(trade) = parse_trade_activity(&data) {
573                    callback(trade);
574                }
575            },
576            filter,
577            Some("subscribe_token_trades"),
578        )
579        .await
580    }
581
582    /// Subscribe to wallet balance
583    pub async fn subscribe_wallet_balance<F>(
584        &self,
585        chain: &str,
586        wallet_address: &str,
587        callback: F,
588        filter: Option<&str>,
589    ) -> Result<Unsubscribe, ChainStreamError>
590    where
591        F: Fn(WalletBalance) + Send + Sync + 'static,
592    {
593        let channel = format!("dex-wallet-balance:{}_{}", chain, wallet_address);
594
595        self.subscribe(
596            &channel,
597            move |data| {
598                if let Ok(balance) = serde_json::from_value::<WalletBalance>(data) {
599                    callback(balance);
600                }
601            },
602            filter,
603            Some("subscribe_wallet_balance"),
604        )
605        .await
606    }
607
608    /// Subscribe to token holders
609    pub async fn subscribe_token_holders<F>(
610        &self,
611        chain: &str,
612        token_address: &str,
613        callback: F,
614        filter: Option<&str>,
615    ) -> Result<Unsubscribe, ChainStreamError>
616    where
617        F: Fn(TokenHolder) + Send + Sync + 'static,
618    {
619        let channel = format!("dex-token-holder:{}_{}", chain, token_address);
620
621        self.subscribe(
622            &channel,
623            move |data| {
624                if let Ok(holder) = serde_json::from_value::<TokenHolder>(data) {
625                    callback(holder);
626                }
627            },
628            filter,
629            Some("subscribe_token_holders"),
630        )
631        .await
632    }
633
634    /// Subscribe to token supply
635    pub async fn subscribe_token_supply<F>(
636        &self,
637        chain: &str,
638        token_address: &str,
639        callback: F,
640        filter: Option<&str>,
641    ) -> Result<Unsubscribe, ChainStreamError>
642    where
643        F: Fn(TokenSupply) + Send + Sync + 'static,
644    {
645        let channel = format!("dex-token-supply:{}_{}", chain, token_address);
646
647        self.subscribe(
648            &channel,
649            move |data| {
650                if let Ok(supply) = serde_json::from_value::<TokenSupply>(data) {
651                    callback(supply);
652                }
653            },
654            filter,
655            Some("subscribe_token_supply"),
656        )
657        .await
658    }
659
660    /// Subscribe to DEX pool balance
661    pub async fn subscribe_dex_pool_balance<F>(
662        &self,
663        chain: &str,
664        pool_address: &str,
665        callback: F,
666        filter: Option<&str>,
667    ) -> Result<Unsubscribe, ChainStreamError>
668    where
669        F: Fn(DexPoolBalance) + Send + Sync + 'static,
670    {
671        let channel = format!("dex-pool-balance:{}_{}", chain, pool_address);
672
673        self.subscribe(
674            &channel,
675            move |data| {
676                if let Ok(balance) = serde_json::from_value::<DexPoolBalance>(data) {
677                    callback(balance);
678                }
679            },
680            filter,
681            Some("subscribe_dex_pool_balance"),
682        )
683        .await
684    }
685
686    /// Subscribe to token max liquidity
687    pub async fn subscribe_token_max_liquidity<F>(
688        &self,
689        chain: &str,
690        token_address: &str,
691        callback: F,
692        filter: Option<&str>,
693    ) -> Result<Unsubscribe, ChainStreamError>
694    where
695        F: Fn(TokenMaxLiquidity) + Send + Sync + 'static,
696    {
697        let channel = format!("dex-token-max-liquidity:{}_{}", chain, token_address);
698
699        self.subscribe(
700            &channel,
701            move |data| {
702                if let Ok(liquidity) = serde_json::from_value::<TokenMaxLiquidity>(data) {
703                    callback(liquidity);
704                }
705            },
706            filter,
707            Some("subscribe_token_max_liquidity"),
708        )
709        .await
710    }
711
712    /// Subscribe to token total liquidity
713    pub async fn subscribe_token_total_liquidity<F>(
714        &self,
715        chain: &str,
716        token_address: &str,
717        callback: F,
718        filter: Option<&str>,
719    ) -> Result<Unsubscribe, ChainStreamError>
720    where
721        F: Fn(TokenTotalLiquidity) + Send + Sync + 'static,
722    {
723        let channel = format!("dex-token-total-liquidity:{}_{}", chain, token_address);
724
725        self.subscribe(
726            &channel,
727            move |data| {
728                if let Ok(liquidity) = serde_json::from_value::<TokenTotalLiquidity>(data) {
729                    callback(liquidity);
730                }
731            },
732            filter,
733            Some("subscribe_token_total_liquidity"),
734        )
735        .await
736    }
737
738    /// Subscribe to wallet PnL
739    pub async fn subscribe_wallet_pnl<F>(
740        &self,
741        chain: &str,
742        wallet_address: &str,
743        callback: F,
744        filter: Option<&str>,
745    ) -> Result<Unsubscribe, ChainStreamError>
746    where
747        F: Fn(WalletTokenPnl) + Send + Sync + 'static,
748    {
749        let channel = format!("dex-wallet-pnl:{}_{}", chain, wallet_address);
750
751        self.subscribe(
752            &channel,
753            move |data| {
754                if let Ok(pnl) = serde_json::from_value::<WalletTokenPnl>(data) {
755                    callback(pnl);
756                }
757            },
758            filter,
759            Some("subscribe_wallet_pnl"),
760        )
761        .await
762    }
763
764    /// Subscribe to new tokens metadata
765    pub async fn subscribe_new_tokens_metadata<F>(
766        &self,
767        chain: &str,
768        callback: F,
769        filter: Option<&str>,
770    ) -> Result<Unsubscribe, ChainStreamError>
771    where
772        F: Fn(TokenMetadata) + Send + Sync + 'static,
773    {
774        let channel = format!("dex-new-token-metadata:{}", chain);
775
776        self.subscribe(
777            &channel,
778            move |data| {
779                if let Ok(metadata) = serde_json::from_value::<TokenMetadata>(data) {
780                    callback(metadata);
781                }
782            },
783            filter,
784            Some("subscribe_new_tokens_metadata"),
785        )
786        .await
787    }
788
789    /// Subscribe to ranking tokens list
790    pub async fn subscribe_ranking_tokens_list<F>(
791        &self,
792        chain: &str,
793        ranking_type: RankingType,
794        callback: F,
795        filter: Option<&str>,
796    ) -> Result<Unsubscribe, ChainStreamError>
797    where
798        F: Fn(RankingTokenList) + Send + Sync + 'static,
799    {
800        let ranking_str = match ranking_type {
801            RankingType::New => "new",
802            RankingType::Hot => "trending",
803            RankingType::Stocks => "stocks",
804            RankingType::FinalStretch => "completed",
805            RankingType::Migrated => "graduated",
806        };
807        let channel = format!("dex-ranking-token-list:{}_{}", chain, ranking_str);
808
809        self.subscribe(
810            &channel,
811            move |data| {
812                if let Ok(ranking) = serde_json::from_value::<RankingTokenList>(data) {
813                    callback(ranking);
814                }
815            },
816            filter,
817            None,
818        )
819        .await
820    }
821}
822
823// Helper functions to parse data with short field names
824
825fn parse_candle(data: &Value) -> Result<Candle, String> {
826    let obj = data
827        .as_object()
828        .ok_or_else(|| "expected object".to_string())?;
829
830    Ok(Candle {
831        address: get_string(obj, "a"),
832        open: get_string(obj, "o"),
833        close: get_string(obj, "c"),
834        high: get_string(obj, "h"),
835        low: get_string(obj, "l"),
836        volume: get_string(obj, "v"),
837        resolution: get_string(obj, "r"),
838        time: get_i64(obj, "t"),
839        number: get_i32(obj, "n"),
840    })
841}
842
843fn parse_new_token(data: &Value) -> Result<NewToken, String> {
844    let obj = data
845        .as_object()
846        .ok_or_else(|| "expected object".to_string())?;
847
848    Ok(NewToken {
849        token_address: get_string(obj, "a"),
850        name: get_string(obj, "n"),
851        symbol: get_string(obj, "s"),
852        decimals: obj.get("d").and_then(|v| v.as_i64()).map(|v| v as i32),
853        launch_from: None, // TODO: parse if present
854        created_at_ms: get_i64(obj, "cts"),
855    })
856}
857
858fn parse_trade_activity(data: &Value) -> Result<TradeActivity, String> {
859    let obj = data
860        .as_object()
861        .ok_or_else(|| "expected object".to_string())?;
862
863    Ok(TradeActivity {
864        token_address: get_string(obj, "a"),
865        timestamp: get_i64(obj, "t"),
866        kind: get_string(obj, "k"),
867        buy_amount: get_string(obj, "ba"),
868        buy_amount_in_usd: get_string(obj, "baiu"),
869        buy_token_address: get_string(obj, "btma"),
870        buy_token_name: get_string(obj, "btn"),
871        buy_token_symbol: get_string(obj, "bts"),
872        buy_wallet_address: get_string(obj, "bwa"),
873        sell_amount: get_string(obj, "sa"),
874        sell_amount_in_usd: get_string(obj, "saiu"),
875        sell_token_address: get_string(obj, "stma"),
876        sell_token_name: get_string(obj, "stn"),
877        sell_token_symbol: get_string(obj, "sts"),
878        sell_wallet_address: get_string(obj, "swa"),
879        tx_hash: get_string(obj, "h"),
880    })
881}
882
883fn get_string(obj: &serde_json::Map<String, Value>, key: &str) -> String {
884    obj.get(key)
885        .and_then(|v| v.as_str())
886        .unwrap_or_default()
887        .to_string()
888}
889
890fn get_i64(obj: &serde_json::Map<String, Value>, key: &str) -> i64 {
891    obj.get(key).and_then(|v| v.as_i64()).unwrap_or_default()
892}
893
894fn get_i32(obj: &serde_json::Map<String, Value>, key: &str) -> i32 {
895    obj.get(key)
896        .and_then(|v| v.as_i64())
897        .map(|v| v as i32)
898        .unwrap_or_default()
899}