use crate::client::Spanner;
use crate::model::{CreateSessionRequest, Session};
use crate::{RequestOptions, Result};
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::time::{Instant, sleep};
/// Interval passed to the background maintenance loop: how long it sleeps
/// between two session age checks (3600 seconds, i.e. one hour).
pub(crate) const SESSION_MAINTENANCE_INTERVAL: Duration = Duration::from_secs(3600);
/// Age threshold passed to the background maintenance loop: a session whose
/// age has reached this value (seven days) is replaced with a new one.
pub(crate) const SESSION_MAINTENANCE_AGE: Duration = Duration::from_secs(7 * 24 * 3600);
/// Holds the session used for one database together with everything needed
/// to create a replacement for it.
///
/// Instances are built by `create_and_start_maintenance`, which also spawns
/// the background task that periodically checks the session's age.
#[derive(Debug)]
pub(crate) struct ManagedSessionMaintainer {
/// Client used to send the `create_session` requests.
pub(crate) spanner: Spanner,
/// The current session and the instant it was stored; overwritten in place
/// when the session is replaced.
pub(crate) session: RwLock<ManagedSession>,
/// Database name set on every `CreateSessionRequest`.
pub(crate) database_name: String,
/// Role set as the creator role of every session that is created.
pub(crate) database_role: String,
/// Request options cloned into every `create_session` call.
pub(crate) options: RequestOptions,
}
/// A session paired with the instant at which it was stored in the maintainer.
#[derive(Debug)]
pub(crate) struct ManagedSession {
/// The session itself, shared through an `Arc`.
pub(crate) session: Arc<Session>,
/// When this session was stored; its elapsed time is compared against the
/// maintenance age to decide whether the session must be replaced.
pub(crate) created_at: Instant,
}
impl ManagedSessionMaintainer {
/// Returns an owned copy of the current session's name.
///
/// # Panics
///
/// Panics if the session lock is poisoned.
pub(crate) fn session_name(&self) -> String {
    let current = self.session.read().expect("failed to read session");
    current.session.name.clone()
}
pub(crate) async fn create_and_start_maintenance(
spanner: Spanner,
database_name: String,
database_role: String,
options: RequestOptions,
) -> Result<Arc<Self>> {
let session =
Self::create_session(&spanner, &database_name, &database_role, &options).await?;
let maintainer = Arc::new(ManagedSessionMaintainer {
spanner,
session: RwLock::new(ManagedSession {
session: Arc::new(session),
created_at: Instant::now(),
}),
database_name,
database_role,
options,
});
let weak_maintainer = Arc::downgrade(&maintainer);
tokio::spawn(async move {
Self::maintenance_loop(
weak_maintainer,
SESSION_MAINTENANCE_INTERVAL,
SESSION_MAINTENANCE_AGE,
)
.await;
});
Ok(maintainer)
}
/// Replaces the current session when its age has reached `age`; otherwise
/// does nothing.
///
/// The read lock is released before any request is sent.
///
/// # Errors
///
/// Returns the error of `replace_session` when a replacement was attempted
/// and failed.
async fn check_and_replace_session(&self, age: Duration) -> Result<()> {
    let session_age = self
        .session
        .read()
        .expect("failed to read session")
        .created_at
        .elapsed();
    if session_age < age {
        return Ok(());
    }
    self.replace_session().await
}
/// Creates a new session and stores it in place of the current one.
///
/// The new session is created before the lock is taken, so readers keep
/// seeing the old session while the request is in flight, and the old
/// session stays in place if the request fails.
///
/// # Errors
///
/// Returns the error of the `create_session` call.
///
/// # Panics
///
/// Panics if the session lock is poisoned.
async fn replace_session(&self) -> Result<()> {
    let new_session = Self::create_session(
        &self.spanner,
        &self.database_name,
        &self.database_role,
        &self.options,
    )
    .await?;
    let replacement = ManagedSession {
        session: Arc::new(new_session),
        created_at: Instant::now(),
    };
    // Hold the write lock only for the swap itself: readers must not be
    // blocked while the log record below is being emitted.
    {
        let mut guard = self.session.write().expect("failed to write session");
        *guard = replacement;
    }
    tracing::info!(
        "Successfully replaced multiplexed session for {}",
        self.database_name
    );
    Ok(())
}
/// Sends a `CreateSessionRequest` for a multiplexed session on
/// `database_name`, with `database_role` as the creator role.
///
/// # Errors
///
/// Returns whatever error the client's `create_session` call reports.
async fn create_session(
    spanner: &Spanner,
    database_name: &str,
    database_role: &str,
    options: &RequestOptions,
) -> Result<Session> {
    let template = Session::new()
        .set_multiplexed(true)
        .set_creator_role(database_role);
    let request = CreateSessionRequest::new()
        .set_database(database_name)
        .set_session(template);
    spanner
        .create_session(request, options.clone(), spanner.next_channel_hint())
        .await
}
/// Runs the periodic maintenance: sleep for `interval`, then check the
/// session age, and repeat.
///
/// The loop ends the first time the maintainer can no longer be upgraded
/// from its `Weak` handle, i.e. once every strong reference is gone.
async fn maintenance_loop(
    maintainer: Weak<ManagedSessionMaintainer>,
    interval: Duration,
    age: Duration,
) {
    loop {
        sleep(interval).await;
        let Some(alive) = maintainer.upgrade() else {
            return;
        };
        // `maintain` takes the strong reference by value, so it is released
        // again before the next sleep starts.
        Self::maintain(alive, age).await;
    }
}
/// Performs one maintenance pass and logs a warning if it fails.
///
/// Errors are not propagated: the caller's loop simply tries again on its
/// next pass.
async fn maintain(maintainer: Arc<ManagedSessionMaintainer>, age: Duration) {
    let Err(error) = maintainer.check_and_replace_session(age).await else {
        return;
    };
    tracing::warn!(
        "Failed to check and replace session for {}: {}. Retrying in 1 hour.",
        maintainer.database_name,
        error
    );
}
}
#[cfg(test)]
mod tests {
use super::*;
use gaxi::grpc::tonic::Response;
use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
use google_cloud_test_macros::tokio_test_no_panics;
use spanner_grpc_mock::google::spanner::v1::Session as GrpcSession;
use spanner_grpc_mock::{MockSpanner, start};
// Checks that a session older than `SESSION_MAINTENANCE_AGE` is swapped for a
// newly created one by `check_and_replace_session`.
#[tokio_test_no_panics]
async fn session_maintenance() {
    let mut mock = MockSpanner::new();
    let mut seq = mockall::Sequence::new();
    // First call: the initial session created by `create_and_start_maintenance`.
    mock.expect_create_session()
        .once()
        .in_sequence(&mut seq)
        .returning(|_| {
            Ok(Response::new(GrpcSession {
                name:
                    "projects/test-project/instances/test-instance/databases/test-db/sessions/1"
                        .to_string(),
                multiplexed: true,
                ..Default::default()
            }))
        });
    // Second call: the replacement session.
    mock.expect_create_session()
        .once()
        .in_sequence(&mut seq)
        .returning(|_| {
            Ok(Response::new(GrpcSession {
                name:
                    "projects/test-project/instances/test-instance/databases/test-db/sessions/2"
                        .to_string(),
                multiplexed: true,
                ..Default::default()
            }))
        });
    let (address, _server) = start("0.0.0.0:0", mock)
        .await
        .expect("Failed to start mock server");
    let spanner = Spanner::builder()
        .with_endpoint(address)
        .with_credentials(Anonymous::new().build())
        .build()
        .await
        .expect("Failed to build client");
    let maintainer = ManagedSessionMaintainer::create_and_start_maintenance(
        spanner,
        "projects/test-project/instances/test-instance/databases/test-db".to_string(),
        "test-role".to_string(),
        RequestOptions::default(),
    )
    .await
    .expect("Failed to create ManagedSessionMaintainer");
    {
        let session = maintainer
            .session
            .read()
            .expect("failed to read session")
            .session
            .clone();
        assert_eq!(
            session.name,
            "projects/test-project/instances/test-instance/databases/test-db/sessions/1"
        );
    }
    // Make the session look older than the maintenance age. `Instant - Duration`
    // panics when the platform clock cannot represent the result (for example
    // on a host whose clock origin is more recent than the backdated instant),
    // so backdate with `checked_sub` and, when that is impossible, use a zero
    // age threshold instead. Either way the session counts as expired.
    let backdate = SESSION_MAINTENANCE_AGE + Duration::from_secs(3600);
    let age = match Instant::now().checked_sub(backdate) {
        Some(created_at) => {
            let mut guard = maintainer.session.write().expect("failed to write session");
            guard.created_at = created_at;
            SESSION_MAINTENANCE_AGE
        }
        None => Duration::ZERO,
    };
    maintainer
        .check_and_replace_session(age)
        .await
        .expect("Failed to check and replace session");
    {
        let session = maintainer
            .session
            .read()
            .expect("failed to read session")
            .session
            .clone();
        assert_eq!(
            session.name,
            "projects/test-project/instances/test-instance/databases/test-db/sessions/2"
        );
    }
}
// Runs a single `maintain` pass on a freshly created session; only the
// initial `create_session` call is expected by the mock.
#[tokio_test_no_panics]
async fn maintain_success() {
    const DATABASE: &str = "projects/test-project/instances/test-instance/databases/test-db";
    const SESSION_1: &str =
        "projects/test-project/instances/test-instance/databases/test-db/sessions/1";

    let mut mock = MockSpanner::new();
    mock.expect_create_session().once().returning(|_| {
        let session = GrpcSession {
            name: SESSION_1.to_string(),
            multiplexed: true,
            ..Default::default()
        };
        Ok(Response::new(session))
    });
    let (address, _server) = start("0.0.0.0:0", mock)
        .await
        .expect("Failed to start mock server");
    let credentials = Anonymous::new().build();
    let spanner = Spanner::builder()
        .with_endpoint(address)
        .with_credentials(credentials)
        .build()
        .await
        .expect("Failed to build client");
    let maintainer = ManagedSessionMaintainer::create_and_start_maintenance(
        spanner,
        DATABASE.to_string(),
        "test-role".to_string(),
        RequestOptions::default(),
    )
    .await
    .expect("Failed to create ManagedSessionMaintainer");
    let strong = Arc::downgrade(&maintainer)
        .upgrade()
        .expect("should be alive");
    ManagedSessionMaintainer::maintain(strong, SESSION_MAINTENANCE_AGE).await;
}
#[tokio_test_no_panics]
async fn maintain_dropped() {
let weak = Weak::<ManagedSessionMaintainer>::new();
assert!(weak.upgrade().is_none());
}
#[tokio_test_no_panics]
async fn check_and_replace_session_no_op() {
let mut mock = MockSpanner::new();
mock.expect_create_session().once().returning(|_| {
Ok(Response::new(GrpcSession {
name: "projects/test-project/instances/test-instance/databases/test-db/sessions/1"
.to_string(),
multiplexed: true,
..Default::default()
}))
});
let (address, _server) = start("0.0.0.0:0", mock)
.await
.expect("Failed to start mock server");
let spanner = Spanner::builder()
.with_endpoint(address)
.with_credentials(Anonymous::new().build())
.build()
.await
.expect("Failed to build client");
let maintainer = ManagedSessionMaintainer::create_and_start_maintenance(
spanner,
"projects/test-project/instances/test-instance/databases/test-db".to_string(),
"test-role".to_string(),
RequestOptions::default(),
)
.await
.expect("Failed to create ManagedSessionMaintainer");
maintainer
.check_and_replace_session(SESSION_MAINTENANCE_AGE)
.await
.expect("Failed to check and replace session");
let session = maintainer
.session
.read()
.expect("failed to read session")
.session
.clone();
assert_eq!(
session.name,
"projects/test-project/instances/test-instance/databases/test-db/sessions/1"
);
}
// Checks that the current session is kept when creating its replacement fails.
#[tokio_test_no_panics]
async fn maintain_creation_fails() {
    let mut mock = MockSpanner::new();
    let mut seq = mockall::Sequence::new();
    // First call: the initial session created by `create_and_start_maintenance`.
    mock.expect_create_session()
        .once()
        .in_sequence(&mut seq)
        .returning(|_| {
            Ok(Response::new(GrpcSession {
                name:
                    "projects/test-project/instances/test-instance/databases/test-db/sessions/1"
                        .to_string(),
                multiplexed: true,
                ..Default::default()
            }))
        });
    // Second call: the replacement attempt, which fails.
    mock.expect_create_session()
        .once()
        .in_sequence(&mut seq)
        .returning(|_| Err(gaxi::grpc::tonic::Status::internal("mock failure")));
    let (address, _server) = start("0.0.0.0:0", mock)
        .await
        .expect("Failed to start mock server");
    let spanner = Spanner::builder()
        .with_endpoint(address)
        .with_credentials(Anonymous::new().build())
        .build()
        .await
        .expect("Failed to build client");
    let maintainer = ManagedSessionMaintainer::create_and_start_maintenance(
        spanner,
        "projects/test-project/instances/test-instance/databases/test-db".to_string(),
        "test-role".to_string(),
        RequestOptions::default(),
    )
    .await
    .expect("Failed to create ManagedSessionMaintainer");
    // Make the session look older than the maintenance age. `Instant - Duration`
    // panics when the platform clock cannot represent the result (for example
    // on a host whose clock origin is more recent than the backdated instant),
    // so backdate with `checked_sub` and, when that is impossible, use a zero
    // age threshold instead. Either way the session counts as expired.
    let backdate = SESSION_MAINTENANCE_AGE + Duration::from_secs(3600);
    let age = match Instant::now().checked_sub(backdate) {
        Some(created_at) => {
            let mut guard = maintainer.session.write().expect("failed to write session");
            guard.created_at = created_at;
            SESSION_MAINTENANCE_AGE
        }
        None => Duration::ZERO,
    };
    let weak = Arc::downgrade(&maintainer);
    let m = weak.upgrade().expect("should be alive");
    ManagedSessionMaintainer::maintain(m, age).await;
    // The failed replacement must leave the original session in place.
    let session = maintainer
        .session
        .read()
        .expect("failed to read session")
        .session
        .clone();
    assert_eq!(
        session.name,
        "projects/test-project/instances/test-instance/databases/test-db/sessions/1"
    );
}
#[tokio_test_no_panics]
async fn transaction_session_consistency_across_retries() {
use crate::database_client::DatabaseClient;
use crate::transaction_retry_policy::tests::create_aborted_status;
use spanner_grpc_mock::google::spanner::v1 as mock_v1;
use spanner_grpc_mock::google::spanner::v1::result_set_stats::RowCount;
let mut mock = MockSpanner::new();
let mut seq = mockall::Sequence::new();
mock.expect_create_session()
.once()
.in_sequence(&mut seq)
.returning(|_| {
Ok(Response::new(mock_v1::Session {
name: "projects/p/instances/i/databases/d/sessions/1".to_string(),
multiplexed: true,
..Default::default()
}))
});
mock.expect_create_session()
.once()
.in_sequence(&mut seq)
.returning(|_| {
Ok(Response::new(mock_v1::Session {
name: "projects/p/instances/i/databases/d/sessions/2".to_string(),
multiplexed: true,
..Default::default()
}))
});
mock.expect_begin_transaction().times(2).returning(|req| {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/1");
Ok(Response::new(mock_v1::Transaction {
id: vec![1, 2, 3],
..Default::default()
}))
});
mock.expect_execute_sql().once().returning(|req| {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/1");
Ok(Response::new(mock_v1::ResultSet {
metadata: Some(mock_v1::ResultSetMetadata {
transaction: Some(mock_v1::Transaction {
id: vec![1, 2, 3],
..Default::default()
}),
..Default::default()
}),
stats: Some(mock_v1::ResultSetStats {
row_count: Some(RowCount::RowCountExact(1)),
..Default::default()
}),
..Default::default()
}))
});
mock.expect_commit().once().returning(|req| {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/1");
Err(create_aborted_status(std::time::Duration::from_nanos(1)))
});
mock.expect_execute_sql().once().returning(|req| {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/1");
Ok(Response::new(mock_v1::ResultSet {
metadata: Some(mock_v1::ResultSetMetadata {
transaction: Some(mock_v1::Transaction {
id: vec![1, 2, 3],
..Default::default()
}),
..Default::default()
}),
stats: Some(mock_v1::ResultSetStats {
row_count: Some(RowCount::RowCountExact(1)),
..Default::default()
}),
..Default::default()
}))
});
mock.expect_commit().once().returning(|req| {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/1");
Ok(Response::new(mock_v1::CommitResponse {
commit_timestamp: Some(prost_types::Timestamp {
seconds: 123456789,
nanos: 0,
}),
..Default::default()
}))
});
let (address, _server) = start("0.0.0.0:0", mock)
.await
.expect("Failed to start mock server");
let spanner = Spanner::builder()
.with_endpoint(address)
.with_credentials(Anonymous::new().build())
.build()
.await
.expect("Failed to build client");
let maintainer = ManagedSessionMaintainer::create_and_start_maintenance(
spanner.clone(),
"projects/p/instances/i/databases/d".to_string(),
"test-role".to_string(),
RequestOptions::default(),
)
.await
.expect("Failed to create ManagedSessionMaintainer");
let db_client = DatabaseClient {
spanner,
session_maintainer: maintainer.clone(),
leader_aware_routing_enabled: true,
};
let runner = db_client
.read_write_transaction()
.build()
.await
.expect("Failed to build runner");
maintainer
.replace_session()
.await
.expect("Failed to replace session");
assert_eq!(
maintainer.session_name(),
"projects/p/instances/i/databases/d/sessions/2"
);
let result = runner
.run(
|tx: crate::read_write_transaction::ReadWriteTransaction| async move {
let count = tx.execute_update("UPDATE Users SET active = true").await?;
assert_eq!(count, 1);
Ok(())
},
)
.await;
result.expect("Transaction failed");
}
}