1use std::fs::{remove_file, rename, File};
4use std::path::PathBuf;
5
6use super::log::log_to_builder;
7use super::setsum::Setsum;
8use super::{Builder, Error, LogBuilder, LogOptions, SstBuilder, SstOptions, TABLE_FULL_SIZE};
9
10#[derive(Clone, Debug, Eq, PartialEq)]
14#[cfg_attr(feature = "command_line", derive(arrrg_derive::CommandLine))]
15pub struct IngestOptions {
16 #[cfg_attr(feature = "command_line", arrrg(required, "Path to write logs."))]
18 log_dir: String,
19 #[cfg_attr(feature = "command_line", arrrg(nested))]
21 log: LogOptions,
22 #[cfg_attr(feature = "command_line", arrrg(required, "Path to write ssts."))]
24 sst_dir: String,
25 #[cfg_attr(feature = "command_line", arrrg(nested))]
27 sst: SstOptions,
28}
29
30impl Default for IngestOptions {
31 fn default() -> Self {
32 Self {
33 log_dir: "logs".to_owned(),
34 log: LogOptions::default(),
35 sst_dir: "ssts".to_owned(),
36 sst: SstOptions::default(),
37 }
38 }
39}
40
41pub struct Jester {
51 options: IngestOptions,
52 counter: u64,
53 builder: Option<LogBuilder<File>>,
54 recent: Option<PathBuf>,
55}
56
57impl Jester {
58 pub fn new(options: IngestOptions) -> Self {
60 Self {
61 options,
62 counter: 0,
63 builder: None,
64 recent: None,
65 }
66 }
67
68 pub fn flush(&mut self) -> Result<(), Error> {
70 self.get_builder()?.flush()
71 }
72
73 fn get_builder(&mut self) -> Result<&mut LogBuilder<File>, Error> {
74 if let Some(builder) = &self.builder {
75 let size = builder.approximate_size();
76 if size >= TABLE_FULL_SIZE || size >= self.options.log.rollover_size {
77 self.rollover_builder()?;
78 return self.get_builder();
79 }
80 Ok(self.builder.as_mut().unwrap())
81 } else {
82 loop {
83 let path =
84 PathBuf::from(&self.options.log_dir).join(format!("{}.log", self.counter));
85 self.counter += 1;
86 if !path.exists() {
87 self.builder = Some(LogBuilder::new(self.options.log.clone(), &path)?);
88 self.recent = Some(path);
89 return Ok(self.builder.as_mut().unwrap());
90 }
91 }
92 }
93 }
94
95 fn rollover_builder(&mut self) -> Result<(), Error> {
96 if self.builder.is_some() {
97 let builder = self.builder.take().unwrap();
98 let setsum = builder.seal()?.0;
99 let recent = self.recent.take().unwrap();
100 self.convert_builder(recent, setsum)?;
101 }
102 Ok(())
103 }
104
105 fn convert_builder(&mut self, input: PathBuf, setsum: Setsum) -> Result<(), Error> {
106 let output =
107 PathBuf::from(&self.options.sst_dir).join(format!("{}.tmp", setsum.hexdigest()));
108 let builder = SstBuilder::new(self.options.sst.clone(), &output)?;
109 log_to_builder(self.options.log.clone(), &input, builder)?;
110 let final_file =
111 PathBuf::from(&self.options.sst_dir).join(format!("{}.sst", setsum.hexdigest()));
112 rename(output, final_file)?;
113 remove_file(input)?;
114 Ok(())
115 }
116}
117
118impl Builder for Jester {
119 type Sealed = ();
120
121 fn approximate_size(&self) -> usize {
123 match &self.builder {
124 Some(b) => b.approximate_size(),
125 None => 0,
126 }
127 }
128
129 fn put(&mut self, key: &[u8], timestamp: u64, value: &[u8]) -> Result<(), Error> {
130 match self.get_builder()?.put(key, timestamp, value) {
131 Ok(_) => Ok(()),
132 Err(Error::TableFull { .. }) => {
133 self.rollover_builder()?;
134 self.put(key, timestamp, value)
135 }
136 Err(err) => Err(err),
137 }
138 }
139
140 fn del(&mut self, key: &[u8], timestamp: u64) -> Result<(), Error> {
141 match self.get_builder()?.del(key, timestamp) {
142 Ok(_) => Ok(()),
143 Err(Error::TableFull { .. }) => {
144 self.rollover_builder()?;
145 self.del(key, timestamp)
146 }
147 Err(err) => Err(err),
148 }
149 }
150
151 fn seal(mut self) -> Result<(), Error> {
152 self.rollover_builder()?;
153 Ok(())
154 }
155}