use crate::{
Circuit, Error,
circuit::checkpointer::Checkpoint,
dynamic::{DataTrait, DynData, DynUnit, Erase, LeanVec, WeightTrait},
trace::{BatchReaderFactories, DbspSerializer, Deserializer, spine_async::WithSnapshot},
};
pub use crate::{
DBData, DBWeight, DynZWeight, Stream, Timestamp, ZWeight,
algebra::{
IndexedZSet as DynIndexedZSet, IndexedZSetReader as DynIndexedZSetReader,
OrdIndexedZSet as DynOrdIndexedZSet, OrdZSet as DynOrdZSet,
VecIndexedZSet as DynVecIndexedZSet, VecZSet as DynVecZSet, ZSet as DynZSet,
ZSetReader as DynZSetReader,
},
trace::{
Batch as DynBatch, BatchReader as DynBatchReader,
BatchReaderWithSnapshot as DynBatchReaderWithSnapshot,
FallbackIndexedWSet as DynFallbackIndexedWSet, FallbackKeyBatch as DynFallbackKeyBatch,
FallbackValBatch as DynFallbackValBatch, FallbackWSet as DynFallbackWSet,
FileIndexedWSet as DynFileIndexedWSet, FileKeyBatch as DynFileKeyBatch,
FileValBatch as DynFileValBatch, FileWSet as DynFileWSet,
OrdIndexedWSet as DynOrdIndexedWSet, OrdKeyBatch as DynOrdKeyBatch,
OrdValBatch as DynOrdValBatch, OrdWSet as DynOrdWSet, Spine as DynSpine,
SpineSnapshot as DynSpineSnapshot, Trace as DynTrace, VecIndexedWSet as DynVecIndexedWSet,
VecKeyBatch as DynVecKeyBatch, VecValBatch as DynVecValBatch, VecWSet as DynVecWSet,
merge_batches as dyn_merge_batches, merge_batches_by_reference,
},
};
use dyn_clone::clone_box;
use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize};
use size_of::SizeOf;
use std::{
marker::PhantomData,
ops::{Deref, DerefMut, Neg},
sync::Arc,
};
use crate::{
NumEntries,
algebra::{AddAssignByRef, AddByRef, HasZero, NegByRef},
dynamic::DowncastTrait,
utils::Tup2,
};
/// Statically typed view over a dynamically typed (type-erased) batch reader.
///
/// Each concrete type (`Key`, `Val`, `R`) is paired with its type-erased
/// counterpart (`DynK`, `DynV`, `DynR`) through an [`Erase`] bound, so values
/// can cross between the typed and dynamic worlds without copying.
pub trait BatchReader: 'static {
    /// The underlying dynamically typed reader.
    type Inner: DynBatchReader<Time = Self::Time, Key = Self::DynK, Val = Self::DynV, R = Self::DynR>;
    /// Dynamic batch type produced when this reader's contents are materialized.
    type IntoBatch: DynBatch<Time = Self::Time, Key = Self::DynK, Val = Self::DynV, R = Self::DynR>;
    /// Concrete key type; erases to [`Self::DynK`].
    type Key: DBData + Erase<Self::DynK>;
    /// Concrete value type; erases to [`Self::DynV`].
    type Val: DBData + Erase<Self::DynV>;
    /// Concrete weight type; erases to [`Self::DynR`].
    type R: DBWeight + Erase<Self::DynR>;
    /// Type-erased key type.
    type DynK: DataTrait + ?Sized;
    /// Type-erased value type.
    type DynV: DataTrait + ?Sized;
    /// Type-erased weight type.
    type DynR: WeightTrait + ?Sized;
    /// Timestamp type of the reader.
    type Time: Timestamp;
    /// Builds the factory object the dynamic reader needs, derived from the
    /// concrete key/value/weight types.
    fn factories() -> <Self::Inner as DynBatchReader>::Factories {
        BatchReaderFactories::new::<Self::Key, Self::Val, Self::R>()
    }
    /// Borrows the inner dynamic reader.
    fn inner(&self) -> &Self::Inner;
    /// Mutably borrows the inner dynamic reader.
    fn inner_mut(&mut self) -> &mut Self::Inner;
    /// Consumes `self`, returning the inner dynamic reader.
    fn into_inner(self) -> Self::Inner;
    /// Wraps a dynamic reader in the typed interface.
    fn from_inner(inner: Self::Inner) -> Self;
    /// Reinterprets a stream of typed readers as a stream of dynamic readers.
    fn stream_inner<C: Clone>(stream: &Stream<C, Self>) -> Stream<C, Self::Inner>
    where
        Self: Sized;
    /// Reinterprets a stream of dynamic readers as a stream of typed readers.
    fn stream_from_inner<C: Clone>(stream: &Stream<C, Self::Inner>) -> Stream<C, Self>
    where
        Self: Sized;
    /// Consumes `self`, returning its contents as dynamic batches.
    fn into_dyn_batches(self) -> Vec<Arc<Self::IntoBatch>>;
    /// Consumes `self`, returning its contents as typed batches.
    fn into_batches(self) -> Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>>;
    /// Returns the current contents as dynamic batches without consuming `self`.
    fn dyn_batches(&self) -> Vec<Arc<Self::IntoBatch>>;
    /// Returns the current contents as typed batches without consuming `self`.
    fn batches(&self) -> Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>>;
    /// Returns a read-only snapshot of the current contents.
    fn dyn_snapshot(&self) -> DynSpineSnapshot<Self::IntoBatch>;
    /// Consumes `self`, returning a read-only snapshot of its contents.
    fn into_dyn_snapshot(self) -> DynSpineSnapshot<Self::IntoBatch>;
}
/// Statically typed view over a dynamically typed batch.
pub trait Batch: BatchReader<Inner = Self::InnerBatch> + Clone {
    /// The underlying dynamically typed batch.
    type InnerBatch: DynBatch<Time = Self::Time, Key = Self::DynK, Val = Self::DynV, R = Self::DynR>;
    /// Returns a batch containing only the tuples whose key/value pair
    /// satisfies `predicate`.
    fn filter<F>(&self, predicate: F) -> Self
    where
        F: Fn(&Self::Key, &Self::Val) -> bool,
        Self::Time: PartialEq<()> + From<()>,
    {
        Self::from_inner(
            self.inner()
                // SAFETY: `Self::Key`/`Self::Val` erase to the batch's dynamic
                // key/value types, so downcasting back to them is sound.
                .filter(&|k, v| unsafe { predicate(k.downcast(), v.downcast()) }),
        )
    }
}
// Blanket impl: any clonable `BatchReader` whose inner type is a dynamic
// batch is itself a typed `Batch`.
impl<B> Batch for B
where
    B: BatchReader + Clone,
    B::Inner: DynBatch,
{
    type InnerBatch = B::Inner;
}
/// Statically typed view over a dynamically typed trace.
pub trait Trace: BatchReader<Inner = Self::InnerTrace> {
    /// The underlying dynamically typed trace.
    type InnerTrace: DynTrace<Time = Self::Time, Key = Self::DynK, Val = Self::DynV, R = Self::DynR>;
}
// Blanket impl: any `BatchReader` whose inner type is a dynamic trace is
// itself a typed `Trace`.
impl<T> Trace for T
where
    T: BatchReader,
    T::Inner: DynTrace<Time = T::Time, Key = T::DynK, Val = T::DynV, R = T::DynR>,
{
    type InnerTrace = <T as BatchReader>::Inner;
}
/// A batch reader with unit timestamps and `ZWeight` weights, i.e., an
/// indexed Z-set reader.
pub trait IndexedZSetReader: BatchReader<R = ZWeight, DynR = DynZWeight, Time = ()> {
    /// Iterates over `(key, value, weight)` triples, cloning each key and
    /// value out of its type-erased representation.
    fn iter(&self) -> impl Iterator<Item = (Self::Key, Self::Val, ZWeight)> + '_ {
        self.inner().iter().map(|(k, v, w)| {
            // SAFETY: `Self::Key`/`Self::Val` erase to the reader's dynamic
            // key/value types, so downcasting back to them is sound.
            let key = unsafe { k.as_ref().downcast::<Self::Key>() }.clone();
            let val = unsafe { v.as_ref().downcast::<Self::Val>() }.clone();
            (key, val, w)
        })
    }
}
// Blanket impl: any reader with unit time and `ZWeight` weights qualifies.
impl<Z> IndexedZSetReader for Z where Z: BatchReader<R = ZWeight, DynR = DynZWeight, Time = ()> {}
/// A batch with unit timestamps and `ZWeight` weights, i.e., an indexed Z-set.
pub trait IndexedZSet:
    Batch<R = ZWeight, DynR = DynZWeight, Time = (), InnerBatch = Self::InnerIndexedZSet>
{
    /// The underlying dynamically typed indexed Z-set.
    type InnerIndexedZSet: DynIndexedZSet<Time = Self::Time, Key = Self::DynK, Val = Self::DynV, R = Self::DynR>;
}
// Blanket impl: any qualifying `Batch` whose inner type is a dynamic indexed
// Z-set qualifies.
impl<Z> IndexedZSet for Z
where
    Z: Batch<R = ZWeight, DynR = DynZWeight, Time = ()>,
    Z::InnerBatch: DynIndexedZSet,
{
    type InnerIndexedZSet = Z::InnerBatch;
}
/// An indexed Z-set reader with unit values, i.e., a plain Z-set reader.
pub trait ZSetReader: IndexedZSetReader<Val = ()> {}
// Blanket impl: any indexed Z-set reader with unit values qualifies.
impl<Z> ZSetReader for Z where Z: IndexedZSetReader<Val = ()> {}
/// An indexed Z-set with unit values, i.e., a plain Z-set.
pub trait ZSet: IndexedZSet<Val = (), DynV = DynUnit> {
    /// Returns the sum of weights of all tuples in the Z-set.
    fn weighted_count(&self) -> ZWeight;
}
// Blanket impl: any indexed Z-set with unit values is a Z-set.
impl<Z> ZSet for Z
where
    Z: IndexedZSet<Val = (), DynV = DynUnit>,
{
    /// Sums the weights of all tuples by letting the dynamic layer write the
    /// total into a type-erased view of a local accumulator.
    fn weighted_count(&self) -> ZWeight {
        let mut total: ZWeight = 0;
        self.inner().weighted_count(total.erase_mut());
        total
    }
}
/// Zero-cost typed wrapper around a dynamically typed batch `B`.
///
/// `#[repr(transparent)]` guarantees the same layout as `B`, which the
/// `transmute`-based conversions elsewhere in this file rely on.
#[derive(Debug, Clone, Eq, SizeOf)]
#[repr(transparent)]
pub struct TypedBatch<K, V, R, B> {
    inner: B,
    // Marker tying the wrapper to its concrete key/value/weight types without
    // storing them; `fn(&K, &V, &R)` keeps the type covariant-free and Send/Sync.
    phantom: PhantomData<fn(&K, &V, &R)>,
}
/// Typed batches compare equal exactly when their inner dynamic batches do,
/// even across different inner batch types `B`/`B2`.
impl<K, V, R, B, B2> PartialEq<TypedBatch<K, V, R, B2>> for TypedBatch<K, V, R, B>
where
    B: PartialEq<B2>,
{
    fn eq(&self, other: &TypedBatch<K, V, R, B2>) -> bool {
        self.inner == other.inner
    }
}
/// The default typed batch is an empty dynamic batch built from factories
/// derived from the concrete `K`/`V`/`R` types.
impl<K, V, R, B> Default for TypedBatch<K, V, R, B>
where
    B: DynBatch,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    fn default() -> Self {
        Self::new(B::dyn_empty(&BatchReaderFactories::new::<K, V, R>()))
    }
}
/// Transparent access to the inner dynamic batch.
impl<K, V, R, B> Deref for TypedBatch<K, V, R, B> {
    type Target = B;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// The additive zero of a (unit-timestamp) typed batch is the empty batch.
impl<K, V, R, B> HasZero for TypedBatch<K, V, R, B>
where
    B: DynBatch<Time = ()>,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    fn zero() -> Self {
        Self::new(B::dyn_empty(&BatchReaderFactories::new::<K, V, R>()))
    }
    fn is_zero(&self) -> bool {
        // Delegates to the inner batch via `Deref`.
        self.is_empty()
    }
}
/// Negation delegates to the inner dynamic batch (negates all weights).
impl<K, V, R, B> Neg for TypedBatch<K, V, R, B>
where
    B: DynBatchReader + Neg<Output = B>,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    type Output = Self;
    fn neg(self) -> Self::Output {
        Self::new(self.inner.neg())
    }
}
/// By-reference negation delegates to the inner dynamic batch.
impl<K, V, R, B> NegByRef for TypedBatch<K, V, R, B>
where
    B: DynBatchReaderWithSnapshot + NegByRef,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    fn neg_by_ref(&self) -> Self {
        Self::new(self.inner().neg_by_ref())
    }
}
/// By-reference addition delegates to the inner dynamic batches.
impl<K, V, R, B> AddByRef for TypedBatch<K, V, R, B>
where
    B: DynBatchReaderWithSnapshot + AddByRef,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    fn add_by_ref(&self, other: &Self) -> Self {
        Self::new(self.inner.add_by_ref(other.inner()))
    }
}
/// In-place by-reference addition delegates to the inner dynamic batches.
impl<K, V, R, B> AddAssignByRef for TypedBatch<K, V, R, B>
where
    B: DynBatchReaderWithSnapshot + AddAssignByRef,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    fn add_assign_by_ref(&mut self, other: &Self) {
        self.inner.add_assign_by_ref(other.inner())
    }
}
/// Entry counting delegates to the inner dynamic reader.
impl<K, V, R, B> NumEntries for TypedBatch<K, V, R, B>
where
    B: DynBatchReader,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    const CONST_NUM_ENTRIES: Option<usize> = B::CONST_NUM_ENTRIES;
    fn num_entries_shallow(&self) -> usize {
        self.inner.num_entries_shallow()
    }
    fn num_entries_deep(&self) -> usize {
        self.inner.num_entries_deep()
    }
}
/// Transparent mutable access to the inner dynamic batch.
impl<K, V, R, B> DerefMut for TypedBatch<K, V, R, B> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
impl<K, V, R, B> TypedBatch<K, V, R, B> {
    /// Wraps a dynamic batch in the typed interface without any conversion.
    pub fn new(inner: B) -> Self {
        Self {
            inner,
            phantom: PhantomData,
        }
    }
}
impl<K, V, R, B> TypedBatch<K, V, R, B>
where
    B: DynBatch,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    /// Creates an empty batch.
    pub fn empty() -> Self {
        let factories = BatchReaderFactories::new::<K, V, R>();
        Self::new(B::dyn_empty(&factories))
    }
    /// Builds a batch at timestamp `time` from `((key, value), weight)`
    /// tuples, handing them to the dynamic layer in type-erased form.
    pub fn from_tuples(time: B::Time, tuples: Vec<Tup2<Tup2<K, V>, R>>) -> Self {
        let factories = BatchReaderFactories::new::<K, V, R>();
        let mut erased = Box::new(LeanVec::from(tuples)).erase_box();
        Self::new(B::dyn_from_tuples(&factories, time, &mut erased))
    }
    /// Merges `self` with `other` into a new batch.
    pub fn merge(&self, other: &Self) -> Self {
        let merged = merge_batches_by_reference(
            &self.inner.factories(),
            [&self.inner, &other.inner],
            &None,
            &None,
        );
        Self::new(merged)
    }
    /// Merges any number of batches into one.
    pub fn merge_batches<I>(batches: I) -> Self
    where
        I: IntoIterator<Item = Self>,
    {
        let inners = batches.into_iter().map(|b| b.into_inner());
        Self::new(dyn_merge_batches(&Self::factories(), inners, &None, &None))
    }
}
impl<K, V, R, B> TypedBatch<K, V, R, B> {
    /// Consumes the wrapper, returning the inner dynamic batch.
    pub fn into_inner(self) -> B {
        self.inner
    }
}
impl<K, R, B> TypedBatch<K, (), R, B>
where
    B: DynBatch,
    K: DBData + Erase<B::Key>,
    (): Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    /// Builds a batch from `(key, weight)` tuples by pairing each key with
    /// the unit value and delegating to `from_tuples`.
    pub fn from_keys(time: B::Time, tuples: Vec<Tup2<K, R>>) -> Self {
        let mut keyed = Vec::with_capacity(tuples.len());
        for Tup2(key, weight) in tuples {
            keyed.push(Tup2(Tup2(key, ()), weight));
        }
        Self::from_tuples(time, keyed)
    }
}
/// The typed `BatchReader` facade over any dynamic reader that supports
/// snapshots.
impl<K, V, R, B> BatchReader for TypedBatch<K, V, R, B>
where
    B: DynBatchReaderWithSnapshot,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    type Inner = B;
    type IntoBatch = B::Batch;
    type Key = K;
    type Val = V;
    type R = R;
    type Time = B::Time;
    type DynK = B::Key;
    type DynV = B::Val;
    type DynR = B::R;
    fn inner(&self) -> &Self::Inner {
        &self.inner
    }
    fn inner_mut(&mut self) -> &mut Self::Inner {
        &mut self.inner
    }
    fn into_inner(self) -> Self::Inner {
        self.inner
    }
    fn from_inner(inner: Self::Inner) -> Self {
        Self {
            inner,
            phantom: PhantomData,
        }
    }
    fn stream_inner<C: Clone>(stream: &Stream<C, Self>) -> Stream<C, B> {
        // SAFETY: `TypedBatch` is `#[repr(transparent)]` over `B`, so a stream
        // of one can be reinterpreted as a stream of the other.
        unsafe { stream.transmute_payload() }
    }
    fn stream_from_inner<C: Clone>(stream: &Stream<C, Self::Inner>) -> Stream<C, Self> {
        // SAFETY: same `#[repr(transparent)]` layout argument as above,
        // in the opposite direction.
        unsafe { stream.transmute_payload() }
    }
    fn into_dyn_batches(self) -> Vec<Arc<Self::IntoBatch>> {
        self.inner.into_ro_snapshot().into_batches()
    }
    fn into_dyn_snapshot(self) -> DynSpineSnapshot<Self::IntoBatch> {
        self.inner.into_ro_snapshot()
    }
    fn into_batches(self) -> Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>> {
        // SAFETY: `TypedBatch<_, _, _, B2>` is `#[repr(transparent)]` over
        // `B2`, so `Arc<B2>` and `Arc<TypedBatch<..., B2>>` have identical
        // layout, as do the containing `Vec`s.
        unsafe {
            std::mem::transmute::<
                Vec<Arc<Self::IntoBatch>>,
                Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>>,
            >(self.into_dyn_batches())
        }
    }
    fn dyn_batches(&self) -> Vec<Arc<Self::IntoBatch>> {
        self.inner.ro_snapshot().into_batches()
    }
    fn batches(&self) -> Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>> {
        // SAFETY: same `#[repr(transparent)]` layout argument as in
        // `into_batches`.
        unsafe {
            std::mem::transmute::<
                Vec<Arc<Self::IntoBatch>>,
                Vec<Arc<TypedBatch<Self::Key, Self::Val, Self::R, Self::IntoBatch>>>,
            >(self.dyn_batches())
        }
    }
    fn dyn_snapshot(&self) -> DynSpineSnapshot<Self::IntoBatch> {
        self.inner.ro_snapshot()
    }
}
/// Checkpointing delegates to the inner dynamic batch; the typed wrapper
/// itself carries no state to persist.
impl<K, V, R, B> Checkpoint for TypedBatch<K, V, R, B>
where
    B: Checkpoint,
    K: DBData,
    V: DBData,
    R: DBWeight,
{
    fn checkpoint(&self) -> Result<Vec<u8>, Error> {
        self.inner.checkpoint()
    }
    fn restore(&mut self, data: &[u8]) -> Result<(), Error> {
        self.inner.restore(data)
    }
}
// Typed aliases over the `Ord*` (in-memory, ordered) dynamic batch family.
pub type OrdWSet<K, R, DynR> = TypedBatch<K, (), R, DynOrdWSet<DynData, DynR>>;
pub type OrdZSet<K> = TypedBatch<K, (), ZWeight, DynOrdZSet<DynData>>;
pub type OrdIndexedWSet<K, V, R, DynR> =
    TypedBatch<K, V, R, DynOrdIndexedWSet<DynData, DynData, DynR>>;
