use std::sync::Arc;
use object_store::ObjectStore;
use pyo3::exceptions::{PyRuntimeWarning, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyTuple};
use pyo3::{intern, PyTypeInfo};
use crate::{PyAzureStore, PyGCSStore, PyHttpStore, PyLocalStore, PyMemoryStore, PyS3Store};
/// Newtype around a reference-counted, type-erased [`ObjectStore`].
///
/// Values of this type are produced by the `FromPyObject` implementation in
/// this file, which accepts instances of the store classes exported by this
/// crate (`PyS3Store`, `PyAzureStore`, `PyGCSStore`, `PyHttpStore`,
/// `PyLocalStore`, `PyMemoryStore`).
///
/// `Debug` and `Clone` are derived so this wrapper matches the other wrapper
/// types in this file; cloning only bumps the `Arc` reference count.
#[derive(Debug, Clone)]
pub struct PyObjectStore(Arc<dyn ObjectStore>);
/// Extraction of a [`PyObjectStore`] from an arbitrary Python object.
///
/// The object is accepted when it can be cast to one of this crate's store
/// classes. Otherwise a `ValueError` is returned; the message depends on
/// whether the object's class *name* matches one of those classes.
impl<'py> FromPyObject<'_, 'py> for PyObjectStore {
    type Error = PyErr;

    /// Tries each store class from this crate in turn and wraps the first match.
    ///
    /// # Errors
    ///
    /// * `ValueError` with a "same library" explanation when the cast fails
    ///   but the object's `__class__.__name__` equals the name of one of this
    ///   crate's store classes.
    /// * `ValueError` carrying the object's `repr()` for anything else.
    /// * Any error raised while reading `__class__`, `__name__` or `repr()`.
    fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
        // Guard clauses: return as soon as one of the store classes matches.
        if let Ok(s3) = obj.cast::<PyS3Store>() {
            return Ok(Self(s3.get().as_ref().clone()));
        }
        if let Ok(azure) = obj.cast::<PyAzureStore>() {
            return Ok(Self(azure.get().as_ref().clone()));
        }
        if let Ok(gcs) = obj.cast::<PyGCSStore>() {
            return Ok(Self(gcs.get().as_ref().clone()));
        }
        if let Ok(http) = obj.cast::<PyHttpStore>() {
            return Ok(Self(http.get().as_ref().clone()));
        }
        if let Ok(local) = obj.cast::<PyLocalStore>() {
            return Ok(Self(local.get().as_ref().clone()));
        }
        if let Ok(memory) = obj.cast::<PyMemoryStore>() {
            return Ok(Self(memory.get().as_ref().clone()));
        }

        // No cast succeeded. Look at the class name to choose the error message.
        let py = obj.py();
        let class_obj = obj.getattr(intern!(py, "__class__"))?;
        let cls_name = class_obj
            .getattr(intern!(py, "__name__"))?
            .extract::<PyBackedStr>()?;

        let known_store_names = [
            PyAzureStore::NAME,
            PyGCSStore::NAME,
            PyHttpStore::NAME,
            PyLocalStore::NAME,
            PyMemoryStore::NAME,
            PyS3Store::NAME,
        ];
        let same_name_but_not_castable = known_store_names
            .iter()
            .any(|&known| known == &*cls_name);

        if same_name_but_not_castable {
            // Same class name, yet the cast failed: report the cross-library case.
            Err(PyValueError::new_err("You must use an object store instance exported from **the same library** as this function. They cannot be used across libraries.\nThis is because object store instances are compiled with a specific version of Rust and Python."))
        } else {
            Err(PyValueError::new_err(format!(
                "Expected an object store instance, got {}",
                obj.repr()?
            )))
        }
    }
}
/// Borrowing access to the wrapped `Arc<dyn ObjectStore>`.
impl AsRef<Arc<dyn ObjectStore>> for PyObjectStore {
    /// Returns a shared reference to the inner `Arc` without cloning it.
    fn as_ref(&self) -> &Arc<dyn ObjectStore> {
        let Self(store) = self;
        store
    }
}
impl From<PyObjectStore> for Arc<dyn ObjectStore> {
fn from(value: PyObjectStore) -> Self {
value.0
}
}
impl PyObjectStore {
    /// Consumes the wrapper and returns the wrapped `Arc<dyn ObjectStore>`.
    ///
    /// Returns exactly what [`PyObjectStore::into_dyn`] returns.
    pub fn into_inner(self) -> Arc<dyn ObjectStore> {
        self.into_dyn()
    }

