1#![forbid(unsafe_code)]
8
9use crate::solana_sdk::pubkey::Pubkey;
10use crate::{error::Result, events::TallyEvent, SimpleTallyClient, TallyError};
11use anchor_client::solana_account_decoder::UiAccountEncoding;
12use anchor_client::solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
13use anchor_client::solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
14use anchor_client::solana_client::rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType};
15use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
16use anyhow::Context;
17use chrono::{DateTime, Utc};
18use lru::LruCache;
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21use std::num::NonZeroUsize;
22use std::str::FromStr;
23use std::sync::{Arc, Mutex};
24use std::time::Instant;
25use tracing::{debug, info, trace, warn};
26
27#[derive(Debug, Clone)]
29pub struct EventQueryConfig {
30 pub max_events_per_query: usize,
32 pub max_signatures_per_batch: usize,
34 pub commitment: CommitmentConfig,
36 pub enable_cache: bool,
38 pub cache_ttl_seconds: u64,
40 pub max_cache_size: usize,
42}
43
44impl Default for EventQueryConfig {
45 fn default() -> Self {
46 Self {
47 max_events_per_query: 1000,
48 max_signatures_per_batch: 100,
49 commitment: CommitmentConfig::confirmed(),
50 enable_cache: true,
51 cache_ttl_seconds: 300, max_cache_size: 1000,
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
59pub struct EventQueryClientConfig {
60 pub rpc_url: String,
62 pub program_id: Pubkey,
64 pub query_config: EventQueryConfig,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct ParsedEvent {
71 pub signature: Signature,
73 pub slot: u64,
75 pub block_time: Option<i64>,
77 pub success: bool,
79 pub event: TallyEvent,
81 pub log_index: usize,
83}
84
85#[derive(Debug, Clone)]
87struct CacheEntry {
88 events: Vec<ParsedEvent>,
90 cached_at: DateTime<Utc>,
92 ttl_seconds: u64,
94}
95
96impl CacheEntry {
97 fn new(events: Vec<ParsedEvent>, ttl_seconds: u64) -> Self {
98 Self {
99 events,
100 cached_at: Utc::now(),
101 ttl_seconds,
102 }
103 }
104
105 fn is_expired(&self) -> bool {
106 let ttl_i64 = i64::try_from(self.ttl_seconds).unwrap_or(i64::MAX);
108 let duration = chrono::Duration::seconds(ttl_i64);
109 let expiry = self
110 .cached_at
111 .checked_add_signed(duration)
112 .unwrap_or(self.cached_at);
113 Utc::now() > expiry
114 }
115}
116
117#[derive(Debug, Clone, Hash, PartialEq, Eq)]
119struct QueryKey {
120 merchant: Pubkey,
121 query_type: QueryType,
122 limit: usize,
123 from_slot: Option<u64>,
124 to_slot: Option<u64>,
125}
126
127#[derive(Debug, Clone, Hash, PartialEq, Eq)]
128enum QueryType {
129 Recent,
130 DateRange,
131 MerchantEvents,
132}
133
134pub struct EventQueryClient {
136 sdk_client: Arc<SimpleTallyClient>,
138 program_id: Pubkey,
140 config: EventQueryConfig,
142 cache: Arc<Mutex<LruCache<QueryKey, CacheEntry>>>,
144}
145
146impl EventQueryClient {
147 pub fn new(config: EventQueryClientConfig) -> Result<Self> {
157 let sdk_client = Arc::new(
158 SimpleTallyClient::new(&config.rpc_url)
159 .context("Failed to create SimpleTallyClient")?,
160 );
161
162 let cache_size = NonZeroUsize::new(config.query_config.max_cache_size)
163 .context("Cache size must be greater than 0")?;
164 let cache = Arc::new(Mutex::new(LruCache::new(cache_size)));
165
166 info!(
167 service = "tally-sdk",
168 component = "event_query_client",
169 event = "client_created",
170 rpc_url = %config.rpc_url,
171 program_id = %config.program_id,
172 max_events_per_query = config.query_config.max_events_per_query,
173 cache_enabled = config.query_config.enable_cache,
174 "EventQueryClient initialized successfully"
175 );
176
177 Ok(Self {
178 sdk_client,
179 program_id: config.program_id,
180 config: config.query_config,
181 cache,
182 })
183 }
184
185 pub fn new_with_program_id(
196 rpc_url: String,
197 query_config: Option<EventQueryConfig>,
198 ) -> Result<Self> {
199 let config = EventQueryClientConfig {
200 rpc_url,
201 program_id: crate::program_id(),
202 query_config: query_config.unwrap_or_default(),
203 };
204 Self::new(config)
205 }
206
207 pub async fn get_recent_events(
222 &self,
223 merchant: &Pubkey,
224 limit: usize,
225 ) -> Result<Vec<ParsedEvent>> {
226 let start_time = Instant::now();
227 let query_key = Self::build_query_key(merchant, QueryType::Recent, limit, None, None);
228
229 if let Some(cached) = self.try_get_cached_events(&query_key, merchant) {
231 return Ok(cached);
232 }
233
234 debug!(
235 service = "tally-sdk",
236 component = "event_query_client",
237 event = "query_recent_events",
238 merchant = %merchant,
239 limit = limit,
240 "Querying recent events for merchant"
241 );
242
243 let sorted_events = self.fetch_and_sort_events(merchant, limit).await?;
245
246 self.try_cache_events(query_key, &sorted_events);
248
249 Self::log_query_success(merchant, &sorted_events, start_time);
250
251 Ok(sorted_events)
252 }
253
254 const fn build_query_key(
256 merchant: &Pubkey,
257 query_type: QueryType,
258 limit: usize,
259 from_slot: Option<u64>,
260 to_slot: Option<u64>,
261 ) -> QueryKey {
262 QueryKey {
263 merchant: *merchant,
264 query_type,
265 limit,
266 from_slot,
267 to_slot,
268 }
269 }
270
271 fn try_get_cached_events(
273 &self,
274 query_key: &QueryKey,
275 merchant: &Pubkey,
276 ) -> Option<Vec<ParsedEvent>> {
277 if !self.config.enable_cache {
278 return None;
279 }
280
281 if let Some(cached_events) = self.get_from_cache(query_key) {
282 debug!(
283 service = "tally-sdk",
284 component = "event_query_client",
285 event = "cache_hit",
286 merchant = %merchant,
287 cached_event_count = cached_events.len(),
288 "Returning cached recent events"
289 );
290 return Some(cached_events);
291 }
292
293 None
294 }
295
296 async fn fetch_and_sort_events(
298 &self,
299 merchant: &Pubkey,
300 limit: usize,
301 ) -> Result<Vec<ParsedEvent>> {
302 let signatures = self.get_merchant_signatures(merchant, limit).await?;
303 let events = self.parse_events_from_signatures(&signatures).await?;
304 Ok(Self::sort_and_limit_events(events, limit))
305 }
306
307 fn sort_and_limit_events(mut events: Vec<ParsedEvent>, limit: usize) -> Vec<ParsedEvent> {
309 events.sort_by(|a, b| b.slot.cmp(&a.slot));
310 events.truncate(limit);
311 events
312 }
313
314 fn try_cache_events(&self, query_key: QueryKey, events: &[ParsedEvent]) {
316 if self.config.enable_cache {
317 self.store_in_cache(query_key, events.to_vec());
318 }
319 }
320
321 fn log_query_success(merchant: &Pubkey, events: &[ParsedEvent], start_time: Instant) {
323 info!(
324 service = "tally-sdk",
325 component = "event_query_client",
326 event = "recent_events_retrieved",
327 merchant = %merchant,
328 event_count = events.len(),
329 duration_ms = start_time.elapsed().as_millis(),
330 "Successfully retrieved recent events"
331 );
332 }
333
334 pub async fn get_events_by_date_range(
350 &self,
351 merchant: &Pubkey,
352 from: DateTime<Utc>,
353 to: DateTime<Utc>,
354 ) -> Result<Vec<ParsedEvent>> {
355 let start_time = Instant::now();
356
357 debug!(
358 service = "tally-sdk",
359 component = "event_query_client",
360 event = "query_events_by_date_range",
361 merchant = %merchant,
362 from = %from,
363 to = %to,
364 "Querying events by date range"
365 );
366
367 let (from_slot, to_slot) = self.convert_date_range_to_slots(from, to)?;
369 let query_key = Self::build_date_range_query_key(
370 merchant,
371 from_slot,
372 to_slot,
373 self.config.max_events_per_query,
374 );
375
376 if let Some(cached) = self.try_get_cached_date_range_events(&query_key, merchant) {
378 return Ok(cached);
379 }
380
381 let sorted_events = self
383 .fetch_filter_and_sort_events_by_date(merchant, from, to, from_slot, to_slot)
384 .await?;
385
386 self.try_cache_events(query_key.clone(), &sorted_events);
388
389 Self::log_date_range_query_success(
390 merchant,
391 &sorted_events,
392 from_slot,
393 to_slot,
394 start_time,
395 );
396
397 Ok(sorted_events)
398 }
399
400 fn convert_date_range_to_slots(
402 &self,
403 from: DateTime<Utc>,
404 to: DateTime<Utc>,
405 ) -> Result<(u64, u64)> {
406 let from_slot = self.timestamp_to_approximate_slot(from.timestamp())?;
407 let to_slot = self.timestamp_to_approximate_slot(to.timestamp())?;
408 Ok((from_slot, to_slot))
409 }
410
411 const fn build_date_range_query_key(
413 merchant: &Pubkey,
414 from_slot: u64,
415 to_slot: u64,
416 limit: usize,
417 ) -> QueryKey {
418 QueryKey {
419 merchant: *merchant,
420 query_type: QueryType::DateRange,
421 limit,
422 from_slot: Some(from_slot),
423 to_slot: Some(to_slot),
424 }
425 }
426
427 fn try_get_cached_date_range_events(
429 &self,
430 query_key: &QueryKey,
431 merchant: &Pubkey,
432 ) -> Option<Vec<ParsedEvent>> {
433 if !self.config.enable_cache {
434 return None;
435 }
436
437 if let Some(cached_events) = self.get_from_cache(query_key) {
438 debug!(
439 service = "tally-sdk",
440 component = "event_query_client",
441 event = "cache_hit",
442 merchant = %merchant,
443 cached_event_count = cached_events.len(),
444 "Returning cached date range events"
445 );
446 return Some(cached_events);
447 }
448
449 None
450 }
451
452 async fn fetch_filter_and_sort_events_by_date(
454 &self,
455 merchant: &Pubkey,
456 from: DateTime<Utc>,
457 to: DateTime<Utc>,
458 from_slot: u64,
459 to_slot: u64,
460 ) -> Result<Vec<ParsedEvent>> {
461 let signatures = self
462 .get_merchant_signatures_in_slot_range(merchant, from_slot, to_slot)
463 .await?;
464 let events = self.parse_events_from_signatures(&signatures).await?;
465 let filtered_events = Self::filter_events_by_date_range(events, from, to);
466 Ok(Self::sort_events_by_block_time(filtered_events))
467 }
468
469 fn filter_events_by_date_range(
471 events: Vec<ParsedEvent>,
472 from: DateTime<Utc>,
473 to: DateTime<Utc>,
474 ) -> Vec<ParsedEvent> {
475 events
476 .into_iter()
477 .filter(|event| Self::is_event_in_date_range(event, from, to))
478 .collect()
479 }
480
481 fn is_event_in_date_range(event: &ParsedEvent, from: DateTime<Utc>, to: DateTime<Utc>) -> bool {
483 event
484 .block_time
485 .and_then(|block_time| DateTime::from_timestamp(block_time, 0))
486 .is_some_and(|event_time| event_time >= from && event_time <= to)
487 }
488
489 fn sort_events_by_block_time(mut events: Vec<ParsedEvent>) -> Vec<ParsedEvent> {
491 events.sort_by(|a, b| b.block_time.unwrap_or(0).cmp(&a.block_time.unwrap_or(0)));
492 events
493 }
494
495 fn log_date_range_query_success(
497 merchant: &Pubkey,
498 events: &[ParsedEvent],
499 from_slot: u64,
500 to_slot: u64,
501 start_time: Instant,
502 ) {
503 info!(
504 service = "tally-sdk",
505 component = "event_query_client",
506 event = "date_range_events_retrieved",
507 merchant = %merchant,
508 event_count = events.len(),
509 from_slot = from_slot,
510 to_slot = to_slot,
511 duration_ms = start_time.elapsed().as_millis(),
512 "Successfully retrieved events by date range"
513 );
514 }
515
516 pub async fn get_merchant_events(
531 &self,
532 merchant: &Pubkey,
533 limit: usize,
534 ) -> Result<Vec<ParsedEvent>> {
535 let start_time = Instant::now();
536 let query_key = Self::build_merchant_events_query_key(merchant, limit);
537
538 if let Some(cached) = self.try_get_cached_merchant_events(&query_key, merchant) {
540 return Ok(cached);
541 }
542
543 debug!(
544 service = "tally-sdk",
545 component = "event_query_client",
546 event = "query_merchant_events",
547 merchant = %merchant,
548 limit = limit,
549 "Querying all events for merchant"
550 );
551
552 let sorted_events = self
554 .fetch_parse_and_sort_merchant_events(merchant, limit)
555 .await?;
556
557 self.try_cache_events(query_key, &sorted_events);
559
560 Self::log_merchant_events_success(merchant, &sorted_events, start_time);
561
562 Ok(sorted_events)
563 }
564
565 const fn build_merchant_events_query_key(merchant: &Pubkey, limit: usize) -> QueryKey {
567 QueryKey {
568 merchant: *merchant,
569 query_type: QueryType::MerchantEvents,
570 limit,
571 from_slot: None,
572 to_slot: None,
573 }
574 }
575
576 fn try_get_cached_merchant_events(
578 &self,
579 query_key: &QueryKey,
580 merchant: &Pubkey,
581 ) -> Option<Vec<ParsedEvent>> {
582 if !self.config.enable_cache {
583 return None;
584 }
585
586 if let Some(cached_events) = self.get_from_cache(query_key) {
587 debug!(
588 service = "tally-sdk",
589 component = "event_query_client",
590 event = "cache_hit",
591 merchant = %merchant,
592 cached_event_count = cached_events.len(),
593 "Returning cached merchant events"
594 );
595 return Some(cached_events);
596 }
597
598 None
599 }
600
601 async fn fetch_parse_and_sort_merchant_events(
603 &self,
604 merchant: &Pubkey,
605 limit: usize,
606 ) -> Result<Vec<ParsedEvent>> {
607 let signature_limit = limit.saturating_mul(2);
609 let signatures = self
610 .get_merchant_signatures(merchant, signature_limit)
611 .await?;
612
613 let events = self.parse_events_from_signatures(&signatures).await?;
615
616 Ok(Self::sort_and_limit_events(events, limit))
618 }
619
620 fn log_merchant_events_success(merchant: &Pubkey, events: &[ParsedEvent], start_time: Instant) {
622 info!(
623 service = "tally-sdk",
624 component = "event_query_client",
625 event = "merchant_events_retrieved",
626 merchant = %merchant,
627 event_count = events.len(),
628 duration_ms = start_time.elapsed().as_millis(),
629 "Successfully retrieved merchant events"
630 );
631 }
632
633 #[allow(clippy::unused_async)] async fn get_merchant_signatures(
636 &self,
637 merchant: &Pubkey,
638 limit: usize,
639 ) -> Result<Vec<Signature>> {
640 let merchant_signatures = self
642 .sdk_client
643 .get_confirmed_signatures_for_address(
644 merchant,
645 Some(GetConfirmedSignaturesForAddress2Config {
646 limit: Some(limit.min(1000)), commitment: Some(self.config.commitment),
648 ..Default::default()
649 }),
650 )
651 .map_err(|e| TallyError::RpcError(format!("Failed to get merchant signatures: {e}")))?;
652
653 let mut signatures = HashSet::new();
654 for sig_info in merchant_signatures {
655 if let Ok(signature) = Signature::from_str(&sig_info.signature) {
656 signatures.insert(signature);
657 }
658 }
659
660 let plans = self.get_merchant_plans(merchant)?;
662 for plan_address in &plans {
663 let plan_signatures = self
664 .sdk_client
665 .get_confirmed_signatures_for_address(
666 plan_address,
667 Some(GetConfirmedSignaturesForAddress2Config {
668 limit: Some(limit.min(1000)),
669 commitment: Some(self.config.commitment),
670 ..Default::default()
671 }),
672 )
673 .map_err(|e| TallyError::RpcError(format!("Failed to get plan signatures: {e}")))?;
674
675 for sig_info in plan_signatures {
676 if let Ok(signature) = Signature::from_str(&sig_info.signature) {
677 signatures.insert(signature);
678 }
679 }
680 }
681
682 for plan_address in &plans {
684 let subscriptions = self.get_plan_subscriptions(plan_address)?;
685 for subscription_address in subscriptions {
686 let sub_signatures = self
687 .sdk_client
688 .get_confirmed_signatures_for_address(
689 &subscription_address,
690 Some(GetConfirmedSignaturesForAddress2Config {
691 limit: Some(limit.min(1000)),
692 commitment: Some(self.config.commitment),
693 ..Default::default()
694 }),
695 )
696 .map_err(|e| {
697 TallyError::RpcError(format!("Failed to get subscription signatures: {e}"))
698 })?;
699
700 for sig_info in sub_signatures {
701 if let Ok(signature) = Signature::from_str(&sig_info.signature) {
702 signatures.insert(signature);
703 }
704 }
705 }
706 }
707
708 let result: Vec<Signature> = signatures.into_iter().collect();
709
710 debug!(
711 service = "tally-sdk",
712 component = "event_query_client",
713 event = "signatures_collected",
714 merchant = %merchant,
715 signature_count = result.len(),
716 plan_count = plans.len(),
717 "Collected transaction signatures for merchant"
718 );
719
720 Ok(result)
721 }
722
723 async fn get_merchant_signatures_in_slot_range(
725 &self,
726 merchant: &Pubkey,
727 _from_slot: u64,
728 _to_slot: u64,
729 ) -> Result<Vec<Signature>> {
730 let signature_limit = self.config.max_events_per_query.saturating_mul(2);
732 let signatures = self
733 .get_merchant_signatures(merchant, signature_limit)
734 .await?;
735
736 Ok(signatures)
740 }
741
742 async fn parse_events_from_signatures(
744 &self,
745 signatures: &[Signature],
746 ) -> Result<Vec<ParsedEvent>> {
747 let all_events = self.process_signature_batches(signatures).await;
748
749 Self::log_parsed_events_summary(signatures, &all_events);
750
751 Ok(all_events)
752 }
753
754 async fn process_signature_batches(&self, signatures: &[Signature]) -> Vec<ParsedEvent> {
756 let mut all_events = Vec::new();
757
758 for chunk in signatures.chunks(self.config.max_signatures_per_batch) {
759 let batch_events = self.process_signature_chunk(chunk);
760 all_events.extend(batch_events);
761
762 self.apply_batch_rate_limit(chunk.len()).await;
764 }
765
766 all_events
767 }
768
769 fn process_signature_chunk(&self, chunk: &[Signature]) -> Vec<ParsedEvent> {
771 let batch_events = Vec::new();
772
773 for signature in chunk {
774 self.try_fetch_and_log_transaction(signature);
775 }
776
777 batch_events
778 }
779
780 fn try_fetch_and_log_transaction(&self, signature: &Signature) {
782 match self.sdk_client.get_transaction(signature) {
783 Ok(_transaction) => {
784 Self::log_transaction_received(signature);
785 }
786 Err(e) => {
787 Self::log_transaction_fetch_error(signature, &e);
788 }
789 }
790 }
791
792 fn log_transaction_received(signature: &Signature) {
794 debug!(
795 service = "tally-sdk",
796 component = "event_query_client",
797 event = "transaction_received",
798 signature = %signature,
799 "Transaction data received - event parsing temporarily disabled"
800 );
801 }
802
803 fn log_transaction_fetch_error<E: std::fmt::Display>(signature: &Signature, error: &E) {
805 trace!(
806 service = "tally-sdk",
807 component = "event_query_client",
808 event = "transaction_fetch_error",
809 signature = %signature,
810 error = %error,
811 "Failed to fetch transaction details"
812 );
813 }
814
815 async fn apply_batch_rate_limit(&self, chunk_len: usize) {
817 if chunk_len == self.config.max_signatures_per_batch {
818 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
819 }
820 }
821
822 fn log_parsed_events_summary(signatures: &[Signature], events: &[ParsedEvent]) {
824 debug!(
825 service = "tally-sdk",
826 component = "event_query_client",
827 event = "events_parsed",
828 signature_count = signatures.len(),
829 event_count = events.len(),
830 "Parsed events from transaction signatures"
831 );
832 }
833
834 fn get_merchant_plans(&self, merchant: &Pubkey) -> Result<Vec<Pubkey>> {
836 let config = RpcProgramAccountsConfig {
837 filters: Some(vec![
838 RpcFilterType::Memcmp(Memcmp::new(
840 8, MemcmpEncodedBytes::Base58(merchant.to_string()),
842 )),
843 ]),
844 account_config: RpcAccountInfoConfig {
845 encoding: Some(UiAccountEncoding::Base64),
846 commitment: Some(self.config.commitment),
847 ..Default::default()
848 },
849 with_context: Some(false),
850 sort_results: None,
851 };
852
853 let accounts = self
854 .sdk_client
855 .rpc()
856 .get_program_accounts_with_config(&self.program_id, config)
857 .map_err(|e| TallyError::RpcError(format!("Failed to get merchant plans: {e}")))?;
858
859 let plan_addresses: Vec<Pubkey> = accounts.into_iter().map(|(pubkey, _)| pubkey).collect();
860
861 debug!(
862 service = "tally-sdk",
863 component = "event_query_client",
864 event = "merchant_plans_retrieved",
865 merchant = %merchant,
866 plan_count = plan_addresses.len(),
867 "Retrieved plan addresses for merchant"
868 );
869
870 Ok(plan_addresses)
871 }
872
873 fn get_plan_subscriptions(&self, plan: &Pubkey) -> Result<Vec<Pubkey>> {
875 let config = RpcProgramAccountsConfig {
876 filters: Some(vec![
877 RpcFilterType::Memcmp(Memcmp::new(
879 8, MemcmpEncodedBytes::Base58(plan.to_string()),
881 )),
882 ]),
883 account_config: RpcAccountInfoConfig {
884 encoding: Some(UiAccountEncoding::Base64),
885 commitment: Some(self.config.commitment),
886 ..Default::default()
887 },
888 with_context: Some(false),
889 sort_results: None,
890 };
891
892 let accounts = self
893 .sdk_client
894 .rpc()
895 .get_program_accounts_with_config(&self.program_id, config)
896 .map_err(|e| TallyError::RpcError(format!("Failed to get plan subscriptions: {e}")))?;
897
898 let subscription_addresses: Vec<Pubkey> =
899 accounts.into_iter().map(|(pubkey, _)| pubkey).collect();
900
901 trace!(
902 service = "tally-sdk",
903 component = "event_query_client",
904 event = "plan_subscriptions_retrieved",
905 plan = %plan,
906 subscription_count = subscription_addresses.len(),
907 "Retrieved subscription addresses for plan"
908 );
909
910 Ok(subscription_addresses)
911 }
912
913 fn timestamp_to_approximate_slot(&self, timestamp: i64) -> Result<u64> {
915 const SLOT_DURATION_MS: i64 = 400;
917
918 let current_slot = self
920 .sdk_client
921 .get_slot()
922 .map_err(|e| TallyError::RpcError(format!("Failed to get current slot: {e}")))?;
923 let current_time = Utc::now().timestamp();
924 let time_diff_seconds = current_time.saturating_sub(timestamp);
925 let time_diff_ms = time_diff_seconds.saturating_mul(1000);
926 let slot_diff = time_diff_ms / SLOT_DURATION_MS;
927
928 let approximate_slot = if slot_diff > 0 {
930 current_slot.saturating_sub(u64::try_from(slot_diff).unwrap_or(u64::MAX))
932 } else {
933 let abs_diff = slot_diff.unsigned_abs();
935 current_slot.saturating_add(abs_diff)
936 };
937
938 trace!(
939 service = "tally-sdk",
940 component = "event_query_client",
941 event = "timestamp_to_slot_conversion",
942 timestamp = timestamp,
943 current_slot = current_slot,
944 approximate_slot = approximate_slot,
945 "Converted timestamp to approximate slot"
946 );
947
948 Ok(approximate_slot)
949 }
950
951 fn get_from_cache(&self, key: &QueryKey) -> Option<Vec<ParsedEvent>> {
953 if let Ok(mut cache) = self.cache.lock() {
954 if let Some(entry) = cache.get(key) {
955 if !entry.is_expired() {
956 return Some(entry.events.clone());
957 }
958 cache.pop(key);
960 }
961 }
962 None
963 }
964
965 fn store_in_cache(&self, key: QueryKey, events: Vec<ParsedEvent>) {
967 if let Ok(mut cache) = self.cache.lock() {
968 let entry = CacheEntry::new(events, self.config.cache_ttl_seconds);
969 cache.put(key, entry);
970 }
971 }
972
973 pub fn clear_cache(&self) {
975 if let Ok(mut cache) = self.cache.lock() {
976 cache.clear();
977 }
978
979 info!(
980 service = "tally-sdk",
981 component = "event_query_client",
982 event = "cache_cleared",
983 "Query cache has been cleared"
984 );
985 }
986
987 #[must_use]
989 pub fn get_cache_stats(&self) -> HashMap<String, u64> {
990 let mut stats = HashMap::new();
991
992 if let Ok(cache) = self.cache.lock() {
993 stats.insert("cache_size".to_string(), cache.len() as u64);
994 stats.insert("cache_capacity".to_string(), cache.cap().get() as u64);
995 }
996
997 stats
998 }
999
1000 pub fn health_check(&self) -> bool {
1002 match self.sdk_client.get_health() {
1003 Ok(()) => {
1004 debug!(
1005 service = "tally-sdk",
1006 component = "event_query_client",
1007 event = "health_check_success",
1008 "RPC client health check passed"
1009 );
1010 true
1011 }
1012 Err(e) => {
1013 warn!(
1014 service = "tally-sdk",
1015 component = "event_query_client",
1016 event = "health_check_failed",
1017 error = %e,
1018 "RPC client health check failed"
1019 );
1020 false
1021 }
1022 }
1023 }
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use super::*;
1029
1030 fn create_test_config() -> EventQueryClientConfig {
1031 EventQueryClientConfig {
1032 rpc_url: "http://localhost:8899".to_string(),
1033 program_id: crate::program_id(),
1034 query_config: EventQueryConfig::default(),
1035 }
1036 }
1037
1038 #[test]
1039 fn test_event_query_client_creation() {
1040 let config = create_test_config();
1041 let client = EventQueryClient::new(config);
1042 assert!(client.is_ok());
1043 }
1044
1045 #[test]
1046 fn test_event_query_config_default() {
1047 let config = EventQueryConfig::default();
1048 assert_eq!(config.max_events_per_query, 1000);
1049 assert_eq!(config.max_signatures_per_batch, 100);
1050 assert!(config.enable_cache);
1051 assert_eq!(config.cache_ttl_seconds, 300);
1052 }
1053
1054 #[test]
1055 fn test_cache_entry_expiry() {
1056 let events = vec![];
1057 let entry = CacheEntry::new(events, 1); assert!(!entry.is_expired());
1060
1061 let mut expired_entry = entry;
1063 expired_entry.cached_at = Utc::now() - chrono::Duration::seconds(2);
1064 assert!(expired_entry.is_expired());
1065 }
1066
1067 #[test]
1068 fn test_query_key_equality() {
1069 let merchant = Pubkey::new_unique();
1070
1071 let key1 = QueryKey {
1072 merchant,
1073 query_type: QueryType::Recent,
1074 limit: 100,
1075 from_slot: None,
1076 to_slot: None,
1077 };
1078
1079 let key2 = QueryKey {
1080 merchant,
1081 query_type: QueryType::Recent,
1082 limit: 100,
1083 from_slot: None,
1084 to_slot: None,
1085 };
1086
1087 assert_eq!(key1, key2);
1088
1089 let key3 = QueryKey {
1090 merchant,
1091 query_type: QueryType::MerchantEvents,
1092 limit: 100,
1093 from_slot: None,
1094 to_slot: None,
1095 };
1096
1097 assert_ne!(key1, key3);
1098 }
1099
1100 #[tokio::test]
1101 async fn test_timestamp_to_slot_conversion() {
1102 let config = create_test_config();
1103 let _client = EventQueryClient::new(config).unwrap();
1104
1105 let _current_time = Utc::now().timestamp();
1107
1108 }
1113
1114 #[test]
1115 fn test_cache_operations() {
1116 let config = create_test_config();
1117 let client = EventQueryClient::new(config).unwrap();
1118
1119 let key = QueryKey {
1120 merchant: Pubkey::new_unique(),
1121 query_type: QueryType::Recent,
1122 limit: 100,
1123 from_slot: None,
1124 to_slot: None,
1125 };
1126
1127 assert!(client.get_from_cache(&key).is_none());
1129
1130 let events = vec![];
1132 client.store_in_cache(key.clone(), events);
1133
1134 assert!(client.get_from_cache(&key).is_some());
1136
1137 client.clear_cache();
1139
1140 assert!(client.get_from_cache(&key).is_none());
1142 }
1143
1144 #[test]
1145 fn test_cache_stats() {
1146 let config = create_test_config();
1147 let client = EventQueryClient::new(config).unwrap();
1148
1149 let stats = client.get_cache_stats();
1150 assert!(stats.contains_key("cache_size"));
1151 assert!(stats.contains_key("cache_capacity"));
1152 assert_eq!(stats["cache_size"], 0);
1153 }
1154}