rocketmq_common/common/thread/
thread_service_std.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19use std::sync::Arc;
20use std::thread::JoinHandle;
21
22use parking_lot::Mutex;
23use tracing::info;
24
25use crate::common::thread::Runnable;
26use crate::common::thread::ServiceThread;
27
28pub struct ServiceThreadStd {
29    name: String,
30    runnable: Arc<Mutex<dyn Runnable>>,
31    thread: Option<JoinHandle<()>>,
32    stopped: Arc<AtomicBool>,
33    started: Arc<AtomicBool>,
34    notified: (parking_lot::Mutex<()>, parking_lot::Condvar),
35}
36
37impl ServiceThreadStd {
38    pub fn new<T: Runnable>(name: String, runnable: T) -> Self {
39        ServiceThreadStd {
40            name,
41            runnable: Arc::new(Mutex::new(runnable)),
42            thread: None,
43            stopped: Arc::new(AtomicBool::new(false)),
44            started: Arc::new(AtomicBool::new(false)),
45            notified: (Default::default(), Default::default()),
46        }
47    }
48}
49
50impl ServiceThreadStd {
51    pub fn start(&mut self) {
52        if let Ok(value) =
53            self.started
54                .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
55        {
56            if value {
57                return;
58            }
59        } else {
60            return;
61        }
62        let name = self.name.clone();
63        let runnable = self.runnable.clone();
64        let stopped = self.stopped.clone();
65        let thread = std::thread::Builder::new()
66            .name(name.clone())
67            .spawn(move || {
68                info!("Starting service thread: {}", name);
69                if stopped.load(std::sync::atomic::Ordering::Relaxed) {
70                    info!("Service thread stopped: {}", name);
71                    return;
72                }
73                runnable.lock().run();
74            })
75            .expect("Failed to start service thread");
76        self.thread = Some(thread);
77    }
78
79    pub fn shutdown(&mut self) {
80        self.shutdown_interrupt(false);
81    }
82
83    pub fn shutdown_interrupt(&mut self, interrupt: bool) {
84        if let Ok(value) =
85            self.started
86                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
87        {
88            if !value {
89                return;
90            }
91        } else {
92            return;
93        }
94        self.stopped.store(true, Ordering::Relaxed);
95        self.wakeup();
96        if let Some(thread) = self.thread.take() {
97            if interrupt {
98                drop(thread);
99            } else {
100                thread.join().expect("Failed to join service thread");
101            }
102        }
103    }
104
105    pub fn make_stop(&mut self) {
106        if !self.started.load(Ordering::Acquire) {
107            return;
108        }
109        self.stopped.store(true, Ordering::Release);
110    }
111
112    pub fn wakeup(&mut self) {
113        self.notified.1.notify_all();
114    }
115
116    pub fn wait_for_running(&mut self, interval: i64) {
117        let mut guard = self.notified.0.lock();
118        self.notified.1.wait_for(
119            &mut guard,
120            std::time::Duration::from_millis(interval as u64),
121        );
122    }
123
124    pub fn is_stopped(&self) -> bool {
125        self.stopped.load(Ordering::Acquire)
126    }
127
128    pub fn get_service_name(&self) -> String {
129        self.name.clone()
130    }
131}
132
#[cfg(test)]
mod tests {
    // NOTE(review): every test below is disabled by the surrounding block comment, so this
    // module currently compiles to nothing. The commented-out `Runnable::run` takes a
    // `&dyn ServiceThread` argument, whereas `ServiceThreadStd::start` invokes `run()` with
    // no arguments, so these tests presumably predate an API change and need updating
    // before they can be re-enabled — TODO confirm against the `Runnable` trait.
    /*use mockall::{automock, predicate::*};

    use super::*;

    struct MockTestRunnable;
    impl MockTestRunnable {
        fn new() -> MockTestRunnable {
            MockTestRunnable
        }
    }
    impl Runnable for MockTestRunnable {
        fn run(&mut self, service_thread: &dyn ServiceThread) {}
    }

    #[test]
    fn test_start_and_shutdown() {
        let mock_runnable = MockTestRunnable::new();

        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        assert!(service_thread.started.load(Ordering::SeqCst));
        assert!(!service_thread.stopped.load(Ordering::SeqCst));

        service_thread.shutdown_interrupt(false);
        assert!(!service_thread.started.load(Ordering::SeqCst));
        assert!(service_thread.stopped.load(Ordering::SeqCst));
    }

    #[test]
    fn test_make_stop() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.make_stop();
        assert!(service_thread.is_stopped());
    }

    #[test]
    fn test_wait_for_running() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.wait_for_running(100);
        assert!(service_thread.started.load(Ordering::SeqCst));
    }

    #[test]
    fn test_wakeup() {
        let mock_runnable = MockTestRunnable::new();
        let mut service_thread =
            ServiceThreadStd::new("TestServiceThread".to_string(), mock_runnable);

        service_thread.start();
        service_thread.wakeup();
        // We expect that the wakeup method is called successfully.
    }*/
}