google_cloud_spanner/apiv1/
mod.rs

1pub mod conn_pool;
2pub mod spanner_client;
3
4#[cfg(test)]
5mod tests {
6    use prost_types::{value::Kind, ListValue, Value};
7    use serial_test::serial;
8
9    use google_cloud_gax::conn::{ConnectionOptions, Environment};
10    use google_cloud_gax::grpc::Code;
11    use google_cloud_googleapis::spanner::v1::mutation::{Operation, Write};
12    use google_cloud_googleapis::spanner::v1::{
13        commit_request, transaction_options, transaction_selector, BatchCreateSessionsRequest, BeginTransactionRequest,
14        CommitRequest, CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteSqlRequest,
15        GetSessionRequest, ListSessionsRequest, PartitionQueryRequest, PartitionReadRequest, ReadRequest,
16        RequestOptions, RollbackRequest, Session, Transaction, TransactionOptions, TransactionSelector,
17    };
18    use google_cloud_googleapis::spanner::v1::{execute_batch_dml_request, KeySet, Mutation};
19
20    use crate::apiv1::conn_pool::ConnectionManager;
21    use crate::apiv1::spanner_client::Client;
22    use crate::session::client_metadata;
23
24    const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
25
26    async fn create_spanner_client() -> Client {
27        let cm = ConnectionManager::new(
28            1,
29            &Environment::Emulator("localhost:9010".to_string()),
30            "",
31            &ConnectionOptions::default(),
32        )
33        .await
34        .unwrap();
35        cm.conn().with_metadata(client_metadata(DATABASE))
36    }
37
38    async fn create_session(client: &mut Client) -> Session {
39        let session_request = CreateSessionRequest {
40            database: DATABASE.to_string(),
41            session: None,
42        };
43        let session_response = client.create_session(session_request, None).await.unwrap();
44        session_response.into_inner()
45    }
46
47    async fn begin_read_only_transaction(client: &mut Client, session: &Session) -> Transaction {
48        let request = BeginTransactionRequest {
49            session: session.name.to_string(),
50            options: Option::from(TransactionOptions {
51                exclude_txn_from_change_streams: false,
52                mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
53                    return_read_timestamp: false,
54                    timestamp_bound: None,
55                })),
56            }),
57            request_options: None,
58        };
59        client.begin_transaction(request, None).await.unwrap().into_inner()
60    }
61
62    async fn begin_read_write_transaction(client: &mut Client, session: &Session) -> Transaction {
63        let request = BeginTransactionRequest {
64            session: session.name.to_string(),
65            options: Some(TransactionOptions {
66                exclude_txn_from_change_streams: false,
67                mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
68            }),
69            request_options: None,
70        };
71        client.begin_transaction(request, None).await.unwrap().into_inner()
72    }
73
74    #[tokio::test]
75    #[serial]
76    async fn test_create_session() {
77        let mut client = create_spanner_client().await;
78        let request = CreateSessionRequest {
79            database: DATABASE.to_string(),
80            session: None,
81        };
82
83        match client.create_session(request, None).await {
84            Ok(res) => {
85                println!("created session = {}", res.get_ref().name);
86                assert!(!res.get_ref().name.is_empty());
87            }
88            Err(err) => panic!("err: {err:?}"),
89        };
90    }
91
92    #[tokio::test]
93    #[serial]
94    async fn test_batch_create_session() {
95        let mut client = create_spanner_client().await;
96        let request = BatchCreateSessionsRequest {
97            database: DATABASE.to_string(),
98            session_count: 2,
99            session_template: None,
100        };
101
102        match client.batch_create_sessions(request, None).await {
103            Ok(res) => {
104                assert_eq!(
105                    res.get_ref().session.len(),
106                    2,
107                    "created session size = {}",
108                    res.get_ref().session.len()
109                );
110            }
111            Err(err) => panic!("err: {err:?}"),
112        };
113    }
114
115    #[tokio::test]
116    #[serial]
117    async fn test_get_session() {
118        let mut client = create_spanner_client().await;
119        let session = create_session(&mut client).await;
120        let request = GetSessionRequest {
121            name: session.name.to_string(),
122        };
123
124        match client.get_session(request, None).await {
125            Ok(res) => {
126                assert_eq!(res.get_ref().name, session.name.to_string());
127            }
128            Err(err) => panic!("err: {err:?}"),
129        };
130    }
131
132    #[tokio::test]
133    #[serial]
134    async fn test_list_sessions() {
135        let mut client = create_spanner_client().await;
136        let request = ListSessionsRequest {
137            database: DATABASE.to_string(),
138            page_size: 10,
139            page_token: "".to_string(),
140            filter: "".to_string(),
141        };
142
143        match client.list_sessions(request, None).await {
144            Ok(res) => {
145                println!("list session size = {}", res.get_ref().sessions.len());
146            }
147            Err(err) => panic!("err: {err:?}"),
148        };
149    }
150
151    #[tokio::test]
152    #[serial]
153    async fn test_delete_session() {
154        let mut client = create_spanner_client().await;
155
156        // create sessions
157        let batch_request = BatchCreateSessionsRequest {
158            database: DATABASE.to_string(),
159            session_count: 2,
160            session_template: None,
161        };
162        let session_response = client.batch_create_sessions(batch_request, None).await.unwrap();
163        let sessions = &session_response.get_ref().session;
164
165        // all delete
166        for session in sessions.iter() {
167            let request = DeleteSessionRequest {
168                name: session.name.to_string(),
169            };
170
171            match client.delete_session(request, None).await {
172                Ok(_) => {}
173                Err(err) => panic!("err: {err:?}"),
174            };
175        }
176    }
177
178    #[tokio::test]
179    #[serial]
180    async fn test_execute_sql() {
181        let mut client = create_spanner_client().await;
182        let session = create_session(&mut client).await;
183        let request = ExecuteSqlRequest {
184            session: session.name.to_string(),
185            transaction: None,
186            sql: "SELECT 1".to_string(),
187            params: None,
188            param_types: Default::default(),
189            resume_token: vec![],
190            query_mode: 0,
191            partition_token: vec![],
192            seqno: 0,
193            query_options: None,
194            request_options: None,
195            directed_read_options: None,
196            data_boost_enabled: false,
197        };
198        match client.execute_sql(request, None).await {
199            Ok(res) => {
200                assert_eq!(1, res.into_inner().rows.len());
201            }
202            Err(err) => panic!("err: {err:?}"),
203        };
204    }
205
206    #[tokio::test]
207    #[serial]
208    async fn test_execute_streaming_sql() {
209        let mut client = create_spanner_client().await;
210        let session = create_session(&mut client).await;
211        let mut request = ExecuteSqlRequest {
212            session: session.name.to_string(),
213            transaction: None,
214            sql: "select 1".to_string(),
215            params: None,
216            param_types: Default::default(),
217            resume_token: vec![],
218            query_mode: 0,
219            partition_token: vec![],
220            seqno: 0,
221            query_options: None,
222            request_options: None,
223            directed_read_options: None,
224            data_boost_enabled: false,
225        };
226
227        let resume_token = match client.execute_streaming_sql(request.clone(), None).await {
228            Ok(res) => {
229                let mut result = res.into_inner();
230                if let Some(next_message) = result.message().await.unwrap() {
231                    Some(next_message.resume_token)
232                } else {
233                    None
234                }
235            }
236            Err(err) => panic!("err: {err:?}"),
237        };
238        assert!(resume_token.is_some());
239        println!("resume token = {:?}", resume_token.clone().unwrap());
240        request.resume_token = resume_token.unwrap();
241
242        match client.execute_streaming_sql(request, None).await {
243            Ok(res) => {
244                let mut result = res.into_inner();
245                assert!(!result.message().await.unwrap().unwrap().values.is_empty())
246            }
247            Err(err) => panic!("err: {err:?}"),
248        }
249    }
250
251    #[tokio::test]
252    #[serial]
253    async fn test_begin_transaction() {
254        let mut client = create_spanner_client().await;
255        let session = create_session(&mut client).await;
256        let request = BeginTransactionRequest {
257            session: session.name.to_string(),
258            options: Option::from(TransactionOptions {
259                exclude_txn_from_change_streams: false,
260                mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
261                    return_read_timestamp: false,
262                    timestamp_bound: None,
263                })),
264            }),
265            request_options: None,
266        };
267
268        match client.begin_transaction(request, None).await {
269            Ok(res) => {
270                let tx_id = res.into_inner().id;
271                println!("tx id is {tx_id:?}");
272                assert!(!tx_id.is_empty());
273            }
274            Err(err) => panic!("err: {err:?}"),
275        };
276    }
277
278    #[tokio::test]
279    #[serial]
280    async fn test_execute_batch_dml() {
281        let mut client = create_spanner_client().await;
282        let session = create_session(&mut client).await;
283        let tx = begin_read_write_transaction(&mut client, &session).await;
284        let request = ExecuteBatchDmlRequest {
285            session: session.name.to_string(),
286            transaction: Option::from(TransactionSelector {
287                selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
288            }),
289            statements: vec![
290                execute_batch_dml_request::Statement {
291                    sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
292                        .to_string(),
293                    params: None,
294                    param_types: Default::default(),
295                },
296                execute_batch_dml_request::Statement {
297                    sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('2', 'u2', CURRENT_TIMESTAMP())"
298                        .to_string(),
299                    params: None,
300                    param_types: Default::default(),
301                },
302            ],
303            seqno: 0,
304            request_options: None,
305        };
306
307        let result = client.execute_batch_dml(request, None).await;
308        client
309            .rollback(
310                RollbackRequest {
311                    session: session.name.to_string(),
312                    transaction_id: tx.id,
313                },
314                None,
315            )
316            .await
317            .unwrap();
318        match result {
319            Ok(res) => {
320                let status = res.into_inner().status.unwrap();
321                assert_eq!(Code::Ok, Code::from(status.code), "gRPC success but error found : {status:?}");
322            }
323            Err(err) => panic!("err: {err:?}"),
324        };
325    }
326
327    #[tokio::test]
328    #[serial]
329    async fn test_execute_batch_dml_error_as_tonic_check() {
330        let mut client = create_spanner_client().await;
331        let session = create_session(&mut client).await;
332        let tx = begin_read_write_transaction(&mut client, &session).await;
333        let request = ExecuteBatchDmlRequest {
334            session: session.name.to_string(),
335            transaction: Option::from(TransactionSelector {
336                selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
337            }),
338            statements: vec![execute_batch_dml_request::Statement {
339                sql: "INSERT INTO GuildX (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
340                    .to_string(),
341                params: None,
342                param_types: Default::default(),
343            }],
344            seqno: 0,
345            request_options: None,
346        };
347
348        let result = client.execute_batch_dml(request, None).await;
349        client
350            .rollback(
351                RollbackRequest {
352                    session: session.name.to_string(),
353                    transaction_id: tx.id,
354                },
355                None,
356            )
357            .await
358            .unwrap();
359        match result {
360            Ok(res) => panic!("must be error code = {:?}", res.into_inner().status.unwrap().code),
361            Err(status) => {
362                assert_eq!(
363                    Code::InvalidArgument,
364                    status.code(),
365                    "gRPC success but error found : {status:?}"
366                );
367            }
368        };
369    }
370
371    #[tokio::test]
372    #[serial]
373    async fn test_read() {
374        let mut client = create_spanner_client().await;
375        let session = create_session(&mut client).await;
376        let request = ReadRequest {
377            session: session.name.to_string(),
378            transaction: None,
379            table: "Guild".to_string(),
380            index: "".to_string(),
381            columns: vec!["GuildId".to_string()],
382            key_set: Some(KeySet {
383                keys: vec![],
384                ranges: vec![],
385                all: true,
386            }),
387            resume_token: vec![],
388            partition_token: vec![],
389            request_options: None,
390            limit: 0,
391            data_boost_enabled: false,
392            order_by: 0,
393            directed_read_options: None,
394            lock_hint: 0,
395        };
396
397        match client.read(request, None).await {
398            Ok(res) => {
399                println!("row size = {:?}", res.into_inner().rows.len());
400            }
401            Err(err) => panic!("err: {err:?}"),
402        };
403    }
404
405    #[tokio::test]
406    #[serial]
407    async fn test_streaming_read() {
408        let mut client = create_spanner_client().await;
409        let session = create_session(&mut client).await;
410        let request = ReadRequest {
411            session: session.name.to_string(),
412            transaction: None,
413            table: "User".to_string(),
414            index: "".to_string(),
415            columns: vec!["UserId".to_string()],
416            key_set: Some(KeySet {
417                keys: vec![],
418                ranges: vec![],
419                all: true,
420            }),
421            resume_token: vec![],
422            partition_token: vec![],
423            request_options: None,
424            limit: 0,
425            data_boost_enabled: false,
426            order_by: 0,
427            directed_read_options: None,
428            lock_hint: 0,
429        };
430
431        match client.streaming_read(request, None).await {
432            Ok(res) => match res.into_inner().message().await {
433                Ok(..) => {}
434                Err(err) => panic!("err: {err:?}"),
435            },
436            Err(err) => panic!("err: {err:?}"),
437        };
438    }
439
440    #[tokio::test]
441    #[serial]
442    async fn test_commit() {
443        let mut client = create_spanner_client().await;
444        let session = create_session(&mut client).await;
445        let tx = begin_read_write_transaction(&mut client, &session).await;
446        let request = CommitRequest {
447            session: session.name.to_string(),
448            mutations: vec![Mutation {
449                operation: Some(Operation::InsertOrUpdate(Write {
450                    table: "Guild".to_string(),
451                    columns: vec![
452                        "GuildId".to_string(),
453                        "OwnerUserId".to_string(),
454                        "UpdatedAt".to_string(),
455                    ],
456                    values: vec![ListValue {
457                        values: vec![
458                            Value {
459                                kind: Some(Kind::StringValue("g1".to_string())),
460                            },
461                            Value {
462                                kind: Some(Kind::StringValue("u1".to_string())),
463                            },
464                            Value {
465                                kind: Some(Kind::StringValue("spanner.commit_timestamp()".to_string())),
466                            },
467                        ],
468                    }],
469                })),
470            }],
471            transaction: Option::from(commit_request::Transaction::TransactionId(tx.id)),
472            request_options: Option::from(RequestOptions {
473                priority: 10,
474                request_tag: "".to_string(),
475                transaction_tag: "".to_string(),
476            }),
477            return_commit_stats: false,
478            max_commit_delay: None,
479        };
480
481        match client.commit(request, None).await {
482            Ok(res) => {
483                assert!(res.into_inner().commit_timestamp.is_some());
484            }
485            Err(err) => panic!("err: {err:?}"),
486        };
487    }
488
489    #[tokio::test]
490    #[serial]
491    async fn test_rollback() {
492        let mut client = create_spanner_client().await;
493        let session = create_session(&mut client).await;
494        let tx = begin_read_write_transaction(&mut client, &session).await;
495        let request = RollbackRequest {
496            session: session.name.to_string(),
497            transaction_id: tx.id,
498        };
499
500        match client.rollback(request, None).await {
501            Ok(_) => {}
502            Err(err) => panic!("err: {err:?}"),
503        };
504    }
505
506    #[tokio::test]
507    #[serial]
508    async fn test_partition_query() {
509        let mut client = create_spanner_client().await;
510        let session = create_session(&mut client).await;
511        let tx = begin_read_only_transaction(&mut client, &session).await;
512        let request = PartitionQueryRequest {
513            session: session.name.to_string(),
514            transaction: Option::from(TransactionSelector {
515                selector: Option::from(transaction_selector::Selector::Id(tx.id)),
516            }),
517            sql: "SELECT * FROM User".to_string(),
518            params: None,
519            param_types: Default::default(),
520            partition_options: None,
521        };
522
523        match client.partition_query(request, None).await {
524            Ok(res) => {
525                println!("partition count {:?}", res.into_inner().partitions.len());
526                assert_eq!(true, true);
527            }
528            Err(err) => {
529                println!("error code = {0}, {1}", err.code(), err.message());
530                assert_eq!(false, true)
531            }
532        };
533    }
534
535    #[tokio::test]
536    #[serial]
537    async fn test_partition_read() {
538        let mut client = create_spanner_client().await;
539        let session = create_session(&mut client).await;
540        let tx = begin_read_only_transaction(&mut client, &session).await;
541        let request = PartitionReadRequest {
542            session: session.name.to_string(),
543            transaction: Option::from(TransactionSelector {
544                selector: Option::from(transaction_selector::Selector::Id(tx.id)),
545            }),
546            table: "User".to_string(),
547            index: "".to_string(),
548            columns: vec![],
549            partition_options: None,
550            key_set: None,
551        };
552
553        match client.partition_read(request, None).await {
554            Ok(res) => {
555                println!("partition count {:?}", res.into_inner().partitions.len());
556                assert_eq!(true, true);
557            }
558            Err(err) => {
559                println!("error code = {0}, {1}", err.code(), err.message());
560                assert_eq!(false, true)
561            }
562        };
563    }
564}