1use crate::generated;
2use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
3use crate::generated::ark::v1::get_subscription_response;
4use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
5use crate::generated::ark::v1::indexer_tx_history_record::Key;
6use crate::generated::ark::v1::ConfirmRegistrationRequest;
7use crate::generated::ark::v1::EstimateIntentFeeRequest;
8use crate::generated::ark::v1::GetEventStreamRequest;
9use crate::generated::ark::v1::GetInfoRequest;
10use crate::generated::ark::v1::GetSubscriptionRequest;
11use crate::generated::ark::v1::GetTransactionsStreamRequest;
12use crate::generated::ark::v1::IndexerChainedTxType;
13use crate::generated::ark::v1::Intent;
14use crate::generated::ark::v1::Outpoint;
15use crate::generated::ark::v1::RegisterIntentRequest;
16use crate::generated::ark::v1::SubmitSignedForfeitTxsRequest;
17use crate::generated::ark::v1::SubmitTreeNoncesRequest;
18use crate::generated::ark::v1::SubmitTreeSignaturesRequest;
19use crate::generated::ark::v1::SubscribeForScriptsRequest;
20use crate::generated::ark::v1::UnsubscribeForScriptsRequest;
21use crate::Error;
22use ark_core::asset::AssetId;
23use ark_core::history;
24use ark_core::server::parse_sequence_number;
25use ark_core::server::ArkTransaction;
26use ark_core::server::AssetInfo;
27use ark_core::server::BatchFailed;
28use ark_core::server::BatchFinalizationEvent;
29use ark_core::server::BatchFinalizedEvent;
30use ark_core::server::BatchStartedEvent;
31use ark_core::server::BatchTreeEventType;
32use ark_core::server::ChainedTxType;
33use ark_core::server::CommitmentTransaction;
34use ark_core::server::FinalizeOffchainTxResponse;
35use ark_core::server::GetVtxosRequest;
36use ark_core::server::GetVtxosRequestFilter;
37use ark_core::server::GetVtxosRequestReference;
38use ark_core::server::IndexerPage;
39use ark_core::server::Info;
40use ark_core::server::NoncePks;
41use ark_core::server::PartialSigTree;
42use ark_core::server::StreamEvent;
43use ark_core::server::StreamStartedEvent;
44use ark_core::server::StreamTransactionData;
45use ark_core::server::SubmitOffchainTxResponse;
46use ark_core::server::SubscriptionEvent;
47use ark_core::server::SubscriptionResponse;
48use ark_core::server::TreeNoncesAggregatedEvent;
49use ark_core::server::TreeNoncesEvent;
50use ark_core::server::TreeSignatureEvent;
51use ark_core::server::TreeSigningStartedEvent;
52use ark_core::server::TreeTxEvent;
53use ark_core::server::TreeTxNoncePks;
54use ark_core::server::VirtualTxOutPoint;
55use ark_core::server::VirtualTxsResponse;
56use ark_core::server::VtxoChain;
57use ark_core::server::VtxoChains;
58use ark_core::server::SDK_VERSION;
59use ark_core::server::TARGET_ARKD_VERSION;
60use ark_core::ArkAddress;
61use ark_core::TxGraphChunk;
62use async_stream::stream;
63use base64::Engine;
64use bitcoin::hex::FromHex;
65use bitcoin::secp256k1::PublicKey;
66use bitcoin::taproot::Signature;
67use bitcoin::OutPoint;
68use bitcoin::Psbt;
69use bitcoin::ScriptBuf;
70use bitcoin::SignedAmount;
71use bitcoin::Transaction;
72use bitcoin::Txid;
73use futures::Future;
74use futures::Stream;
75use futures::StreamExt;
76use futures::TryStreamExt;
77use std::collections::HashMap;
78use std::error::Error as StdError;
79use std::fmt;
80use std::str::FromStr;
81use std::sync::Arc;
82use std::sync::RwLock;
83
/// Shared holder for the digest string that is attached to outgoing requests.
///
/// The value lives behind an `Arc<RwLock<..>>`, so clones of this struct all
/// observe and update the same digest.
#[derive(Clone, Default)]
struct HeaderState {
    /// Current digest; `None` until a non-empty digest has been stored.
    digest: Arc<RwLock<Option<String>>>,
}
88
89impl HeaderState {
90 fn set_digest(&self, digest: String) {
91 let digest = (!digest.is_empty()).then_some(digest);
92 match self.digest.write() {
93 Ok(mut guard) => *guard = digest,
94 Err(poisoned) => {
95 log::warn!("digest header state lock poisoned while updating; recovering");
96 *poisoned.into_inner() = digest;
97 }
98 }
99 }
100
101 fn digest(&self) -> Option<String> {
102 match self.digest.read() {
103 Ok(guard) => guard.clone(),
104 Err(poisoned) => {
105 log::warn!("digest header state lock poisoned while reading; recovering");
106 poisoned.into_inner().clone()
107 }
108 }
109 }
110}
111
/// Request interceptor that adds version headers and, when one is stored,
/// the current digest header to every outgoing request.
#[derive(Clone, Default)]
struct HeaderInterceptor {
    /// Source of the digest value read on every call.
    state: HeaderState,
}
116
117impl tonic::service::Interceptor for HeaderInterceptor {
118 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
119 let metadata = req.metadata_mut();
120 metadata.insert(
121 "x-build-version",
122 tonic::metadata::MetadataValue::from_static(TARGET_ARKD_VERSION),
123 );
124 metadata.insert(
125 "x-sdk-version",
126 tonic::metadata::MetadataValue::from_static(SDK_VERSION),
127 );
128
129 if let Some(digest) = self.state.digest() {
130 if let Ok(value) = tonic::metadata::MetadataValue::try_from(digest.as_str()) {
131 metadata.insert("x-digest", value);
132 }
133 }
134
135 Ok(req)
136 }
137}
138
/// Transport channel wrapped so that every request passes through [`HeaderInterceptor`].
type InterceptedChannel =
    tonic::codegen::InterceptedService<tonic::transport::Channel, HeaderInterceptor>;
141
/// Shareable callback that receives an [`Info`] value and may fail with a boxed error.
type InfoRefreshHook =
    Arc<dyn Fn(Info) -> Result<(), Box<dyn StdError + Send + Sync + 'static>> + Send + Sync>;
