Skip to main content

ark_grpc/
client.rs

1use crate::generated;
2use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
3use crate::generated::ark::v1::get_subscription_response;
4use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
5use crate::generated::ark::v1::indexer_tx_history_record::Key;
6use crate::generated::ark::v1::ConfirmRegistrationRequest;
7use crate::generated::ark::v1::EstimateIntentFeeRequest;
8use crate::generated::ark::v1::GetEventStreamRequest;
9use crate::generated::ark::v1::GetInfoRequest;
10use crate::generated::ark::v1::GetSubscriptionRequest;
11use crate::generated::ark::v1::GetTransactionsStreamRequest;
12use crate::generated::ark::v1::IndexerChainedTxType;
13use crate::generated::ark::v1::Intent;
14use crate::generated::ark::v1::Outpoint;
15use crate::generated::ark::v1::RegisterIntentRequest;
16use crate::generated::ark::v1::SubmitSignedForfeitTxsRequest;
17use crate::generated::ark::v1::SubmitTreeNoncesRequest;
18use crate::generated::ark::v1::SubmitTreeSignaturesRequest;
19use crate::generated::ark::v1::SubscribeForScriptsRequest;
20use crate::generated::ark::v1::UnsubscribeForScriptsRequest;
21use crate::Error;
22use ark_core::asset::AssetId;
23use ark_core::history;
24use ark_core::server::parse_sequence_number;
25use ark_core::server::ArkTransaction;
26use ark_core::server::AssetInfo;
27use ark_core::server::BatchFailed;
28use ark_core::server::BatchFinalizationEvent;
29use ark_core::server::BatchFinalizedEvent;
30use ark_core::server::BatchStartedEvent;
31use ark_core::server::BatchTreeEventType;
32use ark_core::server::ChainedTxType;
33use ark_core::server::CommitmentTransaction;
34use ark_core::server::FinalizeOffchainTxResponse;
35use ark_core::server::GetVtxosRequest;
36use ark_core::server::GetVtxosRequestFilter;
37use ark_core::server::GetVtxosRequestReference;
38use ark_core::server::IndexerPage;
39use ark_core::server::Info;
40use ark_core::server::NoncePks;
41use ark_core::server::PartialSigTree;
42use ark_core::server::StreamEvent;
43use ark_core::server::StreamStartedEvent;
44use ark_core::server::StreamTransactionData;
45use ark_core::server::SubmitOffchainTxResponse;
46use ark_core::server::SubscriptionEvent;
47use ark_core::server::SubscriptionResponse;
48use ark_core::server::TreeNoncesAggregatedEvent;
49use ark_core::server::TreeNoncesEvent;
50use ark_core::server::TreeSignatureEvent;
51use ark_core::server::TreeSigningStartedEvent;
52use ark_core::server::TreeTxEvent;
53use ark_core::server::TreeTxNoncePks;
54use ark_core::server::VirtualTxOutPoint;
55use ark_core::server::VirtualTxsResponse;
56use ark_core::server::VtxoChain;
57use ark_core::server::VtxoChains;
58use ark_core::server::SDK_VERSION;
59use ark_core::server::TARGET_ARKD_VERSION;
60use ark_core::ArkAddress;
61use ark_core::TxGraphChunk;
62use async_stream::stream;
63use base64::Engine;
64use bitcoin::hex::FromHex;
65use bitcoin::secp256k1::PublicKey;
66use bitcoin::taproot::Signature;
67use bitcoin::OutPoint;
68use bitcoin::Psbt;
69use bitcoin::ScriptBuf;
70use bitcoin::SignedAmount;
71use bitcoin::Transaction;
72use bitcoin::Txid;
73use futures::Future;
74use futures::Stream;
75use futures::StreamExt;
76use futures::TryStreamExt;
77use std::collections::HashMap;
78use std::error::Error as StdError;
79use std::fmt;
80use std::str::FromStr;
81use std::sync::Arc;
82use std::sync::RwLock;
83
84#[derive(Clone, Default)]
85struct HeaderState {
86    digest: Arc<RwLock<Option<String>>>,
87}
88
89impl HeaderState {
90    fn set_digest(&self, digest: String) {
91        let digest = (!digest.is_empty()).then_some(digest);
92        match self.digest.write() {
93            Ok(mut guard) => *guard = digest,
94            Err(poisoned) => {
95                log::warn!("digest header state lock poisoned while updating; recovering");
96                *poisoned.into_inner() = digest;
97            }
98        }
99    }
100
101    fn digest(&self) -> Option<String> {
102        match self.digest.read() {
103            Ok(guard) => guard.clone(),
104            Err(poisoned) => {
105                log::warn!("digest header state lock poisoned while reading; recovering");
106                poisoned.into_inner().clone()
107            }
108        }
109    }
110}
111
112#[derive(Clone, Default)]
113struct HeaderInterceptor {
114    state: HeaderState,
115}
116
117impl tonic::service::Interceptor for HeaderInterceptor {
118    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
119        let metadata = req.metadata_mut();
120        metadata.insert(
121            "x-build-version",
122            tonic::metadata::MetadataValue::from_static(TARGET_ARKD_VERSION),
123        );
124        metadata.insert(
125            "x-sdk-version",
126            tonic::metadata::MetadataValue::from_static(SDK_VERSION),
127        );
128
129        if let Some(digest) = self.state.digest() {
130            if let Ok(value) = tonic::metadata::MetadataValue::try_from(digest.as_str()) {
131                metadata.insert("x-digest", value);
132            }
133        }
134
135        Ok(req)
136    }
137}
138
139type InterceptedChannel =
140    tonic::codegen::InterceptedService<tonic::transport::Channel, HeaderInterceptor>;
141
142type InfoRefreshHook =
143    Arc<dyn Fn(Info) -> Result<(), Box<dyn StdError + Send + Sync + 'static>> + Send + Sync>;
144
145#[derive(Clone, Default)]
146struct SharedState {
147    headers: HeaderState,
148    info_refresh_hook: Arc<RwLock<Option<InfoRefreshHook>>>,
149}
150
151impl SharedState {
152    fn set_info_refresh_hook(&self, hook: InfoRefreshHook) {
153        match self.info_refresh_hook.write() {
154            Ok(mut guard) => *guard = Some(hook),
155            Err(poisoned) => {
156                log::warn!("info refresh hook lock poisoned while updating; recovering");
157                *poisoned.into_inner() = Some(hook);
158            }
159        }
160    }
161
162    /// Runs one RPC operation with digest-mismatch handling.
163    ///
164    /// If the server rejects the operation because our cached `/info` digest is stale,
165    /// this fetches fresh `/info` through the supplied Ark client, runs the refresh hook,
166    /// commits the new digest header, and returns `ServerInfoChanged` for the original
167    /// failure. The original operation is never retried automatically.
168    async fn guarded<T>(
169        &self,
170        info_client: ArkServiceClient<InterceptedChannel>,
171        op: impl Future<Output = Result<T, Error>>,
172    ) -> Result<T, Error> {
173        match op.await {
174            Ok(value) => Ok(value),
175            Err(err) if err.is_digest_mismatch() => {
176                let original = err;
177                let info = self.fetch_info_unguarded(info_client).await?;
178                let digest = info.digest.clone();
179
180                if let Some(hook) = self.info_refresh_hook() {
181                    hook(info).map_err(Error::conversion)?;
182                }
183
184                self.headers.set_digest(digest);
185                Err(Error::server_info_changed(original))
186            }
187            Err(err) => Err(err),
188        }
189    }
190
191    fn info_refresh_hook(&self) -> Option<InfoRefreshHook> {
192        match self.info_refresh_hook.read() {
193            Ok(hook) => hook.clone(),
194            Err(poisoned) => {
195                log::warn!("info refresh hook lock poisoned while reading; recovering");
196                poisoned.into_inner().clone()
197            }
198        }
199    }
200
201    async fn get_info_unguarded(
202        &self,
203        client: ArkServiceClient<InterceptedChannel>,
204    ) -> Result<Info, Error> {
205        let info = self.fetch_info_unguarded(client).await?;
206        self.headers.set_digest(info.digest.clone());
207        Ok(info)
208    }
209
210    async fn fetch_info_unguarded(
211        &self,
212        mut client: ArkServiceClient<InterceptedChannel>,
213    ) -> Result<Info, Error> {
214        let response = client
215            .get_info(GetInfoRequest {})
216            .await
217            .map_err(Error::request)?;
218
219        response.into_inner().try_into()
220    }
221}
222
223#[derive(Clone)]
224pub struct Client {
225    url: String,
226    ark: Option<guarded::Ark>,
227    indexer: Option<guarded::Indexer>,
228    shared: SharedState,
229}
230
231impl fmt::Debug for Client {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        f.debug_struct("Client")
234            .field("url", &self.url)
235            .field("connected", &self.ark.is_some())
236            .finish()
237    }
238}
239
240impl Client {
241    pub fn new(url: String) -> Self {
242        Self {
243            url,
244            ark: None,
245            indexer: None,
246            shared: SharedState::default(),
247        }
248    }
249
250    pub fn set_info_refresh_hook(
251        &mut self,
252        hook: impl Fn(Info) -> Result<(), Box<dyn StdError + Send + Sync + 'static>>
253            + Send
254            + Sync
255            + 'static,
256    ) {
257        self.shared.set_info_refresh_hook(Arc::new(hook));
258    }
259
260    pub async fn connect(&mut self) -> Result<(), Error> {
261        let endpoint =
262            tonic::transport::Endpoint::from_shared(self.url.clone()).map_err(Error::connect)?;
263
264        #[cfg(any(feature = "tls-webpki-roots", feature = "tls-native-roots"))]
265        let endpoint = {
266            let tls = tonic::transport::ClientTlsConfig::new();
267            #[cfg(feature = "tls-webpki-roots")]
268            let tls = tls.with_webpki_roots();
269            #[cfg(feature = "tls-native-roots")]
270            let tls = tls.with_native_roots();
271            endpoint.tls_config(tls).map_err(Error::connect)?
272        };
273
274        let channel = endpoint.connect().await.map_err(Error::connect)?;
275
276        let interceptor = HeaderInterceptor {
277            state: self.shared.headers.clone(),
278        };
279        let ark_service_client =
280            ArkServiceClient::with_interceptor(channel.clone(), interceptor.clone());
281        let indexer_client = IndexerServiceClient::with_interceptor(channel, interceptor);
282
283        self.ark = Some(guarded::Ark::new(
284            ark_service_client.clone(),
285            self.shared.clone(),
286        ));
287        self.indexer = Some(guarded::Indexer::new(
288            indexer_client,
289            ark_service_client,
290            self.shared.clone(),
291        ));
292        Ok(())
293    }
294
295    pub async fn get_info(&self) -> Result<Info, Error> {
296        self.ark()?.get_info().await
297    }
298
299    /// List VTXOs with pagination support.
300    /// Returns a single page of results along with pagination info.
301    pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
302        if request.reference().is_empty() {
303            return Ok(ListVtxosResponse {
304                vtxos: Vec::new(),
305                page: None,
306            });
307        }
308
309        let response = self
310            .indexer()?
311            .request(move |mut client| async move {
312                client
313                    .get_vtxos(generated::ark::v1::GetVtxosRequest::from(request))
314                    .await
315            })
316            .await?;
317
318        let inner = response.into_inner();
319
320        let vtxos = inner
321            .vtxos
322            .iter()
323            .map(VirtualTxOutPoint::try_from)
324            .collect::<Result<Vec<_>, _>>()?;
325
326        let page = inner
327            .page
328            .map(IndexerPage::try_from)
329            .transpose()
330            .map_err(Error::conversion)?;
331
332        Ok(ListVtxosResponse { vtxos, page })
333    }
334
335    pub async fn register_intent(&self, intent: ark_core::intent::Intent) -> Result<String, Error> {
336        let intent = intent.try_into()?;
337        let request = RegisterIntentRequest {
338            intent: Some(intent),
339        };
340
341        let response = self
342            .ark()?
343            .request(move |mut client| async move { client.register_intent(request).await })
344            .await?;
345
346        let intent_id = response.into_inner().intent_id;
347
348        Ok(intent_id)
349    }
350
351    pub async fn submit_offchain_transaction_request(
352        &self,
353        ark_tx: Psbt,
354        checkpoint_txs: Vec<Psbt>,
355    ) -> Result<SubmitOffchainTxResponse, Error> {
356        let base64 = base64::engine::GeneralPurpose::new(
357            &base64::alphabet::STANDARD,
358            base64::engine::GeneralPurposeConfig::new(),
359        );
360
361        let ark_tx = base64.encode(ark_tx.serialize());
362
363        let checkpoint_txs = checkpoint_txs
364            .into_iter()
365            .map(|tx| base64.encode(tx.serialize()))
366            .collect();
367
368        let res = self
369            .ark()?
370            .request(move |mut client| async move {
371                client
372                    .submit_tx(generated::ark::v1::SubmitTxRequest {
373                        signed_ark_tx: ark_tx,
374                        checkpoint_txs,
375                    })
376                    .await
377            })
378            .await?;
379
380        let res = res.into_inner();
381
382        let signed_ark_tx = res.final_ark_tx;
383        let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
384        let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
385
386        let signed_checkpoint_txs = res
387            .signed_checkpoint_txs
388            .into_iter()
389            .map(|tx| {
390                let tx = base64.decode(tx).map_err(Error::conversion)?;
391                let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
392
393                Ok(tx)
394            })
395            .collect::<Result<Vec<_>, Error>>()?;
396
397        Ok(SubmitOffchainTxResponse {
398            signed_ark_tx,
399            signed_checkpoint_txs,
400        })
401    }
402
403    pub async fn finalize_offchain_transaction(
404        &self,
405        txid: Txid,
406        checkpoint_txs: Vec<Psbt>,
407    ) -> Result<FinalizeOffchainTxResponse, Error> {
408        let base64 = base64::engine::GeneralPurpose::new(
409            &base64::alphabet::STANDARD,
410            base64::engine::GeneralPurposeConfig::new(),
411        );
412
413        let checkpoint_txs = checkpoint_txs
414            .into_iter()
415            .map(|tx| base64.encode(tx.serialize()))
416            .collect();
417
418        self.ark()?
419            .request(move |mut client| async move {
420                client
421                    .finalize_tx(generated::ark::v1::FinalizeTxRequest {
422                        ark_txid: txid.to_string(),
423                        final_checkpoint_txs: checkpoint_txs,
424                    })
425                    .await
426            })
427            .await?;
428
429        Ok(FinalizeOffchainTxResponse {})
430    }
431
432    pub async fn get_pending_tx(
433        &self,
434        intent: ark_core::intent::Intent,
435    ) -> Result<Vec<ark_core::server::PendingTx>, Error> {
436        let intent: Intent = intent.try_into()?;
437
438        let res = self
439            .ark()?
440            .request(move |mut client| async move {
441                client
442                    .get_pending_tx(generated::ark::v1::GetPendingTxRequest {
443                        identifier: Some(
444                            generated::ark::v1::get_pending_tx_request::Identifier::Intent(intent),
445                        ),
446                    })
447                    .await
448            })
449            .await?;
450
451        let inner = res.into_inner();
452        let base64 = base64::engine::GeneralPurpose::new(
453            &base64::alphabet::STANDARD,
454            base64::engine::GeneralPurposeConfig::new(),
455        );
456
457        inner
458            .pending_txs
459            .into_iter()
460            .map(|tx| {
461                let ark_txid = tx.ark_txid.parse().map_err(Error::conversion)?;
462
463                let signed_ark_tx = base64.decode(&tx.final_ark_tx).map_err(Error::conversion)?;
464                let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
465
466                let signed_checkpoint_txs = tx
467                    .signed_checkpoint_txs
468                    .into_iter()
469                    .map(|cp| {
470                        let bytes = base64.decode(cp).map_err(Error::conversion)?;
471                        Psbt::deserialize(&bytes).map_err(Error::conversion)
472                    })
473                    .collect::<Result<Vec<_>, Error>>()?;
474
475                Ok(ark_core::server::PendingTx {
476                    ark_txid,
477                    signed_ark_tx,
478                    signed_checkpoint_txs,
479                })
480            })
481            .collect()
482    }
483
484    pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
485        self.ark()?
486            .request(move |mut client| async move {
487                client
488                    .confirm_registration(ConfirmRegistrationRequest { intent_id })
489                    .await
490            })
491            .await?;
492
493        Ok(())
494    }
495
496    pub async fn submit_tree_nonces(
497        &self,
498        batch_id: &str,
499        cosigner_pubkey: PublicKey,
500        pub_nonce_tree: NoncePks,
501    ) -> Result<(), Error> {
502        self.ark()?
503            .request(move |mut client| async move {
504                client
505                    .submit_tree_nonces(SubmitTreeNoncesRequest {
506                        batch_id: batch_id.to_string(),
507                        pubkey: cosigner_pubkey.to_string(),
508                        tree_nonces: pub_nonce_tree.encode(),
509                    })
510                    .await
511            })
512            .await?;
513
514        Ok(())
515    }
516
517    pub async fn submit_tree_signatures(
518        &self,
519        batch_id: &str,
520        cosigner_pk: PublicKey,
521        partial_sig_tree: PartialSigTree,
522    ) -> Result<(), Error> {
523        self.ark()?
524            .request(move |mut client| async move {
525                client
526                    .submit_tree_signatures(SubmitTreeSignaturesRequest {
527                        batch_id: batch_id.to_string(),
528                        pubkey: cosigner_pk.to_string(),
529                        tree_signatures: partial_sig_tree.encode(),
530                    })
531                    .await
532            })
533            .await?;
534
535        Ok(())
536    }
537
538    pub async fn submit_signed_forfeit_txs(
539        &self,
540        signed_forfeit_txs: Vec<Psbt>,
541        signed_commitment_tx: Option<Psbt>,
542    ) -> Result<(), Error> {
543        let base64 = base64::engine::GeneralPurpose::new(
544            &base64::alphabet::STANDARD,
545            base64::engine::GeneralPurposeConfig::new(),
546        );
547
548        let signed_commitment_tx = signed_commitment_tx
549            .map(|tx| base64.encode(tx.serialize()))
550            .unwrap_or_default();
551
552        self.ark()?
553            .request(move |mut client| async move {
554                client
555                    .submit_signed_forfeit_txs(SubmitSignedForfeitTxsRequest {
556                        signed_forfeit_txs: signed_forfeit_txs
557                            .iter()
558                            .map(|psbt| base64.encode(psbt.serialize()))
559                            .collect(),
560                        signed_commitment_tx,
561                    })
562                    .await
563            })
564            .await?;
565
566        Ok(())
567    }
568
569    pub async fn get_event_stream(
570        &self,
571        topics: Vec<String>,
572    ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
573        let response = self
574            .ark()?
575            .request(move |mut client| async move {
576                client
577                    .get_event_stream(GetEventStreamRequest { topics })
578                    .await
579            })
580            .await?;
581        let mut stream = response.into_inner();
582
583        let stream = stream! {
584            loop {
585                match stream.try_next().await {
586                    Ok(Some(event)) => match event.event {
587                        None => {
588                            log::debug!("Got empty message");
589                        }
590                        Some(event) => {
591                            yield Ok(StreamEvent::try_from(event)?);
592                        }
593                    },
594                    Ok(None) => {
595                        yield Err(Error::event_stream_disconnect());
596                    }
597                    Err(e) => {
598                        yield Err(Error::event_stream(e));
599                    }
600                }
601            }
602        };
603
604        Ok(stream.boxed())
605    }
606
607    pub async fn get_tx_stream(
608        &self,
609    ) -> Result<impl Stream<Item = Result<StreamTransactionData, Error>> + Unpin, Error> {
610        let response = self
611            .ark()?
612            .request(|mut client| async move {
613                client
614                    .get_transactions_stream(GetTransactionsStreamRequest {})
615                    .await
616            })
617            .await?;
618
619        let mut stream = response.into_inner();
620
621        let stream = stream! {
622            loop {
623                match stream.try_next().await {
624                    Ok(Some(event)) => match event.data {
625                        None => {
626                            log::debug!("Got empty message");
627                        }
628                        Some(event) => {
629                            yield Ok(StreamTransactionData::try_from(event)?);
630                        }
631                    },
632                    Ok(None) => {
633                        yield Err(Error::event_stream_disconnect());
634                    }
635                    Err(e) => {
636                        yield Err(Error::event_stream(e));
637                    }
638                }
639            }
640        };
641
642        Ok(stream.boxed())
643    }
644
645    pub async fn get_vtxo_chain(
646        &self,
647        outpoint: Option<OutPoint>,
648        size_and_index: Option<(i32, i32)>,
649    ) -> Result<VtxoChainResponse, Error> {
650        let response = self
651            .indexer()?
652            .request(move |mut client| async move {
653                client
654                    .get_vtxo_chain(generated::ark::v1::GetVtxoChainRequest {
655                        outpoint: outpoint.map(|o| generated::ark::v1::IndexerOutpoint {
656                            txid: o.txid.to_string(),
657                            vout: o.vout,
658                        }),
659                        page: size_and_index.map(|(size, index)| {
660                            generated::ark::v1::IndexerPageRequest { size, index }
661                        }),
662                    })
663                    .await
664            })
665            .await?;
666        let response = response.into_inner();
667        let result = response.try_into()?;
668        Ok(result)
669    }
670
671    pub async fn get_virtual_txs(
672        &self,
673        txids: Vec<String>,
674        size_and_index: Option<(i32, i32)>,
675    ) -> Result<VirtualTxsResponse, Error> {
676        let response = self
677            .indexer()?
678            .request(move |mut client| async move {
679                client
680                    .get_virtual_txs(generated::ark::v1::GetVirtualTxsRequest {
681                        txids,
682                        page: size_and_index.map(|(size, index)| {
683                            generated::ark::v1::IndexerPageRequest { size, index }
684                        }),
685                    })
686                    .await
687            })
688            .await?;
689        let response = response.into_inner();
690        let result = response.try_into()?;
691        Ok(result)
692    }
693
694    /// Allows to subscribe for tx notifications related to the provided
695    /// vtxo scripts.
696    ///
697    /// It can also be used to update an existing subscriptions by adding
698    /// new scripts to it.
699    ///
700    /// Note: for new subscriptions, don't provide a `subscription_id`
701    ///
702    /// Returns the subscription id if successful
703    pub async fn subscribe_to_scripts(
704        &self,
705        scripts: Vec<ArkAddress>,
706        subscription_id: Option<String>,
707    ) -> Result<String, Error> {
708        let scripts = scripts
709            .iter()
710            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
711            .collect::<Vec<_>>();
712
713        // For new subscription we expect empty string ("") here
714        let subscription_id = subscription_id.unwrap_or_default();
715
716        let response = self
717            .indexer()?
718            .request(move |mut client| async move {
719                client
720                    .subscribe_for_scripts(SubscribeForScriptsRequest {
721                        scripts,
722                        subscription_id,
723                    })
724                    .await
725            })
726            .await?;
727
728        let response = response.into_inner();
729
730        Ok(response.subscription_id)
731    }
732
733    /// Allows to remove scripts from an existing subscription.
734    pub async fn unsubscribe_from_scripts(
735        &self,
736        scripts: Vec<ArkAddress>,
737        subscription_id: String,
738    ) -> Result<(), Error> {
739        let scripts = scripts
740            .iter()
741            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
742            .collect::<Vec<_>>();
743
744        self.indexer()?
745            .request(move |mut client| async move {
746                client
747                    .unsubscribe_for_scripts(UnsubscribeForScriptsRequest {
748                        subscription_id,
749                        scripts,
750                    })
751                    .await
752            })
753            .await?;
754
755        Ok(())
756    }
757
758    /// Gets a subscription stream that returns subscription responses.
759    pub async fn get_subscription(
760        &self,
761        subscription_id: String,
762    ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
763        let response = self
764            .indexer()?
765            .request(move |mut client| async move {
766                client
767                    .get_subscription(GetSubscriptionRequest { subscription_id })
768                    .await
769            })
770            .await?;
771
772        let mut stream = response.into_inner();
773
774        let stream = stream! {
775            loop {
776                match stream.try_next().await {
777                    Ok(Some(response)) => {
778                        match SubscriptionResponse::try_from(response) {
779                            Ok(subscription_response) => {
780                                yield Ok(subscription_response);
781                            }
782                            Err(e) => {
783                                yield Err(e);
784                            }
785                        }
786                    }
787                    Ok(None) => {
788                        break;
789                    }
790                    Err(e) => {
791                        yield Err(Error::event_stream(e));
792                    }
793                }
794            }
795        };
796
797        Ok(stream.boxed())
798    }
799
800    pub async fn estimate_fees(
801        &self,
802        intent: ark_core::intent::Intent,
803    ) -> Result<SignedAmount, Error> {
804        let intent = intent.try_into()?;
805        let response = self
806            .ark()?
807            .request(move |mut client| async move {
808                client
809                    .estimate_intent_fee(EstimateIntentFeeRequest {
810                        intent: Some(intent),
811                    })
812                    .await
813            })
814            .await?;
815        let response = response.into_inner();
816
817        Ok(SignedAmount::from_sat(response.fee))
818    }
819
820    pub async fn get_asset(&self, asset_id: AssetId) -> Result<AssetInfo, Error> {
821        let response = self
822            .indexer()?
823            .request(move |mut client| async move {
824                client
825                    .get_asset(generated::ark::v1::GetAssetRequest {
826                        asset_id: asset_id.to_string(),
827                    })
828                    .await
829            })
830            .await?;
831
832        let inner = response.into_inner();
833
834        let supply = inner.supply.parse::<u64>().map_err(Error::conversion)?;
835
836        let asset_id = inner.asset_id.parse().map_err(Error::conversion)?;
837        let control_asset_id = if inner.control_asset.is_empty() {
838            None
839        } else {
840            Some(inner.control_asset.parse().map_err(Error::conversion)?)
841        };
842
843        Ok(AssetInfo {
844            asset_id,
845            control_asset_id,
846            supply,
847            metadata: inner.metadata,
848        })
849    }
850
851    fn ark(&self) -> Result<guarded::Ark, Error> {
852        self.ark.clone().ok_or(Error::not_connected())
853    }
854
855    fn indexer(&self) -> Result<guarded::Indexer, Error> {
856        self.indexer.clone().ok_or(Error::not_connected())
857    }
858}
859
860impl TryFrom<ark_core::intent::Intent> for Intent {
861    type Error = Error;
862
863    fn try_from(value: ark_core::intent::Intent) -> Result<Self, Self::Error> {
864        Ok(Self {
865            proof: value.serialize_proof(),
866            message: value.serialize_message().map_err(Error::conversion)?,
867        })
868    }
869}
870
871impl TryFrom<generated::ark::v1::BatchStartedEvent> for BatchStartedEvent {
872    type Error = Error;
873
874    fn try_from(value: generated::ark::v1::BatchStartedEvent) -> Result<Self, Self::Error> {
875        let batch_expiry = parse_sequence_number(value.batch_expiry).map_err(Error::conversion)?;
876
877        Ok(BatchStartedEvent {
878            id: value.id,
879            intent_id_hashes: value.intent_id_hashes,
880            batch_expiry,
881        })
882    }
883}
884
885impl TryFrom<generated::ark::v1::StreamStartedEvent> for StreamStartedEvent {
886    type Error = Error;
887
888    fn try_from(value: generated::ark::v1::StreamStartedEvent) -> Result<Self, Self::Error> {
889        Ok(StreamStartedEvent { id: value.id })
890    }
891}
892
893impl TryFrom<generated::ark::v1::BatchFinalizationEvent> for BatchFinalizationEvent {
894    type Error = Error;
895
896    fn try_from(value: generated::ark::v1::BatchFinalizationEvent) -> Result<Self, Self::Error> {
897        let base64 = &base64::engine::GeneralPurpose::new(
898            &base64::alphabet::STANDARD,
899            base64::engine::GeneralPurposeConfig::new(),
900        );
901
902        let commitment_tx = base64
903            .decode(&value.commitment_tx)
904            .map_err(Error::conversion)?;
905        let commitment_tx = Psbt::deserialize(&commitment_tx).map_err(Error::conversion)?;
906
907        Ok(BatchFinalizationEvent {
908            id: value.id,
909            commitment_tx,
910        })
911    }
912}
913
914impl TryFrom<generated::ark::v1::BatchFinalizedEvent> for BatchFinalizedEvent {
915    type Error = Error;
916
917    fn try_from(value: generated::ark::v1::BatchFinalizedEvent) -> Result<Self, Self::Error> {
918        let commitment_txid = value.commitment_txid.parse().map_err(Error::conversion)?;
919
920        Ok(BatchFinalizedEvent {
921            id: value.id,
922            commitment_txid,
923        })
924    }
925}
926
927impl From<generated::ark::v1::BatchFailedEvent> for BatchFailed {
928    fn from(value: generated::ark::v1::BatchFailedEvent) -> Self {
929        BatchFailed {
930            id: value.id,
931            reason: value.reason,
932        }
933    }
934}
935
936impl TryFrom<generated::ark::v1::TreeSigningStartedEvent> for TreeSigningStartedEvent {
937    type Error = Error;
938
939    fn try_from(value: generated::ark::v1::TreeSigningStartedEvent) -> Result<Self, Self::Error> {
940        let unsigned_commitment_tx = base64::engine::GeneralPurpose::new(
941            &base64::alphabet::STANDARD,
942            base64::engine::GeneralPurposeConfig::new(),
943        )
944        .decode(&value.unsigned_commitment_tx)
945        .map_err(Error::conversion)?;
946
947        let unsigned_commitment_tx =
948            Psbt::deserialize(&unsigned_commitment_tx).map_err(Error::conversion)?;
949
950        Ok(TreeSigningStartedEvent {
951            id: value.id,
952            cosigners_pubkeys: value
953                .cosigners_pubkeys
954                .into_iter()
955                .map(|pk| pk.parse().map_err(Error::conversion))
956                .collect::<Result<Vec<_>, Error>>()?,
957            unsigned_commitment_tx,
958        })
959    }
960}
961
962impl TryFrom<generated::ark::v1::TreeNoncesAggregatedEvent> for TreeNoncesAggregatedEvent {
963    type Error = Error;
964
965    fn try_from(value: generated::ark::v1::TreeNoncesAggregatedEvent) -> Result<Self, Self::Error> {
966        let tree_nonces = NoncePks::decode(value.tree_nonces).map_err(Error::conversion)?;
967
968        Ok(TreeNoncesAggregatedEvent {
969            id: value.id,
970            tree_nonces,
971        })
972    }
973}
974
975impl TryFrom<generated::ark::v1::TreeTxEvent> for TreeTxEvent {
976    type Error = Error;
977
978    fn try_from(value: generated::ark::v1::TreeTxEvent) -> Result<Self, Self::Error> {
979        let batch_tree_event_type = match value.batch_index {
980            0 => BatchTreeEventType::Vtxo,
981            1 => BatchTreeEventType::Connector,
982            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
983        };
984
985        let txid = if value.txid.is_empty() {
986            None
987        } else {
988            Some(value.txid.parse().map_err(Error::conversion)?)
989        };
990
991        let base64 = &base64::engine::GeneralPurpose::new(
992            &base64::alphabet::STANDARD,
993            base64::engine::GeneralPurposeConfig::new(),
994        );
995
996        let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
997        let tx = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
998
999        let children = value
1000            .children
1001            .iter()
1002            .map(|(index, txid)| Ok((*index, txid.parse().map_err(Error::conversion)?)))
1003            .collect::<Result<HashMap<_, _>, Error>>()?;
1004
1005        Ok(Self {
1006            id: value.id,
1007            topic: value.topic,
1008            batch_tree_event_type,
1009            tx_graph_chunk: TxGraphChunk { txid, tx, children },
1010        })
1011    }
1012}
1013
1014impl TryFrom<generated::ark::v1::TreeSignatureEvent> for TreeSignatureEvent {
1015    type Error = Error;
1016
1017    fn try_from(value: generated::ark::v1::TreeSignatureEvent) -> Result<Self, Self::Error> {
1018        let batch_tree_event_type = match value.batch_index {
1019            0 => BatchTreeEventType::Vtxo,
1020            1 => BatchTreeEventType::Connector,
1021            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
1022        };
1023
1024        let txid = value.txid.parse().map_err(Error::conversion)?;
1025
1026        let signature = Vec::from_hex(&value.signature).map_err(Error::conversion)?;
1027        let signature = Signature::from_slice(&signature).map_err(Error::conversion)?;
1028
1029        Ok(Self {
1030            id: value.id,
1031            topic: value.topic,
1032            batch_tree_event_type,
1033            txid,
1034            signature,
1035        })
1036    }
1037}
1038
1039impl TryFrom<generated::ark::v1::TreeNoncesEvent> for TreeNoncesEvent {
1040    type Error = Error;
1041
1042    fn try_from(value: generated::ark::v1::TreeNoncesEvent) -> Result<Self, Self::Error> {
1043        let txid = value.txid.parse().map_err(Error::conversion)?;
1044
1045        let nonces = TreeTxNoncePks::decode(value.nonces).map_err(Error::conversion)?;
1046
1047        Ok(Self {
1048            id: value.id,
1049            topic: value.topic,
1050            txid,
1051            nonces,
1052        })
1053    }
1054}
1055
1056impl TryFrom<generated::ark::v1::get_event_stream_response::Event> for StreamEvent {
1057    type Error = Error;
1058
1059    fn try_from(
1060        value: generated::ark::v1::get_event_stream_response::Event,
1061    ) -> Result<Self, Self::Error> {
1062        Ok(match value {
1063            generated::ark::v1::get_event_stream_response::Event::StreamStarted(e) => {
1064                StreamEvent::StreamStarted(e.try_into()?)
1065            }
1066            generated::ark::v1::get_event_stream_response::Event::BatchStarted(e) => {
1067                StreamEvent::BatchStarted(e.try_into()?)
1068            }
1069            generated::ark::v1::get_event_stream_response::Event::BatchFinalization(e) => {
1070                StreamEvent::BatchFinalization(e.try_into()?)
1071            }
1072            generated::ark::v1::get_event_stream_response::Event::BatchFinalized(e) => {
1073                StreamEvent::BatchFinalized(e.try_into()?)
1074            }
1075            generated::ark::v1::get_event_stream_response::Event::BatchFailed(e) => {
1076                StreamEvent::BatchFailed(e.into())
1077            }
1078            generated::ark::v1::get_event_stream_response::Event::TreeSigningStarted(e) => {
1079                StreamEvent::TreeSigningStarted(e.try_into()?)
1080            }
1081            generated::ark::v1::get_event_stream_response::Event::TreeNoncesAggregated(e) => {
1082                StreamEvent::TreeNoncesAggregated(e.try_into()?)
1083            }
1084            generated::ark::v1::get_event_stream_response::Event::TreeTx(e) => {
1085                StreamEvent::TreeTx(e.try_into()?)
1086            }
1087            generated::ark::v1::get_event_stream_response::Event::TreeSignature(e) => {
1088                StreamEvent::TreeSignature(e.try_into()?)
1089            }
1090            generated::ark::v1::get_event_stream_response::Event::TreeNonces(e) => {
1091                StreamEvent::TreeNonces(e.try_into()?)
1092            }
1093            generated::ark::v1::get_event_stream_response::Event::Heartbeat(_) => {
1094                StreamEvent::Heartbeat
1095            }
1096        })
1097    }
1098}
1099
1100impl TryFrom<generated::ark::v1::get_transactions_stream_response::Data> for StreamTransactionData {
1101    type Error = Error;
1102
1103    fn try_from(
1104        value: generated::ark::v1::get_transactions_stream_response::Data,
1105    ) -> Result<Self, Self::Error> {
1106        match value {
1107            generated::ark::v1::get_transactions_stream_response::Data::CommitmentTx(
1108                commitment_tx,
1109            ) => Ok(StreamTransactionData::Commitment(
1110                CommitmentTransaction::try_from(commitment_tx)?,
1111            )),
1112            generated::ark::v1::get_transactions_stream_response::Data::ArkTx(redeem) => Ok(
1113                StreamTransactionData::Ark(ArkTransaction::try_from(redeem)?),
1114            ),
1115            generated::ark::v1::get_transactions_stream_response::Data::Heartbeat(_) => {
1116                Ok(StreamTransactionData::Heartbeat)
1117            }
1118            generated::ark::v1::get_transactions_stream_response::Data::SweepTx(tx) => {
1119                Ok(StreamTransactionData::Ark(ArkTransaction::try_from(tx)?))
1120            }
1121        }
1122    }
1123}
1124
1125impl TryFrom<generated::ark::v1::TxNotification> for CommitmentTransaction {
1126    type Error = Error;
1127
1128    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
1129        let spent_vtxos = value
1130            .spent_vtxos
1131            .iter()
1132            .map(VirtualTxOutPoint::try_from)
1133            .collect::<Result<Vec<_>, _>>()?;
1134
1135        let spendable_vtxos = value
1136            .spendable_vtxos
1137            .iter()
1138            .map(VirtualTxOutPoint::try_from)
1139            .collect::<Result<Vec<_>, _>>()?;
1140
1141        Ok(CommitmentTransaction {
1142            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1143            spent_vtxos,
1144            unspent_vtxos: spendable_vtxos,
1145        })
1146    }
1147}
1148
1149impl TryFrom<generated::ark::v1::TxNotification> for ArkTransaction {
1150    type Error = Error;
1151
1152    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
1153        let spent_vtxos = value
1154            .spent_vtxos
1155            .iter()
1156            .map(VirtualTxOutPoint::try_from)
1157            .collect::<Result<Vec<_>, _>>()?;
1158
1159        let spendable_vtxos = value
1160            .spendable_vtxos
1161            .iter()
1162            .map(VirtualTxOutPoint::try_from)
1163            .collect::<Result<Vec<_>, _>>()?;
1164
1165        let tx = if value.tx.is_empty() {
1166            None
1167        } else {
1168            let base64 = base64::engine::GeneralPurpose::new(
1169                &base64::alphabet::STANDARD,
1170                base64::engine::GeneralPurposeConfig::new(),
1171            );
1172            let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
1173            Some(Psbt::deserialize(&bytes).map_err(Error::conversion)?)
1174        };
1175
1176        let checkpoint_txs = value
1177            .checkpoint_txs
1178            .into_iter()
1179            .map(|(k, v)| {
1180                let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1181                let txid = v.txid.parse().map_err(Error::conversion)?;
1182                Ok((out_point, txid))
1183            })
1184            .collect::<Result<HashMap<_, _>, Error>>()?;
1185
1186        let swept_vtxos = value
1187            .swept_vtxos
1188            .into_iter()
1189            .map(OutPoint::try_from)
1190            .collect::<Result<Vec<_>, _>>()?;
1191
1192        Ok(ArkTransaction {
1193            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1194            tx,
1195            spent_vtxos,
1196            unspent_vtxos: spendable_vtxos,
1197            checkpoint_txs,
1198            swept_vtxos,
1199        })
1200    }
1201}
1202
1203impl TryFrom<Outpoint> for OutPoint {
1204    type Error = Error;
1205
1206    fn try_from(value: Outpoint) -> Result<Self, Self::Error> {
1207        let point = OutPoint {
1208            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1209            vout: value.vout,
1210        };
1211        Ok(point)
1212    }
1213}
1214
1215pub struct VtxoChainResponse {
1216    pub chains: VtxoChains,
1217    pub page: Option<IndexerPage>,
1218}
1219
1220pub struct ListVtxosResponse {
1221    pub vtxos: Vec<VirtualTxOutPoint>,
1222    pub page: Option<IndexerPage>,
1223}
1224
1225impl TryFrom<generated::ark::v1::GetVtxoChainResponse> for VtxoChainResponse {
1226    type Error = Error;
1227
1228    fn try_from(value: generated::ark::v1::GetVtxoChainResponse) -> Result<Self, Self::Error> {
1229        let chains = value
1230            .chain
1231            .iter()
1232            .map(VtxoChain::try_from)
1233            .collect::<Result<Vec<_>, Error>>()?;
1234
1235        Ok(VtxoChainResponse {
1236            chains: VtxoChains { inner: chains },
1237            page: value
1238                .page
1239                .map(IndexerPage::try_from)
1240                .transpose()
1241                .map_err(Error::conversion)?,
1242        })
1243    }
1244}
1245
1246impl TryFrom<generated::ark::v1::GetVirtualTxsResponse> for VirtualTxsResponse {
1247    type Error = Error;
1248
1249    fn try_from(value: generated::ark::v1::GetVirtualTxsResponse) -> Result<Self, Self::Error> {
1250        let base64 = &base64::engine::GeneralPurpose::new(
1251            &base64::alphabet::STANDARD,
1252            base64::engine::GeneralPurposeConfig::new(),
1253        );
1254
1255        let txs = value
1256            .txs
1257            .into_iter()
1258            .map(|tx| {
1259                let bytes = base64.decode(&tx).map_err(Error::conversion)?;
1260                let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
1261
1262                Ok(psbt)
1263            })
1264            .collect::<Result<Vec<_>, _>>()?;
1265
1266        Ok(VirtualTxsResponse {
1267            txs,
1268            page: value
1269                .page
1270                .map(IndexerPage::try_from)
1271                .transpose()
1272                .map_err(Error::conversion)?,
1273        })
1274    }
1275}
1276
1277impl TryFrom<&generated::ark::v1::IndexerChain> for VtxoChain {
1278    type Error = Error;
1279
1280    fn try_from(value: &generated::ark::v1::IndexerChain) -> Result<Self, Self::Error> {
1281        let spends = value
1282            .spends
1283            .iter()
1284            .map(|txid| {
1285                // Handle the case where txid might be 66 bytes long by trimming the last 2 bytes.
1286                let txid_str = if txid.len() == 66 { &txid[..64] } else { txid };
1287                txid_str.parse().map_err(Error::conversion)
1288            })
1289            .collect::<Result<Vec<_>, Error>>()?;
1290
1291        let tx_type = match value.r#type() {
1292            IndexerChainedTxType::Unspecified => ChainedTxType::Unspecified,
1293            IndexerChainedTxType::Commitment => ChainedTxType::Commitment,
1294            IndexerChainedTxType::Ark => ChainedTxType::Ark,
1295            IndexerChainedTxType::Tree => ChainedTxType::Tree,
1296            IndexerChainedTxType::Checkpoint => ChainedTxType::Checkpoint,
1297        };
1298
1299        Ok(VtxoChain {
1300            txid: value.txid.parse().map_err(Error::conversion)?,
1301            tx_type,
1302            spends,
1303            expires_at: value.expires_at,
1304        })
1305    }
1306}
1307
1308impl From<generated::ark::v1::IndexerPageResponse> for IndexerPage {
1309    fn from(value: generated::ark::v1::IndexerPageResponse) -> Self {
1310        IndexerPage {
1311            current: value.current,
1312            next: value.next,
1313            total: value.total,
1314        }
1315    }
1316}
1317
1318impl TryFrom<&generated::ark::v1::IndexerTxHistoryRecord> for history::Transaction {
1319    type Error = Error;
1320
1321    fn try_from(value: &generated::ark::v1::IndexerTxHistoryRecord) -> Result<Self, Self::Error> {
1322        let sign = match value.r#type() {
1323            generated::ark::v1::IndexerTxType::Received => 1,
1324            // Default to sent if unspecified.
1325            generated::ark::v1::IndexerTxType::Sent
1326            | generated::ark::v1::IndexerTxType::Unspecified => -1,
1327        };
1328
1329        let amount = SignedAmount::from_sat(value.amount as i64 * sign);
1330
1331        let tx = match &value.key {
1332            Some(Key::CommitmentTxid(txid)) => history::Transaction::Commitment {
1333                txid: txid.parse().map_err(Error::conversion)?,
1334                amount,
1335                created_at: value.created_at,
1336            },
1337            Some(Key::VirtualTxid(txid)) => history::Transaction::Ark {
1338                txid: txid.parse().map_err(Error::conversion)?,
1339                amount,
1340                is_settled: value.is_settled,
1341                created_at: value.created_at,
1342            },
1343            None => return Err(Error::conversion("invalid transaction without key")),
1344        };
1345
1346        Ok(tx)
1347    }
1348}
1349
1350impl TryFrom<generated::ark::v1::GetSubscriptionResponse> for SubscriptionResponse {
1351    type Error = Error;
1352
1353    fn try_from(value: generated::ark::v1::GetSubscriptionResponse) -> Result<Self, Self::Error> {
1354        let value = match value.data {
1355            Some(get_subscription_response::Data::Heartbeat(_)) => return Ok(Self::Heartbeat),
1356            Some(get_subscription_response::Data::Event(event)) => event,
1357            None => return Err(Error::conversion("empty subscription response")),
1358        };
1359
1360        let txid = value.txid.parse().map_err(Error::conversion)?;
1361
1362        let new_vtxos = value
1363            .new_vtxos
1364            .iter()
1365            .map(VirtualTxOutPoint::try_from)
1366            .collect::<Result<Vec<_>, _>>()?;
1367
1368        let spent_vtxos = value
1369            .spent_vtxos
1370            .iter()
1371            .map(VirtualTxOutPoint::try_from)
1372            .collect::<Result<Vec<_>, _>>()?;
1373
1374        let tx = if value.tx.is_empty() {
1375            None
1376        } else {
1377            match Vec::from_hex(&value.tx)
1378                .ok()
1379                .and_then(|bytes| bitcoin::consensus::deserialize::<Transaction>(&bytes).ok())
1380            {
1381                Some(raw_tx) => Some(raw_tx),
1382                None => {
1383                    let base64 = base64::engine::GeneralPurpose::new(
1384                        &base64::alphabet::STANDARD,
1385                        base64::engine::GeneralPurposeConfig::new(),
1386                    );
1387                    let bytes = base64.decode(&value.tx).map_err(|e| {
1388                        let tx_prefix = value.tx.chars().take(24).collect::<String>();
1389                        Error::conversion(format!(
1390                            "invalid subscription tx payload (not hex tx or base64 psbt) (len={}, tx_prefix='{}'): {e}",
1391                            value.tx.len(),
1392                            tx_prefix
1393                        ))
1394                    })?;
1395                    let psbt = Psbt::deserialize(&bytes).map_err(|e| {
1396                        let tx_prefix = value.tx.chars().take(24).collect::<String>();
1397                        Error::conversion(format!(
1398                            "invalid subscription tx payload (base64 but not psbt) (len={}, tx_prefix='{}'): {e}",
1399                            value.tx.len(),
1400                            tx_prefix
1401                        ))
1402                    })?;
1403                    Some(psbt.unsigned_tx)
1404                }
1405            }
1406        };
1407
1408        let checkpoint_txs = value
1409            .checkpoint_txs
1410            .into_iter()
1411            .map(|(k, v)| {
1412                let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1413                let txid = v.txid.parse().map_err(Error::conversion)?;
1414                Ok((out_point, txid))
1415            })
1416            .collect::<Result<HashMap<_, _>, Error>>()?;
1417
1418        let scripts = value
1419            .scripts
1420            .iter()
1421            .map(|h| ScriptBuf::from_hex(h).map_err(Error::conversion))
1422            .collect::<Result<Vec<_>, _>>()?;
1423
1424        Ok(Self::Event(Box::new(SubscriptionEvent {
1425            txid,
1426            scripts,
1427            new_vtxos,
1428            spent_vtxos,
1429            tx,
1430            checkpoint_txs,
1431        })))
1432    }
1433}
1434
1435impl From<GetVtxosRequest> for generated::ark::v1::GetVtxosRequest {
1436    fn from(value: GetVtxosRequest) -> Self {
1437        let (spendable_only, spent_only, recoverable_only, pending_only) = match value.filter() {
1438            Some(GetVtxosRequestFilter::Spendable) => (true, false, false, false),
1439            Some(GetVtxosRequestFilter::Spent) => (false, true, false, false),
1440            Some(GetVtxosRequestFilter::Recoverable) => (false, false, true, false),
1441            Some(GetVtxosRequestFilter::PendingOnly) => (false, false, false, true),
1442            None => (false, false, false, false),
1443        };
1444
1445        let page = value
1446            .page()
1447            .map(|p| generated::ark::v1::IndexerPageRequest {
1448                size: p.size,
1449                index: p.index,
1450            });
1451
1452        match value.reference() {
1453            GetVtxosRequestReference::Scripts(script_bufs) => Self {
1454                scripts: script_bufs.iter().map(|s| s.to_hex_string()).collect(),
1455                outpoints: Vec::new(),
1456                spendable_only,
1457                spent_only,
1458                recoverable_only,
1459                page,
1460                pending_only,
1461                after: value.after().unwrap_or(0) as i64,
1462                before: value.before().unwrap_or(0) as i64,
1463            },
1464            GetVtxosRequestReference::OutPoints(outpoints) => Self {
1465                scripts: Vec::new(),
1466                outpoints: outpoints.iter().map(|o| o.to_string()).collect(),
1467                spendable_only,
1468                spent_only,
1469                recoverable_only,
1470                page,
1471                pending_only,
1472                after: value.after().unwrap_or(0) as i64,
1473                before: value.before().unwrap_or(0) as i64,
1474            },
1475        }
1476    }
1477}
1478
1479mod guarded {
1480    //! Guarded wrappers around the generated tonic clients.
1481    //!
1482    //! `Client` methods use these wrappers instead of storing or cloning generated clients
1483    //! directly. `request` is the only normal RPC escape hatch: it clones the generated
1484    //! client, runs the closure's future, and routes the result through `SharedState` so a
1485    //! digest mismatch refreshes `/info` and updates the shared digest/header state.
1486    //!
1487    //! Each `request` closure must perform exactly one non-`GetInfo` gRPC call and return
1488    //! that call's `tonic::Status` unchanged. Do not batch multiple RPCs, call `GetInfo`,
1489    //! swallow errors, or synthesize fallback successes inside the closure. Guarding is
1490    //! scoped to a single failed operation, and digest-mismatch refresh intentionally does
1491    //! not retry that operation.
1492
1493    use super::InterceptedChannel;
1494    use super::SharedState;
1495    use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
1496    use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
1497    use crate::Error;
1498    use ark_core::server::Info;
1499    use futures::Future;
1500
1501    #[derive(Clone)]
1502    pub(super) struct Ark {
1503        raw: ArkServiceClient<InterceptedChannel>,
1504        shared: SharedState,
1505    }
1506
1507    impl Ark {
1508        pub(super) fn new(raw: ArkServiceClient<InterceptedChannel>, shared: SharedState) -> Self {
1509            Self { raw, shared }
1510        }
1511
1512        /// Runs one Ark service RPC through the digest guard.
1513        ///
1514        /// The closure must perform exactly one non-`GetInfo` gRPC call and return that
1515        /// call's `tonic::Status` unchanged. Do not batch multiple RPCs, call `GetInfo`,
1516        /// swallow errors, or synthesize fallback successes inside the closure. A digest
1517        /// mismatch refreshes `/info` and returns `ServerInfoChanged`; the failed RPC is
1518        /// not retried automatically.
1519        pub(super) async fn request<T, F, Fut>(&self, f: F) -> Result<T, Error>
1520        where
1521            F: FnOnce(ArkServiceClient<InterceptedChannel>) -> Fut,
1522            Fut: Future<Output = Result<T, tonic::Status>>,
1523        {
1524            let client = self.raw.clone();
1525            let info_client = self.raw.clone();
1526
1527            self.shared
1528                .guarded(info_client, async move {
1529                    f(client).await.map_err(Error::request)
1530                })
1531                .await
1532        }
1533
1534        pub(super) async fn get_info(&self) -> Result<Info, Error> {
1535            self.shared.get_info_unguarded(self.raw.clone()).await
1536        }
1537    }
1538
1539    #[derive(Clone)]
1540    pub(super) struct Indexer {
1541        raw: IndexerServiceClient<InterceptedChannel>,
1542        info_client: ArkServiceClient<InterceptedChannel>,
1543        shared: SharedState,
1544    }
1545
1546    impl Indexer {
1547        pub(super) fn new(
1548            raw: IndexerServiceClient<InterceptedChannel>,
1549            info_client: ArkServiceClient<InterceptedChannel>,
1550            shared: SharedState,
1551        ) -> Self {
1552            Self {
1553                raw,
1554                info_client,
1555                shared,
1556            }
1557        }
1558
1559        /// Runs one Indexer service RPC through the digest guard.
1560        ///
1561        /// The closure must perform exactly one non-`GetInfo` gRPC call and return that
1562        /// call's `tonic::Status` unchanged. Do not batch multiple RPCs, call `GetInfo`,
1563        /// swallow errors, or synthesize fallback successes inside the closure. A digest
1564        /// mismatch refreshes `/info` and returns `ServerInfoChanged`; the failed RPC is
1565        /// not retried automatically.
1566        pub(super) async fn request<T, F, Fut>(&self, f: F) -> Result<T, Error>
1567        where
1568            F: FnOnce(IndexerServiceClient<InterceptedChannel>) -> Fut,
1569            Fut: Future<Output = Result<T, tonic::Status>>,
1570        {
1571            let client = self.raw.clone();
1572            let info_client = self.info_client.clone();
1573
1574            self.shared
1575                .guarded(info_client, async move {
1576                    f(client).await.map_err(Error::request)
1577                })
1578                .await
1579        }
1580    }
1581}