1use async_trait::async_trait;
18use log::debug;
19use object_store::{local::LocalFileSystem, ObjectStore};
20use std::{
21 fs::File,
22 io::Write,
23 ops::Range,
24 path::{Path, PathBuf},
25 sync::{
26 atomic::{AtomicBool, Ordering},
27 Arc,
28 },
29};
30use tantivy::{
31 directory::{
32 self,
33 error::{DeleteError, LockError, OpenReadError, OpenWriteError},
34 AntiCallToken, Directory, DirectoryLock, FileHandle, OwnedBytes, TerminatingWrite,
35 WatchCallback, WatchHandle, WritePtr,
36 },
37 HasLen,
38};
39
40use tokio::io::{AsyncReadExt, AsyncWriteExt};
41
42use std::sync::Mutex;
43
44#[derive(Debug, Clone)]
45struct ObjectStoreDirectory {
46 store: Arc<dyn ObjectStore>,
47 base_path: String,
48 read_version: Option<u64>,
49 write_version: u64,
50
51 cache_loc: Arc<PathBuf>,
52
53 local_fs: Arc<LocalFileSystem>,
54
55 rt: Arc<tokio::runtime::Runtime>,
56 atomic_rw_lock: Arc<Mutex<()>>,
57}
58
59#[derive(Debug)]
60struct ObjectStoreFileHandle {
61 store: Arc<dyn ObjectStore>,
62 path: object_store::path::Path,
63 len: usize,
66
67 rt: Arc<tokio::runtime::Runtime>,
68}
69
70impl ObjectStoreFileHandle {
71 pub fn new(
72 store: Arc<dyn ObjectStore>,
73 path: object_store::path::Path,
74 len: usize,
75 rt: Arc<tokio::runtime::Runtime>,
76 ) -> Self {
77 Self {
78 store,
79 path,
80 len,
81 rt,
82 }
83 }
84}
85
86impl HasLen for ObjectStoreFileHandle {
87 fn len(&self) -> usize {
88 self.len
89 }
90}
91
92#[async_trait]
93impl FileHandle for ObjectStoreFileHandle {
94 fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
95 let handle = self.rt.handle();
96 handle.block_on(async { self.read_bytes_async(range).await })
97 }
98
99 #[doc(hidden)]
100 async fn read_bytes_async(&self, byte_range: Range<usize>) -> std::io::Result<OwnedBytes> {
101 debug!("read_bytes_async: {:?} {:?}", self.path, byte_range);
102 let bytes = self.store.get_range(&self.path, byte_range).await?;
103
104 Ok(OwnedBytes::new(bytes.to_vec()))
105 }
106}
107
108struct ObjectStoreWriteHandle {
111 store: Arc<dyn ObjectStore>,
112 location: object_store::path::Path,
113 local_path: Arc<PathBuf>,
114
115 write_handle: File,
116 shutdown: AtomicBool,
117 rt: Arc<tokio::runtime::Runtime>,
118}
119
120impl ObjectStoreWriteHandle {
121 pub fn new(
122 store: Arc<dyn ObjectStore>,
123 location: object_store::path::Path,
124 cache_loc: Arc<PathBuf>,
125 rt: Arc<tokio::runtime::Runtime>,
126 ) -> Result<Self, std::io::Error> {
127 let local_path = cache_loc.join(location.as_ref());
128 debug!("creating write handle for {:?}", local_path);
129 let path = Path::new(&local_path);
130 std::fs::create_dir_all(path.parent().ok_or(std::io::Error::new(
132 std::io::ErrorKind::Other,
133 "unable to create parent dir for cache",
134 ))?)?;
135 let f = File::create(local_path.clone())?;
136
137 Ok(Self {
138 store,
139 location,
140 local_path: Arc::new(local_path),
141 write_handle: f,
142 shutdown: AtomicBool::new(false),
143 rt,
144 })
145 }
146}
147
148impl Write for ObjectStoreWriteHandle {
149 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
150 if self.shutdown.load(Ordering::SeqCst) {
151 return Err(std::io::Error::new(
152 std::io::ErrorKind::BrokenPipe,
153 "write handle has been shutdown",
154 ));
155 }
156
157 self.write_handle.write(buf)
158 }
159
160 fn flush(&mut self) -> std::io::Result<()> {
161 if self.shutdown.load(Ordering::SeqCst) {
162 return Err(std::io::Error::new(
163 std::io::ErrorKind::BrokenPipe,
164 "write handle has been shutdown",
165 ));
166 }
167
168 self.write_handle.flush()
169 }
170}
171
172impl TerminatingWrite for ObjectStoreWriteHandle {
173 fn terminate_ref(&mut self, _: AntiCallToken) -> std::io::Result<()> {
174 let res = self.flush();
175 self.shutdown.store(true, Ordering::SeqCst);
176
177 let result: Result<(), std::io::Error> = self.rt.block_on(async {
178 let mut f = tokio::fs::File::open(self.local_path.as_path()).await?;
179
180 let (_, mut sink) = self.store.put_multipart(&self.location).await?;
181 let mut buf = vec![0; 1024 * 1024];
183
184 loop {
185 let n = f.read(&mut buf).await?;
186 if n == 0 {
187 break;
188 }
189 sink.write_all(&buf[..n]).await?;
190 }
191
192 sink.shutdown().await?;
193
194 Ok(())
195 });
196
197 result?;
198
199 res
200 }
201}
202
203impl ObjectStoreDirectory {
204 fn to_object_path(&self, path: &Path) -> Result<object_store::path::Path, std::io::Error> {
205 let p = path
206 .to_str()
207 .ok_or(std::io::Error::new(
208 std::io::ErrorKind::InvalidInput,
209 "non-utf8 path",
210 ))?
211 .to_string();
212
213 Ok(object_store::path::Path::from(format!(
214 "{}/{}",
215 self.base_path.clone(),
216 p
217 )))
218 }
219
220 fn head(&self, path: &Path) -> Result<object_store::ObjectMeta, OpenReadError> {
221 let location = self
222 .to_object_path(path)
223 .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
224 let handle = self.rt.handle();
225 handle
226 .block_on(async { self.store.head(&location).await })
227 .map_err(|e| match e {
228 object_store::Error::NotFound { .. } => {
229 OpenReadError::FileDoesNotExist(path.to_path_buf())
230 }
231 _ => OpenReadError::wrap_io_error(
232 std::io::Error::new(std::io::ErrorKind::Other, format!("{:?}", e)),
233 path.to_path_buf(),
234 ),
235 })
236 }
237}
238
/// Marker trait for lock guard types; only `NoOpLock` implements it in this file.
trait Lock: Send + Sync + 'static {}

