use crate::cascade_layer_updater::CascadeLayerUpdater;
use crate::memory_index::MemoryScope;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};
/// Timing thresholds used by `LayerUpdateDebouncer` to decide when a pending
/// update is due.
///
/// An update is released as soon as either threshold is reached.
#[derive(Debug, Clone)]
pub struct DebouncerConfig {
    /// Seconds that must have passed since the most recent request for a
    /// directory before its pending update is considered due.
    pub debounce_secs: u64,
    /// Seconds after the first request for a directory at which its pending
    /// update is considered due even if requests keep arriving.
    pub max_delay_secs: u64,
}
impl Default for DebouncerConfig {
    /// Returns the stock configuration: a 30-second debounce window and a
    /// 120-second cap on the total delay.
    fn default() -> Self {
        const DEBOUNCE_SECS: u64 = 30;
        const MAX_DELAY_SECS: u64 = 120;

        Self {
            debounce_secs: DEBOUNCE_SECS,
            max_delay_secs: MAX_DELAY_SECS,
        }
    }
}
/// Bookkeeping for one directory whose layer update has been requested but
/// not yet executed.
#[derive(Debug, Clone)]
struct PendingUpdate {
    /// URI of the directory to update; also used as the key in the pending map.
    dir_uri: String,
    /// Scope passed through to the updater when the update runs.
    scope: MemoryScope,
    /// Owner identifier passed through to the updater when the update runs.
    owner_id: String,
    /// When the first request for this directory was registered.
    first_request_at: Instant,
    /// When the most recent request for this directory was registered.
    last_request_at: Instant,
    /// How many requests have been merged into this entry, including the first.
    request_count: usize,
}
/// Collects layer-update requests per directory and merges repeats, so each
/// directory is updated once when its entry becomes due or is flushed.
pub struct LayerUpdateDebouncer {
    /// Updates waiting to run, keyed by directory URI.
    pending: Arc<RwLock<HashMap<String, PendingUpdate>>>,
    /// Thresholds that decide when a pending update is due.
    config: DebouncerConfig,
}
impl LayerUpdateDebouncer {
/// Creates a debouncer with no pending updates that uses `config` for its
/// timing thresholds.
pub fn new(config: DebouncerConfig) -> Self {
    let pending = Arc::new(RwLock::new(HashMap::new()));
    Self { pending, config }
}
pub async fn request_update(
&self,
dir_uri: String,
scope: MemoryScope,
owner_id: String,
) -> bool {
let mut pending = self.pending.write().await;
if let Some(existing) = pending.get_mut(&dir_uri) {
existing.last_request_at = Instant::now();
existing.request_count += 1;
debug!(
"🔀 Merged update request for {} (total: {} requests)",
dir_uri, existing.request_count
);
false
} else {
pending.insert(
dir_uri.clone(),
PendingUpdate {
dir_uri: dir_uri.clone(),
scope,
owner_id,
first_request_at: Instant::now(),
last_request_at: Instant::now(),
request_count: 1,
},
);
debug!("📝 Registered update request for {}", dir_uri);
true
}
}
pub async fn process_due_updates(&self, updater: &CascadeLayerUpdater) -> usize {
let now = Instant::now();
let debounce_threshold = Duration::from_secs(self.config.debounce_secs);
let max_delay_threshold = Duration::from_secs(self.config.max_delay_secs);
let due_updates: Vec<PendingUpdate> = {
let mut pending = self.pending.write().await;
let due_keys: Vec<String> = pending
.iter()
.filter(|(_, update)| {
let since_last = now - update.last_request_at;
let since_first = now - update.first_request_at;
since_last >= debounce_threshold || since_first >= max_delay_threshold
})
.map(|(key, _)| key.clone())
.collect();
due_keys
.into_iter()
.filter_map(|key| pending.remove(&key))
.collect()
};
let update_count = due_updates.len();
if update_count > 0 {
info!(
"🚀 Processing {} due updates (pending: {})",
update_count,
self.pending.read().await.len()
);
}
for update in due_updates {
debug!(
"⚙️ Executing merged update for {} ({} requests merged, waited {:.2}s)",
update.dir_uri,
update.request_count,
(now - update.first_request_at).as_secs_f64()
);
if let Err(e) = updater
.update_directory_layers(&update.dir_uri, &update.scope, &update.owner_id)
.await
{
tracing::error!("Failed to update layers for {}: {}", update.dir_uri, e);
}
}
update_count
}
/// Returns the number of directories that currently have a pending update.
pub async fn pending_count(&self) -> usize {
    let queue = self.pending.read().await;
    queue.len()
}
pub async fn flush_all(&self, updater: &CascadeLayerUpdater) -> usize {
let all_updates: Vec<PendingUpdate> = {
let mut pending = self.pending.write().await;
pending.drain().map(|(_, v)| v).collect()
};
let update_count = all_updates.len();
if update_count > 0 {
info!(
"🚀 Flushing ALL {} pending updates (shutdown mode)",
update_count
);
}
for update in all_updates {
info!(
"⚙️ Flushing update for {} ({} requests merged)",
update.dir_uri, update.request_count
);
if let Err(e) = updater
.update_directory_layers(&update.dir_uri, &update.scope, &update.owner_id)
.await
{
tracing::error!("Failed to flush layers for {}: {}", update.dir_uri, e);
}
}
update_count
}
/// Returns `true` when at least one update is waiting to be processed.
pub async fn has_pending(&self) -> bool {
    let queue = self.pending.read().await;
    !queue.is_empty()
}
/// Test-only helper that discards every pending update without running it.
#[cfg(test)]
pub async fn clear(&self) {
    let mut queue = self.pending.write().await;
    queue.clear();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    const ENTITIES_URI: &str = "cortex://user/test/entities";
    const EVENTS_URI: &str = "cortex://user/test/events";

    /// Files an update request for `uri` with user scope and owner "test",
    /// returning whether a new pending entry was created.
    async fn request(debouncer: &LayerUpdateDebouncer, uri: &str) -> bool {
        debouncer
            .request_update(uri.to_string(), MemoryScope::User, "test".to_string())
            .await
    }

    /// Repeated requests for one directory share a single pending entry, while
    /// a different directory gets its own.
    #[tokio::test]
    async fn test_request_merge() {
        let debouncer = LayerUpdateDebouncer::new(DebouncerConfig::default());

        assert!(request(&debouncer, ENTITIES_URI).await);
        assert_eq!(debouncer.pending_count().await, 1);

        assert!(!request(&debouncer, ENTITIES_URI).await);
        assert_eq!(debouncer.pending_count().await, 1);

        assert!(request(&debouncer, EVENTS_URI).await);
        assert_eq!(debouncer.pending_count().await, 2);
    }

    /// Checks the timing data that decides when an entry is due. No updater is
    /// available here, so the due condition is evaluated directly on the entry.
    #[tokio::test]
    async fn test_debounce_delay() {
        let config = DebouncerConfig {
            debounce_secs: 0,
            max_delay_secs: 10,
        };
        let debounce_threshold = Duration::from_secs(config.debounce_secs);
        let debouncer = LayerUpdateDebouncer::new(config);

        assert!(request(&debouncer, ENTITIES_URI).await);
        assert_eq!(debouncer.pending_count().await, 1);

        let pause = Duration::from_millis(10);
        tokio::time::sleep(pause).await;

        let pending = debouncer.pending.read().await;
        let update = pending.get(ENTITIES_URI).unwrap();
        let since_last = update.last_request_at.elapsed();

        // Comparing against the sleep length makes this fail if the timestamp
        // is wrong; comparing against zero alone can never fail.
        assert!(since_last >= pause);
        // Same comparison `process_due_updates` uses for the debounce window.
        assert!(since_last >= debounce_threshold);
        assert_eq!(update.request_count, 1);
        assert!(update.first_request_at <= update.last_request_at);
    }
}