1use std::{
2 collections::HashMap,
3 fs,
4 net::SocketAddr,
5 path::{Path, PathBuf},
6 sync::Arc,
7 thread,
8 thread::JoinHandle,
9};
10
11use asset_hub::AssetHub;
12use asset_hub_service::AssetHubService;
13use distill_importer::{BoxedImporter, ImporterContext};
14use distill_schema::data;
15use file_asset_source::FileAssetSource;
16use tokio::sync::oneshot::{self, Receiver, Sender};
17
18use crate::{
19 artifact_cache::ArtifactCache, asset_hub, asset_hub_service, capnp_db::Environment,
20 error::Result, file_asset_source, file_tracker::FileTracker,
21};
22
23#[derive(Default)]
24pub struct ImporterMap(HashMap<String, Box<dyn BoxedImporter>>);
25
26impl ImporterMap {
27 pub fn insert(&mut self, ext: &str, importer: Box<dyn BoxedImporter>) {
28 self.0.insert(ext.to_lowercase(), importer);
29 }
30
31 pub fn get_by_path<'a>(&'a self, path: &Path) -> Option<&'a dyn BoxedImporter> {
32 let lower_extension = path
33 .extension()
34 .map(|s| s.to_str().unwrap().to_lowercase())
35 .unwrap_or_else(|| "".to_string());
36 self.0.get(lower_extension.as_str()).map(|i| i.as_ref())
37 }
38}
39
40struct AssetDaemonTables {
41 daemon_info: lmdb::Database,
44}
45impl AssetDaemonTables {
46 fn new(db: &Environment) -> Result<Self> {
47 Ok(Self {
48 daemon_info: db.create_db(Some("daemon_info"), lmdb::DatabaseFlags::default())?,
49 })
50 }
51}
52
/// Version number written to the `daemon_info` table by `check_db_version`.
/// When the stored value is missing or differs from this one, every database
/// listed in the environment's unnamed database is cleared.
const DAEMON_VERSION: u32 = 2;

