bililive_core/retry/
mod.rs

1//! Traits and types used by retry mechanism.
2
3use std::future::Future;
4use std::io;
5use std::io::ErrorKind;
6use std::marker::PhantomData;
7use std::pin::Pin;
8
9use futures::SinkExt;
10use futures::{Sink, Stream};
11use stream_reconnect::UnderlyingStream;
12
13pub use config::RetryConfig;
14pub use context::RetryContext;
15pub use policy::BEBIterator;
16
17use crate::errors::StreamError;
18use crate::packet::Packet;
19
20mod config;
21mod context;
22mod policy;
23
24/// Trait of helper objects to connect bilibili websocket server.
25///
26/// This trait is used when constructing normal bililive streams or auto-retry bililive streams.
27///
28/// An implementation of `WsStreamTrait` takes in a ws server url and decodes the data into a stream
29/// of [`Packet`](crate::packet::Packet) with heartbeat auto-response mechanism implemented
30/// (see [`HeartbeatStream`](crate::stream::HeartbeatStream) for details).
31#[cfg(feature = "not-send")]
32pub trait WsStreamTrait<E> {
33    /// The returned stream type.
34    type Stream: Stream<Item = Result<Packet, StreamError<E>>>
35        + Sink<Packet, Error = StreamError<E>>
36        + Unpin
37        + Sized;
38    /// Connect to bilibili websocket server.
39    ///
40    /// # Errors
41    /// Returns an error when websocket connection fails.
42    fn connect(url: &str) -> Pin<Box<dyn Future<Output = Result<Self::Stream, E>> + '_>>;
43}
44
45#[cfg(not(feature = "not-send"))]
46pub trait WsStreamTrait<E> {
47    /// The returned stream type.
48    type Stream: Stream<Item = Result<Packet, StreamError<E>>>
49        + Sink<Packet, Error = StreamError<E>>
50        + Unpin
51        + Sized
52        + Send;
53    /// Connect to bilibili websocket server.
54    ///
55    /// # Errors
56    /// Returns an error when websocket connection fails.
57    fn connect(url: &str) -> Pin<Box<dyn Future<Output = Result<Self::Stream, E>> + Send + '_>>;
58}
59
60/// Wrapper for types implementing `WsStreamTrait`.
61///
62/// This type is used to avoid the orphan rule. Exposed for stream type construction.
63#[derive(Debug, Default)]
64pub struct WsStream<T: WsStreamTrait<E>, E>(PhantomData<(T, E)>);
65
66impl<T, E> WsStream<T, E>
67where
68    T: WsStreamTrait<E>,
69{
70    /// Connect to bilibili websocket server.
71    ///
72    /// # Errors
73    /// Returns an error when websocket connection fails.
74    pub async fn connect(url: &str) -> Result<T::Stream, E> {
75        T::connect(url).await
76    }
77}
78
79#[allow(clippy::type_complexity)]
80impl<T, E> UnderlyingStream<RetryContext, Result<Packet, StreamError<E>>, StreamError<E>>
81    for WsStream<T, E>
82where
83    T: WsStreamTrait<E>,
84    E: std::error::Error,
85{
86    type Stream = T::Stream;
87
88    #[cfg(feature = "not-send")]
89    fn establish(
90        mut ctor_arg: RetryContext,
91    ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, StreamError<E>>>>> {
92        Box::pin(async move {
93            let server = ctor_arg.get();
94            let mut ws = Self::connect(server)
95                .await
96                .map_err(StreamError::from_ws_error)?;
97            ws.send(Packet::new_room_enter(ctor_arg.config())).await?;
98            Ok(ws)
99        })
100    }
101
102    #[cfg(not(feature = "not-send"))]
103    fn establish(
104        mut ctor_arg: RetryContext,
105    ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, StreamError<E>>> + Send>> {
106        Box::pin(async move {
107            let server = ctor_arg.get();
108            let mut ws = Self::connect(server)
109                .await
110                .map_err(StreamError::from_ws_error)?;
111            ws.send(Packet::new_room_enter(ctor_arg.config())).await?;
112            Ok(ws)
113        })
114    }
115
116    fn is_write_disconnect_error(err: &StreamError<E>) -> bool {
117        matches!(err, StreamError::WebSocket(_) | StreamError::IO(_))
118    }
119
120    fn is_read_disconnect_error(item: &Result<Packet, StreamError<E>>) -> bool {
121        if let Err(e) = item {
122            Self::is_write_disconnect_error(e)
123        } else {
124            false
125        }
126    }
127
128    fn exhaust_err() -> StreamError<E> {
129        StreamError::IO(io::Error::new(
130            ErrorKind::NotConnected,
131            "Disconnected. Connection attempts have been exhausted.",
132        ))
133    }
134}