Skip to main content

forge_runtime/signals/
collector.rs

//! Buffered batch event writer for the signals pipeline.
//!
//! Events are sent over a bounded mpsc channel and flushed in batches using
//! PostgreSQL `UNNEST` for a high-throughput, single-round-trip INSERT. Uses
//! the analytics connection pool to avoid contention with user queries.
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use forge_core::signals::SignalEvent;
11use sqlx::PgPool;
12use tokio::sync::mpsc;
13use tracing::{debug, error, warn};
14
/// Number of events the channel can hold; once it is full, newly submitted
/// events are dropped rather than blocking the sender.
const CHANNEL_CAPACITY: usize = 10_000;
17
18/// Buffered signal event collector.
19///
20/// Clone-friendly (shares the mpsc sender). Send events from any async
21/// context via [`try_send`] which never blocks the caller.
22#[derive(Clone)]
23pub struct SignalsCollector {
24    tx: mpsc::Sender<SignalEvent>,
25}
26
27impl SignalsCollector {
28    /// Create a new collector and spawn the background flush task.
29    ///
30    /// Returns the collector handle. The flush task runs until the last
31    /// sender is dropped (i.e., all collector clones are gone).
32    pub fn spawn(pool: Arc<PgPool>, batch_size: usize, flush_interval: Duration) -> Self {
33        let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
34        tokio::spawn(flush_loop(rx, pool, batch_size, flush_interval));
35        Self { tx }
36    }
37
38    /// Send an event without blocking. Drops the event if the channel is full.
39    pub fn try_send(&self, event: SignalEvent) {
40        if let Err(mpsc::error::TrySendError::Full(_)) = self.tx.try_send(event) {
41            warn!("signals collector channel full, dropping event");
42        }
43    }
44}
45
46/// Background loop that drains the channel and performs batch inserts.
47async fn flush_loop(
48    mut rx: mpsc::Receiver<SignalEvent>,
49    pool: Arc<PgPool>,
50    batch_size: usize,
51    flush_interval: Duration,
52) {
53    let mut buffer: Vec<SignalEvent> = Vec::with_capacity(batch_size);
54    let mut interval = tokio::time::interval(flush_interval);
55    interval.tick().await;
56
57    loop {
58        tokio::select! {
59            event = rx.recv() => {
60                match event {
61                    Some(e) => {
62                        buffer.push(e);
63                        if buffer.len() >= batch_size {
64                            flush_batch(&pool, &mut buffer).await;
65                        }
66                    }
67                    None => {
68                        // Channel closed, flush remaining and exit
69                        if !buffer.is_empty() {
70                            flush_batch(&pool, &mut buffer).await;
71                        }
72                        debug!("signals collector shutting down");
73                        return;
74                    }
75                }
76            }
77            _ = interval.tick() => {
78                if !buffer.is_empty() {
79                    flush_batch(&pool, &mut buffer).await;
80                }
81            }
82        }
83    }
84}
85
86/// Flush a batch of events into PostgreSQL using UNNEST for single-roundtrip INSERT.
87/// Uses runtime sqlx::query() because UNNEST with typed arrays is not supported by
88/// the compile-time sqlx::query!() macro.
89async fn flush_batch(pool: &PgPool, buffer: &mut Vec<SignalEvent>) {
90    let count = buffer.len();
91
92    let mut ids = Vec::with_capacity(count);
93    let mut event_types = Vec::with_capacity(count);
94    let mut event_names: Vec<Option<String>> = Vec::with_capacity(count);
95    let mut correlation_ids: Vec<Option<String>> = Vec::with_capacity(count);
96    let mut session_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
97    let mut visitor_ids: Vec<Option<String>> = Vec::with_capacity(count);
98    let mut user_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
99    let mut tenant_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
100    let mut properties_list: Vec<serde_json::Value> = Vec::with_capacity(count);
101    let mut page_urls: Vec<Option<String>> = Vec::with_capacity(count);
102    let mut referrers: Vec<Option<String>> = Vec::with_capacity(count);
103    let mut function_names: Vec<Option<String>> = Vec::with_capacity(count);
104    let mut function_kinds: Vec<Option<String>> = Vec::with_capacity(count);
105    let mut duration_ms_list: Vec<Option<i32>> = Vec::with_capacity(count);
106    let mut statuses: Vec<Option<String>> = Vec::with_capacity(count);
107    let mut error_messages: Vec<Option<String>> = Vec::with_capacity(count);
108    let mut error_stacks: Vec<Option<String>> = Vec::with_capacity(count);
109    let mut error_contexts: Vec<Option<serde_json::Value>> = Vec::with_capacity(count);
110    let mut client_ips: Vec<Option<String>> = Vec::with_capacity(count);
111    let mut user_agents: Vec<Option<String>> = Vec::with_capacity(count);
112    let mut device_types: Vec<Option<String>> = Vec::with_capacity(count);
113    let mut browsers: Vec<Option<String>> = Vec::with_capacity(count);
114    let mut oses: Vec<Option<String>> = Vec::with_capacity(count);
115    let mut utm_sources: Vec<Option<String>> = Vec::with_capacity(count);
116    let mut utm_mediums: Vec<Option<String>> = Vec::with_capacity(count);
117    let mut utm_campaigns: Vec<Option<String>> = Vec::with_capacity(count);
118    let mut utm_terms: Vec<Option<String>> = Vec::with_capacity(count);
119    let mut utm_contents: Vec<Option<String>> = Vec::with_capacity(count);
120    let mut is_bots: Vec<bool> = Vec::with_capacity(count);
121    let mut timestamps: Vec<chrono::DateTime<chrono::Utc>> = Vec::with_capacity(count);
122
123    for event in buffer.drain(..) {
124        ids.push(uuid::Uuid::new_v4());
125        event_types.push(event.event_type.to_string());
126        event_names.push(event.event_name);
127        correlation_ids.push(event.correlation_id);
128        session_ids.push(event.session_id);
129        visitor_ids.push(event.visitor_id);
130        user_ids.push(event.user_id);
131        tenant_ids.push(event.tenant_id);
132        properties_list.push(event.properties);
133        page_urls.push(event.page_url);
134        referrers.push(event.referrer);
135        function_names.push(event.function_name);
136        function_kinds.push(event.function_kind);
137        duration_ms_list.push(event.duration_ms);
138        statuses.push(event.status);
139        error_messages.push(event.error_message);
140        error_stacks.push(event.error_stack);
141        error_contexts.push(event.error_context);
142        client_ips.push(event.client_ip);
143        user_agents.push(event.user_agent);
144        device_types.push(event.device_type);
145        browsers.push(event.browser);
146        oses.push(event.os);
147        let (src, med, camp, term, content) = match event.utm {
148            Some(utm) => (utm.source, utm.medium, utm.campaign, utm.term, utm.content),
149            None => (None, None, None, None, None),
150        };
151        utm_sources.push(src);
152        utm_mediums.push(med);
153        utm_campaigns.push(camp);
154        utm_terms.push(term);
155        utm_contents.push(content);
156        is_bots.push(event.is_bot);
157        timestamps.push(event.timestamp);
158    }
159
160    let result = sqlx::query(
161        "INSERT INTO forge_signals_events (
162            id, event_type, event_name, correlation_id,
163            session_id, visitor_id, user_id, tenant_id,
164            properties, page_url, referrer,
165            function_name, function_kind, duration_ms, status,
166            error_message, error_stack, error_context,
167            client_ip, user_agent,
168            device_type, browser, os,
169            utm_source, utm_medium, utm_campaign, utm_term, utm_content,
170            is_bot, timestamp
171        )
172        SELECT * FROM UNNEST(
173            $1::uuid[], $2::varchar[], $3::varchar[], $4::varchar[],
174            $5::uuid[], $6::varchar[], $7::uuid[], $8::uuid[],
175            $9::jsonb[], $10::text[], $11::text[],
176            $12::varchar[], $13::varchar[], $14::int[], $15::varchar[],
177            $16::text[], $17::text[], $18::jsonb[],
178            $19::text[], $20::text[],
179            $21::varchar[], $22::varchar[], $23::varchar[],
180            $24::varchar[], $25::varchar[], $26::varchar[], $27::varchar[], $28::varchar[],
181            $29::bool[], $30::timestamptz[]
182        )",
183    )
184    .bind(&ids)
185    .bind(&event_types)
186    .bind(&event_names)
187    .bind(&correlation_ids)
188    .bind(&session_ids)
189    .bind(&visitor_ids)
190    .bind(&user_ids)
191    .bind(&tenant_ids)
192    .bind(&properties_list)
193    .bind(&page_urls)
194    .bind(&referrers)
195    .bind(&function_names)
196    .bind(&function_kinds)
197    .bind(&duration_ms_list)
198    .bind(&statuses)
199    .bind(&error_messages)
200    .bind(&error_stacks)
201    .bind(&error_contexts)
202    .bind(&client_ips)
203    .bind(&user_agents)
204    .bind(&device_types)
205    .bind(&browsers)
206    .bind(&oses)
207    .bind(&utm_sources)
208    .bind(&utm_mediums)
209    .bind(&utm_campaigns)
210    .bind(&utm_terms)
211    .bind(&utm_contents)
212    .bind(&is_bots)
213    .bind(&timestamps)
214    .execute(pool)
215    .await;
216
217    match result {
218        Ok(_) => debug!(count, "flushed signal events"),
219        Err(e) => error!(count, error = %e, "failed to flush signal events"),
220    }
221}