use {
crate::{
database::{
RecordKey,
chunk::Chunk,
PartitionKey,
PartitionWriteContextRef,
Partitions,
},
hash::Ident,
protocol::lsp::LanguageServer,
scheduler::{lanes::Lane, task::TaskContext},
},
globset::{
Glob,
GlobSet,
GlobSetBuilder,
},
std::{
future::Future,
pin::Pin,
},
};
#[macro_use]
mod watchers_macro;
pub(crate) fn dispatch_builtin_watcher<P, T, F>(
pk: Ident,
updated_sks: Vec<String>,
deleted_sks: Vec<String>,
spawn: F,
) where
P: Partitions,
T: crate::protocol::lsp::LanguageServer<P>,
F: Fn(
Ident,
Vec<String>,
Vec<String>,
for<'a> fn(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = WatcherResult<P, T>> + Send + 'a>>,
),
{
if pk == crate::partitions::Diagnostics::KEY
&& (!updated_sks.is_empty() || !deleted_sks.is_empty())
{
spawn(
pk,
updated_sks.clone(),
deleted_sks.clone(),
crate::builtin_watchers::diagnostic_watcher,
);
}
if pk == crate::partitions::WorkDoneProgress::KEY
&& (!updated_sks.is_empty() || !deleted_sks.is_empty())
{
spawn(
pk,
updated_sks,
deleted_sks,
crate::progress::work_done_progress_watcher,
);
}
}
type FollowUpTaskFn<P, T> = Box<
dyn for<'a> FnOnce(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
+ Send,
>;
pub struct FollowUpTask<P: Partitions, T: LanguageServer<P>> {
pub(crate) task_fn: FollowUpTaskFn<P, T>,
pub(crate) lane: Lane,
}
pub struct WatcherResult<P: Partitions, T: LanguageServer<P>> {
pub(crate) follow_ups: Vec<FollowUpTask<P, T>>,
}
impl<P: Partitions, T: LanguageServer<P>> WatcherResult<P, T> {
pub fn empty() -> Self {
Self {
follow_ups: Vec::new(),
}
}
pub fn with_task<F>(task_fn: F, lane: Lane) -> Self
where
F: for<'a> FnOnce(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
+ Send
+ 'static,
{
Self {
follow_ups: vec![FollowUpTask {
task_fn: Box::new(task_fn),
lane,
}],
}
}
pub fn push_task<F>(&mut self, task_fn: F, lane: Lane)
where
F: for<'a> FnOnce(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
+ Send
+ 'static,
{
self.follow_ups.push(FollowUpTask {
task_fn: Box::new(task_fn),
lane,
});
}
}
impl<P: Partitions, T: LanguageServer<P>> From<()> for WatcherResult<P, T> {
fn from(_: ()) -> Self {
Self::empty()
}
}
pub type WatcherHandlerFn<P, T> =
for<'a> fn(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = WatcherResult<P, T>> + Send + 'a>>;
pub struct WatcherHandler<P, T>
where
P: Partitions,
T: LanguageServer<P>,
{
pub handler_fn: WatcherHandlerFn<P, T>,
pub glob_set: Option<GlobSet>,
}
pub trait KeyWatcher<P, T>: Send + Sync + 'static
where
P: crate::database::storage::Partitions,
T: crate::protocol::lsp::LanguageServer<P>,
{
fn dispatch_watcher<F>(
pk: Ident,
updated_sks: Vec<String>,
deleted_sks: Vec<String>,
spawn: F,
) where
F: Fn(
Ident,
Vec<String>,
Vec<String>,
for<'a> fn(
&'a mut TaskContext<P, T>,
&'a mut PartitionWriteContextRef<'a, P>,
) -> Pin<Box<dyn Future<Output = WatcherResult<P, T>> + Send + 'a>>,
);
}
pub struct WatcherConfig<P: Partitions, H = ()> {
pub(crate) rules: Vec<WatchRule<P, H>>,
}
impl<P: Partitions, H: Clone> WatcherConfig<P, H> {
pub fn empty() -> Self {
Self { rules: Vec::new() }
}
pub fn add_matcher_enum(
&mut self,
patterns: Vec<PatternSpec>,
handler: H,
) -> Result<(), globset::Error> {
for pattern in patterns {
let (pk_ident, sk_globset, task_id) = match pattern {
| PatternSpec::PkOnly(pk_ident) => {
let task_id = Ident::new(&format!("watcher:{}", pk_ident));
(pk_ident, None, task_id)
},
| PatternSpec::PkAndSk(pk_ident, sk_pattern) => {
let mut builder = GlobSetBuilder::new();
builder.add(Glob::new(&sk_pattern)?);
let sk_globset = Some(builder.build()?);
let task_id =
Ident::new(&format!("watcher:{}:{}", pk_ident, sk_pattern));
(pk_ident, sk_globset, task_id)
},
};
let rule = WatchRule {
pk_ident,
sk_globset,
handler: handler.clone(),
task_id,
_phantom: std::marker::PhantomData,
};
self.rules.push(rule);
}
Ok(())
}
}
pub struct WatchRule<P: Partitions, H = ()> {
pub(crate) pk_ident: Ident,
pub(crate) sk_globset: Option<GlobSet>,
#[allow(dead_code)]
pub(crate) handler: H,
pub(crate) task_id: Ident,
_phantom: std::marker::PhantomData<P>,
}
impl<P: Partitions, H> WatchRule<P, H> {
pub fn match_keys(&self, keys: &[RecordKey]) -> Vec<RecordKey> {
let mut matched = Vec::new();
for key in keys {
if key.partition_key() != self.pk_ident {
continue;
}
let sk_matches = if let Some(ref globset) = self.sk_globset {
globset.is_match(key.sort_key())
} else {
true
};
if sk_matches {
matched.push(key.clone());
}
}
matched
}
pub fn task_id(&self) -> Ident {
self.task_id
}
}
pub enum PatternSpec {
PkOnly(Ident),
PkAndSk(Ident, String),
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn extract_chunk_keys<P: Partitions>(
chunk: &Chunk<P>,
) -> Vec<RecordKey> {
let mut keys = Vec::new();
for (partition_key, sort_map) in chunk.records() {
for sort_key in sort_map.keys() {
keys.push(RecordKey::new(*partition_key, sort_key.clone()));
}
}
keys
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn compute_key_changes(
new_keys: &[RecordKey],
old_keys: &[RecordKey],
) -> (Vec<RecordKey>, Vec<RecordKey>) {
use std::collections::HashSet;
let new_set: HashSet<_> = new_keys.iter().collect();
let old_set: HashSet<_> = old_keys.iter().collect();
let updated: Vec<_> =
new_set.difference(&old_set).map(|k| (*k).clone()).collect();
let deleted: Vec<_> =
old_set.difference(&new_set).map(|k| (*k).clone()).collect();
(updated, deleted)
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::database::chunk::RecordWriter,
};
fn rk(pk: &str, sk: &str) -> RecordKey {
RecordKey::new(Ident::new(pk), sk.to_string())
}
#[test]
fn test_extract_keys_from_empty_chunk() {
use crate::database::tests::storage::TestPartitions;
let writer: RecordWriter<TestPartitions> =
RecordWriter::new(Ident::new("test"));
let chunk = writer.build();
let keys = extract_chunk_keys(&chunk);
assert_eq!(keys.len(), 0);
}
#[test]
fn test_extract_keys_from_chunk_with_records() {
use crate::{
database::tests::storage::{
Test1Partition,
Test2Partition,
TestPartitions,
TestRecordData,
},
record::LaburnumRecord,
};
let mut writer: RecordWriter<TestPartitions> =
RecordWriter::new(Ident::new("test"));
let test_record =
TestRecordData::Laburnum(LaburnumRecord::WorkspaceConfig {
value: "test".to_string(),
});
writer.insert::<Test1Partition, _>("sk1".to_string(), test_record.clone());
writer.insert::<Test1Partition, _>("sk2".to_string(), test_record.clone());
writer.insert::<Test2Partition, _>("sk3".to_string(), test_record);
let chunk = writer.build();
let keys = extract_chunk_keys(&chunk);
assert_eq!(keys.len(), 3);
assert!(keys.contains(&rk("pk1", "sk1")));
assert!(keys.contains(&rk("pk1", "sk2")));
assert!(keys.contains(&rk("pk2", "sk3")));
}
#[test]
fn test_no_previous_chunk_all_created() {
let new_keys = vec![rk("pk", "sk1")];
let old_keys = vec![];
let (updated, deleted) = compute_key_changes(&new_keys, &old_keys);
assert_eq!(updated.len(), 1);
assert_eq!(deleted.len(), 0);
}
#[test]
fn test_all_keys_deleted() {
let new_keys = vec![];
let old_keys = vec![rk("pk", "sk1")];
let (updated, deleted) = compute_key_changes(&new_keys, &old_keys);
assert_eq!(updated.len(), 0);
assert_eq!(deleted.len(), 1);
}
#[test]
fn test_partial_overlap() {
let new_keys = vec![rk("pk", "sk2"), rk("pk", "sk3")];
let old_keys = vec![rk("pk", "sk1"), rk("pk", "sk2")];
let (updated, deleted) = compute_key_changes(&new_keys, &old_keys);
assert_eq!(updated.len(), 1);
assert!(updated.contains(&rk("pk", "sk3")));
assert_eq!(deleted.len(), 1);
assert!(deleted.contains(&rk("pk", "sk1")));
}
#[test]
fn test_no_changes() {
let keys = vec![rk("pk", "sk1"), rk("pk", "sk2")];
let (updated, deleted) = compute_key_changes(&keys, &keys);
assert_eq!(updated.len(), 0);
assert_eq!(deleted.len(), 0);
}
#[test]
fn test_unchanged_keys_not_reported_as_updated() {
let keys = vec![rk("pk", "sk1"), rk("pk", "sk2")];
let (updated, deleted) = compute_key_changes(&keys, &keys);
assert_eq!(
updated.len(),
0,
"unchanged keys should not be reported as updated"
);
assert_eq!(deleted.len(), 0);
}
#[test]
fn test_only_new_keys_reported_as_updated() {
let old_keys = vec![rk("pk", "sk1")];
let new_keys = vec![rk("pk", "sk1"), rk("pk", "sk2")];
let (updated, deleted) = compute_key_changes(&new_keys, &old_keys);
assert_eq!(
updated.len(),
1,
"only the new key should be reported as updated"
);
assert!(updated.contains(&rk("pk", "sk2")));
assert_eq!(deleted.len(), 0);
}
}