    /// Consumes the wrapper and returns the wrapped `Arc<dyn ObjectStore>`.
    pub fn into_dyn(self) -> Arc<dyn ObjectStore> {
        let Self(store) = self;
        store
    }
}
/// Private newtype around an `Arc<dyn ObjectStore>` used as the payload of
/// [`PyExternalObjectStore`].
///
/// Its `FromPyObject` implementation identifies the Python object by class
/// name and builds a new store from the object's `__getnewargs_ex__()` result.
#[derive(Debug, Clone)]
struct PyExternalObjectStoreInner(Arc<dyn ObjectStore>);
impl<'py> FromPyObject<'_, 'py> for PyExternalObjectStoreInner {
type Error = PyErr;
fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
let py = obj.py();
let cls_name = obj
.getattr(intern!(py, "__class__"))?
.getattr(intern!(py, "__name__"))?
.extract::<PyBackedStr>()?;
if cls_name == PyAzureStore::NAME {
let (args, kwargs): (Bound<PyTuple>, Bound<PyDict>) = obj
.call_method0(intern!(py, "__getnewargs_ex__"))?
.extract()?;
let store = PyAzureStore::type_object(py)
.call(args, Some(&kwargs))?
.cast::<PyAzureStore>()?
.get()
.clone();
return Ok(Self(store.into_inner()));
}
if cls_name == PyGCSStore::NAME {
let (args, kwargs): (Bound<PyTuple>, Bound<PyDict>) = obj
.call_method0(intern!(py, "__getnewargs_ex__"))?
.extract()?;
let store = PyGCSStore::type_object(py)
.call(args, Some(&kwargs))?
.cast::<PyGCSStore>()?
.get()
.clone();
return Ok(Self(store.into_inner()));
}
if cls_name == PyHttpStore::NAME {
let (args, kwargs): (Bound<PyTuple>, Bound<PyDict>) = obj
.call_method0(intern!(py, "__getnewargs_ex__"))?
.extract()?;
let store = PyHttpStore::type_object(py)
.call(args, Some(&kwargs))?
.cast::<PyHttpStore>()?
.get()
.clone();
return Ok(Self(store.into_inner()));
}
if cls_name == PyLocalStore::NAME {
let (args, kwargs): (Bound<PyTuple>, Bound<PyDict>) = obj
.call_method0(intern!(py, "__getnewargs_ex__"))?
.extract()?;
let store = PyLocalStore::type_object(py)
.call(args, Some(&kwargs))?
.cast::<PyLocalStore>()?
.get()
.clone();
return Ok(Self(store.into_inner()));
}
if cls_name == PyS3Store::NAME {
let (args, kwargs): (Bound<PyTuple>, Bound<PyDict>) = obj
.call_method0(intern!(py, "__getnewargs_ex__"))?
.extract()?;
let store = PyS3Store::type_object(py)
.call(args, Some(&kwargs))?
.cast::<PyS3Store>()?
.get()
.clone();
return Ok(Self(store.into_inner()));
}
Err(PyValueError::new_err(format!(
"Expected an object store-compatible instance, got {}",
obj.repr()?
)))
}
}
/// Public wrapper around a store rebuilt by `PyExternalObjectStoreInner`.
///
/// Its `FromPyObject` implementation delegates to the inner type and, when
/// the `external-store-warning` feature is enabled, emits a Python
/// `RuntimeWarning` after a successful extraction.
#[derive(Debug, Clone)]
pub struct PyExternalObjectStore(PyExternalObjectStoreInner);
impl From<PyExternalObjectStore> for Arc<dyn ObjectStore> {
fn from(value: PyExternalObjectStore) -> Self {
value.0 .0
}
}
impl PyExternalObjectStore {
    /// Consumes the wrapper and returns the wrapped `Arc<dyn ObjectStore>`.
    ///
    /// Uses the `From<PyExternalObjectStore>` conversion.
    pub fn into_dyn(self) -> Arc<dyn ObjectStore> {
        <Arc<dyn ObjectStore>>::from(self)
    }
}
impl<'py> FromPyObject<'_, 'py> for PyExternalObjectStore {
type Error = PyErr;
fn extract(obj: Borrowed<'_, 'py, pyo3::PyAny>) -> PyResult<Self> {
match obj.extract() {
Ok(inner) => {
#[cfg(feature = "external-store-warning")]
{
let py = obj.py();
let warnings_mod = py.import(intern!(py, "warnings"))?;
let warning = PyRuntimeWarning::new_err(
"Successfully reconstructed a store defined in another Python module. Connection pooling will not be shared across store instances.",
);
let args = PyTuple::new(py, vec![warning])?;
warnings_mod.call_method1(intern!(py, "warn"), args)?;
}
Ok(Self(inner))
}
Err(err) => Err(err),
}
}
}
/// Either kind of store wrapper defined in this file.
///
/// `FromPyObject` is derived; per PyO3's derive for enums, the variants are
/// attempted in declaration order, so `PyObjectStore` is tried before
/// `PyExternalObjectStore`.
#[derive(FromPyObject)]
pub enum AnyObjectStore {
/// A store extracted via the `PyObjectStore` implementation.
PyObjectStore(PyObjectStore),
/// A store extracted via the `PyExternalObjectStore` implementation.
PyExternalObjectStore(PyExternalObjectStore),
}
/// Owned conversion from either variant into the `Arc<dyn ObjectStore>` it holds.
impl From<AnyObjectStore> for Arc<dyn ObjectStore> {
    /// Dispatches to the `From` conversion of whichever wrapper is stored.
    fn from(value: AnyObjectStore) -> Self {
        match value {
            AnyObjectStore::PyObjectStore(native) => Self::from(native),
            AnyObjectStore::PyExternalObjectStore(external) => Self::from(external),
        }
    }
}
impl AnyObjectStore {
    /// Consumes the value and returns the wrapped `Arc<dyn ObjectStore>`.
    ///
    /// Uses the `From<AnyObjectStore>` conversion.
    pub fn into_dyn(self) -> Arc<dyn ObjectStore> {
        <Arc<dyn ObjectStore>>::from(self)
    }
}