pikav_web/
client.rs

1use anyhow::Result;
2use cfg_if::cfg_if;
3use futures::Future;
4use gloo_net::http::Headers;
5use pikav::Event;
6use serde_json::Value;
7
8cfg_if! {
9    if #[cfg(feature = "hydrate")] {
10        use std::{cell::RefCell, pin::Pin, rc::Rc};
11        use std::{
12            collections::HashSet,
13            sync::atomic::{AtomicUsize, Ordering},
14        };
15        use futures::{future::BoxFuture, StreamExt};
16        use gloo_net::{
17            eventsource::futures::EventSource,
18            http::{Request, Response},
19        };
20        use log::error;
21        use wasm_bindgen_futures::spawn_local;
22
23        type HeadersFut = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<Headers>>>>>;
24        type ListenerFut = Box<dyn Fn(Event<Value, Value>) -> BoxFuture<'static, ()>>;
25    }
26}
27
/// Client handle for subscribing to events.
///
/// Fields marked `hydrate` only exist when the `hydrate` feature is enabled;
/// without it the client only carries its endpoint and namespace.
#[derive(Clone)]
pub struct Client {
    /// Session id taken from the `$SYS/session` / `Created` event; `None`
    /// until that event has been received.
    #[cfg(feature = "hydrate")]
    id: Rc<RefCell<Option<String>>>,
    /// URL the event source connects to.
    #[cfg(feature = "hydrate")]
    source_url: String,
    /// Event source stored by `run`, kept so that `close` can reach it.
    #[cfg(feature = "hydrate")]
    source: Rc<RefCell<Option<EventSource>>>,
    /// Base URL used for subscribe / unsubscribe requests.
    endpoint: String,
    /// Prefix prepended to every filter passed to `subscribe`.
    namespace: String,
    /// Counter handing out the ids stored alongside each listener.
    #[cfg(feature = "hydrate")]
    next_listener_id: Rc<AtomicUsize>,
    /// Optional callback providing extra request headers.
    #[cfg(feature = "hydrate")]
    get_headers: Rc<RefCell<Option<HeadersFut>>>,
    /// Registered listeners as `(listener id, namespaced filter, callback)`.
    #[cfg(feature = "hydrate")]
    listeners: Rc<RefCell<Vec<(usize, String, ListenerFut)>>>,
}
49
50impl Client {
51    pub fn new(endpoint: impl Into<String>) -> Self {
52        let endpoint = endpoint.into();
53
54        cfg_if! {
55            if #[cfg(feature = "hydrate")] {
56                Self {
57                    id: Rc::default(),
58                    get_headers: Rc::default(),
59                    next_listener_id: Rc::default(),
60                    listeners: Rc::default(),
61                    source: Rc::default(),
62                    source_url: format!("{endpoint}/events"),
63                    endpoint,
64                    namespace: "_".to_owned(),
65                }
66            } else {
67                Self {
68                    endpoint,
69                    namespace: "_".to_owned(),
70                }
71            }
72        }
73    }
74
75    pub fn run(self) -> Result<Self> {
76        cfg_if! {
77            if #[cfg(feature = "hydrate")] {
78                let mut source = gloo_net::eventsource::futures::EventSource::new(&self.source_url)?;
79                let mut stream = source.subscribe("message")?;
80                *self.source.borrow_mut() = Some(source);
81                let id = self.id.clone();
82                let listeners = self.listeners.clone();
83                let fetcher = Fetcher::from(&self);
84
85                spawn_local(async move {
86                    while let Some(Ok((_, msg))) = stream.next().await {
87                        if msg.data().as_string() == Some("ping".to_owned()) {
88                            continue;
89                        }
90
91                        let data = match msg.data().as_string() {
92                            Some(data) => data,
93                            _ => {
94                                error!("invalid type: {:?}", msg.data());
95                                continue;
96                            }
97                        };
98
99                        let event = match serde_json::from_str::<Event<Value, Value>>(&data) {
100                            Ok(data) => data,
101                            Err(e) => {
102                                error!("invalid type: {:?}", e);
103                                continue;
104                            }
105                        };
106
107                        if matches!(
108                            (event.topic.as_ref(), event.name.as_ref()),
109                            ("$SYS/session", "Created")
110                        ) {
111                            *id.borrow_mut() = event.data.as_str().map(|v| v.to_owned());
112
113                            let mut subscribed = HashSet::new();
114
115                            if let Some(client_id) = event.data.as_str() {
116                                let filters = {
117                                    listeners
118                                        .borrow()
119                                        .iter()
120                                        .map(|(_, f, _)| f.to_owned())
121                                        .collect::<Vec<_>>()
122                                };
123
124                                for filter in filters {
125                                    if subscribed.contains(&filter) {
126                                        continue;
127                                    }
128
129                                    if let Err(e) = fetcher.fetch(client_id, "subscribe", &filter).await {
130                                        error!("{e}");
131                                    }
132
133                                    subscribed.insert(filter);
134                                }
135                            }
136                        }
137
138                        let listeners_fut = {
139                            let mut listeners_fut = Vec::new();
140                            for (_, filter, listener) in listeners.borrow().iter() {
141                                let filters = match &event.filters {
142                                    Some(v) => v,
143                                    _ => continue,
144                                };
145
146                                if filters.iter().any(|f| f == filter) {
147                                    listeners_fut.push(listener(event.clone()));
148                                }
149                            }
150                            listeners_fut
151                        };
152
153                        futures::future::join_all(listeners_fut).await;
154                    }
155                });
156            }
157        }
158
159        Ok(self)
160    }
161
162    pub fn endpoint(mut self, v: impl Into<String>) -> Self {
163        self.endpoint = v.into();
164
165        self
166    }
167
168    pub fn namespace(mut self, v: impl Into<String>) -> Self {
169        self.namespace = v.into();
170
171        self
172    }
173
174    pub fn close(&self) {
175        cfg_if! {
176            if #[cfg(feature = "hydrate")] {
177                if let Some(source) = self.source.borrow().as_ref() {
178                    source.clone().close();
179                }
180            }
181        }
182    }
183
184    cfg_if! {
185        if #[cfg(feature = "hydrate")] {
186            pub fn get_headers<Fu>(self, cb: impl Fn() -> Fu + 'static) -> Self
187            where
188                Fu: Future<Output = Result<Headers>> + 'static,
189            {
190                        let get_headers = self.get_headers.clone();
191                        *get_headers.borrow_mut() = Some(Box::new(move || Box::pin(cb())));
192
193                self
194            }
195        } else {
196            pub fn get_headers<Fu>(self, _cb: impl Fn() -> Fu + 'static) -> Self
197            where
198                Fu: Future<Output = Result<Headers>> + 'static,
199            {
200                self
201            }
202        }
203    }
204    cfg_if! {
205        if #[cfg(feature = "hydrate")] {
206            pub fn subscribe<Fu>(
207                &self,
208                filter: impl Into<String>,
209                listener: impl Fn(Event<Value, Value>) -> Fu + 'static,
210            ) -> impl FnOnce()
211            where
212                Fu: Future<Output = ()> + 'static + Send,
213            {
214                let filter = format!("{}/{}", self.namespace, filter.into());
215                let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed);
216                let listeners = self.listeners.clone();
217
218                listeners
219                    .borrow_mut()
220                    .push((id, filter.clone(), Box::new(move |e| Box::pin(listener(e)))));
221
222                let total_filters = listeners
223                    .borrow()
224                    .iter()
225                    .filter(|(_, f, _)| f == &filter)
226                    .count();
227
228                let fetcher = Fetcher::from(self);
229
230                if let (Some(client_id), 1) = (self.id.borrow().to_owned(), total_filters) {
231                    let filter = filter.clone();
232                    let fetcher = fetcher.clone();
233
234                    spawn_local(async move {
235                        if let Err(e) = fetcher.fetch(&client_id, "subscribe", &filter).await {
236                            error!("{e}");
237                        }
238                    });
239                }
240
241                let client_id = self.id.clone();
242
243                move || {
244                    listeners.borrow_mut().retain(|l| l.0 != id);
245
246                    let total_filters = listeners
247                        .borrow()
248                        .iter()
249                        .filter(|(_, f, _)| f == &filter)
250                        .count();
251
252                    if total_filters > 0 {
253                        return;
254                    }
255
256                    if let Some(client_id) = client_id.borrow().to_owned() {
257                        spawn_local(async move {
258                            if let Err(e) = fetcher.fetch(&client_id, "unsubscribe", &filter).await {
259                                error!("{e}");
260                            }
261                        });
262                    }
263                }
264            }
265        }
266        else {
267            pub fn subscribe<Fu>(
268                &self,
269                _filter: impl Into<String>,
270                _listener: impl Fn(Event<Value, Value>) -> Fu + 'static,
271            ) -> impl FnOnce()
272            where
273                Fu: Future<Output = ()> + 'static + Send,
274            {
275                move || {}
276            }
277        }
278    }
279}
280
281cfg_if! {
282    if #[cfg(feature = "hydrate")] {
283        #[derive(Clone)]
284        struct Fetcher {
285            endpoint: String,
286            get_headers: Rc<RefCell<Option<HeadersFut>>>,
287        }
288
289        impl Fetcher {
290            pub async fn fetch(
291                &self,
292                client_id: &str,
293                action: impl Into<String>,
294                filter: &str,
295            ) -> Result<Response> {
296                let filter = filter.to_string();
297                let mut req = Request::put(&format!("{}/{}/{}", self.endpoint, action.into(), filter));
298                let get_headers = { self.get_headers.borrow().as_ref().map(|f| f()) };
299
300                if let Some(get_headers) = get_headers {
301                    let headers = get_headers.await?;
302                    req = req.headers(headers);
303                }
304
305                let res = req
306                    .header("Accept", "application/json")
307                    .header("Content-Type", "application/json")
308                    .header("X-Pikav-Client-ID", client_id)
309                    .send()
310                    .await?;
311
312                Ok(res)
313            }
314        }
315
316        impl From<&Client> for Fetcher {
317            fn from(value: &Client) -> Self {
318                Self {
319                    endpoint: value.endpoint.to_owned(),
320                    get_headers: value.get_headers.clone(),
321                }
322            }
323        }
324    }
325}