euphony_buffer/
lib.rs

1use base64::URL_SAFE_NO_PAD;
2use once_cell::sync::{Lazy, OnceCell};
3use serde::{Deserialize, Serialize};
4use std::{
5    ffi::OsStr,
6    fmt,
7    io::Read,
8    path::{Path, PathBuf},
9    time::{Duration, SystemTime},
10};
11
12#[cfg(feature = "decode")]
13use symphonia::core::codecs::CodecParameters;
14
15#[cfg(feature = "decode")]
16pub use symphonia;
17#[cfg(feature = "decode")]
18pub mod decode;
19pub mod hash;
20
21static TARGET_DIR: Lazy<PathBuf> = Lazy::new(|| {
22    let dir = std::env::var("EUPHONY_TARGET_DIR").unwrap_or_else(|_| "target/euphony".to_owned());
23    std::fs::create_dir_all(&dir).unwrap();
24    PathBuf::from(dir).canonicalize().unwrap()
25});
26
27static BUFFER_DIR: Lazy<PathBuf> = Lazy::new(|| {
28    let dir = TARGET_DIR.join("buffers");
29    std::fs::create_dir_all(&dir).unwrap();
30    dir
31});
32
33pub struct Buffer<S = &'static str> {
34    source: S,
35    values: Values,
36}
37
38impl<S: AsRef<str>> fmt::Debug for Buffer<S> {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        f.debug_tuple("Buffer")
41            .field(&self.source.as_ref())
42            .finish()
43    }
44}
45
46impl<S> Buffer<S> {
47    pub const fn new(source: S) -> Self {
48        Self {
49            source,
50            values: Values::new(),
51        }
52    }
53}
54
55impl<S: AsRef<str>> Buffer<S> {
56    #[doc(hidden)]
57    pub fn initialize<F: FnOnce(&Path, &str) -> u64>(&self, init: F) -> u64 {
58        self.values
59            .initialize(&BUFFER_DIR, self.source.as_ref(), init)
60    }
61
62    pub fn duration(&self) -> Duration {
63        self.meta().duration()
64    }
65
66    pub fn channels(&self) -> u32 {
67        self.meta().channels
68    }
69
70    pub fn channel(&self, index: u64) -> Channel<Self> {
71        assert!(self.channels() as u64 > index);
72        Channel(self, index)
73    }
74
75    fn meta(&self) -> &Meta {
76        self.values.meta(&BUFFER_DIR, self.source.as_ref())
77    }
78}
79
#[cfg(feature = "host")]
impl Buffer<String> {
    /// Handles an `InitBuffer` message by loading the metadata for `msg.source`.
    ///
    /// The meta file location is taken from `msg.meta` and its parent directory
    /// is used as the buffer directory. The loaded metadata itself is discarded;
    /// only success or failure is reported.
    ///
    /// # Panics
    ///
    /// Panics if `msg.meta` has no parent directory.
    pub fn init(msg: euphony_command::InitBuffer) -> std::io::Result<()> {
        let buffer = Self::new(msg.source);
        let values = &buffer.values;

        // Pin the meta path to the one named in the message.
        let meta_path = values.meta_path.get_or_init(|| PathBuf::from(msg.meta));
        let buffer_dir = meta_path.parent().expect("missing parent path");

        values.load_meta(buffer_dir, &buffer.source).map(|_| ())
    }
}
97
98struct Values {
99    meta_path: OnceCell<PathBuf>,
100    contents_path: OnceCell<PathBuf>,
101    meta: OnceCell<Meta>,
102    id: OnceCell<u64>,
103}
104
105impl Values {
106    const fn new() -> Self {
107        Self {
108            meta_path: OnceCell::new(),
109            contents_path: OnceCell::new(),
110            meta: OnceCell::new(),
111            id: OnceCell::new(),
112        }
113    }
114
115    fn initialize<F: FnOnce(&Path, &str) -> u64>(
116        &self,
117        buffer_dir: &Path,
118        source: &str,
119        init: F,
120    ) -> u64 {
121        *self.id.get_or_init(|| {
122            let meta = self.meta(buffer_dir, source);
123            let contents = self.contents_path(buffer_dir, source);
124            init(contents, meta.ext.as_deref().unwrap_or(""))
125        })
126    }
127
128    fn contents_path(&self, buffer_dir: &Path, source: &str) -> &Path {
129        self.contents_path.get_or_init(|| {
130            if source.starts_with("https://") || source.starts_with("http://") {
131                return self.http(buffer_dir, source);
132            }
133
134            self.local(buffer_dir, source)
135        })
136    }
137
138    #[cfg(not(feature = "http"))]
139    fn http(&self, buffer_dir: &Path, source: &str) -> PathBuf {
140        self.meta(buffer_dir, source).contents.to_owned()
141    }
142
143    #[cfg(feature = "http")]
144    fn http(&self, buffer_dir: &Path, source: &str) -> PathBuf {
145        let meta_path = self.meta_path(buffer_dir, source);
146        if meta_path.exists() {
147            return self.meta(buffer_dir, source).contents.to_owned();
148        }
149
150        log::info!(" Downloading {}", source);
151
152        let ext = Path::new(source)
153            .extension()
154            .and_then(|e| e.to_str())
155            .unwrap_or("");
156
157        hash::create(buffer_dir, ext, |writer| {
158            reqwest::blocking::get(source)
159                .unwrap()
160                .error_for_status()
161                .unwrap()
162                .copy_to(writer)
163                .unwrap();
164            Ok(())
165        })
166        .unwrap()
167    }
168
169    fn local(&self, buffer_dir: &Path, source: &str) -> PathBuf {
170        let meta_path = self.meta_path(buffer_dir, source);
171
172        if meta_path.exists() {
173            let source_modifed = modified(&source);
174            let meta_modifed = modified(&self.meta_path(buffer_dir, source));
175            if source_modifed <= meta_modifed {
176                return self.meta(buffer_dir, source).contents.to_owned();
177            } else {
178                // remove the old path since the source has been updated
179                let _ = std::fs::remove_file(meta_path);
180            }
181        }
182
183        let mut file = std::fs::File::open(source).unwrap();
184        let hash = hash_reader(&mut file);
185        let path = hash_path(&BUFFER_DIR, &hash);
186        std::fs::copy(source, &path).unwrap();
187        path
188    }
189
190    fn meta_path(&self, buffer_dir: &Path, source: &str) -> &Path {
191        self.meta_path.get_or_init(|| {
192            let hash = blake3::hash(source.as_bytes());
193            hash_path(buffer_dir, &hash)
194        })
195    }
196
197    fn meta(&self, buffer_dir: &Path, source: &str) -> &Meta {
198        self.meta.get_or_init(|| {
199            self.load_meta(buffer_dir, source)
200                .unwrap_or_else(|err| panic!("error while loading buffer {:?} - {:?}", source, err))
201        })
202    }
203
204    fn load_meta(&self, buffer_dir: &Path, source: &str) -> std::io::Result<Meta> {
205        let meta_path = self.meta_path(buffer_dir, source);
206        if meta_path.exists() {
207            return json_from_path(meta_path);
208        }
209
210        #[cfg(not(feature = "decode"))]
211        {
212            euphony_command::api::init_buffer(source.to_string(), meta_path);
213            euphony_command::api::flush();
214
215            for i in 0..7 {
216                std::thread::sleep(core::time::Duration::from_millis(100 * 2u64.pow(i)));
217                if meta_path.exists() {
218                    return json_from_path(meta_path);
219                }
220            }
221
222            panic!("{:?} was not downloaded", source);
223        }
224
225        #[cfg(feature = "decode")]
226        {
227            use std::io::Write;
228
229            let ext = Path::new(source).extension().and_then(|e| e.to_str());
230
231            let contents_path = self.contents_path(buffer_dir, source);
232
233            let contents = std::fs::File::open(contents_path)?;
234            let format = decode::reader(contents, ext.unwrap_or(""))
235                .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
236
237            let mut meta = Meta {
238                contents: contents_path.to_owned(),
239                frames: 0,
240                sample_rate: 0,
241                channels: 0,
242                ext: ext.map(String::from),
243            };
244
245            if let Some(track) = format.default_track() {
246                meta.update(&track.codec_params);
247            }
248
249            let meta_file = std::fs::File::create(meta_path)?;
250            let mut meta_file = std::io::BufWriter::new(meta_file);
251            serde_json::to_writer(&mut meta_file, &meta)?;
252            meta_file.flush()?;
253
254            Ok(meta)
255        }
256    }
257}
258
/// A reference to a buffer paired with the index of one of its channels.
pub struct Channel<'a, B>(&'a B, u64);

