1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{Context, Poll, ready};
3use std::{cmp, future::Future, future::poll_fn, hash, pin::Pin, sync::Arc};
4
5use async_channel::{Receiver, Sender, TrySendError, unbounded};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{Arbiter, spawn};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{Either, Stream, select, stream_recv};
12use ntex_util::time::{Millis, sleep, timeout_checked};
13
14use crate::ServerConfiguration;
15
16const STOP_TIMEOUT: Millis = Millis(3000);
17
18#[derive(Debug)]
19struct Shutdown {
21 timeout: Millis,
22 result: oneshot::Sender<bool>,
23}
24
25#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
26pub enum WorkerStatus {
28 Available,
29 #[default]
30 Unavailable,
31 Failed,
32}
33
34#[derive(Debug)]
35pub struct Worker<T> {
39 name: String,
40 tx1: Sender<T>,
41 tx2: Sender<Shutdown>,
42 avail: WorkerAvailability,
43 failed: Arc<AtomicBool>,
44}
45
46impl<T> cmp::Ord for Worker<T> {
47 fn cmp(&self, other: &Self) -> cmp::Ordering {
48 self.name.cmp(&other.name)
49 }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54 Some(self.cmp(other))
55 }
56}
57
58impl<T> hash::Hash for Worker<T> {
59 fn hash<H: hash::Hasher>(&self, state: &mut H) {
60 self.name.hash(state);
61 }
62}
63
64impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67 fn eq(&self, other: &Worker<T>) -> bool {
68 self.name == other.name
69 }
70}
71
72#[derive(Debug)]
73pub struct WorkerStop(oneshot::Receiver<bool>);
78
79impl<T> Worker<T> {
80 pub fn start<F>(name: String, cfg: F, cid: Option<CoreId>) -> Worker<T>
82 where
83 T: Send + 'static,
84 F: ServerConfiguration<Item = T>,
85 {
86 let (tx1, rx1) = unbounded();
87 let (tx2, rx2) = unbounded();
88 let (avail, avail_tx) = WorkerAvailability::create();
89 let name2 = name.clone();
90
91 Arbiter::with_name(name.clone()).exec_fn(move || {
92 if let Some(cid) = cid
93 && core_affinity::set_for_current(cid)
94 {
95 log::info!("Set affinity to {cid:?} for worker {name2:?}");
96 }
97
98 spawn(async move {
99 log::info!("Starting worker {name2:?}");
100
101 log::debug!("Creating server instance in {name2:?}");
102 let factory = cfg.create().await;
103
104 match create(name2.clone(), rx1, rx2, factory, avail_tx).await {
105 Ok((svc, wrk)) => {
106 log::debug!("Server instance has been created in {name2:?}");
107 run_worker(svc, wrk).await;
108 }
109 Err(e) => {
110 log::error!("Cannot start worker {name2:?}: {e:?}");
111 }
112 }
113 Arbiter::current().stop();
114 });
115 });
116
117 Worker {
118 tx1,
119 tx2,
120 name,
121 avail,
122 failed: Arc::new(AtomicBool::new(false)),
123 }
124 }
125
126 pub fn name(&self) -> &str {
128 &self.name
129 }
130
131 pub fn send(&self, msg: T) -> Result<(), T> {
136 self.tx1.try_send(msg).map_err(TrySendError::into_inner)
137 }
138
139 pub fn status(&self) -> WorkerStatus {
141 if self.failed.load(Ordering::Acquire) {
142 WorkerStatus::Failed
143 } else if self.avail.available() {
144 WorkerStatus::Available
145 } else {
146 WorkerStatus::Unavailable
147 }
148 }
149
150 pub async fn wait_for_status(&mut self) -> WorkerStatus {
152 if self.failed.load(Ordering::Acquire) {
153 WorkerStatus::Failed
154 } else {
155 self.avail.wait_for_update().await;
156 if self.avail.failed() {
157 self.failed.store(true, Ordering::Release);
158 }
159 self.status()
160 }
161 }
162
163 pub fn stop(&self, timeout: Millis) -> WorkerStop {
167 let (result, rx) = oneshot::channel();
168 let _ = self.tx2.try_send(Shutdown { timeout, result });
169 WorkerStop(rx)
170 }
171}
172
173impl<T> Clone for Worker<T> {
174 fn clone(&self) -> Self {
175 Worker {
176 tx1: self.tx1.clone(),
177 tx2: self.tx2.clone(),
178 name: self.name.clone(),
179 avail: self.avail.clone(),
180 failed: self.failed.clone(),
181 }
182 }
183}
184
185impl Future for WorkerStop {
186 type Output = bool;
187
188 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 match ready!(Pin::new(&mut self.0).poll(cx)) {
190 Ok(res) => Poll::Ready(res),
191 Err(_) => Poll::Ready(true),
192 }
193 }
194}
195
196#[derive(Debug, Clone)]
197struct WorkerAvailability {
198 inner: Arc<Inner>,
199}
200
201#[derive(Debug, Clone)]
202struct WorkerAvailabilityTx {
203 inner: Arc<Inner>,
204}
205
206#[derive(Debug)]
207struct Inner {
208 waker: AtomicWaker,
209 updated: AtomicBool,
210 available: AtomicBool,
211 failed: AtomicBool,
212}
213
214impl WorkerAvailability {
215 fn create() -> (Self, WorkerAvailabilityTx) {
216 let inner = Arc::new(Inner {
217 waker: AtomicWaker::new(),
218 updated: AtomicBool::new(false),
219 available: AtomicBool::new(false),
220 failed: AtomicBool::new(false),
221 });
222
223 let avail = WorkerAvailability {
224 inner: inner.clone(),
225 };
226 let avail_tx = WorkerAvailabilityTx { inner };
227 (avail, avail_tx)
228 }
229
230 fn failed(&self) -> bool {
231 self.inner.failed.load(Ordering::Acquire)
232 }
233
234 fn available(&self) -> bool {
235 self.inner.available.load(Ordering::Acquire)
236 }
237
238 async fn wait_for_update(&self) {
239 poll_fn(|cx| {
240 if self.inner.updated.load(Ordering::Acquire) {
241 self.inner.updated.store(false, Ordering::Release);
242 Poll::Ready(())
243 } else {
244 self.inner.waker.register(cx.waker());
245 Poll::Pending
246 }
247 })
248 .await;
249 }
250}
251
252impl WorkerAvailabilityTx {
253 fn set(&self, val: bool) {
254 let old = self.inner.available.swap(val, Ordering::Release);
255 if old != val {
256 self.inner.updated.store(true, Ordering::Release);
257 self.inner.waker.wake();
258 }
259 }
260}
261
262impl Drop for WorkerAvailabilityTx {
263 fn drop(&mut self) {
264 self.inner.failed.store(true, Ordering::Release);
265 self.inner.updated.store(true, Ordering::Release);
266 self.inner.available.store(false, Ordering::Release);
267 self.inner.waker.wake();
268 }
269}
270
271struct WorkerSt<T, F: ServiceFactory<T>> {
275 name: String,
276 rx: Receiver<T>,
277 stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
278 factory: F,
279 availability: WorkerAvailabilityTx,
280}
281
282async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
283where
284 T: Send + 'static,
285 F: ServiceFactory<T> + 'static,
286{
287 loop {
288 let mut recv = std::pin::pin!(wrk.rx.recv());
289 let fut = poll_fn(|cx| {
290 match svc.poll_ready(cx) {
291 Poll::Ready(Ok(())) => {
292 wrk.availability.set(true);
293 }
294 Poll::Ready(Err(err)) => {
295 wrk.availability.set(false);
296 return Poll::Ready(Err(err));
297 }
298 Poll::Pending => {
299 wrk.availability.set(false);
300 return Poll::Pending;
301 }
302 }
303
304 if let Ok(item) = ready!(recv.as_mut().poll(cx)) {
305 let fut = svc.call(item);
306 spawn(async move {
307 let _ = fut.await;
308 });
309 Poll::Ready(Ok::<_, F::Error>(true))
310 } else {
311 log::error!("Server is gone");
312 Poll::Ready(Ok(false))
313 }
314 });
315
316 match select(fut, stream_recv(&mut wrk.stop)).await {
317 Either::Left(Ok(true)) => continue,
318 Either::Left(Err(_)) => {
319 ntex_rt::spawn(async move {
320 svc.shutdown().await;
321 });
322 }
323 Either::Right(Some(Shutdown { timeout, result })) => {
324 wrk.availability.set(false);
325
326 let timeout = if timeout.is_zero() { STOP_TIMEOUT } else { timeout };
327
328 stop_svc(&wrk.name, svc, timeout, Some(result)).await;
329 return;
330 }
331 Either::Left(Ok(false)) | Either::Right(None) => {
332 wrk.availability.set(false);
333 stop_svc(&wrk.name, svc, STOP_TIMEOUT, None).await;
334 return;
335 }
336 }
337
338 loop {
340 match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
341 Either::Left(Ok(service)) => {
342 svc = Pipeline::new(service).bind();
343 break;
344 }
345 Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
346 Either::Right(_) => return,
347 }
348 }
349 }
350}
351
352async fn stop_svc<T, F>(
353 name: &str,
354 svc: PipelineBinding<F, T>,
355 timeout: Millis,
356 result: Option<oneshot::Sender<bool>>,
357) where
358 T: Send + 'static,
359 F: Service<T> + 'static,
360{
361 let res = timeout_checked(timeout, svc.shutdown()).await;
362 if let Some(result) = result {
363 let _ = result.send(res.is_ok());
364 }
365
366 log::info!("Worker {name:?} has been stopped");
367}
368
369async fn create<T, F>(
370 name: String,
371 rx: Receiver<T>,
372 stop: Receiver<Shutdown>,
373 factory: Result<F, ()>,
374 availability: WorkerAvailabilityTx,
375) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
376where
377 T: Send + 'static,
378 F: ServiceFactory<T> + 'static,
379{
380 availability.set(false);
381 let factory = factory?;
382 let mut stop = Box::pin(stop);
383
384 let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
385 Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
386 Either::Right(Some(Shutdown { result, .. })) => {
387 log::trace!("Shutdown uninitialized worker");
388 let _ = result.send(false);
389 return Err(());
390 }
391 Either::Left(Err(_)) | Either::Right(None) => return Err(()),
392 };
393 availability.set(true);
394
395 Ok((
396 svc,
397 WorkerSt {
398 name,
399 rx,
400 factory,
401 availability,
402 stop: Box::pin(stop),
403 },
404 ))
405}