pub type OrdIndexedZSet<K, V> = TypedBatch<K, V, ZWeight, DynOrdIndexedZSet<DynData, DynData>>;
pub type OrdKeyBatch<K, T, R, DynR> = TypedBatch<K, (), R, DynOrdKeyBatch<DynData, T, DynR>>;
pub type OrdValBatch<K, V, T, R, DynR> =
    TypedBatch<K, V, R, DynOrdValBatch<DynData, DynData, T, DynR>>;
// Typed aliases over the `Vec*` (vector-backed) dynamic batch family.
pub type VecWSet<K, R, DynR> = TypedBatch<K, (), R, DynVecWSet<DynData, DynR>>;
pub type VecZSet<K> = TypedBatch<K, (), ZWeight, DynVecZSet<DynData>>;
pub type VecIndexedWSet<K, V, R, DynR> =
    TypedBatch<K, V, R, DynVecIndexedWSet<DynData, DynData, DynR>>;
pub type VecIndexedZSet<K, V> = TypedBatch<K, V, ZWeight, DynVecIndexedZSet<DynData, DynData>>;
pub type VecKeyBatch<K, T, R, DynR> = TypedBatch<K, (), R, DynVecKeyBatch<DynData, T, DynR>>;
pub type VecValBatch<K, V, T, R, DynR> =
    TypedBatch<K, V, R, DynVecValBatch<DynData, DynData, T, DynR>>;