/// Guard that holds no lock: this directory performs no locking at all.
struct NoOpLock;

impl NoOpLock {
    /// Returns a boxed guard, ready to be turned into a `DirectoryLock`.
    pub fn new() -> Box<Self> {
        Box::new(NoOpLock)
    }
}

impl Lock for NoOpLock {}
247
248impl Directory for ObjectStoreDirectory {
249 fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
250 debug!("get_file_handle({:?})", path);
251 let location = self
252 .to_object_path(path)
253 .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?;
254
255 let cache_path = self.cache_loc.join(location.as_ref());
257 let cache_path = object_store::path::Path::from(cache_path.to_string_lossy().as_ref());
258 if let Ok(meta) = self
259 .rt
260 .block_on(async { self.local_fs.head(&cache_path).await })
261 {
262 debug!("upload cache hit: {:?}", path);
263 return Ok(Arc::new(ObjectStoreFileHandle::new(
264 self.local_fs.clone(),
265 cache_path,
266 meta.size,
267 self.rt.clone(),
268 )));
269 }
270
271 let len = self.head(path)?.size;
272
273 Ok(Arc::new(ObjectStoreFileHandle::new(
274 self.store.clone(),
275 location,
276 len,
277 self.rt.clone(),
278 )))
279 }
280
281 fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
282 debug!("atomic_read({:?})", path);
283 if path != Path::new("meta.json") && path != Path::new(".managed.json") {
284 unimplemented!("Only meta.json is supported, but got {:?}", path)
286 }
287
288 let buf = path.to_string_lossy();
293 let path_str = format!("{}.{}", buf, self.write_version);
294
295 if let Ok(f) = self.get_file_handle(Path::new(&path_str)) {
297 return Ok(f
298 .read_bytes(0..f.len())
299 .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?
300 .to_vec());
301 }
302
303 if self.read_version.is_none() {
305 return Err(OpenReadError::FileDoesNotExist(path.to_path_buf()));
306 }
307
308 let buf = path.to_string_lossy();
309 let path_str = format!(
310 "{}.{}",
311 buf,
312 self.read_version.expect("already checked exists")
313 );
314 let path = Path::new(&path_str);
315
316 let _lock = self.atomic_rw_lock.lock().unwrap();
318 let f = self.get_file_handle(path)?;
319 Ok(f.read_bytes(0..f.len())
320 .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?
321 .to_vec())
322 }
323
324 fn exists(&self, path: &std::path::Path) -> Result<bool, OpenReadError> {
325 match self.head(path) {
326 Ok(_) => Ok(true),
327 Err(OpenReadError::FileDoesNotExist(_)) => Ok(false),
328 Err(e) => Err(e),
329 }
330 }
331
332 fn atomic_write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
337 debug!("atomic_write({:?})", path);
338 if path != Path::new("meta.json") && path != Path::new(".managed.json") {
339 unimplemented!("Only meta.json is supported")
341 }
342
343 let buf = path.to_string_lossy();
345 let path_str = format!("{}.{}", buf, self.write_version);
346 let path = Path::new(&path_str);
347
348 let location = self.to_object_path(path)?;
349
350 debug!("true location: {:?}", location);
351
352 let _lock = self.atomic_rw_lock.lock().unwrap();
354 self.rt.handle().block_on(async {
355 self.store
356 .put(&location, bytes::Bytes::from(data.to_vec()))
357 .await
358 })?;
359
360 Ok(())
361 }
362
363 fn delete(&self, _: &Path) -> Result<(), DeleteError> {
364 Ok(())
366 }
367
368 fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
369 debug!("open_write({:?})", path);
370 let location = self
371 .to_object_path(path)
372 .map_err(|e| OpenWriteError::wrap_io_error(e, path.to_path_buf()))?;
373
374 debug!("true location: {:?}", location);
375
376 let write_handle = Box::new(
377 ObjectStoreWriteHandle::new(
378 self.store.clone(),
379 location,
380 self.cache_loc.clone(),
381 self.rt.clone(),
382 )
383 .map_err(|e| OpenWriteError::wrap_io_error(e, path.to_path_buf()))?,
384 );
385
386 Ok(WritePtr::new(write_handle))
387 }
388
389 fn sync_directory(&self) -> std::io::Result<()> {
390 Ok(())
392 }
393
394 fn watch(&self, _: WatchCallback) -> tantivy::Result<WatchHandle> {
395 Ok(WatchHandle::empty())
398 }
399
400 fn acquire_lock(&self, _: &directory::Lock) -> Result<DirectoryLock, LockError> {
401 Ok(DirectoryLock::from(NoOpLock::new()))
403 }
404}
405
406pub fn new_object_store_directory(
448 store: Arc<dyn ObjectStore>,
449 base_path: &str,
450 read_version: Option<u64>,
451 write_version: u64,
452 cache_loc: Option<&str>,
453 rt: Option<Arc<tokio::runtime::Runtime>>,
454) -> Result<Box<dyn Directory>, std::io::Error> {
455 if let Some(read_version) = read_version {
456 if read_version > write_version {
457 return Err(std::io::Error::new(
458 std::io::ErrorKind::InvalidInput,
459 "read version cannot be greater than write version",
460 ));
461 }
462 }
463
464 let cache_loc = cache_loc
465 .map(|s| Arc::new(Path::new(&s).to_owned()))
466 .unwrap_or(Arc::new(tempfile::tempdir()?.path().to_owned()));
467
468 Ok(Box::new(ObjectStoreDirectory {
469 store,
470 base_path: base_path.to_string(),
471 read_version,
472 write_version,
473 local_fs: Arc::new(LocalFileSystem::new()),
474 cache_loc,
475 rt: rt.unwrap_or(Arc::new(
476 tokio::runtime::Builder::new_multi_thread()
477 .enable_all()
478 .build()?,
479 )),
480
481 atomic_rw_lock: Arc::new(Mutex::new(())),
482 }))
483}
484
#[cfg(test)]
mod test {
    use log::info;
    use object_store::local::LocalFileSystem;
    use std::sync::Arc;
    use tantivy::{
        doc,
        schema::{Schema, STORED, STRING, TEXT},
        IndexSettings,
    };

