use std::sync::Arc;
use bytes::Bytes;
use indexmap::IndexMap;
use object_store::{ObjectStore, ObjectStoreExt};
use crate::{
DType, Error, Result,
address::ChunkAddress,
array::ArrayElement,
codec::CompressionCodec,
delta::{
Delta, DeltaAllocator, DeltaCache, DeltaImmutable, DeltaMutable, write_file_then_bytes,
},
footer::{FOOTER_VERSION, Footer},
layout::{
ArrayLayout, ArrayMeta, AttrIndexKind, AttributeValue, Attributes, ChunkEntry, FillValue,
StorageLayout,
},
stats::{ArrayStats, StatsFile, compute_chunk_partial, merge_partial, read_stats_file},
storage::{InMemoryStorage, ObjectStoreBackend, Storage},
};
pub const DEFAULT_BLOCK_TARGET_SIZE: usize = 8 * 1024 * 1024; pub const DEFAULT_CACHE_CAPACITY: usize = 256 * 1024 * 1024; pub const DEFAULT_IO_CACHE_CAPACITY: usize = 64 * 1024 * 1024;
pub struct FileConfig<C: CompressionCodec> {
pub codec: C,
pub block_target_size: usize,
pub cache_capacity: usize,
pub io_cache_capacity: usize,
pub cache: Option<Arc<DeltaCache>>,
}
impl<C: CompressionCodec> FileConfig<C> {
pub fn new(codec: C) -> Self {
Self {
codec,
block_target_size: DEFAULT_BLOCK_TARGET_SIZE,
cache_capacity: DEFAULT_CACHE_CAPACITY,
io_cache_capacity: DEFAULT_IO_CACHE_CAPACITY,
cache: None,
}
}
}
#[derive(Debug, Clone)]
pub struct MergedArrayMeta {
pub name: String,
pub dtype: DType,
pub shape: Vec<u32>,
pub chunk_shape: Vec<u32>,
pub dimension_names: Vec<String>,
pub fill_value: Option<FillValue>,
}
struct StoreDir {
store: Arc<dyn ObjectStore>,
base_path: object_store::path::Path,
}
pub(crate) struct ChunkedSchema {
pub full_shape: Vec<u32>,
pub chunk_shape: Vec<u32>,
pub dtype: DType,
pub all_coords: Vec<Vec<u32>>,
}
pub struct ArrayFile {
deltas: Vec<Delta<DeltaImmutable>>,
pending: Option<Delta<DeltaMutable>>,
codec: Arc<dyn CompressionCodec>,
block_target_size: usize,
cache: Option<Arc<DeltaCache>>,
store_dir: Option<StoreDir>,
stats: Option<StatsFile>,
}
impl ArrayFile {
pub async fn create<C: CompressionCodec + 'static>(
store: Arc<dyn ObjectStore>,
path: object_store::path::Path,
config: FileConfig<C>,
) -> Result<Self> {
let cache = resolve_cache(&config);
let delta_path = Arc::<str>::from(path.as_ref());
let storage =
Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
write_empty_base(&*storage).await?;
let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
Ok(ArrayFile {
deltas: vec![base_delta],
pending: None,
codec: Arc::new(config.codec),
block_target_size: config.block_target_size,
cache,
store_dir: Some(StoreDir {
store,
base_path: path,
}),
stats: None,
})
}
pub async fn open<C: CompressionCodec + 'static>(
store: Arc<dyn ObjectStore>,
path: object_store::path::Path,
config: FileConfig<C>,
) -> Result<Self> {
let cache = resolve_cache(&config);
let delta_path = Arc::<str>::from(path.as_ref());
let storage =
Arc::new(ObjectStoreBackend::new(Arc::clone(&store), path.clone())) as Arc<dyn Storage>;
let base_delta = Delta::<DeltaImmutable>::open(storage, delta_path, cache.clone()).await?;
let mut deltas = vec![base_delta];
let sidecars = discover_sidecars_store(&*store, &path).await?;
for (_, scar_path) in sidecars {
let scar_delta_path = Arc::<str>::from(scar_path.as_ref());
let scar_storage = Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path))
as Arc<dyn Storage>;
deltas.push(
Delta::<DeltaImmutable>::open(scar_storage, scar_delta_path, cache.clone()).await?,
);
}
let stats = {
let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&path));
read_stats_file(&s_storage).await.ok()
};
Ok(ArrayFile {
deltas,
pending: None,
codec: Arc::new(config.codec),
block_target_size: config.block_target_size,
cache,
store_dir: Some(StoreDir {
store,
base_path: path,
}),
stats,
})
}
pub async fn create_memory<C: CompressionCodec + 'static>(
config: FileConfig<C>,
) -> Result<Self> {
let cache = resolve_cache(&config);
let storage = Arc::new(InMemoryStorage::new()) as Arc<dyn Storage>;
write_empty_base(&*storage).await?;
let base_delta =
Delta::<DeltaImmutable>::open(storage, Arc::from("__memory_0__"), cache.clone())
.await?;
Ok(ArrayFile {
deltas: vec![base_delta],
pending: None,
codec: Arc::new(config.codec),
block_target_size: config.block_target_size,
cache,
store_dir: None,
stats: None,
})
}
}
impl ArrayFile {
pub fn get_array(&self, name: &str) -> Result<&ArrayMeta> {
self.resolve_array_meta(name)
.ok_or_else(|| Error::ArrayNotFound {
name: name.to_string(),
})
}
fn resolve_array_meta(&self, name: &str) -> Option<&ArrayMeta> {
if let Some(p) = self.pending.as_ref()
&& let Some(m) = p.inner.array_meta.get(name)
{
return if m.deleted { None } else { Some(m) };
}
for delta in self.deltas.iter().rev() {
if let Some(&i) = delta.inner.array_index.get(name) {
let m = &delta.inner.footer.arrays[i];
return if m.deleted { None } else { Some(m) };
}
}
None
}
fn pending_mut(&mut self) -> &mut Delta<DeltaMutable> {
if self.pending.is_none() {
let overlay_index = self.deltas.len() as u32;
self.pending = Some(Delta::<DeltaMutable>::new(
Arc::clone(&self.codec),
self.block_target_size,
overlay_index,
));
}
self.pending.as_mut().unwrap()
}
pub(crate) fn get_chunked_schema(&self, name: &str) -> Result<ChunkedSchema> {
let meta = self.get_array(name)?;
let full_shape = meta.layout.shape.clone();
let chunk_shape = meta.layout.storage.chunk_shape.clone();
let dtype = meta.dtype.clone();
let mut existing: IndexMap<Vec<u32>, ()> = IndexMap::new();
for delta in &self.deltas {
if let Some(&i) = delta.inner.array_index.get(name) {
for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
existing.entry(e.coord.clone()).or_default();
}
}
}
if let Some(p) = self.pending.as_ref()
&& let Some(m) = p.inner.array_meta.get(name)
{
for e in &m.layout.storage.chunks {
existing.entry(e.coord.clone()).or_default();
}
}
Ok(ChunkedSchema {
full_shape,
chunk_shape,
dtype,
all_coords: existing.into_keys().collect(),
})
}
pub fn list_arrays(&self) -> Vec<MergedArrayMeta> {
let mut seen: IndexMap<String, MergedArrayMeta> = IndexMap::new();
for delta in &self.deltas {
for a in &delta.inner.footer.arrays {
if a.deleted {
seen.shift_remove(&a.name);
} else {
seen.insert(
a.name.clone(),
MergedArrayMeta {
name: a.name.clone(),
dtype: a.dtype.clone(),
shape: a.layout.shape.clone(),
chunk_shape: a.layout.storage.chunk_shape.clone(),
dimension_names: a.layout.dimension_names.clone(),
fill_value: a.fill_value.clone(),
},
);
}
}
}
if let Some(p) = self.pending.as_ref() {
for (name, a) in &p.inner.array_meta {
if a.deleted {
seen.shift_remove(name);
} else {
seen.insert(
name.clone(),
MergedArrayMeta {
name: a.name.clone(),
dtype: a.dtype.clone(),
shape: a.layout.shape.clone(),
chunk_shape: a.layout.storage.chunk_shape.clone(),
dimension_names: a.layout.dimension_names.clone(),
fill_value: a.fill_value.clone(),
},
);
}
}
}
seen.into_values().collect()
}
pub fn array_stats(&self, name: &str) -> Option<&ArrayStats> {
self.stats.as_ref()?.get_array(name)
}
pub fn num_layers(&self) -> usize {
self.deltas.len()
}
pub fn get_attribute(&self, name: &str, key: &str) -> Result<Option<&AttributeValue>> {
let meta = self.get_array(name)?;
let key_idx = match self
.pending
.as_ref()
.and_then(|p| p.inner.attr_keys.iter().position(|k| k == key))
.or_else(|| {
self.deltas
.iter()
.rev()
.find_map(|d| d.inner.footer.attr_keys.iter().position(|k| k == key))
}) {
Some(i) => i,
None => return Ok(None),
};
let val_idx = match meta.attributes.get(key_idx) {
Some(i) => i,
None => return Ok(None),
};
if let Some(p) = self.pending.as_ref()
&& val_idx < p.inner.attr_values.len()
{
return Ok(Some(&p.inner.attr_values[val_idx]));
}
for delta in self.deltas.iter().rev() {
if val_idx < delta.inner.footer.attr_values.len() {
return Ok(Some(&delta.inner.footer.attr_values[val_idx]));
}
}
Ok(None)
}
pub fn set_attribute(&mut self, name: &str, key: &str, value: AttributeValue) -> Result<()> {
let mut existing_meta = self.get_array(name)?.clone();
existing_meta.layout.storage.chunks.clear();
let pending = self.pending_mut();
let key_idx = pending.intern_attr_key(key);
let val_idx = pending.intern_attr_value(value);
if pending.array_meta_mut(name).is_none() {
pending.upsert_array_meta(existing_meta);
}
let meta = pending.array_meta_mut(name).unwrap();
meta.attributes.upsert(key_idx, val_idx);
Ok(())
}
}
impl ArrayFile {
pub fn define_array<T: ArrayElement>(
&mut self,
name: impl Into<String>,
dimension_names: Vec<String>,
shape: Vec<usize>,
chunk_shape: Option<Vec<usize>>,
fill_value: Option<FillValue>,
) -> Result<()> {
let name = name.into();
if self.resolve_array_meta(&name).is_some() {
return Err(Error::ArrayAlreadyExists { name });
}
let shape_u32: Vec<u32> = shape.iter().map(|&s| s as u32).collect();
let ndim = shape_u32.len();
let chunk_shape_u32: Vec<u32> = chunk_shape
.map(|cs| cs.iter().map(|&s| s as u32).collect())
.unwrap_or_else(|| shape_u32.clone());
let dim_names = if dimension_names.len() == ndim {
dimension_names
} else {
(0..ndim).map(|i| format!("dim{i}")).collect()
};
let layout = ArrayLayout {
shape: shape_u32,
dimension_names: dim_names,
storage: StorageLayout {
chunk_shape: chunk_shape_u32,
chunks: vec![],
},
};
self.pending_mut().upsert_array_meta(ArrayMeta {
name,
dtype: T::DTYPE,
layout,
fill_value,
deleted: false,
attributes: Attributes::empty(AttrIndexKind::U16),
});
Ok(())
}
pub fn delete(&mut self, name: &str) -> Result<()> {
let meta = self.get_array(name)?.clone();
self.pending_mut().mark_deleted(meta);
Ok(())
}
}
impl ArrayFile {
pub(crate) async fn read_chunk<T: ArrayElement>(
&self,
name: &str,
coord: &[u32],
) -> Result<Vec<T>> {
if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
return Ok(T::decode_chunk(&bytes));
}
let meta = self.get_array(name)?;
let chunk_elems: usize = meta
.layout
.storage
.chunk_shape
.iter()
.enumerate()
.map(|(i, &cs)| {
let axis_len = meta.layout.shape[i] as usize;
let start = coord[i] as usize * cs as usize;
(cs as usize).min(axis_len.saturating_sub(start))
})
.product();
Ok(vec![T::fill_element(meta.fill_value.as_ref()); chunk_elems])
}
pub(crate) fn write_chunk_raw(
&mut self,
name: &str,
coord: Vec<u32>,
bytes: Vec<u8>,
) -> Result<()> {
let snapshot = if self
.pending
.as_ref()
.and_then(|p| p.inner.array_meta.get(name))
.is_none()
{
let mut m = self.get_array(name)?.clone();
m.layout.storage.chunks.clear();
Some(m)
} else {
None
};
let pending = self.pending_mut();
if let Some(meta) = snapshot {
pending.upsert_array_meta(meta);
}
pending.write_raw_chunk(name, coord, &bytes)
}
async fn resolve_raw_chunk(&self, name: &str, coord: &[u32]) -> Result<Option<Bytes>> {
if let Some(p) = self.pending.as_ref()
&& let Some(bytes) = p.read_raw_chunk(name, coord)
{
return Ok(Some(bytes));
}
for delta in self.deltas.iter().rev() {
if let Some(bytes) = delta.read_raw_chunk(name, coord).await? {
return Ok(Some(bytes));
}
}
Ok(None)
}
}
impl ArrayFile {
pub async fn write_array<T: ArrayElement>(
&mut self,
name: &str,
start: Vec<usize>,
data: ndarray::ArrayView<'_, T, ndarray::IxDyn>,
) -> Result<()> {
crate::ndarray_ext::write_nd(self, name, data, &start).await
}
pub async fn read_array<T: ArrayElement>(
&self,
name: &str,
start: Vec<usize>,
shape: Vec<usize>,
) -> Result<ndarray::ArcArray<T, ndarray::IxDyn>> {
use std::ops::Range;
let slice: Option<Vec<Range<usize>>> = if start.is_empty() && shape.is_empty() {
None
} else {
let meta = self.get_array(name)?;
let ndim = meta.layout.shape.len();
let effective_start = if start.len() == ndim {
start.clone()
} else {
vec![0; ndim]
};
let effective_shape: Vec<usize> = if shape.len() == ndim {
shape.clone()
} else {
meta.layout.shape.iter().map(|&s| s as usize).collect()
};
Some(
effective_start
.iter()
.zip(&effective_shape)
.map(|(&s, &sz)| s..s + sz)
.collect(),
)
};
crate::ndarray_ext::assemble_nd(self, name, slice.as_deref()).await
}
}
impl ArrayFile {
pub async fn flush(&mut self) -> Result<()> {
if self.pending.is_none() {
return Ok(());
}
let (store, base_path) = match &self.store_dir {
Some(sd) => (Arc::clone(&sd.store), sd.base_path.clone()),
None => {
return Err(Error::Storage(
"in-memory file: use flush_memory instead".into(),
));
}
};
let overlay_index = self.deltas.len() as u32;
let scar_path = sidecar_path(&base_path, overlay_index);
let delta_path = Arc::<str>::from(scar_path.as_ref());
let storage =
Arc::new(ObjectStoreBackend::new(Arc::clone(&store), scar_path)) as Arc<dyn Storage>;
let hint = base_path.as_ref().to_string();
let dirty_names = self.commit_pending(storage, delta_path, hint).await?;
let merged = self.compute_stats_for(&dirty_names).await?;
let s_storage = ObjectStoreBackend::new(Arc::clone(&store), stats_path(&base_path));
s_storage
.write(bytes::Bytes::from(merged.serialize()?))
.await?;
self.stats = Some(merged);
Ok(())
}
pub async fn flush_memory(&mut self, storage: &InMemoryStorage) -> Result<()> {
if self.pending.is_none() {
return Ok(());
}
let overlay_index = self.deltas.len() as u32;
let delta_path = Arc::<str>::from(format!("__memory_{overlay_index}__").as_str());
let arc: Arc<dyn Storage> = Arc::new(storage.clone());
let dirty_names = self.commit_pending(arc, delta_path, String::new()).await?;
let merged = self.compute_stats_for(&dirty_names).await?;
self.stats = Some(merged);
Ok(())
}
async fn compute_stats_for(&self, dirty_names: &[String]) -> Result<StatsFile> {
let mut merged = self.stats.clone().unwrap_or_default();
for name in dirty_names {
let schema = match self.get_chunked_schema(name) {
Ok(s) => s,
Err(_) => continue,
};
let fill_value = self
.resolve_array_meta(name)
.and_then(|m| m.fill_value.clone());
let shape_product: u64 = schema.full_shape.iter().map(|&s| s as u64).product();
let mut stats = ArrayStats::new(name.clone());
let mut written_non_null: u64 = 0;
for coord in &schema.all_coords {
if let Some(bytes) = self.resolve_raw_chunk(name, coord).await? {
let (min, max, nc, rc) =
compute_chunk_partial(&bytes, &schema.dtype, fill_value.as_ref());
written_non_null += rc - nc;
merge_partial(&mut stats, min, max, nc, rc);
}
}
stats.row_count = shape_product;
stats.null_count = shape_product - written_non_null;
merged.upsert(stats);
}
Ok(merged)
}
async fn commit_pending(
&mut self,
storage: Arc<dyn Storage>,
delta_path: Arc<str>,
base_file_hint: String,
) -> Result<Vec<String>> {
let mutable = self
.pending
.take()
.expect("commit_pending: no pending delta");
let dirty_names: Vec<String> = mutable
.inner
.array_meta
.iter()
.filter(|(_, m)| !m.layout.storage.chunks.is_empty())
.map(|(name, _)| name.clone())
.collect();
let immutable = mutable
.commit(storage, delta_path, self.cache.clone(), base_file_hint)
.await?;
self.deltas.push(immutable);
Ok(dirty_names)
}
}
impl ArrayFile {
pub async fn compact(&mut self) -> Result<()> {
let merged_names: Vec<String> = self.list_arrays().into_iter().map(|m| m.name).collect();
let mut allocator = DeltaAllocator::new(Arc::clone(&self.codec), self.block_target_size);
let mut arrays: Vec<ArrayMeta> = Vec::new();
let mut per_array_stats: Vec<ArrayStats> = Vec::new();
for name in &merged_names {
let meta = self
.resolve_array_meta(name)
.ok_or_else(|| Error::ArrayNotFound { name: name.clone() })?
.clone();
let mut all_coords: indexmap::IndexSet<Vec<u32>> = indexmap::IndexSet::new();
for delta in &self.deltas {
if let Some(&i) = delta.inner.array_index.get(name.as_str()) {
for e in &delta.inner.footer.arrays[i].layout.storage.chunks {
all_coords.insert(e.coord.clone());
}
}
}
let shape_product: u64 = meta.layout.shape.iter().map(|&s| s as u64).product();
let mut new_chunks: Vec<ChunkEntry> = Vec::new();
let mut array_stats = ArrayStats::new(name.clone());
let mut written_non_null: u64 = 0;
for coord in &all_coords {
if let Some(raw) = self.resolve_raw_chunk(name, coord).await? {
let (min, max, nc, rc) =
compute_chunk_partial(&raw, &meta.dtype, meta.fill_value.as_ref());
written_non_null += rc - nc;
merge_partial(&mut array_stats, min, max, nc, rc);
let alloc = allocator.allocate(&raw);
new_chunks.push(ChunkEntry {
coord: coord.clone(),
address: ChunkAddress::from(alloc),
});
}
}
array_stats.row_count = shape_product;
array_stats.null_count = shape_product - written_non_null;
per_array_stats.push(array_stats);
let mut new_meta = meta;
new_meta.layout.storage.chunks = new_chunks;
arrays.push(new_meta);
}
let crate::delta::AllocatorOutput {
mut file,
output_size,
blocks,
} = allocator.commit().await;
let mut attr_keys: Vec<String> = Vec::new();
let mut attr_values: Vec<crate::layout::AttributeValue> = Vec::new();
for delta in &self.deltas {
for k in &delta.inner.footer.attr_keys {
if !attr_keys.contains(k) {
attr_keys.push(k.clone());
}
}
for v in &delta.inner.footer.attr_values {
if !attr_values.contains(v) {
attr_values.push(v.clone());
}
}
}
let footer = Footer {
version: FOOTER_VERSION,
blocks,
arrays,
attr_keys,
attr_values,
overlay_index: 0,
base_file_hint: String::new(),
};
let footer_bytes = footer.serialize()?;
let base_storage: Arc<dyn Storage> = if let Some(sd) = &self.store_dir {
for i in 1..self.deltas.len() {
let _ = sd
.store
.delete(&sidecar_path(&sd.base_path, i as u32))
.await;
}
Arc::new(ObjectStoreBackend::new(
Arc::clone(&sd.store),
sd.base_path.clone(),
))
} else {
Arc::clone(&self.deltas[0].inner.storage)
};
write_file_then_bytes(&mut file, output_size, &footer_bytes, &*base_storage).await?;
let base_delta_path: Arc<str> = if let Some(sd) = &self.store_dir {
Arc::from(sd.base_path.as_ref())
} else {
Arc::from("__memory_0__")
};
let new_base =
Delta::<DeltaImmutable>::open(base_storage, base_delta_path, self.cache.clone())
.await?;
self.deltas = vec![new_base];
let mut new_stats = StatsFile::default();
for s in per_array_stats {
new_stats.upsert(s);
}
if let Some(sd) = &self.store_dir {
let s_storage =
ObjectStoreBackend::new(Arc::clone(&sd.store), stats_path(&sd.base_path));
s_storage
.write(bytes::Bytes::from(new_stats.serialize()?))
.await?;
}
self.stats = Some(new_stats);
Ok(())
}
}
fn resolve_cache<C: CompressionCodec>(config: &FileConfig<C>) -> Option<Arc<DeltaCache>> {
if let Some(c) = &config.cache {
Some(Arc::clone(c))
} else if config.cache_capacity == 0 && config.io_cache_capacity == 0 {
None
} else {
Some(Arc::new(DeltaCache::new(
config.cache_capacity as u64,
config.io_cache_capacity as u64,
)))
}
}
fn sidecar_path(base: &object_store::path::Path, n: u32) -> object_store::path::Path {
let s = base.as_ref();
let without_af = s.strip_suffix(".af").unwrap_or(s);
object_store::path::Path::from(format!("{without_af}.{n}.af").as_str())
}
fn stats_path(base: &object_store::path::Path) -> object_store::path::Path {
let s = base.as_ref();
let without_af = s.strip_suffix(".af").unwrap_or(s);
object_store::path::Path::from(format!("{without_af}.stats").as_str())
}
async fn discover_sidecars_store(
store: &dyn ObjectStore,
base_path: &object_store::path::Path,
) -> Result<Vec<(u32, object_store::path::Path)>> {
use futures::TryStreamExt;
let base_str = base_path.as_ref();
let stem_prefix = base_str
.strip_suffix(".af")
.ok_or_else(|| Error::Storage("path must end with .af".into()))?;
let list_prefix = base_str
.rfind('/')
.map(|pos| object_store::path::Path::from(&base_str[..pos]));
let objects: Vec<_> = store
.list(list_prefix.as_ref())
.try_collect()
.await
.map_err(|e| Error::Storage(e.to_string()))?;
let mut sidecars: Vec<(u32, object_store::path::Path)> = objects
.into_iter()
.filter_map(|meta| {
let s = meta.location.as_ref();
let rest = s.strip_prefix(stem_prefix)?.strip_prefix('.')?;
let (num_str, ext) = rest.rsplit_once('.')?;
if ext != "af" {
return None;
}
let n: u32 = num_str.parse().ok()?;
if n == 0 {
return None;
}
Some((n, meta.location))
})
.collect();
sidecars.sort_by_key(|(n, _)| *n);
Ok(sidecars)
}
async fn write_empty_base(storage: &dyn Storage) -> Result<()> {
let footer = Footer::new();
let bytes = footer.serialize()?;
storage.write(Bytes::from(bytes)).await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::NoCompression;
#[tokio::test]
async fn shared_cache_is_reused_across_files() {
let shared = Arc::new(DeltaCache::new(1024 * 1024, 0));
let mut cfg_a = FileConfig::new(NoCompression);
cfg_a.cache = Some(Arc::clone(&shared));
let file_a = ArrayFile::create_memory(cfg_a).await.unwrap();
let mut cfg_b = FileConfig::new(NoCompression);
cfg_b.cache = Some(Arc::clone(&shared));
let file_b = ArrayFile::create_memory(cfg_b).await.unwrap();
let a = file_a.cache.as_ref().expect("file_a has cache");
let b = file_b.cache.as_ref().expect("file_b has cache");
assert!(Arc::ptr_eq(a, &shared));
assert!(Arc::ptr_eq(b, &shared));
}
}