feophantlib/engine/io/
constraint_manager.rs

1use async_stream::try_stream;
2use futures::Stream;
3use std::sync::Arc;
4use thiserror::Error;
5
6use crate::{
7    constants::Nullable,
8    engine::{
9        objects::{
10            types::{BaseSqlTypes, BaseSqlTypesMapper},
11            SqlTuple, SqlTupleError, Table,
12        },
13        transactions::TransactionId,
14    },
15};
16
17use super::{
18    index_manager::IndexManagerError,
19    row_formats::{ItemPointer, RowData},
20    IndexManager, VisibleRowManager, VisibleRowManagerError,
21};
22
23/// The goal of the constraint manager is to ensure all constraints are satisfied
24/// before we hand it off deeper into the stack. For now its taking on the null checks
25/// of RowData
26#[derive(Clone)]
27pub struct ConstraintManager {
28    index_manager: IndexManager,
29    vis_row_man: VisibleRowManager,
30}
31
32impl ConstraintManager {
33    pub fn new(index_manager: IndexManager, vis_row_man: VisibleRowManager) -> ConstraintManager {
34        ConstraintManager {
35            index_manager,
36            vis_row_man,
37        }
38    }
39
40    pub async fn insert_row(
41        &mut self,
42        current_tran_id: TransactionId,
43        table: &Arc<Table>,
44        user_data: SqlTuple,
45    ) -> Result<ItemPointer, ConstraintManagerError> {
46        //column count check
47        if table.attributes.len() != user_data.0.len() {
48            return Err(ConstraintManagerError::TableRowSizeMismatch(
49                table.attributes.len(),
50                user_data.0.len(),
51            ));
52        }
53
54        //null checks
55        for (data, column) in user_data.0.iter().zip(table.attributes.clone()) {
56            match data {
57                Some(d) => {
58                    if !d.type_matches(&column.sql_type) {
59                        return Err(ConstraintManagerError::TableRowTypeMismatch(
60                            d.clone(),
61                            column.sql_type,
62                        ));
63                    }
64                }
65                None => {
66                    if column.nullable != Nullable::Null {
67                        return Err(ConstraintManagerError::UnexpectedNull(column.name));
68                    }
69                }
70            }
71        }
72
73        //constraint check
74        for c in &table.constraints {
75            match c {
76                crate::engine::objects::Constraint::PrimaryKey(p) => {
77                    debug!("searching for {:?}", user_data);
78                    match self
79                        .index_manager
80                        .search_for_key(
81                            &p.index,
82                            &user_data
83                                .clone()
84                                .filter_map(&table.sql_type, &p.index.columns)?,
85                        )
86                        .await?
87                    {
88                        Some(rows) => {
89                            //We need to check if each of these rows are alive
90                            if self
91                                .vis_row_man
92                                .any_visible(table, current_tran_id, &rows)
93                                .await?
94                            {
95                                return Err(ConstraintManagerError::PrimaryKeyViolation());
96                            }
97                        }
98                        None => {
99                            continue;
100                        }
101                    }
102                }
103            }
104        }
105
106        //Insert the row
107        let row_item_ptr = self
108            .vis_row_man
109            .insert_row(current_tran_id, table, user_data.clone())
110            .await?;
111
112        //Update the indexes
113        //TODO figure out if that makes sense in this layer
114        for i in &table.indexes {
115            let tuple_for_index = match user_data.clone().filter_map(&table.sql_type, &i.columns) {
116                Ok(u) => u,
117                Err(_) => {
118                    continue;
119                }
120            };
121
122            self.index_manager
123                .add(i, tuple_for_index, row_item_ptr)
124                .await?;
125        }
126
127        Ok(row_item_ptr)
128    }
129
130    /// Gets a specific tuple from below, at the moment just a passthrough
131    pub async fn get(
132        &mut self,
133        tran_id: TransactionId,
134        table: &Arc<Table>,
135        row_pointer: ItemPointer,
136    ) -> Result<RowData, ConstraintManagerError> {
137        Ok(self.vis_row_man.get(tran_id, table, row_pointer).await?)
138    }
139
140    /// Provides a filtered view that respects transaction visibility
141    /// At the moment this is practically just a passthrough
142    pub fn get_stream(
143        self,
144        tran_id: TransactionId,
145        table: Arc<Table>,
146    ) -> impl Stream<Item = Result<RowData, ConstraintManagerError>> {
147        try_stream! {
148            for await row in self.vis_row_man.get_stream(tran_id, &table) {
149                let unwrap_row = row?;
150                yield unwrap_row;
151            }
152        }
153    }
154}
155
156#[derive(Error, Debug)]
157pub enum ConstraintManagerError {
158    #[error(transparent)]
159    IndexManagerError(#[from] IndexManagerError),
160    #[error("Primary Key violation")]
161    PrimaryKeyViolation(),
162    #[error(transparent)]
163    SqlTupleError(#[from] SqlTupleError),
164    #[error("Table definition length {0} does not match columns passed {1}")]
165    TableRowSizeMismatch(usize, usize),
166    #[error("Table definition type {0} does not match column passed {1}")]
167    TableRowTypeMismatch(BaseSqlTypes, BaseSqlTypesMapper),
168    #[error(transparent)]
169    VisibleRowManagerError(#[from] VisibleRowManagerError),
170    #[error("Column null when ask not to be {0}")]
171    UnexpectedNull(String),
172}