1use super::super::objects::Table;
2use super::super::transactions::TransactionId;
3use super::block_layer::file_manager2::{FileManager2, FileManager2Error};
4use super::block_layer::free_space_manager::{FreeSpaceManager, FreeSpaceManagerError, FreeStat};
5use super::format_traits::Serializable;
6use super::page_formats::{PageData, PageDataError, PageId, PageOffset, PageType, UInt12};
7use super::row_formats::{ItemPointer, RowData, RowDataError};
8use super::EncodedSize;
9use crate::engine::objects::SqlTuple;
10use async_stream::try_stream;
11use futures::stream::Stream;
12use std::sync::Arc;
13use thiserror::Error;
14
15#[derive(Clone)]
19pub struct RowManager {
20 file_manager: Arc<FileManager2>,
21 free_space_manager: FreeSpaceManager,
22}
23
24impl RowManager {
25 pub fn new(
26 file_manager: Arc<FileManager2>,
27 free_space_manager: FreeSpaceManager,
28 ) -> RowManager {
29 RowManager {
30 file_manager,
31 free_space_manager,
32 }
33 }
34
35 pub async fn insert_row(
36 &self,
37 current_tran_id: TransactionId,
38 table: &Arc<Table>,
39 user_data: SqlTuple,
40 ) -> Result<ItemPointer, RowManagerError> {
41 self.insert_row_internal(current_tran_id, table, user_data)
42 .await
43 }
44
45 pub async fn delete_row(
48 &self,
49 current_tran_id: TransactionId,
50 table: &Arc<Table>,
51 row_pointer: ItemPointer,
52 ) -> Result<(), RowManagerError> {
53 let page_id = PageId {
54 resource_key: table.id,
55 page_type: PageType::Data,
56 };
57
58 let (page, page_guard) = self
59 .file_manager
60 .get_page_for_update(&page_id, &row_pointer.page)
61 .await?;
62
63 let mut page = PageData::parse(table, row_pointer.page, &page)?;
64 let mut row = page
65 .get_row(row_pointer.count)
66 .ok_or(RowManagerError::NonExistentRow(
67 row_pointer.count,
68 row_pointer.page,
69 ))?
70 .clone();
71
72 if row.max.is_some() {
73 return Err(RowManagerError::AlreadyDeleted(
74 row_pointer.count,
75 row.max.unwrap(),
76 ));
77 }
78
79 row.max = Some(current_tran_id);
80
81 page.update(row, row_pointer.count)?;
82 let new_page = page.serialize_and_pad();
83
84 self.file_manager.update_page(page_guard, new_page).await?;
85
86 Ok(())
87 }
88
89 pub async fn update_row(
91 &mut self,
92 current_tran_id: TransactionId,
93 table: &Arc<Table>,
94 row_pointer: ItemPointer,
95 new_user_data: SqlTuple,
96 ) -> Result<ItemPointer, RowManagerError> {
97 let page_id = PageId {
99 resource_key: table.id,
100 page_type: PageType::Data,
101 };
102
103 let (old_page_buffer, old_guard) = self
104 .file_manager
105 .get_page_for_update(&page_id, &row_pointer.page)
106 .await?;
107
108 let mut old_page = PageData::parse(table, row_pointer.page, &old_page_buffer)?;
109
110 let mut old_row = old_page
111 .get_row(row_pointer.count)
112 .ok_or(RowManagerError::NonExistentRow(
113 row_pointer.count,
114 row_pointer.page,
115 ))?
116 .clone();
117
118 if old_row.max.is_some() {
119 return Err(RowManagerError::AlreadyDeleted(
120 row_pointer.count,
121 old_row.max.unwrap(),
122 ));
123 }
124
125 let new_row_len = RowData::encoded_size(&new_user_data);
126
127 let new_row_pointer;
129 if old_page.can_fit(new_row_len) {
130 new_row_pointer = old_page.insert(current_tran_id, table, new_user_data)?;
131 } else {
132 self.free_space_manager
133 .mark_page(page_id, row_pointer.page, FreeStat::Full)
134 .await?;
135 new_row_pointer = self
136 .insert_row_internal(current_tran_id, table, new_user_data)
137 .await?;
138 }
139
140 old_row.max = Some(current_tran_id);
141 old_row.item_pointer = new_row_pointer;
142 old_page.update(old_row, row_pointer.count)?;
143 let old_page_buffer = old_page.serialize_and_pad();
144
145 self.file_manager
146 .update_page(old_guard, old_page_buffer)
147 .await?;
148
149 Ok(new_row_pointer)
150 }
151
152 pub async fn get(
153 &self,
154 table: &Arc<Table>,
155 row_pointer: ItemPointer,
156 ) -> Result<RowData, RowManagerError> {
157 let page_id = PageId {
158 resource_key: table.id,
159 page_type: PageType::Data,
160 };
161
162 let (page_buffer, _page_guard) = self
163 .file_manager
164 .get_page(&page_id, &row_pointer.page)
165 .await?;
166
167 let page = PageData::parse(table, row_pointer.page, &page_buffer)?;
168
169 let row = page
170 .get_row(row_pointer.count)
171 .ok_or(RowManagerError::NonExistentRow(
172 row_pointer.count,
173 row_pointer.page,
174 ))?
175 .clone();
176
177 Ok(row)
178 }
179
180 pub fn get_stream(
182 &self,
183 table: &Arc<Table>,
184 ) -> impl Stream<Item = Result<RowData, RowManagerError>> {
185 let page_id = PageId {
186 resource_key: table.id,
187 page_type: PageType::Data,
188 };
189
190 let file_manager = self.file_manager.clone();
191 let table = table.clone();
192
193 try_stream! {
194 let mut page_num = PageOffset(0);
195
196 loop {
197 match file_manager.get_page(&page_id, &page_num).await {
198 Ok((buffer, _guard)) => {
199 let page = PageData::parse(&table, page_num, &buffer)?;
200 for await row in page.get_stream() {
201 yield row;
202 }
203 },
204 Err(_) => {
205 return ();
206 }
207 }
208
209 page_num += PageOffset(1);
210 }
211 }
212 }
213
214 async fn insert_row_internal(
216 &self,
217 current_tran_id: TransactionId,
218 table: &Arc<Table>,
219 user_data: SqlTuple,
220 ) -> Result<ItemPointer, RowManagerError> {
221 let page_id = PageId {
222 resource_key: table.id,
223 page_type: PageType::Data,
224 };
225 let user_data_size = RowData::encoded_size(&user_data);
226
227 loop {
228 let next_free_page = self.free_space_manager.get_next_free_page(page_id).await?;
229 match self
230 .file_manager
231 .get_page_for_update(&page_id, &next_free_page)
232 .await
233 {
234 Ok((buffer, page_guard)) => {
235 let mut page = PageData::parse(table, next_free_page, &buffer)?;
236 if page.can_fit(user_data_size) {
237 let new_row_pointer = page.insert(current_tran_id, table, user_data)?;
238 let buffer = page.serialize_and_pad();
239 self.file_manager.update_page(page_guard, buffer).await?;
240 return Ok(new_row_pointer);
241 } else {
242 self.free_space_manager
243 .mark_page(page_id, next_free_page, FreeStat::Full)
244 .await?;
245 continue;
246 }
247 }
248 Err(_) => {
249 let (new_page_offset, new_page_guard) =
251 self.file_manager.get_next_offset(&page_id).await?;
252
253 let mut new_page = PageData::new(new_page_offset);
254 let new_row_pointer = new_page.insert(current_tran_id, table, user_data)?; let new_page_buffer = new_page.serialize_and_pad();
257
258 self.file_manager
259 .add_page(new_page_guard, new_page_buffer)
260 .await?;
261 return Ok(new_row_pointer);
262 }
263 }
264 }
265 }
266}
267
268#[derive(Error, Debug)]
269pub enum RowManagerError {
270 #[error(transparent)]
271 PageDataError(#[from] PageDataError),
272 #[error(transparent)]
273 FileManager2Error(#[from] FileManager2Error),
274 #[error(transparent)]
275 FreeSpaceManagerError(#[from] FreeSpaceManagerError),
276 #[error(transparent)]
277 RowDataError(#[from] RowDataError),
278 #[error("Page {0} does not exist")]
279 NonExistentPage(PageOffset),
280 #[error("Row {0} in Page {1} does not exist")]
281 NonExistentRow(UInt12, PageOffset),
282 #[error("Row {0} already deleted in {1}")]
283 AlreadyDeleted(UInt12, TransactionId),
284 #[error("Row {0} is not visible")]
285 NotVisibleRow(RowData),
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291 use crate::engine::get_row;
292 use crate::engine::get_table;
293 use futures::pin_mut;
294 use tempfile::TempDir;
295 use tokio_stream::StreamExt;
296
297 #[tokio::test]
298 async fn test_row_manager_mass_insert() -> Result<(), Box<dyn std::error::Error>> {
299 let tmp = TempDir::new()?;
300 let tmp_dir = tmp.path().as_os_str().to_os_string();
301
302 let table = get_table();
303 let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
304 let fsm = FreeSpaceManager::new(fm.clone());
305 let rm = RowManager::new(fm, fsm);
306
307 let tran_id = TransactionId::new(1);
308
309 for i in 0..50 {
310 rm.clone()
311 .insert_row(tran_id, &table, get_row(i.to_string()))
312 .await?;
313 }
314
315 drop(rm);
316
317 let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
319 let fsm = FreeSpaceManager::new(fm.clone());
320 let rm = RowManager::new(fm, fsm);
321
322 pin_mut!(rm);
323 let result_rows: Vec<RowData> = rm
324 .clone()
325 .get_stream(&table)
326 .map(Result::unwrap)
327 .collect()
328 .await;
329
330 assert_eq!(result_rows.len(), 50);
331 result_rows
332 .iter()
333 .enumerate()
334 .take(50)
335 .map(|(i, row)| {
336 let sample_row = get_row(i.to_string());
337 assert_eq!(row.user_data, sample_row);
338 })
339 .for_each(drop);
340
341 Ok(())
342 }
343
344 #[tokio::test]
345 async fn test_row_manager_crud() -> Result<(), Box<dyn std::error::Error>> {
346 let tmp = TempDir::new()?;
347 let tmp_dir = tmp.path().as_os_str().to_os_string();
348
349 let table = get_table();
350 let fm = Arc::new(FileManager2::new(tmp_dir.clone())?);
351 let fsm = FreeSpaceManager::new(fm.clone());
352 let mut rm = RowManager::new(fm, fsm);
353
354 let tran_id = TransactionId::new(1);
355
356 let insert_pointer = rm
357 .insert_row(tran_id, &table, get_row("test".to_string()))
358 .await?;
359
360 let tran_id_2 = TransactionId::new(3);
361
362 let update_pointer = rm
363 .update_row(
364 tran_id_2,
365 &table,
366 insert_pointer,
367 get_row("test2".to_string()),
368 )
369 .await?;
370
371 pin_mut!(rm);
373 let result_rows: Vec<RowData> = rm.get_stream(&table).map(Result::unwrap).collect().await;
374 assert_eq!(result_rows.len(), 2);
375
376 let tran_id_3 = TransactionId::new(3);
377
378 rm.delete_row(tran_id_3, &table, update_pointer).await?;
379
380 Ok(())
381 }
382}