// dbcrossbarlib/drivers/bigquery_schema/mod.rs

//! Support for `bigquery-schema` locators.

use std::{fmt, str::FromStr};

use crate::common::*;
use crate::drivers::bigquery_shared::{BqColumn, BqTable, TableName, Usage};
/// A locator pointing at a JSON file containing a BigQuery table schema
/// (an array of column definitions), on disk or streamed over stdin/stdout.
#[derive(Clone, Debug)]
pub struct BigQuerySchemaLocator {
    // Where to read or write the schema: a filesystem path, or `-` for stdio.
    path: PathOrStdio,
}
13
impl fmt::Display for BigQuerySchemaLocator {
    /// Format this locator back into its `bigquery-schema:...` string form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The shared helper re-attaches the scheme prefix to the path.
        self.path.fmt_locator_helper(Self::scheme(), f)
    }
}
19
20impl FromStr for BigQuerySchemaLocator {
21    type Err = Error;
22
23    fn from_str(s: &str) -> Result<Self> {
24        let path = PathOrStdio::from_str_locator_helper(Self::scheme(), s)?;
25        Ok(BigQuerySchemaLocator { path })
26    }
27}
28
impl Locator for BigQuerySchemaLocator {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Read the portable schema stored at this locator.
    fn schema(&self, _ctx: Context) -> BoxFuture<Option<Schema>> {
        // Delegate to a real `async fn`, boxing the future as the trait
        // requires. We clone `self` so the future owns its input.
        schema_helper(self.to_owned()).boxed()
    }

    /// Write `schema` to this locator as a BigQuery JSON schema file.
    fn write_schema(
        &self,
        _ctx: Context,
        schema: Schema,
        if_exists: IfExists,
    ) -> BoxFuture<()> {
        write_schema_helper(self.to_owned(), schema, if_exists).boxed()
    }
}
47
impl LocatorStatic for BigQuerySchemaLocator {
    /// The URL-style scheme prefix used by this driver.
    fn scheme() -> &'static str {
        "bigquery-schema:"
    }

    /// Declare which operations this driver supports.
    fn features() -> Features {
        Features {
            // We can both read and write portable schemas.
            locator: LocatorFeatures::Schema | LocatorFeatures::WriteSchema,
            // Appending to an existing schema file makes no sense, so allow
            // every `if_exists` mode except append.
            write_schema_if_exists: IfExistsFeatures::no_append(),
            // This driver never moves table data, so no data-related
            // arguments or destination modes apply.
            source_args: EnumSet::empty(),
            dest_args: EnumSet::empty(),
            dest_if_exists: EnumSet::empty(),
            _placeholder: (),
        }
    }
}
64
65/// Implementation of `schema`, but as a real `async` function.
66#[instrument(level = "trace", name = "bigquery_schema::schema")]
67async fn schema_helper(source: BigQuerySchemaLocator) -> Result<Option<Schema>> {
68    // Read our input.
69    let input = source.path.open_async().await?;
70    let data = async_read_to_end(input)
71        .await
72        .with_context(|| format!("error reading {}", source.path))?;
73
74    // Parse our input as a list of columns.
75    let columns: Vec<BqColumn> = serde_json::from_slice(&data)
76        .with_context(|| format!("error parsing {}", source.path))?;
77
78    // Build a `BqTable`, convert it, and set a placeholder name.
79    let arbitrary_name = TableName::from_str("unused:unused.unused")?;
80    let bq_table = BqTable {
81        name: arbitrary_name,
82        columns,
83    };
84    let mut table = bq_table.to_table()?;
85    table.name = "unnamed".to_owned();
86    Ok(Some(Schema::from_table(table)?))
87}
88
89/// Implementation of `write_schema`, but as a real `async` function.
90#[instrument(level = "trace", name = "bigquery_schema::write_schema")]
91async fn write_schema_helper(
92    dest: BigQuerySchemaLocator,
93    schema: Schema,
94    if_exists: IfExists,
95) -> Result<()> {
96    // The BigQuery table name doesn't matter here, because our BigQuery schema
97    // won't use it. We could convert `table.name` into a valid BigQuery table
98    // name, but because BigQuery table names obey fairly strict restrictions,
99    // it's not worth doing the work if we're just going throw it away.
100    let arbitrary_name = TableName::from_str("unused:unused.unused")?;
101
102    // Convert our schema to a BigQuery table.
103    let bq_table = BqTable::for_table_name_and_columns(
104        &schema,
105        arbitrary_name,
106        &schema.table.columns,
107        Usage::FinalTable,
108    )?;
109
110    // Output our schema to our destination.
111    let mut f = dest.path.create_async(if_exists).await?;
112    buffer_sync_write_and_copy_to_async(&mut f, |buff| {
113        bq_table.write_json_schema(buff)
114    })
115    .await
116    .with_context(|| format!("error writing to {}", dest.path))?;
117    f.flush().await?;
118    Ok(())
119}