1pub mod select;
2
3use std::marker::PhantomData;
4
5use data_bucket::{Link, INNER_PAGE_SIZE};
6use derive_more::{Display, Error, From};
7use indexset::core::node::NodeLike;
8use indexset::core::pair::Pair;
9#[cfg(feature = "perf_measurements")]
10use performance_measurement_codegen::performance_measurement;
11use rkyv::api::high::HighDeserializer;
12use rkyv::rancor::Strategy;
13use rkyv::ser::allocator::ArenaHandle;
14use rkyv::ser::sharing::Share;
15use rkyv::ser::Serializer;
16use rkyv::util::AlignedVec;
17use rkyv::{Archive, Deserialize, Serialize};
18
19use crate::in_memory::{DataPages, RowWrapper, StorableRow};
20use crate::lock::LockMap;
21use crate::persistence::{InsertOperation, Operation};
22use crate::prelude::PrimaryKeyGeneratorState;
23use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey};
24use crate::{in_memory, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc};
25
26#[derive(Debug)]
27pub struct WorkTable<
28 Row,
29 PrimaryKey,
30 AvailableTypes = (),
31 SecondaryIndexes = (),
32 LockType = (),
33 PkGen = <PrimaryKey as TablePrimaryKey>::Generator,
34 PkNodeType = Vec<Pair<PrimaryKey, Link>>,
35 const DATA_LENGTH: usize = INNER_PAGE_SIZE,
36> where
37 PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash,
38 Row: StorableRow,
39 PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
40{
41 pub data: DataPages<Row, DATA_LENGTH>,
42
43 pub pk_map: IndexMap<PrimaryKey, Link, PkNodeType>,
44
45 pub indexes: SecondaryIndexes,
46
47 pub pk_gen: PkGen,
48
49 pub lock_map: LockMap<LockType, PrimaryKey>,
50
51 pub table_name: &'static str,
52
53 pub pk_phantom: PhantomData<PrimaryKey>,
54
55 pub types_phantom: PhantomData<AvailableTypes>,
56}
57
58impl<
60 Row,
61 PrimaryKey,
62 AvailableTypes,
63 SecondaryIndexes,
64 LockType,
65 PkGen,
66 PkNodeType,
67 const DATA_LENGTH: usize,
68 > Default
69 for WorkTable<
70 Row,
71 PrimaryKey,
72 AvailableTypes,
73 SecondaryIndexes,
74 LockType,
75 PkGen,
76 PkNodeType,
77 DATA_LENGTH,
78 >
79where
80 PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
81 SecondaryIndexes: Default,
82 PkGen: Default,
83 PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
84 Row: StorableRow,
85 <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
86{
87 fn default() -> Self {
88 Self {
89 data: DataPages::new(),
90 pk_map: IndexMap::default(),
91 indexes: SecondaryIndexes::default(),
92 pk_gen: Default::default(),
93 lock_map: LockMap::new(),
94 table_name: "",
95 pk_phantom: PhantomData,
96 types_phantom: PhantomData,
97 }
98 }
99}
100
101impl<
102 Row,
103 PrimaryKey,
104 AvailableTypes,
105 SecondaryIndexes,
106 LockType,
107 PkGen,
108 PkNodeType,
109 const DATA_LENGTH: usize,
110 >
111 WorkTable<
112 Row,
113 PrimaryKey,
114 AvailableTypes,
115 SecondaryIndexes,
116 LockType,
117 PkGen,
118 PkNodeType,
119 DATA_LENGTH,
120 >
121where
122 Row: TableRow<PrimaryKey>,
123 PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
124 PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
125 Row: StorableRow,
126 <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
127{
128 pub fn get_next_pk(&self) -> PrimaryKey
129 where
130 PkGen: PrimaryKeyGenerator<PrimaryKey>,
131 {
132 self.pk_gen.next()
133 }
134
135 #[cfg_attr(
137 feature = "perf_measurements",
138 performance_measurement(prefix_name = "WorkTable")
139 )]
140 pub fn select(&self, pk: PrimaryKey) -> Option<Row>
141 where
142 LockType: 'static,
143 Row: Archive
144 + for<'a> Serialize<
145 Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
146 >,
147 <<Row as StorableRow>::WrappedRow as Archive>::Archived:
148 Deserialize<<Row as StorableRow>::WrappedRow, HighDeserializer<rkyv::rancor::Error>>,
149 {
150 let link = self.pk_map.get(&pk).map(|v| v.get().value)?;
151 self.data.select(link).ok()
152 }
153
154 #[cfg_attr(
155 feature = "perf_measurements",
156 performance_measurement(prefix_name = "WorkTable")
157 )]
158 pub fn insert(&self, row: Row) -> Result<PrimaryKey, WorkTableError>
159 where
160 Row: Archive
161 + Clone
162 + for<'a> Serialize<
163 Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
164 >,
165 <Row as StorableRow>::WrappedRow: Archive
166 + for<'a> Serialize<
167 Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
168 >,
169 PrimaryKey: Clone,
170 AvailableTypes: 'static,
171 SecondaryIndexes: TableSecondaryIndex<Row, AvailableTypes>,
172 LockType: 'static,
173 {
174 let pk = row.get_primary_key().clone();
175 let link = self
176 .data
177 .insert(row.clone())
178 .map_err(WorkTableError::PagesError)?;
179 self.pk_map
180 .insert(pk.clone(), link)
181 .map_or(Ok(()), |_| Err(WorkTableError::AlreadyExists))?;
182 self.indexes.save_row(row, link)?;
183
184 Ok(pk)
185 }
186
187 #[allow(clippy::type_complexity)]
188 pub fn insert_cdc<SecondaryEvents>(
189 &self,
190 row: Row,
191 ) -> Result<
192 (
193 PrimaryKey,
194 Operation<<PkGen as PrimaryKeyGeneratorState>::State, PrimaryKey, SecondaryEvents>,
195 ),
196 WorkTableError,
197 >
198 where
199 Row: Archive
200 + Clone
201 + for<'a> Serialize<
202 Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
203 >,
204 <Row as StorableRow>::WrappedRow: Archive
205 + for<'a> Serialize<
206 Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
207 >,
208 PrimaryKey: Clone,
209 SecondaryIndexes: TableSecondaryIndex<Row, AvailableTypes>
210 + TableSecondaryIndexCdc<Row, AvailableTypes, SecondaryEvents>,
211 PkGen: PrimaryKeyGeneratorState,
212 {
213 let pk = row.get_primary_key().clone();
214 let (link, bytes) = self
215 .data
216 .insert_cdc(row.clone())
217 .map_err(WorkTableError::PagesError)?;
218 let (exists, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), link);
219 if exists.is_some() {
220 return Err(WorkTableError::AlreadyExists);
221 }
222 let secondary_keys_events = self.indexes.save_row_cdc(row, link)?;
223
224 let op = Operation::Insert(InsertOperation {
225 id: Default::default(),
226 pk_gen_state: self.pk_gen.get_state(),
227 primary_key_events,
228 secondary_keys_events,
229 bytes,
230 link,
231 });
232
233 Ok((pk, op))
234 }
235}
236
237#[derive(Debug, Display, Error, From)]
238pub enum WorkTableError {
239 NotFound,
240 AlreadyExists,
241 SerializeError,
242 PagesError(in_memory::PagesExecutionError),
243}
244
// Unit-test module for this file; it currently contains no tests.
#[cfg(test)]
mod tests {}