kaccy_bitcoin/
activity_monitor.rs

1//! Suspicious Activity Detection Module
2//!
3//! This module monitors for suspicious transaction patterns including
4//! large transactions and rapid succession of transactions.
5
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{RwLock, broadcast};
11
12/// Suspicious activity type
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum ActivityType {
15    /// Large single transaction
16    LargeTransaction,
17    /// Rapid succession of transactions
18    RapidTransactions,
19    /// Unusual pattern (e.g., round-number amounts)
20    UnusualPattern,
21    /// Velocity limit exceeded (too much volume too fast)
22    VelocityExceeded,
23}
24
25/// Suspicious activity alert
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SuspiciousActivity {
28    /// User ID
29    pub user_id: String,
30    /// Activity type
31    pub activity_type: ActivityType,
32    /// Description
33    pub description: String,
34    /// Transaction IDs involved
35    pub txids: Vec<String>,
36    /// Total amount involved (in satoshis)
37    pub amount_sats: u64,
38    /// When detected
39    pub detected_at: DateTime<Utc>,
40    /// Risk score (0-100)
41    pub risk_score: u8,
42}
43
44/// Activity monitoring configuration
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ActivityMonitorConfig {
47    /// Large transaction threshold (in satoshis)
48    pub large_tx_threshold_sats: u64,
49    /// Rapid transaction count threshold
50    pub rapid_tx_count: u32,
51    /// Rapid transaction time window (seconds)
52    pub rapid_tx_window_secs: i64,
53    /// Velocity limit (satoshis per hour)
54    pub velocity_limit_sats_per_hour: u64,
55    /// Minimum risk score to trigger alert
56    pub min_risk_score: u8,
57}
58
59impl Default for ActivityMonitorConfig {
60    fn default() -> Self {
61        Self {
62            large_tx_threshold_sats: 10_000_000, // 0.1 BTC
63            rapid_tx_count: 5,
64            rapid_tx_window_secs: 300,                // 5 minutes
65            velocity_limit_sats_per_hour: 50_000_000, // 0.5 BTC per hour
66            min_risk_score: 50,
67        }
68    }
69}
70
71/// Transaction record for monitoring
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct TransactionRecord {
74    /// Transaction ID
75    pub txid: String,
76    /// User ID
77    pub user_id: String,
78    /// Amount in satoshis
79    pub amount_sats: u64,
80    /// Timestamp
81    pub timestamp: DateTime<Utc>,
82}
83
84/// Activity monitor
85pub struct ActivityMonitor {
86    config: ActivityMonitorConfig,
87    /// Recent transactions by user
88    user_transactions: Arc<RwLock<HashMap<String, VecDeque<TransactionRecord>>>>,
89    /// Alert broadcaster
90    alert_tx: broadcast::Sender<SuspiciousActivity>,
91}
92
93impl ActivityMonitor {
94    /// Create a new activity monitor
95    pub fn new(config: ActivityMonitorConfig) -> Self {
96        let (alert_tx, _) = broadcast::channel(100);
97        Self {
98            config,
99            user_transactions: Arc::new(RwLock::new(HashMap::new())),
100            alert_tx,
101        }
102    }
103
104    /// Subscribe to alerts
105    pub fn subscribe(&self) -> broadcast::Receiver<SuspiciousActivity> {
106        self.alert_tx.subscribe()
107    }
108
109    /// Record a transaction and check for suspicious activity
110    pub async fn record_transaction(
111        &self,
112        record: TransactionRecord,
113    ) -> Option<SuspiciousActivity> {
114        let user_id = record.user_id.clone();
115
116        // Add to user transactions
117        {
118            let mut transactions = self.user_transactions.write().await;
119            transactions
120                .entry(user_id.clone())
121                .or_insert_with(VecDeque::new)
122                .push_back(record.clone());
123        }
124
125        // Cleanup old records
126        self.cleanup_old_records(&user_id).await;
127
128        // Check for suspicious activity
129        let mut alerts = Vec::new();
130
131        // Check for large transaction
132        if let Some(alert) = self.check_large_transaction(&record).await {
133            alerts.push(alert);
134        }
135
136        // Check for rapid transactions
137        if let Some(alert) = self.check_rapid_transactions(&user_id).await {
138            alerts.push(alert);
139        }
140
141        // Check velocity
142        if let Some(alert) = self.check_velocity(&user_id).await {
143            alerts.push(alert);
144        }
145
146        // Send alerts (highest risk score first)
147        let mut highest_alert: Option<SuspiciousActivity> = None;
148        for alert in alerts {
149            if alert.risk_score >= self.config.min_risk_score {
150                let _ = self.alert_tx.send(alert.clone());
151                if highest_alert.is_none()
152                    || alert.risk_score > highest_alert.as_ref().unwrap().risk_score
153                {
154                    highest_alert = Some(alert);
155                }
156            }
157        }
158
159        highest_alert
160    }
161
162    /// Check for large transaction
163    async fn check_large_transaction(
164        &self,
165        record: &TransactionRecord,
166    ) -> Option<SuspiciousActivity> {
167        if record.amount_sats >= self.config.large_tx_threshold_sats {
168            let risk_score = self.calculate_large_tx_risk_score(record.amount_sats);
169            Some(SuspiciousActivity {
170                user_id: record.user_id.clone(),
171                activity_type: ActivityType::LargeTransaction,
172                description: format!("Large transaction detected: {} sats", record.amount_sats),
173                txids: vec![record.txid.clone()],
174                amount_sats: record.amount_sats,
175                detected_at: Utc::now(),
176                risk_score,
177            })
178        } else {
179            None
180        }
181    }
182
183    /// Check for rapid transactions
184    async fn check_rapid_transactions(&self, user_id: &str) -> Option<SuspiciousActivity> {
185        let transactions = self.user_transactions.read().await;
186        let user_txs = transactions.get(user_id)?;
187
188        let now = Utc::now();
189        let window_start = now - Duration::seconds(self.config.rapid_tx_window_secs);
190
191        let recent: Vec<_> = user_txs
192            .iter()
193            .filter(|tx| tx.timestamp >= window_start)
194            .collect();
195
196        if recent.len() as u32 >= self.config.rapid_tx_count {
197            let total_amount: u64 = recent.iter().map(|tx| tx.amount_sats).sum();
198            let txids: Vec<_> = recent.iter().map(|tx| tx.txid.clone()).collect();
199
200            let risk_score = self.calculate_rapid_tx_risk_score(recent.len());
201
202            Some(SuspiciousActivity {
203                user_id: user_id.to_string(),
204                activity_type: ActivityType::RapidTransactions,
205                description: format!(
206                    "{} transactions in {} seconds",
207                    recent.len(),
208                    self.config.rapid_tx_window_secs
209                ),
210                txids,
211                amount_sats: total_amount,
212                detected_at: Utc::now(),
213                risk_score,
214            })
215        } else {
216            None
217        }
218    }
219
220    /// Check velocity (amount per hour)
221    async fn check_velocity(&self, user_id: &str) -> Option<SuspiciousActivity> {
222        let transactions = self.user_transactions.read().await;
223        let user_txs = transactions.get(user_id)?;
224
225        let now = Utc::now();
226        let hour_ago = now - Duration::hours(1);
227
228        let recent: Vec<_> = user_txs
229            .iter()
230            .filter(|tx| tx.timestamp >= hour_ago)
231            .collect();
232
233        let total_amount: u64 = recent.iter().map(|tx| tx.amount_sats).sum();
234
235        if total_amount >= self.config.velocity_limit_sats_per_hour {
236            let txids: Vec<_> = recent.iter().map(|tx| tx.txid.clone()).collect();
237
238            let risk_score = self.calculate_velocity_risk_score(total_amount);
239
240            Some(SuspiciousActivity {
241                user_id: user_id.to_string(),
242                activity_type: ActivityType::VelocityExceeded,
243                description: format!("High velocity: {} sats in 1 hour", total_amount),
244                txids,
245                amount_sats: total_amount,
246                detected_at: Utc::now(),
247                risk_score,
248            })
249        } else {
250            None
251        }
252    }
253
254    /// Calculate risk score for large transaction (0-100)
255    fn calculate_large_tx_risk_score(&self, amount_sats: u64) -> u8 {
256        let threshold = self.config.large_tx_threshold_sats;
257        let ratio = amount_sats as f64 / threshold as f64;
258
259        // Risk increases logarithmically with amount
260        let score = 50.0 + 25.0 * ratio.ln();
261        score.clamp(0.0, 100.0) as u8
262    }
263
264    /// Calculate risk score for rapid transactions (0-100)
265    fn calculate_rapid_tx_risk_score(&self, count: usize) -> u8 {
266        let threshold = self.config.rapid_tx_count as f64;
267        let ratio = count as f64 / threshold;
268
269        let score = 50.0 + 30.0 * (ratio - 1.0);
270        score.clamp(0.0, 100.0) as u8
271    }
272
273    /// Calculate risk score for velocity (0-100)
274    fn calculate_velocity_risk_score(&self, amount_sats: u64) -> u8 {
275        let threshold = self.config.velocity_limit_sats_per_hour;
276        let ratio = amount_sats as f64 / threshold as f64;
277
278        let score = 50.0 + 30.0 * (ratio - 1.0);
279        score.clamp(0.0, 100.0) as u8
280    }
281
282    /// Cleanup old transaction records
283    async fn cleanup_old_records(&self, user_id: &str) {
284        let mut transactions = self.user_transactions.write().await;
285        if let Some(user_txs) = transactions.get_mut(user_id) {
286            let now = Utc::now();
287            let cutoff = now - Duration::hours(24); // Keep 24 hours of history
288
289            while let Some(tx) = user_txs.front() {
290                if tx.timestamp < cutoff {
291                    user_txs.pop_front();
292                } else {
293                    break;
294                }
295            }
296        }
297    }
298
299    /// Get user transaction history
300    pub async fn get_user_history(&self, user_id: &str) -> Vec<TransactionRecord> {
301        let transactions = self.user_transactions.read().await;
302        transactions
303            .get(user_id)
304            .map(|txs| txs.iter().cloned().collect())
305            .unwrap_or_default()
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record for `user1` timestamped with the current time.
    fn record_for_user1(txid: &str, amount_sats: u64) -> TransactionRecord {
        TransactionRecord {
            txid: txid.to_string(),
            user_id: "user1".to_string(),
            amount_sats,
            timestamp: Utc::now(),
        }
    }

    #[test]
    fn test_activity_monitor_config_defaults() {
        let config = ActivityMonitorConfig::default();
        assert_eq!(config.large_tx_threshold_sats, 10_000_000);
        assert_eq!(config.rapid_tx_count, 5);
        assert_eq!(config.rapid_tx_window_secs, 300);
        assert_eq!(config.velocity_limit_sats_per_hour, 50_000_000);
        assert_eq!(config.min_risk_score, 50);
    }

    #[tokio::test]
    async fn test_large_transaction_detection() {
        let monitor = ActivityMonitor::new(ActivityMonitorConfig::default());

        // 20M sats is above the default 10M threshold.
        let alert = monitor
            .record_transaction(record_for_user1("tx1", 20_000_000))
            .await
            .expect("a transaction above the threshold must raise an alert");

        assert_eq!(alert.activity_type, ActivityType::LargeTransaction);
        assert_eq!(alert.user_id, "user1");
        assert_eq!(alert.txids, vec!["tx1".to_string()]);
        assert_eq!(alert.amount_sats, 20_000_000);
        assert!(alert.risk_score >= 50);
    }

    #[tokio::test]
    async fn test_rapid_transactions_detection() {
        let monitor = ActivityMonitor::new(ActivityMonitorConfig::default());

        // The first four small transactions stay below every threshold.
        for i in 0..4 {
            let alert = monitor
                .record_transaction(record_for_user1(&format!("tx{}", i), 1_000_000))
                .await;
            assert!(alert.is_none());
        }

        // The 5th transaction reaches `rapid_tx_count` and triggers the alert.
        let alert = monitor
            .record_transaction(record_for_user1("tx4", 1_000_000))
            .await
            .expect("the 5th rapid transaction must raise an alert");
        assert_eq!(alert.activity_type, ActivityType::RapidTransactions);
        assert_eq!(alert.txids.len(), 5);
        assert_eq!(alert.amount_sats, 5_000_000);

        let history = monitor.get_user_history("user1").await;
        assert_eq!(history.len(), 5);
    }

    #[tokio::test]
    async fn test_velocity_detection() {
        // Disable the other two checks so only velocity can fire.
        let config = ActivityMonitorConfig {
            large_tx_threshold_sats: u64::MAX,
            rapid_tx_count: 100,
            ..ActivityMonitorConfig::default()
        };
        let monitor = ActivityMonitor::new(config);

        let first = monitor
            .record_transaction(record_for_user1("tx1", 25_000_000))
            .await;
        assert!(first.is_none());

        // 25M + 75M = 100M sats within the hour, above the 50M limit.
        let alert = monitor
            .record_transaction(record_for_user1("tx2", 75_000_000))
            .await
            .expect("exceeding the hourly limit must raise an alert");
        assert_eq!(alert.activity_type, ActivityType::VelocityExceeded);
        assert_eq!(alert.txids.len(), 2);
        assert_eq!(alert.amount_sats, 100_000_000);
        assert!(alert.risk_score >= 50);
    }

    #[tokio::test]
    async fn test_alert_is_broadcast_to_subscribers() {
        let monitor = ActivityMonitor::new(ActivityMonitorConfig::default());
        let mut alerts = monitor.subscribe();

        monitor
            .record_transaction(record_for_user1("tx1", 20_000_000))
            .await;

        let received = alerts
            .try_recv()
            .expect("the alert must be delivered to subscribers");
        assert_eq!(received.activity_type, ActivityType::LargeTransaction);
        assert_eq!(received.user_id, "user1");
    }

    #[tokio::test]
    async fn test_alerts_below_min_risk_score_are_suppressed() {
        let config = ActivityMonitorConfig {
            min_risk_score: 90,
            ..ActivityMonitorConfig::default()
        };
        let monitor = ActivityMonitor::new(config);
        let mut alerts = monitor.subscribe();

        // A 20M sat transaction scores well below 90, so nothing is reported.
        let alert = monitor
            .record_transaction(record_for_user1("tx1", 20_000_000))
            .await;
        assert!(alert.is_none());
        assert!(alerts.try_recv().is_err());
    }
}
360}