impl<'a, B> Channel<'a, B> {
    /// The buffer this channel belongs to.
    pub fn buffer(&self) -> &B {
        let Channel(buffer, _) = *self;
        buffer
    }

    /// The channel index within the buffer.
    pub fn channel(&self) -> u64 {
        let Channel(_, index) = *self;
        index
    }
}
270
/// Something that can be used as a single channel of a buffer.
pub trait AsChannel {
    /// Returns the id of the underlying buffer; `init` is a callback taking a
    /// path and a string that implementations may invoke to produce that id.
    fn buffer<F: FnOnce(&Path, &str) -> u64>(&self, init: F) -> u64;
    /// Length of the underlying buffer.
    fn duration(&self) -> Duration;
    /// Index of the selected channel.
    fn channel(&self) -> u64;
}

/// References forward every call to the referenced value.
impl<T: AsChannel> AsChannel for &T {
    fn buffer<F: FnOnce(&Path, &str) -> u64>(&self, init: F) -> u64 {
        T::buffer(*self, init)
    }

    fn duration(&self) -> Duration {
        T::duration(*self)
    }

    fn channel(&self) -> u64 {
        T::channel(*self)
    }
}
290
291impl<'a, S: AsRef<str>> AsChannel for Channel<'a, Buffer<S>> {
292    fn buffer<F: FnOnce(&Path, &str) -> u64>(&self, init: F) -> u64 {
293        self.0.initialize(init)
294    }
295
296    fn duration(&self) -> Duration {
297        self.0.duration()
298    }
299
300    fn channel(&self) -> u64 {
301        self.1
302    }
303}
304
305impl<S: AsRef<str>> AsChannel for Buffer<S> {
306    fn buffer<F: FnOnce(&Path, &str) -> u64>(&self, init: F) -> u64 {
307        self.initialize(init)
308    }
309
310    fn duration(&self) -> Duration {
311        self.duration()
312    }
313
314    fn channel(&self) -> u64 {
315        if self.channels() == 0 {
316            panic!("invalid buffer")
317        }
318        0
319    }
320}
321
322#[derive(Debug, Deserialize, Serialize)]
323struct Meta {
324    contents: PathBuf,
325    frames: u64,
326    sample_rate: u32,
327    channels: u32,
328    ext: Option<String>,
329}
330
331impl Meta {
332    #[cfg(feature = "decode")]
333    fn update(&mut self, params: &CodecParameters) {
334        if let Some(frames) = params.n_frames {
335            self.frames = frames;
336        }
337
338        if let Some(sample_rate) = params.sample_rate {
339            self.sample_rate = sample_rate;
340        }
341
342        if let Some(c) = params.channels {
343            self.channels = c.count() as _;
344        }
345    }
346
347    #[inline]
348    fn duration(&self) -> Duration {
349        if self.sample_rate == 0 || self.frames == 0 {
350            return Duration::ZERO;
351        }
352        Duration::from_secs(self.frames) / self.sample_rate
353    }
354}
355
356fn json_from_path<T: serde::de::DeserializeOwned>(path: &Path) -> std::io::Result<T> {
357    let file = std::fs::File::open(path)?;
358    let file = std::io::BufReader::new(file);
359    Ok(serde_json::from_reader(file)?)
360}
361
/// Last-modification time of the file at `path`.
///
/// # Panics
///
/// Panics if the file's metadata or its modification time cannot be read.
fn modified(path: &impl AsRef<OsStr>) -> SystemTime {
    let info = std::fs::metadata(Path::new(path)).unwrap();
    info.modified().unwrap()
}
365
366fn hash_path(root: &Path, hash: &blake3::Hash) -> PathBuf {
367    let mut out = [b'A'; 64];
368    let len = base64::encode_config_slice(hash.as_bytes(), URL_SAFE_NO_PAD, &mut out);
369    let out = unsafe { core::str::from_utf8_unchecked_mut(&mut out) };
370    root.join(&out[..len])
371}
372
373fn hash_reader(r: &mut impl Read) -> blake3::Hash {
374    let mut hasher = blake3::Hasher::new();
375    let mut buf = [0; 4096];
376    loop {
377        let len = r.read(&mut buf).unwrap();
378
379        if len == 0 {
380            return hasher.finalize();
381        }
382
383        hasher.update(&buf[..len]);
384    }
385}