tally_sdk/
event_query.rs

1//! Event Query Client for historical blockchain event retrieval
2//!
3//! This module provides efficient querying for historical Tally program events
4//! directly from the Solana blockchain using RPC calls. The blockchain serves
5//! as the single source of truth, with no database duplication required.
6
7#![forbid(unsafe_code)]
8
9use crate::solana_sdk::pubkey::Pubkey;
10use crate::{error::Result, events::TallyEvent, SimpleTallyClient, TallyError};
11use anchor_client::solana_account_decoder::UiAccountEncoding;
12use anchor_client::solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
13use anchor_client::solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
14use anchor_client::solana_client::rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType};
15use anchor_client::solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
16use anyhow::Context;
17use chrono::{DateTime, Utc};
18use lru::LruCache;
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21use std::num::NonZeroUsize;
22use std::str::FromStr;
23use std::sync::{Arc, Mutex};
24use std::time::Instant;
25use tracing::{debug, info, trace, warn};
26
/// Configuration for RPC event queries
///
/// Controls result sizing, batching, RPC commitment and the in-memory result
/// cache used by `EventQueryClient`.
#[derive(Debug, Clone)]
pub struct EventQueryConfig {
    /// Maximum number of events to return per query
    ///
    /// Date-range queries use this value as their cache-key limit and fetch up
    /// to twice this many signatures per address.
    pub max_events_per_query: usize,
    /// Maximum number of transaction signatures to process in batch
    ///
    /// Used as the chunk size when signatures are processed in batches.
    pub max_signatures_per_batch: usize,
    /// Default commitment level for queries
    pub commitment: CommitmentConfig,
    /// Enable caching for recent queries
    ///
    /// When `false`, cache lookups and cache writes are skipped entirely.
    pub enable_cache: bool,
    /// Cache TTL in seconds
    pub cache_ttl_seconds: u64,
    /// Maximum cache size (number of cached query results)
    ///
    /// Must be greater than zero; `EventQueryClient::new` rejects `0`.
    pub max_cache_size: usize,
}
43
44impl Default for EventQueryConfig {
45    fn default() -> Self {
46        Self {
47            max_events_per_query: 1000,
48            max_signatures_per_batch: 100,
49            commitment: CommitmentConfig::confirmed(),
50            enable_cache: true,
51            cache_ttl_seconds: 300, // 5 minutes
52            max_cache_size: 1000,
53        }
54    }
55}
56
/// Event query client configuration
///
/// Bundles everything `EventQueryClient::new` needs: where to connect, which
/// program to query, and how queries should behave.
#[derive(Debug, Clone)]
pub struct EventQueryClientConfig {
    /// RPC endpoint URL
    pub rpc_url: String,
    /// Tally program ID
    ///
    /// Used as the program whose accounts are scanned for plans and
    /// subscriptions.
    pub program_id: Pubkey,
    /// Query configuration
    pub query_config: EventQueryConfig,
}
67
/// A parsed event with transaction context
///
/// Query results are sorted either by `slot` or by `block_time`, most recent
/// first, depending on the query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedEvent {
    /// Transaction signature that contains this event
    pub signature: Signature,
    /// Slot number where transaction was processed
    pub slot: u64,
    /// Block time (Unix timestamp)
    ///
    /// Events without a block time never match a date-range filter.
    pub block_time: Option<i64>,
    /// Transaction success status
    pub success: bool,
    /// The parsed Tally event
    pub event: TallyEvent,
    /// Log index within the transaction
    pub log_index: usize,
}
84
/// Cache entry for query results
///
/// Pairs a cached result set with the wall-clock time it was stored and the
/// TTL it was stored with, so expiry can be evaluated per entry.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// Cached events
    events: Vec<ParsedEvent>,
    /// Timestamp when cached
    cached_at: DateTime<Utc>,
    /// TTL for this entry
    ///
    /// Expressed in seconds, measured from `cached_at`.
    ttl_seconds: u64,
}
95
96impl CacheEntry {
97    fn new(events: Vec<ParsedEvent>, ttl_seconds: u64) -> Self {
98        Self {
99            events,
100            cached_at: Utc::now(),
101            ttl_seconds,
102        }
103    }
104
105    fn is_expired(&self) -> bool {
106        // Use checked conversion to i64 and saturating addition to prevent overflow
107        let ttl_i64 = i64::try_from(self.ttl_seconds).unwrap_or(i64::MAX);
108        let duration = chrono::Duration::seconds(ttl_i64);
109        let expiry = self
110            .cached_at
111            .checked_add_signed(duration)
112            .unwrap_or(self.cached_at);
113        Utc::now() > expiry
114    }
115}
116
/// Query parameters for event retrieval
///
/// Serves as the key of the query-result cache: two queries share a cache
/// entry only when every field below is equal.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct QueryKey {
    // Merchant the query was issued for.
    merchant: Pubkey,
    // Which public query produced the result.
    query_type: QueryType,
    // Maximum number of events requested.
    limit: usize,
    // Lower slot bound; only set for date-range queries.
    from_slot: Option<u64>,
    // Upper slot bound; only set for date-range queries.
    to_slot: Option<u64>,
}
126
/// Discriminates cached results by the kind of query that produced them.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum QueryType {
    /// Key type used by `get_recent_events`.
    Recent,
    /// Key type used by `get_events_by_date_range`.
    DateRange,
    /// Key type used by `get_merchant_events`.
    MerchantEvents,
}
133
/// RPC client for querying historical Tally program events
///
/// Holds the underlying SDK client, the program being queried, the query
/// settings, and a shared LRU cache of previous query results.
pub struct EventQueryClient {
    /// Solana SDK client
    sdk_client: Arc<SimpleTallyClient>,
    /// Tally program ID
    program_id: Pubkey,
    /// Query configuration
    config: EventQueryConfig,
    /// LRU cache for query results
    ///
    /// Keyed by `QueryKey`; guarded by a mutex so it can be used through `&self`.
    cache: Arc<Mutex<LruCache<QueryKey, CacheEntry>>>,
}
145
146impl EventQueryClient {
147    /// Create a new `EventQueryClient`
148    ///
149    /// # Arguments
150    ///
151    /// * `config` - Event query client configuration
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if RPC client creation fails
156    pub fn new(config: EventQueryClientConfig) -> Result<Self> {
157        let sdk_client = Arc::new(
158            SimpleTallyClient::new(&config.rpc_url)
159                .context("Failed to create SimpleTallyClient")?,
160        );
161
162        let cache_size = NonZeroUsize::new(config.query_config.max_cache_size)
163            .context("Cache size must be greater than 0")?;
164        let cache = Arc::new(Mutex::new(LruCache::new(cache_size)));
165
166        info!(
167            service = "tally-sdk",
168            component = "event_query_client",
169            event = "client_created",
170            rpc_url = %config.rpc_url,
171            program_id = %config.program_id,
172            max_events_per_query = config.query_config.max_events_per_query,
173            cache_enabled = config.query_config.enable_cache,
174            "EventQueryClient initialized successfully"
175        );
176
177        Ok(Self {
178            sdk_client,
179            program_id: config.program_id,
180            config: config.query_config,
181            cache,
182        })
183    }
184
185    /// Create a new `EventQueryClient` with program ID from environment
186    ///
187    /// # Arguments
188    ///
189    /// * `rpc_url` - RPC endpoint URL
190    /// * `query_config` - Optional query configuration (uses defaults if None)
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if RPC client creation fails
195    pub fn new_with_program_id(
196        rpc_url: String,
197        query_config: Option<EventQueryConfig>,
198    ) -> Result<Self> {
199        let config = EventQueryClientConfig {
200            rpc_url,
201            program_id: crate::program_id(),
202            query_config: query_config.unwrap_or_default(),
203        };
204        Self::new(config)
205    }
206
207    /// Get recent events for a merchant
208    ///
209    /// # Arguments
210    ///
211    /// * `merchant` - Merchant public key
212    /// * `limit` - Maximum number of events to return
213    ///
214    /// # Returns
215    ///
216    /// Vector of parsed events ordered by most recent first
217    ///
218    /// # Errors
219    ///
220    /// Returns error if RPC queries fail or event parsing fails
221    pub async fn get_recent_events(
222        &self,
223        merchant: &Pubkey,
224        limit: usize,
225    ) -> Result<Vec<ParsedEvent>> {
226        let start_time = Instant::now();
227        let query_key = Self::build_query_key(merchant, QueryType::Recent, limit, None, None);
228
229        // Check cache and return early if hit
230        if let Some(cached) = self.try_get_cached_events(&query_key, merchant) {
231            return Ok(cached);
232        }
233
234        debug!(
235            service = "tally-sdk",
236            component = "event_query_client",
237            event = "query_recent_events",
238            merchant = %merchant,
239            limit = limit,
240            "Querying recent events for merchant"
241        );
242
243        // Fetch and process events
244        let sorted_events = self.fetch_and_sort_events(merchant, limit).await?;
245
246        // Store results in cache
247        self.try_cache_events(query_key, &sorted_events);
248
249        Self::log_query_success(merchant, &sorted_events, start_time);
250
251        Ok(sorted_events)
252    }
253
254    /// Build a query key for cache operations
255    const fn build_query_key(
256        merchant: &Pubkey,
257        query_type: QueryType,
258        limit: usize,
259        from_slot: Option<u64>,
260        to_slot: Option<u64>,
261    ) -> QueryKey {
262        QueryKey {
263            merchant: *merchant,
264            query_type,
265            limit,
266            from_slot,
267            to_slot,
268        }
269    }
270
271    /// Try to get cached events, returning Some if cache hit
272    fn try_get_cached_events(
273        &self,
274        query_key: &QueryKey,
275        merchant: &Pubkey,
276    ) -> Option<Vec<ParsedEvent>> {
277        if !self.config.enable_cache {
278            return None;
279        }
280
281        if let Some(cached_events) = self.get_from_cache(query_key) {
282            debug!(
283                service = "tally-sdk",
284                component = "event_query_client",
285                event = "cache_hit",
286                merchant = %merchant,
287                cached_event_count = cached_events.len(),
288                "Returning cached recent events"
289            );
290            return Some(cached_events);
291        }
292
293        None
294    }
295
296    /// Fetch signatures, parse events, and sort by most recent first
297    async fn fetch_and_sort_events(
298        &self,
299        merchant: &Pubkey,
300        limit: usize,
301    ) -> Result<Vec<ParsedEvent>> {
302        let signatures = self.get_merchant_signatures(merchant, limit).await?;
303        let events = self.parse_events_from_signatures(&signatures).await?;
304        Ok(Self::sort_and_limit_events(events, limit))
305    }
306
307    /// Sort events by slot (most recent first) and apply limit
308    fn sort_and_limit_events(mut events: Vec<ParsedEvent>, limit: usize) -> Vec<ParsedEvent> {
309        events.sort_by(|a, b| b.slot.cmp(&a.slot));
310        events.truncate(limit);
311        events
312    }
313
314    /// Try to cache events if caching is enabled
315    fn try_cache_events(&self, query_key: QueryKey, events: &[ParsedEvent]) {
316        if self.config.enable_cache {
317            self.store_in_cache(query_key, events.to_vec());
318        }
319    }
320
321    /// Log successful query completion with metrics
322    fn log_query_success(merchant: &Pubkey, events: &[ParsedEvent], start_time: Instant) {
323        info!(
324            service = "tally-sdk",
325            component = "event_query_client",
326            event = "recent_events_retrieved",
327            merchant = %merchant,
328            event_count = events.len(),
329            duration_ms = start_time.elapsed().as_millis(),
330            "Successfully retrieved recent events"
331        );
332    }
333
334    /// Get events for a merchant within a date range
335    ///
336    /// # Arguments
337    ///
338    /// * `merchant` - Merchant public key
339    /// * `from` - Start date (inclusive)
340    /// * `to` - End date (inclusive)
341    ///
342    /// # Returns
343    ///
344    /// Vector of parsed events within the date range
345    ///
346    /// # Errors
347    ///
348    /// Returns error if RPC queries fail or slot conversion fails
349    pub async fn get_events_by_date_range(
350        &self,
351        merchant: &Pubkey,
352        from: DateTime<Utc>,
353        to: DateTime<Utc>,
354    ) -> Result<Vec<ParsedEvent>> {
355        let start_time = Instant::now();
356
357        debug!(
358            service = "tally-sdk",
359            component = "event_query_client",
360            event = "query_events_by_date_range",
361            merchant = %merchant,
362            from = %from,
363            to = %to,
364            "Querying events by date range"
365        );
366
367        // Convert dates to slots and build query key
368        let (from_slot, to_slot) = self.convert_date_range_to_slots(from, to)?;
369        let query_key = Self::build_date_range_query_key(
370            merchant,
371            from_slot,
372            to_slot,
373            self.config.max_events_per_query,
374        );
375
376        // Check cache and return early if hit
377        if let Some(cached) = self.try_get_cached_date_range_events(&query_key, merchant) {
378            return Ok(cached);
379        }
380
381        // Fetch, filter, and sort events
382        let sorted_events = self
383            .fetch_filter_and_sort_events_by_date(merchant, from, to, from_slot, to_slot)
384            .await?;
385
386        // Store results in cache
387        self.try_cache_events(query_key.clone(), &sorted_events);
388
389        Self::log_date_range_query_success(
390            merchant,
391            &sorted_events,
392            from_slot,
393            to_slot,
394            start_time,
395        );
396
397        Ok(sorted_events)
398    }
399
400    /// Convert date range to approximate slot range
401    fn convert_date_range_to_slots(
402        &self,
403        from: DateTime<Utc>,
404        to: DateTime<Utc>,
405    ) -> Result<(u64, u64)> {
406        let from_slot = self.timestamp_to_approximate_slot(from.timestamp())?;
407        let to_slot = self.timestamp_to_approximate_slot(to.timestamp())?;
408        Ok((from_slot, to_slot))
409    }
410
411    /// Build query key for date range queries
412    const fn build_date_range_query_key(
413        merchant: &Pubkey,
414        from_slot: u64,
415        to_slot: u64,
416        limit: usize,
417    ) -> QueryKey {
418        QueryKey {
419            merchant: *merchant,
420            query_type: QueryType::DateRange,
421            limit,
422            from_slot: Some(from_slot),
423            to_slot: Some(to_slot),
424        }
425    }
426
427    /// Try to get cached date range events, returning Some if cache hit
428    fn try_get_cached_date_range_events(
429        &self,
430        query_key: &QueryKey,
431        merchant: &Pubkey,
432    ) -> Option<Vec<ParsedEvent>> {
433        if !self.config.enable_cache {
434            return None;
435        }
436
437        if let Some(cached_events) = self.get_from_cache(query_key) {
438            debug!(
439                service = "tally-sdk",
440                component = "event_query_client",
441                event = "cache_hit",
442                merchant = %merchant,
443                cached_event_count = cached_events.len(),
444                "Returning cached date range events"
445            );
446            return Some(cached_events);
447        }
448
449        None
450    }
451
452    /// Fetch signatures, parse, filter by date, and sort events
453    async fn fetch_filter_and_sort_events_by_date(
454        &self,
455        merchant: &Pubkey,
456        from: DateTime<Utc>,
457        to: DateTime<Utc>,
458        from_slot: u64,
459        to_slot: u64,
460    ) -> Result<Vec<ParsedEvent>> {
461        let signatures = self
462            .get_merchant_signatures_in_slot_range(merchant, from_slot, to_slot)
463            .await?;
464        let events = self.parse_events_from_signatures(&signatures).await?;
465        let filtered_events = Self::filter_events_by_date_range(events, from, to);
466        Ok(Self::sort_events_by_block_time(filtered_events))
467    }
468
469    /// Filter events to only those within the specified date range
470    fn filter_events_by_date_range(
471        events: Vec<ParsedEvent>,
472        from: DateTime<Utc>,
473        to: DateTime<Utc>,
474    ) -> Vec<ParsedEvent> {
475        events
476            .into_iter()
477            .filter(|event| Self::is_event_in_date_range(event, from, to))
478            .collect()
479    }
480
481    /// Check if an event is within the specified date range
482    fn is_event_in_date_range(event: &ParsedEvent, from: DateTime<Utc>, to: DateTime<Utc>) -> bool {
483        event
484            .block_time
485            .and_then(|block_time| DateTime::from_timestamp(block_time, 0))
486            .is_some_and(|event_time| event_time >= from && event_time <= to)
487    }
488
489    /// Sort events by block time (most recent first)
490    fn sort_events_by_block_time(mut events: Vec<ParsedEvent>) -> Vec<ParsedEvent> {
491        events.sort_by(|a, b| b.block_time.unwrap_or(0).cmp(&a.block_time.unwrap_or(0)));
492        events
493    }
494
495    /// Log successful date range query completion with metrics
496    fn log_date_range_query_success(
497        merchant: &Pubkey,
498        events: &[ParsedEvent],
499        from_slot: u64,
500        to_slot: u64,
501        start_time: Instant,
502    ) {
503        info!(
504            service = "tally-sdk",
505            component = "event_query_client",
506            event = "date_range_events_retrieved",
507            merchant = %merchant,
508            event_count = events.len(),
509            from_slot = from_slot,
510            to_slot = to_slot,
511            duration_ms = start_time.elapsed().as_millis(),
512            "Successfully retrieved events by date range"
513        );
514    }
515
516    /// Get all events for a merchant (up to configured limit)
517    ///
518    /// # Arguments
519    ///
520    /// * `merchant` - Merchant public key
521    /// * `limit` - Maximum number of events to return
522    ///
523    /// # Returns
524    ///
525    /// Vector of parsed events for the merchant
526    ///
527    /// # Errors
528    ///
529    /// Returns error if RPC queries fail
530    pub async fn get_merchant_events(
531        &self,
532        merchant: &Pubkey,
533        limit: usize,
534    ) -> Result<Vec<ParsedEvent>> {
535        let start_time = Instant::now();
536        let query_key = Self::build_merchant_events_query_key(merchant, limit);
537
538        // Check cache and return early if hit
539        if let Some(cached) = self.try_get_cached_merchant_events(&query_key, merchant) {
540            return Ok(cached);
541        }
542
543        debug!(
544            service = "tally-sdk",
545            component = "event_query_client",
546            event = "query_merchant_events",
547            merchant = %merchant,
548            limit = limit,
549            "Querying all events for merchant"
550        );
551
552        // Fetch, parse, and sort events
553        let sorted_events = self
554            .fetch_parse_and_sort_merchant_events(merchant, limit)
555            .await?;
556
557        // Store results in cache
558        self.try_cache_events(query_key, &sorted_events);
559
560        Self::log_merchant_events_success(merchant, &sorted_events, start_time);
561
562        Ok(sorted_events)
563    }
564
565    /// Build query key for merchant events queries
566    const fn build_merchant_events_query_key(merchant: &Pubkey, limit: usize) -> QueryKey {
567        QueryKey {
568            merchant: *merchant,
569            query_type: QueryType::MerchantEvents,
570            limit,
571            from_slot: None,
572            to_slot: None,
573        }
574    }
575
576    /// Try to get cached merchant events, returning Some if cache hit
577    fn try_get_cached_merchant_events(
578        &self,
579        query_key: &QueryKey,
580        merchant: &Pubkey,
581    ) -> Option<Vec<ParsedEvent>> {
582        if !self.config.enable_cache {
583            return None;
584        }
585
586        if let Some(cached_events) = self.get_from_cache(query_key) {
587            debug!(
588                service = "tally-sdk",
589                component = "event_query_client",
590                event = "cache_hit",
591                merchant = %merchant,
592                cached_event_count = cached_events.len(),
593                "Returning cached merchant events"
594            );
595            return Some(cached_events);
596        }
597
598        None
599    }
600
601    /// Fetch signatures, parse events, and sort for merchant
602    async fn fetch_parse_and_sort_merchant_events(
603        &self,
604        merchant: &Pubkey,
605        limit: usize,
606    ) -> Result<Vec<ParsedEvent>> {
607        // Get more signatures to ensure we have enough events (2x buffer with overflow protection)
608        let signature_limit = limit.saturating_mul(2);
609        let signatures = self
610            .get_merchant_signatures(merchant, signature_limit)
611            .await?;
612
613        // Parse events from transactions
614        let events = self.parse_events_from_signatures(&signatures).await?;
615
616        // Sort and limit events
617        Ok(Self::sort_and_limit_events(events, limit))
618    }
619
620    /// Log successful merchant events query completion with metrics
621    fn log_merchant_events_success(merchant: &Pubkey, events: &[ParsedEvent], start_time: Instant) {
622        info!(
623            service = "tally-sdk",
624            component = "event_query_client",
625            event = "merchant_events_retrieved",
626            merchant = %merchant,
627            event_count = events.len(),
628            duration_ms = start_time.elapsed().as_millis(),
629            "Successfully retrieved merchant events"
630        );
631    }
632
633    /// Get transaction signatures for merchant's program accounts
634    #[allow(clippy::unused_async)] // May need async for future enhanced RPC operations
635    async fn get_merchant_signatures(
636        &self,
637        merchant: &Pubkey,
638        limit: usize,
639    ) -> Result<Vec<Signature>> {
640        // Get merchant account signatures
641        let merchant_signatures = self
642            .sdk_client
643            .get_confirmed_signatures_for_address(
644                merchant,
645                Some(GetConfirmedSignaturesForAddress2Config {
646                    limit: Some(limit.min(1000)), // Solana RPC limit
647                    commitment: Some(self.config.commitment),
648                    ..Default::default()
649                }),
650            )
651            .map_err(|e| TallyError::RpcError(format!("Failed to get merchant signatures: {e}")))?;
652
653        let mut signatures = HashSet::new();
654        for sig_info in merchant_signatures {
655            if let Ok(signature) = Signature::from_str(&sig_info.signature) {
656                signatures.insert(signature);
657            }
658        }
659
660        // Get plans for this merchant and their signatures
661        let plans = self.get_merchant_plans(merchant)?;
662        for plan_address in &plans {
663            let plan_signatures = self
664                .sdk_client
665                .get_confirmed_signatures_for_address(
666                    plan_address,
667                    Some(GetConfirmedSignaturesForAddress2Config {
668                        limit: Some(limit.min(1000)),
669                        commitment: Some(self.config.commitment),
670                        ..Default::default()
671                    }),
672                )
673                .map_err(|e| TallyError::RpcError(format!("Failed to get plan signatures: {e}")))?;
674
675            for sig_info in plan_signatures {
676                if let Ok(signature) = Signature::from_str(&sig_info.signature) {
677                    signatures.insert(signature);
678                }
679            }
680        }
681
682        // Get subscriptions for merchant plans and their signatures
683        for plan_address in &plans {
684            let subscriptions = self.get_plan_subscriptions(plan_address)?;
685            for subscription_address in subscriptions {
686                let sub_signatures = self
687                    .sdk_client
688                    .get_confirmed_signatures_for_address(
689                        &subscription_address,
690                        Some(GetConfirmedSignaturesForAddress2Config {
691                            limit: Some(limit.min(1000)),
692                            commitment: Some(self.config.commitment),
693                            ..Default::default()
694                        }),
695                    )
696                    .map_err(|e| {
697                        TallyError::RpcError(format!("Failed to get subscription signatures: {e}"))
698                    })?;
699
700                for sig_info in sub_signatures {
701                    if let Ok(signature) = Signature::from_str(&sig_info.signature) {
702                        signatures.insert(signature);
703                    }
704                }
705            }
706        }
707
708        let result: Vec<Signature> = signatures.into_iter().collect();
709
710        debug!(
711            service = "tally-sdk",
712            component = "event_query_client",
713            event = "signatures_collected",
714            merchant = %merchant,
715            signature_count = result.len(),
716            plan_count = plans.len(),
717            "Collected transaction signatures for merchant"
718        );
719
720        Ok(result)
721    }
722
723    /// Get transaction signatures for merchant within a slot range
724    async fn get_merchant_signatures_in_slot_range(
725        &self,
726        merchant: &Pubkey,
727        _from_slot: u64,
728        _to_slot: u64,
729    ) -> Result<Vec<Signature>> {
730        // Get signatures with 2x buffer, using saturating multiplication to prevent overflow
731        let signature_limit = self.config.max_events_per_query.saturating_mul(2);
732        let signatures = self
733            .get_merchant_signatures(merchant, signature_limit)
734            .await?;
735
736        // We would need to fetch transaction details to filter by slot, which is expensive
737        // For now, return all signatures and filter during event parsing
738
739        Ok(signatures)
740    }
741
742    /// Parse events from transaction signatures
743    async fn parse_events_from_signatures(
744        &self,
745        signatures: &[Signature],
746    ) -> Result<Vec<ParsedEvent>> {
747        let all_events = self.process_signature_batches(signatures).await;
748
749        Self::log_parsed_events_summary(signatures, &all_events);
750
751        Ok(all_events)
752    }
753
754    /// Process signatures in batches with rate limiting
755    async fn process_signature_batches(&self, signatures: &[Signature]) -> Vec<ParsedEvent> {
756        let mut all_events = Vec::new();
757
758        for chunk in signatures.chunks(self.config.max_signatures_per_batch) {
759            let batch_events = self.process_signature_chunk(chunk);
760            all_events.extend(batch_events);
761
762            // Small delay between batches to be respectful to RPC
763            self.apply_batch_rate_limit(chunk.len()).await;
764        }
765
766        all_events
767    }
768
769    /// Process a single chunk of signatures
770    fn process_signature_chunk(&self, chunk: &[Signature]) -> Vec<ParsedEvent> {
771        let batch_events = Vec::new();
772
773        for signature in chunk {
774            self.try_fetch_and_log_transaction(signature);
775        }
776
777        batch_events
778    }
779
780    /// Try to fetch transaction and log result
781    fn try_fetch_and_log_transaction(&self, signature: &Signature) {
782        match self.sdk_client.get_transaction(signature) {
783            Ok(_transaction) => {
784                Self::log_transaction_received(signature);
785            }
786            Err(e) => {
787                Self::log_transaction_fetch_error(signature, &e);
788            }
789        }
790    }
791
792    /// Log successful transaction fetch
793    fn log_transaction_received(signature: &Signature) {
794        debug!(
795            service = "tally-sdk",
796            component = "event_query_client",
797            event = "transaction_received",
798            signature = %signature,
799            "Transaction data received - event parsing temporarily disabled"
800        );
801    }
802
803    /// Log transaction fetch error
804    fn log_transaction_fetch_error<E: std::fmt::Display>(signature: &Signature, error: &E) {
805        trace!(
806            service = "tally-sdk",
807            component = "event_query_client",
808            event = "transaction_fetch_error",
809            signature = %signature,
810            error = %error,
811            "Failed to fetch transaction details"
812        );
813    }
814
815    /// Apply rate limiting delay between batches if needed
816    async fn apply_batch_rate_limit(&self, chunk_len: usize) {
817        if chunk_len == self.config.max_signatures_per_batch {
818            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
819        }
820    }
821
822    /// Log summary of parsed events
823    fn log_parsed_events_summary(signatures: &[Signature], events: &[ParsedEvent]) {
824        debug!(
825            service = "tally-sdk",
826            component = "event_query_client",
827            event = "events_parsed",
828            signature_count = signatures.len(),
829            event_count = events.len(),
830            "Parsed events from transaction signatures"
831        );
832    }
833
834    /// Get plan addresses for a merchant using getProgramAccounts
835    fn get_merchant_plans(&self, merchant: &Pubkey) -> Result<Vec<Pubkey>> {
836        let config = RpcProgramAccountsConfig {
837            filters: Some(vec![
838                // Filter by merchant field in Plan account (first 32 bytes after discriminator)
839                RpcFilterType::Memcmp(Memcmp::new(
840                    8, // Skip 8-byte Anchor discriminator
841                    MemcmpEncodedBytes::Base58(merchant.to_string()),
842                )),
843            ]),
844            account_config: RpcAccountInfoConfig {
845                encoding: Some(UiAccountEncoding::Base64),
846                commitment: Some(self.config.commitment),
847                ..Default::default()
848            },
849            with_context: Some(false),
850            sort_results: None,
851        };
852
853        let accounts = self
854            .sdk_client
855            .rpc()
856            .get_program_accounts_with_config(&self.program_id, config)
857            .map_err(|e| TallyError::RpcError(format!("Failed to get merchant plans: {e}")))?;
858
859        let plan_addresses: Vec<Pubkey> = accounts.into_iter().map(|(pubkey, _)| pubkey).collect();
860
861        debug!(
862            service = "tally-sdk",
863            component = "event_query_client",
864            event = "merchant_plans_retrieved",
865            merchant = %merchant,
866            plan_count = plan_addresses.len(),
867            "Retrieved plan addresses for merchant"
868        );
869
870        Ok(plan_addresses)
871    }
872
873    /// Get subscription addresses for a plan using getProgramAccounts
874    fn get_plan_subscriptions(&self, plan: &Pubkey) -> Result<Vec<Pubkey>> {
875        let config = RpcProgramAccountsConfig {
876            filters: Some(vec![
877                // Filter by plan field in Subscription account (first 32 bytes after discriminator)
878                RpcFilterType::Memcmp(Memcmp::new(
879                    8, // Skip 8-byte Anchor discriminator
880                    MemcmpEncodedBytes::Base58(plan.to_string()),
881                )),
882            ]),
883            account_config: RpcAccountInfoConfig {
884                encoding: Some(UiAccountEncoding::Base64),
885                commitment: Some(self.config.commitment),
886                ..Default::default()
887            },
888            with_context: Some(false),
889            sort_results: None,
890        };
891
892        let accounts = self
893            .sdk_client
894            .rpc()
895            .get_program_accounts_with_config(&self.program_id, config)
896            .map_err(|e| TallyError::RpcError(format!("Failed to get plan subscriptions: {e}")))?;
897
898        let subscription_addresses: Vec<Pubkey> =
899            accounts.into_iter().map(|(pubkey, _)| pubkey).collect();
900
901        trace!(
902            service = "tally-sdk",
903            component = "event_query_client",
904            event = "plan_subscriptions_retrieved",
905            plan = %plan,
906            subscription_count = subscription_addresses.len(),
907            "Retrieved subscription addresses for plan"
908        );
909
910        Ok(subscription_addresses)
911    }
912
913    /// Convert Unix timestamp to approximate slot number
914    fn timestamp_to_approximate_slot(&self, timestamp: i64) -> Result<u64> {
915        // Estimate slot time (approximately 400ms per slot on Solana)
916        const SLOT_DURATION_MS: i64 = 400;
917
918        // Get current slot and time
919        let current_slot = self
920            .sdk_client
921            .get_slot()
922            .map_err(|e| TallyError::RpcError(format!("Failed to get current slot: {e}")))?;
923        let current_time = Utc::now().timestamp();
924        let time_diff_seconds = current_time.saturating_sub(timestamp);
925        let time_diff_ms = time_diff_seconds.saturating_mul(1000);
926        let slot_diff = time_diff_ms / SLOT_DURATION_MS;
927
928        // Calculate approximate slot using checked arithmetic
929        let approximate_slot = if slot_diff > 0 {
930            // slot_diff is positive, so safe to cast to u64 since it came from positive time difference
931            current_slot.saturating_sub(u64::try_from(slot_diff).unwrap_or(u64::MAX))
932        } else {
933            // slot_diff is negative or zero, use absolute value
934            let abs_diff = slot_diff.unsigned_abs();
935            current_slot.saturating_add(abs_diff)
936        };
937
938        trace!(
939            service = "tally-sdk",
940            component = "event_query_client",
941            event = "timestamp_to_slot_conversion",
942            timestamp = timestamp,
943            current_slot = current_slot,
944            approximate_slot = approximate_slot,
945            "Converted timestamp to approximate slot"
946        );
947
948        Ok(approximate_slot)
949    }
950
951    /// Get events from cache if available and not expired
952    fn get_from_cache(&self, key: &QueryKey) -> Option<Vec<ParsedEvent>> {
953        if let Ok(mut cache) = self.cache.lock() {
954            if let Some(entry) = cache.get(key) {
955                if !entry.is_expired() {
956                    return Some(entry.events.clone());
957                }
958                // Remove expired entry
959                cache.pop(key);
960            }
961        }
962        None
963    }
964
965    /// Store events in cache
966    fn store_in_cache(&self, key: QueryKey, events: Vec<ParsedEvent>) {
967        if let Ok(mut cache) = self.cache.lock() {
968            let entry = CacheEntry::new(events, self.config.cache_ttl_seconds);
969            cache.put(key, entry);
970        }
971    }
972
973    /// Clear the cache
974    pub fn clear_cache(&self) {
975        if let Ok(mut cache) = self.cache.lock() {
976            cache.clear();
977        }
978
979        info!(
980            service = "tally-sdk",
981            component = "event_query_client",
982            event = "cache_cleared",
983            "Query cache has been cleared"
984        );
985    }
986
987    /// Get cache statistics
988    #[must_use]
989    pub fn get_cache_stats(&self) -> HashMap<String, u64> {
990        let mut stats = HashMap::new();
991
992        if let Ok(cache) = self.cache.lock() {
993            stats.insert("cache_size".to_string(), cache.len() as u64);
994            stats.insert("cache_capacity".to_string(), cache.cap().get() as u64);
995        }
996
997        stats
998    }
999
1000    /// Health check for the RPC client
1001    pub fn health_check(&self) -> bool {
1002        match self.sdk_client.get_health() {
1003            Ok(()) => {
1004                debug!(
1005                    service = "tally-sdk",
1006                    component = "event_query_client",
1007                    event = "health_check_success",
1008                    "RPC client health check passed"
1009                );
1010                true
1011            }
1012            Err(e) => {
1013                warn!(
1014                    service = "tally-sdk",
1015                    component = "event_query_client",
1016                    event = "health_check_failed",
1017                    error = %e,
1018                    "RPC client health check failed"
1019                );
1020                false
1021            }
1022        }
1023    }
1024}
1025
#[cfg(test)]
mod tests {
    use super::*;

