bigquery_storage/lib.rs
1//! # bigquery-storage
2//! A small wrapper around the [Google BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage).
3//!
//! The BigQuery Storage API allows reading BigQuery tables by serializing their contents into efficient, concurrent streams. The official API supports both binary-serialized Arrow and Avro formats, but this crate currently only supports Arrow output, as [RecordBatch](arrow::record_batch::RecordBatch) values.
5//! # Usage
6//! 0. You will need some form of authentication, provided by an [`Authenticator`](yup_oauth2::authenticator::Authenticator).
//! 1. Create a [`Client`](crate::client::Client) with [`Client::new`](crate::client::Client::new).
8//! 2. Reading tables is done in [read sessions](https://cloud.google.com/bigquery/docs/reference/storage#create_a_session). In this crate, this is handled by [`Client::read_session_builder`](crate::client::Client::read_session_builder).
9//! 3. After that you will have a [`ReadSession`](crate::client::ReadSession), which is a small wrapper around a collection of [read streams](https://cloud.google.com/bigquery/docs/reference/storage#read_from_a_session_stream). Go through the streams with [`ReadSession::next_stream`](crate::client::ReadSession::next_stream).
10//! 4. Each storage stream is wrapped in a [`RowsStreamReader`](crate::read::RowsStreamReader). This will let you consume the stream into an Arrow [`StreamReader`](arrow::ipc::reader::StreamReader), at which point the data will actually be downloaded.
11//! # Example
//! ```rust,no_run
13//! use bigquery_storage::{Table, Client};
14//!
15//! #[tokio::main(flavor = "current_thread")]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//! // 1. Load the desired secret (here, a service account key)
18//! let sa_key = yup_oauth2::read_service_account_key("clientsecret.json")
19//! .await?;
20//!
21//! // 2. Create an Authenticator
22//! let auth = yup_oauth2::ServiceAccountAuthenticator::builder(sa_key)
23//! .build()
24//! .await?;
25//!
26//! // 3. Create a Client
27//! let mut client = Client::new(auth).await?;
28//!
//! // Reading the content of the table `bigquery-public-data:london_bicycles.cycle_stations`
30//! let test_table = Table::new(
31//! "bigquery-public-data",
32//! "london_bicycles",
33//! "cycle_stations"
34//! );
35//!
36//! // Create a new ReadSession; the `parent_project_id` is the ID of the GCP project
37//! // that owns the read job. This does not download any data.
38//! let mut read_session = client
39//! .read_session_builder(test_table)
40//! .parent_project_id("openquery-public-testing".to_string())
41//! .build()
42//! .await?;
43//!
44//! // Take the first stream in the queue for this ReadSession.
45//! let stream_reader = read_session
46//! .next_stream()
47//! .await?
48//! .expect("did not get any stream");
49//!
50//! // The stream is consumed to yield an Arrow StreamReader, which does download the
51//! // data.
52//! let mut arrow_stream_reader = stream_reader
53//! .into_arrow_reader()
54//! .await?;
55//!
56//! let arrow_record_batch = arrow_stream_reader
57//! .next()
58//! .expect("no record batch")?;
59//!
60//! Ok(())
61//! }
62//! ```
63//! # Authentication
//! For authentication you need an [Authenticator](yup_oauth2::authenticator::Authenticator), which is provided by the [yup_oauth2](yup_oauth2) crate. This crate re-exports `yup_oauth2`, so it is also reachable as `bigquery_storage::yup_oauth2`.
65pub use yup_oauth2;
66
67pub mod googleapis {
68 //! Codegenerated from [`google.cloud.bigquery.storage.v1`](https://github.com/googleapis/googleapis/tree/master/google/cloud/bigquery/storage/v1).
69 tonic::include_proto!("google.cloud.bigquery.storage.v1");
70}
71
72pub mod client;
73pub use client::*;
74
75pub mod read;
76pub use read::*;
77
// Generates the crate-wide `Error` enum from a list of `Variant(WrappedType),`
// entries. Optional outer attributes (e.g. `#[cfg(...)]`) written before an
// entry are copied onto every item generated for that variant. The enum
// variant, its `Display` match arm and its `From` impl therefore appear or
// disappear together.
macro_rules! errors {
    (
        $(
            $(#[$attr:meta])*
            $variant:ident($inner_ty:path),
        )*
    ) => {
        /// Encompassing error enum for this crate.
        ///
        /// Each variant wraps an underlying error value; `Display` prints the
        /// variant name followed by that value's own `Display` output.
        #[derive(Debug)]
        pub enum Error {
            $(
                $(#[$attr])*
                $variant($inner_ty),
            )*
        }

        // One `From` conversion per variant so `?` can lift the wrapped type
        // straight into `Error`.
        $(
            $(#[$attr])*
            impl From<$inner_ty> for Error {
                fn from(source: $inner_ty) -> Self {
                    Error::$variant(source)
                }
            }
        )*

        impl std::fmt::Display for Error {
            // Formats as `<VariantName>: <inner Display>`.
            fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match self {
                    $(
                        $(#[$attr])*
                        Error::$variant(wrapped) => write!(
                            formatter,
                            concat!(stringify!($variant), ": {}"),
                            wrapped
                        ),
                    )*
                }
            }
        }

        // No `source()` override: the inner error is already rendered by
        // `Display`, and not every payload (e.g. `String`) is itself an error.
        impl std::error::Error for Error {}
    };
}
113}
114
115errors! {
116 Transport(tonic::transport::Error),
117 Status(tonic::Status),
118 MetadataEncoding(tonic::metadata::errors::InvalidMetadataValue),
119 Auth(yup_oauth2::Error),
120 InvalidResponse(String),
121 Io(std::io::Error),
122 #[cfg(feature = "arrow")]
123 Arrow(arrow::error::ArrowError),
124}
125
126impl Error {
127 pub(crate) fn invalid<S: AsRef<str>>(s: S) -> Self {
128 Self::InvalidResponse(s.as_ref().to_string())
129 }
130}