1use std::{fmt::Debug, time::Duration};
2
3use async_trait::async_trait;
4use bs58;
5use light_merkle_tree_metadata::QueueType;
6use photon_api::{
7 apis::configuration::{ApiKey, Configuration},
8 models::GetCompressedAccountsByOwnerPostRequestParams,
9};
10use solana_pubkey::Pubkey;
11use tracing::{debug, error, warn};
12
13use super::{
14 types::{CompressedAccount, OwnerBalance, SignatureWithMetadata, TokenAccount, TokenBalance},
15 BatchAddressUpdateIndexerResponse, MerkleProofWithContext,
16};
17use crate::indexer::{
18 base58::Base58Conversions,
19 config::RetryConfig,
20 response::{Context, Items, ItemsWithCursor, Response},
21 Address, AddressWithTree, GetCompressedAccountsByOwnerConfig,
22 GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError,
23 IndexerRpcConfig, MerkleProof, NewAddressProofWithContext, PaginatedOptions,
24};
25
26pub struct PhotonIndexer {
28 configuration: Configuration,
29}
30
31impl PhotonIndexer {
32 pub fn default_path() -> String {
33 "http://127.0.0.1:8784".to_string()
34 }
35}
36
37impl PhotonIndexer {
38 async fn retry<F, Fut, T>(
39 &self,
40 config: RetryConfig,
41 mut operation: F,
42 ) -> Result<T, IndexerError>
43 where
44 F: FnMut() -> Fut,
45 Fut: std::future::Future<Output = Result<T, IndexerError>>,
46 {
47 let max_retries = config.num_retries;
48 let mut attempts = 0;
49 let mut delay_ms = config.delay_ms;
50 let max_delay_ms = config.max_delay_ms;
51
52 loop {
53 attempts += 1;
54
55 debug!(
56 "Attempt {}/{}: No rate limiter configured",
57 attempts, max_retries
58 );
59
60 debug!("Attempt {}/{}: Executing operation", attempts, max_retries);
61 let result = operation().await;
62
63 match result {
64 Ok(value) => {
65 debug!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
66 return Ok(value);
67 }
68 Err(e) => {
69 let is_retryable = match &e {
70 IndexerError::ApiError(_) => {
71 warn!("API Error: {}", e);
72 true
73 }
74 IndexerError::PhotonError {
75 context: _,
76 message: _,
77 } => {
78 warn!("Operation failed, checking if retryable...");
79 true
80 }
81 IndexerError::IndexerNotSyncedToSlot => true,
82 IndexerError::Base58DecodeError { .. } => false,
83 IndexerError::AccountNotFound => false,
84 IndexerError::InvalidParameters(_) => false,
85 IndexerError::NotImplemented(_) => false,
86 _ => false,
87 };
88
89 if is_retryable && attempts < max_retries {
90 warn!(
91 "Attempt {}/{}: Operation failed. Retrying",
92 attempts, max_retries
93 );
94
95 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
96 delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
97 } else {
98 if is_retryable {
99 error!("Operation failed after max retries.");
100 } else {
101 error!("Operation failed with non-retryable error.");
102 }
103 return Err(e);
104 }
105 }
106 }
107 }
108 }
109}
110
111impl PhotonIndexer {
112 pub fn new(path: String, api_key: Option<String>) -> Self {
113 let configuration = Configuration {
114 base_path: path,
115 api_key: api_key.map(|key| ApiKey {
116 prefix: Some("api-key".to_string()),
117 key,
118 }),
119 ..Default::default()
120 };
121
122 PhotonIndexer { configuration }
123 }
124
125 fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
126 result.ok_or_else(|| IndexerError::missing_result(context, "value not present"))
127 }
128
129 fn extract_result_with_error_check<T>(
130 context: &str,
131 error: Option<Box<photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError>>,
132 result: Option<T>,
133 ) -> Result<T, IndexerError> {
134 if let Some(error) = error {
135 let error_message = error
136 .clone()
137 .message
138 .unwrap_or_else(|| format!("Unknown API error: {:?}", error).to_string());
139 return Err(IndexerError::ApiError(format!(
140 "API error in {} (code: {:?}): {}",
141 context, error.code, error_message
142 )));
143 }
144
145 Self::extract_result(context, result)
146 }
147
148 fn build_account_params(
149 &self,
150 address: Option<Address>,
151 hash: Option<Hash>,
152 ) -> Result<photon_api::models::GetCompressedAccountPostRequestParams, IndexerError> {
153 match (address, hash) {
154 (None, None) => Err(IndexerError::InvalidParameters(
155 "Either address or hash must be provided".to_string(),
156 )),
157 (Some(_), Some(_)) => Err(IndexerError::InvalidParameters(
158 "Only one of address or hash must be provided".to_string(),
159 )),
160 (address, hash) => Ok(photon_api::models::GetCompressedAccountPostRequestParams {
161 address: address.map(|x| x.to_base58()),
162 hash: hash.map(|x| x.to_base58()),
163 }),
164 }
165 }
166}
167
168impl Debug for PhotonIndexer {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("PhotonIndexer")
171 .field("configuration", &self.configuration)
172 .finish()
173 }
174}
175
176#[async_trait]
177impl Indexer for PhotonIndexer {
178 async fn get_compressed_account(
179 &self,
180 address: Address,
181 config: Option<IndexerRpcConfig>,
182 ) -> Result<Response<CompressedAccount>, IndexerError> {
183 let config = config.unwrap_or_default();
184 self.retry(config.retry_config, || async {
185 let params = self.build_account_params(Some(address), None)?;
186 let request = photon_api::models::GetCompressedAccountPostRequest {
187 params: Box::new(params),
188 ..Default::default()
189 };
190
191 let result = photon_api::apis::default_api::get_compressed_account_post(
192 &self.configuration,
193 request,
194 )
195 .await?;
196 let api_response = Self::extract_result_with_error_check(
197 "get_compressed_account",
198 result.error,
199 result.result.map(|r| *r),
200 )?;
201 if api_response.context.slot < config.slot {
202 return Err(IndexerError::IndexerNotSyncedToSlot);
203 }
204 let account_data = api_response
205 .value
206 .ok_or(IndexerError::AccountNotFound)
207 .map(|boxed| *boxed)?;
208 let account = CompressedAccount::try_from(&account_data)?;
209
210 Ok(Response {
211 context: Context {
212 slot: api_response.context.slot,
213 },
214 value: account,
215 })
216 })
217 .await
218 }
219
220 async fn get_compressed_account_by_hash(
221 &self,
222 hash: Hash,
223 config: Option<IndexerRpcConfig>,
224 ) -> Result<Response<CompressedAccount>, IndexerError> {
225 let config = config.unwrap_or_default();
226 self.retry(config.retry_config, || async {
227 let params = self.build_account_params(None, Some(hash))?;
228 let request = photon_api::models::GetCompressedAccountPostRequest {
229 params: Box::new(params),
230 ..Default::default()
231 };
232
233 let result = photon_api::apis::default_api::get_compressed_account_post(
234 &self.configuration,
235 request,
236 )
237 .await?;
238 let api_response = Self::extract_result_with_error_check(
239 "get_compressed_account_by_hash",
240 result.error,
241 result.result.map(|r| *r),
242 )?;
243 if api_response.context.slot < config.slot {
244 return Err(IndexerError::IndexerNotSyncedToSlot);
245 }
246 let account_data = api_response
247 .value
248 .ok_or(IndexerError::AccountNotFound)
249 .map(|boxed| *boxed)?;
250 let account = CompressedAccount::try_from(&account_data)?;
251
252 Ok(Response {
253 context: Context {
254 slot: api_response.context.slot,
255 },
256 value: account,
257 })
258 })
259 .await
260 }
261
262 async fn get_compressed_accounts_by_owner(
263 &self,
264 owner: &Pubkey,
265 options: Option<GetCompressedAccountsByOwnerConfig>,
266 config: Option<IndexerRpcConfig>,
267 ) -> Result<Response<ItemsWithCursor<CompressedAccount>>, IndexerError> {
268 let config = config.unwrap_or_default();
269 self.retry(config.retry_config, || async {
270 #[cfg(feature = "v2")]
271 {
272 let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
273 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
274 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
275 data_slice: options.as_ref().and_then(|o| {
276 o.data_slice.as_ref().map(|ds| {
277 Box::new(photon_api::models::DataSlice {
278 length: ds.length as u32,
279 offset: ds.offset as u32,
280 })
281 })
282 }),
283 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
284 limit: options.as_ref().and_then(|o| o.limit),
285 owner: owner.to_string(),
286 }),
287 ..Default::default()
288 };
289 let result =
290 photon_api::apis::default_api::get_compressed_accounts_by_owner_v2_post(
291 &self.configuration,
292 request,
293 )
294 .await?;
295 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
296 if response.context.slot < config.slot {
297 return Err(IndexerError::IndexerNotSyncedToSlot);
298 }
299 let accounts: Result<Vec<_>, _> = response
300 .value
301 .items
302 .iter()
303 .map(CompressedAccount::try_from)
304 .collect();
305
306 let cursor = response.value.cursor;
307
308 Ok(Response {
309 context: Context {
310 slot: response.context.slot,
311 },
312 value: ItemsWithCursor {
313 items: accounts?,
314 cursor,
315 },
316 })
317 }
318 #[cfg(not(feature = "v2"))]
319 {
320 let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
321 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
322 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
323 data_slice: options.as_ref().and_then(|o| {
324 o.data_slice.as_ref().map(|ds| {
325 Box::new(photon_api::models::DataSlice {
326 length: ds.length as u32,
327 offset: ds.offset as u32,
328 })
329 })
330 }),
331 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
332 limit: options.as_ref().and_then(|o| o.limit),
333 owner: owner.to_string(),
334 }),
335 ..Default::default()
336 };
337 let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
338 &self.configuration,
339 request,
340 )
341 .await?;
342 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
343 if response.context.slot < config.slot {
344 return Err(IndexerError::IndexerNotSyncedToSlot);
345 }
346 let accounts: Result<Vec<_>, _> = response
347 .value
348 .items
349 .iter()
350 .map(CompressedAccount::try_from)
351 .collect();
352
353 let cursor = response.value.cursor;
354
355 Ok(Response {
356 context: Context {
357 slot: response.context.slot,
358 },
359 value: ItemsWithCursor {
360 items: accounts?,
361 cursor,
362 },
363 })
364 }
365 })
366 .await
367 }
368
369 async fn get_compressed_balance(
370 &self,
371 address: Option<Address>,
372 hash: Option<Hash>,
373 config: Option<IndexerRpcConfig>,
374 ) -> Result<Response<u64>, IndexerError> {
375 let config = config.unwrap_or_default();
376 self.retry(config.retry_config, || async {
377 let params = self.build_account_params(address, hash)?;
378 let request = photon_api::models::GetCompressedAccountBalancePostRequest {
379 params: Box::new(params),
380 ..Default::default()
381 };
382
383 let result = photon_api::apis::default_api::get_compressed_account_balance_post(
384 &self.configuration,
385 request,
386 )
387 .await?;
388
389 let api_response = Self::extract_result_with_error_check(
390 "get_compressed_account_balance",
391 result.error,
392 result.result.map(|r| *r),
393 )?;
394 if api_response.context.slot < config.slot {
395 return Err(IndexerError::IndexerNotSyncedToSlot);
396 }
397 Ok(Response {
398 context: Context {
399 slot: api_response.context.slot,
400 },
401 value: api_response.value,
402 })
403 })
404 .await
405 }
406
407 async fn get_compressed_balance_by_owner(
408 &self,
409 owner: &Pubkey,
410 config: Option<IndexerRpcConfig>,
411 ) -> Result<Response<u64>, IndexerError> {
412 let config = config.unwrap_or_default();
413 self.retry(config.retry_config, || async {
414 let request = photon_api::models::GetCompressedBalanceByOwnerPostRequest {
415 params: Box::new(
416 photon_api::models::GetCompressedBalanceByOwnerPostRequestParams {
417 owner: owner.to_string(),
418 },
419 ),
420 ..Default::default()
421 };
422
423 let result = photon_api::apis::default_api::get_compressed_balance_by_owner_post(
424 &self.configuration,
425 request,
426 )
427 .await?;
428
429 let api_response = Self::extract_result_with_error_check(
430 "get_compressed_balance_by_owner",
431 result.error,
432 result.result.map(|r| *r),
433 )?;
434 if api_response.context.slot < config.slot {
435 return Err(IndexerError::IndexerNotSyncedToSlot);
436 }
437 Ok(Response {
438 context: Context {
439 slot: api_response.context.slot,
440 },
441 value: api_response.value,
442 })
443 })
444 .await
445 }
446
447 async fn get_compressed_mint_token_holders(
448 &self,
449 mint: &Pubkey,
450 options: Option<PaginatedOptions>,
451 config: Option<IndexerRpcConfig>,
452 ) -> Result<Response<ItemsWithCursor<OwnerBalance>>, IndexerError> {
453 let config = config.unwrap_or_default();
454 self.retry(config.retry_config, || async {
455 let request = photon_api::models::GetCompressedMintTokenHoldersPostRequest {
456 params: Box::new(
457 photon_api::models::GetCompressedMintTokenHoldersPostRequestParams {
458 mint: mint.to_string(),
459 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
460 limit: options.as_ref().and_then(|o| o.limit),
461 },
462 ),
463 ..Default::default()
464 };
465
466 let result = photon_api::apis::default_api::get_compressed_mint_token_holders_post(
467 &self.configuration,
468 request,
469 )
470 .await?;
471
472 let api_response = Self::extract_result_with_error_check(
473 "get_compressed_mint_token_holders",
474 result.error,
475 result.result.map(|r| *r),
476 )?;
477 if api_response.context.slot < config.slot {
478 return Err(IndexerError::IndexerNotSyncedToSlot);
479 }
480 let owner_balances: Result<Vec<_>, _> = api_response
481 .value
482 .items
483 .iter()
484 .map(OwnerBalance::try_from)
485 .collect();
486
487 let cursor = api_response.value.cursor;
488
489 Ok(Response {
490 context: Context {
491 slot: api_response.context.slot,
492 },
493 value: ItemsWithCursor {
494 items: owner_balances?,
495 cursor,
496 },
497 })
498 })
499 .await
500 }
501
502 async fn get_compressed_token_account_balance(
503 &self,
504 address: Option<Address>,
505 hash: Option<Hash>,
506 config: Option<IndexerRpcConfig>,
507 ) -> Result<Response<u64>, IndexerError> {
508 let config = config.unwrap_or_default();
509 self.retry(config.retry_config, || async {
510 let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
511 params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
512 address: address.map(|x| x.to_base58()),
513 hash: hash.map(|x| x.to_base58()),
514 }),
515 ..Default::default()
516 };
517
518 let result = photon_api::apis::default_api::get_compressed_token_account_balance_post(
519 &self.configuration,
520 request,
521 )
522 .await?;
523
524 let api_response = Self::extract_result_with_error_check(
525 "get_compressed_token_account_balance",
526 result.error,
527 result.result.map(|r| *r),
528 )?;
529 if api_response.context.slot < config.slot {
530 return Err(IndexerError::IndexerNotSyncedToSlot);
531 }
532 Ok(Response {
533 context: Context {
534 slot: api_response.context.slot,
535 },
536 value: api_response.value.amount,
537 })
538 })
539 .await
540 }
541
542 async fn get_compressed_token_accounts_by_delegate(
543 &self,
544 delegate: &Pubkey,
545 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
546 config: Option<IndexerRpcConfig>,
547 ) -> Result<Response<ItemsWithCursor<TokenAccount>>, IndexerError> {
548 let config = config.unwrap_or_default();
549 self.retry(config.retry_config, || async {
550 #[cfg(feature = "v2")]
551 {
552 let request = photon_api::models::GetCompressedTokenAccountsByDelegateV2PostRequest {
553 params: Box::new(
554 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
555 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
556 limit: options.as_ref().and_then(|o| o.limit),
557 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
558 delegate: delegate.to_string(),
559 },
560 ),
561 ..Default::default()
562 };
563
564 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_v2_post(
565 &self.configuration,
566 request,
567 )
568 .await?;
569
570 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
571 if response.context.slot < config.slot {
572 return Err(IndexerError::IndexerNotSyncedToSlot);
573 }
574
575 let token_accounts: Result<Vec<_>, _> = response
576 .value
577 .items
578 .iter()
579 .map(TokenAccount::try_from)
580 .collect();
581
582 let cursor = response.value.cursor;
583
584 Ok(Response {
585 context: Context {
586 slot: response.context.slot,
587 },
588 value: ItemsWithCursor {
589 items: token_accounts?,
590 cursor,
591 },
592 })
593 }
594 #[cfg(not(feature = "v2"))]
595 {
596 let request = photon_api::models::GetCompressedTokenAccountsByDelegatePostRequest {
597 params: Box::new(
598 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
599 delegate: delegate.to_string(),
600 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
601 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
602 limit: options.as_ref().and_then(|o| o.limit),
603 },
604 ),
605 ..Default::default()
606 };
607
608 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_post(
609 &self.configuration,
610 request,
611 )
612 .await?;
613
614 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
615 if response.context.slot < config.slot {
616 return Err(IndexerError::IndexerNotSyncedToSlot);
617 }
618
619 let token_accounts: Result<Vec<_>, _> = response
620 .value
621 .items
622 .iter()
623 .map(TokenAccount::try_from)
624 .collect();
625
626 let cursor = response.value.cursor;
627
628 Ok(Response {
629 context: Context {
630 slot: response.context.slot,
631 },
632 value: ItemsWithCursor {
633 items: token_accounts?,
634 cursor,
635 },
636 })
637 }
638 })
639 .await
640 }
641
642 async fn get_compressed_token_accounts_by_owner(
643 &self,
644 owner: &Pubkey,
645 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
646 config: Option<IndexerRpcConfig>,
647 ) -> Result<Response<ItemsWithCursor<TokenAccount>>, IndexerError> {
648 let config = config.unwrap_or_default();
649 self.retry(config.retry_config, || async {
650 #[cfg(feature = "v2")]
651 {
652 let request = photon_api::models::GetCompressedTokenAccountsByOwnerV2PostRequest {
653 params: Box::from(
654 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
655 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
656 limit: options.as_ref().and_then(|o| o.limit),
657 mint: options
658 .as_ref()
659 .and_then(|o| o.mint.as_ref())
660 .map(|x| x.to_string()),
661 owner: owner.to_string(),
662 },
663 ),
664 ..Default::default()
665 };
666 let result =
667 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_v2_post(
668 &self.configuration,
669 request,
670 )
671 .await?;
672 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
673 if response.context.slot < config.slot {
674 return Err(IndexerError::IndexerNotSyncedToSlot);
675 }
676 let token_accounts: Result<Vec<_>, _> = response
677 .value
678 .items
679 .iter()
680 .map(TokenAccount::try_from)
681 .collect();
682
683 let cursor = response.value.cursor;
684
685 Ok(Response {
686 context: Context {
687 slot: response.context.slot,
688 },
689 value: ItemsWithCursor {
690 items: token_accounts?,
691 cursor,
692 },
693 })
694 }
695 #[cfg(not(feature = "v2"))]
696 {
697 let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
698 params: Box::new(
699 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
700 owner: owner.to_string(),
701 mint: options
702 .as_ref()
703 .and_then(|o| o.mint.as_ref())
704 .map(|x| x.to_string()),
705 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
706 limit: options.as_ref().and_then(|o| o.limit),
707 },
708 ),
709 ..Default::default()
710 };
711
712 let result =
713 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_post(
714 &self.configuration,
715 request,
716 )
717 .await?;
718
719 let response = Self::extract_result_with_error_check(
720 "get_compressed_token_accounts_by_owner",
721 result.error,
722 result.result.map(|r| *r),
723 )?;
724 if response.context.slot < config.slot {
725 return Err(IndexerError::IndexerNotSyncedToSlot);
726 }
727 let token_accounts: Result<Vec<_>, _> = response
728 .value
729 .items
730 .iter()
731 .map(TokenAccount::try_from)
732 .collect();
733
734 let cursor = response.value.cursor;
735
736 Ok(Response {
737 context: Context {
738 slot: response.context.slot,
739 },
740 value: ItemsWithCursor {
741 items: token_accounts?,
742 cursor,
743 },
744 })
745 }
746 })
747 .await
748 }
749
750 async fn get_compressed_token_balances_by_owner_v2(
751 &self,
752 owner: &Pubkey,
753 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
754 config: Option<IndexerRpcConfig>,
755 ) -> Result<Response<ItemsWithCursor<TokenBalance>>, IndexerError> {
756 let config = config.unwrap_or_default();
757 self.retry(config.retry_config, || async {
758 #[cfg(feature = "v2")]
759 {
760 let request = photon_api::models::GetCompressedTokenBalancesByOwnerV2PostRequest {
761 params: Box::new(
762 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
763 owner: owner.to_string(),
764 mint: options
765 .as_ref()
766 .and_then(|o| o.mint.as_ref())
767 .map(|x| x.to_string()),
768 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
769 limit: options.as_ref().and_then(|o| o.limit),
770 },
771 ),
772 ..Default::default()
773 };
774
775 let result =
776 photon_api::apis::default_api::get_compressed_token_balances_by_owner_v2_post(
777 &self.configuration,
778 request,
779 )
780 .await?;
781
782 let api_response = Self::extract_result_with_error_check(
783 "get_compressed_token_balances_by_owner_v2",
784 result.error,
785 result.result.map(|r| *r),
786 )?;
787 if api_response.context.slot < config.slot {
788 return Err(IndexerError::IndexerNotSyncedToSlot);
789 }
790
791 let token_balances: Result<Vec<_>, _> = api_response
792 .value
793 .items
794 .iter()
795 .map(TokenBalance::try_from)
796 .collect();
797
798 Ok(Response {
799 context: Context {
800 slot: api_response.context.slot,
801 },
802 value: ItemsWithCursor {
803 items: token_balances?,
804 cursor: api_response.value.cursor,
805 },
806 })
807 }
808 #[cfg(not(feature = "v2"))]
809 {
810 let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
811 params: Box::new(
812 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
813 owner: owner.to_string(),
814 mint: options
815 .as_ref()
816 .and_then(|o| o.mint.as_ref())
817 .map(|x| x.to_string()),
818 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
819 limit: options.as_ref().and_then(|o| o.limit),
820 },
821 ),
822 ..Default::default()
823 };
824
825 let result =
826 photon_api::apis::default_api::get_compressed_token_balances_by_owner_post(
827 &self.configuration,
828 request,
829 )
830 .await?;
831
832 let api_response = Self::extract_result_with_error_check(
833 "get_compressed_token_balances_by_owner",
834 result.error,
835 result.result.map(|r| *r),
836 )?;
837 if api_response.context.slot < config.slot {
838 return Err(IndexerError::IndexerNotSyncedToSlot);
839 }
840
841 let token_balances: Result<Vec<_>, _> = api_response
842 .value
843 .token_balances
844 .iter()
845 .map(TokenBalance::try_from)
846 .collect();
847
848 Ok(Response {
849 context: Context {
850 slot: api_response.context.slot,
851 },
852 value: ItemsWithCursor {
853 items: token_balances?,
854 cursor: api_response.value.cursor,
855 },
856 })
857 }
858 })
859 .await
860 }
861
862 async fn get_compression_signatures_for_account(
863 &self,
864 hash: Hash,
865 config: Option<IndexerRpcConfig>,
866 ) -> Result<Response<Items<SignatureWithMetadata>>, IndexerError> {
867 let config = config.unwrap_or_default();
868 self.retry(config.retry_config, || async {
869 let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
870 params: Box::new(
871 photon_api::models::GetCompressedAccountProofPostRequestParams {
872 hash: hash.to_base58(),
873 },
874 ),
875 ..Default::default()
876 };
877
878 let result =
879 photon_api::apis::default_api::get_compression_signatures_for_account_post(
880 &self.configuration,
881 request,
882 )
883 .await?;
884
885 let api_response = Self::extract_result_with_error_check(
886 "get_compression_signatures_for_account",
887 result.error,
888 result.result.map(|r| *r),
889 )?;
890 if api_response.context.slot < config.slot {
891 return Err(IndexerError::IndexerNotSyncedToSlot);
892 }
893 let signatures = api_response
894 .value
895 .items
896 .iter()
897 .map(SignatureWithMetadata::try_from)
898 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
899
900 Ok(Response {
901 context: Context {
902 slot: api_response.context.slot,
903 },
904 value: Items { items: signatures },
905 })
906 })
907 .await
908 }
909
910 async fn get_compression_signatures_for_address(
911 &self,
912 address: &[u8; 32],
913 options: Option<PaginatedOptions>,
914 config: Option<IndexerRpcConfig>,
915 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
916 let config = config.unwrap_or_default();
917 self.retry(config.retry_config, || async {
918 let request = photon_api::models::GetCompressionSignaturesForAddressPostRequest {
919 params: Box::new(
920 photon_api::models::GetCompressionSignaturesForAddressPostRequestParams {
921 address: address.to_base58(),
922 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
923 limit: options.as_ref().and_then(|o| o.limit),
924 },
925 ),
926 ..Default::default()
927 };
928
929 let result =
930 photon_api::apis::default_api::get_compression_signatures_for_address_post(
931 &self.configuration,
932 request,
933 )
934 .await?;
935
936 let api_response = Self::extract_result_with_error_check(
937 "get_compression_signatures_for_address",
938 result.error,
939 result.result.map(|r| *r),
940 )?;
941 if api_response.context.slot < config.slot {
942 return Err(IndexerError::IndexerNotSyncedToSlot);
943 }
944
945 let signatures = api_response
946 .value
947 .items
948 .iter()
949 .map(SignatureWithMetadata::try_from)
950 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
951
952 let cursor = api_response.value.cursor;
953
954 Ok(Response {
955 context: Context {
956 slot: api_response.context.slot,
957 },
958 value: ItemsWithCursor {
959 items: signatures,
960 cursor,
961 },
962 })
963 })
964 .await
965 }
966
967 async fn get_compression_signatures_for_owner(
968 &self,
969 owner: &Pubkey,
970 options: Option<PaginatedOptions>,
971 config: Option<IndexerRpcConfig>,
972 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
973 let config = config.unwrap_or_default();
974 self.retry(config.retry_config, || async {
975 let request = photon_api::models::GetCompressionSignaturesForOwnerPostRequest {
976 params: Box::new(
977 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
978 owner: owner.to_string(),
979 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
980 limit: options.as_ref().and_then(|o| o.limit),
981 },
982 ),
983 ..Default::default()
984 };
985
986 let result = photon_api::apis::default_api::get_compression_signatures_for_owner_post(
987 &self.configuration,
988 request,
989 )
990 .await?;
991
992 let api_response = Self::extract_result_with_error_check(
993 "get_compression_signatures_for_owner",
994 result.error,
995 result.result.map(|r| *r),
996 )?;
997 if api_response.context.slot < config.slot {
998 return Err(IndexerError::IndexerNotSyncedToSlot);
999 }
1000
1001 let signatures = api_response
1002 .value
1003 .items
1004 .iter()
1005 .map(SignatureWithMetadata::try_from)
1006 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1007
1008 let cursor = api_response.value.cursor;
1009
1010 Ok(Response {
1011 context: Context {
1012 slot: api_response.context.slot,
1013 },
1014 value: ItemsWithCursor {
1015 items: signatures,
1016 cursor,
1017 },
1018 })
1019 })
1020 .await
1021 }
1022
1023 async fn get_compression_signatures_for_token_owner(
1024 &self,
1025 owner: &Pubkey,
1026 options: Option<PaginatedOptions>,
1027 config: Option<IndexerRpcConfig>,
1028 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
1029 let config = config.unwrap_or_default();
1030 self.retry(config.retry_config, || async {
1031 let request = photon_api::models::GetCompressionSignaturesForTokenOwnerPostRequest {
1032 params: Box::new(
1033 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
1034 owner: owner.to_string(),
1035 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
1036 limit: options.as_ref().and_then(|o| o.limit),
1037 },
1038 ),
1039 ..Default::default()
1040 };
1041
1042 let result =
1043 photon_api::apis::default_api::get_compression_signatures_for_token_owner_post(
1044 &self.configuration,
1045 request,
1046 )
1047 .await?;
1048
1049 let api_response = Self::extract_result_with_error_check(
1050 "get_compression_signatures_for_token_owner",
1051 result.error,
1052 result.result.map(|r| *r),
1053 )?;
1054 if api_response.context.slot < config.slot {
1055 return Err(IndexerError::IndexerNotSyncedToSlot);
1056 }
1057
1058 let signatures = api_response
1059 .value
1060 .items
1061 .iter()
1062 .map(SignatureWithMetadata::try_from)
1063 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1064
1065 let cursor = api_response.value.cursor;
1066
1067 Ok(Response {
1068 context: Context {
1069 slot: api_response.context.slot,
1070 },
1071 value: ItemsWithCursor {
1072 items: signatures,
1073 cursor,
1074 },
1075 })
1076 })
1077 .await
1078 }
1079
1080 async fn get_indexer_health(&self, config: Option<RetryConfig>) -> Result<bool, IndexerError> {
1081 let config = config.unwrap_or_default();
1082 self.retry(config, || async {
1083 let request = photon_api::models::GetIndexerHealthPostRequest {
1084 ..Default::default()
1085 };
1086
1087 let result = photon_api::apis::default_api::get_indexer_health_post(
1088 &self.configuration,
1089 request,
1090 )
1091 .await?;
1092
1093 let _api_response = Self::extract_result_with_error_check(
1094 "get_indexer_health",
1095 result.error,
1096 result.result,
1097 )?;
1098
1099 Ok(true)
1100 })
1101 .await
1102 }
1103
1104 async fn get_indexer_slot(&self, config: Option<RetryConfig>) -> Result<u64, IndexerError> {
1105 let config = config.unwrap_or_default();
1106 self.retry(config, || async {
1107 let request = photon_api::models::GetIndexerSlotPostRequest {
1108 ..Default::default()
1109 };
1110
1111 let result =
1112 photon_api::apis::default_api::get_indexer_slot_post(&self.configuration, request)
1113 .await?;
1114
1115 let result = Self::extract_result_with_error_check(
1116 "get_indexer_slot",
1117 result.error,
1118 result.result,
1119 )?;
1120 Ok(result)
1121 })
1122 .await
1123 }
1124
1125 async fn get_multiple_compressed_account_proofs(
1126 &self,
1127 hashes: Vec<[u8; 32]>,
1128 config: Option<IndexerRpcConfig>,
1129 ) -> Result<Response<Items<MerkleProof>>, IndexerError> {
1130 let config = config.unwrap_or_default();
1131 self.retry(config.retry_config, || async {
1132 let hashes_for_async = hashes.clone();
1133
1134 let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
1135 photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
1136 params: hashes_for_async
1137 .into_iter()
1138 .map(|hash| bs58::encode(hash).into_string())
1139 .collect(),
1140 ..Default::default()
1141 };
1142
1143 debug!("API request: {:?}", request);
1144
1145 let result =
1146 photon_api::apis::default_api::get_multiple_compressed_account_proofs_post(
1147 &self.configuration,
1148 request,
1149 )
1150 .await?;
1151 debug!("Raw API response: {:?}", result);
1152
1153 if let Some(error) = &result.error {
1154 let error_msg = error.message.as_deref().unwrap_or("Unknown error");
1155 let error_code = error.code.unwrap_or(0);
1156 tracing::error!("API returned error: {}", error_msg);
1157 return Err(IndexerError::PhotonError {
1158 context: "get_multiple_compressed_account_proofs".to_string(),
1159 message: format!("API Error (code {}): {}", error_code, error_msg),
1160 });
1161 }
1162
1163 let photon_proofs = result.result.ok_or_else(|| {
1164 IndexerError::missing_result(
1165 "get_multiple_new_address_proofs",
1166 "No result returned from Photon API",
1167 )
1168 })?;
1169 if photon_proofs.context.slot < config.slot {
1170 return Err(IndexerError::IndexerNotSyncedToSlot);
1171 }
1172
1173 let proofs = photon_proofs
1174 .value
1175 .iter()
1176 .map(|x| {
1177 let mut proof_vec = x.proof.clone();
1178 proof_vec.truncate(proof_vec.len() - 10); let proof = proof_vec
1181 .iter()
1182 .map(|x| Hash::from_base58(x))
1183 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1184 .map_err(|e| IndexerError::Base58DecodeError {
1185 field: "proof".to_string(),
1186 message: e.to_string(),
1187 })?;
1188
1189 Ok(MerkleProof {
1190 hash: <[u8; 32] as Base58Conversions>::from_base58(&x.hash)?,
1191 leaf_index: x.leaf_index,
1192 merkle_tree: Pubkey::from_str_const(x.merkle_tree.as_str()),
1193 proof,
1194 root_seq: x.root_seq,
1195 root: [0u8; 32],
1196 })
1197 })
1198 .collect::<Result<Vec<MerkleProof>, IndexerError>>()?;
1199
1200 Ok(Response {
1201 context: Context {
1202 slot: photon_proofs.context.slot,
1203 },
1204 value: Items { items: proofs },
1205 })
1206 })
1207 .await
1208 }
1209
1210 async fn get_multiple_compressed_accounts(
1211 &self,
1212 addresses: Option<Vec<Address>>,
1213 hashes: Option<Vec<Hash>>,
1214 config: Option<IndexerRpcConfig>,
1215 ) -> Result<Response<Items<CompressedAccount>>, IndexerError> {
1216 let config = config.unwrap_or_default();
1217 self.retry(config.retry_config, || async {
1218 let hashes = hashes.clone();
1219 let addresses = addresses.clone();
1220 let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
1221 params: Box::new(
1222 photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
1223 addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1224 hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1225 },
1226 ),
1227 ..Default::default()
1228 };
1229
1230 let result = photon_api::apis::default_api::get_multiple_compressed_accounts_post(
1231 &self.configuration,
1232 request,
1233 )
1234 .await?;
1235
1236 let api_response = Self::extract_result_with_error_check(
1237 "get_multiple_compressed_accounts",
1238 result.error,
1239 result.result.map(|r| *r),
1240 )?;
1241 if api_response.context.slot < config.slot {
1242 return Err(IndexerError::IndexerNotSyncedToSlot);
1243 }
1244 let accounts = api_response
1245 .value
1246 .items
1247 .iter()
1248 .map(CompressedAccount::try_from)
1249 .collect::<Result<Vec<CompressedAccount>, IndexerError>>()?;
1250
1251 Ok(Response {
1252 context: Context {
1253 slot: api_response.context.slot,
1254 },
1255 value: Items { items: accounts },
1256 })
1257 })
1258 .await
1259 }
1260
1261 async fn get_multiple_new_address_proofs(
1262 &self,
1263 merkle_tree_pubkey: [u8; 32],
1264 addresses: Vec<[u8; 32]>,
1265 config: Option<IndexerRpcConfig>,
1266 ) -> Result<Response<Items<NewAddressProofWithContext>>, IndexerError> {
1267 let config = config.unwrap_or_default();
1268 self.retry(config.retry_config, || async {
1269 let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
1270 .iter()
1271 .map(|x| photon_api::models::address_with_tree::AddressWithTree {
1272 address: bs58::encode(x).into_string(),
1273 tree: bs58::encode(&merkle_tree_pubkey).into_string(),
1274 })
1275 .collect();
1276
1277 let request = photon_api::models::GetMultipleNewAddressProofsV2PostRequest {
1278 params,
1279 ..Default::default()
1280 };
1281
1282 let result = photon_api::apis::default_api::get_multiple_new_address_proofs_v2_post(
1283 &self.configuration,
1284 request,
1285 )
1286 .await;
1287
1288 match &result {
1289 Ok(response) => debug!("Raw API response: {:?}", response),
1290 Err(e) => error!("API request failed: {:?}", e),
1291 }
1292
1293 let result = result?;
1294
1295 let api_response = match Self::extract_result_with_error_check(
1296 "get_multiple_new_address_proofs",
1297 result.error,
1298 result.result.map(|r| *r),
1299 ) {
1300 Ok(proofs) => proofs,
1301 Err(e) => {
1302 error!("Failed to extract proofs: {:?}", e);
1303 return Err(e);
1304 }
1305 };
1306 if api_response.context.slot < config.slot {
1307 return Err(IndexerError::IndexerNotSyncedToSlot);
1308 }
1309 let photon_proofs = api_response.value;
1310 let mut proofs = Vec::new();
1311 for photon_proof in photon_proofs {
1312 let tree_pubkey = Hash::from_base58(&photon_proof.merkle_tree).map_err(|e| {
1313 IndexerError::Base58DecodeError {
1314 field: "merkle_tree".to_string(),
1315 message: e.to_string(),
1316 }
1317 })?;
1318
1319 let low_address_value = Hash::from_base58(&photon_proof.lower_range_address)
1320 .map_err(|e| IndexerError::Base58DecodeError {
1321 field: "lower_range_address".to_string(),
1322 message: e.to_string(),
1323 })?;
1324
1325 let next_address_value = Hash::from_base58(&photon_proof.higher_range_address)
1326 .map_err(|e| IndexerError::Base58DecodeError {
1327 field: "higher_range_address".to_string(),
1328 message: e.to_string(),
1329 })?;
1330
1331 let mut proof_vec: Vec<[u8; 32]> = photon_proof
1332 .proof
1333 .iter()
1334 .map(|x: &String| Hash::from_base58(x))
1335 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()?;
1336
1337 proof_vec.truncate(proof_vec.len() - 10); let mut proof_arr = [[0u8; 32]; 16];
1339 proof_arr.copy_from_slice(&proof_vec);
1340
1341 let root = Hash::from_base58(&photon_proof.root).map_err(|e| {
1342 IndexerError::Base58DecodeError {
1343 field: "root".to_string(),
1344 message: e.to_string(),
1345 }
1346 })?;
1347
1348 let proof = NewAddressProofWithContext {
1349 merkle_tree: tree_pubkey.into(),
1350 low_address_index: photon_proof.low_element_leaf_index,
1351 low_address_value,
1352 low_address_next_index: photon_proof.next_index,
1353 low_address_next_value: next_address_value,
1354 low_address_proof: proof_arr.to_vec(),
1355 root,
1356 root_seq: photon_proof.root_seq,
1357 new_low_element: None,
1358 new_element: None,
1359 new_element_next_value: None,
1360 };
1361 proofs.push(proof);
1362 }
1363
1364 Ok(Response {
1365 context: Context {
1366 slot: api_response.context.slot,
1367 },
1368 value: Items { items: proofs },
1369 })
1370 })
1371 .await
1372 }
1373
1374 async fn get_validity_proof(
1375 &self,
1376 hashes: Vec<Hash>,
1377 new_addresses_with_trees: Vec<AddressWithTree>,
1378 config: Option<IndexerRpcConfig>,
1379 ) -> Result<Response<super::types::ValidityProofWithContext>, IndexerError> {
1380 let config = config.unwrap_or_default();
1381 self.retry(config.retry_config, || async {
1382 #[cfg(feature = "v2")]
1383 {
1384 let request = photon_api::models::GetValidityProofV2PostRequest {
1385 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1386 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1387 new_addresses_with_trees: Some(
1388 new_addresses_with_trees
1389 .iter()
1390 .map(|x| photon_api::models::AddressWithTree {
1391 address: x.address.to_base58(),
1392 tree: x.tree.to_string(),
1393 })
1394 .collect(),
1395 ),
1396 }),
1397 ..Default::default()
1398 };
1399
1400 let result = photon_api::apis::default_api::get_validity_proof_v2_post(
1401 &self.configuration,
1402 request,
1403 )
1404 .await?;
1405 let api_response = Self::extract_result_with_error_check(
1406 "get_validity_proof_v2",
1407 result.error,
1408 result.result.map(|r| *r),
1409 )?;
1410 if api_response.context.slot < config.slot {
1411 return Err(IndexerError::IndexerNotSyncedToSlot);
1412 }
1413 let validity_proof =
1414 super::types::ValidityProofWithContext::from_api_model_v2(*api_response.value)?;
1415
1416 Ok(Response {
1417 context: Context {
1418 slot: api_response.context.slot,
1419 },
1420 value: validity_proof,
1421 })
1422 }
1423 #[cfg(not(feature = "v2"))]
1424 {
1425 let request = photon_api::models::GetValidityProofPostRequest {
1426 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1427 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1428 new_addresses_with_trees: Some(
1429 new_addresses_with_trees
1430 .iter()
1431 .map(|x| photon_api::models::AddressWithTree {
1432 address: x.address.to_base58(),
1433 tree: x.tree.to_string(),
1434 })
1435 .collect(),
1436 ),
1437 }),
1438 ..Default::default()
1439 };
1440
1441 let result = photon_api::apis::default_api::get_validity_proof_post(
1442 &self.configuration,
1443 request,
1444 )
1445 .await?;
1446
1447 let api_response = Self::extract_result_with_error_check(
1448 "get_validity_proof",
1449 result.error,
1450 result.result.map(|r| *r),
1451 )?;
1452 if api_response.context.slot < config.slot {
1453 return Err(IndexerError::IndexerNotSyncedToSlot);
1454 }
1455 let validity_proof = super::types::ValidityProofWithContext::from_api_model(
1456 *api_response.value,
1457 hashes.len(),
1458 )?;
1459
1460 Ok(Response {
1461 context: Context {
1462 slot: api_response.context.slot,
1463 },
1464 value: validity_proof,
1465 })
1466 }
1467 })
1468 .await
1469 }
1470
1471 async fn get_address_queue_with_proofs(
1472 &mut self,
1473 _merkle_tree_pubkey: &Pubkey,
1474 _zkp_batch_size: u16,
1475 _start_offset: Option<u64>,
1476 _config: Option<IndexerRpcConfig>,
1477 ) -> Result<Response<BatchAddressUpdateIndexerResponse>, IndexerError> {
1478 #[cfg(not(feature = "v2"))]
1479 unimplemented!("get_address_queue_with_proofs");
1480 #[cfg(feature = "v2")]
1481 {
1482 let merkle_tree_pubkey = _merkle_tree_pubkey;
1483 let limit = _zkp_batch_size;
1484 let start_queue_index = _start_offset;
1485 let config = _config.unwrap_or_default();
1486 self.retry(config.retry_config, || async {
1487 let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
1488 let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
1489 params: Box::new(
1490 photon_api::models::GetBatchAddressUpdateInfoPostRequestParams {
1491 limit,
1492 start_queue_index,
1493 tree: merkle_tree.to_base58(),
1494 },
1495 ),
1496 ..Default::default()
1497 };
1498
1499 let result = photon_api::apis::default_api::get_batch_address_update_info_post(
1500 &self.configuration,
1501 request,
1502 )
1503 .await?;
1504
1505 let api_response = Self::extract_result_with_error_check(
1506 "get_batch_address_update_info",
1507 result.error,
1508 result.result.map(|r| *r),
1509 )?;
1510 if api_response.context.slot < config.slot {
1511 return Err(IndexerError::IndexerNotSyncedToSlot);
1512 }
1513
1514 let addresses = api_response
1515 .addresses
1516 .iter()
1517 .map(|x| crate::indexer::AddressQueueIndex {
1518 address: Hash::from_base58(x.address.clone().as_ref()).unwrap(),
1519 queue_index: x.queue_index,
1520 })
1521 .collect();
1522
1523 let mut proofs: Vec<NewAddressProofWithContext> = vec![];
1524 for proof in api_response.non_inclusion_proofs {
1525 let proof = NewAddressProofWithContext {
1526 merkle_tree: *merkle_tree_pubkey,
1527 low_address_index: proof.low_element_leaf_index,
1528 low_address_value: Hash::from_base58(
1529 proof.lower_range_address.clone().as_ref(),
1530 )
1531 .unwrap(),
1532 low_address_next_index: proof.next_index,
1533 low_address_next_value: Hash::from_base58(
1534 proof.higher_range_address.clone().as_ref(),
1535 )
1536 .unwrap(),
1537 low_address_proof: proof
1538 .proof
1539 .iter()
1540 .map(|x| Hash::from_base58(x.clone().as_ref()).unwrap())
1541 .collect(),
1542 root: Hash::from_base58(proof.root.clone().as_ref()).unwrap(),
1543 root_seq: proof.root_seq,
1544
1545 new_low_element: None,
1546 new_element: None,
1547 new_element_next_value: None,
1548 };
1549 proofs.push(proof);
1550 }
1551
1552 let subtrees = api_response
1553 .subtrees
1554 .iter()
1555 .map(|x| {
1556 let mut arr = [0u8; 32];
1557 arr.copy_from_slice(x.as_slice());
1558 arr
1559 })
1560 .collect::<Vec<_>>();
1561
1562 let result = BatchAddressUpdateIndexerResponse {
1563 batch_start_index: api_response.start_index,
1564 addresses,
1565 non_inclusion_proofs: proofs,
1566 subtrees,
1567 };
1568 Ok(Response {
1569 context: Context {
1570 slot: api_response.context.slot,
1571 },
1572 value: result,
1573 })
1574 })
1575 .await
1576 }
1577 }
1578
1579 async fn get_queue_elements(
1580 &mut self,
1581 _pubkey: [u8; 32],
1582 _queue_type: QueueType,
1583 _num_elements: u16,
1584 _start_offset: Option<u64>,
1585 _config: Option<IndexerRpcConfig>,
1586 ) -> Result<Response<Items<MerkleProofWithContext>>, IndexerError> {
1587 #[cfg(not(feature = "v2"))]
1588 unimplemented!("get_queue_elements");
1589 #[cfg(feature = "v2")]
1590 {
1591 let pubkey = _pubkey;
1592 let queue_type = _queue_type;
1593 let limit = _num_elements;
1594 let start_queue_index = _start_offset;
1595 let config = _config.unwrap_or_default();
1596 self.retry(config.retry_config, || async {
1597 let request: photon_api::models::GetQueueElementsPostRequest =
1598 photon_api::models::GetQueueElementsPostRequest {
1599 params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
1600 tree: bs58::encode(pubkey).into_string(),
1601 queue_type: queue_type as u16,
1602 limit,
1603 start_queue_index,
1604 }),
1605 ..Default::default()
1606 };
1607 let result = photon_api::apis::default_api::get_queue_elements_post(
1608 &self.configuration,
1609 request,
1610 )
1611 .await;
1612
1613 let result: Result<Response<Items<MerkleProofWithContext>>, IndexerError> =
1614 match result {
1615 Ok(api_response) => match api_response.result {
1616 Some(api_result) => {
1617 if api_result.context.slot < config.slot {
1618 return Err(IndexerError::IndexerNotSyncedToSlot);
1619 }
1620 let response = api_result.value;
1621 let proofs: Vec<MerkleProofWithContext> = response
1622 .iter()
1623 .map(|x| {
1624 let proof = x
1625 .proof
1626 .iter()
1627 .map(|x| Hash::from_base58(x).unwrap())
1628 .collect();
1629 let root = Hash::from_base58(&x.root).unwrap();
1630 let leaf = Hash::from_base58(&x.leaf).unwrap();
1631 let merkle_tree = Hash::from_base58(&x.tree).unwrap();
1632 let tx_hash = x
1633 .tx_hash
1634 .as_ref()
1635 .map(|x| Hash::from_base58(x).unwrap());
1636 let account_hash =
1637 Hash::from_base58(&x.account_hash).unwrap();
1638
1639 MerkleProofWithContext {
1640 proof,
1641 root,
1642 leaf_index: x.leaf_index,
1643 leaf,
1644 merkle_tree,
1645 root_seq: x.root_seq,
1646 tx_hash,
1647 account_hash,
1648 }
1649 })
1650 .collect();
1651
1652 Ok(Response {
1653 context: Context {
1654 slot: api_result.context.slot,
1655 },
1656 value: Items { items: proofs },
1657 })
1658 }
1659 None => {
1660 let error = api_response.error.ok_or_else(|| {
1661 IndexerError::PhotonError {
1662 context: "get_queue_elements".to_string(),
1663 message: "No error details provided".to_string(),
1664 }
1665 })?;
1666
1667 Err(IndexerError::PhotonError {
1668 context: "get_queue_elements".to_string(),
1669 message: error
1670 .message
1671 .unwrap_or_else(|| "Unknown error".to_string()),
1672 })
1673 }
1674 },
1675 Err(e) => Err(IndexerError::PhotonError {
1676 context: "get_queue_elements".to_string(),
1677 message: e.to_string(),
1678 }),
1679 };
1680
1681 result
1682 })
1683 .await
1684 }
1685 }
1686
1687 async fn get_subtrees(
1688 &self,
1689 _merkle_tree_pubkey: [u8; 32],
1690 _config: Option<IndexerRpcConfig>,
1691 ) -> Result<Response<Items<[u8; 32]>>, IndexerError> {
1692 #[cfg(not(feature = "v2"))]
1693 unimplemented!();
1694 #[cfg(feature = "v2")]
1695 {
1696 todo!();
1697 }
1698 }
1699}