use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use aa_proto::assembly::common::v1::AgentId;
use aa_proto::assembly::policy::v1::{OpControlMessage, OpControlSignal};
const CHANNEL_CAPACITY: usize = 256;
#[derive(Debug, Clone)]
pub struct OpControlEnvelope {
pub agent_id: AgentId,
pub message: OpControlMessage,
}
pub struct OpControlPublisher {
tx: broadcast::Sender<OpControlEnvelope>,
sequence: AtomicU64,
}
impl OpControlPublisher {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(CHANNEL_CAPACITY);
Self {
tx,
sequence: AtomicU64::new(0),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<OpControlEnvelope> {
self.tx.subscribe()
}
pub fn publish(&self, agent_id: AgentId, op_id: String, signal: OpControlSignal) -> u64 {
let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
let envelope = OpControlEnvelope {
agent_id,
message: OpControlMessage {
op_id,
signal: signal as i32,
sequence,
},
};
let _ = self.tx.send(envelope);
sequence
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
}
impl Default for OpControlPublisher {
fn default() -> Self {
Self::new()
}
}
pub type SharedOpControlPublisher = Arc<OpControlPublisher>;
#[cfg(test)]
mod tests {
use super::*;
fn agent(id: &str) -> AgentId {
AgentId {
org_id: "org".into(),
team_id: "team".into(),
agent_id: id.into(),
}
}
#[tokio::test]
async fn publish_with_no_subscribers_succeeds_and_drops_message() {
let pub_ = OpControlPublisher::new();
let seq = pub_.publish(agent("a1"), "trace:span".into(), OpControlSignal::Pause);
assert_eq!(seq, 0);
assert_eq!(pub_.subscriber_count(), 0);
}
#[tokio::test]
async fn sequence_numbers_are_monotonically_increasing() {
let pub_ = OpControlPublisher::new();
let s0 = pub_.publish(agent("a1"), "o:0".into(), OpControlSignal::Pause);
let s1 = pub_.publish(agent("a1"), "o:1".into(), OpControlSignal::Resume);
let s2 = pub_.publish(agent("a1"), "o:2".into(), OpControlSignal::Terminate);
assert_eq!((s0, s1, s2), (0, 1, 2));
}
#[tokio::test]
async fn subscriber_receives_published_envelope() {
let pub_ = OpControlPublisher::new();
let mut rx = pub_.subscribe();
let seq = pub_.publish(agent("a1"), "trace:span".into(), OpControlSignal::Pause);
let envelope = rx.recv().await.unwrap();
assert_eq!(envelope.agent_id.agent_id, "a1");
assert_eq!(envelope.message.op_id, "trace:span");
assert_eq!(envelope.message.signal, OpControlSignal::Pause as i32);
assert_eq!(envelope.message.sequence, seq);
}
#[tokio::test]
async fn each_subscriber_receives_every_envelope_independently() {
let pub_ = OpControlPublisher::new();
let mut a = pub_.subscribe();
let mut b = pub_.subscribe();
pub_.publish(agent("a1"), "o:0".into(), OpControlSignal::Pause);
assert_eq!(a.recv().await.unwrap().message.op_id, "o:0");
assert_eq!(b.recv().await.unwrap().message.op_id, "o:0");
}
#[tokio::test]
async fn subscriber_count_tracks_active_receivers() {
let pub_ = OpControlPublisher::new();
assert_eq!(pub_.subscriber_count(), 0);
let r1 = pub_.subscribe();
assert_eq!(pub_.subscriber_count(), 1);
let r2 = pub_.subscribe();
assert_eq!(pub_.subscriber_count(), 2);
drop(r1);
assert_eq!(pub_.subscriber_count(), 1);
drop(r2);
assert_eq!(pub_.subscriber_count(), 0);
}
}