gcp_bigquery_client/model/
query_response.rs

1use crate::error::BQError;
2use crate::model::error_proto::ErrorProto;
3use crate::model::get_query_results_response::GetQueryResultsResponse;
4use crate::model::job_reference::JobReference;
5use crate::model::table_row::TableRow;
6use crate::model::table_schema::TableSchema;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10
11#[derive(Debug, Default, Clone, Serialize, Deserialize)]
12#[serde(rename_all = "camelCase")]
13pub struct QueryResponse {
14    /// Whether the query result was fetched from the query cache.
15    #[serde(skip_serializing_if = "Option::is_none")]
16    pub cache_hit: Option<bool>,
17    /// [Output-only] The first errors or warnings encountered during the running of the job. The final message includes the number of errors that caused the process to stop. Errors here do not necessarily mean that the job has completed or was unsuccessful.
18    #[serde(skip_serializing_if = "Option::is_none")]
19    pub errors: Option<Vec<ErrorProto>>,
20    /// Whether the query has completed or not. If rows or totalRows are present, this will always be true. If this is false, totalRows will not be available.
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub job_complete: Option<bool>,
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub job_reference: Option<JobReference>,
25    /// The resource type.
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub kind: Option<String>,
28    /// [Output-only] The number of rows affected by a DML statement. Present only for DML statements INSERT, UPDATE or DELETE.
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub num_dml_affected_rows: Option<String>,
31    /// A token used for paging results.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub page_token: Option<String>,
34    /// An object with as many results as can be contained within the maximum permitted reply size. To get any additional rows, you can call GetQueryResults and specify the jobReference returned above.
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub rows: Option<Vec<TableRow>>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub schema: Option<TableSchema>,
39    /// The total number of bytes processed for this query. If this query was a dry run, this is the number of bytes that would be processed if the query were run.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub total_bytes_processed: Option<String>,
42    /// The total number of rows in the complete query result set, which can be more than the number of rows in this single page of results.
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub total_rows: Option<String>,
45}
46
47impl From<GetQueryResultsResponse> for QueryResponse {
48    fn from(resp: GetQueryResultsResponse) -> Self {
49        Self {
50            cache_hit: resp.cache_hit,
51            errors: resp.errors,
52            job_complete: resp.job_complete,
53            job_reference: resp.job_reference,
54            kind: resp.kind,
55            num_dml_affected_rows: resp.num_dml_affected_rows,
56            page_token: resp.page_token,
57            rows: resp.rows,
58            schema: resp.schema,
59            total_bytes_processed: resp.total_bytes_processed,
60            total_rows: resp.total_rows,
61        }
62    }
63}
64
65/// Set of rows in response to a SQL query
66#[derive(Debug)]
67pub struct ResultSet {
68    cursor: i64,
69    row_count: i64,
70    rows: Vec<TableRow>,
71    fields: HashMap<String, usize>,
72}
73
74impl ResultSet {
75    pub fn new_from_query_response(query_response: QueryResponse) -> Self {
76        if query_response.job_complete.unwrap_or(false) && query_response.schema.is_some() {
77            // rows and tables schema are only present for successfully completed jobs.
78            let row_count = query_response.rows.as_ref().map_or(0, Vec::len) as i64;
79            let table_schema = query_response.schema.as_ref().expect("Expecting a schema");
80            let table_fields = table_schema
81                .fields
82                .as_ref()
83                .expect("Expecting a non empty list of fields");
84            let fields: HashMap<String, usize> = table_fields
85                .iter()
86                .enumerate()
87                .map(|(pos, field)| (field.name.clone(), pos))
88                .collect();
89            let rows = query_response.rows.unwrap_or_default();
90            Self {
91                cursor: -1,
92                row_count,
93                rows,
94                fields,
95            }
96        } else {
97            Self {
98                cursor: -1,
99                row_count: 0,
100                rows: vec![],
101                fields: HashMap::new(),
102            }
103        }
104    }
105
106    pub fn new_from_get_query_results_response(get_query_results_response: GetQueryResultsResponse) -> Self {
107        if get_query_results_response.job_complete.unwrap_or(false) && get_query_results_response.schema.is_some() {
108            // rows and tables schema are only present for successfully completed jobs.
109            let row_count = get_query_results_response.rows.as_ref().map_or(0, Vec::len) as i64;
110            let table_schema = get_query_results_response.schema.as_ref().expect("Expecting a schema");
111            let table_fields = table_schema
112                .fields
113                .as_ref()
114                .expect("Expecting a non empty list of fields");
115            let fields: HashMap<String, usize> = table_fields
116                .iter()
117                .enumerate()
118                .map(|(pos, field)| (field.name.clone(), pos))
119                .collect();
120            let rows = get_query_results_response.rows.unwrap_or_default();
121            Self {
122                cursor: -1,
123                row_count,
124                rows,
125                fields,
126            }
127        } else {
128            Self {
129                cursor: -1,
130                row_count: 0,
131                rows: vec![],
132                fields: HashMap::new(),
133            }
134        }
135    }
136
137    /// Moves the cursor froward one row from its current position.
138    /// A ResultSet cursor is initially positioned before the first row; the first call to the method next makes the
139    /// first row the current row; the second call makes the second row the current row, and so on.
140    pub fn next_row(&mut self) -> bool {
141        if self.cursor == (self.row_count - 1) {
142            false
143        } else {
144            self.cursor += 1;
145            true
146        }
147    }
148
149    /// Total number of rows in this result set.
150    pub fn row_count(&self) -> usize {
151        self.row_count as usize
152    }
153
154    /// List of column names for this result set.
155    pub fn column_names(&self) -> Vec<String> {
156        self.fields.keys().cloned().collect()
157    }
158
159    /// Returns the index for a column name.
160    pub fn column_index(&self, column_name: &str) -> Option<&usize> {
161        self.fields.get(column_name)
162    }
163
164    pub fn get_i64(&self, col_index: usize) -> Result<Option<i64>, BQError> {
165        let json_value = self.get_json_value(col_index)?;
166        match &json_value {
167            None => Ok(None),
168            Some(json_value) => match json_value {
169                serde_json::Value::Number(value) => Ok(value.as_i64()),
170                serde_json::Value::String(value) => match (value.parse::<i64>(), value.parse::<f64>()) {
171                    (Ok(v), _) => Ok(Some(v)),
172                    (Err(_), Ok(v)) => Ok(Some(v as i64)),
173                    _ => Err(BQError::InvalidColumnType {
174                        col_index,
175                        col_type: ResultSet::json_type(json_value),
176                        type_requested: "I64".into(),
177                    }),
178                },
179                _ => Err(BQError::InvalidColumnType {
180                    col_index,
181                    col_type: ResultSet::json_type(json_value),
182                    type_requested: "I64".into(),
183                }),
184            },
185        }
186    }
187
188    pub fn get_i64_by_name(&self, col_name: &str) -> Result<Option<i64>, BQError> {
189        let col_index = self.fields.get(col_name);
190        match col_index {
191            None => Err(BQError::InvalidColumnName {
192                col_name: col_name.into(),
193            }),
194            Some(col_index) => self.get_i64(*col_index),
195        }
196    }
197
198    pub fn get_serde<T>(&self, col_index: usize) -> Result<Option<T>, BQError>
199    where
200        T: serde::de::DeserializeOwned,
201    {
202        let json_value = self.get_json_value(col_index)?;
203        match json_value {
204            None => Ok(None),
205            Some(json_value) => match serde_json::from_value::<T>(json_value.clone()) {
206                Ok(value) => Ok(Some(value)),
207                Err(_) => Err(BQError::InvalidColumnType {
208                    col_index,
209                    col_type: ResultSet::json_type(&json_value),
210                    type_requested: std::any::type_name::<T>().to_string(),
211                }),
212            },
213        }
214    }
215
216    pub fn get_serde_by_name<T>(&self, col_name: &str) -> Result<Option<T>, BQError>
217    where
218        T: serde::de::DeserializeOwned,
219    {
220        let col_index = self.fields.get(col_name);
221        match col_index {
222            None => Err(BQError::InvalidColumnName {
223                col_name: col_name.into(),
224            }),
225            Some(col_index) => self.get_serde(*col_index),
226        }
227    }
228
229    pub fn get_f64(&self, col_index: usize) -> Result<Option<f64>, BQError> {
230        let json_value = self.get_json_value(col_index)?;
231        match &json_value {
232            None => Ok(None),
233            Some(json_value) => match json_value {
234                serde_json::Value::Number(value) => Ok(value.as_f64()),
235                serde_json::Value::String(value) => {
236                    let value: Result<f64, _> = value.parse();
237                    match &value {
238                        Err(_) => Err(BQError::InvalidColumnType {
239                            col_index,
240                            col_type: ResultSet::json_type(json_value),
241                            type_requested: "F64".into(),
242                        }),
243                        Ok(value) => Ok(Some(*value)),
244                    }
245                }
246                _ => Err(BQError::InvalidColumnType {
247                    col_index,
248                    col_type: ResultSet::json_type(json_value),
249                    type_requested: "F64".into(),
250                }),
251            },
252        }
253    }
254
255    pub fn get_f64_by_name(&self, col_name: &str) -> Result<Option<f64>, BQError> {
256        let col_index = self.fields.get(col_name);
257        match col_index {
258            None => Err(BQError::InvalidColumnName {
259                col_name: col_name.into(),
260            }),
261            Some(col_index) => self.get_f64(*col_index),
262        }
263    }
264
265    pub fn get_bool(&self, col_index: usize) -> Result<Option<bool>, BQError> {
266        let json_value = self.get_json_value(col_index)?;
267        match &json_value {
268            None => Ok(None),
269            Some(json_value) => match json_value {
270                serde_json::Value::Bool(value) => Ok(Some(*value)),
271                serde_json::Value::String(value) => {
272                    let value: Result<bool, _> = value.parse();
273                    match &value {
274                        Err(_) => Err(BQError::InvalidColumnType {
275                            col_index,
276                            col_type: ResultSet::json_type(json_value),
277                            type_requested: "Bool".into(),
278                        }),
279                        Ok(value) => Ok(Some(*value)),
280                    }
281                }
282                _ => Err(BQError::InvalidColumnType {
283                    col_index,
284                    col_type: ResultSet::json_type(json_value),
285                    type_requested: "Bool".into(),
286                }),
287            },
288        }
289    }
290
291    pub fn get_bool_by_name(&self, col_name: &str) -> Result<Option<bool>, BQError> {
292        let col_index = self.fields.get(col_name);
293        match col_index {
294            None => Err(BQError::InvalidColumnName {
295                col_name: col_name.into(),
296            }),
297            Some(col_index) => self.get_bool(*col_index),
298        }
299    }
300
301    pub fn get_string(&self, col_index: usize) -> Result<Option<String>, BQError> {
302        let json_value = self.get_json_value(col_index)?;
303        match json_value {
304            None => Ok(None),
305            Some(json_value) => match json_value {
306                serde_json::Value::String(value) => Ok(Some(value)),
307                serde_json::Value::Number(value) => Ok(Some(value.to_string())),
308                serde_json::Value::Bool(value) => Ok(Some(value.to_string())),
309                _ => Err(BQError::InvalidColumnType {
310                    col_index,
311                    col_type: ResultSet::json_type(&json_value),
312                    type_requested: "String".into(),
313                }),
314            },
315        }
316    }
317
318    pub fn get_string_by_name(&self, col_name: &str) -> Result<Option<String>, BQError> {
319        let col_index = self.fields.get(col_name);
320        match col_index {
321            None => Err(BQError::InvalidColumnName {
322                col_name: col_name.into(),
323            }),
324            Some(col_index) => self.get_string(*col_index),
325        }
326    }
327
328    pub fn get_json_value(&self, col_index: usize) -> Result<Option<serde_json::Value>, BQError> {
329        if self.cursor < 0 || self.cursor == self.row_count {
330            return Err(BQError::NoDataAvailable);
331        }
332        if col_index >= self.fields.len() {
333            return Err(BQError::InvalidColumnIndex { col_index });
334        }
335
336        Ok(self
337            .rows
338            .get(self.cursor as usize)
339            .and_then(|row| row.columns.as_ref())
340            .and_then(|cols| cols.get(col_index))
341            .and_then(|col| col.value.clone()))
342    }
343
344    pub fn get_json_value_by_name(&self, col_name: &str) -> Result<Option<serde_json::Value>, BQError> {
345        let col_pos = self.fields.get(col_name);
346        match col_pos {
347            None => Err(BQError::InvalidColumnName {
348                col_name: col_name.into(),
349            }),
350            Some(col_pos) => self.get_json_value(*col_pos),
351        }
352    }
353
354    fn json_type(json_value: &serde_json::Value) -> String {
355        match json_value {
356            Value::Null => "Null".into(),
357            Value::Bool(_) => "Bool".into(),
358            Value::Number(_) => "Number".into(),
359            Value::String(_) => "String".into(),
360            Value::Array(_) => "Array".into(),
361            Value::Object(_) => "Object".into(),
362        }
363    }
364}