tc_state/
block.rs

1use std::io;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use destream::en;
6use futures::TryStreamExt;
7use get_size::GetSize;
8use get_size_derive::*;
9use safecast::as_type;
10#[cfg(feature = "tensor")]
11use safecast::AsType;
12use tokio::fs;
13use tokio_util::io::StreamReader;
14
15#[cfg(feature = "chain")]
16use tc_chain::ChainBlock;
17#[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
18use tc_collection::btree;
19#[cfg(feature = "tensor")]
20use tc_collection::tensor;
21use tc_scalar::Scalar;
22use tc_value::Link;
23use tcgeneric::Map;
24
25/// A block of a [`tensor::Dense`] tensor
26#[cfg(feature = "tensor")]
27#[derive(Clone, GetSize)]
28pub enum DenseBuffer {
29    F32(tensor::Buffer<f32>),
30    F64(tensor::Buffer<f64>),
31    I16(tensor::Buffer<i16>),
32    I32(tensor::Buffer<i32>),
33    I64(tensor::Buffer<i64>),
34    U8(tensor::Buffer<u8>),
35    U16(tensor::Buffer<u16>),
36    U32(tensor::Buffer<u32>),
37    U64(tensor::Buffer<u64>),
38}
39
40#[cfg(feature = "tensor")]
41as_type!(DenseBuffer, F32, tensor::Buffer<f32>);
42#[cfg(feature = "tensor")]
43as_type!(DenseBuffer, F64, tensor::Buffer<f64>);
44#[cfg(feature = "tensor")]
45as_type!(DenseBuffer, I16, tensor::Buffer<i16>);
46#[cfg(feature = "tensor")]
47as_type!(DenseBuffer, I32, tensor::Buffer<i32>);
48#[cfg(feature = "tensor")]
49as_type!(DenseBuffer, I64, tensor::Buffer<i64>);
50#[cfg(feature = "tensor")]
51as_type!(DenseBuffer, U8, tensor::Buffer<u8>);
52#[cfg(feature = "tensor")]
53as_type!(DenseBuffer, U16, tensor::Buffer<u16>);
54#[cfg(feature = "tensor")]
55as_type!(DenseBuffer, U32, tensor::Buffer<u32>);
56#[cfg(feature = "tensor")]
57as_type!(DenseBuffer, U64, tensor::Buffer<u64>);
58
59#[cfg(feature = "tensor")]
60impl<'en> en::ToStream<'en> for DenseBuffer {
61    fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
62        match self {
63            Self::F32(this) => this.to_stream(encoder),
64            Self::F64(this) => this.to_stream(encoder),
65            Self::I16(this) => this.to_stream(encoder),
66            Self::I32(this) => this.to_stream(encoder),
67            Self::I64(this) => this.to_stream(encoder),
68            Self::U8(this) => this.to_stream(encoder),
69            Self::U16(this) => this.to_stream(encoder),
70            Self::U32(this) => this.to_stream(encoder),
71            Self::U64(this) => this.to_stream(encoder),
72        }
73    }
74}
75
76/// A cached filesystem block.
77#[derive(Clone, GetSize)]
78pub enum CacheBlock {
79    #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
80    BTree(btree::Node),
81    #[cfg(feature = "chain")]
82    Chain(ChainBlock),
83    Class((Link, Map<Scalar>)),
84    Library(Map<Scalar>),
85    #[cfg(feature = "tensor")]
86    Sparse(tensor::Node),
87    #[cfg(feature = "tensor")]
88    Dense(DenseBuffer),
89}
90
91#[async_trait]
92impl<'en> tc_transact::fs::FileSave<'en> for CacheBlock {
93    async fn save(&'en self, file: &mut fs::File) -> Result<u64, io::Error> {
94        match self {
95            #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
96            Self::BTree(node) => persist(node, file).await,
97            #[cfg(feature = "chain")]
98            Self::Chain(block) => persist(block, file).await,
99            Self::Class(class) => persist(class, file).await,
100            Self::Library(library) => persist(library, file).await,
101            #[cfg(feature = "tensor")]
102            Self::Dense(dense) => persist(dense, file).await,
103            #[cfg(feature = "tensor")]
104            Self::Sparse(sparse) => persist(sparse, file).await,
105        }
106    }
107}
108
109#[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
110as_type!(CacheBlock, BTree, btree::Node);
111#[cfg(feature = "chain")]
112as_type!(CacheBlock, Chain, ChainBlock);
113as_type!(CacheBlock, Class, (Link, Map<Scalar>));
114as_type!(CacheBlock, Library, Map<Scalar>);
115#[cfg(feature = "tensor")]
116as_type!(CacheBlock, Sparse, tensor::Node);
117
118#[cfg(feature = "tensor")]
119macro_rules! as_dense_type {
120    ($t:ty) => {
121        impl AsType<tensor::Buffer<$t>> for CacheBlock {
122            fn as_type(&self) -> Option<&tensor::Buffer<$t>> {
123                if let Self::Dense(block) = self {
124                    block.as_type()
125                } else {
126                    None
127                }
128            }
129
130            fn as_type_mut(&mut self) -> Option<&mut tensor::Buffer<$t>> {
131                if let Self::Dense(block) = self {
132                    block.as_type_mut()
133                } else {
134                    None
135                }
136            }
137
138            fn into_type(self) -> Option<tensor::Buffer<$t>> {
139                if let Self::Dense(block) = self {
140                    block.into_type()
141                } else {
142                    None
143                }
144            }
145        }
146
147        impl From<tensor::Buffer<$t>> for CacheBlock {
148            fn from(buffer: tensor::Buffer<$t>) -> Self {
149                Self::Dense(buffer.into())
150            }
151        }
152    };
153}
154
155#[cfg(feature = "tensor")]
156as_dense_type!(f32);
157#[cfg(feature = "tensor")]
158as_dense_type!(f64);
159#[cfg(feature = "tensor")]
160as_dense_type!(i16);
161#[cfg(feature = "tensor")]
162as_dense_type!(i32);
163#[cfg(feature = "tensor")]
164as_dense_type!(i64);
165#[cfg(feature = "tensor")]
166as_dense_type!(u8);
167#[cfg(feature = "tensor")]
168as_dense_type!(u16);
169#[cfg(feature = "tensor")]
170as_dense_type!(u32);
171#[cfg(feature = "tensor")]
172as_dense_type!(u64);
173
174async fn persist<'en, T: en::ToStream<'en>>(
175    data: &'en T,
176    file: &mut fs::File,
177) -> Result<u64, io::Error> {
178    let encoded = tbon::en::encode(data)
179        .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))?;
180
181    let mut reader = StreamReader::new(
182        encoded
183            .map_ok(Bytes::from)
184            .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause)),
185    );
186
187    tokio::io::copy(&mut reader, file).await
188}