1use crate::apis;
2use crate::apis::ark_service_api::ark_service_confirm_registration;
3use crate::apis::ark_service_api::ark_service_delete_intent;
4use crate::apis::ark_service_api::ark_service_finalize_tx;
5use crate::apis::ark_service_api::ark_service_get_info;
6use crate::apis::ark_service_api::ark_service_register_intent;
7use crate::apis::ark_service_api::ark_service_submit_signed_forfeit_txs;
8use crate::apis::ark_service_api::ark_service_submit_tree_nonces;
9use crate::apis::ark_service_api::ark_service_submit_tree_signatures;
10use crate::apis::ark_service_api::ark_service_submit_tx;
11use crate::apis::indexer_service_api::indexer_service_get_virtual_txs;
12use crate::apis::indexer_service_api::indexer_service_get_vtxos;
13use crate::apis::indexer_service_api::indexer_service_subscribe_for_scripts;
14use crate::apis::indexer_service_api::indexer_service_unsubscribe_for_scripts;
15use crate::models;
16use crate::models::ConfirmRegistrationRequest;
17use crate::models::Intent;
18use crate::models::SubmitSignedForfeitTxsRequest;
19use crate::models::SubmitTreeNoncesRequest;
20use crate::models::SubmitTreeSignaturesRequest;
21use crate::models::SubscribeForScriptsRequest;
22use crate::models::UnsubscribeForScriptsRequest;
23use crate::Error;
24use ark_core::server::FinalizeOffchainTxResponse;
25use ark_core::server::GetVtxosRequest;
26use ark_core::server::GetVtxosRequestFilter;
27use ark_core::server::GetVtxosRequestReference;
28use ark_core::server::IndexerPage;
29use ark_core::server::NoncePks;
30use ark_core::server::PartialSigTree;
31use ark_core::server::StreamEvent;
32use ark_core::server::SubmitOffchainTxResponse;
33use ark_core::server::SubscriptionResponse;
34use ark_core::server::VirtualTxOutPoint;
35use ark_core::server::VirtualTxsResponse;
36use ark_core::ArkAddress;
37use bitcoin::base64;
38use bitcoin::base64::Engine;
39use bitcoin::secp256k1::PublicKey;
40use bitcoin::Psbt;
41use bitcoin::Txid;
42use futures::stream;
43use futures::Stream;
44use futures::StreamExt;
45
/// REST client for an Ark server.
///
/// All requests are issued through the generated API bindings in [`apis`], using the
/// configuration assembled in [`Client::new`].
pub struct Client {
    /// Base path and HTTP client shared by every request made through this value.
    configuration: apis::configuration::Configuration,
}
49
/// Result of [`Client::list_vtxos`].
pub struct ListVtxosResponse {
    /// VTXOs returned by the indexer, converted into [`VirtualTxOutPoint`]s.
    pub vtxos: Vec<VirtualTxOutPoint>,
    /// Pagination information, present only if the indexer response included it.
    pub page: Option<IndexerPage>,
}
54
55impl Client {
56 pub fn new(ark_server_url: String) -> Result<Self, Error> {
57 let mut default_headers = reqwest::header::HeaderMap::new();
58 default_headers.insert(
59 "X-Build-Version",
60 reqwest::header::HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
61 );
62 let client = reqwest::Client::builder()
63 .default_headers(default_headers)
64 .build()
65 .map_err(Error::request)?;
66
67 let configuration = apis::configuration::Configuration {
68 base_path: ark_server_url,
69 client,
70 ..Default::default()
71 };
72
73 Ok(Self { configuration })
74 }
75
76 pub async fn get_info(&self) -> Result<ark_core::server::Info, Error> {
77 let info = ark_service_get_info(&self.configuration)
78 .await
79 .map_err(Error::request)?;
80
81 let info = info.try_into()?;
82
83 Ok(info)
84 }
85
86 pub async fn submit_offchain_transaction_request(
87 &self,
88 ark_tx: Psbt,
89 checkpoint_txs: Vec<Psbt>,
90 ) -> Result<SubmitOffchainTxResponse, Error> {
91 let base64 = base64::engine::GeneralPurpose::new(
92 &base64::alphabet::STANDARD,
93 base64::engine::GeneralPurposeConfig::new(),
94 );
95
96 let ark_tx = base64.encode(ark_tx.serialize());
97
98 let checkpoint_txs = checkpoint_txs
99 .into_iter()
100 .map(|tx| Some(base64.encode(tx.serialize())))
101 .collect();
102
103 let res = ark_service_submit_tx(
104 &self.configuration,
105 models::SubmitTxRequest {
106 signed_ark_tx: Some(ark_tx),
107 checkpoint_txs,
108 },
109 )
110 .await
111 .map_err(Error::request)?;
112
113 let signed_ark_tx = res.final_ark_tx;
114 let signed_ark_tx = signed_ark_tx.ok_or(Error::request("Signed ark tx not received"))?;
115
116 let signed_ark_tx = base64.decode(signed_ark_tx).map_err(Error::conversion)?;
117 let signed_ark_tx = Psbt::deserialize(&signed_ark_tx).map_err(Error::conversion)?;
118
119 let signed_checkpoint_txs = res
120 .signed_checkpoint_txs
121 .ok_or(Error::request("Signed checkpoint tx not received"))?
122 .into_iter()
123 .map(|tx| {
124 let tx = base64.decode(tx).map_err(Error::conversion)?;
125 let tx = Psbt::deserialize(&tx).map_err(Error::conversion)?;
126
127 Ok(tx)
128 })
129 .collect::<Result<Vec<_>, Error>>()?;
130
131 Ok(SubmitOffchainTxResponse {
132 signed_ark_tx,
133 signed_checkpoint_txs,
134 })
135 }
136
137 pub async fn finalize_offchain_transaction(
138 &self,
139 txid: Txid,
140 checkpoint_txs: Vec<Psbt>,
141 ) -> Result<FinalizeOffchainTxResponse, Error> {
142 let base64 = base64::engine::GeneralPurpose::new(
143 &base64::alphabet::STANDARD,
144 base64::engine::GeneralPurposeConfig::new(),
145 );
146
147 let checkpoint_txs = checkpoint_txs
148 .into_iter()
149 .map(|tx| Some(base64.encode(tx.serialize())))
150 .collect();
151
152 ark_service_finalize_tx(
153 &self.configuration,
154 models::FinalizeTxRequest {
155 ark_txid: Some(txid.to_string()),
156 final_checkpoint_txs: checkpoint_txs,
157 },
158 )
159 .await
160 .map_err(Error::request)?;
161
162 Ok(FinalizeOffchainTxResponse {})
163 }
164
165 pub async fn list_vtxos(&self, request: GetVtxosRequest) -> Result<ListVtxosResponse, Error> {
166 let reference = request.reference();
167
168 if reference.is_empty() {
169 return Ok(ListVtxosResponse {
170 vtxos: Vec::new(),
171 page: None,
172 });
173 }
174
175 let filter = request.filter();
176
177 let (scripts, outpoints) = match reference {
178 GetVtxosRequestReference::Scripts(s) => (
179 Some(s.iter().map(|s| s.to_hex_string()).clone().collect()),
180 None,
181 ),
182 GetVtxosRequestReference::OutPoints(o) => {
183 (None, Some(o.iter().map(|o| o.to_string()).collect()))
184 }
185 };
186 let (spendable_only, spent_only, recoverable_only, pending_only) = match filter {
187 None => (Some(false), Some(false), Some(false), Some(false)),
188 Some(filter) => match filter {
189 GetVtxosRequestFilter::Spendable => {
190 (Some(true), Some(false), Some(false), Some(false))
191 }
192 GetVtxosRequestFilter::Spent => (Some(false), Some(true), Some(false), Some(false)),
193 GetVtxosRequestFilter::Recoverable => {
194 (Some(false), Some(false), Some(true), Some(false))
195 }
196 GetVtxosRequestFilter::PendingOnly => {
197 (Some(false), Some(false), Some(false), Some(true))
198 }
199 },
200 };
201
202 let page_period_size: Option<i32> = request.page().map(|p| p.size);
203 let page_period_index: Option<i32> = request.page().map(|p| p.index);
204
205 let before = request.before().map(|b| b as i64);
206 let after = request.after().map(|b| b as i64);
207
208 let response = indexer_service_get_vtxos(
209 &self.configuration,
210 scripts,
211 outpoints,
212 spendable_only,
213 spent_only,
214 recoverable_only,
215 pending_only,
216 before,
217 after,
218 page_period_size,
219 page_period_index,
220 )
221 .await
222 .map_err(Error::request)?;
223
224 let vtxos = response.vtxos.ok_or(Error::request("VTXOs not received"))?;
225 let vtxos = vtxos
226 .into_iter()
227 .map(VirtualTxOutPoint::try_from)
228 .collect::<Result<Vec<_>, crate::conversions::ConversionError>>()?;
229
230 let page = response.page.map(|p| IndexerPage {
231 current: p.current.unwrap_or_default(),
232 next: p.next.unwrap_or_default(),
233 total: p.total.unwrap_or_default(),
234 });
235
236 Ok(ListVtxosResponse { vtxos, page })
237 }
238
239 pub async fn register_intent(
240 &self,
241 intent_message: &ark_core::intent::IntentMessage,
242 proof: &Psbt,
243 ) -> Result<String, Error> {
244 let message = intent_message.encode().map_err(Error::conversion)?;
245 let base64 = base64::engine::GeneralPurpose::new(
246 &base64::alphabet::STANDARD,
247 base64::engine::GeneralPurposeConfig::new(),
248 );
249
250 let bytes = proof.serialize();
251
252 let proof = base64.encode(&bytes);
253
254 let response = ark_service_register_intent(
255 &self.configuration,
256 models::RegisterIntentRequest {
257 intent: Some(Intent {
258 proof: Some(proof),
259 message: Some(message),
260 }),
261 },
262 )
263 .await
264 .map_err(Error::request)?;
265 let intent_id = response
266 .intent_id
267 .ok_or(Error::request("Could not get intent id"))?;
268
269 Ok(intent_id)
270 }
271
272 pub async fn delete_intent(
273 &self,
274 intent_message: &ark_core::intent::IntentMessage,
275 proof: &Psbt,
276 ) -> Result<(), Error> {
277 let message = intent_message.encode().map_err(Error::conversion)?;
278 let base64 = base64::engine::GeneralPurpose::new(
279 &base64::alphabet::STANDARD,
280 base64::engine::GeneralPurposeConfig::new(),
281 );
282
283 let bytes = proof.serialize();
284
285 let proof = base64.encode(&bytes);
286 ark_service_delete_intent(
287 &self.configuration,
288 models::DeleteIntentRequest {
289 intent: Some(Intent {
290 proof: Some(proof),
291 message: Some(message),
292 }),
293 },
294 )
295 .await
296 .map_err(Error::request)?;
297
298 Ok(())
299 }
300
301 pub async fn get_event_stream(
302 &self,
303 topics: Vec<String>,
304 ) -> Result<impl Stream<Item = Result<StreamEvent, Error>> + Unpin, Error> {
305 let mut url = format!("{}/v1/batch/events", self.configuration.base_path);
307 if !topics.is_empty() {
308 let query_params: Vec<String> = topics
309 .iter()
310 .map(|topic| format!("topics={}", urlencoding::encode(topic)))
311 .collect();
312 url = format!("{}?{}", url, query_params.join("&"));
313 }
314
315 let client = &self.configuration.client;
317 let request = client
318 .get(&url)
319 .header("Accept", "text/event-stream")
320 .send()
321 .await
322 .map_err(Error::request)?;
323
324 if !request.status().is_success() {
326 return Err(Error::request(format!(
327 "Event stream request failed with status: {}",
328 request.status()
329 )));
330 }
331
332 let byte_stream = request.bytes_stream();
334
335 let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
337 loop {
338 match byte_stream.next().await {
339 Some(chunk_result) => {
340 let result = match chunk_result {
341 Ok(bytes) => {
342 let event = String::from_utf8(bytes.to_vec());
343 match event {
344 Ok(event) => {
345 let event = event.trim();
346 if event.is_empty() || event.starts_with(':') {
348 continue;
349 }
350 let event = event.strip_prefix("data: ").unwrap_or(event);
352 if let Ok(response) =
353 serde_json::from_str::<models::GetEventStreamResponse>(
354 event,
355 )
356 {
357 match StreamEvent::try_from(response) {
358 Ok(stream_event) => Ok(stream_event),
359 Err(e) => Err(Error::conversion(e)),
360 }
361 } else {
362 Err(Error::conversion("Failed to parse JSON"))
364 }
365 }
366 Err(error) => Err(Error::conversion(error)),
367 }
368 }
369 Err(e) => Err(Error::request(e)),
370 };
371 return Some((result, byte_stream));
372 }
373 None => return None,
374 }
375 }
376 });
377
378 Ok(Box::pin(stream))
379 }
380 pub async fn confirm_registration(&self, intent_id: String) -> Result<(), Error> {
381 ark_service_confirm_registration(
382 &self.configuration,
383 ConfirmRegistrationRequest {
384 intent_id: Some(intent_id),
385 },
386 )
387 .await
388 .map_err(Error::request)?;
389
390 Ok(())
391 }
392
393 pub async fn submit_tree_nonces(
394 &self,
395 batch_id: &str,
396 cosigner_pubkey: PublicKey,
397 pub_nonce_tree: NoncePks,
398 ) -> Result<(), Error> {
399 let tree_nonces = pub_nonce_tree.encode();
400
401 ark_service_submit_tree_nonces(
402 &self.configuration,
403 SubmitTreeNoncesRequest {
404 batch_id: Some(batch_id.to_string()),
405 pubkey: Some(cosigner_pubkey.to_string()),
406 tree_nonces: Some(tree_nonces),
407 },
408 )
409 .await
410 .map_err(Error::request)?;
411
412 Ok(())
413 }
414
415 pub async fn submit_tree_signatures(
416 &self,
417 batch_id: &str,
418 cosigner_pk: PublicKey,
419 partial_sig_tree: PartialSigTree,
420 ) -> Result<(), Error> {
421 let tree_signatures = partial_sig_tree.encode();
422
423 ark_service_submit_tree_signatures(
424 &self.configuration,
425 SubmitTreeSignaturesRequest {
426 batch_id: Some(batch_id.to_string()),
427 pubkey: Some(cosigner_pk.to_string()),
428 tree_signatures: Some(tree_signatures),
429 },
430 )
431 .await
432 .map_err(Error::request)?;
433
434 Ok(())
435 }
436
437 pub async fn submit_signed_forfeit_txs(
438 &self,
439 signed_forfeit_txs: Vec<Psbt>,
440 signed_commitment_tx: Option<Psbt>,
441 ) -> Result<(), Error> {
442 let base64 = base64::engine::GeneralPurpose::new(
443 &base64::alphabet::STANDARD,
444 base64::engine::GeneralPurposeConfig::new(),
445 );
446
447 let signed_commitment_tx = signed_commitment_tx
448 .map(|tx| base64.encode(tx.serialize()))
449 .unwrap_or_default();
450
451 ark_service_submit_signed_forfeit_txs(
452 &self.configuration,
453 SubmitSignedForfeitTxsRequest {
454 signed_forfeit_txs: signed_forfeit_txs
455 .iter()
456 .map(|psbt| Some(base64.encode(psbt.serialize())))
457 .collect(),
458 signed_commitment_tx: Some(signed_commitment_tx),
459 },
460 )
461 .await
462 .map_err(Error::request)?;
463
464 Ok(())
465 }
466
467 pub async fn subscribe_to_scripts(
477 &self,
478 scripts: Vec<ArkAddress>,
479 subscription_id: Option<String>,
480 ) -> Result<String, Error> {
481 let scripts = scripts
482 .iter()
483 .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
484 .collect::<Vec<_>>();
485
486 let subscription_id = subscription_id.unwrap_or_default();
488
489 let response = indexer_service_subscribe_for_scripts(
490 &self.configuration,
491 SubscribeForScriptsRequest {
492 scripts: Some(scripts),
493 subscription_id: Some(subscription_id),
494 },
495 )
496 .await
497 .map_err(Error::request)?;
498
499 let subscription_id = response
500 .subscription_id
501 .ok_or(Error::request("No subscription id"))?;
502
503 Ok(subscription_id)
504 }
505
506 pub async fn unsubscribe_from_scripts(
508 &self,
509 scripts: Vec<ArkAddress>,
510 subscription_id: String,
511 ) -> Result<(), Error> {
512 let scripts = scripts
513 .iter()
514 .map(|address| address.to_p2tr_script_pubkey().to_hex_string())
515 .collect::<Vec<_>>();
516
517 let _ = indexer_service_unsubscribe_for_scripts(
518 &self.configuration,
519 UnsubscribeForScriptsRequest {
520 subscription_id: Some(subscription_id),
521 scripts: Some(scripts),
522 },
523 )
524 .await
525 .map_err(Error::request)?;
526
527 Ok(())
528 }
529
530 pub async fn get_subscription(
531 &self,
532 subscription_id: String,
533 ) -> Result<impl Stream<Item = Result<SubscriptionResponse, Error>> + Unpin, Error> {
534 let url = format!(
536 "{}/v1/script/subscription/{subscription_id}",
537 self.configuration.base_path,
538 );
539
540 let client = &self.configuration.client;
542 let request = client
543 .get(&url)
544 .header("Accept", "text/event-stream")
545 .send()
546 .await
547 .map_err(Error::request)?;
548
549 if !request.status().is_success() {
551 return Err(Error::request(format!(
552 "Subscription stream request failed with status: {}",
553 request.status()
554 )));
555 }
556
557 let byte_stream = request.bytes_stream();
559
560 let stream = stream::unfold(byte_stream, |mut byte_stream| async move {
562 loop {
563 match byte_stream.next().await {
564 Some(chunk_result) => {
565 let result = match chunk_result {
566 Ok(bytes) => {
567 let event = String::from_utf8(bytes.to_vec());
568 match event {
569 Ok(event) => {
570 let event = event.trim();
571 if event.is_empty() || event.starts_with(':') {
573 continue;
574 }
575 let event = event.strip_prefix("data: ").unwrap_or(event);
577 if let Ok(response) =
578 serde_json::from_str::<models::GetSubscriptionResponse>(
579 event,
580 )
581 {
582 match SubscriptionResponse::try_from(response) {
583 Ok(subscription_response) => {
584 Ok(subscription_response)
585 }
586 Err(e) => Err(Error::conversion(e)),
587 }
588 } else {
589 Err(Error::conversion("Failed to parse JSON"))
591 }
592 }
593 Err(error) => Err(Error::conversion(error)),
594 }
595 }
596 Err(e) => Err(Error::request(e)),
597 };
598 return Some((result, byte_stream));
599 }
600 None => return None,
601 }
602 }
603 });
604
605 Ok(Box::pin(stream))
606 }
607
608 pub async fn get_virtual_txs(
609 &self,
610 txids: Vec<String>,
611 size_and_index: Option<(i32, i32)>,
612 ) -> Result<VirtualTxsResponse, Error> {
613 let (size, index) = size_and_index
614 .map(|(sz, indx)| (Some(sz), Some(indx)))
615 .unwrap_or_default();
616 let response = indexer_service_get_virtual_txs(&self.configuration, txids, size, index)
617 .await
618 .map_err(Error::request)?;
619
620 let base64 = &base64::engine::GeneralPurpose::new(
621 &base64::alphabet::STANDARD,
622 base64::engine::GeneralPurposeConfig::new(),
623 );
624
625 let txs = response
626 .txs
627 .unwrap_or_default()
628 .into_iter()
629 .map(|tx| {
630 let bytes = base64.decode(&tx).map_err(Error::conversion)?;
631 let psbt = Psbt::deserialize(&bytes).map_err(Error::conversion)?;
632
633 Ok(psbt)
634 })
635 .collect::<Result<Vec<Psbt>, Error>>()?;
636
637 Ok(VirtualTxsResponse {
638 txs,
639 page: response.page.map(|a| IndexerPage {
640 current: a.current.unwrap_or_default(),
641 next: a.next.unwrap_or_default(),
642 total: a.total.unwrap_or_default(),
643 }),
644 })
645 }
646}