Skip to main content

lance_core/utils/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Testing utilities
5
6use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13    Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
14    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
15};
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::future;
19use std::ops::Range;
20use std::pin::Pin;
21use std::sync::{
22    Arc, Mutex,
23    atomic::{AtomicUsize, Ordering},
24};
25
26// A policy function takes in the name of the operation (e.g. "put") and the location
27// that is being accessed / modified and returns an optional error.
28pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
29impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
30impl Debug for dyn PolicyFnT {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        write!(f, "PolicyFn")
33    }
34}
35type PolicyFn = Arc<dyn PolicyFnT>;
36
37// These policy functions receive (and optionally transform) an ObjectMeta
38// They apply to functions that list file info
39pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
40impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
41impl Debug for dyn ObjectMetaPolicyFnT {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        write!(f, "PolicyFn")
44    }
45}
46type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
47
48/// A policy container, meant to be shared between test code and the proxy object store.
49///
50/// This container allows you to configure policies that should apply to the proxied calls.
51///
52/// Typically, you would use this to simulate I/O errors or mock out data.
53///
54/// Currently, for simplicity, we only proxy calls that involve some kind of path.  Calls
55/// to copy functions, which have a src and dst, will provide the source to the policy
56#[derive(Debug, Default)]
57pub struct ProxyObjectStorePolicy {
58    /// Policies which run before a method is invoked.  If the policy returns
59    /// an error then the target method will not be invoked and the error will
60    /// be returned instead.
61    before_policies: HashMap<String, PolicyFn>,
62    /// Policies which run after calls that return ObjectMeta.  The policy can
63    /// transform the returned ObjectMeta to mock out file listing results.
64    object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
65}
66
67impl ProxyObjectStorePolicy {
68    pub fn new() -> Self {
69        Default::default()
70    }
71
72    /// Set a new policy with the given name
73    ///
74    /// The name can be used to later remove this policy
75    pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
76        self.before_policies.insert(name.to_string(), policy);
77    }
78
79    pub fn clear_before_policy(&mut self, name: &str) {
80        self.before_policies.remove(name);
81    }
82
83    pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
84        self.object_meta_policies.insert(name.to_string(), policy);
85    }
86}
87
88/// A proxy object store
89///
90/// This store wraps another object store and applies the given policy to all calls
91/// made to the underlying store.  This can be used to simulate failures or, perhaps
92/// in the future, to mock out results or provide other fine-grained control.
93#[derive(Debug)]
94pub struct ProxyObjectStore {
95    target: Arc<dyn ObjectStore>,
96    policy: Arc<Mutex<ProxyObjectStorePolicy>>,
97}
98
99impl ProxyObjectStore {
100    pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
101        Self { target, policy }
102    }
103
104    fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
105        let policy = self.policy.lock().unwrap();
106        for policy in policy.before_policies.values() {
107            policy(method, location).map_err(OSError::from)?;
108        }
109        Ok(())
110    }
111
112    fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
113        let policy = self.policy.lock().unwrap();
114        let mut meta = meta;
115        for policy in policy.object_meta_policies.values() {
116            meta = policy(method, meta).map_err(OSError::from)?;
117        }
118        Ok(meta)
119    }
120}
121
122impl std::fmt::Display for ProxyObjectStore {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        write!(f, "ProxyObjectStore({})", self.target)
125    }
126}
127
128/// An object store wrapper that counts listing operations.
129///
130/// This increments the shared counter for both `list` and `list_with_delimiter`
131/// so tests can observe all listing-based directory and version discovery calls.
132#[derive(Debug)]
133pub struct CountingObjectStore {
134    target: Arc<dyn ObjectStore>,
135    listing_count: Arc<AtomicUsize>,
136}
137
138impl CountingObjectStore {
139    pub fn new(target: Arc<dyn ObjectStore>, listing_count: Arc<AtomicUsize>) -> Self {
140        Self {
141            target,
142            listing_count,
143        }
144    }
145
146    fn record_listing(&self) {
147        self.listing_count.fetch_add(1, Ordering::SeqCst);
148    }
149
150    fn delegate_list(
151        &self,
152        prefix: Option<&Path>,
153    ) -> Pin<Box<dyn futures::Stream<Item = OSResult<ObjectMeta>> + Send>> {
154        self.target.list(prefix)
155    }
156}
157
158impl std::fmt::Display for CountingObjectStore {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        write!(f, "CountingObjectStore({})", self.target)
161    }
162}
163
164#[async_trait]
165impl ObjectStore for ProxyObjectStore {
166    async fn put_opts(
167        &self,
168        location: &Path,
169        bytes: PutPayload,
170        opts: PutOptions,
171    ) -> OSResult<PutResult> {
172        self.before_method("put", location)?;
173        self.target.put_opts(location, bytes, opts).await
174    }
175
176    async fn put_multipart_opts(
177        &self,
178        location: &Path,
179        opts: PutMultipartOptions,
180    ) -> OSResult<Box<dyn MultipartUpload>> {
181        self.before_method("put_multipart", location)?;
182        self.target.put_multipart_opts(location, opts).await
183    }
184
185    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
186        self.before_method("get_opts", location)?;
187        self.target.get_opts(location, options).await
188    }
189
190    async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
191        self.before_method("get_range", location)?;
192        self.target.get_range(location, range).await
193    }
194
195    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
196        self.before_method("get_ranges", location)?;
197        self.target.get_ranges(location, ranges).await
198    }
199
200    async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
201        self.before_method("head", location)?;
202        let meta = self.target.head(location).await?;
203        self.transform_meta("head", meta)
204    }
205
206    async fn delete(&self, location: &Path) -> OSResult<()> {
207        self.before_method("delete", location)?;
208        self.target.delete(location).await
209    }
210
211    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
212        let target = self.target.clone();
213        let policy = Arc::clone(&self.policy);
214
215        target
216            .list(prefix)
217            .and_then(move |meta| {
218                let policy = policy.lock().unwrap();
219                let mut meta = meta;
220                for p in policy.object_meta_policies.values() {
221                    meta = p("list", meta).map_err(OSError::from).unwrap();
222                }
223                future::ready(Ok(meta))
224            })
225            .boxed()
226    }
227
228    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
229        self.target.list_with_delimiter(prefix).await
230    }
231
232    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
233        self.before_method("copy", from)?;
234        self.target.copy(from, to).await
235    }
236
237    async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
238        self.before_method("rename", from)?;
239        self.target.rename(from, to).await
240    }
241
242    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
243        self.before_method("copy_if_not_exists", from)?;
244        self.target.copy_if_not_exists(from, to).await
245    }
246}
247
248#[async_trait]
249impl ObjectStore for CountingObjectStore {
250    async fn put_opts(
251        &self,
252        location: &Path,
253        bytes: PutPayload,
254        opts: PutOptions,
255    ) -> OSResult<PutResult> {
256        self.target.put_opts(location, bytes, opts).await
257    }
258
259    async fn put_multipart_opts(
260        &self,
261        location: &Path,
262        opts: PutMultipartOptions,
263    ) -> OSResult<Box<dyn MultipartUpload>> {
264        self.target.put_multipart_opts(location, opts).await
265    }
266
267    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
268        self.target.get_opts(location, options).await
269    }
270
271    async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
272        self.target.get_range(location, range).await
273    }
274
275    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
276        self.target.get_ranges(location, ranges).await
277    }
278
279    async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
280        self.target.head(location).await
281    }
282
283    async fn delete(&self, location: &Path) -> OSResult<()> {
284        self.target.delete(location).await
285    }
286
287    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
288        self.record_listing();
289        self.delegate_list(prefix).boxed()
290    }
291
292    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
293        self.record_listing();
294        self.target.list_with_delimiter(prefix).await
295    }
296
297    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
298        self.target.copy(from, to).await
299    }
300
301    async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
302        self.target.rename(from, to).await
303    }
304
305    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
306        self.target.copy_if_not_exists(from, to).await
307    }
308}