use crate::path::{InvalidPart, Path, PathPart};
use crate::{ObjectStore, parse_url_opts};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
/// A registry that maps URLs to [`ObjectStore`] instances.
///
/// Implementors must be `Send + Sync + Debug + 'static`, so a registry can be
/// shared across threads (e.g. behind an `Arc`).
pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
/// Registers `store` for `url`.
///
/// Returns the store previously registered for the same location, if any
/// (this is what [`DefaultObjectStoreRegistry`] does).
fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>>;
/// Resolves `url` to a store and the [`Path`] of the addressed object within
/// that store.
///
/// # Errors
///
/// Returns an error if no store can be found for `url`, or if the URL's path
/// cannot be converted into a [`Path`].
fn resolve(&self, url: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)>;
}
/// Errors produced by [`DefaultObjectStoreRegistry`].
///
/// These are converted into [`crate::Error::Generic`] (with
/// `store: "ObjectStoreRegistry"`) before reaching callers.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
enum Error {
/// No registered store matched the URL, and none could be constructed from it.
#[error("ObjectStore not found")]
NotFound,
/// A URL path segment was rejected by [`PathPart::parse`].
#[error("Error parsing URL path segment")]
InvalidPart(#[from] InvalidPart),
}
impl From<Error> for crate::Error {
fn from(value: Error) -> Self {
Self::Generic {
store: "ObjectStoreRegistry",
source: Box::new(value),
}
}
}
/// The default [`ObjectStoreRegistry`] implementation.
///
/// Stores are indexed first by the URL prefix produced by `url_key` (everything up
/// to and including the port), then by a trie of URL path segments. A URL resolves
/// to the store registered at its longest matching path prefix. If nothing matches,
/// `resolve` tries to build a store with `parse_url_opts` and caches it here.
#[derive(Debug, Default)]
pub struct DefaultObjectStoreRegistry {
/// URL prefix (see `url_key`) -> root node of that prefix's path-segment trie.
map: RwLock<HashMap<String, PathEntry>>,
}
/// A node in a path-segment trie of [`DefaultObjectStoreRegistry`].
#[derive(Debug, Default)]
struct PathEntry {
/// Store registered at exactly this path prefix, if any.
store: Option<Arc<dyn ObjectStore>>,
/// Child nodes keyed by the next non-empty path segment, as it appears in `Url::path`.
children: HashMap<String, Self>,
}
impl PathEntry {
    /// Finds the store registered at the longest prefix of `to_resolve`'s path.
    ///
    /// Walks the trie one path segment at a time, remembering the deepest node
    /// that has a store attached. Returns that store together with the number of
    /// path segments it consumed (0 for a store on this root node), so the caller
    /// can turn the remaining segments into an object [`Path`].
    ///
    /// Returns `None` if no node on the walked path, including this one, has a store.
    fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, usize)> {
        let mut current = self;
        // A store on the root node itself matches at depth 0.
        let mut ret = self.store.as_ref().map(|store| (store, 0));
        let mut depth = 0;
        for segment in path_segments(to_resolve.path()) {
            match current.children.get(segment) {
                Some(e) => {
                    current = e;
                    depth += 1;
                    // Deeper matches override shallower ones (longest prefix wins).
                    if let Some(store) = &current.store {
                        ret = Some((store, depth));
                    }
                }
                // No deeper registration can exist past a missing child.
                None => break,
            }
        }
        ret
    }
}
impl DefaultObjectStoreRegistry {
    /// Creates a registry with no stores registered.
    ///
    /// Identical to [`Default::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
    /// Registers `store` at the location named by `url` (its `url_key` prefix plus
    /// every non-empty path segment).
    ///
    /// Returns the store previously registered at exactly that location, if any.
    fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
        let mut map = self.map.write();
        let key = url_key(&url);
        let mut entry = map.entry(key.to_string()).or_default();
        for segment in path_segments(url.path()) {
            entry = entry.children.entry(segment.to_string()).or_default();
        }
        entry.store.replace(store)
    }

    /// Resolves `to_resolve` to a store and the object [`Path`] within it.
    ///
    /// First consults registered stores, picking the one attached to the longest
    /// matching path prefix. If none matches, tries to build a store with
    /// `parse_url_opts` (configured from the process environment variables). It then
    /// caches that store at the path prefix the store itself covers, so later calls
    /// hit the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if a remaining path segment is not a valid [`PathPart`].
    /// Also returns the registry's "not found" error when no store matches and
    /// `parse_url_opts` fails; the underlying parse error is discarded in that case.
    fn resolve(&self, to_resolve: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)> {
        let key = url_key(to_resolve);
        {
            // Fast path: a read lock suffices when a store is already registered.
            let map = self.map.read();
            if let Some((store, depth)) = map.get(key).and_then(|entry| entry.lookup(to_resolve)) {
                let path = path_suffix(to_resolve, depth)?;
                return Ok((Arc::clone(store), path));
            }
        }

        if let Ok((store, path)) = parse_url_opts(to_resolve, std::env::vars()) {
            // Leading URL segments consumed by the store itself (e.g. a bucket or
            // container). `path` comes from `parse_url_opts`, which may normalise the
            // URL path (e.g. percent-decode `%2F` into `/`), so it can contain more
            // segments than the raw URL path. Saturate instead of underflowing
            // (a panic in debug, a huge bogus depth in release).
            let url_depth = num_segments(to_resolve.path());
            let depth = url_depth.saturating_sub(num_segments(path.as_ref()));

            let mut map = self.map.write();
            let mut entry = map.entry(key.to_string()).or_default();
            for segment in path_segments(to_resolve.path()).take(depth) {
                entry = entry.children.entry(segment.to_string()).or_default();
            }
            let store = Arc::clone(match &entry.store {
                None => entry.store.insert(Arc::from(store)),
                // Another caller registered or created a store between dropping the
                // read lock and taking the write lock; keep theirs.
                Some(x) => x,
            });

            let path = path_suffix(to_resolve, depth)?;
            return Ok((store, path));
        }

        Err(Error::NotFound.into())
    }
}
/// Returns the part of `url` used as the top-level registry key.
///
/// This is everything before the path: scheme, any credentials, host and port,
/// e.g. `"memory://"` for `memory:///banana`.
fn url_key(url: &Url) -> &str {
    let end = url::Position::AfterPort;
    &url[..end]
}
/// Yields the non-empty `/`-separated segments of `s`.
///
/// Leading, trailing and repeated slashes produce no segments, so `"/a//b/"`
/// yields `"a"` then `"b"`.
fn path_segments(s: &str) -> impl Iterator<Item = &str> {
    s.split('/').filter(|segment| !segment.is_empty())
}

