1pub mod auth_state;
168pub mod backend;
169pub(crate) mod common;
170pub mod connection;
171pub mod error_classification;
172pub mod handlers;
173pub mod metrics_ext;
174pub mod mode_state;
175pub mod precheck;
176pub(crate) mod routing;
177pub mod state;
178pub mod streaming;
179
180use std::sync::Arc;
181
182use crate::auth::AuthHandler;
183use crate::config::RoutingMode;
184use crate::metrics::MetricsCollector;
185use crate::pool::BufferPool;
186use crate::router::BackendSelector;
187use crate::types::{ClientAddress, ClientId};
188
189pub use auth_state::AuthState;
190pub use metrics_ext::MetricsRecorder;
191pub use mode_state::{ModeState, SessionMode};
192pub use state::SessionLoopState;
193
194pub struct ClientSession {
198 client_addr: ClientAddress,
199 buffer_pool: BufferPool,
200 client_id: ClientId,
202 router: Option<Arc<BackendSelector>>,
204 mode_state: ModeState,
206 auth_handler: Arc<AuthHandler>,
208 auth_state: AuthState,
210 metrics: crate::metrics::MetricsCollector,
212
213 connection_stats: Option<crate::metrics::ConnectionStatsAggregator>,
215
216 cache: Arc<crate::cache::UnifiedCache>,
218
219 cache_articles: bool,
221
222 adaptive_precheck: bool,
224}
225
/// Step-by-step constructor for [`ClientSession`].
///
/// Obtained from [`ClientSession::builder`]; each `with_*` method replaces one
/// field and [`ClientSessionBuilder::build`] produces the session.
pub struct ClientSessionBuilder {
    /// Remote address of the client (required, set by `ClientSession::builder`).
    client_addr: ClientAddress,
    /// Buffer pool for the session (required, set by `ClientSession::builder`).
    buffer_pool: BufferPool,
    /// Optional backend selector; `None` until `with_router` is called.
    router: Option<Arc<BackendSelector>>,
    /// Requested routing mode; only honoured by `build` when a router is present.
    routing_mode: RoutingMode,
    /// Authentication handler; may be replaced with `with_auth_handler`.
    auth_handler: Arc<AuthHandler>,
    /// Metrics collector (required, set by `ClientSession::builder`).
    metrics: MetricsCollector,
    /// Optional connection statistics aggregator; `None` until `with_connection_stats` is called.
    connection_stats: Option<crate::metrics::ConnectionStatsAggregator>,
    /// Cache handle; may be replaced with `with_cache`.
    cache: Arc<crate::cache::UnifiedCache>,
    /// Article-caching flag forwarded unchanged to the session.
    cache_articles: bool,
    /// Adaptive-precheck flag forwarded unchanged to the session.
    adaptive_precheck: bool,
}
271
272impl ClientSessionBuilder {
273 #[must_use]
278 pub fn with_router(mut self, router: Arc<BackendSelector>) -> Self {
279 self.router = Some(router);
280 self
281 }
282
283 #[must_use]
290 pub fn with_routing_mode(mut self, mode: RoutingMode) -> Self {
291 self.routing_mode = mode;
292 self
293 }
294
295 #[must_use]
297 pub fn with_auth_handler(mut self, auth_handler: Arc<AuthHandler>) -> Self {
298 self.auth_handler = auth_handler;
299 self
300 }
301
302 #[must_use]
304 pub fn with_connection_stats(
305 mut self,
306 connection_stats: crate::metrics::ConnectionStatsAggregator,
307 ) -> Self {
308 self.connection_stats = Some(connection_stats);
309 self
310 }
311
312 #[must_use]
314 pub fn with_cache(mut self, cache: Arc<crate::cache::UnifiedCache>) -> Self {
315 self.cache = cache;
316 self
317 }
318
319 #[must_use]
324 pub fn with_cache_articles(mut self, cache: bool) -> Self {
325 self.cache_articles = cache;
326 self
327 }
328
329 pub fn with_adaptive_precheck(mut self, enable: bool) -> Self {
331 self.adaptive_precheck = enable;
332 self
333 }
334
335 #[must_use]
340 pub fn build(self) -> ClientSession {
341 let (mode, routing_mode) = match (&self.router, self.routing_mode) {
342 (Some(_), RoutingMode::PerCommand | RoutingMode::Hybrid) => {
344 (SessionMode::PerCommand, self.routing_mode)
345 }
346 (Some(_), RoutingMode::Stateful) => (SessionMode::Stateful, RoutingMode::Stateful),
348 (None, _) => (SessionMode::Stateful, RoutingMode::Stateful),
350 };
351
352 ClientSession {
353 client_addr: self.client_addr,
354 buffer_pool: self.buffer_pool,
355 client_id: ClientId::new(),
356 router: self.router,
357 mode_state: ModeState::new(mode, routing_mode),
358 auth_handler: self.auth_handler,
359 auth_state: AuthState::new(),
360 metrics: self.metrics,
361 connection_stats: self.connection_stats,
362 cache: self.cache,
363 cache_articles: self.cache_articles,
364 adaptive_precheck: self.adaptive_precheck,
365 }
366 }
367}
368
369impl ClientSession {
370 fn default_cache() -> Arc<crate::cache::UnifiedCache> {
372 const DEFAULT_TTL: std::time::Duration = std::time::Duration::from_secs(3600);
373 Arc::new(crate::cache::UnifiedCache::memory(0, DEFAULT_TTL, false))
374 }
375
376 #[must_use]
378 pub fn new(
379 client_addr: ClientAddress,
380 buffer_pool: BufferPool,
381 auth_handler: Arc<AuthHandler>,
382 metrics: MetricsCollector,
383 ) -> Self {
384 Self {
385 client_addr,
386 buffer_pool,
387 client_id: ClientId::new(),
388 router: None,
389 mode_state: ModeState::new(SessionMode::Stateful, RoutingMode::Stateful),
390 auth_handler,
391 auth_state: AuthState::new(),
392 metrics,
393 connection_stats: None,
394 cache: Self::default_cache(),
395 cache_articles: true,
396 adaptive_precheck: false,
397 }
398 }
399
400 #[must_use]
405 pub fn new_with_router(
406 client_addr: ClientAddress,
407 buffer_pool: BufferPool,
408 router: Arc<BackendSelector>,
409 routing_mode: RoutingMode,
410 auth_handler: Arc<AuthHandler>,
411 metrics: MetricsCollector,
412 ) -> Self {
413 Self {
414 client_addr,
415 buffer_pool,
416 client_id: ClientId::new(),
417 router: Some(router),
418 mode_state: ModeState::new(SessionMode::PerCommand, routing_mode),
419 auth_handler,
420 auth_state: AuthState::new(),
421 metrics,
422 connection_stats: None,
423 cache: Arc::new(crate::cache::UnifiedCache::memory(
424 0,
425 std::time::Duration::from_secs(3600),
426 false,
427 )),
428 cache_articles: true,
429 adaptive_precheck: false,
430 }
431 }
432
433 #[must_use]
455 pub fn builder(
456 client_addr: ClientAddress,
457 buffer_pool: BufferPool,
458 auth_handler: Arc<AuthHandler>,
459 metrics: MetricsCollector,
460 ) -> ClientSessionBuilder {
461 ClientSessionBuilder {
462 client_addr,
463 buffer_pool,
464 router: None,
465 routing_mode: RoutingMode::Stateful,
466 auth_handler,
467 metrics,
468 connection_stats: None,
469 cache: Arc::new(crate::cache::UnifiedCache::memory(
470 0,
471 std::time::Duration::from_secs(3600),
472 false,
473 )),
474 cache_articles: true,
475 adaptive_precheck: false,
476 }
477 }
478
479 #[must_use]
483 #[inline]
484 pub fn client_id(&self) -> ClientId {
485 self.client_id
486 }
487
488 #[must_use]
494 #[inline]
495 pub fn is_per_command_routing(&self) -> bool {
496 self.router.is_some()
497 }
498
499 #[must_use]
501 #[inline]
502 pub fn mode(&self) -> SessionMode {
503 self.mode_state.mode()
504 }
505
506 #[inline]
511 #[must_use]
512 pub fn username(&self) -> Option<Arc<str>> {
513 self.auth_state.username()
514 }
515
516 pub(crate) fn set_username(&self, username: Option<String>) {
521 if let Some(name) = username {
522 self.auth_state.mark_authenticated(name);
523 }
524 }
525
526 #[must_use]
528 #[inline]
529 pub(crate) fn connection_stats(&self) -> Option<&crate::metrics::ConnectionStatsAggregator> {
530 self.connection_stats.as_ref()
531 }
532
533 #[inline]
541 pub(crate) fn is_authenticated_cached(&self, skip_auth_check: bool) -> bool {
542 self.auth_state.is_authenticated_or_skipped(skip_auth_check)
543 }
544}
545
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::AuthHandler;
    use crate::cache::UnifiedCache;
    use crate::metrics::{ConnectionStatsAggregator, MetricsCollector};
    use crate::types::{BackendId, BufferSize};
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
    use std::sync::Arc;
    use std::time::Duration;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Auth handler constructed without credentials.
    fn test_auth_handler() -> Arc<AuthHandler> {
        Arc::new(AuthHandler::new(None, None).unwrap())
    }

    /// Metrics collector built with `MetricsCollector::new(1)`.
    // NOTE(review): the argument is presumably the backend count - the snapshot
    // tests below read `backend_stats[0]`, which is consistent with that.
    fn test_metrics() -> MetricsCollector {
        MetricsCollector::new(1)
    }

    /// `127.0.0.1` on the given port.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Buffer pool with the `(1024, 4)` parameters used by most tests.
    fn small_pool() -> BufferPool {
        BufferPool::new(BufferSize::try_new(1024).unwrap(), 4)
    }

    /// Router-less session on an explicit buffer pool.
    fn session_with_pool(addr: SocketAddr, pool: BufferPool) -> ClientSession {
        ClientSession::new(addr.into(), pool, test_auth_handler(), test_metrics())
    }

    /// Router-less session on a fresh small pool.
    fn plain_session(addr: SocketAddr) -> ClientSession {
        session_with_pool(addr, small_pool())
    }

    /// Session created through `new_with_router` on `127.0.0.1:8080`.
    fn routed_session(routing_mode: RoutingMode) -> ClientSession {
        ClientSession::new_with_router(
            loopback(8080).into(),
            small_pool(),
            Arc::new(BackendSelector::new()),
            routing_mode,
            test_auth_handler(),
            test_metrics(),
        )
    }

    /// Builder on `127.0.0.1:8080` using the supplied metrics collector.
    fn builder_with(metrics: MetricsCollector) -> ClientSessionBuilder {
        ClientSession::builder(
            loopback(8080).into(),
            small_pool(),
            test_auth_handler(),
            metrics,
        )
    }

    /// Builder on `127.0.0.1:8080` with default test fixtures.
    fn test_builder() -> ClientSessionBuilder {
        builder_with(test_metrics())
    }

    /// Invokes every metrics entry point the session exposes, once each.
    fn record_sample_activity(session: &ClientSession) {
        let username = session.username();
        let user = username.as_deref();

        session.metrics.record_command(BackendId::from_index(0));
        session.metrics.user_command(user);
        session.metrics.stateful_session_started();
        session.metrics.stateful_session_ended();
        session.metrics.user_bytes_sent(user, 1024);
        session.metrics.user_bytes_received(user, 2048);
    }

    // ---------------------------------------------------------------------
    // Address handling
    // ---------------------------------------------------------------------

    #[test]
    fn test_client_session_creation() {
        let session = plain_session(loopback(8080));

        assert_eq!(session.client_addr.port(), 8080);
        assert_eq!(session.client_addr.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    }

    #[test]
    fn test_client_session_with_different_ports() {
        let pool = small_pool();
        let session1 = session_with_pool(loopback(8080), pool.clone());
        let session2 = session_with_pool(loopback(9090), pool);

        assert_ne!(session1.client_addr.port(), session2.client_addr.port());
        assert_eq!(session1.client_addr.port(), 8080);
        assert_eq!(session2.client_addr.port(), 9090);
    }

    #[test]
    fn test_client_session_with_ipv6() {
        let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8119);
        let session = plain_session(addr);

        assert_eq!(session.client_addr.port(), 8119);
        assert!(session.client_addr.is_ipv6());
    }

    #[test]
    fn test_buffer_pool_cloning() {
        let pool = BufferPool::new(BufferSize::try_new(8192).unwrap(), 10);
        let pool_clone = pool.clone();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 1234);

        // Constructing both sessions is the whole test: a pool and its clone
        // must each be accepted by a session.
        let _session1 = session_with_pool(addr, pool);
        let _session2 = session_with_pool(addr, pool_clone);
    }

    #[test]
    fn test_session_addr_formatting() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5555);
        let session = plain_session(addr);

        let rendered = session.client_addr.to_string();
        assert!(rendered.contains("10.0.0.1"));
        assert!(rendered.contains("5555"));
    }

    #[test]
    fn test_multiple_sessions_same_buffer_pool() {
        let pool = BufferPool::new(BufferSize::try_new(4096).unwrap(), 8);
        let ports: Vec<u16> = (8000..8005).collect();

        let sessions: Vec<_> = ports
            .iter()
            .map(|&port| session_with_pool(loopback(port), pool.clone()))
            .collect();

        assert_eq!(sessions.len(), 5);
        for (session, &port) in sessions.iter().zip(&ports) {
            assert_eq!(session.client_addr.port(), port);
        }
    }

    #[test]
    fn test_loopback_address() {
        let session = plain_session(loopback(8119));

        assert!(session.client_addr.ip().is_loopback());
    }

    #[test]
    fn test_unspecified_address() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let session = plain_session(addr);

        assert!(session.client_addr.ip().is_unspecified());
        assert_eq!(session.client_addr.port(), 0);
    }

    // ---------------------------------------------------------------------
    // Direct constructors
    // ---------------------------------------------------------------------

    #[test]
    fn test_session_without_router() {
        let session = plain_session(loopback(8080));

        assert!(!session.is_per_command_routing());
        assert_eq!(session.client_addr.port(), 8080);
    }

    #[test]
    fn test_session_with_router() {
        let session = routed_session(RoutingMode::PerCommand);

        assert!(session.is_per_command_routing());
        assert_eq!(session.client_addr.port(), 8080);
    }

    #[test]
    fn test_client_id_uniqueness() {
        let pool = small_pool();
        let session1 = session_with_pool(loopback(8080), pool.clone());
        let session2 = session_with_pool(loopback(8080), pool);

        assert_ne!(session1.client_id(), session2.client_id());
    }

    #[test]
    fn test_session_mode_enum() {
        assert_eq!(SessionMode::PerCommand, SessionMode::PerCommand);
        assert_eq!(SessionMode::Stateful, SessionMode::Stateful);
        assert_ne!(SessionMode::PerCommand, SessionMode::Stateful);

        let per_command = format!("{:?}", SessionMode::PerCommand);
        let stateful = format!("{:?}", SessionMode::Stateful);
        assert!(per_command.contains("PerCommand"));
        assert!(stateful.contains("Stateful"));
    }

    #[test]
    fn test_hybrid_session_creation() {
        let session = routed_session(RoutingMode::Hybrid);

        assert!(session.is_per_command_routing());
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
        assert_eq!(session.mode(), SessionMode::PerCommand);
    }

    #[test]
    fn test_routing_mode_configurations() {
        // Stateful routing with a router attached.
        let session = routed_session(RoutingMode::Stateful);
        assert!(session.is_per_command_routing());
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);

        // Per-command routing.
        let session = routed_session(RoutingMode::PerCommand);
        assert!(session.is_per_command_routing());
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::PerCommand);
        assert_eq!(session.mode(), SessionMode::PerCommand);

        // Hybrid routing.
        let session = routed_session(RoutingMode::Hybrid);
        assert!(session.is_per_command_routing());
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
        assert_eq!(session.mode(), SessionMode::PerCommand);
    }

    #[test]
    fn test_hybrid_mode_initial_state() {
        let session = routed_session(RoutingMode::Hybrid);

        assert_eq!(session.mode(), SessionMode::PerCommand);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
        assert!(session.is_per_command_routing());
    }

    #[test]
    fn test_is_per_command_routing_logic() {
        // Any session that owns a router reports `true`, whatever the mode.
        for routing_mode in [
            RoutingMode::Stateful,
            RoutingMode::PerCommand,
            RoutingMode::Hybrid,
        ] {
            assert!(
                routed_session(routing_mode).is_per_command_routing(),
                "routed session should report per-command routing for {:?}",
                routing_mode
            );
        }

        // No router: always `false`.
        assert!(!plain_session(loopback(8080)).is_per_command_routing());
    }

    // ---------------------------------------------------------------------
    // Builder
    // ---------------------------------------------------------------------

    #[test]
    fn test_builder_basic_construction() {
        let session = test_builder().build();

        assert_eq!(*session.client_addr, loopback(8080));
        assert!(!session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::Stateful);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
    }

    #[test]
    fn test_builder_with_router() {
        let session = test_builder()
            .with_router(Arc::new(BackendSelector::new()))
            .with_routing_mode(RoutingMode::PerCommand)
            .build();

        assert!(session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::PerCommand);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::PerCommand);
    }

    #[test]
    fn test_builder_with_hybrid_mode() {
        let session = test_builder()
            .with_router(Arc::new(BackendSelector::new()))
            .with_routing_mode(RoutingMode::Hybrid)
            .build();

        assert!(session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::PerCommand);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
    }

    #[test]
    fn test_builder_with_stateful_mode() {
        let session = test_builder()
            .with_router(Arc::new(BackendSelector::new()))
            .with_routing_mode(RoutingMode::Stateful)
            .build();

        assert!(session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::Stateful);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
    }

    #[test]
    fn test_builder_without_router_ignores_mode() {
        let session = test_builder()
            .with_routing_mode(RoutingMode::PerCommand)
            .build();

        assert!(!session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::Stateful);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Stateful);
    }

    #[test]
    fn test_builder_with_auth_handler() {
        let auth_handler =
            Arc::new(AuthHandler::new(Some("user".to_string()), Some("pass".to_string())).unwrap());

        let session = test_builder()
            .with_auth_handler(Arc::clone(&auth_handler))
            .build();

        assert_eq!(*session.client_addr, loopback(8080));
        // The session must hold the handler passed to `with_auth_handler`, not
        // the one originally given to `ClientSession::builder`.
        assert!(Arc::ptr_eq(&session.auth_handler, &auth_handler));
    }

    #[test]
    fn test_builder_with_metrics() {
        let session = builder_with(MetricsCollector::new(1)).build();

        // Field access only: confirms the collector was stored on the session.
        let _ = session.metrics;
    }

    #[test]
    fn test_builder_with_connection_stats() {
        let session = test_builder()
            .with_connection_stats(ConnectionStatsAggregator::default())
            .build();

        assert!(session.connection_stats().is_some());
    }

    #[test]
    fn test_builder_with_cache() {
        let cache = Arc::new(UnifiedCache::memory(100, Duration::from_secs(3600), true));

        let session = test_builder().with_cache(Arc::clone(&cache)).build();

        assert_eq!(session.cache.capacity(), 100);
        // The session must share the supplied cache rather than a copy.
        assert!(Arc::ptr_eq(&session.cache, &cache));
    }

    #[test]
    fn test_builder_method_chaining() {
        let session = builder_with(MetricsCollector::new(1))
            .with_router(Arc::new(BackendSelector::new()))
            .with_routing_mode(RoutingMode::Hybrid)
            .build();

        assert!(session.is_per_command_routing());
        assert_eq!(session.mode(), SessionMode::PerCommand);
        assert_eq!(session.mode_state.routing_mode(), RoutingMode::Hybrid);
        // Field access only: confirms the collector was stored on the session.
        let _ = session.metrics;
    }

    // ---------------------------------------------------------------------
    // Accessors
    // ---------------------------------------------------------------------

    #[test]
    fn test_mode_getter() {
        let routed = routed_session(RoutingMode::PerCommand);
        assert_eq!(routed.mode(), SessionMode::PerCommand);

        let plain = plain_session(loopback(8080));
        assert_eq!(plain.mode(), SessionMode::Stateful);
    }

    #[test]
    fn test_username_initially_none() {
        let session = plain_session(loopback(8080));

        assert!(session.username().is_none());
    }

    #[test]
    fn test_set_username_and_get() {
        let session = plain_session(loopback(8080));

        session.set_username(Some("testuser".to_string()));

        let username = session.username();
        assert!(username.is_some());
        assert_eq!(username.as_deref(), Some("testuser"));
    }

    #[test]
    fn test_set_username_none() {
        let session = plain_session(loopback(8080));

        session.set_username(None);

        assert!(session.username().is_none());
    }

    #[test]
    fn test_username_cheap_clone() {
        let session = plain_session(loopback(8080));
        session.set_username(Some("testuser".to_string()));

        let username1 = session.username();
        let username2 = session.username();

        assert!(username1.is_some());
        assert!(username2.is_some());
        assert_eq!(username1, username2);
        assert_eq!(username1.as_deref(), Some("testuser"));
    }

    #[test]
    fn test_connection_stats_none_by_default() {
        let session = plain_session(loopback(8080));

        assert!(session.connection_stats().is_none());
    }

    #[test]
    fn test_connection_stats_with_aggregator() {
        let session = test_builder()
            .with_connection_stats(ConnectionStatsAggregator::default())
            .build();

        assert!(session.connection_stats().is_some());
    }

    // ---------------------------------------------------------------------
    // Metrics
    // ---------------------------------------------------------------------

    #[test]
    fn test_metrics_direct_calls_no_metrics() {
        let session = plain_session(loopback(8080));

        // Smoke test: none of the metrics calls may panic on an
        // unauthenticated session.
        record_sample_activity(&session);
    }

    #[test]
    fn test_metrics_direct_calls_with_metrics() {
        let metrics = MetricsCollector::new(1);
        let session = builder_with(metrics.clone()).build();
        session.set_username(Some("testuser".to_string()));

        record_sample_activity(&session);

        let snapshot = metrics.snapshot(None);
        assert!(!snapshot.backend_stats.is_empty());
        assert!(snapshot.backend_stats[0].total_commands.get() > 0);
    }

    #[test]
    fn test_record_command_with_metrics() {
        let metrics = MetricsCollector::new(1);
        let session = builder_with(metrics.clone()).build();

        session.metrics.record_command(BackendId::from_index(0));

        let snapshot = metrics.snapshot(None);
        assert_eq!(snapshot.backend_stats[0].total_commands.get(), 1);
    }

    #[test]
    fn test_user_bytes_tracking() {
        let metrics = MetricsCollector::new(1);
        let session = builder_with(metrics.clone()).build();
        session.set_username(Some("testuser".to_string()));

        session
            .metrics
            .user_bytes_sent(session.username().as_deref(), 1024);
        session
            .metrics
            .user_bytes_received(session.username().as_deref(), 2048);

        let snapshot = metrics.snapshot(None);
        let stats = snapshot
            .user_stats
            .iter()
            .find(|entry| entry.username == "testuser")
            .expect("stats entry for `testuser` should exist after recording bytes");

        assert_eq!(stats.bytes_sent.as_u64(), 1024);
        assert_eq!(stats.bytes_received.as_u64(), 2048);
    }

    #[test]
    fn test_stateful_session_tracking() {
        let metrics = MetricsCollector::new(1);
        let session = builder_with(metrics.clone()).build();

        session.metrics.stateful_session_started();
        assert_eq!(metrics.snapshot(None).stateful_sessions, 1);

        session.metrics.stateful_session_ended();
        assert_eq!(metrics.snapshot(None).stateful_sessions, 0);
    }
}