1use std::collections::HashMap;
2use std::str::FromStr;
3
4use async_trait::async_trait;
5use opendal::{Operator, Scheme};
6use serde::{Deserialize, Serialize};
7use shuttle_service::{
8 error::{CustomError, Error as ShuttleError},
9 IntoResource, ResourceFactory, ResourceInputBuilder,
10};
11
12#[derive(Serialize)]
13pub struct Opendal {
14 scheme: String,
15}
16
17impl Default for Opendal {
18 fn default() -> Self {
19 Self {
20 scheme: "memory".to_string(),
21 }
22 }
23}
24
25impl Opendal {
26 pub fn scheme(mut self, scheme: &str) -> Self {
27 self.scheme = scheme.to_string();
28 self
29 }
30}
31
32#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
33pub struct OpendalOutput {
34 scheme: String,
35 cfg: HashMap<String, String>,
36}
37
38pub struct Error(opendal::Error);
39
40impl From<Error> for shuttle_service::Error {
41 fn from(error: Error) -> Self {
42 let msg = format!("Failed to build opendal resource: {:?}", error.0);
43 ShuttleError::Custom(CustomError::msg(msg))
44 }
45}
46
47#[async_trait]
48impl ResourceInputBuilder for Opendal {
49 type Input = OpendalOutput;
50 type Output = OpendalOutput;
51
52 async fn build(self, factory: &ResourceFactory) -> Result<Self::Input, ShuttleError> {
53 Ok(OpendalOutput {
54 scheme: self.scheme,
55 cfg: factory
56 .get_secrets()
57 .into_iter()
58 .map(|(k, v)| (k, v.expose().clone()))
59 .collect(),
60 })
61 }
62}
63
64#[async_trait]
65impl IntoResource<Operator> for OpendalOutput {
66 async fn into_resource(self) -> Result<Operator, shuttle_service::Error> {
67 let scheme = Scheme::from_str(&self.scheme).map_err(Error)?;
68
69 Ok(Operator::via_iter(scheme, self.cfg).map_err(Error)?)
70 }
71}
72
73#[cfg(test)]
74mod test {
75 use super::*;
76 use shuttle_service::Secret;
77
78 #[tokio::test]
79 async fn opendal_fs() {
80 let factory = ResourceFactory::new(
81 Default::default(),
82 [("root", "/tmp")]
83 .into_iter()
84 .map(|(k, v)| (k.to_string(), Secret::new(v.to_string())))
85 .collect(),
86 Default::default(),
87 );
88
89 let odal = Opendal::default().scheme("fs");
90 let output = odal.build(&factory).await.unwrap();
91 assert_eq!(output.scheme, "fs");
92
93 let op: Operator = output.into_resource().await.unwrap();
94 assert_eq!(op.info().scheme(), Scheme::Fs)
95 }
96
97 #[tokio::test]
98 async fn opendal_s3() {
99 let factory = ResourceFactory::new(
100 Default::default(),
101 [
102 ("bucket", "test"),
103 ("access_key_id", "ak"),
104 ("secret_access_key", "sk"),
105 ("region", "us-east-1"),
106 ]
107 .into_iter()
108 .map(|(k, v)| (k.to_string(), Secret::new(v.to_string())))
109 .collect(),
110 Default::default(),
111 );
112
113 let odal = Opendal::default().scheme("s3");
114 let output = odal.build(&factory).await.unwrap();
115 assert_eq!(output.scheme, "s3");
116 assert_eq!(output.cfg.get("access_key_id").unwrap(), "ak");
117 assert_eq!(output.cfg.get("secret_access_key").unwrap(), "sk");
118
119 let op: Operator = output.into_resource().await.unwrap();
120 assert_eq!(op.info().scheme(), Scheme::S3)
121 }
122}