1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::Duration;
/// Wrapper around the Tokio runtime flavours used by this crate.
///
/// Only one flavour is declared here; the methods on this type dispatch on the
/// variant to reach the wrapped `tokio::runtime::Runtime`.
pub enum RocketMQRuntime {
/// Owns a `tokio::runtime::Runtime`; `new_multi` builds it with the
/// multi-thread builder.
Multi(tokio::runtime::Runtime),
}
impl RocketMQRuntime {
pub fn new_multi(threads: usize, name: &str) -> Self {
Self::Multi(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads)
.thread_name(name)
.enable_all()
.build()
.unwrap(),
)
}
}
impl RocketMQRuntime {
    /// Returns a reference to the handle of the wrapped Tokio runtime.
    pub fn get_handle(&self) -> &tokio::runtime::Handle {
        let Self::Multi(inner) = self;
        inner.handle()
    }

    /// Returns a reference to the wrapped Tokio runtime itself.
    pub fn get_runtime(&self) -> &tokio::runtime::Runtime {
        let Self::Multi(inner) = self;
        inner
    }

    /// Consumes the wrapper and shuts the runtime down by delegating to
    /// Tokio's `Runtime::shutdown_background`.
    pub fn shutdown(self) {
        let Self::Multi(inner) = self;
        inner.shutdown_background()
    }

    /// Spawns a task on the runtime that invokes `task` repeatedly.
    ///
    /// * `task` - closure called once per cycle; it is invoked synchronously
    ///   from inside the spawned async task.
    /// * `initial_delay` - when `Some`, the spawned task sleeps for this long
    ///   before the first invocation; when `None`, the first invocation
    ///   happens as soon as the spawned task runs.
    /// * `period` - target distance between the start of one invocation and
    ///   the start of the next.
    ///
    /// Each cycle records the instant just before calling `task`, adds
    /// `period` to obtain the next target instant, and sleeps for whatever
    /// time is still left until then. If `task` ran longer than `period`, the
    /// remaining time saturates to zero.
    ///
    /// The loop never exits on its own, and the join handle returned by
    /// `spawn` is dropped, so this method gives the caller no handle to the
    /// spawned task.
    pub fn schedule_at_fixed_rate<F>(
        &self,
        task: F,
        initial_delay: Option<Duration>,
        period: Duration,
    ) where
        F: Fn() + Send + 'static,
    {
        let RocketMQRuntime::Multi(inner) = self;

        let periodic_job = async move {
            // Optional warm-up pause before the very first run.
            if let Some(warm_up) = initial_delay {
                tokio::time::sleep(warm_up).await;
            }

            loop {
                // Remember when this cycle started so the next target is
                // measured from the start, not the end, of the task.
                let cycle_start = tokio::time::Instant::now();

                task();

                // Time still left until `cycle_start + period`; zero when the
                // task already used up the whole period.
                let next_start = cycle_start + period;
                let remaining =
                    next_start.saturating_duration_since(tokio::time::Instant::now());
                tokio::time::sleep(remaining).await;
            }
        };

        inner.handle().spawn(periodic_job);
    }
}