1use std::{fmt::Debug, time::Duration};
2
3use async_trait::async_trait;
4use bs58;
5use photon_api::{
6 apis::configuration::{ApiKey, Configuration},
7 models::GetCompressedAccountsByOwnerPostRequestParams,
8};
9use solana_pubkey::Pubkey;
10use tracing::{error, trace, warn};
11
12use super::{
13 types::{
14 CompressedAccount, CompressedTokenAccount, OwnerBalance, QueueElementsResult,
15 SignatureWithMetadata, TokenBalance,
16 },
17 BatchAddressUpdateIndexerResponse,
18};
19use crate::indexer::{
20 base58::Base58Conversions,
21 config::RetryConfig,
22 response::{Context, Items, ItemsWithCursor, Response},
23 Address, AddressWithTree, GetCompressedAccountsByOwnerConfig,
24 GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError,
25 IndexerRpcConfig, MerkleProof, NewAddressProofWithContext, PaginatedOptions,
26};
27
28pub struct PhotonIndexer {
30 configuration: Configuration,
31}
32
33impl PhotonIndexer {
34 pub fn default_path() -> String {
35 "http://127.0.0.1:8784".to_string()
36 }
37}
38
39impl PhotonIndexer {
40 async fn retry<F, Fut, T>(
41 &self,
42 config: RetryConfig,
43 mut operation: F,
44 ) -> Result<T, IndexerError>
45 where
46 F: FnMut() -> Fut,
47 Fut: std::future::Future<Output = Result<T, IndexerError>>,
48 {
49 let max_retries = config.num_retries;
50 let mut attempts = 0;
51 let mut delay_ms = config.delay_ms;
52 let max_delay_ms = config.max_delay_ms;
53
54 loop {
55 attempts += 1;
56
57 trace!(
58 "Attempt {}/{}: No rate limiter configured",
59 attempts,
60 max_retries
61 );
62
63 trace!("Attempt {}/{}: Executing operation", attempts, max_retries);
64 let result = operation().await;
65
66 match result {
67 Ok(value) => {
68 trace!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
69 return Ok(value);
70 }
71 Err(e) => {
72 let is_retryable = match &e {
73 IndexerError::ApiError(_) => {
74 warn!("API Error: {}", e);
75 true
76 }
77 IndexerError::PhotonError {
78 context: _,
79 message: _,
80 } => {
81 warn!("Operation failed, checking if retryable...");
82 true
83 }
84 IndexerError::IndexerNotSyncedToSlot => true,
85 IndexerError::Base58DecodeError { .. } => false,
86 IndexerError::AccountNotFound => false,
87 IndexerError::InvalidParameters(_) => false,
88 IndexerError::NotImplemented(_) => false,
89 _ => false,
90 };
91
92 if is_retryable && attempts < max_retries {
93 warn!(
94 "Attempt {}/{}: Operation failed. Retrying",
95 attempts, max_retries
96 );
97
98 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
99 delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
100 } else {
101 if is_retryable {
102 error!("Operation failed after max retries.");
103 } else {
104 error!("Operation failed with non-retryable error.");
105 }
106 return Err(e);
107 }
108 }
109 }
110 }
111 }
112}
113
114impl PhotonIndexer {
115 pub fn new(path: String, api_key: Option<String>) -> Self {
116 let configuration = Configuration {
117 base_path: path,
118 api_key: api_key.map(|key| ApiKey {
119 prefix: Some("api-key".to_string()),
120 key,
121 }),
122 ..Default::default()
123 };
124
125 PhotonIndexer { configuration }
126 }
127
128 pub fn new_with_config(configuration: Configuration) -> Self {
129 PhotonIndexer { configuration }
130 }
131
132 fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
133 result.ok_or_else(|| IndexerError::missing_result(context, "value not present"))
134 }
135
136 fn extract_result_with_error_check<T>(
137 context: &str,
138 error: Option<Box<photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError>>,
139 result: Option<T>,
140 ) -> Result<T, IndexerError> {
141 if let Some(error) = error {
142 let error_message = error
143 .clone()
144 .message
145 .unwrap_or_else(|| format!("Unknown API error: {:?}", error).to_string());
146 return Err(IndexerError::ApiError(format!(
147 "API error in {} (code: {:?}): {}",
148 context, error.code, error_message
149 )));
150 }
151
152 Self::extract_result(context, result)
153 }
154
155 fn build_account_params(
156 &self,
157 address: Option<Address>,
158 hash: Option<Hash>,
159 ) -> Result<photon_api::models::GetCompressedAccountPostRequestParams, IndexerError> {
160 match (address, hash) {
161 (None, None) => Err(IndexerError::InvalidParameters(
162 "Either address or hash must be provided".to_string(),
163 )),
164 (Some(_), Some(_)) => Err(IndexerError::InvalidParameters(
165 "Only one of address or hash must be provided".to_string(),
166 )),
167 (address, hash) => Ok(photon_api::models::GetCompressedAccountPostRequestParams {
168 address: address.map(|x| x.to_base58()),
169 hash: hash.map(|x| x.to_base58()),
170 }),
171 }
172 }
173}
174
175impl Debug for PhotonIndexer {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("PhotonIndexer")
178 .field("configuration", &self.configuration)
179 .finish()
180 }
181}
182
183#[async_trait]
184impl Indexer for PhotonIndexer {
185 async fn get_compressed_account(
186 &self,
187 address: Address,
188 config: Option<IndexerRpcConfig>,
189 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
190 let config = config.unwrap_or_default();
191 self.retry(config.retry_config, || async {
192 let params = self.build_account_params(Some(address), None)?;
193 let request = photon_api::models::GetCompressedAccountPostRequest {
194 params: Box::new(params),
195 ..Default::default()
196 };
197
198 let result = photon_api::apis::default_api::get_compressed_account_post(
199 &self.configuration,
200 request,
201 )
202 .await?;
203 let api_response = Self::extract_result_with_error_check(
204 "get_compressed_account",
205 result.error,
206 result.result.map(|r| *r),
207 )?;
208 if api_response.context.slot < config.slot {
209 return Err(IndexerError::IndexerNotSyncedToSlot);
210 }
211 let account = match api_response.value {
212 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
213 None => None,
214 };
215
216 Ok(Response {
217 context: Context {
218 slot: api_response.context.slot,
219 },
220 value: account,
221 })
222 })
223 .await
224 }
225
226 async fn get_compressed_account_by_hash(
227 &self,
228 hash: Hash,
229 config: Option<IndexerRpcConfig>,
230 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
231 let config = config.unwrap_or_default();
232 self.retry(config.retry_config, || async {
233 let params = self.build_account_params(None, Some(hash))?;
234 let request = photon_api::models::GetCompressedAccountPostRequest {
235 params: Box::new(params),
236 ..Default::default()
237 };
238
239 let result = photon_api::apis::default_api::get_compressed_account_post(
240 &self.configuration,
241 request,
242 )
243 .await?;
244 let api_response = Self::extract_result_with_error_check(
245 "get_compressed_account_by_hash",
246 result.error,
247 result.result.map(|r| *r),
248 )?;
249 if api_response.context.slot < config.slot {
250 return Err(IndexerError::IndexerNotSyncedToSlot);
251 }
252 let account = match api_response.value {
253 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
254 None => None,
255 };
256
257 Ok(Response {
258 context: Context {
259 slot: api_response.context.slot,
260 },
261 value: account,
262 })
263 })
264 .await
265 }
266
267 async fn get_compressed_accounts_by_owner(
268 &self,
269 owner: &Pubkey,
270 options: Option<GetCompressedAccountsByOwnerConfig>,
271 config: Option<IndexerRpcConfig>,
272 ) -> Result<Response<ItemsWithCursor<CompressedAccount>>, IndexerError> {
273 let config = config.unwrap_or_default();
274 self.retry(config.retry_config, || async {
275 #[cfg(feature = "v2")]
276 {
277 let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
278 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
279 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
280 data_slice: options.as_ref().and_then(|o| {
281 o.data_slice.as_ref().map(|ds| {
282 Box::new(photon_api::models::DataSlice {
283 length: ds.length as u32,
284 offset: ds.offset as u32,
285 })
286 })
287 }),
288 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
289 limit: options.as_ref().and_then(|o| o.limit),
290 owner: owner.to_string(),
291 }),
292 ..Default::default()
293 };
294 let result =
295 photon_api::apis::default_api::get_compressed_accounts_by_owner_v2_post(
296 &self.configuration,
297 request,
298 )
299 .await?;
300 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
301 if response.context.slot < config.slot {
302 return Err(IndexerError::IndexerNotSyncedToSlot);
303 }
304 let accounts: Result<Vec<_>, _> = response
305 .value
306 .items
307 .iter()
308 .map(CompressedAccount::try_from)
309 .collect();
310
311 let cursor = response.value.cursor;
312
313 Ok(Response {
314 context: Context {
315 slot: response.context.slot,
316 },
317 value: ItemsWithCursor {
318 items: accounts?,
319 cursor,
320 },
321 })
322 }
323 #[cfg(not(feature = "v2"))]
324 {
325 let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
326 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
327 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
328 data_slice: options.as_ref().and_then(|o| {
329 o.data_slice.as_ref().map(|ds| {
330 Box::new(photon_api::models::DataSlice {
331 length: ds.length as u32,
332 offset: ds.offset as u32,
333 })
334 })
335 }),
336 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
337 limit: options.as_ref().and_then(|o| o.limit),
338 owner: owner.to_string(),
339 }),
340 ..Default::default()
341 };
342 let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
343 &self.configuration,
344 request,
345 )
346 .await?;
347 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
348 if response.context.slot < config.slot {
349 return Err(IndexerError::IndexerNotSyncedToSlot);
350 }
351 let accounts: Result<Vec<_>, _> = response
352 .value
353 .items
354 .iter()
355 .map(CompressedAccount::try_from)
356 .collect();
357
358 let cursor = response.value.cursor;
359
360 Ok(Response {
361 context: Context {
362 slot: response.context.slot,
363 },
364 value: ItemsWithCursor {
365 items: accounts?,
366 cursor,
367 },
368 })
369 }
370 })
371 .await
372 }
373
374 async fn get_compressed_balance(
375 &self,
376 address: Option<Address>,
377 hash: Option<Hash>,
378 config: Option<IndexerRpcConfig>,
379 ) -> Result<Response<u64>, IndexerError> {
380 let config = config.unwrap_or_default();
381 self.retry(config.retry_config, || async {
382 let params = self.build_account_params(address, hash)?;
383 let request = photon_api::models::GetCompressedAccountBalancePostRequest {
384 params: Box::new(params),
385 ..Default::default()
386 };
387
388 let result = photon_api::apis::default_api::get_compressed_account_balance_post(
389 &self.configuration,
390 request,
391 )
392 .await?;
393
394 let api_response = Self::extract_result_with_error_check(
395 "get_compressed_account_balance",
396 result.error,
397 result.result.map(|r| *r),
398 )?;
399 if api_response.context.slot < config.slot {
400 return Err(IndexerError::IndexerNotSyncedToSlot);
401 }
402 Ok(Response {
403 context: Context {
404 slot: api_response.context.slot,
405 },
406 value: api_response.value,
407 })
408 })
409 .await
410 }
411
412 async fn get_compressed_balance_by_owner(
413 &self,
414 owner: &Pubkey,
415 config: Option<IndexerRpcConfig>,
416 ) -> Result<Response<u64>, IndexerError> {
417 let config = config.unwrap_or_default();
418 self.retry(config.retry_config, || async {
419 let request = photon_api::models::GetCompressedBalanceByOwnerPostRequest {
420 params: Box::new(
421 photon_api::models::GetCompressedBalanceByOwnerPostRequestParams {
422 owner: owner.to_string(),
423 },
424 ),
425 ..Default::default()
426 };
427
428 let result = photon_api::apis::default_api::get_compressed_balance_by_owner_post(
429 &self.configuration,
430 request,
431 )
432 .await?;
433
434 let api_response = Self::extract_result_with_error_check(
435 "get_compressed_balance_by_owner",
436 result.error,
437 result.result.map(|r| *r),
438 )?;
439 if api_response.context.slot < config.slot {
440 return Err(IndexerError::IndexerNotSyncedToSlot);
441 }
442 Ok(Response {
443 context: Context {
444 slot: api_response.context.slot,
445 },
446 value: api_response.value,
447 })
448 })
449 .await
450 }
451
452 async fn get_compressed_mint_token_holders(
453 &self,
454 mint: &Pubkey,
455 options: Option<PaginatedOptions>,
456 config: Option<IndexerRpcConfig>,
457 ) -> Result<Response<ItemsWithCursor<OwnerBalance>>, IndexerError> {
458 let config = config.unwrap_or_default();
459 self.retry(config.retry_config, || async {
460 let request = photon_api::models::GetCompressedMintTokenHoldersPostRequest {
461 params: Box::new(
462 photon_api::models::GetCompressedMintTokenHoldersPostRequestParams {
463 mint: mint.to_string(),
464 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
465 limit: options.as_ref().and_then(|o| o.limit),
466 },
467 ),
468 ..Default::default()
469 };
470
471 let result = photon_api::apis::default_api::get_compressed_mint_token_holders_post(
472 &self.configuration,
473 request,
474 )
475 .await?;
476
477 let api_response = Self::extract_result_with_error_check(
478 "get_compressed_mint_token_holders",
479 result.error,
480 result.result.map(|r| *r),
481 )?;
482 if api_response.context.slot < config.slot {
483 return Err(IndexerError::IndexerNotSyncedToSlot);
484 }
485 let owner_balances: Result<Vec<_>, _> = api_response
486 .value
487 .items
488 .iter()
489 .map(OwnerBalance::try_from)
490 .collect();
491
492 let cursor = api_response.value.cursor;
493
494 Ok(Response {
495 context: Context {
496 slot: api_response.context.slot,
497 },
498 value: ItemsWithCursor {
499 items: owner_balances?,
500 cursor,
501 },
502 })
503 })
504 .await
505 }
506
507 async fn get_compressed_token_account_balance(
508 &self,
509 address: Option<Address>,
510 hash: Option<Hash>,
511 config: Option<IndexerRpcConfig>,
512 ) -> Result<Response<u64>, IndexerError> {
513 let config = config.unwrap_or_default();
514 self.retry(config.retry_config, || async {
515 let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
516 params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
517 address: address.map(|x| x.to_base58()),
518 hash: hash.map(|x| x.to_base58()),
519 }),
520 ..Default::default()
521 };
522
523 let result = photon_api::apis::default_api::get_compressed_token_account_balance_post(
524 &self.configuration,
525 request,
526 )
527 .await?;
528
529 let api_response = Self::extract_result_with_error_check(
530 "get_compressed_token_account_balance",
531 result.error,
532 result.result.map(|r| *r),
533 )?;
534 if api_response.context.slot < config.slot {
535 return Err(IndexerError::IndexerNotSyncedToSlot);
536 }
537 Ok(Response {
538 context: Context {
539 slot: api_response.context.slot,
540 },
541 value: api_response.value.amount,
542 })
543 })
544 .await
545 }
546
547 async fn get_compressed_token_accounts_by_delegate(
548 &self,
549 delegate: &Pubkey,
550 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
551 config: Option<IndexerRpcConfig>,
552 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
553 let config = config.unwrap_or_default();
554 self.retry(config.retry_config, || async {
555 #[cfg(feature = "v2")]
556 {
557 let request = photon_api::models::GetCompressedTokenAccountsByDelegateV2PostRequest {
558 params: Box::new(
559 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
560 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
561 limit: options.as_ref().and_then(|o| o.limit),
562 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
563 delegate: delegate.to_string(),
564 },
565 ),
566 ..Default::default()
567 };
568
569 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_v2_post(
570 &self.configuration,
571 request,
572 )
573 .await?;
574
575 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
576 if response.context.slot < config.slot {
577 return Err(IndexerError::IndexerNotSyncedToSlot);
578 }
579
580 let token_accounts: Result<Vec<_>, _> = response
581 .value
582 .items
583 .iter()
584 .map(CompressedTokenAccount::try_from)
585 .collect();
586
587 let cursor = response.value.cursor;
588
589 Ok(Response {
590 context: Context {
591 slot: response.context.slot,
592 },
593 value: ItemsWithCursor {
594 items: token_accounts?,
595 cursor,
596 },
597 })
598 }
599 #[cfg(not(feature = "v2"))]
600 {
601 let request = photon_api::models::GetCompressedTokenAccountsByDelegatePostRequest {
602 params: Box::new(
603 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
604 delegate: delegate.to_string(),
605 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
606 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
607 limit: options.as_ref().and_then(|o| o.limit),
608 },
609 ),
610 ..Default::default()
611 };
612
613 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_post(
614 &self.configuration,
615 request,
616 )
617 .await?;
618
619 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
620 if response.context.slot < config.slot {
621 return Err(IndexerError::IndexerNotSyncedToSlot);
622 }
623
624 let token_accounts: Result<Vec<_>, _> = response
625 .value
626 .items
627 .iter()
628 .map(CompressedTokenAccount::try_from)
629 .collect();
630
631 let cursor = response.value.cursor;
632
633 Ok(Response {
634 context: Context {
635 slot: response.context.slot,
636 },
637 value: ItemsWithCursor {
638 items: token_accounts?,
639 cursor,
640 },
641 })
642 }
643 })
644 .await
645 }
646
647 async fn get_compressed_token_accounts_by_owner(
648 &self,
649 owner: &Pubkey,
650 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
651 config: Option<IndexerRpcConfig>,
652 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
653 let config = config.unwrap_or_default();
654 self.retry(config.retry_config, || async {
655 #[cfg(feature = "v2")]
656 {
657 let request = photon_api::models::GetCompressedTokenAccountsByOwnerV2PostRequest {
658 params: Box::from(
659 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
660 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
661 limit: options.as_ref().and_then(|o| o.limit),
662 mint: options
663 .as_ref()
664 .and_then(|o| o.mint.as_ref())
665 .map(|x| x.to_string()),
666 owner: owner.to_string(),
667 },
668 ),
669 ..Default::default()
670 };
671 let result =
672 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_v2_post(
673 &self.configuration,
674 request,
675 )
676 .await?;
677 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
678 if response.context.slot < config.slot {
679 return Err(IndexerError::IndexerNotSyncedToSlot);
680 }
681 let token_accounts: Result<Vec<_>, _> = response
682 .value
683 .items
684 .iter()
685 .map(CompressedTokenAccount::try_from)
686 .collect();
687
688 let cursor = response.value.cursor;
689
690 Ok(Response {
691 context: Context {
692 slot: response.context.slot,
693 },
694 value: ItemsWithCursor {
695 items: token_accounts?,
696 cursor,
697 },
698 })
699 }
700 #[cfg(not(feature = "v2"))]
701 {
702 let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
703 params: Box::new(
704 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
705 owner: owner.to_string(),
706 mint: options
707 .as_ref()
708 .and_then(|o| o.mint.as_ref())
709 .map(|x| x.to_string()),
710 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
711 limit: options.as_ref().and_then(|o| o.limit),
712 },
713 ),
714 ..Default::default()
715 };
716
717 let result =
718 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_post(
719 &self.configuration,
720 request,
721 )
722 .await?;
723
724 let response = Self::extract_result_with_error_check(
725 "get_compressed_token_accounts_by_owner",
726 result.error,
727 result.result.map(|r| *r),
728 )?;
729 if response.context.slot < config.slot {
730 return Err(IndexerError::IndexerNotSyncedToSlot);
731 }
732 let token_accounts: Result<Vec<_>, _> = response
733 .value
734 .items
735 .iter()
736 .map(CompressedTokenAccount::try_from)
737 .collect();
738
739 let cursor = response.value.cursor;
740
741 Ok(Response {
742 context: Context {
743 slot: response.context.slot,
744 },
745 value: ItemsWithCursor {
746 items: token_accounts?,
747 cursor,
748 },
749 })
750 }
751 })
752 .await
753 }
754
755 async fn get_compressed_token_balances_by_owner_v2(
756 &self,
757 owner: &Pubkey,
758 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
759 config: Option<IndexerRpcConfig>,
760 ) -> Result<Response<ItemsWithCursor<TokenBalance>>, IndexerError> {
761 let config = config.unwrap_or_default();
762 self.retry(config.retry_config, || async {
763 #[cfg(feature = "v2")]
764 {
765 let request = photon_api::models::GetCompressedTokenBalancesByOwnerV2PostRequest {
766 params: Box::new(
767 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
768 owner: owner.to_string(),
769 mint: options
770 .as_ref()
771 .and_then(|o| o.mint.as_ref())
772 .map(|x| x.to_string()),
773 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
774 limit: options.as_ref().and_then(|o| o.limit),
775 },
776 ),
777 ..Default::default()
778 };
779
780 let result =
781 photon_api::apis::default_api::get_compressed_token_balances_by_owner_v2_post(
782 &self.configuration,
783 request,
784 )
785 .await?;
786
787 let api_response = Self::extract_result_with_error_check(
788 "get_compressed_token_balances_by_owner_v2",
789 result.error,
790 result.result.map(|r| *r),
791 )?;
792 if api_response.context.slot < config.slot {
793 return Err(IndexerError::IndexerNotSyncedToSlot);
794 }
795
796 let token_balances: Result<Vec<_>, _> = api_response
797 .value
798 .items
799 .iter()
800 .map(TokenBalance::try_from)
801 .collect();
802
803 Ok(Response {
804 context: Context {
805 slot: api_response.context.slot,
806 },
807 value: ItemsWithCursor {
808 items: token_balances?,
809 cursor: api_response.value.cursor,
810 },
811 })
812 }
813 #[cfg(not(feature = "v2"))]
814 {
815 let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
816 params: Box::new(
817 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
818 owner: owner.to_string(),
819 mint: options
820 .as_ref()
821 .and_then(|o| o.mint.as_ref())
822 .map(|x| x.to_string()),
823 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
824 limit: options.as_ref().and_then(|o| o.limit),
825 },
826 ),
827 ..Default::default()
828 };
829
830 let result =
831 photon_api::apis::default_api::get_compressed_token_balances_by_owner_post(
832 &self.configuration,
833 request,
834 )
835 .await?;
836
837 let api_response = Self::extract_result_with_error_check(
838 "get_compressed_token_balances_by_owner",
839 result.error,
840 result.result.map(|r| *r),
841 )?;
842 if api_response.context.slot < config.slot {
843 return Err(IndexerError::IndexerNotSyncedToSlot);
844 }
845
846 let token_balances: Result<Vec<_>, _> = api_response
847 .value
848 .token_balances
849 .iter()
850 .map(TokenBalance::try_from)
851 .collect();
852
853 Ok(Response {
854 context: Context {
855 slot: api_response.context.slot,
856 },
857 value: ItemsWithCursor {
858 items: token_balances?,
859 cursor: api_response.value.cursor,
860 },
861 })
862 }
863 })
864 .await
865 }
866
867 async fn get_compression_signatures_for_account(
868 &self,
869 hash: Hash,
870 config: Option<IndexerRpcConfig>,
871 ) -> Result<Response<Items<SignatureWithMetadata>>, IndexerError> {
872 let config = config.unwrap_or_default();
873 self.retry(config.retry_config, || async {
874 let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
875 params: Box::new(
876 photon_api::models::GetCompressedAccountProofPostRequestParams {
877 hash: hash.to_base58(),
878 },
879 ),
880 ..Default::default()
881 };
882
883 let result =
884 photon_api::apis::default_api::get_compression_signatures_for_account_post(
885 &self.configuration,
886 request,
887 )
888 .await?;
889
890 let api_response = Self::extract_result_with_error_check(
891 "get_compression_signatures_for_account",
892 result.error,
893 result.result.map(|r| *r),
894 )?;
895 if api_response.context.slot < config.slot {
896 return Err(IndexerError::IndexerNotSyncedToSlot);
897 }
898 let signatures = api_response
899 .value
900 .items
901 .iter()
902 .map(SignatureWithMetadata::try_from)
903 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
904
905 Ok(Response {
906 context: Context {
907 slot: api_response.context.slot,
908 },
909 value: Items { items: signatures },
910 })
911 })
912 .await
913 }
914
915 async fn get_compression_signatures_for_address(
916 &self,
917 address: &[u8; 32],
918 options: Option<PaginatedOptions>,
919 config: Option<IndexerRpcConfig>,
920 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
921 let config = config.unwrap_or_default();
922 self.retry(config.retry_config, || async {
923 let request = photon_api::models::GetCompressionSignaturesForAddressPostRequest {
924 params: Box::new(
925 photon_api::models::GetCompressionSignaturesForAddressPostRequestParams {
926 address: address.to_base58(),
927 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
928 limit: options.as_ref().and_then(|o| o.limit),
929 },
930 ),
931 ..Default::default()
932 };
933
934 let result =
935 photon_api::apis::default_api::get_compression_signatures_for_address_post(
936 &self.configuration,
937 request,
938 )
939 .await?;
940
941 let api_response = Self::extract_result_with_error_check(
942 "get_compression_signatures_for_address",
943 result.error,
944 result.result.map(|r| *r),
945 )?;
946 if api_response.context.slot < config.slot {
947 return Err(IndexerError::IndexerNotSyncedToSlot);
948 }
949
950 let signatures = api_response
951 .value
952 .items
953 .iter()
954 .map(SignatureWithMetadata::try_from)
955 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
956
957 let cursor = api_response.value.cursor;
958
959 Ok(Response {
960 context: Context {
961 slot: api_response.context.slot,
962 },
963 value: ItemsWithCursor {
964 items: signatures,
965 cursor,
966 },
967 })
968 })
969 .await
970 }
971
972 async fn get_compression_signatures_for_owner(
973 &self,
974 owner: &Pubkey,
975 options: Option<PaginatedOptions>,
976 config: Option<IndexerRpcConfig>,
977 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
978 let config = config.unwrap_or_default();
979 self.retry(config.retry_config, || async {
980 let request = photon_api::models::GetCompressionSignaturesForOwnerPostRequest {
981 params: Box::new(
982 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
983 owner: owner.to_string(),
984 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
985 limit: options.as_ref().and_then(|o| o.limit),
986 },
987 ),
988 ..Default::default()
989 };
990
991 let result = photon_api::apis::default_api::get_compression_signatures_for_owner_post(
992 &self.configuration,
993 request,
994 )
995 .await?;
996
997 let api_response = Self::extract_result_with_error_check(
998 "get_compression_signatures_for_owner",
999 result.error,
1000 result.result.map(|r| *r),
1001 )?;
1002 if api_response.context.slot < config.slot {
1003 return Err(IndexerError::IndexerNotSyncedToSlot);
1004 }
1005
1006 let signatures = api_response
1007 .value
1008 .items
1009 .iter()
1010 .map(SignatureWithMetadata::try_from)
1011 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1012
1013 let cursor = api_response.value.cursor;
1014
1015 Ok(Response {
1016 context: Context {
1017 slot: api_response.context.slot,
1018 },
1019 value: ItemsWithCursor {
1020 items: signatures,
1021 cursor,
1022 },
1023 })
1024 })
1025 .await
1026 }
1027
1028 async fn get_compression_signatures_for_token_owner(
1029 &self,
1030 owner: &Pubkey,
1031 options: Option<PaginatedOptions>,
1032 config: Option<IndexerRpcConfig>,
1033 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
1034 let config = config.unwrap_or_default();
1035 self.retry(config.retry_config, || async {
1036 let request = photon_api::models::GetCompressionSignaturesForTokenOwnerPostRequest {
1037 params: Box::new(
1038 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
1039 owner: owner.to_string(),
1040 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
1041 limit: options.as_ref().and_then(|o| o.limit),
1042 },
1043 ),
1044 ..Default::default()
1045 };
1046
1047 let result =
1048 photon_api::apis::default_api::get_compression_signatures_for_token_owner_post(
1049 &self.configuration,
1050 request,
1051 )
1052 .await?;
1053
1054 let api_response = Self::extract_result_with_error_check(
1055 "get_compression_signatures_for_token_owner",
1056 result.error,
1057 result.result.map(|r| *r),
1058 )?;
1059 if api_response.context.slot < config.slot {
1060 return Err(IndexerError::IndexerNotSyncedToSlot);
1061 }
1062
1063 let signatures = api_response
1064 .value
1065 .items
1066 .iter()
1067 .map(SignatureWithMetadata::try_from)
1068 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1069
1070 let cursor = api_response.value.cursor;
1071
1072 Ok(Response {
1073 context: Context {
1074 slot: api_response.context.slot,
1075 },
1076 value: ItemsWithCursor {
1077 items: signatures,
1078 cursor,
1079 },
1080 })
1081 })
1082 .await
1083 }
1084
1085 async fn get_indexer_health(&self, config: Option<RetryConfig>) -> Result<bool, IndexerError> {
1086 let config = config.unwrap_or_default();
1087 self.retry(config, || async {
1088 let request = photon_api::models::GetIndexerHealthPostRequest {
1089 ..Default::default()
1090 };
1091
1092 let result = photon_api::apis::default_api::get_indexer_health_post(
1093 &self.configuration,
1094 request,
1095 )
1096 .await?;
1097
1098 let _api_response = Self::extract_result_with_error_check(
1099 "get_indexer_health",
1100 result.error,
1101 result.result,
1102 )?;
1103
1104 Ok(true)
1105 })
1106 .await
1107 }
1108
1109 async fn get_indexer_slot(&self, config: Option<RetryConfig>) -> Result<u64, IndexerError> {
1110 let config = config.unwrap_or_default();
1111 self.retry(config, || async {
1112 let request = photon_api::models::GetIndexerSlotPostRequest {
1113 ..Default::default()
1114 };
1115
1116 let result =
1117 photon_api::apis::default_api::get_indexer_slot_post(&self.configuration, request)
1118 .await?;
1119
1120 let result = Self::extract_result_with_error_check(
1121 "get_indexer_slot",
1122 result.error,
1123 result.result,
1124 )?;
1125 Ok(result)
1126 })
1127 .await
1128 }
1129
1130 async fn get_multiple_compressed_account_proofs(
1131 &self,
1132 hashes: Vec<[u8; 32]>,
1133 config: Option<IndexerRpcConfig>,
1134 ) -> Result<Response<Items<MerkleProof>>, IndexerError> {
1135 let config = config.unwrap_or_default();
1136 self.retry(config.retry_config, || async {
1137 let hashes_for_async = hashes.clone();
1138
1139 let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
1140 photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
1141 params: hashes_for_async
1142 .into_iter()
1143 .map(|hash| bs58::encode(hash).into_string())
1144 .collect(),
1145 ..Default::default()
1146 };
1147
1148 let result =
1149 photon_api::apis::default_api::get_multiple_compressed_account_proofs_post(
1150 &self.configuration,
1151 request,
1152 )
1153 .await?;
1154
1155 if let Some(error) = &result.error {
1156 let error_msg = error.message.as_deref().unwrap_or("Unknown error");
1157 let error_code = error.code.unwrap_or(0);
1158 tracing::error!("API returned error: {}", error_msg);
1159 return Err(IndexerError::PhotonError {
1160 context: "get_multiple_compressed_account_proofs".to_string(),
1161 message: format!("API Error (code {}): {}", error_code, error_msg),
1162 });
1163 }
1164
1165 let photon_proofs = result.result.ok_or_else(|| {
1166 IndexerError::missing_result(
1167 "get_multiple_new_address_proofs",
1168 "No result returned from Photon API",
1169 )
1170 })?;
1171 if photon_proofs.context.slot < config.slot {
1172 return Err(IndexerError::IndexerNotSyncedToSlot);
1173 }
1174
1175 let proofs = photon_proofs
1176 .value
1177 .iter()
1178 .map(|x| {
1179 let mut proof_vec = x.proof.clone();
1180 proof_vec.truncate(proof_vec.len() - 10); let proof = proof_vec
1183 .iter()
1184 .map(|x| Hash::from_base58(x))
1185 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1186 .map_err(|e| IndexerError::Base58DecodeError {
1187 field: "proof".to_string(),
1188 message: e.to_string(),
1189 })?;
1190
1191 Ok(MerkleProof {
1192 hash: <[u8; 32] as Base58Conversions>::from_base58(&x.hash)?,
1193 leaf_index: x.leaf_index,
1194 merkle_tree: Pubkey::from_str_const(x.merkle_tree.as_str()),
1195 proof,
1196 root_seq: x.root_seq,
1197 root: <[u8; 32] as Base58Conversions>::from_base58(&x.root)?,
1198 })
1199 })
1200 .collect::<Result<Vec<MerkleProof>, IndexerError>>()?;
1201
1202 Ok(Response {
1203 context: Context {
1204 slot: photon_proofs.context.slot,
1205 },
1206 value: Items { items: proofs },
1207 })
1208 })
1209 .await
1210 }
1211
1212 async fn get_multiple_compressed_accounts(
1213 &self,
1214 addresses: Option<Vec<Address>>,
1215 hashes: Option<Vec<Hash>>,
1216 config: Option<IndexerRpcConfig>,
1217 ) -> Result<Response<Items<Option<CompressedAccount>>>, IndexerError> {
1218 let config = config.unwrap_or_default();
1219 self.retry(config.retry_config, || async {
1220 let hashes = hashes.clone();
1221 let addresses = addresses.clone();
1222 let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
1223 params: Box::new(
1224 photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
1225 addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1226 hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1227 },
1228 ),
1229 ..Default::default()
1230 };
1231
1232 let result = photon_api::apis::default_api::get_multiple_compressed_accounts_post(
1233 &self.configuration,
1234 request,
1235 )
1236 .await?;
1237
1238 let api_response = Self::extract_result_with_error_check(
1239 "get_multiple_compressed_accounts",
1240 result.error,
1241 result.result.map(|r| *r),
1242 )?;
1243 if api_response.context.slot < config.slot {
1244 return Err(IndexerError::IndexerNotSyncedToSlot);
1245 }
1246 let accounts = api_response
1247 .value
1248 .items
1249 .iter()
1250 .map(|account_opt| match account_opt {
1251 Some(account) => CompressedAccount::try_from(account).map(Some),
1252 None => Ok(None),
1253 })
1254 .collect::<Result<Vec<Option<CompressedAccount>>, IndexerError>>()?;
1255
1256 Ok(Response {
1257 context: Context {
1258 slot: api_response.context.slot,
1259 },
1260 value: Items { items: accounts },
1261 })
1262 })
1263 .await
1264 }
1265
1266 async fn get_multiple_new_address_proofs(
1267 &self,
1268 merkle_tree_pubkey: [u8; 32],
1269 addresses: Vec<[u8; 32]>,
1270 config: Option<IndexerRpcConfig>,
1271 ) -> Result<Response<Items<NewAddressProofWithContext>>, IndexerError> {
1272 let config = config.unwrap_or_default();
1273 self.retry(config.retry_config, || async {
1274 let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
1275 .iter()
1276 .map(|x| photon_api::models::address_with_tree::AddressWithTree {
1277 address: bs58::encode(x).into_string(),
1278 tree: bs58::encode(&merkle_tree_pubkey).into_string(),
1279 })
1280 .collect();
1281
1282 let request = photon_api::models::GetMultipleNewAddressProofsV2PostRequest {
1283 params,
1284 ..Default::default()
1285 };
1286
1287 let result = photon_api::apis::default_api::get_multiple_new_address_proofs_v2_post(
1288 &self.configuration,
1289 request,
1290 )
1291 .await;
1292
1293 let result = result?;
1294
1295 let api_response = match Self::extract_result_with_error_check(
1296 "get_multiple_new_address_proofs",
1297 result.error,
1298 result.result.map(|r| *r),
1299 ) {
1300 Ok(proofs) => proofs,
1301 Err(e) => {
1302 error!("Failed to extract proofs: {:?}", e);
1303 return Err(e);
1304 }
1305 };
1306 if api_response.context.slot < config.slot {
1307 return Err(IndexerError::IndexerNotSyncedToSlot);
1308 }
1309 let photon_proofs = api_response.value;
1310 let mut proofs = Vec::new();
1311 for photon_proof in photon_proofs {
1312 let tree_pubkey = Hash::from_base58(&photon_proof.merkle_tree).map_err(|e| {
1313 IndexerError::Base58DecodeError {
1314 field: "merkle_tree".to_string(),
1315 message: e.to_string(),
1316 }
1317 })?;
1318
1319 let low_address_value = Hash::from_base58(&photon_proof.lower_range_address)
1320 .map_err(|e| IndexerError::Base58DecodeError {
1321 field: "lower_range_address".to_string(),
1322 message: e.to_string(),
1323 })?;
1324
1325 let next_address_value = Hash::from_base58(&photon_proof.higher_range_address)
1326 .map_err(|e| IndexerError::Base58DecodeError {
1327 field: "higher_range_address".to_string(),
1328 message: e.to_string(),
1329 })?;
1330
1331 let mut proof_vec: Vec<[u8; 32]> = photon_proof
1332 .proof
1333 .iter()
1334 .map(|x: &String| Hash::from_base58(x))
1335 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()?;
1336
1337 proof_vec.truncate(proof_vec.len() - 10); let mut proof_arr = [[0u8; 32]; 16];
1339 proof_arr.copy_from_slice(&proof_vec);
1340
1341 let root = Hash::from_base58(&photon_proof.root).map_err(|e| {
1342 IndexerError::Base58DecodeError {
1343 field: "root".to_string(),
1344 message: e.to_string(),
1345 }
1346 })?;
1347
1348 let proof = NewAddressProofWithContext {
1349 merkle_tree: tree_pubkey.into(),
1350 low_address_index: photon_proof.low_element_leaf_index,
1351 low_address_value,
1352 low_address_next_index: photon_proof.next_index,
1353 low_address_next_value: next_address_value,
1354 low_address_proof: proof_arr.to_vec(),
1355 root,
1356 root_seq: photon_proof.root_seq,
1357 new_low_element: None,
1358 new_element: None,
1359 new_element_next_value: None,
1360 };
1361 proofs.push(proof);
1362 }
1363
1364 Ok(Response {
1365 context: Context {
1366 slot: api_response.context.slot,
1367 },
1368 value: Items { items: proofs },
1369 })
1370 })
1371 .await
1372 }
1373
1374 async fn get_validity_proof(
1375 &self,
1376 hashes: Vec<Hash>,
1377 new_addresses_with_trees: Vec<AddressWithTree>,
1378 config: Option<IndexerRpcConfig>,
1379 ) -> Result<Response<super::types::ValidityProofWithContext>, IndexerError> {
1380 let config = config.unwrap_or_default();
1381 self.retry(config.retry_config, || async {
1382 #[cfg(feature = "v2")]
1383 {
1384 let request = photon_api::models::GetValidityProofV2PostRequest {
1385 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1386 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1387 new_addresses_with_trees: Some(
1388 new_addresses_with_trees
1389 .iter()
1390 .map(|x| photon_api::models::AddressWithTree {
1391 address: x.address.to_base58(),
1392 tree: x.tree.to_string(),
1393 })
1394 .collect(),
1395 ),
1396 }),
1397 ..Default::default()
1398 };
1399
1400 let result = photon_api::apis::default_api::get_validity_proof_v2_post(
1401 &self.configuration,
1402 request,
1403 )
1404 .await?;
1405
1406 let api_response = Self::extract_result_with_error_check(
1407 "get_validity_proof_v2",
1408 result.error,
1409 result.result.map(|r| *r),
1410 )?;
1411 if api_response.context.slot < config.slot {
1412 return Err(IndexerError::IndexerNotSyncedToSlot);
1413 }
1414 let validity_proof =
1415 super::types::ValidityProofWithContext::from_api_model_v2(*api_response.value)?;
1416
1417 Ok(Response {
1418 context: Context {
1419 slot: api_response.context.slot,
1420 },
1421 value: validity_proof,
1422 })
1423 }
1424 #[cfg(not(feature = "v2"))]
1425 {
1426 let request = photon_api::models::GetValidityProofPostRequest {
1427 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1428 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1429 new_addresses_with_trees: Some(
1430 new_addresses_with_trees
1431 .iter()
1432 .map(|x| photon_api::models::AddressWithTree {
1433 address: x.address.to_base58(),
1434 tree: x.tree.to_string(),
1435 })
1436 .collect(),
1437 ),
1438 }),
1439 ..Default::default()
1440 };
1441
1442 let result = photon_api::apis::default_api::get_validity_proof_post(
1443 &self.configuration,
1444 request,
1445 )
1446 .await?;
1447
1448 let api_response = Self::extract_result_with_error_check(
1449 "get_validity_proof",
1450 result.error,
1451 result.result.map(|r| *r),
1452 )?;
1453 if api_response.context.slot < config.slot {
1454 return Err(IndexerError::IndexerNotSyncedToSlot);
1455 }
1456 let validity_proof = super::types::ValidityProofWithContext::from_api_model(
1457 *api_response.value,
1458 hashes.len(),
1459 )?;
1460
1461 Ok(Response {
1462 context: Context {
1463 slot: api_response.context.slot,
1464 },
1465 value: validity_proof,
1466 })
1467 }
1468 })
1469 .await
1470 }
1471
1472 async fn get_address_queue_with_proofs(
1473 &mut self,
1474 _merkle_tree_pubkey: &Pubkey,
1475 _zkp_batch_size: u16,
1476 _start_offset: Option<u64>,
1477 _config: Option<IndexerRpcConfig>,
1478 ) -> Result<Response<BatchAddressUpdateIndexerResponse>, IndexerError> {
1479 #[cfg(not(feature = "v2"))]
1480 unimplemented!("get_address_queue_with_proofs");
1481 #[cfg(feature = "v2")]
1482 {
1483 let merkle_tree_pubkey = _merkle_tree_pubkey;
1484 let limit = _zkp_batch_size;
1485 let start_queue_index = _start_offset;
1486 let config = _config.unwrap_or_default();
1487 self.retry(config.retry_config, || async {
1488 let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
1489 let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
1490 params: Box::new(
1491 photon_api::models::GetBatchAddressUpdateInfoPostRequestParams {
1492 limit,
1493 start_queue_index,
1494 tree: merkle_tree.to_base58(),
1495 },
1496 ),
1497 ..Default::default()
1498 };
1499
1500 let result = photon_api::apis::default_api::get_batch_address_update_info_post(
1501 &self.configuration,
1502 request,
1503 )
1504 .await?;
1505
1506 let api_response = Self::extract_result_with_error_check(
1507 "get_batch_address_update_info",
1508 result.error,
1509 result.result.map(|r| *r),
1510 )?;
1511 if api_response.context.slot < config.slot {
1512 return Err(IndexerError::IndexerNotSyncedToSlot);
1513 }
1514
1515 let addresses = api_response
1516 .addresses
1517 .iter()
1518 .map(|x| crate::indexer::AddressQueueIndex {
1519 address: Hash::from_base58(x.address.clone().as_ref()).unwrap(),
1520 queue_index: x.queue_index,
1521 })
1522 .collect();
1523
1524 let mut proofs: Vec<NewAddressProofWithContext> = vec![];
1525 for proof in api_response.non_inclusion_proofs {
1526 let proof = NewAddressProofWithContext {
1527 merkle_tree: *merkle_tree_pubkey,
1528 low_address_index: proof.low_element_leaf_index,
1529 low_address_value: Hash::from_base58(
1530 proof.lower_range_address.clone().as_ref(),
1531 )
1532 .unwrap(),
1533 low_address_next_index: proof.next_index,
1534 low_address_next_value: Hash::from_base58(
1535 proof.higher_range_address.clone().as_ref(),
1536 )
1537 .unwrap(),
1538 low_address_proof: proof
1539 .proof
1540 .iter()
1541 .map(|x| Hash::from_base58(x.clone().as_ref()).unwrap())
1542 .collect(),
1543 root: Hash::from_base58(proof.root.clone().as_ref()).unwrap(),
1544 root_seq: proof.root_seq,
1545
1546 new_low_element: None,
1547 new_element: None,
1548 new_element_next_value: None,
1549 };
1550 proofs.push(proof);
1551 }
1552
1553 let subtrees = api_response
1554 .subtrees
1555 .iter()
1556 .map(|x| {
1557 let mut arr = [0u8; 32];
1558 arr.copy_from_slice(x.as_slice());
1559 arr
1560 })
1561 .collect::<Vec<_>>();
1562
1563 let result = BatchAddressUpdateIndexerResponse {
1564 batch_start_index: api_response.start_index,
1565 addresses,
1566 non_inclusion_proofs: proofs,
1567 subtrees,
1568 };
1569 Ok(Response {
1570 context: Context {
1571 slot: api_response.context.slot,
1572 },
1573 value: result,
1574 })
1575 })
1576 .await
1577 }
1578 }
1579
1580 async fn get_queue_elements(
1581 &mut self,
1582 _pubkey: [u8; 32],
1583 _output_queue_start_index: Option<u64>,
1584 _output_queue_limit: Option<u16>,
1585 _input_queue_start_index: Option<u64>,
1586 _input_queue_limit: Option<u16>,
1587 _config: Option<IndexerRpcConfig>,
1588 ) -> Result<Response<QueueElementsResult>, IndexerError> {
1589 #[cfg(not(feature = "v2"))]
1590 unimplemented!("get_queue_elements");
1591 #[cfg(feature = "v2")]
1592 {
1593 use super::MerkleProofWithContext;
1594 let pubkey = _pubkey;
1595 let output_queue_start_index = _output_queue_start_index;
1596 let output_queue_limit = _output_queue_limit;
1597 let input_queue_start_index = _input_queue_start_index;
1598 let input_queue_limit = _input_queue_limit;
1599 let config = _config.unwrap_or_default();
1600 self.retry(config.retry_config, || async {
1601 let request: photon_api::models::GetQueueElementsPostRequest =
1602 photon_api::models::GetQueueElementsPostRequest {
1603 params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
1604 tree: bs58::encode(pubkey).into_string(),
1605 output_queue_start_index,
1606 output_queue_limit,
1607 input_queue_start_index,
1608 input_queue_limit,
1609 }),
1610 ..Default::default()
1611 };
1612
1613 let result = photon_api::apis::default_api::get_queue_elements_post(
1614 &self.configuration,
1615 request,
1616 )
1617 .await;
1618 let result: Result<Response<QueueElementsResult>, IndexerError> = match result {
1619 Ok(api_response) => match api_response.result {
1620 Some(api_result) => {
1621 if api_result.context.slot < config.slot {
1622 return Err(IndexerError::IndexerNotSyncedToSlot);
1623 }
1624
1625 let output_queue_elements = api_result
1627 .output_queue_elements
1628 .map(|elements| {
1629 elements
1630 .iter()
1631 .map(|x| -> Result<_, IndexerError> {
1632 let proof: Vec<Hash> = x
1633 .proof
1634 .iter()
1635 .map(|p| Hash::from_base58(p))
1636 .collect::<Result<Vec<_>, _>>()?;
1637 let root = Hash::from_base58(&x.root)?;
1638 let leaf = Hash::from_base58(&x.leaf)?;
1639 let merkle_tree = Hash::from_base58(&x.tree)?;
1640 let tx_hash = x
1641 .tx_hash
1642 .as_ref()
1643 .map(|h| Hash::from_base58(h))
1644 .transpose()?;
1645 let account_hash = Hash::from_base58(&x.account_hash)?;
1646
1647 Ok(MerkleProofWithContext {
1648 proof,
1649 root,
1650 leaf_index: x.leaf_index,
1651 leaf,
1652 merkle_tree,
1653 root_seq: x.root_seq,
1654 tx_hash,
1655 account_hash,
1656 })
1657 })
1658 .collect::<Result<Vec<_>, _>>()
1659 })
1660 .transpose()?;
1661
1662 let input_queue_elements = api_result
1664 .input_queue_elements
1665 .map(|elements| {
1666 elements
1667 .iter()
1668 .map(|x| -> Result<_, IndexerError> {
1669 let proof: Vec<Hash> = x
1670 .proof
1671 .iter()
1672 .map(|p| Hash::from_base58(p))
1673 .collect::<Result<Vec<_>, _>>()?;
1674 let root = Hash::from_base58(&x.root)?;
1675 let leaf = Hash::from_base58(&x.leaf)?;
1676 let merkle_tree = Hash::from_base58(&x.tree)?;
1677 let tx_hash = x
1678 .tx_hash
1679 .as_ref()
1680 .map(|h| Hash::from_base58(h))
1681 .transpose()?;
1682 let account_hash = Hash::from_base58(&x.account_hash)?;
1683
1684 Ok(MerkleProofWithContext {
1685 proof,
1686 root,
1687 leaf_index: x.leaf_index,
1688 leaf,
1689 merkle_tree,
1690 root_seq: x.root_seq,
1691 tx_hash,
1692 account_hash,
1693 })
1694 })
1695 .collect::<Result<Vec<_>, _>>()
1696 })
1697 .transpose()?;
1698
1699 Ok(Response {
1700 context: Context {
1701 slot: api_result.context.slot,
1702 },
1703 value: QueueElementsResult {
1704 output_queue_elements,
1705 output_queue_index: api_result.output_queue_index,
1706 input_queue_elements,
1707 input_queue_index: api_result.input_queue_index,
1708 },
1709 })
1710 }
1711 None => {
1712 let error =
1713 api_response
1714 .error
1715 .ok_or_else(|| IndexerError::PhotonError {
1716 context: "get_queue_elements".to_string(),
1717 message: "No error details provided".to_string(),
1718 })?;
1719
1720 Err(IndexerError::PhotonError {
1721 context: "get_queue_elements".to_string(),
1722 message: error
1723 .message
1724 .unwrap_or_else(|| "Unknown error".to_string()),
1725 })
1726 }
1727 },
1728 Err(e) => Err(IndexerError::PhotonError {
1729 context: "get_queue_elements".to_string(),
1730 message: e.to_string(),
1731 }),
1732 };
1733
1734 result
1735 })
1736 .await
1737 }
1738 }
1739
1740 async fn get_queue_info(
1741 &self,
1742 config: Option<IndexerRpcConfig>,
1743 ) -> Result<Response<super::QueueInfoResult>, IndexerError> {
1744 let config = config.unwrap_or_default();
1745 self.retry(config.retry_config, || async {
1746 let request = photon_api::models::GetQueueInfoPostRequest {
1747 ..Default::default()
1748 };
1749
1750 let result =
1751 photon_api::apis::default_api::get_queue_info_post(&self.configuration, request)
1752 .await?;
1753
1754 let api_response = Self::extract_result_with_error_check(
1755 "get_queue_info",
1756 result.error.map(|e| {
1757 Box::new(
1758 photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError {
1759 code: Some(e.code),
1760 message: Some(e.message),
1761 },
1762 )
1763 }),
1764 result.result,
1765 )?;
1766
1767 if api_response.slot < config.slot {
1768 return Err(IndexerError::IndexerNotSyncedToSlot);
1769 }
1770
1771 let queues = api_response
1772 .queues
1773 .iter()
1774 .map(|q| -> Result<_, IndexerError> {
1775 let tree_bytes = super::base58::decode_base58_to_fixed_array(&q.tree)?;
1776 let queue_bytes = super::base58::decode_base58_to_fixed_array(&q.queue)?;
1777
1778 Ok(super::QueueInfo {
1779 tree: Pubkey::new_from_array(tree_bytes),
1780 queue: Pubkey::new_from_array(queue_bytes),
1781 queue_type: q.queue_type,
1782 queue_size: q.queue_size,
1783 })
1784 })
1785 .collect::<Result<Vec<_>, _>>()?;
1786
1787 Ok(Response {
1788 context: Context {
1789 slot: api_response.slot,
1790 },
1791 value: super::QueueInfoResult {
1792 queues,
1793 slot: api_response.slot,
1794 },
1795 })
1796 })
1797 .await
1798 }
1799
1800 async fn get_queue_elements_v2(
1801 &mut self,
1802 merkle_tree_pubkey: [u8; 32],
1803 options: super::QueueElementsV2Options,
1804 config: Option<IndexerRpcConfig>,
1805 ) -> Result<Response<super::QueueElementsV2Result>, IndexerError> {
1806 let config = config.unwrap_or_default();
1807 self.retry(config.retry_config, || async {
1808 let params = photon_api::models::GetQueueElementsV2PostRequestParams {
1809 tree: bs58::encode(merkle_tree_pubkey).into_string(),
1810 output_queue_start_index: options.output_queue_start_index,
1811 output_queue_limit: options.output_queue_limit,
1812 output_queue_zkp_batch_size: options.output_queue_zkp_batch_size,
1813 input_queue_start_index: options.input_queue_start_index,
1814 input_queue_limit: options.input_queue_limit,
1815 input_queue_zkp_batch_size: options.input_queue_zkp_batch_size,
1816 address_queue_start_index: options.address_queue_start_index,
1817 address_queue_limit: options.address_queue_limit,
1818 address_queue_zkp_batch_size: options.address_queue_zkp_batch_size,
1819 };
1820
1821 let request = photon_api::models::GetQueueElementsV2PostRequest {
1822 params: Box::new(params),
1823 ..Default::default()
1824 };
1825
1826 let result = photon_api::apis::default_api::get_queue_elements_v2_post(
1827 &self.configuration,
1828 request,
1829 )
1830 .await?;
1831
1832 let api_response = Self::extract_result_with_error_check(
1833 "get_queue_elements_v2",
1834 result.error,
1835 result.result.map(|r| *r),
1836 )?;
1837
1838 if api_response.context.slot < config.slot {
1839 return Err(IndexerError::IndexerNotSyncedToSlot);
1840 }
1841
1842 let state_queue = if let Some(state) = api_response.state_queue {
1843 let node_hashes: Result<Vec<[u8; 32]>, IndexerError> = state
1844 .node_hashes
1845 .iter()
1846 .map(|h| Hash::from_base58(h))
1847 .collect();
1848 let initial_root = Hash::from_base58(&state.initial_root)?;
1849
1850 let output_queue = if let Some(output) = state.output_queue {
1851 let account_hashes: Result<Vec<[u8; 32]>, IndexerError> = output
1852 .account_hashes
1853 .iter()
1854 .map(|h| Hash::from_base58(h))
1855 .collect();
1856 let old_leaves: Result<Vec<[u8; 32]>, IndexerError> =
1857 output.leaves.iter().map(|h| Hash::from_base58(h)).collect();
1858 let leaves_hash_chains: Result<Vec<[u8; 32]>, IndexerError> = output
1859 .leaves_hash_chains
1860 .iter()
1861 .map(|h| Hash::from_base58(h))
1862 .collect();
1863
1864 Some(super::OutputQueueDataV2 {
1865 leaf_indices: output.leaf_indices,
1866 account_hashes: account_hashes?,
1867 old_leaves: old_leaves?,
1868 first_queue_index: output.first_queue_index,
1869 next_index: output.next_index,
1870 leaves_hash_chains: leaves_hash_chains?,
1871 })
1872 } else {
1873 None
1874 };
1875
1876 let input_queue = if let Some(input) = state.input_queue {
1877 let account_hashes: Result<Vec<[u8; 32]>, IndexerError> = input
1878 .account_hashes
1879 .iter()
1880 .map(|h| Hash::from_base58(h))
1881 .collect();
1882 let current_leaves: Result<Vec<[u8; 32]>, IndexerError> =
1883 input.leaves.iter().map(|h| Hash::from_base58(h)).collect();
1884 let tx_hashes: Result<Vec<[u8; 32]>, IndexerError> = input
1885 .tx_hashes
1886 .iter()
1887 .map(|h| Hash::from_base58(h))
1888 .collect();
1889 let nullifiers: Result<Vec<[u8; 32]>, IndexerError> = input
1890 .nullifiers
1891 .iter()
1892 .map(|h| Hash::from_base58(h))
1893 .collect();
1894 let leaves_hash_chains: Result<Vec<[u8; 32]>, IndexerError> = input
1895 .leaves_hash_chains
1896 .iter()
1897 .map(|h| Hash::from_base58(h))
1898 .collect();
1899
1900 Some(super::InputQueueDataV2 {
1901 leaf_indices: input.leaf_indices,
1902 account_hashes: account_hashes?,
1903 current_leaves: current_leaves?,
1904 tx_hashes: tx_hashes?,
1905 nullifiers: nullifiers?,
1906 first_queue_index: input.first_queue_index,
1907 leaves_hash_chains: leaves_hash_chains?,
1908 })
1909 } else {
1910 None
1911 };
1912
1913 Some(super::StateQueueDataV2 {
1914 nodes: state.nodes,
1915 node_hashes: node_hashes?,
1916 initial_root,
1917 root_seq: state.root_seq,
1918 output_queue,
1919 input_queue,
1920 })
1921 } else {
1922 None
1923 };
1924
1925 let address_queue = if let Some(address) = api_response.address_queue {
1927 let addresses: Result<Vec<[u8; 32]>, IndexerError> = address
1928 .addresses
1929 .iter()
1930 .map(|h| Hash::from_base58(h))
1931 .collect();
1932
1933 let low_element_values: Result<Vec<[u8; 32]>, IndexerError> = address
1934 .low_element_values
1935 .iter()
1936 .map(|h| Hash::from_base58(h))
1937 .collect();
1938
1939 let low_element_next_values: Result<Vec<[u8; 32]>, IndexerError> = address
1940 .low_element_next_values
1941 .iter()
1942 .map(|h| Hash::from_base58(h))
1943 .collect();
1944
1945 let low_element_proofs: Result<Vec<Vec<[u8; 32]>>, IndexerError> = address
1946 .low_element_proofs
1947 .iter()
1948 .map(|proof_vec| {
1949 proof_vec
1950 .iter()
1951 .map(|h| Hash::from_base58(h))
1952 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1953 })
1954 .collect();
1955
1956 let node_hashes: Result<Vec<[u8; 32]>, IndexerError> = address
1957 .node_hashes
1958 .iter()
1959 .map(|h| Hash::from_base58(h))
1960 .collect();
1961
1962 let initial_root = Hash::from_base58(&address.initial_root)?;
1963
1964 Some(super::AddressQueueDataV2 {
1965 addresses: addresses?,
1966 low_element_values: low_element_values?,
1967 low_element_next_values: low_element_next_values?,
1968 low_element_indices: address.low_element_indices,
1969 low_element_next_indices: address.low_element_next_indices,
1970 low_element_proofs: low_element_proofs?,
1971 nodes: address.nodes,
1972 node_hashes: node_hashes?,
1973 initial_root,
1974 first_queue_index: address.start_index,
1975 })
1976 } else {
1977 None
1978 };
1979
1980 Ok(Response {
1981 context: Context {
1982 slot: api_response.context.slot,
1983 },
1984 value: super::QueueElementsV2Result {
1985 state_queue,
1986 address_queue,
1987 },
1988 })
1989 })
1990 .await
1991 }
1992
1993 async fn get_subtrees(
1994 &self,
1995 _merkle_tree_pubkey: [u8; 32],
1996 _config: Option<IndexerRpcConfig>,
1997 ) -> Result<Response<Items<[u8; 32]>>, IndexerError> {
1998 #[cfg(not(feature = "v2"))]
1999 unimplemented!();
2000 #[cfg(feature = "v2")]
2001 {
2002 todo!();
2003 }
2004 }
2005}