/// Counts the segments that [`path_segments`] yields for `s`.
fn num_segments(s: &str) -> usize {
    let segments = path_segments(s);
    segments.count()
}
fn path_suffix(url: &Url, depth: usize) -> Result<Path, Error> {
let segments = path_segments(url.path()).skip(depth);
let path = segments.map(PathPart::parse).collect::<Result<_, _>>()?;
Ok(path)
}
// Unit tests for the segment helpers and the default registry's resolution rules.
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::InMemory;
use crate::prefix::PrefixStore;
/// Empty segments (from leading, trailing or repeated slashes) are not counted.
#[test]
fn test_num_segments() {
assert_eq!(num_segments(""), 0);
assert_eq!(num_segments("/"), 0);
assert_eq!(num_segments("/banana"), 1);
assert_eq!(num_segments("banana"), 1);
assert_eq!(num_segments("/banana/crumble"), 2);
assert_eq!(num_segments("banana/crumble"), 2);
}
/// Drives one registry through a sequence of registrations and resolutions.
/// Each step depends on the registry state left behind by the previous steps.
#[test]
fn test_default_registry() {
let registry = DefaultObjectStoreRegistry::new();
// Nothing is registered yet, so resolve falls back to building a store from the
// URL. The whole path "banana" is left as the object path.
let banana_url = Url::parse("memory:///banana").unwrap();
let (resolved, path) = registry.resolve(&banana_url).unwrap();
assert_eq!(path.as_ref(), "banana");
// Registering at the root replaces, and returns, the store cached by the resolve above.
let url = Url::parse("memory:///").unwrap();
let root = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let replaced = registry.register(url, Arc::clone(&root)).unwrap();
assert!(Arc::ptr_eq(&resolved, &replaced));
// A store at /banana is new (register returns None) and now wins for that URL,
// consuming the whole path and leaving an empty object path.
let banana = Arc::new(PrefixStore::new(InMemory::new(), "banana")) as Arc<dyn ObjectStore>;
assert!(
registry
.register(banana_url.clone(), Arc::clone(&banana))
.is_none()
);
let (resolved, path) = registry.resolve(&banana_url).unwrap();
assert_eq!(path.as_ref(), "");
assert!(Arc::ptr_eq(&resolved, &banana));
// Registering a sibling prefix (/apples) does not change how /banana resolves.
let apples_url = Url::parse("memory:///apples").unwrap();
let apples = Arc::new(PrefixStore::new(InMemory::new(), "apples")) as Arc<dyn ObjectStore>;
assert!(registry.register(apples_url, Arc::clone(&apples)).is_none());
let (resolved, path) = registry.resolve(&banana_url).unwrap();
assert_eq!(path.as_ref(), "");
assert!(Arc::ptr_eq(&resolved, &banana));
// Matching is by whole segment: "banana_muffins" is not under "banana", so it
// falls back to the root store.
let banana_muffins_url = Url::parse("memory:///banana_muffins").unwrap();
let (resolved, path) = registry.resolve(&banana_muffins_url).unwrap();
assert_eq!(path.as_ref(), "banana_muffins");
assert!(Arc::ptr_eq(&resolved, &root));
// Prefixes are anchored at the start of the path: /foo/banana resolves via the root.
let to_resolve = Url::parse("memory:///foo/banana").unwrap();
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
assert_eq!(path.as_ref(), "foo/banana");
assert!(Arc::ptr_eq(&resolved, &root));
// The deepest matching prefix (/apples/bananas) wins over a shallower one (/apples).
let nested_url = Url::parse("memory:///apples/bananas").unwrap();
let nested =
Arc::new(PrefixStore::new(InMemory::new(), "apples/bananas")) as Arc<dyn ObjectStore>;
assert!(registry.register(nested_url, Arc::clone(&nested)).is_none());
let to_resolve = Url::parse("memory:///apples/bananas/muffins/cupcakes").unwrap();
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
assert_eq!(path.as_ref(), "muffins/cupcakes");
assert!(Arc::ptr_eq(&resolved, &nested));
// Intermediate trie nodes (/1, /1/2) that carry no store are walked through.
let nested_url2 = Url::parse("memory:///1/2/3").unwrap();
let nested2 = Arc::new(PrefixStore::new(InMemory::new(), "1/2/3")) as Arc<dyn ObjectStore>;
assert!(
registry
.register(nested_url2, Arc::clone(&nested2))
.is_none()
);
let to_resolve = Url::parse("memory:///1/2/3/4/5/6").unwrap();
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
assert_eq!(path.as_ref(), "4/5/6");
assert!(Arc::ptr_eq(&resolved, &nested2));
// A store registered under another scheme is keyed separately ("custom://")
// and resolved the same way.
let custom_scheme_url = Url::parse("custom:///").unwrap();
let custom_scheme = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
assert!(
registry
.register(custom_scheme_url, Arc::clone(&custom_scheme))
.is_none()
);
let to_resolve = Url::parse("custom:///6/7").unwrap();
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
assert_eq!(path.as_ref(), "6/7");
assert!(Arc::ptr_eq(&resolved, &custom_scheme));
}
}