google_cloud_bigquery/http/
query.rs1use std::collections::VecDeque;
2use std::marker::PhantomData;
3
4use crate::http::bigquery_job_client::BigqueryJobClient;
5use crate::http::error::Error as HttpError;
6use crate::http::job::get_query_results::GetQueryResultsRequest;
7use crate::http::query::value::StructDecodable;
8use crate::http::tabledata::list::Tuple;
9
10#[derive(thiserror::Error, Debug)]
11pub enum Error {
12 #[error(transparent)]
13 Http(#[from] HttpError),
14 #[error(transparent)]
15 Value(#[from] value::Error),
16}
17
18pub struct Iterator<T: StructDecodable> {
19 pub(crate) client: BigqueryJobClient,
20 pub(crate) project_id: String,
21 pub(crate) job_id: String,
22 pub(crate) request: GetQueryResultsRequest,
23 pub(crate) chunk: VecDeque<Tuple>,
24 pub(crate) force_first_fetch: bool,
25 pub total_size: i64,
26 pub(crate) _marker: PhantomData<T>,
27}
28
29impl<T: StructDecodable> Iterator<T> {
30 pub async fn next(&mut self) -> Result<Option<T>, Error> {
31 loop {
32 if let Some(v) = self.chunk.pop_front() {
33 return Ok(T::decode(v).map(Some)?);
34 }
35 if self.force_first_fetch {
36 self.force_first_fetch = false
37 } else if self.request.page_token.is_none() {
38 return Ok(None);
39 }
40 let response = self
41 .client
42 .get_query_results(self.project_id.as_str(), self.job_id.as_str(), &self.request)
43 .await?;
44 if response.rows.is_none() {
45 return Ok(None);
46 }
47 let v = response.rows.unwrap();
48 self.chunk = VecDeque::from(v);
49 self.request.page_token = response.page_token;
50 }
51 }
52}
53
54pub mod row {
55 use crate::http::query::value::StructDecodable;
56 use crate::http::tabledata::list::{Cell, Tuple};
57
58 #[derive(thiserror::Error, Debug)]
59 pub enum Error {
60 #[error("no data found: {0}")]
61 UnexpectedColumnIndex(usize),
62 #[error(transparent)]
63 Value(#[from] super::value::Error),
64 }
65
66 pub struct Row {
67 inner: Vec<Cell>,
68 }
69
70 impl Row {
71 pub fn column<T: super::value::Decodable>(&self, index: usize) -> Result<T, Error> {
72 let cell: &Cell = self.inner.get(index).ok_or(Error::UnexpectedColumnIndex(index))?;
73 Ok(T::decode(&cell.v)?)
74 }
75 }
76
77 impl StructDecodable for Row {
78 fn decode(value: Tuple) -> Result<Self, crate::http::query::value::Error> {
79 Ok(Self { inner: value.f })
80 }
81 }
82}
83
84pub mod value {
85 use std::num::ParseIntError;
86 use std::ops::AddAssign;
87 use std::str::FromStr;
88 use std::time::Duration;
89
90 use base64::prelude::BASE64_STANDARD;
91 use base64::Engine;
92 use bigdecimal::BigDecimal;
93 use time::error::ComponentRange;
94 use time::macros::format_description;
95 use time::{Date, OffsetDateTime, Time};
96
97 use crate::http::tabledata::list::{Tuple, Value};
98
99 #[derive(thiserror::Error, Debug)]
100 pub enum Error {
101 #[error("invalid type")]
102 InvalidType,
103 #[error("unexpected null value")]
104 UnexpectedNullValue,
105 #[error(transparent)]
106 Timestamp(#[from] ComponentRange),
107 #[error("invalid number {0}")]
108 FromString(String),
109 #[error(transparent)]
110 Base64(#[from] base64::DecodeError),
111 #[error(transparent)]
112 ParseDateTime(#[from] time::error::Parse),
113 #[error(transparent)]
114 ParseBigDecimal(#[from] bigdecimal::ParseBigDecimalError),
115 #[error(transparent)]
116 ParseTime(#[from] ParseIntError),
117 }
118
119 pub trait Decodable: Sized {
120 fn decode(value: &Value) -> Result<Self, Error>;
121 }
122
123 pub trait StructDecodable: Sized {
124 fn decode(value: Tuple) -> Result<Self, Error>;
125 }
126
127 impl<T: StructDecodable> Decodable for T {
128 fn decode(value: &Value) -> Result<Self, Error> {
129 match value {
130 Value::Struct(v) => T::decode(v.clone()),
131 Value::Null => Err(Error::UnexpectedNullValue),
132 _ => Err(Error::InvalidType),
133 }
134 }
135 }
136
137 impl Decodable for String {
138 fn decode(value: &Value) -> Result<Self, Error> {
139 match value {
140 Value::String(v) => Ok(v.to_string()),
141 Value::Null => Err(Error::UnexpectedNullValue),
142 _ => Err(Error::InvalidType),
143 }
144 }
145 }
146
147 impl Decodable for Vec<u8> {
148 fn decode(value: &Value) -> Result<Self, Error> {
149 match value {
150 Value::String(v) => Ok(BASE64_STANDARD.decode(v)?),
151 Value::Null => Err(Error::UnexpectedNullValue),
152 _ => Err(Error::InvalidType),
153 }
154 }
155 }
156
157 impl Decodable for bool {
158 fn decode(value: &Value) -> Result<Self, Error> {
159 match value {
160 Value::String(v) => Ok(v.parse::<bool>().map_err(|_| Error::FromString(v.clone()))?),
161 Value::Null => Err(Error::UnexpectedNullValue),
162 _ => Err(Error::InvalidType),
163 }
164 }
165 }
166
167 impl Decodable for f64 {
168 fn decode(value: &Value) -> Result<Self, Error> {
169 match value {
170 Value::String(v) => Ok(v.parse::<f64>().map_err(|_| Error::FromString(v.clone()))?),
171 Value::Null => Err(Error::UnexpectedNullValue),
172 _ => Err(Error::InvalidType),
173 }
174 }
175 }
176
177 impl Decodable for i64 {
178 fn decode(value: &Value) -> Result<Self, Error> {
179 match value {
180 Value::String(v) => Ok(v.parse::<i64>().map_err(|_| Error::FromString(v.clone()))?),
181 Value::Null => Err(Error::UnexpectedNullValue),
182 _ => Err(Error::InvalidType),
183 }
184 }
185 }
186
187 impl Decodable for BigDecimal {
188 fn decode(value: &Value) -> Result<Self, Error> {
189 match value {
190 Value::String(v) => Ok(BigDecimal::from_str(v)?),
191 Value::Null => Err(Error::UnexpectedNullValue),
192 _ => Err(Error::InvalidType),
193 }
194 }
195 }
196
197 impl Decodable for OffsetDateTime {
198 fn decode(value: &Value) -> Result<Self, Error> {
199 let f = f64::decode(value)?;
200 let sec = f.trunc();
201 let micro = ((f - sec) * 1000000.0 + 0.5).trunc();
204 Ok(OffsetDateTime::from_unix_timestamp_nanos(
205 sec as i128 * 1_000_000_000 + micro as i128 * 1000,
206 )?)
207 }
208 }
209
210 impl Decodable for Date {
211 fn decode(value: &Value) -> Result<Self, Error> {
212 match value {
213 Value::String(v) => Ok(Date::parse(v, format_description!("[year]-[month]-[day]"))?),
214 Value::Null => Err(Error::UnexpectedNullValue),
215 _ => Err(Error::InvalidType),
216 }
217 }
218 }
219
220 impl Decodable for Time {
221 fn decode(value: &Value) -> Result<Self, Error> {
222 match value {
223 Value::String(v) => {
224 let split: Vec<&str> = v.split('.').collect();
225 let mut time = Time::parse(split[0], format_description!("[hour]:[minute]:[second]"))?;
226 if split.len() > 1 {
227 let micro: u64 = split[1].parse()?;
228 time.add_assign(Duration::from_micros(micro))
229 }
230 Ok(time)
231 }
232 Value::Null => Err(Error::UnexpectedNullValue),
233 _ => Err(Error::InvalidType),
234 }
235 }
236 }
237
238 impl<T> Decodable for Vec<T>
239 where
240 T: Decodable,
241 {
242 fn decode(value: &Value) -> Result<Self, Error> {
243 match value {
244 Value::Array(v) => {
245 let mut result = Vec::with_capacity(v.len());
246 for element in v {
247 result.push(T::decode(&element.v)?);
248 }
249 Ok(result)
250 }
251 Value::Null => Err(Error::UnexpectedNullValue),
252 _ => Err(Error::InvalidType),
253 }
254 }
255 }
256
257 impl<T> Decodable for Option<T>
258 where
259 T: Decodable,
260 {
261 fn decode(value: &Value) -> Result<Self, Error> {
262 match value {
263 Value::Null => Ok(None),
264 _ => Ok(Some(T::decode(value)?)),
265 }
266 }
267 }
268}