gcloud_spanner/
session.rs

1use std::collections::VecDeque;
2use std::mem;
3use std::ops::{Deref, DerefMut};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use parking_lot::{Mutex, RwLock};
8use thiserror;
9use tokio::select;
10use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
11use tokio::sync::{mpsc, oneshot};
12use tokio::task::{JoinHandle, JoinSet};
13use tokio::time::{sleep, timeout};
14use tokio_util::sync::CancellationToken;
15
16use google_cloud_gax::grpc::metadata::MetadataMap;
17use google_cloud_gax::grpc::{Code, Status};
18use google_cloud_gax::retry::TryAs;
19use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSessionRequest, Session};
20
21use crate::apiv1::conn_pool::ConnectionManager;
22use crate::apiv1::spanner_client::{ping_query_request, Client};
23
/// A Spanner session bundled with the client used to reach it and the
/// bookkeeping the pool keeps about it.
pub struct SessionHandle {
    /// The session object handed back by the session-creation call.
    pub session: Session,
    /// Client used for requests on this session, including its deletion.
    pub spanner_client: Client,
    /// Cleared when the session is deleted or expired; invalid sessions are
    /// not put back on the available list.
    valid: bool,
    /// Set once the delete request for this session has succeeded.
    deleted: bool,
    /// Refreshed each time the pool hands the session to a caller.
    last_used_at: Instant,
    /// Marker of the health-check pass that last visited this session.
    last_checked_at: Instant,
    /// Marker of the last health-check pass whose ping succeeded.
    last_pong_at: Instant,
    /// Timestamp supplied when the handle was constructed.
    created_at: Instant,
}
35
36impl SessionHandle {
37    pub(crate) fn new(session: Session, spanner_client: Client, now: Instant) -> SessionHandle {
38        SessionHandle {
39            session,
40            spanner_client,
41            valid: true,
42            deleted: false,
43            last_used_at: now,
44            last_checked_at: now,
45            last_pong_at: now,
46            created_at: now,
47        }
48    }
49
50    pub async fn invalidate_if_needed<T>(&mut self, arg: Result<T, Status>) -> Result<T, Status> {
51        match arg {
52            Ok(s) => Ok(s),
53            Err(e) => {
54                if e.code() == Code::NotFound && e.message().contains("Session not found:") {
55                    tracing::debug!("session invalidate {}", self.session.name);
56                    self.delete().await;
57                }
58                Err(e)
59            }
60        }
61    }
62
63    async fn delete(&mut self) {
64        self.valid = false;
65        let session_name = &self.session.name;
66        let request = DeleteSessionRequest {
67            name: session_name.to_string(),
68        };
69        match self.spanner_client.delete_session(request, None).await {
70            Ok(_) => self.deleted = true,
71            Err(e) => tracing::error!("failed to delete session {}, {:?}", session_name, e),
72        };
73    }
74}
75
/// A session checked out of the pool.
///
/// Dereferences to the underlying [`SessionHandle`]; dropping it hands the
/// session back to the pool it came from.
pub struct ManagedSession {
    /// Pool the session is returned to on drop.
    session_pool: SessionPool,
    /// The checked-out handle; taken out of the `Option` when dropped.
    session: Option<SessionHandle>,
}
81
82impl ManagedSession {
83    fn new(session_pool: SessionPool, session: SessionHandle) -> Self {
84        ManagedSession {
85            session_pool,
86            session: Some(session),
87        }
88    }
89}
90
91impl Drop for ManagedSession {
92    fn drop(&mut self) {
93        let session = self.session.take().unwrap();
94        self.session_pool.recycle(session);
95    }
96}
97
98impl Deref for ManagedSession {
99    type Target = SessionHandle;
100
101    fn deref(&self) -> &Self::Target {
102        self.session.as_ref().unwrap()
103    }
104}
105
106impl DerefMut for ManagedSession {
107    fn deref_mut(&mut self) -> &mut Self::Target {
108        self.session.as_mut().unwrap()
109    }
110}
111
/// Mutable state of the pool: the idle sessions, the waiting list and the
/// counters. Keeping them in one struct lets a single lock cover the waiting
/// list and the free sessions atomically.
struct Sessions {
    /// Idle sessions, handed out from the front and returned to the back.
    available_sessions: VecDeque<SessionHandle>,

    /// Callers waiting for a session; each is notified through its sender.
    /// May contain senders whose receiver has already gone away.
    waiters: VecDeque<oneshot::Sender<()>>,

    /// Invalid sessions that have not been deleted on the server yet.
    orphans: Vec<SessionHandle>,

    /// Number of sessions currently handed out.
    num_inuse: usize,

