Skip to main content

gcloud_spanner/apiv1/
mod.rs

1pub mod conn_pool;
2pub mod spanner_client;
3
4#[cfg(test)]
5mod tests {
6    use prost_types::{value::Kind, ListValue, Value};
7    use serial_test::serial;
8
9    use crate::apiv1::conn_pool::ConnectionManager;
10    use crate::apiv1::spanner_client::Client;
11    use crate::session::client_metadata;
12    use google_cloud_gax::conn::{ConnectionOptions, Environment};
13    use google_cloud_gax::grpc::Code;
14    use google_cloud_googleapis::spanner::v1::mutation::{Operation, Write};
15    use google_cloud_googleapis::spanner::v1::transaction_options::IsolationLevel;
16    use google_cloud_googleapis::spanner::v1::{
17        commit_request, transaction_options, transaction_selector, BatchCreateSessionsRequest, BeginTransactionRequest,
18        CommitRequest, CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteSqlRequest,
19        GetSessionRequest, ListSessionsRequest, PartitionQueryRequest, PartitionReadRequest, ReadRequest,
20        RequestOptions, RollbackRequest, Session, Transaction, TransactionOptions, TransactionSelector,
21    };
22    use google_cloud_googleapis::spanner::v1::{execute_batch_dml_request, KeySet, Mutation};
23
    /// Database resource path used by the session requests and the client
    /// metadata built in this test module.
    const DATABASE: &str = "projects/local-project/instances/test-instance/databases/local-database";
25
26    async fn create_spanner_client() -> Client {
27        let cm = ConnectionManager::new(
28            1,
29            &Environment::Emulator("localhost:9010".to_string()),
30            "",
31            &ConnectionOptions::default(),
32        )
33        .await
34        .unwrap();
35        cm.conn().with_metadata(client_metadata(DATABASE))
36    }
37
38    async fn create_session(client: &mut Client) -> Session {
39        let session_request = CreateSessionRequest {
40            database: DATABASE.to_string(),
41            session: None,
42        };
43        let session_response = client.create_session(session_request, true, None).await.unwrap();
44        session_response.into_inner()
45    }
46
47    async fn begin_read_only_transaction(client: &mut Client, session: &Session) -> Transaction {
48        let request = BeginTransactionRequest {
49            session: session.name.to_string(),
50            options: Option::from(TransactionOptions {
51                exclude_txn_from_change_streams: false,
52                mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
53                    return_read_timestamp: false,
54                    timestamp_bound: None,
55                })),
56                isolation_level: IsolationLevel::Unspecified as i32,
57            }),
58            request_options: None,
59            mutation_key: None,
60        };
61        client
62            .begin_transaction(request, true, None)
63            .await
64            .unwrap()
65            .into_inner()
66    }
67
68    async fn begin_read_write_transaction(client: &mut Client, session: &Session) -> Transaction {
69        let request = BeginTransactionRequest {
70            session: session.name.to_string(),
71            options: Some(TransactionOptions {
72                exclude_txn_from_change_streams: false,
73                mode: Some(transaction_options::Mode::ReadWrite(transaction_options::ReadWrite::default())),
74                isolation_level: IsolationLevel::Unspecified as i32,
75            }),
76            request_options: None,
77            mutation_key: None,
78        };
79        client
80            .begin_transaction(request, true, None)
81            .await
82            .unwrap()
83            .into_inner()
84    }
85
86    #[tokio::test]
87    #[serial]
88    async fn test_create_session() {
89        let mut client = create_spanner_client().await;
90        let request = CreateSessionRequest {
91            database: DATABASE.to_string(),
92            session: None,
93        };
94
95        match client.create_session(request, true, None).await {
96            Ok(res) => {
97                println!("created session = {}", res.get_ref().name);
98                assert!(!res.get_ref().name.is_empty());
99            }
100            Err(err) => panic!("err: {err:?}"),
101        };
102    }
103
104    #[tokio::test]
105    #[serial]
106    async fn test_batch_create_session() {
107        let mut client = create_spanner_client().await;
108        let request = BatchCreateSessionsRequest {
109            database: DATABASE.to_string(),
110            session_count: 2,
111            session_template: None,
112        };
113
114        match client.batch_create_sessions(request, true, None).await {
115            Ok(res) => {
116                assert_eq!(
117                    res.get_ref().session.len(),
118                    2,
119                    "created session size = {}",
120                    res.get_ref().session.len()
121                );
122            }
123            Err(err) => panic!("err: {err:?}"),
124        };
125    }
126
127    #[tokio::test]
128    #[serial]
129    async fn test_get_session() {
130        let mut client = create_spanner_client().await;
131        let session = create_session(&mut client).await;
132        let request = GetSessionRequest {
133            name: session.name.to_string(),
134        };
135
136        match client.get_session(request, true, None).await {
137            Ok(res) => {
138                assert_eq!(res.get_ref().name, session.name.to_string());
139            }
140            Err(err) => panic!("err: {err:?}"),
141        };
142    }
143
144    #[tokio::test]
145    #[serial]
146    async fn test_list_sessions() {
147        let mut client = create_spanner_client().await;
148        let request = ListSessionsRequest {
149            database: DATABASE.to_string(),
150            page_size: 10,
151            page_token: "".to_string(),
152            filter: "".to_string(),
153        };
154
155        match client.list_sessions(request, true, None).await {
156            Ok(res) => {
157                println!("list session size = {}", res.get_ref().sessions.len());
158            }
159            Err(err) => panic!("err: {err:?}"),
160        };
161    }
162
163    #[tokio::test]
164    #[serial]
165    async fn test_delete_session() {
166        let mut client = create_spanner_client().await;
167
168        // create sessions
169        let batch_request = BatchCreateSessionsRequest {
170            database: DATABASE.to_string(),
171            session_count: 2,
172            session_template: None,
173        };
174        let session_response = client.batch_create_sessions(batch_request, true, None).await.unwrap();
175        let sessions = &session_response.get_ref().session;
176
177        // all delete
178        for session in sessions.iter() {
179            let request = DeleteSessionRequest {
180                name: session.name.to_string(),
181            };
182
183            match client.delete_session(request, true, None).await {
184                Ok(_) => {}
185                Err(err) => panic!("err: {err:?}"),
186            };
187        }
188    }
189
190    #[tokio::test]
191    #[serial]
192    async fn test_execute_sql() {
193        let mut client = create_spanner_client().await;
194        let session = create_session(&mut client).await;
195        let request = ExecuteSqlRequest {
196            session: session.name.to_string(),
197            transaction: None,
198            sql: "SELECT 1".to_string(),
199            params: None,
200            param_types: Default::default(),
201            resume_token: vec![],
202            query_mode: 0,
203            partition_token: vec![],
204            seqno: 0,
205            query_options: None,
206            request_options: None,
207            directed_read_options: None,
208            data_boost_enabled: false,
209            last_statement: false,
210        };
211        match client.execute_sql(request, true, None).await {
212            Ok(res) => {
213                assert_eq!(1, res.into_inner().rows.len());
214            }
215            Err(err) => panic!("err: {err:?}"),
216        };
217    }
218
219    #[tokio::test]
220    #[serial]
221    async fn test_execute_streaming_sql() {
222        let mut client = create_spanner_client().await;
223        let session = create_session(&mut client).await;
224        let mut request = ExecuteSqlRequest {
225            session: session.name.to_string(),
226            transaction: None,
227            sql: "select 1".to_string(),
228            params: None,
229            param_types: Default::default(),
230            resume_token: vec![],
231            query_mode: 0,
232            partition_token: vec![],
233            seqno: 0,
234            query_options: None,
235            request_options: None,
236            directed_read_options: None,
237            data_boost_enabled: false,
238            last_statement: false,
239        };
240
241        let resume_token = match client.execute_streaming_sql(request.clone(), true, None).await {
242            Ok(res) => {
243                let mut result = res.into_inner();
244                if let Some(next_message) = result.message().await.unwrap() {
245                    Some(next_message.resume_token)
246                } else {
247                    None
248                }
249            }
250            Err(err) => panic!("err: {err:?}"),
251        };
252        assert!(resume_token.is_some());
253        println!("resume token = {:?}", resume_token.clone().unwrap());
254        request.resume_token = resume_token.unwrap();
255
256        match client.execute_streaming_sql(request, true, None).await {
257            Ok(res) => {
258                let mut result = res.into_inner();
259                assert!(!result.message().await.unwrap().unwrap().values.is_empty())
260            }
261            Err(err) => panic!("err: {err:?}"),
262        }
263    }
264
265    #[tokio::test]
266    #[serial]
267    async fn test_begin_transaction() {
268        let mut client = create_spanner_client().await;
269        let session = create_session(&mut client).await;
270        let request = BeginTransactionRequest {
271            session: session.name.to_string(),
272            options: Option::from(TransactionOptions {
273                exclude_txn_from_change_streams: false,
274                mode: Option::from(transaction_options::Mode::ReadOnly(transaction_options::ReadOnly {
275                    return_read_timestamp: false,
276                    timestamp_bound: None,
277                })),
278                isolation_level: IsolationLevel::Unspecified as i32,
279            }),
280            request_options: None,
281            mutation_key: None,
282        };
283
284        match client.begin_transaction(request, true, None).await {
285            Ok(res) => {
286                let tx_id = res.into_inner().id;
287                println!("tx id is {tx_id:?}");
288                assert!(!tx_id.is_empty());
289            }
290            Err(err) => panic!("err: {err:?}"),
291        };
292    }
293
294    #[tokio::test]
295    #[serial]
296    async fn test_execute_batch_dml() {
297        let mut client = create_spanner_client().await;
298        let session = create_session(&mut client).await;
299        let tx = begin_read_write_transaction(&mut client, &session).await;
300        let request = ExecuteBatchDmlRequest {
301            session: session.name.to_string(),
302            transaction: Option::from(TransactionSelector {
303                selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
304            }),
305            statements: vec![
306                execute_batch_dml_request::Statement {
307                    sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
308                        .to_string(),
309                    params: None,
310                    param_types: Default::default(),
311                },
312                execute_batch_dml_request::Statement {
313                    sql: "INSERT INTO Guild (GuildId,OwnerUserId,UpdatedAt) VALUES('2', 'u2', CURRENT_TIMESTAMP())"
314                        .to_string(),
315                    params: None,
316                    param_types: Default::default(),
317                },
318            ],
319            seqno: 0,
320            request_options: None,
321            last_statements: false,
322        };
323
324        let result = client.execute_batch_dml(request, true, None).await;
325        client
326            .rollback(
327                RollbackRequest {
328                    session: session.name.to_string(),
329                    transaction_id: tx.id,
330                },
331                true,
332                None,
333            )
334            .await
335            .unwrap();
336        match result {
337            Ok(res) => {
338                let status = res.into_inner().status.unwrap();
339                assert_eq!(Code::Ok, Code::from(status.code), "gRPC success but error found : {status:?}");
340            }
341            Err(err) => panic!("err: {err:?}"),
342        };
343    }
344
345    #[tokio::test]
346    #[serial]
347    async fn test_execute_batch_dml_error_as_tonic_check() {
348        let mut client = create_spanner_client().await;
349        let session = create_session(&mut client).await;
350        let tx = begin_read_write_transaction(&mut client, &session).await;
351        let request = ExecuteBatchDmlRequest {
352            session: session.name.to_string(),
353            transaction: Option::from(TransactionSelector {
354                selector: Option::from(transaction_selector::Selector::Id(tx.id.clone())),
355            }),
356            statements: vec![execute_batch_dml_request::Statement {
357                sql: "INSERT INTO GuildX (GuildId,OwnerUserId,UpdatedAt) VALUES('1', 'u1', CURRENT_TIMESTAMP())"
358                    .to_string(),
359                params: None,
360                param_types: Default::default(),
361            }],
362            seqno: 0,
363            request_options: None,
364            last_statements: false,
365        };
366
367        let result = client.execute_batch_dml(request, true, None).await;
368        client
369            .rollback(
370                RollbackRequest {
371                    session: session.name.to_string(),
372                    transaction_id: tx.id,
373                },
374                true,
375                None,
376            )
377            .await
378            .unwrap();
379        match result {
380            Ok(res) => panic!("must be error code = {:?}", res.into_inner().status.unwrap().code),
381            Err(status) => {
382                assert_eq!(
383                    Code::InvalidArgument,
384                    status.code(),
385                    "gRPC success but error found : {status:?}"
386                );
387            }
388        };
389    }
390
391    #[tokio::test]
392    #[serial]
393    async fn test_read() {
394        let mut client = create_spanner_client().await;
395        let session = create_session(&mut client).await;
396        let request = ReadRequest {
397            session: session.name.to_string(),
398            transaction: None,
399            table: "Guild".to_string(),
400            index: "".to_string(),
401            columns: vec!["GuildId".to_string()],
402            key_set: Some(KeySet {
403                keys: vec![],
404                ranges: vec![],
405                all: true,
406            }),
407            resume_token: vec![],
408            partition_token: vec![],
409            request_options: None,
410            limit: 0,
411            data_boost_enabled: false,
412            order_by: 0,
413            directed_read_options: None,
414            lock_hint: 0,
415        };
416
417        match client.read(request, true, None).await {
418            Ok(res) => {
419                println!("row size = {:?}", res.into_inner().rows.len());
420            }
421            Err(err) => panic!("err: {err:?}"),
422        };
423    }
424
425    #[tokio::test]
426    #[serial]
427    async fn test_streaming_read() {
428        let mut client = create_spanner_client().await;
429        let session = create_session(&mut client).await;
430        let request = ReadRequest {
431            session: session.name.to_string(),
432            transaction: None,
433            table: "User".to_string(),
434            index: "".to_string(),
435            columns: vec!["UserId".to_string()],
436            key_set: Some(KeySet {
437                keys: vec![],
438                ranges: vec![],
439                all: true,
440            }),
441            resume_token: vec![],
442            partition_token: vec![],
443            request_options: None,
444            limit: 0,
445            data_boost_enabled: false,
446            order_by: 0,
447            directed_read_options: None,
448            lock_hint: 0,
449        };
450
451        match client.streaming_read(request, true, None).await {
452            Ok(res) => match res.into_inner().message().await {
453                Ok(..) => {}
454                Err(err) => panic!("err: {err:?}"),
455            },
456            Err(err) => panic!("err: {err:?}"),
457        };
458    }
459
460    #[tokio::test]
461    #[serial]
462    async fn test_commit() {
463        let mut client = create_spanner_client().await;
464        let session = create_session(&mut client).await;
465        let tx = begin_read_write_transaction(&mut client, &session).await;
466        let request = CommitRequest {
467            session: session.name.to_string(),
468            mutations: vec![Mutation {
469                operation: Some(Operation::InsertOrUpdate(Write {
470                    table: "Guild".to_string(),
471                    columns: vec![
472                        "GuildId".to_string(),
473                        "OwnerUserId".to_string(),
474                        "UpdatedAt".to_string(),
475                    ],
476                    values: vec![ListValue {
477                        values: vec![
478                            Value {
479                                kind: Some(Kind::StringValue("g1".to_string())),
480                            },
481                            Value {
482                                kind: Some(Kind::StringValue("u1".to_string())),
483                            },
484                            Value {
485                                kind: Some(Kind::StringValue("spanner.commit_timestamp()".to_string())),
486                            },
487                        ],
488                    }],
489                })),
490            }],
491            transaction: Option::from(commit_request::Transaction::TransactionId(tx.id)),
492            request_options: Option::from(RequestOptions {
493                priority: 10,
494                request_tag: "".to_string(),
495                transaction_tag: "".to_string(),
496            }),
497            return_commit_stats: false,
498            max_commit_delay: None,
499            precommit_token: None,
500        };
501
502        match client.commit(request, true, None).await {
503            Ok(res) => {
504                assert!(res.into_inner().commit_timestamp.is_some());
505            }
506            Err(err) => panic!("err: {err:?}"),
507        };
508    }
509
510    #[tokio::test]
511    #[serial]
512    async fn test_rollback() {
513        let mut client = create_spanner_client().await;
514        let session = create_session(&mut client).await;
515        let tx = begin_read_write_transaction(&mut client, &session).await;
516        let request = RollbackRequest {
517            session: session.name.to_string(),
518            transaction_id: tx.id,
519        };
520
521        match client.rollback(request, true, None).await {
522            Ok(_) => {}
523            Err(err) => panic!("err: {err:?}"),
524        };
525    }
526
527    #[tokio::test]
528    #[serial]
529    async fn test_partition_query() {
530        let mut client = create_spanner_client().await;
531        let session = create_session(&mut client).await;
532        let tx = begin_read_only_transaction(&mut client, &session).await;
533        let request = PartitionQueryRequest {
534            session: session.name.to_string(),
535            transaction: Option::from(TransactionSelector {
536                selector: Option::from(transaction_selector::Selector::Id(tx.id)),
537            }),
538            sql: "SELECT * FROM User".to_string(),
539            params: None,
540            param_types: Default::default(),
541            partition_options: None,
542        };
543
544        match client.partition_query(request, true, None).await {
545            Ok(res) => {
546                println!("partition count {:?}", res.into_inner().partitions.len());
547                assert_eq!(true, true);
548            }
549            Err(err) => {
550                println!("error code = {0}, {1}", err.code(), err.message());
551                assert_eq!(false, true)
552            }
553        };
554    }
555
556    #[tokio::test]
557    #[serial]
558    async fn test_partition_read() {
559        let mut client = create_spanner_client().await;
560        let session = create_session(&mut client).await;
561        let tx = begin_read_only_transaction(&mut client, &session).await;
562        let request = PartitionReadRequest {
563            session: session.name.to_string(),
564            transaction: Option::from(TransactionSelector {
565                selector: Option::from(transaction_selector::Selector::Id(tx.id)),
566            }),
567            table: "User".to_string(),
568            index: "".to_string(),
569            columns: vec![],
570            partition_options: None,
571            key_set: None,
572        };
573
574        match client.partition_read(request, true, None).await {
575            Ok(res) => {
576                println!("partition count {:?}", res.into_inner().partitions.len());
577                assert_eq!(true, true);
578            }
579            Err(err) => {
580                println!("error code = {0}, {1}", err.code(), err.message());
581                assert_eq!(false, true)
582            }
583        };
584    }
585}