144
/// State shared between the client and its service wrappers.
///
/// Both fields are reference-counted, so clones refer to the same data.
#[derive(Clone, Default)]
struct SharedState {
    /// Digest header state, also read by the request interceptor.
    headers: HeaderState,
    /// Optional callback invoked with freshly fetched server info.
    info_refresh_hook: Arc<RwLock<Option<InfoRefreshHook>>>,
}
150
151impl SharedState {
152 fn set_info_refresh_hook(&self, hook: InfoRefreshHook) {
153 match self.info_refresh_hook.write() {
154 Ok(mut guard) => *guard = Some(hook),
155 Err(poisoned) => {
156 log::warn!("info refresh hook lock poisoned while updating; recovering");
157 *poisoned.into_inner() = Some(hook);
158 }
159 }
160 }
161
162 async fn guarded<T>(
169 &self,
170 info_client: ArkServiceClient<InterceptedChannel>,
171 op: impl Future<Output = Result<T, Error>>,
172 ) -> Result<T, Error> {
173 match op.await {
174 Ok(value) => Ok(value),
175 Err(err) if err.is_digest_mismatch() => {
176 let original = err;
177 let info = self.fetch_info_unguarded(info_client).await?;
178 let digest = info.digest.clone();
179
180 if let Some(hook) = self.info_refresh_hook() {
181 hook(info).map_err(Error::conversion)?;
182 }
183
184 self.headers.set_digest(digest);
185 Err(Error::server_info_changed(original))
186 }
187 Err(err) => Err(err),
188 }
189 }
190
191 fn info_refresh_hook(&self) -> Option<InfoRefreshHook> {
192 match self.info_refresh_hook.read() {
193 Ok(hook) => hook.clone(),
194 Err(poisoned) => {
195 log::warn!("info refresh hook lock poisoned while reading; recovering");
196 poisoned.into_inner().clone()
197 }
198 }
199 }
200
201 async fn get_info_unguarded(
202 &self,
203 client: ArkServiceClient<InterceptedChannel>,
204 ) -> Result<Info, Error> {
205 let info = self.fetch_info_unguarded(client).await?;
206 self.headers.set_digest(info.digest.clone());
207 Ok(info)
208 }
209
210 async fn fetch_info_unguarded(
211 &self,
212 mut client: ArkServiceClient<InterceptedChannel>,
213 ) -> Result<Info, Error> {
214 let response = client
215 .get_info(GetInfoRequest {})
216 .await
217 .map_err(Error::request)?;
218
219 response.into_inner().try_into()
220 }
221}
222
/// gRPC client for the Ark and indexer services reachable at a single URL.
#[derive(Clone)]
pub struct Client {
    /// Server URL used by `connect`.
    url: String,
    /// Ark service wrapper; `None` until `connect` has succeeded.
    ark: Option<guarded::Ark>,
    /// Indexer service wrapper; `None` until `connect` has succeeded.
    indexer: Option<guarded::Indexer>,
    /// Digest and refresh-hook state shared with the service wrappers.
    shared: SharedState,
}
230
231impl fmt::Debug for Client {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 f.debug_struct("Client")
234 .field("url", &self.url)
235 .field("connected", &self.ark.is_some())
236 .finish()
237 }
238}
239
240impl Client {
241 pub fn new(url: String) -> Self {
242 Self {
243 url,
244 ark: None,
245 indexer: None,
246 shared: SharedState::default(),
247 }
248 }
249
250 pub fn set_info_refresh_hook(
251 &mut self,
252 hook: impl Fn(Info) -> Result<(), Box<dyn StdError + Send + Sync + 'static>>
253 + Send
254 + Sync
255 + 'static,
256 ) {
257 self.shared.set_info_refresh_hook(Arc::new(hook));
258 }
259
260 pub async fn connect(&mut self) -> Result<(), Error> {
261 let endpoint =
262 tonic::transport::Endpoint::from_shared(self.url.clone()).map_err(Error::connect)?;
263
264 #[cfg(any(feature = "tls-webpki-roots", feature = "tls-native-roots"))]
265 let endpoint = {
266 let tls = tonic::transport::ClientTlsConfig::new();
267 #[cfg(feature = "tls-webpki-roots")]
268 let tls = tls.with_webpki_roots();
269 #[cfg(feature = "tls-native-roots")]
270 let tls = tls.with_native_roots();
271 endpoint.tls_config(tls).map_err(Error::connect)?
272 };
273
274 let channel = endpoint.connect().await.map_err(Error::connect)?;
275
276 let interceptor = HeaderInterceptor {
277 state: self.shared.headers.clone(),
278 };
279 let ark_service_client =
280 ArkServiceClient::with_interceptor(channel.clone(), interceptor.clone());
281 let indexer_client = IndexerServiceClient::with_interceptor(channel, interceptor);
282
283 self.ark = Some(guarded::Ark::new(
284 ark_service_client.clone(),
285 self.shared.clone(),
286 ));
287 self.indexer = Some(guarded::Indexer::new(
288 indexer_client,
289 ark_service_client,
290 self.shared.clone(),
291 ));
292 Ok(())
293 }
294
    /// Fetches the server info through the Ark service wrapper.
    ///
    /// # Errors
    ///
    /// Returns a not-connected error when `connect` has not succeeded yet, or
    /// whatever error the wrapper reports.
    pub async fn get_info(&self) -> Result<Info, Error> {
        self.ark()?.get_info().await
    }
298
299 pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
302 if request.reference().is_empty() {
303 return Ok(ListVtxosResponse {
304 vtxos: Vec::new(),
305 page: None,
306 });
307 }
308
309 let response = self
310 .indexer()?
311 .request(move |mut client| async move {
312 client
313 .get_vtxos(generated::ark::v1::GetVtxosRequest::from(request))
314 .await
315 })
316 .await?;
317
318 let inner = response.into_inner();
319
320 let vtxos = inner
321 .vtxos
322 .iter()
323 .map(VirtualTxOutPoint::try_from)
324 .collect::<Result<Vec<_>, _>>()?;
325
326 let page = inner
327 .page
328 .map(IndexerPage::try_from)
329 .transpose()
330 .map_err(Error::conversion)?;
331
332 Ok(ListVtxosResponse { vtxos, page })
333 }
334
335 pub async fn register_intent(&self, intent: ark_core::intent::Intent) -> Result<String, Error> {
336 let intent = intent.try_into()?;
337 let request = RegisterIntentRequest {
338 intent: Some(intent),
339 };
340
341 let response = self
342 .ark()?
343 .request(move |mut client| async move { client.register_intent(request).await })
344 .await?;
345
346 let intent_id = response.into_inner().intent_id;
347
348 Ok(intent_id)
349 }
350
351 pub async fn submit_offchain_transaction_request(
352 &self,
353 ark_tx: Psbt,
354 checkpoint_txs: Vec<Psbt>,
355 ) -> Result<SubmitOffchainTxResponse, Error> {
356 let base64 = base64::engine::GeneralPurpose::new(
357 &base64::alphabet::STANDARD,
358 base64::engine::GeneralPurposeConfig::new(),
359 );
360
361 let ark_tx = base64.encode(ark_tx.serialize());
362
363 let checkpoint_txs = checkpoint_txs
364 .into_iter()
365 .map(|tx| base64.encode(tx.serialize()))
366 .collect();
367
368 let res = self
369 .ark()?
370 .request(move |mut client| async move {
371 client
372 .submit_tx(generated::ark::v1::SubmitTxRequest {
373 signed_ark_tx: ark_tx,
374 checkpoint_txs,
375 })
376 .await
377 })
378 .await?;
379
380 let res = res.into_inner();
381
382 let signed_ark_tx = res.final_ark_tx;
383 let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
384 let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
385
386 let signed_checkpoint_txs = res
387 .signed_checkpoint_txs
388 .into_iter()
389 .map(|tx| {
390 let tx = base64.decode(tx).map_err(Error::conversion)?;
391 let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
392
393 Ok(tx)
394 })
395 .collect::<Result<Vec<_>, Error>>()?;
396
397 Ok(SubmitOffchainTxResponse {
398 signed_ark_tx,
399 signed_checkpoint_txs,
400 })
401 }
402
403 pub async fn finalize_offchain_transaction(
404 &self,
405 txid: Txid,
406 checkpoint_txs: Vec<Psbt>,
407 ) -> Result<FinalizeOffchainTxResponse, Error> {
408 let base64 = base64::engine::GeneralPurpose::new(
409 &base64::alphabet::STANDARD,
410 base64::engine::GeneralPurposeConfig::new(),
411 );
412
413 let checkpoint_txs = checkpoint_txs
414 .into_iter()
415 .map(|tx| base64.encode(tx.serialize()))
416 .collect();
417
418 self.ark()?
419 .request(move |mut client| async move {
420 client
421 .finalize_tx(generated::ark::v1::FinalizeTxRequest {
422 ark_txid: txid.to_string(),
423 final_checkpoint_txs: checkpoint_txs,
424 })
425 .await
426 })
427 .await?;
428
429 Ok(FinalizeOffchainTxResponse {})
430 }
431
432 pub async fn get_pending_tx(
433 &self,
434 intent: ark_core::intent::Intent,
435 ) -> Result<Vec<ark_core::server::PendingTx>, Error> {
436 let intent: Intent = intent.try_into()?;
437
438 let res = self
439 .ark()?
440 .request(move |mut client| async move {
441 client
442 .get_pending_tx(generated::ark::v1::GetPendingTxRequest {
443 identifier: Some(
444 generated::ark::v1::get_pending_tx_request::Identifier::Intent(intent),
445 ),
446 })
447 .await
448 })
449 .await?;
450
451 let inner = res.into_inner();
452 let base64 = base64::engine::GeneralPurpose::new(
453 &base64::alphabet::STANDARD,
454 base64::engine::GeneralPurposeConfig::new(),
455 );
456
457 inner
458 .pending_txs
459 .into_iter()
460 .map(|tx| {
461 let ark_txid = tx.ark_txid.parse().map_err(Error::conversion)?;
462
463 let signed_ark_tx = base64.decode(&tx.final_ark_tx).map_err(Error::conversion)?;
464 let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
465
466 let signed_checkpoint_txs = tx
467 .signed_checkpoint_txs
468 .into_iter()
469 .map(|cp| {
470 let bytes = base64.decode(cp).map_err(Error::conversion)?;
471 Psbt::deserialize(&bytes).map_err(Error::conversion)
472 })
473 .collect::<Result<Vec<_>, Error>>()?;
474
475 Ok(ark_core::server::PendingTx {
476 ark_txid,
477 signed_ark_tx,
478 signed_checkpoint_txs,
479 })
480 })
481 .collect()
482 }
483
484 pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
485 self.ark()?
486 .request(move |mut client| async move {
487 client
488 .confirm_registration(ConfirmRegistrationRequest { intent_id })
489 .await
490 })
491 .await?;
492
493 Ok(())
494 }
495
496 pub async fn submit_tree_nonces(
497 &self,
498 batch_id: &str,
499 cosigner_pubkey: PublicKey,
500 pub_nonce_tree: NoncePks,
501 ) -> Result<(), Error> {
502 self.ark()?
503 .request(move |mut client| async move {
504 client
505 .submit_tree_nonces(SubmitTreeNoncesRequest {
506 batch_id: batch_id.to_string(),
507 pubkey: cosigner_pubkey.to_string(),
508 tree_nonces: pub_nonce_tree.encode(),
509 })
510 .await
511 })
512 .await?;
513
514 Ok(())
515 }
516
517 pub async fn submit_tree_signatures(
518 &self,
519 batch_id: &str,
520 cosigner_pk: PublicKey,
521 partial_sig_tree: PartialSigTree,
522 ) -> Result<(), Error> {
523 self.ark()?
524 .request(move |mut client| async move {
525 client
526 .submit_tree_signatures(SubmitTreeSignaturesRequest {
527 batch_id: batch_id.to_string(),
528 pubkey: cosigner_pk.to_string(),
529 tree_signatures: partial_sig_tree.encode(),
530 })
531 .await
532 })
533 .await?;
534
535 Ok(())
536 }
537
538 pub async fn submit_signed_forfeit_txs(
539 &self,
540 signed_forfeit_txs: Vec<Psbt>,
541 signed_commitment_tx: Option<Psbt>,
542 ) -> Result<(), Error> {
543 let base64 = base64::engine::GeneralPurpose::new(
544 &base64::alphabet::STANDARD,
545 base64::engine::GeneralPurposeConfig::new(),
546 );
547
548 let signed_commitment_tx = signed_commitment_tx
549 .map(|tx| base64.encode(tx.serialize()))
550 .unwrap_or_default();
551
552 self.ark()?
553 .request(move |mut client| async move {
554 client
555 .submit_signed_forfeit_txs(SubmitSignedForfeitTxsRequest {
556 signed_forfeit_txs: signed_forfeit_txs
557 .iter()
558 .map(|psbt| base64.encode(psbt.serialize()))
559 .collect(),
560 signed_commitment_tx,
561 })
562 .await
563 })
564 .await?;
565
566 Ok(())
567 }
568
569 pub async fn get_event_stream(
570 &self,
571 topics: Vec<String>,
572 ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
573 let response = self
574 .ark()?
575 .request(move |mut client| async move {
576 client
577 .get_event_stream(GetEventStreamRequest { topics })
578 .await
579 })
580 .await?;
581 let mut stream = response.into_inner();
582
583 let stream = stream! {
584 loop {
585 match stream.try_next().await {
586 Ok(Some(event)) => match event.event {
587 None => {
588 log::debug!("Got empty message");
589 }
590 Some(event) => {
591 yield Ok(StreamEvent::try_from(event)?);
592 }
593 },
594 Ok(None) => {
595 yield Err(Error::event_stream_disconnect());
596 }
597 Err(e) => {
598 yield Err(Error::event_stream(e));
599 }
600 }
601 }
602 };
603
604 Ok(stream.boxed())
605 }
606
607 pub async fn get_tx_stream(
608 &self,
609 ) -> Result<impl Stream<Item = Result<StreamTransactionData, Error>> + Unpin, Error> {
610 let response = self
611 .ark()?
612 .request(|mut client| async move {
613 client
614 .get_transactions_stream(GetTransactionsStreamRequest {})
615 .await
616 })
617 .await?;
618
619 let mut stream = response.into_inner();
620
621 let stream = stream! {
622 loop {
623 match stream.try_next().await {
624 Ok(Some(event)) => match event.data {
625 None => {
626 log::debug!("Got empty message");
627 }
628 Some(event) => {
629 yield Ok(StreamTransactionData::try_from(event)?);
630 }
631 },
632 Ok(None) => {
633 yield Err(Error::event_stream_disconnect());
634 }
635 Err(e) => {
636 yield Err(Error::event_stream(e));
637 }
638 }
639 }
640 };
641
642 Ok(stream.boxed())
643 }
644
645 pub async fn get_vtxo_chain(
646 &self,
647 outpoint: Option<OutPoint>,
648 size_and_index: Option<(i32, i32)>,
649 ) -> Result<VtxoChainResponse, Error> {
650 let response = self
651 .indexer()?
652 .request(move |mut client| async move {
653 client
654 .get_vtxo_chain(generated::ark::v1::GetVtxoChainRequest {
655 outpoint: outpoint.map(|o| generated::ark::v1::IndexerOutpoint {
656 txid: o.txid.to_string(),
657 vout: o.vout,
658 }),
659 page: size_and_index.map(|(size, index)| {
660 generated::ark::v1::IndexerPageRequest { size, index }
661 }),
662 })
663 .await
664 })
665 .await?;
666 let response = response.into_inner();
667 let result = response.try_into()?;
668 Ok(result)
669 }
670
671 pub async fn get_virtual_txs(
672 &self,
673 txids: Vec<String>,
674 size_and_index: Option<(i32, i32)>,
675 ) -> Result<VirtualTxsResponse, Error> {
676 let response = self
677 .indexer()?
678 .request(move |mut client| async move {
679 client
680 .get_virtual_txs(generated::ark::v1::GetVirtualTxsRequest {
681 txids,
682 page: size_and_index.map(|(size, index)| {
683 generated::ark::v1::IndexerPageRequest { size, index }
684 }),
685 })
686 .await
687 })
688 .await?;
689 let response = response.into_inner();
690 let result = response.try_into()?;
691 Ok(result)
692 }
693
694 pub async fn subscribe_to_scripts(
704 &self,
705 scripts: Vec<ArkAddress>,
706 subscription_id: Option<String>,
707 ) -> Result<String, Error> {
708 let scripts = scripts
709 .iter()
710 .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
711 .collect::<Vec<_>>();
712
713 let subscription_id = subscription_id.unwrap_or_default();
715
716 let response = self
717 .indexer()?
718 .request(move |mut client| async move {
719 client
720 .subscribe_for_scripts(SubscribeForScriptsRequest {
721 scripts,
722 subscription_id,
723 })
724 .await
725 })
726 .await?;
727
728 let response = response.into_inner();
729
730 Ok(response.subscription_id)
731 }
732
733 pub async fn unsubscribe_from_scripts(
735 &self,
736 scripts: Vec<ArkAddress>,
737 subscription_id: String,
738 ) -> Result<(), Error> {
739 let scripts = scripts
740 .iter()
741 .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
742 .collect::<Vec<_>>();
743
744 self.indexer()?
745 .request(move |mut client| async move {
746 client
747 .unsubscribe_for_scripts(UnsubscribeForScriptsRequest {
748 subscription_id,
749 scripts,
750 })
751 .await
752 })
753 .await?;
754
755 Ok(())
756 }
757
758 pub async fn get_subscription(
760 &self,
761 subscription_id: String,
762 ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
763 let response = self
764 .indexer()?
765 .request(move |mut client| async move {
766 client
767 .get_subscription(GetSubscriptionRequest { subscription_id })
768 .await
769 })
770 .await?;
771
772 let mut stream = response.into_inner();
773
774 let stream = stream! {
775 loop {
776 match stream.try_next().await {
777 Ok(Some(response)) => {
778 match SubscriptionResponse::try_from(response) {
779 Ok(subscription_response) => {
780 yield Ok(subscription_response);
781 }
782 Err(e) => {
783 yield Err(e);
784 }
785 }
786 }
787 Ok(None) => {
788 break;
789 }
790 Err(e) => {
791 yield Err(Error::event_stream(e));
792 }
793 }
794 }
795 };
796
797 Ok(stream.boxed())
798 }
799
800 pub async fn estimate_fees(
801 &self,
802 intent: ark_core::intent::Intent,
803 ) -> Result<SignedAmount, Error> {
804 let intent = intent.try_into()?;
805 let response = self
806 .ark()?
807 .request(move |mut client| async move {
808 client
809 .estimate_intent_fee(EstimateIntentFeeRequest {
810 intent: Some(intent),
811 })
812 .await
813 })
814 .await?;
815 let response = response.into_inner();
816
817 Ok(SignedAmount::from_sat(response.fee))
818 }
819
820 pub async fn get_asset(&self, asset_id: AssetId) -> Result<AssetInfo, Error> {
821 let response = self
822 .indexer()?
823 .request(move |mut client| async move {
824 client
825 .get_asset(generated::ark::v1::GetAssetRequest {
826 asset_id: asset_id.to_string(),
827 })
828 .await
829 })
830 .await?;
831
832 let inner = response.into_inner();
833
834 let supply = inner.supply.parse::<u64>().map_err(Error::conversion)?;
835
836 let asset_id = inner.asset_id.parse().map_err(Error::conversion)?;
837 let control_asset_id = if inner.control_asset.is_empty() {
838 None
839 } else {
840 Some(inner.control_asset.parse().map_err(Error::conversion)?)
841 };
842
843 Ok(AssetInfo {
844 asset_id,
845 control_asset_id,
846 supply,
847 metadata: inner.metadata,
848 })
849 }
850
851 fn ark(&self) -> Result<guarded::Ark, Error> {
852 self.ark.clone().ok_or(Error::not_connected())
853 }
854
855 fn indexer(&self) -> Result<guarded::Indexer, Error> {
856 self.indexer.clone().ok_or(Error::not_connected())
857 }
858}
859
860impl TryFrom<ark_core::intent::Intent> for Intent {
861 type Error = Error;
862
863 fn try_from(value: ark_core::intent::Intent) -> Result<Self, Self::Error> {
864 Ok(Self {
865 proof: value.serialize_proof(),
866 message: value.serialize_message().map_err(Error::conversion)?,
867 })
868 }
869}
870
871impl TryFrom<generated::ark::v1::BatchStartedEvent> for BatchStartedEvent {
872 type Error = Error;
873
874 fn try_from(value: generated::ark::v1::BatchStartedEvent) -> Result<Self, Self::Error> {
875 let batch_expiry = parse_sequence_number(value.batch_expiry).map_err(Error::conversion)?;
876
877 Ok(BatchStartedEvent {
878 id: value.id,
879 intent_id_hashes: value.intent_id_hashes,
880 batch_expiry,
881 })
882 }
883}
884
885impl TryFrom<generated::ark::v1::StreamStartedEvent> for StreamStartedEvent {
886 type Error = Error;
887
888 fn try_from(value: generated::ark::v1::StreamStartedEvent) -> Result<Self, Self::Error> {
889 Ok(StreamStartedEvent { id: value.id })
890 }
891}
892
893impl TryFrom<generated::ark::v1::BatchFinalizationEvent> for BatchFinalizationEvent {
894 type Error = Error;
895
896 fn try_from(value: generated::ark::v1::BatchFinalizationEvent) -> Result<Self, Self::Error> {
897 let base64 = &base64::engine::GeneralPurpose::new(
898 &base64::alphabet::STANDARD,
899 base64::engine::GeneralPurposeConfig::new(),
900 );
901
902 let commitment_tx = base64
903 .decode(&value.commitment_tx)
904 .map_err(Error::conversion)?;
905 let commitment_tx = Psbt::deserialize(&commitment_tx).map_err(Error::conversion)?;
906
907 Ok(BatchFinalizationEvent {
908 id: value.id,
909 commitment_tx,
910 })
911 }
912}
913
914impl TryFrom<generated::ark::v1::BatchFinalizedEvent> for BatchFinalizedEvent {
915 type Error = Error;
916
917 fn try_from(value: generated::ark::v1::BatchFinalizedEvent) -> Result<Self, Self::Error> {
918 let commitment_txid = value.commitment_txid.parse().map_err(Error::conversion)?;
919
920 Ok(BatchFinalizedEvent {
921 id: value.id,
922 commitment_txid,
923 })
924 }
925}
926
927impl From<generated::ark::v1::BatchFailedEvent> for BatchFailed {
928 fn from(value: generated::ark::v1::BatchFailedEvent) -> Self {
929 BatchFailed {
930 id: value.id,
931 reason: value.reason,
932 }
933 }
934}
935
936impl TryFrom<generated::ark::v1::TreeSigningStartedEvent> for TreeSigningStartedEvent {
937 type Error = Error;
938
939 fn try_from(value: generated::ark::v1::TreeSigningStartedEvent) -> Result<Self, Self::Error> {
940 let unsigned_commitment_tx = base64::engine::GeneralPurpose::new(
941 &base64::alphabet::STANDARD,
942 base64::engine::GeneralPurposeConfig::new(),
943 )
944 .decode(&value.unsigned_commitment_tx)
945 .map_err(Error::conversion)?;
946
947 let unsigned_commitment_tx =
948 Psbt::deserialize(&unsigned_commitment_tx).map_err(Error::conversion)?;
949
950 Ok(TreeSigningStartedEvent {
951 id: value.id,
952 cosigners_pubkeys: value
953 .cosigners_pubkeys
954 .into_iter()
955 .map(|pk| pk.parse().map_err(Error::conversion))
956 .collect::<Result<Vec<_>, Error>>()?,
957 unsigned_commitment_tx,
958 })
959 }
960}
961
962impl TryFrom<generated::ark::v1::TreeNoncesAggregatedEvent> for TreeNoncesAggregatedEvent {
963 type Error = Error;
964
965 fn try_from(value: generated::ark::v1::TreeNoncesAggregatedEvent) -> Result<Self, Self::Error> {
966 let tree_nonces = NoncePks::decode(value.tree_nonces).map_err(Error::conversion)?;
967
968 Ok(TreeNoncesAggregatedEvent {
969 id: value.id,
970 tree_nonces,
971 })
972 }
973}
974
975impl TryFrom<generated::ark::v1::TreeTxEvent> for TreeTxEvent {
976 type Error = Error;
977
978 fn try_from(value: generated::ark::v1::TreeTxEvent) -> Result<Self, Self::Error> {
979 let batch_tree_event_type = match value.batch_index {
980 0 => BatchTreeEventType::Vtxo,
981 1 => BatchTreeEventType::Connector,
982 n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
983 };
984
985 let txid = if value.txid.is_empty() {
986 None
987 } else {
988 Some(value.txid.parse().map_err(Error::conversion)?)
989 };
990
991 let base64 = &base64::engine::GeneralPurpose::new(
992 &base64::alphabet::STANDARD,
993 base64::engine::GeneralPurposeConfig::new(),
994 );
995
996 let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
997 let tx = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
998
999 let children = value
1000 .children
1001 .iter()
1002 .map(|(index, txid)| Ok((*index, txid.parse().map_err(Error::conversion)?)))
1003 .collect::<Result<HashMap<_, _>, Error>>()?;
1004
1005 Ok(Self {
1006 id: value.id,
1007 topic: value.topic,
1008 batch_tree_event_type,
1009 tx_graph_chunk: TxGraphChunk { txid, tx, children },
1010 })
1011 }
1012}
1013
1014impl TryFrom<generated::ark::v1::TreeSignatureEvent> for TreeSignatureEvent {
1015 type Error = Error;
1016
1017 fn try_from(value: generated::ark::v1::TreeSignatureEvent) -> Result<Self, Self::Error> {
1018 let batch_tree_event_type = match value.batch_index {
1019 0 => BatchTreeEventType::Vtxo,
1020 1 => BatchTreeEventType::Connector,
1021 n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
1022 };
1023
1024 let txid = value.txid.parse().map_err(Error::conversion)?;
1025
1026 let signature = Vec::from_hex(&value.signature).map_err(Error::conversion)?;
1027 let signature = Signature::from_slice(&signature).map_err(Error::conversion)?;
1028
1029 Ok(Self {
1030 id: value.id,
1031 topic: value.topic,
1032 batch_tree_event_type,
1033 txid,
1034 signature,
1035 })
1036 }
1037}
1038
1039impl TryFrom<generated::ark::v1::TreeNoncesEvent> for TreeNoncesEvent {
1040 type Error = Error;
1041
1042 fn try_from(value: generated::ark::v1::TreeNoncesEvent) -> Result<Self, Self::Error> {
1043 let txid = value.txid.parse().map_err(Error::conversion)?;
1044
1045 let nonces = TreeTxNoncePks::decode(value.nonces).map_err(Error::conversion)?;
1046
1047 Ok(Self {
1048 id: value.id,
1049 topic: value.topic,
1050 txid,
1051 nonces,
1052 })
1053 }
1054}
1055
1056impl TryFrom<generated::ark::v1::get_event_stream_response::Event> for StreamEvent {
1057 type Error = Error;
1058
1059 fn try_from(
1060 value: generated::ark::v1::get_event_stream_response::Event,
1061 ) -> Result<Self, Self::Error> {
1062 Ok(match value {
1063 generated::ark::v1::get_event_stream_response::Event::StreamStarted(e) => {
1064 StreamEvent::StreamStarted(e.try_into()?)
1065 }
1066 generated::ark::v1::get_event_stream_response::Event::BatchStarted(e) => {
1067 StreamEvent::BatchStarted(e.try_into()?)
1068 }
1069 generated::ark::v1::get_event_stream_response::Event::BatchFinalization(e) => {
1070 StreamEvent::BatchFinalization(e.try_into()?)
1071 }
1072 generated::ark::v1::get_event_stream_response::Event::BatchFinalized(e) => {
1073 StreamEvent::BatchFinalized(e.try_into()?)
1074 }
1075 generated::ark::v1::get_event_stream_response::Event::BatchFailed(e) => {
1076 StreamEvent::BatchFailed(e.into())
1077 }
1078 generated::ark::v1::get_event_stream_response::Event::TreeSigningStarted(e) => {
1079 StreamEvent::TreeSigningStarted(e.try_into()?)
1080 }
1081 generated::ark::v1::get_event_stream_response::Event::TreeNoncesAggregated(e) => {
1082 StreamEvent::TreeNoncesAggregated(e.try_into()?)
1083 }
1084 generated::ark::v1::get_event_stream_response::Event::TreeTx(e) => {
1085 StreamEvent::TreeTx(e.try_into()?)
1086 }
1087 generated::ark::v1::get_event_stream_response::Event::TreeSignature(e) => {
1088 StreamEvent::TreeSignature(e.try_into()?)
1089 }
1090 generated::ark::v1::get_event_stream_response::Event::TreeNonces(e) => {
1091 StreamEvent::TreeNonces(e.try_into()?)
1092 }
1093 generated::ark::v1::get_event_stream_response::Event::Heartbeat(_) => {
1094 StreamEvent::Heartbeat
1095 }
1096 })
1097 }
1098}
1099
1100impl TryFrom<generated::ark::v1::get_transactions_stream_response::Data> for StreamTransactionData {
1101 type Error = Error;
1102
1103 fn try_from(
1104 value: generated::ark::v1::get_transactions_stream_response::Data,
1105 ) -> Result<Self, Self::Error> {
1106 match value {
1107 generated::ark::v1::get_transactions_stream_response::Data::CommitmentTx(
1108 commitment_tx,
1109 ) => Ok(StreamTransactionData::Commitment(
1110 CommitmentTransaction::try_from(commitment_tx)?,
1111 )),
1112 generated::ark::v1::get_transactions_stream_response::Data::ArkTx(redeem) => Ok(
1113 StreamTransactionData::Ark(ArkTransaction::try_from(redeem)?),
1114 ),
1115 generated::ark::v1::get_transactions_stream_response::Data::Heartbeat(_) => {
1116 Ok(StreamTransactionData::Heartbeat)
1117 }
1118 generated::ark::v1::get_transactions_stream_response::Data::SweepTx(tx) => {
1119 Ok(StreamTransactionData::Ark(ArkTransaction::try_from(tx)?))
1120 }
1121 }
1122 }
1123}
1124
1125impl TryFrom<generated::ark::v1::TxNotification> for CommitmentTransaction {
1126 type Error = Error;
1127
1128 fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
1129 let spent_vtxos = value
1130 .spent_vtxos
1131 .iter()
1132 .map(VirtualTxOutPoint::try_from)
1133 .collect::<Result<Vec<_>, _>>()?;
1134
1135 let spendable_vtxos = value
1136 .spendable_vtxos
1137 .iter()
1138 .map(VirtualTxOutPoint::try_from)
1139 .collect::<Result<Vec<_>, _>>()?;
1140
1141 Ok(CommitmentTransaction {
1142 txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1143 spent_vtxos,
1144 unspent_vtxos: spendable_vtxos,
1145 })
1146 }
1147}
1148
1149impl TryFrom<generated::ark::v1::TxNotification> for ArkTransaction {
1150 type Error = Error;
1151
1152 fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
1153 let spent_vtxos = value
1154 .spent_vtxos
1155 .iter()
1156 .map(VirtualTxOutPoint::try_from)
1157 .collect::<Result<Vec<_>, _>>()?;
1158
1159 let spendable_vtxos = value
1160 .spendable_vtxos
1161 .iter()
1162 .map(VirtualTxOutPoint::try_from)
1163 .collect::<Result<Vec<_>, _>>()?;
1164
1165 let tx = if value.tx.is_empty() {
1166 None
1167 } else {
1168 let base64 = base64::engine::GeneralPurpose::new(
1169 &base64::alphabet::STANDARD,
1170 base64::engine::GeneralPurposeConfig::new(),
1171 );
1172 let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
1173 Some(Psbt::deserialize(&bytes).map_err(Error::conversion)?)
1174 };
1175
1176 let checkpoint_txs = value
1177 .checkpoint_txs
1178 .into_iter()
1179 .map(|(k, v)| {
1180 let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1181 let txid = v.txid.parse().map_err(Error::conversion)?;
1182 Ok((out_point, txid))
1183 })
1184 .collect::<Result<HashMap<_, _>, Error>>()?;
1185
1186 let swept_vtxos = value
1187 .swept_vtxos
1188 .into_iter()
1189 .map(OutPoint::try_from)
1190 .collect::<Result<Vec<_>, _>>()?;
1191
1192 Ok(ArkTransaction {
1193 txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1194 tx,
1195 spent_vtxos,
1196 unspent_vtxos: spendable_vtxos,
1197 checkpoint_txs,
1198 swept_vtxos,
1199 })
1200 }
1201}
1202
1203impl TryFrom<Outpoint> for OutPoint {
1204 type Error = Error;
1205
1206 fn try_from(value: Outpoint) -> Result<Self, Self::Error> {
1207 let point = OutPoint {
1208 txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1209 vout: value.vout,
1210 };
1211 Ok(point)
1212 }
1213}
1214
1215pub struct VtxoChainResponse {
1216 pub chains: VtxoChains,
1217 pub page: Option<IndexerPage>,
1218}
1219
1220pub struct ListVtxosResponse {
1221 pub vtxos: Vec<VirtualTxOutPoint>,
1222 pub page: Option<IndexerPage>,
1223}
1224
1225impl TryFrom<generated::ark::v1::GetVtxoChainResponse> for VtxoChainResponse {
1226 type Error = Error;
1227
1228 fn try_from(value: generated::ark::v1::GetVtxoChainResponse) -> Result<Self, Self::Error> {
1229 let chains = value
1230 .chain
1231 .iter()
1232 .map(VtxoChain::try_from)
1233 .collect::<Result<Vec<_>, Error>>()?;
1234
1235 Ok(VtxoChainResponse {
1236 chains: VtxoChains { inner: chains },
1237 page: value
1238 .page
1239 .map(IndexerPage::try_from)
1240 .transpose()
1241 .map_err(Error::conversion)?,
1242 })
1243 }
1244}
1245
1246impl TryFrom<generated::ark::v1::GetVirtualTxsResponse> for VirtualTxsResponse {
1247 type Error = Error;
1248
1249 fn try_from(value: generated::ark::v1::GetVirtualTxsResponse) -> Result<Self, Self::Error> {
1250 let base64 = &base64::engine::GeneralPurpose::new(
1251 &base64::alphabet::STANDARD,
1252 base64::engine::GeneralPurposeConfig::new(),
1253 );
1254
1255 let txs = value
1256 .txs
1257 .into_iter()
1258 .map(|tx| {
1259 let bytes = base64.decode(&tx).map_err(Error::conversion)?;
1260 let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
1261
1262 Ok(psbt)
1263 })
1264 .collect::<Result<Vec<_>, _>>()?;
1265
1266 Ok(VirtualTxsResponse {
1267 txs,
1268 page: value
1269 .page
1270 .map(IndexerPage::try_from)
1271 .transpose()
1272 .map_err(Error::conversion)?,
1273 })
1274 }
1275}
1276
1277impl TryFrom<&generated::ark::v1::IndexerChain> for VtxoChain {
1278 type Error = Error;
1279
1280 fn try_from(value: &generated::ark::v1::IndexerChain) -> Result<Self, Self::Error> {
1281 let spends = value
1282 .spends
1283 .iter()
1284 .map(|txid| {
1285 let txid_str = if txid.len() == 66 { &txid[..64] } else { txid };
1287 txid_str.parse().map_err(Error::conversion)
1288 })
1289 .collect::<Result<Vec<_>, Error>>()?;
1290
1291 let tx_type = match value.r#type() {
1292 IndexerChainedTxType::Unspecified => ChainedTxType::Unspecified,
1293 IndexerChainedTxType::Commitment => ChainedTxType::Commitment,
1294 IndexerChainedTxType::Ark => ChainedTxType::Ark,
1295 IndexerChainedTxType::Tree => ChainedTxType::Tree,
1296 IndexerChainedTxType::Checkpoint => ChainedTxType::Checkpoint,
1297 };
1298
1299 Ok(VtxoChain {
1300 txid: value.txid.parse().map_err(Error::conversion)?,
1301 tx_type,
1302 spends,
1303 expires_at: value.expires_at,
1304 })
1305 }
1306}
1307
1308impl From<generated::ark::v1::IndexerPageResponse> for IndexerPage {
1309 fn from(value: generated::ark::v1::IndexerPageResponse) -> Self {
1310 IndexerPage {
1311 current: value.current,
1312 next: value.next,
1313 total: value.total,
1314 }
1315 }
1316}
1317
1318impl TryFrom<&generated::ark::v1::IndexerTxHistoryRecord> for history::Transaction {
1319 type Error = Error;
1320
1321 fn try_from(value: &generated::ark::v1::IndexerTxHistoryRecord) -> Result<Self, Self::Error> {
1322 let sign = match value.r#type() {
1323 generated::ark::v1::IndexerTxType::Received => 1,
1324 generated::ark::v1::IndexerTxType::Sent
1326 | generated::ark::v1::IndexerTxType::Unspecified => -1,
1327 };
1328
1329 let amount = SignedAmount::from_sat(value.amount as i64 * sign);
1330
1331 let tx = match &value.key {
1332 Some(Key::CommitmentTxid(txid)) => history::Transaction::Commitment {
1333 txid: txid.parse().map_err(Error::conversion)?,
1334 amount,
1335 created_at: value.created_at,
1336 },
1337 Some(Key::VirtualTxid(txid)) => history::Transaction::Ark {
1338 txid: txid.parse().map_err(Error::conversion)?,
1339 amount,
1340 is_settled: value.is_settled,
1341 created_at: value.created_at,
1342 },
1343 None => return Err(Error::conversion("invalid transaction without key")),
1344 };
1345
1346 Ok(tx)
1347 }
1348}
1349
1350impl TryFrom<generated::ark::v1::GetSubscriptionResponse> for SubscriptionResponse {
1351 type Error = Error;
1352
1353 fn try_from(value: generated::ark::v1::GetSubscriptionResponse) -> Result<Self, Self::Error> {
1354 let value = match value.data {
1355 Some(get_subscription_response::Data::Heartbeat(_)) => return Ok(Self::Heartbeat),
1356 Some(get_subscription_response::Data::Event(event)) => event,
1357 None => return Err(Error::conversion("empty subscription response")),
1358 };
1359
1360 let txid = value.txid.parse().map_err(Error::conversion)?;
1361
1362 let new_vtxos = value
1363 .new_vtxos
1364 .iter()
1365 .map(VirtualTxOutPoint::try_from)
1366 .collect::<Result<Vec<_>, _>>()?;
1367
1368 let spent_vtxos = value
1369 .spent_vtxos
1370 .iter()
1371 .map(VirtualTxOutPoint::try_from)
1372 .collect::<Result<Vec<_>, _>>()?;
1373
1374 let tx = if value.tx.is_empty() {
1375 None
1376 } else {
1377 match Vec::from_hex(&value.tx)
1378 .ok()
1379 .and_then(|bytes| bitcoin::consensus::deserialize::<Transaction>(&bytes).ok())
1380 {
1381 Some(raw_tx) => Some(raw_tx),
1382 None => {
1383 let base64 = base64::engine::GeneralPurpose::new(
1384 &base64::alphabet::STANDARD,
1385 base64::engine::GeneralPurposeConfig::new(),
1386 );
1387 let bytes = base64.decode(&value.tx).map_err(|e| {
1388 let tx_prefix = value.tx.chars().take(24).collect::<String>();
1389 Error::conversion(format!(
1390 "invalid subscription tx payload (not hex tx or base64 psbt) (len={}, tx_prefix='{}'): {e}",
1391 value.tx.len(),
1392 tx_prefix
1393 ))
1394 })?;
1395 let psbt = Psbt::deserialize(&bytes).map_err(|e| {
1396 let tx_prefix = value.tx.chars().take(24).collect::<String>();
1397 Error::conversion(format!(
1398 "invalid subscription tx payload (base64 but not psbt) (len={}, tx_prefix='{}'): {e}",
1399 value.tx.len(),
1400 tx_prefix
1401 ))
1402 })?;
1403 Some(psbt.unsigned_tx)
1404 }
1405 }
1406 };
1407
1408 let checkpoint_txs = value
1409 .checkpoint_txs
1410 .into_iter()
1411 .map(|(k, v)| {
1412 let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1413 let txid = v.txid.parse().map_err(Error::conversion)?;
1414 Ok((out_point, txid))
1415 })
1416 .collect::<Result<HashMap<_, _>, Error>>()?;
1417
1418 let scripts = value
1419 .scripts
1420 .iter()
1421 .map(|h| ScriptBuf::from_hex(h).map_err(Error::conversion))
1422 .collect::<Result<Vec<_>, _>>()?;
1423
1424 Ok(Self::Event(Box::new(SubscriptionEvent {
1425 txid,
1426 scripts,
1427 new_vtxos,
1428 spent_vtxos,
1429 tx,
1430 checkpoint_txs,
1431 })))
1432 }
1433}
1434
1435impl From<GetVtxosRequest> for generated::ark::v1::GetVtxosRequest {
1436 fn from(value: GetVtxosRequest) -> Self {
1437 let (spendable_only, spent_only, recoverable_only, pending_only) = match value.filter() {
1438 Some(GetVtxosRequestFilter::Spendable) => (true, false, false, false),
1439 Some(GetVtxosRequestFilter::Spent) => (false, true, false, false),
1440 Some(GetVtxosRequestFilter::Recoverable) => (false, false, true, false),
1441 Some(GetVtxosRequestFilter::PendingOnly) => (false, false, false, true),
1442 None => (false, false, false, false),
1443 };
1444
1445 let page = value
1446 .page()
1447 .map(|p| generated::ark::v1::IndexerPageRequest {
1448 size: p.size,
1449 index: p.index,
1450 });
1451
1452 match value.reference() {
1453 GetVtxosRequestReference::Scripts(script_bufs) => Self {
1454 scripts: script_bufs.iter().map(|s| s.to_hex_string()).collect(),
1455 outpoints: Vec::new(),
1456 spendable_only,
1457 spent_only,
1458 recoverable_only,
1459 page,
1460 pending_only,
1461 after: value.after().unwrap_or(0) as i64,
1462 before: value.before().unwrap_or(0) as i64,
1463 },
1464 GetVtxosRequestReference::OutPoints(outpoints) => Self {
1465 scripts: Vec::new(),
1466 outpoints: outpoints.iter().map(|o| o.to_string()).collect(),
1467 spendable_only,
1468 spent_only,
1469 recoverable_only,
1470 page,
1471 pending_only,
1472 after: value.after().unwrap_or(0) as i64,
1473 before: value.before().unwrap_or(0) as i64,
1474 },
1475 }
1476 }
1477}
1478
1479mod guarded {
1480 use super::InterceptedChannel;
1494 use super::SharedState;
1495 use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
1496 use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
1497 use crate::Error;
1498 use ark_core::server::Info;
1499 use futures::Future;
1500
1501 #[derive(Clone)]
1502 pub(super) struct Ark {
1503 raw: ArkServiceClient<InterceptedChannel>,
1504 shared: SharedState,
1505 }
1506
1507 impl Ark {
1508 pub(super) fn new(raw: ArkServiceClient<InterceptedChannel>, shared: SharedState) -> Self {
1509 Self { raw, shared }
1510 }
1511
1512 pub(super) async fn request<T, F, Fut>(&self, f: F) -> Result<T, Error>
1520 where
1521 F: FnOnce(ArkServiceClient<InterceptedChannel>) -> Fut,
1522 Fut: Future<Output = Result<T, tonic::Status>>,
1523 {
1524 let client = self.raw.clone();
1525 let info_client = self.raw.clone();
1526
1527 self.shared
1528 .guarded(info_client, async move {
1529 f(client).await.map_err(Error::request)
1530 })
1531 .await
1532 }
1533
1534 pub(super) async fn get_info(&self) -> Result<Info, Error> {
1535 self.shared.get_info_unguarded(self.raw.clone()).await
1536 }
1537 }
1538
1539 #[derive(Clone)]
1540 pub(super) struct Indexer {
1541 raw: IndexerServiceClient<InterceptedChannel>,
1542 info_client: ArkServiceClient<InterceptedChannel>,
1543 shared: SharedState,
1544 }
1545
1546 impl Indexer {
1547 pub(super) fn new(
1548 raw: IndexerServiceClient<InterceptedChannel>,
1549 info_client: ArkServiceClient<InterceptedChannel>,
1550 shared: SharedState,
1551 ) -> Self {
1552 Self {
1553 raw,
1554 info_client,
1555 shared,
1556 }
1557 }
1558
1559 pub(super) async fn request<T, F, Fut>(&self, f: F) -> Result<T, Error>
1567 where
1568 F: FnOnce(IndexerServiceClient<InterceptedChannel>) -> Fut,
1569 Fut: Future<Output = Result<T, tonic::Status>>,
1570 {
1571 let client = self.raw.clone();
1572 let info_client = self.info_client.clone();
1573
1574 self.shared
1575 .guarded(info_client, async move {
1576 f(client).await.map_err(Error::request)
1577 })
1578 .await
1579 }
1580 }
1581}