Skip to main content

keket_client/
lib.rs

1use keket::{
2    database::path::AssetPath,
3    fetch::{AssetAwaitsResolution, AssetBytesAreReadyToProcess, AssetFetch},
4    third_party::anput::{
5        bundle::DynamicBundle, entity::Entity, query::Update,
6        third_party::intuicio_data::type_hash::TypeHash, world::World,
7    },
8};
9use reqwest::Url;
10use std::{
11    error::Error,
12    net::{SocketAddr, TcpStream},
13};
14use tungstenite::{WebSocket, connect, stream::MaybeTlsStream};
15
16pub mod third_party {
17    pub use reqwest;
18    pub use tungstenite;
19}
20
/// A marker struct indicating an asset originates from asset server client.
///
/// It carries no data; [`ClientAssetFetch`] attaches it to every bundle it
/// produces when loading asset bytes. The common traits are derived so the
/// marker can be printed, copied and compared like any other unit value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct AssetFromClient;
23
24/// Client asset fetch from asset server.
25pub struct ClientAssetFetch {
26    root: Url,
27    socket: WebSocket<MaybeTlsStream<TcpStream>>,
28}
29
30impl ClientAssetFetch {
31    /// Creates a new instance of `ClientAssetFetch` with the given address.
32    ///
33    /// # Arguments
34    /// - `address`: A string slice representing the server IP address.
35    ///
36    /// # Returns
37    /// - `Ok(ClientAssetFetch)` if the initialization is successful.
38    /// - `Err(Box<dyn Error>)` if any parsing errors occur.
39    pub fn new(address: &str) -> Result<Self, Box<dyn Error>> {
40        address.parse::<SocketAddr>()?;
41        let root = format!("http://{address}/assets/").parse::<Url>()?;
42        let (socket, _) = connect(format!("ws://{address}/changes"))?;
43        if let MaybeTlsStream::Plain(tcp) = socket.get_ref() {
44            tcp.set_nonblocking(true)?;
45        }
46        Ok(Self { root, socket })
47    }
48}
49
50impl AssetFetch for ClientAssetFetch {
51    fn load_bytes(&self, path: AssetPath) -> Result<DynamicBundle, Box<dyn Error>> {
52        let url = self.root.join(path.path()).map_err(|error| {
53            format!(
54                "Failed to join root URL: `{}` with path: `{}`. Error: {}",
55                self.root,
56                path.path_with_meta(),
57                error
58            )
59        })?;
60        let mut response = reqwest::blocking::get(url.clone())
61            .map_err(|error| format!("Failed to get HTTP content from: `{url}`. Error: {error}"))?;
62        let mut bytes = vec![];
63        response.copy_to(&mut bytes).map_err(|error| {
64            format!("Failed to read bytes response from: `{url}`. Error: {error}")
65        })?;
66        let mut bundle = DynamicBundle::default();
67        let _ = bundle.add_component(AssetBytesAreReadyToProcess(bytes));
68        let _ = bundle.add_component(AssetFromClient);
69        let _ = bundle.add_component(url);
70        Ok(bundle)
71    }
72
73    fn maintain(&mut self, storage: &mut World) -> Result<(), Box<dyn Error>> {
74        if self.socket.can_read() {
75            let paths = std::iter::from_fn(|| self.socket.read().ok())
76                .filter(|message| message.is_text())
77                .filter_map(|message| message.to_text().ok().map(|path| path.to_owned()))
78                .collect::<Vec<_>>();
79            if !paths.is_empty() {
80                let to_refresh = storage
81                    .query::<true, (Entity, Update<AssetPath>)>()
82                    .filter(|(_, path)| paths.iter().any(|p| p == path.read().path()))
83                    .inspect(|(_, path)| path.notify(storage))
84                    .map(|(entity, _)| entity)
85                    .collect::<Vec<_>>();
86                for entity in to_refresh {
87                    let columns = storage
88                        .row::<true>(entity)?
89                        .columns()
90                        .filter(|info| info.type_hash() != TypeHash::of::<AssetPath>())
91                        .cloned()
92                        .collect::<Vec<_>>();
93                    storage.remove_raw(entity, columns)?;
94                    storage.insert(entity, (AssetAwaitsResolution,))?;
95                }
96            }
97        }
98        Ok(())
99    }
100}