1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{ready, Context, Poll};
3use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc};
4
5use async_channel::{unbounded, Receiver, Sender};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{spawn, Arbiter};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{select, stream_recv, Either, Stream};
12use ntex_util::time::{sleep, timeout_checked, Millis};
13
14use crate::{ServerConfiguration, WorkerId};
15
16const STOP_TIMEOUT: Millis = Millis(3000);
17
/// Stop request delivered to a worker over its shutdown channel.
#[derive(Debug)]
struct Shutdown {
    /// Time allowed for the service to shut down; `run_worker` replaces a
    /// zero value with `STOP_TIMEOUT`.
    timeout: Millis,
    /// Receives `true` if the service shut down within `timeout`, `false`
    /// otherwise (also `false` when the worker had not finished initializing).
    result: oneshot::Sender<bool>,
}
24
/// Readiness state of a worker as reported to the server.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum WorkerStatus {
    /// The worker's service reported readiness and can take new items.
    Available,
    /// The worker is not ready to take items (e.g. still starting, its
    /// service is not ready, or it is stopping). This is the default.
    #[default]
    Unavailable,
    /// The worker task is gone: its availability sender has been dropped.
    Failed,
}
33
/// Handle to a worker running on its own arbiter.
///
/// Clones share the same channels and flags. Equality, ordering and hashing
/// consider only `id`.
#[derive(Debug)]
pub struct Worker<T> {
    id: WorkerId,
    // Items to be handled by the worker's service.
    tx1: Sender<T>,
    // Shutdown requests.
    tx2: Sender<Shutdown>,
    // Readiness published by the worker task.
    avail: WorkerAvailability,
    // Sticky failure flag shared by all clones; set in `wait_for_status`.
    failed: Arc<AtomicBool>,
}
45
46impl<T> cmp::Ord for Worker<T> {
47 fn cmp(&self, other: &Self) -> cmp::Ordering {
48 self.id.cmp(&other.id)
49 }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54 Some(self.id.cmp(&other.id))
55 }
56}
57
58impl<T> hash::Hash for Worker<T> {
59 fn hash<H: hash::Hasher>(&self, state: &mut H) {
60 self.id.hash(state);
61 }
62}
63
// Equality is by worker id only (see the `PartialEq` impl), which is total.
impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67 fn eq(&self, other: &Worker<T>) -> bool {
68 self.id == other.id
69 }
70}
71
/// Future returned by `Worker::stop`.
///
/// Resolves to the value reported by the worker, or to `true` if the worker
/// dropped the reply channel without answering.
#[derive(Debug)]
pub struct WorkerStop(oneshot::Receiver<bool>);
78
79impl<T> Worker<T> {
80 pub fn start<F>(id: WorkerId, cfg: F, cid: Option<CoreId>) -> Worker<T>
82 where
83 T: Send + 'static,
84 F: ServerConfiguration<Item = T>,
85 {
86 let (tx1, rx1) = unbounded();
87 let (tx2, rx2) = unbounded();
88 let (avail, avail_tx) = WorkerAvailability::create();
89
90 Arbiter::default().exec_fn(move || {
91 if let Some(cid) = cid {
92 if core_affinity::set_for_current(cid) {
93 log::info!("Set affinity to {:?} for worker {:?}", cid, id);
94 }
95 }
96
97 let _ = spawn(async move {
98 log::info!("Starting worker {:?}", id);
99
100 log::debug!("Creating server instance in {:?}", id);
101 let factory = cfg.create().await;
102
103 match create(id, rx1, rx2, factory, avail_tx).await {
104 Ok((svc, wrk)) => {
105 log::debug!("Server instance has been created in {:?}", id);
106 run_worker(svc, wrk).await;
107 }
108 Err(e) => {
109 log::error!("Cannot start worker: {:?}", e);
110 }
111 }
112 Arbiter::current().stop();
113 });
114 });
115
116 Worker {
117 id,
118 tx1,
119 tx2,
120 avail,
121 failed: Arc::new(AtomicBool::new(false)),
122 }
123 }
124
125 pub fn id(&self) -> WorkerId {
127 self.id
128 }
129
130 pub fn send(&self, msg: T) -> Result<(), T> {
135 self.tx1.try_send(msg).map_err(|msg| msg.into_inner())
136 }
137
138 pub fn status(&self) -> WorkerStatus {
140 if self.failed.load(Ordering::Acquire) {
141 WorkerStatus::Failed
142 } else if self.avail.available() {
143 WorkerStatus::Available
144 } else {
145 WorkerStatus::Unavailable
146 }
147 }
148
149 pub async fn wait_for_status(&mut self) -> WorkerStatus {
151 if self.failed.load(Ordering::Acquire) {
152 WorkerStatus::Failed
153 } else {
154 self.avail.wait_for_update().await;
155 if self.avail.failed() {
156 self.failed.store(true, Ordering::Release);
157 }
158 self.status()
159 }
160 }
161
162 pub fn stop(&self, timeout: Millis) -> WorkerStop {
166 let (result, rx) = oneshot::channel();
167 let _ = self.tx2.try_send(Shutdown { timeout, result });
168 WorkerStop(rx)
169 }
170}
171
172impl<T> Clone for Worker<T> {
173 fn clone(&self) -> Self {
174 Worker {
175 id: self.id,
176 tx1: self.tx1.clone(),
177 tx2: self.tx2.clone(),
178 avail: self.avail.clone(),
179 failed: self.failed.clone(),
180 }
181 }
182}
183
184impl Future for WorkerStop {
185 type Output = bool;
186
187 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 match ready!(Pin::new(&mut self.0).poll(cx)) {
189 Ok(res) => Poll::Ready(res),
190 Err(_) => Poll::Ready(true),
191 }
192 }
193}
194
/// Receiving side of a worker's availability state, held by `Worker` handles.
#[derive(Debug, Clone)]
struct WorkerAvailability {
    inner: Arc<Inner>,
}
199
/// Sending side of a worker's availability state, owned by the worker task.
///
/// Dropping it marks the worker as failed (see its `Drop` impl).
// NOTE(review): this type derives `Clone`, but dropping *any* clone marks the
// worker failed. No clone is made in the code visible here; confirm nothing
// else relies on `Clone` before keeping it.
#[derive(Debug, Clone)]
struct WorkerAvailabilityTx {
    inner: Arc<Inner>,
}
204
/// State shared between `WorkerAvailability` and `WorkerAvailabilityTx`.
#[derive(Debug)]
struct Inner {
    // Waker of the task waiting in `wait_for_update`.
    waker: AtomicWaker,
    // Set by the sender when `available` changes or the sender is dropped;
    // cleared by the waiting side.
    updated: AtomicBool,
    // Last readiness value published by the worker.
    available: AtomicBool,
    // Set once the sender has been dropped.
    failed: AtomicBool,
}
212
213impl WorkerAvailability {
214 fn create() -> (Self, WorkerAvailabilityTx) {
215 let inner = Arc::new(Inner {
216 waker: AtomicWaker::new(),
217 updated: AtomicBool::new(false),
218 available: AtomicBool::new(false),
219 failed: AtomicBool::new(false),
220 });
221
222 let avail = WorkerAvailability {
223 inner: inner.clone(),
224 };
225 let avail_tx = WorkerAvailabilityTx { inner };
226 (avail, avail_tx)
227 }
228
229 fn failed(&self) -> bool {
230 self.inner.failed.load(Ordering::Acquire)
231 }
232
233 fn available(&self) -> bool {
234 self.inner.available.load(Ordering::Acquire)
235 }
236
237 async fn wait_for_update(&self) {
238 poll_fn(|cx| {
239 if self.inner.updated.load(Ordering::Acquire) {
240 self.inner.updated.store(false, Ordering::Release);
241 Poll::Ready(())
242 } else {
243 self.inner.waker.register(cx.waker());
244 Poll::Pending
245 }
246 })
247 .await;
248 }
249}
250
251impl WorkerAvailabilityTx {
252 fn set(&self, val: bool) {
253 let old = self.inner.available.swap(val, Ordering::Release);
254 if old != val {
255 self.inner.updated.store(true, Ordering::Release);
256 self.inner.waker.wake();
257 }
258 }
259}
260
261impl Drop for WorkerAvailabilityTx {
262 fn drop(&mut self) {
263 self.inner.failed.store(true, Ordering::Release);
264 self.inner.updated.store(true, Ordering::Release);
265 self.inner.available.store(false, Ordering::Release);
266 self.inner.waker.wake();
267 }
268}
269
/// State owned by a running worker task.
struct WorkerSt<T, F: ServiceFactory<T>> {
    id: WorkerId,
    // Items sent by `Worker::send`.
    rx: Receiver<T>,
    // Shutdown requests sent by `Worker::stop`.
    stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
    // Used to re-create the service after a readiness error.
    factory: F,
    // Publishes this worker's readiness; dropping it marks the worker failed.
    availability: WorkerAvailabilityTx,
}
280
/// Main loop of a worker task.
///
/// Each iteration waits for the service to become ready (publishing readiness
/// through `wrk.availability`), receives one item from `wrk.rx` and spawns the
/// service call for it as a detached task. The function returns when:
/// - a `Shutdown` request arrives: the service is shut down with the requested
///   timeout (`STOP_TIMEOUT` if zero) and the outcome is sent back;
/// - the item channel or the shutdown channel is closed: the service is shut
///   down with `STOP_TIMEOUT`;
/// - anything arrives on (or closes) the shutdown channel while the service is
///   being re-created; the reply sender is then dropped without an answer.
///
/// If `poll_ready` fails, the old service is shut down in a background task and
/// a new one is created from `wrk.factory`, retrying once per second on error.
async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
where
    T: Send + 'static,
    F: ServiceFactory<T> + 'static,
{
    loop {
        let mut recv = std::pin::pin!(wrk.rx.recv());
        // Resolves to Ok(true) after dispatching one item, Ok(false) when the
        // item channel is closed, or Err when the readiness check fails.
        // Readiness is re-checked on every poll, so availability follows it.
        let fut = poll_fn(|cx| {
            match svc.poll_ready(cx) {
                Poll::Ready(Ok(())) => {
                    wrk.availability.set(true);
                }
                Poll::Ready(Err(err)) => {
                    wrk.availability.set(false);
                    return Poll::Ready(Err(err));
                }
                Poll::Pending => {
                    wrk.availability.set(false);
                    return Poll::Pending;
                }
            }

            match ready!(recv.as_mut().poll(cx)) {
                Ok(item) => {
                    // The call runs detached; its result is discarded.
                    let fut = svc.call(item);
                    let _ = spawn(async move {
                        let _ = fut.await;
                    });
                    Poll::Ready(Ok::<_, F::Error>(true))
                }
                Err(_) => {
                    log::error!("Server is gone");
                    Poll::Ready(Ok(false))
                }
            }
        });

        match select(fut, stream_recv(&mut wrk.stop)).await {
            Either::Left(Ok(true)) => continue,
            Either::Left(Err(_)) => {
                // Shut the failed service down in the background; `svc` is
                // reassigned by the re-create loop below before its next use.
                let _ = ntex_rt::spawn(async move {
                    svc.shutdown().await;
                });
            }
            Either::Right(Some(Shutdown { timeout, result })) => {
                wrk.availability.set(false);

                let timeout = if timeout.is_zero() {
                    STOP_TIMEOUT
                } else {
                    timeout
                };

                stop_svc(wrk.id, svc, timeout, Some(result)).await;
                return;
            }
            Either::Left(Ok(false)) | Either::Right(None) => {
                wrk.availability.set(false);
                stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
                return;
            }
        }

        // Re-create the service, aborting if the shutdown channel yields.
        loop {
            match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
                Either::Left(Ok(service)) => {
                    svc = Pipeline::new(service).bind();
                    break;
                }
                Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
                Either::Right(_) => return,
            }
        }
    }
}
357
358async fn stop_svc<T, F>(
359 id: WorkerId,
360 svc: PipelineBinding<F, T>,
361 timeout: Millis,
362 result: Option<oneshot::Sender<bool>>,
363) where
364 T: Send + 'static,
365 F: Service<T> + 'static,
366{
367 let res = timeout_checked(timeout, svc.shutdown()).await;
368 if let Some(result) = result {
369 let _ = result.send(res.is_ok());
370 }
371
372 log::info!("Worker {:?} has been stopped", id);
373}
374
375async fn create<T, F>(
376 id: WorkerId,
377 rx: Receiver<T>,
378 stop: Receiver<Shutdown>,
379 factory: Result<F, ()>,
380 availability: WorkerAvailabilityTx,
381) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
382where
383 T: Send + 'static,
384 F: ServiceFactory<T> + 'static,
385{
386 availability.set(false);
387 let factory = factory?;
388 let mut stop = Box::pin(stop);
389
390 let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
391 Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
392 Either::Left(Err(_)) => return Err(()),
393 Either::Right(Some(Shutdown { result, .. })) => {
394 log::trace!("Shutdown uninitialized worker");
395 let _ = result.send(false);
396 return Err(());
397 }
398 Either::Right(None) => return Err(()),
399 };
400 availability.set(true);
401
402 Ok((
403 svc,
404 WorkerSt {
405 id,
406 rx,
407 factory,
408 availability,
409 stop: Box::pin(stop),
410 },
411 ))
412}