    /// Client configuration pointing at a local validator URL; none of the
    /// tests below require the endpoint to be reachable.
    fn create_test_config() -> EventQueryClientConfig {
        EventQueryClientConfig {
            rpc_url: "http://localhost:8899".to_string(),
            program_id: crate::program_id(),
            query_config: EventQueryConfig::default(),
        }
    }

    /// Constructing a client must succeed without contacting the RPC node.
    #[test]
    fn test_event_query_client_creation() {
        let config = create_test_config();
        let client = EventQueryClient::new(config);
        assert!(client.is_ok());
    }

    /// Pins the documented default query settings.
    #[test]
    fn test_event_query_config_default() {
        let config = EventQueryConfig::default();
        assert_eq!(config.max_events_per_query, 1000);
        assert_eq!(config.max_signatures_per_batch, 100);
        assert!(config.enable_cache);
        assert_eq!(config.cache_ttl_seconds, 300);
    }

    /// A fresh entry is live; one whose `cached_at` is older than its TTL is expired.
    #[test]
    fn test_cache_entry_expiry() {
        let events = vec![];
        let entry = CacheEntry::new(events, 1); // 1 second TTL

        assert!(!entry.is_expired());

        // Back-date the entry beyond its TTL instead of sleeping.
        let mut expired_entry = entry;
        expired_entry.cached_at = Utc::now() - chrono::Duration::seconds(2);
        assert!(expired_entry.is_expired());
    }

