Skip to main content

lance_core/utils/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Testing utilities
5
6use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13    CopyOptions, Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
14    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions,
15    Result as OSResult,
16};
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::future;
20use std::ops::Range;
21use std::pin::Pin;
22use std::sync::{
23    Arc, Mutex,
24    atomic::{AtomicUsize, Ordering},
25};
26
27// A policy function takes in the name of the operation (e.g. "put") and the location
28// that is being accessed / modified and returns an optional error.
29pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
30impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
31impl Debug for dyn PolicyFnT {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "PolicyFn")
34    }
35}
36type PolicyFn = Arc<dyn PolicyFnT>;
37
38// These policy functions receive (and optionally transform) an ObjectMeta
39// They apply to functions that list file info
40pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
41impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
42impl Debug for dyn ObjectMetaPolicyFnT {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        write!(f, "PolicyFn")
45    }
46}
47type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
48
49/// A policy container, meant to be shared between test code and the proxy object store.
50///
51/// This container allows you to configure policies that should apply to the proxied calls.
52///
53/// Typically, you would use this to simulate I/O errors or mock out data.
54///
55/// Currently, for simplicity, we only proxy calls that involve some kind of path.  Calls
56/// to copy functions, which have a src and dst, will provide the source to the policy
57#[derive(Debug, Default)]
58pub struct ProxyObjectStorePolicy {
59    /// Policies which run before a method is invoked.  If the policy returns
60    /// an error then the target method will not be invoked and the error will
61    /// be returned instead.
62    before_policies: HashMap<String, PolicyFn>,
63    /// Policies which run after calls that return ObjectMeta.  The policy can
64    /// transform the returned ObjectMeta to mock out file listing results.
65    object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
66}
67
68impl ProxyObjectStorePolicy {
69    pub fn new() -> Self {
70        Default::default()
71    }
72
73    /// Set a new policy with the given name
74    ///
75    /// The name can be used to later remove this policy
76    pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
77        self.before_policies.insert(name.to_string(), policy);
78    }
79
80    pub fn clear_before_policy(&mut self, name: &str) {
81        self.before_policies.remove(name);
82    }
83
84    pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
85        self.object_meta_policies.insert(name.to_string(), policy);
86    }
87}
88
89/// A proxy object store
90///
91/// This store wraps another object store and applies the given policy to all calls
92/// made to the underlying store.  This can be used to simulate failures or, perhaps
93/// in the future, to mock out results or provide other fine-grained control.
94#[derive(Debug)]
95pub struct ProxyObjectStore {
96    target: Arc<dyn ObjectStore>,
97    policy: Arc<Mutex<ProxyObjectStorePolicy>>,
98}
99
100impl ProxyObjectStore {
101    pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
102        Self { target, policy }
103    }
104
105    fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
106        let policy = self.policy.lock().unwrap();
107        for policy in policy.before_policies.values() {
108            policy(method, location).map_err(OSError::from)?;
109        }
110        Ok(())
111    }
112
113    fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
114        let policy = self.policy.lock().unwrap();
115        let mut meta = meta;
116        for policy in policy.object_meta_policies.values() {
117            meta = policy(method, meta).map_err(OSError::from)?;
118        }
119        Ok(meta)
120    }
121}
122
123impl std::fmt::Display for ProxyObjectStore {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        write!(f, "ProxyObjectStore({})", self.target)
126    }
127}
128
129/// An object store wrapper that counts listing operations.
130///
131/// This increments the shared counter for both `list` and `list_with_delimiter`
132/// so tests can observe all listing-based directory and version discovery calls.
133#[derive(Debug)]
134pub struct CountingObjectStore {
135    target: Arc<dyn ObjectStore>,
136    listing_count: Arc<AtomicUsize>,
137}
138
139impl CountingObjectStore {
140    pub fn new(target: Arc<dyn ObjectStore>, listing_count: Arc<AtomicUsize>) -> Self {
141        Self {
142            target,
143            listing_count,
144        }
145    }
146
147    fn record_listing(&self) {
148        self.listing_count.fetch_add(1, Ordering::SeqCst);
149    }
150
151    fn delegate_list(
152        &self,
153        prefix: Option<&Path>,
154    ) -> Pin<Box<dyn futures::Stream<Item = OSResult<ObjectMeta>> + Send>> {
155        self.target.list(prefix)
156    }
157}
158
159impl std::fmt::Display for CountingObjectStore {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f, "CountingObjectStore({})", self.target)
162    }
163}
164
165#[async_trait]
166impl ObjectStore for ProxyObjectStore {
167    async fn put_opts(
168        &self,
169        location: &Path,
170        bytes: PutPayload,
171        opts: PutOptions,
172    ) -> OSResult<PutResult> {
173        self.before_method("put", location)?;
174        self.target.put_opts(location, bytes, opts).await
175    }
176
177    async fn put_multipart_opts(
178        &self,
179        location: &Path,
180        opts: PutMultipartOptions,
181    ) -> OSResult<Box<dyn MultipartUpload>> {
182        self.before_method("put_multipart", location)?;
183        self.target.put_multipart_opts(location, opts).await
184    }
185
186    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
187        self.before_method("get_opts", location)?;
188        let is_head = options.head;
189        let mut result = self.target.get_opts(location, options).await?;
190        if is_head {
191            result.meta = self.transform_meta("head", result.meta)?;
192        }
193        Ok(result)
194    }
195
196    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
197        self.before_method("get_ranges", location)?;
198        self.target.get_ranges(location, ranges).await
199    }
200
201    fn delete_stream(
202        &self,
203        locations: BoxStream<'static, OSResult<Path>>,
204    ) -> BoxStream<'static, OSResult<Path>> {
205        let policy = Arc::clone(&self.policy);
206        let checked = locations
207            .and_then(move |location| {
208                let result = {
209                    let policy = policy.lock().unwrap();
210                    policy
211                        .before_policies
212                        .values()
213                        .try_for_each(|policy| policy("delete", &location).map_err(OSError::from))
214                };
215                future::ready(result.map(|_| location))
216            })
217            .boxed();
218        self.target.delete_stream(checked)
219    }
220
221    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
222        let target = self.target.clone();
223        let policy = Arc::clone(&self.policy);
224
225        target
226            .list(prefix)
227            .and_then(move |meta| {
228                let policy = policy.lock().unwrap();
229                let mut meta = meta;
230                for p in policy.object_meta_policies.values() {
231                    meta = p("list", meta).map_err(OSError::from).unwrap();
232                }
233                future::ready(Ok(meta))
234            })
235            .boxed()
236    }
237
238    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
239        self.target.list_with_delimiter(prefix).await
240    }
241
242    async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
243        self.before_method("copy", from)?;
244        self.target.copy_opts(from, to, opts).await
245    }
246
247    async fn rename_opts(&self, from: &Path, to: &Path, opts: RenameOptions) -> OSResult<()> {
248        self.before_method("rename", from)?;
249        self.target.rename_opts(from, to, opts).await
250    }
251}
252
253#[async_trait]
254impl ObjectStore for CountingObjectStore {
255    async fn put_opts(
256        &self,
257        location: &Path,
258        bytes: PutPayload,
259        opts: PutOptions,
260    ) -> OSResult<PutResult> {
261        self.target.put_opts(location, bytes, opts).await
262    }
263
264    async fn put_multipart_opts(
265        &self,
266        location: &Path,
267        opts: PutMultipartOptions,
268    ) -> OSResult<Box<dyn MultipartUpload>> {
269        self.target.put_multipart_opts(location, opts).await
270    }
271
272    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
273        self.target.get_opts(location, options).await
274    }
275
276    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
277        self.target.get_ranges(location, ranges).await
278    }
279
280    fn delete_stream(
281        &self,
282        locations: BoxStream<'static, OSResult<Path>>,
283    ) -> BoxStream<'static, OSResult<Path>> {
284        self.target.delete_stream(locations)
285    }
286
287    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
288        self.record_listing();
289        self.delegate_list(prefix).boxed()
290    }
291
292    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
293        self.record_listing();
294        self.target.list_with_delimiter(prefix).await
295    }
296
297    async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> {
298        self.target.copy_opts(from, to, opts).await
299    }
300}