arpy_reqwasm/
eventsource.rs

1use std::{
2    marker::PhantomData,
3    pin::Pin,
4    task::{self, Poll},
5};
6
7use arpy::{protocol, ServerSentEvents};
8use async_trait::async_trait;
9use futures::Stream;
10use gloo_net::eventsource::futures::{EventSource, EventSourceSubscription};
11use pin_project::pin_project;
12use serde::de::DeserializeOwned;
13use web_sys::MessageEvent;
14
15use crate::Error;
16
17#[derive(Clone)]
18pub struct Connection(String);
19
20impl Connection {
21    pub fn new(url: impl Into<String>) -> Self {
22        Self(url.into())
23    }
24}
25
26#[async_trait(?Send)]
27impl ServerSentEvents for Connection {
28    type Error = Error;
29    type Output<Item: DeserializeOwned> = SubscriptionMessage<Item>;
30
31    async fn subscribe<T>(&self) -> Result<Self::Output<T>, Self::Error>
32    where
33        T: DeserializeOwned + protocol::MsgId,
34    {
35        let mut source = EventSource::new(&self.0).map_err(Error::send)?;
36        let subscription = source.subscribe(T::ID).map_err(Error::send)?;
37
38        Ok(SubscriptionMessage {
39            source,
40            subscription,
41            phantom: PhantomData,
42        })
43    }
44}
45
46#[pin_project]
47pub struct SubscriptionMessage<Item> {
48    source: EventSource,
49    #[pin]
50    subscription: EventSourceSubscription,
51    phantom: PhantomData<Item>,
52}
53
54impl<Item: DeserializeOwned> Stream for SubscriptionMessage<Item> {
55    type Item = Result<Item, Error>;
56
57    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
58        self.project().subscription.poll_next(cx).map(|result| {
59            result.map(|result| {
60                let (_id, msg) = result.map_err(Error::receive)?;
61                deserialize_message(&msg)
62            })
63        })
64    }
65}
66
67fn deserialize_message<T: DeserializeOwned>(msg: &MessageEvent) -> Result<T, Error> {
68    serde_json::from_str(
69        &msg.data()
70            .as_string()
71            .ok_or_else(|| Error::deserialize_result("No message data"))?,
72    )
73    .map_err(Error::deserialize_result)
74}