/// Configuration of the asset daemon; consumed by [`AssetDaemon::run`].
pub struct AssetDaemon {
    /// Directory of the asset database. A `cache` subdirectory inside it holds
    /// the artifact cache database.
    pub db_dir: PathBuf,
    /// Address handed to the asset hub service when it is started.
    pub address: SocketAddr,
    /// Importers, looked up by file extension.
    pub importers: ImporterMap,
    /// Importer contexts handed to the file asset source.
    pub importer_contexts: Vec<Box<dyn ImporterContext>>,
    /// Directories handed to the file tracker; created at startup if missing.
    pub asset_dirs: Vec<PathBuf>,
}
61
/// Builds the list of importer contexts that `AssetDaemon::default` installs.
///
/// NOTE(review): `if_handle_enabled!` presumably gates the entry on the
/// loader's handle support being compiled in — confirm against `distill_loader`.
pub fn default_importer_contexts() -> Vec<Box<dyn ImporterContext + 'static>> {
    vec![distill_loader::if_handle_enabled!(Box::new(
        distill_loader::handle::HandleSerdeContextProvider
    ))]
}
67
68#[allow(unused_mut)]
69pub fn default_importers() -> Vec<(&'static str, Box<dyn BoxedImporter>)> {
70 let mut importers: Vec<(&'static str, Box<dyn BoxedImporter>)> = vec![];
71
72 distill_importer::if_serde_importers!(
73 importers.push(("ron", Box::new(distill_importer::RonImporter::default())))
74 );
75 importers
76}
77impl Default for AssetDaemon {
78 fn default() -> Self {
79 let mut importer_map = ImporterMap::default();
80 for (ext, importer) in default_importers() {
81 importer_map.insert(ext, importer);
82 }
83 Self {
84 db_dir: PathBuf::from(".assets_db"),
85 address: "127.0.0.1:9999".parse().unwrap(),
86 importers: importer_map,
87 importer_contexts: default_importer_contexts(),
88 asset_dirs: vec![PathBuf::from("assets")],
89 }
90 }
91}
92
93impl AssetDaemon {
94 pub fn with_db_path<P: AsRef<Path>>(mut self, path: P) -> Self {
95 self.db_dir = path.as_ref().to_owned();
96 self
97 }
98
99 pub fn with_address(mut self, address: SocketAddr) -> Self {
100 self.address = address;
101 self
102 }
103
104 pub fn with_importer<B>(mut self, ext: &str, importer: B) -> Self
105 where
106 B: BoxedImporter + 'static,
107 {
108 self.importers.insert(ext, Box::new(importer));
109 self
110 }
111
112 pub fn add_importer<B>(&mut self, ext: &str, importer: B)
113 where
114 B: BoxedImporter + 'static,
115 {
116 self.importers.insert(ext, Box::new(importer));
117 }
118
119 pub fn with_importers<B, I>(self, importers: I) -> Self
120 where
121 B: BoxedImporter + 'static,
122 I: IntoIterator<Item = (&'static str, B)>,
123 {
124 importers.into_iter().fold(self, |this, (ext, importer)| {
125 this.with_importer(ext, importer)
126 })
127 }
128
129 pub fn with_importers_boxed<I>(mut self, importers: I) -> Self
130 where
131 I: IntoIterator<Item = (&'static str, Box<dyn BoxedImporter + 'static>)>,
132 {
133 for (ext, importer) in importers.into_iter() {
134 self.importers.insert(ext, importer)
135 }
136 self
137 }
138
139 pub fn add_importers<B, I>(&mut self, importers: I)
140 where
141 B: BoxedImporter + 'static,
142 I: IntoIterator<Item = (&'static str, B)>,
143 {
144 for (ext, importer) in importers {
145 self.add_importer(ext, importer)
146 }
147 }
148
149 pub fn with_importer_context(mut self, context: Box<dyn ImporterContext>) -> Self {
150 self.importer_contexts.push(context);
151 self
152 }
153
154 pub fn with_importer_contexts<I>(mut self, contexts: I) -> Self
155 where
156 I: IntoIterator<Item = Box<dyn ImporterContext>>,
157 {
158 self.importer_contexts.extend(contexts);
159 self
160 }
161
162 pub fn with_asset_dirs(mut self, dirs: Vec<PathBuf>) -> Self {
163 self.asset_dirs = dirs;
164 self
165 }
166
167 pub fn run(self) -> (JoinHandle<()>, Sender<bool>) {
168 let (tx, rx) = oneshot::channel();
169
170 let handle = thread::spawn(|| {
171 let rpc_runtime = tokio::runtime::Builder::new_current_thread()
172 .enable_all()
173 .build()
174 .unwrap();
175 let local = tokio::task::LocalSet::new();
176
177 rpc_runtime.block_on(local.run_until(self.run_rpc_runtime(rx)))
178 });
179
180 (handle, tx)
181 }
182
183 async fn run_rpc_runtime(self, mut rx: Receiver<bool>) {
184 let cache_dir = self.db_dir.join("cache");
185 let _ = fs::create_dir(&self.db_dir);
186 let _ = fs::create_dir(&cache_dir);
187
188 for dir in self.asset_dirs.iter() {
189 let _ = fs::create_dir_all(dir);
190 }
191
192 let asset_db = match Environment::new(&self.db_dir) {
193 Ok(db) => db,
194 Err(crate::Error::Lmdb(lmdb::Error::Other(1455))) => {
195 Environment::with_map_size(&self.db_dir, 1 << 31)
196 .expect("failed to create asset db")
197 }
198 Err(err) => panic!("failed to create asset db: {:?}", err),
199 };
200 let asset_db = Arc::new(asset_db);
201
202 check_db_version(&asset_db)
203 .await
204 .expect("failed to check daemon version in asset db");
205
206 let to_watch = self.asset_dirs.iter().map(|p| p.to_str().unwrap());
207 let tracker = FileTracker::new(asset_db.clone(), to_watch);
208 let tracker = Arc::new(tracker);
209
210 let hub = AssetHub::new(asset_db.clone()).expect("failed to create asset hub");
211 let hub = Arc::new(hub);
212
213 let importers = Arc::new(self.importers);
214 let ctxs = Arc::new(self.importer_contexts);
215 let cache_db = match Environment::new(&cache_dir) {
216 Ok(db) => db,
217 Err(crate::Error::Lmdb(lmdb::Error::Other(1455))) => {
218 Environment::with_map_size(&cache_dir, 1 << 31).expect("failed to create cache db")
219 }
220 Err(err) => panic!("failed to create cache db: {:?}", err),
221 };
222 let cache_db = Arc::new(cache_db);
223 let artifact_cache =
224 ArtifactCache::new(&cache_db).expect("failed to create artifact cache");
225 let artifact_cache = Arc::new(artifact_cache);
226
227 let asset_source =
228 FileAssetSource::new(&tracker, &hub, &asset_db, &importers, &artifact_cache, ctxs)
229 .expect("failed to create asset source");
230
231 let asset_source = Arc::new(asset_source);
232
233 let service = AssetHubService::new(
234 asset_db.clone(),
235 hub.clone(),
236 asset_source.clone(),
237 tracker.clone(),
238 artifact_cache.clone(),
239 );
240 let service = Arc::new(service);
241
242 let addr = self.address;
243
244 let shutdown_tracker = tracker.clone();
245
246 let mut service_handle = tokio::task::spawn_local(async move { service.run(addr).await });
247 let mut tracker_handle = tokio::task::spawn_local(async move { tracker.run().await });
248 let mut asset_source_handle =
249 tokio::task::spawn_local(async move { asset_source.run().await });
250
251 log::info!("Starting Daemon Loop");
252 loop {
253 tokio::select! {
254 done = &mut service_handle => done.expect("ServiceHandle panicked"),
255 done = &mut tracker_handle => done.expect("FileTracker panicked"),
256 done = &mut asset_source_handle => done.expect("AssetSource panicked"),
257 done = &mut rx => match done {
258 Ok(_) => {
259 log::warn!("Shutting Down!");
260 shutdown_tracker.stop().await;
261 return;
266 }
267 Err(_) => continue,
268 }
269 }
270 }
271 }
272}
273
274#[allow(clippy::string_lit_as_bytes)]
275async fn check_db_version(env: &Environment) -> Result<()> {
276 use crate::capnp_db::DBTransaction;
277 let tables = AssetDaemonTables::new(env).expect("failed to create AssetDaemon tables");
278 let txn = env.ro_txn().await?;
279 let info_key = "daemon_info".as_bytes();
280 let daemon_info = txn.get::<data::daemon_info::Owned, &[u8]>(tables.daemon_info, &info_key)?;
281 let mut clear_db = true;
282 if let Some(info) = daemon_info {
283 let info = info.get()?;
284 if info.get_version() == DAEMON_VERSION {
285 clear_db = false;
286 }
287 }
288
289 if clear_db {
290 let unnamed_db = env
291 .create_db(None, lmdb::DatabaseFlags::default())
292 .expect("failed to open unnamed DB when checking daemon info");
293 use lmdb::Cursor;
294 let mut databases = Vec::new();
295 for iter_result in txn
296 .open_ro_cursor(unnamed_db)
297 .expect("failed to create cursor when checking daemon info")
298 .iter_start()
299 {
300 let (key, _) = iter_result
301 .expect("failed to start iteration for cursor when checking daemon info");
302 let db_name = std::str::from_utf8(key).expect("failed to parse db name");
303 databases.push(
304 env.create_db(Some(db_name), lmdb::DatabaseFlags::default())
305 .unwrap_or_else(|err| {
306 panic!("failed to open db with name {}: {}", db_name, err)
307 }),
308 );
309 }
310 let mut txn = env.rw_txn().await?;
311 for db in databases {
312 txn.clear_db(db).expect("failed to clear db");
313 }
314 txn.commit()?;
315 }
316 let mut txn = env.rw_txn().await?;
317 let mut value_builder = capnp::message::Builder::new_default();
318 {
319 let mut m = value_builder.init_root::<data::daemon_info::Builder<'_>>();
320 m.set_version(DAEMON_VERSION);
321 }
322 txn.put(tables.daemon_info, &info_key, &value_builder)?;
323 txn.commit()?;
324
325 Ok(())
326}