d4_framefile/
directory.rs

1#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
2use crate::mapped::MappedDirectory;
3use crate::randfile::RandFile;
4use crate::stream::Stream;
5use crate::Blob;
6use std::io::{Error, ErrorKind, Read, Result, Seek, Write};
7
8#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
9use std::{fs::File, io::SeekFrom};
10
11use std::path::{Component, Path, PathBuf};
12use std::sync::{Arc, RwLock};
13
/// The kind of object a directory entry refers to.
///
/// The explicit discriminants are the one-byte type codes used in the
/// directory metadata: `append_directory` writes `kind as u8` and
/// `read_next_entry` maps the byte back to a variant.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum EntryKind {
    /// A stream with arbitrary size
    Stream = 0,
    /// A sub-directory
    SubDir = 1,
    /// A fixed-size blob
    Blob = 2,
}
23
/// Describes an entry in a directory
#[derive(Clone, Debug)]
pub struct Entry {
    /// The type of the entry
    pub kind: EntryKind,
    /// The absolute offset of the beginning of the data object
    /// - for streams, this is the address of the primary frame of the stream
    /// - for blobs, this is the address of the actual blob
    /// - for directories, this is the address of the primary frame of the metadata stream for this directory
    ///
    /// Note that in the file this is stored as the offset relative to the beginning of the parent
    /// directory; the conversion happens when an entry is written or read.
    pub primary_offset: u64,
    /// The size of the primary frame of the data object.
    /// - For blobs and directories, it describes the **total size** of the object
    /// - For streams, this only describes the primary frame of the data object
    pub primary_size: u64,
    /// The name of the entry; it is stored in the file as a NUL-terminated byte string
    pub name: String,
}
42
/// The shared state behind a [`Directory`] handle.
struct DirectoryImpl<T> {
    /// Base offset of this directory. Entry offsets are stored in the file
    /// relative to this value and converted to absolute offsets in memory.
    offset: u64,
    /// In-memory list of the entries that were read from, or appended to, the metadata stream
    entries: Vec<Entry>,
    /// The metadata stream that holds the serialized entry records
    stream: Stream<T>,
}
48
/// The object returned by [`Directory::open`]; the variant mirrors the
/// [`EntryKind`] of the entry the path resolved to.
pub enum OpenResult<T: Read + Seek> {
    /// The path resolved to a fixed-size blob
    Blob(Blob<T>),
    /// The path resolved to a stream
    Stream(Stream<T>),
    /// The path resolved to a sub-directory (or to the directory itself for an empty path)
    SubDir(Directory<T>),
}
54
55impl<T: Read + Write + Seek> DirectoryImpl<T> {
56    fn write_stream(&mut self, data: &[u8]) -> Result<usize> {
57        self.stream.write_with_alloc_callback(data, |s| {
58            s.double_frame_size(65536);
59        })
60    }
61    fn append_directory(&mut self, new_entry: Entry) -> Result<()> {
62        if self.entries.iter().any(|x| x.name == new_entry.name) {
63            return Err(Error::new(
64                ErrorKind::Other,
65                "Directory entry already exists",
66            ));
67        }
68        //self.stream.write(&[1, new_entry.kind as u8])?;
69        self.stream.update_current_byte(1)?;
70        self.write_stream(&[new_entry.kind as u8])?;
71        self.stream
72            .write(&(new_entry.primary_offset - self.offset).to_le_bytes())?;
73        self.write_stream(&new_entry.primary_size.to_le_bytes())?;
74        self.write_stream(
75            new_entry
76                .name
77                .bytes()
78                .chain(std::iter::once(0))
79                .collect::<Vec<_>>()
80                .as_ref(),
81        )?;
82        self.write_stream(&[0])?;
83        self.entries.push(new_entry);
84        Ok(())
85    }
86}
87
/// A handle to a directory. The state lives behind `Arc<RwLock<..>>`, so
/// cloned handles refer to the same underlying directory.
pub struct Directory<T>(Arc<RwLock<DirectoryImpl<T>>>);
89
90impl<T> Clone for Directory<T> {
91    fn clone(&self) -> Self {
92        Directory(self.0.clone())
93    }
94}
95
96impl<T> Directory<T> {
97    // TODO: For internet accessing, this init block size seems too small.
98    pub const INIT_BLOCK_SIZE: usize = 512;
99    /// Get the type of the child object
100    pub fn entry_kind(&self, name: &str) -> Option<EntryKind> {
101        self.0.read().unwrap().entries.iter().find_map(|e| {
102            if e.name == name {
103                return Some(e.kind);
104            }
105            None
106        })
107    }
108}
109
110impl<T: Clone> Directory<T> {
111    pub fn clone_underlying_file(&self) -> Result<RandFile<T>> {
112        let inner = self
113            .0
114            .read()
115            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
116        Ok(inner.stream.clone_underlying_file())
117    }
118}
119
120impl<T: Read + Write + Seek> Directory<T> {
121    pub fn make_root(back: T) -> Result<Directory<T>> {
122        let randfile = RandFile::new(back);
123        let stream = Stream::create(randfile, 512)?;
124        let entries = vec![];
125        Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
126            offset: stream.get_frame_offset().unwrap(),
127            entries,
128            stream,
129        }))))
130    }
131    pub fn open_root_for_update(back: T, offset: u64) -> Result<Directory<T>> {
132        let randfile = RandFile::new(back);
133        Self::open_directory_rw_impl(randfile, offset)
134    }
135
136    pub fn flush(&mut self) -> Result<()> {
137        let mut inner = self
138            .0
139            .write()
140            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
141        inner.stream.flush()
142    }
143
144    pub fn create_blob(&mut self, name: &str, size: usize) -> Result<Blob<T>> {
145        let mut inner = self
146            .0
147            .write()
148            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
149        let mut file = inner.stream.clone_underlying_file();
150        let offset = file.reserve_block(size)?;
151        inner.append_directory(Entry {
152            kind: EntryKind::Blob,
153            primary_offset: offset,
154            primary_size: size as u64,
155            name: name.to_string(),
156        })?;
157        Ok(Blob::new(file, offset, size))
158    }
159
160    pub fn open_or_create_directory(&mut self, name: &str) -> Result<Directory<T>>
161    where
162        T: Send + 'static,
163    {
164        if let Ok(dir) = self.open_directory_for_update(name) {
165            Ok(dir)
166        } else {
167            self.create_directory(name)
168        }
169    }
170
171    pub fn create_directory(&mut self, name: &str) -> Result<Directory<T>>
172    where
173        T: Send + 'static,
174    {
175        let file = {
176            let mut parent_file = self
177                .0
178                .read()
179                .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?
180                .stream
181                .clone_underlying_file();
182            let dir_addr = parent_file.size()?;
183            let parent_directory = self.clone();
184            let name = name.to_string();
185            parent_file.clone().lock(Box::new(move || {
186                let kind = EntryKind::SubDir;
187                let primary_offset = dir_addr;
188                let primary_size = parent_file.size().unwrap() - dir_addr;
189                let mut inner = parent_directory.0.write().unwrap();
190                let entry = Entry {
191                    kind,
192                    primary_offset,
193                    primary_size,
194                    name,
195                };
196                inner.append_directory(entry).unwrap();
197            }))?
198        };
199        let stream = Stream::create(file, Self::INIT_BLOCK_SIZE)?;
200        let entries = vec![];
201        Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
202            offset: stream.get_frame_offset().unwrap(),
203            entries,
204            stream,
205        }))))
206    }
207    pub fn create_stream(&mut self, name: &str, frame_size: usize) -> Result<Stream<T>> {
208        let mut inner = self
209            .0
210            .write()
211            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
212        let file = inner.stream.clone_underlying_file();
213        let stream = Stream::create(file, frame_size)?;
214        inner.append_directory(Entry {
215            kind: EntryKind::Stream,
216            primary_offset: stream.get_frame_offset().unwrap(),
217            primary_size: stream.get_frame_size().unwrap() as u64,
218            name: name.to_string(),
219        })?;
220        Ok(stream)
221    }
    /// Opens the sub-directory `name` with a metadata stream that is opened
    /// for update (via `open_directory_rw_impl`), so entries can be appended.
    ///
    /// Fails if this directory has no sub-directory entry called `name`.
    pub fn open_directory_for_update(&self, name: &str) -> Result<Directory<T>> {
        self.open_directory_impl(name, Self::open_directory_rw_impl)
    }
