distill_loader/
packfile_io.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fs::File,
4    mem::ManuallyDrop,
5    sync::Arc,
6};
7
8use capnp::serialize::SliceSegments;
9use distill_core::{utils::make_array, AssetMetadata, AssetRef, AssetUuid};
10use distill_schema::pack::pack_file;
11use memmap::{Mmap, MmapOptions};
12use thread_local::ThreadLocal;
13
14use crate::{
15    io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
16    loader::LoaderState,
17};
18
19struct PackfileMessageReader {
20    file: ManuallyDrop<File>,
21    mmap: ManuallyDrop<Mmap>,
22    message_reader: ManuallyDrop<ThreadLocal<capnp::message::Reader<SliceSegments<'static>>>>,
23}
24impl PackfileMessageReader {
25    pub fn new(file: File) -> std::io::Result<Self> {
26        let mmap = unsafe { MmapOptions::new().map(&file)? };
27        Ok(PackfileMessageReader {
28            file: ManuallyDrop::new(file),
29            mmap: ManuallyDrop::new(mmap),
30            message_reader: ManuallyDrop::new(ThreadLocal::new()),
31        })
32    }
33
34    fn get_reader(&self) -> capnp::Result<pack_file::Reader<'_>> {
35        let messge_reader = self.message_reader.get_or_try(|| {
36            // We ensure that the reader is dropped before the mmap so it's ok to cast to 'static here
37            let slice: &[u8] = &self.mmap;
38            let mut slice: &[u8] = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) };
39            let mut options = capnp::message::ReaderOptions::new();
40            options.traversal_limit_in_words(Some(1 << 31));
41            capnp::serialize::read_message_from_flat_slice(&mut slice, options)
42        })?;
43        messge_reader.get_root::<pack_file::Reader<'_>>()
44    }
45}
46impl Drop for PackfileMessageReader {
47    fn drop(&mut self) {
48        unsafe {
49            ManuallyDrop::drop(&mut self.message_reader);
50            ManuallyDrop::drop(&mut self.mmap);
51            ManuallyDrop::drop(&mut self.file);
52        }
53    }
54}
55struct PackfileReaderInner {
56    reader: PackfileMessageReader,
57    index_by_uuid: HashMap<AssetUuid, u32>,
58    assets_by_path: HashMap<String, Vec<u32>>,
59    runtime: tokio::runtime::Runtime,
60}
61pub struct PackfileReader(Arc<PackfileReaderInner>);
62
63impl PackfileReader {
64    pub fn new(file: File) -> capnp::Result<Self> {
65        let message_reader = PackfileMessageReader::new(file)?;
66        let reader = message_reader.get_reader()?;
67        let mut index_by_uuid = HashMap::new();
68        let mut assets_by_path: HashMap<String, Vec<u32>> = HashMap::new();
69        for (idx, entry) in reader.get_entries()?.iter().enumerate() {
70            let asset_metadata = entry.get_asset_metadata()?;
71            let id = AssetUuid(make_array(asset_metadata.get_id()?.get_id()?));
72            index_by_uuid.insert(id, idx as u32);
73            let path = entry.get_path()?;
74            let path = std::str::from_utf8(&path)?;
75            assets_by_path
76                .entry(path.into())
77                .and_modify(|v| v.push(idx as u32))
78                .or_insert_with(|| vec![idx as u32]);
79        }
80
81        Ok(PackfileReader(Arc::new(PackfileReaderInner {
82            reader: message_reader,
83            index_by_uuid,
84            assets_by_path,
85            runtime: tokio::runtime::Builder::new_multi_thread().build()?,
86        })))
87    }
88}
89
90impl PackfileReaderInner {
91    fn get_asset_metadata_with_dependencies_impl(
92        &self,
93        request: &MetadataRequest,
94    ) -> capnp::Result<Vec<MetadataRequestResult>> {
95        let reader = self.reader.get_reader()?;
96        let mut to_visit = request.requested_assets().cloned().collect::<Vec<_>>();
97        let mut visited: HashSet<AssetUuid, std::collections::hash_map::RandomState> =
98            to_visit.iter().cloned().collect();
99        let entries = reader.get_entries()?;
100        let mut metadata = Vec::new();
101        while let Some(uuid) = to_visit.pop() {
102            if let Some(idx) = self.index_by_uuid.get(&uuid) {
103                let entry = entries.get(*idx);
104                let artifact_metadata =
105                    distill_schema::parse_artifact_metadata(&entry.get_artifact()?.get_metadata()?);
106                for dep in &artifact_metadata.load_deps {
107                    if let AssetRef::Uuid(dep_uuid) = dep {
108                        if !visited.contains(&dep_uuid) {
109                            visited.insert(*dep_uuid);
110                            to_visit.push(*dep_uuid);
111                        }
112                    }
113                }
114                let mut result = MetadataRequestResult {
115                    artifact_metadata,
116                    asset_metadata: None,
117                };
118                if request.include_asset_metadata() {
119                    result.asset_metadata = Some(distill_schema::parse_db_metadata(
120                        &entry.get_asset_metadata()?,
121                    ));
122                }
123                metadata.push(result);
124            }
125        }
126        Ok(metadata)
127    }
128
129    fn get_artifact_impl(&self, request: &DataRequest) -> capnp::Result<Vec<u8>> {
130        let reader = self.reader.get_reader()?;
131        let entries = reader.get_entries()?;
132        if let Some(idx) = self.index_by_uuid.get(&request.asset_id) {
133            let entry = entries.get(*idx);
134            Ok(Vec::from(entry.get_artifact()?.get_data()?))
135        } else {
136            Err(capnp::Error::failed(format!(
137                "UUID {:?} not found in packfile",
138                request.asset_id
139            )))
140        }
141    }
142
143    fn get_asset_candidates_impl(
144        &self,
145        request: &ResolveRequest,
146    ) -> capnp::Result<Vec<(std::path::PathBuf, Vec<AssetMetadata>)>> {
147        let reader = self.reader.get_reader()?;
148        let entries = reader.get_entries()?;
149        if let Some(indices) = self.assets_by_path.get(request.identifier().path()) {
150            let mut metadata = Vec::with_capacity(indices.len());
151            // TODO canonicalize the requested path
152            let path = std::path::PathBuf::from(request.identifier().path().replace("\\", "/"));
153            for idx in indices {
154                let entry = entries.get(*idx);
155                let asset_metadata =
156                    distill_schema::parse_db_metadata(&entry.get_asset_metadata()?);
157                metadata.push(asset_metadata);
158            }
159            Ok(vec![(path, metadata)])
160        } else {
161            Err(capnp::Error::failed(format!(
162                "Identifier {:?} not found in packfile",
163                request.identifier()
164            )))
165        }
166    }
167}
168
169impl LoaderIO for PackfileReader {
170    fn get_asset_metadata_with_dependencies(&mut self, request: MetadataRequest) {
171        let _guard = self.0.runtime.enter();
172        let inner = self.0.clone();
173        tokio::spawn(async move {
174            match inner.get_asset_metadata_with_dependencies_impl(&request) {
175                Ok(data) => request.complete(data),
176                Err(err) => request.error(err),
177            }
178        });
179    }
180
181    fn get_asset_candidates(&mut self, requests: Vec<ResolveRequest>) {
182        let _guard = self.0.runtime.enter();
183        for request in requests {
184            let inner = self.0.clone();
185            tokio::spawn(async move {
186                match inner.get_asset_candidates_impl(&request) {
187                    Ok(data) => request.complete(data),
188                    Err(err) => request.error(err),
189                }
190            });
191        }
192    }
193
194    fn get_artifacts(&mut self, requests: Vec<DataRequest>) {
195        let _guard = self.0.runtime.enter();
196        for request in requests {
197            let inner = self.0.clone();
198            tokio::spawn(async move {
199                match inner.get_artifact_impl(&request) {
200                    Ok(data) => request.complete(data),
201                    Err(err) => request.error(err),
202                }
203            });
204        }
205    }
206
207    fn tick(&mut self, _loader: &mut LoaderState) {}
208
209    fn with_runtime(&self, f: &mut dyn FnMut(&tokio::runtime::Runtime)) {
210        f(&self.0.runtime);
211    }
212}