kapot_executor/
shutdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::sync::{broadcast, mpsc};
19
20/// Listens for the server shutdown signal(copied from mini-redis example).
21///
22/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
23/// ever sent. Once a value has been sent via the broadcast channel, the server
24/// should shutdown.
25///
26/// The `Shutdown` struct listens for the signal and tracks that the signal has
27/// been received. Callers may query for whether the shutdown signal has been
28/// received or not.
29#[derive(Debug)]
30pub struct Shutdown {
31    /// `true` if the shutdown signal has been received
32    shutdown: bool,
33
34    /// The receive half of the channel used to listen for shutdown.
35    notify: broadcast::Receiver<()>,
36}
37
38impl Shutdown {
39    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
40    pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
41        Shutdown {
42            shutdown: false,
43            notify,
44        }
45    }
46
47    /// Returns `true` if the shutdown signal has been received.
48    pub fn is_shutdown(&self) -> bool {
49        self.shutdown
50    }
51
52    /// Receive the shutdown notice, waiting if necessary.
53    pub async fn recv(&mut self) {
54        // If the shutdown signal has already been received, then return
55        // immediately.
56        if self.shutdown {
57            return;
58        }
59
60        // Cannot receive a "lag error" as only one value is ever sent.
61        let _ = self.notify.recv().await;
62
63        // Remember that the signal has been received.
64        self.shutdown = true;
65    }
66}
67
68pub struct ShutdownNotifier {
69    /// Broadcasts a shutdown signal to all related components.
70    pub notify_shutdown: broadcast::Sender<()>,
71
72    /// Used as part of the graceful shutdown process to wait for
73    /// related components to complete processing.
74    ///
75    /// Tokio channels are closed once all `Sender` handles go out of scope.
76    /// When a channel is closed, the receiver receives `None`. This is
77    /// leveraged to detect all shutdown processing completing.
78    pub shutdown_complete_rx: mpsc::Receiver<()>,
79
80    pub shutdown_complete_tx: mpsc::Sender<()>,
81}
82
83impl ShutdownNotifier {
84    /// Create a new ShutdownNotifier instance
85    pub fn new() -> Self {
86        let (notify_shutdown, _) = broadcast::channel(1);
87        let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
88        Self {
89            notify_shutdown,
90            shutdown_complete_rx,
91            shutdown_complete_tx,
92        }
93    }
94
95    /// Subscribe for shutdown notification
96    pub fn subscribe_for_shutdown(&self) -> Shutdown {
97        Shutdown::new(self.notify_shutdown.subscribe())
98    }
99}
100
101impl Default for ShutdownNotifier {
102    fn default() -> Self {
103        ShutdownNotifier::new()
104    }
105}