1use std::io;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use destream::en;
6use futures::TryStreamExt;
7use get_size::GetSize;
8use get_size_derive::*;
9use safecast::as_type;
10#[cfg(feature = "tensor")]
11use safecast::AsType;
12use tokio::fs;
13use tokio_util::io::StreamReader;
14
15#[cfg(feature = "chain")]
16use tc_chain::ChainBlock;
17#[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
18use tc_collection::btree;
19#[cfg(feature = "tensor")]
20use tc_collection::tensor;
21use tc_scalar::Scalar;
22use tc_value::Link;
23use tcgeneric::Map;
24
/// A `tensor::Buffer` of any one of the supported numeric element types.
///
/// Each variant wraps the buffer for exactly one element type, so that
/// buffers of different types can be stored behind a single type.
/// Only compiled when the `tensor` feature is enabled.
#[cfg(feature = "tensor")]
#[derive(Clone, GetSize)]
pub enum DenseBuffer {
    /// A buffer of `f32` elements.
    F32(tensor::Buffer<f32>),
    /// A buffer of `f64` elements.
    F64(tensor::Buffer<f64>),
    /// A buffer of `i16` elements.
    I16(tensor::Buffer<i16>),
    /// A buffer of `i32` elements.
    I32(tensor::Buffer<i32>),
    /// A buffer of `i64` elements.
    I64(tensor::Buffer<i64>),
    /// A buffer of `u8` elements.
    U8(tensor::Buffer<u8>),
    /// A buffer of `u16` elements.
    U16(tensor::Buffer<u16>),
    /// A buffer of `u32` elements.
    U32(tensor::Buffer<u32>),
    /// A buffer of `u64` elements.
    U64(tensor::Buffer<u64>),
}
39
// Wire up `safecast::as_type!` between `DenseBuffer` and each concrete
// `tensor::Buffer<T>` it can hold, one invocation per variant.

// Floating-point element types.
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, F32, tensor::Buffer<f32>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, F64, tensor::Buffer<f64>);

// Signed integer element types.
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, I16, tensor::Buffer<i16>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, I32, tensor::Buffer<i32>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, I64, tensor::Buffer<i64>);

