use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
/// Default keepalive window: ten seconds.
///
/// NOTE(review): nothing in this section reads the constant; presumably callers pass it
/// to `KeepAlive::new` and `AgentSilencedTrigger::new` — confirm at the call sites.
pub const DEFAULT_KEEPALIVE_WINDOW: Duration = Duration::from_secs(10);
/// Handle to a shared keepalive timer.
///
/// The state lives behind an `Arc<Mutex<..>>`, so every clone of a `KeepAlive`
/// observes and updates the same last-frame timestamp.
#[derive(Debug, Clone)]
pub struct KeepAlive {
/// Shared, mutex-guarded timer state.
inner: Arc<Mutex<KeepAliveInner>>,
}
/// Mutable state behind a [`KeepAlive`] handle.
#[derive(Debug)]
struct KeepAliveInner {
/// Set when the tracker is constructed and overwritten on every `notify_frame` call.
last_frame_at: Instant,
/// Once this much time has elapsed since `last_frame_at`, the tracker counts as silenced.
window: Duration,
}
impl KeepAlive {
    /// Builds a tracker whose silence timer starts at the moment of construction.
    ///
    /// `window` is how long the tracker may go without a frame before
    /// [`KeepAlive::is_silenced`] starts reporting `true`.
    pub fn new(window: Duration) -> Self {
        let state = KeepAliveInner {
            last_frame_at: Instant::now(),
            window,
        };
        let inner = Arc::new(Mutex::new(state));
        Self { inner }
    }

    /// Returns the keepalive window this tracker was constructed with.
    pub async fn window(&self) -> Duration {
        let state = self.inner.lock().await;
        state.window
    }

    /// Records that a frame was just observed, restarting the silence timer.
    pub async fn notify_frame(&self) {
        // Take the lock first and read the clock second, so the stored timestamp
        // reflects the moment the update is applied rather than the moment we
        // started waiting for the lock.
        let mut state = self.inner.lock().await;
        state.last_frame_at = Instant::now();
    }

    /// Reports whether at least one full window has passed since the last frame.
    ///
    /// The comparison is inclusive: an elapsed time exactly equal to the window
    /// already counts as silenced.
    pub async fn is_silenced(&self) -> bool {
        let state = self.inner.lock().await;
        let idle_for = state.last_frame_at.elapsed();
        idle_for >= state.window
    }
}
/// Payload describing one "agent silenced" occurrence.
///
/// Instances are produced by `AgentSilencedTrigger::fire`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentSilencedSignal {
/// Cell identifier, copied from the trigger that produced the signal.
pub cell_id: String,
/// Run identifier, copied from the trigger that produced the signal.
pub run_id: String,
/// The trigger's keepalive window, in milliseconds.
pub keepalive_window_ms: u64,
/// The elapsed duration handed to `fire`, in milliseconds.
pub elapsed_ms: u64,
}
impl AgentSilencedSignal {
    /// Event type string associated with this signal.
    pub const CLOUDEVENT_TYPE: &'static str =
        "dev.cellos.events.cell.observability.v1.guest.agent_silenced";

    /// Renders the signal as a JSON object.
    ///
    /// The object carries the four signal fields under camelCase keys, two fixed
    /// classification strings (`epistemicStatus`, `ruleClass`), and a `_todo`
    /// marker noting that this is a stub pending wiring to the core event
    /// constructor. [`Self::CLOUDEVENT_TYPE`] is not part of the returned object.
    pub fn to_cloudevent_stub(&self) -> serde_json::Value {
        let Self {
            cell_id,
            run_id,
            keepalive_window_ms,
            elapsed_ms,
        } = self;

        serde_json::json!({
            "cellId": cell_id,
            "runId": run_id,
            "keepaliveWindowMs": keepalive_window_ms,
            "elapsedMs": elapsed_ms,
            "epistemicStatus": "DECLARED",
            "ruleClass": "GUEST_AGENT_DECLARATION",
            "_todo": "F1b: wire to cellos_core::events::observability_guest_agent_silenced_data_v1",
        })
    }
}
/// One-shot emitter for [`AgentSilencedSignal`].
///
/// Holds the identifiers and window that are stamped onto the signal, plus a
/// mutex-guarded flag recording whether the signal has already been produced.
#[derive(Debug)]
pub struct AgentSilencedTrigger {
/// `true` once `fire` has produced its signal; guarded by an async mutex.
fired: tokio::sync::Mutex<bool>,
/// Copied into `AgentSilencedSignal::cell_id`.
cell_id: String,
/// Copied into `AgentSilencedSignal::run_id`.
run_id: String,
/// Reported (in milliseconds) as `AgentSilencedSignal::keepalive_window_ms`.
keepalive_window: Duration,
}
impl AgentSilencedTrigger {
    /// Creates a trigger that has not fired yet.
    ///
    /// `cell_id` and `run_id` are stored as owned strings and copied into the
    /// signal; `keepalive_window` is reported on the signal in milliseconds.
    pub fn new(
        cell_id: impl Into<String>,
        run_id: impl Into<String>,
        keepalive_window: Duration,
    ) -> Self {
        Self {
            fired: tokio::sync::Mutex::new(false),
            cell_id: cell_id.into(),
            run_id: run_id.into(),
            keepalive_window,
        }
    }

