Skip to main content

fire_stream/util/
bg_task.rs

1/// Todo once async trait get's stabilized this shoud be refactored
2/// to be a function instead of a macro
3///
4/// ```ignore
5/// trait PacketStream {
6/// 	fn timeout(&self) -> Duration;
7///
8/// 	async fn send(&mut self, packet: P) -> io::Result<()>;
9///
10/// 	async receive(&mut self) -> Result<P, PacketReadError<P::Header>>;
11///
12/// 	async shutdown(&mut self) -> io::Result<()>;
13/// }
14/// ```
15
16
17macro_rules! bg_stream {
18	($name:ident, $handler:ty, $bytes:ty, $cfg:ty) => {
19		async fn $name<S, P>(
20			mut stream: PacketStream<S, P>,
21			handler: &mut $handler,
22			cfg_rx: &mut watch::Receiver<$cfg>,
23			mut close: &mut oneshot::Receiver<()>
24		) -> Result<(), TaskError>
25		where
26			S: ByteStream,
27			P: Packet<$bytes>
28		{
29			let mut should_close = false;
30			let mut close_packet = None;
31
32			let timeout = stream.timeout();
33			let diff = match timeout.as_secs() {
34				0..=1 => 0,
35				0..=10 => 1,
36				_ => 5
37			};
38			let mut interval = interval(timeout - Duration::from_secs(diff));
39			interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
40
41			loop {
42				tokio::select!{
43					packet = stream.receive(), if !should_close => {
44						let r_packet = match packet {
45							Ok(p) => {
46								handler.send(p).await?
47							},
48							Err(PacketReceiverError::Io(e)) => {
49								return Err(TaskError::Io(e))
50							},
51							Err(PacketReceiverError::Hard(e)) => {
52								return Err(TaskError::Packet(e))
53							},
54							Err(PacketReceiverError::Soft(h, e)) => {
55								handler.packet_error(h, e)?
56							}
57						};
58
59						match r_packet {
60							SendBack::None => {},
61							SendBack::Packet(p) => {
62								stream.send(p).await
63									.map_err(TaskError::Io)?;
64							},
65							SendBack::Close => {
66								should_close = true;
67								let _ = handler.close();
68							},
69							SendBack::CloseWithPacket => {
70								should_close = true;
71								let packet = handler.close();
72								close_packet = Some(packet);
73							}
74						}
75					},
76					Some(packet) = handler.to_send() => {
77						// Todo make this not block until everything is sent
78						// this can stop receiving
79						stream.send(packet).await
80							.map_err(TaskError::Io)?;
81					},
82					_ping = interval.tick(), if !should_close => {
83						stream.send(handler.ping_packet()).await
84							.map_err(TaskError::Io)?;
85					},
86					_ = &mut close, if !should_close => {
87						should_close = true;
88						let packet = handler.close();
89						close_packet = Some(packet);
90					},
91					Some(cfg) = cfg_rx.recv(), if !should_close => {
92						// should update configuration
93						stream.stream.set_timeout(cfg.timeout);
94						stream.builder.set_body_limit(cfg.body_limit);
95					},
96					else => {
97						if let Some(packet) = close_packet.take() {
98							if let Err(e) = stream.send(packet).await {
99								tracing::error!(
100									"error sending close packet {:?}", e
101								);
102							}
103						}
104						if let Err(e) = stream.shutdown().await {
105							tracing::error!("error shutting down {:?}", e);
106						}
107
108						return Ok(())
109					}
110				}
111
112			}
113		}
114	}
115}
116
117macro_rules! client_bg_reconnect {
118	(
119		$fn:ident(
120			$stream:ident,
121			$bg_handler:ident,
122			$cfg_rx:ident,
123			$rx_close:ident,
124			$recon_strat:ident,
125			// return Result<PacketStream, TaskError>
126			|$n_stream:ident, $cfg:ident| $block:block
127		)
128	) => (
129
130		let mut stream = Some($stream);
131
132		loop {
133			// reconnect if
134			let stream = match (stream.take(), &mut $recon_strat) {
135				(Some(s), _) => s,
136				// no recon and no stream
137				// this is not possible since on the first iteration a stream
138				// always exists and if the stream failes and there
139				// is no recon strategy we return
140				(None, None) => unreachable!(),
141				(None, Some(recon)) => {
142					let mut err_counter = 0;
143
144					loop {
145						let stream = (recon.inner)(err_counter).await;
146						match stream {
147							Ok(s) => break s,
148							Err(e) => {
149								tracing::error!(
150									"reconnect failed attempt {} {:?}",
151									err_counter,
152									e
153								);
154								err_counter += 1;
155							}
156						}
157					}
158				}
159			};
160
161			let cfg = $cfg_rx.newest();
162			let stream = {
163				let $n_stream = stream;
164				let $cfg = cfg;
165
166				$block
167			};
168			let stream = match stream {
169				Ok(s) => s,
170				Err(e) => {
171					tracing::error!("creating packetstream failed {:?}", e);
172					// close since we can't reconnect
173					if $recon_strat.is_none() {
174						return Err(e)
175					}
176
177					continue
178				}
179			};
180
181			let r = $fn(
182				stream,
183				&mut $bg_handler,
184				&mut $cfg_rx,
185				&mut $rx_close
186			).await;
187
188			match r {
189				Ok(o) => return Ok(o),
190				Err(e) => {
191					tracing::error!(
192						"fire stream client connection failed {:?}", e
193					);
194					if $recon_strat.is_none() {
195						// close since we can't reconnect
196						return Err(e)
197					}
198				}
199			}
200
201			// close all started requests because the connection failed
202			$bg_handler.close_all_started();
203		}
204	)
205}