//! `ark_grpc/client.rs`: gRPC client for the Ark service and the indexer service.

1use crate::generated;
2use crate::generated::ark::v1::ark_service_client::ArkServiceClient;
3use crate::generated::ark::v1::get_subscription_response;
4use crate::generated::ark::v1::indexer_service_client::IndexerServiceClient;
5use crate::generated::ark::v1::indexer_tx_history_record::Key;
6use crate::generated::ark::v1::ConfirmRegistrationRequest;
7use crate::generated::ark::v1::EstimateIntentFeeRequest;
8use crate::generated::ark::v1::GetEventStreamRequest;
9use crate::generated::ark::v1::GetInfoRequest;
10use crate::generated::ark::v1::GetSubscriptionRequest;
11use crate::generated::ark::v1::GetTransactionsStreamRequest;
12use crate::generated::ark::v1::IndexerChainedTxType;
13use crate::generated::ark::v1::Intent;
14use crate::generated::ark::v1::Outpoint;
15use crate::generated::ark::v1::RegisterIntentRequest;
16use crate::generated::ark::v1::SubmitSignedForfeitTxsRequest;
17use crate::generated::ark::v1::SubmitTreeNoncesRequest;
18use crate::generated::ark::v1::SubmitTreeSignaturesRequest;
19use crate::generated::ark::v1::SubscribeForScriptsRequest;
20use crate::generated::ark::v1::UnsubscribeForScriptsRequest;
21use crate::Error;
22use ark_core::history;
23use ark_core::server::parse_sequence_number;
24use ark_core::server::ArkTransaction;
25use ark_core::server::BatchFailed;
26use ark_core::server::BatchFinalizationEvent;
27use ark_core::server::BatchFinalizedEvent;
28use ark_core::server::BatchStartedEvent;
29use ark_core::server::BatchTreeEventType;
30use ark_core::server::ChainedTxType;
31use ark_core::server::CommitmentTransaction;
32use ark_core::server::FinalizeOffchainTxResponse;
33use ark_core::server::GetVtxosRequest;
34use ark_core::server::GetVtxosRequestFilter;
35use ark_core::server::GetVtxosRequestReference;
36use ark_core::server::IndexerPage;
37use ark_core::server::Info;
38use ark_core::server::NoncePks;
39use ark_core::server::PartialSigTree;
40use ark_core::server::StreamEvent;
41use ark_core::server::StreamTransactionData;
42use ark_core::server::SubmitOffchainTxResponse;
43use ark_core::server::SubscriptionEvent;
44use ark_core::server::SubscriptionResponse;
45use ark_core::server::TreeNoncesAggregatedEvent;
46use ark_core::server::TreeNoncesEvent;
47use ark_core::server::TreeSignatureEvent;
48use ark_core::server::TreeSigningStartedEvent;
49use ark_core::server::TreeTxEvent;
50use ark_core::server::TreeTxNoncePks;
51use ark_core::server::VirtualTxOutPoint;
52use ark_core::server::VirtualTxsResponse;
53use ark_core::server::VtxoChain;
54use ark_core::server::VtxoChains;
55use ark_core::ArkAddress;
56use ark_core::TxGraphChunk;
57use async_stream::stream;
58use base64::Engine;
59use bitcoin::hex::FromHex;
60use bitcoin::secp256k1::PublicKey;
61use bitcoin::taproot::Signature;
62use bitcoin::OutPoint;
63use bitcoin::Psbt;
64use bitcoin::ScriptBuf;
65use bitcoin::SignedAmount;
66use bitcoin::Txid;
67use futures::Stream;
68use futures::StreamExt;
69use futures::TryStreamExt;
70use std::collections::HashMap;
71use std::str::FromStr;
72
73#[derive(Debug, Clone)]
74pub struct Client {
75    url: String,
76    ark_client: Option<ArkServiceClient<tonic::transport::Channel>>,
77    indexer_client: Option<IndexerServiceClient<tonic::transport::Channel>>,
78}
79
80impl Client {
81    pub fn new(url: String) -> Self {
82        Self {
83            url,
84            ark_client: None,
85            indexer_client: None,
86        }
87    }
88
89    pub async fn connect(&mut self) -> Result<(), Error> {
90        let ark_service_client = ArkServiceClient::connect(self.url.clone())
91            .await
92            .map_err(Error::connect)?;
93        let indexer_client = IndexerServiceClient::connect(self.url.clone())
94            .await
95            .map_err(Error::connect)?;
96
97        self.ark_client = Some(ark_service_client);
98        self.indexer_client = Some(indexer_client);
99        Ok(())
100    }
101
102    pub async fn get_info(&mut self) -> Result<Info, Error> {
103        let mut client = self.ark_client()?;
104
105        let response = client
106            .get_info(GetInfoRequest {})
107            .await
108            .map_err(Error::request)?;
109
110        response.into_inner().try_into()
111    }
112
113    /// List VTXOs with pagination support.
114    /// Returns a single page of results along with pagination info.
115    pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
116        if request.reference().is_empty() {
117            return Ok(ListVtxosResponse {
118                vtxos: Vec::new(),
119                page: None,
120            });
121        }
122
123        let mut client = self.indexer_client()?;
124
125        let response = client
126            .get_vtxos(generated::ark::v1::GetVtxosRequest::from(request))
127            .await
128            .map_err(Error::request)?;
129
130        let inner = response.into_inner();
131
132        let vtxos = inner
133            .vtxos
134            .iter()
135            .map(VirtualTxOutPoint::try_from)
136            .collect::<Result<Vec<_>, _>>()?;
137
138        let page = inner
139            .page
140            .map(IndexerPage::try_from)
141            .transpose()
142            .map_err(Error::conversion)?;
143
144        Ok(ListVtxosResponse { vtxos, page })
145    }
146
147    pub async fn register_intent(&self, intent: ark_core::intent::Intent) -> Result<String, Error> {
148        let mut client = self.ark_client()?;
149
150        let intent = intent.try_into()?;
151        let request = RegisterIntentRequest {
152            intent: Some(intent),
153        };
154
155        let response = client
156            .register_intent(request)
157            .await
158            .map_err(Error::request)?;
159
160        let intent_id = response.into_inner().intent_id;
161
162        Ok(intent_id)
163    }
164
165    pub async fn submit_offchain_transaction_request(
166        &self,
167        ark_tx: Psbt,
168        checkpoint_txs: Vec<Psbt>,
169    ) -> Result<SubmitOffchainTxResponse, Error> {
170        let mut client = self.ark_client()?;
171
172        let base64 = base64::engine::GeneralPurpose::new(
173            &base64::alphabet::STANDARD,
174            base64::engine::GeneralPurposeConfig::new(),
175        );
176
177        let ark_tx = base64.encode(ark_tx.serialize());
178
179        let checkpoint_txs = checkpoint_txs
180            .into_iter()
181            .map(|tx| base64.encode(tx.serialize()))
182            .collect();
183
184        let res = client
185            .submit_tx(generated::ark::v1::SubmitTxRequest {
186                signed_ark_tx: ark_tx,
187                checkpoint_txs,
188            })
189            .await
190            .map_err(Error::request)?;
191
192        let res = res.into_inner();
193
194        let signed_ark_tx = res.final_ark_tx;
195        let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
196        let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
197
198        let signed_checkpoint_txs = res
199            .signed_checkpoint_txs
200            .into_iter()
201            .map(|tx| {
202                let tx = base64.decode(tx).map_err(Error::conversion)?;
203                let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
204
205                Ok(tx)
206            })
207            .collect::<Result<Vec<_>, Error>>()?;
208
209        Ok(SubmitOffchainTxResponse {
210            signed_ark_tx,
211            signed_checkpoint_txs,
212        })
213    }
214
215    pub async fn finalize_offchain_transaction(
216        &self,
217        txid: Txid,
218        checkpoint_txs: Vec<Psbt>,
219    ) -> Result<FinalizeOffchainTxResponse, Error> {
220        let mut client = self.ark_client()?;
221
222        let base64 = base64::engine::GeneralPurpose::new(
223            &base64::alphabet::STANDARD,
224            base64::engine::GeneralPurposeConfig::new(),
225        );
226
227        let checkpoint_txs = checkpoint_txs
228            .into_iter()
229            .map(|tx| base64.encode(tx.serialize()))
230            .collect();
231
232        client
233            .finalize_tx(generated::ark::v1::FinalizeTxRequest {
234                ark_txid: txid.to_string(),
235                final_checkpoint_txs: checkpoint_txs,
236            })
237            .await
238            .map_err(Error::request)?;
239
240        Ok(FinalizeOffchainTxResponse {})
241    }
242
243    pub async fn get_pending_tx(
244        &self,
245        intent: ark_core::intent::Intent,
246    ) -> Result<Vec<ark_core::server::PendingTx>, Error> {
247        let mut client = self.ark_client()?;
248
249        let intent: Intent = intent.try_into()?;
250
251        let res = client
252            .get_pending_tx(generated::ark::v1::GetPendingTxRequest {
253                identifier: Some(
254                    generated::ark::v1::get_pending_tx_request::Identifier::Intent(intent),
255                ),
256            })
257            .await
258            .map_err(Error::request)?;
259
260        let inner = res.into_inner();
261        let base64 = base64::engine::GeneralPurpose::new(
262            &base64::alphabet::STANDARD,
263            base64::engine::GeneralPurposeConfig::new(),
264        );
265
266        inner
267            .pending_txs
268            .into_iter()
269            .map(|tx| {
270                let ark_txid = tx.ark_txid.parse().map_err(Error::conversion)?;
271
272                let signed_ark_tx = base64.decode(&tx.final_ark_tx).map_err(Error::conversion)?;
273                let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
274
275                let signed_checkpoint_txs = tx
276                    .signed_checkpoint_txs
277                    .into_iter()
278                    .map(|cp| {
279                        let bytes = base64.decode(cp).map_err(Error::conversion)?;
280                        Psbt::deserialize(&bytes).map_err(Error::conversion)
281                    })
282                    .collect::<Result<Vec<_>, Error>>()?;
283
284                Ok(ark_core::server::PendingTx {
285                    ark_txid,
286                    signed_ark_tx,
287                    signed_checkpoint_txs,
288                })
289            })
290            .collect()
291    }
292
293    pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
294        let mut client = self.ark_client()?;
295
296        client
297            .confirm_registration(ConfirmRegistrationRequest { intent_id })
298            .await
299            .map_err(Error::request)?;
300
301        Ok(())
302    }
303
304    pub async fn submit_tree_nonces(
305        &self,
306        batch_id: &str,
307        cosigner_pubkey: PublicKey,
308        pub_nonce_tree: NoncePks,
309    ) -> Result<(), Error> {
310        let mut client = self.ark_client()?;
311
312        client
313            .submit_tree_nonces(SubmitTreeNoncesRequest {
314                batch_id: batch_id.to_string(),
315                pubkey: cosigner_pubkey.to_string(),
316                tree_nonces: pub_nonce_tree.encode(),
317            })
318            .await
319            .map_err(Error::request)?;
320
321        Ok(())
322    }
323
324    pub async fn submit_tree_signatures(
325        &self,
326        batch_id: &str,
327        cosigner_pk: PublicKey,
328        partial_sig_tree: PartialSigTree,
329    ) -> Result<(), Error> {
330        let mut client = self.ark_client()?;
331
332        client
333            .submit_tree_signatures(SubmitTreeSignaturesRequest {
334                batch_id: batch_id.to_string(),
335                pubkey: cosigner_pk.to_string(),
336                tree_signatures: partial_sig_tree.encode(),
337            })
338            .await
339            .map_err(Error::request)?;
340
341        Ok(())
342    }
343
344    pub async fn submit_signed_forfeit_txs(
345        &self,
346        signed_forfeit_txs: Vec<Psbt>,
347        signed_commitment_tx: Option<Psbt>,
348    ) -> Result<(), Error> {
349        let mut client = self.ark_client()?;
350
351        let base64 = base64::engine::GeneralPurpose::new(
352            &base64::alphabet::STANDARD,
353            base64::engine::GeneralPurposeConfig::new(),
354        );
355
356        let signed_commitment_tx = signed_commitment_tx
357            .map(|tx| base64.encode(tx.serialize()))
358            .unwrap_or_default();
359
360        client
361            .submit_signed_forfeit_txs(SubmitSignedForfeitTxsRequest {
362                signed_forfeit_txs: signed_forfeit_txs
363                    .iter()
364                    .map(|psbt| base64.encode(psbt.serialize()))
365                    .collect(),
366                signed_commitment_tx,
367            })
368            .await
369            .map_err(Error::request)?;
370
371        Ok(())
372    }
373
374    pub async fn get_event_stream(
375        &self,
376        topics: Vec<String>,
377    ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
378        let mut client = self.ark_client()?;
379
380        let response = client
381            .get_event_stream(GetEventStreamRequest { topics })
382            .await
383            .map_err(Error::request)?;
384        let mut stream = response.into_inner();
385
386        let stream = stream! {
387            loop {
388                match stream.try_next().await {
389                    Ok(Some(event)) => match event.event {
390                        None => {
391                            log::debug!("Got empty message");
392                        }
393                        Some(event) => {
394                            yield Ok(StreamEvent::try_from(event)?);
395                        }
396                    },
397                    Ok(None) => {
398                        yield Err(Error::event_stream_disconnect());
399                    }
400                    Err(e) => {
401                        yield Err(Error::event_stream(e));
402                    }
403                }
404            }
405        };
406
407        Ok(stream.boxed())
408    }
409
410    pub async fn get_tx_stream(
411        &self,
412    ) -> Result<impl Stream<Item = Result<StreamTransactionData, Error>> + Unpin, Error> {
413        let mut client = self.ark_client()?;
414
415        let response = client
416            .get_transactions_stream(GetTransactionsStreamRequest {})
417            .await
418            .map_err(Error::request)?;
419
420        let mut stream = response.into_inner();
421
422        let stream = stream! {
423            loop {
424                match stream.try_next().await {
425                    Ok(Some(event)) => match event.data {
426                        None => {
427                            log::debug!("Got empty message");
428                        }
429                        Some(event) => {
430                            yield Ok(StreamTransactionData::try_from(event)?);
431                        }
432                    },
433                    Ok(None) => {
434                        yield Err(Error::event_stream_disconnect());
435                    }
436                    Err(e) => {
437                        yield Err(Error::event_stream(e));
438                    }
439                }
440            }
441        };
442
443        Ok(stream.boxed())
444    }
445
446    pub async fn get_vtxo_chain(
447        &self,
448        outpoint: Option<OutPoint>,
449        size_and_index: Option<(i32, i32)>,
450    ) -> Result<VtxoChainResponse, Error> {
451        let mut client = self.indexer_client()?;
452        let response = client
453            .get_vtxo_chain(generated::ark::v1::GetVtxoChainRequest {
454                outpoint: outpoint.map(|o| generated::ark::v1::IndexerOutpoint {
455                    txid: o.txid.to_string(),
456                    vout: o.vout,
457                }),
458                page: size_and_index
459                    .map(|(size, index)| generated::ark::v1::IndexerPageRequest { size, index }),
460            })
461            .await
462            .map_err(Error::request)?;
463        let response = response.into_inner();
464        let result = response.try_into()?;
465        Ok(result)
466    }
467
468    pub async fn get_virtual_txs(
469        &self,
470        txids: Vec<String>,
471        size_and_index: Option<(i32, i32)>,
472    ) -> Result<VirtualTxsResponse, Error> {
473        let mut client = self.indexer_client()?;
474        let response = client
475            .get_virtual_txs(generated::ark::v1::GetVirtualTxsRequest {
476                txids,
477                page: size_and_index
478                    .map(|(size, index)| generated::ark::v1::IndexerPageRequest { size, index }),
479            })
480            .await
481            .map_err(Error::request)?;
482        let response = response.into_inner();
483        let result = response.try_into()?;
484        Ok(result)
485    }
486
487    /// Allows to subscribe for tx notifications related to the provided
488    /// vtxo scripts.
489    ///
490    /// It can also be used to update an existing subscriptions by adding
491    /// new scripts to it.
492    ///
493    /// Note: for new subscriptions, don't provide a `subscription_id`
494    ///
495    /// Returns the subscription id if successful
496    pub async fn subscribe_to_scripts(
497        &self,
498        scripts: Vec<ArkAddress>,
499        subscription_id: Option<String>,
500    ) -> Result<String, Error> {
501        let mut client = self.indexer_client()?;
502        let scripts = scripts
503            .iter()
504            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
505            .collect::<Vec<_>>();
506
507        // For new subscription we expect empty string ("") here
508        let subscription_id = subscription_id.unwrap_or_default();
509
510        let response = client
511            .subscribe_for_scripts(SubscribeForScriptsRequest {
512                scripts,
513                subscription_id,
514            })
515            .await
516            .map_err(Error::request)?;
517
518        let response = response.into_inner();
519
520        Ok(response.subscription_id)
521    }
522
523    /// Allows to remove scripts from an existing subscription.
524    pub async fn unsubscribe_from_scripts(
525        &self,
526        scripts: Vec<ArkAddress>,
527        subscription_id: String,
528    ) -> Result<(), Error> {
529        let mut client = self.indexer_client()?;
530        let scripts = scripts
531            .iter()
532            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
533            .collect::<Vec<_>>();
534
535        let _ = client
536            .unsubscribe_for_scripts(UnsubscribeForScriptsRequest {
537                subscription_id,
538                scripts,
539            })
540            .await
541            .map_err(Error::request)?;
542
543        Ok(())
544    }
545
546    /// Gets a subscription stream that returns subscription responses.
547    pub async fn get_subscription(
548        &self,
549        subscription_id: String,
550    ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
551        let mut client = self.indexer_client()?;
552
553        let response = client
554            .get_subscription(GetSubscriptionRequest { subscription_id })
555            .await
556            .map_err(Error::request)?;
557
558        let mut stream = response.into_inner();
559
560        let stream = stream! {
561            loop {
562                match stream.try_next().await {
563                    Ok(Some(response)) => {
564                        match SubscriptionResponse::try_from(response) {
565                            Ok(subscription_response) => {
566                                yield Ok(subscription_response);
567                            }
568                            Err(e) => {
569                                yield Err(e);
570                            }
571                        }
572                    }
573                    Ok(None) => {
574                        break;
575                    }
576                    Err(e) => {
577                        yield Err(Error::event_stream(e));
578                    }
579                }
580            }
581        };
582
583        Ok(stream.boxed())
584    }
585
586    pub async fn estimate_fees(
587        &self,
588        intent: ark_core::intent::Intent,
589    ) -> Result<SignedAmount, Error> {
590        let mut client = self.ark_client()?;
591
592        let intent = intent.try_into()?;
593        let response = client
594            .estimate_intent_fee(EstimateIntentFeeRequest {
595                intent: Some(intent),
596            })
597            .await
598            .map_err(Error::request)?;
599        let response = response.into_inner();
600
601        Ok(SignedAmount::from_sat(response.fee))
602    }
603
604    fn ark_client(&self) -> Result<ArkServiceClient<tonic::transport::Channel>, Error> {
605        // Cloning an `ArkServiceClient<Channel>` is cheap.
606        self.ark_client.clone().ok_or(Error::not_connected())
607    }
608    fn indexer_client(&self) -> Result<IndexerServiceClient<tonic::transport::Channel>, Error> {
609        self.indexer_client.clone().ok_or(Error::not_connected())
610    }
611}
612
613impl TryFrom<ark_core::intent::Intent> for Intent {
614    type Error = Error;
615
616    fn try_from(value: ark_core::intent::Intent) -> Result<Self, Self::Error> {
617        Ok(Self {
618            proof: value.serialize_proof(),
619            message: value.serialize_message().map_err(Error::conversion)?,
620        })
621    }
622}
623
624impl TryFrom<generated::ark::v1::BatchStartedEvent> for BatchStartedEvent {
625    type Error = Error;
626
627    fn try_from(value: generated::ark::v1::BatchStartedEvent) -> Result<Self, Self::Error> {
628        let batch_expiry = parse_sequence_number(value.batch_expiry).map_err(Error::conversion)?;
629
630        Ok(BatchStartedEvent {
631            id: value.id,
632            intent_id_hashes: value.intent_id_hashes,
633            batch_expiry,
634        })
635    }
636}
637
638impl TryFrom<generated::ark::v1::BatchFinalizationEvent> for BatchFinalizationEvent {
639    type Error = Error;
640
641    fn try_from(value: generated::ark::v1::BatchFinalizationEvent) -> Result<Self, Self::Error> {
642        let base64 = &base64::engine::GeneralPurpose::new(
643            &base64::alphabet::STANDARD,
644            base64::engine::GeneralPurposeConfig::new(),
645        );
646
647        let commitment_tx = base64
648            .decode(&value.commitment_tx)
649            .map_err(Error::conversion)?;
650        let commitment_tx = Psbt::deserialize(&commitment_tx).map_err(Error::conversion)?;
651
652        Ok(BatchFinalizationEvent {
653            id: value.id,
654            commitment_tx,
655        })
656    }
657}
658
659impl TryFrom<generated::ark::v1::BatchFinalizedEvent> for BatchFinalizedEvent {
660    type Error = Error;
661
662    fn try_from(value: generated::ark::v1::BatchFinalizedEvent) -> Result<Self, Self::Error> {
663        let commitment_txid = value.commitment_txid.parse().map_err(Error::conversion)?;
664
665        Ok(BatchFinalizedEvent {
666            id: value.id,
667            commitment_txid,
668        })
669    }
670}
671
672impl From<generated::ark::v1::BatchFailedEvent> for BatchFailed {
673    fn from(value: generated::ark::v1::BatchFailedEvent) -> Self {
674        BatchFailed {
675            id: value.id,
676            reason: value.reason,
677        }
678    }
679}
680
681impl TryFrom<generated::ark::v1::TreeSigningStartedEvent> for TreeSigningStartedEvent {
682    type Error = Error;
683
684    fn try_from(value: generated::ark::v1::TreeSigningStartedEvent) -> Result<Self, Self::Error> {
685        let unsigned_commitment_tx = base64::engine::GeneralPurpose::new(
686            &base64::alphabet::STANDARD,
687            base64::engine::GeneralPurposeConfig::new(),
688        )
689        .decode(&value.unsigned_commitment_tx)
690        .map_err(Error::conversion)?;
691
692        let unsigned_commitment_tx =
693            Psbt::deserialize(&unsigned_commitment_tx).map_err(Error::conversion)?;
694
695        Ok(TreeSigningStartedEvent {
696            id: value.id,
697            cosigners_pubkeys: value
698                .cosigners_pubkeys
699                .into_iter()
700                .map(|pk| pk.parse().map_err(Error::conversion))
701                .collect::<Result<Vec<_>, Error>>()?,
702            unsigned_commitment_tx,
703        })
704    }
705}
706
707impl TryFrom<generated::ark::v1::TreeNoncesAggregatedEvent> for TreeNoncesAggregatedEvent {
708    type Error = Error;
709
710    fn try_from(value: generated::ark::v1::TreeNoncesAggregatedEvent) -> Result<Self, Self::Error> {
711        let tree_nonces = NoncePks::decode(value.tree_nonces).map_err(Error::conversion)?;
712
713        Ok(TreeNoncesAggregatedEvent {
714            id: value.id,
715            tree_nonces,
716        })
717    }
718}
719
720impl TryFrom<generated::ark::v1::TreeTxEvent> for TreeTxEvent {
721    type Error = Error;
722
723    fn try_from(value: generated::ark::v1::TreeTxEvent) -> Result<Self, Self::Error> {
724        let batch_tree_event_type = match value.batch_index {
725            0 => BatchTreeEventType::Vtxo,
726            1 => BatchTreeEventType::Connector,
727            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
728        };
729
730        let txid = if value.txid.is_empty() {
731            None
732        } else {
733            Some(value.txid.parse().map_err(Error::conversion)?)
734        };
735
736        let base64 = &base64::engine::GeneralPurpose::new(
737            &base64::alphabet::STANDARD,
738            base64::engine::GeneralPurposeConfig::new(),
739        );
740
741        let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
742        let tx = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
743
744        let children = value
745            .children
746            .iter()
747            .map(|(index, txid)| Ok((*index, txid.parse().map_err(Error::conversion)?)))
748            .collect::<Result<HashMap<_, _>, Error>>()?;
749
750        Ok(Self {
751            id: value.id,
752            topic: value.topic,
753            batch_tree_event_type,
754            tx_graph_chunk: TxGraphChunk { txid, tx, children },
755        })
756    }
757}
758
759impl TryFrom<generated::ark::v1::TreeSignatureEvent> for TreeSignatureEvent {
760    type Error = Error;
761
762    fn try_from(value: generated::ark::v1::TreeSignatureEvent) -> Result<Self, Self::Error> {
763        let batch_tree_event_type = match value.batch_index {
764            0 => BatchTreeEventType::Vtxo,
765            1 => BatchTreeEventType::Connector,
766            n => return Err(Error::conversion(format!("unsupported batch index: {n}"))),
767        };
768
769        let txid = value.txid.parse().map_err(Error::conversion)?;
770
771        let signature = Vec::from_hex(&value.signature).map_err(Error::conversion)?;
772        let signature = Signature::from_slice(&signature).map_err(Error::conversion)?;
773
774        Ok(Self {
775            id: value.id,
776            topic: value.topic,
777            batch_tree_event_type,
778            txid,
779            signature,
780        })
781    }
782}
783
784impl TryFrom<generated::ark::v1::TreeNoncesEvent> for TreeNoncesEvent {
785    type Error = Error;
786
787    fn try_from(value: generated::ark::v1::TreeNoncesEvent) -> Result<Self, Self::Error> {
788        let txid = value.txid.parse().map_err(Error::conversion)?;
789
790        let nonces = TreeTxNoncePks::decode(value.nonces).map_err(Error::conversion)?;
791
792        Ok(Self {
793            id: value.id,
794            topic: value.topic,
795            txid,
796            nonces,
797        })
798    }
799}
800
801impl TryFrom<generated::ark::v1::get_event_stream_response::Event> for StreamEvent {
802    type Error = Error;
803
804    fn try_from(
805        value: generated::ark::v1::get_event_stream_response::Event,
806    ) -> Result<Self, Self::Error> {
807        Ok(match value {
808            generated::ark::v1::get_event_stream_response::Event::BatchStarted(e) => {
809                StreamEvent::BatchStarted(e.try_into()?)
810            }
811            generated::ark::v1::get_event_stream_response::Event::BatchFinalization(e) => {
812                StreamEvent::BatchFinalization(e.try_into()?)
813            }
814            generated::ark::v1::get_event_stream_response::Event::BatchFinalized(e) => {
815                StreamEvent::BatchFinalized(e.try_into()?)
816            }
817            generated::ark::v1::get_event_stream_response::Event::BatchFailed(e) => {
818                StreamEvent::BatchFailed(e.into())
819            }
820            generated::ark::v1::get_event_stream_response::Event::TreeSigningStarted(e) => {
821                StreamEvent::TreeSigningStarted(e.try_into()?)
822            }
823            generated::ark::v1::get_event_stream_response::Event::TreeNoncesAggregated(e) => {
824                StreamEvent::TreeNoncesAggregated(e.try_into()?)
825            }
826            generated::ark::v1::get_event_stream_response::Event::TreeTx(e) => {
827                StreamEvent::TreeTx(e.try_into()?)
828            }
829            generated::ark::v1::get_event_stream_response::Event::TreeSignature(e) => {
830                StreamEvent::TreeSignature(e.try_into()?)
831            }
832            generated::ark::v1::get_event_stream_response::Event::TreeNonces(e) => {
833                StreamEvent::TreeNonces(e.try_into()?)
834            }
835            generated::ark::v1::get_event_stream_response::Event::Heartbeat(_) => {
836                StreamEvent::Heartbeat
837            }
838        })
839    }
840}
841
842impl TryFrom<generated::ark::v1::get_transactions_stream_response::Data> for StreamTransactionData {
843    type Error = Error;
844
845    fn try_from(
846        value: generated::ark::v1::get_transactions_stream_response::Data,
847    ) -> Result<Self, Self::Error> {
848        match value {
849            generated::ark::v1::get_transactions_stream_response::Data::CommitmentTx(
850                commitment_tx,
851            ) => Ok(StreamTransactionData::Commitment(
852                CommitmentTransaction::try_from(commitment_tx)?,
853            )),
854            generated::ark::v1::get_transactions_stream_response::Data::ArkTx(redeem) => Ok(
855                StreamTransactionData::Ark(ArkTransaction::try_from(redeem)?),
856            ),
857            generated::ark::v1::get_transactions_stream_response::Data::Heartbeat(_) => {
858                Ok(StreamTransactionData::Heartbeat)
859            }
860        }
861    }
862}
863
864impl TryFrom<generated::ark::v1::TxNotification> for CommitmentTransaction {
865    type Error = Error;
866
867    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
868        let spent_vtxos = value
869            .spent_vtxos
870            .iter()
871            .map(VirtualTxOutPoint::try_from)
872            .collect::<Result<Vec<_>, _>>()?;
873
874        let spendable_vtxos = value
875            .spendable_vtxos
876            .iter()
877            .map(VirtualTxOutPoint::try_from)
878            .collect::<Result<Vec<_>, _>>()?;
879
880        Ok(CommitmentTransaction {
881            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
882            spent_vtxos,
883            unspent_vtxos: spendable_vtxos,
884        })
885    }
886}
887
888impl TryFrom<generated::ark::v1::TxNotification> for ArkTransaction {
889    type Error = Error;
890
891    fn try_from(value: generated::ark::v1::TxNotification) -> Result<Self, Self::Error> {
892        let spent_vtxos = value
893            .spent_vtxos
894            .iter()
895            .map(VirtualTxOutPoint::try_from)
896            .collect::<Result<Vec<_>, _>>()?;
897
898        let spendable_vtxos = value
899            .spendable_vtxos
900            .iter()
901            .map(VirtualTxOutPoint::try_from)
902            .collect::<Result<Vec<_>, _>>()?;
903
904        Ok(ArkTransaction {
905            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
906            spent_vtxos,
907            unspent_vtxos: spendable_vtxos,
908        })
909    }
910}
911
912impl TryFrom<Outpoint> for OutPoint {
913    type Error = Error;
914
915    fn try_from(value: Outpoint) -> Result<Self, Self::Error> {
916        let point = OutPoint {
917            txid: Txid::from_str(value.txid.as_str()).map_err(Error::conversion)?,
918            vout: value.vout,
919        };
920        Ok(point)
921    }
922}
923
924pub struct VtxoChainResponse {
925    pub chains: VtxoChains,
926    pub page: Option<IndexerPage>,
927}
928
929pub struct ListVtxosResponse {
930    pub vtxos: Vec<VirtualTxOutPoint>,
931    pub page: Option<IndexerPage>,
932}
933
934impl TryFrom<generated::ark::v1::GetVtxoChainResponse> for VtxoChainResponse {
935    type Error = Error;
936
937    fn try_from(value: generated::ark::v1::GetVtxoChainResponse) -> Result<Self, Self::Error> {
938        let chains = value
939            .chain
940            .iter()
941            .map(VtxoChain::try_from)
942            .collect::<Result<Vec<_>, Error>>()?;
943
944        Ok(VtxoChainResponse {
945            chains: VtxoChains { inner: chains },
946            page: value
947                .page
948                .map(IndexerPage::try_from)
949                .transpose()
950                .map_err(Error::conversion)?,
951        })
952    }
953}
954
955impl TryFrom<generated::ark::v1::GetVirtualTxsResponse> for VirtualTxsResponse {
956    type Error = Error;
957
958    fn try_from(value: generated::ark::v1::GetVirtualTxsResponse) -> Result<Self, Self::Error> {
959        let base64 = &base64::engine::GeneralPurpose::new(
960            &base64::alphabet::STANDARD,
961            base64::engine::GeneralPurposeConfig::new(),
962        );
963
964        let txs = value
965            .txs
966            .into_iter()
967            .map(|tx| {
968                let bytes = base64.decode(&tx).map_err(Error::conversion)?;
969                let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
970
971                Ok(psbt)
972            })
973            .collect::<Result<Vec<_>, _>>()?;
974
975        Ok(VirtualTxsResponse {
976            txs,
977            page: value
978                .page
979                .map(IndexerPage::try_from)
980                .transpose()
981                .map_err(Error::conversion)?,
982        })
983    }
984}
985
986impl TryFrom<&generated::ark::v1::IndexerChain> for VtxoChain {
987    type Error = Error;
988
989    fn try_from(value: &generated::ark::v1::IndexerChain) -> Result<Self, Self::Error> {
990        let spends = value
991            .spends
992            .iter()
993            .map(|txid| {
994                // Handle the case where txid might be 66 bytes long by trimming the last 2 bytes.
995                let txid_str = if txid.len() == 66 { &txid[..64] } else { txid };
996                txid_str.parse().map_err(Error::conversion)
997            })
998            .collect::<Result<Vec<_>, Error>>()?;
999
1000        let tx_type = match value.r#type() {
1001            IndexerChainedTxType::Unspecified => ChainedTxType::Unspecified,
1002            IndexerChainedTxType::Commitment => ChainedTxType::Commitment,
1003            IndexerChainedTxType::Ark => ChainedTxType::Ark,
1004            IndexerChainedTxType::Tree => ChainedTxType::Tree,
1005            IndexerChainedTxType::Checkpoint => ChainedTxType::Checkpoint,
1006        };
1007
1008        Ok(VtxoChain {
1009            txid: value.txid.parse().map_err(Error::conversion)?,
1010            tx_type,
1011            spends,
1012            expires_at: value.expires_at,
1013        })
1014    }
1015}
1016
1017impl From<generated::ark::v1::IndexerPageResponse> for IndexerPage {
1018    fn from(value: generated::ark::v1::IndexerPageResponse) -> Self {
1019        IndexerPage {
1020            current: value.current,
1021            next: value.next,
1022            total: value.total,
1023        }
1024    }
1025}
1026
1027impl TryFrom<&generated::ark::v1::IndexerTxHistoryRecord> for history::Transaction {
1028    type Error = Error;
1029
1030    fn try_from(value: &generated::ark::v1::IndexerTxHistoryRecord) -> Result<Self, Self::Error> {
1031        let sign = match value.r#type() {
1032            generated::ark::v1::IndexerTxType::Received => 1,
1033            // Default to sent if unspecified.
1034            generated::ark::v1::IndexerTxType::Sent
1035            | generated::ark::v1::IndexerTxType::Unspecified => -1,
1036        };
1037
1038        let amount = SignedAmount::from_sat(value.amount as i64 * sign);
1039
1040        let tx = match &value.key {
1041            Some(Key::CommitmentTxid(txid)) => history::Transaction::Commitment {
1042                txid: txid.parse().map_err(Error::conversion)?,
1043                amount,
1044                created_at: value.created_at,
1045            },
1046            Some(Key::VirtualTxid(txid)) => history::Transaction::Ark {
1047                txid: txid.parse().map_err(Error::conversion)?,
1048                amount,
1049                is_settled: value.is_settled,
1050                created_at: value.created_at,
1051            },
1052            None => return Err(Error::conversion("invalid transaction without key")),
1053        };
1054
1055        Ok(tx)
1056    }
1057}
1058
1059impl TryFrom<generated::ark::v1::GetSubscriptionResponse> for SubscriptionResponse {
1060    type Error = Error;
1061
1062    fn try_from(value: generated::ark::v1::GetSubscriptionResponse) -> Result<Self, Self::Error> {
1063        let value = match value.data {
1064            Some(get_subscription_response::Data::Heartbeat(_)) => return Ok(Self::Heartbeat),
1065            Some(get_subscription_response::Data::Event(event)) => event,
1066            None => return Err(Error::conversion("empty subscription response")),
1067        };
1068
1069        let txid = value.txid.parse().map_err(Error::conversion)?;
1070
1071        let new_vtxos = value
1072            .new_vtxos
1073            .iter()
1074            .map(VirtualTxOutPoint::try_from)
1075            .collect::<Result<Vec<_>, _>>()?;
1076
1077        let spent_vtxos = value
1078            .spent_vtxos
1079            .iter()
1080            .map(VirtualTxOutPoint::try_from)
1081            .collect::<Result<Vec<_>, _>>()?;
1082
1083        let tx = if value.tx.is_empty() {
1084            None
1085        } else {
1086            let base64 = base64::engine::GeneralPurpose::new(
1087                &base64::alphabet::STANDARD,
1088                base64::engine::GeneralPurposeConfig::new(),
1089            );
1090            let bytes = base64.decode(&value.tx).map_err(Error::conversion)?;
1091            Some(Psbt::deserialize(&bytes).map_err(Error::conversion)?)
1092        };
1093
1094        let checkpoint_txs = value
1095            .checkpoint_txs
1096            .into_iter()
1097            .map(|(k, v)| {
1098                let out_point = OutPoint::from_str(k.as_str()).map_err(Error::conversion)?;
1099                let txid = v.txid.parse().map_err(Error::conversion)?;
1100                Ok((out_point, txid))
1101            })
1102            .collect::<Result<HashMap<_, _>, Error>>()?;
1103
1104        let scripts = value
1105            .scripts
1106            .iter()
1107            .map(|h| ScriptBuf::from_hex(h).map_err(Error::conversion))
1108            .collect::<Result<Vec<_>, _>>()?;
1109
1110        Ok(Self::Event(Box::new(SubscriptionEvent {
1111            txid,
1112            scripts,
1113            new_vtxos,
1114            spent_vtxos,
1115            tx,
1116            checkpoint_txs,
1117        })))
1118    }
1119}
1120
1121impl From<GetVtxosRequest> for generated::ark::v1::GetVtxosRequest {
1122    fn from(value: GetVtxosRequest) -> Self {
1123        let (spendable_only, spent_only, recoverable_only, pending_only) = match value.filter() {
1124            Some(GetVtxosRequestFilter::Spendable) => (true, false, false, false),
1125            Some(GetVtxosRequestFilter::Spent) => (false, true, false, false),
1126            Some(GetVtxosRequestFilter::Recoverable) => (false, false, true, false),
1127            Some(GetVtxosRequestFilter::PendingOnly) => (false, false, false, true),
1128            None => (false, false, false, false),
1129        };
1130
1131        let page = value
1132            .page()
1133            .map(|p| generated::ark::v1::IndexerPageRequest {
1134                size: p.size,
1135                index: p.index,
1136            });
1137
1138        match value.reference() {
1139            GetVtxosRequestReference::Scripts(script_bufs) => Self {
1140                scripts: script_bufs.iter().map(|s| s.to_hex_string()).collect(),
1141                outpoints: Vec::new(),
1142                spendable_only,
1143                spent_only,
1144                recoverable_only,
1145                page,
1146                pending_only,
1147            },
1148            GetVtxosRequestReference::OutPoints(outpoints) => Self {
1149                scripts: Vec::new(),
1150                outpoints: outpoints.iter().map(|o| o.to_string()).collect(),
1151                spendable_only,
1152                spent_only,
1153                recoverable_only,
1154                page,
1155                pending_only,
1156            },
1157        }
1158    }
1159}