/// Generates watcher metadata and a `KeyWatcher` implementation for a server type.
///
/// # Syntax
///
/// ```ignore
/// watchers!(
///     (Server: MyServer, Storage: MyStorage),
///     SOME_PK => handler_a,
///     OTHER_PK => handler_b("prefix/*", "other/**"),
/// );
/// ```
///
/// Each entry maps a partition-key path to a handler function. An entry may carry an
/// optional, non-empty list of glob literals; entries with and without globs can be freely
/// mixed in one invocation.
///
/// # Generated items
///
/// For `Server: Foo` the macro emits:
///
/// * `struct FooHandlerMeta` — `task_id`, `pk` and an optional `globset::GlobSet`;
/// * `struct FooMeta` — one `FooHandlerMeta` field per handler (snake-cased handler name);
/// * `static FOO_META` — a `std::sync::LazyLock<FooMeta>` built on first access;
/// * `impl $crate::scheduler::key_watcher::KeyWatcher<Storage, Foo> for Foo` whose
///   `dispatch_watcher` calls `spawn` once for every handler whose `pk` equals the incoming
///   one and that has at least one (glob-matching, if globs were given) updated or deleted
///   sort key.
///
/// # Requirements
///
/// The expansion refers to `paste::paste!` and `globset::…` by plain path, so both crates
/// must be resolvable at the call site.
///
/// # Glob errors
///
/// An invalid glob pattern is reported on stderr and skipped. If the whole set fails to
/// build, the failure is reported on stderr and the affected handler is never dispatched;
/// other handlers are unaffected.
#[macro_export]
macro_rules! watchers {
    // Public entry point. The glob list is optional per entry, so glob-filtered and
    // unfiltered handlers can appear in the same invocation.
    (
        (
            Server: $server:ident,
            Storage: $storage:ty $(,)?
        ),
        $(
            $pk:path => $handler_fn:ident $(($($glob:literal),+ $(,)?))?
        ),* $(,)?
    ) => {
        $crate::watchers!(
            @expand_entries $server, [$storage] []
            $($pk => $handler_fn $(($($glob),+))?),*
        );
    };

    // ---- @expand_entries -------------------------------------------------------------
    // Walks the entry list one entry at a time, rewriting each entry into the bracketed
    // form `[pk] => handler [handler_path] [globs]?,` accumulated in the second `[...]`.

    // No entries left: hand the accumulated list over to `@impl`.
    (@expand_entries $struct_name:ident, [$storage_ty:ty] [$($expanded:tt)*]) => {
        $crate::watchers!(@impl $struct_name, [$storage_ty]
            $($expanded)*
        );
    };
    // Next entry carries a glob list.
    (
        @expand_entries $struct_name:ident, [$storage_ty:ty] [$($expanded:tt)*]
        $pk:path => $handler:ident ($($glob:literal),+) $(, $($rest:tt)*)?
    ) => {
        $crate::watchers!(@expand_entries $struct_name, [$storage_ty] [
            $($expanded)*
            [$pk] => $handler [$handler] [$($glob),+],
        ] $($($rest)*)?);
    };
    // Next entry has no glob list.
    (
        @expand_entries $struct_name:ident, [$storage_ty:ty] [$($expanded:tt)*]
        $pk:path => $handler:ident $(, $($rest:tt)*)?
    ) => {
        $crate::watchers!(@expand_entries $struct_name, [$storage_ty] [
            $($expanded)*
            [$pk] => $handler [$handler],
        ] $($($rest)*)?);
    };

    // ---- @impl -----------------------------------------------------------------------
    // Starts the grouping pass with an empty accumulator.
    (@impl $struct_name:ident, [$storage_ty:ty] $($handlers:tt)*) => {
        $crate::watchers!(@collect_handlers $struct_name, [$storage_ty] [] $($handlers)*);
    };

    // ---- @collect_handlers -----------------------------------------------------------
    // Regroups every bracketed entry into a parenthesised tuple
    // `([pk], handler, handler_path, [globs]?)` so `@generate_all` can match them all with
    // a single repetition.

    // Nothing left to collect: generate the code.
    (@collect_handlers $struct_name:ident, [$storage_ty:ty] [$($grouped:tt)*]) => {
        $crate::watchers!(@generate_all $struct_name, [$storage_ty] $($grouped)*);
    };
    // Entry with globs.
    (
        @collect_handlers $struct_name:ident, [$storage_ty:ty] [$($grouped:tt)*]
        [$pk:path] => $handler:ident [$handler_path:path] [$($glob:literal),+]
        $(, $($rest:tt)*)?
    ) => {
        $crate::watchers!(@collect_handlers $struct_name, [$storage_ty] [
            $($grouped)*
            ([$pk], $handler, $handler_path, [$($glob),+])
        ] $($($rest)*)?);
    };
    // Entry without globs.
    (
        @collect_handlers $struct_name:ident, [$storage_ty:ty] [$($grouped:tt)*]
        [$pk:path] => $handler:ident [$handler_path:path]
        $(, $($rest:tt)*)?
    ) => {
        $crate::watchers!(@collect_handlers $struct_name, [$storage_ty] [
            $($grouped)*
            ([$pk], $handler, $handler_path)
        ] $($($rest)*)?);
    };

    // ---- @generate_all ---------------------------------------------------------------
    // Emits the metadata types, the lazily initialised metadata static and the
    // `KeyWatcher` implementation.
    (
        @generate_all $struct_name:ident, [$storage_ty:ty]
        $(([$pk:path], $handler:ident, $handler_path:path $(, [$($glob:literal),+])?)) *
    ) => {
        paste::paste! {
            struct [<$struct_name HandlerMeta>] {
                task_id: $crate::Ident,
                pk: $crate::Ident,
                sk_globset: Option<globset::GlobSet>,
            }

            struct [<$struct_name Meta>] {
                $(
                    [<$handler:snake>]: [<$struct_name HandlerMeta>],
                )*
            }

            static [<$struct_name:upper _META>]: std::sync::LazyLock<[<$struct_name Meta>]> =
                std::sync::LazyLock::new(|| {
                    [<$struct_name Meta>] {
                        $(
                            [<$handler:snake>]: [<$struct_name HandlerMeta>] {
                                task_id: $crate::Ident::new(concat!("watcher:", stringify!($pk), ":", stringify!($handler))),
                                pk: $pk,
                                sk_globset: $crate::watchers!(@build_globset $handler, $([$($glob),+])?),
                            },
                        )*
                    }
                });

            impl $crate::scheduler::key_watcher::KeyWatcher<$storage_ty, $struct_name> for $struct_name
            {
                fn dispatch_watcher<F>(
                    pk: $crate::Ident,
                    updated_sks: Vec<String>,
                    deleted_sks: Vec<String>,
                    spawn: F,
                )
                where
                    F: Fn(
                        $crate::Ident,
                        Vec<String>,
                        Vec<String>,
                        for<'a> fn(
                            &'a mut $crate::scheduler::task::TaskContext<$storage_ty, $struct_name>,
                            &'a mut $crate::database::PartitionWriteContextRef<'a, $storage_ty>)
                            -> std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = $crate::scheduler::key_watcher::WatcherResult<$storage_ty, $struct_name>> + std::marker::Send + 'a>>),
                {
                    // Every handler is checked in declaration order; several handlers may
                    // be registered for the same partition key.
                    $(
                        if pk == [<$struct_name:upper _META>].[<$handler:snake>].pk {
                            $crate::watchers!(@spawn_filtered $struct_name, pk, updated_sks, deleted_sks, spawn, $handler, $handler_path $(, [$($glob),+])?);
                        }
                    )*
                }
            }
        }
    };

    // ---- @spawn_filtered -------------------------------------------------------------
    // Expanded inline inside `dispatch_watcher`, once per handler.

    // Handler without globs: forward all keys, unless both lists are empty.
    (
        @spawn_filtered $struct_name:ty, $pk:ident, $updated_sks:ident, $deleted_sks:ident,
        $spawn:ident, $handler:ident, $handler_path:path
    ) => {
        paste::paste! {
            if !$updated_sks.is_empty() || !$deleted_sks.is_empty() {
                let meta = &[<$struct_name:upper _META>].[<$handler:snake>];
                $spawn(meta.pk.clone(), $updated_sks.clone(), $deleted_sks.clone(), $handler_path);
            }
        }
    };
    // Handler with globs: forward only the keys matched by the handler's glob set.
    (
        @spawn_filtered $struct_name:ty, $pk:ident, $updated_sks:ident, $deleted_sks:ident,
        $spawn:ident, $handler:ident, $handler_path:path, [$($glob:literal),+]
    ) => {
        paste::paste! {
            {
                let meta = &[<$struct_name:upper _META>].[<$handler:snake>];
                // This code is expanded inline in `dispatch_watcher`, so a missing glob set
                // must only skip *this* handler. A `return` here would also suppress every
                // handler declared after it.
                if let Some(glob_set) = meta.sk_globset.as_ref() {
                    // Filter before cloning so only matching keys are copied.
                    let filtered_updated: Vec<String> = $updated_sks
                        .iter()
                        .filter(|sk| glob_set.is_match(sk.as_str()))
                        .cloned()
                        .collect();
                    let filtered_deleted: Vec<String> = $deleted_sks
                        .iter()
                        .filter(|sk| glob_set.is_match(sk.as_str()))
                        .cloned()
                        .collect();
                    if !filtered_updated.is_empty() || !filtered_deleted.is_empty() {
                        $spawn(meta.pk.clone(), filtered_updated, filtered_deleted, $handler_path);
                    }
                }
            }
        }
    };

    // ---- @build_globset --------------------------------------------------------------
    // Evaluates to `Option<globset::GlobSet>` for one handler.

    // No globs declared for this handler.
    (@build_globset $handler:ident,) => { None };
    // Globs declared: invalid patterns are reported and skipped; a failed build is
    // reported and yields `None`.
    (@build_globset $handler:ident, [$($glob:literal),+]) => {
        {
            let mut builder = globset::GlobSetBuilder::new();
            $(
                match globset::Glob::new($glob) {
                    | Ok(glob) => { builder.add(glob); },
                    | Err(e) => {
                        eprintln!("Warning: Invalid glob pattern `{}` in handler `{}`: {}", $glob, stringify!($handler), e);
                    }
                }
            )+
            match builder.build() {
                | Ok(globset) => Some(globset),
                | Err(e) => {
                    eprintln!("Warning: Failed to build GlobSet for handler `{}`: {}", stringify!($handler), e);
                    None
                }
            }
        }
    };

    // ---- @make_globset ---------------------------------------------------------------
    // Handler-agnostic variant of `@build_globset`. None of the rules in this macro invoke
    // it; it is kept so that any direct callers keep working.
    (@make_globset) => { None };
    (@make_globset $($glob:literal),+) => {
        {
            let mut builder = globset::GlobSetBuilder::new();
            $(
                match globset::Glob::new($glob) {
                    | Ok(glob) => { builder.add(glob); },
                    | Err(e) => {
                        eprintln!("Warning: Invalid glob pattern `{}`: {}", $glob, e);
                    }
                }
            )+
            match builder.build() {
                | Ok(globset) => Some(globset),
                | Err(e) => {
                    eprintln!("Warning: Failed to build GlobSet: {}", e);
                    None
                }
            }
        }
    };
}