feophantlib/engine/io/
visible_row_manager.rs

1//! This sits above the row manager and ensures that all commands follow the visibility rules
2//! See here for basic discussion: http://www.interdb.jp/pg/pgsql05.html#_5.6.
3//!
4//! If you need to bypass this, go down a layer
5use crate::engine::objects::SqlTuple;
6
7use super::super::objects::Table;
8use super::super::transactions::{
9    TransactionId, TransactionManager, TransactionManagerError, TransactionStatus,
10};
11use super::{
12    row_formats::{ItemPointer, RowData},
13    RowManager, RowManagerError,
14};
15use async_stream::try_stream;
16use futures::stream::Stream;
17use std::sync::Arc;
18use thiserror::Error;
19
20#[derive(Clone)]
21pub struct VisibleRowManager {
22    row_manager: RowManager,
23    tran_manager: TransactionManager,
24}
25
26impl VisibleRowManager {
27    pub fn new(row_manager: RowManager, tran_manager: TransactionManager) -> VisibleRowManager {
28        VisibleRowManager {
29            row_manager,
30            tran_manager,
31        }
32    }
33
34    pub async fn insert_row(
35        &self,
36        current_tran_id: TransactionId,
37        table: &Arc<Table>,
38        user_data: SqlTuple,
39    ) -> Result<ItemPointer, VisibleRowManagerError> {
40        self.row_manager
41            .insert_row(current_tran_id, table, user_data)
42            .await
43            .map_err(VisibleRowManagerError::RowManagerError)
44    }
45
46    pub async fn get(
47        &mut self,
48        tran_id: TransactionId,
49        table: &Arc<Table>,
50        row_pointer: ItemPointer,
51    ) -> Result<RowData, VisibleRowManagerError> {
52        let row = self.row_manager.get(table, row_pointer).await?;
53
54        if VisibleRowManager::is_visible(&mut self.tran_manager, tran_id, &row).await? {
55            Ok(row)
56        } else {
57            Err(VisibleRowManagerError::NotVisibleRow(row))
58        }
59    }
60
61    // Provides a filtered view that respects transaction visability
62    pub fn get_stream(
63        &self,
64        tran_id: TransactionId,
65        table: &Arc<Table>,
66    ) -> impl Stream<Item = Result<RowData, VisibleRowManagerError>> {
67        let rm = self.row_manager.clone();
68        let mut tm = self.tran_manager.clone();
69        let table = table.clone();
70
71        try_stream! {
72            for await row in rm.get_stream(&table) {
73                let unwrap_row = row?;
74                if VisibleRowManager::is_visible(&mut tm, tran_id, &unwrap_row).await? {
75                    yield unwrap_row;
76                }
77            }
78        }
79    }
80
81    pub async fn any_visible(
82        &mut self,
83        table: &Arc<Table>,
84        tran_id: TransactionId,
85        ptrs: &Vec<ItemPointer>,
86    ) -> Result<bool, VisibleRowManagerError> {
87        for p in ptrs {
88            match self.get(tran_id, table, *p).await {
89                Ok(o) => return Ok(true),
90                Err(VisibleRowManagerError::NotVisibleRow(_)) => continue,
91                Err(e) => {
92                    return Err(e);
93                }
94            }
95        }
96        return Ok(false);
97    }
98
99    //TODO I want to find a way to NOT depend on tm
100    async fn is_visible(
101        tm: &mut TransactionManager,
102        tran_id: TransactionId,
103        row_data: &RowData,
104    ) -> Result<bool, VisibleRowManagerError> {
105        if row_data.min == tran_id {
106            match row_data.max {
107                Some(m) => {
108                    if m == tran_id {
109                        return Ok(false);
110                    } else {
111                        //In the future for us since min cannot be greater than max
112                        return Ok(true);
113                    }
114                }
115                None => return Ok(true),
116            }
117        }
118
119        //TODO check hint bits
120
121        if row_data.min > tran_id {
122            return Ok(false);
123        }
124
125        if tm.get_status(row_data.min).await? != TransactionStatus::Commited {
126            return Ok(false);
127        }
128
129        match row_data.max {
130            Some(m) => {
131                if m > tran_id || tm.get_status(m).await? != TransactionStatus::Commited {
132                    Ok(true)
133                } else {
134                    Ok(false)
135                }
136            }
137            None => Ok(true),
138        }
139    }
140}
141
142#[derive(Error, Debug)]
143pub enum VisibleRowManagerError {
144    #[error("Row {0} is not visible")]
145    NotVisibleRow(RowData),
146    #[error("Test")]
147    Test(),
148    #[error(transparent)]
149    RowManagerError(#[from] RowManagerError),
150    #[error(transparent)]
151    TransactionManagerError(#[from] TransactionManagerError),
152}