1pub mod conn_pool;
2pub mod spanner_client;
3
4#[cfg(test)]
5mod tests {
6 use prost_types::{value::Kind, ListValue, Value};
7 use serial_test::serial;
8
9 use crate::apiv1::conn_pool::ConnectionManager;
10 use crate::apiv1::spanner_client::Client;
11 use crate::session::client_metadata;
12 use google_cloud_gax::conn::{ConnectionOptions, Environment};
13 use google_cloud_gax::grpc::Code;
14 use google_cloud_googleapis::spanner::v1::mutation::{Operation, Write};
15 use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
16 use google_cloud_googleapis::spanner::v1::{
17 commit_request, transaction_options, transaction_selector, BatchCreateSessionsRequest, BeginTransactionRequest,
18 CommitRequest, CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteSqlRequest,
19 GetSessionRequest, ListSessionsRequest, PartitionQueryRequest, PartitionReadRequest, ReadRequest,
20 RequestOptions, RollbackRequest, Session, Transaction, TransactionOptions, TransactionSelector,
21 };
22 use google_cloud_googleapis::spanner::v1::{execute_batch_dml_request, KeySet, Mutation};
23
// Fully-qualified database resource name used by the helpers and tests in this module.
const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
25
26 async fn create_spanner_client() -> Client {
27 let cm = ConnectionManager::new(
28 1,
29 &Environment::Emulator("localhost:9010".to_string()),
30 "",
31 &ConnectionOptions::default(),
32 )
33 .await
34 .unwrap();
35 cm.conn().with_metadata(client_metadata(DATABASE))
36 }
37
38 async fn create_session(client: &mut Client) -> Session {
39 let session_request = CreateSessionRequest {
40 database: DATABASE.to_string(),
41 session: None,
42 };
43 let session_response = client.create_session(session_request, true, None).await.unwrap();
44 session_response.into_inner()
45 }
46
47 async fn begin_read_only_transaction(client: &mut Client, session: &Session) -> Transaction {
48 let request = BeginTransactionRequest {
49 session: session.name.to_string(),
50 options: Option::from(TransactionOptions {
51 exclude_txn_from_change_streams: false,
52 mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
53 return_read_timestamp: false,
54 timestamp_bound: None,
55 })),
56 isolation_level: IsolationLevel::Unspecified as i32,
57 }),
58 request_options: None,
59 mutation_key: None,
60 };
61 client
62 .begin_transaction(request, true, None)
63 .await
64 .unwrap()
65 .into_inner()
66 }
67
68 async fn begin_read_write_transaction(client: &mut Client, session: &Session) -> Transaction {
69 let request = BeginTransactionRequest {
70 session: session.name.to_string(),
71 options: Some(TransactionOptions {
72 exclude_txn_from_change_streams: false,
73 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
74 isolation_level: IsolationLevel::Unspecified as i32,
75 }),
76 request_options: None,
77 mutation_key: None,
78 };
79 client
80 .begin_transaction(request, true, None)
81 .await
82 .unwrap()
83 .into_inner()
84 }
85
86 #[tokio::test]
87 #[serial]
88 async fn test_create_session() {
89 let mut client = create_spanner_client().await;
90 let request = CreateSessionRequest {
91 database: DATABASE.to_string(),
92 session: None,
93 };
94
95 match client.create_session(request, true, None).await {
96 Ok(res) => {
97 println!("created session = {}", res.get_ref().name);
98 assert!(!res.get_ref().name.is_empty());
99 }
100 Err(err) => panic!("err: {err:?}"),
101 };
102 }
103
104 #[tokio::test]
105 #[serial]
106 async fn test_batch_create_session() {
107 let mut client = create_spanner_client().await;
108 let request = BatchCreateSessionsRequest {
109 database: DATABASE.to_string(),
110 session_count: 2,
111 session_template: None,
112 };
113
114 match client.batch_create_sessions(request, true, None).await {
115 Ok(res) => {
116 assert_eq!(
117 res.get_ref().session.len(),
118 2,
119 "created session size = {}",
120 res.get_ref().session.len()
121 );
122 }
123 Err(err) => panic!("err: {err:?}"),
124 };
125 }
126
127 #[tokio::test]
128 #[serial]
129 async fn test_get_session() {
130 let mut client = create_spanner_client().await;
131 let session = create_session(&mut client).await;
132 let request = GetSessionRequest {
133 name: session.name.to_string(),
134 };
135
136 match client.get_session(request, true, None).await {
137 Ok(res) => {
138 assert_eq!(res.get_ref().name, session.name.to_string());
139 }
140 Err(err) => panic!("err: {err:?}"),
141 };
142 }
143
144 #[tokio::test]
145 #[serial]
146 async fn test_list_sessions() {
147 let mut client = create_spanner_client().await;
148 let request = ListSessionsRequest {
149 database: DATABASE.to_string(),
150 page_size: 10,
151 page_token: "".to_string(),
152 filter: "".to_string(),
153 };
154
155 match client.list_sessions(request, true, None).await {
156 Ok(res) => {
157 println!("list session size = {}", res.get_ref().sessions.len());
158 }
159 Err(err) => panic!("err: {err:?}"),
160 };
161 }
162
163 #[tokio::test]
164 #[serial]
165 async fn test_delete_session() {
166 let mut client = create_spanner_client().await;
167
168 let batch_request = BatchCreateSessionsRequest {
170 database: DATABASE.to_string(),
171 session_count: 2,
172 session_template: None,
173 };
174 let session_response = client.batch_create_sessions(batch_request, true, None).await.unwrap();
175 let sessions = &session_response.get_ref().session;
176
177 for session in sessions.iter() {
179 let request = DeleteSessionRequest {
180 name: session.name.to_string(),
181 };
182
183 match client.delete_session(request, true, None).await {
184 Ok(_) => {}
185 Err(err) => panic!("err: {err:?}"),
186 };
187 }
188 }
189
190 #[tokio::test]
191 #[serial]
192 async fn test_execute_sql() {
193 let mut client = create_spanner_client().await;
194 let session = create_session(&mut client).await;
195 let request = ExecuteSqlRequest {
196 session: session.name.to_string(),
197 transaction: None,
198 sql: "SELECT 1".to_string(),
199 params: None,
200 param_types: Default::default(),
201 resume_token: vec![],
202 query_mode: 0,
203 partition_token: vec![],
204 seqno: 0,
205 query_options: None,
206 request_options: None,
207 directed_read_options: None,
208 data_boost_enabled: false,
209 last_statement: false,
210 };
211 match client.execute_sql(request, true, None).await {
212 Ok(res) => {
213 assert_eq!(1, res.into_inner().rows.len());
214 }
215 Err(err) => panic!("err: {err:?}"),
216 };
217 }
218
219 #[tokio::test]
220 #[serial]
221 async fn test_execute_streaming_sql() {
222 let mut client = create_spanner_client().await;
223 let session = create_session(&mut client).await;
224 let mut request = ExecuteSqlRequest {
225 session: session.name.to_string(),
226 transaction: None,
227 sql: "select 1".to_string(),
228 params: None,
229 param_types: Default::default(),
230 resume_token: vec![],
231 query_mode: 0,
232 partition_token: vec![],
233 seqno: 0,
234 query_options: None,
235 request_options: None,
236 directed_read_options: None,
237 data_boost_enabled: false,
238 last_statement: false,
239 };
240
241 let resume_token = match client.execute_streaming_sql(request.clone(), true, None).await {
242 Ok(res) => {
243 let mut result = res.into_inner();
244 if let Some(next_message) = result.message().await.unwrap() {
245 Some(next_message.resume_token)
246 } else {
247 None
248 }
249 }
250 Err(err) => panic!("err: {err:?}"),
251 };
252 assert!(resume_token.is_some());
253 println!("resume token = {:?}", resume_token.clone().unwrap());
254 request.resume_token = resume_token.unwrap();
255
256 match client.execute_streaming_sql(request, true, None).await {
257 Ok(res) => {
258 let mut result = res.into_inner();
259 assert!(!result.message().await.unwrap().unwrap().values.is_empty())
260 }
261 Err(err) => panic!("err: {err:?}"),
262 }
263 }
264
265 #[tokio::test]
266 #[serial]
267 async fn test_begin_transaction() {
268 let mut client = create_spanner_client().await;
269 let session = create_session(&mut client).await;
270 let request = BeginTransactionRequest {
271 session: session.name.to_string(),
272 options: Option::from(TransactionOptions {
273 exclude_txn_from_change_streams: false,
274 mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
275 return_read_timestamp: false,
276 timestamp_bound: None,
277 })),
278 isolation_level: IsolationLevel::Unspecified as i32,
279 }),
280 request_options: None,
281 mutation_key: None,
282 };
283
284 match client.begin_transaction(request, true, None).await {
285 Ok(res) => {
286 let tx_id = res.into_inner().id;
287 println!("tx id is {tx_id:?}");
288 assert!(!tx_id.is_empty());
289 }
290 Err(err) => panic!("err: {err:?}"),
291 };
292 }
293
294 #[tokio::test]
295 #[serial]
296 async fn test_execute_batch_dml() {
297 let mut client = create_spanner_client().await;
298 let session = create_session(&mut client).await;
299 let tx = begin_read_write_transaction(&mut client, &session).await;
300 let request = ExecuteBatchDmlRequest {
301 session: session.name.to_string(),
302 transaction: Option::from(TransactionSelector {
303 selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
304 }),
305 statements: vec![
306 execute_batch_dml_request::Statement {
307 sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
308 .to_string(),
309 params: None,
310 param_types: Default::default(),
311 },
312 execute_batch_dml_request::Statement {
313 sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('2', 'u2', CURRENT_TIMESTAMP())"
314 .to_string(),
315 params: None,
316 param_types: Default::default(),
317 },
318 ],
319 seqno: 0,
320 request_options: None,
321 last_statements: false,
322 };
323
324 let result = client.execute_batch_dml(request, true, None).await;
325 client
326 .rollback(
327 RollbackRequest {
328 session: session.name.to_string(),
329 transaction_id: tx.id,
330 },
331 true,
332 None,
333 )
334 .await
335 .unwrap();
336 match result {
337 Ok(res) => {
338 let status = res.into_inner().status.unwrap();
339 assert_eq!(Code::Ok, Code::from(status.code), "gRPC success but error found : {status:?}");
340 }
341 Err(err) => panic!("err: {err:?}"),
342 };
343 }
344
345 #[tokio::test]
346 #[serial]
347 async fn test_execute_batch_dml_error_as_tonic_check() {
348 let mut client = create_spanner_client().await;
349 let session = create_session(&mut client).await;
350 let tx = begin_read_write_transaction(&mut client, &session).await;
351 let request = ExecuteBatchDmlRequest {
352 session: session.name.to_string(),
353 transaction: Option::from(TransactionSelector {
354 selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
355 }),
356 statements: vec![execute_batch_dml_request::Statement {
357 sql: "INSERT INTO GuildX (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
358 .to_string(),
359 params: None,
360 param_types: Default::default(),
361 }],
362 seqno: 0,
363 request_options: None,
364 last_statements: false,
365 };
366
367 let result = client.execute_batch_dml(request, true, None).await;
368 client
369 .rollback(
370 RollbackRequest {
371 session: session.name.to_string(),
372 transaction_id: tx.id,
373 },
374 true,
375 None,
376 )
377 .await
378 .unwrap();
379 match result {
380 Ok(res) => panic!("must be error code = {:?}", res.into_inner().status.unwrap().code),
381 Err(status) => {
382 assert_eq!(
383 Code::InvalidArgument,
384 status.code(),
385 "gRPC success but error found : {status:?}"
386 );
387 }
388 };
389 }
390
391 #[tokio::test]
392 #[serial]
393 async fn test_read() {
394 let mut client = create_spanner_client().await;
395 let session = create_session(&mut client).await;
396 let request = ReadRequest {
397 session: session.name.to_string(),
398 transaction: None,
399 table: "Guild".to_string(),
400 index: "".to_string(),
401 columns: vec!["GuildId".to_string()],
402 key_set: Some(KeySet {
403 keys: vec![],
404 ranges: vec![],
405 all: true,
406 }),
407 resume_token: vec![],
408 partition_token: vec![],
409 request_options: None,
410 limit: 0,
411 data_boost_enabled: false,
412 order_by: 0,
413 directed_read_options: None,
414 lock_hint: 0,
415 };
416
417 match client.read(request, true, None).await {
418 Ok(res) => {
419 println!("row size = {:?}", res.into_inner().rows.len());
420 }
421 Err(err) => panic!("err: {err:?}"),
422 };
423 }
424
425 #[tokio::test]
426 #[serial]
427 async fn test_streaming_read() {
428 let mut client = create_spanner_client().await;
429 let session = create_session(&mut client).await;
430 let request = ReadRequest {
431 session: session.name.to_string(),
432 transaction: None,
433 table: "User".to_string(),
434 index: "".to_string(),
435 columns: vec!["UserId".to_string()],
436 key_set: Some(KeySet {
437 keys: vec![],
438 ranges: vec![],
439 all: true,
440 }),
441 resume_token: vec![],
442 partition_token: vec![],
443 request_options: None,
444 limit: 0,
445 data_boost_enabled: false,
446 order_by: 0,
447 directed_read_options: None,
448 lock_hint: 0,
449 };
450
451 match client.streaming_read(request, true, None).await {
452 Ok(res) => match res.into_inner().message().await {
453 Ok(..) => {}
454 Err(err) => panic!("err: {err:?}"),
455 },
456 Err(err) => panic!("err: {err:?}"),
457 };
458 }
459
460 #[tokio::test]
461 #[serial]
462 async fn test_commit() {
463 let mut client = create_spanner_client().await;
464 let session = create_session(&mut client).await;
465 let tx = begin_read_write_transaction(&mut client, &session).await;
466 let request = CommitRequest {
467 session: session.name.to_string(),
468 mutations: vec![Mutation {
469 operation: Some(Operation::InsertOrUpdate(Write {
470 table: "Guild".to_string(),
471 columns: vec![
472 "GuildId".to_string(),
473 "OwnerUserId".to_string(),
474 "UpdatedAt".to_string(),
475 ],
476 values: vec![ListValue {
477 values: vec![
478 Value {
479 kind: Some(Kind::StringValue("g1".to_string())),
480 },
481 Value {
482 kind: Some(Kind::StringValue("u1".to_string())),
483 },
484 Value {
485 kind: Some(Kind::StringValue("spanner.commit_timestamp()".to_string())),
486 },
487 ],
488 }],
489 })),
490 }],
491 transaction: Option::from(commit_request::Transaction::TransactionId(tx.id)),
492 request_options: Option::from(RequestOptions {
493 priority: 10,
494 request_tag: "".to_string(),
495 transaction_tag: "".to_string(),
496 }),
497 return_commit_stats: false,
498 max_commit_delay: None,
499 precommit_token: None,
500 };
501
502 match client.commit(request, true, None).await {
503 Ok(res) => {
504 assert!(res.into_inner().commit_timestamp.is_some());
505 }
506 Err(err) => panic!("err: {err:?}"),
507 };
508 }
509
510 #[tokio::test]
511 #[serial]
512 async fn test_rollback() {
513 let mut client = create_spanner_client().await;
514 let session = create_session(&mut client).await;
515 let tx = begin_read_write_transaction(&mut client, &session).await;
516 let request = RollbackRequest {
517 session: session.name.to_string(),
518 transaction_id: tx.id,
519 };
520
521 match client.rollback(request, true, None).await {
522 Ok(_) => {}
523 Err(err) => panic!("err: {err:?}"),
524 };
525 }
526
527 #[tokio::test]
528 #[serial]
529 async fn test_partition_query() {
530 let mut client = create_spanner_client().await;
531 let session = create_session(&mut client).await;
532 let tx = begin_read_only_transaction(&mut client, &session).await;
533 let request = PartitionQueryRequest {
534 session: session.name.to_string(),
535 transaction: Option::from(TransactionSelector {
536 selector: Option::from(transaction_selector::Selector::Id(tx.id)),
537 }),
538 sql: "SELECT * FROM User".to_string(),
539 params: None,
540 param_types: Default::default(),
541 partition_options: None,
542 };
543
544 match client.partition_query(request, true, None).await {
545 Ok(res) => {
546 println!("partition count {:?}", res.into_inner().partitions.len());
547 assert_eq!(true, true);
548 }
549 Err(err) => {
550 println!("error code = {0}, {1}", err.code(), err.message());
551 assert_eq!(false, true)
552 }
553 };
554 }
555
556 #[tokio::test]
557 #[serial]
558 async fn test_partition_read() {
559 let mut client = create_spanner_client().await;
560 let session = create_session(&mut client).await;
561 let tx = begin_read_only_transaction(&mut client, &session).await;
562 let request = PartitionReadRequest {
563 session: session.name.to_string(),
564 transaction: Option::from(TransactionSelector {
565 selector: Option::from(transaction_selector::Selector::Id(tx.id)),
566 }),
567 table: "User".to_string(),
568 index: "".to_string(),
569 columns: vec![],
570 partition_options: None,
571 key_set: None,
572 };
573
574 match client.partition_read(request, true, None).await {
575 Ok(res) => {
576 println!("partition count {:?}", res.into_inner().partitions.len());
577 assert_eq!(true, true);
578 }
579 Err(err) => {
580 println!("error code = {0}, {1}", err.code(), err.message());
581 assert_eq!(false, true)
582 }
583 };
584 }
585}