1#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4pub mod config;
5
6use crate::config::*;
7use anyhow::Result;
8use opendal::Operator;
9use spring::app::AppBuilder;
10use spring::async_trait;
11use spring::config::ConfigRegistry;
12use spring::plugin::{MutableComponentRegistry, Plugin};
13use std::str::FromStr;
14
15pub type Op = Operator;
16
17pub struct OpenDALPlugin;
18
19#[async_trait]
20impl Plugin for OpenDALPlugin {
21 async fn build(&self, app: &mut AppBuilder) {
22 let config = app
23 .get_config::<OpenDALConfig>()
24 .expect("OpenDAL plugin config load failed");
25
26 let connect = Self::operator(config).expect("OpenDAL operator construct failed");
27 app.add_component(connect);
28 }
29}
30
31impl OpenDALPlugin {
32 pub fn operator(config: OpenDALConfig) -> Result<Operator> {
33 let scheme = opendal::Scheme::from_str(&config.scheme).map_err(|err| {
34 opendal::Error::new(opendal::ErrorKind::Unexpected, "not supported scheme")
35 .set_source(err)
36 })?;
37
38 let options = config.options.unwrap_or_default();
39
40 #[allow(unused_mut)]
41 let mut op = Operator::via_iter(scheme, options)?;
42
43 if let Some(layers) = config.layers {
44 if let Some(layer) = layers.into_iter().next() {
45 log::debug!("layer-{layer} enable");
46 match layer {
47 #[cfg(feature = "layers-chaos")]
48 Layers::Chaos { error_ratio } => {
49 op = op.layer(opendal::layers::ChaosLayer::new(error_ratio));
50 }
51 #[cfg(feature = "layers-metrics")]
52 Layers::Metrics => {
53 op = op.layer(opendal::layers::MetricsLayer::default());
54 }
55 #[cfg(feature = "layers-mime-guess")]
56 Layers::MimeGuess => {
57 op = op.layer(opendal::layers::MimeGuessLayer::default());
58 }
59 #[cfg(feature = "layers-prometheus")]
60 Layers::Prometheus {
61 duration_seconds_buckets,
62 bytes_buckets,
63 } => {
64 let mut builder = opendal::layers::PrometheusLayer::builder();
65 if let Some(duration_seconds_buckets) = duration_seconds_buckets {
66 builder = builder.duration_seconds_buckets(duration_seconds_buckets);
67 }
68 if let Some(bytes_buckets) = bytes_buckets {
69 builder = builder.bytes_buckets(bytes_buckets);
70 }
71 let prometheus_layer = builder
72 .register_default()
73 .expect("Failed to register with the global registry");
74
75 op = op.layer(prometheus_layer);
76 }
77 #[cfg(feature = "layers-prometheus-client")]
78 Layers::PrometheusClient => {
79 let mut registry = prometheus_client::registry::Registry::default();
80 op = op.layer(
81 opendal::layers::PrometheusClientLayer::builder()
82 .register(&mut registry),
83 );
84 }
85 #[cfg(feature = "layers-fastrace")]
86 Layers::Fastrace => {
87 op = op.layer(opendal::layers::FastraceLayer);
88 }
89 #[cfg(feature = "layers-tracing")]
90 Layers::Tracing => {
91 op = op.layer(opendal::layers::TracingLayer);
92 }
93 #[cfg(feature = "layers-otel-trace")]
94 Layers::OtelTrace => {
95 op = op.layer(opendal::layers::OtelTraceLayer);
96 }
97 #[cfg(feature = "layers-throttle")]
98 Layers::Throttle { bandwidth, burst } => {
99 op = op.layer(opendal::layers::ThrottleLayer::new(bandwidth, burst));
100 }
101 #[cfg(feature = "layers-await-tree")]
102 Layers::AwaitTree => {
103 op = op.layer(opendal::layers::AwaitTreeLayer::new());
104 }
105 #[cfg(feature = "layers-async-backtrace")]
106 Layers::AsyncBacktrace => {
107 op = op.layer(opendal::layers::AsyncBacktraceLayer);
108 }
109 #[cfg(all(target_os = "linux", feature = "layers-dtrace"))]
110 Layers::Dtrace => {
111 op = op.layer(opendal::layers::DtraceLayer::default());
112 }
113 #[allow(unreachable_patterns)]
114 _ => {
115 panic!("Maybe you forgotten to enable the [services-{layer}] feature!");
116 }
117 }
118 }
119 }
120
121 Ok(op)
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::config::*;
    use super::OpenDALPlugin;
    use log::debug;

    /// Builds an operator for the `memory` scheme without options or layers,
    /// writes a payload under a key and checks that reading it back returns
    /// the same bytes.
    #[tokio::test]
    async fn blocking() {
        let config = OpenDALConfig {
            scheme: String::from("memory"),
            options: None,
            layers: None,
        };
        debug!("config: {config:?}");

        let operator = OpenDALPlugin::operator(config).unwrap();

        // Write first and assert on the result so a failure shows the error.
        let o = operator.write("test", b"test".to_vec()).await;
        assert!(o.is_ok(), "Write operation failed: {o:?}");

        let stored = operator.read("test").await.unwrap();
        assert_eq!(stored.to_vec(), b"test".to_vec());
    }
}