//! `shoal_core/server/tables/storage/fs.rs`: filesystem-backed `ShoalStorage` that appends
//! table intents to a DMA intent log and hands full logs to a background compactor.

use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;

use byte_unit::Byte;
use futures::stream::FuturesUnordered;
use futures::{AsyncWriteExt, StreamExt};
use glommio::io::{DmaStreamWriter, DmaStreamWriterBuilder, OpenOptions};
use glommio::{Task, TaskQueueHandle};
use kanal::{AsyncReceiver, AsyncSender};
use rkyv::de::Pool;
use rkyv::rancor::Strategy;
use rkyv::Archive;
use tracing::{instrument, warn};

mod compactor;
mod map;
mod reader;

use compactor::FileSystemCompactor;
use map::ArchiveMap;
use reader::IntentLogReader;

use super::{CompactionJob, Intents, ShoalStorage};
use crate::server::messages::QueryMetadata;
use crate::shared::responses::{Response, ResponseAction};
use crate::{
    server::{Conf, ServerError},
    shared::{queries::Update, traits::ShoalTable},
    storage::ArchivedIntents,
    tables::partitions::Partition,
};

36const INTENT_LIMIT: Byte = Byte::MEBIBYTE.multiply(50).unwrap();
38
39pub struct FileSystem<T: ShoalTable> {
41 shard_name: String,
43 intent_path: PathBuf,
45 intent_log: DmaStreamWriter,
47 generation: u64,
49 pending: VecDeque<(u64, QueryMetadata, ResponseAction<T>)>,
51 medium_priority: TaskQueueHandle,
53 pub conf: Conf,
55 pub intent_tx: AsyncSender<CompactionJob>,
57 pub tasks: FuturesUnordered<Task<Result<(), ServerError>>>,
59 map: Arc<ArchiveMap>,
61}
62
63impl<T: ShoalTable + 'static> FileSystem<T>
64where
65 <T as Archive>::Archived: rkyv::Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
66 <T::Sort as Archive>::Archived: rkyv::Deserialize<T::Sort, Strategy<Pool, rkyv::rancor::Error>>,
67 <T::Update as Archive>::Archived:
68 rkyv::Deserialize<T::Update, Strategy<Pool, rkyv::rancor::Error>>,
69 <<T as ShoalTable>::Sort as Archive>::Archived: Ord,
70{
71 #[instrument(name = "FileSystem::write_intent", skip_all, fields(shard = &self.shard_name), err(Debug))]
77 async fn write_intent(&mut self, intent: &Intents<T>) -> Result<usize, ServerError> {
78 let archived = rkyv::to_bytes::<_>(intent)?;
80 let size = archived.len();
82 self.intent_log.write_all(&size.to_le_bytes()).await?;
84 self.intent_log.write_all(archived.as_slice()).await?;
86 Ok(size)
87 }
88
89 async fn spawn_intent_compactor(
91 &mut self,
92 compact_rx: AsyncReceiver<CompactionJob>,
93 ) -> Result<(), ServerError> {
94 let compactor = FileSystemCompactor::<T>::with_capacity(
96 &self.shard_name,
97 &self.conf,
98 compact_rx,
99 &self.map,
100 1000,
101 )
102 .await?;
103 let compactor_handle = glommio::spawn_local_into(
105 async move { compactor.start().await },
106 self.medium_priority,
107 )?;
108 self.tasks.push(compactor_handle);
110 Ok(())
111 }
112
113 async fn new_writer(name: &str, conf: &Conf) -> Result<DmaStreamWriter, ServerError> {
115 let intent_path = conf.storage.fs.intent.join(format!("{name}-active"));
117 let file = OpenOptions::new()
120 .create(true)
121 .read(true)
122 .write(true)
123 .dma_open(&intent_path)
124 .await?;
125 let writer = DmaStreamWriterBuilder::new(file)
127 .build();
129 Ok(writer)
130 }
131
132 async fn is_compactable(&mut self) -> Result<u64, ServerError> {
134 if self.intent_log.current_pos() > INTENT_LIMIT {
136 self.flush().await?;
138 let flushed_pos = self.intent_log.current_flushed_pos();
140 self.intent_log.close().await?;
142 let name = format!("{}-inactive-{}", self.shard_name, self.generation);
144 let new_path = self.conf.storage.fs.intent.join(name);
146 glommio::io::rename(&self.intent_path, &new_path).await?;
148 self.intent_tx
150 .send(CompactionJob::IntentLog(new_path))
151 .await?;
152 let new_writer = Self::new_writer(&self.shard_name, &self.conf).await?;
154 self.intent_log = new_writer;
156 self.generation += 1;
158 self.intent_tx.send(CompactionJob::Archives).await?;
160 Ok(flushed_pos)
161 } else {
162 let flushed_pos = self.intent_log.current_flushed_pos();
164 Ok(flushed_pos)
165 }
166 }
167}
168
169impl<T: ShoalTable + 'static> ShoalStorage<T> for FileSystem<T>
170where
171 <T as Archive>::Archived: rkyv::Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
172 <T::Sort as Archive>::Archived: rkyv::Deserialize<T::Sort, Strategy<Pool, rkyv::rancor::Error>>,
173 <T::Update as Archive>::Archived:
174 rkyv::Deserialize<T::Update, Strategy<Pool, rkyv::rancor::Error>>,
175 <<T as ShoalTable>::Sort as Archive>::Archived: Ord,
176{
177 #[instrument(name = "ShoalStorage::<FileSystem>::new", skip(conf), err(Debug))]
184 async fn new(
185 shard_name: &str,
186 conf: &Conf,
187 medium_priority: TaskQueueHandle,
188 ) -> Result<Self, ServerError> {
189 let intent_path = conf.storage.fs.intent.join(format!("{shard_name}-active"));
191 let file = OpenOptions::new()
194 .create(true)
195 .read(true)
196 .write(true)
197 .dma_open(&intent_path)
198 .await?;
199 let intent_log = DmaStreamWriterBuilder::new(file)
201 .build();
203 let (intent_tx, intent_rx) = kanal::unbounded_async();
205 let map = Arc::new(ArchiveMap::new(shard_name, conf).await?);
207 let mut fs = FileSystem {
209 shard_name: shard_name.to_owned(),
210 intent_path,
211 intent_log,
212 generation: 0,
213 pending: VecDeque::with_capacity(1000),
214 medium_priority,
215 conf: conf.clone(),
216 intent_tx,
217 tasks: FuturesUnordered::default(),
218 map,
219 };
220 fs.spawn_intent_compactor(intent_rx).await?;
222 Ok(fs)
223 }
224
225 #[instrument(name = "ShoalStorage::<FileSystem>::insert", skip_all, err(Debug))]
231 async fn insert(&mut self, insert: &Intents<T>) -> Result<u64, ServerError> {
232 self.write_intent(insert).await?;
234 let current = self.intent_log.current_pos();
236 Ok(current)
237 }
238
239 #[instrument(name = "ShoalStorage::<FileSystem>::delete", skip(self), err(Debug))]
246 async fn delete(&mut self, partition_key: u64, sort_key: T::Sort) -> Result<u64, ServerError> {
247 let intent = Intents::<T>::Delete {
249 partition_key,
250 sort_key,
251 };
252 self.write_intent(&intent).await?;
254 let current = self.intent_log.current_pos();
256 Ok(current)
257 }
258
259 #[instrument(name = "ShoalStorage::<FileSystem>::update", skip_all, err(Debug))]
265 async fn update(&mut self, update: Update<T>) -> Result<u64, ServerError> {
266 let intent = Intents::Update(update);
268 self.write_intent(&intent).await?;
270 let current = self.intent_log.current_pos();
272 Ok(current)
273 }
274
275 fn add_pending(&mut self, meta: QueryMetadata, pos: u64, response: ResponseAction<T>) {
283 self.pending.push_back((pos, meta, response));
285 }
286
287 async fn get_flushed(
293 &mut self,
294 flushed: &mut Vec<(SocketAddr, Response<T>)>,
295 ) -> Result<(), ServerError> {
296 let flushed_pos = self.is_compactable().await?;
299 while !self.pending.is_empty() {
302 let is_flushed = match self.pending.front() {
304 Some((pending_pos, _, _)) => flushed_pos >= *pending_pos,
305 None => break,
306 };
307 if is_flushed {
309 if let Some((_, meta, data)) = self.pending.pop_front() {
311 let response = Response {
313 id: meta.id,
314 index: meta.index,
315 data,
316 end: meta.end,
317 };
318 flushed.push((meta.addr, response));
320 }
321 } else {
322 break;
324 }
325 }
326 Ok(())
327 }
328
329 #[instrument(name = "ShoalStorage::<FileSystem>::flush", skip_all, err(Debug))]
331 async fn flush(&mut self) -> Result<(), ServerError> {
332 self.intent_log.sync().await?;
333 Ok(())
334 }
335
336 #[instrument(
342 name = "ShoalStorage::<FileSystem>::read_intents",
343 skip(partitions),
344 err(Debug)
345 )]
346 async fn read_intents(
347 path: &PathBuf,
348 partitions: &mut HashMap<u64, Partition<T>>,
349 ) -> Result<(), ServerError> {
350 let mut reader = IntentLogReader::new(path).await?;
352 while let Some(read) = reader.next_buff().await? {
354 let intent = unsafe { rkyv::access_unchecked::<ArchivedIntents<T>>(&read[..]) };
356 match intent {
358 ArchivedIntents::Insert(archived) => {
359 let row = T::deserialize(archived)?;
361 let key = row.get_partition_key();
363 let entry = partitions.entry(key).or_insert_with(|| Partition::new(key));
365 entry.insert(row);
367 }
368 ArchivedIntents::Delete {
369 partition_key,
370 sort_key,
371 } => {
372 let partition_key = partition_key.to_native();
374 if let Some(partition) = partitions.get_mut(&partition_key) {
376 let sort_key = rkyv::deserialize::<T::Sort, rkyv::rancor::Error>(sort_key)?;
378 partition.remove(&sort_key);
380 }
381 }
382 ArchivedIntents::Update(archived) => {
383 let update = rkyv::deserialize::<Update<T>, rkyv::rancor::Error>(archived)?;
385 if let Some(partition) = partitions.get_mut(&update.partition_key) {
387 if !partition.update(&update) {
389 panic!("Missing row update?");
390 }
391 }
392 }
393 }
394 }
395 Ok(())
396 }
397
398 #[allow(async_fn_in_trait)]
400 async fn shutdown(&mut self) -> Result<(), ServerError> {
401 self.flush().await?;
403 self.intent_tx.send(CompactionJob::Shutdown).await?;
405 while let Some(task) = self.tasks.next().await {
407 task?;
409 }
410 Ok(())
411 }
412}