use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, RwLock, Weak};
/// Identifier attached to a [`ZeroCopyData`] buffer; all clones of a buffer share it.
///
/// Values are drawn from a process-wide counter that starts at 1, so two calls to
/// [`DataId::new`] within one process never return the same value (until `u64` wraps).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DataId(u64);

impl DataId {
    /// Allocates the next identifier from the process-wide counter.
    pub fn new() -> Self {
        // `Relaxed` suffices: only uniqueness of the fetched value matters, and a single
        // atomic's read-modify-write operations are totally ordered.
        static NEXT_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
        let raw_value = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        DataId(raw_value)
    }

    /// Returns the underlying numeric identifier.
    pub fn raw(&self) -> u64 {
        let DataId(value) = *self;
        value
    }
}

impl fmt::Display for DataId {
    /// Formats as `DataId(<n>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "DataId({})", self.raw())
    }
}

impl Default for DataId {
    /// Equivalent to [`DataId::new`]: every default value is a fresh, unique id.
    fn default() -> Self {
        DataId::new()
    }
}
/// Descriptive information captured at the moment a buffer is wrapped.
#[derive(Debug, Clone)]
pub struct DataMetadata {
    /// `TypeId` of the element type `T`.
    pub type_id: TypeId,
    /// `std::any::type_name` of the element type (diagnostic only; not guaranteed stable).
    pub type_name: String,
    /// Total size of the elements in bytes (`size_of_val` of the slice).
    pub size_bytes: usize,
    /// Number of elements.
    pub element_count: usize,
    /// Size of one element in bytes.
    pub element_size: usize,
    /// Time at which the metadata was created.
    pub created_at: std::time::Instant,
    /// Optional free-form description supplied by the caller.
    pub description: Option<String>,
    /// Optional NUMA node hint supplied by the caller; only stored here.
    pub numa_node: Option<usize>,
    /// Caller-supplied mutability flag; only stored here.
    pub is_mutable: bool,
}
impl DataMetadata {
    /// Builds metadata describing `data`, stamping `created_at` with the current time.
    pub fn new<T: 'static>(
        data: &[T],
        description: Option<String>,
        numa_node: Option<usize>,
        is_mutable: bool,
    ) -> Self {
        let element_count = data.len();
        let element_size = std::mem::size_of::<T>();
        let size_bytes = std::mem::size_of_val(data);
        let type_name = std::any::type_name::<T>().to_owned();
        Self {
            type_id: TypeId::of::<T>(),
            type_name,
            size_bytes,
            element_count,
            element_size,
            created_at: std::time::Instant::now(),
            description,
            numa_node,
            is_mutable,
        }
    }

    /// Returns `true` when the recorded element type is exactly `T`.
    pub fn is_compatible_with<T: 'static>(&self) -> bool {
        TypeId::of::<T>() == self.type_id
    }
}
/// Shared backing storage referenced (via `Arc`) by every clone of a `ZeroCopyData<T>`.
#[derive(Debug)]
struct ZeroCopyDataInner<T> {
    data: Vec<T>,
    metadata: DataMetadata,
    // Never read anywhere in this module, hence the lint allowance.
    #[allow(dead_code)]
    weak_refs: Mutex<Vec<Weak<ZeroCopyDataInner<T>>>>,
}
impl<T> ZeroCopyDataInner<T> {
    /// Takes ownership of `data` and records metadata describing it.
    fn new(
        data: Vec<T>,
        description: Option<String>,
        numa_node: Option<usize>,
        is_mutable: bool,
    ) -> Self
    where
        T: 'static,
    {
        // Metadata must be computed from the vector before it is moved into `Self`.
        let metadata = DataMetadata::new(data.as_slice(), description, numa_node, is_mutable);
        let weak_refs = Mutex::new(Vec::new());
        Self {
            data,
            metadata,
            weak_refs,
        }
    }
}
/// Reference-counted buffer that can be shared without copying its elements.
///
/// Cloning only clones the inner `Arc`; every clone shares the same `Vec<T>`,
/// metadata and [`DataId`]. No API on this type hands out mutable access.
#[derive(Debug)]
pub struct ZeroCopyData<T> {
    inner: Arc<ZeroCopyDataInner<T>>,
    id: DataId,
}
impl<T> ZeroCopyData<T>
where
    T: Clone + 'static,
{
    /// Wraps `data` with no description, no NUMA hint and `is_mutable == false`.
    ///
    /// # Errors
    /// Returns `CoreError::ValidationError` if `data` is empty.
    pub fn new(data: Vec<T>) -> CoreResult<Self> {
        Self::with_metadata(data, None, None, false)
    }

    /// Wraps `data`, recording the given description, NUMA hint and mutability flag
    /// in its metadata, and assigns a fresh [`DataId`].
    ///
    /// # Errors
    /// Returns `CoreError::ValidationError` if `data` is empty.
    pub fn with_metadata(
        data: Vec<T>,
        description: Option<String>,
        numa_node: Option<usize>,
        is_mutable: bool,
    ) -> CoreResult<Self> {
        if data.is_empty() {
            return Err(CoreError::ValidationError(
                ErrorContext::new("Cannot create zero-copy data from empty vector".to_string())
                    .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }
        let inner = Arc::new(ZeroCopyDataInner::new(
            data,
            description,
            numa_node,
            is_mutable,
        ));
        let id = DataId::new();
        Ok(Self { inner, id })
    }

    /// Like [`ZeroCopyData::new`] but sets the metadata's `is_mutable` flag to `true`.
    ///
    /// # Errors
    /// Returns `CoreError::ValidationError` if `data` is empty.
    pub fn new_mutable(data: Vec<T>) -> CoreResult<Self> {
        Self::with_metadata(data, None, None, true)
    }

    /// Identifier shared by all clones of this buffer.
    pub fn id(&self) -> DataId {
        self.id
    }

    /// Metadata recorded when the buffer was created.
    pub fn metadata(&self) -> &DataMetadata {
        &self.inner.metadata
    }

    /// Borrows all elements.
    pub fn as_slice(&self) -> &[T] {
        &self.inner.data
    }

    /// Number of elements.
    pub fn len(&self) -> usize {
        self.inner.data.len()
    }

    /// Whether the buffer has no elements (construction rejects empty input).
    pub fn is_empty(&self) -> bool {
        self.inner.data.is_empty()
    }

    /// Number of strong handles (clones, views, registry entries) sharing this buffer.
    pub fn ref_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }

    /// `true` when this is the only strong handle to the buffer.
    pub fn is_unique(&self) -> bool {
        Arc::strong_count(&self.inner) == 1
    }

    /// Creates a view over elements `[start, start + len)` sharing this buffer.
    ///
    /// # Errors
    /// Returns `CoreError::IndexError` if the range does not fit inside the buffer,
    /// including when `start + len` overflows `usize`.
    pub fn view(&self, start: usize, len: usize) -> CoreResult<ZeroCopyView<T>> {
        // A plain `start + len` can overflow for caller-supplied values: that panics in
        // debug builds and wraps past the check in release builds.
        let in_bounds = matches!(start.checked_add(len), Some(end) if end <= self.len());
        if !in_bounds {
            return Err(CoreError::IndexError(
                ErrorContext::new(format!(
                    "View range [{}..{}] exceeds data length {}",
                    start,
                    start.saturating_add(len),
                    self.len()
                ))
                .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }
        Ok(ZeroCopyView::new(self.clone(), start, len))
    }

    /// Creates a weak handle that does not keep the buffer alive.
    pub fn downgrade(&self) -> ZeroCopyWeakRef<T> {
        ZeroCopyWeakRef {
            inner: Arc::downgrade(&self.inner),
            id: self.id,
        }
    }
}
impl<T> Clone for ZeroCopyData<T> {
    /// Clones the handle only (an `Arc` reference-count bump); elements are not copied.
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            id: self.id,
        }
    }
}
/// Non-owning handle to a [`ZeroCopyData`] buffer; it does not keep the buffer alive.
#[derive(Debug)]
pub struct ZeroCopyWeakRef<T> {
    inner: Weak<ZeroCopyDataInner<T>>,
    id: DataId,
}
impl<T> ZeroCopyWeakRef<T> {
    /// Returns a strong handle if the buffer still has at least one strong owner.
    pub fn upgrade(&self) -> Option<ZeroCopyData<T>> {
        let inner = self.inner.upgrade()?;
        Some(ZeroCopyData { inner, id: self.id })
    }

    /// Identifier of the buffer this handle refers to (valid even after it is dropped).
    pub fn id(&self) -> DataId {
        self.id
    }

    /// `true` while at least one strong handle to the buffer exists.
    pub fn is_alive(&self) -> bool {
        Weak::strong_count(&self.inner) != 0
    }
}
impl<T> Clone for ZeroCopyWeakRef<T> {
    /// Clones the weak handle; the strong count is unaffected.
    fn clone(&self) -> Self {
        let inner = Weak::clone(&self.inner);
        Self { inner, id: self.id }
    }
}
/// A window `[start, start + len)` into a [`ZeroCopyData`] buffer.
///
/// The view owns a clone of the `ZeroCopyData` handle, so the underlying buffer
/// stays alive for as long as the view does.
#[derive(Debug)]
pub struct ZeroCopyView<T> {
    data: ZeroCopyData<T>,
    start: usize,
    len: usize,
}
impl<T> ZeroCopyView<T>
where
    T: Clone + 'static,
{
    /// Builds a view without validation. Callers must guarantee
    /// `start + len <= data.len()` (as `ZeroCopyData::view` and `subview` do).
    fn new(data: ZeroCopyData<T>, start: usize, len: usize) -> Self {
        Self { data, start, len }
    }

    /// Borrows the elements covered by this view.
    pub fn as_slice(&self) -> &[T] {
        &self.data.as_slice()[self.start..self.start + self.len]
    }

    /// Number of elements in the view.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Whether the view covers zero elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// The full buffer this view points into.
    pub const fn underlying_data(&self) -> &ZeroCopyData<T> {
        &self.data
    }

    /// Creates a view over `[start, start + len)` relative to this view.
    ///
    /// # Errors
    /// Returns `CoreError::IndexError` if the range does not fit inside this view,
    /// including when `start + len` overflows `usize`.
    pub fn subview(&self, start: usize, len: usize) -> CoreResult<ZeroCopyView<T>> {
        // Checked addition: an overflowing `start + len` would panic (debug) or wrap
        // past the bounds check (release).
        let in_bounds = matches!(start.checked_add(len), Some(end) if end <= self.len);
        if !in_bounds {
            return Err(CoreError::IndexError(
                ErrorContext::new(format!(
                    "Subview range [{}..{}] exceeds view length {}",
                    start,
                    start.saturating_add(len),
                    self.len
                ))
                .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }
        // Cannot overflow: start <= self.len and self.start + self.len <= data length.
        Ok(ZeroCopyView::new(
            self.data.clone(),
            self.start + start,
            len,
        ))
    }
}
impl<T> Clone for ZeroCopyView<T> {
    /// Clones the view; the underlying buffer is shared, not copied.
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            start: self.start,
            len: self.len,
        }
    }
}
/// Type-erased interface over `ZeroCopyData<T>`, letting buffers with different
/// element types live in the same maps inside `ZeroCopyInterface`.
trait AnyZeroCopyData: Send + Sync + std::fmt::Debug + Any {
    /// `TypeId` of the element type `T` (not of `ZeroCopyData<T>` itself).
    // Not called anywhere in this module, hence the lint allowance.
    #[allow(dead_code)]
    fn type_id(&self) -> TypeId;
    /// Metadata recorded when the buffer was created.
    fn metadata(&self) -> &DataMetadata;
    /// Boxes a new handle to the same buffer (no element copy).
    fn clone_box(&self) -> Box<dyn AnyZeroCopyData>;
    /// Identifier shared by all handles to the buffer.
    fn data_id(&self) -> DataId;
    /// Upcast used for `downcast_ref::<ZeroCopyData<T>>()`.
    fn as_any(&self) -> &dyn Any;
}
impl<T> AnyZeroCopyData for ZeroCopyData<T>
where
    T: Clone + 'static + Send + Sync + std::fmt::Debug,
{
    #[allow(dead_code)]
    fn type_id(&self) -> TypeId {
        TypeId::of::<T>()
    }

    fn metadata(&self) -> &DataMetadata {
        &self.inner.metadata
    }

    fn clone_box(&self) -> Box<dyn AnyZeroCopyData> {
        let handle: ZeroCopyData<T> = Clone::clone(self);
        Box::new(handle)
    }

    fn data_id(&self) -> DataId {
        self.id
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Thread-safe registry of type-erased [`ZeroCopyData`] handles.
///
/// Each registered buffer is indexed three ways: by name, by [`DataId`], and by
/// element `TypeId`. Each index sits behind its own `RwLock`.
#[derive(Debug)]
pub struct ZeroCopyInterface {
    // name -> handle
    named_data: RwLock<HashMap<String, Box<dyn AnyZeroCopyData>>>,
    // DataId -> handle
    id_data: RwLock<HashMap<DataId, Box<dyn AnyZeroCopyData>>>,
    // element TypeId -> handles registered with that element type
    type_data: RwLock<HashMap<TypeId, Vec<Box<dyn AnyZeroCopyData>>>>,
    // counters reported by `ZeroCopyInterface::stats`
    stats: RwLock<InterfaceStats>,
}
/// Snapshot of counters maintained by a [`ZeroCopyInterface`].
#[derive(Debug, Clone, Default)]
pub struct InterfaceStats {
    /// Number of successful `register_data` calls since creation or the last `clear`.
    pub items_registered: usize,
    /// Typed lookups (`get_data` / `get_data_by_id`) that succeeded.
    pub exchanges_successful: usize,
    /// Typed lookups that failed (missing entry or wrong type).
    pub exchanges_failed: usize,
    /// Sum of `size_bytes` of registered buffers (increased on register, decreased on remove).
    pub total_memory_managed: usize,
    /// NOTE(review): not written by any code in this module, so it stays 0 here.
    pub active_references: usize,
    /// Number of views handed out by `borrow_data`.
    pub views_created: usize,
}
impl ZeroCopyInterface {
pub fn new() -> Self {
Self {
named_data: RwLock::new(HashMap::new()),
id_data: RwLock::new(HashMap::new()),
type_data: RwLock::new(HashMap::new()),
stats: RwLock::new(InterfaceStats::default()),
}
}
pub fn register_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
&self,
name: &str,
data: ZeroCopyData<T>,
) -> CoreResult<()> {
let boxed_data = Box::new(data.clone()) as Box<dyn AnyZeroCopyData>;
{
let mut named = self.named_data.write().expect("Operation failed");
if named.contains_key(name) {
return Err(CoreError::ValidationError(
ErrorContext::new(format!("Data with name '{name}' already exists"))
.with_location(ErrorLocation::new(file!(), line!())),
));
}
named.insert(name.to_string(), boxed_data.clone_box());
}
{
let mut id_map = self.id_data.write().expect("Operation failed");
id_map.insert(data.id(), boxed_data.clone_box());
}
{
let mut type_map = self.type_data.write().expect("Operation failed");
let type_id = TypeId::of::<T>();
type_map.entry(type_id).or_default().push(boxed_data);
}
{
let mut stats = self.stats.write().expect("Operation failed");
stats.items_registered += 1;
stats.total_memory_managed += data.metadata().size_bytes;
}
Ok(())
}
pub fn get_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
&self,
name: &str,
) -> CoreResult<ZeroCopyData<T>> {
let named = self.named_data.read().expect("Operation failed");
if let Some(any_data) = named.get(name) {
if let Some(typed_data) = any_data.as_any().downcast_ref::<ZeroCopyData<T>>() {
self.update_exchange_stats(true);
Ok(typed_data.clone())
} else {
self.update_exchange_stats(false);
Err(CoreError::ValidationError(
ErrorContext::new(format!(
"Data '{}' exists but has wrong type. Expected {}, found {}",
name,
std::any::type_name::<T>(),
any_data.metadata().type_name
))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
} else {
self.update_exchange_stats(false);
Err(CoreError::ValidationError(
ErrorContext::new(format!("No data found with name '{name}'"))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
}
pub fn get_data_by_id<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
&self,
id: DataId,
) -> CoreResult<ZeroCopyData<T>> {
let id_map = self.id_data.read().expect("Operation failed");
if let Some(any_data) = id_map.get(&id) {
if let Some(typed_data) = any_data.as_any().downcast_ref::<ZeroCopyData<T>>() {
self.update_exchange_stats(true);
Ok(typed_data.clone())
} else {
self.update_exchange_stats(false);
Err(CoreError::ValidationError(
ErrorContext::new(format!(
"Data with ID {} exists but has wrong type. Expected {}, found {}",
id,
std::any::type_name::<T>(),
any_data.metadata().type_name
))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
} else {
self.update_exchange_stats(false);
Err(CoreError::ValidationError(
ErrorContext::new(format!("{id}"))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
}
pub fn get_data_by_type<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
&self,
) -> Vec<ZeroCopyData<T>> {
let type_map = self.type_data.read().expect("Operation failed");
let type_id = TypeId::of::<T>();
if let Some(data_vec) = type_map.get(&type_id) {
data_vec
.iter()
.filter_map(|any_data| any_data.as_any().downcast_ref::<ZeroCopyData<T>>())
.cloned()
.collect()
} else {
Vec::new()
}
}
pub fn borrow_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
&self,
name: &str,
) -> CoreResult<ZeroCopyView<T>> {
let data = self.get_data::<T>(name)?;
let view = data.view(0, data.len())?;
{
let mut stats = self.stats.write().expect("Operation failed");
stats.views_created += 1;
}
Ok(view)
}
pub fn has_data(&self, name: &str) -> bool {
self.named_data
.read()
.expect("Operation failed")
.contains_key(name)
}
pub fn has_data_by_id(&self, id: DataId) -> bool {
self.id_data
.read()
.expect("Operation failed")
.contains_key(&id)
}
pub fn remove_data(&self, name: &str) -> CoreResult<()> {
let mut named = self.named_data.write().expect("Operation failed");
if let Some(data) = named.remove(name) {
let id = data.data_id();
let mut id_map = self.id_data.write().expect("Operation failed");
id_map.remove(&id);
{
let mut stats = self.stats.write().expect("Operation failed");
stats.total_memory_managed -= data.metadata().size_bytes;
}
Ok(())
} else {
Err(CoreError::ValidationError(
ErrorContext::new(format!("No data found with name '{name}'"))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
}
pub fn list_data_names(&self) -> Vec<String> {
self.named_data
.read()
.expect("Operation failed")
.keys()
.cloned()
.collect()
}
pub fn list_data_ids(&self) -> Vec<DataId> {
self.id_data
.read()
.expect("Operation failed")
.keys()
.cloned()
.collect()
}
pub fn get_metadata(&self, name: &str) -> CoreResult<DataMetadata> {
let named = self.named_data.read().expect("Operation failed");
if let Some(data) = named.get(name) {
Ok(data.metadata().clone())
} else {
Err(CoreError::ValidationError(
ErrorContext::new(format!("No data found with name '{name}'"))
.with_location(ErrorLocation::new(file!(), line!())),
))
}
}
pub fn stats(&self) -> InterfaceStats {
self.stats.read().expect("Operation failed").clone()
}
pub fn clear(&self) {
let mut named = self.named_data.write().expect("Operation failed");
let mut id_map = self.id_data.write().expect("Operation failed");
let mut type_map = self.type_data.write().expect("Operation failed");
named.clear();
id_map.clear();
type_map.clear();
{
let mut stats = self.stats.write().expect("Operation failed");
*stats = InterfaceStats::default();
}
}
fn update_exchange_stats(&self, success: bool) {
let mut stats = self.stats.write().expect("Operation failed");
if success {
stats.exchanges_successful += 1;
} else {
stats.exchanges_failed += 1;
}
}
}
impl Default for ZeroCopyInterface {
fn default() -> Self {
Self::new()
}
}
/// Process-wide interface, created lazily on first access.
static GLOBAL_INTERFACE: std::sync::OnceLock<ZeroCopyInterface> = std::sync::OnceLock::new();
/// Returns the process-wide [`ZeroCopyInterface`], initialising it on first call.
// Public accessor that may have no callers inside the crate.
#[allow(dead_code)]
pub fn global_interface() -> &'static ZeroCopyInterface {
    GLOBAL_INTERFACE.get_or_init(ZeroCopyInterface::default)
}
/// Conversion between a container type and entries of a [`ZeroCopyInterface`].
pub trait DataExchange<T: Clone + 'static> {
    /// Registers the contents of `self` under `name` in `interface` and returns the
    /// [`DataId`] of the registered entry.
    fn export_data(&self, interface: &ZeroCopyInterface, name: &str) -> CoreResult<DataId>;
    /// Reconstructs a value of this type from the entry registered under `name`.
    fn from_interface(interface: &ZeroCopyInterface, name: &str) -> CoreResult<Self>
    where
        Self: Sized;
}
impl<T: Clone + 'static + Send + Sync + std::fmt::Debug> DataExchange<T> for Vec<T> {
    /// Copies the vector into a new `ZeroCopyData`, registers it, and returns its id.
    ///
    /// # Errors
    /// Fails if the vector is empty or `name` is already registered.
    fn export_data(&self, interface: &ZeroCopyInterface, name: &str) -> CoreResult<DataId> {
        let zero_copy_data = ZeroCopyData::new(self.clone())?;
        // Return the id of the data actually registered so callers can pass it to
        // `get_data_by_id`; a fresh `DataId::new()` would match no entry.
        let id = zero_copy_data.id();
        interface.register_data(name, zero_copy_data)?;
        Ok(id)
    }

    /// Copies the elements of the entry registered under `name` into a new vector.
    ///
    /// # Errors
    /// Fails if the entry is missing or has a different element type.
    fn from_interface(interface: &ZeroCopyInterface, name: &str) -> CoreResult<Self> {
        let zero_copy_data: ZeroCopyData<T> = interface.get_data(name)?;
        Ok(zero_copy_data.as_slice().to_vec())
    }
}
impl<A> DataExchange<A> for crate::memory_efficient::memmap::MemoryMappedArray<A>
where
    A: Clone + Copy + 'static + Send + Sync + std::fmt::Debug,
{
    /// Copies the array's elements into a new `ZeroCopyData`, registers it under
    /// `name`, and returns the id of the registered entry.
    ///
    /// # Errors
    /// Fails if the array is empty or `name` is already registered.
    fn export_data(&self, interface: &ZeroCopyInterface, name: &str) -> CoreResult<DataId> {
        let data_slice = self.as_slice();
        let data_vec = data_slice.to_vec();
        let zero_copy_data = ZeroCopyData::new(data_vec)?;
        // Return the id of the data actually registered so it works with
        // `get_data_by_id`; a fresh `DataId::new()` would match no entry.
        let id = zero_copy_data.id();
        interface.register_data(name, zero_copy_data)?;
        Ok(id)
    }

    /// Creates a memory-mapped array backed by a temporary file after looking up `name`.
    ///
    /// # Errors
    /// Fails if the entry is missing, has a different element type, or the temporary
    /// file cannot be created; errors from `MemoryMappedArray::new` are propagated.
    fn from_interface(interface: &ZeroCopyInterface, name: &str) -> CoreResult<Self> {
        let zero_copy_data: ZeroCopyData<A> = interface.get_data(name)?;
        // NOTE(review): `data_vec` is never used below and `None` is passed as the
        // initial data, so the returned array does not appear to contain the imported
        // elements — TODO confirm the intended `MemoryMappedArray::new` usage.
        let data_vec = zero_copy_data.as_slice().to_vec();
        use crate::memory_efficient::memmap::AccessMode;
        use tempfile::NamedTempFile;
        let temp_file = NamedTempFile::new().map_err(|e| {
            CoreError::IoError(crate::error::ErrorContext::new(format!(
                "Failed to create temporary file for import: {e}"
            )))
        })?;
        let temp_path = temp_file.path().to_path_buf();
        // NOTE(review): `temp_file` is dropped when this function returns, and per the
        // tempfile docs `NamedTempFile` deletes its file on drop — verify the returned
        // mapping remains valid.
        Self::new::<crate::ndarray::OwnedRepr<A>, crate::ndarray::IxDyn>(
            None,
            &temp_path,
            AccessMode::ReadWrite,
            0,
        )
    }
}
pub trait IntoZeroCopy<T: Clone + 'static> {
fn into_zero_copy(self) -> CoreResult<ZeroCopyData<T>>;
}
#[allow(dead_code)]
pub fn export_array_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
data: &[T],
name: &str,
) -> CoreResult<DataId> {
let data_vec = data.to_vec();
data_vec.export_data(global_interface(), name)
}
#[allow(dead_code)]
pub fn import_array_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
name: &str,
) -> CoreResult<Vec<T>> {
Vec::<T>::from_interface(global_interface(), name)
}
#[allow(dead_code)]
pub fn export_memmap_array<A>(
array: &crate::memory_efficient::memmap::MemoryMappedArray<A>,
name: &str,
) -> CoreResult<DataId>
where
A: Clone + Copy + 'static + Send + Sync + std::fmt::Debug,
{
array.export_data(global_interface(), name)
}
#[allow(dead_code)]
pub fn import_memmap_array<A>(
name: &str,
) -> CoreResult<crate::memory_efficient::memmap::MemoryMappedArray<A>>
where
A: Clone + Copy + 'static + Send + Sync + std::fmt::Debug,
{
crate::memory_efficient::memmap::MemoryMappedArray::<A>::from_interface(
global_interface(),
name,
)
}
impl<T: Clone + 'static> IntoZeroCopy<T> for Vec<T> {
    /// Moves the vector into a new buffer without copying its elements.
    fn into_zero_copy(self) -> CoreResult<ZeroCopyData<T>> {
        ZeroCopyData::with_metadata(self, None, None, false)
    }
}
impl<T: Clone + 'static> IntoZeroCopy<T> for &[T] {
    /// Clones the slice into a new vector, then wraps it.
    fn into_zero_copy(self) -> CoreResult<ZeroCopyData<T>> {
        let owned: Vec<T> = self.to_vec();
        owned.into_zero_copy()
    }
}
/// Construction of a value by reading a [`ZeroCopyData`] buffer.
pub trait FromZeroCopy<T>
where
    T: Clone + 'static,
{
    /// Builds `Self` from the buffer's elements.
    fn from_zero_copy(data: &ZeroCopyData<T>) -> Self;
}
impl<T: Clone + 'static> FromZeroCopy<T> for Vec<T> {
    /// Clones every element into a new vector.
    fn from_zero_copy(data: &ZeroCopyData<T>) -> Self {
        Vec::from(data.as_slice())
    }
}
/// Returns the process-wide [`ZeroCopyInterface`] (same as `global_interface`).
// Public convenience API that may have no callers inside the crate.
#[allow(dead_code)]
pub fn create_global_data_registry() -> &'static ZeroCopyInterface {
    global_interface()
}
/// Registers `data` under `name` in the process-wide interface.
// Public convenience API that may have no callers inside the crate.
#[allow(dead_code)]
pub fn register_global_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
    name: &str,
    data: ZeroCopyData<T>,
) -> CoreResult<()> {
    let registry = global_interface();
    registry.register_data(name, data)
}
/// Fetches the entry named `name` with element type `T` from the process-wide interface.
// Public convenience API that may have no callers inside the crate.
#[allow(dead_code)]
pub fn get_global_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
    name: &str,
) -> CoreResult<ZeroCopyData<T>> {
    let registry = global_interface();
    registry.get_data::<T>(name)
}
/// Wraps `data` in a new [`ZeroCopyData`]; fails for an empty vector.
// Public convenience API that may have no callers inside the crate.
#[allow(dead_code)]
pub fn create_zero_copy_data<T: Clone + 'static + Send + Sync + std::fmt::Debug>(
    data: Vec<T>,
) -> CoreResult<ZeroCopyData<T>> {
    ZeroCopyData::<T>::new(data)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_data_creation() {
        let source = vec![1, 2, 3, 4, 5];
        let shared = ZeroCopyData::new(source.clone()).expect("Operation failed");
        assert_eq!(shared.as_slice(), source.as_slice());
        assert_eq!(shared.len(), 5);
        assert!(!shared.is_empty());
        assert_eq!(shared.ref_count(), 1);
        assert!(shared.is_unique());
    }

    #[test]
    fn test_zero_copy_data_cloning() {
        let first = ZeroCopyData::new(vec![1, 2, 3, 4, 5]).expect("Operation failed");
        let second = first.clone();
        // Both handles point at the same allocation, so each sees two owners.
        for handle in [&first, &second] {
            assert_eq!(handle.ref_count(), 2);
            assert!(!handle.is_unique());
        }
        assert_eq!(first.id(), second.id());
    }

    #[test]
    fn test_zero_copy_view() {
        let shared = ZeroCopyData::new(vec![1, 2, 3, 4, 5]).expect("Operation failed");
        let middle = shared.view(1, 3).expect("Operation failed");
        assert_eq!(middle.as_slice(), &[2, 3, 4]);
        assert_eq!(middle.len(), 3);
        let centre = middle.subview(1, 1).expect("Operation failed");
        assert_eq!(centre.as_slice(), &[3]);
    }

    #[test]
    fn test_zero_copy_interface() {
        let registry = ZeroCopyInterface::new();
        let values = vec![1.0, 2.0, 3.0];
        let shared = ZeroCopyData::new(values.clone()).expect("Operation failed");
        registry
            .register_data("test_data", shared)
            .expect("Operation failed");
        assert!(registry.has_data("test_data"));

        let fetched = registry
            .get_data::<f64>("test_data")
            .expect("Operation failed");
        assert_eq!(fetched.as_slice(), values.as_slice());

        let whole = registry
            .borrow_data::<f64>("test_data")
            .expect("Operation failed");
        assert_eq!(whole.as_slice(), values.as_slice());

        let info = registry
            .get_metadata("test_data")
            .expect("Operation failed");
        assert_eq!(info.element_count, 3);
        assert_eq!(info.element_size, std::mem::size_of::<f64>());
    }

    #[test]
    fn test_zero_copy_interface_type_safety() {
        let registry = ZeroCopyInterface::new();
        let ints = ZeroCopyData::new(vec![1, 2, 3]).expect("Operation failed");
        registry
            .register_data("int_data", ints)
            .expect("Operation failed");
        // Registered as integers, so an f64 lookup must be rejected.
        assert!(registry.get_data::<f64>("int_data").is_err());
    }

    #[test]
    fn test_weak_references() {
        let strong = ZeroCopyData::new(vec![1, 2, 3]).expect("Operation failed");
        let weak = strong.downgrade();
        assert!(weak.is_alive());
        assert_eq!(weak.id(), strong.id());

        let revived = weak.upgrade().expect("Operation failed");
        assert_eq!(revived.as_slice(), strong.as_slice());

        // Dropping every strong handle must invalidate the weak one.
        drop(strong);
        drop(revived);
        assert!(!weak.is_alive());
        assert!(weak.upgrade().is_none());
    }

    #[test]
    fn test_global_interface() {
        let values = vec![1.0, 2.0, 3.0];
        let shared = ZeroCopyData::new(values.clone()).expect("Operation failed");
        register_global_data("global_test", shared).expect("Operation failed");
        let fetched = get_global_data::<f64>("global_test").expect("Operation failed");
        assert_eq!(fetched.as_slice(), values.as_slice());
    }

    #[test]
    fn test_into_zero_copy_trait() {
        let values = vec![1, 2, 3, 4, 5];
        let from_vec = values.clone().into_zero_copy().expect("Operation failed");
        assert_eq!(from_vec.as_slice(), values.as_slice());

        let borrowed: &[i32] = &values;
        let from_slice = borrowed.into_zero_copy().expect("Operation failed");
        assert_eq!(from_slice.as_slice(), values.as_slice());
    }

    #[test]
    fn test_from_zero_copy_trait() {
        let values = vec![1, 2, 3, 4, 5];
        let shared = ZeroCopyData::new(values.clone()).expect("Operation failed");
        let copied: Vec<i32> = FromZeroCopy::from_zero_copy(&shared);
        assert_eq!(copied, values);
    }
}