1use crate::auth::PasswordStore;
2use crate::config::Config;
3use crate::observability::ObservabilityProvider;
4use crate::protocol::{
5 BackendMessage, FieldDescription, FrontendMessage, SelectiveUpdatesConfig, SubscriptionUpdateType, TransactionStatus,
6};
7use crate::registry::DatabaseRegistry;
8use crate::session::{ExecutionResult, Session};
9use crate::protocol::PartialRowUpdate;
10use crate::subscription::{
11 compute_delta_with_pk, create_partial_row_update, detect_pk_columns_from_stmt,
12 extract_table_refs, filter::SubscriptionFilter, hash_rows, SelectiveColumnConfig,
13 SubscriptionId, SubscriptionManager, SubscriptionUpdate,
14};
15use crate::Row;
16use anyhow::Result;
17use bytes::BytesMut;
18use std::collections::{HashMap, HashSet};
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::io::{AsyncReadExt, AsyncWriteExt};
24use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
25use tokio::net::TcpStream;
26use tokio::sync::broadcast;
27use tracing::{debug, error, info, warn};
28use vibesql_executor::cache::table_extractor;
29
/// Payload carried on the mutation broadcast channel shared by connection
/// handlers.
///
/// A handler that receives one re-evaluates its own subscriptions that depend
/// on any of the listed tables.
#[derive(Debug, Clone)]
pub struct TableMutationNotification {
    /// Names of the tables the mutation touched.
    pub affected_tables: HashSet<String>,
}
37
/// Per-connection state for one client socket: the split stream, protocol
/// buffers, the authenticated session, and handles to server-wide shared
/// services (configuration, metrics, database registry, subscriptions).
pub struct ConnectionHandler {
    /// Read side of the client socket (from `TcpStream::into_split`).
    read_half: OwnedReadHalf,
    /// Write side of the client socket.
    write_half: OwnedWriteHalf,
    /// Remote address of the client; logged when the handshake starts.
    peer_addr: SocketAddr,
    /// Shared server configuration (authentication method, selective-update
    /// defaults, ...).
    config: Arc<Config>,
    /// Source of the optional metrics recorder (`metrics()` may return `None`).
    observability: Arc<ObservabilityProvider>,
    /// Credential store used by the `password` and `md5` authentication
    /// methods; authentication fails with those methods when it is `None`.
    password_store: Option<Arc<PasswordStore>>,
    /// Bytes received from the client that have not been decoded yet.
    read_buf: BytesMut,
    /// Outgoing byte buffer.
    // NOTE(review): not touched by the methods visible in this part of the
    // file; presumably filled by the `send_*` helpers — confirm.
    write_buf: BytesMut,
    /// Database session; `None` until the startup message has been accepted.
    session: Option<Session>,
    /// Instant at which this handler was constructed.
    connection_start: Instant,
    /// Server-wide connection counter.
    // NOTE(review): not read or written in the visible methods; presumably
    // maintained on accept/drop — confirm.
    active_connections: Arc<AtomicUsize>,
    /// Registry used to look up (or create) the database named at startup.
    database_registry: DatabaseRegistry,
    /// Random UUID string identifying this connection to the subscription
    /// manager.
    connection_id: String,
    /// Shared registry of active query subscriptions.
    subscription_manager: Arc<SubscriptionManager>,
    /// Sender half of the mutation broadcast channel.
    mutation_broadcast_tx: broadcast::Sender<TableMutationNotification>,
    /// This connection's receiver on the mutation broadcast channel; polled
    /// by the main message loop.
    mutation_broadcast_rx: broadcast::Receiver<TableMutationNotification>,
}
64
/// Tells the message loop what to do after one client message was handled.
enum ClientMessageResult {
    /// Keep reading and processing messages.
    Continue,
    /// The client asked to terminate; leave the message loop.
    Terminate,
}
72
73impl ConnectionHandler {
74 #[allow(clippy::too_many_arguments)]
76 pub fn new(
77 stream: TcpStream,
78 peer_addr: SocketAddr,
79 config: Arc<Config>,
80 observability: Arc<ObservabilityProvider>,
81 password_store: Option<Arc<PasswordStore>>,
82 active_connections: Arc<AtomicUsize>,
83 database_registry: DatabaseRegistry,
84 subscription_manager: Arc<SubscriptionManager>,
85 mutation_broadcast_tx: broadcast::Sender<TableMutationNotification>,
86 ) -> Self {
87 let (read_half, write_half) = stream.into_split();
90
91 let mutation_broadcast_rx = mutation_broadcast_tx.subscribe();
93
94 let connection_id = uuid::Uuid::new_v4().to_string();
96
97 Self {
98 read_half,
99 write_half,
100 peer_addr,
101 config,
102 observability,
103 password_store,
104 read_buf: BytesMut::with_capacity(8192),
105 write_buf: BytesMut::with_capacity(8192),
106 session: None,
107 connection_start: Instant::now(),
108 active_connections,
109 database_registry,
110 connection_id,
111 subscription_manager,
112 mutation_broadcast_tx,
113 mutation_broadcast_rx,
114 }
115 }
116
117 pub async fn handle(&mut self) -> Result<()> {
119 self.startup_handshake().await?;
121
122 self.process_queries().await?;
124
125 Ok(())
126 }
127
128 async fn startup_handshake(&mut self) -> Result<()> {
130 debug!("Starting handshake with {}", self.peer_addr);
131
132 self.read_message().await?;
134
135 let startup_msg = FrontendMessage::decode_startup(&mut self.read_buf)?;
136
137 match startup_msg {
138 Some(FrontendMessage::SSLRequest) => {
139 debug!("Received SSL request");
140 self.write_half.write_u8(b'N').await?;
142 self.write_half.flush().await?;
143
144 self.read_buf.clear();
146 self.read_message().await?;
147
148 let startup_msg = FrontendMessage::decode_startup(&mut self.read_buf)?;
149 self.handle_startup(startup_msg).await?;
150 }
151
152 Some(msg) => {
153 self.handle_startup(Some(msg)).await?;
154 }
155
156 None => {
157 return Err(anyhow::anyhow!("No startup message received"));
158 }
159 }
160
161 Ok(())
162 }
163
164 async fn handle_startup(&mut self, msg: Option<FrontendMessage>) -> Result<()> {
166 match msg {
167 Some(FrontendMessage::Startup { protocol_version, params }) => {
168 debug!("Startup: version={}, params={:?}", protocol_version, params);
169
170 let user = params.get("user").cloned().unwrap_or_else(|| "postgres".to_string());
171 let database = params.get("database").cloned().unwrap_or_else(|| user.clone());
172
173 self.authenticate(&user).await?;
175
176 let shared_db = self.database_registry.get_or_create(&database).await;
178
179 self.session = Some(Session::new(database.clone(), user.clone(), shared_db));
181
182 info!("User '{}' connected to database '{}'", user, database);
183
184 self.send_parameter_status("server_version", "14.0 (VibeSQL)").await?;
186 self.send_parameter_status("server_encoding", "UTF8").await?;
187 self.send_parameter_status("client_encoding", "UTF8").await?;
188 self.send_parameter_status("DateStyle", "ISO, MDY").await?;
189 self.send_parameter_status("TimeZone", "UTC").await?;
190
191 self.send_backend_key_data().await?;
193
194 self.send_ready_for_query(TransactionStatus::Idle).await?;
196
197 Ok(())
198 }
199
200 _ => Err(anyhow::anyhow!("Invalid startup message")),
201 }
202 }
203
204 async fn authenticate(&mut self, user: &str) -> Result<()> {
206 match self.config.auth.method.as_str() {
207 "trust" => {
208 debug!("Using trust authentication for user '{}'", user);
210 self.send_authentication_ok().await?;
211 Ok(())
212 }
213
214 "password" => {
215 debug!("Requesting cleartext password for user '{}'", user);
217 self.send_cleartext_password_request().await?;
218
219 self.read_message().await?;
221 let msg = FrontendMessage::decode(&mut self.read_buf)?;
222
223 match msg {
224 Some(FrontendMessage::Password { password }) => {
225 debug!("Received password from user '{}'", user);
226
227 if let Some(ref store) = self.password_store {
228 if store.verify_cleartext(user, &password) {
229 info!("User '{}' authenticated successfully", user);
230 self.send_authentication_ok().await?;
231 Ok(())
232 } else {
233 error!("Authentication failed for user '{}'", user);
234 Err(anyhow::anyhow!("Authentication failed"))
235 }
236 } else {
237 error!("No password store configured");
238 Err(anyhow::anyhow!("Authentication not configured"))
239 }
240 }
241 _ => {
242 error!("Expected password message, got: {:?}", msg);
243 Err(anyhow::anyhow!("Expected password message"))
244 }
245 }
246 }
247
248 "md5" => {
249 debug!("Requesting MD5 password for user '{}'", user);
251
252 use rand::Rng;
254 let salt: [u8; 4] = rand::rng().random();
255
256 self.send_md5_password_request(&salt).await?;
257
258 self.read_message().await?;
260 let msg = FrontendMessage::decode(&mut self.read_buf)?;
261
262 match msg {
263 Some(FrontendMessage::Password { password }) => {
264 debug!("Received MD5 password response from user '{}'", user);
265
266 if let Some(ref store) = self.password_store {
267 if store.verify_md5(user, &password, &salt) {
268 info!("User '{}' authenticated successfully (MD5)", user);
269 self.send_authentication_ok().await?;
270 Ok(())
271 } else {
272 error!("MD5 authentication failed for user '{}'", user);
273 Err(anyhow::anyhow!("Authentication failed"))
274 }
275 } else {
276 error!("No password store configured");
277 Err(anyhow::anyhow!("Authentication not configured"))
278 }
279 }
280 _ => {
281 error!("Expected password message, got: {:?}", msg);
282 Err(anyhow::anyhow!("Expected password message"))
283 }
284 }
285 }
286
287 "scram-sha-256" => {
288 error!("SCRAM-SHA-256 authentication not yet implemented");
290 Err(anyhow::anyhow!("SCRAM-SHA-256 not implemented"))
291 }
292
293 _ => {
294 error!("Unsupported authentication method: {}", self.config.auth.method);
295 Err(anyhow::anyhow!("Unsupported authentication method"))
296 }
297 }
298 }
299
300 async fn process_queries(&mut self) -> Result<()> {
312 loop {
313 while let Some(msg) = FrontendMessage::decode(&mut self.read_buf)? {
316 match self.handle_client_message(msg).await? {
317 ClientMessageResult::Continue => {}
318 ClientMessageResult::Terminate => {
319 let (total, selective_eligible) = self.subscription_manager
320 .unsubscribe_all_for_connection(&self.connection_id);
321 if let Some(metrics) = self.observability.metrics() {
322 for _ in 0..total {
323 metrics.decrement_subscriptions_active();
324 }
325 for _ in 0..selective_eligible {
326 metrics.decrement_selective_eligible();
327 }
328 }
329 return Ok(());
330 }
331 }
332 }
333
334 tokio::select! {
341 biased; notification = self.mutation_broadcast_rx.recv() => {
345 match notification {
346 Ok(n) => {
347 if self.subscription_manager.connection_subscription_count(&self.connection_id) > 0 {
348 self.handle_cross_connection_notification(&n.affected_tables).await;
349 }
350 }
351 Err(broadcast::error::RecvError::Lagged(n)) => {
352 debug!("Missed {} broadcast notifications (lagged)", n);
353 }
354 Err(broadcast::error::RecvError::Closed) => {
355 warn!("Mutation broadcast channel closed");
356 }
357 }
358 }
359
360 read_result = self.read_half.read_buf(&mut self.read_buf) => {
362 match read_result {
363 Ok(0) => {
364 debug!("Connection closed by client");
366 break;
367 }
368 Ok(_) => {
369 }
371 Err(e) => {
372 return Err(e.into());
373 }
374 }
375 }
376 }
377 }
378
379 let (total, selective_eligible) = self.subscription_manager
381 .unsubscribe_all_for_connection(&self.connection_id);
382 if let Some(metrics) = self.observability.metrics() {
383 for _ in 0..total {
384 metrics.decrement_subscriptions_active();
385 }
386 for _ in 0..selective_eligible {
387 metrics.decrement_selective_eligible();
388 }
389 }
390
391 Ok(())
392 }
393
394 async fn handle_client_message(&mut self, msg: FrontendMessage) -> Result<ClientMessageResult> {
396 match msg {
397 FrontendMessage::Query { query } => {
398 debug!("Query: {}", query);
399 self.execute_query(&query).await?;
400 Ok(ClientMessageResult::Continue)
401 }
402
403 FrontendMessage::Subscribe { query, params, filter, selective_updates_config } => {
404 debug!("Subscribe: {} (filter: {:?}, selective_config: {:?})", query, filter, selective_updates_config);
405 self.handle_subscribe(&query, params, filter, selective_updates_config).await?;
406 Ok(ClientMessageResult::Continue)
407 }
408
409 FrontendMessage::Unsubscribe { subscription_id } => {
410 debug!("Unsubscribe: {:?}", subscription_id);
411 let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&subscription_id);
412 if was_selective_eligible {
413 if let Some(metrics) = self.observability.metrics() {
414 metrics.decrement_selective_eligible();
415 }
416 }
417 Ok(ClientMessageResult::Continue)
419 }
420
421 FrontendMessage::Terminate => {
422 debug!("Client requested termination");
423 Ok(ClientMessageResult::Terminate)
424 }
425
426 msg => {
427 warn!("Unexpected message: {:?}", msg);
428 Ok(ClientMessageResult::Continue)
429 }
430 }
431 }
432
433 #[allow(clippy::type_complexity)]
441 async fn handle_cross_connection_notification(&mut self, affected_tables: &HashSet<String>) {
442 let subscriptions_to_update: Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)> =
444 affected_tables
445 .iter()
446 .flat_map(|table| {
447 self.subscription_manager
448 .get_affected_subscriptions_for_connection(table, &self.connection_id)
449 })
450 .collect();
451
452 if subscriptions_to_update.is_empty() {
453 return;
454 }
455
456 let mut seen = std::collections::HashSet::new();
458 let unique_subscriptions: Vec<_> = subscriptions_to_update
459 .into_iter()
460 .filter(|(id, _, _, _, _)| seen.insert(*id))
461 .collect();
462
463 debug!(
464 "Cross-connection notification: notifying {} subscriptions for tables: {:?}",
465 unique_subscriptions.len(),
466 affected_tables
467 );
468
469 for (subscription_id, query, last_hash, last_result, filter) in unique_subscriptions {
471 if let Some(session) = &mut self.session {
472 match session.execute(&query).await {
473 Ok(ExecutionResult::Select { rows, columns }) => {
474 let filter_opt = filter.as_ref().and_then(|f| {
476 let col_names: Vec<String> =
477 columns.iter().map(|c| c.name.clone()).collect();
478 SubscriptionFilter::new(f, &col_names).ok()
479 });
480
481 let new_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
483 rows.iter()
484 .filter(|row| flt.matches(&row.values))
485 .map(|r| Row { values: r.values.clone() })
486 .collect()
487 } else {
488 rows.iter().map(|r| Row { values: r.values.clone() }).collect()
489 };
490
491 let new_hash = hash_rows(&new_rows);
493
494 if new_hash == last_hash {
496 debug!(
497 "Cross-connection update: results unchanged for subscription {:?}",
498 subscription_id
499 );
500 continue;
501 }
502
503 if let Some(ref old_rows) = last_result {
505 if self
507 .try_send_selective_updates(
508 &subscription_id,
509 old_rows,
510 &new_rows,
511 )
512 .await
513 {
514 self.subscription_manager.update_result_by_wire_id(
516 &subscription_id,
517 new_hash,
518 new_rows,
519 );
520 continue;
521 }
522
523 let pk_columns = self
525 .subscription_manager
526 .get_pk_columns_by_wire_id(&subscription_id);
527 if let Some(delta) = compute_delta_with_pk(
528 SubscriptionId::default(),
529 old_rows,
530 &new_rows,
531 &pk_columns,
532 ) {
533 if let Err(e) =
535 self.send_delta_updates(&subscription_id, &delta).await
536 {
537 warn!(
538 "Failed to send cross-connection delta update: {}",
539 e
540 );
541 }
542
543 if let SubscriptionUpdate::Delta {
545 ref inserts,
546 ref updates,
547 ref deletes,
548 ..
549 } = delta
550 {
551 debug!(
552 "Cross-connection delta update sent: {} inserts, {} updates, {} deletes for subscription {:?}",
553 inserts.len(),
554 updates.len(),
555 deletes.len(),
556 subscription_id
557 );
558 }
559 } else {
560 let wire_rows = Self::rows_to_wire_format(&new_rows);
563 if let Err(e) = self
564 .send_subscription_data(
565 &subscription_id,
566 SubscriptionUpdateType::Full,
567 wire_rows,
568 )
569 .await
570 {
571 warn!("Failed to send cross-connection full update: {}", e);
572 }
573 }
574 } else {
575 debug!(
577 "Cross-connection update: no previous result, sending full update for subscription {:?}",
578 subscription_id
579 );
580 let wire_rows = Self::rows_to_wire_format(&new_rows);
581 if let Err(e) = self
582 .send_subscription_data(
583 &subscription_id,
584 SubscriptionUpdateType::Full,
585 wire_rows,
586 )
587 .await
588 {
589 warn!("Failed to send cross-connection full update: {}", e);
590 }
591 }
592
593 self.subscription_manager.update_result_by_wire_id(
595 &subscription_id,
596 new_hash,
597 new_rows,
598 );
599 }
600 Ok(_) => {
601 warn!("Subscription query returned non-SELECT result");
603 }
604 Err(e) => {
605 if let Err(send_err) = self
607 .send_subscription_error(&subscription_id, &format!("Query error: {}", e))
608 .await
609 {
610 warn!("Failed to send subscription error: {}", send_err);
611 }
612 }
613 }
614 }
615 }
616 }
617
618 async fn send_delta_updates(
624 &mut self,
625 subscription_id: &[u8; 16],
626 delta: &SubscriptionUpdate,
627 ) -> Result<()> {
628 if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta {
629 if !deletes.is_empty() {
631 let wire_rows = Self::rows_to_wire_format(deletes);
632 self.send_subscription_data(
633 subscription_id,
634 SubscriptionUpdateType::DeltaDelete,
635 wire_rows,
636 )
637 .await?;
638 }
639
640 if !updates.is_empty() {
642 let config = self.subscription_manager.get_effective_selective_config_by_wire_id(
645 subscription_id,
646 &self.config.subscriptions.selective_updates,
647 );
648 let pk_columns = config.pk_columns.clone();
649
650 let mut partial_updates: Vec<PartialRowUpdate> = Vec::new();
652 let mut full_updates: Vec<Vec<Option<Vec<u8>>>> = Vec::new();
653
654 for (old_row, new_row) in updates {
655 let old_wire: Vec<Option<Vec<u8>>> = old_row
657 .values
658 .iter()
659 .map(|v| Some(v.to_string().as_bytes().to_vec()))
660 .collect();
661 let new_wire: Vec<Option<Vec<u8>>> = new_row
662 .values
663 .iter()
664 .map(|v| Some(v.to_string().as_bytes().to_vec()))
665 .collect();
666
667 if let Some(partial) =
669 create_partial_row_update(&old_wire, &new_wire, &pk_columns, &config)
670 {
671 partial_updates.push(partial);
672 } else {
673 full_updates.push(new_wire);
675 }
676 }
677
678 if !partial_updates.is_empty() {
680 self.send_subscription_partial_data(subscription_id, partial_updates).await?;
681 }
682
683 if !full_updates.is_empty() {
685 self.send_subscription_data(
686 subscription_id,
687 SubscriptionUpdateType::DeltaUpdate,
688 full_updates,
689 )
690 .await?;
691 }
692 }
693
694 if !inserts.is_empty() {
696 let wire_rows = Self::rows_to_wire_format(inserts);
697 self.send_subscription_data(
698 subscription_id,
699 SubscriptionUpdateType::DeltaInsert,
700 wire_rows,
701 )
702 .await?;
703 }
704 }
705 Ok(())
706 }
707
708 fn rows_to_wire_format(rows: &[Row]) -> Vec<Vec<Option<Vec<u8>>>> {
710 rows.iter()
711 .map(|row| {
712 row.values.iter().map(|v| Some(v.to_string().as_bytes().to_vec())).collect()
713 })
714 .collect()
715 }
716
717 async fn try_send_selective_updates(
728 &mut self,
729 subscription_id: &[u8; 16],
730 old_rows: &[Row],
731 new_rows: &[Row],
732 ) -> bool {
733 let selective_config = self.subscription_manager.get_effective_selective_config_by_wire_id(
735 subscription_id,
736 &self.config.subscriptions.selective_updates,
737 );
738
739 if !selective_config.enabled {
741 debug!(
742 "Selective update skipped for subscription {:?}: disabled in config",
743 subscription_id
744 );
745 if let Some(metrics) = self.observability.metrics() {
746 metrics.record_partial_update_fallback("disabled");
747 metrics.record_selective_update_decision("sent_full", Some("disabled"));
748 }
749 return false;
750 }
751
752 if old_rows.len() != new_rows.len() {
754 debug!(
755 "Selective update skipped for subscription {:?}: row count mismatch (old={}, new={})",
756 subscription_id,
757 old_rows.len(),
758 new_rows.len()
759 );
760 if let Some(metrics) = self.observability.metrics() {
761 metrics.record_partial_update_fallback("row_count_mismatch");
762 metrics.record_selective_update_decision("sent_full", Some("row_count_mismatch"));
763 }
764 return false;
765 }
766
767 if old_rows.is_empty() {
768 return false;
769 }
770
771 let pk_columns = &selective_config.pk_columns;
772
773 let old_wire: Vec<Vec<Option<Vec<u8>>>> = Self::rows_to_wire_format(old_rows);
775 let new_wire: Vec<Vec<Option<Vec<u8>>>> = Self::rows_to_wire_format(new_rows);
776
777 let mut pk_to_old_idx: HashMap<Vec<Option<Vec<u8>>>, usize> = HashMap::new();
779 for (idx, row) in old_wire.iter().enumerate() {
780 let pk_values: Vec<Option<Vec<u8>>> =
781 pk_columns.iter().filter_map(|&col| row.get(col).cloned()).collect();
782 pk_to_old_idx.insert(pk_values, idx);
783 }
784
785 let mut partial_updates = Vec::new();
787 let mut threshold_exceeded_count = 0u64;
788 for new_row in &new_wire {
789 let pk_values: Vec<Option<Vec<u8>>> =
791 pk_columns.iter().filter_map(|&col| new_row.get(col).cloned()).collect();
792
793 if let Some(&old_idx) = pk_to_old_idx.get(&pk_values) {
795 let old_row = &old_wire[old_idx];
796
797 if let Some(partial) =
799 create_partial_row_update(old_row, new_row, pk_columns, &selective_config)
800 {
801 let changed_count = old_row
803 .iter()
804 .zip(new_row.iter())
805 .filter(|(o, n)| o != n)
806 .count();
807 if let Some(metrics) = self.observability.metrics() {
808 metrics.record_selective_update_column_ratio(changed_count, new_row.len());
809 }
810 partial_updates.push(partial);
811 } else {
812 let changed_count = old_row
814 .iter()
815 .zip(new_row.iter())
816 .filter(|(o, n)| o != n)
817 .count();
818 if changed_count > 0 {
819 let ratio = changed_count as f64 / new_row.len() as f64;
820 if let Some(metrics) = self.observability.metrics() {
822 metrics.record_selective_update_column_ratio(changed_count, new_row.len());
823 }
824 if ratio > selective_config.max_changed_columns_ratio {
825 threshold_exceeded_count += 1;
826 }
827 }
828 continue;
829 }
830 } else {
831 debug!(
834 "Selective update skipped for subscription {:?}: cannot match row by PK (pk_columns={:?})",
835 subscription_id,
836 pk_columns
837 );
838 if let Some(metrics) = self.observability.metrics() {
839 metrics.record_partial_update_fallback("pk_mismatch");
840 metrics.record_selective_update_decision("sent_full", Some("pk_mismatch"));
841 }
842 return false;
843 }
844 }
845
846 if threshold_exceeded_count > 0 {
848 debug!(
849 "Selective update: {} rows exceeded change threshold for subscription {:?}",
850 threshold_exceeded_count,
851 subscription_id
852 );
853 if let Some(metrics) = self.observability.metrics() {
854 for _ in 0..threshold_exceeded_count {
855 metrics.record_partial_update_fallback("threshold_exceeded");
856 metrics.record_selective_update_decision("sent_full", Some("threshold_exceeded"));
857 }
858 }
859 }
860
861 if partial_updates.is_empty() {
863 debug!(
864 "Selective update skipped for subscription {:?}: no column changes detected",
865 subscription_id
866 );
867 if let Some(metrics) = self.observability.metrics() {
868 metrics.record_partial_update_fallback("no_changes");
869 metrics.record_selective_update_decision("skipped", Some("no_changes"));
870 }
871 return false;
872 }
873
874 if let Some(metrics) = self.observability.metrics() {
876 let total_columns = if !new_wire.is_empty() { new_wire[0].len() as u64 } else { 0 };
877 let mut total_columns_sent: u64 = 0;
878 let mut total_bytes_full: u64 = 0;
879 let mut total_bytes_partial: u64 = 0;
880
881 for (partial, new_row) in partial_updates.iter().zip(new_wire.iter()) {
882 total_columns_sent += partial.present_column_count() as u64;
884
885 let full_row_bytes: u64 = new_row
887 .iter()
888 .map(|v| v.as_ref().map(|b| b.len() as u64).unwrap_or(0) + 4) .sum();
890 let partial_bytes: u64 = partial
891 .values
892 .iter()
893 .map(|v| v.as_ref().map(|b| b.len() as u64).unwrap_or(0) + 4)
894 .sum::<u64>()
895 + partial.column_mask.len() as u64
896 + 2; total_bytes_full += full_row_bytes;
899 total_bytes_partial += partial_bytes;
900 }
901
902 let total_possible = total_columns * partial_updates.len() as u64;
904 metrics.record_selective_update_columns(total_columns_sent, total_possible);
905
906 if total_bytes_full > total_bytes_partial {
908 metrics.record_partial_update_bytes_saved(total_bytes_full - total_bytes_partial);
909 }
910
911 for _ in 0..partial_updates.len() {
913 metrics.record_selective_update_decision("sent_partial", None);
914 }
915 }
916
917 if let Err(e) = self.send_subscription_partial_data(subscription_id, partial_updates).await
919 {
920 warn!("Failed to send selective updates: {}", e);
921 return false;
922 }
923
924 if let Some(metrics) = self.observability.metrics() {
926 metrics.record_partial_update_sent();
927 }
928
929 debug!(
930 "Sent selective column update (0xF7) for subscription {:?}",
931 subscription_id
932 );
933 true
934 }
935
936 async fn execute_query(&mut self, query: &str) -> Result<()> {
938 let session = self.session.as_mut().ok_or_else(|| anyhow::anyhow!("No session"))?;
939
940 if query.trim().is_empty() {
942 self.send_empty_query_response().await?;
943 let txn_status = self.get_transaction_status();
944 self.send_ready_for_query(txn_status).await?;
945 return Ok(());
946 }
947
948 let query_start = Instant::now();
950
951 match session.execute(query).await {
953 Ok(result) => {
954 let query_duration = query_start.elapsed();
955 let stmt_type = result.statement_type();
956 let rows_affected = result.rows_affected();
957
958 if let Some(metrics) = self.observability.metrics() {
960 metrics.record_query(query_duration, stmt_type, true, rows_affected);
961 }
962
963 let is_mutation = matches!(
965 &result,
966 ExecutionResult::Insert { .. }
967 | ExecutionResult::Update { .. }
968 | ExecutionResult::Delete { .. }
969 );
970
971 self.send_query_result(result).await?;
972
973 if is_mutation {
975 self.notify_affected_subscriptions(query).await;
977
978 self.broadcast_mutation(query);
980 }
981
982 let txn_status = self.get_transaction_status();
984 self.send_ready_for_query(txn_status).await?;
985 Ok(())
986 }
987
988 Err(e) => {
989 error!("Query error: {}", e);
990
991 if let Some(metrics) = self.observability.metrics() {
993 metrics.record_query_error("execution_error", None);
994 }
995
996 self.send_error_response(&format!("{}", e)).await?;
997
998 let txn_status = if self.session.as_ref().is_some_and(|s| s.in_transaction()) {
1000 TransactionStatus::FailedTransaction
1001 } else {
1002 TransactionStatus::Idle
1003 };
1004 self.send_ready_for_query(txn_status).await?;
1005 Ok(())
1006 }
1007 }
1008 }
1009
1010 fn get_transaction_status(&self) -> TransactionStatus {
1012 if self.session.as_ref().is_some_and(|s| s.in_transaction()) {
1013 TransactionStatus::InTransaction
1014 } else {
1015 TransactionStatus::Idle
1016 }
1017 }
1018
1019 async fn handle_subscribe(
1030 &mut self,
1031 query: &str,
1032 _params: Vec<Option<Vec<u8>>>,
1033 filter: Option<String>,
1034 selective_updates_config: Option<SelectiveUpdatesConfig>,
1035 ) -> Result<()> {
1036 let session = self.session.as_mut().ok_or_else(|| anyhow::anyhow!("No session"))?;
1037
1038 let parsed = match vibesql_parser::Parser::parse_sql(query) {
1040 Ok(stmt) => stmt,
1041 Err(e) => {
1042 let error_id = [0u8; 16];
1044 self.send_subscription_error(&error_id, &format!("Parse error: {}", e)).await?;
1045 return Ok(());
1046 }
1047 };
1048
1049 if let Some(ref filter_str) = filter {
1051 if let Err(e) = vibesql_parser::arena_parser::parse_expression_to_owned(filter_str) {
1052 let error_id = [0u8; 16];
1053 self.send_subscription_error(&error_id, &format!("Filter parse error: {}", e))
1054 .await?;
1055 return Ok(());
1056 }
1057 }
1058
1059 let table_dependencies = table_extractor::extract_tables_from_statement(&parsed);
1061
1062 let pk_detection = {
1065 let db = session.shared_database().read().await;
1066 detect_pk_columns_from_stmt(&parsed, &db)
1067 };
1068 if pk_detection.confident {
1069 debug!(
1070 "PK detection confident for subscription: pk_columns={:?}, tables={:?}",
1071 pk_detection.pk_column_indices,
1072 pk_detection.tables
1073 );
1074 } else {
1075 debug!(
1076 "PK detection not confident for subscription: reason={}, pk_columns={:?}, tables={:?}, query={}",
1077 pk_detection.reason.map(|r| r.to_string()).unwrap_or_else(|| "unknown".to_string()),
1078 pk_detection.pk_column_indices,
1079 pk_detection.tables,
1080 query
1081 );
1082 }
1083
1084 if let Some(metrics) = self.observability.metrics() {
1086 if pk_detection.confident {
1087 metrics.record_pk_detection("confident", None);
1088 } else {
1089 let reason = if pk_detection.tables.is_empty() {
1091 "no_table"
1092 } else if pk_detection.tables.len() > 1 {
1093 "join_query"
1094 } else if pk_detection.pk_column_indices == vec![0] {
1095 "pk_not_in_result"
1097 } else {
1098 "unknown"
1099 };
1100 metrics.record_pk_detection("not_confident", Some(reason));
1101 }
1102 }
1103
1104 let wire_subscription_id = *uuid::Uuid::new_v4().as_bytes();
1106
1107 let (notify_tx, _notify_rx) = tokio::sync::mpsc::channel(1);
1110
1111 if let Err(e) = self.subscription_manager.subscribe_for_connection(
1113 query.to_string(),
1114 notify_tx,
1115 self.connection_id.clone(),
1116 wire_subscription_id,
1117 table_dependencies.clone(),
1118 filter.clone(),
1119 ) {
1120 let error_id = [0u8; 16];
1122 self.send_subscription_error(&error_id, &format!("{}", e)).await?;
1123 return Ok(());
1124 }
1125
1126 if let Some(metrics) = self.observability.metrics() {
1128 metrics.increment_subscriptions_active();
1129 }
1130
1131 let newly_eligible = self.subscription_manager.update_pk_columns_with_eligibility_by_wire_id(
1134 &wire_subscription_id,
1135 pk_detection.pk_column_indices.clone(),
1136 pk_detection.confident,
1137 );
1138 if newly_eligible {
1139 if let Some(metrics) = self.observability.metrics() {
1140 metrics.increment_selective_eligible();
1141 }
1142 }
1143
1144 if let Some(wire_config) = selective_updates_config {
1146 let server_config = &self.config.subscriptions.selective_updates;
1149
1150 let override_config = SelectiveColumnConfig {
1151 enabled: wire_config.enabled.unwrap_or(server_config.enabled),
1152 pk_columns: pk_detection.pk_column_indices.clone(), min_changed_columns: wire_config
1154 .min_changed_columns
1155 .unwrap_or(server_config.min_changed_columns),
1156 max_changed_columns_ratio: wire_config
1157 .max_changed_columns_ratio
1158 .unwrap_or(server_config.max_changed_columns_ratio),
1159 };
1160
1161 self.subscription_manager.set_selective_updates_override_by_wire_id(
1162 &wire_subscription_id,
1163 override_config,
1164 );
1165 }
1166
1167 match session.execute(query).await {
1169 Ok(ExecutionResult::Select { rows, columns }) => {
1170 let filter_opt = filter.as_ref().and_then(|f| {
1172 let col_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
1173 SubscriptionFilter::new(f, &col_names).ok()
1174 });
1175
1176 let result_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
1178 rows.iter()
1179 .filter(|row| flt.matches(&row.values))
1180 .map(|r| Row { values: r.values.clone() })
1181 .collect()
1182 } else {
1183 rows.iter().map(|r| Row { values: r.values.clone() }).collect()
1184 };
1185
1186 let result_hash = hash_rows(&result_rows);
1188 self.subscription_manager.update_result_by_wire_id(
1189 &wire_subscription_id,
1190 result_hash,
1191 result_rows.clone(),
1192 );
1193
1194 let wire_rows: Vec<Vec<Option<Vec<u8>>>> = result_rows
1196 .iter()
1197 .map(|row| {
1198 row.values.iter().map(|v| Some(v.to_string().as_bytes().to_vec())).collect()
1199 })
1200 .collect();
1201
1202 self.send_subscription_data(
1204 &wire_subscription_id,
1205 SubscriptionUpdateType::Full,
1206 wire_rows,
1207 )
1208 .await?;
1209 }
1210 Ok(_) => {
1211 let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&wire_subscription_id);
1213 if was_selective_eligible {
1214 if let Some(metrics) = self.observability.metrics() {
1215 metrics.decrement_selective_eligible();
1216 }
1217 }
1218 self.send_subscription_error(
1219 &wire_subscription_id,
1220 "Only SELECT queries can be subscribed to",
1221 )
1222 .await?;
1223 }
1224 Err(e) => {
1225 let was_selective_eligible = self.subscription_manager.unsubscribe_by_wire_id(&wire_subscription_id);
1227 if was_selective_eligible {
1228 if let Some(metrics) = self.observability.metrics() {
1229 metrics.decrement_selective_eligible();
1230 }
1231 }
1232 self.send_subscription_error(&wire_subscription_id, &format!("Execution error: {}", e))
1233 .await?;
1234 }
1235 }
1236
1237 Ok(())
1238 }
1239
1240 #[allow(clippy::type_complexity)]
1248 async fn notify_affected_subscriptions(&mut self, mutation_query: &str) {
1249 let affected_tables = match vibesql_parser::Parser::parse_sql(mutation_query) {
1251 Ok(stmt) => extract_table_refs(&stmt),
1252 Err(e) => {
1253 debug!("Failed to parse mutation query for subscription update: {}", e);
1254 return;
1255 }
1256 };
1257
1258 if affected_tables.is_empty() {
1259 return;
1260 }
1261
1262 let subscriptions_to_update: Vec<([u8; 16], String, u64, Option<Vec<Row>>, Option<String>)> =
1264 affected_tables
1265 .iter()
1266 .flat_map(|table| {
1267 self.subscription_manager
1268 .get_affected_subscriptions_for_connection(table, &self.connection_id)
1269 })
1270 .collect();
1271
1272 if subscriptions_to_update.is_empty() {
1273 return;
1274 }
1275
1276 let mut seen = std::collections::HashSet::new();
1278 let unique_subscriptions: Vec<_> = subscriptions_to_update
1279 .into_iter()
1280 .filter(|(id, _, _, _, _)| seen.insert(*id))
1281 .collect();
1282
1283 debug!(
1284 "Notifying {} subscriptions after mutation affecting tables: {:?}",
1285 unique_subscriptions.len(),
1286 affected_tables
1287 );
1288
1289 for (subscription_id, query, last_hash, last_result, filter) in unique_subscriptions {
1291 if let Some(session) = &mut self.session {
1292 match session.execute(&query).await {
1293 Ok(ExecutionResult::Select { rows, columns }) => {
1294 let filter_opt = filter.as_ref().and_then(|f| {
1296 let col_names: Vec<String> =
1297 columns.iter().map(|c| c.name.clone()).collect();
1298 SubscriptionFilter::new(f, &col_names).ok()
1299 });
1300
1301 let new_rows: Vec<Row> = if let Some(ref flt) = filter_opt {
1303 rows.iter()
1304 .filter(|row| flt.matches(&row.values))
1305 .map(|r| Row { values: r.values.clone() })
1306 .collect()
1307 } else {
1308 rows.iter().map(|r| Row { values: r.values.clone() }).collect()
1309 };
1310
1311 let new_hash = hash_rows(&new_rows);
1313
1314 if new_hash == last_hash {
1316 debug!(
1317 "Same-connection update: results unchanged for subscription {:?}",
1318 subscription_id
1319 );
1320 continue;
1321 }
1322
1323 if let Some(ref old_rows) = last_result {
1325 if self
1327 .try_send_selective_updates(
1328 &subscription_id,
1329 old_rows,
1330 &new_rows,
1331 )
1332 .await
1333 {
1334 self.subscription_manager.update_result_by_wire_id(
1336 &subscription_id,
1337 new_hash,
1338 new_rows,
1339 );
1340 continue;
1341 }
1342
1343 let pk_columns = self
1345 .subscription_manager
1346 .get_pk_columns_by_wire_id(&subscription_id);
1347 if let Some(delta) = compute_delta_with_pk(
1348 SubscriptionId::default(),
1349 old_rows,
1350 &new_rows,
1351 &pk_columns,
1352 ) {
1353 if let Err(e) =
1355 self.send_delta_updates(&subscription_id, &delta).await
1356 {
1357 warn!("Failed to send same-connection delta update: {}", e);
1358 }
1359
1360 if let SubscriptionUpdate::Delta {
1362 ref inserts,
1363 ref updates,
1364 ref deletes,
1365 ..
1366 } = delta
1367 {
1368 debug!(
1369 "Same-connection delta update sent: {} inserts, {} updates, {} deletes for subscription {:?}",
1370 inserts.len(),
1371 updates.len(),
1372 deletes.len(),
1373 subscription_id
1374 );
1375 }
1376 } else {
1377 let wire_rows = Self::rows_to_wire_format(&new_rows);
1379 if let Err(e) = self
1380 .send_subscription_data(
1381 &subscription_id,
1382 SubscriptionUpdateType::Full,
1383 wire_rows,
1384 )
1385 .await
1386 {
1387 warn!("Failed to send same-connection full update: {}", e);
1388 }
1389 }
1390 } else {
1391 let wire_rows = Self::rows_to_wire_format(&new_rows);
1393 if let Err(e) = self
1394 .send_subscription_data(
1395 &subscription_id,
1396 SubscriptionUpdateType::Full,
1397 wire_rows,
1398 )
1399 .await
1400 {
1401 warn!("Failed to send same-connection full update: {}", e);
1402 }
1403 }
1404
1405 self.subscription_manager.update_result_by_wire_id(
1407 &subscription_id,
1408 new_hash,
1409 new_rows,
1410 );
1411 }
1412 Ok(_) => {
1413 warn!("Subscription query returned non-SELECT result");
1415 }
1416 Err(e) => {
1417 if let Err(send_err) = self
1419 .send_subscription_error(&subscription_id, &format!("Query error: {}", e))
1420 .await
1421 {
1422 warn!("Failed to send subscription error: {}", send_err);
1423 }
1424 }
1425 }
1426 }
1427 }
1428 }
1429
1430 fn broadcast_mutation(&self, mutation_query: &str) {
1435 let affected_tables = match vibesql_parser::Parser::parse_sql(mutation_query) {
1437 Ok(stmt) => extract_table_refs(&stmt),
1438 Err(e) => {
1439 debug!("Failed to parse mutation query for broadcast: {}", e);
1440 return;
1441 }
1442 };
1443
1444 if affected_tables.is_empty() {
1445 return;
1446 }
1447
1448 debug!("Broadcasting mutation affecting tables: {:?}", affected_tables);
1449
1450 let notification = TableMutationNotification { affected_tables };
1454 if let Err(e) = self.mutation_broadcast_tx.send(notification) {
1455 debug!("Failed to broadcast mutation notification: {}", e);
1457 }
1458 }
1459
1460 async fn send_query_result(&mut self, result: ExecutionResult) -> Result<()> {
1462 match result {
1463 ExecutionResult::Select { rows, columns } => {
1464 let fields: Vec<FieldDescription> = columns
1466 .iter()
1467 .enumerate()
1468 .map(|(i, col)| FieldDescription {
1469 name: col.name.clone(),
1470 table_oid: 0,
1471 column_attr_number: i as i16,
1472 data_type_oid: 25, data_type_size: -1, type_modifier: -1,
1475 format_code: 0, })
1477 .collect();
1478
1479 self.send_row_description(fields).await?;
1480
1481 let row_count = rows.len();
1483
1484 for row in rows {
1486 let values: Vec<Option<Vec<u8>>> = row
1487 .values
1488 .iter()
1489 .map(|v: &vibesql_types::SqlValue| Some(v.to_string().as_bytes().to_vec()))
1490 .collect();
1491
1492 self.send_data_row(values).await?;
1493 }
1494
1495 self.send_command_complete(&format!("SELECT {}", row_count)).await?;
1497 }
1498
1499 ExecutionResult::Insert { rows_affected } => {
1500 self.send_command_complete(&format!("INSERT 0 {}", rows_affected)).await?;
1501 }
1502
1503 ExecutionResult::Update { rows_affected } => {
1504 self.send_command_complete(&format!("UPDATE {}", rows_affected)).await?;
1505 }
1506
1507 ExecutionResult::Delete { rows_affected } => {
1508 self.send_command_complete(&format!("DELETE {}", rows_affected)).await?;
1509 }
1510
1511 ExecutionResult::CreateTable
1512 | ExecutionResult::CreateIndex
1513 | ExecutionResult::CreateView => {
1514 self.send_command_complete("CREATE TABLE").await?;
1515 }
1516
1517 ExecutionResult::DropTable | ExecutionResult::DropIndex | ExecutionResult::DropView => {
1518 self.send_command_complete("DROP TABLE").await?;
1519 }
1520
1521 ExecutionResult::Analyze { tables_analyzed } => {
1522 self.send_command_complete(&format!("ANALYZE {}", tables_analyzed)).await?;
1523 }
1524
1525 ExecutionResult::Other { message } => {
1526 self.send_command_complete(&message).await?;
1527 }
1528
1529 ExecutionResult::Prepare { statement_name } => {
1530 self.send_command_complete(&format!("PREPARE {}", statement_name)).await?;
1531 }
1532
1533 ExecutionResult::Deallocate { statement_name } => {
1534 self.send_command_complete(&format!("DEALLOCATE {}", statement_name)).await?;
1535 }
1536
1537 ExecutionResult::DeclareCursor { cursor_name } => {
1538 self.send_command_complete(&format!("DECLARE CURSOR {}", cursor_name)).await?;
1539 }
1540
1541 ExecutionResult::OpenCursor { cursor_name } => {
1542 self.send_command_complete(&format!("OPEN {}", cursor_name)).await?;
1543 }
1544
1545 ExecutionResult::Fetch { rows, columns } => {
1546 let fields: Vec<FieldDescription> = columns
1548 .iter()
1549 .enumerate()
1550 .map(|(i, col)| FieldDescription {
1551 name: col.name.clone(),
1552 table_oid: 0,
1553 column_attr_number: i as i16,
1554 data_type_oid: 25, data_type_size: -1, type_modifier: -1,
1557 format_code: 0, })
1559 .collect();
1560
1561 self.send_row_description(fields).await?;
1562
1563 let row_count = rows.len();
1565
1566 for row in rows {
1568 let values: Vec<Option<Vec<u8>>> = row
1569 .values
1570 .iter()
1571 .map(|v: &vibesql_types::SqlValue| Some(v.to_string().as_bytes().to_vec()))
1572 .collect();
1573
1574 self.send_data_row(values).await?;
1575 }
1576
1577 self.send_command_complete(&format!("FETCH {}", row_count)).await?;
1579 }
1580
1581 ExecutionResult::CloseCursor { cursor_name } => {
1582 self.send_command_complete(&format!("CLOSE {}", cursor_name)).await?;
1583 }
1584
1585 ExecutionResult::Begin => {
1586 self.send_command_complete("BEGIN").await?;
1587 }
1588
1589 ExecutionResult::Commit => {
1590 self.send_command_complete("COMMIT").await?;
1591 }
1592
1593 ExecutionResult::Rollback => {
1594 self.send_command_complete("ROLLBACK").await?;
1595 }
1596 }
1597
1598 Ok(())
1599 }
1600
1601 async fn send_authentication_ok(&mut self) -> Result<()> {
1604 BackendMessage::AuthenticationOk.encode(&mut self.write_buf);
1605 self.flush_write_buffer().await
1606 }
1607
1608 async fn send_cleartext_password_request(&mut self) -> Result<()> {
1609 BackendMessage::AuthenticationCleartextPassword.encode(&mut self.write_buf);
1610 self.flush_write_buffer().await
1611 }
1612
1613 async fn send_md5_password_request(&mut self, salt: &[u8; 4]) -> Result<()> {
1614 BackendMessage::AuthenticationMD5Password { salt: *salt }.encode(&mut self.write_buf);
1615 self.flush_write_buffer().await
1616 }
1617
1618 async fn send_parameter_status(&mut self, name: &str, value: &str) -> Result<()> {
1619 BackendMessage::ParameterStatus { name: name.to_string(), value: value.to_string() }
1620 .encode(&mut self.write_buf);
1621 self.flush_write_buffer().await
1622 }
1623
1624 async fn send_backend_key_data(&mut self) -> Result<()> {
1625 BackendMessage::BackendKeyData {
1626 process_id: std::process::id() as i32,
1627 secret_key: 12345, }
1629 .encode(&mut self.write_buf);
1630 self.flush_write_buffer().await
1631 }
1632
1633 async fn send_ready_for_query(&mut self, status: TransactionStatus) -> Result<()> {
1634 BackendMessage::ReadyForQuery { status }.encode(&mut self.write_buf);
1635 self.flush_write_buffer().await
1636 }
1637
1638 async fn send_row_description(&mut self, fields: Vec<FieldDescription>) -> Result<()> {
1639 BackendMessage::RowDescription { fields }.encode(&mut self.write_buf);
1640 self.flush_write_buffer().await
1641 }
1642
1643 async fn send_data_row(&mut self, values: Vec<Option<Vec<u8>>>) -> Result<()> {
1644 BackendMessage::DataRow { values }.encode(&mut self.write_buf);
1645 self.flush_write_buffer().await
1646 }
1647
1648 async fn send_command_complete(&mut self, tag: &str) -> Result<()> {
1649 BackendMessage::CommandComplete { tag: tag.to_string() }.encode(&mut self.write_buf);
1650 self.flush_write_buffer().await
1651 }
1652
1653 async fn send_error_response(&mut self, message: &str) -> Result<()> {
1654 let mut fields = HashMap::new();
1655 fields.insert(b'S', "ERROR".to_string());
1656 fields.insert(b'C', "XX000".to_string()); fields.insert(b'M', message.to_string());
1658
1659 BackendMessage::ErrorResponse { fields }.encode(&mut self.write_buf);
1660 self.flush_write_buffer().await
1661 }
1662
1663 async fn send_empty_query_response(&mut self) -> Result<()> {
1664 BackendMessage::EmptyQueryResponse.encode(&mut self.write_buf);
1665 self.flush_write_buffer().await
1666 }
1667
1668 async fn send_subscription_data(
1670 &mut self,
1671 subscription_id: &[u8; 16],
1672 update_type: SubscriptionUpdateType,
1673 rows: Vec<Vec<Option<Vec<u8>>>>,
1674 ) -> Result<()> {
1675 if let Some(metrics) = self.observability.metrics() {
1677 let type_str = match update_type {
1678 SubscriptionUpdateType::Full => "full",
1679 SubscriptionUpdateType::DeltaInsert => "delta_insert",
1680 SubscriptionUpdateType::DeltaUpdate => "delta_update",
1681 SubscriptionUpdateType::DeltaDelete => "delta_delete",
1682 SubscriptionUpdateType::SelectiveUpdate => "selective",
1683 };
1684 metrics.record_subscription_update(type_str, rows.len() as u64);
1685
1686 if matches!(update_type, SubscriptionUpdateType::Full) {
1688 metrics.record_full_update_sent();
1689 }
1690 }
1691
1692 BackendMessage::SubscriptionData { subscription_id: *subscription_id, update_type, rows }
1693 .encode(&mut self.write_buf);
1694 self.flush_write_buffer().await
1695 }
1696
1697 async fn send_subscription_partial_data(
1702 &mut self,
1703 subscription_id: &[u8; 16],
1704 rows: Vec<PartialRowUpdate>,
1705 ) -> Result<()> {
1706 if let Some(metrics) = self.observability.metrics() {
1708 metrics.record_subscription_update("selective", rows.len() as u64);
1709 }
1710
1711 BackendMessage::SubscriptionPartialData { subscription_id: *subscription_id, rows }
1712 .encode(&mut self.write_buf);
1713 self.flush_write_buffer().await
1714 }
1715
1716 async fn send_subscription_error(
1718 &mut self,
1719 subscription_id: &[u8; 16],
1720 message: &str,
1721 ) -> Result<()> {
1722 BackendMessage::SubscriptionError {
1723 subscription_id: *subscription_id,
1724 message: message.to_string(),
1725 }
1726 .encode(&mut self.write_buf);
1727 self.flush_write_buffer().await
1728 }
1729
1730 async fn read_message(&mut self) -> Result<()> {
1733 let n = self.read_half.read_buf(&mut self.read_buf).await?;
1734 if n == 0 {
1735 return Err(anyhow::anyhow!("Connection closed"));
1736 }
1737 Ok(())
1738 }
1739
1740 async fn flush_write_buffer(&mut self) -> Result<()> {
1741 self.write_half.write_all(&self.write_buf).await?;
1742 self.write_half.flush().await?;
1743 self.write_buf.clear();
1744 Ok(())
1745 }
1746}
1747
1748impl Drop for ConnectionHandler {
1749 fn drop(&mut self) {
1750 self.active_connections.fetch_sub(1, Ordering::AcqRel);
1752
1753 if let Some(metrics) = self.observability.metrics() {
1755 metrics.record_connection_duration(self.connection_start.elapsed());
1756 }
1757 }
1758}