    /// Keys with identical fields compare equal; a different query type does not.
    #[test]
    fn test_query_key_equality() {
        let merchant = Pubkey::new_unique();

        let key1 = QueryKey {
            merchant,
            query_type: QueryType::Recent,
            limit: 100,
            from_slot: None,
            to_slot: None,
        };

        let key2 = QueryKey {
            merchant,
            query_type: QueryType::Recent,
            limit: 100,
            from_slot: None,
            to_slot: None,
        };

        assert_eq!(key1, key2);

        let key3 = QueryKey {
            merchant,
            query_type: QueryType::MerchantEvents,
            limit: 100,
            from_slot: None,
            to_slot: None,
        };

        assert_ne!(key1, key3);
    }

    /// `timestamp_to_approximate_slot` is synchronous and must read the current
    /// slot from a live RPC endpoint, so the conversion itself is not invoked
    /// here. This only checks that the client it lives on can be built offline;
    /// exercising the conversion belongs in a test run against a validator.
    #[test]
    fn test_timestamp_to_slot_conversion() {
        let client = EventQueryClient::new(create_test_config());
        assert!(client.is_ok());
    }

    /// Round-trips an entry through the cache and verifies `clear_cache` empties it.
    #[test]
    fn test_cache_operations() {
        let config = create_test_config();
        let client = EventQueryClient::new(config).unwrap();

        let key = QueryKey {
            merchant: Pubkey::new_unique(),
            query_type: QueryType::Recent,
            limit: 100,
            from_slot: None,
            to_slot: None,
        };

        // Cache should be empty initially
        assert!(client.get_from_cache(&key).is_none());

        // Store something in cache
        let events = vec![];
        client.store_in_cache(key.clone(), events);

        // Should be able to retrieve exactly what was stored (an empty list)
        assert_eq!(
            client.get_from_cache(&key).map(|cached| cached.len()),
            Some(0)
        );

        // Clear cache
        client.clear_cache();

        // Should be empty again
        assert!(client.get_from_cache(&key).is_none());
    }

    /// A new client reports both statistics keys and an empty cache.
    #[test]
    fn test_cache_stats() {
        let config = create_test_config();
        let client = EventQueryClient::new(config).unwrap();

        let stats = client.get_cache_stats();
        assert!(stats.contains_key("cache_size"));
        assert!(stats.contains_key("cache_capacity"));
        assert_eq!(stats["cache_size"], 0);
    }
}