    /// Produces the silence signal the first time it is called.
    ///
    /// Returns `Some(signal)` on the first call and `None` on every later call.
    /// The check-and-set happens under the `fired` mutex, so concurrent callers
    /// cannot both receive a signal.
    ///
    /// `elapsed` is reported on the signal in whole milliseconds. Durations too
    /// large to fit in a `u64` millisecond count saturate at `u64::MAX`.
    pub async fn fire(&self, elapsed: Duration) -> Option<AgentSilencedSignal> {
        let mut fired = self.fired.lock().await;
        // Set the flag unconditionally and branch on its previous value.
        if std::mem::replace(&mut *fired, true) {
            return None;
        }
        Some(AgentSilencedSignal {
            cell_id: self.cell_id.clone(),
            run_id: self.run_id.clone(),
            keepalive_window_ms: Self::saturating_millis(self.keepalive_window),
            elapsed_ms: Self::saturating_millis(elapsed),
        })
    }

    /// Reports whether [`Self::fire`] has already produced its signal.
    pub async fn has_fired(&self) -> bool {
        *self.fired.lock().await
    }

    /// Converts a duration to whole milliseconds, clamping to `u64::MAX`.
    ///
    /// `Duration::as_millis` returns a `u128`; a plain `as u64` cast would wrap
    /// for very large durations (e.g. `Duration::MAX`) and report a meaningless
    /// small number. The fully-qualified `TryFrom` path keeps this compiling on
    /// editions where the trait is not in the prelude.
    fn saturating_millis(duration: Duration) -> u64 {
        <u64 as std::convert::TryFrom<u128>>::try_from(duration.as_millis()).unwrap_or(u64::MAX)
    }
}
/// Polls `keepalive` until it has been silent for a full window, then fires `trigger`.
///
/// Each iteration first sleeps for `poll_interval`, then compares the time since
/// the last recorded frame against the tracker's window. The first time the
/// elapsed time reaches the window, the function calls `trigger.fire` with that
/// elapsed time and returns its result. Because the trigger is one-shot, the
/// result is `None` if something else already fired it.
///
/// The function does not return while frames keep arriving within the window.
pub async fn watch_for_silence(
    keepalive: KeepAlive,
    trigger: Arc<AgentSilencedTrigger>,
    poll_interval: Duration,
) -> Option<AgentSilencedSignal> {
    let idle_for = loop {
        tokio::time::sleep(poll_interval).await;

        // The guard lives only for this iteration, so the keepalive lock is
        // released before the next sleep and before the trigger is fired.
        let state = keepalive.inner.lock().await;
        let idle_for = state.last_frame_at.elapsed();
        if idle_for >= state.window {
            break idle_for;
        }
    };

    trigger.fire(idle_for).await
}
// Unit tests for the keepalive tracker, the one-shot trigger, and the polling watcher.
// Tests marked `start_paused = true` run on tokio's paused test clock, so they drive
// time explicitly with `tokio::time::advance` instead of really waiting.
#[cfg(test)]
mod tests {
use super::*;
// A tracker queried right after construction has not used up its 100 ms window.
#[tokio::test(start_paused = true)]
async fn keepalive_fresh_is_not_silenced() {
let ka = KeepAlive::new(Duration::from_millis(100));
assert!(!ka.is_silenced().await);
}
// After 150 ms without a frame, a 100 ms window has been exceeded.
#[tokio::test(start_paused = true)]
async fn keepalive_after_window_is_silenced() {
let ka = KeepAlive::new(Duration::from_millis(100));
tokio::time::advance(Duration::from_millis(150)).await;
assert!(ka.is_silenced().await);
}
// 160 ms pass in total, but a frame at the 80 ms mark restarts the timer, so only
// 80 ms have elapsed since the last frame when the tracker is queried.
#[tokio::test(start_paused = true)]
async fn notify_frame_resets_timer() {
let ka = KeepAlive::new(Duration::from_millis(100));
tokio::time::advance(Duration::from_millis(80)).await;
ka.notify_frame().await;
tokio::time::advance(Duration::from_millis(80)).await;
assert!(!ka.is_silenced().await);
}
// The first `fire` yields a signal carrying the trigger's ids, its window, and the
// elapsed value passed in; the second `fire` yields nothing.
#[tokio::test]
async fn trigger_fires_exactly_once() {
let t = AgentSilencedTrigger::new("c", "r", Duration::from_millis(100));
let first = t.fire(Duration::from_millis(150)).await;
let second = t.fire(Duration::from_millis(160)).await;
assert!(first.is_some());
assert!(second.is_none());
let s = first.unwrap();
assert_eq!(s.cell_id, "c");
assert_eq!(s.run_id, "r");
assert_eq!(s.keepalive_window_ms, 100);
assert_eq!(s.elapsed_ms, 150);
}
// End-to-end: a watcher polling every 10 ms against a 50 ms window must return a
// signal, and mark the shared trigger as fired, once no frames arrive.
#[tokio::test(start_paused = true)]
async fn watcher_fires_after_window() {
let ka = KeepAlive::new(Duration::from_millis(50));
let trigger = Arc::new(AgentSilencedTrigger::new(
"cell-1",
"run-1",
Duration::from_millis(50),
));
// The watcher takes ownership of its arguments, so it gets clones; the originals
// stay in the test for the final assertion.
let watcher_handle = {
let ka = ka.clone();
let trigger = trigger.clone();
tokio::spawn(
async move { watch_for_silence(ka, trigger, Duration::from_millis(10)).await },
)
};
// Move the clock well past the 50 ms window without ever calling `notify_frame`.
tokio::time::advance(Duration::from_millis(200)).await;
let result = watcher_handle.await.expect("task panicked");
assert!(result.is_some());
assert!(trigger.has_fired().await);
}
}