pub(crate) mod take_cell;
use crate::{MetricsContext, worker::*};
use std::{
fmt::{Debug, Formatter},
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
};
use temporalio_common::{telemetry::metrics::TemporalMeter, worker::WorkerDeploymentVersion};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
/// Hands out slot permits reserved from a `SlotSupplier`, tracking how many permits are extant
/// and how many have been issued but not yet marked used so slot metrics can be reported.
#[derive(Clone)]
pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
/// Supplier that slots are reserved from, marked used on, and released back to.
supplier: Arc<dyn SlotSupplier<SlotKind = SK> + Send + Sync>,
/// Classification of `supplier`, derived once in `new` from the label it reports.
slot_supplier_kind: SlotSupplierKind,
/// Number of permits that have been issued but neither marked used nor dropped yet.
unused_claimants: Arc<AtomicUsize>,
/// Watch channel holding the count of permits currently in existence. Both halves are kept so
/// the count can be updated (sender) and read or awaited (receiver).
extant_permits: (watch::Sender<usize>, watch::Receiver<usize>),
/// When set, no permit is issued while the extant count is at or above this value.
max_permits: Option<usize>,
/// Receives the available/used slot counts each time a permit changes state.
metrics_ctx: MetricsContext,
/// Optional meter handed out through the reservation, mark-used, and release contexts.
meter: Option<TemporalMeter>,
/// Reported to the supplier via `SlotReservationContext::is_sticky`; set by `into_sticky`.
is_sticky_poller: bool,
/// Task queue, worker identity, and deployment version exposed to the supplier.
context_data: Arc<PermitDealerContextData>,
}
/// Worker details that a `MeteredPermitDealer` exposes to its slot supplier through the
/// `SlotReservationContext` implementation. `Default` is only derived for test builds.
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(Default))]
pub(crate) struct PermitDealerContextData {
/// Value returned by `SlotReservationContext::task_queue`.
pub(crate) task_queue: String,
/// Value returned by `SlotReservationContext::worker_identity`.
pub(crate) worker_identity: String,
/// Value returned by `SlotReservationContext::worker_deployment_version`.
pub(crate) worker_deployment_version: Option<WorkerDeploymentVersion>,
}
/// Classifies the slot supplier backing a permit dealer, based on the label the supplier reports.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum SlotSupplierKind {
    /// The supplier reported the exact label `Fixed`.
    Fixed,
    /// The supplier reported the exact label `ResourceBased`.
    ResourceBased,
    /// The supplier reported any other label, which is preserved verbatim.
    Custom(String),
}

impl SlotSupplierKind {
    /// Maps a supplier label onto a kind.
    ///
    /// Matching is exact and case-sensitive; anything other than the two well-known labels is
    /// wrapped in [`SlotSupplierKind::Custom`].
    fn from_label(label: &str) -> Self {
        match label {
            "Fixed" => Self::Fixed,
            "ResourceBased" => Self::ResourceBased,
            other => Self::Custom(other.to_owned()),
        }
    }
}

