lance_core/utils/testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Testing utilities
5
6use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use object_store::path::Path;
12use object_store::{
13    Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
14    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult,
15};
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::future;
19use std::ops::Range;
20use std::sync::{Arc, Mutex};
21
22// A policy function takes in the name of the operation (e.g. "put") and the location
23// that is being accessed / modified and returns an optional error.
24pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
25impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
26impl Debug for dyn PolicyFnT {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        write!(f, "PolicyFn")
29    }
30}
31type PolicyFn = Arc<dyn PolicyFnT>;
32
33// These policy functions receive (and optionally transform) an ObjectMeta
34// They apply to functions that list file info
35pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
36impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
37impl Debug for dyn ObjectMetaPolicyFnT {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "PolicyFn")
40    }
41}
42type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
43
44/// A policy container, meant to be shared between test code and the proxy object store.
45///
46/// This container allows you to configure policies that should apply to the proxied calls.
47///
48/// Typically, you would use this to simulate I/O errors or mock out data.
49///
50/// Currently, for simplicity, we only proxy calls that involve some kind of path.  Calls
51/// to copy functions, which have a src and dst, will provide the source to the policy
52#[derive(Debug, Default)]
53pub struct ProxyObjectStorePolicy {
54    /// Policies which run before a method is invoked.  If the policy returns
55    /// an error then the target method will not be invoked and the error will
56    /// be returned instead.
57    before_policies: HashMap<String, PolicyFn>,
58    /// Policies which run after calls that return ObjectMeta.  The policy can
59    /// transform the returned ObjectMeta to mock out file listing results.
60    object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
61}
62
63impl ProxyObjectStorePolicy {
64    pub fn new() -> Self {
65        Default::default()
66    }
67
68    /// Set a new policy with the given name
69    ///
70    /// The name can be used to later remove this policy
71    pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
72        self.before_policies.insert(name.to_string(), policy);
73    }
74
75    pub fn clear_before_policy(&mut self, name: &str) {
76        self.before_policies.remove(name);
77    }
78
79    pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
80        self.object_meta_policies.insert(name.to_string(), policy);
81    }
82}
83
84/// A proxy object store
85///
86/// This store wraps another object store and applies the given policy to all calls
87/// made to the underlying store.  This can be used to simulate failures or, perhaps
88/// in the future, to mock out results or provide other fine-grained control.
89#[derive(Debug)]
90pub struct ProxyObjectStore {
91    target: Arc<dyn ObjectStore>,
92    policy: Arc<Mutex<ProxyObjectStorePolicy>>,
93}
94
95impl ProxyObjectStore {
96    pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
97        Self { target, policy }
98    }
99
100    fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
101        let policy = self.policy.lock().unwrap();
102        for policy in policy.before_policies.values() {
103            policy(method, location).map_err(OSError::from)?;
104        }
105        Ok(())
106    }
107
108    fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
109        let policy = self.policy.lock().unwrap();
110        let mut meta = meta;
111        for policy in policy.object_meta_policies.values() {
112            meta = policy(method, meta).map_err(OSError::from)?;
113        }
114        Ok(meta)
115    }
116}
117
118impl std::fmt::Display for ProxyObjectStore {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        write!(f, "ProxyObjectStore({})", self.target)
121    }
122}
123
124#[async_trait]
125impl ObjectStore for ProxyObjectStore {
126    async fn put_opts(
127        &self,
128        location: &Path,
129        bytes: PutPayload,
130        opts: PutOptions,
131    ) -> OSResult<PutResult> {
132        self.before_method("put", location)?;
133        self.target.put_opts(location, bytes, opts).await
134    }
135
136    async fn put_multipart_opts(
137        &self,
138        location: &Path,
139        opts: PutMultipartOptions,
140    ) -> OSResult<Box<dyn MultipartUpload>> {
141        self.before_method("put_multipart", location)?;
142        self.target.put_multipart_opts(location, opts).await
143    }
144
145    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
146        self.before_method("get_opts", location)?;
147        self.target.get_opts(location, options).await
148    }
149
150    async fn get_range(&self, location: &Path, range: Range<u64>) -> OSResult<Bytes> {
151        self.before_method("get_range", location)?;
152        self.target.get_range(location, range).await
153    }
154
155    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
156        self.before_method("get_ranges", location)?;
157        self.target.get_ranges(location, ranges).await
158    }
159
160    async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
161        self.before_method("head", location)?;
162        let meta = self.target.head(location).await?;
163        self.transform_meta("head", meta)
164    }
165
166    async fn delete(&self, location: &Path) -> OSResult<()> {
167        self.before_method("delete", location)?;
168        self.target.delete(location).await
169    }
170
171    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
172        let target = self.target.clone();
173        let policy = Arc::clone(&self.policy);
174
175        target
176            .list(prefix)
177            .and_then(move |meta| {
178                let policy = policy.lock().unwrap();
179                let mut meta = meta;
180                for p in policy.object_meta_policies.values() {
181                    meta = p("list", meta).map_err(OSError::from).unwrap();
182                }
183                future::ready(Ok(meta))
184            })
185            .boxed()
186    }
187
188    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
189        self.target.list_with_delimiter(prefix).await
190    }
191
192    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
193        self.before_method("copy", from)?;
194        self.target.copy(from, to).await
195    }
196
197    async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
198        self.before_method("rename", from)?;
199        self.target.rename(from, to).await
200    }
201
202    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
203        self.before_method("copy_if_not_exists", from)?;
204        self.target.copy_if_not_exists(from, to).await
205    }
206}