use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use rocketmq_common::TimeUtils::current_millis;
use crate::net::channel::Channel;
use crate::protocol::remoting_command::RemotingCommand;
use tracing::warn;
/// A unit of request-processing work bound to the channel and request it serves.
///
/// The task wraps a callback together with the inbound command and the channel it
/// arrived on, plus a stop flag that lets the owner suppress the callback.
pub struct RequestTask {
/// Callback holding the actual work; invoked only while the stop flag is `false`.
runnable: Arc<dyn Fn() + Send + Sync>,
/// Value of `current_millis()` captured when the task was constructed.
create_timestamp: u64,
/// Channel used by `return_response` to send a reply.
channel: Channel,
/// The inbound command; its opaque value is copied onto replies.
request: RemotingCommand,
/// Stop flag: when `true`, the callback is not executed.
stop_run: Arc<parking_lot::Mutex<bool>>,
}
impl RequestTask {
    /// Creates a task for `request` received on `channel`.
    ///
    /// The creation timestamp is taken from `current_millis()` at call time and the
    /// stop flag starts out as `false`.
    pub fn new(runnable: Arc<dyn Fn() + Send + Sync>, channel: Channel, request: RemotingCommand) -> Self {
        let create_timestamp = current_millis();
        let stop_run = Arc::new(parking_lot::Mutex::new(false));
        Self {
            runnable,
            create_timestamp,
            channel,
            request,
            stop_run,
        }
    }

    /// Overwrites the stop flag with `stop_run`.
    pub fn set_stop_run(&self, stop_run: bool) {
        *self.stop_run.lock() = stop_run;
    }

    /// Returns the timestamp recorded when the task was created.
    pub fn get_create_timestamp(&self) -> u64 {
        self.create_timestamp
    }

    /// Returns the current value of the stop flag.
    pub fn is_stop_run(&self) -> bool {
        let flag = self.stop_run.lock();
        *flag
    }

    /// Sends a response carrying `code` and `remark` back over this task's channel.
    ///
    /// The response reuses the opaque value of the stored request. A failed send is
    /// logged as a warning and is not propagated to the caller.
    pub async fn return_response(&self, code: i32, remark: String) {
        let reply = RemotingCommand::create_response_command_with_code_remark(code, remark);
        let reply = reply.set_opaque(self.request.opaque());

        // Work on a clone so that a `&self` receiver is enough to obtain a mutable connection.
        let mut channel = self.channel.clone();
        let outcome = channel.connection_mut().send_command(reply).await;
        if let Err(err) = outcome {
            warn!("return response to {} failed: {}", channel.remote_address(), err);
        }
    }
}
impl RequestTask {
    /// Executes the wrapped callback unless the stop flag is set.
    ///
    /// When the task has been stopped this returns immediately without invoking
    /// the callback.
    pub async fn run(&self) {
        if self.is_stop_run() {
            return;
        }
        (self.runnable)();
    }
}
/// Awaiting a `RequestTask` behaves like `RequestTask::run`: the callback is invoked
/// unless the stop flag is set, and the future completes on its first poll either way.
impl Future for RequestTask {
    type Output = ();

    /// Runs the callback if the task has not been stopped, then completes.
    ///
    /// A stopped task resolves immediately without invoking the callback. Returning
    /// `Poll::Pending` here would be incorrect: no waker is ever registered, so the
    /// executor would never poll the task again and the awaiting caller would hang.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.is_stop_run() {
            (self.runnable)();
        }
        Poll::Ready(())
    }
}