1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use percent_encoding::percent_decode_str;
use std::{fmt, path::PathBuf, str::FromStr};
use crate::common::*;
mod ast;
use self::ast::SourceFile;
#[derive(Clone, Debug)]
pub struct DbcrossbarTsLocator {
path: PathOrStdio,
fragment: String,
}
impl fmt::Display for DbcrossbarTsLocator {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let encode = |s: &str| s.replace('%', "%25").replace('#', "%23");
write!(
f,
"{}{}#{}",
Self::scheme(),
encode(&self.path.to_string()),
encode(&self.fragment)
)
}
}
impl FromStr for DbcrossbarTsLocator {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
if !s.starts_with(Self::scheme()) {
return Err(format_err!(
"expected {:?} to start with {}",
s,
Self::scheme()
));
}
let parts = s[Self::scheme().len()..].splitn(2, '#').collect::<Vec<_>>();
if parts.len() != 2 {
return Err(format_err!("expected '#' in {:?}", s));
}
let decode = |idx| {
percent_decode_str(parts[idx])
.decode_utf8()
.with_context(|| format!("error decoding {:?}", s))
};
let path = decode(0)?;
let fragment = decode(1)?.into_owned();
let path = if path == "-" {
PathOrStdio::Stdio
} else {
PathOrStdio::Path(PathBuf::from(path.into_owned()))
};
Ok(DbcrossbarTsLocator { path, fragment })
}
}
impl Locator for DbcrossbarTsLocator {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self, _ctx: Context) -> BoxFuture<Option<Schema>> {
schema_helper(self.to_owned()).boxed()
}
}
impl LocatorStatic for DbcrossbarTsLocator {
fn scheme() -> &'static str {
"dbcrossbar-ts:"
}
fn features() -> Features {
Features {
locator: LocatorFeatures::Schema.into(),
write_schema_if_exists: EnumSet::empty(),
source_args: EnumSet::empty(),
dest_args: EnumSet::empty(),
dest_if_exists: EnumSet::empty(),
_placeholder: (),
}
}
fn is_unstable() -> bool {
true
}
}
#[instrument(name = "dbcrossbar_schema::write_schema", level = "trace")]
async fn schema_helper(source: DbcrossbarTsLocator) -> Result<Option<Schema>> {
let input = source.path.open_async().await?;
let data = async_read_to_end(input)
.await
.with_context(|| format!("error reading {}", source.path))?;
let data = String::from_utf8(data)
.with_context(|| format!("found non-UTF-8 data in {}", source.path))?;
let source_file = SourceFile::parse(source.path.to_string(), data)?;
let table = source_file.definition_to_table(&source.fragment)?;
Ok(Some(Schema::from_table(table)?))
}