impl std::fmt::Display for SlotSupplierKind {
    /// Writes the label this kind was built from, so `from_label(&kind.to_string())` yields the
    /// same kind back.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label: &str = match self {
            Self::Fixed => "Fixed",
            Self::ResourceBased => "ResourceBased",
            Self::Custom(name) => name.as_str(),
        };
        f.write_str(label)
    }
}
impl<SK> MeteredPermitDealer<SK>
where
    SK: SlotKind + 'static,
{
    /// Builds a dealer on top of `supplier`.
    ///
    /// * `metrics_ctx` - receives the available/used slot counts whenever a permit changes state.
    /// * `max_permits` - optional cap on simultaneously extant permits, enforced by the dealer
    ///   before the supplier is even consulted.
    /// * `context_data` - worker details exposed to the supplier while it reserves slots.
    /// * `meter` - optional meter handed to the supplier through the permit contexts.
    ///
    /// The dealer starts with no permits issued and is not flagged as a sticky poller.
    pub(crate) fn new(
        supplier: Arc<dyn SlotSupplier<SlotKind = SK> + Send + Sync>,
        metrics_ctx: MetricsContext,
        max_permits: Option<usize>,
        context_data: Arc<PermitDealerContextData>,
        meter: Option<TemporalMeter>,
    ) -> Self {
        // Classify the supplier once up front so later lookups are a cheap field borrow.
        let supplier_kind_label = supplier.slot_supplier_kind();
        let slot_supplier_kind = SlotSupplierKind::from_label(supplier_kind_label.as_ref());
        Self {
            supplier,
            slot_supplier_kind,
            unused_claimants: Arc::new(AtomicUsize::new(0)),
            extant_permits: watch::channel(0),
            metrics_ctx,
            meter,
            max_permits,
            is_sticky_poller: false,
            context_data,
        }
    }

    /// Returns whatever the supplier reports as its number of available slots, if it reports one.
    pub(crate) fn available_permits(&self) -> Option<usize> {
        self.supplier.available_slots()
    }

    /// Returns the supplier classification captured when the dealer was created.
    pub(crate) fn slot_supplier_kind(&self) -> &SlotSupplierKind {
        &self.slot_supplier_kind
    }

    /// Test helper: the supplier's available slots plus permits that were issued but have not
    /// been marked used yet.
    #[cfg(test)]
    pub(crate) fn unused_permits(&self) -> Option<usize> {
        self.available_permits()
            .map(|ap| ap + self.unused_claimants.load(Ordering::Acquire))
    }

    /// Waits until a permit can be issued and returns it.
    ///
    /// When `max_permits` is set, first waits for the extant permit count to drop below it, then
    /// waits on the supplier's own reservation.
    ///
    /// # Panics
    /// Panics if the extant-permit watch channel is closed, which cannot happen while `self`
    /// holds the sender.
    pub(crate) async fn acquire_owned(&self) -> OwnedMeteredSemPermit<SK> {
        if let Some(max) = self.max_permits {
            self.extant_permits
                .1
                .clone()
                .wait_for(|&ep| ep < max)
                .await
                .expect("Extant permit channel is never closed");
        }
        let res = self.supplier.reserve_slot(self).await;
        self.build_owned(res)
    }

    /// Issues a permit only if one is available right now.
    ///
    /// Returns `Err(())` when the extant count has reached `max_permits` or when the supplier
    /// declines to reserve a slot.
    pub(crate) fn try_acquire_owned(&self) -> Result<OwnedMeteredSemPermit<SK>, ()> {
        if let Some(max) = self.max_permits
            && *self.extant_permits.1.borrow() >= max
        {
            return Err(());
        }
        if let Some(res) = self.supplier.try_reserve_slot(self) {
            Ok(self.build_owned(res))
        } else {
            Err(())
        }
    }

    /// Returns a receiver that observes the number of permits currently in existence.
    pub(crate) fn get_extant_count_rcv(&self) -> watch::Receiver<usize> {
        self.extant_permits.1.clone()
    }

    /// Wraps a supplier permit in an `OwnedMeteredSemPermit`, updating the bookkeeping counters
    /// and wiring up the callbacks that run when the permit is marked used or released.
    fn build_owned(&self, res: SlotSupplierPermit) -> OwnedMeteredSemPermit<SK> {
        // A freshly issued permit is both extant and not-yet-used.
        self.unused_claimants.fetch_add(1, Ordering::Release);
        self.extant_permits.0.send_modify(|ep| *ep += 1);

        // Every closure below takes ownership of its captures, so each needs its own handle.
        let uc_c = self.unused_claimants.clone();
        let ep_rx_c = self.extant_permits.1.clone();
        let ep_tx_c = self.extant_permits.0.clone();
        let supp = self.supplier.clone();
        let supp_c = self.supplier.clone();
        let supp_c_c = self.supplier.clone();
        let mets = self.metrics_ctx.clone();

        // Publishes the current available/used slot counts.
        //
        // `add_one` is set when called from the release path: the `SlotSupplierPermit` is still
        // owned by the permit's release context at that point, so the supplier's available count
        // presumably does not include it yet and one is added to compensate.
        let metric_rec = move |add_one: bool| {
            let extra = usize::from(add_one);
            let unused = uc_c.load(Ordering::Acquire);
            if let Some(avail) = supp.available_slots() {
                mets.available_task_slots(avail + unused + extra);
            }
            // No compensation here: on the release path the extant count has already been
            // decremented before this runs, so `extant - unused` is already the number of slots
            // in use. Adding `extra` would over-report by one on every release.
            // The two counters are read separately, hence the saturating subtraction.
            mets.task_slots_used(ep_rx_c.borrow().saturating_sub(unused) as u64);
        };
        let mrc = metric_rec.clone();
        // Report the state right after issuing the permit.
        mrc(false);

        OwnedMeteredSemPermit {
            unused_claimants: Some(self.unused_claimants.clone()),
            release_ctx: ReleaseCtx {
                permit: res,
                stored_info: None,
                meter: self.meter.clone(),
            },
            use_fn: Box::new(move |info| {
                supp_c.mark_slot_used(info);
                metric_rec(false)
            }),
            release_fn: Box::new(move |info| {
                supp_c_c.release_slot(info);
                ep_tx_c.send_modify(|ep| *ep -= 1);
                mrc(true)
            }),
        }
    }
}
impl MeteredPermitDealer<WorkflowSlotKind> {
    /// Consumes the dealer and returns it flagged as a sticky poller; every other field is
    /// carried over unchanged. The flag is what `SlotReservationContext::is_sticky` reports.
    pub(crate) fn into_sticky(self) -> Self {
        Self {
            is_sticky_poller: true,
            ..self
        }
    }
}
// The dealer passes itself to the supplier as the reservation context (see `reserve_slot` /
// `try_reserve_slot` calls), so these accessors are what a supplier sees while reserving a slot.
impl<SK: SlotKind> SlotReservationContext for MeteredPermitDealer<SK> {
// Task queue name taken from the shared context data.
fn task_queue(&self) -> &str {
&self.context_data.task_queue
}
// Worker identity taken from the shared context data.
fn worker_identity(&self) -> &str {
&self.context_data.worker_identity
}
// Deployment version taken from the shared context data, if one was configured.
fn worker_deployment_version(&self) -> &Option<WorkerDeploymentVersion> {
&self.context_data.worker_deployment_version
}
// Current value of the extant-permit watch channel, i.e. permits issued and not yet released.
fn num_issued_slots(&self) -> usize {
*self.extant_permits.1.borrow()
}
// True only for dealers that went through `into_sticky`.
fn is_sticky(&self) -> bool {
self.is_sticky_poller
}
// Clone of the optional meter the dealer was constructed with.
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
self.meter.clone()
}
}
/// Borrowed view handed to a permit's `use_fn` when the permit is marked used; built in
/// `OwnedMeteredSemPermit::into_used`.
struct UseCtx<'a, SK: SlotKind> {
/// Slot info supplied by the caller of `into_used`.
stored_info: &'a SK::Info,
/// The supplier permit being marked used.
permit: &'a SlotSupplierPermit,
/// Optional meter copied from the permit's release context.
meter: Option<TemporalMeter>,
}
// Plain accessors over the borrowed fields.
impl<SK: SlotKind> SlotMarkUsedContext for UseCtx<'_, SK> {
type SlotKind = SK;
fn permit(&self) -> &SlotSupplierPermit {
self.permit
}
fn info(&self) -> &<Self::SlotKind as SlotKind>::Info {
self.stored_info
}
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
self.meter.clone()
}
}
/// State owned by a permit for its whole life and handed to its `release_fn` when the permit is
/// dropped.
struct ReleaseCtx<SK: SlotKind> {
/// The supplier permit; it lives here until the owning permit is dropped.
permit: SlotSupplierPermit,
/// `None` until the permit is marked used via `into_used`, which stores the slot info here.
stored_info: Option<SK::Info>,
/// Optional meter copied from the dealer when the permit was built.
meter: Option<TemporalMeter>,
}
// Plain accessors over the owned fields.
impl<SK: SlotKind> SlotReleaseContext for ReleaseCtx<SK> {
type SlotKind = SK;
fn permit(&self) -> &SlotSupplierPermit {
&self.permit
}
// `None` when the permit is released without ever having been marked used.
fn info(&self) -> Option<&<Self::SlotKind as SlotKind>::Info> {
self.stored_info.as_ref()
}
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
self.meter.clone()
}
}
/// Wraps a `MeteredPermitDealer` so that it can be closed: once closing is requested no further
/// permits are handed out, and completion is signalled when no tracked permits remain.
pub(crate) struct ClosableMeteredPermitDealer<SK: SlotKind> {
    /// The dealer that actually issues permits.
    inner: Arc<MeteredPermitDealer<SK>>,
    /// Number of tracked permits handed out by this wrapper that have not been dropped yet.
    outstanding_permits: AtomicUsize,
    /// Set once `close` has been called.
    close_requested: AtomicBool,
    /// Cancelled once closing was requested and no tracked permits are outstanding.
    close_complete_token: CancellationToken,
}

