distill_daemon/daemon.rs

use std::{
    collections::HashMap,
    fs,
    net::SocketAddr,
    path::{Path, PathBuf},
    sync::Arc,
    thread,
    thread::JoinHandle,
};

use asset_hub::AssetHub;
use asset_hub_service::AssetHubService;
use distill_importer::{BoxedImporter, ImporterContext};
use distill_schema::data;
use file_asset_source::FileAssetSource;
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::{
    artifact_cache::ArtifactCache, asset_hub, asset_hub_service, capnp_db::Environment,
    error::Result, file_asset_source, file_tracker::FileTracker,
};

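/// Maps lowercase file extensions to the importer registered for them.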
#[derive(Default)]
pub struct ImporterMap(HashMap<String, Box<dyn BoxedImporter>>);

impl ImporterMap {
    pub fn insert(&mut self, ext: &str, importer: Box<dyn BoxedImporter>) {
        self.0.insert(ext.to_lowercase(), importer);
    }

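    /// Looks up an importer by the lowercased extension of `path`. Returns `None` when the
    /// path has no (UTF-8) extension or no importer is registered for it.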
    pub fn get_by_path<'a>(&'a self, path: &Path) -> Option<&'a dyn BoxedImporter> {
        let lower_extension = path
            .extension()
            .and_then(|s| s.to_str())
            .map(|s| s.to_lowercase())
            .unwrap_or_default();
        self.0.get(lower_extension.as_str()).map(|i| i.as_ref())
    }
}

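/// LMDB tables used directly by the daemon.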
struct AssetDaemonTables {
    /// Contains metadata about the daemon version and settings
    /// String -> Blob
    daemon_info: lmdb::Database,
}

impl AssetDaemonTables {
    fn new(db: &Environment) -> Result<Self> {
        Ok(Self {
            daemon_info: db.create_db(Some("daemon_info"), lmdb::DatabaseFlags::default())?,
        })
    }
}

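/// Version of the daemon's on-disk data layout. `check_db_version` clears every database in
/// the environment when the stored version does not match this constant.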
const DAEMON_VERSION: u32 = 2;
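
/// Builder-style configuration for the asset daemon: where the LMDB databases live, the
/// address the RPC service listens on, the registered importers and importer contexts, and
/// the asset directories to watch for changes.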
pub struct AssetDaemon {
    pub db_dir: PathBuf,
    pub address: SocketAddr,
    pub importers: ImporterMap,
    pub importer_contexts: Vec<Box<dyn ImporterContext>>,
    pub asset_dirs: Vec<PathBuf>,
}

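/// Importer contexts enabled by default: the handle serde context provider, when handle
/// support is compiled into `distill_loader` (guarded by `if_handle_enabled!`).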
pub fn default_importer_contexts() -> Vec<Box<dyn ImporterContext + 'static>> {
    vec![distill_loader::if_handle_enabled!(Box::new(
        distill_loader::handle::HandleSerdeContextProvider
    ))]
}

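/// Importers registered by default: the RON importer, when serde importer support is
/// compiled into `distill_importer` (guarded by `if_serde_importers!`).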
#[allow(unused_mut)]
pub fn default_importers() -> Vec<(&'static str, Box<dyn BoxedImporter>)> {
    let mut importers: Vec<(&'static str, Box<dyn BoxedImporter>)> = vec![];

    distill_importer::if_serde_importers!(
        importers.push(("ron", Box::new(distill_importer::RonImporter::default())))
    );
    importers
}

impl Default for AssetDaemon {
    fn default() -> Self {
        let mut importer_map = ImporterMap::default();
        for (ext, importer) in default_importers() {
            importer_map.insert(ext, importer);
        }
        Self {
            db_dir: PathBuf::from(".assets_db"),
            address: "127.0.0.1:9999".parse().unwrap(),
            importers: importer_map,
            importer_contexts: default_importer_contexts(),
            asset_dirs: vec![PathBuf::from("assets")],
        }
    }
}

impl AssetDaemon {
    pub fn with_db_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.db_dir = path.as_ref().to_owned();
        self
    }

    pub fn with_address(mut self, address: SocketAddr) -> Self {
        self.address = address;
        self
    }

    pub fn with_importer<B>(mut self, ext: &str, importer: B) -> Self
    where
        B: BoxedImporter + 'static,
    {
        self.importers.insert(ext, Box::new(importer));
        self
    }

    pub fn add_importer<B>(&mut self, ext: &str, importer: B)
    where
        B: BoxedImporter + 'static,
    {
        self.importers.insert(ext, Box::new(importer));
    }

    pub fn with_importers<B, I>(self, importers: I) -> Self
    where
        B: BoxedImporter + 'static,
        I: IntoIterator<Item = (&'static str, B)>,
    {
        importers.into_iter().fold(self, |this, (ext, importer)| {
            this.with_importer(ext, importer)
        })
    }

    pub fn with_importers_boxed<I>(mut self, importers: I) -> Self
    where
        I: IntoIterator<Item = (&'static str, Box<dyn BoxedImporter + 'static>)>,
    {
        for (ext, importer) in importers.into_iter() {
            self.importers.insert(ext, importer);
        }
        self
    }

    pub fn add_importers<B, I>(&mut self, importers: I)
    where
        B: BoxedImporter + 'static,
        I: IntoIterator<Item = (&'static str, B)>,
    {
        for (ext, importer) in importers {
            self.add_importer(ext, importer);
        }
    }

    pub fn with_importer_context(mut self, context: Box<dyn ImporterContext>) -> Self {
        self.importer_contexts.push(context);
        self
    }

    pub fn with_importer_contexts<I>(mut self, contexts: I) -> Self
    where
        I: IntoIterator<Item = Box<dyn ImporterContext>>,
    {
        self.importer_contexts.extend(contexts);
        self
    }

    pub fn with_asset_dirs(mut self, dirs: Vec<PathBuf>) -> Self {
        self.asset_dirs = dirs;
        self
    }

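    /// Spawns the daemon on a dedicated thread driving a single-threaded tokio runtime and
    /// returns that thread's `JoinHandle` together with a oneshot `Sender`; sending any value
    /// over the sender requests shutdown.
    ///
    /// A minimal usage sketch (the paths shown are illustrative):
    ///
    /// ```ignore
    /// let (handle, shutdown) = AssetDaemon::default()
    ///     .with_db_path(".assets_db")
    ///     .with_asset_dirs(vec!["assets".into()])
    ///     .run();
    /// // ... later, request shutdown and wait for the daemon thread to finish.
    /// let _ = shutdown.send(true);
    /// let _ = handle.join();
    /// ```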
    pub fn run(self) -> (JoinHandle<()>, Sender<bool>) {
        let (tx, rx) = oneshot::channel();

        let handle = thread::spawn(|| {
            let rpc_runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let local = tokio::task::LocalSet::new();

            rpc_runtime.block_on(local.run_until(self.run_rpc_runtime(rx)))
        });

        (handle, tx)
    }

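    /// Daemon main loop: creates the asset and cache LMDB environments, verifies the stored
    /// daemon version, then drives the RPC service, file tracker, and file asset source as
    /// local tasks until one of them finishes or a shutdown request arrives on `rx`.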
    async fn run_rpc_runtime(self, mut rx: Receiver<bool>) {
        let cache_dir = self.db_dir.join("cache");
        let _ = fs::create_dir(&self.db_dir);
        let _ = fs::create_dir(&cache_dir);

        for dir in self.asset_dirs.iter() {
            let _ = fs::create_dir_all(dir);
        }

        let asset_db = match Environment::new(&self.db_dir) {
            Ok(db) => db,
            Err(crate::Error::Lmdb(lmdb::Error::Other(1455))) => {
                Environment::with_map_size(&self.db_dir, 1 << 31)
                    .expect("failed to create asset db")
            }
            Err(err) => panic!("failed to create asset db: {:?}", err),
        };
        let asset_db = Arc::new(asset_db);

        check_db_version(&asset_db)
            .await
            .expect("failed to check daemon version in asset db");

        let to_watch = self.asset_dirs.iter().map(|p| p.to_str().unwrap());
        let tracker = FileTracker::new(asset_db.clone(), to_watch);
        let tracker = Arc::new(tracker);

        let hub = AssetHub::new(asset_db.clone()).expect("failed to create asset hub");
        let hub = Arc::new(hub);

        let importers = Arc::new(self.importers);
        let ctxs = Arc::new(self.importer_contexts);
        let cache_db = match Environment::new(&cache_dir) {
            Ok(db) => db,
            Err(crate::Error::Lmdb(lmdb::Error::Other(1455))) => {
                Environment::with_map_size(&cache_dir, 1 << 31).expect("failed to create cache db")
            }
            Err(err) => panic!("failed to create cache db: {:?}", err),
        };
        let cache_db = Arc::new(cache_db);
        let artifact_cache =
            ArtifactCache::new(&cache_db).expect("failed to create artifact cache");
        let artifact_cache = Arc::new(artifact_cache);

        let asset_source =
            FileAssetSource::new(&tracker, &hub, &asset_db, &importers, &artifact_cache, ctxs)
                .expect("failed to create asset source");

        let asset_source = Arc::new(asset_source);

        let service = AssetHubService::new(
            asset_db.clone(),
            hub.clone(),
            asset_source.clone(),
            tracker.clone(),
            artifact_cache.clone(),
        );
        let service = Arc::new(service);

        let addr = self.address;

        let shutdown_tracker = tracker.clone();

        let mut service_handle = tokio::task::spawn_local(async move { service.run(addr).await });
        let mut tracker_handle = tokio::task::spawn_local(async move { tracker.run().await });
        let mut asset_source_handle =
            tokio::task::spawn_local(async move { asset_source.run().await });

        log::info!("Starting Daemon Loop");
        loop {
            tokio::select! {
                done = &mut service_handle => done.expect("ServiceHandle panicked"),
                done = &mut tracker_handle => done.expect("FileTracker panicked"),
                done = &mut asset_source_handle => done.expect("AssetSource panicked"),
                done = &mut rx => match done {
                    Ok(_) => {
                        log::warn!("Shutting Down!");
                        shutdown_tracker.stop().await;
                        // shutdown_service.stop().await;
                        // shutdown_asset_source.stop().await;
                        // any value on this channel means shutdown
                        // TODO: better shutdown
                        return;
                    }
                    Err(_) => continue,
                }
            }
        }
    }
}

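/// Compares the version stored in the `daemon_info` table against `DAEMON_VERSION`. If it is
/// missing or different, every named database in the environment is cleared; the current
/// version is then written back.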
#[allow(clippy::string_lit_as_bytes)]
async fn check_db_version(env: &Environment) -> Result<()> {
    use crate::capnp_db::DBTransaction;
    let tables = AssetDaemonTables::new(env).expect("failed to create AssetDaemon tables");
    let txn = env.ro_txn().await?;
    let info_key = "daemon_info".as_bytes();
    let daemon_info = txn.get::<data::daemon_info::Owned, &[u8]>(tables.daemon_info, &info_key)?;
    let mut clear_db = true;
    if let Some(info) = daemon_info {
        let info = info.get()?;
        if info.get_version() == DAEMON_VERSION {
            clear_db = false;
        }
    }

    if clear_db {
        let unnamed_db = env
            .create_db(None, lmdb::DatabaseFlags::default())
            .expect("failed to open unnamed DB when checking daemon info");
        use lmdb::Cursor;
        let mut databases = Vec::new();
        for iter_result in txn
            .open_ro_cursor(unnamed_db)
            .expect("failed to create cursor when checking daemon info")
            .iter_start()
        {
            let (key, _) = iter_result
                .expect("failed to start iteration for cursor when checking daemon info");
            let db_name = std::str::from_utf8(key).expect("failed to parse db name");
            databases.push(
                env.create_db(Some(db_name), lmdb::DatabaseFlags::default())
                    .unwrap_or_else(|err| {
                        panic!("failed to open db with name {}: {}", db_name, err)
                    }),
            );
        }
        let mut txn = env.rw_txn().await?;
        for db in databases {
            txn.clear_db(db).expect("failed to clear db");
        }
        txn.commit()?;
    }
    let mut txn = env.rw_txn().await?;
    let mut value_builder = capnp::message::Builder::new_default();
    {
        let mut m = value_builder.init_root::<data::daemon_info::Builder<'_>>();
        m.set_version(DAEMON_VERSION);
    }
    txn.put(tables.daemon_info, &info_key, &value_builder)?;
    txn.commit()?;

    Ok(())
}