bililive_core/retry/
mod.rs1use std::future::Future;
4use std::io;
5use std::io::ErrorKind;
6use std::marker::PhantomData;
7use std::pin::Pin;
8
9use futures::SinkExt;
10use futures::{Sink, Stream};
11use stream_reconnect::UnderlyingStream;
12
13pub use config::RetryConfig;
14pub use context::RetryContext;
15pub use policy::BEBIterator;
16
17use crate::errors::StreamError;
18use crate::packet::Packet;
19
20mod config;
21mod context;
22mod policy;
23
/// Connection factory for a packet-carrying stream (variant compiled when the
/// `not-send` feature is enabled).
///
/// `E` is the error type produced by a failed connection attempt and is also
/// the parameter of the [`StreamError`] used by the resulting stream.
///
/// This variant places no `Send` requirement on either the stream or the
/// future returned by [`connect`](WsStreamTrait::connect).
#[cfg(feature = "not-send")]
pub trait WsStreamTrait<E> {
    /// The connected stream: yields `Result<Packet, StreamError<E>>` items and
    /// accepts [`Packet`]s as a sink whose error type is `StreamError<E>`.
    type Stream: Sized
        + Unpin
        + Stream<Item = Result<Packet, StreamError<E>>>
        + Sink<Packet, Error = StreamError<E>>;

    /// Starts connecting to `url`.
    ///
    /// The returned boxed future may borrow `url` and resolves to the
    /// connected stream, or to `E` if the attempt fails.
    fn connect<'a>(url: &'a str) -> Pin<Box<dyn Future<Output = Result<Self::Stream, E>> + 'a>>;
}
44
45#[cfg(not(feature = "not-send"))]
46pub trait WsStreamTrait<E> {
47 type Stream: Stream<Item = Result<Packet, StreamError<E>>>
49 + Sink<Packet, Error = StreamError<E>>
50 + Unpin
51 + Sized
52 + Send;
53 fn connect(url: &str) -> Pin<Box<dyn Future<Output = Result<Self::Stream, E>> + Send + '_>>;
58}
59
60#[derive(Debug, Default)]
64pub struct WsStream<T: WsStreamTrait<E>, E>(PhantomData<(T, E)>);
65
66impl<T, E> WsStream<T, E>
67where
68 T: WsStreamTrait<E>,
69{
70 pub async fn connect(url: &str) -> Result<T::Stream, E> {
75 T::connect(url).await
76 }
77}
78
79#[allow(clippy::type_complexity)]
80impl<T, E> UnderlyingStream<RetryContext, Result<Packet, StreamError<E>>, StreamError<E>>
81 for WsStream<T, E>
82where
83 T: WsStreamTrait<E>,
84 E: std::error::Error,
85{
86 type Stream = T::Stream;
87
88 #[cfg(feature = "not-send")]
89 fn establish(
90 mut ctor_arg: RetryContext,
91 ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, StreamError<E>>>>> {
92 Box::pin(async move {
93 let server = ctor_arg.get();
94 let mut ws = Self::connect(server)
95 .await
96 .map_err(StreamError::from_ws_error)?;
97 ws.send(Packet::new_room_enter(ctor_arg.config())).await?;
98 Ok(ws)
99 })
100 }
101
102 #[cfg(not(feature = "not-send"))]
103 fn establish(
104 mut ctor_arg: RetryContext,
105 ) -> Pin<Box<dyn Future<Output = Result<Self::Stream, StreamError<E>>> + Send>> {
106 Box::pin(async move {
107 let server = ctor_arg.get();
108 let mut ws = Self::connect(server)
109 .await
110 .map_err(StreamError::from_ws_error)?;
111 ws.send(Packet::new_room_enter(ctor_arg.config())).await?;
112 Ok(ws)
113 })
114 }
115
116 fn is_write_disconnect_error(err: &StreamError<E>) -> bool {
117 matches!(err, StreamError::WebSocket(_) | StreamError::IO(_))
118 }
119
120 fn is_read_disconnect_error(item: &Result<Packet, StreamError<E>>) -> bool {
121 if let Err(e) = item {
122 Self::is_write_disconnect_error(e)
123 } else {
124 false
125 }
126 }
127
128 fn exhaust_err() -> StreamError<E> {
129 StreamError::IO(io::Error::new(
130 ErrorKind::NotConnected,
131 "Disconnected. Connection attempts have been exhausted.",
132 ))
133 }
134}