225}
226
227#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
228impl Directory<File> {
229    pub fn copy_directory_from_file<T: Read + Seek>(
230        &mut self,
231        name: &str,
232        mut source: T,
233        offset: u64,
234        size: usize,
235    ) -> Result<()> {
236        let mut inner = self
237            .0
238            .write()
239            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
240        let mut file = inner.stream.clone_underlying_file();
241        let dest_offset = file.reserve_block(size)?;
242        inner.append_directory(Entry {
243            kind: EntryKind::SubDir,
244            primary_offset: dest_offset,
245            primary_size: size as u64,
246            name: name.to_string(),
247        })?;
248        let mut object = Blob::new(file, dest_offset, size);
249        let mut object_data = object.mmap_mut()?;
250        source.seek(SeekFrom::Start(offset))?;
251        source.read_exact(object_data.as_mut())
252    }
253
254    pub fn map_directory(&self, name: &str) -> Result<MappedDirectory> {
255        let inner = self
256            .0
257            .read()
258            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
259        if let Some(entry) = inner
260            .entries
261            .iter()
262            .find(|e| e.name == name && e.kind == EntryKind::SubDir)
263        {
264            return MappedDirectory::new(
265                inner.stream.clone_underlying_file(),
266                entry.primary_offset,
267                entry.primary_size as usize,
268            );
269        }
270
271        Err(Error::new(ErrorKind::Other, "Directory not found"))
272    }
273}
274
275impl<T: Read + Seek> Directory<T> {
    /// Get a list of children under this directory.
    ///
    /// The returned vector is a clone of the in-memory entry list.
    ///
    /// # Panics
    /// Panics if the directory lock is poisoned.
    pub fn entries(&self) -> Vec<Entry> {
        self.0.read().unwrap().entries.clone()
    }
280    /// Open an root directory from a seek-able backend stream and an **absolute** offset
281    pub fn open_root(back: T, offset: u64) -> Result<Directory<T>> {
282        let randfile = RandFile::new(back);
283        Self::open_directory_ro_impl(randfile, offset)
284    }
285    pub(crate) fn read_next_entry<R: Read>(base: u64, input: &mut R) -> Result<Option<Entry>> {
286        let mut has_next = [0u8];
287        if input.read(&mut has_next)? != 1 || has_next[0] == 0 {
288            return Ok(None);
289        }
290        let mut kind_buffer = [0];
291        let mut offset_buffer = [0; 8];
292        let mut size_buffer = [0; 8];
293        input.read_exact(&mut kind_buffer)?;
294        input.read_exact(&mut offset_buffer)?;
295        input.read_exact(&mut size_buffer)?;
296        let offset = u64::from_le_bytes(offset_buffer) + base;
297        let size = u64::from_le_bytes(size_buffer);
298        let mut name = vec![];
299        let mut current_byte = [0];
300        while input.read(&mut current_byte)? > 0 {
301            if current_byte[0] == 0 {
302                break;
303            }
304            name.push(current_byte[0]);
305        }
306        let name = String::from_utf8_lossy(&name[..]).to_string();
307        let kind = match kind_buffer[0] {
308            0 => EntryKind::Stream,
309            1 => EntryKind::SubDir,
310            2 => EntryKind::Blob,
311            _ => return Err(Error::new(ErrorKind::Other, "Invalid directory type code")),
312        };
313        Ok(Some(Entry {
314            kind,
315            name,
316            primary_offset: offset,
317            primary_size: size,
318        }))
319    }
320    fn open_directory_with_stream(mut stream: Stream<T>, offset: u64) -> Result<Directory<T>> {
321        let mut entries = vec![];
322        while let Some(entry) = Self::read_next_entry(offset, &mut stream)? {
323            entries.push(entry);
324        }
325        Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
326            offset,
327            entries,
328            stream,
329        }))))
330    }
331    fn open_directory_ro_impl(randfile: RandFile<T>, offset: u64) -> Result<Directory<T>> {
332        let stream = Stream::open_for_read(randfile, (offset, Self::INIT_BLOCK_SIZE))?;
333        Self::open_directory_with_stream(stream, offset)
334    }
335    fn open_directory_rw_impl(randfile: RandFile<T>, offset: u64) -> Result<Directory<T>>
336    where
337        T: Write,
338    {
339        let stream = Stream::open_for_update(randfile, (offset, Self::INIT_BLOCK_SIZE))?;
340        Self::open_directory_with_stream(stream, offset)
341    }
342
343    pub fn open_blob(&self, name: &str) -> Result<Blob<T>> {
344        let inner = self
345            .0
346            .read()
347            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
348        if let Some(entry) = inner
349            .entries
350            .iter()
351            .find(|e| e.name == name && e.kind == EntryKind::Blob)
352        {
353            let file = inner.stream.clone_underlying_file();
354            return Ok(Blob::new(
355                file,
356                entry.primary_offset,
357                entry.primary_size as usize,
358            ));
359        }
360        Err(Error::new(ErrorKind::Other, "Chunk not found"))
361    }
362    pub fn open_stream_by_offset(&self, offset: u64, frame_size: usize) -> Result<Stream<T>> {
363        let inner = self
364            .0
365            .read()
366            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
367        let file = inner.stream.clone_underlying_file();
368        Stream::open_for_read(file, (offset + inner.offset, frame_size))
369    }
370    pub fn open_stream(&self, name: &str) -> Result<Stream<T>> {
371        let inner = self
372            .0
373            .read()
374            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
375        if let Some(entry) = inner
376            .entries
377            .iter()
378            .find(|e| e.name == name && e.kind == EntryKind::Stream)
379        {
380            let file = inner.stream.clone_underlying_file();
381            return Stream::open_for_read(
382                file,
383                (entry.primary_offset, entry.primary_size as usize),
384            );
385        }
386        Err(Error::new(ErrorKind::Other, "Stream not found"))
387    }
388
389    fn open_directory_impl<H: FnOnce(RandFile<T>, u64) -> Result<Directory<T>>>(
390        &self,
391        name: &str,
392        handle: H,
393    ) -> Result<Directory<T>> {
394        let inner = self
395            .0
396            .read()
397            .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
398        if let Some(entry) = inner
399            .entries
400            .iter()
401            .find(|e| e.name == name && e.kind == EntryKind::SubDir)
402        {
403            let file = inner.stream.clone_underlying_file();
404            return handle(file, entry.primary_offset);
405        }
406        Err(Error::new(ErrorKind::Other, "Stream not found"))
407    }
408
    /// Opens the sub-directory `name` with a metadata stream that is opened
    /// for reading (via `open_directory_ro_impl`).
    ///
    /// Fails if this directory has no sub-directory entry called `name`.
    pub fn open_directory(&self, name: &str) -> Result<Directory<T>> {
        self.open_directory_impl(name, Self::open_directory_ro_impl)
    }
