rocketmq_common/common/thread/thread_service_std.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19use std::sync::Arc;
20use std::thread::JoinHandle;
21
22use parking_lot::Mutex;
23use tracing::info;
24
25use crate::common::thread::Runnable;
26use crate::common::thread::ServiceThread;
27
28pub struct ServiceThreadStd {
29 name: String,
30 runnable: Arc<Mutex<dyn Runnable>>,
31 thread: Option<JoinHandle<()>>,
32 stopped: Arc<AtomicBool>,
33 started: Arc<AtomicBool>,
34 notified: (parking_lot::Mutex<()>, parking_lot::Condvar),
35}
36
37impl ServiceThreadStd {
38 pub fn new<T: Runnable>(name: String, runnable: T) -> Self {
39 ServiceThreadStd {
40 name,
41 runnable: Arc::new(Mutex::new(runnable)),
42 thread: None,
43 stopped: Arc::new(AtomicBool::new(false)),
44 started: Arc::new(AtomicBool::new(false)),
45 notified: (Default::default(), Default::default()),
46 }
47 }
48}
49
50impl ServiceThreadStd {
51 pub fn start(&mut self) {
52 if let Ok(value) =
53 self.started
54 .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
55 {
56 if value {
57 return;
58 }
59 } else {
60 return;
61 }
62 let name = self.name.clone();
63 let runnable = self.runnable.clone();
64 let stopped = self.stopped.clone();
65 let thread = std::thread::Builder::new()
66 .name(name.clone())
67 .spawn(move || {
68 info!("Starting service thread: {}", name);
69 if stopped.load(std::sync::atomic::Ordering::Relaxed) {
70 info!("Service thread stopped: {}", name);
71 return;
72 }
73 runnable.lock().run();
74 })
75 .expect("Failed to start service thread");
76 self.thread = Some(thread);
77 }
78
79 pub fn shutdown(&mut self) {
80 self.shutdown_interrupt(false);
81 }
82
83 pub fn shutdown_interrupt(&mut self, interrupt: bool) {
84 if let Ok(value) =
85 self.started
86 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
87 {
88 if !value {
89 return;
90 }
91 } else {
92 return;
93 }
94 self.stopped.store(true, Ordering::Relaxed);
95 self.wakeup();
96 if let Some(thread) = self.thread.take() {
97 if interrupt {
98 drop(thread);
99 } else {
100 thread.join().expect("Failed to join service thread");
101 }
102 }
103 }
104
105 pub fn make_stop(&mut self) {
106 if !self.started.load(Ordering::Acquire) {
107 return;
108 }
109 self.stopped.store(true, Ordering::Release);
110 }
111
112 pub fn wakeup(&mut self) {
113 self.notified.1.notify_all();
114 }
115
116 pub fn wait_for_running(&mut self, interval: i64) {
117 let mut guard = self.notified.0.lock();
118 self.notified.1.wait_for(
119 &mut guard,
120 std::time::Duration::from_millis(interval as u64),
121 );
122 }
123
124 pub fn is_stopped(&self) -> bool {
125 self.stopped.load(Ordering::Acquire)
126 }
127
128 pub fn get_service_name(&self) -> String {
129 self.name.clone()
130 }
131}
132
#[cfg(test)]
mod tests {
    // NOTE(review): every test below is commented out, so this module currently
    // contains no code. The mock implements
    // `Runnable::run(&mut self, service_thread: &dyn ServiceThread)`, whereas
    // `ServiceThreadStd::start` invokes `run()` with no arguments, so the tests
    // presumably predate a change to the `Runnable` trait. Confirm against the
    // trait definition before re-enabling them.
    /*use mockall::{automock, predicate::*};

    use super::*;

    struct MockTestRunnable;
    impl MockTestRunnable {
        fn new() -> MockTestRunnable {
            MockTestRunnable
        }
    }
    impl Runnable for MockTestRunnable {
        fn run(&mut self, service_thread: &dyn ServiceThread) {}
    }

    #[test]
    fn test_start_and_shutdown() {
        let mock_runnable = MockTestRunnable::new();

        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        assert!(service_thread.started.load(Ordering::SeqCst));
        assert!(!service_thread.stopped.load(Ordering::SeqCst));

        service_thread.shutdown_interrupt(false);
        assert!(!service_thread.started.load(Ordering::SeqCst));
        assert!(service_thread.stopped.load(Ordering::SeqCst));
    }

    #[test]
    fn test_make_stop() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.make_stop();
        assert!(service_thread.is_stopped());
    }

    #[test]
    fn test_wait_for_running() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.wait_for_running(100);
        assert!(service_thread.started.load(Ordering::SeqCst));
    }

    #[test]
    fn test_wakeup() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.wakeup();
        // We expect that the wakeup method is called successfully.
    }*/
}