// forge_runtime/signals/endpoints.rs

//! HTTP ingestion endpoints for client-side signal events.
//!
//! Routes (under /_api/):
//! - POST /signal/event  -- batched custom events
//! - POST /signal/view   -- page views
//! - POST /signal/user   -- identify (link session to user)
//! - POST /signal/report -- diagnostic error reports
8
9use std::sync::Arc;
10
11use axum::Json;
12use axum::extract::State;
13use axum::http::HeaderMap;
14use axum::response::IntoResponse;
15use forge_core::AuthContext;
16use forge_core::signals::{
17    DiagnosticReport, IdentifyPayload, PageViewPayload, SignalEvent, SignalEventBatch,
18    SignalEventType, SignalResponse, UtmParams,
19};
20use serde_json::Value;
21use sqlx::PgPool;
22use tracing::warn;
23use uuid::Uuid;
24
25use super::bot;
26use super::collector::SignalsCollector;
27use super::device;
28use super::session;
29use super::visitor;
30
/// Largest number of items a single request may carry.
///
/// Applies to `events` on `/signal/event` and to `errors` on `/signal/report`.
/// Requests over the limit are answered with `ok: false` and nothing is recorded.
const MAX_BATCH_SIZE: usize = 50;
33
/// Shared state for signal endpoints.
///
/// Every handler in this module receives it as `State<Arc<SignalsState>>`.
#[derive(Clone)]
pub struct SignalsState {
    /// Sink that handlers push finished `SignalEvent`s into via `try_send`.
    pub collector: SignalsCollector,
    /// Postgres pool used for the session / user upserts done inline by the handlers.
    pub pool: PgPool,
    /// Secret passed to `visitor::generate_visitor_id` together with the client IP
    /// and user agent (presumably a keyed-hash secret -- confirm in `visitor`).
    pub server_secret: String,
    /// When true, strip raw client IP from stored events (intended for GDPR compliance).
    /// The visitor id is still derived from the raw IP before it is dropped.
    pub anonymize_ip: bool,
}
43
44/// POST /signal/event -- batch custom events.
45pub async fn event_handler(
46    State(state): State<Arc<SignalsState>>,
47    auth: Option<axum::Extension<AuthContext>>,
48    headers: HeaderMap,
49    Json(batch): Json<SignalEventBatch>,
50) -> impl IntoResponse {
51    if batch.events.len() > MAX_BATCH_SIZE {
52        return Json(SignalResponse {
53            ok: false,
54            session_id: None,
55        });
56    }
57
58    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
59    let session_id =
60        resolve_session_id(batch.context.as_ref().and_then(|c| c.session_id.as_deref()));
61    let page_url = batch.context.as_ref().and_then(|c| c.page_url.clone());
62
63    let session_id = session::upsert_session(
64        &state.pool,
65        session_id,
66        &ctx.visitor_id,
67        ctx.user_id,
68        ctx.tenant_id,
69        page_url.as_deref(),
70        batch.context.as_ref().and_then(|c| c.referrer.as_deref()),
71        ctx.user_agent.as_deref(),
72        ctx.client_ip.as_deref(),
73        ctx.is_bot,
74        "track",
75        ctx.device_type.as_deref(),
76        ctx.browser.as_deref(),
77        ctx.os.as_deref(),
78    )
79    .await;
80
81    for event in batch.events {
82        let signal = SignalEvent {
83            event_type: SignalEventType::Track,
84            event_name: Some(event.event),
85            correlation_id: event.correlation_id,
86            session_id,
87            visitor_id: Some(ctx.visitor_id.clone()),
88            user_id: ctx.user_id,
89            tenant_id: ctx.tenant_id,
90            properties: event.properties,
91            page_url: page_url.clone(),
92            referrer: None,
93            function_name: None,
94            function_kind: None,
95            duration_ms: None,
96            status: None,
97            error_message: None,
98            error_stack: None,
99            error_context: None,
100            client_ip: ctx.client_ip.clone(),
101            user_agent: ctx.user_agent.clone(),
102            device_type: ctx.device_type.clone(),
103            browser: ctx.browser.clone(),
104            os: ctx.os.clone(),
105            utm: None,
106            is_bot: ctx.is_bot,
107            timestamp: event.timestamp.unwrap_or_else(chrono::Utc::now),
108        };
109        state.collector.try_send(signal);
110    }
111
112    Json(SignalResponse {
113        ok: true,
114        session_id,
115    })
116}
117
118/// POST /signal/view -- page view event.
119pub async fn view_handler(
120    State(state): State<Arc<SignalsState>>,
121    auth: Option<axum::Extension<AuthContext>>,
122    headers: HeaderMap,
123    Json(payload): Json<PageViewPayload>,
124) -> impl IntoResponse {
125    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
126    let session_id_header = extract_header(&headers, "x-session-id");
127    let session_id = resolve_session_id(session_id_header.as_deref());
128
129    let session_id = session::upsert_session(
130        &state.pool,
131        session_id,
132        &ctx.visitor_id,
133        ctx.user_id,
134        ctx.tenant_id,
135        Some(&payload.url),
136        payload.referrer.as_deref(),
137        ctx.user_agent.as_deref(),
138        ctx.client_ip.as_deref(),
139        ctx.is_bot,
140        "page_view",
141        ctx.device_type.as_deref(),
142        ctx.browser.as_deref(),
143        ctx.os.as_deref(),
144    )
145    .await;
146
147    let utm = if payload.utm_source.is_some()
148        || payload.utm_medium.is_some()
149        || payload.utm_campaign.is_some()
150    {
151        Some(UtmParams {
152            source: payload.utm_source,
153            medium: payload.utm_medium,
154            campaign: payload.utm_campaign,
155            term: payload.utm_term,
156            content: payload.utm_content,
157        })
158    } else {
159        None
160    };
161
162    let signal = SignalEvent {
163        event_type: SignalEventType::PageView,
164        event_name: payload.title,
165        correlation_id: payload.correlation_id,
166        session_id,
167        visitor_id: Some(ctx.visitor_id),
168        user_id: ctx.user_id,
169        tenant_id: ctx.tenant_id,
170        properties: Value::Object(serde_json::Map::new()),
171        page_url: Some(payload.url),
172        referrer: payload.referrer,
173        function_name: None,
174        function_kind: None,
175        duration_ms: None,
176        status: None,
177        error_message: None,
178        error_stack: None,
179        error_context: None,
180        client_ip: ctx.client_ip,
181        user_agent: ctx.user_agent,
182        device_type: ctx.device_type,
183        browser: ctx.browser,
184        os: ctx.os,
185        utm,
186        is_bot: ctx.is_bot,
187        timestamp: chrono::Utc::now(),
188    };
189    state.collector.try_send(signal);
190
191    Json(SignalResponse {
192        ok: true,
193        session_id,
194    })
195}
196
197/// POST /signal/user -- identify user.
198pub async fn user_handler(
199    State(state): State<Arc<SignalsState>>,
200    auth: Option<axum::Extension<AuthContext>>,
201    headers: HeaderMap,
202    Json(payload): Json<IdentifyPayload>,
203) -> impl IntoResponse {
204    let user_id = Uuid::parse_str(&payload.user_id).ok().or_else(|| {
205        warn!(raw_id = %payload.user_id, "identify called with non-UUID user_id, ignoring");
206        None
207    });
208
209    let Some(user_id) = user_id else {
210        return Json(SignalResponse {
211            ok: false,
212            session_id: None,
213        });
214    };
215
216    let session_id_header = extract_header(&headers, "x-session-id");
217    let session_id = resolve_session_id(session_id_header.as_deref());
218
219    if let Some(sid) = session_id {
220        session::identify_session(&state.pool, sid, user_id).await;
221    }
222
223    let referrer: Option<&str> = None;
224    session::upsert_user(
225        &state.pool,
226        user_id,
227        &payload.traits,
228        referrer,
229        None,
230        None,
231        None,
232    )
233    .await;
234
235    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
236
237    let signal = SignalEvent {
238        event_type: SignalEventType::Identify,
239        event_name: None,
240        correlation_id: None,
241        session_id,
242        visitor_id: Some(ctx.visitor_id),
243        user_id: Some(user_id),
244        tenant_id: ctx.tenant_id,
245        properties: payload.traits,
246        page_url: None,
247        referrer: None,
248        function_name: None,
249        function_kind: None,
250        duration_ms: None,
251        status: None,
252        error_message: None,
253        error_stack: None,
254        error_context: None,
255        client_ip: ctx.client_ip,
256        user_agent: ctx.user_agent,
257        device_type: ctx.device_type,
258        browser: ctx.browser,
259        os: ctx.os,
260        utm: None,
261        is_bot: ctx.is_bot,
262        timestamp: chrono::Utc::now(),
263    };
264    state.collector.try_send(signal);
265
266    Json(SignalResponse {
267        ok: true,
268        session_id,
269    })
270}
271
272/// POST /signal/report -- diagnostic error reports.
273pub async fn report_handler(
274    State(state): State<Arc<SignalsState>>,
275    auth: Option<axum::Extension<AuthContext>>,
276    headers: HeaderMap,
277    Json(report): Json<DiagnosticReport>,
278) -> impl IntoResponse {
279    if report.errors.len() > MAX_BATCH_SIZE {
280        return Json(SignalResponse {
281            ok: false,
282            session_id: None,
283        });
284    }
285
286    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret, state.anonymize_ip);
287    let session_id_header = extract_header(&headers, "x-session-id");
288    let session_id = resolve_session_id(session_id_header.as_deref());
289
290    if let Some(sid) = session_id {
291        session::upsert_session(
292            &state.pool,
293            Some(sid),
294            &ctx.visitor_id,
295            ctx.user_id,
296            ctx.tenant_id,
297            None,
298            None,
299            ctx.user_agent.as_deref(),
300            ctx.client_ip.as_deref(),
301            ctx.is_bot,
302            "error",
303            ctx.device_type.as_deref(),
304            ctx.browser.as_deref(),
305            ctx.os.as_deref(),
306        )
307        .await;
308    }
309
310    for err in report.errors {
311        let signal = SignalEvent {
312            event_type: SignalEventType::Error,
313            event_name: Some(err.message.clone()),
314            correlation_id: err.correlation_id,
315            session_id,
316            visitor_id: Some(ctx.visitor_id.clone()),
317            user_id: ctx.user_id,
318            tenant_id: ctx.tenant_id,
319            properties: Value::Object(serde_json::Map::new()),
320            page_url: err.page_url,
321            referrer: None,
322            function_name: None,
323            function_kind: None,
324            duration_ms: None,
325            status: None,
326            error_message: Some(err.message),
327            error_stack: err.stack,
328            error_context: err.context,
329            client_ip: ctx.client_ip.clone(),
330            user_agent: ctx.user_agent.clone(),
331            device_type: ctx.device_type.clone(),
332            browser: ctx.browser.clone(),
333            os: ctx.os.clone(),
334            utm: None,
335            is_bot: ctx.is_bot,
336            timestamp: chrono::Utc::now(),
337        };
338        state.collector.try_send(signal);
339    }
340
341    Json(SignalResponse {
342        ok: true,
343        session_id,
344    })
345}
346
/// Shared request context extracted from headers and auth for all signal endpoints.
///
/// Built by `extract_request_ctx`; every handler copies these values into the
/// session upsert and into the `SignalEvent`s it enqueues.
struct RequestCtx {
    /// Raw `user-agent` header value, if present.
    user_agent: Option<String>,
    /// Client IP from `extract_client_ip`; always `None` when IP anonymisation is on.
    client_ip: Option<String>,
    /// Result of `bot::is_bot_lower` on the lowercased user agent.
    is_bot: bool,
    /// Id from `visitor::generate_visitor_id(raw IP, user agent, server secret)`.
    visitor_id: String,
    /// User id from the request's `AuthContext`, if authenticated.
    user_id: Option<Uuid>,
    /// Tenant id from the request's `AuthContext`, if authenticated.
    tenant_id: Option<Uuid>,
    /// Device class from `device::parse_lowered` (platform header + lowercased UA).
    device_type: Option<String>,
    /// Browser name from `device::parse_lowered`.
    browser: Option<String>,
    /// Operating system from `device::parse_lowered`.
    os: Option<String>,
}
359
360fn extract_request_ctx(
361    headers: &HeaderMap,
362    auth: &Option<axum::Extension<AuthContext>>,
363    server_secret: &str,
364    anonymize_ip: bool,
365) -> RequestCtx {
366    let user_agent = extract_header(headers, "user-agent");
367    let platform_header = extract_header(headers, "x-forge-platform");
368    let raw_ip = extract_client_ip(headers);
369    let ua_lower = user_agent
370        .as_deref()
371        .unwrap_or_default()
372        .to_ascii_lowercase();
373    let is_bot = bot::is_bot_lower(&ua_lower);
374    let visitor_id =
375        visitor::generate_visitor_id(raw_ip.as_deref(), user_agent.as_deref(), server_secret);
376    let user_id = auth.as_ref().and_then(|a| a.user_id());
377    let tenant_id = auth.as_ref().and_then(|a| a.tenant_id());
378    let device_info = device::parse_lowered(platform_header.as_deref(), &ua_lower);
379    // When anonymize_ip is enabled, drop the raw IP from stored events.
380    // The hashed visitor_id is still computed from the IP above for analytics.
381    let client_ip = if anonymize_ip { None } else { raw_ip };
382    RequestCtx {
383        user_agent,
384        client_ip,
385        is_bot,
386        visitor_id,
387        user_id,
388        tenant_id,
389        device_type: device_info.device_type,
390        browser: device_info.browser,
391        os: device_info.os,
392    }
393}
394
/// Read header `name` as an owned string by delegating to `crate::gateway::extract_header`.
///
/// A missing header or one with an empty value yields `None`, as the tests in
/// this file pin down.
fn extract_header(headers: &HeaderMap, name: &str) -> Option<String> {
    crate::gateway::extract_header(headers, name)
}
398
/// Determine the client IP by delegating to `crate::gateway::extract_client_ip`.
///
/// Per this file's tests:
/// - the first `x-forwarded-for` entry is used;
/// - otherwise `x-real-ip` is used;
/// - otherwise the result is `None`.
///
/// NOTE(review): these headers are client-controllable unless a trusted proxy
/// overwrites them. Confirm the deployment guarantees this, because the value
/// feeds both the stored IP and the visitor-id hash.
fn extract_client_ip(headers: &HeaderMap) -> Option<String> {
    crate::gateway::extract_client_ip(headers)
}
402
403fn resolve_session_id(raw: Option<&str>) -> Option<Uuid> {
404    raw.and_then(|s| Uuid::parse_str(s).ok())
405}
406
#[cfg(test)]
// Tests unwrap a hard-coded, known-valid UUID literal.
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for the header / session-id helpers.
    //!
    //! None of these helpers are async, so plain `#[test]` is used rather than
    //! `#[tokio::test]`; that avoids building a Tokio runtime per test.

    use axum::http::{HeaderMap, HeaderValue};
    use uuid::Uuid;

    use super::{extract_client_ip, extract_header, resolve_session_id};

    #[test]
    fn extract_header_returns_value() {
        let mut headers = HeaderMap::new();
        headers.insert("x-custom", HeaderValue::from_static("hello"));
        assert_eq!(extract_header(&headers, "x-custom"), Some("hello".into()));
    }

    #[test]
    fn extract_header_returns_none_for_missing() {
        let headers = HeaderMap::new();
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_header_returns_none_for_empty_value() {
        let mut headers = HeaderMap::new();
        headers.insert("x-custom", HeaderValue::from_static(""));
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_single() {
        let mut headers = HeaderMap::new();
        headers.insert("x-forwarded-for", HeaderValue::from_static("1.2.3.4"));
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_multiple() {
        let mut headers = HeaderMap::new();
        headers.insert(
            "x-forwarded-for",
            HeaderValue::from_static("1.2.3.4, 5.6.7.8"),
        );
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_falls_back_to_real_ip() {
        let mut headers = HeaderMap::new();
        headers.insert("x-real-ip", HeaderValue::from_static("9.8.7.6"));
        assert_eq!(extract_client_ip(&headers), Some("9.8.7.6".into()));
    }

    #[test]
    fn extract_client_ip_returns_none_when_no_headers() {
        let headers = HeaderMap::new();
        assert_eq!(extract_client_ip(&headers), None);
    }

    #[test]
    fn resolve_session_id_parses_valid_uuid() {
        let raw = "550e8400-e29b-41d4-a716-446655440000";
        let expected = Uuid::parse_str(raw).unwrap();
        assert_eq!(resolve_session_id(Some(raw)), Some(expected));
    }

    #[test]
    fn resolve_session_id_returns_none_for_garbage() {
        assert_eq!(resolve_session_id(Some("not-a-uuid")), None);
    }

    #[test]
    fn resolve_session_id_returns_none_for_none() {
        assert_eq!(resolve_session_id(None), None);
    }
}