// Unsigned integer element types.
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, U8, tensor::Buffer<u8>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, U16, tensor::Buffer<u16>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, U32, tensor::Buffer<u32>);
#[cfg(feature = "tensor")]
as_type!(DenseBuffer, U64, tensor::Buffer<u64>);
58
/// Stream-encoding for [`DenseBuffer`]: the enum itself adds nothing to the
/// encoded output, it only forwards to the wrapped buffer.
#[cfg(feature = "tensor")]
impl<'en> en::ToStream<'en> for DenseBuffer {
    /// Encode the wrapped buffer with `encoder`, whichever element type it holds.
    fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
        // Every arm is the same call; the match only selects the concrete buffer type.
        match self {
            Self::F32(buffer) => buffer.to_stream(encoder),
            Self::F64(buffer) => buffer.to_stream(encoder),
            Self::I16(buffer) => buffer.to_stream(encoder),
            Self::I32(buffer) => buffer.to_stream(encoder),
            Self::I64(buffer) => buffer.to_stream(encoder),
            Self::U8(buffer) => buffer.to_stream(encoder),
            Self::U16(buffer) => buffer.to_stream(encoder),
            Self::U32(buffer) => buffer.to_stream(encoder),
            Self::U64(buffer) => buffer.to_stream(encoder),
        }
    }
}
75
76#[derive(Clone, GetSize)]
78pub enum CacheBlock {
79 #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
80 BTree(btree::Node),
81 #[cfg(feature = "chain")]
82 Chain(ChainBlock),
83 Class((Link, Map<Scalar>)),
84 Library(Map<Scalar>),
85 #[cfg(feature = "tensor")]
86 Sparse(tensor::Node),
87 #[cfg(feature = "tensor")]
88 Dense(DenseBuffer),
89}
90
91#[async_trait]
92impl<'en> tc_transact::fs::FileSave<'en> for CacheBlock {
93 async fn save(&'en self, file: &mut fs::File) -> Result<u64, io::Error> {
94 match self {
95 #[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
96 Self::BTree(node) => persist(node, file).await,
97 #[cfg(feature = "chain")]
98 Self::Chain(block) => persist(block, file).await,
99 Self::Class(class) => persist(class, file).await,
100 Self::Library(library) => persist(library, file).await,
101 #[cfg(feature = "tensor")]
102 Self::Dense(dense) => persist(dense, file).await,
103 #[cfg(feature = "tensor")]
104 Self::Sparse(sparse) => persist(sparse, file).await,
105 }
106 }
107}
108
109#[cfg(any(feature = "btree", feature = "table", feature = "tensor"))]
110as_type!(CacheBlock, BTree, btree::Node);
111#[cfg(feature = "chain")]
112as_type!(CacheBlock, Chain, ChainBlock);
113as_type!(CacheBlock, Class, (Link, Map<Scalar>));
114as_type!(CacheBlock, Library, Map<Scalar>);
115#[cfg(feature = "tensor")]
116as_type!(CacheBlock, Sparse, tensor::Node);
117
/// Implement `AsType<tensor::Buffer<$t>>` and `From<tensor::Buffer<$t>>` for
/// `CacheBlock`, routed through the `Dense` variant.
///
/// The casts return `None` for every non-`Dense` block; for a `Dense` block
/// the result is whatever the inner `DenseBuffer` cast returns.
#[cfg(feature = "tensor")]
macro_rules! as_dense_type {
    ($t:ty) => {
        impl AsType<tensor::Buffer<$t>> for CacheBlock {
            fn as_type(&self) -> Option<&tensor::Buffer<$t>> {
                match self {
                    Self::Dense(dense) => dense.as_type(),
                    _ => None,
                }
            }

            fn as_type_mut(&mut self) -> Option<&mut tensor::Buffer<$t>> {
                match self {
                    Self::Dense(dense) => dense.as_type_mut(),
                    _ => None,
                }
            }

            fn into_type(self) -> Option<tensor::Buffer<$t>> {
                match self {
                    Self::Dense(dense) => dense.into_type(),
                    _ => None,
                }
            }
        }

        impl From<tensor::Buffer<$t>> for CacheBlock {
            fn from(typed: tensor::Buffer<$t>) -> Self {
                // Wrap the typed buffer in a `DenseBuffer` first, then in `Dense`.
                Self::Dense(typed.into())
            }
        }
    };
}
154
// Instantiate the `CacheBlock` <-> `tensor::Buffer<T>` conversions for every
// element type which has a `DenseBuffer` variant.

// Floating-point element types.
#[cfg(feature = "tensor")]
as_dense_type!(f32);
#[cfg(feature = "tensor")]
as_dense_type!(f64);

// Signed integer element types.
#[cfg(feature = "tensor")]
as_dense_type!(i16);
#[cfg(feature = "tensor")]
as_dense_type!(i32);
#[cfg(feature = "tensor")]
as_dense_type!(i64);

// Unsigned integer element types.
#[cfg(feature = "tensor")]
as_dense_type!(u8);
#[cfg(feature = "tensor")]
as_dense_type!(u16);
#[cfg(feature = "tensor")]
as_dense_type!(u32);
#[cfg(feature = "tensor")]
as_dense_type!(u64);
173
174async fn persist<'en, T: en::ToStream<'en>>(
175 data: &'en T,
176 file: &mut fs::File,
177) -> Result<u64, io::Error> {
178 let encoded = tbon::en::encode(data)
179 .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause))?;
180
181 let mut reader = StreamReader::new(
182 encoded
183 .map_ok(Bytes::from)
184 .map_err(|cause| io::Error::new(io::ErrorKind::InvalidData, cause)),
185 );
186
187 tokio::io::copy(&mut reader, file).await
188}