worktable/table/
mod.rs

1pub mod select;
2
3use std::marker::PhantomData;
4
5use data_bucket::{Link, INNER_PAGE_SIZE};
6use derive_more::{Display, Error, From};
7use indexset::core::node::NodeLike;
8use indexset::core::pair::Pair;
9#[cfg(feature = "perf_measurements")]
10use performance_measurement_codegen::performance_measurement;
11use rkyv::api::high::HighDeserializer;
12use rkyv::rancor::Strategy;
13use rkyv::ser::allocator::ArenaHandle;
14use rkyv::ser::sharing::Share;
15use rkyv::ser::Serializer;
16use rkyv::util::AlignedVec;
17use rkyv::{Archive, Deserialize, Serialize};
18
19use crate::in_memory::{DataPages, RowWrapper, StorableRow};
20use crate::lock::LockMap;
21use crate::persistence::{InsertOperation, Operation};
22use crate::prelude::PrimaryKeyGeneratorState;
23use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey};
24use crate::{in_memory, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc};
25
/// In-memory table storing rows of type `Row`, addressed by `PrimaryKey`.
///
/// Type parameters:
/// * `Row` - stored row type; must implement [`StorableRow`].
/// * `PrimaryKey` - key type used by `pk_map` and `lock_map`.
/// * `AvailableTypes` - marker type, only held through `types_phantom`.
/// * `SecondaryIndexes` - type of the `indexes` field.
/// * `LockType` - first type parameter of the [`LockMap`] stored in `lock_map`.
/// * `PkGen` - type of the `pk_gen` field; defaults to the generator associated
///   with `PrimaryKey` through [`TablePrimaryKey`].
/// * `PkNodeType` - node type used by `pk_map`; defaults to
///   `Vec<Pair<PrimaryKey, Link>>`.
/// * `DATA_LENGTH` - const parameter forwarded to [`DataPages`]; defaults to
///   [`INNER_PAGE_SIZE`].
#[derive(Debug)]
pub struct WorkTable<
    Row,
    PrimaryKey,
    AvailableTypes = (),
    SecondaryIndexes = (),
    LockType = (),
    PkGen = <PrimaryKey as TablePrimaryKey>::Generator,
    PkNodeType = Vec<Pair<PrimaryKey, Link>>,
    const DATA_LENGTH: usize = INNER_PAGE_SIZE,
