arpy_reqwasm/
eventsource.rs1use std::{
2 marker::PhantomData,
3 pin::Pin,
4 task::{self, Poll},
5};
6
7use arpy::{protocol, ServerSentEvents};
8use async_trait::async_trait;
9use futures::Stream;
10use gloo_net::eventsource::futures::{EventSource, EventSourceSubscription};
11use pin_project::pin_project;
12use serde::de::DeserializeOwned;
13use web_sys::MessageEvent;
14
15use crate::Error;
16
17#[derive(Clone)]
18pub struct Connection(String);
19
20impl Connection {
21 pub fn new(url: impl Into<String>) -> Self {
22 Self(url.into())
23 }
24}
25
26#[async_trait(?Send)]
27impl ServerSentEvents for Connection {
28 type Error = Error;
29 type Output<Item: DeserializeOwned> = SubscriptionMessage<Item>;
30
31 async fn subscribe<T>(&self) -> Result<Self::Output<T>, Self::Error>
32 where
33 T: DeserializeOwned + protocol::MsgId,
34 {
35 let mut source = EventSource::new(&self.0).map_err(Error::send)?;
36 let subscription = source.subscribe(T::ID).map_err(Error::send)?;
37
38 Ok(SubscriptionMessage {
39 source,
40 subscription,
41 phantom: PhantomData,
42 })
43 }
44}
45
46#[pin_project]
47pub struct SubscriptionMessage<Item> {
48 source: EventSource,
49 #[pin]
50 subscription: EventSourceSubscription,
51 phantom: PhantomData<Item>,
52}
53
54impl<Item: DeserializeOwned> Stream for SubscriptionMessage<Item> {
55 type Item = Result<Item, Error>;
56
57 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
58 self.project().subscription.poll_next(cx).map(|result| {
59 result.map(|result| {
60 let (_id, msg) = result.map_err(Error::receive)?;
61 deserialize_message(&msg)
62 })
63 })
64 }
65}
66
67fn deserialize_message<T: DeserializeOwned>(msg: &MessageEvent) -> Result<T, Error> {
68 serde_json::from_str(
69 &msg.data()
70 .as_string()
71 .ok_or_else(|| Error::deserialize_result("No message data"))?,
72 )
73 .map_err(Error::deserialize_result)
74}