1use chrono::{DateTime, Utc};
9use serde::Serialize;
10use sqlx::PgPool;
11use uuid::Uuid;
12
13use crate::error::Result;
14
15#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
17pub struct DashboardMetrics {
18 pub total_users: i64,
20 pub new_users_24h: i64,
22 pub new_users_7d: i64,
24 pub total_tokens: i64,
26 pub new_tokens_24h: i64,
28 pub total_trades: i64,
30 pub trades_24h: i64,
32 pub total_volume_sats: i64,
34 pub volume_24h_sats: i64,
36 pub total_fees_sats: i64,
38 pub fees_24h_sats: i64,
40 pub pending_commitments: i64,
42 pub pending_kyc: i64,
44}
45
46#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
48pub struct TokenMetricsSummary {
49 pub token_id: uuid::Uuid,
51 pub symbol: String,
53 pub name: String,
55 pub total_supply: rust_decimal::Decimal,
57 pub holder_count: i64,
59 pub trade_count: i64,
61 pub trades_24h: i64,
63 pub total_volume_btc: rust_decimal::Decimal,
65 pub volume_24h_btc: rust_decimal::Decimal,
67 pub current_price_btc: rust_decimal::Decimal,
69 pub price_change_24h_pct: rust_decimal::Decimal,
71}
72
73#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
75pub struct UserActivitySummary {
76 pub user_id: uuid::Uuid,
78 pub username: String,
80 pub trade_count: i64,
82 pub total_volume_btc: rust_decimal::Decimal,
84 pub tokens_held: i64,
86 pub tokens_issued: i64,
88 pub reputation_score: i32,
90 pub last_activity: Option<chrono::DateTime<chrono::Utc>>,
92}
93
94#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
96pub struct DailyStats {
97 pub date: chrono::NaiveDate,
99 pub new_users: i64,
101 pub new_tokens: i64,
103 pub trade_count: i64,
105 pub volume_sats: i64,
107 pub fees_sats: i64,
109 pub active_users: i64,
111}
112
113#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
115pub struct PriceHistory {
116 pub time: DateTime<Utc>,
118 pub token_id: Uuid,
120 pub price_satoshis: i64,
122 pub supply: i64,
124 pub market_cap_satoshis: i64,
126}
127
128#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
130pub struct VolumeHistory {
131 pub time: DateTime<Utc>,
133 pub token_id: Uuid,
135 pub buy_volume_satoshis: i64,
137 pub sell_volume_satoshis: i64,
139 pub trade_count: i32,
141 pub unique_traders: i32,
143}
144
145#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
147pub struct PlatformVolumeHistory {
148 pub time: DateTime<Utc>,
150 pub total_volume_satoshis: i64,
152 pub trade_count: i32,
154 pub active_tokens: i32,
156 pub active_traders: i32,
158 pub fees_collected_satoshis: i64,
160}
161
162#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
164pub struct OhlcData {
165 pub bucket: DateTime<Utc>,
167 pub token_id: Uuid,
169 pub open_price: i64,
171 pub high_price: i64,
173 pub low_price: i64,
175 pub close_price: i64,
177 pub final_supply: i64,
179 pub final_market_cap: i64,
181}
182
183pub struct AnalyticsService {
185 pool: PgPool,
186}
187
188impl AnalyticsService {
189 pub fn new(pool: PgPool) -> Self {
191 Self { pool }
192 }
193
194 pub async fn create_materialized_views(&self) -> Result<()> {
196 sqlx::query(DASHBOARD_METRICS_VIEW_SQL)
198 .execute(&self.pool)
199 .await?;
200
201 sqlx::query(TOKEN_METRICS_VIEW_SQL)
203 .execute(&self.pool)
204 .await?;
205
206 sqlx::query(DAILY_STATS_VIEW_SQL)
208 .execute(&self.pool)
209 .await?;
210
211 sqlx::query(USER_ACTIVITY_VIEW_SQL)
213 .execute(&self.pool)
214 .await?;
215
216 tracing::info!("Created all materialized views for analytics");
217 Ok(())
218 }
219
220 pub async fn refresh_all_views(&self) -> Result<()> {
222 sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_dashboard_metrics")
223 .execute(&self.pool)
224 .await
225 .ok(); sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_token_metrics")
228 .execute(&self.pool)
229 .await
230 .ok();
231
232 sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_daily_stats")
233 .execute(&self.pool)
234 .await
235 .ok();
236
237 sqlx::query("REFRESH MATERIALIZED VIEW CONCURRENTLY IF EXISTS mv_user_activity")
238 .execute(&self.pool)
239 .await
240 .ok();
241
242 tracing::debug!("Refreshed all materialized views");
243 Ok(())
244 }
245
246 pub async fn get_dashboard_metrics(&self) -> Result<DashboardMetrics> {
248 let result =
250 sqlx::query_as::<_, DashboardMetrics>("SELECT * FROM mv_dashboard_metrics LIMIT 1")
251 .fetch_optional(&self.pool)
252 .await;
253
254 if let Ok(Some(metrics)) = result {
255 return Ok(metrics);
256 }
257
258 self.compute_dashboard_metrics().await
260 }
261
262 pub async fn compute_dashboard_metrics(&self) -> Result<DashboardMetrics> {
264 let metrics = sqlx::query_as::<_, DashboardMetrics>(
265 r#"
266 SELECT
267 (SELECT COUNT(*) FROM users) as total_users,
268 (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '24 hours') as new_users_24h,
269 (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '7 days') as new_users_7d,
270 (SELECT COUNT(*) FROM tokens WHERE status = 'active') as total_tokens,
271 (SELECT COUNT(*) FROM tokens WHERE created_at > NOW() - INTERVAL '24 hours') as new_tokens_24h,
272 (SELECT COUNT(*) FROM trades) as total_trades,
273 (SELECT COUNT(*) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
274 COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades), 0) as total_volume_sats,
275 COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as volume_24h_sats,
276 COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades), 0) as total_fees_sats,
277 COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as fees_24h_sats,
278 (SELECT COUNT(*) FROM output_commitments WHERE status = 'pending') as pending_commitments,
279 (SELECT COUNT(*) FROM kyc_applications WHERE status = 'pending') as pending_kyc
280 "#,
281 )
282 .fetch_one(&self.pool)
283 .await?;
284
285 Ok(metrics)
286 }
287
288 pub async fn get_top_tokens(&self, limit: i64) -> Result<Vec<TokenMetricsSummary>> {
290 let result = sqlx::query_as::<_, TokenMetricsSummary>(
292 r#"
293 SELECT * FROM mv_token_metrics
294 ORDER BY volume_24h_btc DESC
295 LIMIT $1
296 "#,
297 )
298 .bind(limit)
299 .fetch_all(&self.pool)
300 .await;
301
302 if let Ok(tokens) = result {
303 if !tokens.is_empty() {
304 return Ok(tokens);
305 }
306 }
307
308 self.compute_top_tokens(limit).await
310 }
311
312 async fn compute_top_tokens(&self, limit: i64) -> Result<Vec<TokenMetricsSummary>> {
314 let tokens = sqlx::query_as::<_, TokenMetricsSummary>(
315 r#"
316 SELECT
317 t.token_id,
318 t.symbol,
319 t.name,
320 t.total_supply,
321 COALESCE(h.holder_count, 0) as holder_count,
322 COALESCE(tr.trade_count, 0) as trade_count,
323 COALESCE(tr.trades_24h, 0) as trades_24h,
324 COALESCE(tr.total_volume_btc, 0) as total_volume_btc,
325 COALESCE(tr.volume_24h_btc, 0) as volume_24h_btc,
326 COALESCE(tr.last_price, t.base_price) as current_price_btc,
327 COALESCE(
328 CASE WHEN tr.price_24h_ago > 0
329 THEN ((tr.last_price - tr.price_24h_ago) / tr.price_24h_ago * 100)
330 ELSE 0
331 END,
332 0
333 ) as price_change_24h_pct
334 FROM tokens t
335 LEFT JOIN (
336 SELECT token_id, COUNT(DISTINCT user_id) as holder_count
337 FROM balances
338 WHERE amount > 0
339 GROUP BY token_id
340 ) h ON h.token_id = t.token_id
341 LEFT JOIN (
342 SELECT
343 token_id,
344 COUNT(*) as trade_count,
345 COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
346 SUM(total_btc) as total_volume_btc,
347 SUM(total_btc) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as volume_24h_btc,
348 (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id ORDER BY created_at DESC LIMIT 1) as last_price,
349 (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id AND tr2.created_at < NOW() - INTERVAL '24 hours' ORDER BY created_at DESC LIMIT 1) as price_24h_ago
350 FROM trades
351 GROUP BY token_id
352 ) tr ON tr.token_id = t.token_id
353 WHERE t.status = 'active'
354 ORDER BY COALESCE(tr.volume_24h_btc, 0) DESC
355 LIMIT $1
356 "#,
357 )
358 .bind(limit)
359 .fetch_all(&self.pool)
360 .await?;
361
362 Ok(tokens)
363 }
364
365 pub async fn get_daily_stats(
367 &self,
368 start_date: chrono::NaiveDate,
369 end_date: chrono::NaiveDate,
370 ) -> Result<Vec<DailyStats>> {
371 let result = sqlx::query_as::<_, DailyStats>(
373 r#"
374 SELECT * FROM mv_daily_stats
375 WHERE date >= $1 AND date <= $2
376 ORDER BY date DESC
377 "#,
378 )
379 .bind(start_date)
380 .bind(end_date)
381 .fetch_all(&self.pool)
382 .await;
383
384 if let Ok(stats) = result {
385 if !stats.is_empty() {
386 return Ok(stats);
387 }
388 }
389
390 self.compute_daily_stats(start_date, end_date).await
392 }
393
394 async fn compute_daily_stats(
396 &self,
397 start_date: chrono::NaiveDate,
398 end_date: chrono::NaiveDate,
399 ) -> Result<Vec<DailyStats>> {
400 let stats = sqlx::query_as::<_, DailyStats>(
401 r#"
402 WITH dates AS (
403 SELECT generate_series($1::date, $2::date, '1 day'::interval)::date as date
404 )
405 SELECT
406 d.date,
407 COALESCE(u.new_users, 0) as new_users,
408 COALESCE(t.new_tokens, 0) as new_tokens,
409 COALESCE(tr.trade_count, 0) as trade_count,
410 COALESCE(tr.volume_sats, 0) as volume_sats,
411 COALESCE(tr.fees_sats, 0) as fees_sats,
412 COALESCE(tr.active_users, 0) as active_users
413 FROM dates d
414 LEFT JOIN (
415 SELECT DATE(created_at) as date, COUNT(*) as new_users
416 FROM users
417 WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
418 GROUP BY DATE(created_at)
419 ) u ON u.date = d.date
420 LEFT JOIN (
421 SELECT DATE(created_at) as date, COUNT(*) as new_tokens
422 FROM tokens
423 WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
424 GROUP BY DATE(created_at)
425 ) t ON t.date = d.date
426 LEFT JOIN (
427 SELECT
428 DATE(created_at) as date,
429 COUNT(*) as trade_count,
430 SUM((total_btc * 100000000)::bigint) as volume_sats,
431 SUM((platform_fee * 100000000)::bigint) as fees_sats,
432 COUNT(DISTINCT buyer_id) + COUNT(DISTINCT seller_id) as active_users
433 FROM trades
434 WHERE DATE(created_at) >= $1 AND DATE(created_at) <= $2
435 GROUP BY DATE(created_at)
436 ) tr ON tr.date = d.date
437 ORDER BY d.date DESC
438 "#,
439 )
440 .bind(start_date)
441 .bind(end_date)
442 .fetch_all(&self.pool)
443 .await?;
444
445 Ok(stats)
446 }
447
448 pub async fn get_top_users(&self, limit: i64) -> Result<Vec<UserActivitySummary>> {
450 let users = sqlx::query_as::<_, UserActivitySummary>(
451 r#"
452 SELECT
453 u.user_id,
454 u.username,
455 COALESCE(t.trade_count, 0) as trade_count,
456 COALESCE(t.total_volume_btc, 0) as total_volume_btc,
457 COALESCE(b.tokens_held, 0) as tokens_held,
458 COALESCE(tk.tokens_issued, 0) as tokens_issued,
459 u.reputation_score,
460 GREATEST(t.last_trade, u.created_at) as last_activity
461 FROM users u
462 LEFT JOIN (
463 SELECT
464 user_id,
465 COUNT(*) as trade_count,
466 SUM(total_btc) as total_volume_btc,
467 MAX(created_at) as last_trade
468 FROM (
469 SELECT buyer_id as user_id, total_btc, created_at FROM trades
470 UNION ALL
471 SELECT seller_id as user_id, total_btc, created_at FROM trades
472 ) all_trades
473 GROUP BY user_id
474 ) t ON t.user_id = u.user_id
475 LEFT JOIN (
476 SELECT user_id, COUNT(DISTINCT token_id) as tokens_held
477 FROM balances
478 WHERE amount > 0
479 GROUP BY user_id
480 ) b ON b.user_id = u.user_id
481 LEFT JOIN (
482 SELECT issuer_id as user_id, COUNT(*) as tokens_issued
483 FROM tokens
484 GROUP BY issuer_id
485 ) tk ON tk.user_id = u.user_id
486 ORDER BY COALESCE(t.total_volume_btc, 0) DESC
487 LIMIT $1
488 "#,
489 )
490 .bind(limit)
491 .fetch_all(&self.pool)
492 .await?;
493
494 Ok(users)
495 }
496
497 pub async fn drop_materialized_views(&self) -> Result<()> {
499 sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_dashboard_metrics CASCADE")
500 .execute(&self.pool)
501 .await?;
502 sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_token_metrics CASCADE")
503 .execute(&self.pool)
504 .await?;
505 sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_daily_stats CASCADE")
506 .execute(&self.pool)
507 .await?;
508 sqlx::query("DROP MATERIALIZED VIEW IF EXISTS mv_user_activity CASCADE")
509 .execute(&self.pool)
510 .await?;
511
512 tracing::info!("Dropped all materialized views");
513 Ok(())
514 }
515
516 pub async fn record_price_history(
525 &self,
526 token_id: Uuid,
527 price_satoshis: i64,
528 supply: i64,
529 ) -> Result<()> {
530 let market_cap_satoshis = price_satoshis.saturating_mul(supply);
531
532 sqlx::query(
533 r#"
534 INSERT INTO price_history (time, token_id, price_satoshis, supply, market_cap_satoshis)
535 VALUES (NOW(), $1, $2, $3, $4)
536 ON CONFLICT (time, token_id) DO UPDATE
537 SET price_satoshis = EXCLUDED.price_satoshis,
538 supply = EXCLUDED.supply,
539 market_cap_satoshis = EXCLUDED.market_cap_satoshis
540 "#,
541 )
542 .bind(token_id)
543 .bind(price_satoshis)
544 .bind(supply)
545 .bind(market_cap_satoshis)
546 .execute(&self.pool)
547 .await?;
548
549 Ok(())
550 }
551
552 pub async fn record_volume_history(
557 &self,
558 token_id: Uuid,
559 buy_volume_satoshis: i64,
560 sell_volume_satoshis: i64,
561 trade_count: i32,
562 unique_traders: i32,
563 ) -> Result<()> {
564 sqlx::query(
565 r#"
566 INSERT INTO volume_history
567 (time, token_id, buy_volume_satoshis, sell_volume_satoshis, trade_count, unique_traders)
568 VALUES (date_trunc('hour', NOW()), $1, $2, $3, $4, $5)
569 ON CONFLICT (time, token_id) DO UPDATE
570 SET buy_volume_satoshis = volume_history.buy_volume_satoshis + EXCLUDED.buy_volume_satoshis,
571 sell_volume_satoshis = volume_history.sell_volume_satoshis + EXCLUDED.sell_volume_satoshis,
572 trade_count = volume_history.trade_count + EXCLUDED.trade_count,
573 unique_traders = GREATEST(volume_history.unique_traders, EXCLUDED.unique_traders)
574 "#,
575 )
576 .bind(token_id)
577 .bind(buy_volume_satoshis)
578 .bind(sell_volume_satoshis)
579 .bind(trade_count)
580 .bind(unique_traders)
581 .execute(&self.pool)
582 .await?;
583
584 Ok(())
585 }
586
587 pub async fn record_platform_volume(
589 &self,
590 total_volume_satoshis: i64,
591 trade_count: i32,
592 active_tokens: i32,
593 active_traders: i32,
594 fees_collected_satoshis: i64,
595 ) -> Result<()> {
596 sqlx::query(
597 r#"
598 INSERT INTO platform_volume_history
599 (time, total_volume_satoshis, trade_count, active_tokens, active_traders, fees_collected_satoshis)
600 VALUES (date_trunc('hour', NOW()), $1, $2, $3, $4, $5)
601 ON CONFLICT (time) DO UPDATE
602 SET total_volume_satoshis = platform_volume_history.total_volume_satoshis + EXCLUDED.total_volume_satoshis,
603 trade_count = platform_volume_history.trade_count + EXCLUDED.trade_count,
604 active_tokens = GREATEST(platform_volume_history.active_tokens, EXCLUDED.active_tokens),
605 active_traders = GREATEST(platform_volume_history.active_traders, EXCLUDED.active_traders),
606 fees_collected_satoshis = platform_volume_history.fees_collected_satoshis + EXCLUDED.fees_collected_satoshis
607 "#,
608 )
609 .bind(total_volume_satoshis)
610 .bind(trade_count)
611 .bind(active_tokens)
612 .bind(active_traders)
613 .bind(fees_collected_satoshis)
614 .execute(&self.pool)
615 .await?;
616
617 Ok(())
618 }
619
620 pub async fn get_price_history(
622 &self,
623 token_id: Uuid,
624 start_time: DateTime<Utc>,
625 end_time: DateTime<Utc>,
626 ) -> Result<Vec<PriceHistory>> {
627 let history = sqlx::query_as::<_, PriceHistory>(
628 r#"
629 SELECT time, token_id, price_satoshis, supply, market_cap_satoshis
630 FROM price_history
631 WHERE token_id = $1 AND time >= $2 AND time <= $3
632 ORDER BY time ASC
633 "#,
634 )
635 .bind(token_id)
636 .bind(start_time)
637 .bind(end_time)
638 .fetch_all(&self.pool)
639 .await?;
640
641 Ok(history)
642 }
643
644 pub async fn get_volume_history(
646 &self,
647 token_id: Uuid,
648 start_time: DateTime<Utc>,
649 end_time: DateTime<Utc>,
650 ) -> Result<Vec<VolumeHistory>> {
651 let history = sqlx::query_as::<_, VolumeHistory>(
652 r#"
653 SELECT time, token_id, buy_volume_satoshis, sell_volume_satoshis, trade_count, unique_traders
654 FROM volume_history
655 WHERE token_id = $1 AND time >= $2 AND time <= $3
656 ORDER BY time ASC
657 "#,
658 )
659 .bind(token_id)
660 .bind(start_time)
661 .bind(end_time)
662 .fetch_all(&self.pool)
663 .await?;
664
665 Ok(history)
666 }
667
668 pub async fn get_ohlc_data(
673 &self,
674 token_id: Uuid,
675 start_time: DateTime<Utc>,
676 end_time: DateTime<Utc>,
677 bucket_interval: &str, ) -> Result<Vec<OhlcData>> {
679 if bucket_interval == "1 hour" {
681 if let Ok(data) = self
682 .get_ohlc_from_aggregate(token_id, start_time, end_time)
683 .await
684 {
685 if !data.is_empty() {
686 return Ok(data);
687 }
688 }
689 }
690
691 self.compute_ohlc_data(token_id, start_time, end_time, bucket_interval)
693 .await
694 }
695
696 async fn get_ohlc_from_aggregate(
698 &self,
699 token_id: Uuid,
700 start_time: DateTime<Utc>,
701 end_time: DateTime<Utc>,
702 ) -> Result<Vec<OhlcData>> {
703 let data = sqlx::query_as::<_, OhlcData>(
704 r#"
705 SELECT bucket, token_id, open_price, high_price, low_price, close_price,
706 final_supply, final_market_cap
707 FROM price_history_hourly
708 WHERE token_id = $1 AND bucket >= $2 AND bucket <= $3
709 ORDER BY bucket ASC
710 "#,
711 )
712 .bind(token_id)
713 .bind(start_time)
714 .bind(end_time)
715 .fetch_all(&self.pool)
716 .await?;
717
718 Ok(data)
719 }
720
721 async fn compute_ohlc_data(
723 &self,
724 token_id: Uuid,
725 start_time: DateTime<Utc>,
726 end_time: DateTime<Utc>,
727 bucket_interval: &str,
728 ) -> Result<Vec<OhlcData>> {
729 let data = sqlx::query_as::<_, OhlcData>(
730 r#"
731 SELECT
732 time_bucket($1::interval, time) AS bucket,
733 token_id,
734 first(price_satoshis, time) AS open_price,
735 max(price_satoshis) AS high_price,
736 min(price_satoshis) AS low_price,
737 last(price_satoshis, time) AS close_price,
738 last(supply, time) AS final_supply,
739 last(market_cap_satoshis, time) AS final_market_cap
740 FROM price_history
741 WHERE token_id = $2 AND time >= $3 AND time <= $4
742 GROUP BY bucket, token_id
743 ORDER BY bucket ASC
744 "#,
745 )
746 .bind(bucket_interval)
747 .bind(token_id)
748 .bind(start_time)
749 .bind(end_time)
750 .fetch_all(&self.pool)
751 .await?;
752
753 Ok(data)
754 }
755
756 pub async fn get_aggregated_volume(
758 &self,
759 token_id: Option<Uuid>,
760 start_time: DateTime<Utc>,
761 end_time: DateTime<Utc>,
762 bucket_interval: &str, ) -> Result<Vec<VolumeHistory>> {
764 let volume = if let Some(tid) = token_id {
765 sqlx::query_as::<_, VolumeHistory>(
767 r#"
768 SELECT
769 time_bucket($1::interval, time) AS time,
770 token_id,
771 sum(buy_volume_satoshis) AS buy_volume_satoshis,
772 sum(sell_volume_satoshis) AS sell_volume_satoshis,
773 sum(trade_count)::int AS trade_count,
774 max(unique_traders)::int AS unique_traders
775 FROM volume_history
776 WHERE token_id = $2 AND time >= $3 AND time <= $4
777 GROUP BY time_bucket($1::interval, time), token_id
778 ORDER BY time ASC
779 "#,
780 )
781 .bind(bucket_interval)
782 .bind(tid)
783 .bind(start_time)
784 .bind(end_time)
785 .fetch_all(&self.pool)
786 .await?
787 } else {
788 sqlx::query_as::<_, VolumeHistory>(
790 r#"
791 SELECT
792 time_bucket($1::interval, time) AS time,
793 '00000000-0000-0000-0000-000000000000'::uuid AS token_id,
794 sum(buy_volume_satoshis) AS buy_volume_satoshis,
795 sum(sell_volume_satoshis) AS sell_volume_satoshis,
796 sum(trade_count)::int AS trade_count,
797 sum(unique_traders)::int AS unique_traders
798 FROM volume_history
799 WHERE time >= $2 AND time <= $3
800 GROUP BY time_bucket($1::interval, time)
801 ORDER BY time ASC
802 "#,
803 )
804 .bind(bucket_interval)
805 .bind(start_time)
806 .bind(end_time)
807 .fetch_all(&self.pool)
808 .await?
809 };
810
811 Ok(volume)
812 }
813
814 pub async fn get_platform_volume_history(
816 &self,
817 start_time: DateTime<Utc>,
818 end_time: DateTime<Utc>,
819 ) -> Result<Vec<PlatformVolumeHistory>> {
820 let history = sqlx::query_as::<_, PlatformVolumeHistory>(
821 r#"
822 SELECT time, total_volume_satoshis, trade_count, active_tokens,
823 active_traders, fees_collected_satoshis
824 FROM platform_volume_history
825 WHERE time >= $1 AND time <= $2
826 ORDER BY time ASC
827 "#,
828 )
829 .bind(start_time)
830 .bind(end_time)
831 .fetch_all(&self.pool)
832 .await?;
833
834 Ok(history)
835 }
836
837 pub async fn is_timescaledb_available(&self) -> bool {
839 sqlx::query_scalar::<_, bool>(
840 "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')",
841 )
842 .fetch_one(&self.pool)
843 .await
844 .unwrap_or(false)
845 }
846
847 pub async fn get_hypertable_info(&self, table_name: &str) -> Result<Option<String>> {
849 let info = sqlx::query_scalar::<_, String>(
850 r#"
851 SELECT format('Hypertable: %s, Chunks: %s, Compression: %s',
852 hypertable_name,
853 num_chunks,
854 compression_enabled)
855 FROM timescaledb_information.hypertables
856 WHERE hypertable_name = $1
857 "#,
858 )
859 .bind(table_name)
860 .fetch_optional(&self.pool)
861 .await?;
862
863 Ok(info)
864 }
865}
866
867const DASHBOARD_METRICS_VIEW_SQL: &str = r#"
869CREATE MATERIALIZED VIEW IF NOT EXISTS mv_dashboard_metrics AS
870SELECT
871 (SELECT COUNT(*) FROM users) as total_users,
872 (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '24 hours') as new_users_24h,
873 (SELECT COUNT(*) FROM users WHERE created_at > NOW() - INTERVAL '7 days') as new_users_7d,
874 (SELECT COUNT(*) FROM tokens WHERE status = 'active') as total_tokens,
875 (SELECT COUNT(*) FROM tokens WHERE created_at > NOW() - INTERVAL '24 hours') as new_tokens_24h,
876 (SELECT COUNT(*) FROM trades) as total_trades,
877 (SELECT COUNT(*) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
878 COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades), 0) as total_volume_sats,
879 COALESCE((SELECT SUM((total_btc * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as volume_24h_sats,
880 COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades), 0) as total_fees_sats,
881 COALESCE((SELECT SUM((platform_fee * 100000000)::bigint) FROM trades WHERE created_at > NOW() - INTERVAL '24 hours'), 0) as fees_24h_sats,
882 (SELECT COUNT(*) FROM output_commitments WHERE status = 'pending') as pending_commitments,
883 COALESCE((SELECT COUNT(*) FROM kyc_applications WHERE status = 'pending'), 0) as pending_kyc;
884
885CREATE UNIQUE INDEX IF NOT EXISTS mv_dashboard_metrics_idx ON mv_dashboard_metrics ((1));
886"#;
887
888const TOKEN_METRICS_VIEW_SQL: &str = r#"
890CREATE MATERIALIZED VIEW IF NOT EXISTS mv_token_metrics AS
891SELECT
892 t.token_id,
893 t.symbol,
894 t.name,
895 t.total_supply,
896 COALESCE(h.holder_count, 0) as holder_count,
897 COALESCE(tr.trade_count, 0) as trade_count,
898 COALESCE(tr.trades_24h, 0) as trades_24h,
899 COALESCE(tr.total_volume_btc, 0) as total_volume_btc,
900 COALESCE(tr.volume_24h_btc, 0) as volume_24h_btc,
901 COALESCE(tr.last_price, t.base_price) as current_price_btc,
902 COALESCE(
903 CASE WHEN tr.price_24h_ago > 0
904 THEN ((tr.last_price - tr.price_24h_ago) / tr.price_24h_ago * 100)
905 ELSE 0
906 END,
907 0
908 ) as price_change_24h_pct
909FROM tokens t
910LEFT JOIN (
911 SELECT token_id, COUNT(DISTINCT user_id) as holder_count
912 FROM balances
913 WHERE amount > 0
914 GROUP BY token_id
915) h ON h.token_id = t.token_id
916LEFT JOIN (
917 SELECT
918 token_id,
919 COUNT(*) as trade_count,
920 COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as trades_24h,
921 SUM(total_btc) as total_volume_btc,
922 SUM(total_btc) FILTER (WHERE created_at > NOW() - INTERVAL '24 hours') as volume_24h_btc,
923 (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id ORDER BY created_at DESC LIMIT 1) as last_price,
924 (SELECT price_btc FROM trades tr2 WHERE tr2.token_id = trades.token_id AND tr2.created_at < NOW() - INTERVAL '24 hours' ORDER BY created_at DESC LIMIT 1) as price_24h_ago
925 FROM trades
926 GROUP BY token_id
927) tr ON tr.token_id = t.token_id
928WHERE t.status = 'active';
929
930CREATE UNIQUE INDEX IF NOT EXISTS mv_token_metrics_token_id_idx ON mv_token_metrics (token_id);
931CREATE INDEX IF NOT EXISTS mv_token_metrics_volume_idx ON mv_token_metrics (volume_24h_btc DESC);
932"#;
933
934const DAILY_STATS_VIEW_SQL: &str = r#"
936CREATE MATERIALIZED VIEW IF NOT EXISTS mv_daily_stats AS
937WITH dates AS (
938 SELECT generate_series(
939 (SELECT COALESCE(MIN(DATE(created_at)), CURRENT_DATE - INTERVAL '30 days') FROM users),
940 CURRENT_DATE,
941 '1 day'::interval
942 )::date as date
943)
944SELECT
945 d.date,
946 COALESCE(u.new_users, 0)::bigint as new_users,
947 COALESCE(t.new_tokens, 0)::bigint as new_tokens,
948 COALESCE(tr.trade_count, 0)::bigint as trade_count,
949 COALESCE(tr.volume_sats, 0)::bigint as volume_sats,
950 COALESCE(tr.fees_sats, 0)::bigint as fees_sats,
951 COALESCE(tr.active_users, 0)::bigint as active_users
952FROM dates d
953LEFT JOIN (
954 SELECT DATE(created_at) as date, COUNT(*) as new_users
955 FROM users
956 GROUP BY DATE(created_at)
957) u ON u.date = d.date
958LEFT JOIN (
959 SELECT DATE(created_at) as date, COUNT(*) as new_tokens
960 FROM tokens
961 GROUP BY DATE(created_at)
962) t ON t.date = d.date
963LEFT JOIN (
964 SELECT
965 DATE(created_at) as date,
966 COUNT(*) as trade_count,
967 SUM((total_btc * 100000000)::bigint) as volume_sats,
968 SUM((platform_fee * 100000000)::bigint) as fees_sats,
969 COUNT(DISTINCT buyer_id) + COUNT(DISTINCT seller_id) as active_users
970 FROM trades
971 GROUP BY DATE(created_at)
972) tr ON tr.date = d.date;
973
974CREATE UNIQUE INDEX IF NOT EXISTS mv_daily_stats_date_idx ON mv_daily_stats (date);
975"#;
976
977const USER_ACTIVITY_VIEW_SQL: &str = r#"
979CREATE MATERIALIZED VIEW IF NOT EXISTS mv_user_activity AS
980SELECT
981 u.user_id,
982 u.username,
983 COALESCE(t.trade_count, 0)::bigint as trade_count,
984 COALESCE(t.total_volume_btc, 0) as total_volume_btc,
985 COALESCE(b.tokens_held, 0)::bigint as tokens_held,
986 COALESCE(tk.tokens_issued, 0)::bigint as tokens_issued,
987 u.reputation_score,
988 GREATEST(t.last_trade, u.created_at) as last_activity
989FROM users u
990LEFT JOIN (
991 SELECT
992 user_id,
993 COUNT(*) as trade_count,
994 SUM(total_btc) as total_volume_btc,
995 MAX(created_at) as last_trade
996 FROM (
997 SELECT buyer_id as user_id, total_btc, created_at FROM trades
998 UNION ALL
999 SELECT seller_id as user_id, total_btc, created_at FROM trades
1000 ) all_trades
1001 GROUP BY user_id
1002) t ON t.user_id = u.user_id
1003LEFT JOIN (
1004 SELECT user_id, COUNT(DISTINCT token_id) as tokens_held
1005 FROM balances
1006 WHERE amount > 0
1007 GROUP BY user_id
1008) b ON b.user_id = u.user_id
1009LEFT JOIN (
1010 SELECT issuer_id as user_id, COUNT(*) as tokens_issued
1011 FROM tokens
1012 GROUP BY issuer_id
1013) tk ON tk.user_id = u.user_id;
1014
1015CREATE UNIQUE INDEX IF NOT EXISTS mv_user_activity_user_id_idx ON mv_user_activity (user_id);
1016CREATE INDEX IF NOT EXISTS mv_user_activity_volume_idx ON mv_user_activity (total_volume_btc DESC);
1017"#;
1018
1019#[derive(Debug, Clone)]
1021pub struct RefreshConfig {
1022 pub refresh_interval_secs: u64,
1024 pub concurrent: bool,
1026}
1027
1028impl Default for RefreshConfig {
1029 fn default() -> Self {
1030 Self {
1031 refresh_interval_secs: 300, concurrent: true,
1033 }
1034 }
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039 use super::*;
1040
1041 #[test]
1042 fn test_refresh_config_defaults() {
1043 let config = RefreshConfig::default();
1044 assert_eq!(config.refresh_interval_secs, 300);
1045 assert!(config.concurrent);
1046 }
1047
1048 #[test]
1049 fn test_price_history_creation() {
1050 let token_id = Uuid::new_v4();
1051 let price_history = PriceHistory {
1052 time: Utc::now(),
1053 token_id,
1054 price_satoshis: 100_000,
1055 supply: 1_000_000,
1056 market_cap_satoshis: 100_000_000_000,
1057 };
1058
1059 assert_eq!(price_history.token_id, token_id);
1060 assert_eq!(price_history.price_satoshis, 100_000);
1061 assert_eq!(price_history.supply, 1_000_000);
1062 assert_eq!(price_history.market_cap_satoshis, 100_000_000_000);
1063 }
1064
1065 #[test]
1066 fn test_volume_history_creation() {
1067 let token_id = Uuid::new_v4();
1068 let volume_history = VolumeHistory {
1069 time: Utc::now(),
1070 token_id,
1071 buy_volume_satoshis: 50_000_000,
1072 sell_volume_satoshis: 30_000_000,
1073 trade_count: 42,
1074 unique_traders: 15,
1075 };
1076
1077 assert_eq!(volume_history.token_id, token_id);
1078 assert_eq!(volume_history.buy_volume_satoshis, 50_000_000);
1079 assert_eq!(volume_history.sell_volume_satoshis, 30_000_000);
1080 assert_eq!(volume_history.trade_count, 42);
1081 assert_eq!(volume_history.unique_traders, 15);
1082 }
1083
1084 #[test]
1085 fn test_platform_volume_history_creation() {
1086 let platform_volume = PlatformVolumeHistory {
1087 time: Utc::now(),
1088 total_volume_satoshis: 1_000_000_000,
1089 trade_count: 1000,
1090 active_tokens: 50,
1091 active_traders: 200,
1092 fees_collected_satoshis: 5_000_000,
1093 };
1094
1095 assert_eq!(platform_volume.total_volume_satoshis, 1_000_000_000);
1096 assert_eq!(platform_volume.trade_count, 1000);
1097 assert_eq!(platform_volume.active_tokens, 50);
1098 assert_eq!(platform_volume.active_traders, 200);
1099 assert_eq!(platform_volume.fees_collected_satoshis, 5_000_000);
1100 }
1101
1102 #[test]
1103 fn test_ohlc_data_creation() {
1104 let token_id = Uuid::new_v4();
1105 let ohlc = OhlcData {
1106 bucket: Utc::now(),
1107 token_id,
1108 open_price: 95_000,
1109 high_price: 105_000,
1110 low_price: 90_000,
1111 close_price: 100_000,
1112 final_supply: 1_000_000,
1113 final_market_cap: 100_000_000_000,
1114 };
1115
1116 assert_eq!(ohlc.token_id, token_id);
1117 assert_eq!(ohlc.open_price, 95_000);
1118 assert_eq!(ohlc.high_price, 105_000);
1119 assert_eq!(ohlc.low_price, 90_000);
1120 assert_eq!(ohlc.close_price, 100_000);
1121 assert!(ohlc.high_price >= ohlc.open_price);
1122 assert!(ohlc.high_price >= ohlc.close_price);
1123 assert!(ohlc.low_price <= ohlc.open_price);
1124 assert!(ohlc.low_price <= ohlc.close_price);
1125 }
1126
1127 #[test]
1128 fn test_timescaledb_structures_are_serializable() {
1129 let token_id = Uuid::new_v4();
1130 let time = Utc::now();
1131
1132 let price = PriceHistory {
1133 time,
1134 token_id,
1135 price_satoshis: 100_000,
1136 supply: 1_000_000,
1137 market_cap_satoshis: 100_000_000_000,
1138 };
1139
1140 let volume = VolumeHistory {
1141 time,
1142 token_id,
1143 buy_volume_satoshis: 50_000_000,
1144 sell_volume_satoshis: 30_000_000,
1145 trade_count: 42,
1146 unique_traders: 15,
1147 };
1148
1149 let platform = PlatformVolumeHistory {
1150 time,
1151 total_volume_satoshis: 1_000_000_000,
1152 trade_count: 1000,
1153 active_tokens: 50,
1154 active_traders: 200,
1155 fees_collected_satoshis: 5_000_000,
1156 };
1157
1158 let ohlc = OhlcData {
1159 bucket: time,
1160 token_id,
1161 open_price: 95_000,
1162 high_price: 105_000,
1163 low_price: 90_000,
1164 close_price: 100_000,
1165 final_supply: 1_000_000,
1166 final_market_cap: 100_000_000_000,
1167 };
1168
1169 assert!(serde_json::to_string(&price).is_ok());
1171 assert!(serde_json::to_string(&volume).is_ok());
1172 assert!(serde_json::to_string(&platform).is_ok());
1173 assert!(serde_json::to_string(&ohlc).is_ok());
1174 }
1175
1176 #[test]
1177 fn test_price_history_market_cap_calculation() {
1178 let token_id = Uuid::new_v4();
1179 let price_satoshis = 100_000i64;
1180 let supply = 1_000_000i64;
1181 let expected_market_cap = price_satoshis.saturating_mul(supply);
1182
1183 let price_history = PriceHistory {
1184 time: Utc::now(),
1185 token_id,
1186 price_satoshis,
1187 supply,
1188 market_cap_satoshis: expected_market_cap,
1189 };
1190
1191 assert_eq!(price_history.market_cap_satoshis, 100_000_000_000);
1192 assert_eq!(
1193 price_history.market_cap_satoshis,
1194 price_history.price_satoshis * price_history.supply
1195 );
1196 }
1197}