// Typed aliases over the `File*` (file-backed) dynamic batch family.
pub type FileWSet<K, R, DynR> = TypedBatch<K, (), R, DynFileWSet<DynData, DynR>>;
pub type FileZSet<K> = TypedBatch<K, (), ZWeight, DynFileWSet<DynData, DynZWeight>>;
pub type FileIndexedWSet<K, V, R, DynR> =
    TypedBatch<K, V, R, DynFileIndexedWSet<DynData, DynData, DynR>>;
pub type FileIndexedZSet<K, V> =
    TypedBatch<K, V, ZWeight, DynFileIndexedWSet<DynData, DynData, DynZWeight>>;
pub type FileKeyBatch<K, T, R, DynR> = TypedBatch<K, (), R, DynFileKeyBatch<DynData, T, DynR>>;
pub type FileValBatch<K, V, T, R, DynR> =
    TypedBatch<K, V, R, DynFileValBatch<DynData, DynData, T, DynR>>;
// Typed aliases over the `Fallback*` dynamic batch family.
pub type FallbackWSet<K, R, DynR> = TypedBatch<K, (), R, DynFallbackWSet<DynData, DynR>>;
pub type FallbackZSet<K> = TypedBatch<K, (), ZWeight, DynFallbackWSet<DynData, DynZWeight>>;
pub type FallbackIndexedWSet<K, V, R, DynR> =
    TypedBatch<K, V, R, DynFallbackIndexedWSet<DynData, DynData, DynR>>;
