px_wsdom_core/
link.rs

1use alloc::{
2    borrow::ToOwned, collections::{BTreeMap, VecDeque}, string::String, sync::Arc
3};
4use core::task::{Poll, Waker};
5use hashbrown::HashMap;
6use spin::Mutex;
7
8use futures_core::Stream;
9
10use crate::js_types::JsValue;
11
12/// A WSDOM client.
13///
14/// You can use this to call JS functions on the JS client (the web browser).
15/// Every JsValue holds a Browser object which they internally use for calling methods, etc.
16///
17/// Browser uses Arc internally, so cloning is cheap and a cloned Browser points to the same client.
18///
19/// ## Use with Integration Library
20///
21/// You can obtain Browser from the WSDOM integration library (for example, `wsdom-axum`).
22///
23/// ## Manual Usage
24///
25/// If there is no WSDOM integration library for your framework,
26/// you can instead create Browser manually with the `new()` method.
27///
28/// Manually created Browsers need to be "driven"
29/// -   Browser implements the [Stream][futures_core::Stream] trait with [String].
30///     You must take items from the stream and send it to the WSDOM JS client
31///     over WebSocket or other transport of your choice.
32/// -   Browser has a `receive_incoming_message(msg: String)` method.
33///     Everything sent by the WSDOM JS client must be fed into this method.
34///
35/// The `counter-manual` example in our repo shows manual usage with Tokio.
36#[derive(Clone, Debug)]
37pub struct Browser(pub(crate) Arc<Mutex<BrowserInternal>>);
38
39impl Browser {
40    /// Create a new Browser object.
41    ///
42    /// This is only needed if you intend to go the "manual" route described above.
43    pub fn new() -> Self {
44        let link = BrowserInternal {
45            retrievals: HashMap::new(),
46            last_id: 1,
47            commands_buf: String::new(),
48            outgoing_waker: None,
49            dead: ErrorState::NoError,
50            imports: BTreeMap::new(),
51            rpc_state: BTreeMap::new(),
52            pure_values: BTreeMap::new(),
53        };
54        Self(Arc::new(Mutex::new(link)))
55    }
56    /// Receive a message sent from the WSDOM JS client.
57    ///
58    /// This is only needed if you intend to go the "manual" route described above.
59    /// If you use an integration library, messages are handled automatically.
60    pub fn receive_incoming_message(&self, message: String) {
61        self.0.lock().receive(message);
62    }
63    /// If the Browser has errored, this will return the error.
64    ///
65    /// The [Error] type is not [Clone], so after the first call returning `Some(_)`,
66    /// this method will return `None`.
67    pub fn take_error(&self) -> Option<Error> {
68        let mut link = self.0.lock();
69        match core::mem::replace(&mut link.dead, ErrorState::ErrorTaken) {
70            ErrorState::NoError => {
71                link.dead = ErrorState::NoError;
72                None
73            }
74            ErrorState::Error(e) => Some(e),
75            ErrorState::ErrorTaken => None,
76        }
77    }
78}
79
80/// The stream of messages that should be sent over WebSocket (or your transport of choice) to the JavaScript WSDOM client.
81impl futures_core::Stream for Browser {
82    type Item = String;
83
84    fn poll_next(
85        self: core::pin::Pin<&mut Self>,
86        cx: &mut core::task::Context<'_>,
87    ) -> Poll<Option<Self::Item>> {
88        let this = self.get_mut();
89        let mut link = this.0.lock();
90
91        if !matches!(&link.dead, ErrorState::NoError) {
92            return Poll::Ready(None);
93        }
94
95        let new_waker = cx.waker();
96        if !link
97            .outgoing_waker
98            .as_ref()
99            .is_some_and(|w| new_waker.will_wake(w))
100        {
101            link.outgoing_waker = Some(new_waker.to_owned());
102        }
103        if !link.commands_buf.is_empty() {
104            Poll::Ready(Some(core::mem::take(&mut link.commands_buf)))
105        } else {
106            Poll::Pending
107        }
108    }
109}
/// Queue of received RPC payloads together with the waker of the task consuming them.
#[derive(Debug)]
pub struct RpcCell {
    /// Waker that is woken whenever a payload is pushed onto `queue`.
    pub waker: Waker,
    /// Payloads in arrival order; consumers pop from the front.
    pub queue: VecDeque<String>,
}
115#[derive(Clone, Debug)]
116pub struct RpcCellAM(pub Arc<Mutex<RpcCell>>);
117
118impl Stream for RpcCellAM {
119    type Item = String;
120
121    fn poll_next(
122        self: core::pin::Pin<&mut Self>,
123        cx: &mut core::task::Context<'_>,
124    ) -> Poll<Option<Self::Item>> {
125        let this = self.get_mut();
126        let mut lock = this.0.lock();
127        match lock.queue.pop_front() {
128            Some(v) => return Poll::Ready(Some(v)),
129            None => {}
130        };
131
132        let new_waker = cx.waker();
133        if !lock.waker.will_wake(new_waker) {
134            lock.waker = new_waker.to_owned();
135        };
136
137        return Poll::Pending;
138    }
139}
140
141#[derive(Debug)]
142pub struct BrowserInternal {
143    pub(crate) retrievals: HashMap<u64, RetrievalState>,
144    last_id: u64,
145    commands_buf: String,
146    outgoing_waker: Option<Waker>,
147    dead: ErrorState,
148    pub(crate) imports: BTreeMap<String, JsValue>,
149    pub(crate) rpc_state: BTreeMap<String, RpcCellAM>,
150    pub(crate) pure_values: BTreeMap<String, JsValue>,
151}
152
153/// Error that could happen in WSDOM.
154///
155/// Currently, the only errors that could happen are from [serde] serialization and deserialization.
156#[derive(Debug)]
157pub enum Error {
158    CommandSerialize(core::fmt::Error),
159    DataDeserialize(serde_json::Error),
160}
161#[derive(Debug)]
162enum ErrorState {
163    NoError,
164    Error(Error),
165    ErrorTaken,
166}
167
/// Bookkeeping for one pending retrieval, updated by `BrowserInternal::receive`.
#[derive(Debug)]
pub(crate) struct RetrievalState {
    /// Waker that is woken each time a message for this retrieval arrives.
    pub(crate) waker: Waker,
    /// Text of the latest message for this retrieval (with the leading `p` removed,
    /// the `id:` part still included).
    pub(crate) last_value: String,
    /// Number of messages received for this retrieval so far.
    pub(crate) times: usize,
}
174
175impl BrowserInternal {
176    pub fn receive(&mut self, message: String) {
177        if let Some(message) = message.strip_prefix("p") {
178            match message
179                .split_once(':')
180                .and_then(|(id, _)| id.parse::<u64>().ok())
181            {
182                Some(id) => match self.retrievals.get_mut(&id) {
183                    Some(s) => {
184                        s.times += 1;
185                        s.last_value = message.to_owned();
186                        s.waker.wake_by_ref();
187                    }
188                    _ => {}
189                },
190                None => {}
191            }
192        }
193        if let Some(message) = message.strip_prefix("r") {
194            match message.split_once(':') {
195                Some((id, v)) => match self.rpc_state.get(id) {
196                    Some(s) => {
197                        let mut s = s.0.lock();
198                        s.queue.push_back(v.to_owned());
199                        s.waker.wake_by_ref();
200                    }
201                    _ => {}
202                },
203                None => {}
204            }
205        }
206    }
207    pub fn raw_commands_buf(&mut self) -> &mut String {
208        &mut self.commands_buf
209    }
210    pub(crate) fn get_new_id(&mut self) -> u64 {
211        self.last_id += 1;
212        self.last_id
213    }
214    pub(crate) fn kill(&mut self, err: Error) {
215        if matches!(self.dead, ErrorState::NoError) {
216            self.dead = ErrorState::Error(err);
217        }
218    }
219    pub(crate) fn wake_outgoing(&mut self) {
220        if let Some(waker) = self.outgoing_waker.as_ref() {
221            waker.wake_by_ref();
222        }
223    }
224    pub(crate) fn wake_outgoing_lazy(&mut self) {
225        self.wake_outgoing();
226    }
227}
228
/// Unit error type whose `Debug` and `Display` output is the text `InvalidReturn`.
///
/// NOTE(review): nothing in this file constructs it — confirm where it is used.
struct InvalidReturn;

impl core::fmt::Debug for InvalidReturn {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Same output as an empty `debug_struct`: just the type name.
        f.write_str("InvalidReturn")
    }
}

impl core::fmt::Display for InvalidReturn {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Display intentionally mirrors Debug.
        <Self as core::fmt::Debug>::fmt(self, f)
    }
}

impl core::error::Error for InvalidReturn {}