shoal_core/server/tables/storage/
fs.rs

1//! The file system storage module for shoal
2
3use byte_unit::Byte;
4use futures::stream::FuturesUnordered;
5use futures::{AsyncWriteExt, StreamExt};
6use glommio::io::{DmaStreamWriter, DmaStreamWriterBuilder, OpenOptions};
7use glommio::{Task, TaskQueueHandle};
8use kanal::{AsyncReceiver, AsyncSender};
9use rkyv::de::Pool;
10use rkyv::rancor::Strategy;
11use rkyv::Archive;
12use std::collections::{HashMap, VecDeque};
13use std::net::SocketAddr;
14use std::path::PathBuf;
15use std::sync::Arc;
16use tracing::instrument;
17
18mod compactor;
19mod map;
20mod reader;
21
22use compactor::FileSystemCompactor;
23use map::ArchiveMap;
24use reader::IntentLogReader;
25
26use super::{CompactionJob, Intents, ShoalStorage};
27use crate::server::messages::QueryMetadata;
28use crate::shared::responses::{Response, ResponseAction};
29use crate::{
30    server::{Conf, ServerError},
31    shared::{queries::Update, traits::ShoalTable},
32    storage::ArchivedIntents,
33    tables::partitions::Partition,
34};
35
/// The max size for our data intent log before it is rotated and queued for compaction
const INTENT_LIMIT: Byte = Byte::MEBIBYTE.multiply(50).unwrap();
38
/// Store shoal data in an existing filesystem for persistence
///
/// Writes are first appended to a per-shard intent log (a write-ahead log);
/// once the log grows past [`INTENT_LIMIT`] it is rotated and handed off to a
/// background [`FileSystemCompactor`] task for compaction.
pub struct FileSystem<T: ShoalTable> {
    /// The name of the shard we are storing data for
    shard_name: String,
    /// The path to our current ("-active") intent log
    intent_path: PathBuf,
    /// The DMA stream writer for the intent log to write to
    intent_log: DmaStreamWriter,
    /// The current intent log generation (incremented on every rotation)
    generation: u64,
    /// The still pending writes, keyed by the stream position at which
    /// each write becomes durable once the log is flushed to that point
    pending: VecDeque<(u64, QueryMetadata, ResponseAction<T>)>,
    /// The medium priority task queue the compactor task is spawned into
    medium_priority: TaskQueueHandle,
    /// The config for shoal
    pub conf: Conf,
    /// The channel to send intent log compactions on
    pub intent_tx: AsyncSender<CompactionJob>,
    /// The different tasks spawned by this shards file system storage engine;
    /// awaited during shutdown so task failures are surfaced
    pub tasks: FuturesUnordered<Task<Result<(), ServerError>>>,
    /// The shard local shared map of archive/partition data
    map: Arc<ArchiveMap>,
}
62
impl<T: ShoalTable + 'static> FileSystem<T>
where
    <T as Archive>::Archived: rkyv::Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
    <T::Sort as Archive>::Archived: rkyv::Deserialize<T::Sort, Strategy<Pool, rkyv::rancor::Error>>,
    <T::Update as Archive>::Archived:
        rkyv::Deserialize<T::Update, Strategy<Pool, rkyv::rancor::Error>>,
    <<T as ShoalTable>::Sort as Archive>::Archived: Ord,
{
    /// Write an intent to storage
    ///
    /// Entries are framed as `[length prefix][archived bytes]` so the reader can
    /// recover entry boundaries when replaying the log.
    ///
    /// # Arguments
    ///
    /// * `intent` - The intent to write to disk
    #[instrument(name = "FileSystem::write_intent", skip_all, fields(shard = &self.shard_name), err(Debug))]
    async fn write_intent(&mut self, intent: &Intents<T>) -> Result<usize, ServerError> {
        // archive this intent log entry
        let archived = rkyv::to_bytes::<_>(intent)?;
        // get the size of the data to write
        let size = archived.len();
        // write our size
        // NOTE(review): `usize::to_le_bytes` is 8 bytes on 64-bit targets but 4 on
        // 32-bit ones, so the on-disk framing is architecture dependent — confirm
        // `IntentLogReader` reads the same width before supporting mixed targets
        self.intent_log.write_all(&size.to_le_bytes()).await?;
        // write our data
        self.intent_log.write_all(archived.as_slice()).await?;
        Ok(size)
    }

    /// Spawn a compactor on this shard
    ///
    /// The compactor runs as a glommio task on the medium priority queue and is
    /// tracked in `self.tasks` so `shutdown` can await its completion.
    ///
    /// # Arguments
    ///
    /// * `compact_rx` - The channel the compactor receives compaction jobs on
    async fn spawn_intent_compactor(
        &mut self,
        compact_rx: AsyncReceiver<CompactionJob>,
    ) -> Result<(), ServerError> {
        // build a compactor
        let compactor = FileSystemCompactor::<T>::with_capacity(
            &self.shard_name,
            &self.conf,
            compact_rx,
            &self.map,
            1000,
        )
        .await?;
        // spawn this compactor
        let compactor_handle = glommio::spawn_local_into(
            async move { compactor.start().await },
            self.medium_priority,
        )?;
        // add this compactor to our task list
        self.tasks.push(compactor_handle);
        Ok(())
    }

    /// Get a new stream writer for this shard
    ///
    /// # Arguments
    ///
    /// * `name` - The name of the shard to build a writer for
    /// * `conf` - The Shoal config
    async fn new_writer(name: &str, conf: &Conf) -> Result<DmaStreamWriter, ServerError> {
        // build the path to this shards intent log
        let intent_path = conf.storage.fs.intent.join(format!("{name}-active"));
        // open this file
        // don't open with append or new writes will overwrite old ones
        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .dma_open(&intent_path)
            .await?;
        // wrap our file in a stream writer
        let writer = DmaStreamWriterBuilder::new(file)
            //.with_buffer_size(500)
            .build();
        Ok(writer)
    }

    /// Check if we need to compact the current intent log
    ///
    /// If the active log has grown past [`INTENT_LIMIT`] it is flushed, closed,
    /// renamed to an inactive generation file and queued for compaction, and a
    /// fresh writer takes its place. In both branches the current flushed
    /// position of the log is returned so callers can release pending writes.
    async fn is_compactable(&mut self) -> Result<u64, ServerError> {
        // check if this intent log is over 50MiB
        if self.intent_log.current_pos() > INTENT_LIMIT {
            // flush this intent log
            self.flush().await?;
            // get the current flushed position
            // snapshot this before closing so the returned position matches the
            // log that pending writes were queued against
            let flushed_pos = self.intent_log.current_flushed_pos();
            // close our intent log
            self.intent_log.close().await?;
            // build the file name to rename our current intent log too
            let name = format!("{}-inactive-{}", self.shard_name, self.generation);
            // build the path to this shards new intent log
            let new_path = self.conf.storage.fs.intent.join(name);
            // rename our old intent log
            // this must happen before sending the compaction job so the
            // compactor sees the file at its final path
            glommio::io::rename(&self.intent_path, &new_path).await?;
            // create an intent log compaction job
            self.intent_tx
                .send(CompactionJob::IntentLog(new_path))
                .await?;
            // get a new writer
            let new_writer = Self::new_writer(&self.shard_name, &self.conf).await?;
            // set our new writer
            self.intent_log = new_writer;
            // increment our writer generation
            self.generation += 1;
            // also ask the compactor to compact our on-disk archives
            self.intent_tx.send(CompactionJob::Archives).await?;
            Ok(flushed_pos)
        } else {
            // get the current position of flushed data
            let flushed_pos = self.intent_log.current_flushed_pos();
            Ok(flushed_pos)
        }
    }
}
168
169impl<T: ShoalTable + 'static> ShoalStorage<T> for FileSystem<T>
170where
171    <T as Archive>::Archived: rkyv::Deserialize<T, Strategy<Pool, rkyv::rancor::Error>>,
172    <T::Sort as Archive>::Archived: rkyv::Deserialize<T::Sort, Strategy<Pool, rkyv::rancor::Error>>,
173    <T::Update as Archive>::Archived:
174        rkyv::Deserialize<T::Update, Strategy<Pool, rkyv::rancor::Error>>,
175    <<T as ShoalTable>::Sort as Archive>::Archived: Ord,
176{
177    /// Create a new instance of this storage engine
178    ///
179    /// # Arguments
180    ///
181    /// * `shard_name` - The id of the shard that owns this table
182    /// * `conf` - The Shoal config
183    #[instrument(name = "ShoalStorage::<FileSystem>::new", skip(conf), err(Debug))]
184    async fn new(
185        shard_name: &str,
186        conf: &Conf,
187        medium_priority: TaskQueueHandle,
188    ) -> Result<Self, ServerError> {
189        // build the path to this shards intent log
190        let intent_path = conf.storage.fs.intent.join(format!("{shard_name}-active"));
191        // open this file
192        // don't open with append or new writes will overwrite old ones
193        let file = OpenOptions::new()
194            .create(true)
195            .read(true)
196            .write(true)
197            .dma_open(&intent_path)
198            .await?;
199        // wrap our file in a stream writer
200        let intent_log = DmaStreamWriterBuilder::new(file)
201            //.with_buffer_size(500)
202            .build();
203        // build the channel to our compactor
204        let (intent_tx, intent_rx) = kanal::unbounded_async();
205        // get this shards shared archive map
206        let map = Arc::new(ArchiveMap::new(shard_name, conf).await?);
207        // build our file system storage module
208        let mut fs = FileSystem {
209            shard_name: shard_name.to_owned(),
210            intent_path,
211            intent_log,
212            generation: 0,
213            pending: VecDeque::with_capacity(1000),
214            medium_priority,
215            conf: conf.clone(),
216            intent_tx,
217            tasks: FuturesUnordered::default(),
218            map,
219        };
220        // spawn our intent compactor
221        fs.spawn_intent_compactor(intent_rx).await?;
222        Ok(fs)
223    }
224
225    /// Write this new row to our storage
226    ///
227    /// # Arguments
228    ///
229    /// * `insert` - The row to write
230    #[instrument(name = "ShoalStorage::<FileSystem>::insert", skip_all, err(Debug))]
231    async fn insert(&mut self, insert: &Intents<T>) -> Result<u64, ServerError> {
232        // write this insert intent log
233        self.write_intent(insert).await?;
234        // get the current position of the stream writer
235        let current = self.intent_log.current_pos();
236        Ok(current)
237    }
238
239    /// Delete a row from storage
240    ///
241    /// # Arguments
242    ///
243    /// * `partition_key` - The key to the partition we are deleting data from
244    /// * `sort_key` - The sort key to use to delete data from with in a partition
245    #[instrument(name = "ShoalStorage::<FileSystem>::delete", skip(self), err(Debug))]
246    async fn delete(&mut self, partition_key: u64, sort_key: T::Sort) -> Result<u64, ServerError> {
247        // wrap this delete command in a delete intent
248        let intent = Intents::<T>::Delete {
249            partition_key,
250            sort_key,
251        };
252        // write this delete intent log
253        self.write_intent(&intent).await?;
254        // get the current position of the stream writer
255        let current = self.intent_log.current_pos();
256        Ok(current)
257    }
258
259    /// Write a row update to storage
260    ///
261    /// # Arguments
262    ///
263    /// * `update` - The update that was applied to our row
264    #[instrument(name = "ShoalStorage::<FileSystem>::update", skip_all, err(Debug))]
265    async fn update(&mut self, update: Update<T>) -> Result<u64, ServerError> {
266        // build our intent log update entry
267        let intent = Intents::Update(update);
268        // write this delete intent log
269        self.write_intent(&intent).await?;
270        // get the current position of the stream writer
271        let current = self.intent_log.current_pos();
272        Ok(current)
273    }
274
275    /// Add a pending response action thats data is still being flushed
276    ///
277    /// # Arguments
278    ///
279    /// * `meta` - The metadata for this query
280    /// * `pos` - The position at which this entry will have been flushed to disk
281    /// * `response` - The pending response action
282    fn add_pending(&mut self, meta: QueryMetadata, pos: u64, response: ResponseAction<T>) {
283        // add this pending action to our pending queue
284        self.pending.push_back((pos, meta, response));
285    }
286
287    /// Get all flushed response actions
288    ///
289    /// # Arguments
290    ///
291    /// * `flushed` - The flushed actions to return
292    async fn get_flushed(
293        &mut self,
294        flushed: &mut Vec<(SocketAddr, Response<T>)>,
295    ) -> Result<(), ServerError> {
296        // check if our current intent log large enough to be compacted
297        // this will also get the current flushed position
298        let flushed_pos = self.is_compactable().await?;
299        // keep popping response actions until we find one that isn't yet flushed
300        // or we have no more response actions to check
301        while !self.pending.is_empty() {
302            // check if the first item has been flushed
303            let is_flushed = match self.pending.front() {
304                Some((pending_pos, _, _)) => flushed_pos >= *pending_pos,
305                None => break,
306            };
307            // if this action has been flushed to disk then pop it
308            if is_flushed {
309                // pop this flushed action
310                if let Some((_, meta, data)) = self.pending.pop_front() {
311                    // build the response for this query
312                    let response = Response {
313                        id: meta.id,
314                        index: meta.index,
315                        data,
316                        end: meta.end,
317                    };
318                    // add this action to our flushed vec
319                    flushed.push((meta.addr, response));
320                }
321            } else {
322                // we don't have any flushed data yet
323                break;
324            }
325        }
326        Ok(())
327    }
328
329    /// Flush all currently pending writes to storage
330    #[instrument(name = "ShoalStorage::<FileSystem>::flush", skip_all, err(Debug))]
331    async fn flush(&mut self) -> Result<(), ServerError> {
332        self.intent_log.sync().await?;
333        Ok(())
334    }
335
336    /// Read an intent log from storage
337    ///
338    /// # Arguments
339    ///
340    /// * `path` - The path to the intent log to read in
341    #[instrument(
342        name = "ShoalStorage::<FileSystem>::read_intents",
343        skip(partitions),
344        err(Debug)
345    )]
346    async fn read_intents(
347        path: &PathBuf,
348        partitions: &mut HashMap<u64, Partition<T>>,
349    ) -> Result<(), ServerError> {
350        // create an intent log reader
351        let mut reader = IntentLogReader::new(path).await?;
352        // iterate over the entries in this intent log
353        while let Some(read) = reader.next_buff().await? {
354            // try to deserialize this row from our intent log
355            let intent = unsafe { rkyv::access_unchecked::<ArchivedIntents<T>>(&read[..]) };
356            // add this intent to our btreemap
357            match intent {
358                ArchivedIntents::Insert(archived) => {
359                    // deserialize this row
360                    let row = T::deserialize(archived)?;
361                    // get the partition key for this row
362                    let key = row.get_partition_key();
363                    // get this rows partition
364                    let entry = partitions.entry(key).or_insert_with(|| Partition::new(key));
365                    // insert this row
366                    entry.insert(row);
367                }
368                ArchivedIntents::Delete {
369                    partition_key,
370                    sort_key,
371                } => {
372                    // convert our partition key to its native endianess
373                    let partition_key = partition_key.to_native();
374                    // get the partition to delete a row from
375                    if let Some(partition) = partitions.get_mut(&partition_key) {
376                        // deserialize this rows sort key
377                        let sort_key = rkyv::deserialize::<T::Sort, rkyv::rancor::Error>(sort_key)?;
378                        // remove the sort key from this partition
379                        partition.remove(&sort_key);
380                    }
381                }
382                ArchivedIntents::Update(archived) => {
383                    // deserialize this row's update
384                    let update = rkyv::deserialize::<Update<T>, rkyv::rancor::Error>(archived)?;
385                    // try to get the partition containing our target row
386                    if let Some(partition) = partitions.get_mut(&update.partition_key) {
387                        // find our target row
388                        if !partition.update(&update) {
389                            panic!("Missing row update?");
390                        }
391                    }
392                }
393            }
394        }
395        Ok(())
396    }
397
398    /// Shutdown this storage engine
399    #[allow(async_fn_in_trait)]
400    async fn shutdown(&mut self) -> Result<(), ServerError> {
401        // flush any remaining intent log writes to disk
402        self.flush().await?;
403        // signal our intent log compactor to shutdown
404        self.intent_tx.send(CompactionJob::Shutdown).await?;
405        // wait for all of our tasks to complete
406        while let Some(task) = self.tasks.next().await {
407            // check if this task has failed
408            task?;
409        }
410        Ok(())
411    }
412}