ntex_server/
wrk.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{Context, Poll, ready};
3use std::{cmp, future::Future, future::poll_fn, hash, pin::Pin, sync::Arc};
4
5use async_channel::{Receiver, Sender, unbounded};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{Arbiter, spawn};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{Either, Stream, select, stream_recv};
12use ntex_util::time::{Millis, sleep, timeout_checked};
13
14use crate::{ServerConfiguration, WorkerId};
15
16const STOP_TIMEOUT: Millis = Millis(3000);
17
18#[derive(Debug)]
19/// Shutdown worker
20struct Shutdown {
21    timeout: Millis,
22    result: oneshot::Sender<bool>,
23}
24
/// Worker status
///
/// Reported by `Worker::status` and `Worker::wait_for_status`.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum WorkerStatus {
    /// The worker's service last reported readiness.
    Available,
    /// The worker's service is not (yet) ready; this is the initial status.
    #[default]
    Unavailable,
    /// The worker-side availability handle has been dropped, i.e. the worker
    /// task is no longer running.
    Failed,
}
33
34#[derive(Debug)]
35/// Server worker
36///
37/// Worker accepts message via unbounded channel and starts processing.
38pub struct Worker<T> {
39    id: WorkerId,
40    tx1: Sender<T>,
41    tx2: Sender<Shutdown>,
42    avail: WorkerAvailability,
43    failed: Arc<AtomicBool>,
44}
45
46impl<T> cmp::Ord for Worker<T> {
47    fn cmp(&self, other: &Self) -> cmp::Ordering {
48        self.id.cmp(&other.id)
49    }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54        Some(self.cmp(other))
55    }
56}
57
58impl<T> hash::Hash for Worker<T> {
59    fn hash<H: hash::Hasher>(&self, state: &mut H) {
60        self.id.hash(state);
61    }
62}
63
64impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67    fn eq(&self, other: &Worker<T>) -> bool {
68        self.id == other.id
69    }
70}
71
72#[derive(Debug)]
73/// Stop worker process
74///
75/// Stop future resolves when worker completes processing
76/// incoming items and stop arbiter
77pub struct WorkerStop(oneshot::Receiver<bool>);
78
79impl<T> Worker<T> {
80    /// Start worker.
81    pub fn start<F>(id: WorkerId, cfg: F, cid: Option<CoreId>) -> Worker<T>
82    where
83        T: Send + 'static,
84        F: ServerConfiguration<Item = T>,
85    {
86        let (tx1, rx1) = unbounded();
87        let (tx2, rx2) = unbounded();
88        let (avail, avail_tx) = WorkerAvailability::create();
89
90        Arbiter::default().exec_fn(move || {
91            if let Some(cid) = cid {
92                if core_affinity::set_for_current(cid) {
93                    log::info!("Set affinity to {cid:?} for worker {id:?}");
94                }
95            }
96
97            let _ = spawn(async move {
98                log::info!("Starting worker {id:?}");
99
100                log::debug!("Creating server instance in {id:?}");
101                let factory = cfg.create().await;
102
103                match create(id, rx1, rx2, factory, avail_tx).await {
104                    Ok((svc, wrk)) => {
105                        log::debug!("Server instance has been created in {id:?}");
106                        run_worker(svc, wrk).await;
107                    }
108                    Err(e) => {
109                        log::error!("Cannot start worker: {e:?}");
110                    }
111                }
112                Arbiter::current().stop();
113            });
114        });
115
116        Worker {
117            id,
118            tx1,
119            tx2,
120            avail,
121            failed: Arc::new(AtomicBool::new(false)),
122        }
123    }
124
125    /// Worker id.
126    pub fn id(&self) -> WorkerId {
127        self.id
128    }
129
130    /// Send message to the worker.
131    ///
132    /// Returns `Ok` if message got accepted by the worker.
133    /// Otherwise return message back as `Err`
134    pub fn send(&self, msg: T) -> Result<(), T> {
135        self.tx1.try_send(msg).map_err(|msg| msg.into_inner())
136    }
137
138    /// Check worker status.
139    pub fn status(&self) -> WorkerStatus {
140        if self.failed.load(Ordering::Acquire) {
141            WorkerStatus::Failed
142        } else if self.avail.available() {
143            WorkerStatus::Available
144        } else {
145            WorkerStatus::Unavailable
146        }
147    }
148
149    /// Wait for worker status updates
150    pub async fn wait_for_status(&mut self) -> WorkerStatus {
151        if self.failed.load(Ordering::Acquire) {
152            WorkerStatus::Failed
153        } else {
154            self.avail.wait_for_update().await;
155            if self.avail.failed() {
156                self.failed.store(true, Ordering::Release);
157            }
158            self.status()
159        }
160    }
161
162    /// Stop worker.
163    ///
164    /// If timeout value is zero, force shutdown worker
165    pub fn stop(&self, timeout: Millis) -> WorkerStop {
166        let (result, rx) = oneshot::channel();
167        let _ = self.tx2.try_send(Shutdown { timeout, result });
168        WorkerStop(rx)
169    }
170}
171
172impl<T> Clone for Worker<T> {
173    fn clone(&self) -> Self {
174        Worker {
175            id: self.id,
176            tx1: self.tx1.clone(),
177            tx2: self.tx2.clone(),
178            avail: self.avail.clone(),
179            failed: self.failed.clone(),
180        }
181    }
182}
183
184impl Future for WorkerStop {
185    type Output = bool;
186
187    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        match ready!(Pin::new(&mut self.0).poll(cx)) {
189            Ok(res) => Poll::Ready(res),
190            Err(_) => Poll::Ready(true),
191        }
192    }
193}
194
195#[derive(Debug, Clone)]
196struct WorkerAvailability {
197    inner: Arc<Inner>,
198}
199
200#[derive(Debug, Clone)]
201struct WorkerAvailabilityTx {
202    inner: Arc<Inner>,
203}
204
205#[derive(Debug)]
206struct Inner {
207    waker: AtomicWaker,
208    updated: AtomicBool,
209    available: AtomicBool,
210    failed: AtomicBool,
211}
212
213impl WorkerAvailability {
214    fn create() -> (Self, WorkerAvailabilityTx) {
215        let inner = Arc::new(Inner {
216            waker: AtomicWaker::new(),
217            updated: AtomicBool::new(false),
218            available: AtomicBool::new(false),
219            failed: AtomicBool::new(false),
220        });
221
222        let avail = WorkerAvailability {
223            inner: inner.clone(),
224        };
225        let avail_tx = WorkerAvailabilityTx { inner };
226        (avail, avail_tx)
227    }
228
229    fn failed(&self) -> bool {
230        self.inner.failed.load(Ordering::Acquire)
231    }
232
233    fn available(&self) -> bool {
234        self.inner.available.load(Ordering::Acquire)
235    }
236
237    async fn wait_for_update(&self) {
238        poll_fn(|cx| {
239            if self.inner.updated.load(Ordering::Acquire) {
240                self.inner.updated.store(false, Ordering::Release);
241                Poll::Ready(())
242            } else {
243                self.inner.waker.register(cx.waker());
244                Poll::Pending
245            }
246        })
247        .await;
248    }
249}
250
251impl WorkerAvailabilityTx {
252    fn set(&self, val: bool) {
253        let old = self.inner.available.swap(val, Ordering::Release);
254        if old != val {
255            self.inner.updated.store(true, Ordering::Release);
256            self.inner.waker.wake();
257        }
258    }
259}
260
261impl Drop for WorkerAvailabilityTx {
262    fn drop(&mut self) {
263        self.inner.failed.store(true, Ordering::Release);
264        self.inner.updated.store(true, Ordering::Release);
265        self.inner.available.store(false, Ordering::Release);
266        self.inner.waker.wake();
267    }
268}
269
270/// Service worker
271///
272/// Worker accepts message via unbounded channel and starts processing.
273struct WorkerSt<T, F: ServiceFactory<T>> {
274    id: WorkerId,
275    rx: Receiver<T>,
276    stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
277    factory: F,
278    availability: WorkerAvailabilityTx,
279}
280
281async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
282where
283    T: Send + 'static,
284    F: ServiceFactory<T> + 'static,
285{
286    loop {
287        let mut recv = std::pin::pin!(wrk.rx.recv());
288        let fut = poll_fn(|cx| {
289            match svc.poll_ready(cx) {
290                Poll::Ready(Ok(())) => {
291                    wrk.availability.set(true);
292                }
293                Poll::Ready(Err(err)) => {
294                    wrk.availability.set(false);
295                    return Poll::Ready(Err(err));
296                }
297                Poll::Pending => {
298                    wrk.availability.set(false);
299                    return Poll::Pending;
300                }
301            }
302
303            match ready!(recv.as_mut().poll(cx)) {
304                Ok(item) => {
305                    let fut = svc.call(item);
306                    let _ = spawn(async move {
307                        let _ = fut.await;
308                    });
309                    Poll::Ready(Ok::<_, F::Error>(true))
310                }
311                Err(_) => {
312                    log::error!("Server is gone");
313                    Poll::Ready(Ok(false))
314                }
315            }
316        });
317
318        match select(fut, stream_recv(&mut wrk.stop)).await {
319            Either::Left(Ok(true)) => continue,
320            Either::Left(Err(_)) => {
321                let _ = ntex_rt::spawn(async move {
322                    svc.shutdown().await;
323                });
324            }
325            Either::Right(Some(Shutdown { timeout, result })) => {
326                wrk.availability.set(false);
327
328                let timeout = if timeout.is_zero() { STOP_TIMEOUT } else { timeout };
329
330                stop_svc(wrk.id, svc, timeout, Some(result)).await;
331                return;
332            }
333            Either::Left(Ok(false)) | Either::Right(None) => {
334                wrk.availability.set(false);
335                stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
336                return;
337            }
338        }
339
340        // re-create service
341        loop {
342            match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
343                Either::Left(Ok(service)) => {
344                    svc = Pipeline::new(service).bind();
345                    break;
346                }
347                Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
348                Either::Right(_) => return,
349            }
350        }
351    }
352}
353
354async fn stop_svc<T, F>(
355    id: WorkerId,
356    svc: PipelineBinding<F, T>,
357    timeout: Millis,
358    result: Option<oneshot::Sender<bool>>,
359) where
360    T: Send + 'static,
361    F: Service<T> + 'static,
362{
363    let res = timeout_checked(timeout, svc.shutdown()).await;
364    if let Some(result) = result {
365        let _ = result.send(res.is_ok());
366    }
367
368    log::info!("Worker {id:?} has been stopped");
369}
370
371async fn create<T, F>(
372    id: WorkerId,
373    rx: Receiver<T>,
374    stop: Receiver<Shutdown>,
375    factory: Result<F, ()>,
376    availability: WorkerAvailabilityTx,
377) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
378where
379    T: Send + 'static,
380    F: ServiceFactory<T> + 'static,
381{
382    availability.set(false);
383    let factory = factory?;
384    let mut stop = Box::pin(stop);
385
386    let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
387        Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
388        Either::Left(Err(_)) => return Err(()),
389        Either::Right(Some(Shutdown { result, .. })) => {
390            log::trace!("Shutdown uninitialized worker");
391            let _ = result.send(false);
392            return Err(());
393        }
394        Either::Right(None) => return Err(()),
395    };
396    availability.set(true);
397
398    Ok((
399        svc,
400        WorkerSt {
401            id,
402            rx,
403            factory,
404            availability,
405            stop: Box::pin(stop),
406        },
407    ))
408}