kaccy_db/
analytics.rs

1//! Analytics and materialized views for dashboard metrics
2//!
3//! This module provides SQL for creating and refreshing materialized views
4//! that pre-compute common dashboard metrics for better performance.
5//!
6//! It also includes TimescaleDB integration for time-series data (price and volume history).
7
8use chrono::{DateTime, Utc};
9use serde::Serialize;
10use sqlx::PgPool;
11use uuid::Uuid;
12
13use crate::error::Result;
14
15/// Dashboard overview metrics
16#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
17pub struct DashboardMetrics {
18    /// Total registered users
19    pub total_users: i64,
20    /// Users registered in last 24 hours
21    pub new_users_24h: i64,
22    /// Users registered in last 7 days
23    pub new_users_7d: i64,
24    /// Total active tokens
25    pub total_tokens: i64,
26    /// Tokens created in last 24 hours
27    pub new_tokens_24h: i64,
28    /// Total trades executed
29    pub total_trades: i64,
30    /// Trades in last 24 hours
31    pub trades_24h: i64,
32    /// Total trading volume in BTC (satoshis)
33    pub total_volume_sats: i64,
34    /// Volume in last 24 hours (satoshis)
35    pub volume_24h_sats: i64,
36    /// Total platform fees collected (satoshis)
37    pub total_fees_sats: i64,
38    /// Fees in last 24 hours (satoshis)
39    pub fees_24h_sats: i64,
40    /// Pending commitments count
41    pub pending_commitments: i64,
42    /// Pending KYC applications
43    pub pending_kyc: i64,
44}
45
46/// Token metrics summary
47#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
48pub struct TokenMetricsSummary {
49    /// Token ID
50    pub token_id: uuid::Uuid,
51    /// Token symbol
52    pub symbol: String,
53    /// Token name
54    pub name: String,
55    /// Total supply
56    pub total_supply: rust_decimal::Decimal,
57    /// Number of holders
58    pub holder_count: i64,
59    /// Total trades for this token
60    pub trade_count: i64,
61    /// 24h trade count
62    pub trades_24h: i64,
63    /// Total volume in BTC
64    pub total_volume_btc: rust_decimal::Decimal,
65    /// 24h volume in BTC
66    pub volume_24h_btc: rust_decimal::Decimal,
67    /// Current price (last trade or bonding curve)
68    pub current_price_btc: rust_decimal::Decimal,
69    /// Price change 24h percentage
70    pub price_change_24h_pct: rust_decimal::Decimal,
71}
72
73/// User activity summary
74#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
75pub struct UserActivitySummary {
76    /// User ID
77    pub user_id: uuid::Uuid,
78    /// Username
79    pub username: String,
80    /// Total trades made
81    pub trade_count: i64,
82    /// Total trading volume
83    pub total_volume_btc: rust_decimal::Decimal,
84    /// Number of tokens held
85    pub tokens_held: i64,
86    /// Number of tokens issued
87    pub tokens_issued: i64,
88    /// Reputation score
89    pub reputation_score: i32,
90    /// Last activity timestamp
91    pub last_activity: Option<chrono::DateTime<chrono::Utc>>,
92}
93
94/// Daily statistics
95#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
96pub struct DailyStats {
97    /// Date
98    pub date: chrono::NaiveDate,
99    /// New users
100    pub new_users: i64,
101    /// New tokens
102    pub new_tokens: i64,
103    /// Number of trades
104    pub trade_count: i64,
105    /// Trading volume (satoshis)
106    pub volume_sats: i64,
107    /// Fees collected (satoshis)
108    pub fees_sats: i64,
109    /// Active users (made a trade)
110    pub active_users: i64,
111}
112
113/// Price history record (TimescaleDB hypertable)
114#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
115pub struct PriceHistory {
116    /// Timestamp
117    pub time: DateTime<Utc>,
118    /// Token ID
119    pub token_id: Uuid,
120    /// Price in satoshis
121    pub price_satoshis: i64,
122    /// Total supply at this time
123    pub supply: i64,
124    /// Market cap in satoshis
125    pub market_cap_satoshis: i64,
126}
127
128/// Volume history record (TimescaleDB hypertable)
129#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
130pub struct VolumeHistory {
131    /// Timestamp
132    pub time: DateTime<Utc>,
133    /// Token ID
134    pub token_id: Uuid,
135    /// Buy volume in satoshis
136    pub buy_volume_satoshis: i64,
137    /// Sell volume in satoshis
138    pub sell_volume_satoshis: i64,
139    /// Number of trades
140    pub trade_count: i32,
141    /// Number of unique traders
142    pub unique_traders: i32,
143}
144
145/// Platform volume history record (TimescaleDB hypertable)
146#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
147pub struct PlatformVolumeHistory {
148    /// Timestamp
149    pub time: DateTime<Utc>,
150    /// Total volume in satoshis
151    pub total_volume_satoshis: i64,
152    /// Number of trades
153    pub trade_count: i32,
154    /// Number of active tokens
155    pub active_tokens: i32,
156    /// Number of active traders
157    pub active_traders: i32,
158    /// Fees collected in satoshis
159    pub fees_collected_satoshis: i64,
160}
161
162/// OHLC (Open-High-Low-Close) price data
163#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
164pub struct OhlcData {
165    /// Time bucket
166    pub bucket: DateTime<Utc>,
167    /// Token ID
168    pub token_id: Uuid,
169    /// Opening price
170    pub open_price: i64,
171    /// Highest price
172    pub high_price: i64,
173    /// Lowest price
174    pub low_price: i64,
175    /// Closing price
176    pub close_price: i64,
177    /// Final supply
178    pub final_supply: i64,
179    /// Final market cap
180    pub final_market_cap: i64,
181}
182
183/// Analytics service for dashboard metrics
184pub struct AnalyticsService {
185    pool: PgPool,
186}
187
188impl AnalyticsService {
189    /// Create a new analytics service
190    pub fn new(pool: PgPool) -> Self {
191        Self { pool }
192    }
193
194    /// Create all materialized views
195    pub async fn create_materialized_views(&self) -> Result<()> {
196        // Create dashboard metrics view
197        sqlx::query(DASHBOARD_METRICS_VIEW_SQL)
198            .execute(&self.pool)
199            .await?;
200
201        // Create token metrics view
202        sqlx::query(TOKEN_METRICS_VIEW_SQL)
203            .execute(&self.pool)
204            .await?;
205
206        // Create daily stats view
207        sqlx::query(DAILY_STATS_VIEW_SQL)
208            .execute(&self.pool)
209            .await?;
210
211        // Create user activity view
212        sqlx::query(USER_ACTIVITY_VIEW_SQL)
213            .execute(&self.pool)
214            .await?;
215
216        tracing::info!("Created all materialized views for analytics");
217        Ok(())
218    }
219
220    /// Refresh all materialized views
221    pub async fn refresh_all_views(&self) -> Result<()> {
222        sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_dashboard_metrics")
223            .execute(&self.pool)
224            .await
225            .ok(); // Ignore if doesn't exist
226
227        sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_token_metrics")
228            .execute(&self.pool)
229            .await
230            .ok();
231
232        sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_daily_stats")
233            .execute(&self.pool)
234            .await
235            .ok();
236
237        sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_user_activity")
238            .execute(&self.pool)
239            .await
240            .ok();
241
242        tracing::debug!("Refreshed all materialized views");
243        Ok(())
244    }
245
246    /// Get dashboard metrics (from materialized view if available, otherwise compute)
247    pub async fn get_dashboard_metrics(&self) -> Result<DashboardMetrics> {
248        // Try materialized view first
249        let result =
250            sqlx::query_as::<_, DashboardMetrics>("SELECT * FROM mv_dashboard_metrics LIMIT 1")
251                .fetch_optional(&self.pool)
252                .await;
253
254        if let Ok(Some(metrics)) = result {
255            return Ok(metrics);
256        }
257
258        // Fall back to computed metrics
259        self.compute_dashboard_metrics().await
260    }
261
262    /// Compute dashboard metrics directly (without materialized view)
263    pub async fn compute_dashboard_metrics(&self) -> Result<DashboardMetrics> {
264        let metrics = sqlx::query_as::<_, DashboardMetrics>(
265            r#"
266            SELECT
267                (SELECT COUNT(*) FROM users) as total_users,
268                (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '24 hours') as new_users_24h,
269                (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '7 days') as new_users_7d,
270                (SELECT COUNT(*) FROM tokens WHERE status = 'active') as total_tokens,
271                (SELECT COUNT(*) FROM tokens WHERE created_at > NOW() - INTERVAL '24 hours') as new_tokens_24h,
272                (SELECT COUNT(*) FROM trades) as total_trades,
273                (SELECT COUNT(*) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
274                COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades), 0) as total_volume_sats,
275                COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as volume_24h_sats,
276                COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades), 0) as total_fees_sats,
277                COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as fees_24h_sats,
278                (SELECT COUNT(*) FROM output_commitments WHERE status = 'pending') as pending_commitments,
279                (SELECT COUNT(*) FROM kyc_applications WHERE status = 'pending') as pending_kyc
280            "#,
281        )
282        .fetch_one(&self.pool)
283        .await?;
284
285        Ok(metrics)
286    }
287
288    /// Get token metrics (top tokens by volume)
289    pub async fn get_top_tokens(&self, limit: i64) -> Result<Vec<TokenMetricsSummary>> {
290        // Try materialized view first
291        let result = sqlx::query_as::<_, TokenMetricsSummary>(
292            r#"
293            SELECT * FROM mv_token_metrics
294            ORDER BY volume_24h_btc DESC
295            LIMIT $1
296            "#,
297        )
298        .bind(limit)
299        .fetch_all(&self.pool)
300        .await;
301
302        if let Ok(tokens) = result {
303            if !tokens.is_empty() {
304                return Ok(tokens);
305            }
306        }
307
308        // Fall back to computed query
309        self.compute_top_tokens(limit).await
310    }
311
312    /// Compute top tokens directly
313    async fn compute_top_tokens(&self, limit: i64) -> Result<Vec<TokenMetricsSummary>> {
314        let tokens = sqlx::query_as::<_, TokenMetricsSummary>(
315            r#"
316            SELECT
317                t.token_id,
318                t.symbol,
319                t.name,
320                t.total_supply,
321                COALESCE(h.holder_count, 0) as holder_count,
322                COALESCE(tr.trade_count, 0) as trade_count,
323                COALESCE(tr.trades_24h, 0) as trades_24h,
324                COALESCE(tr.total_volume_btc, 0) as total_volume_btc,
325                COALESCE(tr.volume_24h_btc, 0) as volume_24h_btc,
326                COALESCE(tr.last_price, t.base_price) as current_price_btc,
327                COALESCE(
328                    CASE WHEN tr.price_24h_ago > 0
329                         THEN ((tr.last_price - tr.price_24h_ago) / tr.price_24h_ago * 100)
330                         ELSE 0
331                    END,
332                    0
333                ) as price_change_24h_pct
334            FROM tokens t
335            LEFT JOIN (
336                SELECT token_id, COUNT(DISTINCT user_id) as holder_count
337                FROM balances
338                WHERE amount > 0
339                GROUP BY token_id
340            ) h ON h.token_id = t.token_id
341            LEFT JOIN (
342                SELECT
343                    token_id,
344                    COUNT(*) as trade_count,
345                    COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
346                    SUM(total_btc) as total_volume_btc,
347                    SUM(total_btc) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as volume_24h_btc,
348                    (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id ORDER BY created_at DESC LIMIT 1) as last_price,
349                    (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id AND tr2.created_at < NOW() - INTERVAL '24 hours' ORDER BY created_at DESC LIMIT 1) as price_24h_ago
350                FROM trades
351                GROUP BY token_id
352            ) tr ON tr.token_id = t.token_id
353            WHERE t.status = 'active'
354            ORDER BY COALESCE(tr.volume_24h_btc, 0) DESC
355            LIMIT $1
356            "#,
357        )
358        .bind(limit)
359        .fetch_all(&self.pool)
360        .await?;
361
362        Ok(tokens)
363    }
364
365    /// Get daily statistics for a date range
366    pub async fn get_daily_stats(
367        &self,
368        start_date: chrono::NaiveDate,
369        end_date: chrono::NaiveDate,
370    ) -> Result<Vec<DailyStats>> {
371        // Try materialized view first
372        let result = sqlx::query_as::<_, DailyStats>(
373            r#"
374            SELECT * FROM mv_daily_stats
375            WHERE date >= $1 AND date <= $2
376            ORDER BY date DESC
377            "#,
378        )
379        .bind(start_date)
380        .bind(end_date)
381        .fetch_all(&self.pool)
382        .await;
383
384        if let Ok(stats) = result {
385            if !stats.is_empty() {
386                return Ok(stats);
387            }
388        }
389
390        // Fall back to computed query
391        self.compute_daily_stats(start_date, end_date).await
392    }
393
394    /// Compute daily statistics directly
395    async fn compute_daily_stats(
396        &self,
397        start_date: chrono::NaiveDate,
398        end_date: chrono::NaiveDate,
399    ) -> Result<Vec<DailyStats>> {
400        let stats = sqlx::query_as::<_, DailyStats>(
401            r#"
402            WITH dates AS (
403                SELECT generate_series($1::date, $2::date, '1 day'::interval)::date as date
404            )
405            SELECT
406                d.date,
407                COALESCE(u.new_users, 0) as new_users,
408                COALESCE(t.new_tokens, 0) as new_tokens,
409                COALESCE(tr.trade_count, 0) as trade_count,
410                COALESCE(tr.volume_sats, 0) as volume_sats,
411                COALESCE(tr.fees_sats, 0) as fees_sats,
412                COALESCE(tr.active_users, 0) as active_users
413            FROM dates d
414            LEFT JOIN (
415                SELECT DATE(created_at) as date, COUNT(*) as new_users
416                FROM users
417                WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
418                GROUP BY DATE(created_at)
419            ) u ON u.date = d.date
420            LEFT JOIN (
421                SELECT DATE(created_at) as date, COUNT(*) as new_tokens
422                FROM tokens
423                WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
424                GROUP BY DATE(created_at)
425            ) t ON t.date = d.date
426            LEFT JOIN (
427                SELECT
428                    DATE(created_at) as date,
429                    COUNT(*) as trade_count,
430                    SUM((total_btc * 100000000)::bigint) as volume_sats,
431                    SUM((platform_fee * 100000000)::bigint) as fees_sats,
432                    COUNT(DISTINCT buyer_id) + COUNT(DISTINCT seller_id) as active_users
433                FROM trades
434                WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
435                GROUP BY DATE(created_at)
436            ) tr ON tr.date = d.date
437            ORDER BY d.date DESC
438            "#,
439        )
440        .bind(start_date)
441        .bind(end_date)
442        .fetch_all(&self.pool)
443        .await?;
444
445        Ok(stats)
446    }
447
448    /// Get top users by trading volume
449    pub async fn get_top_users(&self, limit: i64) -> Result<Vec<UserActivitySummary>> {
450        let users = sqlx::query_as::<_, UserActivitySummary>(
451            r#"
452            SELECT
453                u.user_id,
454                u.username,
455                COALESCE(t.trade_count, 0) as trade_count,
456                COALESCE(t.total_volume_btc, 0) as total_volume_btc,
457                COALESCE(b.tokens_held, 0) as tokens_held,
458                COALESCE(tk.tokens_issued, 0) as tokens_issued,
459                u.reputation_score,
460                GREATEST(t.last_trade, u.created_at) as last_activity
461            FROM users u
462            LEFT JOIN (
463                SELECT
464                    user_id,
465                    COUNT(*) as trade_count,
466                    SUM(total_btc) as total_volume_btc,
467                    MAX(created_at) as last_trade
468                FROM (
469                    SELECT buyer_id as user_id, total_btc, created_at FROM trades
470                    UNION ALL
471                    SELECT seller_id as user_id, total_btc, created_at FROM trades
472                ) all_trades
473                GROUP BY user_id
474            ) t ON t.user_id = u.user_id
475            LEFT JOIN (
476                SELECT user_id, COUNT(DISTINCT token_id) as tokens_held
477                FROM balances
478                WHERE amount > 0
479                GROUP BY user_id
480            ) b ON b.user_id = u.user_id
481            LEFT JOIN (
482                SELECT issuer_id as user_id, COUNT(*) as tokens_issued
483                FROM tokens
484                GROUP BY issuer_id
485            ) tk ON tk.user_id = u.user_id
486            ORDER BY COALESCE(t.total_volume_btc, 0) DESC
487            LIMIT $1
488            "#,
489        )
490        .bind(limit)
491        .fetch_all(&self.pool)
492        .await?;
493
494        Ok(users)
495    }
496
497    /// Drop all materialized views
498    pub async fn drop_materialized_views(&self) -> Result<()> {
499        sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_dashboard_metrics CASCADE")
500            .execute(&self.pool)
501            .await?;
502        sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_token_metrics CASCADE")
503            .execute(&self.pool)
504            .await?;
505        sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_daily_stats CASCADE")
506            .execute(&self.pool)
507            .await?;
508        sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_user_activity CASCADE")
509            .execute(&self.pool)
510            .await?;
511
512        tracing::info!("Dropped all materialized views");
513        Ok(())
514    }
515
516    // ========================================================================
517    // TimescaleDB Time-Series Functions
518    // ========================================================================
519
520    /// Record price history data point
521    ///
522    /// Inserts a price history record into the TimescaleDB hypertable.
523    /// Safe to call even if TimescaleDB is not installed (uses regular table).
524    pub async fn record_price_history(
525        &self,
526        token_id: Uuid,
527        price_satoshis: i64,
528        supply: i64,
529    ) -> Result<()> {
530        let market_cap_satoshis = price_satoshis.saturating_mul(supply);
531
532        sqlx::query(
533            r#"
534            INSERT INTO price_history (time, token_id, price_satoshis, supply, market_cap_satoshis)
535            VALUES (NOW(), $1, $2, $3, $4)
536            ON CONFLICT (time, token_id) DO UPDATE
537            SET price_satoshis = EXCLUDED.price_satoshis,
538                supply = EXCLUDED.supply,
539                market_cap_satoshis = EXCLUDED.market_cap_satoshis
540            "#,
541        )
542        .bind(token_id)
543        .bind(price_satoshis)
544        .bind(supply)
545        .bind(market_cap_satoshis)
546        .execute(&self.pool)
547        .await?;
548
549        Ok(())
550    }
551
552    /// Record volume history data point
553    ///
554    /// Inserts or updates a volume history record for a specific time bucket.
555    /// Typically called on trade execution to update the current hour's volume.
556    pub async fn record_volume_history(
557        &self,
558        token_id: Uuid,
559        buy_volume_satoshis: i64,
560        sell_volume_satoshis: i64,
561        trade_count: i32,
562        unique_traders: i32,
563    ) -> Result<()> {
564        sqlx::query(
565            r#"
566            INSERT INTO volume_history
567                (time, token_id, buy_volume_satoshis, sell_volume_satoshis, trade_count, unique_traders)
568            VALUES (date_trunc('hour', NOW()), $1, $2, $3, $4, $5)
569            ON CONFLICT (time, token_id) DO UPDATE
570            SET buy_volume_satoshis = volume_history.buy_volume_satoshis + EXCLUDED.buy_volume_satoshis,
571                sell_volume_satoshis = volume_history.sell_volume_satoshis + EXCLUDED.sell_volume_satoshis,
572                trade_count = volume_history.trade_count + EXCLUDED.trade_count,
573                unique_traders = GREATEST(volume_history.unique_traders, EXCLUDED.unique_traders)
574            "#,
575        )
576        .bind(token_id)
577        .bind(buy_volume_satoshis)
578        .bind(sell_volume_satoshis)
579        .bind(trade_count)
580        .bind(unique_traders)
581        .execute(&self.pool)
582        .await?;
583
584        Ok(())
585    }
586
587    /// Record platform-wide volume history
588    pub async fn record_platform_volume(
589        &self,
590        total_volume_satoshis: i64,
591        trade_count: i32,
592        active_tokens: i32,
593        active_traders: i32,
594        fees_collected_satoshis: i64,
595    ) -> Result<()> {
596        sqlx::query(
597            r#"
598            INSERT INTO platform_volume_history
599                (time, total_volume_satoshis, trade_count, active_tokens, active_traders, fees_collected_satoshis)
600            VALUES (date_trunc('hour', NOW()), $1, $2, $3, $4, $5)
601            ON CONFLICT (time) DO UPDATE
602            SET total_volume_satoshis = platform_volume_history.total_volume_satoshis + EXCLUDED.total_volume_satoshis,
603                trade_count = platform_volume_history.trade_count + EXCLUDED.trade_count,
604                active_tokens = GREATEST(platform_volume_history.active_tokens, EXCLUDED.active_tokens),
605                active_traders = GREATEST(platform_volume_history.active_traders, EXCLUDED.active_traders),
606                fees_collected_satoshis = platform_volume_history.fees_collected_satoshis + EXCLUDED.fees_collected_satoshis
607            "#,
608        )
609        .bind(total_volume_satoshis)
610        .bind(trade_count)
611        .bind(active_tokens)
612        .bind(active_traders)
613        .bind(fees_collected_satoshis)
614        .execute(&self.pool)
615        .await?;
616
617        Ok(())
618    }
619
620    /// Get price history for a token within a time range
621    pub async fn get_price_history(
622        &self,
623        token_id: Uuid,
624        start_time: DateTime<Utc>,
625        end_time: DateTime<Utc>,
626    ) -> Result<Vec<PriceHistory>> {
627        let history = sqlx::query_as::<_, PriceHistory>(
628            r#"
629            SELECT time, token_id, price_satoshis, supply, market_cap_satoshis
630            FROM price_history
631            WHERE token_id = $1 AND time >= $2 AND time <= $3
632            ORDER BY time ASC
633            "#,
634        )
635        .bind(token_id)
636        .bind(start_time)
637        .bind(end_time)
638        .fetch_all(&self.pool)
639        .await?;
640
641        Ok(history)
642    }
643
644    /// Get volume history for a token within a time range
645    pub async fn get_volume_history(
646        &self,
647        token_id: Uuid,
648        start_time: DateTime<Utc>,
649        end_time: DateTime<Utc>,
650    ) -> Result<Vec<VolumeHistory>> {
651        let history = sqlx::query_as::<_, VolumeHistory>(
652            r#"
653            SELECT time, token_id, buy_volume_satoshis, sell_volume_satoshis, trade_count, unique_traders
654            FROM volume_history
655            WHERE token_id = $1 AND time >= $2 AND time <= $3
656            ORDER BY time ASC
657            "#,
658        )
659        .bind(token_id)
660        .bind(start_time)
661        .bind(end_time)
662        .fetch_all(&self.pool)
663        .await?;
664
665        Ok(history)
666    }
667
668    /// Get OHLC (candlestick) data for a token
669    ///
670    /// Returns OHLC data with configurable time buckets (e.g., '1 hour', '1 day', '1 week').
671    /// Uses TimescaleDB's continuous aggregate if available, otherwise computes from raw data.
672    pub async fn get_ohlc_data(
673        &self,
674        token_id: Uuid,
675        start_time: DateTime<Utc>,
676        end_time: DateTime<Utc>,
677        bucket_interval: &str, // e.g., "1 hour", "1 day", "1 week"
678    ) -> Result<Vec<OhlcData>> {
679        // Try to use continuous aggregate for hourly data
680        if bucket_interval == "1 hour" {
681            if let Ok(data) = self
682                .get_ohlc_from_aggregate(token_id, start_time, end_time)
683                .await
684            {
685                if !data.is_empty() {
686                    return Ok(data);
687                }
688            }
689        }
690
691        // Fall back to computing from raw data
692        self.compute_ohlc_data(token_id, start_time, end_time, bucket_interval)
693            .await
694    }
695
696    /// Get OHLC data from continuous aggregate (if TimescaleDB is available)
697    async fn get_ohlc_from_aggregate(
698        &self,
699        token_id: Uuid,
700        start_time: DateTime<Utc>,
701        end_time: DateTime<Utc>,
702    ) -> Result<Vec<OhlcData>> {
703        let data = sqlx::query_as::<_, OhlcData>(
704            r#"
705            SELECT bucket, token_id, open_price, high_price, low_price, close_price,
706                   final_supply, final_market_cap
707            FROM price_history_hourly
708            WHERE token_id = $1 AND bucket >= $2 AND bucket <= $3
709            ORDER BY bucket ASC
710            "#,
711        )
712        .bind(token_id)
713        .bind(start_time)
714        .bind(end_time)
715        .fetch_all(&self.pool)
716        .await?;
717
718        Ok(data)
719    }
720
721    /// Compute OHLC data from raw price history
722    async fn compute_ohlc_data(
723        &self,
724        token_id: Uuid,
725        start_time: DateTime<Utc>,
726        end_time: DateTime<Utc>,
727        bucket_interval: &str,
728    ) -> Result<Vec<OhlcData>> {
729        let data = sqlx::query_as::<_, OhlcData>(
730            r#"
731            SELECT
732                time_bucket($1::interval, time) AS bucket,
733                token_id,
734                first(price_satoshis, time) AS open_price,
735                max(price_satoshis) AS high_price,
736                min(price_satoshis) AS low_price,
737                last(price_satoshis, time) AS close_price,
738                last(supply, time) AS final_supply,
739                last(market_cap_satoshis, time) AS final_market_cap
740            FROM price_history
741            WHERE token_id = $2 AND time >= $3 AND time <= $4
742            GROUP BY bucket, token_id
743            ORDER BY bucket ASC
744            "#,
745        )
746        .bind(bucket_interval)
747        .bind(token_id)
748        .bind(start_time)
749        .bind(end_time)
750        .fetch_all(&self.pool)
751        .await?;
752
753        Ok(data)
754    }
755
756    /// Get aggregated volume data by time bucket
757    pub async fn get_aggregated_volume(
758        &self,
759        token_id: Option<Uuid>,
760        start_time: DateTime<Utc>,
761        end_time: DateTime<Utc>,
762        bucket_interval: &str, // e.g., "1 hour", "1 day"
763    ) -> Result<Vec<VolumeHistory>> {
764        let volume = if let Some(tid) = token_id {
765            // Token-specific volume
766            sqlx::query_as::<_, VolumeHistory>(
767                r#"
768                SELECT
769                    time_bucket($1::interval, time) AS time,
770                    token_id,
771                    sum(buy_volume_satoshis) AS buy_volume_satoshis,
772                    sum(sell_volume_satoshis) AS sell_volume_satoshis,
773                    sum(trade_count)::int AS trade_count,
774                    max(unique_traders)::int AS unique_traders
775                FROM volume_history
776                WHERE token_id = $2 AND time >= $3 AND time <= $4
777                GROUP BY time_bucket($1::interval, time), token_id
778                ORDER BY time ASC
779                "#,
780            )
781            .bind(bucket_interval)
782            .bind(tid)
783            .bind(start_time)
784            .bind(end_time)
785            .fetch_all(&self.pool)
786            .await?
787        } else {
788            // Platform-wide volume (aggregate across all tokens)
789            sqlx::query_as::<_, VolumeHistory>(
790                r#"
791                SELECT
792                    time_bucket($1::interval, time) AS time,
793                    '00000000-0000-0000-0000-000000000000'::uuid AS token_id,
794                    sum(buy_volume_satoshis) AS buy_volume_satoshis,
795                    sum(sell_volume_satoshis) AS sell_volume_satoshis,
796                    sum(trade_count)::int AS trade_count,
797                    sum(unique_traders)::int AS unique_traders
798                FROM volume_history
799                WHERE time >= $2 AND time <= $3
800                GROUP BY time_bucket($1::interval, time)
801                ORDER BY time ASC
802                "#,
803            )
804            .bind(bucket_interval)
805            .bind(start_time)
806            .bind(end_time)
807            .fetch_all(&self.pool)
808            .await?
809        };
810
811        Ok(volume)
812    }
813
814    /// Get platform-wide volume history
815    pub async fn get_platform_volume_history(
816        &self,
817        start_time: DateTime<Utc>,
818        end_time: DateTime<Utc>,
819    ) -> Result<Vec<PlatformVolumeHistory>> {
820        let history = sqlx::query_as::<_, PlatformVolumeHistory>(
821            r#"
822            SELECT time, total_volume_satoshis, trade_count, active_tokens,
823                   active_traders, fees_collected_satoshis
824            FROM platform_volume_history
825            WHERE time >= $1 AND time <= $2
826            ORDER BY time ASC
827            "#,
828        )
829        .bind(start_time)
830        .bind(end_time)
831        .fetch_all(&self.pool)
832        .await?;
833
834        Ok(history)
835    }
836
837    /// Check if TimescaleDB extension is available
838    pub async fn is_timescaledb_available(&self) -> bool {
839        sqlx::query_scalar::<_, bool>(
840            "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')",
841        )
842        .fetch_one(&self.pool)
843        .await
844        .unwrap_or(false)
845    }
846
847    /// Get hypertable information (only works with TimescaleDB)
848    pub async fn get_hypertable_info(&self, table_name: &str) -> Result<Option<String>> {
849        let info = sqlx::query_scalar::<_, String>(
850            r#"
851            SELECT format('Hypertable: %s, Chunks: %s, Compression: %s',
852                          hypertable_name,
853                          num_chunks,
854                          compression_enabled)
855            FROM timescaledb_information.hypertables
856            WHERE hypertable_name = $1
857            "#,
858        )
859        .bind(table_name)
860        .fetch_optional(&self.pool)
861        .await?;
862
863        Ok(info)
864    }
865}
866
867/// SQL to create dashboard metrics materialized view
868const DASHBOARD_METRICS_VIEW_SQL: &str = r#"
869CREATE MATERIALIZED VIEW IF NOT EXISTS mv_dashboard_metrics AS
870SELECT
871    (SELECT COUNT(*) FROM users) as total_users,
872    (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '24 hours') as new_users_24h,
873    (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '7 days') as new_users_7d,
874    (SELECT COUNT(*) FROM tokens WHERE status = 'active') as total_tokens,
875    (SELECT COUNT(*) FROM tokens WHERE created_at > NOW() - INTERVAL '24 hours') as new_tokens_24h,
876    (SELECT COUNT(*) FROM trades) as total_trades,
877    (SELECT COUNT(*) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
878    COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades), 0) as total_volume_sats,
879    COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as volume_24h_sats,
880    COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades), 0) as total_fees_sats,
881    COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as fees_24h_sats,
882    (SELECT COUNT(*) FROM output_commitments WHERE status = 'pending') as pending_commitments,
883    COALESCE((SELECT COUNT(*) FROM kyc_applications WHERE status = 'pending'), 0) as pending_kyc;
884
885CREATE UNIQUE INDEX IF NOT EXISTS mv_dashboard_metrics_idx ON mv_dashboard_metrics ((1));
886"#;
887
888/// SQL to create token metrics materialized view
889const TOKEN_METRICS_VIEW_SQL: &str = r#"
890CREATE MATERIALIZED VIEW IF NOT EXISTS mv_token_metrics AS
891SELECT
892    t.token_id,
893    t.symbol,
894    t.name,
895    t.total_supply,
896    COALESCE(h.holder_count, 0) as holder_count,
897    COALESCE(tr.trade_count, 0) as trade_count,
898    COALESCE(tr.trades_24h, 0) as trades_24h,
899    COALESCE(tr.total_volume_btc, 0) as total_volume_btc,
900    COALESCE(tr.volume_24h_btc, 0) as volume_24h_btc,
901    COALESCE(tr.last_price, t.base_price) as current_price_btc,
902    COALESCE(
903        CASE WHEN tr.price_24h_ago > 0
904             THEN ((tr.last_price - tr.price_24h_ago) / tr.price_24h_ago * 100)
905             ELSE 0
906        END,
907        0
908    ) as price_change_24h_pct
909FROM tokens t
910LEFT JOIN (
911    SELECT token_id, COUNT(DISTINCT user_id) as holder_count
912    FROM balances
913    WHERE amount > 0
914    GROUP BY token_id
915) h ON h.token_id = t.token_id
916LEFT JOIN (
917    SELECT
918        token_id,
919        COUNT(*) as trade_count,
920        COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
921        SUM(total_btc) as total_volume_btc,
922        SUM(total_btc) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as volume_24h_btc,
923        (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id ORDER BY created_at DESC LIMIT 1) as last_price,
924        (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id AND tr2.created_at < NOW() - INTERVAL '24 hours' ORDER BY created_at DESC LIMIT 1) as price_24h_ago
925    FROM trades
926    GROUP BY token_id
927) tr ON tr.token_id = t.token_id
928WHERE t.status = 'active';
929
930CREATE UNIQUE INDEX IF NOT EXISTS mv_token_metrics_token_id_idx ON mv_token_metrics (token_id);
931CREATE INDEX IF NOT EXISTS mv_token_metrics_volume_idx ON mv_token_metrics (volume_24h_btc DESC);
932"#;
933
934/// SQL to create daily stats materialized view
935const DAILY_STATS_VIEW_SQL: &str = r#"
936CREATE MATERIALIZED VIEW IF NOT EXISTS mv_daily_stats AS
937WITH dates AS (
938    SELECT generate_series(
939        (SELECT COALESCE(MIN(DATE(created_at)), CURRENT_DATE - INTERVAL '30 days') FROM users),
940        CURRENT_DATE,
941        '1 day'::interval
942    )::date as date
943)
944SELECT
945    d.date,
946    COALESCE(u.new_users, 0)::bigint as new_users,
947    COALESCE(t.new_tokens, 0)::bigint as new_tokens,
948    COALESCE(tr.trade_count, 0)::bigint as trade_count,
949    COALESCE(tr.volume_sats, 0)::bigint as volume_sats,
950    COALESCE(tr.fees_sats, 0)::bigint as fees_sats,
951    COALESCE(tr.active_users, 0)::bigint as active_users
952FROM dates d
953LEFT JOIN (
954    SELECT DATE(created_at) as date, COUNT(*) as new_users
955    FROM users
956    GROUP BY DATE(created_at)
957) u ON u.date = d.date
958LEFT JOIN (
959    SELECT DATE(created_at) as date, COUNT(*) as new_tokens
960    FROM tokens
961    GROUP BY DATE(created_at)
962) t ON t.date = d.date
963LEFT JOIN (
964    SELECT
965        DATE(created_at) as date,
966        COUNT(*) as trade_count,
967        SUM((total_btc * 100000000)::bigint) as volume_sats,
968        SUM((platform_fee * 100000000)::bigint) as fees_sats,
969        COUNT(DISTINCT buyer_id) + COUNT(DISTINCT seller_id) as active_users
970    FROM trades
971    GROUP BY DATE(created_at)
972) tr ON tr.date = d.date;
973
974CREATE UNIQUE INDEX IF NOT EXISTS mv_daily_stats_date_idx ON mv_daily_stats (date);
975"#;
976
977/// SQL to create user activity materialized view
978const USER_ACTIVITY_VIEW_SQL: &str = r#"
979CREATE MATERIALIZED VIEW IF NOT EXISTS mv_user_activity AS
980SELECT
981    u.user_id,
982    u.username,
983    COALESCE(t.trade_count, 0)::bigint as trade_count,
984    COALESCE(t.total_volume_btc, 0) as total_volume_btc,
985    COALESCE(b.tokens_held, 0)::bigint as tokens_held,
986    COALESCE(tk.tokens_issued, 0)::bigint as tokens_issued,
987    u.reputation_score,
988    GREATEST(t.last_trade, u.created_at) as last_activity
989FROM users u
990LEFT JOIN (
991    SELECT
992        user_id,
993        COUNT(*) as trade_count,
994        SUM(total_btc) as total_volume_btc,
995        MAX(created_at) as last_trade
996    FROM (
997        SELECT buyer_id as user_id, total_btc, created_at FROM trades
998        UNION ALL
999        SELECT seller_id as user_id, total_btc, created_at FROM trades
1000    ) all_trades
1001    GROUP BY user_id
1002) t ON t.user_id = u.user_id
1003LEFT JOIN (
1004    SELECT user_id, COUNT(DISTINCT token_id) as tokens_held
1005    FROM balances
1006    WHERE amount > 0
1007    GROUP BY user_id
1008) b ON b.user_id = u.user_id
1009LEFT JOIN (
1010    SELECT issuer_id as user_id, COUNT(*) as tokens_issued
1011    FROM tokens
1012    GROUP BY issuer_id
1013) tk ON tk.user_id = u.user_id;
1014
1015CREATE UNIQUE INDEX IF NOT EXISTS mv_user_activity_user_id_idx ON mv_user_activity (user_id);
1016CREATE INDEX IF NOT EXISTS mv_user_activity_volume_idx ON mv_user_activity (total_volume_btc DESC);
1017"#;
1018
1019/// Refresh job configuration
1020#[derive(Debug, Clone)]
1021pub struct RefreshConfig {
1022    /// Interval between refreshes in seconds
1023    pub refresh_interval_secs: u64,
1024    /// Whether to use concurrent refresh
1025    pub concurrent: bool,
1026}
1027
1028impl Default for RefreshConfig {
1029    fn default() -> Self {
1030        Self {
1031            refresh_interval_secs: 300, // 5 minutes
1032            concurrent: true,
1033        }
1034    }
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039    use super::*;
1040
1041    #[test]
1042    fn test_refresh_config_defaults() {
1043        let config = RefreshConfig::default();
1044        assert_eq!(config.refresh_interval_secs, 300);
1045        assert!(config.concurrent);
1046    }
1047
1048    #[test]
1049    fn test_price_history_creation() {
1050        let token_id = Uuid::new_v4();
1051        let price_history = PriceHistory {
1052            time: Utc::now(),
1053            token_id,
1054            price_satoshis: 100_000,
1055            supply: 1_000_000,
1056            market_cap_satoshis: 100_000_000_000,
1057        };
1058
1059        assert_eq!(price_history.token_id, token_id);
1060        assert_eq!(price_history.price_satoshis, 100_000);
1061        assert_eq!(price_history.supply, 1_000_000);
1062        assert_eq!(price_history.market_cap_satoshis, 100_000_000_000);
1063    }
1064
1065    #[test]
1066    fn test_volume_history_creation() {
1067        let token_id = Uuid::new_v4();
1068        let volume_history = VolumeHistory {
1069            time: Utc::now(),
1070            token_id,
1071            buy_volume_satoshis: 50_000_000,
1072            sell_volume_satoshis: 30_000_000,
1073            trade_count: 42,
1074            unique_traders: 15,
1075        };
1076
1077        assert_eq!(volume_history.token_id, token_id);
1078        assert_eq!(volume_history.buy_volume_satoshis, 50_000_000);
1079        assert_eq!(volume_history.sell_volume_satoshis, 30_000_000);
1080        assert_eq!(volume_history.trade_count, 42);
1081        assert_eq!(volume_history.unique_traders, 15);
1082    }
1083
1084    #[test]
1085    fn test_platform_volume_history_creation() {
1086        let platform_volume = PlatformVolumeHistory {
1087            time: Utc::now(),
1088            total_volume_satoshis: 1_000_000_000,
1089            trade_count: 1000,
1090            active_tokens: 50,
1091            active_traders: 200,
1092            fees_collected_satoshis: 5_000_000,
1093        };
1094
1095        assert_eq!(platform_volume.total_volume_satoshis, 1_000_000_000);
1096        assert_eq!(platform_volume.trade_count, 1000);
1097        assert_eq!(platform_volume.active_tokens, 50);
1098        assert_eq!(platform_volume.active_traders, 200);
1099        assert_eq!(platform_volume.fees_collected_satoshis, 5_000_000);
1100    }
1101
1102    #[test]
1103    fn test_ohlc_data_creation() {
1104        let token_id = Uuid::new_v4();
1105        let ohlc = OhlcData {
1106            bucket: Utc::now(),
1107            token_id,
1108            open_price: 95_000,
1109            high_price: 105_000,
1110            low_price: 90_000,
1111            close_price: 100_000,
1112            final_supply: 1_000_000,
1113            final_market_cap: 100_000_000_000,
1114        };
1115
1116        assert_eq!(ohlc.token_id, token_id);
1117        assert_eq!(ohlc.open_price, 95_000);
1118        assert_eq!(ohlc.high_price, 105_000);
1119        assert_eq!(ohlc.low_price, 90_000);
1120        assert_eq!(ohlc.close_price, 100_000);
1121        assert!(ohlc.high_price >= ohlc.open_price);
1122        assert!(ohlc.high_price >= ohlc.close_price);
1123        assert!(ohlc.low_price <= ohlc.open_price);
1124        assert!(ohlc.low_price <= ohlc.close_price);
1125    }
1126
1127    #[test]
1128    fn test_timescaledb_structures_are_serializable() {
1129        let token_id = Uuid::new_v4();
1130        let time = Utc::now();
1131
1132        let price = PriceHistory {
1133            time,
1134            token_id,
1135            price_satoshis: 100_000,
1136            supply: 1_000_000,
1137            market_cap_satoshis: 100_000_000_000,
1138        };
1139
1140        let volume = VolumeHistory {
1141            time,
1142            token_id,
1143            buy_volume_satoshis: 50_000_000,
1144            sell_volume_satoshis: 30_000_000,
1145            trade_count: 42,
1146            unique_traders: 15,
1147        };
1148
1149        let platform = PlatformVolumeHistory {
1150            time,
1151            total_volume_satoshis: 1_000_000_000,
1152            trade_count: 1000,
1153            active_tokens: 50,
1154            active_traders: 200,
1155            fees_collected_satoshis: 5_000_000,
1156        };
1157
1158        let ohlc = OhlcData {
1159            bucket: time,
1160            token_id,
1161            open_price: 95_000,
1162            high_price: 105_000,
1163            low_price: 90_000,
1164            close_price: 100_000,
1165            final_supply: 1_000_000,
1166            final_market_cap: 100_000_000_000,
1167        };
1168
1169        // Verify all can be serialized
1170        assert!(serde_json::to_string(&price).is_ok());
1171        assert!(serde_json::to_string(&volume).is_ok());
1172        assert!(serde_json::to_string(&platform).is_ok());
1173        assert!(serde_json::to_string(&ohlc).is_ok());
1174    }
1175
1176    #[test]
1177    fn test_price_history_market_cap_calculation() {
1178        let token_id = Uuid::new_v4();
1179        let price_satoshis = 100_000i64;
1180        let supply = 1_000_000i64;
1181        let expected_market_cap = price_satoshis.saturating_mul(supply);
1182
1183        let price_history = PriceHistory {
1184            time: Utc::now(),
1185            token_id,
1186            price_satoshis,
1187            supply,
1188            market_cap_satoshis: expected_market_cap,
1189        };
1190
1191        assert_eq!(price_history.market_cap_satoshis, 100_000_000_000);
1192        assert_eq!(
1193            price_history.market_cap_satoshis,
1194            price_history.price_satoshis * price_history.supply
1195        );
1196    }
1197}