412
413    pub fn open<P: AsRef<Path>>(&self, path: P) -> Result<OpenResult<T>> {
414        let path = path.as_ref();
415        let n_comp = path.components().count();
416        let mut cur_dir = self.clone();
417        if n_comp == 0 {
418            return Ok(OpenResult::SubDir(self.clone()));
419        }
420        for (idx, comp) in path.components().enumerate() {
421            let comp = match comp {
422                Component::Normal(name) => name.to_string_lossy().to_owned(),
423                _ => continue,
424            };
425            if idx < n_comp - 1 {
426                cur_dir = cur_dir.open_directory(&comp)?
427            } else {
428                match cur_dir.entries().into_iter().find(|e| e.name == comp) {
429                    Some(Entry {
430                        kind: EntryKind::Blob,
431                        ..
432                    }) => {
433                        return cur_dir.open_blob(&comp).map(OpenResult::Blob);
434                    }
435                    Some(Entry {
436                        kind: EntryKind::Stream,
437                        ..
438                    }) => {
439                        return cur_dir.open_stream(&comp).map(OpenResult::Stream);
440                    }
441                    Some(Entry {
442                        kind: EntryKind::SubDir,
443                        ..
444                    }) => {
445                        return cur_dir.open_directory(&comp).map(OpenResult::SubDir);
446                    }
447                    None => {
448                        return Err(Error::new(ErrorKind::Other, "Object not found"));
449                    }
450                }
451            }
452        }
453        Err(Error::new(ErrorKind::Other, "Invalid path"))
454    }
455
456    fn recurse_impl<Handle: FnMut(&Path, EntryKind) -> bool>(
457        &self,
458        handle: &mut Handle,
459        prefix: &mut PathBuf,
460    ) -> bool {
461        for Entry { name, kind, .. } in self.entries() {
462            prefix.push(&name);
463            if handle(prefix.as_path(), kind) == false {
464                prefix.pop();
465                return true;
466            }
467
468            if kind == EntryKind::SubDir {
469                if let Ok(subdir) = self.open_directory(&name) {
470                    if subdir.recurse_impl(handle, prefix) == false {
471                        prefix.pop();
472                        return true;
473                    }
474                }
475            }
476
477            prefix.pop();
478        }
479        true
480    }
481
482    pub fn recurse<Handle: FnMut(&Path, EntryKind) -> bool>(&self, mut handle: Handle) {
483        self.recurse_impl(&mut handle, &mut Default::default());
484    }
485
486    fn find_first_object_impl(&self, name: &str, prefix: &mut PathBuf) -> bool {
487        let entries = self.entries();
488        if entries
489            .iter()
490            .any(|Entry { name: ent_name, .. }| name == ent_name)
491        {
492            prefix.push(name);
493            return true;
494        }
495        for Entry {
496            name: subdir_name, ..
497        } in entries.into_iter().filter(|e| e.kind == EntryKind::SubDir)
498        {
499            if let Ok(subdir) = self.open_directory(&subdir_name) {
500                prefix.push(subdir_name);
501                if subdir.find_first_object_impl(name, prefix) {
502                    return true;
503                }
504                prefix.pop();
505            }
506        }
507        false
508    }
509
510    pub fn find_first_object(&self, name: &str) -> Option<PathBuf> {
511        let mut ret = PathBuf::default();
512        if self.find_first_object_impl(name, &mut ret) {
513            Some(ret)
514        } else {
515            None
516        }
517    }
518}
519
520#[cfg(test)]
521mod test {
522    use super::*;
523    use std::io::{Cursor, Result};
524    #[test]
525    fn test_send_traits() {
526        fn check_sync<T: Send>() {}
527        check_sync::<Entry>();
528        check_sync::<DirectoryImpl<std::fs::File>>();
529        check_sync::<RwLock<DirectoryImpl<std::fs::File>>>();
530    }
    /// Writes two streams into a fresh root directory, then re-opens the
    /// buffer read-only and checks the content of the first stream.
    #[test]
    fn test_create_stream() -> Result<()> {
        let mut buf = vec![];
        {
            // Write phase: everything goes into `buf` through the cursor.
            let cursor = Cursor::new(&mut buf);
            let mut dir = Directory::make_root(cursor)?;
            let mut stream1 = dir.create_stream("test_stream_1", 128)?;
            let mut stream2 = dir.create_stream("test_stream_2", 128)?;
            stream1.write(b"This is the data from the first stream")?;
            stream2.write(b"This is the data from the second stream")?;
        }
        {
            // Read phase: the root directory is expected at offset 0.
            let cursor = Cursor::new(&buf);
            let dir = Directory::open_root(cursor, 0)?;
            let mut first = dir.open_stream("test_stream_1")?;
            let mut data = [0; 128];
            let size = first.read(&mut data)?;
            // Only the first 38 bytes (the length of the written message) are compared.
            let result = &data[..size.min(38)];
            assert_eq!(result, &b"This is the data from the first stream"[..]);
        }
        Ok(())
    }
    /// Creates a root with ten streams, re-opens it for update to add a
    /// sub-directory with ten more streams, and finally verifies both levels
    /// through a read-only open.
    #[test]
    fn test_directory_update() -> std::result::Result<(), Box<dyn std::error::Error>> {
        // Step 1: a fresh root directory holding ten streams.
        let buf = {
            let cursor = Cursor::new(vec![]);
            let mut dir = Directory::make_root(cursor)?;
            for i in 0..10 {
                let stream_name = format!("test_stream.{}", i);
                let mut test_stream = dir.create_stream(stream_name.as_str(), 32)?;
                test_stream.write("this is a test stream".as_bytes())?;
            }
            dir.flush()?;
            dir.clone_underlying_file()?.clone_inner()?.into_inner()
        };
        // Step 2: re-open for update and append a sub-directory with ten streams.
        let buf = {
            let backend = Cursor::new(buf);
            let mut root = Directory::open_root_for_update(backend, 0)?;
            assert_eq!(root.entries().len(), 10);
            {
                // `another_dir` is dropped at the end of this scope, before the root is flushed.
                let mut another_dir = root.create_directory("additional_dir")?;
                for i in 0..10 {
                    let stream_name = format!("test_stream.{}", i);
                    let mut test_stream = another_dir.create_stream(stream_name.as_str(), 32)?;
                    test_stream.write("this is a test stream".as_bytes())?;
                }
            }

            root.flush()?;
            root.clone_underlying_file()?.clone_inner()?.into_inner()
        };

        // Step 3: the root now lists the ten streams plus the new sub-directory.
        let backend = Cursor::new(buf);
        let root = Directory::open_root(backend, 0)?;
        assert_eq!(root.entries().len(), 11);
        let sub_dir = root.open_directory("additional_dir")?;
        assert_eq!(sub_dir.entries().len(), 10);
        Ok(())
    }
    /// Exercises a sub-directory ("cluster") that is created while another
    /// stream of the parent directory is still being written.
    #[test]
    fn test_stream_cluster() -> Result<()> {
        let buf = {
            let cursor = Cursor::new(vec![]);
            let mut dir = Directory::make_root(cursor)?;
            let mut stream1 = dir.create_stream("test_stream_1", 128)?;
            stream1.write(b"This is a testing block")?;
            stream1.flush()?;
            stream1.write(b"This is a testing block")?;
            {
                let mut cluster1 = dir.create_directory("test_cluster")?;
                let mut cs1 = cluster1.create_stream("clustered_stream_1", 128)?;
                let mut cs2 = cluster1.create_stream("clustered_stream_2", 128)?;
                cs1.write(b"cluster test 1234")?;
                cs2.write(b"hahahaha")?;
                // While the cluster is still open, flushing a stream that
                // lives outside of it is expected to fail.
                stream1.write(b"test").ok();
                stream1.flush().expect_err("Should be error");
            }
            // Once the cluster scope has ended, the outer stream can be flushed again.
            stream1.write(b"test")?;
            stream1.flush()?;
            dir.flush()?;
            stream1.clone_underlying_file().clone_inner()?
        };
        {
            let dir = Directory::open_root(buf, 0)?;
            // The root holds the stream and the cluster directory.
            assert_eq!(dir.0.read().unwrap().entries.len(), 2);
            let cluster = dir.open_directory("test_cluster")?;
            let mut test = cluster.open_stream("clustered_stream_1")?;
            let mut buf = [0; 4];
            test.read(&mut buf[..])?;
            assert_eq!(&buf, b"clus");
        }
        Ok(())
    }
624}