    /// Number of sessions scheduled to be created but not yet delivered.
    num_creating: usize,
}
128
129impl Sessions {
130    fn num_opened(&self) -> usize {
131        self.num_inuse + self.available_sessions.len()
132    }
133
134    fn take_waiter(&mut self) -> Option<oneshot::Sender<()>> {
135        while let Some(waiter) = self.waiters.pop_front() {
136            // Waiter can be closed when session acquisition times out.
137            if !waiter.is_closed() {
138                return Some(waiter);
139            }
140        }
141        None
142    }
143
144    fn take(&mut self) -> Option<SessionHandle> {
145        match self.available_sessions.pop_front() {
146            None => None,
147            Some(s) => {
148                self.num_inuse += 1;
149                Some(s)
150            }
151        }
152    }
153
154    fn release(&mut self, session: SessionHandle) {
155        self.num_inuse -= 1;
156        if session.valid {
157            self.available_sessions.push_back(session);
158        } else if !session.deleted {
159            tracing::trace!("save as orphan name={}", session.session.name);
160            self.orphans.push(session);
161        }
162    }
163
164    /// reserve calculates next session count to create.
165    /// Must call replenish after calling this method.
166    fn reserve(&mut self, max_opened: usize, inc_step: usize) -> usize {
167        let num_opened = self.num_opened();
168        let num_creating = self.num_creating;
169        if max_opened < num_creating + num_opened {
170            tracing::trace!(
171                "No available connections max={}, num_creating={}, current={}",
172                max_opened,
173                num_creating,
174                num_opened
175            );
176            return 0;
177        }
178        let mut increasing = max_opened - (num_creating + num_opened);
179        if increasing > inc_step {
180            increasing = inc_step
181        }
182        self.num_creating += increasing;
183        increasing
184    }
185
186    fn replenish(&mut self, session_count: usize, result: Result<Vec<SessionHandle>, Status>) {
187        self.num_creating -= session_count;
188        match result {
189            Ok(mut new_sessions) => {
190                while let Some(session) = new_sessions.pop() {
191                    self.available_sessions.push_back(session);
192                    if let Some(waiter) = self.take_waiter() {
193                        let _ = waiter.send(());
194                    }
195                }
196            }
197            Err(e) => tracing::error!("failed to create new sessions {:?}", e),
198        }
199    }
200}
201
/// Cheaply clonable handle to the shared pool state.
#[derive(Clone)]
struct SessionPool {
    /// Idle sessions, waiters and counters behind one lock.
    inner: Arc<RwLock<Sessions>>,
    /// Channel on which the number of sessions to create is sent.
    session_creation_sender: UnboundedSender<usize>,
    /// Pool limits and timeouts.
    config: Arc<SessionConfig>,
}
208
209impl SessionPool {
210    async fn new(
211        database: String,
212        conn_pool: &ConnectionManager,
213        session_creation_sender: UnboundedSender<usize>,
214        config: Arc<SessionConfig>,
215    ) -> Result<Self, Status> {
216        let available_sessions = Self::init_pool(database, conn_pool, config.min_opened).await?;
217        Ok(SessionPool {
218            inner: Arc::new(RwLock::new(Sessions {
219                available_sessions,
220                waiters: VecDeque::new(),
221                orphans: Vec::new(),
222                num_inuse: 0,
223                num_creating: 0,
224            })),
225            session_creation_sender,
226            config,
227        })
228    }
229
230    async fn init_pool(
231        database: String,
232        conn_pool: &ConnectionManager,
233        min_opened: usize,
234    ) -> Result<VecDeque<SessionHandle>, Status> {
235        let channel_num = conn_pool.num();
236        let creation_count_per_channel = min_opened / channel_num;
237        let remainder = min_opened % channel_num;
238
239        let mut sessions = Vec::<SessionHandle>::new();
240        let mut tasks = JoinSet::new();
241        for _ in 0..channel_num {
242            // Ensure that we create the exact number of requested sessions by adding the remainder to the first channel.
243            let creation_count = if channel_num == 0 {
244                creation_count_per_channel + remainder
245            } else {
246                creation_count_per_channel
247            };
248            let next_client = conn_pool.conn().with_metadata(client_metadata(&database));
249            let database = database.clone();
250            tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count).await });
251        }
252        while let Some(r) = tasks.join_next().await {
253            let new_sessions = r.map_err(|e| Status::from_error(e.into()))??;
254            sessions.extend(new_sessions);
255        }
256        tracing::debug!("initial session created count = {}", sessions.len());
257        Ok(sessions.into())
258    }
259
260    fn num_opened(&self) -> usize {
261        self.inner.read().num_opened()
262    }
263
264    /// The client first checks the waiting list.
265    /// If the waiting list is empty, it retrieves the first available session.
266    /// If there are no available sessions, it enters the waiting list.
267    /// If the waiting list is not empty, the client enters the waiting list.
268    /// The client on the waiting list will be notified when another client's session has finished and
269    /// when the process of replenishing the available sessions is complete.
270    async fn acquire(&self) -> Result<ManagedSession, SessionError> {
271        loop {
272            let (on_session_acquired, session_count) = {
273                let mut sessions = self.inner.write();
274
275                // Prioritize waiters over new acquirers.
276                if sessions.waiters.is_empty() {
277                    if let Some(mut s) = sessions.take() {
278                        s.last_used_at = Instant::now();
279                        return Ok(ManagedSession::new(self.clone(), s));
280                    }
281                }
282                // Add the participant to the waiting list.
283                let (sender, receiver) = oneshot::channel();
284                sessions.waiters.push_back(sender);
285                let session_count = sessions.reserve(self.config.max_opened, self.config.inc_step);
286                (receiver, session_count)
287            };
288
289            if session_count > 0 {
290                let _ = self.session_creation_sender.send(session_count);
291            }
292
293            // Wait for the session available notification.
294            match timeout(self.config.session_get_timeout, on_session_acquired).await {
295                Ok(Ok(())) => {
296                    let mut sessions = self.inner.write();
297                    if let Some(mut s) = sessions.take() {
298                        s.last_used_at = Instant::now();
299                        return Ok(ManagedSession::new(self.clone(), s));
300                    } else {
301                        continue; // another waiter raced for session
302                    }
303                }
304                _ => {
305                    {
306                        let sessions = self.inner.write();
307                        tracing::info!(
308                            available = sessions.available_sessions.len(),
309                            waiters = sessions.waiters.len(),
310                            orphans = sessions.orphans.len(),
311                            num_inuse = sessions.num_inuse,
312                            num_creating = sessions.num_creating,
313                            max_opened = self.config.max_opened,
314                            "Timeout acquiring session"
315                        );
316                    }
317                    return Err(SessionError::SessionGetTimeout);
318                }
319            }
320        }
321    }
322
323    /// If the session is valid
324    ///  - Pass the session to the first user on the waiting list.
325    ///  - If there is no waiting list, the session is returned to the list of available sessions.
326    ///    If the session is invalid
327    ///  - Discard the session. If the number of sessions falls below the threshold as a result of discarding, the session replenishment process is called.
328    fn recycle(&self, mut session: SessionHandle) {
329        if session.valid {
330            let mut sessions = self.inner.write();
331            let waiter = sessions.take_waiter();
332            if sessions.num_opened() > self.config.max_idle
333                && session.created_at + self.config.idle_timeout < Instant::now()
334                && waiter.is_none()
335            {
336                // Not reuse expired idle session
337                session.valid = false
338            }
339            sessions.release(session);
340            if let Some(waiter) = waiter {
341                let _ = waiter.send(());
342            }
343        } else {
344            let session_count = {
345                let mut sessions = self.inner.write();
346                sessions.release(session);
347                if sessions.num_opened() < self.config.min_opened && !sessions.waiters.is_empty() {
348                    sessions.reserve(self.config.max_opened, self.config.inc_step)
349                } else {
350                    0
351                }
352            };
353            if session_count > 0 {
354                let _ = self.session_creation_sender.send(session_count);
355            }
356        }
357    }
358
359    async fn close(&self) {
360        let empty = VecDeque::new();
361        let deleting_sessions = { mem::replace(&mut self.inner.write().available_sessions, empty) };
362        for mut session in deleting_sessions {
363            session.delete().await;
364        }
365
366        self.remove_orphans().await;
367    }
368
369    async fn remove_orphans(&self) {
370        let empty = vec![];
371        let deleting_sessions = { mem::replace(&mut self.inner.write().orphans, empty) };
372        tracing::trace!("remove {} orphan sessions", deleting_sessions.len());
373        for mut session in deleting_sessions {
374            session.delete().await;
375        }
376    }
377}
378
/// Limits and timings of the session pool.
#[derive(Clone, Debug)]
pub struct SessionConfig {
    /// max_opened is the maximum number of opened sessions allowed by the session
    /// pool, counting sessions that are still being created. If a caller asks
    /// for a session and none is available, it waits until one becomes
    /// available or `session_get_timeout` elapses.
    pub max_opened: usize,

