feophantlib/engine/io/
row_manager.rs

1use super::super::objects::Table;
2use super::super::transactions::TransactionId;
3use super::block_layer::file_manager2::{FileManager2, FileManager2Error};
4use super::block_layer::free_space_manager::{FreeSpaceManager, FreeSpaceManagerError, FreeStat};
5use super::format_traits::Serializable;
6use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
7use super::row_formats::{ItemPointer, RowData, RowDataError};
8use super::EncodedSize;
9use crate::engine::objects::SqlTuple;
10use async_stream::try_stream;
11use futures::stream::Stream;
12use std::sync::Arc;
13use thiserror::Error;
14
15/// The row manager is a mapper between rows and pages on disk.
16///
17/// It operates at the lowest level, no visibility checks are done.
18#[derive(Clone)]
19pub struct RowManager {
20    file_manager: Arc<FileManager2>,
21    free_space_manager: FreeSpaceManager,
22}
23
24impl RowManager {
25    pub fn new(
26        file_manager: Arc<FileManager2>,
27        free_space_manager: FreeSpaceManager,
28    ) -> RowManager {
29        RowManager {
30            file_manager,
31            free_space_manager,
32        }
33    }
34
35    pub async fn insert_row(
36        &self,
37        current_tran_id: TransactionId,
38        table: &Arc<Table>,
39        user_data: SqlTuple,
40    ) -> Result<ItemPointer, RowManagerError> {
41        self.insert_row_internal(current_tran_id, table, user_data)
42            .await
43    }
44
45    //Note this is a logical delete
46    //TODO debating if this should respect the visibility map, probably yes just trying to limit the pain
47    pub async fn delete_row(
48        &self,
49        current_tran_id: TransactionId,
50        table: &Arc<Table>,
51        row_pointer: ItemPointer,
52    ) -> Result<(), RowManagerError> {
53        let page_id = PageId {
54            resource_key: table.id,
55            page_type: PageType::Data,
56        };
57
58        let (page, page_guard) = self
59            .file_manager
60            .get_page_for_update(&page_id, &row_pointer.page)
61            .await?;
62
63        let mut page = PageData::parse(table, row_pointer.page, &page)?;
64        let mut row = page
65            .get_row(row_pointer.count)
66            .ok_or(RowManagerError::NonExistentRow(
67                row_pointer.count,
68                row_pointer.page,
69            ))?
70            .clone();
71
72        if row.max.is_some() {
73            return Err(RowManagerError::AlreadyDeleted(
74                row_pointer.count,
75                row.max.unwrap(),
76            ));
77        }
78
79        row.max = Some(current_tran_id);
80
81        page.update(row, row_pointer.count)?;
82        let new_page = page.serialize_and_pad();
83
84        self.file_manager.update_page(page_guard, new_page).await?;
85
86        Ok(())
87    }
88
89    //Note this is an insert new row, delete old row operation
90    pub async fn update_row(
91        &mut self,
92        current_tran_id: TransactionId,
93        table: &Arc<Table>,
94        row_pointer: ItemPointer,
95        new_user_data: SqlTuple,
96    ) -> Result<ItemPointer, RowManagerError> {
97        //First get the current row so we have it for the update
98        let page_id = PageId {
99            resource_key: table.id,
100            page_type: PageType::Data,
101        };
102
103        let (old_page_buffer, old_guard) = self
104            .file_manager
105            .get_page_for_update(&page_id, &row_pointer.page)
106            .await?;
107
108        let mut old_page = PageData::parse(table, row_pointer.page, &old_page_buffer)?;
109
110        let mut old_row = old_page
111            .get_row(row_pointer.count)
112            .ok_or(RowManagerError::NonExistentRow(
113                row_pointer.count,
114                row_pointer.page,
115            ))?
116            .clone();
117
118        if old_row.max.is_some() {
119            return Err(RowManagerError::AlreadyDeleted(
120                row_pointer.count,
121                old_row.max.unwrap(),
122            ));
123        }
124
125        let new_row_len = RowData::encoded_size(&new_user_data);
126
127        //Prefer using the old page if possible
128        let new_row_pointer;
129        if old_page.can_fit(new_row_len) {
130            new_row_pointer = old_page.insert(current_tran_id, table, new_user_data)?;
131        } else {
132            self.free_space_manager
133                .mark_page(page_id, row_pointer.page, FreeStat::Full)
134                .await?;
135            new_row_pointer = self
136                .insert_row_internal(current_tran_id, table, new_user_data)
137                .await?;
138        }
139
140        old_row.max = Some(current_tran_id);
141        old_row.item_pointer = new_row_pointer;
142        old_page.update(old_row, row_pointer.count)?;
143        let old_page_buffer = old_page.serialize_and_pad();
144
145        self.file_manager
146            .update_page(old_guard, old_page_buffer)
147            .await?;
148
149        Ok(new_row_pointer)
150    }
151
152    pub async fn get(
153        &self,
154        table: &Arc<Table>,
155        row_pointer: ItemPointer,
156    ) -> Result<RowData, RowManagerError> {
157        let page_id = PageId {
158            resource_key: table.id,
159            page_type: PageType::Data,
160        };
161
162        let (page_buffer, _page_guard) = self
163            .file_manager
164            .get_page(&page_id, &row_pointer.page)
165            .await?;
166
167        let page = PageData::parse(table, row_pointer.page, &page_buffer)?;
168
169        let row = page
170            .get_row(row_pointer.count)
171            .ok_or(RowManagerError::NonExistentRow(
172                row_pointer.count,
173                row_pointer.page,
174            ))?
175            .clone();
176
177        Ok(row)
178    }
179
180    // Provides an unfiltered view of the underlying table
181    pub fn get_stream(
182        &self,
183        table: &Arc<Table>,
184    ) -> impl Stream<Item = Result<RowData, RowManagerError>> {
185        let page_id = PageId {
186            resource_key: table.id,
187            page_type: PageType::Data,
188        };
189
190        let file_manager = self.file_manager.clone();
191        let table = table.clone();
192
193        try_stream! {
194            let mut page_num = PageOffset(0);
195
196            loop {
197                match file_manager.get_page(&page_id, &page_num).await {
198                    Ok((buffer, _guard)) => {
199                        let page = PageData::parse(&table, page_num, &buffer)?;
200                        for await row in page.get_stream() {
201                            yield row;
202                        }
203                    },
204                    Err(_) => {
205                        return ();
206                    }
207                }
208
209                page_num += PageOffset(1);
210            }
211        }
212    }
213
214    // TODO implement free space maps so I don't have to scan every page
215    async fn insert_row_internal(
216        &self,
217        current_tran_id: TransactionId,
218        table: &Arc<Table>,
219        user_data: SqlTuple,
220    ) -> Result<ItemPointer, RowManagerError> {
221        let page_id = PageId {
222            resource_key: table.id,
223            page_type: PageType::Data,
224        };
225        let user_data_size = RowData::encoded_size(&user_data);
226
227        loop {
228            let next_free_page = self.free_space_manager.get_next_free_page(page_id).await?;
229            match self
230                .file_manager
231                .get_page_for_update(&page_id, &next_free_page)
232                .await
233            {
234                Ok((buffer, page_guard)) => {
235                    let mut page = PageData::parse(table, next_free_page, &buffer)?;
236                    if page.can_fit(user_data_size) {
237                        let new_row_pointer = page.insert(current_tran_id, table, user_data)?;
238                        let buffer = page.serialize_and_pad();
239                        self.file_manager.update_page(page_guard, buffer).await?;
240                        return Ok(new_row_pointer);
241                    } else {
242                        self.free_space_manager
243                            .mark_page(page_id, next_free_page, FreeStat::Full)
244                            .await?;
245                        continue;
246                    }
247                }
248                Err(_) => {
249                    //We got here because we asked for an offset that didn't exist yet.
250                    let (new_page_offset, new_page_guard) =
251                        self.file_manager.get_next_offset(&page_id).await?;
252
253                    let mut new_page = PageData::new(new_page_offset);
254                    let new_row_pointer = new_page.insert(current_tran_id, table, user_data)?; //TODO Will NOT handle overly large rows
255
256                    let new_page_buffer = new_page.serialize_and_pad();
257
258                    self.file_manager
259                        .add_page(new_page_guard, new_page_buffer)
260                        .await?;
261                    return Ok(new_row_pointer);
262                }
263            }
264        }
265    }
266}
267
268#[derive(Error, Debug)]
269pub enum RowManagerError {
270    #[error(transparent)]
271    PageDataError(#[from] PageDataError),
272    #[error(transparent)]
273    FileManager2Error(#[from] FileManager2Error),
274    #[error(transparent)]
275    FreeSpaceManagerError(#[from] FreeSpaceManagerError),
276    #[error(transparent)]
277    RowDataError(#[from] RowDataError),
278    #[error("Page {0} does not exist")]
279    NonExistentPage(PageOffset),
280    #[error("Row {0} in Page {1} does not exist")]
281    NonExistentRow(UInt12, PageOffset),
282    #[error("Row {0} already deleted in {1}")]
283    AlreadyDeleted(UInt12, TransactionId),
284    #[error("Row {0} is not visible")]
285    NotVisibleRow(RowData),
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::get_row;
    use crate::engine::get_table;
    use tempfile::TempDir;
    use tokio_stream::StreamExt;

    /// Inserts 50 rows, reopens the storage, and checks every row came back in order.
    #[tokio::test]
    async fn test_row_manager_mass_insert() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = TempDir::new()?;
        let tmp_dir = tmp.path().as_os_str().to_os_string();

        let table = get_table();
        let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
        let fsm = FreeSpaceManager::new(fm.clone());
        let rm = RowManager::new(fm, fsm);

        let tran_id = TransactionId::new(1);

        for i in 0..50 {
            rm.insert_row(tran_id, &table, get_row(i.to_string()))
                .await?;
        }

        drop(rm);

        //Now let's make sure they're really in the table, persisting across restarts
        let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
        let fsm = FreeSpaceManager::new(fm.clone());
        let rm = RowManager::new(fm, fsm);

        let result_rows: Vec<RowData> = rm
            .get_stream(&table)
            .map(Result::unwrap)
            .collect()
            .await;

        assert_eq!(result_rows.len(), 50);
        for (i, row) in result_rows.iter().enumerate() {
            assert_eq!(row.user_data, get_row(i.to_string()));
        }

        Ok(())
    }

    /// Insert, update, read back, delete, then check a second delete is rejected.
    #[tokio::test]
    async fn test_row_manager_crud() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = TempDir::new()?;
        let tmp_dir = tmp.path().as_os_str().to_os_string();

        let table = get_table();
        let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
        let fsm = FreeSpaceManager::new(fm.clone());
        // `mut` kept so this test compiles whether update_row takes &self or &mut self.
        let mut rm = RowManager::new(fm, fsm);

        let tran_id = TransactionId::new(1);

        let insert_pointer = rm
            .insert_row(tran_id, &table, get_row("test".to_string()))
            .await?;

        let tran_id_2 = TransactionId::new(2);

        let update_pointer = rm
            .update_row(
                tran_id_2,
                &table,
                insert_pointer,
                get_row("test2".to_string()),
            )
            .await?;

        //Now let's make sure the update took: the raw scan sees both versions
        let result_rows: Vec<RowData> = rm.get_stream(&table).map(Result::unwrap).collect().await;
        assert_eq!(result_rows.len(), 2);

        let updated = rm.get(&table, update_pointer).await?;
        assert_eq!(updated.user_data, get_row("test2".to_string()));
        assert!(rm.get(&table, insert_pointer).await?.max.is_some());

        let tran_id_3 = TransactionId::new(3);

        rm.delete_row(tran_id_3, &table, update_pointer).await?;

        // A row whose max is already set must not be deleted again.
        assert!(matches!(
            rm.delete_row(tran_id_3, &table, update_pointer).await,
            Err(RowManagerError::AlreadyDeleted(_, _))
        ));

        Ok(())
    }
}