google_cloud_bigquery/http/
query.rs

1use std::collections::VecDeque;
2use std::marker::PhantomData;
3
4use crate::http::bigquery_job_client::BigqueryJobClient;
5use crate::http::error::Error as HttpError;
6use crate::http::job::get_query_results::GetQueryResultsRequest;
7use crate::http::query::value::StructDecodable;
8use crate::http::tabledata::list::Tuple;
9
10#[derive(thiserror::Error, Debug)]
11pub enum Error {
12    #[error(transparent)]
13    Http(#[from] HttpError),
14    #[error(transparent)]
15    Value(#[from] value::Error),
16}
17
18pub struct Iterator<T: StructDecodable> {
19    pub(crate) client: BigqueryJobClient,
20    pub(crate) project_id: String,
21    pub(crate) job_id: String,
22    pub(crate) request: GetQueryResultsRequest,
23    pub(crate) chunk: VecDeque<Tuple>,
24    pub(crate) force_first_fetch: bool,
25    pub total_size: i64,
26    pub(crate) _marker: PhantomData<T>,
27}
28
29impl<T: StructDecodable> Iterator<T> {
30    pub async fn next(&mut self) -> Result<Option<T>, Error> {
31        loop {
32            if let Some(v) = self.chunk.pop_front() {
33                return Ok(T::decode(v).map(Some)?);
34            }
35            if self.force_first_fetch {
36                self.force_first_fetch = false
37            } else if self.request.page_token.is_none() {
38                return Ok(None);
39            }
40            let response = self
41                .client
42                .get_query_results(self.project_id.as_str(), self.job_id.as_str(), &self.request)
43                .await?;
44            if response.rows.is_none() {
45                return Ok(None);
46            }
47            let v = response.rows.unwrap();
48            self.chunk = VecDeque::from(v);
49            self.request.page_token = response.page_token;
50        }
51    }
52}
53
54pub mod row {
55    use crate::http::query::value::StructDecodable;
56    use crate::http::tabledata::list::{Cell, Tuple};
57
58    #[derive(thiserror::Error, Debug)]
59    pub enum Error {
60        #[error("no data found: {0}")]
61        UnexpectedColumnIndex(usize),
62        #[error(transparent)]
63        Value(#[from] super::value::Error),
64    }
65
66    pub struct Row {
67        inner: Vec<Cell>,
68    }
69
70    impl Row {
71        pub fn column<T: super::value::Decodable>(&self, index: usize) -> Result<T, Error> {
72            let cell: &Cell = self.inner.get(index).ok_or(Error::UnexpectedColumnIndex(index))?;
73            Ok(T::decode(&cell.v)?)
74        }
75    }
76
77    impl StructDecodable for Row {
78        fn decode(value: Tuple) -> Result<Self, crate::http::query::value::Error> {
79            Ok(Self { inner: value.f })
80        }
81    }
82}
83
84pub mod value {
85    use std::num::ParseIntError;
86    use std::ops::AddAssign;
87    use std::str::FromStr;
88    use std::time::Duration;
89
90    use base64::prelude::BASE64_STANDARD;
91    use base64::Engine;
92    use bigdecimal::BigDecimal;
93    use time::error::ComponentRange;
94    use time::macros::format_description;
95    use time::{Date, OffsetDateTime, Time};
96
97    use crate::http::tabledata::list::{Tuple, Value};
98
99    #[derive(thiserror::Error, Debug)]
100    pub enum Error {
101        #[error("invalid type")]
102        InvalidType,
103        #[error("unexpected null value")]
104        UnexpectedNullValue,
105        #[error(transparent)]
106        Timestamp(#[from] ComponentRange),
107        #[error("invalid number {0}")]
108        FromString(String),
109        #[error(transparent)]
110        Base64(#[from] base64::DecodeError),
111        #[error(transparent)]
112        ParseDateTime(#[from] time::error::Parse),
113        #[error(transparent)]
114        ParseBigDecimal(#[from] bigdecimal::ParseBigDecimalError),
115        #[error(transparent)]
116        ParseTime(#[from] ParseIntError),
117    }
118
119    pub trait Decodable: Sized {
120        fn decode(value: &Value) -> Result<Self, Error>;
121    }
122
123    pub trait StructDecodable: Sized {
124        fn decode(value: Tuple) -> Result<Self, Error>;
125    }
126
127    impl<T: StructDecodable> Decodable for T {
128        fn decode(value: &Value) -> Result<Self, Error> {
129            match value {
130                Value::Struct(v) => T::decode(v.clone()),
131                Value::Null => Err(Error::UnexpectedNullValue),
132                _ => Err(Error::InvalidType),
133            }
134        }
135    }
136
137    impl Decodable for String {
138        fn decode(value: &Value) -> Result<Self, Error> {
139            match value {
140                Value::String(v) => Ok(v.to_string()),
141                Value::Null => Err(Error::UnexpectedNullValue),
142                _ => Err(Error::InvalidType),
143            }
144        }
145    }
146
147    impl Decodable for Vec<u8> {
148        fn decode(value: &Value) -> Result<Self, Error> {
149            match value {
150                Value::String(v) => Ok(BASE64_STANDARD.decode(v)?),
151                Value::Null => Err(Error::UnexpectedNullValue),
152                _ => Err(Error::InvalidType),
153            }
154        }
155    }
156
157    impl Decodable for bool {
158        fn decode(value: &Value) -> Result<Self, Error> {
159            match value {
160                Value::String(v) => Ok(v.parse::<bool>().map_err(|_| Error::FromString(v.clone()))?),
161                Value::Null => Err(Error::UnexpectedNullValue),
162                _ => Err(Error::InvalidType),
163            }
164        }
165    }
166
167    impl Decodable for f64 {
168        fn decode(value: &Value) -> Result<Self, Error> {
169            match value {
170                Value::String(v) => Ok(v.parse::<f64>().map_err(|_| Error::FromString(v.clone()))?),
171                Value::Null => Err(Error::UnexpectedNullValue),
172                _ => Err(Error::InvalidType),
173            }
174        }
175    }
176
177    impl Decodable for i64 {
178        fn decode(value: &Value) -> Result<Self, Error> {
179            match value {
180                Value::String(v) => Ok(v.parse::<i64>().map_err(|_| Error::FromString(v.clone()))?),
181                Value::Null => Err(Error::UnexpectedNullValue),
182                _ => Err(Error::InvalidType),
183            }
184        }
185    }
186
187    impl Decodable for BigDecimal {
188        fn decode(value: &Value) -> Result<Self, Error> {
189            match value {
190                Value::String(v) => Ok(BigDecimal::from_str(v)?),
191                Value::Null => Err(Error::UnexpectedNullValue),
192                _ => Err(Error::InvalidType),
193            }
194        }
195    }
196
197    impl Decodable for OffsetDateTime {
198        fn decode(value: &Value) -> Result<Self, Error> {
199            let f = f64::decode(value)?;
200            let sec = f.trunc();
201            // Timestamps in BigQuery have microsecond precision, so we must
202            // return a round number of microseconds.
203            let micro = ((f - sec) * 1000000.0 + 0.5).trunc();
204            Ok(OffsetDateTime::from_unix_timestamp_nanos(
205                sec as i128 * 1_000_000_000 + micro as i128 * 1000,
206            )?)
207        }
208    }
209
210    impl Decodable for Date {
211        fn decode(value: &Value) -> Result<Self, Error> {
212            match value {
213                Value::String(v) => Ok(Date::parse(v, format_description!("[year]-[month]-[day]"))?),
214                Value::Null => Err(Error::UnexpectedNullValue),
215                _ => Err(Error::InvalidType),
216            }
217        }
218    }
219
220    impl Decodable for Time {
221        fn decode(value: &Value) -> Result<Self, Error> {
222            match value {
223                Value::String(v) => {
224                    let split: Vec<&str> = v.split('.').collect();
225                    let mut time = Time::parse(split[0], format_description!("[hour]:[minute]:[second]"))?;
226                    if split.len() > 1 {
227                        let micro: u64 = split[1].parse()?;
228                        time.add_assign(Duration::from_micros(micro))
229                    }
230                    Ok(time)
231                }
232                Value::Null => Err(Error::UnexpectedNullValue),
233                _ => Err(Error::InvalidType),
234            }
235        }
236    }
237
238    impl<T> Decodable for Vec<T>
239    where
240        T: Decodable,
241    {
242        fn decode(value: &Value) -> Result<Self, Error> {
243            match value {
244                Value::Array(v) => {
245                    let mut result = Vec::with_capacity(v.len());
246                    for element in v {
247                        result.push(T::decode(&element.v)?);
248                    }
249                    Ok(result)
250                }
251                Value::Null => Err(Error::UnexpectedNullValue),
252                _ => Err(Error::InvalidType),
253            }
254        }
255    }
256
257    impl<T> Decodable for Option<T>
258    where
259        T: Decodable,
260    {
261        fn decode(value: &Value) -> Result<Self, Error> {
262            match value {
263                Value::Null => Ok(None),
264                _ => Ok(Some(T::decode(value)?)),
265            }
266        }
267    }
268}