1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{Context, Poll, ready};
3use std::{cmp, future::Future, future::poll_fn, hash, pin::Pin, sync::Arc};
4
5use async_channel::{Receiver, Sender, TrySendError, unbounded};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{Arbiter, spawn};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{Either, Stream, select, stream_recv};
12use ntex_util::time::{Millis, sleep, timeout_checked};
13
14use crate::ServerConfiguration;
15
16const STOP_TIMEOUT: Millis = Millis(3000);
17
18#[derive(Debug)]
19struct Shutdown {
21 timeout: Millis,
22 result: oneshot::Sender<bool>,
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub enum WorkerStatus {
28 Available,
29 #[default]
30 Unavailable,
31 Failed,
32}
33
34#[derive(Debug)]
35pub struct Worker<T> {
39 name: String,
40 tx1: Sender<T>,
41 tx2: Sender<Shutdown>,
42 avail: WorkerAvailability,
43 failed: Arc<AtomicBool>,
44}
45
46impl<T> cmp::Ord for Worker<T> {
47 fn cmp(&self, other: &Self) -> cmp::Ordering {
48 self.name.cmp(&other.name)
49 }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54 Some(self.cmp(other))
55 }
56}
57
58impl<T> hash::Hash for Worker<T> {
59 fn hash<H: hash::Hasher>(&self, state: &mut H) {
60 self.name.hash(state);
61 }
62}
63
64impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67 fn eq(&self, other: &Worker<T>) -> bool {
68 self.name == other.name
69 }
70}
71
72#[derive(Debug)]
73pub struct WorkerStop(oneshot::AsyncReceiver<bool>);
78
79impl<T> Worker<T> {
80 pub fn start<F>(name: String, cfg: F, cid: Option<CoreId>) -> Worker<T>
82 where
83 T: Send + 'static,
84 F: ServerConfiguration<Item = T>,
85 {
86 let (tx1, rx1) = unbounded();
87 let (tx2, rx2) = unbounded();
88 let (avail, avail_tx) = WorkerAvailability::create();
89 let name2 = name.clone();
90
91 Arbiter::with_name(name.clone()).handle().spawn(async move {
92 if let Some(cid) = cid
93 && core_affinity::set_for_current(cid)
94 {
95 log::info!("Set affinity to {cid:?} for worker {name2:?}");
96 }
97
98 spawn(async move {
99 log::info!("Starting worker {name2:?}");
100
101 log::debug!("Creating server instance in {name2:?}");
102 let factory = cfg.create().await;
103
104 match create(name2.clone(), rx1, rx2, factory, avail_tx).await {
105 Ok((svc, wrk)) => {
106 log::debug!("Server instance has been created in {name2:?}");
107 run_worker(svc, wrk).await;
108 }
109 Err(e) => {
110 log::error!("Cannot start worker {name2:?}: {e:?}");
111 }
112 }
113 Arbiter::current().stop();
114 });
115 });
116
117 Worker {
118 tx1,
119 tx2,
120 name,
121 avail,
122 failed: Arc::new(AtomicBool::new(false)),
123 }
124 }
125
126 pub fn name(&self) -> &str {
128 &self.name
129 }
130
131 pub fn send(&self, msg: T) -> Result<(), T> {
136 self.tx1.try_send(msg).map_err(TrySendError::into_inner)
137 }
138
139 pub fn status(&self) -> WorkerStatus {
141 if self.failed.load(Ordering::Acquire) {
142 WorkerStatus::Failed
143 } else if self.avail.available() {
144 WorkerStatus::Available
145 } else {
146 WorkerStatus::Unavailable
147 }
148 }
149
150 pub async fn wait_for_status(&mut self) -> WorkerStatus {
152 if self.failed.load(Ordering::Acquire) {
153 WorkerStatus::Failed
154 } else {
155 self.avail.wait_for_update().await;
156 if self.avail.failed() {
157 self.failed.store(true, Ordering::Release);
158 }
159 self.status()
160 }
161 }
162
163 pub fn stop(&self, timeout: Millis) -> WorkerStop {
167 let (result, rx) = oneshot::async_channel();
168 let _ = self.tx2.try_send(Shutdown { timeout, result });
169 WorkerStop(rx)
170 }
171}
172
173impl<T> Clone for Worker<T> {
174 fn clone(&self) -> Self {
175 Worker {
176 tx1: self.tx1.clone(),
177 tx2: self.tx2.clone(),
178 name: self.name.clone(),
179 avail: self.avail.clone(),
180 failed: self.failed.clone(),
181 }
182 }
183}
184
185impl Future for WorkerStop {
186 type Output = bool;
187
188 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 match ready!(Pin::new(&mut self.0).poll(cx)) {
190 Ok(res) => Poll::Ready(res),
191 Err(_) => Poll::Ready(true),
192 }
193 }
194}
195
196#[derive(Debug, Clone)]
197struct WorkerAvailability {
198 inner: Arc<Inner>,
199}
200
201#[derive(Debug, Clone)]
202struct WorkerAvailabilityTx {
203 inner: Arc<Inner>,
204}
205
206#[derive(Debug)]
207struct Inner {
208 waker: AtomicWaker,
209 updated: AtomicBool,
210 available: AtomicBool,
211 failed: AtomicBool,
212}
213
214impl WorkerAvailability {
215 fn create() -> (Self, WorkerAvailabilityTx) {
216 let inner = Arc::new(Inner {
217 waker: AtomicWaker::new(),
218 updated: AtomicBool::new(false),
219 available: AtomicBool::new(false),
220 failed: AtomicBool::new(false),
221 });
222
223 let avail = WorkerAvailability {
224 inner: inner.clone(),
225 };
226 let avail_tx = WorkerAvailabilityTx { inner };
227 (avail, avail_tx)
228 }
229
230 fn failed(&self) -> bool {
231 self.inner.failed.load(Ordering::Acquire)
232 }
233
234 fn available(&self) -> bool {
235 self.inner.available.load(Ordering::Acquire)
236 }
237
238 async fn wait_for_update(&self) {
239 poll_fn(|cx| {
240 self.inner.waker.register(cx.waker());
241 if self.inner.updated.swap(false, Ordering::AcqRel) {
242 Poll::Ready(())
243 } else {
244 Poll::Pending
245 }
246 })
247 .await;
248 }
249}
250
251impl WorkerAvailabilityTx {
252 fn set(&self, val: bool) {
253 let old = self.inner.available.swap(val, Ordering::Release);
254 if old != val {
255 self.inner.updated.store(true, Ordering::Release);
256 self.inner.waker.wake();
257 }
258 }
259}
260
261impl Drop for WorkerAvailabilityTx {
262 fn drop(&mut self) {
263 self.inner.failed.store(true, Ordering::Release);
264 self.inner.updated.store(true, Ordering::Release);
265 self.inner.available.store(false, Ordering::Release);
266 self.inner.waker.wake();
267 }
268}
269
270struct WorkerSt<T, F: ServiceFactory<T>> {
274 name: String,
275 rx: Receiver<T>,
276 stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
277 factory: F,
278 availability: WorkerAvailabilityTx,
279}
280
281async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
282where
283 T: Send + 'static,
284 F: ServiceFactory<T> + 'static,
285{
286 loop {
287 let mut recv = std::pin::pin!(wrk.rx.recv());
288 let fut = poll_fn(|cx| {
289 match svc.poll_ready(cx) {
290 Poll::Ready(Ok(())) => {
291 wrk.availability.set(true);
292 }
293 Poll::Ready(Err(err)) => {
294 wrk.availability.set(false);
295 return Poll::Ready(Err(err));
296 }
297 Poll::Pending => {
298 wrk.availability.set(false);
299 return Poll::Pending;
300 }
301 }
302
303 if let Ok(item) = ready!(recv.as_mut().poll(cx)) {
304 let fut = svc.call(item);
305 spawn(async move {
306 let _ = fut.await;
307 });
308 Poll::Ready(Ok::<_, F::Error>(true))
309 } else {
310 log::error!("Server is gone");
311 Poll::Ready(Ok(false))
312 }
313 });
314
315 match select(fut, stream_recv(&mut wrk.stop)).await {
316 Either::Left(Ok(true)) => continue,
317 Either::Left(Err(_)) => {
318 ntex_rt::spawn(async move {
319 svc.shutdown().await;
320 });
321 }
322 Either::Right(Some(Shutdown { timeout, result })) => {
323 wrk.availability.set(false);
324
325 let timeout = if timeout.is_zero() { STOP_TIMEOUT } else { timeout };
326
327 stop_svc(&wrk.name, svc, timeout, Some(result)).await;
328 return;
329 }
330 Either::Left(Ok(false)) | Either::Right(None) => {
331 wrk.availability.set(false);
332 stop_svc(&wrk.name, svc, STOP_TIMEOUT, None).await;
333 return;
334 }
335 }
336
337 loop {
339 match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
340 Either::Left(Ok(service)) => {
341 svc = Pipeline::new(service).bind();
342 break;
343 }
344 Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
345 Either::Right(_) => return,
346 }
347 }
348 }
349}
350
351async fn stop_svc<T, F>(
352 name: &str,
353 svc: PipelineBinding<F, T>,
354 timeout: Millis,
355 result: Option<oneshot::Sender<bool>>,
356) where
357 T: Send + 'static,
358 F: Service<T> + 'static,
359{
360 let res = timeout_checked(timeout, svc.shutdown()).await;
361 if let Some(result) = result {
362 let _ = result.send(res.is_ok());
363 }
364
365 log::info!("Worker {name:?} has been stopped");
366}
367
368async fn create<T, F>(
369 name: String,
370 rx: Receiver<T>,
371 stop: Receiver<Shutdown>,
372 factory: Result<F, ()>,
373 availability: WorkerAvailabilityTx,
374) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
375where
376 T: Send + 'static,
377 F: ServiceFactory<T> + 'static,
378{
379 availability.set(false);
380 let factory = factory?;
381 let mut stop = Box::pin(stop);
382
383 let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
384 Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
385 Either::Right(Some(Shutdown { result, .. })) => {
386 log::trace!("Shutdown uninitialized worker");
387 let _ = result.send(false);
388 return Err(());
389 }
390 Either::Left(Err(_)) | Either::Right(None) => return Err(()),
391 };
392 availability.set(true);
393
394 Ok((
395 svc,
396 WorkerSt {
397 name,
398 rx,
399 factory,
400 availability,
401 stop: Box::pin(stop),
402 },
403 ))
404}