Skip to main content

summer_opendal/
lib.rs

1//! [![summer-rs](https://img.shields.io/github/stars/summer-rs/summer-rs)](https://summer-rs.github.io/docs/plugins/summer-opendal)
2#![doc(html_favicon_url = "https://summer-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://summer-rs.github.io/logo.svg")]
4pub mod config;
5
6use crate::config::*;
7use anyhow::Result;
8use opendal::Operator;
9use summer::app::AppBuilder;
10use summer::async_trait;
11use summer::config::ConfigRegistry;
12use summer::plugin::{MutableComponentRegistry, Plugin};
13use std::str::FromStr;
14
/// Short alias for [`opendal::Operator`], the component type this plugin registers.
pub type Op = Operator;

/// Plugin that builds an [`Operator`] from [`OpenDALConfig`] and adds it to the
/// application as a component (see its [`Plugin`] implementation in this file).
pub struct OpenDALPlugin;
18
19#[async_trait]
20impl Plugin for OpenDALPlugin {
21    async fn build(&self, app: &mut AppBuilder) {
22        let config = app
23            .get_config::<OpenDALConfig>()
24            .expect("OpenDAL plugin config load failed");
25
26        let connect = Self::operator(config).expect("OpenDAL operator construct failed");
27        app.add_component(connect);
28    }
29}
30
31impl OpenDALPlugin {
32    pub fn operator(config: OpenDALConfig) -> Result<Operator> {
33        let scheme = opendal::Scheme::from_str(&config.scheme).map_err(|err| {
34            opendal::Error::new(opendal::ErrorKind::Unexpected, "not supported scheme")
35                .set_source(err)
36        })?;
37
38        let options = config.options.unwrap_or_default();
39
40        #[allow(unused_mut)]
41        let mut op = Operator::via_iter(scheme, options)?;
42
43        if let Some(layers) = config.layers {
44            if let Some(layer) = layers.into_iter().next() {
45                log::debug!("layer-{layer} enable");
46                match layer {
47                    #[cfg(feature = "layers-chaos")]
48                    Layers::Chaos { error_ratio } => {
49                        op = op.layer(opendal::layers::ChaosLayer::new(error_ratio));
50                    }
51                    #[cfg(feature = "layers-metrics")]
52                    Layers::Metrics => {
53                        op = op.layer(opendal::layers::MetricsLayer::default());
54                    }
55                    #[cfg(feature = "layers-mime-guess")]
56                    Layers::MimeGuess => {
57                        op = op.layer(opendal::layers::MimeGuessLayer::default());
58                    }
59                    #[cfg(feature = "layers-prometheus")]
60                    Layers::Prometheus {
61                        duration_seconds_buckets,
62                        bytes_buckets,
63                    } => {
64                        let mut builder = opendal::layers::PrometheusLayer::builder();
65                        if let Some(duration_seconds_buckets) = duration_seconds_buckets {
66                            builder = builder.duration_seconds_buckets(duration_seconds_buckets);
67                        }
68                        if let Some(bytes_buckets) = bytes_buckets {
69                            builder = builder.bytes_buckets(bytes_buckets);
70                        }
71                        let prometheus_layer = builder
72                            .register_default()
73                            .expect("Failed to register with the global registry");
74
75                        op = op.layer(prometheus_layer);
76                    }
77                    #[cfg(feature = "layers-prometheus-client")]
78                    Layers::PrometheusClient => {
79                        let mut registry = prometheus_client::registry::Registry::default();
80                        op = op.layer(
81                            opendal::layers::PrometheusClientLayer::builder()
82                                .register(&mut registry),
83                        );
84                    }
85                    #[cfg(feature = "layers-fastrace")]
86                    Layers::Fastrace => {
87                        op = op.layer(opendal::layers::FastraceLayer);
88                    }
89                    #[cfg(feature = "layers-tracing")]
90                    Layers::Tracing => {
91                        op = op.layer(opendal::layers::TracingLayer);
92                    }
93                    #[cfg(feature = "layers-otel-trace")]
94                    Layers::OtelTrace => {
95                        op = op.layer(opendal::layers::OtelTraceLayer);
96                    }
97                    #[cfg(feature = "layers-throttle")]
98                    Layers::Throttle { bandwidth, burst } => {
99                        op = op.layer(opendal::layers::ThrottleLayer::new(bandwidth, burst));
100                    }
101                    #[cfg(feature = "layers-await-tree")]
102                    Layers::AwaitTree => {
103                        op = op.layer(opendal::layers::AwaitTreeLayer::new());
104                    }
105                    #[cfg(feature = "layers-async-backtrace")]
106                    Layers::AsyncBacktrace => {
107                        op = op.layer(opendal::layers::AsyncBacktraceLayer);
108                    }
109                    #[cfg(feature = "layers-dtrace")]
110                    Layers::Dtrace => {
111                        #[cfg(target_os = "linux")]
112                        {
113                            op = op.layer(opendal::layers::DtraceLayer::default());
114                        }
115                        #[cfg(not(target_os = "linux"))]
116                        {
117                            log::warn!("DtraceLayer is only supported on Linux, skipping");
118                        }
119                    }
120                    #[allow(unreachable_patterns)]
121                    _ => {
122                        panic!("Maybe you forgotten to enable the [services-{layer}] feature!");
123                    }
124                }
125            }
126        }
127
128        Ok(op)
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::config::*;
    use super::OpenDALPlugin;
    use log::debug;

    /// Writes a small payload through an operator built for the `memory`
    /// scheme and checks that reading the same path returns the same bytes.
    #[tokio::test]
    async fn blocking() {
        let config = OpenDALConfig {
            layers: None,
            options: None,
            scheme: String::from("memory"),
        };
        debug!("config: {config:?}");

        let op = OpenDALPlugin::operator(config).unwrap();

        let payload: Vec<u8> = b"test".to_vec();
        let o = op.write("test", payload.clone()).await;
        assert!(o.is_ok(), "Write operation failed: {o:?}");

        let res = op.read("test").await.unwrap();
        assert_eq!(res.to_vec(), payload);
    }
}