1use std::sync::Arc;
10
11use axum::Json;
12use axum::extract::State;
13use axum::http::HeaderMap;
14use axum::response::IntoResponse;
15use forge_core::AuthContext;
16use forge_core::signals::{
17 DiagnosticReport, IdentifyPayload, PageViewPayload, SignalEvent, SignalEventBatch,
18 SignalEventType, SignalResponse, UtmParams,
19};
20use serde_json::Value;
21use sqlx::PgPool;
22use tracing::warn;
23use uuid::Uuid;
24
25use super::bot;
26use super::collector::SignalsCollector;
27use super::device;
28use super::session;
29use super::visitor;
30
/// Largest number of items accepted in one request body. `event_handler` and
/// `report_handler` answer with `ok: false` when a batch exceeds this count.
const MAX_BATCH_SIZE: usize = 50;
33
/// Shared state handed to every signals endpoint through axum's `State` extractor.
#[derive(Clone)]
pub struct SignalsState {
    /// Sink the handlers push finished `SignalEvent`s into via `try_send`.
    pub collector: SignalsCollector,
    /// Postgres pool passed to the `session` helpers for session and user upserts.
    pub pool: PgPool,
    /// Secret fed into `visitor::generate_visitor_id` together with the client IP
    /// and user agent.
    pub server_secret: String,
    /// When `true`, the client IP is left out of sessions and events. It is still
    /// passed to visitor-id generation before being discarded.
    pub anonymize_ip: bool,
}
43
44pub async fn event_handler(
46 State(state): State<Arc<SignalsState>>,
47 auth: Option<axum::Extension<AuthContext>>,
48 headers: HeaderMap,
49 Json(batch): Json<SignalEventBatch>,
50) -> impl IntoResponse {
51 if batch.events.len() > MAX_BATCH_SIZE {
52 return Json(SignalResponse {
53 ok: false,
54 session_id: None,
55 });
56 }
57
58 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
59 let session_id =
60 resolve_session_id(batch.context.as_ref().and_then(|c| c.session_id.as_deref()));
61 let page_url = batch.context.as_ref().and_then(|c| c.page_url.clone());
62
63 let session_id = session::upsert_session(
64 &state.pool,
65 session_id,
66 &ctx.visitor_id,
67 ctx.user_id,
68 ctx.tenant_id,
69 page_url.as_deref(),
70 batch.context.as_ref().and_then(|c| c.referrer.as_deref()),
71 ctx.user_agent.as_deref(),
72 ctx.client_ip.as_deref(),
73 ctx.is_bot,
74 "track",
75 ctx.device_type.as_deref(),
76 ctx.browser.as_deref(),
77 ctx.os.as_deref(),
78 )
79 .await;
80
81 for event in batch.events {
82 let signal = SignalEvent {
83 event_type: SignalEventType::Track,
84 event_name: Some(event.event),
85 correlation_id: event.correlation_id,
86 session_id,
87 visitor_id: Some(ctx.visitor_id.clone()),
88 user_id: ctx.user_id,
89 tenant_id: ctx.tenant_id,
90 properties: event.properties,
91 page_url: page_url.clone(),
92 referrer: None,
93 function_name: None,
94 function_kind: None,
95 duration_ms: None,
96 status: None,
97 error_message: None,
98 error_stack: None,
99 error_context: None,
100 client_ip: ctx.client_ip.clone(),
101 user_agent: ctx.user_agent.clone(),
102 device_type: ctx.device_type.clone(),
103 browser: ctx.browser.clone(),
104 os: ctx.os.clone(),
105 utm: None,
106 is_bot: ctx.is_bot,
107 timestamp: event.timestamp.unwrap_or_else(chrono::Utc::now),
108 };
109 state.collector.try_send(signal);
110 }
111
112 Json(SignalResponse {
113 ok: true,
114 session_id,
115 })
116}
117
118pub async fn view_handler(
120 State(state): State<Arc<SignalsState>>,
121 auth: Option<axum::Extension<AuthContext>>,
122 headers: HeaderMap,
123 Json(payload): Json<PageViewPayload>,
124) -> impl IntoResponse {
125 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
126 let session_id_header = extract_header(&headers, "x-session-id");
127 let session_id = resolve_session_id(session_id_header.as_deref());
128
129 let session_id = session::upsert_session(
130 &state.pool,
131 session_id,
132 &ctx.visitor_id,
133 ctx.user_id,
134 ctx.tenant_id,
135 Some(&payload.url),
136 payload.referrer.as_deref(),
137 ctx.user_agent.as_deref(),
138 ctx.client_ip.as_deref(),
139 ctx.is_bot,
140 "page_view",
141 ctx.device_type.as_deref(),
142 ctx.browser.as_deref(),
143 ctx.os.as_deref(),
144 )
145 .await;
146
147 let utm = if payload.utm_source.is_some()
148 || payload.utm_medium.is_some()
149 || payload.utm_campaign.is_some()
150 {
151 Some(UtmParams {
152 source: payload.utm_source,
153 medium: payload.utm_medium,
154 campaign: payload.utm_campaign,
155 term: payload.utm_term,
156 content: payload.utm_content,
157 })
158 } else {
159 None
160 };
161
162 let signal = SignalEvent {
163 event_type: SignalEventType::PageView,
164 event_name: payload.title,
165 correlation_id: payload.correlation_id,
166 session_id,
167 visitor_id: Some(ctx.visitor_id),
168 user_id: ctx.user_id,
169 tenant_id: ctx.tenant_id,
170 properties: Value::Object(serde_json::Map::new()),
171 page_url: Some(payload.url),
172 referrer: payload.referrer,
173 function_name: None,
174 function_kind: None,
175 duration_ms: None,
176 status: None,
177 error_message: None,
178 error_stack: None,
179 error_context: None,
180 client_ip: ctx.client_ip,
181 user_agent: ctx.user_agent,
182 device_type: ctx.device_type,
183 browser: ctx.browser,
184 os: ctx.os,
185 utm,
186 is_bot: ctx.is_bot,
187 timestamp: chrono::Utc::now(),
188 };
189 state.collector.try_send(signal);
190
191 Json(SignalResponse {
192 ok: true,
193 session_id,
194 })
195}
196
197pub async fn user_handler(
199 State(state): State<Arc<SignalsState>>,
200 auth: Option<axum::Extension<AuthContext>>,
201 headers: HeaderMap,
202 Json(payload): Json<IdentifyPayload>,
203) -> impl IntoResponse {
204 let user_id = Uuid::parse_str(&payload.user_id).ok().or_else(|| {
205 warn!(raw_id = %payload.user_id, "identify called with non-UUID user_id, ignoring");
206 None
207 });
208
209 let Some(user_id) = user_id else {
210 return Json(SignalResponse {
211 ok: false,
212 session_id: None,
213 });
214 };
215
216 let session_id_header = extract_header(&headers, "x-session-id");
217 let session_id = resolve_session_id(session_id_header.as_deref());
218
219 if let Some(sid) = session_id {
220 session::identify_session(&state.pool, sid, user_id).await;
221 }
222
223 let referrer: Option<&str> = None;
224 session::upsert_user(
225 &state.pool,
226 user_id,
227 &payload.traits,
228 referrer,
229 None,
230 None,
231 None,
232 )
233 .await;
234
235 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
236
237 let signal = SignalEvent {
238 event_type: SignalEventType::Identify,
239 event_name: None,
240 correlation_id: None,
241 session_id,
242 visitor_id: Some(ctx.visitor_id),
243 user_id: Some(user_id),
244 tenant_id: ctx.tenant_id,
245 properties: payload.traits,
246 page_url: None,
247 referrer: None,
248 function_name: None,
249 function_kind: None,
250 duration_ms: None,
251 status: None,
252 error_message: None,
253 error_stack: None,
254 error_context: None,
255 client_ip: ctx.client_ip,
256 user_agent: ctx.user_agent,
257 device_type: ctx.device_type,
258 browser: ctx.browser,
259 os: ctx.os,
260 utm: None,
261 is_bot: ctx.is_bot,
262 timestamp: chrono::Utc::now(),
263 };
264 state.collector.try_send(signal);
265
266 Json(SignalResponse {
267 ok: true,
268 session_id,
269 })
270}
271
272pub async fn report_handler(
274 State(state): State<Arc<SignalsState>>,
275 auth: Option<axum::Extension<AuthContext>>,
276 headers: HeaderMap,
277 Json(report): Json<DiagnosticReport>,
278) -> impl IntoResponse {
279 if report.errors.len() > MAX_BATCH_SIZE {
280 return Json(SignalResponse {
281 ok: false,
282 session_id: None,
283 });
284 }
285
286 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
287 let session_id_header = extract_header(&headers, "x-session-id");
288 let session_id = resolve_session_id(session_id_header.as_deref());
289
290 if let Some(sid) = session_id {
291 session::upsert_session(
292 &state.pool,
293 Some(sid),
294 &ctx.visitor_id,
295 ctx.user_id,
296 ctx.tenant_id,
297 None,
298 None,
299 ctx.user_agent.as_deref(),
300 ctx.client_ip.as_deref(),
301 ctx.is_bot,
302 "error",
303 ctx.device_type.as_deref(),
304 ctx.browser.as_deref(),
305 ctx.os.as_deref(),
306 )
307 .await;
308 }
309
310 for err in report.errors {
311 let signal = SignalEvent {
312 event_type: SignalEventType::Error,
313 event_name: Some(err.message.clone()),
314 correlation_id: err.correlation_id,
315 session_id,
316 visitor_id: Some(ctx.visitor_id.clone()),
317 user_id: ctx.user_id,
318 tenant_id: ctx.tenant_id,
319 properties: Value::Object(serde_json::Map::new()),
320 page_url: err.page_url,
321 referrer: None,
322 function_name: None,
323 function_kind: None,
324 duration_ms: None,
325 status: None,
326 error_message: Some(err.message),
327 error_stack: err.stack,
328 error_context: err.context,
329 client_ip: ctx.client_ip.clone(),
330 user_agent: ctx.user_agent.clone(),
331 device_type: ctx.device_type.clone(),
332 browser: ctx.browser.clone(),
333 os: ctx.os.clone(),
334 utm: None,
335 is_bot: ctx.is_bot,
336 timestamp: chrono::Utc::now(),
337 };
338 state.collector.try_send(signal);
339 }
340
341 Json(SignalResponse {
342 ok: true,
343 session_id,
344 })
345}
346
/// Per-request facts that `extract_request_ctx` derives from the headers and the
/// optional auth extension; every handler in this file builds its signals from it.
struct RequestCtx {
    /// Value of the `user-agent` header, if one was extracted.
    user_agent: Option<String>,
    /// Client IP from the gateway helper; always `None` when IP anonymization is on.
    client_ip: Option<String>,
    /// Result of `bot::is_bot_lower` on the lowercased user agent.
    is_bot: bool,
    /// Identifier produced by `visitor::generate_visitor_id` from the IP, the user
    /// agent and the server secret.
    visitor_id: String,
    /// User id reported by the `AuthContext`, when one is attached.
    user_id: Option<Uuid>,
    /// Tenant id reported by the `AuthContext`, when one is attached.
    tenant_id: Option<Uuid>,
    /// Device type from `device::parse_lowered`.
    device_type: Option<String>,
    /// Browser from `device::parse_lowered`.
    browser: Option<String>,
    /// Operating system from `device::parse_lowered`.
    os: Option<String>,
}
359
360fn extract_request_ctx(
361 headers: &HeaderMap,
362 auth: &Option<axum::Extension<AuthContext>>,
363 server_secret: &str,
364 anonymize_ip: bool,
365) -> RequestCtx {
366 let user_agent = extract_header(headers, "user-agent");
367 let platform_header = extract_header(headers, "x-forge-platform");
368 let raw_ip = extract_client_ip(headers);
369 let ua_lower = user_agent
370 .as_deref()
371 .unwrap_or_default()
372 .to_ascii_lowercase();
373 let is_bot = bot::is_bot_lower(&ua_lower);
374 let visitor_id =
375 visitor::generate_visitor_id(raw_ip.as_deref(), user_agent.as_deref(), server_secret);
376 let user_id = auth.as_ref().and_then(|a| a.user_id());
377 let tenant_id = auth.as_ref().and_then(|a| a.tenant_id());
378 let device_info = device::parse_lowered(platform_header.as_deref(), &ua_lower);
379 let client_ip = if anonymize_ip { None } else { raw_ip };
382 RequestCtx {
383 user_agent,
384 client_ip,
385 is_bot,
386 visitor_id,
387 user_id,
388 tenant_id,
389 device_type: device_info.device_type,
390 browser: device_info.browser,
391 os: device_info.os,
392 }
393}
394
395fn extract_header(headers: &HeaderMap, name: &str) -> Option<String> {
396 crate::gateway::extract_header(headers, name)
397}
398
399fn extract_client_ip(headers: &HeaderMap) -> Option<String> {
400 crate::gateway::extract_client_ip(headers)
401}
402
403fn resolve_session_id(raw: Option<&str>) -> Option<Uuid> {
404 raw.and_then(|s| Uuid::parse_str(s).ok())
405}
406
#[cfg(test)]
// Unwrapping is fine in tests: a panic is exactly the failure signal we want.
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for the header and session-id helpers.
    //!
    //! The helpers are synchronous, so these are plain `#[test]` functions; no
    //! async runtime is started for them.

    use axum::http::{HeaderMap, HeaderValue};
    use uuid::Uuid;

    use super::{extract_client_ip, extract_header, resolve_session_id};

    /// Builds a header map containing exactly one `name: value` pair.
    fn headers_with(name: &'static str, value: &'static str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(name, HeaderValue::from_static(value));
        headers
    }

    #[test]
    fn extract_header_returns_value() {
        let headers = headers_with("x-custom", "hello");
        assert_eq!(extract_header(&headers, "x-custom"), Some("hello".into()));
    }

    #[test]
    fn extract_header_returns_none_for_missing() {
        let headers = HeaderMap::new();
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_header_returns_none_for_empty_value() {
        let headers = headers_with("x-custom", "");
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_single() {
        let headers = headers_with("x-forwarded-for", "1.2.3.4");
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_multiple() {
        // Only the first hop in the chain is reported.
        let headers = headers_with("x-forwarded-for", "1.2.3.4, 5.6.7.8");
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_falls_back_to_real_ip() {
        let headers = headers_with("x-real-ip", "9.8.7.6");
        assert_eq!(extract_client_ip(&headers), Some("9.8.7.6".into()));
    }

    #[test]
    fn extract_client_ip_returns_none_when_no_headers() {
        let headers = HeaderMap::new();
        assert_eq!(extract_client_ip(&headers), None);
    }

    #[test]
    fn resolve_session_id_parses_valid_uuid() {
        let raw = "550e8400-e29b-41d4-a716-446655440000";
        let expected = Uuid::parse_str(raw).unwrap();
        assert_eq!(resolve_session_id(Some(raw)), Some(expected));
    }

    #[test]
    fn resolve_session_id_returns_none_for_garbage() {
        assert_eq!(resolve_session_id(Some("not-a-uuid")), None);
    }

    #[test]
    fn resolve_session_id_returns_none_for_none() {
        assert_eq!(resolve_session_id(None), None);
    }
}