feophantlib/engine/io/
visible_row_manager.rs1use crate::engine::objects::SqlTuple;
6
7use super::super::objects::Table;
8use super::super::transactions::{
9 TransactionId, TransactionManager, TransactionManagerError, TransactionStatus,
10};
11use super::{
12 row_formats::{ItemPointer, RowData},
13 RowManager, RowManagerError,
14};
15use async_stream::try_stream;
16use futures::stream::Stream;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[derive(Clone)]
21pub struct VisibleRowManager {
22 row_manager: RowManager,
23 tran_manager: TransactionManager,
24}
25
26impl VisibleRowManager {
27 pub fn new(row_manager: RowManager, tran_manager: TransactionManager) -> VisibleRowManager {
28 VisibleRowManager {
29 row_manager,
30 tran_manager,
31 }
32 }
33
34 pub async fn insert_row(
35 &self,
36 current_tran_id: TransactionId,
37 table: &Arc<Table>,
38 user_data: SqlTuple,
39 ) -> Result<ItemPointer, VisibleRowManagerError> {
40 self.row_manager
41 .insert_row(current_tran_id, table, user_data)
42 .await
43 .map_err(VisibleRowManagerError::RowManagerError)
44 }
45
46 pub async fn get(
47 &mut self,
48 tran_id: TransactionId,
49 table: &Arc<Table>,
50 row_pointer: ItemPointer,
51 ) -> Result<RowData, VisibleRowManagerError> {
52 let row = self.row_manager.get(table, row_pointer).await?;
53
54 if VisibleRowManager::is_visible(&mut self.tran_manager, tran_id, &row).await? {
55 Ok(row)
56 } else {
57 Err(VisibleRowManagerError::NotVisibleRow(row))
58 }
59 }
60
61 pub fn get_stream(
63 &self,
64 tran_id: TransactionId,
65 table: &Arc<Table>,
66 ) -> impl Stream<Item = Result<RowData, VisibleRowManagerError>> {
67 let rm = self.row_manager.clone();
68 let mut tm = self.tran_manager.clone();
69 let table = table.clone();
70
71 try_stream! {
72 for await row in rm.get_stream(&table) {
73 let unwrap_row = row?;
74 if VisibleRowManager::is_visible(&mut tm, tran_id, &unwrap_row).await? {
75 yield unwrap_row;
76 }
77 }
78 }
79 }
80
81 pub async fn any_visible(
82 &mut self,
83 table: &Arc<Table>,
84 tran_id: TransactionId,
85 ptrs: &Vec<ItemPointer>,
86 ) -> Result<bool, VisibleRowManagerError> {
87 for p in ptrs {
88 match self.get(tran_id, table, *p).await {
89 Ok(o) => return Ok(true),
90 Err(VisibleRowManagerError::NotVisibleRow(_)) => continue,
91 Err(e) => {
92 return Err(e);
93 }
94 }
95 }
96 return Ok(false);
97 }
98
99 async fn is_visible(
101 tm: &mut TransactionManager,
102 tran_id: TransactionId,
103 row_data: &RowData,
104 ) -> Result<bool, VisibleRowManagerError> {
105 if row_data.min == tran_id {
106 match row_data.max {
107 Some(m) => {
108 if m == tran_id {
109 return Ok(false);
110 } else {
111 return Ok(true);
113 }
114 }
115 None => return Ok(true),
116 }
117 }
118
119 if row_data.min > tran_id {
122 return Ok(false);
123 }
124
125 if tm.get_status(row_data.min).await? != TransactionStatus::Commited {
126 return Ok(false);
127 }
128
129 match row_data.max {
130 Some(m) => {
131 if m > tran_id || tm.get_status(m).await? != TransactionStatus::Commited {
132 Ok(true)
133 } else {
134 Ok(false)
135 }
136 }
137 None => Ok(true),
138 }
139 }
140}
141
142#[derive(Error, Debug)]
143pub enum VisibleRowManagerError {
144 #[error("Row {0} is not visible")]
145 NotVisibleRow(RowData),
146 #[error("Test")]
147 Test(),
148 #[error(transparent)]
149 RowManagerError(#[from] RowManagerError),
150 #[error(transparent)]
151 TransactionManagerError(#[from] TransactionManagerError),
152}