Skip to main content

chainstream_sdk/stream/
client.rs

1//! Centrifuge WebSocket client implementation for ChainStream Stream API
2//!
3//! This module provides real-time data streaming capabilities using the Centrifuge protocol.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use futures_util::{SinkExt, StreamExt};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use tokio::sync::mpsc;
14use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
15
16use crate::error::ChainStreamError;
17use crate::openapi::types::Resolution;
18
19use super::fields::replace_filter_fields;
20use super::models::*;
21
22/// Callback type for stream subscriptions
23pub type StreamCallback<T> = Box<dyn Fn(T) + Send + Sync + 'static>;
24
25/// Unsubscribe handle
26pub struct Unsubscribe {
27    channel: String,
28    callback_id: u64,
29    api: Arc<StreamApiInner>,
30}
31
32impl Unsubscribe {
33    /// Unsubscribe from the channel
34    pub fn unsubscribe(self) {
35        self.api.unsubscribe(&self.channel, self.callback_id);
36    }
37}
38
39/// Centrifuge protocol command
40#[derive(Debug, Serialize)]
41struct CentrifugeCommand {
42    id: u64,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    connect: Option<ConnectRequest>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    subscribe: Option<SubscribeRequest>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    unsubscribe: Option<UnsubscribeRequest>,
49}
50
51#[derive(Debug, Serialize)]
52struct ConnectRequest {
53    token: String,
54}
55
56#[derive(Debug, Serialize)]
57struct SubscribeRequest {
58    channel: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    delta: Option<String>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    filter: Option<String>,
63}
64
65#[derive(Debug, Serialize)]
66struct UnsubscribeRequest {
67    channel: String,
68}
69
70/// Centrifuge protocol response
71#[derive(Debug, Deserialize)]
72#[allow(dead_code)]
73struct CentrifugeResponse {
74    #[serde(default)]
75    id: u64,
76    #[serde(default)]
77    connect: Option<Value>,
78    #[serde(default)]
79    subscribe: Option<Value>,
80    #[serde(default)]
81    push: Option<PushData>,
82    #[serde(default)]
83    error: Option<ErrorData>,
84}
85
86#[derive(Debug, Deserialize)]
87struct PushData {
88    channel: String,
89    #[serde(rename = "pub")]
90    pub_data: Option<PublicationData>,
91}
92
93#[derive(Debug, Deserialize)]
94struct PublicationData {
95    data: Value,
96}
97
98#[derive(Debug, Deserialize)]
99struct ErrorData {
100    code: i32,
101    message: String,
102}
103
104/// Internal callback wrapper
105struct CallbackWrapper {
106    id: u64,
107    callback: Box<dyn Fn(Value) + Send + Sync>,
108}
109
110/// Internal state of the StreamApi
111struct StreamApiInner {
112    url: String,
113    access_token: String,
114    connected: AtomicBool,
115    command_id: AtomicU64,
116    callback_id: AtomicU64,
117    listeners: RwLock<HashMap<String, Vec<CallbackWrapper>>>,
118    subscriptions: RwLock<HashMap<String, u64>>,
119    command_tx: RwLock<Option<mpsc::UnboundedSender<Message>>>,
120}
121
122impl StreamApiInner {
123    fn new(url: String, access_token: String) -> Self {
124        Self {
125            url,
126            access_token,
127            connected: AtomicBool::new(false),
128            command_id: AtomicU64::new(1),
129            callback_id: AtomicU64::new(1),
130            listeners: RwLock::new(HashMap::new()),
131            subscriptions: RwLock::new(HashMap::new()),
132            command_tx: RwLock::new(None),
133        }
134    }
135
136    fn next_command_id(&self) -> u64 {
137        self.command_id.fetch_add(1, Ordering::SeqCst)
138    }
139
140    fn next_callback_id(&self) -> u64 {
141        self.callback_id.fetch_add(1, Ordering::SeqCst)
142    }
143
144    fn add_listener<F>(&self, channel: &str, callback: F) -> u64
145    where
146        F: Fn(Value) + Send + Sync + 'static,
147    {
148        let callback_id = self.next_callback_id();
149        let wrapper = CallbackWrapper {
150            id: callback_id,
151            callback: Box::new(callback),
152        };
153
154        let mut listeners = self.listeners.write();
155        listeners
156            .entry(channel.to_string())
157            .or_default()
158            .push(wrapper);
159
160        callback_id
161    }
162
163    fn unsubscribe(&self, channel: &str, callback_id: u64) {
164        let mut listeners = self.listeners.write();
165        if let Some(callbacks) = listeners.get_mut(channel) {
166            callbacks.retain(|c| c.id != callback_id);
167
168            // If no more listeners, unsubscribe from channel
169            if callbacks.is_empty() {
170                listeners.remove(channel);
171                drop(listeners);
172
173                // Send unsubscribe command
174                if let Some(tx) = self.command_tx.read().as_ref() {
175                    let cmd = CentrifugeCommand {
176                        id: self.next_command_id(),
177                        connect: None,
178                        subscribe: None,
179                        unsubscribe: Some(UnsubscribeRequest {
180                            channel: channel.to_string(),
181                        }),
182                    };
183                    if let Ok(json) = serde_json::to_string(&cmd) {
184                        let _ = tx.send(Message::Text(json.into()));
185                    }
186                }
187
188                self.subscriptions.write().remove(channel);
189                log::info!("[streaming] unsubscribed from channel: {}", channel);
190            }
191        }
192    }
193
194    fn dispatch_message(&self, channel: &str, data: Value) {
195        let listeners = self.listeners.read();
196        if let Some(callbacks) = listeners.get(channel) {
197            for callback in callbacks {
198                (callback.callback)(data.clone());
199            }
200        }
201    }
202
203    fn send_subscribe(&self, channel: &str, filter: Option<&str>) {
204        if let Some(tx) = self.command_tx.read().as_ref() {
205            let cmd = CentrifugeCommand {
206                id: self.next_command_id(),
207                connect: None,
208                subscribe: Some(SubscribeRequest {
209                    channel: channel.to_string(),
210                    delta: Some("fossil".to_string()),
211                    filter: filter.map(|f| f.to_string()),
212                }),
213                unsubscribe: None,
214            };
215            if let Ok(json) = serde_json::to_string(&cmd) {
216                let _ = tx.send(Message::Text(json.into()));
217            }
218        }
219    }
220}
221
222/// Stream API client for real-time data subscriptions
223pub struct StreamApi {
224    inner: Arc<StreamApiInner>,
225}
226
227impl StreamApi {
228    /// Create a new StreamApi instance
229    pub fn new(url: &str, access_token: &str) -> Self {
230        // Build URL with token
231        let url_with_token = if url.contains('?') {
232            format!("{}&token={}", url, access_token)
233        } else {
234            format!("{}?token={}", url, access_token)
235        };
236
237        Self {
238            inner: Arc::new(StreamApiInner::new(
239                url_with_token,
240                access_token.to_string(),
241            )),
242        }
243    }
244
245    /// Check if connected
246    pub fn is_connected(&self) -> bool {
247        self.inner.connected.load(Ordering::SeqCst)
248    }
249
250    /// Connect to the WebSocket server
251    pub async fn connect(&self) -> Result<(), ChainStreamError> {
252        if self.is_connected() {
253            return Ok(());
254        }
255
256        let url = &self.inner.url;
257        log::info!("[streaming] connecting to {}", url);
258
259        let (ws_stream, _) = connect_async(url)
260            .await
261            .map_err(|e| ChainStreamError::WebSocket(format!("Failed to connect: {}", e)))?;
262
263        let (mut write, mut read) = ws_stream.split();
264
265        // Create command channel
266        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
267        *self.inner.command_tx.write() = Some(tx.clone());
268
269        // Send connect command with token
270        let connect_cmd = CentrifugeCommand {
271            id: self.inner.next_command_id(),
272            connect: Some(ConnectRequest {
273                token: self.inner.access_token.clone(),
274            }),
275            subscribe: None,
276            unsubscribe: None,
277        };
278        let connect_json = serde_json::to_string(&connect_cmd)
279            .map_err(|e| ChainStreamError::Serialization(e.to_string()))?;
280        write
281            .send(Message::Text(connect_json.into()))
282            .await
283            .map_err(|e| ChainStreamError::WebSocket(format!("Failed to send connect: {}", e)))?;
284
285        self.inner.connected.store(true, Ordering::SeqCst);
286
287        // Spawn write task
288        let inner_write = self.inner.clone();
289        tokio::spawn(async move {
290            while let Some(msg) = rx.recv().await {
291                if write.send(msg).await.is_err() {
292                    inner_write.connected.store(false, Ordering::SeqCst);
293                    break;
294                }
295            }
296        });
297
298        // Spawn read task
299        let inner_read = self.inner.clone();
300        tokio::spawn(async move {
301            while let Some(msg) = read.next().await {
302                match msg {
303                    Ok(Message::Text(text)) => {
304                        if let Ok(response) = serde_json::from_str::<CentrifugeResponse>(&text) {
305                            // Handle push messages (publications)
306                            if let Some(push) = response.push {
307                                if let Some(pub_data) = push.pub_data {
308                                    inner_read.dispatch_message(&push.channel, pub_data.data);
309                                }
310                            }
311                            // Handle errors
312                            if let Some(err) = response.error {
313                                log::error!(
314                                    "[streaming] error: code={}, message={}",
315                                    err.code,
316                                    err.message
317                                );
318                            }
319                        }
320                    }
321                    Ok(Message::Close(_)) => {
322                        log::info!("[streaming] connection closed");
323                        inner_read.connected.store(false, Ordering::SeqCst);
324                        break;
325                    }
326                    Ok(Message::Ping(data)) => {
327                        if let Some(tx) = inner_read.command_tx.read().as_ref() {
328                            let _ = tx.send(Message::Pong(data));
329                        }
330                    }
331                    Err(e) => {
332                        log::error!("[streaming] read error: {}", e);
333                        inner_read.connected.store(false, Ordering::SeqCst);
334                        break;
335                    }
336                    _ => {}
337                }
338            }
339        });
340
341        Ok(())
342    }
343
344    /// Disconnect from the WebSocket server
345    pub async fn disconnect(&self) {
346        if let Some(tx) = self.inner.command_tx.write().take() {
347            let _ = tx.send(Message::Close(None));
348        }
349        self.inner.connected.store(false, Ordering::SeqCst);
350        log::info!("[streaming] disconnected");
351    }
352
353    /// Subscribe to a channel with a raw callback
354    pub async fn subscribe<F>(
355        &self,
356        channel: &str,
357        callback: F,
358        filter: Option<&str>,
359        method_name: Option<&str>,
360    ) -> Result<Unsubscribe, ChainStreamError>
361    where
362        F: Fn(Value) + Send + Sync + 'static,
363    {
364        // Ensure connected
365        if !self.is_connected() {
366            self.connect().await?;
367        }
368
369        // Process filter if method name is provided
370        let processed_filter = match (filter, method_name) {
371            (Some(f), Some(m)) if !f.is_empty() => Some(replace_filter_fields(f, m)),
372            (Some(f), _) if !f.is_empty() => Some(f.to_string()),
373            _ => None,
374        };
375
376        // Check if already subscribed to this channel
377        let needs_subscribe = {
378            let subs = self.inner.subscriptions.read();
379            !subs.contains_key(channel)
380        };
381
382        // Add callback
383        let callback_id = self.inner.add_listener(channel, callback);
384
385        // Subscribe to channel if not already subscribed
386        if needs_subscribe {
387            self.inner
388                .send_subscribe(channel, processed_filter.as_deref());
389            self.inner
390                .subscriptions
391                .write()
392                .insert(channel.to_string(), self.inner.next_command_id());
393            log::info!("[streaming] subscribed to channel: {}", channel);
394        }
395
396        Ok(Unsubscribe {
397            channel: channel.to_string(),
398            callback_id,
399            api: self.inner.clone(),
400        })
401    }
402
403    // ==================== Subscription Methods ====================
404
405    /// Subscribe to token candle data
406    /// price_type: PriceType::Usd (default) or PriceType::Native
407    pub async fn subscribe_token_candles<F>(
408        &self,
409        chain: &str,
410        token_address: &str,
411        resolution: Resolution,
412        callback: F,
413        filter: Option<&str>,
414        price_type: Option<PriceType>,
415    ) -> Result<Unsubscribe, ChainStreamError>
416    where
417        F: Fn(Candle) + Send + Sync + 'static,
418    {
419        let prefix = match price_type.unwrap_or_default() {
420            PriceType::Native => "dex-candle-in-native",
421            PriceType::Usd => "dex-candle",
422        };
423        let channel = format!("{}:{}_{}_{}", prefix, chain, token_address, resolution);
424
425        self.subscribe(
426            &channel,
427            move |data| {
428                if let Ok(candle) = parse_candle(&data) {
429                    callback(candle);
430                }
431            },
432            filter,
433            Some("subscribe_token_candles"),
434        )
435        .await
436    }
437
438    /// Subscribe to pool candle data
439    /// price_type: PriceType::Usd (default) or PriceType::Native
440    pub async fn subscribe_pool_candles<F>(
441        &self,
442        chain: &str,
443        pool_address: &str,
444        resolution: Resolution,
445        callback: F,
446        filter: Option<&str>,
447        price_type: Option<PriceType>,
448    ) -> Result<Unsubscribe, ChainStreamError>
449    where
450        F: Fn(Candle) + Send + Sync + 'static,
451    {
452        let prefix = match price_type.unwrap_or_default() {
453            PriceType::Native => "dex-pool-candle-in-native",
454            PriceType::Usd => "dex-pool-candle",
455        };
456        let channel = format!("{}:{}_{}_{}", prefix, chain, pool_address, resolution);
457
458        self.subscribe(
459            &channel,
460            move |data| {
461                if let Ok(candle) = parse_candle(&data) {
462                    callback(candle);
463                }
464            },
465            filter,
466            Some("subscribe_pool_candles"),
467        )
468        .await
469    }
470
471    /// Subscribe to pair candle data
472    /// pair_address format: {tokenA}-{tokenB}
473    /// price_type: PriceType::Usd (default) or PriceType::Native
474    pub async fn subscribe_pair_candles<F>(
475        &self,
476        chain: &str,
477        pair_address: &str,
478        resolution: Resolution,
479        callback: F,
480        filter: Option<&str>,
481        price_type: Option<PriceType>,
482    ) -> Result<Unsubscribe, ChainStreamError>
483    where
484        F: Fn(Candle) + Send + Sync + 'static,
485    {
486        let prefix = match price_type.unwrap_or_default() {
487            PriceType::Native => "dex-pair-candle-in-native",
488            PriceType::Usd => "dex-pair-candle",
489        };
490        let channel = format!("{}:{}_{}_{}", prefix, chain, pair_address, resolution);
491
492        self.subscribe(
493            &channel,
494            move |data| {
495                if let Ok(candle) = parse_candle(&data) {
496                    callback(candle);
497                }
498            },
499            filter,
500            Some("subscribe_pair_candles"),
501        )
502        .await
503    }
504
505    /// Subscribe to token statistics
506    pub async fn subscribe_token_stats<F>(
507        &self,
508        chain: &str,
509        token_address: &str,
510        callback: F,
511        filter: Option<&str>,
512    ) -> Result<Unsubscribe, ChainStreamError>
513    where
514        F: Fn(TokenStat) + Send + Sync + 'static,
515    {
516        let channel = format!("dex-token-stats:{}_{}", chain, token_address);
517
518        self.subscribe(
519            &channel,
520            move |data| {
521                if let Ok(stat) = serde_json::from_value::<TokenStat>(data) {
522                    callback(stat);
523                }
524            },
525            filter,
526            Some("subscribe_token_stats"),
527        )
528        .await
529    }
530
531    /// Subscribe to new tokens
532    pub async fn subscribe_new_token<F>(
533        &self,
534        chain: &str,
535        callback: F,
536        filter: Option<&str>,
537    ) -> Result<Unsubscribe, ChainStreamError>
538    where
539        F: Fn(NewToken) + Send + Sync + 'static,
540    {
541        let channel = format!("dex-new-token:{}", chain);
542
543        self.subscribe(
544            &channel,
545            move |data| {
546                if let Ok(token) = parse_new_token(&data) {
547                    callback(token);
548                }
549            },
550            filter,
551            Some("subscribe_new_token"),
552        )
553        .await
554    }
555
556    /// Subscribe to token trades
557    pub async fn subscribe_token_trade<F>(
558        &self,
559        chain: &str,
560        token_address: &str,
561        callback: F,
562        filter: Option<&str>,
563    ) -> Result<Unsubscribe, ChainStreamError>
564    where
565        F: Fn(TradeActivity) + Send + Sync + 'static,
566    {
567        let channel = format!("dex-trade:{}_{}", chain, token_address);
568
569        self.subscribe(
570            &channel,
571            move |data| {
572                if let Ok(trade) = parse_trade_activity(&data) {
573                    callback(trade);
574                }
575            },
576            filter,
577            Some("subscribe_token_trades"),
578        )
579        .await
580    }
581
582    /// Subscribe to prediction event activities.
583    pub async fn subscribe_prediction_event_activities<F>(
584        &self,
585        event_slug: &str,
586        callback: F,
587        filter: Option<&str>,
588    ) -> Result<Unsubscribe, ChainStreamError>
589    where
590        F: Fn(PredictionActivity) + Send + Sync + 'static,
591    {
592        let channel = prediction_activity_channel(PredictionActivityChannelKind::Event, event_slug);
593
594        self.subscribe(
595            &channel,
596            move |data| {
597                if let Ok(activity) = parse_prediction_activity(&data) {
598                    callback(activity);
599                }
600            },
601            filter,
602            Some("subscribe_prediction_event_activities"),
603        )
604        .await
605    }
606
607    /// Subscribe to prediction token activities.
608    pub async fn subscribe_prediction_token_activities<F>(
609        &self,
610        token_id: &str,
611        callback: F,
612        filter: Option<&str>,
613    ) -> Result<Unsubscribe, ChainStreamError>
614    where
615        F: Fn(PredictionActivity) + Send + Sync + 'static,
616    {
617        let channel = prediction_activity_channel(PredictionActivityChannelKind::Token, token_id);
618
619        self.subscribe(
620            &channel,
621            move |data| {
622                if let Ok(activity) = parse_prediction_activity(&data) {
623                    callback(activity);
624                }
625            },
626            filter,
627            Some("subscribe_prediction_token_activities"),
628        )
629        .await
630    }
631
632    /// Subscribe to wallet balance
633    pub async fn subscribe_wallet_balance<F>(
634        &self,
635        chain: &str,
636        wallet_address: &str,
637        callback: F,
638        filter: Option<&str>,
639    ) -> Result<Unsubscribe, ChainStreamError>
640    where
641        F: Fn(WalletBalance) + Send + Sync + 'static,
642    {
643        let channel = format!("dex-wallet-balance:{}_{}", chain, wallet_address);
644
645        self.subscribe(
646            &channel,
647            move |data| {
648                if let Ok(balance) = serde_json::from_value::<WalletBalance>(data) {
649                    callback(balance);
650                }
651            },
652            filter,
653            Some("subscribe_wallet_balance"),
654        )
655        .await
656    }
657
658    /// Subscribe to token holders
659    pub async fn subscribe_token_holders<F>(
660        &self,
661        chain: &str,
662        token_address: &str,
663        callback: F,
664        filter: Option<&str>,
665    ) -> Result<Unsubscribe, ChainStreamError>
666    where
667        F: Fn(TokenHolder) + Send + Sync + 'static,
668    {
669        let channel = format!("dex-token-holder:{}_{}", chain, token_address);
670
671        self.subscribe(
672            &channel,
673            move |data| {
674                if let Ok(holder) = serde_json::from_value::<TokenHolder>(data) {
675                    callback(holder);
676                }
677            },
678            filter,
679            Some("subscribe_token_holders"),
680        )
681        .await
682    }
683
684    /// Subscribe to token supply
685    pub async fn subscribe_token_supply<F>(
686        &self,
687        chain: &str,
688        token_address: &str,
689        callback: F,
690        filter: Option<&str>,
691    ) -> Result<Unsubscribe, ChainStreamError>
692    where
693        F: Fn(TokenSupply) + Send + Sync + 'static,
694    {
695        let channel = format!("dex-token-supply:{}_{}", chain, token_address);
696
697        self.subscribe(
698            &channel,
699            move |data| {
700                if let Ok(supply) = serde_json::from_value::<TokenSupply>(data) {
701                    callback(supply);
702                }
703            },
704            filter,
705            Some("subscribe_token_supply"),
706        )
707        .await
708    }
709
710    /// Subscribe to DEX pool balance
711    pub async fn subscribe_dex_pool_balance<F>(
712        &self,
713        chain: &str,
714        pool_address: &str,
715        callback: F,
716        filter: Option<&str>,
717    ) -> Result<Unsubscribe, ChainStreamError>
718    where
719        F: Fn(DexPoolBalance) + Send + Sync + 'static,
720    {
721        let channel = format!("dex-pool-balance:{}_{}", chain, pool_address);
722
723        self.subscribe(
724            &channel,
725            move |data| {
726                if let Ok(balance) = serde_json::from_value::<DexPoolBalance>(data) {
727                    callback(balance);
728                }
729            },
730            filter,
731            Some("subscribe_dex_pool_balance"),
732        )
733        .await
734    }
735
736    /// Subscribe to token max liquidity
737    pub async fn subscribe_token_max_liquidity<F>(
738        &self,
739        chain: &str,
740        token_address: &str,
741        callback: F,
742        filter: Option<&str>,
743    ) -> Result<Unsubscribe, ChainStreamError>
744    where
745        F: Fn(TokenMaxLiquidity) + Send + Sync + 'static,
746    {
747        let channel = format!("dex-token-max-liquidity:{}_{}", chain, token_address);
748
749        self.subscribe(
750            &channel,
751            move |data| {
752                if let Ok(liquidity) = serde_json::from_value::<TokenMaxLiquidity>(data) {
753                    callback(liquidity);
754                }
755            },
756            filter,
757            Some("subscribe_token_max_liquidity"),
758        )
759        .await
760    }
761
762    /// Subscribe to token total liquidity
763    pub async fn subscribe_token_total_liquidity<F>(
764        &self,
765        chain: &str,
766        token_address: &str,
767        callback: F,
768        filter: Option<&str>,
769    ) -> Result<Unsubscribe, ChainStreamError>
770    where
771        F: Fn(TokenTotalLiquidity) + Send + Sync + 'static,
772    {
773        let channel = format!("dex-token-total-liquidity:{}_{}", chain, token_address);
774
775        self.subscribe(
776            &channel,
777            move |data| {
778                if let Ok(liquidity) = serde_json::from_value::<TokenTotalLiquidity>(data) {
779                    callback(liquidity);
780                }
781            },
782            filter,
783            Some("subscribe_token_total_liquidity"),
784        )
785        .await
786    }
787
788    /// Subscribe to wallet PnL
789    pub async fn subscribe_wallet_pnl<F>(
790        &self,
791        chain: &str,
792        wallet_address: &str,
793        callback: F,
794        filter: Option<&str>,
795    ) -> Result<Unsubscribe, ChainStreamError>
796    where
797        F: Fn(WalletTokenPnl) + Send + Sync + 'static,
798    {
799        let channel = format!("dex-wallet-pnl:{}_{}", chain, wallet_address);
800
801        self.subscribe(
802            &channel,
803            move |data| {
804                if let Ok(pnl) = serde_json::from_value::<WalletTokenPnl>(data) {
805                    callback(pnl);
806                }
807            },
808            filter,
809            Some("subscribe_wallet_pnl"),
810        )
811        .await
812    }
813
814    /// Subscribe to new tokens metadata (batch, aggregated every 1 second)
815    /// Channel: dex-new-tokens-metadata:{chain}
816    /// No CEL filter support
817    pub async fn subscribe_new_tokens_metadata<F>(
818        &self,
819        chain: &str,
820        callback: F,
821    ) -> Result<Unsubscribe, ChainStreamError>
822    where
823        F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
824    {
825        let channel = format!("dex-new-tokens-metadata:{}", chain);
826
827        self.subscribe(
828            &channel,
829            move |data| {
830                if let Some(arr) = data.as_array() {
831                    let result: Vec<TokenMetadata> = arr
832                        .iter()
833                        .filter_map(|item| item.as_object().map(parse_token_metadata))
834                        .collect();
835                    callback(result);
836                }
837            },
838            None,
839            Some("subscribe_new_tokens_metadata"),
840        )
841        .await
842    }
843
844    /// Subscribe to new tokens list (batch from token-created-to-realtime-pipeline)
845    /// Channel: dex-new-tokens:{chain}
846    /// No CEL filter support
847    pub async fn subscribe_new_tokens<F>(
848        &self,
849        chain: &str,
850        callback: F,
851    ) -> Result<Unsubscribe, ChainStreamError>
852    where
853        F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
854    {
855        let channel = format!("dex-new-tokens:{}", chain);
856
857        self.subscribe(
858            &channel,
859            move |data| {
860                if let Some(arr) = data.as_array() {
861                    let result: Vec<TokenMetadata> = arr
862                        .iter()
863                        .filter_map(|item| item.as_object().map(parse_token_metadata))
864                        .collect();
865                    callback(result);
866                }
867            },
868            None,
869            Some("subscribe_new_tokens"),
870        )
871        .await
872    }
873
874    /// Subscribe to ranking tokens list
875    pub async fn subscribe_ranking_tokens_list<F>(
876        &self,
877        chain: &str,
878        ranking_type: RankingType,
879        callback: F,
880        filter: Option<&str>,
881    ) -> Result<Unsubscribe, ChainStreamError>
882    where
883        F: Fn(RankingTokenList) + Send + Sync + 'static,
884    {
885        let ranking_str = match ranking_type {
886            RankingType::New => "new",
887            RankingType::Hot => "trending",
888            RankingType::Stocks => "stocks",
889            RankingType::FinalStretch => "completed",
890            RankingType::Migrated => "graduated",
891        };
892        let channel = format!("dex-ranking-token-list:{}_{}", chain, ranking_str);
893
894        self.subscribe(
895            &channel,
896            move |data| {
897                if let Ok(ranking) = serde_json::from_value::<RankingTokenList>(data) {
898                    callback(ranking);
899                }
900            },
901            filter,
902            None,
903        )
904        .await
905    }
906}
907
908// Helper functions to parse data with short field names
909
910fn prediction_activity_channel(kind: PredictionActivityChannelKind, key: &str) -> String {
911    match kind {
912        PredictionActivityChannelKind::Event => format!("pred:evt:{key}:act"),
913        PredictionActivityChannelKind::Token => format!("pred:tok:{key}:act"),
914    }
915}
916
917fn parse_candle(data: &Value) -> Result<Candle, String> {
918    let obj = data
919        .as_object()
920        .ok_or_else(|| "expected object".to_string())?;
921
922    Ok(Candle {
923        address: get_string(obj, "a"),
924        open: get_string(obj, "o"),
925        close: get_string(obj, "c"),
926        high: get_string(obj, "h"),
927        low: get_string(obj, "l"),
928        volume: get_string(obj, "v"),
929        resolution: get_string(obj, "r"),
930        time: get_i64(obj, "t"),
931        number: get_i32(obj, "n"),
932    })
933}
934
935fn parse_social_media(obj: &serde_json::Map<String, Value>) -> SocialMedia {
936    SocialMedia {
937        twitter: obj
938            .get("tw")
939            .and_then(|v| v.as_str())
940            .map(|s| s.to_string()),
941        telegram: obj
942            .get("tg")
943            .and_then(|v| v.as_str())
944            .map(|s| s.to_string()),
945        website: obj.get("w").and_then(|v| v.as_str()).map(|s| s.to_string()),
946        tiktok: obj
947            .get("tt")
948            .and_then(|v| v.as_str())
949            .map(|s| s.to_string()),
950        discord: obj
951            .get("dc")
952            .and_then(|v| v.as_str())
953            .map(|s| s.to_string()),
954        facebook: obj
955            .get("fb")
956            .and_then(|v| v.as_str())
957            .map(|s| s.to_string()),
958        github: obj
959            .get("gh")
960            .and_then(|v| v.as_str())
961            .map(|s| s.to_string()),
962        instagram: obj
963            .get("ig")
964            .and_then(|v| v.as_str())
965            .map(|s| s.to_string()),
966        linkedin: obj
967            .get("li")
968            .and_then(|v| v.as_str())
969            .map(|s| s.to_string()),
970        medium: obj
971            .get("md")
972            .and_then(|v| v.as_str())
973            .map(|s| s.to_string()),
974        reddit: obj
975            .get("rd")
976            .and_then(|v| v.as_str())
977            .map(|s| s.to_string()),
978        youtube: obj
979            .get("yt")
980            .and_then(|v| v.as_str())
981            .map(|s| s.to_string()),
982        bitbucket: obj
983            .get("bb")
984            .and_then(|v| v.as_str())
985            .map(|s| s.to_string()),
986    }
987}
988
989fn parse_dex_protocol(obj: &serde_json::Map<String, Value>) -> DexProtocol {
990    DexProtocol {
991        program_address: obj
992            .get("pa")
993            .and_then(|v| v.as_str())
994            .map(|s| s.to_string()),
995        protocol_family: obj
996            .get("pf")
997            .and_then(|v| v.as_str())
998            .map(|s| s.to_string()),
999        protocol_name: obj
1000            .get("pn")
1001            .and_then(|v| v.as_str())
1002            .map(|s| s.to_string()),
1003    }
1004}
1005
1006fn parse_token_metadata(obj: &serde_json::Map<String, Value>) -> TokenMetadata {
1007    TokenMetadata {
1008        token_address: get_string(obj, "a"),
1009        name: obj.get("n").and_then(|v| v.as_str()).map(|s| s.to_string()),
1010        decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
1011        symbol: obj.get("s").and_then(|v| v.as_str()).map(|s| s.to_string()),
1012        image_url: obj
1013            .get("iu")
1014            .and_then(|v| v.as_str())
1015            .map(|s| s.to_string()),
1016        description: obj
1017            .get("de")
1018            .and_then(|v| v.as_str())
1019            .map(|s| s.to_string()),
1020        social_media: obj
1021            .get("sm")
1022            .and_then(|v| v.as_object())
1023            .map(parse_social_media),
1024        created_at_ms: obj.get("cts").and_then(|v| v.as_i64()),
1025        coingecko_coin_id: obj
1026            .get("cgi")
1027            .and_then(|v| v.as_str())
1028            .map(|s| s.to_string()),
1029        launch_from: obj
1030            .get("lf")
1031            .and_then(|v| v.as_object())
1032            .map(parse_dex_protocol),
1033        migrated_to: obj
1034            .get("mt")
1035            .and_then(|v| v.as_object())
1036            .map(parse_dex_protocol),
1037    }
1038}
1039
1040fn parse_new_token(data: &Value) -> Result<NewToken, String> {
1041    let obj = data
1042        .as_object()
1043        .ok_or_else(|| "expected object".to_string())?;
1044
1045    Ok(NewToken {
1046        token_address: get_string(obj, "a"),
1047        name: get_string(obj, "n"),
1048        symbol: get_string(obj, "s"),
1049        decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
1050        image_url: obj
1051            .get("iu")
1052            .and_then(|v| v.as_str())
1053            .map(|s| s.to_string()),
1054        description: obj
1055            .get("de")
1056            .and_then(|v| v.as_str())
1057            .map(|s| s.to_string()),
1058        social_media: obj
1059            .get("sm")
1060            .and_then(|v| v.as_object())
1061            .map(parse_social_media),
1062        coingecko_coin_id: obj
1063            .get("cgi")
1064            .and_then(|v| v.as_str())
1065            .map(|s| s.to_string()),
1066        launch_from: obj
1067            .get("lf")
1068            .and_then(|v| v.as_object())
1069            .map(parse_dex_protocol),
1070        migrated_to: obj
1071            .get("mt")
1072            .and_then(|v| v.as_object())
1073            .map(parse_dex_protocol),
1074        created_at_ms: get_i64(obj, "cts"),
1075    })
1076}
1077
1078fn parse_trade_activity(data: &Value) -> Result<TradeActivity, String> {
1079    let obj = data
1080        .as_object()
1081        .ok_or_else(|| "expected object".to_string())?;
1082
1083    Ok(TradeActivity {
1084        token_address: get_string(obj, "a"),
1085        timestamp: get_i64(obj, "t"),
1086        kind: get_string(obj, "k"),
1087        buy_amount: get_string(obj, "ba"),
1088        buy_amount_in_usd: get_string(obj, "baiu"),
1089        buy_token_address: get_string(obj, "btma"),
1090        buy_token_name: get_string(obj, "btn"),
1091        buy_token_symbol: get_string(obj, "bts"),
1092        buy_wallet_address: get_string(obj, "bwa"),
1093        sell_amount: get_string(obj, "sa"),
1094        sell_amount_in_usd: get_string(obj, "saiu"),
1095        sell_token_address: get_string(obj, "stma"),
1096        sell_token_name: get_string(obj, "stn"),
1097        sell_token_symbol: get_string(obj, "sts"),
1098        sell_wallet_address: get_string(obj, "swa"),
1099        tx_hash: get_string(obj, "h"),
1100    })
1101}
1102
1103fn parse_prediction_activity(data: &Value) -> Result<PredictionActivity, String> {
1104    let root = data
1105        .as_object()
1106        .ok_or_else(|| "expected object".to_string())?;
1107    let obj = root.get("a").and_then(|v| v.as_object()).unwrap_or(root);
1108    let activity_type = PredictionActivityType::from_wire(&get_string(obj, "ty"))
1109        .ok_or_else(|| "invalid prediction activity type".to_string())?;
1110
1111    Ok(PredictionActivity {
1112        activity_id: get_string(obj, "id"),
1113        amount: get_string(obj, "amt"),
1114        asset_ids: get_string_vec_option(obj, "as"),
1115        block_number: get_u64(obj, "bn"),
1116        condition_id: get_string(obj, "cid"),
1117        event_slug: get_string(obj, "es"),
1118        log_index: get_u64(obj, "li"),
1119        market_icon: get_string(obj, "mi"),
1120        market_id: get_string(obj, "mid"),
1121        market_question: get_string(obj, "mq"),
1122        outcome: get_string(obj, "oc"),
1123        outcomes: get_string_vec_option(obj, "ocs"),
1124        price: get_string(obj, "p"),
1125        quantity: get_string(obj, "q"),
1126        seq_index: obj
1127            .get("seq")
1128            .and_then(|v| v.as_u64())
1129            .or_else(|| root.get("seq").and_then(|v| v.as_u64()))
1130            .unwrap_or_default(),
1131        source: get_string(obj, "src"),
1132        taker: get_string(obj, "tk"),
1133        taker_age: get_u64(obj, "ta"),
1134        taker_image: get_string(obj, "ti"),
1135        taker_name: get_string(obj, "tn"),
1136        taker_order_hash: get_string(obj, "toh"),
1137        taker_pseudonym: get_string(obj, "tp"),
1138        taker_tags: get_string_vec(obj, "tt"),
1139        timestamp: get_u64(obj, "ts"),
1140        token_id: get_string(obj, "tid"),
1141        tx_hash: get_string(obj, "tx"),
1142        activity_type,
1143    })
1144}
1145
1146fn get_string(obj: &serde_json::Map<String, Value>, key: &str) -> String {
1147    obj.get(key)
1148        .and_then(|v| v.as_str())
1149        .unwrap_or_default()
1150        .to_string()
1151}
1152
1153fn get_string_vec(obj: &serde_json::Map<String, Value>, key: &str) -> Vec<String> {
1154    obj.get(key)
1155        .and_then(|v| v.as_array())
1156        .map(|items| {
1157            items
1158                .iter()
1159                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1160                .collect()
1161        })
1162        .unwrap_or_default()
1163}
1164
1165fn get_string_vec_option(obj: &serde_json::Map<String, Value>, key: &str) -> Option<Vec<String>> {
1166    obj.get(key).and_then(|v| {
1167        v.as_array().map(|items| {
1168            items
1169                .iter()
1170                .filter_map(|v| v.as_str().map(|s| s.to_string()))
1171                .collect()
1172        })
1173    })
1174}
1175
1176fn get_i64(obj: &serde_json::Map<String, Value>, key: &str) -> i64 {
1177    obj.get(key).and_then(|v| v.as_i64()).unwrap_or_default()
1178}
1179
1180fn get_u64(obj: &serde_json::Map<String, Value>, key: &str) -> u64 {
1181    obj.get(key).and_then(|v| v.as_u64()).unwrap_or_default()
1182}
1183
1184fn get_i32(obj: &serde_json::Map<String, Value>, key: &str) -> i32 {
1185    obj.get(key)
1186        .and_then(|v| v.as_i64())
1187        .map(|v| v as i32)
1188        .unwrap_or_default()
1189}