euphony_store/storage/fs.rs

1use crate::{
2    codec,
3    ext::*,
4    storage::{self, Output as _, Storage},
5};
6use base64::URL_SAFE_NO_PAD;
7use blake3::Hasher;
8use euphony_compiler::{Entry, Hash, Writer};
9use euphony_node::{BoxProcessor, Sink};
10use euphony_units::coordinates::Polar;
11use std::{
12    fs,
13    io::{self, Write},
14    path::{Path, PathBuf},
15    sync::Arc,
16};
17use tempfile::{NamedTempFile, TempPath};
18
/// Buffered handle used for every file this store writes.
pub type File = std::io::BufWriter<std::fs::File>;
20
21#[derive(Clone, Debug)]
22pub struct Directory {
23    state: State,
24}
25
26#[derive(Clone, Debug)]
27struct State {
28    path: Arc<PathBuf>,
29    hasher: Hasher,
30}
31
32impl State {
33    fn hash_path(&self, hash: &Hash) -> PathBuf {
34        let mut out = [b'A'; 64];
35        let len = base64::encode_config_slice(hash, URL_SAFE_NO_PAD, &mut out);
36        let out = unsafe { core::str::from_utf8_unchecked_mut(&mut out) };
37        let out = &out[..len];
38        self.path.join(out)
39    }
40}
41
42pub struct Output(OState);
43
44#[allow(clippy::large_enum_variant)]
45enum OState {
46    PreHashed {
47        file: File,
48        hash: Hash,
49    },
50    Incremental {
51        file: File,
52        path: Option<TempPath>,
53        state: State,
54    },
55}
56
57impl storage::Output for Output {
58    #[inline]
59    fn write(&mut self, bytes: &[u8]) {
60        let result = match &mut self.0 {
61            OState::PreHashed { file, .. } => file.write_all(bytes),
62            OState::Incremental { file, state, .. } => {
63                state.hasher.update(bytes);
64                file.write_all(bytes)
65            }
66        };
67
68        if let Err(err) = result {
69            log::error!("could write samples to output: {}", err);
70        }
71    }
72
73    #[inline]
74    fn finish(&mut self) -> Hash {
75        let result = match &mut self.0 {
76            OState::PreHashed { file, hash } => file.flush().map(|_| *hash),
77            OState::Incremental { file, state, path } => {
78                let tmp_path = path.take().expect("cannot finalize twice");
79                let hash = *state.hasher.finalize().as_bytes();
80                file.flush().and_then(|_| {
81                    let new_path = state.hash_path(&hash);
82
83                    tmp_path
84                        .persist(new_path)
85                        .map(|_| hash)
86                        .map_err(|e| e.error)
87                })
88            }
89        };
90
91        result.expect("could not finish file")
92    }
93}
94
95impl Default for Directory {
96    fn default() -> Self {
97        Self::new(PathBuf::from("target/euphony/contents"))
98    }
99}
100
101impl Directory {
102    pub fn new(path: PathBuf) -> Self {
103        Self {
104            state: State {
105                path: Arc::new(path),
106                hasher: Hasher::new(),
107            },
108        }
109    }
110
111    pub fn path(&self) -> &Path {
112        &self.state.path
113    }
114
115    fn hash_path(&self, hash: &Hash) -> PathBuf {
116        let mut out = [b'A'; 64];
117        let len = base64::encode_config_slice(hash, URL_SAFE_NO_PAD, &mut out);
118        let out = unsafe { core::str::from_utf8_unchecked_mut(&mut out) };
119        let out = &out[..len];
120        self.state.path.join(out)
121    }
122
123    fn open(&self, hash: &Hash) -> io::Result<Option<File>> {
124        let path = self.hash_path(hash);
125        fs::OpenOptions::new()
126            .create_new(true)
127            .write(true)
128            .open(path)
129            .map(io::BufWriter::new)
130            .map(Some)
131            .or_else(|err| {
132                if err.kind() == io::ErrorKind::AlreadyExists {
133                    Ok(None)
134                } else {
135                    Err(err)
136                }
137            })
138    }
139
140    fn write_group<I: Iterator<Item = Entry>>(file: Option<File>, entries: I) -> io::Result<()> {
141        if let Some(mut file) = file {
142            for entry in entries {
143                file.write_all(&entry.sample_offset.to_le_bytes())?;
144                file.write_all(&entry.hash)?;
145            }
146        }
147        Ok(())
148    }
149}
150
151impl Storage for Directory {
152    type Output = Output;
153    type Reader = io::BufReader<fs::File>;
154    type Group = GroupReader;
155    type Sink = codec::Reader<Self::Reader>;
156
157    fn create(&self) -> Self::Output {
158        let (file, path) = NamedTempFile::new().unwrap().into_parts();
159        let file = io::BufWriter::new(file);
160        let state = self.state.clone();
161        Output(OState::Incremental {
162            file,
163            state,
164            path: Some(path),
165        })
166    }
167
168    fn open_raw(&self, hash: &Hash) -> io::Result<Self::Reader> {
169        let path = self.hash_path(hash);
170        let file = fs::File::open(path)?;
171        let file = io::BufReader::new(file);
172        Ok(file)
173    }
174
175    fn open_group(&self, hash: &Hash) -> io::Result<Self::Group> {
176        let group = self.open_raw(hash)?;
177        Ok(GroupReader(group))
178    }
179
180    fn open_sink(&self, hash: &Hash) -> io::Result<Self::Sink> {
181        codec::Reader::new(self, hash)
182    }
183}
184
185impl Writer for Directory {
186    fn is_cached(&self, hash: &Hash) -> bool {
187        self.hash_path(hash).exists()
188    }
189
190    fn sink(&mut self, hash: &Hash) -> BoxProcessor {
191        if let Some(file) = self.open(hash).unwrap() {
192            let output = Output(OState::PreHashed { file, hash: *hash });
193            codec::Writer::new(self, output).spawn()
194        } else {
195            NoopSink.spawn()
196        }
197    }
198
199    fn group<I: Iterator<Item = Entry>>(&mut self, name: &str, hash: &Hash, entries: I) {
200        match self
201            .open(hash)
202            .and_then(|file| Self::write_group(file, entries))
203        {
204            Ok(()) => {}
205            Err(err) => {
206                log::error!("could not create group {:?}: {}", name, err);
207            }
208        }
209    }
210
211    fn buffer<
212        F: FnOnce(
213            Box<dyn euphony_compiler::BufferReader>,
214        ) -> euphony_compiler::Result<Vec<euphony_compiler::ConvertedBuffer>, E>,
215        E,
216    >(
217        &self,
218        path: &str,
219        sample_rate: u64,
220        init: F,
221    ) -> euphony_compiler::Result<Vec<euphony_compiler::CachedBuffer>, E> {
222        let mut hasher = Hasher::new();
223        hasher.update(&sample_rate.to_le_bytes());
224        hasher.update(path.as_bytes());
225        let path_hash = *hasher.finalize().as_bytes();
226        let contents = std::fs::File::open(path).unwrap();
227
228        let mut buffers = vec![];
229
230        if let Ok(mut reader) = self.open_raw(&path_hash) {
231            let contents_modified = contents.metadata().unwrap().modified().unwrap();
232            let hashes_modified = reader.get_ref().metadata().unwrap().modified().unwrap();
233
234            // only return if the hashes are newer than the original contents
235            if hashes_modified >= contents_modified {
236                while let Some(hash) = read_result(reader.read_hash()).unwrap() {
237                    let mut input = self.open_raw(&hash).unwrap();
238                    let mut samples = vec![];
239                    while let Some(sample) = read_result(input.read_f64()).unwrap() {
240                        samples.push(sample);
241                    }
242                    let samples = Arc::from(samples);
243                    buffers.push(euphony_compiler::CachedBuffer { samples, hash });
244                }
245
246                return Ok(buffers);
247            }
248        }
249
250        let res = init(Box::new(contents))?;
251
252        let mut hashes = self.open(&path_hash).unwrap();
253        for samples in res {
254            let mut out = self.create();
255            let ptr = samples.as_ptr();
256            let len = samples.len() * 8;
257            let bytes = unsafe { core::slice::from_raw_parts(ptr as _, len) };
258            out.write(bytes);
259
260            let hash = out.finish();
261            if let Some(hashes) = hashes.as_mut() {
262                hashes.write_all(&hash).unwrap();
263            }
264
265            let samples = Arc::from(samples);
266            buffers.push(euphony_compiler::CachedBuffer { samples, hash });
267        }
268
269        Ok(buffers)
270    }
271}
272
/// Treats an `UnexpectedEof` error as the end of a stream.
///
/// Successful values become `Ok(Some(_))`, end of input becomes `Ok(None)`, and every other
/// error is passed through unchanged.
fn read_result<T>(result: io::Result<T>) -> io::Result<Option<T>> {
    result.map(Some).or_else(|err| {
        if err.kind() == io::ErrorKind::UnexpectedEof {
            Ok(None)
        } else {
            Err(err)
        }
    })
}
280
/// Sink that discards everything; used when the target object already exists.
struct NoopSink;
282
283impl Sink for NoopSink {
284    #[inline]
285    fn write<S: Iterator<Item = (f64, Polar<f64>)>>(&mut self, _samples: S) {}
286}
287
/// Iterator over the entries of a group file. Each entry is a sample offset followed by a hash.
pub struct GroupReader(io::BufReader<fs::File>);
289
290impl Iterator for GroupReader {
291    type Item = io::Result<Entry>;
292
293    #[inline]
294    fn next(&mut self) -> Option<Self::Item> {
295        match self.0.read_u64() {
296            Ok(sample_offset) => Some(self.0.read_hash().map(|hash| Entry {
297                sample_offset,
298                hash,
299            })),
300            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => None,
301            Err(err) => Some(Err(err)),
302        }
303    }
304}