impl<SK: SlotKind> ClosableMeteredPermitDealer<SK> {
    /// Wraps `sem` in a new, open dealer with no outstanding permits. The result is returned in
    /// an `Arc` because issued permits keep a handle back to the dealer.
    pub(crate) fn new_arc(sem: Arc<MeteredPermitDealer<SK>>) -> Arc<Self> {
        let dealer = Self {
            inner: sem,
            outstanding_permits: AtomicUsize::new(0),
            close_requested: AtomicBool::default(),
            close_complete_token: CancellationToken::new(),
        };
        Arc::new(dealer)
    }
}
impl<SK> ClosableMeteredPermitDealer<SK>
where
    SK: SlotKind + 'static,
{
    /// Test helper forwarding to the inner dealer's unused-permit count.
    #[cfg(test)]
    pub(crate) fn unused_permits(&self) -> Option<usize> {
        self.inner.unused_permits()
    }

    /// Requests that the dealer be closed: no new permits are handed out from now on.
    ///
    /// If nothing is outstanding, completion is signalled immediately; otherwise it is signalled
    /// by whichever outstanding permit is dropped last.
    pub(crate) fn close(&self) {
        // SeqCst pairs with `try_acquire_owned` / `forget_outstanding`: either this load sees a
        // concurrent acquirer's increment, or that acquirer sees the flag set here.
        self.close_requested.store(true, Ordering::SeqCst);
        if self.outstanding_permits.load(Ordering::SeqCst) == 0 {
            self.close_complete_token.cancel();
        }
    }

    /// Resolves once `close` has been requested and every tracked permit has been dropped.
    pub(crate) async fn close_complete(&self) {
        self.close_complete_token.cancelled().await;
    }

    /// Tries to obtain a tracked permit without waiting.
    ///
    /// Returns `Err(())` if the dealer has been closed or the inner dealer has no permit
    /// available right now.
    pub(crate) fn try_acquire_owned(
        self: &Arc<Self>,
    ) -> Result<TrackedOwnedMeteredSemPermit<SK>, ()> {
        // Register as outstanding *before* looking at the close flag. Checking first would let
        // `close` observe zero outstanding permits and signal completion in between the check
        // and the increment, after which a permit would still be handed out.
        self.outstanding_permits.fetch_add(1, Ordering::SeqCst);
        if self.close_requested.load(Ordering::SeqCst) {
            self.forget_outstanding();
            return Err(());
        }
        match self.inner.try_acquire_owned() {
            Ok(permit) => Ok(TrackedOwnedMeteredSemPermit {
                inner: Some(permit),
                on_drop: self.on_permit_dropped(),
            }),
            Err(()) => {
                // Undo the provisional registration. This must go through the same path as a
                // permit drop: `close` may have seen the provisional count and left signalling
                // completion to us.
                self.forget_outstanding();
                Err(())
            }
        }
    }

    /// Builds the callback a tracked permit runs when it is dropped.
    fn on_permit_dropped(self: &Arc<Self>) -> Box<dyn Fn() + Send + Sync> {
        let sem = self.clone();
        Box::new(move || sem.forget_outstanding())
    }

    /// Removes one entry from the outstanding count and, if that was the last one and closing
    /// has been requested, signals that closing is complete.
    fn forget_outstanding(&self) {
        // Using the value returned by `fetch_sub` makes exactly one caller observe the
        // transition to zero, instead of re-reading a counter that may have moved on.
        let previously = self.outstanding_permits.fetch_sub(1, Ordering::SeqCst);
        if previously == 1 && self.close_requested.load(Ordering::SeqCst) {
            self.close_complete_token.cancel();
        }
    }
}
/// A permit issued by `ClosableMeteredPermitDealer`. Dropping it runs `on_drop`, which is how
/// the closable dealer learns that one fewer permit is outstanding.
#[derive(derive_more::Debug)]
#[debug("Tracked({inner:?})")]
#[clippy::has_significant_drop]
pub(crate) struct TrackedOwnedMeteredSemPermit<SK: SlotKind> {
/// The wrapped permit; `None` only after it has been moved out by the `From` conversion.
inner: Option<OwnedMeteredSemPermit<SK>>,
/// Callback invoked exactly once, from `Drop`.
on_drop: Box<dyn Fn() + Send + Sync>,
}
// Unwraps the underlying permit. Note that `value` is dropped at the end of `from`, so `on_drop`
// still runs: the permit stops counting as outstanding for the closable dealer as soon as it is
// converted, even though the underlying permit lives on.
impl<SK: SlotKind> From<TrackedOwnedMeteredSemPermit<SK>> for OwnedMeteredSemPermit<SK> {
fn from(mut value: TrackedOwnedMeteredSemPermit<SK>) -> Self {
value
.inner
.take()
.expect("Inner permit should be available")
}
}
// Runs the tracking callback whether or not the inner permit is still present.
impl<SK: SlotKind> Drop for TrackedOwnedMeteredSemPermit<SK> {
fn drop(&mut self) {
(self.on_drop)();
}
}
/// A permit issued by a `MeteredPermitDealer`. It starts out unused, may be marked used once via
/// [`OwnedMeteredSemPermit::into_used`], and releases its slot when dropped.
#[clippy::has_significant_drop]
pub(crate) struct OwnedMeteredSemPermit<SK: SlotKind> {
    /// Handle to the dealer's unused-permit counter. `Some` while this permit still counts as
    /// unused; taken (and the counter decremented) when the permit is used or dropped.
    unused_claimants: Option<Arc<AtomicUsize>>,
    /// State handed to `release_fn` on drop; also owns the supplier permit.
    release_ctx: ReleaseCtx<SK>,
    /// Invoked when the permit is marked used.
    #[allow(clippy::type_complexity)]
    use_fn: Box<dyn Fn(&UseCtx<SK>) + Send + Sync>,
    /// Invoked when the permit is dropped.
    #[allow(clippy::type_complexity)]
    release_fn: Box<dyn Fn(&ReleaseCtx<SK>) + Send + Sync>,
}