    use crate::new_object_store_directory;

    /// Commits version 0, then commits version 1 on top of it.
    ///
    /// Checks that reopening each version only shows the documents committed up to it.
    #[test]
    fn test_full_workflow() {
        // `try_init` rather than `init`: `init` panics if a logger is already installed,
        // e.g. by another test running in the same process.
        let _ = env_logger::try_init();

        let store = Arc::new(LocalFileSystem::new());

        let base_path = format!("/tmp/{}", uuid::Uuid::new_v4().hyphenated());

        // Version 0: create a fresh index holding two documents.
        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 0, None, None).unwrap();

        let mut schema_builder = Schema::builder();
        let id_field = schema_builder.add_text_field("id", STRING);
        let text_field = schema_builder.add_text_field("text", TEXT | STORED);
        let schema = schema_builder.build();

        info!("Creating index");
        let idx = tantivy::Index::create(dir, schema.clone(), IndexSettings::default()).unwrap();

        info!("Creating writer");
        let mut writer = idx.writer(1024 * 1024 * 64).unwrap();
        info!("Write doc 1");
        writer
            .add_document(doc!(
                id_field => "1",
                text_field => "hello world"
            ))
            .unwrap();
        info!("Write doc 2");
        writer
            .add_document(doc!(
                id_field => "2",
                text_field => "Deus Ex"
            ))
            .unwrap();
        info!("COMMIT!");
        writer.commit().unwrap();

