1use std::collections::HashMap;
2
3use opendal::{Error, ErrorKind, Operator, OperatorRegistry, OperatorUri};
4use url::Url;
5
6pub trait OperatorFactory: Send + Sync {
7 fn load(&self, uri: &str) -> Result<Operator, Error>;
8}
9
10pub struct DefaultOperatorFactory;
11
12impl DefaultOperatorFactory {
13 pub fn new() -> Self {
14 Self
15 }
16}
17
18impl Default for DefaultOperatorFactory {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl OperatorFactory for DefaultOperatorFactory {
25 fn load(&self, uri: &str) -> Result<Operator, Error> {
26 Operator::from_uri(uri)
27 }
28}
29
30impl OperatorFactory for OperatorRegistry {
31 fn load(&self, uri: &str) -> Result<Operator, Error> {
32 OperatorRegistry::load(self, uri)
33 }
34}
35
36pub struct ProfileOperatorFactory {
37 profiles: HashMap<String, HashMap<String, String>>,
38}
39
40impl ProfileOperatorFactory {
41 pub fn new(profiles: HashMap<String, HashMap<String, String>>) -> Self {
42 Self { profiles }
43 }
44}
45
46impl OperatorFactory for ProfileOperatorFactory {
47 fn load(&self, uri: &str) -> Result<Operator, Error> {
48 let mut url = Url::parse(uri).map_err(|err| {
49 Error::new(ErrorKind::ConfigInvalid, "Failed to parse uri").set_source(err)
50 })?;
51
52 let profile_name = url.scheme();
53
54 let profile = self
55 .profiles
56 .get(profile_name)
57 .ok_or_else(|| {
58 Error::new(ErrorKind::Unsupported, "Profile not found")
61 .with_context("profile_name", profile_name)
62 })?
63 .clone();
64
65 let scheme = profile.get("type").cloned().ok_or_else(|| {
66 Error::new(ErrorKind::ConfigInvalid, "Missing 'type' in profile")
67 .with_context("profile_name", profile_name)
68 })?;
69
70 let _ = url.set_scheme(scheme.as_str());
72
73 let uri = OperatorUri::new(url.as_str(), profile)?;
74
75 Operator::from_uri(uri)
76 }
77}
78
79pub struct ChainOperatorFactory {
80 factories: Vec<Box<dyn OperatorFactory>>,
81}
82
83impl ChainOperatorFactory {
84 pub fn new<I>(factories: I) -> Self
85 where
86 I: IntoIterator<Item = Box<dyn OperatorFactory>>,
87 {
88 Self {
89 factories: factories.into_iter().collect(),
90 }
91 }
92
93 pub fn builder() -> ChainOperatorFactoryBuilder {
94 ChainOperatorFactoryBuilder::default()
95 }
96}
97
98impl OperatorFactory for ChainOperatorFactory {
99 fn load(&self, uri: &str) -> Result<Operator, Error> {
100 for factory in &self.factories {
101 match factory.load(uri) {
102 Ok(op) => return Ok(op),
103 Err(e) if e.kind() == ErrorKind::Unsupported => continue,
104 Err(e) => return Err(e),
105 }
106 }
107
108 Err(Error::new(ErrorKind::Unsupported, "Unsupported URI").with_context("uri", uri))
109 }
110}
111
112#[derive(Default)]
113pub struct ChainOperatorFactoryBuilder {
114 factories: Vec<Box<dyn OperatorFactory>>,
115}
116
117impl ChainOperatorFactoryBuilder {
118 pub fn then(mut self, factory: impl OperatorFactory + 'static) -> Self {
119 self.factories.push(Box::new(factory));
120 self
121 }
122
123 pub fn build(self) -> ChainOperatorFactory {
124 ChainOperatorFactory {
125 factories: self.factories,
126 }
127 }
128}
129
130pub struct LambdaOperatorFactory<Inner, F>
131where
132 Inner: OperatorFactory,
133 F: Fn(Operator) -> Operator + Send + Sync,
134{
135 inner: Inner,
136 transform: F,
137}
138
139impl<Inner, F> LambdaOperatorFactory<Inner, F>
140where
141 Inner: OperatorFactory,
142 F: Fn(Operator) -> Operator + Send + Sync,
143{
144 pub fn new(inner: Inner, transform: F) -> Self {
145 Self { inner, transform }
146 }
147}
148
149impl<Inner, F> OperatorFactory for LambdaOperatorFactory<Inner, F>
150where
151 Inner: OperatorFactory,
152 F: Fn(Operator) -> Operator + Send + Sync,
153{
154 fn load(&self, uri: &str) -> Result<Operator, Error> {
155 let op = self.inner.load(uri)?;
156
157 Ok((self.transform)(op))
158 }
159}