1use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13 CopyOptions, Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
14 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
15 Result as OSResult,
16};
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::future;
20use std::ops::Range;
21use std::pin::Pin;
22use std::sync::{
23 Arc, Mutex,
24 atomic::{AtomicUsize, Ordering},
25};
26
27pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
30impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
31impl Debug for dyn PolicyFnT {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(f, "PolicyFn")
34 }
35}
36type PolicyFn = Arc<dyn PolicyFnT>;
37
38pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
41impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
42impl Debug for dyn ObjectMetaPolicyFnT {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 write!(f, "PolicyFn")
45 }
46}
47type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
48
49#[derive(Debug, Default)]
58pub struct ProxyObjectStorePolicy {
59 before_policies: HashMap<String, PolicyFn>,
63 object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
66}
67
68impl ProxyObjectStorePolicy {
69 pub fn new() -> Self {
70 Default::default()
71 }
72
73 pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
77 self.before_policies.insert(name.to_string(), policy);
78 }
79
80 pub fn clear_before_policy(&mut self, name: &str) {
81 self.before_policies.remove(name);
82 }
83
84 pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
85 self.object_meta_policies.insert(name.to_string(), policy);
86 }
87}
88
89#[derive(Debug)]
95pub struct ProxyObjectStore {
96 target: Arc<dyn ObjectStore>,
97 policy: Arc<Mutex<ProxyObjectStorePolicy>>,
98}
99
100impl ProxyObjectStore {
101 pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
102 Self { target, policy }
103 }
104
105 fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
106 let policy = self.policy.lock().unwrap();
107 for policy in policy.before_policies.values() {
108 policy(method, location).map_err(OSError::from)?;
109 }
110 Ok(())
111 }
112
113 fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
114 let policy = self.policy.lock().unwrap();
115 let mut meta = meta;
116 for policy in policy.object_meta_policies.values() {
117 meta = policy(method, meta).map_err(OSError::from)?;
118 }
119 Ok(meta)
120 }
121}
122
123impl std::fmt::Display for ProxyObjectStore {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 write!(f, "ProxyObjectStore({})", self.target)
126 }
127}
128
129#[derive(Debug)]
134pub struct CountingObjectStore {
135 target: Arc<dyn ObjectStore>,
136 listing_count: Arc<AtomicUsize>,
137}
138
139impl CountingObjectStore {
140 pub fn new(target: Arc<dyn ObjectStore>, listing_count: Arc<AtomicUsize>) -> Self {
141 Self {
142 target,
143 listing_count,
144 }
145 }
146
147 fn record_listing(&self) {
148 self.listing_count.fetch_add(1, Ordering::SeqCst);
149 }
150
151 fn delegate_list(
152 &self,
153 prefix: Option<&Path>,
154 ) -> Pin<Box<dyn futures::Stream<Item = OSResult<ObjectMeta>> + Send>> {
155 self.target.list(prefix)
156 }
157}
158
159impl std::fmt::Display for CountingObjectStore {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 write!(f, "CountingObjectStore({})", self.target)
162 }
163}
164
165#[async_trait]
166impl ObjectStore for ProxyObjectStore {
167 async fn put_opts(
168 &self,
169 location: &Path,
170 bytes: PutPayload,
171 opts: PutOptions,
172 ) -> OSResult<PutResult> {
173 self.before_method("put", location)?;
174 self.target.put_opts(location, bytes, opts).await
175 }
176
177 async fn put_multipart_opts(
178 &self,
179 location: &Path,
180 opts: PutMultipartOptions,
181 ) -> OSResult<Box<dyn MultipartUpload>> {
182 self.before_method("put_multipart", location)?;
183 self.target.put_multipart_opts(location, opts).await
184 }
185
186 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
187 self.before_method("get_opts", location)?;
188 let is_head = options.head;
189 let mut result = self.target.get_opts(location, options).await?;
190 if is_head {
191 result.meta = self.transform_meta("head", result.meta)?;
192 }
193 Ok(result)
194 }
195
196 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
197 self.before_method("get_ranges", location)?;
198 self.target.get_ranges(location, ranges).await
199 }
200
201 fn delete_stream(
202 &self,
203 locations: BoxStream<'static, OSResult<Path>>,
204 ) -> BoxStream<'static, OSResult<Path>> {
205 let policy = Arc::clone(&self.policy);
206 let checked = locations
207 .and_then(move |location| {
208 let result = {
209 let policy = policy.lock().unwrap();
210 policy
211 .before_policies
212 .values()
213 .try_for_each(|policy| policy("delete", &location).map_err(OSError::from))
214 };
215 future::ready(result.map(|_| location))
216 })
217 .boxed();
218 self.target.delete_stream(checked)
219 }
220
221 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
222 let target = self.target.clone();
223 let policy = Arc::clone(&self.policy);
224
225 target
226 .list(prefix)
227 .and_then(move |meta| {
228 let policy = policy.lock().unwrap();
229 let mut meta = meta;
230 for p in policy.object_meta_policies.values() {
231 meta = p("list", meta).map_err(OSError::from).unwrap();
232 }
233 future::ready(Ok(meta))
234 })
235 .boxed()
236 }
237
238 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
239 self.target.list_with_delimiter(prefix).await
240 }
241
242 async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
243 self.before_method("copy", from)?;
244 self.target.copy_opts(from, to, opts).await
245 }
246
247 async fn rename_opts(&self, from: &Path, to: &Path, opts: RenameOptions) -> OSResult<()> {
248 self.before_method("rename", from)?;
249 self.target.rename_opts(from, to, opts).await
250 }
251}
252
253#[async_trait]
254impl ObjectStore for CountingObjectStore {
255 async fn put_opts(
256 &self,
257 location: &Path,
258 bytes: PutPayload,
259 opts: PutOptions,
260 ) -> OSResult<PutResult> {
261 self.target.put_opts(location, bytes, opts).await
262 }
263
264 async fn put_multipart_opts(
265 &self,
266 location: &Path,
267 opts: PutMultipartOptions,
268 ) -> OSResult<Box<dyn MultipartUpload>> {
269 self.target.put_multipart_opts(location, opts).await
270 }
271
272 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
273 self.target.get_opts(location, options).await
274 }
275
276 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
277 self.target.get_ranges(location, ranges).await
278 }
279
280 fn delete_stream(
281 &self,
282 locations: BoxStream<'static, OSResult<Path>>,
283 ) -> BoxStream<'static, OSResult<Path>> {
284 self.target.delete_stream(locations)
285 }
286
287 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
288 self.record_listing();
289 self.delegate_list(prefix).boxed()
290 }
291
292 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
293 self.record_listing();
294 self.target.list_with_delimiter(prefix).await
295 }
296
297 async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
298 self.target.copy_opts(from, to, opts).await
299 }
300}