vibesql_server/subscription/manager/query.rs
1//! Query execution and retry logic for subscriptions.
2
3use std::sync::atomic::Ordering;
4
5use tracing::{debug, trace, warn};
6use vibesql_storage::Database;
7
8use super::SubscriptionManager;
9use crate::subscription::{
10 classify_error_str, compute_delta_with_pk, hash_rows, PartialRowDelta, Subscription,
11 SubscriptionError, SubscriptionErrorKind, SubscriptionId, SubscriptionUpdate,
12};
13
14impl SubscriptionManager {
15 /// Execute query with retry logic for transient errors
16 pub(crate) async fn execute_with_retry(
17 &self,
18 subscription: &mut Subscription,
19 db: &Database,
20 id: SubscriptionId,
21 ) {
22 loop {
23 // Parse and execute the query
24 let result = self.execute_subscription_query(subscription, db, id).await;
25
26 match result {
27 Ok(rows) => {
28 // Successful execution - reset retry count
29 subscription.retry_count = 0;
30
31 // Convert to Row format
32 let result_rows: Vec<crate::Row> =
33 rows.iter().map(|r| crate::Row { values: r.values.to_vec() }).collect();
34
35 // Hash results for comparison
36 let new_hash = hash_rows(&result_rows);
37
38 if new_hash != subscription.last_result_hash {
39 debug!(
40 subscription_id = %id,
41 old_hash = subscription.last_result_hash,
42 new_hash = new_hash,
43 row_count = result_rows.len(),
44 "Results changed, notifying subscriber"
45 );
46
47 // Determine whether to send Delta, Partial, or Full update
48 let update = if let Some(ref old_rows) = subscription.last_result {
49 // We have previous results - compute delta using PK columns
50 if let Some(delta) = compute_delta_with_pk(
51 id,
52 old_rows,
53 &result_rows,
54 &subscription.pk_columns,
55 ) {
56 // Check if we can use Partial updates (selective column updates)
57 // Conditions:
58 // 1. Subscription is selective_eligible (confident PK detection)
59 // 2. Delta has only updates (no inserts or deletes)
60 // 3. Updates exist
61 if let SubscriptionUpdate::Delta {
62 ref inserts,
63 ref updates,
64 ref deletes,
65 ..
66 } = delta
67 {
68 if subscription.selective_eligible
69 && inserts.is_empty()
70 && deletes.is_empty()
71 && !updates.is_empty()
72 {
73 // Convert to Partial updates
74 let partial_updates: Vec<PartialRowDelta> = updates
75 .iter()
76 .filter_map(|(old_row, new_row)| {
77 PartialRowDelta::from_rows(
78 old_row,
79 new_row,
80 &subscription.pk_columns,
81 )
82 })
83 .collect();
84
85 if !partial_updates.is_empty() {
86 debug!(
87 subscription_id = %id,
88 partial_updates = partial_updates.len(),
89 "Sending partial update (selective columns)"
90 );
91 SubscriptionUpdate::Partial {
92 subscription_id: id,
93 updates: partial_updates,
94 }
95 } else {
96 // Fall back to delta if partial conversion failed
97 debug!(
98 subscription_id = %id,
99 updates = updates.len(),
100 "Sending delta update (partial conversion failed)"
101 );
102 delta
103 }
104 } else {
105 // Log delta statistics and send as-is
106 debug!(
107 subscription_id = %id,
108 inserts = inserts.len(),
109 updates = updates.len(),
110 deletes = deletes.len(),
111 selective_eligible = subscription.selective_eligible,
112 "Sending delta update"
113 );
114 delta
115 }
116 } else {
117 delta
118 }
119 } else {
120 // No delta (shouldn't happen if hash changed, but be safe)
121 SubscriptionUpdate::Full {
122 subscription_id: id,
123 rows: result_rows.clone(),
124 }
125 }
126 } else {
127 // No previous results - send full (first update after initial)
128 debug!(
129 subscription_id = %id,
130 "No previous result, sending full update"
131 );
132 SubscriptionUpdate::Full {
133 subscription_id: id,
134 rows: result_rows.clone(),
135 }
136 };
137
138 // Update stored state
139 subscription.last_result_hash = new_hash;
140 subscription.last_result = Some(result_rows);
141
142 // Check for slow consumer before sending
143 let capacity = subscription.notify_tx.capacity();
144 let max_capacity = subscription.notify_tx.max_capacity();
145 let used = max_capacity.saturating_sub(capacity);
146 let usage_percent =
147 if max_capacity > 0 { (used * 100) / max_capacity } else { 0 };
148
149 if usage_percent >= subscription.slow_consumer_threshold_percent as usize {
150 warn!(
151 subscription_id = %id,
152 used = used,
153 max_capacity = max_capacity,
154 usage_percent = usage_percent,
155 threshold = subscription.slow_consumer_threshold_percent,
156 "Slow consumer detected: subscription channel is {}% full. \
157 Consider increasing channel_buffer_size or client is consuming too slowly.",
158 usage_percent
159 );
160 }
161
162 // Use try_send for non-blocking send with backpressure detection
163 match subscription.notify_tx.try_send(update) {
164 Ok(()) => {
165 subscription.updates_sent += 1;
166 trace!(
167 subscription_id = %id,
168 updates_sent = subscription.updates_sent,
169 "Update sent successfully"
170 );
171 }
172 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
173 subscription.updates_dropped += 1;
174 warn!(
175 subscription_id = %id,
176 updates_dropped = subscription.updates_dropped,
177 channel_buffer_size = subscription.channel_buffer_size,
178 "Subscription channel full, dropping update. \
179 Consider increasing channel_buffer_size in SubscriptionConfig \
180 or ensure client is consuming updates faster."
181 );
182 }
183 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
184 trace!(
185 subscription_id = %id,
186 "Notification channel closed, subscription will be cleaned up"
187 );
188 }
189 }
190 } else {
191 trace!(
192 subscription_id = %id,
193 "Results unchanged, no notification needed"
194 );
195 }
196 return;
197 }
198 Err(error_msg) => {
199 // Classify the error to determine retry strategy
200 let error_kind = classify_error_str(&error_msg);
201
202 match error_kind {
203 SubscriptionErrorKind::Permanent => {
204 // Permanent error - don't retry, notify subscriber and stop
205 debug!(
206 subscription_id = %id,
207 error = %error_msg,
208 "Permanent error, not retrying"
209 );
210 let _ = subscription
211 .notify_tx
212 .send(SubscriptionUpdate::Error {
213 subscription_id: id,
214 message: format!(
215 "Query execution failed: {} (error will not be retried)",
216 error_msg
217 ),
218 })
219 .await;
220 return;
221 }
222 SubscriptionErrorKind::Transient | SubscriptionErrorKind::Unknown => {
223 // Transient error - may retry
224 subscription.retry_count += 1;
225
226 if subscription.retry_count > subscription.retry_policy.max_retries {
227 // Exceeded max retries - circuit breaker
228 warn!(
229 subscription_id = %id,
230 retry_count = subscription.retry_count,
231 max_retries = subscription.retry_policy.max_retries,
232 error = %error_msg,
233 "Subscription failed after max retries"
234 );
235 let _ = subscription
236 .notify_tx
237 .send(SubscriptionUpdate::Error {
238 subscription_id: id,
239 message: format!(
240 "Subscription failed after {} retries: {}",
241 subscription.retry_policy.max_retries, error_msg
242 ),
243 })
244 .await;
245 return;
246 }
247
248 // Calculate backoff and retry
249 let backoff = subscription
250 .retry_policy
251 .calculate_backoff(subscription.retry_count - 1);
252
253 warn!(
254 subscription_id = %id,
255 retry_attempt = subscription.retry_count,
256 backoff_ms = backoff.as_millis(),
257 error_kind = %error_kind,
258 error = %error_msg,
259 "Retrying subscription query after transient error"
260 );
261
262 tokio::time::sleep(backoff).await;
263 // Loop continues to retry
264 }
265 }
266 }
267 }
268 }
269 }
270
271 /// Execute the subscription query and return rows or error message
272 pub(crate) async fn execute_subscription_query(
273 &self,
274 subscription: &Subscription,
275 db: &Database,
276 id: SubscriptionId,
277 ) -> Result<Vec<vibesql_storage::Row>, String> {
278 // Re-execute the query
279 let executor = vibesql_executor::SelectExecutor::new(db);
280
281 // Parse and execute the query
282 match vibesql_parser::Parser::parse_sql(&subscription.query) {
283 Ok(vibesql_ast::Statement::Select(select)) => {
284 executor.execute(&select).map_err(|e| e.to_string())
285 }
286 Ok(_) => {
287 // Not a SELECT - shouldn't happen for subscriptions
288 warn!(
289 subscription_id = %id,
290 "Subscription query is not a SELECT"
291 );
292 Err("Subscription query is not a SELECT".to_string())
293 }
294 Err(e) => Err(format!("Failed to parse query: {}", e)),
295 }
296 }
297
298 /// Send initial results to a new subscriber
299 ///
300 /// Executes the query and sends the initial results. This should be called
301 /// right after subscribing to provide immediate data. The initial results
302 /// are always sent as a Full update.
303 ///
304 /// # Errors
305 ///
306 /// - `NotFound` if the subscription doesn't exist
307 /// - `ParseError` if the query fails to execute
308 /// - `ResultSetTooLarge` if the result set exceeds the configured limit
309 /// - `ChannelClosed` if the notification channel is closed
310 pub async fn send_initial_results(
311 &self,
312 id: SubscriptionId,
313 db: &Database,
314 ) -> Result<(), SubscriptionError> {
315 let mut sub_ref =
316 self.subscriptions.get_mut(&id).ok_or(SubscriptionError::NotFound(id))?;
317
318 let subscription = sub_ref.value_mut();
319
320 // Execute the query
321 let executor = vibesql_executor::SelectExecutor::new(db);
322 let stmt = vibesql_parser::Parser::parse_sql(&subscription.query)
323 .map_err(|e| SubscriptionError::ParseError(e.to_string()))?;
324
325 let rows = match stmt {
326 vibesql_ast::Statement::Select(select) => executor
327 .execute(&select)
328 .map_err(|e| SubscriptionError::ParseError(e.to_string()))?,
329 _ => return Err(SubscriptionError::ParseError("Not a SELECT query".to_string())),
330 };
331
332 // Check result set size limit
333 if rows.len() > self.config.max_result_rows {
334 self.result_set_exceeded_count.fetch_add(1, Ordering::Relaxed);
335 return Err(SubscriptionError::ResultSetTooLarge {
336 rows: rows.len(),
337 max: self.config.max_result_rows,
338 });
339 }
340
341 // Convert to Row format
342 let result_rows: Vec<crate::Row> =
343 rows.iter().map(|r| crate::Row { values: r.values.to_vec() }).collect();
344
345 // Update hash and store result for delta computation
346 subscription.last_result_hash = hash_rows(&result_rows);
347 subscription.last_result = Some(result_rows.clone());
348
349 // Send initial results (always Full for initial)
350 subscription
351 .notify_tx
352 .send(SubscriptionUpdate::Full { subscription_id: id, rows: result_rows })
353 .await
354 .map_err(|_| SubscriptionError::ChannelClosed)?;
355
356 Ok(())
357 }
358}