Skip to main content

ark_rest/
client.rs

1use crate::apis;
2use crate::apis::ark_service_api::ark_service_confirm_registration;
3use crate::apis::ark_service_api::ark_service_delete_intent;
4use crate::apis::ark_service_api::ark_service_finalize_tx;
5use crate::apis::ark_service_api::ark_service_get_info;
6use crate::apis::ark_service_api::ark_service_register_intent;
7use crate::apis::ark_service_api::ark_service_submit_signed_forfeit_txs;
8use crate::apis::ark_service_api::ark_service_submit_tree_nonces;
9use crate::apis::ark_service_api::ark_service_submit_tree_signatures;
10use crate::apis::ark_service_api::ark_service_submit_tx;
11use crate::apis::indexer_service_api::indexer_service_get_virtual_txs;
12use crate::apis::indexer_service_api::indexer_service_get_vtxos;
13use crate::apis::indexer_service_api::indexer_service_subscribe_for_scripts;
14use crate::apis::indexer_service_api::indexer_service_unsubscribe_for_scripts;
15use crate::models;
16use crate::models::ConfirmRegistrationRequest;
17use crate::models::Intent;
18use crate::models::SubmitSignedForfeitTxsRequest;
19use crate::models::SubmitTreeNoncesRequest;
20use crate::models::SubmitTreeSignaturesRequest;
21use crate::models::SubscribeForScriptsRequest;
22use crate::models::UnsubscribeForScriptsRequest;
23use crate::Error;
24use ark_core::server::FinalizeOffchainTxResponse;
25use ark_core::server::GetVtxosRequest;
26use ark_core::server::GetVtxosRequestFilter;
27use ark_core::server::GetVtxosRequestReference;
28use ark_core::server::IndexerPage;
29use ark_core::server::NoncePks;
30use ark_core::server::PartialSigTree;
31use ark_core::server::StreamEvent;
32use ark_core::server::SubmitOffchainTxResponse;
33use ark_core::server::SubscriptionResponse;
34use ark_core::server::VirtualTxOutPoint;
35use ark_core::server::VirtualTxsResponse;
36use ark_core::ArkAddress;
37use bitcoin::base64;
38use bitcoin::base64::Engine;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Psbt;
41use bitcoin::Txid;
42use futures::stream;
43use futures::Stream;
44use futures::StreamExt;
45
46pub struct Client {
47    configuration: apis::configuration::Configuration,
48}
49
50pub struct ListVtxosResponse {
51    pub vtxos: Vec<VirtualTxOutPoint>,
52    pub page: Option<IndexerPage>,
53}
54
55impl Client {
56    pub fn new(ark_server_url: String) -> Self {
57        let configuration = apis::configuration::Configuration {
58            base_path: ark_server_url,
59            ..Default::default()
60        };
61
62        Self { configuration }
63    }
64
65    pub async fn get_info(&self) -> Result<ark_core::server::Info, Error> {
66        let info = ark_service_get_info(&self.configuration)
67            .await
68            .map_err(Error::request)?;
69
70        let info = info.try_into()?;
71
72        Ok(info)
73    }
74
75    pub async fn submit_offchain_transaction_request(
76        &self,
77        ark_tx: Psbt,
78        checkpoint_txs: Vec<Psbt>,
79    ) -> Result<SubmitOffchainTxResponse, Error> {
80        let base64 = base64::engine::GeneralPurpose::new(
81            &base64::alphabet::STANDARD,
82            base64::engine::GeneralPurposeConfig::new(),
83        );
84
85        let ark_tx = base64.encode(ark_tx.serialize());
86
87        let checkpoint_txs = checkpoint_txs
88            .into_iter()
89            .map(|tx| Some(base64.encode(tx.serialize())))
90            .collect();
91
92        let res = ark_service_submit_tx(
93            &self.configuration,
94            models::SubmitTxRequest {
95                signed_ark_tx: Some(ark_tx),
96                checkpoint_txs,
97            },
98        )
99        .await
100        .map_err(Error::request)?;
101
102        let signed_ark_tx = res.final_ark_tx;
103        let signed_ark_tx = signed_ark_tx.ok_or(Error::request("Signed ark tx not received"))?;
104
105        let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
106        let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
107
108        let signed_checkpoint_txs = res
109            .signed_checkpoint_txs
110            .ok_or(Error::request("Signed checkpoint tx not received"))?
111            .into_iter()
112            .map(|tx| {
113                let tx = base64.decode(tx).map_err(Error::conversion)?;
114                let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
115
116                Ok(tx)
117            })
118            .collect::<Result<Vec<_>, Error>>()?;
119
120        Ok(SubmitOffchainTxResponse {
121            signed_ark_tx,
122            signed_checkpoint_txs,
123        })
124    }
125
126    pub async fn finalize_offchain_transaction(
127        &self,
128        txid: Txid,
129        checkpoint_txs: Vec<Psbt>,
130    ) -> Result<FinalizeOffchainTxResponse, Error> {
131        let base64 = base64::engine::GeneralPurpose::new(
132            &base64::alphabet::STANDARD,
133            base64::engine::GeneralPurposeConfig::new(),
134        );
135
136        let checkpoint_txs = checkpoint_txs
137            .into_iter()
138            .map(|tx| Some(base64.encode(tx.serialize())))
139            .collect();
140
141        ark_service_finalize_tx(
142            &self.configuration,
143            models::FinalizeTxRequest {
144                ark_txid: Some(txid.to_string()),
145                final_checkpoint_txs: checkpoint_txs,
146            },
147        )
148        .await
149        .map_err(Error::request)?;
150
151        Ok(FinalizeOffchainTxResponse {})
152    }
153
154    pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
155        let reference = request.reference();
156
157        if reference.is_empty() {
158            return Ok(ListVtxosResponse {
159                vtxos: Vec::new(),
160                page: None,
161            });
162        }
163
164        let filter = request.filter();
165
166        let (scripts, outpoints) = match reference {
167            GetVtxosRequestReference::Scripts(s) => (
168                Some(s.iter().map(|s| s.to_hex_string()).clone().collect()),
169                None,
170            ),
171            GetVtxosRequestReference::OutPoints(o) => {
172                (None, Some(o.iter().map(|o| o.to_string()).collect()))
173            }
174        };
175        let (spendable_only, spent_only, recoverable_only, pending_only) = match filter {
176            None => (Some(false), Some(false), Some(false), Some(false)),
177            Some(filter) => match filter {
178                GetVtxosRequestFilter::Spendable => {
179                    (Some(true), Some(false), Some(false), Some(false))
180                }
181                GetVtxosRequestFilter::Spent => (Some(false), Some(true), Some(false), Some(false)),
182                GetVtxosRequestFilter::Recoverable => {
183                    (Some(false), Some(false), Some(true), Some(false))
184                }
185                GetVtxosRequestFilter::PendingOnly => {
186                    (Some(false), Some(false), Some(false), Some(true))
187                }
188            },
189        };
190
191        let page_period_size: Option<i32> = request.page().map(|p| p.size);
192        let page_period_index: Option<i32> = request.page().map(|p| p.index);
193
194        let before = request.before().map(|b| b as i64);
195        let after = request.after().map(|b| b as i64);
196
197        let response = indexer_service_get_vtxos(
198            &self.configuration,
199            scripts,
200            outpoints,
201            spendable_only,
202            spent_only,
203            recoverable_only,
204            pending_only,
205            before,
206            after,
207            page_period_size,
208            page_period_index,
209        )
210        .await
211        .map_err(Error::request)?;
212
213        let vtxos = response.vtxos.ok_or(Error::request("VTXOs not received"))?;
214        let vtxos = vtxos
215            .into_iter()
216            .map(VirtualTxOutPoint::try_from)
217            .collect::<Result<Vec<_>, crate::conversions::ConversionError>>()?;
218
219        let page = response.page.map(|p| IndexerPage {
220            current: p.current.unwrap_or_default(),
221            next: p.next.unwrap_or_default(),
222            total: p.total.unwrap_or_default(),
223        });
224
225        Ok(ListVtxosResponse { vtxos, page })
226    }
227
228    pub async fn register_intent(
229        &self,
230        intent_message: &ark_core::intent::IntentMessage,
231        proof: &Psbt,
232    ) -> Result<String, Error> {
233        let message = intent_message.encode().map_err(Error::conversion)?;
234        let base64 = base64::engine::GeneralPurpose::new(
235            &base64::alphabet::STANDARD,
236            base64::engine::GeneralPurposeConfig::new(),
237        );
238
239        let bytes = proof.serialize();
240
241        let proof = base64.encode(&bytes);
242
243        let response = ark_service_register_intent(
244            &self.configuration,
245            models::RegisterIntentRequest {
246                intent: Some(Intent {
247                    proof: Some(proof),
248                    message: Some(message),
249                }),
250            },
251        )
252        .await
253        .map_err(Error::request)?;
254        let intent_id = response
255            .intent_id
256            .ok_or(Error::request("Could not get intent id"))?;
257
258        Ok(intent_id)
259    }
260
261    pub async fn delete_intent(
262        &self,
263        intent_message: &ark_core::intent::IntentMessage,
264        proof: &Psbt,
265    ) -> Result<(), Error> {
266        let message = intent_message.encode().map_err(Error::conversion)?;
267        let base64 = base64::engine::GeneralPurpose::new(
268            &base64::alphabet::STANDARD,
269            base64::engine::GeneralPurposeConfig::new(),
270        );
271
272        let bytes = proof.serialize();
273
274        let proof = base64.encode(&bytes);
275        ark_service_delete_intent(
276            &self.configuration,
277            models::DeleteIntentRequest {
278                intent: Some(Intent {
279                    proof: Some(proof),
280                    message: Some(message),
281                }),
282            },
283        )
284        .await
285        .map_err(Error::request)?;
286
287        Ok(())
288    }
289
290    pub async fn get_event_stream(
291        &self,
292        topics: Vec<String>,
293    ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
294        // Build the URL with query parameters
295        let mut url = format!("{}/v1/batch/events", self.configuration.base_path);
296        if !topics.is_empty() {
297            let query_params: Vec<String> = topics
298                .iter()
299                .map(|topic| format!("topics={}", urlencoding::encode(topic)))
300                .collect();
301            url = format!("{}?{}", url, query_params.join("&"));
302        }
303
304        // Create the request for SSE
305        let client = &self.configuration.client;
306        let request = client
307            .get(&url)
308            .header("Accept", "text/event-stream")
309            .send()
310            .await
311            .map_err(Error::request)?;
312
313        // Check if the request was successful
314        if !request.status().is_success() {
315            return Err(Error::request(format!(
316                "Event stream request failed with status: {}",
317                request.status()
318            )));
319        }
320
321        // Convert the response into a byte stream using async chunks
322        let byte_stream = request.bytes_stream();
323
324        // Create the SSE event stream
325        let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
326            loop {
327                match byte_stream.next().await {
328                    Some(chunk_result) => {
329                        let result = match chunk_result {
330                            Ok(bytes) => {
331                                let event = String::from_utf8(bytes.to_vec());
332                                match event {
333                                    Ok(event) => {
334                                        let event = event.trim();
335                                        // Skip empty lines and SSE comments
336                                        if event.is_empty() || event.starts_with(':') {
337                                            continue;
338                                        }
339                                        // Strip SSE `data: ` prefix
340                                        let event = event.strip_prefix("data: ").unwrap_or(event);
341                                        if let Ok(response) =
342                                            serde_json::from_str::<models::GetEventStreamResponse>(
343                                                event,
344                                            )
345                                        {
346                                            match StreamEvent::try_from(response) {
347                                                Ok(stream_event) => Ok(stream_event),
348                                                Err(e) => Err(Error::conversion(e)),
349                                            }
350                                        } else {
351                                            // Handle parse error
352                                            Err(Error::conversion("Failed to parse JSON"))
353                                        }
354                                    }
355                                    Err(error) => Err(Error::conversion(error)),
356                                }
357                            }
358                            Err(e) => Err(Error::request(e)),
359                        };
360                        return Some((result, byte_stream));
361                    }
362                    None => return None,
363                }
364            }
365        });
366
367        Ok(Box::pin(stream))
368    }
369    pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
370        ark_service_confirm_registration(
371            &self.configuration,
372            ConfirmRegistrationRequest {
373                intent_id: Some(intent_id),
374            },
375        )
376        .await
377        .map_err(Error::request)?;
378
379        Ok(())
380    }
381
382    pub async fn submit_tree_nonces(
383        &self,
384        batch_id: &str,
385        cosigner_pubkey: PublicKey,
386        pub_nonce_tree: NoncePks,
387    ) -> Result<(), Error> {
388        let tree_nonces = pub_nonce_tree.encode();
389
390        ark_service_submit_tree_nonces(
391            &self.configuration,
392            SubmitTreeNoncesRequest {
393                batch_id: Some(batch_id.to_string()),
394                pubkey: Some(cosigner_pubkey.to_string()),
395                tree_nonces: Some(tree_nonces),
396            },
397        )
398        .await
399        .map_err(Error::request)?;
400
401        Ok(())
402    }
403
404    pub async fn submit_tree_signatures(
405        &self,
406        batch_id: &str,
407        cosigner_pk: PublicKey,
408        partial_sig_tree: PartialSigTree,
409    ) -> Result<(), Error> {
410        let tree_signatures = partial_sig_tree.encode();
411
412        ark_service_submit_tree_signatures(
413            &self.configuration,
414            SubmitTreeSignaturesRequest {
415                batch_id: Some(batch_id.to_string()),
416                pubkey: Some(cosigner_pk.to_string()),
417                tree_signatures: Some(tree_signatures),
418            },
419        )
420        .await
421        .map_err(Error::request)?;
422
423        Ok(())
424    }
425
426    pub async fn submit_signed_forfeit_txs(
427        &self,
428        signed_forfeit_txs: Vec<Psbt>,
429        signed_commitment_tx: Option<Psbt>,
430    ) -> Result<(), Error> {
431        let base64 = base64::engine::GeneralPurpose::new(
432            &base64::alphabet::STANDARD,
433            base64::engine::GeneralPurposeConfig::new(),
434        );
435
436        let signed_commitment_tx = signed_commitment_tx
437            .map(|tx| base64.encode(tx.serialize()))
438            .unwrap_or_default();
439
440        ark_service_submit_signed_forfeit_txs(
441            &self.configuration,
442            SubmitSignedForfeitTxsRequest {
443                signed_forfeit_txs: signed_forfeit_txs
444                    .iter()
445                    .map(|psbt| Some(base64.encode(psbt.serialize())))
446                    .collect(),
447                signed_commitment_tx: Some(signed_commitment_tx),
448            },
449        )
450        .await
451        .map_err(Error::request)?;
452
453        Ok(())
454    }
455
456    /// Allows to subscribe for tx notifications related to the provided
457    /// vtxo scripts.
458    ///
459    /// It can also be used to update an existing subscriptions by adding
460    /// new scripts to it.
461    ///
462    /// Note: for new subscriptions, don't provide a `subscription_id`
463    ///
464    /// Returns the subscription id if successful
465    pub async fn subscribe_to_scripts(
466        &self,
467        scripts: Vec<ArkAddress>,
468        subscription_id: Option<String>,
469    ) -> Result<String, Error> {
470        let scripts = scripts
471            .iter()
472            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
473            .collect::<Vec<_>>();
474
475        // For new subscription we expect empty string ("") here
476        let subscription_id = subscription_id.unwrap_or_default();
477
478        let response = indexer_service_subscribe_for_scripts(
479            &self.configuration,
480            SubscribeForScriptsRequest {
481                scripts: Some(scripts),
482                subscription_id: Some(subscription_id),
483            },
484        )
485        .await
486        .map_err(Error::request)?;
487
488        let subscription_id = response
489            .subscription_id
490            .ok_or(Error::request("No subscription id"))?;
491
492        Ok(subscription_id)
493    }
494
495    /// Allows to remove scripts from an existing subscription.
496    pub async fn unsubscribe_from_scripts(
497        &self,
498        scripts: Vec<ArkAddress>,
499        subscription_id: String,
500    ) -> Result<(), Error> {
501        let scripts = scripts
502            .iter()
503            .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
504            .collect::<Vec<_>>();
505
506        let _ = indexer_service_unsubscribe_for_scripts(
507            &self.configuration,
508            UnsubscribeForScriptsRequest {
509                subscription_id: Some(subscription_id),
510                scripts: Some(scripts),
511            },
512        )
513        .await
514        .map_err(Error::request)?;
515
516        Ok(())
517    }
518
519    pub async fn get_subscription(
520        &self,
521        subscription_id: String,
522    ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
523        // Build the URL with subscription_id parameter
524        let url = format!(
525            "{}/v1/script/subscription/{subscription_id}",
526            self.configuration.base_path,
527        );
528
529        // Create the request for SSE
530        let client = &self.configuration.client;
531        let request = client
532            .get(&url)
533            .header("Accept", "text/event-stream")
534            .send()
535            .await
536            .map_err(Error::request)?;
537
538        // Check if the request was successful
539        if !request.status().is_success() {
540            return Err(Error::request(format!(
541                "Subscription stream request failed with status: {}",
542                request.status()
543            )));
544        }
545
546        // Convert the response into a byte stream using async chunks
547        let byte_stream = request.bytes_stream();
548
549        // Create the SSE event stream
550        let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
551            loop {
552                match byte_stream.next().await {
553                    Some(chunk_result) => {
554                        let result = match chunk_result {
555                            Ok(bytes) => {
556                                let event = String::from_utf8(bytes.to_vec());
557                                match event {
558                                    Ok(event) => {
559                                        let event = event.trim();
560                                        // Skip empty lines and SSE comments
561                                        if event.is_empty() || event.starts_with(':') {
562                                            continue;
563                                        }
564                                        // Strip SSE `data: ` prefix
565                                        let event = event.strip_prefix("data: ").unwrap_or(event);
566                                        if let Ok(response) =
567                                            serde_json::from_str::<models::GetSubscriptionResponse>(
568                                                event,
569                                            )
570                                        {
571                                            match SubscriptionResponse::try_from(response) {
572                                                Ok(subscription_response) => {
573                                                    Ok(subscription_response)
574                                                }
575                                                Err(e) => Err(Error::conversion(e)),
576                                            }
577                                        } else {
578                                            // Handle parse error
579                                            Err(Error::conversion("Failed to parse JSON"))
580                                        }
581                                    }
582                                    Err(error) => Err(Error::conversion(error)),
583                                }
584                            }
585                            Err(e) => Err(Error::request(e)),
586                        };
587                        return Some((result, byte_stream));
588                    }
589                    None => return None,
590                }
591            }
592        });
593
594        Ok(Box::pin(stream))
595    }
596
597    pub async fn get_virtual_txs(
598        &self,
599        txids: Vec<String>,
600        size_and_index: Option<(i32, i32)>,
601    ) -> Result<VirtualTxsResponse, Error> {
602        let (size, index) = size_and_index
603            .map(|(sz, indx)| (Some(sz), Some(indx)))
604            .unwrap_or_default();
605        let response = indexer_service_get_virtual_txs(&self.configuration, txids, size, index)
606            .await
607            .map_err(Error::request)?;
608
609        let base64 = &base64::engine::GeneralPurpose::new(
610            &base64::alphabet::STANDARD,
611            base64::engine::GeneralPurposeConfig::new(),
612        );
613
614        let txs = response
615            .txs
616            .unwrap_or_default()
617            .into_iter()
618            .map(|tx| {
619                let bytes = base64.decode(&tx).map_err(Error::conversion)?;
620                let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
621
622                Ok(psbt)
623            })
624            .collect::<Result<Vec<Psbt>, Error>>()?;
625
626        Ok(VirtualTxsResponse {
627            txs,
628            page: response.page.map(|a| IndexerPage {
629                current: a.current.unwrap_or_default(),
630                next: a.next.unwrap_or_default(),
631                total: a.total.unwrap_or_default(),
632            }),
633        })
634    }
635}