1use std::sync::Arc;
8use std::time::Duration;
9
10use forge_core::signals::SignalEvent;
11use sqlx::PgPool;
12use tokio::sync::mpsc;
13use tracing::{debug, error, warn};
14
/// Maximum number of events that may sit in the channel between
/// `SignalsCollector` handles and the background flush task.
///
/// When the channel already holds this many events, `SignalsCollector::try_send`
/// drops the new event instead of waiting for space.
const CHANNEL_CAPACITY: usize = 10_000;
17
18#[derive(Clone)]
23pub struct SignalsCollector {
24 tx: mpsc::Sender<SignalEvent>,
25}
26
27impl SignalsCollector {
28 pub fn spawn(pool: Arc<PgPool>, batch_size: usize, flush_interval: Duration) -> Self {
33 let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
34 tokio::spawn(flush_loop(rx, pool, batch_size, flush_interval));
35 Self { tx }
36 }
37
38 pub fn try_send(&self, event: SignalEvent) {
40 if let Err(mpsc::error::TrySendError::Full(_)) = self.tx.try_send(event) {
41 warn!("signals collector channel full, dropping event");
42 }
43 }
44}
45
46async fn flush_loop(
48 mut rx: mpsc::Receiver<SignalEvent>,
49 pool: Arc<PgPool>,
50 batch_size: usize,
51 flush_interval: Duration,
52) {
53 let mut buffer: Vec<SignalEvent> = Vec::with_capacity(batch_size);
54 let mut interval = tokio::time::interval(flush_interval);
55 interval.tick().await;
56
57 loop {
58 tokio::select! {
59 event = rx.recv() => {
60 match event {
61 Some(e) => {
62 buffer.push(e);
63 if buffer.len() >= batch_size {
64 flush_batch(&pool, &mut buffer).await;
65 }
66 }
67 None => {
68 if !buffer.is_empty() {
70 flush_batch(&pool, &mut buffer).await;
71 }
72 debug!("signals collector shutting down");
73 return;
74 }
75 }
76 }
77 _ = interval.tick() => {
78 if !buffer.is_empty() {
79 flush_batch(&pool, &mut buffer).await;
80 }
81 }
82 }
83 }
84}
85
86async fn flush_batch(pool: &PgPool, buffer: &mut Vec<SignalEvent>) {
90 let count = buffer.len();
91
92 let mut ids = Vec::with_capacity(count);
93 let mut event_types = Vec::with_capacity(count);
94 let mut event_names: Vec<Option<String>> = Vec::with_capacity(count);
95 let mut correlation_ids: Vec<Option<String>> = Vec::with_capacity(count);
96 let mut session_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
97 let mut visitor_ids: Vec<Option<String>> = Vec::with_capacity(count);
98 let mut user_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
99 let mut tenant_ids: Vec<Option<uuid::Uuid>> = Vec::with_capacity(count);
100 let mut properties_list: Vec<serde_json::Value> = Vec::with_capacity(count);
101 let mut page_urls: Vec<Option<String>> = Vec::with_capacity(count);
102 let mut referrers: Vec<Option<String>> = Vec::with_capacity(count);
103 let mut function_names: Vec<Option<String>> = Vec::with_capacity(count);
104 let mut function_kinds: Vec<Option<String>> = Vec::with_capacity(count);
105 let mut duration_ms_list: Vec<Option<i32>> = Vec::with_capacity(count);
106 let mut statuses: Vec<Option<String>> = Vec::with_capacity(count);
107 let mut error_messages: Vec<Option<String>> = Vec::with_capacity(count);
108 let mut error_stacks: Vec<Option<String>> = Vec::with_capacity(count);
109 let mut error_contexts: Vec<Option<serde_json::Value>> = Vec::with_capacity(count);
110 let mut client_ips: Vec<Option<String>> = Vec::with_capacity(count);
111 let mut user_agents: Vec<Option<String>> = Vec::with_capacity(count);
112 let mut device_types: Vec<Option<String>> = Vec::with_capacity(count);
113 let mut browsers: Vec<Option<String>> = Vec::with_capacity(count);
114 let mut oses: Vec<Option<String>> = Vec::with_capacity(count);
115 let mut utm_sources: Vec<Option<String>> = Vec::with_capacity(count);
116 let mut utm_mediums: Vec<Option<String>> = Vec::with_capacity(count);
117 let mut utm_campaigns: Vec<Option<String>> = Vec::with_capacity(count);
118 let mut utm_terms: Vec<Option<String>> = Vec::with_capacity(count);
119 let mut utm_contents: Vec<Option<String>> = Vec::with_capacity(count);
120 let mut is_bots: Vec<bool> = Vec::with_capacity(count);
121 let mut timestamps: Vec<chrono::DateTime<chrono::Utc>> = Vec::with_capacity(count);
122
123 for event in buffer.drain(..) {
124 ids.push(uuid::Uuid::new_v4());
125 event_types.push(event.event_type.to_string());
126 event_names.push(event.event_name);
127 correlation_ids.push(event.correlation_id);
128 session_ids.push(event.session_id);
129 visitor_ids.push(event.visitor_id);
130 user_ids.push(event.user_id);
131 tenant_ids.push(event.tenant_id);
132 properties_list.push(event.properties);
133 page_urls.push(event.page_url);
134 referrers.push(event.referrer);
135 function_names.push(event.function_name);
136 function_kinds.push(event.function_kind);
137 duration_ms_list.push(event.duration_ms);
138 statuses.push(event.status);
139 error_messages.push(event.error_message);
140 error_stacks.push(event.error_stack);
141 error_contexts.push(event.error_context);
142 client_ips.push(event.client_ip);
143 user_agents.push(event.user_agent);
144 device_types.push(event.device_type);
145 browsers.push(event.browser);
146 oses.push(event.os);
147 let (src, med, camp, term, content) = match event.utm {
148 Some(utm) => (utm.source, utm.medium, utm.campaign, utm.term, utm.content),
149 None => (None, None, None, None, None),
150 };
151 utm_sources.push(src);
152 utm_mediums.push(med);
153 utm_campaigns.push(camp);
154 utm_terms.push(term);
155 utm_contents.push(content);
156 is_bots.push(event.is_bot);
157 timestamps.push(event.timestamp);
158 }
159
160 let result = sqlx::query(
161 "INSERT INTO forge_signals_events (
162 id, event_type, event_name, correlation_id,
163 session_id, visitor_id, user_id, tenant_id,
164 properties, page_url, referrer,
165 function_name, function_kind, duration_ms, status,
166 error_message, error_stack, error_context,
167 client_ip, user_agent,
168 device_type, browser, os,
169 utm_source, utm_medium, utm_campaign, utm_term, utm_content,
170 is_bot, timestamp
171 )
172 SELECT * FROM UNNEST(
173 $1::uuid[], $2::varchar[], $3::varchar[], $4::varchar[],
174 $5::uuid[], $6::varchar[], $7::uuid[], $8::uuid[],
175 $9::jsonb[], $10::text[], $11::text[],
176 $12::varchar[], $13::varchar[], $14::int[], $15::varchar[],
177 $16::text[], $17::text[], $18::jsonb[],
178 $19::text[], $20::text[],
179 $21::varchar[], $22::varchar[], $23::varchar[],
180 $24::varchar[], $25::varchar[], $26::varchar[], $27::varchar[], $28::varchar[],
181 $29::bool[], $30::timestamptz[]
182 )",
183 )
184 .bind(&ids)
185 .bind(&event_types)
186 .bind(&event_names)
187 .bind(&correlation_ids)
188 .bind(&session_ids)
189 .bind(&visitor_ids)
190 .bind(&user_ids)
191 .bind(&tenant_ids)
192 .bind(&properties_list)
193 .bind(&page_urls)
194 .bind(&referrers)
195 .bind(&function_names)
196 .bind(&function_kinds)
197 .bind(&duration_ms_list)
198 .bind(&statuses)
199 .bind(&error_messages)
200 .bind(&error_stacks)
201 .bind(&error_contexts)
202 .bind(&client_ips)
203 .bind(&user_agents)
204 .bind(&device_types)
205 .bind(&browsers)
206 .bind(&oses)
207 .bind(&utm_sources)
208 .bind(&utm_mediums)
209 .bind(&utm_campaigns)
210 .bind(&utm_terms)
211 .bind(&utm_contents)
212 .bind(&is_bots)
213 .bind(×tamps)
214 .execute(pool)
215 .await;
216
217 match result {
218 Ok(_) => debug!(count, "flushed signal events"),
219 Err(e) => error!(count, error = %e, "failed to flush signal events"),
220 }
221}