tantivy_object_store/lib.rs

// Copyright 2023 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Object store (S3, GCS, etc.) support for tantivy.
use async_trait::async_trait;
use log::debug;
use object_store::{local::LocalFileSystem, ObjectStore};
use std::{
    fs::File,
    io::Write,
    ops::Range,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
};
use tantivy::{
    directory::{
        self,
        error::{DeleteError, LockError, OpenReadError, OpenWriteError},
        AntiCallToken, Directory, DirectoryLock, FileHandle, OwnedBytes, TerminatingWrite,
        WatchCallback, WatchHandle, WritePtr,
    },
    HasLen,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

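/// A tantivy [`Directory`] backed by an [`ObjectStore`].
///
/// `read_version` and `write_version` select which versioned copy of
/// `meta.json` is read and written (see [`new_object_store_directory`]).
/// Everything else is written through the local cache at `cache_loc` and
/// uploaded on terminate. A dedicated tokio runtime bridges tantivy's
/// synchronous API to the async object store.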
#[derive(Debug, Clone)]
struct ObjectStoreDirectory {
    store: Arc<dyn ObjectStore>,
    base_path: String,
    read_version: Option<u64>,
    write_version: u64,

    cache_loc: Arc<PathBuf>,

    local_fs: Arc<LocalFileSystem>,

    rt: Arc<tokio::runtime::Runtime>,
    atomic_rw_lock: Arc<Mutex<()>>,
}

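/// A read-only [`FileHandle`] that serves tantivy range reads via
/// [`ObjectStore::get_range`].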
#[derive(Debug)]
struct ObjectStoreFileHandle {
    store: Arc<dyn ObjectStore>,
    path: object_store::path::Path,
    // We cache the length because the `HasLen` trait doesn't return a `Result`,
    // so the IO has to happen at construction time.
    len: usize,

    rt: Arc<tokio::runtime::Runtime>,
}

impl ObjectStoreFileHandle {
    pub fn new(
        store: Arc<dyn ObjectStore>,
        path: object_store::path::Path,
        len: usize,
        rt: Arc<tokio::runtime::Runtime>,
    ) -> Self {
        Self {
            store,
            path,
            len,
            rt,
        }
    }
}

impl HasLen for ObjectStoreFileHandle {
    fn len(&self) -> usize {
        self.len
    }
}

#[async_trait]
impl FileHandle for ObjectStoreFileHandle {
    fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
        // Bridge tantivy's synchronous read API onto the async object store
        let handle = self.rt.handle();
        handle.block_on(async { self.read_bytes_async(range).await })
    }

    #[doc(hidden)]
    async fn read_bytes_async(&self, byte_range: Range<usize>) -> std::io::Result<OwnedBytes> {
        debug!("read_bytes_async: {:?} {:?}", self.path, byte_range);
        let bytes = self.store.get_range(&self.path, byte_range).await?;

        Ok(OwnedBytes::new(bytes.to_vec()))
    }
}

/// A naive write handle: writes are buffered to a local file, and the whole
/// file is uploaded to the object store in one shot when the writer
/// terminates.
struct ObjectStoreWriteHandle {
    store: Arc<dyn ObjectStore>,
    location: object_store::path::Path,
    local_path: Arc<PathBuf>,

    write_handle: File,
    shutdown: AtomicBool,
    rt: Arc<tokio::runtime::Runtime>,
}

impl ObjectStoreWriteHandle {
    pub fn new(
        store: Arc<dyn ObjectStore>,
        location: object_store::path::Path,
        cache_loc: Arc<PathBuf>,
        rt: Arc<tokio::runtime::Runtime>,
    ) -> Result<Self, std::io::Error> {
        let local_path = cache_loc.join(location.as_ref());
        debug!("creating write handle for {:?}", local_path);
        let path = Path::new(&local_path);
        // create the necessary dir path for caching
        std::fs::create_dir_all(path.parent().ok_or(std::io::Error::new(
            std::io::ErrorKind::Other,
            "unable to create parent dir for cache",
        ))?)?;
        let f = File::create(&local_path)?;

        Ok(Self {
            store,
            location,
            local_path: Arc::new(local_path),
            write_handle: f,
            shutdown: AtomicBool::new(false),
            rt,
        })
    }
}

impl Write for ObjectStoreWriteHandle {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        if self.shutdown.load(Ordering::SeqCst) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "write handle has been shut down",
            ));
        }

        self.write_handle.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        if self.shutdown.load(Ordering::SeqCst) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "write handle has been shut down",
            ));
        }

        self.write_handle.flush()
    }
}

impl TerminatingWrite for ObjectStoreWriteHandle {
    fn terminate_ref(&mut self, _: AntiCallToken) -> std::io::Result<()> {
        // Flush the local cache file, then refuse any further writes
        let flushed = self.flush();
        self.shutdown.store(true, Ordering::SeqCst);
        // Don't upload a file we failed to flush completely
        flushed?;

        let result: Result<(), std::io::Error> = self.rt.block_on(async {
            let mut f = tokio::fs::File::open(self.local_path.as_path()).await?;

            let (_, mut sink) = self.store.put_multipart(&self.location).await?;
            // Stream the cached file into the multipart upload in 1 MiB blocks
            let mut buf = vec![0; 1024 * 1024];

            loop {
                let n = f.read(&mut buf).await?;
                if n == 0 {
                    break;
                }
                sink.write_all(&buf[..n]).await?;
            }

            sink.shutdown().await?;

            Ok(())
        });

        result
    }
}

impl ObjectStoreDirectory {
    /// Resolve a tantivy-relative path to its location under `base_path` in
    /// the object store.
    fn to_object_path(&self, path: &Path) -> Result<object_store::path::Path, std::io::Error> {
        let p = path
            .to_str()
            .ok_or(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "non-utf8 path",
            ))?
            .to_string();

        Ok(object_store::path::Path::from(format!(
            "{}/{}",
            self.base_path, p
        )))
    }

    fn head(&self, path: &Path) -> Result<object_store::ObjectMeta, OpenReadError> {
        let location = self
            .to_object_path(path)
            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
        let handle = self.rt.handle();
        handle
            .block_on(async { self.store.head(&location).await })
            .map_err(|e| match e {
                object_store::Error::NotFound { .. } => {
                    OpenReadError::FileDoesNotExist(path.to_path_buf())
                }
                _ => OpenReadError::wrap_io_error(
                    std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e)),
                    path.to_path_buf(),
                ),
            })
    }
}

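/// A lock guard that does nothing: read/write safety is enforced by the
/// caller rather than by file locks (see `acquire_lock` below).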
trait Lock: Send + Sync + 'static {}
struct NoOpLock {}
impl NoOpLock {
    pub fn new() -> Box<Self> {
        Box::new(Self {})
    }
}
impl Lock for NoOpLock {}

impl Directory for ObjectStoreDirectory {
    fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        debug!("get_file_handle({:?})", path);
        let location = self
            .to_object_path(path)
            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;

        // Check if the file is in the local upload cache
        let cache_path = self.cache_loc.join(location.as_ref());
        let cache_path = object_store::path::Path::from(cache_path.to_string_lossy().as_ref());
        if let Ok(meta) = self
            .rt
            .block_on(async { self.local_fs.head(&cache_path).await })
        {
            debug!("upload cache hit: {:?}", path);
            return Ok(Arc::new(ObjectStoreFileHandle::new(
                self.local_fs.clone(),
                cache_path,
                meta.size,
                self.rt.clone(),
            )));
        }

        let len = self.head(path)?.size;

        Ok(Arc::new(ObjectStoreFileHandle::new(
            self.store.clone(),
            location,
            len,
            self.rt.clone(),
        )))
    }

    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        debug!("atomic_read({:?})", path);
        if path != Path::new("meta.json") && path != Path::new(".managed.json") {
            // Just blow up
            unimplemented!(
                "Only meta.json and .managed.json are supported, but got {:?}",
                path
            )
        }

        // Inject versioning into the path -- we want to enforce CoW here.
        // If the write version exists, the read version has no effect;
        // if the write version doesn't exist, we read from the old version.

        let buf = path.to_string_lossy();
        let path_str = format!("{}.{}", buf, self.write_version);

        // The write version already exists, so read from it
        if let Ok(f) = self.get_file_handle(Path::new(&path_str)) {
            return Ok(f
                .read_bytes(0..f.len())
                .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?
                .to_vec());
        }

        // No read version to copy from, return DNE
        if self.read_version.is_none() {
            return Err(OpenReadError::FileDoesNotExist(path.to_path_buf()));
        }

        let buf = path.to_string_lossy();
        let path_str = format!(
            "{}.{}",
            buf,
            self.read_version.expect("already checked exists")
        );
        let path = Path::new(&path_str);

        // Lock so no one can write a dirty version
        let _lock = self.atomic_rw_lock.lock().unwrap();
        let f = self.get_file_handle(path)?;
        Ok(f.read_bytes(0..f.len())
            .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?
            .to_vec())
    }

    fn exists(&self, path: &std::path::Path) -> Result<bool, OpenReadError> {
        match self.head(path) {
            Ok(_) => Ok(true),
            Err(OpenReadError::FileDoesNotExist(_)) => Ok(false),
            Err(e) => Err(e),
        }
    }

    // NOTE: the only thing that needs atomic writes is the meta.json file.
    // We add versioning in this interface so that different versions of the
    // meta.json file can be loaded at Directory construction time.
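    // For example, with write_version = 3, meta.json is stored in the object
    // store as `{base_path}/meta.json.3`.
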
    fn atomic_write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
        debug!("atomic_write({:?})", path);
        if path != Path::new("meta.json") && path != Path::new(".managed.json") {
            // Just blow up
            unimplemented!(
                "Only meta.json and .managed.json are supported, but got {:?}",
                path
            )
        }

        // Inject versioning into the path
        let buf = path.to_string_lossy();
        let path_str = format!("{}.{}", buf, self.write_version);
        let path = Path::new(&path_str);

        let location = self.to_object_path(path)?;

        debug!("true location: {:?}", location);

        // Lock so no one can read a dirty version
        let _lock = self.atomic_rw_lock.lock().unwrap();
        self.rt.handle().block_on(async {
            self.store
                .put(&location, bytes::Bytes::from(data.to_vec()))
                .await
        })?;

        Ok(())
    }

    fn delete(&self, _: &Path) -> Result<(), DeleteError> {
        // Don't actually garbage collect: we want to keep old versions of the
        // meta.json file around.
        Ok(())
    }

    fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
        debug!("open_write({:?})", path);
        let location = self
            .to_object_path(path)
            .map_err(|e| OpenWriteError::wrap_io_error(e, path.to_path_buf()))?;

        debug!("true location: {:?}", location);

        let write_handle = Box::new(
            ObjectStoreWriteHandle::new(
                self.store.clone(),
                location,
                self.cache_loc.clone(),
                self.rt.clone(),
            )
            .map_err(|e| OpenWriteError::wrap_io_error(e, path.to_path_buf()))?,
        );

        Ok(WritePtr::new(write_handle))
    }

    fn sync_directory(&self) -> std::io::Result<()> {
        // No-op, as synchronization is handled by the object store
        Ok(())
    }

    fn watch(&self, _: WatchCallback) -> tantivy::Result<WatchHandle> {
        // We have a reload mechanism elsewhere in the system,
        // and a single index load is always immutable.
        Ok(WatchHandle::empty())
    }

    fn acquire_lock(&self, _: &directory::Lock) -> Result<DirectoryLock, LockError> {
        // We manually guarantee index read/write safety
        Ok(DirectoryLock::from(NoOpLock::new()))
    }
}

/// Create a new object store directory that implements CoW for the meta.json file.
///
/// # Arguments
///
/// * store - An object store object
///
/// * base_path - The base path to store the index, relative to the object store root
///
/// * read_version - The version of the meta.json file to read from if the write version
/// hasn't been created yet. Cannot be greater than the write version. Has no effect if
/// the write version already exists. The index is built from scratch if None is provided.
///
/// * write_version - The version of the meta.json file to write to
///
/// * cache_loc - The local directory used for caching uploaded artifacts. If not
/// provided, a temporary directory is created.
///
/// * rt - An optional tokio runtime to use for async operations. If not provided, a new
/// runtime will be created. NOTE: if you are already running in an async context,
/// dropping the returned Directory will panic, as it would shut down the runtime from
/// inside an async context.
///
/// # Example
/// ```
/// use std::sync::Arc;
///
/// use object_store::local::LocalFileSystem;
/// use tantivy::{Index, IndexSettings, schema::{Schema, STORED, STRING, TEXT}};
/// use tantivy_object_store::new_object_store_directory;
///
/// let store = Arc::new(LocalFileSystem::new());
/// let base_path = format!("/tmp/{}", uuid::Uuid::new_v4());
/// let dir = new_object_store_directory(store, &base_path, None, 0, None, None).unwrap();
///
/// let mut schema_builder = Schema::builder();
/// let id_field = schema_builder.add_text_field("id", STRING);
/// let text_field = schema_builder.add_text_field("text", TEXT | STORED);
/// let schema = schema_builder.build();
///
/// let idx = Index::create(dir, schema.clone(), IndexSettings::default()).unwrap();
/// ```
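///
/// To continue from an existing index, reopen the directory with `read_version`
/// set to the last committed version and a bumped `write_version`. A minimal
/// sketch (assuming version 0 was committed by a previous session at the same,
/// hypothetical, `base_path`):
///
/// ```no_run
/// use std::sync::Arc;
///
/// use object_store::local::LocalFileSystem;
/// use tantivy_object_store::new_object_store_directory;
///
/// let store = Arc::new(LocalFileSystem::new());
/// // Read version 0 if meta.json.1 doesn't exist yet; write new commits as version 1.
/// let dir = new_object_store_directory(store, "/tmp/my-index", Some(0), 1, None, None).unwrap();
/// let idx = tantivy::Index::open(dir).unwrap();
/// ```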
pub fn new_object_store_directory(
    store: Arc<dyn ObjectStore>,
    base_path: &str,
    read_version: Option<u64>,
    write_version: u64,
    cache_loc: Option<&str>,
    rt: Option<Arc<tokio::runtime::Runtime>>,
) -> Result<Box<dyn Directory>, std::io::Error> {
    if let Some(read_version) = read_version {
        if read_version > write_version {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "read version cannot be greater than write version",
            ));
        }
    }

    let cache_loc = match cache_loc {
        Some(s) => Arc::new(PathBuf::from(s)),
        // `into_path` persists the temp directory instead of deleting it when
        // the `TempDir` guard is dropped
        None => Arc::new(tempfile::tempdir()?.into_path()),
    };

    let rt = match rt {
        Some(rt) => rt,
        None => Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()?,
        ),
    };

    Ok(Box::new(ObjectStoreDirectory {
        store,
        base_path: base_path.to_string(),
        read_version,
        write_version,
        local_fs: Arc::new(LocalFileSystem::new()),
        cache_loc,
        rt,
        atomic_rw_lock: Arc::new(Mutex::new(())),
    }))
}

#[cfg(test)]
mod test {
    use log::info;
    use object_store::local::LocalFileSystem;
    use std::sync::Arc;
    use tantivy::{
        doc,
        schema::{Schema, STORED, STRING, TEXT},
        IndexSettings,
    };

    use crate::new_object_store_directory;

    #[test]
    fn test_full_workflow() {
        env_logger::init();

        let store = Arc::new(LocalFileSystem::new());

        let base_path = format!("/tmp/{}", uuid::Uuid::new_v4().hyphenated());

        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 0, None, None).unwrap();

        let mut schema_builder = Schema::builder();
        let id_field = schema_builder.add_text_field("id", STRING);
        let text_field = schema_builder.add_text_field("text", TEXT | STORED);
        let schema = schema_builder.build();

        info!("Creating index");
        let idx = tantivy::Index::create(dir, schema.clone(), IndexSettings::default()).unwrap();

        info!("Creating writer");
        let mut writer = idx.writer(1024 * 1024 * 64).unwrap();
        info!("Write doc 1");
        writer
            .add_document(doc!(
                id_field => "1",
                text_field => "hello world"
            ))
            .unwrap();
        info!("Write doc 2");
        writer
            .add_document(doc!(
                id_field => "2",
                text_field => "Deus Ex"
            ))
            .unwrap();
        info!("COMMIT!");
        writer.commit().unwrap();

        std::mem::drop(writer);
        std::mem::drop(idx);

        // Try opening the index again and add some data
        let dir =
            new_object_store_directory(store.clone(), &base_path, Some(0), 1, None, None).unwrap();

        info!("Open index");
        let idx = tantivy::Index::open(dir).unwrap();

        info!("Creating writer");
        let mut writer = idx.writer(1024 * 1024 * 64).unwrap();
        info!("Write doc 3");
        writer
            .add_document(doc!(
                id_field => "3",
                text_field => "bye bye"
            ))
            .unwrap();
        info!("COMMIT!");
        writer.commit().unwrap();
        info!("wait for merging threads");
        writer.wait_merging_threads().unwrap();

        std::mem::drop(idx);

        // open and search
        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 0, None, None).unwrap();

        info!("Open index");
        let s3_idx = tantivy::Index::open(dir).unwrap();
        let query_parser =
            tantivy::query::QueryParser::for_index(&s3_idx, vec![id_field, text_field]);
        let searcher = s3_idx.reader().unwrap().searcher();

        info!("searching 1");
        let query = query_parser.parse_query("hello").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 1);
        let doc = top_docs.get(0).unwrap().1;
        let retrieved_doc = searcher.doc(doc).unwrap();
        // we only store the text field, so there won't be an id field
        assert_eq!(
            retrieved_doc,
            doc!(
                text_field => "hello world"
            )
        );

        info!("searching 2");
        // No result -- not in this version
        let query = query_parser.parse_query("bye").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 0);

        info!("searching 3");
        // finds the other doc
        let query = query_parser.parse_query("ex").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();

        assert_eq!(top_docs.len(), 1);
        let doc = top_docs.get(0).unwrap().1;
        let retrieved_doc = searcher.doc(doc).unwrap();
        // we only store the text field, so there won't be an id field
        assert_eq!(
            retrieved_doc,
            doc!(
                text_field => "Deus Ex"
            )
        );

        // open a different version and search
        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 1, None, None).unwrap();
        info!("Open Index");
        let s3_idx = tantivy::Index::open(dir).unwrap();
        let searcher = s3_idx.reader().unwrap().searcher();

        info!("searching 4");
        // the doc is in the newer version
        let query = query_parser.parse_query("bye").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 1);
    }
}