spring_opendal/
lib.rs

1//! [![spring-rs](https://img.shields.io/github/stars/spring-rs/spring-rs)](https://spring-rs.github.io/docs/plugins/spring-opendal)
2#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4pub mod config;
5
6use crate::config::*;
7use anyhow::Result;
8use opendal::Operator;
9use spring::app::AppBuilder;
10use spring::async_trait;
11use spring::config::ConfigRegistry;
12use spring::plugin::{MutableComponentRegistry, Plugin};
13use std::str::FromStr;
14
15pub type Op = Operator;
16
17pub struct OpenDALPlugin;
18
19#[async_trait]
20impl Plugin for OpenDALPlugin {
21    async fn build(&self, app: &mut AppBuilder) {
22        let config = app
23            .get_config::<OpenDALConfig>()
24            .expect("OpenDAL plugin config load failed");
25
26        let connect = Self::operator(config).expect("OpenDAL operator construct failed");
27        app.add_component(connect);
28    }
29}
30
31impl OpenDALPlugin {
32    pub fn operator(config: OpenDALConfig) -> Result<Operator> {
33        let scheme = opendal::Scheme::from_str(&config.scheme).map_err(|err| {
34            opendal::Error::new(opendal::ErrorKind::Unexpected, "not supported scheme")
35                .set_source(err)
36        })?;
37
38        let options = config.options.unwrap_or_default();
39
40        #[allow(unused_mut)]
41        let mut op = Operator::via_iter(scheme, options)?;
42
43        if let Some(layers) = config.layers {
44            if let Some(layer) = layers.into_iter().next() {
45                log::debug!("layer-{layer} enable");
46                match layer {
47                    #[cfg(feature = "layers-chaos")]
48                    Layers::Chaos { error_ratio } => {
49                        op = op.layer(opendal::layers::ChaosLayer::new(error_ratio));
50                    }
51                    #[cfg(feature = "layers-metrics")]
52                    Layers::Metrics => {
53                        op = op.layer(opendal::layers::MetricsLayer::default());
54                    }
55                    #[cfg(feature = "layers-mime-guess")]
56                    Layers::MimeGuess => {
57                        op = op.layer(opendal::layers::MimeGuessLayer::default());
58                    }
59                    #[cfg(feature = "layers-prometheus")]
60                    Layers::Prometheus {
61                        duration_seconds_buckets,
62                        bytes_buckets,
63                    } => {
64                        let mut builder = opendal::layers::PrometheusLayer::builder();
65                        if let Some(duration_seconds_buckets) = duration_seconds_buckets {
66                            builder = builder.duration_seconds_buckets(duration_seconds_buckets);
67                        }
68                        if let Some(bytes_buckets) = bytes_buckets {
69                            builder = builder.bytes_buckets(bytes_buckets);
70                        }
71                        let prometheus_layer = builder
72                            .register_default()
73                            .expect("Failed to register with the global registry");
74
75                        op = op.layer(prometheus_layer);
76                    }
77                    #[cfg(feature = "layers-prometheus-client")]
78                    Layers::PrometheusClient => {
79                        let mut registry = prometheus_client::registry::Registry::default();
80                        op = op.layer(
81                            opendal::layers::PrometheusClientLayer::builder()
82                                .register(&mut registry),
83                        );
84                    }
85                    #[cfg(feature = "layers-fastrace")]
86                    Layers::Fastrace => {
87                        op = op.layer(opendal::layers::FastraceLayer);
88                    }
89                    #[cfg(feature = "layers-tracing")]
90                    Layers::Tracing => {
91                        op = op.layer(opendal::layers::TracingLayer);
92                    }
93                    #[cfg(feature = "layers-otel-trace")]
94                    Layers::OtelTrace => {
95                        op = op.layer(opendal::layers::OtelTraceLayer);
96                    }
97                    #[cfg(feature = "layers-throttle")]
98                    Layers::Throttle { bandwidth, burst } => {
99                        op = op.layer(opendal::layers::ThrottleLayer::new(bandwidth, burst));
100                    }
101                    #[cfg(feature = "layers-await-tree")]
102                    Layers::AwaitTree => {
103                        op = op.layer(opendal::layers::AwaitTreeLayer::new());
104                    }
105                    #[cfg(feature = "layers-async-backtrace")]
106                    Layers::AsyncBacktrace => {
107                        op = op.layer(opendal::layers::AsyncBacktraceLayer);
108                    }
109                    #[cfg(all(target_os = "linux", feature = "layers-dtrace"))]
110                    Layers::Dtrace => {
111                        op = op.layer(opendal::layers::DtraceLayer::default());
112                    }
113                    #[allow(unreachable_patterns)]
114                    _ => {
115                        panic!("Maybe you forgotten to enable the [services-{layer}] feature!");
116                    }
117                }
118            }
119        }
120
121        Ok(op)
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::config::*;
128    use super::OpenDALPlugin;
129    use log::debug;
130
131    #[tokio::test]
132    async fn blocking() {
133        let config = OpenDALConfig {
134            scheme: "memory".to_string(),
135            options: None,
136            layers: None,
137        };
138
139        debug!("config: {config:?}");
140
141        let op = OpenDALPlugin::operator(config).unwrap();
142        let o = op.write("test", b"test".to_vec()).await;
143        assert!(o.is_ok(), "Write operation failed: {o:?}");
144
145        let res = op.read("test").await.unwrap();
146
147        assert_eq!(res.to_vec(), b"test".to_vec());
148    }
149}