pub type FallbackIndexedZSet<K, V> =
    TypedBatch<K, V, ZWeight, DynFallbackIndexedWSet<DynData, DynData, DynZWeight>>;
pub type FallbackKeyBatch<K, T, R, DynR> =
    TypedBatch<K, (), R, DynFallbackKeyBatch<DynData, T, DynR>>;
pub type FallbackValBatch<K, V, T, R, DynR> =
    TypedBatch<K, V, R, DynFallbackValBatch<DynData, DynData, T, DynR>>;
// Typed spine (and spine snapshot) over the inner dynamic reader of any
// typed batch reader `B`.
pub type Spine<B> = TypedBatch<
    <B as BatchReader>::Key,
    <B as BatchReader>::Val,
    <B as BatchReader>::R,
    DynSpine<<B as BatchReader>::Inner>,
>;
pub type SpineSnapshot<B> = TypedBatch<
    <B as BatchReader>::Key,
    <B as BatchReader>::Val,
    <B as BatchReader>::R,
    DynSpineSnapshot<<B as BatchReader>::Inner>,
>;
impl<K, V, R, B> TypedBatch<K, V, R, DynSpineSnapshot<B>>
where
    B: DynBatch,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    /// Concatenates multiple spine snapshots into a single snapshot.
    pub fn concat<'a, I>(snapshots: I) -> TypedBatch<K, V, R, DynSpineSnapshot<B>>
    where
        I: IntoIterator<Item = &'a Self>,
    {
        TypedBatch::new(DynSpineSnapshot::concat(
            BatchReaderFactories::new::<K, V, R>(),
            snapshots.into_iter().map(|snapshot| &snapshot.inner),
        ))
    }
    /// Consolidates the snapshot's contents into a single batch.
    pub fn consolidate(&self) -> TypedBatch<K, V, R, B> {
        TypedBatch::new(self.inner.consolidate())
    }
}
impl<K, V, R, B> TypedBatch<K, V, R, B>
where
    B: DynTrace,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    /// Consolidates a typed trace into a single batch, producing an empty
    /// batch when the trace has nothing to consolidate.
    pub fn consolidate(self) -> TypedBatch<K, V, R, B::Batch> {
        TypedBatch::new(
            self.inner
                .consolidate()
                .unwrap_or_else(|| B::Batch::dyn_empty(&BatchReaderFactories::new::<K, V, R>())),
        )
    }
}
impl<K, V, R, B> TypedBatch<K, V, R, DynSpine<B>>
where
    B: DynBatch,
    K: DBData + Erase<B::Key>,
    V: DBData + Erase<B::Val>,
    R: DBWeight + Erase<B::R>,
{
    /// Takes a typed read-only snapshot of the spine's current contents.
    pub fn ro_snapshot(&self) -> TypedBatch<K, V, R, DynSpineSnapshot<B>> {
        TypedBatch::new(self.inner.ro_snapshot())
    }
}
impl<C: Clone, B: BatchReader> Stream<C, B> {
    /// Reinterprets a stream of typed readers as a stream of their inner
    /// dynamic readers.
    pub fn inner(&self) -> Stream<C, B::Inner> {
        BatchReader::stream_inner(self)
    }
}
impl<C: Clone, B: DynBatchReader> Stream<C, B> {
    /// Reinterprets a stream of dynamic readers as a stream of typed readers.
    pub fn typed<TB>(&self) -> Stream<C, TB>
    where
        TB: BatchReader<Inner = B>,
    {
        // SAFETY: `TB::Inner = B`; typed wrappers in this module are
        // `#[repr(transparent)]` over their inner type, so the payload layout
        // is identical.
        unsafe { self.transmute_payload() }
    }
}
/// A boxed type-erased value `D` tagged with its concrete type `T`.
///
/// `#[repr(transparent)]` guarantees the same layout as `Box<D>`, which the
/// stream `transmute_payload` conversions below rely on.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, SizeOf)]
#[repr(transparent)]
pub struct TypedBox<T, D: ?Sized> {
    inner: Box<D>,
    // Marker carrying the concrete type without storing a `T`.
    phantom: PhantomData<fn(&T)>,
}
/// rkyv-archived form of [`TypedBox`]: just the archived concrete value.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ArchivedTypedBox<T: Archive>(<T as Archive>::Archived)
where
    <T as Archive>::Archived: PartialEq + Eq + PartialOrd + Ord;
