1use std::sync::Arc;
2
3use celestia_rpc::{HeaderClient, StateClient};
4
5use crate::Error;
6use crate::client::ClientInner;
7use crate::proto::cosmos::bank::v1beta1::MsgSend;
8use crate::proto::cosmos::staking::v1beta1::{
9 MsgBeginRedelegate, MsgCancelUnbondingDelegation, MsgDelegate, MsgUndelegate,
10};
11use crate::tx::{GasEstimate, IntoProtobufAny, TxConfig, TxInfo, TxPriority};
12use crate::types::Blob;
13use crate::types::state::{
14 AccAddress, Address, Coin, PageRequest, QueryDelegationResponse, QueryRedelegationsResponse,
15 QueryUnbondingDelegationResponse, ValAddress,
16};
17use crate::utils::height_i64;
18
19pub type AsyncGrpcCall<Response> = celestia_grpc::grpc::AsyncGrpcCall<Response, crate::Error>;
21
22pub struct StateApi {
24 inner: Arc<ClientInner>,
25}
26
27impl StateApi {
28 pub(crate) fn new(inner: Arc<ClientInner>) -> StateApi {
29 StateApi { inner }
30 }
31
32 pub fn balance(&self) -> AsyncGrpcCall<u64> {
42 let this = StateApi::new(self.inner.clone());
43
44 AsyncGrpcCall::new(move |context| async move {
45 let address = this.inner.address()?;
46 this.balance_for_address(&address).context(&context).await
47 })
48 }
49
50 pub fn balance_unverified(&self) -> AsyncGrpcCall<u64> {
53 let this = StateApi::new(self.inner.clone());
54
55 AsyncGrpcCall::new(move |context| async move {
56 let address = this.inner.address()?;
57 this.balance_for_address_unverified(&address)
58 .context(&context)
59 .await
60 })
61 }
62
63 pub fn balance_for_address(&self, address: &AccAddress) -> AsyncGrpcCall<u64> {
75 let inner = self.inner.clone();
76 let address = Address::AccAddress(address.to_owned());
77
78 AsyncGrpcCall::new(move |context| async move {
79 let grpc = match inner.grpc() {
80 Ok(grpc) => grpc,
81 Err(_) => {
82 return Ok(inner.rpc.state_balance_for_address(address).await?.amount());
83 }
84 };
85
86 let head = inner.rpc.header_network_head().await?;
87 head.validate()?;
88
89 Ok(grpc
90 .get_verified_balance(&address, &head)
91 .context(&context)
92 .await?
93 .amount())
94 })
95 }
96
97 pub fn balance_for_address_unverified(&self, address: &AccAddress) -> AsyncGrpcCall<u64> {
99 let inner = self.inner.clone();
100 let address = address.to_owned().into();
101
102 AsyncGrpcCall::new(move |context| async move {
103 Ok(inner
104 .grpc()?
105 .get_balance(&address, "utia")
106 .context(&context)
107 .await
108 .map(|res| res.amount())?)
109 })
110 }
111
112 pub fn estimate_gas_price(&self, priority: TxPriority) -> AsyncGrpcCall<f64> {
118 let inner = self.inner.clone();
119
120 AsyncGrpcCall::new(move |context| async move {
121 Ok(inner
122 .grpc()?
123 .estimate_gas_price(priority)
124 .context(&context)
125 .await?)
126 })
127 }
128
129 pub fn estimate_gas_price_and_usage(
138 &self,
139 priority: TxPriority,
140 tx_bytes: Vec<u8>,
141 ) -> AsyncGrpcCall<GasEstimate> {
142 let inner = self.inner.clone();
143
144 AsyncGrpcCall::new(move |context| async move {
145 Ok(inner
146 .grpc()?
147 .estimate_gas_price_and_usage(priority, tx_bytes)
148 .context(&context)
149 .await?)
150 })
151 }
152
153 pub fn submit_message<M>(&self, message: M, cfg: TxConfig) -> AsyncGrpcCall<TxInfo>
184 where
185 M: IntoProtobufAny + Send + 'static,
186 {
187 let inner = self.inner.clone();
188
189 AsyncGrpcCall::new(move |context| async move {
190 Ok(inner
191 .grpc()?
192 .submit_message(message, cfg)
193 .context(&context)
194 .await?)
195 })
196 }
197
198 pub fn transfer(
200 &self,
201 to_address: &AccAddress,
202 amount: u64,
203 cfg: TxConfig,
204 ) -> AsyncGrpcCall<TxInfo> {
205 let this = StateApi::new(self.inner.clone());
206 let to_address = to_address.to_string();
207
208 AsyncGrpcCall::new(move |context| async move {
209 let from_address = this.inner.address().map_err(|_| Error::ReadOnlyMode)?;
211
212 let msg = MsgSend {
213 from_address: from_address.to_string(),
214 to_address,
215 amount: vec![Coin::utia(amount).into()],
216 };
217
218 this.submit_message(msg, cfg).context(&context).await
219 })
220 }
221
222 pub fn submit_pay_for_blob(&self, blobs: &[Blob], cfg: TxConfig) -> AsyncGrpcCall<TxInfo> {
258 let inner = self.inner.clone();
259 let blobs = blobs.to_vec();
260
261 AsyncGrpcCall::new(move |context| async move {
262 Ok(inner
263 .grpc()?
264 .submit_blobs(&blobs, cfg)
265 .context(&context)
266 .await?)
267 })
268 }
269
270 pub fn cancel_unbonding_delegation(
272 &self,
273 validator_address: &ValAddress,
274 amount: u64,
275 creation_height: u64,
276 cfg: TxConfig,
277 ) -> AsyncGrpcCall<TxInfo> {
278 let this = StateApi::new(self.inner.clone());
279 let validator_address = validator_address.to_string();
280
281 AsyncGrpcCall::new(move |context| async move {
282 let delegator_address = this.inner.address()?;
283
284 let msg = MsgCancelUnbondingDelegation {
285 delegator_address: delegator_address.to_string(),
286 validator_address,
287 amount: Some(Coin::utia(amount).into()),
288 creation_height: height_i64(creation_height)?,
289 };
290
291 this.submit_message(msg, cfg).context(&context).await
292 })
293 }
294
295 pub fn begin_redelegate(
297 &self,
298 src_validator_address: &ValAddress,
299 dest_validator_address: &ValAddress,
300 amount: u64,
301 cfg: TxConfig,
302 ) -> AsyncGrpcCall<TxInfo> {
303 let this = StateApi::new(self.inner.clone());
304 let validator_src_address = src_validator_address.to_string();
305 let validator_dst_address = dest_validator_address.to_string();
306
307 AsyncGrpcCall::new(move |context| async move {
308 let delegator_address = this.inner.address()?;
309
310 let msg = MsgBeginRedelegate {
311 delegator_address: delegator_address.to_string(),
312 validator_src_address,
313 validator_dst_address,
314 amount: Some(Coin::utia(amount).into()),
315 };
316
317 this.submit_message(msg, cfg).context(&context).await
318 })
319 }
320
321 pub fn undelegate(
323 &self,
324 validator_address: &ValAddress,
325 amount: u64,
326 cfg: TxConfig,
327 ) -> AsyncGrpcCall<TxInfo> {
328 let this = StateApi::new(self.inner.clone());
329 let validator_address = validator_address.to_string();
330
331 AsyncGrpcCall::new(move |context| async move {
332 let delegator_address = this.inner.address()?;
333
334 let msg = MsgUndelegate {
335 delegator_address: delegator_address.to_string(),
336 validator_address,
337 amount: Some(Coin::utia(amount).into()),
338 };
339
340 this.submit_message(msg, cfg).context(&context).await
341 })
342 }
343
344 pub fn delegate(
346 &self,
347 validator_address: &ValAddress,
348 amount: u64,
349 cfg: TxConfig,
350 ) -> AsyncGrpcCall<TxInfo> {
351 let this = StateApi::new(self.inner.clone());
352 let validator_address = validator_address.to_string();
353
354 AsyncGrpcCall::new(move |context| async move {
355 let delegator_address = this.inner.address()?;
356
357 let msg = MsgDelegate {
358 delegator_address: delegator_address.to_string(),
359 validator_address,
360 amount: Some(Coin::utia(amount).into()),
361 };
362
363 this.submit_message(msg, cfg).context(&context).await
364 })
365 }
366
367 pub fn query_delegation(
369 &self,
370 validator_address: &ValAddress,
371 ) -> AsyncGrpcCall<QueryDelegationResponse> {
372 let this = StateApi::new(self.inner.clone());
373 let validator_address = *validator_address;
374
375 AsyncGrpcCall::new(move |context| async move {
376 let delegator_address = this.inner.address()?;
377
378 Ok(this
379 .inner
380 .grpc()?
381 .query_delegation(&delegator_address, &validator_address)
382 .context(&context)
383 .await?)
384 })
385 }
386
387 pub fn query_unbonding(
389 &self,
390 validator_address: &ValAddress,
391 ) -> AsyncGrpcCall<QueryUnbondingDelegationResponse> {
392 let this = StateApi::new(self.inner.clone());
393 let validator_address = *validator_address;
394
395 AsyncGrpcCall::new(move |context| async move {
396 let delegator_address = this.inner.address()?;
397
398 Ok(this
399 .inner
400 .grpc()?
401 .query_unbonding(&delegator_address, &validator_address)
402 .context(&context)
403 .await?)
404 })
405 }
406
407 pub fn query_redelegations(
409 &self,
410 src_validator_address: &ValAddress,
411 dest_validator_address: &ValAddress,
412 ) -> AsyncGrpcCall<QueryRedelegationsResponse> {
413 let this = StateApi::new(self.inner.clone());
414 let src_validator_address = *src_validator_address;
415 let dest_validator_address = *dest_validator_address;
416
417 AsyncGrpcCall::new(move |context| async move {
418 let delegator_address = this.inner.address()?;
419
420 let mut full_resp = QueryRedelegationsResponse {
421 responses: Vec::new(),
422 pagination: None,
423 };
424
425 let mut next_key = Vec::new();
426
427 loop {
428 let mut resp = this
429 .inner
430 .grpc()?
431 .query_redelegations(
432 &delegator_address,
433 &src_validator_address,
434 &dest_validator_address,
435 Some(PageRequest {
436 key: next_key,
437 ..Default::default()
438 }),
439 )
440 .context(&context)
441 .await?;
442
443 full_resp.responses.append(&mut resp.responses);
444
445 match resp.pagination {
446 Some(pagination) => next_key = pagination.next_key,
447 None => break,
448 }
449 }
450
451 Ok(full_resp)
452 })
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459
460 use std::time::Duration;
461
462 use celestia_grpc::Error as GrpcError;
463 use celestia_rpc::Error as RpcError;
464 use jsonrpsee::core::ClientError as JrpcError;
465 use lumina_utils::test_utils::async_test;
466 use tonic::Code;
467
468 use celestia_grpc::TxConfig;
469 use k256::ecdsa::SigningKey;
470
471 use crate::test_utils::{
472 TEST_GRPC_URL, TEST_RPC_URL, ensure_serializable_deserializable, new_client,
473 new_read_only_client, new_rpc_only_client, node0_address, validator_address,
474 };
475 use crate::{Client, Error};
476
477 #[async_test]
478 async fn transfer() {
479 let client = new_client().await;
480
481 let random_key = SigningKey::random(&mut rand::rngs::OsRng);
482 let random_acc = random_key.verifying_key().into();
483
484 client
485 .state()
486 .transfer(&random_acc, 123, TxConfig::default())
487 .await
488 .unwrap();
489
490 assert_eq!(
491 client
492 .state()
493 .balance_for_address_unverified(&random_acc)
494 .await
495 .unwrap(),
496 123
497 );
498
499 let client_ro = new_read_only_client().await;
500 let e = client_ro
501 .state()
502 .transfer(&random_acc, 123, TxConfig::default())
503 .await
504 .unwrap_err();
505
506 assert!(matches!(e, Error::ReadOnlyMode));
507 }
508
509 #[async_test]
510 async fn delegation() {
511 let client = new_client().await;
512 let validator_addr = validator_address();
513 let client_addr = client.address().unwrap();
514
515 client
517 .state()
518 .delegate(&validator_addr, 100, TxConfig::default())
519 .await
520 .unwrap();
521
522 let del = client
523 .state()
524 .query_delegation(&validator_addr)
525 .await
526 .unwrap();
527
528 assert_eq!(del.response.balance, 100);
529 assert_eq!(del.response.delegation.delegator_address, client_addr);
530 assert_eq!(del.response.delegation.validator_address, validator_addr);
531 assert_eq!(del.response.delegation.shares, 100.into());
532
533 let unbond_tx_height = client
535 .state()
536 .undelegate(&validator_addr, 10, TxConfig::default())
537 .await
538 .unwrap()
539 .height;
540
541 let unbond = client
542 .state()
543 .query_unbonding(&validator_addr)
544 .await
545 .unwrap();
546
547 assert_eq!(unbond.unbond.delegator_address, client_addr);
548 assert_eq!(unbond.unbond.validator_address, validator_addr);
549 assert_eq!(unbond.unbond.entries.len(), 1);
550 assert_eq!(
551 unbond.unbond.entries[0].creation_height.value(),
552 unbond_tx_height
553 );
554 assert_eq!(unbond.unbond.entries[0].initial_balance, 10);
555 assert_eq!(unbond.unbond.entries[0].balance, 10);
556
557 let del = client
558 .state()
559 .query_delegation(&validator_addr)
560 .await
561 .unwrap();
562
563 assert_eq!(del.response.balance, 90);
564 assert_eq!(del.response.delegation.delegator_address, client_addr);
565 assert_eq!(del.response.delegation.validator_address, validator_addr);
566 assert_eq!(del.response.delegation.shares, 90.into());
567
568 client
570 .state()
571 .cancel_unbonding_delegation(&validator_addr, 3, unbond_tx_height, TxConfig::default())
572 .await
573 .unwrap();
574
575 let unbond = client
576 .state()
577 .query_unbonding(&validator_addr)
578 .await
579 .unwrap();
580
581 assert_eq!(unbond.unbond.delegator_address, client_addr);
582 assert_eq!(unbond.unbond.validator_address, validator_addr);
583 assert_eq!(unbond.unbond.entries.len(), 1);
584 assert_eq!(
585 unbond.unbond.entries[0].creation_height.value(),
586 unbond_tx_height
587 );
588 assert_eq!(unbond.unbond.entries[0].initial_balance, 7);
589 assert_eq!(unbond.unbond.entries[0].balance, 7);
590
591 let del = client
592 .state()
593 .query_delegation(&validator_addr)
594 .await
595 .unwrap();
596
597 assert_eq!(del.response.balance, 93);
598 assert_eq!(del.response.delegation.delegator_address, client_addr);
599 assert_eq!(del.response.delegation.validator_address, validator_addr);
600 assert_eq!(del.response.delegation.shares, 93.into());
601
602 client
604 .state()
605 .cancel_unbonding_delegation(&validator_addr, 7, unbond_tx_height, TxConfig::default())
606 .await
607 .unwrap();
608
609 let err = client
610 .state()
611 .query_unbonding(&validator_addr)
612 .await
613 .unwrap_err();
614
615 assert_eq!(err.as_grpc_status().unwrap().code(), tonic::Code::NotFound);
616
617 let del = client
618 .state()
619 .query_delegation(&validator_addr)
620 .await
621 .unwrap();
622
623 assert_eq!(del.response.balance, 100);
624 assert_eq!(del.response.delegation.delegator_address, client_addr);
625 assert_eq!(del.response.delegation.validator_address, validator_addr);
626 assert_eq!(del.response.delegation.shares, 100.into());
627 }
628
629 #[async_test]
630 async fn balance_for_address() {
631 let client_ro = new_read_only_client().await;
632
633 let addr = node0_address();
635 let balance = client_ro.state().balance_for_address(&addr).await.unwrap();
636 assert!(balance > 0);
637
638 let balance = client_ro
640 .state()
641 .balance_for_address_unverified(&addr)
642 .await
643 .unwrap();
644 assert!(balance > 0);
645
646 let e = client_ro.state().balance().await.unwrap_err();
648 assert!(matches!(e, Error::NoAssociatedAddress));
649
650 let e = client_ro.state().balance().await.unwrap_err();
652 assert!(matches!(e, Error::NoAssociatedAddress));
653
654 let client_rpc = new_rpc_only_client().await;
655
656 let balance = client_rpc.state().balance_for_address(&addr).await.unwrap();
658 assert!(balance > 0);
659
660 let e = client_rpc
662 .state()
663 .balance_for_address_unverified(&addr)
664 .await
665 .unwrap_err();
666 assert!(matches!(e, Error::GrpcEndpointNotSet));
667 }
668
669 #[allow(dead_code)]
670 #[allow(unused_variables)]
671 #[allow(unreachable_code)]
672 #[allow(clippy::diverging_sub_expression)]
673 async fn enforce_serde_bounds() {
674 let api = StateApi::new(unimplemented!());
676
677 let cfg = ensure_serializable_deserializable(TxConfig::default());
678 let val_addr: ValAddress = ensure_serializable_deserializable(unimplemented!());
679 let acc_addr: AccAddress = ensure_serializable_deserializable(unimplemented!());
680
681 ensure_serializable_deserializable(api.balance().await.unwrap());
682
683 ensure_serializable_deserializable(api.balance_unverified().await.unwrap());
684
685 ensure_serializable_deserializable(api.balance_for_address(&acc_addr).await.unwrap());
686
687 ensure_serializable_deserializable(
688 api.balance_for_address_unverified(&acc_addr).await.unwrap(),
689 );
690
691 ensure_serializable_deserializable(api.estimate_gas_price(TxPriority::Low).await.unwrap());
692
693 ensure_serializable_deserializable(
694 api.estimate_gas_price_and_usage(TxPriority::Low, Vec::new())
695 .await
696 .unwrap(),
697 );
698
699 ensure_serializable_deserializable(api.submit_message((), cfg).await.unwrap());
700
701 ensure_serializable_deserializable(api.transfer(&acc_addr, 0, cfg).await.unwrap());
702
703 let blobs: Vec<_> = ensure_serializable_deserializable(unimplemented!());
704 ensure_serializable_deserializable(api.submit_pay_for_blob(&blobs, cfg).await.unwrap());
705
706 ensure_serializable_deserializable(
707 api.cancel_unbonding_delegation(&val_addr, 0, 0, cfg)
708 .await
709 .unwrap(),
710 );
711
712 ensure_serializable_deserializable(
713 api.begin_redelegate(&val_addr, &val_addr, 0, cfg)
714 .await
715 .unwrap(),
716 );
717
718 ensure_serializable_deserializable(api.undelegate(&val_addr, 0, cfg).await.unwrap());
719
720 ensure_serializable_deserializable(api.delegate(&val_addr, 0, cfg).await.unwrap());
721
722 ensure_serializable_deserializable(api.query_delegation(&val_addr).await.unwrap());
723
724 ensure_serializable_deserializable(api.query_unbonding(&val_addr).await.unwrap());
725
726 ensure_serializable_deserializable(
727 api.query_redelegations(&val_addr, &val_addr).await.unwrap(),
728 );
729 }
730
731 #[async_test]
732 async fn rpc_timeout() {
733 let client_build_error = Client::builder()
734 .rpc_url(TEST_RPC_URL)
735 .grpc_url(TEST_GRPC_URL)
736 .timeout(Duration::from_nanos(1))
737 .build()
738 .await
739 .unwrap_err();
740
741 assert!(matches!(
742 client_build_error,
743 Error::Rpc(RpcError::JsonRpc(JrpcError::RequestTimeout))
744 ));
745 }
746
747 #[async_test]
748 async fn grpc_timeout() {
749 let client = new_client().await;
750
751 let balance_timeout = client
752 .state()
753 .balance()
754 .timeout(Duration::from_nanos(100))
755 .await
756 .unwrap_err();
757
758 let _balance_ok = client.state().balance().await.unwrap();
759
760 let Error::Grpc(GrpcError::TonicError(status)) = balance_timeout else {
761 panic!("Invalid error type");
762 };
763
764 assert!(status.code() == Code::DeadlineExceeded || status.code() == Code::Cancelled);
765 }
766}