impl<SK: SlotKind> OwnedMeteredSemPermit<SK> {
    /// Marks the permit as used for the slot described by `info`.
    ///
    /// The permit stops counting as unused, `use_fn` is invoked with a context borrowing `info`,
    /// and `info` is then stored so it is available to `release_fn` on drop.
    pub(crate) fn into_used(mut self, info: SK::Info) -> UsedMeteredSemPermit<SK> {
        // Must happen before `use_fn` runs, matching the order in which the counters are read
        // by the callbacks.
        self.settle_unused_claim();
        let use_ctx = UseCtx {
            stored_info: &info,
            permit: &self.release_ctx.permit,
            meter: self.release_ctx.meter.clone(),
        };
        (self.use_fn)(&use_ctx);
        self.release_ctx.stored_info = Some(info);
        UsedMeteredSemPermit(self)
    }

    /// Stops counting this permit as unused. Does nothing if that already happened, so the
    /// shared counter is decremented at most once per permit.
    fn settle_unused_claim(&mut self) {
        if let Some(claimants) = self.unused_claimants.take() {
            claimants.fetch_sub(1, Ordering::Release);
        }
    }
}

impl<SK: SlotKind> Debug for OwnedMeteredSemPermit<SK> {
    /// Prints a fixed placeholder; none of the permit's contents are shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("OwnedMeteredSemPermit()")
    }
}

