1use crate::{
2 codec,
3 ext::*,
4 storage::{self, Output as _, Storage},
5};
6use base64::URL_SAFE_NO_PAD;
7use blake3::Hasher;
8use euphony_compiler::{Entry, Hash, Writer};
9use euphony_node::{BoxProcessor, Sink};
10use euphony_units::coordinates::Polar;
11use std::{
12 fs,
13 io::{self, Write},
14 path::{Path, PathBuf},
15 sync::Arc,
16};
17use tempfile::{NamedTempFile, TempPath};
18
19pub type File = io::BufWriter<fs::File>;
20
21#[derive(Clone, Debug)]
22pub struct Directory {
23 state: State,
24}
25
26#[derive(Clone, Debug)]
27struct State {
28 path: Arc<PathBuf>,
29 hasher: Hasher,
30}
31
32impl State {
33 fn hash_path(&self, hash: &Hash) -> PathBuf {
34 let mut out = [b'A'; 64];
35 let len = base64::encode_config_slice(hash, URL_SAFE_NO_PAD, &mut out);
36 let out = unsafe { core::str::from_utf8_unchecked_mut(&mut out) };
37 let out = &out[..len];
38 self.path.join(out)
39 }
40}
41
42pub struct Output(OState);
43
44#[allow(clippy::large_enum_variant)]
45enum OState {
46 PreHashed {
47 file: File,
48 hash: Hash,
49 },
50 Incremental {
51 file: File,
52 path: Option<TempPath>,
53 state: State,
54 },
55}
56
57impl storage::Output for Output {
58 #[inline]
59 fn write(&mut self, bytes: &[u8]) {
60 let result = match &mut self.0 {
61 OState::PreHashed { file, .. } => file.write_all(bytes),
62 OState::Incremental { file, state, .. } => {
63 state.hasher.update(bytes);
64 file.write_all(bytes)
65 }
66 };
67
68 if let Err(err) = result {
69 log::error!("could write samples to output: {}", err);
70 }
71 }
72
73 #[inline]
74 fn finish(&mut self) -> Hash {
75 let result = match &mut self.0 {
76 OState::PreHashed { file, hash } => file.flush().map(|_| *hash),
77 OState::Incremental { file, state, path } => {
78 let tmp_path = path.take().expect("cannot finalize twice");
79 let hash = *state.hasher.finalize().as_bytes();
80 file.flush().and_then(|_| {
81 let new_path = state.hash_path(&hash);
82
83 tmp_path
84 .persist(new_path)
85 .map(|_| hash)
86 .map_err(|e| e.error)
87 })
88 }
89 };
90
91 result.expect("could not finish file")
92 }
93}
94
95impl Default for Directory {
96 fn default() -> Self {
97 Self::new(PathBuf::from("target/euphony/contents"))
98 }
99}
100
101impl Directory {
102 pub fn new(path: PathBuf) -> Self {
103 Self {
104 state: State {
105 path: Arc::new(path),
106 hasher: Hasher::new(),
107 },
108 }
109 }
110
111 pub fn path(&self) -> &Path {
112 &self.state.path
113 }
114
115 fn hash_path(&self, hash: &Hash) -> PathBuf {
116 let mut out = [b'A'; 64];
117 let len = base64::encode_config_slice(hash, URL_SAFE_NO_PAD, &mut out);
118 let out = unsafe { core::str::from_utf8_unchecked_mut(&mut out) };
119 let out = &out[..len];
120 self.state.path.join(out)
121 }
122
123 fn open(&self, hash: &Hash) -> io::Result<Option<File>> {
124 let path = self.hash_path(hash);
125 fs::OpenOptions::new()
126 .create_new(true)
127 .write(true)
128 .open(path)
129 .map(io::BufWriter::new)
130 .map(Some)
131 .or_else(|err| {
132 if err.kind() == io::ErrorKind::AlreadyExists {
133 Ok(None)
134 } else {
135 Err(err)
136 }
137 })
138 }
139
140 fn write_group<I: Iterator<Item = Entry>>(file: Option<File>, entries: I) -> io::Result<()> {
141 if let Some(mut file) = file {
142 for entry in entries {
143 file.write_all(&entry.sample_offset.to_le_bytes())?;
144 file.write_all(&entry.hash)?;
145 }
146 }
147 Ok(())
148 }
149}
150
151impl Storage for Directory {
152 type Output = Output;
153 type Reader = io::BufReader<fs::File>;
154 type Group = GroupReader;
155 type Sink = codec::Reader<Self::Reader>;
156
157 fn create(&self) -> Self::Output {
158 let (file, path) = NamedTempFile::new().unwrap().into_parts();
159 let file = io::BufWriter::new(file);
160 let state = self.state.clone();
161 Output(OState::Incremental {
162 file,
163 state,
164 path: Some(path),
165 })
166 }
167
168 fn open_raw(&self, hash: &Hash) -> io::Result<Self::Reader> {
169 let path = self.hash_path(hash);
170 let file = fs::File::open(path)?;
171 let file = io::BufReader::new(file);
172 Ok(file)
173 }
174
175 fn open_group(&self, hash: &Hash) -> io::Result<Self::Group> {
176 let group = self.open_raw(hash)?;
177 Ok(GroupReader(group))
178 }
179
180 fn open_sink(&self, hash: &Hash) -> io::Result<Self::Sink> {
181 codec::Reader::new(self, hash)
182 }
183}
184
185impl Writer for Directory {
186 fn is_cached(&self, hash: &Hash) -> bool {
187 self.hash_path(hash).exists()
188 }
189
190 fn sink(&mut self, hash: &Hash) -> BoxProcessor {
191 if let Some(file) = self.open(hash).unwrap() {
192 let output = Output(OState::PreHashed { file, hash: *hash });
193 codec::Writer::new(self, output).spawn()
194 } else {
195 NoopSink.spawn()
196 }
197 }
198
199 fn group<I: Iterator<Item = Entry>>(&mut self, name: &str, hash: &Hash, entries: I) {
200 match self
201 .open(hash)
202 .and_then(|file| Self::write_group(file, entries))
203 {
204 Ok(()) => {}
205 Err(err) => {
206 log::error!("could not create group {:?}: {}", name, err);
207 }
208 }
209 }
210
211 fn buffer<
212 F: FnOnce(
213 Box<dyn euphony_compiler::BufferReader>,
214 ) -> euphony_compiler::Result<Vec<euphony_compiler::ConvertedBuffer>, E>,
215 E,
216 >(
217 &self,
218 path: &str,
219 sample_rate: u64,
220 init: F,
221 ) -> euphony_compiler::Result<Vec<euphony_compiler::CachedBuffer>, E> {
222 let mut hasher = Hasher::new();
223 hasher.update(&sample_rate.to_le_bytes());
224 hasher.update(path.as_bytes());
225 let path_hash = *hasher.finalize().as_bytes();
226 let contents = std::fs::File::open(path).unwrap();
227
228 let mut buffers = vec![];
229
230 if let Ok(mut reader) = self.open_raw(&path_hash) {
231 let contents_modified = contents.metadata().unwrap().modified().unwrap();
232 let hashes_modified = reader.get_ref().metadata().unwrap().modified().unwrap();
233
234 if hashes_modified >= contents_modified {
236 while let Some(hash) = read_result(reader.read_hash()).unwrap() {
237 let mut input = self.open_raw(&hash).unwrap();
238 let mut samples = vec![];
239 while let Some(sample) = read_result(input.read_f64()).unwrap() {
240 samples.push(sample);
241 }
242 let samples = Arc::from(samples);
243 buffers.push(euphony_compiler::CachedBuffer { samples, hash });
244 }
245
246 return Ok(buffers);
247 }
248 }
249
250 let res = init(Box::new(contents))?;
251
252 let mut hashes = self.open(&path_hash).unwrap();
253 for samples in res {
254 let mut out = self.create();
255 let ptr = samples.as_ptr();
256 let len = samples.len() * 8;
257 let bytes = unsafe { core::slice::from_raw_parts(ptr as _, len) };
258 out.write(bytes);
259
260 let hash = out.finish();
261 if let Some(hashes) = hashes.as_mut() {
262 hashes.write_all(&hash).unwrap();
263 }
264
265 let samples = Arc::from(samples);
266 buffers.push(euphony_compiler::CachedBuffer { samples, hash });
267 }
268
269 Ok(buffers)
270 }
271}
272
273fn read_result<T>(result: io::Result<T>) -> io::Result<Option<T>> {
274 match result {
275 Ok(value) => Ok(Some(value)),
276 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
277 Err(err) => Err(err),
278 }
279}
280
281struct NoopSink;
282
283impl Sink for NoopSink {
284 #[inline]
285 fn write<S: Iterator<Item = (f64, Polar<f64>)>>(&mut self, _samples: S) {}
286}
287
288pub struct GroupReader(io::BufReader<fs::File>);
289
290impl Iterator for GroupReader {
291 type Item = io::Result<Entry>;
292
293 #[inline]
294 fn next(&mut self) -> Option<Self::Item> {
295 match self.0.read_u64() {
296 Ok(sample_offset) => Some(self.0.read_hash().map(|hash| Entry {
297 sample_offset,
298 hash,
299 })),
300 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => None,
301 Err(err) => Some(Err(err)),
302 }
303 }
304}