shoal_core/server/tables/
persistent.rs1use glommio::TaskQueueHandle;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use std::path::PathBuf;
9use std::str::FromStr;
10use tracing::instrument;
11
12use super::partitions::Partition;
13use super::storage::Intents;
14use super::storage::ShoalStorage;
15use crate::server::messages::QueryMetadata;
16use crate::server::Conf;
17use crate::server::ServerError;
18use crate::shared::queries::Update;
19use crate::shared::queries::{Get, Query};
20use crate::shared::responses::{Response, ResponseAction};
21use crate::shared::traits::ShoalTable;
22
23#[derive(Debug)]
25pub struct PersistentTable<T: ShoalTable, S: ShoalStorage<T>> {
26 pub partitions: HashMap<u64, Partition<T>>,
28 storage: S,
30 memory_usage: usize,
32 flushed: Vec<(SocketAddr, Response<T>)>,
34}
35
36impl<T: ShoalTable, S: ShoalStorage<T>> PersistentTable<T, S> {
37 #[instrument(name = "PersistentTable::new", skip(conf), err(Debug))]
44 pub async fn new(
45 shard_name: &str,
46 conf: &Conf,
47 medium_priority: TaskQueueHandle,
48 ) -> Result<Self, ServerError> {
49 let mut table = Self {
51 partitions: HashMap::default(),
52 storage: S::new(shard_name, conf, medium_priority).await?,
53 memory_usage: 0,
54 flushed: Vec::with_capacity(1000),
55 };
56 let path = PathBuf::from_str(&format!("/opt/shoal/intents/{shard_name}-active"))?;
58 S::read_intents(&path, &mut table.partitions).await?;
60 Ok(table)
61 }
62
63 #[instrument(name = "PersistentTable::handle", skip(self, query))]
70 pub async fn handle(
71 &mut self,
72 meta: QueryMetadata,
73 query: Query<T>,
74 ) -> Option<(SocketAddr, Response<T>)> {
75 match query {
77 Query::Insert { row, .. } => self.insert(meta, row).await,
79 Query::Get(get) => self.get(meta, &get).await,
81 Query::Delete { key, sort_key } => self.delete(meta, key, sort_key).await,
83 Query::Update(update) => self.update(meta, update).await,
85 }
86 }
87
88 #[instrument(name = "PersistentTable::insert", skip_all)]
95 async fn insert(&mut self, meta: QueryMetadata, row: T) -> Option<(SocketAddr, Response<T>)> {
96 let key = row.get_partition_key();
98 let partition = self
100 .partitions
101 .entry(key)
102 .or_insert_with(|| Partition::new(key));
103 let intent = Intents::Insert(row);
105 let pos = self.storage.insert(&intent).await.unwrap();
107 let row = match intent {
109 Intents::Insert(row) => row,
110 _ => panic!("TODO NOT HAVE THIS POINTLESS MATCH!"),
111 };
112 let (size_diff, action) = partition.insert(row);
114 self.storage.add_pending(meta, pos, action);
116 self.memory_usage = self.memory_usage.saturating_add_signed(size_diff);
118 None
120 }
121
122 #[instrument(name = "PersistentTable::get", skip_all)]
129 async fn get(
130 &mut self,
131 meta: QueryMetadata,
132 get: &Get<T>,
133 ) -> Option<(SocketAddr, Response<T>)> {
134 let mut data = Vec::new();
136 for key in &get.partition_keys {
138 if let Some(partition) = self.partitions.get(key) {
140 partition.get(get, &mut data);
142 }
143 }
144 let action = if data.is_empty() {
146 ResponseAction::Get(None)
148 } else {
149 ResponseAction::Get(Some(data))
151 };
152 let response = Response {
154 id: meta.id,
155 index: meta.index,
156 data: action,
157 end: meta.end,
158 };
159 Some((meta.addr, response))
160 }
161
162 #[instrument(name = "PersistentTable::delete", skip_all)]
170 async fn delete(
171 &mut self,
172 meta: QueryMetadata,
173 key: u64,
174 sort: T::Sort,
175 ) -> Option<(SocketAddr, Response<T>)> {
176 if let Some(partition) = self.partitions.get_mut(&key) {
178 if let Some((size_diff, _)) = partition.remove(&sort) {
180 let pos = self.storage.delete(key, sort).await.unwrap();
182 let action = ResponseAction::Delete(true);
184 self.storage.add_pending(meta, pos, action);
186 self.memory_usage = self.memory_usage.saturating_sub(size_diff);
188 return None;
190 }
191 }
192 let action = ResponseAction::Delete(true);
194 let response = Response {
196 id: meta.id,
197 index: meta.index,
198 data: action,
199 end: meta.end,
200 };
201 Some((meta.addr, response))
202 }
203
204 #[instrument(name = "PersistentTable::update", skip_all)]
211 async fn update(
212 &mut self,
213 meta: QueryMetadata,
214 update: Update<T>,
215 ) -> Option<(SocketAddr, Response<T>)> {
216 if let Some(partition) = self.partitions.get_mut(&update.partition_key) {
218 if partition.update(&update) {
219 let pos = self.storage.update(update).await.unwrap();
221 let action = ResponseAction::Update(false);
223 self.storage.add_pending(meta, pos, action);
225 return None;
227 }
228 }
229 let action = ResponseAction::Update(false);
231 let response = Response {
233 id: meta.id,
234 index: meta.index,
235 data: action,
236 end: meta.end,
237 };
238 Some((meta.addr, response))
239 }
240
241 pub async fn flush(&mut self) -> Result<(), ServerError> {
243 self.storage.flush().await
244 }
245
246 pub async fn get_flushed(
252 &mut self,
253 ) -> Result<&mut Vec<(SocketAddr, Response<T>)>, ServerError> {
254 self.storage.get_flushed(&mut self.flushed).await?;
256 Ok(&mut self.flushed)
258 }
259
260 #[instrument(name = "PersistentTable::shutdown", skip_all)]
262 pub async fn shutdown(&mut self) -> Result<(), ServerError> {
263 self.storage.shutdown().await
265 }
266}