1use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13 Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
14 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
15};
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::future;
19use std::ops::Range;
20use std::pin::Pin;
21use std::sync::{
22 Arc, Mutex,
23 atomic::{AtomicUsize, Ordering},
24};
25
26pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
29impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
30impl Debug for dyn PolicyFnT {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "PolicyFn")
33 }
34}
35type PolicyFn = Arc<dyn PolicyFnT>;
36
37pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
40impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
41impl Debug for dyn ObjectMetaPolicyFnT {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 write!(f, "PolicyFn")
44 }
45}
46type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
47
48#[derive(Debug, Default)]
57pub struct ProxyObjectStorePolicy {
58 before_policies: HashMap<String, PolicyFn>,
62 object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
65}
66
67impl ProxyObjectStorePolicy {
68 pub fn new() -> Self {
69 Default::default()
70 }
71
72 pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
76 self.before_policies.insert(name.to_string(), policy);
77 }
78
79 pub fn clear_before_policy(&mut self, name: &str) {
80 self.before_policies.remove(name);
81 }
82
83 pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
84 self.object_meta_policies.insert(name.to_string(), policy);
85 }
86}
87
88#[derive(Debug)]
94pub struct ProxyObjectStore {
95 target: Arc<dyn ObjectStore>,
96 policy: Arc<Mutex<ProxyObjectStorePolicy>>,
97}
98
99impl ProxyObjectStore {
100 pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
101 Self { target, policy }
102 }
103
104 fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
105 let policy = self.policy.lock().unwrap();
106 for policy in policy.before_policies.values() {
107 policy(method, location).map_err(OSError::from)?;
108 }
109 Ok(())
110 }
111
112 fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
113 let policy = self.policy.lock().unwrap();
114 let mut meta = meta;
115 for policy in policy.object_meta_policies.values() {
116 meta = policy(method, meta).map_err(OSError::from)?;
117 }
118 Ok(meta)
119 }
120}
121
122impl std::fmt::Display for ProxyObjectStore {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 write!(f, "ProxyObjectStore({})", self.target)
125 }
126}
127
128#[derive(Debug)]
133pub struct CountingObjectStore {
134 target: Arc<dyn ObjectStore>,
135 listing_count: Arc<AtomicUsize>,
136}
137
138impl CountingObjectStore {
139 pub fn new(target: Arc<dyn ObjectStore>, listing_count: Arc<AtomicUsize>) -> Self {
140 Self {
141 target,
142 listing_count,
143 }
144 }
145
146 fn record_listing(&self) {
147 self.listing_count.fetch_add(1, Ordering::SeqCst);
148 }
149
150 fn delegate_list(
151 &self,
152 prefix: Option<&Path>,
153 ) -> Pin<Box<dyn futures::Stream<Item = OSResult<ObjectMeta>> + Send>> {
154 self.target.list(prefix)
155 }
156}
157
158impl std::fmt::Display for CountingObjectStore {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 write!(f, "CountingObjectStore({})", self.target)
161 }
162}
163
164#[async_trait]
165impl ObjectStore for ProxyObjectStore {
166 async fn put_opts(
167 &self,
168 location: &Path,
169 bytes: PutPayload,
170 opts: PutOptions,
171 ) -> OSResult<PutResult> {
172 self.before_method("put", location)?;
173 self.target.put_opts(location, bytes, opts).await
174 }
175
176 async fn put_multipart_opts(
177 &self,
178 location: &Path,
179 opts: PutMultipartOptions,
180 ) -> OSResult<Box<dyn MultipartUpload>> {
181 self.before_method("put_multipart", location)?;
182 self.target.put_multipart_opts(location, opts).await
183 }
184
185 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
186 self.before_method("get_opts", location)?;
187 self.target.get_opts(location, options).await
188 }
189
190 async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
191 self.before_method("get_range", location)?;
192 self.target.get_range(location, range).await
193 }
194
195 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
196 self.before_method("get_ranges", location)?;
197 self.target.get_ranges(location, ranges).await
198 }
199
200 async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
201 self.before_method("head", location)?;
202 let meta = self.target.head(location).await?;
203 self.transform_meta("head", meta)
204 }
205
206 async fn delete(&self, location: &Path) -> OSResult<()> {
207 self.before_method("delete", location)?;
208 self.target.delete(location).await
209 }
210
211 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
212 let target = self.target.clone();
213 let policy = Arc::clone(&self.policy);
214
215 target
216 .list(prefix)
217 .and_then(move |meta| {
218 let policy = policy.lock().unwrap();
219 let mut meta = meta;
220 for p in policy.object_meta_policies.values() {
221 meta = p("list", meta).map_err(OSError::from).unwrap();
222 }
223 future::ready(Ok(meta))
224 })
225 .boxed()
226 }
227
228 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
229 self.target.list_with_delimiter(prefix).await
230 }
231
232 async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
233 self.before_method("copy", from)?;
234 self.target.copy(from, to).await
235 }
236
237 async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
238 self.before_method("rename", from)?;
239 self.target.rename(from, to).await
240 }
241
242 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
243 self.before_method("copy_if_not_exists", from)?;
244 self.target.copy_if_not_exists(from, to).await
245 }
246}
247
248#[async_trait]
249impl ObjectStore for CountingObjectStore {
250 async fn put_opts(
251 &self,
252 location: &Path,
253 bytes: PutPayload,
254 opts: PutOptions,
255 ) -> OSResult<PutResult> {
256 self.target.put_opts(location, bytes, opts).await
257 }
258
259 async fn put_multipart_opts(
260 &self,
261 location: &Path,
262 opts: PutMultipartOptions,
263 ) -> OSResult<Box<dyn MultipartUpload>> {
264 self.target.put_multipart_opts(location, opts).await
265 }
266
267 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
268 self.target.get_opts(location, options).await
269 }
270
271 async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
272 self.target.get_range(location, range).await
273 }
274
275 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
276 self.target.get_ranges(location, ranges).await
277 }
278
279 async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
280 self.target.head(location).await
281 }
282
283 async fn delete(&self, location: &Path) -> OSResult<()> {
284 self.target.delete(location).await
285 }
286
287 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
288 self.record_listing();
289 self.delegate_list(prefix).boxed()
290 }
291
292 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
293 self.record_listing();
294 self.target.list_with_delimiter(prefix).await
295 }
296
297 async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
298 self.target.copy(from, to).await
299 }
300
301 async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
302 self.target.rename(from, to).await
303 }
304
305 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
306 self.target.copy_if_not_exists(from, to).await
307 }
308}