opendal_util/
factory.rs

1use std::collections::HashMap;
2
3use opendal::{Error, ErrorKind, Operator, OperatorRegistry, OperatorUri};
4use url::Url;
5
6pub trait OperatorFactory: Send + Sync {
7    fn load(&self, uri: &str) -> Result<Operator, Error>;
8}
9
/// Stateless factory; its [`OperatorFactory`] implementation passes the URI
/// straight to `Operator::from_uri`.
///
/// The type carries no data, so it is `Copy` and its `Default` value is the
/// same as the value returned by [`DefaultOperatorFactory::new`].
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultOperatorFactory;

impl DefaultOperatorFactory {
    /// Returns the (only) value of this type.
    pub const fn new() -> Self {
        Self
    }
}
23
24impl OperatorFactory for DefaultOperatorFactory {
25    fn load(&self, uri: &str) -> Result<Operator, Error> {
26        Operator::from_uri(uri)
27    }
28}
29
30impl OperatorFactory for OperatorRegistry {
31    fn load(&self, uri: &str) -> Result<Operator, Error> {
32        OperatorRegistry::load(self, uri)
33    }
34}
35
/// Factory that resolves URIs through named configuration profiles.
///
/// The outer map is keyed by profile name; each inner map holds that
/// profile's string options. The [`OperatorFactory`] implementation reads the
/// `type` option of a profile to decide which scheme to use.
#[derive(Debug, Clone, Default)]
pub struct ProfileOperatorFactory {
    /// Profile name -> option key -> option value.
    profiles: HashMap<String, HashMap<String, String>>,
}

impl ProfileOperatorFactory {
    /// Creates a factory that owns the given `profiles`.
    pub fn new(profiles: HashMap<String, HashMap<String, String>>) -> Self {
        Self { profiles }
    }
}
45
46impl OperatorFactory for ProfileOperatorFactory {
47    fn load(&self, uri: &str) -> Result<Operator, Error> {
48        let mut url = Url::parse(uri).map_err(|err| {
49            Error::new(ErrorKind::ConfigInvalid, "Failed to parse uri").set_source(err)
50        })?;
51
52        let profile_name = url.scheme();
53
54        let profile = self
55            .profiles
56            .get(profile_name)
57            .ok_or_else(|| {
58                // Operator::from_uri returns this error as well when a scheme is unsupported,
59                // even though the error description says this error is returned when an operation is not supported.
60                Error::new(ErrorKind::Unsupported, "Profile not found")
61                    .with_context("profile_name", profile_name)
62            })?
63            .clone();
64
65        let scheme = profile.get("type").cloned().ok_or_else(|| {
66            Error::new(ErrorKind::ConfigInvalid, "Missing 'type' in profile")
67                .with_context("profile_name", profile_name)
68        })?;
69
70        // This should never fail (sagikazarmark, 2025)
71        let _ = url.set_scheme(scheme.as_str());
72
73        let uri = OperatorUri::new(url.as_str(), profile)?;
74
75        Operator::from_uri(uri)
76    }
77}
78
79pub struct ChainOperatorFactory {
80    factories: Vec<Box<dyn OperatorFactory>>,
81}
82
83impl ChainOperatorFactory {
84    pub fn new<I>(factories: I) -> Self
85    where
86        I: IntoIterator<Item = Box<dyn OperatorFactory>>,
87    {
88        Self {
89            factories: factories.into_iter().collect(),
90        }
91    }
92
93    pub fn builder() -> ChainOperatorFactoryBuilder {
94        ChainOperatorFactoryBuilder::default()
95    }
96}
97
98impl OperatorFactory for ChainOperatorFactory {
99    fn load(&self, uri: &str) -> Result<Operator, Error> {
100        for factory in &self.factories {
101            match factory.load(uri) {
102                Ok(op) => return Ok(op),
103                Err(e) if e.kind() == ErrorKind::Unsupported => continue,
104                Err(e) => return Err(e),
105            }
106        }
107
108        Err(Error::new(ErrorKind::Unsupported, "Unsupported URI").with_context("uri", uri))
109    }
110}
111
112#[derive(Default)]
113pub struct ChainOperatorFactoryBuilder {
114    factories: Vec<Box<dyn OperatorFactory>>,
115}
116
117impl ChainOperatorFactoryBuilder {
118    pub fn then(mut self, factory: impl OperatorFactory + 'static) -> Self {
119        self.factories.push(Box::new(factory));
120        self
121    }
122
123    pub fn build(self) -> ChainOperatorFactory {
124        ChainOperatorFactory {
125            factories: self.factories,
126        }
127    }
128}
129
/// Wraps another factory together with a function applied to each operator
/// that factory produces.
///
/// The struct itself is deliberately unbounded: the `OperatorFactory` and
/// `Fn(Operator) -> Operator + Send + Sync` requirements live on the impl
/// blocks, so code that merely names this type does not have to repeat them.
pub struct LambdaOperatorFactory<Inner, F> {
    /// Factory whose `load` result is post-processed.
    inner: Inner,
    /// Function applied to every successfully loaded operator.
    transform: F,
}
138
139impl<Inner, F> LambdaOperatorFactory<Inner, F>
140where
141    Inner: OperatorFactory,
142    F: Fn(Operator) -> Operator + Send + Sync,
143{
144    pub fn new(inner: Inner, transform: F) -> Self {
145        Self { inner, transform }
146    }
147}
148
149impl<Inner, F> OperatorFactory for LambdaOperatorFactory<Inner, F>
150where
151    Inner: OperatorFactory,
152    F: Fn(Operator) -> Operator + Send + Sync,
153{
154    fn load(&self, uri: &str) -> Result<Operator, Error> {
155        let op = self.inner.load(uri)?;
156
157        Ok((self.transform)(op))
158    }
159}