ntex_server/
wrk.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{ready, Context, Poll};
3use std::{cmp, future::poll_fn, future::Future, hash, pin::Pin, sync::Arc};
4
5use async_channel::{unbounded, Receiver, Sender};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{spawn, Arbiter};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{select, stream_recv, Either, Stream};
12use ntex_util::time::{sleep, timeout_checked, Millis};
13
14use crate::{ServerConfiguration, WorkerId};
15
16const STOP_TIMEOUT: Millis = Millis(3000);
17
18#[derive(Debug)]
19/// Shutdown worker
20struct Shutdown {
21    timeout: Millis,
22    result: oneshot::Sender<bool>,
23}
24
/// Worker status
///
/// Variants are ordered `Available < Unavailable < Failed`; the default
/// status is `Unavailable`.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum WorkerStatus {
    /// The worker reported itself as available.
    Available,
    /// The worker has not reported itself as available (default).
    #[default]
    Unavailable,
    /// The worker has been marked as failed.
    Failed,
}
33
34#[derive(Debug)]
35/// Server worker
36///
37/// Worker accepts message via unbounded channel and starts processing.
38pub struct Worker<T> {
39    id: WorkerId,
40    tx1: Sender<T>,
41    tx2: Sender<Shutdown>,
42    avail: WorkerAvailability,
43    failed: Arc<AtomicBool>,
44}
45
46impl<T> cmp::Ord for Worker<T> {
47    fn cmp(&self, other: &Self) -> cmp::Ordering {
48        self.id.cmp(&other.id)
49    }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54        Some(self.id.cmp(&other.id))
55    }
56}
57
58impl<T> hash::Hash for Worker<T> {
59    fn hash<H: hash::Hasher>(&self, state: &mut H) {
60        self.id.hash(state);
61    }
62}
63
64impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67    fn eq(&self, other: &Worker<T>) -> bool {
68        self.id == other.id
69    }
70}
71
72#[derive(Debug)]
73/// Stop worker process
74///
75/// Stop future resolves when worker completes processing
76/// incoming items and stop arbiter
77pub struct WorkerStop(oneshot::Receiver<bool>);
78
79impl<T> Worker<T> {
80    /// Start worker.
81    pub fn start<F>(id: WorkerId, cfg: F, cid: Option<CoreId>) -> Worker<T>
82    where
83        T: Send + 'static,
84        F: ServerConfiguration<Item = T>,
85    {
86        let (tx1, rx1) = unbounded();
87        let (tx2, rx2) = unbounded();
88        let (avail, avail_tx) = WorkerAvailability::create();
89
90        Arbiter::default().exec_fn(move || {
91            if let Some(cid) = cid {
92                if core_affinity::set_for_current(cid) {
93                    log::info!("Set affinity to {:?} for worker {:?}", cid, id);
94                }
95            }
96
97            let _ = spawn(async move {
98                log::info!("Starting worker {:?}", id);
99
100                log::debug!("Creating server instance in {:?}", id);
101                let factory = cfg.create().await;
102
103                match create(id, rx1, rx2, factory, avail_tx).await {
104                    Ok((svc, wrk)) => {
105                        log::debug!("Server instance has been created in {:?}", id);
106                        run_worker(svc, wrk).await;
107                    }
108                    Err(e) => {
109                        log::error!("Cannot start worker: {:?}", e);
110                    }
111                }
112                Arbiter::current().stop();
113            });
114        });
115
116        Worker {
117            id,
118            tx1,
119            tx2,
120            avail,
121            failed: Arc::new(AtomicBool::new(false)),
122        }
123    }
124
125    /// Worker id.
126    pub fn id(&self) -> WorkerId {
127        self.id
128    }
129
130    /// Send message to the worker.
131    ///
132    /// Returns `Ok` if message got accepted by the worker.
133    /// Otherwise return message back as `Err`
134    pub fn send(&self, msg: T) -> Result<(), T> {
135        self.tx1.try_send(msg).map_err(|msg| msg.into_inner())
136    }
137
138    /// Check worker status.
139    pub fn status(&self) -> WorkerStatus {
140        if self.failed.load(Ordering::Acquire) {
141            WorkerStatus::Failed
142        } else if self.avail.available() {
143            WorkerStatus::Available
144        } else {
145            WorkerStatus::Unavailable
146        }
147    }
148
149    /// Wait for worker status updates
150    pub async fn wait_for_status(&mut self) -> WorkerStatus {
151        if self.failed.load(Ordering::Acquire) {
152            WorkerStatus::Failed
153        } else {
154            self.avail.wait_for_update().await;
155            if self.avail.failed() {
156                self.failed.store(true, Ordering::Release);
157            }
158            self.status()
159        }
160    }
161
162    /// Stop worker.
163    ///
164    /// If timeout value is zero, force shutdown worker
165    pub fn stop(&self, timeout: Millis) -> WorkerStop {
166        let (result, rx) = oneshot::channel();
167        let _ = self.tx2.try_send(Shutdown { timeout, result });
168        WorkerStop(rx)
169    }
170}
171
172impl<T> Clone for Worker<T> {
173    fn clone(&self) -> Self {
174        Worker {
175            id: self.id,
176            tx1: self.tx1.clone(),
177            tx2: self.tx2.clone(),
178            avail: self.avail.clone(),
179            failed: self.failed.clone(),
180        }
181    }
182}
183
184impl Future for WorkerStop {
185    type Output = bool;
186
187    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        match ready!(Pin::new(&mut self.0).poll(cx)) {
189            Ok(res) => Poll::Ready(res),
190            Err(_) => Poll::Ready(true),
191        }
192    }
193}
194
195#[derive(Debug, Clone)]
196struct WorkerAvailability {
197    inner: Arc<Inner>,
198}
199
200#[derive(Debug, Clone)]
201struct WorkerAvailabilityTx {
202    inner: Arc<Inner>,
203}
204
205#[derive(Debug)]
206struct Inner {
207    waker: AtomicWaker,
208    updated: AtomicBool,
209    available: AtomicBool,
210    failed: AtomicBool,
211}
212
213impl WorkerAvailability {
214    fn create() -> (Self, WorkerAvailabilityTx) {
215        let inner = Arc::new(Inner {
216            waker: AtomicWaker::new(),
217            updated: AtomicBool::new(false),
218            available: AtomicBool::new(false),
219            failed: AtomicBool::new(false),
220        });
221
222        let avail = WorkerAvailability {
223            inner: inner.clone(),
224        };
225        let avail_tx = WorkerAvailabilityTx { inner };
226        (avail, avail_tx)
227    }
228
229    fn failed(&self) -> bool {
230        self.inner.failed.load(Ordering::Acquire)
231    }
232
233    fn available(&self) -> bool {
234        self.inner.available.load(Ordering::Acquire)
235    }
236
237    async fn wait_for_update(&self) {
238        poll_fn(|cx| {
239            if self.inner.updated.load(Ordering::Acquire) {
240                self.inner.updated.store(false, Ordering::Release);
241                Poll::Ready(())
242            } else {
243                self.inner.waker.register(cx.waker());
244                Poll::Pending
245            }
246        })
247        .await;
248    }
249}
250
251impl WorkerAvailabilityTx {
252    fn set(&self, val: bool) {
253        let old = self.inner.available.swap(val, Ordering::Release);
254        if old != val {
255            self.inner.updated.store(true, Ordering::Release);
256            self.inner.waker.wake();
257        }
258    }
259}
260
261impl Drop for WorkerAvailabilityTx {
262    fn drop(&mut self) {
263        self.inner.failed.store(true, Ordering::Release);
264        self.inner.updated.store(true, Ordering::Release);
265        self.inner.available.store(false, Ordering::Release);
266        self.inner.waker.wake();
267    }
268}
269
270/// Service worker
271///
272/// Worker accepts message via unbounded channel and starts processing.
273struct WorkerSt<T, F: ServiceFactory<T>> {
274    id: WorkerId,
275    rx: Receiver<T>,
276    stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
277    factory: F,
278    availability: WorkerAvailabilityTx,
279}
280
281async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
282where
283    T: Send + 'static,
284    F: ServiceFactory<T> + 'static,
285{
286    loop {
287        let mut recv = std::pin::pin!(wrk.rx.recv());
288        let fut = poll_fn(|cx| {
289            match svc.poll_ready(cx) {
290                Poll::Ready(Ok(())) => {
291                    wrk.availability.set(true);
292                }
293                Poll::Ready(Err(err)) => {
294                    wrk.availability.set(false);
295                    return Poll::Ready(Err(err));
296                }
297                Poll::Pending => {
298                    wrk.availability.set(false);
299                    return Poll::Pending;
300                }
301            }
302
303            match ready!(recv.as_mut().poll(cx)) {
304                Ok(item) => {
305                    let fut = svc.call(item);
306                    let _ = spawn(async move {
307                        let _ = fut.await;
308                    });
309                    Poll::Ready(Ok::<_, F::Error>(true))
310                }
311                Err(_) => {
312                    log::error!("Server is gone");
313                    Poll::Ready(Ok(false))
314                }
315            }
316        });
317
318        match select(fut, stream_recv(&mut wrk.stop)).await {
319            Either::Left(Ok(true)) => continue,
320            Either::Left(Err(_)) => {
321                let _ = ntex_rt::spawn(async move {
322                    svc.shutdown().await;
323                });
324            }
325            Either::Right(Some(Shutdown { timeout, result })) => {
326                wrk.availability.set(false);
327
328                let timeout = if timeout.is_zero() {
329                    STOP_TIMEOUT
330                } else {
331                    timeout
332                };
333
334                stop_svc(wrk.id, svc, timeout, Some(result)).await;
335                return;
336            }
337            Either::Left(Ok(false)) | Either::Right(None) => {
338                wrk.availability.set(false);
339                stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
340                return;
341            }
342        }
343
344        // re-create service
345        loop {
346            match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
347                Either::Left(Ok(service)) => {
348                    svc = Pipeline::new(service).bind();
349                    break;
350                }
351                Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
352                Either::Right(_) => return,
353            }
354        }
355    }
356}
357
358async fn stop_svc<T, F>(
359    id: WorkerId,
360    svc: PipelineBinding<F, T>,
361    timeout: Millis,
362    result: Option<oneshot::Sender<bool>>,
363) where
364    T: Send + 'static,
365    F: Service<T> + 'static,
366{
367    let res = timeout_checked(timeout, svc.shutdown()).await;
368    if let Some(result) = result {
369        let _ = result.send(res.is_ok());
370    }
371
372    log::info!("Worker {:?} has been stopped", id);
373}
374
375async fn create<T, F>(
376    id: WorkerId,
377    rx: Receiver<T>,
378    stop: Receiver<Shutdown>,
379    factory: Result<F, ()>,
380    availability: WorkerAvailabilityTx,
381) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
382where
383    T: Send + 'static,
384    F: ServiceFactory<T> + 'static,
385{
386    availability.set(false);
387    let factory = factory?;
388    let mut stop = Box::pin(stop);
389
390    let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
391        Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
392        Either::Left(Err(_)) => return Err(()),
393        Either::Right(Some(Shutdown { result, .. })) => {
394            log::trace!("Shutdown uninitialized worker");
395            let _ = result.send(false);
396            return Err(());
397        }
398        Either::Right(None) => return Err(()),
399    };
400    availability.set(true);
401
402    Ok((
403        svc,
404        WorkerSt {
405            id,
406            rx,
407            factory,
408            availability,
409            stop: Box::pin(stop),
410        },
411    ))
412}