dbcrossbarlib/drivers/gs/
mod.rs

1//! Support for Google Cloud Storage.
2
3use std::ffi::OsStr;
4use std::{fmt, str::FromStr};
5
6use crate::common::*;
7use crate::drivers::bigquery::BigQueryLocator;
8use crate::locator::PathLikeLocator;
9
10mod local_data;
11mod prepare_as_destination;
12mod write_local_data;
13mod write_remote_data;
14
15use local_data::local_data_helper;
16pub(crate) use prepare_as_destination::prepare_as_destination_helper;
17use write_local_data::write_local_data_helper;
18use write_remote_data::write_remote_data_helper;
19
20#[derive(Clone, Debug)]
21pub(crate) struct GsLocator {
22    url: Url,
23}
24
25impl GsLocator {
26    /// Access the `gs://` URL in this locator.
27    pub(crate) fn as_url(&self) -> &Url {
28        &self.url
29    }
30
31    /// Does this locator point at a `gs://` directory?
32    pub(crate) fn is_directory(&self) -> bool {
33        self.url.path().ends_with('/')
34    }
35
36    /// Does this locator point at a `gs://` CSV file?
37    pub(crate) fn is_csv_file(&self) -> bool {
38        self.url.path().to_ascii_lowercase().ends_with(".csv")
39    }
40}
41
42impl fmt::Display for GsLocator {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        self.url.fmt(f)
45    }
46}
47
48impl FromStr for GsLocator {
49    type Err = Error;
50
51    fn from_str(s: &str) -> Result<Self> {
52        if s.starts_with(Self::scheme()) {
53            let url = s
54                .parse::<Url>()
55                .with_context(|| format!("cannot parse {}", s))?;
56            if !url.path().starts_with('/') {
57                Err(format_err!("{} must start with gs://", url))
58            } else {
59                let locator = GsLocator { url };
60                if !locator.is_directory() && !locator.is_csv_file() {
61                    Err(format_err!("{} must end with a '/' or '.csv'", locator))
62                } else {
63                    Ok(locator)
64                }
65            }
66        } else {
67            Err(format_err!("expected {} to begin with gs://", s))
68        }
69    }
70}
71
72impl Locator for GsLocator {
73    fn as_any(&self) -> &dyn Any {
74        self
75    }
76
77    fn local_data(
78        &self,
79        ctx: Context,
80        shared_args: SharedArguments<Unverified>,
81        source_args: SourceArguments<Unverified>,
82    ) -> BoxFuture<Option<BoxStream<CsvStream>>> {
83        local_data_helper(ctx, self.url.clone(), shared_args, source_args).boxed()
84    }
85
86    fn write_local_data(
87        &self,
88        ctx: Context,
89        data: BoxStream<CsvStream>,
90        shared_args: SharedArguments<Unverified>,
91        dest_args: DestinationArguments<Unverified>,
92    ) -> BoxFuture<BoxStream<BoxFuture<BoxLocator>>> {
93        write_local_data_helper(ctx, self.clone(), data, shared_args, dest_args)
94            .boxed()
95    }
96
97    fn supports_write_remote_data(&self, source: &dyn Locator) -> bool {
98        // We can only do `write_remote_data` if `source` is a
99        // `BigQueryLocator`. Otherwise, we need to do `write_local_data` like
100        // normal.
101        //
102        // Also, BigQuery can only write directories of CSV files, so if we're
103        // not pointed to a directory, don't use remote operations.
104        source.as_any().is::<BigQueryLocator>() && self.is_directory()
105    }
106
107    fn write_remote_data(
108        &self,
109        ctx: Context,
110        source: BoxLocator,
111        shared_args: SharedArguments<Unverified>,
112        source_args: SourceArguments<Unverified>,
113        dest_args: DestinationArguments<Unverified>,
114    ) -> BoxFuture<Vec<BoxLocator>> {
115        write_remote_data_helper(
116            ctx,
117            source,
118            self.to_owned(),
119            shared_args,
120            source_args,
121            dest_args,
122        )
123        .boxed()
124    }
125}
126
127impl LocatorStatic for GsLocator {
128    fn scheme() -> &'static str {
129        "gs:"
130    }
131
132    fn features() -> Features {
133        Features {
134            locator: LocatorFeatures::LocalData | LocatorFeatures::WriteLocalData,
135            write_schema_if_exists: EnumSet::empty(),
136            source_args: Default::default(),
137            dest_args: Default::default(),
138            dest_if_exists: IfExistsFeatures::Overwrite.into(),
139            _placeholder: (),
140        }
141    }
142}
143
144impl PathLikeLocator for GsLocator {
145    fn path(&self) -> Option<&OsStr> {
146        Some(OsStr::new(self.url.path()))
147    }
148}
149
150/// Given a `TemporaryStorage`, extract a unique `gs://` temporary directory,
151/// including a random component.
152pub(crate) fn find_gs_temp_dir(
153    temporary_storage: &TemporaryStorage,
154) -> Result<GsLocator> {
155    let mut temp = temporary_storage
156        .find_scheme(GsLocator::scheme())
157        .ok_or_else(|| format_err!("need `--temporary=gs://...` argument"))?
158        .to_owned();
159    if !temp.ends_with('/') {
160        temp.push('/');
161    }
162    temp.push_str(&TemporaryStorage::random_tag());
163    temp.push('/');
164    GsLocator::from_str(&temp)
165}
166
#[cfg(test)]
mod tests {
    use crate::data_streams::DataFormat;

    use super::*;

    // Previously misnamed `test_s3_locator_url_parses`, a copy-paste from
    // the S3 driver.
    #[test]
    fn test_gs_locator_url_parses() {
        let locator = GsLocator::from_str("gs://bucket/path/").unwrap();
        assert_eq!(locator.url.scheme(), "gs");
        assert_eq!(locator.url.host_str(), Some("bucket"));
        assert_eq!(locator.url.path(), "/path/");
    }

    #[test]
    fn test_directory_locator_has_correct_path_like_properties() {
        let locator = GsLocator::from_str("gs://bucket/path/").unwrap();
        assert_eq!(locator.path().unwrap(), "/path/");
        assert!(locator.is_directory_like());
        assert!(locator.extension().is_none());
        assert!(locator.data_format().is_none());
    }

    #[test]
    fn test_file_locator_has_correct_path_like_properties() {
        let locator = GsLocator::from_str("gs://bucket/path/file.csv").unwrap();
        assert_eq!(locator.path().unwrap(), "/path/file.csv");
        assert!(!locator.is_directory_like());
        assert_eq!(locator.extension().unwrap(), "csv");
        assert_eq!(locator.data_format(), Some(DataFormat::Csv));
    }

    #[test]
    fn test_csv_extension_is_case_insensitive() {
        let locator = GsLocator::from_str("gs://bucket/path/FILE.CSV").unwrap();
        assert!(locator.is_csv_file());
        assert!(!locator.is_directory());
    }

    #[test]
    fn test_invalid_locators_are_rejected() {
        for bad in &[
            "s3://bucket/path/",
            "gs:bucket/path/",
            "gs://bucket",
            "gs://bucket/path/file.txt",
        ] {
            assert!(GsLocator::from_str(bad).is_err(), "should reject {}", bad);
        }
    }
}