Skip to main content

systemprompt_api/routes/analytics/
events.rs

1use axum::extract::{Extension, State};
2use axum::http::StatusCode;
3use axum::response::IntoResponse;
4use axum::Json;
5use std::sync::Arc;
6
7use systemprompt_analytics::{
8    AnalyticsEventBatchResponse, AnalyticsEventType, AnalyticsEventsRepository,
9    CreateAnalyticsEventBatchInput, CreateAnalyticsEventInput, CreateEngagementEventInput,
10    EngagementOptionalMetrics, EngagementRepository,
11};
12use systemprompt_content::ContentRepository;
13use systemprompt_identifiers::ContentId;
14use systemprompt_models::api::ApiError;
15use systemprompt_models::execution::context::RequestContext;
16use systemprompt_models::ContentRouting;
17
18#[derive(Clone)]
19pub struct AnalyticsState {
20    pub events: Arc<AnalyticsEventsRepository>,
21    pub content: Arc<ContentRepository>,
22    pub engagement: Arc<EngagementRepository>,
23    pub content_routing: Option<Arc<dyn ContentRouting>>,
24}
25
26impl std::fmt::Debug for AnalyticsState {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("AnalyticsState")
29            .field("content_routing", &self.content_routing.is_some())
30            .finish_non_exhaustive()
31    }
32}
33
34async fn resolve_content_id(
35    content_repo: &ContentRepository,
36    content_routing: Option<&dyn ContentRouting>,
37    page_url: &str,
38    slug: Option<&str>,
39) -> Option<ContentId> {
40    let resolved_slug;
41    let slug_to_use = if let Some(s) = slug {
42        s
43    } else {
44        resolved_slug = content_routing.and_then(|r| r.resolve_slug(page_url))?;
45        &resolved_slug
46    };
47
48    content_repo
49        .get_by_slug(slug_to_use)
50        .await
51        .map_err(|e| {
52            tracing::warn!(error = %e, slug = %slug_to_use, "Failed to lookup content by slug");
53            e
54        })
55        .ok()
56        .flatten()
57        .map(|c| c.id)
58}
59
60pub async fn record_event(
61    State(state): State<AnalyticsState>,
62    Extension(req_ctx): Extension<RequestContext>,
63    Json(mut input): Json<CreateAnalyticsEventInput>,
64) -> Result<impl IntoResponse, ApiError> {
65    if input.content_id.is_none() {
66        input.content_id = resolve_content_id(
67            &state.content,
68            state.content_routing.as_deref(),
69            &input.page_url,
70            input.slug.as_deref(),
71        )
72        .await;
73    }
74
75    let created = state
76        .events
77        .create_event(
78            req_ctx.session_id().as_str(),
79            req_ctx.user_id().as_str(),
80            &input,
81        )
82        .await
83        .map_err(|e| {
84            tracing::error!(error = %e, "Failed to record analytics event");
85            ApiError::internal_error("Failed to record analytics event")
86        })?;
87
88    if input.event_type == AnalyticsEventType::PageExit {
89        fan_out_engagement(&state, &req_ctx, &input).await;
90    }
91
92    Ok((StatusCode::CREATED, Json(created)))
93}
94
95pub async fn record_events_batch(
96    State(state): State<AnalyticsState>,
97    Extension(req_ctx): Extension<RequestContext>,
98    Json(mut input): Json<CreateAnalyticsEventBatchInput>,
99) -> Result<impl IntoResponse, ApiError> {
100    for event in &mut input.events {
101        if event.content_id.is_none() {
102            event.content_id = resolve_content_id(
103                &state.content,
104                state.content_routing.as_deref(),
105                &event.page_url,
106                event.slug.as_deref(),
107            )
108            .await;
109        }
110    }
111
112    let created = state
113        .events
114        .create_events_batch(
115            req_ctx.session_id().as_str(),
116            req_ctx.user_id().as_str(),
117            &input.events,
118        )
119        .await
120        .map_err(|e| {
121            tracing::error!(error = %e, "Failed to record analytics events batch");
122            ApiError::internal_error("Failed to record analytics events")
123        })?;
124
125    for event in &input.events {
126        if event.event_type == AnalyticsEventType::PageExit {
127            fan_out_engagement(&state, &req_ctx, event).await;
128        }
129    }
130
131    Ok((
132        StatusCode::CREATED,
133        Json(AnalyticsEventBatchResponse {
134            recorded: created.len(),
135            events: created,
136        }),
137    ))
138}
139
140async fn fan_out_engagement(
141    state: &AnalyticsState,
142    req_ctx: &RequestContext,
143    input: &CreateAnalyticsEventInput,
144) {
145    let Some(ref data) = input.data else { return };
146
147    let get_i32 =
148        |key: &str| -> Option<i32> { data.get(key).and_then(|v| v.as_i64()).map(|v| v as i32) };
149    let get_f32 =
150        |key: &str| -> Option<f32> { data.get(key).and_then(|v| v.as_f64()).map(|v| v as f32) };
151    let get_bool = |key: &str| -> Option<bool> { data.get(key).and_then(|v| v.as_bool()) };
152    let get_string =
153        |key: &str| -> Option<String> { data.get(key).and_then(|v| v.as_str()).map(String::from) };
154
155    let time_on_page = get_i32("time_on_page_ms").unwrap_or(0);
156    if time_on_page == 0 {
157        return;
158    }
159
160    let engagement_input = CreateEngagementEventInput {
161        page_url: input.page_url.clone(),
162        event_type: input.event_type.as_str().to_string(),
163        time_on_page_ms: time_on_page,
164        max_scroll_depth: get_i32("max_scroll_depth").unwrap_or(0),
165        click_count: get_i32("click_count").unwrap_or(0),
166        optional_metrics: EngagementOptionalMetrics {
167            time_to_first_interaction_ms: get_i32("time_to_first_interaction_ms"),
168            time_to_first_scroll_ms: get_i32("time_to_first_scroll_ms"),
169            scroll_velocity_avg: get_f32("scroll_velocity_avg"),
170            scroll_direction_changes: get_i32("scroll_direction_changes"),
171            mouse_move_distance_px: get_i32("mouse_move_distance_px"),
172            keyboard_events: get_i32("keyboard_events"),
173            copy_events: get_i32("copy_events"),
174            focus_time_ms: get_i32("focus_time_ms"),
175            blur_count: get_i32("blur_count"),
176            tab_switches: get_i32("tab_switches"),
177            visible_time_ms: get_i32("visible_time_ms"),
178            hidden_time_ms: get_i32("hidden_time_ms"),
179            is_rage_click: get_bool("is_rage_click"),
180            is_dead_click: get_bool("is_dead_click"),
181            reading_pattern: get_string("reading_pattern"),
182        },
183    };
184
185    let content_id = resolve_content_id(
186        &state.content,
187        state.content_routing.as_deref(),
188        &input.page_url,
189        input.slug.as_deref(),
190    )
191    .await;
192
193    if let Err(e) = state
194        .engagement
195        .create_engagement(
196            req_ctx.session_id().as_str(),
197            req_ctx.user_id().as_str(),
198            content_id.as_ref(),
199            &engagement_input,
200        )
201        .await
202    {
203        tracing::warn!(error = %e, "Failed to fan out engagement data from page_exit event");
204    }
205}