1use std::sync::Arc;
10
11use axum::Json;
12use axum::extract::State;
13use axum::http::HeaderMap;
14use axum::response::IntoResponse;
15use forge_core::AuthContext;
16use forge_core::signals::{
17 DiagnosticReport, IdentifyPayload, PageViewPayload, SignalEvent, SignalEventBatch,
18 SignalEventType, SignalResponse, UtmParams,
19};
20use serde_json::Value;
21use sqlx::PgPool;
22use tracing::warn;
23use uuid::Uuid;
24
25use super::bot;
26use super::collector::SignalsCollector;
27use super::device;
28use super::session;
29use super::visitor;
30
/// Largest number of items accepted in a single request body.
///
/// `event_handler` compares it against the number of events in a batch and
/// `report_handler` against the number of errors in a report; anything larger is
/// answered with `ok: false` and is not processed.
const MAX_BATCH_SIZE: usize = 50;
33
/// Shared state handed to every signals endpoint through axum's `State` extractor
/// (wrapped in an `Arc` by the handlers in this file).
#[derive(Clone)]
pub struct SignalsState {
    /// Receives every `SignalEvent` built by the handlers (via `try_send`).
    pub collector: SignalsCollector,
    /// Postgres pool passed to the `session::*` helpers.
    pub pool: PgPool,
    /// Secret passed to `visitor::generate_visitor_id` together with the client IP
    /// and user agent.
    pub server_secret: String,
}
41
42pub async fn event_handler(
44 State(state): State<Arc<SignalsState>>,
45 auth: Option<axum::Extension<AuthContext>>,
46 headers: HeaderMap,
47 Json(batch): Json<SignalEventBatch>,
48) -> impl IntoResponse {
49 if batch.events.len() > MAX_BATCH_SIZE {
50 return Json(SignalResponse {
51 ok: false,
52 session_id: None,
53 });
54 }
55
56 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
57 let session_id =
58 resolve_session_id(batch.context.as_ref().and_then(|c| c.session_id.as_deref()));
59 let page_url = batch.context.as_ref().and_then(|c| c.page_url.clone());
60
61 let session_id = session::upsert_session(
62 &state.pool,
63 session_id,
64 &ctx.visitor_id,
65 ctx.user_id,
66 ctx.tenant_id,
67 page_url.as_deref(),
68 batch.context.as_ref().and_then(|c| c.referrer.as_deref()),
69 ctx.user_agent.as_deref(),
70 ctx.client_ip.as_deref(),
71 ctx.is_bot,
72 "track",
73 ctx.device_type.as_deref(),
74 ctx.browser.as_deref(),
75 ctx.os.as_deref(),
76 )
77 .await;
78
79 for event in batch.events {
80 let signal = SignalEvent {
81 event_type: SignalEventType::Track,
82 event_name: Some(event.event),
83 correlation_id: event.correlation_id,
84 session_id,
85 visitor_id: Some(ctx.visitor_id.clone()),
86 user_id: ctx.user_id,
87 tenant_id: ctx.tenant_id,
88 properties: event.properties,
89 page_url: page_url.clone(),
90 referrer: None,
91 function_name: None,
92 function_kind: None,
93 duration_ms: None,
94 status: None,
95 error_message: None,
96 error_stack: None,
97 error_context: None,
98 client_ip: ctx.client_ip.clone(),
99 user_agent: ctx.user_agent.clone(),
100 device_type: ctx.device_type.clone(),
101 browser: ctx.browser.clone(),
102 os: ctx.os.clone(),
103 utm: None,
104 is_bot: ctx.is_bot,
105 timestamp: event.timestamp.unwrap_or_else(chrono::Utc::now),
106 };
107 state.collector.try_send(signal);
108 }
109
110 Json(SignalResponse {
111 ok: true,
112 session_id,
113 })
114}
115
116pub async fn view_handler(
118 State(state): State<Arc<SignalsState>>,
119 auth: Option<axum::Extension<AuthContext>>,
120 headers: HeaderMap,
121 Json(payload): Json<PageViewPayload>,
122) -> impl IntoResponse {
123 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
124 let session_id_header = extract_header(&headers, "x-session-id");
125 let session_id = resolve_session_id(session_id_header.as_deref());
126
127 let session_id = session::upsert_session(
128 &state.pool,
129 session_id,
130 &ctx.visitor_id,
131 ctx.user_id,
132 ctx.tenant_id,
133 Some(&payload.url),
134 payload.referrer.as_deref(),
135 ctx.user_agent.as_deref(),
136 ctx.client_ip.as_deref(),
137 ctx.is_bot,
138 "page_view",
139 ctx.device_type.as_deref(),
140 ctx.browser.as_deref(),
141 ctx.os.as_deref(),
142 )
143 .await;
144
145 let utm = if payload.utm_source.is_some()
146 || payload.utm_medium.is_some()
147 || payload.utm_campaign.is_some()
148 {
149 Some(UtmParams {
150 source: payload.utm_source,
151 medium: payload.utm_medium,
152 campaign: payload.utm_campaign,
153 term: payload.utm_term,
154 content: payload.utm_content,
155 })
156 } else {
157 None
158 };
159
160 let signal = SignalEvent {
161 event_type: SignalEventType::PageView,
162 event_name: payload.title,
163 correlation_id: payload.correlation_id,
164 session_id,
165 visitor_id: Some(ctx.visitor_id),
166 user_id: ctx.user_id,
167 tenant_id: ctx.tenant_id,
168 properties: Value::Object(serde_json::Map::new()),
169 page_url: Some(payload.url),
170 referrer: payload.referrer,
171 function_name: None,
172 function_kind: None,
173 duration_ms: None,
174 status: None,
175 error_message: None,
176 error_stack: None,
177 error_context: None,
178 client_ip: ctx.client_ip,
179 user_agent: ctx.user_agent,
180 device_type: ctx.device_type,
181 browser: ctx.browser,
182 os: ctx.os,
183 utm,
184 is_bot: ctx.is_bot,
185 timestamp: chrono::Utc::now(),
186 };
187 state.collector.try_send(signal);
188
189 Json(SignalResponse {
190 ok: true,
191 session_id,
192 })
193}
194
195pub async fn user_handler(
197 State(state): State<Arc<SignalsState>>,
198 auth: Option<axum::Extension<AuthContext>>,
199 headers: HeaderMap,
200 Json(payload): Json<IdentifyPayload>,
201) -> impl IntoResponse {
202 let user_id = Uuid::parse_str(&payload.user_id).ok().or_else(|| {
203 warn!(raw_id = %payload.user_id, "identify called with non-UUID user_id, ignoring");
204 None
205 });
206
207 let Some(user_id) = user_id else {
208 return Json(SignalResponse {
209 ok: false,
210 session_id: None,
211 });
212 };
213
214 let session_id_header = extract_header(&headers, "x-session-id");
215 let session_id = resolve_session_id(session_id_header.as_deref());
216
217 if let Some(sid) = session_id {
218 session::identify_session(&state.pool, sid, user_id).await;
219 }
220
221 let referrer: Option<&str> = None;
222 session::upsert_user(
223 &state.pool,
224 user_id,
225 &payload.traits,
226 referrer,
227 None,
228 None,
229 None,
230 )
231 .await;
232
233 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
234
235 let signal = SignalEvent {
236 event_type: SignalEventType::Identify,
237 event_name: None,
238 correlation_id: None,
239 session_id,
240 visitor_id: Some(ctx.visitor_id),
241 user_id: Some(user_id),
242 tenant_id: ctx.tenant_id,
243 properties: payload.traits,
244 page_url: None,
245 referrer: None,
246 function_name: None,
247 function_kind: None,
248 duration_ms: None,
249 status: None,
250 error_message: None,
251 error_stack: None,
252 error_context: None,
253 client_ip: ctx.client_ip,
254 user_agent: ctx.user_agent,
255 device_type: ctx.device_type,
256 browser: ctx.browser,
257 os: ctx.os,
258 utm: None,
259 is_bot: ctx.is_bot,
260 timestamp: chrono::Utc::now(),
261 };
262 state.collector.try_send(signal);
263
264 Json(SignalResponse {
265 ok: true,
266 session_id,
267 })
268}
269
270pub async fn report_handler(
272 State(state): State<Arc<SignalsState>>,
273 auth: Option<axum::Extension<AuthContext>>,
274 headers: HeaderMap,
275 Json(report): Json<DiagnosticReport>,
276) -> impl IntoResponse {
277 if report.errors.len() > MAX_BATCH_SIZE {
278 return Json(SignalResponse {
279 ok: false,
280 session_id: None,
281 });
282 }
283
284 let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
285 let session_id_header = extract_header(&headers, "x-session-id");
286 let session_id = resolve_session_id(session_id_header.as_deref());
287
288 if let Some(sid) = session_id {
289 session::upsert_session(
290 &state.pool,
291 Some(sid),
292 &ctx.visitor_id,
293 ctx.user_id,
294 ctx.tenant_id,
295 None,
296 None,
297 ctx.user_agent.as_deref(),
298 ctx.client_ip.as_deref(),
299 ctx.is_bot,
300 "error",
301 ctx.device_type.as_deref(),
302 ctx.browser.as_deref(),
303 ctx.os.as_deref(),
304 )
305 .await;
306 }
307
308 for err in report.errors {
309 let signal = SignalEvent {
310 event_type: SignalEventType::Error,
311 event_name: Some(err.message.clone()),
312 correlation_id: err.correlation_id,
313 session_id,
314 visitor_id: Some(ctx.visitor_id.clone()),
315 user_id: ctx.user_id,
316 tenant_id: ctx.tenant_id,
317 properties: Value::Object(serde_json::Map::new()),
318 page_url: err.page_url,
319 referrer: None,
320 function_name: None,
321 function_kind: None,
322 duration_ms: None,
323 status: None,
324 error_message: Some(err.message),
325 error_stack: err.stack,
326 error_context: err.context,
327 client_ip: ctx.client_ip.clone(),
328 user_agent: ctx.user_agent.clone(),
329 device_type: ctx.device_type.clone(),
330 browser: ctx.browser.clone(),
331 os: ctx.os.clone(),
332 utm: None,
333 is_bot: ctx.is_bot,
334 timestamp: chrono::Utc::now(),
335 };
336 state.collector.try_send(signal);
337 }
338
339 Json(SignalResponse {
340 ok: true,
341 session_id,
342 })
343}
344
/// Per-request facts derived once by `extract_request_ctx` and reused by the handlers.
struct RequestCtx {
    /// Value of the `user-agent` header, if present.
    user_agent: Option<String>,
    /// Client IP as reported by `extract_client_ip`.
    client_ip: Option<String>,
    /// Result of `bot::is_bot_lower` on the lowercased user agent.
    is_bot: bool,
    /// Identifier produced by `visitor::generate_visitor_id` from the client IP,
    /// the user agent and the server secret.
    visitor_id: String,
    /// User id taken from the auth context, when one is attached to the request.
    user_id: Option<Uuid>,
    /// Tenant id taken from the auth context, when one is attached to the request.
    tenant_id: Option<Uuid>,
    /// Device type reported by `device::parse_lowered`.
    device_type: Option<String>,
    /// Browser reported by `device::parse_lowered`.
    browser: Option<String>,
    /// Operating system reported by `device::parse_lowered`.
    os: Option<String>,
}
357
358fn extract_request_ctx(
359 headers: &HeaderMap,
360 auth: &Option<axum::Extension<AuthContext>>,
361 server_secret: &str,
362) -> RequestCtx {
363 let user_agent = extract_header(headers, "user-agent");
364 let platform_header = extract_header(headers, "x-forge-platform");
365 let client_ip = extract_client_ip(headers);
366 let ua_lower = user_agent
367 .as_deref()
368 .unwrap_or_default()
369 .to_ascii_lowercase();
370 let is_bot = bot::is_bot_lower(&ua_lower);
371 let visitor_id =
372 visitor::generate_visitor_id(client_ip.as_deref(), user_agent.as_deref(), server_secret);
373 let user_id = auth.as_ref().and_then(|a| a.user_id());
374 let tenant_id = auth.as_ref().and_then(|a| a.tenant_id());
375 let device_info = device::parse_lowered(platform_header.as_deref(), &ua_lower);
376 RequestCtx {
377 user_agent,
378 client_ip,
379 is_bot,
380 visitor_id,
381 user_id,
382 tenant_id,
383 device_type: device_info.device_type,
384 browser: device_info.browser,
385 os: device_info.os,
386 }
387}
388
/// Reads the header `name` from `headers` as an owned string by delegating to
/// `crate::gateway::extract_header`.
///
/// The unit tests in this file expect `None` both when the header is missing and
/// when its value is empty.
fn extract_header(headers: &HeaderMap, name: &str) -> Option<String> {
    crate::gateway::extract_header(headers, name)
}
392
/// Determines the client IP from the request headers by delegating to
/// `crate::gateway::extract_client_ip`.
///
/// Per the unit tests in this file: the first entry of `x-forwarded-for` is used
/// when that header is present, `x-real-ip` is the fallback, and `None` is returned
/// when neither header is set.
fn extract_client_ip(headers: &HeaderMap) -> Option<String> {
    crate::gateway::extract_client_ip(headers)
}
396
397fn resolve_session_id(raw: Option<&str>) -> Option<Uuid> {
398 raw.and_then(|s| Uuid::parse_str(s).ok())
399}
400
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};
    use uuid::Uuid;

    use super::{extract_client_ip, extract_header, resolve_session_id};

    /// Builds a header map that contains exactly one static header.
    fn single_header(name: &'static str, value: &'static str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(name, HeaderValue::from_static(value));
        headers
    }

    // --- extract_header ---

    #[tokio::test]
    async fn extract_header_returns_value() {
        let headers = single_header("x-custom", "hello");
        assert_eq!(
            extract_header(&headers, "x-custom").as_deref(),
            Some("hello")
        );
    }

    #[tokio::test]
    async fn extract_header_returns_none_for_missing() {
        let empty = HeaderMap::new();
        assert_eq!(extract_header(&empty, "x-custom"), None);
    }

    #[tokio::test]
    async fn extract_header_returns_none_for_empty_value() {
        let headers = single_header("x-custom", "");
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    // --- extract_client_ip ---

    #[tokio::test]
    async fn extract_client_ip_from_forwarded_for_single() {
        let headers = single_header("x-forwarded-for", "1.2.3.4");
        assert_eq!(extract_client_ip(&headers).as_deref(), Some("1.2.3.4"));
    }

    #[tokio::test]
    async fn extract_client_ip_from_forwarded_for_multiple() {
        // Only the first address of the comma-separated list is expected back.
        let headers = single_header("x-forwarded-for", "1.2.3.4, 5.6.7.8");
        assert_eq!(extract_client_ip(&headers).as_deref(), Some("1.2.3.4"));
    }

    #[tokio::test]
    async fn extract_client_ip_falls_back_to_real_ip() {
        let headers = single_header("x-real-ip", "9.8.7.6");
        assert_eq!(extract_client_ip(&headers).as_deref(), Some("9.8.7.6"));
    }

    #[tokio::test]
    async fn extract_client_ip_returns_none_when_no_headers() {
        let empty = HeaderMap::new();
        assert_eq!(extract_client_ip(&empty), None);
    }

    // --- resolve_session_id ---

    #[tokio::test]
    async fn resolve_session_id_parses_valid_uuid() {
        let raw = "550e8400-e29b-41d4-a716-446655440000";
        let expected = Uuid::parse_str(raw).unwrap();
        assert_eq!(resolve_session_id(Some(raw)), Some(expected));
    }

    #[tokio::test]
    async fn resolve_session_id_returns_none_for_garbage() {
        assert_eq!(resolve_session_id(Some("not-a-uuid")), None);
    }

    #[tokio::test]
    async fn resolve_session_id_returns_none_for_none() {
        assert_eq!(resolve_session_id(None), None);
    }
}