Skip to main content

ark_grpc/
client.rs

1use crate::generated;
2use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
3use crate::generated::ark::v1::get_subscription_response;
4use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
5use crate::generated::ark::v1::indexer_tx_history_record::Key;
6use crate::generated::ark::v1::ConfirmRegistrationRequest;
7use crate::generated::ark::v1::EstimateIntentFeeRequest;
8use crate::generated::ark::v1::GetEventStreamRequest;
9use crate::generated::ark::v1::GetInfoRequest;
10use crate::generated::ark::v1::GetSubscriptionRequest;
11use crate::generated::ark::v1::GetTransactionsStreamRequest;
12use crate::generated::ark::v1::IndexerChainedTxType;
13use crate::generated::ark::v1::Intent;
14use crate::generated::ark::v1::Outpoint;
15use crate::generated::ark::v1::RegisterIntentRequest;
16use crate::generated::ark::v1::SubmitSignedForfeitTxsRequest;
17use crate::generated::ark::v1::SubmitTreeNoncesRequest;
18use crate::generated::ark::v1::SubmitTreeSignaturesRequest;
19use crate::generated::ark::v1::SubscribeForScriptsRequest;
20use crate::generated::ark::v1::UnsubscribeForScriptsRequest;
21use crate::Error;
22use ark_core::asset::AssetId;
23use ark_core::history;
24use ark_core::server::parse_sequence_number;
25use ark_core::server::ArkTransaction;
26use ark_core::server::AssetInfo;
27use ark_core::server::BatchFailed;
28use ark_core::server::BatchFinalizationEvent;
29use ark_core::server::BatchFinalizedEvent;
30use ark_core::server::BatchStartedEvent;
31use ark_core::server::BatchTreeEventType;
32use ark_core::server::ChainedTxType;
33use ark_core::server::CommitmentTransaction;
34use ark_core::server::FinalizeOffchainTxResponse;
35use ark_core::server::GetVtxosRequest;
36use ark_core::server::GetVtxosRequestFilter;
37use ark_core::server::GetVtxosRequestReference;
38use ark_core::server::IndexerPage;
39use ark_core::server::Info;
40use ark_core::server::NoncePks;
41use ark_core::server::PartialSigTree;
42use ark_core::server::StreamEvent;
43use ark_core::server::StreamStartedEvent;
44use ark_core::server::StreamTransactionData;
45use ark_core::server::SubmitOffchainTxResponse;
46use ark_core::server::SubscriptionEvent;
47use ark_core::server::SubscriptionResponse;
48use ark_core::server::TreeNoncesAggregatedEvent;
49use ark_core::server::TreeNoncesEvent;
50use ark_core::server::TreeSignatureEvent;
51use ark_core::server::TreeSigningStartedEvent;
52use ark_core::server::TreeTxEvent;
53use ark_core::server::TreeTxNoncePks;
54use ark_core::server::VirtualTxOutPoint;
55use ark_core::server::VirtualTxsResponse;
56use ark_core::server::VtxoChain;
57use ark_core::server::VtxoChains;
58use ark_core::ArkAddress;
59use ark_core::TxGraphChunk;
60use async_stream::stream;
61use base64::Engine;
62use bitcoin::hex::FromHex;
63use bitcoin::secp256k1::PublicKey;
64use bitcoin::taproot::Signature;
65use bitcoin::OutPoint;
66use bitcoin::Psbt;
67use bitcoin::ScriptBuf;
68use bitcoin::SignedAmount;
69use bitcoin::Transaction;
70use bitcoin::Txid;
71use futures::Stream;
72use futures::StreamExt;
73use futures::TryStreamExt;
74use std::collections::HashMap;
75use std::fmt;
76use std::str::FromStr;
77
78#[derive(Clone, Copy)]
79struct VersionInterceptor;
80
81impl tonic::service::Interceptor for VersionInterceptor {
82    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
83        req.metadata_mut().insert(
84            "x-build-version",
85            tonic::metadata::MetadataValue::from_static(env!("CARGO_PKG_VERSION")),
86        );
87        Ok(req)
88    }
89}
90
91type InterceptedChannel =
92    tonic::codegen::InterceptedService<tonic::transport::Channel, VersionInterceptor>;
93
94#[derive(Clone)]
95pub struct Client {
96    url: String,
97    ark_client: Option<ArkServiceClient<InterceptedChannel>>,
98    indexer_client: Option<IndexerServiceClient<InterceptedChannel>>,
99}
100
101impl fmt::Debug for Client {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        f.debug_struct("Client")
104            .field("url", &self.url)
105            .field("connected", &self.ark_client.is_some())
106            .finish()
107    }
108}
109
110impl Client {
111    pub fn new(url: String) -> Self {
112        Self {
113            url,
114            ark_client: None,
115            indexer_client: None,
116        }
117    }
118
119    pub async fn connect(&mut self) -> Result<(), Error> {
120        let endpoint =
121            tonic::transport::Endpoint::from_shared(self.url.clone()).map_err(Error::connect)?;
122
123        #[cfg(any(feature = "tls-webpki-roots", feature = "tls-native-roots"))]
124        let endpoint = {
125            let tls = tonic::transport::ClientTlsConfig::new();
126            #[cfg(feature = "tls-webpki-roots")]
127            let tls = tls.with_webpki_roots();
128            #[cfg(feature = "tls-native-roots")]
129            let tls = tls.with_native_roots();
130            endpoint.tls_config(tls).map_err(Error::connect)?
131        };
132
133        let channel = endpoint.connect().await.map_err(Error::connect)?;
134
135        let ark_service_client =
136            ArkServiceClient::with_interceptor(channel.clone(), VersionInterceptor);
137        let indexer_client = IndexerServiceClient::with_interceptor(channel, VersionInterceptor);
138
139        self.ark_client = Some(ark_service_client);
140        self.indexer_client = Some(indexer_client);
141        Ok(())
142    }
143
144    pub async fn get_info(&mut self) -> Result<Info, Error> {
145        let mut client = self.ark_client()?;
146
147        let response = client
148            .get_info(GetInfoRequest {})
149            .await
150            .map_err(Error::request)?;
151
152        response.into_inner().try_into()
153    }
154
155    /// List VTXOs with pagination support.
156    /// Returns a single page of results along with pagination info.
157    pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
158        if request.reference().is_empty() {
159            return Ok(ListVtxosResponse {
160                vtxos: Vec::new(),
161                page: None,
162            });
163        }
164
165        let mut client = self.indexer_client()?;
166
167        let response = client
168            .get_vtxos(generated::ark::v1::GetVtxosRequest::from(request))
169            .await
170            .map_err(Error::request)?;
171
172        let inner = response.into_inner();
173
174        let vtxos = inner
175            .vtxos
176            .iter()
177            .map(VirtualTxOutPoint::try_from)
178            .collect::<Result<Vec<_>, _>>()?;
179
180        let page = inner
181            .page
182            .map(IndexerPage::try_from)
183            .transpose()
184            .map_err(Error::conversion)?;
185
186        Ok(ListVtxosResponse { vtxos, page })
187    }
188
189    pub async fn register_intent(&self, intent: ark_core::intent::Intent) -> Result<String, Error> {
190        let mut client = self.ark_client()?;
191
192        let intent = intent.try_into()?;
193        let request = RegisterIntentRequest {
194            intent: Some(intent),
195        };
196
197        let response = client
198            .register_intent(request)
199            .await
200            .map_err(Error::request)?;
201
202        let intent_id = response.into_inner().intent_id;
203
204        Ok(intent_id)
205    }
206
207    pub async fn submit_offchain_transaction_request(
208        &self,
209        ark_tx: Psbt,
210        checkpoint_txs: Vec<Psbt>,
211    ) -> Result<SubmitOffchainTxResponse, Error> {
212        let mut client = self.ark_client()?;
213
214        let base64 = base64::engine::GeneralPurpose::new(
215            &base64::alphabet::STANDARD,
216            base64::engine::GeneralPurposeConfig::new(),
217        );
218
219        let ark_tx = base64.encode(ark_tx.serialize());
220
221        let checkpoint_txs = checkpoint_txs
222            .into_iter()
223            .map(|tx| base64.encode(tx.serialize()))
224            .collect();
225
226        let res = client
227            .submit_tx(generated::ark::v1::SubmitTxRequest {
228                signed_ark_tx: ark_tx,
229                checkpoint_txs,
230            })
231            .await
232            .map_err(Error::request)?;
233
234        let res = res.into_inner();
235
236        let signed_ark_tx = res.final_ark_tx;
237        let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
238        let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
239
240        let signed_checkpoint_txs = res
241            .signed_checkpoint_txs
242            .into_iter()
243            .map(|tx| {
244                let tx = base64.decode(tx).map_err(Error::conversion)?;
245                let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
246
247                Ok(tx)
248            })
249            .collect::<Result<Vec<_>, Error>>()?;
250
251        Ok(SubmitOffchainTxResponse {
252            signed_ark_tx,
253            signed_checkpoint_txs,
254        })
255    }
256
257    pub async fn finalize_offchain_transaction(
258        &self,
259        txid: Txid,
260        checkpoint_txs: Vec<Psbt>,
261    ) -> Result<FinalizeOffchainTxResponse, Error> {
262        let mut client = self.ark_client()?;
263
264        let base64 = base64::engine::GeneralPurpose::new(
265            &base64::alphabet::STANDARD,
266            base64::engine::GeneralPurposeConfig::new(),
267        );
268
269        let checkpoint_txs = checkpoint_txs
270            .into_iter()
271            .map(|tx| base64.encode(tx.serialize()))
272            .collect();
273
274        client
275            .finalize_tx(generated::ark::v1::FinalizeTxRequest {
276                ark_txid: txid.to_string(),
277                final_checkpoint_txs: checkpoint_txs,
278            })
279            .await
280            .map_err(Error::request)?;
281
282        Ok(FinalizeOffchainTxResponse {})
283    }
284
285    pub async fn get_pending_tx(
286        &self,
287        intent: ark_core::intent::Intent,
288    ) -> Result<Vec<ark_core::server::PendingTx>, Error> {
289        let mut client = self.ark_client()?;
290
291        let intent: Intent = intent.try_into()?;
292
293        let res = client
294            .get_pending_tx(generated::ark::v1::GetPendingTxRequest {
295                identifier: Some(
296                    generated::ark::v1::get_pending_tx_request::Identifier::Intent(intent),
297                ),
298            })
299            .await
300            .map_err(Error::request)?;
301
302        let inner = res.into_inner();
303        let base64 = base64::engine::GeneralPurpose::new(
304            &base64::alphabet::STANDARD,
305            base64::engine::GeneralPurposeConfig::new(),
306        );
307
308        inner
309            .pending_txs
310            .into_iter()
311            .map(|tx| {
312                let ark_txid = tx.ark_txid.parse().map_err(Error::conversion)?;
313
314                let signed_ark_tx = base64.decode(&tx.final_ark_tx).map_err(Error::conversion)?;
315                let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
316
317                let signed_checkpoint_txs = tx
318                    .signed_checkpoint_txs
319                    .into_iter()
320                    .map(|cp| {
321                        let bytes = base64.decode(cp).map_err(Error::conversion)?;
322                        Psbt::deserialize(&bytes).map_err(Error::conversion)
323                    })
324                    .collect::<Result<Vec<_>, Error>>()?;
325
326                Ok(ark_core::server::PendingTx {
327                    ark_txid,
328                    signed_ark_tx,
329                    signed_checkpoint_txs,
330                })
331            })
332            .collect()
333    }
334
335    pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
336        let mut client = self.ark_client()?;
337
338        client
339            .confirm_registration(ConfirmRegistrationRequest { intent_id })
340            .await
341            .map_err(Error::request)?;
342
343        Ok(())
344    }
345
346    pub async fn submit_tree_nonces(
347        &self,
348        batch_id: &str,
349        cosigner_pubkey: PublicKey,
350        pub_nonce_tree: NoncePks,
351    ) -> Result<(), Error> {
352        let mut client = self.ark_client()?;
353
354        client
355            .submit_tree_nonces(SubmitTreeNoncesRequest {
356                batch_id: batch_id.to_string(),
357                pubkey: cosigner_pubkey.to_string(),
358                tree_nonces: pub_nonce_tree.encode(),
359            })
360            .await
361            .map_err(Error::request)?;
362
363        Ok(())
364    }
365
366    pub async fn submit_tree_signatures(
367        &self,
368        batch_id: &str,
369        cosigner_pk: PublicKey,
370        partial_sig_tree: PartialSigTree,
371    ) -> Result<(), Error> {
372        let mut client = self.ark_client()?;
373
374        client
375            .submit_tree_signatures(SubmitTreeSignaturesRequest {
376                batch_id: batch_id.to_string(),
377                pubkey: cosigner_pk.to_string(),
378                tree_signatures: partial_sig_tree.encode(),
379            })
380            .await
381            .map_err(Error::request)?;
382
383        Ok(())
384    }
385
386    pub async fn submit_signed_forfeit_txs(
387        &self,
388        signed_forfeit_txs: Vec<Psbt>,
389        signed_commitment_tx: Option<Psbt>,
390    ) -> Result<(), Error> {
391        let mut client = self.ark_client()?;
392
393        let base64 = base64::engine::GeneralPurpose::new(
394            &base64::alphabet::STANDARD,
395            base64::engine::GeneralPurposeConfig::new(),
396        );
397
398        let signed_commitment_tx = signed_commitment_tx
399            .map(|tx| base64.encode(tx.serialize()))
400            .unwrap_or_default();
401
402        client
403            .submit_signed_forfeit_txs(SubmitSignedForfeitTxsRequest {
404                signed_forfeit_txs: signed_forfeit_txs
405                    .iter()
406                    .map(|psbt| base64.encode(psbt.serialize()))
407                    .collect(),
408                signed_commitment_tx,
409            })
410            .await
411            .map_err(Error::request)?;
412
413        Ok(())
414    }
415
416    pub async fn get_event_stream(
417        &self,
418        topics: Vec<String>,
419    ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
420        let mut client = self.ark_client()?;
421
422        let response = client
423            .get_event_stream(GetEventStreamRequest { topics })
424            .await
425            .map_err(Error::request)?;
426        let mut stream = response.into_inner();
427
428        let stream = stream! {
429            loop {
430                match stream.try_next().await {
431                    Ok(Some(event)) => match event.event {
432                        None => {
433                            log::debug!("Got empty message");
434                        }
435                        Some(event) => {
436                            yield Ok(StreamEvent::try_from(event)?);
437                        }
438                    },
439                    Ok(None) => {
440                        yield Err(Error::event_stream_disconnect());
441                    }
442                    Err(e) => {
443                        yield Err(Error::event_stream(e));
444                    }
445                }
446            }
447        };
448
449        Ok(stream.boxed())
450    }
451
452    pub async fn get_tx_stream(
453        &self,
454    ) -> Result<impl Stream<Item = Result<StreamTransactionData, Error>> + Unpin, Error> {
455        let mut client = self.ark_client()?;
456
457        let response = client
458            .get_transactions_stream(GetTransactionsStreamRequest {})
459            .await
460            .map_err(Error::request)?;
461
462        let mut stream = response.into_inner();
463
464        let stream = stream! {
465            loop {
466                match stream.try_next().await {
467                    Ok(Some(event)) => match event.data {
468                        None => {
469                            log::debug!("Got empty message");
470                        }
471                        Some(event) => {
472                            yield Ok(StreamTransactionData::try_from(event)?);
473                        }
474                    },
475                    Ok(None) => {
476                        yield Err(Error::event_stream_disconnect());
477                    }
478                    Err(e) => {
479                        yield Err(Error::event_stream(e));
480                    }
481                }
482            }
483        };
484
485        Ok(stream.boxed())
486    }
487
488    pub async fn get_vtxo_chain(
489        &self,
490        outpoint: Option<OutPoint>,
491        size_and_index: Option<(i32, i32)>,
492    ) -> Result<VtxoChainResponse, Error> {
493        let mut client = self.indexer_client()?;
494        let response = client
495            .get_vtxo_chain(generated::ark::v1::GetVtxoChainRequest {
496                outpoint: outpoint.map(|o| generated::ark::v1::IndexerOutpoint {
497                    txid: o.txid.to_string(),
498                    vout: o.vout,
499                }),
500                page: size_and_index
501                    .map(|(size, index)| generated::ark::v1::IndexerPageRequest { size, index }),
502            })
503            .await
504            .map_err(Error::request)?;
505        let response = response.into_inner();
506        let result = response.try_into()?;
507        Ok(result)
508    }
509
510    pub async fn get_virtual_txs(
511        &self,
512        txids: Vec<String>,
513        size_and_index: Option<(i32, i32)>,
514    ) -> Result<VirtualTxsResponse, Error> {
515        let mut client = self.indexer_client()?;
516        let response = client
517            .get_virtual_txs(generated::ark::v1::GetVirtualTxsRequest {
518                txids,
519                page: size_and_index
520                    .map(|(size, index)| generated::ark::v1::IndexerPageRequest { size, index }),
521            })
522            .await
523            .map_err(Error::request)?;
524        let response = response.into_inner();
525        let result = response.try_into()?;
526        Ok(result)
527    }
528
529    /// Allows to subscribe for tx notifications related to the provided
530    /// vtxo scripts.
531    ///
532    /// It can also be used to update an existing subscriptions by adding
533    /// new scripts to it.
534    ///
535    /// Note: for new subscriptions, don't provide a `subscription_id`
536    ///
537    /// Returns the subscription id if successful
538    pub async fn subscribe_to_scripts(
539        &self,
540        scripts: Vec<ArkAddress>,
541        subscription_id: Option<String>,
542    ) -> Result<String, Error> {
543        let mut client = self.indexer_client()?;
544        let scripts = scripts
545            .iter()
546            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
547            .collect::<Vec<_>>();
548
549        // For new subscription we expect empty string ("") here
550        let subscription_id = subscription_id.unwrap_or_default();
551
552        let response = client
553            .subscribe_for_scripts(SubscribeForScriptsRequest {
554                scripts,
555                subscription_id,
556            })
557            .await
558            .map_err(Error::request)?;
559
560        let response = response.into_inner();
561
562        Ok(response.subscription_id)
563    }
564
565    /// Allows to remove scripts from an existing subscription.
566    pub async fn unsubscribe_from_scripts(
567        &self,
568        scripts: Vec<ArkAddress>,
569        subscription_id: String,
570    ) -> Result<(), Error> {
571        let mut client = self.indexer_client()?;
572        let scripts = scripts
573            .iter()
574            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
575            .collect::<Vec<_>>();
576
577        let _ = client
578            .unsubscribe_for_scripts(UnsubscribeForScriptsRequest {
579                subscription_id,
580                scripts,
581            })
582            .await
583            .map_err(Error::request)?;
584
585        Ok(())
586    }
587
588    /// Gets a subscription stream that returns subscription responses.
589    pub async fn get_subscription(
590        &self,
591        subscription_id: String,
592    ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
593        let mut client = self.indexer_client()?;
594
595        let response = client
596            .get_subscription(GetSubscriptionRequest { subscription_id })
597            .await
598            .map_err(Error::request)?;
599
600        let mut stream = response.into_inner();
601
602        let stream = stream! {
603            loop {
604                match stream.try_next().await {
605                    Ok(Some(response)) => {
606                        match SubscriptionResponse::try_from(response) {
607                            Ok(subscription_response) => {
608                                yield Ok(subscription_response);
609                            }
610                            Err(e) => {
611                                yield Err(e);
612                            }
613                        }
614                    }
615                    Ok(None) => {
616                        break;
617                    }
618                    Err(e) => {
619                        yield Err(Error::event_stream(e));
620                    }
621                }
622            }
623        };
624
625        Ok(stream.boxed())
626    }
627
628    pub async fn estimate_fees(
629        &self,
630        intent: ark_core::intent::Intent,
631    ) -> Result<SignedAmount, Error> {
632        let mut client = self.ark_client()?;
633
634        let intent = intent.try_into()?;
635        let response = client
636            .estimate_intent_fee(EstimateIntentFeeRequest {
637                intent: Some(intent),
638            })
639            .await
640            .map_err(Error::request)?;
641        let response = response.into_inner();
642
643        Ok(SignedAmount::from_sat(response.fee))
644    }
645
646    pub async fn get_asset(&self, asset_id: AssetId) -> Result<AssetInfo, Error> {
647        let mut client = self.indexer_client()?;
648
649        let response = client
650            .get_asset(generated::ark::v1::GetAssetRequest {
651                asset_id: asset_id.to_string(),
652            })
653            .await
654            .map_err(Error::request)?;
655
656        let inner = response.into_inner();
657
658        let supply = inner.supply.parse::<u64>().map_err(Error::conversion)?;
659
660        let asset_id = inner.asset_id.parse().map_err(Error::conversion)?;
661        let control_asset_id = if inner.control_asset.is_empty() {
662            None
663        } else {
664            Some(inner.control_asset.parse().map_err(Error::conversion)?)
665        };
666
667        Ok(AssetInfo {
668            asset_id,
669            control_asset_id,
670            supply,
671            metadata: inner.metadata,
672        })
673    }
674
675    fn ark_client(&self) -> Result<ArkServiceClient<InterceptedChannel>, Error> {
676        // Cloning an `ArkServiceClient<Channel>` is cheap.
677        self.ark_client.clone().ok_or(Error::not_connected())
678    }
679    fn indexer_client(&self) -> Result<IndexerServiceClient<InterceptedChannel>, Error> {
680        self.indexer_client.clone().ok_or(Error::not_connected())
681    }
682}
683
684impl TryFrom<ark_core::intent::Intent> for Intent {
685    type Error = Error;
686
687    fn try_from(value: ark_core::intent::Intent) -> Result<Self, Self::Error> {
688        Ok(Self {
689            proof: value.serialize_proof(),
690            message: value.serialize_message().map_err(Error::conversion)?,
691        })
692    }
693}
694
695impl TryFrom<generated::ark::v1::BatchStartedEvent> for BatchStartedEvent {
696    type Error = Error;
697
698    fn try_from(value: generated::ark::v1::BatchStartedEvent) -> Result<Self, Self::Error> {
699        let batch_expiry = parse_sequence_number(value.batch_expiry).map_err(Error::conversion)?;
700
701        Ok(BatchStartedEvent {
702            id: value.id,
703            intent_id_hashes: value.intent_id_hashes,
704            batch_expiry,
705        })
706    }
707}
708
709impl TryFrom<generated::ark::v1::StreamStartedEvent> for StreamStartedEvent {
710    type Error = Error;
711
712    fn try_from(value: generated::ark::v1::StreamStartedEvent) -> Result<Self, Self::Error> {
713        Ok(StreamStartedEvent { id: value.id })
714    }
715}
716
717impl TryFrom<generated::ark::v1::BatchFinalizationEvent> for BatchFinalizationEvent {
718    type Error = Error;
719
720    fn try_from(value: generated::ark::v1::BatchFinalizationEvent) -> Result<Self, Self::Error> {
721        let base64 = &base64::engine::GeneralPurpose::new(
722            &base64::alphabet::STANDARD,
723            base64::engine::GeneralPurposeConfig::new(),
724        );
725
726        let commitment_tx = base64
727            .decode(&value.commitment_tx)
728            .map_err(Error::conversion)?;
729        let commitment_tx = Psbt::deserialize(&commitment_tx).map_err(Error::conversion)?;
730
731        Ok(BatchFinalizationEvent {
732            id: value.id,
733            commitment_tx,
734        })
735    }
736}
737
738impl TryFrom<generated::ark::v1::BatchFinalizedEvent> for BatchFinalizedEvent {
739    type Error = Error;
740
741    fn try_from(value: generated::ark::v1::BatchFinalizedEvent) -> Result<Self, Self::Error> {
742        let commitment_txid = value.commitment_txid.parse().map_err(Error::conversion)?;
743
744        Ok(BatchFinalizedEvent {
745            id: value.id,
746            commitment_txid,
747        })
748    }
749}
750
751impl From<generated::ark::v1::BatchFailedEvent> for BatchFailed {
752    fn from(value: generated::ark::v1::BatchFailedEvent) -> Self {
753        BatchFailed {
754            id: value.id,
755            reason: value.reason,
756        }
757    }
758}
759
760impl TryFrom<generated::ark::v1::TreeSigningStartedEvent> for TreeSigningStartedEvent {
761    type Error = Error;
762
763    fn try_from(value: generated::ark::v1::TreeSigningStartedEvent) -> Result<Self, Self::Error> {
764        let unsigned_commitment_tx = base64::engine::GeneralPurpose::new(
765            &base64::alphabet::STANDARD,
766            base64::engine::GeneralPurposeConfig::new(),
767        )
768        .decode(&value.unsigned_commitment_tx)
769        .map_err(Error::conversion)?;
770
771        let unsigned_commitment_tx =
772            Psbt::deserialize(&unsigned_commitment_tx).map_err(Error::conversion)?;
773
774        Ok(TreeSigningStartedEvent {
775            id: value.id,
776            cosigners_pubkeys: value
777                .cosigners_pubkeys
778                .into_iter()
779                .map(|pk| pk.parse().map_err(Error::conversion))
780                .collect::<Result<Vec<_>, Error>>()?,
781            unsigned_commitment_tx,
782        })
783    }
784}
785
786impl TryFrom<generated::ark::v1::TreeNoncesAggregatedEvent> for TreeNoncesAggregatedEvent {
787    type Error = Error;
788
789    fn try_from(value: generated::ark::v1::TreeNoncesAggregatedEvent) -> Result<Self, Self::Error> {
790        let tree_nonces = NoncePks::decode(value.tree_nonces).map_err(Error::conversion)?;
791
792        Ok(TreeNoncesAggregatedEvent {
793            id: value.id,
794            tree_nonces,
795        })
796    }
797}
798
799impl TryFrom<generated::ark::v1::TreeTxEvent> for TreeTxEvent {
800    type Error = Error;
801
802    fn try_from(value: generated::ark::v1::TreeTxEvent) -> Result<Self, Self::Error> {
803        let batch_tree_event_type = match value.batch_index {
804            0 => BatchTreeEventType::Vtxo,
805            1 => BatchTreeEventType::Connector,
806            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
807        };
808
809        let txid = if value.txid.is_empty() {
810            None
811        } else {
812            Some(value.txid.parse().map_err(Error::conversion)?)
813        };
814
815        let base64 = &base64::engine::GeneralPurpose::new(
816            &base64::alphabet::STANDARD,
817            base64::engine::GeneralPurposeConfig::new(),
818        );
819
820        let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
821        let tx = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
822
823        let children = value
824            .children
825            .iter()
826            .map(|(index, txid)| Ok((*index, txid.parse().map_err(Error::conversion)?)))
827            .collect::<Result<HashMap<_, _>, Error>>()?;
828
829        Ok(Self {
830            id: value.id,
831            topic: value.topic,
832            batch_tree_event_type,
833            tx_graph_chunk: TxGraphChunk { txid, tx, children },
834        })
835    }
836}
837
838impl TryFrom<generated::ark::v1::TreeSignatureEvent> for TreeSignatureEvent {
839    type Error = Error;
840
841    fn try_from(value: generated::ark::v1::TreeSignatureEvent) -> Result<Self, Self::Error> {
842        let batch_tree_event_type = match value.batch_index {
843            0 => BatchTreeEventType::Vtxo,
844            1 => BatchTreeEventType::Connector,
845            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
846        };
847
848        let txid = value.txid.parse().map_err(Error::conversion)?;
849
850        let signature = Vec::from_hex(&value.signature).map_err(Error::conversion)?;
851        let signature = Signature::from_slice(&signature).map_err(Error::conversion)?;
852
853        Ok(Self {
854            id: value.id,
855            topic: value.topic,
856            batch_tree_event_type,
857            txid,
858            signature,
859        })
860    }
861}
862
863impl TryFrom<generated::ark::v1::TreeNoncesEvent> for TreeNoncesEvent {
864    type Error = Error;
865
866    fn try_from(value: generated::ark::v1::TreeNoncesEvent) -> Result<Self, Self::Error> {
867        let txid = value.txid.parse().map_err(Error::conversion)?;
868
869        let nonces = TreeTxNoncePks::decode(value.nonces).map_err(Error::conversion)?;
870
871        Ok(Self {
872            id: value.id,
873            topic: value.topic,
874            txid,
875            nonces,
876        })
877    }
878}
879
880impl TryFrom<generated::ark::v1::get_event_stream_response::Event> for StreamEvent {
881    type Error = Error;
882
883    fn try_from(
884        value: generated::ark::v1::get_event_stream_response::Event,
885    ) -> Result<Self, Self::Error> {
886        Ok(match value {
887            generated::ark::v1::get_event_stream_response::Event::StreamStarted(e) => {
888                StreamEvent::StreamStarted(e.try_into()?)
889            }
890            generated::ark::v1::get_event_stream_response::Event::BatchStarted(e) => {
891                StreamEvent::BatchStarted(e.try_into()?)
892            }
893            generated::ark::v1::get_event_stream_response::Event::BatchFinalization(e) => {
894                StreamEvent::BatchFinalization(e.try_into()?)
895            }
896            generated::ark::v1::get_event_stream_response::Event::BatchFinalized(e) => {
897                StreamEvent::BatchFinalized(e.try_into()?)
898            }
899            generated::ark::v1::get_event_stream_response::Event::BatchFailed(e) => {
900                StreamEvent::BatchFailed(e.into())
901            }
902            generated::ark::v1::get_event_stream_response::Event::TreeSigningStarted(e) => {
903                StreamEvent::TreeSigningStarted(e.try_into()?)
904            }
905            generated::ark::v1::get_event_stream_response::Event::TreeNoncesAggregated(e) => {
906                StreamEvent::TreeNoncesAggregated(e.try_into()?)
907            }
908            generated::ark::v1::get_event_stream_response::Event::TreeTx(e) => {
909                StreamEvent::TreeTx(e.try_into()?)
910            }
911            generated::ark::v1::get_event_stream_response::Event::TreeSignature(e) => {
912                StreamEvent::TreeSignature(e.try_into()?)
913            }
914            generated::ark::v1::get_event_stream_response::Event::TreeNonces(e) => {
915                StreamEvent::TreeNonces(e.try_into()?)
916            }
917            generated::ark::v1::get_event_stream_response::Event::Heartbeat(_) => {
918                StreamEvent::Heartbeat
919            }
920        })
921    }
922}
923
924impl TryFrom<generated::ark::v1::get_transactions_stream_response::Data> for StreamTransactionData {
925    type Error = Error;
926
927    fn try_from(
928        value: generated::ark::v1::get_transactions_stream_response::Data,
929    ) -> Result<Self, Self::Error> {
930        match value {
931            generated::ark::v1::get_transactions_stream_response::Data::CommitmentTx(
932                commitment_tx,
933            ) => Ok(StreamTransactionData::Commitment(
934                CommitmentTransaction::try_from(commitment_tx)?,
935            )),
936            generated::ark::v1::get_transactions_stream_response::Data::ArkTx(redeem) => Ok(
937                StreamTransactionData::Ark(ArkTransaction::try_from(redeem)?),
938            ),
939            generated::ark::v1::get_transactions_stream_response::Data::Heartbeat(_) => {
940                Ok(StreamTransactionData::Heartbeat)
941            }
942            generated::ark::v1::get_transactions_stream_response::Data::SweepTx(tx) => {
943                Ok(StreamTransactionData::Ark(ArkTransaction::try_from(tx)?))
944            }
945        }
946    }
947}
948
949impl TryFrom<generated::ark::v1::TxNotification> for CommitmentTransaction {
950    type Error = Error;
951
952    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
953        let spent_vtxos = value
954            .spent_vtxos
955            .iter()
956            .map(VirtualTxOutPoint::try_from)
957            .collect::<Result<Vec<_>, _>>()?;
958
959        let spendable_vtxos = value
960            .spendable_vtxos
961            .iter()
962            .map(VirtualTxOutPoint::try_from)
963            .collect::<Result<Vec<_>, _>>()?;
964
965        Ok(CommitmentTransaction {
966            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
967            spent_vtxos,
968            unspent_vtxos: spendable_vtxos,
969        })
970    }
971}
972
973impl TryFrom<generated::ark::v1::TxNotification> for ArkTransaction {
974    type Error = Error;
975
976    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
977        let spent_vtxos = value
978            .spent_vtxos
979            .iter()
980            .map(VirtualTxOutPoint::try_from)
981            .collect::<Result<Vec<_>, _>>()?;
982
983        let spendable_vtxos = value
984            .spendable_vtxos
985            .iter()
986            .map(VirtualTxOutPoint::try_from)
987            .collect::<Result<Vec<_>, _>>()?;
988
989        let tx = if value.tx.is_empty() {
990            None
991        } else {
992            let base64 = base64::engine::GeneralPurpose::new(
993                &base64::alphabet::STANDARD,
994                base64::engine::GeneralPurposeConfig::new(),
995            );
996            let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
997            Some(Psbt::deserialize(&bytes).map_err(Error::conversion)?)
998        };
999
1000        let checkpoint_txs = value
1001            .checkpoint_txs
1002            .into_iter()
1003            .map(|(k, v)| {
1004                let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1005                let txid = v.txid.parse().map_err(Error::conversion)?;
1006                Ok((out_point, txid))
1007            })
1008            .collect::<Result<HashMap<_, _>, Error>>()?;
1009
1010        let swept_vtxos = value
1011            .swept_vtxos
1012            .into_iter()
1013            .map(OutPoint::try_from)
1014            .collect::<Result<Vec<_>, _>>()?;
1015
1016        Ok(ArkTransaction {
1017            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1018            tx,
1019            spent_vtxos,
1020            unspent_vtxos: spendable_vtxos,
1021            checkpoint_txs,
1022            swept_vtxos,
1023        })
1024    }
1025}
1026
1027impl TryFrom<Outpoint> for OutPoint {
1028    type Error = Error;
1029
1030    fn try_from(value: Outpoint) -> Result<Self, Self::Error> {
1031        let point = OutPoint {
1032            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
1033            vout: value.vout,
1034        };
1035        Ok(point)
1036    }
1037}
1038
/// Domain-side result of a VTXO chain query: the converted chain entries plus
/// optional pagination details.
pub struct VtxoChainResponse {
    /// Converted chain entries, wrapped in [`VtxoChains`].
    pub chains: VtxoChains,
    /// Pagination details; `None` when the response carried no page.
    pub page: Option<IndexerPage>,
}
1043
/// A list of VTXOs together with optional pagination details.
pub struct ListVtxosResponse {
    /// The VTXOs contained in this response.
    pub vtxos: Vec<VirtualTxOutPoint>,
    /// Pagination details, if any.
    pub page: Option<IndexerPage>,
}
1048
1049impl TryFrom<generated::ark::v1::GetVtxoChainResponse> for VtxoChainResponse {
1050    type Error = Error;
1051
1052    fn try_from(value: generated::ark::v1::GetVtxoChainResponse) -> Result<Self, Self::Error> {
1053        let chains = value
1054            .chain
1055            .iter()
1056            .map(VtxoChain::try_from)
1057            .collect::<Result<Vec<_>, Error>>()?;
1058
1059        Ok(VtxoChainResponse {
1060            chains: VtxoChains { inner: chains },
1061            page: value
1062                .page
1063                .map(IndexerPage::try_from)
1064                .transpose()
1065                .map_err(Error::conversion)?,
1066        })
1067    }
1068}
1069
1070impl TryFrom<generated::ark::v1::GetVirtualTxsResponse> for VirtualTxsResponse {
1071    type Error = Error;
1072
1073    fn try_from(value: generated::ark::v1::GetVirtualTxsResponse) -> Result<Self, Self::Error> {
1074        let base64 = &base64::engine::GeneralPurpose::new(
1075            &base64::alphabet::STANDARD,
1076            base64::engine::GeneralPurposeConfig::new(),
1077        );
1078
1079        let txs = value
1080            .txs
1081            .into_iter()
1082            .map(|tx| {
1083                let bytes = base64.decode(&tx).map_err(Error::conversion)?;
1084                let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
1085
1086                Ok(psbt)
1087            })
1088            .collect::<Result<Vec<_>, _>>()?;
1089
1090        Ok(VirtualTxsResponse {
1091            txs,
1092            page: value
1093                .page
1094                .map(IndexerPage::try_from)
1095                .transpose()
1096                .map_err(Error::conversion)?,
1097        })
1098    }
1099}
1100
1101impl TryFrom<&generated::ark::v1::IndexerChain> for VtxoChain {
1102    type Error = Error;
1103
1104    fn try_from(value: &generated::ark::v1::IndexerChain) -> Result<Self, Self::Error> {
1105        let spends = value
1106            .spends
1107            .iter()
1108            .map(|txid| {
1109                // Handle the case where txid might be 66 bytes long by trimming the last 2 bytes.
1110                let txid_str = if txid.len() == 66 { &txid[..64] } else { txid };
1111                txid_str.parse().map_err(Error::conversion)
1112            })
1113            .collect::<Result<Vec<_>, Error>>()?;
1114
1115        let tx_type = match value.r#type() {
1116            IndexerChainedTxType::Unspecified => ChainedTxType::Unspecified,
1117            IndexerChainedTxType::Commitment => ChainedTxType::Commitment,
1118            IndexerChainedTxType::Ark => ChainedTxType::Ark,
1119            IndexerChainedTxType::Tree => ChainedTxType::Tree,
1120            IndexerChainedTxType::Checkpoint => ChainedTxType::Checkpoint,
1121        };
1122
1123        Ok(VtxoChain {
1124            txid: value.txid.parse().map_err(Error::conversion)?,
1125            tx_type,
1126            spends,
1127            expires_at: value.expires_at,
1128        })
1129    }
1130}
1131
1132impl From<generated::ark::v1::IndexerPageResponse> for IndexerPage {
1133    fn from(value: generated::ark::v1::IndexerPageResponse) -> Self {
1134        IndexerPage {
1135            current: value.current,
1136            next: value.next,
1137            total: value.total,
1138        }
1139    }
1140}
1141
1142impl TryFrom<&generated::ark::v1::IndexerTxHistoryRecord> for history::Transaction {
1143    type Error = Error;
1144
1145    fn try_from(value: &generated::ark::v1::IndexerTxHistoryRecord) -> Result<Self, Self::Error> {
1146        let sign = match value.r#type() {
1147            generated::ark::v1::IndexerTxType::Received => 1,
1148            // Default to sent if unspecified.
1149            generated::ark::v1::IndexerTxType::Sent
1150            | generated::ark::v1::IndexerTxType::Unspecified => -1,
1151        };
1152
1153        let amount = SignedAmount::from_sat(value.amount as i64 * sign);
1154
1155        let tx = match &value.key {
1156            Some(Key::CommitmentTxid(txid)) => history::Transaction::Commitment {
1157                txid: txid.parse().map_err(Error::conversion)?,
1158                amount,
1159                created_at: value.created_at,
1160            },
1161            Some(Key::VirtualTxid(txid)) => history::Transaction::Ark {
1162                txid: txid.parse().map_err(Error::conversion)?,
1163                amount,
1164                is_settled: value.is_settled,
1165                created_at: value.created_at,
1166            },
1167            None => return Err(Error::conversion("invalid transaction without key")),
1168        };
1169
1170        Ok(tx)
1171    }
1172}
1173
1174impl TryFrom<generated::ark::v1::GetSubscriptionResponse> for SubscriptionResponse {
1175    type Error = Error;
1176
1177    fn try_from(value: generated::ark::v1::GetSubscriptionResponse) -> Result<Self, Self::Error> {
1178        let value = match value.data {
1179            Some(get_subscription_response::Data::Heartbeat(_)) => return Ok(Self::Heartbeat),
1180            Some(get_subscription_response::Data::Event(event)) => event,
1181            None => return Err(Error::conversion("empty subscription response")),
1182        };
1183
1184        let txid = value.txid.parse().map_err(Error::conversion)?;
1185
1186        let new_vtxos = value
1187            .new_vtxos
1188            .iter()
1189            .map(VirtualTxOutPoint::try_from)
1190            .collect::<Result<Vec<_>, _>>()?;
1191
1192        let spent_vtxos = value
1193            .spent_vtxos
1194            .iter()
1195            .map(VirtualTxOutPoint::try_from)
1196            .collect::<Result<Vec<_>, _>>()?;
1197
1198        let tx = if value.tx.is_empty() {
1199            None
1200        } else {
1201            match Vec::from_hex(&value.tx)
1202                .ok()
1203                .and_then(|bytes| bitcoin::consensus::deserialize::<Transaction>(&bytes).ok())
1204            {
1205                Some(raw_tx) => Some(raw_tx),
1206                None => {
1207                    let base64 = base64::engine::GeneralPurpose::new(
1208                        &base64::alphabet::STANDARD,
1209                        base64::engine::GeneralPurposeConfig::new(),
1210                    );
1211                    let bytes = base64.decode(&value.tx).map_err(|e| {
1212                        let tx_prefix = value.tx.chars().take(24).collect::<String>();
1213                        Error::conversion(format!(
1214                            "invalid subscription tx payload (not hex tx or base64 psbt) (len={}, tx_prefix='{}'): {e}",
1215                            value.tx.len(),
1216                            tx_prefix
1217                        ))
1218                    })?;
1219                    let psbt = Psbt::deserialize(&bytes).map_err(|e| {
1220                        let tx_prefix = value.tx.chars().take(24).collect::<String>();
1221                        Error::conversion(format!(
1222                            "invalid subscription tx payload (base64 but not psbt) (len={}, tx_prefix='{}'): {e}",
1223                            value.tx.len(),
1224                            tx_prefix
1225                        ))
1226                    })?;
1227                    Some(psbt.unsigned_tx)
1228                }
1229            }
1230        };
1231
1232        let checkpoint_txs = value
1233            .checkpoint_txs
1234            .into_iter()
1235            .map(|(k, v)| {
1236                let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1237                let txid = v.txid.parse().map_err(Error::conversion)?;
1238                Ok((out_point, txid))
1239            })
1240            .collect::<Result<HashMap<_, _>, Error>>()?;
1241
1242        let scripts = value
1243            .scripts
1244            .iter()
1245            .map(|h| ScriptBuf::from_hex(h).map_err(Error::conversion))
1246            .collect::<Result<Vec<_>, _>>()?;
1247
1248        Ok(Self::Event(Box::new(SubscriptionEvent {
1249            txid,
1250            scripts,
1251            new_vtxos,
1252            spent_vtxos,
1253            tx,
1254            checkpoint_txs,
1255        })))
1256    }
1257}
1258
1259impl From<GetVtxosRequest> for generated::ark::v1::GetVtxosRequest {
1260    fn from(value: GetVtxosRequest) -> Self {
1261        let (spendable_only, spent_only, recoverable_only, pending_only) = match value.filter() {
1262            Some(GetVtxosRequestFilter::Spendable) => (true, false, false, false),
1263            Some(GetVtxosRequestFilter::Spent) => (false, true, false, false),
1264            Some(GetVtxosRequestFilter::Recoverable) => (false, false, true, false),
1265            Some(GetVtxosRequestFilter::PendingOnly) => (false, false, false, true),
1266            None => (false, false, false, false),
1267        };
1268
1269        let page = value
1270            .page()
1271            .map(|p| generated::ark::v1::IndexerPageRequest {
1272                size: p.size,
1273                index: p.index,
1274            });
1275
1276        match value.reference() {
1277            GetVtxosRequestReference::Scripts(script_bufs) => Self {
1278                scripts: script_bufs.iter().map(|s| s.to_hex_string()).collect(),
1279                outpoints: Vec::new(),
1280                spendable_only,
1281                spent_only,
1282                recoverable_only,
1283                page,
1284                pending_only,
1285                after: value.after().unwrap_or(0) as i64,
1286                before: value.before().unwrap_or(0) as i64,
1287            },
1288            GetVtxosRequestReference::OutPoints(outpoints) => Self {
1289                scripts: Vec::new(),
1290                outpoints: outpoints.iter().map(|o| o.to_string()).collect(),
1291                spendable_only,
1292                spent_only,
1293                recoverable_only,
1294                page,
1295                pending_only,
1296                after: value.after().unwrap_or(0) as i64,
1297                before: value.before().unwrap_or(0) as i64,
1298            },
1299        }
1300    }
1301}