    /// min_opened is the number of sessions opened when the pool is created.
    /// If a session is found to be broken, it will still be evicted from the
    /// session pool, therefore it is possible that the number of opened
    /// sessions drops below min_opened; replacements are then requested while
    /// callers are waiting.
    pub min_opened: usize,

    /// max_idle is the number of opened sessions above which a returned
    /// session may be discarded instead of being kept (see `idle_timeout`).
    pub max_idle: usize,

    /// idle_timeout is the age after which a returned session is discarded.
    /// A session is only discarded when more than `max_idle` sessions are
    /// opened and no caller is waiting for a session.
    pub idle_timeout: Duration,

    /// session_alive_trust_duration is how long a session is trusted to be
    /// alive after it was last handed out or last answered a health-check
    /// ping; trusted sessions are not pinged by the health check.
    pub session_alive_trust_duration: Duration,

    /// session_get_timeout is the maximum value of the waiting time that occurs when retrieving from the connection pool when there is no idle session.
    pub session_get_timeout: Duration,

    /// refresh_interval is the interval of cleanup and health check functions.
    pub refresh_interval: Duration,

    /// inc_step is the maximum number of sessions to create in one batch when
    /// at least one more session is needed.
    inc_step: usize,
}
415
416impl Default for SessionConfig {
417    fn default() -> Self {
418        SessionConfig {
419            max_opened: 400,
420            min_opened: 10,
421            max_idle: 300,
422            inc_step: 25,
423            idle_timeout: Duration::from_secs(30 * 60),
424            session_alive_trust_duration: Duration::from_secs(55 * 60),
425            session_get_timeout: Duration::from_secs(1),
426            refresh_interval: Duration::from_secs(5 * 60),
427        }
428    }
429}
430
/// Errors returned when obtaining a session from the pool.
#[derive(thiserror::Error, Debug)]
pub enum SessionError {
    /// No session became available within the configured acquisition timeout.
    #[error("session get time out")]
    SessionGetTimeout,
    /// A session could not be created.
    #[error("failed to create session")]
    FailedToCreateSession,
    /// A gRPC status, displayed transparently.
    #[error(transparent)]
    GRPC(#[from] Status),
}
440
441impl TryAs<Status> for SessionError {
442    fn try_as(&self) -> Option<&Status> {
443        match self {
444            SessionError::GRPC(e) => Some(e),
445            _ => None,
446        }
447    }
448}
449
/// Owns the session pool together with its background tasks.
pub(crate) struct SessionManager {
    /// Pool sessions are acquired from.
    session_pool: SessionPool,
    /// Cancelled on close to stop the background tasks.
    cancel: CancellationToken,
    /// Handles of the health-check and session-creation tasks, awaited on close.
    tasks: Mutex<Vec<JoinHandle<()>>>,
}
455
456impl SessionManager {
457    pub async fn new(
458        database: impl Into<String>,
459        conn_pool: ConnectionManager,
460        config: SessionConfig,
461    ) -> Result<Arc<SessionManager>, Status> {
462        let database = database.into();
463        let (sender, receiver) = mpsc::unbounded_channel();
464        let session_pool = SessionPool::new(database.clone(), &conn_pool, sender, Arc::new(config.clone())).await?;
465
466        let cancel = CancellationToken::new();
467        let task_session_cleaner = Self::spawn_health_check_task(config, session_pool.clone(), cancel.clone());
468        let task_session_creator =
469            Self::spawn_session_creation_task(session_pool.clone(), database, conn_pool, receiver, cancel.clone());
470
471        let sm = SessionManager {
472            session_pool,
473            cancel,
474            tasks: Mutex::new(vec![task_session_cleaner, task_session_creator]),
475        };
476        Ok(Arc::new(sm))
477    }
478
479    pub fn num_opened(&self) -> usize {
480        self.session_pool.num_opened()
481    }
482
483    pub async fn get(&self) -> Result<ManagedSession, SessionError> {
484        self.session_pool.acquire().await
485    }
486
487    pub async fn close(&self) {
488        if self.cancel.is_cancelled() {
489            return;
490        }
491        self.cancel.cancel();
492        let tasks = { mem::take(&mut *self.tasks.lock()) };
493        for task in tasks {
494            let _ = task.await;
495        }
496        self.session_pool.close().await;
497    }
498
    /// Spawns the task that turns session-count requests received on `rx`
    /// into batch creation calls and feeds the outcome back into the pool.
    /// The task runs until `cancel` is cancelled.
    fn spawn_session_creation_task(
        session_pool: SessionPool,
        database: String,
        conn_pool: ConnectionManager,
        mut rx: UnboundedReceiver<usize>,
        cancel: CancellationToken,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            // In-flight creation requests; each yields (requested count, result).
            let mut tasks = JoinSet::default();
            loop {
                select! {
                    // Poll in declaration order: cancellation, finished creations, new requests.
                    biased;
                    _ = cancel.cancelled() => break,
                    // NOTE(review): a join error does not match this pattern, so `replenish`
                    // is not called for that request - confirm `num_creating` cannot leak.
                    Some(Ok((session_count, result))) = tasks.join_next(), if !tasks.is_empty() => {
                        session_pool.inner.write().replenish(session_count, result);
                    }
                    session_count = rx.recv() => match session_count {
                        Some(session_count) => {
                            let client = conn_pool.conn().with_metadata(client_metadata(&database));
                            let database = database.clone();
                            tasks.spawn(async move { (session_count, batch_create_sessions(client, &database, session_count).await) });
                        },
                        // Channel closed: keep looping until cancelled.
                        None => continue
                    },
                }
            }
            tracing::trace!("shutdown session creation task.");
        })
    }
