Skip to main content

vibesql_server/subscription/manager/
query.rs

1//! Query execution and retry logic for subscriptions.
2
3use std::sync::atomic::Ordering;
4
5use tracing::{debug, trace, warn};
6use vibesql_storage::Database;
7
8use super::SubscriptionManager;
9use crate::subscription::{
10    classify_error_str, compute_delta_with_pk, hash_rows, PartialRowDelta, Subscription,
11    SubscriptionError, SubscriptionErrorKind, SubscriptionId, SubscriptionUpdate,
12};
13
14impl SubscriptionManager {
15    /// Execute query with retry logic for transient errors
16    pub(crate) async fn execute_with_retry(
17        &self,
18        subscription: &mut Subscription,
19        db: &Database,
20        id: SubscriptionId,
21    ) {
22        loop {
23            // Parse and execute the query
24            let result = self.execute_subscription_query(subscription, db, id).await;
25
26            match result {
27                Ok(rows) => {
28                    // Successful execution - reset retry count
29                    subscription.retry_count = 0;
30
31                    // Convert to Row format
32                    let result_rows: Vec<crate::Row> =
33                        rows.iter().map(|r| crate::Row { values: r.values.to_vec() }).collect();
34
35                    // Hash results for comparison
36                    let new_hash = hash_rows(&result_rows);
37
38                    if new_hash != subscription.last_result_hash {
39                        debug!(
40                            subscription_id = %id,
41                            old_hash = subscription.last_result_hash,
42                            new_hash = new_hash,
43                            row_count = result_rows.len(),
44                            "Results changed, notifying subscriber"
45                        );
46
47                        // Determine whether to send Delta, Partial, or Full update
48                        let update = if let Some(ref old_rows) = subscription.last_result {
49                            // We have previous results - compute delta using PK columns
50                            if let Some(delta) = compute_delta_with_pk(
51                                id,
52                                old_rows,
53                                &result_rows,
54                                &subscription.pk_columns,
55                            ) {
56                                // Check if we can use Partial updates (selective column updates)
57                                // Conditions:
58                                // 1. Subscription is selective_eligible (confident PK detection)
59                                // 2. Delta has only updates (no inserts or deletes)
60                                // 3. Updates exist
61                                if let SubscriptionUpdate::Delta {
62                                    ref inserts,
63                                    ref updates,
64                                    ref deletes,
65                                    ..
66                                } = delta
67                                {
68                                    if subscription.selective_eligible
69                                        && inserts.is_empty()
70                                        && deletes.is_empty()
71                                        && !updates.is_empty()
72                                    {
73                                        // Convert to Partial updates
74                                        let partial_updates: Vec<PartialRowDelta> = updates
75                                            .iter()
76                                            .filter_map(|(old_row, new_row)| {
77                                                PartialRowDelta::from_rows(
78                                                    old_row,
79                                                    new_row,
80                                                    &subscription.pk_columns,
81                                                )
82                                            })
83                                            .collect();
84
85                                        if !partial_updates.is_empty() {
86                                            debug!(
87                                                subscription_id = %id,
88                                                partial_updates = partial_updates.len(),
89                                                "Sending partial update (selective columns)"
90                                            );
91                                            SubscriptionUpdate::Partial {
92                                                subscription_id: id,
93                                                updates: partial_updates,
94                                            }
95                                        } else {
96                                            // Fall back to delta if partial conversion failed
97                                            debug!(
98                                                subscription_id = %id,
99                                                updates = updates.len(),
100                                                "Sending delta update (partial conversion failed)"
101                                            );
102                                            delta
103                                        }
104                                    } else {
105                                        // Log delta statistics and send as-is
106                                        debug!(
107                                            subscription_id = %id,
108                                            inserts = inserts.len(),
109                                            updates = updates.len(),
110                                            deletes = deletes.len(),
111                                            selective_eligible = subscription.selective_eligible,
112                                            "Sending delta update"
113                                        );
114                                        delta
115                                    }
116                                } else {
117                                    delta
118                                }
119                            } else {
120                                // No delta (shouldn't happen if hash changed, but be safe)
121                                SubscriptionUpdate::Full {
122                                    subscription_id: id,
123                                    rows: result_rows.clone(),
124                                }
125                            }
126                        } else {
127                            // No previous results - send full (first update after initial)
128                            debug!(
129                                subscription_id = %id,
130                                "No previous result, sending full update"
131                            );
132                            SubscriptionUpdate::Full {
133                                subscription_id: id,
134                                rows: result_rows.clone(),
135                            }
136                        };
137
138                        // Update stored state
139                        subscription.last_result_hash = new_hash;
140                        subscription.last_result = Some(result_rows);
141
142                        // Check for slow consumer before sending
143                        let capacity = subscription.notify_tx.capacity();
144                        let max_capacity = subscription.notify_tx.max_capacity();
145                        let used = max_capacity.saturating_sub(capacity);
146                        let usage_percent =
147                            if max_capacity > 0 { (used * 100) / max_capacity } else { 0 };
148
149                        if usage_percent >= subscription.slow_consumer_threshold_percent as usize {
150                            warn!(
151                                subscription_id = %id,
152                                used = used,
153                                max_capacity = max_capacity,
154                                usage_percent = usage_percent,
155                                threshold = subscription.slow_consumer_threshold_percent,
156                                "Slow consumer detected: subscription channel is {}% full. \
157                                 Consider increasing channel_buffer_size or client is consuming too slowly.",
158                                usage_percent
159                            );
160                        }
161
162                        // Use try_send for non-blocking send with backpressure detection
163                        match subscription.notify_tx.try_send(update) {
164                            Ok(()) => {
165                                subscription.updates_sent += 1;
166                                trace!(
167                                    subscription_id = %id,
168                                    updates_sent = subscription.updates_sent,
169                                    "Update sent successfully"
170                                );
171                            }
172                            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
173                                subscription.updates_dropped += 1;
174                                warn!(
175                                    subscription_id = %id,
176                                    updates_dropped = subscription.updates_dropped,
177                                    channel_buffer_size = subscription.channel_buffer_size,
178                                    "Subscription channel full, dropping update. \
179                                     Consider increasing channel_buffer_size in SubscriptionConfig \
180                                     or ensure client is consuming updates faster."
181                                );
182                            }
183                            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
184                                trace!(
185                                    subscription_id = %id,
186                                    "Notification channel closed, subscription will be cleaned up"
187                                );
188                            }
189                        }
190                    } else {
191                        trace!(
192                            subscription_id = %id,
193                            "Results unchanged, no notification needed"
194                        );
195                    }
196                    return;
197                }
198                Err(error_msg) => {
199                    // Classify the error to determine retry strategy
200                    let error_kind = classify_error_str(&error_msg);
201
202                    match error_kind {
203                        SubscriptionErrorKind::Permanent => {
204                            // Permanent error - don't retry, notify subscriber and stop
205                            debug!(
206                                subscription_id = %id,
207                                error = %error_msg,
208                                "Permanent error, not retrying"
209                            );
210                            let _ = subscription
211                                .notify_tx
212                                .send(SubscriptionUpdate::Error {
213                                    subscription_id: id,
214                                    message: format!(
215                                        "Query execution failed: {} (error will not be retried)",
216                                        error_msg
217                                    ),
218                                })
219                                .await;
220                            return;
221                        }
222                        SubscriptionErrorKind::Transient | SubscriptionErrorKind::Unknown => {
223                            // Transient error - may retry
224                            subscription.retry_count += 1;
225
226                            if subscription.retry_count > subscription.retry_policy.max_retries {
227                                // Exceeded max retries - circuit breaker
228                                warn!(
229                                    subscription_id = %id,
230                                    retry_count = subscription.retry_count,
231                                    max_retries = subscription.retry_policy.max_retries,
232                                    error = %error_msg,
233                                    "Subscription failed after max retries"
234                                );
235                                let _ = subscription
236                                    .notify_tx
237                                    .send(SubscriptionUpdate::Error {
238                                        subscription_id: id,
239                                        message: format!(
240                                            "Subscription failed after {} retries: {}",
241                                            subscription.retry_policy.max_retries, error_msg
242                                        ),
243                                    })
244                                    .await;
245                                return;
246                            }
247
248                            // Calculate backoff and retry
249                            let backoff = subscription
250                                .retry_policy
251                                .calculate_backoff(subscription.retry_count - 1);
252
253                            warn!(
254                                subscription_id = %id,
255                                retry_attempt = subscription.retry_count,
256                                backoff_ms = backoff.as_millis(),
257                                error_kind = %error_kind,
258                                error = %error_msg,
259                                "Retrying subscription query after transient error"
260                            );
261
262                            tokio::time::sleep(backoff).await;
263                            // Loop continues to retry
264                        }
265                    }
266                }
267            }
268        }
269    }
270
271    /// Execute the subscription query and return rows or error message
272    pub(crate) async fn execute_subscription_query(
273        &self,
274        subscription: &Subscription,
275        db: &Database,
276        id: SubscriptionId,
277    ) -> Result<Vec<vibesql_storage::Row>, String> {
278        // Re-execute the query
279        let executor = vibesql_executor::SelectExecutor::new(db);
280
281        // Parse and execute the query
282        match vibesql_parser::Parser::parse_sql(&subscription.query) {
283            Ok(vibesql_ast::Statement::Select(select)) => {
284                executor.execute(&select).map_err(|e| e.to_string())
285            }
286            Ok(_) => {
287                // Not a SELECT - shouldn't happen for subscriptions
288                warn!(
289                    subscription_id = %id,
290                    "Subscription query is not a SELECT"
291                );
292                Err("Subscription query is not a SELECT".to_string())
293            }
294            Err(e) => Err(format!("Failed to parse query: {}", e)),
295        }
296    }
297
298    /// Send initial results to a new subscriber
299    ///
300    /// Executes the query and sends the initial results. This should be called
301    /// right after subscribing to provide immediate data. The initial results
302    /// are always sent as a Full update.
303    ///
304    /// # Errors
305    ///
306    /// - `NotFound` if the subscription doesn't exist
307    /// - `ParseError` if the query fails to execute
308    /// - `ResultSetTooLarge` if the result set exceeds the configured limit
309    /// - `ChannelClosed` if the notification channel is closed
310    pub async fn send_initial_results(
311        &self,
312        id: SubscriptionId,
313        db: &Database,
314    ) -> Result<(), SubscriptionError> {
315        let mut sub_ref =
316            self.subscriptions.get_mut(&id).ok_or(SubscriptionError::NotFound(id))?;
317
318        let subscription = sub_ref.value_mut();
319
320        // Execute the query
321        let executor = vibesql_executor::SelectExecutor::new(db);
322        let stmt = vibesql_parser::Parser::parse_sql(&subscription.query)
323            .map_err(|e| SubscriptionError::ParseError(e.to_string()))?;
324
325        let rows = match stmt {
326            vibesql_ast::Statement::Select(select) => executor
327                .execute(&select)
328                .map_err(|e| SubscriptionError::ParseError(e.to_string()))?,
329            _ => return Err(SubscriptionError::ParseError("Not a SELECT query".to_string())),
330        };
331
332        // Check result set size limit
333        if rows.len() > self.config.max_result_rows {
334            self.result_set_exceeded_count.fetch_add(1, Ordering::Relaxed);
335            return Err(SubscriptionError::ResultSetTooLarge {
336                rows: rows.len(),
337                max: self.config.max_result_rows,
338            });
339        }
340
341        // Convert to Row format
342        let result_rows: Vec<crate::Row> =
343            rows.iter().map(|r| crate::Row { values: r.values.to_vec() }).collect();
344
345        // Update hash and store result for delta computation
346        subscription.last_result_hash = hash_rows(&result_rows);
347        subscription.last_result = Some(result_rows.clone());
348
349        // Send initial results (always Full for initial)
350        subscription
351            .notify_tx
352            .send(SubscriptionUpdate::Full { subscription_id: id, rows: result_rows })
353            .await
354            .map_err(|_| SubscriptionError::ChannelClosed)?;
355
356        Ok(())
357    }
358}