1use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::{RwLock, broadcast};
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum ActivityType {
15 LargeTransaction,
17 RapidTransactions,
19 UnusualPattern,
21 VelocityExceeded,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SuspiciousActivity {
28 pub user_id: String,
30 pub activity_type: ActivityType,
32 pub description: String,
34 pub txids: Vec<String>,
36 pub amount_sats: u64,
38 pub detected_at: DateTime<Utc>,
40 pub risk_score: u8,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ActivityMonitorConfig {
47 pub large_tx_threshold_sats: u64,
49 pub rapid_tx_count: u32,
51 pub rapid_tx_window_secs: i64,
53 pub velocity_limit_sats_per_hour: u64,
55 pub min_risk_score: u8,
57}
58
59impl Default for ActivityMonitorConfig {
60 fn default() -> Self {
61 Self {
62 large_tx_threshold_sats: 10_000_000, rapid_tx_count: 5,
64 rapid_tx_window_secs: 300, velocity_limit_sats_per_hour: 50_000_000, min_risk_score: 50,
67 }
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct TransactionRecord {
74 pub txid: String,
76 pub user_id: String,
78 pub amount_sats: u64,
80 pub timestamp: DateTime<Utc>,
82}
83
84pub struct ActivityMonitor {
86 config: ActivityMonitorConfig,
87 user_transactions: Arc<RwLock<HashMap<String, VecDeque<TransactionRecord>>>>,
89 alert_tx: broadcast::Sender<SuspiciousActivity>,
91}
92
93impl ActivityMonitor {
94 pub fn new(config: ActivityMonitorConfig) -> Self {
96 let (alert_tx, _) = broadcast::channel(100);
97 Self {
98 config,
99 user_transactions: Arc::new(RwLock::new(HashMap::new())),
100 alert_tx,
101 }
102 }
103
104 pub fn subscribe(&self) -> broadcast::Receiver<SuspiciousActivity> {
106 self.alert_tx.subscribe()
107 }
108
109 pub async fn record_transaction(
111 &self,
112 record: TransactionRecord,
113 ) -> Option<SuspiciousActivity> {
114 let user_id = record.user_id.clone();
115
116 {
118 let mut transactions = self.user_transactions.write().await;
119 transactions
120 .entry(user_id.clone())
121 .or_insert_with(VecDeque::new)
122 .push_back(record.clone());
123 }
124
125 self.cleanup_old_records(&user_id).await;
127
128 let mut alerts = Vec::new();
130
131 if let Some(alert) = self.check_large_transaction(&record).await {
133 alerts.push(alert);
134 }
135
136 if let Some(alert) = self.check_rapid_transactions(&user_id).await {
138 alerts.push(alert);
139 }
140
141 if let Some(alert) = self.check_velocity(&user_id).await {
143 alerts.push(alert);
144 }
145
146 let mut highest_alert: Option<SuspiciousActivity> = None;
148 for alert in alerts {
149 if alert.risk_score >= self.config.min_risk_score {
150 let _ = self.alert_tx.send(alert.clone());
151 if highest_alert.is_none()
152 || alert.risk_score > highest_alert.as_ref().unwrap().risk_score
153 {
154 highest_alert = Some(alert);
155 }
156 }
157 }
158
159 highest_alert
160 }
161
162 async fn check_large_transaction(
164 &self,
165 record: &TransactionRecord,
166 ) -> Option<SuspiciousActivity> {
167 if record.amount_sats >= self.config.large_tx_threshold_sats {
168 let risk_score = self.calculate_large_tx_risk_score(record.amount_sats);
169 Some(SuspiciousActivity {
170 user_id: record.user_id.clone(),
171 activity_type: ActivityType::LargeTransaction,
172 description: format!("Large transaction detected: {} sats", record.amount_sats),
173 txids: vec![record.txid.clone()],
174 amount_sats: record.amount_sats,
175 detected_at: Utc::now(),
176 risk_score,
177 })
178 } else {
179 None
180 }
181 }
182
183 async fn check_rapid_transactions(&self, user_id: &str) -> Option<SuspiciousActivity> {
185 let transactions = self.user_transactions.read().await;
186 let user_txs = transactions.get(user_id)?;
187
188 let now = Utc::now();
189 let window_start = now - Duration::seconds(self.config.rapid_tx_window_secs);
190
191 let recent: Vec<_> = user_txs
192 .iter()
193 .filter(|tx| tx.timestamp >= window_start)
194 .collect();
195
196 if recent.len() as u32 >= self.config.rapid_tx_count {
197 let total_amount: u64 = recent.iter().map(|tx| tx.amount_sats).sum();
198 let txids: Vec<_> = recent.iter().map(|tx| tx.txid.clone()).collect();
199
200 let risk_score = self.calculate_rapid_tx_risk_score(recent.len());
201
202 Some(SuspiciousActivity {
203 user_id: user_id.to_string(),
204 activity_type: ActivityType::RapidTransactions,
205 description: format!(
206 "{} transactions in {} seconds",
207 recent.len(),
208 self.config.rapid_tx_window_secs
209 ),
210 txids,
211 amount_sats: total_amount,
212 detected_at: Utc::now(),
213 risk_score,
214 })
215 } else {
216 None
217 }
218 }
219
220 async fn check_velocity(&self, user_id: &str) -> Option<SuspiciousActivity> {
222 let transactions = self.user_transactions.read().await;
223 let user_txs = transactions.get(user_id)?;
224
225 let now = Utc::now();
226 let hour_ago = now - Duration::hours(1);
227
228 let recent: Vec<_> = user_txs
229 .iter()
230 .filter(|tx| tx.timestamp >= hour_ago)
231 .collect();
232
233 let total_amount: u64 = recent.iter().map(|tx| tx.amount_sats).sum();
234
235 if total_amount >= self.config.velocity_limit_sats_per_hour {
236 let txids: Vec<_> = recent.iter().map(|tx| tx.txid.clone()).collect();
237
238 let risk_score = self.calculate_velocity_risk_score(total_amount);
239
240 Some(SuspiciousActivity {
241 user_id: user_id.to_string(),
242 activity_type: ActivityType::VelocityExceeded,
243 description: format!("High velocity: {} sats in 1 hour", total_amount),
244 txids,
245 amount_sats: total_amount,
246 detected_at: Utc::now(),
247 risk_score,
248 })
249 } else {
250 None
251 }
252 }
253
254 fn calculate_large_tx_risk_score(&self, amount_sats: u64) -> u8 {
256 let threshold = self.config.large_tx_threshold_sats;
257 let ratio = amount_sats as f64 / threshold as f64;
258
259 let score = 50.0 + 25.0 * ratio.ln();
261 score.clamp(0.0, 100.0) as u8
262 }
263
264 fn calculate_rapid_tx_risk_score(&self, count: usize) -> u8 {
266 let threshold = self.config.rapid_tx_count as f64;
267 let ratio = count as f64 / threshold;
268
269 let score = 50.0 + 30.0 * (ratio - 1.0);
270 score.clamp(0.0, 100.0) as u8
271 }
272
273 fn calculate_velocity_risk_score(&self, amount_sats: u64) -> u8 {
275 let threshold = self.config.velocity_limit_sats_per_hour;
276 let ratio = amount_sats as f64 / threshold as f64;
277
278 let score = 50.0 + 30.0 * (ratio - 1.0);
279 score.clamp(0.0, 100.0) as u8
280 }
281
282 async fn cleanup_old_records(&self, user_id: &str) {
284 let mut transactions = self.user_transactions.write().await;
285 if let Some(user_txs) = transactions.get_mut(user_id) {
286 let now = Utc::now();
287 let cutoff = now - Duration::hours(24); while let Some(tx) = user_txs.front() {
290 if tx.timestamp < cutoff {
291 user_txs.pop_front();
292 } else {
293 break;
294 }
295 }
296 }
297 }
298
299 pub async fn get_user_history(&self, user_id: &str) -> Vec<TransactionRecord> {
301 let transactions = self.user_transactions.read().await;
302 transactions
303 .get(user_id)
304 .map(|txs| txs.iter().cloned().collect())
305 .unwrap_or_default()
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record for `user1` stamped with the current time.
    fn record_now(txid: &str, amount_sats: u64) -> TransactionRecord {
        TransactionRecord {
            txid: txid.to_string(),
            user_id: "user1".to_string(),
            amount_sats,
            timestamp: Utc::now(),
        }
    }

    /// The default configuration exposes the documented thresholds.
    #[test]
    fn test_activity_monitor_config_defaults() {
        let config = ActivityMonitorConfig::default();
        assert_eq!(config.large_tx_threshold_sats, 10_000_000);
        assert_eq!(config.rapid_tx_count, 5);
        assert_eq!(config.rapid_tx_window_secs, 300);
        assert_eq!(config.velocity_limit_sats_per_hour, 50_000_000);
        assert_eq!(config.min_risk_score, 50);
    }

    /// A single transaction above the threshold is reported as large.
    #[tokio::test]
    async fn test_large_transaction_detection() {
        let monitor = ActivityMonitor::new(ActivityMonitorConfig::default());

        let alert = monitor
            .record_transaction(record_now("tx1", 20_000_000))
            .await
            .expect("a transaction at twice the threshold should raise an alert");

        assert_eq!(alert.activity_type, ActivityType::LargeTransaction);
        assert_eq!(alert.txids, vec!["tx1".to_string()]);
        assert_eq!(alert.amount_sats, 20_000_000);
        assert!(alert.risk_score >= 50);
    }

    /// The fifth small transaction inside the window trips the rapid rule,
    /// while the first four stay silent.
    #[tokio::test]
    async fn test_rapid_transactions_detection() {
        let monitor = ActivityMonitor::new(ActivityMonitorConfig::default());

        let mut outcomes = Vec::new();
        for i in 0..5 {
            let record = record_now(&format!("tx{}", i), 1_000_000);
            outcomes.push(monitor.record_transaction(record).await);
        }

        assert!(outcomes[..4].iter().all(Option::is_none));

        let alert = outcomes
            .pop()
            .flatten()
            .expect("the fifth transaction within the window should raise an alert");
        assert_eq!(alert.activity_type, ActivityType::RapidTransactions);
        assert_eq!(alert.txids.len(), 5);
        assert_eq!(alert.amount_sats, 5_000_000);
        // Exactly at the configured count the linear score is 50.
        assert_eq!(alert.risk_score, 50);

        let history = monitor.get_user_history("user1").await;
        assert_eq!(history.len(), 5);
    }
}
360}