1use std::collections::VecDeque;
2use std::mem;
3use std::ops::{Deref, DerefMut};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use parking_lot::{Mutex, RwLock};
8use thiserror;
9use tokio::select;
10use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
11use tokio::sync::{mpsc, oneshot};
12use tokio::task::{JoinHandle, JoinSet};
13use tokio::time::{sleep, timeout};
14use tokio_util::sync::CancellationToken;
15
16use google_cloud_gax::grpc::metadata::MetadataMap;
17use google_cloud_gax::grpc::{Code, Status};
18use google_cloud_gax::retry::TryAs;
19use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSessionRequest, Session};
20
21use crate::apiv1::conn_pool::ConnectionManager;
22use crate::apiv1::spanner_client::{ping_query_request, Client};
23
24pub struct SessionHandle {
26 pub session: Session,
27 pub spanner_client: Client,
28 valid: bool,
29 deleted: bool,
30 last_used_at: Instant,
31 last_checked_at: Instant,
32 last_pong_at: Instant,
33 created_at: Instant,
34}
35
36impl SessionHandle {
37 pub(crate) fn new(session: Session, spanner_client: Client, now: Instant) -> SessionHandle {
38 SessionHandle {
39 session,
40 spanner_client,
41 valid: true,
42 deleted: false,
43 last_used_at: now,
44 last_checked_at: now,
45 last_pong_at: now,
46 created_at: now,
47 }
48 }
49
50 pub async fn invalidate_if_needed<T>(&mut self, arg: Result<T, Status>) -> Result<T, Status> {
51 match arg {
52 Ok(s) => Ok(s),
53 Err(e) => {
54 if e.code() == Code::NotFound && e.message().contains("Session not found:") {
55 tracing::debug!("session invalidate {}", self.session.name);
56 self.delete().await;
57 }
58 Err(e)
59 }
60 }
61 }
62
63 async fn delete(&mut self) {
64 self.valid = false;
65 let session_name = &self.session.name;
66 let request = DeleteSessionRequest {
67 name: session_name.to_string(),
68 };
69 match self.spanner_client.delete_session(request, None).await {
70 Ok(_) => self.deleted = true,
71 Err(e) => tracing::error!("failed to delete session {}, {:?}", session_name, e),
72 };
73 }
74}
75
76pub struct ManagedSession {
78 session_pool: SessionPool,
79 session: Option<SessionHandle>,
80}
81
82impl ManagedSession {
83 fn new(session_pool: SessionPool, session: SessionHandle) -> Self {
84 ManagedSession {
85 session_pool,
86 session: Some(session),
87 }
88 }
89}
90
91impl Drop for ManagedSession {
92 fn drop(&mut self) {
93 let session = self.session.take().unwrap();
94 self.session_pool.recycle(session);
95 }
96}
97
98impl Deref for ManagedSession {
99 type Target = SessionHandle;
100
101 fn deref(&self) -> &Self::Target {
102 self.session.as_ref().unwrap()
103 }
104}
105
106impl DerefMut for ManagedSession {
107 fn deref_mut(&mut self) -> &mut Self::Target {
108 self.session.as_mut().unwrap()
109 }
110}
111
112struct Sessions {
115 available_sessions: VecDeque<SessionHandle>,
116
117 waiters: VecDeque<oneshot::Sender<()>>,
118
119 orphans: Vec<SessionHandle>,
121
122 num_inuse: usize,
124
125 num_creating: usize,
127}
128
129impl Sessions {
130 fn num_opened(&self) -> usize {
131 self.num_inuse + self.available_sessions.len()
132 }
133
134 fn take_waiter(&mut self) -> Option<oneshot::Sender<()>> {
135 while let Some(waiter) = self.waiters.pop_front() {
136 if !waiter.is_closed() {
138 return Some(waiter);
139 }
140 }
141 None
142 }
143
144 fn take(&mut self) -> Option<SessionHandle> {
145 match self.available_sessions.pop_front() {
146 None => None,
147 Some(s) => {
148 self.num_inuse += 1;
149 Some(s)
150 }
151 }
152 }
153
154 fn release(&mut self, session: SessionHandle) {
155 self.num_inuse -= 1;
156 if session.valid {
157 self.available_sessions.push_back(session);
158 } else if !session.deleted {
159 tracing::trace!("save as orphan name={}", session.session.name);
160 self.orphans.push(session);
161 }
162 }
163
164 fn reserve(&mut self, max_opened: usize, inc_step: usize) -> usize {
167 let num_opened = self.num_opened();
168 let num_creating = self.num_creating;
169 if max_opened < num_creating + num_opened {
170 tracing::trace!(
171 "No available connections max={}, num_creating={}, current={}",
172 max_opened,
173 num_creating,
174 num_opened
175 );
176 return 0;
177 }
178 let mut increasing = max_opened - (num_creating + num_opened);
179 if increasing > inc_step {
180 increasing = inc_step
181 }
182 self.num_creating += increasing;
183 increasing
184 }
185
186 fn replenish(&mut self, session_count: usize, result: Result<Vec<SessionHandle>, Status>) {
187 self.num_creating -= session_count;
188 match result {
189 Ok(mut new_sessions) => {
190 while let Some(session) = new_sessions.pop() {
191 self.available_sessions.push_back(session);
192 if let Some(waiter) = self.take_waiter() {
193 let _ = waiter.send(());
194 }
195 }
196 }
197 Err(e) => tracing::error!("failed to create new sessions {:?}", e),
198 }
199 }
200}
201
202#[derive(Clone)]
203struct SessionPool {
204 inner: Arc<RwLock<Sessions>>,
205 session_creation_sender: UnboundedSender<usize>,
206 config: Arc<SessionConfig>,
207}
208
209impl SessionPool {
210 async fn new(
211 database: String,
212 conn_pool: &ConnectionManager,
213 session_creation_sender: UnboundedSender<usize>,
214 config: Arc<SessionConfig>,
215 ) -> Result<Self, Status> {
216 let available_sessions = Self::init_pool(database, conn_pool, config.min_opened).await?;
217 Ok(SessionPool {
218 inner: Arc::new(RwLock::new(Sessions {
219 available_sessions,
220 waiters: VecDeque::new(),
221 orphans: Vec::new(),
222 num_inuse: 0,
223 num_creating: 0,
224 })),
225 session_creation_sender,
226 config,
227 })
228 }
229
230 async fn init_pool(
231 database: String,
232 conn_pool: &ConnectionManager,
233 min_opened: usize,
234 ) -> Result<VecDeque<SessionHandle>, Status> {
235 let channel_num = conn_pool.num();
236 let creation_count_per_channel = min_opened / channel_num;
237 let remainder = min_opened % channel_num;
238
239 let mut sessions = Vec::<SessionHandle>::new();
240 let mut tasks = JoinSet::new();
241 for _ in 0..channel_num {
242 let creation_count = if channel_num == 0 {
244 creation_count_per_channel + remainder
245 } else {
246 creation_count_per_channel
247 };
248 let next_client = conn_pool.conn().with_metadata(client_metadata(&database));
249 let database = database.clone();
250 tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count).await });
251 }
252 while let Some(r) = tasks.join_next().await {
253 let new_sessions = r.map_err(|e| Status::from_error(e.into()))??;
254 sessions.extend(new_sessions);
255 }
256 tracing::debug!("initial session created count = {}", sessions.len());
257 Ok(sessions.into())
258 }
259
260 fn num_opened(&self) -> usize {
261 self.inner.read().num_opened()
262 }
263
264 async fn acquire(&self) -> Result<ManagedSession, SessionError> {
271 loop {
272 let (on_session_acquired, session_count) = {
273 let mut sessions = self.inner.write();
274
275 if sessions.waiters.is_empty() {
277 if let Some(mut s) = sessions.take() {
278 s.last_used_at = Instant::now();
279 return Ok(ManagedSession::new(self.clone(), s));
280 }
281 }
282 let (sender, receiver) = oneshot::channel();
284 sessions.waiters.push_back(sender);
285 let session_count = sessions.reserve(self.config.max_opened, self.config.inc_step);
286 (receiver, session_count)
287 };
288
289 if session_count > 0 {
290 let _ = self.session_creation_sender.send(session_count);
291 }
292
293 match timeout(self.config.session_get_timeout, on_session_acquired).await {
295 Ok(Ok(())) => {
296 let mut sessions = self.inner.write();
297 if let Some(mut s) = sessions.take() {
298 s.last_used_at = Instant::now();
299 return Ok(ManagedSession::new(self.clone(), s));
300 } else {
301 continue; }
303 }
304 _ => {
305 {
306 let sessions = self.inner.write();
307 tracing::info!(
308 available = sessions.available_sessions.len(),
309 waiters = sessions.waiters.len(),
310 orphans = sessions.orphans.len(),
311 num_inuse = sessions.num_inuse,
312 num_creating = sessions.num_creating,
313 max_opened = self.config.max_opened,
314 "Timeout acquiring session"
315 );
316 }
317 return Err(SessionError::SessionGetTimeout);
318 }
319 }
320 }
321 }
322
323 fn recycle(&self, mut session: SessionHandle) {
329 if session.valid {
330 let mut sessions = self.inner.write();
331 let waiter = sessions.take_waiter();
332 if sessions.num_opened() > self.config.max_idle
333 && session.created_at + self.config.idle_timeout < Instant::now()
334 && waiter.is_none()
335 {
336 session.valid = false
338 }
339 sessions.release(session);
340 if let Some(waiter) = waiter {
341 let _ = waiter.send(());
342 }
343 } else {
344 let session_count = {
345 let mut sessions = self.inner.write();
346 sessions.release(session);
347 if sessions.num_opened() < self.config.min_opened && !sessions.waiters.is_empty() {
348 sessions.reserve(self.config.max_opened, self.config.inc_step)
349 } else {
350 0
351 }
352 };
353 if session_count > 0 {
354 let _ = self.session_creation_sender.send(session_count);
355 }
356 }
357 }
358
359 async fn close(&self) {
360 let empty = VecDeque::new();
361 let deleting_sessions = { mem::replace(&mut self.inner.write().available_sessions, empty) };
362 for mut session in deleting_sessions {
363 session.delete().await;
364 }
365
366 self.remove_orphans().await;
367 }
368
369 async fn remove_orphans(&self) {
370 let empty = vec![];
371 let deleting_sessions = { mem::replace(&mut self.inner.write().orphans, empty) };
372 tracing::trace!("remove {} orphan sessions", deleting_sessions.len());
373 for mut session in deleting_sessions {
374 session.delete().await;
375 }
376 }
377}
378
379#[derive(Clone, Debug)]
380pub struct SessionConfig {
381 pub max_opened: usize,
386
387 pub min_opened: usize,
394
395 pub max_idle: usize,
397
398 pub idle_timeout: Duration,
402
403 pub session_alive_trust_duration: Duration,
404
405 pub session_get_timeout: Duration,
407
408 pub refresh_interval: Duration,
410
411 inc_step: usize,
414}
415
416impl Default for SessionConfig {
417 fn default() -> Self {
418 SessionConfig {
419 max_opened: 400,
420 min_opened: 10,
421 max_idle: 300,
422 inc_step: 25,
423 idle_timeout: Duration::from_secs(30 * 60),
424 session_alive_trust_duration: Duration::from_secs(55 * 60),
425 session_get_timeout: Duration::from_secs(1),
426 refresh_interval: Duration::from_secs(5 * 60),
427 }
428 }
429}
430
431#[derive(thiserror::Error, Debug)]
432pub enum SessionError {
433 #[error("session get time out")]
434 SessionGetTimeout,
435 #[error("failed to create session")]
436 FailedToCreateSession,
437 #[error(transparent)]
438 GRPC(#[from] Status),
439}
440
441impl TryAs<Status> for SessionError {
442 fn try_as(&self) -> Option<&Status> {
443 match self {
444 SessionError::GRPC(e) => Some(e),
445 _ => None,
446 }
447 }
448}
449
450pub(crate) struct SessionManager {
451 session_pool: SessionPool,
452 cancel: CancellationToken,
453 tasks: Mutex<Vec<JoinHandle<()>>>,
454}
455
456impl SessionManager {
457 pub async fn new(
458 database: impl Into<String>,
459 conn_pool: ConnectionManager,
460 config: SessionConfig,
461 ) -> Result<Arc<SessionManager>, Status> {
462 let database = database.into();
463 let (sender, receiver) = mpsc::unbounded_channel();
464 let session_pool = SessionPool::new(database.clone(), &conn_pool, sender, Arc::new(config.clone())).await?;
465
466 let cancel = CancellationToken::new();
467 let task_session_cleaner = Self::spawn_health_check_task(config, session_pool.clone(), cancel.clone());
468 let task_session_creator =
469 Self::spawn_session_creation_task(session_pool.clone(), database, conn_pool, receiver, cancel.clone());
470
471 let sm = SessionManager {
472 session_pool,
473 cancel,
474 tasks: Mutex::new(vec![task_session_cleaner, task_session_creator]),
475 };
476 Ok(Arc::new(sm))
477 }
478
479 pub fn num_opened(&self) -> usize {
480 self.session_pool.num_opened()
481 }
482
483 pub async fn get(&self) -> Result<ManagedSession, SessionError> {
484 self.session_pool.acquire().await
485 }
486
487 pub async fn close(&self) {
488 if self.cancel.is_cancelled() {
489 return;
490 }
491 self.cancel.cancel();
492 let tasks = { mem::take(&mut *self.tasks.lock()) };
493 for task in tasks {
494 let _ = task.await;
495 }
496 self.session_pool.close().await;
497 }
498
499 fn spawn_session_creation_task(
500 session_pool: SessionPool,
501 database: String,
502 conn_pool: ConnectionManager,
503 mut rx: UnboundedReceiver<usize>,
504 cancel: CancellationToken,
505 ) -> JoinHandle<()> {
506 tokio::spawn(async move {
507 let mut tasks = JoinSet::default();
508 loop {
509 select! {
510 biased;
511 _ = cancel.cancelled() => break,
512 Some(Ok((session_count, result))) = tasks.join_next(), if !tasks.is_empty() => {
513 session_pool.inner.write().replenish(session_count, result);
514 }
515 session_count = rx.recv() => match session_count {
516 Some(session_count) => {
517 let client = conn_pool.conn().with_metadata(client_metadata(&database));
518 let database = database.clone();
519 tasks.spawn(async move { (session_count, batch_create_sessions(client, &database, session_count).await) });
520 },
521 None => continue
522 },
523 }
524 }
525 tracing::trace!("shutdown session creation task.");
526 })
527 }
528
529 fn spawn_health_check_task(
530 config: SessionConfig,
531 session_pool: SessionPool,
532 cancel: CancellationToken,
533 ) -> JoinHandle<()> {
534 let start = Instant::now() + config.refresh_interval;
535 let mut interval = tokio::time::interval_at(start.into(), config.refresh_interval);
536
537 tokio::spawn(async move {
538 loop {
539 select! {
540 _ = interval.tick() => {},
541 _ = cancel.cancelled() => break
542 }
543 let now = Instant::now();
544
545 session_pool.remove_orphans().await;
547
548 health_check(
550 now + Duration::from_nanos(1),
551 config.session_alive_trust_duration,
552 &session_pool,
553 cancel.clone(),
554 )
555 .await;
556 }
557 tracing::trace!("shutdown health check task.")
558 })
559 }
560}
561
562async fn health_check(
563 now: Instant,
564 session_alive_trust_duration: Duration,
565 sessions: &SessionPool,
566 cancel: CancellationToken,
567) {
568 tracing::trace!("start health check");
569 let start = Instant::now();
570 let sleep_duration = Duration::from_millis(10);
571 loop {
572 select! {
573 _ = sleep(sleep_duration) => {},
574 _ = cancel.cancelled() => break
575 }
576 let mut s = {
577 let mut locked = sessions.inner.write();
579 match locked.take() {
580 Some(mut s) => {
581 if s.last_checked_at == now {
583 locked.release(s);
584 break;
585 }
586 if std::cmp::max(s.last_used_at, s.last_pong_at) + session_alive_trust_duration >= now {
587 s.last_checked_at = now;
588 locked.release(s);
589 continue;
590 }
591 s
592 }
593 None => break,
594 }
595 };
596
597 let request = ping_query_request(s.session.name.clone());
598 match s.spanner_client.execute_sql(request, None).await {
599 Ok(_) => {
600 s.last_checked_at = now;
601 s.last_pong_at = now;
602 sessions.recycle(s);
603 }
604 Err(_) => {
605 s.delete().await;
606 sessions.recycle(s);
607 }
608 }
609 }
610 tracing::trace!("end health check elapsed={}msec", start.elapsed().as_millis());
611}
612
613async fn batch_create_sessions(
614 spanner_client: Client,
615 database: &str,
616 mut remaining_create_count: usize,
617) -> Result<Vec<SessionHandle>, Status> {
618 let mut created = Vec::with_capacity(remaining_create_count);
619 while remaining_create_count > 0 {
620 let sessions = batch_create_session(spanner_client.clone(), database, remaining_create_count).await?;
621 let actually_created = sessions.len();
624 remaining_create_count -= actually_created;
625 created.extend(sessions);
626 }
627 Ok(created)
628}
629
630async fn batch_create_session(
631 mut spanner_client: Client,
632 database: &str,
633 session_count: usize,
634) -> Result<Vec<SessionHandle>, Status> {
635 let request = BatchCreateSessionsRequest {
636 database: database.to_string(),
637 session_template: None,
638 session_count: session_count as i32,
639 };
640
641 tracing::debug!("spawn session creation request : session_count = {}", session_count);
642 let response = spanner_client.batch_create_sessions(request, None).await?.into_inner();
643
644 let now = Instant::now();
645 Ok(response
646 .session
647 .into_iter()
648 .map(|s| SessionHandle::new(s, spanner_client.clone(), now))
649 .collect::<Vec<SessionHandle>>())
650}
651
652pub(crate) fn client_metadata(database: &str) -> MetadataMap {
653 let mut metadata = MetadataMap::new();
654 metadata.insert("google-cloud-resource-prefix", database.parse().unwrap());
655 metadata
656}
657
658#[cfg(test)]
659mod tests {
660 use std::sync::atomic::{AtomicI64, Ordering};
661 use std::sync::Arc;
662 use std::time::{Duration, Instant};
663
664 use parking_lot::RwLock;
665 use serial_test::serial;
666 use tokio::time::sleep;
667 use tokio_util::sync::CancellationToken;
668
669 use google_cloud_gax::conn::{ConnectionOptions, Environment};
670 use google_cloud_googleapis::spanner::v1::ExecuteSqlRequest;
671
672 use crate::apiv1::conn_pool::ConnectionManager;
673 use crate::session::{
674 batch_create_sessions, client_metadata, health_check, SessionConfig, SessionError, SessionManager,
675 };
676
677 pub const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
678
679 #[ctor::ctor]
680 fn init() {
681 let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
682 .add_directive("google_cloud_spanner=trace".parse().unwrap());
683 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
684 }
685
686 async fn assert_rush(use_invalidate: bool, config: SessionConfig) -> Arc<SessionManager> {
687 let cm = ConnectionManager::new(
688 4,
689 &Environment::Emulator("localhost:9010".to_string()),
690 "",
691 &ConnectionOptions::default(),
692 )
693 .await
694 .unwrap();
695 let sm = SessionManager::new(DATABASE, cm, config).await.unwrap();
696
697 let counter = Arc::new(AtomicI64::new(0));
698 let mut spawns = Vec::with_capacity(100);
699 for _ in 0..100 {
700 let sm = sm.clone();
701 let counter = Arc::clone(&counter);
702 spawns.push(tokio::spawn(async move {
703 let mut session = sm.get().await.unwrap();
704 if use_invalidate {
705 session.delete().await;
706 }
707 counter.fetch_add(1, Ordering::SeqCst);
708 sleep(Duration::from_millis(300)).await;
709 }));
710 }
711 for handler in spawns {
712 let _ = handler.await;
713 }
714 sm
715 }
716
717 #[tokio::test(flavor = "multi_thread")]
718 #[serial]
719 async fn test_health_check_checked() {
720 let cm = ConnectionManager::new(
721 4,
722 &Environment::Emulator("localhost:9010".to_string()),
723 "",
724 &ConnectionOptions::default(),
725 )
726 .await
727 .unwrap();
728 let session_alive_trust_duration = Duration::from_millis(10);
729 let config = SessionConfig {
730 min_opened: 5,
731 session_alive_trust_duration,
732 max_opened: 5,
733 ..Default::default()
734 };
735 let sm = std::sync::Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap());
736 sleep(Duration::from_secs(1)).await;
737
738 let cancel = CancellationToken::new();
739 health_check(Instant::now(), session_alive_trust_duration, &sm.session_pool, cancel.clone()).await;
740
741 assert_eq!(sm.num_opened(), 5);
742 tokio::time::sleep(Duration::from_millis(500)).await;
743 cancel.cancel();
744 }
745
746 #[tokio::test(flavor = "multi_thread")]
747 #[serial]
748 async fn test_health_check_not_checked() {
749 let cm = ConnectionManager::new(
750 4,
751 &Environment::Emulator("localhost:9010".to_string()),
752 "",
753 &ConnectionOptions::default(),
754 )
755 .await
756 .unwrap();
757 let session_alive_trust_duration = Duration::from_secs(10);
758 let config = SessionConfig {
759 min_opened: 5,
760 session_alive_trust_duration,
761 max_opened: 5,
762 ..Default::default()
763 };
764 let sm = Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap());
765 sleep(Duration::from_secs(1)).await;
766
767 let cancel = CancellationToken::new();
768 health_check(Instant::now(), session_alive_trust_duration, &sm.session_pool, cancel.clone()).await;
769
770 assert_eq!(sm.num_opened(), 5);
771 sleep(Duration::from_millis(500)).await;
772 cancel.cancel();
773 }
774
775 #[tokio::test(flavor = "multi_thread")]
776 #[serial]
777 async fn test_increase_session_and_idle_session_expired() {
778 let conn_pool = ConnectionManager::new(
779 4,
780 &Environment::Emulator("localhost:9010".to_string()),
781 "",
782 &ConnectionOptions::default(),
783 )
784 .await
785 .unwrap();
786 let config = SessionConfig {
787 idle_timeout: Duration::from_millis(10),
788 min_opened: 10,
789 max_idle: 20,
790 max_opened: 45,
791 ..Default::default()
792 };
793 let sm = SessionManager::new(DATABASE, conn_pool, config).await.unwrap();
794 {
795 let mut sessions = Vec::new();
796 for _ in 0..45 {
797 sessions.push(sm.get().await.unwrap());
798 }
799
800 assert_eq!(sm.num_opened(), 45);
802 assert_eq!(sm.session_pool.inner.read().num_inuse, 45, "all the session are using");
803 sleep(Duration::from_secs(1)).await;
804 }
805
806 let sessions = sm.session_pool.inner.read();
808 assert_eq!(sessions.num_inuse, 0, "invalid num_inuse");
809 assert_eq!(sessions.available_sessions.len(), 20, "invalid available sessions");
810 assert_eq!(sessions.num_opened(), 20, "invalid num open");
811 assert_eq!(sessions.waiters.len(), 0, "session waiters is 0");
812 }
813
814 #[tokio::test(flavor = "multi_thread")]
815 #[serial]
816 async fn test_too_many_session_timeout() {
817 let conn_pool = ConnectionManager::new(
818 4,
819 &Environment::Emulator("localhost:9010".to_string()),
820 "",
821 &ConnectionOptions::default(),
822 )
823 .await
824 .unwrap();
825 let config = SessionConfig {
826 idle_timeout: Duration::from_millis(10),
827 min_opened: 10,
828 max_idle: 20,
829 max_opened: 45,
830 session_get_timeout: Duration::from_secs(1),
831 ..Default::default()
832 };
833 let sm = Arc::new(SessionManager::new(DATABASE, conn_pool, config.clone()).await.unwrap());
834 let mu = Arc::new(RwLock::new(Vec::new()));
835 let mut awaiters = Vec::with_capacity(100);
836 for _ in 0..100 {
837 let sm = sm.clone();
838 let mu = mu.clone();
839 awaiters.push(tokio::spawn(async move {
840 let session = sm.get().await;
841 mu.write().push(session);
842 0
843 }))
844 }
845 for handler in awaiters {
846 let _ = handler.await;
847 }
848 let sessions = mu.read();
849 for i in 0..sessions.len() - 1 {
850 let session = &sessions[i];
851 if i >= config.max_opened {
852 assert!(session.is_err(), "must err {i}");
853 match session.as_ref().err().unwrap() {
854 SessionError::SessionGetTimeout => {}
855 _ => {
856 panic!("must be session timeout error")
857 }
858 }
859 } else {
860 assert!(session.is_ok(), "must ok {i}");
861 }
862 }
863 let pool = sm.session_pool.inner.read();
864 assert_eq!(pool.num_opened(), config.max_opened);
865 assert_eq!(pool.waiters.len(), 100 - config.max_opened); }
867
868 #[tokio::test(flavor = "multi_thread")]
869 #[serial]
870 async fn test_rush_invalidate() {
871 let config = SessionConfig {
872 session_get_timeout: Duration::from_secs(20),
873 min_opened: 10,
874 max_idle: 20,
875 max_opened: 45,
876 ..Default::default()
877 };
878 let sm = assert_rush(true, config.clone()).await;
879 {
880 let sessions = sm.session_pool.inner.read();
881 let available_sessions = sessions.available_sessions.len();
882 assert_eq!(sessions.num_inuse, 0);
883 assert_eq!(sessions.waiters.len(), 0);
884 assert_eq!(sessions.orphans.len(), 0);
885 assert!(
886 available_sessions <= config.max_opened && available_sessions >= config.min_opened,
887 "now is {available_sessions}"
888 );
889 }
890 sm.close().await;
891 }
892
893 #[tokio::test(flavor = "multi_thread")]
894 #[serial]
895 async fn test_rush() {
896 let config = SessionConfig {
897 min_opened: 10,
898 max_idle: 20,
899 max_opened: 45,
900 ..Default::default()
901 };
902 let sm = assert_rush(false, config.clone()).await;
903 {
904 let sessions = sm.session_pool.inner.read();
905 let available_sessions = sessions.available_sessions.len();
906 assert_eq!(sessions.num_inuse, 0);
907 assert_eq!(sessions.waiters.len(), 0);
908 assert_eq!(sessions.orphans.len(), 0);
909 assert!(
910 available_sessions <= config.max_opened && available_sessions >= config.min_opened,
911 "now is {available_sessions}"
912 );
913 }
914 sm.close().await;
915 }
916
917 #[tokio::test(flavor = "multi_thread")]
918 #[serial]
919 async fn test_rush_with_invalidate() {
920 let config = SessionConfig {
921 min_opened: 10,
922 max_idle: 20,
923 max_opened: 45,
924 ..Default::default()
925 };
926 let sm = assert_rush(true, config.clone()).await;
927 {
928 let sessions = sm.session_pool.inner.read();
929 let available_sessions = sessions.available_sessions.len();
930 assert_eq!(sessions.num_inuse, 0);
931 assert_eq!(sessions.waiters.len(), 0);
932 assert_eq!(sessions.orphans.len(), 0);
933 assert!(
934 available_sessions <= config.max_opened && available_sessions >= config.min_opened,
935 "now is {available_sessions}"
936 );
937 }
938 sm.close().await;
939 }
940
941 #[tokio::test(flavor = "multi_thread")]
942 #[serial]
943 async fn test_rush_with_health_check() {
944 let config = SessionConfig {
945 session_alive_trust_duration: Duration::from_millis(10),
946 refresh_interval: Duration::from_millis(250),
947 session_get_timeout: Duration::from_secs(20),
948 min_opened: 10,
949 max_idle: 20,
950 max_opened: 45,
951 ..Default::default()
952 };
953 let sm = assert_rush(false, config.clone()).await;
954 sleep(Duration::from_secs(2)).await;
955 {
956 let sessions = sm.session_pool.inner.read();
957 let available_sessions = sessions.available_sessions.len();
958 assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
959 assert_eq!(sessions.waiters.len(), 0);
960 assert_eq!(sessions.orphans.len(), 0);
961 assert!(
962 available_sessions <= config.max_opened && available_sessions >= config.max_idle - 1,
963 "now is {available_sessions}"
964 );
965 }
966 sm.close().await;
967 }
968
969 #[tokio::test(flavor = "multi_thread")]
970 #[serial]
971 async fn test_rush_with_health_check_and_invalidate() {
972 let config = SessionConfig {
973 session_alive_trust_duration: Duration::from_millis(10),
974 refresh_interval: Duration::from_millis(250),
975 session_get_timeout: Duration::from_secs(20),
976 min_opened: 10,
977 max_idle: 20,
978 max_opened: 45,
979 ..Default::default()
980 };
981 let sm = assert_rush(true, config.clone()).await;
982 sleep(Duration::from_secs(2)).await;
983 {
984 let sessions = sm.session_pool.inner.read();
985 let available_sessions = sessions.available_sessions.len();
986 assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
987 assert_eq!(sessions.waiters.len(), 0);
988 assert_eq!(sessions.orphans.len(), 0);
989 assert!(
990 available_sessions <= config.max_opened && available_sessions >= config.min_opened - 1,
991 "now is {available_sessions}"
992 );
993 }
994 sm.close().await;
995 }
996
997 #[tokio::test(flavor = "multi_thread")]
998 #[serial]
999 async fn test_rush_with_idle_expired() {
1000 let config = SessionConfig {
1001 min_opened: 10,
1002 max_idle: 20,
1003 max_opened: 45,
1004 idle_timeout: Duration::from_millis(1),
1005 ..Default::default()
1006 };
1007 let sm = assert_rush(false, config.clone()).await;
1008 {
1009 let sessions = sm.session_pool.inner.read();
1010 assert_eq!(sessions.num_inuse, 0);
1011 assert_eq!(sessions.waiters.len(), 0);
1012 assert_eq!(sessions.orphans.len(), config.max_opened - config.max_idle);
1013 assert_eq!(sessions.available_sessions.len(), config.max_idle);
1014 }
1015 sm.close().await;
1016 }
1017
1018 #[tokio::test(flavor = "multi_thread")]
1019 #[serial]
1020 async fn test_rush_with_health_check_and_idle_expired() {
1021 let config = SessionConfig {
1022 session_alive_trust_duration: Duration::from_millis(10),
1023 refresh_interval: Duration::from_millis(250),
1024 session_get_timeout: Duration::from_secs(20),
1025 min_opened: 10,
1026 max_idle: 20,
1027 max_opened: 45,
1028 idle_timeout: Duration::from_millis(1),
1029 ..Default::default()
1030 };
1031 let sm = assert_rush(false, config.clone()).await;
1032 sleep(Duration::from_secs(1)).await;
1033 {
1034 let sessions = sm.session_pool.inner.read();
1035 assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
1036 assert_eq!(sessions.waiters.len(), 0);
1037 assert_eq!(sessions.orphans.len(), 0);
1038 let available_sessions = sessions.available_sessions.len();
1039 assert!(
1040 available_sessions >= config.min_opened - 1 && available_sessions <= config.max_idle,
1041 "now is {available_sessions}"
1042 );
1043 }
1044 sm.close().await;
1045 }
1046
1047 #[tokio::test(flavor = "multi_thread")]
1048 #[serial]
1049 async fn test_rush_with_health_check_and_idle_expired_and_invalid() {
1050 let config = SessionConfig {
1051 session_alive_trust_duration: Duration::from_millis(10),
1052 refresh_interval: Duration::from_millis(250),
1053 session_get_timeout: Duration::from_secs(20),
1054 min_opened: 10,
1055 max_idle: 20,
1056 max_opened: 45,
1057 idle_timeout: Duration::from_millis(1),
1058 ..Default::default()
1059 };
1060 let sm = assert_rush(true, config.clone()).await;
1061 sleep(Duration::from_secs(2)).await;
1062 {
1063 let sessions = sm.session_pool.inner.read();
1064 assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
1065 assert_eq!(sessions.orphans.len(), 0);
1067 assert_eq!(sessions.waiters.len(), 0, "invalid waiters");
1068 let available_sessions = sessions.available_sessions.len();
1069 assert!(
1070 available_sessions >= config.min_opened - 1 && available_sessions <= config.max_idle,
1071 "now is {available_sessions}"
1072 );
1073 }
1074 sm.close().await;
1075 }
1076
1077 #[tokio::test(flavor = "multi_thread")]
1078 #[serial]
1079 async fn test_close() {
1080 let cm = ConnectionManager::new(
1081 4,
1082 &Environment::Emulator("localhost:9010".to_string()),
1083 "",
1084 &ConnectionOptions::default(),
1085 )
1086 .await
1087 .unwrap();
1088 let config = SessionConfig::default();
1089 let sm = SessionManager::new(DATABASE, cm, config.clone()).await.unwrap();
1090 assert_eq!(sm.num_opened(), config.min_opened);
1091 sm.close().await;
1092 assert_eq!(sm.num_opened(), 0);
1093 assert_eq!(sm.session_pool.inner.read().orphans.len(), 0);
1094 }
1095
1096 #[tokio::test(flavor = "multi_thread")]
1097 #[serial]
1098 async fn test_batch_create_sessions() {
1099 let cm = ConnectionManager::new(
1100 1,
1101 &Environment::Emulator("localhost:9010".to_string()),
1102 "",
1103 &ConnectionOptions::default(),
1104 )
1105 .await
1106 .unwrap();
1107 let client = cm.conn().with_metadata(client_metadata(DATABASE));
1108 let session_count = 125;
1109 let result = batch_create_sessions(client.clone(), DATABASE, session_count).await;
1110 match result {
1111 Ok(created) => {
1112 assert_eq!(session_count, created.len());
1113 for mut s in created {
1114 let ping_result = s
1115 .spanner_client
1116 .execute_sql(
1117 ExecuteSqlRequest {
1118 session: s.session.name.to_string(),
1119 transaction: None,
1120 sql: "SELECT 1".to_string(),
1121 params: None,
1122 param_types: Default::default(),
1123 resume_token: vec![],
1124 query_mode: 0,
1125 partition_token: vec![],
1126 seqno: 0,
1127 query_options: None,
1128 request_options: None,
1129 directed_read_options: None,
1130 data_boost_enabled: false,
1131 last_statement: false,
1132 },
1133 None,
1134 )
1135 .await;
1136 assert!(ping_result.is_ok());
1137 }
1138 }
1139 Err(err) => panic!("{err:?}"),
1140 }
1141 }
1142}