1use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use futures_util::{SinkExt, StreamExt};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use tokio::sync::mpsc;
14use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
15
16use crate::error::ChainStreamError;
17use crate::openapi::types::Resolution;
18
19use super::fields::replace_filter_fields;
20use super::models::*;
21
/// Boxed callback that receives a value of type `T`.
///
/// The `Send + Sync + 'static` bounds let the callback be stored in shared
/// state and invoked from another task or thread.
pub type StreamCallback<T> = Box<dyn Fn(T) + Send + Sync + 'static>;
24
/// Handle identifying one registered callback on one channel.
///
/// Returned by the `subscribe*` methods; call [`Unsubscribe::unsubscribe`]
/// to remove the callback again.
pub struct Unsubscribe {
    /// Channel the callback was registered on.
    channel: String,
    /// Identifier assigned to the callback when it was registered.
    callback_id: u64,
    /// Shared stream state that performs the removal.
    api: Arc<StreamApiInner>,
}
31
32impl Unsubscribe {
33 pub fn unsubscribe(self) {
35 self.api.unsubscribe(&self.channel, self.callback_id);
36 }
37}
38
/// Outgoing command envelope that is serialized to JSON and sent over the socket.
///
/// The code in this file populates exactly one of `connect`, `subscribe`
/// or `unsubscribe` per command; fields left as `None` are omitted from the
/// serialized output.
#[derive(Debug, Serialize)]
struct CentrifugeCommand {
    /// Command identifier taken from the shared command counter.
    id: u64,
    /// Payload of a connect command.
    #[serde(skip_serializing_if = "Option::is_none")]
    connect: Option<ConnectRequest>,
    /// Payload of a subscribe command.
    #[serde(skip_serializing_if = "Option::is_none")]
    subscribe: Option<SubscribeRequest>,
    /// Payload of an unsubscribe command.
    #[serde(skip_serializing_if = "Option::is_none")]
    unsubscribe: Option<UnsubscribeRequest>,
}
50
/// Body of the `connect` command.
#[derive(Debug, Serialize)]
struct ConnectRequest {
    /// Access token presented to the server during the handshake.
    token: String,
}
55
/// Body of the `subscribe` command.
#[derive(Debug, Serialize)]
struct SubscribeRequest {
    /// Name of the channel to subscribe to.
    channel: String,
    /// Delta mode requested for the channel; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    delta: Option<String>,
    /// Optional filter expression; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    filter: Option<String>,
}
64
/// Body of the `unsubscribe` command.
#[derive(Debug, Serialize)]
struct UnsubscribeRequest {
    /// Name of the channel to unsubscribe from.
    channel: String,
}
69
/// Incoming message envelope decoded from a text frame.
///
/// Every field falls back to its default when absent from the JSON.
#[derive(Debug, Deserialize)]
// Only `push` and `error` are read by the reader task in this file; the
// remaining fields are decoded but unused, hence the lint suppression.
#[allow(dead_code)]
struct CentrifugeResponse {
    /// Identifier of the command this message replies to; 0 when absent.
    #[serde(default)]
    id: u64,
    /// Raw result of a connect command, if present.
    #[serde(default)]
    connect: Option<Value>,
    /// Raw result of a subscribe command, if present.
    #[serde(default)]
    subscribe: Option<Value>,
    /// Server push carrying a channel publication, if present.
    #[serde(default)]
    push: Option<PushData>,
    /// Error reported by the server, if present.
    #[serde(default)]
    error: Option<ErrorData>,
}
85
/// Server push addressed to a channel.
#[derive(Debug, Deserialize)]
struct PushData {
    /// Channel the push belongs to.
    channel: String,
    /// Publication payload, read from the JSON key `pub` (a Rust keyword,
    /// hence the rename).
    #[serde(rename = "pub")]
    pub_data: Option<PublicationData>,
}
92
/// A single publication delivered on a channel.
#[derive(Debug, Deserialize)]
struct PublicationData {
    /// Undecoded JSON payload handed to the channel's listeners.
    data: Value,
}
97
/// Error object reported by the server.
#[derive(Debug, Deserialize)]
struct ErrorData {
    /// Numeric error code.
    code: i32,
    /// Human-readable error description.
    message: String,
}
103
/// A registered listener: the callback plus the id used to remove it later.
struct CallbackWrapper {
    /// Identifier handed back to the subscriber for later removal.
    id: u64,
    /// Callback invoked with the raw JSON payload of each publication.
    callback: Box<dyn Fn(Value) + Send + Sync>,
}
109
/// State shared between the public `StreamApi` handle, the background
/// tasks spawned on connect, and every `Unsubscribe` handle.
struct StreamApiInner {
    /// WebSocket URL to dial.
    url: String,
    /// Access token sent in the connect command.
    access_token: String,
    /// Whether the client currently considers itself connected.
    connected: AtomicBool,
    /// Counter that supplies ids for outgoing commands.
    command_id: AtomicU64,
    /// Counter that supplies ids for registered callbacks.
    callback_id: AtomicU64,
    /// Registered callbacks, grouped by channel name.
    listeners: RwLock<HashMap<String, Vec<CallbackWrapper>>>,
    /// Channels for which a subscribe command has been issued, mapped to a
    /// command-counter value recorded at that time.
    subscriptions: RwLock<HashMap<String, u64>>,
    /// Sender feeding the writer task; `None` while no connection is set up.
    command_tx: RwLock<Option<mpsc::UnboundedSender<Message>>>,
}
121
122impl StreamApiInner {
123 fn new(url: String, access_token: String) -> Self {
124 Self {
125 url,
126 access_token,
127 connected: AtomicBool::new(false),
128 command_id: AtomicU64::new(1),
129 callback_id: AtomicU64::new(1),
130 listeners: RwLock::new(HashMap::new()),
131 subscriptions: RwLock::new(HashMap::new()),
132 command_tx: RwLock::new(None),
133 }
134 }
135
136 fn next_command_id(&self) -> u64 {
137 self.command_id.fetch_add(1, Ordering::SeqCst)
138 }
139
140 fn next_callback_id(&self) -> u64 {
141 self.callback_id.fetch_add(1, Ordering::SeqCst)
142 }
143
144 fn add_listener<F>(&self, channel: &str, callback: F) -> u64
145 where
146 F: Fn(Value) + Send + Sync + 'static,
147 {
148 let callback_id = self.next_callback_id();
149 let wrapper = CallbackWrapper {
150 id: callback_id,
151 callback: Box::new(callback),
152 };
153
154 let mut listeners = self.listeners.write();
155 listeners
156 .entry(channel.to_string())
157 .or_default()
158 .push(wrapper);
159
160 callback_id
161 }
162
163 fn unsubscribe(&self, channel: &str, callback_id: u64) {
164 let mut listeners = self.listeners.write();
165 if let Some(callbacks) = listeners.get_mut(channel) {
166 callbacks.retain(|c| c.id != callback_id);
167
168 if callbacks.is_empty() {
170 listeners.remove(channel);
171 drop(listeners);
172
173 if let Some(tx) = self.command_tx.read().as_ref() {
175 let cmd = CentrifugeCommand {
176 id: self.next_command_id(),
177 connect: None,
178 subscribe: None,
179 unsubscribe: Some(UnsubscribeRequest {
180 channel: channel.to_string(),
181 }),
182 };
183 if let Ok(json) = serde_json::to_string(&cmd) {
184 let _ = tx.send(Message::Text(json.into()));
185 }
186 }
187
188 self.subscriptions.write().remove(channel);
189 log::info!("[streaming] unsubscribed from channel: {}", channel);
190 }
191 }
192 }
193
194 fn dispatch_message(&self, channel: &str, data: Value) {
195 let listeners = self.listeners.read();
196 if let Some(callbacks) = listeners.get(channel) {
197 for callback in callbacks {
198 (callback.callback)(data.clone());
199 }
200 }
201 }
202
203 fn send_subscribe(&self, channel: &str, filter: Option<&str>) {
204 if let Some(tx) = self.command_tx.read().as_ref() {
205 let cmd = CentrifugeCommand {
206 id: self.next_command_id(),
207 connect: None,
208 subscribe: Some(SubscribeRequest {
209 channel: channel.to_string(),
210 delta: Some("fossil".to_string()),
211 filter: filter.map(|f| f.to_string()),
212 }),
213 unsubscribe: None,
214 };
215 if let Ok(json) = serde_json::to_string(&cmd) {
216 let _ = tx.send(Message::Text(json.into()));
217 }
218 }
219 }
220}
221
/// Public handle to the streaming client.
pub struct StreamApi {
    /// Shared state, also held by background tasks and `Unsubscribe` handles.
    inner: Arc<StreamApiInner>,
}
226
227impl StreamApi {
228 pub fn new(url: &str, access_token: &str) -> Self {
230 let url_with_token = if url.contains('?') {
232 format!("{}&token={}", url, access_token)
233 } else {
234 format!("{}?token={}", url, access_token)
235 };
236
237 Self {
238 inner: Arc::new(StreamApiInner::new(
239 url_with_token,
240 access_token.to_string(),
241 )),
242 }
243 }
244
245 pub fn is_connected(&self) -> bool {
247 self.inner.connected.load(Ordering::SeqCst)
248 }
249
250 pub async fn connect(&self) -> Result<(), ChainStreamError> {
252 if self.is_connected() {
253 return Ok(());
254 }
255
256 let url = &self.inner.url;
257 log::info!("[streaming] connecting to {}", url);
258
259 let (ws_stream, _) = connect_async(url)
260 .await
261 .map_err(|e| ChainStreamError::WebSocket(format!("Failed to connect: {}", e)))?;
262
263 let (mut write, mut read) = ws_stream.split();
264
265 let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
267 *self.inner.command_tx.write() = Some(tx.clone());
268
269 let connect_cmd = CentrifugeCommand {
271 id: self.inner.next_command_id(),
272 connect: Some(ConnectRequest {
273 token: self.inner.access_token.clone(),
274 }),
275 subscribe: None,
276 unsubscribe: None,
277 };
278 let connect_json = serde_json::to_string(&connect_cmd)
279 .map_err(|e| ChainStreamError::Serialization(e.to_string()))?;
280 write
281 .send(Message::Text(connect_json.into()))
282 .await
283 .map_err(|e| ChainStreamError::WebSocket(format!("Failed to send connect: {}", e)))?;
284
285 self.inner.connected.store(true, Ordering::SeqCst);
286
287 let inner_write = self.inner.clone();
289 tokio::spawn(async move {
290 while let Some(msg) = rx.recv().await {
291 if write.send(msg).await.is_err() {
292 inner_write.connected.store(false, Ordering::SeqCst);
293 break;
294 }
295 }
296 });
297
298 let inner_read = self.inner.clone();
300 tokio::spawn(async move {
301 while let Some(msg) = read.next().await {
302 match msg {
303 Ok(Message::Text(text)) => {
304 if let Ok(response) = serde_json::from_str::<CentrifugeResponse>(&text) {
305 if let Some(push) = response.push {
307 if let Some(pub_data) = push.pub_data {
308 inner_read.dispatch_message(&push.channel, pub_data.data);
309 }
310 }
311 if let Some(err) = response.error {
313 log::error!(
314 "[streaming] error: code={}, message={}",
315 err.code,
316 err.message
317 );
318 }
319 }
320 }
321 Ok(Message::Close(_)) => {
322 log::info!("[streaming] connection closed");
323 inner_read.connected.store(false, Ordering::SeqCst);
324 break;
325 }
326 Ok(Message::Ping(data)) => {
327 if let Some(tx) = inner_read.command_tx.read().as_ref() {
328 let _ = tx.send(Message::Pong(data));
329 }
330 }
331 Err(e) => {
332 log::error!("[streaming] read error: {}", e);
333 inner_read.connected.store(false, Ordering::SeqCst);
334 break;
335 }
336 _ => {}
337 }
338 }
339 });
340
341 Ok(())
342 }
343
344 pub async fn disconnect(&self) {
346 if let Some(tx) = self.inner.command_tx.write().take() {
347 let _ = tx.send(Message::Close(None));
348 }
349 self.inner.connected.store(false, Ordering::SeqCst);
350 log::info!("[streaming] disconnected");
351 }
352
353 pub async fn subscribe<F>(
355 &self,
356 channel: &str,
357 callback: F,
358 filter: Option<&str>,
359 method_name: Option<&str>,
360 ) -> Result<Unsubscribe, ChainStreamError>
361 where
362 F: Fn(Value) + Send + Sync + 'static,
363 {
364 if !self.is_connected() {
366 self.connect().await?;
367 }
368
369 let processed_filter = match (filter, method_name) {
371 (Some(f), Some(m)) if !f.is_empty() => Some(replace_filter_fields(f, m)),
372 (Some(f), _) if !f.is_empty() => Some(f.to_string()),
373 _ => None,
374 };
375
376 let needs_subscribe = {
378 let subs = self.inner.subscriptions.read();
379 !subs.contains_key(channel)
380 };
381
382 let callback_id = self.inner.add_listener(channel, callback);
384
385 if needs_subscribe {
387 self.inner
388 .send_subscribe(channel, processed_filter.as_deref());
389 self.inner
390 .subscriptions
391 .write()
392 .insert(channel.to_string(), self.inner.next_command_id());
393 log::info!("[streaming] subscribed to channel: {}", channel);
394 }
395
396 Ok(Unsubscribe {
397 channel: channel.to_string(),
398 callback_id,
399 api: self.inner.clone(),
400 })
401 }
402
403 pub async fn subscribe_token_candles<F>(
408 &self,
409 chain: &str,
410 token_address: &str,
411 resolution: Resolution,
412 callback: F,
413 filter: Option<&str>,
414 price_type: Option<PriceType>,
415 ) -> Result<Unsubscribe, ChainStreamError>
416 where
417 F: Fn(Candle) + Send + Sync + 'static,
418 {
419 let prefix = match price_type.unwrap_or_default() {
420 PriceType::Native => "dex-candle-in-native",
421 PriceType::Usd => "dex-candle",
422 };
423 let channel = format!("{}:{}_{}_{}", prefix, chain, token_address, resolution);
424
425 self.subscribe(
426 &channel,
427 move |data| {
428 if let Ok(candle) = parse_candle(&data) {
429 callback(candle);
430 }
431 },
432 filter,
433 Some("subscribe_token_candles"),
434 )
435 .await
436 }
437
438 pub async fn subscribe_pool_candles<F>(
441 &self,
442 chain: &str,
443 pool_address: &str,
444 resolution: Resolution,
445 callback: F,
446 filter: Option<&str>,
447 price_type: Option<PriceType>,
448 ) -> Result<Unsubscribe, ChainStreamError>
449 where
450 F: Fn(Candle) + Send + Sync + 'static,
451 {
452 let prefix = match price_type.unwrap_or_default() {
453 PriceType::Native => "dex-pool-candle-in-native",
454 PriceType::Usd => "dex-pool-candle",
455 };
456 let channel = format!("{}:{}_{}_{}", prefix, chain, pool_address, resolution);
457
458 self.subscribe(
459 &channel,
460 move |data| {
461 if let Ok(candle) = parse_candle(&data) {
462 callback(candle);
463 }
464 },
465 filter,
466 Some("subscribe_pool_candles"),
467 )
468 .await
469 }
470
471 pub async fn subscribe_pair_candles<F>(
475 &self,
476 chain: &str,
477 pair_address: &str,
478 resolution: Resolution,
479 callback: F,
480 filter: Option<&str>,
481 price_type: Option<PriceType>,
482 ) -> Result<Unsubscribe, ChainStreamError>
483 where
484 F: Fn(Candle) + Send + Sync + 'static,
485 {
486 let prefix = match price_type.unwrap_or_default() {
487 PriceType::Native => "dex-pair-candle-in-native",
488 PriceType::Usd => "dex-pair-candle",
489 };
490 let channel = format!("{}:{}_{}_{}", prefix, chain, pair_address, resolution);
491
492 self.subscribe(
493 &channel,
494 move |data| {
495 if let Ok(candle) = parse_candle(&data) {
496 callback(candle);
497 }
498 },
499 filter,
500 Some("subscribe_pair_candles"),
501 )
502 .await
503 }
504
505 pub async fn subscribe_token_stats<F>(
507 &self,
508 chain: &str,
509 token_address: &str,
510 callback: F,
511 filter: Option<&str>,
512 ) -> Result<Unsubscribe, ChainStreamError>
513 where
514 F: Fn(TokenStat) + Send + Sync + 'static,
515 {
516 let channel = format!("dex-token-stats:{}_{}", chain, token_address);
517
518 self.subscribe(
519 &channel,
520 move |data| {
521 if let Ok(stat) = serde_json::from_value::<TokenStat>(data) {
522 callback(stat);
523 }
524 },
525 filter,
526 Some("subscribe_token_stats"),
527 )
528 .await
529 }
530
531 pub async fn subscribe_new_token<F>(
533 &self,
534 chain: &str,
535 callback: F,
536 filter: Option<&str>,
537 ) -> Result<Unsubscribe, ChainStreamError>
538 where
539 F: Fn(NewToken) + Send + Sync + 'static,
540 {
541 let channel = format!("dex-new-token:{}", chain);
542
543 self.subscribe(
544 &channel,
545 move |data| {
546 if let Ok(token) = parse_new_token(&data) {
547 callback(token);
548 }
549 },
550 filter,
551 Some("subscribe_new_token"),
552 )
553 .await
554 }
555
556 pub async fn subscribe_token_trade<F>(
558 &self,
559 chain: &str,
560 token_address: &str,
561 callback: F,
562 filter: Option<&str>,
563 ) -> Result<Unsubscribe, ChainStreamError>
564 where
565 F: Fn(TradeActivity) + Send + Sync + 'static,
566 {
567 let channel = format!("dex-trade:{}_{}", chain, token_address);
568
569 self.subscribe(
570 &channel,
571 move |data| {
572 if let Ok(trade) = parse_trade_activity(&data) {
573 callback(trade);
574 }
575 },
576 filter,
577 Some("subscribe_token_trades"),
578 )
579 .await
580 }
581
582 pub async fn subscribe_wallet_balance<F>(
584 &self,
585 chain: &str,
586 wallet_address: &str,
587 callback: F,
588 filter: Option<&str>,
589 ) -> Result<Unsubscribe, ChainStreamError>
590 where
591 F: Fn(WalletBalance) + Send + Sync + 'static,
592 {
593 let channel = format!("dex-wallet-balance:{}_{}", chain, wallet_address);
594
595 self.subscribe(
596 &channel,
597 move |data| {
598 if let Ok(balance) = serde_json::from_value::<WalletBalance>(data) {
599 callback(balance);
600 }
601 },
602 filter,
603 Some("subscribe_wallet_balance"),
604 )
605 .await
606 }
607
608 pub async fn subscribe_token_holders<F>(
610 &self,
611 chain: &str,
612 token_address: &str,
613 callback: F,
614 filter: Option<&str>,
615 ) -> Result<Unsubscribe, ChainStreamError>
616 where
617 F: Fn(TokenHolder) + Send + Sync + 'static,
618 {
619 let channel = format!("dex-token-holder:{}_{}", chain, token_address);
620
621 self.subscribe(
622 &channel,
623 move |data| {
624 if let Ok(holder) = serde_json::from_value::<TokenHolder>(data) {
625 callback(holder);
626 }
627 },
628 filter,
629 Some("subscribe_token_holders"),
630 )
631 .await
632 }
633
634 pub async fn subscribe_token_supply<F>(
636 &self,
637 chain: &str,
638 token_address: &str,
639 callback: F,
640 filter: Option<&str>,
641 ) -> Result<Unsubscribe, ChainStreamError>
642 where
643 F: Fn(TokenSupply) + Send + Sync + 'static,
644 {
645 let channel = format!("dex-token-supply:{}_{}", chain, token_address);
646
647 self.subscribe(
648 &channel,
649 move |data| {
650 if let Ok(supply) = serde_json::from_value::<TokenSupply>(data) {
651 callback(supply);
652 }
653 },
654 filter,
655 Some("subscribe_token_supply"),
656 )
657 .await
658 }
659
660 pub async fn subscribe_dex_pool_balance<F>(
662 &self,
663 chain: &str,
664 pool_address: &str,
665 callback: F,
666 filter: Option<&str>,
667 ) -> Result<Unsubscribe, ChainStreamError>
668 where
669 F: Fn(DexPoolBalance) + Send + Sync + 'static,
670 {
671 let channel = format!("dex-pool-balance:{}_{}", chain, pool_address);
672
673 self.subscribe(
674 &channel,
675 move |data| {
676 if let Ok(balance) = serde_json::from_value::<DexPoolBalance>(data) {
677 callback(balance);
678 }
679 },
680 filter,
681 Some("subscribe_dex_pool_balance"),
682 )
683 .await
684 }
685
686 pub async fn subscribe_token_max_liquidity<F>(
688 &self,
689 chain: &str,
690 token_address: &str,
691 callback: F,
692 filter: Option<&str>,
693 ) -> Result<Unsubscribe, ChainStreamError>
694 where
695 F: Fn(TokenMaxLiquidity) + Send + Sync + 'static,
696 {
697 let channel = format!("dex-token-max-liquidity:{}_{}", chain, token_address);
698
699 self.subscribe(
700 &channel,
701 move |data| {
702 if let Ok(liquidity) = serde_json::from_value::<TokenMaxLiquidity>(data) {
703 callback(liquidity);
704 }
705 },
706 filter,
707 Some("subscribe_token_max_liquidity"),
708 )
709 .await
710 }
711
712 pub async fn subscribe_token_total_liquidity<F>(
714 &self,
715 chain: &str,
716 token_address: &str,
717 callback: F,
718 filter: Option<&str>,
719 ) -> Result<Unsubscribe, ChainStreamError>
720 where
721 F: Fn(TokenTotalLiquidity) + Send + Sync + 'static,
722 {
723 let channel = format!("dex-token-total-liquidity:{}_{}", chain, token_address);
724
725 self.subscribe(
726 &channel,
727 move |data| {
728 if let Ok(liquidity) = serde_json::from_value::<TokenTotalLiquidity>(data) {
729 callback(liquidity);
730 }
731 },
732 filter,
733 Some("subscribe_token_total_liquidity"),
734 )
735 .await
736 }
737
738 pub async fn subscribe_wallet_pnl<F>(
740 &self,
741 chain: &str,
742 wallet_address: &str,
743 callback: F,
744 filter: Option<&str>,
745 ) -> Result<Unsubscribe, ChainStreamError>
746 where
747 F: Fn(WalletTokenPnl) + Send + Sync + 'static,
748 {
749 let channel = format!("dex-wallet-pnl:{}_{}", chain, wallet_address);
750
751 self.subscribe(
752 &channel,
753 move |data| {
754 if let Ok(pnl) = serde_json::from_value::<WalletTokenPnl>(data) {
755 callback(pnl);
756 }
757 },
758 filter,
759 Some("subscribe_wallet_pnl"),
760 )
761 .await
762 }
763
764 pub async fn subscribe_new_tokens_metadata<F>(
766 &self,
767 chain: &str,
768 callback: F,
769 filter: Option<&str>,
770 ) -> Result<Unsubscribe, ChainStreamError>
771 where
772 F: Fn(TokenMetadata) + Send + Sync + 'static,
773 {
774 let channel = format!("dex-new-token-metadata:{}", chain);
775
776 self.subscribe(
777 &channel,
778 move |data| {
779 if let Ok(metadata) = serde_json::from_value::<TokenMetadata>(data) {
780 callback(metadata);
781 }
782 },
783 filter,
784 Some("subscribe_new_tokens_metadata"),
785 )
786 .await
787 }
788
789 pub async fn subscribe_ranking_tokens_list<F>(
791 &self,
792 chain: &str,
793 ranking_type: RankingType,
794 callback: F,
795 filter: Option<&str>,
796 ) -> Result<Unsubscribe, ChainStreamError>
797 where
798 F: Fn(RankingTokenList) + Send + Sync + 'static,
799 {
800 let ranking_str = match ranking_type {
801 RankingType::New => "new",
802 RankingType::Hot => "trending",
803 RankingType::Stocks => "stocks",
804 RankingType::FinalStretch => "completed",
805 RankingType::Migrated => "graduated",
806 };
807 let channel = format!("dex-ranking-token-list:{}_{}", chain, ranking_str);
808
809 self.subscribe(
810 &channel,
811 move |data| {
812 if let Ok(ranking) = serde_json::from_value::<RankingTokenList>(data) {
813 callback(ranking);
814 }
815 },
816 filter,
817 None,
818 )
819 .await
820 }
821}
822
823fn parse_candle(data: &Value) -> Result<Candle, String> {
826 let obj = data
827 .as_object()
828 .ok_or_else(|| "expected object".to_string())?;
829
830 Ok(Candle {
831 address: get_string(obj, "a"),
832 open: get_string(obj, "o"),
833 close: get_string(obj, "c"),
834 high: get_string(obj, "h"),
835 low: get_string(obj, "l"),
836 volume: get_string(obj, "v"),
837 resolution: get_string(obj, "r"),
838 time: get_i64(obj, "t"),
839 number: get_i32(obj, "n"),
840 })
841}
842
843fn parse_new_token(data: &Value) -> Result<NewToken, String> {
844 let obj = data
845 .as_object()
846 .ok_or_else(|| "expected object".to_string())?;
847
848 Ok(NewToken {
849 token_address: get_string(obj, "a"),
850 name: get_string(obj, "n"),
851 symbol: get_string(obj, "s"),
852 decimals: obj.get("d").and_then(|v| v.as_i64()).map(|v| v as i32),
853 launch_from: None, created_at_ms: get_i64(obj, "cts"),
855 })
856}
857
858fn parse_trade_activity(data: &Value) -> Result<TradeActivity, String> {
859 let obj = data
860 .as_object()
861 .ok_or_else(|| "expected object".to_string())?;
862
863 Ok(TradeActivity {
864 token_address: get_string(obj, "a"),
865 timestamp: get_i64(obj, "t"),
866 kind: get_string(obj, "k"),
867 buy_amount: get_string(obj, "ba"),
868 buy_amount_in_usd: get_string(obj, "baiu"),
869 buy_token_address: get_string(obj, "btma"),
870 buy_token_name: get_string(obj, "btn"),
871 buy_token_symbol: get_string(obj, "bts"),
872 buy_wallet_address: get_string(obj, "bwa"),
873 sell_amount: get_string(obj, "sa"),
874 sell_amount_in_usd: get_string(obj, "saiu"),
875 sell_token_address: get_string(obj, "stma"),
876 sell_token_name: get_string(obj, "stn"),
877 sell_token_symbol: get_string(obj, "sts"),
878 sell_wallet_address: get_string(obj, "swa"),
879 tx_hash: get_string(obj, "h"),
880 })
881}
882
883fn get_string(obj: &serde_json::Map<String, Value>, key: &str) -> String {
884 obj.get(key)
885 .and_then(|v| v.as_str())
886 .unwrap_or_default()
887 .to_string()
888}
889
890fn get_i64(obj: &serde_json::Map<String, Value>, key: &str) -> i64 {
891 obj.get(key).and_then(|v| v.as_i64()).unwrap_or_default()
892}
893
894fn get_i32(obj: &serde_json::Map<String, Value>, key: &str) -> i32 {
895 obj.get(key)
896 .and_then(|v| v.as_i64())
897 .map(|v| v as i32)
898 .unwrap_or_default()
899}