Skip to main content

ntex_server/
wrk.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::task::{Context, Poll, ready};
3use std::{cmp, future::Future, future::poll_fn, hash, pin::Pin, sync::Arc};
4
5use async_channel::{Receiver, Sender, TrySendError, unbounded};
6use atomic_waker::AtomicWaker;
7use core_affinity::CoreId;
8
9use ntex_rt::{Arbiter, spawn};
10use ntex_service::{Pipeline, PipelineBinding, Service, ServiceFactory};
11use ntex_util::future::{Either, Stream, select, stream_recv};
12use ntex_util::time::{Millis, sleep, timeout_checked};
13
14use crate::ServerConfiguration;
15
/// Default service shutdown timeout.
///
/// Used by the worker loop when a stop request carries a zero timeout and
/// when the worker stops without an explicit stop request.
const STOP_TIMEOUT: Millis = Millis(3000);
17
#[derive(Debug)]
/// Shutdown request delivered to a worker over its stop channel.
struct Shutdown {
    /// Time allowed for the service shutdown; the worker loop replaces a
    /// zero value with `STOP_TIMEOUT`.
    timeout: Millis,
    /// Channel used to report the shutdown outcome back to the requester.
    result: oneshot::Sender<bool>,
}
24
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
/// Worker status
///
/// The derived ordering follows declaration order:
/// `Available < Unavailable < Failed`.
pub enum WorkerStatus {
    /// The worker's availability flag is set.
    Available,
    /// The worker's availability flag is not set. This is the default.
    #[default]
    Unavailable,
    /// The worker has been marked as failed.
    Failed,
}
33
#[derive(Debug)]
/// Server worker
///
/// Worker accepts messages via an unbounded channel and starts processing.
/// Comparison, equality and hashing of workers are based on the name only.
pub struct Worker<T> {
    /// Worker name; also used as the name of the worker's arbiter.
    name: String,
    /// Sender half of the channel that carries items to the worker.
    tx1: Sender<T>,
    /// Sender half of the channel that carries shutdown requests.
    tx2: Sender<Shutdown>,
    /// Read side of the availability state published by the worker task.
    avail: WorkerAvailability,
    /// Failure flag shared by all clones of this handle; set by
    /// `wait_for_status` once a failure has been observed.
    failed: Arc<AtomicBool>,
}
45
46impl<T> cmp::Ord for Worker<T> {
47    fn cmp(&self, other: &Self) -> cmp::Ordering {
48        self.name.cmp(&other.name)
49    }
50}
51
52impl<T> cmp::PartialOrd for Worker<T> {
53    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
54        Some(self.cmp(other))
55    }
56}
57
58impl<T> hash::Hash for Worker<T> {
59    fn hash<H: hash::Hasher>(&self, state: &mut H) {
60        self.name.hash(state);
61    }
62}
63
// Marker impl: equality is defined by `PartialEq` below, which compares names.
impl<T> Eq for Worker<T> {}
65
66impl<T> PartialEq for Worker<T> {
67    fn eq(&self, other: &Worker<T>) -> bool {
68        self.name == other.name
69    }
70}
71
#[derive(Debug)]
/// Stop worker process
///
/// Stop future resolves when worker completes processing
/// incoming items and stop arbiter.
///
/// The output is the `bool` reported by the worker; if the reporting side
/// is dropped without sending a value, the future resolves to `true`.
pub struct WorkerStop(oneshot::AsyncReceiver<bool>);
78
79impl<T> Worker<T> {
80    /// Start worker.
81    pub fn start<F>(name: String, cfg: F, cid: Option<CoreId>) -> Worker<T>
82    where
83        T: Send + 'static,
84        F: ServerConfiguration<Item = T>,
85    {
86        let (tx1, rx1) = unbounded();
87        let (tx2, rx2) = unbounded();
88        let (avail, avail_tx) = WorkerAvailability::create();
89        let name2 = name.clone();
90
91        Arbiter::with_name(name.clone()).handle().spawn(async move {
92            if let Some(cid) = cid
93                && core_affinity::set_for_current(cid)
94            {
95                log::info!("Set affinity to {cid:?} for worker {name2:?}");
96            }
97
98            spawn(async move {
99                log::info!("Starting worker {name2:?}");
100
101                log::debug!("Creating server instance in {name2:?}");
102                let factory = cfg.create().await;
103
104                match create(name2.clone(), rx1, rx2, factory, avail_tx).await {
105                    Ok((svc, wrk)) => {
106                        log::debug!("Server instance has been created in {name2:?}");
107                        run_worker(svc, wrk).await;
108                    }
109                    Err(e) => {
110                        log::error!("Cannot start worker {name2:?}: {e:?}");
111                    }
112                }
113                Arbiter::current().stop();
114            });
115        });
116
117        Worker {
118            tx1,
119            tx2,
120            name,
121            avail,
122            failed: Arc::new(AtomicBool::new(false)),
123        }
124    }
125
126    /// Worker name
127    pub fn name(&self) -> &str {
128        &self.name
129    }
130
131    /// Send message to the worker.
132    ///
133    /// Returns `Ok` if message got accepted by the worker.
134    /// Otherwise return message back as `Err`
135    pub fn send(&self, msg: T) -> Result<(), T> {
136        self.tx1.try_send(msg).map_err(TrySendError::into_inner)
137    }
138
139    /// Check worker status.
140    pub fn status(&self) -> WorkerStatus {
141        if self.failed.load(Ordering::Acquire) {
142            WorkerStatus::Failed
143        } else if self.avail.available() {
144            WorkerStatus::Available
145        } else {
146            WorkerStatus::Unavailable
147        }
148    }
149
150    /// Wait for worker status updates
151    pub async fn wait_for_status(&mut self) -> WorkerStatus {
152        if self.failed.load(Ordering::Acquire) {
153            WorkerStatus::Failed
154        } else {
155            self.avail.wait_for_update().await;
156            if self.avail.failed() {
157                self.failed.store(true, Ordering::Release);
158            }
159            self.status()
160        }
161    }
162
163    /// Stop worker.
164    ///
165    /// If timeout value is zero, force shutdown worker
166    pub fn stop(&self, timeout: Millis) -> WorkerStop {
167        let (result, rx) = oneshot::async_channel();
168        let _ = self.tx2.try_send(Shutdown { timeout, result });
169        WorkerStop(rx)
170    }
171}
172
173impl<T> Clone for Worker<T> {
174    fn clone(&self) -> Self {
175        Worker {
176            tx1: self.tx1.clone(),
177            tx2: self.tx2.clone(),
178            name: self.name.clone(),
179            avail: self.avail.clone(),
180            failed: self.failed.clone(),
181        }
182    }
183}
184
185impl Future for WorkerStop {
186    type Output = bool;
187
188    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189        match ready!(Pin::new(&mut self.0).poll(cx)) {
190            Ok(res) => Poll::Ready(res),
191            Err(_) => Poll::Ready(true),
192        }
193    }
194}
195
#[derive(Debug, Clone)]
/// Read side of the availability state shared with the worker task.
struct WorkerAvailability {
    /// State shared with the matching `WorkerAvailabilityTx`.
    inner: Arc<Inner>,
}
200
#[derive(Debug, Clone)]
/// Write side of the availability state.
///
/// Dropping a value of this type marks the shared state as failed and
/// wakes the registered waiter.
struct WorkerAvailabilityTx {
    /// State shared with the matching `WorkerAvailability`.
    inner: Arc<Inner>,
}
205
#[derive(Debug)]
/// Availability state shared between `WorkerAvailability` and
/// `WorkerAvailabilityTx`.
struct Inner {
    /// Waker registered by `wait_for_update`, woken on every change.
    waker: AtomicWaker,
    /// Set when the state changed; cleared by `wait_for_update`.
    updated: AtomicBool,
    /// Current availability flag.
    available: AtomicBool,
    /// Set when the write side has been dropped.
    failed: AtomicBool,
}
213
214impl WorkerAvailability {
215    fn create() -> (Self, WorkerAvailabilityTx) {
216        let inner = Arc::new(Inner {
217            waker: AtomicWaker::new(),
218            updated: AtomicBool::new(false),
219            available: AtomicBool::new(false),
220            failed: AtomicBool::new(false),
221        });
222
223        let avail = WorkerAvailability {
224            inner: inner.clone(),
225        };
226        let avail_tx = WorkerAvailabilityTx { inner };
227        (avail, avail_tx)
228    }
229
230    fn failed(&self) -> bool {
231        self.inner.failed.load(Ordering::Acquire)
232    }
233
234    fn available(&self) -> bool {
235        self.inner.available.load(Ordering::Acquire)
236    }
237
238    async fn wait_for_update(&self) {
239        poll_fn(|cx| {
240            self.inner.waker.register(cx.waker());
241            if self.inner.updated.swap(false, Ordering::AcqRel) {
242                Poll::Ready(())
243            } else {
244                Poll::Pending
245            }
246        })
247        .await;
248    }
249}
250
251impl WorkerAvailabilityTx {
252    fn set(&self, val: bool) {
253        let old = self.inner.available.swap(val, Ordering::Release);
254        if old != val {
255            self.inner.updated.store(true, Ordering::Release);
256            self.inner.waker.wake();
257        }
258    }
259}
260
261impl Drop for WorkerAvailabilityTx {
262    fn drop(&mut self) {
263        self.inner.failed.store(true, Ordering::Release);
264        self.inner.updated.store(true, Ordering::Release);
265        self.inner.available.store(false, Ordering::Release);
266        self.inner.waker.wake();
267    }
268}
269
/// Service worker
///
/// Worker accepts messages via an unbounded channel and starts processing.
/// This is the state owned by the worker loop (`run_worker`).
struct WorkerSt<T, F: ServiceFactory<T>> {
    /// Worker name, used in log messages.
    name: String,
    /// Receiver half of the item channel.
    rx: Receiver<T>,
    /// Stream of shutdown requests.
    stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
    /// Factory used to re-create the service after a readiness error.
    factory: F,
    /// Write side of the availability state.
    availability: WorkerAvailabilityTx,
}
280
/// Worker loop: feeds received items into the service until the worker stops.
///
/// Each iteration checks service readiness (publishing the result through
/// `wrk.availability`), takes one item from `wrk.rx` and spawns the service
/// call for it, while also watching `wrk.stop` for shutdown requests.
///
/// * On a shutdown request the service is stopped with the requested timeout
///   (`STOP_TIMEOUT` if the request carries zero) and the result is reported
///   back through the request.
/// * If the item channel fails or the stop stream ends, the service is
///   stopped with `STOP_TIMEOUT` and the loop returns.
/// * If the readiness check fails, the service is shut down in a spawned
///   task and a new one is created from `wrk.factory`, retrying after one
///   second on each failure.
async fn run_worker<T, F>(mut svc: PipelineBinding<F::Service, T>, mut wrk: WorkerSt<T, F>)
where
    T: Send + 'static,
    F: ServiceFactory<T> + 'static,
{
    loop {
        // A fresh receive future is created for every processed item.
        let mut recv = std::pin::pin!(wrk.rx.recv());
        let fut = poll_fn(|cx| {
            // Readiness is checked first; an item is only taken from the
            // channel while the service reports ready.
            match svc.poll_ready(cx) {
                Poll::Ready(Ok(())) => {
                    wrk.availability.set(true);
                }
                Poll::Ready(Err(err)) => {
                    wrk.availability.set(false);
                    return Poll::Ready(Err(err));
                }
                Poll::Pending => {
                    wrk.availability.set(false);
                    return Poll::Pending;
                }
            }

            if let Ok(item) = ready!(recv.as_mut().poll(cx)) {
                // The call runs in its own task and its result is discarded.
                let fut = svc.call(item);
                spawn(async move {
                    let _ = fut.await;
                });
                Poll::Ready(Ok::<_, F::Error>(true))
            } else {
                // Receiving failed; `false` tells the caller to stop.
                log::error!("Server is gone");
                Poll::Ready(Ok(false))
            }
        });

        // Wait for either the processing step or a message on the stop stream.
        match select(fut, stream_recv(&mut wrk.stop)).await {
            // An item was dispatched; go on with the next one.
            Either::Left(Ok(true)) => continue,
            // Readiness check failed: shut the service down in a spawned
            // task and fall through to the re-creation loop below.
            Either::Left(Err(_)) => {
                ntex_rt::spawn(async move {
                    svc.shutdown().await;
                });
            }
            // Explicit stop request.
            Either::Right(Some(Shutdown { timeout, result })) => {
                wrk.availability.set(false);

                // A zero timeout selects the default stop timeout.
                let timeout = if timeout.is_zero() { STOP_TIMEOUT } else { timeout };

                stop_svc(&wrk.name, svc, timeout, Some(result)).await;
                return;
            }
            // Item channel failed or the stop stream ended; nobody to report to.
            Either::Left(Ok(false)) | Either::Right(None) => {
                wrk.availability.set(false);
                stop_svc(&wrk.name, svc, STOP_TIMEOUT, None).await;
                return;
            }
        }

        // Re-create the service, retrying once per second on failure. Any
        // message on the stop stream (or its end) aborts the worker here
        // without sending a result.
        loop {
            match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
                Either::Left(Ok(service)) => {
                    svc = Pipeline::new(service).bind();
                    break;
                }
                Either::Left(Err(_)) => sleep(Millis::ONE_SEC).await,
                Either::Right(_) => return,
            }
        }
    }
}
350
351async fn stop_svc<T, F>(
352    name: &str,
353    svc: PipelineBinding<F, T>,
354    timeout: Millis,
355    result: Option<oneshot::Sender<bool>>,
356) where
357    T: Send + 'static,
358    F: Service<T> + 'static,
359{
360    let res = timeout_checked(timeout, svc.shutdown()).await;
361    if let Some(result) = result {
362        let _ = result.send(res.is_ok());
363    }
364
365    log::info!("Worker {name:?} has been stopped");
366}
367
368async fn create<T, F>(
369    name: String,
370    rx: Receiver<T>,
371    stop: Receiver<Shutdown>,
372    factory: Result<F, ()>,
373    availability: WorkerAvailabilityTx,
374) -> Result<(PipelineBinding<F::Service, T>, WorkerSt<T, F>), ()>
375where
376    T: Send + 'static,
377    F: ServiceFactory<T> + 'static,
378{
379    availability.set(false);
380    let factory = factory?;
381    let mut stop = Box::pin(stop);
382
383    let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
384        Either::Left(Ok(svc)) => Pipeline::new(svc).bind(),
385        Either::Right(Some(Shutdown { result, .. })) => {
386            log::trace!("Shutdown uninitialized worker");
387            let _ = result.send(false);
388            return Err(());
389        }
390        Either::Left(Err(_)) | Either::Right(None) => return Err(()),
391    };
392    availability.set(true);
393
394    Ok((
395        svc,
396        WorkerSt {
397            name,
398            rx,
399            factory,
400            availability,
401            stop: Box::pin(stop),
402        },
403    ))
404}