// fire_stream/util/bg_task.rs

/// Generates the async function `$name`, which drives one `PacketStream`
/// until it is closed or fails.
///
/// Macro arguments:
/// - `$name`: name of the generated function.
/// - `$handler`: type of the handler that is borrowed mutably by the task.
/// - `$bytes`: type argument of the `Packet<..>` bound on `P`.
/// - `$cfg`: type of the values delivered through the `watch::Receiver`.
///
/// The generated function loops over a `tokio::select!` with these branches:
/// - a packet received from the stream is passed to `handler.send`
///   (soft receive errors go to `handler.packet_error`); the returned
///   `SendBack` decides whether a packet is written back or the task
///   starts closing,
/// - a packet yielded by `handler.to_send()` is written to the stream,
/// - a tick of the ping timer writes `handler.ping_packet()`,
/// - the `close` receiver completing starts closing,
/// - a new configuration updates the stream timeout and the body limit.
///
/// Once closing has started only the `to_send` branch stays enabled. When
/// it is disabled as well, the `else` branch sends the stored close packet
/// (if there is one), shuts the stream down and returns `Ok(())`. Failures
/// in that final step are only logged.
///
/// Errors: I/O failures are returned as `TaskError::Io`, hard receive
/// errors as `TaskError::Packet`, and errors of `handler.send` /
/// `handler.packet_error` are propagated with `?`.
macro_rules! bg_stream {
    ($name:ident, $handler:ty, $bytes:ty, $cfg:ty) => {
        async fn $name<S, P>(
            mut stream: PacketStream<S, P>,
            handler: &mut $handler,
            cfg_rx: &mut watch::Receiver<$cfg>,
            mut close: &mut oneshot::Receiver<()>
        ) -> Result<(), TaskError>
        where
            S: ByteStream,
            P: Packet<$bytes>
        {
            // set as soon as a shutdown was requested (by the handler or
            // through `close`)
            let mut closing = false;
            // packet that has to be written right before shutting down
            let mut final_packet = None;

            // The ping period is the stream timeout minus a safety margin
            // chosen from the timeout in whole seconds.
            // Note: the period is computed once here; the config branch
            // below does not touch the ping timer.
            // NOTE(review): a timeout of zero yields a zero period here;
            // confirm that `interval` accepts that.
            let stream_timeout = stream.timeout();
            let margin_secs = match stream_timeout.as_secs() {
                0..=1 => 0,
                2..=10 => 1,
                _ => 5
            };
            let ping_period = stream_timeout - Duration::from_secs(margin_secs);
            let mut ping_timer = interval(ping_period);
            ping_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    received = stream.receive(), if !closing => {
                        let reply = match received {
                            Ok(p) => handler.send(p).await?,
                            Err(PacketReceiverError::Soft(h, e)) => {
                                handler.packet_error(h, e)?
                            },
                            Err(PacketReceiverError::Io(e)) => {
                                return Err(TaskError::Io(e))
                            },
                            Err(PacketReceiverError::Hard(e)) => {
                                return Err(TaskError::Packet(e))
                            }
                        };

                        match reply {
                            SendBack::None => {},
                            SendBack::Packet(p) => {
                                stream.send(p).await.map_err(TaskError::Io)?;
                            },
                            SendBack::Close => {
                                closing = true;
                                // the value returned by `close` is not
                                // sent in this case
                                let _ = handler.close();
                            },
                            SendBack::CloseWithPacket => {
                                closing = true;
                                final_packet = Some(handler.close());
                            }
                        }
                    },
                    // stays enabled while closing, so packets that are
                    // still queued get written before the shutdown
                    Some(outgoing) = handler.to_send() => {
                        stream.send(outgoing).await.map_err(TaskError::Io)?;
                    },
                    _ = ping_timer.tick(), if !closing => {
                        let ping = handler.ping_packet();
                        stream.send(ping).await.map_err(TaskError::Io)?;
                    },
                    _ = &mut close, if !closing => {
                        closing = true;
                        final_packet = Some(handler.close());
                    },
                    Some(new_cfg) = cfg_rx.recv(), if !closing => {
                        stream.stream.set_timeout(new_cfg.timeout);
                        stream.builder.set_body_limit(new_cfg.body_limit);
                    },
                    // reached once every branch above is disabled
                    else => {
                        if let Some(last) = final_packet.take() {
                            if let Err(e) = stream.send(last).await {
                                tracing::error!(
                                    "error sending close packet {:?}", e
                                );
                            }
                        }

                        if let Err(e) = stream.shutdown().await {
                            tracing::error!("error shutting down {:?}", e);
                        }

                        return Ok(());
                    }
                }
            }
        }
    }
}
116
/// Expands to the statements that run a client background task and
/// reconnect when it fails.
///
/// Invocation shape:
/// `client_bg_reconnect!(run_fn(stream, bg_handler, cfg_rx, rx_close,
/// recon_strat, |n_stream, cfg| { .. }))`, where every argument except the
/// closure-like part is the identifier of a local variable.
///
/// The expansion loops over these steps:
/// 1. Use `$stream` on the first pass. On later passes call
///    `(recon.inner)(attempt)` of the strategy in `$recon_strat` until it
///    returns `Ok`, logging every failed attempt.
/// 2. Evaluate `$block` with `$n_stream` bound to that stream and `$cfg`
///    bound to `$cfg_rx.newest()`. On `Err` the error is logged; without a
///    reconnect strategy it is returned, otherwise the loop restarts.
/// 3. Await `$fn(stream, &mut $bg_handler, &mut $cfg_rx, &mut $rx_close)`.
///    `Ok` is returned right away. `Err` is logged and returned if there is
///    no reconnect strategy; otherwise `$bg_handler.close_all_started()` is
///    called and the loop restarts.
///
/// The expansion uses `return` and `.await`, so it has to be placed inside
/// an async function (or block) returning a matching `Result`.
///
/// Note: the retry loop contains no delay of its own, and `$rx_close` is
/// only handed to `$fn`, so it is not observed while reconnecting.
macro_rules! client_bg_reconnect {
    (
        $fn:ident(
            $stream:ident,
            $bg_handler:ident,
            $cfg_rx:ident,
            $rx_close:ident,
            $recon_strat:ident,
            |$n_stream:ident, $cfg:ident| $block:block
        )
    ) => (

        // holds the initial stream until the first pass consumed it
        let mut initial = Some($stream);

        loop {
            let transport = match initial.take() {
                Some(s) => s,
                None => match &mut $recon_strat {
                    // a second pass only happens if a strategy exists,
                    // every failure without one returns below
                    None => unreachable!(),
                    Some(recon) => {
                        let mut attempt = 0;

                        loop {
                            match (recon.inner)(attempt).await {
                                Ok(s) => break s,
                                Err(e) => {
                                    tracing::error!(
                                        "reconnect failed attempt {} {:?}",
                                        attempt,
                                        e
                                    );
                                    attempt += 1;
                                }
                            }
                        }
                    }
                }
            };

            let latest_cfg = $cfg_rx.newest();
            let built = {
                let $n_stream = transport;
                let $cfg = latest_cfg;

                $block
            };

            let packet_stream = match built {
                Ok(s) => s,
                Err(e) => {
                    tracing::error!("creating packetstream failed {:?}", e);
                    if $recon_strat.is_some() {
                        continue
                    }

                    return Err(e)
                }
            };

            let outcome = $fn(
                packet_stream,
                &mut $bg_handler,
                &mut $cfg_rx,
                &mut $rx_close
            ).await;

            match outcome {
                Ok(o) => return Ok(o),
                Err(e) => {
                    tracing::error!(
                        "fire stream client connection failed {:?}", e
                    );
                    if $recon_strat.is_none() {
                        return Err(e)
                    }
                }
            }

            // only reached when the task failed and a reconnect follows
            $bg_handler.close_all_started();
        }
    )
}