Skip to main content

ark_rest/
client.rs

1use crate::apis;
2use crate::apis::ark_service_api::ark_service_confirm_registration;
3use crate::apis::ark_service_api::ark_service_delete_intent;
4use crate::apis::ark_service_api::ark_service_finalize_tx;
5use crate::apis::ark_service_api::ark_service_get_info;
6use crate::apis::ark_service_api::ark_service_register_intent;
7use crate::apis::ark_service_api::ark_service_submit_signed_forfeit_txs;
8use crate::apis::ark_service_api::ark_service_submit_tree_nonces;
9use crate::apis::ark_service_api::ark_service_submit_tree_signatures;
10use crate::apis::ark_service_api::ark_service_submit_tx;
11use crate::apis::indexer_service_api::indexer_service_get_virtual_txs;
12use crate::apis::indexer_service_api::indexer_service_get_vtxos;
13use crate::apis::indexer_service_api::indexer_service_subscribe_for_scripts;
14use crate::apis::indexer_service_api::indexer_service_unsubscribe_for_scripts;
15use crate::models;
16use crate::models::ConfirmRegistrationRequest;
17use crate::models::Intent;
18use crate::models::SubmitSignedForfeitTxsRequest;
19use crate::models::SubmitTreeNoncesRequest;
20use crate::models::SubmitTreeSignaturesRequest;
21use crate::models::SubscribeForScriptsRequest;
22use crate::models::UnsubscribeForScriptsRequest;
23use crate::Error;
24use ark_core::server::FinalizeOffchainTxResponse;
25use ark_core::server::GetVtxosRequest;
26use ark_core::server::GetVtxosRequestFilter;
27use ark_core::server::GetVtxosRequestReference;
28use ark_core::server::IndexerPage;
29use ark_core::server::NoncePks;
30use ark_core::server::PartialSigTree;
31use ark_core::server::StreamEvent;
32use ark_core::server::SubmitOffchainTxResponse;
33use ark_core::server::SubscriptionResponse;
34use ark_core::server::VirtualTxOutPoint;
35use ark_core::server::VirtualTxsResponse;
36use ark_core::ArkAddress;
37use bitcoin::base64;
38use bitcoin::base64::Engine;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Psbt;
41use bitcoin::Txid;
42use futures::stream;
43use futures::Stream;
44use futures::StreamExt;
45
46pub struct Client {
47    configuration: apis::configuration::Configuration,
48}
49
50pub struct ListVtxosResponse {
51    pub vtxos: Vec<VirtualTxOutPoint>,
52    pub page: Option<IndexerPage>,
53}
54
55impl Client {
56    pub fn new(ark_server_url: String) -> Result<Self, Error> {
57        let mut default_headers = reqwest::header::HeaderMap::new();
58        default_headers.insert(
59            "X-Build-Version",
60            reqwest::header::HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
61        );
62        let client = reqwest::Client::builder()
63            .default_headers(default_headers)
64            .build()
65            .map_err(Error::request)?;
66
67        let configuration = apis::configuration::Configuration {
68            base_path: ark_server_url,
69            client,
70            ..Default::default()
71        };
72
73        Ok(Self { configuration })
74    }
75
76    pub async fn get_info(&self) -> Result<ark_core::server::Info, Error> {
77        let info = ark_service_get_info(&self.configuration)
78            .await
79            .map_err(Error::request)?;
80
81        let info = info.try_into()?;
82
83        Ok(info)
84    }
85
86    pub async fn submit_offchain_transaction_request(
87        &self,
88        ark_tx: Psbt,
89        checkpoint_txs: Vec<Psbt>,
90    ) -> Result<SubmitOffchainTxResponse, Error> {
91        let base64 = base64::engine::GeneralPurpose::new(
92            &base64::alphabet::STANDARD,
93            base64::engine::GeneralPurposeConfig::new(),
94        );
95
96        let ark_tx = base64.encode(ark_tx.serialize());
97
98        let checkpoint_txs = checkpoint_txs
99            .into_iter()
100            .map(|tx| Some(base64.encode(tx.serialize())))
101            .collect();
102
103        let res = ark_service_submit_tx(
104            &self.configuration,
105            models::SubmitTxRequest {
106                signed_ark_tx: Some(ark_tx),
107                checkpoint_txs,
108            },
109        )
110        .await
111        .map_err(Error::request)?;
112
113        let signed_ark_tx = res.final_ark_tx;
114        let signed_ark_tx = signed_ark_tx.ok_or(Error::request("Signed ark tx not received"))?;
115
116        let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
117        let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
118
119        let signed_checkpoint_txs = res
120            .signed_checkpoint_txs
121            .ok_or(Error::request("Signed checkpoint tx not received"))?
122            .into_iter()
123            .map(|tx| {
124                let tx = base64.decode(tx).map_err(Error::conversion)?;
125                let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
126
127                Ok(tx)
128            })
129            .collect::<Result<Vec<_>, Error>>()?;
130
131        Ok(SubmitOffchainTxResponse {
132            signed_ark_tx,
133            signed_checkpoint_txs,
134        })
135    }
136
137    pub async fn finalize_offchain_transaction(
138        &self,
139        txid: Txid,
140        checkpoint_txs: Vec<Psbt>,
141    ) -> Result<FinalizeOffchainTxResponse, Error> {
142        let base64 = base64::engine::GeneralPurpose::new(
143            &base64::alphabet::STANDARD,
144            base64::engine::GeneralPurposeConfig::new(),
145        );
146
147        let checkpoint_txs = checkpoint_txs
148            .into_iter()
149            .map(|tx| Some(base64.encode(tx.serialize())))
150            .collect();
151
152        ark_service_finalize_tx(
153            &self.configuration,
154            models::FinalizeTxRequest {
155                ark_txid: Some(txid.to_string()),
156                final_checkpoint_txs: checkpoint_txs,
157            },
158        )
159        .await
160        .map_err(Error::request)?;
161
162        Ok(FinalizeOffchainTxResponse {})
163    }
164
165    pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
166        let reference = request.reference();
167
168        if reference.is_empty() {
169            return Ok(ListVtxosResponse {
170                vtxos: Vec::new(),
171                page: None,
172            });
173        }
174
175        let filter = request.filter();
176
177        let (scripts, outpoints) = match reference {
178            GetVtxosRequestReference::Scripts(s) => (
179                Some(s.iter().map(|s| s.to_hex_string()).clone().collect()),
180                None,
181            ),
182            GetVtxosRequestReference::OutPoints(o) => {
183                (None, Some(o.iter().map(|o| o.to_string()).collect()))
184            }
185        };
186        let (spendable_only, spent_only, recoverable_only, pending_only) = match filter {
187            None => (Some(false), Some(false), Some(false), Some(false)),
188            Some(filter) => match filter {
189                GetVtxosRequestFilter::Spendable => {
190                    (Some(true), Some(false), Some(false), Some(false))
191                }
192                GetVtxosRequestFilter::Spent => (Some(false), Some(true), Some(false), Some(false)),
193                GetVtxosRequestFilter::Recoverable => {
194                    (Some(false), Some(false), Some(true), Some(false))
195                }
196                GetVtxosRequestFilter::PendingOnly => {
197                    (Some(false), Some(false), Some(false), Some(true))
198                }
199            },
200        };
201
202        let page_period_size: Option<i32> = request.page().map(|p| p.size);
203        let page_period_index: Option<i32> = request.page().map(|p| p.index);
204
205        let before = request.before().map(|b| b as i64);
206        let after = request.after().map(|b| b as i64);
207
208        let response = indexer_service_get_vtxos(
209            &self.configuration,
210            scripts,
211            outpoints,
212            spendable_only,
213            spent_only,
214            recoverable_only,
215            pending_only,
216            before,
217            after,
218            page_period_size,
219            page_period_index,
220        )
221        .await
222        .map_err(Error::request)?;
223
224        let vtxos = response.vtxos.ok_or(Error::request("VTXOs not received"))?;
225        let vtxos = vtxos
226            .into_iter()
227            .map(VirtualTxOutPoint::try_from)
228            .collect::<Result<Vec<_>, crate::conversions::ConversionError>>()?;
229
230        let page = response.page.map(|p| IndexerPage {
231            current: p.current.unwrap_or_default(),
232            next: p.next.unwrap_or_default(),
233            total: p.total.unwrap_or_default(),
234        });
235
236        Ok(ListVtxosResponse { vtxos, page })
237    }
238
239    pub async fn register_intent(
240        &self,
241        intent_message: &ark_core::intent::IntentMessage,
242        proof: &Psbt,
243    ) -> Result<String, Error> {
244        let message = intent_message.encode().map_err(Error::conversion)?;
245        let base64 = base64::engine::GeneralPurpose::new(
246            &base64::alphabet::STANDARD,
247            base64::engine::GeneralPurposeConfig::new(),
248        );
249
250        let bytes = proof.serialize();
251
252        let proof = base64.encode(&bytes);
253
254        let response = ark_service_register_intent(
255            &self.configuration,
256            models::RegisterIntentRequest {
257                intent: Some(Intent {
258                    proof: Some(proof),
259                    message: Some(message),
260                }),
261            },
262        )
263        .await
264        .map_err(Error::request)?;
265        let intent_id = response
266            .intent_id
267            .ok_or(Error::request("Could not get intent id"))?;
268
269        Ok(intent_id)
270    }
271
272    pub async fn delete_intent(
273        &self,
274        intent_message: &ark_core::intent::IntentMessage,
275        proof: &Psbt,
276    ) -> Result<(), Error> {
277        let message = intent_message.encode().map_err(Error::conversion)?;
278        let base64 = base64::engine::GeneralPurpose::new(
279            &base64::alphabet::STANDARD,
280            base64::engine::GeneralPurposeConfig::new(),
281        );
282
283        let bytes = proof.serialize();
284
285        let proof = base64.encode(&bytes);
286        ark_service_delete_intent(
287            &self.configuration,
288            models::DeleteIntentRequest {
289                intent: Some(Intent {
290                    proof: Some(proof),
291                    message: Some(message),
292                }),
293            },
294        )
295        .await
296        .map_err(Error::request)?;
297
298        Ok(())
299    }
300
301    pub async fn get_event_stream(
302        &self,
303        topics: Vec<String>,
304    ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
305        // Build the URL with query parameters
306        let mut url = format!("{}/v1/batch/events", self.configuration.base_path);
307        if !topics.is_empty() {
308            let query_params: Vec<String> = topics
309                .iter()
310                .map(|topic| format!("topics={}", urlencoding::encode(topic)))
311                .collect();
312            url = format!("{}?{}", url, query_params.join("&"));
313        }
314
315        // Create the request for SSE
316        let client = &self.configuration.client;
317        let request = client
318            .get(&url)
319            .header("Accept", "text/event-stream")
320            .send()
321            .await
322            .map_err(Error::request)?;
323
324        // Check if the request was successful
325        if !request.status().is_success() {
326            return Err(Error::request(format!(
327                "Event stream request failed with status: {}",
328                request.status()
329            )));
330        }
331
332        // Convert the response into a byte stream using async chunks
333        let byte_stream = request.bytes_stream();
334
335        // Create the SSE event stream
336        let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
337            loop {
338                match byte_stream.next().await {
339                    Some(chunk_result) => {
340                        let result = match chunk_result {
341                            Ok(bytes) => {
342                                let event = String::from_utf8(bytes.to_vec());
343                                match event {
344                                    Ok(event) => {
345                                        let event = event.trim();
346                                        // Skip empty lines and SSE comments
347                                        if event.is_empty() || event.starts_with(':') {
348                                            continue;
349                                        }
350                                        // Strip SSE `data: ` prefix
351                                        let event = event.strip_prefix("data: ").unwrap_or(event);
352                                        if let Ok(response) =
353                                            serde_json::from_str::<models::GetEventStreamResponse>(
354                                                event,
355                                            )
356                                        {
357                                            match StreamEvent::try_from(response) {
358                                                Ok(stream_event) => Ok(stream_event),
359                                                Err(e) => Err(Error::conversion(e)),
360                                            }
361                                        } else {
362                                            // Handle parse error
363                                            Err(Error::conversion("Failed to parse JSON"))
364                                        }
365                                    }
366                                    Err(error) => Err(Error::conversion(error)),
367                                }
368                            }
369                            Err(e) => Err(Error::request(e)),
370                        };
371                        return Some((result, byte_stream));
372                    }
373                    None => return None,
374                }
375            }
376        });
377
378        Ok(Box::pin(stream))
379    }
380    pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
381        ark_service_confirm_registration(
382            &self.configuration,
383            ConfirmRegistrationRequest {
384                intent_id: Some(intent_id),
385            },
386        )
387        .await
388        .map_err(Error::request)?;
389
390        Ok(())
391    }
392
393    pub async fn submit_tree_nonces(
394        &self,
395        batch_id: &str,
396        cosigner_pubkey: PublicKey,
397        pub_nonce_tree: NoncePks,
398    ) -> Result<(), Error> {
399        let tree_nonces = pub_nonce_tree.encode();
400
401        ark_service_submit_tree_nonces(
402            &self.configuration,
403            SubmitTreeNoncesRequest {
404                batch_id: Some(batch_id.to_string()),
405                pubkey: Some(cosigner_pubkey.to_string()),
406                tree_nonces: Some(tree_nonces),
407            },
408        )
409        .await
410        .map_err(Error::request)?;
411
412        Ok(())
413    }
414
415    pub async fn submit_tree_signatures(
416        &self,
417        batch_id: &str,
418        cosigner_pk: PublicKey,
419        partial_sig_tree: PartialSigTree,
420    ) -> Result<(), Error> {
421        let tree_signatures = partial_sig_tree.encode();
422
423        ark_service_submit_tree_signatures(
424            &self.configuration,
425            SubmitTreeSignaturesRequest {
426                batch_id: Some(batch_id.to_string()),
427                pubkey: Some(cosigner_pk.to_string()),
428                tree_signatures: Some(tree_signatures),
429            },
430        )
431        .await
432        .map_err(Error::request)?;
433
434        Ok(())
435    }
436
437    pub async fn submit_signed_forfeit_txs(
438        &self,
439        signed_forfeit_txs: Vec<Psbt>,
440        signed_commitment_tx: Option<Psbt>,
441    ) -> Result<(), Error> {
442        let base64 = base64::engine::GeneralPurpose::new(
443            &base64::alphabet::STANDARD,
444            base64::engine::GeneralPurposeConfig::new(),
445        );
446
447        let signed_commitment_tx = signed_commitment_tx
448            .map(|tx| base64.encode(tx.serialize()))
449            .unwrap_or_default();
450
451        ark_service_submit_signed_forfeit_txs(
452            &self.configuration,
453            SubmitSignedForfeitTxsRequest {
454                signed_forfeit_txs: signed_forfeit_txs
455                    .iter()
456                    .map(|psbt| Some(base64.encode(psbt.serialize())))
457                    .collect(),
458                signed_commitment_tx: Some(signed_commitment_tx),
459            },
460        )
461        .await
462        .map_err(Error::request)?;
463
464        Ok(())
465    }
466
467    /// Allows to subscribe for tx notifications related to the provided
468    /// vtxo scripts.
469    ///
470    /// It can also be used to update an existing subscriptions by adding
471    /// new scripts to it.
472    ///
473    /// Note: for new subscriptions, don't provide a `subscription_id`
474    ///
475    /// Returns the subscription id if successful
476    pub async fn subscribe_to_scripts(
477        &self,
478        scripts: Vec<ArkAddress>,
479        subscription_id: Option<String>,
480    ) -> Result<String, Error> {
481        let scripts = scripts
482            .iter()
483            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
484            .collect::<Vec<_>>();
485
486        // For new subscription we expect empty string ("") here
487        let subscription_id = subscription_id.unwrap_or_default();
488
489        let response = indexer_service_subscribe_for_scripts(
490            &self.configuration,
491            SubscribeForScriptsRequest {
492                scripts: Some(scripts),
493                subscription_id: Some(subscription_id),
494            },
495        )
496        .await
497        .map_err(Error::request)?;
498
499        let subscription_id = response
500            .subscription_id
501            .ok_or(Error::request("No subscription id"))?;
502
503        Ok(subscription_id)
504    }
505
506    /// Allows to remove scripts from an existing subscription.
507    pub async fn unsubscribe_from_scripts(
508        &self,
509        scripts: Vec<ArkAddress>,
510        subscription_id: String,
511    ) -> Result<(), Error> {
512        let scripts = scripts
513            .iter()
514            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
515            .collect::<Vec<_>>();
516
517        let _ = indexer_service_unsubscribe_for_scripts(
518            &self.configuration,
519            UnsubscribeForScriptsRequest {
520                subscription_id: Some(subscription_id),
521                scripts: Some(scripts),
522            },
523        )
524        .await
525        .map_err(Error::request)?;
526
527        Ok(())
528    }
529
530    pub async fn get_subscription(
531        &self,
532        subscription_id: String,
533    ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
534        // Build the URL with subscription_id parameter
535        let url = format!(
536            "{}/v1/script/subscription/{subscription_id}",
537            self.configuration.base_path,
538        );
539
540        // Create the request for SSE
541        let client = &self.configuration.client;
542        let request = client
543            .get(&url)
544            .header("Accept", "text/event-stream")
545            .send()
546            .await
547            .map_err(Error::request)?;
548
549        // Check if the request was successful
550        if !request.status().is_success() {
551            return Err(Error::request(format!(
552                "Subscription stream request failed with status: {}",
553                request.status()
554            )));
555        }
556
557        // Convert the response into a byte stream using async chunks
558        let byte_stream = request.bytes_stream();
559
560        // Create the SSE event stream
561        let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
562            loop {
563                match byte_stream.next().await {
564                    Some(chunk_result) => {
565                        let result = match chunk_result {
566                            Ok(bytes) => {
567                                let event = String::from_utf8(bytes.to_vec());
568                                match event {
569                                    Ok(event) => {
570                                        let event = event.trim();
571                                        // Skip empty lines and SSE comments
572                                        if event.is_empty() || event.starts_with(':') {
573                                            continue;
574                                        }
575                                        // Strip SSE `data: ` prefix
576                                        let event = event.strip_prefix("data: ").unwrap_or(event);
577                                        if let Ok(response) =
578                                            serde_json::from_str::<models::GetSubscriptionResponse>(
579                                                event,
580                                            )
581                                        {
582                                            match SubscriptionResponse::try_from(response) {
583                                                Ok(subscription_response) => {
584                                                    Ok(subscription_response)
585                                                }
586                                                Err(e) => Err(Error::conversion(e)),
587                                            }
588                                        } else {
589                                            // Handle parse error
590                                            Err(Error::conversion("Failed to parse JSON"))
591                                        }
592                                    }
593                                    Err(error) => Err(Error::conversion(error)),
594                                }
595                            }
596                            Err(e) => Err(Error::request(e)),
597                        };
598                        return Some((result, byte_stream));
599                    }
600                    None => return None,
601                }
602            }
603        });
604
605        Ok(Box::pin(stream))
606    }
607
608    pub async fn get_virtual_txs(
609        &self,
610        txids: Vec<String>,
611        size_and_index: Option<(i32, i32)>,
612    ) -> Result<VirtualTxsResponse, Error> {
613        let (size, index) = size_and_index
614            .map(|(sz, indx)| (Some(sz), Some(indx)))
615            .unwrap_or_default();
616        let response = indexer_service_get_virtual_txs(&self.configuration, txids, size, index)
617            .await
618            .map_err(Error::request)?;
619
620        let base64 = &base64::engine::GeneralPurpose::new(
621            &base64::alphabet::STANDARD,
622            base64::engine::GeneralPurposeConfig::new(),
623        );
624
625        let txs = response
626            .txs
627            .unwrap_or_default()
628            .into_iter()
629            .map(|tx| {
630                let bytes = base64.decode(&tx).map_err(Error::conversion)?;
631                let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
632
633                Ok(psbt)
634            })
635            .collect::<Result<Vec<Psbt>, Error>>()?;
636
637        Ok(VirtualTxsResponse {
638            txs,
639            page: response.page.map(|a| IndexerPage {
640                current: a.current.unwrap_or_default(),
641                next: a.next.unwrap_or_default(),
642                total: a.total.unwrap_or_default(),
643            }),
644        })
645    }
646}