1use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use futures_util::{SinkExt, StreamExt};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use tokio::sync::mpsc;
14use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
15
16use crate::error::ChainStreamError;
17use crate::openapi::types::Resolution;
18
19use super::fields::replace_filter_fields;
20use super::models::*;
21
22pub type StreamCallback<T> = Box<dyn Fn(T) + Send + Sync + 'static>;
24
25pub struct Unsubscribe {
27 channel: String,
28 callback_id: u64,
29 api: Arc<StreamApiInner>,
30}
31
32impl Unsubscribe {
33 pub fn unsubscribe(self) {
35 self.api.unsubscribe(&self.channel, self.callback_id);
36 }
37}
38
39#[derive(Debug, Serialize)]
41struct CentrifugeCommand {
42 id: u64,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 connect: Option<ConnectRequest>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 subscribe: Option<SubscribeRequest>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 unsubscribe: Option<UnsubscribeRequest>,
49}
50
51#[derive(Debug, Serialize)]
52struct ConnectRequest {
53 token: String,
54}
55
56#[derive(Debug, Serialize)]
57struct SubscribeRequest {
58 channel: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 delta: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 filter: Option<String>,
63}
64
65#[derive(Debug, Serialize)]
66struct UnsubscribeRequest {
67 channel: String,
68}
69
70#[derive(Debug, Deserialize)]
72#[allow(dead_code)]
73struct CentrifugeResponse {
74 #[serde(default)]
75 id: u64,
76 #[serde(default)]
77 connect: Option<Value>,
78 #[serde(default)]
79 subscribe: Option<Value>,
80 #[serde(default)]
81 push: Option<PushData>,
82 #[serde(default)]
83 error: Option<ErrorData>,
84}
85
86#[derive(Debug, Deserialize)]
87struct PushData {
88 channel: String,
89 #[serde(rename = "pub")]
90 pub_data: Option<PublicationData>,
91}
92
93#[derive(Debug, Deserialize)]
94struct PublicationData {
95 data: Value,
96}
97
98#[derive(Debug, Deserialize)]
99struct ErrorData {
100 code: i32,
101 message: String,
102}
103
104struct CallbackWrapper {
106 id: u64,
107 callback: Box<dyn Fn(Value) + Send + Sync>,
108}
109
110struct StreamApiInner {
112 url: String,
113 access_token: String,
114 connected: AtomicBool,
115 command_id: AtomicU64,
116 callback_id: AtomicU64,
117 listeners: RwLock<HashMap<String, Vec<CallbackWrapper>>>,
118 subscriptions: RwLock<HashMap<String, u64>>,
119 command_tx: RwLock<Option<mpsc::UnboundedSender<Message>>>,
120}
121
122impl StreamApiInner {
123 fn new(url: String, access_token: String) -> Self {
124 Self {
125 url,
126 access_token,
127 connected: AtomicBool::new(false),
128 command_id: AtomicU64::new(1),
129 callback_id: AtomicU64::new(1),
130 listeners: RwLock::new(HashMap::new()),
131 subscriptions: RwLock::new(HashMap::new()),
132 command_tx: RwLock::new(None),
133 }
134 }
135
136 fn next_command_id(&self) -> u64 {
137 self.command_id.fetch_add(1, Ordering::SeqCst)
138 }
139
140 fn next_callback_id(&self) -> u64 {
141 self.callback_id.fetch_add(1, Ordering::SeqCst)
142 }
143
144 fn add_listener<F>(&self, channel: &str, callback: F) -> u64
145 where
146 F: Fn(Value) + Send + Sync + 'static,
147 {
148 let callback_id = self.next_callback_id();
149 let wrapper = CallbackWrapper {
150 id: callback_id,
151 callback: Box::new(callback),
152 };
153
154 let mut listeners = self.listeners.write();
155 listeners
156 .entry(channel.to_string())
157 .or_default()
158 .push(wrapper);
159
160 callback_id
161 }
162
163 fn unsubscribe(&self, channel: &str, callback_id: u64) {
164 let mut listeners = self.listeners.write();
165 if let Some(callbacks) = listeners.get_mut(channel) {
166 callbacks.retain(|c| c.id != callback_id);
167
168 if callbacks.is_empty() {
170 listeners.remove(channel);
171 drop(listeners);
172
173 if let Some(tx) = self.command_tx.read().as_ref() {
175 let cmd = CentrifugeCommand {
176 id: self.next_command_id(),
177 connect: None,
178 subscribe: None,
179 unsubscribe: Some(UnsubscribeRequest {
180 channel: channel.to_string(),
181 }),
182 };
183 if let Ok(json) = serde_json::to_string(&cmd) {
184 let _ = tx.send(Message::Text(json.into()));
185 }
186 }
187
188 self.subscriptions.write().remove(channel);
189 log::info!("[streaming] unsubscribed from channel: {}", channel);
190 }
191 }
192 }
193
194 fn dispatch_message(&self, channel: &str, data: Value) {
195 let listeners = self.listeners.read();
196 if let Some(callbacks) = listeners.get(channel) {
197 for callback in callbacks {
198 (callback.callback)(data.clone());
199 }
200 }
201 }
202
203 fn send_subscribe(&self, channel: &str, filter: Option<&str>) {
204 if let Some(tx) = self.command_tx.read().as_ref() {
205 let cmd = CentrifugeCommand {
206 id: self.next_command_id(),
207 connect: None,
208 subscribe: Some(SubscribeRequest {
209 channel: channel.to_string(),
210 delta: Some("fossil".to_string()),
211 filter: filter.map(|f| f.to_string()),
212 }),
213 unsubscribe: None,
214 };
215 if let Ok(json) = serde_json::to_string(&cmd) {
216 let _ = tx.send(Message::Text(json.into()));
217 }
218 }
219 }
220}
221
222pub struct StreamApi {
224 inner: Arc<StreamApiInner>,
225}
226
227impl StreamApi {
228 pub fn new(url: &str, access_token: &str) -> Self {
230 let url_with_token = if url.contains('?') {
232 format!("{}&token={}", url, access_token)
233 } else {
234 format!("{}?token={}", url, access_token)
235 };
236
237 Self {
238 inner: Arc::new(StreamApiInner::new(
239 url_with_token,
240 access_token.to_string(),
241 )),
242 }
243 }
244
245 pub fn is_connected(&self) -> bool {
247 self.inner.connected.load(Ordering::SeqCst)
248 }
249
250 pub async fn connect(&self) -> Result<(), ChainStreamError> {
252 if self.is_connected() {
253 return Ok(());
254 }
255
256 let url = &self.inner.url;
257 log::info!("[streaming] connecting to {}", url);
258
259 let (ws_stream, _) = connect_async(url)
260 .await
261 .map_err(|e| ChainStreamError::WebSocket(format!("Failed to connect: {}", e)))?;
262
263 let (mut write, mut read) = ws_stream.split();
264
265 let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
267 *self.inner.command_tx.write() = Some(tx.clone());
268
269 let connect_cmd = CentrifugeCommand {
271 id: self.inner.next_command_id(),
272 connect: Some(ConnectRequest {
273 token: self.inner.access_token.clone(),
274 }),
275 subscribe: None,
276 unsubscribe: None,
277 };
278 let connect_json = serde_json::to_string(&connect_cmd)
279 .map_err(|e| ChainStreamError::Serialization(e.to_string()))?;
280 write
281 .send(Message::Text(connect_json.into()))
282 .await
283 .map_err(|e| ChainStreamError::WebSocket(format!("Failed to send connect: {}", e)))?;
284
285 self.inner.connected.store(true, Ordering::SeqCst);
286
287 let inner_write = self.inner.clone();
289 tokio::spawn(async move {
290 while let Some(msg) = rx.recv().await {
291 if write.send(msg).await.is_err() {
292 inner_write.connected.store(false, Ordering::SeqCst);
293 break;
294 }
295 }
296 });
297
298 let inner_read = self.inner.clone();
300 tokio::spawn(async move {
301 while let Some(msg) = read.next().await {
302 match msg {
303 Ok(Message::Text(text)) => {
304 if let Ok(response) = serde_json::from_str::<CentrifugeResponse>(&text) {
305 if let Some(push) = response.push {
307 if let Some(pub_data) = push.pub_data {
308 inner_read.dispatch_message(&push.channel, pub_data.data);
309 }
310 }
311 if let Some(err) = response.error {
313 log::error!(
314 "[streaming] error: code={}, message={}",
315 err.code,
316 err.message
317 );
318 }
319 }
320 }
321 Ok(Message::Close(_)) => {
322 log::info!("[streaming] connection closed");
323 inner_read.connected.store(false, Ordering::SeqCst);
324 break;
325 }
326 Ok(Message::Ping(data)) => {
327 if let Some(tx) = inner_read.command_tx.read().as_ref() {
328 let _ = tx.send(Message::Pong(data));
329 }
330 }
331 Err(e) => {
332 log::error!("[streaming] read error: {}", e);
333 inner_read.connected.store(false, Ordering::SeqCst);
334 break;
335 }
336 _ => {}
337 }
338 }
339 });
340
341 Ok(())
342 }
343
344 pub async fn disconnect(&self) {
346 if let Some(tx) = self.inner.command_tx.write().take() {
347 let _ = tx.send(Message::Close(None));
348 }
349 self.inner.connected.store(false, Ordering::SeqCst);
350 log::info!("[streaming] disconnected");
351 }
352
353 pub async fn subscribe<F>(
355 &self,
356 channel: &str,
357 callback: F,
358 filter: Option<&str>,
359 method_name: Option<&str>,
360 ) -> Result<Unsubscribe, ChainStreamError>
361 where
362 F: Fn(Value) + Send + Sync + 'static,
363 {
364 if !self.is_connected() {
366 self.connect().await?;
367 }
368
369 let processed_filter = match (filter, method_name) {
371 (Some(f), Some(m)) if !f.is_empty() => Some(replace_filter_fields(f, m)),
372 (Some(f), _) if !f.is_empty() => Some(f.to_string()),
373 _ => None,
374 };
375
376 let needs_subscribe = {
378 let subs = self.inner.subscriptions.read();
379 !subs.contains_key(channel)
380 };
381
382 let callback_id = self.inner.add_listener(channel, callback);
384
385 if needs_subscribe {
387 self.inner
388 .send_subscribe(channel, processed_filter.as_deref());
389 self.inner
390 .subscriptions
391 .write()
392 .insert(channel.to_string(), self.inner.next_command_id());
393 log::info!("[streaming] subscribed to channel: {}", channel);
394 }
395
396 Ok(Unsubscribe {
397 channel: channel.to_string(),
398 callback_id,
399 api: self.inner.clone(),
400 })
401 }
402
403 pub async fn subscribe_token_candles<F>(
408 &self,
409 chain: &str,
410 token_address: &str,
411 resolution: Resolution,
412 callback: F,
413 filter: Option<&str>,
414 price_type: Option<PriceType>,
415 ) -> Result<Unsubscribe, ChainStreamError>
416 where
417 F: Fn(Candle) + Send + Sync + 'static,
418 {
419 let prefix = match price_type.unwrap_or_default() {
420 PriceType::Native => "dex-candle-in-native",
421 PriceType::Usd => "dex-candle",
422 };
423 let channel = format!("{}:{}_{}_{}", prefix, chain, token_address, resolution);
424
425 self.subscribe(
426 &channel,
427 move |data| {
428 if let Ok(candle) = parse_candle(&data) {
429 callback(candle);
430 }
431 },
432 filter,
433 Some("subscribe_token_candles"),
434 )
435 .await
436 }
437
438 pub async fn subscribe_pool_candles<F>(
441 &self,
442 chain: &str,
443 pool_address: &str,
444 resolution: Resolution,
445 callback: F,
446 filter: Option<&str>,
447 price_type: Option<PriceType>,
448 ) -> Result<Unsubscribe, ChainStreamError>
449 where
450 F: Fn(Candle) + Send + Sync + 'static,
451 {
452 let prefix = match price_type.unwrap_or_default() {
453 PriceType::Native => "dex-pool-candle-in-native",
454 PriceType::Usd => "dex-pool-candle",
455 };
456 let channel = format!("{}:{}_{}_{}", prefix, chain, pool_address, resolution);
457
458 self.subscribe(
459 &channel,
460 move |data| {
461 if let Ok(candle) = parse_candle(&data) {
462 callback(candle);
463 }
464 },
465 filter,
466 Some("subscribe_pool_candles"),
467 )
468 .await
469 }
470
471 pub async fn subscribe_pair_candles<F>(
475 &self,
476 chain: &str,
477 pair_address: &str,
478 resolution: Resolution,
479 callback: F,
480 filter: Option<&str>,
481 price_type: Option<PriceType>,
482 ) -> Result<Unsubscribe, ChainStreamError>
483 where
484 F: Fn(Candle) + Send + Sync + 'static,
485 {
486 let prefix = match price_type.unwrap_or_default() {
487 PriceType::Native => "dex-pair-candle-in-native",
488 PriceType::Usd => "dex-pair-candle",
489 };
490 let channel = format!("{}:{}_{}_{}", prefix, chain, pair_address, resolution);
491
492 self.subscribe(
493 &channel,
494 move |data| {
495 if let Ok(candle) = parse_candle(&data) {
496 callback(candle);
497 }
498 },
499 filter,
500 Some("subscribe_pair_candles"),
501 )
502 .await
503 }
504
505 pub async fn subscribe_token_stats<F>(
507 &self,
508 chain: &str,
509 token_address: &str,
510 callback: F,
511 filter: Option<&str>,
512 ) -> Result<Unsubscribe, ChainStreamError>
513 where
514 F: Fn(TokenStat) + Send + Sync + 'static,
515 {
516 let channel = format!("dex-token-stats:{}_{}", chain, token_address);
517
518 self.subscribe(
519 &channel,
520 move |data| {
521 if let Ok(stat) = serde_json::from_value::<TokenStat>(data) {
522 callback(stat);
523 }
524 },
525 filter,
526 Some("subscribe_token_stats"),
527 )
528 .await
529 }
530
531 pub async fn subscribe_new_token<F>(
533 &self,
534 chain: &str,
535 callback: F,
536 filter: Option<&str>,
537 ) -> Result<Unsubscribe, ChainStreamError>
538 where
539 F: Fn(NewToken) + Send + Sync + 'static,
540 {
541 let channel = format!("dex-new-token:{}", chain);
542
543 self.subscribe(
544 &channel,
545 move |data| {
546 if let Ok(token) = parse_new_token(&data) {
547 callback(token);
548 }
549 },
550 filter,
551 Some("subscribe_new_token"),
552 )
553 .await
554 }
555
556 pub async fn subscribe_token_trade<F>(
558 &self,
559 chain: &str,
560 token_address: &str,
561 callback: F,
562 filter: Option<&str>,
563 ) -> Result<Unsubscribe, ChainStreamError>
564 where
565 F: Fn(TradeActivity) + Send + Sync + 'static,
566 {
567 let channel = format!("dex-trade:{}_{}", chain, token_address);
568
569 self.subscribe(
570 &channel,
571 move |data| {
572 if let Ok(trade) = parse_trade_activity(&data) {
573 callback(trade);
574 }
575 },
576 filter,
577 Some("subscribe_token_trades"),
578 )
579 .await
580 }
581
582 pub async fn subscribe_prediction_event_activities<F>(
584 &self,
585 event_slug: &str,
586 callback: F,
587 filter: Option<&str>,
588 ) -> Result<Unsubscribe, ChainStreamError>
589 where
590 F: Fn(PredictionActivity) + Send + Sync + 'static,
591 {
592 let channel = prediction_activity_channel(PredictionActivityChannelKind::Event, event_slug);
593
594 self.subscribe(
595 &channel,
596 move |data| {
597 if let Ok(activity) = parse_prediction_activity(&data) {
598 callback(activity);
599 }
600 },
601 filter,
602 Some("subscribe_prediction_event_activities"),
603 )
604 .await
605 }
606
607 pub async fn subscribe_prediction_token_activities<F>(
609 &self,
610 token_id: &str,
611 callback: F,
612 filter: Option<&str>,
613 ) -> Result<Unsubscribe, ChainStreamError>
614 where
615 F: Fn(PredictionActivity) + Send + Sync + 'static,
616 {
617 let channel = prediction_activity_channel(PredictionActivityChannelKind::Token, token_id);
618
619 self.subscribe(
620 &channel,
621 move |data| {
622 if let Ok(activity) = parse_prediction_activity(&data) {
623 callback(activity);
624 }
625 },
626 filter,
627 Some("subscribe_prediction_token_activities"),
628 )
629 .await
630 }
631
632 pub async fn subscribe_wallet_balance<F>(
634 &self,
635 chain: &str,
636 wallet_address: &str,
637 callback: F,
638 filter: Option<&str>,
639 ) -> Result<Unsubscribe, ChainStreamError>
640 where
641 F: Fn(WalletBalance) + Send + Sync + 'static,
642 {
643 let channel = format!("dex-wallet-balance:{}_{}", chain, wallet_address);
644
645 self.subscribe(
646 &channel,
647 move |data| {
648 if let Ok(balance) = serde_json::from_value::<WalletBalance>(data) {
649 callback(balance);
650 }
651 },
652 filter,
653 Some("subscribe_wallet_balance"),
654 )
655 .await
656 }
657
658 pub async fn subscribe_token_holders<F>(
660 &self,
661 chain: &str,
662 token_address: &str,
663 callback: F,
664 filter: Option<&str>,
665 ) -> Result<Unsubscribe, ChainStreamError>
666 where
667 F: Fn(TokenHolder) + Send + Sync + 'static,
668 {
669 let channel = format!("dex-token-holder:{}_{}", chain, token_address);
670
671 self.subscribe(
672 &channel,
673 move |data| {
674 if let Ok(holder) = serde_json::from_value::<TokenHolder>(data) {
675 callback(holder);
676 }
677 },
678 filter,
679 Some("subscribe_token_holders"),
680 )
681 .await
682 }
683
684 pub async fn subscribe_token_supply<F>(
686 &self,
687 chain: &str,
688 token_address: &str,
689 callback: F,
690 filter: Option<&str>,
691 ) -> Result<Unsubscribe, ChainStreamError>
692 where
693 F: Fn(TokenSupply) + Send + Sync + 'static,
694 {
695 let channel = format!("dex-token-supply:{}_{}", chain, token_address);
696
697 self.subscribe(
698 &channel,
699 move |data| {
700 if let Ok(supply) = serde_json::from_value::<TokenSupply>(data) {
701 callback(supply);
702 }
703 },
704 filter,
705 Some("subscribe_token_supply"),
706 )
707 .await
708 }
709
710 pub async fn subscribe_dex_pool_balance<F>(
712 &self,
713 chain: &str,
714 pool_address: &str,
715 callback: F,
716 filter: Option<&str>,
717 ) -> Result<Unsubscribe, ChainStreamError>
718 where
719 F: Fn(DexPoolBalance) + Send + Sync + 'static,
720 {
721 let channel = format!("dex-pool-balance:{}_{}", chain, pool_address);
722
723 self.subscribe(
724 &channel,
725 move |data| {
726 if let Ok(balance) = serde_json::from_value::<DexPoolBalance>(data) {
727 callback(balance);
728 }
729 },
730 filter,
731 Some("subscribe_dex_pool_balance"),
732 )
733 .await
734 }
735
736 pub async fn subscribe_token_max_liquidity<F>(
738 &self,
739 chain: &str,
740 token_address: &str,
741 callback: F,
742 filter: Option<&str>,
743 ) -> Result<Unsubscribe, ChainStreamError>
744 where
745 F: Fn(TokenMaxLiquidity) + Send + Sync + 'static,
746 {
747 let channel = format!("dex-token-max-liquidity:{}_{}", chain, token_address);
748
749 self.subscribe(
750 &channel,
751 move |data| {
752 if let Ok(liquidity) = serde_json::from_value::<TokenMaxLiquidity>(data) {
753 callback(liquidity);
754 }
755 },
756 filter,
757 Some("subscribe_token_max_liquidity"),
758 )
759 .await
760 }
761
762 pub async fn subscribe_token_total_liquidity<F>(
764 &self,
765 chain: &str,
766 token_address: &str,
767 callback: F,
768 filter: Option<&str>,
769 ) -> Result<Unsubscribe, ChainStreamError>
770 where
771 F: Fn(TokenTotalLiquidity) + Send + Sync + 'static,
772 {
773 let channel = format!("dex-token-total-liquidity:{}_{}", chain, token_address);
774
775 self.subscribe(
776 &channel,
777 move |data| {
778 if let Ok(liquidity) = serde_json::from_value::<TokenTotalLiquidity>(data) {
779 callback(liquidity);
780 }
781 },
782 filter,
783 Some("subscribe_token_total_liquidity"),
784 )
785 .await
786 }
787
788 pub async fn subscribe_wallet_pnl<F>(
790 &self,
791 chain: &str,
792 wallet_address: &str,
793 callback: F,
794 filter: Option<&str>,
795 ) -> Result<Unsubscribe, ChainStreamError>
796 where
797 F: Fn(WalletTokenPnl) + Send + Sync + 'static,
798 {
799 let channel = format!("dex-wallet-pnl:{}_{}", chain, wallet_address);
800
801 self.subscribe(
802 &channel,
803 move |data| {
804 if let Ok(pnl) = serde_json::from_value::<WalletTokenPnl>(data) {
805 callback(pnl);
806 }
807 },
808 filter,
809 Some("subscribe_wallet_pnl"),
810 )
811 .await
812 }
813
814 pub async fn subscribe_new_tokens_metadata<F>(
818 &self,
819 chain: &str,
820 callback: F,
821 ) -> Result<Unsubscribe, ChainStreamError>
822 where
823 F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
824 {
825 let channel = format!("dex-new-tokens-metadata:{}", chain);
826
827 self.subscribe(
828 &channel,
829 move |data| {
830 if let Some(arr) = data.as_array() {
831 let result: Vec<TokenMetadata> = arr
832 .iter()
833 .filter_map(|item| item.as_object().map(parse_token_metadata))
834 .collect();
835 callback(result);
836 }
837 },
838 None,
839 Some("subscribe_new_tokens_metadata"),
840 )
841 .await
842 }
843
844 pub async fn subscribe_new_tokens<F>(
848 &self,
849 chain: &str,
850 callback: F,
851 ) -> Result<Unsubscribe, ChainStreamError>
852 where
853 F: Fn(Vec<TokenMetadata>) + Send + Sync + 'static,
854 {
855 let channel = format!("dex-new-tokens:{}", chain);
856
857 self.subscribe(
858 &channel,
859 move |data| {
860 if let Some(arr) = data.as_array() {
861 let result: Vec<TokenMetadata> = arr
862 .iter()
863 .filter_map(|item| item.as_object().map(parse_token_metadata))
864 .collect();
865 callback(result);
866 }
867 },
868 None,
869 Some("subscribe_new_tokens"),
870 )
871 .await
872 }
873
874 pub async fn subscribe_ranking_tokens_list<F>(
876 &self,
877 chain: &str,
878 ranking_type: RankingType,
879 callback: F,
880 filter: Option<&str>,
881 ) -> Result<Unsubscribe, ChainStreamError>
882 where
883 F: Fn(RankingTokenList) + Send + Sync + 'static,
884 {
885 let ranking_str = match ranking_type {
886 RankingType::New => "new",
887 RankingType::Hot => "trending",
888 RankingType::Stocks => "stocks",
889 RankingType::FinalStretch => "completed",
890 RankingType::Migrated => "graduated",
891 };
892 let channel = format!("dex-ranking-token-list:{}_{}", chain, ranking_str);
893
894 self.subscribe(
895 &channel,
896 move |data| {
897 if let Ok(ranking) = serde_json::from_value::<RankingTokenList>(data) {
898 callback(ranking);
899 }
900 },
901 filter,
902 None,
903 )
904 .await
905 }
906}
907
908fn prediction_activity_channel(kind: PredictionActivityChannelKind, key: &str) -> String {
911 match kind {
912 PredictionActivityChannelKind::Event => format!("pred:evt:{key}:act"),
913 PredictionActivityChannelKind::Token => format!("pred:tok:{key}:act"),
914 }
915}
916
917fn parse_candle(data: &Value) -> Result<Candle, String> {
918 let obj = data
919 .as_object()
920 .ok_or_else(|| "expected object".to_string())?;
921
922 Ok(Candle {
923 address: get_string(obj, "a"),
924 open: get_string(obj, "o"),
925 close: get_string(obj, "c"),
926 high: get_string(obj, "h"),
927 low: get_string(obj, "l"),
928 volume: get_string(obj, "v"),
929 resolution: get_string(obj, "r"),
930 time: get_i64(obj, "t"),
931 number: get_i32(obj, "n"),
932 })
933}
934
935fn parse_social_media(obj: &serde_json::Map<String, Value>) -> SocialMedia {
936 SocialMedia {
937 twitter: obj
938 .get("tw")
939 .and_then(|v| v.as_str())
940 .map(|s| s.to_string()),
941 telegram: obj
942 .get("tg")
943 .and_then(|v| v.as_str())
944 .map(|s| s.to_string()),
945 website: obj.get("w").and_then(|v| v.as_str()).map(|s| s.to_string()),
946 tiktok: obj
947 .get("tt")
948 .and_then(|v| v.as_str())
949 .map(|s| s.to_string()),
950 discord: obj
951 .get("dc")
952 .and_then(|v| v.as_str())
953 .map(|s| s.to_string()),
954 facebook: obj
955 .get("fb")
956 .and_then(|v| v.as_str())
957 .map(|s| s.to_string()),
958 github: obj
959 .get("gh")
960 .and_then(|v| v.as_str())
961 .map(|s| s.to_string()),
962 instagram: obj
963 .get("ig")
964 .and_then(|v| v.as_str())
965 .map(|s| s.to_string()),
966 linkedin: obj
967 .get("li")
968 .and_then(|v| v.as_str())
969 .map(|s| s.to_string()),
970 medium: obj
971 .get("md")
972 .and_then(|v| v.as_str())
973 .map(|s| s.to_string()),
974 reddit: obj
975 .get("rd")
976 .and_then(|v| v.as_str())
977 .map(|s| s.to_string()),
978 youtube: obj
979 .get("yt")
980 .and_then(|v| v.as_str())
981 .map(|s| s.to_string()),
982 bitbucket: obj
983 .get("bb")
984 .and_then(|v| v.as_str())
985 .map(|s| s.to_string()),
986 }
987}
988
989fn parse_dex_protocol(obj: &serde_json::Map<String, Value>) -> DexProtocol {
990 DexProtocol {
991 program_address: obj
992 .get("pa")
993 .and_then(|v| v.as_str())
994 .map(|s| s.to_string()),
995 protocol_family: obj
996 .get("pf")
997 .and_then(|v| v.as_str())
998 .map(|s| s.to_string()),
999 protocol_name: obj
1000 .get("pn")
1001 .and_then(|v| v.as_str())
1002 .map(|s| s.to_string()),
1003 }
1004}
1005
1006fn parse_token_metadata(obj: &serde_json::Map<String, Value>) -> TokenMetadata {
1007 TokenMetadata {
1008 token_address: get_string(obj, "a"),
1009 name: obj.get("n").and_then(|v| v.as_str()).map(|s| s.to_string()),
1010 decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
1011 symbol: obj.get("s").and_then(|v| v.as_str()).map(|s| s.to_string()),
1012 image_url: obj
1013 .get("iu")
1014 .and_then(|v| v.as_str())
1015 .map(|s| s.to_string()),
1016 description: obj
1017 .get("de")
1018 .and_then(|v| v.as_str())
1019 .map(|s| s.to_string()),
1020 social_media: obj
1021 .get("sm")
1022 .and_then(|v| v.as_object())
1023 .map(parse_social_media),
1024 created_at_ms: obj.get("cts").and_then(|v| v.as_i64()),
1025 coingecko_coin_id: obj
1026 .get("cgi")
1027 .and_then(|v| v.as_str())
1028 .map(|s| s.to_string()),
1029 launch_from: obj
1030 .get("lf")
1031 .and_then(|v| v.as_object())
1032 .map(parse_dex_protocol),
1033 migrated_to: obj
1034 .get("mt")
1035 .and_then(|v| v.as_object())
1036 .map(parse_dex_protocol),
1037 }
1038}
1039
1040fn parse_new_token(data: &Value) -> Result<NewToken, String> {
1041 let obj = data
1042 .as_object()
1043 .ok_or_else(|| "expected object".to_string())?;
1044
1045 Ok(NewToken {
1046 token_address: get_string(obj, "a"),
1047 name: get_string(obj, "n"),
1048 symbol: get_string(obj, "s"),
1049 decimals: obj.get("dec").and_then(|v| v.as_i64()).map(|v| v as i32),
1050 image_url: obj
1051 .get("iu")
1052 .and_then(|v| v.as_str())
1053 .map(|s| s.to_string()),
1054 description: obj
1055 .get("de")
1056 .and_then(|v| v.as_str())
1057 .map(|s| s.to_string()),
1058 social_media: obj
1059 .get("sm")
1060 .and_then(|v| v.as_object())
1061 .map(parse_social_media),
1062 coingecko_coin_id: obj
1063 .get("cgi")
1064 .and_then(|v| v.as_str())
1065 .map(|s| s.to_string()),
1066 launch_from: obj
1067 .get("lf")
1068 .and_then(|v| v.as_object())
1069 .map(parse_dex_protocol),
1070 migrated_to: obj
1071 .get("mt")
1072 .and_then(|v| v.as_object())
1073 .map(parse_dex_protocol),
1074 created_at_ms: get_i64(obj, "cts"),
1075 })
1076}
1077
1078fn parse_trade_activity(data: &Value) -> Result<TradeActivity, String> {
1079 let obj = data
1080 .as_object()
1081 .ok_or_else(|| "expected object".to_string())?;
1082
1083 Ok(TradeActivity {
1084 token_address: get_string(obj, "a"),
1085 timestamp: get_i64(obj, "t"),
1086 kind: get_string(obj, "k"),
1087 buy_amount: get_string(obj, "ba"),
1088 buy_amount_in_usd: get_string(obj, "baiu"),
1089 buy_token_address: get_string(obj, "btma"),
1090 buy_token_name: get_string(obj, "btn"),
1091 buy_token_symbol: get_string(obj, "bts"),
1092 buy_wallet_address: get_string(obj, "bwa"),
1093 sell_amount: get_string(obj, "sa"),
1094 sell_amount_in_usd: get_string(obj, "saiu"),
1095 sell_token_address: get_string(obj, "stma"),
1096 sell_token_name: get_string(obj, "stn"),
1097 sell_token_symbol: get_string(obj, "sts"),
1098 sell_wallet_address: get_string(obj, "swa"),
1099 tx_hash: get_string(obj, "h"),
1100 })
1101}
1102
1103fn parse_prediction_activity(data: &Value) -> Result<PredictionActivity, String> {
1104 let root = data
1105 .as_object()
1106 .ok_or_else(|| "expected object".to_string())?;
1107 let obj = root.get("a").and_then(|v| v.as_object()).unwrap_or(root);
1108 let activity_type = PredictionActivityType::from_wire(&get_string(obj, "ty"))
1109 .ok_or_else(|| "invalid prediction activity type".to_string())?;
1110
1111 Ok(PredictionActivity {
1112 activity_id: get_string(obj, "id"),
1113 amount: get_string(obj, "amt"),
1114 asset_ids: get_string_vec_option(obj, "as"),
1115 block_number: get_u64(obj, "bn"),
1116 condition_id: get_string(obj, "cid"),
1117 event_slug: get_string(obj, "es"),
1118 log_index: get_u64(obj, "li"),
1119 market_icon: get_string(obj, "mi"),
1120 market_id: get_string(obj, "mid"),
1121 market_question: get_string(obj, "mq"),
1122 outcome: get_string(obj, "oc"),
1123 outcomes: get_string_vec_option(obj, "ocs"),
1124 price: get_string(obj, "p"),
1125 quantity: get_string(obj, "q"),
1126 seq_index: obj
1127 .get("seq")
1128 .and_then(|v| v.as_u64())
1129 .or_else(|| root.get("seq").and_then(|v| v.as_u64()))
1130 .unwrap_or_default(),
1131 source: get_string(obj, "src"),
1132 taker: get_string(obj, "tk"),
1133 taker_age: get_u64(obj, "ta"),
1134 taker_image: get_string(obj, "ti"),
1135 taker_name: get_string(obj, "tn"),
1136 taker_order_hash: get_string(obj, "toh"),
1137 taker_pseudonym: get_string(obj, "tp"),
1138 taker_tags: get_string_vec(obj, "tt"),
1139 timestamp: get_u64(obj, "ts"),
1140 token_id: get_string(obj, "tid"),
1141 tx_hash: get_string(obj, "tx"),
1142 activity_type,
1143 })
1144}
1145
1146fn get_string(obj: &serde_json::Map<String, Value>, key: &str) -> String {
1147 obj.get(key)
1148 .and_then(|v| v.as_str())
1149 .unwrap_or_default()
1150 .to_string()
1151}
1152
1153fn get_string_vec(obj: &serde_json::Map<String, Value>, key: &str) -> Vec<String> {
1154 obj.get(key)
1155 .and_then(|v| v.as_array())
1156 .map(|items| {
1157 items
1158 .iter()
1159 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1160 .collect()
1161 })
1162 .unwrap_or_default()
1163}
1164
1165fn get_string_vec_option(obj: &serde_json::Map<String, Value>, key: &str) -> Option<Vec<String>> {
1166 obj.get(key).and_then(|v| {
1167 v.as_array().map(|items| {
1168 items
1169 .iter()
1170 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1171 .collect()
1172 })
1173 })
1174}
1175
1176fn get_i64(obj: &serde_json::Map<String, Value>, key: &str) -> i64 {
1177 obj.get(key).and_then(|v| v.as_i64()).unwrap_or_default()
1178}
1179
1180fn get_u64(obj: &serde_json::Map<String, Value>, key: &str) -> u64 {
1181 obj.get(key).and_then(|v| v.as_u64()).unwrap_or_default()
1182}
1183
1184fn get_i32(obj: &serde_json::Map<String, Value>, key: &str) -> i32 {
1185 obj.get(key)
1186 .and_then(|v| v.as_i64())
1187 .map(|v| v as i32)
1188 .unwrap_or_default()
1189}