trojan_analytics/
collector.rs1use std::net::{IpAddr, SocketAddr};
4use std::sync::Arc;
5use std::time::Instant;
6
7use rand::Rng;
8use tokio::sync::mpsc;
9use tracing::debug;
10use trojan_config::{AnalyticsConfig, AnalyticsPrivacyConfig};
11
12use crate::event::{AuthResult, CloseReason, ConnectionEvent, Protocol, TargetType, Transport};
13
14#[derive(Clone)]
19pub struct EventCollector {
20 sender: mpsc::Sender<ConnectionEvent>,
21 config: Arc<AnalyticsConfig>,
22}
23
24impl EventCollector {
25 pub(crate) fn new(sender: mpsc::Sender<ConnectionEvent>, config: Arc<AnalyticsConfig>) -> Self {
27 Self { sender, config }
28 }
29
30 #[inline]
34 pub fn record(&self, event: ConnectionEvent) -> bool {
35 self.sender.try_send(event).is_ok()
36 }
37
38 pub fn connection(&self, conn_id: u64, peer: SocketAddr) -> ConnectionEventBuilder {
42 ConnectionEventBuilder::new(self.clone(), conn_id, peer, &self.config)
43 }
44
45 pub fn should_sample(&self, user_id: Option<&str>) -> bool {
49 let sampling = &self.config.sampling;
50
51 if let Some(uid) = user_id
53 && sampling.always_record_users.iter().any(|u| u == uid)
54 {
55 return true;
56 }
57
58 if sampling.rate >= 1.0 {
60 return true;
61 }
62 if sampling.rate <= 0.0 {
63 return false;
64 }
65
66 rand::thread_rng().r#gen::<f64>() < sampling.rate
67 }
68
69 pub fn privacy(&self) -> &AnalyticsPrivacyConfig {
71 &self.config.privacy
72 }
73
74 pub fn server_id(&self) -> Option<&str> {
76 self.config.server_id.as_deref()
77 }
78}
79
80pub struct ConnectionEventBuilder {
85 collector: EventCollector,
86 event: ConnectionEvent,
87 start_time: Instant,
88 sent: bool,
89}
90
91impl ConnectionEventBuilder {
92 fn new(
94 collector: EventCollector,
95 conn_id: u64,
96 peer: SocketAddr,
97 config: &AnalyticsConfig,
98 ) -> Self {
99 let peer_ip = if config.privacy.record_peer_ip {
100 peer.ip()
101 } else {
102 match peer {
104 SocketAddr::V4(_) => IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
105 SocketAddr::V6(_) => IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
106 }
107 };
108
109 let mut event = ConnectionEvent::new(conn_id, peer_ip, peer.port());
110 event.server_id = config.server_id.clone().unwrap_or_default();
111
112 Self {
113 collector,
114 event,
115 start_time: Instant::now(),
116 sent: false,
117 }
118 }
119
120 pub fn user(mut self, user_id: impl Into<String>) -> Self {
122 let uid = user_id.into();
123 let privacy = self.collector.privacy();
124
125 self.event.user_id = if privacy.full_user_id {
126 uid
127 } else {
128 let len = privacy.user_id_prefix_len.min(uid.len());
130 uid[..len].to_string()
131 };
132 self.event.auth_result = AuthResult::Success;
133 self
134 }
135
136 pub fn auth_failed(mut self) -> Self {
138 self.event.auth_result = AuthResult::Failed;
139 self
140 }
141
142 pub fn target(mut self, host: impl Into<String>, port: u16, target_type: TargetType) -> Self {
144 self.event.target_host = host.into();
145 self.event.target_port = port;
146 self.event.target_type = target_type;
147 self
148 }
149
150 pub fn sni(mut self, sni: impl Into<String>) -> Self {
152 if self.collector.privacy().record_sni {
153 self.event.sni = sni.into();
154 }
155 self
156 }
157
158 pub fn protocol(mut self, protocol: Protocol) -> Self {
160 self.event.protocol = protocol;
161 self
162 }
163
164 pub fn transport(mut self, transport: Transport) -> Self {
166 self.event.transport = transport;
167 self
168 }
169
170 pub fn fallback(mut self) -> Self {
172 self.event.is_fallback = true;
173 self.event.auth_result = AuthResult::Skipped;
174 self
175 }
176
177 #[inline]
179 pub fn add_bytes(&mut self, sent: u64, recv: u64) {
180 self.event.bytes_sent += sent;
181 self.event.bytes_recv += recv;
182 }
183
184 #[inline]
186 pub fn add_packets(&mut self, sent: u64, recv: u64) {
187 self.event.packets_sent += sent;
188 self.event.packets_recv += recv;
189 }
190
191 pub fn event_mut(&mut self) -> &mut ConnectionEvent {
193 &mut self.event
194 }
195
196 pub fn finish(mut self, close_reason: CloseReason) {
198 self.event.duration_ms = self.start_time.elapsed().as_millis() as u64;
199 self.event.close_reason = close_reason;
200 self.send();
201 }
202
203 fn send(&mut self) {
205 if self.sent {
206 return;
207 }
208 self.sent = true;
209
210 if !self.collector.record(self.event.clone()) {
211 debug!(
212 conn_id = self.event.conn_id,
213 "analytics buffer full, event dropped"
214 );
215 }
216 }
217}
218
219impl Drop for ConnectionEventBuilder {
220 fn drop(&mut self) {
221 if !self.sent {
222 self.event.duration_ms = self.start_time.elapsed().as_millis() as u64;
223 self.send();
224 }
225 }
226}