Skip to main content

vibesql_server/
connection.rs

1use crate::auth::PasswordStore;
2use crate::config::Config;
3use crate::observability::ObservabilityProvider;
4use crate::protocol::{
5    BackendMessage, FieldDescription, FrontendMessage, SelectiveUpdatesConfig, SubscriptionUpdateType, TransactionStatus,
6};
7use crate::registry::DatabaseRegistry;
8use crate::session::{ExecutionResult, Session};
9use crate::protocol::PartialRowUpdate;
10use crate::subscription::{
11    compute_delta_with_pk, create_partial_row_update, detect_pk_columns_from_stmt,
12    extract_table_refs, filter::SubscriptionFilter, hash_rows, SelectiveColumnConfig,
13    SubscriptionId, SubscriptionManager, SubscriptionUpdate,
14};
15use crate::Row;
16use anyhow::Result;
17use bytes::BytesMut;
18use std::collections::{HashMap, HashSet};
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::io::{AsyncReadExt, AsyncWriteExt};
24use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
25use tokio::net::TcpStream;
26use tokio::sync::broadcast;
27use tracing::{debug, error, info, warn};
28use vibesql_executor::cache::table_extractor;
29
/// Notification sent when a mutation affects tables.
///
/// Values of this type are the payload of the `tokio::sync::broadcast` channel
/// held by every `ConnectionHandler`, so each connection can check whether any
/// of its own subscriptions depend on the tables listed here.
///
/// `Clone` is derived because a broadcast channel hands every receiver its own
/// copy of the message.
#[derive(Debug, Clone)]
pub struct TableMutationNotification {
    /// Names of the tables that were affected by the mutation.
    pub affected_tables: HashSet<String>,
}
37
/// Connection handler for a single client.
///
/// One handler owns one accepted TCP connection: it performs the startup
/// handshake and authentication, then serves client messages while also
/// listening for table-mutation notifications broadcast by other connections.
pub struct ConnectionHandler {
    /// Read half of the TCP stream (split for async select! usage)
    read_half: OwnedReadHalf,
    /// Write half of the TCP stream (split for async select! usage)
    write_half: OwnedWriteHalf,
    /// Remote address of the client; logged when the handshake starts.
    peer_addr: SocketAddr,
    /// Shared server configuration. The authentication method and the default
    /// selective-update settings are read from it.
    config: Arc<Config>,
    /// Observability provider; its optional metrics handle is used to keep the
    /// subscription counters in sync when subscriptions are removed.
    observability: Arc<ObservabilityProvider>,
    /// Credential store consulted by the `password` and `md5` authentication
    /// methods. When it is `None` those methods fail with
    /// "Authentication not configured".
    password_store: Option<Arc<PasswordStore>>,
    /// Bytes received from the client that have not been decoded yet
    /// (created with an initial capacity of 8192 bytes).
    read_buf: BytesMut,
    /// Outgoing byte buffer (created with an initial capacity of 8192 bytes).
    /// NOTE(review): presumably filled by the `send_*` helpers before writing
    /// to `write_half` — those helpers are outside this view; confirm.
    write_buf: BytesMut,
    /// Session bound to the authenticated user and database. `None` until the
    /// startup message has been accepted and authentication has succeeded.
    session: Option<Session>,
    /// Instant at which this handler was constructed.
    connection_start: Instant,
    /// Counter shared with the rest of the server.
    /// NOTE(review): presumably the number of live connections, maintained by
    /// the accept loop or a `Drop` impl — not visible here; confirm.
    active_connections: Arc<AtomicUsize>,
    /// Database registry for shared database instances across connections
    database_registry: DatabaseRegistry,
    /// Unique identifier for this connection (for subscription tracking);
    /// a random UUID v4 rendered as a string.
    connection_id: String,
    /// Global subscription manager for processing storage change events and tracking subscriptions
    subscription_manager: Arc<SubscriptionManager>,
    /// Broadcast sender for notifying other connections about mutations
    mutation_broadcast_tx: broadcast::Sender<TableMutationNotification>,
    /// Broadcast receiver for receiving mutation notifications from other connections
    mutation_broadcast_rx: broadcast::Receiver<TableMutationNotification>,
}
64
/// Result of handling a client message.
///
/// Returned by `ConnectionHandler::handle_client_message` to tell the message
/// loop whether it should keep serving the connection.
enum ClientMessageResult {
    /// Keep the connection open and continue processing messages.
    Continue,
    /// The client sent `Terminate`; the message loop must stop.
    Terminate,
}
72
73impl ConnectionHandler {
74    /// Create a new connection handler
75    #[allow(clippy::too_many_arguments)]
76    pub fn new(
77        stream: TcpStream,
78        peer_addr: SocketAddr,
79        config: Arc<Config>,
80        observability: Arc<ObservabilityProvider>,
81        password_store: Option<Arc<PasswordStore>>,
82        active_connections: Arc<AtomicUsize>,
83        database_registry: DatabaseRegistry,
84        subscription_manager: Arc<SubscriptionManager>,
85        mutation_broadcast_tx: broadcast::Sender<TableMutationNotification>,
86    ) -> Self {
87        // Split the TCP stream for async select! usage
88        // This allows us to wait on both client messages and broadcast notifications simultaneously
89        let (read_half, write_half) = stream.into_split();
90
91        // Subscribe to the broadcast channel to receive notifications from other connections
92        let mutation_broadcast_rx = mutation_broadcast_tx.subscribe();
93
94        // Generate a unique connection ID for subscription tracking
95        let connection_id = uuid::Uuid::new_v4().to_string();
96
97        Self {
98            read_half,
99            write_half,
100            peer_addr,
101            config,
102            observability,
103            password_store,
104            read_buf: BytesMut::with_capacity(8192),
105            write_buf: BytesMut::with_capacity(8192),
106            session: None,
107            connection_start: Instant::now(),
108            active_connections,
109            database_registry,
110            connection_id,
111            subscription_manager,
112            mutation_broadcast_tx,
113            mutation_broadcast_rx,
114        }
115    }
116
117    /// Handle the connection
118    pub async fn handle(&mut self) -> Result<()> {
119        // Perform startup handshake
120        self.startup_handshake().await?;
121
122        // Process queries
123        self.process_queries().await?;
124
125        Ok(())
126    }
127
128    /// Perform the PostgreSQL startup handshake
129    async fn startup_handshake(&mut self) -> Result<()> {
130        debug!("Starting handshake with {}", self.peer_addr);
131
132        // Read startup message
133        self.read_message().await?;
134
135        let startup_msg = FrontendMessage::decode_startup(&mut self.read_buf)?;
136
137        match startup_msg {
138            Some(FrontendMessage::SSLRequest) => {
139                debug!("Received SSL request");
140                // We don't support SSL yet, send 'N'
141                self.write_half.write_u8(b'N').await?;
142                self.write_half.flush().await?;
143
144                // Read actual startup message after SSL rejection
145                self.read_buf.clear();
146                self.read_message().await?;
147
148                let startup_msg = FrontendMessage::decode_startup(&mut self.read_buf)?;
149                self.handle_startup(startup_msg).await?;
150            }
151
152            Some(msg) => {
153                self.handle_startup(Some(msg)).await?;
154            }
155
156            None => {
157                return Err(anyhow::anyhow!("No startup message received"));
158            }
159        }
160
161        Ok(())
162    }
163
164    /// Handle startup message and authentication
165    async fn handle_startup(&mut self, msg: Option<FrontendMessage>) -> Result<()> {
166        match msg {
167            Some(FrontendMessage::Startup { protocol_version, params }) => {
168                debug!("Startup: version={}, params={:?}", protocol_version, params);
169
170                let user = params.get("user").cloned().unwrap_or_else(|| "postgres".to_string());
171                let database = params.get("database").cloned().unwrap_or_else(|| user.clone());
172
173                // Perform authentication
174                self.authenticate(&user).await?;
175
176                // Get or create shared database from registry
177                let shared_db = self.database_registry.get_or_create(&database).await;
178
179                // Create session with shared database
180                self.session = Some(Session::new(database.clone(), user.clone(), shared_db));
181
182                info!("User '{}' connected to database '{}'", user, database);
183
184                // Send startup complete messages
185                self.send_parameter_status("server_version", "14.0 (VibeSQL)").await?;
186                self.send_parameter_status("server_encoding", "UTF8").await?;
187                self.send_parameter_status("client_encoding", "UTF8").await?;
188                self.send_parameter_status("DateStyle", "ISO, MDY").await?;
189                self.send_parameter_status("TimeZone", "UTC").await?;
190
191                // Send backend key data (for cancel requests)
192                self.send_backend_key_data().await?;
193
194                // Send ready for query
195                self.send_ready_for_query(TransactionStatus::Idle).await?;
196
197                Ok(())
198            }
199
200            _ => Err(anyhow::anyhow!("Invalid startup message")),
201        }
202    }
203
204    /// Authenticate the user
205    async fn authenticate(&mut self, user: &str) -> Result<()> {
206        match self.config.auth.method.as_str() {
207            "trust" => {
208                // Trust authentication - no password required
209                debug!("Using trust authentication for user '{}'", user);
210                self.send_authentication_ok().await?;
211                Ok(())
212            }
213
214            "password" => {
215                // Cleartext password authentication
216                debug!("Requesting cleartext password for user '{}'", user);
217                self.send_cleartext_password_request().await?;
218
219                // Read password response
220                self.read_message().await?;
221                let msg = FrontendMessage::decode(&mut self.read_buf)?;
222
223                match msg {
224                    Some(FrontendMessage::Password { password }) => {
225                        debug!("Received password from user '{}'", user);
226
227                        if let Some(ref store) = self.password_store {
228                            if store.verify_cleartext(user, &password) {
229                                info!("User '{}' authenticated successfully", user);
230                                self.send_authentication_ok().await?;
231                                Ok(())
232                            } else {
233                                error!("Authentication failed for user '{}'", user);
234                                Err(anyhow::anyhow!("Authentication failed"))
235                            }
236                        } else {
237                            error!("No password store configured");
238                            Err(anyhow::anyhow!("Authentication not configured"))
239                        }
240                    }
241                    _ => {
242                        error!("Expected password message, got: {:?}", msg);
243                        Err(anyhow::anyhow!("Expected password message"))
244                    }
245                }
246            }
247
248            "md5" => {
249                // MD5 password authentication
250                debug!("Requesting MD5 password for user '{}'", user);
251
252                // Generate random salt
253                use rand::Rng;
254                let salt: [u8; 4] = rand::rng().random();
255
256                self.send_md5_password_request(&salt).await?;
257
258                // Read password response
259                self.read_message().await?;
260                let msg = FrontendMessage::decode(&mut self.read_buf)?;
261
262                match msg {
263                    Some(FrontendMessage::Password { password }) => {
264                        debug!("Received MD5 password response from user '{}'", user);
265
266                        if let Some(ref store) = self.password_store {
267                            if store.verify_md5(user, &password, &salt) {
268                                info!("User '{}' authenticated successfully (MD5)", user);
269                                self.send_authentication_ok().await?;
270                                Ok(())
271                            } else {
272                                error!("MD5 authentication failed for user '{}'", user);
273                                Err(anyhow::anyhow!("Authentication failed"))
274                            }
275                        } else {
276                            error!("No password store configured");
277                            Err(anyhow::anyhow!("Authentication not configured"))
278                        }
279                    }
280                    _ => {
281                        error!("Expected password message, got: {:?}", msg);
282                        Err(anyhow::anyhow!("Expected password message"))
283                    }
284                }
285            }
286
287            "scram-sha-256" => {
288                // SCRAM-SHA-256 not yet implemented
289                error!("SCRAM-SHA-256 authentication not yet implemented");
290                Err(anyhow::anyhow!("SCRAM-SHA-256 not implemented"))
291            }
292
293            _ => {
294                error!("Unsupported authentication method: {}", self.config.auth.method);
295                Err(anyhow::anyhow!("Unsupported authentication method"))
296            }
297        }
298    }
299
300    /// Process queries from the client
301    ///
302    /// This method handles both:
303    /// 1. Client messages from the TCP stream
304    /// 2. Broadcast notifications from other connections about table mutations
305    ///
306    /// This enables cross-connection subscription notifications: when connection A
307    /// mutates a table, connection B's subscriptions on that table are notified.
308    ///
309    /// Uses `tokio::select!` to wait on both sources simultaneously with near-zero
310    /// latency, avoiding the previous 100ms polling approach.
311    async fn process_queries(&mut self) -> Result<()> {
312        loop {
313            // First, process any complete messages already in the buffer
314            // This handles cases where multiple messages arrived in a single TCP read
315            while let Some(msg) = FrontendMessage::decode(&mut self.read_buf)? {
316                match self.handle_client_message(msg).await? {
317                    ClientMessageResult::Continue => {}
318                    ClientMessageResult::Terminate => {
319                        let (total, selective_eligible) = self.subscription_manager
320                            .unsubscribe_all_for_connection(&self.connection_id);
321                        if let Some(metrics) = self.observability.metrics() {
322                            for _ in 0..total {
323                                metrics.decrement_subscriptions_active();
324                            }
325                            for _ in 0..selective_eligible {
326                                metrics.decrement_selective_eligible();
327                            }
328                        }
329                        return Ok(());
330                    }
331                }
332            }
333
334            // No complete message in buffer - wait for either:
335            // 1. More data from the client TCP stream
336            // 2. Broadcast notifications from other connections
337            //
338            // Using select! provides near-zero latency for cross-connection notifications
339            // compared to the previous 100ms timeout polling approach.
340            tokio::select! {
341                biased;  // Prioritize broadcast notifications for lower latency
342
343                // Check for cross-connection mutation notifications
344                notification = self.mutation_broadcast_rx.recv() => {
345                    match notification {
346                        Ok(n) => {
347                            if self.subscription_manager.connection_subscription_count(&self.connection_id) > 0 {
348                                self.handle_cross_connection_notification(&n.affected_tables).await;
349                            }
350                        }
351                        Err(broadcast::error::RecvError::Lagged(n)) => {
352                            debug!("Missed {} broadcast notifications (lagged)", n);
353                        }
354                        Err(broadcast::error::RecvError::Closed) => {
355                            warn!("Mutation broadcast channel closed");
356                        }
357                    }
358                }
359
360                // Read more data from the client
361                read_result = self.read_half.read_buf(&mut self.read_buf) => {
362                    match read_result {
363                        Ok(0) => {
364                            // Connection closed by client
365                            debug!("Connection closed by client");
366                            break;
367                        }
368                        Ok(_) => {
369                            // Data received - loop back to decode and process messages
370                        }
371                        Err(e) => {
372                            return Err(e.into());
373                        }
374                    }
375                }
376            }
377        }
378
379        // Clean up subscriptions when connection closes
380        let (total, selective_eligible) = self.subscription_manager
381            .unsubscribe_all_for_connection(&self.connection_id);
382        if let Some(metrics) = self.observability.metrics() {
383            for _ in 0..total {
384                metrics.decrement_subscriptions_active();
385            }
386            for _ in 0..selective_eligible {
387                metrics.decrement_selective_eligible();
388            }
389        }
390
391        Ok(())
392    }
393
394    /// Handle a single client message
395    async fn handle_client_message(&mut self, msg: FrontendMessage) -> Result<ClientMessageResult> {
396        match msg {
397            FrontendMessage::Query { query } => {
398                debug!("Query: {}", query);
399                self.execute_query(&query).await?;
400                Ok(ClientMessageResult::Continue)
401            }
402
403            FrontendMessage::Subscribe { query, params, filter, selective_updates_config } => {
404                debug!("Subscribe: {} (filter: {:?}, selective_config: {:?})", query, filter, selective_updates_config);
405                self.handle_subscribe(&query, params, filter, selective_updates_config).await?;
406                Ok(ClientMessageResult::Continue)
407            }
408
409            FrontendMessage::Unsubscribe { subscription_id } => {
410                debug!("Unsubscribe: {:?}", subscription_id);
411                let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&subscription_id);
412                if was_selective_eligible {
413                    if let Some(metrics) = self.observability.metrics() {
414                        metrics.decrement_selective_eligible();
415                    }
416                }
417                // No response needed per protocol spec
418                Ok(ClientMessageResult::Continue)
419            }
420
421            FrontendMessage::Terminate => {
422                debug!("Client requested termination");
423                Ok(ClientMessageResult::Terminate)
424            }
425
426            msg => {
427                warn!("Unexpected message: {:?}", msg);
428                Ok(ClientMessageResult::Continue)
429            }
430        }
431    }
432
433    /// Handle a cross-connection notification about table mutations
434    ///
435    /// When another connection mutates tables, this method is called to
436    /// check if any of our subscriptions are affected and send updates.
437    /// This method supports delta updates to reduce network bandwidth when
438    /// only a small portion of the result set has changed.
439    /// Supports optional filtering expressions to send only matching rows.
440    #[allow(clippy::type_complexity)]
441    async fn handle_cross_connection_notification(&mut self, affected_tables: &HashSet<String>) {
442        // Collect subscriptions for THIS connection that need updating
443        let subscriptions_to_update: Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)> =
444            affected_tables
445                .iter()
446                .flat_map(|table| {
447                    self.subscription_manager
448                        .get_affected_subscriptions_for_connection(table, &self.connection_id)
449                })
450                .collect();
451
452        if subscriptions_to_update.is_empty() {
453            return;
454        }
455
456        // De-duplicate subscriptions (a subscription may depend on multiple affected tables)
457        let mut seen = std::collections::HashSet::new();
458        let unique_subscriptions: Vec<_> = subscriptions_to_update
459            .into_iter()
460            .filter(|(id, _, _, _, _)| seen.insert(*id))
461            .collect();
462
463        debug!(
464            "Cross-connection notification: notifying {} subscriptions for tables: {:?}",
465            unique_subscriptions.len(),
466            affected_tables
467        );
468
469        // Re-execute each subscription query and send updates
470        for (subscription_id, query, last_hash, last_result, filter) in unique_subscriptions {
471            if let Some(session) = &mut self.session {
472                match session.execute(&query).await {
473                    Ok(ExecutionResult::Select { rows, columns }) => {
474                        // Build filter if present
475                        let filter_opt = filter.as_ref().and_then(|f| {
476                            let col_names: Vec<String> =
477                                columns.iter().map(|c| c.name.clone()).collect();
478                            SubscriptionFilter::new(f, &col_names).ok()
479                        });
480
481                        // Filter rows if filter is present, then convert to Row format
482                        let new_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
483                            rows.iter()
484                                .filter(|row| flt.matches(&row.values))
485                                .map(|r| Row { values: r.values.clone() })
486                                .collect()
487                        } else {
488                            rows.iter().map(|r| Row { values: r.values.clone() }).collect()
489                        };
490
491                        // Compute hash for change detection
492                        let new_hash = hash_rows(&new_rows);
493
494                        // Skip if results haven't changed
495                        if new_hash == last_hash {
496                            debug!(
497                                "Cross-connection update: results unchanged for subscription {:?}",
498                                subscription_id
499                            );
500                            continue;
501                        }
502
503                        // Determine whether to send delta or full update
504                        if let Some(ref old_rows) = last_result {
505                            // First, try selective column updates (0xF7) using effective config
506                            if self
507                                .try_send_selective_updates(
508                                    &subscription_id,
509                                    old_rows,
510                                    &new_rows,
511                                )
512                                .await
513                            {
514                                // Selective updates sent successfully - update stored result
515                                self.subscription_manager.update_result_by_wire_id(
516                                    &subscription_id,
517                                    new_hash,
518                                    new_rows,
519                                );
520                                continue;
521                            }
522
523                            // Fall back to delta updates using PK columns
524                            let pk_columns = self
525                                .subscription_manager
526                                .get_pk_columns_by_wire_id(&subscription_id);
527                            if let Some(delta) = compute_delta_with_pk(
528                                SubscriptionId::default(),
529                                old_rows,
530                                &new_rows,
531                                &pk_columns,
532                            ) {
533                                // Send delta updates
534                                if let Err(e) =
535                                    self.send_delta_updates(&subscription_id, &delta).await
536                                {
537                                    warn!(
538                                        "Failed to send cross-connection delta update: {}",
539                                        e
540                                    );
541                                }
542
543                                // Log delta statistics
544                                if let SubscriptionUpdate::Delta {
545                                    ref inserts,
546                                    ref updates,
547                                    ref deletes,
548                                    ..
549                                } = delta
550                                {
551                                    debug!(
552                                        "Cross-connection delta update sent: {} inserts, {} updates, {} deletes for subscription {:?}",
553                                        inserts.len(),
554                                        updates.len(),
555                                        deletes.len(),
556                                        subscription_id
557                                    );
558                                }
559                            } else {
560                                // No delta computed (shouldn't happen if hash changed)
561                                // Fall back to full update
562                                let wire_rows = Self::rows_to_wire_format(&new_rows);
563                                if let Err(e) = self
564                                    .send_subscription_data(
565                                        &subscription_id,
566                                        SubscriptionUpdateType::Full,
567                                        wire_rows,
568                                    )
569                                    .await
570                                {
571                                    warn!("Failed to send cross-connection full update: {}", e);
572                                }
573                            }
574                        } else {
575                            // No previous results - send full update
576                            debug!(
577                                "Cross-connection update: no previous result, sending full update for subscription {:?}",
578                                subscription_id
579                            );
580                            let wire_rows = Self::rows_to_wire_format(&new_rows);
581                            if let Err(e) = self
582                                .send_subscription_data(
583                                    &subscription_id,
584                                    SubscriptionUpdateType::Full,
585                                    wire_rows,
586                                )
587                                .await
588                            {
589                                warn!("Failed to send cross-connection full update: {}", e);
590                            }
591                        }
592
593                        // Update stored result for next delta computation
594                        self.subscription_manager.update_result_by_wire_id(
595                            &subscription_id,
596                            new_hash,
597                            new_rows,
598                        );
599                    }
600                    Ok(_) => {
601                        // Non-SELECT result - shouldn't happen for a subscription query
602                        warn!("Subscription query returned non-SELECT result");
603                    }
604                    Err(e) => {
605                        // Query failed - send error to subscriber
606                        if let Err(send_err) = self
607                            .send_subscription_error(&subscription_id, &format!("Query error: {}", e))
608                            .await
609                        {
610                            warn!("Failed to send subscription error: {}", send_err);
611                        }
612                    }
613                }
614            }
615        }
616    }
617
618    /// Send delta updates to a subscription
619    ///
620    /// The wire protocol sends separate messages for inserts, updates, and deletes.
621    /// For UPDATE operations, we use PartialRowUpdate to send only changed columns
622    /// plus PK columns, reducing wire traffic for wide tables.
623    async fn send_delta_updates(
624        &mut self,
625        subscription_id: &[u8; 16],
626        delta: &SubscriptionUpdate,
627    ) -> Result<()> {
628        if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta {
629            // Send deletes first (so clients can remove before adding)
630            if !deletes.is_empty() {
631                let wire_rows = Self::rows_to_wire_format(deletes);
632                self.send_subscription_data(
633                    subscription_id,
634                    SubscriptionUpdateType::DeltaDelete,
635                    wire_rows,
636                )
637                .await?;
638            }
639
640            // Send updates using partial row format when beneficial
641            if !updates.is_empty() {
642                // Get effective selective config for this subscription
643                // Uses per-subscription override if set, otherwise falls back to server config
644                let config = self.subscription_manager.get_effective_selective_config_by_wire_id(
645                    subscription_id,
646                    &self.config.subscriptions.selective_updates,
647                );
648                let pk_columns = config.pk_columns.clone();
649
650                // Separate updates into partial and full based on threshold
651                let mut partial_updates: Vec<PartialRowUpdate> = Vec::new();
652                let mut full_updates: Vec<Vec<Option<Vec<u8>>>> = Vec::new();
653
654                for (old_row, new_row) in updates {
655                    // Convert rows to wire format
656                    let old_wire: Vec<Option<Vec<u8>>> = old_row
657                        .values
658                        .iter()
659                        .map(|v| Some(v.to_string().as_bytes().to_vec()))
660                        .collect();
661                    let new_wire: Vec<Option<Vec<u8>>> = new_row
662                        .values
663                        .iter()
664                        .map(|v| Some(v.to_string().as_bytes().to_vec()))
665                        .collect();
666
667                    // Try to create a partial update
668                    if let Some(partial) =
669                        create_partial_row_update(&old_wire, &new_wire, &pk_columns, &config)
670                    {
671                        partial_updates.push(partial);
672                    } else {
673                        // Fall back to full row update
674                        full_updates.push(new_wire);
675                    }
676                }
677
678                // Send partial updates via SubscriptionPartialData (0xF7)
679                if !partial_updates.is_empty() {
680                    self.send_subscription_partial_data(subscription_id, partial_updates).await?;
681                }
682
683                // Send any full updates via regular DeltaUpdate
684                if !full_updates.is_empty() {
685                    self.send_subscription_data(
686                        subscription_id,
687                        SubscriptionUpdateType::DeltaUpdate,
688                        full_updates,
689                    )
690                    .await?;
691                }
692            }
693
694            // Send inserts last
695            if !inserts.is_empty() {
696                let wire_rows = Self::rows_to_wire_format(inserts);
697                self.send_subscription_data(
698                    subscription_id,
699                    SubscriptionUpdateType::DeltaInsert,
700                    wire_rows,
701                )
702                .await?;
703            }
704        }
705        Ok(())
706    }
707
708    /// Convert rows to wire format for sending over the protocol
709    fn rows_to_wire_format(rows: &[Row]) -> Vec<Vec<Option<Vec<u8>>>> {
710        rows.iter()
711            .map(|row| {
712                row.values.iter().map(|v| Some(v.to_string().as_bytes().to_vec())).collect()
713            })
714            .collect()
715    }
716
717    /// Try to send selective column updates (0xF7) for row updates
718    ///
719    /// Returns `true` if selective updates were sent, `false` if caller should
720    /// fall back to regular updates.
721    ///
722    /// Selective updates are used when:
723    /// - Config has selective updates enabled
724    /// - Row counts match (updates only, not inserts/deletes)
725    /// - Rows can be matched by primary key
726    /// - Changed columns ratio is within threshold
727    async fn try_send_selective_updates(
728        &mut self,
729        subscription_id: &[u8; 16],
730        old_rows: &[Row],
731        new_rows: &[Row],
732    ) -> bool {
733        // Get effective selective config (uses per-subscription override if set)
734        let selective_config = self.subscription_manager.get_effective_selective_config_by_wire_id(
735            subscription_id,
736            &self.config.subscriptions.selective_updates,
737        );
738
739        // Check if selective updates are enabled in effective config
740        if !selective_config.enabled {
741            debug!(
742                "Selective update skipped for subscription {:?}: disabled in config",
743                subscription_id
744            );
745            if let Some(metrics) = self.observability.metrics() {
746                metrics.record_partial_update_fallback("disabled");
747                metrics.record_selective_update_decision("sent_full", Some("disabled"));
748            }
749            return false;
750        }
751
752        // Row counts must match for selective updates (no inserts/deletes)
753        if old_rows.len() != new_rows.len() {
754            debug!(
755                "Selective update skipped for subscription {:?}: row count mismatch (old={}, new={})",
756                subscription_id,
757                old_rows.len(),
758                new_rows.len()
759            );
760            if let Some(metrics) = self.observability.metrics() {
761                metrics.record_partial_update_fallback("row_count_mismatch");
762                metrics.record_selective_update_decision("sent_full", Some("row_count_mismatch"));
763            }
764            return false;
765        }
766
767        if old_rows.is_empty() {
768            return false;
769        }
770
771        let pk_columns = &selective_config.pk_columns;
772
773        // Convert rows to wire format for comparison
774        let old_wire: Vec<Vec<Option<Vec<u8>>>> = Self::rows_to_wire_format(old_rows);
775        let new_wire: Vec<Vec<Option<Vec<u8>>>> = Self::rows_to_wire_format(new_rows);
776
777        // Build a map from PK values to row index for old rows
778        let mut pk_to_old_idx: HashMap<Vec<Option<Vec<u8>>>, usize> = HashMap::new();
779        for (idx, row) in old_wire.iter().enumerate() {
780            let pk_values: Vec<Option<Vec<u8>>> =
781                pk_columns.iter().filter_map(|&col| row.get(col).cloned()).collect();
782            pk_to_old_idx.insert(pk_values, idx);
783        }
784
785        // Try to create partial row updates for each new row
786        let mut partial_updates = Vec::new();
787        let mut threshold_exceeded_count = 0u64;
788        for new_row in &new_wire {
789            // Extract PK from new row
790            let pk_values: Vec<Option<Vec<u8>>> =
791                pk_columns.iter().filter_map(|&col| new_row.get(col).cloned()).collect();
792
793            // Find matching old row by PK
794            if let Some(&old_idx) = pk_to_old_idx.get(&pk_values) {
795                let old_row = &old_wire[old_idx];
796
797                // Try to create a partial row update
798                if let Some(partial) =
799                    create_partial_row_update(old_row, new_row, pk_columns, &selective_config)
800                {
801                    // Record column ratio for successful partial updates
802                    let changed_count = old_row
803                        .iter()
804                        .zip(new_row.iter())
805                        .filter(|(o, n)| o != n)
806                        .count();
807                    if let Some(metrics) = self.observability.metrics() {
808                        metrics.record_selective_update_column_ratio(changed_count, new_row.len());
809                    }
810                    partial_updates.push(partial);
811                } else {
812                    // Check if this was due to threshold exceeded (too many columns changed)
813                    let changed_count = old_row
814                        .iter()
815                        .zip(new_row.iter())
816                        .filter(|(o, n)| o != n)
817                        .count();
818                    if changed_count > 0 {
819                        let ratio = changed_count as f64 / new_row.len() as f64;
820                        // Record column ratio for analysis (helps tuning threshold)
821                        if let Some(metrics) = self.observability.metrics() {
822                            metrics.record_selective_update_column_ratio(changed_count, new_row.len());
823                        }
824                        if ratio > selective_config.max_changed_columns_ratio {
825                            threshold_exceeded_count += 1;
826                        }
827                    }
828                    continue;
829                }
830            } else {
831                // Can't find matching old row - this is an insert, not an update
832                // Fall back to regular updates
833                debug!(
834                    "Selective update skipped for subscription {:?}: cannot match row by PK (pk_columns={:?})",
835                    subscription_id,
836                    pk_columns
837                );
838                if let Some(metrics) = self.observability.metrics() {
839                    metrics.record_partial_update_fallback("pk_mismatch");
840                    metrics.record_selective_update_decision("sent_full", Some("pk_mismatch"));
841                }
842                return false;
843            }
844        }
845
846        // Record threshold exceeded fallbacks if any
847        if threshold_exceeded_count > 0 {
848            debug!(
849                "Selective update: {} rows exceeded change threshold for subscription {:?}",
850                threshold_exceeded_count,
851                subscription_id
852            );
853            if let Some(metrics) = self.observability.metrics() {
854                for _ in 0..threshold_exceeded_count {
855                    metrics.record_partial_update_fallback("threshold_exceeded");
856                    metrics.record_selective_update_decision("sent_full", Some("threshold_exceeded"));
857                }
858            }
859        }
860
861        // If no partial updates were generated, nothing changed
862        if partial_updates.is_empty() {
863            debug!(
864                "Selective update skipped for subscription {:?}: no column changes detected",
865                subscription_id
866            );
867            if let Some(metrics) = self.observability.metrics() {
868                metrics.record_partial_update_fallback("no_changes");
869                metrics.record_selective_update_decision("skipped", Some("no_changes"));
870            }
871            return false;
872        }
873
874        // Calculate and record metrics before sending
875        if let Some(metrics) = self.observability.metrics() {
876            let total_columns = if !new_wire.is_empty() { new_wire[0].len() as u64 } else { 0 };
877            let mut total_columns_sent: u64 = 0;
878            let mut total_bytes_full: u64 = 0;
879            let mut total_bytes_partial: u64 = 0;
880
881            for (partial, new_row) in partial_updates.iter().zip(new_wire.iter()) {
882                // Count columns sent in this partial update
883                total_columns_sent += partial.present_column_count() as u64;
884
885                // Estimate bytes for full row vs partial update
886                let full_row_bytes: u64 = new_row
887                    .iter()
888                    .map(|v| v.as_ref().map(|b| b.len() as u64).unwrap_or(0) + 4) // value + length prefix
889                    .sum();
890                let partial_bytes: u64 = partial
891                    .values
892                    .iter()
893                    .map(|v| v.as_ref().map(|b| b.len() as u64).unwrap_or(0) + 4)
894                    .sum::<u64>()
895                    + partial.column_mask.len() as u64
896                    + 2; // mask + total_columns header
897
898                total_bytes_full += full_row_bytes;
899                total_bytes_partial += partial_bytes;
900            }
901
902            // Record column efficiency metrics
903            let total_possible = total_columns * partial_updates.len() as u64;
904            metrics.record_selective_update_columns(total_columns_sent, total_possible);
905
906            // Record bytes saved
907            if total_bytes_full > total_bytes_partial {
908                metrics.record_partial_update_bytes_saved(total_bytes_full - total_bytes_partial);
909            }
910
911            // Record successful selective update decision for each partial update
912            for _ in 0..partial_updates.len() {
913                metrics.record_selective_update_decision("sent_partial", None);
914            }
915        }
916
917        // Send the partial updates
918        if let Err(e) = self.send_subscription_partial_data(subscription_id, partial_updates).await
919        {
920            warn!("Failed to send selective updates: {}", e);
921            return false;
922        }
923
924        // Record successful partial update sent
925        if let Some(metrics) = self.observability.metrics() {
926            metrics.record_partial_update_sent();
927        }
928
929        debug!(
930            "Sent selective column update (0xF7) for subscription {:?}",
931            subscription_id
932        );
933        true
934    }
935
936    /// Execute a SQL query
937    async fn execute_query(&mut self, query: &str) -> Result<()> {
938        let session = self.session.as_mut().ok_or_else(|| anyhow::anyhow!("No session"))?;
939
940        // Handle empty query
941        if query.trim().is_empty() {
942            self.send_empty_query_response().await?;
943            let txn_status = self.get_transaction_status();
944            self.send_ready_for_query(txn_status).await?;
945            return Ok(());
946        }
947
948        // Track query execution time
949        let query_start = Instant::now();
950
951        // Execute query (now async due to shared database locking)
952        match session.execute(query).await {
953            Ok(result) => {
954                let query_duration = query_start.elapsed();
955                let stmt_type = result.statement_type();
956                let rows_affected = result.rows_affected();
957
958                // Record metrics
959                if let Some(metrics) = self.observability.metrics() {
960                    metrics.record_query(query_duration, stmt_type, true, rows_affected);
961                }
962
963                // Check if this was a mutation that might affect subscriptions
964                let is_mutation = matches!(
965                    &result,
966                    ExecutionResult::Insert { .. }
967                        | ExecutionResult::Update { .. }
968                        | ExecutionResult::Delete { .. }
969                );
970
971                self.send_query_result(result).await?;
972
973                // Notify affected subscriptions after mutations
974                if is_mutation {
975                    // First, notify local subscriptions (same connection)
976                    self.notify_affected_subscriptions(query).await;
977
978                    // Then, broadcast to other connections for cross-connection notifications
979                    self.broadcast_mutation(query);
980                }
981
982                // Return appropriate transaction status
983                let txn_status = self.get_transaction_status();
984                self.send_ready_for_query(txn_status).await?;
985                Ok(())
986            }
987
988            Err(e) => {
989                error!("Query error: {}", e);
990
991                // Record error metric
992                if let Some(metrics) = self.observability.metrics() {
993                    metrics.record_query_error("execution_error", None);
994                }
995
996                self.send_error_response(&format!("{}", e)).await?;
997
998                // If in transaction and error occurred, report failed transaction state
999                let txn_status = if self.session.as_ref().is_some_and(|s| s.in_transaction()) {
1000                    TransactionStatus::FailedTransaction
1001                } else {
1002                    TransactionStatus::Idle
1003                };
1004                self.send_ready_for_query(txn_status).await?;
1005                Ok(())
1006            }
1007        }
1008    }
1009
1010    /// Get the current transaction status for the session
1011    fn get_transaction_status(&self) -> TransactionStatus {
1012        if self.session.as_ref().is_some_and(|s| s.in_transaction()) {
1013            TransactionStatus::InTransaction
1014        } else {
1015            TransactionStatus::Idle
1016        }
1017    }
1018
1019    /// Handle a subscription request
1020    ///
1021    /// Parses the query, extracts table dependencies, executes the query,
1022    /// registers the subscription, and sends the initial data to the client.
1023    ///
1024    /// # Arguments
1025    ///
1026    /// * `query` - The SQL SELECT query to subscribe to
1027    /// * `_params` - Parameter values for parameterized queries (unused for now)
1028    /// * `filter` - Optional filter expression (SQL WHERE clause) to apply to updates
1029    async fn handle_subscribe(
1030        &mut self,
1031        query: &str,
1032        _params: Vec<Option<Vec<u8>>>,
1033        filter: Option<String>,
1034        selective_updates_config: Option<SelectiveUpdatesConfig>,
1035    ) -> Result<()> {
1036        let session = self.session.as_mut().ok_or_else(|| anyhow::anyhow!("No session"))?;
1037
1038        // Parse the query to extract table dependencies
1039        let parsed = match vibesql_parser::Parser::parse_sql(query) {
1040            Ok(stmt) => stmt,
1041            Err(e) => {
1042                // Send subscription error with a dummy subscription ID (query failed before registration)
1043                let error_id = [0u8; 16];
1044                self.send_subscription_error(&error_id, &format!("Parse error: {}", e)).await?;
1045                return Ok(());
1046            }
1047        };
1048
1049        // Validate the filter expression if provided
1050        if let Some(ref filter_str) = filter {
1051            if let Err(e) = vibesql_parser::arena_parser::parse_expression_to_owned(filter_str) {
1052                let error_id = [0u8; 16];
1053                self.send_subscription_error(&error_id, &format!("Filter parse error: {}", e))
1054                    .await?;
1055                return Ok(());
1056            }
1057        }
1058
1059        // Extract table dependencies from the query
1060        let table_dependencies = table_extractor::extract_tables_from_statement(&parsed);
1061
1062        // Detect primary key columns for selective updates
1063        // This enables bandwidth-efficient delta updates by knowing which columns identify rows
1064        let pk_detection = {
1065            let db = session.shared_database().read().await;
1066            detect_pk_columns_from_stmt(&parsed, &db)
1067        };
1068        if pk_detection.confident {
1069            debug!(
1070                "PK detection confident for subscription: pk_columns={:?}, tables={:?}",
1071                pk_detection.pk_column_indices,
1072                pk_detection.tables
1073            );
1074        } else {
1075            debug!(
1076                "PK detection not confident for subscription: reason={}, pk_columns={:?}, tables={:?}, query={}",
1077                pk_detection.reason.map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string()),
1078                pk_detection.pk_column_indices,
1079                pk_detection.tables,
1080                query
1081            );
1082        }
1083
1084        // Record PK detection metrics
1085        if let Some(metrics) = self.observability.metrics() {
1086            if pk_detection.confident {
1087                metrics.record_pk_detection("confident", None);
1088            } else {
1089                // Determine reason for non-confidence based on detection results
1090                let reason = if pk_detection.tables.is_empty() {
1091                    "no_table"
1092                } else if pk_detection.tables.len() > 1 {
1093                    "join_query"
1094                } else if pk_detection.pk_column_indices == vec![0] {
1095                    // Default fallback - could be multiple reasons
1096                    "pk_not_in_result"
1097                } else {
1098                    "unknown"
1099                };
1100                metrics.record_pk_detection("not_confident", Some(reason));
1101            }
1102        }
1103
1104        // Generate a wire subscription ID (UUID) for the wire protocol
1105        let wire_subscription_id = *uuid::Uuid::new_v4().as_bytes();
1106
1107        // Create a dummy channel - wire protocol sends data directly through TCP socket,
1108        // not through the subscription manager's channel-based notification system
1109        let (notify_tx, _notify_rx) = tokio::sync::mpsc::channel(1);
1110
1111        // Register the subscription with the global subscription manager
1112        if let Err(e) = self.subscription_manager.subscribe_for_connection(
1113            query.to_string(),
1114            notify_tx,
1115            self.connection_id.clone(),
1116            wire_subscription_id,
1117            table_dependencies.clone(),
1118            filter.clone(),
1119        ) {
1120            // Send subscription error with a dummy subscription ID (subscription failed before registration)
1121            let error_id = [0u8; 16];
1122            self.send_subscription_error(&error_id, &format!("{}", e)).await?;
1123            return Ok(());
1124        }
1125
1126        // Track the new subscription in metrics
1127        if let Some(metrics) = self.observability.metrics() {
1128            metrics.increment_subscriptions_active();
1129        }
1130
1131        // Store detected PK columns in the subscription for selective updates
1132        // Track selective-eligible subscriptions in metrics
1133        let newly_eligible = self.subscription_manager.update_pk_columns_with_eligibility_by_wire_id(
1134            &wire_subscription_id,
1135            pk_detection.pk_column_indices.clone(),
1136            pk_detection.confident,
1137        );
1138        if newly_eligible {
1139            if let Some(metrics) = self.observability.metrics() {
1140                metrics.increment_selective_eligible();
1141            }
1142        }
1143
1144        // Apply per-subscription selective updates override if provided
1145        if let Some(wire_config) = selective_updates_config {
1146            // Convert wire protocol config to SelectiveColumnConfig
1147            // Merge with server defaults for any unspecified fields
1148            let server_config = &self.config.subscriptions.selective_updates;
1149
1150            let override_config = SelectiveColumnConfig {
1151                enabled: wire_config.enabled.unwrap_or(server_config.enabled),
1152                pk_columns: pk_detection.pk_column_indices.clone(), // Use detected PK columns
1153                min_changed_columns: wire_config
1154                    .min_changed_columns
1155                    .unwrap_or(server_config.min_changed_columns),
1156                max_changed_columns_ratio: wire_config
1157                    .max_changed_columns_ratio
1158                    .unwrap_or(server_config.max_changed_columns_ratio),
1159            };
1160
1161            self.subscription_manager.set_selective_updates_override_by_wire_id(
1162                &wire_subscription_id,
1163                override_config,
1164            );
1165        }
1166
1167        // Execute the query to get initial data
1168        match session.execute(query).await {
1169            Ok(ExecutionResult::Select { rows, columns }) => {
1170                // Build filter if present
1171                let filter_opt = filter.as_ref().and_then(|f| {
1172                    let col_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
1173                    SubscriptionFilter::new(f, &col_names).ok()
1174                });
1175
1176                // Filter rows if filter is present, then convert to Row format
1177                let result_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
1178                    rows.iter()
1179                        .filter(|row| flt.matches(&row.values))
1180                        .map(|r| Row { values: r.values.clone() })
1181                        .collect()
1182                } else {
1183                    rows.iter().map(|r| Row { values: r.values.clone() }).collect()
1184                };
1185
1186                // Compute hash and store result for future delta computation
1187                let result_hash = hash_rows(&result_rows);
1188                self.subscription_manager.update_result_by_wire_id(
1189                    &wire_subscription_id,
1190                    result_hash,
1191                    result_rows.clone(),
1192                );
1193
1194                // Convert rows to wire format
1195                let wire_rows: Vec<Vec<Option<Vec<u8>>>> = result_rows
1196                    .iter()
1197                    .map(|row| {
1198                        row.values.iter().map(|v| Some(v.to_string().as_bytes().to_vec())).collect()
1199                    })
1200                    .collect();
1201
1202                // Send initial subscription data
1203                self.send_subscription_data(
1204                    &wire_subscription_id,
1205                    SubscriptionUpdateType::Full,
1206                    wire_rows,
1207                )
1208                .await?;
1209            }
1210            Ok(_) => {
1211                // Non-SELECT query - send error and remove subscription
1212                let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&wire_subscription_id);
1213                if was_selective_eligible {
1214                    if let Some(metrics) = self.observability.metrics() {
1215                        metrics.decrement_selective_eligible();
1216                    }
1217                }
1218                self.send_subscription_error(
1219                    &wire_subscription_id,
1220                    "Only SELECT queries can be subscribed to",
1221                )
1222                .await?;
1223            }
1224            Err(e) => {
1225                // Query execution failed - remove subscription and send error
1226                let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&wire_subscription_id);
1227                if was_selective_eligible {
1228                    if let Some(metrics) = self.observability.metrics() {
1229                        metrics.decrement_selective_eligible();
1230                    }
1231                }
1232                self.send_subscription_error(&wire_subscription_id, &format!("Execution error: {}", e))
1233                    .await?;
1234            }
1235        }
1236
1237        Ok(())
1238    }
1239
1240    /// Notify affected subscriptions after a mutation (INSERT/UPDATE/DELETE)
1241    ///
1242    /// This method parses the mutation query to extract the affected table,
1243    /// finds all subscriptions that depend on that table, re-executes their
1244    /// queries, and sends updated results to the client.
1245    /// Supports delta updates to reduce network bandwidth.
1246    /// Supports optional filtering expressions to send only matching rows.
1247    #[allow(clippy::type_complexity)]
1248    async fn notify_affected_subscriptions(&mut self, mutation_query: &str) {
1249        // Parse the mutation query to extract affected tables
1250        let affected_tables = match vibesql_parser::Parser::parse_sql(mutation_query) {
1251            Ok(stmt) => extract_table_refs(&stmt),
1252            Err(e) => {
1253                debug!("Failed to parse mutation query for subscription update: {}", e);
1254                return;
1255            }
1256        };
1257
1258        if affected_tables.is_empty() {
1259            return;
1260        }
1261
1262        // Collect subscriptions for THIS connection that need updating
1263        let subscriptions_to_update: Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)> =
1264            affected_tables
1265                .iter()
1266                .flat_map(|table| {
1267                    self.subscription_manager
1268                        .get_affected_subscriptions_for_connection(table, &self.connection_id)
1269                })
1270                .collect();
1271
1272        if subscriptions_to_update.is_empty() {
1273            return;
1274        }
1275
1276        // De-duplicate subscriptions (a subscription may depend on multiple affected tables)
1277        let mut seen = std::collections::HashSet::new();
1278        let unique_subscriptions: Vec<_> = subscriptions_to_update
1279            .into_iter()
1280            .filter(|(id, _, _, _, _)| seen.insert(*id))
1281            .collect();
1282
1283        debug!(
1284            "Notifying {} subscriptions after mutation affecting tables: {:?}",
1285            unique_subscriptions.len(),
1286            affected_tables
1287        );
1288
1289        // Re-execute each subscription query and send updates
1290        for (subscription_id, query, last_hash, last_result, filter) in unique_subscriptions {
1291            if let Some(session) = &mut self.session {
1292                match session.execute(&query).await {
1293                    Ok(ExecutionResult::Select { rows, columns }) => {
1294                        // Build filter if present
1295                        let filter_opt = filter.as_ref().and_then(|f| {
1296                            let col_names: Vec<String> =
1297                                columns.iter().map(|c| c.name.clone()).collect();
1298                            SubscriptionFilter::new(f, &col_names).ok()
1299                        });
1300
1301                        // Filter rows if filter is present, then convert to Row format
1302                        let new_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
1303                            rows.iter()
1304                                .filter(|row| flt.matches(&row.values))
1305                                .map(|r| Row { values: r.values.clone() })
1306                                .collect()
1307                        } else {
1308                            rows.iter().map(|r| Row { values: r.values.clone() }).collect()
1309                        };
1310
1311                        // Compute hash for change detection
1312                        let new_hash = hash_rows(&new_rows);
1313
1314                        // Skip if results haven't changed
1315                        if new_hash == last_hash {
1316                            debug!(
1317                                "Same-connection update: results unchanged for subscription {:?}",
1318                                subscription_id
1319                            );
1320                            continue;
1321                        }
1322
1323                        // Determine whether to send delta or full update
1324                        if let Some(ref old_rows) = last_result {
1325                            // First, try selective column updates (0xF7) using effective config
1326                            if self
1327                                .try_send_selective_updates(
1328                                    &subscription_id,
1329                                    old_rows,
1330                                    &new_rows,
1331                                )
1332                                .await
1333                            {
1334                                // Selective updates sent successfully - update stored result
1335                                self.subscription_manager.update_result_by_wire_id(
1336                                    &subscription_id,
1337                                    new_hash,
1338                                    new_rows,
1339                                );
1340                                continue;
1341                            }
1342
1343                            // Fall back to delta updates using PK columns
1344                            let pk_columns = self
1345                                .subscription_manager
1346                                .get_pk_columns_by_wire_id(&subscription_id);
1347                            if let Some(delta) = compute_delta_with_pk(
1348                                SubscriptionId::default(),
1349                                old_rows,
1350                                &new_rows,
1351                                &pk_columns,
1352                            ) {
1353                                // Send delta updates
1354                                if let Err(e) =
1355                                    self.send_delta_updates(&subscription_id, &delta).await
1356                                {
1357                                    warn!("Failed to send same-connection delta update: {}", e);
1358                                }
1359
1360                                // Log delta statistics
1361                                if let SubscriptionUpdate::Delta {
1362                                    ref inserts,
1363                                    ref updates,
1364                                    ref deletes,
1365                                    ..
1366                                } = delta
1367                                {
1368                                    debug!(
1369                                        "Same-connection delta update sent: {} inserts, {} updates, {} deletes for subscription {:?}",
1370                                        inserts.len(),
1371                                        updates.len(),
1372                                        deletes.len(),
1373                                        subscription_id
1374                                    );
1375                                }
1376                            } else {
1377                                // No delta computed - send full update
1378                                let wire_rows = Self::rows_to_wire_format(&new_rows);
1379                                if let Err(e) = self
1380                                    .send_subscription_data(
1381                                        &subscription_id,
1382                                        SubscriptionUpdateType::Full,
1383                                        wire_rows,
1384                                    )
1385                                    .await
1386                                {
1387                                    warn!("Failed to send same-connection full update: {}", e);
1388                                }
1389                            }
1390                        } else {
1391                            // No previous results - send full update
1392                            let wire_rows = Self::rows_to_wire_format(&new_rows);
1393                            if let Err(e) = self
1394                                .send_subscription_data(
1395                                    &subscription_id,
1396                                    SubscriptionUpdateType::Full,
1397                                    wire_rows,
1398                                )
1399                                .await
1400                            {
1401                                warn!("Failed to send same-connection full update: {}", e);
1402                            }
1403                        }
1404
1405                        // Update stored result for next delta computation
1406                        self.subscription_manager.update_result_by_wire_id(
1407                            &subscription_id,
1408                            new_hash,
1409                            new_rows,
1410                        );
1411                    }
1412                    Ok(_) => {
1413                        // Non-SELECT result - shouldn't happen for a subscription query
1414                        warn!("Subscription query returned non-SELECT result");
1415                    }
1416                    Err(e) => {
1417                        // Query failed - send error to subscriber
1418                        if let Err(send_err) = self
1419                            .send_subscription_error(&subscription_id, &format!("Query error: {}", e))
1420                            .await
1421                        {
1422                            warn!("Failed to send subscription error: {}", send_err);
1423                        }
1424                    }
1425                }
1426            }
1427        }
1428    }
1429
1430    /// Broadcast a mutation event to all connections
1431    ///
1432    /// This is called after a mutation (INSERT/UPDATE/DELETE) is executed to notify
1433    /// other connections that may have subscriptions on the affected tables.
1434    fn broadcast_mutation(&self, mutation_query: &str) {
1435        // Parse the mutation query to extract affected tables
1436        let affected_tables = match vibesql_parser::Parser::parse_sql(mutation_query) {
1437            Ok(stmt) => extract_table_refs(&stmt),
1438            Err(e) => {
1439                debug!("Failed to parse mutation query for broadcast: {}", e);
1440                return;
1441            }
1442        };
1443
1444        if affected_tables.is_empty() {
1445            return;
1446        }
1447
1448        debug!("Broadcasting mutation affecting tables: {:?}", affected_tables);
1449
1450        // Broadcast the notification to all connections
1451        // Note: This is fire-and-forget. If the channel is full or has no receivers,
1452        // it's okay - we've already notified our own connection's subscriptions.
1453        let notification = TableMutationNotification { affected_tables };
1454        if let Err(e) = self.mutation_broadcast_tx.send(notification) {
1455            // No receivers or channel issue - this is fine, just log at debug level
1456            debug!("Failed to broadcast mutation notification: {}", e);
1457        }
1458    }
1459
1460    /// Send query result to client
1461    async fn send_query_result(&mut self, result: ExecutionResult) -> Result<()> {
1462        match result {
1463            ExecutionResult::Select { rows, columns } => {
1464                // Send row description
1465                let fields: Vec<FieldDescription> = columns
1466                    .iter()
1467                    .enumerate()
1468                    .map(|(i, col)| FieldDescription {
1469                        name: col.name.clone(),
1470                        table_oid: 0,
1471                        column_attr_number: i as i16,
1472                        data_type_oid: 25,  // TEXT type
1473                        data_type_size: -1, // Variable length
1474                        type_modifier: -1,
1475                        format_code: 0, // Text format
1476                    })
1477                    .collect();
1478
1479                self.send_row_description(fields).await?;
1480
1481                // Save row count before consuming
1482                let row_count = rows.len();
1483
1484                // Send data rows
1485                for row in rows {
1486                    let values: Vec<Option<Vec<u8>>> = row
1487                        .values
1488                        .iter()
1489                        .map(|v: &vibesql_types::SqlValue| Some(v.to_string().as_bytes().to_vec()))
1490                        .collect();
1491
1492                    self.send_data_row(values).await?;
1493                }
1494
1495                // Send command complete
1496                self.send_command_complete(&format!("SELECT {}", row_count)).await?;
1497            }
1498
1499            ExecutionResult::Insert { rows_affected } => {
1500                self.send_command_complete(&format!("INSERT 0 {}", rows_affected)).await?;
1501            }
1502
1503            ExecutionResult::Update { rows_affected } => {
1504                self.send_command_complete(&format!("UPDATE {}", rows_affected)).await?;
1505            }
1506
1507            ExecutionResult::Delete { rows_affected } => {
1508                self.send_command_complete(&format!("DELETE {}", rows_affected)).await?;
1509            }
1510
1511            ExecutionResult::CreateTable
1512            | ExecutionResult::CreateIndex
1513            | ExecutionResult::CreateView => {
1514                self.send_command_complete("CREATE TABLE").await?;
1515            }
1516
1517            ExecutionResult::DropTable | ExecutionResult::DropIndex | ExecutionResult::DropView => {
1518                self.send_command_complete("DROP TABLE").await?;
1519            }
1520
1521            ExecutionResult::Analyze { tables_analyzed } => {
1522                self.send_command_complete(&format!("ANALYZE {}", tables_analyzed)).await?;
1523            }
1524
1525            ExecutionResult::Other { message } => {
1526                self.send_command_complete(&message).await?;
1527            }
1528
1529            ExecutionResult::Prepare { statement_name } => {
1530                self.send_command_complete(&format!("PREPARE {}", statement_name)).await?;
1531            }
1532
1533            ExecutionResult::Deallocate { statement_name } => {
1534                self.send_command_complete(&format!("DEALLOCATE {}", statement_name)).await?;
1535            }
1536
1537            ExecutionResult::DeclareCursor { cursor_name } => {
1538                self.send_command_complete(&format!("DECLARE CURSOR {}", cursor_name)).await?;
1539            }
1540
1541            ExecutionResult::OpenCursor { cursor_name } => {
1542                self.send_command_complete(&format!("OPEN {}", cursor_name)).await?;
1543            }
1544
1545            ExecutionResult::Fetch { rows, columns } => {
1546                // Send row description
1547                let fields: Vec<FieldDescription> = columns
1548                    .iter()
1549                    .enumerate()
1550                    .map(|(i, col)| FieldDescription {
1551                        name: col.name.clone(),
1552                        table_oid: 0,
1553                        column_attr_number: i as i16,
1554                        data_type_oid: 25,  // TEXT type
1555                        data_type_size: -1, // Variable length
1556                        type_modifier: -1,
1557                        format_code: 0, // Text format
1558                    })
1559                    .collect();
1560
1561                self.send_row_description(fields).await?;
1562
1563                // Save row count before consuming
1564                let row_count = rows.len();
1565
1566                // Send data rows
1567                for row in rows {
1568                    let values: Vec<Option<Vec<u8>>> = row
1569                        .values
1570                        .iter()
1571                        .map(|v: &vibesql_types::SqlValue| Some(v.to_string().as_bytes().to_vec()))
1572                        .collect();
1573
1574                    self.send_data_row(values).await?;
1575                }
1576
1577                // Send command complete
1578                self.send_command_complete(&format!("FETCH {}", row_count)).await?;
1579            }
1580
1581            ExecutionResult::CloseCursor { cursor_name } => {
1582                self.send_command_complete(&format!("CLOSE {}", cursor_name)).await?;
1583            }
1584
1585            ExecutionResult::Begin => {
1586                self.send_command_complete("BEGIN").await?;
1587            }
1588
1589            ExecutionResult::Commit => {
1590                self.send_command_complete("COMMIT").await?;
1591            }
1592
1593            ExecutionResult::Rollback => {
1594                self.send_command_complete("ROLLBACK").await?;
1595            }
1596        }
1597
1598        Ok(())
1599    }
1600
1601    // Message sending methods
1602
1603    async fn send_authentication_ok(&mut self) -> Result<()> {
1604        BackendMessage::AuthenticationOk.encode(&mut self.write_buf);
1605        self.flush_write_buffer().await
1606    }
1607
1608    async fn send_cleartext_password_request(&mut self) -> Result<()> {
1609        BackendMessage::AuthenticationCleartextPassword.encode(&mut self.write_buf);
1610        self.flush_write_buffer().await
1611    }
1612
1613    async fn send_md5_password_request(&mut self, salt: &[u8; 4]) -> Result<()> {
1614        BackendMessage::AuthenticationMD5Password { salt: *salt }.encode(&mut self.write_buf);
1615        self.flush_write_buffer().await
1616    }
1617
1618    async fn send_parameter_status(&mut self, name: &str, value: &str) -> Result<()> {
1619        BackendMessage::ParameterStatus { name: name.to_string(), value: value.to_string() }
1620            .encode(&mut self.write_buf);
1621        self.flush_write_buffer().await
1622    }
1623
1624    async fn send_backend_key_data(&mut self) -> Result<()> {
1625        BackendMessage::BackendKeyData {
1626            process_id: std::process::id() as i32,
1627            secret_key: 12345, // TODO: Generate random secret
1628        }
1629        .encode(&mut self.write_buf);
1630        self.flush_write_buffer().await
1631    }
1632
1633    async fn send_ready_for_query(&mut self, status: TransactionStatus) -> Result<()> {
1634        BackendMessage::ReadyForQuery { status }.encode(&mut self.write_buf);
1635        self.flush_write_buffer().await
1636    }
1637
1638    async fn send_row_description(&mut self, fields: Vec<FieldDescription>) -> Result<()> {
1639        BackendMessage::RowDescription { fields }.encode(&mut self.write_buf);
1640        self.flush_write_buffer().await
1641    }
1642
1643    async fn send_data_row(&mut self, values: Vec<Option<Vec<u8>>>) -> Result<()> {
1644        BackendMessage::DataRow { values }.encode(&mut self.write_buf);
1645        self.flush_write_buffer().await
1646    }
1647
1648    async fn send_command_complete(&mut self, tag: &str) -> Result<()> {
1649        BackendMessage::CommandComplete { tag: tag.to_string() }.encode(&mut self.write_buf);
1650        self.flush_write_buffer().await
1651    }
1652
1653    async fn send_error_response(&mut self, message: &str) -> Result<()> {
1654        let mut fields = HashMap::new();
1655        fields.insert(b'S', "ERROR".to_string());
1656        fields.insert(b'C', "XX000".to_string()); // internal_error
1657        fields.insert(b'M', message.to_string());
1658
1659        BackendMessage::ErrorResponse { fields }.encode(&mut self.write_buf);
1660        self.flush_write_buffer().await
1661    }
1662
1663    async fn send_empty_query_response(&mut self) -> Result<()> {
1664        BackendMessage::EmptyQueryResponse.encode(&mut self.write_buf);
1665        self.flush_write_buffer().await
1666    }
1667
1668    /// Send subscription data message (initial results or updates)
1669    async fn send_subscription_data(
1670        &mut self,
1671        subscription_id: &[u8; 16],
1672        update_type: SubscriptionUpdateType,
1673        rows: Vec<Vec<Option<Vec<u8>>>>,
1674    ) -> Result<()> {
1675        // Record subscription update metrics
1676        if let Some(metrics) = self.observability.metrics() {
1677            let type_str = match update_type {
1678                SubscriptionUpdateType::Full => "full",
1679                SubscriptionUpdateType::DeltaInsert => "delta_insert",
1680                SubscriptionUpdateType::DeltaUpdate => "delta_update",
1681                SubscriptionUpdateType::DeltaDelete => "delta_delete",
1682                SubscriptionUpdateType::SelectiveUpdate => "selective",
1683            };
1684            metrics.record_subscription_update(type_str, rows.len() as u64);
1685
1686            // Record full update sent for efficiency stats
1687            if matches!(update_type, SubscriptionUpdateType::Full) {
1688                metrics.record_full_update_sent();
1689            }
1690        }
1691
1692        BackendMessage::SubscriptionData { subscription_id: *subscription_id, update_type, rows }
1693            .encode(&mut self.write_buf);
1694        self.flush_write_buffer().await
1695    }
1696
1697    /// Send subscription partial data message (for selective column updates)
1698    ///
1699    /// Uses the SubscriptionPartialData (0xF7) message format to send only
1700    /// changed columns plus primary key columns, reducing wire traffic.
1701    async fn send_subscription_partial_data(
1702        &mut self,
1703        subscription_id: &[u8; 16],
1704        rows: Vec<PartialRowUpdate>,
1705    ) -> Result<()> {
1706        // Record subscription update metrics
1707        if let Some(metrics) = self.observability.metrics() {
1708            metrics.record_subscription_update("selective", rows.len() as u64);
1709        }
1710
1711        BackendMessage::SubscriptionPartialData { subscription_id: *subscription_id, rows }
1712            .encode(&mut self.write_buf);
1713        self.flush_write_buffer().await
1714    }
1715
1716    /// Send subscription error message
1717    async fn send_subscription_error(
1718        &mut self,
1719        subscription_id: &[u8; 16],
1720        message: &str,
1721    ) -> Result<()> {
1722        BackendMessage::SubscriptionError {
1723            subscription_id: *subscription_id,
1724            message: message.to_string(),
1725        }
1726        .encode(&mut self.write_buf);
1727        self.flush_write_buffer().await
1728    }
1729
1730    // I/O methods
1731
1732    async fn read_message(&mut self) -> Result<()> {
1733        let n = self.read_half.read_buf(&mut self.read_buf).await?;
1734        if n == 0 {
1735            return Err(anyhow::anyhow!("Connection closed"));
1736        }
1737        Ok(())
1738    }
1739
1740    async fn flush_write_buffer(&mut self) -> Result<()> {
1741        self.write_half.write_all(&self.write_buf).await?;
1742        self.write_half.flush().await?;
1743        self.write_buf.clear();
1744        Ok(())
1745    }
1746}
1747
1748impl Drop for ConnectionHandler {
1749    fn drop(&mut self) {
1750        // Decrement active connection count
1751        self.active_connections.fetch_sub(1, Ordering::AcqRel);
1752
1753        // Record connection duration when connection closes
1754        if let Some(metrics) = self.observability.metrics() {
1755            metrics.record_connection_duration(self.connection_start.elapsed());
1756        }
1757    }
1758}