use lazy_static::lazy_static;
use regex::Regex;
use std::{fmt, marker::PhantomData, str::FromStr};
use crate::args::EnumSetExt;
use crate::common::*;
use crate::drivers::find_driver;
/// Policy describing whether the locators that output was written to should
/// be displayed. This is the return type of
/// `Locator::display_output_locators`.
///
/// The common value traits are derived so that a policy can be logged,
/// compared and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DisplayOutputLocators {
    /// Never display the output locators.
    Never,
    /// Display the output locators only when asked to.
    IfRequested,
    /// Display the output locators unless told otherwise.
    ByDefault,
}
/// A place that schemas and/or data may be read from or written to.
///
/// Only [`Locator::as_any`] is required. Every other method ships with a
/// default that reports the operation as unavailable: it either resolves to
/// `Ok(None)`, returns `false`, or fails with a "cannot ..." error naming
/// the locator involved.
pub trait Locator: fmt::Debug + fmt::Display + Send + Sync + 'static {
    /// View this locator as `&dyn Any` (presumably so that callers can
    /// downcast to a concrete locator type; verify against callers).
    fn as_any(&self) -> &dyn Any;

    /// Fetch the schema associated with this locator.
    ///
    /// The default resolves to `Ok(None)`.
    fn schema(&self, _ctx: Context) -> BoxFuture<Option<Table>> {
        async { Ok(None) }.boxed()
    }

    /// Write `_schema` to this locator, honoring `_if_exists`.
    ///
    /// The default fails with `cannot write schema to <locator>`.
    fn write_schema(
        &self,
        _ctx: Context,
        _schema: Table,
        _if_exists: IfExists,
    ) -> BoxFuture<()> {
        // Format the message now: the returned future cannot borrow `self`.
        let unsupported = format_err!("cannot write schema to {}", self);
        async move { Err(unsupported) }.boxed()
    }

    /// Count the records found at this locator.
    ///
    /// The default fails with `cannot count records at <locator>`.
    fn count(
        &self,
        _ctx: Context,
        _shared_args: SharedArguments<Unverified>,
        _source_args: SourceArguments<Unverified>,
    ) -> BoxFuture<usize> {
        let unsupported = format_err!("cannot count records at {}", self);
        async move { Err(unsupported) }.boxed()
    }

    /// Read the data at this locator as a stream of `CsvStream` values.
    ///
    /// The default resolves to `Ok(None)`.
    fn local_data(
        &self,
        _ctx: Context,
        _shared_args: SharedArguments<Unverified>,
        _source_args: SourceArguments<Unverified>,
    ) -> BoxFuture<Option<BoxStream<CsvStream>>> {
        async { Ok(None) }.boxed()
    }

    /// Report the policy for displaying the locators that output was
    /// written to.
    ///
    /// The default is `DisplayOutputLocators::IfRequested`.
    fn display_output_locators(&self) -> DisplayOutputLocators {
        DisplayOutputLocators::IfRequested
    }

    /// Write the streams in `_data` to this locator.
    ///
    /// On success the future yields a stream of futures, each producing a
    /// `BoxLocator` (presumably identifying where a stream was written).
    /// The default fails with `cannot write data to <locator>`.
    fn write_local_data(
        &self,
        _ctx: Context,
        _data: BoxStream<CsvStream>,
        _shared_args: SharedArguments<Unverified>,
        _dest_args: DestinationArguments<Unverified>,
    ) -> BoxFuture<BoxStream<BoxFuture<BoxLocator>>> {
        let unsupported = format_err!("cannot write data to {}", self);
        async move { Err(unsupported) }.boxed()
    }

    /// Whether [`Locator::write_remote_data`] can be used with `_source`.
    ///
    /// The default is `false`.
    fn supports_write_remote_data(&self, _source: &dyn Locator) -> bool {
        false
    }

    /// Copy data from `source` to this locator.
    ///
    /// The default fails with
    /// `cannot write_remote_data from source <source>`.
    fn write_remote_data(
        &self,
        _ctx: Context,
        source: BoxLocator,
        _shared_args: SharedArguments<Unverified>,
        _source_args: SourceArguments<Unverified>,
        _dest_args: DestinationArguments<Unverified>,
    ) -> BoxFuture<Vec<BoxLocator>> {
        let unsupported =
            format_err!("cannot write_remote_data from source {}", source);
        async move { Err(unsupported) }.boxed()
    }
}
/// A heap-allocated `Locator` trait object.
pub type BoxLocator = Box<dyn Locator>;
/// Parse a locator string by dispatching on its leading `scheme:` prefix.
impl FromStr for BoxLocator {
    type Err = Error;

