dbcrossbarlib/drivers/bigquery_schema/
mod.rs1use std::{fmt, str::FromStr};
4
5use crate::common::*;
6use crate::drivers::bigquery_shared::{BqColumn, BqTable, TableName, Usage};
7
8#[derive(Clone, Debug)]
10pub struct BigQuerySchemaLocator {
11 path: PathOrStdio,
12}
13
14impl fmt::Display for BigQuerySchemaLocator {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 self.path.fmt_locator_helper(Self::scheme(), f)
17 }
18}
19
20impl FromStr for BigQuerySchemaLocator {
21 type Err = Error;
22
23 fn from_str(s: &str) -> Result<Self> {
24 let path = PathOrStdio::from_str_locator_helper(Self::scheme(), s)?;
25 Ok(BigQuerySchemaLocator { path })
26 }
27}
28
29impl Locator for BigQuerySchemaLocator {
30 fn as_any(&self) -> &dyn Any {
31 self
32 }
33
34 fn schema(&self, _ctx: Context) -> BoxFuture<Option<Schema>> {
35 schema_helper(self.to_owned()).boxed()
36 }
37
38 fn write_schema(
39 &self,
40 _ctx: Context,
41 schema: Schema,
42 if_exists: IfExists,
43 ) -> BoxFuture<()> {
44 write_schema_helper(self.to_owned(), schema, if_exists).boxed()
45 }
46}
47
48impl LocatorStatic for BigQuerySchemaLocator {
49 fn scheme() -> &'static str {
50 "bigquery-schema:"
51 }
52
53 fn features() -> Features {
54 Features {
55 locator: LocatorFeatures::Schema | LocatorFeatures::WriteSchema,
56 write_schema_if_exists: IfExistsFeatures::no_append(),
57 source_args: EnumSet::empty(),
58 dest_args: EnumSet::empty(),
59 dest_if_exists: EnumSet::empty(),
60 _placeholder: (),
61 }
62 }
63}
64
65#[instrument(level = "trace", name = "bigquery_schema::schema")]
67async fn schema_helper(source: BigQuerySchemaLocator) -> Result<Option<Schema>> {
68 let input = source.path.open_async().await?;
70 let data = async_read_to_end(input)
71 .await
72 .with_context(|| format!("error reading {}", source.path))?;
73
74 let columns: Vec<BqColumn> = serde_json::from_slice(&data)
76 .with_context(|| format!("error parsing {}", source.path))?;
77
78 let arbitrary_name = TableName::from_str("unused:unused.unused")?;
80 let bq_table = BqTable {
81 name: arbitrary_name,
82 columns,
83 };
84 let mut table = bq_table.to_table()?;
85 table.name = "unnamed".to_owned();
86 Ok(Some(Schema::from_table(table)?))
87}
88
89#[instrument(level = "trace", name = "bigquery_schema::write_schema")]
91async fn write_schema_helper(
92 dest: BigQuerySchemaLocator,
93 schema: Schema,
94 if_exists: IfExists,
95) -> Result<()> {
96 let arbitrary_name = TableName::from_str("unused:unused.unused")?;
101
102 let bq_table = BqTable::for_table_name_and_columns(
104 &schema,
105 arbitrary_name,
106 &schema.table.columns,
107 Usage::FinalTable,
108 )?;
109
110 let mut f = dest.path.create_async(if_exists).await?;
112 buffer_sync_write_and_copy_to_async(&mut f, |buff| {
113 bq_table.write_json_schema(buff)
114 })
115 .await
116 .with_context(|| format!("error writing to {}", dest.path))?;
117 f.flush().await?;
118 Ok(())
119}