distill_loader/
packfile_io.rs1use std::{
2 collections::{HashMap, HashSet},
3 fs::File,
4 mem::ManuallyDrop,
5 sync::Arc,
6};
7
8use capnp::serialize::SliceSegments;
9use distill_core::{utils::make_array, AssetMetadata, AssetRef, AssetUuid};
10use distill_schema::pack::pack_file;
11use memmap::{Mmap, MmapOptions};
12use thread_local::ThreadLocal;
13
14use crate::{
15 io::{DataRequest, LoaderIO, MetadataRequest, MetadataRequestResult, ResolveRequest},
16 loader::LoaderState,
17};
18
19struct PackfileMessageReader {
20 file: ManuallyDrop<File>,
21 mmap: ManuallyDrop<Mmap>,
22 message_reader: ManuallyDrop<ThreadLocal<capnp::message::Reader<SliceSegments<'static>>>>,
23}
24impl PackfileMessageReader {
25 pub fn new(file: File) -> std::io::Result<Self> {
26 let mmap = unsafe { MmapOptions::new().map(&file)? };
27 Ok(PackfileMessageReader {
28 file: ManuallyDrop::new(file),
29 mmap: ManuallyDrop::new(mmap),
30 message_reader: ManuallyDrop::new(ThreadLocal::new()),
31 })
32 }
33
34 fn get_reader(&self) -> capnp::Result<pack_file::Reader<'_>> {
35 let messge_reader = self.message_reader.get_or_try(|| {
36 let slice: &[u8] = &self.mmap;
38 let mut slice: &[u8] = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(slice) };
39 let mut options = capnp::message::ReaderOptions::new();
40 options.traversal_limit_in_words(Some(1 << 31));
41 capnp::serialize::read_message_from_flat_slice(&mut slice, options)
42 })?;
43 messge_reader.get_root::<pack_file::Reader<'_>>()
44 }
45}
46impl Drop for PackfileMessageReader {
47 fn drop(&mut self) {
48 unsafe {
49 ManuallyDrop::drop(&mut self.message_reader);
50 ManuallyDrop::drop(&mut self.mmap);
51 ManuallyDrop::drop(&mut self.file);
52 }
53 }
54}
55struct PackfileReaderInner {
56 reader: PackfileMessageReader,
57 index_by_uuid: HashMap<AssetUuid, u32>,
58 assets_by_path: HashMap<String, Vec<u32>>,
59 runtime: tokio::runtime::Runtime,
60}
61pub struct PackfileReader(Arc<PackfileReaderInner>);
62
63impl PackfileReader {
64 pub fn new(file: File) -> capnp::Result<Self> {
65 let message_reader = PackfileMessageReader::new(file)?;
66 let reader = message_reader.get_reader()?;
67 let mut index_by_uuid = HashMap::new();
68 let mut assets_by_path: HashMap<String, Vec<u32>> = HashMap::new();
69 for (idx, entry) in reader.get_entries()?.iter().enumerate() {
70 let asset_metadata = entry.get_asset_metadata()?;
71 let id = AssetUuid(make_array(asset_metadata.get_id()?.get_id()?));
72 index_by_uuid.insert(id, idx as u32);
73 let path = entry.get_path()?;
74 let path = std::str::from_utf8(&path)?;
75 assets_by_path
76 .entry(path.into())
77 .and_modify(|v| v.push(idx as u32))
78 .or_insert_with(|| vec![idx as u32]);
79 }
80
81 Ok(PackfileReader(Arc::new(PackfileReaderInner {
82 reader: message_reader,
83 index_by_uuid,
84 assets_by_path,
85 runtime: tokio::runtime::Builder::new_multi_thread().build()?,
86 })))
87 }
88}
89
90impl PackfileReaderInner {
91 fn get_asset_metadata_with_dependencies_impl(
92 &self,
93 request: &MetadataRequest,
94 ) -> capnp::Result<Vec<MetadataRequestResult>> {
95 let reader = self.reader.get_reader()?;
96 let mut to_visit = request.requested_assets().cloned().collect::<Vec<_>>();
97 let mut visited: HashSet<AssetUuid, std::collections::hash_map::RandomState> =
98 to_visit.iter().cloned().collect();
99 let entries = reader.get_entries()?;
100 let mut metadata = Vec::new();
101 while let Some(uuid) = to_visit.pop() {
102 if let Some(idx) = self.index_by_uuid.get(&uuid) {
103 let entry = entries.get(*idx);
104 let artifact_metadata =
105 distill_schema::parse_artifact_metadata(&entry.get_artifact()?.get_metadata()?);
106 for dep in &artifact_metadata.load_deps {
107 if let AssetRef::Uuid(dep_uuid) = dep {
108 if !visited.contains(&dep_uuid) {
109 visited.insert(*dep_uuid);
110 to_visit.push(*dep_uuid);
111 }
112 }
113 }
114 let mut result = MetadataRequestResult {
115 artifact_metadata,
116 asset_metadata: None,
117 };
118 if request.include_asset_metadata() {
119 result.asset_metadata = Some(distill_schema::parse_db_metadata(
120 &entry.get_asset_metadata()?,
121 ));
122 }
123 metadata.push(result);
124 }
125 }
126 Ok(metadata)
127 }
128
129 fn get_artifact_impl(&self, request: &DataRequest) -> capnp::Result<Vec<u8>> {
130 let reader = self.reader.get_reader()?;
131 let entries = reader.get_entries()?;
132 if let Some(idx) = self.index_by_uuid.get(&request.asset_id) {
133 let entry = entries.get(*idx);
134 Ok(Vec::from(entry.get_artifact()?.get_data()?))
135 } else {
136 Err(capnp::Error::failed(format!(
137 "UUID {:?} not found in packfile",
138 request.asset_id
139 )))
140 }
141 }
142
143 fn get_asset_candidates_impl(
144 &self,
145 request: &ResolveRequest,
146 ) -> capnp::Result<Vec<(std::path::PathBuf, Vec<AssetMetadata>)>> {
147 let reader = self.reader.get_reader()?;
148 let entries = reader.get_entries()?;
149 if let Some(indices) = self.assets_by_path.get(request.identifier().path()) {
150 let mut metadata = Vec::with_capacity(indices.len());
151 let path = std::path::PathBuf::from(request.identifier().path().replace("\\", "/"));
153 for idx in indices {
154 let entry = entries.get(*idx);
155 let asset_metadata =
156 distill_schema::parse_db_metadata(&entry.get_asset_metadata()?);
157 metadata.push(asset_metadata);
158 }
159 Ok(vec![(path, metadata)])
160 } else {
161 Err(capnp::Error::failed(format!(
162 "Identifier {:?} not found in packfile",
163 request.identifier()
164 )))
165 }
166 }
167}
168
169impl LoaderIO for PackfileReader {
170 fn get_asset_metadata_with_dependencies(&mut self, request: MetadataRequest) {
171 let _guard = self.0.runtime.enter();
172 let inner = self.0.clone();
173 tokio::spawn(async move {
174 match inner.get_asset_metadata_with_dependencies_impl(&request) {
175 Ok(data) => request.complete(data),
176 Err(err) => request.error(err),
177 }
178 });
179 }
180
181 fn get_asset_candidates(&mut self, requests: Vec<ResolveRequest>) {
182 let _guard = self.0.runtime.enter();
183 for request in requests {
184 let inner = self.0.clone();
185 tokio::spawn(async move {
186 match inner.get_asset_candidates_impl(&request) {
187 Ok(data) => request.complete(data),
188 Err(err) => request.error(err),
189 }
190 });
191 }
192 }
193
194 fn get_artifacts(&mut self, requests: Vec<DataRequest>) {
195 let _guard = self.0.runtime.enter();
196 for request in requests {
197 let inner = self.0.clone();
198 tokio::spawn(async move {
199 match inner.get_artifact_impl(&request) {
200 Ok(data) => request.complete(data),
201 Err(err) => request.error(err),
202 }
203 });
204 }
205 }
206
207 fn tick(&mut self, _loader: &mut LoaderState) {}
208
209 fn with_runtime(&self, f: &mut dyn FnMut(&tokio::runtime::Runtime)) {
210 f(&self.0.runtime);
211 }
212}