    /// Parse `s` into a boxed locator.
    ///
    /// The scheme is the prefix of `s` matching
    /// `^[A-Za-z][-A-Za-z0-9+.]*:` (note that the trailing colon is part of
    /// the scheme). It is handed to `find_driver`, and the driver found then
    /// parses the complete string.
    ///
    /// # Errors
    ///
    /// Fails with `cannot parse locator: ...` when `s` has no scheme prefix,
    /// and propagates any error from `find_driver` or from the driver's
    /// `parse`.
    fn from_str(s: &str) -> Result<Self> {
        lazy_static! {
            static ref SCHEME_RE: Regex = Regex::new("^[A-Za-z][-A-Za-z0-9+.]*:")
                .expect("invalid regex in source");
        }
        // The pattern has no capture groups and only the overall match is
        // used, so ask for the match directly instead of building a full
        // `Captures` value.
        let scheme = SCHEME_RE
            .find(s)
            .ok_or_else(|| format_err!("cannot parse locator: {:?}", s))?
            .as_str();
        let driver = find_driver(scheme)?;
        driver.parse(s)
    }
}
/// Each sample locator string must parse into a `BoxLocator` whose
/// `to_string()` output equals the original text.
#[test]
fn locator_from_str_to_string_roundtrip() {
    const EXAMPLES: &[&str] = &[
        "bigquery:my_project:my_dataset.my_table",
        "bigquery-schema:dir/my_table.json",
        "bigml:dataset",
        "bigml:datasets",
        "bigml:dataset/abc123",
        "bigml:source",
        "bigml:sources",
        "csv:file.csv",
        "csv:dir/",
        "dbcrossbar-schema:file.json",
        "gs://example-bucket/tmp/",
        "postgres://localhost:5432/db#my_table",
        "postgres-sql:dir/my_table.sql",
        "s3://example/my-dir/",
    ];
    for &text in EXAMPLES {
        let parsed = text.parse::<BoxLocator>().unwrap();
        assert_eq!(parsed.to_string(), text);
    }
}
/// Optional operations that a locator type may support. A set of these is
/// stored in the `locator` field of `Features`.
#[derive(Debug, EnumSetType)]
pub enum LocatorFeatures {
/// Listed as `- conv FROM` when a `Features` value is displayed.
Schema,
/// Listed as `- conv TO:` when a `Features` value is displayed.
WriteSchema,
/// Listed as `- cp FROM:` when a `Features` value is displayed.
LocalData,
/// Listed as `- cp TO:` when a `Features` value is displayed.
WriteLocalData,
/// Listed as `- count` when a `Features` value is displayed.
Count,
}
/// Summary of what a locator driver supports: its optional operations plus
/// the argument and `IfExists` options that go with them.
#[derive(Debug, Copy, Clone)]
pub struct Features {
/// Which optional locator operations are available.
pub locator: EnumSet<LocatorFeatures>,
/// `IfExists` options printed under `conv TO:` (only when `WriteSchema` is
/// present in `locator`).
pub write_schema_if_exists: EnumSet<IfExistsFeatures>,
/// Source-argument options; printed under `count` and `cp FROM:` when the
/// set is non-empty.
pub source_args: EnumSet<SourceArgumentsFeatures>,
/// Destination-argument options; printed under `cp TO:` when the set is
/// non-empty.
pub dest_args: EnumSet<DestinationArgumentsFeatures>,
/// `IfExists` options printed under `cp TO:`.
pub dest_if_exists: EnumSet<IfExistsFeatures>,
/// Crate-private unit field. Because it is not `pub`, code outside this
/// crate cannot build a `Features` with struct-literal syntax.
pub(crate) _placeholder: (),
}
impl Features {
    /// Build a `Features` value in which every feature set is empty.
    pub(crate) fn empty() -> Self {
        Self {
            _placeholder: (),
            locator: EnumSet::empty(),
            write_schema_if_exists: EnumSet::empty(),
            source_args: EnumSet::empty(),
            dest_args: EnumSet::empty(),
            dest_if_exists: EnumSet::empty(),
        }
    }
}
impl fmt::Display for Features {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.locator.contains(LocatorFeatures::Schema) {
writeln!(f, "- conv FROM")?;
}
if self.locator.contains(LocatorFeatures::WriteSchema) {
writeln!(f, "- conv TO:")?;
writeln!(f, " {}", self.write_schema_if_exists.display())?;
}
if self.locator.contains(LocatorFeatures::Count) {
writeln!(f, "- count")?;
if !self.source_args.is_empty() {
writeln!(f, " {}", self.source_args.display())?;
}
}
if self.locator.contains(LocatorFeatures::LocalData) {
writeln!(f, "- cp FROM:")?;
if !self.source_args.is_empty() {
writeln!(f, " {}", self.source_args.display())?;
}
}
if self.locator.contains(LocatorFeatures::WriteLocalData) {
writeln!(f, "- cp TO:")?;
if !self.dest_args.is_empty() {
writeln!(f, " {}", self.dest_args.display())?;
}
writeln!(f, " {}", self.dest_if_exists.display())?;
}
Ok(())
}
}
/// Type-level operations available on a concrete, sized `Locator`: such a
/// type can be cloned, parsed from a string (with `Error` as the parse
/// error), and can describe its scheme and features without an instance.
pub trait LocatorStatic: Locator + Clone + FromStr<Err = Error> + Sized {
    /// Move this locator onto the heap and erase its concrete type.
    fn boxed(self) -> BoxLocator {
        Box::new(self) as BoxLocator
    }