> where
    PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash,
    Row: StorableRow,
    PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
{
    /// Row storage. Inserting a row here yields the [`Link`] that identifies it.
    pub data: DataPages<Row, DATA_LENGTH>,

    /// Primary key to [`Link`] map, used to locate a row inside `data`.
    pub pk_map: IndexMap<PrimaryKey, Link, PkNodeType>,

    /// Secondary indexes; updated with the row and its [`Link`] on insert.
    pub indexes: SecondaryIndexes,

    /// Primary key generator; `get_next_pk` calls `next()` on it.
    pub pk_gen: PkGen,

    /// Locks associated with this table; see [`LockMap`] for the semantics.
    pub lock_map: LockMap<LockType, PrimaryKey>,

    /// Name of the table. The `Default` implementation sets it to an empty string.
    pub table_name: &'static str,

    /// Zero-sized marker for the `PrimaryKey` type parameter.
    pub pk_phantom: PhantomData<PrimaryKey>,

    /// Zero-sized marker for the `AvailableTypes` type parameter, which is not
    /// stored in any other field.
    pub types_phantom: PhantomData<AvailableTypes>,
}
57
58// Manual implementations to avoid unneeded trait bounds.
59impl<
60        Row,
61        PrimaryKey,
62        AvailableTypes,
63        SecondaryIndexes,
64        LockType,
65        PkGen,
66        PkNodeType,
67        const DATA_LENGTH: usize,
68    > Default
69    for WorkTable<
70        Row,
71        PrimaryKey,
72        AvailableTypes,
73        SecondaryIndexes,
74        LockType,
75        PkGen,
76        PkNodeType,
77        DATA_LENGTH,
78    >
79where
80    PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
81    SecondaryIndexes: Default,
82    PkGen: Default,
83    PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
84    Row: StorableRow,
85    <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
86{
87    fn default() -> Self {
88        Self {
89            data: DataPages::new(),
90            pk_map: IndexMap::default(),
91            indexes: SecondaryIndexes::default(),
92            pk_gen: Default::default(),
93            lock_map: LockMap::new(),
94            table_name: "",
95            pk_phantom: PhantomData,
96            types_phantom: PhantomData,
97        }
98    }
99}
100
101impl<
102        Row,
103        PrimaryKey,
104        AvailableTypes,
105        SecondaryIndexes,
106        LockType,
107        PkGen,
108        PkNodeType,
109        const DATA_LENGTH: usize,
110    >
111    WorkTable<
112        Row,
113        PrimaryKey,
114        AvailableTypes,
115        SecondaryIndexes,
116        LockType,
117        PkGen,
118        PkNodeType,
119        DATA_LENGTH,
120    >
121where
122    Row: TableRow<PrimaryKey>,
123    PrimaryKey: Clone + Ord + Send + TablePrimaryKey + std::hash::Hash,
124    PkNodeType: NodeLike<Pair<PrimaryKey, Link>> + Send + 'static,
125    Row: StorableRow,
126    <Row as StorableRow>::WrappedRow: RowWrapper<Row>,
127{
128    pub fn get_next_pk(&self) -> PrimaryKey
129    where
130        PkGen: PrimaryKeyGenerator<PrimaryKey>,
131    {
132        self.pk_gen.next()
133    }
134
135    /// Selects `Row` from table identified with provided primary key. Returns `None` if no value presented.
136    #[cfg_attr(
137        feature = "perf_measurements",
138        performance_measurement(prefix_name = "WorkTable")
139    )]
140    pub fn select(&self, pk: PrimaryKey) -> Option<Row>
141    where
142        LockType: 'static,
143        Row: Archive
144            + for<'a> Serialize<
145                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
146            >,
147        <<Row as StorableRow>::WrappedRow as Archive>::Archived:
148            Deserialize<<Row as StorableRow>::WrappedRow, HighDeserializer<rkyv::rancor::Error>>,
149    {
150        let link = self.pk_map.get(&pk).map(|v| v.get().value)?;
151        self.data.select(link).ok()
152    }
153
154    #[cfg_attr(
155        feature = "perf_measurements",
156        performance_measurement(prefix_name = "WorkTable")
157    )]
158    pub fn insert(&self, row: Row) -> Result<PrimaryKey, WorkTableError>
159    where
160        Row: Archive
161            + Clone
162            + for<'a> Serialize<
163                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
164            >,
165        <Row as StorableRow>::WrappedRow: Archive
166            + for<'a> Serialize<
167                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
168            >,
169        PrimaryKey: Clone,
170        AvailableTypes: 'static,
171        SecondaryIndexes: TableSecondaryIndex<Row, AvailableTypes>,
172        LockType: 'static,
173    {
174        let pk = row.get_primary_key().clone();
175        let link = self
176            .data
177            .insert(row.clone())
178            .map_err(WorkTableError::PagesError)?;
179        self.pk_map
180            .insert(pk.clone(), link)
181            .map_or(Ok(()), |_| Err(WorkTableError::AlreadyExists))?;
182        self.indexes.save_row(row, link)?;
183
184        Ok(pk)
185    }
186
187    #[allow(clippy::type_complexity)]
188    pub fn insert_cdc<SecondaryEvents>(
189        &self,
190        row: Row,
191    ) -> Result<
192        (
193            PrimaryKey,
194            Operation<<PkGen as PrimaryKeyGeneratorState>::State, PrimaryKey, SecondaryEvents>,
195        ),
196        WorkTableError,
197    >
198    where
199        Row: Archive
200            + Clone
201            + for<'a> Serialize<
202                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
203            >,
204        <Row as StorableRow>::WrappedRow: Archive
205            + for<'a> Serialize<
206                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
207            >,
208        PrimaryKey: Clone,
209        SecondaryIndexes: TableSecondaryIndex<Row, AvailableTypes>
210            + TableSecondaryIndexCdc<Row, AvailableTypes, SecondaryEvents>,
211        PkGen: PrimaryKeyGeneratorState,
212    {
213        let pk = row.get_primary_key().clone();
214        let (link, bytes) = self
215            .data
216            .insert_cdc(row.clone())
217            .map_err(WorkTableError::PagesError)?;
218        let (exists, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), link);
219        if exists.is_some() {
220            return Err(WorkTableError::AlreadyExists);
221        }
222        let secondary_keys_events = self.indexes.save_row_cdc(row, link)?;
223
224        let op = Operation::Insert(InsertOperation {
225            id: Default::default(),
226            pk_gen_state: self.pk_gen.get_state(),
227            primary_key_events,
228            secondary_keys_events,
229            bytes,
230            link,
231        });
232
233        Ok((pk, op))
234    }
235}
236
/// Errors returned by [`WorkTable`] operations.
#[derive(Debug, Display, Error, From)]
pub enum WorkTableError {
    /// The requested row does not exist.
    /// NOTE(review): not produced by the code visible in this file; confirm
    /// which callers return it.
    NotFound,
    /// A row with the same primary key is already registered; returned by
    /// `insert` and `insert_cdc`.
    AlreadyExists,
    /// Serialization of a row failed.
    /// NOTE(review): not produced by the code visible in this file; confirm
    /// which callers return it.
    SerializeError,
    /// Failure reported by the data pages, wrapping
    /// [`in_memory::PagesExecutionError`].
    PagesError(in_memory::PagesExecutionError),
}
244
#[cfg(test)]
mod tests {
    // NOTE(review): the only test in this module is commented out, so the
    // module is currently empty. The disabled code calls
    // `table.insert::<{ ...ROW_SIZE }>(..)`, which does not match the `insert`
    // method of `WorkTable` (it takes no const generic argument) — presumably
    // the test predates an API change. Port it to the current API or remove it.
    //
    // mod eyre {
    //     use eyre::*;
    //     use worktable_codegen::worktable;
    //
    //     use crate::prelude::*;
    //
    //     worktable! (
    //         name: Test,
    //         columns: {
    //             id: u64 primary_key,
    //             test: u64
    //         }
    //     );
    //
    //     #[test]
    //     fn test() {
    //         let table = TestWorkTable::default();
    //         let row = TestRow {
    //             id: 1,
    //             test: 1,
    //         };
    //         let pk = table.insert::<{ crate::table::tests::tuple_primary_key::TestRow::ROW_SIZE }>(row.clone()).unwrap();
    //         let selected_row = table.select(pk).unwrap();
    //
    //         assert_eq!(selected_row, row);
    //         assert!(table.select((1, 0).into()).is_none())
    //     }
    // }
}