1use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13 Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
14 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
15};
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::future;
19use std::ops::Range;
20use std::sync::{Arc, Mutex};
21
22pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
25impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
26impl Debug for dyn PolicyFnT {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 write!(f, "PolicyFn")
29 }
30}
31type PolicyFn = Arc<dyn PolicyFnT>;
32
33pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
36impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
37impl Debug for dyn ObjectMetaPolicyFnT {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "PolicyFn")
40 }
41}
42type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
43
44#[derive(Debug, Default)]
53pub struct ProxyObjectStorePolicy {
54 before_policies: HashMap<String, PolicyFn>,
58 object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
61}
62
63impl ProxyObjectStorePolicy {
64 pub fn new() -> Self {
65 Default::default()
66 }
67
68 pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
72 self.before_policies.insert(name.to_string(), policy);
73 }
74
75 pub fn clear_before_policy(&mut self, name: &str) {
76 self.before_policies.remove(name);
77 }
78
79 pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
80 self.object_meta_policies.insert(name.to_string(), policy);
81 }
82}
83
84#[derive(Debug)]
90pub struct ProxyObjectStore {
91 target: Arc<dyn ObjectStore>,
92 policy: Arc<Mutex<ProxyObjectStorePolicy>>,
93}
94
95impl ProxyObjectStore {
96 pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
97 Self { target, policy }
98 }
99
100 fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
101 let policy = self.policy.lock().unwrap();
102 for policy in policy.before_policies.values() {
103 policy(method, location).map_err(OSError::from)?;
104 }
105 Ok(())
106 }
107
108 fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
109 let policy = self.policy.lock().unwrap();
110 let mut meta = meta;
111 for policy in policy.object_meta_policies.values() {
112 meta = policy(method, meta).map_err(OSError::from)?;
113 }
114 Ok(meta)
115 }
116}
117
118impl std::fmt::Display for ProxyObjectStore {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 write!(f, "ProxyObjectStore({})", self.target)
121 }
122}
123
124#[async_trait]
125impl ObjectStore for ProxyObjectStore {
126 async fn put_opts(
127 &self,
128 location: &Path,
129 bytes: PutPayload,
130 opts: PutOptions,
131 ) -> OSResult<PutResult> {
132 self.before_method("put", location)?;
133 self.target.put_opts(location, bytes, opts).await
134 }
135
136 async fn put_multipart_opts(
137 &self,
138 location: &Path,
139 opts: PutMultipartOptions,
140 ) -> OSResult<Box<dyn MultipartUpload>> {
141 self.before_method("put_multipart", location)?;
142 self.target.put_multipart_opts(location, opts).await
143 }
144
145 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
146 self.before_method("get_opts", location)?;
147 self.target.get_opts(location, options).await
148 }
149
150 async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
151 self.before_method("get_range", location)?;
152 self.target.get_range(location, range).await
153 }
154
155 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
156 self.before_method("get_ranges", location)?;
157 self.target.get_ranges(location, ranges).await
158 }
159
160 async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
161 self.before_method("head", location)?;
162 let meta = self.target.head(location).await?;
163 self.transform_meta("head", meta)
164 }
165
166 async fn delete(&self, location: &Path) -> OSResult<()> {
167 self.before_method("delete", location)?;
168 self.target.delete(location).await
169 }
170
171 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
172 let target = self.target.clone();
173 let policy = Arc::clone(&self.policy);
174
175 target
176 .list(prefix)
177 .and_then(move |meta| {
178 let policy = policy.lock().unwrap();
179 let mut meta = meta;
180 for p in policy.object_meta_policies.values() {
181 meta = p("list", meta).map_err(OSError::from).unwrap();
182 }
183 future::ready(Ok(meta))
184 })
185 .boxed()
186 }
187
188 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
189 self.target.list_with_delimiter(prefix).await
190 }
191
192 async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
193 self.before_method("copy", from)?;
194 self.target.copy(from, to).await
195 }
196
197 async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
198 self.before_method("rename", from)?;
199 self.target.rename(from, to).await
200 }
201
202 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
203 self.before_method("copy_if_not_exists", from)?;
204 self.target.copy_if_not_exists(from, to).await
205 }
206}