rocketmq_remoting/base/
request_task.rs1use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::Context;
21use std::task::Poll;
22
23use rocketmq_common::TimeUtils::get_current_millis;
24
25use crate::net::channel::Channel;
26use crate::protocol::remoting_command::RemotingCommand;
27
28pub struct RequestTask {
29 runnable: Arc<dyn Fn() + Send + Sync>,
30 create_timestamp: u64,
31 channel: Channel,
32 request: RemotingCommand,
33 stop_run: Arc<parking_lot::Mutex<bool>>,
34}
35
36impl RequestTask {
37 pub fn new(
38 runnable: Arc<dyn Fn() + Send + Sync>,
39 channel: Channel,
40 request: RemotingCommand,
41 ) -> Self {
42 Self {
43 runnable,
44 create_timestamp: get_current_millis(),
45 channel,
46 request,
47 stop_run: Arc::new(parking_lot::Mutex::new(false)),
48 }
49 }
50
51 pub fn set_stop_run(&self, stop_run: bool) {
52 let mut stop_run_lock = self.stop_run.lock();
53 *stop_run_lock = stop_run;
54 }
55
56 pub fn get_create_timestamp(&self) -> u64 {
57 self.create_timestamp
58 }
59
60 pub fn is_stop_run(&self) -> bool {
61 *self.stop_run.lock()
62 }
63
64 pub async fn return_response(&self, _code: i32, _remark: String) {
65 unimplemented!("return_response")
66 }
67}
68
69impl RequestTask {
70 pub async fn run(&self) {
71 if !self.is_stop_run() {
72 (self.runnable)();
73 }
74 }
75}
76
77impl Future for RequestTask {
78 type Output = ();
79
80 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
81 if !self.is_stop_run() {
82 (self.runnable)();
83 return Poll::Ready(());
84 }
85 Poll::Pending
86 }
87}