1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
pub mod config;

use crate::config::*;
use anyhow::Result;
use opendal::Operator;
use spring_boot::app::AppBuilder;
use spring_boot::async_trait;
use spring_boot::config::Configurable;
use spring_boot::plugin::Plugin;
use std::str::FromStr;

/// Short alias for [`opendal::Operator`], the component type this plugin registers.
pub type Op = Operator;

/// Plugin type that integrates OpenDAL into the application.
///
/// It carries no state of its own. The `config_prefix` attribute declares
/// `opendal` as the configuration prefix used by the derived `Configurable`
/// implementation.
#[derive(Configurable)]
#[config_prefix = "opendal"]
pub struct OpenDALPlugin;

#[async_trait]
impl Plugin for OpenDALPlugin {
    /// Loads the [`OpenDALConfig`] for this plugin, builds an [`Operator`] from it
    /// and registers that operator as an application component.
    ///
    /// # Panics
    ///
    /// Panics if the configuration cannot be loaded or if the operator cannot be
    /// constructed from it.
    async fn build(&self, app: &mut AppBuilder) {
        let loaded = app.get_config::<OpenDALConfig>(self);
        let opendal_config = loaded.expect("OpenDAL plugin config load failed");

        let operator: Operator =
            Self::operator(opendal_config).expect("OpenDAL operator construct failed");

        app.add_component(operator);
    }
}

impl OpenDALPlugin {
    pub fn operator(config: OpenDALConfig) -> Result<Operator> {
        let scheme = opendal::Scheme::from_str(&config.scheme).map_err(|err| {
            opendal::Error::new(opendal::ErrorKind::Unexpected, "not supported scheme")
                .set_source(err)
        })?;

        let options = config.options.unwrap_or_default();

        #[allow(unused_mut)]
        let mut op = Operator::via_iter(scheme, options)?;

        if let Some(layers) = config.layers {
            for layer in layers {
                log::debug!("layer-{} enable", layer);
                match layer {
                    #[cfg(feature = "layers-chaos")]
                    Layers::Chaos { error_ratio } => {
                        op = op.layer(opendal::layers::ChaosLayer::new(error_ratio));
                    }
                    #[cfg(feature = "layers-metrics")]
                    Layers::Metrics => {
                        op = op.layer(opendal::layers::MetricsLayer);
                    }
                    #[cfg(feature = "layers-mime-guess")]
                    Layers::MimeGuess => {
                        op = op.layer(opendal::layers::MimeGuessLayer::default());
                    }
                    #[cfg(feature = "layers-prometheus")]
                    Layers::Prometheus {
                        requests_duration_seconds_buckets,
                        bytes_total_buckets,
                        path_label_level,
                    } => {
                        let registry = prometheus::default_registry();
                        let mut prometheus_layer =
                            opendal::layers::PrometheusLayer::with_registry(registry.clone());
                        if let Some(requests_duration_seconds_buckets) =
                            requests_duration_seconds_buckets
                        {
                            prometheus_layer = prometheus_layer.requests_duration_seconds_buckets(
                                requests_duration_seconds_buckets,
                            );
                        }
                        if let Some(bytes_total_buckets) = bytes_total_buckets {
                            prometheus_layer =
                                prometheus_layer.bytes_total_buckets(bytes_total_buckets);
                        }
                        if let Some(path_label_level) = path_label_level {
                            prometheus_layer = prometheus_layer.enable_path_label(path_label_level);
                        }
                        op = op.layer(prometheus_layer);
                    }
                    #[cfg(feature = "layers-prometheus-client")]
                    Layers::PrometheusClient => {
                        let mut registry = prometheus_client::registry::Registry::default();
                        op = op.layer(opendal::layers::PrometheusClientLayer::new(&mut registry));
                    }
                    #[cfg(feature = "layers-fastrace")]
                    Layers::Fastrace => {
                        op = op.layer(opendal::layers::FastraceLayer);
                    }
                    #[cfg(feature = "layers-tracing")]
                    Layers::Tracing => {
                        op = op.layer(opendal::layers::TracingLayer);
                    }
                    #[cfg(feature = "layers-otel-trace")]
                    Layers::OtelTrace => {
                        op = op.layer(opendal::layers::OtelTraceLayer);
                    }
                    #[cfg(feature = "layers-throttle")]
                    Layers::Throttle { bandwidth, burst } => {
                        op = op.layer(opendal::layers::ThrottleLayer::new(bandwidth, burst));
                    }
                    #[cfg(feature = "layers-await-tree")]
                    Layers::AwaitTree => {
                        op = op.layer(opendal::layers::AwaitTreeLayer::new());
                    }
                    #[cfg(feature = "layers-async-backtrace")]
                    Layers::AsyncBacktrace => {
                        op = op.layer(opendal::layers::AsyncBacktraceLayer);
                    }
                    #[cfg(feature = "layers-blocking")]
                    Layers::Blocking => {
                        if !cfg!(feature = "test-layers") && op.info().native_capability().blocking
                        {
                            log::warn!("Blocking layer is not necessary for this operator");
                            continue;
                        }
                        match tokio::runtime::Handle::try_current() {
                            Ok(handle) => {
                                let _guard = handle.enter();
                                op = op.layer(opendal::layers::BlockingLayer::create()?);
                            }
                            Err(e) => {
                                log::error!("{}", e);
                            }
                        }
                    }
                    #[cfg(all(target_os = "linux", feature = "layers-dtrace"))]
                    Layers::Dtrace => {
                        op = op.layer(opendal::layers::DtraceLayer::default());
                    }
                    #[allow(unreachable_patterns)]
                    _ => {
                        panic!(
                            "Maybe you forgotten to enable the [services-{}] feature!",
                            layer
                        );
                    }
                }
            }
        }

        Ok(op)
    }
}