Skip to main content

forge_runtime/signals/
endpoints.rs

//! HTTP ingestion endpoints for client-side signal events.
//!
//! Routes (under /_api/):
//! - POST /signal/event  -- custom events
//! - POST /signal/view   -- page views
//! - POST /signal/user   -- identify (link session to user)
//! - POST /signal/report -- diagnostic error reports
8
9use std::sync::Arc;
10
11use axum::Json;
12use axum::extract::State;
13use axum::http::HeaderMap;
14use axum::response::IntoResponse;
15use forge_core::AuthContext;
16use forge_core::signals::{
17    DiagnosticReport, IdentifyPayload, PageViewPayload, SignalEvent, SignalEventBatch,
18    SignalEventType, SignalResponse, UtmParams,
19};
20use serde_json::Value;
21use sqlx::PgPool;
22use tracing::warn;
23use uuid::Uuid;
24
25use super::bot;
26use super::collector::SignalsCollector;
27use super::device;
28use super::session;
29use super::visitor;
30
/// Upper bound on the number of items accepted in a single batch request.
///
/// The event and report handlers answer `ok: false` when a request carries
/// more items than this.
const MAX_BATCH_SIZE: usize = 50;
33
34/// Shared state for signal endpoints.
35#[derive(Clone)]
36pub struct SignalsState {
37    pub collector: SignalsCollector,
38    pub pool: PgPool,
39    pub server_secret: String,
40}
41
42/// POST /signal/event -- batch custom events.
43pub async fn event_handler(
44    State(state): State<Arc<SignalsState>>,
45    auth: Option<axum::Extension<AuthContext>>,
46    headers: HeaderMap,
47    Json(batch): Json<SignalEventBatch>,
48) -> impl IntoResponse {
49    if batch.events.len() > MAX_BATCH_SIZE {
50        return Json(SignalResponse {
51            ok: false,
52            session_id: None,
53        });
54    }
55
56    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
57    let session_id =
58        resolve_session_id(batch.context.as_ref().and_then(|c| c.session_id.as_deref()));
59    let page_url = batch.context.as_ref().and_then(|c| c.page_url.clone());
60
61    let session_id = session::upsert_session(
62        &state.pool,
63        session_id,
64        &ctx.visitor_id,
65        ctx.user_id,
66        ctx.tenant_id,
67        page_url.as_deref(),
68        batch.context.as_ref().and_then(|c| c.referrer.as_deref()),
69        ctx.user_agent.as_deref(),
70        ctx.client_ip.as_deref(),
71        ctx.is_bot,
72        "track",
73        ctx.device_type.as_deref(),
74        ctx.browser.as_deref(),
75        ctx.os.as_deref(),
76    )
77    .await;
78
79    for event in batch.events {
80        let signal = SignalEvent {
81            event_type: SignalEventType::Track,
82            event_name: Some(event.event),
83            correlation_id: event.correlation_id,
84            session_id,
85            visitor_id: Some(ctx.visitor_id.clone()),
86            user_id: ctx.user_id,
87            tenant_id: ctx.tenant_id,
88            properties: event.properties,
89            page_url: page_url.clone(),
90            referrer: None,
91            function_name: None,
92            function_kind: None,
93            duration_ms: None,
94            status: None,
95            error_message: None,
96            error_stack: None,
97            error_context: None,
98            client_ip: ctx.client_ip.clone(),
99            user_agent: ctx.user_agent.clone(),
100            device_type: ctx.device_type.clone(),
101            browser: ctx.browser.clone(),
102            os: ctx.os.clone(),
103            utm: None,
104            is_bot: ctx.is_bot,
105            timestamp: event.timestamp.unwrap_or_else(chrono::Utc::now),
106        };
107        state.collector.try_send(signal);
108    }
109
110    Json(SignalResponse {
111        ok: true,
112        session_id,
113    })
114}
115
116/// POST /signal/view -- page view event.
117pub async fn view_handler(
118    State(state): State<Arc<SignalsState>>,
119    auth: Option<axum::Extension<AuthContext>>,
120    headers: HeaderMap,
121    Json(payload): Json<PageViewPayload>,
122) -> impl IntoResponse {
123    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
124    let session_id_header = extract_header(&headers, "x-session-id");
125    let session_id = resolve_session_id(session_id_header.as_deref());
126
127    let session_id = session::upsert_session(
128        &state.pool,
129        session_id,
130        &ctx.visitor_id,
131        ctx.user_id,
132        ctx.tenant_id,
133        Some(&payload.url),
134        payload.referrer.as_deref(),
135        ctx.user_agent.as_deref(),
136        ctx.client_ip.as_deref(),
137        ctx.is_bot,
138        "page_view",
139        ctx.device_type.as_deref(),
140        ctx.browser.as_deref(),
141        ctx.os.as_deref(),
142    )
143    .await;
144
145    let utm = if payload.utm_source.is_some()
146        || payload.utm_medium.is_some()
147        || payload.utm_campaign.is_some()
148    {
149        Some(UtmParams {
150            source: payload.utm_source,
151            medium: payload.utm_medium,
152            campaign: payload.utm_campaign,
153            term: payload.utm_term,
154            content: payload.utm_content,
155        })
156    } else {
157        None
158    };
159
160    let signal = SignalEvent {
161        event_type: SignalEventType::PageView,
162        event_name: payload.title,
163        correlation_id: payload.correlation_id,
164        session_id,
165        visitor_id: Some(ctx.visitor_id),
166        user_id: ctx.user_id,
167        tenant_id: ctx.tenant_id,
168        properties: Value::Object(serde_json::Map::new()),
169        page_url: Some(payload.url),
170        referrer: payload.referrer,
171        function_name: None,
172        function_kind: None,
173        duration_ms: None,
174        status: None,
175        error_message: None,
176        error_stack: None,
177        error_context: None,
178        client_ip: ctx.client_ip,
179        user_agent: ctx.user_agent,
180        device_type: ctx.device_type,
181        browser: ctx.browser,
182        os: ctx.os,
183        utm,
184        is_bot: ctx.is_bot,
185        timestamp: chrono::Utc::now(),
186    };
187    state.collector.try_send(signal);
188
189    Json(SignalResponse {
190        ok: true,
191        session_id,
192    })
193}
194
195/// POST /signal/user -- identify user.
196pub async fn user_handler(
197    State(state): State<Arc<SignalsState>>,
198    auth: Option<axum::Extension<AuthContext>>,
199    headers: HeaderMap,
200    Json(payload): Json<IdentifyPayload>,
201) -> impl IntoResponse {
202    let user_id = Uuid::parse_str(&payload.user_id).ok().or_else(|| {
203        warn!(raw_id = %payload.user_id, "identify called with non-UUID user_id, ignoring");
204        None
205    });
206
207    let Some(user_id) = user_id else {
208        return Json(SignalResponse {
209            ok: false,
210            session_id: None,
211        });
212    };
213
214    let session_id_header = extract_header(&headers, "x-session-id");
215    let session_id = resolve_session_id(session_id_header.as_deref());
216
217    if let Some(sid) = session_id {
218        session::identify_session(&state.pool, sid, user_id).await;
219    }
220
221    let referrer: Option<&str> = None;
222    session::upsert_user(
223        &state.pool,
224        user_id,
225        &payload.traits,
226        referrer,
227        None,
228        None,
229        None,
230    )
231    .await;
232
233    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
234
235    let signal = SignalEvent {
236        event_type: SignalEventType::Identify,
237        event_name: None,
238        correlation_id: None,
239        session_id,
240        visitor_id: Some(ctx.visitor_id),
241        user_id: Some(user_id),
242        tenant_id: ctx.tenant_id,
243        properties: payload.traits,
244        page_url: None,
245        referrer: None,
246        function_name: None,
247        function_kind: None,
248        duration_ms: None,
249        status: None,
250        error_message: None,
251        error_stack: None,
252        error_context: None,
253        client_ip: ctx.client_ip,
254        user_agent: ctx.user_agent,
255        device_type: ctx.device_type,
256        browser: ctx.browser,
257        os: ctx.os,
258        utm: None,
259        is_bot: ctx.is_bot,
260        timestamp: chrono::Utc::now(),
261    };
262    state.collector.try_send(signal);
263
264    Json(SignalResponse {
265        ok: true,
266        session_id,
267    })
268}
269
270/// POST /signal/report -- diagnostic error reports.
271pub async fn report_handler(
272    State(state): State<Arc<SignalsState>>,
273    auth: Option<axum::Extension<AuthContext>>,
274    headers: HeaderMap,
275    Json(report): Json<DiagnosticReport>,
276) -> impl IntoResponse {
277    if report.errors.len() > MAX_BATCH_SIZE {
278        return Json(SignalResponse {
279            ok: false,
280            session_id: None,
281        });
282    }
283
284    let ctx = extract_request_ctx(&headers, &auth, &state.server_secret);
285    let session_id_header = extract_header(&headers, "x-session-id");
286    let session_id = resolve_session_id(session_id_header.as_deref());
287
288    if let Some(sid) = session_id {
289        session::upsert_session(
290            &state.pool,
291            Some(sid),
292            &ctx.visitor_id,
293            ctx.user_id,
294            ctx.tenant_id,
295            None,
296            None,
297            ctx.user_agent.as_deref(),
298            ctx.client_ip.as_deref(),
299            ctx.is_bot,
300            "error",
301            ctx.device_type.as_deref(),
302            ctx.browser.as_deref(),
303            ctx.os.as_deref(),
304        )
305        .await;
306    }
307
308    for err in report.errors {
309        let signal = SignalEvent {
310            event_type: SignalEventType::Error,
311            event_name: Some(err.message.clone()),
312            correlation_id: err.correlation_id,
313            session_id,
314            visitor_id: Some(ctx.visitor_id.clone()),
315            user_id: ctx.user_id,
316            tenant_id: ctx.tenant_id,
317            properties: Value::Object(serde_json::Map::new()),
318            page_url: err.page_url,
319            referrer: None,
320            function_name: None,
321            function_kind: None,
322            duration_ms: None,
323            status: None,
324            error_message: Some(err.message),
325            error_stack: err.stack,
326            error_context: err.context,
327            client_ip: ctx.client_ip.clone(),
328            user_agent: ctx.user_agent.clone(),
329            device_type: ctx.device_type.clone(),
330            browser: ctx.browser.clone(),
331            os: ctx.os.clone(),
332            utm: None,
333            is_bot: ctx.is_bot,
334            timestamp: chrono::Utc::now(),
335        };
336        state.collector.try_send(signal);
337    }
338
339    Json(SignalResponse {
340        ok: true,
341        session_id,
342    })
343}
344
345/// Shared request context extracted from headers and auth for all signal endpoints.
346struct RequestCtx {
347    user_agent: Option<String>,
348    client_ip: Option<String>,
349    is_bot: bool,
350    visitor_id: String,
351    user_id: Option<Uuid>,
352    tenant_id: Option<Uuid>,
353    device_type: Option<String>,
354    browser: Option<String>,
355    os: Option<String>,
356}
357
358fn extract_request_ctx(
359    headers: &HeaderMap,
360    auth: &Option<axum::Extension<AuthContext>>,
361    server_secret: &str,
362) -> RequestCtx {
363    let user_agent = extract_header(headers, "user-agent");
364    let platform_header = extract_header(headers, "x-forge-platform");
365    let client_ip = extract_client_ip(headers);
366    let ua_lower = user_agent
367        .as_deref()
368        .unwrap_or_default()
369        .to_ascii_lowercase();
370    let is_bot = bot::is_bot_lower(&ua_lower);
371    let visitor_id =
372        visitor::generate_visitor_id(client_ip.as_deref(), user_agent.as_deref(), server_secret);
373    let user_id = auth.as_ref().and_then(|a| a.user_id());
374    let tenant_id = auth.as_ref().and_then(|a| a.tenant_id());
375    let device_info = device::parse_lowered(platform_header.as_deref(), &ua_lower);
376    RequestCtx {
377        user_agent,
378        client_ip,
379        is_bot,
380        visitor_id,
381        user_id,
382        tenant_id,
383        device_type: device_info.device_type,
384        browser: device_info.browser,
385        os: device_info.os,
386    }
387}
388
389fn extract_header(headers: &HeaderMap, name: &str) -> Option<String> {
390    crate::gateway::extract_header(headers, name)
391}
392
393fn extract_client_ip(headers: &HeaderMap) -> Option<String> {
394    crate::gateway::extract_client_ip(headers)
395}
396
397fn resolve_session_id(raw: Option<&str>) -> Option<Uuid> {
398    raw.and_then(|s| Uuid::parse_str(s).ok())
399}
400
/// Unit tests for the synchronous helper functions of this module.
///
/// None of the helpers are async, so plain `#[test]` functions are used
/// instead of spinning up an async runtime per test.
#[cfg(test)]
// `unwrap` is fine here: a failed unwrap simply fails the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use axum::http::{HeaderMap, HeaderValue};
    use uuid::Uuid;

    use super::{extract_client_ip, extract_header, resolve_session_id};

    #[test]
    fn extract_header_returns_value() {
        let mut headers = HeaderMap::new();
        headers.insert("x-custom", HeaderValue::from_static("hello"));
        assert_eq!(extract_header(&headers, "x-custom"), Some("hello".into()));
    }

    #[test]
    fn extract_header_returns_none_for_missing() {
        let headers = HeaderMap::new();
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_header_returns_none_for_empty_value() {
        let mut headers = HeaderMap::new();
        headers.insert("x-custom", HeaderValue::from_static(""));
        assert_eq!(extract_header(&headers, "x-custom"), None);
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_single() {
        let mut headers = HeaderMap::new();
        headers.insert("x-forwarded-for", HeaderValue::from_static("1.2.3.4"));
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_from_forwarded_for_multiple() {
        // Only the first (left-most) address of the list is expected back.
        let mut headers = HeaderMap::new();
        headers.insert(
            "x-forwarded-for",
            HeaderValue::from_static("1.2.3.4, 5.6.7.8"),
        );
        assert_eq!(extract_client_ip(&headers), Some("1.2.3.4".into()));
    }

    #[test]
    fn extract_client_ip_falls_back_to_real_ip() {
        let mut headers = HeaderMap::new();
        headers.insert("x-real-ip", HeaderValue::from_static("9.8.7.6"));
        assert_eq!(extract_client_ip(&headers), Some("9.8.7.6".into()));
    }

    #[test]
    fn extract_client_ip_returns_none_when_no_headers() {
        let headers = HeaderMap::new();
        assert_eq!(extract_client_ip(&headers), None);
    }

    #[test]
    fn resolve_session_id_parses_valid_uuid() {
        let raw = "550e8400-e29b-41d4-a716-446655440000";
        let expected = Uuid::parse_str(raw).unwrap();
        assert_eq!(resolve_session_id(Some(raw)), Some(expected));
    }

    #[test]
    fn resolve_session_id_returns_none_for_garbage() {
        assert_eq!(resolve_session_id(Some("not-a-uuid")), None);
    }

    #[test]
    fn resolve_session_id_returns_none_for_none() {
        assert_eq!(resolve_session_id(None), None);
    }
}