1use keket::{
2 database::path::AssetPath,
3 fetch::{AssetAwaitsResolution, AssetBytesAreReadyToProcess, AssetFetch},
4 third_party::anput::{
5 bundle::DynamicBundle, entity::Entity, query::Update,
6 third_party::intuicio_data::type_hash::TypeHash, world::World,
7 },
8};
9use reqwest::Url;
10use std::{
11 error::Error,
12 net::{SocketAddr, TcpStream},
13};
14use tungstenite::{WebSocket, connect, stream::MaybeTlsStream};
15
16pub mod third_party {
17 pub use reqwest;
18 pub use tungstenite;
19}
20
/// Marker component added to every bundle produced by
/// `ClientAssetFetch::load_bytes`, tagging the asset as loaded through the
/// client fetch engine.
pub struct AssetFromClient;
23
/// Asset fetch engine that downloads asset bytes over HTTP and reads the
/// paths of changed assets from a WebSocket connection to the same address.
pub struct ClientAssetFetch {
    /// Base URL (`http://{address}/assets/`) that asset paths are joined onto.
    root: Url,
    /// WebSocket connection (`ws://{address}/changes`) polled in `maintain`
    /// for text messages carrying asset paths.
    socket: WebSocket<MaybeTlsStream<TcpStream>>,
}
29
30impl ClientAssetFetch {
31 pub fn new(address: &str) -> Result<Self, Box<dyn Error>> {
40 address.parse::<SocketAddr>()?;
41 let root = format!("http://{address}/assets/").parse::<Url>()?;
42 let (socket, _) = connect(format!("ws://{address}/changes"))?;
43 if let MaybeTlsStream::Plain(tcp) = socket.get_ref() {
44 tcp.set_nonblocking(true)?;
45 }
46 Ok(Self { root, socket })
47 }
48}
49
50impl AssetFetch for ClientAssetFetch {
51 fn load_bytes(&self, path: AssetPath) -> Result<DynamicBundle, Box<dyn Error>> {
52 let url = self.root.join(path.path()).map_err(|error| {
53 format!(
54 "Failed to join root URL: `{}` with path: `{}`. Error: {}",
55 self.root,
56 path.path_with_meta(),
57 error
58 )
59 })?;
60 let mut response = reqwest::blocking::get(url.clone())
61 .map_err(|error| format!("Failed to get HTTP content from: `{url}`. Error: {error}"))?;
62 let mut bytes = vec![];
63 response.copy_to(&mut bytes).map_err(|error| {
64 format!("Failed to read bytes response from: `{url}`. Error: {error}")
65 })?;
66 let mut bundle = DynamicBundle::default();
67 let _ = bundle.add_component(AssetBytesAreReadyToProcess(bytes));
68 let _ = bundle.add_component(AssetFromClient);
69 let _ = bundle.add_component(url);
70 Ok(bundle)
71 }
72
73 fn maintain(&mut self, storage: &mut World) -> Result<(), Box<dyn Error>> {
74 if self.socket.can_read() {
75 let paths = std::iter::from_fn(|| self.socket.read().ok())
76 .filter(|message| message.is_text())
77 .filter_map(|message| message.to_text().ok().map(|path| path.to_owned()))
78 .collect::<Vec<_>>();
79 if !paths.is_empty() {
80 let to_refresh = storage
81 .query::<true, (Entity, Update<AssetPath>)>()
82 .filter(|(_, path)| paths.iter().any(|p| p == path.read().path()))
83 .inspect(|(_, path)| path.notify(storage))
84 .map(|(entity, _)| entity)
85 .collect::<Vec<_>>();
86 for entity in to_refresh {
87 let columns = storage
88 .row::<true>(entity)?
89 .columns()
90 .filter(|info| info.type_hash() != TypeHash::of::<AssetPath>())
91 .cloned()
92 .collect::<Vec<_>>();
93 storage.remove_raw(entity, columns)?;
94 storage.insert(entity, (AssetAwaitsResolution,))?;
95 }
96 }
97 }
98 Ok(())
99 }
100}