kapot_executor/shutdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use tokio::sync::{broadcast, mpsc};
19
20/// Listens for the server shutdown signal(copied from mini-redis example).
21///
22/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
23/// ever sent. Once a value has been sent via the broadcast channel, the server
24/// should shutdown.
25///
26/// The `Shutdown` struct listens for the signal and tracks that the signal has
27/// been received. Callers may query for whether the shutdown signal has been
28/// received or not.
29#[derive(Debug)]
30pub struct Shutdown {
31 /// `true` if the shutdown signal has been received
32 shutdown: bool,
33
34 /// The receive half of the channel used to listen for shutdown.
35 notify: broadcast::Receiver<()>,
36}
37
38impl Shutdown {
39 /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
40 pub fn new(notify: broadcast::Receiver<()>) -> Shutdown {
41 Shutdown {
42 shutdown: false,
43 notify,
44 }
45 }
46
47 /// Returns `true` if the shutdown signal has been received.
48 pub fn is_shutdown(&self) -> bool {
49 self.shutdown
50 }
51
52 /// Receive the shutdown notice, waiting if necessary.
53 pub async fn recv(&mut self) {
54 // If the shutdown signal has already been received, then return
55 // immediately.
56 if self.shutdown {
57 return;
58 }
59
60 // Cannot receive a "lag error" as only one value is ever sent.
61 let _ = self.notify.recv().await;
62
63 // Remember that the signal has been received.
64 self.shutdown = true;
65 }
66}
67
68pub struct ShutdownNotifier {
69 /// Broadcasts a shutdown signal to all related components.
70 pub notify_shutdown: broadcast::Sender<()>,
71
72 /// Used as part of the graceful shutdown process to wait for
73 /// related components to complete processing.
74 ///
75 /// Tokio channels are closed once all `Sender` handles go out of scope.
76 /// When a channel is closed, the receiver receives `None`. This is
77 /// leveraged to detect all shutdown processing completing.
78 pub shutdown_complete_rx: mpsc::Receiver<()>,
79
80 pub shutdown_complete_tx: mpsc::Sender<()>,
81}
82
83impl ShutdownNotifier {
84 /// Create a new ShutdownNotifier instance
85 pub fn new() -> Self {
86 let (notify_shutdown, _) = broadcast::channel(1);
87 let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
88 Self {
89 notify_shutdown,
90 shutdown_complete_rx,
91 shutdown_complete_tx,
92 }
93 }
94
95 /// Subscribe for shutdown notification
96 pub fn subscribe_for_shutdown(&self) -> Shutdown {
97 Shutdown::new(self.notify_shutdown.subscribe())
98 }
99}
100
101impl Default for ShutdownNotifier {
102 fn default() -> Self {
103 ShutdownNotifier::new()
104 }
105}