shoal_core/server/tables/
persistent.rs

//! Persistent tables cache hot data in memory while also storing data on disk.
//!
//! This means that data is retained through restarts, at the cost of some speed.
4
5use glommio::TaskQueueHandle;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::path::PathBuf;
9use std::str::FromStr;
10use tracing::instrument;
11
12use super::partitions::Partition;
13use super::storage::Intents;
14use super::storage::ShoalStorage;
15use crate::server::messages::QueryMetadata;
16use crate::server::Conf;
17use crate::server::ServerError;
18use crate::shared::queries::Update;
19use crate::shared::queries::{Get, Query};
20use crate::shared::responses::{Response, ResponseAction};
21use crate::shared::traits::ShoalTable;
22
23/// A table that stores data both in memory and on disk
24#[derive(Debug)]
25pub struct PersistentTable<T: ShoalTable, S: ShoalStorage<T>> {
26    /// The rows in this table
27    pub partitions: HashMap<u64, Partition<T>>,
28    /// The storage engine backing this table
29    storage: S,
30    /// The total size of all data on this shard
31    memory_usage: usize,
32    /// The responses for queries that have been flushed to disk
33    flushed: Vec<(SocketAddr, Response<T>)>,
34}
35
36impl<T: ShoalTable, S: ShoalStorage<T>> PersistentTable<T, S> {
37    /// Create a persistent shoal table
38    ///
39    /// # Arguments
40    ///
41    /// * `shard_name` - The id of the shard that owns this table
42    /// * `conf` - The Shoal config
43    #[instrument(name = "PersistentTable::new", skip(conf), err(Debug))]
44    pub async fn new(
45        shard_name: &str,
46        conf: &Conf,
47        medium_priority: TaskQueueHandle,
48    ) -> Result<Self, ServerError> {
49        // build our table
50        let mut table = Self {
51            partitions: HashMap::default(),
52            storage: S::new(shard_name, conf, medium_priority).await?,
53            memory_usage: 0,
54            flushed: Vec::with_capacity(1000),
55        };
56        // build the path to this shards intent log
57        let path = PathBuf::from_str(&format!("/opt/shoal/intents/{shard_name}-active"))?;
58        // load our intent log
59        S::read_intents(&path, &mut table.partitions).await?;
60        Ok(table)
61    }
62
63    /// Cast and handle a serialized query
64    ///
65    /// # Arguments
66    ///
67    /// * `meta` - The metadata for this query
68    /// * `query` - The query to execute
69    #[instrument(name = "PersistentTable::handle", skip(self, query))]
70    pub async fn handle(
71        &mut self,
72        meta: QueryMetadata,
73        query: Query<T>,
74    ) -> Option<(SocketAddr, Response<T>)> {
75        // execute the correct query type
76        match query {
77            // insert a row into this partition
78            Query::Insert { row, .. } => self.insert(meta, row).await,
79            // get a row from this partition
80            Query::Get(get) => self.get(meta, &get).await,
81            // delete a row from this partition
82            Query::Delete { key, sort_key } => self.delete(meta, key, sort_key).await,
83            // update a row in this partition
84            Query::Update(update) => self.update(meta, update).await,
85        }
86    }
87
88    /// Insert some data into a partition in this shards table
89    ///
90    /// # Arguments
91    ///
92    /// * `meta` - The metadata about this insert query
93    /// * `row` - The row to insert
94    #[instrument(name = "PersistentTable::insert", skip_all)]
95    async fn insert(&mut self, meta: QueryMetadata, row: T) -> Option<(SocketAddr, Response<T>)> {
96        // get our partition key
97        let key = row.get_partition_key();
98        // get our partition
99        let partition = self
100            .partitions
101            .entry(key)
102            .or_insert_with(|| Partition::new(key));
103        // wrap our row in an insert intent
104        let intent = Intents::Insert(row);
105        // persist this new row to storage
106        let pos = self.storage.insert(&intent).await.unwrap();
107        // extract our row from our intent
108        let row = match intent {
109            Intents::Insert(row) => row,
110            _ => panic!("TODO NOT HAVE THIS POINTLESS MATCH!"),
111        };
112        // insert this row into this partition
113        let (size_diff, action) = partition.insert(row);
114        // add this action to our pending queue
115        self.storage.add_pending(meta, pos, action);
116        // adjust our total shards memory usage
117        self.memory_usage = self.memory_usage.saturating_add_signed(size_diff);
118        // An insert never returns anything immediately
119        None
120    }
121
122    /// Get some rows from some partitions
123    ///
124    /// # Arguments
125    ///
126    /// * `meta` - The metadata about this insert query
127    /// * `get` - The get parameters to use
128    #[instrument(name = "PersistentTable::get", skip_all)]
129    async fn get(
130        &mut self,
131        meta: QueryMetadata,
132        get: &Get<T>,
133    ) -> Option<(SocketAddr, Response<T>)> {
134        // build a vec for the data we found
135        let mut data = Vec::new();
136        // build the sort key
137        for key in &get.partition_keys {
138            // get the partition for this key
139            if let Some(partition) = self.partitions.get(key) {
140                // get rows from this partition
141                partition.get(get, &mut data);
142            }
143        }
144        // add this data to our response
145        let action = if data.is_empty() {
146            // this query did not find data
147            ResponseAction::Get(None)
148        } else {
149            // this query found data
150            ResponseAction::Get(Some(data))
151        };
152        // cast this action to a response
153        let response = Response {
154            id: meta.id,
155            index: meta.index,
156            data: action,
157            end: meta.end,
158        };
159        Some((meta.addr, response))
160    }
161
162    /// Delete a row from this table
163    ///
164    /// # Arguments
165    ///
166    /// * `meta` - The metadata about this delete query
167    /// * `key` - The key to the partition to dlete data from
168    /// * `sort` - The sort key to delete
169    #[instrument(name = "PersistentTable::delete", skip_all)]
170    async fn delete(
171        &mut self,
172        meta: QueryMetadata,
173        key: u64,
174        sort: T::Sort,
175    ) -> Option<(SocketAddr, Response<T>)> {
176        // get this rows partition
177        if let Some(partition) = self.partitions.get_mut(&key) {
178            // try remove the target row from this partition
179            if let Some((size_diff, _)) = partition.remove(&sort) {
180                // wite this delete to our intent log
181                let pos = self.storage.delete(key, sort).await.unwrap();
182                // build the pending action to store
183                let action = ResponseAction::Delete(true);
184                // add this action to our pending queue
185                self.storage.add_pending(meta, pos, action);
186                // adjust this shards total memory usage
187                self.memory_usage = self.memory_usage.saturating_sub(size_diff);
188                // wait for this delete to get flushed to disk
189                return None;
190            }
191        }
192        // we didn't find any data to delete
193        let action = ResponseAction::Delete(true);
194        // cast this action to a response
195        let response = Response {
196            id: meta.id,
197            index: meta.index,
198            data: action,
199            end: meta.end,
200        };
201        Some((meta.addr, response))
202    }
203
204    /// Update a row in this table
205    ///
206    /// # Arguments
207    ///
208    /// * `meta` - The metadata about this insert query
209    /// * `update` - The update to apply to a row in this table
210    #[instrument(name = "PersistentTable::update", skip_all)]
211    async fn update(
212        &mut self,
213        meta: QueryMetadata,
214        update: Update<T>,
215    ) -> Option<(SocketAddr, Response<T>)> {
216        // get this rows partition
217        if let Some(partition) = self.partitions.get_mut(&update.partition_key) {
218            if partition.update(&update) {
219                // write this update to storage
220                let pos = self.storage.update(update).await.unwrap();
221                // we didn't find any data to update
222                let action = ResponseAction::Update(false);
223                // add this action to our pending queue
224                self.storage.add_pending(meta, pos, action);
225                // wait for this delete to get flushed to disk
226                return None;
227            }
228        }
229        // we didn't find any data to update
230        let action = ResponseAction::Update(false);
231        // cast this action to a response
232        let response = Response {
233            id: meta.id,
234            index: meta.index,
235            data: action,
236            end: meta.end,
237        };
238        Some((meta.addr, response))
239    }
240
241    /// Flush all pending writes to disk
242    pub async fn flush(&mut self) -> Result<(), ServerError> {
243        self.storage.flush().await
244    }
245
246    /// Get all flushed response actions
247    ///
248    /// # Arguments
249    ///
250    /// * `flushed` - The flushed actions to return
251    pub async fn get_flushed(
252        &mut self,
253    ) -> Result<&mut Vec<(SocketAddr, Response<T>)>, ServerError> {
254        // check if we have any flushed actions to return
255        self.storage.get_flushed(&mut self.flushed).await?;
256        // return a ref to our flushed responses
257        Ok(&mut self.flushed)
258    }
259
260    /// Shutdown this table
261    #[instrument(name = "PersistentTable::shutdown", skip_all)]
262    pub async fn shutdown(&mut self) -> Result<(), ServerError> {
263        // shutdown our storage engine
264        self.storage.shutdown().await
265    }
266}