1use crate::{
2 document::init_document,
3 element::LiveviewElement,
4 events::SerializedHtmlEventConverter,
5 query::{QueryEngine, QueryResult},
6 LiveViewError,
7};
8
9use dioxus_core::{provide_context, Element, Event, ScopeId, VirtualDom};
10use dioxus_html::{EventData, HtmlEvent, PlatformEventData};
11use dioxus_interpreter_js::MutationState;
12use futures_util::{pin_mut, SinkExt, StreamExt};
13use serde::Serialize;
14use std::{any::Any, rc::Rc};
15use tokio_util::task::LocalPoolHandle;
16
17#[derive(Clone)]
18pub struct LiveViewPool {
19 pub(crate) pool: LocalPoolHandle,
20}
21
22impl Default for LiveViewPool {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl LiveViewPool {
29 pub fn new() -> Self {
30 dioxus_html::set_event_converter(Box::new(SerializedHtmlEventConverter));
32
33 LiveViewPool {
34 pool: LocalPoolHandle::new(
35 std::thread::available_parallelism()
36 .map(usize::from)
37 .unwrap_or(1),
38 ),
39 }
40 }
41
42 pub async fn launch(
43 &self,
44 ws: impl LiveViewSocket,
45 app: fn() -> Element,
46 ) -> Result<(), LiveViewError> {
47 self.launch_with_props(ws, |app| app(), app).await
48 }
49
50 pub async fn launch_with_props<T: Clone + Send + 'static>(
51 &self,
52 ws: impl LiveViewSocket,
53 app: fn(T) -> Element,
54 props: T,
55 ) -> Result<(), LiveViewError> {
56 self.launch_virtualdom(ws, move || VirtualDom::new_with_props(app, props))
57 .await
58 }
59
60 pub async fn launch_virtualdom<F: FnOnce() -> VirtualDom + Send + 'static>(
61 &self,
62 ws: impl LiveViewSocket,
63 make_app: F,
64 ) -> Result<(), LiveViewError> {
65 match self.pool.spawn_pinned(move || run(make_app(), ws)).await {
66 Ok(Ok(_)) => Ok(()),
67 Ok(Err(e)) => Err(e),
68 Err(_) => Err(LiveViewError::SendingFailed),
69 }
70 }
71}
72
73pub trait LiveViewSocket:
101 SinkExt<Vec<u8>, Error = LiveViewError>
102 + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
103 + Send
104 + 'static
105{
106}
107
108impl<S> LiveViewSocket for S where
109 S: SinkExt<Vec<u8>, Error = LiveViewError>
110 + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
111 + Send
112 + 'static
113{
114}
115
116pub async fn run(mut vdom: VirtualDom, ws: impl LiveViewSocket) -> Result<(), LiveViewError> {
124 #[cfg(all(feature = "devtools", debug_assertions))]
125 let mut hot_reload_rx = {
126 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
127 dioxus_devtools::connect(move |template| _ = tx.send(template));
128 rx
129 };
130
131 let mut mutations = MutationState::default();
132
133 let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
135 let query_engine = QueryEngine::new(query_tx);
136 vdom.runtime().in_scope(ScopeId::ROOT, || {
137 provide_context(query_engine.clone());
138 init_document();
139 });
140
141 pin_mut!(ws);
143
144 if let Some(edits) = {
145 vdom.rebuild(&mut mutations);
146 take_edits(&mut mutations)
147 } {
148 ws.send(edits).await?;
150 }
151
152 #[derive(serde::Deserialize, Debug)]
155 #[serde(tag = "method", content = "params")]
156 enum IpcMessage {
157 #[serde(rename = "user_event")]
158 Event(Box<HtmlEvent>),
159 #[serde(rename = "query")]
160 Query(QueryResult),
161 }
162
163 loop {
164 #[cfg(all(feature = "devtools", debug_assertions))]
165 let hot_reload_wait = hot_reload_rx.recv();
166 #[cfg(not(all(feature = "devtools", debug_assertions)))]
167 let hot_reload_wait: std::future::Pending<Option<()>> = std::future::pending();
168
169 tokio::select! {
170 _ = vdom.wait_for_work() => {}
172
173 evt = ws.next() => {
174 match evt.as_ref().map(|o| o.as_deref()) {
175 Some(Ok(b"__ping__")) => {
177 ws.send(text_frame("__pong__")).await?;
178 }
179 Some(Ok(evt)) => {
180 if let Ok(message) = serde_json::from_str::<IpcMessage>(&String::from_utf8_lossy(evt)) {
181 match message {
182 IpcMessage::Event(evt) => {
183 let event = if let EventData::Mounted = &evt.data {
185 let element = LiveviewElement::new(evt.element, query_engine.clone());
186 Event::new(
187 Rc::new(PlatformEventData::new(Box::new(element))) as Rc<dyn Any>,
188 evt.bubbles,
189 )
190 } else {
191 Event::new(
192 evt.data.into_any(),
193 evt.bubbles,
194 )
195 };
196 vdom.runtime().handle_event(
197 &evt.name,
198 event,
199 evt.element,
200 );
201 }
202 IpcMessage::Query(result) => {
203 query_engine.send(result);
204 },
205 }
206 }
207 }
208 Some(Err(_e)) => {}
210 None => return Ok(()),
211 }
212 }
213
214 Some(query) = query_rx.recv() => {
216 ws.send(text_frame(&serde_json::to_string(&ClientUpdate::Query(query)).unwrap())).await?;
217 }
218
219 Some(msg) = hot_reload_wait => {
220 #[cfg(all(feature = "devtools", debug_assertions))]
221 match msg {
222 dioxus_devtools::DevserverMsg::HotReload(msg)=> {
223 dioxus_devtools::apply_changes(&vdom, &msg);
224 }
225 dioxus_devtools::DevserverMsg::Shutdown => {
226 std::process::exit(0);
227 },
228 dioxus_devtools::DevserverMsg::FullReloadCommand
229 | dioxus_devtools::DevserverMsg::FullReloadStart
230 | dioxus_devtools::DevserverMsg::FullReloadFailed => {
231 },
234 _ => {}
235 }
236 #[cfg(not(all(feature = "devtools", debug_assertions)))]
237 let () = msg;
238 }
239 }
240
241 vdom.render_immediate(&mut mutations);
243
244 if let Some(edits) = take_edits(&mut mutations) {
245 ws.send(edits).await?;
246 }
247 }
248}
249
250fn text_frame(text: &str) -> Vec<u8> {
251 let mut bytes = vec![0];
252 bytes.extend(text.as_bytes());
253 bytes
254}
255
256fn take_edits(mutations: &mut MutationState) -> Option<Vec<u8>> {
257 let mut bytes = vec![1];
259 mutations.write_memory_into(&mut bytes);
260 (bytes.len() > 1).then_some(bytes)
261}
262
263#[derive(Serialize)]
264#[serde(tag = "type", content = "data")]
265enum ClientUpdate {
266 #[serde(rename = "query")]
267 Query(String),
268}