dachannel_client/
lib.rs

1#[derive(thiserror::Error, Debug)]
2pub enum Error {
3    #[error("dachannel: {0}")]
4    Dachannel(#[from] dachannel::Error),
5
6    #[error("reqwest: {0}")]
7    Reqwest(#[from] reqwest::Error),
8
9    #[error("malformed body")]
10    MalformedBody,
11}
12
13pub struct ConnectOptions {
14    authorization: Option<String>,
15}
16
17impl ConnectOptions {
18    pub fn new() -> Self {
19        Self {
20            authorization: None,
21        }
22    }
23
24    pub fn authorization(mut self, authorization: Option<String>) -> Self {
25        self.authorization = authorization;
26        self
27    }
28
29    /// Connect to a dachannel server.
30    pub async fn connect(
31        self,
32        cb: dachannel::ConnectionBuilder,
33        url: String,
34    ) -> Result<dachannel::Connection, Error> {
35        let conn = cb.build();
36
37        conn.set_local_description(dachannel::SdpType::Offer)
38            .await?;
39        let offer_sdp = conn.local_description()?.unwrap().sdp;
40
41        let client = reqwest::Client::new();
42        let mut req = client.post(url).body(offer_sdp);
43        if let Some(authorization) = self.authorization {
44            req = req.header(reqwest::header::AUTHORIZATION, authorization);
45        }
46        let res = req.send().await?.error_for_status()?;
47        let answer_sdp =
48            String::from_utf8(res.bytes().await?.to_vec()).map_err(|_| Error::MalformedBody)?;
49
50        conn.set_remote_description(&dachannel::Description {
51            type_: dachannel::SdpType::Answer,
52            sdp: answer_sdp,
53        })
54        .await?;
55
56        Ok(conn)
57    }
58}
59
60#[cfg(test)]
61mod test {
62    use super::*;
63
64    #[cfg(not(target_arch = "wasm32"))]
65    #[tokio::test]
66    pub async fn test_connect() {
67        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
68        let local_addr = listener.local_addr().unwrap();
69
70        let (serve_fut, connecting_rx) = dachannel_server::ServeOptions::new().serve(listener);
71
72        tokio::spawn(async move {
73            serve_fut.await.unwrap();
74        });
75
76        let client_jh = tokio::spawn(async move {
77            let config: dachannel::Configuration = Default::default();
78            let cb = dachannel::Connection::builder(config).unwrap();
79            let dc = cb
80                .create_data_channel(
81                    "test",
82                    dachannel::DataChannelOptions {
83                        negotiated: true,
84                        id: Some(1),
85                        ..Default::default()
86                    },
87                )
88                .unwrap();
89
90            let _conn = ConnectOptions::new()
91                .connect(cb, format!("http://127.0.0.1:{}", local_addr.port()))
92                .await
93                .unwrap();
94
95            dc.send(b"hello world").await.unwrap();
96        });
97
98        let connecting = connecting_rx.recv().await.unwrap();
99        let dc = connecting
100            .create_data_channel(
101                "test",
102                dachannel::DataChannelOptions {
103                    negotiated: true,
104                    id: Some(1),
105                    ..Default::default()
106                },
107            )
108            .unwrap();
109
110        let _conn = connecting.await.unwrap();
111        assert_eq!(dc.recv().await.unwrap(), b"hello world");
112
113        client_jh.await.unwrap();
114    }
115}