impl<SK: SlotKind> Drop for OwnedMeteredSemPermit<SK> {
    /// Settles the unused count (if the permit was never used) and then runs `release_fn`.
    fn drop(&mut self) {
        self.settle_unused_claim();
        (self.release_fn)(&self.release_ctx);
    }
}

/// A permit that has been marked used. It only exists to keep the inner permit alive; dropping
/// it drops (and thereby releases) the wrapped permit.
#[derive(Debug)]
pub(crate) struct UsedMeteredSemPermit<SK: SlotKind>(#[allow(dead_code)] OwnedMeteredSemPermit<SK>);
/// Reports a broken invariant: always logs the formatted message via `error!`, and panics only
/// in builds where debug assertions are enabled. Takes the same arguments as `format!`.
macro_rules! dbg_panic {
($($arg:tt)*) => {{
// Format once so the log line, the optional assertion payload, and the panic all agree.
let message = format!($($arg)*);
error!("{}", message);
// Only with the `antithesis_assertions` feature: record an always-failing assertion carrying
// the message and the call site.
#[cfg(feature = "antithesis_assertions")]
crate::antithesis::assert_always!(
false,
"dbg_panic invariant triggered",
::serde_json::json!({
"message": message,
"file": file!(),
"line": line!(),
"module": module_path!(),
})
);
// Compiled out when debug assertions are disabled, leaving just the log line above.
debug_assert!(false, "{}", message);
}};
}
// Re-export so the macro can be imported by path from elsewhere in the crate.
pub(crate) use dbg_panic;
/// Guard that counts itself in a watch channel: the channel's value is incremented when the
/// guard is created and decremented when it is dropped. The optional callback is invoked with
/// the new value after each change.
pub(crate) struct ActiveCounter<F: Fn(usize)>(watch::Sender<usize>, Option<Arc<F>>);

impl<F: Fn(usize)> ActiveCounter<F> {
    /// Increments the value held by `a`, notifies `change_fn` (if any) with the new value, and
    /// returns the guard that will undo the increment on drop.
    pub(crate) fn new(a: watch::Sender<usize>, change_fn: Option<Arc<F>>) -> Self {
        a.send_modify(|count| {
            *count += 1;
            if let Some(notify) = &change_fn {
                notify(*count);
            }
        });
        Self(a, change_fn)
    }
}

impl<F: Fn(usize)> Drop for ActiveCounter<F> {
    /// Decrements the counted value and notifies the callback (if any) with the new value.
    fn drop(&mut self) {
        let Self(sender, change_fn) = self;
        sender.send_modify(|count| {
            *count -= 1;
            if let Some(notify) = change_fn.as_ref() {
                notify(*count);
            }
        });
    }
}
// Unit tests for the permit dealers. The module is `pub(crate)` so the dealer factory below can
// be reused by tests elsewhere in the crate.
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{
advance_fut,
worker::{WorkflowSlotKind, tuner::FixedSizeSlotSupplier},
};
use futures_util::FutureExt;
// Builds a dealer backed by a fixed-size supplier with `size` slots, no-op metrics, no permit
// cap, default context data, and no meter.
pub(crate) fn fixed_size_permit_dealer<SK: SlotKind + Send + Sync + 'static>(
size: usize,
) -> MeteredPermitDealer<SK> {
MeteredPermitDealer::new(
Arc::new(FixedSizeSlotSupplier::new(size)),
MetricsContext::no_op(),
None,
Arc::new(Default::default()),
None,
)
}
// Dropping a tracked permit brings the outstanding count back to zero.
#[test]
fn closable_semaphore_permit_drop_returns_permit() {
let inner = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
let sem = ClosableMeteredPermitDealer::new_arc(Arc::new(inner));
let perm = sem.try_acquire_owned().unwrap();
let permits = sem.outstanding_permits.load(Ordering::Acquire);
assert_eq!(permits, 1);
drop(perm);
let permits = sem.outstanding_permits.load(Ordering::Acquire);
assert_eq!(permits, 0);
}
// Closing while a permit is outstanding completes once that permit is dropped.
#[tokio::test]
async fn closable_semaphore_permit_drop_after_close_resolves_close_complete() {
let inner = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
let sem = ClosableMeteredPermitDealer::new_arc(Arc::new(inner));
let perm = sem.try_acquire_owned().unwrap();
sem.close();
drop(perm);
sem.close_complete().await;
}
// Closing with nothing outstanding completes immediately.
#[tokio::test]
async fn closable_semaphore_close_complete_ready_if_unused() {
let inner = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
let sem = ClosableMeteredPermitDealer::new_arc(Arc::new(inner));
sem.close();
sem.close_complete().await;
}
// After `close`, acquisition attempts are rejected even though slots are free.
#[test]
fn closable_semaphore_does_not_hand_out_permits_after_closed() {
let inner = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
let sem = ClosableMeteredPermitDealer::new_arc(Arc::new(inner));
sem.close();
sem.try_acquire_owned().unwrap_err();
}
// With two supplier slots but `max_permits` of one, a second permit is only issued after the
// first is dropped.
#[tokio::test]
async fn respects_max_extant_permits() {
let mut sem = fixed_size_permit_dealer::<WorkflowSlotKind>(2);
sem.max_permits = Some(1);
let perm = sem.try_acquire_owned().unwrap();
sem.try_acquire_owned().unwrap_err();
let acquire_fut = sem.acquire_owned();
// NOTE(review): `advance_fut!` is defined elsewhere; presumably it polls the future and
// expects it to still be pending - confirm against the macro definition.
advance_fut!(acquire_fut);
drop(perm);
acquire_fut.await;
}
// A dealer over the fixed-size supplier is classified as `SlotSupplierKind::Fixed`.
#[test]
fn captures_slot_supplier_kind() {
let dealer = fixed_size_permit_dealer::<WorkflowSlotKind>(1);
assert_eq!(*dealer.slot_supplier_kind(), SlotSupplierKind::Fixed);
}
}