feophantlib/engine/io/
constraint_manager.rs1use async_stream::try_stream;
2use futures::Stream;
3use std::sync::Arc;
4use thiserror::Error;
5
6use crate::{
7 constants::Nullable,
8 engine::{
9 objects::{
10 types::{BaseSqlTypes, BaseSqlTypesMapper},
11 SqlTuple, SqlTupleError, Table,
12 },
13 transactions::TransactionId,
14 },
15};
16
17use super::{
18 index_manager::IndexManagerError,
19 row_formats::{ItemPointer, RowData},
20 IndexManager, VisibleRowManager, VisibleRowManagerError,
21};
22
/// Entry point for row operations that must respect a table's definition.
///
/// Rows handed to this type are checked against the table's column count,
/// column types, nullability and primary-key constraints before being stored,
/// and the table's indexes are updated after a successful insert. Reads are
/// delegated to the visible-row manager.
#[derive(Clone)]
pub struct ConstraintManager {
    // Used to look up existing keys for primary-key checks and to register
    // newly inserted rows in each index.
    index_manager: IndexManager,
    // Used to store rows, fetch rows, and decide whether rows found through an
    // index are visible to the current transaction.
    vis_row_man: VisibleRowManager,
}
31
32impl ConstraintManager {
33 pub fn new(index_manager: IndexManager, vis_row_man: VisibleRowManager) -> ConstraintManager {
34 ConstraintManager {
35 index_manager,
36 vis_row_man,
37 }
38 }
39
40 pub async fn insert_row(
41 &mut self,
42 current_tran_id: TransactionId,
43 table: &Arc<Table>,
44 user_data: SqlTuple,
45 ) -> Result<ItemPointer, ConstraintManagerError> {
46 if table.attributes.len() != user_data.0.len() {
48 return Err(ConstraintManagerError::TableRowSizeMismatch(
49 table.attributes.len(),
50 user_data.0.len(),
51 ));
52 }
53
54 for (data, column) in user_data.0.iter().zip(table.attributes.clone()) {
56 match data {
57 Some(d) => {
58 if !d.type_matches(&column.sql_type) {
59 return Err(ConstraintManagerError::TableRowTypeMismatch(
60 d.clone(),
61 column.sql_type,
62 ));
63 }
64 }
65 None => {
66 if column.nullable != Nullable::Null {
67 return Err(ConstraintManagerError::UnexpectedNull(column.name));
68 }
69 }
70 }
71 }
72
73 for c in &table.constraints {
75 match c {
76 crate::engine::objects::Constraint::PrimaryKey(p) => {
77 debug!("searching for {:?}", user_data);
78 match self
79 .index_manager
80 .search_for_key(
81 &p.index,
82 &user_data
83 .clone()
84 .filter_map(&table.sql_type, &p.index.columns)?,
85 )
86 .await?
87 {
88 Some(rows) => {
89 if self
91 .vis_row_man
92 .any_visible(table, current_tran_id, &rows)
93 .await?
94 {
95 return Err(ConstraintManagerError::PrimaryKeyViolation());
96 }
97 }
98 None => {
99 continue;
100 }
101 }
102 }
103 }
104 }
105
106 let row_item_ptr = self
108 .vis_row_man
109 .insert_row(current_tran_id, table, user_data.clone())
110 .await?;
111
112 for i in &table.indexes {
115 let tuple_for_index = match user_data.clone().filter_map(&table.sql_type, &i.columns) {
116 Ok(u) => u,
117 Err(_) => {
118 continue;
119 }
120 };
121
122 self.index_manager
123 .add(i, tuple_for_index, row_item_ptr)
124 .await?;
125 }
126
127 Ok(row_item_ptr)
128 }
129
130 pub async fn get(
132 &mut self,
133 tran_id: TransactionId,
134 table: &Arc<Table>,
135 row_pointer: ItemPointer,
136 ) -> Result<RowData, ConstraintManagerError> {
137 Ok(self.vis_row_man.get(tran_id, table, row_pointer).await?)
138 }
139
140 pub fn get_stream(
143 self,
144 tran_id: TransactionId,
145 table: Arc<Table>,
146 ) -> impl Stream<Item = Result<RowData, ConstraintManagerError>> {
147 try_stream! {
148 for await row in self.vis_row_man.get_stream(tran_id, &table) {
149 let unwrap_row = row?;
150 yield unwrap_row;
151 }
152 }
153 }
154}
155
156#[derive(Error, Debug)]
157pub enum ConstraintManagerError {
158 #[error(transparent)]
159 IndexManagerError(#[from] IndexManagerError),
160 #[error("Primary Key violation")]
161 PrimaryKeyViolation(),
162 #[error(transparent)]
163 SqlTupleError(#[from] SqlTupleError),
164 #[error("Table definition length {0} does not match columns passed {1}")]
165 TableRowSizeMismatch(usize, usize),
166 #[error("Table definition type {0} does not match column passed {1}")]
167 TableRowTypeMismatch(BaseSqlTypes, BaseSqlTypesMapper),
168 #[error(transparent)]
169 VisibleRowManagerError(#[from] VisibleRowManagerError),
170 #[error("Column null when ask not to be {0}")]
171 UnexpectedNull(String),
172}