use crate::models::node::NodeState;
use crate::storage::node_store::NodeStore;
use crate::sync::DataSyncBackend;
use crate::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, info, instrument, warn};
/// Wrapper around a [`NodeStore`] that queues node state updates in memory
/// and writes them to the wrapped store in batches.
///
/// Updates are keyed by node id, so only the most recently queued state for
/// each node is kept until the next flush.
pub struct ThrottledNodeStore<B: DataSyncBackend> {
/// The wrapped store that flushes write to via `store_state`.
inner: NodeStore<B>,
/// Queued states keyed by node id; inserting for an id that is already
/// queued replaces the earlier state.
pending_updates: Arc<Mutex<HashMap<String, NodeState>>>,
/// Set to "now" at construction and again at the end of every flush that
/// had at least one pending update.
last_sync: Arc<Mutex<Instant>>,
/// Minimum time that must have elapsed since `last_sync` before
/// `update_state` triggers a flush on its own.
sync_interval: Duration,
}
impl<B: DataSyncBackend> ThrottledNodeStore<B> {
    /// Creates a throttled wrapper around `store`.
    ///
    /// The throttle clock starts at construction time, so the first automatic
    /// flush happens no earlier than `sync_interval` after this call.
    pub fn new(store: NodeStore<B>, sync_interval: Duration) -> Self {
        Self {
            inner: store,
            pending_updates: Arc::new(Mutex::new(HashMap::new())),
            last_sync: Arc::new(Mutex::new(Instant::now())),
            sync_interval,
        }
    }

    /// Queues `state` as the latest state for `node_id`, flushing all pending
    /// updates if at least `sync_interval` has elapsed since the last flush.
    ///
    /// A previously queued state for the same node is replaced.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`Self::flush`] when a flush is triggered and
    /// fails. The update itself has already been queued in that case.
    #[instrument(skip(self, state))]
    pub async fn update_state(&self, node_id: &str, state: &NodeState) -> Result<()> {
        debug!("Queueing state update for node: {}", node_id);
        {
            let mut pending = self.pending_updates.lock().await;
            pending.insert(node_id.to_string(), state.clone());
        }

        // Both guards are released before calling `flush`, which takes the
        // same locks itself.
        let should_sync = {
            let last_sync = self.last_sync.lock().await;
            last_sync.elapsed() >= self.sync_interval
        };

        if should_sync {
            self.flush().await?;
        }
        Ok(())
    }

    /// Writes every pending update to the wrapped store.
    ///
    /// Updates that were stored successfully are removed from the queue.
    /// Updates whose write failed stay queued so that a later flush retries
    /// them instead of silently losing them. The throttle clock is reset
    /// whether or not every write succeeded.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Internal`] carrying the number of failed writes when
    /// one or more updates could not be stored.
    #[instrument(skip(self))]
    pub async fn flush(&self) -> Result<()> {
        // The queue lock is held for the whole pass, so concurrent flushes are
        // serialized and no update can be queued while this one is in flight.
        let mut pending = self.pending_updates.lock().await;
        if pending.is_empty() {
            debug!("No pending updates to flush");
            return Ok(());
        }

        info!("Flushing {} pending state updates", pending.len());

        // The map is only mutated after all writes have been attempted, so a
        // cancelled flush leaves the queue intact.
        let mut failed = std::collections::HashSet::new();
        for (node_id, state) in pending.iter() {
            if let Err(e) = self.inner.store_state(node_id, state).await {
                warn!("Failed to store state for {}: {}", node_id, e);
                failed.insert(node_id.clone());
            }
        }

        // Drop what was stored; keep what failed for the next attempt.
        pending.retain(|node_id, _| failed.contains(node_id));

        {
            let mut last_sync = self.last_sync.lock().await;
            *last_sync = Instant::now();
        }

        if !failed.is_empty() {
            return Err(Error::Internal(format!(
                "Failed to flush {} state updates",
                failed.len()
            )));
        }
        Ok(())
    }

    /// Returns the number of updates currently waiting to be flushed.
    pub async fn pending_count(&self) -> usize {
        self.pending_updates.lock().await.len()
    }

    /// Returns a snapshot of the throttle's current state.
    pub async fn stats(&self) -> ThrottleStats {
        let pending = self.pending_updates.lock().await;
        let last_sync = self.last_sync.lock().await;
        // Sample the clock once so that `time_since_last_sync` and
        // `should_sync_now` always agree with each other.
        let elapsed = last_sync.elapsed();
        ThrottleStats {
            pending_updates: pending.len(),
            time_since_last_sync: elapsed,
            sync_interval: self.sync_interval,
            should_sync_now: elapsed >= self.sync_interval,
        }
    }

    /// Returns a reference to the wrapped store, bypassing the throttle.
    pub fn inner(&self) -> &NodeStore<B> {
        &self.inner
    }
}
/// Point-in-time snapshot of a `ThrottledNodeStore`'s throttle state, as
/// returned by its `stats` method.
#[derive(Debug, Clone)]
pub struct ThrottleStats {
/// Number of node state updates queued and not yet flushed.
pub pending_updates: usize,
/// Time elapsed since the store's last recorded sync instant.
pub time_since_last_sync: Duration,
/// The store's configured sync interval.
pub sync_interval: Duration,
/// Whether the elapsed time had reached `sync_interval` when the snapshot
/// was taken.
pub should_sync_now: bool,
}