rocketmq_remoting/base/
request_task.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::Context;
21use std::task::Poll;
22
23use rocketmq_common::TimeUtils::get_current_millis;
24
25use crate::net::channel::Channel;
26use crate::protocol::remoting_command::RemotingCommand;
27
28pub struct RequestTask {
29    runnable: Arc<dyn Fn() + Send + Sync>,
30    create_timestamp: u64,
31    channel: Channel,
32    request: RemotingCommand,
33    stop_run: Arc<parking_lot::Mutex<bool>>,
34}
35
36impl RequestTask {
37    pub fn new(
38        runnable: Arc<dyn Fn() + Send + Sync>,
39        channel: Channel,
40        request: RemotingCommand,
41    ) -> Self {
42        Self {
43            runnable,
44            create_timestamp: get_current_millis(),
45            channel,
46            request,
47            stop_run: Arc::new(parking_lot::Mutex::new(false)),
48        }
49    }
50
51    pub fn set_stop_run(&self, stop_run: bool) {
52        let mut stop_run_lock = self.stop_run.lock();
53        *stop_run_lock = stop_run;
54    }
55
56    pub fn get_create_timestamp(&self) -> u64 {
57        self.create_timestamp
58    }
59
60    pub fn is_stop_run(&self) -> bool {
61        *self.stop_run.lock()
62    }
63
64    pub async fn return_response(&self, _code: i32, _remark: String) {
65        unimplemented!("return_response")
66    }
67}
68
69impl RequestTask {
70    pub async fn run(&self) {
71        if !self.is_stop_run() {
72            (self.runnable)();
73        }
74    }
75}
76
77impl Future for RequestTask {
78    type Output = ();
79
80    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
81        if !self.is_stop_run() {
82            (self.runnable)();
83            return Poll::Ready(());
84        }
85        Poll::Pending
86    }
87}