dbcrossbarlib/drivers/gs/
mod.rs1use std::ffi::OsStr;
4use std::{fmt, str::FromStr};
5
6use crate::common::*;
7use crate::drivers::bigquery::BigQueryLocator;
8use crate::locator::PathLikeLocator;
9
10mod local_data;
11mod prepare_as_destination;
12mod write_local_data;
13mod write_remote_data;
14
15use local_data::local_data_helper;
16pub(crate) use prepare_as_destination::prepare_as_destination_helper;
17use write_local_data::write_local_data_helper;
18use write_remote_data::write_remote_data_helper;
19
20#[derive(Clone, Debug)]
21pub(crate) struct GsLocator {
22 url: Url,
23}
24
25impl GsLocator {
26 pub(crate) fn as_url(&self) -> &Url {
28 &self.url
29 }
30
31 pub(crate) fn is_directory(&self) -> bool {
33 self.url.path().ends_with('/')
34 }
35
36 pub(crate) fn is_csv_file(&self) -> bool {
38 self.url.path().to_ascii_lowercase().ends_with(".csv")
39 }
40}
41
42impl fmt::Display for GsLocator {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 self.url.fmt(f)
45 }
46}
47
48impl FromStr for GsLocator {
49 type Err = Error;
50
51 fn from_str(s: &str) -> Result<Self> {
52 if s.starts_with(Self::scheme()) {
53 let url = s
54 .parse::<Url>()
55 .with_context(|| format!("cannot parse {}", s))?;
56 if !url.path().starts_with('/') {
57 Err(format_err!("{} must start with gs://", url))
58 } else {
59 let locator = GsLocator { url };
60 if !locator.is_directory() && !locator.is_csv_file() {
61 Err(format_err!("{} must end with a '/' or '.csv'", locator))
62 } else {
63 Ok(locator)
64 }
65 }
66 } else {
67 Err(format_err!("expected {} to begin with gs://", s))
68 }
69 }
70}
71
72impl Locator for GsLocator {
73 fn as_any(&self) -> &dyn Any {
74 self
75 }
76
77 fn local_data(
78 &self,
79 ctx: Context,
80 shared_args: SharedArguments<Unverified>,
81 source_args: SourceArguments<Unverified>,
82 ) -> BoxFuture<Option<BoxStream<CsvStream>>> {
83 local_data_helper(ctx, self.url.clone(), shared_args, source_args).boxed()
84 }
85
86 fn write_local_data(
87 &self,
88 ctx: Context,
89 data: BoxStream<CsvStream>,
90 shared_args: SharedArguments<Unverified>,
91 dest_args: DestinationArguments<Unverified>,
92 ) -> BoxFuture<BoxStream<BoxFuture<BoxLocator>>> {
93 write_local_data_helper(ctx, self.clone(), data, shared_args, dest_args)
94 .boxed()
95 }
96
97 fn supports_write_remote_data(&self, source: &dyn Locator) -> bool {
98 source.as_any().is::<BigQueryLocator>() && self.is_directory()
105 }
106
107 fn write_remote_data(
108 &self,
109 ctx: Context,
110 source: BoxLocator,
111 shared_args: SharedArguments<Unverified>,
112 source_args: SourceArguments<Unverified>,
113 dest_args: DestinationArguments<Unverified>,
114 ) -> BoxFuture<Vec<BoxLocator>> {
115 write_remote_data_helper(
116 ctx,
117 source,
118 self.to_owned(),
119 shared_args,
120 source_args,
121 dest_args,
122 )
123 .boxed()
124 }
125}
126
127impl LocatorStatic for GsLocator {
128 fn scheme() -> &'static str {
129 "gs:"
130 }
131
132 fn features() -> Features {
133 Features {
134 locator: LocatorFeatures::LocalData | LocatorFeatures::WriteLocalData,
135 write_schema_if_exists: EnumSet::empty(),
136 source_args: Default::default(),
137 dest_args: Default::default(),
138 dest_if_exists: IfExistsFeatures::Overwrite.into(),
139 _placeholder: (),
140 }
141 }
142}
143
144impl PathLikeLocator for GsLocator {
145 fn path(&self) -> Option<&OsStr> {
146 Some(OsStr::new(self.url.path()))
147 }
148}
149
150pub(crate) fn find_gs_temp_dir(
153 temporary_storage: &TemporaryStorage,
154) -> Result<GsLocator> {
155 let mut temp = temporary_storage
156 .find_scheme(GsLocator::scheme())
157 .ok_or_else(|| format_err!("need `--temporary=gs://...` argument"))?
158 .to_owned();
159 if !temp.ends_with('/') {
160 temp.push('/');
161 }
162 temp.push_str(&TemporaryStorage::random_tag());
163 temp.push('/');
164 GsLocator::from_str(&temp)
165}
166
#[cfg(test)]
mod tests {
    //! Unit tests for `GsLocator` parsing and its path-like properties.

    use crate::data_streams::DataFormat;

    use super::*;

    // Previously named `test_s3_locator_url_parses`, a copy-paste leftover
    // that did not match the type under test.
    #[test]
    fn test_gs_locator_url_parses() {
        let locator = GsLocator::from_str("gs://bucket/path/").unwrap();
        assert_eq!(locator.url.scheme(), "gs");
        assert_eq!(locator.url.host_str(), Some("bucket"));
        assert_eq!(locator.url.path(), "/path/");
    }

    #[test]
    fn test_locator_rejects_unsupported_inputs() {
        // Wrong scheme prefix.
        assert!(GsLocator::from_str("s3://bucket/path/").is_err());
        // Path is neither a directory (trailing `/`) nor a `.csv` file.
        assert!(GsLocator::from_str("gs://bucket/path/file.txt").is_err());
    }

    #[test]
    fn test_directory_locator_has_correct_path_like_properties() {
        let locator = GsLocator::from_str("gs://bucket/path/").unwrap();
        assert_eq!(locator.path().unwrap(), "/path/");
        assert!(locator.is_directory_like());
        assert!(locator.extension().is_none());
        assert!(locator.data_format().is_none());
    }

    #[test]
    fn test_file_locator_has_correct_path_like_properties() {
        let locator = GsLocator::from_str("gs://bucket/path/file.csv").unwrap();
        assert_eq!(locator.path().unwrap(), "/path/file.csv");
        assert!(!locator.is_directory_like());
        assert_eq!(locator.extension().unwrap(), "csv");
        assert_eq!(locator.data_format(), Some(DataFormat::Csv));
    }
}