1pub mod conn_pool;
2pub mod spanner_client;
3
4#[cfg(test)]
5mod tests {
6 use prost_types::{value::Kind, ListValue, Value};
7 use serial_test::serial;
8
9 use google_cloud_gax::conn::{ConnectionOptions, Environment};
10 use google_cloud_gax::grpc::Code;
11 use google_cloud_googleapis::spanner::v1::mutation::{Operation, Write};
12 use google_cloud_googleapis::spanner::v1::{
13 commit_request, transaction_options, transaction_selector, BatchCreateSessionsRequest, BeginTransactionRequest,
14 CommitRequest, CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteSqlRequest,
15 GetSessionRequest, ListSessionsRequest, PartitionQueryRequest, PartitionReadRequest, ReadRequest,
16 RequestOptions, RollbackRequest, Session, Transaction, TransactionOptions, TransactionSelector,
17 };
18 use google_cloud_googleapis::spanner::v1::{execute_batch_dml_request, KeySet, Mutation};
19
20 use crate::apiv1::conn_pool::ConnectionManager;
21 use crate::apiv1::spanner_client::Client;
22 use crate::session::client_metadata;
23
    /// Fully-qualified database path shared by the helpers and tests in this module.
    const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
25
26 async fn create_spanner_client() -> Client {
27 let cm = ConnectionManager::new(
28 1,
29 &Environment::Emulator("localhost:9010".to_string()),
30 "",
31 &ConnectionOptions::default(),
32 )
33 .await
34 .unwrap();
35 cm.conn().with_metadata(client_metadata(DATABASE))
36 }
37
38 async fn create_session(client: &mut Client) -> Session {
39 let session_request = CreateSessionRequest {
40 database: DATABASE.to_string(),
41 session: None,
42 };
43 let session_response = client.create_session(session_request, None).await.unwrap();
44 session_response.into_inner()
45 }
46
47 async fn begin_read_only_transaction(client: &mut Client, session: &Session) -> Transaction {
48 let request = BeginTransactionRequest {
49 session: session.name.to_string(),
50 options: Option::from(TransactionOptions {
51 exclude_txn_from_change_streams: false,
52 mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
53 return_read_timestamp: false,
54 timestamp_bound: None,
55 })),
56 }),
57 request_options: None,
58 };
59 client.begin_transaction(request, None).await.unwrap().into_inner()
60 }
61
62 async fn begin_read_write_transaction(client: &mut Client, session: &Session) -> Transaction {
63 let request = BeginTransactionRequest {
64 session: session.name.to_string(),
65 options: Some(TransactionOptions {
66 exclude_txn_from_change_streams: false,
67 mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
68 }),
69 request_options: None,
70 };
71 client.begin_transaction(request, None).await.unwrap().into_inner()
72 }
73
74 #[tokio::test]
75 #[serial]
76 async fn test_create_session() {
77 let mut client = create_spanner_client().await;
78 let request = CreateSessionRequest {
79 database: DATABASE.to_string(),
80 session: None,
81 };
82
83 match client.create_session(request, None).await {
84 Ok(res) => {
85 println!("created session = {}", res.get_ref().name);
86 assert!(!res.get_ref().name.is_empty());
87 }
88 Err(err) => panic!("err: {err:?}"),
89 };
90 }
91
92 #[tokio::test]
93 #[serial]
94 async fn test_batch_create_session() {
95 let mut client = create_spanner_client().await;
96 let request = BatchCreateSessionsRequest {
97 database: DATABASE.to_string(),
98 session_count: 2,
99 session_template: None,
100 };
101
102 match client.batch_create_sessions(request, None).await {
103 Ok(res) => {
104 assert_eq!(
105 res.get_ref().session.len(),
106 2,
107 "created session size = {}",
108 res.get_ref().session.len()
109 );
110 }
111 Err(err) => panic!("err: {err:?}"),
112 };
113 }
114
115 #[tokio::test]
116 #[serial]
117 async fn test_get_session() {
118 let mut client = create_spanner_client().await;
119 let session = create_session(&mut client).await;
120 let request = GetSessionRequest {
121 name: session.name.to_string(),
122 };
123
124 match client.get_session(request, None).await {
125 Ok(res) => {
126 assert_eq!(res.get_ref().name, session.name.to_string());
127 }
128 Err(err) => panic!("err: {err:?}"),
129 };
130 }
131
132 #[tokio::test]
133 #[serial]
134 async fn test_list_sessions() {
135 let mut client = create_spanner_client().await;
136 let request = ListSessionsRequest {
137 database: DATABASE.to_string(),
138 page_size: 10,
139 page_token: "".to_string(),
140 filter: "".to_string(),
141 };
142
143 match client.list_sessions(request, None).await {
144 Ok(res) => {
145 println!("list session size = {}", res.get_ref().sessions.len());
146 }
147 Err(err) => panic!("err: {err:?}"),
148 };
149 }
150
151 #[tokio::test]
152 #[serial]
153 async fn test_delete_session() {
154 let mut client = create_spanner_client().await;
155
156 let batch_request = BatchCreateSessionsRequest {
158 database: DATABASE.to_string(),
159 session_count: 2,
160 session_template: None,
161 };
162 let session_response = client.batch_create_sessions(batch_request, None).await.unwrap();
163 let sessions = &session_response.get_ref().session;
164
165 for session in sessions.iter() {
167 let request = DeleteSessionRequest {
168 name: session.name.to_string(),
169 };
170
171 match client.delete_session(request, None).await {
172 Ok(_) => {}
173 Err(err) => panic!("err: {err:?}"),
174 };
175 }
176 }
177
178 #[tokio::test]
179 #[serial]
180 async fn test_execute_sql() {
181 let mut client = create_spanner_client().await;
182 let session = create_session(&mut client).await;
183 let request = ExecuteSqlRequest {
184 session: session.name.to_string(),
185 transaction: None,
186 sql: "SELECT 1".to_string(),
187 params: None,
188 param_types: Default::default(),
189 resume_token: vec![],
190 query_mode: 0,
191 partition_token: vec![],
192 seqno: 0,
193 query_options: None,
194 request_options: None,
195 directed_read_options: None,
196 data_boost_enabled: false,
197 };
198 match client.execute_sql(request, None).await {
199 Ok(res) => {
200 assert_eq!(1, res.into_inner().rows.len());
201 }
202 Err(err) => panic!("err: {err:?}"),
203 };
204 }
205
206 #[tokio::test]
207 #[serial]
208 async fn test_execute_streaming_sql() {
209 let mut client = create_spanner_client().await;
210 let session = create_session(&mut client).await;
211 let mut request = ExecuteSqlRequest {
212 session: session.name.to_string(),
213 transaction: None,
214 sql: "select 1".to_string(),
215 params: None,
216 param_types: Default::default(),
217 resume_token: vec![],
218 query_mode: 0,
219 partition_token: vec![],
220 seqno: 0,
221 query_options: None,
222 request_options: None,
223 directed_read_options: None,
224 data_boost_enabled: false,
225 };
226
227 let resume_token = match client.execute_streaming_sql(request.clone(), None).await {
228 Ok(res) => {
229 let mut result = res.into_inner();
230 if let Some(next_message) = result.message().await.unwrap() {
231 Some(next_message.resume_token)
232 } else {
233 None
234 }
235 }
236 Err(err) => panic!("err: {err:?}"),
237 };
238 assert!(resume_token.is_some());
239 println!("resume token = {:?}", resume_token.clone().unwrap());
240 request.resume_token = resume_token.unwrap();
241
242 match client.execute_streaming_sql(request, None).await {
243 Ok(res) => {
244 let mut result = res.into_inner();
245 assert!(!result.message().await.unwrap().unwrap().values.is_empty())
246 }
247 Err(err) => panic!("err: {err:?}"),
248 }
249 }
250
251 #[tokio::test]
252 #[serial]
253 async fn test_begin_transaction() {
254 let mut client = create_spanner_client().await;
255 let session = create_session(&mut client).await;
256 let request = BeginTransactionRequest {
257 session: session.name.to_string(),
258 options: Option::from(TransactionOptions {
259 exclude_txn_from_change_streams: false,
260 mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
261 return_read_timestamp: false,
262 timestamp_bound: None,
263 })),
264 }),
265 request_options: None,
266 };
267
268 match client.begin_transaction(request, None).await {
269 Ok(res) => {
270 let tx_id = res.into_inner().id;
271 println!("tx id is {tx_id:?}");
272 assert!(!tx_id.is_empty());
273 }
274 Err(err) => panic!("err: {err:?}"),
275 };
276 }
277
278 #[tokio::test]
279 #[serial]
280 async fn test_execute_batch_dml() {
281 let mut client = create_spanner_client().await;
282 let session = create_session(&mut client).await;
283 let tx = begin_read_write_transaction(&mut client, &session).await;
284 let request = ExecuteBatchDmlRequest {
285 session: session.name.to_string(),
286 transaction: Option::from(TransactionSelector {
287 selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
288 }),
289 statements: vec![
290 execute_batch_dml_request::Statement {
291 sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
292 .to_string(),
293 params: None,
294 param_types: Default::default(),
295 },
296 execute_batch_dml_request::Statement {
297 sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('2', 'u2', CURRENT_TIMESTAMP())"
298 .to_string(),
299 params: None,
300 param_types: Default::default(),
301 },
302 ],
303 seqno: 0,
304 request_options: None,
305 };
306
307 let result = client.execute_batch_dml(request, None).await;
308 client
309 .rollback(
310 RollbackRequest {
311 session: session.name.to_string(),
312 transaction_id: tx.id,
313 },
314 None,
315 )
316 .await
317 .unwrap();
318 match result {
319 Ok(res) => {
320 let status = res.into_inner().status.unwrap();
321 assert_eq!(Code::Ok, Code::from(status.code), "gRPC success but error found : {status:?}");
322 }
323 Err(err) => panic!("err: {err:?}"),
324 };
325 }
326
327 #[tokio::test]
328 #[serial]
329 async fn test_execute_batch_dml_error_as_tonic_check() {
330 let mut client = create_spanner_client().await;
331 let session = create_session(&mut client).await;
332 let tx = begin_read_write_transaction(&mut client, &session).await;
333 let request = ExecuteBatchDmlRequest {
334 session: session.name.to_string(),
335 transaction: Option::from(TransactionSelector {
336 selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
337 }),
338 statements: vec![execute_batch_dml_request::Statement {
339 sql: "INSERT INTO GuildX (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
340 .to_string(),
341 params: None,
342 param_types: Default::default(),
343 }],
344 seqno: 0,
345 request_options: None,
346 };
347
348 let result = client.execute_batch_dml(request, None).await;
349 client
350 .rollback(
351 RollbackRequest {
352 session: session.name.to_string(),
353 transaction_id: tx.id,
354 },
355 None,
356 )
357 .await
358 .unwrap();
359 match result {
360 Ok(res) => panic!("must be error code = {:?}", res.into_inner().status.unwrap().code),
361 Err(status) => {
362 assert_eq!(
363 Code::InvalidArgument,
364 status.code(),
365 "gRPC success but error found : {status:?}"
366 );
367 }
368 };
369 }
370
371 #[tokio::test]
372 #[serial]
373 async fn test_read() {
374 let mut client = create_spanner_client().await;
375 let session = create_session(&mut client).await;
376 let request = ReadRequest {
377 session: session.name.to_string(),
378 transaction: None,
379 table: "Guild".to_string(),
380 index: "".to_string(),
381 columns: vec!["GuildId".to_string()],
382 key_set: Some(KeySet {
383 keys: vec![],
384 ranges: vec![],
385 all: true,
386 }),
387 resume_token: vec![],
388 partition_token: vec![],
389 request_options: None,
390 limit: 0,
391 data_boost_enabled: false,
392 order_by: 0,
393 directed_read_options: None,
394 lock_hint: 0,
395 };
396
397 match client.read(request, None).await {
398 Ok(res) => {
399 println!("row size = {:?}", res.into_inner().rows.len());
400 }
401 Err(err) => panic!("err: {err:?}"),
402 };
403 }
404
405 #[tokio::test]
406 #[serial]
407 async fn test_streaming_read() {
408 let mut client = create_spanner_client().await;
409 let session = create_session(&mut client).await;
410 let request = ReadRequest {
411 session: session.name.to_string(),
412 transaction: None,
413 table: "User".to_string(),
414 index: "".to_string(),
415 columns: vec!["UserId".to_string()],
416 key_set: Some(KeySet {
417 keys: vec![],
418 ranges: vec![],
419 all: true,
420 }),
421 resume_token: vec![],
422 partition_token: vec![],
423 request_options: None,
424 limit: 0,
425 data_boost_enabled: false,
426 order_by: 0,
427 directed_read_options: None,
428 lock_hint: 0,
429 };
430
431 match client.streaming_read(request, None).await {
432 Ok(res) => match res.into_inner().message().await {
433 Ok(..) => {}
434 Err(err) => panic!("err: {err:?}"),
435 },
436 Err(err) => panic!("err: {err:?}"),
437 };
438 }
439
440 #[tokio::test]
441 #[serial]
442 async fn test_commit() {
443 let mut client = create_spanner_client().await;
444 let session = create_session(&mut client).await;
445 let tx = begin_read_write_transaction(&mut client, &session).await;
446 let request = CommitRequest {
447 session: session.name.to_string(),
448 mutations: vec![Mutation {
449 operation: Some(Operation::InsertOrUpdate(Write {
450 table: "Guild".to_string(),
451 columns: vec![
452 "GuildId".to_string(),
453 "OwnerUserId".to_string(),
454 "UpdatedAt".to_string(),
455 ],
456 values: vec![ListValue {
457 values: vec![
458 Value {
459 kind: Some(Kind::StringValue("g1".to_string())),
460 },
461 Value {
462 kind: Some(Kind::StringValue("u1".to_string())),
463 },
464 Value {
465 kind: Some(Kind::StringValue("spanner.commit_timestamp()".to_string())),
466 },
467 ],
468 }],
469 })),
470 }],
471 transaction: Option::from(commit_request::Transaction::TransactionId(tx.id)),
472 request_options: Option::from(RequestOptions {
473 priority: 10,
474 request_tag: "".to_string(),
475 transaction_tag: "".to_string(),
476 }),
477 return_commit_stats: false,
478 max_commit_delay: None,
479 };
480
481 match client.commit(request, None).await {
482 Ok(res) => {
483 assert!(res.into_inner().commit_timestamp.is_some());
484 }
485 Err(err) => panic!("err: {err:?}"),
486 };
487 }
488
489 #[tokio::test]
490 #[serial]
491 async fn test_rollback() {
492 let mut client = create_spanner_client().await;
493 let session = create_session(&mut client).await;
494 let tx = begin_read_write_transaction(&mut client, &session).await;
495 let request = RollbackRequest {
496 session: session.name.to_string(),
497 transaction_id: tx.id,
498 };
499
500 match client.rollback(request, None).await {
501 Ok(_) => {}
502 Err(err) => panic!("err: {err:?}"),
503 };
504 }
505
506 #[tokio::test]
507 #[serial]
508 async fn test_partition_query() {
509 let mut client = create_spanner_client().await;
510 let session = create_session(&mut client).await;
511 let tx = begin_read_only_transaction(&mut client, &session).await;
512 let request = PartitionQueryRequest {
513 session: session.name.to_string(),
514 transaction: Option::from(TransactionSelector {
515 selector: Option::from(transaction_selector::Selector::Id(tx.id)),
516 }),
517 sql: "SELECT * FROM User".to_string(),
518 params: None,
519 param_types: Default::default(),
520 partition_options: None,
521 };
522
523 match client.partition_query(request, None).await {
524 Ok(res) => {
525 println!("partition count {:?}", res.into_inner().partitions.len());
526 assert_eq!(true, true);
527 }
528 Err(err) => {
529 println!("error code = {0}, {1}", err.code(), err.message());
530 assert_eq!(false, true)
531 }
532 };
533 }
534
535 #[tokio::test]
536 #[serial]
537 async fn test_partition_read() {
538 let mut client = create_spanner_client().await;
539 let session = create_session(&mut client).await;
540 let tx = begin_read_only_transaction(&mut client, &session).await;
541 let request = PartitionReadRequest {
542 session: session.name.to_string(),
543 transaction: Option::from(TransactionSelector {
544 selector: Option::from(transaction_selector::Selector::Id(tx.id)),
545 }),
546 table: "User".to_string(),
547 index: "".to_string(),
548 columns: vec![],
549 partition_options: None,
550 key_set: None,
551 };
552
553 match client.partition_read(request, None).await {
554 Ok(res) => {
555 println!("partition count {:?}", res.into_inner().partitions.len());
556 assert_eq!(true, true);
557 }
558 Err(err) => {
559 println!("error code = {0}, {1}", err.code(), err.message());
560 assert_eq!(false, true)
561 }
562 };
563 }
564}