flatdata/
memstorage.rs

1use crate::storage::{ResourceStorage, StorageHandle, Stream};
2
3use std::{
4    collections::BTreeMap,
5    fmt,
6    io::{self, Cursor, Read, Seek, Write},
7    path::PathBuf,
8    slice,
9    sync::{Arc, Mutex},
10};
11
/// Shared, lockable in-memory buffer holding the bytes written to one resource.
type MemoryStorageStream = Arc<Mutex<Cursor<Vec<u8>>>>;

/// In-memory backing store for resources.
///
/// Both maps sit behind an `Arc`, so a clone of this struct refers to the very
/// same maps as the value it was cloned from.
#[derive(Default, Clone)]
struct MemoryStorage {
    /// Output streams of the resources that were written, keyed by resource path.
    streams: Arc<Mutex<BTreeMap<PathBuf, MemoryStorageStream>>>,
    /// Data of the resources that were opened for reading, keyed by resource path.
    resources: Arc<Mutex<BTreeMap<PathBuf, Arc<Vec<u8>>>>>,
}

impl fmt::Debug for MemoryStorage {
    /// Prints only the paths registered in each map; resource contents are omitted.
    ///
    /// # Panics
    ///
    /// Panics if one of the map mutexes is poisoned.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The guards have to outlive the collected vectors, because
        // `Path::display` borrows the keys stored inside the locked maps.
        let streams = self.streams.lock().unwrap();
        let written: Vec<_> = streams.keys().map(|path| path.display()).collect();

        let resources = self.resources.lock().unwrap();
        let opened: Vec<_> = resources.keys().map(|path| path.display()).collect();

        write!(
            f,
            "MemoryStorage {{ streams: {:?}, resources: {:?} }}",
            written, opened,
        )
    }
}
43
44/// Resource storage in memory.
45///
46/// Used to create and read archives from memory, e.g. for writing tests.
47///
48/// # Examples
49///
50/// ```rust
51/// use flatdata::{MemoryResourceStorage,  Vector};
52/// use flatdata::test::{X, XBuilder};
53///
54/// let storage = MemoryResourceStorage::new("/root/to/my/archive/in/memory");
55/// let builder = XBuilder::new(storage.clone()).expect("failed to create builder");
56/// // Write some data and store it archive, e.g.
57/// let v = Vector::new();
58/// builder.set_data(&v.as_view());
59///
60/// let archive = X::open(storage).expect("failed to open");
61/// // read data
62/// archive.data();
63/// ```
64#[derive(Debug)]
65pub struct MemoryResourceStorage {
66    storage: MemoryStorage,
67    path: PathBuf,
68}
69
70impl MemoryResourceStorage {
71    /// Create an empty memory resource storage at a given virtual path.
72    ///
73    /// Resources will be placed in ephemeral memory with prefix `path`. A path
74    /// has to be provided to unify the interface with `FileResourceStorage`.
75    #[allow(clippy::new_ret_no_self)]
76    pub fn new<P: Into<PathBuf>>(path: P) -> Arc<Self> {
77        Arc::new(Self {
78            storage: MemoryStorage::default(),
79            path: path.into(),
80        })
81    }
82}
83
84impl ResourceStorage for MemoryResourceStorage {
85    fn subdir(&self, dir: &str) -> StorageHandle {
86        Arc::new(Self {
87            storage: self.storage.clone(),
88            path: self.path.join(dir),
89        })
90    }
91
92    fn exists(&self, resource_name: &str) -> bool {
93        let resource_path = self.path.join(resource_name);
94        self.storage
95            .resources
96            .lock()
97            .unwrap()
98            .contains_key(&resource_path)
99            || self
100                .storage
101                .streams
102                .lock()
103                .unwrap()
104                .contains_key(&resource_path)
105    }
106
107    fn read_resource(&self, resource_name: &str) -> Result<&[u8], io::Error> {
108        let resource_path = self.path.join(resource_name);
109        if !self
110            .storage
111            .resources
112            .lock()
113            .unwrap()
114            .contains_key(&resource_path)
115        {
116            let streams = self.storage.streams.lock().unwrap();
117            let stream = streams.get(&resource_path);
118            match stream {
119                Some(stream) => {
120                    // Resource is not yet opened, but there is a stream it was written to
121                    // => copy the stream as resource data.
122                    let data = Arc::new(stream.lock().unwrap().get_ref().clone());
123                    self.storage
124                        .resources
125                        .lock()
126                        .unwrap()
127                        .insert(resource_path.clone(), data);
128                }
129                None => {
130                    return Err(io::Error::new(
131                        io::ErrorKind::NotFound,
132                        String::from(resource_path.to_str().unwrap_or(resource_name)),
133                    ));
134                }
135            }
136        }
137        let data = &self.storage.resources.lock().unwrap()[&resource_path];
138        // We cannot prove to Rust that the buffer will live as long as the storage
139        // (we never delete mappings), so we need to manually extend lifetime
140        let extended_lifetime_data = unsafe { slice::from_raw_parts(data.as_ptr(), data.len()) };
141        Ok(extended_lifetime_data)
142    }
143
144    fn create_output_stream(&self, resource_name: &str) -> Result<Box<dyn Stream>, io::Error> {
145        let resource_path = self.path.join(resource_name);
146        let stream = self
147            .storage
148            .streams
149            .lock()
150            .unwrap()
151            .entry(resource_path)
152            .or_insert_with(|| Arc::new(Mutex::new(Cursor::new(Vec::new()))))
153            .clone();
154        Ok(Box::new(StreamWrapper { stream }))
155    }
156}
157
/// Stream handle which forwards every I/O call to a shared in-memory cursor.
///
/// All wrappers created for the same `Arc` operate on one buffer and one
/// cursor position.
struct StreamWrapper {
    /// Shared buffer; it is locked for the duration of each single I/O call.
    stream: Arc<Mutex<Cursor<Vec<u8>>>>,
}

impl Read for StreamWrapper {
    /// Reads from the shared cursor into `buf`.
    ///
    /// Panics if the mutex is poisoned.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut cursor = self.stream.lock().unwrap();
        cursor.read(buf)
    }
}

impl Seek for StreamWrapper {
    /// Moves the shared cursor to `pos` and returns the new position.
    ///
    /// Panics if the mutex is poisoned.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let mut cursor = self.stream.lock().unwrap();
        cursor.seek(pos)
    }
}

impl Write for StreamWrapper {
    /// Writes `buf` at the current position of the shared cursor.
    ///
    /// Panics if the mutex is poisoned.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut cursor = self.stream.lock().unwrap();
        cursor.write(buf)
    }

    /// Flushes the shared cursor.
    ///
    /// Panics if the mutex is poisoned.
    fn flush(&mut self) -> io::Result<()> {
        let mut cursor = self.stream.lock().unwrap();
        cursor.flush()
    }
}