/// Archives a `TypedBox` by archiving the concrete value it holds.
impl<T, D> Archive for TypedBox<T, D>
where
    T: DBData + Erase<D>,
    D: DataTrait + ?Sized,
{
    type Archived = ArchivedTypedBox<T>;
    type Resolver = <T as Archive>::Resolver;
    unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) {
        unsafe {
            // Downcast (via `Deref`) to the concrete value and resolve it into
            // the archived wrapper's single field.
            let val: &T = self.deref();
            val.resolve(pos, resolver, &mut (*out).0 as *mut T::Archived);
        }
    }
}
/// Serializes a `TypedBox` by serializing the concrete value it holds.
impl<T, D> Serialize<DbspSerializer<'_>> for TypedBox<T, D>
where
    T: DBData + Erase<D>,
    D: DataTrait + ?Sized,
{
    fn serialize(
        &self,
        serializer: &mut DbspSerializer,
    ) -> Result<Self::Resolver, <DbspSerializer<'_> as Fallible>::Error> {
        // Downcast (via `Deref`) to the concrete value and delegate.
        let val: &T = self.deref();
        val.serialize(serializer)
    }
}
/// Deserializes the archived concrete value and re-boxes it as a `TypedBox`.
impl<T, D> Deserialize<TypedBox<T, D>, Deserializer> for Archived<TypedBox<T, D>>
where
    D: DataTrait + ?Sized,
    T: DBData + Erase<D>,
{
    fn deserialize(
        &self,
        deserializer: &mut Deserializer,
    ) -> Result<TypedBox<T, D>, <Deserializer as Fallible>::Error> {
        let val: T = self.0.deserialize(deserializer)?;
        Ok(TypedBox::new(val))
    }
}
#[cfg(test)]
#[test]
// Round-trips a `TypedBox<u64, DynData>` through rkyv serialization and
// deserialization and checks the result equals the original.
fn test_typedbox_rkyv() {
    use rkyv::archived_value;
    use crate::storage::file::SerializerInner;
    let tbox = TypedBox::<u64, DynData>::new(12345u64);
    let bytes = SerializerInner::to_fbuf_with_thread_local(|s| {
        rkyv::ser::Serializer::serialize_value(s, &tbox)
    })
    .into_vec();
    // SAFETY: `bytes` was just produced by serializing a value of exactly
    // this type at position 0.
    let archived: &<TypedBox<u64, DynData> as Archive>::Archived =
        unsafe { archived_value::<TypedBox<u64, DynData>>(bytes.as_slice(), 0) };
    let tbox2 = archived.deserialize(&mut Deserializer::default()).unwrap();
    assert_eq!(tbox, tbox2);
}
/// Dereferences to the concrete value stored inside the type-erased box.
impl<T, D> Deref for TypedBox<T, D>
where
    D: DataTrait + ?Sized,
    T: DBData + Erase<D>,
{
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: `T: Erase<D>` guarantees the boxed value was erased from a
        // `T`, so downcasting back is sound.
        unsafe { self.inner.downcast() }
    }
}
/// Mutably dereferences to the concrete value stored inside the box.
impl<T, D> DerefMut for TypedBox<T, D>
where
    D: DataTrait + ?Sized,
    T: DBData + Erase<D>,
{
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: same `Erase` argument as in the `Deref` impl.
        unsafe { self.inner.downcast_mut() }
    }
}
/// A `TypedBox` always counts as a single entry.
impl<T, D: ?Sized> NumEntries for TypedBox<T, D> {
    // Not a compile-time constant here: `None` means callers must call the
    // methods below.
    const CONST_NUM_ENTRIES: Option<usize> = None;
    fn num_entries_shallow(&self) -> usize {
        1
    }
    fn num_entries_deep(&self) -> usize {
        1
    }
}
impl<T, D: DataTrait + ?Sized> TypedBox<T, D> {
    /// Boxes a concrete value and erases it to the dynamic type `D`.
    pub fn new(v: T) -> Self
    where
        T: DBData + Erase<D>,
    {
        Self {
            inner: Box::new(v).erase_box(),
            phantom: PhantomData,
        }
    }
    /// Borrows the type-erased value.
    pub fn inner(&self) -> &D {
        self.inner.as_ref()
    }
    /// Consumes the wrapper, returning the type-erased box.
    pub fn into_inner(self) -> Box<D> {
        self.inner
    }
}
/// Clones the boxed dynamic value via `dyn_clone` (no `T: Clone` bound
/// needed; the dynamic trait provides cloning).
impl<T, D> Clone for TypedBox<T, D>
where
    D: DataTrait + ?Sized,
{
    fn clone(&self) -> Self {
        Self {
            inner: clone_box(self.inner.as_ref()),
            phantom: PhantomData,
        }
    }
}
impl<C: Clone, D: DataTrait + ?Sized> Stream<C, Box<D>> {
    /// Reinterprets a stream of type-erased boxes as a stream of typed boxes.
    ///
    /// # Safety
    ///
    /// The caller must guarantee every value in the stream was erased from a
    /// `T`; `TypedBox` is `#[repr(transparent)]` over `Box<D>`, so the layout
    /// itself is compatible.
    pub unsafe fn typed_data<T>(&self) -> Stream<C, TypedBox<T, D>>
    where
        T: DBData + Erase<D>,
    {
        unsafe { self.transmute_payload() }
    }
}
impl<C: Circuit, T, D: ?Sized> Stream<C, TypedBox<T, D>> {
    /// Reinterprets a stream of typed boxes as a stream of type-erased boxes.
    pub fn inner_data(&self) -> Stream<C, Box<D>> {
        // SAFETY: `TypedBox` is `#[repr(transparent)]` over `Box<D>`.
        unsafe { self.transmute_payload() }
    }
}
impl<C: Circuit, T: DBData, D: DataTrait + ?Sized> Stream<C, TypedBox<T, D>> {
    /// Maps each typed box to a clone of its concrete value.
    pub fn inner_typed(&self) -> Stream<C, T> {
        // SAFETY: a `TypedBox<T, D>` always holds a value erased from `T`.
        self.apply(|typed_box| unsafe { typed_box.inner().downcast::<T>().clone() })
    }
}
impl<C: Circuit, T: DBData> Stream<C, T> {
    /// Maps each concrete value to a freshly allocated typed box (clones the
    /// value into the box).
    pub fn typed_box<D>(&self) -> Stream<C, TypedBox<T, D>>
    where
        D: DataTrait + ?Sized,
        T: Erase<D>,
    {
        self.apply(|x| TypedBox::new(x.clone()))
    }
}