Skip to main content

systemprompt_api/routes/analytics/
events.rs

1//! Analytics event ingestion routes.
2//!
3//! Records single and batched client analytics events, resolving the owning
4//! content id via slug/URL routing and fanning `page_exit` events out into the
5//! engagement store. Shared handler state lives in [`AnalyticsState`].
6
7use axum::Json;
8use axum::extract::{Extension, State};
9use axum::http::StatusCode;
10use axum::response::IntoResponse;
11use std::sync::Arc;
12
13use systemprompt_analytics::{
14    AnalyticsEventBatchResponse, AnalyticsEventType, AnalyticsEventsRepository,
15    CreateAnalyticsEventBatchInput, CreateAnalyticsEventInput, CreateEngagementEventInput,
16    EngagementOptionalMetrics, EngagementRepository,
17};
18use systemprompt_content::ContentRepository;
19use systemprompt_identifiers::{ContentId, LocaleCode};
20use systemprompt_models::ContentRouting;
21use systemprompt_models::api::ApiError;
22use systemprompt_models::execution::context::RequestContext;
23
24#[derive(Clone)]
25pub struct AnalyticsState {
26    pub events: Arc<AnalyticsEventsRepository>,
27    pub content: Arc<ContentRepository>,
28    pub engagement: Arc<EngagementRepository>,
29    pub content_routing: Option<Arc<dyn ContentRouting>>,
30}
31
32impl std::fmt::Debug for AnalyticsState {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("AnalyticsState")
35            .field("content_routing", &self.content_routing.is_some())
36            .finish_non_exhaustive()
37    }
38}
39
40async fn resolve_content_id(
41    content_repo: &ContentRepository,
42    content_routing: Option<&dyn ContentRouting>,
43    page_url: &str,
44    slug: Option<&str>,
45) -> Option<ContentId> {
46    let resolved_slug;
47    let slug_to_use = if let Some(s) = slug {
48        s
49    } else {
50        resolved_slug = content_routing.and_then(|r| r.resolve_slug(page_url))?;
51        &resolved_slug
52    };
53
54    content_repo
55        .get_by_slug(slug_to_use, &LocaleCode::new("en"))
56        .await
57        .map_err(|e| {
58            tracing::warn!(error = %e, slug = %slug_to_use, "Failed to lookup content by slug");
59            e
60        })
61        .ok()
62        .flatten()
63        .map(|c| c.id)
64}
65
66pub(super) async fn record_event(
67    State(state): State<AnalyticsState>,
68    Extension(req_ctx): Extension<RequestContext>,
69    Json(mut input): Json<CreateAnalyticsEventInput>,
70) -> Result<impl IntoResponse, ApiError> {
71    if input.content_id.is_none() {
72        input.content_id = resolve_content_id(
73            &state.content,
74            state.content_routing.as_deref(),
75            &input.page_url,
76            input.slug.as_deref(),
77        )
78        .await;
79    }
80
81    let created = state
82        .events
83        .create_event(req_ctx.session_id(), req_ctx.user_id(), &input)
84        .await
85        .map_err(|e| {
86            tracing::error!(error = %e, "Failed to record analytics event");
87            ApiError::internal_error("Failed to record analytics event")
88        })?;
89
90    if input.event_type == AnalyticsEventType::PageExit {
91        fan_out_engagement(&state, &req_ctx, &input).await;
92    }
93
94    Ok((StatusCode::CREATED, Json(created)))
95}
96
97pub(super) async fn record_events_batch(
98    State(state): State<AnalyticsState>,
99    Extension(req_ctx): Extension<RequestContext>,
100    Json(mut input): Json<CreateAnalyticsEventBatchInput>,
101) -> Result<impl IntoResponse, ApiError> {
102    for event in &mut input.events {
103        if event.content_id.is_none() {
104            event.content_id = resolve_content_id(
105                &state.content,
106                state.content_routing.as_deref(),
107                &event.page_url,
108                event.slug.as_deref(),
109            )
110            .await;
111        }
112    }
113
114    let created = state
115        .events
116        .create_events_batch(req_ctx.session_id(), req_ctx.user_id(), &input.events)
117        .await
118        .map_err(|e| {
119            tracing::error!(error = %e, "Failed to record analytics events batch");
120            ApiError::internal_error("Failed to record analytics events")
121        })?;
122
123    for event in &input.events {
124        if event.event_type == AnalyticsEventType::PageExit {
125            fan_out_engagement(&state, &req_ctx, event).await;
126        }
127    }
128
129    Ok((
130        StatusCode::CREATED,
131        Json(AnalyticsEventBatchResponse {
132            recorded: created.len(),
133            events: created,
134        }),
135    ))
136}
137
138async fn fan_out_engagement(
139    state: &AnalyticsState,
140    req_ctx: &RequestContext,
141    input: &CreateAnalyticsEventInput,
142) {
143    let Some(ref data) = input.data else { return };
144
145    let get_i32 = |key: &str| -> Option<i32> {
146        data.get(key)
147            .and_then(serde_json::Value::as_i64)
148            .map(|v| v as i32)
149    };
150    let get_f32 = |key: &str| -> Option<f32> {
151        data.get(key)
152            .and_then(serde_json::Value::as_f64)
153            .map(|v| v as f32)
154    };
155    let get_bool =
156        |key: &str| -> Option<bool> { data.get(key).and_then(serde_json::Value::as_bool) };
157    let get_string =
158        |key: &str| -> Option<String> { data.get(key).and_then(|v| v.as_str()).map(String::from) };
159
160    let time_on_page = get_i32("time_on_page_ms").unwrap_or(0);
161    if time_on_page == 0 {
162        return;
163    }
164
165    let engagement_input = CreateEngagementEventInput {
166        page_url: input.page_url.clone(),
167        event_type: input.event_type.as_str().to_owned(),
168        time_on_page_ms: time_on_page,
169        max_scroll_depth: get_i32("max_scroll_depth").unwrap_or(0),
170        click_count: get_i32("click_count").unwrap_or(0),
171        event_data: None,
172        optional_metrics: EngagementOptionalMetrics {
173            time_to_first_interaction_ms: get_i32("time_to_first_interaction_ms"),
174            time_to_first_scroll_ms: get_i32("time_to_first_scroll_ms"),
175            scroll_velocity_avg: get_f32("scroll_velocity_avg"),
176            scroll_direction_changes: get_i32("scroll_direction_changes"),
177            mouse_move_distance_px: get_i32("mouse_move_distance_px"),
178            keyboard_events: get_i32("keyboard_events"),
179            copy_events: get_i32("copy_events"),
180            focus_time_ms: get_i32("focus_time_ms"),
181            blur_count: get_i32("blur_count"),
182            tab_switches: get_i32("tab_switches"),
183            visible_time_ms: get_i32("visible_time_ms"),
184            hidden_time_ms: get_i32("hidden_time_ms"),
185            is_rage_click: get_bool("is_rage_click"),
186            is_dead_click: get_bool("is_dead_click"),
187            reading_pattern: get_string("reading_pattern"),
188        },
189    };
190
191    let content_id = resolve_content_id(
192        &state.content,
193        state.content_routing.as_deref(),
194        &input.page_url,
195        input.slug.as_deref(),
196    )
197    .await;
198
199    if let Err(e) = state
200        .engagement
201        .create_engagement(
202            req_ctx.session_id(),
203            req_ctx.user_id(),
204            content_id.as_ref(),
205            &engagement_input,
206        )
207        .await
208    {
209        tracing::warn!(error = %e, "Failed to fan out engagement data from page_exit event");
210    }
211}