        std::mem::drop(writer);
        std::mem::drop(idx);

        // Version 1: open version 0 and commit a third document.
        let dir =
            new_object_store_directory(store.clone(), &base_path, Some(0), 1, None, None).unwrap();

        info!("Open index");
        let idx = tantivy::Index::open(dir).unwrap();

        info!("Creating writer");
        let mut writer = idx.writer(1024 * 1024 * 64).unwrap();
        info!("Write doc 3");
        writer
            .add_document(doc!(
                id_field => "3",
                text_field => "bye bye"
            ))
            .unwrap();
        info!("COMMIT!");
        writer.commit().unwrap();
        info!("wait for merging threads");
        writer.wait_merging_threads().unwrap();

        std::mem::drop(idx);

        // Reopen version 0: the document committed in version 1 must not be visible.
        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 0, None, None).unwrap();

        info!("Open index");
        let s3_idx = tantivy::Index::open(dir).unwrap();
        let query_parser =
            tantivy::query::QueryParser::for_index(&s3_idx, vec![id_field, text_field]);
        let searcher = s3_idx.reader().unwrap().searcher();

        info!("searching 1");
        let query = query_parser.parse_query("hello").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 1);
        let doc = top_docs.get(0).unwrap().1;
        let retrieved_doc = searcher.doc(doc).unwrap();
        assert_eq!(
            retrieved_doc,
            doc!(
                text_field => "hello world"
            )
        );

        info!("searching 2");
        let query = query_parser.parse_query("bye").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 0);

        info!("searching 3");
        let query = query_parser.parse_query("ex").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();

        assert_eq!(top_docs.len(), 1);
        let doc = top_docs.get(0).unwrap().1;
        let retrieved_doc = searcher.doc(doc).unwrap();
        assert_eq!(
            retrieved_doc,
            doc!(
                text_field => "Deus Ex"
            )
        );

        // Reopen version 1: the third document is now visible.
        let dir =
            new_object_store_directory(store.clone(), &base_path, None, 1, None, None).unwrap();
        info!("Open Index");
        let s3_idx = tantivy::Index::open(dir).unwrap();
        let searcher = s3_idx.reader().unwrap().searcher();

        info!("searching 4");
        let query = query_parser.parse_query("bye").unwrap();
        let top_docs = searcher
            .search(&query, &tantivy::collector::TopDocs::with_limit(10))
            .unwrap();
        assert_eq!(top_docs.len(), 1);

        // Best-effort cleanup of the index files written under `base_path`.
        let _ = std::fs::remove_dir_all(&base_path);
    }
}