528
    /// Spawns the task that, every `config.refresh_interval`, deletes orphan
    /// sessions and then runs a health check over the pool. The first run
    /// happens one interval after this call; the task stops when `cancel` is
    /// cancelled.
    fn spawn_health_check_task(
        config: SessionConfig,
        session_pool: SessionPool,
        cancel: CancellationToken,
    ) -> JoinHandle<()> {
        let start = Instant::now() + config.refresh_interval;
        let mut interval = tokio::time::interval_at(start.into(), config.refresh_interval);

        tokio::spawn(async move {
            loop {
                select! {
                    _ = interval.tick() => {},
                    _ = cancel.cancelled() => break
                }
                let now = Instant::now();

                // remove orphans first
                session_pool.remove_orphans().await;

                // start health check
                // NOTE(review): the extra nanosecond presumably keeps the pass marker distinct
                // from the timestamp of a session created at exactly `now` - confirm.
                health_check(
                    now + Duration::from_nanos(1),
                    config.session_alive_trust_duration,
                    &session_pool,
                    cancel.clone(),
                )
                .await;
            }
            tracing::trace!("shutdown health check task.")
        })
    }
560}
561
/// Runs one health-check pass over the idle sessions of `sessions`.
///
/// `now` doubles as the marker of this pass: every visited session gets
/// `last_checked_at = now`, and the pass ends when a session carrying that
/// marker comes back to the front of the queue, when the queue is empty, or
/// when `cancel` is cancelled. Sessions used or successfully pinged within
/// `session_alive_trust_duration` are skipped; the others are pinged and, on
/// failure, deleted.
async fn health_check(
    now: Instant,
    session_alive_trust_duration: Duration,
    sessions: &SessionPool,
    cancel: CancellationToken,
) {
    tracing::trace!("start health check");
    let start = Instant::now();
    let sleep_duration = Duration::from_millis(10);
    loop {
        // Pause before every session; stop immediately on cancellation.
        select! {
            _ = sleep(sleep_duration) => {},
            _ = cancel.cancelled() => break
        }
        let mut s = {
            // temporary take
            let mut locked = sessions.inner.write();
            match locked.take() {
                Some(mut s) => {
                    // all the session check complete.
                    if s.last_checked_at == now {
                        locked.release(s);
                        break;
                    }
                    // Recently used or pinged: mark as visited and skip the ping.
                    if std::cmp::max(s.last_used_at, s.last_pong_at) + session_alive_trust_duration >= now {
                        s.last_checked_at = now;
                        locked.release(s);
                        continue;
                    }
                    s
                }
                None => break,
            }
        };

        // The lock is released here; the ping runs without holding it.
        let request = ping_query_request(s.session.name.clone());
        match s.spanner_client.execute_sql(request, None).await {
            Ok(_) => {
                s.last_checked_at = now;
                s.last_pong_at = now;
                sessions.recycle(s);
            }
            Err(_) => {
                // Any ping failure evicts the session.
                s.delete().await;
                sessions.recycle(s);
            }
        }
    }
    tracing::trace!("end health check elapsed={}msec", start.elapsed().as_millis());
}
612
613async fn batch_create_sessions(
614    spanner_client: Client,
615    database: &str,
616    mut remaining_create_count: usize,
617) -> Result<Vec<SessionHandle>, Status> {
618    let mut created = Vec::with_capacity(remaining_create_count);
619    while remaining_create_count > 0 {
620        let sessions = batch_create_session(spanner_client.clone(), database, remaining_create_count).await?;
621        // Spanner could return less sessions than requested.
622        // In that case, we should do another call using the same gRPC channel.
623        let actually_created = sessions.len();
624        remaining_create_count -= actually_created;
625        created.extend(sessions);
626    }
627    Ok(created)
628}
629
630async fn batch_create_session(
631    mut spanner_client: Client,
632    database: &str,
633    session_count: usize,
634) -> Result<Vec<SessionHandle>, Status> {
635    let request = BatchCreateSessionsRequest {
636        database: database.to_string(),
637        session_template: None,
638        session_count: session_count as i32,
639    };
640
641    tracing::debug!("spawn session creation request : session_count = {}", session_count);
642    let response = spanner_client.batch_create_sessions(request, None).await?.into_inner();
643
644    let now = Instant::now();
645    Ok(response
646        .session
647        .into_iter()
648        .map(|s| SessionHandle::new(s, spanner_client.clone(), now))
649        .collect::<Vec<SessionHandle>>())
650}
651
652pub(crate) fn client_metadata(database: &str) -> MetadataMap {
653    let mut metadata = MetadataMap::new();
654    metadata.insert("google-cloud-resource-prefix", database.parse().unwrap());
655    metadata
656}
657
658#[cfg(test)]
659mod tests {
660    use std::sync::atomic::{AtomicI64, Ordering};
661    use std::sync::Arc;
662    use std::time::{Duration, Instant};
663
664    use parking_lot::RwLock;
665    use serial_test::serial;
666    use tokio::time::sleep;
667    use tokio_util::sync::CancellationToken;
668
669    use google_cloud_gax::conn::{ConnectionOptions, Environment};
670    use google_cloud_googleapis::spanner::v1::ExecuteSqlRequest;
671
672    use crate::apiv1::conn_pool::ConnectionManager;
673    use crate::session::{
674        batch_create_sessions, client_metadata, health_check, SessionConfig, SessionError, SessionManager,
675    };
676
677    pub const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
678
679    #[ctor::ctor]
680    fn init() {
681        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
682            .add_directive("google_cloud_spanner=trace".parse().unwrap());
683        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
684    }
685
686    async fn assert_rush(use_invalidate: bool, config: SessionConfig) -> Arc<SessionManager> {
687        let cm = ConnectionManager::new(
688            4,
689            &Environment::Emulator("localhost:9010".to_string()),
690            "",
691            &ConnectionOptions::default(),
692        )
693        .await
694        .unwrap();
695        let sm = SessionManager::new(DATABASE, cm, config).await.unwrap();
696
697        let counter = Arc::new(AtomicI64::new(0));
698        let mut spawns = Vec::with_capacity(100);
699        for _ in 0..100 {
700            let sm = sm.clone();
701            let counter = Arc::clone(&counter);
702            spawns.push(tokio::spawn(async move {
703                let mut session = sm.get().await.unwrap();
704                if use_invalidate {
705                    session.delete().await;
706                }
707                counter.fetch_add(1, Ordering::SeqCst);
708                sleep(Duration::from_millis(300)).await;
709            }));
710        }
711        for handler in spawns {
712            let _ = handler.await;
713        }
714        sm
715    }
716
717    #[tokio::test(flavor = "multi_thread")]
718    #[serial]
719    async fn test_health_check_checked() {
720        let cm = ConnectionManager::new(
721            4,
722            &Environment::Emulator("localhost:9010".to_string()),
723            "",
724            &ConnectionOptions::default(),
725        )
726        .await
727        .unwrap();
728        let session_alive_trust_duration = Duration::from_millis(10);
729        let config = SessionConfig {
730            min_opened: 5,
731            session_alive_trust_duration,
732            max_opened: 5,
733            ..Default::default()
734        };
735        let sm = std::sync::Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap());
736        sleep(Duration::from_secs(1)).await;
737
738        let cancel = CancellationToken::new();
739        health_check(Instant::now(), session_alive_trust_duration, &sm.session_pool, cancel.clone()).await;
740
741        assert_eq!(sm.num_opened(), 5);
742        tokio::time::sleep(Duration::from_millis(500)).await;
743        cancel.cancel();
744    }
745
746    #[tokio::test(flavor = "multi_thread")]
747    #[serial]
748    async fn test_health_check_not_checked() {
749        let cm = ConnectionManager::new(
750            4,
751            &Environment::Emulator("localhost:9010".to_string()),
752            "",
753            &ConnectionOptions::default(),
754        )
755        .await
756        .unwrap();
757        let session_alive_trust_duration = Duration::from_secs(10);
758        let config = SessionConfig {
759            min_opened: 5,
760            session_alive_trust_duration,
761            max_opened: 5,
762            ..Default::default()
763        };
764        let sm = Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap());
765        sleep(Duration::from_secs(1)).await;
766
767        let cancel = CancellationToken::new();
768        health_check(Instant::now(), session_alive_trust_duration, &sm.session_pool, cancel.clone()).await;
769
770        assert_eq!(sm.num_opened(), 5);
771        sleep(Duration::from_millis(500)).await;
772        cancel.cancel();
773    }
774
775    #[tokio::test(flavor = "multi_thread")]
776    #[serial]
777    async fn test_increase_session_and_idle_session_expired() {
778        let conn_pool = ConnectionManager::new(
779            4,
780            &Environment::Emulator("localhost:9010".to_string()),
781            "",
782            &ConnectionOptions::default(),
783        )
784        .await
785        .unwrap();
786        let config = SessionConfig {
787            idle_timeout: Duration::from_millis(10),
788            min_opened: 10,
789            max_idle: 20,
790            max_opened: 45,
791            ..Default::default()
792        };
793        let sm = SessionManager::new(DATABASE, conn_pool, config).await.unwrap();
794        {
795            let mut sessions = Vec::new();
796            for _ in 0..45 {
797                sessions.push(sm.get().await.unwrap());
798            }
799
800            // all the session are using
801            assert_eq!(sm.num_opened(), 45);
802            assert_eq!(sm.session_pool.inner.read().num_inuse, 45, "all the session are using");
803            sleep(Duration::from_secs(1)).await;
804        }
805
806        // idle session removed after drop
807        let sessions = sm.session_pool.inner.read();
808        assert_eq!(sessions.num_inuse, 0, "invalid num_inuse");
809        assert_eq!(sessions.available_sessions.len(), 20, "invalid available sessions");
810        assert_eq!(sessions.num_opened(), 20, "invalid num open");
811        assert_eq!(sessions.waiters.len(), 0, "session waiters is 0");
812    }
813
814    #[tokio::test(flavor = "multi_thread")]
815    #[serial]
816    async fn test_too_many_session_timeout() {
817        let conn_pool = ConnectionManager::new(
818            4,
819            &Environment::Emulator("localhost:9010".to_string()),
820            "",
821            &ConnectionOptions::default(),
822        )
823        .await
824        .unwrap();
825        let config = SessionConfig {
826            idle_timeout: Duration::from_millis(10),
827            min_opened: 10,
828            max_idle: 20,
829            max_opened: 45,
830            session_get_timeout: Duration::from_secs(1),
831            ..Default::default()
832        };
833        let sm = Arc::new(SessionManager::new(DATABASE, conn_pool, config.clone()).await.unwrap());
834        let mu = Arc::new(RwLock::new(Vec::new()));
835        let mut awaiters = Vec::with_capacity(100);
836        for _ in 0..100 {
837            let sm = sm.clone();
838            let mu = mu.clone();
839            awaiters.push(tokio::spawn(async move {
840                let session = sm.get().await;
841                mu.write().push(session);
842                0
843            }))
844        }
845        for handler in awaiters {
846            let _ = handler.await;
847        }
848        let sessions = mu.read();
849        for i in 0..sessions.len() - 1 {
850            let session = &sessions[i];
851            if i >= config.max_opened {
852                assert!(session.is_err(), "must err {i}");
853                match session.as_ref().err().unwrap() {
854                    SessionError::SessionGetTimeout => {}
855                    _ => {
856                        panic!("must be session timeout error")
857                    }
858                }
859            } else {
860                assert!(session.is_ok(), "must ok {i}");
861            }
862        }
863        let pool = sm.session_pool.inner.read();
864        assert_eq!(pool.num_opened(), config.max_opened);
865        assert_eq!(pool.waiters.len(), 100 - config.max_opened); //include timeout sessions
866    }
867
868    #[tokio::test(flavor = "multi_thread")]
869    #[serial]
870    async fn test_rush_invalidate() {
871        let config = SessionConfig {
872            session_get_timeout: Duration::from_secs(20),
873            min_opened: 10,
874            max_idle: 20,
875            max_opened: 45,
876            ..Default::default()
877        };
878        let sm = assert_rush(true, config.clone()).await;
879        {
880            let sessions = sm.session_pool.inner.read();
881            let available_sessions = sessions.available_sessions.len();
882            assert_eq!(sessions.num_inuse, 0);
883            assert_eq!(sessions.waiters.len(), 0);
884            assert_eq!(sessions.orphans.len(), 0);
885            assert!(
886                available_sessions <= config.max_opened && available_sessions >= config.min_opened,
887                "now is {available_sessions}"
888            );
889        }
890        sm.close().await;
891    }
892
893    #[tokio::test(flavor = "multi_thread")]
894    #[serial]
895    async fn test_rush() {
896        let config = SessionConfig {
897            min_opened: 10,
898            max_idle: 20,
899            max_opened: 45,
900            ..Default::default()
901        };
902        let sm = assert_rush(false, config.clone()).await;
903        {
904            let sessions = sm.session_pool.inner.read();
905            let available_sessions = sessions.available_sessions.len();
906            assert_eq!(sessions.num_inuse, 0);
907            assert_eq!(sessions.waiters.len(), 0);
908            assert_eq!(sessions.orphans.len(), 0);
909            assert!(
910                available_sessions <= config.max_opened && available_sessions >= config.min_opened,
911                "now is {available_sessions}"
912            );
913        }
914        sm.close().await;
915    }
916
917    #[tokio::test(flavor = "multi_thread")]
918    #[serial]
919    async fn test_rush_with_invalidate() {
920        let config = SessionConfig {
921            min_opened: 10,
922            max_idle: 20,
923            max_opened: 45,
924            ..Default::default()
925        };
926        let sm = assert_rush(true, config.clone()).await;
927        {
928            let sessions = sm.session_pool.inner.read();
929            let available_sessions = sessions.available_sessions.len();
930            assert_eq!(sessions.num_inuse, 0);
931            assert_eq!(sessions.waiters.len(), 0);
932            assert_eq!(sessions.orphans.len(), 0);
933            assert!(
934                available_sessions <= config.max_opened && available_sessions >= config.min_opened,
935                "now is {available_sessions}"
936            );
937        }
938        sm.close().await;
939    }
940
941    #[tokio::test(flavor = "multi_thread")]
942    #[serial]
943    async fn test_rush_with_health_check() {
944        let config = SessionConfig {
945            session_alive_trust_duration: Duration::from_millis(10),
946            refresh_interval: Duration::from_millis(250),
947            session_get_timeout: Duration::from_secs(20),
948            min_opened: 10,
949            max_idle: 20,
950            max_opened: 45,
951            ..Default::default()
952        };
953        let sm = assert_rush(false, config.clone()).await;
954        sleep(Duration::from_secs(2)).await;
955        {
956            let sessions = sm.session_pool.inner.read();
957            let available_sessions = sessions.available_sessions.len();
958            assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
959            assert_eq!(sessions.waiters.len(), 0);
960            assert_eq!(sessions.orphans.len(), 0);
961            assert!(
962                available_sessions <= config.max_opened && available_sessions >= config.max_idle - 1,
963                "now is {available_sessions}"
964            );
965        }
966        sm.close().await;
967    }
968
969    #[tokio::test(flavor = "multi_thread")]
970    #[serial]
971    async fn test_rush_with_health_check_and_invalidate() {
972        let config = SessionConfig {
973            session_alive_trust_duration: Duration::from_millis(10),
974            refresh_interval: Duration::from_millis(250),
975            session_get_timeout: Duration::from_secs(20),
976            min_opened: 10,
977            max_idle: 20,
978            max_opened: 45,
979            ..Default::default()
980        };
981        let sm = assert_rush(true, config.clone()).await;
982        sleep(Duration::from_secs(2)).await;
983        {
984            let sessions = sm.session_pool.inner.read();
985            let available_sessions = sessions.available_sessions.len();
986            assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
987            assert_eq!(sessions.waiters.len(), 0);
988            assert_eq!(sessions.orphans.len(), 0);
989            assert!(
990                available_sessions <= config.max_opened && available_sessions >= config.min_opened - 1,
991                "now is {available_sessions}"
992            );
993        }
994        sm.close().await;
995    }
996
997    #[tokio::test(flavor = "multi_thread")]
998    #[serial]
999    async fn test_rush_with_idle_expired() {
1000        let config = SessionConfig {
1001            min_opened: 10,
1002            max_idle: 20,
1003            max_opened: 45,
1004            idle_timeout: Duration::from_millis(1),
1005            ..Default::default()
1006        };
1007        let sm = assert_rush(false, config.clone()).await;
1008        {
1009            let sessions = sm.session_pool.inner.read();
1010            assert_eq!(sessions.num_inuse, 0);
1011            assert_eq!(sessions.waiters.len(), 0);
1012            assert_eq!(sessions.orphans.len(), config.max_opened - config.max_idle);
1013            assert_eq!(sessions.available_sessions.len(), config.max_idle);
1014        }
1015        sm.close().await;
1016    }
1017
1018    #[tokio::test(flavor = "multi_thread")]
1019    #[serial]
1020    async fn test_rush_with_health_check_and_idle_expired() {
1021        let config = SessionConfig {
1022            session_alive_trust_duration: Duration::from_millis(10),
1023            refresh_interval: Duration::from_millis(250),
1024            session_get_timeout: Duration::from_secs(20),
1025            min_opened: 10,
1026            max_idle: 20,
1027            max_opened: 45,
1028            idle_timeout: Duration::from_millis(1),
1029            ..Default::default()
1030        };
1031        let sm = assert_rush(false, config.clone()).await;
1032        sleep(Duration::from_secs(1)).await;
1033        {
1034            let sessions = sm.session_pool.inner.read();
1035            assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
1036            assert_eq!(sessions.waiters.len(), 0);
1037            assert_eq!(sessions.orphans.len(), 0);
1038            let available_sessions = sessions.available_sessions.len();
1039            assert!(
1040                available_sessions >= config.min_opened - 1 && available_sessions <= config.max_idle,
1041                "now is {available_sessions}"
1042            );
1043        }
1044        sm.close().await;
1045    }
1046
1047    #[tokio::test(flavor = "multi_thread")]
1048    #[serial]
1049    async fn test_rush_with_health_check_and_idle_expired_and_invalid() {
1050        let config = SessionConfig {
1051            session_alive_trust_duration: Duration::from_millis(10),
1052            refresh_interval: Duration::from_millis(250),
1053            session_get_timeout: Duration::from_secs(20),
1054            min_opened: 10,
1055            max_idle: 20,
1056            max_opened: 45,
1057            idle_timeout: Duration::from_millis(1),
1058            ..Default::default()
1059        };
1060        let sm = assert_rush(true, config.clone()).await;
1061        sleep(Duration::from_secs(2)).await;
1062        {
1063            let sessions = sm.session_pool.inner.read();
1064            assert!(sessions.num_inuse <= 1, "num_inuse is {}", sessions.num_inuse);
1065            // health checker removes orphans
1066            assert_eq!(sessions.orphans.len(), 0);
1067            assert_eq!(sessions.waiters.len(), 0, "invalid waiters");
1068            let available_sessions = sessions.available_sessions.len();
1069            assert!(
1070                available_sessions >= config.min_opened - 1 && available_sessions <= config.max_idle,
1071                "now is {available_sessions}"
1072            );
1073        }
1074        sm.close().await;
1075    }
1076
1077    #[tokio::test(flavor = "multi_thread")]
1078    #[serial]
1079    async fn test_close() {
1080        let cm = ConnectionManager::new(
1081            4,
1082            &Environment::Emulator("localhost:9010".to_string()),
1083            "",
1084            &ConnectionOptions::default(),
1085        )
1086        .await
1087        .unwrap();
1088        let config = SessionConfig::default();
1089        let sm = SessionManager::new(DATABASE, cm, config.clone()).await.unwrap();
1090        assert_eq!(sm.num_opened(), config.min_opened);
1091        sm.close().await;
1092        assert_eq!(sm.num_opened(), 0);
1093        assert_eq!(sm.session_pool.inner.read().orphans.len(), 0);
1094    }
1095
1096    #[tokio::test(flavor = "multi_thread")]
1097    #[serial]
1098    async fn test_batch_create_sessions() {
1099        let cm = ConnectionManager::new(
1100            1,
1101            &Environment::Emulator("localhost:9010".to_string()),
1102            "",
1103            &ConnectionOptions::default(),
1104        )
1105        .await
1106        .unwrap();
1107        let client = cm.conn().with_metadata(client_metadata(DATABASE));
1108        let session_count = 125;
1109        let result = batch_create_sessions(client.clone(), DATABASE, session_count).await;
1110        match result {
1111            Ok(created) => {
1112                assert_eq!(session_count, created.len());
1113                for mut s in created {
1114                    let ping_result = s
1115                        .spanner_client
1116                        .execute_sql(
1117                            ExecuteSqlRequest {
1118                                session: s.session.name.to_string(),
1119                                transaction: None,
1120                                sql: "SELECT 1".to_string(),
1121                                params: None,
1122                                param_types: Default::default(),
1123                                resume_token: vec![],
1124                                query_mode: 0,
1125                                partition_token: vec![],
1126                                seqno: 0,
1127                                query_options: None,
1128                                request_options: None,
1129                                directed_read_options: None,
1130                                data_boost_enabled: false,
1131                                last_statement: false,
1132                            },
1133                            None,
1134                        )
1135                        .await;
1136                    assert!(ping_result.is_ok());
1137                }
1138            }
1139            Err(err) => panic!("{err:?}"),
1140        }
1141    }
1142}