// sst/ingest.rs

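//! Tools for ingesting data into a directory.
//!
//! Writes are appended to log files, which are converted into ssts when a log segment rolls over.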
2
3use std::fs::{remove_file, rename, File};
4use std::path::PathBuf;
5
6use super::log::log_to_builder;
7use super::setsum::Setsum;
8use super::{Builder, Error, LogBuilder, LogOptions, SstBuilder, SstOptions, TABLE_FULL_SIZE};
9
/////////////////////////////////////////// IngestOptions //////////////////////////////////////////
11
12/// IngestOptions captures what we care about for ingesting data.
13#[derive(Clone, Debug, Eq, PartialEq)]
14#[cfg_attr(feature = "command_line", derive(arrrg_derive::CommandLine))]
15pub struct IngestOptions {
16    /// The directory in which to write log files.
17    #[cfg_attr(feature = "command_line", arrrg(required, "Path to write logs."))]
18    log_dir: String,
19    /// The LogOptions to use for ingesting data.
20    #[cfg_attr(feature = "command_line", arrrg(nested))]
21    log: LogOptions,
22    /// The directory in which to put ssts once generated.
23    #[cfg_attr(feature = "command_line", arrrg(required, "Path to write ssts."))]
24    sst_dir: String,
25    /// The options to use for creating ssts.
26    #[cfg_attr(feature = "command_line", arrrg(nested))]
27    sst: SstOptions,
28}
29
30impl Default for IngestOptions {
31    fn default() -> Self {
32        Self {
33            log_dir: "logs".to_owned(),
34            log: LogOptions::default(),
35            sst_dir: "ssts".to_owned(),
36            sst: SstOptions::default(),
37        }
38    }
39}
40
////////////////////////////////////////////// Jester //////////////////////////////////////////////
42
43/// Jester provides a builder interface and writes logs that get converted into ssts.
44///
45/// It's not intended to be a general-purpose key-value store.  Rather, it is intended for things
46/// like logging of stats.
47///
48/// NOTE:  Jester isn't well tested and doesn't recover logs on errors.  It's a TODO to do so.
49// TODO(rescrv): Make this recover logs on crash/restart.
50pub struct Jester {
51    options: IngestOptions,
52    counter: u64,
53    builder: Option<LogBuilder<File>>,
54    recent: Option<PathBuf>,
55}
56
57impl Jester {
58    /// Create a new Jester from IngestOptions.
59    pub fn new(options: IngestOptions) -> Self {
60        Self {
61            options,
62            counter: 0,
63            builder: None,
64            recent: None,
65        }
66    }
67
68    /// Flush the Jester.
69    pub fn flush(&mut self) -> Result<(), Error> {
70        self.get_builder()?.flush()
71    }
72
73    fn get_builder(&mut self) -> Result<&mut LogBuilder<File>, Error> {
74        if let Some(builder) = &self.builder {
75            let size = builder.approximate_size();
76            if size >= TABLE_FULL_SIZE || size >= self.options.log.rollover_size {
77                self.rollover_builder()?;
78                return self.get_builder();
79            }
80            Ok(self.builder.as_mut().unwrap())
81        } else {
82            loop {
83                let path =
84                    PathBuf::from(&self.options.log_dir).join(format!("{}.log", self.counter));
85                self.counter += 1;
86                if !path.exists() {
87                    self.builder = Some(LogBuilder::new(self.options.log.clone(), &path)?);
88                    self.recent = Some(path);
89                    return Ok(self.builder.as_mut().unwrap());
90                }
91            }
92        }
93    }
94
95    fn rollover_builder(&mut self) -> Result<(), Error> {
96        if self.builder.is_some() {
97            let builder = self.builder.take().unwrap();
98            let setsum = builder.seal()?.0;
99            let recent = self.recent.take().unwrap();
100            self.convert_builder(recent, setsum)?;
101        }
102        Ok(())
103    }
104
105    fn convert_builder(&mut self, input: PathBuf, setsum: Setsum) -> Result<(), Error> {
106        let output =
107            PathBuf::from(&self.options.sst_dir).join(format!("{}.tmp", setsum.hexdigest()));
108        let builder = SstBuilder::new(self.options.sst.clone(), &output)?;
109        log_to_builder(self.options.log.clone(), &input, builder)?;
110        let final_file =
111            PathBuf::from(&self.options.sst_dir).join(format!("{}.sst", setsum.hexdigest()));
112        rename(output, final_file)?;
113        remove_file(input)?;
114        Ok(())
115    }
116}
117
118impl Builder for Jester {
119    type Sealed = ();
120
121    /// The approximate size of the current log segment.
122    fn approximate_size(&self) -> usize {
123        match &self.builder {
124            Some(b) => b.approximate_size(),
125            None => 0,
126        }
127    }
128
129    fn put(&mut self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
130        match self.get_builder()?.put(key, timestamp, value) {
131            Ok(_) => Ok(()),
132            Err(Error::TableFull { .. }) => {
133                self.rollover_builder()?;
134                self.put(key, timestamp, value)
135            }
136            Err(err) => Err(err),
137        }
138    }
139
140    fn del(&mut self, key: &[u8], timestamp: u64) -> Result<(), Error> {
141        match self.get_builder()?.del(key, timestamp) {
142            Ok(_) => Ok(()),
143            Err(Error::TableFull { .. }) => {
144                self.rollover_builder()?;
145                self.del(key, timestamp)
146            }
147            Err(err) => Err(err),
148        }
149    }
150
151    fn seal(mut self) -> Result<(), Error> {
152        self.rollover_builder()?;
153        Ok(())
154    }
155}