1#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
2use crate::mapped::MappedDirectory;
3use crate::randfile::RandFile;
4use crate::stream::Stream;
5use crate::Blob;
6use std::io::{Error, ErrorKind, Read, Result, Seek, Write};
7
8#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
9use std::{fs::File, io::SeekFrom};
10
11use std::path::{Component, Path, PathBuf};
12use std::sync::{Arc, RwLock};
13
14#[derive(Copy, Clone, Eq, PartialEq, Debug)]
15pub enum EntryKind {
16 Stream = 0,
18 SubDir = 1,
20 Blob = 2,
22}
23
24#[derive(Clone, Debug)]
26pub struct Entry {
27 pub kind: EntryKind,
29 pub primary_offset: u64,
35 pub primary_size: u64,
39 pub name: String,
41}
42
43struct DirectoryImpl<T> {
44 offset: u64,
45 entries: Vec<Entry>,
46 stream: Stream<T>,
47}
48
49pub enum OpenResult<T: Read + Seek> {
50 Blob(Blob<T>),
51 Stream(Stream<T>),
52 SubDir(Directory<T>),
53}
54
55impl<T: Read + Write + Seek> DirectoryImpl<T> {
56 fn write_stream(&mut self, data: &[u8]) -> Result<usize> {
57 self.stream.write_with_alloc_callback(data, |s| {
58 s.double_frame_size(65536);
59 })
60 }
61 fn append_directory(&mut self, new_entry: Entry) -> Result<()> {
62 if self.entries.iter().any(|x| x.name == new_entry.name) {
63 return Err(Error::new(
64 ErrorKind::Other,
65 "Directory entry already exists",
66 ));
67 }
68 self.stream.update_current_byte(1)?;
70 self.write_stream(&[new_entry.kind as u8])?;
71 self.stream
72 .write(&(new_entry.primary_offset - self.offset).to_le_bytes())?;
73 self.write_stream(&new_entry.primary_size.to_le_bytes())?;
74 self.write_stream(
75 new_entry
76 .name
77 .bytes()
78 .chain(std::iter::once(0))
79 .collect::<Vec<_>>()
80 .as_ref(),
81 )?;
82 self.write_stream(&[0])?;
83 self.entries.push(new_entry);
84 Ok(())
85 }
86}
87
88pub struct Directory<T>(Arc<RwLock<DirectoryImpl<T>>>);
89
90impl<T> Clone for Directory<T> {
91 fn clone(&self) -> Self {
92 Directory(self.0.clone())
93 }
94}
95
96impl<T> Directory<T> {
97 pub const INIT_BLOCK_SIZE: usize = 512;
99 pub fn entry_kind(&self, name: &str) -> Option<EntryKind> {
101 self.0.read().unwrap().entries.iter().find_map(|e| {
102 if e.name == name {
103 return Some(e.kind);
104 }
105 None
106 })
107 }
108}
109
110impl<T: Clone> Directory<T> {
111 pub fn clone_underlying_file(&self) -> Result<RandFile<T>> {
112 let inner = self
113 .0
114 .read()
115 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
116 Ok(inner.stream.clone_underlying_file())
117 }
118}
119
120impl<T: Read + Write + Seek> Directory<T> {
121 pub fn make_root(back: T) -> Result<Directory<T>> {
122 let randfile = RandFile::new(back);
123 let stream = Stream::create(randfile, 512)?;
124 let entries = vec![];
125 Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
126 offset: stream.get_frame_offset().unwrap(),
127 entries,
128 stream,
129 }))))
130 }
131 pub fn open_root_for_update(back: T, offset: u64) -> Result<Directory<T>> {
132 let randfile = RandFile::new(back);
133 Self::open_directory_rw_impl(randfile, offset)
134 }
135
136 pub fn flush(&mut self) -> Result<()> {
137 let mut inner = self
138 .0
139 .write()
140 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
141 inner.stream.flush()
142 }
143
144 pub fn create_blob(&mut self, name: &str, size: usize) -> Result<Blob<T>> {
145 let mut inner = self
146 .0
147 .write()
148 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
149 let mut file = inner.stream.clone_underlying_file();
150 let offset = file.reserve_block(size)?;
151 inner.append_directory(Entry {
152 kind: EntryKind::Blob,
153 primary_offset: offset,
154 primary_size: size as u64,
155 name: name.to_string(),
156 })?;
157 Ok(Blob::new(file, offset, size))
158 }
159
160 pub fn open_or_create_directory(&mut self, name: &str) -> Result<Directory<T>>
161 where
162 T: Send + 'static,
163 {
164 if let Ok(dir) = self.open_directory_for_update(name) {
165 Ok(dir)
166 } else {
167 self.create_directory(name)
168 }
169 }
170
171 pub fn create_directory(&mut self, name: &str) -> Result<Directory<T>>
172 where
173 T: Send + 'static,
174 {
175 let file = {
176 let mut parent_file = self
177 .0
178 .read()
179 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?
180 .stream
181 .clone_underlying_file();
182 let dir_addr = parent_file.size()?;
183 let parent_directory = self.clone();
184 let name = name.to_string();
185 parent_file.clone().lock(Box::new(move || {
186 let kind = EntryKind::SubDir;
187 let primary_offset = dir_addr;
188 let primary_size = parent_file.size().unwrap() - dir_addr;
189 let mut inner = parent_directory.0.write().unwrap();
190 let entry = Entry {
191 kind,
192 primary_offset,
193 primary_size,
194 name,
195 };
196 inner.append_directory(entry).unwrap();
197 }))?
198 };
199 let stream = Stream::create(file, Self::INIT_BLOCK_SIZE)?;
200 let entries = vec![];
201 Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
202 offset: stream.get_frame_offset().unwrap(),
203 entries,
204 stream,
205 }))))
206 }
207 pub fn create_stream(&mut self, name: &str, frame_size: usize) -> Result<Stream<T>> {
208 let mut inner = self
209 .0
210 .write()
211 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
212 let file = inner.stream.clone_underlying_file();
213 let stream = Stream::create(file, frame_size)?;
214 inner.append_directory(Entry {
215 kind: EntryKind::Stream,
216 primary_offset: stream.get_frame_offset().unwrap(),
217 primary_size: stream.get_frame_size().unwrap() as u64,
218 name: name.to_string(),
219 })?;
220 Ok(stream)
221 }
222 pub fn open_directory_for_update(&self, name: &str) -> Result<Directory<T>> {
223 self.open_directory_impl(name, Self::open_directory_rw_impl)
224 }
225}
226
227#[cfg(all(feature = "mapped_io", not(target_arch = "wasm32")))]
228impl Directory<File> {
229 pub fn copy_directory_from_file<T: Read + Seek>(
230 &mut self,
231 name: &str,
232 mut source: T,
233 offset: u64,
234 size: usize,
235 ) -> Result<()> {
236 let mut inner = self
237 .0
238 .write()
239 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
240 let mut file = inner.stream.clone_underlying_file();
241 let dest_offset = file.reserve_block(size)?;
242 inner.append_directory(Entry {
243 kind: EntryKind::SubDir,
244 primary_offset: dest_offset,
245 primary_size: size as u64,
246 name: name.to_string(),
247 })?;
248 let mut object = Blob::new(file, dest_offset, size);
249 let mut object_data = object.mmap_mut()?;
250 source.seek(SeekFrom::Start(offset))?;
251 source.read_exact(object_data.as_mut())
252 }
253
254 pub fn map_directory(&self, name: &str) -> Result<MappedDirectory> {
255 let inner = self
256 .0
257 .read()
258 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
259 if let Some(entry) = inner
260 .entries
261 .iter()
262 .find(|e| e.name == name && e.kind == EntryKind::SubDir)
263 {
264 return MappedDirectory::new(
265 inner.stream.clone_underlying_file(),
266 entry.primary_offset,
267 entry.primary_size as usize,
268 );
269 }
270
271 Err(Error::new(ErrorKind::Other, "Directory not found"))
272 }
273}
274
275impl<T: Read + Seek> Directory<T> {
276 pub fn entries(&self) -> Vec<Entry> {
278 self.0.read().unwrap().entries.clone()
279 }
280 pub fn open_root(back: T, offset: u64) -> Result<Directory<T>> {
282 let randfile = RandFile::new(back);
283 Self::open_directory_ro_impl(randfile, offset)
284 }
285 pub(crate) fn read_next_entry<R: Read>(base: u64, input: &mut R) -> Result<Option<Entry>> {
286 let mut has_next = [0u8];
287 if input.read(&mut has_next)? != 1 || has_next[0] == 0 {
288 return Ok(None);
289 }
290 let mut kind_buffer = [0];
291 let mut offset_buffer = [0; 8];
292 let mut size_buffer = [0; 8];
293 input.read_exact(&mut kind_buffer)?;
294 input.read_exact(&mut offset_buffer)?;
295 input.read_exact(&mut size_buffer)?;
296 let offset = u64::from_le_bytes(offset_buffer) + base;
297 let size = u64::from_le_bytes(size_buffer);
298 let mut name = vec![];
299 let mut current_byte = [0];
300 while input.read(&mut current_byte)? > 0 {
301 if current_byte[0] == 0 {
302 break;
303 }
304 name.push(current_byte[0]);
305 }
306 let name = String::from_utf8_lossy(&name[..]).to_string();
307 let kind = match kind_buffer[0] {
308 0 => EntryKind::Stream,
309 1 => EntryKind::SubDir,
310 2 => EntryKind::Blob,
311 _ => return Err(Error::new(ErrorKind::Other, "Invalid directory type code")),
312 };
313 Ok(Some(Entry {
314 kind,
315 name,
316 primary_offset: offset,
317 primary_size: size,
318 }))
319 }
320 fn open_directory_with_stream(mut stream: Stream<T>, offset: u64) -> Result<Directory<T>> {
321 let mut entries = vec![];
322 while let Some(entry) = Self::read_next_entry(offset, &mut stream)? {
323 entries.push(entry);
324 }
325 Ok(Directory(Arc::new(RwLock::new(DirectoryImpl {
326 offset,
327 entries,
328 stream,
329 }))))
330 }
331 fn open_directory_ro_impl(randfile: RandFile<T>, offset: u64) -> Result<Directory<T>> {
332 let stream = Stream::open_for_read(randfile, (offset, Self::INIT_BLOCK_SIZE))?;
333 Self::open_directory_with_stream(stream, offset)
334 }
335 fn open_directory_rw_impl(randfile: RandFile<T>, offset: u64) -> Result<Directory<T>>
336 where
337 T: Write,
338 {
339 let stream = Stream::open_for_update(randfile, (offset, Self::INIT_BLOCK_SIZE))?;
340 Self::open_directory_with_stream(stream, offset)
341 }
342
343 pub fn open_blob(&self, name: &str) -> Result<Blob<T>> {
344 let inner = self
345 .0
346 .read()
347 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
348 if let Some(entry) = inner
349 .entries
350 .iter()
351 .find(|e| e.name == name && e.kind == EntryKind::Blob)
352 {
353 let file = inner.stream.clone_underlying_file();
354 return Ok(Blob::new(
355 file,
356 entry.primary_offset,
357 entry.primary_size as usize,
358 ));
359 }
360 Err(Error::new(ErrorKind::Other, "Chunk not found"))
361 }
362 pub fn open_stream_by_offset(&self, offset: u64, frame_size: usize) -> Result<Stream<T>> {
363 let inner = self
364 .0
365 .read()
366 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
367 let file = inner.stream.clone_underlying_file();
368 Stream::open_for_read(file, (offset + inner.offset, frame_size))
369 }
370 pub fn open_stream(&self, name: &str) -> Result<Stream<T>> {
371 let inner = self
372 .0
373 .read()
374 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
375 if let Some(entry) = inner
376 .entries
377 .iter()
378 .find(|e| e.name == name && e.kind == EntryKind::Stream)
379 {
380 let file = inner.stream.clone_underlying_file();
381 return Stream::open_for_read(
382 file,
383 (entry.primary_offset, entry.primary_size as usize),
384 );
385 }
386 Err(Error::new(ErrorKind::Other, "Stream not found"))
387 }
388
389 fn open_directory_impl<H: FnOnce(RandFile<T>, u64) -> Result<Directory<T>>>(
390 &self,
391 name: &str,
392 handle: H,
393 ) -> Result<Directory<T>> {
394 let inner = self
395 .0
396 .read()
397 .map_err(|_| Error::new(ErrorKind::Other, "Lock Error"))?;
398 if let Some(entry) = inner
399 .entries
400 .iter()
401 .find(|e| e.name == name && e.kind == EntryKind::SubDir)
402 {
403 let file = inner.stream.clone_underlying_file();
404 return handle(file, entry.primary_offset);
405 }
406 Err(Error::new(ErrorKind::Other, "Stream not found"))
407 }
408
409 pub fn open_directory(&self, name: &str) -> Result<Directory<T>> {
410 self.open_directory_impl(name, Self::open_directory_ro_impl)
411 }
412
413 pub fn open<P: AsRef<Path>>(&self, path: P) -> Result<OpenResult<T>> {
414 let path = path.as_ref();
415 let n_comp = path.components().count();
416 let mut cur_dir = self.clone();
417 if n_comp == 0 {
418 return Ok(OpenResult::SubDir(self.clone()));
419 }
420 for (idx, comp) in path.components().enumerate() {
421 let comp = match comp {
422 Component::Normal(name) => name.to_string_lossy().to_owned(),
423 _ => continue,
424 };
425 if idx < n_comp - 1 {
426 cur_dir = cur_dir.open_directory(&comp)?
427 } else {
428 match cur_dir.entries().into_iter().find(|e| e.name == comp) {
429 Some(Entry {
430 kind: EntryKind::Blob,
431 ..
432 }) => {
433 return cur_dir.open_blob(&comp).map(OpenResult::Blob);
434 }
435 Some(Entry {
436 kind: EntryKind::Stream,
437 ..
438 }) => {
439 return cur_dir.open_stream(&comp).map(OpenResult::Stream);
440 }
441 Some(Entry {
442 kind: EntryKind::SubDir,
443 ..
444 }) => {
445 return cur_dir.open_directory(&comp).map(OpenResult::SubDir);
446 }
447 None => {
448 return Err(Error::new(ErrorKind::Other, "Object not found"));
449 }
450 }
451 }
452 }
453 Err(Error::new(ErrorKind::Other, "Invalid path"))
454 }
455
456 fn recurse_impl<Handle: FnMut(&Path, EntryKind) -> bool>(
457 &self,
458 handle: &mut Handle,
459 prefix: &mut PathBuf,
460 ) -> bool {
461 for Entry { name, kind, .. } in self.entries() {
462 prefix.push(&name);
463 if handle(prefix.as_path(), kind) == false {
464 prefix.pop();
465 return true;
466 }
467
468 if kind == EntryKind::SubDir {
469 if let Ok(subdir) = self.open_directory(&name) {
470 if subdir.recurse_impl(handle, prefix) == false {
471 prefix.pop();
472 return true;
473 }
474 }
475 }
476
477 prefix.pop();
478 }
479 true
480 }
481
482 pub fn recurse<Handle: FnMut(&Path, EntryKind) -> bool>(&self, mut handle: Handle) {
483 self.recurse_impl(&mut handle, &mut Default::default());
484 }
485
486 fn find_first_object_impl(&self, name: &str, prefix: &mut PathBuf) -> bool {
487 let entries = self.entries();
488 if entries
489 .iter()
490 .any(|Entry { name: ent_name, .. }| name == ent_name)
491 {
492 prefix.push(name);
493 return true;
494 }
495 for Entry {
496 name: subdir_name, ..
497 } in entries.into_iter().filter(|e| e.kind == EntryKind::SubDir)
498 {
499 if let Ok(subdir) = self.open_directory(&subdir_name) {
500 prefix.push(subdir_name);
501 if subdir.find_first_object_impl(name, prefix) {
502 return true;
503 }
504 prefix.pop();
505 }
506 }
507 false
508 }
509
510 pub fn find_first_object(&self, name: &str) -> Option<PathBuf> {
511 let mut ret = PathBuf::default();
512 if self.find_first_object_impl(name, &mut ret) {
513 Some(ret)
514 } else {
515 None
516 }
517 }
518}
519
520#[cfg(test)]
521mod test {
522 use super::*;
523 use std::io::{Cursor, Result};
524 #[test]
525 fn test_send_traits() {
526 fn check_sync<T: Send>() {}
527 check_sync::<Entry>();
528 check_sync::<DirectoryImpl<std::fs::File>>();
529 check_sync::<RwLock<DirectoryImpl<std::fs::File>>>();
530 }
531 #[test]
532 fn test_create_stream() -> Result<()> {
533 let mut buf = vec![];
534 {
535 let cursor = Cursor::new(&mut buf);
536 let mut dir = Directory::make_root(cursor)?;
537 let mut stream1 = dir.create_stream("test_stream_1", 128)?;
538 let mut stream2 = dir.create_stream("test_stream_2", 128)?;
539 stream1.write(b"This is the data from the first stream")?;
540 stream2.write(b"This is the data from the second stream")?;
541 }
542 {
543 let cursor = Cursor::new(&buf);
544 let dir = Directory::open_root(cursor, 0)?;
545 let mut first = dir.open_stream("test_stream_1")?;
546 let mut data = [0; 128];
547 let size = first.read(&mut data)?;
548 let result = &data[..size.min(38)];
549 assert_eq!(result, &b"This is the data from the first stream"[..]);
550 }
551 Ok(())
552 }
553 #[test]
554 fn test_directory_update() -> std::result::Result<(), Box<dyn std::error::Error>> {
555 let buf = {
556 let cursor = Cursor::new(vec![]);
557 let mut dir = Directory::make_root(cursor)?;
558 for i in 0..10 {
559 let stream_name = format!("test_stream.{}", i);
560 let mut test_stream = dir.create_stream(stream_name.as_str(), 32)?;
561 test_stream.write("this is a test stream".as_bytes())?;
562 }
563 dir.flush()?;
564 dir.clone_underlying_file()?.clone_inner()?.into_inner()
565 };
566 let buf = {
567 let backend = Cursor::new(buf);
568 let mut root = Directory::open_root_for_update(backend, 0)?;
569 assert_eq!(root.entries().len(), 10);
570 {
571 let mut another_dir = root.create_directory("additional_dir")?;
572 for i in 0..10 {
573 let stream_name = format!("test_stream.{}", i);
574 let mut test_stream = another_dir.create_stream(stream_name.as_str(), 32)?;
575 test_stream.write("this is a test stream".as_bytes())?;
576 }
577 }
578
579 root.flush()?;
580 root.clone_underlying_file()?.clone_inner()?.into_inner()
581 };
582
583 let backend = Cursor::new(buf);
584 let root = Directory::open_root(backend, 0)?;
585 assert_eq!(root.entries().len(), 11);
586 let sub_dir = root.open_directory("additional_dir")?;
587 assert_eq!(sub_dir.entries().len(), 10);
588 Ok(())
589 }
590 #[test]
591 fn test_stream_cluster() -> Result<()> {
592 let buf = {
593 let cursor = Cursor::new(vec![]);
594 let mut dir = Directory::make_root(cursor)?;
595 let mut stream1 = dir.create_stream("test_stream_1", 128)?;
596 stream1.write(b"This is a testing block")?;
597 stream1.flush()?;
598 stream1.write(b"This is a testing block")?;
599 {
600 let mut cluster1 = dir.create_directory("test_cluster")?;
601 let mut cs1 = cluster1.create_stream("clustered_stream_1", 128)?;
602 let mut cs2 = cluster1.create_stream("clustered_stream_2", 128)?;
603 cs1.write(b"cluster test 1234")?;
604 cs2.write(b"hahahaha")?;
605 stream1.write(b"test").ok();
606 stream1.flush().expect_err("Should be error");
607 }
608 stream1.write(b"test")?;
609 stream1.flush()?;
610 dir.flush()?;
611 stream1.clone_underlying_file().clone_inner()?
612 };
613 {
614 let dir = Directory::open_root(buf, 0)?;
615 assert_eq!(dir.0.read().unwrap().entries.len(), 2);
616 let cluster = dir.open_directory("test_cluster")?;
617 let mut test = cluster.open_stream("clustered_stream_1")?;
618 let mut buf = [0; 4];
619 test.read(&mut buf[..])?;
620 assert_eq!(&buf, b"clus");
621 }
622 Ok(())
623 }
624}