    /// The scheme string for this locator type. The default
    /// `LocatorDriver::name` asserts that a driver's scheme ends with `:`.
    fn scheme() -> &'static str;

    /// The features this locator type supports.
    fn features() -> Features;
}
/// Object-safe description of one locator type: its scheme, display name,
/// supported features, and a parser that produces boxed locators.
pub trait LocatorDriver: Send + Sync + 'static {
    /// The scheme handled by this driver, including its trailing `:`.
    fn scheme(&self) -> &str;

    /// The scheme without its final `:`.
    ///
    /// # Panics
    ///
    /// Panics if [`LocatorDriver::scheme`] does not end with `:`.
    fn name(&self) -> &str {
        let scheme = self.scheme();
        assert!(scheme.ends_with(':'));
        // The assertion guarantees a one-byte `:` suffix, so splitting one
        // byte before the end is in bounds and on a character boundary.
        let (name, _colon) = scheme.split_at(scheme.len() - 1);
        name
    }

    /// The features supported by locators of this driver's type.
    fn features(&self) -> Features;

    /// Parse `s` into a boxed locator.
    fn parse(&self, s: &str) -> Result<BoxLocator>;
}
/// Adapter that carries a locator type `L` purely as a type parameter. The
/// `LocatorDriver` impl for this wrapper forwards to `L`'s `LocatorStatic`
/// functions and its `FromStr` implementation.
pub(crate) struct LocatorDriverWrapper<L> {
// No value of type `L` is stored; `PhantomData` only records the parameter.
_phantom: PhantomData<L>,
}
impl<L: LocatorStatic> LocatorDriverWrapper<L> {
    /// Create the wrapper for locator type `L`.
    pub(crate) fn new() -> Self {
        Self {
            _phantom: PhantomData,
        }
    }
}
impl<L: LocatorStatic> LocatorDriver for LocatorDriverWrapper<L> {
fn scheme(&self) -> &str {
L::scheme()
}
fn features(&self) -> Features {
L::features()
}
fn parse(&self, s: &str) -> Result<BoxLocator> {
Ok(Box::new(s.parse::<L>()?))
}
}