object_storage_lib/
lib.rs1mod action;
2mod config;
3mod context;
4
5use async_trait::async_trait;
6pub use config::Config;
7pub use config::Oss;
8use context::Context;
9use headers::{ContentType, HeaderMapExt};
10use hyper;
11use hyper::body::Incoming;
12use hyper::{Request, Response, StatusCode};
13use object_storage_sdk as sdk;
14use sdk::storage::GetReq;
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use tihu::SharedString;
19use tihu_native::http::Body;
20use tihu_native::http::BoxBody;
21use tihu_native::http::HttpHandler;
22use tihu_native::http::RequestData;
23
24pub struct GetMapping {
25 path_prefix: SharedString, key_prefix: SharedString, }
28
29pub struct UploadMapping {
30 path: SharedString, key_prefix: SharedString, }
33
34pub struct OssHandler {
35 context: Arc<Context>,
36 get_mapping: Vec<GetMapping>,
37 upload_mapping: Vec<UploadMapping>,
38 delete_path: Option<SharedString>,
39 namespaces: Vec<SharedString>,
40}
41
42impl OssHandler {
43 pub async fn try_init_from_config(
44 config: Config,
45 adjust_error_code: impl Fn(i32) -> i32 + Send + Sync + 'static,
46 ) -> Result<Self, anyhow::Error> {
47 let context = Context::try_init_from_config(config, adjust_error_code)?;
48 let context = Arc::new(context);
49 return Ok(Self {
50 context: context,
51 get_mapping: Vec::new(),
52 upload_mapping: Vec::new(),
53 delete_path: None,
54 namespaces: Vec::new(),
55 });
56 }
57
58 pub fn add_get_mapping(
59 &mut self,
60 path_prefix: SharedString,
61 key_prefix: SharedString,
62 ) -> &mut Self {
63 self.get_mapping.push(GetMapping {
64 path_prefix: path_prefix.clone(),
65 key_prefix: key_prefix,
66 });
67 self.namespaces.push(path_prefix);
68 return self;
69 }
70
71 pub fn add_upload_mapping(
72 &mut self,
73 path: SharedString,
74 key_prefix: SharedString,
75 ) -> &mut Self {
76 self.upload_mapping.push(UploadMapping {
77 path: path.clone(),
78 key_prefix: key_prefix,
79 });
80 self.namespaces.push(path);
81 return self;
82 }
83
84 pub fn set_delete_path(&mut self, delete_path: SharedString) -> &mut Self {
85 self.delete_path.replace(delete_path);
86 return self;
87 }
88}
89
90#[async_trait]
91impl HttpHandler for OssHandler {
92 fn namespace(&self) -> &[SharedString] {
93 return &self.namespaces;
94 }
95 async fn handle(
96 &self,
97 request: Request<Incoming>,
98 _remote_addr: SocketAddr,
99 _request_data: &mut RequestData,
100 prefix: Option<&str>,
101 ) -> Result<Response<BoxBody>, anyhow::Error> {
102 let prefix = prefix.unwrap_or("");
103 let (_, route) = request.uri().path().split_at(prefix.len());
104 for get_mapping in &self.get_mapping {
105 let path_prefix = &get_mapping.path_prefix;
106 let key_prefix = &get_mapping.key_prefix;
107 if route.starts_with(path_prefix.as_ref()) {
108 let key = format!(
109 "{}{}",
110 key_prefix,
111 String::from_utf8_lossy(&route.as_bytes()[path_prefix.len()..])
112 );
113 if key.is_empty() {
114 let mut response = text_response("file not found!");
115 *response.status_mut() = StatusCode::NOT_FOUND;
116 return Ok(response.map(From::from));
117 } else {
118 let query = request
119 .uri()
120 .query()
121 .map(|query| {
122 form_urlencoded::parse(query.as_bytes()).collect::<HashMap<_, _>>()
123 })
124 .unwrap_or_default();
125 let filename = query.get("filename").map(|filename| filename.as_ref());
126 let response =
127 action::storage::get(self.context.clone(), GetReq { key: key }, filename)
128 .await?;
129 return Ok(response.map(From::from));
130 }
131 }
132 }
133
134 for upload_mapping in &self.upload_mapping {
135 let path = &upload_mapping.path;
136 let key_prefix = &upload_mapping.key_prefix;
137 if route == path.as_ref() {
138 let hash = request.headers().get("X-Hash");
139 let hash = hash
140 .map(|hash| hash.to_str().map(|hash| hash.to_string()).ok())
141 .flatten();
142 if let Some(hash) = hash {
143 let response =
144 action::storage::put(self.context.clone(), request, key_prefix, hash)
145 .await?;
146 return Ok(response.map(From::from));
147 } else {
148 let response = Response::builder()
149 .status(StatusCode::BAD_REQUEST)
150 .body(Body::from("BAD REQUEST"))
151 .unwrap();
152 return Ok(response.map(From::from));
153 }
154 }
155 }
156
157 if let Some(delete_path) = self.delete_path.as_ref() {
158 if route == delete_path.as_ref() {
159 let hash = request.headers().get("X-Hash");
160 let hash = hash
161 .map(|hash| hash.to_str().map(|hash| hash.to_string()).ok())
162 .flatten();
163 if let Some(hash) = hash {
164 let response =
165 action::storage::delete(self.context.clone(), request, hash).await?;
166 return Ok(response.map(From::from));
167 } else {
168 let response = Response::builder()
169 .status(StatusCode::BAD_REQUEST)
170 .body(Body::from("BAD REQUEST"))
171 .unwrap();
172 return Ok(response.map(From::from));
173 }
174 }
175 }
176 let mut response = text_response("file not found!");
177 *response.status_mut() = StatusCode::NOT_FOUND;
178 return Ok(response.map(From::from));
179 }
180}
181
182fn text_response<T: Into<Body>>(body: T) -> Response<Body> {
183 let mut response = Response::new(body.into());
184 response
185 .headers_mut()
186 .typed_insert(ContentType::text_utf8());
187 return response;
188}