stardust_xr_fusion/
client.rs1use crate::node::NodeResult;
4use crate::root::{Root, RootAspect};
5use crate::{node::NodeError, scenegraph::NodeRegistry};
6use global_counter::primitive::exact::CounterU64;
7use stardust_xr::schemas::flex::flexbuffers::DeserializationError;
8use stardust_xr::{
9 client,
10 messenger::{self, MessengerError},
11 messenger::{MessageReceiver, MessageSender, MessageSenderHandle},
12};
13use std::future::Future;
14use std::path::Path;
15use std::sync::Arc;
16use std::time::Instant;
17use thiserror::Error;
18use tokio::net::UnixStream;
19use tokio::sync::{Notify, OnceCell};
20use tokio::task::JoinHandle;
21
22#[derive(Error, Debug)]
23pub enum ClientError {
24 #[error("Could not connect to the stardust server")]
25 ConnectionFailure,
26 #[error("Node error: {0}")]
27 NodeError(NodeError),
28}
29impl From<NodeError> for ClientError {
30 fn from(e: NodeError) -> Self {
31 ClientError::NodeError(e)
32 }
33}
34impl From<MessengerError> for ClientError {
35 fn from(e: MessengerError) -> Self {
36 ClientError::NodeError(NodeError::MessengerError { e })
37 }
38}
39impl From<String> for ClientError {
40 fn from(e: String) -> Self {
41 ClientError::NodeError(NodeError::ReturnedError { e })
42 }
43}
44impl From<DeserializationError> for ClientError {
45 fn from(e: DeserializationError) -> Self {
46 ClientError::NodeError(NodeError::Deserialization { e })
47 }
48}
49
50#[macro_export]
51macro_rules! project_local_resources {
52 ($relative_path:expr) => {
53 std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join($relative_path)
54 };
55}
56
57pub struct Client {
58 internal: Arc<ClientHandle>,
59 message_rx: MessageReceiver,
60 message_tx: MessageSender,
61}
62impl Client {
63 pub async fn connect() -> Result<Self, ClientError> {
65 let connection = client::connect()
66 .await
67 .map_err(|_| ClientError::ConnectionFailure)?;
68 let client = Client::from_connection(connection);
69 Ok(client)
70 }
71
72 pub fn from_connection(connection: UnixStream) -> Self {
74 let (message_tx, message_rx) = messenger::create(connection);
75 let internal = ClientHandle::new(&message_tx);
76 Client {
77 internal,
78 message_rx,
79 message_tx,
80 }
81 }
82
83 pub fn handle(&self) -> Arc<ClientHandle> {
84 self.internal.clone()
85 }
86 pub fn get_root(&self) -> &Root {
87 self.internal.get_root()
88 }
89
90 pub fn setup_resources(&self, paths: &[&Path]) -> NodeResult<()> {
91 let paths = paths.iter().map(|p| p.to_string_lossy().to_string());
92 let runtime_prefixes = std::env::var("STARDUST_RES_PREFIXES").ok();
93 let env_prefixes = runtime_prefixes
94 .as_deref()
95 .or(option_env!("STARDUST_RES_PREFIXES"))
96 .into_iter()
97 .flat_map(|f| f.split(':'))
98 .map(|p| p.to_string());
99
100 let prefixes = env_prefixes.chain(paths).collect::<Vec<String>>();
101 self.get_root().set_base_prefixes(&prefixes)
102 }
103
104 pub async fn dispatch(&mut self) -> Result<(), MessengerError> {
105 self.message_rx.dispatch(&self.internal.registry).await
107 }
108 pub async fn flush(&mut self) -> Result<(), MessengerError> {
110 self.message_tx.flush().await
111 }
112 pub async fn try_flush(&mut self) -> Result<(), MessengerError> {
114 self.message_tx.try_flush().await
115 }
116
117 pub async fn await_method<O, F: Future<Output = O>>(
118 &mut self,
119 f: F,
120 ) -> Result<O, MessengerError> {
121 let dispatch_loop = async {
122 loop {
123 self.try_flush().await?;
124 self.dispatch().await?;
125 }
126 };
127 tokio::select! {
128 e = dispatch_loop => e,
129 v = f => Ok(v),
130 }
131 }
132
133 pub async fn sync_event_loop<F: FnMut(&Arc<ClientHandle>, &mut ControlFlow)>(
134 &mut self,
135 mut f: F,
136 ) -> Result<(), MessengerError> {
137 let mut flow = ControlFlow::Wait;
138 let handle = self.handle();
139 loop {
140 self.try_flush().await?;
141 match flow {
142 ControlFlow::Poll => Ok(()),
143 ControlFlow::Wait => self.dispatch().await,
144 ControlFlow::WaitUntil(instant) => tokio::select! {
145 _ = tokio::time::sleep_until(tokio::time::Instant::from_std(instant)) => Ok(()),
146 r = self.dispatch() => r,
147 },
148 ControlFlow::Stop => break,
149 }?;
150 (f)(&handle, &mut flow);
151 }
152 Ok(())
153 }
154 pub fn async_event_loop(mut self) -> AsyncEventLoop {
155 let client_handle = self.handle();
156 let stop_notify = Arc::new(Notify::new());
157 let wait_notify = Arc::new(Notify::new());
158 let join_handle = tokio::spawn({
159 let stop_notify = stop_notify.clone();
160 let wait_notify = wait_notify.clone();
161 async move {
162 loop {
163 tokio::select! {
164 r = self.message_tx.flush() => r?,
165 r = self.message_rx.dispatch(&self.internal.registry) => r?,
166 _ = stop_notify.notified() => break,
167 }
168 wait_notify.notify_waiters();
169 }
170 Ok(Client {
171 internal: self.internal,
172 message_rx: self.message_rx,
173 message_tx: self.message_tx,
174 })
175 }
176 });
177 AsyncEventLoop {
178 client_handle,
179 stop_notify,
180 join_handle,
181 wait_notify,
182 }
183 }
184}
185
186#[derive(Clone, Debug)]
187pub struct AsyncEventHandle(Arc<Notify>);
188
189impl AsyncEventHandle {
190 pub async fn wait(&self) {
191 self.0.notified().await
192 }
193}
194
195pub struct AsyncEventLoop {
196 pub client_handle: Arc<ClientHandle>,
197 stop_notify: Arc<Notify>,
198 join_handle: JoinHandle<Result<Client, MessengerError>>,
199 wait_notify: Arc<Notify>,
200}
201impl AsyncEventLoop {
202 pub async fn stop(self) -> Result<Client, MessengerError> {
203 self.stop_notify.notify_waiters();
204 self.join_handle
205 .await
206 .map_err(|e| MessengerError::IOError(e.into()))?
207 }
208
209 pub fn get_event_handle(&self) -> AsyncEventHandle {
210 AsyncEventHandle(self.wait_notify.clone())
211 }
212}
213
214#[derive(Debug, Clone, Copy)]
215pub enum ControlFlow {
216 Poll,
217 Wait,
218 WaitUntil(Instant),
219 Stop,
220}
221impl ControlFlow {
222 pub fn poll(&mut self) {
223 *self = ControlFlow::Poll;
224 }
225 pub fn wait(&mut self) {
226 *self = ControlFlow::Wait;
227 }
228 pub fn wait_until(&mut self, instant: Instant) {
229 *self = ControlFlow::WaitUntil(instant);
230 }
231 pub fn stop(&mut self) {
232 *self = ControlFlow::Stop;
233 }
234}
235
236pub struct ClientHandle {
238 pub message_sender_handle: MessageSenderHandle,
239 pub(crate) registry: NodeRegistry,
240 id_counter: CounterU64,
241 root: OnceCell<Root>,
242}
243impl ClientHandle {
244 fn new(message_tx: &MessageSender) -> Arc<Self> {
245 let client = Arc::new_cyclic(|weak_handle| ClientHandle {
246 message_sender_handle: message_tx.handle(),
247 registry: NodeRegistry::new(weak_handle.clone()),
248 id_counter: CounterU64::new(u64::MAX / 2),
249 root: OnceCell::new(),
250 });
251 let _ = client.root.set(Root::from_id(&client, 0, true));
252 client
253 }
254
255 pub fn get_root(&self) -> &Root {
257 self.root.get().as_ref().unwrap()
258 }
259
260 pub fn generate_id(&self) -> u64 {
261 self.id_counter.inc()
262 }
263}
264impl Drop for ClientHandle {
265 fn drop(&mut self) {
266 let _ = self.get_root().disconnect();
267 }
268}
269
270#[tokio::test]
271async fn fusion_client_connect() {
272 Client::connect().await.unwrap();
273}
274
275#[tokio::test]
276async fn fusion_client_life_cycle() {
277 use crate::root::*;
278 let mut client = Client::connect().await.unwrap();
279 tokio::task::spawn(async {
280 tokio::time::sleep(core::time::Duration::from_secs(5)).await;
281 panic!("Timed Out");
282 });
283 client
284 .sync_event_loop(|client, flow| {
285 while let Some(event) = client.get_root().recv_root_event() {
286 match event {
287 RootEvent::Ping { response } => {
288 response.send_ok(());
289 }
290 RootEvent::Frame { info: _ } => {
291 println!("Got frame event");
292 flow.stop();
293 }
294 RootEvent::SaveState { response } => response.send_ok(ClientState::default()),
295 }
296 }
297 })
298 .await
299 .unwrap();
300}