stardust_xr_fusion/
client.rs

1//! Your connection to the Stardust server and other essentials.
2
3use crate::node::NodeResult;
4use crate::root::{Root, RootAspect};
5use crate::{node::NodeError, scenegraph::NodeRegistry};
6use global_counter::primitive::exact::CounterU64;
7use stardust_xr::schemas::flex::flexbuffers::DeserializationError;
8use stardust_xr::{
9	client,
10	messenger::{self, MessengerError},
11	messenger::{MessageReceiver, MessageSender, MessageSenderHandle},
12};
13use std::future::Future;
14use std::path::Path;
15use std::sync::Arc;
16use std::time::Instant;
17use thiserror::Error;
18use tokio::net::UnixStream;
19use tokio::sync::{Notify, OnceCell};
20use tokio::task::JoinHandle;
21
22#[derive(Error, Debug)]
23pub enum ClientError {
24	#[error("Could not connect to the stardust server")]
25	ConnectionFailure,
26	#[error("Node error: {0}")]
27	NodeError(NodeError),
28}
29impl From<NodeError> for ClientError {
30	fn from(e: NodeError) -> Self {
31		ClientError::NodeError(e)
32	}
33}
34impl From<MessengerError> for ClientError {
35	fn from(e: MessengerError) -> Self {
36		ClientError::NodeError(NodeError::MessengerError { e })
37	}
38}
39impl From<String> for ClientError {
40	fn from(e: String) -> Self {
41		ClientError::NodeError(NodeError::ReturnedError { e })
42	}
43}
44impl From<DeserializationError> for ClientError {
45	fn from(e: DeserializationError) -> Self {
46		ClientError::NodeError(NodeError::Deserialization { e })
47	}
48}
49
/// Expands to a `PathBuf`: the `CARGO_MANIFEST_DIR` of the crate being compiled at the
/// expansion site, joined with `$relative_path`.
#[macro_export]
macro_rules! project_local_resources {
	($relative_path:expr) => {
		std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join($relative_path)
	};
}
56
57pub struct Client {
58	internal: Arc<ClientHandle>,
59	message_rx: MessageReceiver,
60	message_tx: MessageSender,
61}
62impl Client {
63	/// Try to connect to the server, return messenger halves for manually setting up the event loop.
64	pub async fn connect() -> Result<Self, ClientError> {
65		let connection = client::connect()
66			.await
67			.map_err(|_| ClientError::ConnectionFailure)?;
68		let client = Client::from_connection(connection);
69		Ok(client)
70	}
71
72	/// Create a client and messenger halves from an established tokio async `UnixStream` for manually setting up the event loop.
73	pub fn from_connection(connection: UnixStream) -> Self {
74		let (message_tx, message_rx) = messenger::create(connection);
75		let internal = ClientHandle::new(&message_tx);
76		Client {
77			internal,
78			message_rx,
79			message_tx,
80		}
81	}
82
83	pub fn handle(&self) -> Arc<ClientHandle> {
84		self.internal.clone()
85	}
86	pub fn get_root(&self) -> &Root {
87		self.internal.get_root()
88	}
89
90	pub fn setup_resources(&self, paths: &[&Path]) -> NodeResult<()> {
91		let paths = paths.iter().map(|p| p.to_string_lossy().to_string());
92		let runtime_prefixes = std::env::var("STARDUST_RES_PREFIXES").ok();
93		let env_prefixes = runtime_prefixes
94			.as_deref()
95			.or(option_env!("STARDUST_RES_PREFIXES"))
96			.into_iter()
97			.flat_map(|f| f.split(':'))
98			.map(|p| p.to_string());
99
100		let prefixes = env_prefixes.chain(paths).collect::<Vec<String>>();
101		self.get_root().set_base_prefixes(&prefixes)
102	}
103
104	pub async fn dispatch(&mut self) -> Result<(), MessengerError> {
105		// Use the new registry for better performance, but fallback to legacy scenegraph
106		self.message_rx.dispatch(&self.internal.registry).await
107	}
108	/// this one will wait until there's some message to send
109	pub async fn flush(&mut self) -> Result<(), MessengerError> {
110		self.message_tx.flush().await
111	}
112	/// this one will try to send any messages if they're in the queue and if not return immediately
113	pub async fn try_flush(&mut self) -> Result<(), MessengerError> {
114		self.message_tx.try_flush().await
115	}
116
117	pub async fn await_method<O, F: Future<Output = O>>(
118		&mut self,
119		f: F,
120	) -> Result<O, MessengerError> {
121		let dispatch_loop = async {
122			loop {
123				self.try_flush().await?;
124				self.dispatch().await?;
125			}
126		};
127		tokio::select! {
128			e = dispatch_loop => e,
129			v = f => Ok(v),
130		}
131	}
132
133	pub async fn sync_event_loop<F: FnMut(&Arc<ClientHandle>, &mut ControlFlow)>(
134		&mut self,
135		mut f: F,
136	) -> Result<(), MessengerError> {
137		let mut flow = ControlFlow::Wait;
138		let handle = self.handle();
139		loop {
140			self.try_flush().await?;
141			match flow {
142				ControlFlow::Poll => Ok(()),
143				ControlFlow::Wait => self.dispatch().await,
144				ControlFlow::WaitUntil(instant) => tokio::select! {
145					_ = tokio::time::sleep_until(tokio::time::Instant::from_std(instant)) => Ok(()),
146					r = self.dispatch() => r,
147				},
148				ControlFlow::Stop => break,
149			}?;
150			(f)(&handle, &mut flow);
151		}
152		Ok(())
153	}
154	pub fn async_event_loop(mut self) -> AsyncEventLoop {
155		let client_handle = self.handle();
156		let stop_notify = Arc::new(Notify::new());
157		let wait_notify = Arc::new(Notify::new());
158		let join_handle = tokio::spawn({
159			let stop_notify = stop_notify.clone();
160			let wait_notify = wait_notify.clone();
161			async move {
162				loop {
163					tokio::select! {
164						r = self.message_tx.flush() => r?,
165						r = self.message_rx.dispatch(&self.internal.registry) => r?,
166						_ = stop_notify.notified() => break,
167					}
168					wait_notify.notify_waiters();
169				}
170				Ok(Client {
171					internal: self.internal,
172					message_rx: self.message_rx,
173					message_tx: self.message_tx,
174				})
175			}
176		});
177		AsyncEventLoop {
178			client_handle,
179			stop_notify,
180			join_handle,
181			wait_notify,
182		}
183	}
184}
185
186#[derive(Clone, Debug)]
187pub struct AsyncEventHandle(Arc<Notify>);
188
189impl AsyncEventHandle {
190	pub async fn wait(&self) {
191		self.0.notified().await
192	}
193}
194
195pub struct AsyncEventLoop {
196	pub client_handle: Arc<ClientHandle>,
197	stop_notify: Arc<Notify>,
198	join_handle: JoinHandle<Result<Client, MessengerError>>,
199	wait_notify: Arc<Notify>,
200}
201impl AsyncEventLoop {
202	pub async fn stop(self) -> Result<Client, MessengerError> {
203		self.stop_notify.notify_waiters();
204		self.join_handle
205			.await
206			.map_err(|e| MessengerError::IOError(e.into()))?
207	}
208
209	pub fn get_event_handle(&self) -> AsyncEventHandle {
210		AsyncEventHandle(self.wait_notify.clone())
211	}
212}
213
/// Tells `Client::sync_event_loop` what to do before it calls the callback again.
#[derive(Debug, Clone, Copy)]
pub enum ControlFlow {
	/// Call the callback again right away.
	Poll,
	/// Await a dispatch before calling the callback again.
	Wait,
	/// Await a dispatch or the given instant, whichever comes first.
	WaitUntil(Instant),
	/// Leave the event loop.
	Stop,
}
impl ControlFlow {
	/// Switch to [`ControlFlow::Poll`].
	pub fn poll(&mut self) {
		*self = Self::Poll;
	}
	/// Switch to [`ControlFlow::Wait`].
	pub fn wait(&mut self) {
		*self = Self::Wait;
	}
	/// Switch to [`ControlFlow::WaitUntil`] with `instant` as the deadline.
	pub fn wait_until(&mut self, instant: Instant) {
		*self = Self::WaitUntil(instant);
	}
	/// Switch to [`ControlFlow::Stop`].
	pub fn stop(&mut self) {
		*self = Self::Stop;
	}
}
235
236/// Your connection to the Stardust server.
237pub struct ClientHandle {
238	pub message_sender_handle: MessageSenderHandle,
239	pub(crate) registry: NodeRegistry,
240	id_counter: CounterU64,
241	root: OnceCell<Root>,
242}
243impl ClientHandle {
244	fn new(message_tx: &MessageSender) -> Arc<Self> {
245		let client = Arc::new_cyclic(|weak_handle| ClientHandle {
246			message_sender_handle: message_tx.handle(),
247			registry: NodeRegistry::new(weak_handle.clone()),
248			id_counter: CounterU64::new(u64::MAX / 2),
249			root: OnceCell::new(),
250		});
251		let _ = client.root.set(Root::from_id(&client, 0, true));
252		client
253	}
254
255	/// Get a reference to the client's root node, a spatial that exists where the client was spawned.
256	pub fn get_root(&self) -> &Root {
257		self.root.get().as_ref().unwrap()
258	}
259
260	pub fn generate_id(&self) -> u64 {
261		self.id_counter.inc()
262	}
263}
264impl Drop for ClientHandle {
265	fn drop(&mut self) {
266		let _ = self.get_root().disconnect();
267	}
268}
269
270#[tokio::test]
271async fn fusion_client_connect() {
272	Client::connect().await.unwrap();
273}
274
275#[tokio::test]
276async fn fusion_client_life_cycle() {
277	use crate::root::*;
278	let mut client = Client::connect().await.unwrap();
279	tokio::task::spawn(async {
280		tokio::time::sleep(core::time::Duration::from_secs(5)).await;
281		panic!("Timed Out");
282	});
283	client
284		.sync_event_loop(|client, flow| {
285			while let Some(event) = client.get_root().recv_root_event() {
286				match event {
287					RootEvent::Ping { response } => {
288						response.send_ok(());
289					}
290					RootEvent::Frame { info: _ } => {
291						println!("Got frame event");
292						flow.stop();
293					}
294					RootEvent::SaveState { response } => response.send_ok(ClientState::default()),
295				}
296			}
297		})
298		.await
299		.unwrap();
300}