dioxus_liveview/
pool.rs

1use crate::{
2    document::init_document,
3    element::LiveviewElement,
4    events::SerializedHtmlEventConverter,
5    query::{QueryEngine, QueryResult},
6    LiveViewError,
7};
8
9use dioxus_core::{provide_context, Element, Event, ScopeId, VirtualDom};
10use dioxus_html::{EventData, HtmlEvent, PlatformEventData};
11use dioxus_interpreter_js::MutationState;
12use futures_util::{pin_mut, SinkExt, StreamExt};
13use serde::Serialize;
14use std::{any::Any, rc::Rc};
15use tokio_util::task::LocalPoolHandle;
16
17#[derive(Clone)]
18pub struct LiveViewPool {
19    pub(crate) pool: LocalPoolHandle,
20}
21
22impl Default for LiveViewPool {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl LiveViewPool {
29    pub fn new() -> Self {
30        // Set the event converter
31        dioxus_html::set_event_converter(Box::new(SerializedHtmlEventConverter));
32
33        LiveViewPool {
34            pool: LocalPoolHandle::new(
35                std::thread::available_parallelism()
36                    .map(usize::from)
37                    .unwrap_or(1),
38            ),
39        }
40    }
41
42    pub async fn launch(
43        &self,
44        ws: impl LiveViewSocket,
45        app: fn() -> Element,
46    ) -> Result<(), LiveViewError> {
47        self.launch_with_props(ws, |app| app(), app).await
48    }
49
50    pub async fn launch_with_props<T: Clone + Send + 'static>(
51        &self,
52        ws: impl LiveViewSocket,
53        app: fn(T) -> Element,
54        props: T,
55    ) -> Result<(), LiveViewError> {
56        self.launch_virtualdom(ws, move || VirtualDom::new_with_props(app, props))
57            .await
58    }
59
60    pub async fn launch_virtualdom<F: FnOnce() -> VirtualDom + Send + 'static>(
61        &self,
62        ws: impl LiveViewSocket,
63        make_app: F,
64    ) -> Result<(), LiveViewError> {
65        match self.pool.spawn_pinned(move || run(make_app(), ws)).await {
66            Ok(Ok(_)) => Ok(()),
67            Ok(Err(e)) => Err(e),
68            Err(_) => Err(LiveViewError::SendingFailed),
69        }
70    }
71}
72
73/// A LiveViewSocket is a Sink and Stream of Strings that Dioxus uses to communicate with the client
74///
75/// Most websockets from most HTTP frameworks can be converted into a LiveViewSocket using the appropriate adapter.
76///
77/// You can also convert your own socket into a LiveViewSocket by implementing this trait. This trait is an auto trait,
78/// meaning that as long as your type implements Stream and Sink, you can use it as a LiveViewSocket.
79///
80/// For example, the axum implementation is a really small transform:
81///
82/// ```rust, ignore
83/// pub fn axum_socket(ws: WebSocket) -> impl LiveViewSocket {
84///     ws.map(transform_rx)
85///         .with(transform_tx)
86///         .sink_map_err(|_| LiveViewError::SendingFailed)
87/// }
88///
89/// fn transform_rx(message: Result<Message, axum::Error>) -> Result<String, LiveViewError> {
90///     message
91///         .map_err(|_| LiveViewError::SendingFailed)?
92///         .into_text()
93///         .map_err(|_| LiveViewError::SendingFailed)
94/// }
95///
96/// async fn transform_tx(message: String) -> Result<Message, axum::Error> {
97///     Ok(Message::Text(message))
98/// }
99/// ```
100pub trait LiveViewSocket:
101    SinkExt<Vec<u8>, Error = LiveViewError>
102    + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
103    + Send
104    + 'static
105{
106}
107
108impl<S> LiveViewSocket for S where
109    S: SinkExt<Vec<u8>, Error = LiveViewError>
110        + StreamExt<Item = Result<Vec<u8>, LiveViewError>>
111        + Send
112        + 'static
113{
114}
115
116/// The primary event loop for the VirtualDom waiting for user input
117///
118/// This function makes it easy to integrate Dioxus LiveView with any socket-based framework.
119///
120/// As long as your framework can provide a Sink and Stream of Bytes, you can use this function.
121///
122/// You might need to transform the error types of the web backend into the LiveView error type.
123pub async fn run(mut vdom: VirtualDom, ws: impl LiveViewSocket) -> Result<(), LiveViewError> {
124    #[cfg(all(feature = "devtools", debug_assertions))]
125    let mut hot_reload_rx = {
126        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
127        dioxus_devtools::connect(move |template| _ = tx.send(template));
128        rx
129    };
130
131    let mut mutations = MutationState::default();
132
133    // Create the a proxy for query engine
134    let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel();
135    let query_engine = QueryEngine::new(query_tx);
136    vdom.runtime().in_scope(ScopeId::ROOT, || {
137        provide_context(query_engine.clone());
138        init_document();
139    });
140
141    // pin the futures so we can use select!
142    pin_mut!(ws);
143
144    if let Some(edits) = {
145        vdom.rebuild(&mut mutations);
146        take_edits(&mut mutations)
147    } {
148        // send the initial render to the client
149        ws.send(edits).await?;
150    }
151
152    // desktop uses this wrapper struct thing around the actual event itself
153    // this is sorta driven by tao/wry
154    #[derive(serde::Deserialize, Debug)]
155    #[serde(tag = "method", content = "params")]
156    enum IpcMessage {
157        #[serde(rename = "user_event")]
158        Event(Box<HtmlEvent>),
159        #[serde(rename = "query")]
160        Query(QueryResult),
161    }
162
163    loop {
164        #[cfg(all(feature = "devtools", debug_assertions))]
165        let hot_reload_wait = hot_reload_rx.recv();
166        #[cfg(not(all(feature = "devtools", debug_assertions)))]
167        let hot_reload_wait: std::future::Pending<Option<()>> = std::future::pending();
168
169        tokio::select! {
170            // poll any futures or suspense
171            _ = vdom.wait_for_work() => {}
172
173            evt = ws.next() => {
174                match evt.as_ref().map(|o| o.as_deref()) {
175                    // respond with a pong every ping to keep the websocket alive
176                    Some(Ok(b"__ping__")) => {
177                        ws.send(text_frame("__pong__")).await?;
178                    }
179                    Some(Ok(evt)) => {
180                        if let Ok(message) = serde_json::from_str::<IpcMessage>(&String::from_utf8_lossy(evt)) {
181                            match message {
182                                IpcMessage::Event(evt) => {
183                                    // Intercept the mounted event and insert a custom element type
184                                    let event = if let EventData::Mounted = &evt.data {
185                                        let element = LiveviewElement::new(evt.element, query_engine.clone());
186                                        Event::new(
187                                            Rc::new(PlatformEventData::new(Box::new(element))) as Rc<dyn Any>,
188                                            evt.bubbles,
189                                        )
190                                    } else {
191                                        Event::new(
192                                            evt.data.into_any(),
193                                            evt.bubbles,
194                                        )
195                                    };
196                                    vdom.runtime().handle_event(
197                                        &evt.name,
198                                        event,
199                                        evt.element,
200                                    );
201                                }
202                                IpcMessage::Query(result) => {
203                                    query_engine.send(result);
204                                },
205                            }
206                        }
207                    }
208                    // log this I guess? when would we get an error here?
209                    Some(Err(_e)) => {}
210                    None => return Ok(()),
211                }
212            }
213
214            // handle any new queries
215            Some(query) = query_rx.recv() => {
216                ws.send(text_frame(&serde_json::to_string(&ClientUpdate::Query(query)).unwrap())).await?;
217            }
218
219            Some(msg) = hot_reload_wait => {
220                #[cfg(all(feature = "devtools", debug_assertions))]
221                match msg {
222                    dioxus_devtools::DevserverMsg::HotReload(msg)=> {
223                        dioxus_devtools::apply_changes(&vdom, &msg);
224                    }
225                    dioxus_devtools::DevserverMsg::Shutdown => {
226                        std::process::exit(0);
227                    },
228                    dioxus_devtools::DevserverMsg::FullReloadCommand
229                    | dioxus_devtools::DevserverMsg::FullReloadStart
230                    | dioxus_devtools::DevserverMsg::FullReloadFailed => {
231                        // usually only web gets this message - what are we supposed to do?
232                        // Maybe we could just binary patch ourselves in place without losing window state?
233                    },
234                    _ => {}
235                }
236                #[cfg(not(all(feature = "devtools", debug_assertions)))]
237                let () = msg;
238            }
239        }
240
241        // render the vdom
242        vdom.render_immediate(&mut mutations);
243
244        if let Some(edits) = take_edits(&mut mutations) {
245            ws.send(edits).await?;
246        }
247    }
248}
249
250fn text_frame(text: &str) -> Vec<u8> {
251    let mut bytes = vec![0];
252    bytes.extend(text.as_bytes());
253    bytes
254}
255
256fn take_edits(mutations: &mut MutationState) -> Option<Vec<u8>> {
257    // Add an extra one at the beginning to tell the shim this is a binary frame
258    let mut bytes = vec![1];
259    mutations.write_memory_into(&mut bytes);
260    (bytes.len() > 1).then_some(bytes)
261}
262
263#[derive(Serialize)]
264#[serde(tag = "type", content = "data")]
265enum ClientUpdate {
266    #[serde(rename = "query")]
267    Query(String),
268}